我正在尝试使用Python在Upbit中制作一个加密货币交易程序。
我想制作两个线程: 在第一个线程中,将使用Upbit-websockets获取实时加密货币。 在第二个线程中,我从第一个线程接收价格,如果价格高于/低于特定价格,我想出售/购买它。
我制作了一个示例代码,通过使用 websockets 带来实时的加密货币价格,如下所示:
import json
import asyncio
import nest_asyncio
from upbit.websocket import UpbitWebSocket
nest_asyncio.apply()
async def ticker(sock, payload):
async with sock as conn:
await conn.send(payload)
while True:
recv = await conn.recv()
data = recv.decode('utf8')
result = json.loads(data)
print(result)
sock = UpbitWebSocket()
currencies = ['KRW-BTC']
type_field = sock.generate_type_field(
type='ticker',
codes=currencies
)
payload = sock.generate_payload(
type_fields=[type_field]
)
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete( ticker(sock, payload) )
效果很好。 基于此代码,我尝试继续下一步。在真正的交易之前,我想在第二个线程中打印加密货币的价格,这是由第一个线程实现的。 我编写了如下代码,但它无法正常工作。
import json
import asyncio
import nest_asyncio
import threading
from upbit.websocket import UpbitWebSocket
import time
import tracemalloc
import queue
tracemalloc.start()
nest_asyncio.apply()
# Function to fetch real-time BTC price
async def ticker(sock, payload, price_queue):
async with sock as conn:
await conn.send(payload)
while True:
recv = await conn.recv()
data = recv.decode('utf8')
result = json.loads(data)
price_queue.put(result) # Put the price in the queue
# Thread to print the BTC price
def print_price_thread(price_queue):
while True:
price = price_queue.get() # Get the price from the queue
print(price)
sock = UpbitWebSocket()
currencies = ['KRW-BTC']
type_field = sock.generate_type_field(
type='ticker',
codes=currencies
)
payload = sock.generate_payload(
type_fields=[type_field]
)
price_queue = queue.Queue() # Create a queue to pass prices between threads
# Create and start the threads
price_fetcher = threading.Thread(target=ticker, args=(sock, payload, price_queue))
price_printer = threading.Thread(target=print_price_thread, args=(price_queue,))
# Start both threads
price_fetcher.start()
price_printer.start()
我认为使用线程和异步是相当棘手的。有什么办法可以让上面的代码工作吗?
这可能是因为
ticker
协程中的事件循环没有在创建它的线程中运行。要实现此目的,您应该使用 loop.run_in_executor
在正确的线程中运行协程。
这是代码的修改版本:
import json
import asyncio
import nest_asyncio
import threading
from upbit.websocket import UpbitWebSocket
import queue
nest_asyncio.apply()
# Function to fetch real-time BTC price
async def ticker(sock, payload, price_queue):
async with sock as conn:
await conn.send(payload)
while True:
recv = await conn.recv()
data = recv.decode('utf8')
result = json.loads(data)
price_queue.put(result) # Put the price in the queue
# Thread to print the BTC price
def print_price_thread(loop, sock, payload, price_queue):
asyncio.set_event_loop(loop)
loop.run_until_complete(ticker(sock, payload, price_queue))
sock = UpbitWebSocket()
currencies = ['KRW-BTC']
type_field = sock.generate_type_field(
type='ticker',
codes=currencies
)
payload = sock.generate_payload(
type_fields=[type_field]
)
price_queue = queue.Queue() # Create a queue to pass prices between threads
# Create and start the threads
event_loop = asyncio.new_event_loop()
price_fetcher = threading.Thread(target=event_loop.run_until_complete, args=(ticker(sock, payload, price_queue),))
price_printer = threading.Thread(target=print_price_thread, args=(event_loop, sock, payload, price_queue))
# Start both threads
price_fetcher.start()
price_printer.start()
此修改后的代码为线程中的异步任务创建一个新的事件循环,并使用
run_until_complete
在正确的线程中运行 ticker
协程。此外,print_price_thread
函数现在将事件循环作为参数。
您可以尝试看看它是否可以解决您所面临的问题!