如何使用celery worker从SQS轮询消息,消息为JSON格式,worker无法解码该格式

克里斯蒂斯塔瓦(Dristy Srivastava)

如何使用celery worker从SQS轮询消息,消息为JSON格式,worker无法解码该格式

注意:这些消息不会使用芹菜节拍发送到SQS,此队列是从SNS订阅的

我的celery worker命令是:celery worker -A status_handling -l info -Q es_status_test

Msg in Queue:

{
  "Type" : "Notification",
  "MessageId" : "f7e40fd9-8f92-59c5-afd9-5a1847aaae57",
  "TopicArn" : "***",
  "Message" : "{\"SESResponseStatusCode\": 200, \"Status\": \"Delivered\", \"Message\": \"Email sent successfully.\", \"MessageId\": \"a59e85a2-8b7a-4b49-9354-0a7a4170b0c0\", \"Uuid\": null}",
  "Timestamp" : "2019-08-05T06:00:24.943Z",
  "SignatureVersion" : "1",
  "Signature" : "pass",
  "SigningCertURL" : "pass",
  "UnsubscribeURL" : "pass"
}

错误即将到来:

[2019-08-04 23:00:25,116: CRITICAL/MainProcess] Unrecoverable error: JSONDecodeError('Expecting value: line 1 column 1 (char 0)')
Traceback (most recent call last):
  File "/home/vagrant/env/lib/python3.7/site-packages/celery/worker/worker.py", line 205, in start
    self.blueprint.start(self)
  File "/home/vagrant/env/lib/python3.7/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/home/vagrant/env/lib/python3.7/site-packages/celery/bootsteps.py", line 369, in start
    return self.obj.start()
  File "/home/vagrant/env/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 318, in start
    blueprint.start(self)
  File "/home/vagrant/env/lib/python3.7/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/home/vagrant/env/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 596, in start
    c.loop(*c.loop_args())
  File "/home/vagrant/env/lib/python3.7/site-packages/celery/worker/loops.py", line 91, in asynloop
    next(loop)
  File "/home/vagrant/env/lib/python3.7/site-packages/kombu/asynchronous/hub.py", line 362, in create_loop
    cb(*cbargs)
  File "/home/vagrant/env/lib/python3.7/site-packages/kombu/asynchronous/http/curl.py", line 111, in on_readable
    return self._on_event(fd, _pycurl.CSELECT_IN)
  File "/home/vagrant/env/lib/python3.7/site-packages/kombu/asynchronous/http/curl.py", line 124, in _on_event
    self._process_pending_requests()
  File "/home/vagrant/env/lib/python3.7/site-packages/kombu/asynchronous/http/curl.py", line 130, in _process_pending_requests
    self._process(curl)
  File "/home/vagrant/env/lib/python3.7/site-packages/kombu/asynchronous/http/curl.py", line 178, in _process
    buffer=buffer, effective_url=effective_url, error=error,
  File "/home/vagrant/env/lib/python3.7/site-packages/vine/promises.py", line 177, in __call__
    svpending(*ca, **ck)
  File "/home/vagrant/env/lib/python3.7/site-packages/vine/promises.py", line 170, in __call__
    return self.throw()
  File "/home/vagrant/env/lib/python3.7/site-packages/vine/promises.py", line 167, in __call__
    retval = fun(*final_args, **final_kwargs)
  File "/home/vagrant/env/lib/python3.7/site-packages/vine/funtools.py", line 100, in _transback
    return callback(ret)
  File "/home/vagrant/env/lib/python3.7/site-packages/vine/promises.py", line 170, in __call__
    return self.throw()
  File "/home/vagrant/env/lib/python3.7/site-packages/vine/promises.py", line 167, in __call__
    retval = fun(*final_args, **final_kwargs)
  File "/home/vagrant/env/lib/python3.7/site-packages/vine/funtools.py", line 100, in _transback
    return callback(ret)
  File "/home/vagrant/env/lib/python3.7/site-packages/vine/promises.py", line 170, in __call__
    return self.throw()
  File "/home/vagrant/env/lib/python3.7/site-packages/vine/promises.py", line 167, in __call__
    retval = fun(*final_args, **final_kwargs)
  File "/home/vagrant/env/lib/python3.7/site-packages/vine/funtools.py", line 98, in _transback
    callback.throw()
  File "/home/vagrant/env/lib/python3.7/site-packages/vine/funtools.py", line 96, in _transback
    ret = filter_(*args + (ret,), **kwargs)
  File "/home/vagrant/env/lib/python3.7/site-packages/kombu/transport/SQS.py", line 370, in _on_messages_ready
    msg_parsed = self._message_to_python(msg, qname, queue)
  File "/home/vagrant/env/lib/python3.7/site-packages/kombu/transport/SQS.py", line 236, in _message_to_python
    payload = loads(bytes_to_str(body))
  File "/home/vagrant/env/lib/python3.7/site-packages/kombu/utils/json.py", line 94, in loads
    return stdjson.loads(s)
  File "/usr/local/lib/python3.7/json/__init__.py", line 348, in loads
    return _default_decoder.decode(s)
  File "/usr/local/lib/python3.7/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/local/lib/python3.7/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
