气流DAG如何先检查BQ(必要时删除)然后运行数据流作业?

2024-07-03 07:01:22 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在使用cloudcomposer为到达GCS并转到BigQuery的文件编排ETL。我有一个cloud函数,当文件到达时触发dag,cloud函数将文件名/位置传递给dag。在我的DAG中,我有两个任务:

1)使用DataflowPythonOperator运行一个数据流作业,该作业从GCS中的文本中读取数据并将其转换并输入到BQ中,2)根据作业是失败还是成功,将文件移到失败/成功存储桶中。 每个文件都有一个文件ID,它是bigquery表中的一个列。有时一个文件会被编辑一到两次(它不是一个流媒体的东西,它经常),我希望能够删除现有的记录,该文件首先。在

我查看了其他气流操作符,但希望在运行数据流作业之前在DAG中有两个任务:

  1. 根据文件名获取文件id(现在我有一个bigquery表映射文件名->文件id,但我也可以引入一个json作为映射,我想这更容易些)
  2. 如果文件ID已经存在于bigquery表(从数据流作业输出转换数据的表)中,删除它,然后运行dataflow作业,这样我就有了最新的信息。我知道有一个选择是只添加一个时间戳,只使用最新的记录,但因为每个文件可能有100万条记录,而且不像我每天要删除100个文件(可能是1-2个顶部),这看起来可能会很混乱和混乱。在

在数据流作业之后,理想情况下,在将文件移动到成功/失败文件夹之前,我想附加一些“记录”表,说明这个游戏是在这个时候输入的。这将是我查看所有插入的方式。 我尝试过寻找不同的方法来实现这一点,我对cloudcomposer还是个新手,所以我不清楚在经过10多个小时的研究之后这是如何工作的,否则我会发布代码以供输入。在

谢谢,我非常感谢大家的帮助,如果没有你想要的那么清楚,我很抱歉,关于气流的文档非常强大,但是考虑到cloud composer和bigquery相对较新,很难彻底地学习如何完成一些GCP特定的任务。在


Tags: 文件函数idcloud文件名作业记录bigquery
1条回答
网友
1楼 · 发布于 2024-07-03 07:01:22

听起来有点复杂。很高兴,几乎每个GCP服务都有运营商。另一件事是何时触发DAG执行。你知道了吗?每次有新文件进入GCS存储桶时,都会触发一个Google云函数来运行。在

  1. 触发你的狗

要触发DAG,您需要使用依赖于Object Finalize或{a2}触发器的Google云函数来调用它。在

  1. 正在将数据加载到BigQuery

如果您的文件已经在GCS中,并且是JSON或CSV格式,那么使用数据流作业就太过分了。您可以使用GoogleCloudStorageToBigQueryOperator将文件加载到BQ。在

  1. 跟踪文件ID

计算文件ID的最佳方法可能是使用flow中的Bash或Python操作符。你能直接从文件名中得到它吗?在

如果是这样,那么可以在GoogleCloudStorageObjectSensor的上游有一个Python操作符来检查文件是否在成功的目录中。在

如果是,那么可以使用BigQueryOperator对BQ运行删除查询。在

之后,运行GoogleCloudStorageToBigQueryOperator。在

  1. 四处移动文件

如果您要将文件从GCS移动到GCS位置,那么GoogleCloudStorageToGoogleCloudStorageOperator应该可以完成您需要的操作。如果BQ load操作符失败,那么移到failed files位置,如果成功,则移到successful jobs位置。在

  1. 记录任务日志

也许您只需要将任务信息记录到GCS中就可以了。退房how to log task information to GCS

这有帮助吗?在

相关问题 更多 >