在协程中,流是一种可以顺序发出多个值的类型,而不是仅返回单个值的挂起函数。例如,您可以使用流接收来自数据库的实时更新。
流构建在协程之上,可以提供多个值。从概念上讲,流是数据流,可以异步计算。发出的值必须具有相同的类型。例如,Flow<Int>
是一个发出整数值的流。
流非常类似于生成一系列值的Iterator
,但它使用挂起函数来异步生成和使用值。这意味着,例如,流可以安全地发出网络请求以生成下一个值,而不会阻塞主线程。
数据流涉及三个实体
- 生产者生成添加到流中的数据。由于协程的存在,流也可以异步生成数据。
- (可选)中间体可以修改流入流的每个值或流本身。
- 消费者使用流中的值。
在 Android 中,存储库 通常是 UI 数据的生产者,其用户界面 (UI) 是最终显示数据的消费者。其他时候,UI 层是用户输入事件的生产者,层次结构中的其他层会使用它们。生产者和消费者之间的层通常充当中间体,修改数据流以使其适应下一层的需求。
创建流
要创建流,请使用流构建器 API。 flow
构建器函数创建一个新的流,您可以在其中使用emit
函数手动将新值发出到数据流中。
在以下示例中,数据源以固定的时间间隔自动获取最新的新闻。由于挂起函数无法返回多个连续的值,因此数据源创建并返回一个流来满足此需求。在这种情况下,数据源充当生产者。
class NewsRemoteDataSource(
private val newsApi: NewsApi,
private val refreshIntervalMs: Long = 5000
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
while(true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews) // Emits the result of the request to the flow
delay(refreshIntervalMs) // Suspends the coroutine for some time
}
}
}
// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
suspend fun fetchLatestNews(): List<ArticleHeadline>
}
flow
构建器在协程中执行。因此,它受益于相同的异步 API,但有一些限制适用
- 流是顺序的。由于生产者在协程中,因此在调用挂起函数时,生产者会挂起,直到挂起函数返回。在示例中,生产者会挂起,直到
fetchLatestNews
网络请求完成。只有这样,结果才会发出到流中。 - 使用
flow
构建器,生产者无法从不同的CoroutineContext
中emit
值。因此,请勿通过创建新的协程或使用withContext
代码块在不同的CoroutineContext
中调用emit
。在这些情况下,您可以使用其他流构建器,例如callbackFlow
。
修改流
中间体可以使用中间操作符修改数据流,而无需使用值。这些操作符是在应用于数据流时设置一系列操作的函数,这些操作直到将来在使用值时才会执行。在Flow 参考文档中了解有关中间操作符的更多信息。
在下面的示例中,存储库层使用中间操作符map
将数据转换为要在View
上显示的数据
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData
) {
/**
* Returns the favorite latest news applying transformations on the flow.
* These operations are lazy and don't trigger the flow. They just transform
* the current value emitted by the flow at that point in time.
*/
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
// Intermediate operation to filter the list of favorite topics
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
// Intermediate operation to save the latest news in the cache
.onEach { news -> saveInCache(news) }
}
可以将中间操作符一个接一个地应用,形成一个操作链,这些操作在将项目发出到流中时延迟执行。请注意,仅将中间操作符应用于流并不会启动流收集。
从流中收集
使用终端操作符触发流开始侦听值。要获取流中发出的所有值,请使用collect
。您可以在官方流文档中了解有关终端操作符的更多信息。
由于collect
是一个挂起函数,因此需要在协程中执行。它接受一个 lambda 表达式作为参数,该表达式在每个新值上都会被调用。由于它是一个挂起函数,因此调用collect
的协程可能会挂起,直到流关闭。
继续前面的示例,这是一个使用存储库层数据构建ViewModel
的简单实现。
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
// Trigger the flow and consume its elements using collect
newsRepository.favoriteLatestNews.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
收集流会触发生产者,生产者会刷新最新的新闻并在固定的时间间隔内发出网络请求的结果。由于生产者使用while(true)
循环始终处于活动状态,因此当 ViewModel 被清除且viewModelScope
被取消时,数据流将关闭。
流收集可能因以下原因停止
- 收集的协程被取消,如前面的示例所示。这也会停止底层的生产者。
- 生产者完成发出项目。在这种情况下,数据流将关闭,并且调用
collect
的协程恢复执行。
除非使用其他中间操作符指定,否则流是冷的和惰性的。这意味着每次在流上调用终端操作符时都会执行生产者代码。在前面的示例中,如果有多个流收集器,则会导致数据源在不同的固定时间间隔内多次获取最新的新闻。为了优化并在多个使用者同时收集时共享流,请使用shareIn
操作符。
捕获意外异常
生产者的实现可能来自第三方库。这意味着它可能会抛出意外异常。要处理这些异常,请使用catch
中间操作符。
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
// Intermediate catch operator. If an exception is thrown,
// catch and update the UI
.catch { exception -> notifyError(exception) }
.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
在前面的示例中,当发生异常时,collect
lambda 不会被调用,因为没有收到新项目。
catch
也可以emit
项目到流中。例如,存储库层可以emit
缓存的值。
class NewsRepository(...) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
.onEach { news -> saveInCache(news) }
// If an error happens, emit the last cached values
.catch { exception -> emit(lastCachedNews()) }
}
在本例中,当发生异常时,collect
lambda 会被调用,因为由于异常,一个新项目已被发出到流中。
在不同的 CoroutineContext 中执行
flow
构建器的生产者默认在收集它的协程的CoroutineContext
中执行,并且如前所述,它不能从不同的CoroutineContext
emit
值。在某些情况下,这种行为可能不理想。例如,在本主题中使用的示例中,存储库层不应在Dispatchers.Main
上执行操作,而Dispatchers.Main
由viewModelScope
使用。
要更改流的CoroutineContext
,请使用中间操作符flowOn
。flowOn
更改上游流的CoroutineContext
,这意味着生产者和在flowOn
之前(或之上)应用的任何中间操作符。下游流(flowOn
之后的中间操作符以及使用者)不受影响,并在用于从流中collect
的CoroutineContext
上执行。如果有多个flowOn
操作符,则每个操作符都会从其当前位置更改上游。
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData,
private val defaultDispatcher: CoroutineDispatcher
) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> // Executes on the default dispatcher
news.filter { userData.isFavoriteTopic(it) }
}
.onEach { news -> // Executes on the default dispatcher
saveInCache(news)
}
// flowOn affects the upstream flow ↑
.flowOn(defaultDispatcher)
// the downstream flow ↓ is not affected
.catch { exception -> // Executes in the consumer's context
emit(lastCachedNews())
}
}
使用此代码,onEach
和map
操作符使用defaultDispatcher
,而catch
操作符和使用者在viewModelScope
使用的Dispatchers.Main
上执行。
由于数据源层正在执行 I/O 工作,因此您应该使用针对 I/O 操作进行了优化的调度器。
class NewsRemoteDataSource(
...,
private val ioDispatcher: CoroutineDispatcher
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
// Executes on the IO dispatcher
...
}
.flowOn(ioDispatcher)
}
Jetpack 库中的流
Flow 已集成到许多 Jetpack 库中,并且在 Android 第三方库中很受欢迎。Flow 非常适合实时数据更新和无限数据流。
您可以使用Flow 与 Room来接收数据库中更改的通知。使用数据访问对象 (DAO)时,返回Flow
类型以获取实时更新。
@Dao
abstract class ExampleDao {
@Query("SELECT * FROM Example")
abstract fun getExamples(): Flow<List<Example>>
}
每次Example
表发生更改时,都会发出一个新列表,其中包含数据库中的新项目。
将基于回调的 API 转换为流
callbackFlow
是一个流构建器,它允许您将基于回调的 API 转换为流。例如,Firebase Firestore Android API 使用回调。
要将这些 API 转换为流并侦听 Firestore 数据库更新,您可以使用以下代码
class FirestoreUserEventsDataSource(
private val firestore: FirebaseFirestore
) {
// Method to get user events from the Firestore database
fun getUserEvents(): Flow<UserEvents> = callbackFlow {
// Reference to use in Firestore
var eventsCollection: CollectionReference? = null
try {
eventsCollection = FirebaseFirestore.getInstance()
.collection("collection")
.document("app")
} catch (e: Throwable) {
// If Firebase cannot be initialized, close the stream of data
// flow consumers will stop collecting and the coroutine will resume
close(e)
}
// Registers callback to firestore, which will be called on new events
val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
if (snapshot == null) { return@addSnapshotListener }
// Sends events to the flow! Consumers will get the new events
try {
trySend(snapshot.getEvents())
} catch (e: Throwable) {
// Event couldn't be sent to the flow
}
}
// The callback inside awaitClose will be executed when the flow is
// either closed or cancelled.
// In this case, remove the callback from Firestore
awaitClose { subscription?.remove() }
}
}
与flow
构建器不同,callbackFlow
允许从不同的CoroutineContext
使用send
函数或在协程外部使用trySend
函数发出值。
在内部,callbackFlow
使用一个通道,它在概念上与阻塞队列非常相似。通道配置有容量,即可以缓冲的最大元素数。在callbackFlow
中创建的通道的默认容量为 64 个元素。当您尝试向已满的通道添加新元素时,send
会挂起生产者,直到有空间容纳新元素,而trySend
不会将元素添加到通道中,并立即返回false
。
trySend
立即将指定的元素添加到通道中,前提是这不会违反其容量限制,然后返回成功的结果。