jjzjj

Kotlin Flow 背压和线程切换竟然如此相似

我爱田Hebe 2023-09-21 原文

前言

上篇分析了Kotlin Flow原理,大部分操作符实现比较简单,相较而言背压和线程切换比较复杂,遗憾的是,纵观网上大部分文章,关于Flow背压和协程切换这块的原理说得比较少,语焉不详,鉴于此,本篇重点分析两者的原理及使用。
通过本篇文章,你将了解到:

  1. 什么是背压?
  2. 如何处理背压?
  3. Flow buffer的原理
  4. Flow 线程切换的使用
  5. Flow 线程切换的原理

1. 什么是背压?

先看自然界的水流:

为了充分利用水资源,人类建立了大坝,以大坝为分界点将水流分为上游和下游。

当上游的流速大于下游的流速,日积月累,最终导致大坝溢出,此种现象称为背压的出现

而对于Kotlin里的Flow,也有上游(生产者)、下游(消费者)的概念,如:

    suspend fun testBuffer1() {
        var flow = flow {
            //生产者
            (1..3).forEach {
                println("emit $it")
                emit(it)
            }
        }

        flow.collect {
            //消费者
            println("collect:$it")
        }
    }

通过collect操作符触发了流,从生产者生产数据(flow闭包),到消费者接收并处理数据(collect闭包),这就完成了流从上游到下游的一次流动过程。

2. 如何处理背压?

模拟一个生产者消费者速度不一致的场景:

    suspend fun testBuffer3() {
        var flow = flow {
            (1..3).forEach {
                delay(1000)
                println("emit $it")
                emit(it)
            }
        }

        var time = measureTimeMillis {
            flow.collect {
                delay(2000)
                println("collect:$it")
            }
        }
        println("use time:${time} ms")
    }

计算流从生产到消费的整个时间:

生产者的速度比消费者的速度快,而它俩都是在同一个线程里顺序执行的,生产者必须等待消费者消费完毕后才会进行下一次生产。
因此,整个流的耗时=生产者耗时(3 * 1000ms)+消费者耗时(3 * 2000ms)=9s。

显而易见,消费者影响了生产者的速度,这种情况下该怎么优化呢?
最简单的解决方案:

生产者和消费者分别在不同的线程执行

如:

    suspend fun testBuffer4() {
        var flow = flow {
            (1..3).forEach {
                delay(1000)
                println("emit $it in thread:${Thread.currentThread()}")
                emit(it)
            }
        }.flowOn(Dispatchers.IO)

        var time = measureTimeMillis {
            flow.collect {
                delay(2000)
                println("collect:$it in thread:${Thread.currentThread()}")
            }
        }
        println("use time:${time} ms")
    }

添加了flowOn()函数,它的存在使得它前面的代码在指定的线程里执行,如flow闭包了的代码都在IO线程执行,也就是生产者在IO线程执行。
而消费者在当前线程执行,因此两者无需相互等待,节省了总时间:

确实是减少了时间,提升了效率。但我们知道开启线程代价还是挺大的,既然都在协程里运行了,能否借助协程的特性:协程挂起不阻塞线程 来完成此事呢?
此时,Buffer出场了,先看看它是如何表演的:

    suspend fun testBuffer5() {
        var flow = flow {
            (1..3).forEach {
                delay(1000)
                println("emit $it in thread:${Thread.currentThread()}")
                emit(it)
            }
        }.buffer(5)

        var time = measureTimeMillis {
            flow.collect {
                delay(2000)
                println("collect:$it in thread:${Thread.currentThread()}")
            }
        }
        println("use time:${time} ms")
    }

这次没有使用flowOn,取而代之的是buffer。
运行结果如下:

可以看出,生产者消费者都是在同一线程执行,但总耗时却和不在同一线程运行时相差无几。
那么它是如何做到的呢?这就得从buffer的源码说起。

3. Flow buffer的原理

