使用 Kotlin Flow 和 LiveData 学习高级协程

1. 开始之前

在这个 codelab 中,您将学习如何使用 LiveData 构建器 在 Android 应用中将 Kotlin 协程LiveData 结合起来。我们还将使用 协程异步流,它是一种来自协程库的类型,用于表示异步值的序列(或流),以实现相同的事情。

您将从一个现有的应用开始,该应用使用 Android 架构组件 构建,它使用 LiveDataRoom 数据库获取对象列表,并将其显示在 RecyclerView 网格布局中。

以下是一些代码片段,让您了解您将要做什么。这是现有代码,用于查询 Room 数据库

val plants: LiveData<List<Plant>> = plantDao.getPlants()

LiveData 将使用 LiveData 构建器和协程以及额外的排序逻辑进行更新

val plants: LiveData<List<Plant>> = liveData<List<Plant>> {
   val plantsLiveData = plantDao.getPlants()
   val customSortOrder = plantsListSortOrderCache.getOrAwait()
   emitSource(plantsLiveData.map { plantList -> plantList.applySort(customSortOrder) })
}

您还将使用 Flow 实现相同的逻辑

private val customSortFlow = plantsListSortOrderCache::getOrAwait.asFlow()

val plantsFlow: Flow<List<Plant>>
   get() = plantDao.getPlantsFlow()
       .combine(customSortFlow) { plants, sortOrder ->
           plants.applySort(sortOrder)
       }
       .flowOn(defaultDispatcher)
       .conflate()

先决条件

  • 熟悉架构组件 ViewModelLiveDataRepositoryRoom
  • 熟悉 Kotlin 语法,包括扩展函数和 Lambda 表达式。
  • 熟悉 Kotlin 协程。
  • 基本了解在 Android 上使用线程,包括主线程、后台线程和回调。

您将做些什么

  • 将现有的 LiveData 转换为使用 Kotlin 协程友好的 LiveData 构建器。
  • LiveData 构建器中添加逻辑。
  • 使用 Flow 进行异步操作。
  • 组合 Flows 并转换多个异步源。
  • 使用 Flows 控制并发。
  • 学习如何选择 LiveDataFlow.

您需要什么

  • Android Studio Arctic Fox。此 codelab 可能会与其他版本一起使用,但某些内容可能缺失或外观不同。

如果您在完成本 codelab 时遇到任何问题(代码错误、语法错误、措辞不清等),请通过 codelab 左下角的“报告错误”链接报告问题。

2. 设置

下载代码

点击以下链接下载本 codelab 的所有代码

... 或者使用以下命令从命令行克隆 GitHub 存储库

$ git clone https://github.com/googlecodelabs/kotlin-coroutines.git

本 codelab 的代码位于 advanced-coroutines-codelab 目录中。

常见问题

3. 运行起始示例应用

首先,让我们看看起始示例应用是什么样子。按照以下说明在 Android Studio 中打开示例应用。

  1. 如果您下载了 kotlin-coroutines zip 文件,请解压缩该文件。
  2. 在 Android Studio 中打开 advanced-coroutines-codelab 目录。
  3. 确保在配置下拉菜单中选择了 start
  4. 点击 运行 execute.png 按钮,然后选择模拟设备或连接您的 Android 设备。该设备必须能够运行 Android Lollipop(最低支持的 SDK 为 21)。

应用首次运行时,会显示一个卡片列表,每个卡片显示特定植物的名称和图像

3d457e66c13f2ef8.png

每个 Plant 都有一个 growZoneNumber,该属性表示植物最有可能在其中茁壮成长的区域。用户可以点击过滤器图标 ee1895257963ae84.png 在显示所有植物和特定生长区域的植物之间切换,该生长区域硬编码为 9 区。多次按下过滤器按钮以查看此操作。

6d6df01741e0487a.png

架构概述

此应用使用 架构组件MainActivityPlantListFragment 中的 UI 代码与 PlantListViewModel 中的应用逻辑分离。 PlantRepositoryViewModelPlantDao 之间架起一座桥梁,PlantDao 访问 Room 数据库以返回 Plant 对象列表。然后,UI 获取此植物列表,并在 RecyclerView 网格布局中显示它们。

在我们开始修改代码之前,让我们快速了解一下数据如何从数据库流向 UI。以下是 ViewModel 中加载植物列表的方式

PlantListViewModel.kt

val plants: LiveData<List<Plant>> = growZone.switchMap { growZone ->
    if (growZone == NoGrowZone) {
        plantRepository.plants
    } else {
        plantRepository.getPlantsWithGrowZone(growZone)
    }
}

GrowZone 是一个内联类,它只包含一个表示其区域的 IntNoGrowZone 代表没有区域,仅用于过滤。

Plant.kt

inline class GrowZone(val number: Int)
val NoGrowZone = GrowZone(-1)

