私は ETL を初めて使用し、エアフローとスノーフレークに取り組んでいます。私はPythonオペレーターを使用してmysqlテーブルから作成された最大値を取得し、そのオペレーターのxcomに基づいてスノーフレークデータのcsvファイルを作成して、最新に作成されたデータのみをmysqlからスノーフレークにダンプします。問題は、SQL テンプレートの値を抽出すると、airflow xcom が二重引用符を返すことです。また、Snowflake は SQL クエリで一重引用符を受け入れます。エラー画像
これが私の DAG コードです:
リーリーMySQL クエリ テンプレート:
リーリー私の知識をさらに深めていただきありがとうございます。
get_max_created_timestamp
def get_max_created_timestamp(sql_table_name): フック = MySqlHook(MYSQL_CONN) check_column = f"テーブル名 = '{sql_table_name}' の INFORMATION_SCHEMA.COLUMNS から列名を選択します " \ f"および table_schema = '{MYSQL_SCHEMA}';" print(フック.スキーマ) データ = フック.get_records(check_column) if any(x の「created_at」 for x のデータ): date_sql = f'{MYSQL_SCHEMA}.{sql_table_name} から max(created_at) created_timestamp_max を選択します' (created_timestamp_max,) =フック.get_first(date_sql) return "'" created_timestamp_max "'" それ以外: 0 助けてください。これで、文字列はスノーフレーク文字列の期待値を表すために正しく引用符で囲まれるようになります。の出力を変更するかどうか: