写点什么

使用 Strimzi 提高 Kafka 集群的安全性

  • 2023-03-15
    北京
  • 本文字数:18358 字

    阅读完需:约 60 分钟

使用Strimzi提高Kafka集群的安全性

在本系列的第3部分,我们学习了双写问题以及如何使用变更数据捕获模式解决这些问题,特别是使用Debezium读取数据库中所做的变更(通过事务日志)并将它们填充到 Kafka 主题中。



在本系列的第4部分,我们将示例又向前推进了一步,将应用程序从本地开发环境部署到 Kubernetes(生产环境)中。我们使用Strimzi来部署和配置 Kafka 和 Debezium。



但总的来说,我们忽略了一个重要的东西——当时我们没有把它简化,但它却非常重要——安全性问题。



  • 如何在不直接将用户名/密码硬编码在部署文件中的情况下保护 MySQL 实例。

  • 如何使用 Strimzi 在 Kafka 集群中添加 authn。

  • 如何配置 Debezium,以便对 Kafka 和 MySQL 实例进行安全身份验证。在本文中,我们将通过保护在上一篇文章中开发的应用程序(使用 Debezium Server 方法)来回答所有这些问题。

Kubernetes

我们需要一个安装了 Strimzi 的 Kubernetes 集群。我们在本系列的第 4 部分中对此进行了介绍,如果你要重用它,需要删除应用程序、MySQL 数据库、Kafka 集群和 Debezium 实例。


重要提示:如果第 4 部分中使用的集群还在,需要执行下面的步骤。如果集群已经被删除,请从介绍如何删除集群的部分之后继续阅读。


在终端窗口执行如下命令来删除它们:


kubectl delete deployment movie-plays-producer-debezium-server -n kafkakubectl delete service movie-plays-producer-debezium-server -n kafkakubectl delete -f mysql-deployment.yaml -n kafkakubectl delete -f debezium-kafka-connector.yaml -n kafkakubectl delete -f debezium-kafka-connect.yaml -n kafkakubectl delete -f kafka.yaml -n kafka
复制代码


重要提示:如果你还没有 Kuberntes 集群,则只需要执行下面的步骤。


如果集群已经被销毁,请按照指示创建一个新的集群。在终端窗口中运行以下命令:


minikube start -p strimzi --kubernetes-version='v1.22.12' --vm-driver='virtualbox' --memory=12096 --cpus=3
kubectl create namespace kafka
kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
复制代码


执行下面的命令验证 Operator 是否安装正确:


kubectl get pods -n kafka
NAME READY STATUS RESTARTS AGEstrimzi-cluster-operator-597d67c7d6-ms987 1/1 Running 0 4m27s
复制代码


等待 Operator 运行并准备就绪。


此时,我们可以开始使用身份验证和授权(而不是匿名访问)来安装所有组件。

MySQL

在前一篇文章中,我们部署了 MySQL 实例,将用户名/密码作为环境变量硬编码在部署文件中:


env:    - name: MYSQL_ROOT_PASSWORD      value: alex    - name: MYSQL_DATABASE      value: moviesdb    - name: MYSQL_USER      value: alex    - name: MYSQL_PASSWORD      value: alex
复制代码


我们创建一个 Kubernetes Secret 来存储这些敏感数据。Kubernetes 密钥文件中的数据必须采用 base64 格式编码。alex 的 base64 编码为 YWxleA==。


要生成这个值,执行下面的命令:


echo -n 'alex' | base64YWxleA==
复制代码


在 mysql-secret.yaml 文件中填入编码的密钥:


apiVersion: v1kind: Secretmetadata: name: mysqlsecrettype: Opaquedata: mysqlrootpassword: YWxleA== mysqluser: YWxleA== mysqlpassword: YWxleA==
复制代码


将其应用到集群:


kubectl apply -f mysql-secret.yaml -n kafka
复制代码


然后更新 MySQL 部署文件,使用 value 中的 secretKeyRef 字段读取在上一步中创建的密钥:


apiVersion: v1kind: Servicemetadata: name: mysql labels:   app: mysqlspec: ports:   - port: 3306 selector:   app: mysql clusterIP: None---apiVersion: apps/v1kind: Deploymentmetadata: name: mysql labels:   app: mysqlspec: selector:   matchLabels:     app: mysql strategy:   type: Recreate template:   metadata:     labels:       app: mysql   spec:     containers:     - image: mysql:8.0.30       name: mysql       env:       - name: MYSQL_ROOT_PASSWORD         valueFrom:           secretKeyRef:             key: mysqlrootpassword             name: mysqlsecret       - name: MYSQL_DATABASE         value: moviesdb       - name: MYSQL_USER         valueFrom:           secretKeyRef:             key: mysqluser             name: mysqlsecret       - name: MYSQL_PASSWORD         valueFrom:           secretKeyRef:             key: mysqlpassword             name: mysqlsecret       ports:       - containerPort: 3306         name: mysql
复制代码


在 secretKeyRef 中,我们指定了密钥名称。在本例中,我们在 mysql-secret.yaml 文件中指定的是 mysqlsecret。


将 MySQL 实例部署到 Kubernetes 集群:


kubectl apply -f mysql-deployment.yaml -n kafka
复制代码


我们可以通过导出环境变量来验证注入的密钥是否正确。首先,我们获取 Pod 的名称:


kubectl get pods -n kafkaNAME                                        READY   STATUS    RESTARTS   AGEmysql-7888f99967-4cj47                      1/1     Running   0          90s
复制代码


然后在终端窗口中运行下面的命令:


