Berikut ialah tugas "menambah cincang pada DataFrame sedia ada" daripada mengambil masa beberapa hari kepada mengambil hampir keseluruhan pecut.
Pada Q2 2022, saya mula mengusahakan saluran paip data yang mengambil data pasaran daripada perkhidmatan REST dan menyimpannya dalam jadual BigQuery. Ini adalah penjelasan peringkat tinggi mengenai saluran paip. Bahagian yang menarik ialah cara data ditanya data, menukarkannya kepada DataFrame dan kemudian memuat naiknya jadual BigQuery menggunakan GCSToBigQueryOperator AirFlow.
Pada mulanya, ia kelihatan mudah untuk ditulis, tetapi prinsip "idempoten" Aliran Udara menambahkannya sedikit cabaran. Perkara yang perlu diambil daripada perkhidmatan REST ini telah diputuskan oleh jadual lain dan walaupun JOB adalah idempoten, jadual yang digunakan sebagai rujukan boleh berubah antara 2 larian. Selepas meluangkan masa tambahan, bercakap dengan saluran paip Jurutera Data telah sedia Menjelang penghujung S3 tahun 2022.
Maju pantas ke S1 2024. Pada masa ini, kami mempunyai lebih ramai pengguna yang mengakses data dan kami menyedari corak pertanyaan kami tidak menggunakan partition dengan betul. Atau sebaliknya, kami ingin mengakses data berdasarkan lajur rentetan, tetapi tidak boleh membahagikan lajur rentetan dalam BigQuery. Ini membawa kepada mengimbas sejumlah besar data dan kerap mencapai kuota harian.
Ini membawa kami untuk mempertimbangkan cara membahagikan data berdasarkan lajur rentetan. Jurutera data kami mencadangkan untuk menukar lajur rentetan itu kepada integer menggunakan FarmHash dengan operasi modulo tambahan. Dalam bukti konsep, ini mengurangkan pengimbasan hampir 90% dan prestasi pertanyaan meningkat sebanyak 3-5x. Kami memutuskan untuk meneruskan ini sebagai penyelesaian terakhir. Apa yang kami perlukan ialah:
Untuk mengira cap jari FarmHash dalam Python, terdapat modul pyfarmhash. Saya memasang modul dan menggunakan kod di bawah untuk mengira cincang, dan secara tempatan semuanya berfungsi seperti yang dikehendaki.
def get_hash(val: str) -> int: return additonal_logic(pyfarmhash.fingerprint64(...)) df[‘hash’] = df[‘Col’].apply(get_hash)
Dengan semua ujian lulus, sudah tiba masanya untuk menolak kod ke Aliran Udara dan menjalankannya. Saya tidak menjangkakan apa-apa akan berlaku pada peringkat ini. Malah, saya gembira kerana semuanya berjalan seperti yang dirancang dan dalam masa yang dianggarkan.
Dengan fikiran yang gembira dan penuh keyakinan, saya menolak perubahan saya, memulakan kerja, dan kemudian menunggu selama 10-15 minit untuk ia selesai. Sementara itu, saya beralih kepada tugas lain. Tidak lama kemudian, saya menerima e-mel kegagalan yang tidak dijangka daripada Airflow. Saya melihat log dan terkejut melihat bahawa ia gagal semasa memasang modul pyfarmhash!
Untuk membantu anda memahami masalah, saya perlu menerangkan struktur kerja. Pekerjaan mempunyai langkah-langkah berikut:
Dalam proses ini, tugasan-1, yang memuat turun data, ialah modul Python yang berasingan. Untuk menjalankannya, saya menggunakan PythonVirtualenvOperator daripada Airflow. Operator ini membenarkan anda untuk menentukan pakej sebagai keperluan dan kemudian memasangnya dalam persekitaran maya yang baru dibuat. Setelah pakej dipasang, semua kebergantungannya turut dipasang dan anda sudah bersedia untuk melancarkan.
Saya menambah pyfarmhash sebagai pergantungan kepada modul yang memuat turun data dan semua yang lain kekal tidak berubah. Dan ia gagal! Kenapa?
pyfarmhash ialah perpustakaan pencincangan yang dilaksanakan dalam C/C++. Selepas pemasangan, ia memerlukan GCC untuk menyusun pakej, dan itu tidak terdapat pada hos Aliran Udara. Adalah masuk akal untuk tidak mempunyai GCC pada hos Aliran Udara, tetapi malangnya, ini adalah penyekat untuk saya.
Saya mencari pelaksanaan Python tulen bagi pakej pyfarmhash, tetapi tiada. Kemudian, saya mencari pakej roda, tetapi sekali lagi, tidak ada. Saya mempertimbangkan untuk membina pakej roda dan menolaknya, tetapi itu akan membawa kepada tanggungjawab jangka panjang untuk menyediakan pakej roda secara dalaman. Saya ingin mengelakkan langkah tambahan seperti penyelesaian. Saya meneroka semua pilihan dan membincangkannya dengan pasukan yang mengekalkan Aliran Udara. Mereka mencadangkan mencipta imej Docker dan menjalankannya dalam KubernetesPodOperator. Ini adalah pilihan yang baik, kerana saya boleh mengawal persekitaran dan memasukkan apa sahaja yang diperlukan tanpa bergantung pada persekitaran luaran. Selain itu, penyelesaian ini tidak mempunyai penyelesaian. Satu-satunya kelemahan jangka pendek ialah ia memerlukan lebih banyak masa untuk dilaksanakan.
Sebelum bermula dengan penyelesaian berasaskan Docker, saya telah menghabiskan kira-kira 16-20 jam untuk tugasan ini. Untuk penyelesaian berasaskan Docker, saya juga perlu:
Memandangkan saya tidak lagi akan menggunakan PythonVirtualEnvOperator dalam Aliran Udara, saya memutuskan untuk mengalih keluarnya sepenuhnya juga menambah baik aliran kerja. Saya terpaksa menukar pakej python untuk mempunyai titik masuk untuk mula memuat turun & membersihkan logik
Saya mengambil masa 30-36 jam tambahan untuk mendapatkan penyelesaian muktamad dengan imej Docker sedia, iaitu 6-7 hari bekerja dan, dengan 2 hari permulaan disertakan, ia menjadi tugasan pecut yang panjang.
Saya melihat kembali perkara ini dan tertanya-tanya, saya terpaksa membuang penyelesaian yang berfungsi, menukar struktur modul, mencipta imej docker, menukar 10+ kerja AirFlow untuk menggunakan imej Docker untuk tugasan, menangani realiti ini dan mengatasi kekecewaan awal. Semua ini hanya kerana, "Satu modul python memerlukan "gcc" untuk dikompilasi!"
Atas ialah kandungan terperinci Menganggarkan Tugasan Pengekodan: Apa yang Boleh Berlaku Salah?. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!