我是apache-flink的新手,我需要处理来自akka本地scoket的一些数据,这些数据流式传输到'ws:// localhost:9000 / ws'
在flink API中,我只能找到一个名为'socketTextStream'的函数,它接受一个主机名,端口和分隔符
例如:
DataStream<String> text = env.socketTextStream(hostname, port, "\n");
如何将套接字指定为'ws:// localhost:9000 / ws'?
问题是socketTextStream
内部使用常规套接字,即java.net.Socket
与指定地址连接。但假设从您的描述中的地址,您正在处理WebSockets。您不能使用常规套接字从WebSockets读取数据。目前,Flink没有用于从WebSockets AFAIK创建数据流的API。获得你想要获得的东西的唯一方法是编写你自己的SourceFunction
,它将在内部使用javax.websocket-api
创建连接并从你的服务器读取数据。