点击过滤器按钮时,growZone 会切换。我们使用 switchMap 来确定要返回的植物列表。

以下是从数据库获取植物数据时,仓库和数据访问对象 (DAO) 的样子。

PlantDao.kt

@Query("SELECT * FROM plants ORDER BY name")
fun getPlants(): LiveData<List<Plant>>

@Query("SELECT * FROM plants WHERE growZoneNumber = :growZoneNumber ORDER BY name")
fun getPlantsWithGrowZoneNumber(growZoneNumber: Int): LiveData<List<Plant>>

PlantRepository.kt

val plants = plantDao.getPlants()

fun getPlantsWithGrowZone(growZone: GrowZone) =
    plantDao.getPlantsWithGrowZoneNumber(growZone.number)

虽然大多数代码修改是在 PlantListViewModelPlantRepository 中进行的,但花点时间熟悉一下项目的结构是个好主意,重点关注植物数据是如何从数据库到 Fragment 的各个层级进行传递的。在下一步,我们将修改代码,使用 LiveData 生成器添加自定义排序。

4. 具有自定义排序的植物

植物列表目前按字母顺序显示,但我们希望通过先列出某些植物,然后按字母顺序列出其余植物来更改此列表的顺序。这类似于购物应用程序在购买商品列表的顶部显示赞助结果。我们的产品团队希望能够在不发布新版本应用程序的情况下动态更改排序顺序,因此我们将从后端获取要排序的植物列表。

以下是应用程序使用自定义排序后的样子。

1b9cc46d7c237b7b.png

自定义排序顺序列表包含以下四种植物:橙子、向日葵、葡萄和鳄梨。请注意它们是如何出现在列表中最前面,然后是其他植物按字母顺序排列。

现在,如果按下过滤器按钮(并且只显示 GrowZone 9 的植物),向日葵会从列表中消失,因为它的 GrowZone 不是 9。自定义排序列表中的其他三种植物位于 GrowZone 9 中,因此它们将保持在列表的顶部。 GrowZone 9 中的唯一其他植物是番茄,它出现在此列表的最后面。

1a622557d39c6992.png

让我们开始编写代码来实现自定义排序。

5. 获取排序顺序

我们将从编写一个挂起函数开始,该函数从网络获取自定义排序顺序,然后将其缓存到内存中。

将以下内容添加到 PlantRepository 中。

PlantRepository.kt

private var plantsListSortOrderCache = 
    CacheOnSuccess(onErrorFallback = { listOf<String>() }) {
        plantService.customPlantSortOrder()
    }

plantsListSortOrderCache 用作自定义排序顺序的内存缓存。如果出现网络错误,它将回退到空列表,这样即使未获取排序顺序,我们的应用程序也能显示数据。

此代码使用 sunflower 模块中提供的 CacheOnSuccess 实用程序类来处理缓存。通过像这样抽象掉实现缓存的细节,应用程序代码可以更加直接。由于 CacheOnSuccess 已经过良好测试,因此我们不需要为我们的仓库编写太多测试以确保其行为正确。在使用 kotlinx-coroutines 时,在您的代码中引入类似的更高级别的抽象是个好主意。

现在让我们加入一些逻辑,将排序应用到植物列表中。

将以下内容添加到 PlantRepository:

PlantRepository.kt

private fun List<Plant>.applySort(customSortOrder: List<String>): List<Plant> {
    return sortedBy { plant ->
        val positionForItem = customSortOrder.indexOf(plant.plantId).let { order ->
            if (order > -1) order else Int.MAX_VALUE
        }
        ComparablePair(positionForItem, plant.name)
    }
}

此扩展函数将重新排列列表,将位于 customSortOrder 中的 Plants 放置在列表的最前面。

6. 使用 LiveData 构建逻辑

现在排序逻辑已经就绪,请将 plantsgetPlantsWithGrowZone 的代码替换为以下 LiveData 生成器

PlantRepository.kt

val plants: LiveData<List<Plant>> = liveData<List<Plant>> {
   val plantsLiveData = plantDao.getPlants()
   val customSortOrder = plantsListSortOrderCache.getOrAwait()
   emitSource(plantsLiveData.map {
       plantList -> plantList.applySort(customSortOrder) 
   })
}

fun getPlantsWithGrowZone(growZone: GrowZone) = liveData {
    val plantsGrowZoneLiveData = plantDao.getPlantsWithGrowZoneNumber(growZone.number)
    val customSortOrder = plantsListSortOrderCache.getOrAwait()
    emitSource(plantsGrowZoneLiveData.map { plantList -> 
        plantList.applySort(customSortOrder)
    })
}

现在,如果运行应用程序,自定义排序的植物列表应该会显示出来。

1b9cc46d7c237b7b.png

