Yelp 的实时流技术之三:不止是模式存储服务的 Schematizer

阅读数:1603 2016 年 9 月 22 日 17:44

本文翻译自 More Than Just a Schema Store ,已获得原网站授权。

这是关于 Yelp 的实时流数据基础设施系列文章的第三篇。这个系列会深度讲解我们如何用“确保只有一次”的方式把 MySQL 数据库中的改动实时地以流的方式传输出去,我们如何自动跟踪表模式变化,如何处理和转换流,以及最终如何把这些数据存储到 Redshift 或 Salesforce 之类的数据仓库中去。

阅读本系列的第一篇:

中文:一天几十亿条消息:Yelp 的实时数据管道

英文: Billions of Messages a Day - Yelp's Real-time Data Pipeline

阅读本系列的第二篇:

中文: Yelp 的实时流技术之二:将 MySQL 表数据变更实时流到 Kafka 中

英文: Streaming MySQL tables in real-time to Kafka

当你的系统每天要实时从MySQL 到Kafka 发布几十亿条消息时,你会怎么管理这些数据的模式信息呢?当你的系统要接入几百个服务时,你就要处理几千种不同的模式,手工管理是不可行的。必须有自动化的方案来处理从上游数据源到所有下游消费者的模式改变问题。Confluent 公司的 Schema Registry Kafka Connect 都是不错的选择,可惜当我们开始构建 Yelp 数据管道时它们还没发布。因此就有了我们的 Schematizer。

Schematizer 是什么?

Yelp 数据管道的一个重要设计就是将所有数据都模式化,也就是说,所有流经数据管道的数据都必须遵守某种预先定义好的模式,而不是格式随意的。为什么非要强调这一点呢?因为我们想让所有的数据消费者都可以对他们要获取的数据格式有预期,因此可以在上游数据生产者决定改变他们发布的数据模式时,不会对下游造成非常大的影响。统一的模式表现也让 Yelp 数据管道可以轻松地整合各种使用不同数据格式的系统。

Schematizer 是用于跟踪和管理所有数据管道中用到的模式,并且提供自动化文档支持等功能的模式存储服务。我们使用 Apache Avro 来表达模式。Avro 有许多我们在数据管道中需要的功能,尤其是模式演进,它是解耦数据生产者和消费者的关键因素之一。每一条流经数据管道的消息都用 Avro 模式序列化过。为了减小消息体积,我们没有把全部模式信息都放在消息里,而只是带上了模式的 ID。数据消费者可以用 ID 来在运行时从 Schematizer 中获取模式信息并将消息反序列化。Schematizer 是所有预定义的模式信息的唯一可靠来源。

我们用不同方法管理模式。

Schematizer 用两种方法组织和管理模式:从数据生产者的角度和数据消费者的角度。

第一种方法根据数据的产生信息来将模式分组,每个组由名字空间和数据源来定义。生产者在向 Schematizer 注册模式时必须提供名字空间和数据源信息。比如一个准备向数据管道发布数据库数据的服务,它就可以把服务名作为名字空间,把表名作为数据源。

根据名字空间和数据源来将模式分组

第二种方法按数据的目的方信息来分组。比如 Redshift 集群或者 MySQL 数据库都是数据目的方,它们会对应一个或多个数据生产者,每个数据生产者又会关联一个或多个模式,这就对应着第一种方法中定义的名字空间和数据源。

(点击放大图像)

根据单个数据目的方来将模式分组

这两种方法让我们可以按不同的需要来检索和相关的模式。比如,一个程序可能想知道它会向哪些 Topic 发布数据,另一个服务又想知道它的 Redshift 集群中的数据都来自哪里。

我们这样注册模式。

数据管道要求所有发布到其中的数据都必须用预定义的 Avro 模式进行模式化和序列化。因此,当一个数据生产者准备向数据管道发布数据时,它要做的第一件事就是向 Schematizer 注册模式,最通用的办法就是直接注册一个 Avro 模式。

对于没有或者无法创建 Avro 模式的数据生产者,也可以向 Schematizer 中加入模式转换器来把非 Avro 模式转换成 Avro 模式。 MySQLStreamer 就是一个代表,它是一个把 MySQL 数据库中的数据发布到数据管道的服务,它只知道 MySQL 表模式。Schematizer 可以把 MySQL 表模式定义转换成相应的 Avro 模式。但如果数据生产者改变了模式定义的话,它必须重新注册。

