我在 Jersey Resource 类中实现了流输出。
@GET
@Path("xxxxx")
@Produces(BulkConstants.TEXT_XML_MEDIA_TYPE})
public Response getFile() {
FeedReturnStreamingOutput sout = new FeedReturnStreamingOutput();
response = Response.ok(sout).build();
return response;
}
class FeedReturnStreamingOutput implements StreamingOutput {
public FeedReturnStreamingOutput()
@Override
public void write(OutputStream outputStream) {
//write into Output Stream
}
}
问题是,即使在调用 FeedReturnStreamingOutput 之前从资源发回响应,Jersey 客户端也会等待,直到 FeedReturnStreamingOutput 执行完成。
客户代码:
Client client = Client.create();
ClientResponse response = webResource
//headers
.get(ClientResponse.class);
//The codes underneath executes after FeedReturnStreamingOutput is executed which undermines the necessity of streaming
OutputStream os = new FileOutputStream("c:\\test\\feedoutput5.txt");
System.out.println(new Date() + " : Reached point A");
if (response.getStatus() == 200) {
System.out.println(new Date() + " : Reached point B");
InputStream io = response.getEntityInputStream();
byte[] buff = new byte[1024000];
int count = 0;
while ((count = io.read(buff, 0, buff.length)) != -1) {
os.write(buff, 0, count);
}
os.close();
io.close();
} else {
System.out.println("Response code :" + response.getStatus());
}
System.out.println("Time taken -->> "+(System.currentTimeMillis()-startTime)+" ms");
问题在于 Jersey 用于缓冲实体以确定 Content-Length 标头的缓冲
OutputStream
。缓冲区的大小默认为 8 kb。如果需要,您可以禁用缓冲,或者只是使用属性更改缓冲区的大小
ServerProperties.OUTBOUND_CONTENT_LENGTH_BUFFER
一个整数值,定义用于缓冲服务器端响应实体的缓冲区大小,以确定其大小并设置 HTTP“Content-Length”标头的值。
如果实体大小超过配置的缓冲区大小,则缓冲将被取消,并且实体大小将无法确定。小于或等于零的值将完全禁用实体的缓冲。
此属性可在服务器端用于覆盖出站消息缓冲区大小值 - 默认值或使用“jersey.config.contentLength.buffer”全局属性设置的全局自定义值。
默认值为8192。
这是一个例子
@Path("streaming")
public class StreamingResource {
@GET
@Produces("application/octet-stream")
public Response getStream() {
return Response.ok(new FeedReturnStreamingOutput()).build();
}
public static class FeedReturnStreamingOutput implements StreamingOutput {
@Override
public void write(OutputStream output)
throws IOException, WebApplicationException {
try {
for (int i = 0; i < 10; i++) {
output.write(String.format("Hello %d\n", i).getBytes());
output.flush();
TimeUnit.MILLISECONDS.sleep(500);
}
} catch (InterruptedException e) { throw new RuntimeException(e); }
}
}
}
这是未设置属性的结果
这是将属性值设置为
0
后的结果
public class AppConfig extends ResourceConfig {
public AppConfig() {
...
property(ServerProperties.OUTBOUND_CONTENT_LENGTH_BUFFER, 0);
}
}
尝试在写入输出流或类似内容的每 X 个字节数中从方法
outputStream.flush()
调用 FeedReturnStreamingOutput.write(...)
。
我猜连接的缓冲区没有填充您返回的数据。因此,在 Jersey 调用
outputStream.close()
之前,该服务不会返回任何内容。
就我而言,我有一项传输数据的服务,并且我正在按照您的方式进行操作:通过返回
Response.ok(<instance of StreamingOutput>).build();
。
我的服务从数据库返回数据,并在将每一行写入输出流后调用
outputStream.flush()
。
我知道服务会传输数据,因为我可以看到客户端在服务完成发送整个结果之前就开始接收数据。
您的响应太小并且永远不会被分块,因此服务器会立即刷新整个请求。或者您遇到服务器端问题,您的 jax-rs 库正在等待刷新之前获得完整的流。
然而,这看起来更像是客户端问题。而且您似乎使用的是旧版本的球衣客户端。
还有
.get(ClientResponse.class)
看起来很可疑。
尝试使用现在的 JAX-RS 标准(至少在客户端):
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;
Client client = ClientBuilder.newBuilder().build();
WebTarget target = client.target("http://localhost:8080/");
Response response = target.path("path/to/resource").request().get();
当类路径中有球衣客户端 2.17 时:
<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-client</artifactId>
<version>2.17</version>
</dependency>
考虑使用
@Context
注入底层 HttpServletResponse 对象并直接写入/刷新其流。这似乎绕过了在应用程序级别配置有问题的 OUTBOUND_CONTENT_LENGTH_BUFFER
设置。
这是 Scala 的例子:
@POST
@Path("/internal/stream-test")
@ApiOperation(value = "Stream test")
def testStream(@Context response: HttpServletResponse): Unit = {
val stream = response.getOutputStream
Range(1, 10).foreach(i => {
val data = s"Hello $i".getBytes
stream.write(data)
stream.flush()
Thread.sleep(2000)
})
}
我也更喜欢使用 Jersey 的 ChunkedOutput 抽象来返回渐进式结果,因为 ChunkedOutput 强制您引入一个单独的线程来写入块,而不是在处理时仅添加到流中。此示例返回 204 而不是 200,希望这不会打扰您。