大数跨境
0
0

Dubbo 压测插件 2.0 —— 基于普通 API 调用的压测实践

Dubbo 压测插件 2.0 —— 基于普通 API 调用的压测实践 阿里巴巴中间件
2019-08-22
3
导读:上周8月17日的 Dubbo 开发者日活动上海站已经结束.
Dubbo 开发者日活动@上海

上周8月17日的 Dubbo 开发者日活动上海站已经结束,在公众号对话框内回复Dubbo”,可获取活动视频回放和活动 PPT 集锦。

本文转载自有赞coder公众号,作者聂风给出了基于普通 API 调用的 Dubbo 压测实践,该项目 Gatling 已经开源,欢迎使用,地址:
https://github.com/youzan/gatling-dubbo


- 正文开始 -
🧯

可达性分析算法(Reachability Analysis)的基本思路是,通过一些被称为引用链(GC Roots)的对象作为起点,从这些节点开始向下搜索,搜索走过的路径被称为(Reference Chain),当一个对象到 GC Roots 没有任何引用链相连时(即从 GC Roots 节点到该节点不可达),则证明该对象是不可用的。

《Dubbo 压测插件的实现——基于 Gatling》中,我们介绍了基于 Dubbo 泛化调用实现的 Gatling Dubbo 压测插件,使用泛化调用发起 Dubbo 压测请求,consumer 端不需要拿到 provider 端的 API 包,使用上很便利,但是众所周知,Dubbo 泛化调用的性能不如普通 API 调用,虽然可以优化并使之达到与普通 API 调用相近的性能,但仍存在一些局限性。生产中除了网关等特殊应用外,一般很少使用泛化调用,如果以泛化调用的性能来表征生产中普通 API 调用的性能,其压测结论很难令人信服。做压测的时候,一般要求各种条件如环境等都尽可能保持一致。所以,我们又开发了基于普通 API 调用的 Gatling Dubbo 压测插件,即 gatling-dubbo2.0。此外,依托于 Gatling 强大的基础能力, gatling-dubbo2.0 相比于 Jmeter 还存在以下几方面的优势:


  • 更强的场景编排能力,支持多场景同时编排,如仿真电商业务中同时存在普通下单、团购、秒杀等多种交易类型的场景
  • 支持设置场景内流量模型,如漏斗模型,仿真用户从商品浏览 -> 加入购物车 -> 下单 -> 支付过程中的各级转化率
  • 不需要安装额外插件,原生支持设置压力模型,如设置压测需要达到的目标 RPS,甚至逐级加压进行梯度压力测试
  • 更低的资源消耗,更高的并发能力



一、插件主要组成



Action 和 ActionBuild
执行部分,这里的作用是发起 Dubbo 请求,校验请求结果并记录日志以便后续生成压测报告。ActionBuild 则为 DSL 使用 Action 的辅助类


Check 和 CheckBuild
校验部分,全链路压测中我们使用 json path 校验 HTTP 请求结果,这里我们实现了一样的校验方法,而且,对于一些不规范的返回结果(如返回了基本数据类型),还增加了自定义校验方法。CheckBuild 则为 DSL 使用 Check 的辅助类。


DSL
插件的领域特定语言,提供简单易用的 API 方便编写 Dubbo 压测脚本。


1.1 Action


DubboAction 包含了发起 Dubbo 请求、请求结果校验以及压力控制逻辑,需要扩展 Gatling 的 ExitableAction 并实现 execute 方法。


DubboAction 的入参 f 是一个函数,从压测脚本传入,函数负责组织 Dubbo 请求,从 session 中取值并动态构造请求参数。这一过程类似于使用 Jmeter 压测 Java 接口,即扩展 AbstractJavaSamplerClient。所以,gatling-dubbo 2.0 也支持非 dubbo 的其他 java 调用压测,因为 f 怎么写的控制权完全掌握在写压测脚本的人手里(本质上,远程调用和本地调用的客户端使用方式上并没有区别)。

