在本系列的第一部分中,我们学习了Apache Kafka和Quarkus的集成,并开发了一个简单的应用,从两个 Kafka 主题生产和消费事件。
在那个样例中,我们模拟了一个影视流公司,在一个 Kafka 主题中存储电影信息,在另外一个主题中存储了用户停止观看电影时所发生的每个事件,并捕获了电影已播放的时间。
下图展示了该应用的架构:
/filters:no_upscale()/articles/quarkus-with-kafka-streams/en/resources/1Untitled-10-1656674518432.jpeg)
我们可以看到,消费消息很简单,只要有消息生成,我们就可以得到它们,但是除此之外,我们也做不了其他的事情了。如果我们需要实时处理数据(比如过滤或操作事件)或者我们需要在事件之间做一些关联,单纯使用 Kafka 的消费 API 可能就不是最佳的方式了,因为这会导致代码非常复杂。
Kafka Streams
Kafka Streams项目能够帮助我们在事件产生时实时消费它们,应用各种转换,执行流连接等,并且可以选择性地将新的数据表述写回主题中。
对于有状态和无状态的流应用来说,Kafka Streams 都是理想方案,它能够实现基于时间的操作(例如,围绕给定的时间段对事件进行分组),并且考虑到了 Kafka 生态系统中普遍存在的可扩展性、可靠性和可维护性。
Kafka Stream 由三个元素组成,即输入(源处理器)、输出(sink 处理器)和处理器(流处理器)。
源处理器(Source processor):源处理器代表一个 Kafka 主题。源处理器会发送事件到一个或多个流处理器中。
流处理器(Stream processor):流处理器会将转换/逻辑应用于输入流中,比如连接、分组、计数、映射等。流处理器可以连接至另一个流处理器和/或 sink 处理器。
Sink 处理器(Sink processor):Sink 处理器代表了输出的数据,它会连接至一个 Kafka 主题。
**拓扑结构(topology)**是由源、处理器和 sink 组成的无循环图,事件会传入到一个 Kafka Streams 中,该实例将开始拓扑结构的执行。
Kafka Streams 和 Quarkus
Quarkus 使用 Quarkus KStreams 扩展实现与Kafka Streams集成。
Quarkus 起步
使用 Quarkus 最快捷的方式是通过初始化页面添加所需的依赖。每个服务可能需要不同的依赖,你可以选择 Java 11 或 Java 17。为了实现 Quarkus 与 Kafka Streams 的集成,我们至少需要添加 Kafka Streams 扩展。
/filters:no_upscale()/articles/quarkus-with-kafka-streams/en/resources/1Captura%20de%20Pantalla%202022-04-10%20a%20las%2022.34.28-1656674518432.jpeg)
要开发的应用
正如在本文开始时所提到的,在本系列的第一部分中,我们开发了一个影视流公司,它有两个 Kafka 主题,其中一个用来存储电影的列表,另外一个主题会在用户停止播放电影时存储用户所在的区域(事件的键),并且会以电影 id 和播放时间作为事件的值。
/filters:no_upscale()/articles/quarkus-with-kafka-streams/en/resources/1topic-1656674518432.jpeg)
所有的这些逻辑都是在名为Movie Plays Producer的生成者服务中创建的,该服务是使用 Quarkus 开发的。
除此之外,我还使用 Quakus 开发了一个Movie Plays Consumer服务,它会消费这两个主题的事件并且会在控制台上展示它们(并实现了 HTTP 服务器端事件)。
但是,这里没有对数据进行任何处理,它只是按照原样进行了接收。如果我们想要在 movies 和 playtimemovies 主题之间进行一下连接,在获取电影播放时长的时候得到电影的详细信息而不是 id 的话,那又该怎么办呢?
/filters:no_upscale()/articles/quarkus-with-kafka-streams/en/resources/1join-1656674518432.jpeg)
如果仅仅使用 Kafka 消息来实现这样的逻辑会变成一项很复杂的任务,因为我们需要在一个 Map 中存储 Movie 信息,并且在每个 playedmovie 事件发生时,进行匹配处理。
Movie Plays KStream
与其为每个用例手工编写代码,不如看一下如何使用 Kafka Streams,以及它是如何与 Quarkus 集成来解决这个问题的。
创建项目
导航至 Quarkus 的初始化页面,并选择 Apache Kafka Streams 扩展来实现与 Kafka Streams 的集成。然后,选择 RestEasy 和 RestEasy Jackson 扩展实现事件从 Java 对象和 JSON 之间的编排/解排。同时,取消选中 Started Code 生成选项。
请参照下面的截图:
/filters:no_upscale()/articles/quarkus-with-kafka-streams/en/resources/1Captura%20de%20Pantalla%202022-04-10%20a%20las%2017.25.38-1656674518432.jpeg)
你也可以跳过这个手动的步骤并导航至Kafka Stream Quarkus Generator链接,在这里,所有的依赖都已经选择好了。然后,点击 Generate your application 按钮,以下载应用骨架的压缩文件。
解压文件,并在你最喜欢的 IDE 中打开项目。
开发
当开发 Kafka Stream 应用时,我们需要做的第一件事就是创建Topology实例,并定义源、处理器和 sink。
在 Quarkus 中,我们只需要创建一个_CDI_类,这个类需要包含一个返回Topology实例的方法。
创建名为TopologyProducer的类,它将会实现从这两个主题消费事件并连接它们的逻辑。最后,生成的结果将会发送至一个 sink 处理器,该处理器以控制台输出的形式展示结果。
还有一个元素我们没有提到,在这些场景中它非常有用,那就是 Kafka Tables。
一个主题可以包含具有相同键的多个事件。例如,我们可以使用某个键插入一个电影,然后我们可以使用相同的键创建一个新的事件来对电影进行更新:
/filters:no_upscale()/articles/quarkus-with-kafka-streams/en/resources/1topic-1656674518432.jpg)
但是,如果我们想要让 movies 主题与_playtimemovies_主题进行连接的话,那我们该选择使用哪个值为 1 的事件呢?第一个还是第二个?在这个具体的情况中,应该选择最新的那一个,因为它包含了电影的最新版本。为了获取每个事件的最新版本,Kafka Streams 有一个_表_的概念(KTable/GlobalKTable)。
Kafka Streams 会浏览指定的主题,获取每个事件的最新版本,并将其放到一个表实例中。
KafkaStream 扩展并不会像 Kafka Messaging 集成那样自动注册SerDes,所以我们需要在拓扑中手动注册它们。
package org.acme;import javax.enterprise.context.ApplicationScoped;import javax.enterprise.inject.Produces;import org.apache.kafka.common.serialization.Serdes;import org.apache.kafka.streams.KeyValue;import org.apache.kafka.streams.StreamsBuilder;import org.apache.kafka.streams.Topology;import org.apache.kafka.streams.kstream.Consumed;import org.apache.kafka.streams.kstream.GlobalKTable;import org.apache.kafka.streams.kstream.KStream;import org.apache.kafka.streams.kstream.Printed;import io.quarkus.kafka.client.serialization.ObjectMapperSerde;@ApplicationScopedpublic class TopologyProducer { private static final String MOVIES_TOPIC = "movies"; private static final String PLAY_MOVIES_TOPIC = "playtimemovies"; @Produces public Topology getTopCharts() { final StreamsBuilder builder = new StreamsBuilder();// 用于Movie和PlayedMovie的SerDes final ObjectMapperSerde<Movie> movieSerder = new ObjectMapperSerde<>(Movie.class); final ObjectMapperSerde<MoviePlayed> moviePlayedSerder = new ObjectMapperSerde<>(MoviePlayed.class); // 为Movies主题创建一个Global Kafka Table final GlobalKTable<Integer, Movie> moviesTable = builder.globalTable( MOVIES_TOPIC, Consumed.with(Serdes.Integer(), movieSerder)); // 连接至playtimemovies主题的流,每当该主题有事件生成都会被该流所消费 final KStream<String, MoviePlayed> playEvents = builder.stream( PLAY_MOVIES_TOPIC, Consumed.with(Serdes.String(), moviePlayedSerder));// PlayedMovies使用区域作为键,对象作为值。我们对内容进行map操作,让电影的id作为key(以便于进行连接)并让对象继续作为值// 另外,我们使用movies表的键(movieId)以及流的键(在前面的map方法中,我们也将其变成了movieId)进行连接// 最后,结果会流向控制台 playEvents .map((key, value) -> KeyValue.pair(value.id, value)) // Now key is the id field .join(moviesTable, (movieId, moviePlayedId) -> movieId, (moviePlayed, movie) -> movie) .print(Printed.toSysOut()); return builder.build(); }}Movie和MoviePlayed POJO 包含了实现逻辑所需的属性:
Movie对象如下所示:
package org.acme;public class Movie { public int id; public String name; public String director; public String genre; public Movie(int id, String name, String director, String genre) { this.id = id; this.name = name; this.director = director; this.genre = genre; }}MoviePlayed对象如下所示:
package org.acme;public class MoviePlayed { public int id; public long duration; public MoviePlayed(int id, long duration) { this.id = id; this.duration = duration; }}运行 Kafka Stream 应用之前的最后一步是配置参数,其中最重要的是quarkus.kafka-streams.topics。它是一个主题列表,在拓扑结构开始处理数据之前,它们就要存在于 Kafka 集群中,这是一个前提条件。
打开src/main/resources/application.properties文件并添加如下的代码行:
kafka-streams.cache.max.bytes.buffering=10240kafka-streams.commit.interval.ms=1000kafka-streams.metadata.max.age.ms=500kafka-streams.auto.offset.reset=earliestkafka-streams.metrics.recording.level=DEBUGquarkus.kafka-streams.topics=playtimemovies,movies现在,我们可以测试一下流了。我们启动在上一篇文章中开发的生产者。生产者的源码可以在这里找到。
Quarkus KStreams 集成了 Quarkus DevServices。所以,我们不需要启动 Kafka 集群,也不需要配置它的位置,因为 Quarkus Dev 模式会处理好所有的事情。我们只需要记住在自己的计算机上要有一个运行中的容器环境即可,比如 Podman 或其他兼容 OCI 的工具。
在终端窗口中启动生产者服务:
cd movie-plays-producer./mvnw compile quarkus:dev2022-04-11 07:49:31,900 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Cruella played for 287 minutes2022-04-11 07:49:31,941 INFO [io.quarkus] (Quarkus Main Thread) movie-plays-producer 1.0.0-SNAPSHOT on JVM (powered by Quarkus 2.7.3.Final) started in 4.256s.2022-04-11 07:49:31,942 INFO [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.2022-04-11 07:49:31,943 INFO [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, kafka-client, smallrye-context-propagation, smallrye-reactive-messaging, smallrye-reactive-messaging-kafka, vertx]2022-04-11 07:49:32,399 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Encanto played for 162 minutes2022-04-11 07:49:32,899 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie The Hobbit played for 255 minutes2022-04-11 07:49:33,404 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Sing 2 played for 264 minutes2022-04-11 07:49:33,902 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Encanto played for 28 minutes2022-04-11 07:49:34,402 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Star Trek: First Contact played for 137 minutes2022-04-11 07:49:34,903 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Star Trek: First Contact played for 277 minutes2022-04-11 07:49:35,402 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie The Hobbit played for 141 minutes在另一个终端窗口中,启动我们刚刚开发的 Kafka Stream 代码:
./mvnw compile quarkus:dev2022-04-11 07:54:59,321 INFO [org.apa.kaf.str.pro.int.StreamTask] (movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1) stream-thread [movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1] task [1_0] Restored and ready to run2022-04-11 07:54:59,322 INFO [org.apa.kaf.str.pro.int.StreamThread] (movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1) stream-thread [movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1] Restoration took 74 ms for all tasks [1_0]2022-04-11 07:54:59,322 INFO [org.apa.kaf.str.pro.int.StreamThread] (movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1) stream-thread [movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING2022-04-11 07:54:59,324 INFO [org.apa.kaf.str.KafkaStreams] (movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1) stream-client [movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea] State transition from REBALANCING to RUNNING[KSTREAM-LEFTJOIN-0000000005]: 4, Movie [director=Craig Gillespie, genre=Crime Comedy, id=4, name=Cruella][KSTREAM-LEFTJOIN-0000000005]: 2, Movie [director=Jonathan Frakes, genre=Space, id=2, name=Star Trek: First Contact][KSTREAM-LEFTJOIN-0000000005]: 4, Movie [director=Craig Gillespie, genre=Crime Comedy, id=4, name=Cruella][KSTREAM-LEFTJOIN-0000000005]: 1, Movie [director=Peter Jackson, genre=Fantasy, id=1, name=The Hobbit][KSTREAM-LEFTJOIN-0000000005]: 4, Movie [director=Craig Gillespie, genre=Crime Comedy, id=4, name=Cruella][KSTREAM-LEFTJOIN-0000000005]: 4, Movie [director=Craig Gillespie, genre=Crime Comedy, id=4, name=Cruella][KSTREAM-LEFTJOIN-0000000005]: 3, Movie [director=Jared Bush, genre=Animation, id=3, name=Encanto][KSTREAM-LEFTJOIN-0000000005]: 5, Movie [director=Garth Jennings, genre=Jukebox Musical Comedy, id=5, name=Sing 2]在输出中打印的是事件(连接所生成的结果),其中键是movieId,值是movie本身。我们现在所具有的功能是,每当一个电影停止播放时,Kafka Stream 会对其进行处理并以 Movie 全量信息的形式对其进行展现。
到目前为止,还不算复杂,对于这样的场景,你可能会想我们根本没有必要使用 Kafka Streams。但是,我们再加一些需求,这样你就能看到它的强大之处了。
现在,我们不是在用户每次停掉电影的时候都生成事件,而是只对用户观看时间超过 10 分钟的电影发送事件。
我们可以使用filter方法按照持续时长进行过滤。
playEvents .filter((region, event) -> event.duration >= 10) // filters by duration .map((key, value) -> KeyValue.pair(value.id, value)) .join(moviesTable, (movieId, moviePlayedId) -> movieId, (moviePlayed, movie) -> movie) .print(Printed.toSysOut());重启应用,我们可以发现观看时长小于 10 分钟的电影将不会被处理。
我们发现,Kafka Streams 有助于代码的整洁性,接下来,我们添加最后的需求。现在,我们对每部电影的播放时长并不感兴趣,而是对每部电影有多少次超过 10 分钟的播放感兴趣。
到目前为止,对事件的处理都是无状态的,因为事件都是遵循这样的步骤,即接收、处理并发送至 sink 处理器(也就是发送至一个主题或控制台输出),但是,为了统计某部电影播放的次数,我们需要在内存记住电影被播放了多少次,并且当任意用户再次观看超过 10 分钟的时候,要对这个统计数字递增一次。此时,事件的处理就要以有状态的方式进行了。
我们需要做的第一件事就是创建一个 Java 类,以存储电影的名称及其播放的次数。
public class MoviePlayCount { public String name; public int count; public MoviePlayCount aggregate(String name) { this.name = name; this.count++; return this; } @Override public String toString() { return "MoviePlayCount [count=" + count + ", name=" + name + "]"; }}这是一个计数器类,它依然需要两样东西:
我们需要有一个地方存储这个类的实例,确保每次触发事件的时候,它不会被重置。
每当_playtimemovies_主题中有事件触发时,调用
aggregate方法的逻辑。
关于第一个问题,我们需要使用KeyValueBytesStoreSupplier接口。
public static final String COUNT_MOVIE_STORE = "countMovieStore";KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(COUNT_MOVIE_STORE);对于第二个问题,Kafka Streams 有一个用来聚合结果的aggregate方法。
在我们的用例中,也就是每部电影播放时长超过 10 分钟的次数。
// MoviePlayCount可以被序列化和反序列化ObjectMapperSerde<MoviePlayCount> moviePlayCountSerder = new ObjectMapperSerde<>(MoviePlayCount.class);// 这是之前的连接操作,其中键是电影id,值是电影.join(moviesTable, (movieId, moviePlayedId) -> movieId, (moviePlayed, movie) -> movie)// 根据键对事件进行分组,在本例中,也就是电影的id.groupByKey(Grouped.with(Serdes.Integer(), movieSerder)) // 聚合方法,如果MoviePlayCount已经创建的话,获取该对象(如果尚未创建的话,会创建该实例)并调用其aggregate方法,以对观看次数进行递增 .aggregate(MoviePlayCount::new, (movieId, movie, moviePlayCounter) -> moviePlayCounter.aggregate(movie.name), Materialized.<Integer, MoviePlayCount> as(storeSupplier) .withKeySerde(Serdes.Integer()) .withValueSerde(moviePlayCountSerder) )重启应用,在控制台上将会展示电影播放的次数。
提示:要重启应用,只需在终端输入“s”,应用将会自动重启。
应用重启之后,控制台将会展示每部电影的状态。
[KTABLE-TOSTREAM-0000000011]: 4, MoviePlayCount [count=13, name=Cruella][KTABLE-TOSTREAM-0000000011]: 3, MoviePlayCount [count=11, name=Encanto][KTABLE-TOSTREAM-0000000011]: 5, MoviePlayCount [count=14, name=Sing 2][KTABLE-TOSTREAM-0000000011]: 2, MoviePlayCount [count=15, name=Star Trek: First Contact][KTABLE-TOSTREAM-0000000011]: 1, MoviePlayCount [count=16, name=The Hobbit][KTABLE-TOSTREAM-0000000011]: 2, MoviePlayCount [count=16, name=Star Trek: First Contact][KTABLE-TOSTREAM-0000000011]: 3, MoviePlayCount [count=12, name=Encanto][KTABLE-TOSTREAM-0000000011]: 2, MoviePlayCount [count=17, name=Star Trek: First Contact][KTABLE-TOSTREAM-0000000011]: 5, MoviePlayCount [count=15, name=Sing 2][KTABLE-TOSTREAM-0000000011]: 4, MoviePlayCount [count=14, name=Cruella][KTABLE-TOSTREAM-0000000011]: 1, MoviePlayCount [count=17, name=The Hobbit][KTABLE-TOSTREAM-0000000011]: 4, MoviePlayCount [count=15, name=Cruella][KTABLE-TOSTREAM-0000000011]: 4, MoviePlayCount [count=16, name=Cruella]交互式查询
因为我们将_sink_处理器设置成了System.out 流,所以聚合结果会流向控制台。
.toStream().print(Printed.toSysOut());但是,我们也可以将结果流发往一个 Kafka 主题:
.to("counter_movies", Produced.with(Serdes.Integer(), moviePlayCountSerder));但是,如果我们感兴趣的不是每次对新事件的反应,而只是想查询特定的电影此时播放的次数,那又该怎么办呢?
Kafka Streams 的交互式查询(interactive query)允许我们直接查询底层存储,以获取给定键相关的值。
首先,我们创建一个名为MoviePlayCountData的类来存储查询结果。按照这种方式,我们可以解耦 Kafka Streams 使用的类与应用中其他部分所使用的类。
public class MoviePlayCountData { private String name; private int count; public MoviePlayCountData(String name, int count) { this.name = name; this.count = count; } public int getCount() { return count; } public String getName() { return name; }}现在,创建名为InteractiveQueries的类来实现对状态存储(KeyValueBytesStoreSupplier)的访问并根据电影的id查询它被播放的次数。
import javax.enterprise.context.ApplicationScoped;import javax.inject.Inject;import org.apache.kafka.streams.KafkaStreams;import org.apache.kafka.streams.errors.InvalidStateStoreException;import org.apache.kafka.streams.state.QueryableStoreTypes;import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;import static org.apache.kafka.streams.StoreQueryParameters.fromNameAndType;import java.util.Optional;@ApplicationScopedpublic class InteractiveQueries { @Inject KafkaStreams streams; public <MoviePlayCountData> getMoviePlayCountData(int id) { // 获取状态存储并根据电影id获取播放次数 MoviePlayCount moviePlayCount = getMoviesPlayCount().get(id); // 如果有结果的话 if (moviePlayCount != null) { // 将结果包装到MoviePlayCountData中 return Optional.of(new MoviePlayCountData(moviePlayCount.name, moviePlayCount.count)); } else { return Optional.empty(); } } // 获取状态存储 private ReadOnlyKeyValueStore<Integer, MoviePlayCount> getMoviesPlayCount() { while (true) { try { return streams.store(fromNameAndType(TopologyProducer.COUNT_MOVIE_STORE, QueryableStoreTypes.keyValueStore())); } catch (InvalidStateStoreException e) { // 忽略之,此时存储尚未就绪 } } }}现在,我们可以添加一个简单的 REST 端点来运行该查询。
import java.util.Optional;import javax.inject.Inject;import javax.ws.rs.GET;import javax.ws.rs.Path;import javax.ws.rs.PathParam;import javax.ws.rs.core.Response;import javax.ws.rs.core.Response.Status;@Path("/movie")public class MovieCountResource { // 注入前文的类进行查询 @Inject InteractiveQueries interactiveQueries; @GET @Path("/data/{id}") public Response movieCountData(@PathParam("id") int id) { Optional<MoviePlayCountData> moviePlayCountData = interactiveQueries.getMoviePlayCountData(id); // 根据结果判定返回值还是404 if (moviePlayCountData.isPresent()) { return Response.ok(moviePlayCountData.get()).build(); } else { return Response.status(Status.NOT_FOUND.getStatusCode(), "No data found for movie " + id).build(); } }}Kafka Stream 实现的模式如下图所示:
/filters:no_upscale()/articles/quarkus-with-kafka-streams/en/resources/1topology-1656674518432.jpeg)
扩展
Kafka Streams 应用可以进行扩展,所以流会分布到多个实例中。在这种情况下,每个实例都包含聚合结果的一个子集,所以要想获得总的聚合结果,我们需要通过将 REST API 重定向到另外的实例来获取其他实例的数据。
Kafka Streams 提供了一个API,可以知道要请求的数据是在本地 Kafka Streams 存储中还是在其他的主机中。
虽然这个过程并不复杂,但已经超出了本文的讨论范围。
结论
到目前为止,我们已经看到,将 Quarkus 应用连接到 Apache Kafka 并生产和消费主题中的消息/事件是很容易的。此外,还看到 Kafka Streams 让我们不仅可以消费 Kafka 中消息,还能够实时处理它们,进行转换、过滤等操作,例如以同步的方式消费结果数据。 这是一项强大的技术,当需要处理的数据不断变化时,它可以轻松扩展,提供实时的体验。
但是,我们还没有解决该架构的最后一个问题。通常情况下,数据并不是存储在一个地方。电影信息可能存储在关系型数据库中,而电影的播放信息则存储在一个 Kafka 主题中。那么,该如何保持这两个地方的信息更新,以便 Kafka Streams 能正确地连接数据呢?
这里有一个缺失的部分,名为 Debezium 的项目可以帮助我们解决这个问题。我们将用一整篇文章来介绍 Debezium 和 Quarkus,敬请持续关注。
作者简介:
Alex Soto 是红帽公司的开发者体验总监。他对 Java 领域、软件自动化充满热情,他相信开源软件模式。Soto 是 Manning 的《Testing Java Microservices》和 O’Reilly 的《Quarkus Cookbook》两本书的共同作者,他还是多个开源项目的贡献者。自 2017 年以来,他一直是 Java Champion,是国际演讲者和 Salle URL 大学的教师。你可以在 Twitter 上关注他(Alex Soto ),随时了解 Kubernetes 和 Java 领域的动态。
原文链接:
Kafka Streams and Quarkus: Real-Time Processing Events
相关阅读:





