我想以二进制(bytea)形式将NumPy数组存储在PostgreSQL数据库中。测试#1(见下文)可以正常工作,但我不想每次在插入之前和查询之后手动处理数据数组——我想使用psycopg2的适配器(adapter)和类型转换器(typecaster)。
这就是我现在所拥有的:
import numpy as np
import psycopg2, psycopg2.extras
def my_adapter(spectrum):
    """Wrap *spectrum* in ``psycopg2.Binary`` so it can be sent as ``bytea``.

    Intended to be registered for ``np.ndarray`` with
    ``psycopg2.extensions.register_adapter``; psycopg2 then calls it for
    every array passed as a query parameter.
    """
    return psycopg2.Binary(spectrum)
def my_converter(my_buffer, cursor):
    """Typecaster that turns a fetched ``bytea`` value into a NumPy array.

    Parameters:
        my_buffer: the value psycopg2 hands to the typecaster, or ``None``
            for SQL NULL.
        cursor: the cursor the value was fetched from (unused).

    Returns:
        ``None`` for NULL, otherwise the flat array produced by
        ``np.frombuffer`` with its default dtype (the original shape is
        not restored).
    """
    # psycopg2 passes None for NULL columns; frombuffer cannot handle that.
    if my_buffer is None:
        return None
    # NOTE(review): psycopg2 gives a typecaster the column's *text*
    # representation (the hex form of bytea), not a raw buffer, so this call
    # fails with "buffer size must be a multiple of element size" on values
    # fetched from the database. Decoding it first with
    # psycopg2.BINARY(my_buffer, cursor) is the usual remedy.
    return np.frombuffer(my_buffer)
