StateFlow
和 SharedFlow
是 Flow API,它们使流能够以最佳方式发出状态更新并向多个使用者发出值。
StateFlow
StateFlow
是一种状态持有者可观察流,它向其收集器发出当前和新的状态更新。当前状态值也可以通过其 value
属性读取。要更新状态并将其发送到流,请将新值分配给 MutableStateFlow
类的 value
属性。
在 Android 中,StateFlow
非常适合需要维护可观察的可变状态的类。
遵循来自 Kotlin 流 的示例,可以从 LatestNewsViewModel
公开 StateFlow
,以便 View
可以监听 UI 状态更新,并且固有地使屏幕状态在配置更改中保持不变。
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
// Backing property to avoid state updates from other classes
private val _uiState = MutableStateFlow(LatestNewsUiState.Success(emptyList()))
// The UI collects from this StateFlow to get its state updates
val uiState: StateFlow<LatestNewsUiState> = _uiState
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
// Update View with the latest favorite news
// Writes to the value property of MutableStateFlow,
// adding a new element to the flow and updating all
// of its collectors
.collect { favoriteNews ->
_uiState.value = LatestNewsUiState.Success(favoriteNews)
}
}
}
}
// Represents different states for the LatestNews screen
sealed class LatestNewsUiState {
data class Success(val news: List<ArticleHeadline>): LatestNewsUiState()
data class Error(val exception: Throwable): LatestNewsUiState()
}
负责更新 MutableStateFlow
的类是生产者,所有从 StateFlow
收集的类都是使用者。与使用 flow
生成器构建的冷流不同,StateFlow
是热的:从流中收集不会触发任何生产者代码。 StateFlow
始终处于活动状态并在内存中,并且只有在没有来自垃圾回收根的其他引用时,它才有资格进行垃圾回收。
当新的使用者开始从流中收集时,它会接收流中的最后一个状态和任何后续状态。您可以在其他可观察类(如 LiveData
)中找到此行为。
View
就像监听任何其他流一样监听 StateFlow
class LatestNewsActivity : AppCompatActivity() {
private val latestNewsViewModel = // getViewModel()
override fun onCreate(savedInstanceState: Bundle?) {
...
// Start a coroutine in the lifecycle scope
lifecycleScope.launch {
// repeatOnLifecycle launches the block in a new coroutine every time the
// lifecycle is in the STARTED state (or above) and cancels it when it's STOPPED.
repeatOnLifecycle(Lifecycle.State.STARTED) {
// Trigger the flow and start listening for values.
// Note that this happens when lifecycle is STARTED and stops
// collecting when the lifecycle is STOPPED
latestNewsViewModel.uiState.collect { uiState ->
// New value received
when (uiState) {
is LatestNewsUiState.Success -> showFavoriteNews(uiState.news)
is LatestNewsUiState.Error -> showError(uiState.exception)
}
}
}
}
}
}
要将任何流转换为 StateFlow
,请使用 stateIn
中间运算符。
StateFlow、Flow 和 LiveData
StateFlow
和 LiveData
具有相似之处。两者都是可观察的数据持有者类,并且在应用架构中使用时都遵循类似的模式。
但是,请注意,StateFlow
和 LiveData
的行为确实不同
StateFlow
要求在构造函数中传递初始状态,而LiveData
则不需要。LiveData.observe()
在视图进入STOPPED
状态时会自动取消注册消费者,而从StateFlow
或任何其他流中收集数据不会自动停止收集。为了实现相同行为,您需要在Lifecycle.repeatOnLifecycle
块中收集流。
使用 shareIn
将冷流转换为热流
StateFlow
是一个热流——只要收集流或存在来自垃圾回收根的任何其他引用,它就会保留在内存中。您可以使用 shareIn
运算符将冷流转换为热流。
以 Kotlin 流 中创建的 callbackFlow
为例,您可以使用 shareIn
在收集器之间共享从 Firestore 获取的数据,而不是让每个收集器都创建一个新的流。您需要传入以下内容:
- 一个用于共享流的
CoroutineScope
。此作用域的生存时间应长于任何消费者,以便在需要时保持共享流处于活动状态。 - 要重播给每个新收集器的项目数量。
- 启动行为策略。
class NewsRemoteDataSource(...,
private val externalScope: CoroutineScope,
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
...
}.shareIn(
externalScope,
replay = 1,
started = SharingStarted.WhileSubscribed()
)
}
在此示例中,latestNews
流会将最后发出的项目重播给新的收集器,并在 externalScope
处于活动状态且存在活动收集器时保持活动状态。 SharingStarted.WhileSubscribed()
启动策略在存在活动订阅者时保持上游生产者处于活动状态。还提供其他启动策略,例如 SharingStarted.Eagerly
以立即启动生产者或 SharingStarted.Lazily
以在第一个订阅者出现后开始共享并永远保持流处于活动状态。
SharedFlow
shareIn
函数返回一个 SharedFlow
,这是一个向所有从中收集数据的消费者发出值的热流。SharedFlow
是 StateFlow
的高度可配置的泛化。
您可以创建 SharedFlow
而无需使用 shareIn
。例如,您可以使用 SharedFlow
向应用程序的其余部分发送滴答信号,以便所有内容在同一时间定期刷新。除了获取最新新闻外,您可能还希望使用其收藏的热门主题刷新用户信息部分。在以下代码片段中,TickHandler
公开了 SharedFlow
,以便其他类知道何时刷新其内容。与 StateFlow
一样,在类中使用类型为 MutableSharedFlow
的后备属性将项目发送到流
// Class that centralizes when the content of the app needs to be refreshed
class TickHandler(
private val externalScope: CoroutineScope,
private val tickIntervalMs: Long = 5000
) {
// Backing property to avoid flow emissions from other classes
private val _tickFlow = MutableSharedFlow<Unit>(replay = 0)
val tickFlow: SharedFlow<Event<String>> = _tickFlow
init {
externalScope.launch {
while(true) {
_tickFlow.emit(Unit)
delay(tickIntervalMs)
}
}
}
}
class NewsRepository(
...,
private val tickHandler: TickHandler,
private val externalScope: CoroutineScope
) {
init {
externalScope.launch {
// Listen for tick updates
tickHandler.tickFlow.collect {
refreshLatestNews()
}
}
}
suspend fun refreshLatestNews() { ... }
...
}
您可以通过以下方式自定义 SharedFlow
行为
replay
允许您为新的订阅者重新发送一些先前发出的值。onBufferOverflow
允许您指定缓冲区已满时要发送的项目的策略。默认值为BufferOverflow.SUSPEND
,这会导致调用者挂起。其他选项为DROP_LATEST
或DROP_OLDEST
。
MutableSharedFlow
还具有一个 subscriptionCount
属性,其中包含活动收集器的数量,以便您可以相应地优化您的业务逻辑。MutableSharedFlow
还包含一个 resetReplayCache
函数,如果您不想重播发送到流的最新信息。
其他流资源
- Android 上的 Kotlin 流
- Android 上的 Kotlin 流测试
- 关于 Flow 的 shareIn 和 stateIn 运算符需要了解的事项
- 从 LiveData 迁移到 Kotlin Flow
- Kotlin 协程和流的其他资源