接收重复消息的 Google PubSub Python 多订阅者客户端

丹妮尔·汉克斯

我有一个非常简单的应用程序,它启动一个 PubSub 订阅者 StreamingPull 客户端。我已将其部署在 Kubernetes 上,因此我可以进行扩展。当我部署了一个 pod 时,一切都按预期工作。当我扩展到 2 个容器时,我开始收到重复的消息。我知道一些小的重复消息是可以预料的,但几乎一半的消息,有时更多,被多次接收。

我的过程需要大约 600 毫秒来处理一条消息。订阅确认截止时间设置为 600 秒。我发布了 1000 条消息,不到一分钟就清空了订阅,但acknowledge_message_operation 指标显示约1500 次调用,其中有少量响应代码已过期。我的过程中没有失败,所有消息在处理时都被确认。日志显示两个容器同时收到了相同的消息。处理所有消息的时间远低于订阅的确认截止日期,并且 Python 客户端应该处理租约管理,所以我不知道为什么会有任何过期的消息。我也不明白为什么同时向多个订阅者客户端发送相同的消息。

最小工作示例:

import time

from google.cloud import pubsub_v1

PROJECT_ID = 'my-project'
PUBSUB_TOPIC_ID = 'duplicate-test'
PUBSUB_SUBSCRIPTION_ID = 'duplicate-test'

def subscribe(sleep_time=None):
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        PROJECT_ID, PUBSUB_SUBSCRIPTION_ID)

    def callback(message):
        print(message.data.decode())
        if sleep_time:
            time.sleep(sleep_time)
        print(f'acking {message.data.decode()}')
        message.ack()

    future = subscriber.subscribe(
        subscription_path, callback=callback)
    print(f'Listening for messages on {subscription_path}')
    future.result()


def publish(num_messages):
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(PROJECT_ID, PUBSUB_TOPIC_ID)
    for i in range(num_messages):
        publisher.publish(topic_path, str(i).encode())

在两个终端中,运行 subscribe(1)。在第三个终端中,运行 publish(200)。对我来说,这将在两个用户终端中提供重复项。

卡迈勒·阿布-霍恩

两个订阅者同时收到相同的消息是不寻常的,除非:

  1. 由于重试,消息发布了两次(因此就 Cloud Pub/Sub 而言,有两条消息)。在这种情况下,两条消息的内容将相同,但它们的消息 ID 将不同。因此,确保您查看服务提供的消息 ID 以确保消息确实是重复的可能是值得的。
  2. 订阅者在不同的订阅中,这意味着每个订阅者都会收到所有消息。

如果这两种情况都不是,那么重复应该相对较少。使用流式拉取(这是 Python 客户端库使用的)处理大量积压的小消息时,存在一种边缘情况基本上,如果非常小的消息以突发形式发布,然后订阅者消费该突发,则可能会看到您所看到的行为。所有消息最终都会被发送到两个订阅者之一,并且会被缓冲在未完成消息数量的流量控制限制之后。这些消息可能会超过它们的 ack 截止时间,从而导致重新传递,很可能会发送给其他订阅者。第一个订阅者的缓冲区中仍然有这些消息,并且也会看到这些消息。

但是,如果您始终看到两个新订阅者立即收到具有相同消息 ID 的相同消息,那么您应该联系 Google Cloud 支持并提供您的项目名称、订阅名称和消息 ID 示例。他们将能够更好地调查为什么会发生这种直接的重复。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

Google PubSub Python 客户端中的内存泄漏

来自分类Dev

连续使用异步提取从Google PubSub接收消息

来自分类Dev

python pubsub订阅到期

来自分类Dev

python pubsub订阅多个主题

来自分类Dev

python pubsub订阅多个主题

来自分类Dev

Google Cloud PubSub:不发送/接收来自Cloud Functions的所有消息

来自分类Dev

使用 pika(python 客户端)通过 RabbitMQ 发布/订阅 MQTT 消息

来自分类Dev

python客户端的PubSub publish()函数中的动态属性数量

