Flow: 是一种类似于序列的冷流,flow构建器中的代码直到流被收集的时候才运行。
流的连续性:流的每次单独收集都是按顺序执行的,除非使用特殊操作符。
从上游到下游每个过渡操作符都会处理每个发射出的值,然后再交给末端操作符。
flow构建器创建一个函数
返回多个值,而且是异步的,不是一次性返回
(1)构建流的三种方式
// flow构建器创建一个函数
// 返回多个值,而且是异步的,不是一次性返回
suspend fun simpleFlow() = flow<Int> {
for (i in 1..3) {
delay(1000)
emit(i) // 发射,产生一个元素
}
}
runBlocking {
// Flow构建方式1
simpleFlow().collect { value -> println(value) } // 收集元素
// Flow构建方式2
(1..5).asFlow().filter {
it % 2 == 0
}.map {
println("Map $it")
}.onEach {
delay(1000)
}.collect {
println("Collect $it")
}
// Flow构建方式3
flowOf("one", "two", "three").onEach { delay(1000) }.collect { values ->
println(values)
}
}
(2)流的上下文
// Flow上下文验证
(1..5).asFlow().filter {
println("当前线程-filter:" + Thread.currentThread().name)
it % 2 == 0
}.map {
println("当前线程-map:" + Thread.currentThread().name)
}.onEach {
delay(1000)
}.collect {
println("当前线程-collect:" + Thread.currentThread().name)
println("Collect $it")
}
从打印结果上看,上游和下游都是在主线程。
但是,一般情况下,Flow构建之后的代码块中是耗时操作,所以不能放在主线程,解决方案是:在Flow构建器后面添加 flowOn(Dispatchers.Default),改造后的代码如下:
suspend fun simpleFlow() = flow<Int> {
for (i in 1..3) {
delay(1000)
emit(i) // 发射,产生一个元素
}
}.flowOn(Dispatchers.Default)
fun main() {
runBlocking {
// Flow构建方式1
simpleFlow().collect { value -> println(value) } // 收集元素
// Flow构建方式2
(1..5).asFlow().filter {
println("当前线程-filter:" + Thread.currentThread().name)
it % 2 == 0
}.map {
println("当前线程-map:" + Thread.currentThread().name)
}.onEach {
delay(1000)
}.flowOn(Dispatchers.Default).collect {
println("当前线程-collect:" + Thread.currentThread().name)
println("Collect $it")
}
// Flow构建方式3
flowOf("one", "two", "three").flowOn(Dispatchers.Default).onEach { delay(1000) }.collect { values ->
println(values)
}
}
}
(3)启动流
启动流:launchIn传入协程作用域形参,使用launchIn替换collect我们可以在指定协程中启动流的收集
(1..5).asFlow().onEach {
delay(1000)
}.flowOn(Dispatchers.Default).launchIn(CoroutineScope(Dispatchers.IO)).join()
(1..5).asFlow().onEach {
delay(1000)
}.flowOn(Dispatchers.Default).launchIn(this).join()
(4)流的取消
使用 withTimeoutOrNull 方式取消:
suspend fun simpleFlow() = flow<Int> {
for (i in 1..3) {
delay(1000)
emit(i) // 发射,产生一个元素
}
}.flowOn(Dispatchers.Default)
fun main() {
runBlocking {
withTimeoutOrNull(2000) {
// Flow构建方式1
simpleFlow().collect { value -> println(value) } // 收集元素
}
withTimeoutOrNull(2000) {
(1..5).asFlow().onEach {
delay(1000)
}.flowOn(Dispatchers.Default).collect {
println("Collect $it")
}
}
withTimeoutOrNull(2000) {
flowOf("one", "two", "three").flowOn(Dispatchers.Default).onEach { delay(1000) }.collect { values ->
println(values)
}
}
withTimeoutOrNull(2000) {
(1..5).asFlow().onEach {
delay(1000)
}.flowOn(Dispatchers.Default).launchIn(CoroutineScope(Dispatchers.IO)).join()
}
println("Done...")
}
}
另外,启动流还可以调用 cancelAndJoin 取消。
val job = (1..5).asFlow().onEach {
delay(1000)
}.flowOn(Dispatchers.Default).launchIn(CoroutineScope(Dispatchers.IO))
delay(1000)
job.cancelAndJoin()
(5)流的取消检测
为方便起见,流构建器对每个发射值执行附加的ensureActive 检测以进行取消,这意味着从 flow{...} 发出的繁忙循环是可以取消的。
出于性能原因,大多数其他流操作不会自行执行其他取消检测,在协程处于繁忙循环的情况下,必须明确检测是否取消。
通过cancellable操作符来执行此操作。
suspend fun simpleFlow() = flow<Int> {
for (i in 1..5) {
delay(1000)
emit(i) // emit自带检测是否取消的能力
}
}.flowOn(Dispatchers.Default)
fun main() {
runBlocking {
// emit 自带检测是否取消的能力
simpleFlow().collect { value ->
if (value == 3) cancel()
}
// 如果没有emit,需要使用 cancellable
(1..5).asFlow().cancellable().onEach {
delay(1000)
}.flowOn(Dispatchers.Default).collect { value ->
if (value == 3) cancel()
}
}
}
(6)背压
背压:水流受到与流动方向一致的压力。
生产者、消费者模式,只要生产效率 > 消费效率,那么就会产生背压。
处理背压的方式有:
不改变执行上下文。suspend fun simpleFlow() = flow<Int> {
for (i in 1..50) {
println("发送数据:$i")
delay(100)
emit(i)
}
}
fun main() {
runBlocking {
val time = measureTimeMillis {
simpleFlow()
.collect { value ->
delay(300)
println("接收数据:$value")
}
}
println("耗时:$time")
}
}
以上代码,发送数据和接收数据都是在同一个线程中并行执行,如果存在耗时程序,将特别影响效率。
为了增加执行效率,可以使用 buffer 设置缓存大小,从而起到加快执行速率的效果。
val time = measureTimeMillis {
// 背压
simpleFlow()
.buffer(10)
.collect { value ->
delay(300)
println("接收数据:$value")
}
}
但是,从生产者/消费者的设计思想的角度上考虑,发送数据最好放在子线程。
val time = measureTimeMillis {
// 背压
simpleFlow()
.flowOn(Dispatchers.Default)
.collect { value ->
delay(300)
println("接收数据:$value")
}
}
使用 flowOn 可以指定 Flow 的协程作用域,这样可以将 并行 转成 并发,从而加快执行效率。
runBlocking {
val time = measureTimeMillis {
// 背压
simpleFlow()
.conflate()
.collect { value ->
delay(300)
println("接收数据==:$value")
}
}
println("耗时:$time")
}
以上代码使用 conflate,中间一些元素不会处理,从而加快执行效率。
val time = measureTimeMillis {
// 背压
simpleFlow()
.collectLatest { value ->
delay(300)
println("接收数据==:$value")
}
以上代码将 collect 改成 collectLatest 之后,只会处理最后一个值,从而加速执行速度。
(7)转换操作符
使用map转换:
suspend fun simpleFlow() = flow<Int> {
for (i in 1..3) {
println(i)
emit(i)
}
}
fun main() {
runBlocking {
simpleFlow()
.map { value ->
"response $value"
}
.collect { value ->
println(value)
}
}
}
使用transform转换,可以转换成任意次、任意值的Flow:
suspend fun simpleFlow() = flow<Int> {
for (i in 1..3) {
println(i)
emit(i)
}
}
fun main() {
runBlocking {
simpleFlow()
.transform { request ->
emit("request $request")
emit("request $request")
}
.collect { value ->
println(value)
}
}
}
(8)限长操作符
take 是限长操作符,可以限制处理的数量:
suspend fun simpleFlow() = flow<Int> {
for (i in 1..3) {
println(i)
emit(i)
}
}
fun main() {
runBlocking {
simpleFlow()
.take(2)
.collect { value ->
println(value)
}
}
}
(9)末端操作符
末端操作符是在流上用于 启动流收集的挂起函数。collect是最基础的末端操作符,但是还有另外一些更加方便使用的末端操作符:
fun main() {
runBlocking {
val sum = simpleFlow()
.reduce { a, b ->
a + b
}
println(sum)
}
}
reduce 操作符可以将元素累加。
reduce的返回值类型必须和集合的元素类型相符。
suspend fun simpleFlow() = flow<Int> {
for (i in 1..3) {
emit(i)
}
}
fun main() {
runBlocking {
val newStr = simpleFlow()
.fold(StringBuilder()) { str: StringBuilder, a: Int ->
str.append(a).append(" ")
}
println(newStr)
}
}
而fold的返回值类型则不受约束。
(10)组合操作符
zip 操作符将两个流合并。
runBlocking {
val nums1 = (1..3).asFlow()
val nums2 = flowOf("one", "two", "three")
nums1.zip(nums2) {a, b ->
"$a $b"
}.collect {value->
println(value)
}
}
(11)展平操作符
流表示异步接收的值序列,所以很容易遇到这种情况:每个值都会触发对另一个值序列的请求,然而,由于流具有异步的性质,因此需要不同的展平模式,为此,存在一系列的流展平操作符:
suspend fun requestFlow(i: Int) = flow<String> {
emit("request $i first")
delay(500)
emit("request $i second")
}
fun main() {
runBlocking {
val startTime = System.currentTimeMillis()
(1..3).asFlow()
.onEach { delay(100) }
.flatMapConcat {
requestFlow(it) // Flow的元素是Flow
}
.collect { value->
println("$value -- ${System.currentTimeMillis() - startTime}")
}
}
}
代码中 flatMapConcat 可以换成 flatMapMerge 或者 flatMapLatest。
三者的执行结果是:
flatMapConcat :(requestFlow全部执行完)
request 1 first -- 198
request 1 second -- 701
request 2 first -- 815
request 2 second -- 1319
request 3 first -- 1428
request 3 second -- 1932
flatMapMerge:(不需要等待requestFlow全部执行完)
request 1 first -- 281
request 2 first -- 361
request 3 first -- 470
request 1 second -- 798
request 2 second -- 876
request 3 second -- 985
flatMapLatest:
request 1 first -- 250
request 2 first -- 376
request 3 first -- 485
request 3 second -- 1001
(12)流的异常处理
suspend fun requestFlow() = flow<Int> {
for (i in 1..3) {
emit(i)
throw RuntimeException("exception")
}
}.catch {e: Throwable ->
println("上游异常捕获:" + e.message)
}
fun main() {
runBlocking {
try {
requestFlow()
.collect { value->
check(value < 2) // 检查异常
println(value)
}
} catch (e: Throwable) {
println("下游异常捕获:" + e.message)
}
}
}
check:检查异常,一旦检查到异常,程序crash。
下游通过 try...catch 捕获异常,上游Flow自带 catch 函数。
(13)流的完成
收集完成时,使用 finally,表示收集完成。
suspend fun requestFlow() = flow<Int> {
for (i in 1..3) {
emit(i)
}
}
fun main() {
runBlocking {
try {
requestFlow().collect { value-> println(value) }
} finally {
println("...完成...")
}
}
}
使用 onCompletion 也可以表示完成:
suspend fun requestFlow() = flow<Int> {
for (i in 1..3) {
emit(i)
throw RuntimeException("exception")
}
}.catch {exception->
println("catch -> exception:" + exception.message)
}
fun main() {
runBlocking {
requestFlow()
.onCompletion {exception ->
if (exception != null) { // 异常导致完成
println("finish -> exception:" + exception.message)
} else { // 正常结束
println("正常结束")
}
}
.collect { value-> println(value) }
}
}
onCompletion 可以拿到异常信息,但是不能捕获异常。
(13)Flow实现多路复用
多数情况下,我们可以通过构造合适的Flow来实现多路复用的效果。
data class User(val name: String)
data class Response<T>(val value: T, val isLocal: Boolean)
suspend fun CoroutineScope.getUserForLocal(name: String) = async {
delay(1000)
User(name)
}
suspend fun CoroutineScope.getUserFromRemote(name: String) = async {
delay(100)
User(name)
}
fun main() {
runBlocking {
val name = "guest"
// 两个函数
listOf(::getUserForLocal, ::getUserFromRemote)
.map { function->
function.call(name)
}
.map { deferred ->
flow { emit(deferred.await()) }
}.merge().collect { user -> println(user) }
}
}
以上代码用到了反射,需要引入依赖:
implementation 'org.jetbrains.kotlin:kotlin-reflect:1.0.6'
[完...]
exe应该在我打开页面时运行。异步进程需要运行。有什么方法可以在ruby中使用两个参数异步运行exe吗?我已经尝试过ruby命令-system()、exec()但它正在等待过程完成。我需要用参数启动exe,无需等待进程完成是否有任何rubygems会支持我的问题? 最佳答案 您可以使用Process.spawn和Process.wait2:pid=Process.spawn'your.exe','--option'#Later...pid,status=Process.wait2pid您的程序将作为解释器的子进程执行。除
在我做的一些网络开发中,我有多个操作开始,比如对外部API的GET请求,我希望它们同时开始,因为一个不依赖另一个的结果。我希望事情能够在后台运行。我找到了concurrent-rubylibrary这似乎运作良好。通过将其混合到您创建的类中,该类的方法具有在后台线程上运行的异步版本。这导致我编写如下代码,其中FirstAsyncWorker和SecondAsyncWorker是我编写的类,我在其中混合了Concurrent::Async模块,并编写了一个名为“work”的方法来发送HTTP请求:defindexop1_result=FirstAsyncWorker.new.async.
我们开始使用Ruby开发新游戏项目。我们决定使用其中一种异步Ruby服务器,但我们无法决定选择哪一种。选项是:歌利亚抽筋+消瘦/彩虹rack-fiber_pool+rack+thin/rainbowseventmachine_httpserver它们似乎都在处理HTTP请求。Cramp还支持开箱即用的Websocket和服务器端事件。您知道这些服务器的优缺点吗? 最佳答案 我使用eventmachine_httpserver公开了一个RESTfulAPIinanEventMachine-basedIRCbot绝对不会推荐它用于任何严
我一直在研究ruby的并行/异步处理能力,并阅读了许多文章和博客文章。我查看了EventMachine、Fibers、Revactor、Reia等。不幸的是,我无法为这个非常简单的用例找到简单、有效(且非IO阻塞)的解决方案:File.open('somelogfile.txt')do|file|whileline=file.gets#(R)ReadfromIOline=process_line(line)#(P)Processthelinewrite_to_db(line)#(W)WritetheoutputtosomeIO(DBorfile)endend你看到了吗,我的小脚本正
我使用RubyEventMachines已经有一段时间了,我想我已经了解它的基础知识了。但是,我不确定如何高效地读取大文件(120MB)。我的目标是逐行读取文件并将每一行写入Cassandra数据库(对于MySQL、PostgreSQL、MongoDB等也应该如此,因为Cassandra客户端明确支持EM)。这个简单的片段会阻塞react器,对吗?require'rubygems'require'cassandra'require'thrift_client/event_machine'EM.rundoFiber.newdorm=Cassandra.new('RankMetrics',
目录FIFO一.自定义同步FIFO1.1代码设计1.2Testbech1.3行为仿真***学习位宽计算函数$clog2()***$clog2()系统函数使用,可以不关注***分布式资源或者BLOCKBRAM二.异步FIFO2.1在FIFO判满的时候有两种方式:2.2异步FIFO为什么要使用格雷码2.2.1介绍格雷码2.2.2格雷码在异步FIFO中的应用2.2.2格雷码判满2.4二进制与格雷码之间的转换2.4.1二进制码转换为格雷码的方法2.4.2格雷码转换为二进制码的方法2.3实现框图2.5实现及仿真代码2.6仿真图验证2.7结论FIFO 这篇更多的是记录FIFO学习,参考了众多优秀的文章,
我在网络上阅读了大量关于不同版本的ruby和rails的线程安全和性能的资料,我想我现在已经很好地理解了这些内容。讨论中似乎奇怪地遗漏了如何实际部署异步Rails应用程序。当谈到应用程序中的线程和同步性时,人们希望优化两件事:以最少的RAM使用率利用所有CPU内核能够在之前的请求等待IO时处理新请求第1点是人们(正确地)对JRuby感到兴奋的地方。对于这个问题,我只是想优化第2点。假设这是我应用中唯一的Controller:classTheController"hello"enddefslowrender:text=>User.count.to_sendendfast没有IO,每秒
我正在使用Goliath(由eventmachine提供支持)和postgresgempg,目前我以阻塞方式使用pggem:conn.exec('SELECT*FROMproducts')(例如)我想知道是否有更好的方法连接到postgres数据库? 最佳答案 pg库提供对PostgreSQL异步API的全面支持。我添加了anexample如何使用它到samples/目录:#!/usr/bin/envrubyrequire'pg'#ThisisaexampleofhowtousetheasynchronousAPItoqueryth
乍一看,我以为新的ruby2.0Thread.handle_interrupt会解决我所有的异步中断问题,但除非我弄错了,否则我无法让它做我想做的事(我的问题在最后和标题中)。从文档中,我可以看到如何避免在某个block中接收中断,将它们推迟到另一个block。这是一个示例程序:duration=ARGV.shift.to_it=Thread.newdoThread.handle_interrupt(RuntimeError=>:never)do5.times{putc'-';sleep1}Thread.handle_interrupt(RuntimeError=>:immedia
我在C#/.Net中做了很多开发,异步故事从第一天起就一直存在(诚然,多年来API从开始/结束到事件发生了显着变化,到Task和async/await).在过去一年左右的时间里,我一直在使用Node.js进行开发,它异步执行所有I/O并使用单线程事件循环模型。最近我在做一个我们使用Ruby的项目,对于应用程序的一部分,我觉得异步地发出一大堆Web请求是有意义的,并且惊讶地发现Ruby中的异步故事是巨大的不同的。执行任何异步I/O的唯一方法是使用EventMachine。我的问题归结为:为什么在.Net中(据我所知,Java/JVM也是如此)不需要事件循环,而且我可以触发异步请求在任何时