作者|NearFar X Lab 团队 洪守伟、陈超、周志银、左益、武超
整理|SelectDB 内容团队
导读:无锡拈花云科技服务有限公司(以下简称拈花云科)是由中国创意文旅集成商拈花湾文旅和北京滴普科技有限公司共同孵化组建的。拈花云科以数字化思维为导向,致力于成为文旅目的地数智化服务商。2022 年底,拈花云科 NearFar X Lab 团队在数据需求的驱动下,开始调研并引进 Apache Doris 作为新架构下的数据仓库选型方案。本文主要介绍了拈花云科数据中台架构从 1.0 到 2.0 的演变过程,以及 Apache Doris 在交付型项目和 SaaS 产品中的应用实践,希望本文分享的内容能对大家有所启发。
业务背景
拈花云科的服务对象主要是国内各个景区、景点,业务范围涵盖文旅行业的多个板块,如票务、交通、零售、住宿、餐饮、演绎、游乐、影院、KTV、租赁、服务、会务、康乐、康养、电商、客服、营销、分销、安防等。多业务线条下用户对于数据使用的时效性需求差异较大,需要我们能够提供实时、准实时、T+1 的业务支撑能力。同时根据大部分景区为国有化的特点,我们也需要具备能够 提供私有化交付部署及 SaaS 化数据中台产品解决方案的双重服务支撑能力。
数据中台 1.0 - Lambda
早期构建数据中台时,为了优先满足 B 端用户数据整合的需求,以稳定数据输出为出发点,因此我们基于行业中比较成熟的 Lambda 架构形成了数据中台 1.0 。

在数据中台 1.0 架构中分为三层,分别为 Batch Layer,Speed Layer 和 Serving Layer。其中,Batch Layer 用于批量处理全部的数据,Speed Layer 用于处理增量的数据,在 Serving Layer 中综合 Batch Layer 生成的 Batch Views 和 Speed Layer 生成的 Realtime Views,提供给用户查询最终的结果。

Batch Layer: 在我们早期的实施类项目中,单纯以离线 T+1 进行数据支持的项目占了绝大多数。但实施类项目在实现 Lambda 架构的过程中也会面临很多问题。比如数据采集环节,由于项目本身原因业务系统不能开放 DB 的 Binlog 供数据仓库采集,因此只能以 JDBC 的方式完成增量或全量的数据同步,而通过该方式同步的数据往往会由于系统人工补充数据、时间戳不规范等问题产生同步数据差异的情况发生,最终只能通过额外的数据对比逻辑进行校验,以保证其数据的一致性。
Speed Layer: 项目受成本约束较大,大面积基于流的实时计算对于不论是从硬件成本、部署成本还是实施成本、维护成本等角度均难以支撑。基于该原因,在实施类项目中只有部分业务会进行基于流的实时统计计算,同时满足流计算条件的业务上游系统也必须同时满足同步 Binlog 的使用需求。
Serving Layer: 大部分的预计算结果存储在 MySQL 中提供 Report 支持,部分实时场景通过 Merge Query 对外提供 Ad-Hoc 的查询支持。
随着时间的推移,大量的项目交付使用增多,架构的问题也逐渐开始显现:
开发和维护成本高:该架构需要维护两套代码,即批处理和实时处理的代码,这无疑增加了开发和维护的成本。
数据处理复杂度高:Lambda 架构需要处理多个层次的数据,包括原始数据、批处理数据和实时处理数据,需要对不同的数据进行清洗、转换和合并,数据处理的复杂度较高。实时计算支持有限:业务方对于数据时效性要求越来越高,但是该架构能力有限,无法支持更多、更高要求的的实时计算。
资源利用率低:离线资源较多,但我们仅在凌晨后的调度时间范围内使用,资源利用率不高。
受成本制约:该架构对于我们部分用户而言使用成本较高,难以起到降低成本提高效率的作用。
新架构的设计目标
基于以上架构问题,我们希望实现一套更加灵活的架构方案,同时希望新的架构可以满足日益增高的数据时效性要求。在新方案实现之前,我们必须先对当前的业务应用场景和项目类型进行分析。
我们业务应用场景分为以下四类,这四类场景的特点和需求分别是:
看板类:包括 Web/ 移动端数据看板和大屏可视化,用于展示景区重要场所的数据,如业务播报(实时在园人数监控、车船调度管理等)、应急管理监控(客流密度监控、景区消防预警、景区能耗监控等)。其组成特点一般为业务汇总指标和监控指标报警,对数据时效性要求较高。
报表类:数据报表以图表形式展示,主要服务于各业务部门的一线业务人员。会更多关注垂直业务的数据覆盖程度,会有钻取需求(也可能通过不同报表来体现不同数据粒度)。一般以景区的业务部门为单位构建报表栏目和分析主题,除财务结算类报表外,一般可接受 T+1 的报表时效。
分析类:自助分析基于较好的数据模型表(数据宽表)实现,对分析人员有一定的数据理解和操作需求,基于我们提供的 BI 分析平台,业务人员可基于此数据范围通过拖拽的方式组合出自己的数据结果,灵活度较高。该场景对数据时效要求不高,更多关注业务数据沉淀和与往期历史数据的对比分析侧重架构的 OLAP 能力。
服务类:一般对接三方系统,由数据中台提供数据计算结果。如画像标签等数据,通过数据接口控制权限提供对外数据服务与其它业务系统集成,需要新架构能够提供稳定的数据服务。
接着我们对项目类型的特点和需求也进行了分析,并确定新架构需要同时提供实施类项目和 SaaS 产品的数据中台支撑能力:

