写点什么

超越 CLEAN 和 MVP:在 Android 中构建离线优先的响应式数据层

作者:Mervyn Anthony
  • 2026-06-29
    北京
  • 本文字数:9708 字

    阅读完需:约 32 分钟

前言

移动应用程序运行在高度不可预测的环境中。用户期望应用程序能够瞬间加载、支持离线使用、实时更新,并且能在蜂窝网络连接时断时续的情况下完整地保存数据。

虽然 Model-View-Presenter(MVP)和 CLEAN 架构等模式为关注点分离提供了可靠的起点,但在应用于移动平台独特且具有响应性的需求时,它们往往力不从心,甚至还会引入不必要的模板代码。

本文介绍了响应式数据层架构(RDLA)——一种专门针对移动端优化过的模式,专门设计用于弥合响应式 UI 框架(如 Jetpack Compose)与受限的移动存储之间的差距。通过协调这两个边界,RDLA 使开发者能够构建健壮、以离线优先为原则的响应式数据层。

RDLA 对于任何需要实时 UI 更新和离线支持的应用程序都有益处,但在与联网硬件或易变数据源进行交互时,它变得更为关键。例如,在消费级医疗物联网和可穿戴设备领域(如双节点睡眠监测可穿戴设备或自适应助听器),应用程序需要绝对的可靠性和同步性。

与基于稳定网络协议的传统 REST API 架构不同,移动数据获取通常需要处理硬件 API(如蓝牙低功耗),而这些 API 依赖于在 Binder 线程间执行的深度嵌套、异步回调。如果没有可靠的架构来序列化操作,并将本地缓存视为唯一的权威数据源,那么这些系统很快就会陷入状态同步错误和连接不稳定的困境。例如,基于 BLE 设备的应用会触发臭名昭著的“ GATT 竞争条件”,导致底层蓝牙控制器乱序处理命令或完全丢弃命令(这通常会导致文档记录不详的 GATT 错误代码 Status 133 或 129)。

为了应对这些挑战,RDLA 将本地缓存视为确定性的 UI 缓冲区,同时利用 Kotlin 协程和 suspendCancellableCoroutine 桥接器来串行化物理硬件操作,从而将混乱的多线程异步事件转化为确定性的同步数据流。

借鉴为高度受监管的消费级医疗设备开发的架构模式,我们将通过一个健康指标追踪系统(专用于追踪心率记录)来探讨 RDLA 的拓扑结构,将其与传统模式进行对比,并演示其在 Kotlin 中的实现。

传统模式的局限性

在深入探讨 RDLA 之前,让我们先分析一下为什么传统模式在现代 Android 开发中会显得力不从心。

1. MVP 架构的拉取瓶颈

在经典的 Model-View-Presenter (MVP) 架构中,通信是过程式的,而且基于拉取:

  1. Presenter 向模型请求数据。

  2. Model 获取数据并通过回调将其返回。

  3. Presenter 将数据推送给 View。

这种机制在比较简单的应用程序中是可行的,但在响应式编程环境中却行不通。如果后台同步进程更新了数据库,除非 Presenter 轮询数据库或依赖复杂的事件总线,否则它将无法感知到这一变化。MVP 缺乏一种原生的机制自动地将状态变化向下游传播。

2. CLEAN 架构在移动端的错位

CLEAN 架构在保持业务逻辑与框架独立性方面表现出色。然而,如果在移动开发中未经修改地直接应用该架构,则会带来两个明显的挑战:

  • 冗余代码负担(透传用例):对于简单的读取操作,经典的 CLEAN 架构迫使开发者创建一个仅调用 Repository 方法的用例类。在拥有数十张表的数据库密集型应用中,这将导致大量琐碎的“透传”类,增加了维护负担,却未带来任何业务价值。

  • 平台无关性与移动端现实:在设计上,CLEAN 旨在实现数据库和框架的无关性。虽然这在企业后端系统中效果良好,但无法解决移动端特有的限制。它没有处理本地-远程数据同步、离线状态传播或 SQLite 性能限制(如数据库编译边界)方面的指导。

RDLA 简介