LiveData 生成器允许我们异步计算值,因为 liveData 由协程支持。这里我们有一个挂起函数,用于从数据库获取 LiveData 植物列表,同时还调用一个挂起函数来获取自定义排序顺序。然后,我们将这两个值合并起来对植物列表进行排序,并在生成器内返回该值。

协程在被观察时开始执行,并在协程成功完成或数据库或网络调用失败时被取消。

在下一步,我们将探索使用 转换getPlantsWithGrowZone 的变体。

7. liveData:修改值

我们现在将修改 PlantRepository,以在处理每个值时实现一个挂起的 转换,学习如何在 LiveData 中构建复杂的异步转换。作为先决条件,让我们创建一个排序算法的版本,该版本可以安全地在主线程上使用。我们可以使用 withContext 只为 lambda 切换到另一个调度器,然后在我们开始使用的调度器上恢复。

将以下内容添加到 PlantRepository 中。

PlantRepository.kt

@AnyThread
suspend fun List<Plant>.applyMainSafeSort(customSortOrder: List<String>) =
    withContext(defaultDispatcher) {
        [email protected](customSortOrder)
    }

然后,我们可以将这个新的主线程安全的排序与 LiveData 生成器一起使用。更新块以使用 switchMap,它将允许您在每次收到新值时指向新的 LiveData

PlantRepository.kt

fun getPlantsWithGrowZone(growZone: GrowZone) =
   plantDao.getPlantsWithGrowZoneNumber(growZone.number)
       .switchMap { plantList ->
           liveData {
               val customSortOrder = plantsListSortOrderCache.getOrAwait()
               emit(plantList.applyMainSafeSort(customSortOrder))
           }
       }

与之前的版本相比,一旦从网络接收到自定义排序顺序,就可以将其与新的主线程安全的 applyMainSafeSort 一起使用。然后,该结果作为 getPlantsWithGrowZone 返回的新值发射到 switchMap 中。

与上面的 plants LiveData 类似,协程在被观察时开始执行,并在完成或数据库或网络调用失败时终止。这里的区别在于,在映射中进行网络调用是安全的,因为它被缓存了。

现在让我们看看如何使用 Flow 实现这段代码,并比较这些实现。

8. 介绍 Flow

我们将使用 Flow(来自 kotlinx-coroutines)构建相同的逻辑。在我们这样做之前,让我们看一下什么是流以及如何将其整合到您的应用程序中。

流是 序列 的异步版本,序列是一种类型的值按需生成的集合。就像序列一样,流会在需要时按需生成每个值,并且流可以包含无限数量的值。

那么,Kotlin 为什么引入了一个新的 Flow 类型,它与普通序列有什么区别呢?答案在于异步性的魔力。 Flow 包含对协程的完全支持。这意味着您可以使用协程构建、转换和使用 Flow。您还可以控制并发,这意味着使用 Flow 以声明方式协调多个协程的执行。

这打开了大量激动人心的可能性。

Flow 可用于完全反应式编程风格。如果您以前使用过类似于 RxJava 的东西,那么 Flow 提供了类似的功能。可以通过使用函数运算符(例如 mapflatMapLatestcombine 等)转换流来简洁地表达应用程序逻辑。

Flow 还支持大多数运算符上的挂起函数。这使您可以在类似于 map 的运算符内执行顺序异步任务。通过在流中使用挂起操作,它通常会产生比完全反应式风格中的等效代码更短、更容易阅读的代码。

在这个 Codelab 中,我们将探讨使用这两种方法。

Flow 如何运行

为了习惯 Flow 如何按需(或延迟地)生成值,请查看以下发射值 (1, 2, 3) 并在每个项目生成之前、期间和之后打印的流。

fun makeFlow() = flow {
   println("sending first value")
   emit(1)
   println("first value collected, sending another value")
   emit(2)
   println("second value collected, sending a third value")
   emit(3)
   println("done")
}

scope.launch {
   makeFlow().collect { value ->
       println("got $value")
   }
   println("flow is completed")
}

如果您运行此代码,它将生成以下输出。

sending first value
got 1
first value collected, sending another value
got 2
second value collected, sending a third value
got 3
done
flow is completed

您可以看到执行是如何在 collect lambda 和 flow 生成器之间来回切换的。每次流生成器调用 emit 时,它都会 挂起,直到元素完全处理完毕。然后,当从流中请求另一个值时,它将从上次离开的地方 恢复,直到再次调用 emit。当 flow 生成器完成时, collect 现在可以完成,调用块将打印 "flow is completed"。

collect 的调用非常重要。 Flow 使用挂起运算符(如 collect)而不是公开 Iterator 接口,以便它始终知道何时被积极使用。更重要的是,它知道何时调用方无法请求更多值,因此它可以清理资源。

流何时运行

上面示例中的 Flowcollect 运算符运行时开始运行。通过调用 flow 构建器或其他 API 创建新的 Flow 不会导致任何工作执行。挂起运算符 collectFlow 中被称为 **终止运算符**。还有其他挂起终止运算符,例如 toListfirstsingle,它们随 kotlinx-coroutines 一起提供,您也可以构建自己的运算符。

