Kotlin 中的协程 flow

2023-09-14 15:49:08

一、Flow概述

Flow 具有异步挂起 suspend 响应式编程,可以使用挂起函数来异步生产和消费事件,Flow 的设计灵感也来源于响应式流以及其各种实现。

二、Flow 的生产和消费

suspend fun test1() {
    flow<Int> {
        (0..4).forEach {
            emit(it)//生产者发送数据
        }
    }.collect {
        println(it)
    }
}

flow {} 函数创建了一个冷数据流 Flow ,通过 emit 来发射数据,然后通过 collect 函数来收集这些数据。但是因为 collect 是挂起函数,挂起函数的调用又必须在另一个挂起函数或者协程作用域中。此时就需要我们使用协程来执行。

fun main() {
    runBlocking {
        test1()
    }
}

三、Flow线程切换:FlowOn

     findViewById<TextView>(R.id.textView).setOnClickListener() {
       lifecycleScope.launch {
         flow1()
       }
    }

    private suspend fun flow1() {
        flow<Int> {
            (0..4).forEach {
                Log.i("TAG", "flow:${currentCoroutineContext()}")
                emit(it)//生产者发送数据
            }
        }.collect {
            Log.i("TAG", "collect:${currentCoroutineContext()} it:$it")
        }
    }
lifecycleScope.launch 默认是主线程执行的,按照协程的执行原理,我们可以确定上面例子中所有的执行操作都是在主线程上:

flow:[StandaloneCoroutine{Active}@9064722, Dispatchers.Main.immediate]
collect:[StandaloneCoroutine{Active}@9064722, Dispatchers.Main.immediate] it:0
flow:[StandaloneCoroutine{Active}@9064722, Dispatchers.Main.immediate]
collect:[StandaloneCoroutine{Active}@9064722, Dispatchers.Main.immediate] it:1
flow:[StandaloneCoroutine{Active}@9064722, Dispatchers.Main.immediate]
collect:[StandaloneCoroutine{Active}@9064722, Dispatchers.Main.immediate] it:2
flow:[StandaloneCoroutine{Active}@9064722, Dispatchers.Main.immediate]
collect:[StandaloneCoroutine{Active}@9064722, Dispatchers.Main.immediate] it:3
flow:[StandaloneCoroutine{Active}@9064722, Dispatchers.Main.immediate]
collect:[StandaloneCoroutine{Active}@9064722, Dispatchers.Main.immediate] it:4

当我们调用 flowOn切换线程时

    private suspend fun flow1() {
        flow<Int> {
            (0..2).forEach {
                Log.i("TAG", "flow:${currentCoroutineContext()}")
                emit(it)//生产者发送数据
            }
        }.flowOn(Dispatchers.IO)
            .collect {
                Log.i("TAG", "collect:${currentCoroutineContext()} it:$it")
            }
    }

可以看到 flow 代码块中的执行已经切换到另外一个线程执行。但是 collect 中的代码依然执行在主线程上。

输出: 

flow:[ProducerCoroutine{Active}@9064722, Dispatchers.IO]
flow:[ProducerCoroutine{Active}@9064722, Dispatchers.IO]
collect:[ScopeCoroutine{Active}@7a4d5b3, Dispatchers.Main.immediate] it:0
collect:[ScopeCoroutine{Active}@7a4d5b3, Dispatchers.Main.immediate] it:1
collect:[ScopeCoroutine{Active}@7a4d5b3, Dispatchers.Main.immediate] it:2

增加map,看看

    private suspend fun flow1() {
        flow<Int> {
            (0..2).forEach {
                Log.i("TAG", "flow:${currentCoroutineContext()}")
                emit(it)//生产者发送数据
            }
        }.flowOn(Dispatchers.IO)
            .map {
                Log.i("TAG", "map:${currentCoroutineContext()}")
                it
            }
            .collect {
                Log.i("TAG", "collect:${currentCoroutineContext()} it:$it")
            }
    }

输出:

