存储和检索大量小型非结构化消息的最快方法

2024-09-29 23:27:50 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在开发一个物联网应用程序,它要求我处理许多小的非结构化消息(这意味着它们的字段可能会随着时间的推移而变化——一些字段可能会出现,而另一些字段可能会消失)。这些消息通常有2到15个字段,这些字段的值属于基本数据类型(int/long、string、boolean)。这些消息非常适合JSON数据格式(或msgpack)

按照消息到达的顺序处理消息是至关重要的(请理解:它们需要由一个线程来处理-无法并行化这一部分)。我有自己的实时处理这些消息的逻辑(吞吐量相对较小,最多每秒几十万条消息),但是越来越需要引擎能够通过重放消息的历史来模拟/重放以前的时间段。虽然它最初并不是为了这个目的而写的,但如果我能够以足够的速度向它提供历史数据,我的事件处理引擎(用Go编写)每秒可以很好地处理几十条(可能是低的数百条)数百万条消息

这正是问题所在。在很长一段时间(几年)内,我一直在以分隔的msgpack格式(https://github.com/msgpack/msgpack-python#streaming-unpacking)存储许多(数千亿)这样的消息。在这种设置和其他设置(见下文)中,我能够以每秒约200万条消息的峰值解析速度(在2019年的Macbook Pro上,仅解析),这远远没有使磁盘IO饱和

即使不谈论IO,也要执行以下操作:

import json
message = {
    'meta1': "measurement",
    'location': "NYC",
    'time': "20200101",
    'value1': 1.0,
    'value2': 2.0,
    'value3': 3.0,
    'value4': 4.0
}
json_message = json.dumps(message)

%%timeit
json.loads(json_message)

给我的解析时间为3微秒/条消息,略高于300k条消息/秒。与ujson、rapidjson和orjson(而不是标准库的json模块)相比,我能够获得1微秒/消息的峰值速度(使用ujson),即大约1M消息/秒

Msgpack稍好一些:

import msgpack
message = {
    'meta1': "measurement",
    'location': "NYC",
    'time': "20200101",
    'value1': 1.0,
    'value2': 2.0,
    'value3': 3.0,
    'value4': 4.0
}
msgpack_message = msgpack.packb(message)

%%timeit
msgpack.unpackb(msgpack_message)

给我的处理时间约为750ns/条消息(约100ns/字段),即约为130万条消息/秒。我最初认为C++可以更快得多。下面是一个使用nlohmann/json的示例,尽管这与msgpack没有直接的可比性:

#include <iostream>
#include "json.hpp"

using json = nlohmann::json;

const std::string message = "{\"value\": \"hello\"}";

int main() {
  auto jsonMessage = json::parse(message);
  for(size_t i=0; i<1000000; ++i) {
    jsonMessage = json::parse(message);
  }
  std::cout << jsonMessage["value"] << std::endl; // To avoid having the compiler optimize the loop away. 
};

使用clang11.0.3(std=c++17,-O3)编译,在同一台Macbook上以~1.4s的速度运行,也就是说,解析速度为~700k条消息/秒,消息比Python示例更小。我知道nlohmann/json可能非常慢,并且能够使用simdjson的domapi获得每秒约200万条消息的解析速度

这对于我的用例来说仍然太慢了。我对所有建议都有改进,可以用Python、C++、java(或者JVM语言)或Go来改进潜在的消息解析速度。p>

注:

  • 我不一定关心磁盘上消息的大小(如果您建议的存储方法是内存高效的,则可以考虑这一点)
  • 我所需要的只是基本数据类型的键值模型——我不需要嵌套的字典或列表
  • 转换现有数据根本不是问题。我只是在寻找一些阅读优化
  • 我不一定需要将整个内容解析为结构或自定义对象,只需要在需要时访问一些字段(我通常需要每条消息的一小部分字段)——如果这会带来惩罚,只要惩罚不会破坏整个应用程序的吞吐量,就可以了
  • 我对定制/稍微不安全的解决方案持开放态度
  • 我选择使用的任何格式都需要自然地进行分隔,从某种意义上说,消息将被串行写入一个文件(我目前每天使用一个文件,这对于我的用例来说已经足够了)。我过去曾遇到过不恰当的消息分隔问题(请参阅Java Protobuf API中的WriteDelimiteTo-丢失一个字节,整个文件都会被破坏)

我已经探讨过的事情:

  • JSON:使用rapidjson、simdjson进行试验,nlohmann/json等)
  • 带分隔符msgpack的平面文件(请参见此API:https://github.com/msgpack/msgpack-python#streaming-unpacking):我当前用于存储消息的内容
  • 协议缓冲区:稍微快一点,但并不真正适合数据的非结构化性质

谢谢


Tags: 文件json应用程序消息messagestring时间msgpack
1条回答
网友
1楼 · 发布于 2024-09-29 23:27:50

我假设消息只包含几个基本类型的命名属性(在运行时定义),这些基本类型是例如字符串、整数和浮点数

为了快速实施,最好:

  • 避免文本解析(速度慢,因为顺序性强且充满条件)
  • 避免检查消息是否格式错误(此处不需要,因为它们都应该格式正确)
  • 尽量避免拨款
  • 处理消息块

因此,我们首先需要设计一个简单而快速的二进制消息协议

二进制消息包含其属性数(按1字节编码),后跟属性列表。每个属性都包含一个以其大小(编码为1字节)为前缀的字符串,后跟属性类型(std::variant中类型的索引,编码为1字节)以及属性值(大小前缀字符串、64位整数或64位浮点数)

每个编码的消息都是一个字节流,可以放入一个大的缓冲区(分配一次并重新用于多个传入消息)

以下是从原始二进制缓冲区解码消息的代码:

#include <unordered_map>
#include <variant>
#include <climits>

// Define the possible types here
using AttrType = std::variant<std::string_view, int64_t, double>;

// Decode the `msgData` buffer and write the decoded message into `result`.
// Assume the message is not ill-formed!
// msgData must not be freed or modified while the resulting map is being used.
void decode(const char* msgData, std::unordered_map<std::string_view, AttrType>& result)
{
    static_assert(CHAR_BIT == 8);

    const size_t attrCount = msgData[0];
    size_t cur = 1;

    result.clear();

    for(size_t i=0 ; i<attrCount ; ++i)
    {
        const size_t keyLen = msgData[cur];
        std::string_view key(msgData+cur+1, keyLen);
        cur += 1 + keyLen;
        const size_t attrType = msgData[cur];
        cur++;

        // A switch could be better if there is more types
        if(attrType == 0) // std::string_view
        {
            const size_t valueLen = msgData[cur];
            std::string_view value(msgData+cur+1, valueLen);
            cur += 1 + valueLen;

            result[key] = std::move(AttrType(value));
        }
        else if(attrType == 1) // Native-endian 64-bit integer
        {
            int64_t value;

            // Required to not break the strict aliasing rule
            std::memcpy(&value, msgData+cur, sizeof(int64_t));
            cur += sizeof(int64_t);

            result[key] = std::move(AttrType(value));
        }
        else // IEEE-754 double
        {
            double value;

            // Required to not break the strict aliasing rule
            std::memcpy(&value, msgData+cur, sizeof(double));
            cur += sizeof(double);

            result[key] = std::move(AttrType(value));
        }
    }
}

您可能也需要编写编码函数(基于相同的想法)

下面是一个用法示例(基于json相关代码):

const char* message = "\x01\x05value\x00\x05hello";

void bench()
{
    std::unordered_map<std::string_view, AttrType> decodedMsg;
    decodedMsg.reserve(16);

    decode(message, decodedMsg);

    for(size_t i=0; i<1000*1000; ++i)
    {
        decode(message, decodedMsg);
    }

    visit([](const auto& v) { cout << "Result: " << v << endl; }, decodedMsg["value"]);
}

在我的机器上(使用Intel i7-9700KF处理器),基于您的基准测试,我使用nlohmann json库获得270万条消息/秒的代码,使用新代码获得35.4万条消息/秒的代码

请注意,此代码可以快得多。事实上,大部分时间都花在高效的哈希和分配上。您可以通过使用更快的哈希映射实现(例如boost::container::flat_映射或ska::bytell_hash_映射)和/或使用自定义分配器来缓解此问题。另一种方法是构建自己的经过仔细调优的哈希映射实现。另一种选择是使用键值对向量并使用线性搜索来执行查找(这应该很快,因为您的消息不应该有很多属性,而且您说您需要每个消息的一小部分属性)。 但是,消息越大,解码速度越慢。因此,您可能需要利用并行性来更快地解码消息块。 所有这些,都有可能达到超过100米的消息/秒

相关问题 更多 >

    热门问题