kubectl exec -n kafka -ti mysql-7888f99967-4cj47 /bin/bash
bash-4.4# exportdeclare -x GOSU_VERSION="1.14"declare -x HOME="/root"declare -x HOSTNAME="mysql-7888f99967-4cj47"declare -x KUBERNETES_PORT="tcp://10.96.0.1:443"declare -x KUBERNETES_PORT_443_TCP="tcp://10.96.0.1:443"declare -x KUBERNETES_PORT_443_TCP_ADDR="10.96.0.1"declare -x KUBERNETES_PORT_443_TCP_PORT="443"declare -x KUBERNETES_PORT_443_TCP_PROTO="tcp"declare -x KUBERNETES_SERVICE_HOST="10.96.0.1"declare -x KUBERNETES_SERVICE_PORT="443"declare -x KUBERNETES_SERVICE_PORT_HTTPS="443"declare -x MYSQL_DATABASE="moviesdb"declare -x MYSQL_MAJOR="8.0"declare -x MYSQL_PASSWORD="alex"declare -x MYSQL_ROOT_PASSWORD="alex"declare -x MYSQL_SHELL_VERSION="8.0.30-1.el8"declare -x MYSQL_USER="alex"declare -x MYSQL_VERSION="8.0.30-1.el8"declare -x OLDPWDdeclare -x PATH="/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"declare -x PWD="/"declare -x SHLVL="1"declare -x TERM="xterm"
复制代码


现在可以退出容器:


exit
复制代码


现在,MySQL 数据库的凭证使用的是 Kubernetes Secret 配置,这比在部署文件中硬编码要好得多。应用程序也需要修改,因为它现在需要从 Secret 读取凭证,而不是读取配置文件中的静态凭证。

Move Play Producer Debezium

数据库用户名和密码硬编码在 application.properties 文件中,如果应用程序能够在部署到 Kubernetes 时自动配置用户名和密码,那就更好了。


一种方法是将密钥作为环境变量注入到应用程序 Pod 中,就像部署 MySQL 那样。例如,对于密码,部署文件的 env 部分可能是这样的:


- name: MYSQL_PASSWORD  valueFrom:    secretKeyRef:      key: mysqlpassword      name: mysqlsecret
复制代码


现在更新 application.properties 文件,从环境变量中获取密码:


%prod.quarkus.datasource.password=${mysql-password}
复制代码


虽然这样做可以奏效,但将密钥作为环境变量并不是最安全的做法,因为任何一个可以列出环境变量的人都可以很容易地窃取它们。


虽然这样做可以奏效,但将密钥作为环境变量并不是最安全的做法,因为任何一个可以列出环境变量的人都可以很容易地窃取它们。


Quarkus 有一个kubernetes-config扩展,应用程序可以用它直接从 Kubernetes API 服务器读取 Kubernetes ConfigMaps 和 Secrets。通过这种方式,密钥可以安全地从 Kubernetes 集群传到应用程序中,而不需要任何中间步骤,如将它们作为环境变量传入或作为卷挂载。

Kubernetes 配置扩展

首先要做的是注册 kubernetes-config 扩展。打开 pom.xml 文件,并添加以下依赖项:


<dependency>  <groupId>io.quarkus</groupId>  <artifactId>quarkus-kubernetes-config</artifactId></dependency>
复制代码


然后,让应用程序直接从 Kubernetes API 读取 Kubernetes Secrets(在我们的例子中,Secret 的名字是 mysqlsecret)。


打开 src/main/resources/application.properties,加入下面的内容:


%prod.quarkus.kubernetes-config.secrets.enabled=true                           quarkus.kubernetes-config.secrets=mysqlsecret
复制代码


然后更新 quarku.datasource.username 和 quarku.datasource.password 属性,读取 mysqlsecret Secret 中的 mysqluser 和 mysqlpassword。


在 application. properties 文件中更新这些属性:


%prod.quarkus.datasource.username=${mysqluser}%prod.quarkus.datasource.password=${mysqlpassword}
复制代码


这两个属性分别使用 mysqlsecret Secret 中的值进行了赋值。


由于读取 Kubernetes Secrets 需要与 Kubernetes API Server 发生交互,因此,当集群启用了 RBAC(基于角色的访问控制)时,用于运行应用程序的 ServiceAccount 必须具有适当的访问权限。


这两个属性分别使用 mysqlsecret Secret 中的值进行了赋值。


由于读取 Kubernetes Secrets 需要与 Kubernetes API Server 发生交互,因此,当集群启用了 RBAC(基于角色的访问控制)时,用于运行应用程序的 ServiceAccount 必须具有适当的访问权限。


因为我们在前一篇文章中注册了Kubernetes扩展,所以自动生成了所有必要的 Kubernetes 资源,所以现在不需要做任何事情。


现在在终端窗口运行下面的命令部署应用程序:


./mvnw clean package -DskipTests -Dquarkus.kubernetes.deploy=true
[INFO] [io.quarkus.kubernetes.deployment.KubernetesDeployer] Deploying to kubernetes server: https://192.168.59.104:8443/ in namespace: kafka.[INFO] [io.quarkus.kubernetes.deployment.KubernetesDeployer] Applied: Service movie-plays-producer-debezium-server.[INFO] [io.quarkus.kubernetes.deployment.KubernetesDeployer] Applied: Deployment movie-plays-producer-debezium-server.[INFO] [io.quarkus.deployment.QuarkusAugmentor] Quarkus augmentation completed in 9537ms
复制代码


为了验证部署是否正确,我们检查 Pod 的日志,确保没有出现错误,并且 SQL 语句执行是正确的:


kubectl get pods -n kafka
NAME READY STATUS RESTARTS AGEmovie-plays-producer-debezium-server-auth-7cc69fb56c-nc8tx 1/1 Running 0 44s


kubectl logs movie-plays-producer-debezium-server-auth-7cc69fb56c-nc8tx -n kafka
__ ____ __ _____ ___ __ ____ ______ --/ __ \/ / / / _ | / _ \/ //_/ / / / __/ -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \--\___\_\____/_/ |_/_/|_/_/|_|\____/___/2022-08-21 21:00:41,277 INFO [io.deb.out.qua.int.AdditionalJaxbMappingProducerImpl] (main) Contributed XML mapping for entity: io.debezium.outbox.quarkus.internal.OutboxEvent
Hibernate:
create table Movie ( id bigint not null, director varchar(255), genre varchar(255), name varchar(255), primary key (id) ) engine=InnoDBHibernate:
create table OutboxEvent ( id binary(255) not null, aggregatetype varchar(255) not null, aggregateid varchar(255) not null, type varchar(255) not null, timestamp datetime(6) not null, payload varchar(8000), tracingspancontext varchar(256), primary key (id) ) engine=InnoDB
复制代码


