Je démarre un thread démon à partir d'un gestionnaire de contexte qui est censé envoyer un battement de cœur toutes les secondes, mais comme il s'exécute dans un thread, il ne termine pas le gestionnaire de contexte si une exception se produit. Comment déclencher une exception dans le gestionnaire de contexte lorsque le battement de cœur s'arrête ?
from contextlib import contextmanager from threading import Thread, Event from time import sleep @contextmanager def plc(): stop_event = Event() try: # Send heartbeat every second hb_t = Thread(target=heartbeat_task, args=(stop_event,), daemon=True) hb_t.start() yield except Exception: raise finally: stop_event.set() hb_t.join() print("Heartbeat stopped") def heartbeat_task(stop_event): value = False while not stop_event.is_set(): value = not value print("Heartbeat: " + str(value)) sleep(1) def main(): with plc(): while True: print("Program running") sleep(5) if __name__ == '__main__': main()
J'ai du mal à trouver des exemples de cela.
Merci pour votre aide !
mise à jour
J'ai modifié le code pour qu'il soit plus cohérent avec le code que vous avez publié. Mais :
Le code que vous avez fourni est incohérent : heartbeat_task
传递了一个事件,如果设置该事件将导致函数返回。但只有当使用 with plc():
创建的函数 main
中的上下文管理器退出时才会设置它,而这是永远不会的。如果您希望 heartbeat_task
抛出的任何异常将强制上下文管理器退出,然后在函数 plc
中捕获,那么调用 stop_event.set()
的意义何在?如果根据定义,我们仅在 heartbeat_task
Êtes-vous arrivé ici uniquement lorsqu'il n'existe plus en raison d'une exception ?
Donc soit vous voulez heartbeat_task
无限期地运行,直到引发异常(在这种情况下,没有“停止”事件的意义),要么您希望能够在存在某些条件时停止 heartbeat_task
,但没有这样做的代码。出于演示目的,我假设 main
将有权访问 stop_event
事件,并在某些情况下对其进行设置。否则,它会一直运行,直到检测到 heartbeat_task
ne plus jamais s'exécuter, probablement parce qu'il a levé une exception (il exécute une boucle infinie, alors comment peut-il se terminer si l'événement d'arrêt n'a pas encore été défini ?). Le reste est la raison pour laquelle vous devez utiliser un gestionnaire de contexte. Je proposerai une alternative plus tard.
Si vous utilisez un pool multi-thread (nous n'avons besoin que d'un seul thread dans le pool), alors il devient simple pour le thread principal d'intercepter l'exception levée par la tâche soumise au pool : lorsque la méthode multiprocessing.pool.threadpool.apply_async
被调用时返回 multiprocessing.pool.asyncresult
实例,表示未来的完成。当在此实例上调用 get
方法时,您可以从辅助函数 (heartbeat_task
) 获取返回值,或者重新引发辅助函数引发的任何异常。但是我们也可以使用 wait
方法来等待提交任务的完成或经过的时间。然后我们可以使用 ready
teste si la tâche soumise après avoir attendu 5 secondes, il est réellement terminé (en raison d'une exception ou d'un retour). Si la tâche est toujours en cours d'exécution, nous pouvons lui dire de s'arrêter. Dans cette démo, je force la tâche à lever une exception après environ 7 secondes :
from contextlib import contextmanager from threading import event from multiprocessing.pool import threadpool from time import sleep @contextmanager def plc(): stop_event = event() pool = threadpool(1) # send heartbeat every second async_result = pool.apply_async(heartbeat_task, args=(stop_event,)) yield stop_event, async_result # we only return here if the task is no longer running try: # see if task threw an exception and if so, catch it: async_result.get() except exception as e: print("got exception:", e) finally: pool.close() pool.join() print("heartbeat stopped") def heartbeat_task(stop_event): # for demo purposes, we will force an exception to occur # after approximately 7 seconds: value = false n = 0 while not stop_event.is_set(): value = not value print("heartbeat: " + str(value)) sleep(1) n += 1 if n == 7: raise exception('oops!') def main(): with plc() as tpl: stop_event, async_result = tpl # this function could forcibly cause the heartbeat_task # to complete by calling stop_event.set() # loop while the task is still running while not async_result.ready(): """ if some_condition: stop_event.set() break """ print("program running") # sleep for 5 seconds or until heartbeat_task terminates: async_result.wait(5) if __name__ == '__main__': main()
Impression :
program running heartbeat: true heartbeat: false heartbeat: true heartbeat: false heartbeat: true program running heartbeat: false heartbeat: true got exception: oops! heartbeat stopped
Une alternative à l'utilisation des gestionnaires de contexte
from threading import Event from multiprocessing.pool import ThreadPool from time import sleep def heartbeat_task(stop_event): value = False n = 0 while not stop_event.is_set(): value = not value print("Heartbeat: " + str(value)) sleep(1) n += 1 if n == 7: raise Exception('Oops!') def main(): stop_event = Event() pool = ThreadPool(1) async_result = pool.apply_async(heartbeat_task, args=(stop_event,)) # Run as long as heartbeat_task is running: while not async_result.ready(): """ if some_condition: stop_event.set() break """ print("Program running") # Sleep for 5 seconds or until heartbeat_task terminates: async_result.wait(5) # Any exception thrown in heartbeat_task will be rethrown and caught here: try: async_result.get() except Exception as e: print("Got exception:", e) finally: pool.close() pool.join() if __name__ == '__main__': main()
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!