如何定义STFP操作员对气流的操作?

2024-05-17 09:02:25 发布

您现在位置:Python中文网/ 问答频道 /正文

class SFTPOperation(object):
    PUT = 'put'
    GET = 'get'  

operation=SFTPOperation.GET,
NameError: name 'SFTPOperation' is not defined

我在这里定义了运营商,但在互联网上找不到任何与运营相关的内容

class sftpplugin(AirflowPlugin):
    name = "sftp_plugin"
    operators = [SFTPOperator]

任何帮助都将不胜感激

谢谢


Tags: nameget定义objectputisnotoperation
1条回答
网友
1楼 · 发布于 2024-05-17 09:02:25

注意到SFTP操作符使用ssh_钩子打开sftp传输通道,您应该需要为文件传输提供ssh_hookssh_conn_id。首先,让我们看一个提供参数ssh_conn_id的示例

from airflow.providers.sftp.operators import sftp_operator
from airflow import DAG
import datetime

dag = DAG(
'test_dag',
start_date = datetime.datetime(2020,1,8,0,0,0),
schedule_interval = '@daily'
)

put_operation = SFTPOperator(
            task_id="operation",
            ssh_conn_id="ssh_default",
            local_filepath="route_to_local_file",
            remote_filepath="remote_route_to_copy",
            operation="put",
            dag=dag
            )
get_operation = SFTPOperator(....,
            operation = "get",
            dag = dag
            )

put_operation >> get_operation

请注意,dag应该根据任务的需要进行调度,这里的示例考虑从中午开始的每日调度。现在,如果您正在提供SSHhook,则需要对上述代码进行以下更改

from airflow.contrib.hooks.ssh_hook import SSHHook
...

put_operation = SFTPOperator(
            task_id="operation",
            ssh_hook=SSHHook("Name_of_variable_defined"),
            ...
            dag=dag
            )
....

其中"Name_of_variable_defined"是在Admin->;气流接口处的连接

相关问题 更多 >