无buffer

先看看没有buffer时的耗时:

    suspend fun testBuffer3() {
        var flow = flow {
            (1..3).forEach {
                delay(1000)
                println("emit $it")
                emit(it)
            }
        }

        var time = measureTimeMillis {
            flow.collect {
                delay(2000)
                println("collect:$it")
            }
        }
        println("use time:${time} ms")
    }

从collect开始,依次执行flow闭包,通过emit调用到collect闭包,因为flow闭包里包含了几次emit,因此整个流程会有几次发射。
如上图,从步骤1到步骤8,因为是在同一个线程里,因此是串行执行的,整个流的耗时即为生产者到消费者(步骤1~步骤8)的耗时。

有buffer

在没看源码之前,我们先猜测一下它的流程:

每次emit都发送到buffer里,然后立刻回来继续发送,如此一来生产者没有被消费者的速度拖累。
而消费者会检测Buffer里是否有数据,有则取出来。

根据之前的经验我们知道:collect调用到emit最后到buffer是线性调用的,放入buffer后继续循环emit,那么问题来了:

是谁触发了collect闭包的调用呢?

接下来深入源码,探究答案。

buffer源码流程分析

创建Flow

public fun <T> Flow<T>.buffer(capacity: Int = Channel.BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): Flow<T> {
    var capacity = capacity//buffer容量
    var onBufferOverflow = onBufferOverflow//buffer满之后的处理策略
    if (capacity == Channel.CONFLATED) {
        capacity = 0
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    }
    // create a flow
    return when (this) {
        is FusibleFlow -> fuse(capacity = capacity, onBufferOverflow = onBufferOverflow)
        //走else 分支,构造ChannelFlowOperatorImpl
        else -> ChannelFlowOperatorImpl(this, capacity = capacity, onBufferOverflow = onBufferOverflow)
    }
}

buffer 返回Flow实例,其间涉及几个重要的类和函数:

调用collect
当调用Flow.collect时:

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })

构造了匿名内部类FlowCollector,并实现了emit方法,它的实现为collect的闭包。

调用ChannelFlowOperatorImpl.collect最终会调用ChannelFlow.collect:

    override suspend fun collect(collector: FlowCollector<T>): Unit =
        coroutineScope {
            collector.emitAll(produceImpl(this))
        }

    public open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
        scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun)

produceImpl 创建了Channel,内部开启了协程,返回ReceiveChannel。

再来看emitAll函数:

private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) {
    ensureActive()
    var cause: Throwable? = null
    try {
        while (true) {
            //挂起等待Channel数据
            val result = run { channel.receiveCatching() }
            if (result.isClosed) {
                //Channel关闭后才会退出循环
                result.exceptionOrNull()?.let { throw it }
                break // returns normally when result.closeCause == null
            }
            //发送数据
            emit(result.getOrThrow())
        }
    } catch (e: Throwable) {
        cause = e
        throw e
    } finally {
        if (consume) channel.cancelConsumed(cause)
    }
}

Channel此时并没有数据,因此协程会挂起等待。

Channel发送
Channel什么时候有数据呢?当然是在调用了Channel.send()函数后。
前面提到过collect之后开启了协程:

  public open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
        scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun)

  internal val collectToFun: suspend (ProducerScope<T>) -> Unit
        get() = { collectTo(it) }

  protected override suspend fun collectTo(scope: ProducerScope<T>) =
        flowCollect(SendingCollector(scope))

此时传入的参数为:collectToFun,最后构造了:

public class SendingCollector<T>(
    private val channel: SendChannel<T>
) : FlowCollector<T> {
    override suspend fun emit(value: T): Unit = channel.send(value)
}

当协程得到执行时,会调用collectToFun-->collectTo(it)-->flowCollect(SendingCollector(scope)),最终调用到:

#ChannelFlowOperatorImpl
    override suspend fun flowCollect(collector: FlowCollector<T>) =
        flow.collect(collector)

