BigQuery Python API: preserve null fields during table extract job


I have the following code:

from google.cloud import bigquery

# client, table, bucket_name and gcs_filename are defined elsewhere.
job_config = bigquery.ExtractJobConfig()
job_config.compression = bigquery.Compression.GZIP
job_config.destination_format = (
    bigquery.DestinationFormat.NEWLINE_DELIMITED_JSON)

destination_uri = 'gs://{}/{}'.format(bucket_name, gcs_filename)

extract_job = client.extract_table(
    table,
    destination_uri,
    job_config=job_config,
    location='US')  # API request
extract_job.result()  # Waits for job to complete.

(Note that I'm getting the table object elsewhere.)

This works, and dumps the requested table into GCS as newline-delimited JSON. However, some of the columns in the table are nullable, and some of them do contain null values. To keep all the data consistent, I'd like to preserve the nulls in the JSON output. Is there a way to do this without resorting to Avro?
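To illustrate what I mean (hypothetical column names, not from my actual table): given a row where a nullable column optional_col is null, the default NEWLINE_DELIMITED_JSON extract omits the field entirely,

{"id": 1, "name": "a"}

whereas I'd like the output to keep it explicitly:

{"id": 1, "name": "a", "optional_col": null}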

This post: Big Query table extract in JSON, preserve nulls? ... suggests actually querying the table. I don't believe that's an option for me, since the tables I'm extracting contain millions of rows each. One I'm looking at has nearly 100 million rows and over 25 GB of data. But I haven't found a way to set up the extract job so that it preserves nulls.


Tags: data, code, config, json, format, table, job, extract
2 Answers

This question has come up on SO before. I suggest you review this post, which includes both an explanation of your issue and a workaround.

There are some good answers there, for example this one from Mosha (a software engineer at Google):

This is standard behavior of NULL in SQL, and all SQL databases (Oracle, Microsoft SQL Server, PostgreSQL, MySQL etc) have exactly same behavior. If the IS NULL check is too tedious, alternative solution is to use IFNULL or COALESCE function to convert NULL into non-NULL, i.e.

select * from
(select NULL as some_nullable_col, "name1" as name),
(select 4 as some_nullable_col, "name2" as name),
(select 1 as some_nullable_col, "name3" as name),
(select 7 as some_nullable_col, "name4" as name),
(select 3 as some_nullable_col, "name5" as name)
WHERE ifnull(some_nullable_col,0) != 3
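(The quoted example is legacy SQL; a rough standard SQL equivalent, keeping the same column names, might look like this:)

SELECT *
FROM UNNEST([
  STRUCT(CAST(NULL AS INT64) AS some_nullable_col, 'name1' AS name),
  STRUCT(4 AS some_nullable_col, 'name2' AS name),
  STRUCT(1 AS some_nullable_col, 'name3' AS name),
  STRUCT(7 AS some_nullable_col, 'name4' AS name),
  STRUCT(3 AS some_nullable_col, 'name5' AS name)
])
WHERE IFNULL(some_nullable_col, 0) != 3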

I think the best approach is to use a query job first:

  1. Run a query job over the table you want to extract, writing the result to a temporary table (as shown below)
  2. Extract that table as CSV with no header

Here is code that does this:

import time

from google.cloud import bigquery

# client, dataset_id and bucket_name are assumed to be defined elsewhere.
job_config = bigquery.QueryJobConfig()
gcs_filename = 'file_with_nulls*.json.gzip'  # wildcard shards large exports

# Step 1: materialize TO_JSON_STRING output into a temporary table.
table_ref = client.dataset(dataset_id).table('my_null_table')
job_config.destination = table_ref
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

# Start the query, passing in the extra configuration.
query_job = client.query(
    """#standardSQL
    SELECT TO_JSON_STRING(t) AS json FROM `project.dataset.table` AS t;""",
    location='US',
    job_config=job_config)

# Wait until the destination table has been written
# (query_job.result() would also block until completion).
while not query_job.done():
    time.sleep(1)
print("query completed")

# Step 2: extract the single JSON column as headerless CSV.
job_config = bigquery.ExtractJobConfig()
job_config.compression = bigquery.Compression.GZIP
job_config.destination_format = bigquery.DestinationFormat.CSV
job_config.print_header = False

destination_uri = 'gs://{}/{}'.format(bucket_name, gcs_filename)

extract_job = client.extract_table(
    table_ref,
    destination_uri,
    job_config=job_config,
    location='US')  # API request
extract_job.result()  # Waits for job to complete.
print("extract completed")

Once everything is done, you can delete the temporary table created in step 1. If you clean up quickly the cost is negligible: storage is $20 per TB per month, so even a full TB kept for one hour costs 20/30/24 ≈ 3 cents, and 25 GB for an hour is a small fraction of that.
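For example, reusing client and table_ref from the code above (a minimal sketch):

# Drop the temporary table created in step 1.
client.delete_table(table_ref)
print("temporary table deleted")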
