使用Python客户机API运行Google云Pub/Sub同步拉入

2024-09-30 03:25:56 发布

您现在位置:Python中文网/ 问答频道 /正文

我在Python客户机API中找不到returnimmediate标志。 有什么具体的原因吗? 在Python中,有没有其他方法可以同步从订阅中提取排队消息?在


Tags: 方法api消息客户机标志原因排队returnimmediate
3条回答

谷歌没有提供这样的服务。但是您可以通过实现自己的队列轻松地解决它

from Queue import Queue

from google.cloud import pubsub

subscriber = pubsub.SubscriberClient()
topic = "projects/newproject-xxxxx/topics/tarunlalwani"
subscription_name = 'projects/newproject-xxxxx/subscriptions/sub1'

class SynchronousSubscription(object):

    def callback(self, message):
        print(message.data)
        message.ack()
        self.pending_messages.put(message)

    def __init__(self, subscription):
        self.subscription_future = subscriber.subscribe(subscription_name, self.callback)
        self.pending_messages = Queue()

    def consume_sync(self):
        return self.pending_messages.get()

sub = SynchronousSubscription(subscription_name)
data = sub.consume_sync()

当我测试的时候它对我很有用

Working Example

扩展上一个答案:

当前存在一个具有所需功能的函数,以下是来自subscriber_client.py的文档:

def pull(self,
         subscription,
         max_messages,
         return_immediately=None,
         options=None):
    ...
Args:
    ...
      return_immediately (bool): If this field set to true, the system 
        will respond immediately even if
        it there are no messages available to return in the ``Pull`` response.
        Otherwise, the system may wait (for a bounded amount of time) until at
        least one message is available, rather than returning no messages. The
        client may cancel the request if it does not wish to wait any longer for
        the response.

但是,首先读取this comment的执行返回两个异常(我显示的异常是两个异常的聚合):

RetryError(Exception occurred in retry method that was not classified as transient, caused by <_Rendezvous of RPC that terminated with (StatusCode.INVALID_ARGUMENT, A required argument is missing in the request: (argument="max_messages").)>)

如果您需要更多详细信息,可以使用related issue。在

云发布/订阅客户端库并不直接公开pull方法,而是提供了一个异步API,旨在高效地接收消息。如果您有特定的原因需要调用同步pull方法(包括使用returnimmediate属性),那么您需要生成基于gRPC的库。您将需要获取service definition,然后generate the client。或者,您可以使用REST API version of pull发出一个HTTP请求。在

相关问题 更多 >

    热门问题