我正试图使用 RabbitMQ + .NET C# 来提高发布和消费消息的性能。
我在下面定义了一个相对 "大 "的 xml 文件(作为嵌入式资源)。
<?xml version="1.0" encoding="UTF-8"?>
<Document>
<CstmrCdtTrfInitn>
<GrpHdr>
<MsgId>ABC/090928/CCT001</MsgId>
<CreDtTm>2009-09-28T14:07:00</CreDtTm>
<NbOfTxs>3</NbOfTxs>
<CtrlSum>11500000</CtrlSum>
<InitgPty>
<Nm>ABC Corporation</Nm>
<PstlAdr>
<StrtNm>Times Square</StrtNm>
<BldgNb>7</BldgNb>
<PstCd>NY 10036</PstCd>
<TwnNm>New York</TwnNm>
<Ctry>US</Ctry>
</PstlAdr>
</InitgPty>
</GrpHdr>
<PmtInf>
<PmtInfId>ABC/086</PmtInfId>
<PmtMtd>TRF</PmtMtd>
<BtchBookg>false</BtchBookg>
<ReqdExctnDt>2009-09-29</ReqdExctnDt>
<Dbtr>
<Nm>ABC Corporation</Nm>
<PstlAdr>
<StrtNm>Times Square</StrtNm>
<BldgNb>7</BldgNb>
<PstCd>NY 10036</PstCd>
<TwnNm>New York</TwnNm>
<Ctry>US</Ctry>
</PstlAdr>
</Dbtr>
<DbtrAcct>
<Id>
<Othr>
<Id>00125574999</Id>
</Othr>
</Id>
</DbtrAcct>
<DbtrAgt>
<FinInstnId>
<BIC>BBBBUS33</BIC>
</FinInstnId>
</DbtrAgt>
<CdtTrfTxInf>
<PmtId>
<InstrId>ABC/090928/CCT001/01</InstrId>
<EndToEndId>ABC/4562/2009-09-08</EndToEndId>
</PmtId>
<Amt>
<InstdAmt Ccy="JPY">10000000</InstdAmt>
</Amt>
<ChrgBr>SHAR</ChrgBr>
<CdtrAgt>
<FinInstnId>
<BIC>AAAAGB2L</BIC>
</FinInstnId>
</CdtrAgt>
<Cdtr>
<Nm>DEF Electronics</Nm>
<PstlAdr>
<AdrLine>Corn Exchange 5th Floor</AdrLine>
<AdrLine>Mark Lane 55</AdrLine>
<AdrLine>EC3R7NE London</AdrLine>
<AdrLine>GB</AdrLine>
</PstlAdr>
</Cdtr>
<CdtrAcct>
<Id>
<Othr>
<Id>23683707994125</Id>
</Othr>
</Id>
</CdtrAcct>
<Purp>
<Cd>CINV</Cd>
</Purp>
<RmtInf>
<Strd>
<RfrdDocInf>
<Nb>4562</Nb>
<RltdDt>2009-09-08</RltdDt>
</RfrdDocInf>
</Strd>
</RmtInf>
</CdtTrfTxInf>
<CdtTrfTxInf>
<PmtId>
<InstrId>ABC/090628/CCT001/2</InstrId>
<EndToEndId>ABC/ABC-13679/2009-09-15</EndToEndId>
</PmtId>
<Amt>
<InstdAmt Ccy="EUR">500000</InstdAmt>
</Amt>
<ChrgBr>CRED</ChrgBr>
<CdtrAgt>
<FinInstnId>
<BIC>DDDDBEBB</BIC>
</FinInstnId>
</CdtrAgt>
<Cdtr>
<Nm>GHI Semiconductors</Nm>
<PstlAdr>
<StrtNm>Avenue Brugmann</StrtNm>
<BldgNb>415</BldgNb>
<PstCd>1180</PstCd>
<TwnNm>Brussels</TwnNm>
</PstlAdr>
</Cdtr>
<CdtrAcct>
<Id>
<IBAN>BE30001216371411</IBAN>
</Id>
</CdtrAcct>
<InstrForCdtrAgt>
<Cd>PHOB</Cd>
<InstrInf>+32/2/2222222</InstrInf>
</InstrForCdtrAgt>
<Purp>
<Cd>GDDS</Cd>
</Purp>
<RmtInf>
<Strd>
<RfrdDocInf>
<Tp>
<CdOrPrtry>
<Cd>CINV</Cd>
</CdOrPrtry>
</Tp>
<Nb>ABC-13679</Nb>
<RltdDt>2009-09-15</RltdDt>
</RfrdDocInf>
</Strd>
</RmtInf>
</CdtTrfTxInf>
<CdtTrfTxInf>
<PmtId>
<InstrId>ABC/090928/CCT001/3</InstrId>
<EndToEndId>ABC/987-AC/2009-09-27</EndToEndId>
</PmtId>
<Amt>
<InstdAmt Ccy="USD">1000000</InstdAmt>
</Amt>
<ChrgBr>SHAR</ChrgBr>
<CdtrAgt>
<FinInstnId>
<BIC>BBBBUS66</BIC>
</FinInstnId>
</CdtrAgt>
<Cdtr>
<Nm>ABC Corporation</Nm>
<PstlAdr>
<Dept>Treasury department</Dept>
<StrtNm>Bush Street</StrtNm>
<BldgNb>13</BldgNb>
<PstCd>CA 94108</PstCd>
<TwnNm>San Francisco</TwnNm>
<Ctry>US</Ctry>
</PstlAdr>
</Cdtr>
<CdtrAcct>
<Id>
<Othr>
<Id>4895623</Id>
</Othr>
</Id>
</CdtrAcct>
<Purp>
<Cd>INTC</Cd>
</Purp>
<RmtInf>
<Strd>
<RfrdDocInf>
<Tp>
<CdOrPrtry>
<Cd>CINV</Cd>
</CdOrPrtry>
</Tp>
<Nb>987-AC</Nb>
<RltdDt>2009-09-27</RltdDt>
</RfrdDocInf>
</Strd>
</RmtInf>
</CdtTrfTxInf>
</PmtInf>
</CstmrCdtTrfInitn>
</Document>
然后,我将上述文件发布到同一个队列中大约50万次,发布和消费发生在下面同一个.NET Core C#程序中。
.NET C#代码。
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace CSharpPlayground
{
public static class EmbeddedResource
{
public static string Load(string fileName)
{
var executingAssembly = Assembly.GetExecutingAssembly();
var resource = $"{executingAssembly.ManifestModule.Name.Replace(".dll", string.Empty)}.{fileName}";
using var stream = executingAssembly.GetManifestResourceStream(resource);
using var streamReader = new StreamReader(stream!);
return streamReader.ReadToEnd();
}
}
public static class CheapExtensions
{
public static void ForEach<T>(this IEnumerable<T> source, Action<T> action)
{
foreach(var item in source)
{
action(item);
}
}
public static void ForEach<T>(this IEnumerable<T> source) =>
source.ForEach(item => {});
}
public static class Program
{
private static readonly string XmlStuff = EmbeddedResource.Load("XmlStuff.xml");
public static void Main()
{
const string queueName = "hello";
var factory = new ConnectionFactory
{
HostName = "localhost",
DispatchConsumersAsync = true
};
using var connection = factory.CreateConnection();
using var queueDeclareChannel = connection.CreateModel();
queueDeclareChannel.QueueDeclare(
queue: queueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
using var consumerChannel = connection.CreateModel();
var consumer = new AsyncEventingBasicConsumer(consumerChannel);
consumer.Received += async (sender, eventArgs) =>
{
var receivedBody = eventArgs.Body;
var receivedMessage = Encoding.UTF8.GetString(receivedBody.ToArray());
//Console.WriteLine($"[x] Received stuff");
};
consumerChannel.BasicConsume(
queue: queueName,
autoAck: true,
consumer: consumer);
var messageToSend = XmlStuff;
var bodyToSend = new ReadOnlyMemory<byte>(Encoding.UTF8.GetBytes(messageToSend));
Enumerable
.Repeat((messageToSend, bodyToSend), 500_000)
.AsParallel()
.Select((data, index) =>
{
var (message, body) = data;
using var publishChannel = connection.CreateModel();
var basicProperties = publishChannel.CreateBasicProperties();
publishChannel.BasicPublish(
exchange: "",
routingKey: queueName,
mandatory: false,
basicProperties: basicProperties,
body: body);
//Console.WriteLine($"[x] Sent stuff"
return index;
}).ForEach();
Console.ReadKey();
}
}
}
我已经尽可能的去掉了".NET Core C#"程序中的 Console.WriteLine
调用来规避将某些东西推送到标准输出的瓶颈。
结果发现性能似乎相当低,我想知道我的性能(在消息速率方面)是否属于某种 "正常"。
我不是RabbitMQ专家,但我知道如何创建一个通道(CreateModel()
)每条消息的发布是RabbitMQ的反模式,会损害性能。这是因为每个通道的创建都需要网络往返于broker。这种方法也有可能使经纪人筋疲力尽。
不幸的是,许多入门的RabbitMQ博客都是以这种方式开始的,可能是因为它简化了问题。可以说,问题在于,一个通道并不是线程安全的。
如果你想在RabbitMQ的基础上写自己的消息框架,那么我会从一个单一的发布通道开始,使用锁来同步它。
如果你想建立一个真正的应用程序,避免重新发明轮子,以及许多头痛的问题,我会看看一些支持良好的开源框架,如EasyNetQ或MassTransit。