StateFlow 和 SharedFlow

StateFlowSharedFlowFlow API,它们使流能够以最佳方式发出状态更新,并向多个消费者发出值。

StateFlow

StateFlow 是一种状态持有者可观测流,它会向其收集器发出当前和新的状态更新。当前状态值也可以通过其 value 属性读取。要更新状态并将其发送到流,请为 MutableStateFlow 类的 value 属性分配一个新值。

在 Android 中,StateFlow 非常适合需要维护可观测可变状态的类。

遵循 Kotlin 流中的示例,可以从 LatestNewsViewModel 暴露一个 StateFlow,以便 View 可以监听界面状态更新,并固有地使屏幕状态在配置更改后仍能保留。

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    // Backing property to avoid state updates from other classes
    private val _uiState = MutableStateFlow(LatestNewsUiState.Success(emptyList()))
    // The UI collects from this StateFlow to get its state updates
    val uiState: StateFlow<LatestNewsUiState> = _uiState

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Update View with the latest favorite news
                // Writes to the value property of MutableStateFlow,
                // adding a new element to the flow and updating all
                // of its collectors
                .collect { favoriteNews ->
                    _uiState.value = LatestNewsUiState.Success(favoriteNews)
                }
        }
    }
}

// Represents different states for the LatestNews screen
sealed class LatestNewsUiState {
    data class Success(val news: List<ArticleHeadline>): LatestNewsUiState()
    data class Error(val exception: Throwable): LatestNewsUiState()
}

负责更新 MutableStateFlow 的类是生产者,而从 StateFlow 收集的所有类都是消费者。与使用 flow 构建器构建的*冷*流不同,StateFlow 是*热*流:从流中收集不会触发任何生产者代码。StateFlow 始终处于活动状态并驻留在内存中,并且仅当没有其他引用从垃圾回收根指向它时,它才符合垃圾回收的条件。

当新的消费者开始从流中收集时,它会收到流中的最后一个状态以及任何后续状态。您可以在 LiveData 等其他可观测类中找到此行为。

View 像处理任何其他流一样监听 StateFlow

class LatestNewsActivity : AppCompatActivity() {
    private val latestNewsViewModel = // getViewModel()

    override fun onCreate(savedInstanceState: Bundle?) {
        ...
        // Start a coroutine in the lifecycle scope
        lifecycleScope.launch {
            // repeatOnLifecycle launches the block in a new coroutine every time the
            // lifecycle is in the STARTED state (or above) and cancels it when it's STOPPED.
            repeatOnLifecycle(Lifecycle.State.STARTED) {
                // Trigger the flow and start listening for values.
                // Note that this happens when lifecycle is STARTED and stops
                // collecting when the lifecycle is STOPPED
                latestNewsViewModel.uiState.collect { uiState ->
                    // New value received
                    when (uiState) {
                        is LatestNewsUiState.Success -> showFavoriteNews(uiState.news)
                        is LatestNewsUiState.Error -> showError(uiState.exception)
                    }
                }
            }
        }
    }
}

要将任何流转换为 StateFlow,请使用 stateIn 中间运算符。

StateFlow、Flow 和 LiveData

StateFlowLiveData 有相似之处。它们都是可观测的数据持有者类,并且在应用架构中使用时都遵循相似的模式。

但请注意,StateFlowLiveData 的行为方式不同:

  • StateFlow 需要在构造函数中传入初始状态,而 LiveData 不需要。
  • LiveData.observe() 会在视图进入 STOPPED 状态时自动取消注册消费者,而从 StateFlow 或任何其他流中收集不会自动停止收集。要实现相同的行为,您需要从 Lifecycle.repeatOnLifecycle 块中收集流。

使用 shareIn 将冷流转换为热流

StateFlow 是一个*热*流——只要流正在被收集,或者只要从垃圾回收根中存在对其的任何其他引用,它就会保留在内存中。您可以使用 shareIn 运算符将冷流转换为热流。

以在 Kotlin 流中创建的 callbackFlow 为例,您可以通过使用 shareIn 在收集器之间共享从 Firestore 检索到的数据,而不是让每个收集器都创建新的流。您需要传入以下内容:

  • 一个用于共享流的 CoroutineScope。此作用域的生命周期应长于任何消费者,以使共享流在需要时保持活动状态。
  • 要重播给每个新收集器的项目数量。
  • 启动行为策略。
class NewsRemoteDataSource(...,
    private val externalScope: CoroutineScope,
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        ...
    }.shareIn(
        externalScope,
        replay = 1,
        started = SharingStarted.WhileSubscribed()
    )
}

在此示例中,latestNews 流会将最后发出的项目重播给新的收集器,并且只要 externalScope 处于活动状态且存在活动的收集器,它就会保持活动状态。SharingStarted.WhileSubscribed() 启动策略在存在活动订阅者时保持上游生产者处于活动状态。还有其他启动策略可用,例如 SharingStarted.Eagerly 可立即启动生产者,或 SharingStarted.Lazily 可在第一个订阅者出现后开始共享并使流永久保持活动状态。

SharedFlow

shareIn 函数返回一个 SharedFlow,这是一个热流,它会向所有从中收集的消费者发出值。SharedFlowStateFlow 的高度可配置的泛化。

您可以创建 SharedFlow 而无需使用 shareIn。例如,您可以使用 SharedFlow 向应用的其余部分发送“心跳”,以便所有内容可以同时定期刷新。除了获取最新新闻,您可能还希望刷新用户信息部分及其收藏主题集合。在以下代码段中,TickHandler 暴露了一个 SharedFlow,以便其他类知道何时刷新其内容。与 StateFlow 一样,在类中使用 MutableSharedFlow 类型的后备属性来向流发送项目。

// Class that centralizes when the content of the app needs to be refreshed
class TickHandler(
    private val externalScope: CoroutineScope,
    private val tickIntervalMs: Long = 5000
) {
    // Backing property to avoid flow emissions from other classes
    private val _tickFlow = MutableSharedFlow<Unit>(replay = 0)
    val tickFlow: SharedFlow<Event<String>> = _tickFlow

    init {
        externalScope.launch {
            while(true) {
                _tickFlow.emit(Unit)
                delay(tickIntervalMs)
            }
        }
    }
}

class NewsRepository(
    ...,
    private val tickHandler: TickHandler,
    private val externalScope: CoroutineScope
) {
    init {
        externalScope.launch {
            // Listen for tick updates
            tickHandler.tickFlow.collect {
                refreshLatestNews()
            }
        }
    }

    suspend fun refreshLatestNews() { ... }
    ...
}

您可以按以下方式自定义 SharedFlow 的行为:

  • replay 允许您为新订阅者重新发送一定数量的先前发出的值。
  • onBufferOverflow 允许您指定当缓冲区充满要发送的项目时要执行的策略。默认值为 BufferOverflow.SUSPEND,这会使调用者挂起。其他选项是 DROP_LATESTDROP_OLDEST

MutableSharedFlow 还具有一个 subscriptionCount 属性,其中包含活动收集器的数量,以便您可以相应地优化您的业务逻辑。MutableSharedFlow 还包含一个 resetReplayCache 函数,如果您不想重播发送到流的最新信息,可以使用它。

其他流资源