我正在编写一个程序,使用谷歌珊瑚设备在视频流中查找对象
我有一个功能,可以连续检测视频中的对象
我想将数据从该函数“流”到另外两个函数/类:
VideoViewer:这是一个管理调试窗口的类,显示边界框等
API端点:应用程序的这一部分尚未实现,但我希望稍后通过websocket公开对象检测函数中的数据
以下是我到目前为止的情况。它产生了结果
from PIL import Image
from cv2 import VideoCapture
def detect_with_video(self, video: VideoCapture):
"""Detect objects in a video"""
streaming = True
frame_id = 0
results = []
detection_delay = 4
while streaming:
_, frame = video.read()
if frame_id % detection_delay == 0:
results = self._detect_image(Image.fromarray(frame))
for result in results:
yield result
在我的app.py
中,我希望能够做到以下几点:
class App:
"""The application"""
def __init__(self):
self.video: VideoCapture = cv2.VideoCapture(0)
def run(self):
# Initialize video
VideoViewer().new_window()
generator = Detector().detect_with_video(self.video)
VideoViewer().set_stream(generator)
HttpApi().set_stream(generator)
class VideoViewer:
"""VideoViewer launches video windows"""
def set_stream(self, generator):
for item in generator:
print(item)
我的猜测是我必须使set_stream
方法异步,但是是的
如何使两个消费者都不阻塞
我是python新手,不知道如何做到这一点。非常感谢您的帮助
目前没有回答
相关问题 更多 >
编程相关推荐