System.Reactive是指.NET的Reactive Extensions,也称为Rx。 Rx为开发人员提供了通用IObservable <T>接口的反应式编程模型,而不是传统的命令式编程模型或严格依赖.NET事件或特定API的其他反应式编程模型。
所有,我希望删除 System.Reactive 附带的所有 Observable 扩展方法,因为我们正在将当前代码库从 XF 迁移到 MAUI,并且我们有 XF 代码应用程序使用一些...
在 Blazor 服务器端应用程序中,我有一个组件,基本上是邮政编码的下拉列表。我使用 Reactive 来调用端点,该端点通过搜索条件查找匹配的邮政编码。 我
如何为一组可观察对象实现 Observable.Publish
我正在寻找实现以下功能: 公共静态 IObservable 发布( 这个 IObservable[] 来源, 函数 我正在寻找实现以下功能: public static IObservable<TResult> Publish<TSource, TResult>( this IObservable<TSource>[] sources, Func<IObservable<TSource>[], IObservable<TResult>> selector) { // how? } 行为应该与现有的相同 IObservable<TResult> Publish<TSource, TResult>( IObservable<TSource> source, Func<IObservable<TSource>, Iobservable<TResult>> selector) 当然唯一的区别是源(和选择器的参数)是一个可观察数组。基本上,“给定一组可观察对象,产生一个结果可观察对象,同时共享对数组的每个可观察对象的单个订阅”。 这可以用任何巧妙的方式完成吗? 到目前为止,我唯一的想法是通过链接单个可观察的 .Publish() 调用的表达式树来实现这一点——出于性能原因,这是不可行的。 或者去做这样的事情。但这看起来很乱,我发现很难推理出正确性 public static IObservable<TResult> Publish<TSource, TResult>( this IObservable<TSource>[] sources, Func<IObservable<TSource>[], IObservable<TResult>> selector) { return Observable.Create<TResult>(observer => { var published = sources.Select(x => x.Publish()).ToArray(); var result = new CompositeDisposable(); result.Add(selector(published).Subscribe(observer)); foreach(var p in published) result.Add(p.Connect()); return result; }); } 我现在想出了一个解决方案,但很高兴听到更好的想法! public static IObservable<TResult> Publish<TSource, TResult>(this IReadOnlyList<IObservable<TSource>> sources, Func<IReadOnlyList<IObservable<TSource>>, IObservable<TResult>> selector) { IObservable<TResult> Generator(ImmutableArray<IObservable<TSource>> published) { return published.Length == sources.Count ? selector(published) : sources[published.Length].Publish(x => Generator(published.Add(x))); } return Generator(ImmutableArray<IObservable<TSource>>.Empty); } 为什么这个有用的问题出现了。为了清楚起见,我附上了我打算用它做的事情之一。以下函数与现有的 CombineLatest() 工作方式相同,但集成了节流功能:项目的第一个组合会立即发出,之后的所有内容都会去抖动,如果结果选择器的调用开销很大,这会很有帮助。 public static IObservable<TResult> CombineLatest<TSource, TResult>(this IReadOnlyList<IObservable<TSource>> sources, Func<IList<TSource>, TResult> resultSelector, TimeSpan timeSpan, IScheduler? scheduler = null) { scheduler ??= Scheduler.Default; return sources.Publish<TSource, TResult>(publishedSources => publishedSources .CombineLatest(_ => Unit.Default) .Publish(x => x.Take(1).Merge(x.Skip(1).Sample(timeSpan, scheduler)) .Publish(signal => publishedSources .Select(source => signal.WithLatestFrom(source, (_,value) => value)) .Zip() .Select(resultSelector))); } 在我看来,您使用的selector是错误的。 这是您当前的实现selector: public static IObservable<TResult> Publish<TSource, TResult>( this IObservable<TSource>[] sources, Func<IObservable<TSource>[], IObservable<TResult>> selector) => selector(sources); 这是微不足道的。这也有点同义反复。你的 selector 是你想要创造的东西。 唯一明智的选择器是Func<IObservable<TSource>, IObservable<TResult>> selector. 可以这样吗: public static IObservable<TResult> Publish<TSource, TResult>( this IObservable<TSource>[] sources, Func<IObservable<TSource>, IObservable<TResult>> selector) => sources.Select(x => x.Publish(selector)).Merge();
使用 AtomicIntegers 构造的可观察对象在 RxJava 中的意外行为
这是测试用例: 导入 io.reactivex.rxjava3.core.*; 导入 java.util.concurrent.TimeUnit; 导入 java.util.concurrent.atomic.AtomicInteger; 公共课 MainTest { 公共静态 AtomicInte ...
System.Reactive:实现一个 IObservable<T>
我需要创建一个自定义的 IObservable。我在那里读了一些书,最后我不应该直接实施 IObservable。 我注意到有一个 ObservableBase。这是……
这对其他人来说似乎是一个基本问题,但由于我刚刚开始使用 Rx,我想知道是否有人可以指出正确的方向。我正在尝试订阅
将异步函数传递给类型为 Action<X> 的参数? [重复]
应用有如下定义 IDisposable IObservable.Subscribe(Action onNext) 它被传递了一个异步函数: 异步任务流程(X通知,
IObservable.Subscribe OnNext 操作不会在 blazor webassembly 中阻塞
我正在使用 https://github.com/dotnet/reactive 我使用 CombineLatest 将 observableA(通过计时器发出项目)与 observableB(手动插入)结合起来。 在 OnNext 方法中,有时我会向
我们如何从 IObservable<T>> 创建一个 IObservableList<IList<T>?
问题 我们如何从 IObservable> 到 IObservableList(来自 DynamicData)? 语境 我在我的项目中同时使用 Reactive 扩展和 DynamicData。 我
IObservable.Subscribe ONext 操作不会在 blazor webassembly 中阻塞
我正在使用 https://github.com/dotnet/reactive 我使用 CombineLatest 将 observableA(通过计时器发出项目)与 observableB(手动插入)结合起来。 在 OnNext 方法中,有时我会向
如何在 Rx.Net 中实现 exhaustMap 处理程序?
我正在从 rxjs 中寻找类似于 exhaustMap 运算符的东西,但 RX.NET 似乎没有这样的运算符。 我需要实现的是,在源流的每个元素上,...
我有一个带有时间戳的 Observable 项目集合。 我使用 Scan 方法包装每个项目,并添加对集合中最后一个有效项目的引用。 可观察
基本上我想创建一个队列。像const queue = new BehaviorSubject([]) queue.subscribe((args) => someCall(args))这样的东西,我可能会在几个地方调用queue.next({arg1, arg2, arg3})。
用TestScheduler测试一个IConnectableObservable。
好吧,虽然已经很晚了,但我还是无法解决为什么会发生以下情况。我正在尝试测试以下(简化的)IConnectableObservable。 : Private const int ...
在Node.js中,你可以设置一个服务器,只要服务器还活着,并且在事件循环中处于活跃状态,这个进程就不会终止。我想知道是否可以用......做这样的事情。
我正在使用一个非托管库,它要求所有对其API的调用都在同一个线程上运行。我们希望使用Reactive扩展的EventLoopScheduler来实现这一目标,因为我们将...
我有一个主题,每100毫秒得到一个int。这个数字在每次迭代时递增。我订阅这个主题,得到50的块回,并等待几秒钟。例如:...
因为微软可能杀了所有的并行扩展CTP的下载链接,我完全迷茫了。我想指定在某个时间运行的最大任务数,因为我希望线程数多于处理器数。
我有一个简单的订阅,是在MyClass中创建的。/myService.Connect返回IObservable。 myService.Connect(requestParameters) .Subscribe(DoSomething); 有一个 ...