我正在使用一个非托管库,该库规定对其API的所有调用都要在同一个线程上运行。我们想使用Reactive扩展的 EventLoopScheduler
来方便,因为我们将使用Observable来做其他事情。
我使用了一个类似于 Run
方法来执行调度器中的代码,而这些代码将始终在同一个线程上运行。当我在处理托管代码时,这就像预期的那样,所有的调用都会在事件循环管理的线程上运行,而在异步调用之前和之后都是主线程。
但是,当我调用一个PInvoke(代码示例中的那个只是一个例子,我的代码中并没有真正调用这个,但行为是一样的)时,这个线程确实是在事件循环线程上运行的,但之后的所有事情也是如此!我试着添加了一个新的线程。
我试过在事件循环线程上添加 ConfigureAwait(true)
(和 false
),但这并没有改变什么。我真的被这个行为搞糊涂了,为什么调用PInvoke会改变线程在 await !!之后继续进行?
下面是重现的代码。
[DllImport("user32.dll", CharSet = CharSet.Unicode, SetLastError = true)]
private static extern int MessageBox(IntPtr hWnd, string lpText, string lpCaption, uint uType);
public static Task Run(Action action, IScheduler scheduler)
{
return Observable.Start(action, scheduler).SingleAsync().ToTask();
}
public static string ThreadInfo() =>
$"\"{Thread.CurrentThread.Name}\" ({Thread.CurrentThread.ManagedThreadId})";
private static async Task Main(string[] args)
{
var scheduler = new EventLoopScheduler();
Console.WriteLine($"Before managed call on thread {ThreadInfo()}");
await Run(() => Console.WriteLine($"Managed call on thread {ThreadInfo()}"), scheduler);
Console.WriteLine($"After managed call on thread {ThreadInfo()}");
Console.WriteLine($"Before PInvoke on thread {ThreadInfo()}");
await Run(() => MessageBox(IntPtr.Zero, $"Running on thread {ThreadInfo()}", "Attention", 0), scheduler);
Console.WriteLine($"After PInvoke on thread {ThreadInfo()}");
}
执行的结果是这样的
Before managed call on thread "" (1)
Managed call on thread "Event Loop 1" (6)
After managed call on thread "" (1)
Before PInvoke on thread "" (1)
Message box displayed with text: Running on thread "Event Loop 1" (6)
After PInvoke on thread "Event Loop 1" (6)
我期望的是
Before managed call on thread "" (1)
Managed call on thread "Event Loop 1" (6)
After managed call on thread "" (1)
Before PInvoke on thread "" (1)
Message box displayed with text: Running on thread "Event Loop 1" (6)
After PInvoke on thread "" (1)
A Task
或承诺只是一个回调的抽象。而asyncawait只是任务的语法糖。
既然是回调的抽象。await
并没有阻挡住线程.为什么要阻挡?样子 像被挡住了一样?那是因为 await
将你的代码重写成一个状态机,当被等待的任务完成时,它就会通过它的状态前进。
大致是这样重写的。
switch (state)
{
case 0:
Console.WriteLine($"Before managed call on thread {ThreadInfo()}");
Await(Run(() => Console.WriteLine($"Managed call on thread {ThreadInfo()}"), scheduler));
return;
case 1:
Console.WriteLine($"After managed call on thread {ThreadInfo()}");
Console.WriteLine($"Before PInvoke on thread {ThreadInfo()}");
Await(Run(() => MessageBox(IntPtr.Zero, $"Running on thread {ThreadInfo()}", "Attention", 0), scheduler));
return;
case 2:
Console.WriteLine($"After PInvoke on thread {ThreadInfo()}");
return;
}
实际的重写使用 goto
而非 switch
,但概念是一样的。所以当一个任务完成时,它在同一个线程上下文中调用这个state-machine,state += 1 -。只有当你使用任务调度器时,你才会看到任务池线程。
为什么你会看到这种特殊行为的解释。
After managed call on thread "" (1)
是相当复杂的 它与一个预定的thunk是否立即完成有关。如果你添加了一个 Thread.Sleep
在第一个托管调用中,你会注意到继续运行在事件循环线程上。
这是由于调度优化倾向于只在事件循环线程上排队等候 如果正在运行. 当你打电话 ToTask()
你使用的是默认的调度器,也就是当前线程调度器。
当前线程调度器的工作原理是这样的。
Free? 立即运行。
忙吗,排队工作。
立即运行的行为就是为什么你看到主线程上运行的日志。如果你只是在主线程上添加一个
var scheduler = new EventLoopScheduler();
scheduler.Schedule(() => Thread.Sleep(1000));
到最开始的时候,你让事件循环变得繁忙,导致所有的东西都进入了队列,所以你就会看到所有的东西都记录在事件循环线程中。 所以这与PInvoke无关。
要说明的是,这不是关于调度器被指定为观察,而是订阅。当你把Observable转换为其他抽象,如Tasks、Enumerables、Blocking Joins等,一些内部的复杂性可能会泄露出来。