使用 Kotlin Flow 和 LiveData 学习高级协程

1. 开始之前

在本 Codelab 中,您将学习如何使用 LiveData 构建器 在 Android 应用中将 Kotlin 协程LiveData 组合起来。我们还将使用 协程异步流,它是协程库中用于表示异步值序列(或流)的一种类型,来实现相同的功能。

您将从一个现有的应用开始,该应用使用 Android 架构组件 构建,并使用 LiveDataRoom 数据库获取对象列表并在 RecyclerView 网格布局中显示它们。

以下是一些代码片段,让您了解您将要做什么。以下是查询 Room 数据库的现有代码

val plants: LiveData<List<Plant>> = plantDao.getPlants()

LiveData 将使用 LiveData 构建器和协程以及额外的排序逻辑进行更新

val plants: LiveData<List<Plant>> = liveData<List<Plant>> {
   val plantsLiveData = plantDao.getPlants()
   val customSortOrder = plantsListSortOrderCache.getOrAwait()
   emitSource(plantsLiveData.map { plantList -> plantList.applySort(customSortOrder) })
}

您还将使用 Flow 实现相同的逻辑

private val customSortFlow = plantsListSortOrderCache::getOrAwait.asFlow()

val plantsFlow: Flow<List<Plant>>
   get() = plantDao.getPlantsFlow()
       .combine(customSortFlow) { plants, sortOrder ->
           plants.applySort(sortOrder)
       }
       .flowOn(defaultDispatcher)
       .conflate()

先决条件

  • 了解架构组件 ViewModelLiveDataRepositoryRoom
  • 了解 Kotlin 语法,包括扩展函数和 Lambda 表达式。
  • 了解 Kotlin 协程。
  • 基本了解如何在 Android 上使用线程,包括主线程、后台线程和回调。

您将做什么

  • 将现有的 LiveData 转换为使用 Kotlin 协程友好的 LiveData 构建器。
  • LiveData 构建器中添加逻辑。
  • 使用 Flow 进行异步操作。
  • 组合 Flow 并转换多个异步源。
  • 使用 Flow 控制并发性。
  • 了解如何在 LiveDataFlow 之间进行选择。

您需要什么

  • Android Studio Arctic Fox。此 Codelab 可能会与其他版本一起使用,但某些内容可能缺失或外观不同。

如果您在完成此 Codelab 时遇到任何问题(代码错误、语法错误、措辞不清等),请通过 Codelab 左下角的“报告错误”链接报告问题。

2. 设置

下载代码

点击以下链接下载此 Codelab 的所有代码

…或者使用以下命令从命令行克隆 GitHub 存储库

$ git clone https://github.com/googlecodelabs/kotlin-coroutines.git

此 Codelab 的代码位于 advanced-coroutines-codelab 目录中。

常见 问题解答

3. 运行起始示例应用

首先,让我们看看起始示例应用是什么样子的。按照以下说明在 Android Studio 中打开示例应用。

  1. 如果您下载了 kotlin-coroutines zip 文件,请解压缩该文件。
  2. 在 Android Studio 中打开 advanced-coroutines-codelab 目录。
  3. 确保在配置下拉列表中选择了 start
  4. 点击 运行 execute.png 按钮,然后选择模拟器或连接您的 Android 设备。设备必须能够运行 Android Lollipop(最低支持的 SDK 为 21)。

应用首次运行时,会出现一个卡片列表,每个卡片显示特定植物的名称和图像

3d457e66c13f2ef8.png

每个 Plant 都有一个 growZoneNumber,该属性表示植物最有可能茁壮成长的区域。用户可以点击筛选图标 ee1895257963ae84.png 在显示所有植物和特定生长区域的植物之间切换,该区域硬编码为 9 区。多次按下筛选按钮以查看其操作。

6d6df01741e0487a.png

架构概述

此应用使用 架构组件MainActivityPlantListFragment 中的 UI 代码与 PlantListViewModel 中的应用逻辑分离。 PlantRepositoryViewModelPlantDao 之间提供桥梁,后者访问 Room 数据库以返回 Plant 对象列表。然后,UI 获取此植物列表并在 RecyclerView 网格布局中显示它们。

在我们开始修改代码之前,让我们快速了解一下数据如何从数据库流向 UI。以下是如何在 ViewModel 中加载植物列表的

PlantListViewModel.kt

val plants: LiveData<List<Plant>> = growZone.switchMap { growZone ->
    if (growZone == NoGrowZone) {
        plantRepository.plants
    } else {
        plantRepository.getPlantsWithGrowZone(growZone)
    }
}

GrowZone 是一个内联类,它仅包含一个表示其区域的 IntNoGrowZone 表示区域不存在,仅用于筛选。

Plant.kt

inline class GrowZone(val number: Int)
val NoGrowZone = GrowZone(-1)

