写点什么

Kafka-Clients 源码学习:KafkaProducer 篇

2019 年 10 月 22 日

Kafka-Clients源码学习:KafkaProducer篇

前言

本文基于 Kafka-clients:1.1.0 版本。


KafkaProducer 的发送流程

调用流程图


此图描述的是用户启动一个线程使用 KafkaProducer 进行消息发送的大概流程。除了用户主线程,KafkaProducer 在新建实例时还会启动一个设置为 daemon 的 Sender 线程。


user 线程

图中红线画出的用户主线程调用栈。当调用 send 方法发送 record 后会经历如下流程:


  1. 确认元数据可用

  2. 序列化

  3. 将数据 append 进缓存


下面由代码分析一下,为了清晰起见省略了部分内容。


确认元数据可用(waitOnMetadata)

    private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {        // 将topic放入metadata中,如果已存在就刷新过期时间        metadata.add(topic);        // 从内存中fetch cluster元数据        Cluster cluster = metadata.fetch();        Integer partitionsCount = cluster.partitionCountForTopic(topic); // 找到了直接return        if (partitionsCount != null && (partition == null || partition < partitionsCount))            return new ClusterAndWaitTime(cluster, 0);
long begin = time.milliseconds(); long remainingWaitMs = maxWaitMs; long elapsed; // 标识需要进行元数据更新,并且等待至更新完成或抛出超时异常 do { metadata.add(topic); int version = metadata.requestUpdate(); sender.wakeup(); try { metadata.awaitUpdate(version, remainingWaitMs); } catch (TimeoutException ex) { throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); } //省略部分超时、权限异常检测代码 ...... partitionsCount = cluster.partitionCountForTopic(topic); } while (partitionsCount == null);
if (partition != null && partition >= partitionsCount) { throw new KafkaException( String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount)); }
return new ClusterAndWaitTime(cluster, elapsed); }
复制代码


