NVIDIA 初创加速计划,免费加速您的创业启动 了解详情
写点什么

使用 Strimzi 将 Kafka 和 Debezium 迁移到 Kubernetes

  • 2022-10-12
    北京
  • 本文字数:14536 字

    阅读完需:约 48 分钟

使用Strimzi将Kafka和Debezium迁移到Kubernetes

在本系列文章的第1部分第2部分中,我们学习了Apache Kafka、Apache Kafka Streams 和Quarkus之间的集成。我们开发了一个简单的应用程序,向 Kafka 主题生成事件,并使用Kafka Streams实时消费和处理它们。


在那个例子中,我们模拟了一家电影流媒体公司。我们将电影信息保存在一个 Kafka 主题中,并在另一个 Kafka 主题中保存用户停止观看电影时的事件,并捕获影片播放的时间。我们实时对这些事件进行后期处理,计算电影播放超过 10 分钟的次数。


下图是这个应用程序的架构。


然后,在第3部分中,我们介绍了发件箱模式和 Debezium,用于避免在不同系统需要同步相同数据时发生的双写问题。


在前面的三篇文章中,我们已经从开发人员的角度学习了所有这些技术,并最终在开发人员的本地机器上(以开发模式)部署应用程序。


在本文中,我们将探讨如何将所有东西部署到生产环境,更具体地说,部署到 Kubernetes 中。我们将学习:


  • 在 Kubernetes 中安装和管理 Apache Kafka 集群。

  • 容器化 Quarkus 应用程序。

  • 配置一个带有生产参数的 Quarkus 应用程序。

  • 将 Debezium Embedded 迁移成 Debezium Server。

Kubernetes


Kubernetes 是一个开源的容器编配器,是部署微服务的事实上的平台。这些服务既可以在裸金属环境中运行,也可以在云环境中运行。


本文使用minikube作为 Kubernetes 集群,但同样的步骤应该适用于任何其他实现。

启动集群

在终端窗口中执行以下命令,在配备了 8GB 内存和 2 个 vCPU 的 VirtualBox 机器上启动集群。


minikube start -p strimzi --kubernetes-version='v1.22.12' --vm-driver='virtualbox' --memory=8096
[strimzi] minikube v1.24.0 on Darwin 12.5 minikube 1.26.1 is available! Download it: https://github.com/kubernetes/minikube/releases/tag/v1.26.1 To disable this notice, run: 'minikube config set WantUpdateNotification false'
✨ Using the virtualbox driver based on user configuration Starting control plane node strimzi in cluster strimzi Creating virtualbox VM (CPUs=2, Memory=8096MB, Disk=20000MB) ... > kubelet.sha256: 64 B / 64 B [--------------------------] 100.00% ? p/s 0s > kubeadm.sha256: 64 B / 64 B [--------------------------] 100.00% ? p/s 0s > kubectl.sha256: 64 B / 64 B [--------------------------] 100.00% ? p/s 0s > kubeadm: 43.74 MiB / 43.74 MiB [-------------] 100.00% 13.98 MiB p/s 3.3s > kubectl: 44.77 MiB / 44.77 MiB [-------------] 100.00% 11.11 MiB p/s 4.2s > kubelet: 115.30 MiB / 115.30 MiB [-----------] 100.00% 20.16 MiB p/s 5.9s
▪ Generating certificates and keys ... ▪ Booting up control plane ... ▪ Configuring RBAC rules ... ▪ Using image gcr.io/k8s-minikube/storage-provisioner:v5 Verifying Kubernetes components... Enabled addons: storage-provisioner, default-storageclass
❗ /usr/local/bin/kubectl is version 1.24.0, which may have incompatibilites with Kubernetes 1.22.12. ▪ Want kubectl v1.22.12? Try 'minikube kubectl -- get pods -A' Done! kubectl is now configured to use "strimzi" cluster and "default" namespace by default
复制代码


在终端窗口执行下面的命令检查 Kubernetes 集群是否正常运行。


kubectl get nodes
NAME STATUS ROLES AGE VERSIONstrimzi Ready control-plane,master 3m4s v1.22.12
kubectl get podsNo resources found in default namespace.
复制代码

Apache Kafka


在之前的文章中,我们通过 Quarkus 的开发模式来启动运行应用程序所需的外部依赖项(Kafka 集群和 MySQL 数据库)。从开发的角度来看,开发模式非常棒,但在部署到生产环境时,你会发现这些东西管理起来更加复杂。第一个障碍可能是在 Kubernetes 中安装和配置 Kafka 集群。


