概述
开篇
flow api 已经慢慢被谷歌列为数据流的首选,可以见到官网的数据流篇都慢慢偏向于flow api,虽然LiveData等数据流类型已经深入开发者观念中,但是flow api也正慢慢的崛起出自己的市场。本篇讲的StateFlow是flow api中的一个更偏向于应用层的api,功能也非常和LiveData相似,那么为什么要出一个和LiveData类似的东西的,因为LiveData天生就引入了生命周期相关的概念,从设计角度出发,其实是耦合了生命周期这部分,所以现在才另起炉灶,出现了StateFlow。
接口部分
public interface StateFlow<out T> : SharedFlow<T> {
/**
* The current value of this state flow.
*/
public val value: T
}
可以看到StateFlow在SharedFlow上层上添加了一个属性,就是value值,可以被认为当前是当前可观测的值,跟LiveData的value类似。StateFlow更加推崇的是单体状态,所以区别于一般的flowapi(主要是数据流状态),它的实现禁止了一个重置操作。
StateFlowImpl 中
@Suppress("UNCHECKED_CAST")
override fun resetReplayCache() {
throw UnsupportedOperationException("MutableStateFlow.resetReplayCache is not supported")
}
还有就是一般的flow属于冷流,冷流这个概念就不再赘述,而flow之所以是冷流,是因为只有在collect的时候间接通过flowcollecter的emit方法去产生数据,本质上数据的改变依赖收集者,所以才是冷流,具体分析可以下看上一篇文章 juejin.cn/post/705860…
而StateFlow继承于SharedFlow,并且value数值是不依赖于collect方法改变,简单来说,就是收集者可以改变value数值,但是StateFlow中的value改变并不是只有这一种手段【注意这里概念】,它还可以直接对value数据改变,如下面例子
class CounterModel {
private val _counter = MutableStateFlow(0) // private mutable state flow
val counter = _counter.asStateFlow() // publicly exposed as read-only state flow
fun inc() {
_counter.value++
}
}
所以按照数据划分,它就可以属于热流,当然冷流热流本质是一个概念,便于区分即可。
发送者部分
为了方便分析,我们将数据改变部分称为发送者部分,每个对value进行set操作时都会进入下面方法。
private fun updateState(expectedState: Any?, newState: Any): Boolean {
var curSequence = 0
var curSlots: Array<StateFlowSlot?>? = this.slots // benign race, we will not use it
synchronized(this) {
val oldState = _state.value
if (expectedState != null && oldState != expectedState) return false // CAS support
if (oldState == newState) return true // Don't do anything if value is not changing, but CAS -> true
_state.value = newState
curSequence = sequence
if (curSequence and 1 == 0) { // even sequence means quiescent state flow (no ongoing update)
curSequence++ // make it odd
sequence = curSequence
} else {
// update is already in process, notify it, and return
sequence = curSequence + 2 // change sequence to notify, keep it odd
return true // updated
}
curSlots = slots // read current reference to collectors under lock
}
/*
Fire value updates outside of the lock to avoid deadlocks with unconfined coroutines.
Loop until we're done firing all the changes. This is a sort of simple flat combining that
ensures sequential firing of concurrent updates and avoids the storm of collector resumes
when updates happen concurrently from many threads.
*/
while (true) {
// Benign race on element read from array
curSlots?.forEach {
it?.makePending()
}
// check if the value was updated again while we were updating the old one
synchronized(this) {
if (sequence == curSequence) { // nothing changed, we are done
sequence = curSequence + 1 // make sequence even again
return true // done, updated
}
// reread everything for the next loop under the lock
curSequence = sequence
curSlots = slots
}
}
}
其实一段代码下来,除了数据维护部分,就是做了这么一件事,更新value的数据值,并记录当前数值是否是最新数值。和livedata的更新数据部分类似,本质都是维护一个计数器sequence,用来区分当前的value是否是最新。slots用来(下文会讲)记录数据更新状态,总的来说就是当数值发生改变的时候更新数据。
订阅者部分
当我们调用collect的时候,就会进行数据获取,也就是调用接收的那一部分。
override suspend fun collect(collector: FlowCollector<T>) {
val slot = allocateSlot()
try {
if (collector is SubscribedFlowCollector) collector.onSubscription()
val collectorJob = currentCoroutineContext()[Job]
var oldState: Any? = null // previously emitted T!! | NULL (null -- nothing emitted yet)
// The loop is arranged so that it starts delivering current value without waiting first
while (true) {
// Here the coroutine could have waited for a while to be dispatched,
// so we use the most recent state here to ensure the best possible conflation of stale values
val newState = _state.value
// always check for cancellation
collectorJob?.ensureActive()
// Conflate value emissions using equality
if (oldState == null || oldState != newState) {
collector.emit(NULL.unbox(newState))
oldState = newState
}
// Note: if awaitPending is cancelled, then it bails out of this loop and calls freeSlot
if (!slot.takePending()) { // try fast-path without suspending first
slot.awaitPending() // only suspend for new values when needed
}
}
} finally {
freeSlot(slot)
}
}
由于collect在协程当中,所以要时刻关注协程的状态collectorJob?.ensureActive()就是之一,那么什么时候才会正在调用我们collect里的自定义逻辑呢?换个说法来说就是什么时候像LiveData一样收到回调呢? 注意一下上面的这里
if (oldState == null || oldState != newState) {
collector.emit(NULL.unbox(newState))
oldState = newState
}
和livedata不一样的是,livedata每次设定value都会收到回调,而flow是在新旧状态不一样时,才会进行 collector.emit(NULL.unbox(newState)),进而触发收集。所以当value被多次设定同一个值的时候,LiveData会回调多次,而StateFlow只会调用一次。
关于StateFlowSlot
可以看到无论上面的发送部分或者接收部分都有StateFlowSlot类的影子, 比如
var curSlots: Array<StateFlowSlot?>? = this.slots /
里面维护着一个Array,那么这个是有什么用处呢? 我们知道flow是运行在协程里面的,简单来说协程本质就是状态机各种回调,那么基于这种环境下,在多线程或者多子协程中,为了维护State的正确更改,里面也要设定自己的状态机进行标识状态的正确改变。
private class StateFlowSlot : AbstractSharedFlowSlot<StateFlowImpl<*>>() {
private val _state = atomic<Any?>(null)
override fun allocateLocked(flow: StateFlowImpl<*>): Boolean {
// No need for atomic check & update here, since allocated happens under StateFlow lock
if (_state.value != null) return false // not free
_state.value = NONE // allocated
return true
}
override fun freeLocked(flow: StateFlowImpl<*>): Array<Continuation<Unit>?> {
_state.value = null // free now
return EMPTY_RESUMES // nothing more to do
}
@Suppress("UNCHECKED_CAST")
fun makePending() {
_state.loop { state ->
when {
state == null -> return // this slot is free - skip it
state === PENDING -> return // already pending, nothing to do
state === NONE -> { // mark as pending
if (_state.compareAndSet(state, PENDING)) return
}
else -> { // must be a suspend continuation state
// we must still use CAS here since continuation may get cancelled and free the slot at any time
if (_state.compareAndSet(state, NONE)) {
(state as CancellableContinuationImpl<Unit>).resume(Unit)
return
}
}
}
}
}
fun takePending(): Boolean = _state.getAndSet(NONE)!!.let { state ->
assert { state !is CancellableContinuationImpl<*> }
return state === PENDING
}
@Suppress("UNCHECKED_CAST")
suspend fun awaitPending(): Unit = suspendCancellableCoroutine sc@ { cont ->
assert { _state.value !is CancellableContinuationImpl<*> } // can be NONE or PENDING
if (_state.compareAndSet(NONE, cont)) return@sc // installed continuation, waiting for pending
// CAS failed -- the only possible reason is that it is already in pending state now
assert { _state.value === PENDING }
cont.resume(Unit)
}
}
一个StateFlowSlot里面有以下状态: 一般情况下有如下状态机切换
null
NONE
PENDING
进行新value设定
最终调用的是CancellableContinuationImpl 里面的执行,这里没什么好讲的,就是利用cas机制确保状态机的切换 值得注意的是,第一个null 和NONE的区别,null代表的是初始状态,设置null是为了避免新new一个StateFlowSlot 和对StateFlowSlot同时进行makePending 导致状态机混乱,所以才多加了一个初始状态null,简单表示StateFlowSlot刚刚创建,还没来的及去进入状态机更改,而NONE代表着可以进入状态机更改。
总结
到这里StateFlow已经是基本讲解完毕了,有理解错误的地方还望指正,最好可以关注一下往期的解析: 利用flow api打造一个总线型库 juejin.cn/post/705860…
github地址:github.com/TestPlanB/p… 欢迎star or pr
最后
以上就是唠叨黑米为你收集整理的深入理解StateFlow开篇接口部分发送者部分订阅者部分关于StateFlowSlot总结的全部内容,希望文章能够帮你解决深入理解StateFlow开篇接口部分发送者部分订阅者部分关于StateFlowSlot总结所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复