【AICon】探索八个行业创新案例,教你在教育、金融、医疗、法律等领域实践大模型技术! >>> 了解详情
写点什么

Kafka Streams 与 Quarkus:实时处理事件

  • 2022-08-25
    北京
  • 本文字数:10072 字

    阅读完需:约 33 分钟

Kafka Streams与Quarkus:实时处理事件

在本系列的第一部分中,我们学习了Apache KafkaQuarkus的集成,并开发了一个简单的应用,从两个 Kafka 主题生产和消费事件。


在那个样例中,我们模拟了一个影视流公司,在一个 Kafka 主题中存储电影信息,在另外一个主题中存储了用户停止观看电影时所发生的每个事件,并捕获了电影已播放的时间。


下图展示了该应用的架构:



我们可以看到,消费消息很简单,只要有消息生成,我们就可以得到它们,但是除此之外,我们也做不了其他的事情了。如果我们需要实时处理数据(比如过滤或操作事件)或者我们需要在事件之间做一些关联,单纯使用 Kafka 的消费 API 可能就不是最佳的方式了,因为这会导致代码非常复杂。

Kafka Streams

Kafka Streams项目能够帮助我们在事件产生时实时消费它们,应用各种转换,执行流连接等,并且可以选择性地将新的数据表述写回主题中。


对于有状态和无状态的流应用来说,Kafka Streams 都是理想方案,它能够实现基于时间的操作(例如,围绕给定的时间段对事件进行分组),并且考虑到了 Kafka 生态系统中普遍存在的可扩展性、可靠性和可维护性。


Kafka Stream 由三个元素组成,即输入(源处理器)、输出(sink 处理器)和处理器(流处理器)。


源处理器(Source processor):源处理器代表一个 Kafka 主题。源处理器会发送事件到一个或多个流处理器中。


流处理器(Stream processor):流处理器会将转换/逻辑应用于输入流中,比如连接、分组、计数、映射等。流处理器可以连接至另一个流处理器和/或 sink 处理器。


Sink 处理器(Sink processor):Sink 处理器代表了输出的数据,它会连接至一个 Kafka 主题。


**拓扑结构(topology)**是由源、处理器和 sink 组成的无循环图,事件会传入到一个 Kafka Streams 中,该实例将开始拓扑结构的执行。

Kafka Streams 和 Quarkus

Quarkus 使用 Quarkus KStreams 扩展实现与Kafka Streams集成。

Quarkus 起步

使用 Quarkus 最快捷的方式是通过初始化页面添加所需的依赖。每个服务可能需要不同的依赖,你可以选择 Java 11 或 Java 17。为了实现 Quarkus 与 Kafka Streams 的集成,我们至少需要添加 Kafka Streams 扩展。


要开发的应用

正如在本文开始时所提到的,在本系列的第一部分中,我们开发了一个影视流公司,它有两个 Kafka 主题,其中一个用来存储电影的列表,另外一个主题会在用户停止播放电影时存储用户所在的区域(事件的键),并且会以电影 id 和播放时间作为事件的值。



所有的这些逻辑都是在名为Movie Plays Producer的生成者服务中创建的,该服务是使用 Quarkus 开发的。


除此之外,我还使用 Quakus 开发了一个Movie Plays Consumer服务,它会消费这两个主题的事件并且会在控制台上展示它们(并实现了 HTTP 服务器端事件)。


但是,这里没有对数据进行任何处理,它只是按照原样进行了接收。如果我们想要在 movies 和 playtimemovies 主题之间进行一下连接,在获取电影播放时长的时候得到电影的详细信息而不是 id 的话,那又该怎么办呢?



如果仅仅使用 Kafka 消息来实现这样的逻辑会变成一项很复杂的任务,因为我们需要在一个 Map 中存储 Movie 信息,并且在每个 playedmovie 事件发生时,进行匹配处理。

Movie Plays KStream

与其为每个用例手工编写代码,不如看一下如何使用 Kafka Streams,以及它是如何与 Quarkus 集成来解决这个问题的。

