我有一个非常简单的应用程序,它启动一个 PubSub 订阅者 StreamingPull 客户端。我已将其部署在 Kubernetes 上,因此我可以进行扩展。当我部署了一个 pod 时,一切都按预期工作。当我扩展到 2 个容器时,我开始收到重复的消息。我知道一些小的重复消息是可以预料的,但几乎一半的消息,有时更多,被多次接收。
我的过程需要大约 600 毫秒来处理一条消息。订阅确认截止时间设置为 600 秒。我发布了 1000 条消息,不到一分钟就清空了订阅,但acknowledge_message_operation 指标显示约1500 次调用,其中有少量响应代码已过期。我的过程中没有失败,所有消息在处理时都被确认。日志显示两个容器同时收到了相同的消息。处理所有消息的时间远低于订阅的确认截止日期,并且 Python 客户端应该处理租约管理,所以我不知道为什么会有任何过期的消息。我也不明白为什么同时向多个订阅者客户端发送相同的消息。
最小工作示例:
import time
from google.cloud import pubsub_v1
PROJECT_ID = 'my-project'
PUBSUB_TOPIC_ID = 'duplicate-test'
PUBSUB_SUBSCRIPTION_ID = 'duplicate-test'
def subscribe(sleep_time=None):
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
PROJECT_ID, PUBSUB_SUBSCRIPTION_ID)
def callback(message):
print(message.data.decode())
if sleep_time:
time.sleep(sleep_time)
print(f'acking {message.data.decode()}')
message.ack()
future = subscriber.subscribe(
subscription_path, callback=callback)
print(f'Listening for messages on {subscription_path}')
future.result()
def publish(num_messages):
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, PUBSUB_TOPIC_ID)
for i in range(num_messages):
publisher.publish(topic_path, str(i).encode())
在两个终端中,运行 subscribe(1)。在第三个终端中,运行 publish(200)。对我来说,这将在两个用户终端中提供重复项。
两个订阅者同时收到相同的消息是不寻常的,除非:
如果这两种情况都不是,那么重复应该相对较少。在使用流式拉取(这是 Python 客户端库使用的)处理大量积压的小消息时,存在一种边缘情况。基本上,如果非常小的消息以突发形式发布,然后订阅者消费该突发,则可能会看到您所看到的行为。所有消息最终都会被发送到两个订阅者之一,并且会被缓冲在未完成消息数量的流量控制限制之后。这些消息可能会超过它们的 ack 截止时间,从而导致重新传递,很可能会发送给其他订阅者。第一个订阅者的缓冲区中仍然有这些消息,并且也会看到这些消息。
但是,如果您始终看到两个新订阅者立即收到具有相同消息 ID 的相同消息,那么您应该联系 Google Cloud 支持并提供您的项目名称、订阅名称和消息 ID 示例。他们将能够更好地调查为什么会发生这种直接的重复。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句