Kafka-Clients 源码学习:KafkaProducer 篇

阅读数:939 2019 年 10 月 22 日 13:55

Kafka-Clients源码学习:KafkaProducer篇

前言

本文基于 Kafka-clients:1.1.0 版本。

KafkaProducer 的发送流程

调用流程图

Kafka-Clients源码学习:KafkaProducer篇

此图描述的是用户启动一个线程使用 KafkaProducer 进行消息发送的大概流程。除了用户主线程,KafkaProducer 在新建实例时还会启动一个设置为 daemon 的 Sender 线程。

user 线程

图中红线画出的用户主线程调用栈。当调用 send 方法发送 record 后会经历如下流程:

  1. 确认元数据可用
  2. 序列化
  3. 将数据 append 进缓存

下面由代码分析一下,为了清晰起见省略了部分内容。

确认元数据可用(waitOnMetadata)

复制代码
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
// 将 topic 放入 metadata 中,如果已存在就刷新过期时间
metadata.add(topic);
// 从内存中 fetch cluster 元数据
Cluster cluster = metadata.fetch();
Integer partitionsCount = cluster.partitionCountForTopic(topic); // 找到了直接 return
if (partitionsCount != null && (partition == null || partition < partitionsCount))
return new ClusterAndWaitTime(cluster, 0);
long begin = time.milliseconds();
long remainingWaitMs = maxWaitMs;
long elapsed;
// 标识需要进行元数据更新,并且等待至更新完成或抛出超时异常
do {
metadata.add(topic);
int version = metadata.requestUpdate();
sender.wakeup();
try {
metadata.awaitUpdate(version, remainingWaitMs);
} catch (TimeoutException ex) {
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
}
// 省略部分超时、权限异常检测代码
......
partitionsCount = cluster.partitionCountForTopic(topic);
} while (partitionsCount == null);
if (partition != null && partition >= partitionsCount) {
throw new KafkaException(
String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount));
}
return new ClusterAndWaitTime(cluster, elapsed);
}

序列化

复制代码
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
// 省略部分
//......
byte[] serializedKey;
try {
// 序列化 key
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in key.serializer", cce);
}
byte[] serializedValue;
try {
// 序列化 value
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer", cce);
}
Header[] headers = record.headers().toArray();
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(), compressionType, serializedKey, serializedValue, headers);
// 估算序列化后大小,确保不会超过 max_request_size 或者 total_memory_size
ensureValidRecordSize(serializedSize);
}
// 省略部分
//......
}

将数据 append 进缓存 (RecordAccumulator.append)

消息会被 append 进 accumulator 缓存结构中的双端队列,下面用一张图解释一下这个缓存结构。消息会从对应双端队列的尾部 batch 被写入。在 Sender 线程中每次会从头部 batch 拉取。

Kafka-Clients源码学习:KafkaProducer篇

复制代码
public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
Callback callback,
long maxTimeToBlock) throws InterruptedException {
ByteBuffer buffer = null;
if (headers == null) headers = Record.EMPTY_HEADERS;
try {
// 根据 TopicPartition 获得对应双端队列 dq,没有则创建
Deque<ProducerBatch> dq = getOrCreateDeque(tp);
synchronized (dq) {
// 尝试向已有 dq 中的 last batch 已写入此条 record
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null)
return appendResult;
}
// dq 中的 last batch 无法再添加 record,于是申请一个新的 batch,并且添加到 dq 的 last
// 从 BufferPool 中拿出一个 buffer
buffer = free.allocate(size, maxTimeToBlock);
synchronized (dq) {
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null) {
// 两次获取 dq 锁的过程中可能有其它线程创建了可用的 batch,如果 tryAppend 成功则直接返回
return appendResult;
}
MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
// 将 record append 进刚刚新建的 batch
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
// 将申请的 batch 放入 dp last
dq.addLast(batch);
incomplete.add(batch);
// 避免在 finally 模块中释放这次申请的 buffer
buffer = null;
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
}
} finally {
if (buffer != null)
// 如果使用了其它线程申请的 buffer 则释放此线程申请的 buffer
free.deallocate(buffer);
}
}

sender 线程

KafkaProducer 的构造函数中会启动一个新线程,用来跑 Sender 任务,所以这里我们将它称为 sender 线程,实际的线程名为 "kafka-producer-network-thread | $clientId"。