当点击筛选按钮时,growZone 会切换。我们使用 switchMap 来确定要返回的植物列表。

以下是用于从数据库获取植物数据的存储库和数据访问对象 (DAO) 的外观

PlantDao.kt

@Query("SELECT * FROM plants ORDER BY name")
fun getPlants(): LiveData<List<Plant>>

@Query("SELECT * FROM plants WHERE growZoneNumber = :growZoneNumber ORDER BY name")
fun getPlantsWithGrowZoneNumber(growZoneNumber: Int): LiveData<List<Plant>>

PlantRepository.kt

val plants = plantDao.getPlants()

fun getPlantsWithGrowZone(growZone: GrowZone) =
    plantDao.getPlantsWithGrowZoneNumber(growZone.number)

虽然大多数代码修改都在 PlantListViewModelPlantRepository 中,但最好花点时间熟悉一下项目的结构,重点关注植物数据如何通过从数据库到 Fragment 的各个层级显示。在下一步中,我们将修改代码以使用 LiveData 构建器添加自定义排序。

4. 带有自定义排序的植物

植物列表目前按字母顺序显示,但我们希望通过先列出某些植物,然后按字母顺序列出其余植物来更改此列表的顺序。这类似于购物应用在可用购买商品列表顶部显示赞助结果。我们的产品团队希望能够动态更改排序顺序,而无需发布新版本的应用,因此我们将从后端获取要先排序的植物列表。

以下是应用使用自定义排序后的外观

1b9cc46d7c237b7b.png

自定义排序顺序列表包含以下四种植物:橙子、向日葵、葡萄和鳄梨。请注意,它们首先出现在列表中,然后按字母顺序排列其余植物。

现在,如果按下筛选按钮(并且仅显示GrowZone 9 的植物),则向日葵将从列表中消失,因为其GrowZone 不是 9。自定义排序列表中的其他三种植物位于GrowZone 9,因此它们将保留在列表顶部。GrowZone 9 中的唯一其他植物是番茄,它出现在此列表的最后。

1a622557d39c6992.png

让我们开始编写代码来实现自定义排序。

5. 获取排序顺序

我们将首先编写一个挂起函数,从网络获取自定义排序顺序,然后将其缓存到内存中。

将以下内容添加到PlantRepository

PlantRepository.kt

private var plantsListSortOrderCache = 
    CacheOnSuccess(onErrorFallback = { listOf<String>() }) {
        plantService.customPlantSortOrder()
    }

plantsListSortOrderCache 用作自定义排序顺序的内存缓存。如果出现网络错误,它将回退到空列表,以便即使未获取排序顺序,我们的应用仍可以显示数据。

此代码使用sunflower 模块中提供的CacheOnSuccess 实用程序类来处理缓存。通过像这样抽象化实现缓存的细节,应用程序代码可以更简单。由于CacheOnSuccess 已经过充分测试,因此我们无需为我们的存储库编写太多测试来确保正确的行为。在使用kotlinx-coroutines 时,在代码中引入类似的高级抽象是一个好主意。

现在让我们合并一些逻辑以将排序应用于植物列表。

将以下内容添加到PlantRepository:

PlantRepository.kt

private fun List<Plant>.applySort(customSortOrder: List<String>): List<Plant> {
    return sortedBy { plant ->
        val positionForItem = customSortOrder.indexOf(plant.plantId).let { order ->
            if (order > -1) order else Int.MAX_VALUE
        }
        ComparablePair(positionForItem, plant.name)
    }
}

此扩展函数将重新排列列表,将位于customSortOrder 中的Plants 放置在列表的前面。

6. 使用 LiveData 构建逻辑

现在排序逻辑已到位,请使用以下LiveData 构建器 替换plantsgetPlantsWithGrowZone 的代码

PlantRepository.kt

val plants: LiveData<List<Plant>> = liveData<List<Plant>> {
   val plantsLiveData = plantDao.getPlants()
   val customSortOrder = plantsListSortOrderCache.getOrAwait()
   emitSource(plantsLiveData.map {
       plantList -> plantList.applySort(customSortOrder) 
   })
}

fun getPlantsWithGrowZone(growZone: GrowZone) = liveData {
    val plantsGrowZoneLiveData = plantDao.getPlantsWithGrowZoneNumber(growZone.number)
    val customSortOrder = plantsListSortOrderCache.getOrAwait()
    emitSource(plantsGrowZoneLiveData.map { plantList -> 
        plantList.applySort(customSortOrder)
    })
}

现在,如果您运行应用,自定义排序的植物列表应该会出现

1b9cc46d7c237b7b.png

LiveData 构建器允许我们异步计算值,因为liveData 由协程支持。在这里,我们有一个挂起函数,用于从数据库获取LiveData 植物列表,同时还调用一个挂起函数来获取自定义排序顺序。然后,我们将这两个值组合起来对植物列表进行排序并返回该值,所有这些都在构建器中进行。