在下图中可以看到我们做了安全性保护的部分。



现在,应用程序正在运行中,MySQL 凭证也被保护起来了,下面我们继续为 Kafka 和 Debezium 提供保护。

Kafka

到目前为止,我们已经部署了一个开放的 Kafka 集群,没有启用身份验证或授权逻辑。


Strimzi 支持使用以下认证机制来部署 Kafka 集群:


  • SASL SCRAM-SHA-512;

  • TLS 客户端认证;

  • OAuth 2.0 基于令牌的身份验证。由于 Strimzi Operator 已经安装在 Kubernetes 集群中了,所以我们可以使用 Kafka 自定义资源。Kafka 资源配置了集群部署,并启用了 TLS 客户端身份验证。


Strimzi 可以在 listeners 中设置监听器,使用 mTLS 作为通信协议(tls=true)和认证方法类型(authentication 字段)。


创建一个叫作 kafka.yaml 的新文件,使用下面的内容来配置一个安全的 Kafka:


apiVersion: kafka.strimzi.io/v1beta2kind: Kafkametadata: name: my-cluster namespace: kafkaspec: kafka:   version: 3.2.0   replicas: 1   listeners:     - name: demo       port: 9092       type: internal       tls: false     - name: secure       port: 9093       type: internal       tls: true       authentication:         type: tls   authorization:     type: simple   config:     offsets.topic.replication.factor: 1     transaction.state.log.replication.factor: 1     transaction.state.log.min.isr: 1     default.replication.factor: 1     min.insync.replicas: 1     inter.broker.protocol.version: "3.2"   storage:     type: ephemeral zookeeper:   replicas: 1   storage:     type: ephemeral entityOperator:   topicOperator: {}   userOperator: {}
复制代码


将其应用到 Kubernetes 集群:


kubectl apply -f kafka.yaml -n kafka
kafka.kafka.strimzi.io/my-cluster created
复制代码


现在我们来验证 Kafka 集群已启动并在运行当中:


kubectl get pods -n kafka
NAME READY STATUS RESTARTS AGEmy-cluster-entity-operator-d4db5ff58-rt96n 3/3 Running 0 2m26smy-cluster-kafka-0 1/1 Running 0 2m58smy-cluster-zookeeper-0 1/1 Running 0 3m31s
复制代码


由于我们将监听器设置为使用 TLS,所以 Strimzi 已经自动创建了一个 Kubernetes Secret,其中包含集群证书、pkcs12 信任存储和相关的密码。


kubectl get secrets -n kafka
my-cluster-clients-ca Opaque 1 9m14smy-cluster-clients-ca-cert Opaque 3 9m14smy-cluster-cluster-ca Opaque 1 9m14smy-cluster-cluster-ca-cert Opaque 3 9m14smy-cluster-cluster-operator-certs Opaque 4 9m14smy-cluster-entity-operator-dockercfg-5wwb5 kubernetes.io/dockercfg 1 8m9smy-cluster-entity-operator-token-h9xkq kubernetes.io/service-account-token 4 8m9smy-cluster-entity-operator-token-npvfc kubernetes.io/service-account-token 4 8m9smy-cluster-entity-topic-operator-certs Opaque 4 8m9smy-cluster-entity-user-operator-certs Opaque 4 8m8smy-cluster-kafka-brokers Opaque 4 8m41smy-cluster-kafka-dockercfg-fgpx2 kubernetes.io/dockercfg 1 8m41smy-cluster-kafka-token-2x7s8 kubernetes.io/service-account-token 4 8m41smy-cluster-kafka-token-6qdgk kubernetes.io/service-account-token 4 8m41smy-cluster-zookeeper-dockercfg-p296g kubernetes.io/dockercfg 1 9m13smy-cluster-zookeeper-nodes Opaque 4 9m13smy-cluster-zookeeper-token-dp9sc kubernetes.io/service-account-token 4 9m13smy-cluster-zookeeper-token-gbrxg kubernetes.io/service-account-token 4 9m13s
复制代码


这里最为重要的是<clustername>-cluster-ca-cert(在本例中是 my-cluster-cluster-ca-cert)这个密钥。


在终端窗口中运行下面的命令列出密钥的内容:


kubectl get secret my-cluster-cluster-ca-cert -o yaml -n kafka
apiVersion: v1data: ca.crt: LS0tLS1CRUdJTiBDRVJU ca.p12: MIIGkwIBAzCCBk== ca.password: azJjY2tIMEs1c091kind: Secretmetadata: annotations: strimzi.io/ca-cert-generation: "0" creationTimestamp: "2022-08-21T19:32:55Z" labels: app.kubernetes.io/instance: my-cluster app.kubernetes.io/managed-by: strimzi-cluster-operator app.kubernetes.io/name: strimzi app.kubernetes.io/part-of: strimzi-my-cluster strimzi.io/cluster: my-cluster strimzi.io/kind: Kafka strimzi.io/name: strimzi name: my-cluster-cluster-ca-cert namespace: kafka ownerReferences: - apiVersion: kafka.strimzi.io/v1beta2 blockOwnerDeletion: false controller: false kind: Kafka name: my-cluster uid: 23c84dfb-bb33-47ed-bd41-b4e87e0a4c3a resourceVersion: "49424" uid: 6c2679a8-216f-421b-880a-de0e6a0879fatype: Opaque
复制代码


我们来创建一个 mTLS 授权的用户。

安全和 Debezium

Kafka 已经被保护起来了,现在我们来创建一个KafkaUser资源,将授权角色赋给使用 mTLS 模式为用户进行身份验证的组和主题。