上游模式改变会不会影响下游服务?

所有数据管道服务都不能回避的共同痛点就是该如何应对上游模式改变。通常这都需要许多在上游生产者和下游消费者之间的沟通和协调工作。Yelp 也不能免俗。我们也有批量任务和系统,它们要处理别的批量任务和系统产生的数据。每一次上游的模式改变都是非常痛苦的,它可能导致下游服务崩溃,整个处理过程都是非常耗费人力的。

我们通过模式兼容性来解决这个问题。在模式注册过程中,Schematizer 会根据模式兼容性来决定 Topic 和新模式之间的对应关系。只有兼容的模式才能延用旧的 Topic。如果有不兼容模式注册上来,Schematizer 会用相同的名字空间和数据源来为新模式注册一个新的 Topic。那 Schematizer 又怎么确定兼容性呢?答案就是 Avro 解释规则(Avro resolution rules)。Avro 解释规则保证在相同的 Topic 中,用新版模式打包的消息可以按旧版模式解包,反之亦然。

(点击放大图像)

不兼容的模式会分配不同的 Topic

目前 Yelp 数据管道中大部分数据都产生自 MySQLStreamer。比如我们想为某业务表增加一个字段,MySQLStreamer 就会向 Schematizer 注册新模式。因为按照 Avro 解释规则这样的改动是兼容的,所以 Schematizer 会创建新 Avro 模式,并把这个名字空间和数据源对应的旧的 Topic 分配给它。可如果是想把某字段从 int 改成 varchar,那这就是一个不兼容的改动了,Schematizer 会为新模式创建一个新 Topic。

保证了在 Topic 内部的模式兼容性,下游数据消费者就可以放心的用旧模式去处理这个 Topic 中的任何数据,不必担心数据模式变化会引起自身的崩溃等任何问题。他们也可以根据自己的需要在合适的时候连上新 Topic。这就让整个系统自动化程度更高,在模式改变时减少人工介入。

除了 Avro 解释规则,我们也在 Schematizer 中定义了一些自己的规则来支持一些数据管道功能。模式的主键字段被用于在数据管道中做日志压缩。因为对同一个Topic 来说做日志压缩的主键必须保持一致,所以任何对主键的改动都被认为是不兼容的,会导致Schematizer 为新模式创建一个新Topic。而且,当人工不可读(non-PII,Personally Identifiable Information)的模式开始包含人工可读字段时,这样的改动也被认为是不兼容的。人工不可读的数据和人工可读的数据必然分开存储,这样就简化了人工可读数据的安全实现,避免了下游消费者不小心读到一些他们本来没有权限读的数据。

(点击放大图像)

决定是否需要新 Topic 的逻辑流程

值得一提的是模式注册过程是幂等的。如果把相同的模式注册多次,那只有第一次会产生一个新模式,后面的都直接返回已注册的模式。这就让应用程序和服务可以非常容易地初始化它们的 Avro 模式。许多应用程序和服务都是把 Avro 模式定义在文件中或代码中的,但它们没办法写死模式 ID,因为模式 ID 是由 Schematizer 管控的。所以应用程序可以调用模式注册接口来直接注册模式,如果已经存在就把模式信息取回来了,如果不存在就直接注册,一举两得。

将模式改变事件处理全部流水线化。

为了让数据管道可以完全以流水线的方式处理模式改变事件,Schematizer 会根据当前模式和新模式的信息来为下游系统生成模式迁移计划。目前 Schematizer 只能为 Redshift 表生成模式迁移计划。对于把数据从数据管道中应用到 Redshift 集群的下游系统来说,在模式发生改变时它可以直接获取模式迁移计划并且执行,而且自动获取新的模式信息,不需要任何人工介入。这个功能是很容易扩展的,而且模式迁移计划生成器也是很容易替换的,所以将来我们会增加更多的模式迁移计划生成器来支持更多的模式类型,或者改用更好的算法来生成迁移计划。

Schematizer 知道所有数据生产者和消费者的信息。

除了管理注册的模式,Schematizer 还会跟进所有数据生产者和消费者的信息,包括哪个团队哪个服务负责生产或消费什么数据,发布数据的频率如何,等等。在需要人工介入时我们就可以用这些信息来有效地找到相应团队并与他们沟通协商。而且这些信息也可以帮助我们监控和找出那些过期了的模式和 Topic,从而可以将它们做废或删除。这样,就可以在新模式注册上来时简化兼容性验证工作。Schematizer 可以跳过那些废弃的模式,只检查新模式与 Topic 内剩下的有效的模式的兼容性就可以了。

