RabbitMQ + .NET C#: publish and consume performance is low and highly variable

Question (votes: 0, answers: 1)

I am trying to improve the performance of publishing and consuming messages with RabbitMQ and .NET C#.

I have defined a relatively "large" XML file (as an embedded resource), shown below.

<?xml version="1.0" encoding="UTF-8"?>
<Document>
  <CstmrCdtTrfInitn>
    <GrpHdr>
      <MsgId>ABC/090928/CCT001</MsgId>
      <CreDtTm>2009-09-28T14:07:00</CreDtTm>
      <NbOfTxs>3</NbOfTxs>
      <CtrlSum>11500000</CtrlSum>
      <InitgPty>
        <Nm>ABC Corporation</Nm>
        <PstlAdr>
          <StrtNm>Times Square</StrtNm>
          <BldgNb>7</BldgNb>
          <PstCd>NY 10036</PstCd>
          <TwnNm>New York</TwnNm>
          <Ctry>US</Ctry>
        </PstlAdr>
      </InitgPty>
    </GrpHdr>
    <PmtInf>
      <PmtInfId>ABC/086</PmtInfId>
      <PmtMtd>TRF</PmtMtd>
      <BtchBookg>false</BtchBookg>
      <ReqdExctnDt>2009-09-29</ReqdExctnDt>
      <Dbtr>
        <Nm>ABC Corporation</Nm>
        <PstlAdr>
          <StrtNm>Times Square</StrtNm>
          <BldgNb>7</BldgNb>
          <PstCd>NY 10036</PstCd>
          <TwnNm>New York</TwnNm>
          <Ctry>US</Ctry>
        </PstlAdr>
      </Dbtr>
      <DbtrAcct>
        <Id>
          <Othr>
            <Id>00125574999</Id>
          </Othr>
        </Id>
      </DbtrAcct>
      <DbtrAgt>
        <FinInstnId>
          <BIC>BBBBUS33</BIC>
        </FinInstnId>
      </DbtrAgt>
      <CdtTrfTxInf>
        <PmtId>
          <InstrId>ABC/090928/CCT001/01</InstrId>
          <EndToEndId>ABC/4562/2009-09-08</EndToEndId>
        </PmtId>
        <Amt>
          <InstdAmt Ccy="JPY">10000000</InstdAmt>
        </Amt>
        <ChrgBr>SHAR</ChrgBr>
        <CdtrAgt>
          <FinInstnId>
            <BIC>AAAAGB2L</BIC>
          </FinInstnId>
        </CdtrAgt>
        <Cdtr>
          <Nm>DEF Electronics</Nm>
          <PstlAdr>
            <AdrLine>Corn Exchange 5th Floor</AdrLine>
            <AdrLine>Mark Lane 55</AdrLine>
            <AdrLine>EC3R7NE London</AdrLine>
            <AdrLine>GB</AdrLine>
          </PstlAdr>
        </Cdtr>
        <CdtrAcct>
          <Id>
            <Othr>
              <Id>23683707994125</Id>
            </Othr>
          </Id>
        </CdtrAcct>
        <Purp>
          <Cd>CINV</Cd>
        </Purp>
        <RmtInf>
          <Strd>
            <RfrdDocInf>
              <Nb>4562</Nb>
              <RltdDt>2009-09-08</RltdDt>
            </RfrdDocInf>
          </Strd>
        </RmtInf>
      </CdtTrfTxInf>
      <CdtTrfTxInf>
        <PmtId>
          <InstrId>ABC/090628/CCT001/2</InstrId>
          <EndToEndId>ABC/ABC-13679/2009-09-15</EndToEndId>
        </PmtId>
        <Amt>
          <InstdAmt Ccy="EUR">500000</InstdAmt>
        </Amt>
        <ChrgBr>CRED</ChrgBr>
        <CdtrAgt>
          <FinInstnId>
            <BIC>DDDDBEBB</BIC>
          </FinInstnId>
        </CdtrAgt>
        <Cdtr>
          <Nm>GHI Semiconductors</Nm>
          <PstlAdr>
            <StrtNm>Avenue Brugmann</StrtNm>
            <BldgNb>415</BldgNb>
            <PstCd>1180</PstCd>
            <TwnNm>Brussels</TwnNm>
          </PstlAdr>
        </Cdtr>
        <CdtrAcct>
          <Id>
            <IBAN>BE30001216371411</IBAN>
          </Id>
        </CdtrAcct>
        <InstrForCdtrAgt>
          <Cd>PHOB</Cd>
          <InstrInf>+32/2/2222222</InstrInf>
        </InstrForCdtrAgt>
        <Purp>
          <Cd>GDDS</Cd>
        </Purp>
        <RmtInf>
          <Strd>
            <RfrdDocInf>
              <Tp>
                <CdOrPrtry>
                  <Cd>CINV</Cd>
                </CdOrPrtry>
              </Tp>
              <Nb>ABC-13679</Nb>
              <RltdDt>2009-09-15</RltdDt>
            </RfrdDocInf>
          </Strd>
        </RmtInf>
      </CdtTrfTxInf>
      <CdtTrfTxInf>
        <PmtId>
          <InstrId>ABC/090928/CCT001/3</InstrId>
          <EndToEndId>ABC/987-AC/2009-09-27</EndToEndId>
        </PmtId>
        <Amt>
          <InstdAmt Ccy="USD">1000000</InstdAmt>
        </Amt>
        <ChrgBr>SHAR</ChrgBr>
        <CdtrAgt>
          <FinInstnId>
            <BIC>BBBBUS66</BIC>
          </FinInstnId>
        </CdtrAgt>
        <Cdtr>
          <Nm>ABC Corporation</Nm>
          <PstlAdr>
            <Dept>Treasury department</Dept>
            <StrtNm>Bush Street</StrtNm>
            <BldgNb>13</BldgNb>
            <PstCd>CA 94108</PstCd>
            <TwnNm>San Francisco</TwnNm>
            <Ctry>US</Ctry>
          </PstlAdr>
        </Cdtr>
        <CdtrAcct>
          <Id>
            <Othr>
              <Id>4895623</Id>
            </Othr>
          </Id>
        </CdtrAcct>
        <Purp>
          <Cd>INTC</Cd>
        </Purp>
        <RmtInf>
          <Strd>
            <RfrdDocInf>
              <Tp>
                <CdOrPrtry>
                  <Cd>CINV</Cd>
                </CdOrPrtry>
              </Tp>
              <Nb>987-AC</Nb>
              <RltdDt>2009-09-27</RltdDt>
            </RfrdDocInf>
          </Strd>
        </RmtInf>
      </CdtTrfTxInf>
    </PmtInf>
  </CstmrCdtTrfInitn>
</Document>

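I then publish the file above to a single queue roughly 500,000 times. Publishing and consuming both happen in the same .NET Core C# program, shown below.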

The .NET C# code:

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Text;

using RabbitMQ.Client;
using RabbitMQ.Client.Events;


namespace CSharpPlayground
{
    public static class EmbeddedResource
    {
        public static string Load(string fileName)
        {
            var executingAssembly = Assembly.GetExecutingAssembly();
            var resource = $"{executingAssembly.ManifestModule.Name.Replace(".dll", string.Empty)}.{fileName}";
            using var stream = executingAssembly.GetManifestResourceStream(resource);
            using var streamReader = new StreamReader(stream!);
            return streamReader.ReadToEnd();
        }
    }

    public static class CheapExtensions
    {
        public static void ForEach<T>(this IEnumerable<T> source, Action<T> action)
        {
            foreach(var item in source)
            {
                action(item);
            }
        }
        public static void ForEach<T>(this IEnumerable<T> source) =>
            source.ForEach(item => {});
    }

    public static class Program
    {
        private static readonly string XmlStuff = EmbeddedResource.Load("XmlStuff.xml");

        public static void Main()
        {
            const string queueName = "hello";
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                DispatchConsumersAsync = true
            };
            using var connection = factory.CreateConnection();
            using var queueDeclareChannel = connection.CreateModel();

            queueDeclareChannel.QueueDeclare(
                queue: queueName,
                durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: null);

            using var consumerChannel = connection.CreateModel();
            var consumer = new AsyncEventingBasicConsumer(consumerChannel);
            consumer.Received += async (sender, eventArgs) =>
            {
                var receivedBody = eventArgs.Body;
                var receivedMessage = Encoding.UTF8.GetString(receivedBody.ToArray());
                //Console.WriteLine($"[x] Received stuff");
            };
            consumerChannel.BasicConsume(
                queue: queueName,
                autoAck: true,
                consumer: consumer);

            var messageToSend = XmlStuff;
            var bodyToSend = new ReadOnlyMemory<byte>(Encoding.UTF8.GetBytes(messageToSend));

            Enumerable
                .Repeat((messageToSend, bodyToSend), 500_000)
                .AsParallel()
                .Select((data, index) =>
                {
                    var (message, body) = data;
                    using var publishChannel = connection.CreateModel();
                    var basicProperties = publishChannel.CreateBasicProperties();
                    publishChannel.BasicPublish(
                        exchange: "",
                        routingKey: queueName,
                        mandatory: false,
                        basicProperties: basicProperties,
                        body: body);
                    //Console.WriteLine($"[x] Sent stuff");
                    return index;
                }).ForEach();

            Console.ReadKey();
        }
    }
}

I have removed the Console.WriteLine calls from the .NET Core C# program wherever possible, so that writing to standard output does not become a bottleneck.

Performance turned out to be rather low, and I would like to know whether the message rates I am seeing are in any way "normal".

[Screenshots: RabbitMQ Admin - Overview; RabbitMQ Admin - Hello Queue]

c# .net performance .net-core rabbitmq
1 answer

Votes: 1

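I am not a RabbitMQ expert, but I do know that creating a channel (CreateModel()) for every published message is a RabbitMQ anti-pattern that hurts performance, because each channel creation requires a network round trip to the broker. This approach may also exhaust the broker.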

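Unfortunately, many introductory RabbitMQ blog posts start out this way, probably because it simplifies things. Arguably, the catch is that a channel is not thread-safe.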
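One possible middle ground between "one channel per message" and "one channel shared by every thread" is one channel per parallel worker. The sketch below is not from the original answer. It assumes the RabbitMQ.Client 6.x API used in the question (IConnection, IModel, CreateModel), and the class and method names are hypothetical. It uses the Parallel.For overload with a local-state initializer, so CreateModel() runs once per worker task rather than once per message, and each channel is only ever touched by the worker that owns it.

```csharp
using System;
using System.Threading.Tasks;
using RabbitMQ.Client;

// Hypothetical helper for illustration only (not part of the original post).
public static class PerWorkerChannelPublisher
{
    public static void PublishMany(
        IConnection connection,
        string queueName,
        ReadOnlyMemory<byte> body,
        int count)
    {
        Parallel.For<IModel>(
            0,
            count,
            // localInit: called once per worker task, not once per message.
            () => connection.CreateModel(),
            // body: the channel passed in belongs to this worker alone,
            // so no locking is needed around BasicPublish.
            (index, loopState, channel) =>
            {
                var properties = channel.CreateBasicProperties();
                channel.BasicPublish(
                    exchange: "",
                    routingKey: queueName,
                    mandatory: false,
                    basicProperties: properties,
                    body: body);
                return channel;
            },
            // localFinally: close the worker's channel when it is done.
            channel => channel.Dispose());
    }
}
```

In the question's program this might be called as PerWorkerChannelPublisher.PublishMany(connection, queueName, bodyToSend, 500_000) in place of the PLINQ pipeline. Whether it actually improves the message rate would have to be measured.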

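If you want to write your own messaging framework on top of RabbitMQ, I would start with a single publishing channel and use a lock to synchronize access to it.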
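A minimal sketch of that idea, assuming the RabbitMQ.Client 6.x API used in the question. The LockedPublisher class and its member names are hypothetical and only illustrate the approach: one long-lived channel is created up front, and a lock serializes every call that touches it.

```csharp
using System;
using RabbitMQ.Client;

// Hypothetical wrapper for illustration only (not part of the original post).
// It owns one channel and serializes access to it, because IModel is not
// thread-safe.
public sealed class LockedPublisher : IDisposable
{
    private readonly object _gate = new object();
    private readonly IModel _channel;
    private readonly string _queueName;

    public LockedPublisher(IConnection connection, string queueName)
    {
        // Created once and reused for every message.
        _channel = connection.CreateModel();
        _queueName = queueName;
    }

    public void Publish(ReadOnlyMemory<byte> body)
    {
        // Only one thread at a time may use the channel.
        lock (_gate)
        {
            var properties = _channel.CreateBasicProperties();
            _channel.BasicPublish(
                exchange: "",
                routingKey: _queueName,
                mandatory: false,
                basicProperties: properties,
                body: body);
        }
    }

    public void Dispose()
    {
        lock (_gate)
        {
            _channel.Dispose();
        }
    }
}
```

In the question's code, the lambda passed to Select could then call publisher.Publish(body) on a single shared LockedPublisher instead of calling connection.CreateModel() for each message. The lock makes publishing effectively sequential, so this trades parallelism for the removal of the per-message channel round trip.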

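If you want to build a real application and avoid reinventing the wheel, along with a lot of headaches, I would look at one of the well-supported open-source frameworks such as EasyNetQ or MassTransit.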