来自分类Dev

如何在Google Cloud App Engine上使用PubSub创建订阅者,以通过Publisher从Google Cloud App Engine Flex收听消息?

来自分类Dev

通过HTTP的python pubsub /消息队列?

来自分类Dev

在Dataflow Python中从PubSub读取AVRO消息

来自分类Dev

Firebase Cloud pubsub订阅停止监听消息

来自分类Dev

ChucK服务器不接收来自python-osc客户端的OSC消息

来自分类Dev

客户端使用python套接字接收两个单独的消息

来自分类Dev

客户端-服务器python:接收到的消息不能单独打印

来自分类Dev

ChucK服务器不接收来自python-osc客户端的OSC消息

来自分类Dev

HiveMQ Java阻止客户端订阅者不使用任何消息

来自分类Dev

在Google Cloud PubSub中获取单个消息的大小

来自分类Dev

GCP Pubsub 中的消息丢失和重复

来自分类Dev

部署后使用requirements_file参数后,python数据流作业不接受来自pubsub订阅的消息

来自分类Dev

Google API客户端GTMHTTPUploadFetcher已重复

来自分类Dev

Google PubSub和GCM

来自分类Dev

Google PubSub和GCM

来自分类Dev

使用Python客户端的Google BigQuery API

来自分类Dev

Google BigQuery - python 客户端 - 创建/管理作业

来自分类Dev

将 google api 客户端库导入 python/flask

来自分类Dev

在 Apache Beam 中导入 Google Firestore Python 客户端

来自分类Dev

接收消息 ID(未来)是否表示消息是在 PubSub 中发送的?

来自分类Dev

关闭客户端后,Android TCP Server仅显示来自Python客户端的消息

Related 相关文章

  1. 1

    Google PubSub Python 客户端中的内存泄漏

  2. 2

    连续使用异步提取从Google PubSub接收消息

  3. 3

    python pubsub订阅到期

  4. 4

    python pubsub订阅多个主题

  5. 5

    python pubsub订阅多个主题

  6. 6

    Google Cloud PubSub:不发送/接收来自Cloud Functions的所有消息

  7. 7

    使用 pika(python 客户端)通过 RabbitMQ 发布/订阅 MQTT 消息

  8. 8

    python客户端的PubSub publish()函数中的动态属性数量

  9. 9

    如何在Google Cloud App Engine上使用PubSub创建订阅者,以通过Publisher从Google Cloud App Engine Flex收听消息?

  10. 10

    通过HTTP的python pubsub /消息队列?

  11. 11

    在Dataflow Python中从PubSub读取AVRO消息

  12. 12

    Firebase Cloud pubsub订阅停止监听消息

  13. 13

    ChucK服务器不接收来自python-osc客户端的OSC消息

  14. 14

    客户端使用python套接字接收两个单独的消息

  15. 15

    客户端-服务器python:接收到的消息不能单独打印

  16. 16

    ChucK服务器不接收来自python-osc客户端的OSC消息

  17. 17

    HiveMQ Java阻止客户端订阅者不使用任何消息

  18. 18

    在Google Cloud PubSub中获取单个消息的大小

  19. 19

    GCP Pubsub 中的消息丢失和重复

  20. 20

    部署后使用requirements_file参数后,python数据流作业不接受来自pubsub订阅的消息

  21. 21

    Google API客户端GTMHTTPUploadFetcher已重复

  22. 22

    Google PubSub和GCM

  23. 23

    Google PubSub和GCM

  24. 24

    使用Python客户端的Google BigQuery API

  25. 25

    Google BigQuery - python 客户端 - 创建/管理作业

  26. 26

    将 google api 客户端库导入 python/flask

  27. 27

    在 Apache Beam 中导入 Google Firestore Python 客户端

  28. 28

    接收消息 ID(未来)是否表示消息是在 PubSub 中发送的?

  29. 29

    关闭客户端后,Android TCP Server仅显示来自Python客户端的消息

热门标签

归档