flow:[ProducerCoroutine{Active}@9064722, Dispatchers.IO]
flow:[ProducerCoroutine{Active}@9064722, Dispatchers.IO]
map:[ScopeCoroutine{Active}@7a4d5b3, Dispatchers.Main.immediate]
collect:[ScopeCoroutine{Active}@7a4d5b3, Dispatchers.Main.immediate] it:0
map:[ScopeCoroutine{Active}@7a4d5b3, Dispatchers.Main.immediate]
collect:[ScopeCoroutine{Active}@7a4d5b3, Dispatchers.Main.immediate] it:1
map:[ScopeCoroutine{Active}@7a4d5b3, Dispatchers.Main.immediate]
collect:[ScopeCoroutine{Active}@7a4d5b3, Dispatchers.Main.immediate] it:2

 

总结:

1、flowOn 可以将执行此流的上下文更改为指定的上下文。

2、flowOn可以进行组合使用。

3、flowOn只影响前面没有自己上下文的操作符。已经有上下文的操作符不受后面 flowOn影响。

4、不管 flowOn 如何切换线程, collect 始终是运行在调用它的协程调度器上。

四、操作符

1、过度操作符/流程操作符:onStart -> onEach -> onCompletion

1))onStart: 在上游流启动之前被调用

2)onEach:在上游流的每个值被下游发出之前调用。

3)onCompletion:在流程完成或者取消后调用,并将取消异常或失败作为操作的原因参数传递。

    private suspend fun flow2(){
        flow {
            Log.d("TAG","flow")
            emit(1)
        }.onStart {
            Log.d("TAG","onStart")
        }.onEach {
            Log.d("TAG","onEach")
        }.onCompletion {
            Log.d("TAG","onCompletion")
        }.collect {
            Log.d("TAG","collect")
        }
    }

输出:

onStart
flow
onEach
collect
onCompletion

2、异常操作符

1)catch

    private suspend fun flow3() {
        flow {
            Log.d("TAG", "flow")
            emit(1)
            throw NullPointerException("空指针")
        }.onStart {
            Log.d("TAG", "onStart")
        }.onEach {
            Log.d("TAG", "onEach")
        }.catch {
            Log.e("TAG", "catch $it")
        }.onCompletion {
            Log.d("TAG", "onCompletion")
        }.collect {
            Log.d("TAG", "collect")
        }
    }

输出:

onStart
flow
onEach
collect

onCompletion
catch java.lang.NullPointerException: 空指针 

2)retry

 

    private fun flow13() {
        var index = 0
        lifecycleScope.launch {
            flow {
                if (index < 2) {
                    index++
                    Log.e("TAG", "出现错误:$index")
                    throw RuntimeException("runtime exception index $index")
                }
                emit(100)
            }.retry(2).catch {
                Log.e("TAG", "catch: $it")
            }.collect {
                Log.d("TAG", "collect $it")
            }
        }
    }

输出

出现错误:1
出现错误:2
collect 100

3)retryWhen

    private fun flow14() {
        var index = 0
        lifecycleScope.launch {
            flow {
                if (index < 2) {
                    index++
                    Log.e("TAG", "出现错误:$index")
                    throw RuntimeException("runtime exception index $index")
                }
                emit(100)
            }.retryWhen{ cause, attempt ->
                Log.e("TAG","cause is $cause,attempt is $attempt")
                cause is RuntimeException
            }.catch {
                Log.e("TAG", "catch: $it")
            }.collect {
                Log.d("TAG", "collect $it")
            }
        }
    }

 出现错误:1
cause is java.lang.RuntimeException: runtime exception index 1,attempt is 0
出现错误:2
cause is java.lang.RuntimeException: runtime exception index 2,attempt is 1
collect 100

3、转换操作符

1)transform

    private fun flow4() {
        lifecycleScope.launch {
            (1..3).asFlow().transform {
                emit(it)
                emit("transform $it")
            }.collect {
                println("collect: $it")
            }
        }
    }

transfrom 操作符任意值任意此,其他转换操作符都是基于 transform 进行扩展。比如:可以在执行长时间运行的异步请求之前,发射一个字符串并跟踪这个响应。

输出:

 collect: 1
collect: transform 1
collect: 2
collect: transform 2
collect: 3
collect: transform 3

2)map  

数据转换操作符

    private fun flow5() {
        lifecycleScope.launch {
            flow {
                emit(1)
            }.map {
                Log.d("TAG", "第一次转换")
                it * 5
            }.map {
                Log.d("TAG", "第二次转换")
                "map $it"
            }.collect {
                Log.d("TAG", "最终转换后值:$it")
            }
        }
    }

 输出:

第一次转换
第二次转换
最终转换后值:map 5

3)fliter

fliter 操作符主要是对数据进行一个过滤,返回仅包含与给定匹配的原始流的值的流。

fliter 还有很多同类型操作符,如:filterNot / filterIsInstance / filterNotNull

    private fun flow6() {
        lifecycleScope.launch {
            (1..3).asFlow().filter {
                it < 2
            }.collect {
                println("it:$it")
            }
        }
    }

输出:

 it:1

4)zip

zip 操作符用于组合两个流中的相关值,与 RxJava 中的 zip 功能一样;

    private fun flow7() {
        val flow1 = (1..3).asFlow()
        val flow2 = flowOf("one", "two", "three")
        lifecycleScope.launch {
            flow2.zip(flow1) { value1, value2 ->
                "$value1:$value2"
            }.collect {
                Log.d("TAG", "collect:$it")
            }
        }
    }

输出:

collect:one:1
collect:two:2
collect:three:3

4、限制操作符

1)take 

take 操作符返回包含第一个计数元素的流,当发射次数大于等于 count 的值时,通过抛出异常来取消执行。

    private fun flow8() {
        lifecycleScope.launch {
            (1..3).asFlow().take(2)
                .collect {
                    Log.d("TAG", "it:$it")
                }
        }
    }

 输出:

it:1
it:2

2)takeWhile

takeWhile 操作符与 filter 类似,不过他是当遇到条件判断为 false 的时候,将会中断后续的操作。

    private fun flow9() {
        lifecycleScope.launch {
            flowOf(1, 1, 2, 3, 1, 4).map {
                delay(100)
                it
            }.takeWhile {
                it == 1
            }.collect {
                Log.d("TAG", "it:$it")
            }
        }
    }

输出:

it:1
it:1 

3)drop

drop 操作符与 take 相反,它是丢弃掉指定的 count 数量后执行后续的流。

    private fun flow10() {
        lifecycleScope.launch {
            (1..3).asFlow().drop(2)
                .collect {
                    Log.d("TAG", "it:$it")
                }
        }
    }

 输出:

it:3

5、末端流操作符

 collect 是最基础的末端操作符,基本上每一个例子当中我们都是使用 collect。

1)toList

toList 操作符是将我们的流转换成一个List集合

    private fun flow11() {
        lifecycleScope.launch {
            val list = (1..5).asFlow().toList()
            Log.d("TAG", "toList:$list")
        }
    }

 输出:

toList:[1, 2, 3, 4, 5]

6、Flow的缓冲

    private fun flow12() {
        lifecycleScope.launch {
            val time = measureTimeMillis {
                (1..3).asFlow().map {
                    delay(100)
                    it
                }.buffer().collect {
                    delay(300)
                    Log.d("TAG", "it:$it")
                }
            }
            Log.d("TAG","collected in $time ms")
        }
    }

输出: 

it:1
it:2
it:3
collected in 1060 ms 

参考:

Kotlin协程之Flow使用 - 掘金 

更多推荐

【送书】实现可观测性平台的技术要点是什么?

文章目录实现可观测性平台的技术要点是什么?兼容全域信号量所谓全域信号量有哪些?统一采集和上传工具统一的存储后台自由探索和综合使用数据总结实现可观测性平台的技术要点是什么?随着可观测性理念的深入人心,可观测性平台已经开始进入了落地阶段,它的先进性已经毋庸置疑;而另外一只靴子:它如何以一个统一融合的平台在企业中生根发芽?可

5个免费的3D钣金CAD软件

作为一名咨询顾问,我一直在寻找能够满足大客户需求的最佳CAD软件。但我知道并不是每个人都在寻找劳斯莱斯式的钣金设计解决方案。有时你只想要一些简单的东西,而且最好是免费的。例如,如果你正在设计简单的折叠钣金零件,则只需设计一些具有圆角半径的法兰:一个简单的钣金模块。推荐:用NSDT编辑器快速搭建可编程3D场景首先,你可以

CNC 3D浮雕 Aspire 11.55 Crack

