A recent work requirement called for adding a new capability to an existing project: configuration hot reloading. Hot reloading means that when the service receives a configuration update message, it can carry on its tasks using the latest configuration without being restarted.
Below I implement configuration hot reloading using multi-process, multi-thread, and coroutine approaches.
If we implement this with multiple processes, the main process (process 1) updates the configuration and sends instructions, and a child process (process 2) runs the task. How, then, do we implement configuration hot reloading?
When the main process receives a configuration update message (how that message is received is not discussed here), it sends a kill signal to child process 1. Child process 1 exits upon receiving the kill signal, and the signal handler then starts a new process that continues the task with the latest configuration file.
main function
def main():
    # Start a process to run the task
    p1 = Process(target=run, args=("p1",))
    p1.start()
    monitor(p1, run)  # Register the signal handlers
    processes["case100"] = p1  # Keep a reference to the process
    num = 0
    while True:  # Simulate receiving configuration updates
        print(f"{multiprocessing.active_children()=}, count={len(multiprocessing.active_children())}\n")
        print(f"{processes=}\n")
        time.sleep(2)
        if num == 4:
            kill_process(processes["case100"])  # kill the current process
        if num == 8:
            kill_process(processes["case100"])  # kill the current process
        if num == 12:
            kill_process(processes["case100"])  # kill the current process
        num += 1
signal_handler function
def signal_handler(process: Process, func, signum, frame):
    # print(f"{signum=}")
    global counts
    if signum == 17:  # 17 is SIGCHLD
        # Reset these handlers to the default so the SIGTERM sent to the child
        # does not preempt the SIGCHLD delivered to the main process
        for signame in [SIGTERM, SIGCHLD, SIGQUIT]:
            signal.signal(signame, SIG_DFL)
        print("Launch a new process")
        p = multiprocessing.Process(target=func, args=(f"p{counts}",))
        p.start()
        monitor(p, run)
        processes["case100"] = p
        counts += 1
    if signum == 2:  # 2 is SIGINT (Ctrl+C)
        if process.is_alive():
            print(f"Kill {process} process")
            process.terminate()
        signal.signal(SIGCHLD, SIG_IGN)
        sys.exit("kill parent process")
The complete code is as follows
#! /usr/local/bin/python3.8
import multiprocessing
from multiprocessing import Process
import signal
from signal import SIGCHLD, SIGTERM, SIGINT, SIGQUIT, SIG_DFL, SIG_IGN
import sys
import time
from functools import partial
from typing import Callable, Dict

processes: Dict[str, Process] = {}
counts = 2


def run(name: str):
    # Receives the process name ("p1", "p2", ...) passed via args
    while True:
        print(f"{name} running...")
        time.sleep(1)


def kill_process(process: Process):
    print(f"kill {process}")
    process.terminate()


def monitor(process: Process, func: Callable):
    # SIGTERM is the kill signal.
    # Without SIGCHLD, signal_handler is never triggered.
    # Without SIGINT, Ctrl+C is not handled.
    # Without SIGQUIT: RuntimeError: reentrant call inside <_io.BufferedWriter name='<stdout>'>
    for signame in [SIGTERM, SIGCHLD, SIGINT, SIGQUIT]:
        signal.signal(signame, partial(signal_handler, process, func))


def signal_handler(process: Process, func, signum, frame):
    print(f"{signum=}")
    global counts
    if signum == 17:  # 17 is SIGCHLD
        for signame in [SIGTERM, SIGCHLD, SIGQUIT]:
            signal.signal(signame, SIG_DFL)
        print("Launch a new process")
        p = multiprocessing.Process(target=func, args=(f"p{counts}",))
        p.start()
        monitor(p, run)
        processes["case100"] = p
        counts += 1
    if signum == 2:  # 2 is SIGINT (Ctrl+C)
        if process.is_alive():
            print(f"Kill {process} process")
            process.terminate()
        signal.signal(SIGCHLD, SIG_IGN)
        sys.exit("kill parent process")


def main():
    p1 = Process(target=run, args=("p1",))
    p1.start()
    monitor(p1, run)
    processes["case100"] = p1
    num = 0
    while True:
        print(f"{multiprocessing.active_children()=}, count={len(multiprocessing.active_children())}\n")
        print(f"{processes=}\n")
        time.sleep(2)
        if num == 4:
            kill_process(processes["case100"])
        if num == 8:
            kill_process(processes["case100"])
        if num == 12:
            kill_process(processes["case100"])
        num += 1


if __name__ == '__main__':
    main()
The execution results are as follows
multiprocessing.active_children()=[<Process name='Process-1' pid=2533 parent=2532 started>], count=1
processes={'case100': <Process name='Process-1' pid=2533 parent=2532 started>}
p1 running...
p1 running...
kill <Process name='Process-1' pid=2533 parent=2532 started>
multiprocessing.active_children()=[<Process name='Process-1' pid=2533 parent=2532 started>], count=1
processes={'case100': <Process name='Process-1' pid=2533 parent=2532 started>}
signum=17
Launch a new process
p2 running...
p2 running...
multiprocessing.active_children()=[<Process name='Process-2' pid=2577 parent=2532 started>], count=1
processes={'case100': <Process name='Process-2' pid=2577 parent=2532 started>}
p2 running...
p2 running...
multiprocessing.active_children()=[<Process name='Process-2' pid=2577 parent=2532 started>], count=1
processes={'case100': <Process name='Process-2' pid=2577 parent=2532 started>}
p2 running...
p2 running...
multiprocessing.active_children()=[<Process name='Process-2' pid=2577 parent=2532 started>], count=1
processes={'case100': <Process name='Process-2' pid=2577 parent=2532 started>}
p2 running...
p2 running...
kill <Process name='Process-2' pid=2577 parent=2532 started>
signum=17
Launch a new process
multiprocessing.active_children()=[<Process name='Process-2' pid=2577 parent=2532 stopped exitcode=-SIGTERM>], count=1
processes={'case100': <Process name='Process-3' pid=2675 parent=2532 started>}
p3 running...
p3 running...
multiprocessing.active_children()=[<Process name='Process-3' pid=2675 parent=2532 started>], count=1
Summary
Advantages: signals solve the communication problem between processes.
Disadvantages: the code is hard to write and the result is hard to read. You must be familiar with how signals work, otherwise it is easy to write bugs for yourself. (Beginners should use this approach with caution; seasoned veterans excepted.)
One thing I still don't fully understand: process.terminate() sends SIGTERM, whose number is 15, yet the first signal that signal_handler receives is number=17. If I try to handle signal 15 instead, the old process cannot be killed. Anyone familiar with signals is welcome to offer advice, thank you very much.
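One likely explanation: process.terminate() delivers SIGTERM to the child process, not to the parent, so the parent's handler never sees number 15 at all; what the parent sees is the SIGCHLD (17) that the kernel raises once the child exits. A minimal sketch to observe this on Linux (the helper names here are illustrative, not part of the project code):

import multiprocessing
import signal
import time

def child():
    time.sleep(60)

def on_signal(signum, frame):
    # Only SIGCHLD arrives here; the child's SIGTERM is never seen by the parent
    print(f"parent got signal {signum} ({signal.Signals(signum).name})")

if __name__ == '__main__':
    signal.signal(signal.SIGCHLD, on_signal)
    p = multiprocessing.Process(target=child)
    p.start()
    time.sleep(1)
    p.terminate()  # sends SIGTERM (15) to the child, not to the parent
    p.join()       # the parent receives SIGCHLD (17) when the child exits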
The second implementation's logic: main process 1 updates the configuration and sends an instruction, and process 2 runs the scheduled task.
After main process 1 updates the configuration, it sends an instruction to process 2. Here the instruction is delivered by using an Event to signal an asynchronous event.
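For context, an Event is essentially a boolean flag shared between processes: set() raises it, clear() lowers it, and is_set() reads it. A minimal sketch of these semantics (not part of the project code):

import multiprocessing

event = multiprocessing.Event()
print(event.is_set())  # False: the flag starts cleared
event.set()
print(event.is_set())  # True after set()
event.clear()
print(event.is_set())  # False again after clear()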
Let's go straight to the code.
scheduler function
def scheduler():
    while True:
        print('wait message...')
        case_configurations = scheduler_notify_queue.get()
        print(f"Got case configurations {case_configurations=}...")
        task_schedule_event.set()  # after set(), is_set() is True
        print(f"Schedule will start ...")
        while task_schedule_event.is_set():  # the task keeps running while is_set() is True
            run(case_configurations)
        print("Clearing all scheduling job ...")
event_scheduler function
def event_scheduler(case_config):
    scheduler_notify_queue.put(case_config)
    print(f"Put cases config to the Queue ...")
    task_schedule_event.clear()  # after clear(), is_set() is False
    print(f"Clear scheduler jobs ...")
    print(f"Schedule job ...")
The complete code is as follows
import multiprocessing
import time

scheduler_notify_queue = multiprocessing.Queue()
task_schedule_event = multiprocessing.Event()


def run(case_configurations: str):
    print(f'{case_configurations} running...')
    time.sleep(3)


def scheduler():
    while True:
        print('wait message...')
        case_configurations = scheduler_notify_queue.get()
        print(f"Got case configurations {case_configurations=}...")
        task_schedule_event.set()
        print(f"Schedule will start ...")
        while task_schedule_event.is_set():
            run(case_configurations)
        print("Clearing all scheduling job ...")


def event_scheduler(case_config: str):
    scheduler_notify_queue.put(case_config)
    print(f"Put cases config to the Queue ...")
    task_schedule_event.clear()
    print(f"Clear scheduler jobs ...")
    print(f"Schedule job ...")


def main():
    scheduler_notify_queue.put('1')
    p = multiprocessing.Process(target=scheduler)
    p.start()
    count = 1
    print(f'{count=}')
    while True:
        if count == 5:
            event_scheduler('100')
        if count == 10:
            event_scheduler('200')
        count += 1
        time.sleep(1)


if __name__ == '__main__':
    main()
The execution results are as follows
wait message...
Got case configurations case_configurations='1'...
Schedule will start ...
1 running...
1 running...
Put cases config to the Queue ...
Clear scheduler jobs ...
Schedule job ...
Clearing all scheduling job ...
wait message...
Got case configurations case_configurations='100'...
Schedule will start ...
100 running...
Put cases config to the Queue ...
Clear scheduler jobs ...
Schedule job ...
Clearing all scheduling job ...
wait message...
Got case configurations case_configurations='200'...
Schedule will start ...
200 running...
200 running...
Summary
With Event notification, the code is shorter, less error-prone, and easier to read. Compared with the signal-based approach above, this is the method I recommend.
The multi-threading and coroutine implementations are essentially the same as the one above; the only difference is that the queue and event come from different libraries.
# threading
scheduler_notify_queue = queue.Queue()
task_schedule_event = threading.Event()

# async
scheduler_notify_queue = asyncio.Queue()
task_schedule_event = asyncio.Event()
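As a concrete illustration, below is a minimal threading version of the same scheduler pattern, assuming run(), scheduler(), and event_scheduler() keep essentially the same bodies as in the multiprocessing version (the asyncio variant would additionally need to await the queue operations inside coroutines):

import queue
import threading
import time

scheduler_notify_queue = queue.Queue()
task_schedule_event = threading.Event()

def run(case_configurations: str):
    print(f'{case_configurations} running...')
    time.sleep(3)

def scheduler():
    while True:
        case_configurations = scheduler_notify_queue.get()
        task_schedule_event.set()
        while task_schedule_event.is_set():
            run(case_configurations)

def event_scheduler(case_config: str):
    scheduler_notify_queue.put(case_config)
    task_schedule_event.clear()

def main():
    scheduler_notify_queue.put('1')
    # daemon=True so the scheduler thread exits together with the main thread
    t = threading.Thread(target=scheduler, daemon=True)
    t.start()
    count = 1
    while True:
        if count == 5:
            event_scheduler('100')
        count += 1
        time.sleep(1)

if __name__ == '__main__':
    main()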