python-asyncio related questions

This tag is for the asyncio Python package, which provides infrastructure for writing single-threaded concurrent code. Starting with Python 3.4, the asyncio package provides asynchronous I/O, an event loop, coroutines, and tasks.
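A minimal illustration of those pieces, a coroutine scheduled as a task and driven by the event loop that asyncio.run() manages (asyncio.run() itself requires Python 3.7+):

    import asyncio

    async def greet(name: str) -> str:
        await asyncio.sleep(0.1)  # cooperative pause; other tasks may run here
        return f"hello {name}"

    async def main():
        task = asyncio.create_task(greet("asyncio"))  # schedule the coroutine as a task
        print(await task)

    asyncio.run(main())  # creates, runs, and closes the event loop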



How do I fix "asyncio.run() cannot be called from a running event loop"? I am trying to create a Python script that streams a large amount of data from the Alpaca trading API. At first I ran into the error that asyncio.run() cannot be called from a running event loop...

import os
import pandas as pd
import numpy as np
import asyncio
import json
from collections import deque
from alpaca_trade_api.stream import Stream
from alpaca_trade_api.rest import REST
from dotenv import load_dotenv

load_dotenv()
ALPACA_API_KEY = os.getenv("ALPACA_API_KEY")
ALPACA_SECRET_KEY = os.getenv("ALPACA_SECRET_KEY")
BASE_URL = "https://paper-api.alpaca.markets/v2"
SYMBOL = "AAPL"
DATA_QUEUE = deque(maxlen=1000)
rest_api = REST(ALPACA_API_KEY, ALPACA_SECRET_KEY, BASE_URL)
PROCESSED_DATA_PATH = "data/processed/"
os.makedirs(PROCESSED_DATA_PATH, exist_ok=True)

COLUMNS = [
    "timestamp", "close", "high", "low", "trade_count", "open", "volume", "vwap",
    "ema_9", "ema_21", "rsi", "macd", "macd_signal",
    "bollinger_h", "bollinger_l", "order_flow"
]

def preprocess_live_data(df):
    nan_threshold = 0.2 * len(df.columns)
    df.dropna(thresh=len(df.columns) - nan_threshold, inplace=True)
    df.fillna(method='ffill', inplace=True)
    df.fillna(method='bfill', inplace=True)
    df.fillna(df.mean(), inplace=True)
    return df

def save_live_data(df):
    file_path = os.path.join(PROCESSED_DATA_PATH, f"{SYMBOL}_live_data.csv")
    df.to_csv(file_path, mode='a', header=not os.path.exists(file_path), index=False)

async def handle_trade_update(trade):
    global DATA_QUEUE
    try:
        data = {
            "timestamp": str(trade.timestamp),
            "close": trade.price,
            "high": trade.price,
            "low": trade.price,
            "trade_count": trade.size,
            "open": trade.price,
            "volume": trade.size,
            "vwap": trade.price,
            "ema_9": np.nan,
            "ema_21": np.nan,
            "rsi": np.nan,
            "macd": np.nan,
            "macd_signal": np.nan,
            "bollinger_h": np.nan,
            "bollinger_l": np.nan,
            "order_flow": trade.size * np.sign(trade.price)
        }
        df = pd.DataFrame([data], columns=COLUMNS)
        df = preprocess_live_data(df)
        if not df.empty:
            DATA_QUEUE.append(df)
            save_live_data(df)
            print("Live data processed and saved:", df)
    except Exception as e:
        print(f"Error processing trade update: {e}")

async def start_stream():
    stream = Stream(ALPACA_API_KEY, ALPACA_SECRET_KEY, base_url=BASE_URL, data_feed='iex')
    stream.subscribe_trades(handle_trade_update, SYMBOL)
    await stream.run()

if __name__ == "__main__":
    try:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(start_stream())
    except KeyboardInterrupt:
        print("Stream interrupted by user.")
    except Exception as e:
        print(f"Error: {e}")
    finally:
        loop.close()

Answers: 1 · Votes: 0
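The usual cause of that error is calling asyncio.run() while an event loop is already running (for example inside a Jupyter notebook). A minimal sketch of one common workaround, not the Alpaca-specific fix, is to check for a running loop and either schedule a task on it or fall back to asyncio.run():

    import asyncio

    async def start_stream():
        await asyncio.sleep(1)  # placeholder for the streaming coroutine

    def launch(coro):
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running (plain script): asyncio.run() is safe here.
            return asyncio.run(coro)
        # A loop is already running (e.g. inside Jupyter): schedule a task instead.
        return loop.create_task(coro)

    if __name__ == "__main__":
        launch(start_stream())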


FastAPI global timeout middleware does not apply to sync routes

Here is my TimeoutMiddleware implementation:

Answers: 0 · Votes: 0
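A minimal sketch of such a middleware, assuming a hypothetical REQUEST_TIMEOUT of 5 seconds. Note that FastAPI runs plain def routes in a threadpool, so cancelling the awaited call does not interrupt the worker thread, which is why a timeout enforced this way appears not to apply to sync routes:

    import asyncio
    from fastapi import FastAPI, Request
    from fastapi.responses import JSONResponse

    app = FastAPI()
    REQUEST_TIMEOUT = 5  # seconds (assumed value)

    @app.middleware("http")
    async def timeout_middleware(request: Request, call_next):
        try:
            return await asyncio.wait_for(call_next(request), timeout=REQUEST_TIMEOUT)
        except asyncio.TimeoutError:
            return JSONResponse({"detail": "Request timed out"}, status_code=504)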





How can I immediately stop a function as soon as the WebSocket request stops?

I want to immediately stop a running generation function when the WebSocket stops. class ImageGeneration(Base): async def process(self, websocket, prompt): ...

Answers: 1 · Votes: 0
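One common pattern, sketched below with hypothetical names (a FastAPI WebSocket endpoint rather than the poster's ImageGeneration class): run the generation as an asyncio task and cancel it as soon as the WebSocket disconnects. The generation coroutine has to await regularly so cancellation can take effect:

    import asyncio
    from fastapi import FastAPI, WebSocket, WebSocketDisconnect

    app = FastAPI()

    async def generate_images(prompt: str):
        # Placeholder for the long-running generation; awaiting inside the loop
        # gives task.cancel() a chance to interrupt it.
        for _ in range(100):
            await asyncio.sleep(0.1)

    @app.websocket("/generate")
    async def generate(websocket: WebSocket):
        await websocket.accept()
        prompt = await websocket.receive_text()
        task = asyncio.create_task(generate_images(prompt))
        try:
            # Waiting on another receive detects the client disconnect while the task runs.
            await websocket.receive_text()
        except WebSocketDisconnect:
            pass
        finally:
            task.cancel()  # stop the generation as soon as the client goes away
            await asyncio.gather(task, return_exceptions=True)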

... decorated it. The function awaits on some deferreds. Then I need to run some ...

Answers: 1 · Votes: 0






Asyncio and Paramiko concurrent SSH connections

import paramiko
import time
import asyncio

async def sshTest(ipaddress, deviceUsername, devicePassword, sshPort):
    # finalDict
    try:
        print("Performing SSH Connection to the device")
        client = paramiko.SSHClient()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(ipaddress, username=deviceUsername, password=devicePassword,
                       port=sshPort, look_for_keys=False, allow_agent=False)
        print("Channel established")
    except Exception as e:
        print(e)

async def main():
    print("Session 1 \n")
    await sshTest('192.168.255.11', 'admin', 'admin', '22')
    print("Session 2 \n")
    await sshTest('192.168.254.11', 'admin', 'admin', '22')

if __name__ == "__main__":
    start = time.time()
    asyncio.run(main())
    end = time.time()
    print("The time of execution of above program is :", end-start)

Answers: 1 · Votes: 0
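paramiko's connect() is blocking, so awaiting sshTest() twice still opens the sessions one after the other. A minimal sketch of one way to get real concurrency (assuming Python 3.9+ for asyncio.to_thread) is to run each blocking connect in a worker thread and gather them:

    import asyncio
    import time
    import paramiko

    def ssh_connect(ip, username, password, port):
        # Plain blocking paramiko connection, meant to run in a worker thread.
        client = paramiko.SSHClient()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(ip, username=username, password=password, port=port,
                       look_for_keys=False, allow_agent=False)
        return client

    async def main():
        clients = await asyncio.gather(
            asyncio.to_thread(ssh_connect, '192.168.255.11', 'admin', 'admin', 22),
            asyncio.to_thread(ssh_connect, '192.168.254.11', 'admin', 'admin', 22),
        )
        for client in clients:
            client.close()

    if __name__ == "__main__":
        start = time.time()
        asyncio.run(main())
        print("Elapsed:", time.time() - start)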

datetime.strptime() is blocking asyncio.Queue.get()

In the following code, datetime.strptime() is blocking the asyncio.Queue.get() operation import time import asyncio from datetime import datetime from functools import partial def write(queue): da ...

Answers: 1 · Votes: 0
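The usual explanation is that a tight CPU-bound loop (here the strptime calls) never awaits, so the event loop cannot switch to the consumer blocked on queue.get(). A minimal sketch, not the poster's exact code, that offloads the parsing with run_in_executor so the consumer gets a turn:

    import asyncio
    from datetime import datetime
    from functools import partial

    async def producer(queue: asyncio.Queue):
        loop = asyncio.get_running_loop()
        for _ in range(5):
            # The blocking/CPU-bound call is moved off the event loop thread.
            parse = partial(datetime.strptime, "2024-01-01 12:00:00", "%Y-%m-%d %H:%M:%S")
            ts = await loop.run_in_executor(None, parse)
            await queue.put(ts)

    async def consumer(queue: asyncio.Queue):
        for _ in range(5):
            item = await queue.get()
            print("got", item)

    async def main():
        queue = asyncio.Queue()
        await asyncio.gather(producer(queue), consumer(queue))

    asyncio.run(main())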


AttributeError: module 'select' has no attribute 'select' error (asyncio)

import aiohttp
import asyncio
import time

async def download_file(url):
    print(f'started downloading{url}')
    connector = aiohttp.TCPConnector(limit=60)
    async with aiohttp.clientSession(connector) as session:
        async with session.get(url) as resp:
            content = await resp.read()
            print(f'Finished download{url}')
            return content

async def write_file(n, content):
    filename = f'async_{n}.html'
    with open(filename, 'wb') as f:
        print(f'started writing{filename}')
        f.write(content)
        print(f'Finished writing{filename}')

async def scrape_task(n, url):
    content = await download_file(url)
    await write_file(n, content)

async def main():
    tasks = []
    for n, url in enumerate(open('urls.txt').readlines()):
        tasks.append((scrape_task(n, url)))
    await asyncio.wait(tasks)

if __name__ == '__main__':
    t = time.perf_counter()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    t2 = time.perf_counter() - t
    print(f'Total time taken: {t2:0.2f} seconds')

Answers: 0 · Votes: 0
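The AttributeError in the title typically means a local file named select.py is shadowing the standard library module, rather than anything inside this snippet. Separately, a minimal sketch of the aiohttp pattern the code seems to be aiming for, with ClientSession spelled correctly, the connector passed as a keyword argument, and the coroutines driven by asyncio.run():

    import asyncio
    import aiohttp

    async def download_file(session, url):
        async with session.get(url) as resp:
            return await resp.read()

    async def main(urls):
        connector = aiohttp.TCPConnector(limit=60)
        async with aiohttp.ClientSession(connector=connector) as session:
            return await asyncio.gather(*(download_file(session, u) for u in urls))

    if __name__ == "__main__":
        asyncio.run(main(["https://example.com"]))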
