Kotlin学习笔记29(完结篇) Flow part2 Flow的Buffer 中间操作符zip 打平 异常处理 Flow的完成 onCompletion的优势 onCompletion陷阱
参考链接
示例来自bilibili Kotlin语言深入解析 张龙老师的视频
1 Buffer 缓冲
/**
* Buffer 缓冲
* 这里没有使用缓冲
*/
private fun myMethod(): Flow<Int> = flow {
for (i in 1..4) {
delay(100)
emit(i)
}
}
fun main() = runBlocking {
val time = measureTimeMillis {
myMethod().collect { value ->
delay(200)
println(value)
}
}
println(time)
}
/*
很明显 这里运行至少要(100+200)*4ms
*/
class HelloKotlin16 {
}
/**
* Buffer 缓冲 注意对比Kotlin16
*
* buffer的主要作用是对发射的缓冲 减少发射部分的等待时间
*
* buffer与flowOn之间有一定的关系:
* 实际上,flowOn运算符本质上在遇到需要改变CoroutineDispatcher时 同样也会使用缓存机制
* 所以有时候flowOn可以理解为复杂版的buffer
*/
private fun myMethod(): Flow<Int> = flow {
for (i in 1..4) {
delay(100)
emit(i)
}
}
fun main() = runBlocking {
val time = measureTimeMillis {
myMethod().buffer()/*注意这里添加了缓冲*/.collect { value ->
delay(200)
println(value)
}
}
println(time)
}
/*
上一个示例运行至少要(100+200)*4ms=1200ms左右
这里可以节省3次发射需要的100 一共节省300ms左右
*/
class HelloKotlin17 {
}
2 flow的中间操作符zip
/**
* flow的中间操作符zip:
* Flow的组合
* 将两个Flow合并为一个Flow 如果两个flow的长度不一样 会取较短的那个
*/
fun main() = runBlocking {
val nums: Flow<Int> = (1..5).asFlow()
val strs: Flow<String> = flowOf("one", "tow", "three", "four", "five")
val flow2: Flow<String> = nums.zip(strs) { a, b -> "$a -> $b" }
flow2.collect { println(it) }
}
class HelloKotlin18 {
}
3 Flatten Flow
/**
* Flatten Flow
*
* 将Flow<Flow<Int>> ->Flow<Int>
*/
private fun myMethod(i: Int): Flow<String> = flow {
emit("first: $i")
delay(100)
emit("second: $i")
}
fun main() = runBlocking {
// 如下写法会生成 Flow<Flow<String>>
val startTime1 = System.currentTimeMillis()
(1..3).asFlow().onEach { delay(100) }
.map { myMethod(it) }
.collect { value ->
// value实际还是一个Flow
println("$value at ${System.currentTimeMillis() - startTime1} ms")
}
println("============")
(1..3).asFlow().onEach { delay(100) }
.map { myMethod(it) }
.collect { value ->
// value实际还是一个Flow
//println("${value.collect { println(it) }} at ${System.currentTimeMillis() - startTime1} ms")
value.collect { println("$it at ${System.currentTimeMillis() - startTime1} ms") }
}
println("============")
val startTime2 = System.currentTimeMillis()
(1..3).asFlow().onEach { delay(100) }
.flatMapConcat { myMethod(it) }// 打平Flow
.collect { value ->
println("$value at ${System.currentTimeMillis() - startTime2} ms")
}
println("============")
}
/*
输出
kotlinx.coroutines.flow.SafeFlow@64a294a6 at 131 ms
kotlinx.coroutines.flow.SafeFlow@7e0b37bc at 236 ms
kotlinx.coroutines.flow.SafeFlow@3b95a09c at 346 ms
============
first: 1 at 464 ms
second: 1 at 567 ms
first: 2 at 677 ms
second: 2 at 787 ms
first: 3 at 897 ms
second: 3 at 1005 ms
============
first: 1 at 109 ms
second: 1 at 218 ms
first: 2 at 327 ms
second: 2 at 435 ms
first: 3 at 547 ms
second: 3 at 654 ms
============
Process finished with exit code 0
*/
class HelloKotlin19 {
}
4 Flow异常处理
/**
* Flow异常处理
*
* try catch可以捕获Flow整个生命周期任意阶段的异常
* flow有3个阶段
*
* 本例子是流的终端操作阶段发生异常
*
* 1 流元素生成阶段 (例如下面的flow构建器的代码块中)
* 2 流的中间操作阶段
* 3 流的终端操作阶段
*/
private fun myMethod(): Flow<Int> = flow {
for (i in 1..3) {
println("emitting $i")
emit(i)
}
}
fun main() = runBlocking {
try {
myMethod().collect { value ->
println(value)
// 在发射2时发生异常 被捕获并停止之后的发射
// check函数判断条件不成立会抛出IllegalStateException 并将其内容也会作为异常的参数
// 流在终端操作 collect时发生异常
check(value <= 1) {
"Collected $value"
}
}
} catch (e: Throwable) {
println("caught $e")
}
}
/*
输出
emitting 1
1
emitting 2
2
caught java.lang.IllegalStateException: Collected 2
*/
class HelloKotlin20 {
}
/**
* 流异常
* 在中间操作时发生异常
*
*/
private fun myMethod(): Flow<String> =
flow {
for (i in 1..3) {
println("emitting $i")
emit(i)
}
}.map { value ->
// 在中间操作发生异常
check(value <= 1) { "crash on $value" }
"string $value"
}
fun main() = runBlocking {
try {
myMethod().collect {
println(it)
}
} catch (e: Throwable) {
println("caught $e")
}
}
/*
输出:
emitting 1
string 1
emitting 2
caught java.lang.IllegalStateException: crash on 2
*/
class HelloKotlin21 {
}
5 Flow的完成
/**
* Flow的完成
* 用于在Flow完成时再做一些处理
*
* Flow的完成有两种方式
* 1 命令式
* 2 声明式
*
* 这一节说命令式
* 命令式其实也没有什么好说的 就是在Flow终端操作外套一个try finally
*/
private fun myMethod(): Flow<Int> = (1..10).asFlow()
fun main() = runBlocking {
try {
myMethod().collect { println(it) }
} finally {
println("finally")
}
}
class HelloKotlin22 {
}
/**
* Flow的完成
* 第二种 声明式
* Flow提供了一个名为onCompletion的中间操作
* onCompletion会在Flow完成或抛出异常时调用
*
* 注意该方法虽然是中间操作 但是它的运行实际时间却是在collect这个终端操作之后(除了onCompletion 其他中间操作都是发生再终端操作之前的)
*/
private fun myMethod(): Flow<Int> = (1..10).asFlow()
fun main() = runBlocking {
myMethod().onCompletion { println("onCompletion") }.collect { println(it) }
}
/*
输出
1
2
3
4
5
6
7
8
9
10
onCompletion
Process finished with exit code 0
*/
class HelloKotlin23 {
}
6 onCompletion的优势
/**
* Flow的完成 onCompletion的优势
*
* onCompletion的优势在于他有一个可空的Throwable参数 通过判断该参数 可以知道
* Flow的完成是正常结束还是抛出异常导致结束
*/
private fun myMethod(): Flow<Int> = flow {
emit(1)
throw RuntimeException()//注意对比这句话注释之后的输出
}
fun main() = runBlocking {
myMethod().onCompletion { cause ->
if (cause != null) {
println("Flow completed exceptionally")
}
}
.catch { cause -> println("catch exception") }
.collect { println(it) }
}
class HelloKotlin24 {
}
7 onCompletion陷阱
/**
* onCompletion陷阱
*
* onCompletion只能看到来自Flow上游的异常(发生在onCompletion之前的中间操作符)
* 而看不到下游的异常(发生在onCompletion之后的中间操作符及终端操作)
*/
private fun myMethod(): Flow<Int> = (1..10).asFlow()
fun main() = runBlocking {
myMethod().onCompletion { cause -> println("Flow completed with $cause") }
.collect { value ->
// 这里异常检测在终端操作中
// 虽然onCompletion会在collect之后执行 但是由于onCompletion是中间操作符 collect属于 onCompletion的下游
// 它无法知道终端操作符的执行状态
// 因此"Collected $value"无法传递给 中间操作onCompletion; 这会导致onCompletion的参数cause为空
check(value <= 1) { "Collected $value" }
println(value)
}
}
/*
输出与视频中的输出不一样
我的输出
1
Flow completed with java.lang.IllegalStateException: Collected 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
at com.example.lib.coroutine5.HelloKotlin25Kt$main$1$invokeSuspend$$inlined$collect$1.emit(Collect.kt:133)
at kotlinx.coroutines.flow.FlowKt__BuildersKt$asFlow$$inlined$unsafeFlow$9.collect(SafeCollector.common.kt:115)
at kotlinx.coroutines.flow.FlowKt__EmittersKt$onCompletion$$inlined$unsafeFlow$1.collect(SafeCollector.common.kt:114)
at com.example.lib.coroutine5.HelloKotlin25Kt$main$1.invokeSuspend(HelloKotlin25.kt:38)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:56)
at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274)
at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:84)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)
at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
at com.example.lib.coroutine5.HelloKotlin25Kt.main(HelloKotlin25.kt:27)
at com.example.lib.coroutine5.HelloKotlin25Kt.main(HelloKotlin25.kt)
Process finished with exit code 1
视频中的输出不是
Flow completed with java.lang.IllegalStateException: Collected 2
而是
Flow completed with null
不清楚是不是Kotlin内部有调整
*/
class HelloKotlin25 {
}