在我当前的项目中,当我发送停止信号(程序在我的计算机上运行)时,我正在尝试停止视频流(程序在Raspberry PI上运行)。你知道吗
为了实现这个目标,我编写了一个多处理python代码,它运行两个进程。你知道吗
过程1从一个装有覆盆子圆周率的摄像头读取数据,并将其发送给亚马逊Kinesis。它使用树莓PI的OpenCV
库将图像发送给Kinesis。
进程2是一个订户(通过AWS IoT),它等待来自AWS IoT的停止信号。当它接收到停止信号时,它停止流。
我使用了commandFlag
作为标志。它用0
初始化。每当线程2接收到停止信号时,commandFlag
就会转到1
。你知道吗
在当前程序中,不调用def on_message(client, userdata, msg):
。因此,它没有停止视频流(因为commandFlag
标志没有设置为1
)
此外,它不会释放附加的相机资源。你知道吗
你能建议其他方法来解决这个问题吗?我是否需要在程序中进行任何其他更改才能正常工作?你知道吗
import sys
import cPickle
import datetime
import cv2
import boto3
import time
import cPickle
from multiprocessing import Pool
import pytz
import threading
import multiprocessing
#Subscriber
import paho.mqtt.client as paho
import os
import socket
import ssl
commandFlag = 0
# The unique hostname that &IoT; generated for this device.
awshost = "AAAAAA-ats.iot.us-east-1.amazonaws.com"
awsport = 8883
# A programmatic shadow handler name prefix.
clientId = "IoTDevice"
thingName = "IoTDevice"
# The relative path to the correct root CA file for &IoT;, which you have already saved onto this device.
caPath = "AmazonRootCA1.pem"
# The relative path to your certificate file that
# &IoT; generated for this device, which you
# have already saved onto this device.
certPath = "AAAAA-certificate.pem.crt"
# The relative path to your private key file that
# &IoT; generated for this device, which you
# have already saved onto this device.
keyPath = "AAAAAA-private.pem.key"
kinesis_client = boto3.client("kinesis", region_name='us-east-1')
rekog_client = boto3.client("rekognition",region_name='us-east-1')
camera_index = 0 # 0 is usually the built-in webcam
capture_rate = 30 # Frame capture rate.. every X frames. Positive integer.
rekog_max_labels = 123
rekog_min_conf = 50.0
def on_connect(client, userdata, flags, rc):
global commandFlag
print("I am ready to receive control message...." )
client.subscribe("#" , 1 )
def on_message(client, userdata, msg):
global commandFlag
print("Received Switch Off message from AWS IoT....")
commandFlag = 1
def subscriber():
global commandFlag
mqttc = paho.Client()
mqttc.on_connect = on_connect
mqttc.on_message = on_message
mqttc.tls_set(caPath, certfile=certPath, keyfile=keyPath, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)
mqttc.connect(awshost, awsport, keepalive=60)
mqttc.loop_forever()
#Send frame to Kinesis stream
def encode_and_send_frame(frame, frame_count, enable_kinesis=True, enable_rekog=False, write_file=False):
try:
#convert opencv Mat to jpg image
#print "----FRAME---"
retval, buff = cv2.imencode(".jpg", frame)
img_bytes = bytearray(buff)
utc_dt = pytz.utc.localize(datetime.datetime.now())
now_ts_utc = (utc_dt - datetime.datetime(1970, 1, 1, tzinfo=pytz.utc)).total_seconds()
frame_package = {
'ApproximateCaptureTime' : now_ts_utc,
'FrameCount' : frame_count,
'ImageBytes' : img_bytes
}
if write_file:
print("Writing file img_{}.jpg".format(frame_count))
target = open("img_{}.jpg".format(frame_count), 'w')
target.write(img_bytes)
target.close()
#put encoded image in kinesis stream
if enable_kinesis:
print "Sending image to Kinesis"
response = kinesis_client.put_record(
StreamName="FrameStream",
Data=cPickle.dumps(frame_package),
PartitionKey="partitionkey"
)
print response
if enable_rekog:
response = rekog_client.detect_labels(
Image={
'Bytes': img_bytes
},
MaxLabels=rekog_max_labels,
MinConfidence=rekog_min_conf
)
print response
except Exception as e:
print e
def main():
global commandFlag
#capture_rate
argv_len = len(sys.argv)
if argv_len > 1 and sys.argv[1].isdigit():
capture_rate = int(sys.argv[1])
cap = cv2.VideoCapture(0) #Use 0 for built-in camera. Use 1, 2, etc. for attached cameras.
pool = Pool(processes=3)
frame_count = 0
while True:
# Capture frame-by-frame
ret, frame = cap.read()
#cv2.resize(frame, (640, 360));
if ret is False:
break
if frame_count % 30 == 0:
result = pool.apply_async(encode_and_send_frame, (frame, frame_count, True, False, False,))
frame_count += 1
# Display the resulting frame
cv2.imshow('frame', frame)
#if cv2.waitKey(1) & 0xFF == ord('q'):
if commandFlag == 1:
break;
# When everything done, release the capture
cap.release()
cv2.destroyAllWindows()
return
if __name__ == '__main__':
t1 = multiprocessing.Process(target=main)
t1.start()
t2 = multiprocessing.Process(target=subscriber)
t2.start()
while True:
if commandFlag == 1:
t1.terminate()
t2.terminate()
sys.exit(1)
目前没有回答
相关问题 更多 >
编程相关推荐