反应式编程中流之间的循环依赖关系

问题描述 投票:0回答:3

涉足响应式编程时,我经常遇到两个流相互依赖的情况。解决这些情况的惯用方法是什么?

一个最小的例子:有按钮 A 和 B,都显示一个值。单击 A 必须将 A 的值增加 B。单击 B 必须将 B 的值设置为 A。

我能想到的第一个解决方案(F# 中的示例,但欢迎使用任何语言的答案):

let solution1 buttonA buttonB =
    let mutable lastA = 0
    let mutable lastB = 1
    let a = new Subject<_> ()
    let b = new Subject<_> ()
    (OnClick buttonA).Subscribe(fun _ -> lastA <- lastA + lastB; a.OnNext lastA) 
    (OnClick buttonB).Subscribe(fun _ -> lastB <- lastA; b.OnNext lastB)
    a.Subscribe(SetText buttonA)
    b.Subscribe(SetText buttonA)
    a.OnNext 0
    b.OnNext 1

这个解决方案使用可变状态和主题,它的可读性不太好,而且看起来不惯用。

我尝试的第二个解决方案涉及创建一个将两个依赖流链接在一起的方法:

let dependency (aGivenB: IObservable<_> -> IObservable<_>) (bGivenA: IObservable<_> -> IObservable<_>) =
    let bProxy = new ReplaySubject<_> () 
    let a = aGivenB bProxy
    let b = bGivenA a
    b.Subscribe(bProxy.OnNext)
    a, b

let solution2 buttonA buttonB =
    let aGivenB b =
        Observable.WithLatestFrom(OnClick buttonA, b, fun click bValue -> bValue)
                  .Scan(fun acc x -> acc + x)
                  .StartWith(0)
    let bGivenA a =
        Observable.Sample(a, OnClick buttonB)
                  .StartWith(1)
    let a, b = dependency aGivenB bGivenA
    a.Subscribe(SetText buttonA)
    b.Subscribe(SetText buttonB)

这看起来好一点,但是由于反应库中不存在像

dependency
这样的方法,我相信存在更惯用的解决方案。使用第二种方法也很容易引入无限递归。

在反应式编程中,处理涉及流之间循环依赖关系的问题(例如上面的示例)的推荐方法是什么?

c# f# system.reactive reactive-programming reactivex
3个回答
3
投票

编辑

这是一个 F# 解决方案:

type DU = 
    | A 
    | B 

type State = { AValue : int; BValue : int }

let solution2 (aObservable:IObservable<_>, bObservable:IObservable<_>) = 

    let union = aObservable.Select(fun _ -> A).Merge(bObservable.Select(fun _ -> B))

    let result = union.Scan({AValue = 0; BValue = 1}, fun state du -> match du with
        | A -> { state with AValue = state.AValue + state.BValue }
        | B -> { state with BValue = state.AValue }
    )

    result

F# 实际上是一种很棒的语言,这要归功于内置的可区分联合和记录。这是用 C# 编写的答案,带有自定义的受歧视联盟;我的 F# 有点生锈了。

诀窍是使用可区分联合将两个可观察量转变为一个可观察量。所以基本上将 a 和 b 联合成一个受歧视联合的可观察值:

a : *---*---*---**
b : -*-*--*---*---
du: ab-ba-b-a-b-aa

完成后,您可以对该项目是“A”推送还是“B”推送做出反应。

为了确认,我假设没有办法显式地 set 嵌入到 ButtonA/ButtonB 中的值。如果有的话,这些变化应该被建模为可观察的,并且也被纳入受歧视的联合中。

var a = new Subject<Unit>();
var b = new Subject<Unit>();
var observable = a.DiscriminatedUnion(b)
    .Scan(new State(0, 1), (state, du) => du.Unify(
        /* A clicked case */_ => new State(state.A + state.B, state.B), 
        /* B clicked case */_ => new State(state.A, state.A)
    )
);

observable.Subscribe(state => Console.WriteLine($"a = {state.A}, b = {state.B}"));
a.OnNext(Unit.Default);
a.OnNext(Unit.Default);
a.OnNext(Unit.Default);
a.OnNext(Unit.Default);
b.OnNext(Unit.Default);
a.OnNext(Unit.Default);
a.OnNext(Unit.Default);
a.OnNext(Unit.Default);
a.OnNext(Unit.Default);
b.OnNext(Unit.Default);

这是 C# 中依赖的类。其中大部分都可以轻松转换为内置 F# 类型。

public class State /*easily replaced with an F# record */
{
    public State(int a, int b)
    {
        A = a;
        B = b;
    }

    public int A { get; }
    public int B { get; }
}

/* easily replaced with built-in discriminated unions and pattern matching */
public static class DiscriminatedUnionExtensions
{
    public static IObservable<DiscriminatedUnionClass<T1, T2>> DiscriminatedUnion<T1, T2>(this IObservable<T1> a, IObservable<T2> b)
    {
        return Observable.Merge(
            a.Select(t1 => DiscriminatedUnionClass<T1, T2>.Create(t1)),
            b.Select(t2 => DiscriminatedUnionClass<T1, T2>.Create(t2))
        );
    }

    public static IObservable<TResult> Unify<T1, T2, TResult>(this IObservable<DiscriminatedUnionClass<T1, T2>> source,
        Func<T1, TResult> f1, Func<T2, TResult> f2)
    {
        return source.Select(union => Unify(union, f1, f2));
    }

    public static TResult Unify<T1, T2, TResult>(this DiscriminatedUnionClass<T1, T2> union, Func<T1, TResult> f1, Func<T2, TResult> f2)
    {
        return union.Item == 1
            ? f1(union.Item1)
            : f2(union.Item2)
        ;
    }
}

public class DiscriminatedUnionClass<T1, T2>
{
    private readonly T1 _t1;
    private readonly T2 _t2;
    private readonly int _item;
    private DiscriminatedUnionClass(T1 t1, T2 t2, int item)
    {
        _t1 = t1;
        _t2 = t2;
        _item = item;
    }

    public int Item
    {
        get { return _item; }
    }

    public T1 Item1
    {
        get { return _t1; }
    }

    public T2 Item2
    {
        get { return _t2; }
    }

    public static DiscriminatedUnionClass<T1, T2> Create(T1 t1)
    {
        return new DiscriminatedUnionClass<T1, T2>(t1, default(T2), 1);
    }

    public static DiscriminatedUnionClass<T1, T2> Create(T2 t2)
    {
        return new DiscriminatedUnionClass<T1, T2>(default(T1), t2, 2);
    }
}

1
投票

假设输出最终被发送回源,您可以使用基本运算符来完成此操作。您所要做的就是为每个可观察到的按钮/信号调用

withLatestFrom
两次。我的解决方案是用java编写的,但它应该很容易理解!

private static Pair<Observable<Integer>, Observable<Integer>> test(
    final Observable<Integer> aValues,
    final Observable<Integer> bValues,
    final Observable<Void> aButton,
    final Observable<Void> bButton,
    final Func2<Integer, Integer, Integer> aFunction,
    final Func2<Integer, Integer, Integer> bFunction
) {
    return new Pair<>(
        aButton.withLatestFrom(aValues, (button, a) -> a).withLatestFrom(bValues, aFunction),
        bButton.withLatestFrom(aValues, (button, a) -> a).withLatestFrom(bValues, bFunction)
    );
}

这是我使用的测试代码:

final TestScheduler scheduler = new TestScheduler();

final TestSubject<Integer> aSubject = TestSubject.create(scheduler);
final TestSubject<Integer> bSubject = TestSubject.create(scheduler);
aSubject.onNext(1);
bSubject.onNext(1);

final TestSubject<Void> aButton = TestSubject.create(scheduler);
final TestSubject<Void> bButton = TestSubject.create(scheduler);

final Pair<Observable<Integer>, Observable<Integer>> pair = test(
    aSubject, bSubject, aButton, bButton, (a, b) -> a + b, (a, b) -> a
);

pair.component1().subscribe(aSubject::onNext);
pair.component2().subscribe(bSubject::onNext);
pair.component1().map(a -> "A: " + a).subscribe(System.out::println);
pair.component2().map(b -> "B: " + b).subscribe(System.out::println);

aButton.onNext(null); scheduler.triggerActions();
bButton.onNext(null); scheduler.triggerActions();
aButton.onNext(null); scheduler.triggerActions();
aButton.onNext(null); scheduler.triggerActions();
bButton.onNext(null); scheduler.triggerActions();

打印:

A: 2
B: 2
A: 4
A: 6
B: 6

1
投票

这是一个非常简单的解决方案,使用 Gjallarhorn:

#r @"..\packages\Gjallarhorn\lib\portable-net45+netcore45+wpa81+wp8+MonoAndroid1+MonoTouch1\Gjallarhorn.dll"

open Gjallarhorn

(*
    Clicking on A must increment the value of A by B. Clicking on B must set the value of B to A.
*)
let  a = Mutable.create 3
let b = Mutable.create 4

let clickA() = a.Value <- a.Value + b.Value
let clickB() = b.Value <- a.Value

let d1 = Signal.Subscription.create (fun x -> printfn "%A" <| "Clicked A: " + x.ToString()) a
let d2 = Signal.Subscription.create (fun x -> printfn "%A" <| "Clicked B: " + x.ToString()) b

clickA()
clickB()  

它实际上与您的初始版本非常相似,因此使用可变状态,但使绑定到 UI 变得非常容易,有关更惯用的用法,请参阅此博客文章

© www.soinside.com 2019 - 2024. All rights reserved.