默认情况下,Flow 将执行 :

  • 每次应用终止运算符时(并且每次新的调用都独立于之前启动的调用)
  • 直到它正在运行的协程被取消
  • 当最后一个值已完全处理,并且请求了另一个值时

由于这些规则,Flow 可以参与结构化并发,并且从 Flow 启动长时间运行的协程是安全的。Flow 不会泄漏资源,因为它们总是使用 协程协作取消规则 在调用者被取消时被清理。

让我们修改上面的流,只使用 take 运算符查看前两个元素,然后收集它两次。

scope.launch {
   val repeatableFlow = makeFlow().take(2)  // we only care about the first two elements
   println("first collection")
   repeatableFlow.collect()
   println("collecting again")
   repeatableFlow.collect()
   println("second collection completed")
}

运行此代码,您将看到此输出

first collection
sending first value
first value collected, sending another value
collecting again
sending first value
first value collected, sending another value
second collection completed

每次调用 collect 时,flow lambda 都从顶部开始。如果流执行了像进行网络请求这样的昂贵操作,这一点很重要。此外,由于我们应用了 take(2) 运算符,流将只产生两个值。它不会在第二次调用 emit 后再次恢复流 lambda,因此行“second value collected…”永远不会打印。

9. 使用流进行异步操作

好的,所以 FlowSequence 一样是惰性的,但它如何也异步呢?让我们看一个异步序列的示例——观察数据库的变化。

在这个示例中,我们需要协调在数据库线程池上生成的数据与生活在另一个线程上的观察者,例如主线程或 UI 线程。并且,由于我们将随着数据变化重复发出结果,因此此场景非常适合异步序列模式。

想象一下,您被要求为 Room 编写 Flow 集成。如果您从 Room 中现有的挂起查询支持开始,您可能会编写类似以下代码

// This code is a simplified version of how Room implements flow
fun <T> createFlow(query: Query, tables: List<Tables>): Flow<T> = flow {
    val changeTracker = tableChangeTracker(tables)

    while(true) {
        emit(suspendQuery(query))
        changeTracker.suspendUntilChanged()
    }
}

此代码依赖于两个虚构的挂起函数来生成 Flow

  • suspendQuery——一个主线程安全的函数,它运行常规的 Room 挂起查询
  • suspendUntilChanged——一个函数,它挂起协程,直到其中一个表发生更改

当收集时,流最初 emits 查询的第一个值。一旦该值被处理,流将恢复并调用 suspendUntilChanged,它将按字面意思执行——挂起流,直到其中一个表发生更改。此时,系统中没有任何事情发生,直到其中一个表发生更改并且流恢复。

当流恢复时,它将执行另一个主线程安全的查询,并 emits 结果。此过程将在无限循环中无限期地继续。

流和结构化并发

但是等等——我们不想泄漏工作!协程本身并不昂贵,但它会反复唤醒自己以执行数据库查询。这是一个非常昂贵的泄漏操作。

即使我们创建了一个无限循环,Flow 通过支持结构化并发帮助我们。

使用流消费值或迭代流的唯一方法是使用终止运算符。因为所有终止运算符都是挂起函数,所以工作与调用它们的范围的生存期绑定。当范围被取消时,流将使用常规的 协程协作取消规则 自动取消自身。因此,即使我们在流构建器中编写了一个无限循环,我们也可以在没有泄漏的情况下安全地使用它,因为结构化并发。

10. 使用流与 Room

在这一步中,您将学习如何使用 FlowRoom 以及如何将其连接到 UI。

此步骤在 Flow 的许多使用方式中很常见。当以这种方式使用时,来自 RoomFlow 充当类似于 LiveData 的可观察数据库查询。

更新 Dao

首先,打开 PlantDao.kt,添加两个返回 Flow<List<Plant>> 的新查询

PlantDao.kt

@Query("SELECT * from plants ORDER BY name")
fun getPlantsFlow(): Flow<List<Plant>>

@Query("SELECT * from plants WHERE growZoneNumber = :growZoneNumber ORDER BY name")
fun getPlantsWithGrowZoneNumberFlow(growZoneNumber: Int): Flow<List<Plant>>

注意,除了返回类型之外,这些函数与 LiveData 版本相同。但是,我们将并排开发它们,以进行比较。

通过指定 Flow 返回类型,Room 使用以下特征执行查询

  • 主线程安全性——具有 Flow 返回类型的查询始终在 Room 执行器上运行,因此它们始终是主线程安全的。您无需在代码中执行任何操作即可使它们在主线程之外运行。
  • 观察更改——Room 自动观察更改并将新值发出到流。
  • 异步序列——Flow 在每次更改时发出整个查询结果,并且不会引入任何缓冲区。如果您返回 Flow<List<T>>,流将发出一个包含查询结果中所有行的 List<T>。它将像序列一样执行——一次发出一个查询结果,并挂起,直到它被要求发出下一个结果。
  • 可取消——当收集这些流的范围被取消时,Room 将取消观察此查询。