创建项目

导航至 Quarkus 的初始化页面,并选择 Apache Kafka Streams 扩展来实现与 Kafka Streams 的集成。然后,选择 RestEasy 和 RestEasy Jackson 扩展实现事件从 Java 对象和 JSON 之间的编排/解排。同时,取消选中 Started Code 生成选项。


请参照下面的截图:



你也可以跳过这个手动的步骤并导航至Kafka Stream Quarkus Generator链接,在这里,所有的依赖都已经选择好了。然后,点击 Generate your application 按钮,以下载应用骨架的压缩文件。


解压文件,并在你最喜欢的 IDE 中打开项目。

开发

当开发 Kafka Stream 应用时,我们需要做的第一件事就是创建Topology实例,并定义源、处理器和 sink。


在 Quarkus 中,我们只需要创建一个_CDI_类,这个类需要包含一个返回Topology实例的方法。


创建名为TopologyProducer的类,它将会实现从这两个主题消费事件并连接它们的逻辑。最后,生成的结果将会发送至一个 sink 处理器,该处理器以控制台输出的形式展示结果。


还有一个元素我们没有提到,在这些场景中它非常有用,那就是 Kafka Tables。


一个主题可以包含具有相同键的多个事件。例如,我们可以使用某个键插入一个电影,然后我们可以使用相同的键创建一个新的事件来对电影进行更新:



但是,如果我们想要让 movies 主题与_playtimemovies_主题进行连接的话,那我们该选择使用哪个值为 1 的事件呢?第一个还是第二个?在这个具体的情况中,应该选择最新的那一个,因为它包含了电影的最新版本。为了获取每个事件的最新版本,Kafka Streams 有一个_表_的概念(KTable/GlobalKTable)。


Kafka Streams 会浏览指定的主题,获取每个事件的最新版本,并将其放到一个表实例中。


KafkaStream 扩展并不会像 Kafka Messaging 集成那样自动注册SerDes,所以我们需要在拓扑中手动注册它们。


package org.acme;
import javax.enterprise.context.ApplicationScoped;import javax.enterprise.inject.Produces;
import org.apache.kafka.common.serialization.Serdes;import org.apache.kafka.streams.KeyValue;import org.apache.kafka.streams.StreamsBuilder;import org.apache.kafka.streams.Topology;import org.apache.kafka.streams.kstream.Consumed;import org.apache.kafka.streams.kstream.GlobalKTable;import org.apache.kafka.streams.kstream.KStream;import org.apache.kafka.streams.kstream.Printed;
import io.quarkus.kafka.client.serialization.ObjectMapperSerde;
@ApplicationScopedpublic class TopologyProducer {
private static final String MOVIES_TOPIC = "movies"; private static final String PLAY_MOVIES_TOPIC = "playtimemovies";
@Produces public Topology getTopCharts() {
final StreamsBuilder builder = new StreamsBuilder();
// 用于Movie和PlayedMovie的SerDes
final ObjectMapperSerde<Movie> movieSerder = new ObjectMapperSerde<>(Movie.class); final ObjectMapperSerde<MoviePlayed> moviePlayedSerder = new ObjectMapperSerde<>(MoviePlayed.class);
// 为Movies主题创建一个Global Kafka Table
final GlobalKTable<Integer, Movie> moviesTable = builder.globalTable( MOVIES_TOPIC, Consumed.with(Serdes.Integer(), movieSerder));
// 连接至playtimemovies主题的流,每当该主题有事件生成都会被该流所消费
final KStream<String, MoviePlayed> playEvents = builder.stream( PLAY_MOVIES_TOPIC, Consumed.with(Serdes.String(), moviePlayedSerder));
// PlayedMovies使用区域作为键,对象作为值。我们对内容进行map操作,让电影的id作为key(以便于进行连接)并让对象继续作为值// 另外,我们使用movies表的键(movieId)以及流的键(在前面的map方法中,我们也将其变成了movieId)进行连接
// 最后,结果会流向控制台
playEvents .map((key, value) -> KeyValue.pair(value.id, value)) // Now key is the id field .join(moviesTable, (movieId, moviePlayedId) -> movieId, (moviePlayed, movie) -> movie) .print(Printed.toSysOut()); return builder.build();
}}
复制代码