序列化

    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {        TopicPartition tp = null;        try {            //省略部分            //......            byte[] serializedKey;            try {                //序列化key                serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());            } catch (ClassCastException cce) {                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +                        " specified in key.serializer", cce);            }            byte[] serializedValue;            try {                //序列化value                serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());            } catch (ClassCastException cce) {                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +                        " specified in value.serializer", cce);            }                        Header[] headers = record.headers().toArray();            int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(), compressionType, serializedKey, serializedValue, headers);            //估算序列化后大小,确保不会超过max_request_size或者total_memory_size            ensureValidRecordSize(serializedSize);        }        //省略部分        //......    }
复制代码


将数据 append 进缓存(RecordAccumulator.append)

消息会被 append 进 accumulator 缓存结构中的双端队列,下面用一张图解释一下这个缓存结构。消息会从对应双端队列的尾部 batch 被写入。在 Sender 线程中每次会从头部 batch 拉取。



    public RecordAppendResult append(TopicPartition tp,                                     long timestamp,                                     byte[] key,                                     byte[] value,                                     Header[] headers,                                     Callback callback,                                     long maxTimeToBlock) throws InterruptedException {        ByteBuffer buffer = null;        if (headers == null) headers = Record.EMPTY_HEADERS;        try {            //根据TopicPartition获得对应双端队列dq,没有则创建            Deque<ProducerBatch> dq = getOrCreateDeque(tp);            synchronized (dq) {                //尝试向已有dq中的last batch已写入此条record                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);                if (appendResult != null)                    return appendResult;            }
// dq中的last batch无法再添加record,于是申请一个新的batch,并且添加到dq的last // 从BufferPool中拿出一个buffer buffer = free.allocate(size, maxTimeToBlock); synchronized (dq) { RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq); if (appendResult != null) { //两次获取dq锁的过程中可能有其它线程创建了可用的batch,如果tryAppend成功则直接返回 return appendResult; }
MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic); ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds()); //将record append进刚刚新建的batch FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds())); // 将申请的batch放入dp last dq.addLast(batch); incomplete.add(batch);
// 避免在finally模块中释放这次申请的buffer buffer = null;
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true); } } finally { if (buffer != null) // 如果使用了其它线程申请的buffer则释放此线程申请的buffer free.deallocate(buffer); } }
复制代码


sender 线程

KafkaProducer 的构造函数中会启动一个新线程,用来跑 Sender 任务,所以这里我们将它称为 sender 线程,实际的线程名为"kafka-producer-network-thread | $clientId"。


Sender 类实现了 Runnable 接口,实现的 run()方法逻辑也很简单,一直循环下面几个步骤直到 close 被调用:


  1. 幂等或者事务 producer 逻辑(如果配置中有开启)。

  2. 从 RecordAccumulator 中获取 batch 构建 requests 发送到缓存。

  3. 进行一次 client.poll,进行真正的消息发送、接收处理。


幂等的相关实现后面会进行讲解,这里主要讲解步骤 2,3。


构建 Requests 发送到缓存 (sendProducerData&sendProducerRequest)

    private long sendProducerData(long now) {        //获取本地元数据        Cluster cluster = metadata.fetch();
// 获取存在待发送分区对应的node RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
// 如果本地元数据中没找到对应leader信息,强制元数据更新 if (!result.unknownLeaderTopics.isEmpty()) { for (String topic : result.unknownLeaderTopics) this.metadata.add(topic); this.metadata.requestUpdate(); }
// 删除没有准备好发送的node Iterator<Node> iter = result.readyNodes.iterator(); long notReadyTimeout = Long.MAX_VALUE; while (iter.hasNext()) { Node node = iter.next(); // 检查是否能向此node发送消息(这里会保证等待同一node的上次request send发送完成) if (!this.client.ready(node, now)) { iter.remove(); // 获得disconnected状态的node剩余backoff时间 notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now)); } }
// 从Accumulator中获取准备发送的batchs Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now); if (guaranteeMessageOrder) { // 如果max.inflight.requests = 1,则将待发送的partition锁住 for (List<ProducerBatch> batchList : batches.values()) { for (ProducerBatch batch : batchList) this.accumulator.mutePartition(batch.topicPartition); } } //从accumulator中拿出timeout的batch(max.inflight.requests > 1的情况下过期不保回调顺序) List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeout, now); for (ProducerBatch expiredBatch : expiredBatches) { //回调timeout异常 failBatch(expiredBatch, -1, NO_TIMESTAMP, expiredBatch.timeoutException(), false); //...... }
// 如果存在可发送数据并且准备好发送的node,会以0的timeout来返回pollTimeout,否则此timeout会由当前无法发送消息的nodes决定(比如 lingering, backing off)。总的来说: // 1. 如果存在partition的数据准备好发送,pollTimeout=0 // 2. 另外如果存在partition有数据写入accumulator但是没有准备好发送,pollTimeout=min{当前时间与linger过期时间差值(重发的batch考虑retryBackoff差值), 当前时间与disconnected node的reconnectBackoffMs的时间差值} long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout); if (!result.readyNodes.isEmpty()) { pollTimeout = 0; } //将batchs构建成request发送到缓存 sendProduceRequests(batches, now);
return pollTimeout; }
复制代码


    private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {        // 遍历batches,以node为单位发送request请求        for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())            sendProduceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue());    }

private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) { if (batches.isEmpty()) return;
Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size()); final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
//...... for (ProducerBatch batch : batches) { TopicPartition tp = batch.topicPartition; MemoryRecords records = batch.records(); //...... produceRecordsByPartition.put(tp, records); recordsByPartition.put(tp, batch); }
// 如果producer开启了事务属性则获取transactionId String transactionalId = null; if (transactionManager != null && transactionManager.isTransactional()) { transactionalId = transactionManager.transactionalId(); } // 用produceRecordsByPartition构建requestBuilder ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout, produceRecordsByPartition, transactionalId); // 用recordsByPartition构建回调类,client.poll收到response后会调用callback.onComplete方法。 RequestCompletionHandler callback = new RequestCompletionHandler() { public void onComplete(ClientResponse response) { handleProduceResponse(response, recordsByPartition, time.milliseconds()); } };
String nodeId = Integer.toString(destination); ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, callback); //调用client.send发送消息 client.send(clientRequest, now); log.trace("Sent produce request to {}: {}", nodeId, requestBuilder); }
复制代码


client.poll 处理

这个内容在上一篇写过,这里就不详细列 selector 的方法实现了,只需要关注 poll 方法,handleCompletedSends 生成已经发送成功并且不需要 ack 的请求对应 response,handCompletedReceives 生成等待收到回复的请求对应 response。最后统一由 completeResponses 进行处理。


