Airflow GoogleCloudStorageToGoogleCloudStorageOperator error

Posted 2024-09-25 00:34:02


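For two weeks now I have been running into a strange problem in one of my DAGs. My use case is as follows: a colleague manually uploads a file to a GCS bucket. This triggers a Cloud Function, which in turn starts an Airflow DAG through the API. The first task of the DAG transfers the file from the "landing zone" to the "safe zone", and then the rest of the DAG runs.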
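A minimal sketch of the hand-off between the Cloud Function and the DAG, assuming a GCS-triggered background function that receives the uploaded object's `bucket` and `name` and forwards the name as `dag_run.conf['file_name']` (the key the operator template reads). The helper `build_dag_run_payload` and the `{"conf": {...}}` body shape are assumptions; the actual trigger call (URL, authentication) is not shown in the post and is left out.

```python
from typing import Any, Dict


def build_dag_run_payload(event: Dict[str, Any], landing_bucket: str) -> Dict[str, Any]:
    """Build the (assumed) JSON body used to trigger the DAG for one upload.

    Hypothetical helper: `event` is assumed to carry the uploaded object's
    "bucket" and "name", as a GCS-triggered background function receives.
    """
    bucket = event.get("bucket")
    # Only react to uploads in the landing bucket.
    if bucket != landing_bucket:
        raise ValueError(f"unexpected bucket: {bucket!r}")
    # "name" is the full object path inside the bucket (no gs:// prefix),
    # which is the form the DAG's source_object template expects.
    return {"conf": {"file_name": event["name"]}}


payload = build_dag_run_payload(
    {"bucket": "blablabla-landing", "name": "geometrie/Track_Geometry-20201005_032915.csv"},
    "blablabla-landing",
)
print(payload)
# {'conf': {'file_name': 'geometrie/Track_Geometry-20201005_032915.csv'}}
```

Passing the full object path (including the `geometrie/` prefix) matches what the task log shows as the source object.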

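I use GoogleCloudStorageToGoogleCloudStorageOperator to move the file from bucket A to bucket B. Until two or three weeks ago, everything worked fine. The oldest DAG is only six months old, and although we have changed a few things, those changes were in other parts of the DAG. We never touched this part, and it ran for a long time without any problems.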
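With `move_object=True` the operator copies the object and then deletes the source (the traceback in the log shows the copy going through `hook.rewrite`). The sketch below models that in memory; `Store`, `move_object` and the local `NotFound` class are hypothetical stand-ins, not the real client library. It shows that a move is not idempotent: repeating the same move fails on the source.

```python
from typing import Dict, Tuple


class NotFound(Exception):
    """Stand-in for google.api_core.exceptions.NotFound (HTTP 404)."""


# Hypothetical in-memory bucket store: (bucket, object name) -> contents.
Store = Dict[Tuple[str, str], bytes]


def move_object(store: Store, src_bucket: str, src: str, dst_bucket: str, dst: str) -> None:
    """Model of a move: copy the object, then delete the source."""
    key = (src_bucket, src)
    if key not in store:
        # The copy step fails on the *source*, like the 404 in the task log.
        raise NotFound(f"No such object: {src_bucket}/{src}")
    store[(dst_bucket, dst)] = store[key]  # copy (rewrite) step
    del store[key]  # delete step added by move_object=True


SOURCE = ("blablabla-landing", "geometrie/Track_Geometry-20201005_032915.csv")
DEST = ("blablabla-safe", "geometrie/original/track_geometry_20201005_032915.csv")
store: Store = {SOURCE: b"csv data"}

move_object(store, *SOURCE, *DEST)  # first attempt: succeeds
try:
    move_object(store, *SOURCE, *DEST)  # same move again: source is already gone
except NotFound as exc:
    print(exc)
```

In this model the second call prints `No such object: blablabla-landing/geometrie/Track_Geometry-20201005_032915.csv` while the file sits safely in the destination, which is the same combination of symptoms described in the next paragraph.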

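Now, most of the time, the first task (the transfer) fails. The file is moved correctly, but for an unknown reason, when I retry with exactly the same file, I get this error two or three times in a row... and then it works. I can't pin down what triggers the problem, and it is driving me crazy.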

--------------------------------------------------------------------------------
[2020-10-15 09:34:50,599] {taskinstance.py:867} INFO - Starting attempt 1 of 1
[2020-10-15 09:34:50,599] {taskinstance.py:868} INFO - 
--------------------------------------------------------------------------------
[2020-10-15 09:34:50,620] {taskinstance.py:887} INFO - Executing <Task(GoogleCloudStorageToGoogleCloudStorageOperator): transfer-landing-to-safe> on 2020-10-15T07:34:40+00:00
[2020-10-15 09:34:50,626] {standard_task_runner.py:53} INFO - Started process 31555 to run task
[2020-10-15 09:34:50,775] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: geometrie-preprocessing.transfer-landing-to-safe 2020-10-15T07:34:40+00:00 [running]> blablabla.internal
[2020-10-15 09:34:50,860] {gcs_to_gcs.py:193} INFO - Executing copy of gs://blablabla-landing/geometrie/Track_Geometry-20201005_032915.csv to gs://blablabla-safe/geometrie/original/track_geometry_20201005_032915.csv
[2020-10-15 09:34:50,861] {logging_mixin.py:112} INFO - [2020-10-15 09:34:50,860] {gcp_api_base_hook.py:146} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2020-10-15 09:34:50,980] {taskinstance.py:1128} ERROR - 404 POST https://storage.googleapis.com/storage/v1/b/blablabla-landing/o/geometrie%2FTrack_Geometry-20201005_032915.csv/rewriteTo/b/blablabla-safe/o/geometrie%2Foriginal%2Ftrack_geometry_20201005_032915.csv: No such object: blablabla-landing/geometrie/Track_Geometry-20201005_032915.csv
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/airflow/models/taskinstance.py", line 966, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/dist-packages/airflow/contrib/operators/gcs_to_gcs.py", line 178, in execute
    destination_object=self.destination_object)
  File "/usr/local/lib/python3.7/dist-packages/airflow/contrib/operators/gcs_to_gcs.py", line 196, in _copy_single_object
    self.destination_bucket, destination_object)
  File "/usr/local/lib/python3.7/dist-packages/airflow/contrib/hooks/gcs_hook.py", line 135, in rewrite
    source=source_object
  File "/usr/local/lib/python3.7/dist-packages/google/cloud/storage/blob.py", line 2098, in rewrite
    timeout=timeout,
  File "/usr/local/lib/python3.7/dist-packages/google/cloud/_http.py", line 423, in api_request
    raise exceptions.from_http_response(response)
google.api_core.exceptions.NotFound: 404 POST https://storage.googleapis.com/storage/v1/b/blablabla-landing/o/geometrie%2FTrack_Geometry-20201005_032915.csv/rewriteTo/b/blablabla-safe/o/geometrie%2Foriginal%2Ftrack_geometry_20201005_032915.csv: No such object: blablabla-landing/geometrie/Track_Geometry-20201005_032915.csv
[2020-10-15 09:34:50,984] {taskinstance.py:1185} INFO - Marking task as FAILED.dag_id=geometrie-preprocessing, task_id=transfer-landing-to-safe, execution_date=20201015T073440, start_date=20201015T073450, end_date=20201015T073450
[2020-10-15 09:35:00,556] {logging_mixin.py:112} INFO - [2020-10-15 09:35:00,556] {local_task_job.py:103} INFO - Task exited with return code 1
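The URL in the 404 is a Cloud Storage JSON API `objects.rewrite` request, with both object names percent-encoded into the path. The hypothetical helper `rewrite_url` below rebuilds it from the bucket and object names in the log with `urllib.parse.quote`; it makes explicit that the object reported as "No such object" is the source in the landing bucket, not the destination.

```python
from urllib.parse import quote

API_ROOT = "https://storage.googleapis.com/storage/v1"


def rewrite_url(src_bucket: str, src_object: str, dst_bucket: str, dst_object: str) -> str:
    """Rebuild the objects.rewrite request URL (hypothetical helper).

    Object names are single path segments here, so "/" must be
    percent-encoded as %2F; safe="" stops quote() from leaving "/" as-is.
    """
    return (
        f"{API_ROOT}/b/{src_bucket}/o/{quote(src_object, safe='')}"
        f"/rewriteTo/b/{dst_bucket}/o/{quote(dst_object, safe='')}"
    )


url = rewrite_url(
    "blablabla-landing",
    "geometrie/Track_Geometry-20201005_032915.csv",
    "blablabla-safe",
    "geometrie/original/track_geometry_20201005_032915.csv",
)
print(url)
```

The printed URL is identical to the one in the error line, so the names the operator sent are the ones intended; the request failed because the source object was not there at that moment.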

The operator definition:

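from airflow.contrib.operators.gcs_to_gcs import GoogleCloudStorageToGoogleCloudStorageOperator

# env_extension is defined elsewhere in the DAG file (empty in the run logged above).
transfer_landing_to_safe = GoogleCloudStorageToGoogleCloudStorageOperator(
    task_id=f"transfer-landing-to-safe{env_extension}",
    source_bucket=f"blablabla-landing{env_extension}",
    # Full object path sent by the Cloud Function, e.g. "geometrie/Track_Geometry-20201005_032915.csv"
    source_object="{{ dag_run.conf['file_name'] }}",
    destination_bucket=f"blablabla-safe{env_extension}",
    # Keep only the trailing "YYYYMMDD_HHMMSS.csv" part of the file name
    destination_object="geometrie/original/track_geometry_{{ dag_run.conf['file_name'][-19:] }}",
    # Copy the object, then delete the source
    move_object=True,
    google_cloud_storage_conn_id="gcp_conn",
)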
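The `destination_object` template keeps only the last 19 characters of `file_name`. A plain-Python mirror of that expression (hypothetical helper; Airflow itself renders the template with Jinja) shows the value it produces for the file in the log:

```python
def destination_object(file_name: str) -> str:
    """Plain-Python mirror of the destination_object template (hypothetical helper).

    Only the slice is reproduced: the last 19 characters are expected
    to be "YYYYMMDD_HHMMSS.csv".
    """
    return f"geometrie/original/track_geometry_{file_name[-19:]}"


print(destination_object("geometrie/Track_Geometry-20201005_032915.csv"))
# geometrie/original/track_geometry_20201005_032915.csv
```

The result matches the destination shown in the log, so the templated names render as expected for this file; the slice does assume every uploaded file follows the same timestamped naming.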