MovieMoviePlayed POJO 包含了实现逻辑所需的属性:


Movie对象如下所示:

package org.acme;
public class Movie {
public int id; public String name; public String director; public String genre;
public Movie(int id, String name, String director, String genre) { this.id = id; this.name = name; this.director = director; this.genre = genre; }}
复制代码


MoviePlayed对象如下所示:

package org.acme;
public class MoviePlayed {
public int id; public long duration;
public MoviePlayed(int id, long duration) { this.id = id; this.duration = duration; }
}
复制代码


运行 Kafka Stream 应用之前的最后一步是配置参数,其中最重要的是quarkus.kafka-streams.topics。它是一个主题列表,在拓扑结构开始处理数据之前,它们就要存在于 Kafka 集群中,这是一个前提条件。


打开src/main/resources/application.properties文件并添加如下的代码行:

kafka-streams.cache.max.bytes.buffering=10240kafka-streams.commit.interval.ms=1000kafka-streams.metadata.max.age.ms=500kafka-streams.auto.offset.reset=earliestkafka-streams.metrics.recording.level=DEBUG
quarkus.kafka-streams.topics=playtimemovies,movies
复制代码


现在,我们可以测试一下流了。我们启动在上一篇文章中开发的生产者。生产者的源码可以在这里找到。


Quarkus KStreams 集成了 Quarkus DevServices。所以,我们不需要启动 Kafka 集群,也不需要配置它的位置,因为 Quarkus Dev 模式会处理好所有的事情。我们只需要记住在自己的计算机上要有一个运行中的容器环境即可,比如 Podman 或其他兼容 OCI 的工具。


在终端窗口中启动生产者服务:

