InfoQ Geekathon 大模型技术应用创新大赛 了解详情
写点什么

Dubbo 压测插件的实现——基于 Gatling

  • 2020-03-11
  • 本文字数:7082 字

    阅读完需:约 23 分钟

Dubbo压测插件的实现——基于Gatling

Dubbo 压测插件已开源,本文涉及代码详见 gatling-dubbo(点击阅读原文)


Gatling 是一个开源的基于 Scala、Akka、Netty 实现的高性能压测框架,较之其他基于线程实现的压测框架,Gatling 基于 AKKA Actor 模型实现,请求由事件驱动,在系统资源消耗上低于其他压测框架(如内存、连接池等),使得单台施压机可以模拟更多的用户。此外,Gatling 提供了一套简单高效的 DSL(领域特定语言)方便我们编排业务场景,同时也具备流量控制、压力控制的能力并提供了良好的压测报告,所以有赞选择在 Gatling 基础上扩展分布式能力,开发了自己的全链路压测引擎 MAXIM。全链路压测中我们主要模拟用户实际使用场景,使用 HTTP 接口作为压测入口,但有赞目前后端服务中 Dubbo 应用比重越来越高,如果可以知道 Dubbo 应用单机水位将对我们把控系统后端服务能力大有裨益。基于 Gatling 的优势和在有赞的使用基础,我们扩展 Gatling 开发了 gatling-dubbo 压测插件。

插件主要结构

实现 Dubbo 压测插件,需实现以下四部分内容:


  • Protocol 和 ProtocolBuild

  • 协议部分,这里主要定义 Dubbo 客户端相关内容,如协议、泛化调用、服务 URL、注册中心等内容,ProtocolBuild 则为 DSL 使用 Protocol 的辅助类

  • Action 和 ActionBuild

  • 执行部分,这里的作用是发起 Dubbo 请求,校验请求结果并记录日志以便后续生成压测报告。ActionBuild 则为 DSL 使用 Action 的辅助类

  • Check 和 CheckBuild

  • 检查部分,全链路压测中我们都使用 JsonPath检查请求结果,这里我们实现了一样的检查逻辑。CheckBuild 则为 DSL 使用 Check 的辅助类

  • DSL

  • Dubbo 插件的领域特定语言,我们提供了一套简单易用的 API 方便编写 Duboo 压测脚本,风格上与原生 HTTP DSL 保持一致


Protocol

协议部分由 5 个属性组成,这些属性将在 Action 初始化 Dubbo 客户端时使用,分别是:


  • protocol

  • 协议,设置为 dubbo

  • generic

  • 泛化调用设置,Dubbo 压测插件使用泛化调用发起请求,所以这里设置为 true,有赞优化了泛化调用的性能,为了使用该特性,引入了一个新值 result_no_change(去掉优化前泛化调用的序列化开销以提升性能)

  • url

  • Dubbo 服务的地址: dubbo://IP地址:端口

  • registryProtocol

  • ubbo 注册中心的协议,设置为 ETCD3

  • registryAddress

  • Dubbo 注册中心的地址


如果是测试 Dubbo 单机水位,则设置 url,注册中心设置为空;如果是测试 Dubbo 集群水位,则设置注册中心(目前支持 ETCD3),url 设置为空。由于目前注册中心只支持 ETCD3,插件在 Dubbo 集群上使用缺乏灵活性,所以我们又实现了客户端层面的负载均衡,如此便可抛开特定的注册中心来测试 Dubbo 集群水位。该特性目前正在内测中。


