无法完成观察

问题描述 投票:0回答:1

我编写了使用特定状态选项的异步文件阅读器。读取所有文件后,我需要通知。但是,这个可观察的对象永远不会完成。您能帮助我理解为什么以及如何手动完成它吗?

public enum States
{
    Processing, Stopped, Paused
};

public static class AsyncReader
    {
        public static IObservable<KeyValuePair<string,string>> ReadFile(string filePath, IObservable<States> readState)
        {
            return readState.Where(state => state == States.Processing).SelectMany(_ =>
                Observable.Using(() => new StreamReader(filePath),
                    reader => Observable.FromAsync(reader.ReadLineAsync)
                    .Repeat()
                    .TakeWhile(line => line != null).Select(line => new KeyValuePair<string,string>(filePath, line))));
        }

        public static IObservable<KeyValuePair<string,string>> ReadFiles(string[] files, IObservable<States> readState)
        {
            IObservable<KeyValuePair<string, string>> dataSource = Observable.Empty<KeyValuePair<string, string>>();
            foreach (var file in files)
            {
                dataSource = dataSource.Concat(ReadFile(file, readState));
            }
            return dataSource;
        }
    }

此外,我还将其用于动态更改文件列表。例如:

private Subject<string[]> filesSrc = new Subject<string[]>();
private Subject<States> state = new Subject<string[]>();

public void Start()
{
    state.OnNext(States.Processing);
}

public void SelectFiles(string[] files)
{
    filesSrc.OnNext(files);
}

public IObservable<KeyValuePair<string, string>> GetDataSource()
{
    return filesSrc.Select(files => AsyncReader.ReadFiles(files, state)).Switch();
}

c# observable system.reactive reactiveui
1个回答
0
投票

请让我知道这是否有效,然后我将说明为什么原因:

public static IObservable<KeyValuePair<string, string>> ReadFile(string filePath, IObservable<States> readState) =>
    readState
        .Where(state => state == States.Processing)
        .SelectMany(_ =>
            Observable
                .Using(
                    () => new StreamReader(filePath),
                    reader =>
                        Observable
                            .Defer(
                                () =>
                                    Observable
                                        .FromAsync(reader.ReadLineAsync))
                            .Repeat()
                            .TakeWhile(line => line != null)
                            .Select(line => new KeyValuePair<string, string>(filePath, line))));
}
© www.soinside.com 2019 - 2024. All rights reserved.