Aspire提供了功能强大且直观的软件解决方案,用于在CNC铣床上创建和切割零件。有用于2D设计和计算2D刀具路径的工具,例如仿形、型腔加工和钻孔以及2.5D刀具路径,包括:V形雕刻、棱镜雕刻、成型刀具路径、凹槽、倒角刀具路径,甚至装饰纹理策略。Aspire成为我们的高级套件的原因在于添加了3D设计工具,使您能够创建自

西门子 S7 协议解析

目录1建立连接2读数据3写数据1建立连接0300001611E00000000100C1021000C2020301C0010A(第一次握手报文)0300报文头0016数据总长度:2211E00000000100C1021000C2020301C0010A报文结束0300001611D00001001100C0010A

shell脚本相关基础操作汇总

汇总起来,备忘查看。目录1,脚本的参数传递与接收2,判断是否有传入的参数、获得参数个数3,获取传入的全部参数、遍历全部参数4,获取当前脚本对应的进程pid5,判断目录、文件等是否存在6,判断变量是否为空7,判断上一个命令是否执行正常8,获取命令的结果9,设置/删除全局环境变量设置环境变量将命令执行结果设置为环境变量设置

积木报表 JimuReport v1.6.2-GA5版本发布—高危SQL漏洞安全加固版本

项目介绍一款免费的数据可视化报表,含报表和大屏设计,像搭建积木一样在线设计报表!功能涵盖,数据报表、打印设计、图表报表、大屏设计等!Web版报表设计器,类似于excel操作风格,通过拖拽完成报表设计。秉承“简单、易用、专业”的产品理念,极大的降低报表开发难度、缩短开发周期、节省成本、解决各类报表难题。领先的企业级Web

乐鑫科技全球首批支持蓝牙 Mesh Protocol 1.1 协议

乐鑫科技(688018.SH)非常高兴地宣布,其自研的蓝牙Mesh协议栈ESP-BLE-MESH现已支持最新蓝牙MeshProtocol1.1协议的全部功能,成为全球首批在蓝牙技术联盟(BluetoothSIG)正式发布该协议之前支持该更新的公司之一。这意味着乐鑫在低功耗蓝牙无线通信领域潜心专研产品和方案,其技术实力和

BLE Mesh蓝牙mesh传输大数据包传输文件照片等大数据量通讯

1、BLEMesh数据传输现状BLEMesh网络技术是低功耗蓝牙的一个进阶版,Mesh扩大了蓝牙在应用中的规模和范围,因为它同时支持超过三万个网络节点,可以跨越大型建筑物,不仅可以使得医疗健康应用更加方便快捷,还能监测像学校这类的大型公共场所随时监测学生的安全状况。BLEMesh覆盖范围通过mesh网络的relay(中

接口自动化测试(Python+Requests+Unittest)

(1)接口自动化测试的意义、前后端分离思想接口自动化测试的优缺点:优点:测试复用性。维护成本相对UI自动化低一些。为什么UI自动化维护成本更高?因为前端页面变化太快,而且UI自动化比较耗时(比如等待页面元素的加载、添加等待时间、定位元素、操作元素、模拟页面动作这些都需要时间)为什么接口自动化维护成本较低?因为接口较稳定

nova相机功能又㕛叒叕升级了!!!拍人像更自然

nova系列手机一直以其高颜值外观和性能体验,持续热销,成为当下年轻人追捧的手机之一。其出色的影像能力,无论是日常生活中的风景拍摄还是人物拍摄,都能够拍摄出非常清晰细腻的照片,同时还配备了多种摄影模式,让用户能够拍摄出更加专业和有创意的照片。而关于人像拍摄,广大用户如今更青睐于相对原生、人物真实且细节饱满的人像特写相片

棒球教学知识架构·棒球1号位

棒球教学知识架构1.棒球运动的基本认知棒球运动的起源和发展历程棒球运动起源于19世纪中叶的美国,最初是一种儿童游戏,使用木棒和石头或木头制成的球进行比赛。后来,人们开始使用橡胶球和棒子,并规定了比赛规则和场地标准,棒球运动逐渐发展成为一项正式的体育运动。随着时间的推移,棒球运动在美国和加拿大广受欢迎,并逐渐传播到世界各

热文推荐