如何使用Pyspark Streaming模块实现RabbitMQ使用者?

乔瑟夫·弗兰奎兹

我有一个Apache Spark集群和RabbitMQ代理,我想使用pyspark.streaming模块使用消息并计算一些指标

问题是我只找到了这个包,但是是在JavaScala中实现的除此之外,我没有在Python中找到任何示例或桥接实现

我有一个使用Pika实现的使用者,但是我不知道如何将有效载荷传递给我的StreamingContext

乔瑟夫·弗兰奎兹

此解决方案使用Spark Streaming中的pika异步使用者示例socketTextStream方法

  1. 下载示例并将其另存为.py文件
  2. 修改文件以使用您自己的RabbitMQ凭据和连接参数。就我而言,我必须修改Consumer课程
  3. 在下面,if __name__ == '__main__':我们需要使用HOST来打开一个套接字,该套接字与PORT您与Spark Streaming的TCP连接相对应。我们必须将方法sendall从套接字保存到变量中,然后将其传递给Consumer

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
      s.bind((HOST, PORT))
      s.listen(1)
      conn, addr = s.accept()
      dispatcher = conn.sendall #assigning sendall to dispatcher variable
    consumer = Consumer(dispatcher)
    try:
      consumer.run()
    except Exception as e:
      consumer.stop()
      s.close()
    
  4. 修改__init__Consumer中方法以传递dispatcher

    def __init__(self,dispatcher):
      self._connection = None
      self._channel = None
      self._closing = False
      self._consumer_tag = None
      self._url = amqp_url
      #new code
      self._dispatcher = dispatcher
    
  5. on_message使用者内部的方法中,我们调用self._dispatcher发送bodyAMQP消息

    def on_message(self, unused_channel, basic_deliver, properties, body):
      self._channel.basic_ack(basic_deliver.delivery_tag)
      try:
        # we need an '\n' at the each row Spark socketTextStream
        self._dispatcher(bytes(body.decode("utf-8")+'\n',"utf-8"))
      except Exception as e:
        raise
    
  6. 火花,把ssc.socketTextStream(HOST, int(PORT))HOSTPORT对应于我们的TCP套接字。Spark将管理连接

  7. 首先运行使用者,然后运行Spark应用程序

最后说明:

  • 尝试在其他计算机而不是Spark计算机上运行使用者
  • 任何超过10000的端口都可以。不要让内核打开一些随机端口
  • 平台:Linux Debian 7和8,以及Ubuntu 14.04和16.04
  • Pika版本0.10.0
  • Python版本3.5.2
  • Spark版本1.6.1、1.6.2和2.0.0

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

RabbitMq使用者未处理消息

来自分类Dev

如何使用Docker运行RabbitMQ使用者-NodeJS

来自分类Dev

在PHP上如何取消RabbitMQ中的使用者?

来自分类Dev

如何为ServiceStack RabbitMQ使用者创建多个线程?

来自分类Dev

RabbitMQ-如何为新使用者保存消息

来自分类Dev

使用nodeJS在RabbitMQ中删除使用者

来自分类Dev

RabbitMQ固定答复和使用者配置

来自分类Dev

PHP RabbitMQ使用者:预取1

来自分类Dev

RabbitMQ重试消息而不从使用者发布

来自分类Dev

Rabbitmq使用者获取不同路由密钥的消息

来自分类Dev

RabbitMQ使用者在收到MQTT消息时失败

来自分类Dev

闲置90秒后,RabbitMQ使用者连接消失

来自分类Dev

RabbitMQ使用者未收到该消息

来自分类Dev

PHP RabbitMQ使用者:预取1

来自分类Dev

RabbitMQ:如何在Python生产者和使用者之间发送Python字典?

来自分类Dev

在Java中实现LTI工具使用者

来自分类Dev

如何使用直接流在Kafka Spark Streaming中指定使用者组

来自分类Dev

如何在.Net中与不同类型的使用者一起使用RabbitMq消息?

来自分类Dev

如何在.Net中与不同类型的使用者一起使用RabbitMq消息?

来自分类Dev

如何设置RabbitMQ使用者以从非空队列中使用?

来自分类Dev

如何使用pyspark绘图?

来自分类Dev

如何使用Mulesoft实现Web服务使用者SOAP

来自分类Dev

如何实现Kafka使用者使用spring-cloud-stream来按需处理事件?

来自分类Dev

如何在SpringBoot:RabbitMQ中每个队列仅配置一个使用者?

来自分类Dev

RabbitMQ渠道闲置问题| 如何恢复未确认的AMQP消息| Javaclient使用者

来自分类Dev

如何保证RabbitMQ中具有多个使用者的队列的任务订单处理?

来自分类Dev

删除RabbitMQ使用者并在浏览器的RabbitMQ控制台中查看

来自分类Dev

Rabbitmq生产者(symfony 3)和使用者错误(NodeJs)

来自分类Dev

如何在Spring Boot中实现循环队列使用者

Related 相关文章

  1. 1

    RabbitMq使用者未处理消息

  2. 2

    如何使用Docker运行RabbitMQ使用者-NodeJS

  3. 3

    在PHP上如何取消RabbitMQ中的使用者?

  4. 4

    如何为ServiceStack RabbitMQ使用者创建多个线程?

  5. 5

    RabbitMQ-如何为新使用者保存消息

  6. 6

    使用nodeJS在RabbitMQ中删除使用者

  7. 7

    RabbitMQ固定答复和使用者配置

  8. 8

    PHP RabbitMQ使用者:预取1

  9. 9

    RabbitMQ重试消息而不从使用者发布

  10. 10

    Rabbitmq使用者获取不同路由密钥的消息

  11. 11

    RabbitMQ使用者在收到MQTT消息时失败

  12. 12

    闲置90秒后,RabbitMQ使用者连接消失

  13. 13

    RabbitMQ使用者未收到该消息

  14. 14

    PHP RabbitMQ使用者:预取1

  15. 15

    RabbitMQ:如何在Python生产者和使用者之间发送Python字典?

  16. 16

    在Java中实现LTI工具使用者

  17. 17

    如何使用直接流在Kafka Spark Streaming中指定使用者组

  18. 18

    如何在.Net中与不同类型的使用者一起使用RabbitMq消息?

  19. 19

    如何在.Net中与不同类型的使用者一起使用RabbitMq消息?

  20. 20

    如何设置RabbitMQ使用者以从非空队列中使用?

  21. 21

    如何使用pyspark绘图?

  22. 22

    如何使用Mulesoft实现Web服务使用者SOAP

  23. 23

    如何实现Kafka使用者使用spring-cloud-stream来按需处理事件?

  24. 24

    如何在SpringBoot:RabbitMQ中每个队列仅配置一个使用者?

  25. 25

    RabbitMQ渠道闲置问题| 如何恢复未确认的AMQP消息| Javaclient使用者

  26. 26

    如何保证RabbitMQ中具有多个使用者的队列的任务订单处理?

  27. 27

    删除RabbitMQ使用者并在浏览器的RabbitMQ控制台中查看

  28. 28

    Rabbitmq生产者(symfony 3)和使用者错误(NodeJs)

  29. 29

    如何在Spring Boot中实现循环队列使用者

热门标签

归档