你可能想知道以下这些问题的答案:


  • Kafka 组件(Kafka、Zookeeper 等)需要使用哪个容器镜像?

  • 如何在 Kubernetes 中轻松部署所有这些组件?

  • 如何在 Kubernetes 中创建用户、主题或 HA?

  • 安全性如何?你可以尝试手动完成所有这些事情,例如编写很长的 YAML 文件和使用 Kafka CI 工具配置 Kafka 组件。然而,还有另一种 Kubernetes 原生的、完全自动化和可复制的(非常适合 CI/CD)方法,就是使用 Strimzi。

Strimzi

Strimzi是一个Kubernetes Operator,通过控制器来创建、配置和保护 Kafka 集群,就像其他 Kubernetes 资源(如 Pod、Deployment、ConfigMap 等)一样。


Strimzi 项目包含三个 Operator——一个用于管理 Kafka 集群,一个用于管理主题,一个用于用户管理。


在 Kubernetes 集群中安装了 Strimzi Operator 之后,你只需要使用下面的 YAML 文件就可以启动并运行一个 Kafka 集群,其中包含了一个 Kafka 副本和三个使用临时存储(没有挂载持久卷)的 ZooKeeper 副本。


apiVersion: kafka.strimzi.io/v1beta2kind: Kafkametadata: name: my-clusterspec: kafka:   version: 3.2.0   replicas: 1   listeners:     - name: plain       port: 9092       type: internal       tls: false     - name: tls       port: 9093       type: internal       tls: true   config:     offsets.topic.replication.factor: 1     transaction.state.log.replication.factor: 1     transaction.state.log.min.isr: 1     default.replication.factor: 1     min.insync.replicas: 1     inter.broker.protocol.version: "3.2"   storage:     type: ephemeral zookeeper:   replicas: 3   storage:     type: ephemeral entityOperator:   topicOperator: {}   userOperator: {}
复制代码


接下来,我们将在已经启动的集群中安装 Strimzi。

安装 Strimzi

首先是创建一个命名空间来安装 Strimzi Operator。在本例中,我们使用了命名空间 kafka。在终端窗口中执行如下命令:


kubectl create namespace kafkanamespace/kafka created
复制代码


接下来,我们应用 Strimzi 安装文件,其中包括用于声明式管理 Kafka 集群、Kafka 主题和用户的 CRD(CustomerResourceDefinition)。


kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
复制代码


运行下面的命令验证 Operator 是否安装正确。


kubectl get pods -n kafka
NAME READY STATUS RESTARTS AGEstrimzi-cluster-operator-597d67c7d6-ms987 1/1 Running 0 4m27s
复制代码


现在,我们开始创建带有 movies 主题的 Kafka 集群。我们将在这个主题中保存所有电影的信息,稍后 Kafka Streams 将消费这个主题,正如我们在本系列文章的第2部分中所看到的那样。

创建 Kafka 集群

创建一个新的文件(即 kafka.yaml)来安装一个带有一个副本的Kafka集群,不启用 TLS,作为内部Kubernetes服务


apiVersion: kafka.strimzi.io/v1beta2kind: Kafkametadata: name: my-clusterspec: kafka:   version: 3.2.0   replicas: 1   listeners:     - name: plain       port: 9092       type: internal       tls: false   config:     offsets.topic.replication.factor: 1     transaction.state.log.replication.factor: 1     transaction.state.log.min.isr: 1     default.replication.factor: 1     min.insync.replicas: 1     inter.broker.protocol.version: "3.2"   storage:     type: ephemeral zookeeper:   replicas: 1   storage:     type: ephemeral entityOperator:   topicOperator: {}   userOperator: {}
复制代码


然后在终端窗口中使用 kubectl 命令创建这个资源:


kubectl create -f kafka.yaml -n kafkakafka.kafka.strimzi.io/my-cluster created
复制代码


此时,Strimzi 开始在默认命名空间中安装 Kafka 集群。


现在,我们通过获取默认的名称空间 Pod 来检查集群的创建情况。


kubectl get pods -n kafka
NAME READY STATUS my-cluster-entity-operator-755596449b-cw82g 3/3 Running my-cluster-kafka-0 1/1 Running my-cluster-zookeeper-0 1/1 Running
复制代码


Kafka 集群已启动并运行。我们除了可以将 Kafka 作为 Kubernetes 资源安装之外,还可以查询和描述它。例如,在终端窗口中执行如下命令:


kubectl get kafka -n kafka
NAME DESIRED KAFKA REPLICAS DESIRED ZK REPLICAS READY WARNINGSmy-cluster 1 1 True True


kubectl describe kafka my-cluster -n kafka
Name: my-clusterNamespace: defaultLabels: <none>Annotations: <none>API Version: kafka.strimzi.io/v1beta2Kind: KafkaMetadata: Creation Timestamp: 2022-08-09T10:57:39Z
复制代码


当然,你也可以像删除其他 Kubernetes 资源一样删除它。此外,系统还创建了 4 个 Kubernetes 服务来访问 Kafka 集群:


kubectl get services -n kafka
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE 143mmy-cluster-kafka-bootstrap ClusterIP 172.30.77.150 <none> 9091/TCP,9092/TCP 21mmy-cluster-kafka-brokers ClusterIP None <none> 9090/TCP,9091/TCP,9092/TCP 21mmy-cluster-zookeeper-client ClusterIP 172.30.5.186 <none> 2181/TCP 21mmy-cluster-zookeeper-nodes ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 21m
复制代码


应用程序用于访问集群的服务是 my-cluster-kafka-bootstrap,它公开了 Kafka 的 9092 端口。


在进入到应用程序部分之前,我们需要使用另一个 YAML 文件来创建和配置 movies 主题。

创建 movies 主题

Strimzi 有一个用于创建和管理主题的 Operator。要创建一个新主题,我们需要创建一个KafkaTopic类型的 Kubernetes 资源文件,在 strimzi.io/cluster 中指定主题的名称和集群的名称(在我们的例子中是 my-cluster)。我们使用下面的内容创建一个名为 movies-topic.yaml 的新文件。


apiVersion: kafka.strimzi.io/v1beta2kind: KafkaTopicmetadata: name: movies labels:   strimzi.io/cluster: my-clusterspec: partitions: 1 replicas: 1 config:   retention.ms: 7200000   segment.bytes: 1073741824
复制代码


并应用这个文件:


kubectl apply -f movies-topic.yaml -n kafkakafkatopic.kafka.strimzi.io/movies create
复制代码


和其他 Kubernetes 资源一样,我们也可以查询和描述它。


kubectl get kafkatopic -n kafka
NAME CLUSTER PARTITIONS REPLICATION FACTOR READYconsumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a my-cluster 50 1 Truemovies my-cluster 1 1 Truestrimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55 my-cluster 1 1 Truestrimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b my-cluster 1 1 True
复制代码


描述已创建的主题:


kubectl port-forward -n kafka service/my-cluster-kafka-bootstrap 9092:9092
Forwarding from 127.0.0.1:9092 -> 9092Forwarding from [::1]:9092 -> 9092
复制代码


我们来检查一下创建的主题是否有端口转发


在终端窗口执行如下命令:


kubectl port-forward -n kafka service/my-cluster-kafka-bootstrap 9092:9092
Forwarding from 127.0.0.1:9092 -> 9092Forwarding from [::1]:9092 -> 9092
复制代码


打开一个新的终端窗口,使用kcat工具列出 Kafka 集群的元素。我们可以使用 localhost 作为主机名,就像在上一步中使用端口转发技巧一样。


kcat -b localhost:9092 -L
Metadata for all topics (from broker -1: localhost:9092/bootstrap): 1 brokers: broker 0 at my-cluster-kafka-0.my-cluster-kafka-brokers.default.svc:9092 (controller) 4 topics: topic "movies" with 1 partitions: partition 0, leader 0, replicas: 0, isrs: 0
复制代码


最后,我们停止端口转发进程,对项目进行容器化,就像我们在本系列文章的第3部分中所做的那样,并进行一些相应的配置,以便连接到 Kafka 集群。

生产者 Debezium


我们在系列文章的第3部分解决了双写问题,使用 Debezium(具体来说是Debezium Embedded)修复了这个问题,具体方法是监听来自 MySQL 服务器的事务日志,并在每次插入新的电影播放信息时生成带有数据的 Kafka 事件。你可以在本地机器上运行这个示例,使用开发服务启动所需的服务(MySQL 和 Kafka),并自动配置应用程序来连接它们。


现在有点不一样了——服务必须运行在 Kubernetes 集群中,包括在前面步骤中创建的 Kafka 集群和 MySQL 数据库。要让它在 Kubernetes 中运行,需要做出三个改变。


  • 使用新的 Kafka 和 MySQL 参数(主机名、端口、用户名和密码)来配置服务。

  • 将应用程序装入容器,并推送到容器注册表。

  • 创建 Kubernetes 资源文件,用于部署服务。

配置服务


首先要配置的是 Kafka 的主机名和端口,它们指向 Strimzi 创建的 Kubernetes 服务。打开 src/main/resources/application.properties 文件并添加下面的内容:


%prod.kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092
复制代码


%prod 前缀表示这个属性仅在应用程序以 prod 模式下运行时使用(而不是在 dev 或 test 模式下)。


其次时配置插入影片信息的数据库连接。在 application.properties 文件中添加下面的内容。


quarkus.hibernate-orm.database.generation=drop-and-create%prod.quarkus.datasource.username=alex%prod.quarkus.datasource.password=alex%prod.quarkus.datasource.jdbc.url=jdbc:mysql://mysql:3306/moviesdb
复制代码