class MyBinaryTest(object):
    """Round-trip NumPy arrays through a PostgreSQL ``bytea`` column.

    ``set_up`` creates a scratch database with a ``spectrum`` table,
    ``perform_test_one`` converts the array by hand on both sides,
    ``perform_test_two`` relies on the adapter/typecaster registered by
    ``setup_adapters_and_converters``, and ``tear_down`` drops the database.
    """

    # Connection info
    user = 'postgres'
    password = 'XXXXXXXXXX'
    host = 'localhost'
    database = 'test_binary'

    # Set to True by setup_adapters_and_converters(). Defined at class level
    # so perform_test_two() can be called safely before that happens.
    use_adapters = False

    def _admin_connection(self):
        """Open an autocommit connection that does not name a database.

        Autocommit is required because CREATE/DROP DATABASE cannot run
        inside a transaction block.
        """
        connection = psycopg2.connect(host=self.host, user=self.user,
                                      password=self.password)
        connection.set_isolation_level(
            psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        return connection

    def set_up(self):
        """Recreate the test database and its ``spectrum`` table.

        Leaves ``self.connection`` and ``self.cursor`` (a ``DictCursor``)
        open on the new database.
        """
        connection = self._admin_connection()
        try:
            cursor = connection.cursor()
            try:
                try:  # Clear out any old test database
                    cursor.execute('drop database %s' % (self.database, ))
                except psycopg2.Error:
                    # Best effort: the database usually does not exist yet.
                    pass
                cursor.execute('create database %s' % (self.database, ))
            finally:
                cursor.close()
        finally:
            connection.close()

        # Connect directly to the database and set up our table
        self.connection = psycopg2.connect(
            host=self.host, user=self.user, password=self.password,
            database=self.database)
        self.cursor = self.connection.cursor(
            cursor_factory=psycopg2.extras.DictCursor)
        self.cursor.execute('''CREATE TABLE spectrum (
            "sid" integer not null primary key,
            "data" bytea not null
            );
            CREATE SEQUENCE spectrum_id;
            ALTER TABLE spectrum
            ALTER COLUMN sid
            SET DEFAULT NEXTVAL('spectrum_id');
            ''')
        self.connection.commit()

    def perform_test_one(self):
        """Insert an array wrapped by hand and rebuild it by hand.

        Returns True on success; raises AssertionError on a mismatch.
        """
        shape = (2, 100)
        data = np.random.random(shape)

        # Binary up the data
        send_data = psycopg2.Binary(data)
        self.cursor.execute(
            'insert into spectrum (data) values (%s) returning sid;',
            [send_data])
        sid = self.cursor.fetchone()[0]
        self.connection.commit()

        # Retrieve exactly the row we just inserted, not whichever row
        # happens to come first in the table.
        self.cursor.execute('select * from spectrum where sid = %s', [sid])
        result = self.cursor.fetchall()
        print("Type of data retrieved: %s" % (type(result[0]['data']), ))

        # Convert it back to a numpy array of the same shape
        retrieved_data = np.frombuffer(result[0]['data']).reshape(*shape)

        # Ensure there was no problem
        assert np.all(retrieved_data == data)
        print("Everything went swimmingly in test one!")
        return True

    def perform_test_two(self):
        """Insert and fetch an array using the registered adapter/typecaster.

        Returns False without touching the database when the adapters have
        not been set up; otherwise True on success.
        """
        if not self.use_adapters:
            return False

        shape = (2, 100)
        data = np.random.random(shape)

        # No changes made to the data, as the adapter should take care of it
        self.cursor.execute(
            'insert into spectrum (data) values (%s) returning sid;', [data])
        sid = self.cursor.fetchone()[0]
        self.connection.commit()

        # Retrieve exactly the row we just inserted; rows from earlier tests
        # are still in the table.
        self.cursor.execute('select * from spectrum where sid = %s', [sid])
        result = self.cursor.fetchall()

        # No need to change the type of data, as the converter should take
        # care of it
        retrieved_data = result[0]['data']

        # Ensure there was no problem (the converter returns a flat array)
        assert np.all(retrieved_data == data.flatten())
        print("Everything went swimmingly in test two!")
        return True

    def setup_adapters_and_converters(self):
        """Register ``my_adapter`` / ``my_converter`` and enable test two."""
        # Set up test adapters
        psycopg2.extensions.register_adapter(np.ndarray, my_adapter)

        # Look up the bytea type OID from a throwaway query's description
        self.cursor.execute("select null::bytea;")
        my_oid = self.cursor.description[0][1]

        # Register our converter for that OID on this connection only
        obj = psycopg2.extensions.new_type(
            (my_oid, ), "numpy_array", my_converter)
        psycopg2.extensions.register_type(obj, self.connection)
        self.connection.commit()
        self.use_adapters = True

    def tear_down(self):
        """Close the test connection and drop the test database."""
        self.cursor.close()
        self.connection.close()

        connection = self._admin_connection()
        try:
            cursor = connection.cursor()
            try:
                cursor.execute('drop database %s' % (self.database, ))
            finally:
                cursor.close()
        finally:
            connection.close()
# Run both tests against a scratch database.
test = MyBinaryTest()
test.set_up()
try:
    test.perform_test_one()
    test.setup_adapters_and_converters()
    test.perform_test_two()
finally:
    # Always drop the scratch database, even when a test raises.
    test.tear_down()
现在,测试#1工作正常。但当我沿用测试#1的代码并设置psycopg2适配器和转换器后,它就不起作用了(测试#2)。这是因为传给转换器的数据实际上不再是缓冲区(buffer),而是PostgreSQL的bytea字符串表示。输出如下:
In [1]: run -i test_binary.py
Type of data retrieved: <type 'buffer'>
Everything went swimmingly in test one!
ERROR: An unexpected error occurred while tokenizing input
The following traceback may be corrupted or invalid
The error message is: ('EOF in multi-line statement', (273, 0))
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
/Users/andycasey/thesis/scope/scope/test_binary.py in <module>()
155 test.perform_test_one()
156 test.setup_adapters_and_converters()
--> 157 test.perform_test_two()
158 test.tear_down()
159
/Users/andycasey/thesis/scope/scope/test_binary.py in perform_test_two(self)
101 # Retrieve the data we just inserted
102 query = self.cursor.execute('select * from spectrum')
--> 103 result = self.cursor.fetchall()
104
105 # No need to change the type of data, as the converter should take care of it
/Library/Python/2.6/site-packages/psycopg2/extras.pyc in fetchall(self)
81 def fetchall(self):
82 if self._prefetch:
---> 83 res = _cursor.fetchall(self)
84 if self._query_executed:
85 self._build_index()
/Users/andycasey/thesis/scope/scope/test_binary.py in my_converter(my_buffer, cursor)
7
8 def my_converter(my_buffer, cursor):
----> 9 return np.frombuffer(my_buffer)
10
11
ValueError: buffer size must be a multiple of element size
WARNING: Failure executing file: <test_binary.py>
In [2]: %debug
> /Users/andycasey/thesis/scope/scope/test_binary.py(9)my_converter()
8 def my_converter(my_buffer, cursor):
----> 9 return np.frombuffer(my_buffer)
10
ipdb> my_buffer
'\\x40e67378b9b8ae3f78b15ebecf20ef3f4092f00289dc803f20a843f40b9ddd3f64b6ec99bf62e83f8cea6eb60758d43f2ba47d8e6d5be73f4e88f267bbb2d83ffacc8aad2220d43fc6006b9c7eb7d33ff440cccc638de33f70e0b4b906a1e13fe0eca2af2f87c83f98d31f41e081ee3f1e6f5b8a52fdea3f80fcbd0ec3a0a93f95316c9e462eed3f83fe6d8d2463ea3fb44849fa8404d33f701be5924049df3f6ef3ca0c50f6d63f0c7b7d800cfdda3fc000e89b890c983fb32cf3e4ba1dea3f87f17f7efc06e33f2e194b361190ed3f60e955f0456d933ff24dd5aabc7eeb3f7802405af74ddc3f9ce9c3852db8e03fa0c936267c19d33f3406c35637f9ec3f288d23502e70ee3f08fe67e7ed8ec53f00f5cde29763dc3f26bcb4d362c4e23fa9e01fac6cd8e33fbec912f5ff7ae13f7fbd61e2e585ed3fa0070671e970e83f68ef1f6e0b90da3fce9ce834bfa6d43fa02b825d144e903f42912641e5aedd3f645a299de883db3fd8b5126bb8f6c23f3c5d4ae40ecccd3f5ae503835d00e13fcc784bdb7ea9c43f880ebfb30719be3f1dffcb042f58e23f44cc727ab3dfc53f1bbe477eb861e43f3c4f55f6aea5e53fdc80f6fa91d6e33f12b580ef03acd03f1cb78f8dccaac13f9ebdbd206453d43f32ffc626fe4ddc3f625ff4e2b317d33f44822e2f0d52ca3f38fea7c36ba6cb3ff0290b4707cedc3fd456190f786bcd3f7ed46219b47eda3f66fbdef755c3df3f40ccd47f88978c3f382897872cf5b73f5d24a66af5d7e13f2dd179d56ea3ee3fc4bb5b0962bcd63f20024c1c55ddb63f68a02e5f73fbd13f21eeb68b333de63f1a19dfe1b713e53f7556fedbb698e53f44eb6e9228accf3fe61a509c1d4ae43fe0fb0624828fa83f1822e55e76cdd23f801708ab685dd93f06076be2e92bed3f5ac2ff90247fed3fd42902b6b974d13f9df97b70385ce83fdabc4af1e81fe83f250611249338e73fc0251f9c9739e93f5821b6024621d63f7a7e1fc15605e73fab085fa8bb67e83fb4eb1d087ef5dd3fd1b450d406cbe13f0078ed1c422d3e3f44ed12d19085e83f117d628438daea3f15c776903519e23f747f248fa2e0c83ffcd052e9c4edc93f177a255a0a91e93fbe3b9b894d8edf3fea9fb6dd8be4e23fdc879e88e094e83f18bd28327ae3c03fc1bfd06d0379ec3fe8d7ee7e066ee03f750c4e0f4802e33fca3e4d0e34d3da3fe0578becde30c43f6044d9ad900ed23f08a2562899a3d13f5a83cf6694f3e33f001c61debd5f513fa009953fde2c9a3f29d53b02ca65e53fda066b4421a8ea3f58f074484a08cc3fe239b4b7eb57e03f1f904fe586bde43f9ce6edd599d1d13f43878f622d7ee23fd3ebab4e7904e93f7c3437ad0e16d23fac5e5e9e08a9c83f2b7b2d56db34
e73f74f8cd68effeed3f4c279a9d4210c53ffafad9b31886d33f4c3eb4acc9b0dc3f6ed2f82f486edc3fc349273cbe1fec3fe2f70e89b061d83facaa25cb8fdbcd3fb0659c127fb7e83f00a224076b6da43f9ab1eb331dfade3fc86e03757e3bec3f3d00c8545ccce93f90fac6a4cc21b93f08f57560a68bc63fd8cccbabcd13b03fc679c7f9ece6df3f4a8c78aa1a1aed3ffecac18174dbe43fdfe102cffb48e93f0078f7fa27cc463fb40acdaea46ee63f54f754df4daadf3f2a9e063d0ab3da3f82a21b50d3c6d33f1182e48aafb5ed3fb67f3de3b109d63f494258c18422e13f8a5542fc1491e63f43247cbeabece13feb9355572f68eb3f3cf415eee8f1d53f887df6aab75bb43f0042cd907780523ff5e724cad881e03fdb9de04e99ffe43fd6594feb9b75ec3f6d4e6fcf7690e13fabe634f015dee13f584563d26021c93f6f1916ee57c8e13fd8906bad6fa7cd3ff8fad5b03b02eb3f1b3b87c15f16e53f4014ec100f79c73f1aee1302d960d83f45be6b695ed9e13ffc86d1d311dbdb3f089e89e6389fb93f24d742e400cbd63fa048c53d8fbf9c3f6eb1db094d81ed3f8bbf0cba79fde63f70e8f3d63c43c33ff1c5e6fed947e43f64f3a21f062ee03f0d12c4282794e03fa0a3be998572ba3f16510b776d7aeb3fb8c7ca308d2acd3f6f37eb1eb330ef3f1ba1bdb6577fe73f78d805294a05b43f0ed0bea2f180db3f5a4cce890b57ea3f2472556ba6f1e43f1a79fcc20701e53fe2ae8a1ea5f7d73fe0bd1efc12caec3ff94b1e02a75bed3f78e098184e3fea3f46ff0b2344dedb3f1cdc0f7b72efdb3f6ceb0b772b37e43f47e49b2a7088ea3f'
有谁知道我怎么能(a)反序列化在my_converter中返回给我的字符串表示,所以我每次都返回一个Numpy数组,或者(b)强制PostgreSQL / psycopg2将缓冲区表示发送到转换器(我可以使用)而不是字符串表示?
谢谢!
我在OS X 10.6.8上使用Python 2.6.1(r261:67515),PostgreSQL 9.0.3和psycopg2 2.4(dt dec pq3 ext)
您在调试器中看到的格式很容易解析:它是PostgreSQL的十六进制二进制格式(http://www.postgresql.org/docs/9.1/static/datatype-binary.html)。psycopg可以解析该格式并返回包含数据的缓冲区;您可以用该缓冲区来得到数组。与其从头编写一个类型转换器(typecaster),不如写一个调用原始转换函数并对其结果做后处理的转换器。抱歉,我现在不记得它的名字了,我是用手机写的:您可以从邮件列表获得进一步的帮助。
编辑:完整解决方案。
默认的bytea类型转换器(可以解析postgres二进制表示并从中返回缓冲区对象的对象)是psycopg2.BINARY。我们可以用它来创建一个转换为数组的类型转换器:
In [1]: import psycopg2
In [2]: import numpy as np
In [3]: a = np.eye(3)
In [4]: a
Out[4]:
array([[ 1., 0., 0.],
[ 0., 1., 0.],
[ 0., 0., 1.]])
In [5]: cnn = psycopg2.connect('')
# The adapter: converts from python to postgres
# note: this only works on numpy version whose arrays
# support the buffer protocol,
# e.g. it works on 1.5.1 but not on 1.0.4 on my tests.
In [12]: def adapt_array(a):
....: return psycopg2.Binary(a)
....:
In [13]: psycopg2.extensions.register_adapter(np.ndarray, adapt_array)
# The typecaster: from postgres to python
In [21]: def typecast_array(data, cur):
....: if data is None: return None
....: buf = psycopg2.BINARY(data, cur)
....: return np.frombuffer(buf)
....:
In [24]: ARRAY = psycopg2.extensions.new_type(psycopg2.BINARY.values,
'ARRAY', typecast_array)
In [25]: psycopg2.extensions.register_type(ARRAY)
# Now it works "as expected"
In [26]: cur = cnn.cursor()
In [27]: cur.execute("select %s", (a,))
In [28]: cur.fetchone()[0]
Out[28]: array([ 1., 0., 0., 0., 1., 0., 0., 0., 1.])
如您所知,np.frombuffer(a)会丢失数组形状,因此您必须找到保留它的方法。
对于NumPy数组,可以避免缓冲区方案的所有缺点,例如丢失形状和数据类型。参照Stack Overflow上关于在sqlite3中存储NumPy数组的问题,可以很容易地把那种方法移植到postgres。
import io
import os

import numpy as np
import psycopg2 as psql
# converts from python to postgres
def _adapt_array(text):
    """Serialize an array with ``np.save`` and wrap it for a ``bytea`` column.

    Despite its name, *text* is the ``np.ndarray`` being adapted. The
    ``.npy`` format written by ``np.save`` keeps the shape and dtype.
    """
    out = io.BytesIO()
    np.save(out, text)
    return psql.Binary(out.getvalue())
# converts from postgres to python
def _typecast_array(value, cur):
if value is None:
return None
data = psql.BINARY(value, cur)
bdata = io.BytesIO(data)
bdata.seek(0)
return np.load(bdata)
con = psql.connect('')
# Outbound: every np.ndarray parameter is serialized by _adapt_array.
psql.extensions.register_adapter(np.ndarray, _adapt_array)
# Inbound: values of the BINARY (bytea) type are rebuilt by _typecast_array.
# The module is imported as ``psql``; the bare name ``sql`` is undefined here.
t_array = psql.extensions.new_type(psql.BINARY.values, "numpy", _typecast_array)
psql.extensions.register_type(t_array)
cur = con.cursor()
现在可以创建并填充表格(使用上一个回答中定义的a)
# "column" is a reserved word in PostgreSQL, so it has to be double-quoted
# to be usable as a column name.
cur.execute('create table test ("column" BYTEA)')
cur.execute("insert into test values(%s)", (a,))
并恢复numpy对象
# Read the stored row back and show its first column.
cur.execute("select * from test")
row = cur.fetchone()
row[0]
结果:
array([[ 1., 0., 0.],
[ 0., 1., 0.],
[ 0., 0., 1.]])
我尝试了这两个答案,但在把Daniel的代码改为使用np.savetxt和np.loadtxt,并把他的类型转换函数中的一行改成下面这样之前,我无法让它们工作:
bdata = BytesIO(data[1:-1])
所以这两个功能现在看起来像
def _adapt_array(arr):
    """Serialize *arr* as text with ``np.savetxt`` and wrap it for ``bytea``.

    Values are written with two decimal places (``fmt='%.2f'``), so any
    precision beyond that is not stored.
    """
    out = BytesIO()
    np.savetxt(out, arr, fmt='%.2f')
    return pg2.Binary(out.getvalue())
def _typecast_array(value, cur):
if value is None:
return None
data = pg2.BINARY(value, cur)
bdata = BytesIO(data[1:-1])
bdata.seek(0)
return np.loadtxt(bdata)
# Inbound: build a typecaster for the BINARY (bytea) type and register it.
t_array = pg2.extensions.new_type(pg2.BINARY.values, 'numpy', _typecast_array)
pg2.extensions.register_type(t_array)
# Outbound: np.ndarray parameters are serialized by _adapt_array.
pg2.extensions.register_adapter(np.ndarray, _adapt_array)
我之前得到的错误是 could not convert string to float: '[473.07'。我怀疑这个修复只适用于一维(扁平)数组,但我的数据正是这种结构,所以它对我有用。