googleappengine:如何使用任务队列进行此处理?

2024-09-29 22:00:05 发布

您现在位置:Python中文网/ 问答频道 /正文

我使用的是pythongaesdk。在

我需要对MyKind的6000多个实例进行一些处理。它太慢了,不能在一个请求中完成,所以我使用任务队列。如果我使一个任务进程只有一个实体,那么它应该只需要几秒钟。在

documentation表示一个“批处理”中只能添加100个任务。(这是什么意思?在一个请求中?在一个任务中?)在

所以,假设“批处理”意味着“请求”,我试图找出最好的方法是为数据存储中的每个实体创建一个任务。你怎么认为?在

如果我可以假设MyKind的顺序永远不会改变,那就更容易了。(处理过程实际上不会改变MyKind实例-它只会创建其他类型的新实例。)我可以创建一组任务,给每个任务一个起始位置的偏移量,间隔不到100。然后,每个任务可以创建执行实际处理的单个任务。在

但是,如果实体太多,原始请求无法添加所有必要的调度任务,该怎么办?这使我认为我需要一个递归的解决方案-每个任务都要查看它给定的范围。如果范围中只存在一个元素,则对其进行处理。否则,它将范围进一步细分为后续任务。在

如果我不能指望使用偏移量和限制来识别实体(因为它们的顺序不能保证是恒定的),也许我可以使用它们的键?但是我可以把1000把钥匙到处送,这看起来很不方便。在

我是走在正确的道路上,还是有其他的设计我应该考虑?在


Tags: 数据实例方法实体类型间隔队列进程
2条回答

另外,根据您的设计,您可以做我做的,即编号所有需要处理的记录。我处理大约3500个项目,每个项目需要3秒左右的时间来处理。为了避免重叠、超时并考虑将来的扩展,我的第一个任务是从数据库中获取所有此类唯一项的列表。然后,它将它分成500个每个项目标识符的列表,循环直到它占到我数据库中所有唯一的项目,并将500个标识符的每个块发布到第二层处理程序任务。第二个处理程序任务(当前是七个或八个不同的任务)都有一个包含500个项目的唯一列表,每个处理程序任务添加500个任务,每个任务对应一个唯一标识符。在

因为它都是通过循环来管理的,并且根据数据库中唯一项的数量进行计数,所以我可以添加任意数量的唯一项,并且任务的数量将扩展到完全没有重复的情况下容纳它们。我每天都用它来跟踪游戏中的价格,所以它都是用一个cron作业来启动的,根本不需要我的干预。在

当您运行类似taskqueue.add(url='/worker', params={'cursor': cursor})的代码时,您正在将一个任务排入队列;使用您提供的参数调度请求在带外执行。显然,你可以在一次手术中安排多达100次。在

不过,我觉得你不想。任务链接将使这一点变得简单得多:

您的辅助任务将执行以下操作:

  • 运行查询以获取一些记录以进行处理。如果任务参数中提供了游标,请使用它。将查询限制为10条记录,或者您认为可以在30秒内完成的任何记录。

  • 流程

  • 如果您的查询返回10条记录,请将另一个任务排队,并将查询中更新的游标传递给它,这样它就可以从您停止的地方获取信息。

  • 如果你的记录少于10张,你就完了。万岁!发一封电子邮件什么的然后辞职。

有了这个路线,你只需要启动第一个任务,其余的任务就可以自己添加了。在

请注意,如果一个任务失败,appengine将重试该任务,直到任务成功为止,因此您不必担心数据存储中断导致一个任务超时并中断链。在

编辑:

上述步骤并不保证一个实体只被处理一次。任务通常只运行一次,但Google建议您为幂等性进行设计。如果这是一个主要的问题,这里有一个方法来处理它:

  • 在每个要处理的实体上放置一个状态标志,或者创建一个补充实体来保存该标志。它应该具有类似于Pending、Processing和Processed的状态。

  • 获取要处理的新实体时,事务性地锁定并递增处理标志。仅在实体处于挂起状态时运行该实体。处理完成后,再次增加标志。

请注意,在开始之前,并不一定要向每个实体添加处理标志。“挂起”状态可能意味着属性或相应的实体尚不存在。在

相关问题 更多 >

    热门问题