The following code batch-writes items to DynamoDB:
with table.batch_writer() as batch:
    for item in chunk:
        dynamodb_item = {
            'itemId': item['key'],
            'time': item['time'],
            'value': item['value']
        }
        batch.put_item(Item=dynamodb_item)
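As the documentation below describes, if a batch call fails, it returns the unprocessed items: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.batch_write_item
In boto3, how do I get the unprocessed items from the response?
How can I tell whether everything was processed successfully, or whether the call's response contains unprocessed items?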
Use batch_write_item, which you mentioned, rather than put_item.
Check this example:
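# Assumes an async DynamoDB client (for example from aioboto3) and that
# this code runs inside a coroutine with asyncio imported.
response = await client.batch_write_item(
    RequestItems=request_items
)
if len(response['UnprocessedItems']) == 0:
    print('Wrote 25 items to dynamo')
else:
    await asyncio.sleep(5)
    unprocessed_items = response['UnprocessedItems']
    # proceed with unprocessed_items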
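For completeness, here is one way the "proceed with unprocessed_items" step might look with the regular synchronous boto3 client. This is only a sketch: `batch_write_with_retry`, `max_attempts`, `base_delay` and `sleep` are names made up for this example, not part of boto3, and `request_items` is assumed to already be in the low-level format that `batch_write_item` expects.

```python
import time


def batch_write_with_retry(client, request_items, max_attempts=5,
                           base_delay=0.1, sleep=time.sleep):
    """Resend UnprocessedItems until none are left or attempts run out.

    Returns whatever is still unprocessed; an empty dict means every
    item was accepted.
    """
    pending = request_items
    for attempt in range(max_attempts):
        if attempt:
            # Exponential backoff before each retry: 0.1s, 0.2s, 0.4s, ...
            sleep(base_delay * 2 ** (attempt - 1))
        response = client.batch_write_item(RequestItems=pending)
        # UnprocessedItems has the same shape as RequestItems, so it can
        # be passed straight back in on the next attempt.
        pending = response.get('UnprocessedItems') or {}
        if not pending:
            break
    return pending
```

An empty return value answers the "was everything processed?" part of the question; a non-empty one holds exactly the requests that still need to be written.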
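I came across this question while looking for similar information myself, and I thought I would add some extra information for future readers.
batch_writer does not give you a response containing unprocessed items the way batch_write_item does.
However, its documentation states: "In addition, the batch writer will also automatically handle any unprocessed items and resend them as needed."
I looked at the source code for batch_writer in boto3 on GitHub. Behind the scenes, batch_writer takes the UnprocessedItems from the response to its batch_write_item call and adds them back to its own buffer so they are retried. The relevant part of the source code: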
def _flush(self):
    items_to_send = self._items_buffer[: self._flush_amount]
    self._items_buffer = self._items_buffer[self._flush_amount :]
    response = self._client.batch_write_item(
        RequestItems={self._table_name: items_to_send}
    )
    unprocessed_items = response['UnprocessedItems']
    if not unprocessed_items:
        unprocessed_items = {}
    item_list = unprocessed_items.get(self._table_name, [])
    # Any unprocessed_items are immediately added to the
    # next batch we send.
    self._items_buffer.extend(item_list)
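To make the effect of that re-buffering easier to see, here is a small standalone imitation that runs without AWS. Everything in it is hypothetical: `StubClient` and `MiniBatchWriter` are not boto3 classes, the stub simply rejects the last item of its first call, and the sketch assumes that leaving the `with` block keeps flushing until the buffer is empty.

```python
class StubClient:
    """Hypothetical stand-in for a DynamoDB client (not boto3)."""

    def __init__(self):
        self.calls = 0
        self.written = []

    def batch_write_item(self, RequestItems):
        self.calls += 1
        (table, requests), = RequestItems.items()
        if self.calls == 1 and len(requests) > 1:
            # Pretend the last request of the first call was throttled.
            accepted, rejected = requests[:-1], requests[-1:]
        else:
            accepted, rejected = requests, []
        self.written.extend(accepted)
        return {'UnprocessedItems': {table: rejected} if rejected else {}}


class MiniBatchWriter:
    """Simplified imitation of the buffering logic quoted above."""

    def __init__(self, client, table_name, flush_amount=25):
        self._client = client
        self._table_name = table_name
        self._flush_amount = flush_amount
        self._items_buffer = []

    def put_item(self, Item):
        self._items_buffer.append({'PutRequest': {'Item': Item}})
        if len(self._items_buffer) >= self._flush_amount:
            self._flush()

    def _flush(self):
        items_to_send = self._items_buffer[:self._flush_amount]
        self._items_buffer = self._items_buffer[self._flush_amount:]
        response = self._client.batch_write_item(
            RequestItems={self._table_name: items_to_send}
        )
        unprocessed = response.get('UnprocessedItems') or {}
        # Rejected requests go back into the buffer for the next flush.
        self._items_buffer.extend(unprocessed.get(self._table_name, []))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Assumption: keep flushing until nothing is left in the buffer.
        while self._items_buffer:
            self._flush()


client = StubClient()
with MiniBatchWriter(client, 'my-table', flush_amount=3) as batch:
    for i in range(5):
        batch.put_item(Item={'itemId': i})

print(client.calls, len(client.written))
```

In this run the rejected item is picked up by the second flush, so the caller never sees it. That is why batch_writer has no unprocessed-items response to inspect.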