数据中台 2.0 - Apache Doris
结合以上需求,我们计划对原有架构进行升级,并对新架构的 OLAP 引擎进行选型。在对比了 ClickHouse 等 OLAP 引擎后(社区有非常多的对比文章参考,这里不过多赘述),最终选择了 Apache Doris 作为数据中台 2.0 的基座。同时,在数据的同步、集成及计算环节,我们也构建了多套方案来适配以 Apache Doris 为核心的计算链路,以应对不同类型的实施类项目及 SaaS 产品需求。

数据中台 2.0 的核心思路是将 Apache Doris 作为核心的数据仓库,并将其作为实时数据同步中心、核心数据计算中心。数据集成环节将专注于数据同步,而计算交由 Apache Doris 完成或由 Doris 辅助计算引擎完成。同时,我们将在提供多种数据同步至 Apache Doris 的方案以应对不同的项目需求。在这个架构下,我们支持实现实时、准实时、T+1 的计算场景支持,以满足不同业务场景的需求。
新架构数据流转:
1. 数据同步集成:架构 2.0 有多种数据同步方式,我们主要借助 Doris Unique Key 模型完成数据的同步更新。

2. 数仓分层计算:根据项目资源情况分 View/ 实体表单来构建后面的数据层级(DWD、DWS、ADS)。业务较轻或时效性很高时,通过 View 方式来实现逻辑层面的 DWD,通过这种方式为下游 Ad-hoc 提供宽表查询支持,Doris 的谓词下推及 View 优化的能力为使用视图查询带来了便利。而当业务较重时,通过实体表单 + 微批任务进行实现,按照调度依赖关系逐层完成计算,针对使用场景对表单进行优化。
3. 数据计算时效:新架构下的数据时效受具体数据计算链路中的三个方面限制,分别是数据采集时效、批次计算时效、数据查询耗时。在不考网络吞吐、消息积压、资源抢占的情况下:
(实施类项目经常会遇到第三方不提供 Binlog 的情况,所以这里把通过批次采集数据也作为一个 case 列出来)