所有数据生产者和消费者在启动时都必须提供这些信息。最初我们只想着把它们保存在 Schematizer 里就好了,但事实上这些信息对探索性的分析和预警都是非常有用的,最终我们决定把它们写到数据管道系统之外的单独的 Kafka Topic 中。这样数据就可以被 Redshift 和 Splunk 处理,也可以导入 Schematizer 和通过前端 Web 界面展示出来。我们用的是 Yelp 自行研发的通过 Clog 写入数据的异步、非阻塞式 Kafka 生产者,这样就不会影响生产者正常地发布数据。另外,这样也可以避免环形依赖,有时候正常的生产者要用相同的信息去注册多次。

该用哪个 Kafka Topic 呢?Schematizer 会处理好这些细节。

与一般意义上的 Kafka 生产者不同,数据管道的数据生产者不需要事先知道它们应该把数据发送到哪个 Kafka Topic 中。因为 Schematizer 规定了注册上来的模式和 Topic 之间的对应关系,所以数据生产者只要提供自己序列化数据所使用的模式信息,就可以从 Schematizer 那里得到正确的 Topic 信息并发布数据了。将 Topic 信息抽象出去可以让接口更简单易用。

对数据消费者也是类似的机制。尽管也可以给它们定下一些具体的 Topic 去消费,但更常见的用例是让 Schematizer 根据数据消费者感兴趣的组的信息来提供正确的 Topic。在本文前面章节介绍了各种不同的分组机制。数据消费都可以或者指定名字空间和数据源,或者指定数据目的方,Schematizer 就会找出那个组内的相应 Topic。这种机制对于数据消费者感兴趣的一组 Topic 可能由于模式的不兼容改变而变来变去的场景尤其有效。它让数据消费者不必再跟踪组内的每一个 Topic。

模式很好,文档更好!

模式把数据格式化了,但对于想了解数据确切意义的人来说提供的信息可能又不够。我们注意到使用数据的人通常不是生产数据的人,因此他们不知道去哪里找到有用的信息来让他们理解他们要用的数据。因为 Schematizer 负责管理数据管道中的所有模式,所以把数据的描述信息也保存在它这里就很合适。

知识挖掘器 Watson 隆重出场。

Schematizer 要求模式的注册方随着模式一起提供文档,然后 Schematizer 会提取文档信息并保存起来。为了让 Yelp 公司内的各个团队可以获得模式和数据文档,我们开发了 Watson,一个全公司员工都可以用来挖掘数据内容的 Webapp。Watson 实际上是 Schematizer 的一个可视化前端,它通过 Schematizer 的几个 RESTful API 来获取数据。

Watson 提供了关于数据管道状态的有价值信息:现有的名字空间、数据源及相关的 Avro 模式信息。最重要的是,Watson 为查看 Schematizer 管理的所有数据源和模式信息提供了简单的方法。

文档并不是天上掉下来的。

目前流经我们数据管道的数据主要都来自于数据库。我们用 SQLAlchemy 模型来为这些数据的数据源和模式整理文档。在 Yelp, SQLAlchemy 用来描述我们数据库中的所有模型。除了 docstring 之外,SQLAlchemy 还允许用户为模型的字段增加额外信息。因此,它自然成了我们保存文档的首选之处,记录各个数据模型和字段的目的和意义。

SQLAlchemy 还引入了一个属主字段来记录每个模型的维护者和专家。我们认为生成数据的人是提供文档的最佳人选。另外,这种方法也会鼓励大家时刻保持真实数据模型与描述的同步。