所有虚拟用户以并发方式执行 execute 方法,每个用户又以异步方式执行 Dubbo 请求,且无论请求是否正确返回,都需要记录相应的成功或失败日志,失败可能是由于请求失败了,也可能是请求成功了,但是校验请求结果失败了。下一步就是准备发起新的 Dubbo 请求,如果开启了 Rps 阀门(throttled),则会根据当前的 Rps 和 Rps 阀门阈值动态调整发送请求的频率,在施压机(consumer)未达到性能瓶颈的情况下,可以很稳定的保持在设置的 Rps 目标值上进行压测。如果 Rps 阀门未开启,则直接发起新的 Dubbo 请求(通过 AKKA Message 触发)。


 

  1. class DubboAction[A]( requestName: Expression[String],



  2. f: (Session) => A,



  3. val executor: ExecutorService,



  4. val objectMapper: ObjectMapper,



  5. checks: List[DubboCheck],



  6. coreComponents: CoreComponents,



  7. throttled: Boolean,



  8. val next: Action



  9. ) extends ExitableAction with NameGen {



  10. ......



  11. override def execute(session: Session): Unit = recover(session) {



  12. requestName(session) map { reqName =>



  13. val startTime = System.currentTimeMillis()



  14. val fu = Future {



  15. try {



  16. f(session)



  17. } finally {



  18. }



  19. }




  20. fu.onComplete {



  21. case Success(result) =>



  22. val endTime = System.currentTimeMillis()



  23. val resultJson = objectMapper.writeValueAsString(result)



  24. val (newSession, error) = Check.check(resultJson, session, checks)



  25. error match {



  26. case None =>



  27. statsEngine.logResponse(session, reqName, ResponseTimings(startTime, endTime), Status("OK"), None, None)



  28. throttle(newSession(session))



  29. case Some(Failure(errorMessage)) =>



  30. statsEngine.logResponse(session, reqName, ResponseTimings(startTime, endTime), Status("KO"), None, Some(errorMessage))



  31. throttle(newSession(session).markAsFailed)



  32. }




  33. case UFailure(e) =>



  34. val endTime = System.currentTimeMillis()



  35. statsEngine.logResponse(session, reqName, ResponseTimings(startTime, endTime), Status("KO"), None, Some(e.getMessage))



  36. throttle(session.markAsFailed)



  37. }



  38. }



  39. }




  40. private def throttle(s: Session): Unit = {



  41. if (throttled) {



  42. coreComponents.throttler.throttle(s.scenario, () => next ! s)



  43. } else {



  44. next ! s



  45. }



  46. }



  47. }



DubboActionBuilder 负责创建线程池并实例化 DubboAction:
  

  1. case class DubboActionBuilder[A](requestName: Expression[String], f: (Session) => A, checks: List[DubboCheck], threadPoolSize: Int) extends ActionBuilder {



  2. override def build(ctx: ScenarioContext, next: Action): Action = {



  3. import ctx._



  4. val executor = Executors.newFixedThreadPool(threadPoolSize)



  5. val objectMapper: ObjectMapper = new ObjectMapper()



  6. new DubboAction[A](requestName, f, executor, objectMapper, checks, coreComponents, throttled, next)



  7. }



  8. }



LambdaProcessBuilder 提供了设置 check 条件的 DSL 和 设置线程池大小的 DSL:
有赞的施压机是 4 核 8Gb 内存的,我们为其设置的默认线程池大小为 200,与 Dubbo 应用部署环境一致。你可以使用 DSL threadPoolSize(threadPoolSize: Int) 按照你的机器配置设置一个合适的线程池大小。如果施压机成了性能瓶颈,你可以考虑将其改造成集群来施压,具体可参考《有赞全链路压测引擎的设计与实现》
 

  1. case class DubboProcessBuilder[A](requestName: Expression[String], f: (Session) => A, checks: List[DubboCheck] = Nil, threadPoolSize: Int = 200) extends DubboCheckSupport {



  2. def check(dubboChecks: DubboCheck*): DubboProcessBuilder[A] = copy[A](checks = checks ::: dubboChecks.toList)




  3. def threadPoolSize(threadPoolSize: Int): DubboProcessBuilder[A] = copy[A](threadPoolSize = threadPoolSize)




  4. def build(): ActionBuilder = DubboActionBuilder[A](requestName, f, checks, threadPoolSize)



  5. }



