在函数之间异步传递数据

2024-09-28 05:26:39 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在编写一个程序,使用谷歌珊瑚设备在视频流中查找对象

我有一个功能,可以连续检测视频中的对象

我想将数据从该函数“流”到另外两个函数/类:

  1. VideoViewer:这是一个管理调试窗口的类,显示边界框等

  2. API端点:应用程序的这一部分尚未实现,但我希望稍后通过websocket公开对象检测函数中的数据

以下是我到目前为止的情况。它产生了结果

from PIL import Image
from cv2 import VideoCapture

def detect_with_video(self, video: VideoCapture):
    """Detect objects in a video"""
    streaming = True
    frame_id = 0
    results = []
    detection_delay = 4

    while streaming:
        _, frame = video.read()

        if frame_id % detection_delay == 0:
            results = self._detect_image(Image.fromarray(frame))

        for result in results:
            yield result

在我的app.py中,我希望能够做到以下几点:

class App:
    """The application"""
    def __init__(self):
        self.video: VideoCapture = cv2.VideoCapture(0)

    def run(self):
        # Initialize video
        VideoViewer().new_window()

        generator = Detector().detect_with_video(self.video)

        VideoViewer().set_stream(generator)
        HttpApi().set_stream(generator)

class VideoViewer:
    """VideoViewer launches video windows"""

   def set_stream(self, generator):
       for item in generator:
            print(item)

我的猜测是我必须使set_stream方法异步,但是是的

如何使两个消费者都不阻塞

我是python新手,不知道如何做到这一点。非常感谢您的帮助


Tags: 数据对象函数inselfstreamdefvideo

热门问题