Apache Airflow DAG 未在 GUI 中显示

2024-05-18 14:22:08 发布

您现在位置:Python中文网/ 问答频道 /正文

在代码中更新了 DAG 及其任务顺序(Airflow 已显示更新后的代码),但这些更新没有显示在 Airflow GUI(树视图/图形视图)中。已经按照其他帖子及其解决方案的建议,尝试过刷新、重新启动 Airflow 等方法。出于某种原因,它没有拾取 extract_api_data_task,即使下面已经声明:create_psql_schema_task >> create_psql_table_task >> extract_api_data_task >> insert_data_task。我不知道该怎么办。任何帮助都将不胜感激。 [1]

import os
import logging
import requests
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from psycopg2.extras import execute_values
from airflow import AirflowException

########################################################
#
#   DAG Settings
#
#########################################################

from airflow import DAG

# Default arguments applied to every task of the DAG.
#
# start_date is a fixed, midnight-aligned datetime on purpose. A dynamic value
# such as datetime.now() is re-evaluated every time this file is parsed, so
# the start of the schedule keeps moving; the Airflow documentation recommends
# against it. Set the date to the first day that should be scheduled; with
# catchup enabled on the DAG, every daily interval since this date is
# backfilled.
dag_default_args = {
    'owner': 'Nigel',
    'start_date': datetime(2021, 5, 11),
    'email': [],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'depends_on_past': False,
    'wait_for_downstream': False,
}

# The DAG object every operator below attaches to: scheduled daily with
# catchup enabled, limited to one active run and a concurrency of five.
dag = DAG(
    dag_id='AT2_DAG',
    schedule_interval="@daily",
    catchup=True,
    concurrency=5,
    max_active_runs=1,
    default_args=dag_default_args,
)


#########################################################
#
#   Custom Logic for Operator
#
#########################################################

def api_url(country="australia", region="nsw", city="sydney",
            snapshot_date="2021-04-10"):
    """Return the Inside Airbnb ``listings.csv.gz`` URL for one snapshot.

    Called without arguments it returns the Sydney 2021-04-10 snapshot URL,
    so existing callers are unaffected.

    Args:
        country: Country path segment of the URL.
        region: State/region path segment of the URL.
        city: City path segment of the URL.
        snapshot_date: Snapshot date path segment, formatted ``YYYY-MM-DD``.

    Returns:
        The download URL as a string.
    """
    return (
        "http://data.insideairbnb.com/"
        f"{country}/{region}/{city}/{snapshot_date}/data/listings.csv.gz"
    )

def extract_api_data_func():
    """Download the gzip-compressed listings CSV and return it as a dict.

    The file located at ``api_url()`` is read into a DataFrame (first row is
    the header, comma separated, double-quote quoting) and converted with
    ``DataFrame.to_dict()``.

    Returns:
        The result of ``DataFrame.to_dict()`` for the downloaded listings.
    """
    listings = pd.read_csv(
        api_url(),
        compression='gzip',
        header=0,
        sep=',',
        quotechar='"',
    )
    return listings.to_dict()

# Columns written to bde_at2.airbnb, in insert order. This single list drives
# both the DataFrame projection and the INSERT column list, so the order of
# the values can never drift from the order of the column names.
_AIRBNB_INSERT_COLUMNS = [
    'listing_url',
    'scrape_id',
    'last_scraped',
    'name',
    'description',
    'neighborhood_overview',
    'picture_url',
    'host_id',
    'host_url',
    'host_name',
    'host_since',
    'host_location',
    'host_about',
    'host_response_time',
    'host_response_rate',
    'host_acceptance_rate',
    'host_is_superhost',
    'host_thumbnail_url',
    'host_picture_url',
    'host_neighbourhood',
    'host_listings_count',
    'host_total_listings_count',
    'host_verifications',
    'host_has_profile_pic',
    'host_identity_verified',
    'neighbourhood',
    'neighbourhood_cleansed',
    'neighbourhood_group_cleansed',
    'latitude',
    'longitude',
    'property_type',
    'room_type',
    'accommodates',
    'bathrooms',
    'bathrooms_text',
    'bedrooms',
    'beds',
    'amenities',
    'price',
    'minimum_nights',
    'maximum_nights',
    'minimum_minimum_nights',
    'maximum_minimum_nights',
    'minimum_maximum_nights',
    'maximum_maximum_nights',
    'minimum_nights_avg_ntm',
    'maximum_nights_avg_ntm',
    'calendar_updated',
    'has_availability',
    'availability_30',
    'availability_60',
    'availability_90',
    'availability_365',
    'calendar_last_scraped',
    'number_of_reviews',
    'number_of_reviews_ltm',
    'number_of_reviews_l30d',
    'first_review',
    'last_review',
    'review_scores_rating',
    'review_scores_accuracy',
    'review_scores_cleanliness',
    'review_scores_checkin',
    'review_scores_communication',
    'review_scores_location',
    'review_scores_value',
    'license',
    'instant_bookable',
    'calculated_host_listings_count',
    'calculated_host_listings_count_entire_homes',
    'calculated_host_listings_count_private_rooms',
    'calculated_host_listings_count_shared_rooms',
    'reviews_per_month',
]


