I am starting a daemon thread from the context manager which is supposed to send a heartbeat every second but since it is running in a thread , so it does not terminate the context manager if an exception occurs. How to raise exception in context manager when heartbeat stops?
from contextlib import contextmanager from threading import Thread, Event from time import sleep @contextmanager def plc(): stop_event = Event() try: # Send heartbeat every second hb_t = Thread(target=heartbeat_task, args=(stop_event,), daemon=True) hb_t.start() yield except Exception: raise finally: stop_event.set() hb_t.join() print("Heartbeat stopped") def heartbeat_task(stop_event): value = False while not stop_event.is_set(): value = not value print("Heartbeat: " + str(value)) sleep(1) def main(): with plc(): while True: print("Program running") sleep(5) if __name__ == '__main__': main()
I'm having trouble finding examples of this.
thanks for your help!
Update
I've modified the code to be more consistent with the code you posted. but:
The code you provided is inconsistent: heartbeat_task
is passed an event that, if set, will cause the function to return. But it is only set when the context manager in function main
created using with plc():
exits, which is never the case. The point of calling stop_event.set()
if you want any exception thrown by heartbeat_task
to force the context manager to exit and then be caught in function plc
Where? What if by definition we only get here when heartbeat_task
no longer exists due to an exception?
So either you want heartbeat_task
to run indefinitely until an exception is thrown (in which case there is no point in a "stop" event), or you want to be able to stop when some condition exists heartbeat_task
, but there is no code to do so. For demonstration purposes, I'm assuming main
will have access to the stop_event
event and set it under certain conditions. Otherwise, it will run until it detects that heartbeat_task
is no longer running, probably because it raised an exception (it is executing an infinite loop, so how could it terminate if the stop event has not been set yet?). The rest is why you need to use a context manager. I will propose an alternative later.
If you use a multi-thread pool (we only need one thread from the pool), then it becomes simple for the main thread to catch exceptions thrown by tasks submitted to the pool: when multiprocessing.pool.threadpool.apply_async
Returns a multiprocessing.pool.asyncresult
instance when called, indicating future completion. When the get
method is called on this instance, you can get the return value from the helper function (heartbeat_task
) or re-raise any exceptions thrown by the helper function. But we can also use the wait
method to wait for the completion of the submitted task or the elapsed time. We can then use the ready
method to test whether the submitted task after waiting 5 seconds is actually completed (due to an exception or return). If the task is still running then we can tell it to stop. In this demo, I force the task to throw an exception after about 7 seconds:
from contextlib import contextmanager from threading import event from multiprocessing.pool import threadpool from time import sleep @contextmanager def plc(): stop_event = event() pool = threadpool(1) # send heartbeat every second async_result = pool.apply_async(heartbeat_task, args=(stop_event,)) yield stop_event, async_result # we only return here if the task is no longer running try: # see if task threw an exception and if so, catch it: async_result.get() except exception as e: print("got exception:", e) finally: pool.close() pool.join() print("heartbeat stopped") def heartbeat_task(stop_event): # for demo purposes, we will force an exception to occur # after approximately 7 seconds: value = false n = 0 while not stop_event.is_set(): value = not value print("heartbeat: " + str(value)) sleep(1) n += 1 if n == 7: raise exception('oops!') def main(): with plc() as tpl: stop_event, async_result = tpl # this function could forcibly cause the heartbeat_task # to complete by calling stop_event.set() # loop while the task is still running while not async_result.ready(): """ if some_condition: stop_event.set() break """ print("program running") # sleep for 5 seconds or until heartbeat_task terminates: async_result.wait(5) if __name__ == '__main__': main()
Print:
program running heartbeat: true heartbeat: false heartbeat: true heartbeat: false heartbeat: true program running heartbeat: false heartbeat: true got exception: oops! heartbeat stopped
Alternatives to using context managers
from threading import Event from multiprocessing.pool import ThreadPool from time import sleep def heartbeat_task(stop_event): value = False n = 0 while not stop_event.is_set(): value = not value print("Heartbeat: " + str(value)) sleep(1) n += 1 if n == 7: raise Exception('Oops!') def main(): stop_event = Event() pool = ThreadPool(1) async_result = pool.apply_async(heartbeat_task, args=(stop_event,)) # Run as long as heartbeat_task is running: while not async_result.ready(): """ if some_condition: stop_event.set() break """ print("Program running") # Sleep for 5 seconds or until heartbeat_task terminates: async_result.wait(5) # Any exception thrown in heartbeat_task will be rethrown and caught here: try: async_result.get() except Exception as e: print("Got exception:", e) finally: pool.close() pool.join() if __name__ == '__main__': main()
The above is the detailed content of Context managers and daemon threads. For more information, please follow other related articles on the PHP Chinese website!