GMTC全球大前端技术大会(北京站)门票9折特惠截至本周五,点击立减¥480 了解详情
写点什么

基于 Pulsar 的事件驱动铁路网

2021 年 5 月 08 日

基于Pulsar的事件驱动铁路网

这张照片拍摄于瑞士的 Landwasser 高架桥。瑞士以其铁路网络闻名于世,根据维基百科,瑞士拥有世界上最密集的铁路网。本文带你一起模拟瑞士的铁路网络。



我们会用到 Apache PulsarNeutron。Apache Pulsar 是开源分布式 pub-sub 消息系统,最初由 Yahoo! 开发,目前属于 Apache 软件基金会。数据架构师、数据分析师、程序员等经常对比 Apache Pulsar 和 Apache Kafka,目前已有许多对比二者优劣势的文章。


Neutron 是基于 FS2 流媒体库的 Pulsar 客户端。作为一款成熟的产品, Neutron 已经用于 Chatroulette 的生产,但 Neutron 的开发并未停止。


拥有一套玩具铁路网一直是我童年时的梦想。现在,我可以自己动手搭建一套虚拟铁路网了。


接下来,我们将一起开发一个事件驱动的铁路网络模拟器。

思路

我们要搭建一套包含三个车站的铁路网:日内瓦、伯尔尼和苏黎世。其中日内瓦和苏黎世均与伯尔尼相连,但日内瓦与苏黎世不相连。



每个站点为一个节点,相连节点通过消息 broker——Apache Pulsar 通信。节点消费其相连节点发布的事件。consumer 过滤传入事件后消费与特定城市相关的事件。


有两种方式可以控制模拟器的行为,一是添加可用于人工干预的 HTTP 端点。用户通过发送 HTTP 请求向系统中添加新列车。


我们不持久保存任何数据,无需使用数据库或缓存系统,将所有数据保存在内存中。因此我们可以使用类似于 Ref 的高级并发机制。


Apache Pulsar 是系统的核心,负责节点间通信。一旦状态发生改变,系统应该发布描述这一动作的新事件。也就是说,每个事件都应该有一个时间戳。此外,每个事件应有一个列车 ID,代表特定列车的标识号码。初始时,有两个事件:


  • 出发(Departed)事件——列车出发时发布出发事件。

  • 到达(Arrived)事件——列车到达时发布到达事件。


这两个事件包含关于列车的基本信息:列车标识号码、出发城市、目的地城市、预计到达时间和事件时间戳。


每个城市都消费来自相连城市的事件。例如,苏黎世消费来自伯尔尼的事件,但不关注来自日内瓦的事件。苏黎世的事件 consumer 应确保能够捕获到由伯尔尼出发并且苏黎世为目的地的事件。每个城市对应一个 topic,3 个城市就对应 3 个 topic。需要优化时,可以把通用的 "城市 topic "分成几个更具体的 topic。


业务逻辑通过 Neutron 连接到 Apache Pulsar。


每个被消费的 topic 都会转换为 fs2 流,如果你不了解如何处理 fs2 流,可以参考 fs2 指南,本文代码不会涉及到这部分内容。


我基于 cats 库的 Tagless Final 技术编写了这一应用程序,并以 ZIO 作为运行时 effect

Pulsar 简介

Apache Pulsar 是分布式消息和流平台,可用于搭建高扩展性系统。系统内部通过消息进行通信,topic 数量可达数百万个。从开发者的角度来讲,Apache Pulsar 可以看作是一个黑匣子,但我建议多了解它的底层工作原理。为了更好地理解本文中的操作,我先介绍几个概念:


  • topic——信息传输的媒介。topic 分为两种:

  • 持久化 topic——持久存储消息数据。

  • 非持久化 topic——不持久存储消息数据,将消息保存在内存中。如果 Pulsar broker 宕机,所有传输中的消息都会丢失。

  • producer——与 topic 相连,用于发布消息。

  • consumer——通过订阅与 topic 相连,用于接收消息。

  • 订阅——制定向 consumer 发布消息的配置规则。Pulsar 支持四种订阅类型:

  • 独占——单一 consumer,如有多个 consumer 同时订阅则会引发错误;

  • 故障转移——多个 consumer,但只有一个 consumer 能收到消息;

  • 共享——多个 consumer,以轮询方式接收消息;

  • Key_Shared——多个 consumer,按 key 分发消息(一个 consumer 对应一个 key)。


