情况非常简单。我有3项任务需要执行:
独立地执行这些任务是完美的。如果我通过CLI手动启动Flume代理(源HTTP、接收器HDFS),然后发送一个curl命令,那么代理会将收到的消息转储到HDFS中
正如我所说的,通过CLI手动执行每个操作是无缝的
然而,通过气流DAG安排该过程是一个挑战。我试过用几种方法构建我的DAG,但都没有用。主要问题是flume任务(BashOperator)一直处于运行状态,而且从未结束。这是有道理的。不应该
但是DAG永远不会继续到下一个节点(睡眠30秒->;发送curl命令)
这样,分支A跳过flume_任务,分支B成功执行sleep_任务,失败执行http_任务。失败的http_任务有意义,没有代理正在运行
提供最后一个示例的代码,我认为它具有最合理的DAG构造。我希望有人能对此有所了解。 我在网上阅读了大量的参考资料,我知道Flume是event-driven,但我无法理解我的气流脚本到底做错了什么
如果有人能给我指出正确的方向,我将不胜感激
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from datetime import datetime
from airflow.models import TaskInstance
# Defaults
default_args = {
'start_date': datetime(2019, 11, 11),
'depends_on_past':False
}
# DAGs
dag = DAG('HTTP_2_HDFS', default_args=default_args, schedule_interval='59 * * * *')
# Commands
flume_command = "flume-ng agent --name myAgent --conf conf --conf-file /home/hadoop/flume/conf/http.conf "
sleep_command = "sleep 30 "
http_command = "/home/hadoop/flume/hdfs_test/HTTP_2_HDFS.sh "
# Tasks
def check_status(**kwargs):
flume_task_instance = TaskInstance(flume_task, datetime(2019, 11, 11))
state = flume_task_instance.current_state()
if state == "running":
print("FLUME PROCESS RUNNING !!!")
flume_task = BashOperator(
task_id='FLUME',
bash_command=flume_command,
dag=dag
)
sleep_task = BashOperator(
task_id='SLEEP',
bash_command=sleep_command,
dag=dag
)
http_task = BashOperator(
task_id='HTTP',
bash_command=http_command,
dag=dag
)
check_running_task = PythonOperator(
task_id='CHECK_FLUME_STATUS',
python_callable=check_status,
provide_context=True,
dag=dag
)
# Node Connections
flume_task
sleep_task >> check_running_task >> http_task
# branch = BranchPythonOperator(task_id='BRANCH', provide_context=True, python_callable=check_status, dag=dag)
# branch >> flume_task
# branch >> sleep_task >> http_task
我没有直接使用flume的经验,因此如果其中一些假设不成立,我深表歉意,但根据我的经验,气流在长时间运行的过程中并不起作用
看起来你可能想要一个不同的设计。如果http任务在dag中以1分钟的间隔自行运行,该怎么办?它还能和水槽探员说话并按预期的方式冲洗吗
因此flume独立运行,您不需要睡眠任务,因为您使用的是airflow scheduler以一定的间隔运行
我们有一个类似的设置,用于访问《气流》中的卡夫卡主题。只有耗电元件在气流内部运行。我们已经围绕它构建了包装器来模拟“流”,但它实际上更像是几个重叠的微批次在运行,给人一种流的错觉
相关问题 更多 >
编程相关推荐