1.2 Check


全链路压测中,我们使用 json path 校验 HTTP 请求结果,Dubbo 压测插件中,我们也实现了基于 json path 的校验方法:
 

  1. package object dubbo {



  2. type DubboCheck = Check[String]




  3. val DubboStringExtender: Extender[DubboCheck, String] =



  4. (check: DubboCheck) => check




  5. val DubboStringPreparer: Preparer[String, String] =



  6. (result: String) => Success(result)



  7. }


 

  1. trait DubboJsonPathOfType {



  2. self: DubboJsonPathCheckBuilder[String] =>




  3. def ofType[X: JsonFilter](implicit extractorFactory: JsonPathExtractorFactory) = new DubboJsonPathCheckBuilder[X](path, jsonParsers)



  4. }




  5. object DubboJsonPathCheckBuilder {



  6. val CharsParsingThreshold = 200 * 1000




  7. def preparer(jsonParsers: JsonParsers): Preparer[String, Any] =



  8. response => {



  9. if (response.length() > CharsParsingThreshold || jsonParsers.preferJackson)



  10. jsonParsers.safeParseJackson(response)



  11. else



  12. jsonParsers.safeParseBoon(response)



  13. }




  14. def jsonPath(path: Expression[String])(implicit extractorFactory: JsonPathExtractorFactory, jsonParsers: JsonParsers) =



  15. new DubboJsonPathCheckBuilder[String](path, jsonParsers) with DubboJsonPathOfType



  16. }




  17. class DubboJsonPathCheckBuilder[X: JsonFilter](



  18. private[check] val path: Expression[String],



  19. private[check] val jsonParsers: JsonParsers



  20. )(implicit extractorFactory: JsonPathExtractorFactory)



  21. extends DefaultMultipleFindCheckBuilder[DubboCheck, String, Any, X](



  22. DubboStringExtender,



  23. DubboJsonPathCheckBuilder.preparer(jsonParsers)



  24. ) {



  25. import extractorFactory._




  26. def findExtractor(occurrence: Int) = path.map(newSingleExtractor[X](_, occurrence))



  27. def findAllExtractor = path.map(newMultipleExtractor[X])



  28. def countExtractor = path.map(newCountExtractor)



  29. }



但有时候存在一些不规范的情况,dubbo 接口的返回结果并不能直接转化为 json,如返回了基本数据类型,所以我们还提供了自定义校验方法,可以将这样的返回结果转化为 String 类型,并使用字符串比较、正则表达式匹配等方法校验返回结果:
  

  1. case class DubboCustomCheck(func: String => Boolean, failureMessage: String = "Dubbo check failed") extends DubboCheck {



  2. override def check(response: String, session: Session)(implicit cache: mutable.Map[Any, Any]): Validation[CheckResult] = {



  3. func(response) match {



  4. case true => CheckResult.NoopCheckResultSuccess



  5. case _ => Failure(failureMessage)



  6. }



  7. }



  8. }



DubboCheckSupport 则提供了 json path、 custom 两种检验方式的 DSL
  

  1. trait DubboCheckSupport {



  2. def jsonPath(path: Expression[String])(implicit extractorFactory: JsonPathExtractorFactory, jsonParsers: JsonParsers) =



  3. DubboJsonPathCheckBuilder.jsonPath(path)




  4. def custom = DubboCustomCheck



  5. }



Dubbo 压测脚本中可以设置一个或多个 check 来校验请求结果


1.3 DSL


DubboDsl 提供顶层 DSL,隐式方法 
dubboProcessBuilder2ActionBuilderScala 用于自动从 DubboProcessBuilder 构造 ActionBuilder
 

  1. trait DubboDsl extends DubboCheckSupport {



  2. def dubbo[A](requestName: Expression[String], f: (Session) => A) = DubboProcessBuilder[A](requestName, f)




  3. implicit def dubboProcessBuilder2ActionBuilder[A](builder: DubboProcessBuilder[A]): ActionBuilder = builder.build()



  4. }




二、示例


