银行选型和排坑实战:用开源软件自建分布式数据服务平台

阅读数:2 2020 年 3 月 18 日 10:15

银行选型和排坑实战:用开源软件自建分布式数据服务平台

本文由 dbaplus 社群授权转载。

之前设计篇讲了数据拆分的方式、场景、优缺点以及实施步骤,偏方法与理论。技术篇会介绍分布式数据服务平台设计与实现,讲述如何通过技术手段解决数据拆分带来的各种问题,以及各中间件的架构与原理。

平台主要包括分布式数据访问中间件(SDK、Proxy)、平滑扩容、数据集成、管控平台等四部分。

一、分布式数据访问中间件

数据拆分后,分散在多个库与表中,但应用开发时怎样才能准确访问数据库,换言之,如何才能拿到准确的数据库连接,拼接出正确的 sql(主要是实际表名),然后执行返回结果集呢?

为了尽可能减少业务侵入性,应用少做改造,往往都会抽象出一个数据访问层负责上述功能。数据访问层按实现方式不同,可分为应用自定义、数据中间件、分布式数据库三种方式,在我们项目中采用的是中间件方式,其技术架构如下:

银行选型和排坑实战:用开源软件自建分布式数据服务平台

分布式数据访问层

按照接入方式不同,数据访问中间件可以分为 SDK、Proxy(云原生架构下可能还会有 sidecar 方式)。

一个典型的分库分表中间件由 JDBC 接口实现(SDK 模式)、MySQL 报文解析(Proxy、Sider 模式)、SQL 解析器,路由计算、SQL 重写, SQL 执行、聚合处理、结果集合并、数据源管理、配置管理等部分构成。

1、JDBC 接口实现

JDBC 接口实现起来并不太难,数据库连接池都是基于此实现,本质上就是一种装饰器模式,主要就是 java.sql 与 javax.sql 包下 DataSource、Connection、Statement,PreparedStatement,ResultSet、DatabaseMetaData、ResultSetMetaData 等接口。这些接口也并不是都需要实现,不常用的接口可在集成一些框架时根据需要再实现。

2、MySQL 报文解析

MySQL 报文解析比 JDBC 接口复杂些,它包含了很多 MySQL 的命令,需要对照 MySQL 报文规范分别进行解析,另外由于 proxy 还要支持常见 DBA 工具接入,比如 MySQL CLI、Navicat、Dbvisualizer、MySQL workbench 等,这些工具甚至不同版本使用的 MySQL 报文都不完全一样,这块的兼容性也是一个繁琐的工作,考验对 Mysql 报文的支持的完整度。这部分像 Sharding-Proxy、Mycat 等都有实现,如果要自行研发或者扩展优化,可参考其实现细节。

MySQL 报文规范: https://dev.mysql.com/doc/internals/en/client-server-protocol.html

3、SQL 解析

SQL 解析是个繁琐复杂的活儿,对应就是词法 Lexer 与语法分析 Parser,因为要最大程度兼容各数据库厂商 SQL,这块是需要不断的迭代增强的。开源的手写解析器有阿里开源的 druid,也可以使用 javacc、antlr 等进行实现,相比手写解析器速度要慢些,但扩展定制化能力更好。这类解析器在使用方式上,多采用 vistor 设计模式,如果需要可以编写自己的 vistor 从而获取所需 AST(Abstract Syntax Tree)中的各类值。

4、路由计算

路由计算是根据 SQL 解析后 AST,提取分库分表列值(提取规则是预先配置好的),然后根据应用指定的运算表达式或者函数进行计算,分别得到数据库与表对应的序号(一般就是一个整型数值,类似一个数组下标)或者是真正的物理表名。读写分离模式下,只涉及库路由,会根据一个负载均衡算法选取一个合适的物理库,如果写 SQL 则会选择主库,如果是读则会按照随机、轮询或者权重等算法选择一个从库。

5、SQL 重写

SQL 重写主要为表名添加后缀(应用写 SQL 时是逻辑表名,实际表名往往是逻辑表名 + 序号),根据路由计算环节得到的物理表名,替换原 SQL 中的逻辑表名。另外 SQL 中有聚合函数、多库表分页等操作时,也会涉及到对 SQL 的改写,这部分有的开源中间件里也叫做 SQL 优化。注意这里最好不要简单的用字符串匹配去替换表名,例如当存在列名与表名一样的情况下会出现问题。

6、SQL 执行

