Dalam templat SQL, Airflow's XCom mengembalikan nilai sebagai rentetan petikan dua kali manakala Snowflake menggunakan rentetan petikan tunggal dalam pertanyaan SQL
P粉043470158
P粉043470158 2024-02-25 15:36:12
0
1
497

Saya baru dalam ETL dan mengusahakan aliran udara dan kepingan salji. Saya menggunakan pengendali python untuk mendapatkan nilai ciptaan maksimum daripada jadual mysql dan berdasarkan xcom pengendali itu, saya mencipta fail csv data kepingan salji untuk membuang hanya data ciptaan terkini dari mysql ke kepingan salji. Masalahnya ialah apabila saya mengekstrak nilai dalam templat sql, aliran udara xcom mengembalikan petikan berganda. Dan Snowflake menerima petikan tunggal dalam pertanyaan sqlnya. Gambar ralat

Ini kod DAG saya:

def defaultconverter(o):
    if isinstance(o, datetime):
        return o.__str__()


def get_max_created_timestamp(sql_table_name):
    hook = MySqlHook(MYSQL_CONN)
    check_column = f"select column_name from INFORMATION_SCHEMA.COLUMNS where table_name = '{sql_table_name}' " \
                   f"and table_schema = '{MYSQL_SCHEMA}';"
    print(hook.schema)
    data = hook.get_records(check_column)
    if any('created_at' in x for x in data):
        date_sql = f'select max(created_at) created_timestamp_max from {MYSQL_SCHEMA}.{sql_table_name}'
        (created_timestamp_max,) = hook.get_first(date_sql)
        return json.dumps(created_timestamp_max, default=defaultconverter)
        # return int(created_timestamp_max)
    else:
        return 0


default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email": [],
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 0,
    "retry_delay": timedelta(minutes=5),
    "template_searchpath": [TEMPLATE_SEARCHPATH, ]
}


with DAG(dag_id="lion_sense_snowflake_to_mysql_v1",
         start_date=datetime(2021, 12, 1, 0, 0, 0, 0),
         schedule_interval="@daily",
         catchup=False,
         default_args=default_args,
         max_active_runs=1,
         ) as dag:
    dag.doc_md = DOCS

    for table in tables:
        mysql_table = table["mysql_table"]
        snowflake_table = table["snowflake_table"]
        delete_flag = table["delete"]

        get_max_timestamp_task = PythonOperator(
            task_id=f"get_max_timestamp_{mysql_table}",
            python_callable=get_max_created_timestamp,
            op_args=[mysql_table, ],
            do_xcom_push=True,
        )

        create_snowflake_table_csv = SnowflakeOperator(
            task_id=f"create_snowflake_{snowflake_table}_table_csv",
            dag=dag,
            sql="sql/convert_snowflake_table_to_csv.sql",
            snowflake_conn_id=SNOWFLAKE_CONN_ID,
            warehouse=SNOWFLAKE_WAREHOUSE,
            database=SNOWFLAKE_DATABASE,
            schema=SNOWFLAKE_SCHEMA,
            role=SNOWFLAKE_ROLE,
            params={
                "snowflake_table": snowflake_table,
                "delete_flag": delete_flag,
                "max_date": get_max_timestamp_task.output
            }
        )

Templat pertanyaan mysql:

copy into @S3_TKXEL_DEVEOPMENT_STAGE/airflow/{{ dag.dag_id }}/{{ ds_nodash }}/{{ params.snowflake_table }}/{{ ds_nodash }}_{{ params.snowflake_table }}.csv
from (
    select * from {{ params.snowflake_table }}
        {% if params.delete_flag %}
 where created_at > {{ params.max_date}}
        {% endif %}
    )
file_format = (TYPE = CSV, COMPRESSION = NONE, NULL_IF=(''), field_optionally_enclosed_by='"' )
OVERWRITE = TRUE
SINGLE = TRUE
MAX_FILE_SIZE=5000000000;

Terima kasih terlebih dahulu kerana menambah pengetahuan saya.

P粉043470158
P粉043470158

membalas semua(1)
P粉896751037

Sama ada hendak menukar output get_max_created_timestamp:

def get_max_created_timestamp(sql_table_name):
    hook = MySqlHook(MYSQL_CONN)
    check_column = f"select column_name from INFORMATION_SCHEMA.COLUMNS where table_name = '{sql_table_name}' " \
                   f"and table_schema = '{MYSQL_SCHEMA}';"
    print(hook.schema)
    data = hook.get_records(check_column)
    if any('created_at' in x for x in data):
        date_sql = f'select max(created_at) created_timestamp_max from {MYSQL_SCHEMA}.{sql_table_name}'
        (created_timestamp_max,) = hook.get_first(date_sql)
        return "'" + created_timestamp_max + "'"
    else:
        return 0

Bantuan, kini rentetan akan dipetik dengan betul untuk mewakili jangkaan rentetan kepingan salji.

Muat turun terkini
Lagi>
kesan web
Kod sumber laman web
Bahan laman web
Templat hujung hadapan