响应式数据层架构(RDLA)是一种专门设计的模式,用于弥合响应式 UI 框架(如 Jetpack Compose)与移动端存储限制之间的差距。

RDLA 严格区分数据定义(API)与数据获取(实现),并遵循三大核心原则:

  1. 基于响应式推送的数据流:UI 绝不会用“一次性”的方式查询数据。相反,它会订阅数据的“冷流”(Flow)。

  2. 本地缓存作为唯一数据源:UI 仅从本地数据库读取数据。网络仅用于填充该数据库。

  3. 封装缓存与同步:检查缓存过期、合并本地编辑以及触发后台数据获取的逻辑完全封装在 Repository 实现中。

架构拓扑

RDLA 将数据包划分为三个独立的模块:API、实现和数据库(共享存储)。

图 1:RDLA 架构拓扑与模块边界(图片由作者制作)

RDLA 在架构生态中的定位:与 Clean 和 MVVM 相融合

RDLA 并非 MVVM 或 Clean 架构的替代方案。相反,它是一种移动端优先的数据层(及部分领域层)实现方式,能够与上述架构模式无缝集成。通过优化这些模式之间的接口,它有效地解决了移动端特有的常见痛点。

RDLA 如何与 Clean 架构融合

Clean 架构的核心在于依赖规则:代码依赖关系必须仅向内指向核心业务逻辑(实体和用例)。RDLA 严格遵守这一规则,并针对移动端的限制条件优化了实现:

图 2:RDLA 模块与 Clean 架构各层的映射关系(图片由作者制作)

  • API 模块(实体):RDLA 的 API 模块直接对应于 Clean 架构最内层的实体层。它仅包含纯 Kotlin 数据模型(如 HeartRateRecord)和 Repository 接口。该模块完全不依赖于任何平台、数据库或网络。

  • 存储库实现(用例):在经典的 Clean 实现中,开发人员通常会针对每次数据库读取编写一个用例类(例如 GetHeartRateRecordsUseCase)。RDLA 允许表示层直接查看存储库的响应流,在简单 CRUD 操作中消除了这类冗余代码。然而,对于跨多个领域的复杂业务逻辑(例如,根据睡眠和心电记录计算心率变异性),则仍然需要创建一个依赖于 RDLA Repository API 的标准 Clean 用例类。

  • 数据源接口(接口适配器):私有接口 LocalDataSource 和 RemoteDataSource 位于实现模块中,作为边界(Clean 架构中的接口适配器)将存储库与具体的数据库和网络引擎隔离开来。

  • Room DB 与 Retrofit 客户端(框架与驱动程序):具体实现(RoomLocalDataSource、RetrofitRemoteDataSource)位于独立的数据库和网络模块中。框架细节(如 Room 注解或序列化库)完全封装在这个外部边界内。

RDLA 如何驱动 MVVM(单向数据流)

在传统的 MVVM 架构中,ViewModel 通常充当主动管理者的角色,从存储库中提取数据并管理其生命周期。这种采用命令式管理的数据流容易引发同步错误。

RDLA 通过将 Model 转换为响应式数据总线改变了这一状况,实现了严格的单向数据流(UDF):

图 3:RDLA 中的单向数据流(UDF)响应循环(图片由作者制作)

  • ViewModel 作为转换器而非同步器:与通过启动协程按需获取数据并手动更新状态存储器不同,RDLA 中的 ViewModel 是一个被动的转换器。它监听存储库的 Flow,并使用 stateIn 运算符将其直接转换为 UI 可用的 StateFlow。

  • 自动 UI 同步:当后台同步工作线程或离线修改操作更新 Room 数据库时,数据库会自动发布新的数据集。这一变化会通过存储库和 ViewModel 直接传播给 UI,不需要 ViewModel 轮询或协调刷新操作。

  • 清晰的状态分离:RDLA 允许 ViewModel 将持久状态(通过由 Room 支持的 StateFlow 处理)与瞬态事件(通过 SharedFlow 处理,用于连接中断或错误等一次性通知)清晰地分离。

RDLA 实践:健康指标追踪系统

