气流mysql到gcp Dag

2024-09-30 01:34:37 发布

您现在位置:Python中文网/ 问答频道 /正文

我最近开始研究气流。我正在研究DAG:

  1. 查询MySQL数据库
  2. 提取查询并将其作为JSON文件存储在云存储桶中
  3. 将存储的JSON文件上载到BigQuery

Dag导入三个运算符:MySqlOperatorMySqlToGoogleCloudStorageOperator和{}

我使用的是Airflow 1.8.0、Python3和Pandas 0.19.0。在

这是我的Dag代码:

sql2gcp_csv = MySqlToGoogleCloudStorageOperator(

    task_id='sql2gcp_csv',
    sql='airflow_gcp/aws_sql_extract_7days.sql',
    bucket='gs://{{var.value.gcs_bucket}}/{{ ds_nodash }}/',
    filename='{{ ds_nodash }}-account-*.json',
    schema_filename='support/file.json',
    approx_max_file_size_bytes=1900000000,
    mysql_conn_id='aws_mysql',
    google_cloud_storage_conn_id='airflow_gcp',

)

但是,当我运行它时,我收到以下错误:

^{pr2}$

有人知道为什么抛出这个异常吗?在


Tags: 文件csvawsidjsonsqlbucketds
1条回答
网友
1楼 · 发布于 2024-09-30 01:34:37

根据您的回溯,您的代码在this point处中断。如您所见,它处理代码:

json.dump(row_dict, tmp_file_handle)

tmp_file_handle是一个带有默认输入参数的NamedTemporaryFileinitialized,也就是说,它模拟用w+b模式打开的文件(因此只接受类似字节的数据作为输入)。在

问题是在Python2中所有的字符串都是字节,而在Python3中,字符串是文本(默认编码为utf-8)。在

如果打开Python 2并运行以下代码:

^{pr2}$

它工作得很好。在

但如果打开Python 3并运行相同的代码:

In [54]: from tempfile import NamedTemporaryFile
In [55]: tmp_f = NamedTemporaryFile(delete=True)
In [56]: import json
In [57]: json.dump({'1': 1}, tmp_f)
                                     -
TypeError                                 Traceback (most recent call last)
<ipython-input-57-81743b9013c4> in <module>()
  > 1 json.dump({'1': 1}, tmp_f)

/usr/local/lib/python3.6/json/__init__.py in dump(obj, fp, skipkeys, ensure_ascii, check_circular, allow_nan, cls, indent, separators, default, sort_keys, **kw)
    178     # a debuggability cost
    179     for chunk in iterable:
 > 180         fp.write(chunk)
    181 
    182 

/usr/local/lib/python3.6/tempfile.py in func_wrapper(*args, **kwargs)
    481             @_functools.wraps(func)
    482             def func_wrapper(*args, **kwargs):
 > 483                 return func(*args, **kwargs)
    484             # Avoid closing the file as long as the wrapper is alive,
    485             # see issue #18879.

TypeError: a bytes-like object is required, not 'str'

我们的错误和你的一样。在

这意味着python3仍然不完全支持flow(正如您在test coverage中看到的那样,airflow/contrib/operators/mysql_to_gcs.py模块尚未在python2或python3中进行测试)。确认这一点的一种方法是使用python2运行代码,看看它是否有效。在

我建议在their JIRA上创建一个问题,请求两个版本的Python的可移植性。在

相关问题 更多 >

    热门问题