“AI 技术+人才”如何成为企业增长新引擎?戳此了解>>> 了解详情
写点什么

Kafka Streams 与 Quarkus:实时处理事件

  • 2022-08-25
    北京
  • 本文字数:10072 字

    阅读完需:约 33 分钟

Kafka Streams与Quarkus:实时处理事件

在本系列的第一部分中,我们学习了Apache KafkaQuarkus的集成,并开发了一个简单的应用,从两个 Kafka 主题生产和消费事件。


在那个样例中,我们模拟了一个影视流公司,在一个 Kafka 主题中存储电影信息,在另外一个主题中存储了用户停止观看电影时所发生的每个事件,并捕获了电影已播放的时间。


下图展示了该应用的架构:



我们可以看到,消费消息很简单,只要有消息生成,我们就可以得到它们,但是除此之外,我们也做不了其他的事情了。如果我们需要实时处理数据(比如过滤或操作事件)或者我们需要在事件之间做一些关联,单纯使用 Kafka 的消费 API 可能就不是最佳的方式了,因为这会导致代码非常复杂。

Kafka Streams

Kafka Streams项目能够帮助我们在事件产生时实时消费它们,应用各种转换,执行流连接等,并且可以选择性地将新的数据表述写回主题中。


对于有状态和无状态的流应用来说,Kafka Streams 都是理想方案,它能够实现基于时间的操作(例如,围绕给定的时间段对事件进行分组),并且考虑到了 Kafka 生态系统中普遍存在的可扩展性、可靠性和可维护性。


Kafka Stream 由三个元素组成,即输入(源处理器)、输出(sink 处理器)和处理器(流处理器)。


源处理器(Source processor):源处理器代表一个 Kafka 主题。源处理器会发送事件到一个或多个流处理器中。


流处理器(Stream processor):流处理器会将转换/逻辑应用于输入流中,比如连接、分组、计数、映射等。流处理器可以连接至另一个流处理器和/或 sink 处理器。


Sink 处理器(Sink processor):Sink 处理器代表了输出的数据,它会连接至一个 Kafka 主题。


**拓扑结构(topology)**是由源、处理器和 sink 组成的无循环图,事件会传入到一个 Kafka Streams 中,该实例将开始拓扑结构的执行。

Kafka Streams 和 Quarkus

Quarkus 使用 Quarkus KStreams 扩展实现与Kafka Streams集成。

Quarkus 起步

使用 Quarkus 最快捷的方式是通过初始化页面添加所需的依赖。每个服务可能需要不同的依赖,你可以选择 Java 11 或 Java 17。为了实现 Quarkus 与 Kafka Streams 的集成,我们至少需要添加 Kafka Streams 扩展。


要开发的应用

正如在本文开始时所提到的,在本系列的第一部分中,我们开发了一个影视流公司,它有两个 Kafka 主题,其中一个用来存储电影的列表,另外一个主题会在用户停止播放电影时存储用户所在的区域(事件的键),并且会以电影 id 和播放时间作为事件的值。



所有的这些逻辑都是在名为Movie Plays Producer的生成者服务中创建的,该服务是使用 Quarkus 开发的。


除此之外,我还使用 Quakus 开发了一个Movie Plays Consumer服务,它会消费这两个主题的事件并且会在控制台上展示它们(并实现了 HTTP 服务器端事件)。


但是,这里没有对数据进行任何处理,它只是按照原样进行了接收。如果我们想要在 movies 和 playtimemovies 主题之间进行一下连接,在获取电影播放时长的时候得到电影的详细信息而不是 id 的话,那又该怎么办呢?



如果仅仅使用 Kafka 消息来实现这样的逻辑会变成一项很复杂的任务,因为我们需要在一个 Map 中存储 Movie 信息,并且在每个 playedmovie 事件发生时,进行匹配处理。

Movie Plays KStream

与其为每个用例手工编写代码,不如看一下如何使用 Kafka Streams,以及它是如何与 Quarkus 集成来解决这个问题的。