协程在被观察时开始执行,并在协程成功完成或数据库或网络调用失败时取消。

在下一步中,我们将探讨使用转换getPlantsWithGrowZone 的变体。

7. liveData:修改值

我们现在将修改PlantRepository 以实现一个挂起的转换,因为每个值都在处理,学习如何在LiveData 中构建复杂的异步转换。作为先决条件,让我们创建一个在主线程上安全使用的排序算法版本。我们可以使用withContext 切换到另一个调度程序,仅用于 lambda,然后恢复到我们开始的调度程序。

将以下内容添加到PlantRepository

PlantRepository.kt

@AnyThread
suspend fun List<Plant>.applyMainSafeSort(customSortOrder: List<String>) =
    withContext(defaultDispatcher) {
        this@applyMainSafeSort.applySort(customSortOrder)
    }

然后,我们可以将此新的主线程安全排序与LiveData 构建器一起使用。更新块以使用switchMap,它允许您在每次收到新值时指向一个新的LiveData

PlantRepository.kt

fun getPlantsWithGrowZone(growZone: GrowZone) =
   plantDao.getPlantsWithGrowZoneNumber(growZone.number)
       .switchMap { plantList ->
           liveData {
               val customSortOrder = plantsListSortOrderCache.getOrAwait()
               emit(plantList.applyMainSafeSort(customSortOrder))
           }
       }

与以前的版本相比,一旦从网络收到自定义排序顺序,就可以将其与新的主线程安全applyMainSafeSort 一起使用。然后,此结果作为getPlantsWithGrowZone 返回的新值发出到switchMap

与上面的plants LiveData 类似,协程在被观察时开始执行,并在完成或数据库或网络调用失败时终止。这里的区别在于,在映射中进行网络调用是安全的,因为它已缓存。

现在让我们看看如何使用 Flow 实现此代码,并比较这些实现。

8. 介绍 Flow

我们将使用来自kotlinx-coroutinesFlow 构建相同的逻辑。在这样做之前,让我们先了解一下什么是 Flow 以及如何将其整合到您的应用中。

Flow 是Sequence 的异步版本,Sequence 是一种其值按需生成的集合类型。就像 Sequence 一样,Flow 在需要时按需生成每个值,并且 Flow 可以包含无限数量的值。

那么,Kotlin 为什么引入新的Flow 类型,它与普通 Sequence 有什么不同?答案在于异步的魔力。Flow 完全支持协程。这意味着您可以使用协程构建、转换和使用Flow。您还可以控制并发性,这意味着可以使用Flow 以声明方式协调多个协程的执行。

这开启了许多令人兴奋的可能性。

Flow 可用于完全反应式编程风格。如果您以前使用过RxJava 之类的东西,Flow 提供了类似的功能。通过使用诸如mapflatMapLatestcombine 等函数运算符转换 Flow,可以简洁地表达应用逻辑。

Flow 还支持大多数运算符上的挂起函数。这使您可以在像map 这样的运算符中执行顺序异步任务。通过在 Flow 内部使用挂起操作,与完全反应式风格的等效代码相比,它通常会产生更短且更易于阅读的代码。

在本 Codelab 中,我们将探讨使用这两种方法。

Flow 如何运行

为了习惯 Flow 如何按需(或延迟)生成值,请查看以下发出值(1, 2, 3) 并打印每个项目生成之前、期间和之后的内容的 Flow。

fun makeFlow() = flow {
   println("sending first value")
   emit(1)
   println("first value collected, sending another value")
   emit(2)
   println("second value collected, sending a third value")
   emit(3)
   println("done")
}

scope.launch {
   makeFlow().collect { value ->
       println("got $value")
   }
   println("flow is completed")
}

如果运行此代码,它将产生以下输出

sending first value
got 1
first value collected, sending another value
got 2
second value collected, sending a third value
got 3
done
flow is completed

您可以看到执行如何在collect lambda 和flow 构建器之间弹跳。每次 Flow 构建器调用emit 时,它都会挂起,直到该元素完全处理完毕。然后,当从 Flow 请求另一个值时,它会从上次离开的地方恢复,直到再次调用 emit。当flow 构建器完成时,collect 现在可以完成,并且调用块打印“flow 已完成”。

collect 的调用非常重要。Flow 使用诸如collect 之类的挂起运算符而不是公开Iterator 接口,以便它始终知道何时正在被主动使用。更重要的是,它知道调用者何时无法请求更多值,以便它可以清理资源。

Flow 何时运行

在上例中,Flowcollect 运算符运行时开始运行。通过调用 flow 构建器或其他 API 创建新的 Flow 不会导致任何工作执行。挂起运算符 collectFlow 中被称为 **终端运算符**。还有其他挂起终端运算符,例如 toListfirstsinglekotlinx-coroutines 一起提供,您也可以构建自己的终端运算符。

默认情况下,Flow 将执行:

  • 每次应用终端运算符时(并且每次新的调用都独立于之前启动的任何调用)
  • 直到它正在运行的协程被取消
  • 当最后一个值已完全处理,并且请求了另一个值时

由于这些规则,Flow 可以参与结构化并发,并且可以安全地从 Flow 启动长时间运行的协程。Flow 不会泄漏资源,因为它们始终使用 协程协作取消规则 在调用者被取消时清理。

让我们修改上面的流,只使用 take 运算符查看前两个元素,然后收集它两次。

scope.launch {
   val repeatableFlow = makeFlow().take(2)  // we only care about the first two elements
   println("first collection")
   repeatableFlow.collect()
   println("collecting again")
   repeatableFlow.collect()
   println("second collection completed")
}

运行此代码,您将看到此输出

first collection
sending first value
first value collected, sending another value
collecting again
sending first value
first value collected, sending another value
second collection completed

每次调用 collect 时,flow lambda 都会从顶部开始。如果流执行了昂贵的操作(例如发出网络请求),这一点很重要。此外,由于我们应用了 take(2) 运算符,因此流只会生成两个值。在第二次调用 emit 后,它不会再次恢复流 lambda,因此“second value collected...”行永远不会打印。

9. 使用 flow 进行异步操作

好的,所以 Flow 就像 Sequence 一样是惰性的,但它又是如何异步的呢?让我们看一个异步序列的示例——观察数据库的变化。

在此示例中,我们需要协调在数据库线程池上生成的数据与位于另一个线程(例如主线程或 UI 线程)上的观察者。而且,由于我们将在数据更改时重复发出结果,因此此场景非常适合异步序列模式。

假设您的任务是为 Flow 编写 Room 集成。如果您从 Room 中现有的挂起查询支持开始,您可能会编写如下内容

// This code is a simplified version of how Room implements flow
fun <T> createFlow(query: Query, tables: List<Tables>): Flow<T> = flow {
    val changeTracker = tableChangeTracker(tables)

    while(true) {
        emit(suspendQuery(query))
        changeTracker.suspendUntilChanged()
    }
}

此代码依赖于两个虚构的挂起函数来生成 Flow

  • suspendQuery – 一个主线程安全的函数,用于运行常规的 Room 挂起查询
  • suspendUntilChanged – 一个函数,它会挂起协程,直到其中一个表发生更改

收集时,流最初会 emit 查询的第一个值。一旦处理了该值,流就会恢复并调用 suspendUntilChanged,它将按照其名称所述——挂起流,直到其中一个表发生更改。此时,系统中没有任何事情发生,直到其中一个表发生更改并且流恢复。

当流恢复时,它会发出另一个主线程安全的查询,并 emit 结果。此过程在无限循环中永远持续下去。

Flow 和结构化并发

但是等等——我们不想泄漏工作!协程本身并不昂贵,但它会重复唤醒自身以执行数据库查询。这是一个非常昂贵的泄漏操作。

即使我们创建了一个无限循环,Flow 也会通过支持结构化并发来帮助我们。

使用流消费值或迭代流的唯一方法是使用终端运算符。因为所有终端运算符都是挂起函数,所以工作与调用它们的范围的生命周期绑定。当范围被取消时,流将使用常规的 协程协作取消规则 自动取消自身。因此,即使我们在流构建器中编写了一个无限循环,我们也可以安全地使用它,而不会因结构化并发而导致泄漏。

10. 将 Flow 与 Room 一起使用

在此步骤中,您将学习如何将 FlowRoom 一起使用并将其连接到 UI。

此步骤对于许多 Flow 用法都很常见。以这种方式使用时,来自 RoomFlow 充当类似于 LiveData 的可观察数据库查询。

更新 Dao

首先,打开 PlantDao.kt,并添加两个返回 Flow<List<Plant>> 的新查询

PlantDao.kt

@Query("SELECT * from plants ORDER BY name")
fun getPlantsFlow(): Flow<List<Plant>>

@Query("SELECT * from plants WHERE growZoneNumber = :growZoneNumber ORDER BY name")
fun getPlantsWithGrowZoneNumberFlow(growZoneNumber: Int): Flow<List<Plant>>

请注意,除了返回类型外,这些函数与 LiveData 版本相同。但是,我们将并排开发它们以进行比较。

通过指定 Flow 返回类型,Room 将以以下特征执行查询

  • **主线程安全 –** 具有 Flow 返回类型的查询始终在 Room 执行器上运行,因此它们始终是主线程安全的。您无需在代码中执行任何操作即可使它们在主线程之外运行。
  • **观察更改 –** Room 自动观察更改并将新值发出到流中。
  • **异步序列 –** Flow 在每次更改时发出整个查询结果,并且不会引入任何缓冲区。如果返回 Flow<List<T>>,则流会发出一个包含查询结果中所有行的 List<T>。它将像序列一样执行——一次发出一个查询结果并挂起,直到要求它发出下一个结果。
  • **可取消 –** 当收集这些流的范围被取消时,Room 会取消观察此查询。