SQL 执行负责 SQL 的真正执行,对应的就是执行连接池或数据库驱动中 Statement 的 execute、executeQuery、executeUpdate、executeBatch 等方法。当然如果是涉及到多库多表的 SQL,例如 where 条件不包含分库分表键,这时会涉及到库表扫描,则需要考虑是连接优先还是内存优先,即采用多少个并发数据库连接执行,连接数太大则会可能耗尽连接池,给内存以及数据库带来很大压力;但连接数太小则会拉长 SQL 执行时间,很有可能带来超时问题,所以一个强大的 SQL 执行器还会根据 SQL 类型、数据分布、连接数等因素生成一个到合适的执行计划。

7、数据源管理

数据源管理负责维护各数据库的连接,这块实现起来比较简单,一般维护一个数据库连接池 DataSource 对象的 Map 就可以,只要根据数据源下标或者名称可以拿到对应的数据库连接即可。

8、聚合处理

聚合处理负责对聚合类函数的处理,因为分库分表后,实际执行的 SQL 都是面向单库的,而对于 max、min、sum、count、avg 等聚合操作,需要将各单库返回的结果进行二次处理才能计算出准确的值,例如 max、min、sum、count 需要遍历个各库结果,然后分别取最大、最小、累加,对于 avg 操作,还需要将原 SQL 修改为 select sum,count,然后分别累加,最后用累积后的 sum 除以累加后 count 才能得到准确值。另外对于多库表的分页操作,例如 limit 1,10,则将单库 SQL 的起始页都修改为第一页即 limit 0,10,然后再整体排序取出前 10 个才是正确的数据。

9、结果集合并

结果集合并负责将多个 SQL 执行单元返回的数据集进行合并,然后返回给调用客户端。一般当进行库表遍历、或者涉及多个库 SQL(例如使用 in 时)会需要进行合并。当然并不一定需要把数据全部读到内存再合并,有时基于数据库驱动实现的 ResultSet.next() 函数,逐条从数据库获取数据即可满足要求。关于结果集合并,sharding-jdbc 对此有一个更丰富的抽象与分类,支持流式归并、内存归并、分组归并等,具体可参见归并引擎。

归并引擎: https://shardingsphere.apache.org/document/current/cn/features/sharding/principle/merge/

10、配置管理

配置管理负责分库分表的规则以及数据源的定义,这块是面向应用开发者的,在使用体验上应当简单、易用、灵活。其中会涉及到物理数据源(参数跟连接池类似)、逻辑表、路由规则(库路由、表路由,库表分布,支持指定 java 函数或者 groovy 表达式),逻辑表 -> 路由规则的映射关系。另外我们在实践时还包括了一些元数据信息,包括 shardID-> 库表序号,这样做有个好处,业务在配置路由规则时只需要关注业务对象 ->shardID 即可。配置管理在具体形式方面,可以支持 xml、yaml、也支持在管控平台上在线进行配置,后者会通过将配置同步到配置中心,进而支持数据访问层进行编排(orchestration),例如在线扩容时需要动态增加数据源、修改路由规则、元数据信息等。

一个完整的分布式数据访问中间件,在架构上和数据库的计算层很像,尤其如果涉及到 DB 协议报文与 SQL 的解析,还是一个复杂和工作量较大的工程,因此一般应用团队建议还是采用开源成熟的方案,基于此做定制优化即可,没必要重复造轮子。

SDK 和 Proxy 方式各有优缺点,在我们项目中分别用在不同的场景,简单总结如下:

  • 联机交易 高频、高并发,查询带拆分键,数据量小,sdk 方式;
  • 运维 低频、查询条件灵活,数据量大,以查询为主 proxy 方式;
  • 批量 不携带分库分表列,数据量大,查询、更新、插入、删除都有,通过 API 指定库表方式。

接下来介绍下我们在开源中间件方面的实践,分为三个阶段

第一阶段

早些年这类开源中间件还挺多,但其实都没有一个稳定的社区支持。2015 年时我们基于一个类似 TDDL 的组件,对其事务、数据连接池、SQL 解析等方面进行了优化,修复了数十个开发遇到的 bug,实现 SDK 版本的数据访问中间件,暂就叫做 DAL。

第二阶段

2017 年,系统上线后发现,开发测试以及运维还需要一个执行分库分表 SQL 的平台,于是我们调研了 Mycat,但当时 1.6 版本只支持单维度拆分(单库内分表或者只分库),因此我们重写了其后端 SQL 路由模块,结合原 SDK 版本数据组件,利用 Mycat 的报文解析实现了 Proxy 的数据访问层。

