Python的asyncio同步工作

奥哈德·巴桑

我正在尝试利用Python的新asyncio库发送异步HTTP请求。我想timeout在发送每个请求之前等待几毫秒(该变量),但是,当然要异步发送所有请求,而不是在发送每个请求后都等待响应。

我正在执行以下操作:

@asyncio.coroutine
def handle_line(self, line, destination):
    print("Inside! line {} destination {}".format(line, destination))
    response = yield from aiohttp.request('POST', destination, data=line,
                               headers=tester.headers)
    print(response.status)
    return (yield from response.read())

@asyncio.coroutine
def send_data(self, filename, timeout):
    destination='foo'
    logging.log(logging.DEBUG, 'sending_data')
    with open(filename) as log_file:
        for line in log_file:
            try:
                json_event = json.loads(line)
            except ValueError as e:
                print("Error parsing json event")
            time.sleep(timeout)
            yield from asyncio.async(self.handle_line(json.dumps(json_event), destination))


loop=asyncio.get_event_loop().run_until_complete(send_data('foo.txt', 1))

我得到的输出(通过打印200个响应)看起来像这段代码正在同步运行。我究竟做错了什么?

损害

这里有几个问题:

  1. 您应该使用asyncio.sleep,而不是time.sleep,因为后者会阻塞事件循环。

  2. 调用yield from不应该使用asyncio.async(self.handle_line(...)),因为这将使脚本阻塞,直到self.handle_line协程完成为止,这意味着您最终不会同时执行任何操作。您处理每一行,等待处理完成,然后继续进行下一行。相反,您应该asyncio.async不等待而运行所有调用,将Task返回对象保存到列表中,然后asyncio.wait在启动所有对象后等待它们完成。

放在一起:

@asyncio.coroutine
def handle_line(self, line, destination):
    print("Inside! line {} destination {}".format(line, destination))
    response = yield from aiohttp.request('POST', destination, data=line,
                               headers=tester.headers)
    print(response.status)
    return (yield from response.read())

@asyncio.coroutine
def send_data(self, filename, timeout):
    destination='foo'
    logging.log(logging.DEBUG, 'sending_data')
    tasks = []
    with open(filename) as log_file:
        for line in log_file:
            try:
                json_event = json.loads(line)
            except ValueError as e:
                print("Error parsing json event")
            yield from asyncio.sleep(timeout)
            tasks.append(asyncio.async(
                 self.handle_line(json.dumps(json_event), destination))
    yield from asyncio.wait(tasks)


asyncio.get_event_loop().run_until_complete(send_data('foo.txt', 1))

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章