数据流BigQuery插入作业在大数据时立即失败

2024-10-02 20:43:00 发布

您现在位置:Python中文网/ 问答频道 /正文

我使用beam python库设计了一个beam/dataflow管道。管道大致执行以下操作:

  1. ParDo:从API收集JSON数据
  2. ParDo:转换JSON数据
  3. 转换到写入数据的表:bigo

一般来说,代码做它应该做的事情。但是,当从API收集大数据集(大约500.000个JSON文件)时,在使用DataflowRunner(它与在我的计算机上执行的DirectRunner一起工作)时,bigquery插入作业在启动后立即停止(=在一秒钟内)。当使用较小的数据集时,一切正常。在

数据流日志如下:

2019-04-22 (00:41:29) Executing BigQuery import job "dataflow_job_14675275193414385105". You can check its status with the...
Executing BigQuery import job "dataflow_job_14675275193414385105". You can check its status with the bq tool: "bq show -j --project_id=X dataflow_job_14675275193414385105". 
2019-04-22 (00:41:29) Workflow failed. Causes: S01:Create Dummy Element/Read+Call API+Transform JSON+Write to Bigquery /Wr...
Workflow failed. Causes: S01:Create Dummy Element/Read+Call API+Transform JSON+Write to Bigquery /WriteToBigQuery/NativeWrite failed., A work item was attempted 4 times without success. Each time the worker eventually lost contact with the service. The work item was attempted on: 
beamapp-X-04212005-04211305-sf4k-harness-lqjg,
beamapp-X-04212005-04211305-sf4k-harness-lgg2,
beamapp-X-04212005-04211305-sf4k-harness-qn55,
beamapp-X-04212005-04211305-sf4k-harness-hcsn

按照建议使用bqcli工具来获取有关bq加载作业的更多信息是行不通的。找不到这个工作(我怀疑它是因为瞬间的失败而创建的)。在

我想我遇到了某种配额/bq限制,甚至内存不足的问题(请参见:https://beam.apache.org/documentation/io/built-in/google-bigquery/

Limitations BigQueryIO currently has the following limitations.

You can’t sequence the completion of a BigQuery write with other steps of >your pipeline.

If you are using the Beam SDK for Python, you might have import size quota >issues if you write a very large dataset. As a workaround, you can partition >the dataset (for example, using Beam’s Partition transform) and write to >multiple BigQuery tables. The Beam SDK for Java does not have this >limitation as it partitions your dataset for you.

如果能给我一些关于如何缩小这个问题根源的建议,我将不胜感激。在

我还想尝试一个分区Fn,但是没有找到任何python源代码示例如何将分区的pcollection写入BigQuery表。在


Tags: the数据youapijsonforwithjob
2条回答

据我所知,在云数据流和apachebeam的pythonsdk中没有诊断OOM的选项(javasdk有可能)。我建议您在Cloud Dataflow issue tracker中打开feature request,以获取此类问题的更多详细信息。在

除了检查数据流作业日志文件之外,我建议您使用提供每个作业的资源使用情况的Stackdriver Monitoring tool来监视管道(如Total memory usage time)。在

关于Python SDK中分区函数的使用,以下代码(基于Apache Beam的documentation)将数据分成3个BigQuery加载作业:

def partition_fn(input_data, num_partitions):
      return int(get_percentile(lines) * num_partitions / 100)

    partition = input_data | beam.Partition(partition_fn, 3)

    for x in range(3):
      partition[x] | 'WritePartition %s' % x >> beam.io.WriteToBigQuery(
        table_spec,
        schema=table_schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)

可能有助于调试的一件事是查看Stackdriver日志。在

如果您在Google console中打开数据流作业,并单击图形面板右上角的LOGS,那么应该会打开底部的logs面板。LOGS面板的右上角有一个指向Stackdriver的链接。这将为您提供有关此特定工作的工人/洗牌/等的大量日志信息。在

其中有很多内容,很难筛选出相关的内容,但希望您能够找到比A work item was attempted 4 times without success更有用的东西。例如,每个worker偶尔会记录它正在使用的内存量,可以将其与每个worker的内存量(基于机器类型)进行比较,以确定它们是否确实内存不足,或者您的错误是否发生在其他地方。在

祝你好运!在

相关问题 更多 >