我正在创建一个存根,该存根连接到一个服务器,该服务器以特定的时间间隔将数据流化,然后将其上载到TSDB。我已经实现了批处理来优化上载,但是如果在一个时间间隔内流出的数据量与批处理大小不一致,那么有些数据将在下一个时间间隔之前上载,这是我不希望的。有没有办法在gRPC存根上检查流是否为空?在
class DialInClient(object):
def __init__(self, host, port, timeout=100000000, user='root', password='lablab'):
self._host = host
self._port = port
self._timeout = float(timeout)
self._channel = None
self._cisco_ems_stub = None
self._connected = False
self._metadata = [('username', user), ('password', password)]
def subscribe(self, sub_id):
sub_args = CreateSubsArgs(ReqId=1, encode=3, subidstr=sub_id)
stream = self._cisco_ems_stub.CreateSubs(sub_args, timeout=self._timeout, metadata=self._metadata)
for segment in stream:
yield segment
def connect(self):
self._channel = grpc.insecure_channel(':'.join([self._host,self._port]))
try:
grpc.channel_ready_future(self._channel).result(timeout=10)
self._connected = True
except grpc.FutureTimeoutError as e:
raise DeviceFailedToConnect from e
else:
self._cisco_ems_stub = gRPCConfigOperStub(self._channel)
如果我设置了一个低超时,整个通道就会断开连接,我想在for循环中添加某种超时,看看我是否在1秒内得到另一个片段,从而告诉我的另一部分,这就是结束,并且在没有完整的批处理大小的情况下上传。在
GRPC中本机不存在这样的机制,但是
threading
库应该允许您在批处理满之前发送它们。我已经包含了python GRPC hello world example的一个修改版本,让您了解如何实现这一点。在相关问题 更多 >
编程相关推荐