0%

Python - 如何使用PG的COPY命令将SQL结果直接输出到客户端文件

0. 背景

psql中可以执行copy命令将数据导出到文件,有两种变体

1
2
COPY (SELECT id FROM table_name LIMIT 1) TO '/path/to/file.csv' WITH CSV DELIMITER ',';
\COPY (SELECT id FROM table_name LIMIT 1) TO '/path/to/file.csv' WITH CSV DELIMITER ',';

COPY\COPY的区别是前者输出的文件在服务端(且需要有服务端的管理员权限,如果是使用的云数据库则不可用),后者输出到客户端。
但如果使用COPY TO STDOUT的形式则可以将数据回传到客户端,可以用于文件写入,如

1
COPY (SELECT id FROM table_name LIMIT 1) TO STDOUT WITH CSV DELIMITER ',';

1. 踩坑探索

python中使用psycopg2库连接PG直接执行COPY或者\COPY语句都会报错

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import psycopg2

conn = psycopg2.connect("dbname=dbname user=user password=password")
cursor = conn.cursor()

# COPY写法
sql = "COPY (SELECT id FROM table_name LIMIT 1) TO '/path/to/file.csv' WITH CSV DELIMITER ','" # 文件会输出到数据库服务器上
sql = "COPY (SELECT id FROM table_name LIMIT 1) TO STDOUT WITH CSV DELIMITER ','" # STDOUT回传数据到客户端

# \COPY写法
sql = "\COPY (SELECT id FROM table_name LIMIT 1) TO '/path/to/file.csv' WITH CSV DELIMITER ','"

cursor.execute(sql)
cursor.close()

报错信息分别如下:

1
ProgrammingError: can't execute COPY TO: use the copy_to() method instead
1
2
3
SyntaxError: syntax error at or near "\"
LINE 1: \COPY (SELECT id FROM table_name LIMIT 1...
^

2. 查看psycopg2文档

发现copy命令被封装成了copy_expert函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def copy_expert(self, sql, file, size=8192): # real signature unknown; restored from __doc__
"""
copy_expert(sql, file, size=8192) -- Submit a user-composed COPY statement.
`file` must be an open, readable file for COPY FROM or an open, writable
file for COPY TO. The optional `size` argument, when specified for a COPY
FROM statement, will be passed to file's read method to control the read
buffer size.
"""
pass

def copy_from(self, file, table, sep=None, null=None, size=8192, columns=None): # real signature unknown; restored from __doc__
""" copy_from(file, table, sep='\t', null='\\N', size=8192, columns=None) -- Copy table from file. """
pass

def copy_to(self, file, table, sep=None, null=None, columns=None): # real signature unknown; restored from __doc__
""" copy_to(file, table, sep='\t', null='\\N', columns=None) -- Copy table to file. """
pass

def execute(self, query, vars=None): # real signature unknown; restored from __doc__
""" execute(query, vars=None) -- Execute query with bound vars. """
pass

3. 使用psycopg2实现

可以使用copy_expert函数将sql结果输出到客户端文件,代码如下

1
2
3
4
5
6
7
8
9
10
import psycopg2

conn = psycopg2.connect("dbname=dbname user=user password=password")
cursor = conn.cursor()

sql = """COPY (SELECT id FROM table_name LIMIT 1) TO STDOUT WITH CSV DELIMITER ','"""
with open('/path/to/file.csv', 'w') as f:
cursor.copy_expert(sql, f)

cursor.close()

4. 引申:在Odoo中实现

Odoo(原OpenERP)中使用cr执行上述命令
查看Odoo代码发现Odoocr.execute是将psycopg2库的execute封装了一层,但是使用_obj依然可以引用到原生的cusor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class ConnectionPool(object):
...

def borrow(self, dsn):
self._debug('Borrow connection to %r', dsn)
try:
result = psycopg2.connect(dsn=dsn, connection_factory=PsycoConnection, connect_timeout=4)
except psycopg2.Error:
_logger.exception('Connection to the database failed')
raise
self._debug('Create new connection')
return result

class Cursor(object):
...
# lazy load connection, cursor
@property
def _cnx(self):
if self._cnx_cache is None:
self._cnx_cache = self._pool.borrow(dsn(self.dbname))
return self._cnx_cache

@property
def _obj(self):
if self._obj_cache is None:
self._obj_cache = self._cnx.cursor(cursor_factory=DictCursor)
return self._obj_cache

def execute(self, query, params=None, log_exceptions=None):
...
try:
...
res = self._obj.execute(query, params)
except psycopg2.ProgrammingError as pe:
...
...

所以在Odoo中使用COPY命令导出的代码为:

1
2
3
sql = """COPY (SELECT id FROM table_name LIMIT 1) TO STDOUT WITH CSV DELIMITER ','"""
with open('/path/to/file.csv', 'w') as f:
cr._obj.copy_expert(sql, f)