正在检查gRPC流何时为空或不是流式数据流

2024-04-27 03:03:57 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在创建一个存根,该存根连接到一个服务器,该服务器以特定的时间间隔将数据流化,然后将其上载到TSDB。我已经实现了批处理来优化上载,但是如果在一个时间间隔内流出的数据量与批处理大小不一致,那么有些数据将在下一个时间间隔之前上载,这是我不希望的。有没有办法在gRPC存根上检查流是否为空?在

class DialInClient(object):
    def __init__(self, host, port, timeout=100000000, user='root', password='lablab'):
        self._host = host
        self._port = port
        self._timeout = float(timeout)
        self._channel = None
        self._cisco_ems_stub = None
        self._connected = False
        self._metadata = [('username', user), ('password', password)]

    def subscribe(self, sub_id):
        sub_args = CreateSubsArgs(ReqId=1, encode=3, subidstr=sub_id)
        stream = self._cisco_ems_stub.CreateSubs(sub_args, timeout=self._timeout, metadata=self._metadata)
        for segment in stream:
            yield segment 

    def connect(self):
        self._channel = grpc.insecure_channel(':'.join([self._host,self._port]))
        try:
            grpc.channel_ready_future(self._channel).result(timeout=10)
            self._connected = True
        except grpc.FutureTimeoutError as e:
            raise DeviceFailedToConnect from e
        else:
            self._cisco_ems_stub = gRPCConfigOperStub(self._channel)

如果我设置了一个低超时,整个通道就会断开连接,我想在for循环中添加某种超时,看看我是否在1秒内得到另一个片段,从而告诉我的另一部分,这就是结束,并且在没有完整的批处理大小的情况下上传。在


Tags: selfhostgrpc间隔portdef时间timeout
1条回答
网友
1楼 · 发布于 2024-04-27 03:03:57

GRPC中本机不存在这样的机制,但是threading库应该允许您在批处理满之前发送它们。我已经包含了python GRPC hello world example的一个修改版本,让您了解如何实现这一点。在

from __future__ import print_function                                                                                                        

import grpc                                                                                                                                  

import helloworld_pb2
import helloworld_pb2_grpc                                                                                                                   

import threading
from six.moves import queue
import time 

# 10 second batches    
BATCH_PERIOD = 10.0

def collect_responses(resp_queue, finished):                                                                                                 
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = helloworld_pb2_grpc.GreeterStub(channel)                                                                                      
        for i, response in enumerate(stub.SayHello(helloworld_pb2.HelloRequest(name='you', num_greetings="100"))):                           
            resp_queue.put(response)                                                                                                         
    finished.set()                                                                                                                           

def is_batch_end(batch_start):                                                                                                               
    return time.time() - batch_start < BATCH_PERIOD                                                                                          

def get_remaining_time(time_start):                                                                                                          
    return (time_start + BATCH_PERIOD) - time.time()

def batch_responses(resp_queue, finished):
    batch_num = 0
    while True:        
        batch_resps = []
        batch_start = time.time()
        remaining_time = get_remaining_time(batch_start)                                                                                     
        while remaining_time > 0.0 and not finished.is_set():
            try:       
                batch_resps.append(resp_queue.get())                                                                                         
            except queue.Empty:                                                                                                              
                pass                                                                                                                         
            finally:
                remaining_time = get_remaining_time(batch_start)
        print("Batch {} ({}):".format(batch_num + 1, len(batch_resps)))                                                                      
        for resp in batch_resps:                                                                                                             
            print("  '{}'".format(resp.message))
        batch_num += 1

def run():                                                                                                                                   
    resp_queue = queue.Queue()
    finished = threading.Event()                                                                                                             
    client_thread = threading.Thread(target=collect_responses, args=(resp_queue, finished))                                                  
    client_thread.start()
    batch_responses(resp_queue, finished)                                                                                                    
    client_thread.join()

if __name__ == '__main__':                                                                                                                   
    run() 

相关问题 更多 >