如果一个节点始终处于运行状态,而另一个节点从未执行,我如何构造DAG?

2024-10-07 00:31:32 发布

您现在位置:Python中文网/ 问答频道 /正文

情况非常简单。我有3项任务需要执行:

  • 水槽任务>&燃气轮机;睡眠任务>&燃气轮机;http_任务

独立地执行这些任务是完美的。如果我通过CLI手动启动Flume代理(源HTTP、接收器HDFS),然后发送一个curl命令,那么代理会将收到的消息转储到HDFS中

正如我所说的,通过CLI手动执行每个操作是无缝的

然而,通过气流DAG安排该过程是一个挑战。我试过用几种方法构建我的DAG,但都没有用。主要问题是flume任务(BashOperator)一直处于运行状态,而且从未结束。这是有道理的。不应该

但是DAG永远不会继续到下一个节点(睡眠30秒->;发送curl命令)

  • 我构建了一个线性依赖关系(flume_task>&燃气轮机;睡眠任务>&燃气轮机;http_任务)-陷入flume_任务
  • 我已经用它构造了BranchingPythonOperator
    • 分支A=水槽任务
    • 分支B=睡眠任务->;http_任务

这样,分支A跳过flume_任务,分支B成功执行sleep_任务,失败执行http_任务。失败的http_任务有意义,没有代理正在运行

  • 然后,我决定在根目录下分离,以便flume_任务是独立的:
    • 水槽任务
    • 睡眠任务>&燃气轮机;检查\u水槽\u状态>&燃气轮机;http_任务

提供最后一个示例的代码,我认为它具有最合理的DAG构造。我希望有人能对此有所了解。 我在网上阅读了大量的参考资料,我知道Flume是event-driven,但我无法理解我的气流脚本到底做错了什么

如果有人能给我指出正确的方向,我将不胜感激

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from datetime import datetime
from airflow.models import TaskInstance

# Defaults
default_args = {
    'start_date': datetime(2019, 11, 11),
    'depends_on_past':False
}

# DAGs
dag = DAG('HTTP_2_HDFS', default_args=default_args, schedule_interval='59 * * * *')

# Commands
flume_command = "flume-ng agent --name myAgent --conf conf --conf-file /home/hadoop/flume/conf/http.conf "

sleep_command = "sleep 30 "

http_command = "/home/hadoop/flume/hdfs_test/HTTP_2_HDFS.sh "

# Tasks
def check_status(**kwargs):
    flume_task_instance = TaskInstance(flume_task, datetime(2019, 11, 11))
    state = flume_task_instance.current_state()
    if state == "running":
        print("FLUME PROCESS RUNNING !!!")

flume_task = BashOperator(
    task_id='FLUME',
    bash_command=flume_command,
    dag=dag
)

sleep_task = BashOperator(
    task_id='SLEEP',
    bash_command=sleep_command,
    dag=dag
)

http_task = BashOperator(
    task_id='HTTP',
    bash_command=http_command,
    dag=dag
)

check_running_task = PythonOperator(
        task_id='CHECK_FLUME_STATUS',
        python_callable=check_status,
        provide_context=True,
        dag=dag
)

# Node Connections
flume_task
sleep_task >> check_running_task >> http_task

# branch = BranchPythonOperator(task_id='BRANCH', provide_context=True, python_callable=check_status, dag=dag)

# branch >> flume_task
# branch >> sleep_task >> http_task



Tags: fromimportgtidhttptaskconfcheck
1条回答
网友
1楼 · 发布于 2024-10-07 00:31:32

我没有直接使用flume的经验,因此如果其中一些假设不成立,我深表歉意,但根据我的经验,气流在长时间运行的过程中并不起作用

看起来你可能想要一个不同的设计。如果http任务在dag中以1分钟的间隔自行运行,该怎么办?它还能和水槽探员说话并按预期的方式冲洗吗

因此flume独立运行,您不需要睡眠任务,因为您使用的是airflow scheduler以一定的间隔运行

我们有一个类似的设置,用于访问《气流》中的卡夫卡主题。只有耗电元件在气流内部运行。我们已经围绕它构建了包装器来模拟“流”,但它实际上更像是几个重叠的微批次在运行,给人一种流的错觉

相关问题 更多 >