public List<ClientResponse> poll(long timeout, long now) {    if (!abortedSends.isEmpty()) { // 如果存在abortedSends,直接处理,不经过selector的poll        List<ClientResponse> responses = new ArrayList<>();        handleAbortedSends(responses);        completeResponses(responses);        return responses;    }     //判断是否要更新元数据,如果需要的话发送更新请求(会选择inflight请求数量最少的ready node发送请求)    long metadataTimeout = metadataUpdater.maybeUpdate(now);    try {        // 完成一次selector poll,最大阻塞时间为timeout,metadataTimeout,requestTimeoutMs的最小值        this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));    } catch (IOException e) {        log.error("Unexpected error during I/O", e);    }     long updatedNow = this.time.milliseconds();    // 初始化response list    List<ClientResponse> responses = new ArrayList<>();    // 如果是不等待回复的请求则直接生成response放入list    handleCompletedSends(responses, updatedNow);    // 根据inFlightRequests的发送顺序生成已完成回复的response放入list    handleCompletedReceives(responses, updatedNow);    // 处理断开连接的请求,生成response放入list    handleDisconnections(responses, updatedNow);    // 更新connectionStates中node的连接状态    handleConnections();    // 需要的话发送api version获取请求    handleInitiateApiVersionRequests(updatedNow);    // 处理超时请求,生成response放入list    handleTimedOutRequests(responses, updatedNow);    // 完成所有response的处理    completeResponses(responses);     return responses;} private void completeResponses(List<ClientResponse> responses) {    for (ClientResponse response : responses) {        try {            // 调用response中的callback进行处理,producer端可以参考Sender类的handleProduceResponse方法            response.onComplete();        } catch (Exception e) {            log.error("Uncaught error in request completion:", e);        }    }}
复制代码


最终所有 response 都会在 completeResponses 中处理,而处理所调用的方法就是我们在构建 request 时传入的 callback.onComplete(response)。所以回到 Sender 类中的 handleProducerResponse 方法就可以看到 producer 收到 broker 回复后的处理逻辑了。


    private void handleProduceResponse(ClientResponse response, Map<TopicPartition, ProducerBatch> batches, long now) {        //......        if (response.wasDisconnected()) {             // 处理连接断开batches            for (ProducerBatch batch : batches.values())                completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION), correlationId, now);        } else if (response.versionMismatch() != null) {             // 处理协议版本不匹配            for (ProducerBatch batch : batches.values())                completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.UNSUPPORTED_VERSION), correlationId, now);        } else {            // 如果response有内容,则解析后completeBatch            if (response.hasResponse()) {                ProduceResponse produceResponse = (ProduceResponse) response.responseBody();                for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {                    TopicPartition tp = entry.getKey();                    ProduceResponse.PartitionResponse partResp = entry.getValue();                    ProducerBatch batch = batches.get(tp);                    completeBatch(batch, partResp, correlationId, now);                }            } else {                // 如果没有response,即ack = 0 时会走此处逻辑,直接completeBatch                for (ProducerBatch batch : batches.values()) {                    completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NONE), correlationId, now);                }            }        }    }
复制代码


completeBatch 负责进行 batch 的 retry reenqueueBatch(如果是可重试),以及回调的触发 batch.done(不可重试或者 success)。


幂等 producer 的实现

设置 producer 的"enable.idempotence"=true 就开启了 producer 的幂等逻辑。这个配置是为了实现 producer 以精确一次的语义进行消息发送(只能保证单 producer 实例内的精确一次,如果存在 producer 的 failover 切换则无法保证)。开启幂等 producer 还额外要求配置 “max.in.flight.requests.per.connection” <= 5, “retries” > 0, 以及 “ack” = all。


TransactionManager 类负责幂等相关的实现,在 sender 发送具体消息之前会从 broker 端获取一个 producer id(PID),并且发送到 broker 的每批消息都会被赋予一个序列号用于消息去重。kafka 会把序列号与消息一起保存在底层日志中,这样即使分区的 leader 副本挂掉,新选出来的 leader broker 也能执行消息去重工作(所以幂等要求 ack=all,即所有 replicas 都写入才 ack)。


每对(PID, 分区号)都会有对应的递增 sequence,Broker 收到消息后,如果发现 sequence <= old sequence 便不会写入日志中,返回 client DUPLICATE_SEQUENCE_NUMBER 异常或者 OutOfOrderSequence 异常,client 会以发送成功的逻辑来处理此异常;如果 sequence = old sequence + 1,则正常写入并返回成功;如果 sequence > old sequence + 1,也会拒绝此次写入,并且返回 client OUT_OF_ORDER_SEQUENCE_NUMBER 异常,幂等 producer 将会按照 sequence 顺序 reenqueue 发送失败的消息,等待发送。


为什么说 max.in.flight.requests.per.connection <= 5 时都可以保证消息顺序

这里引用一段 KAFKA-5494 issue 的描述:


Currently, the idempotent producer (and hence transactional producer) requires max.in.flight.requests.per.connection=1.


This was due to simplifying the implementation on the client and server. With some additional work, we can satisfy the idempotent guarantees even with any number of in flight requests. The changes on the client be summarized as follows:


  1. We increment sequence numbers when batches are drained.

  2. If for some reason, a batch fails with a retriable error, we know that all future batches would fail with an out of order sequence exception.

  3. As such, the client should treat some OutOfOrderSequence errors as retriable. In particular, we should maintain the ‘last acked sequnece’. If the batch succeeding the last ack’d sequence has an OutOfOrderSequence, that is a fatal error. If a future batch fails with OutOfOrderSequence they should be reenqeued.

  4. With the changes above, the the producer queues should become priority queues ordered by the sequence numbers.

  5. The partition is not ready unless the front of the queue has the next expected sequence.


With the changes above, we would get the benefits of multiple inflights in normal cases. When there are failures, we automatically constrain to a single inflight until we get back in sequence.


With multiple inflights, we now have the possibility of getting duplicates for batches other than the last appended batch. In order to return the record metadata (including offset) of the duplicates inside the log, we would require a log scan at the tail to get the metadata at the tail. This can be optimized by caching the metadata for the last ‘n’ batches. For instance, if the default max.inflight is 5, we could cache the record metadata of the last 5 batches, and fall back to a scan if the duplicate is not within those 5.


最后说的很清楚,因为 kafka broker 当前实现会缓存 the last 5 batches 用于判断 duplicates,而在 client 端 TransactionManager 中的 inflightBatchesBySequence 也是初始化了缓存长度为 5 的 PriorityQueue。所以强制规定 max.in.flight.requests.connection 是出于效率上的考虑。


KafkaProducer 的重要配置

配置名含义
acks消息发送的确认机制。0:无需确认 1: 仅需partition leader确认 -1 or all:需要所有replicas确认
enable.idepotencetrue:幂等producer false:非幂等producer
retries设置遇到可重试异常时最多重试多少次
max.block.ms当buffer满了或者metadata不可用时最多阻塞多久(kafkaProducer.send() & KafkaProducer.partitionsFor())
buffer.memory可用用来缓存records的bytes大小,满了后会阻塞send,此配置不能表示KafkaProducer用的所有内存,还有一些内存用来维护inflight请求以及压缩
request.timeout.ms请求的超时时间。 在2.1.0版本前还表示消息的最大留存时间,2.1.0后此语义被delivery.timeout.ms取代
connections.max.idle.ms连接的最大idle时间
bootstrap.servers启动连接broker地址
max.in.flight.requests.per.connection同一个连接上的未确认请求的最大数量
batch.size默认的producer发送消息的batch大小
linger.ms在对应partition消息大小未达到batch.size时增加的latency,如果对于同一个broker有多个partition leader,则已最先需要发送的partition为准
max.request.size单个请求的最大bytes大小
compression.type是否压缩请求数据,提供none, gzip, snappy, lz4四种配置


还有很多配置项没有列出,kafka 给了用户非常多配置项,在进行 KafkaProducer 的配置前一定要先确定好自己的需求。比如说是否能容忍可能的消息丢失;是否需要严格保证消息的顺序;是否需要尽可能减少 Broker 端日志的重复数据;是否需要增加适当的消息延时(linger batch, compression)来提高吞吐等。


实践中的一些问题

保证同 partition 回调顺序而不是全局

我们在使用 kafka 时上游还有一个 binlog 解析器,为了确认消息整体不丢,解析器为的每一条消息分配了一个递增 id,并且在 ack 时会验证消息是否为期望的递增 id,如果不是就抛出异常。


因为我们只需要保证特定的 binlog 顺序(表级别),消息会被 hash 到不同分区上,因此 kafka 的回调不一定会按照全局的上游顺序被调用。而同步的 kafka send 效率又太低,所以我们实现了一个 ackBuffer 来解决这个问题。


MemoryCanalAckWithBuffer 实现

实现目标:


  1. 在拿到 Message 时将 MessageId 顺序放入 buffer,如果 buffer 满了阻塞等待信号。将 buffer 返回的 Sequence 传入 KafkaSend 的回调对象。

  2. 在 kafka 确认消费成功回调中,根据回调对象中的 Sequence 去 mark 标记 buffer 中对应位置的 id。(因为 MessageId 取值为[1, Long.MAX_VALUE], 所以标记的方式可以利用这个特点,将原 id 标记成负数-id)

  3. 在一个轮询线程中,从 buffer 中按顺序 get 元素出来,如果 buffer 是空的或者下一个元素还没有被标记阻塞等待信号。


对外接口
//获得buffer中的下一个元素,在buffer为空或者待取出id没有被标记过的时候阻塞(waiting for signal)long get() throws InterruptedException;//标记传入Sequence的idboolean mark(long markSequence) throws InterruptedException;//放入id,返回在buffer中的Sequence(可以计算出数组下标),buffer满时会被阻塞(waiting for signal)long put(long id) throws InterruptedException;  
void setBufferSize(int bufferSize); //设置BufferSizevoid start(); //初始化buffervoid stop(); //关闭buff,重置putSequence,getSequence,清除buffer
复制代码


  • Buffer 中维护的是一个 long 基本类型的数组,数组的长度默认走 lion 配置,程序也可以调用 set 接口设置。

  • Buffer 采用的是懒惰的启动方式,在构造函数中不会分配内存,调用了 start 方法后,会根据 bufferSize 的值,计算一个 2 的次幂作为 buffer 长度,初始化 buffer 数组。

  • 内部维护了一个 putSequence,一个 getSequence 来记录读写位置以及 buffer 已使用空间。

  • 内部维护了一个 ReentrantLock 来同步对数组的读写操作,3 个 Condition 对象来协调 put,get,mark 线程间的合作。


request.timeout.ms 配置有多重含义

在 kafka-client 1.1.0 版本开发 producer 的时候发现这个配置非常坑,正常来说 TimeoutException 是一个可重试异常,但是在实际代码中即使设置了 max retries 的情况下,还是可能抛出 TimeoutException 到 send callback。


原因在于 request.timeout.ms 不仅用于判断发送消息的超时,还会用来判断缓存在 RecordAccumulator 中的 batch 是否过期(类似于消息发送最大留存时间),一个配置项存在多重含义,配置灵活性低,对用户不直观,当前被迫将 request.timeout.ms 设置为 max 值。


用了个野路子,通过和 Broker 连接的 idle 来检测消息发送异常(超时),目前能很快感知到集群机器的异常并且切换 broker 重发。


最近看到 kafka 也修复了这个配置问题(fix version:2.1.0),通过增加 delivery.timeout.ms 来配置消息最长发送时间。



幂等 producer 开启的情况下出现整型溢出

TransactionManager 会根据消息发送的分区产生递增 sequence 并保存在一个 TopicPartition -> Integer 的 Map 当中。在我们 producer 长时间不发版的情况下,可能因为某个 partition(这和我们的发送顺序保证逻辑相关,某些表会频繁刷数据)累计发送数据特别多而使这个 Integer 溢出。


当前最新版本的 kafka 实现依然还是 Integer,感觉和 broker 端需要存储这个值到日志相关,想要省存储空间。现在的做法只能是在出现异常告警后或者定期的重启一轮 Producer,这个问题不会导致数据丢失或者乱序。


相关链接


https://issues.apache.org/jira/browse/KAFKA-5494


https://kafka.apache.org/documentation/


https://issues.apache.org/jira/browse/KAFKA-5886


https://issues.apache.org/jira/browse/KAFKA-4515


https://issues.apache.org/jira/browse/KAFKA-5621


作者:


王植田,笔名 zwangbo,就职于拼多多,公众号是 zwangbo 的技术杂文,热爱技术和分享,欢迎交流。


2019 年 10 月 22 日 13:551597

评论

发布
暂无评论
发现更多内容

智慧警务可视化平台开发,重点人员管控系统搭建

t13823115967

智慧公安 智慧警务系统开发

4年Java开发经验裸辞之后闭关修炼2个多月,成功拿下美团、京东、字节跳动(Java架构师)offer。

Java成神之路

Java 程序员 架构 面试 编程语言

我和阿里P7差的不是薪资?而是Redis+微服务+Nginx+MySQL+Tomcat

Java架构之路

Java 程序员 架构 面试 编程语言

一文为你详解Unique SQL原理和应用

华为云开发者社区

数据库 sql unique

朋友突然从某度外包人员摇身一变成为大厂架构师,在我的死缠烂打下他说出了自己的秘密武器。

Java成神之路

Java 程序员 架构 面试 编程语言

80%Java开发者面试都问的SpringBoot你竟不会?看完这些笔记足以

Java架构之路

Java 程序员 架构 面试 编程语言

倾斜摄影实景三维在智慧工厂 Web 3D GIS 数字孪生应用

一只数据鲸鱼

GIS 数字化 数据可视化 3D渲染 数字工厂

dubbogo 3.0:牵手 gRPC 走向云原生时代

阿里巴巴云原生

go gRPC 云原生 中间件 dubbo-go

IDEA 文档插件 DocView 版本更新:修改 UI 并支持 IDEA 2020.3 !

程序员小航

idea插件 IntelliJ IDEA 文档生成

从 JMM 透析 volatile 与 synchronized 原理

码哥字节

volatile JVM JMM Java 25 周年 synchronized

五年开发经验裸辞之后投递简历,收到阿里面试邀请四面成功斩获offer,特分享本次阿里面经希望对大家有所帮助。

Java成神之路

Java 程序员 架构 面试 编程语言

一口气说出四种幂等性解决方案,面试官露出了姨母笑~

不才陈某

Java 分布式 接口

解析字节算法面试真题,深入探究ArrayList应用原理

小Q

Java 学习 编程 架构 面试

面试被问高并发一脸懵?那是你没看过我整理得高并发回答模板

小Q

Java 学习 面试 高并发 性能调优

耗时一个月整理的97道大厂Java核心面试题出炉,精心整理,无偿分享

Java架构之路

Java 程序员 架构 面试 编程语言

Java进阶文档:彻底搞懂JVM+Linux+MySQL+Netty+Tomcat+并发编程

Java架构之路

Java 程序员 架构 面试 编程语言

太赞了!滴滴开源了一套分布式ID的生成系统...

Java架构师迁哥

熟练掌握Spring Cloud已然成为Java工程师的面试门槛,简历上没写熟悉掌握微服务连面试机会都难得!

Java成神之路

Java 程序员 架构 面试 编程语言

什么是全场景AI计算框架MindSpore?

华为云开发者社区

人工智能 AI mindspore

为了SpringBoot提交Tomcat执行,我总结了这么多

小Q

tomcat 学习 面试 微服务 springboot

云图说 | 云上资源管控有神器!关于IAM,你想知道的都在这里!

华为云开发者社区

服务 权限管理 iam

将原则纳入到架构的生命中

soolaugust

架构 思考 设计

好久不见!这份Spring全家桶、Docker、Redis架构大礼包免费赠送

Java架构之路

Java 程序员 架构 面试 编程语言

VACUUM无法从表中删除死元组的三个原因

PostgreSQLChina

数据库 postgresql

产品策略闭环是个什么环?

万事ONES

项目管理 团队协作 需求管理 需求分析 产品策略

一文带你彻底了解大数据处理引擎Flink内存管理

华为云开发者社区

大数据 数据 处理

《码出高效:Java 开发手册》“码” 出高效的同时编写出高质量的代“码”。PDF文档资料免费开放下载!

Java成神之路

Java 程序员 架构 面试 编程语言

即构实时音视频多中心调度设计

ZEGO即构

腾讯高工强烈推荐的“Netty速成手册”原理+应用+调优,带你将知识点一网打尽

比伯

Java 编程 程序员 架构 Netty

硬肝到秃头!Alibaba强推并发编程笔记我跪了,真的学到好多东西!

Java架构追梦

Java 学习 架构 面试 并发编程

从源码的角度搞懂 Java 动态代理!

Java架构师迁哥

演讲经验交流会|ArchSummit 上海站

演讲经验交流会|ArchSummit 上海站

Kafka-Clients源码学习:KafkaProducer篇-InfoQ