消息系统发布事件后,由 producer 处理这些事件并发布到 topic 上,另一个系统里的 consumer 通过订阅连接到这个 topic。


点击这里了解更多关于 Apache Pulsar 的信息。

业务逻辑

上文提到铁路网中会发生的两个事件——列车的出发与到达。定义这两个事件的代码如下:


case class Departed(id: EventId, trainId: TrainId, from: From, to: To, expected: Expected, created: Timestamp) extends Eventcase class Arrived(id: EventId, trainId: TrainId, from: From, to: To, expected: Expected, created: Timestamp)  extends Event
复制代码


事件需包含系统中已发生动作的基本信息:唯一的事件 id、列车 id、出发城市、目的地城市、预计到达时间和实际事件时间戳。我们以后还可以添加站台号等信息。


为确保本文简单易懂,我们对本系统工作所需的数据量加以限制。为了便于区分事件中的字段(比如目的地和出发城市),所有字段都使用强类型。


由于没有可以自动检测火车到达或出发的系统,我们需要手动控制铁路网。假设有一名火车调度员在通过按钮和仪表盘来控制铁路网络,我们虽然没有炫酷的 UI,但可以搭建 API,API 的核心是两个简单的命令,用于触发车站的业务逻辑:


case class Arrival(trainId: TrainId, time: Actual)case class Departure(id: TrainId, to: To, time: Expected, actual: Actual)
复制代码

列车出发

让我们从创建火车出发开始吧!这个命令比较简单,可以通过 cURL 发送:


curl --request POST \  --url http://localhost:8081/departure \  --header 'content-type: application/json' \  --data '{  "id": "153",  "to": "Zurich",  "time": "2020-12-03T10:15:30.00Z",  "actual": "2020-12-03T10:15:30.00Z"}'
复制代码


上述命令假设伯尔尼服务节点在 8081 端口运行,每个节点都运行 HTTP 服务器,也都能够处理这一请求。我们使用 Http4s 库作为 HTTP 服务器,第一个线路定义如下:


case req @ POST -> Root / "departure" =>  req    .asJsonDecode[Departure]    .flatMap(departures.register)    .flatMap(_.fold(handleDepartureErrors, _ => Ok()))
复制代码


调用 Departures 服务仅需注册(register)一列出发的火车:


trait Departures[F[_]] {  def register(departure: Departure): F[Either[DepartureError, Departed]]}
复制代码


Scala 支持多种验证数据的方式,我选择最直接的一种——返回带有自定义错误的 Either。如果火车注册成功,则返回 Departed 事件;否则,返回错误。


为确保本文简单易懂,我们会在 Departures 服务执行过程中调用消息 producer。首先需执行 Departures 服务,即在 Departures 伴生对象中创建 make 函数 :


object Departures {  def make[F[_]: Monad: UUIDGen: Logger](      city: City,      connectedTo: List[City],      producer: Producer[F, Event]  ): Departures[F] = new Departures[F] {    def register(departure: Departure): F[Either[DepartureError, Departed]] = ???  }}
复制代码


为实现 Departures 接口,我们要给 effect F 设置边界:需有 UUIDGenLogger 实例。我已经在程序中创建了虚拟的 UUIDGenLogger 接口。


F 还应有 Monad 实例,用于连接函数调用。


首先执行验证逻辑,检查出发事件是否有效。我们只需检查目的地城市是否在相连城市列表中:


def validated(departure: Departure)(f: F[Departed]): F[Either[DepartureError, Departed]] = {  val destination = departure.to.city
connectedTo.find(_ === destination) match { case None => val e: DepartureError = DepartureError.UnexpectedDestination(destination) F.error(s"Tried to departure to an unexpected destination: $departure") .as(e.asLeft) case _ => f.map(_.asRight) }}
复制代码


如果目的地城市不在列表中,则生成错误信息日志并返回错误。否则创建 Departed 事件并将其作为结果返回。


接下来需要实现注册功能,示例代码如下:


def register(departure: Departure): F[Either[DepartureError, Departed]] =  validated(departure) {    F.newEventId      .map {        Departed(          _,          departure.id,          From(city),          departure.to,          departure.time,          departure.actual.toTimestamp        )      }      .flatTap(producer.send_)  }
复制代码


先验证目的地城市,若有效,生成一个 newEventId,用于创建新的 Departed 事件,该事件将通过传递给 make 函数的 producer 发布到 Pulsar 的城市 topic。点击这里查看 Departures 事件的最终版本。

预计出发列车

我们已经了解了如何生成列车。如果一列火车从苏黎世开往伯尔尼,那么伯尔尼会收到相应通知。


伯尔尼收听来自苏黎世的事件,一旦有把伯尔尼设为目的地的 Departed 事件,就将其加入预期列车表中。现在我们只关注业务逻辑,后文会再讨论消息消费。为预期出发事件定义 DepartureTracker,示例代码如下:


trait DepartureTracker[F[_]] {  def save(e: Departed): F[Unit]}
复制代码


该服务会成为 Departed 事件流中的 sink,所以我们不关注返回类型,也不希望出现任何验证错误。和上文 Departures 服务一样,先创建伴生对象,定义 make 函数:


def make[F[_]: Applicative: Logger](    city: City,    expectedTrains: ExpectedTrains[F]  ): DepartureTracker[F] = new DepartureTracker[F] {    def save(e: Departed): F[Unit] =      val updateExpectedTrains =        expectedTrains.update(e.trainId, ExpectedTrain(e.from, e.expected)) *>          F.info(s"$city is expecting ${e.trainId} from ${e.from} at ${e.expected}")

updateExpectedTrains.whenA(e.to.city === city) }
复制代码


我们依赖于 ExpectedTrains 服务。ExpectedTrain 是存储进站列车的服务,我们很快就能实现该服务。我们实现了 save 函数,只有进站列车的目的地城市与预期城市相符时,该函数才会执行。例如,日内瓦和苏黎世均消费来自伯尔尼的事件。伯尔尼发出 Departed 事件时,其中一个城市会忽略此消息,而另一个城市,即目的地城市,会更新预期列车表,创建日志消息。


预期列车存储中至少包含以下函数:


trait ExpectedTrains[F[_]] {  def get(id: TrainId): F[Option[ExpectedTrain]]  def remove(id: TrainId): F[Unit]  def update(id: TrainId, expectedTrain: ExpectedTrain): F[Unit]}
复制代码


即使我们尝试删除不存在于系统中的列车,也不会操作失败。在某些业务情况下可能会出现系统故障的错误,但在这种特殊情况下,我们会忽略这一错误。整个测试过程中,数据一直存储在内存中,不持久保存。


def make[F[_]: Functor](    ref: Ref[F, Map[TrainId, ExpectedTrain]]  ): ExpectedTrains[F] = new ExpectedTrains[F] {    def get(id: TrainId): F[Option[ExpectedTrain]] =       ref.get.map(_.get(id))    def remove(id: TrainId): F[Unit] =       ref.update(_.removed(id))    def update(id: TrainId, train: ExpectedTrain): F[Unit] =       ref.update(_.updated(id, train))  }
复制代码


我们在这一应用程序中使用 Ref 作为高级并发机制。

列车到达

业务逻辑三部曲的最后一部分是列车到达。与列车出发类似,先创建一个 HTTP 端点,可以用简单的 cURL POST 请求来调用:


curl --request POST \  --url http://localhost:8081/arrival \  --header 'Content-Type: application/json' \  --data '{  "trainId": "123",  "time": "2020-12-03T10:15:30.00Z"}'
复制代码


再由 Http4s 路线处理请求:


case req @ POST -> Root / "arrival" =>  req    .asJsonDecode[Arrival]    .flatMap(arrivals.register)    .flatMap(_.fold(handleArrivalErrors, _ => Ok()))
复制代码


Arrivals 服务类似于上文介绍的 Departures 服务。Arrivals 服务中也只有一个方法,即 register 方法:


trait Arrivals[F[_]] {  def register(arrival: Arrival): F[Either[ArrivalError, Arrived]]}
复制代码


然后需要验证请求,示例代码如下:


def validated(arrival: Arrival)(f: ExpectedTrain => F[Arrived]): F[Either[ArrivalError, Arrived]] =  expectedTrains    .get(arrival.trainId)    .flatMap {      case None =>        val e: ArrivalError = ArrivalError.UnexpectedTrain(arrival.trainId)        F.error(s"Tried to create arrival of an unexpected train: $arrival")         .as(e.asLeft)      case Some(train) =>        f(train).map(_.asRight)    }
复制代码


检查到达的列车是否与预期相符,若相符,则创建 Arrived 事件;否则,生成错误日志。列车到达事件中 register 方法的实现中与之前 register 方法的实现类似:


def register(arrival: Arrival): F[Either[ArrivalError, Arrived]] =  validated(arrival) { train =>    F.newEventId      .map {        Arrived(          _,          arrival.trainId,          train.from,          To(city),          train.time,          arrival.time.toTimestamp        )      }      .flatTap(a => expectedTrains.remove(a.trainId))      .flatTap(producer.send_)  }
复制代码


Departures 相比,到达事件不仅发布了新事件,还把到达列车从预计出发列车列表中删除。


以上为全部业务逻辑,代码已经通过单元测试(使用 ZIO Test 实现),可参考 GitHub 文件

消息消费

这一节主要讲消息消费,也会把所有逻辑服务连在一起。

创建资源

首先创建所需资源。一个城市节点包含四个组件:配置、事件 producer、事件 consumer,以及存储 ExpectedTrainsRef。我们可以把这四种资源在一个 case class 中组合起来,在 Main 类外创建:


final case class Resources[F[_], E](  config: Config,  producer: Producer[F, E],  consumers: List[Consumer[F, E]],  trainRef: Ref[F, Map[TrainId, ExpectedTrain]])
复制代码


我们使用 ciris 库从环境变量中读取 Config。关于配置,可以参考 GitHub 文件。我们使用 Chatroulette 开发的 Neutron 库来创建 producer 和 consumer。


首先,创建一个 Pulsar 对象实例,用于与 Apache Pulsar 集群建立连接:


Pulsar.create[F](config.pulsar.serviceUrl)
复制代码


以上操作仅需 serviceUrl,我们会得到 Resource[F, PulsarClient],可以用来创建 producer 和 consumer。创建 producer 之前,应该先创建包含 topic 配置的 topic 对象:


def topic(config: PulsarConfig, city: City) =  Topic(    Topic.Name(city.value.toLowerCase),    config  ).withType(Topic.Type.Persistent)
复制代码


Topic 名就是城市名,而且是持久化 topic,这样,任何未确认的消息都不会丢失。另外,作为配置的一部分,我们传递了命名空间租户。关于命名空间和租户的更多信息,可以查阅 Pulsar 文档


创建 producer 操作只是简单的一行:


def producer(client: Pulsar.T, config: Config): Resource[F, Producer[F, E]] =  Producer.create[F, E](client, topic(config.pulsar, config.city))
复制代码


创建 producer 的方法有很多,我们选择最简单的一种,只需使用之前创建的 Pulsar 客户端和一个 topic。


创建 consumer 所需操作稍多,因为还要创建订阅


