在 Android 上测试 Kotlin 流

测试与 通信的单元或模块的方式取决于被测对象是使用流作为输入还是输出。

  • 如果被测对象观察流,则可以在您可以从测试中控制的伪造依赖项中生成流。
  • 如果单元或模块公开流,则可以在测试中读取和验证流发出的一个或多个项目。

创建伪造的生产者

当被测对象是流的使用者时,一种常见的测试方法是用伪造的实现替换生产者。例如,给定一个在生产环境中从两个数据源获取数据的存储库观察类的示例

the subject under test and the data layer
图 1. 被测对象和数据层。

为了使测试确定性,您可以用一个始终发出相同伪造数据的伪造存储库替换存储库及其依赖项

dependencies are replaced with a fake implementation
图 2. 依赖项被伪造的实现替换。

要在流中发出预定义的一系列值,请使用 flow 生成器

class MyFakeRepository : MyRepository {
    fun observeCount() = flow {
        emit(ITEM_1)
    }
}

在测试中,注入此伪造的存储库,替换真实的实现

@Test
fun myTest() {
    // Given a class with fake dependencies:
    val sut = MyUnitUnderTest(MyFakeRepository())
    // Trigger and verify
    ...
}

现在您可以控制被测对象的输出,可以通过检查其输出验证其是否正常工作。

在测试中断言流发射

如果被测对象正在公开流,则测试需要对数据流的元素进行断言。

假设前面的示例存储库公开了流

repository with fake dependencies that exposes a flow
图 3. 一个公开流的存储库(被测对象),带有伪造的依赖项。

对于某些测试,您只需要检查流发出的第一个发射或有限数量的项目。

您可以通过调用 first() 使用流的第一个发射。此函数等待接收第一个项目,然后向生产者发送取消信号。

@Test
fun myRepositoryTest() = runTest {
    // Given a repository that combines values from two data sources:
    val repository = MyRepository(fakeSource1, fakeSource2)

    // When the repository emits a value
    val firstItem = repository.counter.first() // Returns the first item in the flow

    // Then check it's the expected item
    assertEquals(ITEM_1, firstItem)
}

如果测试需要检查多个值,则调用 toList() 会导致流等待源发出所有其值,然后将这些值作为列表返回。这仅适用于有限的数据流。

@Test
fun myRepositoryTest() = runTest {
    // Given a repository with a fake data source that emits ALL_MESSAGES
    val messages = repository.observeChatMessages().toList()

    // When all messages are emitted then they should be ALL_MESSAGES
    assertEquals(ALL_MESSAGES, messages)
}

对于需要更复杂项目集合或不返回有限数量项目的数据流,您可以使用 Flow API 选择和转换项目。以下是一些示例

// Take the second item
outputFlow.drop(1).first()

// Take the first 5 items
outputFlow.take(5).toList()

// Takes the first item verifying that the flow is closed after that
outputFlow.single()

// Finite data streams
// Verify that the flow emits exactly N elements (optional predicate)
outputFlow.count()
outputFlow.count(predicate)

在测试期间持续收集

如前一个示例所示,使用 toList() 收集流在内部使用 collect(),并挂起直到整个结果列表准备好返回。

为了交错导致流发出值的 action 和对已发出值的断言,您可以在测试期间持续从流中收集值。

例如,获取以下要测试的 Repository 类,以及一个具有 emit 方法的伴随伪造数据源实现,以便在测试期间动态生成值

class Repository(private val dataSource: DataSource) {
    fun scores(): Flow<Int> {
        return dataSource.counts().map { it * 10 }
    }
}

class FakeDataSource : DataSource {
    private val flow = MutableSharedFlow<Int>()
    suspend fun emit(value: Int) = flow.emit(value)
    override fun counts(): Flow<Int> = flow
}

在测试中使用此伪造时,您可以创建一个收集协程,该协程将持续接收来自 Repository 的值。在此示例中,我们将其收集到列表中,然后对其内容执行断言

@Test
fun continuouslyCollect() = runTest {
    val dataSource = FakeDataSource()
    val repository = Repository(dataSource)

    val values = mutableListOf<Int>()
    backgroundScope.launch(UnconfinedTestDispatcher(testScheduler)) {
        repository.scores().toList(values)
    }

    dataSource.emit(1)
    assertEquals(10, values[0]) // Assert on the list contents

    dataSource.emit(2)
    dataSource.emit(3)
    assertEquals(30, values[2])

    assertEquals(3, values.size) // Assert the number of items collected
}

