我需要从 binance api 接收一些数据。当硬币发生变化时,我需要更新 websocket 的 url。但我想花尽可能少的时间。为了避免主线程过载,我在单独的进程中创建websocket 这是代码,我是如何实现的:
from functools import partial
from threading import Thread
from multiprocessing import Process, Manager
import time
import json
import websocket
import requests
def on_message(_wsa, result, prices):
result = json.loads(result)
prices[result['s']]['best_bid'] = result['b']
prices[result['s']]['best_ask'] = result['a']
def start_stream_currency(curr_list,prices):
url = 'wss://fstream.binance.com/ws'
manager = Manager()
for curr in curr_list:
url += '/' + curr.lower() + '@bookTicker'
wsa = websocket.WebSocketApp(url, on_message=partial(on_message, prices=prices))
for curr in curr_list:
prices[curr] = manager.dict()
wsa.run_forever()
def check_price_symbols(prices): # This func check prices from websocket
while True:
for key, value in prices.copy().items():
print(f"{key} {value}")
#some calculations
time.sleep(1)
def main():
prices = Manager().dict()
th = Thread(target=check_price_symbols, args=(prices,))
th.start()
curr_list = ['BTCUSDT','ADAUSDT']
pr = Process(target=start_stream_currency, args=(curr_list,prices))
pr.start()
time.sleep(15)
print('Restart starting')
curr_list2 = ['ADAUSDT','XRPUSDT']
pr2 = Process(target=start_stream_currency, args=(curr_list2, prices))
pr2.start()
pr.terminate()
del prices['BTCUSDT']
pr2.join()
if __name__ =='__main__':
main()
当新进程已经运行时需要关闭旧进程。谁能告诉我如何最好地实现这一点?
无需重新启动进程,甚至无需重新启动 WebSocket 连接。
继续相同 WebSocket 连接但更改符号的更简洁方法是通过“实时订阅/取消订阅”方法,该方法在 Futures API 文档的“WebSocket 市场流”部分中进行了描述。