总而言之,这使得 Flow 成为从 UI 层观察数据库的绝佳返回类型。

更新存储库

要继续将新返回值连接到 UI,请打开 PlantRepository.kt,并添加以下代码

PlantRepository.kt

val plantsFlow: Flow<List<Plant>>
   get() = plantDao.getPlantsFlow()

fun getPlantsWithGrowZoneFlow(growZoneNumber: GrowZone): Flow<List<Plant>> {
   return plantDao.getPlantsWithGrowZoneNumberFlow(growZoneNumber.number)
}

目前,我们只是将 Flow 值传递给调用方。这与我们开始此 codelab 时将 LiveData 传递给 ViewModel 完全相同。

更新 ViewModel

PlantListViewModel.kt 中,让我们从简单开始,只公开 plantsFlow。我们将在接下来的几个步骤中返回并向流版本添加种植区域切换。

PlantListViewModel.kt

// add a new property to plantListViewModel

val plantsUsingFlow: LiveData<List<Plant>> = plantRepository.plantsFlow.asLiveData()

同样,在进行过程中,我们将保留 LiveData 版本(val plants)以进行比较。

由于我们希望在此 codelab 中在 UI 层保留 LiveData,因此我们将使用 asLiveData 扩展函数将我们的 Flow 转换为 LiveData。就像 LiveData 构建器一样,这会向生成的 LiveData 添加可配置的超时。这很好,因为它可以防止我们在每次配置更改(例如设备旋转)时重新启动查询。

由于 flow 提供了主线程安全性和取消功能,因此您可以选择将 Flow 一直传递到 UI 层,而无需将其转换为 LiveData。但是,对于此 codelab,我们将在 UI 层坚持使用 LiveData

还在 ViewModel 中,向 init 块添加缓存更新。此步骤目前是可选的,但如果您清除缓存并且不添加此调用,则在应用程序中将看不到任何数据。

PlantListViewModel.kt

init {
    clearGrowZoneNumber()  // keep this

    // fetch the full plant list
    launchDataLoad { plantRepository.tryUpdateRecentPlantsCache() }
}

更新 Fragment

打开 PlantListFragment.kt,并将 subscribeUi 函数更改为指向我们的新 plantsUsingFlow LiveData

PlantListFragment.kt

private fun subscribeUi(adapter: PlantAdapter) {
   viewModel.plantsUsingFlow.observe(viewLifecycleOwner) { plants ->
       adapter.submitList(plants)
   }
}

使用 Flow 运行应用程序

如果您再次运行应用程序,您应该会看到您现在正在使用 Flow 加载数据!由于我们尚未实现 switchMap,因此过滤器选项不起作用。

在下一步中,我们将了解如何在 Flow 中转换数据。

11. 声明式组合流

在此步骤中,您将把排序顺序应用于 plantsFlow。我们将使用 flow声明式 API 来实现。

通过使用诸如 mapcombinemapLatest 这样的转换,我们可以声明式地表达我们希望如何在元素流经流程时转换每个元素。它甚至允许我们声明式地表达并发性,这可以真正简化代码。在本节中,您将看到如何使用操作符告诉 Flow 启动两个协程并声明式地组合其结果。

首先,打开 PlantRepository.kt 并定义一个名为 customSortFlow 的新私有流。

PlantRepository.kt

private val customSortFlow = flow { emit(plantsListSortOrderCache.getOrAwait()) }

这定义了一个 Flow,当收集时,它将调用 getOrAwaitemit 排序顺序。

由于此流仅发出单个值,您还可以使用 asFlow 直接从 getOrAwait 函数构建它。

// Create a flow that calls a single function
private val customSortFlow = plantsListSortOrderCache::getOrAwait.asFlow()

此代码创建一个新的 Flow,该流调用 getOrAwait 并将其结果作为第一个也是唯一的值发出。它通过使用 :: 引用 getOrAwait 方法并在结果 Function 对象上调用 asFlow 来实现。

这两个流都执行相同的操作,调用 getOrAwait 并发出结果,然后完成。

声明式地组合多个流

现在我们有两个流,customSortFlowplantsFlow,让我们声明式地组合它们!

plantsFlow 添加 combine 操作符。

PlantRepository.kt

private val customSortFlow = plantsListSortOrderCache::getOrAwait.asFlow()

val plantsFlow: Flow<List<Plant>>
   get() = plantDao.getPlantsFlow()
       // When the result of customSortFlow is available,
       // this will combine it with the latest value from
       // the flow above.  Thus, as long as both `plants`
       // and `sortOrder` are have an initial value (their
       // flow has emitted at least one value), any change 
       // to either `plants` or `sortOrder`  will call
       // `plants.applySort(sortOrder)`.
       .combine(customSortFlow) { plants, sortOrder ->
          plants.applySort(sortOrder) 
       }

