中国民生银行大数据团队 Kafka1.X 管控实践

阅读数:1206 2019 年 10 月 16 日 08:00

中国民生银行大数据团队Kafka1.X管控实践

一、前述

我行自 2016 年开始使用 Kafka,主要用于两大类应用:一是用于在应用间构建实时的数据流通道,二是用于构建传输或处理数据流的实时流式应用。Kafka 自身没有监控管理页面,我们调研并对比了市面主要的 Kafka 管理工具,得到的结论是各个产品有自身的设计局限性,并且开源产品缺乏稳定性,因此在满足实际需求的前提下,集合众多产品的优点基础上,设计并研发了一套适合通用的 Kafka 监控管理平台。以下是调研的市面主要的 Kafka 管理工具:

工具 优点 缺点
Kafka Manager 功能较全面,有集群管理,topic 管理,consumer offset 和 lag 监控,支持 JMX 监控 无 ACL 功能,部署较为复杂
Kafka-monitor 实时监控服务的可用性、消息丢失率、延迟率 偏重监控,无集群管理
KafkaOffsetMonitor 监控 offset,并可存储历史 offset;jar 包部署,较方便 功能单一,无 topic 管理,集群管理
Confluent Control Center 监控集群、数据流程,管理 Kafka Connect 和 topic 开源版功能较少

二、民生 Kafka PAAS 介绍

Kafka Paas 平台是我行自研的一套平台,提供 Kafka 接入和 Kafka 运维管控解决方案,集成集群监控管理、Topic 管理、Consumer Group 管理、Schema Registry 管理和 ZooKeeper 管理功能,帮助 Kafka 运维人员和接入人员快速使用和管控 Kafka 集群。该平台提供了基于 Kafka 0.10.X 和 1.1.X 版本的 REST API,在第三部分我们会详细介绍。以下是部分功能截图展示:

中国民生银行大数据团队Kafka1.X管控实践

中国民生银行大数据团队Kafka1.X管控实践

中国民生银行大数据团队Kafka1.X管控实践

中国民生银行大数据团队Kafka1.X管控实践

中国民生银行大数据团队Kafka1.X管控实践

中国民生银行大数据团队Kafka1.X管控实践

中国民生银行大数据团队Kafka1.X管控实践

三、开放的 Kafka REST API

该 API 分为 Kafka 管理、ZooKeeper 管理、SchemaRegistry 管理、JMX Metric 采集、用户管理等几大功能,接下来将分别介绍该 API 的主要功能:

3.1 Kafka 管理

这是 API 的核心功能,包括 Cluster/Broker 管理、Topic 管理、Consumer 管理三大功能,以下分别介绍:

A. Cluster/Broker 管理

Cluster/Broker 管理包含以下功能:

  • (1) 查看集群的 id、controller 和集群中每个 broker 信息 ;
  • (2) 查看 broker 的信息,包括 id,端口号,在 zk 节点注册时间、JMX 端口、endpoints、listener security 等信息;
  • (3) 查看 broker 上日志目录信息,根据 broker、topic 信息查找副本所在的日志目录;
  • (4) 查看 broker 的配置信息和动态配置信息,对于动态配置信息可以修改和删除。这部分主要是利用 clients 包里的 KafkaAdminClient 实现的,KafkaAdminClient 是 0.11.0.0 版本开始提供的管理类,是用来替代之前 core 包里 AdminUtils,AdminClient,ZKUtils 等功能,但是这个类功能目前还不完善,比如没有管理 consumer 相关的方法,下面列举查看 cluster 的信息为例说明 KafkaAdminClient 的使用:
复制代码
DescribeClusterOptions describeClusterOptions =
new DescribeClusterOptions().timeoutMs((int) kafkaAdminClientGetTimeoutMs);
DescribeClusterResult describeClusterResult =
kafkaAdminClient.describeCluster(describeClusterOptions);
KafkaFuture<String> clusterIdFuture = describeClusterResult.clusterId();
KafkaFuture<Node> controllerFuture = describeClusterResult.controller();
KafkaFuture<Collection<Node>> nodesFuture = describeClusterResult.nodes();
String clusterId = "";
Node controller = null;
Collection<Node> nodes = new ArrayList<>();
try {
clusterId = clusterIdFuture.get(kafkaAdminClientGetTimeoutMs, TimeUnit.MILLISECONDS);
controller = controllerFuture.get(kafkaAdminClientGetTimeoutMs, TimeUnit.MILLISECONDS);
nodes = nodesFuture.get(kafkaAdminClientGetTimeoutMs, TimeUnit.MILLISECONDS);
} catch (Exception exception) {
log.warn("Describe cluster exception:" + exception);
throw new ApiException("Describe cluster exception:" + exception);
} finally {
if (clusterIdFuture.isDone() && !clusterIdFuture.isCompletedExceptionally()) {
// clusterDetail.put("clusterId", clusterId);
clusterInfo.setClusterId(clusterId);
}
if (controllerFuture.isDone() && !controllerFuture.isCompletedExceptionally()) {
// clusterDetail.put("controllerId", controller);
clusterInfo.setController(controller);
}
if (nodesFuture.isDone() && !nodesFuture.isCompletedExceptionally()) {
// clusterDetail.put("nodes", nodes);
clusterInfo.setNodes(nodes);
}
}