Sender 类实现了 Runnable 接口,实现的 run() 方法逻辑也很简单,一直循环下面几个步骤直到 close 被调用:

  1. 幂等或者事务 producer 逻辑(如果配置中有开启)。
  2. 从 RecordAccumulator 中获取 batch 构建 requests 发送到缓存。
  3. 进行一次 client.poll,进行真正的消息发送、接收处理。

幂等的相关实现后面会进行讲解,这里主要讲解步骤 2,3。

构建 Requests 发送到缓存 (sendProducerData&sendProducerRequest)

复制代码
private long sendProducerData(long now) {
// 获取本地元数据
Cluster cluster = metadata.fetch();
// 获取存在待发送分区对应的 node
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
// 如果本地元数据中没找到对应 leader 信息,强制元数据更新
if (!result.unknownLeaderTopics.isEmpty()) {
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic);
this.metadata.requestUpdate();
}
// 删除没有准备好发送的 node
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
// 检查是否能向此 node 发送消息(这里会保证等待同一 node 的上次 request send 发送完成)
if (!this.client.ready(node, now)) {
iter.remove();
// 获得 disconnected 状态的 node 剩余 backoff 时间
notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
}
}
// 从 Accumulator 中获取准备发送的 batchs
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
this.maxRequestSize, now);
if (guaranteeMessageOrder) {
// 如果 max.inflight.requests = 1,则将待发送的 partition 锁住
for (List<ProducerBatch> batchList : batches.values()) {
for (ProducerBatch batch : batchList)
this.accumulator.mutePartition(batch.topicPartition);
}
}
// 从 accumulator 中拿出 timeout 的 batch(max.inflight.requests > 1 的情况下过期不保回调顺序)
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeout, now);
for (ProducerBatch expiredBatch : expiredBatches) {
// 回调 timeout 异常
failBatch(expiredBatch, -1, NO_TIMESTAMP, expiredBatch.timeoutException(), false);
//......
}
// 如果存在可发送数据并且准备好发送的 node,会以 0 的 timeout 来返回 pollTimeout,否则此 timeout 会由当前无法发送消息的 nodes 决定(比如 lingering, backing off)。总的来说:
// 1. 如果存在 partition 的数据准备好发送,pollTimeout=0
// 2. 另外如果存在 partition 有数据写入 accumulator 但是没有准备好发送,pollTimeout=min{当前时间与 linger 过期时间差值 (重发的 batch 考虑 retryBackoff 差值), 当前时间与 disconnected node 的 reconnectBackoffMs 的时间差值}
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
if (!result.readyNodes.isEmpty()) {
pollTimeout = 0;
}
// 将 batchs 构建成 request 发送到缓存
sendProduceRequests(batches, now);
return pollTimeout;
}
复制代码
private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
// 遍历 batches,以 node 为单位发送 request 请求
for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())
sendProduceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue());
}
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
if (batches.isEmpty())
return;
Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
//......
for (ProducerBatch batch : batches) {
TopicPartition tp = batch.topicPartition;
MemoryRecords records = batch.records();
//......
produceRecordsByPartition.put(tp, records);
recordsByPartition.put(tp, batch);
}
// 如果 producer 开启了事务属性则获取 transactionId
String transactionalId = null;
if (transactionManager != null && transactionManager.isTransactional()) {
transactionalId = transactionManager.transactionalId();
}
// 用 produceRecordsByPartition 构建 requestBuilder
ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
produceRecordsByPartition, transactionalId);
// 用 recordsByPartition 构建回调类,client.poll 收到 response 后会调用 callback.onComplete 方法。
RequestCompletionHandler callback = new RequestCompletionHandler() {
public void onComplete(ClientResponse response) {
handleProduceResponse(response, recordsByPartition, time.milliseconds());
}
};
String nodeId = Integer.toString(destination);
ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, callback);
// 调用 client.send 发送消息
client.send(clientRequest, now);
log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}

client.poll 处理

这个内容在上一篇写过,这里就不详细列 selector 的方法实现了,只需要关注 poll 方法,handleCompletedSends 生成已经发送成功并且不需要 ack 的请求对应 response,handCompletedReceives 生成等待收到回复的请求对应 response。最后统一由completeResponses进行处理。

