Comment implémenter le chargement à chaud des fichiers de configuration en Python

WBOY
Libérer: 2023-05-07 18:31:20
avant
1753 Les gens l'ont consulté

    Contexte

    En raison des exigences de travaux récentes, il est nécessaire d'ajouter une nouvelle fonction au projet existant pour implémenter la fonction de rechargement à chaud de la configuration. Le soi-disant rechargement à chaud de la configuration signifie qu'une fois que le service a reçu le message de mise à jour de la configuration, nous pouvons utiliser la dernière configuration pour effectuer des tâches sans redémarrer le service.

    Comment implémenter

    Ci-dessous, j'utilise des méthodes multi-processus, multi-thread et coroutine pour implémenter le rechargement à chaud de la configuration.

    Utiliser plusieurs processus pour implémenter le chargement à chaud de la configuration

    Si nous utilisons plusieurs processus dans l'implémentation du code, le processus principal 1 met à jour la configuration et envoie des instructions, et l'appel de tâche est le processus 2. Comment implémenter le chargement à chaud de la configuration ?

    Utilisez un sémaphore de signal pour implémenter le chargement à chaud

    Comment implémenter le chargement à chaud des fichiers de configuration en Python

    Lorsque le processus principal reçoit le message de mise à jour de la configuration (comment la lecture de configuration reçoit-elle le message de mise à jour de la configuration ? Nous n'en discuterons pas ici), le processus principal passe au sous-processus 1 envoie un signal kill. Le sous-processus 1 se termine après avoir reçu le signal kill. Ensuite, la fonction de traitement du signal démarre un nouveau processus et utilise le dernier fichier de configuration pour continuer à exécuter la tâche.

    fonction principale

    def main():
        # 启动一个进程执行任务
        p1 = Process(target=run, args=("p1",))
        p1.start()
    
        monitor(p1, run) # 注册信号
        processes["case100"] = p1 #将进程pid保存
        num = 0 
        while True: # 模拟获取配置更新
            print(
                f"{multiprocessing.active_children()=}, count={len(multiprocessing.active_children())}\n")
            print(f"{processes=}\n")
            sleep(2)
            if num == 4:
                kill_process(processes["case100"]) # kill 当前进程
            if num == 8:
                kill_process(processes["case100"]) # kill 当前进程
            if num == 12:
                kill_process(processes["case100"]) # kill 当前进程
            num += 1
    Copier après la connexion

    fonction signal_handler

    def signal_handler(process: Process, func, signum, frame):
        # print(f"{signum=}")
        global counts
    
        if signum == 17:  # 17 is SIGCHILD 
            # 这个循环是为了忽略SIGTERM发出的信号,避免抢占了主进程发出的SIGCHILD
            for signame in [SIGTERM, SIGCHLD, SIGQUIT]:
                signal.signal(signame, SIG_DFL)
    
            print("Launch a new process")
            p = multiprocessing.Process(target=func, args=(f"p{counts}",))
            p.start()
            monitor(p, run)
            processes["case100"] = p
            counts += 1
    
        if signum == 2:
            if process.is_alive():
                print(f"Kill {process} process")
                process.terminate()
            signal.signal(SIGCHLD, SIG_IGN)
            sys.exit("kill parent process")
    Copier après la connexion

    Le code complet est le suivant

    #! /usr/local/bin/python3.8
    from multiprocessing import Process
    from typing import Dict
    import signal
    from signal import SIGCHLD, SIGTERM, SIGINT, SIGQUIT, SIG_DFL, SIG_IGN
    import multiprocessing
    from multiprocessing import Process
    from typing import Callable
    from data import processes
    import sys
    from functools import partial
    import time
    
    processes: Dict[str, Process] = {}
    counts = 2
    
    
    def run(process: Process):
        while True:
            print(f"{process} running...")
            time.sleep(1)
    
    
    def kill_process(process: Process):
        print(f"kill {process}")
        process.terminate()
    
    
    def monitor(process: Process, func: Callable):
        for signame in [SIGTERM, SIGCHLD, SIGINT, SIGQUIT]:
            # SIGTERM is kill signal.
            # No SIGCHILD is not trigger singnal_handler,
            # No SIGINT is not handler ctrl+c,
            # No SIGQUIT is RuntimeError: reentrant call inside <_io.BufferedWriter name=&#39;<stdout>&#39;>
            signal.signal(signame, partial(signal_handler, process, func))
    
    
    def signal_handler(process: Process, func, signum, frame):
        print(f"{signum=}")
        global counts
    
        if signum == 17:  # 17 is SIGTERM
            for signame in [SIGTERM, SIGCHLD, SIGQUIT]:
                signal.signal(signame, SIG_DFL)
            print("Launch a new process")
            p = multiprocessing.Process(target=func, args=(f"p{counts}",))
            p.start()
            monitor(p, run)
            processes["case100"] = p
            counts += 1
    
        if signum == 2:
            if process.is_alive():
                print(f"Kill {process} process")
                process.terminate()
            signal.signal(SIGCHLD, SIG_IGN)
            sys.exit("kill parent process")
    
    
    def main():
        p1 = Process(target=run, args=("p1",))
        p1.start()
        monitor(p1, run)
        processes["case100"] = p1
        num = 0
        while True:
            print(
                f"{multiprocessing.active_children()=}, count={len(multiprocessing.active_children())}\n")
            print(f"{processes=}\n")
            time.sleep(2)
            if num == 4:
                kill_process(processes["case100"])
            if num == 8:
                kill_process(processes["case100"])
            if num == 12:
                kill_process(processes["case100"])
            num += 1
    
    
    if __name__ == &#39;__main__&#39;:
        main()
    Copier après la connexion

    Les résultats de l'exécution sont les suivants

    multiprocessing.active_children()=[<Process name=&#39;Process-1&#39; pid=2533 parent=2532 started>], count=1
    
    processes={&#39;case100&#39;: <Process name=&#39;Process-1&#39; pid=2533 parent=2532 started>}
    
    p1 running...
    p1 running...
    kill <Process name=&#39;Process-1&#39; pid=2533 parent=2532 started>
    multiprocessing.active_children()=[<Process name=&#39;Process-1&#39; pid=2533 parent=2532 started>], count=1
    
    processes={&#39;case100&#39;: <Process name=&#39;Process-1&#39; pid=2533 parent=2532 started>}
    
    signum=17
    Launch a new process
    p2 running...
    p2 running...
    multiprocessing.active_children()=[<Process name=&#39;Process-2&#39; pid=2577 parent=2532 started>], count=1
    
    processes={&#39;case100&#39;: <Process name=&#39;Process-2&#39; pid=2577 parent=2532 started>}
    
    p2 running...
    p2 running...
    multiprocessing.active_children()=[<Process name=&#39;Process-2&#39; pid=2577 parent=2532 started>], count=1
    
    processes={&#39;case100&#39;: <Process name=&#39;Process-2&#39; pid=2577 parent=2532 started>}
    
    p2 running...
    p2 running...
    multiprocessing.active_children()=[<Process name=&#39;Process-2&#39; pid=2577 parent=2532 started>], count=1
    
    processes={&#39;case100&#39;: <Process name=&#39;Process-2&#39; pid=2577 parent=2532 started>}
    
    p2 running...
    p2 running...
    kill <Process name=&#39;Process-2&#39; pid=2577 parent=2532 started>
    signum=17
    Launch a new process
    multiprocessing.active_children()=[<Process name=&#39;Process-2&#39; pid=2577 parent=2532 stopped exitcode=-SIGTERM>], count=1
    
    processes={&#39;case100&#39;: <Process name=&#39;Process-3&#39; pid=2675 parent=2532 started>}
    
    p3 running...
    p3 running...
    multiprocessing.active_children()=[<Process name=&#39;Process-3&#39; pid=2675 parent=2532 started>], count=1
    Copier après la connexion

    Résumé

    Avantage s : L'utilisation de sémaphores peut gérer la communication entre plusieurs problème de processus.

    Inconvénients : Le code est difficile à écrire et le code écrit est difficile à comprendre. Vous devez être familier avec l'utilisation des sémaphores, sinon il est facile d'écrire un bug pour vous-même. (Tous les débutants doivent l'utiliser avec prudence, à l'exception des pilotes expérimentés.)

    Une autre chose qui n'est pas particulièrement comprise est le processus . terminate()< /code> Le signal envoyé est <code>SIGTERM et le nombre est 15, mais la première fois que signal_handler reçoit le signal, le numéro = 17. le signal de 15, alors cela entraînera le problème que le processus précédent ne peut pas être tué. Tous ceux qui connaissent les sémaphores sont invités à nous donner quelques conseils. Merci beaucoup. process.terminate() 发送出信号是SIGTERM number是15,但是第一次signal_handler收到信号却是number=17,如果我要去处理15的信号,就会导致前一个进程不能kill掉的问题。欢迎有对信号量比较熟悉的大佬,前来指点迷津,不甚感谢。

    采用multiprocessing.Event 来实现配置热加载

    实现逻辑是主进程1 更新配置并发送指令。进程2启动调度任务。

    这时候当主进程1更新好配置之后,发送指令给进程2,这时候的指令就是用Event一个异步事件通知。

    直接上代码

    scheduler 函数

    def scheduler():
        while True:
            print(&#39;wait message...&#39;)
            case_configurations = scheduler_notify_queue.get()
            print(f"Got case configurations {case_configurations=}...")
    
            task_schedule_event.set() # 设置set之后, is_set 为True
    
            print(f"Schedule will start ...")
            while task_schedule_event.is_set(): # is_set 为True的话,那么任务就会一直执行
                run(case_configurations)
    
            print("Clearing all scheduling job ...")
    Copier après la connexion

    event_scheduler 函数

    def event_scheduler(case_config):
    
        scheduler_notify_queue.put(case_config)
        print(f"Put cases config to the Queue ...")
    
        task_schedule_event.clear() # clear之后,is_set 为False
        print(f"Clear scheduler jobs ...")
    
        print(f"Schedule job ...")
    Copier après la connexion

    完整代码如下

    import multiprocessing
    import time
    
    
    scheduler_notify_queue = multiprocessing.Queue()
    task_schedule_event = multiprocessing.Event()
    
    
    def run(case_configurations: str):
        print(f&#39;{case_configurations} running...&#39;)
        time.sleep(3)
    
    
    def scheduler():
        while True:
            print(&#39;wait message...&#39;)
            case_configurations = scheduler_notify_queue.get()
    
            print(f"Got case configurations {case_configurations=}...")
            task_schedule_event.set()
    
            print(f"Schedule will start ...")
            while task_schedule_event.is_set():
                run(case_configurations)
    
            print("Clearing all scheduling job ...")
    
    
    def event_scheduler(case_config: str):
    
        scheduler_notify_queue.put(case_config)
        print(f"Put cases config to the Queue ...")
    
        task_schedule_event.clear()
        print(f"Clear scheduler jobs ...")
    
        print(f"Schedule job ...")
    
    
    def main():
        scheduler_notify_queue.put(&#39;1&#39;)
        p = multiprocessing.Process(target=scheduler)
        p.start()
    
        count = 1
        print(f&#39;{count=}&#39;)
        while True:
            if count == 5:
                event_scheduler(&#39;100&#39;)
            if count == 10:
                event_scheduler(&#39;200&#39;)
            count += 1
            time.sleep(1)
    
    
    if __name__ == &#39;__main__&#39;:
        main()
    Copier après la connexion

    执行结果如下

    wait message...
    Got case configurations case_configurations=&#39;1&#39;...
    Schedule will start ...
    1 running...
    1 running...
    Put cases config to the Queue ...
    Clear scheduler jobs ...
    Schedule job ...
    Clearing all scheduling job ...
    wait message...
    Got case configurations case_configurations=&#39;100&#39;...
    Schedule will start ...
    100 running...
    Put cases config to the Queue ...
    Clear scheduler jobs ...
    Schedule job ...
    Clearing all scheduling job ...
    wait message...
    Got case configurations case_configurations=&#39;200&#39;...
    Schedule will start ...
    200 running...
    200 running...
    Copier après la connexion

    总结

    使用Event事件通知,代码不易出错,代码编写少,易读。相比之前信号量的方法,推荐大家多使用这种方式。

    使用多线程或协程的方式,其实和上述实现方式一致。唯一区别就是调用了不同库中,queueevent

    Utilisation de multiprocessing.Event pour implémenter le chargement à chaud de la configuration🎜🎜La logique d'implémentation est que le processus principal 1 met à jour la configuration et envoie des instructions. Le processus 2 démarre la tâche de planification. 🎜🎜À ce stade, après que le processus principal 1 a mis à jour la configuration, il envoie une instruction au processus 2. L'instruction à ce moment est d'utiliser Event pour notifier un événement asynchrone. 🎜🎜Accédez directement au code🎜🎜🎜fonction de planification🎜🎜
    # threading
    scheduler_notify_queue = queue.Queue()
    task_schedule_event = threading.Event()
    
    # async
    scheduler_notify_queue = asyncio.Queue()
    task_schedule_event = asyncio.Event()
    Copier après la connexion
    🎜🎜fonction event_scheduler🎜🎜rrreee🎜🎜Le code complet est le suivant🎜🎜rrreee🎜Les résultats d'exécution sont les suivants🎜rrreee🎜🎜 Résumé🎜🎜 🎜Utiliser l'événement notification d'événement, le code est moins sujet aux erreurs, le code est moins écrit et plus facile à lire. Par rapport à la méthode sémaphore précédente, il est recommandé d'utiliser cette méthode plus souvent. 🎜🎜L'utilisation du multi-threading ou de la coroutine est en fait la même que la méthode d'implémentation ci-dessus. La seule différence est que différentes bibliothèques sont appelées, queue et event.🎜rrreee

    Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

    Étiquettes associées:
    source:yisu.com
    Déclaration de ce site Web
    Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
    Tutoriels populaires
    Plus>
    Derniers téléchargements
    Plus>
    effets Web
    Code source du site Web
    Matériel du site Web
    Modèle frontal