如何为一组可观察对象实现 Observable.Publish

问题描述 投票:0回答:2

我正在寻找实现以下功能:

public static IObservable<TResult> Publish<TSource, TResult>(
    this IObservable<TSource>[] sources,
    Func<IObservable<TSource>[], IObservable<TResult>> selector)
{
  // how?
}

行为应该与现有的相同

IObservable<TResult> Publish<TSource, TResult>(
    IObservable<TSource> source,
    Func<IObservable<TSource>, Iobservable<TResult>> selector)

当然唯一的区别是源(和选择器的参数)是一个可观察数组。基本上,“给定一组可观察对象,产生一个结果可观察对象,同时共享对数组的每个可观察对象的单个订阅”。

这可以用任何巧妙的方式完成吗?

到目前为止,我唯一的想法是通过链接单个可观察的

.Publish()
调用的表达式树来实现这一点——出于性能原因,这是不可行的。

或者去做这样的事情。但这看起来很乱,我发现很难推理出正确性

public static IObservable<TResult> Publish<TSource, TResult>(
    this IObservable<TSource>[] sources,
    Func<IObservable<TSource>[], IObservable<TResult>> selector)
{
  return Observable.Create<TResult>(observer => 
  {
    var published = sources.Select(x => x.Publish()).ToArray();
    var result = new CompositeDisposable();
    result.Add(selector(published).Subscribe(observer));
    foreach(var p in published)
      result.Add(p.Connect());
    return result;
  });
}
c# system.reactive
2个回答
1
投票

我现在想出了一个解决方案,但很高兴听到更好的想法!

public static IObservable<TResult> Publish<TSource, TResult>(this IReadOnlyList<IObservable<TSource>> sources, Func<IReadOnlyList<IObservable<TSource>>, IObservable<TResult>> selector)
{
  IObservable<TResult> Generator(ImmutableArray<IObservable<TSource>> published)
  {
    return published.Length == sources.Count
      ? selector(published)
      : sources[published.Length].Publish(x => Generator(published.Add(x)));
  }

  return Generator(ImmutableArray<IObservable<TSource>>.Empty);
}

为什么这个有用的问题出现了。为了清楚起见,我附上了我打算用它做的事情之一。以下函数与现有的 CombineLatest() 工作方式相同,但集成了节流功能:项目的第一个组合会立即发出,之后的所有内容都会去抖动,如果结果选择器的调用开销很大,这会很有帮助。

public static IObservable<TResult> CombineLatest<TSource, TResult>(this IReadOnlyList<IObservable<TSource>> sources, Func<IList<TSource>, TResult> resultSelector, TimeSpan timeSpan, IScheduler? scheduler = null)
{
  scheduler ??= Scheduler.Default;
  return sources.Publish<TSource, TResult>(publishedSources => publishedSources
    .CombineLatest(_ => Unit.Default)
    .Publish(x => x.Take(1).Merge(x.Skip(1).Sample(timeSpan, scheduler))
    .Publish(signal => publishedSources
      .Select(source => signal.WithLatestFrom(source, (_,value) => value))
      .Zip()
      .Select(resultSelector)));
}

-2
投票

在我看来,您使用的

selector
是错误的。

这是您当前的实现

selector

public static IObservable<TResult> Publish<TSource, TResult>(
        this IObservable<TSource>[] sources,
        Func<IObservable<TSource>[], IObservable<TResult>> selector)
    => selector(sources);

这是微不足道的。这也有点同义反复。你的

selector
是你想要创造的东西。

唯一明智的选择器是

Func<IObservable<TSource>, IObservable<TResult>> selector
.

可以这样吗:

public static IObservable<TResult> Publish<TSource, TResult>(
        this IObservable<TSource>[] sources,
        Func<IObservable<TSource>, IObservable<TResult>> selector)
    => sources.Select(x => x.Publish(selector)).Merge();
© www.soinside.com 2019 - 2024. All rights reserved.