如何检测DynamoDB2中未写入的项目?

3 投票
2 回答
1952 浏览
提问于 2025-04-17 20:57

我正在把我的代码从 dynamodb 迁移到 dynamodb2。这段代码是用来批量写入数据的,我现在遇到的一个主要问题是如何检测未处理的消息。我这段代码不断地从一个队列中获取消息,然后将它们批量插入到 DynamoDB 表中。结果发现,有一大部分的项目(大约20%)根本没有被写入表中,而我并没有收到任何错误提示。所以我想知道,怎么才能知道哪些项目没有被插入,以及如何重新处理它们?以下是我 dynamodb 代码的一部分:

def do_batch_write(items,conn,table,diagn):
    batch_list = conn.new_batch_write_list()
    batch_list.add_batch(table, puts=items)
    iTry = 0
    rems = []
    while True:
        iTry = iTry + 1
        try:
            response = conn.batch_write_item(batch_list)
        except Exception, e:
            tRetry = 5
            log.error("Error while attempting batch_write_item, try %d, retrying after %d secs: %s" % (iTry, tRetry, str(e)))
            time.sleep(tRetry)
            continue

        unprocessed = response.get('UnprocessedItems', None)
        if not unprocessed:
            if len(items) == 1 and diagn:
                log.info("Trivial batch processed")
            break
        batch_list = conn.new_batch_write_list()
        unprocessed_list = unprocessed[table.name]
        items = []
        for u in unprocessed_list:
            item_attr = u['PutRequest']['Item']
            item = table.new_item( attrs=item_attr)
            items.append(item)
        rems.append(len(items))
        batch_list.add_batch(table, puts=items)

    return iTry

这是我正在尝试修改的 dynamodb2 代码,目的是处理未处理或被放弃的项目。

with table.batch_write() as batch:
    while True:
         m = inq.read()
         mStr = json.dumps(m)
         pid = m['primaryId']
         sid = m['secondaryId']
         item_data = {"primaryId" : pid, "secondaryId"] : sid, "message"] : mStr}
         batch.put_item(data=item_data)

我查看了这个页面,但没有找到帮助。你能帮我想想怎么修改吗?谢谢!

更新:我仍然遇到缺失项目的问题。我对上面的代码块进行了如下修改:

i = 0
with table.batch_write() as batch:
    while True:
         m = inq.read()
         i = i + 1
         mStr = json.dumps(m)
         pid = m['primaryId']
         sid = m['secondaryId']
         item_data = {"primaryId" : pid, "secondaryId"] : sid, "message"] : mStr}
         batch.put_item(data=item_data)

         if i == 25:
             batch.resend_unprocessed()
             i = 0

不过,在仔细记录所有传入数据后(为了节省空间,上面的代码片段没有包含日志打印语句),我发现,至少在一个情况下,出现了以下情况:

  1. 大约20条连续获取的消息通过 put_item 被添加到批处理中
  2. 当调用 resend_unprocessed() 时,它报告没有未处理的项目
  3. 当我尝试从 DDB 表中检索这20条消息时,根本找不到它们

所以看起来我不能完全相信 boto 说它成功地将项目写入了表中。这看起来像是一个bug,还是说这是 dynamodb2 的某种“特性”?

还有一件我之前忘记提的事:我在同一个 AWS EC2 实例上并行运行了几个相同的“工作者”进程,它们都在读取同一个输入队列并写入同一个 Dynamo 表。我创建了多个进程是为了应对大量的传入数据。我原以为它们不应该争抢对表的访问,即使它们之间有某种冲突,也应该在 dynamodb 的内部解决。即使这导致某些项目被丢弃,也不应该在 resend_unprocessed() 中报告说所有项目都已成功处理。

2 个回答

1

上面的回答和dynamodb2没有关系。

我在使用resend_unprocessed(),而且它确实有效。

boto的日志:

2015-11-03 08:45:13,427 INFO: table.resend_unprocessed (1491): 1424-MainThread: Re-sending 11 unprocessed items.
2015-11-03 08:45:13,427 INFO: table.resend_unprocessed (1502): 1424-MainThread: Sending 11 items
2015-11-03 08:45:13,428 DEBUG: connection._mexe (910): 1424-MainThread: Method: POST
2015-11-03 08:45:13,428 DEBUG: connection._mexe (911): 1424-MainThread: Path: /
2015-11-03 08:45:13,428 DEBUG: connection._mexe (912): 1424-MainThread: Data: {"RequestItems": {"user_feed": [{"PutRequest": XXXXXXXXXXXXX }
2015-11-03 08:45:13,429 DEBUG: connection._mexe (913): 1424-MainThread: Headers: {'Host': 'dynamodb.us-east-1.amazonaws.com', 'Content-Type': 'application/x-amz-json-1.0', 'Content-Length': '1540', 'X-Amz-Target': 'DynamoDB_20120810.BatchWriteItem'}

2015-11-03 08:45:13,453 DEBUG: layer1._retry_handler (2746): 1424-MainThread: Saw HTTP status: 200
2015-11-03 08:45:13,453 DEBUG: layer1._retry_handler (2778): 1424-MainThread: Validating crc32 checksum for body: {"UnprocessedItems":{}}
2015-11-03 08:45:13,453 DEBUG: layer1.make_request (2733): 1424-MainThread: {"UnprocessedItems":{}}
2015-11-03 08:45:13,453 INFO: table.resend_unprocessed (1506): 1424-MainThread: 0 unprocessed items left

但是在http://boto.readthedocs.org/en/latest/ref/dynamodb2.html上写着:

“被表作为批量写入的上下文管理器使用。”

你可能不想直接使用这个对象。

你不应该使用resend_unprocessed。上下文管理器会在处理你的请求时自动处理这个问题。(看看你的boto日志)

像这样:

2015-11-03 08:43:43,551 INFO: table.handle_unprocessed (1483): 1424-MainThread: 23 items were unprocessed. Storing for later.

2015-11-03 08:43:43,551 INFO: table.resend_unprocessed (1506): 1424-MainThread: 51 unprocessed items left
2

看起来这是可能的。

批量写入可能无法写入“所有”项目。在这种情况下,API会成功,但未写入的项目会在响应中标记为“未处理项目”。你需要查看这些项目,并再次尝试写入它们。

发生这种情况的典型原因是你的表的吞吐量超过了限制(当然也可能有其他原因)。

这里附上相关的代码片段(感谢下面的链接):

while True:
    response = dynamodb_conn.batch_write_item(batch_list)
    unprocessed = response.get('UnprocessedItems', None)
    if not unprocessed:
        break
    batch_list = dynamodb_conn.new_batch_write_list()
    unprocessed_list = unprocessed[table_name]
    items = []
    for u in unprocessed_list:
        item_attr = u['PutRequest']['Item']
        item = dynamodb_table.new_item(
                attrs=item_attr
        )
        items.append(item)
    batch_list.add_batch(dynamodb_table, puts=items)

这些额外的阅读材料会告诉你更多细节——最后一个是Python代码。

  1. BatchWriteItem - 亚马逊DynamoDB
  2. 正确使用DynamoDB BatchWriteItem的方法
  3. 这个Python代码片段:https://gist.github.com/griggheo/2698152

撰写回答