combine 操作符将两个流组合在一起。这两个流将在各自的协程中运行,然后每当任何一个流产生新值时,转换将使用来自这两个流的最新值被调用。

通过使用 combine,我们可以将缓存的网络查找与我们的数据库查询结合起来。它们都将在不同的协程上并发运行。这意味着,当 Room 启动网络请求时,Retrofit 可以启动网络查询。然后,一旦两个流都提供结果,它将调用 combine lambda,我们在其中将加载的排序顺序应用于加载的植物。

为了探索 combine 操作符的工作原理,修改 customSortFlow 以在 onStart 中发出两次,并在两次之间有较长的延迟,如下所示。

// Create a flow that calls a single function
private val customSortFlow = plantsListSortOrderCache::getOrAwait.asFlow()
   .onStart {
       emit(listOf())
       delay(1500)
   }

转换 onStart 将在观察者监听其他操作符之前发生,并且它可以发出占位符值。因此,这里我们发出一个空列表,延迟 1500 毫秒调用 getOrAwait,然后继续原始流。如果您现在运行应用程序,您将看到 Room 数据库查询立即返回,并与空列表组合(这意味着它将按字母顺序排序)。然后大约 1500 毫秒后,它应用自定义排序。

在继续学习 Codelab 之前,请从 customSortFlow 中删除 onStart 转换。

Flow 和主线程安全

Flow 可以调用**主线程安全**函数,就像我们在这里所做的那样,它将保留协程的正常主线程安全保证。 RoomRetrofit 都将为我们提供主线程安全,我们不需要做任何其他事情来使用 Flow 进行网络请求或数据库查询。

