Bagaimana untuk melaksanakan pemuatan panas fail konfigurasi dalam Python

WBOY
Lepaskan: 2023-05-07 18:31:20
ke hadapan
1756 orang telah melayarinya

    Latar Belakang

    Disebabkan keperluan kerja terkini, adalah perlu untuk menambah fungsi baharu pada projek sedia ada untuk melaksanakan fungsi muat semula panas konfigurasi. Muat semula panas konfigurasi yang dipanggil bermakna selepas perkhidmatan menerima mesej kemas kini konfigurasi, kami boleh menggunakan konfigurasi terkini untuk melaksanakan tugas tanpa memulakan semula perkhidmatan.

    Cara melaksanakan

    Di bawah saya menggunakan kaedah berbilang proses, berbilang benang dan coroutine untuk melaksanakan pemuatan panas konfigurasi.

    Gunakan berbilang proses untuk melaksanakan pemuatan panas konfigurasi

    Jika kami menggunakan berbilang proses dalam pelaksanaan kod, proses utama 1 mengemas kini konfigurasi dan menghantar arahan, dan panggilan tugas adalah proses 2. Bagaimana untuk melaksanakan konfigurasi pemuatan panas?

    Gunakan semaphore isyarat untuk melaksanakan pemuatan panas

    Bagaimana untuk melaksanakan pemuatan panas fail konfigurasi dalam Python

    Apabila proses utama menerima mesej kemas kini konfigurasi (bagaimana bacaan konfigurasi menerima mesej kemas kini konfigurasi ? Kami tidak akan membincangkannya di sini lagi), proses utama menghantar isyarat bunuh ke sub-proses 1, sub-proses 1 keluar selepas menerima isyarat bunuh, dan kemudian fungsi pemprosesan isyarat memulakan proses baharu, menggunakan fail konfigurasi terkini. Teruskan dengan misi.

    fungsi utama

    def main():
        # 启动一个进程执行任务
        p1 = Process(target=run, args=("p1",))
        p1.start()
    
        monitor(p1, run) # 注册信号
        processes["case100"] = p1 #将进程pid保存
        num = 0 
        while True: # 模拟获取配置更新
            print(
                f"{multiprocessing.active_children()=}, count={len(multiprocessing.active_children())}\n")
            print(f"{processes=}\n")
            sleep(2)
            if num == 4:
                kill_process(processes["case100"]) # kill 当前进程
            if num == 8:
                kill_process(processes["case100"]) # kill 当前进程
            if num == 12:
                kill_process(processes["case100"]) # kill 当前进程
            num += 1
    Salin selepas log masuk

    fungsi pengendali_isyarat

    def signal_handler(process: Process, func, signum, frame):
        # print(f"{signum=}")
        global counts
    
        if signum == 17:  # 17 is SIGCHILD 
            # 这个循环是为了忽略SIGTERM发出的信号,避免抢占了主进程发出的SIGCHILD
            for signame in [SIGTERM, SIGCHLD, SIGQUIT]:
                signal.signal(signame, SIG_DFL)
    
            print("Launch a new process")
            p = multiprocessing.Process(target=func, args=(f"p{counts}",))
            p.start()
            monitor(p, run)
            processes["case100"] = p
            counts += 1
    
        if signum == 2:
            if process.is_alive():
                print(f"Kill {process} process")
                process.terminate()
            signal.signal(SIGCHLD, SIG_IGN)
            sys.exit("kill parent process")
    Salin selepas log masuk

    Kod lengkap adalah seperti berikut

    #! /usr/local/bin/python3.8
    from multiprocessing import Process
    from typing import Dict
    import signal
    from signal import SIGCHLD, SIGTERM, SIGINT, SIGQUIT, SIG_DFL, SIG_IGN
    import multiprocessing
    from multiprocessing import Process
    from typing import Callable
    from data import processes
    import sys
    from functools import partial
    import time
    
    processes: Dict[str, Process] = {}
    counts = 2
    
    
    def run(process: Process):
        while True:
            print(f"{process} running...")
            time.sleep(1)
    
    
    def kill_process(process: Process):
        print(f"kill {process}")
        process.terminate()
    
    
    def monitor(process: Process, func: Callable):
        for signame in [SIGTERM, SIGCHLD, SIGINT, SIGQUIT]:
            # SIGTERM is kill signal.
            # No SIGCHILD is not trigger singnal_handler,
            # No SIGINT is not handler ctrl+c,
            # No SIGQUIT is RuntimeError: reentrant call inside <_io.BufferedWriter name=&#39;<stdout>&#39;>
            signal.signal(signame, partial(signal_handler, process, func))
    
    
    def signal_handler(process: Process, func, signum, frame):
        print(f"{signum=}")
        global counts
    
        if signum == 17:  # 17 is SIGTERM
            for signame in [SIGTERM, SIGCHLD, SIGQUIT]:
                signal.signal(signame, SIG_DFL)
            print("Launch a new process")
            p = multiprocessing.Process(target=func, args=(f"p{counts}",))
            p.start()
            monitor(p, run)
            processes["case100"] = p
            counts += 1
    
        if signum == 2:
            if process.is_alive():
                print(f"Kill {process} process")
                process.terminate()
            signal.signal(SIGCHLD, SIG_IGN)
            sys.exit("kill parent process")
    
    
    def main():
        p1 = Process(target=run, args=("p1",))
        p1.start()
        monitor(p1, run)
        processes["case100"] = p1
        num = 0
        while True:
            print(
                f"{multiprocessing.active_children()=}, count={len(multiprocessing.active_children())}\n")
            print(f"{processes=}\n")
            time.sleep(2)
            if num == 4:
                kill_process(processes["case100"])
            if num == 8:
                kill_process(processes["case100"])
            if num == 12:
                kill_process(processes["case100"])
            num += 1
    
    
    if __name__ == &#39;__main__&#39;:
        main()
    Salin selepas log masuk

    Hasil pelaksanaan adalah seperti berikut

    multiprocessing.active_children()=[<Process name=&#39;Process-1&#39; pid=2533 parent=2532 started>], count=1
    
    processes={&#39;case100&#39;: <Process name=&#39;Process-1&#39; pid=2533 parent=2532 started>}
    
    p1 running...
    p1 running...
    kill <Process name=&#39;Process-1&#39; pid=2533 parent=2532 started>
    multiprocessing.active_children()=[<Process name=&#39;Process-1&#39; pid=2533 parent=2532 started>], count=1
    
    processes={&#39;case100&#39;: <Process name=&#39;Process-1&#39; pid=2533 parent=2532 started>}
    
    signum=17
    Launch a new process
    p2 running...
    p2 running...
    multiprocessing.active_children()=[<Process name=&#39;Process-2&#39; pid=2577 parent=2532 started>], count=1
    
    processes={&#39;case100&#39;: <Process name=&#39;Process-2&#39; pid=2577 parent=2532 started>}
    
    p2 running...
    p2 running...
    multiprocessing.active_children()=[<Process name=&#39;Process-2&#39; pid=2577 parent=2532 started>], count=1
    
    processes={&#39;case100&#39;: <Process name=&#39;Process-2&#39; pid=2577 parent=2532 started>}
    
    p2 running...
    p2 running...
    multiprocessing.active_children()=[<Process name=&#39;Process-2&#39; pid=2577 parent=2532 started>], count=1
    
    processes={&#39;case100&#39;: <Process name=&#39;Process-2&#39; pid=2577 parent=2532 started>}
    
    p2 running...
    p2 running...
    kill <Process name=&#39;Process-2&#39; pid=2577 parent=2532 started>
    signum=17
    Launch a new process
    multiprocessing.active_children()=[<Process name=&#39;Process-2&#39; pid=2577 parent=2532 stopped exitcode=-SIGTERM>], count=1
    
    processes={&#39;case100&#39;: <Process name=&#39;Process-3&#39; pid=2675 parent=2532 started>}
    
    p3 running...
    p3 running...
    multiprocessing.active_children()=[<Process name=&#39;Process-3&#39; pid=2675 parent=2532 started>], count=1
    Salin selepas log masuk

    Ringkasan

    Faedah: Menggunakan semaphore boleh menangani masalah komunikasi antara pelbagai proses.

    Kelemahan: Kod sukar ditulis dan kod bertulis sukar difahami. Anda mesti biasa dengan penggunaan semafor, jika tidak, ia adalah mudah untuk menulis pepijat untuk diri sendiri (Semua pemula harus menggunakannya dengan berhati-hati, kecuali pemandu berpengalaman.)

    Perkara lain yang tidak difahami secara khusus ialah <. 🎜> menghantar isyarat adalah process.terminate() dan nombornya ialah 15, tetapi kali pertama SIGTERM menerima isyarat itu adalah nombor=17 Jika saya ingin mengendalikan isyarat 15, ia akan menyebabkan masalah yang sebelumnya proses tidak boleh dibunuh. Sesiapa yang biasa dengan semaphore dialu-alukan untuk memberi nasihat kepada kami. signal_handler

    Gunakan multiprocessing.Event untuk melaksanakan pemuatan panas konfigurasi

    Logik pelaksanaan ialah proses utama 1 mengemas kini konfigurasi dan menghantar arahan. Proses 2 memulakan tugas penjadualan.

    Pada masa ini, selepas proses utama 1 mengemas kini konfigurasi, ia menghantar arahan untuk memproses 2. Arahan pada masa ini ialah menggunakan Acara untuk memberitahu acara tak segerak.

    Pergi terus ke kod

    fungsi penjadual

    def scheduler():
        while True:
            print(&#39;wait message...&#39;)
            case_configurations = scheduler_notify_queue.get()
            print(f"Got case configurations {case_configurations=}...")
    
            task_schedule_event.set() # 设置set之后, is_set 为True
    
            print(f"Schedule will start ...")
            while task_schedule_event.is_set(): # is_set 为True的话,那么任务就会一直执行
                run(case_configurations)
    
            print("Clearing all scheduling job ...")
    Salin selepas log masuk

    fungsi penjadual_event

    def event_scheduler(case_config):
    
        scheduler_notify_queue.put(case_config)
        print(f"Put cases config to the Queue ...")
    
        task_schedule_event.clear() # clear之后,is_set 为False
        print(f"Clear scheduler jobs ...")
    
        print(f"Schedule job ...")
    Salin selepas log masuk

    Kod lengkap Keputusan pelaksanaan adalah seperti berikut

    import multiprocessing
    import time
    
    
    scheduler_notify_queue = multiprocessing.Queue()
    task_schedule_event = multiprocessing.Event()
    
    
    def run(case_configurations: str):
        print(f&#39;{case_configurations} running...&#39;)
        time.sleep(3)
    
    
    def scheduler():
        while True:
            print(&#39;wait message...&#39;)
            case_configurations = scheduler_notify_queue.get()
    
            print(f"Got case configurations {case_configurations=}...")
            task_schedule_event.set()
    
            print(f"Schedule will start ...")
            while task_schedule_event.is_set():
                run(case_configurations)
    
            print("Clearing all scheduling job ...")
    
    
    def event_scheduler(case_config: str):
    
        scheduler_notify_queue.put(case_config)
        print(f"Put cases config to the Queue ...")
    
        task_schedule_event.clear()
        print(f"Clear scheduler jobs ...")
    
        print(f"Schedule job ...")
    
    
    def main():
        scheduler_notify_queue.put(&#39;1&#39;)
        p = multiprocessing.Process(target=scheduler)
        p.start()
    
        count = 1
        print(f&#39;{count=}&#39;)
        while True:
            if count == 5:
                event_scheduler(&#39;100&#39;)
            if count == 10:
                event_scheduler(&#39;200&#39;)
            count += 1
            time.sleep(1)
    
    
    if __name__ == &#39;__main__&#39;:
        main()
    Salin selepas log masuk

    Keputusan pelaksanaan adalah seperti berikut

    wait message...
    Got case configurations case_configurations=&#39;1&#39;...
    Schedule will start ...
    1 running...
    1 running...
    Put cases config to the Queue ...
    Clear scheduler jobs ...
    Schedule job ...
    Clearing all scheduling job ...
    wait message...
    Got case configurations case_configurations=&#39;100&#39;...
    Schedule will start ...
    100 running...
    Put cases config to the Queue ...
    Clear scheduler jobs ...
    Schedule job ...
    Clearing all scheduling job ...
    wait message...
    Got case configurations case_configurations=&#39;200&#39;...
    Schedule will start ...
    200 running...
    200 running...
    Salin selepas log masuk

    Ringkasan

    Menggunakan pemberitahuan acara Acara, kod kurang terdedah kepada ralat, kurang kod ditulis dan lebih mudah dibaca. Berbanding dengan kaedah semaphore sebelumnya, adalah disyorkan agar anda menggunakan kaedah ini lebih kerap.

    Menggunakan multi-threading atau coroutine sebenarnya adalah sama dengan kaedah pelaksanaan di atas. Satu-satunya perbezaan ialah perpustakaan yang berbeza dipanggil,

    dan queue.event

    # threading
    scheduler_notify_queue = queue.Queue()
    task_schedule_event = threading.Event()
    
    # async
    scheduler_notify_queue = asyncio.Queue()
    task_schedule_event = asyncio.Event()
    Salin selepas log masuk

    Atas ialah kandungan terperinci Bagaimana untuk melaksanakan pemuatan panas fail konfigurasi dalam Python. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

    Label berkaitan:
    sumber:yisu.com
    Kenyataan Laman Web ini
    Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn
    Tutorial Popular
    Lagi>
    Muat turun terkini
    Lagi>
    kesan web
    Kod sumber laman web
    Bahan laman web
    Templat hujung hadapan