AI实践哪家强?来 AICon, 解锁技术前沿,探寻产业新机! 了解详情
写点什么

Event Sourcing 和 CQRS 落地(一):UID-Generator 实现

  • 2019-07-01
  • 本文字数:6080 字

    阅读完需:约 20 分钟

Event Sourcing 和 CQRS落地(一):UID-Generator 实现

Event Sourcing 简单来说就是记录对象的每个事件而不是记录对象的最新状态,比如新建、修改等,只记录事件内容,当需要最新的状态的时,通过堆叠事件将最新的状态计算出来。那么这种模式查询的时候性能会变的非常差,这个时候就涉及到了 CQRS ,简单的理解就是读写分离,通过事件触发,将最新状态保存到读库,查询全都走读库,理论上代码层,数据库层,都可以做到分离,当然也可以不分离,一般来说为了保证数据库性能,这里起码会将数据库分离。

为什么要使用?

了解 Event Sourcing 的基本内容之后,我们可以发现这个模式有很多好处:


  • 记录了对象的事件,相当于记录了整个历史,可以查看到任意一个时间点的对象状态;

  • 都是以事件形式进行写入操作,理论上在高并发的情况下,没有死锁,性能会快很多;

  • 可以基于历史事件做更多的数据分析。


Event Soucing 通常会和 DDD CQRS 一起讨论,在微服务盛行的前提下,DDD 让整个软件系统松耦合,而 Event Soucing 也是面向 Aggregate,这个模式很符合 DDD 思想,同时由于 Event Sourcing 的特性,读取数据必然会成为瓶颈,这个时候 CQRS 就起到做用了,通过读写分离,让各自的职责专一,实际上在传统的方式中我们也可能会这么干,只是方式略微不同,比如有一个只读库,时时同步主库,让查询通过只读库进行,那么如果查询量特别大的时候,起码写库不会因为查询而下降性能。

背景

由于我们公司(匠人网络)的技术体系基本是 Spring 全家桶,而 Java 界似乎 Axon 又是比较流行的 Event Sourcing 框架,本着对新技术的尝试以及某些业务也确实有这方面的需求的出发点,对 Axon 做了一些尝试。后面的一系列文章将会以 Spring Cloud 作为背景,探讨 Axon 如何使用,以及如何处理一些常见的业务需求(溯源、读写分离、消息可靠等),所以在看后面的文章之前最好对 Spring Boot、Spring Cloud、Spring Cloud Stream、Spring Data JPA 等有一些基本的了解。

UID Generator

为什么要使用

由于 Event Soucing 是记录事件的,那么 Object Id 肯定就不能是用数据库生成的了,基本上所有的 Event Soucing 相关的框架都是将事件直接序列化,然后对应到 Object,所以这种情况下,就需要自己产生 ID,而自己生成 ID 的话,就有很多限制,比如需要根据时间递增,尽量比较短,在分布式的情况下 ID 保证不能重复等等,本文会比较几种方案,然后选择一种比较好的来实现。

方案选择

数据库

这种方案其实就是基于数据库的自增 ID,各个分布式系统通过一个数据库去分配 ID,由于依赖了数据库,性能肯定是个问题,如果部署多点数据库,不但实现麻烦,而且性能还是取决于数据库数量,所以在分布式系统当中,并发量大的系统一般不会采取该方案。

UUID

UUID 是通用唯一识别码 (Universally Unique Identifier),是由一组 32 位数的 16 进制数字所构成,也就是 128 bit。在规范字符串格式中,UUID 的十六个八位字节被表示为 32 个十六进制(基数 16)数字,以连字号分隔的五组来显示,形式为 8-4-4-4-12,总共有 36 个字符(即三十二个英数字母和四个连字号)。例如:


123e4567-e89b-12d3-a456-426655440000xxxxxxxx-xxxx-Mxxx-Nxxx-xxxxxxxxxxxx
复制代码


N 那个位置,只会是 8,9,a,b, M 那个位置,代表版本号,由于 UUID 的标准实现有 5 个版本,所以只会是 1,2,3,4,5。不同的版本基于的算法不一样,而在 Java 中最常用的 UUID.randomUUID() 是基于版本 4 的,基于随机数,也会有重复的概率,只是概率特别低,低到几乎可以忽略而已。


由于这种算法生成的 ID 是字符串,而且长度有特别的长,非常不利于建立索引等操作,所以通常不会用来作为主键。

Snowflake

