在容器化环境中扩展分布式流式处理器

阅读数:3269 2019 年 2 月 11 日 10:35

在容器化环境中扩展分布式流式处理器

本文介绍了我们在 Kubernetes 中扩展分布式流处理器的经验。流处理器应该支持维护最佳的并行性。然而,添加更多的资源会带来额外的成本,但却不能保证性能的提升。相反,流处理器应该识别出资源需求级别,并进行相应的扩展。

关键要点

  • 流式处理器应该是可扩展的,用以满足流数据处理不断增长的业务需求。
  • 在容器化环境中扩展流式处理器必须在服务质量与相关成本之间做出权衡。
  • 流式处理器应该能够通过水平扩展在容器化环境(例如运行在云服务供应商上的 Kubernetes)中利用这种权衡。
  • 在容器化环境中成功运行流式处理应用程序取决于为每个流式处理器配置的硬件资源。
  • 为容器化环境添加更多的硬件资源并不会带来性能的提升。

由于对事件流式处理(即流式处理)应用程序的需求在不断增长,数据流式处理近来已成为数据分析领域的主要范式之一。各种流式处理应用已经出现在各个行业中,如电信、交通流量管理、人群管理、健康信息学、网络安全、金融,等等。

流式处理器是一种软件平台,让用户能够更快地处理和响应传入的数据流。市场上有很多流式处理器可供选择,比如 Flink、Heron、Kafka、Samza、Spark Streaming、Storm 和 WSO2 Stream Processor,它们都是开源流式处理器。

流式处理器的实时操作为提升系统性能提供了高质量的服务。大多数现代流式处理器只需要少数几个计算节点就可以处理 90%的流式场景。但是,随着业务的扩展,大多数有利可图的企业需要处理越来越多的工作负载。因此,在选择流式处理器时,必须选择容易扩展且能够处理更大工作负载的解决方案。

流式处理器越来越多地被部署成云端的软件即服务(SaaS),例如 Amazon Kinesis Data Analytics、Microsoft Azure Stream Analytics、Google Cloud Dataflow,等等。基于云的流式处理器为在其上运行的流式处理应用程序提供了弹性扩展能力。以容器为中心的管理环境(即容器化环境,例如 Kubernetes)可以以可伸缩的方式运行流式处理应用程序。然而,由于硬件或软件环境的异构性和数据流的特性,在容器化环境中何时以及如何进行分布式流式处理器的缩放就成了一个非常重要的问题。本文介绍了一个数据密集型流式处理应用程序的实际应用场景,解释了如何在 Kubernetes 中进行系统的扩展,以及相关的权衡。我们使用 WSO2 Stream Processor(WSO2 SP)作为示例流式处理器,因为它是一个开源的适合实现此类应用程序的云原生流式处理器。不过,我们认为相同的概念也同样适用于市场上其他云原生流式处理器。

我们将提供一个真实的流式处理示例,与检测恶意攻击有关,在这个示例中,有人试图进行未授权的 Web 服务器登录,并造成服务器拒绝服务(DoS 攻击)。一旦检测到此类 DoS 攻击,系统就会向系统管理员发送警报,以便采取必要的安全措施。

使用 Web 服务器日志检测恶意攻击

可以使用 Web 服务器日志中捕获的 HTTP 日志事件来监控 Web 服务器的运行状态。如果连续出现相同的 HTTP 状态码(例如 401 未授权或 403 禁止访问),说明有人在尝试恶意登录 Web 服务器。401 响应说明外部第三方尝试访问 Web 服务器的凭证已经被拒绝。状态码 403 说明服务器拒绝接受请求,尽管服务器知道如何处理该请求。

在容器化环境中扩展分布式流式处理器

图 1:检测对 Web 服务器的恶意攻击

有几种不同的方法可用来处理这种情况。但是,对于这种情况,信息安全专家更喜欢收到实时警报。如果这种恶意请求在三秒钟内超过 30 次,并且访问率((未授权请求次数 + 禁止访问计数)/ 总请求次数)为 1.0,则需要抛出警报。我们使用流式处理器开发了一个警报生成程序(如图 1 所示),这个处理器接收并处理来自 Web 服务器的日志事件。为了让系统具备可扩展性,它被部署在运行在 Google Compute Engine(GCE)上的 Kubernetes 集群中。清单 1 显示了用 Siddhi 查询语言编写的流式 SQL 代码。我们将它称为 Siddhi 应用程序。

清单 1:使用 Siddhi 流式 SQL 开发的恶意攻击检测程序