复制代码
public List<ClientResponse> poll(long timeout, long now) {
if (!abortedSends.isEmpty()) { // 如果存在 abortedSends,直接处理,不经过 selector 的 poll
List<ClientResponse> responses = new ArrayList<>();
handleAbortedSends(responses);
completeResponses(responses);
return responses;
}
// 判断是否要更新元数据,如果需要的话发送更新请求(会选择 inflight 请求数量最少的 ready node 发送请求)
long metadataTimeout = metadataUpdater.maybeUpdate(now);
try {
// 完成一次 selector poll,最大阻塞时间为 timeout,metadataTimeout,requestTimeoutMs 的最小值
this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
} catch (IOException e) {
log.error("Unexpected error during I/O", e);
}
long updatedNow = this.time.milliseconds();
// 初始化 response list
List<ClientResponse> responses = new ArrayList<>();
// 如果是不等待回复的请求则直接生成 response 放入 list
handleCompletedSends(responses, updatedNow);
// 根据 inFlightRequests 的发送顺序生成已完成回复的 response 放入 list
handleCompletedReceives(responses, updatedNow);
// 处理断开连接的请求,生成 response 放入 list
handleDisconnections(responses, updatedNow);
// 更新 connectionStates 中 node 的连接状态
handleConnections();
// 需要的话发送 api version 获取请求
handleInitiateApiVersionRequests(updatedNow);
// 处理超时请求,生成 response 放入 list
handleTimedOutRequests(responses, updatedNow);
// 完成所有 response 的处理
completeResponses(responses);
return responses;
}
private void completeResponses(List<ClientResponse> responses) {
for (ClientResponse response : responses) {
try {
// 调用 response 中的 callback 进行处理,producer 端可以参考 Sender 类的 handleProduceResponse 方法
response.onComplete();
} catch (Exception e) {
log.error("Uncaught error in request completion:", e);
}
}
}

最终所有 response 都会在 completeResponses 中处理,而处理所调用的方法就是我们在构建 request 时传入的 callback.onComplete(response)。所以回到 Sender 类中的handleProducerResponse方法就可以看到 producer 收到 broker 回复后的处理逻辑了。

复制代码
private void handleProduceResponse(ClientResponse response, Map<TopicPartition, ProducerBatch> batches, long now) {
//......
if (response.wasDisconnected()) {
// 处理连接断开 batches
for (ProducerBatch batch : batches.values())
completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION), correlationId, now);
} else if (response.versionMismatch() != null) {
// 处理协议版本不匹配
for (ProducerBatch batch : batches.values())
completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.UNSUPPORTED_VERSION), correlationId, now);
} else {
// 如果 response 有内容,则解析后 completeBatch
if (response.hasResponse()) {
ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
TopicPartition tp = entry.getKey();
ProduceResponse.PartitionResponse partResp = entry.getValue();
ProducerBatch batch = batches.get(tp);
completeBatch(batch, partResp, correlationId, now);
}
} else {
// 如果没有 response,即 ack = 0 时会走此处逻辑,直接 completeBatch
for (ProducerBatch batch : batches.values()) {
completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NONE), correlationId, now);
}
}
}
}

completeBatch 负责进行 batch 的 retry reenqueueBatch(如果是可重试),以及回调的触发 batch.done(不可重试或者 success)。

幂等 producer 的实现

设置 producer 的 "enable.idempotence"=true 就开启了 producer 的幂等逻辑。这个配置是为了实现 producer 以精确一次的语义进行消息发送(只能保证单 producer 实例内的精确一次,如果存在 producer 的 failover 切换则无法保证)。开启幂等 producer 还额外要求配置 “max.in.flight.requests.per.connection” <= 5, “retries” > 0, 以及 “ack” = all。

TransactionManager 类负责幂等相关的实现,在 sender 发送具体消息之前会从 broker 端获取一个 producer id(PID),并且发送到 broker 的每批消息都会被赋予一个序列号用于消息去重。kafka 会把序列号与消息一起保存在底层日志中,这样即使分区的 leader 副本挂掉,新选出来的 leader broker 也能执行消息去重工作(所以幂等要求 ack=all,即所有 replicas 都写入才 ack)。

