使用数据流模板读取大查询表

2024-10-04 05:30:54 发布

您现在位置:Python中文网/ 问答频道 /正文

我想使用Python和Dataflow在BigQuery中读取一个表。我事先不知道那张桌子的名字。我使用模板传递表名,如下所示:

.
.
.
from apache_beam.options.pipeline_options import PipelineOptions


class DataflowOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--table_name',
            help='Name of table on BigQuery')


def run(argv=None):
    pipeline_options = PipelineOptions()
    dataflow_options = pipeline_options.view_as(DataflowOptions)

    with beam.Pipeline(options=pipeline_options) as pipeline:
        table_spec = bigquery.TableReference(
            projectId='MyProyectId',
            datasetId='MyDataset',
            tableId=str(dataflow_options.table_name))

        p = (pipeline | 'Read Table' >> beam.io.Read(beam.io.BigQuerySource(table_spec)))


if __name__ == '__main__':
    run()

但当我启动作业时,会出现以下错误:

Workflow failed. Causes: S01:Read Table+Batch Users/ParDo(_GlobalWindowsBatchingDoFn)+Hash Users+Upload to Ads failed., BigQuery getting table "RuntimeValueProvider(option: table_name, type: str, default_value: None)" from dataset "MyDataset" in project "MyProject" failed., BigQuery execution failed., Error:
 Message: Invalid table ID "RuntimeValueProvider(option: table_name, type: str, default_value: None)".
 HTTP Code: 400

我读过this answer,但到目前为止,还没有2017年的消息吗?你知道吗


Tags: namefromnonereadpipelinevaluedeftable
1条回答
网友
1楼 · 发布于 2024-10-04 05:30:54

根据前面提到的文档hereTableReference采用以下参数(dataset_ref, table_id)。从您的代码片段来看,大括号的位置似乎不正确。你知道吗

with beam.Pipeline(options=pipeline_options) as pipeline:
        dataset_ref = bigquery.DatasetReference('my-project-id', 'some_dataset')
        table_spec = bigquery.TableReference(dataset_ref,
            tableId=str(dataflow_options.table_name)

相关问题 更多 >