cd movie-plays-producer./mvnw compile quarkus:dev
2022-04-11 07:49:31,900 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Cruella played for 287 minutes2022-04-11 07:49:31,941 INFO [io.quarkus] (Quarkus Main Thread) movie-plays-producer 1.0.0-SNAPSHOT on JVM (powered by Quarkus 2.7.3.Final) started in 4.256s.2022-04-11 07:49:31,942 INFO [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.2022-04-11 07:49:31,943 INFO [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, kafka-client, smallrye-context-propagation, smallrye-reactive-messaging, smallrye-reactive-messaging-kafka, vertx]2022-04-11 07:49:32,399 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Encanto played for 162 minutes2022-04-11 07:49:32,899 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie The Hobbit played for 255 minutes2022-04-11 07:49:33,404 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Sing 2 played for 264 minutes2022-04-11 07:49:33,902 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Encanto played for 28 minutes2022-04-11 07:49:34,402 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Star Trek: First Contact played for 137 minutes2022-04-11 07:49:34,903 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Star Trek: First Contact played for 277 minutes2022-04-11 07:49:35,402 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie The Hobbit played for 141 minutes
复制代码


在另一个终端窗口中,启动我们刚刚开发的 Kafka Stream 代码:

./mvnw compile quarkus:dev
2022-04-11 07:54:59,321 INFO [org.apa.kaf.str.pro.int.StreamTask] (movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1) stream-thread [movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1] task [1_0] Restored and ready to run2022-04-11 07:54:59,322 INFO [org.apa.kaf.str.pro.int.StreamThread] (movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1) stream-thread [movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1] Restoration took 74 ms for all tasks [1_0]2022-04-11 07:54:59,322 INFO [org.apa.kaf.str.pro.int.StreamThread] (movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1) stream-thread [movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING2022-04-11 07:54:59,324 INFO [org.apa.kaf.str.KafkaStreams] (movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1) stream-client [movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea] State transition from REBALANCING to RUNNING[KSTREAM-LEFTJOIN-0000000005]: 4, Movie [director=Craig Gillespie, genre=Crime Comedy, id=4, name=Cruella][KSTREAM-LEFTJOIN-0000000005]: 2, Movie [director=Jonathan Frakes, genre=Space, id=2, name=Star Trek: First Contact][KSTREAM-LEFTJOIN-0000000005]: 4, Movie [director=Craig Gillespie, genre=Crime Comedy, id=4, name=Cruella][KSTREAM-LEFTJOIN-0000000005]: 1, Movie [director=Peter Jackson, genre=Fantasy, id=1, name=The Hobbit][KSTREAM-LEFTJOIN-0000000005]: 4, Movie [director=Craig Gillespie, genre=Crime Comedy, id=4, name=Cruella][KSTREAM-LEFTJOIN-0000000005]: 4, Movie [director=Craig Gillespie, genre=Crime Comedy, id=4, name=Cruella][KSTREAM-LEFTJOIN-0000000005]: 3, Movie [director=Jared Bush, genre=Animation, id=3, name=Encanto][KSTREAM-LEFTJOIN-0000000005]: 5, Movie [director=Garth Jennings, genre=Jukebox Musical Comedy, id=5, name=Sing 2]
复制代码


在输出中打印的是事件(连接所生成的结果),其中键是movieId,值是movie本身。我们现在所具有的功能是,每当一个电影停止播放时,Kafka Stream 会对其进行处理并以 Movie 全量信息的形式对其进行展现。


到目前为止,还不算复杂,对于这样的场景,你可能会想我们根本没有必要使用 Kafka Streams。但是,我们再加一些需求,这样你就能看到它的强大之处了。


现在,我们不是在用户每次停掉电影的时候都生成事件,而是只对用户观看时间超过 10 分钟的电影发送事件。


我们可以使用filter方法按照持续时长进行过滤。

playEvents       .filter((region, event) -> event.duration >= 10) // filters by duration       .map((key, value) -> KeyValue.pair(value.id, value))       .join(moviesTable, (movieId, moviePlayedId) -> movieId, (moviePlayed, movie) -> movie)        .print(Printed.toSysOut());
复制代码


重启应用,我们可以发现观看时长小于 10 分钟的电影将不会被处理。


我们发现,Kafka Streams 有助于代码的整洁性,接下来,我们添加最后的需求。现在,我们对每部电影的播放时长并不感兴趣,而是对每部电影有多少次超过 10 分钟的播放感兴趣。


到目前为止,对事件的处理都是无状态的,因为事件都是遵循这样的步骤,即接收、处理并发送至 sink 处理器(也就是发送至一个主题或控制台输出),但是,为了统计某部电影播放的次数,我们需要在内存记住电影被播放了多少次,并且当任意用户再次观看超过 10 分钟的时候,要对这个统计数字递增一次。此时,事件的处理就要以有状态的方式进行了。


我们需要做的第一件事就是创建一个 Java 类,以存储电影的名称及其播放的次数。

public class MoviePlayCount {   public String name;   public int count;
public MoviePlayCount aggregate(String name) { this.name = name; this.count++;
return this; }
@Override public String toString() { return "MoviePlayCount [count=" + count + ", name=" + name + "]"; }
}
复制代码


这是一个计数器类,它依然需要两样东西:


  • 我们需要有一个地方存储这个类的实例,确保每次触发事件的时候,它不会被重置。

  • 每当_playtimemovies_主题中有事件触发时,调用aggregate方法的逻辑。


关于第一个问题,我们需要使用KeyValueBytesStoreSupplier接口。


public static final String COUNT_MOVIE_STORE = "countMovieStore";
KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(COUNT_MOVIE_STORE);
复制代码


对于第二个问题,Kafka Streams 有一个用来聚合结果的aggregate方法。


在我们的用例中,也就是每部电影播放时长超过 10 分钟的次数。


// MoviePlayCount可以被序列化和反序列化ObjectMapperSerde<MoviePlayCount> moviePlayCountSerder = new ObjectMapperSerde<>(MoviePlayCount.class);
// 这是之前的连接操作,其中键是电影id,值是电影.join(moviesTable, (movieId, moviePlayedId) -> movieId, (moviePlayed, movie) -> movie)// 根据键对事件进行分组,在本例中,也就是电影的id.groupByKey(Grouped.with(Serdes.Integer(), movieSerder)) // 聚合方法,如果MoviePlayCount已经创建的话,获取该对象(如果尚未创建的话,会创建该实例)并调用其aggregate方法,以对观看次数进行递增 .aggregate(MoviePlayCount::new, (movieId, movie, moviePlayCounter) -> moviePlayCounter.aggregate(movie.name), Materialized.<Integer, MoviePlayCount> as(storeSupplier) .withKeySerde(Serdes.Integer()) .withValueSerde(moviePlayCountSerder) )
复制代码


重启应用,在控制台上将会展示电影播放的次数。


提示:要重启应用,只需在终端输入“s”,应用将会自动重启。


应用重启之后,控制台将会展示每部电影的状态。


[KTABLE-TOSTREAM-0000000011]: 4, MoviePlayCount [count=13, name=Cruella][KTABLE-TOSTREAM-0000000011]: 3, MoviePlayCount [count=11, name=Encanto][KTABLE-TOSTREAM-0000000011]: 5, MoviePlayCount [count=14, name=Sing 2][KTABLE-TOSTREAM-0000000011]: 2, MoviePlayCount [count=15, name=Star Trek: First Contact][KTABLE-TOSTREAM-0000000011]: 1, MoviePlayCount [count=16, name=The Hobbit][KTABLE-TOSTREAM-0000000011]: 2, MoviePlayCount [count=16, name=Star Trek: First Contact][KTABLE-TOSTREAM-0000000011]: 3, MoviePlayCount [count=12, name=Encanto][KTABLE-TOSTREAM-0000000011]: 2, MoviePlayCount [count=17, name=Star Trek: First Contact][KTABLE-TOSTREAM-0000000011]: 5, MoviePlayCount [count=15, name=Sing 2][KTABLE-TOSTREAM-0000000011]: 4, MoviePlayCount [count=14, name=Cruella][KTABLE-TOSTREAM-0000000011]: 1, MoviePlayCount [count=17, name=The Hobbit][KTABLE-TOSTREAM-0000000011]: 4, MoviePlayCount [count=15, name=Cruella][KTABLE-TOSTREAM-0000000011]: 4, MoviePlayCount [count=16, name=Cruella]
复制代码


交互式查询


因为我们将_sink_处理器设置成了System.out 流,所以聚合结果会流向控制台。


.toStream().print(Printed.toSysOut());
复制代码


但是,我们也可以将结果流发往一个 Kafka 主题:


.to("counter_movies",                      Produced.with(Serdes.Integer(), moviePlayCountSerder));
复制代码


但是,如果我们感兴趣的不是每次对新事件的反应,而只是想查询特定的电影此时播放的次数,那又该怎么办呢?


Kafka Streams 的交互式查询(interactive query)允许我们直接查询底层存储,以获取给定键相关的值。


首先,我们创建一个名为MoviePlayCountData的类来存储查询结果。按照这种方式,我们可以解耦 Kafka Streams 使用的类与应用中其他部分所使用的类。


public class MoviePlayCountData {
private String name; private int count;
public MoviePlayCountData(String name, int count) { this.name = name; this.count = count; }
public int getCount() { return count; }
public String getName() { return name; }
}
复制代码


现在,创建名为InteractiveQueries的类来实现对状态存储(KeyValueBytesStoreSupplier)的访问并根据电影的id查询它被播放的次数。


import javax.enterprise.context.ApplicationScoped;import javax.inject.Inject;
import org.apache.kafka.streams.KafkaStreams;import org.apache.kafka.streams.errors.InvalidStateStoreException;import org.apache.kafka.streams.state.QueryableStoreTypes;import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import static org.apache.kafka.streams.StoreQueryParameters.fromNameAndType;
import java.util.Optional;
@ApplicationScopedpublic class InteractiveQueries {
@Inject KafkaStreams streams;
public <MoviePlayCountData> getMoviePlayCountData(int id) { // 获取状态存储并根据电影id获取播放次数 MoviePlayCount moviePlayCount = getMoviesPlayCount().get(id); // 如果有结果的话 if (moviePlayCount != null) { // 将结果包装到MoviePlayCountData中 return Optional.of(new MoviePlayCountData(moviePlayCount.name, moviePlayCount.count)); } else { return Optional.empty(); } }
// 获取状态存储 private ReadOnlyKeyValueStore<Integer, MoviePlayCount> getMoviesPlayCount() { while (true) { try { return streams.store(fromNameAndType(TopologyProducer.COUNT_MOVIE_STORE, QueryableStoreTypes.keyValueStore())); } catch (InvalidStateStoreException e) { // 忽略之,此时存储尚未就绪 } } }
}
复制代码


现在,我们可以添加一个简单的 REST 端点来运行该查询。

import java.util.Optional;
import javax.inject.Inject;import javax.ws.rs.GET;import javax.ws.rs.Path;import javax.ws.rs.PathParam;import javax.ws.rs.core.Response;import javax.ws.rs.core.Response.Status;
@Path("/movie")public class MovieCountResource {
// 注入前文的类进行查询 @Inject InteractiveQueries interactiveQueries;
@GET @Path("/data/{id}") public Response movieCountData(@PathParam("id") int id) { Optional<MoviePlayCountData> moviePlayCountData = interactiveQueries.getMoviePlayCountData(id);
// 根据结果判定返回值还是404 if (moviePlayCountData.isPresent()) { return Response.ok(moviePlayCountData.get()).build(); } else { return Response.status(Status.NOT_FOUND.getStatusCode(), "No data found for movie " + id).build(); }
}}
复制代码


Kafka Stream 实现的模式如下图所示:


扩展

Kafka Streams 应用可以进行扩展,所以流会分布到多个实例中。在这种情况下,每个实例都包含聚合结果的一个子集,所以要想获得总的聚合结果,我们需要通过将 REST API 重定向到另外的实例来获取其他实例的数据。


Kafka Streams 提供了一个API,可以知道要请求的数据是在本地 Kafka Streams 存储中还是在其他的主机中。


虽然这个过程并不复杂,但已经超出了本文的讨论范围。

结论

到目前为止,我们已经看到,将 Quarkus 应用连接到 Apache Kafka 并生产和消费主题中的消息/事件是很容易的。此外,还看到 Kafka Streams 让我们不仅可以消费 Kafka 中消息,还能够实时处理它们,进行转换、过滤等操作,例如以同步的方式消费结果数据。 这是一项强大的技术,当需要处理的数据不断变化时,它可以轻松扩展,提供实时的体验。


但是,我们还没有解决该架构的最后一个问题。通常情况下,数据并不是存储在一个地方。电影信息可能存储在关系型数据库中,而电影的播放信息则存储在一个 Kafka 主题中。那么,该如何保持这两个地方的信息更新,以便 Kafka Streams 能正确地连接数据呢?


这里有一个缺失的部分,名为 Debezium 的项目可以帮助我们解决这个问题。我们将用一整篇文章来介绍 Debezium 和 Quarkus,敬请持续关注。


作者简介:

Alex Soto 是红帽公司的开发者体验总监。他对 Java 领域、软件自动化充满热情,他相信开源软件模式。Soto 是 Manning 的《Testing Java Microservices》和 O’Reilly 的《Quarkus Cookbook》两本书的共同作者,他还是多个开源项目的贡献者。自 2017 年以来,他一直是 Java Champion,是国际演讲者和 Salle URL 大学的教师。你可以在 Twitter 上关注他(Alex Soto ),随时了解 Kubernetes 和 Java 领域的动态。


原文链接:

Kafka Streams and Quarkus: Real-Time Processing Events


相关阅读:

本系列第一部分:使用 Apache Kafka 实现 Quarkus 的反应式消息

2022-08-25 11:005322

评论

发布
暂无评论
发现更多内容

defi质押LP流动性挖矿dapp系统开发详情(案例)

开发微hkkf5566

设备离线时控制指令如何下发:通过设备影子实现离线设备的控制指令触达方案——设备管理运维类

阿里云AIoT

物联网

解密数仓高可用failover流程

华为云开发者联盟

数据库 后端 华为云 华为云开发者联盟 企业号 3 月 PK 榜

博睿“她”力量 :这份专业值得信赖

博睿数据

博睿数据 节日祝福

排序算法 Quick Sort

控心つcrazy

JavaScript 面试 前端 数据结构算法 算法、

面向新时代,海泰方圆战略升级!“1465”隆重发布!

电子信息发烧客

ChatGPT作者John Schulman:我们成功的秘密武器

OneFlow

人工智能 深度学习 ChatGPT

数据安全特点有哪些?现在企业如何保障数据安全?

行云管家

数据安全 堡垒机 数据泄露

Java面试一个月,心态崩了……

程序知音

Java java面试 Java进阶 后端技术 Java面试八股文

易观分析:银保监会成为“历史”,金融行业将面临哪些重点影响?

易观分析

金融 经济

喜马拉雅基于DeepRec构建AI平台实践

阿里云大数据AI技术

人工智能 深度学习 推理 企业号 3 月 PK 榜 稀疏学习

瓴羊Quick BI更合适“中国式报表”需求!

巷子

车载小程序发展现状:使用环境、用户体验、应用场景及未来趋势

没有用户名丶

小程序化

如何判断多账号是同一个人?用图技术搞定 ID Mapping

NebulaGraph

图数据库 风险控制 安全控制

中小企业需要统一的快速开发平台吗?

力软低代码开发平台

喜讯!阿里云数据库PolarDB荣获第12届PostgreSQL中国技术大会“开源数据库杰出贡献奖”

阿里云数据库开源

开源数据库 polarDB 阿里云数据库 PolarDB-PG PolarDB for PostgreSQL

【物联网开发实战】- 设备上云方案详解——设备接入类

阿里云AIoT

物联网 传感器

bucket表:数仓存算分离中CU与DN解绑的关键

华为云开发者联盟

数据库 后端 华为云 华为云开发者联盟 企业号 3 月 PK 榜

GitLab 凭借什么连续 3 年上榜 Gartner 应用程序安全测试魔力象限?听听 GitLab 自己的分析

极狐GitLab

DevOps DevSecOps 安全测试 极狐GitLab 安全合规

如何通过C#/VB.NET代码在Word中插入或删除脚注

在下毛毛雨

C# .net word 脚注

云图说丨云数据库GaussDB(for MySQL)事务拆分大揭秘

华为云开发者联盟

数据库 后端 华为云 华为云开发者联盟 企业号 3 月 PK 榜

DLRover:蚂蚁开源大规模智能分布式训练系统

SOFAStack

人工智能 互联网 DLRover

云计算生态该怎么做?阿里云计算巢打了个样

云布道师

云计算 阿里云

IoT平台设备标签功能和规则引擎组合最佳实践——设备接入类

阿里云AIoT

sql 监控 物联网 API 定位技术

DLRover:蚂蚁开源大规模智能分布式训练系统

AI Infra

互联网 智能 训练智能

及刻周边惠:拥抱HarmonyOS原子化服务

HarmonyOS开发者

HarmonyOS

汇率市场大幅波动,用友BIP全球司库助力企业外汇避险

用友BIP

金融 外汇避险

什么是大前端技术?微信小程序用户占比达25%

没有用户名丶

物联网平台提醒欠费该如何查询和处理?——普及类

阿里云AIoT

物联网

浪潮 KaiwuDB x 山东重工 | 打造离散制造业 IIoT 标杆解决方案

KaiwuDB

数据库 iiot 制造业

什么是信创产品?怎么成为信创产品?

行云管家

信创 国产化

Kafka Streams与Quarkus:实时处理事件_云原生_Alex Soto_InfoQ精选文章