综上所述,这使得 Flow 成为从 UI 层观察数据库的绝佳返回类型。

更新存储库

要继续将新的返回值连接到 UI,请打开 PlantRepository.kt,并添加以下代码

PlantRepository.kt

val plantsFlow: Flow<List<Plant>>
   get() = plantDao.getPlantsFlow()

fun getPlantsWithGrowZoneFlow(growZoneNumber: GrowZone): Flow<List<Plant>> {
   return plantDao.getPlantsWithGrowZoneNumberFlow(growZoneNumber.number)
}

目前,我们只是将 Flow 值传递给调用者。这与我们使用此代码实验室开始时将 LiveData 传递给 ViewModel 完全相同。

更新 ViewModel

PlantListViewModel.kt 中,让我们从简单开始,只公开 plantsFlow。我们将在接下来的几步中返回并为流版本添加生长区域切换。

PlantListViewModel.kt

// add a new property to plantListViewModel

val plantsUsingFlow: LiveData<List<Plant>> = plantRepository.plantsFlow.asLiveData()

同样,我们将保留 LiveData 版本(val plants)以便在进行时进行比较。

由于我们希望在此代码实验室中将 LiveData 保留在 UI 层,因此我们将使用 asLiveData 扩展函数将我们的 Flow 转换为 LiveData。就像 LiveData 构建器一样,这会为生成的 LiveData 添加一个可配置的超时。这很好,因为它可以防止我们在每次配置更改时(例如设备旋转)重新启动查询。

由于流提供主线程安全性以及取消的能力,您可以选择将 Flow 一直传递到 UI 层,而无需将其转换为 LiveData。但是,对于此代码实验室,我们将坚持在 UI 层使用 LiveData

还在 ViewModel 中,将缓存更新添加到 init 块。此步骤目前是可选的,但如果您清除缓存而不添加此调用,您将不会在应用程序中看到任何数据。

PlantListViewModel.kt

init {
    clearGrowZoneNumber()  // keep this

    // fetch the full plant list
    launchDataLoad { plantRepository.tryUpdateRecentPlantsCache() }
}

更新片段

打开 PlantListFragment.kt,并将 subscribeUi 函数更改为指向我们新的 plantsUsingFlow LiveData

PlantListFragment.kt

private fun subscribeUi(adapter: PlantAdapter) {
   viewModel.plantsUsingFlow.observe(viewLifecycleOwner) { plants ->
       adapter.submitList(plants)
   }
}

使用流运行应用程序

如果你再次运行应用程序,你应该看到你现在使用 Flow 加载数据!由于我们还没有实现 switchMap,过滤选项没有任何作用。

在下一步中,我们将看看如何将数据转换为 Flow

11. 声明式地组合流

在本步骤中,您将把排序顺序应用于 plantsFlow。我们将使用 flow声明式 API 来实现这一点。

通过使用 mapcombinemapLatest 等转换,我们可以声明式地表达我们希望如何转换每个元素,因为它在流中移动。它甚至让我们能够以声明式的方式表达并发,这可以真正简化代码。在本节中,您将看到如何使用操作符告诉 Flow 声明式地启动两个协程并组合它们的结果。

要开始,请打开 PlantRepository.kt 并定义一个名为 customSortFlow 的新私有流

PlantRepository.kt

private val customSortFlow = flow { emit(plantsListSortOrderCache.getOrAwait()) }

这定义了一个 Flow,当收集时,它将调用 getOrAwaitemit 排序顺序。

由于此流仅发出单个值,您还可以使用 asFlow 直接从 getOrAwait 函数构建它。

// Create a flow that calls a single function
private val customSortFlow = plantsListSortOrderCache::getOrAwait.asFlow()

此代码创建一个新的 Flow,它调用 getOrAwait 并将其结果作为第一个也是唯一的 value 发出。它通过使用 :: 引用 getOrAwait 方法并在结果 Function 对象上调用 asFlow 来实现这一点。

这两个流都做同样的事情,调用 getOrAwait 并发出结果,然后完成。

声明式地组合多个流

现在我们有了两个流,customSortFlowplantsFlow,让我们声明式地将它们组合起来!

plantsFlow 添加 combine 操作符

PlantRepository.kt

private val customSortFlow = plantsListSortOrderCache::getOrAwait.asFlow()

val plantsFlow: Flow<List<Plant>>
   get() = plantDao.getPlantsFlow()
       // When the result of customSortFlow is available,
       // this will combine it with the latest value from
       // the flow above.  Thus, as long as both `plants`
       // and `sortOrder` are have an initial value (their
       // flow has emitted at least one value), any change 
       // to either `plants` or `sortOrder`  will call
       // `plants.applySort(sortOrder)`.
       .combine(customSortFlow) { plants, sortOrder ->
          plants.applySort(sortOrder) 
       }

