我编写了使用特定状态选项的异步文件阅读器。读取所有文件后,我需要通知。但是,这个可观察的对象永远不会完成。您能帮助我理解为什么以及如何手动完成它吗?
public enum States
{
Processing, Stopped, Paused
};
public static class AsyncReader
{
public static IObservable<KeyValuePair<string,string>> ReadFile(string filePath, IObservable<States> readState)
{
return readState.Where(state => state == States.Processing).SelectMany(_ =>
Observable.Using(() => new StreamReader(filePath),
reader => Observable.FromAsync(reader.ReadLineAsync)
.Repeat()
.TakeWhile(line => line != null).Select(line => new KeyValuePair<string,string>(filePath, line))));
}
public static IObservable<KeyValuePair<string,string>> ReadFiles(string[] files, IObservable<States> readState)
{
IObservable<KeyValuePair<string, string>> dataSource = Observable.Empty<KeyValuePair<string, string>>();
foreach (var file in files)
{
dataSource = dataSource.Concat(ReadFile(file, readState));
}
return dataSource;
}
}
此外,我还将其用于动态更改文件列表。例如:
private Subject<string[]> filesSrc = new Subject<string[]>();
private Subject<States> state = new Subject<string[]>();
public void Start()
{
state.OnNext(States.Processing);
}
public void SelectFiles(string[] files)
{
filesSrc.OnNext(files);
}
public IObservable<KeyValuePair<string, string>> GetDataSource()
{
return filesSrc.Select(files => AsyncReader.ReadFiles(files, state)).Switch();
}
请让我知道这是否有效,然后我将说明为什么原因:
public static IObservable<KeyValuePair<string, string>> ReadFile(string filePath, IObservable<States> readState) =>
readState
.Where(state => state == States.Processing)
.SelectMany(_ =>
Observable
.Using(
() => new StreamReader(filePath),
reader =>
Observable
.Defer(
() =>
Observable
.FromAsync(reader.ReadLineAsync))
.Repeat()
.TakeWhile(line => line != null)
.Select(line => new KeyValuePair<string, string>(filePath, line))));
}