每对 (PID, 分区号) 都会有对应的递增 sequence,Broker 收到消息后,如果发现 sequence <= old sequence 便不会写入日志中,返回 client DUPLICATE_SEQUENCE_NUMBER 异常或者 OutOfOrderSequence 异常,client 会以发送成功的逻辑来处理此异常;如果 sequence = old sequence + 1,则正常写入并返回成功;如果 sequence > old sequence + 1,也会拒绝此次写入,并且返回 client OUT_OF_ORDER_SEQUENCE_NUMBER 异常,幂等 producer 将会按照 sequence 顺序 reenqueue 发送失败的消息,等待发送。

为什么说 max.in.flight.requests.per.connection <= 5 时都可以保证消息顺序

这里引用一段 KAFKA-5494 issue 的描述:

Currently, the idempotent producer (and hence transactional producer) requires max.in.flight.requests.per.connection=1.

This was due to simplifying the implementation on the client and server. With some additional work, we can satisfy the idempotent guarantees even with any number of in flight requests. The changes on the client be summarized as follows:

  1. We increment sequence numbers when batches are drained.
  2. If for some reason, a batch fails with a retriable error, we know that all future batches would fail with an out of order sequence exception.
  3. As such, the client should treat some OutOfOrderSequence errors as retriable. In particular, we should maintain the ‘last acked sequnece’. If the batch succeeding the last ack’d sequence has an OutOfOrderSequence, that is a fatal error. If a future batch fails with OutOfOrderSequence they should be reenqeued.
  4. With the changes above, the the producer queues should become priority queues ordered by the sequence numbers.
  5. The partition is not ready unless the front of the queue has the next expected sequence.

With the changes above, we would get the benefits of multiple inflights in normal cases. When there are failures, we automatically constrain to a single inflight until we get back in sequence.

With multiple inflights, we now have the possibility of getting duplicates for batches other than the last appended batch. In order to return the record metadata (including offset) of the duplicates inside the log, we would require a log scan at the tail to get the metadata at the tail. This can be optimized by caching the metadata for the last ‘n’ batches. For instance, if the default max.inflight is 5, we could cache the record metadata of the last 5 batches, and fall back to a scan if the duplicate is not within those 5.

最后说的很清楚,因为 kafka broker 当前实现会缓存 the last 5 batches 用于判断 duplicates,而在 client 端 TransactionManager 中的 inflightBatchesBySequence 也是初始化了缓存长度为 5 的 PriorityQueue。所以强制规定 max.in.flight.requests.connection 是出于效率上的考虑。

KafkaProducer 的重要配置

配置名 含义
acks 消息发送的确认机制。0:无需确认 1: 仅需 partition leader 确认 -1 or all:需要所有 replicas 确认
enable.idepotence true: 幂等 producer false:非幂等 producer
retries 设置遇到可重试异常时最多重试多少次
max.block.ms 当 buffer 满了或者 metadata 不可用时最多阻塞多久 (kafkaProducer.send() & KafkaProducer.partitionsFor())
buffer.memory 可用用来缓存 records 的 bytes 大小,满了后会阻塞 send,此配置不能表示 KafkaProducer 用的所有内存,还有一些内存用来维护 inflight 请求以及压缩
request.timeout.ms 请求的超时时间。 在 2.1.0 版本前还表示消息的最大留存时间,2.1.0 后此语义被 delivery.timeout.ms 取代
connections.max.idle.ms 连接的最大 idle 时间
bootstrap.servers 启动连接 broker 地址
max.in.flight.requests.per.connection 同一个连接上的未确认请求的最大数量
batch.size 默认的 producer 发送消息的 batch 大小
linger.ms 在对应 partition 消息大小未达到 batch.size 时增加的 latency,如果对于同一个 broker 有多个 partition leader,则已最先需要发送的 partition 为准
max.request.size 单个请求的最大 bytes 大小
compression.type 是否压缩请求数据,提供 none, gzip, snappy, lz4 四种配置

还有很多配置项没有列出,kafka 给了用户非常多配置项,在进行 KafkaProducer 的配置前一定要先确定好自己的需求。比如说是否能容忍可能的消息丢失;是否需要严格保证消息的顺序;是否需要尽可能减少 Broker 端日志的重复数据;是否需要增加适当的消息延时 (linger batch, compression) 来提高吞吐等。

实践中的一些问题

保证同 partition 回调顺序而不是全局

我们在使用 kafka 时上游还有一个 binlog 解析器,为了确认消息整体不丢,解析器为的每一条消息分配了一个递增 id,并且在 ack 时会验证消息是否为期望的递增 id,如果不是就抛出异常。

