阿里、蚂蚁、晟腾、中科加禾精彩分享 AI 基础设施洞见,现购票可享受 9 折优惠 |AICon 了解详情
写点什么

Kafka Streams 与 Quarkus:实时处理事件

  • 2022-08-25
    北京
  • 本文字数:10072 字

    阅读完需:约 33 分钟

Kafka Streams与Quarkus:实时处理事件

在本系列的第一部分中,我们学习了Apache KafkaQuarkus的集成,并开发了一个简单的应用,从两个 Kafka 主题生产和消费事件。


在那个样例中,我们模拟了一个影视流公司,在一个 Kafka 主题中存储电影信息,在另外一个主题中存储了用户停止观看电影时所发生的每个事件,并捕获了电影已播放的时间。


下图展示了该应用的架构:



我们可以看到,消费消息很简单,只要有消息生成,我们就可以得到它们,但是除此之外,我们也做不了其他的事情了。如果我们需要实时处理数据(比如过滤或操作事件)或者我们需要在事件之间做一些关联,单纯使用 Kafka 的消费 API 可能就不是最佳的方式了,因为这会导致代码非常复杂。

Kafka Streams

Kafka Streams项目能够帮助我们在事件产生时实时消费它们,应用各种转换,执行流连接等,并且可以选择性地将新的数据表述写回主题中。


对于有状态和无状态的流应用来说,Kafka Streams 都是理想方案,它能够实现基于时间的操作(例如,围绕给定的时间段对事件进行分组),并且考虑到了 Kafka 生态系统中普遍存在的可扩展性、可靠性和可维护性。


Kafka Stream 由三个元素组成,即输入(源处理器)、输出(sink 处理器)和处理器(流处理器)。


源处理器(Source processor):源处理器代表一个 Kafka 主题。源处理器会发送事件到一个或多个流处理器中。


流处理器(Stream processor):流处理器会将转换/逻辑应用于输入流中,比如连接、分组、计数、映射等。流处理器可以连接至另一个流处理器和/或 sink 处理器。


Sink 处理器(Sink processor):Sink 处理器代表了输出的数据,它会连接至一个 Kafka 主题。


**拓扑结构(topology)**是由源、处理器和 sink 组成的无循环图,事件会传入到一个 Kafka Streams 中,该实例将开始拓扑结构的执行。

Kafka Streams 和 Quarkus

Quarkus 使用 Quarkus KStreams 扩展实现与Kafka Streams集成。

Quarkus 起步

使用 Quarkus 最快捷的方式是通过初始化页面添加所需的依赖。每个服务可能需要不同的依赖,你可以选择 Java 11 或 Java 17。为了实现 Quarkus 与 Kafka Streams 的集成,我们至少需要添加 Kafka Streams 扩展。


要开发的应用

正如在本文开始时所提到的,在本系列的第一部分中,我们开发了一个影视流公司,它有两个 Kafka 主题,其中一个用来存储电影的列表,另外一个主题会在用户停止播放电影时存储用户所在的区域(事件的键),并且会以电影 id 和播放时间作为事件的值。



所有的这些逻辑都是在名为Movie Plays Producer的生成者服务中创建的,该服务是使用 Quarkus 开发的。


除此之外,我还使用 Quakus 开发了一个Movie Plays Consumer服务,它会消费这两个主题的事件并且会在控制台上展示它们(并实现了 HTTP 服务器端事件)。


但是,这里没有对数据进行任何处理,它只是按照原样进行了接收。如果我们想要在 movies 和 playtimemovies 主题之间进行一下连接,在获取电影播放时长的时候得到电影的详细信息而不是 id 的话,那又该怎么办呢?



如果仅仅使用 Kafka 消息来实现这样的逻辑会变成一项很复杂的任务,因为我们需要在一个 Map 中存储 Movie 信息,并且在每个 playedmovie 事件发生时,进行匹配处理。

Movie Plays KStream

与其为每个用例手工编写代码,不如看一下如何使用 Kafka Streams,以及它是如何与 Quarkus 集成来解决这个问题的。