object DubboProtocol { val DubboProtocolKey = new ProtocolKey {  type Protocol = DubboProtocol  type Components = DubboComponents
def protocolClass: Class[io.gatling.core.protocol.Protocol] = classOf[DubboProtocol].asInstanceOf[Class[io.gatling.core.protocol.Protocol]]
def defaultProtocolValue(configuration: GatlingConfiguration): DubboProtocol = throw new IllegalStateException("Can't provide a default value for DubboProtocol")
def newComponents(system: ActorSystem, coreComponents: CoreComponents): DubboProtocol => DubboComponents = { dubboProtocol => DubboComponents(dubboProtocol) } }}
case class DubboProtocol( protocol: String, //dubbo generic: String, //泛化调用? url: String, //use url or registryProtocol: String, //use registry registryAddress: String //use registry) extends Protocol { type Components = DubboComponents}
复制代码


为了方便 Action 中使用上面这些属性,我们将其装进了 Gatling 的 ProtocolComponents:


case class DubboComponents(dubboProtocol: DubboProtocol) extends ProtocolComponents { def onStart: Option[Session => Session] = None def onExit: Option[Session => Unit] = None}
复制代码


以上就是关于 Protocol 的定义。为了能在 DSL 中配置上述 Protocol,我们定义了 DubboProtocolBuilder,包含了 5 个方法分别设置 Protocol 的 protocol、generic、url、registryProtocol、registryAddress 5 个属性。


object DubboProtocolBuilderBase { def protocol(protocol: String) = DubboProtocolBuilderGenericStep(protocol)}
case class DubboProtocolBuilderGenericStep(protocol: String) { def generic(generic: String) = DubboProtocolBuilderUrlStep(protocol, generic)}
case class DubboProtocolBuilderUrlStep(protocol: String, generic: String) { def url(url: String) = DubboProtocolBuilderRegistryProtocolStep(protocol, generic, url)}
case class DubboProtocolBuilderRegistryProtocolStep(protocol: String, generic: String, url: String) { def registryProtocol(registryProtocol: String) = DubboProtocolBuilderRegistryAddressStep(protocol, generic, url, registryProtocol)}
case class DubboProtocolBuilderRegistryAddressStep(protocol: String, generic: String, url: String, registryProtocol: String) { def registryAddress(registryAddress: String) = DubboProtocolBuilder(protocol, generic, url, registryProtocol, registryAddress)}
case class DubboProtocolBuilder(protocol: String, generic: String, url: String, registryProtocol: String, registryAddress: String) { def build = DubboProtocol( protocol = protocol, generic = generic, url = url, registryProtocol = registryProtocol, registryAddress = registryAddress )}
复制代码

Action

DubboAction 包含了 Duboo 请求逻辑、请求结果校验逻辑以及压力控制逻辑,需要扩展 ExitableAction 并实现 execute 方法。


DubboAction 类的域 argTypes、argValues 分别是泛化调用请求参数类型和请求参数值,需为 Expression[] 类型,这样当使用数据 Feeder 作为压测脚本参数输入时,可以使用类似 ${args_types}${args_values}这样的表达式从数据 Feeder 中解析对应字段的值。


execute 方法必须以异步方式执行 Dubbo 请求,这样前一个 Dubbo 请求执行后但还未等响应返回时虚拟用户就可以通过 AKKA Message 立即发起下一个请求,如此一个虚拟用户可以在很短的时间内构造大量请求。请求方式方面,相比于泛化调用,原生 API 调用需要客户端载入 Dubbo 服务相应的 API 包,但有时候却拿不到,此外,当被测 Dubbo 应用多了,客户端需要载入多个 API 包,所以出于使用上的便利性,Dubbo 压测插件使用泛化调用发起请求。


异步请求响应后会执行 onComplete 方法,校验请求结果,并根据校验结果记录请求成功或失败日志,压测报告就是使用这些日志统计计算的。


为了控制压测时的 RPS,则需要实现 throttle 逻辑。实践中发现,高并发情况下,泛化调用性能远不如原生 API 调用性能,且响应时间成倍增长(如此不能表征 Dubbo 应用的真正性能),导致 Dubbo 压测插件压力控制不准,解决办法是优化泛化调用性能,使之与原生 API 调用的性能相近,请参考 dubbo 泛化调用性能优化


class DubboAction(  interface:    String,  method:      String,  argTypes:     Expression[Array[String]],  argValues:    Expression[Array[Object]],  genericService:  GenericService,  checks:      List[DubboCheck],  coreComponents:  CoreComponents,  throttled:    Boolean,  val objectMapper: ObjectMapper,  val next:     Action) extends ExitableAction with NameGen {
override def statsEngine: StatsEngine = coreComponents.statsEngine
override def name: String = genName("dubboRequest")
override def execute(session: Session): Unit = recover(session) { argTypes(session) flatMap { argTypesArray => argValues(session) map { argValuesArray => val startTime = System.currentTimeMillis() val f = Future { try { genericService.$invoke(method, argTypes(session).get, argValues(session).get) } finally { } }
f.onComplete { case Success(result) => val endTime = System.currentTimeMillis() val resultMap = result.asInstanceOf[JMap[String, Any]] val resultJson = objectMapper.writeValueAsString(resultMap) val (newSession, error) = Check.check(resultJson, session, checks) error match { case None => statsEngine.logResponse(session, interface + "." + method, ResponseTimings(startTime, endTime), Status("OK"), None, None) throttle(newSession(session)) case Some(Failure(errorMessage)) => statsEngine.logResponse(session, interface + "." + method, ResponseTimings(startTime, endTime), Status("KO"), None, Some(errorMessage)) throttle(newSession(session).markAsFailed) } case FuFailure(e) => val endTime = System.currentTimeMillis() statsEngine.logResponse(session, interface + "." + method, ResponseTimings(startTime, endTime), Status("KO"), None, Some(e.getMessage)) throttle(session.markAsFailed) } } } }
private def throttle(s: Session): Unit = { if (throttled) { coreComponents.throttler.throttle(s.scenario, () => next ! s) } else { next ! s } }}
复制代码


DubboActionBuilder 则是获取 Protocol 属性并初始化 Dubbo 客户端:


case class DubboActionBuilder(interface: String, method: String, argTypes: Expression[Array[String]], argValues: Expression[Array[Object]], checks: List[DubboCheck]) extends ActionBuilder { private def components(protocolComponentsRegistry: ProtocolComponentsRegistry): DubboComponents =  protocolComponentsRegistry.components(DubboProtocol.DubboProtocolKey)
override def build(ctx: ScenarioContext, next: Action): Action = { import ctx._ val protocol = components(protocolComponentsRegistry).dubboProtocol //Dubbo客户端配置 val reference = new ReferenceConfig[GenericService] val application = new ApplicationConfig application.setName("gatling-dubbo") reference.setApplication(application) reference.setProtocol(protocol.protocol) reference.setGeneric(protocol.generic) if (protocol.url == "") { val registry = new RegistryConfig registry.setProtocol(protocol.registryProtocol) registry.setAddress(protocol.registryAddress) reference.setRegistry(registry) } else { reference.setUrl(protocol.url) } reference.setInterface(interface) val cache = ReferenceConfigCache.getCache val genericService = cache.get(reference) val objectMapper: ObjectMapper = new ObjectMapper() new DubboAction(interface, method, argTypes, argValues, genericService, checks, coreComponents, throttled, objectMapper, next) }}
复制代码


LambdaProcessBuilder 则提供了设置 Dubbo 泛化调用入参的 DSL 以及接下来要介绍的 Check 部分的 DSL:


case class DubboProcessBuilder(interface: String, method: String, argTypes: Expression[Array[String]] = _ => Success(Array.empty[String]), argValues: Expression[Array[Object]] = _ => Success(Array.empty[Object]), checks: List[DubboCheck] = Nil) extends DubboCheckSupport {
def argTypes(argTypes: Expression[Array[String]]): DubboProcessBuilder = copy(argTypes = argTypes)
def argValues(argValues: Expression[Array[Object]]): DubboProcessBuilder = copy(argValues = argValues)
def check(dubboChecks: DubboCheck*): DubboProcessBuilder = copy(checks = checks ::: dubboChecks.toList)
def build(): ActionBuilder = DubboActionBuilder(interface, method, argTypes, argValues, checks)}
复制代码

Check

全链路压测中,我们都使用 JsonPath校验 HTTP 请求结果,Dubbo 压测插件中,我们也实现了基于 JsonPath的校验。实现 Check,必须实现 Gatling check 中的 Extender 和 Preparer:


package object dubbo { type DubboCheck = Check[String]
val DubboStringExtender: Extender[DubboCheck, String] = (check: DubboCheck) => check
val DubboStringPreparer: Preparer[String, String] = (result: String) => Success(result)}
复制代码


基于 JsonPath的校验逻辑:


trait DubboJsonPathOfType { self: DubboJsonPathCheckBuilder[String] =>
def ofType[](implicit extractorFactory: JsonPathExtractorFactory) = new DubboJsonPathCheckBuilder[](path, jsonParsers)}
object DubboJsonPathCheckBuilder { val CharsParsingThreshold = 200 * 1000
def preparer(jsonParsers: JsonParsers): Preparer[String, Any] = response => { if (response.length() > CharsParsingThreshold || jsonParsers.preferJackson) jsonParsers.safeParseJackson(response) else jsonParsers.safeParseBoon(response) }
def jsonPath(path: Expression[String])(implicit extractorFactory: JsonPathExtractorFactory, jsonParsers: JsonParsers) = new DubboJsonPathCheckBuilder[](path, jsonParsers) with DubboJsonPathOfType}
class DubboJsonPathCheckBuilder[X: JsonFilter]( private[check] val path: Expression[String], private[check] val jsonParsers: JsonParsers)(implicit extractorFactory: JsonPathExtractorFactory) extends DefaultMultipleFindCheckBuilder[DubboCheck, String, Any, X]( DubboStringExtender, DubboJsonPathCheckBuilder.preparer(jsonParsers) ) { import extractorFactory._
def findExtractor(occurrence: Int) = path.map(newSingleExtractor[](_, occurrence)) def findAllExtractor = path.map(newMultipleExtractor[X]) def countExtractor = path.map(newCountExtractor)}
复制代码


DubboCheckSupport 则提供了设置 jsonPath 表达式的 DSL:


trait DubboCheckSupport { def jsonPath(path: Expression[String])(implicit extractorFactory: JsonPathExtractorFactory, jsonParsers: JsonParsers) =  DubboJsonPathCheckBuilder.jsonPath(path)}
复制代码


Dubbo 压测脚本中可以设置一个或多个 check 校验请求结果,使用 DSL check 方法

DSL

trait AwsDsl提供顶层 DSL。我们还定义了 dubboProtocolBuilder2DubboProtocol、dubboProcessBuilder2ActionBuilder 两个 Scala 隐式方法,以自动构造 DubboProtocol 和 ActionBuilder。


此外,泛化调用中使用的参数类型为 Java 类型,而我们的压测脚本使用 Scala 编写,所以这里需要做两种语言间的类型转换,所以我们定义了 transformJsonDubboData 方法。


trait DubboDsl extends DubboCheckSupport { val Dubbo = DubboProtocolBuilderBase
def dubbo(interface: String, method: String) = DubboProcessBuilder(interface, method)
implicit def dubboProtocolBuilder2DubboProtocol(builder: DubboProtocolBuilder): DubboProtocol = builder.build
implicit def dubboProcessBuilder2ActionBuilder(builder: DubboProcessBuilder): ActionBuilder = builder.build()
def transformJsonDubboData(argTypeName: String, argValueName: String, session: Session): Session = { session.set(argTypeName, toArray(session(argTypeName).as[JList[String]])) .set(argValueName, toArray(session(argValueName).as[JList[Any]])) }
private def toArray[](value: JList[T]): Array[T] = { value.asScala.toArray }}
object Predef extends DubboDsl
复制代码

Dubbo 压测脚本和数据 Feeder 示例

压测脚本示例


import io.gatling.core.Predef._import io.gatling.dubbo.Predef._
import scala.concurrent.duration._
class DubboTest extends Simulation { val dubboConfig = Dubbo .protocol("dubbo") .generic("true") //直连某台Dubbo机器,只单独压测一台机器的水位 .url("dubbo://IP地址:端口") //或设置注册中心,压测该Dubbo应用集群的水位,支持ETCD3注册中心 .registryProtocol("") .registryAddress("")
val jsonFileFeeder = jsonFile("data.json").circular //数据Feeder val dubboScenario = scenario("load test dubbo") .forever("repeated") { feed(jsonFileFeeder) .exec(session => transformJsonDubboData("args_types1", "args_values1", session)) .exec(dubbo("com.xxx.xxxService", "methodName") .argTypes("${args_types1}") .argValues("${args_values1}") .check(jsonPath("$.code").is("200")) ) }
setUp( dubboScenario.inject(atOnceUsers(10)) .throttle( reachRps(10) in (1 seconds), holdFor(30 seconds)) ).protocols(dubboConfig)}
复制代码


data.json 示例


[ { "args_types1": ["com.xxx.xxxDTO"], "args_values1": [{  "field1": "111",  "field2": "222",  "field3": "333" }] }]
复制代码

Dubbo 压测报告示例


活动推荐:

2023年9月3-5日,「QCon全球软件开发大会·北京站」 将在北京•富力万丽酒店举办。此次大会以「启航·AIGC软件工程变革」为主题,策划了大前端融合提效、大模型应用落地、面向 AI 的存储、AIGC 浪潮下的研发效能提升、LLMOps、异构算力、微服务架构治理、业务安全技术、构建未来软件的编程语言、FinOps 等近30个精彩专题。咨询购票可联系票务经理 18514549229(微信同手机号)。

2020-03-11 22:20698

评论

发布
暂无评论
发现更多内容

解开心锁,放飞自我

少油少糖八分饱

认识自己 读后感 阅读笔记 被讨厌的勇气

性能测试|JMeter逻辑控制器(四)

霍格沃兹测试开发学社

性能测试|JMeter逻辑控制器(五)

霍格沃兹测试开发学社

性能测试|JMeter逻辑控制器(七)

霍格沃兹测试开发学社

App自动化测试|Appium+Python自动化测试环境搭建(Windows)

霍格沃兹测试开发学社

Python 测试 Node appium

Nautilus Chain 主网上线,Zepoch 持有者将获第三轮 POSE 空投

大瞿科技

性能测试|JMeter逻辑控制器(二)

霍格沃兹测试开发学社

性能测试|JMeter逻辑控制器(十)

霍格沃兹测试开发学社

App自动化测试|Appium-Desktop界面介绍

霍格沃兹测试开发学社

Python 测试 appium

App自动化测试|Appium元素定位工具

霍格沃兹测试开发学社

Python 测试 appium

接口自动化测试|Requests库的安装与介绍

霍格沃兹测试开发学社

文本生成图像DALL·E 2背后的原理——Diffusion Model | 社区征文

秃头小苏

年中技术盘点

性能测试|JMeter逻辑控制器(三)

霍格沃兹测试开发学社

App自动化测试|Appium工作原理及Desired Capbilities配置

霍格沃兹测试开发学社

Python 测试 appium

性能测试|JMeter参数化(一)

霍格沃兹测试开发学社

具身智能,是机器人的“冷饭热炒”吗?

脑极体

AI

Linux系统命令大全。

百度搜索:蓝易云

云计算 Linux 运维 服务器 命令

MongoDB源码学习:创建记录和索引(insertDocuments)

云里有只猫

mongodb 源码解读

性能测试|JMeter逻辑控制器(九)

霍格沃兹测试开发学社

App自动化测试|原生App元素定位方法(二)

霍格沃兹测试开发学社

接口测试|postman的介绍和安装

霍格沃兹测试开发学社

时光“摆渡者”,让回忆“闪现”眼前

脑极体

调整自我,安然入眠

少油少糖八分饱

读书笔记 读书感悟 #读书 睡眠 我们为什么要睡觉

性能测试|JMeter逻辑控制器(八)

霍格沃兹测试开发学社

PoseiSwap 即将开启 POSE 单币质押,治理体系将全面运行

威廉META

PoseiSwap 即将开启 POSE 单币质押,治理体系将全面运行

鳄鱼视界

性能测试|JMeter逻辑控制器(六)

霍格沃兹测试开发学社

性能测试|JMeter连接数据库

霍格沃兹测试开发学社

App自动化测试|Appium介绍

霍格沃兹测试开发学社

Python 测试 appium

App自动化测试|原生app元素定位方法

霍格沃兹测试开发学社

vscode配置gitbash终端

向阳逐梦

  • 扫码添加小助手
    领取最新资料包
Dubbo压测插件的实现——基于Gatling_文化 & 方法_有赞技术_InfoQ精选文章