如何在ZMQ中使用序列化发送图像和数据字符串?

2024-05-20 02:44:35 发布

您现在位置:Python中文网/ 问答频道 /正文

我的目标是从RPi(服务器)向客户机发送图像和数据字符串。我使用send_json(data),其中数据是dict {'img': img_ls, 'telemetry':'0.01, 320, -10'}img_ls是转换为列表的图像。问题是我得到了len( img_ls ) = 57556,而原始图像的大小是:320x240=76800。我不明白为什么会有差异。代码如下:

服务器端

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://0.0.0.0:5557")


def outputs():
    stream = io.BytesIO()
    while True:
        yield stream
        stream.seek(0)
        sensors = '0.01, 320, -10'
        img_ls = np.fromstring(stream.getvalue(), dtype=np.uint8).tolist()
        data = {'telemetry': sensors, 'img': img_ls}
        socket.send_json(data)
        stream.seek(0)
        stream.truncate()

with picamera.PiCamera() as camera:
        camera.resolution = (320, 240)
        camera.framerate = 80
        time.sleep(2)
        camera.capture_sequence(outputs(), 'jpeg', use_video_port=True)

客户端

^{pr2}$

最后一点:这是我尝试将图像和传感器数据从RPi同步传输到客户端。我担心数组和列表转换(在RPi端完成)可能会减慢流式处理的速度。如果有更好的方法(仍然)使用zmq,请告诉我。在


Tags: 数据图像sendjson列表imgdatastream
2条回答

图像处理是CPU昂贵的。所以,表演第一:

ZeroMQ应允许一个人享受零拷贝的操作方式,以防止任何破坏这一点的不利操作。在

由于只使用了一个通用的OpenCV相机,而不是RPi/PiCamera,我总是喜欢在一个受控事件循环下,在采集端采集单独的相机帧(而不是序列)。在

相机获得一个已知的、固定的几何图形(在opencvanumpy.ndarray3D结构[X,Y,[B,G,R]]),因此最快和最直接的序列化是在发送方使用struct.pack( CONST_FRAME_STRUCT_MASK, aFrame ),在接收方使用struct.unpack( CONST_FRAME_STRUCT_MASK, aMessage )。在

是的,struct.pack()是迄今为止最快的方法,即使文档提供了其他方法(灵活性需要额外的成本,这是不合理的):

import numpy

def send_array( socket, A, flags = 0, copy = True, track = False ):
    """send a numpy array with metadata"""
    md = dict( dtype = str( A.dtype ),
               shape =      A.shape,
               )
    pass;  socket.send_json( md, flags | zmq.SNDMORE )
    return socket.send(      A,  flags, copy = copy, track = track )

def recv_array( socket, flags = 0, copy = True, track = False ):
    """recv a numpy array"""
    md = socket.recv_json( flags = flags )
    msg = socket.recv(     flags = flags, copy = copy, track = track )
    buf = buffer( msg )
    pass;  A = numpy.frombuffer( buf, dtype = md['dtype'] )
    return A.reshape(                         md['shape'] )

任何颜色转换和类似的源端转换都可能消耗+150~180[ms],因此请尽量避免任何和所有不必要的颜色空间或重塑或类似的非核心转换,因为这些会对累积的管道延迟包络造成不利影响。在

使用struct.pack()还可以避免任何类型的大小不匹配,因此加载到二进制有效载荷着陆台上的内容正是接收端接收到的内容。在

如果确实希望在消息核心数据周围也有与JSON相关的开销,那么应该设置一个双套接字范式,都有ZMQ_CONFLATE == 1,其中第一个移动{}-有效负载,第二个JSON修饰的遥测。在

如果RPi允许,zmq.Context( nIOthreads )可以使用nIOthreads >= 2进一步增加两边的数据泵送吞吐量,并且附加的{}映射可以分离/分配工作负载,使每个工作负载都在不同的、独立的IOthread上运行。在

查看下面的代码。我使用了Nlohmann json和一些小的调整(来自各种来源)来发送图像、vetor、string等。

客户代码

#include <zmq.hpp> 
#include <string>
#include <iostream>
#include <sstream>

#include <nlohmann/json.hpp> 
#include <opencv2/opencv.hpp>
#include "opencv2/imgproc/imgproc_c.h"
#include "opencv2/imgproc/imgproc.hpp"
#include <typeinfo>
using json = nlohmann::json;


class image_test
{
  public:
    void client1(){
      zmq::context_t context (1);
      zmq::socket_t socket (context, ZMQ_REQ);
      socket.connect ("tcp://localhost:5555");

      while (true){
        // create an empty structure (null)
        json j;
        std::string data;
        float f = 3.12;
        cv::Mat mat = cv::imread("cat.jpg",CV_LOAD_IMAGE_COLOR);

        // std::cout<<Imgdata;

        std::vector<uchar> array;
        if (mat.isContinuous()) 
          {
            array.assign(mat.datastart, mat.dataend);
          } 

        else 
          {
            for (int i = 0; i < mat.rows; ++i) 
              {
                  array.insert(array.end(), mat.ptr<uchar>(i), mat.ptr<uchar>(i)+mat.cols);
               }
          }

        std::vector<uint> v = {1,5,9};

        j["Type"] = f;
        j["vec"] = v;
        j["Image"]["rows"] = mat.rows;
        j["Image"]["cols"] = mat.cols;
        j["Image"]["channels"] = mat.channels();
        j["Image"]["data"] = array;

        // add a Boolean that is stored as bool
        j["Parameter"] = "Frequency";

        // add a string that is stored as std::string
        j["Value"] = "5.17e9";


        // explicit conversion to string
        std::string s = j.dump();  


        zmq::message_t request (s.size());
        memcpy (request.data (), (s.c_str()), (s.size()));
        socket.send(request);

        zmq::message_t reply;
        socket.recv (&reply);
        std::string rpl = std::string(static_cast<char*>(reply.data()), reply.size());

        json second = json::parse(rpl);

        std::cout << second["num"] << std::endl;


      }
    }         
};


int main (void)
{

  image_test caller;
  caller.client1();
}

服务器代码

^{2}$

应该包括nlohmann git的include包。或者yoy可以直接从:https://github.com/zsfVishnu/zmq.git下载源代码和链接。 另外,如果您使用的是g++或任何其他编译器,但不包括nlohmann的include文件夹,只需在CLI ie add-I/include文件夹的路径/

相关问题 更多 >