在 Doris 中为了达到更好的计算时效,基于 Doris 的数据计算流程相比在 Hive 中的计算流程可以进行一定的简化,这样可避免过多的冗余计算设计,以此提高计算产出效率。
4. 补充架构能力:
Hadoop:根据不同的项目资源及数据情况来决定是否引入 Hadoop 补充大规模离线计算场景。以实施类项目为例,Doris 可以涵盖大部分核心业务数据计算场景。
MySQL:基于预计算的结果数据可以推送到下游 MySQL 中以供 Report 查询,从而分散 Doris 计算查询的资源消耗,这样可以将资源充分留给核心且时效性要求高的应用或高频批次任务。如果计算资源充足,Doris 也可以直接作为应用层的加速查询 DB,而无须引入其它 DB。
新架构收益
通过引入 Apache Doris,我们成功构建了高时效、低成本的数据中台 2.0,并成功满足了交付型项目和 SaaS 产品两种需求场景下的使用需求。新架构的收益如下:
数据时效性提升:架构 1.0 中大部分业务为 T+1 的支持方式,而在新架构下大部分业务都可实现实时或小时级计算支持。
资源利用率提高:在架构 1.0 中,离线资源在白天大部分时间处于闲置状态。而在新架构下,数据同步、计算(增量 / 全量)和查询均在同一集群下完成,从而提高了资源利用率。相较于部署一套 CDH,同等资源成本下,部署一套 Doris 可以带来更多的收益。
运维管理成本降低:在原有架构下,实时统计需求需要维护非常长的计算链路。而在新架构下,所有计算仅需在一个数据库中完成,更加简单、高效且易于维护。
易于业务扩展:Doris 的节点扩展操作非常便捷,这对于业务的增量支持非常友好。
新架构的落地实践
我们在 2022 年底首次在测试环境中部署了 Apache Doris 1.1.5 版本,并进行了一些业务数据的导入测试和新架构的可行性验证。在测试后,我们决定在生产环境中落地实践 Apache Doris。第一次生产环境部署时,我们使用了当时最新的 1.2.2 版本。目前,新项目已升级到 1.2.4 版本并使用。Apache Doris 作为新架构下的核心系统,在整个架构中发挥着重要的作用。下面我们将从模型选择、资源规划、表结构同步、计算场景实现、运维保障等几个角度分享我们基于 Doris 的项目落地经验,希望为正在准备落地 Doris 方案的读者带来一些参考。
模型选择
数据模型我们主要应用了 Doris 提供的 Unique 模型和 Aggregate 模型。
Unique 模型
对于 ODS 层的表单来说,我们需要 Doris Table 与源系统数据保持实时同步。为了保证数据同步的一致性,我们采用了 Unique 模型,该模型会根据主键来对数据进行合并。在 1.2.0 版本之前,Unique 模型是 Aggregate 模型的一种特例,使用了 Merge On Read 的实现方式,这种实现方式下 count(*)的查询效率较低。而在 1.2.0 版本推出之后,采用了新的 Merge On Write 的数据更新方式,在 Unique Key 写入过程中,Doris 会对新写入的数据和存量数据进行 Merge 操作,从而大幅优化查询性能。在 Merge 过程中,Doris 会查找 Unique Key 索引,并使用 Page Cache 来优化索引查找效率。因此在使用 1.2 版本中,建议打开 Doris BE 的 Page Cache(在 be.conf文件中增加配置项 disable_storage_page_cache = false)。另外在很多情况下,Unique 模型支持多种谓词的下推,这样表单也可以支持从源表直接建立视图的查询方式。
Aggregate 模型
在某些场景下(如维度列和指标列固定的报表查询),用户只关心最终按维度聚合后的结果,而不需要明细数据的信息。针对这种情况,我们建议使用 Aggregate 模型来创建表,该模型以维度列作为 Aggregate Key 建表。在导入数据时,Key 列相同的行会聚合成一行(目前 Doris 支持 SUM、REPLACE、MIN、MAX 四种聚合方式)。
Doris 会在三个阶段对数据进行聚合:
数据导入的 ETL 阶段,在每一批次导入的数据内部进行聚合;
底层 BE 进行数据 Compaction 的阶段;
数据查询阶段。
聚合完成之后,Doris 最终只会存储聚合后的数据,这种明细表单数据的预聚合处理大大减少了需要存储和管理的数据量。当新的明细数据导入时,它们会和表单中存储的聚合后的数据再进行聚合,以提供实时更新的聚合结果供用户查询。
资源管理
在生产环境中,我们使用一套 Doris 数据仓库支撑了多个下游数据应用系统的使用。这些应用系统对数据访问的资源消耗能力不同,对应的业务重要等级也不相同。为了能够更好管理应用资源的使用,避免资源冲突,我们需要对应用账号进行划分和资源规划,以保证多用户在同一 Doris 集群内进行数据操作时减少相互干扰。而 Doris 的多租户和资源隔离功能,可以帮助我们更合理地分配集群资源。Doris 对于资源隔离控制有两种方式,一是集群内节点级别的资源组划分,二是针对单个查询的资源限制。这里主要介绍下集群内节点级别的资源组划分过程。
第一步:需要梳理规划各场景的用途、重要等级及资源需求等,举例说明:

第二步:对节点资源进行划分、给节点打上 tag 标签:
alter system modify backend "10.10.101.1:9050" set ("tag.location" = "group_a");alter system modify backend "10.10.101.2:9050" set ("tag.location" = "group_a");alter system modify backend "10.10.101.3:9050" set ("tag.location" = "group_b");alter system modify backend "10.10.101.4:9050" set ("tag.location" = "group_b");alter system modify backend "10.10.101.5:9050" set ("tag.location" = "group_c");alter system modify backend "10.10.101.6:9050" set ("tag.location" = "group_c");第三步:给应用下的表单指定资源组分布,将用户数据的不同副本分布在不同资源组内
create table flume_etl<table>(k1 int, k2 int)distributed by hash(k1) buckets 1properties( "replication_allocation"="tag.location.group_a:2, tag.location.group_b:1")create table cdc_etl<table>"replication_allocation"="tag.location.group_b:2, tag.location.group_c:1"create table etl





















