我想订阅流并在观察到特定事件序列时收到通知。
类似 - 用非常多的伪代码:
.WhenType<MessageHandled>().ThenType<TransactionCommitted>().ThenAnyNumberOfAnyType().ThenType<QueueStopped>()
。
类似于正则表达式,但过滤流中的事件/类型而不是字符串。
完成类似任务的最佳方法是什么?
我尝试使用 Observable Joins(请参阅System.Reactive.Joins 指南),但无法获得有效的解决方案。
但是,我确实尝试了状态机选项并得到了一些有用的东西。
我首先创建一个代表您的状态的
enum
。我知道您正在使用类型,但是 enum
让我的示例更清晰。
public enum Value
{
MessageHandled,
TransactionCommitted,
QueueStopped,
X,
Y,
}
然后我创建了一个可观察值,它返回这些值的各种组合。
IObservable<Value> source = new[]
{
/* invalid */ Value.TransactionCommitted, Value.TransactionCommitted, Value.MessageHandled, Value.X,
/* valid */ Value.MessageHandled, Value.TransactionCommitted, Value.X, Value.Y, Value.QueueStopped,
/* invalid */ Value.X, Value.X,
/* valid */ Value.MessageHandled, Value.TransactionCommitted, Value.QueueStopped,
/* invalid */ Value.Y, Value.MessageHandled, Value.Y, Value.TransactionCommitted, Value.QueueStopped,
}.ToObservable();
我为状态创建了一个枚举,并为状态机的初始状态和转换创建了定义。
public enum State { A, B, Z }
var initial = new (Value value, State state)[]
{
(Value.MessageHandled, State.A),
};
var transitions = new(State current, Func<Value, bool> rule, State next)[]
{
(State.A, t => t == Value.TransactionCommitted, State.B),
(State.B, t => t != Value.QueueStopped, State.B),
(State.B, t => t == Value.QueueStopped, State.Z),
};
最后,这是运行状态机的查询:
IObservable<Value[]> query =
source
.Scan(
new List<(State state, Value[] values)>(),
(accumulator, value) =>
(
Enumerable
.Concat(
from a in accumulator
from t in transitions
where a.state == t.current
where t.rule(value)
select (t.next, a.values.Append(value).ToArray()),
from i in initial
where i.value == value
select (i.state, new[] { value }))
).ToList())
.SelectMany(x => x.Where(y => y.state == State.Z), (x, y) => y.values);
产生:
而且,根据您的描述,我认为这就是您想要的。