创建项目

导航至 Quarkus 的初始化页面,并选择 Apache Kafka Streams 扩展来实现与 Kafka Streams 的集成。然后,选择 RestEasy 和 RestEasy Jackson 扩展实现事件从 Java 对象和 JSON 之间的编排/解排。同时,取消选中 Started Code 生成选项。


请参照下面的截图:



你也可以跳过这个手动的步骤并导航至Kafka Stream Quarkus Generator链接,在这里,所有的依赖都已经选择好了。然后,点击 Generate your application 按钮,以下载应用骨架的压缩文件。


解压文件,并在你最喜欢的 IDE 中打开项目。

开发

当开发 Kafka Stream 应用时,我们需要做的第一件事就是创建Topology实例,并定义源、处理器和 sink。


在 Quarkus 中,我们只需要创建一个_CDI_类,这个类需要包含一个返回Topology实例的方法。


创建名为TopologyProducer的类,它将会实现从这两个主题消费事件并连接它们的逻辑。最后,生成的结果将会发送至一个 sink 处理器,该处理器以控制台输出的形式展示结果。


还有一个元素我们没有提到,在这些场景中它非常有用,那就是 Kafka Tables。


一个主题可以包含具有相同键的多个事件。例如,我们可以使用某个键插入一个电影,然后我们可以使用相同的键创建一个新的事件来对电影进行更新:



但是,如果我们想要让 movies 主题与_playtimemovies_主题进行连接的话,那我们该选择使用哪个值为 1 的事件呢?第一个还是第二个?在这个具体的情况中,应该选择最新的那一个,因为它包含了电影的最新版本。为了获取每个事件的最新版本,Kafka Streams 有一个_表_的概念(KTable/GlobalKTable)。


Kafka Streams 会浏览指定的主题,获取每个事件的最新版本,并将其放到一个表实例中。


KafkaStream 扩展并不会像 Kafka Messaging 集成那样自动注册SerDes,所以我们需要在拓扑中手动注册它们。


package org.acme;
import javax.enterprise.context.ApplicationScoped;import javax.enterprise.inject.Produces;
import org.apache.kafka.common.serialization.Serdes;import org.apache.kafka.streams.KeyValue;import org.apache.kafka.streams.StreamsBuilder;import org.apache.kafka.streams.Topology;import org.apache.kafka.streams.kstream.Consumed;import org.apache.kafka.streams.kstream.GlobalKTable;import org.apache.kafka.streams.kstream.KStream;import org.apache.kafka.streams.kstream.Printed;
import io.quarkus.kafka.client.serialization.ObjectMapperSerde;
@ApplicationScopedpublic class TopologyProducer {
private static final String MOVIES_TOPIC = "movies"; private static final String PLAY_MOVIES_TOPIC = "playtimemovies";
@Produces public Topology getTopCharts() {
final StreamsBuilder builder = new StreamsBuilder();
// 用于Movie和PlayedMovie的SerDes
final ObjectMapperSerde<Movie> movieSerder = new ObjectMapperSerde<>(Movie.class); final ObjectMapperSerde<MoviePlayed> moviePlayedSerder = new ObjectMapperSerde<>(MoviePlayed.class);
// 为Movies主题创建一个Global Kafka Table
final GlobalKTable<Integer, Movie> moviesTable = builder.globalTable( MOVIES_TOPIC, Consumed.with(Serdes.Integer(), movieSerder));
// 连接至playtimemovies主题的流,每当该主题有事件生成都会被该流所消费
final KStream<String, MoviePlayed> playEvents = builder.stream( PLAY_MOVIES_TOPIC, Consumed.with(Serdes.String(), moviePlayedSerder));
// PlayedMovies使用区域作为键,对象作为值。我们对内容进行map操作,让电影的id作为key(以便于进行连接)并让对象继续作为值// 另外,我们使用movies表的键(movieId)以及流的键(在前面的map方法中,我们也将其变成了movieId)进行连接
// 最后,结果会流向控制台
playEvents .map((key, value) -> KeyValue.pair(value.id, value)) // Now key is the id field .join(moviesTable, (movieId, moviePlayedId) -> movieId, (moviePlayed, movie) -> movie) .print(Printed.toSysOut()); return builder.build();
}}
复制代码