稍后,我们将使用这些参数部署一个 MySQL 实例。现在,我们假设配置参数是正确的。

容器化

Quarkus 为创建容器提供了与Jib项目的集成,让容器镜像的构建和推送简单得只需要执行一个 Maven/Gradle 任务。


打开 pom.xml 文件,在 dependencies 部分添加以下依赖项:


<dependency>     <groupId>io.quarkus</groupId>     <artifactId>quarkus-container-image-jib</artifactId></dependency>
复制代码


添加了Jib依赖项后,它将在打包时自动将应用程序装入容器。因为Jib的一些默认配置选项可能不适用于所有情况,所以你可以在 src/main/resources/application.properties 中覆盖它们。对于本例,我们将覆盖生成的容器镜像的 group 和容器注册中心的主机。


打开 application.properties 文件,并添加下面的内容:


# Substitue the value with your account namequarkus.container-image.group=lordofthejars # Defaults to Docker.io, overriden to Quay.quarkus.container-image.registry=quay.io
复制代码


你需要设置容器注册表的凭据,以便向注册表推送容器。你可以在执行构建之前运行 docker 的 login 命令。Maven 将从那里读取凭据,或者你可以使用 quarkus.container-image.username 和 quarkus.container-image.password 属性。


在项目的根目录下运行下面的命令来构建应用程序,它将构建出一个容器并将其推到指定的容器注册表中。


