等待最后一个元素时阻塞通量

问题描述 投票:0回答:1

我想通过rsocket连接两个应用程序。一种是用GO编写的,另一种是用Kotlin编写的。我想实现客户端发送数据流和服务器发送确认响应的连接。

问题在于等待所有元素,如果服务器没有BlockOnLast(ctx),则读取整个流,但是在所有条目到达之前发送响应。如果添加了BlockOnLast(ctx),则服务器(GoLang)被卡住。

我也在Kotlin编写了客户,在这种情况下,整个交流工作都很好。

enyone可能有帮助吗?

GO服务器:

package main

import (
"context"
"github.com/golang/protobuf/proto"
"github.com/rsocket/rsocket-go"
"github.com/rsocket/rsocket-go/payload"
"github.com/rsocket/rsocket-go/rx"
"github.com/rsocket/rsocket-go/rx/flux"
"rsocket-go-rpc-test/proto"
)

func main() {
addr := "tcp://127.0.0.1:8081"
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := rsocket.Receive().
    Fragment(1024).
    Resume().
    Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
        return rsocket.NewAbstractSocket(
            rsocket.RequestChannel(func(payloads rx.Publisher) flux.Flux {
                println("START")

                payloads.(flux.Flux).
                    DoOnNext(func(input payload.Payload) {
                        chunk := &pl_dwojciechowski_proto.Chunk{}
                        proto.Unmarshal(input.Data(), chunk)
                        println(string(chunk.Content))
                    }).BlockLast(ctx)

                return flux.Create(func(i context.Context, sink flux.Sink) {
                    status, _ := proto.Marshal(&pl_dwojciechowski_proto.UploadStatus{
                        Message: "OK",
                        Code:    0,
                    })

                    sink.Next(payload.New(status, make([]byte, 1)))
                    sink.Complete()
                    println("SENT")
                })
            }),
        ), nil
    }).
    Transport(addr).
    Serve(ctx)
panic(err)

}

Kotlin客户:

private fun clientCall() {
val rSocket = RSocketFactory.connect().transport(TcpClientTransport.create(8081)).start().block()
val client = FileServiceClient(rSocket)

val requests: Flux<Chunk> = Flux.range(1, 10)
    .map { i: Int -> "sending -> $i" }
    .map<Chunk> {
        Chunk.newBuilder()
            .setContent(ByteString.copyFrom(it.toByteArray())).build()
    }

val response = client.send(requests).block() ?: throw Exception("")
rSocket.dispose()
System.out.println(response.message)

}

和用Kotlin编写的GO等效:

    val serviceServer = FileServiceServer(DefaultService(), Optional.empty(), Optional.empty())
val closeableChannel = RSocketFactory.receive()
    .acceptor { setup: ConnectionSetupPayload?, sendingSocket: RSocket? ->
        Mono.just(
            RequestHandlingRSocket(serviceServer)
        )
    }
    .transport(TcpServerTransport.create(8081))
    .start()
    .block()
    closeableChannel.onClose().block()

class DefaultService : FileService {
override fun send(messages: Publisher<Service.Chunk>?, metadata: ByteBuf?): Mono<Service.UploadStatus> {
    return Flux.from(messages)
        .windowTimeout(10, Duration.ofSeconds(500))
        .flatMap(Function.identity())
        .doOnNext { println(it.content.toStringUtf8()) }
        .then(Mono.just(Service.UploadStatus.newBuilder().setCode(Service.UploadStatusCode.Ok).setMessage("test").build()))
}
}

服务器输出:

START
sending -> 1
java go kotlin rpc rsocket
1个回答
0
投票
package main import ( "context" "github.com/golang/protobuf/proto" "github.com/rsocket/rsocket-go" "github.com/rsocket/rsocket-go/payload" "github.com/rsocket/rsocket-go/rx" "github.com/rsocket/rsocket-go/rx/flux" "rsocket-go-rpc-test/proto" ) type TestService struct { totals int pl_dwojciechowski_proto.FileService } var statusOK = &pl_dwojciechowski_proto.UploadStatus{ Message: "code", Code: pl_dwojciechowski_proto.UploadStatusCode_Ok, } var statusErr = &pl_dwojciechowski_proto.UploadStatus{ Message: "code", Code: pl_dwojciechowski_proto.UploadStatusCode_Failed, } func main() { addr := "tcp://127.0.0.1:8081" ctx, cancel := context.WithCancel(context.Background()) defer cancel() err := rsocket.Receive(). Fragment(1024). Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) { return rsocket.NewAbstractSocket( rsocket.RequestChannel(func(msgs rx.Publisher) flux.Flux { dataReceivedChan := make(chan bool, 1) toChan, _ := flux.Clone(msgs). DoOnError(func(e error) { dataReceivedChan <- false }). DoOnComplete(func() { dataReceivedChan <- true }). ToChan(ctx, 1) fluxResponse := flux.Create(func(ctx context.Context, s flux.Sink) { gluedContent := make([]byte, 1024) for c := range toChan { chunk := pl_dwojciechowski_proto.Chunk{} _ = chunk.XXX_Unmarshal(c.Data()) gluedContent = append(gluedContent, chunk.Content...) } if <-dataReceivedChan { marshal, _ := proto.Marshal(statusOK) s.Next(payload.New(marshal, nil)) s.Complete() } else { marshal, _ := proto.Marshal(statusErr) s.Next(payload.New(marshal, nil)) s.Complete() } }) return fluxResponse }), ), nil }). Transport(addr). Serve(ctx) panic(err) }
© www.soinside.com 2019 - 2024. All rights reserved.