擅长:python、mysql、java
<p>由于SFTPToGCSOperator在发动机罩下使用<code>airflow.providers.sftp.operators.SFTPOperator</code>而导致该错误,该错误出现在气流中>;=2.0.0. </p>
<p>坏消息是需要升级气流版本以使用<code>airflow.providers.google.cloud.transfers.sftp_to_gcs.SFTPToGCSOperator</code></p>
<p>如果不希望/无法升级气流,可以创建DAG链接两个操作符:</p>
<div class="s-table-container">
^{tb1}$
</div>
<p>这应该可以做到:</p>
<pre class="lang-py prettyprint-override"><code>
LOCALFILE = '/tmp/kk'
with models.DAG("test_ssh_to_gcs", start_date=days_ago(1), schedule_interval=None) as dag:
download_sftp = SFTPOperator(
task_id = 'part1_sftp_download_to_local',
ssh_conn_id="sftp_test",
local_file=LOCALFILE,
remote_file='',
operation='get')
gcp_upload = FileToGoogleCloudStorageOperator(
task_id='part2_upload_to_gcs',
bucket='test_sftp_to_gcs',
src=LOCALFILE,
dst="test/test.csv",
google_cloud_storage_conn_id="google_cloud_default" # configured in Airflow
)
sftp_download >> gcp_upload
</code></pre>