我正在寻找实现以下功能:
public static IObservable<TResult> Publish<TSource, TResult>(
this IObservable<TSource>[] sources,
Func<IObservable<TSource>[], IObservable<TResult>> selector)
{
// how?
}
行为应该与现有的相同
IObservable<TResult> Publish<TSource, TResult>(
IObservable<TSource> source,
Func<IObservable<TSource>, Iobservable<TResult>> selector)
当然唯一的区别是源(和选择器的参数)是一个可观察数组。基本上,“给定一组可观察对象,产生一个结果可观察对象,同时共享对数组的每个可观察对象的单个订阅”。
这可以用任何巧妙的方式完成吗?
到目前为止,我唯一的想法是通过链接单个可观察的
.Publish()
调用的表达式树来实现这一点——出于性能原因,这是不可行的。
或者去做这样的事情。但这看起来很乱,我发现很难推理出正确性
public static IObservable<TResult> Publish<TSource, TResult>(
this IObservable<TSource>[] sources,
Func<IObservable<TSource>[], IObservable<TResult>> selector)
{
return Observable.Create<TResult>(observer =>
{
var published = sources.Select(x => x.Publish()).ToArray();
var result = new CompositeDisposable();
result.Add(selector(published).Subscribe(observer));
foreach(var p in published)
result.Add(p.Connect());
return result;
});
}
我现在想出了一个解决方案,但很高兴听到更好的想法!
public static IObservable<TResult> Publish<TSource, TResult>(this IReadOnlyList<IObservable<TSource>> sources, Func<IReadOnlyList<IObservable<TSource>>, IObservable<TResult>> selector)
{
IObservable<TResult> Generator(ImmutableArray<IObservable<TSource>> published)
{
return published.Length == sources.Count
? selector(published)
: sources[published.Length].Publish(x => Generator(published.Add(x)));
}
return Generator(ImmutableArray<IObservable<TSource>>.Empty);
}
为什么这个有用的问题出现了。为了清楚起见,我附上了我打算用它做的事情之一。以下函数与现有的 CombineLatest() 工作方式相同,但集成了节流功能:项目的第一个组合会立即发出,之后的所有内容都会去抖动,如果结果选择器的调用开销很大,这会很有帮助。
public static IObservable<TResult> CombineLatest<TSource, TResult>(this IReadOnlyList<IObservable<TSource>> sources, Func<IList<TSource>, TResult> resultSelector, TimeSpan timeSpan, IScheduler? scheduler = null)
{
scheduler ??= Scheduler.Default;
return sources.Publish<TSource, TResult>(publishedSources => publishedSources
.CombineLatest(_ => Unit.Default)
.Publish(x => x.Take(1).Merge(x.Skip(1).Sample(timeSpan, scheduler))
.Publish(signal => publishedSources
.Select(source => signal.WithLatestFrom(source, (_,value) => value))
.Zip()
.Select(resultSelector)));
}
在我看来,您使用的
selector
是错误的。
这是您当前的实现
selector
:
public static IObservable<TResult> Publish<TSource, TResult>(
this IObservable<TSource>[] sources,
Func<IObservable<TSource>[], IObservable<TResult>> selector)
=> selector(sources);
这是微不足道的。这也有点同义反复。你的
selector
是你想要创造的东西。
唯一明智的选择器是
Func<IObservable<TSource>, IObservable<TResult>> selector
.
可以这样吗:
public static IObservable<TResult> Publish<TSource, TResult>(
this IObservable<TSource>[] sources,
Func<IObservable<TSource>, IObservable<TResult>> selector)
=> sources.Select(x => x.Publish(selector)).Merge();