在尝试了不同的方法以使其正常工作之后,我还没有设法理解 Rails 的神奇价值以提供 websocket 接口。已知客户端代码可以工作,因为它已经使用 NodeJS 和 FastAPI 接口进行了测试。据我了解,Rails 服务器读取/响应 WS 事件的正确方法是通过最后一个代码片段中实现的“send_msg”方法,但是应该如何调用该方法?似乎为了调用 send_msg 方法,我必须修改客户端代码以使用 Rails 提供的 JS 库(如here),这是不可能的。
正如标题所说,问题是如何创建一个简单的(但通用的?)WS 消息接收器/广播器?
websocket 接口应该如何工作
客户端如何尝试连接和请求时间(NodeJS) (连接 X 个客户端,广播时间 Y 次)
import async from "async";
import fetch from "node-fetch";
import fs from "fs";
import ws from "ws";
// client that only counts the received messages
function wsFunction(requestCount) {
return resolve => {
let limit = requestCount;
// construct all promises
const client = new ws("ws://localhost:3000/clock");
client.on('message', data => {
if(--limit == 0) {
client.close();
}
});
client.on('close', () => {
if(limit > 0) {
console.log("Socket closed prematurely... ", limit);
}
});
client.on('open', () => {
resolve(client); // client connected
});
const close = () => {
if(client.readyState !== ws.CLOSED && client.readyState !== ws.CLOSING) {
client.close();
}
}
}
}
/**
*
* @param {*} limit
* @returns operation time for limit messages, or -1 if connection is cut
*/
function attemptClockFetches(clientCount, retrieveCount) {
const clients = [];
for(let i = 0; i < clientCount - 1; ++i) {
clients.push(async () => new Promise(wsFunction(retrieveCount)));
}
// client that initiates the broadcast
const promise = new Promise(async resolve => {
const startTime = performance.now();
const sockets = await async.parallel(clients); // connect all clients
// create updater client
const client = new ws("ws://localhost:3000/clock");
// now update until limit is reached
client.on('close', () => {
if(retrieveCount > 0) {
console.log("Parent socket closed prematurely...");
}
});
client.on('message', () => {
if(--retrieveCount > 0) {
client.send("requestTime");
} else {
client.close();
const endTime = performance.now();
// close all sockets
for(let s of sockets) {
s.close();
}
resolve(endTime - startTime);
}
});
client.on('open', () => {
client.send("requestTime");
});
});
return promise;
}
async function doStressTest() {
await attemptClockFetches(10, 10);
}
const i = setInterval(() => {
// prevent node from killing process
}, 1000);
doStressTest().then(() => {
clearInterval(i);
});
工作的 NodeJS WebSocket 响应器的片段,本质上这是需要在 Rails 中复制的内容
const wsServer = new ws.WebSocketServer({ server: server, path: "/clock" });
wsServer.on('connection', socket => {
socket.on('error', err => {
console.error(err);
});
socket.on('message', data => {
if(data.toString() === "requestTime") {
// broadcast time on requestTime event to all clients
wsServer.clients.forEach(client => {
if(client.readyState === ws.OPEN) {
client.send((new Date()).getMilliseconds());
}
});
}
});
});
我目前实现了什么 我已将其添加到 routes.rb,假设它将所有 WS 事件定向到路径 /clock,即 ClocksChannel
Rails.application.routes.draw do
get '/users/:userId/cards', to: 'card#index'
# get '/clock', to: 'card#clock' <- ADDING THIS MAKES RAILS RESPOND IN HTTP EVEN THOUGH USING WebSocket PROTOCOL
mount ActionCable.server => '/clock'
end
实现这个频道,假设它订阅和取消订阅客户。至于调用send_msg,我不太清楚应该怎么调用
require "time"
class ClocksChannel < ApplicationCable::Channel
def subscribed
# stream_from "some_channel"
end
def unsubscribed
# Any cleanup needed when channel is unsubscribed
end
def send_msg(data)
if data == "requestTime"
ActionCable.server.broadcast "requestTime", message: (Time.now.to_f * 1000).to_i
end
end
end
主卡_controller.rb的内容
class CardController < ApplicationController
def index
# do some index things, not part of WS
end
# def clock
# render "Hello World"
# end
end
当服务器接收到具有给定设置的连接时,将给出以下输出:
Started GET "/clock" for 127.0.0.1 at 2023-03-09 18:44:37 +0200
Started GET "/clock/" [WebSocket] for 127.0.0.1 at 2023-03-09 18:44:37 +0200
Request origin not allowed:
Failed to upgrade to WebSocket (REQUEST_METHOD: GET, HTTP_CONNECTION: Upgrade, HTTP_UPGRADE: websocket)
Finished "/clock/" [WebSocket] for 127.0.0.1 at 2023-03-09 18:44:37 +0200
客户得到这个回应
Error: Unexpected server response: 404