1. 开始之前
在本 Codelab 中,您将学习如何使用 LiveData
构建器 在 Android 应用中将 Kotlin 协程 与 LiveData
组合起来。我们还将使用 协程异步流,它是协程库中用于表示异步值序列(或流)的一种类型,来实现相同的功能。
您将从一个现有的应用开始,该应用使用 Android 架构组件 构建,并使用 LiveData
从 Room
数据库获取对象列表并在 RecyclerView
网格布局中显示它们。
以下是一些代码片段,让您了解您将要做什么。以下是查询 Room 数据库的现有代码
val plants: LiveData<List<Plant>> = plantDao.getPlants()
LiveData
将使用 LiveData
构建器和协程以及额外的排序逻辑进行更新
val plants: LiveData<List<Plant>> = liveData<List<Plant>> {
val plantsLiveData = plantDao.getPlants()
val customSortOrder = plantsListSortOrderCache.getOrAwait()
emitSource(plantsLiveData.map { plantList -> plantList.applySort(customSortOrder) })
}
您还将使用 Flow
实现相同的逻辑
private val customSortFlow = plantsListSortOrderCache::getOrAwait.asFlow()
val plantsFlow: Flow<List<Plant>>
get() = plantDao.getPlantsFlow()
.combine(customSortFlow) { plants, sortOrder ->
plants.applySort(sortOrder)
}
.flowOn(defaultDispatcher)
.conflate()
先决条件
- 了解架构组件
ViewModel
、LiveData
、Repository
和Room
。 - 了解 Kotlin 语法,包括扩展函数和 Lambda 表达式。
- 了解 Kotlin 协程。
- 基本了解如何在 Android 上使用线程,包括主线程、后台线程和回调。
您将做什么
- 将现有的
LiveData
转换为使用 Kotlin 协程友好的LiveData
构建器。 - 在
LiveData
构建器中添加逻辑。 - 使用
Flow
进行异步操作。 - 组合
Flow
并转换多个异步源。 - 使用
Flow
控制并发性。 - 了解如何在
LiveData
和Flow
之间进行选择。
您需要什么
- Android Studio Arctic Fox。此 Codelab 可能会与其他版本一起使用,但某些内容可能缺失或外观不同。
如果您在完成此 Codelab 时遇到任何问题(代码错误、语法错误、措辞不清等),请通过 Codelab 左下角的“报告错误”链接报告问题。
2. 设置
下载代码
点击以下链接下载此 Codelab 的所有代码
…或者使用以下命令从命令行克隆 GitHub 存储库
$ git clone https://github.com/googlecodelabs/kotlin-coroutines.git
此 Codelab 的代码位于 advanced-coroutines-codelab
目录中。
常见 问题解答
3. 运行起始示例应用
首先,让我们看看起始示例应用是什么样子的。按照以下说明在 Android Studio 中打开示例应用。
- 如果您下载了
kotlin-coroutines
zip 文件,请解压缩该文件。 - 在 Android Studio 中打开
advanced-coroutines-codelab
目录。 - 确保在配置下拉列表中选择了
start
。 - 点击 运行 按钮,然后选择模拟器或连接您的 Android 设备。设备必须能够运行 Android Lollipop(最低支持的 SDK 为 21)。
应用首次运行时,会出现一个卡片列表,每个卡片显示特定植物的名称和图像
每个 Plant
都有一个 growZoneNumber
,该属性表示植物最有可能茁壮成长的区域。用户可以点击筛选图标 在显示所有植物和特定生长区域的植物之间切换,该区域硬编码为 9 区。多次按下筛选按钮以查看其操作。
架构概述
此应用使用 架构组件 将 MainActivity
和 PlantListFragment
中的 UI 代码与 PlantListViewModel
中的应用逻辑分离。 PlantRepository
在 ViewModel
和 PlantDao
之间提供桥梁,后者访问 Room
数据库以返回 Plant
对象列表。然后,UI 获取此植物列表并在 RecyclerView
网格布局中显示它们。
在我们开始修改代码之前,让我们快速了解一下数据如何从数据库流向 UI。以下是如何在 ViewModel
中加载植物列表的
PlantListViewModel.kt
val plants: LiveData<List<Plant>> = growZone.switchMap { growZone ->
if (growZone == NoGrowZone) {
plantRepository.plants
} else {
plantRepository.getPlantsWithGrowZone(growZone)
}
}
GrowZone
是一个内联类,它仅包含一个表示其区域的 Int
。 NoGrowZone
表示区域不存在,仅用于筛选。
Plant.kt
inline class GrowZone(val number: Int)
val NoGrowZone = GrowZone(-1)
当点击筛选按钮时,growZone
会切换。我们使用 switchMap
来确定要返回的植物列表。
以下是用于从数据库获取植物数据的存储库和数据访问对象 (DAO) 的外观
PlantDao.kt
@Query("SELECT * FROM plants ORDER BY name")
fun getPlants(): LiveData<List<Plant>>
@Query("SELECT * FROM plants WHERE growZoneNumber = :growZoneNumber ORDER BY name")
fun getPlantsWithGrowZoneNumber(growZoneNumber: Int): LiveData<List<Plant>>
PlantRepository.kt
val plants = plantDao.getPlants()
fun getPlantsWithGrowZone(growZone: GrowZone) =
plantDao.getPlantsWithGrowZoneNumber(growZone.number)
虽然大多数代码修改都在 PlantListViewModel
和 PlantRepository
中,但最好花点时间熟悉一下项目的结构,重点关注植物数据如何通过从数据库到 Fragment
的各个层级显示。在下一步中,我们将修改代码以使用 LiveData
构建器添加自定义排序。
4. 带有自定义排序的植物
植物列表目前按字母顺序显示,但我们希望通过先列出某些植物,然后按字母顺序列出其余植物来更改此列表的顺序。这类似于购物应用在可用购买商品列表顶部显示赞助结果。我们的产品团队希望能够动态更改排序顺序,而无需发布新版本的应用,因此我们将从后端获取要先排序的植物列表。
以下是应用使用自定义排序后的外观
自定义排序顺序列表包含以下四种植物:橙子、向日葵、葡萄和鳄梨。请注意,它们首先出现在列表中,然后按字母顺序排列其余植物。
现在,如果按下筛选按钮(并且仅显示GrowZone
9 的植物),则向日葵将从列表中消失,因为其GrowZone
不是 9。自定义排序列表中的其他三种植物位于GrowZone
9,因此它们将保留在列表顶部。GrowZone
9 中的唯一其他植物是番茄,它出现在此列表的最后。
让我们开始编写代码来实现自定义排序。
5. 获取排序顺序
我们将首先编写一个挂起函数,从网络获取自定义排序顺序,然后将其缓存到内存中。
将以下内容添加到PlantRepository
PlantRepository.kt
private var plantsListSortOrderCache =
CacheOnSuccess(onErrorFallback = { listOf<String>() }) {
plantService.customPlantSortOrder()
}
plantsListSortOrderCache
用作自定义排序顺序的内存缓存。如果出现网络错误,它将回退到空列表,以便即使未获取排序顺序,我们的应用仍可以显示数据。
此代码使用sunflower
模块中提供的CacheOnSuccess
实用程序类来处理缓存。通过像这样抽象化实现缓存的细节,应用程序代码可以更简单。由于CacheOnSuccess
已经过充分测试,因此我们无需为我们的存储库编写太多测试来确保正确的行为。在使用kotlinx-coroutines
时,在代码中引入类似的高级抽象是一个好主意。
现在让我们合并一些逻辑以将排序应用于植物列表。
将以下内容添加到PlantRepository:
PlantRepository.kt
private fun List<Plant>.applySort(customSortOrder: List<String>): List<Plant> {
return sortedBy { plant ->
val positionForItem = customSortOrder.indexOf(plant.plantId).let { order ->
if (order > -1) order else Int.MAX_VALUE
}
ComparablePair(positionForItem, plant.name)
}
}
此扩展函数将重新排列列表,将位于customSortOrder
中的Plants
放置在列表的前面。
6. 使用 LiveData 构建逻辑
现在排序逻辑已到位,请使用以下LiveData
构建器 替换plants
和getPlantsWithGrowZone
的代码
PlantRepository.kt
val plants: LiveData<List<Plant>> = liveData<List<Plant>> {
val plantsLiveData = plantDao.getPlants()
val customSortOrder = plantsListSortOrderCache.getOrAwait()
emitSource(plantsLiveData.map {
plantList -> plantList.applySort(customSortOrder)
})
}
fun getPlantsWithGrowZone(growZone: GrowZone) = liveData {
val plantsGrowZoneLiveData = plantDao.getPlantsWithGrowZoneNumber(growZone.number)
val customSortOrder = plantsListSortOrderCache.getOrAwait()
emitSource(plantsGrowZoneLiveData.map { plantList ->
plantList.applySort(customSortOrder)
})
}
现在,如果您运行应用,自定义排序的植物列表应该会出现
LiveData
构建器允许我们异步计算值,因为liveData
由协程支持。在这里,我们有一个挂起函数,用于从数据库获取LiveData
植物列表,同时还调用一个挂起函数来获取自定义排序顺序。然后,我们将这两个值组合起来对植物列表进行排序并返回该值,所有这些都在构建器中进行。
协程在被观察时开始执行,并在协程成功完成或数据库或网络调用失败时取消。
在下一步中,我们将探讨使用转换 的getPlantsWithGrowZone
的变体。
7. liveData:修改值
我们现在将修改PlantRepository
以实现一个挂起的转换,因为每个值都在处理,学习如何在LiveData
中构建复杂的异步转换。作为先决条件,让我们创建一个在主线程上安全使用的排序算法版本。我们可以使用withContext
切换到另一个调度程序,仅用于 lambda,然后恢复到我们开始的调度程序。
将以下内容添加到PlantRepository
PlantRepository.kt
@AnyThread
suspend fun List<Plant>.applyMainSafeSort(customSortOrder: List<String>) =
withContext(defaultDispatcher) {
this@applyMainSafeSort.applySort(customSortOrder)
}
然后,我们可以将此新的主线程安全排序与LiveData
构建器一起使用。更新块以使用switchMap
,它允许您在每次收到新值时指向一个新的LiveData
。
PlantRepository.kt
fun getPlantsWithGrowZone(growZone: GrowZone) =
plantDao.getPlantsWithGrowZoneNumber(growZone.number)
.switchMap { plantList ->
liveData {
val customSortOrder = plantsListSortOrderCache.getOrAwait()
emit(plantList.applyMainSafeSort(customSortOrder))
}
}
与以前的版本相比,一旦从网络收到自定义排序顺序,就可以将其与新的主线程安全applyMainSafeSort
一起使用。然后,此结果作为getPlantsWithGrowZone
返回的新值发出到switchMap
。
与上面的plants
LiveData 类似,协程在被观察时开始执行,并在完成或数据库或网络调用失败时终止。这里的区别在于,在映射中进行网络调用是安全的,因为它已缓存。
现在让我们看看如何使用 Flow 实现此代码,并比较这些实现。
8. 介绍 Flow
我们将使用来自kotlinx-coroutines
的Flow 构建相同的逻辑。在这样做之前,让我们先了解一下什么是 Flow 以及如何将其整合到您的应用中。
Flow 是Sequence 的异步版本,Sequence 是一种其值按需生成的集合类型。就像 Sequence 一样,Flow 在需要时按需生成每个值,并且 Flow 可以包含无限数量的值。
那么,Kotlin 为什么引入新的Flow
类型,它与普通 Sequence 有什么不同?答案在于异步的魔力。Flow
完全支持协程。这意味着您可以使用协程构建、转换和使用Flow
。您还可以控制并发性,这意味着可以使用Flow
以声明方式协调多个协程的执行。
这开启了许多令人兴奋的可能性。
Flow
可用于完全反应式编程风格。如果您以前使用过RxJava
之类的东西,Flow
提供了类似的功能。通过使用诸如map
、flatMapLatest
、combine
等函数运算符转换 Flow,可以简洁地表达应用逻辑。
Flow
还支持大多数运算符上的挂起函数。这使您可以在像map
这样的运算符中执行顺序异步任务。通过在 Flow 内部使用挂起操作,与完全反应式风格的等效代码相比,它通常会产生更短且更易于阅读的代码。
在本 Codelab 中,我们将探讨使用这两种方法。
Flow 如何运行
为了习惯 Flow 如何按需(或延迟)生成值,请查看以下发出值(1, 2, 3)
并打印每个项目生成之前、期间和之后的内容的 Flow。
fun makeFlow() = flow {
println("sending first value")
emit(1)
println("first value collected, sending another value")
emit(2)
println("second value collected, sending a third value")
emit(3)
println("done")
}
scope.launch {
makeFlow().collect { value ->
println("got $value")
}
println("flow is completed")
}
如果运行此代码,它将产生以下输出
sending first value got 1 first value collected, sending another value got 2 second value collected, sending a third value got 3 done flow is completed
您可以看到执行如何在collect
lambda 和flow
构建器之间弹跳。每次 Flow 构建器调用emit
时,它都会挂起
,直到该元素完全处理完毕。然后,当从 Flow 请求另一个值时,它会从上次离开的地方恢复
,直到再次调用 emit。当flow
构建器完成时,collect
现在可以完成,并且调用块打印“flow 已完成”。
对collect
的调用非常重要。Flow
使用诸如collect
之类的挂起运算符而不是公开Iterator
接口,以便它始终知道何时正在被主动使用。更重要的是,它知道调用者何时无法请求更多值,以便它可以清理资源。
Flow 何时运行
在上例中,Flow
在 collect
运算符运行时开始运行。通过调用 flow
构建器或其他 API 创建新的 Flow
不会导致任何工作执行。挂起运算符 collect
在 Flow
中被称为 **终端运算符**。还有其他挂起终端运算符,例如 toList
、first
和 single
与 kotlinx-coroutines
一起提供,您也可以构建自己的终端运算符。
默认情况下,Flow
将执行:
- 每次应用终端运算符时(并且每次新的调用都独立于之前启动的任何调用)
- 直到它正在运行的协程被取消
- 当最后一个值已完全处理,并且请求了另一个值时
由于这些规则,Flow
可以参与结构化并发,并且可以安全地从 Flow
启动长时间运行的协程。Flow
不会泄漏资源,因为它们始终使用 协程协作取消规则 在调用者被取消时清理。
让我们修改上面的流,只使用 take
运算符查看前两个元素,然后收集它两次。
scope.launch {
val repeatableFlow = makeFlow().take(2) // we only care about the first two elements
println("first collection")
repeatableFlow.collect()
println("collecting again")
repeatableFlow.collect()
println("second collection completed")
}
运行此代码,您将看到此输出
first collection sending first value first value collected, sending another value collecting again sending first value first value collected, sending another value second collection completed
每次调用 collect
时,flow
lambda 都会从顶部开始。如果流执行了昂贵的操作(例如发出网络请求),这一点很重要。此外,由于我们应用了 take(2)
运算符,因此流只会生成两个值。在第二次调用 emit
后,它不会再次恢复流 lambda,因此“second value collected...”行永远不会打印。
9. 使用 flow 进行异步操作
好的,所以 Flow
就像 Sequence
一样是惰性的,但它又是如何异步的呢?让我们看一个异步序列的示例——观察数据库的变化。
在此示例中,我们需要协调在数据库线程池上生成的数据与位于另一个线程(例如主线程或 UI 线程)上的观察者。而且,由于我们将在数据更改时重复发出结果,因此此场景非常适合异步序列模式。
假设您的任务是为 Flow
编写 Room
集成。如果您从 Room
中现有的挂起查询支持开始,您可能会编写如下内容
// This code is a simplified version of how Room implements flow
fun <T> createFlow(query: Query, tables: List<Tables>): Flow<T> = flow {
val changeTracker = tableChangeTracker(tables)
while(true) {
emit(suspendQuery(query))
changeTracker.suspendUntilChanged()
}
}
此代码依赖于两个虚构的挂起函数来生成 Flow
suspendQuery
– 一个主线程安全的函数,用于运行常规的Room
挂起查询suspendUntilChanged
– 一个函数,它会挂起协程,直到其中一个表发生更改
收集时,流最初会 emit
查询的第一个值。一旦处理了该值,流就会恢复并调用 suspendUntilChanged
,它将按照其名称所述——挂起流,直到其中一个表发生更改。此时,系统中没有任何事情发生,直到其中一个表发生更改并且流恢复。
当流恢复时,它会发出另一个主线程安全的查询,并 emit
结果。此过程在无限循环中永远持续下去。
Flow 和结构化并发
但是等等——我们不想泄漏工作!协程本身并不昂贵,但它会重复唤醒自身以执行数据库查询。这是一个非常昂贵的泄漏操作。
即使我们创建了一个无限循环,Flow
也会通过支持结构化并发来帮助我们。
使用流消费值或迭代流的唯一方法是使用终端运算符。因为所有终端运算符都是挂起函数,所以工作与调用它们的范围的生命周期绑定。当范围被取消时,流将使用常规的 协程协作取消规则 自动取消自身。因此,即使我们在流构建器中编写了一个无限循环,我们也可以安全地使用它,而不会因结构化并发而导致泄漏。
10. 将 Flow 与 Room 一起使用
在此步骤中,您将学习如何将 Flow
与 Room
一起使用并将其连接到 UI。
此步骤对于许多 Flow
用法都很常见。以这种方式使用时,来自 Room
的 Flow
充当类似于 LiveData
的可观察数据库查询。
更新 Dao
首先,打开 PlantDao.kt
,并添加两个返回 Flow<List<Plant>>
的新查询
PlantDao.kt
@Query("SELECT * from plants ORDER BY name")
fun getPlantsFlow(): Flow<List<Plant>>
@Query("SELECT * from plants WHERE growZoneNumber = :growZoneNumber ORDER BY name")
fun getPlantsWithGrowZoneNumberFlow(growZoneNumber: Int): Flow<List<Plant>>
请注意,除了返回类型外,这些函数与 LiveData
版本相同。但是,我们将并排开发它们以进行比较。
通过指定 Flow
返回类型,Room
将以以下特征执行查询
- **主线程安全 –** 具有
Flow
返回类型的查询始终在Room
执行器上运行,因此它们始终是主线程安全的。您无需在代码中执行任何操作即可使它们在主线程之外运行。 - **观察更改 –**
Room
自动观察更改并将新值发出到流中。 - **异步序列 –**
Flow
在每次更改时发出整个查询结果,并且不会引入任何缓冲区。如果返回Flow<List<T>>
,则流会发出一个包含查询结果中所有行的List<T>
。它将像序列一样执行——一次发出一个查询结果并挂起,直到要求它发出下一个结果。 - **可取消 –** 当收集这些流的范围被取消时,
Room
会取消观察此查询。
总而言之,这使得 Flow
成为从 UI 层观察数据库的绝佳返回类型。
更新存储库
要继续将新返回值连接到 UI,请打开 PlantRepository.kt
,并添加以下代码
PlantRepository.kt
val plantsFlow: Flow<List<Plant>>
get() = plantDao.getPlantsFlow()
fun getPlantsWithGrowZoneFlow(growZoneNumber: GrowZone): Flow<List<Plant>> {
return plantDao.getPlantsWithGrowZoneNumberFlow(growZoneNumber.number)
}
目前,我们只是将 Flow
值传递给调用方。这与我们开始此 codelab 时将 LiveData
传递给 ViewModel
完全相同。
更新 ViewModel
在 PlantListViewModel.kt
中,让我们从简单开始,只公开 plantsFlow
。我们将在接下来的几个步骤中返回并向流版本添加种植区域切换。
PlantListViewModel.kt
// add a new property to plantListViewModel
val plantsUsingFlow: LiveData<List<Plant>> = plantRepository.plantsFlow.asLiveData()
同样,在进行过程中,我们将保留 LiveData
版本(val plants
)以进行比较。
由于我们希望在此 codelab 中在 UI 层保留 LiveData
,因此我们将使用 asLiveData
扩展函数将我们的 Flow
转换为 LiveData
。就像 LiveData
构建器一样,这会向生成的 LiveData
添加可配置的超时。这很好,因为它可以防止我们在每次配置更改(例如设备旋转)时重新启动查询。
由于 flow 提供了主线程安全性和取消功能,因此您可以选择将 Flow
一直传递到 UI 层,而无需将其转换为 LiveData
。但是,对于此 codelab,我们将在 UI 层坚持使用 LiveData
。
还在 ViewModel
中,向 init
块添加缓存更新。此步骤目前是可选的,但如果您清除缓存并且不添加此调用,则在应用程序中将看不到任何数据。
PlantListViewModel.kt
init {
clearGrowZoneNumber() // keep this
// fetch the full plant list
launchDataLoad { plantRepository.tryUpdateRecentPlantsCache() }
}
更新 Fragment
打开 PlantListFragment.kt
,并将 subscribeUi
函数更改为指向我们的新 plantsUsingFlow
LiveData
。
PlantListFragment.kt
private fun subscribeUi(adapter: PlantAdapter) {
viewModel.plantsUsingFlow.observe(viewLifecycleOwner) { plants ->
adapter.submitList(plants)
}
}
使用 Flow 运行应用程序
如果您再次运行应用程序,您应该会看到您现在正在使用 Flow
加载数据!由于我们尚未实现 switchMap
,因此过滤器选项不起作用。
在下一步中,我们将了解如何在 Flow
中转换数据。
11. 声明式组合流
在此步骤中,您将把排序顺序应用于 plantsFlow
。我们将使用 flow
的声明式 API 来实现。
通过使用诸如 map
、combine
或 mapLatest
这样的转换,我们可以声明式地表达我们希望如何在元素流经流程时转换每个元素。它甚至允许我们声明式地表达并发性,这可以真正简化代码。在本节中,您将看到如何使用操作符告诉 Flow
启动两个协程并声明式地组合其结果。
首先,打开 PlantRepository.kt
并定义一个名为 customSortFlow
的新私有流。
PlantRepository.kt
private val customSortFlow = flow { emit(plantsListSortOrderCache.getOrAwait()) }
这定义了一个 Flow
,当收集时,它将调用 getOrAwait
并emit
排序顺序。
由于此流仅发出单个值,您还可以使用 asFlow
直接从 getOrAwait
函数构建它。
// Create a flow that calls a single function
private val customSortFlow = plantsListSortOrderCache::getOrAwait.asFlow()
此代码创建一个新的 Flow
,该流调用 getOrAwait
并将其结果作为第一个也是唯一的值发出。它通过使用 ::
引用 getOrAwait 方法并在结果 Function
对象上调用 asFlow
来实现。
这两个流都执行相同的操作,调用 getOrAwait
并发出结果,然后完成。
声明式地组合多个流
现在我们有两个流,customSortFlow
和 plantsFlow
,让我们声明式地组合它们!
向 plantsFlow
添加 combine
操作符。
PlantRepository.kt
private val customSortFlow = plantsListSortOrderCache::getOrAwait.asFlow()
val plantsFlow: Flow<List<Plant>>
get() = plantDao.getPlantsFlow()
// When the result of customSortFlow is available,
// this will combine it with the latest value from
// the flow above. Thus, as long as both `plants`
// and `sortOrder` are have an initial value (their
// flow has emitted at least one value), any change
// to either `plants` or `sortOrder` will call
// `plants.applySort(sortOrder)`.
.combine(customSortFlow) { plants, sortOrder ->
plants.applySort(sortOrder)
}
combine
操作符将两个流组合在一起。这两个流将在各自的协程中运行,然后每当任何一个流产生新值时,转换将使用来自这两个流的最新值被调用。
通过使用 combine
,我们可以将缓存的网络查找与我们的数据库查询结合起来。它们都将在不同的协程上并发运行。这意味着,当 Room 启动网络请求时,Retrofit 可以启动网络查询。然后,一旦两个流都提供结果,它将调用 combine
lambda,我们在其中将加载的排序顺序应用于加载的植物。
为了探索 combine
操作符的工作原理,修改 customSortFlow
以在 onStart
中发出两次,并在两次之间有较长的延迟,如下所示。
// Create a flow that calls a single function
private val customSortFlow = plantsListSortOrderCache::getOrAwait.asFlow()
.onStart {
emit(listOf())
delay(1500)
}
转换 onStart
将在观察者监听其他操作符之前发生,并且它可以发出占位符值。因此,这里我们发出一个空列表,延迟 1500 毫秒调用 getOrAwait
,然后继续原始流。如果您现在运行应用程序,您将看到 Room 数据库查询立即返回,并与空列表组合(这意味着它将按字母顺序排序)。然后大约 1500 毫秒后,它应用自定义排序。
在继续学习 Codelab 之前,请从 customSortFlow
中删除 onStart
转换。
Flow 和主线程安全
Flow
可以调用**主线程安全**函数,就像我们在这里所做的那样,它将保留协程的正常主线程安全保证。 Room
和 Retrofit
都将为我们提供主线程安全,我们不需要做任何其他事情来使用 Flow 进行网络请求或数据库查询。
此流已经使用了以下线程
plantService.customPlantSortOrder
在 Retrofit 线程上运行(它调用Call.enqueue
)getPlantsFlow
将在 Room Executor 上运行查询applySort
将在收集调度程序(在本例中为Dispatchers.Main
)上运行
因此,如果我们只是在 Retrofit
中调用挂起函数并使用 Room
流,我们就不需要用主线程安全问题来复杂化此代码。
但是,随着我们的数据集大小的增长,对 applySort
的调用可能会变得足够慢以阻塞主线程。Flow
提供了一个名为 flowOn
的声明式 API 来控制流运行的线程。
向 plantsFlow
添加 flowOn
,如下所示。
PlantRepository.kt
private val customSortFlow = plantsListSortOrderCache::getOrAwait.asFlow()
val plantsFlow: Flow<List<Plant>>
get() = plantDao.getPlantsFlow()
.combine(customSortFlow) { plants, sortOrder ->
plants.applySort(sortOrder)
}
.flowOn(defaultDispatcher)
.conflate()
调用 flowOn
对代码的执行方式有两个重要的影响。
- 在调用
flowOn
**之前**,在一个新的defaultDispatcher
(在本例中为Dispatchers.Default
)上启动一个新的协程来运行和收集流。 - 引入一个缓冲区,用于将结果从新协程发送到后面的调用。
- 在
flowOn
**之后**,将这些值从缓冲区发出到Flow
。在本例中,这是ViewModel
中的asLiveData
。
这与 withContext
用于切换调度程序的方式非常相似,但它确实在我们的转换中间引入了缓冲区,从而改变了流的工作方式。由 flowOn
启动的协程允许以比调用方消耗它们更快的速度产生结果,并且默认情况下它会缓冲大量结果。
在本例中,我们计划将结果发送到 UI,因此我们只关心最新的结果。这就是 conflate
操作符的作用——它修改 flowOn
的缓冲区,使其仅存储最后一个结果。如果在读取前一个结果之前有另一个结果进来,它将被覆盖。
运行应用程序
如果您再次运行应用程序,您应该会看到您现在正在加载数据并使用 Flow
应用自定义排序顺序!由于我们尚未实现 switchMap
,因此筛选选项不起作用。
在下一步中,我们将研究另一种使用 flow
提供主线程安全的方法。
12. 在两个流之间切换
为了完成此 API 的 Flow 版本,请打开 PlantListViewModel.kt
,我们将在其中根据 GrowZone
在流之间切换,就像我们在 LiveData
版本中所做的那样。
在 plants
liveData
下添加以下代码。
PlantListViewModel.kt
private val growZoneFlow = MutableStateFlow<GrowZone>(NoGrowZone)
val plantsUsingFlow: LiveData<List<Plant>> = growZoneFlow.flatMapLatest { growZone ->
if (growZone == NoGrowZone) {
plantRepository.plantsFlow
} else {
plantRepository.getPlantsWithGrowZoneFlow(growZone)
}
}.asLiveData()
此模式展示了如何将事件(种植区更改)集成到流中。它与 LiveData.switchMap
版本执行完全相同的事情——根据事件在两个数据源之间切换。
逐步浏览代码
PlantListViewModel.kt
private val growZoneFlow = MutableStateFlow<GrowZone>(NoGrowZone)
这定义了一个新的 MutableStateFlow
,其初始值为 NoGrowZone
。这是一种特殊的 Flow 值持有者,它只保存它收到的最后一个值。它是一个线程安全的并发原语,因此您可以同时从多个线程写入它(并且任何被认为是“最后一个”的线程都将获胜)。
您还可以订阅以获取对当前值的更新。总的来说,它具有与 LiveData
类似的行为——它只是保存最后一个值并允许您观察对其的更改。
PlantListViewModel.kt
val plantsUsingFlow: LiveData<List<Plant>> = growZoneFlow.flatMapLatest { growZone ->
StateFlow
也是一个常规的 Flow
,因此您可以像往常一样使用所有操作符。
这里我们使用了 flatMapLatest
操作符,它与 LiveData
中的 switchMap
完全相同。每当 growZone
更改其值时,将应用此 lambda,并且它必须返回一个 Flow
。然后,返回的 Flow
将被用作所有下游操作符的 Flow
。
基本上,这允许我们根据 growZone
的值在不同的流之间切换。
PlantListViewModel.kt
if (growZone == NoGrowZone) {
plantRepository.plantsFlow
} else {
plantRepository.getPlantsWithGrowZoneFlow(growZone)
}
在 flatMapLatest
内部,我们根据 growZone
进行切换。此代码与 LiveData.switchMap
版本几乎相同,唯一的区别是它返回 Flows
而不是 LiveDatas
。
PlantListViewModel.kt
}.asLiveData()
最后,我们将 Flow
转换为 LiveData
,因为我们的 Fragment
期望我们从 ViewModel
中公开一个 LiveData
。
更改 StateFlow 的值
为了让应用知道过滤器已更改,我们可以设置 MutableStateFlow.value
。这是一种将事件传达给协程的简单方法,就像我们在这里做的那样。
PlantListViewModel.kt
fun setGrowZoneNumber(num: Int) {
growZone.value = GrowZone(num)
growZoneFlow.value = GrowZone(num)
launchDataLoad {
plantRepository.tryUpdateRecentPlantsForGrowZoneCache(GrowZone(num)) }
}
fun clearGrowZoneNumber() {
growZone.value = NoGrowZone
growZoneFlow.value = NoGrowZone
launchDataLoad {
plantRepository.tryUpdateRecentPlantsCache()
}
}
再次运行应用
如果再次运行应用,则过滤器现在对 LiveData
版本和 Flow
版本都有效!
在下一步中,我们将应用自定义排序到 getPlantsWithGrowZoneFlow
。
13. 将样式与流混合
Flow 最令人兴奋的功能之一是对挂起函数的一流支持。 flow
构建器和几乎所有转换都公开了一个 suspend
运算符,它可以调用任何挂起函数。因此,网络和数据库调用的 **主线程安全** 以及多个异步操作的编排可以通过从流内部调用常规挂起函数来完成。
实际上,这允许您自然地将声明式转换与命令式代码混合使用。正如您在此示例中将看到的,在常规的 map 运算符内部,您可以编排多个异步操作,而无需应用任何额外的转换。在很多地方,这会导致比完全声明式方法更简单的代码。
使用挂起函数编排异步工作
为了结束我们对 Flow 的探索,我们将使用挂起运算符应用自定义排序。
打开 PlantRepository.kt
并向 getPlantsWithGrowZoneNumberFlow
添加 map 转换。
PlantRepository.kt
fun getPlantsWithGrowZoneFlow(growZone: GrowZone): Flow<List<Plant>> {
return plantDao.getPlantsWithGrowZoneNumberFlow(growZone.number)
.map { plantList ->
val sortOrderFromNetwork = plantsListSortOrderCache.getOrAwait()
val nextValue = plantList.applyMainSafeSort(sortOrderFromNetwork)
nextValue
}
}
通过依靠常规挂起函数来处理异步工作,此 map 操作是 **主线程安全** 的,即使它组合了两个异步操作。
从数据库返回每个结果时,我们将获取缓存的排序顺序 - 如果它尚未准备好,它将等待异步网络请求。然后,一旦我们有了排序顺序,就可以安全地调用 applyMainSafeSort
,它将在默认调度程序上运行排序。
此代码现在完全是主线程安全的,因为它将主线程安全问题推迟到常规挂起函数。它比在 plantsFlow
中实现的相同转换简单得多。
但是,值得注意的是,它的执行方式会略有不同。每次数据库发出新值时,都会获取缓存的值。这没问题,因为我们在 plantsListSortOrderCache
中正确地缓存了它,但如果它启动了一个新的网络请求,则此实现将发出很多不必要的网络请求。此外,在 .combine
版本中,网络请求和数据库查询 *并发* 运行,而在此版本中,它们按顺序运行。
由于这些差异,没有明确的规则来构建此代码。在许多情况下,使用像我们这里所做的挂起转换是可以的,这使得所有异步操作都按顺序执行。但是,在其他情况下,最好使用运算符来控制并发并提供主线程安全。
14. 使用流控制并发
你快到了!作为最后一步(可选),让我们将网络请求移到基于流的协程中。
这样做,我们将从 onClick
调用的处理程序中删除发出网络调用的逻辑,并从 growZone
驱动它们。这有助于我们创建一个单一的事实来源并避免代码重复 - 没有任何代码可以更改过滤器而不刷新缓存。
打开 PlantListViewModel.kt
,并将此添加到 init 块中
PlantListViewModel.kt
init {
clearGrowZoneNumber()
growZone.mapLatest { growZone ->
_spinner.value = true
if (growZone == NoGrowZone) {
plantRepository.tryUpdateRecentPlantsCache()
} else {
plantRepository.tryUpdateRecentPlantsForGrowZoneCache(growZone)
}
}
.onEach { _spinner.value = false }
.catch { throwable -> _snackbar.value = throwable.message }
.launchIn(viewModelScope)
}
此代码将启动一个新的协程来观察发送到 growZoneChannel
的值。您现在可以注释掉下面方法中的网络调用,因为它们仅适用于 LiveData
版本。
PlantListViewModel.kt
fun setGrowZoneNumber(num: Int) {
growZone.value = GrowZone(num)
growZoneFlow.value = GrowZone(num)
// launchDataLoad {
// plantRepository.tryUpdateRecentPlantsForGrowZoneCache(GrowZone(num))
// }
}
fun clearGrowZoneNumber() {
growZone.value = NoGrowZone
growZoneFlow.value = NoGrowZone
// launchDataLoad {
// plantRepository.tryUpdateRecentPlantsCache()
// }
}
再次运行应用
如果您现在再次运行该应用,您会看到网络刷新现在由 growZone
控制!我们已经大大改进了代码,因为随着更多更改过滤器的方法进入,该通道充当哪个过滤器处于活动状态的单一事实来源。这样,网络请求和当前过滤器永远不会不同步。
逐步浏览代码
让我们一次逐步介绍所有新使用的函数,从外部开始
PlantListViewModel.kt
growZone
// ...
.launchIn(viewModelScope)
这次,我们使用 launchIn
运算符在我们的 ViewModel
中收集流。
运算符 launchIn
创建一个新的协程并收集流中的每个值。它将在提供的 CoroutineScope
中启动 - 在这种情况下,为 viewModelScope
。这很好,因为这意味着当此 ViewModel
被清除时,收集将被取消。
如果没有提供任何其他运算符,这不会做太多事情 - 但由于 Flow 在其所有运算符中都提供了挂起 lambda,因此很容易根据每个值执行异步操作。
PlantListViewModel.kt
.mapLatest { growZone ->
_spinner.value = true
if (growZone == NoGrowZone) {
plantRepository.tryUpdateRecentPlantsCache()
} else {
plantRepository.tryUpdateRecentPlantsForGrowZoneCache(growZone)
}
}
这就是魔力所在 - mapLatest
将为每个值应用此 map 函数。但是,与常规 map
不同,它将为对 map 转换的每次调用启动一个新的协程。然后,如果在前面的协程完成之前,growZoneChannel
发出了一个新值,它将在启动一个新的协程之前取消它。
我们可以使用 mapLatest
为我们控制并发。Flow 转换可以处理它,而不是自己构建取消/重新启动逻辑。与手动编写相同的取消逻辑相比,此代码节省了很多代码和复杂性。
Flow 的取消遵循协程的正常 协作取消规则。
PlantListViewModel.kt
.onEach { _spinner.value = false }
.catch { throwable -> _snackbar.value = throwable.message }
onEach
将在位于其上方的流发出值时每次被调用。在这里,我们使用它在处理完成后重置微调器。
该 catch
运算符将捕获位于其上方的流中抛出的任何异常。它可以向流发出一个新值(如错误状态),将异常重新抛回流,或执行我们这里正在执行的工作。
出现错误时,我们只是告诉我们的 _snackbar
显示错误消息。
总结
此步骤向您展示了如何使用 Flow 控制并发,以及如何在 ViewModel 内部使用 Flow 而无需依赖 UI 观察器。
作为挑战步骤,尝试定义一个函数来封装此流的数据加载,并使用以下签名
fun <T> loadDataFor(source: StateFlow<T>, block: suspend (T) -> Unit) {