此解决方案使用Spark Streaming中的pika异步使用者示例和socketTextStream
方法
.py
文件Consumer
课程在下面,if __name__ == '__main__':
我们需要使用HOST
和来打开一个套接字,该套接字与PORT
您与Spark Streaming的TCP连接相对应。我们必须将方法sendall
从套接字保存到变量中,然后将其传递给Consumer
类
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind((HOST, PORT))
s.listen(1)
conn, addr = s.accept()
dispatcher = conn.sendall #assigning sendall to dispatcher variable
consumer = Consumer(dispatcher)
try:
consumer.run()
except Exception as e:
consumer.stop()
s.close()
修改__init__
Consumer中的方法以传递dispatcher
def __init__(self,dispatcher):
self._connection = None
self._channel = None
self._closing = False
self._consumer_tag = None
self._url = amqp_url
#new code
self._dispatcher = dispatcher
在on_message
使用者内部的方法中,我们调用self._dispatcher
发送body
AMQP消息
def on_message(self, unused_channel, basic_deliver, properties, body):
self._channel.basic_ack(basic_deliver.delivery_tag)
try:
# we need an '\n' at the each row Spark socketTextStream
self._dispatcher(bytes(body.decode("utf-8")+'\n',"utf-8"))
except Exception as e:
raise
火花,把ssc.socketTextStream(HOST, int(PORT))
与HOST
和PORT
对应于我们的TCP套接字。Spark将管理连接
首先运行使用者,然后运行Spark应用程序
最后说明:
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句