我正在开发一个组件,需要处理实时馈送并将数据以相当快的方式广播给听众(大约 100 纳秒级的精度,如果我能做到的话,甚至会低于这个精度)目前我正在发起一个事件订阅者可以订阅我的代码。但是,因为在 C# 事件处理程序中运行在引发事件的同一线程上,所以引发事件的线程将被阻塞,直到所有订阅者完成处理事件。我无法控制订阅者的代码,因此他们可能在事件处理程序中执行任何耗时的操作,这可能会阻塞正在广播的线程。
我该怎么做才能将数据广播给其他订阅者,但仍然可以相当快地广播内容?
您似乎正在寻找任务。以下是我为我的工作编写的扩展方法,它可以异步调用事件,以便每个事件处理程序都在自己的线程上。我无法评论它的速度,因为这对我来说从来都不是一个要求。
更新
根据评论,我对其进行了调整,以便仅创建一个任务来呼叫所有订阅者
/// <summary>
/// Extension method to safely encapsulate asynchronous event calls with checks
/// </summary>
/// <param name="evnt">The event to call</param>
/// <param name="sender">The sender of the event</param>
/// <param name="args">The arguments for the event</param>
/// <param name="object">The state information that is passed to the callback method</param>
/// <remarks>
/// This method safely calls the each event handler attached to the event. This method uses <see cref="System.Threading.Tasks"/> to
/// asynchronously call invoke without any exception handling. As such, if any of the event handlers throw exceptions the application will
/// most likely crash when the task is collected. This is an explicit decision since it is really in the hands of the event handler
/// creators to make sure they handle issues that occur do to their code. There isn't really a way for the event raiser to know
/// what is going on.
/// </remarks>
[System.Diagnostics.DebuggerStepThrough]
public static void AsyncSafeInvoke( this EventHandler evnt, object sender, EventArgs args )
{
// Used to make a temporary copy of the event to avoid possibility of
// a race condition if the last subscriber unsubscribes
// immediately after the null check and before the event is raised.
EventHandler handler = evnt;
if (handler != null)
{
// Manually calling all event handlers so that we could capture and aggregate all the
// exceptions that are thrown by any of the event handlers attached to this event.
var invocationList = handler.GetInvocationList();
Task.Factory.StartNew(() =>
{
foreach (EventHandler h in invocationList)
{
// Explicitly not catching any exceptions. While there are several possibilities for handling these
// exceptions, such as a callback, the correct place to handle the exception is in the event handler.
h.Invoke(sender, args);
}
});
}
}
您可以在事件处理程序上使用这些简单的扩展方法:
public static void Raise<T>(this EventHandler<T> handler, object sender, T e) where T : EventArgs {
if (handler != null) handler(sender, e);
}
public static void Raise(this EventHandler handler, object sender, EventArgs e) {
if (handler != null) handler(sender, e);
}
public static void RaiseOnDifferentThread<T>(this EventHandler<T> handler, object sender, T e) where T : EventArgs {
if (handler != null) Task.Factory.StartNewOnDifferentThread(() => handler.Raise(sender, e));
}
public static void RaiseOnDifferentThread(this EventHandler handler, object sender, EventArgs e) {
if (handler != null) Task.Factory.StartNewOnDifferentThread(() => handler.Raise(sender, e));
}
public static Task StartNewOnDifferentThread(this TaskFactory taskFactory, Action action) {
return taskFactory.StartNew(action: action, cancellationToken: new CancellationToken());
}
用途:
public static Test() {
myEventHandler.RaiseOnDifferentThread(null, EventArgs.Empty);
}
cancellationToken
对于保证StartNew()
实际上使用不同的线程是必要的,如here所述。
我无法确定这是否能可靠地满足 100 纳秒的要求,但这里有一种替代方案,您可以为最终用户提供一种方法,为您提供一个您将填充的 ConcurrentQueue,并且他们可以在单独的线程上侦听。
class Program
{
static void Main(string[] args)
{
var multicaster = new QueueMulticaster<int>();
var listener1 = new Listener(); //Make a couple of listening Q objects.
listener1.Listen();
multicaster.Subscribe(listener1);
var listener2 = new Listener();
listener2.Listen();
multicaster.Subscribe(listener2);
multicaster.Broadcast(6); //Send a 6 to both concurrent Queues.
Console.ReadLine();
}
}
//The listeners would run on their own thread and poll the Q like crazy.
class Listener : IListenToStuff<int>
{
public ConcurrentQueue<int> StuffQueue { get; set; }
public void Listen()
{
StuffQueue = new ConcurrentQueue<int>();
var t = new Thread(ListenAggressively);
t.Start();
}
void ListenAggressively()
{
while (true)
{
int val;
if(StuffQueue.TryDequeue(out val))
Console.WriteLine(val);
}
}
}
//Simple class that allows you to subscribe a Queue to a broadcast event.
public class QueueMulticaster<T>
{
readonly List<IListenToStuff<T>> _subscribers = new List<IListenToStuff<T>>();
public void Subscribe(IListenToStuff<T> subscriber)
{
_subscribers.Add(subscriber);
}
public void Broadcast(T value)
{
foreach (var listenToStuff in _subscribers)
{
listenToStuff.StuffQueue.Enqueue(value);
}
}
}
public interface IListenToStuff<T>
{
ConcurrentQueue<T> StuffQueue { get; set; }
}
鉴于您无法阻止其他侦听器上的处理,这意味着多个线程。在侦听器上拥有专用的侦听线程似乎是一种合理的尝试方法,并且并发队列似乎是一种不错的交付机制。在此实现中,它只是不断轮询,但您可能可以使用线程信号来减少 CPU 负载,例如
AutoResetEvent
。
信号和共享内存非常快。您可以发送单独的信号来告诉应用程序从共享内存位置读取消息。 当然,如果您想要低延迟,信号仍然是您的应用程序必须在高优先级线程上使用的事件。我会在数据中包含一个时间标签,以便接收器可以补偿不可避免的延迟。