./mvnw clean package -DskipTests -Dquarkus.container-image.push=true
[INFO] Scanning for projects...[INFO][INFO] ---------------< org.acme:movie-plays-producer-debezium >---------------[INFO] Building movie-plays-producer-debezium 1.0.0-SNAPSHOT[INFO] --------------------------------[ jar ]---------------------------------[INFO][INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ movie-plays-producer-debezium ---[INFO] Deleting /Users/asotobu/git/quarkus-integrating-kafka/strimzi/movie-plays-producer-debezium/target[INFO] [io.quarkus.container.image.jib.deployment.JibProcessor] Using base image with digest: sha256:1a2fddacdcda67494168749c7ab49243d06d8fbed34abab90566d81b94f5e1a5[INFO] [io.quarkus.container.image.jib.deployment.JibProcessor] Container entrypoint set to [java, -Djava.util.logging.manager=org.jboss.logmanager.LogManager, -jar, quarkus-run.jar][INFO] [io.quarkus.container.image.jib.deployment.JibProcessor] Pushed container image quay.io/lordofthejars/movie-plays-producer-debezium:1.0.0-SNAPSHOT (sha256:73dfe42d53f8d7e3c268dbebc2e5f866596de33b8fcaf82c27bdd414d28bdb8a)
复制代码


从最后一行日志可以看到,容器被创建,并使用 application.properties 中指定的账号推送到注册中心。

Kubernetes

在将容器推送到注册表之后,我们准备将服务部署到 Kubernetes 中。我们可以手动创建 Kubernetes 资源文件,但没有必要这么做,因为 Quarkus 为我们提供了一个Kubernetes扩展


打开 pom.xml 文件,并在 dependencies 部分添加下面的依赖项。


<dependency>     <groupId>io.quarkus</groupId>     <artifactId>quarkus-kubernetes</artifactId></dependency>
复制代码


每次 Maven 打包应用程序时都会注册 Kubernetes 扩展,并生成将应用程序部署到 Kubernetes 集群的 kubernetes.yml 文件。你可以通过 application.properties 来修改生成文件的内容。例如,我们将 Kubernetes Service 设置为 LoadBalancer 而不是 ClusterIP,并将命名空间设置为 kafka。


打开 application.properties 文件并添加下面的内容。


quarkus.kubernetes.service-type=load-balancerquarkus.kubernetes.namespace=kafka
复制代码


修改好以后运行 Maven package 生成部署文件。


./mvnw clean package -DskipTests
[INFO] ------------------------------------------------------------------------[INFO] BUILD SUCCESS[INFO] ------------------------------------------------------------------------
复制代码


检查生成的文件 target/kubernetes/kubernetes.yml:


cat target/kubernetes/kubernetes.yml
复制代码


输出的内容应该类似于下面这样:


---apiVersion: v1kind: Servicemetadata: name: movie-plays-producer-debeziumspec: ports:   - name: http     port: 80     targetPort: 8080 selector:   app.kubernetes.io/name: movie-plays-producer-debezium   app.kubernetes.io/version: 1.0.0-SNAPSHOT # Type is LoadBalancer as set in the application.properties file  type: LoadBalancer---apiVersion: apps/v1kind: Deploymentmetadata: name: movie-plays-producer-debeziumspec: replicas: 1 selector:   matchLabels:     app.kubernetes.io/name: movie-plays-producer-debezium     app.kubernetes.io/version: 1.0.0-SNAPSHOT template:   metadata:   spec:     containers:       - env:           - name: KUBERNETES_NAMESPACE             valueFrom:               fieldRef:                 fieldPath: metadata.namespace         # The image is correctly set automatically         image: quay.io/lordofthejars/movie-plays-producer-debezium:1.0.0-SNAPSHOT         imagePullPolicy: Always         name: movie-plays-producer-debezium         ports:           - containerPort: 8080             name: http             protocol: TCP
复制代码


在本例中,配置参数是硬编码在 application.properties 中的,但你可以将它们作为环境变量传递进去。要在 Kubernetes Deployment 对象中设置环境变量,比如覆盖 Kafka 的配置,可以添加下面的行:


quarkus.kubernetes.env.vars.kafka-bootstrap-servers=my-new-cluster:9092 
复制代码


生成文件的 env 部分将包含这个新的环境变量:


containers:       - env:           - name: KAFKA_BOOTSTRAP_SERVERS             value: my-new-cluster:9092         image: quay.io/lordofthejars/movie-plays-producer-debezium:1.0.0-SNAPSHOT
复制代码

全部放到一起

我们已经使用 Strimzi 在 Kubernetes 集群中部署了一个 Kafka 集群。我们将应用下面的文件(mysql-deployment.yaml)和 application.properties 中配置的参数部署 MySQL 实例。


apiVersion: v1kind: Servicemetadata: name: mysql labels:   app: mysqlspec: ports:   - port: 3306 selector:   app: mysql clusterIP: None---apiVersion: apps/v1kind: Deploymentmetadata: name: mysql labels:   app: mysqlspec: selector:   matchLabels:     app: mysql strategy:   type: Recreate template:   metadata:     labels:       app: mysql   spec:     containers:     - image: mysql:8.0.30       name: mysql       env:       - name: MYSQL_ROOT_PASSWORD         value: alex       - name: MYSQL_DATABASE         value: moviesdb       - name: MYSQL_USER         value: alex       - name: MYSQL_PASSWORD         value: alex       ports:       - containerPort: 3306         name: mysql
复制代码


将 MySQL 实例部署到 Kubernetes 集群:


kubectl apply -f mysql-deployment.yaml -n kafka
复制代码


最后要部署的是应用程序本身。我们有两个选择,第一个是直接应用资源:


kubectl apply -f target/kubernetes/kubernetes.yml -n kafka
复制代码


第二个是将 quarkus.kubernetes.deploy 标志设置为 true 来打包应用程序。当这个标志设置为 true 时,Maven 将:


  1. 创建应用程序 JAR 文件。

  2. 构建容器镜像。

  3. 将容器镜像推送到注册表中。

  4. 自动应用 kubernetes.yml 资源文件到已连接的 Kubernetes 集群。为了验证所有的东西都能正确地运行,我们将发送一个插入新电影信息的请求,并验证在 Kafka 主题中插入的新事件。


在终端窗口中执行以下命令获取访问服务的 IP 和端口。


获取访问服务的 IP:


minikube ip -p strimzi
192.168.59.104
复制代码


获取 movie-plays-producer-debezium 的公开端口,也就是第二个端口。


kubectl get services -n kafka
movie-plays-producer-debezium LoadBalancer 10.100.117.203 <pending> 80:30306/TCP 67m
复制代码


运行 curl 命令,插入一条新的电影信息记录。


curl -X 'POST' \  'http://192.168.59.104:30306/movie' \  -H 'accept: application/json' \  -H 'Content-Type: application/json' \  -d '{  "name": "Minions: The Rise of Gru",  "director": "Kyle Balda",  "genre": "Animation"}'
复制代码


检查 Quarkus 日志,查看数据库运行的 SQL 语句:


kubectl get pods -n kafka
NAME READY STATUS RESTARTS AGEmovie-plays-producer-debezium-56f644cb87-5cchk 1/1 Running 0 6m5smy-cluster-entity-operator-755596449b-cw82g 3/3 Running 0 35hmy-cluster-kafka-0 1/1 Running 0 35hmy-cluster-zookeeper-0 1/1 Running 0 35h
复制代码


打印 movie-plays-producer-debezium 的日志:


kubectl logs movie-plays-producer-debezium-6b9b65bf4-9z524 -n kafka
2022-08-11 07:44:25,658 INFO [org.acm.MovieResource] (executor-thread-1) New Movie inserted Minions: The Rise of Gru:)Hibernate: select next_val as id_val from hibernate_sequence for update
Hibernate: update hibernate_sequence set next_val= ? where next_val=?Hibernate: insert into Movie (director, genre, name, id) values (?, ?, ?, ?)Hibernate: insert into OutboxEvent (aggregatetype, aggregateid, type, timestamp, payload, tracingspancontext, id) values (?, ?, ?, ?, ?, ?, ?)
# Debezium reacts to the change2022-08-11 07:44:25,867 INFO [io.deb.con.com.BaseSourceTask] (executor-thread-0) 1 records sent during previous 00:20:44.297, last recorded offset: {transaction_id=null, ts_sec=1660203865, file=binlog.000002, pos=14795, row=1, server_id=1, event=4}Movie Created and Reacting
复制代码


