如何合并同一源的热和冷可观测值,同时有效避免重复?

问题描述 投票:0回答:1

我有元素来源。它可以被查询,并在添加元素时发布事件。换句话说,我可以从查询结果中创建冷可观察值,并从事件中创建热可观察值。

一种方法是将冷热融合。

var o = cold.Merge(hot);

它可以工作,但如果元素是在订阅的冷和热可观察量之间发出的,它可能会丢失元素。

所以另一种方法是合并热和冷。

var o = hot.Merge(cold);

这也有效,可以避免丢失元素,但是当在订阅冷可观察值之前从热可观察值发出元素时,它会产生重复项。

过滤掉不明确的元素怎么样?

var o = hot.Merge(cold).Distinct()

这解决了问题,但它将维护一个不断增长的元素列表来检查独特性。我很感兴趣找到一种方法来完成使用 Distinct 的行为,但随着时间的推移不会使用越来越多的内存。

我相信解决方案是这样的:

订阅热门可观察的主题。将冷可观测值订阅给观察者。暂停主题(?),然后向观察者重播主题,但检查清晰度。最后,取消暂停(或者不管怎样)主题并将其订阅给观察者(没有清晰度检查)。

下面是演示上述行为的示例代码。如果您没有可用的编译器,您可以在此处运行它

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;

var strategies = new[]
{
    WarmObservable.WithMissesFrom<Int32>,
    WarmObservable.WithDuplicatesFrom,
    WarmObservable.WithCorrectButHighMemoryUsageFrom,
    //WarmObservable.From
};
foreach (var strategy in strategies)
{
    var observed = new List<Int32>();
    var nums = new ObservableAddOnlyCollection<Int32>();
    var warm = strategy(nums, nums.Added);
    const Int32 max = 1_000_000;
    Parallel.For(0, max, i =>
    {
        nums.Add(i);
        if (i == max / 2)
            warm.Subscribe(observed.Add);
    });
    Console.WriteLine($"{observed.Count:N0} elements observed using {strategy.Method.Name}.");
}

public sealed class ObservableAddOnlyCollection<T> : IObservable<T>
{
    private readonly List<T> items = new();
    private event Action<T> added = _ => {};

    public IObservable<T> Added => Observable.FromEvent<T>(h => added += h, h => added -= h);
    public void Add(T item) { lock (items) items.Add(item); added.Invoke(item); }
    public IDisposable Subscribe(IObserver<T> observer) => ToList().ToObservable().Subscribe(observer);
    private List<T> ToList() { lock (items) return items.ToList(); }
}

public static class WarmObservable
{
    public static IObservable<T> WithMissesFrom<T>(IObservable<T> cold, IObservable<T> hot)
    {
        return cold.Merge(hot);
    }
    
    public static IObservable<T> WithDuplicatesFrom<T>(IObservable<T> cold, IObservable<T> hot)
    {
        return hot.Merge(cold);
    }
    
    public static IObservable<T> WithCorrectButHighMemoryUsageFrom<T>(IObservable<T> cold, IObservable<T> hot)
    {
        return hot.Merge(cold).Distinct();
    }
    
    public static IObservable<T> From<T>(IObservable<T> cold, IObservable<T> hot)
    {
        // merge a cold observable with a hot observable, without losing any elements and without duplicating any elements
        // also don't require an increasing memory footprint to distinguish duplicates
        throw new NotImplementedException();
    }
}
c# system.reactive
1个回答
0
投票

我意识到,实现一个包装器

IObserver<T>
类使得在枚举冷可观察值后关闭不同检查变得微不足道。

这是可行的解决方案

public static class WarmObservable
{
    public static IObservable<T> From<T>(IObservable<T> cold, IObservable<T> hot, IEqualityComparer<T> ec)
    {
        // merge a cold observable with a hot observable, without losing any elements and without duplicating any elements
        // also don't require an increasing memory footprint to distinguish duplicates
        return new IgnoreDuplicatesColdCompletesObservable<T>(cold, hot, ec);
    }
    
    private sealed class IgnoreDuplicatesColdCompletesObservable<T>(IObservable<T> cold, IObservable<T> hot, IEqualityComparer<T> ec) : IObservable<T>
    {
        public IDisposable Subscribe(IObserver<T> observer)
        {
            var o = new DistinctObserver(observer, ec);
            return hot.Merge(cold.Finally(() => o.OnColdCompleted())).Subscribe(o);
        }
    
        private sealed class DistinctObserver(IObserver<T> o, IEqualityComparer<T> ec) : IObserver<T>
        {
            private HashSet<T>? set = new(ec);

            public void OnColdCompleted() => Interlocked.Exchange(ref set, null);

            public void OnCompleted() => o.OnCompleted();
            public void OnError(Exception error) => o.OnError(error);

            public void OnNext(T value)
            {
                if (set is not null && Interlocked.CompareExchange(ref set, null, null) is {} x)
                {
                    if (x.Add(value))
                    {
                        o.OnNext(value);
                    }
                }
                else
                {
                    o.OnNext(value);
                }
            }
        }
    }
}

优化说明:

set is not null
的“正常”检查位于互锁检查之前,作为一种优化。一旦
set
被分配
null
,当通过互锁读取时,该值将传播到CPU缓存,因此不必再进行互锁读取。

© www.soinside.com 2019 - 2024. All rights reserved.