发布者等待消费消息,同时它应该做两件事(发布和订阅订阅者的回复)。)

什塔尔·贾达夫

发布者在使用第一个 message_reply 后被阻止。它不会向订阅者发送任何消息。

我已经尝试过,如果我不使用“start_sumption()”方法,发布者会连续发送数据,但不会从订阅者打印reply_message。如果我使用 'start_sumption()' 方法,它只会阻止发布者并等待。

#publisher.py
#!/usr/bin/env python
import pika
import sys
import uuid

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
channel.queue_declare(queue='reply_queue', durable=True)

message = "Hello World!"
corr_id = str(uuid.uuid4())

def on_response(ch, method, properties, body):
    print("----- On_response -----")
    print("Received CORR_ID : ", properties.correlation_id)
    if(corr_id == properties.correlation_id):
        resp = body.decode('utf-8')
        print("RESPONSE : ", resp)
    ch.basic_ack(delivery_tag=method.delivery_tag)

def consume_response(channel):
    channel.basic_consume(queue='reply_queue', on_message_callback=on_response)
    channel.start_consuming()

while True:
    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=message,
        properties=pika.BasicProperties(
            reply_to='reply_queue',
            correlation_id=corr_id
        ))
    print(" [x] Sent %r" % message)
    consume_response(channel)
#subscriber.py
#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body.decode('utf-8'))
    #time.sleep(body.count(b'.'))
    print(" [x] Done")
    print("CORR_ID : ",str(properties.correlation_id))
    print("Reply : ", str(properties.reply_to))
    ch.basic_ack(delivery_tag=method.delivery_tag)
    res = "Received "+str(body.decode('utf-8'))
    ch.basic_publish(
        exchange='',
        routing_key='reply_queue',
        properties = pika.BasicProperties(
            correlation_id=\
                properties.correlation_id
        ),
        body=res
    )


channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

channel.start_consuming()
Actual result is-
publisher                                           subscriber
   (1)----------publish message 'Hello'-------------->  |
    | <---------reply_to_publisher_queue---------------(2)
   (3)prints_the message
   (4)publisher waits for consuming messages

我想设计这个-

(expected)
publisher                                           subscriber
   (1)----------publish message 'Hello'-------------->  |
    | <---------reply_to_publisher_queue---------------(2)
   (3)prints_the message
   (4)----------publish message 'Hello'-------------->  |
    | <---------reply_to_publisher_queue---------------(5)
    .
    .
    .
   continues....

请帮忙。

什塔尔·贾达夫

channel.start_consuming()channel.connection.process_data_events(time_limit=1)in替换了该行publisher.py它按我的预期工作。

我也关注了https://github.com/pika/pika/issues/770

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

DirectProcessor 和 UnicastProcessor 可以在不应该订阅上游发布者时订阅。为什么?

来自分类Dev

Redis发布/订阅-发布者也是订阅者?

来自分类Dev

ros python发布者/订阅者

来自分类Dev

opentok:发布者和订阅者视频显示相同

来自分类Dev

nanomsg (nng) 中的多个发布者和订阅者

来自分类Dev

当发布者和处理者是同一对象时,我应该取消订阅事件吗?

来自分类Dev

适用于发布/订阅,单向和竞争消费者的Aeron消息传递模式

来自分类Dev

发布/订阅中的发布者应该是同步的还是异步的?

来自分类Dev

使用新的订阅者连接(MQTT / Mosquitto)显示来自发布者的错过的消息

来自分类Dev

发布者订阅者模式的现代替代方案

来自分类Dev

订阅者无法读取发布者的图片

来自分类Dev

使用$ .deferred作为发布者/订阅者?

来自分类Dev

ReactJS中的发布者/订阅者模型

来自分类Dev

处理多个主题,发布者和订阅者时出现错误+ WSO2MB

来自分类Dev

Android Opentok SDK 2.0,订阅者和发布者音频

来自分类Dev

如何从广播的Akka流中获取订阅者和发布者?

来自分类Dev

Nservice总线...发布者订阅者正在筛选订阅者

来自分类Dev

订阅返回的发布者后,如何触发流程?

来自分类Dev

如何在Redis Rails中订阅多个发布者?

来自分类Dev

NServiceBus可以将同一应用程序同时作为发布者/订阅者吗?

来自分类Dev

订阅服务器和发布者在ROS中的一个文件中

来自分类Dev

如何在Web套接字连接中以无反应流的形式从订阅者向发布者发送消息

来自分类Dev

Web日历发布(.ical或ics)是由发布者推送还是由订阅者推送?

来自分类Dev

使用qos 2进行发布的发布者得到经纪人或订阅者的确认

来自分类Dev

如何等待多个Flux和Mono发布者同时完成

来自分类Dev

如何等待多个Flux和Mono发布者同时完成

来自分类Dev

结合使用SwiftUI和自定义发布者-使用.assign订阅者时出现意外行为

来自分类Dev

AMQP Rabbitmq Nodejs-每次订阅者关闭时都会创建其他发布者

来自分类Dev

Swift Combine自定义发布者:是否存储对订阅者数组的引用?

Related 相关文章

  1. 1

    DirectProcessor 和 UnicastProcessor 可以在不应该订阅上游发布者时订阅。为什么?

  2. 2

    Redis发布/订阅-发布者也是订阅者?

  3. 3

    ros python发布者/订阅者

  4. 4

    opentok:发布者和订阅者视频显示相同

  5. 5

    nanomsg (nng) 中的多个发布者和订阅者

  6. 6

    当发布者和处理者是同一对象时,我应该取消订阅事件吗?

  7. 7

    适用于发布/订阅,单向和竞争消费者的Aeron消息传递模式

  8. 8

    发布/订阅中的发布者应该是同步的还是异步的?

  9. 9

    使用新的订阅者连接(MQTT / Mosquitto)显示来自发布者的错过的消息

  10. 10

    发布者订阅者模式的现代替代方案

  11. 11

    订阅者无法读取发布者的图片

  12. 12

    使用$ .deferred作为发布者/订阅者?

  13. 13

    ReactJS中的发布者/订阅者模型

  14. 14

    处理多个主题,发布者和订阅者时出现错误+ WSO2MB

  15. 15

    Android Opentok SDK 2.0,订阅者和发布者音频

  16. 16

    如何从广播的Akka流中获取订阅者和发布者?

  17. 17

    Nservice总线...发布者订阅者正在筛选订阅者

  18. 18

    订阅返回的发布者后,如何触发流程?

  19. 19

    如何在Redis Rails中订阅多个发布者?

  20. 20

    NServiceBus可以将同一应用程序同时作为发布者/订阅者吗?

  21. 21

    订阅服务器和发布者在ROS中的一个文件中

  22. 22

    如何在Web套接字连接中以无反应流的形式从订阅者向发布者发送消息

  23. 23

    Web日历发布(.ical或ics)是由发布者推送还是由订阅者推送?

  24. 24

    使用qos 2进行发布的发布者得到经纪人或订阅者的确认

  25. 25

    如何等待多个Flux和Mono发布者同时完成

  26. 26

    如何等待多个Flux和Mono发布者同时完成

  27. 27

    结合使用SwiftUI和自定义发布者-使用.assign订阅者时出现意外行为

  28. 28

    AMQP Rabbitmq Nodejs-每次订阅者关闭时都会创建其他发布者

  29. 29

    Swift Combine自定义发布者:是否存储对订阅者数组的引用?

热门标签

归档