而该flow为最开始的flow,collector为SendingCollector。
flow.collect后会调用到flow的闭包,进而调用到emit函数:

    private fun emit(uCont: Continuation<Unit>, value: T): Any? {
        val currentContext = uCont.context
        currentContext.ensureActive()
        //...
        completion = uCont
        return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
    }

emitFun本质上会调用collector里的emit函数,而此时的collector即为SendingCollector,最后调用channel.send(value)

如此一来,Channel就将数据发送出去了,此时channel.receiveCatching()被唤醒,接下来执行emit(result.getOrThrow()),这函数最后会流转到最初始的collect的闭包里。
上面的分析即为生产者到消费者的流转过程,单看源码可能比较乱,看图解惑:

红色部分和绿色部分分别为不同的协程,它俩的关联点即是蓝色部分。

Flow buffer的本质上是利用了Channel进行数据的发送和接收

buffer为啥能提升效率

前面分析过无buffer时生产者消费者的流程图,作为对比,我们也将加入buffer后生产者消费者的流程图。

还是以相同的demo,阐述其流程:

  1. 生产者挂起1s,当1s结束后调用emit发射数据,此时数据放入buffer里,生产者调用delay继续挂起
  2. 此时消费者被唤醒,然后挂起 2s等待
  3. 第2s到来之时,生产者调用emit发送数据到buffer里,继续挂起
  4. 第2s到来之时,消费者结束挂起,消费数据,然后继续挂起2s
  5. 第3s到来之时,生产者继续生产数据,而后生产者退出生产
  6. 第5s到来之时,消费者挂起结束,消费数据,然后继续挂起2s
  7. 第7s到来之时,消费者挂起结束,消费结束,此时因为channel里已经没有数据了,退出循环,最终消费者退出

由此可见,总共花费了7s。

ps:协程调度时机不同,打印顺序可能略有差异,但总体耗时不变。

至此,我们找到了buffer能够提高效率的原因:

生产者、消费者运行在不同的协程,挂起操作不阻塞对方

抛出一个比较有意思的问题:以下代码加buffer之后效率会有提升吗?

    suspend fun testBuffer6() {
        var flow = flow {
            (1..3).forEach {
                println("emit $it")
                emit(it)
            }
        }
        var time = measureTimeMillis {
            flow.collect {
                delay(2000)
                println("collect:$it")
            }
        }
        println("use time:${time} ms")
    }

在未实验之前,如果你已经有答案,恭喜你已经弄懂了buffer的本质。

4. Flow 线程切换的使用

    suspend fun testBuffer4() {
        var flow = flow {
            (1..3).forEach {
                delay(1000)
                println("emit $it in thread:${Thread.currentThread()}")
                emit(it)
            }
        }.flowOn(Dispatchers.IO)

        var time = measureTimeMillis {
            flow.collect {
                delay(2000)
                println("collect:$it in thread:${Thread.currentThread()}")
            }
        }
        println("use time:${time} ms")
    }

flowOn(Dispatchers.IO)表示其之前的操作符(函数)都在IO线程执行,如这里的意思是flow闭包里的代码在IO线程执行。
而其之后的操作符(函数)在当前的线程执行。
通常用在子线程里获取网络数据(flow闭包),然后再collect闭包里(主线程)更新UI。

5. Flow 线程切换的原理

public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
    checkFlowContext(context)
    return when {
        context == EmptyCoroutineContext -> this
        this is FusibleFlow -> fuse(context = context)
        else -> ChannelFlowOperatorImpl(this, context = context)
    }
}

看到这你可能已经有答案了:这不就和buffer一样的方式吗?
但仔细看,此处多了个上下文:CoroutineContext。
CoroutineContext的作用就是用来决定协程运行在哪个线程。