MovieMoviePlayed POJO 包含了实现逻辑所需的属性:


Movie对象如下所示:

package org.acme;
public class Movie {
public int id; public String name; public String director; public String genre;
public Movie(int id, String name, String director, String genre) { this.id = id; this.name = name; this.director = director; this.genre = genre; }}
复制代码


MoviePlayed对象如下所示:

package org.acme;
public class MoviePlayed {
public int id; public long duration;
public MoviePlayed(int id, long duration) { this.id = id; this.duration = duration; }
}
复制代码


运行 Kafka Stream 应用之前的最后一步是配置参数,其中最重要的是quarkus.kafka-streams.topics。它是一个主题列表,在拓扑结构开始处理数据之前,它们就要存在于 Kafka 集群中,这是一个前提条件。


打开src/main/resources/application.properties文件并添加如下的代码行:

kafka-streams.cache.max.bytes.buffering=10240kafka-streams.commit.interval.ms=1000kafka-streams.metadata.max.age.ms=500kafka-streams.auto.offset.reset=earliestkafka-streams.metrics.recording.level=DEBUG
quarkus.kafka-streams.topics=playtimemovies,movies
复制代码


现在,我们可以测试一下流了。我们启动在上一篇文章中开发的生产者。生产者的源码可以在这里找到。


Quarkus KStreams 集成了 Quarkus DevServices。所以,我们不需要启动 Kafka 集群,也不需要配置它的位置,因为 Quarkus Dev 模式会处理好所有的事情。我们只需要记住在自己的计算机上要有一个运行中的容器环境即可,比如 Podman 或其他兼容 OCI 的工具。


在终端窗口中启动生产者服务:

cd movie-plays-producer./mvnw compile quarkus:dev
2022-04-11 07:49:31,900 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Cruella played for 287 minutes2022-04-11 07:49:31,941 INFO [io.quarkus] (Quarkus Main Thread) movie-plays-producer 1.0.0-SNAPSHOT on JVM (powered by Quarkus 2.7.3.Final) started in 4.256s.2022-04-11 07:49:31,942 INFO [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.2022-04-11 07:49:31,943 INFO [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, kafka-client, smallrye-context-propagation, smallrye-reactive-messaging, smallrye-reactive-messaging-kafka, vertx]2022-04-11 07:49:32,399 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Encanto played for 162 minutes2022-04-11 07:49:32,899 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie The Hobbit played for 255 minutes2022-04-11 07:49:33,404 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Sing 2 played for 264 minutes2022-04-11 07:49:33,902 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Encanto played for 28 minutes2022-04-11 07:49:34,402 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Star Trek: First Contact played for 137 minutes2022-04-11 07:49:34,903 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Star Trek: First Contact played for 277 minutes2022-04-11 07:49:35,402 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie The Hobbit played for 141 minutes
复制代码


在另一个终端窗口中,启动我们刚刚开发的 Kafka Stream 代码:

./mvnw compile quarkus:dev
2022-04-11 07:54:59,321 INFO [org.apa.kaf.str.pro.int.StreamTask] (movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1) stream-thread [movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1] task [1_0] Restored and ready to run2022-04-11 07:54:59,322 INFO [org.apa.kaf.str.pro.int.StreamThread] (movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1) stream-thread [movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1] Restoration took 74 ms for all tasks [1_0]2022-04-11 07:54:59,322 INFO [org.apa.kaf.str.pro.int.StreamThread] (movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1) stream-thread [movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING2022-04-11 07:54:59,324 INFO [org.apa.kaf.str.KafkaStreams] (movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1) stream-client [movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea] State transition from REBALANCING to RUNNING[KSTREAM-LEFTJOIN-0000000005]: 4, Movie [director=Craig Gillespie, genre=Crime Comedy, id=4, name=Cruella][KSTREAM-LEFTJOIN-0000000005]: 2, Movie [director=Jonathan Frakes, genre=Space, id=2, name=Star Trek: First Contact][KSTREAM-LEFTJOIN-0000000005]: 4, Movie [director=Craig Gillespie, genre=Crime Comedy, id=4, name=Cruella][KSTREAM-LEFTJOIN-0000000005]: 1, Movie [director=Peter Jackson, genre=Fantasy, id=1, name=The Hobbit][KSTREAM-LEFTJOIN-0000000005]: 4, Movie [director=Craig Gillespie, genre=Crime Comedy, id=4, name=Cruella][KSTREAM-LEFTJOIN-0000000005]: 4, Movie [director=Craig Gillespie, genre=Crime Comedy, id=4, name=Cruella][KSTREAM-LEFTJOIN-0000000005]: 3, Movie [director=Jared Bush, genre=Animation, id=3, name=Encanto][KSTREAM-LEFTJOIN-0000000005]: 5, Movie [director=Garth Jennings, genre=Jukebox Musical Comedy, id=5, name=Sing 2]
复制代码


在输出中打印的是事件(连接所生成的结果),其中键是movieId,值是movie本身。我们现在所具有的功能是,每当一个电影停止播放时,Kafka Stream 会对其进行处理并以 Movie 全量信息的形式对其进行展现。


到目前为止,还不算复杂,对于这样的场景,你可能会想我们根本没有必要使用 Kafka Streams。但是,我们再加一些需求,这样你就能看到它的强大之处了。


现在,我们不是在用户每次停掉电影的时候都生成事件,而是只对用户观看时间超过 10 分钟的电影发送事件。


我们可以使用filter方法按照持续时长进行过滤。

playEvents       .filter((region, event) -> event.duration >= 10) // filters by duration       .map((key, value) -> KeyValue.pair(value.id, value))       .join(moviesTable, (movieId, moviePlayedId) -> movieId, (moviePlayed, movie) -> movie)        .print(Printed.toSysOut());
复制代码


重启应用,我们可以发现观看时长小于 10 分钟的电影将不会被处理。


我们发现,Kafka Streams 有助于代码的整洁性,接下来,我们添加最后的需求。现在,我们对每部电影的播放时长并不感兴趣,而是对每部电影有多少次超过 10 分钟的播放感兴趣。


到目前为止,对事件的处理都是无状态的,因为事件都是遵循这样的步骤,即接收、处理并发送至 sink 处理器(也就是发送至一个主题或控制台输出),但是,为了统计某部电影播放的次数,我们需要在内存记住电影被播放了多少次,并且当任意用户再次观看超过 10 分钟的时候,要对这个统计数字递增一次。此时,事件的处理就要以有状态的方式进行了。


我们需要做的第一件事就是创建一个 Java 类,以存储电影的名称及其播放的次数。

public class MoviePlayCount {   public String name;   public int count;
public MoviePlayCount aggregate(String name) { this.name = name; this.count++;
return this; }
@Override public String toString() { return "MoviePlayCount [count=" + count + ", name=" + name + "]"; }
}
复制代码


这是一个计数器类,它依然需要两样东西:


  • 我们需要有一个地方存储这个类的实例,确保每次触发事件的时候,它不会被重置。

  • 每当_playtimemovies_主题中有事件触发时,调用aggregate方法的逻辑。


关于第一个问题,我们需要使用KeyValueBytesStoreSupplier接口。


public static final String COUNT_MOVIE_STORE = "countMovieStore";
KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(COUNT_MOVIE_STORE);
复制代码


对于第二个问题,Kafka Streams 有一个用来聚合结果的aggregate方法。


在我们的用例中,也就是每部电影播放时长超过 10 分钟的次数。


// MoviePlayCount可以被序列化和反序列化ObjectMapperSerde<MoviePlayCount> moviePlayCountSerder = new ObjectMapperSerde<>(MoviePlayCount.class);
// 这是之前的连接操作,其中键是电影id,值是电影.join(moviesTable, (movieId, moviePlayedId) -> movieId, (moviePlayed, movie) -> movie)// 根据键对事件进行分组,在本例中,也就是电影的id.groupByKey(Grouped.with(Serdes.Integer(), movieSerder)) // 聚合方法,如果MoviePlayCount已经创建的话,获取该对象(如果尚未创建的话,会创建该实例)并调用其aggregate方法,以对观看次数进行递增 .aggregate(MoviePlayCount::new, (movieId, movie, moviePlayCounter) -> moviePlayCounter.aggregate(movie.name), Materialized.<Integer, MoviePlayCount> as(storeSupplier) .withKeySerde(Serdes.Integer()) .withValueSerde(moviePlayCountSerder) )
复制代码


重启应用,在控制台上将会展示电影播放的次数。


提示:要重启应用,只需在终端输入“s”,应用将会自动重启。


应用重启之后,控制台将会展示每部电影的状态。


[KTABLE-TOSTREAM-0000000011]: 4, MoviePlayCount [count=13, name=Cruella][KTABLE-TOSTREAM-0000000011]: 3, MoviePlayCount [count=11, name=Encanto][KTABLE-TOSTREAM-0000000011]: 5, MoviePlayCount [count=14, name=Sing 2][KTABLE-TOSTREAM-0000000011]: 2, MoviePlayCount [count=15, name=Star Trek: First Contact][KTABLE-TOSTREAM-0000000011]: 1, MoviePlayCount [count=16, name=The Hobbit][KTABLE-TOSTREAM-0000000011]: 2, MoviePlayCount [count=16, name=Star Trek: First Contact][KTABLE-TOSTREAM-0000000011]: 3, MoviePlayCount [count=12, name=Encanto][KTABLE-TOSTREAM-0000000011]: 2, MoviePlayCount [count=17, name=Star Trek: First Contact][KTABLE-TOSTREAM-0000000011]: 5, MoviePlayCount [count=15, name=Sing 2][KTABLE-TOSTREAM-0000000011]: 4, MoviePlayCount [count=14, name=Cruella][KTABLE-TOSTREAM-0000000011]: 1, MoviePlayCount [count=17, name=The Hobbit][KTABLE-TOSTREAM-0000000011]: 4, MoviePlayCount [count=15, name=Cruella][KTABLE-TOSTREAM-0000000011]: 4, MoviePlayCount [count=16, name=Cruella]
复制代码


交互式查询


因为我们将_sink_处理器设置成了System.out 流,所以聚合结果会流向控制台。


.toStream().print(Printed.toSysOut());
复制代码


但是,我们也可以将结果流发往一个 Kafka 主题:


.to("counter_movies",                      Produced.with(Serdes.Integer(), moviePlayCountSerder));
复制代码


但是,如果我们感兴趣的不是每次对新事件的反应,而只是想查询特定的电影此时播放的次数,那又该怎么办呢?


Kafka Streams 的交互式查询(interactive query)允许我们直接查询底层存储,以获取给定键相关的值。


首先,我们创建一个名为MoviePlayCountData的类来存储查询结果。按照这种方式,我们可以解耦 Kafka Streams 使用的类与应用中其他部分所使用的类。


public class MoviePlayCountData {
private String name; private int count;
public MoviePlayCountData(String name, int count) { this.name = name; this.count = count; }
public int getCount() { return count; }
public String getName() { return name; }
}
复制代码


现在,创建名为InteractiveQueries的类来实现对状态存储(KeyValueBytesStoreSupplier)的访问并根据电影的id查询它被播放的次数。


import javax.enterprise.context.ApplicationScoped;import javax.inject.Inject;
import org.apache.kafka.streams.KafkaStreams;import org.apache.kafka.streams.errors.InvalidStateStoreException;import org.apache.kafka.streams.state.QueryableStoreTypes;import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import static org.apache.kafka.streams.StoreQueryParameters.fromNameAndType;
import java.util.Optional;
@ApplicationScopedpublic class InteractiveQueries {
@Inject KafkaStreams streams;
public <MoviePlayCountData> getMoviePlayCountData(int id) { // 获取状态存储并根据电影id获取播放次数 MoviePlayCount moviePlayCount = getMoviesPlayCount().get(id); // 如果有结果的话 if (moviePlayCount != null) { // 将结果包装到MoviePlayCountData中 return Optional.of(new MoviePlayCountData(moviePlayCount.name, moviePlayCount.count)); } else { return Optional.empty(); } }
// 获取状态存储 private ReadOnlyKeyValueStore<Integer, MoviePlayCount> getMoviesPlayCount() { while (true) { try { return streams.store(fromNameAndType(TopologyProducer.COUNT_MOVIE_STORE, QueryableStoreTypes.keyValueStore())); } catch (InvalidStateStoreException e) { // 忽略之,此时存储尚未就绪 } } }
}
复制代码


现在,我们可以添加一个简单的 REST 端点来运行该查询。

import java.util.Optional;
import javax.inject.Inject;import javax.ws.rs.GET;import javax.ws.rs.Path;import javax.ws.rs.PathParam;import javax.ws.rs.core.Response;import javax.ws.rs.core.Response.Status;
@Path("/movie")public class MovieCountResource {
// 注入前文的类进行查询 @Inject InteractiveQueries interactiveQueries;
@GET @Path("/data/{id}") public Response movieCountData(@PathParam("id") int id) { Optional<MoviePlayCountData> moviePlayCountData = interactiveQueries.getMoviePlayCountData(id);
// 根据结果判定返回值还是404 if (moviePlayCountData.isPresent()) { return Response.ok(moviePlayCountData.get()).build(); } else { return Response.status(Status.NOT_FOUND.getStatusCode(), "No data found for movie " + id).build(); }
}}
复制代码


Kafka Stream 实现的模式如下图所示:


扩展

Kafka Streams 应用可以进行扩展,所以流会分布到多个实例中。在这种情况下,每个实例都包含聚合结果的一个子集,所以要想获得总的聚合结果,我们需要通过将 REST API 重定向到另外的实例来获取其他实例的数据。


Kafka Streams 提供了一个API,可以知道要请求的数据是在本地 Kafka Streams 存储中还是在其他的主机中。


虽然这个过程并不复杂,但已经超出了本文的讨论范围。

结论

到目前为止,我们已经看到,将 Quarkus 应用连接到 Apache Kafka 并生产和消费主题中的消息/事件是很容易的。此外,还看到 Kafka Streams 让我们不仅可以消费 Kafka 中消息,还能够实时处理它们,进行转换、过滤等操作,例如以同步的方式消费结果数据。 这是一项强大的技术,当需要处理的数据不断变化时,它可以轻松扩展,提供实时的体验。


但是,我们还没有解决该架构的最后一个问题。通常情况下,数据并不是存储在一个地方。电影信息可能存储在关系型数据库中,而电影的播放信息则存储在一个 Kafka 主题中。那么,该如何保持这两个地方的信息更新,以便 Kafka Streams 能正确地连接数据呢?


这里有一个缺失的部分,名为 Debezium 的项目可以帮助我们解决这个问题。我们将用一整篇文章来介绍 Debezium 和 Quarkus,敬请持续关注。


作者简介:

Alex Soto 是红帽公司的开发者体验总监。他对 Java 领域、软件自动化充满热情,他相信开源软件模式。Soto 是 Manning 的《Testing Java Microservices》和 O’Reilly 的《Quarkus Cookbook》两本书的共同作者,他还是多个开源项目的贡献者。自 2017 年以来,他一直是 Java Champion,是国际演讲者和 Salle URL 大学的教师。你可以在 Twitter 上关注他(Alex Soto ),随时了解 Kubernetes 和 Java 领域的动态。


原文链接:

Kafka Streams and Quarkus: Real-Time Processing Events


相关阅读:

本系列第一部分:使用 Apache Kafka 实现 Quarkus 的反应式消息

2022-08-25 11:005308

评论

发布
暂无评论
发现更多内容

科技快讯丨浪潮海岳低代码平台inBuilder全面亮相开放原子开发者大会

inBuilder低代码平台

开源 低代码

数字人SaaS系统无限生成AI数字人!

青否数字人

数字人

湖南省岳阳市君山公安分局与合合信息旗下启信宝达成战略合作

合合技术团队

大数据 科技 合合信息 警务技术 启信宝

玩转Spring状态机 | 京东云技术团队

京东科技开发者

spring 设计模式 状态机 状态模式 spring状态机

【最佳实践】京东小程序-LBS业务场景的性能提升 | 京东云技术团队

京东科技开发者

小程序 性能优化 前端 LBS

第一视角现场探展!带你揭秘爱采购生成式AI如何提效赋能

科技热闻

软件开发

Geek_8da502

拳头游戏即将完成全球基础设施全面迁移至亚马逊云科技

财见

为什么AI数字人直播被判定为录播?

青否数字人

数字人

名企私教服务加盟全栈开发与自动化测试班,成就你的技术梦想

霍格沃兹测试开发学社

云图说丨初识华为云OrgID

华为云开发者联盟

云计算 后端 华为云 华为云开发者联盟 华为云云图说

2023技术总结——AI成为了我日常不可缺少的部分

StackOverflow

AI 模型 GPT

实践解析HPA各关联组件扭转关系

华为云开发者联盟

云原生 后端 华为云 华为云开发者联盟

Different WiFi cards -QCN9274 and QCN6274- Different performance

wifi6-yiyi

network wifi wireless WiFi card

什么是 DDoS ?如何识别DDoS?怎么应对DDOS攻击

德迅云安全杨德俊

Instagram下载软件 4K Stogram 免激活最新版

mac大玩家j

Mac软件推荐 下载工具

我的深度学习项目经验分享

小王撤了

AI

Python终于可以自动收发邮件了,1行代码实现

程序员晚枫

Python 邮件

AI数字人线下互动大屏,提升智能交互服务体验!

青否数字人

专业raw图像处理 DxO PhotoLab 7激活版中文最新

胖墩儿不胖y

Mac软件 图像处理工具 raw图像处理

让数据成为生产力,用友时序数据库为智能制造按下“瞬时加速键”

用友BIP

如何使用 Helm 在 K8s 上集成 Prometheus 和 Grafana|Part 1

SEAL安全

Helm kubernetes 运维 企业号12月PK榜

5家券商综合评级上升,11月券商App终端业务体验评测报告发布

博睿数据

IPQ8072A and IPQ8074A processors: performance comparison and innovation diff

wallysSK

硬件 嵌入式 IPQ8074A wifi solution IPQ8072A

淘宝商品详情数据接口使用方法介绍

tbapi

淘宝电商 淘宝商品详情数据接口 淘宝API接口 淘宝数据采集

一文让你简单了解软堡垒机和云堡垒机的主要区别

行云管家

云计算 网络安全 堡垒机 上云

DxO FilmPack 7 for Mac:摄影师的创意伙伴

影影绰绰一往直前

基础知识回顾:安装 NGINX 开源版和 NGINX Plus

NGINX开源社区

web服务器 keepalive nginx 开源版 NGINX PLUS NGINX repo

手把手教你使用ArkTS中的canvas实现签名板功能

HarmonyOS开发者

HarmonyOS

从西工大安全事件浅谈特权账号管理系统

尚思卓越

数据库 网络安全

DxO PhotoLab 6 for Mac(raw图片处理软件) 6.12.0.62中文破解版

mac

苹果mac Windows软件 DxO PhotoLab 6 图片处理软件

Kafka Streams与Quarkus:实时处理事件_云原生_Alex Soto_InfoQ精选文章