由于此处 Repository 公开的流永远不会完成,因此收集它的 toList 调用永远不会返回。在 TestScope.backgroundScope 中启动收集协程可确保在测试结束前取消协程。否则,runTest 将继续等待其完成,导致测试停止响应并最终失败。

请注意此处用于收集协程的 UnconfinedTestDispatcher。这确保了收集协程被急切地启动,并在 launch 返回后准备好接收值。

使用 Turbine

第三方库 Turbine 提供了一个方便的 API 用于创建收集协程,以及其他用于测试 Flow 的便利功能。

@Test
fun usingTurbine() = runTest {
    val dataSource = FakeDataSource()
    val repository = Repository(dataSource)

    repository.scores().test {
        // Make calls that will trigger value changes only within test{}
        dataSource.emit(1)
        assertEquals(10, awaitItem())

        dataSource.emit(2)
        awaitItem() // Ignore items if needed, can also use skip(n)

        dataSource.emit(3)
        assertEquals(30, awaitItem())
    }
}

有关更多详细信息,请参阅 库的文档

测试 StateFlows

StateFlow 是一个可观察的数据持有者,可以对其进行收集以观察其随时间推移作为流而持有的值。请注意,此值流是合并的,这意味着如果值在 StateFlow 中快速设置,则该 StateFlow 的收集器不保证接收所有中间值,而仅接收最新的值。

在测试中,如果您牢记合并,则可以像收集任何其他流一样收集 StateFlow 的值,包括使用 Turbine。在某些测试场景中,可能需要尝试收集和断言所有中间值。

但是,我们通常建议将 StateFlow 视为数据持有者,并断言其 value 属性。这样,测试就可以验证对象在给定时间点的当前状态,并且不依赖于是否发生合并。

例如,以下 ViewModel 从 Repository 收集值并在 StateFlow 中将其公开给 UI。

class MyViewModel(private val myRepository: MyRepository) : ViewModel() {
    private val _score = MutableStateFlow(0)
    val score: StateFlow<Int> = _score.asStateFlow()

    fun initialize() {
        viewModelScope.launch {
            myRepository.scores().collect { score ->
                _score.value = score
            }
        }
    }
}

Repository 的伪造实现可能如下所示

class FakeRepository : MyRepository {
    private val flow = MutableSharedFlow<Int>()
    suspend fun emit(value: Int) = flow.emit(value)
    override fun scores(): Flow<Int> = flow
}

在使用此伪造对象测试 ViewModel 时,您可以从伪造对象发出值以触发 ViewModel 中 StateFlow 的更新,然后断言更新后的 value

@Test
fun testHotFakeRepository() = runTest {
    val fakeRepository = FakeRepository()
    val viewModel = MyViewModel(fakeRepository)

    assertEquals(0, viewModel.score.value) // Assert on the initial value

    // Start collecting values from the Repository
    viewModel.initialize()

    // Then we can send in values one by one, which the ViewModel will collect
    fakeRepository.emit(1)
    assertEquals(1, viewModel.score.value)

    fakeRepository.emit(2)
    fakeRepository.emit(3)
    assertEquals(3, viewModel.score.value) // Assert on the latest value
}

使用 stateIn 创建的 StateFlows

在上一节中,ViewModel 使用 MutableStateFlow 存储由来自 Repository 的流发出的最新值。这是一种常见模式,通常通过使用 stateIn 运算符以更简单的方式实现,该运算符将冷流转换为热 StateFlow

class MyViewModelWithStateIn(myRepository: MyRepository) : ViewModel() {
    val score: StateFlow<Int> = myRepository.scores()
        .stateIn(viewModelScope, SharingStarted.WhileSubscribed(5000L), 0)
}

stateIn 运算符具有 SharingStarted 参数,该参数确定它何时激活并开始使用底层流。例如 SharingStarted.LazilySharingStarted.WhileSubsribed 经常在 ViewModel 中使用。

即使您在测试中断言 StateFlowvalue,也需要创建一个收集器。这可以是一个空收集器。

@Test
fun testLazilySharingViewModel() = runTest {
    val fakeRepository = HotFakeRepository()
    val viewModel = MyViewModelWithStateIn(fakeRepository)

    // Create an empty collector for the StateFlow
    backgroundScope.launch(UnconfinedTestDispatcher(testScheduler)) {
        viewModel.score.collect()
    }

    assertEquals(0, viewModel.score.value) // Can assert initial value

    // Trigger-assert like before
    fakeRepository.emit(1)
    assertEquals(1, viewModel.score.value)

    fakeRepository.emit(2)
    fakeRepository.emit(3)
    assertEquals(3, viewModel.score.value)
}

其他资源