jjzjj

backpressure

全部标签

javascript - RxJS 先 take 然后 throttle 等待

我想使用RxJS-DOM观察mousewheel事件,这样当第一个事件触发时,我转发它然后删除所有值,直到后续值之间的延迟超过先前指定的持续时间。我想象的运算符可能看起来像:Rx.DOM.fromEvent(window,'mousewheel',(e)=>e.deltaY).timegate(500/*ms*/)想象一下下面的数据流:0-(200ms)-1-(400ms)-2-(600ms)-3发送的值是数字,时间描述了下一个值到达所需的时间。由于0是第一个值,它会被发出,然后直到3的所有值都会被丢弃,因为后续值之间的各个延迟不大于500ms。与throttle不同,无论是否发出最后

javascript - RxJs:zip 运算符的有损形式

考虑使用zip运算符将两个无限的Observable压缩在一起,其中一个发出的数据项的频率是另一个的两倍。当前的实现是无损的,即如果我让这些Observable发射一个小时,然后我在它们的发射率之间切换,第一个Observable最终会catch另一个。随着缓冲区变得越来越大,这会在某个时候导致内存爆炸。如果第一个observable将在几个小时内发出项目,而第二个将在最后发出一个项目,则会发生同样的情况。如何实现此运算符的有损行为?我只想在我从两个流中获得排放时进行排放,我不在乎我错过了更快的流中有多少排放。说明:我在这里尝试解决的主要问题是由于zip运算符的无损特性导致的内存爆炸。

java - 获得 Cassandra Writes 背压的最佳方法是什么?

我有一项服务以我控制的速率从队列中消耗消息。我做了一些处理,然后尝试通过DatastaxJava客户端写入Cassandra集群。我已经使用maxRequestsPerConnection和maxConnectionsPerHost设置了我的Cassandra集群。但是,在测试中我发现,当我达到maxConnectionsPerHost和maxRequestsPerConnection时,对session.executeAsync的调用不会阻塞。我现在正在做的是使用newSemaphore(maxConnectionsPerHost*maxRequestsPerConnection)并

java - 如何在背压期间仅缓冲来自 rx.Observable 的最新发射

我有一个rx.Observable,它将任务的进度发送到onNext()。onNext()发射有时会发生得如此之快以至于Observer无法跟上,导致backpressure.我想通过仅缓冲来自Observable的最新发射来处理背压。例如:Observable发出1并且Observer接收1。当Observer仍在处理1时,Observable发出2、3,和4。Observer完成处理1并开始处理4(发射2和3被丢弃)。这似乎是在RxObservable中处理进度的常见情况,因为您通常只关心使用最新的进度信息更新您的UI。但是我一直无法弄清楚如何做到这一点。有人知道如何使用RxJav

redis - 如何高效地将数据从flink管道写入redis

我正在Apacheflinksqlapi中构建管道。管道进行简单的投影查询。但是,我需要在查询之前和查询之后再写一次元组(恰好是每个元组中的一些元素)。事实证明,我用来写入redis的代码严重降低了性能。即flink以非常小的数据速率产生背压。我的代码有什么问题,我该如何改进。请有任何建议。当我停止向redis写入前后性能都非常出色。这是我的管道代码:publicclassQueryExample{publicstaticLongthroughputCounterAfter=newLong("0");publicstaticvoidmain(String[]args){intk_par

go - 在具有互斥锁的 goroutine 之间修改的 slice 未显示正确的同步

我是新手,但之前使用过并发。我在共享多个goroutine之间的slice时遇到问题,这些goroutine不包含所有goroutine之间的相同数据。当我修改slice时,我也使用互斥锁来锁定结构,但它似乎没有帮助。我附上了我的代码,想知道我做错了什么,感谢您的帮助!typeStatestruct{waitingint32processingint32completedint32}typeSchedulerstruct{sync.Mutexitemschaninterface{}backPressure[]interface{}capacityintcancelercontext.C

go - 在具有互斥锁的 goroutine 之间修改的 slice 未显示正确的同步

我是新手,但之前使用过并发。我在共享多个goroutine之间的slice时遇到问题,这些goroutine不包含所有goroutine之间的相同数据。当我修改slice时,我也使用互斥锁来锁定结构,但它似乎没有帮助。我附上了我的代码,想知道我做错了什么,感谢您的帮助!typeStatestruct{waitingint32processingint32completedint32}typeSchedulerstruct{sync.Mutexitemschaninterface{}backPressure[]interface{}capacityintcancelercontext.C

android - RxJava Subject with Backpressure - 只有在下游完成消费后才发出最后一个值

我有一个PublishSubject,它在某些UI事件上调用onNext()。订阅者通常需要2秒来完成其工作。我需要在订阅者忙碌时忽略对onNext()的所有调用,但最后一个除外。我尝试了以下方法,但是我无法控制流程。请求似乎排队,每个请求都得到处理(因此背压似乎不起作用)。我怎样才能让它忽略除最后一个请求之外的所有请求?(我不想使用debounce,因为代码需要立即使用react,任何合理的小超时都不会起作用)。此外,我意识到对主题使用subscribeOn没有任何效果,因此我使用observeOn在其中一个运算符中执行异步工作。这是正确的做法吗?SubjectloadingQueu