你还可以使用 Kafka 容器里的 Kafka-console-consumer.sh 脚本来检查 Kafka 中的内容。进入容器并运行下面的命令:


kubectl exec -ti my-cluster-kafka-0 -n kafka /bin/bash
./bin/kafka-console-consumer.sh --topic movies --from-beginning --bootstrap-server localhost:9092{"id":1,"name":"Minions: The Rise of Gru","director":"Kyle Balda","genre":"Animation"}
复制代码


要返回本地终端窗口,请按 Ctrl+C 停止 kafka-console-consumer 进程,然后执行 exit 命令。


到目前为止,一切都很顺利。我们已经得到了与本系列文章第 3 部分中相同的应用程序,只是现在它运行在 Kubernetes 集群中。


到目前为止,我们使用的是 Debezium Embedded,但其实我们可以使用 Debezium Server。

Debezium Server


Debezium Server是一个可配置的、使用就绪的应用程序,它将事件从源数据库流到消息传递系统中,如 Kafka。它可以被注册成一个Kafka Connect组件,作为源连接器。

虽然我们不能在所有的场景中都使用 Debezium Server,但在我看来,使用这种方法有两个大的优点:


  • 你可以获得 Kafka 连接器的所有好处(容错、可扩展、可重用等)。

  • 因为它是一个外部组件,所以不需要更改应用程序代码,也不需要 Debezium Embedded 相关的代码或依赖项。因此,任何应用程序都可以在不做出修改或重新部署的情况下开始使用 Debezium。接下来,我们来看看如何从 Debezium Embedded 迁移到 Debezium Server。

移除 Debezium Embedded

首先要做的是删除 Debezium Embedded 相关的依赖项。


打开 pom.xml 文件,删除以下依赖项:


<dependency>     <groupId>io.debezium</groupId>     <artifactId>debezium-ddl-parser</artifactId></dependency><dependency>     <groupId>io.debezium</groupId>     <artifactId>debezium-embedded</artifactId></dependency><dependency>     <groupId>io.debezium</groupId>     <artifactId>debezium-connector-mysql</artifactId></dependency>
复制代码


下一步是删除所有与 Debezium Embedded 配置和监听器相关的代码。删除这些类文件——DebeziumConfiguration.java、DebeziumListener.java 和 MySqlJdbcParser.java。


因为所有与 Kafka 的交互都是通过 Kafka Connect 组件进行的,不需要 Kafka 代码,所以最后一步是从 pom.xml 中移除以下依赖项:


<dependency>     <groupId>io.quarkus</groupId>     <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId></dependency>
复制代码


application.properties 文件中的这一行不再需要:


%prod.kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092
复制代码


项目中已经没有了 Kafka 或 Debezium Embedded 依赖项。创建一个包含这些最新变更的容器镜像。


在终端窗口执行以下命令,删除之前的部署:


kubectl delete deployment movie-plays-producer-debeziumkubectl delete service movie-plays-producer-debezium
复制代码


要保留带有 Debezium Debezium 的容器镜像,请将 artifactId 更改为 movie-plays-producer-debezium-server。


然后将不带 Debezium 代码的新版本部署到 Kubernetes 集群中,如下所示:


./mvnw clean package -DskipTests -Dquarkus.kubernetes.deploy=true
复制代码


运行以下命令验证新部署的服务:


kubectl get pods -n kafkaa
NAME READY STATUS RESTARTS AGEmovie-plays-producer-debezium-server-59db564b74-vhdmf 1/1 Running 0 73m
复制代码

部署 Debezium Kafka Connect

首先,部署一个 Kafka Connect 组件与所需的 MySQL 连接器插件。你可以认为它跟我们在DebeziumListener类中实现的逻辑差不多,只是被作为一个 Kafka Connect 元素,可以在项目中重用。我们必须为 Kafka Connect 和连接器插件创建一个容器镜像,因为 Debezium 没有为各种可能的 Kafka 与数据的组合提供“官方”镜像。对于本例,我们使用 Kafka 3.2.0 的 MySQL 连接器创建一个容器镜像。


本文中 MySQL 连接器的容器镜像可以在quay.io/lordofthejars/debezium-connector-mysql:1.9.4找到,如果你对它的构建过程感到好奇,可以查看位于这个GitHub存储库中的 Dockerfile 文件。


