如何在后端微服务中接收实时消息以使用 Azure PubSub 服务进行处理

问题描述 投票:0回答:1

我的客户端(python、java)正在使用简单的 WebSocket 连接来发送消息。我想在后端 .NET 8 Web API 中接收这些消息并进一步处理它们,以将微调的数据发送到一些分布式缓存服务。

客户只想使用其语言中可用的纯 websocket 库,而不是 azure pub sub 库。

从文档中,我只了解到我可以使用后端仅将带有令牌的访问 URL 发送给客户端,而集线器仅用于处理事件(而不是消息)。

我不知道如何在后端服务本身中接收消息,但我想我需要在后端服务本身中使用“WebPubSubClient”来接收和处理消息。这似乎不正确,因为名称表明了客户。

怀疑

  • 为了接收消息,如何使用ServiceClient从后端服务建立WebSocket连接?
  • 如果不是,使用 Client SDK 在 Web api 服务中接收消息是否正确(可能在持续运行的托管服务中使用它)?

分享相关代码供参考

发件人:

WebPubSubClient pubSubClient = new(serverUri);
await pubSubClient.StartAsync();

//await pubSubClient.JoinGroupAsync(PubSubUtils.GroupName);
WebPubSubResult result = await pubSubClient.SendToGroupAsync(PubSubUtils.GroupName,
                    BinaryData.FromString("hello_from_dheeraj"),
                    WebPubSubDataType.Text);

接收者

    await pubSubClient.StartAsync();
    pubSubClient.GroupMessageReceived += eventArgs =>
    {
        Console.WriteLine($"Receive message: {eventArgs.Message.Data}");
        

return Task.CompletedTask;
};
azure asp.net-core-webapi azure-web-pubsub
1个回答
0
投票

为了接收消息,如何使用ServiceClient从后端服务建立WebSocket连接?

要使用

ServiceClient
从后端服务建立 WebSocket 连接,Azure Web PubSub
ServiceClient
主要设计用于服务器到服务通信,例如向客户端发送消息、管理组和生成客户端访问网址。它没有提供后端直接作为 WebSocket 客户端接收消息的内置方法。

如果您的后端需要充当 WebSocket 客户端来接收消息,则需要使用

WebPubSubServiceClient.GetClientAccessUri()
方法生成访问 URL,并使用
ClientWebSocket
等 WebSocket 库建立 WebSocket 连接。

var serviceClient = new WebPubSubServiceClient(connectionString, hubName); var accessUri = serviceClient.GetClientAccessUri(); 
using var clientWebSocket = new ClientWebSocket();

请参阅此 MSDOC 使用 Azure Web PubSub 中的子协议在 WebSocket 客户端之间发布和订阅消息 service

下面的代码是接收消息Azure Web PubSub:

using Azure.Messaging.WebPubSub;
using Microsoft.Extensions.Azure;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddAzureClients(clientBuilder =>
{
    clientBuilder.AddWebPubSubServiceClient(
        builder.Configuration["Azure:WebPubSub:ConnectionString"], "messageHub");
});

var app = builder.Build();

app.MapGet("/negotiate", async context =>
{
    var serviceClient = context.RequestServices.GetRequiredService<WebPubSubServiceClient>();
    var response = new
    {
        url = serviceClient.GetClientAccessUri(roles: new[] { "webpubsub.sendToGroup.messageHub", "webpubsub.joinLeaveGroup.messageHub" }).AbsoluteUri
    };
    await context.Response.WriteAsJsonAsync(response);
});

app.Run();

            const string connectionString = "Endpoint=https://your-webpubsub-service.webpubsub.azure.com;AccessKey=your-access-key;Version=1.0;";
            const string hubName = "messageHub";

            var serviceClient = new WebPubSubServiceClient(connectionString, hubName);
            var accessUri = serviceClient.GetClientAccessUri();

            using var clientWebSocket = new ClientWebSocket();
            clientWebSocket.Options.AddSubProtocol("json.webpubsub.azure.v1");

            Console.WriteLine("Connecting to Azure Web PubSub...");
            await clientWebSocket.ConnectAsync(new Uri(accessUri), CancellationToken.None);

            Console.WriteLine("Connected to Web PubSub.");
            await ReceiveMessagesAsync(clientWebSocket);
        }

        private static async Task ReceiveMessagesAsync(ClientWebSocket client)
        {
            var buffer = new byte[4096];
            while (client.State == WebSocketState.Open)
            {
                var result = await client.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
                if (result.MessageType == WebSocketMessageType.Text)
                {
                    var message = Encoding.UTF8.GetString(buffer, 0, result.Count);
                    Console.WriteLine($"Message received: {message}");

                    await ProcessMessageAsync(message);
                }
                else if (result.MessageType == WebSocketMessageType.Close)
                {
                    Console.WriteLine("WebSocket connection closed.");
                    break;
                }
            }
        }

        private static async Task ProcessMessageAsync(string message)
        {
            Console.WriteLine($"Processing message: {message}");
            
            await Task.CompletedTask;
   
<!DOCTYPE html>
<html>
<head>
  <title>WebSocket Stream</title>
  <style>
    #output {
      white-space: pre;
      font-family: monospace;
    }
  </style>
</head>
<body>
  <div id="output"></div>
  <script>
    (async function () {
      const res = await fetch('/negotiate');
      const { url } = await res.json();
      const ws = new WebSocket(url, 'json.webpubsub.azure.v1');
      
      ws.onopen = () => {
        console.log('WebSocket connected.');
        ws.send(JSON.stringify({ type: 'joinGroup', group: 'stream' }));
      };
   ....// Handle incoming message
        }
      };
    })();
  </script>
</body>
</html>

后端作为 WebSocket 客户端连接到 Azure Web PubSub 以接收和处理消息。

输出: Local

local

loacl

使用 Client SDK 在 Web api 服务中接收消息是否正确(可能在持续运行的托管服务中使用它)?

如果您需要接收消息,您的后端应该充当 WebSocket 客户端。这涉及使用通过

WebPubSubServiceClient.GetClientAccessUri()
生成的访问 URL 建立连接。 基于 WebSocket 的方法是实时消息处理的最佳解决方案。

© www.soinside.com 2019 - 2024. All rights reserved.