我正在使用cloudcomposer为到达GCS并转到BigQuery的文件编排ETL。我有一个cloud函数,当文件到达时触发dag,cloud函数将文件名/位置传递给dag。在我的DAG中,我有两个任务:
1)使用DataflowPythonOperator
运行一个数据流作业,该作业从GCS中的文本中读取数据并将其转换并输入到BQ中,2)根据作业是失败还是成功,将文件移到失败/成功存储桶中。
每个文件都有一个文件ID,它是bigquery表中的一个列。有时一个文件会被编辑一到两次(它不是一个流媒体的东西,它经常),我希望能够删除现有的记录,该文件首先。在
我查看了其他气流操作符,但希望在运行数据流作业之前在DAG中有两个任务:
在数据流作业之后,理想情况下,在将文件移动到成功/失败文件夹之前,我想附加一些“记录”表,说明这个游戏是在这个时候输入的。这将是我查看所有插入的方式。 我尝试过寻找不同的方法来实现这一点,我对cloudcomposer还是个新手,所以我不清楚在经过10多个小时的研究之后这是如何工作的,否则我会发布代码以供输入。在
谢谢,我非常感谢大家的帮助,如果没有你想要的那么清楚,我很抱歉,关于气流的文档非常强大,但是考虑到cloud composer和bigquery相对较新,很难彻底地学习如何完成一些GCP特定的任务。在
听起来有点复杂。很高兴,几乎每个GCP服务都有运营商。另一件事是何时触发DAG执行。你知道了吗?每次有新文件进入GCS存储桶时,都会触发一个Google云函数来运行。在
要触发DAG,您需要使用依赖于Object Finalize或{a2}触发器的Google云函数来调用它。在
如果您的文件已经在GCS中,并且是JSON或CSV格式,那么使用数据流作业就太过分了。您可以使用GoogleCloudStorageToBigQueryOperator将文件加载到BQ。在
计算文件ID的最佳方法可能是使用flow中的Bash或Python操作符。你能直接从文件名中得到它吗?在
如果是这样,那么可以在GoogleCloudStorageObjectSensor的上游有一个Python操作符来检查文件是否在成功的目录中。在
如果是,那么可以使用BigQueryOperator对BQ运行删除查询。在
之后,运行GoogleCloudStorageToBigQueryOperator。在
如果您要将文件从GCS移动到GCS位置,那么GoogleCloudStorageToGoogleCloudStorageOperator应该可以完成您需要的操作。如果BQ load操作符失败,那么移到failed files位置,如果成功,则移到successful jobs位置。在
也许您只需要将任务信息记录到GCS中就可以了。退房how to log task information to GCS
这有帮助吗?在
相关问题 更多 >
编程相关推荐