Kotlin 之 Flow

Flow 库是在 Kotlin Coroutines 1.3.2 发布之后新增的库,也叫做异步流,类似 RxJava 的 Observable、 Flowable 等等,所以很多人都用 Flow 与 RxJava 做对比。而 Flow 则比 RxJava 简单很多。

我们知道 MVVM 中 LiveData 是一个生命周期感知组件,最好在 View 和 ViewModel 层中使用它,如果在 Repositories 或者 DataSource 中使用会有这两个主要的问题:不支持线程切换,其次不支持背压。而 Flow 正好是为解决此问题的。

RxJava 与 Flow 比较

RxJava模式 

    Flowable.create<Int>({
        for (index in 1..10) {
            println(index)
            it.onNext(index)
        }
        it.onComplete()
    }, BackpressureStrategy.BUFFER)
        .map {
            return@map it * it
        }
        .filter {
            return@filter it > 5
        }
        .toList()
        .subscribe({
            println(it)
        }, {
            println(it)
        })

Flow 模式

    try {
        flow {
            for (index in 1..10) {
                println(index)
                emit(index)
            }
        }.map {
            it * it
        }
            .filter {
                it > 5
            }
            .toList()
            .apply {
                println(this)
            }
    } catch (e: Exception) {
        println(e)
    }

通过 RxJava 与 Flow 的两个实现,可以看到代码简化程度不相伯仲。RxJava 入门的门槛很高,学习成本较高。Flow 则较为简单,没有 RxJava 中那么多的操作符,设计初衷就是解决单一问题。下面列出 Flow 的优点:

Flow 支持线程切换、背压

Flow 入门的门槛很低,没有那么多傻傻分不清楚的操作符

简单的数据转换与操作符,如 map 等等

Flow 是对 Kotlin 协程的扩展,让我们可以像运行同步代码一样运行异步代码,使得代码更加简洁,提高了代码的可读性

易于做单元测试

 

Flow

下面正式说下 Flow 的使用

Flow 是非阻塞的,以挂起的方式执行,只有遇到末端操作符,才会触发所有操作的执行,所有操作都在相同的代码块内顺序执行,发射出来的值都是顺序执行的,只有在某一时刻结束(遇到末端操作符或者出现异常)。

其中中间操作符与末端操作符列出

中间操作符 map、filter、take、zip 等,它们最后都是通过 emit 来发射数据。

public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> = transform { value ->
   return@transform emit(transform(value))
}

public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value ->
    if (predicate(value)) return@transform emit(value)
}

末端操作符 :collect、collectLatest、single、reducetoCollection、toList 等,最后都是调用 collect 。

public suspend fun <T> Flow<T>.toList(destination: MutableList<T> = ArrayList()): List<T> = toCollection(destination)

public suspend fun <T, C : MutableCollection<in T>> Flow<T>.toCollection(destination: C): C {
    collect { value ->
        destination.add(value)
    }
    return destination
}

遇到中间操作符,并不会执行任何操作,也不会挂起函数本身,这些操作符构建了一个待执行的调用链。遇到末端操作符会触发所有操作的执行。

 

Flow 只有遇到末端操作符才会触发所有操作的执行,所以 Flow 也被称为冷数据流。下面看下热数据流 Channel 。

Channel

Channel 是非阻塞的,它用于发送方 (SendChannel) 和接收方 (ReceiveChannel) 之间通信,它实现了 SendChannel 和 ReceiveChannel 接口,所以既可以发送数据又可以接受数据,发送方 (SendChannel) 和接收方 (ReceiveChannel) 之间通过缓冲区进行同步的。

Channel 包含发送与接受

发送

send() 如果缓冲区没有满,则立即添加元素, 如果缓冲区满了调用者会被挂起,send() 方法是一个挂起函数,用于同步发送方和接收方的一种机制;

offer() 如果缓冲区存在并且没有满立即向缓冲区添加一个元素,添加成功会返回 true, 失败会返回 false。

接受

receive() 异步获取元素,如果缓冲区是空时调用者会被挂起,直到一个新值被发送到缓冲区,receive() 方法是一个挂起函数,用于同步发送方和接收方的一种机制。

poll() 用于同步获取一个元素,如果缓冲区是空的,则返回 null。

 

Channel 有四种类型

RendezvousChannel :这是默认的类型,大小为 0 的缓冲区,只有当 send() 方法和 receive() 方法都调用的时候,元素才会从发送方传输到接收方,否则将会被挂起。

LinkedListChannel :通过 Channel.Factory.CONFLATED 会创建一个容量无限的缓冲区 (受限于内存的大小) ,send() 方法远不会挂起,offer() 方法始终返回 true。

ConflatedChannel :最多缓冲一个元素,新元素会覆盖掉旧元素,只会接收最后发送的元素,之前的元素都会丢失,send() 方法永远不会挂起,offer() 方法始终返回 true。

ArrayChannel :通过 Channel.Factory.BUFFERED 或者 指定大小 会创建一个固定容量的数组缓冲区,send() 方法仅在缓冲区满时挂起,receive() 方法仅在缓冲区为空时挂起。

通过源码可以看到

public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
    when (capacity) {
        RENDEZVOUS -> RendezvousChannel()
        UNLIMITED -> LinkedListChannel()
        CONFLATED -> ConflatedChannel()
        BUFFERED -> ArrayChannel(CHANNEL_DEFAULT_CAPACITY)
        else -> ArrayChannel(capacity)
    }

它的使用方法比较简单,类似观察者方法,这里就不贴代码了。

 

Channel 的所发送的数据只能被一个消费者消费,而如果需要一对多的话那就需要 BroadcastChannel ,它会像我们平时使用广播一样进行分发给所有订阅者。另外需要注意的是,BroadcastChannel 不支持 RENDEZVOUS。

BroadcastChannel

BroadcastChannel 是非阻塞的,它用于发送方 (SendChannel) 和接收方 (ReceiveChannel) 之间通信,实现了 SendChannel 接口,所以只可以发送数据;提供了 openSubscription 方法,会返回一个新的 ReceiveChannel,可以从缓冲区获取数据。

它的子类有 ArrayBroadcastChannel、BroadcastCoroutine、ConflatedBroadcastChannel、LazyBroadcastCoroutine 四种。

使用方法

    runBlocking {
        val broadcastChannel = GlobalScope.broadcast {
            for (i in 0..3) {
                send(i)
            }
        }
        List(2) { index ->
            GlobalScope.launch {
                for (i in broadcastChannel.openSubscription()) {
                    println(" 协程$index 接收 $i")
                }
            }

        }.joinAll()
    }

 

今日这篇大致完成,下面记录 kotlin 较复杂的一些函数:

debounce 防抖动函数,指定时间内只会发送最后一个字符串。

        flow {
            emit(1)
            emit(2)
            emit(3)
            delay(90)
            emit(4)
            emit(5)
        }
            .debounce(200)
            .toList()
            .apply {
                println(this)//[5]
            }

flatMapLatest 一定时间内获取最新的数据

        flow {
            emit(1)
            emit(2)
            emit(3)
        }
            .flatMapLatest {
                flow {
                    delay(100)
                    emit(it)
                }
            }
            .toList()
            .apply {
                print(this)//[3]
            }

distinctUntilChanged 过滤掉重复的请求

        flow {
            emit(1)
            emit(1)
            emit(1)
            emit(1)
            emit(2)
            emit(2)
            emit(2)
            emit(2)
            emit(2)
        }
            .distinctUntilChanged()
            .toList()
            .apply {
                println(this)//[1, 2]
            }

这些方法的源码这里就不多解释了,注意要跟一下源码,了解下如何实现的。