jjzjj

Observable

全部标签

c# - 将两个 Observable 与一个优先级更高的 Observable 合并

是否可以使用ReactiveExtensions来实现以下目标;两个Observable,一个是“高”优先级,另一个是“低”优先级将两个Observable合并为一个,然后可以对其进行订阅,目的是让这个生成的Observable始终先于任何低优先级的项目发出高优先级的项目。我知道这可以使用两个ConcurrentQueue集合和类似的东西更简单地实现;returnthis.highPriorityItems.TryDequeue(outitem)||this.lowPriorityItems.TryDequeue(outitem);但是这种方法有一些问题,比如不能像Observable

c# - 可观察到 Rx 中的回调

我正在寻找一种优雅的方式来使用Rx从一个普通的回调委托(delegate)创建一个Observable,类似于Observable.FromEventPattern?说,我正在包装Win32EnumWindows回调我提供的EnumWindowsProc的API。我知道我可以为这个回调创建一个临时的C#事件适配器并将它传递给FromEventPattern。此外,我可能可以手动实现IObservable,因此它会从我的EnumWindowsProc回调中调用IObserver.OnNext。是否存在我缺少的用于在Rx中包装回调的现有模式? 最佳答案

c# - 如何创建一个产生单一值且永不完成的可观察对象

我知道Observable.Never()作为创建一个永不完成的序列的方法,但是是否有一个扩展/干净的过程来创建一个产生单个值然后永不完成的可观察对象?我和Observable.Create(...)一起去吗??Observable.Concat(Observable.Return(onlyValue),Observable.Never())?或者是否有内置或比这更多的“RXy”? 最佳答案 对于您的具体问题,一个简单的选择是使用‛Never‛和‛StartWith‛:Observable.Never().StartWith(5)但

c# - Observable.FromEvent 和 CreateDelegate 参数映射

我在看的实现Observable.FromEvent(add,remove)我正在努力了解它是如何工作的。让我们说TEventHandler是标准:publicdelegatevoidEventHandler(objectsender,EventArgse);那么让我费解的代码是:TEventHandlerd=(TEventHandler)Delegate.CreateDelegate(typeof(TEventHandler),(object)newAction(observer.OnNext),typeof(Action).GetMethod("Invoke"));(n.b我已将此

c# - 使用 RX 的最佳实践——返回一个 Observable 还是接受一个 Observer?

使用ReactiveExtensions,我可以想出多种方法来模拟具有副作用/IO的操作-比如从聊天室订阅消息。我可以接受参数(比如聊天室)和一个Observer,返回一个Disposable,即DisposableSubscribeTo(stringchatRoom,Observerobserver)或者在给定参数的情况下返回一个Observable,即ObservableGetObservableFor(stringchatRoom)当返回一个Observable时,我还可以选择将其设置为“热”或“冷”,即在调用我的方法时或在订阅observable时执行实际订阅。此外,我可以使o

c# - Linq 查询以在列表 c# 的列表中过滤 id

我有一个结果列表列表,其中包含列表。我有另一个列表,其中仅包含列表。我想使用linq查询从数据中进行过滤,它应该返回包含技能ID的所有数据来自第二个列表。varlist=this._viewModel.Data.Select(T=>T.SkillsList);varfiltered=item.Skills.Contains(list.Where(t=>t.ToString()).ToList();从第一个列表开始,它包含技能列表中的小数列表;item.Skills包含字段为skillid和代码的列表。item是另一个包含技能列表的对象。 最佳答案

c# - Observable.Interval 对高频事件有用吗?

我正在使用Observable.Interval来测试一段特定的客户端/服务器代码在不同负载下的执行情况。但它似乎有一些奇怪的行为。Observable.Interval(timespan=0)尽快产生事件,例如每秒800万个事件。这似乎没问题。Observable.Interval(0只产生1个事件,然后什么都不产生。Observable.Interval(1ms以大约请求的速率生成事件,高度量化,并且最多仅每秒64个事件。我可以理解它不一定在下面使用高分辨率计时器,但令人困惑的是它在三个区域中具有如此完全不同的行为。这是预期的行为,还是我用错了?如果是预期的,那么是否有替代Obse

c# - Observable.Where 与异步谓词

有没有一种方便的方法可以将异步函数用作可观察对象上的Where运算符的谓词?例如,如果我有一个整洁但可能长时间运行的函数定义如下:TaskRank(objectitem);是否有将其传递到Where并保持异步执行的技巧?如:myObservable.Where(asyncitem=>(awaitRank(item))>5)过去,当我需要这样做时,我会使用SelectMany并将这些结果与原始值一起投影到新类型中,然后根据那个。myObservable.SelectMany(asyncitem=>new{ShouldInclude=(awaitRank(item))>5,Item=ite

c# - Observable.Defer - 需要澄清它到底做了什么

假设我想生成一个异步随机数流,每100毫秒抽出一个新值。在尝试提出解决方案时,我的第一次尝试看起来像这样:varrandom=newRandom();Observable.Start(()=>random.Next()).Delay(TimeSpan.FromMilliseconds(100)).Repeat().Subscribe(Console.WriteLine);如果您尝试运行它,您会注意到它只是一遍又一遍地重复相同的值。好吧,我想我误解了Repeat的工作原理。玩了一会儿之后,我想到了这个并且它起作用了:varrandom=newRandom();Observable.Def

c# - 在不跳过值的情况下节流 Rx.Observable

Throttle方法会在其他人跟随得太快时跳过可观察序列中的值。但我需要一种方法来延迟它们。也就是说,我需要在不跳过任何项的情况下设置项之间的最小延迟。实际例子:有一个网络服务接受请求的速度不超过每秒一次;有一个用户可以添加单个或批量的请求。如果没有Rx,我将创建一个列表和一个计时器。当用户添加请求时,我会将它们添加到列表中。在计时器事件中,我将检查列表是否为空。如果不是,我会发送请求并删除相应的项目。带锁和所有的东西。现在,使用Rx,我可以创建Subject,在用户添加请求时添加项目。但我需要一种方法来确保Web服务不会因应用延迟而被淹没。我是Rx的新手,所以我可能遗漏了一些明显的东