卓越飞翔博客卓越飞翔博客

卓越飞翔 - 您值得收藏的技术分享站
技术文章34240本站已运行391

上下文管理器和守护线程

上下文管理器和守护线程

问题内容

我正在从上下文管理器启动一个守护线程,该线程应该每秒发送一次心跳,但由于它在线程中运行,因此如果发生异常,它不会终止上下文管理器。当心跳停止时,如何在上下文管理器中引发异常?

from contextlib import contextmanager
from threading import Thread, Event
from time import sleep


@contextmanager
def plc():
    stop_event = Event()

    try:
        # Send heartbeat every second
        hb_t = Thread(target=heartbeat_task,
                      args=(stop_event,),
                      daemon=True)
        hb_t.start()

        yield
    except Exception:
        raise
    finally:
        stop_event.set()
        hb_t.join()
        print("Heartbeat stopped")


def heartbeat_task(stop_event):

    value = False
    
    while not stop_event.is_set():

        value = not value

        print("Heartbeat: " + str(value))

        sleep(1)


def main():

    with plc():

        while True:

            print("Program running")

            sleep(5)

if __name__ == '__main__':
    main()

我很难找到这方面的例子。

感谢您的帮助!


正确答案


更新

我已经修改了代码,使其与您发布的代码更加一致。但是:

您提供的代码不一致:heartbeat_task 传递了一个事件,如果设置该事件将导致函数返回。但只有当使用 with plc(): 创建的函数 main 中的上下文管理器退出时才会设置它,而这是永远不会的。如果您希望 heartbeat_task 抛出的任何异常将强制上下文管理器退出,然后在函数 plc 中捕获,那么调用 stop_event.set() 的意义何在?如果根据定义,我们仅在 heartbeat_task 不再存在时才到达这里由于异常而运行?

因此,要么您希望 heartbeat_task 无限期地运行,直到引发异常(在这种情况下,没有“停止”事件的意义),要么您希望能够在存在某些条件时停止 heartbeat_task,但没有这样做的代码。出于演示目的,我假设 main 将有权访问 stop_event 事件,并在某些情况下对其进行设置。否则,它会一直运行,直到检测到 heartbeat_task 不再运行,可能是因为它引发了异常(它正在执行无限循环,所以如果尚未设置停止事件,它怎么可能终止?)。剩下的就是为什么您需要使用上下文管理器。稍后我将提出一个替代方案。

如果您使用多线程池(我们只需要池中的一个线程),那么主线程捕获提交到池的任务抛出的异常就变得很简单:当 multiprocessing.pool.threadpool.apply_async 被调用时返回 multiprocessing.pool.asyncresult 实例,表示未来的完成。当在此实例上调用 get 方法时,您可以从辅助函数 (heartbeat_task) 获取返回值,或者重新引发辅助函数引发的任何异常。但是我们也可以使用 wait 方法来等待提交任务的完成或经过的时间。然后我们可以使用 ready 方法测试等待 5 秒后提交的任务是否真正完成(由于异常或返回)。如果任务仍在运行,那么我们可以告诉它停止。在此演示中,我强制任务在大约 7 秒后引发异常:

from contextlib import contextmanager
from threading import event
from multiprocessing.pool import threadpool
from time import sleep


@contextmanager
def plc():
    stop_event = event()
    pool = threadpool(1)

    # send heartbeat every second
    async_result = pool.apply_async(heartbeat_task, args=(stop_event,))
    yield stop_event, async_result
    # we only return here if the task is no longer running
    try:
        # see if task threw an exception and if so, catch it:
        async_result.get()
    except exception as e:
        print("got exception:", e)
    finally:
        pool.close()
        pool.join()
        print("heartbeat stopped")


def heartbeat_task(stop_event):
    # for demo purposes, we will force an exception to occur
    # after approximately 7 seconds:
    value = false

    n = 0
    while not stop_event.is_set():
        value = not value
        print("heartbeat: " + str(value))
        sleep(1)
        n += 1
        if n == 7:
            raise exception('oops!')


def main():
    with plc() as tpl:
        stop_event, async_result = tpl
        # this function could forcibly cause the heartbeat_task
        # to complete by calling stop_event.set()

        # loop while the task is still running
        while not async_result.ready():
            """
            if some_condition:
                stop_event.set()
                break
            """
            print("program running")
            # sleep for 5 seconds or until heartbeat_task terminates:
            async_result.wait(5)

if __name__ == '__main__':
    main()

打印:

program running
heartbeat: true
heartbeat: false
heartbeat: true
heartbeat: false
heartbeat: true
program running
heartbeat: false
heartbeat: true
got exception: oops!
heartbeat stopped

使用上下文管理器的替代方法

from threading import Event
from multiprocessing.pool import ThreadPool
from time import sleep


def heartbeat_task(stop_event):
    value = False

    n = 0
    while not stop_event.is_set():
        value = not value
        print("Heartbeat: " + str(value))
        sleep(1)
        n += 1
        if n == 7:
            raise Exception('Oops!')

def main():
    stop_event = Event()
    pool = ThreadPool(1)
    async_result = pool.apply_async(heartbeat_task, args=(stop_event,))

    # Run as long as heartbeat_task is running:
    while not async_result.ready():
        """
        if some_condition:
            stop_event.set()
            break
        """
        print("Program running")
        # Sleep for 5 seconds or until heartbeat_task terminates:
        async_result.wait(5)

    # Any exception thrown in heartbeat_task will be rethrown and caught here:
    try:
        async_result.get()
    except Exception as e:
        print("Got exception:", e)
    finally:
        pool.close()
        pool.join()

if __name__ == '__main__':
    main()
卓越飞翔博客
上一篇: 在golang中使用默认的Windows文本编辑器打开txt文件
下一篇: 返回列表
留言与评论(共有 0 条评论)
   
验证码:
隐藏边栏