创建一个叫作 kafka-user-connect-all-topics.yaml 的文件,包含以下内容:


apiVersion: kafka.strimzi.io/v1beta2kind: KafkaUsermetadata: name: my-connect namespace: kafka labels:   # Cluster name set previously   strimzi.io/cluster: my-clusterspec: authentication:   type: tls authorization:   type: simple   acls:   # Kafka Connects internal topics used to store configuration, offsets or status   - resource:       type: group       name: outbox-viewer     operation: Read   - resource:       type: group       name: outbox-viewer     operation: Describe   - resource:       type: group       name: mysql-dbhistory     operation: Read   - resource:       type: group       name: mysql-dbhistory     operation: Describe   - resource:       type: group       name: connect-cluster     operation: Read   - resource:       type: group       name: connect-cluster     operation: Describe   - resource:       type: topic       name: connect-cluster-configs     operation: Read   - resource:       type: topic       name: connect-cluster-configs     operation: Describe   - resource:       type: topic       name: connect-cluster-configs     operation: Write   - resource:       type: topic       name: connect-cluster-configs     operation: Create   - resource:       type: topic       name: connect-cluster-status     operation: Read   - resource:       type: topic       name: connect-cluster-status     operation: Describe   - resource:       type: topic       name: connect-cluster-status     operation: Write   - resource:       type: topic       name: connect-cluster-status     operation: Create   - resource:       type: topic       name: connect-cluster-offsets     operation: Read   - resource:       type: topic       name: connect-cluster-offsets     operation: Write   - resource:       type: topic       name: connect-cluster-offsets     operation: Describe   - resource:       type: topic       name: connect-cluster-offsets     operation: Create   - resource:       type: group       name: connect-cluster     operation: Read   # Debezium topics   - resource:       type: topic       name: "*"     operation: Read   - resource:       type: topic       name: "*"     operation: Describe   - resource:       type: topic       name: "*"     operation: Write   - resource:       type: topic       name: "*"     operation: Create
复制代码


在终端窗口中应用这个资源:


kubectl apply -f kafka-user-connect-all-topics.yaml -n kafkakafkauser.kafka.strimzi.io/my-connect created
复制代码


在注册了这个 Kafka 用户后,Strimzi 创建了一个与 KafkaUser 资源(my-connect)同名的新密钥,并使用 pkcs12 密钥存储库保存客户端的私钥和访问它的密码。


kubectl get secret my-connect -n kafka -o yaml
apiVersion: v1data: ca.crt: LS0tLS1CK user.crt: LS0tLS1CRUdJTiB== user.key: LS0tLS1CRUdJTiBQUklWQVRK user.p12: MIILNAIBAzCAA== user.password: UUR4Nk5NemsxUVFFkind: Secretmetadata: creationTimestamp: "2022-08-21T20:12:44Z" labels: app.kubernetes.io/instance: my-connect app.kubernetes.io/managed-by: strimzi-user-operator app.kubernetes.io/name: strimzi-user-operator app.kubernetes.io/part-of: strimzi-my-connect strimzi.io/cluster: my-cluster strimzi.io/kind: KafkaUser name: my-connect namespace: kafka ownerReferences: - apiVersion: kafka.strimzi.io/v1beta2 blockOwnerDeletion: false controller: false kind: KafkaUser name: my-connect uid: 882447cc-7759-4884-9d2f-f57f8be92711 resourceVersion: "60439" uid: 9313676f-3417-42d8-b3fb-a1b1fe1b3a39type: Opaque
复制代码


现在,我们有了一个新的 Kafka 用户,拥有访问 Kafka 主题所需的权限。


在部署 Debezium Kafka Connector 之前,我们需要允许 Kafka Connector 对象使用 Kubernetes API 直接从 mysqlsecret Secret 对象中读取 MySQL 密钥(就像我们在应用程序中所做的那样),这样 Connector 就可以通过数据库身份验证并读取事务日志。


创建 kafka-role-binding.yaml 文件,内容如下:


apiVersion: rbac.authorization.k8s.io/v1kind: Rolemetadata: name: connector-configuration-role namespace: kafkarules:- apiGroups: [""] resources: ["secrets"] resourceNames: ["mysqlsecret", "my-connect", "my-cluster-cluster-ca-cert"] verbs: ["get"]---apiVersion: rbac.authorization.k8s.io/v1kind: RoleBindingmetadata: name: connector-configuration-role-binding namespace: kafkasubjects:- kind: ServiceAccount name: debezium-connect-cluster-connect namespace: kafkaroleRef: kind: Role name: connector-configuration-role apiGroup: rbac.authorization.k8s.io
复制代码


注意,subjects 下面的 name 是运行 Debezium Kafka Connect Pod 所需的服务帐户。我们还没有部署 Pod,不过在部署 KafkaConnect 组件时,创建的服务帐户需要遵循 $KafkaConnectName-connect 的格式。由于 Debezium Kafka Connect 的名称是 debezium-connect-cluster-connect,因此创建的服务帐户就是 my-connect-connect,并且我们授予这个帐户直接读取 Kubernetes Secrets 的权限。


在部署 Debezium Kafka Connect 之前应用 kafka-role-binding.yaml 文件:


kubectl apply -f kafka-role-binding.yaml -n kafka
role.rbac.authorization.k8s.io/connector-configuration-role createdrolebinding.rbac.authorization.k8s.io/connector-configuration-role-binding created
复制代码


下图总结了目前的安全通信:



为了部署 Debezium Kafka Connect,我们需要再次使用 Strimzi 提供的KafkaConnect对象,但需要做一些修改,以便通过 Kafka 集群的身份验证,并允许从 Kubernetes Secrets 读取配置参数(主要目的是读取 MySQL 凭证进行身份验证)。