创建项目

导航至 Quarkus 的初始化页面,并选择 Apache Kafka Streams 扩展来实现与 Kafka Streams 的集成。然后,选择 RestEasy 和 RestEasy Jackson 扩展实现事件从 Java 对象和 JSON 之间的编排/解排。同时,取消选中 Started Code 生成选项。


请参照下面的截图:



你也可以跳过这个手动的步骤并导航至Kafka Stream Quarkus Generator链接,在这里,所有的依赖都已经选择好了。然后,点击 Generate your application 按钮,以下载应用骨架的压缩文件。


解压文件,并在你最喜欢的 IDE 中打开项目。

开发

当开发 Kafka Stream 应用时,我们需要做的第一件事就是创建Topology实例,并定义源、处理器和 sink。


在 Quarkus 中,我们只需要创建一个_CDI_类,这个类需要包含一个返回Topology实例的方法。


创建名为TopologyProducer的类,它将会实现从这两个主题消费事件并连接它们的逻辑。最后,生成的结果将会发送至一个 sink 处理器,该处理器以控制台输出的形式展示结果。


还有一个元素我们没有提到,在这些场景中它非常有用,那就是 Kafka Tables。


一个主题可以包含具有相同键的多个事件。例如,我们可以使用某个键插入一个电影,然后我们可以使用相同的键创建一个新的事件来对电影进行更新:



但是,如果我们想要让 movies 主题与_playtimemovies_主题进行连接的话,那我们该选择使用哪个值为 1 的事件呢?第一个还是第二个?在这个具体的情况中,应该选择最新的那一个,因为它包含了电影的最新版本。为了获取每个事件的最新版本,Kafka Streams 有一个_表_的概念(KTable/GlobalKTable)。


Kafka Streams 会浏览指定的主题,获取每个事件的最新版本,并将其放到一个表实例中。


KafkaStream 扩展并不会像 Kafka Messaging 集成那样自动注册SerDes,所以我们需要在拓扑中手动注册它们。


package org.acme;
import javax.enterprise.context.ApplicationScoped;import javax.enterprise.inject.Produces;
import org.apache.kafka.common.serialization.Serdes;import org.apache.kafka.streams.KeyValue;import org.apache.kafka.streams.StreamsBuilder;import org.apache.kafka.streams.Topology;import org.apache.kafka.streams.kstream.Consumed;import org.apache.kafka.streams.kstream.GlobalKTable;import org.apache.kafka.streams.kstream.KStream;import org.apache.kafka.streams.kstream.Printed;
import io.quarkus.kafka.client.serialization.ObjectMapperSerde;
@ApplicationScopedpublic class TopologyProducer {
private static final String MOVIES_TOPIC = "movies"; private static final String PLAY_MOVIES_TOPIC = "playtimemovies";
@Produces public Topology getTopCharts() {
final StreamsBuilder builder = new StreamsBuilder();
// 用于Movie和PlayedMovie的SerDes
final ObjectMapperSerde<Movie> movieSerder = new ObjectMapperSerde<>(Movie.class); final ObjectMapperSerde<MoviePlayed> moviePlayedSerder = new ObjectMapperSerde<>(MoviePlayed.class);
// 为Movies主题创建一个Global Kafka Table
final GlobalKTable<Integer, Movie> moviesTable = builder.globalTable( MOVIES_TOPIC, Consumed.with(Serdes.Integer(), movieSerder));
// 连接至playtimemovies主题的流,每当该主题有事件生成都会被该流所消费
final KStream<String, MoviePlayed> playEvents = builder.stream( PLAY_MOVIES_TOPIC, Consumed.with(Serdes.String(), moviePlayedSerder));
// PlayedMovies使用区域作为键,对象作为值。我们对内容进行map操作,让电影的id作为key(以便于进行连接)并让对象继续作为值// 另外,我们使用movies表的键(movieId)以及流的键(在前面的map方法中,我们也将其变成了movieId)进行连接
// 最后,结果会流向控制台
playEvents .map((key, value) -> KeyValue.pair(value.id, value)) // Now key is the id field .join(moviesTable, (movieId, moviePlayedId) -> movieId, (moviePlayed, movie) -> movie) .print(Printed.toSysOut()); return builder.build();
}}
复制代码