为了满足在分布式系统中可以生成全局唯一且趋势递增的 ID,Twitter 推出了一种算法,该算法由 64 bit 组成。



  • 第一位永远是 0,实际上这是为了让生成的 ID 都为正数,以保证趋势递增;

  • 后面 41 位用来记录时间,理论上可以记录 2^41 毫秒,2^41/(24 * 3600 * 365 * 1000) = 69.7 年,所以这里的理论最大使用时间就是 70 年左右;

  • 在后面 10 位用来记录机器 ID ,更准确的应该说是实例 ID,对应的可以是某个 Container 或者某个进程,最多支持 1024 个;

  • 最后 12 位用来记录序列号,来保证每个实例每毫秒生成的 ID 唯一;


该算法的优点:


  • 不依赖数据库,高性能;

  • 生成的 ID 趋势递增;

  • 64 bit 的数字作为 ID 相比 UUID 短的多,方便建立数据库索引。


该算法的缺点:


  • 依赖系统时钟,如果系统时钟发生回拨,那么有可能造成 ID 冲突或乱序。

基于 Java 的实现

基于上面的分析,这里我们选择使用 Snowflake 来实现,Twitter 官方提供了一个 Scala 版本的实现,在这里我们实现一个 Java 版本,具体代码如下:


@Slf4jpublic class SnowFlake {
private static class TimeBackwardsException extends RuntimeException { public TimeBackwardsException(String message) { super(message); } }
/** * 起始的时间戳 */ private static final long START_STAMP = 1262275200000L;
/** * 每一部分占用的位数 */ private static final long SEQUENCE_BIT = 12; //序列号占用的位数 private static final long MACHINE_BIT = 10; //机器标识占用的位数
/** * 每一部分的最大值 */ private static final long MAX_MACHINE_NUM = -1L ^ (-1L << MACHINE_BIT); private static final long MAX_SEQUENCE = -1L ^ (-1L << SEQUENCE_BIT);
/** * 每一部分向左的位移 */ private static final long MACHINE_LEFT = SEQUENCE_BIT; private static final long TIMESTAMP_LEFT = SEQUENCE_BIT + MACHINE_BIT;
private long machineId; //机器标识 private long sequence = 0L; //序列号 private long lastStamp = -1L;//上一次时间戳
public SnowFlake(long machineId) {
if (machineId > MAX_MACHINE_NUM || machineId < 0) { throw new IllegalArgumentException("machineId can't be greater than MAX_MACHINE_NUM or less than 0"); } this.machineId = machineId; }
/** * 产生下一个ID */ public synchronized Long nextId() { long currStamp = getNewStamp(); if (currStamp < lastStamp) { throw new TimeBackwardsException("Clock moved backwards. Refusing to generate id"); }
if (currStamp == lastStamp) { sequence = (sequence + 1) & MAX_SEQUENCE;
//同一毫秒的序列数已经达到最大 if (sequence == 0L) { sequence = new Random().nextInt(10); currStamp = getNextMill(); } } else { // 新的一毫秒,随机从 0-9 中开始 sequence = new Random().nextInt(10); } lastStamp = currStamp;
return (currStamp - START_STAMP) << TIMESTAMP_LEFT //时间戳部分 | machineId << MACHINE_LEFT //机器标识部分 | sequence; //序列号部分 }
private long getNextMill() { long mill = getNewStamp(); while (mill <= lastStamp) { mill = getNewStamp(); }
return mill; }
private long getNewStamp() { return System.currentTimeMillis(); }
}

复制代码


仔细阅读以下这段代码,你会发现有个地方和我们描述的不太一样:


    if (currStamp == lastStamp) {        sequence = (sequence + 1) & MAX_SEQUENCE;
//同一毫秒的序列数已经达到最大 if (sequence == 0L) { sequence = new Random().nextInt(10); currStamp = getNextMill(); } } else { // 新的一毫秒,随机从 0-9 中开始 sequence = new Random().nextInt(10); }
复制代码


在毫秒刷新的时候,我们并没有去把序列号置为 0,而是随机从 0-9 取了一个数,这么做的原因是在并发量不是特别高的时候,如果都从 0 开始的话,会导致生成的 ID 都是偶数,那么在做一些分表操作的时候,会导致严重的分配不均匀,所以这里我们随机从 0-9 开始让产生的 ID 尽可能的分配均匀。但是这么做是会下降性能的,每毫秒的 ID 生成数量会下降一些,但是这并没有下降数量级,完全是可以接受的。

基于 Spring Cloud 分配 Worker Id

上面介绍了如何使用 Snowflake 来生成 ID,那么结合 Spring Cloud ,我们需要给每个节点分配一个 Worker ID,但是由于 Spring Cloud 的特点,它是希望每个节点无状态化的,这就给我们分配 Worker ID 带来了一定的难度,如果我们需要区分每个几点,就不得不将节点信息存储到某个中央,然后再分配,为了便于之后的水平扩展,这里我们基于内部代码实现,大概的原理是在服务启动的时候,记录下节点 IP 和 MAC ,作为 Service Node Key 存储到数据库,这个 Key 在数据库中唯一,通过这个唯一的 Key 给不同的节点分配 ID。下面我们尝试使用 JPA 来实现这一过程。


Spring Cloud 在 2.1.0 之后提供了 getInstanceId() 方法,但是可以为空,所以需要看各个具体实现,我看了 K8S 和 consul 都提供了该方法的实现:


@Entity@Getter@Setter@NoArgsConstructorpublic class WorkerId {
@Id @GeneratedValue(strategy = GenerationType.AUTO) private Long id;
@Column(unique = true) private String serviceKey;}
@Repositorypublic interface WorkerIdRepository extends JpaRepository<WorkerId, Long> { WorkerId findByServiceKey(String serviceKey);}
@Service@AllArgsConstructor@Slf4jpublic class WorkerIdService {
private final WorkerIdRepository workerIdRepository; private final Registration registration;
Long getWorkerId() {
String serviceKey = getServiceKey();
WorkerId workerId = workerIdRepository.findByServiceKey(serviceKey);
if (workerId != null) { return workerId.getId() % (SnowFlake.MAX_MACHINE_NUM + 1); }
workerId = new WorkerId(); // 如果你的 Spring Boot 版本 >= 2.1.0 并且使用的 Discovery 提供了该方法的实现则可以直接使用 // workerId.setServiceKey(registration.getInstanceId()); workerId.setServiceKey(serviceKey); workerIdRepository.save(workerId); return workerId.getId() % (SnowFlake.MAX_MACHINE_NUM + 1); }
/** * 由于 Spring Cloud Discovery 的 ServiceInstance 接口没有一个获取 instance id 的方法,所以只能想办法自己标记 * Spring Cloud Discovery 在 2.1.0 之后的版本在接口中提供了 getInstanceId 这一方法,但是可以为空,所以需要各个实现,我看了 K8S 和 consul 都提供了该方法的实现 * @return ip:mac_address 形式的字符串 */ public String getServiceKey() { byte[] mac = null; String hostAddress = null; try {
Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces(); while (networkInterfaces.hasMoreElements()) { NetworkInterface networkInterface = networkInterfaces.nextElement(); Enumeration<InetAddress> addresses = networkInterface.getInetAddresses();
while (addresses.hasMoreElements()) { InetAddress addr = addresses.nextElement(); if (addr instanceof Inet4Address && !addr.isLoopbackAddress() && (networkInterface.getDisplayName().equals("en0") || networkInterface.getDisplayName().equals("eth0"))) { hostAddress = addr.getHostAddress(); mac = networkInterface.getHardwareAddress(); break; } } if (mac != null && StringUtils.isNotBlank(hostAddress)) { break; } } } catch (Exception e) { log.error(e.getMessage()); } if (mac == null || StringUtils.isBlank(hostAddress)) { return null; } // mac地址拼装成String StringBuilder macAddress = new StringBuilder(); for (int i = 0; i < mac.length; i++) { if (i != 0) { macAddress.append("-"); } //mac[i] & 0xFF 是为了把byte转化为正整数 String s = Integer.toHexString(mac[i] & 0xFF); macAddress.append(s.length() == 1 ? 0 + s : s); } // 把字符串所有小写字母改为大写成为正规的mac地址并返回 return hostAddress + ":" + macAddress.toString().toUpperCase(); }}

复制代码


建立相应的 Entity、Repository、Service,从代码中可以看到 Worker ID 的实现,获取 IP 和 MAC 作为唯一 Key 存入数据库,获取到自增 ID,然后对 Snowflake 的最大 Worker ID 取余,这样便得到了一个可用的 Worker ID。


然而这么做会不会有问题?由于 Worker ID 在 0-1023 之间反复,如果某些节点反复重启,超过 1024 次并且一些节点一直没有重启,就会出现 Worker ID 重复的情况。由于我们的业务目前节点的更新一般都是逐个依次重启,所以这里暂时不去处理这个问题,未来如果需要多个节点进行 AB 测试,这个时候可能就会出现某些节点频繁更新,而某些节点不变化的情况,届时可能就要重新考虑分配 ID 的方案了。


下面继续完成上面的方案,我们已经写好了相关的 Service ,剩下的就是在服务启动的时候向数据库写入信息了。


@Componentpublic class UIDGenerator {
private final WorkerIdService workerIdService; private SnowFlake flake;
@Autowired public UIDGenerator(WorkerIdService workerIdService) { this.workerIdService = workerIdService; }
public Long getId() { return flake.nextId(); }
@PostConstruct public void init() { this.flake = new SnowFlake(workerIdService.getWorkerId()); }}

复制代码


这里我们建立一个 UIDGenerator 作为服务的 ID 生成器,在启动的时候,通过 WorkerIdService 获取到一个 Worker ID,并构建 Snowfalke 对象,至此我们的 ID 生成器就基本完成。


作者介绍


周国勇,目前就职于杭州匠人网络创业,致力于楼宇资产管理的 SaaS 化,负责后端业务架构设计、项目管理,喜欢对业务模型的分析,热衷新技术的探索和实践,经常在踩坑的路上越走越远。


2019-07-01 08:309642

评论 2 条评论

发布
用户头像
本系列文章近期会持续更新,欢迎感兴趣的读者追踪阅读,也欢迎大家提出意见和见解~
2019-07-01 13:21
回复
没有更多了
发现更多内容

华为云ECS弹性云服务器,赋能企业加速数字化发展

秃头也爱科技

老周的 2022 年终总结

老周聊架构

年终总结 12月月更

音与影的魔法红线:双Vivid标准带来的超高清之变

脑极体

架构训练营-模块二作业

Sam

架构实战营

架构训练营模块二作业

gigifrog

架构训练营

2022-12-31:以下go语言代码输出什么?A:1 1;B:-1 1;C:-1 -1;D:编译错误。 package main import “fmt“ func main() { a

福大大架构师每日一题

golang 福大大 选择题

以华为云ECS为例,解读中小企业为何纷纷转投弹性云服务器

秃头也爱科技

云端高性能计算,华为云ECS助力企业数字化转型

秃头也爱科技

架构设计模块三作业

附加信息

架构训练营

【web 开发基础】PHP面向对象之访问类中的成员属性和方法(58)

迷彩

面向对象 this指针 PHP基础 PHP8 实例化

大势所趋_ 华为云企业交换机ESW助力智慧医院转型

路过的憨憨

【web 开发基础】PHP类的构造方法和析构方法(59 )

迷彩

面向对象 PHP基础 构造方法 析构方法

2023-01-01:remix-ide是浏览器的ide,官方已经提供地址,但是需要连接外网。如果是内网,需要自己在服务器里搭建remix-ide;另一种方式是用remix-ide的桌面版。这里只讨论

福大大架构师每日一题

云原生 k8s k3s 福大大 remix-ide

WIKO+鸿蒙生态:海外品牌中国化的新范式

脑极体

华为云企业交换机ESW,让数据业务无缝迁移上云

路过的憨憨

让上云变成一件简单的事情!华为云企业交换机支持无缝迁移上云

路过的憨憨

如何为企业打造优质应用环境!华为云弹性服务器了解一下

路过的憨憨

反编译APK获取代码&资源

芯动大师

Android Studio APK 反编译

模块4

KING

AI-002-十分钟理解ChatGPT的技术逻辑及演进(前世、今生)

非典型产品经理笔记

NLP 大模型 人工智能’ ChatGPT

2022年度总结-个人成长视角

非典型产品经理笔记

个人成长 网络安全 年终总结

回顾与展望Zebec举办的“Web3.0 TechHive Summit 2022 大会”

鳄鱼视界

好评爆棚的华为弹性云服务器,究竟有哪些亮点?

秃头也爱科技

AI-001-火爆全网的聊天机器人ChatGPT能做什么

非典型产品经理笔记

nlp 人工智能’ ChatGPT

OpenTelemetry系列 (五)| OpenTelemetry Java Instrumentation二次开发指南

骑牛上青山

Java 调用链 OpenTelemetry 微服务调用链 agent

华为云弹性服务器ECS,如何入局新能源产业?

秃头也爱科技

弹性公网IP支持多产品灵活绑定或解绑,能为企业提供独立公网IP资源!

秃头也爱科技

【web 开发基础】PHP中的访问方法(60)

迷彩

PHP基础 property 访问方法 类的封装 封装性

助力企业构建更可靠的云上云下网络,华为云企业交换机巧解企业上云难题!

路过的憨憨

【web 开发基础】PHP面向对象中类的继承(61)

迷彩

面向对象 extends PHP基础 类的继承 类的实例化

【JVM故障问题排查心得】「内存诊断系列」Xmx和Xms的大小是小于Docker容器以及Pod的大小的,为啥还是会出现OOMKilled?

码界西柚

jdk JVM 12 月 PK 榜 OOMKilled

Event Sourcing 和 CQRS落地(一):UID-Generator 实现_文化 & 方法_周国勇_InfoQ精选文章