【ArchSummit架构师峰会】探讨数据与人工智能相互驱动的关系>>> 了解详情
写点什么

Android 消息总线的演进之路:用 LiveDataBus 替代 RxBus、EventBus

  • 2020-02-20
  • 本文字数:8453 字

    阅读完需:约 28 分钟

Android消息总线的演进之路:用LiveDataBus替代RxBus、EventBus

背景

对于 Android 系统来说,消息传递是最基本的组件,每一个 App 内的不同页面,不同组件都在进行消息传递。消息传递既可以用于 Android 四大组件之间的通信,也可用于异步线程和主线程之间的通信。对于 Android 开发者来说,经常使用的消息传递方式有很多种,从最早使用的 Handler、BroadcastReceiver、接口回调,到近几年流行的通信总线类框架 EventBus、RxBus。Android 消息传递框架,总在不断的演进之中。

从 EventBus 说起

EventBus 是一个 Android 事件发布/订阅框架,通过解耦发布者和订阅者简化 Android 事件传递。EventBus 可以代替 Android 传统的 Intent、Handler、Broadcast 或接口回调,在 Fragment、Activity、Service 线程之间传递数据,执行方法。


EventBus 最大的特点就是:简洁、解耦。在没有 EventBus 之前我们通常用广播来实现监听,或者自定义接口函数回调,有的场景我们也可以直接用 Intent 携带简单数据,或者在线程之间通过 Handler 处理消息传递。但无论是广播还是 Handler 机制远远不能满足我们高效的开发。EventBus 简化了应用程序内各组件间、组件与后台线程间的通信。EventBus 一经推出,便受到广大开发者的推崇。


现在看来,EventBus 给 Android 开发者世界带来了一种新的框架和思想,就是消息的发布和订阅。这种思想在其后很多框架中都得到了应用。



图片摘自 EventBus GitHub 主页

发布/订阅模式订阅发布模式

定义了一种“一对多”的依赖关系,让多个订阅者对象同时监听某一个主题对象。这个主题对象在自身状态变化时,会通知所有订阅者对象,使它们能够自动更新自己的状态。


RxBus 的出现

RxBus不是一个库,而是一个文件,实现只有短短 30行代码。RxBus 本身不需要过多分析,它的强大完全来自于它基于的 RxJava 技术。响应式编程(Reactive Programming)技术这几年特别火,RxJava 是它在 Java 上的实作。RxJava 天生就是发布/订阅模式,而且很容易处理线程切换。所以,RxBus 凭借区区 30行代码,就敢挑战 EventBus 江湖老大的地位。

RxBus 原理

在 RxJava 中有个 Subject 类,它继承 Observable 类,同时实现了Observer 接口,因此 Subject 可以同时担当订阅者和被订阅者的角色,我们使用 Subject 的子类 PublishSubject 来创建一个 Subject 对象(PublishSubject 只有被订阅后才会把接收到的事件立刻发送给订阅者),在需要接收事件的地方,订阅该 Subject 对象,之后如果 Subject 对象接收到事件,则会发射给该订阅者,此时 Subject 对象充当被订阅者的角色。


完成了订阅,在需要发送事件的地方将事件发送给之前被订阅的 Subject 对象,则此时 Subject 对象作为订阅者接收事件,然后会立刻将事件转发给订阅该 Subject 对象的订阅者,以便订阅者处理相应事件,到这里就完成了事件的发送与处理。


最后就是取消订阅的操作了,RxJava 中,订阅操作会返回一个 Subscription 对象,以便在合适的时机取消订阅,防止内存泄漏,如果一个类产生多个 Subscription 对象,我们可以用一个 CompositeSubscription 存储起来,以进行批量的取消订阅。

RxBus 有很多实现,如:

AndroidKnife/RxBus(https://github.com/AndroidKnife/RxBus


Blankj/RxBus(https://github.com/Blankj/RxBus


其实正如前面所说的,RxBus 的原理是如此简单,我们自己都可以写出一个 RxBus 的实现:

基于 RxJava1 的 RxBus 实现:

public final class RxBus {
private final Subject<Object, Object> bus;
private RxBus() { bus = new SerializedSubject<>(PublishSubject.create()); }
private static class SingletonHolder { private static final RxBus defaultRxBus = new RxBus(); }
public static RxBus getInstance() { return SingletonHolder.defaultRxBus; }
/* * 发送 */ public void post(Object o) { bus.onNext(o); }
/* * 是否有Observable订阅 */ public boolean hasObservable() { return bus.hasObservers(); }
/* * 转换为特定类型的Obserbale */ public <T> Observable<T> toObservable(Class<T> type) { return bus.ofType(type); }}
复制代码


基于 RxJava2 的 RxBus 实现:


public final class RxBus2 {
private final Subject<Object> bus;
private RxBus2() { // toSerialized method made bus thread safe bus = PublishSubject.create().toSerialized(); }
public static RxBus2 getInstance() { return Holder.BUS; }
private static class Holder { private static final RxBus2 BUS = new RxBus2(); }
public void post(Object obj) { bus.onNext(obj); }
public <T> Observable<T> toObservable(Class<T> tClass) { return bus.ofType(tClass); }
public Observable<Object> toObservable() { return bus; }
public boolean hasObservers() { return bus.hasObservers(); }}
复制代码

引入 LiveDataBus 的想法

从 LiveData 谈起

LiveData 是 Android Architecture Components 提出的框架。LiveData 是一个可以被观察的数据持有类,它可以感知并遵循 Activity、Fragment 或 Service 等组件的生命周期。正是由于 LiveData 对组件生命周期可感知特点,因此可以做到仅在组件处于生命周期的激活状态时才更新 UI 数据。


LiveData 需要一个观察者对象,一般是 Observer 类的具体实现。当观察者的生命周期处于 STARTED 或 RESUMED 状态时,LiveData 会通知观察者数据变化;在观察者处于其他状态时,即使 LiveData 的数据变化了,也不会通知。

LiveData 的优点

  • UI 和实时数据保持一致 因为 LiveData 采用的是观察者模式,这样一来就可以在数据发生改变时获得通知,更新 UI。

  • 避免内存泄漏 观察者被绑定到组件的生命周期上,当被绑定的组件销毁(destroy)时,观察者会立刻自动清理自身的数据。

  • 不会再产生由于 Activity 处于 stop 状态而引起的崩溃


例如:当 Activity 处于后台状态时,是不会收到 LiveData 的任何事件的。


  • 不需要再解决生命周期带来的问题 LiveData 可以感知被绑定的组件的生命周期,只有在活跃状态才会通知数据变化。

  • 实时数据刷新 当组件处于活跃状态或者从不活跃状态到活跃状态时总是能收到最新的数据。

  • 解决 Configuration Change 问题 在屏幕发生旋转或者被回收再次启动,立刻就能收到最新的数据。

谈一谈 Android Architecture Components

Android Architecture Components 的核心是 Lifecycle、LiveData、ViewModel 以及 Room,通过它可以非常优雅的让数据与界面进行交互,并做一些持久化的操作,高度解耦,自动管理生命周期,而且不用担心内存泄漏的问题。


  • Room 一个强大的 SQLite 对象映射库。

  • ViewModel 一类对象,它用于为 UI 组件提供数据,在设备配置发生变更时依旧可以存活。

  • LiveData 一个可感知生命周期、可被观察的数据容器,它可以存储数据,还会在数据发生改变时进行提醒。

  • Lifecycle 包含 LifeCycleOwer 和 LifecycleObserver,分别是生命周期所有者和生命周期感知者。

Andrroiid Arrchitecture Components 的特点

  • 数据驱动型编程 变化的永远是数据,界面无需更改。

  • 感知生命周期,防止内存泄漏。

  • 高度解耦 数据,界面高度分离。

  • 数据持久化 数据、ViewModel不与 UI 的生命周期挂钩,不会因为界面的重建而销毁。

重点:为什么使用 LiveData 构建数据通信总线 LiveDataBus

使用 LiveData 的理由

  • LiveData 具有的这种可观察性和生命周期感知的能力,使其非常适合作为 Android 通信总线的基础构件。

  • 使用者不用显示调用反注册方法。


由于 LiveData 具有生命周期感知能力,所以 LiveDataBus 只需要调用注册回调方法,而不需要显示的调用反注册方法。这样带来的好处不仅可以编写更少的代码,而且可以完全杜绝其他通信总线类框架(如 EventBus、RxBus)忘记调用反注册所带来的内存泄漏的风险。

为什么要用 LiveDataBus 替代 EventtBus 和 RxBus

  • LiveDataBus 的实现及其简单 相对 EventBus 复杂的实现,LiveDataBus 只需要一个类就可以实现。

  • LiveDataBus 可以减小 APK 包的大小 由于 LiveDataBus 只依赖 Android 官方 Android Architecture Components 组件的 LiveData,没有其他依赖,本身实现只有一个类。作为比较,EventBus JAR 包大小为 57kb,RxBus 依赖 RxJava 和 RxAndroid,其中 RxJava2 包大小 2.2MB,RxJava1 包大小 1.1MB,RxAndroid 包大小 9kb。使用 LiveDataBus 可以大大减小 APK 包的大小。

  • LiveDataBus 依赖方支持更好 LiveDataBus 只依赖 Android 官方 Android Architecture Components 组件的 LiveData,相比 RxBus 依赖的 RxJava 和 RxAndroid,依赖方支持更好。

  • LiveDataBus 具有生命周期感知 LiveDataBus 具有生命周期感知,在 Android 系统中使用调用者不需要调用反注册,相比 EventBus 和 RxBus 使用更为方便,并且没有内存泄漏风险。

LiveDataBus 的设计和架构

LiveDataBus 的组成

  • 消息 消息可以是任何的 Object,可以定义不同类型的消息,如 Boolean、String。也可以定义自定义类型的消息。

  • 消息通道 LiveData 扮演了消息通道的角色,不同的消息通道用不同的名字区分,名字是 String 类型的,可以通过名字获取到一个 LiveData 消息通道。

  • 消息总线 消息总线通过单例实现,不同的消息通道存放在一个 HashMap 中。

  • 订阅 订阅者通过 getChannel 获取消息通道,然后调用 observe 订阅这个通道的消息。

  • 发布 发布者通过 getChannel 获取消息通道,然后调用 setValue 或者 postValue 发布消息。

LiveDataBus 原理图


LiveDataBus 原理图

LiveDataBus 的实现

第一个实现:

public final class LiveDataBus {
private final Map<String, MutableLiveData<Object>> bus;
private LiveDataBus() { bus = new HashMap<>(); }
private static class SingletonHolder { private static final LiveDataBus DATA_BUS = new LiveDataBus(); }
public static LiveDataBus get() { return SingletonHolder.DATA_BUS; }
public <T> MutableLiveData<T> getChannel(String target, Class<T> type) { if (!bus.containsKey(target)) { bus.put(target, new MutableLiveData<>()); } return (MutableLiveData<T>) bus.get(target); }
public MutableLiveData<Object> getChannel(String target) { return getChannel(target, Object.class); }}
复制代码


短短二十行代码,就实现了一个通信总线的全部功能,并且还具有生命周期感知功能,并且使用起来也及其简单:


注册订阅:


LiveDataBus.get().getChannel("key_test", Boolean.class)        .observe(this, new Observer<Boolean>() {            @Override            public void onChanged(@Nullable Boolean aBoolean) {            }        });
复制代码


发送消息:


LiveDataBus.get().getChannel("key_test").setValue(true);
复制代码


我们发送了一个名为”key_test”,值为 true 的事件。


这个时候订阅者就会收到消息,并作相应的处理,非常简单。

问题出现

对于 LiveDataBus 的第一版实现,我们发现,在使用这个 LiveDataBus 的过程中,订阅者会收到订阅之前发布的消息。对于一个消息总线来说,这是不可接受的。无论 EventBus 或者 RxBus,订阅方都不会收到订阅之前发出的消息。


对于一个消息总线,LiveDataBus 必须要解决这个问题。

问题分析

怎么解决这个问题呢?先分析下原因:


当 LifeCircleOwner 的状态发生变化的时候,会调用 LiveData.ObserverWrapper 的 activeStateChanged 函数,如果这个时候 ObserverWrapper 的状态是 active,就会调用 LiveData 的 dispatchingValue。



在 LiveData 的 dispatchingValue 中,又会调用 LiveData 的 considerNotify 方法。



在 LiveData 的 considerNotify 方法中,红框中的逻辑是关键,如果 ObserverWrapper 的 mLastVersion 小于 LiveData 的 mVersion,就会去回调 mObserver 的 onChanged 方法。而每个新的订阅者,其 version 都是-1,LiveData 一旦设置过其 version 是大于-1 的(每次 LiveData 设置值都会使其 version 加 1),这样就会导致 LiveDataBus 每注册一个新的订阅者,这个订阅者立刻会收到一个回调,即使这个设置的动作发生在订阅之前。


问题原因总结

对于这个问题,总结一下发生的核心原因。对于 LiveData,其初始的 version 是-1,当我们调用了其 setValue 或者 postValue,其 vesion 会+1;对于每一个观察者的封装 ObserverWrapper,其初始 version 也为-1,也就是说,每一个新注册的观察者,其 version 为-1;当 LiveData 设置这个 ObserverWrapper 的时候,如果 LiveData 的 version 大于 ObserverWrapper 的 version,LiveData 就会强制把当前 value 推送给 Observer。

如何解决这个问题

明白了问题产生的原因之后,我们来看看怎么才能解决这个问题。很显然,根据之前的分析,只需要在注册一个新的订阅者的时候把 Wrapper 的 version 设置成跟 LiveData 的 version 一致即可。


那么怎么实现呢,看看 LiveData 的 observe 方法,他会在步骤 1 创建一个 LifecycleBoundObserver,LifecycleBoundObserver 是 ObserverWrapper 的派生类。然后会在步骤 2 把这个 LifecycleBoundObserver 放入一个私有 Map 容器mObservers 中。无论 ObserverWrapper 还是 LifecycleBoundObserver 都是私有的或者包可见的,所以无法通过继承的方式更改 LifecycleBoundObserver 的 version。


那么能不能从 Map 容器mObservers 中取到 LifecycleBoundObserver,然后再更改 version 呢?答案是肯定的,通过查看 SafeIterableMap 的源码我们发现有一个 protected 的 get 方法。因此,在调用 observe 的时候,我们可以通过反射拿到 LifecycleBoundObserver,再把 LifecycleBoundObserver 的 version 设置成和 LiveData 一致即可。



对于非生命周期感知的 observeForever 方法来说,实现的思路是一致的,但是具体的实现略有不同。observeForever 的时候,生成的 wrapper不是 LifecycleBoundObserver,而是 AlwaysActiveObserver(步骤 1),而且我们也没有机会在 observeForever 调用完成之后再去更改 AlwaysActiveObserver 的 version,因为在 observeForever 方法体内,步骤 3 的语句,回调就发生了。



那么对于 observeForever,如何解决这个问题呢?既然是在调用内回调的,那么我们可以写一个 ObserverWrapper,把真正的回调给包装起来。


把 ObserverWrapper 传给 observeForever,那么在回调的时候我们去检查调用栈,如果回调是 observeForever 方法引起的,那么就不回调真正的订阅者。

LiveDataBus 最终实现

public final class LiveDataBus {
private final Map<String, BusMutableLiveData<Object>> bus;
private LiveDataBus() { bus = new HashMap<>(); }
private static class SingletonHolder { private static final LiveDataBus DEFAULT_BUS = new LiveDataBus(); }
public static LiveDataBus get() { return SingletonHolder.DEFAULT_BUS; }
public <T> MutableLiveData<T> with(String key, Class<T> type) { if (!bus.containsKey(key)) { bus.put(key, new BusMutableLiveData<>()); } return (MutableLiveData<T>) bus.get(key); }
public MutableLiveData<Object> with(String key) { return with(key, Object.class); }
private static class ObserverWrapper<T> implements Observer<T> {
private Observer<T> observer;
public ObserverWrapper(Observer<T> observer) { this.observer = observer; }
@Override public void onChanged(@Nullable T t) { if (observer != null) { if (isCallOnObserve()) { return; } observer.onChanged(t); } }
private boolean isCallOnObserve() { StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); if (stackTrace != null && stackTrace.length > 0) { for (StackTraceElement element : stackTrace) { if ("android.arch.lifecycle.LiveData".equals(element.getClassName()) && "observeForever".equals(element.getMethodName())) { return true; } } } return false; } }
private static class BusMutableLiveData<T> extends MutableLiveData<T> {
private Map<Observer, Observer> observerMap = new HashMap<>();
@Override public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<T> observer) { super.observe(owner, observer); try { hook(observer); } catch (Exception e) { e.printStackTrace(); } }
@Override public void observeForever(@NonNull Observer<T> observer) { if (!observerMap.containsKey(observer)) { observerMap.put(observer, new ObserverWrapper(observer)); } super.observeForever(observerMap.get(observer)); }
@Override public void removeObserver(@NonNull Observer<T> observer) { Observer realObserver = null; if (observerMap.containsKey(observer)) { realObserver = observerMap.remove(observer); } else { realObserver = observer; } super.removeObserver(realObserver); }
private void hook(@NonNull Observer<T> observer) throws Exception { //get wrapper's version Class<LiveData> classLiveData = LiveData.class; Field fieldObservers = classLiveData.getDeclaredField("mObservers"); fieldObservers.setAccessible(true); Object objectObservers = fieldObservers.get(this); Class<?> classObservers = objectObservers.getClass(); Method methodGet = classObservers.getDeclaredMethod("get", Object.class); methodGet.setAccessible(true); Object objectWrapperEntry = methodGet.invoke(objectObservers, observer); Object objectWrapper = null; if (objectWrapperEntry instanceof Map.Entry) { objectWrapper = ((Map.Entry) objectWrapperEntry).getValue(); } if (objectWrapper == null) { throw new NullPointerException("Wrapper can not be bull!"); } Class<?> classObserverWrapper = objectWrapper.getClass().getSuperclass(); Field fieldLastVersion = classObserverWrapper.getDeclaredField("mLastVersion"); fieldLastVersion.setAccessible(true); //get livedata's version Field fieldVersion = classLiveData.getDeclaredField("mVersion"); fieldVersion.setAccessible(true); Object objectVersion = fieldVersion.get(this); //set wrapper's version fieldLastVersion.set(objectWrapper, objectVersion); } }}
复制代码

注册订阅

LiveDataBus.get()        .with("key_test", String.class)        .observe(this, new Observer<String>() {            @Override            public void onChanged(@Nullable String s) {            }        });
复制代码

发送消息:

LiveDataBus.get().with("key_test").setValue(s);
复制代码

源码说明 LiveDataBus 的源码可以直接拷贝使用,也可以前往作者的 GitHub 仓库查看下载:https://github.com/JeremyLiao/LiveDataBus

总结

本文提供了一个新的消息总线框架——LiveDataBus。订阅者可以订阅某个消息通道的消息,发布者可以把消息发布到消息通道上。利用 LiveDataBus,不仅可以实现消息总线功能,而且对于订阅者,他们不需要关心何时取消订阅,极大减少了因为忘记取消订阅造成的内存泄漏风险。

作者介绍

  • 海亮,美团高级工程师,2017年加入美团,目前主要负责美团轻收银、美团收银零售版等 App 的相关业务及模块开发工作。


2020-02-20 21:51823

评论

发布
暂无评论
发现更多内容

中国区块链第一村的价值裂变

CECBC

5分钟速读之Rust权威指南(三)

wzx

rust

2021最热门的20个数据库学习总结,你会用哪几个?

北游学Java

Java MySQL 数据库 后端

爱奇艺世界大会|开幕式速览:智能制作助推影视工业化,匠心构筑行业健康生态

爱奇艺技术产品团队

如何给产品定价

石云升

创业 产品 职场经验 5月日更

搞服务器开发竟不知道宝塔为何物,有点说不过去呀!

liuzhen007

5月日更

新融合,新跳板:智能云网如何让企业数字化转型,起步即领先?

脑极体

架构实战营模块四总结

竹林七贤

“技术+应用”驱动金融科技创新融合

CECBC

金融

CRM Transaction处理中的权限控制

Jerry Wang

CRM SAP abap

图说丨一图看懂浪潮云“1231”业务战略

浪潮云

Flink消费Kafka

大数据技术指南

大数据 flink 5月日更

业界率先支持 MCP-OVER-XDS 协议,Nacos 2.0.1 + 1.4.2 Release 正式发布

阿里巴巴云原生

容器 微服务 云原生 k8s 中间件

终于来了!这份阿里P9纯手写的Java并发核心手册,把我没学会的高扩展、高性能、高可用全部讲清楚了

Java 程序员 架构 面试

Netty常用解码器学习笔记

风翱

Netty 5月日更

完蛋,好像病了

IT蜗壳-Tango

5月日更

高性能 JavaScriptの笔记(三)

空城机

JavaScript 大前端 5月日更

从0到1数字化转型的“精益落地”模式

高瑞

“碳中和”目标下的绿色金融探索

CECBC

金融

2021金三银四面试经历:阿里七面(已拿offer),面试经历+真题分享

Java 编程 程序员 架构 面试

爱奇艺世界大会|刘文峰:科技创新如何为用户和艺术家服务?

爱奇艺技术产品团队

Dubbo 延迟服务暴露

青年IT男

dubbo

被解救的代码 - 代码即服务时代来了!

阿里巴巴云原生

容器 微服务 开发者 云原生 开发工具

用ABAP 生成二维码 QR Code

Jerry Wang

二维码 SAP abap

五岳核心版上线!这份阿里开发手册核心版又将被多少人疯狂转载?

Java 编程 程序员 架构 面试

数据挖掘从入门到放弃(四):手撕(绘)关联规则挖掘算法

数据社

机器学习 5月日更

关于打的 umd 包在使用时,报 require is not defined 错误的问题出处

blueju

架构实战营模块四作业

竹林七贤

腾讯校招都会问些什么?| 五面腾讯(Java岗)经历分享

Java架构师迁哥

从基础到实战一应俱全,这份全网首发的Kafka技术手册,超详细!

Java架构师迁哥

4年Java开发经验,经常被问到高并发、性能调优方面的问题,该怎么办?

Java架构师迁哥

Android消息总线的演进之路:用LiveDataBus替代RxBus、EventBus_文化 & 方法_美团技术团队_InfoQ精选文章