Google Cloud Pub/Sub: Not all messages are sent/received from Cloud Functions

Sulav Malla

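Summary: My client code triggers 861 background Google Cloud Functions by publishing messages to a Pub/Sub topic. Each Cloud Function performs a task, uploads the result to Google Storage, and then publishes a message to another Pub/Sub topic that the client code is listening on. Although all of the Cloud Functions execute (verified by the number of results in Google Storage), the client code does not receive all of the messages.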

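Server side: I have a background Google Cloud Function that is triggered every time a message is published to the TRIGGER Pub/Sub topic. Custom attributes on the message act as function parameters that determine how the function performs its particular task. The function then uploads the result to a bucket in Google Storage and publishes a message (with the taskID and execution-time details) to the RESULTS Pub/Sub topic, which is a different topic from the one used to trigger the function.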

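Client side: I need to perform 861 different tasks, which means calling the Cloud Function with 861 slightly different inputs. The tasks are similar, and the Cloud Function takes between 20 seconds and 2 minutes to execute each one (median about 1 minute). I wrote a Python script for this, which I run from Google Cloud Shell (or a local machine shell). The client script publishes 861 messages to the TRIGGER Pub/Sub topic, which triggers that many Cloud Function invocations concurrently, each passed a unique taskID in [0, 860]. The client script then polls a subscription to the RESULTS topic using "synchronous pull" for any messages. After executing its task, each Cloud Function publishes a message with its unique taskID and timing details to the RESULTS topic. The client uses this taskID to identify which task a message came from; it also lets the client detect duplicate messages, which are discarded.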

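Basic steps

  1. The client Python script publishes 861 messages (each with a unique taskID) to the TRIGGER Pub/Sub topic and waits for result messages from the Cloud Function.
  2. 861 separate Cloud Function invocations run; each performs one task, uploads its result to Google Storage, and publishes a message (with the taskID and execution-time details) to the RESULTS Pub/Sub topic.
  3. The client pulls all messages synchronously and marks the corresponding tasks as complete.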

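Problem: When the client polls the RESULTS subscription, I do not receive messages for every taskID. I am certain the Cloud Function was invoked and executed correctly (there are 861 results in the Google Storage bucket). I repeated this several times and it happened every time. Strangely, the number of missing taskIDs changes from run to run, and different taskIDs go missing each time. I also track the number of duplicate taskIDs received. The table gives the number of unique taskIDs received, missing, and repeated for 5 independent runs.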

Run  # of Tasks  Received  Missing  Repeated
1    861         860       1        25
2    861         840       21       3
3    861         851       10       1
4    861         837       24       3
5    861         856       5        1
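The three count columns can be derived from the stream of taskIDs the client pulls, and in every row Received + Missing = 861 (e.g. 860 + 1). A minimal sketch, assuming the received taskIDs are collected into a list; the helper name `tally` is hypothetical and not part of client.py:

```python
from collections import Counter


def tally(num_tasks, received_ids):
    """Summarise one run: number of unique taskIDs received, the taskIDs
    never seen, and how many deliveries duplicated an already-seen taskID."""
    counts = Counter(received_ids)
    received = len(counts)
    missing = sorted(set(range(num_tasks)) - set(counts))
    repeated = sum(n - 1 for n in counts.values())
    return received, missing, repeated


# 5 tasks: taskID 3 never arrives and taskID 1 is delivered twice.
print(tally(5, [0, 1, 2, 1, 4]))  # (4, [3], 1)
```

Because Pub/Sub delivery is at-least-once, a non-zero Repeated column is expected behaviour; only the Missing column points to messages that never reached the subscription.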

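I am not sure where this problem could be coming from. Given how random the counts are, and which taskIDs go missing, I suspect a bug in Pub/Sub's at-least-once delivery logic. If, in the Cloud Function, I sleep for a few seconds instead of performing the task (e.g. with time.sleep(5)), everything works fine (the client receives all 861 taskIDs).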

Code to reproduce this issue.

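Below, main.py is deployed as a Google Cloud Function together with requirements.txt, and client.py is the client code. Run the client with 100 concurrent tasks as python client.py 100; it repeats the experiment 5 times. A different number of taskIDs goes missing each time.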

requirements.txt

google-cloud-pubsub

main.py

"""
This file is deployed as Google Cloud Function. This function starts,
sleeps for some seconds and publishes back the taskID.

Deployment:
    gcloud functions deploy gcf_run --runtime python37 --trigger-topic <TRIGGER_TOPIC> --memory=128MB --timeout=300s
"""

import time
from random import randint
from google.cloud import pubsub_v1

# Global variables
project_id = "<Your Google Cloud Project ID>"  # Your Google Cloud Project ID
topic_name = "<RESULTS_TOPIC>"  # Your Pub/Sub topic name


def gcf_run(data, context):
    """Background Cloud Function to be triggered by Pub/Sub.
    Args:
         data (dict): The dictionary with data specific to this type of event.
         context (google.cloud.functions.Context): The Cloud Functions event
         metadata.
    """

    # Message should contain taskID (in addition to the data)
    if 'attributes' in data:
        attributes = data['attributes']
        if 'taskID' in attributes:
            taskID = attributes['taskID']
        else:
            print('taskID missing!')
            return
    else:
        print('attributes missing!')
        return

    # Sleep for a random time between 30 seconds and 1.5 minutes
    print("Start execution for {}".format(taskID))
    sleep_time = randint(30, 90)  # sleep for this many seconds
    time.sleep(sleep_time)  # sleep for few seconds

    # Marks this task complete by publishing a message to Pub/Sub.
    data = u'Message number {}'.format(taskID)
    data = data.encode('utf-8')  # Data must be a bytestring
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_name)
    publisher.publish(topic_path, data=data, taskID=taskID)

    return

client.py

"""
The client code creates the given number of tasks and publishes to Pub/Sub,
which in turn calls the Google Cloud Functions concurrently.
Run:
    python client.py 100
"""

from __future__ import print_function
import sys
import time
from google.cloud import pubsub_v1

# Global variables
project_id = "<Google Cloud Project ID>" # Google Cloud Project ID
topic_name = "<TRIGGER_TOPIC>"    # Pub/Sub topic name to publish
subscription_name = "<subscriber to RESULTS_TOPIC>"  # Pub/Sub subscription name
num_experiments = 5  # number of times to repeat the experiment
time_between_exp = 120.0 # number of seconds between experiments

# Initialize the Publisher (to send commands that invoke Cloud Functions)
# as well as Subscriber (to receive results written by the Cloud Functions)
# Configure the batch to publish as soon as there is one kilobyte
# of data or one second has passed.
batch_settings = pubsub_v1.types.BatchSettings(
    max_bytes=1024,  # One kilobyte
    max_latency=1,   # One second
)
publisher = pubsub_v1.PublisherClient(batch_settings)
topic_path = publisher.topic_path(project_id, topic_name)

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project_id, subscription_name)


class Task:
    """
    A task which will execute the Cloud Function once.

    Attributes:
        taskID (int)       : A unique number given to a task (starting from 0).
        complete (boolean) : Flag to indicate if this task has completed.
    """
    def __init__(self, taskID):
        self.taskID = taskID
        self.complete = False

    def start(self):
        """
        Start the execution of Cloud Function by publishing a message with
        taskID to the Pub/Sub topic.
        """
        data = u'Message number {}'.format(self.taskID)
        data = data.encode('utf-8')  # Data must be a bytestring
        publisher.publish(topic_path, data=data, taskID=str(self.taskID))

    def end(self):
        """
        Mark the end of this task.
            Returns (boolean):
                True if normal, False if task was already marked before.
        """
        # If this task was not complete, mark it as completed
        if not self.complete:
            self.complete = True
            return True

        return False
    # [END of Task Class]


def createTasks(num_tasks):
    """
    Create a list of tasks and return it.
        Args:
            num_tasks (int) : Number of tasks (Cloud Function calls)
        Returns (list):
            A list of tasks.
    """
    all_tasks = list()
    for taskID in range(0, num_tasks):
        all_tasks.append(Task(taskID=taskID))

    return all_tasks


def receiveResults(all_tasks):
    """
    Receives messages from the Pub/Sub subscription. I am using a blocking
    Synchronous Pull instead of the usual asynchronous pull with a callback
    function as I rely on a polling pattern to retrieve messages.
    See: https://cloud.google.com/pubsub/docs/pull
        Args:
            all_tasks (list) : List of all tasks.
    """
    num_tasks = len(all_tasks)
    total_msg_received = 0  # track the number of messages received
    NUM_MESSAGES = 10  # maximum number of messages to pull synchronously
    TIMEOUT = 600.0    # number of seconds to wait for response (10 minutes)

    # Keep track of elapsed time and exit if > TIMEOUT
    __MyFuncStartTime = time.time()
    __MyFuncElapsedTime = 0.0

    print('Listening for messages on {}'.format(subscription_path))
    while (total_msg_received < num_tasks) and (__MyFuncElapsedTime < TIMEOUT):
        # The subscriber pulls a specific number of messages.
        response = subscriber.pull(subscription_path,
            max_messages=NUM_MESSAGES, timeout=TIMEOUT, retry=None)
        ack_ids = []

        # Keep track of all received messages
        for received_message in response.received_messages:
            if received_message.message.attributes:
                attributes = received_message.message.attributes
                taskID = int(attributes['taskID'])
                if all_tasks[taskID].end():
                    # increment count only if task completes the first time
                    # if False, we received a duplicate message
                    total_msg_received += 1
                #     print("Received taskID = {} ({} of {})".format(
                #         taskID, total_msg_received, num_tasks))
                # else:
                #     print('REPEATED: taskID {} was already marked'.format(taskID))
            else:
                print('attributes missing!')

            ack_ids.append(received_message.ack_id)

        # Acknowledges the received messages so they will not be sent again.
        if ack_ids:
            subscriber.acknowledge(subscription_path, ack_ids)

        time.sleep(0.2)  # Wait 200 ms before polling again
        __MyFuncElapsedTime = time.time() - __MyFuncStartTime
        # print("{} s elapsed. Listening again.".format(__MyFuncElapsedTime))

    # if total_msg_received != num_tasks, function exit due to timeout
    if total_msg_received != num_tasks:
        print("WARNING: *** Receiver timed out! ***")
    print("Received {} messages out of {}. Done.".format(
        total_msg_received, num_tasks))


def main(num_tasks):
    """
    Main execution point of the program
    """

    for experiment_num in range(1, num_experiments + 1):
        print("Starting experiment {} of {} with {} tasks".format(
            experiment_num, num_experiments, num_tasks))
        # Create all tasks and start them
        all_tasks = createTasks(num_tasks)
        for task in all_tasks:     # Start all tasks
            task.start()
        print("Published {} taskIDs".format(num_tasks))

        receiveResults(all_tasks)  # Receive message from Pub/Sub subscription

        print("Waiting {} seconds\n\n".format(time_between_exp))
        time.sleep(time_between_exp)  # sleep between experiments


if __name__ == "__main__":
    if(len(sys.argv) != 2):
        print("usage: python client.py  <num_tasks>")
        print("    num_tasks: Number of concurrent Cloud Function calls")
        sys.exit()

    num_tasks = int(sys.argv[1])
    main(num_tasks)

Daniel Collins

In your Cloud Function, on this line:

publisher.publish(topic_path, data=data, taskID=taskID)

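you are not waiting on the future returned by publisher.publish. This means there is no guarantee that the publish has actually happened on the topic by the time execution reaches the end of gcf_run, but the message on the TRIGGER topic's Cloud Function subscription is acknowledged regardless.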
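The effect can be reproduced without Pub/Sub. In the sketch below, a `concurrent.futures` thread pool stands in for the client's background publishing; this is an illustrative assumption, since the real `PublisherClient.publish` also returns a future but batches and sends messages with its own internals. A handler that drops the future returns while the publish is still in flight; one that calls `.result()` returns only after it has completed.

```python
import time
from concurrent.futures import ThreadPoolExecutor

published = []  # stands in for messages that actually reached the topic
executor = ThreadPoolExecutor(max_workers=1)


def slow_publish(msg):
    """Simulated network publish that takes about 100 ms to complete."""
    time.sleep(0.1)
    published.append(msg)
    return "id-" + msg


def handler_fire_and_forget(msg):
    # Schedules the publish and returns at once. If the instance were
    # frozen or stopped at this point, the publish might never finish.
    executor.submit(slow_publish, msg)


def handler_wait(msg):
    # Blocks until the publish has completed, or re-raises its exception.
    return executor.submit(slow_publish, msg).result()


handler_fire_and_forget("a")
print("a" in published)   # False: the publish is still in flight
print(handler_wait("b"))  # id-b
print(published)          # ['a', 'b']
```

In the simulation the dropped publish still completes because the process keeps running; Cloud Functions does not promise that for work left running after the function returns, which is consistent with the random losses in the question and with the sleep-only variant behaving differently.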

Instead, to wait for the publish to happen before the Cloud Function terminates, this should be:

publisher.publish(topic_path, data=data, taskID=taskID).result()

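You should also avoid creating and tearing down a publisher client on every function invocation; keep the client in a global variable instead.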
