Context managers and daemon threads

WBOY
Release: 2024-02-06 11:27:08

Question content

I am starting a daemon thread from a context manager. The thread is supposed to send a heartbeat every second, but because it runs in a separate thread, an exception raised in it does not terminate the context manager. How can I raise an exception in the context manager when the heartbeat stops?

from contextlib import contextmanager
from threading import Thread, Event
from time import sleep


@contextmanager
def plc():
    stop_event = Event()

    try:
        # Send heartbeat every second
        hb_t = Thread(target=heartbeat_task,
                      args=(stop_event,),
                      daemon=True)
        hb_t.start()

        yield
    except Exception:
        raise
    finally:
        stop_event.set()
        hb_t.join()
        print("Heartbeat stopped")


def heartbeat_task(stop_event):

    value = False
    
    while not stop_event.is_set():

        value = not value

        print("Heartbeat: " + str(value))

        sleep(1)


def main():

    with plc():

        while True:

            print("Program running")

            sleep(5)

if __name__ == '__main__':
    main()

I'm having trouble finding examples of this.

thanks for your help!


Correct answer


Update

I have modified the code to be more consistent with the code you posted. But:

The code you provided is inconsistent: heartbeat_task is passed an event that, if set, causes the function to return. But that event is only set when the context manager created in main with "with plc():" exits, which never happens. And what is the point of calling stop_event.set() if you are hoping that an exception thrown by heartbeat_task will force the context manager to exit and then be caught in plc? By definition, we would only get there when heartbeat_task is no longer running because of an exception.

So either you want heartbeat_task to run indefinitely until an exception is thrown (in which case there is no point in a "stop" event), or you want to be able to stop heartbeat_task when some condition exists, but there is no code to do so. For demonstration purposes, I am assuming that main has access to the stop_event event and sets it under certain conditions. Otherwise, main runs until it detects that heartbeat_task is no longer running, presumably because it raised an exception (it executes an infinite loop, so how else could it terminate if the stop event has not been set?). That leaves the question of why you need a context manager at all. I will propose an alternative later.
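The underlying problem from the question can be reproduced in a few lines. The following is only a minimal sketch: failing_heartbeat and start_and_join are hypothetical names, and the task fails immediately instead of looping. It shows that an exception raised inside a plain threading.Thread is not re-raised in the thread that started it.

```python
from threading import Thread


def failing_heartbeat():
    # Hypothetical stand-in for a heartbeat task that fails.
    raise RuntimeError("heartbeat failed")


def start_and_join():
    propagated = False
    t = Thread(target=failing_heartbeat, daemon=True)
    try:
        t.start()
        t.join()  # returns normally even though the target raised
    except RuntimeError:
        propagated = True  # never reached: the exception stays in the worker
    return propagated, t.is_alive()
```

By default the worker's traceback is only printed to stderr (via threading.excepthook), so the starting thread has to either poll is_alive() or use a mechanism that carries the exception back to it.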

If you use a multithreading pool (we only need one thread in the pool), it becomes simple for the main thread to catch exceptions thrown by a task submitted to the pool. When multiprocessing.pool.ThreadPool.apply_async is called, it returns a multiprocessing.pool.AsyncResult instance that represents a future completion. Calling the get method on this instance either returns the worker function's (heartbeat_task) return value or re-raises any exception the worker function threw. But we can also use the wait method to wait for either the completion of the submitted task or the expiration of a timeout. After waiting for 5 seconds, we can use the ready method to test whether the submitted task has actually completed (due to an exception or a return). If the task is still running, we can tell it to stop. In this demo, I force the task to throw an exception after about 7 seconds:

from contextlib import contextmanager
from threading import Event
from multiprocessing.pool import ThreadPool
from time import sleep


@contextmanager
def plc():
    stop_event = Event()
    pool = ThreadPool(1)

    # send heartbeat every second
    async_result = pool.apply_async(heartbeat_task, args=(stop_event,))
    yield stop_event, async_result
    # we only return here if the task is no longer running
    try:
        # see if task threw an exception and if so, catch it:
        async_result.get()
    except Exception as e:
        print("got exception:", e)
    finally:
        pool.close()
        pool.join()
        print("heartbeat stopped")


def heartbeat_task(stop_event):
    # for demo purposes, we will force an exception to occur
    # after approximately 7 seconds:
    value = False

    n = 0
    while not stop_event.is_set():
        value = not value
        print("heartbeat: " + str(value))
        sleep(1)
        n += 1
        if n == 7:
            raise Exception('oops!')


def main():
    with plc() as tpl:
        stop_event, async_result = tpl
        # this function could forcibly cause the heartbeat_task
        # to complete by calling stop_event.set()

        # loop while the task is still running
        while not async_result.ready():
            # if some_condition:
            #     stop_event.set()
            #     break
            print("program running")
            # sleep for 5 seconds or until heartbeat_task terminates:
            async_result.wait(5)

if __name__ == '__main__':
    main()

Prints:

program running
heartbeat: True
heartbeat: False
heartbeat: True
heartbeat: False
heartbeat: True
program running
heartbeat: False
heartbeat: True
got exception: oops!
heartbeat stopped
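The context manager above catches the heartbeat exception and prints it. If the goal is for the exception to propagate out of the with block, as the question asks, one possible variant is sketched below. This is not part of the original answer: HeartbeatError, failing_heartbeat, plc_reraising and run_until_heartbeat_stops are hypothetical names, and the short intervals are chosen only so that the demo finishes quickly.

```python
from contextlib import contextmanager
from multiprocessing.pool import ThreadPool
from threading import Event
from time import sleep


class HeartbeatError(Exception):
    """Hypothetical error type used only for this sketch."""


def failing_heartbeat(stop_event, beats_before_failure=3, interval=0.05):
    # Demo task: fails after a few beats unless it is stopped first.
    n = 0
    while not stop_event.is_set():
        sleep(interval)
        n += 1
        if n == beats_before_failure:
            raise HeartbeatError("heartbeat lost")


@contextmanager
def plc_reraising(task=failing_heartbeat):
    stop_event = Event()
    pool = ThreadPool(1)
    async_result = pool.apply_async(task, args=(stop_event,))
    try:
        yield stop_event, async_result
    finally:
        # Runs whether the with-body finished normally or raised.
        stop_event.set()
        pool.close()
        pool.join()
    # Only reached if the with-body did not raise:
    # re-raise the task's exception, if there was one.
    async_result.get()


def run_until_heartbeat_stops():
    with plc_reraising() as (stop_event, async_result):
        while not async_result.ready():
            async_result.wait(1)
```

Because the cleanup sits in a finally clause around the yield, the pool is also shut down when the body of the with statement itself raises. In that case the body's exception propagates and the final get() call is skipped.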

An alternative without a context manager

from threading import Event
from multiprocessing.pool import ThreadPool
from time import sleep


def heartbeat_task(stop_event):
    value = False

    n = 0
    while not stop_event.is_set():
        value = not value
        print("Heartbeat: " + str(value))
        sleep(1)
        n += 1
        if n == 7:
            raise Exception('Oops!')

def main():
    stop_event = Event()
    pool = ThreadPool(1)
    async_result = pool.apply_async(heartbeat_task, args=(stop_event,))

    # Run as long as heartbeat_task is running:
    while not async_result.ready():
        # if some_condition:
        #     stop_event.set()
        #     break
        print("Program running")
        # Sleep for 5 seconds or until heartbeat_task terminates:
        async_result.wait(5)

    # Any exception thrown in heartbeat_task will be rethrown and caught here:
    try:
        async_result.get()
    except Exception as e:
        print("Got exception:", e)
    finally:
        pool.close()
        pool.join()

if __name__ == '__main__':
    main()
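Both versions leave the some_condition branch commented out. As a rough sketch of that other path, the following assumes a hypothetical condition (a fixed number of loop iterations) and a hypothetical run_for helper. The task is stopped from the main thread and returns normally, so get() returns its value instead of re-raising anything.

```python
from multiprocessing.pool import ThreadPool
from threading import Event
from time import sleep


def heartbeat_task(stop_event, interval=0.05):
    # Counts beats until the main thread sets the stop event.
    beats = 0
    while not stop_event.is_set():
        beats += 1
        sleep(interval)
    return beats


def run_for(iterations=3, poll=0.1):
    stop_event = Event()
    pool = ThreadPool(1)
    async_result = pool.apply_async(heartbeat_task, args=(stop_event,))
    loops = 0
    try:
        while not async_result.ready():
            loops += 1
            if loops >= iterations:  # hypothetical stand-in for some_condition
                stop_event.set()
                break
            async_result.wait(poll)
        # Blocks until the task notices the event and returns its beat count.
        return async_result.get(timeout=5)
    finally:
        pool.close()
        pool.join()
```

Here the stop event does real work: it is what ends the otherwise infinite loop in heartbeat_task, which is the case the answer says the original code never reaches.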

source:stackoverflow.com