我正在编写一个控制台应用程序,它扫描目录中的文件并将其中的数据上传到数据库。这些文件是专有类型,需要特定的库,而该库要求程序在 STA 线程上运行。当我调用函数以
OnCreated
FileSystemWatcher
方法上传文件时,我收到一条关于需要在 STA 线程中运行的错误警告。当我使用直接文件路径直接在 Main() 函数中调用上传函数时,这不会显示。
我对此做了一些研究,发现
FileSystemWatcher
在单独的线程中调用其事件。如果您使用 Winforms 或 WPF,您似乎可以将 SynchronizingObject
属性设置为您的 UI 元素,它将在主 UI 线程上运行。但是,我找不到任何有关如何将其应用于控制台应用程序的信息。所以基本上我想知道是否有任何方法可以让 FileSystemWatcher
在主线程上运行,或者让辅助线程运行我在其中创建的函数,作为 STA 线程运行。
我没有这样做过,而且我也没有太多使用 COM,所以对此持保留态度。
有一个 Thread.SetApartment 方法。据我所知,这需要在线程启动之前调用,因此您不能使用它来更改 FileSystemWatcher 将在其上引发事件的线程池线程的单元状态。您可能需要显式创建一个新线程并在其上设置单元状态。您可能还需要使用此线程来调用此第三方库。 但是你可以创建一个简单的消息队列。让新创建的 STA 线程在
BlockingCollection.GetConsumingEnumerable() 上运行 foreach 循环,并使用您想要在循环内运行的任何代码,并让 fileSystemWatcher 在发生任何更改时将对象添加到此集合中。您可以使用 Action
作为集合的对象类型以使线程运行任意代码,或者如果您只需要处理文件事件,则可以使用
FileSystemEventArgs
。 的解决方案,因此它允许您使用所有与任务相关的功能和工具:
static void Main(string[] args)
{
var scheduler = new SingleThreadTaskScheduler(thread =>
{
thread.SetApartmentState(System.Threading.ApartmentState.STA);
});
using var fsw = new FileSystemWatcher(@"c:\temp");
fsw.Created += onEvent;
fsw.Changed += onEvent;
fsw.Deleted += onEvent;
fsw.Renamed += onRenamed;
fsw.EnableRaisingEvents = true;
Console.ReadKey(true);
void onEvent(object sender, FileSystemEventArgs e)
{
Task.Factory.StartNew(() => { Console.WriteLine(e.FullPath); /* do something in STA */ }, CancellationToken.None, TaskCreationOptions.None, scheduler);
}
void onRenamed(object sender, RenamedEventArgs e)
{
Task.Factory.StartNew(() => { Console.WriteLine(e.FullPath); /* do something in STA */ }, CancellationToken.None, TaskCreationOptions.None, scheduler);
}
}
public sealed class SingleThreadTaskScheduler : TaskScheduler, IDisposable
{
private readonly AutoResetEvent _stop = new AutoResetEvent(false);
private readonly AutoResetEvent _dequeue = new AutoResetEvent(false);
private readonly ConcurrentQueue<Task> _tasks = new ConcurrentQueue<Task>();
private readonly Thread _thread;
public event EventHandler Executing;
public SingleThreadTaskScheduler(Action<Thread> threadConfigure = null)
{
_thread = new Thread(SafeThreadExecute) { IsBackground = true };
threadConfigure?.Invoke(_thread);
_thread.Start();
}
public DateTime LastDequeue { get; private set; }
public bool DequeueOnDispose { get; set; }
public int DisposeThreadJoinTimeout { get; set; } = 1000;
public int WaitTimeout { get; set; } = 1000;
public int DequeueTimeout { get; set; }
public int QueueCount => _tasks.Count;
public void ClearQueue() => Dequeue(false);
public bool TriggerDequeue()
{
if (DequeueTimeout <= 0)
return _dequeue != null && _dequeue.Set();
var ts = DateTime.Now - LastDequeue;
if (ts.TotalMilliseconds < DequeueTimeout)
return false;
LastDequeue = DateTime.Now;
return _dequeue != null && _dequeue.Set();
}
public void Dispose()
{
_stop.Set();
_stop.Dispose();
_dequeue.Dispose();
if (DequeueOnDispose)
{
Dequeue(true);
}
if (_thread != null && _thread.IsAlive)
{
_thread.Join(DisposeThreadJoinTimeout);
}
}
private int Dequeue(bool execute)
{
var count = 0;
do
{
if (!_tasks.TryDequeue(out var task))
break;
if (execute)
{
Executing?.Invoke(this, EventArgs.Empty);
TryExecuteTask(task);
}
count++;
}
while (true);
return count;
}
private void SafeThreadExecute()
{
try
{
ThreadExecute();
}
catch
{
// continue
}
}
private void ThreadExecute()
{
do
{
if (_stop == null || _dequeue == null)
return;
_ = Dequeue(true);
// note: Stop must be first in array (in case both events happen at the same exact time)
var i = WaitHandle.WaitAny(new[] { _stop, _dequeue }, WaitTimeout);
if (i == 0)
break;
// note: we can dequeue on _dequeue event, or on timeout
_ = Dequeue(true);
}
while (true);
}
protected override void QueueTask(Task task)
{
if (task == null)
throw new ArgumentNullException(nameof(task));
_tasks.Enqueue(task);
TriggerDequeue();
}
protected override IEnumerable<Task> GetScheduledTasks() => _tasks;
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => false;
}