def insert_data_func(**kwargs):
    """Insert the listings pulled from XCom into ``bde_at2.airbnb``.

    Pulls the value pushed by task ``extract_api_data_task_id``, rebuilds a
    DataFrame from it and bulk-inserts the selected columns in a single
    transaction through the ``postgres`` connection id. When the DataFrame
    has no rows nothing is written and no database connection is opened.

    Args:
        **kwargs: Task context; only ``kwargs['ti']`` (the task instance used
            for the XCom pull) is read.

    Returns:
        None.

    Raises:
        AirflowException: If the upstream task left no XCom value to pull.
    """
    ti = kwargs['ti']
    listings_dict = ti.xcom_pull(task_ids='extract_api_data_task_id')
    if listings_dict is None:
        # Fail with an explicit message instead of an opaque pandas error.
        raise AirflowException(
            "No XCom value found for task 'extract_api_data_task_id'"
        )

    insert_df = pd.DataFrame.from_dict(listings_dict)
    if len(insert_df) == 0:
        logging.info("No rows to insert into bde_at2.airbnb")
        return None

    rows = insert_df[_AIRBNB_INSERT_COLUMNS].to_dict('split')['data']
    # Log only the row count: dumping every row would flood the task log.
    logging.info("Inserting %d rows into bde_at2.airbnb", len(rows))

    insert_sql = (
        "INSERT INTO bde_at2.airbnb ("
        + ", ".join(_AIRBNB_INSERT_COLUMNS)
        + ") VALUES %s"
    )

    # Connect only once there is something to write, and always release the
    # cursor and the connection, even when the insert fails.
    conn_ps = PostgresHook(postgres_conn_id="postgres").get_conn()
    try:
        with conn_ps.cursor() as cursor:
            execute_values(cursor, insert_sql, rows, page_size=len(rows))
        conn_ps.commit()
    finally:
        conn_ps.close()

    return None

#########################################################
#
#   DAG Operator Setup
#
#########################################################

from airflow.operators.python_operator import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Task wrapping extract_api_data_func. insert_data_func pulls this task's
# XCom value by the task_id given here, so the two must stay in sync.
extract_api_data_task = PythonOperator(
    dag=dag,
    task_id='extract_api_data_task_id',
    python_callable=extract_api_data_func,
    provide_context=True,
    op_kwargs={},
)

# Task that creates the bde_at2 schema when it does not exist yet, using the
# "postgres" connection id.
create_psql_schema_task = PostgresOperator(
    dag=dag,
    task_id="create_psql_schema_task_id",
    postgres_conn_id="postgres",
    sql="""
        CREATE SCHEMA IF NOT EXISTS bde_at2;
    """,
)

