Disebabkan keperluan kerja terkini, adalah perlu untuk menambah fungsi baharu pada projek sedia ada untuk melaksanakan fungsi muat semula panas konfigurasi. Muat semula panas konfigurasi yang dipanggil bermakna selepas perkhidmatan menerima mesej kemas kini konfigurasi, kami boleh menggunakan konfigurasi terkini untuk melaksanakan tugas tanpa memulakan semula perkhidmatan.
Di bawah saya menggunakan kaedah berbilang proses, berbilang benang dan coroutine untuk melaksanakan pemuatan panas konfigurasi.
Jika kami menggunakan berbilang proses dalam pelaksanaan kod, proses utama 1 mengemas kini konfigurasi dan menghantar arahan, dan panggilan tugas adalah proses 2. Bagaimana untuk melaksanakan konfigurasi pemuatan panas?
Apabila proses utama menerima mesej kemas kini konfigurasi (bagaimana bacaan konfigurasi menerima mesej kemas kini konfigurasi ? Kami tidak akan membincangkannya di sini lagi), proses utama menghantar isyarat bunuh ke sub-proses 1, sub-proses 1 keluar selepas menerima isyarat bunuh, dan kemudian fungsi pemprosesan isyarat memulakan proses baharu, menggunakan fail konfigurasi terkini. Teruskan dengan misi.
fungsi utama
def main(): # 启动一个进程执行任务 p1 = Process(target=run, args=("p1",)) p1.start() monitor(p1, run) # 注册信号 processes["case100"] = p1 #将进程pid保存 num = 0 while True: # 模拟获取配置更新 print( f"{multiprocessing.active_children()=}, count={len(multiprocessing.active_children())}\n") print(f"{processes=}\n") sleep(2) if num == 4: kill_process(processes["case100"]) # kill 当前进程 if num == 8: kill_process(processes["case100"]) # kill 当前进程 if num == 12: kill_process(processes["case100"]) # kill 当前进程 num += 1
fungsi pengendali_isyarat
def signal_handler(process: Process, func, signum, frame): # print(f"{signum=}") global counts if signum == 17: # 17 is SIGCHILD # 这个循环是为了忽略SIGTERM发出的信号,避免抢占了主进程发出的SIGCHILD for signame in [SIGTERM, SIGCHLD, SIGQUIT]: signal.signal(signame, SIG_DFL) print("Launch a new process") p = multiprocessing.Process(target=func, args=(f"p{counts}",)) p.start() monitor(p, run) processes["case100"] = p counts += 1 if signum == 2: if process.is_alive(): print(f"Kill {process} process") process.terminate() signal.signal(SIGCHLD, SIG_IGN) sys.exit("kill parent process")
Kod lengkap adalah seperti berikut
#! /usr/local/bin/python3.8 from multiprocessing import Process from typing import Dict import signal from signal import SIGCHLD, SIGTERM, SIGINT, SIGQUIT, SIG_DFL, SIG_IGN import multiprocessing from multiprocessing import Process from typing import Callable from data import processes import sys from functools import partial import time processes: Dict[str, Process] = {} counts = 2 def run(process: Process): while True: print(f"{process} running...") time.sleep(1) def kill_process(process: Process): print(f"kill {process}") process.terminate() def monitor(process: Process, func: Callable): for signame in [SIGTERM, SIGCHLD, SIGINT, SIGQUIT]: # SIGTERM is kill signal. # No SIGCHILD is not trigger singnal_handler, # No SIGINT is not handler ctrl+c, # No SIGQUIT is RuntimeError: reentrant call inside <_io.BufferedWriter name='<stdout>'> signal.signal(signame, partial(signal_handler, process, func)) def signal_handler(process: Process, func, signum, frame): print(f"{signum=}") global counts if signum == 17: # 17 is SIGTERM for signame in [SIGTERM, SIGCHLD, SIGQUIT]: signal.signal(signame, SIG_DFL) print("Launch a new process") p = multiprocessing.Process(target=func, args=(f"p{counts}",)) p.start() monitor(p, run) processes["case100"] = p counts += 1 if signum == 2: if process.is_alive(): print(f"Kill {process} process") process.terminate() signal.signal(SIGCHLD, SIG_IGN) sys.exit("kill parent process") def main(): p1 = Process(target=run, args=("p1",)) p1.start() monitor(p1, run) processes["case100"] = p1 num = 0 while True: print( f"{multiprocessing.active_children()=}, count={len(multiprocessing.active_children())}\n") print(f"{processes=}\n") time.sleep(2) if num == 4: kill_process(processes["case100"]) if num == 8: kill_process(processes["case100"]) if num == 12: kill_process(processes["case100"]) num += 1 if __name__ == '__main__': main()
Hasil pelaksanaan adalah seperti berikut
multiprocessing.active_children()=[<Process name='Process-1' pid=2533 parent=2532 started>], count=1 processes={'case100': <Process name='Process-1' pid=2533 parent=2532 started>} p1 running... p1 running... kill <Process name='Process-1' pid=2533 parent=2532 started> multiprocessing.active_children()=[<Process name='Process-1' pid=2533 parent=2532 started>], count=1 processes={'case100': <Process name='Process-1' pid=2533 parent=2532 started>} signum=17 Launch a new process p2 running... p2 running... multiprocessing.active_children()=[<Process name='Process-2' pid=2577 parent=2532 started>], count=1 processes={'case100': <Process name='Process-2' pid=2577 parent=2532 started>} p2 running... p2 running... multiprocessing.active_children()=[<Process name='Process-2' pid=2577 parent=2532 started>], count=1 processes={'case100': <Process name='Process-2' pid=2577 parent=2532 started>} p2 running... p2 running... multiprocessing.active_children()=[<Process name='Process-2' pid=2577 parent=2532 started>], count=1 processes={'case100': <Process name='Process-2' pid=2577 parent=2532 started>} p2 running... p2 running... kill <Process name='Process-2' pid=2577 parent=2532 started> signum=17 Launch a new process multiprocessing.active_children()=[<Process name='Process-2' pid=2577 parent=2532 stopped exitcode=-SIGTERM>], count=1 processes={'case100': <Process name='Process-3' pid=2675 parent=2532 started>} p3 running... p3 running... multiprocessing.active_children()=[<Process name='Process-3' pid=2675 parent=2532 started>], count=1
Ringkasan
Faedah: Menggunakan semaphore boleh menangani masalah komunikasi antara pelbagai proses.
Kelemahan: Kod sukar ditulis dan kod bertulis sukar difahami. Anda mesti biasa dengan penggunaan semafor, jika tidak, ia adalah mudah untuk menulis pepijat untuk diri sendiri (Semua pemula harus menggunakannya dengan berhati-hati, kecuali pemandu berpengalaman.)
Perkara lain yang tidak difahami secara khusus ialah <. 🎜> menghantar isyarat adalah process.terminate()
dan nombornya ialah 15, tetapi kali pertama SIGTERM
menerima isyarat itu adalah nombor=17 Jika saya ingin mengendalikan isyarat 15, ia akan menyebabkan masalah yang sebelumnya proses tidak boleh dibunuh. Sesiapa yang biasa dengan semaphore dialu-alukan untuk memberi nasihat kepada kami. signal_handler
fungsi penjadual
def scheduler(): while True: print('wait message...') case_configurations = scheduler_notify_queue.get() print(f"Got case configurations {case_configurations=}...") task_schedule_event.set() # 设置set之后, is_set 为True print(f"Schedule will start ...") while task_schedule_event.is_set(): # is_set 为True的话,那么任务就会一直执行 run(case_configurations) print("Clearing all scheduling job ...")
fungsi penjadual_event
def event_scheduler(case_config): scheduler_notify_queue.put(case_config) print(f"Put cases config to the Queue ...") task_schedule_event.clear() # clear之后,is_set 为False print(f"Clear scheduler jobs ...") print(f"Schedule job ...")
Kod lengkap Keputusan pelaksanaan adalah seperti berikut
import multiprocessing import time scheduler_notify_queue = multiprocessing.Queue() task_schedule_event = multiprocessing.Event() def run(case_configurations: str): print(f'{case_configurations} running...') time.sleep(3) def scheduler(): while True: print('wait message...') case_configurations = scheduler_notify_queue.get() print(f"Got case configurations {case_configurations=}...") task_schedule_event.set() print(f"Schedule will start ...") while task_schedule_event.is_set(): run(case_configurations) print("Clearing all scheduling job ...") def event_scheduler(case_config: str): scheduler_notify_queue.put(case_config) print(f"Put cases config to the Queue ...") task_schedule_event.clear() print(f"Clear scheduler jobs ...") print(f"Schedule job ...") def main(): scheduler_notify_queue.put('1') p = multiprocessing.Process(target=scheduler) p.start() count = 1 print(f'{count=}') while True: if count == 5: event_scheduler('100') if count == 10: event_scheduler('200') count += 1 time.sleep(1) if __name__ == '__main__': main()
wait message... Got case configurations case_configurations='1'... Schedule will start ... 1 running... 1 running... Put cases config to the Queue ... Clear scheduler jobs ... Schedule job ... Clearing all scheduling job ... wait message... Got case configurations case_configurations='100'... Schedule will start ... 100 running... Put cases config to the Queue ... Clear scheduler jobs ... Schedule job ... Clearing all scheduling job ... wait message... Got case configurations case_configurations='200'... Schedule will start ... 200 running... 200 running...
Ringkasan
Menggunakan pemberitahuan acara Acara, kod kurang terdedah kepada ralat, kurang kod ditulis dan lebih mudah dibaca. Berbanding dengan kaedah semaphore sebelumnya, adalah disyorkan agar anda menggunakan kaedah ini lebih kerap. Menggunakan multi-threading atau coroutine sebenarnya adalah sama dengan kaedah pelaksanaan di atas. Satu-satunya perbezaan ialah perpustakaan yang berbeza dipanggil, dan queue
.event
# threading scheduler_notify_queue = queue.Queue() task_schedule_event = threading.Event() # async scheduler_notify_queue = asyncio.Queue() task_schedule_event = asyncio.Event()
Atas ialah kandungan terperinci Bagaimana untuk melaksanakan pemuatan panas fail konfigurasi dalam Python. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!