除了用到 KafkaAdminClient,broker 的 JMX 端口,在 zk 注册的时间等这些需要获取 zk 上 /brokers/ids 的信息,其实在 KafkaZkClient 里有这类似的实现方法,但是返回的 Broker 信息里只有 id,endpoints,rack 信息,没有 timestamp,jmxPort,endpoints 这些信息,因此我们使用 ZooKeeper 的 CuratorClient 获取该路径的数据,然后通过反序列化读取全部数据,实现细节截取如下:

复制代码
String brokerInfoStr = null;
try {
// TODO replace zkClient with kafkaZKClient
brokerInfoStr =
new String(
zkClient
.getData()
.forPath(ZkUtils.BrokerIdsPath() + "/" + entry.getKey()));
} catch (Exception e) {
e.printStackTrace();
}
BrokerInfo brokerInfo;
try {
ObjectMapper objectMapper = new ObjectMapper();
brokerInfo = objectMapper.readValue(brokerInfoStr, BrokerInfo.class);
} catch (Exception exception) {
throw new ApiException("List broker exception." + exception);
}

需要注意的是不同版本的 Broker 在 zk 上注册的字段信息不同,字段信息可通过版本区分,在 0.10.0.1 版本是 3,在 1.1.1 版本是 4,以下是 version=3 和 4 的一个示例,在 version 4 里多一个字段 listenersecurityprotocol_map,详细可参考类 ZkData.scala:

复制代码
* Version 3 JSON schema for a broker is:
* {
* "version":3,
* "host":"localhost",
* "port":9092,
* "jmx_port":9999,
* "timestamp":"2233345666",
* "endpoints":["PLAINTEXT://host1:9092", "SSL://host1:9093"],
* "rack":"dc1"
* }
*
* Version 4 (current) JSON schema for a broker is:
* {
* "version":4,
* "host":"localhost",
* "port":9092,
* "jmx_port":9999,
* "timestamp":"2233345666",
* "endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"],
* "listener_security_protocol_map":{"CLIENT":"SSL", "REPLICATION":"PLAINTEXT"},
* "rack":"dc1"
* }

另外 KafkaAdminClient 有获取 broker 配置信息的方法 describeConfigs,但是该方法获取不到 broker 动态的配置信息,在重分区的时候,可以设置限速,但是该方法获取不到限速的值(值为 null),因此我们改进了获取 broker 配置信息的方法,将 describeConfigs 方法获取的配置和从 /config/brokers/id 上获取的信息进行 merge,形成最全最准确的配置信息。

B. Topic 管理

Topic 管理包含以下功能:

  • (1)Topic 的查看、创建、删除 (删除是异步的)、配置查看 / 修改、schema 查看;KafkaAdminClient 提供了 listTopics()、describeTopics()、createTopics()、deleteTopics() 等 topic 管理方法,配置的查看修改部分是利用 describeConfigs()、alterConfigs() 实现,和 Broker 配置管理类似,需要整合从 zookeeper 上获取的 topic 动态配置信息。schema 的查看是根据 topic 名称和 schema 名称的对应关系获取。
  • (2) 分区查看,分区重分布生成计划、执行和检查(支持跨 broker 和 broker 内重分布,带限速功能) 分区信息查看,包括查看分区 id、分区的 leader、副本分本、ISR、开始 offset、结束 offset、消息数,前 4 个字段信息是通过 KafkaAdminClient 的 describeTopics() 实现,开始 offset 和结束 offset 是通过 KafkaConsumer 的 beginningOffsets() 和 endoffsets() 方法获取,消息数是这两个值的差值。在 kafka 中有脚本 kafka-reassign-partitions 提供了分区重分布的功能,该脚本可用于增加分区、增加副本、分区迁移,在 1.1 版本开始支持不同路径间的迁移;增加分区是采用 KafkaAdminClient.createPartitions() 方法实现的,其他功能该客户端还未实现,我们是调用 ReassignPartitionsCommand.scala 类实现的,Scala 被编译为 Java 字节码,运行在 JVM 之上,因此在 Java 里可以直接调用 Scala,对于 Java 和 Scala 之间类型转换,使用 Scala 提供的 JavaConverters.scala,以下生成分区重分布计划的代码,里面涉及到了 Java 和 Scala 对象的转化:
复制代码
// Return <Current partition replica assignment, Proposed partition reassignment>
public List<ReassignModel> generateReassignPartition(ReassignWrapper reassignWrapper) {
KafkaZkClient kafkaZkClient = zookeeperUtils.getKafkaZkClient();
List<ReassignModel> result = new ArrayList<>();
Seq brokerSeq =
JavaConverters.asScalaBufferConverter(reassignWrapper.getBrokers()).asScala().toSeq();
// <Proposed partition reassignment,Current partition replica assignment>
Tuple2 resultTuple2;
try {
resultTuple2 =
ReassignPartitionsCommand.generateAssignment(
kafkaZkClient, brokerSeq, reassignWrapper.generateReassignJsonString(), false);
} catch (Exception exception) {
throw new ApiException("Generate reassign plan exception." + exception);
}
HashMap<TopicPartitionReplica, String> emptyMap = new HashMap<>();
ObjectMapper objectMapper = new ObjectMapper();
try {
result.add(
objectMapper.readValue(
ReassignPartitionsCommand.formatAsReassignmentJson(
(scala.collection.Map<TopicPartition, Seq<Object>>) resultTuple2._2(),
JavaConverters.mapAsScalaMapConverter(emptyMap).asScala()),
ReassignModel.class));
result.add(
objectMapper.readValue(
ReassignPartitionsCommand.formatAsReassignmentJson(
(scala.collection.Map<TopicPartition, Seq<Object>>) resultTuple2._1(),
JavaConverters.mapAsScalaMapConverter(emptyMap).asScala()),
ReassignModel.class));
Collections.sort(result.get(0).getPartitions());
Collections.sort(result.get(1).getPartitions());
} catch (Exception exception) {
throw new ApiException("Generate reassign plan exception." + exception);
}
return result;
}
  • (3) 消息查看,消息查看支持根据 offset 和 timestamp 两种查询方式,对于消息中的 key 和 value 支持 StringDeserializer、ByteArrayDeserializer、FloatDeserializer、DoubleDeserializer、KafkaAvroDeserializer 等方式进行反序列化解码,实现的原理是根据传入的 Key 和 Value 的解码方式创建 consumer,根据反序列化类型的不同,我们分两种情况:a. 如果解码方式是 ByteArrayDeserializer 或者是 KafkaAvroDeserializer,拉取消息返回的类型是 ConsumerRecords,这里需要注意的是 Producer 端如果是用 KafkaAvroSerializer 编码方式发送的消息,那么每条消息里从第 6 个字节(第 1 个字节是 MAGIC_BYTE,第 2-6 字节是一个整数,是 schema 的 ID)才是真正数据,部分代码细节如下:
复制代码
private Object avroDeserialize(byte[] bytes, String avroSchema, boolean isInSchemaRegistry) {
Schema schema = new Schema.Parser().parse(avroSchema);
DatumReader reader = new GenericDatumReader<GenericRecord>(schema);
ByteBuffer buffer = ByteBuffer.wrap(bytes);
Object object = null;
if (isInSchemaRegistry) {
try {
// 这里利用 confluent 提供的 io.confluent.kafka.serializers.KafkaAvroDeserializer 进行解码
object = confluentSchemaService.deserializeBytesToObject("", bytes, schema);
} catch (SerializationException serializationException) {
throw new ApiException("Avro Deserialize exception. " + serializationException);
}
} else {
try {
object =
reader.read(
null,
DecoderFactory.get().binaryDecoder(buffer.array(), 0, bytes.length, null));
} catch (IOException exception) {
throw new ApiException("Avro Deserialize exception. " + exception);
}
}
return object;
}

b. 其他方式下,拉取消息时返回的 ConsumerRecords,然后根据 Java 的反射机制判断每条 ConsumerRecord 内数据的类型,然后转化成 String 类型

这部分代码细节请参考:

复制代码
ConsumerRecords<Object, Object> crs = consumer.poll(timeoutMs);
if (crs.count() != 0) {
Iterator<ConsumerRecord<Object, Object>> it = crs.iterator();
while (it.hasNext()) {
Record record = Record.builder().topic(topic).keyDecoder(keyDecoder)
.valueDecoder(valueDecoder).build();
ConsumerRecord<Object, Object> initCr = it.next();
record.setOffset(initCr.offset());
record.setTimestamp(initCr.timestamp());
record.setKey(initCr.key());
record.setValue(initCr.value());
recordList.add(record);
}
}
public String getValueByDecoder(String decoder, Object value) {
if (value == null) return null;
Class<?> type = KafkaUtils.DESERIALIZER_TYPE_MAP.get(decoder);
try {
if (String.class.isAssignableFrom(type)) {
return value.toString();
}
if (Short.class.isAssignableFrom(type)) {
return value.toString();
}
...
if (Bytes.class.isAssignableFrom(type)) {
Bytes bytes = (Bytes) value;
return bytes.toString();
}
if (byte[].class.isAssignableFrom(type)) {
if (decoder.contains("AvroDeserializer")) {
return value.toString();
} else {
byte[] byteArray = (byte[]) value;
return new String(byteArray);
}
}
if (ByteBuffer.class.isAssignableFrom(type)) {
ByteBuffer byteBuffer = (ByteBuffer) value;
return new String(byteBuffer.array());
}
}
...

C. Consumer 管理

Consumer 管理包含以下功能:

  • (1)new/old Consumer 的查看,包括 state、组内 consumer 分配策略、group 所在协调节点、组内所有 consumer 成员信息、consumer 消费的 topic 信息和消费位移 /lag 信息,另外可以根据 consumer 名称 /topic 名称查询消费 consumer 信息和消费位移 /lag 信息

这部分实现是利用 kafka.admin.AdminClient 实现的,describeConsumerGroup 方法可以获取 state、组内 consumer 分配策略、协调节点和组内 consumer 信息,关于获取消费的位移 /lag 信息,利用 adminclient.listGroupOffsets() 方法获取消费过的所有分区信息和位移信息,

再结合 describeConsumerGroups() 获取到的 Consumer 分配信息获取 consumer 的消费位移 /lag 信息,注意的是 listGroupOffsets 中获取的分区信息可能比 describeConsumerGroup() 获取到的 ConsumerSummary 里分区的总和,也就是存在没有 consumer 的消费信息,这些分区对应的 consumerId,clientId,host 用“-”代替,此处是参考 ConsumerGroupCommand.scala 类里的 collectGroupOffsets() 方法实现

  • (2) 重置 consumer 的 offset,支持重置到 earliest、latest、指定 offset,也可以重置到指定时间点的 offset

实现是创建一个同 group 下的 KafkaConsumer,然后用 assign() 方法指定消费分区,然后再利用 KafkaConsumer 提供的 seekToBeginning(),seekToEnd(),seek() 方法实现重置,如果需要重置到指定时间点,先用 offsetsForTimes() 方法获取不早于该时间戳的分区位移,然后再用 seek() 方法重置,需要注意的是为了不影响该 group 的消费位移,重置之前需要将处于 active 状态的 group 先停掉。

  • (3)new/old Consumer 删除

虽然之后 kafka 不再支持 old consumer,但是在我行 0.10.0.1 环境中,还是有使用 old consumer 的客户端,因此对于删除 old consumer 的功能仍保留,实现原理是删除 zookeeper 路径 /consumers/{groupName}的数据;对于 new consumer,使用 kafka.admin.AdminClient.deleteConsumerGroups() 实现。

3.2 Zookeeper 管理

ZooKeeper 管理包含以下功能:

(1) 查看 zk 的环境信息

实现原理是通过 zookeeper 的四字命令 envi 获取数据,然后解析。

(2) 查看服务器的详细信息:服务器的详细信息:接收 / 发送包数量、连接数、模式(leader/follower)、节点总数、延迟,以及所有客户端的列表

实现原理也是通过 zookeeper 的四字命令 stat 获取数据,然后解析,细节代码如下:

复制代码
public Map<HostAndPort, ZkServerStat> stat() {
List<HostAndPort> hostAndPortList = zookeeperUtils.getZookeeperConfig().getHostAndPort();
Map<HostAndPort, ZkServerStat> result = new HashMap<>();
for (int i = 0; i < hostAndPortList.size(); i++) {
HostAndPort hp = hostAndPortList.get(i);
try {
result.put(
hp,
zookeeperUtils.parseStatResult(
zookeeperUtils.executeCommand(
hp.getHostText(), hp.getPort(), ZkServerCommand.stat.toString())));
} catch (ServiceNotAvailableException serviceNotAvailbleException) {
log.warn(
"Execute "
+ ZkServerCommand.stat.toString()
+ " command failed. Exception:"
+ serviceNotAvailbleException);
result.put(
hp,
ZkServerStat.builder()
.mode(serviceNotAvailbleException.getServiceState())
.msg(serviceNotAvailbleException.getMessage())
.build());
}
}
return result;
}

(3) 查看指定路径下节点信息,获取指定路径的节点数据

实现效果和 ls/get 命令一样,是通过 curatorClient 提供的 getChildren() 方和 getData() 方法实现。

3.3 Schema Registry 管理

在我行实际使用过程中,Kafka 的数据来源之一是 CDC(Change Data Capture),这些数据是 Avro 格式的,且包含 schema,因此我们部署了 confluent 版本 Schema Registry,Schema Registry 服务在 CDC 同步架构中作为 Avro Schema 的存储库。Confluent 本身也提供了 RESTful 的接口,为了将这些管理功能集成,在我们的 API 中封装了主要接口,包括以下功能:(1) 获取所有 subjects,在原有的接口上进行扩展,不仅获取了 subject 名称,还获取了最新的 schema 的 Id,版本,schema 内容,代码细节如下:

复制代码
public List<SchemaRegistryMetadata> getAllSubjects() {
try {
Collection<String> subjects = this.schemaRegistryClient.getAllSubjects();
List<SchemaRegistryMetadata> allSubjects = new ArrayList<>();
SchemaMetadata schemaMetadata;
SchemaRegistryMetadata schemaRegistryMetadata;
for (String subject : subjects) {
schemaMetadata = schemaRegistryClient.getLatestSchemaMetadata(subject);
schemaRegistryMetadata = SchemaRegistryMetadata.builder().subject(subject)
.id(schemaMetadata.getId())
.version(schemaMetadata.getVersion()).schema(schemaMetadata.getSchema()).build();
allSubjects.add(schemaRegistryMetadata);
}
return allSubjects;
} catch (Exception exception) {
throw new ApiException("ConfluentSchemaService getAllSubjects exception : " + exception);
}
}

另外提供获取指定 subject 的 shema 所有的版本信息,删除指定的 subject

(2) 注册 schema;根据 schemaId 获取 shema 信息,在原有的接口上进行了扩展,不仅获取了 shema 的详细内容,还能获取对应的 subject 名称,版本信息;根据 subject 名称和 schema 版本获取 schema 信息

3.4 User 管理和 JMX metric 采集

由于该 API 是管理 Kafka、ZooKeeper、Schema Registry 等众多产品的 API,且 API 操作中有很多修改,删除等接口,为了增加接口的安全性,我们增加了身份认证(是否添加认证可通过参数配置),采用 HTTP basic 方式进行认证。实现上使用 Spring Boot 集成 Spring Security 方式,为了减少 API 对其他产品的依赖,我们将用户名和密码存储在文件 security.yml 中,同时启动一个线程定期 load 该文件的数据,添加用户后不需要重启应用程序。JMX metric 采集是可以根据传入的 jmx url 和筛选指标,从对应的 jmx 端口获取指标信息,由于后续行内有统一的 jmx 指标采集规划,因此该功能后续不再更新。

四、后续计划

随着业务的不断发展,我行使用 Kafka 的应用不断增多,Kafka 变得越来越重要。后续我们会重点进行 Kafka PaaS 平台建设,近期我们会先将 kafka 升级到 1.1.1,同时增加 ACL 权限管控,关于 ACL 的管控接口会逐步开发并开源出来,另外就是规划多集群的分级管理,欢迎各行各业的朋友和我们交流和联系。

作者介绍

文乔,工作于中国民生银行总行信息技术部大数据基础产品平台组,天眼日志平台主要参与人。

王健,民生银行信息科技部 dba ,2011 年加入民生银行,主要从事数据库和 kafka,数据复制等运维工作。

本文转载自公众号民生运维(ID:CMBCOP)

原文链接

https://mp.weixin.qq.com/s?__biz=MzUxNzEwOTUyMg==&mid=2247484934&idx=1&sn=63336e5fd0a55eada2d82f085ec84253&chksm=f99c647bceebed6d1b2de88aad8b7552e8cbe886f1d929e92d5e43f863dbd6931a465db9f4f4&scene=27#wechat_redirect

评论

发布
用户头像
kafkaeagle呢?
2019 年 10 月 16 日 08:25
回复
没有更多了