带有 aiozmq 流的简单 PUB/SUB

布克斯

我正在尝试使用 aiozmq 流(由于某些原因我不想使用 aiozmq rpc)来制作一个简单的 PUB/SUB,但没有成功:

发布文件

# coding: utf-8
import asyncio
import time

import aiozmq
import zmq


async def do():
    stream = await aiozmq.stream.create_zmq_stream(
        zmq_type=zmq.PUB,
        bind='tcp://127.0.0.1:5556',
    )
    while True:
        await asyncio.sleep(1)
        msg = [str(time.time()).encode()]
        print('write ', msg)
        stream.write(msg)

loop = asyncio.get_event_loop()
loop.run_until_complete(do())

子.py

# coding: utf-8
import asyncio

import aiozmq
import zmq


async def do():
    stream = await aiozmq.stream.create_zmq_stream(
        zmq_type=zmq.SUB,
        connect='tcp://127.0.0.1:5556',
    )
    while True:
        print('wait ...')
        msg = await stream.read()
        print('received ', msg)

loop = asyncio.get_event_loop()
loop.run_until_complete(do())

执行 pub.py 时:

python pub.py      
write  [b'1534927086.914483']
write  [b'1534927087.9154818']
write  [b'1534927088.9164672']

然后执行sub.py:

python sub.py
wait ...

我错过了什么?

布克斯

只是错过了sub.py. 有一个工作 sub.py:

# coding: utf-8
import asyncio

import aiozmq
import zmq


async def do():
    stream = await aiozmq.stream.create_zmq_stream(
        zmq_type=zmq.SUB,
        connect='tcp://127.0.0.1:5556',
    )
    stream.transport.subscribe(b'')

    while True:
        print('wait ...')
        msg = await stream.read()
        print('received ', msg)

loop = asyncio.get_event_loop()
loop.run_until_complete(do())

谁生产:

python sub.py 
wait ...
received  [b'1534927504.0462704']
wait ...
received  [b'1534927505.0478334']

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章