How to implement hot loading of configuration files in Python

WBOY
Release: 2023-05-07 18:31:20
forward
1744 people have browsed it

    Background

    Due to recent work requirements, it is necessary to add a new function to the existing project to implement the configuration hot reloading function. The so-called configuration hot reloading means that after the service receives the configuration update message, we can use the latest configuration to perform tasks without restarting the service.

    How to implement

    Below I use multi-process, multi-thread, and coroutine methods to implement hot reloading of configuration.

    Use multiple processes to implement configuration hot loading

    If we use multiple processes in code implementation, the main process 1 updates the configuration and sends instructions, and the task call is process 2. How to implement configuration hot loading Woolen cloth?

    Use signal semaphore to implement hot loading

    How to implement hot loading of configuration files in Python

    When the main process receives the configuration update message (how does the configuration read receive the configuration update message? ? We won’t discuss it here yet), the main process sends a kill signal to sub-process 1, sub-process 1 exits after receiving the kill signal, and then the signal processing function starts a new process, using the latest configuration file. Continue with the mission.

    main function

    def main():
        # 启动一个进程执行任务
        p1 = Process(target=run, args=("p1",))
        p1.start()
    
        monitor(p1, run) # 注册信号
        processes["case100"] = p1 #将进程pid保存
        num = 0 
        while True: # 模拟获取配置更新
            print(
                f"{multiprocessing.active_children()=}, count={len(multiprocessing.active_children())}\n")
            print(f"{processes=}\n")
            sleep(2)
            if num == 4:
                kill_process(processes["case100"]) # kill 当前进程
            if num == 8:
                kill_process(processes["case100"]) # kill 当前进程
            if num == 12:
                kill_process(processes["case100"]) # kill 当前进程
            num += 1
    Copy after login

    signal_handler function

    def signal_handler(process: Process, func, signum, frame):
        # print(f"{signum=}")
        global counts
    
        if signum == 17:  # 17 is SIGCHILD 
            # 这个循环是为了忽略SIGTERM发出的信号,避免抢占了主进程发出的SIGCHILD
            for signame in [SIGTERM, SIGCHLD, SIGQUIT]:
                signal.signal(signame, SIG_DFL)
    
            print("Launch a new process")
            p = multiprocessing.Process(target=func, args=(f"p{counts}",))
            p.start()
            monitor(p, run)
            processes["case100"] = p
            counts += 1
    
        if signum == 2:
            if process.is_alive():
                print(f"Kill {process} process")
                process.terminate()
            signal.signal(SIGCHLD, SIG_IGN)
            sys.exit("kill parent process")
    Copy after login

    The complete code is as follows

    #! /usr/local/bin/python3.8
    from multiprocessing import Process
    from typing import Dict
    import signal
    from signal import SIGCHLD, SIGTERM, SIGINT, SIGQUIT, SIG_DFL, SIG_IGN
    import multiprocessing
    from multiprocessing import Process
    from typing import Callable
    from data import processes
    import sys
    from functools import partial
    import time
    
    processes: Dict[str, Process] = {}
    counts = 2
    
    
    def run(process: Process):
        while True:
            print(f"{process} running...")
            time.sleep(1)
    
    
    def kill_process(process: Process):
        print(f"kill {process}")
        process.terminate()
    
    
    def monitor(process: Process, func: Callable):
        for signame in [SIGTERM, SIGCHLD, SIGINT, SIGQUIT]:
            # SIGTERM is kill signal.
            # No SIGCHILD is not trigger singnal_handler,
            # No SIGINT is not handler ctrl+c,
            # No SIGQUIT is RuntimeError: reentrant call inside <_io.BufferedWriter name=&#39;<stdout>&#39;>
            signal.signal(signame, partial(signal_handler, process, func))
    
    
    def signal_handler(process: Process, func, signum, frame):
        print(f"{signum=}")
        global counts
    
        if signum == 17:  # 17 is SIGTERM
            for signame in [SIGTERM, SIGCHLD, SIGQUIT]:
                signal.signal(signame, SIG_DFL)
            print("Launch a new process")
            p = multiprocessing.Process(target=func, args=(f"p{counts}",))
            p.start()
            monitor(p, run)
            processes["case100"] = p
            counts += 1
    
        if signum == 2:
            if process.is_alive():
                print(f"Kill {process} process")
                process.terminate()
            signal.signal(SIGCHLD, SIG_IGN)
            sys.exit("kill parent process")
    
    
    def main():
        p1 = Process(target=run, args=("p1",))
        p1.start()
        monitor(p1, run)
        processes["case100"] = p1
        num = 0
        while True:
            print(
                f"{multiprocessing.active_children()=}, count={len(multiprocessing.active_children())}\n")
            print(f"{processes=}\n")
            time.sleep(2)
            if num == 4:
                kill_process(processes["case100"])
            if num == 8:
                kill_process(processes["case100"])
            if num == 12:
                kill_process(processes["case100"])
            num += 1
    
    
    if __name__ == &#39;__main__&#39;:
        main()
    Copy after login

    The execution results are as follows

    multiprocessing.active_children()=[<Process name=&#39;Process-1&#39; pid=2533 parent=2532 started>], count=1
    
    processes={&#39;case100&#39;: <Process name=&#39;Process-1&#39; pid=2533 parent=2532 started>}
    
    p1 running...
    p1 running...
    kill <Process name=&#39;Process-1&#39; pid=2533 parent=2532 started>
    multiprocessing.active_children()=[<Process name=&#39;Process-1&#39; pid=2533 parent=2532 started>], count=1
    
    processes={&#39;case100&#39;: <Process name=&#39;Process-1&#39; pid=2533 parent=2532 started>}
    
    signum=17
    Launch a new process
    p2 running...
    p2 running...
    multiprocessing.active_children()=[<Process name=&#39;Process-2&#39; pid=2577 parent=2532 started>], count=1
    
    processes={&#39;case100&#39;: <Process name=&#39;Process-2&#39; pid=2577 parent=2532 started>}
    
    p2 running...
    p2 running...
    multiprocessing.active_children()=[<Process name=&#39;Process-2&#39; pid=2577 parent=2532 started>], count=1
    
    processes={&#39;case100&#39;: <Process name=&#39;Process-2&#39; pid=2577 parent=2532 started>}
    
    p2 running...
    p2 running...
    multiprocessing.active_children()=[<Process name=&#39;Process-2&#39; pid=2577 parent=2532 started>], count=1
    
    processes={&#39;case100&#39;: <Process name=&#39;Process-2&#39; pid=2577 parent=2532 started>}
    
    p2 running...
    p2 running...
    kill <Process name=&#39;Process-2&#39; pid=2577 parent=2532 started>
    signum=17
    Launch a new process
    multiprocessing.active_children()=[<Process name=&#39;Process-2&#39; pid=2577 parent=2532 stopped exitcode=-SIGTERM>], count=1
    
    processes={&#39;case100&#39;: <Process name=&#39;Process-3&#39; pid=2675 parent=2532 started>}
    
    p3 running...
    p3 running...
    multiprocessing.active_children()=[<Process name=&#39;Process-3&#39; pid=2675 parent=2532 started>], count=1
    Copy after login

    Summary

    Benefits: Using semaphores can handle communication problems between multiple processes.

    Disadvantages: The code is difficult to write, and the written code is difficult to understand. You must be familiar with the use of semaphores, otherwise it is easy to write a bug for yourself. (All beginners should use it with caution, except experienced drivers.)

    Another thing that is not particularly understood isprocess. terminate() The signal sent is SIGTERM number is 15, but the first time signal_handler receives the signal is number=17. If I want to process the signal of 15, This will lead to the problem that the previous process cannot be killed. Anyone who is familiar with semaphores is welcome to give us some advice. Thank you very much.

    Use multiprocessing.Event to implement configuration hot loading

    The implementation logic is that the main process 1 updates the configuration and sends instructions. Process 2 starts the scheduling task.

    At this time, after the main process 1 updates the configuration, it sends an instruction to process 2. The instruction at this time is to use Event to notify an asynchronous event.

    Go directly to the code

    scheduler function

    def scheduler():
        while True:
            print(&#39;wait message...&#39;)
            case_configurations = scheduler_notify_queue.get()
            print(f"Got case configurations {case_configurations=}...")
    
            task_schedule_event.set() # 设置set之后, is_set 为True
    
            print(f"Schedule will start ...")
            while task_schedule_event.is_set(): # is_set 为True的话,那么任务就会一直执行
                run(case_configurations)
    
            print("Clearing all scheduling job ...")
    Copy after login

    event_scheduler function

    def event_scheduler(case_config):
    
        scheduler_notify_queue.put(case_config)
        print(f"Put cases config to the Queue ...")
    
        task_schedule_event.clear() # clear之后,is_set 为False
        print(f"Clear scheduler jobs ...")
    
        print(f"Schedule job ...")
    Copy after login

    Complete code The execution results are as follows

    import multiprocessing
    import time
    
    
    scheduler_notify_queue = multiprocessing.Queue()
    task_schedule_event = multiprocessing.Event()
    
    
    def run(case_configurations: str):
        print(f&#39;{case_configurations} running...&#39;)
        time.sleep(3)
    
    
    def scheduler():
        while True:
            print(&#39;wait message...&#39;)
            case_configurations = scheduler_notify_queue.get()
    
            print(f"Got case configurations {case_configurations=}...")
            task_schedule_event.set()
    
            print(f"Schedule will start ...")
            while task_schedule_event.is_set():
                run(case_configurations)
    
            print("Clearing all scheduling job ...")
    
    
    def event_scheduler(case_config: str):
    
        scheduler_notify_queue.put(case_config)
        print(f"Put cases config to the Queue ...")
    
        task_schedule_event.clear()
        print(f"Clear scheduler jobs ...")
    
        print(f"Schedule job ...")
    
    
    def main():
        scheduler_notify_queue.put(&#39;1&#39;)
        p = multiprocessing.Process(target=scheduler)
        p.start()
    
        count = 1
        print(f&#39;{count=}&#39;)
        while True:
            if count == 5:
                event_scheduler(&#39;100&#39;)
            if count == 10:
                event_scheduler(&#39;200&#39;)
            count += 1
            time.sleep(1)
    
    
    if __name__ == &#39;__main__&#39;:
        main()
    Copy after login

    The execution results are as follows

    wait message...
    Got case configurations case_configurations=&#39;1&#39;...
    Schedule will start ...
    1 running...
    1 running...
    Put cases config to the Queue ...
    Clear scheduler jobs ...
    Schedule job ...
    Clearing all scheduling job ...
    wait message...
    Got case configurations case_configurations=&#39;100&#39;...
    Schedule will start ...
    100 running...
    Put cases config to the Queue ...
    Clear scheduler jobs ...
    Schedule job ...
    Clearing all scheduling job ...
    wait message...
    Got case configurations case_configurations=&#39;200&#39;...
    Schedule will start ...
    200 running...
    200 running...
    Copy after login

    Summary

    Using Event event notification, the code is less error-prone, less code is written, and is easier to read. Compared with the previous semaphore method, it is recommended that you use this method more often.

    Using multi-threading or coroutine is actually the same as the above implementation method. The only difference is that different libraries are called, queue and event.

    # threading
    scheduler_notify_queue = queue.Queue()
    task_schedule_event = threading.Event()
    
    # async
    scheduler_notify_queue = asyncio.Queue()
    task_schedule_event = asyncio.Event()
    Copy after login

    The above is the detailed content of How to implement hot loading of configuration files in Python. For more information, please follow other related articles on the PHP Chinese website!

    Related labels:
    source:yisu.com
    Statement of this Website
    The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
    Popular Tutorials
    More>
    Latest Downloads
    More>
    Web Effects
    Website Source Code
    Website Materials
    Front End Template