为了部署 Debezium Kafka Connect,我们将使用 Strimzi 提供的KafkaConnect,因为它简化了整个过程。在这个 Kubernetes 资源文件中,我们指定了 Kafka 版本、Kafka 集群的位置(my-cluster-kafka-bootstrap:9092)、容器镜像(quay.io/lordofthejars/debezin-connector-mysql:1.9.4),以及一些特定的配置参数。


创建一个名为 debezium-kafka-connect.yaml 的文件,内容如下:


apiVersion: kafka.strimzi.io/v1beta2kind: KafkaConnectmetadata: name: debezium-connect-cluster annotations:   strimzi.io/use-connector-resources: "true"spec: version: 3.2.0 image: quay.io/lordofthejars/debezium-connector-mysql:1.9.4 replicas: 1 bootstrapServers: my-cluster-kafka-bootstrap:9092 config:   group.id: connect-cluster   key.converter: org.apache.kafka.connect.json.JsonConverter   value.converter: org.apache.kafka.connect.json.JsonConverter   key.converter.schemas.enable: false   value.converter.schemas.enable: false   offset.storage.topic: connect-offsets   offset.storage.replication.factor: 1   config.storage.topic: connect-configs   config.storage.replication.factor: 1   status.storage.topic: connect-status   status.storage.replication.factor: 1
复制代码


然后在终端窗口中应用这个资源:


kubectl apply -f debezium-kafka-connect.yaml -n kafka
复制代码


并通过运行以下命令验证它是否被正确部署:


kubectl get pods -n kafka
debezium-connect-cluster-connect-546c8695c-lszn7 1/1 Running 0 91m
复制代码


请记住,这个过程可能需要几分钟的准备时间。


Kafka Connect 组件现在连接到了 Kafka 集群,最后一步是通过配置让它监听 MySQL 实例的数据变更。


为此,我们将使用 Strimzi 提供的 KafkaConnector。这有点类似于我们在 DebeziumConfiguration 类中所做的那样,提供 database.hostname 或 table.include.list 之类的参数。此外,我们还要将 strimzi.io/cluster 的值设置为上一个 YAML 文件中指定的 KafkaConnect 名称(debezum-connect-cluster)。


创建一个名为 debezium-kafka-connector.yaml 的文件,内容如下:


apiVersion: kafka.strimzi.io/v1beta2kind: KafkaConnectormetadata: name: debezium-connector-mysql labels:   strimzi.io/cluster: debezium-connect-clusterspec: class: io.debezium.connector.mysql.MySqlConnector tasksMax: 1 config:   tasks.max: 1   database.hostname: mysql   database.port: 3306   database.user: root   database.password: alex   database.server.id: 184054   database.server.name: mysql   database.include.list: moviesdb   database.allowPublicKeyRetrieval: true   table.include.list: moviesdb.OutboxEvent   database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092   database.history.kafka.topic: schema-changes.movies
复制代码


通过应用资源来配置 Debezium Connector:


kubectl apply -f debezium-kafka-connector.yaml -n kafka
复制代码


为了验证一切工作正常,我们添加一条新的电影数据记录,并验证将新记录插入数据库时 Kafka 主题中会产生一个新事件。


获取新服务的端口,IP 仍然是相同的:


kubectl get services -n kafka
movie-plays-producer-debezium-server LoadBalancer 10.100.117.203 <pending> 80:30307/TCP 67m
curl -X 'POST' \ 'http://192.168.59.104:30307/movie' \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -d '{ "name": "Minions: The Rise of Gru", "director": "Kyle Balda", "genre": "Animation"}'
复制代码


使用 kafka-console-consumer.sh 脚本验证插入的数据:


kubectl exec -ti my-cluster-kafka-0 -n kafka /bin/bash
复制代码


然后在容器中运行脚本。注意,Debezium 连接器将事件发送到一个 Kafka 主题,名称是这样的<database.server.name>.<database.inlude.list>.<table>,在这个示例中是 mysql.moviesdb.OutboxEvent。


./bin/kafka-console-consumer.sh --topic mysql.moviesdb.OutboxEvent --from-beginning --bootstrap-server localhost:9092
{"before":null,"after":{"id":"Yxk0o5WwTvi0+nwBr2Y36wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA","aggregatetype":"Movie","aggregateid":"5","type":"MovieCreated","timestamp":1660253420864918,"payload":"{\"id\":5,\"name\":\"Minions: The Rise of Gru\",\"director\":\"Kyle Balda\",\"genre\":\"Animation\"}","tracingspancontext":null},"source":{"version":"1.9.4.Final","connector":"mysql","name":"mysql","ts_ms":1660253420000,"snapshot":"false","db":"moviesdb","sequence":null,"table":"OutboxEvent","server_id":1,"gtid":null,"file":"binlog.000002","pos":8788,"row":0,"thread":41,"query":null},"op":"c","ts_ms":1660253420878,"transaction":null}
复制代码