前面分析的buffer时,我们的协程的作用域是runBlocking,即使生产者、消费者在不同的协程,但是它们始终在同一个线程里执行。
而使用了flowOn指定线程,此时生产者、消费者在不同的线程运行协程。
因此,只要弄懂了buffer原理,flowOn原理自然而然就懂了。

以上为Flow背压和线程切换的全部内容,下篇将分析Flow的热流。
本文基于Kotlin 1.5.3,文中完整Demo请点击

您若喜欢,请点赞、关注、收藏,您的鼓励是我前进的动力

持续更新中,和我一起步步为营系统、深入学习Android/Kotlin

1、Android各种Context的前世今生
2、Android DecorView 必知必会
3、Window/WindowManager 不可不知之事
4、View Measure/Layout/Draw 真明白了
5、Android事件分发全套服务
6、Android invalidate/postInvalidate/requestLayout 彻底厘清
7、Android Window 如何确定大小/onMeasure()多次执行原因
8、Android事件驱动Handler-Message-Looper解析
9、Android 键盘一招搞定
10、Android 各种坐标彻底明了
11、Android Activity/Window/View 的background
12、Android Activity创建到View的显示过
13、Android IPC 系列
14、Android 存储系列
15、Java 并发系列不再疑惑
16、Java 线程池系列
17、Android Jetpack 前置基础系列
18、Android Jetpack 易学易懂系列
19、Kotlin 轻松入门系列
20、Kotlin 协程系列全面解读

作者:小鱼人爱编程
链接:https://juejin.cn/post/7172957388348063780