丹尼尔·赫珀

Celery中的SQS支持仅用作Celery特定消息的传输机制。您不能使用Celery消耗任意SQS消息。

相反,我建议编写一个自定义Django管理命令,在其中使用boto3库轮询SQS队列。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

如何在elasticbeanstalk worker tomcat中获取SQS消息属性?

来自分类Dev

如何在elasticbeanstalk worker tomcat中获取SQS消息属性?

来自分类Dev

使用Ruby SDK从SNS接收消息时,AWS SQS JSON格式

来自分类Dev

如何将Rails日志消息自定义为JSON格式

来自分类Dev

如何使用 slackr 改进 slack 消息的格式?

来自分类Dev

如何在抖动中将Json解码为DateTime格式?

来自分类Dev

在 Elastic Beanstalk 上启动 SQS celery worker

来自分类Dev

如何在Python中产生JSON格式的Kafka消息

来自分类Dev

如何将JSON格式的debezium消息转换为可以加载到Redshift中的消息

来自分类Dev

如何在AWS Elastic Beanstalk上运行celery worker?

来自分类Dev

如何在Google App Engine上部署Celery Worker

来自分类Dev

使用AJAX轮询读取Celery任务进度

来自分类Dev

如何使用wp_mail()发送html格式的消息?

来自分类Dev

使用golang JSON解码PubNub消息

来自分类Dev

使用HttpResponseMessage显示HTML格式的错误消息

来自分类Dev

使用Docker的Postgresql上的消息格式无效

来自分类Dev

Celery + Django-使用Django消息框架进行状态和报告成功或失败的轮询任务

来自分类Dev

如何将接收到的消息设置为单个数组格式

来自分类Dev

是否应在Node.js + Koa应用程序中使用AWS SQS消息轮询?

来自分类Dev

是否应在Node.js + Koa应用程序中使用AWS SQS消息轮询?

来自分类Dev

Python:模拟无法在celery任务中使用

来自分类Dev

如何运行播放命令?我收到错误消息“播放失败的格式:无法打开输入文件”

来自分类Dev

尽管使用setTimeout,Web Worker只返回一条消息,此后没有消息

来自分类Dev

使用Java将SOAP消息格式转换为套接字消息格式,反之亦然

来自分类Dev

Android如何开发SMS App,它将接收用户消息并以JSON格式发送

来自分类Dev

使用Spring将AMQP消息解码为Map

来自分类Dev

无法使用boto3将消息写入SQS

来自分类Dev

如何使用celery守护程序调用celery任务

来自分类Dev

如何将消息总线消息类型设置为JSON?

Related 相关文章

  1. 1

    如何在elasticbeanstalk worker tomcat中获取SQS消息属性?

  2. 2

    如何在elasticbeanstalk worker tomcat中获取SQS消息属性?

  3. 3

    使用Ruby SDK从SNS接收消息时,AWS SQS JSON格式

  4. 4

    如何将Rails日志消息自定义为JSON格式

  5. 5

    如何使用 slackr 改进 slack 消息的格式?

  6. 6

    如何在抖动中将Json解码为DateTime格式?

  7. 7

    在 Elastic Beanstalk 上启动 SQS celery worker

  8. 8

    如何在Python中产生JSON格式的Kafka消息

  9. 9

    如何将JSON格式的debezium消息转换为可以加载到Redshift中的消息

  10. 10

    如何在AWS Elastic Beanstalk上运行celery worker?

  11. 11

    如何在Google App Engine上部署Celery Worker

  12. 12

    使用AJAX轮询读取Celery任务进度

  13. 13

    如何使用wp_mail()发送html格式的消息?

  14. 14

    使用golang JSON解码PubNub消息

  15. 15

    使用HttpResponseMessage显示HTML格式的错误消息

  16. 16

    使用Docker的Postgresql上的消息格式无效

  17. 17

    Celery + Django-使用Django消息框架进行状态和报告成功或失败的轮询任务

  18. 18

    如何将接收到的消息设置为单个数组格式

  19. 19

    是否应在Node.js + Koa应用程序中使用AWS SQS消息轮询?

  20. 20

    是否应在Node.js + Koa应用程序中使用AWS SQS消息轮询?

  21. 21

    Python:模拟无法在celery任务中使用

  22. 22

    如何运行播放命令?我收到错误消息“播放失败的格式:无法打开输入文件”

  23. 23

    尽管使用setTimeout,Web Worker只返回一条消息,此后没有消息

  24. 24

    使用Java将SOAP消息格式转换为套接字消息格式,反之亦然

  25. 25

    Android如何开发SMS App,它将接收用户消息并以JSON格式发送

  26. 26

    使用Spring将AMQP消息解码为Map

  27. 27

    无法使用boto3将消息写入SQS

  28. 28

    如何使用celery守护程序调用celery任务

  29. 29

    如何将消息总线消息类型设置为JSON?

热门标签

归档