配置如下字段:


  • 端口现在是 9093。

  • 设置用于与集群通信的 mTLS 证书(tls 字段)。

  • 设置证书和密钥用户(authentication 字段),以便进行身份验证。

  • 设置 config.providers,让 MySQL Connector 从 Kubernetes Secrets 读取配置。

  • externalConfiguration 用于将信任存储库和密钥存储库物化到文件中。它们被物化在/opt/kafka/external-configuration/目录下。MySQL Connector 会访问这些文件。创建 kafka-connect.yaml 文件,内容如下所示:


apiVersion: kafka.strimzi.io/v1beta2kind: KafkaConnectmetadata: name: debezium-connect-cluster namespace: kafka annotations:   strimzi.io/use-connector-resources: "true"spec: version: 3.2.0 image: quay.io/lordofthejars/debezium-connector-mysql:1.9.4 replicas: 1 bootstrapServers: my-cluster-kafka-bootstrap:9093 logging:   type: inline   loggers:     connect.root.logger.level: "INFO" tls:   trustedCertificates:     - secretName: my-cluster-cluster-ca-cert       certificate: ca.crt authentication:   type: tls   certificateAndKey:     secretName: my-connect     certificate: user.crt     key: user.key config:   config.providers: secrets   config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider   group.id: connect-cluster   offset.storage.topic: connect-cluster-offsets   offset.storage.replication.factor: 1   config.storage.topic: connect-cluster-configs   config.storage.replication.factor: 1   status.storage.topic: connect-cluster-status   status.storage.replication.factor: 1 externalConfiguration:   volumes:     - name: cluster-ca       secret:         secretName: my-cluster-cluster-ca-cert     - name: my-user       secret:         secretName: my-connect
复制代码


trustedCertificates 设置为使用 Kafka 对象部署 Kafka 集群时创建的密钥。


authentication 下面的 certificateAndKey 设置为注册 KafkaUser 时创建的密钥。


部署资源并验证其正确性:


kubectl apply -f kafka-connect.yaml -n kafkakafkaconnect.kafka.strimzi.io/debezium-connect-cluster created
复制代码


创建一个叫作 debezium-kafka-connector.yaml 的新文件,用于配置 Debezium,允许 MySQL Connector 访问 MySQL 实例的事务日志。在本例中,我们在连接器配置中没有使用明文的用户名和密码,而是引用前面用 MySQL 凭证创建的 Secret 对象。Secret 的访问格式为 secrets:<namespace>/<secretname>:<key>。此外,在应用了 KafkaConnect 定义后,它会读取物化的信任存储库和密钥库。


apiVersion: kafka.strimzi.io/v1beta2kind: KafkaConnectormetadata: name: debezium-connector-mysql namespace: kafka labels:   strimzi.io/cluster: debezium-connect-clusterspec: class: io.debezium.connector.mysql.MySqlConnector tasksMax: 1 config:   group.id: connect-cluster   tasks.max: 1   database.hostname: mysql   database.port: 3306   database.user: root   database.password: ${secrets:kafka/mysqlsecret:mysqlpassword}   database.server.id: 184054   database.server.name: mysql   database.include.list: moviesdb   database.allowPublicKeyRetrieval: true   table.include.list: moviesdb.OutboxEvent   database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9093   database.history.kafka.topic: schema-changes.movies   database.history.producer.security.protocol: SSL   database.history.producer.ssl.keystore.type: PKCS12   database.history.producer.ssl.keystore.location: /opt/kafka/external-configuration/my-user/user.p12   database.history.producer.ssl.keystore.password: ${secrets:kafka/my-connect:user.password}   database.history.producer.ssl.truststore.type: PKCS12   database.history.producer.ssl.truststore.location: /opt/kafka/external-configuration/cluster-ca/ca.p12   database.history.producer.ssl.truststore.password: ${secrets:kafka/my-cluster-cluster-ca-cert:ca.password}
database.history.consumer.security.protocol: SSL database.history.consumer.ssl.keystore.type: PKCS12 database.history.consumer.ssl.keystore.location: /opt/kafka/external-configuration/my-user/user.p12 database.history.consumer.ssl.keystore.password: ${secrets:kafka/my-connect:user.password} database.history.consumer.ssl.truststore.type: PKCS12 database.history.consumer.ssl.truststore.location: /opt/kafka/external-configuration/cluster-ca/ca.p12 database.history.consumer.ssl.truststore.password: ${secrets:kafka/my-cluster-cluster-ca-cert:ca.password}
复制代码


在终端窗口中执行下面的命令应用这个文件注册 MySQL Connector:


kubectl apply -f kafka-connector.yaml -n kafkakafkaconnector.kafka.strimzi.io/debezium-connector-mysql created
复制代码


最后,所有的通信通道都被保护起来了。

演示

现在,我们有了一个与上一篇文章中介绍的同样的示例,但现在它更安全了。


我们用一个叫作 outbox-viewer 的 Quarkus 应用程序来测试它,它将 OutboxEvent 主题的所有内容打印到控制台。部署下面的 YAML 文件:


---apiVersion: v1kind: ServiceAccountmetadata: annotations:   app.quarkus.io/commit-id: ebe139afdc9f7f956725af5c5a92cf3c03486bca   app.quarkus.io/build-timestamp: 2022-08-23 - 11:14:36 +0000 labels:   app.kubernetes.io/name: outbox-viewer   app.kubernetes.io/version: 1.0.0-SNAPSHOT name: outbox-viewer namespace: kafka---apiVersion: v1kind: Servicemetadata: annotations:   app.quarkus.io/commit-id: ebe139afdc9f7f956725af5c5a92cf3c03486bca   app.quarkus.io/build-timestamp: 2022-08-23 - 11:14:36 +0000 labels:   app.kubernetes.io/name: outbox-viewer   app.kubernetes.io/version: 1.0.0-SNAPSHOT name: outbox-viewer namespace: kafkaspec: ports:   - name: http     port: 80     targetPort: 8080 selector:   app.kubernetes.io/name: outbox-viewer   app.kubernetes.io/version: 1.0.0-SNAPSHOT type: ClusterIP---apiVersion: rbac.authorization.k8s.io/v1kind: Rolemetadata: name: view-secrets namespace: kafkarules: - apiGroups:     - ""   resources:     - secrets   verbs:     - get---apiVersion: rbac.authorization.k8s.io/v1kind: RoleBindingmetadata: name: outbox-viewer-view namespace: kafkaroleRef: kind: ClusterRole apiGroup: rbac.authorization.k8s.io name: viewsubjects: - kind: ServiceAccount   name: outbox-viewer   namespace: kafka---apiVersion: rbac.authorization.k8s.io/v1kind: RoleBindingmetadata: name: outbox-viewer-view-secrets namespace: kafkaroleRef: kind: Role apiGroup: rbac.authorization.k8s.io name: view-secretssubjects: - kind: ServiceAccount   name: outbox-viewer   namespace: kafka---apiVersion: apps/v1kind: Deploymentmetadata: annotations:   app.quarkus.io/commit-id: ebe139afdc9f7f956725af5c5a92cf3c03486bca   app.quarkus.io/build-timestamp: 2022-08-23 - 11:14:36 +0000 labels:   app.kubernetes.io/name: outbox-viewer   app.kubernetes.io/version: 1.0.0-SNAPSHOT name: outbox-viewer namespace: kafkaspec: replicas: 1 selector:   matchLabels:     app.kubernetes.io/name: outbox-viewer     app.kubernetes.io/version: 1.0.0-SNAPSHOT template:   metadata:     annotations:       app.quarkus.io/commit-id: ebe139afdc9f7f956725af5c5a92cf3c03486bca       app.quarkus.io/build-timestamp: 2022-08-23 - 11:14:36 +0000     labels:       app.kubernetes.io/name: outbox-viewer       app.kubernetes.io/version: 1.0.0-SNAPSHOT     namespace: kafka   spec:     containers:       - env:           - name: KUBERNETES_NAMESPACE             valueFrom:               fieldRef:                 fieldPath: metadata.namespace         image: quay.io/lordofthejars/outbox-viewer:1.0.0-SNAPSHOT         imagePullPolicy: Always         name: outbox-viewer         ports:           - containerPort: 8080             name: http             protocol: TCP         volumeMounts:           - mountPath: /home/jboss/cluster             name: cluster-volume             readOnly: false           - mountPath: /home/jboss/user             name: user-volume             readOnly: false     serviceAccountName: outbox-viewer     volumes:       - name: cluster-volume         secret:           optional: false           secretName: my-cluster-cluster-ca-cert       - name: user-volume         secret:           optional: false           secretName: my-connect
复制代码


然后在终端窗口中可以看到应用程序 Pod 的日志。


kubectl logs outbox-viewer-684969f9f6-7snng -f
复制代码


将 Pod 名称替换为你的 Pod 的名称。


在终端中运行下面的命令查找 Movie Player Producer 应用程序的 IP 和端口:


minikube ip -p strimzi
192.168.59.106
复制代码


获取 movie-plays-producer-debezium 暴露的端口(第二个端口)。


kubectl get services -n kafka
movie-plays-producer-debezium LoadBalancer 10.100.117.203 <pending> 80:32460/TCP 67m
复制代码


向 Movie Play Producer 应用程序发送 curl 请求:


curl -X 'POST' \  'http://192.168.59.106:32460/movie' \  -H 'accept: application/json' \  -H 'Content-Type: application/json' \  -d '{  "name": "Minions: The Rise of Gru",  "director": "Kyle Balda",  "genre": "Animation"}'
复制代码


根据你的示例调整 IP 和端口。


最后,检查 outbox-viewer Pod 的输出,可以看到数据从数据库传输到 Kafka。


{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"bytes","optional":false,"field”,"aggregatetype":"Movie","aggregateid":"1","type":"MovieCreated","timestamp":1661339188708005,"payload":"{\"id\":1,\"name\":\"Minions: The Rise of Gru\",\"director\":\"Kyle Balda\",\"genre\":\"Animation\"}","tracingspancontext":null},"source":{"version":"1.9.4.Final","connector":"mysql","name":"mysql","ts_ms":1661339188000,"snapshot":"false","db":"moviesdb","sequence":null,"table":"OutboxEvent","server_id":1,"gtid":null,"file":"binlog.000002","pos":2967,"row":0,"thread":15,"query":null},"op":"c","ts_ms":1661339188768,"transaction":null}}
复制代码

Debezium Embedded

到目前为止,我们已经保护了应用程序和 MySQL 数据库、Debezium 服务器和 MySQL、Debezium 服务器和 Kafka 之间的通信。


你可能会想,如果使用部署在 Quarkus 应用程序中的 Debezium Embedded 而不是 Debezium Server 该怎么办?我们该如何配置 Kafka 连接使用 mTLS?


Quarkus 提供了两种连接 Kafka 的方式——Kafka客户端响应式消息客户端。我们来看一下在使用这两种方式时通过 mTLS 认证方法验证 Kafka 集群所需的属性。

KeyStore 和 TrustStore

要在客户端配置 mTLS,需要四样东西:


  • 建立 mTLS 连接所需的集群 TrustStore;

  • TrustStore 的密码;

  • 用于身份验证的 Kafka User KeyStore;

  • KeyStore 的密码。前两个元素保存在之前应用 Strimzi 资源时创建的 my-cluster-cluster-ca-cert Kubernetes Secret 中。要获取它们,在终端窗口中运行下面的命令:


kubectl get secret my-cluster-cluster-ca-cert -n kafka -o jsonpath='{.data.ca\.p12}' | base64 -d > mtls-cluster-ca.p12
复制代码


获取密码:


kubectl get secret my-cluster-cluster-ca-cert -n kafka -o jsonpath='{.data.ca\.password}' | base64 -dk2cckH0K5sOu
复制代码


后面的元素保存在 my-connect Kubernetes Secret 中。要获取它们,在终端窗口中运行下面的命令:


kubectl get secret my-connect -n kafka -o jsonpath='{.data.user\.p12}' | base64 -d > mtls-user.p12
复制代码


获取密码:


kubectl get secret my-connect -n kafka -o jsonpath='{.data.user\.password}' | base64 -dQDx6NMzk1QQE
复制代码


现在,设置 Quarkus Kafka 配置属性,使用前面的凭证进行 Kafka 集群身份认证:


%prod.kafka.ssl.truststore.location=mtls-cluster-ca.p12%prod.kafka.ssl.truststore.password=k2cckH0K5sOu%prod.kafka.ssl.truststore.type=PKCS12%prod.kafka.ssl.keystore.location=mtls-user.p12%prod.kafka.ssl.keystore.password=QDx6NMzk1QQE%prod.kafka.ssl.keystore.type=PKCS12%prod.kafka.security.protocol=SSL
%prod.mp.messaging.incoming.movies.ssl.truststore.location=mtls-cluster-ca.p12%prod.mp.messaging.incoming.movies.ssl.truststore.password=k2cckH0K5sOu%prod.mp.messaging.incoming.movies.ssl.truststore.type=PKCS12%prod.mp.messaging.incoming.movies.ssl.keystore.location=mtls-user.p12%prod.mp.messaging.incoming.movies.ssl.keystore.password=QDx6NMzk1QQE%prod.mp.messaging.incoming.movies.ssl.keystore.type=PKCS12%prod.mp.messaging.incoming.movies.security.protocol=SSL
复制代码


我们可以像使用 MySQL 凭证一样,用 Quarkus Kubernetes Config 扩展来直接注入凭证,但为了简化起见,我们没有这么做。


不过,在安全性方面,仍然有一个重要的缺失点:如何正确地在 YAML 文件中存储密钥,以及如何在 Kubernetes 集群中安全地保存密钥?

加密密钥

在本文开始时,我们使用 MySQL 凭证创建了一个 Kubernetes Secret 对象,但它是一个包含 Base64 编码的敏感信息的 YAML 文件,所以并不安全。这个 YAML 文件可能最终会保存在 Git 存储库中,任何有权访问存储库的人都可以使用这些密钥。在下一节中,我们将解决这个问题。

Sealed Secrets

Sealed Secrets是一个 Kubernetes 控制器,允许在客户端(本地机器)加密 Kubernetes Secrets 资源,并在应用后在 Kubernetes 集群内解密它们。



Sealed Secrets 需要用到两个组件,第一个是用于加密密钥的 kubeseal CLI 工具。


要安装 kubeseal,请根据你的操作系统从这个链接下载软件包。


第二个是 kubeseal Kubernetes 控制器。在命令行中执行下面的命令来安装它:


kubectl apply -f https://github.com/bitnami-labs/sealed-secrets/releases/download/v0.18.1/controller.yaml -n kube-system
role.rbac.authorization.k8s.io/sealed-secrets-service-proxier createdclusterrole.rbac.authorization.k8s.io/secrets-unsealer createddeployment.apps/sealed-secrets-controller createdcustomresourcedefinition.apiextensions.k8s.io/sealedsecrets.bitnami.com createdservice/sealed-secrets-controller createdrole.rbac.authorization.k8s.io/sealed-secrets-key-admin createdclusterrolebinding.rbac.authorization.k8s.io/sealed-secrets-controller createdserviceaccount/sealed-secrets-controller createdrolebinding.rbac.authorization.k8s.io/sealed-secrets-service-proxier createdrolebinding.rbac.authorization.k8s.io/sealed-secrets-controller created
复制代码


运行下面的命令检查控制器是否正确部署并运行:


kubectl  get pods -n kube-system
sealed-secrets-controller-554d94cb68-xr6mw 1/1 Running 0 8m46s
复制代码


在那之后,我们可以基于 mysql-secret.yaml 文件使用 kubeseal 工具自动创建一个新的 Kubernetes 资源 SealedSecret,其中数据字段是加密的。


kubeseal -n kube -o yaml <mysql-secret.yaml > mysql-secret-encrypted.yaml
复制代码


生成的新文件叫作 mysql-secret-encrypted.yaml,其中每个密钥的值都经过加密:


apiVersion: bitnami.com/v1alpha1kind: SealedSecretmetadata: creationTimestamp: null name: mysqlsecret namespace: kubespec: encryptedData:   mysqlpassword: AgBl721mnowwPlC35FfO26zP0   mysqlrootpassword: AgAKl1tWV8hahn00yGS4ucs   mysqluser: AgCWrWFl1/LcStemplate:   data: null   metadata:     creationTimestamp: null     name: mysqlsecret     namespace: kafka   type: Opaque
复制代码


现在,你可以安全地删除 mysql-secret.yaml 文件,因为我们不再需要它了。


像应用其他 Kubernetes 资源文件一样应用加密的资源,Sealed Secrets Kubernetes 控制器将解密并将其作为正常的密钥保存在 Kubernetes 中。


你可以通过下面的命令验证 Secret:


kubectl  get secret mysqlsecret -n kafka -o yaml
apiVersion: v1data: mysqlpassword: YWxleA== mysqlrootpassword: YWxleA== mysqluser: YWxleA==kind: Secretmetadata: creationTimestamp: "2022-08-21T19:05:21Z" name: mysqlsecret namespace: kafka ownerReferences: - apiVersion: bitnami.com/v1alpha1 controller: true kind: SealedSecret name: mysqlsecret uid: 2a5ee74b-c2b2-49b3-9a9f-877e7a77b163 resourceVersion: "41514" uid: 494cbe8b-7480-4ebd-9cc5-6fe396795eaatype: Opaque
复制代码


需要注意的是,这是一个解密的 Kubernetes Secret,引用了负责创建它的 SealedSecret。因此,SealedSecret 的生命周期也与 Secret 紧密相关。


我们已经解决了正确存储 YAML 文件而不泄露敏感数据的问题,但是当 Secret 被应用到 Kubernetes 集群,它是以 Base64 编码格式存储的,所以它不是密钥。