MovieMoviePlayed POJO 包含了实现逻辑所需的属性:


Movie对象如下所示:

package org.acme;
public class Movie {
public int id; public String name; public String director; public String genre;
public Movie(int id, String name, String director, String genre) { this.id = id; this.name = name; this.director = director; this.genre = genre; }}
复制代码


MoviePlayed对象如下所示:

package org.acme;
public class MoviePlayed {
public int id; public long duration;
public MoviePlayed(int id, long duration) { this.id = id; this.duration = duration; }
}
复制代码


运行 Kafka Stream 应用之前的最后一步是配置参数,其中最重要的是quarkus.kafka-streams.topics。它是一个主题列表,在拓扑结构开始处理数据之前,它们就要存在于 Kafka 集群中,这是一个前提条件。


打开src/main/resources/application.properties文件并添加如下的代码行:

kafka-streams.cache.max.bytes.buffering=10240kafka-streams.commit.interval.ms=1000kafka-streams.metadata.max.age.ms=500kafka-streams.auto.offset.reset=earliestkafka-streams.metrics.recording.level=DEBUG
quarkus.kafka-streams.topics=playtimemovies,movies
复制代码


现在,我们可以测试一下流了。我们启动在上一篇文章中开发的生产者。生产者的源码可以在这里找到。


Quarkus KStreams 集成了 Quarkus DevServices。所以,我们不需要启动 Kafka 集群,也不需要配置它的位置,因为 Quarkus Dev 模式会处理好所有的事情。我们只需要记住在自己的计算机上要有一个运行中的容器环境即可,比如 Podman 或其他兼容 OCI 的工具。


在终端窗口中启动生产者服务:

cd movie-plays-producer./mvnw compile quarkus:dev
2022-04-11 07:49:31,900 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Cruella played for 287 minutes2022-04-11 07:49:31,941 INFO [io.quarkus] (Quarkus Main Thread) movie-plays-producer 1.0.0-SNAPSHOT on JVM (powered by Quarkus 2.7.3.Final) started in 4.256s.2022-04-11 07:49:31,942 INFO [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.2022-04-11 07:49:31,943 INFO [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, kafka-client, smallrye-context-propagation, smallrye-reactive-messaging, smallrye-reactive-messaging-kafka, vertx]2022-04-11 07:49:32,399 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Encanto played for 162 minutes2022-04-11 07:49:32,899 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie The Hobbit played for 255 minutes2022-04-11 07:49:33,404 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Sing 2 played for 264 minutes2022-04-11 07:49:33,902 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Encanto played for 28 minutes2022-04-11 07:49:34,402 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Star Trek: First Contact played for 137 minutes2022-04-11 07:49:34,903 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Star Trek: First Contact played for 277 minutes2022-04-11 07:49:35,402 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie The Hobbit played for 141 minutes
复制代码


在另一个终端窗口中,启动我们刚刚开发的 Kafka Stream 代码:

./mvnw compile quarkus:dev
2022-04-11 07:54:59,321 INFO [org.apa.kaf.str.pro.int.StreamTask] (movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1) stream-thread [movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1] task [1_0] Restored and ready to run2022-04-11 07:54:59,322 INFO [org.apa.kaf.str.pro.int.StreamThread] (movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1) stream-thread [movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1] Restoration took 74 ms for all tasks [1_0]2022-04-11 07:54:59,322 INFO [org.apa.kaf.str.pro.int.StreamThread] (movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1) stream-thread [movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING2022-04-11 07:54:59,324 INFO [org.apa.kaf.str.KafkaStreams] (movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1) stream-client [movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea] State transition from REBALANCING to RUNNING[KSTREAM-LEFTJOIN-0000000005]: 4, Movie [director=Craig Gillespie, genre=Crime Comedy, id=4, name=Cruella][KSTREAM-LEFTJOIN-0000000005]: 2, Movie [director=Jonathan Frakes, genre=Space, id=2, name=Star Trek: First Contact][KSTREAM-LEFTJOIN-0000000005]: 4, Movie [director=Craig Gillespie, genre=Crime Comedy, id=4, name=Cruella][KSTREAM-LEFTJOIN-0000000005]: 1, Movie [director=Peter Jackson, genre=Fantasy, id=1, name=The Hobbit][KSTREAM-LEFTJOIN-0000000005]: 4, Movie [director=Craig Gillespie, genre=Crime Comedy, id=4, name=Cruella][KSTREAM-LEFTJOIN-0000000005]: 4, Movie [director=Craig Gillespie, genre=Crime Comedy, id=4, name=Cruella][KSTREAM-LEFTJOIN-0000000005]: 3, Movie [director=Jared Bush, genre=Animation, id=3, name=Encanto][KSTREAM-LEFTJOIN-0000000005]: 5, Movie [director=Garth Jennings, genre=Jukebox Musical Comedy, id=5, name=Sing 2]
复制代码


在输出中打印的是事件(连接所生成的结果),其中键是movieId,值是movie本身。我们现在所具有的功能是,每当一个电影停止播放时,Kafka Stream 会对其进行处理并以 Movie 全量信息的形式对其进行展现。


到目前为止,还不算复杂,对于这样的场景,你可能会想我们根本没有必要使用 Kafka Streams。但是,我们再加一些需求,这样你就能看到它的强大之处了。


现在,我们不是在用户每次停掉电影的时候都生成事件,而是只对用户观看时间超过 10 分钟的电影发送事件。


我们可以使用filter方法按照持续时长进行过滤。

playEvents       .filter((region, event) -> event.duration >= 10) // filters by duration       .map((key, value) -> KeyValue.pair(value.id, value))       .join(moviesTable, (movieId, moviePlayedId) -> movieId, (moviePlayed, movie) -> movie)        .print(Printed.toSysOut());
复制代码


重启应用,我们可以发现观看时长小于 10 分钟的电影将不会被处理。


我们发现,Kafka Streams 有助于代码的整洁性,接下来,我们添加最后的需求。现在,我们对每部电影的播放时长并不感兴趣,而是对每部电影有多少次超过 10 分钟的播放感兴趣。


到目前为止,对事件的处理都是无状态的,因为事件都是遵循这样的步骤,即接收、处理并发送至 sink 处理器(也就是发送至一个主题或控制台输出),但是,为了统计某部电影播放的次数,我们需要在内存记住电影被播放了多少次,并且当任意用户再次观看超过 10 分钟的时候,要对这个统计数字递增一次。此时,事件的处理就要以有状态的方式进行了。


我们需要做的第一件事就是创建一个 Java 类,以存储电影的名称及其播放的次数。

public class MoviePlayCount {   public String name;   public int count;
public MoviePlayCount aggregate(String name) { this.name = name; this.count++;
return this; }
@Override public String toString() { return "MoviePlayCount [count=" + count + ", name=" + name + "]"; }
}
复制代码


这是一个计数器类,它依然需要两样东西:


  • 我们需要有一个地方存储这个类的实例,确保每次触发事件的时候,它不会被重置。

  • 每当_playtimemovies_主题中有事件触发时,调用aggregate方法的逻辑。


关于第一个问题,我们需要使用KeyValueBytesStoreSupplier接口。


public static final String COUNT_MOVIE_STORE = "countMovieStore";
KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(COUNT_MOVIE_STORE);
复制代码


对于第二个问题,Kafka Streams 有一个用来聚合结果的aggregate方法。


在我们的用例中,也就是每部电影播放时长超过 10 分钟的次数。


// MoviePlayCount可以被序列化和反序列化ObjectMapperSerde<MoviePlayCount> moviePlayCountSerder = new ObjectMapperSerde<>(MoviePlayCount.class);
// 这是之前的连接操作,其中键是电影id,值是电影.join(moviesTable, (movieId, moviePlayedId) -> movieId, (moviePlayed, movie) -> movie)// 根据键对事件进行分组,在本例中,也就是电影的id.groupByKey(Grouped.with(Serdes.Integer(), movieSerder)) // 聚合方法,如果MoviePlayCount已经创建的话,获取该对象(如果尚未创建的话,会创建该实例)并调用其aggregate方法,以对观看次数进行递增 .aggregate(MoviePlayCount::new, (movieId, movie, moviePlayCounter) -> moviePlayCounter.aggregate(movie.name), Materialized.<Integer, MoviePlayCount> as(storeSupplier) .withKeySerde(Serdes.Integer()) .withValueSerde(moviePlayCountSerder) )
复制代码


重启应用,在控制台上将会展示电影播放的次数。


提示:要重启应用,只需在终端输入“s”,应用将会自动重启。


应用重启之后,控制台将会展示每部电影的状态。


[KTABLE-TOSTREAM-0000000011]: 4, MoviePlayCount [count=13, name=Cruella][KTABLE-TOSTREAM-0000000011]: 3, MoviePlayCount [count=11, name=Encanto][KTABLE-TOSTREAM-0000000011]: 5, MoviePlayCount [count=14, name=Sing 2][KTABLE-TOSTREAM-0000000011]: 2, MoviePlayCount [count=15, name=Star Trek: First Contact][KTABLE-TOSTREAM-0000000011]: 1, MoviePlayCount [count=16, name=The Hobbit][KTABLE-TOSTREAM-0000000011]: 2, MoviePlayCount [count=16, name=Star Trek: First Contact][KTABLE-TOSTREAM-0000000011]: 3, MoviePlayCount [count=12, name=Encanto][KTABLE-TOSTREAM-0000000011]: 2, MoviePlayCount [count=17, name=Star Trek: First Contact][KTABLE-TOSTREAM-0000000011]: 5, MoviePlayCount [count=15, name=Sing 2][KTABLE-TOSTREAM-0000000011]: 4, MoviePlayCount [count=14, name=Cruella][KTABLE-TOSTREAM-0000000011]: 1, MoviePlayCount [count=17, name=The Hobbit][KTABLE-TOSTREAM-0000000011]: 4, MoviePlayCount [count=15, name=Cruella][KTABLE-TOSTREAM-0000000011]: 4, MoviePlayCount [count=16, name=Cruella]
复制代码


交互式查询


因为我们将_sink_处理器设置成了System.out 流,所以聚合结果会流向控制台。


.toStream().print(Printed.toSysOut());
复制代码


但是,我们也可以将结果流发往一个 Kafka 主题:


.to("counter_movies",                      Produced.with(Serdes.Integer(), moviePlayCountSerder));
复制代码


但是,如果我们感兴趣的不是每次对新事件的反应,而只是想查询特定的电影此时播放的次数,那又该怎么办呢?


Kafka Streams 的交互式查询(interactive query)允许我们直接查询底层存储,以获取给定键相关的值。


首先,我们创建一个名为MoviePlayCountData的类来存储查询结果。按照这种方式,我们可以解耦 Kafka Streams 使用的类与应用中其他部分所使用的类。


public class MoviePlayCountData {
private String name; private int count;
public MoviePlayCountData(String name, int count) { this.name = name; this.count = count; }
public int getCount() { return count; }
public String getName() { return name; }
}
复制代码


现在,创建名为InteractiveQueries的类来实现对状态存储(KeyValueBytesStoreSupplier)的访问并根据电影的id查询它被播放的次数。


import javax.enterprise.context.ApplicationScoped;import javax.inject.Inject;
import org.apache.kafka.streams.KafkaStreams;import org.apache.kafka.streams.errors.InvalidStateStoreException;import org.apache.kafka.streams.state.QueryableStoreTypes;import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import static org.apache.kafka.streams.StoreQueryParameters.fromNameAndType;
import java.util.Optional;
@ApplicationScopedpublic class InteractiveQueries {
@Inject KafkaStreams streams;
public <MoviePlayCountData> getMoviePlayCountData(int id) { // 获取状态存储并根据电影id获取播放次数 MoviePlayCount moviePlayCount = getMoviesPlayCount().get(id); // 如果有结果的话 if (moviePlayCount != null) { // 将结果包装到MoviePlayCountData中 return Optional.of(new MoviePlayCountData(moviePlayCount.name, moviePlayCount.count)); } else { return Optional.empty(); } }
// 获取状态存储 private ReadOnlyKeyValueStore<Integer, MoviePlayCount> getMoviesPlayCount() { while (true) { try { return streams.store(fromNameAndType(TopologyProducer.COUNT_MOVIE_STORE, QueryableStoreTypes.keyValueStore())); } catch (InvalidStateStoreException e) { // 忽略之,此时存储尚未就绪 } } }
}
复制代码


现在,我们可以添加一个简单的 REST 端点来运行该查询。

import java.util.Optional;
import javax.inject.Inject;import javax.ws.rs.GET;import javax.ws.rs.Path;import javax.ws.rs.PathParam;import javax.ws.rs.core.Response;import javax.ws.rs.core.Response.Status;
@Path("/movie")public class MovieCountResource {
// 注入前文的类进行查询 @Inject InteractiveQueries interactiveQueries;
@GET @Path("/data/{id}") public Response movieCountData(@PathParam("id") int id) { Optional<MoviePlayCountData> moviePlayCountData = interactiveQueries.getMoviePlayCountData(id);
// 根据结果判定返回值还是404 if (moviePlayCountData.isPresent()) { return Response.ok(moviePlayCountData.get()).build(); } else { return Response.status(Status.NOT_FOUND.getStatusCode(), "No data found for movie " + id).build(); }
}}
复制代码


Kafka Stream 实现的模式如下图所示:


扩展

Kafka Streams 应用可以进行扩展,所以流会分布到多个实例中。在这种情况下,每个实例都包含聚合结果的一个子集,所以要想获得总的聚合结果,我们需要通过将 REST API 重定向到另外的实例来获取其他实例的数据。


Kafka Streams 提供了一个API,可以知道要请求的数据是在本地 Kafka Streams 存储中还是在其他的主机中。


虽然这个过程并不复杂,但已经超出了本文的讨论范围。

结论

到目前为止,我们已经看到,将 Quarkus 应用连接到 Apache Kafka 并生产和消费主题中的消息/事件是很容易的。此外,还看到 Kafka Streams 让我们不仅可以消费 Kafka 中消息,还能够实时处理它们,进行转换、过滤等操作,例如以同步的方式消费结果数据。 这是一项强大的技术,当需要处理的数据不断变化时,它可以轻松扩展,提供实时的体验。


但是,我们还没有解决该架构的最后一个问题。通常情况下,数据并不是存储在一个地方。电影信息可能存储在关系型数据库中,而电影的播放信息则存储在一个 Kafka 主题中。那么,该如何保持这两个地方的信息更新,以便 Kafka Streams 能正确地连接数据呢?


这里有一个缺失的部分,名为 Debezium 的项目可以帮助我们解决这个问题。我们将用一整篇文章来介绍 Debezium 和 Quarkus,敬请持续关注。


作者简介:

Alex Soto 是红帽公司的开发者体验总监。他对 Java 领域、软件自动化充满热情,他相信开源软件模式。Soto 是 Manning 的《Testing Java Microservices》和 O’Reilly 的《Quarkus Cookbook》两本书的共同作者,他还是多个开源项目的贡献者。自 2017 年以来,他一直是 Java Champion,是国际演讲者和 Salle URL 大学的教师。你可以在 Twitter 上关注他(Alex Soto ),随时了解 Kubernetes 和 Java 领域的动态。


原文链接:

Kafka Streams and Quarkus: Real-Time Processing Events


相关阅读:

本系列第一部分:使用 Apache Kafka 实现 Quarkus 的反应式消息

2022-08-25 11:005230

评论

发布
暂无评论
发现更多内容

探讨MySQL事务特性和实现原理

小小怪下士

Java MySQL 程序员 事务

看海泰方圆类ChatGPT技术模型!

电子信息发烧客

畅销10年的数据库技术图书,当之无愧的霸主!还有谁?

博文视点Broadview

谈谈干前端三年的几点感受

虎妞先生

前端 成长 代码人生

Flomesh Ingress 使用实践(四)TLS 透传

Flomesh

Kubernetes 服务网格 ingress Pipy 流量管理

应用部署初探:微服务的3大部署模式

SEAL安全

微服务 企业号 2 月 PK 榜

数据同步gossip协议原理与应用场景介绍

京东科技开发者

架构 Consul fabric Gossip协议 企业号 2 月 PK 榜

Vue3项目框架搭建封装,一次学习,终身受益【万字长文,满满干货】

虎妞先生

前端 前端架构 Vue 3 vue cli

Python从0到1丨图像增强及运算:形态学开运算、闭运算和梯度运算

华为云开发者联盟

Python 人工智能 华为云 企业号 2 月 PK 榜 华为云开发者联盟

银行零售如何更贴近客户?是时候升级你的客户旅程平台了

Kyligence

数据分析 客户旅程

Hi3861编译烧录更快捷

HarmonyOS开发者

HarmonyOS

JVM性能调优,分享些好用的内存分析神器

Steven

如何用Apipost预执行脚本动态修改Query、Body、Header参数

不想敲代码

Postman 接口调试 API apipost

非代码的贡献也能成为Committer,我与DolphinScheduler社区的故事

Apache DolphinScheduler

开源 开源社区 开源文化 开源软件 大数据 开源

给webpack提了一个pr之后......

虎妞先生

前端 webpack #开源

众生皆苦,我选pnpm

虎妞先生

npm 原理 前端工程化 pnpm

十分钟用vitepress搭建项目文档

虎妞先生

前端 vite Vue 3

万里数据库加入龙蜥社区,打造基于“龙蜥+GreatSQL”的开源技术底座

OpenAnolis小助手

开源 龙蜥社区 greatsql社区 万里数据库 生态适配

干货|PCBA丝印位号与极性符号的组装性设计

华秋电子

PCB dfm

从零开始学习BOM&DOM

虎妞先生

前端 DOM

C++到Python全搞定,教你如何为FastDeploy贡献代码

飞桨PaddlePaddle

c++ paddle 飞桨

Java高手速成 | Hibernate的配置文件与JPA API的基本用法

TiAmo

hibernate jpa api 网关

2023年低代码发展新趋势

力软低代码开发平台

Flink CEP 在抖音电商的业务实践

Apache Flink

大数据 flink 实时计算

BSN-DDC基础网络详解(二):快速接入指南

BSN研习社

BSN-DDC

基于Verilog HDL的状态机描述方法

timerring

FPGA

git中patch的用法

ModStart

前端如何实现将多页数据合并导出到Excel单Sheet页解决方案|内附代码

葡萄城技术团队

数据库 前端 架构分布式

我的2022,从紫竹院到通惠河畔

虎妞先生

学习 前端 成长 年终总结

对话 ChatGPT:现象级 AI 应用,将如何阐释「研发效能管理」?

LigaAI

人工智能 研发效能 openai ChatGPT 企业号 2 月 PK 榜

图片竟能直接生成逼真音效?这AI模型也太神奇了吧!

人称T客

Kafka Streams与Quarkus:实时处理事件_云原生_Alex Soto_InfoQ精选文章