2.1 压测脚本示例

 

  1. class Mix extends Simulation {



  2. val application = new ApplicationConfig()



  3. application.setName("gatling-dubbo")




  4. // 初始化 AService



  5. val referenceAService = new ReferenceConfig[AService]



  6. referenceAService.setApplication(application)



  7. referenceAService.setUrl("dubbo://IP:PORT/com.xxx.service.AService")



  8. referenceAService.setInterface(classOf[AService])



  9. val aService = referenceAService.get()




  10. // 初始化 BService



  11. val referenceBService = new ReferenceConfig[BService]



  12. referenceBService.setApplication(application)



  13. referenceBService.setUrl("dubbo://IP:PORT/com.yyy.service.BService")



  14. referenceBService.setInterface(classOf[BService])



  15. val bService = referenceBService.get()




  16. // 设置数据源



  17. val jsonFileFeeder = jsonFile("data.json").shuffle.circular



  18. val mixScenario = scenario("scenario of mix")



  19. .forever("tripsCount") {



  20. feed(jsonFileFeeder)



  21. .randomSwitch(11d -> exec(



  22. dubbo("com.xxx.service.AService.aMethod", fAMethod)



  23. .check(jsonPath("$.success").is("true"))



  24. )



  25. )



  26. .randomSwitch(4d -> exec(



  27. dubbo("com.yyy.service.BService.bMethod", fBMethod)



  28. .check(jsonPath("$.success").is("true"))



  29. )



  30. )



  31. .randomSwitch(5d -> exec(



  32. ......



  33. )



  34. ......



  35. )



  36. }




  37. setUp(mixScenario.inject(constantUsersPerSec(100) during (10 seconds)).throttle(reachRps(1000) in (1 seconds), holdFor(120 seconds)))




  38. // 设置 aMethod 的请求参数并调用



  39. def fAMethod(session: Session): Object = {



  40. val aParam = new AParam()



  41. aParam.setName("A Name");



  42. // 从 session 中获取动态参数并设置



  43. aParam.setAId(session.attributes("aId").asInstanceOf[Integer].toLong);



  44. aService.aMethod(aParam);



  45. }




  46. // 设置 bMethod 的请求参数并调用



  47. def fBMethod(session: Session): Object = {



  48. val bParam = new BParam()



  49. bParam.setAge(26)



  50. // 从 session 中获取动态参数并设置



  51. bParam.setBId(session.attributes("bId").asInstanceOf[Integer].toLong)



  52. bService.bMethod(bParam);



  53. }




  54. def fXxx(session: Session): Object = {



  55. ......



  56. }



  57. }



randomSwitch 的作用:
以上示例其实是 gatling-dubbo 在有赞的一个典型使用场景,即评估一个应用的单实例性能。按生产环境真实的接口调用比例请求各个接口(该比例由场景执行各个请求的概率分布模拟),这样的压测结果就可以真实反映生产环境应用的单实例性能,并为容量报警、生产扩容等提供参考依据。


2.2 压测数据示例


 

  1. [



  2. {



  3. "aId": 160,



  4. "bId": 859296



  5. },



  6. {



  7. "aId": 160,



  8. "bId": 1019040



  9. },



  10. {



  11. "aId": 160,



  12. "bId": 1221792



  13. },



  14. ......



  15. ]



压测数据使用 Json 数组保存,其中每一个 Json 对象都包含了一次压测请求所需的所有动态参数,且为了方便通过 session 设置动态参数,Json 对象中不再嵌套其他 Json 对象。


2.3 压测报告示例


1、应用基线性能评估,用于精准扩容:

 
2、中心化限流效果验证:


/ 点击下方图片,报名参加 /

Tips:
# 点下“看”❤️
# 然后,公众号对话框内发送“包您满意”,试试手气?😆
# 本期奖品是来自淘宝心选的休闲帆布双肩包。
【声明】内容源于网络
0
0
阿里巴巴中间件
Aliware阿里巴巴中间件官方账号
内容 920
粉丝 0
阿里巴巴中间件 Aliware阿里巴巴中间件官方账号
总阅读512
粉丝0
内容920