before 字段是空的,因为是插入操作,所以没有前值,但是在 after 字段中有电影信息数据。

结论


我们已经将应用程序从本地迁移到了 Kubernetes 集群中。


Strimzi 为我们提供了在 Kubernetes 中部署和管理 Apache Kafka 集群的一个关键元素。我们可以使用 Kubernetes 资源文件安装和管理集群,采用 GitOps 的方式来管理 Kafka。


Debezium Embedded 适用于一些场景,比如在检测数据变更时使用的临时逻辑。不过,在其他项目中(特别是在遗留项目或需要高可伸缩性和容错性的项目),Debezium Server 可能更合适。


有了 Strimzi、Jib 和 Kubernetes Quarkus 扩展,从本地转移到 Kubernetes 集群应该并不难。


本文的源代码可以在GitHub上找到。

作者简介


Alex Soto 是 Red Hat 的开发者体验总监。他对 Java 和软件自动化领域充满热情,并相信开源软件模型。Soto 是《Testing Java Microservices》(Manning 出版)和《Quarkus Cookbook》(O'Reilly 出版)的合著者,也是多个开源项目的贡献者。自 2017 年以来,他获得 Java Champion 的称号,也是 Salle URL 大学的国际演讲师和教师。你可以在 Twitter 上关注他(Alex Soto),继续关注 Kubernetes 和 Java 世界的动态。


原文链接

https://www.infoq.com/articles/strimzi-the-gitops-way/


相关阅读:

本系列第一部分:使用 Apache Kafka 实现 Quarkus 的反应式消息

本系列第二部分:Kafka Streams 与 Quarkus:实时处理事件

本系列第三部分:Debezium 和 Quarkus:通过 CDC 模式来避免双重写入

2022-10-12 08:009319

评论

发布
暂无评论
发现更多内容

架构师训练营第 9 周作业

netspecial

极客大学架构师训练营

Java 中常见的细粒度锁实现

rookiedev

Java 多线程 细粒度锁

Week5 作业1

Sean Chen

Week 9 作业01

Croesus

助推城市智慧化!正舵者携手中科院演绎区块链魅力

CECBC

区块链 人工智能

第9周作业2

Yangjing

极客大学架构师训练营

JVM垃圾回收原理,秒杀系统架构方案

garlic

极客大学架构师训练营

【喜讯】Apache DolphinScheduler 荣获 “2020 年度十大开源新锐项目”

代立冬

Apache 大数据 开源 DolphinScheduler Apache DolphinScheduler

秒杀系统

橘子皮嚼着不脆

第九周作业

极客大学架构师训练营

大数据和Hadoop平台介绍

MySQL从删库到跑路

大数据 hadoop

Netty源码解析 -- 对象池Recycler实现原理

binecy

Netty 对象存储 高性能

架构师训练营 week5 学习总结

花果山

极客大学架构师训练营

架构师训练营 week5 课后作业

花果山

极客大学架构师训练营

架构师训练营第二期 Week 5 总结

bigxiang

极客大学架构师训练营

应届秋招生,熬夜吃透华为架构师这份‘典藏级’计算机网络+计算机操作系统,成功上岸腾讯

网络协议 编程之路 计算机知识

InfoQ 写作平台的魔力

Yolanda

二分法求平方根,swift面向协议编程protocol从入门到精通、《格局》吴军著读后感、John 易筋 ARTS 打卡 Week 27

John(易筋)

collection ARTS 打卡计划 格局 吴军 李嘉图定律 面向协议protocol编程

架构师训练营第九周课程笔记及心得

Airs

能源区块链研究|区块链与核电安全

CECBC

区块链 核电

第五周作业

晴空万里

极客大学架构师训练营

架构师训练营第二期 Week 5 作业

bigxiang

极客大学架构师训练营

真零基础Python开发web

MySQL从删库到跑路

Python django Web bottle

第9周作业1

Yangjing

极客大学架构师训练营

5G+工业互联网的中国登山队,如何攀跃“产业化”山峦?

脑极体

技术选型总结一

Mars

技术选型

架构师训练营第 9 周课后练习

叶纪想

极客大学架构师训练营

架构师训练营 1 期第 9 周:性能优化(三)- 总结

piercebn

极客大学架构师训练营

架构师 01 期,第九周课后作业

子文

【架构师训练营】第九周作业:性能优化

MindController

秒杀系统

【架构师训练营第 1 期 09 周】 学习总结

Bear

极客大学架构师训练营

使用Strimzi将Kafka和Debezium迁移到Kubernetes_架构_Alex Soto_InfoQ精选文章