如何独立运行websockets

鲁斯兰·阿萨杜林(Ruslan Asadullin)

我尝试启动Binance websocket收集蜡烛数据。如果数据处理功能没有延迟,它将很好地工作。但是,当函数在处理一个行情指示器数据时出现一些暂停时,也会延迟其他行情指示器的响应。有人知道如何独立运行它们吗?

from binance.client import Client
from binance.websockets import BinanceSocketManager

api_key = ''
api_secret = ''
client = Client(api_key, api_secret)
bm = BinanceSocketManager(client, user_timeout=60)

def process(msg):
    print(msg['s'])
    if msg['s'] == 'ETHUSDT':
        time.sleep(5)

def socket_1():
     conn_key = bm.start_kline_socket('ETHUSDT', process, '1h')

def socket_2():
     conn_key = bm.start_kline_socket('BNBUSDT', process, '1h')

socket_1()
socket_2()

bm.start()

我试图让套接字asyncio按照@Mike Malyi的建议运行两个单独的任务,但并没有消除延迟:

import asyncio

def process(msg):
    asyncio.run(main(msg))

async def main(msg):
    if msg['s'] == 'ETHUSDT':
        task1 = asyncio.create_task(process_message(msg))
        await task1
    else:
        task2 = asyncio.create_task(process_message(msg))
        await task2

async def process_message(msg):
    print(msg['s'])
    if msg['s'] == 'ETHUSDT':
        await asyncio.sleep(5)

eth_key = bm.start_kline_socket('ETHUSDT', process, '1h')
bnb_key = bm.start_kline_socket('BNBUSDT', process, '1h')

bm.start()

我还尝试使用Queuein来使该函数独立运行threads,但是它没有帮助,一个函数仍会延迟另一个函数:

from queue import Queue

def consumer(in_q):
    while True:
        msg = in_q.get()
        process_message(msg)
    
def producer(out_q):
    eth = bm.start_kline_socket('ETHUSDT', out_q.put, '1h')
    bnb = bm.start_kline_socket('BNBUSDT', out_q.put, '1h')

def process_message(msg):
    if msg['s'] == 'ETHUSDT':
        time.sleep(5)
        print(f"{msg['s']} with delay, {time.strftime('%X')}")
    else:
        print(f"{msg['s']} {time.strftime('%X')}")


q = Queue()
t1 = Thread(target = consumer, args =(q, )) 
t2 = Thread(target = producer, args =(q, )) 
t1.start() 
t2.start() 

bm.start() 
迈克·玛利
from binance.client import Client
from binance.websockets import BinanceSocketManager
import _thread as thread
import time
import queue

api_key = ''
api_secret = ''
client = Client(api_key, api_secret)

def process_message(msg):
    if msg['s'] == 'ETHUSDT':
      print(f"{msg['s']} with delay, {time.strftime('%X')}")
      time.sleep(5)
      print('delay end')  
    else:
        print(f"{msg['s']} {time.strftime('%X')}")
  

def build_thread (symbol):
  print('start thread', symbol)
  q = queue.Queue()
  bm = BinanceSocketManager(client, user_timeout=60)
  conn_key = bm.start_kline_socket(symbol, q.put, '1h')
  bm.start()
  while(True):
    msg = q.get()
    process_message(msg)

thread.start_new_thread(build_thread, ('ETHUSDT', ))  
thread.start_new_thread(build_thread, ('BNBUSDT', ))  

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

如何独立洗牌

来自分类Dev

如何独立运行两个if语句?

来自分类Dev

如何独立于终端运行Electron应用程序?

来自分类Dev

Python平台如何独立?

来自分类Dev

如何独立浮动元素?

来自分类Dev

如何独立于基于软件包的安装运行PostgreSQL安装程序?

来自分类Dev

Python + Tkinter,如何独立于tk前端在后台运行?

来自分类Dev

道如何独立于视图?

来自分类Dev

程序如何独立于OS?

来自分类Dev

如何独立于IIS缓存数据

来自分类Dev

道如何独立于视图?

来自分类Dev

如何独立地对列进行排序?

来自分类Dev

Hibernate数据库如何独立?

来自分类Dev

鼠标(或USB设备)平台如何独立?

来自分类Dev

如何独立显示多个窗口?

来自分类Dev

Web服务如何独立于DAL实现?

来自分类Dev

如何独立于容器宽度移动背景渐变

来自分类Dev

如何独立证明(左右,居中)每个孩子?

来自分类Dev

如何独立于标准启用GCC的GNU扩展?

来自分类Dev

我如何独立设置kivy小部件位置平台?

来自分类Dev

WSO2 ESB如何独立处理肥皂操作

来自分类Dev

如何独立执行数据库操作?

来自分类Dev

如何独立于IP地址更改主机名?

来自分类Dev

如何独立于刷新率对 requestAnimationFrame() 计时

来自分类Dev

如何使轮播独立运行

来自分类Dev

如何在循环中如何独立显示/隐藏具有相同类名的div

来自分类Dev

如何独立设置背景图像位置和渐变位置?

来自分类Dev

如何独立于任何损失函数来实现Softmax导数?

来自分类Dev

2个jar库如何独立使用log4j?