combine 操作符将两个流组合在一起。这两个流将在它们自己的协程中运行,然后每当任何一个流产生一个新值时,转换将使用来自任何一个流的最新值被调用。

通过使用 combine,我们可以将缓存的网络查找与我们的数据库查询结合起来。它们都将在不同的协程上并发运行。这意味着当 Room 启动网络请求时,Retrofit 可以启动网络查询。然后,一旦两个流都准备好了结果,它将调用 combine lambda,我们在其中将加载的排序顺序应用于加载的植物。

要探索 combine 操作符的工作原理,请修改 customSortFlow,使其在 onStart 中发出两次,并具有相当长的延迟,如下所示

// Create a flow that calls a single function
private val customSortFlow = plantsListSortOrderCache::getOrAwait.asFlow()
   .onStart {
       emit(listOf())
       delay(1500)
   }

转换 onStart 将在观察者在其他操作符之前监听时发生,它可以发出占位符值。因此,这里我们发出一个空列表,延迟 1500 毫秒调用 getOrAwait,然后继续原始流。如果您现在运行应用程序,您将看到 Room 数据库查询会立即返回,并与空列表组合(这意味着它将按字母顺序排序)。然后大约 1500 毫秒后,它将应用自定义排序。

在继续本 Codelab 之前,请从 customSortFlow 中删除 onStart 转换。

Flow 和主线程安全

Flow 可以调用主线程安全函数,就像我们在这里做的那样,它将保留协程的正常主线程安全保证。RoomRetrofit 都将为我们提供主线程安全,我们不需要做任何其他事情来使用 Flow 进行网络请求或数据库查询。

