我是靠谱客的博主 激情春天,这篇文章主要介绍kotlin flow使用,现在分享给大家,希望可以做个参考。

【Kotlin Flow】 一眼看全——Flow操作符大全 - 掘金

Flow概览:

flow操作符可以将返回的数据进行加工处理,数据流包含三个实体:

  • 上游 -- 数据提供方:会生成添加到数据流中的数据。得益于协程,数据流还可以异步生成数据。
  • 中介(可选) -- 数据加工:可以修改发送到数据流的值,或修正数据流本身。
  • 下游 -- 数据使用方:则使用数据流中的值。

Flow流使用步骤:

1、创建流:flow { ... },flowOf{ ... }

2、使用操作符修改、加工流数据

3、发射流:collect

创建流如下几种方式:

  • flowOf(…)函数根据一组固定的值创建流。
  • asFlow()扩展函数可以将各种类型的函数转换为流。
  • flow {…}构建器函数,用于构造从顺序调用到发出函数的任意流。
  • channelFlow {…}构建器函数构造从潜在并发调用到send函数的任意流。
  • MutableStateFlow和MutableSharedFlow定义相应的构造函数创建一个热可直接更新流程。

举例:loadData方法模拟网络获取数据操作,返回类型修改为Flow<Int> ,并构造一个flow,在flow中 每隔一秒,发送一个数据用来模拟延迟获取值,代码如下:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
fun main() { runBlocking { loadData().collect { println(it) } } } fun loadData() = flow { for (i in 1..3) { delay(1000) emit(i) } }

运行结果即是,每隔1秒钟,打印出来一个数字,emit 方法用于发射值,collect方法是收集值,这里需要注意的是,我们可以看到 在main方法协程中,我们可以直接调用loadData的方法,这是因为flow构建块中的代码 就是一个suspend函数。这样一来我们就实现了对数据的逐步加载,而不需要等待所有的数据返回。

collect是末端操作符,如果我们没有调用flow的collect方法,其实不会进入flow的代码块中,也就是说 Flow中的代码直到被collect调用的时候才会运行,否则会立即返回

Flow操作符 

操作符官方文档:Flow 

类似RxJava,Flow中也有许多操作符,这里我们简单举几个例子: 

filter

通过filter 我们可以对结果集添加过滤条件,如下所示,我们仅打印出大于1的值

复制代码
1
2
3
4
5
6
7
8
9
10
fun main() { runBlocking { val flow = loadData() flow.filter { it > 1 }.collect { println(it) } } }

运行main 打印结果如下所示,将大于1的数据过滤掉:

复制代码
1
2
2 3

map

使用map我们可以将结果进行修改,代码如下所示:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
fun main() { runBlocking { val flow = loadData() flow.map { getNewData(it) }.collect { println(it) } } } fun getNewData(value: Int): String { return "new data: ${value + 1}" }

运行main 打印结果如下所示,将返回的数据加1后:

复制代码
1
2
3
new data: 2 new data: 3 new data: 4

所有的操作符都是可以一起使用的,并非只能单独使用,例如将上面两个操作符合起来如下:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
fun main() { runBlocking { val flow = loadData() flow.filter { it > 1 }.map { getNewData(it) }.collect { println(it) } } } fun loadData() = flow { for (i in 1..3) { delay(1000) emit(i) } } fun getNewData(value: Int): String { return "new data: ${value + 1}" }

运行main 打印结果如下所示,将大于1的数据过滤掉后,将返回的数据加1:

复制代码
1
2
new data: 3 new data: 4

flowOn

相比于 RxJava 需要使用 observeOn、subscribeOn 来切换线程,flow 会更加简单。只需使用 flowOn。

而 collect() 指定哪个线程,则需要看整个 flow 处于哪个 CoroutineScope 下。

Flow的代码块是执行在执行时的上下文中,我们不能在flow中指定线程来运行Flow代码中的代码,

为了预防开发过程中的错误默认流构建器实施了约束,所有流实现都应仅从同一协程发出:

复制代码
1
2
3
4
5
6
7
8
9
val myFlow = flow { // GlobalScope.launch { // 禁止 // launch(Dispatchers.IO) { // 禁止 // withContext(CoroutineName("myFlow")) // 禁止 emit(1) // OK coroutineScope { emit(2) // OK -- still the same coroutine } }

如果在flow中开启另外的协程,则会报如下异常: 

​​

那么我们如何指定Flow代码块中的上下文呢,我们需要使用flowOn操作符,我们将Flow代码块中的代码指定在IO线程中,代码如下所示:

复制代码
1
2
3
4
5
6
fun loadData() = flow { for (i in 1..3) { delay(1000) emit(i) } }.flowOn(Dispatchers.IO)

异常透明

流实现永远不会捕获或处理下游流中发生的异常。从实现的角度来看,这意味着永远不要将对emit和EmittAll的调用包装为 try { ... } catch { ... }块。流中的异常处理应由catch运算符执行, 并且旨在仅捕获来自上游流的异常,同时传递所有下游异常。同样,终端操作员如collect会 抛出在其代码或上游流中发生的任何未处理的异常,例如:

复制代码
1
2
3
4
5
flow { emitData() } .map { computeOne(it) } .catch { ... } // 在emitData和computeOne中捕获异常 .map { computeTwo(it) } .collect { process(it) } // 从process和computeTwo中抛出异常

zip操作符

zip操作符,可以合并两个flow,代码如下所示:

复制代码
1
2
3
4
5
6
7
8
9
fun zip() { runBlocking { val flow = flowOf(1, 2, 3).onEach { delay(1000) } val flow2 = flowOf("a", "b", "c", "d").onEach { delay(15) } flow.zip(flow2) { i, s -> i.toString() + s }.collect { println(it) // Will print "1a 2b 3c" } } }

运行main 打印结果如下所示,其中flow只有三个数字,1秒返回,flow2有四个数值15毫秒返回,

一旦其中一个流完成,结果流就完成,并在剩余流上调用cancel。本例中,flow结束后没有打印flow2中的d

复制代码
1
2
3
1a 2b 3c

combine操作符

combine操作符可以合并两个flow,并最终返回合并的Flow,其值是通过组合每个流最近发射的值并使用变换函数生成的,代码如下所示:

复制代码
1
2
3
4
5
6
7
8
9
fun combine() { runBlocking { val flow = flowOf(1, 2, 3).onEach { delay(500) } val flow2 = flowOf("a", "b", "c").onEach { delay(15) } flow.combine(flow2) { i, s -> i.toString() + s }.collect { println(it) // Will print "1a 2a 2b 2c" } } }

打印结果如下:

复制代码
1
2
3
1c 2c 3c

这里需要注意,最终合并的流的结果是每个流最近的发设值,通过改变不同流中的delay值可以更深入的理解。上例中flow2间隔15毫秒发射一个值,而flow需要500毫秒,那么第45毫秒时flow2已经发射到了第三个值是c,此时combine的第一次合并需要等到flow发射第一个值500毫秒时合并为1c,1秒时合并为2c,1500毫秒时为3c。

如果将flow中的间隔改为10,那么结果就完全不同了:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
fun combine() { runBlocking { val flow = flowOf(1, 2, 3).onEach { delay(10) } val flow2 = flowOf("a", "b", "c").onEach { delay(15) } flow.combine(flow2) { i, s -> i.toString() + s }.collect { println(it) // Will print "1a 2a 2b 2c" } } } 打印结果: 1a 2a 2b 3b 3c

前20毫秒,flow发射了1、2,flow2发射了a,因此打印出1a、2a

20到45毫秒之间,flow发射了到了2、3,flow2发射了b,因此打印出2b、3b

45毫秒之后,flow最终仍然是3,flow2发射到了c,因此打印出3c

conflate操作符

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
fun conflate() { runBlocking { val useTime = measureTimeMillis { val flow = flow { for (i in 1..30) { delay(100) emit(i) } } val result = flow.conflate().onEach { delay(1100) } println(result.toList()) } println(useTime) } }

发射器每个100ms发射一个元素,而接收器在每1100ms时才接受当时的最新值,打印结果:[1, 11, 22, 30]

若将接收器接收间隔改为1000,那么打印结果为:[1, 10, 20, 30]

debounce操作符

过滤掉给定timeout内的值,始终会发出最新值。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
fun debounce() { runBlocking { flow { emit(1) delay(90) emit(2) delay(90) emit(3) delay(1010) emit(4) delay(10) emit(5) delay(100) emit(6) delay(1000) emit(7) }.debounce(1000).collect{ print(it) } } } //打印结果:367

最后

以上就是激情春天最近收集整理的关于kotlin flow使用的全部内容,更多相关kotlin内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(69)

评论列表共有 0 条评论

立即
投稿
返回
顶部