如何检测dynamodb2中的未写入项?

2024-09-30 02:28:50 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在将代码从dynamodb迁移到dynamodb2。代码执行批处理写入,我遇到的一个主要问题是检测未处理的消息。我的代码不断地从队列接收消息,然后成批地将它们插入DynamoDB表中。似乎有很大一部分(约20%)的条目从未进入表中,而我却没有收到任何关于它的错误消息。所以我的问题是,当项目没有插入时,如何捕捉,以及如何重新处理它们?下面是我的dynamodb代码块,它可以做到这一点:

def do_batch_write(items,conn,table,diagn):
    batch_list = conn.new_batch_write_list()
    batch_list.add_batch(table, puts=items)
    iTry = 0
    rems = []
    while True:
        iTry = iTry + 1
        try:
            response = conn.batch_write_item(batch_list)
        except Exception, e:
            tRetry = 5
            log.error("Error while attempting batch_write_item, try %d, retrying after %d secs: %s" % (iTry, tRetry, str(e)))
            time.sleep(tRetry)
            continue

        unprocessed = response.get('UnprocessedItems', None)
        if not unprocessed:
            if len(items) == 1 and diagn:
                log.info("Trivial batch processed")
            break
        batch_list = conn.new_batch_write_list()
        unprocessed_list = unprocessed[table.name]
        items = []
        for u in unprocessed_list:
            item_attr = u['PutRequest']['Item']
            item = table.new_item( attrs=item_attr)
            items.append(item)
        rems.append(len(items))
        batch_list.add_batch(table, puts=items)

    return iTry

这里有一个dynamodb2代码块,我正试图修改它来处理未处理/放弃的项。在

^{pr2}$

我看了看this page,但没有找到帮助。你能帮我弄清楚怎么修改吗?泰铢

更新:我仍然有缺少项目的问题。我修改了上述区块如下:

i = 0
with table.batch_write() as batch:
    while True:
         m = inq.read()
         i = i + 1
         mStr = json.dumps(m)
         pid = m['primaryId']
         sid = m['secondaryId']
         item_data = {"primaryId" : pid, "secondaryId"] : sid, "message"] : mStr}
         batch.put_item(data=item_data)

         if i == 25:
             batch.resend_unprocessed()
             i = 0

然而,我在仔细记录所有传入数据后发现(为了节省空间,上面的代码片段中没有包含log print语句),至少在一种情况下,我看到了以下情况:

  1. put_item向批处理中添加一组连续摄取的大约20条消息
  2. 调用resend_unprocessed()时,它报告0个未处理的项目
  3. 当我试图从DDB表中检索20条消息中的任何一条时,都找不到它们

因此,当boto成功地将项写入表中时,我似乎不能真正信任它。看起来像个bug,或者这是dynamodb2的某种“特性”?在

有一件事我之前忘了提:我有几个相同的“worker”进程在同一个awsec2实例上并行运行,从同一个输入队列读取数据并写入同一个Dynamo表。我创建了其中几个,以跟上传入数据量。我的印象是,他们不应该为访问表而斗争,即使他们之间存在某种冲突,也必须在“幕后”解决。即使这会导致某些项目以某种方式被删除,它也不应该在resend_unprocessed()中报告所有事情都已成功处理。在


Tags: 项目代码消息newbatchtableitemsconn
2条回答

看来这是可能的。在

批处理写入可能无法写入“所有”项。在本例中,API成功,但未写入的项在响应中被指示为“UnprocessedItems”。您需要对此进行调查,然后重试这些项目。在

发生这种情况的典型原因是表吞吐量超过了(可能还有其他原因)。在

添加相关代码片段(感谢以下要点):

while True:
    response = dynamodb_conn.batch_write_item(batch_list)
    unprocessed = response.get('UnprocessedItems', None)
    if not unprocessed:
        break
    batch_list = dynamodb_conn.new_batch_write_list()
    unprocessed_list = unprocessed[table_name]
    items = []
    for u in unprocessed_list:
        item_attr = u['PutRequest']['Item']
        item = dynamodb_table.new_item(
                attrs=item_attr
        )
        items.append(item)
    batch_list.add_batch(dynamodb_table, puts=items)

这些额外的阅读资料将告诉你细节——最后一个也是python代码。在

  1. BatchWriteItem - Amazon DynamoDB
  2. The correct way of using DynamoDB BatchWriteItem with boto
  3. Python的要点:https://gist.github.com/griggheo/2698152

以上答案与发电机B2无关。在

我使用的是resend_unprocessed(),它实际上起作用了。在

boto日志:

2015-11-03 08:45:13,427 INFO: table.resend_unprocessed (1491): 1424-MainThread: Re-sending 11 unprocessed items.
2015-11-03 08:45:13,427 INFO: table.resend_unprocessed (1502): 1424-MainThread: Sending 11 items
2015-11-03 08:45:13,428 DEBUG: connection._mexe (910): 1424-MainThread: Method: POST
2015-11-03 08:45:13,428 DEBUG: connection._mexe (911): 1424-MainThread: Path: /
2015-11-03 08:45:13,428 DEBUG: connection._mexe (912): 1424-MainThread: Data: {"RequestItems": {"user_feed": [{"PutRequest": XXXXXXXXXXXXX }
2015-11-03 08:45:13,429 DEBUG: connection._mexe (913): 1424-MainThread: Headers: {'Host': 'dynamodb.us-east-1.amazonaws.com', 'Content-Type': 'application/x-amz-json-1.0', 'Content-Length': '1540', 'X-Amz-Target': 'DynamoDB_20120810.BatchWriteItem'}

2015-11-03 08:45:13,453 DEBUG: layer1._retry_handler (2746): 1424-MainThread: Saw HTTP status: 200
2015-11-03 08:45:13,453 DEBUG: layer1._retry_handler (2778): 1424-MainThread: Validating crc32 checksum for body: {"UnprocessedItems":{}}
2015-11-03 08:45:13,453 DEBUG: layer1.make_request (2733): 1424-MainThread: {"UnprocessedItems":{}}
2015-11-03 08:45:13,453 INFO: table.resend_unprocessed (1506): 1424-MainThread: 0 unprocessed items left

但正如在http://boto.readthedocs.org/en/latest/ref/dynamodb2.html所写

“由表用作批处理写入的上下文管理器。在

您可能不想直接使用此对象。”

不应使用未处理的重发。上下文管理器在处理您的请求时执行此操作。(看看你的博图日志)

比如:

^{pr2}$

相关问题 更多 >

    热门问题