def consumer(client: PulsarClient, config: Config, city: City): Resource[F, Consumer[F, E]] = {  val name         = s"${city.value}-${config.city.value}"  val subscription =          Subscription            .Builder            .withName(Subscription.Name(name))            .withType(Subscription.Type.Failover)            .build
Consumer.create[F, E](client, topic(config.pulsar, city), subscription)}
复制代码


创建订阅,设置订阅名称为相连的城市名称与火车经停城市名组合。默认使用 Failover 订阅类型,并行运行 2 个实例(以防其中一个实例宕机)。


加上所需 Ref,我们终于可以创建 Resources 了:


for {  config    <- Resource.liftF(Config.load[F])  client    <- Pulsar.create[F](config.pulsar.url)  producer  <- producer(client, config)  consumers <- config.connectedTo.traverse(consumer(client, config, _))  trainRef  <- Resource.liftF(Ref.of[F, Map[TrainId, ExpectedTrain]](Map.empty))} yield Resources(config, producer, consumers, trainRef)
复制代码


请注意,我们使用 traverse 方法在 connectedTo 城市列表中创建了一份 consumer 列表,点击 GitHub 文件查看最终结果。

启动引擎

我们在应用程序中使用 zio.Task 作为 effect 类型。zio.Task 包含的类型参数最少,对于不熟悉 ZIO 的人来说,zio.Task 更易理解。如果你想了解更多类型参数,可以参考ZIO简介


首先,创建之前定义过的 Resources 类:


Resources  .make[Task, Event]  .use {    case Resources(config, producer, consumers, trainRef) => ???  }
复制代码


依然是 4 个参数。先初始化服务,为 HTTP 服务器创建路线


val expectedTrains   = ExpectedTrains.make[Task](trainRef)val arrivals         = Arrivals.make[Task](config.city, producer, expectedTrains)val departures       = Departures.make[Task](config.city, config.connectedTo, producer)val departureTracker = DepartureTracker.make[Task](config.city, expectedTrains)
val routes = new StationRoutes[F](arrivals, departures).routes.orNotFound
复制代码


创建 HTTP 服务器:


val httpServer = Task.concurrentEffectWith { implicit CE =>  BlazeServerBuilder[Task](ec)    .bindHttp(config.httpPort.value, "0.0.0.0")    .withHttpApp(routes)    .serve    .compile    .drain}
复制代码


如果你很了解 Http4s,那么以上操作应该不难理解。若不了解,点击这里查看相关文档。开始消费传入消息,并创建一个流:


val departureListener =  Stream    .emits(consumers)    .map(_.autoSubscribe)    .parJoinUnbounded    .collect { case e: Departed => e }    .evalMap(departureTracker.save)    .compile    .drain
复制代码


简而言之,我们使用 FS2 库创建了事件流。首先,创建 consumer 流,对每个 consumer 调用 autoSubscribe 方法,用于订阅 topic,再通过 parJoinUnbounded 把所有流合在一起,然后,用 collect 方法删除 Departed 以外的所有消息。最后,在之前实现的 departureTracker 上调用 save 方法,编译并排出流。


现在有两个最终流:HTTP 服务器和 Pulsar 的传入消息。此时我们已经处理完了所有消息,只需运行流,即并行压缩并丢弃结果:


departureListener  .zipPar(httpServer)  .unit
复制代码


组成 Main 类的代码块都比较简单,读取和维护也相对容易。

结语

本文给出了事件驱动系统示例,按步骤梳理了业务逻辑,模拟了瑞士铁路网。你可以在本文示例代码的基础上进行修改和拓展。


本文中使用到了 Apache Pulsar 的部分功能,但 Pulsar 不止于此,它操作简易,功能强大。我们搭建了一个简单的分布式系统,由几个节点组成,这些节点在 Apache Pulsar 上使用消息传递进行通信。本应用程序使用基于 cats 库的 Tagless Final 技术编写,其中 ZIO Task 为主要的 effect 类型。