此流已经使用了以下线程

  • plantService.customPlantSortOrder 在 Retrofit 线程上运行(它调用 Call.enqueue
  • getPlantsFlow 将在 Room Executor 上运行查询
  • applySort 将在收集调度器(在本例中为 Dispatchers.Main)上运行

因此,如果我们只是在 Retrofit 中调用挂起函数并使用 Room 流,我们就无需用主线程安全问题来复杂化此代码。

但是,随着我们的数据集的大小增长,对 applySort 的调用可能会变得足够慢以至于阻塞主线程。 Flow 提供了一个名为 flowOn 的声明式 API 来控制流在哪个线程上运行。

flowOn 添加到 plantsFlow,如下所示

PlantRepository.kt

private val customSortFlow = plantsListSortOrderCache::getOrAwait.asFlow()

val plantsFlow: Flow<List<Plant>>
   get() = plantDao.getPlantsFlow()
       .combine(customSortFlow) { plants, sortOrder ->
          plants.applySort(sortOrder) 
       }
       .flowOn(defaultDispatcher)
       .conflate()

调用 flowOn 对代码执行方式有两个重要的影响

  1. 在调用 flowOn 之前,在 defaultDispatcher(在本例中为 Dispatchers.Default)上启动一个新协程来运行和收集流。
  2. 引入一个缓冲区,将结果从新协程发送到后面的调用。
  3. flowOn 之后,将这些缓冲区中的值发出到 Flow 中。在本例中,这是 ViewModel 中的 asLiveData

这与使用 withContext 切换调度器的方式非常相似,但它确实在我们的转换中间引入了缓冲区,这会改变流的工作方式。由 flowOn 启动的协程被允许以比调用者消耗速度更快的速度产生结果,并且默认情况下它将缓冲大量结果。

在本例中,我们计划将结果发送到 UI,因此我们只关心最新的结果。这就是 conflate 操作符的作用——它修改 flowOn 的缓冲区,以仅存储最后一个结果。如果在读取上一个结果之前有另一个结果进来,它将被覆盖。

运行应用程序

如果你再次运行应用程序,你应该看到你现在正在使用 Flow 加载数据并应用自定义排序顺序!由于我们还没有实现 switchMap,过滤选项没有任何作用。

在下一步中,我们将看看另一种使用 flow 提供主线程安全的方法。

12. 在两个流之间切换

要完成此 API 的流版本,请打开 PlantListViewModel.kt,我们将根据 GrowZone 在流之间切换,就像我们在 LiveData 版本中做的那样。

plants liveData 下面添加以下代码

PlantListViewModel.kt

private val growZoneFlow = MutableStateFlow<GrowZone>(NoGrowZone)

val plantsUsingFlow: LiveData<List<Plant>> = growZoneFlow.flatMapLatest { growZone ->
        if (growZone == NoGrowZone) {
            plantRepository.plantsFlow
        } else {
            plantRepository.getPlantsWithGrowZoneFlow(growZone)
        }
    }.asLiveData()

此模式展示了如何将事件(生长区域更改)集成到流中。它与 LiveData.switchMap 版本的功能完全相同——根据事件在两个数据源之间切换。

逐步执行代码

PlantListViewModel.kt

private val growZoneFlow = MutableStateFlow<GrowZone>(NoGrowZone)

这定义了一个新的 MutableStateFlow,其初始值为 NoGrowZone。这是一种特殊的 Flow 值持有者,它只保存它收到的最后一个值。它是一个线程安全的并发原语,因此您可以从多个线程同时写入它(并且被认为是“最后一个”的那个将获胜)。

您也可以订阅以获取对当前值的更新。总的来说,它的行为类似于 LiveData——它只是保存最后一个值,并允许您观察对它的更改。

PlantListViewModel.kt

val plantsUsingFlow: LiveData<List<Plant>> = growZoneFlow.flatMapLatest { growZone ->

StateFlow 也是一个普通的 Flow,因此您可以像往常一样使用所有操作符。

这里我们使用了 flatMapLatest 操作符,它与 LiveData 中的 switchMap 完全相同。每当 growZone 更改其值时,此 lambda 将被应用,并且它必须返回一个 Flow。然后,返回的 Flow 将被用作所有下游操作符的 Flow

基本上,这使我们能够根据 growZone 的值在不同的流之间切换。

PlantListViewModel.kt

if (growZone == NoGrowZone) {
    plantRepository.plantsFlow
} else {
    plantRepository.getPlantsWithGrowZoneFlow(growZone)
}

flatMapLatest 内部,我们根据 growZone 切换。此代码与 LiveData.switchMap 版本几乎相同,唯一的区别是它返回 Flows 而不是 LiveDatas

PlantListViewModel.kt

   }.asLiveData()

最后,我们将 Flow 转换为 LiveData,因为我们的 Fragment 期望我们从 ViewModel 公开一个 LiveData

更改 StateFlow 的值

为了让应用程序知道过滤器发生了变化,我们可以设置 MutableStateFlow.value。这是一种将事件传递到协程的简单方法,就像我们在这里所做的那样。

PlantListViewModel.kt

fun setGrowZoneNumber(num: Int) {
    growZone.value = GrowZone(num)
    growZoneFlow.value = GrowZone(num)

    launchDataLoad {
        plantRepository.tryUpdateRecentPlantsForGrowZoneCache(GrowZone(num)) }
    }

fun clearGrowZoneNumber() {
    growZone.value = NoGrowZone
    growZoneFlow.value = NoGrowZone

    launchDataLoad { 
        plantRepository.tryUpdateRecentPlantsCache() 
    }
}

再次运行应用程序

如果你再次运行应用程序,过滤器现在对 LiveData 版本和 Flow 版本都有效!

在下一步中,我们将把自定义排序应用于 getPlantsWithGrowZoneFlow

13. 将流与样式混合

Flow 最令人兴奋的功能之一是对挂起函数的一流支持。 flow 构建器和几乎所有转换都公开了一个 suspend 运算符,它可以调用任何挂起函数。因此,网络和数据库调用的 **主线程安全** 以及编排多个异步操作可以使用流内部对常规挂起函数的调用来完成。

实际上,这允许你自然地将声明式转换与命令式代码混合使用。正如你将在本示例中看到的,在一个常规的 map 运算符内,你可以编排多个异步操作,而无需应用任何额外的转换。在很多地方,这会导致比完全声明式方法更简单的代码。

使用挂起函数来编排异步工作

为了完成对 Flow 的探索,我们将使用挂起运算符应用自定义排序。

打开 PlantRepository.kt,并向 getPlantsWithGrowZoneNumberFlow 添加一个 map 转换。

PlantRepository.kt

fun getPlantsWithGrowZoneFlow(growZone: GrowZone): Flow<List<Plant>> {
   return plantDao.getPlantsWithGrowZoneNumberFlow(growZone.number)
       .map { plantList ->
           val sortOrderFromNetwork = plantsListSortOrderCache.getOrAwait()
           val nextValue = plantList.applyMainSafeSort(sortOrderFromNetwork)
           nextValue
       }
}

通过依靠常规的挂起函数来处理异步工作,这个 map 操作是 **主线程安全** 的,即使它组合了两个异步操作。

从数据库返回每个结果时,我们将获取缓存的排序顺序,如果它还没有准备好,它将等待异步网络请求。然后,一旦我们有了排序顺序,就可以安全地调用 applyMainSafeSort,它将在默认调度器上运行排序。

这段代码现在完全是主线程安全的,因为它将主线程安全问题推迟到常规的挂起函数。它比在 plantsFlow 中实现的相同转换要简单得多。

但是,值得注意的是,它的执行方式会有所不同。每次数据库发出新值时,都会获取缓存的值。这没有问题,因为我们正在 plantsListSortOrderCache 中正确地缓存它,但如果它启动了新的网络请求,这个实现会进行很多不必要的网络请求。此外,在 .combine 版本中,网络请求和数据库查询是 *同时* 运行的,而在这个版本中,它们是按顺序运行的。

由于这些差异,没有明确的规则来构建这段代码。在很多情况下,使用像我们在这里所做的那样的挂起转换是可以的,这使得所有异步操作都是顺序的。但是,在其他情况下,最好使用运算符来控制并发并提供主线程安全。

14. 使用流来控制并发

你快到了!作为最后(可选)一步,让我们将网络请求移到基于流的协程中。

通过这样做,我们将从 onClick 调用的处理程序中删除进行网络调用的逻辑,并从 growZone 中驱动它们。这有助于我们创建一个单一的事实来源,并避免代码重复——任何代码都无法在不刷新缓存的情况下更改过滤器。

打开 PlantListViewModel.kt,并将以下内容添加到 init 块中

PlantListViewModel.kt

init {
   clearGrowZoneNumber()

   growZone.mapLatest { growZone ->
           _spinner.value = true
           if (growZone == NoGrowZone) {
               plantRepository.tryUpdateRecentPlantsCache()
           } else {
               plantRepository.tryUpdateRecentPlantsForGrowZoneCache(growZone)
           }
       }
       .onEach {  _spinner.value = false }
       .catch { throwable ->  _snackbar.value = throwable.message  }
       .launchIn(viewModelScope)
}

这段代码将启动一个新的协程来观察发送到 growZoneChannel 的值。你现在可以注释掉下面方法中的网络调用,因为它们只在 LiveData 版本中需要。

PlantListViewModel.kt

fun setGrowZoneNumber(num: Int) {
    growZone.value = GrowZone(num)
    growZoneFlow.value = GrowZone(num)

    // launchDataLoad { 
    //    plantRepository.tryUpdateRecentPlantsForGrowZoneCache(GrowZone(num))
    // }
}

fun clearGrowZoneNumber() {
    growZone.value = NoGrowZone
    growZoneFlow.value = NoGrowZone

    // launchDataLoad {
    //    plantRepository.tryUpdateRecentPlantsCache()
    // }
}

再次运行应用程序

如果你现在再次运行应用程序,你会看到网络刷新现在由 growZone 控制!我们已经大幅改进了代码,因为随着更多更改过滤器的方法出现,通道充当哪个过滤器处于活动状态的单一事实来源。这样,网络请求和当前过滤器永远不会不同步。

逐步执行代码

让我们一次逐步遍历所有新使用的函数,从最外层开始

PlantListViewModel.kt

growZone
    // ...
    .launchIn(viewModelScope)

这次,我们使用 launchIn 运算符在我们的 ViewModel 中收集流。

launchIn 运算符创建一个新的协程并从流中收集每个值。它将在提供的 CoroutineScope 中启动——在本例中是 viewModelScope。这很棒,因为这意味着当这个 ViewModel 被清除时,收集将被取消。

如果没有提供任何其他运算符,这不会做太多事情——但是由于 Flow 在所有运算符中都提供了挂起 lambda,因此可以轻松地根据每个值执行异步操作。

PlantListViewModel.kt

.mapLatest { growZone ->
    _spinner.value = true
    if (growZone == NoGrowZone) {
        plantRepository.tryUpdateRecentPlantsCache()
    } else {
        plantRepository.tryUpdateRecentPlantsForGrowZoneCache(growZone)
    }
}

这就是魔法所在——mapLatest 将对每个值应用这个 map 函数。但是,与常规的 map 不同,它将为每个对 map 转换的调用启动一个新的协程。然后,如果在先前的协程完成之前,growZoneChannel 发出了一个新值,它将在启动新的协程之前取消先前的协程。

我们可以使用 mapLatest 为我们控制并发。与其自己构建取消/重启逻辑,流转换可以处理它。与手动编写相同的取消逻辑相比,这段代码节省了大量代码和复杂性。

Flow 的取消遵循协程的正常 协作取消规则

PlantListViewModel.kt

.onEach {  _spinner.value = false }
.catch { throwable -> _snackbar.value = throwable.message }

onEach 将在它上方的流发出值时每次被调用。在这里,我们使用它来在处理完成之后重置微调器。

catch 运算符将捕获在流中它上方的任何抛出异常。它可以向流发出一个新值,例如错误状态,将异常重新抛回流,或者执行像我们在这里所做的那样工作。

发生错误时,我们只是告诉我们的 _snackbar 显示错误消息。

总结

这一步向你展示了如何使用 Flow 来控制并发,以及如何在不依赖 UI 观察器的情况下在 ViewModel 中使用 Flows

作为挑战步骤,尝试定义一个函数来封装这个流的数据加载,函数签名如下

fun <T> loadDataFor(source: StateFlow<T>, block: suspend (T) -> Unit) {