我的python程序包含以下websocket类和函数。客户端 JavaScript (index.js) 每 60 秒触发一次 fetch_price_data() 函数。
class WebSocketClient:
def __init__(self, url="ws://localhost:8081"):
self.url = url
self.ws = None
def connect(self):
"""Establish connection to the WebSocket server and close immediately."""
try:
self.ws = websocket.WebSocket()
self.ws.connect(self.url)
print(f"Connected to WebSocket server at {self.url}")
self.ws.close() # Close the connection right after establishing it
except Exception as e:
print(f"Error connecting to WebSocket server: {e}")
self.ws = None
def send_data_to_websocket(self, df):
"""Send data to the WebSocket server."""
try:
self.ws = websocket.WebSocket()
self.ws.connect(self.url)
print("Sending data to WebSocket...")
json_data = df.to_json(orient="records")
# Create a structured message with type and payload
message = {
'type': 'price_data',
'payload': json.loads(json_data) # Convert JSON string to a Python object
}
# Send the structured message as a JSON string
self.ws.send(json.dumps(message)) # Serialize the message back to JSON
print("Data sent:", message)
self.ws.close()
except Exception as e:
print(f"Failed to send data: {e}")
@app.route('/fetch-price-data', methods=['GET'])
def fetch_price_data():
print('Fetching price data.............................')
df = get_price_data()
ws_client = WebSocketClient()
ws_client.send_data_to_websocket(df)
return jsonify({"status": "Data sent to WebSocket server"}), 200
我遇到的问题是 fetch_price_data() 中的 return 语句(Flask 需要)导致消息发送到 websocket 服务器,这是我没想到的。我知道这一点是因为 server.js 文件给出了以下输出:
Received a message without a type. Message will not be broadcasted.Parsed message: { status: 'Data sent to WebSocket server' }
server.js文件中相关代码:.
const express = require('express');
const WebSocket = require('ws');
const path = require('path');
const app = express();
const port = 8080; // For serving the HTML page
// Serve static HTML file
app.use(express.static(path.join(__dirname, 'public')));
// WebSocket server on port 8081
const wss = new WebSocket.Server({ port: 8081 });
// Broadcast to all WebSocket clients
wss.broadcast = function broadcast(data) {
wss.clients.forEach(function each(client) {
if (client.readyState === WebSocket.OPEN) {
client.send(data);
}
});
};
// Listen for incoming WebSocket messages from Python
wss.on('connection', function connection(ws) {
console.log('WebSocket connection established');
ws.on('message', function incoming(data) {
const jsonData = data.toString(); // Convert the buffer to string
console.log('Received data from Python:', jsonData);
try {
const message = JSON.parse(jsonData); // Parse incoming message
// Log parsed message structure
console.log('Parsed message:', message)
// Check the message type before broadcasting
if (message.type) {
// Broadcast the structured message to all connected clients
wss.broadcast(jsonData); // Send as is if it's already structured
} else {
console.warn('Received a message without a type. Message will not be broadcasted.');
console.log('Message with warning: ', message)
}
} catch (error) {
console.error('Error parsing incoming message:', error);
}
});
希望有人能够解释为什么 fetch_price_data() 函数中的 return 语句向 websocket 服务器发送消息以及如何防止这种情况。我以为它会被发送回 HTTP 客户端。
我想知道这是否是 Flask 与 websocket 客户端结合的有效使用。
希望有人能够解释为什么 fetch_price_data() 函数中的 return 语句向 websocket 服务器发送消息以及如何防止这种情况。
@app.route('/fetch-price-data', methods=['GET'])
def fetch_price_data():
print('Fetching price data.............................')
df = get_price_data()
ws_client = WebSocketClient()
ws_client.send_data_to_websocket(df)
return jsonify({"status": "Data sent to WebSocket server"}), 200
分解
fetch_price_data
函数:
df = get_price_data()
在这里,您使用
df
的结果初始化 fetch_price_data
(对其自身的递归调用)。所以,现在:
df = {"status": "Data sent to WebSocket server"}
然后,我们创建
WebSocketClient
类的实例并调用其方法 send_data_to_websocket
,传递 df
,我们为其分配了 {"status": "Data sent to WebSocket server"}
的值。
ws_client = WebSocketClient()
ws_client.send_data_to_websocket(df)
send_data_to_websocket
获取值 {"status": "Data sent to WebSocket server"}
,检查键 type
,发现它是 undefined
,并发出警告 "Received a message without a type. Message will not be broadcasted."
。
看起来您正在尝试在某处使用 Pandas 数据框,也许您已经多次定义了
fetch_price_data
函数?
您尝试使用 WebSockets 的方式不正确。打开连接后不应立即关闭连接,这违背了 WebSocket 的目的。您不需要从客户端(JS)轮询服务器以获取更新信息,这就是 WebSockets 的优点:建立初始连接后,服务器可以向客户端发送数据,而无需客户端发起连接。
我建议阅读更多有关 WebSocket 工作原理以及如何在 Flask 中实现它们的内容。就我个人而言,我喜欢 Flask-Sock 库。