此外,我们还尝试了 Neutron,虽然 Neutron 已用于 Chatroulette 生产,但仍在开发中。


点击这里查看本程序的最终版本,操作指南可见 readme 部分。


原文链接

https://scala.monster/train-station/

2021 年 5 月 08 日 17:101709

评论

发布
暂无评论
发现更多内容

医疗界“最强大脑”落户杭州!阿里巴巴联合浙大一院共同打造

阿里云情报局

互联网

揭秘在召唤师峡谷中移动路径选择逻辑?

华为云开发者社区

算法 地图 最短路径

DB-Engines 11月数据库排名:PostgreSQL坐稳同期涨幅榜冠军宝座

华章IT

数据库 postgresql

移动安全加固助力 App 实现全面、有效的安全防护

蚂蚁集团移动开发平台 mPaaS

安全攻防 App风险 mPaaS

终于啃完了这份Java核心原理+框架“面试圣经”,成功五面上岸美团

Java架构追梦

Java 架构 面试 微服务 框架开发

重磅解读:K8s Cluster Autoscaler模块及对应华为云插件Deep Dive

华为云开发者社区

容器 k8s 服务

接口测试如何在post请求中传递文件

测试人生路

接口测试

解决大中型浏览器(Chrome)插件开发痛点:自定义热更新方案——2.基于双缓存更新功能模块

梁龙先森

Java chrome 浏览器 技术方案 前端进阶

会展云技术解读 | 面对突发事故,APP如何做好崩溃分析与性能监控?

京东科技开发者

云计算 云服务

每周一看:16份文档资料,程序员软硬实力全概览,总有一个适合你

小Q

Java 学习 程序员 架构 面试

第 7 周 性能优化(一)总结

钟杰

极客大学架构师训练营

技术分享:WebAssembly能否重新定义前端开发模式?

Geek_Willie

webassembly

如何实现后台管理系统的权限路由和权限菜单

徐小夕

Java 编辑器 H5 数据可视化 前端进阶

【涂鸦物联网足迹】API及SDK介绍

IoT云工坊

软件开发 物联网 API sdk 云平台

架构师训练营第 1 期第 7 周作业

du tiezheng

极客大学架构师训练营

“开源软件供应链点亮计划-暑期2020”公布结果 基于ChubaoFS开发的项目获得最佳质量奖

京东科技开发者

大数据 云原生 开源项目

分库分表的 9种分布式主键ID 生成方案,挺全乎的

程序员内点事

分库分表 Java 分布式

嗯,查询滑动窗口最大值的这4种方法不错...

王磊

Java 数据结构和算法

【云小课】版本管理发展史之Git+——代码托管

华为云开发者社区

git 代码管理 托管

2020LF AI&DATA DAY(AI开源日):中国开源社区迈入全球化新征程

Geek_459987

数据结构与算法系列之链表操作全集(三)(GO)

书旅

go 数据结构

架构师训练营第八周总结

邓昀垚

天啦撸!打印日志竟然只晓得 Log4j?

沉默王二

Java 日志 log4j

简析低代码开发与传统开发的区别与优势

Marilyn

敏捷开发 低代码

mongodb 源码实现系列 - 网络传输层模块实现三

杨亚洲(专注mongodb及高性能中间件)

MySQL mongodb 分布式 高性能 分布式数据库mongodb

阿里云官方推出操作系统“等保合规”镜像 -- Alibaba Cloud Linux 等保2.0三级版

阿里云基础软件团队

内核

架构师训练营第八周作业

邓昀垚

极客大学架构师训练营

数据结构与算法系列之栈&队列(GO)

书旅

go 数据结构与算法

区块链usdt支付系统开发方案,承兑系统搭建

WX13823153201

区块链usdt支付系统开发

go-zero如何追踪你的请求链路

Kevin Wan

go Trace microservice

tomcat打包成rpm包

lee

tomcat rpm

基于Pulsar的事件驱动铁路网-InfoQ