class BizModel(Base):
    __yelp_owner__ = Ownership(
        teams=[TEAM_OWNERS['biz_team'],
        members=[],
        contacts=[]
    )
    __table_name__ = 'my_biz_table'
    __doc__ = 'Business information.'
    id = Column(Integer, primary_key=True, doc=r"""ID of the business.""")
    name = Column(String(64), doc=r"""Name of the business.""")

一个简单的包含文档和属主信息的 SQLAlchemy 模型

可是开发者在做 SQLAlchemy 模型的时候并不总是会记得提供文档信息。为了防止这样的事情发生,我们开发了自动校验功能来强制要求所有模型都必须完整地提供了属性描述和文档,这是绝不会退让的硬性标准。每当有新模型要加入时,如果要求的文档信息不完备,或者没有属主信息,校验就会失败。这些自动校验功能帮助我们朝着 100% 文档覆盖率的目标迈进了一大步。

为 Watson 提取高质量文档。

当数据模型有了文档之后,我们就可以把它导入 Schematizer 并最终通过 Watson 展现出去。在深入具体提取流程之前,我们先介绍一下这个过程中的另一个重要模块:特定应用转换器(Application Specific Transformer),简写为 AST。与名字含义一样,AST 从一个或多个数据管道 Topic 中输入消息流,用转换逻辑处理消息模式和数据包,再把转换后的消息输出到另外的数据管道 Topic 中。提供具体转换处理的转换模块是可以串连起来的,因此可以组合多个模块来做非常细致的转换工作。

我们用 AST 中的许多个转换模块来依据 SQLAlchemy 模型生成更易理解的数据。因为模块是可以串连的,现在我们只是简单的创建一个从 SQLAlchemy 模型中提取文档和属主信息的转换模块,并把它加入到已有的转换链中。这样,所有模型的文档和属主信息就通过现有管道自动提取并导入 Schematizer 了。实现过程相当简单,并无缝接入管道,所以可以非常有效地生成高质量文档。

(点击放大图像)

AST 中的转换模块

如上所述,AST 中现在已经有了一些为用户生成更有意义的信息的转换模块。位标志转换模块会解释一个整型字段的不同数据位的具体含义。相似地,Enum 字段转换模块也会把 Enum 值转换成可读的文字表述。这些转换模块带来的另一个好处是它们同时也产生了自解释和自生成文档的模式,因此也产生了更好的文档。

合作、贡献与检索

开发者的文档并不是我们要讲述的最后一项内容。Watson 也提供了功能让终端用户可以一起努力,为使 Yelp 的数据更具可读性而贡献自己的力量。

第一个功能就是打标签。Watson 允许用户为任意数据源打标签分类。一个数据源可能是个 MySQL 数据库表,也可能是个数据模型。比如,一个业务数据源可以打上“Business Information”的标签,而一个用户信息数据源可以打上“User Information”标签。终端用户可以把相关的数据源都打上相同的标签,这样以对自己最有意义的方式把它们组织在一起。打标签可以让我们更深入的理解我们的各个数据源之间是如何彼此关联的。

(点击放大图像)

打上了“Business Info”标签的业务数据源

Watson 提供的另一个功能是添加注释。终端用户,尤其是非技术人员,可以通过这种方法来为一个数据源或字段提供他们自己的文档。比如业务分析师常常就会对使用数据有非常宝贵的见解,他们可以通过注释来分享各种疑难杂症、边界用例和时效性很强的信息。

终端用户对于 Watson 的最大的需求就是检索。我们在 Watson 中实现了简单的检索引擎,让用户可以检索数据的模式、Topic、数据模型描述等各方面信息。在检索后台我们没有用 Elasticsearch,而是选择了 Whoosh Python 包,因为它可以帮助我们快速完成开发。就我们目前的检索量来说 Whoosh 的性能足以应付。随着数据规模增大,我们将来会考虑换用其它更易扩展的引擎。

结论

Schematizer 是 Yelp 数据管道的一个重要组成部分。它的模式注册操作是数据管道的许多重要功能的基础,包括在上游数据生产者更改模式时减轻对下游消费者程序和服务的影响等。Schematizer 也管理了数据发布的 Topic 分配,让用户不必再关心具体使用哪个 Topic 等这样的细节。最后,它要求所有写入数据管道的数据都必须有文档,这促进了全公司内的知识分享。Watson 的加入更是使得 Yelp 公司内的所有员工都可以方便地得到最及时的信息。

我们已经细致了解了 Schematizer 和它的前端文档系统 Watson。接下来,我们将细致分析流处理器:Paastorm。请大家继续保持关注!

鸣谢

非常感谢 Josh Szepietowski,Will Cheng,谢谢 Bo Wu 实现了 Watson。尤其感谢 Application Specific Transformer 的作者 Josh,他为本文中 AST 和 Watson 相关章节提供了非常宝贵的素材。

收藏

评论

微博

用户头像
发表评论

注册/登录 InfoQ 发表评论