因为我们只需要保证特定的 binlog 顺序(表级别),消息会被 hash 到不同分区上,因此 kafka 的回调不一定会按照全局的上游顺序被调用。而同步的 kafka send 效率又太低,所以我们实现了一个 ackBuffer 来解决这个问题。

MemoryCanalAckWithBuffer 实现

实现目标:

  1. 在拿到 Message 时将 MessageId 顺序放入 buffer,如果buffer 满了阻塞等待信号。将 buffer 返回的 Sequence 传入 KafkaSend 的回调对象。
  2. 在 kafka 确认消费成功回调中,根据回调对象中的 Sequence 去 mark 标记 buffer 中对应位置的 id。(因为 MessageId 取值为 [1, Long.MAX_VALUE], 所以标记的方式可以利用这个特点,将原 id 标记成负数 -id)
  3. 在一个轮询线程中,从 buffer 中按顺序 get 元素出来,如果buffer 是空的或者下一个元素还没有被标记阻塞等待信号。
对外接口
复制代码
// 获得 buffer 中的下一个元素,在 buffer 为空或者待取出 id 没有被标记过的时候阻塞(waiting for signal)
long get() throws InterruptedException;
// 标记传入 Sequence 的 id
boolean mark(long markSequence) throws InterruptedException;
// 放入 id,返回在 buffer 中的 Sequence(可以计算出数组下标),buffer 满时会被阻塞(waiting for signal)
long put(long id) throws InterruptedException;
void setBufferSize(int bufferSize); // 设置 BufferSize
void start(); // 初始化 buffer
void stop(); // 关闭 buff,重置 putSequence,getSequence,清除 buffer
  • Buffer 中维护的是一个 long 基本类型的数组,数组的长度默认走 lion 配置,程序也可以调用 set 接口设置。
  • Buffer 采用的是懒惰的启动方式,在构造函数中不会分配内存,调用了 start 方法后,会根据 bufferSize 的值,计算一个 2 的次幂作为 buffer 长度,初始化 buffer 数组。
  • 内部维护了一个 putSequence,一个 getSequence 来记录读写位置以及 buffer 已使用空间。
  • 内部维护了一个 ReentrantLock 来同步对数组的读写操作,3 个 Condition 对象来协调 put,get,mark 线程间的合作。

request.timeout.ms 配置有多重含义

在 kafka-client 1.1.0 版本开发 producer 的时候发现这个配置非常坑,正常来说 TimeoutException 是一个可重试异常,但是在实际代码中即使设置了 max retries 的情况下,还是可能抛出 TimeoutException 到 send callback。

原因在于 request.timeout.ms 不仅用于判断发送消息的超时,还会用来判断缓存在 RecordAccumulator 中的 batch 是否过期(类似于消息发送最大留存时间),一个配置项存在多重含义,配置灵活性低,对用户不直观,当前被迫将 request.timeout.ms 设置为 max 值。

用了个野路子,通过和 Broker 连接的 idle 来检测消息发送异常(超时),目前能很快感知到集群机器的异常并且切换 broker 重发。

最近看到 kafka 也修复了这个配置问题(fix version:2.1.0),通过增加 delivery.timeout.ms 来配置消息最长发送时间。

Kafka-Clients源码学习:KafkaProducer篇

幂等 producer 开启的情况下出现整型溢出

TransactionManager 会根据消息发送的分区产生递增 sequence 并保存在一个 TopicPartition -> Integer 的 Map 当中。在我们 producer 长时间不发版的情况下,可能因为某个 partition(这和我们的发送顺序保证逻辑相关,某些表会频繁刷数据) 累计发送数据特别多而使这个 Integer 溢出。

当前最新版本的 kafka 实现依然还是 Integer,感觉和 broker 端需要存储这个值到日志相关,想要省存储空间。现在的做法只能是在出现异常告警后或者定期的重启一轮 Producer,这个问题不会导致数据丢失或者乱序。

相关链接

https://issues.apache.org/jira/browse/KAFKA-5494
https://kafka.apache.org/documentation/
https://issues.apache.org/jira/browse/KAFKA-5886
https://issues.apache.org/jira/browse/KAFKA-4515
https://issues.apache.org/jira/browse/KAFKA-5621

作者:

王植田,笔名 zwangbo,就职于拼多多,公众号是 zwangbo 的技术杂文,热爱技术和分享,欢迎交流。

评论

发布