为了说明这种架构,我们将构建一个跟踪心率指标的数据层。

1. API 模块(公共)

API 模块是 UI 层唯一能访问的包。它包含纯领域模型和存储库接口。

领域模型(HeartRateRecord.kt

这是一个标准的 Kotlin 数据类,没有数据库或序列化注解。

package com.example.healthtracker.data.heartrate.api.modelimport java.time.Instantdata class HeartRateRecord(    val id: String,    val bpm: Int,    val timestamp: Instant)
复制代码

Repository 接口(HeartRateRepository.kt

该接口定义了公共合约,提供了冷流。

package com.example.healthtracker.data.heartrate.apiimport com.example.healthtracker.data.heartrate.api.model.HeartRateRecordimport kotlinx.coroutines.flow.Flowimport java.time.Instantinterface HeartRateRepository {    /**     * 返回一个包含心率记录的响应式数据流。     * 如果本地缓存已过期,则在后台触发网络刷新。     */    fun observeHeartRates(start: Instant, end: Instant): Flow<List<HeartRateRecord>>    /**     * 上传新的心率记录。在服务器确认之前暂停操作。     */    suspend fun upsertHeartRates(records: List<HeartRateRecord>)}
复制代码

2. 实现模块(私有)

为了防止泄露,实现模块中的所有内容均被标记为 internal。UI 层绝不能直接访问这些类。

缓存封装器(Cached.kt

为了管理缓存过期而又不向领域模型中引入元数据,我们在实现层中将模型封装在 Cached 容器中:

package com.example.healthtracker.data.core.cachingimport java.time.Instantdata class Cached<out T>(    val value: T,    val insertionTime: Instant)
复制代码

Repository 协调器(HeartRateFetchAndStoreRepository.kt

该类负责协调本地和远程数据源。它会检查缓存过期情况,并触发后台数据获取。

package com.example.healthtracker.data.heartrate.implimport com.example.healthtracker.data.heartrate.api.HeartRateRepositoryimport com.example.healthtracker.data.heartrate.api.model.HeartRateRecordimport com.example.healthtracker.data.heartrate.impl.local.HeartRateLocalDataSourceimport com.example.healthtracker.data.heartrate.impl.remote.HeartRateRemoteDataSourceimport com.example.healthtracker.data.core.caching.Cachedimport kotlinx.coroutines.CoroutineScopeimport kotlinx.coroutines.flow.Flowimport kotlinx.coroutines.flow.flowOnimport kotlinx.coroutines.flow.mapimport kotlinx.coroutines.flow.onStartimport kotlinx.coroutines.launchimport java.time.Durationimport java.time.Instantimport kotlin.coroutines.CoroutineContextinternal class HeartRateFetchAndStoreRepository(    private val localDS: HeartRateLocalDataSource,    private val remoteDS: HeartRateRemoteDataSource,    private val appScope: CoroutineScope,    private val lightweightContext: CoroutineContext,    private val cacheTtl: Duration = Duration.ofMinutes(10)) : HeartRateRepository {    override fun observeHeartRates(start: Instant, end: Instant): Flow<List<HeartRateRecord>> {        return localDS.readHeartRates(start, end)            .onStart {                 // 异步后台刷新执行                appScope.launch { triggerRefreshIfNeeded(start, end) }             }            .map { cachedList -> cachedList.map { it.value } }            .flowOn(lightweightContext)    }    private suspend fun triggerRefreshIfNeeded(start: Instant, end: Instant) {        val cachedData = localDS.readHeartRatesOnce(start, end)        if (cachedData.isEmpty() || isStale(cachedData)) {            try {                val remoteData = remoteDS.fetchHeartRates(start, end)                localDS.writeHeartRates(remoteData)            } catch (e: Exception) {                // 静默失败;UI 继续显示 cachedData            }        }    }    private fun isStale(data: List<Cached<HeartRateRecord>>): Boolean {        val oldestAllowed = Instant.now().minus(cacheTtl)        return data.any { it.insertionTime.isBefore(oldestAllowed) }    }    override suspend fun upsertHeartRates(records: List<HeartRateRecord>) {        // 同步更新:在更新数据库之前,服务器的写入操作必须成功        val serverConfirmed = remoteDS.uploadHeartRates(records)        localDS.writeHeartRates(serverConfirmed)    }}
复制代码

请注意,通过在应用程序作用域 CoroutineScope(appScope)中调用 triggerRefreshIfNeeded,即使用户退出当前屏幕(这会导致 ViewModel 的作用域被取消),也能确保数据库更新成功完成。

3. 存储层模块(Room)

在实际的移动应用中,相关功能通常共享一个数据库。RDLA 引入了事务组来处理这种情况。例如,心率和血压数据被归入“心血管事务组”,共享一个 Room 数据库实例。

图 4:Cardio 事务组下的 Room 本地存储模块包的层次结构(图片由作者制作)

本地数据源接口(HeartRateLocalDataSource.kt

interface HeartRateLocalDataSource {    fun readHeartRates(start: Instant, end: Instant): Flow<List<Cached<HeartRateRecord>>>    suspend fun readHeartRatesOnce(start: Instant, end: Instant): List<Cached<HeartRateRecord>>    suspend fun writeHeartRates(records: List<HeartRateRecord>)}The Database Entity (HeartRateEntity.kt)@Entity(tableName = "heart_rate_records")data class HeartRateEntity(    @PrimaryKey val id: String,    val bpm: Int,    val timestamp: Instant,    val insertionTime: Instant) {    fun toModel() = HeartRateRecord(id = id, bpm = bpm, timestamp = timestamp)}
复制代码

Room 实现(HeartRateRoomDataSource.kt

internal class HeartRateRoomDataSource(    private val dao: HeartRateDao,    private val lightweightContext: CoroutineContext) : HeartRateLocalDataSource {    override fun readHeartRates(start: Instant, end: Instant): Flow<List<Cached<HeartRateRecord>>> {        return dao.observeHeartRates(start, end)            .distinctUntilChanged() // 防止 Room 发出冗余数据            .map { entities ->                entities.map { Cached(it.toModel(), it.insertionTime) }            }            .flowOn(lightweightContext)    }    override suspend fun readHeartRatesOnce(start: Instant, end: Instant): List<Cached<HeartRateRecord>> {        return dao.getHeartRates(start, end).map { Cached(it.toModel(), it.insertionTime) }    }    override suspend fun writeHeartRates(records: List<HeartRateRecord>) {        val entities = records.map {             HeartRateEntity(it.id, it.bpm, it.timestamp, Instant.now())         }        dao.insertOrUpdate(entities)    }}
复制代码

提示:我们将 distinctUntilChanged() 应用于 Room 数据流。由于 Room 会触发表级观察,所以对表的任何更新都会触发数据发布,即使所查询的子集未发生变化也是如此。distinctUntilChanged() 可以过滤掉这些冗余的事件。

4. UI 层(组合)消费

响应式数据层的强大程度取决于调用它的表示层的性能。为了有效连接 RDLA 与 Jetpack Compose,UI 层利用 StateFlow 将底层数据流聚合为统一的状态表示。这可以确保无论应用程序的生命周期如何变化,UI 都能准确地反映持久状态(如“已连接”、“正在扫描”或“已断开连接”)。相反,瞬态事件(例如局部同步回滚或 BLE 连接中断)会绕过永久状态。这些事件通过一个带有重放缓存且高度可配置的 SharedFlow 推送至 UI,从而可以确保即使在设备旋转导致 UI 暂时销毁的情况下,关键的一次性警报也能被保留并即时发送。

@HiltViewModelclass HeartRateViewModel @Inject constructor(    private val repository: HeartRateRepository) : ViewModel() {    // 持久状态流    val uiState: StateFlow<UiState> = repository        .observeHeartRates(Instant.now().minus(1, ChronoUnit.DAYS), Instant.now())        .map { records -> UiState.Success(records) }        .stateIn(            scope = viewModelScope,            started = SharingStarted.WhileSubscribed(5000),            initialValue = UiState.Loading        )    // 瞬态事件流    private val _faultEvents = MutableSharedFlow<FaultEvent>(        replay = 1,        onBufferOverflow = BufferOverflow.DROP_OLDEST    )    val faultEvents = _faultEvents.asSharedFlow()}
复制代码

通过将用于处理持续状态的 StateFlow 与用于处理易失性硬件事件的 SharedFlow 无缝集成,ViewModel 为 Compose 提供了一座坚不可摧的响应式数据桥,确保 UI 绝不会显示过时或异常状态。

设计离线突变

变异是指由用户发起的更新操作。根据用户体验的要求,变异操作可以采用同步或异步方式处理。

1. 同步变异

同步变异要求用户处于在线状态。如果网络请求失败,则本地数据库将保持不变,并立即向用户显示错误提示(如尝试删除医疗日志)。

2. 异步变异(离线优先)

对于标准日志(如运动期间记录的心率 BPM),写入操作必须立即成功,即使在离线状态下也是如此。为实现这一点,变异会被写入本地数据库队列,实时合并到活跃的用户界面数据流中,并在后台进行同步。

图 5:异步突变同步管道与操作系统后台委托(图片由作者制作)

在本地数据源中实现突变合并

override fun readHeartRates(start: Instant, end: Instant): Flow<List<Cached<HeartRateRecord>>> {    val savedRecordsFlow = dao.observeHeartRates(start, end)    val pendingMutationsFlow = mutationDao.observePendingAddMutations()    return savedRecordsFlow.combine(pendingMutationsFlow) { saved, mutations ->        val mergedList = saved.map { Cached(it.toModel(), it.insertionTime) }.toMutableList()                mutations.forEach { mutation ->            if (mutation.timestamp in start..end) {                // 将本地待处理的突变叠加到列表上方                mergedList.removeAll { it.value.id == mutation.localId }                mergedList.add(                    Cached(                        value = HeartRateRecord(mutation.localId, mutation.bpm, mutation.timestamp),                        insertionTime = mutation.localCreationTime                    )                )            }        }        mergedList.sortedByDescending { it.value.timestamp }    }.flowOn(lightweightContext)}
复制代码

RDLA 将后台执行职责明确地分配给 appScope 协程和 Android Jetpack WorkManager。对于即时同步的数据处理(例如将收到的健康指标保存到 Room 数据库),该架构会在应用程序作用域的 CoroutineContext 中启动协程。这样可以保证,即使用户迅速跳转到了其他界面并取消了当前的 ViewModel 作用域,轻量级的本地数据库写入操作也能成功完成。

然而,对于连接本地数据库与远程云端的异步数据更新(或通过 BLE 推送数兆字节的 OTA 固件负载),appScope 就不够用了。Android 激进的电源管理功能(Doze 模式)和进程终止机制可能会在操作进行过程中终止应用程序。对于受 FDA 监管的健康指标,数据丢失是绝对不可接受的。通过 WorkManager 将异步数据更新加入队列,并将同步请求直接委托给操作系统服务。这样可以保证关键数据包同步在满足严格系统约束(例如要求使用不限流量的 Wi-Fi 或充足的电量)的情况下执行,并且如果连接中断,也能可靠地从最后已知的内存偏移量处精确恢复。

后台同步通过 Android WorkManager 进行管理,使用由 Hilt 注入的 CoroutineWorker。通过将网络调用的限制与 UI 解耦,即使用户关闭了应用,也能确保请求成功。

冲突解决与回滚

虽然乐观的本地更新能提供无缝的用户体验,但本质上容易引发冲突。当 WorkManager 同步工作进程尝试将本地变更队列上传至服务器时,总是存在被远程拒绝的风险。例如,如果该记录被另一台已授权的设备并发修改,就可能返回 HTTP 409 冲突错误;或者如果数据未能通过临床验证,就可能返回 422 Unprocessable Entity 的错误。

在 RDLA 中,架构必须能够优雅地处理这些回滚操作,而且要确保不破坏本地“单一可信数据源”。当远程 API 抛出 HTTP 异常时,工作线程会捕获该故障,并将 Room 中的本地变异实体标记为 FAILED,而不是触发无限重试循环。

中央存储库会监控同步过程的状态,并通过 SharedFlow 将这个故障状态作为瞬态事件发送至 UI 层。与此同时,本地数据源会执行一项数据库事务,将队列中被拒绝的变更操作清除。由于 UI 层是以响应式方式收集合并后的数据流,清除待处理的变更操作会迫使 Room 触发新的数据发布。distinctUntilChanged() 过滤器会传递这个更新后的状态,而 UI 会立即自动回滚到最后一个经服务器确认的已知状态。

为什么 RDLA 让测试变得轻而易举

RDLA 最大的优势之一在于它简化了单元测试。通过将 Room 数据库和 Retrofit 网络配置进行隔离,你可以直接测试存储库逻辑。

TestExtensions 模式

由于存储库接口不应该向客户端暴露数据插入的方法(为了防止用户界面直接修改同步状态),所以我们在 API 目标中引入了一个 testonly 接口:

图 6:TestExtensions 的初始化模式与作用域边界分离(图片由作者制作)

1. 定义 Test Extensions 接口(位于 API 模块中)

@VisibleForTestinginterface HeartRateRepositoryTestExtensions {    suspend fun seedLocalHeartRates(records: List<HeartRateRecord>)    suspend fun clearLocalCache()}
复制代码

2. 在 Repository 中实现接口(位于 Impl 接口中)

internal class HeartRateFetchAndStoreRepository(    private val localDS: HeartRateLocalDataSource,    // ...) : HeartRateRepository, HeartRateRepositoryTestExtensions {        // Repository 实现...    override suspend fun seedLocalHeartRates(records: List<HeartRateRecord>) {        localDS.writeHeartRates(records)    }    override suspend fun clearLocalCache() {        localDS.clearAll()    }}
复制代码

3. 编写一个松耦合的单元测试(使用 Robolectric )

@HiltAndroidTest@RunWith(AndroidJUnit4::class)@Config(application = HiltTestApplication::class)class HeartRateRepositoryTest {    @get:Rule val hiltRule = HiltAndroidRule(this)    @Inject lateinit var repository: HeartRateRepository    @Inject lateinit var testExtensions: HeartRateRepositoryTestExtensions    @Before    fun setUp() {        hiltRule.inject()    }    @Test    fun observeHeartRates_emitsSeededData() = runTest {        val now = Instant.now()        val records = listOf(HeartRateRecord("1", 72, now))                // 通过测试扩展直接向数据库插入数据        testExtensions.seedLocalHeartRates(records)        val flow = repository.observeHeartRates(now.minusSeconds(60), now.plusSeconds(60))                assertThat(flow.first()).containsExactlyElementsIn(records)    }}
复制代码

核心测试优势

  • 无需模拟 SQLite:结合 Robolectric 和真实的 Room 数据库,可以确保在测试时对 SQL 查询进行全面验证。

  • 强大的离线验证:通过注入 FakeHeartRateRemoteDataSource,可以模拟网络故障,并验证存储库的备用逻辑能否优雅地处理离线状态。

  • 解耦数据库重构:如果你重构了 SQLite 模式(例如添加数据库字段),但只要本地数据源中的映射逻辑能正确地将数据库实体转换为领域模型,你的存储库测试就不会失败。

小结

要构建一个以离线优先为原则的响应式 Android 应用程序,需要设计一个专为响应式应用而打造的数据层。通过应用响应式数据层架构(RDLA),你可以在公共数据 API 契约与特定于框架的私有数据源实现之间建立清晰的边界。这样,你的表示层(ViewModels/Presenters)将以纯粹的响应式方式运行,通过监听数据变化而非通过过程化的方式查询数据。此外,为了简化测试工作,RDLA 鼓励开发人员基于接口进行编程并利用 TestExtensions 等简洁的初始化模式。

归根结底,向 RDLA 过渡能为你的代码库提供一种可以干净利落地进行扩展的结构,让你可以从容地应对同步挑战,并支持现代用户所期待的、离线优先的丰富体验。

原文链接:https://www.infoq.com/articles/rdla-offline-first-reactive-android-data-layer/