复制代码
@App:name("MaliciousAttacksDetection")
@App:description("HTTP Log Processor for detecting malicious DoS attacks")
@source(type = 'kafka', partition.no.list='0', threading.option='single.thread', group.id="group", bootstrap.servers='kafka-service:9092', topic.list = 'attackDetectionTopic',
@map(type = 'json'))
define stream inputStream ( iij_timestamp long, ip string, timestamp long, zone float, cik double, accession string, doc string, code float, size double, idx float, norefer float, noagent float, find float, crawler float, groupID int, browser string);
--Output of query 1: I want to know the IP of the malicious hosts which try to make unauthorized login attempts in short intervals.
@sink(type='log')
define stream outputStreamDoSAlert(ip string ,groupID int);
--The Actual latency of parallel siddhi apps are getting started to measure at this point
@info(name = "Query1")
@dist(execGroup='group11' ,parallel ='1')
from inputStream
select iij_timestamp, ip, timestamp, zone, cik, accession, doc, code, size, idx, norefer, noagent, find, crawler, groupID, browser, convert(time:timestampInMilliseconds(),'long') as injected_iijtimestamp
insert into interimInputStream;
--Query 2: Here all the accesses are either 401 or 403 and they have been done at least 30 times within 3 seconds time period.
@info(name = "Query2")
@dist(execGroup='group3', parallel ='12')
partition with (groupID of interimInputStream)
begin
from interimInputStream#window.timeBatch(timestamp, 3 sec)
select ip, count() as totalAccessCount, sum(ifThenElse(code == 401F, 1, 0)) as unauthorizedCount, sum(ifThenElse(code == 403F, 1, 0)) as forbiddenCount,injected_iijtimestamp as iijtimestamp,groupID,timestamp
insert into #interimStream3;
from #interimStream3#throughput:throughput(iijtimestamp,"throughput",3,6,"outputStreamDoSAlert",30)
select ip, totalAccessCount, (unauthorizedCount + forbiddenCount)/totalAccessCount as accessPercentage ,groupID
insert into #interimStream5;
from #interimStream5 [totalAccessCount > 30L and accessPercentage == 1.0]
select ip ,groupID
insert into outputStreamDoSAlert;
end;

我们将这个应用程序部署在分布式流式处理器中,如图 2 的部署架构图所示。每个组件(如 Worker-1、Worker-2……)都部署为单个容器和单个 Kubernetes pod。表 1 中列出了每个容器类别及其执行的任务。图 2 显示了整个系统被部署在六个 Kubernetes 节点中的方案。

表 1:Kubernetes 环境中不同类型容器执行的任务

在容器化环境中扩展分布式流式处理器
我们将应用程序部署在 Google Compute Engine 的 Kubernetes 环境中。对于特定的工作负载 P,系统应该提供特定的服务质量(QoS)值 Q。我们根据 Worker 的延迟数量来测量 Q 的值(延迟是指事件进入 Worker 和事件退出 Worker 之间的时间差)。下面列出了集群中部署的每个组件。

  • Web 服务器托管在 Node 1 上;
  • 生产者组件托管在 Node 2 上,这个组件负责生成工作负载。它读取 Web 服务器日志并将它们发布到运行在 Node 4 上的 Kafka 实例;
  • Node 2 和 Node 4 运行流式处理器的两个管理器;
  • 由 gcloud 自动生成的 NFS 托管在 Node 3 上;
  • Worker 运行在 Node 5 和 Node 6 上,并负责处理实际的工作负载。它们从 Kafka 实例读取数据,应用流式处理操作,并将结果写回 Kafka 实例。

在容器化环境中扩展分布式流式处理器

图 2:Kubernetes 集群的部署架构。

但是,随着时间的推移,Web 服务器上的工作负载也会增加,这是大多数 Web 服务器的典型特征。工作负载可能会从 P 增加到到 2P、4P、……、16P,等等。在这种情况下,运行 Web 服务器监控系统的企业需要维护 Q’(QoS 属性的观察值),让它与 Q 相当。流式处理器应该能够扩展到足以维持预期的 QoS 级别。请注意,可伸缩性是系统处理不断增加的工作负载的能力。在本文中,我们着重关注负载可伸缩性,即随着流量的增加,系统能够正常运行。

有两种方法可以实现负载可伸缩性:强伸缩和弱伸缩。强伸缩在保持问题规模不变的同时增加处理器的数量,弱伸缩也会增加处理器数量,但保持每个处理器的问题规模不变。在本文中,我们采用了弱伸缩,因为我们遇到的是工作负载增加的情况。

实验

我们使用了一个运行在 Google Compute Engine(GCE)上的 Kubernetes 集群。此外,我们使用的节点配备了 2 核 CPU 和 7.5GB 内存,用来托管 pod。每个 pod 都有一个容器,每个容器中都部署了一个流式处理器(SP)组件。我们使用 JDK 1.8、Kafka 2.0.1、WSO2-SP 4.3.0 和 MYSQL 5.7.4 来构建 docker 镜像。它们都作为容器化的应用程序部署在集群中。每个实验需要 40 分钟时间,包括 10 分钟的预热。请注意,我们使用符号 x-y-z 来表示 (节点数)-(Worker 数)-(实例数)。

我们使用 EDGAR 日志文件数据集作为实验数据集,因为它已经包含 CSV 格式的 Web 服务器日志数据集。我们使用了 EDGAR 日志文件数据集 log20170325.csv 。这个 CSV 文件中包含了 22,146,382 个事件,文件总大小约为 2.4GB,平均消息大小为 144 字节,每条记录有 15 个字段。清单 2 显示了 EDGAR 数据集中的头两个记录。

清单 2:来自 EDGAR 日志文件数据集的头两个记录