此流已经使用了以下线程

  • plantService.customPlantSortOrder 在 Retrofit 线程上运行(它调用 Call.enqueue
  • getPlantsFlow 将在 Room Executor 上运行查询
  • applySort 将在收集调度程序(在本例中为 Dispatchers.Main)上运行

因此,如果我们只是在 Retrofit 中调用挂起函数并使用 Room 流,我们就不需要用主线程安全问题来复杂化此代码。

但是,随着我们的数据集大小的增长,对 applySort 的调用可能会变得足够慢以阻塞主线程。Flow 提供了一个名为 flowOn 的声明式 API 来控制流运行的线程。

plantsFlow 添加 flowOn,如下所示。

PlantRepository.kt

private val customSortFlow = plantsListSortOrderCache::getOrAwait.asFlow()

val plantsFlow: Flow<List<Plant>>
   get() = plantDao.getPlantsFlow()
       .combine(customSortFlow) { plants, sortOrder ->
          plants.applySort(sortOrder) 
       }
       .flowOn(defaultDispatcher)
       .conflate()

调用 flowOn 对代码的执行方式有两个重要的影响。

  1. 在调用 flowOn **之前**,在一个新的 defaultDispatcher(在本例中为 Dispatchers.Default)上启动一个新的协程来运行和收集流。
  2. 引入一个缓冲区,用于将结果从新协程发送到后面的调用。
  3. flowOn **之后**,将这些值从缓冲区发出到 Flow。在本例中,这是 ViewModel 中的 asLiveData

这与 withContext 用于切换调度程序的方式非常相似,但它确实在我们的转换中间引入了缓冲区,从而改变了流的工作方式。由 flowOn 启动的协程允许以比调用方消耗它们更快的速度产生结果,并且默认情况下它会缓冲大量结果。

在本例中,我们计划将结果发送到 UI,因此我们只关心最新的结果。这就是 conflate 操作符的作用——它修改 flowOn 的缓冲区,使其仅存储最后一个结果。如果在读取前一个结果之前有另一个结果进来,它将被覆盖。

运行应用程序

如果您再次运行应用程序,您应该会看到您现在正在加载数据并使用 Flow 应用自定义排序顺序!由于我们尚未实现 switchMap,因此筛选选项不起作用。

在下一步中,我们将研究另一种使用 flow 提供主线程安全的方法。

12. 在两个流之间切换

为了完成此 API 的 Flow 版本,请打开 PlantListViewModel.kt,我们将在其中根据 GrowZone 在流之间切换,就像我们在 LiveData 版本中所做的那样。

plants liveData 下添加以下代码。

PlantListViewModel.kt

private val growZoneFlow = MutableStateFlow<GrowZone>(NoGrowZone)

val plantsUsingFlow: LiveData<List<Plant>> = growZoneFlow.flatMapLatest { growZone ->
        if (growZone == NoGrowZone) {
            plantRepository.plantsFlow
        } else {
            plantRepository.getPlantsWithGrowZoneFlow(growZone)
        }
    }.asLiveData()

此模式展示了如何将事件(种植区更改)集成到流中。它与 LiveData.switchMap 版本执行完全相同的事情——根据事件在两个数据源之间切换。

逐步浏览代码

PlantListViewModel.kt

private val growZoneFlow = MutableStateFlow<GrowZone>(NoGrowZone)

这定义了一个新的 MutableStateFlow,其初始值为 NoGrowZone。这是一种特殊的 Flow 值持有者,它只保存它收到的最后一个值。它是一个线程安全的并发原语,因此您可以同时从多个线程写入它(并且任何被认为是“最后一个”的线程都将获胜)。

您还可以订阅以获取对当前值的更新。总的来说,它具有与 LiveData 类似的行为——它只是保存最后一个值并允许您观察对其的更改。

PlantListViewModel.kt

val plantsUsingFlow: LiveData<List<Plant>> = growZoneFlow.flatMapLatest { growZone ->

StateFlow 也是一个常规的 Flow,因此您可以像往常一样使用所有操作符。

这里我们使用了 flatMapLatest 操作符,它与 LiveData 中的 switchMap 完全相同。每当 growZone 更改其值时,将应用此 lambda,并且它必须返回一个 Flow。然后,返回的 Flow 将被用作所有下游操作符的 Flow

基本上,这允许我们根据 growZone 的值在不同的流之间切换。

PlantListViewModel.kt

if (growZone == NoGrowZone) {
    plantRepository.plantsFlow
} else {
    plantRepository.getPlantsWithGrowZoneFlow(growZone)
}

flatMapLatest 内部,我们根据 growZone 进行切换。此代码与 LiveData.switchMap 版本几乎相同,唯一的区别是它返回 Flows 而不是 LiveDatas

PlantListViewModel.kt

   }.asLiveData()

最后,我们将 Flow 转换为 LiveData,因为我们的 Fragment 期望我们从 ViewModel 中公开一个 LiveData

更改 StateFlow 的值

为了让应用知道过滤器已更改,我们可以设置 MutableStateFlow.value。这是一种将事件传达给协程的简单方法,就像我们在这里做的那样。

PlantListViewModel.kt

fun setGrowZoneNumber(num: Int) {
    growZone.value = GrowZone(num)
    growZoneFlow.value = GrowZone(num)

    launchDataLoad {
        plantRepository.tryUpdateRecentPlantsForGrowZoneCache(GrowZone(num)) }
    }

fun clearGrowZoneNumber() {
    growZone.value = NoGrowZone
    growZoneFlow.value = NoGrowZone

    launchDataLoad { 
        plantRepository.tryUpdateRecentPlantsCache() 
    }
}

再次运行应用

如果再次运行应用,则过滤器现在对 LiveData 版本和 Flow 版本都有效!

在下一步中,我们将应用自定义排序到 getPlantsWithGrowZoneFlow

13. 将样式与流混合

Flow 最令人兴奋的功能之一是对挂起函数的一流支持。 flow 构建器和几乎所有转换都公开了一个 suspend 运算符,它可以调用任何挂起函数。因此,网络和数据库调用的 **主线程安全** 以及多个异步操作的编排可以通过从流内部调用常规挂起函数来完成。

实际上,这允许您自然地将声明式转换与命令式代码混合使用。正如您在此示例中将看到的,在常规的 map 运算符内部,您可以编排多个异步操作,而无需应用任何额外的转换。在很多地方,这会导致比完全声明式方法更简单的代码。

使用挂起函数编排异步工作

为了结束我们对 Flow 的探索,我们将使用挂起运算符应用自定义排序。

打开 PlantRepository.kt 并向 getPlantsWithGrowZoneNumberFlow 添加 map 转换。

PlantRepository.kt

fun getPlantsWithGrowZoneFlow(growZone: GrowZone): Flow<List<Plant>> {
   return plantDao.getPlantsWithGrowZoneNumberFlow(growZone.number)
       .map { plantList ->
           val sortOrderFromNetwork = plantsListSortOrderCache.getOrAwait()
           val nextValue = plantList.applyMainSafeSort(sortOrderFromNetwork)
           nextValue
       }
}

通过依靠常规挂起函数来处理异步工作,此 map 操作是 **主线程安全** 的,即使它组合了两个异步操作。

从数据库返回每个结果时,我们将获取缓存的排序顺序 - 如果它尚未准备好,它将等待异步网络请求。然后,一旦我们有了排序顺序,就可以安全地调用 applyMainSafeSort,它将在默认调度程序上运行排序。

此代码现在完全是主线程安全的,因为它将主线程安全问题推迟到常规挂起函数。它比在 plantsFlow 中实现的相同转换简单得多。

但是,值得注意的是,它的执行方式会略有不同。每次数据库发出新值时,都会获取缓存的值。这没问题,因为我们在 plantsListSortOrderCache 中正确地缓存了它,但如果它启动了一个新的网络请求,则此实现将发出很多不必要的网络请求。此外,在 .combine 版本中,网络请求和数据库查询 *并发* 运行,而在此版本中,它们按顺序运行。

由于这些差异,没有明确的规则来构建此代码。在许多情况下,使用像我们这里所做的挂起转换是可以的,这使得所有异步操作都按顺序执行。但是,在其他情况下,最好使用运算符来控制并发并提供主线程安全。

14. 使用流控制并发

你快到了!作为最后一步(可选),让我们将网络请求移到基于流的协程中。

这样做,我们将从 onClick 调用的处理程序中删除发出网络调用的逻辑,并从 growZone 驱动它们。这有助于我们创建一个单一的事实来源并避免代码重复 - 没有任何代码可以更改过滤器而不刷新缓存。

打开 PlantListViewModel.kt,并将此添加到 init 块中

PlantListViewModel.kt

init {
   clearGrowZoneNumber()

   growZone.mapLatest { growZone ->
           _spinner.value = true
           if (growZone == NoGrowZone) {
               plantRepository.tryUpdateRecentPlantsCache()
           } else {
               plantRepository.tryUpdateRecentPlantsForGrowZoneCache(growZone)
           }
       }
       .onEach {  _spinner.value = false }
       .catch { throwable ->  _snackbar.value = throwable.message  }
       .launchIn(viewModelScope)
}

此代码将启动一个新的协程来观察发送到 growZoneChannel 的值。您现在可以注释掉下面方法中的网络调用,因为它们仅适用于 LiveData 版本。

PlantListViewModel.kt

fun setGrowZoneNumber(num: Int) {
    growZone.value = GrowZone(num)
    growZoneFlow.value = GrowZone(num)

    // launchDataLoad { 
    //    plantRepository.tryUpdateRecentPlantsForGrowZoneCache(GrowZone(num))
    // }
}

fun clearGrowZoneNumber() {
    growZone.value = NoGrowZone
    growZoneFlow.value = NoGrowZone

    // launchDataLoad {
    //    plantRepository.tryUpdateRecentPlantsCache()
    // }
}

再次运行应用

如果您现在再次运行该应用,您会看到网络刷新现在由 growZone 控制!我们已经大大改进了代码,因为随着更多更改过滤器的方法进入,该通道充当哪个过滤器处于活动状态的单一事实来源。这样,网络请求和当前过滤器永远不会不同步。

逐步浏览代码

让我们一次逐步介绍所有新使用的函数,从外部开始

PlantListViewModel.kt

growZone
    // ...
    .launchIn(viewModelScope)

这次,我们使用 launchIn 运算符在我们的 ViewModel 中收集流。

运算符 launchIn 创建一个新的协程并收集流中的每个值。它将在提供的 CoroutineScope 中启动 - 在这种情况下,为 viewModelScope。这很好,因为这意味着当此 ViewModel 被清除时,收集将被取消。

如果没有提供任何其他运算符,这不会做太多事情 - 但由于 Flow 在其所有运算符中都提供了挂起 lambda,因此很容易根据每个值执行异步操作。

PlantListViewModel.kt

.mapLatest { growZone ->
    _spinner.value = true
    if (growZone == NoGrowZone) {
        plantRepository.tryUpdateRecentPlantsCache()
    } else {
        plantRepository.tryUpdateRecentPlantsForGrowZoneCache(growZone)
    }
}

这就是魔力所在 - mapLatest 将为每个值应用此 map 函数。但是,与常规 map 不同,它将为对 map 转换的每次调用启动一个新的协程。然后,如果在前面的协程完成之前,growZoneChannel 发出了一个新值,它将在启动一个新的协程之前取消它。

我们可以使用 mapLatest 为我们控制并发。Flow 转换可以处理它,而不是自己构建取消/重新启动逻辑。与手动编写相同的取消逻辑相比,此代码节省了很多代码和复杂性。

Flow 的取消遵循协程的正常 协作取消规则

PlantListViewModel.kt

.onEach {  _spinner.value = false }
.catch { throwable -> _snackbar.value = throwable.message }

onEach 将在位于其上方的流发出值时每次被调用。在这里,我们使用它在处理完成后重置微调器。

catch 运算符将捕获位于其上方的流中抛出的任何异常。它可以向流发出一个新值(如错误状态),将异常重新抛回流,或执行我们这里正在执行的工作。

出现错误时,我们只是告诉我们的 _snackbar 显示错误消息。

总结

此步骤向您展示了如何使用 Flow 控制并发,以及如何在 ViewModel 内部使用 Flow 而无需依赖 UI 观察器。

作为挑战步骤,尝试定义一个函数来封装此流的数据加载,并使用以下签名

fun <T> loadDataFor(source: StateFlow<T>, block: suspend (T) -> Unit) {