静态密钥

默认情况下,Kubernetes 不会在 etcd 数据库中存储加密的密钥。静态加密密钥数据是一个很大的话题,值得专门为其写一篇文章(事实上,“Kubernetes Secret Management”一书专门讨论了这个话题)。每一种 Kubernetes 实现都有可能使用不同的方式来启用静态密钥加密,尽管最后都是一个被复制到每个 kube-apiserver 节点中的配置文件(EncryptionConfiguration)。


该文件的格式为:


apiVersion: apiserver.config.k8s.io/v1kind: EncryptionConfigurationresources: - resources:     - secrets   providers:     - identity: {}     - aesgcm:         keys:           - name: key1             secret: c2VjcmV0IGlzIHNlY3VyZQ==           - name: key2             secret: dGhpcyBpcyBwYXNzd29yZA==     - aescbc:         keys:           - name: key1             secret: c2VjcmV0IGlzIHNlY3VyZQ==           - name: key2             secret: dGhpcyBpcyBwYXNzd29yZA==     - secretbox:         keys:           - name: key1             secret: YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXoxMjM0NTY=
复制代码


在下图中,我们可以看到在 kube-apiserver 中注册 EncryptionConfiguration 文件的流程。



现在,我们已经使用 SealedSecrets 对象来加密 YAML 文件中的密钥,还使用 EncryptionConfiguration 文件来保护静态密钥。

结论

保护好所有的基础设施是一件很重要的事情,我们已经在本文中学习了如何使用 Kubernetes Secrets 来保护对数据库和 Kafka 的访问。


我们不仅可以使用 Strimzi 定义身份验证,还可以定义授权,提供一些规则,规定谁可以对 Kafka 主题做什么。


访问这些密钥也是一个重要的部分,Quarkus 和 Debezium 允许你以一种高效而安全的方式访问这些密钥,而不需要将密钥持久化在文件系统中(或作为环境变量),而是直接将它们注入内存。


安全性是一个重要的话题,当需要在 Kafka 集群中管理安全性时,Strimzi 是一个完美的选择。


源代码可以在GitHub上找到。


作者简介:

Alex Soto 是 Red Hat 的开发者体验总监。他对 Java、软件自动化充满了热情,并且深信开源软件模式。Soto 是“Testing Java Microservices”(Manning)和“Quarkus Cookbook”(O'Reilly)的合著者,也是几个开源项目的贡献者。自 2017 年以来,他获得了 Java Champion 称号,也是萨尔 Universidad Ramon Llull 大学的国际演讲者和教师。如果你想继续关注 Kubernetes 和 Java 的动态,可以在 Twitter 上关注他(https://twitter.com/alexsotob)。


原文链接

https://www.infoq.com/articles/secure-kafka-cluster-strimzi/

2023-03-15 08:009189

评论

发布
暂无评论
发现更多内容

深入理解MVCC与间隙锁

林一

MySQL MVCC

关于价值、目标、任务的思考

L3C老司机

商务部CECBC区块链专委会副主任、数字经济商学院院长吴桐:建立完善稳健的基础设施 加速区块链与产业深度融合

CECBC

区块链

优雅编码 | 18个Javascript代码的小技巧

devpoint

代码优化 优雅

「产品经理训练营」作业02:利益相关方识别

狷介

产品经理训练营

虚拟币钱包APP系统开发|虚拟币钱包软件开发

系统开发

「产品经理训练营」第二章作业

Sòrγy_じò ぴé

产品经理训练营 极客大学产品经理训练营 产品训练营

人民日报——大力发展数字经济

CECBC

数字经济

架构师训练营 4 期 第4周

引花眠

架构师训练营 4 期

产品 0 期 - 第二周作业

Jxin

算法:匹配有效的括号,Swift 5中UITest从入门到精通, Swift 5 Viper Template,极客大学产品经理训练营 产品思维和产品意识, John 易筋 ARTS 打卡 Week 36

John(易筋)

ARTS 打卡计划 极客大学产品经理训练营 Swift 5 UITest Swift 5 Viper Template

企业是如何选择技术栈来做离线数仓

大数据老哥

C++静态链接符号冲突的几种处理方法

ElvinYang

能源革命背后的牛公司 (28天写作 Day16/28)

mtfelix

28天写作 能源革命

“区块链+产业应用”系列研讨会首场“大健康产业篇”在深圳举行

CECBC

健康产业

一文带你读懂:设计模式的六大原则

后台技术汇

28天写作

重学JS | ES6既有Set,为啥还要有Weak Set?

梁龙先森

JavaScript 大前端 编程语言 28天写作

Dockerfile ENV 使用指南

K8sCat

Docker Dockerfile ENV ARG

Nginx架构赏析

旺旺

nginx 架构 中间件

高效学习:如何学得更快更好

石云升

学习 28天写作

自动泊车初步了解 (28天写作 Day17/28)

mtfelix

自动驾驶 28天写作 自动泊车

区块链交易所APP开发|区块链交易所系统软件开发

系统开发

「架构师训练营 4 期」 第四周 - 001

凯迪

五分钟学会模板模式

田维常

mybatis

第4周课后练习-系统架构

潘涛

架构师训练营 4 期

Scrum Patterns:Sprint计划会(译)

Bruce Talk

敏捷 译文 Agile Scrum Patterns

第4周总结-系统架构

潘涛

架构师训练营 4 期

CSS(二)——CSS核心基础

程序员的时光

CSS 程序员 七日更 28天写作

老师讲的真棒!2021Android精选面试实战总结整理,分享PDF高清版

欢喜学安卓

android 程序员 面试 移动开发

第二次作业

秦挺

泪目!为什么Flutter能最好地改变移动开发?成功收获美团,小米安卓offer

欢喜学安卓

android 程序员 面试 移动开发

使用Strimzi提高Kafka集群的安全性_架构_Alex Soto_InfoQ精选文章