复制代码
ip,date,time,zone,cik,accession,extention,code,size,idx,norefer,noagent,find,crawler,browser
100.14.44.eca,2017-03-25,00:00:00,0.0,1031093.0,0001137171-10-000013,-index.htm,200.0,7926.0,1.0,0.0,0.0,10.0,0.0,

我们分别测量了三个级别的性能,即节点级别、容器级别和流式处理器级别。不过,本文得出的结论是基于流式处理器级别的延迟和吞吐量。我们在 group3 Siddhi 执行组中测量了这些值。我们使用 Kubernetes 集群进行了六次不同的实验,得出了表 2 所示的结果。

在表 2 中,ID 对应于唯一的实验标识符。节点数量对应于 Kubernetes Worker 的总数。实例数量是指 Siddhi 实例的数量。生产者数据速率(线程数)对应于生产者的数量,这些生产者通过从 EDGAR 数据文件读取 HTTP 日志事件来生成流事件。

结果

两个节点、6 个 Worker、6 个实例(2-6-6)导致单个工作负载生产者 P 的平均延迟为 390 毫秒。这是最基本的情况。随着工作负载的增加,延迟显著增加,而吞吐量会随之降低。我们将工作负载生成器线程增加到 16,用以生成很大的工作负载。场景 2 显示了这种情况。当我们将生成器线程增加 16 时,延迟增加了 28.2%,吞吐量减少了 29%。场景 3 表明,减少输入数据项中唯一组 ID 的数量会使事情变得更糟(特别是在吞吐量方面)。这是因为减少唯一组 ID 的数量会导致应用程序的并行性也减少。

如果我们将每个节点的 Worker 数量加倍,如场景 4 所示,我们将每个 Worker 的内存量减半。与情况 1 相比,吞吐量降低了三分之二,延迟增加了 9 倍。因此,Worker 的内存使用量对应用程序的延迟起着重要作用。有时候,如果没有提供足够的内存,甚至无法部署 Worker。

对于第 5 种场景,我们将 Kubernetes Worker 节点的数量增加到 3 个,总共有 12 个 Worker,从而消除了性能瓶颈。现在添加了另一个节点(Node 7),每个节点有四个流式处理器。每个 Worker 需要 1 GB 内存,因此每个节点可以有 4 到 5 个 Worker。因此,即使我们有 3-6-6 的系统设置(即三个节点,六个 Worker 和六个实例),只能获得 2-6-6 的运行性能。但是,2-12-12 和 3-12-12 的性能特征会不一样。

添加更多的硬件资源也无助于是。我们可以从实验场景 6 中观察到这一点。虽然与场景 5 相比,节点数量增加了一倍,但我们得到了与场景 5 相似的平均延迟。如果为 Worker 添加更多的 Siddhi 部分应用程序,场景 6 的方法可能会有用,但这需要额外的内存。

表 2:不同 Kubernetes 集群的性能
在容器化环境中扩展分布式流式处理器

当我们在 Siddhi 应用程序中使用分区时,Siddhi 应用程序被分成若干个 Siddhi 应用程序,这些应用程序将根据分区属性的唯一值分布来获得工作负载。表 2 最右边的一列显示了 groupID 字段使用的属性惟一值的数量。在 Siddhi 应用程序中,我们使用 groupID 作为分区属性。因此,使用 6 个惟一值意味着只有 6 个部分 Siddhi 应用程序可以根据 Siddhi 的哈希映射获得工作负载。我们将惟一 groupID 的数量增加到 12 来增加并行性。这意味着流将被定向到 12 个 Siddhi 部分应用程序。由于这些原因,并行性增加了,我们在场景 1 中获得的延迟比场景 3 中的更好。

总结

可伸缩性是容器环境中的流式处理器面临的一个重大挑战,因为应用程序的工作负载会随着时间的推移而增加。本文分享了在 Kubernetes 环境中扩展此类分布式流式处理器的经验。为此,流式处理器应该提供某种编程语言或查询结构,以维持最佳的并行级别,而不管应用程序的初始规模如何。随着工作负载的增加,需要提供足够数量的硬件资源,让系统可以保持足够的 QoS 水平。添加更多资源会产生额外成本,但添加更多的资源并不能保证一定会获得性能的提升。流式处理器需要能够识别出资源需求级别,并扩展到可以保持最佳性能与成本比的级别。

关于作者

Sarangan Janakan 是 WSO2 的实习软件工程师,目前是斯里兰卡莫拉图瓦大学计算机科学与工程系的三年级本科生。他的研究兴趣包括数据流式处理和云计算。

Miyuru Dayarathna 是 WSO2 的高级技术主管。他是一名计算机科学家,在流式计算、图形数据管理和挖掘、云计算、性能工程、信息安全等方面做出了贡献。他还是斯里兰卡莫拉图瓦大学计算机科学与工程系的顾问。他最近在 WSO2 的研究重点是流式处理器和身份识别服务器。他已经在知名的国际期刊和会议上发表过技术论文。

查看英文原文: https://www.infoq.com/articles/distributed-stream-processor-container

评论

发布