# Task that creates the bde_at2.airbnb table when it does not exist yet. Every
# column is declared as TEXT.
# NOTE(review): the table declares an `id` column, but the column list used by
# insert_data_func does not include it, so `id` is never populated -- confirm
# whether that is intended.
create_psql_table_task = PostgresOperator(
    dag=dag,
    task_id="create_psql_table_task_id",
    postgres_conn_id="postgres",
    sql="""
        CREATE TABLE IF NOT EXISTS bde_at2.airbnb (
            id                                              TEXT,
            listing_url                                     TEXT,
            scrape_id                                       TEXT,
            last_scraped                                    TEXT,
            name                                            TEXT,
            description                                     TEXT,
            neighborhood_overview                           TEXT,
            picture_url                                     TEXT,
            host_id                                         TEXT,
            host_url                                        TEXT,
            host_name                                       TEXT,
            host_since                                      TEXT,
            host_location                                   TEXT,
            host_about                                      TEXT,
            host_response_time                              TEXT,
            host_response_rate                              TEXT,
            host_acceptance_rate                            TEXT,
            host_is_superhost                               TEXT,
            host_thumbnail_url                              TEXT,
            host_picture_url                                TEXT,
            host_neighbourhood                              TEXT,
            host_listings_count                             TEXT,
            host_total_listings_count                       TEXT,
            host_verifications                              TEXT,
            host_has_profile_pic                            TEXT,
            host_identity_verified                          TEXT,
            neighbourhood                                   TEXT,
            neighbourhood_cleansed                          TEXT,
            neighbourhood_group_cleansed                    TEXT,
            latitude                                        TEXT,
            longitude                                       TEXT,
            property_type                                   TEXT,
            room_type                                       TEXT,
            accommodates                                    TEXT,
            bathrooms                                       TEXT,
            bathrooms_text                                  TEXT,
            bedrooms                                        TEXT,
            beds                                            TEXT,
            amenities                                       TEXT,
            price                                           TEXT,
            minimum_nights                                  TEXT,
            maximum_nights                                  TEXT,
            minimum_minimum_nights                          TEXT,
            maximum_minimum_nights                          TEXT,
            minimum_maximum_nights                          TEXT,
            maximum_maximum_nights                          TEXT,
            minimum_nights_avg_ntm                          TEXT,
            maximum_nights_avg_ntm                          TEXT,
            calendar_updated                                TEXT,
            has_availability                                TEXT,
            availability_30                                 TEXT,
            availability_60                                 TEXT,
            availability_90                                 TEXT,
            availability_365                                TEXT,
            calendar_last_scraped                           TEXT,
            number_of_reviews                               TEXT,
            number_of_reviews_ltm                           TEXT,
            number_of_reviews_l30d                          TEXT,
            first_review                                    TEXT,
            last_review                                     TEXT,
            review_scores_rating                            TEXT,
            review_scores_accuracy                          TEXT,
            review_scores_cleanliness                       TEXT,
            review_scores_checkin                           TEXT,
            review_scores_communication                     TEXT,
            review_scores_location                          TEXT,
            review_scores_value                             TEXT,
            license                                         TEXT,
            instant_bookable                                TEXT,
            calculated_host_listings_count                  TEXT,
            calculated_host_listings_count_entire_homes     TEXT,
            calculated_host_listings_count_private_rooms    TEXT,
            calculated_host_listings_count_shared_rooms     TEXT,
            reviews_per_month                               TEXT

            );
    """,
)

# Task wrapping insert_data_func, which reads the task instance from the
# context passed in as keyword arguments.
insert_data_task = PythonOperator(
    dag=dag,
    task_id='insert_data_task_id',
    python_callable=insert_data_func,
    provide_context=True,
    op_kwargs={},
)

# Task order: create the schema, create the table, download the listings,
# then insert them.
create_psql_schema_task >> create_psql_table_task >> extract_api_data_task >> insert_data_task


  [1]: https://i.stack.imgur.com/bcYVv.png

日志:

Reading local file: /opt/airflow/logs/AT2_DAG/insert_data_task_id/2021-05-12T00:00:00+00:00/1.log
[2021-05-13 06:05:52,878] {taskinstance.py:845} INFO - Dependencies not met for <TaskInstance: AT2_DAG.insert_data_task_id 2021-05-12T00:00:00+00:00 [queued]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'extract_api_data_task_id'}
[2021-05-13 06:05:52,880] {local_task_job.py:93} INFO - Task is not able to be run

Tags: textidhosturltaskdatacountreview

热门问题