如何在DAG中使用WasbHook从Azure获取blob列表

2024-09-27 19:23:37 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个用于Azure Blob存储的简单DAG

import airflow
from airflow import DAG
from airflow.contrib.hooks.wasb_hook import WasbHook
from airflow.operators.python_operator import PythonOperator

azure = WasbHook(wasb_conn_id='connect_to_azure')

args = {
    "owner": "Airflow",
    "start_date": airflow.utils.dates.days_ago(2)}

dag = DAG(
    dag_id="wasb_sensor_test",
    default_args=args,
    schedule_interval=None,
    tags=['poc', 'azure'])  
    
def get_blob_list():
    blob_list = azure.check_for_prefix(container_name='MY_CONTAINER_NAME', prefix='MY_PREFIX')
    
print_blob_list = PythonOperator(
    task_id='get_blob_list',
    python_callable=get_blob_list,
    dag=dag)
    

print_blob_list

我想得到一个关于适当容器和前缀的BLOB列表。正如我从hook(https://github.com/apache/airflow/blob/6d612efc7e19fff01b0da98bc345320edde70237/airflow/providers/microsoft/azure/hooks/wasb.py#L73)的代码源所理解的那样,如果调用check_for_prefix函数,并添加和附加参数以代替**kwargs,则可以这样做。 差不多

blob_list = azure.check_for_prefix(container_name='MY_CONTAINER_NAME', prefix='MY_PREFIX', blob_list_return)

但我不知道如何正确地做


Tags: fromimportidgetprefixmycheckargs
2条回答

是我的错。check_for_prefix函数中**kwargs的所有参数都在调用BlockBlobService时传递。在函数中列出(…,**kwargs)blobs

对于更新了Azure Blob存储库v12的WasbHook,get_Blob_列表仍与BlockBlob服务一样有效。唯一的区别是它返回一个blob名称列表

azure.get_blobs_list(container_name,prefix)

相关问题 更多 >

    热门问题