有关Kotlin Flow 背压和线程切换竟然如此相似的更多相关文章

  1. ruby-on-rails - Ruby on Rails with Haml - 如何从 erb 切换 - 2

    我正在从erb文件切换到HAML。我将hamlgem添加到我的系统中。我创建了app/views/layouts/application.html.haml文件。我应该只删除application.html.erb文件吗?此外,仍然有/public/index.html文件被呈现为默认页面。我想创建自己的默认index.html.haml页面。我应该把它放在哪里以及如何使系统呈现该文件而不是默认索引文件?谢谢! 最佳答案 是的,您可以删除任何已转换为HAML的View的ERB版本。至于你的另一个问题,删除public/index/h

  2. ruby - RuntimeError(自动加载常量 Apps 多线程时检测到循环依赖 - 2

    我收到这个错误:RuntimeError(自动加载常量Apps时检测到循环依赖当我使用多线程时。下面是我的代码。为什么会这样?我尝试多线程的原因是因为我正在编写一个HTML抓取应用程序。对Nokogiri::HTML(open())的调用是一个同步阻塞调用,需要1秒才能返回,我有100,000多个页面要访问,所以我试图运行多个线程来解决这个问题。有更好的方法吗?classToolsController0)app.website=array.join(',')putsapp.websiteelseapp.website="NONE"endapp.saveapps=Apps.order("

  3. ruby - 如何让Ruby捕获线程中的语法错误 - 2

    我正在尝试使用ruby​​编写一个双线程客户端,一个线程从套接字读取数据并将其打印出来,另一个线程读取本地数据并将其发送到远程服务器。我发现的问题是Ruby似乎无法捕获线程内的错误,这是一个示例:#!/usr/bin/rubyThread.new{loop{$stdout.puts"hi"abc.putsefsleep1}}loop{sleep1}显然,如果我在线程外键入abc.putsef,代码将永远不会运行,因为Ruby将报告“undefinedvariableabc”。但是,如果它在一个线程内,则没有错误报告。我的问题是,如何让Ruby捕获这样的错误?或者至少,报告线程中的错误?

  4. ruby - 如何在 ruby​​ 中运行后台线程? - 2

    我是ruby​​的新手,我认为重新构建一个我用C#编写的简单聊天程序是个好主意。我正在使用Ruby2.0.0MRI(Matz的Ruby实现)。问题是我想在服务器运行时为简单的服务器命令提供I/O。这是从示例中获取的服务器。我添加了使用gets()获取输入的命令方法。我希望此方法在后台作为线程运行,但该线程正在阻塞另一个线程。require'socket'#Getsocketsfromstdlibserver=TCPServer.open(2000)#Sockettolistenonport2000defcommandsx=1whilex==1exitProgram=gets.chomp

  5. ruby-on-rails - 需要帮助最大化多个相似对象中的 3 个因素并适当排序 - 2

    我需要用任何语言编写一个算法,根据3个因素对数组进行排序。我以度假村为例(如Hipmunk)。假设我想去度假。我想要最便宜的地方、最好的评论和最多的景点。但是,显然我找不到在所有3个中都排名第一的方法。Example(assumingthereare20importantattractions):ResortA:$150/night...98/100infavorablereviews...18of20attractionsResortB:$99/night...85/100infavorablereviews...12of20attractionsResortC:$120/night

  6. ruby - Rails 开发服务器、PDFKit 和多线程 - 2

    我有一个使用PDFKit呈现网页的pdf版本的Rails应用程序。我使用Thin作为开发服务器。问题是当我处于开发模式时。当我使用“bundleexecrailss”启动我的服务器并尝试呈现任何PDF时,整个过程会陷入僵局,因为当您呈现PDF时,会向服务器请求一些额外的资源,如图像和css,看起来只有一个线程.如何配置Rails开发服务器以运行多个工作线程?非常感谢。 最佳答案 我找到的最简单的解决方案是unicorn.geminstallunicorn创建一个unicorn.conf:worker_processes3然后使用它:

  7. ruby - Ruby 1.9.1 中的 native 线程,对我有什么好处? - 2

    所以,Ruby1.9.1现在是declaredstable.Rails应该与它一起工作,并且正在慢慢地将gem移植到它。它具有native线程和全局解释器锁(GIL)。自从GIL到位后,原生线程是否比1.9.1中的绿色线程有任何优势? 最佳答案 1.9中的线程是原生的,但它们被“放慢了速度”,一次只允许一个线程运行。这是因为如果线程真的并行运行,它会混淆现有代码。优点:IO现在在线程中是异步的。如果一个线程阻塞在IO上,那么另一个线程将继续执行直到IO完成。C扩展可以使用真正的线程。缺点:任何非线程安全的C扩展都可能存在使用Thre

  8. ruby - 使写入文件线程安全 - 2

    我在一个ruby​​文件中有一个函数可以像这样写入一个文件File.open("myfile",'a'){|f|f.puts("#{sometext}")}这个函数在不同的线程中被调用,使得像上面这样的文件写入不是线程安全的。有谁知道如何以最简单的方式使这个文件写入线程安全?更多信息:如果重要的话,我正在使用rspec框架。 最佳答案 您可以通过File#flock给锁File.open("myfile",'a'){|f|f.flock(File::LOCK_EX)f.puts("#{sometext}")}

  9. java - java和ruby的主要区别和相似之处是什么? - 2

    关闭。这个问题需要更多focused.它目前不接受答案。想改进这个问题吗?更新问题,使其只关注一个问题editingthispost.关闭9年前。Improvethisquestion我现在是java专业人士,我喜欢使用ruby​​。这两种语言有什么相似之处吗?主要区别是什么?因为两者都是面向对象的。

  10. Ruby 线程与 Watir - 2

    我编写了几个类来控制我想如何处理多个网站,两者都使用类似的方法(即登录、刷新)。每个类都打开自己的WATIR浏览器实例。classSite1definitialize@ie=Watir::Browser.newenddeflogin@ie.goto"www.blah.com"endend无线程的main中的代码示例如下require'watir'require_relative'site1'agents=[]agents这工作正常,但在当前代理完成登录之前不会移动到下一个代理。我想合并多线程来处理这个问题,但似乎无法让它工作。require'watir'require_relative

随机推荐