Proxy 模式的数据访问层上线后,可以很好的应对带分库分表键的 SQL 操作,但在涉及到库表遍历时,由于并发连接太多,经常会导致连接数不够,但如果串行执行则经常导致执行时间太长,最后超时报错。针对这个问题,我们做了个新的优化:
在将这类库表遍历的查询在生成执行计划时,通过 union all 进行了改写,类似 map-reduce,同一库上的不同表的 sql 通过 union all 合并,然后发到数据库执行,这样连接数 = 物理数据库总数,同时尽可能的利用了数据库的计算能力,在损耗较少连接数的前提下,大大提升了这类 SQL 的执行效率。(注意 order by 和 limit 需要加在 union all 的最后,为了不影响主库,可以将这类查询在从库执行)。
例如 user 表拆分成 1024 表,分布在 4 个库,SQL 拆分与合并示意图如下:

银行选型和排坑实战:用开源软件自建分布式数据服务平台

通过 union all 实现库表遍历

第三阶段

这两个中间件在运行 3 年左右后,也暴露出来了很多问题,例如 SQL 限制太多,兼容性太差,开源社区不活跃,部分核心代码设计结构不够清晰等,这给后续更复杂场景的使用带来了很多桎梏。因此在 19 年,我们决定对数据访问层进行升级重构,将底层分库分表组件与上层配置、编排进行剥离,改成插拔式设计,增加更加多元的分库分表组件。在那时开源社区已经涌现了一些优秀的分库分表项目,目前来看做的最好的就是 shardingshpere(后面简称 ss)了,ss 的设计与使用手册其官网都有详细介绍,这里主要简单介绍下我们集成 ss 的一些实践。

shardingsphere 整体设计架构清晰,内核各个引擎设计职责明确,jdbc 与 proxy 版本共享内核,接入端支持的多种实现方式。治理、事务、SQL 解析器分别单独抽象出来,都可以 hook 方式进行集成,通过 SPI 进行扩展。这种灵活的设计也为我们定制带来了很大的方便,代码实现上比较优雅。我们在集成时开始是 3.0.0 版本,后来升级到 4.0.0-RC1 版本,目前 ss 已发布 4.0.0 的 release 版本。

1)配置兼容

因为要在上层应用无感知的情况下更换底层分库分表引擎,所以改造的第一个问题就是兼容以前的配置。基于此,也就无法直接使用 sharding-jdbc 的 spring 或者 yaml 配置方式,而改用 API 方式,将原配置都转换为 sharding-jdbc 的配置对象。这块工作量时改造里最大的,但如果项目之前并没有分库分表配置,则直接在 sharding-jdbc 提供的方式中选择一种即可。由于我们项目中需要支持规则链、读权重等 ss 不支持功能,所以我们是基于 ComplexKeysShardingAlgorithm 接口进行的实现。

更简洁的 yaml 配置形式:

复制代码
ds:
master_0:
blockingTimeoutMillis: 5000
borrowConnectionTimeout: 30
connectionProperties: {}
idleTimeoutMinutes: 30
jdbcUrl: jdbc:mysql://localhost:3306/shard_0
logAbandoned: false
maintenanceInterval: 60
maxConn: 10
maxIdleTime: 61
minConn: 1
userName: root
password: 123456
queryTimeout: 30
testOnBorrow: false
testOnReturn: false
testQuery: null
testWhileIdle: true
timeBetweenEvictionRunsMillis: 60000
master_1:
jdbcUrl: jdbc:mysql://localhost:3306/shard_1
parent: master_0
groupRule: null
shardRule:
bindingTables:
- user,name
rules:
userTableRule:
dbIndexs: master_0,master_1
dataNodes: master_0.user_${['00','01']},master_1.user_${['02','03']}
dbRules:
- return test.dal.jdbc.shardingjdbc.YamlShardRuleParser.parserDbIndex(#user_id#)
- return test.dal.jdbc.shardingjdbc.YamlShardRuleParser.parserDbIndexByName(#name#,#address#)
tbRules:
- return test.dal.jdbc.shardingjdbc.YamlShardRuleParser.parserTbIndex(#user_id#)
- return test.dal.jdbc.shardingjdbc.YamlShardRuleParser.parserTbIndexByName(#name#,#address#)
tableRuleMap: {name: nameTableRule, user: userTableRule}

2)事务级别

sharding-jdbc 的默认事务是 local,即最大努力一阶段提交,或者叫链式提交,这种方式的好处是对应用透明,性能也还不错,互联网中使用较多。但这种方式可能会由于网络等原因导致部分提交成功,部分失败。虽然这种概率可能并不高,但一旦出现则会产生事务不一致的问题,这在金融关键场景下风险是很高的。所以我们在联机交易场景下禁止使用这种方式,而是要求必须严格单库事务,我们在先前 SDK 版本的数据访问中间件增加了校验,一旦跨库就直接抛异常。因此切换到 sharding-jdbc,这种事务级别也要继续支持。实现代码片段:

复制代码
/**
* Single DB Transaction Manager
* SPI: org.apache.shardingsphere.transaction.spi.ShardingTransactionManager
*/
@NoArgsConstructor
public class SingleDBTransactionManager implements ShardingTransactionManager {
private Map<String, DataSource> dataSources = new HashMap<String, DataSource>();
private ThreadLocal<String> targetDataSourceName = new ThreadLocal<String>() {
protected String initialValue() {
return null;
}
};
private ThreadLocal<Connection> connection = new ThreadLocal<Connection>() {
protected Connection initialValue() {
return null;
}
};
private ThreadLocal<Boolean> autoCommitted = new ThreadLocal<Boolean>() {
protected Boolean initialValue() {
return true;
}
};
@Override
public void close() throws Exception {
if (connection.get() != null) {
connection.get().close();
}
}
@Override
public void init(DatabaseType databaseType, Collection<ResourceDataSource> resourceDataSources) {
for (ResourceDataSource res : resourceDataSources) {
dataSources.put(res.getOriginalName(), res.getDataSource());
}
}
@Override
public TransactionType getTransactionType() {
return TransactionType.SINGLEDB;
}
@Override
public Connection getConnection(String dataSourceName) throws SQLException {
if (!ConditionChecker.getInstance().isMultiDbTxAllowed() && targetDataSourceName.get() != null
&& !targetDataSourceName.get().equals(dataSourceName)) {
throw new TransactionException(
"Don't allow multi-db transaction currently.previous dataSource key="
+ targetDataSourceName.get() + ", new dataSource key=" + dataSourceName);
}
targetDataSourceName.set(dataSourceName);
if (connection.get() == null) {
connection.set(dataSources.get(dataSourceName).getConnection());
}
return connection.get();
}
}

3)读库权重

虽然多个从库(一个主一般都要挂两个或者三个从,从库的数量由 RPO、多活甚至监管要求等因素决定)可以提供读功能,但细分的话,这些从库其实是有“差别”的,这种差异性有可能是由于机器硬件配置,也可能是由于所在机房、网络原因导致,这种时候就会需要支持读权限的权重配置,例如我们项目中有单元化的设计,需要根据当前所在单元及权重配置路由到当前机房的从库。另外也可以通过调整权重,支持在线对数据库进行维护或者升级等运维操作。实现代码片段:

复制代码
/**
* Weight based slave database load-balance algorithm.
* SPI: org.apache.shardingsphere.spi.masterslave.MasterSlaveLoadBalanceAlgorithm
*/
public final class WeightMasterSlaveLoadBalanceAlgorithm implements MasterSlaveLoadBalanceAlgorithm {
public final static String TYPE = "WEIGHT";
protected DataSource dataSource;
public WeightMasterSlaveLoadBalanceAlgorithm(DataSource ds) {
this.dataSource = ds;
}
public WeightMasterSlaveLoadBalanceAlgorithm(){
}
@Override
public String getDataSource(final String name, final String masterDataSourceName, final List<String> slaveDataSourceNames) {
String selectReadDb = dataSource.getTableRuleContext().getGroupRule(name).selectReadDb();
return slaveDataSourceNames.contains(selectReadDb) ? selectReadDb : null;
}
@Override
public String getType() {
return TYPE;
}

4)SQL 开关

SDK 模式的数据访问中间件,主要用在联机交易中,在这类场景下,是没有 DDL 操作需求的,也是不允许的,但 shading-jdbc 作为一个通用的数据分片中间件。对此并没有相应的开关配置,因此我们增加开关功能,应用在默认情况下,对 DDL、DCL 等语句进行了校验,不允许执行该类 SQL,在技术层面杜绝了应用的误用。实现代码片段:

复制代码
//SPI: org.apache.shardingsphere.core.parse.hook.ParsingHook
public class AccessPrevilegeCheckHook implements ParsingHook {
@Override
public void start(String sql) {
}
@Override
public void finishSuccess(SQLStatement sqlStatement, ShardingTableMetaData shardingTableMetaData) {
ConditionChecker.getInstance().checkDdlAndDcl(sqlStatement);
}
}
//SPI:org.apache.shardingsphere.core.rewrite.hook.RewriteHook
@NoArgsConstructor
public class TableScanCheckHook implements RewriteHook {
private List<TableUnit> tableUnits = new LinkedList<TableUnit>();
@Override
public void start(TableUnit tableUnit) {
if(tableUnits.size() > 0 && !ConditionChecker.getInstance().isTableScanAllowed()){
throw new RouteException("Don't allow table scan.");
}
tableUnits.add(tableUnit);
}
}
public class ConditionChecker {
private static ThreadLocal<SQLType> sqlTypeSnapshot = new ThreadLocal<SQLType>();
private boolean defalutTableScanAllowed = true;
private boolean defalutMultiDbTxAllowed = true;
private boolean defalutDdlAndDclAllowed = true;
private static ConditionChecker checker = new ConditionChecker();
public static ConditionChecker getInstance() {
return checker;
}
private ConditionChecker() {
}
private ThreadLocal<Boolean> tableScanAllowed = new ThreadLocal<Boolean>() {
protected Boolean initialValue() {
return defalutTableScanAllowed;
}
};
private ThreadLocal<Boolean> multiDbTxAllowed = new ThreadLocal<Boolean>() {
protected Boolean initialValue() {
return defalutMultiDbTxAllowed;
}
};
private ThreadLocal<Boolean> ddlAndDclAllowed = new ThreadLocal<Boolean>() {
protected Boolean initialValue() {
return defalutDdlAndDclAllowed;
}
};
public void setDefaultCondtion(boolean tableScanAllowed, boolean multiDbTxAllowed, boolean ddlAndDclAllowed) {
defalutTableScanAllowed = tableScanAllowed;
defalutMultiDbTxAllowed = multiDbTxAllowed;
defalutDdlAndDclAllowed = ddlAndDclAllowed;
}
public boolean isTableScanAllowed() {
return tableScanAllowed.get();
}
public void setTableScanAllowed(boolean tableScanAllowed) {
this.tableScanAllowed.set(tableScanAllowed);
}
public boolean isMultiDbTxAllowed() {
return multiDbTxAllowed.get();
}
public void setMultiDbTxAllowed(boolean multiDbTxAllowed) {
this.multiDbTxAllowed.set(multiDbTxAllowed);
}
public boolean isDdlAndDclAllowed() {
return ddlAndDclAllowed.get();
}
public void setDdlAndDclAllowed(boolean ddlAllowed) {
this.ddlAndDclAllowed.set(ddlAllowed);
}
public SQLType getSqlTypeSnapshot() {
return sqlTypeSnapshot.get();
}
public void checkTableScan(boolean isTableScan) {
if (!isTableScanAllowed())
throw new ConditionCheckException("Don't allow table scan.");
}
public void checkDdlAndDcl(SQLStatement sqlStatement) {
sqlTypeSnapshot.set(sqlStatement.getType());
if (!isDdlAndDclAllowed()
&& (sqlStatement.getType().equals(SQLType.DDL) || sqlStatement.getType().equals(SQLType.DCL))) {
throw new ConditionCheckException("Don't allow DDL or DCL.");
}
}
public void checkMultiDbTx(Map<String, Connection> cachedConnections, String newDataSource) {
if (!isMultiDbTxAllowed() && cachedConnections.size() > 0 && !cachedConnections.containsKey(newDataSource)) {
throw new ConditionCheckException("Don't allow multi-db transaction currently.old connection key="
+ cachedConnections.keySet() + "new connection key=" + newDataSource);
}
}
}

5)路由规则链

在我们项目中,对于一张表,在不同场景下可能会使用不同的分库分表列,例如有的是账号、有的是客户号(这两列都可路由到同一库表中),这时候就需要路由模块可以依次匹配搭配多个规则,例如 SQL 中有账号则用 account-rule,有客户号则用 customer-rule,因此我们支持了规则链配置功能,但 sharding-jdbc 只支持配置一个路由规则,因此在自定义路由算法函数中,我们增加了对规则链的支持。实现代码片段:

复制代码
public abstract class ChainedRuleShardingAlgorithm implements ComplexKeysShardingAlgorithm {
protected final DataSource dataSource;
public ChainedRuleShardingAlgorithm(DataSource ds) {
this.dataSource = ds;
}
@Override
public Collection<String> doSharding(Collection availableTargetNames, ComplexKeysShardingValue shardingValue) {
List<String> targets = new ArrayList<String>();
Set<String> actualNames = HintManager.isDatabaseShardingOnly() ? getHintActualName(shardingValue)
: calculateActualNames(shardingValue);
for (String each : actualNames) {
if (availableTargetNames.contains(each)) {
targets.add(each);
}
}
clear();
return targets;
}
@SuppressWarnings({ "serial", "unchecked" })
protected Set<String> calculateActualNames(ComplexKeysShardingValue shardingValue) {
Set<String> target = new HashSet<String>();
Map<String/* table */, Map<String/* column */, Collection/* value */>> shardingMap = new HashMap<String, Map<String, Collection>>();
String logicalTableName = shardingValue.getLogicTableName();
Map<String, Collection> shardingValuesMap = shardingValue.getColumnNameAndShardingValuesMap();
for (final Entry<String, Collection> entry : shardingValuesMap.entrySet()) {
if (shardingMap.containsKey(logicalTableName)) {
shardingMap.get(logicalTableName).put(entry.getKey(), entry.getValue());
} else {
shardingMap.put(logicalTableName, new HashMap<String, Collection>() {
{
put(entry.getKey(), entry.getValue());
}
});
}
}
// 遍历规则链, 查询匹配规则
for (String tableName : shardingMap.keySet()) {
RuleChain ruleChain = dataSource.getTableRuleContext().getRuleChain(tableName);
for (GroovyListRuleEngine engine : getRuleEngine(ruleChain)) {
Set<String> parameters = engine.getParameters();
Map<String, Collection> columnValues = shardingMap.get(tableName);
Set<String> eval = eval(columnValues, parameters, engine, ruleChain);
if (eval.size() > 0) {// 匹配即中止
target.addAll(eval);
return target;
}
}
}
return target;
}
@SuppressWarnings("unchecked")
protected Set<String> eval(final Map<String, Collection> columnValues, Set<String> parameters,
GroovyListRuleEngine engine, RuleChain ruleChain) {
Set<String> targetNames = new HashSet<String>();
if (columnValues.keySet().containsAll(parameters)) {// 匹配
List<Set<Object>> list = new LinkedList<Set<Object>>();// 参数集合
List<String> columns = new LinkedList<String>();// 列名集合
for (final String requireParam : parameters) {
list.add(convertToSet(columnValues.get(requireParam)));
columns.add(requireParam);
}
Set<List<Object>> cartesianProduct = Sets.cartesianProduct(list);
for (List<Object> values : cartesianProduct) {
Map<String, Object> arugmentMap = createArugmentMap(values, columns);
int index = engine.evaluate(arugmentMap);
targetNames.add(getActualName(ruleChain, index));
}
}
return targetNames;
}
private Set<Object> convertToSet(final Collection<Object> values) {
return Sets.newLinkedHashSet(values);
}
private Map<String, Object> createArugmentMap(List<Object> values, List<String> columns) {
HashMap<String, Object> map = new HashMap<String, Object>();
for (int i = 0; i < columns.size(); i++) {
map.put(columns.get(i).toLowerCase(), values.get(i));
}
return map;
}
protected abstract List<GroovyListRuleEngine> getRuleEngine(RuleChain ruleChain);
protected abstract String getActualName(RuleChain ruleChain, int index);
}
/**
* 库路由算法
*/
public class ChainedRuleDbShardingAlgorithm extends ChainedRuleShardingAlgorithm {
public ChainedRuleDbShardingAlgorithm(DataSource ds) {
super(ds);
}
@Override
protected List<GroovyListRuleEngine> getRuleEngine(RuleChain ruleChain) {
return ruleChain.getDbRuleList();
}
@Override
protected String getActualName(RuleChain ruleChain,int index) {
//add mapping from shard metadata
String dbIndex = dataSource.getHintSupport().getShardingDb(String.valueOf(index));
if(StringUtils.isEmpty(dbIndex)){
return ruleChain.getTableRule().getDbIndexArray()[index];
}else{
return ruleChain.getTableRule().getDbIndexArray()[Integer.valueOf(dbIndex)];
}
}
}
/**
* 表路由算法
*/
public class ChainedRuleTableShardingAlgorithm extends ChainedRuleShardingAlgorithm {
public ChainedRuleTableShardingAlgorithm(DataSource ds) {
super(ds);
}
@Override
protected List<GroovyListRuleEngine> getRuleEngine(RuleChain ruleChain) {
return ruleChain.getTableRuleList();
}
@Override
protected String getActualName(RuleChain ruleChain, int index) {
//add mapping from shard metadata
String tbShardIndex = dataSource.getHintSupport().getShardingTable(String.valueOf(index));
int tbIndex = index;
if(!StringUtils.isEmpty(tbShardIndex)){
tbIndex = Integer.valueOf(tbShardIndex);
}
SuffixManager suffixManager = ruleChain.getTableRule().getSuffixManager();
if(suffixManager.isInlineExpression()){
return ruleChain.getTbIndexs()[tbIndex];
}else{
Suffix suffix = suffixManager.getSuffix(0);
return String.format("%s%s%0"+suffix.getTbSuffixWidth() +"d", ruleChain.getLogicTable(),suffix.getTbSuffixPadding(), suffix.getTbSuffixFrom() + tbIndex);
}
}

6)管控平台对接

我们提供了一个管控平台,支持分布式数据相关组件在线配置,这些通过配置中心统一下发到各应用,而且支持动态变更。不管是 SDK 模式还是 Proxy 模式的数据访问中间件都使用的是同一份分库分表配置,只是接入方式不同而已。因此在集成 ss 的时候,还需要增加从配置中心获取配置的功能,这块主要涉及的调用配置中心 API 获取配置,这里就不贴具体代码了。

数据访问中间件的发展演进方向,未来其将会是多种形态的混合存在。

银行选型和排坑实战:用开源软件自建分布式数据服务平台

分布式数据访问中间件的三种模式

二、平滑扩容

在设计篇中已经介绍了扩容的机制,简单的说,平滑扩容就是通过异步复制,等数据接近追平后禁写,修改路由,然后恢复业务。主要目的是自动化、以及尽可能缩短停机窗口,目前一些云产品比如阿里云 DRDS、腾讯云 TDSQL 等的一键在线扩容本质上都是基于此机制。

但实践中这个过程需要多个步骤,数据库数量越多,操作风险越大,而且需要停机完成。为此我们与数据库团队一起设计与开发了平滑扩容功能。

我们将整个扩容环节,分为配置、迁移、校验、切换、清理五个大的步骤,每个步骤里又由多个任务构成。扩容任务在管控平台上建立,平滑扩容模块自动依次触发各个任务。

银行选型和排坑实战:用开源软件自建分布式数据服务平台

平滑扩容
  • 配置环节,主要是应用系统方定义扩容后的分库分表配置;
  • 迁移环节,依次自动完成从复制,同时进行数据校验;
    • 切换环节,首先进行禁写,断开主从只从,然后修改路由规则,最后再解除扩容库禁写。整个过程应用无需停机,仅仅会有一段时间禁写,这个时间一般来说也就十来秒;
  • 清理环节,清理环节是在后台异步处理,即清理数据库冗余表。

在分布式数据服务管控平台定义好扩容前后分库分表配置后,即可启动一键在线扩容。在扩容过程中可实时监控扩容进度,同时支持扩容中断恢复以及回滚。

三、数据集成

微服务架构下,有大量需要数据集成的场景:

  • 业务系统之间,例如下订单后需要通知库存、商家,然后还要推送到大数据等下游系统;
  • 分库分表后,为了应对其它维度查询,会需要建立异构索引,这样就需要数据传输到另外一套数据库中;
  • 系统内应用与中间件之间,例如如果使用 redis 等缓存,在操作完数据库还要更新缓存,类似这类数据集成需求,最朴素的解决方式就是双写,但双写一个问题是增加了应用复杂性,另外当发生不一致的情况是难以处理。

这类问题本质上也属于分布式事务场景,一种简单的方式就是基于 MQ 可靠消息,即在应用端写消息表,然后通过 MQ 消费消息进行数据集成处理。但这导致应用代码耦合大量双写逻辑,给应用开发带来很多复杂度。

针对双写问题,业界一种更优雅、先进的设计是基于日志的集成架构,在 OLTP 场景下,可以通过解析数据库日志类似 CDC,这种方式的好处是数据集成工作从应用代码中进行了剥离。

关于双写以及基于日志集成架构可参考 Using logs to build a solid data infrastructure (or: why dual writes are a bad idea

双写以及基于日志集成架构参考: https://www.confluent.io/blog/using-logs-to-build-a-solid-data-infrastructure-or-why-dual-writes-are-a-bad-idea/

银行选型和排坑实战:用开源软件自建分布式数据服务平台

数据集成

这类 CDC 的开源软件,java 类的有 shyiko、canal、debezium 等。这类项目的实现原理是主要模拟从库从主库异步获取日志事件,然后经过 ETL 发送到 MQ 或者其它下游系统。

如何模拟一个从库可参照 MySQL 复制协议: https://dev.mysql.com/doc/internals/en/replication-protocol.html

考虑功能完整性和社区活跃度,我们选择了基于 canal 构建数据集成中间件,具体工作原理这里就不介绍了,可参见 canal github。这里主要介绍下我们对其做的一些定制和优化。

canal github: https://github.com/alibaba/canal

1、在线配置

canal 的配置非常繁杂,很容易配错,所以最先开始做的就是提供了一个更简单、易用的在线配置定义功能,用户是只需进行一些核心关键的配置,例如数据库的 IP、用户、密码、订阅表、MQ 地址,其它不常修改的配置通过模板形式提供,大大降低了配置复杂度和工作量,当然如果需要也完全支持自定义。

2、性能优化

数据库 binlog 是有序的,但如果写 MQ 或者目标库,仍完全保证该顺序,那么则无法进行并发,这样同步的 TPS 是肯定上不去的,因此如何在保证一定顺序的前提下最大程度提高并发性能是一个需要结合业务场景解决的问题。

我们当时用的是 canal1.1.3 的版本,经过我们性能测试,数据写入 kafka 的 TPS 也就 5000+,这对于结息等大量数据变更的场景是不能满足要求的。

另外 canal 写入 MQ 的并发维度是表的主键,但我们项目中表的主键都是自增列(这个是我们项目中数据库开发规范,主要目的是保证 MySQL 写性能),如果根据此列进行并发控制,那么则无法保证 MQ 写入时的业务顺序性。例如支付流水表,如根据主键(自增列)则无法保证同一账户流水的顺序性。

银行选型和排坑实战:用开源软件自建分布式数据服务平台

canal 自身写入机制

针对此问题,我们对 canal 进行了改造,将原来只支持根据主键进行并发控制,修改为支持应用指定,例如我们项目采用业务唯一键;原来流程是顺序从 canal server 端读取 binlog-> 写 MQ-> 再确认 -> 再读取下一 binlog 事件;调整后改为并发读取 binlog,一旦在执行事件集中没有当前业务唯一键,就可直接写入 MQ,后台开启一个线程,按照 batchID 依次进行 ack,通过并行拉取 binlog 事件、分阶段无阻塞处理,单库数据同步 kafak 的 TPS 可以达到 1.2W+,已可以满足结息等场景。

银行选型和排坑实战:用开源软件自建分布式数据服务平台

canal 写入改造后机制

3、Serverless 化 Serverless 化

如果通过安装包部署,在用户配置完数据集成相关参数后,需要手工将 canal server 以及 adapter 包上传至服务器上。考虑到高可用,还得在备机上进行部署,在分库分表下,数据库拆分成多个,需要部署多个实列到多个服务器上(canal 支持同个实例部署在统一 server 节点,但性能会受影响)。因此我们将 canal server 与 adapter 进行了容器化改造,然后部署到了统一的 k8s 集群中,这样用户在配置完后,点击“启动实列”按钮,即可在 k8s 环境中自动部署高可用的 canal 集群,从而实现了数据集成功能的 serverless 化。

通过数据集成中间件,可以在应用无侵入下解决分库分表后一个很典型的问题:多维度拆分与多库查询。例如将分库分表的数据再集中到一个汇总库,然后一些复杂的查询统计就可以放在汇总库上;还有一些多维度拆分场景,类似电商里的卖家库、商家库,需要创建“二级索引”,也可以通过数据中间件自动创建;另外也可以方便实现诸如小表广播等需要保证数据一致性的功能。

四、管控平台

前面提到的各种中间件,涉及到大量配置定义、实例管理、监控等功能,这些功能分散在各组件内部,缺少一个统一的视图,而且应用开发人员需要重复定义。因此我们设计开发了数据服务管控平台,将分库分表配置定义、在线扩容、运维、监控等功能统一集成,最大程度降低开发以及运维人员对数据拆分带来的复杂度。同时提供开放 API,可以对接目前公司已有数据库以及云管理系统。

配置信息统一存放在配置中心,各中间件直接从配置中心拉取配置,在管控平台修改配置后,也可以实时通知各应用进行动态加载。管控平台相当于一个数据服务云管理平台,提供多租户,各应用无需自行部署,直接接入使用即可。在技术架构方面管控平台是个前后端分离架构,前端基于 vue.js,后端按照功能模块拆分成微服务,都部署在 k8s 集群中。

银行选型和排坑实战:用开源软件自建分布式数据服务平台

管控平台

五、 感受

上面介绍了我们在数据服务平台建设中各技术组件的设计原理和实践,限于篇幅,更多实现细节就不展开介绍。

在企业软件这块,有两个不同的思路,一个是购买商业产品,一种是基于开源软件自行构建。在金融领域,早些年以前者为主,近些年后者则变成了趋势。开源软件有点是开放,因为有源码所以有自主掌控的可能与条件,缺点是不像商业产品功能完整,往往需要自行定制、优化、扩展。作为软件开发人员,我们更喜欢使用白盒而不是黑盒。

当然开源并不代表免费,有可能付出的成本比商业软件更高。一方面需要投入精力学习开源项目,只有熟悉源代码后才可能具备修改定制能力;另一方面要积极关注开源界技术的发展,与时俱进,要有开放的心态,吸取开源先进的设计的东西,大胆验证,谨慎使用。

作者介绍

温卫斌,就职于中国民生银行信息科技部,目前负责分布式技术平台设计与研发,主要关注分布式数据相关领域。

原文链接

https://mp.weixin.qq.com/s?__biz=MzI4NTA1MDEwNg==&mid=2650786356&idx=1&sn=273a84662b935c65aa213446294d7f5c&chksm=f3f97fa1c48ef6b71bda53769ee1e4cfb073860cd11a1f0a49fa6868025ef54d45e85996d5b7&scene=27#wechat_redirect

评论

发布