在Python中使用带有线程的websockets(用于加密货币交易)

问题描述 投票:0回答:1

我正在尝试使用Python在Upbit中制作一个加密货币交易程序。

我想制作两个线程: 在第一个线程中,将使用Upbit-websockets获取实时加密货币。 在第二个线程中,我从第一个线程接收价格,如果价格高于/低于特定价格,我想出售/购买它。

我制作了一个示例代码,通过使用 websockets 带来实时的加密货币价格,如下所示:

import json
import asyncio
import nest_asyncio
from upbit.websocket import UpbitWebSocket
nest_asyncio.apply()


async def ticker(sock, payload):
    async with sock as conn:
        await conn.send(payload)
        while True:
            recv = await conn.recv()
            data = recv.decode('utf8')
            result = json.loads(data)
            print(result)


sock = UpbitWebSocket()

currencies = ['KRW-BTC']
type_field = sock.generate_type_field(
    type='ticker',
    codes=currencies
)
payload = sock.generate_payload(
    type_fields=[type_field]
)

event_loop = asyncio.get_event_loop()
event_loop.run_until_complete( ticker(sock, payload) )

效果很好。 基于此代码,我尝试继续下一步。在真正的交易之前,我想在第二个线程中打印加密货币的价格,这是由第一个线程实现的。 我编写了如下代码,但它无法正常工作。

import json
import asyncio
import nest_asyncio
import threading
from upbit.websocket import UpbitWebSocket
import time
import tracemalloc
import queue
tracemalloc.start()
nest_asyncio.apply()

# Function to fetch real-time BTC price
async def ticker(sock, payload, price_queue):
    async with sock as conn:
        await conn.send(payload)
        while True:
            recv = await conn.recv()
            data = recv.decode('utf8')
            result = json.loads(data)
            price_queue.put(result)  # Put the price in the queue

# Thread to print the BTC price
def print_price_thread(price_queue):
    while True:
        price = price_queue.get()  # Get the price from the queue
        print(price)

sock = UpbitWebSocket()
currencies = ['KRW-BTC']
type_field = sock.generate_type_field(
    type='ticker',
    codes=currencies
)
payload = sock.generate_payload(
    type_fields=[type_field]
)

price_queue = queue.Queue()  # Create a queue to pass prices between threads

# Create and start the threads
price_fetcher = threading.Thread(target=ticker, args=(sock, payload, price_queue))
price_printer = threading.Thread(target=print_price_thread, args=(price_queue,))

# Start both threads
price_fetcher.start()
price_printer.start()

我认为使用线程和异步是相当棘手的。有什么办法可以让上面的代码工作吗?

python websocket python-multithreading cryptocurrency
1个回答
0
投票

这可能是因为

ticker
协程中的事件循环没有在创建它的线程中运行。要实现此目的,您应该使用
loop.run_in_executor
在正确的线程中运行协程。

这是代码的修改版本:

import json
import asyncio
import nest_asyncio
import threading
from upbit.websocket import UpbitWebSocket
import queue

nest_asyncio.apply()

# Function to fetch real-time BTC price
async def ticker(sock, payload, price_queue):
    async with sock as conn:
        await conn.send(payload)
        while True:
            recv = await conn.recv()
            data = recv.decode('utf8')
            result = json.loads(data)
            price_queue.put(result)  # Put the price in the queue

# Thread to print the BTC price
def print_price_thread(loop, sock, payload, price_queue):
    asyncio.set_event_loop(loop)
    loop.run_until_complete(ticker(sock, payload, price_queue))

sock = UpbitWebSocket()
currencies = ['KRW-BTC']
type_field = sock.generate_type_field(
    type='ticker',
    codes=currencies
)
payload = sock.generate_payload(
    type_fields=[type_field]
)

price_queue = queue.Queue()  # Create a queue to pass prices between threads

# Create and start the threads
event_loop = asyncio.new_event_loop()
price_fetcher = threading.Thread(target=event_loop.run_until_complete, args=(ticker(sock, payload, price_queue),))
price_printer = threading.Thread(target=print_price_thread, args=(event_loop, sock, payload, price_queue))

# Start both threads
price_fetcher.start()
price_printer.start()

此修改后的代码为线程中的异步任务创建一个新的事件循环,并使用

run_until_complete
在正确的线程中运行
ticker
协程。此外,
print_price_thread
函数现在将事件循环作为参数。

您可以尝试看看它是否可以解决您所面临的问题!

© www.soinside.com 2019 - 2024. All rights reserved.