我尝试启动Binance websocket收集蜡烛数据。如果数据处理功能没有延迟,它将很好地工作。但是,当函数在处理一个行情指示器数据时出现一些暂停时,也会延迟其他行情指示器的响应。有人知道如何独立运行它们吗?
from binance.client import Client
from binance.websockets import BinanceSocketManager
api_key = ''
api_secret = ''
client = Client(api_key, api_secret)
bm = BinanceSocketManager(client, user_timeout=60)
def process(msg):
print(msg['s'])
if msg['s'] == 'ETHUSDT':
time.sleep(5)
def socket_1():
conn_key = bm.start_kline_socket('ETHUSDT', process, '1h')
def socket_2():
conn_key = bm.start_kline_socket('BNBUSDT', process, '1h')
socket_1()
socket_2()
bm.start()
我试图让套接字asyncio
按照@Mike Malyi的建议运行两个单独的任务,但并没有消除延迟:
import asyncio
def process(msg):
asyncio.run(main(msg))
async def main(msg):
if msg['s'] == 'ETHUSDT':
task1 = asyncio.create_task(process_message(msg))
await task1
else:
task2 = asyncio.create_task(process_message(msg))
await task2
async def process_message(msg):
print(msg['s'])
if msg['s'] == 'ETHUSDT':
await asyncio.sleep(5)
eth_key = bm.start_kline_socket('ETHUSDT', process, '1h')
bnb_key = bm.start_kline_socket('BNBUSDT', process, '1h')
bm.start()
我还尝试使用Queue
in来使该函数独立运行threads
,但是它没有帮助,一个函数仍会延迟另一个函数:
from queue import Queue
def consumer(in_q):
while True:
msg = in_q.get()
process_message(msg)
def producer(out_q):
eth = bm.start_kline_socket('ETHUSDT', out_q.put, '1h')
bnb = bm.start_kline_socket('BNBUSDT', out_q.put, '1h')
def process_message(msg):
if msg['s'] == 'ETHUSDT':
time.sleep(5)
print(f"{msg['s']} with delay, {time.strftime('%X')}")
else:
print(f"{msg['s']} {time.strftime('%X')}")
q = Queue()
t1 = Thread(target = consumer, args =(q, ))
t2 = Thread(target = producer, args =(q, ))
t1.start()
t2.start()
bm.start()
from binance.client import Client
from binance.websockets import BinanceSocketManager
import _thread as thread
import time
import queue
api_key = ''
api_secret = ''
client = Client(api_key, api_secret)
def process_message(msg):
if msg['s'] == 'ETHUSDT':
print(f"{msg['s']} with delay, {time.strftime('%X')}")
time.sleep(5)
print('delay end')
else:
print(f"{msg['s']} {time.strftime('%X')}")
def build_thread (symbol):
print('start thread', symbol)
q = queue.Queue()
bm = BinanceSocketManager(client, user_timeout=60)
conn_key = bm.start_kline_socket(symbol, q.put, '1h')
bm.start()
while(True):
msg = q.get()
process_message(msg)
thread.start_new_thread(build_thread, ('ETHUSDT', ))
thread.start_new_thread(build_thread, ('BNBUSDT', ))
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句