-
更强的场景编排能力,支持多场景同时编排,如仿真电商业务中同时存在普通下单、团购、秒杀等多种交易类型的场景 -
支持设置场景内流量模型,如漏斗模型,仿真用户从商品浏览 -> 加入购物车 -> 下单 -> 支付过程中的各级转化率 -
不需要安装额外插件,原生支持设置压力模型,如设置压测需要达到的目标 RPS,甚至逐级加压进行梯度压力测试 -
更低的资源消耗,更高的并发能力
一、插件主要组成
执行部分,这里的作用是发起 Dubbo 请求,校验请求结果并记录日志以便后续生成压测报告。ActionBuild 则为 DSL 使用 Action 的辅助类
校验部分,全链路压测中我们使用 json path 校验 HTTP 请求结果,这里我们实现了一样的校验方法,而且,对于一些不规范的返回结果(如返回了基本数据类型),还增加了自定义校验方法。CheckBuild 则为 DSL 使用 Check 的辅助类。
插件的领域特定语言,提供简单易用的 API 方便编写 Dubbo 压测脚本。
1.1 Action
class DubboAction[A]( requestName: Expression[String],
f: (Session) => A,
val executor: ExecutorService,
val objectMapper: ObjectMapper,
checks: List[DubboCheck],
coreComponents: CoreComponents,
throttled: Boolean,
val next: Action
) extends ExitableAction with NameGen {
......
override def execute(session: Session): Unit = recover(session) {
requestName(session) map { reqName =>
val startTime = System.currentTimeMillis()
val fu = Future {
try {
f(session)
} finally {
}
}
fu.onComplete {
case Success(result) =>
val endTime = System.currentTimeMillis()
val resultJson = objectMapper.writeValueAsString(result)
val (newSession, error) = Check.check(resultJson, session, checks)
error match {
case None =>
statsEngine.logResponse(session, reqName, ResponseTimings(startTime, endTime), Status("OK"), None, None)
throttle(newSession(session))
case Some(Failure(errorMessage)) =>
statsEngine.logResponse(session, reqName, ResponseTimings(startTime, endTime), Status("KO"), None, Some(errorMessage))
throttle(newSession(session).markAsFailed)
}
case UFailure(e) =>
val endTime = System.currentTimeMillis()
statsEngine.logResponse(session, reqName, ResponseTimings(startTime, endTime), Status("KO"), None, Some(e.getMessage))
throttle(session.markAsFailed)
}
}
}
private def throttle(s: Session): Unit = {
if (throttled) {
coreComponents.throttler.throttle(s.scenario, () => next ! s)
} else {
next ! s
}
}
}
DubboActionBuilder 负责创建线程池并实例化 DubboAction:
case class DubboActionBuilder[A](requestName: Expression[String], f: (Session) => A, checks: List[DubboCheck], threadPoolSize: Int) extends ActionBuilder {
override def build(ctx: ScenarioContext, next: Action): Action = {
import ctx._
val executor = Executors.newFixedThreadPool(threadPoolSize)
val objectMapper: ObjectMapper = new ObjectMapper()
new DubboAction[A](requestName, f, executor, objectMapper, checks, coreComponents, throttled, next)
}
}
有赞的施压机是 4 核 8Gb 内存的,我们为其设置的默认线程池大小为 200,与 Dubbo 应用部署环境一致。你可以使用 DSL threadPoolSize(threadPoolSize: Int) 按照你的机器配置设置一个合适的线程池大小。如果施压机成了性能瓶颈,你可以考虑将其改造成集群来施压,具体可参考《有赞全链路压测引擎的设计与实现》
case class DubboProcessBuilder[A](requestName: Expression[String], f: (Session) => A, checks: List[DubboCheck] = Nil, threadPoolSize: Int = 200) extends DubboCheckSupport {
def check(dubboChecks: DubboCheck*): DubboProcessBuilder[A] = copy[A](checks = checks ::: dubboChecks.toList)
def threadPoolSize(threadPoolSize: Int): DubboProcessBuilder[A] = copy[A](threadPoolSize = threadPoolSize)
def build(): ActionBuilder = DubboActionBuilder[A](requestName, f, checks, threadPoolSize)
}
1.2 Check
package object dubbo {
type DubboCheck = Check[String]
val DubboStringExtender: Extender[DubboCheck, String] =
(check: DubboCheck) => check
val DubboStringPreparer: Preparer[String, String] =
(result: String) => Success(result)
}
trait DubboJsonPathOfType {
self: DubboJsonPathCheckBuilder[String] =>
def ofType[X: JsonFilter](implicit extractorFactory: JsonPathExtractorFactory) = new DubboJsonPathCheckBuilder[X](path, jsonParsers)
}
object DubboJsonPathCheckBuilder {
val CharsParsingThreshold = 200 * 1000
def preparer(jsonParsers: JsonParsers): Preparer[String, Any] =
response => {
if (response.length() > CharsParsingThreshold || jsonParsers.preferJackson)
jsonParsers.safeParseJackson(response)
else
jsonParsers.safeParseBoon(response)
}
def jsonPath(path: Expression[String])(implicit extractorFactory: JsonPathExtractorFactory, jsonParsers: JsonParsers) =
new DubboJsonPathCheckBuilder[String](path, jsonParsers) with DubboJsonPathOfType
}
class DubboJsonPathCheckBuilder[X: JsonFilter](
private[check] val path: Expression[String],
private[check] val jsonParsers: JsonParsers
)(implicit extractorFactory: JsonPathExtractorFactory)
extends DefaultMultipleFindCheckBuilder[DubboCheck, String, Any, X](
DubboStringExtender,
DubboJsonPathCheckBuilder.preparer(jsonParsers)
) {
import extractorFactory._
def findExtractor(occurrence: Int) = path.map(newSingleExtractor[X](_, occurrence))
def findAllExtractor = path.map(newMultipleExtractor[X])
def countExtractor = path.map(newCountExtractor)
}
但有时候存在一些不规范的情况,dubbo 接口的返回结果并不能直接转化为 json,如返回了基本数据类型,所以我们还提供了自定义校验方法,可以将这样的返回结果转化为 String 类型,并使用字符串比较、正则表达式匹配等方法校验返回结果:
case class DubboCustomCheck(func: String => Boolean, failureMessage: String = "Dubbo check failed") extends DubboCheck {
override def check(response: String, session: Session)(implicit cache: mutable.Map[Any, Any]): Validation[CheckResult] = {
func(response) match {
case true => CheckResult.NoopCheckResultSuccess
case _ => Failure(failureMessage)
}
}
}
DubboCheckSupport 则提供了 json path、 custom 两种检验方式的 DSL
trait DubboCheckSupport {
def jsonPath(path: Expression[String])(implicit extractorFactory: JsonPathExtractorFactory, jsonParsers: JsonParsers) =
DubboJsonPathCheckBuilder.jsonPath(path)
def custom = DubboCustomCheck
}
Dubbo 压测脚本中可以设置一个或多个 check 来校验请求结果
1.3 DSL
trait DubboDsl extends DubboCheckSupport {
def dubbo[A](requestName: Expression[String], f: (Session) => A) = DubboProcessBuilder[A](requestName, f)
implicit def dubboProcessBuilder2ActionBuilder[A](builder: DubboProcessBuilder[A]): ActionBuilder = builder.build()
}
二、示例
2.1 压测脚本示例
class Mix extends Simulation {
val application = new ApplicationConfig()
application.setName("gatling-dubbo")
// 初始化 AService
val referenceAService = new ReferenceConfig[AService]
referenceAService.setApplication(application)
referenceAService.setUrl("dubbo://IP:PORT/com.xxx.service.AService")
referenceAService.setInterface(classOf[AService])
val aService = referenceAService.get()
// 初始化 BService
val referenceBService = new ReferenceConfig[BService]
referenceBService.setApplication(application)
referenceBService.setUrl("dubbo://IP:PORT/com.yyy.service.BService")
referenceBService.setInterface(classOf[BService])
val bService = referenceBService.get()
// 设置数据源
val jsonFileFeeder = jsonFile("data.json").shuffle.circular
val mixScenario = scenario("scenario of mix")
.forever("tripsCount") {
feed(jsonFileFeeder)
.randomSwitch(11d -> exec(
dubbo("com.xxx.service.AService.aMethod", fAMethod)
.check(jsonPath("$.success").is("true"))
)
)
.randomSwitch(4d -> exec(
dubbo("com.yyy.service.BService.bMethod", fBMethod)
.check(jsonPath("$.success").is("true"))
)
)
.randomSwitch(5d -> exec(
......
)
......
)
}
setUp(mixScenario.inject(constantUsersPerSec(100) during (10 seconds)).throttle(reachRps(1000) in (1 seconds), holdFor(120 seconds)))
// 设置 aMethod 的请求参数并调用
def fAMethod(session: Session): Object = {
val aParam = new AParam()
aParam.setName("A Name");
// 从 session 中获取动态参数并设置
aParam.setAId(session.attributes("aId").asInstanceOf[Integer].toLong);
aService.aMethod(aParam);
}
// 设置 bMethod 的请求参数并调用
def fBMethod(session: Session): Object = {
val bParam = new BParam()
bParam.setAge(26)
// 从 session 中获取动态参数并设置
bParam.setBId(session.attributes("bId").asInstanceOf[Integer].toLong)
bService.bMethod(bParam);
}
def fXxx(session: Session): Object = {
......
}
}
randomSwitch 的作用:
以上示例其实是 gatling-dubbo 在有赞的一个典型使用场景,即评估一个应用的单实例性能。按生产环境真实的接口调用比例请求各个接口(该比例由场景执行各个请求的概率分布模拟),这样的压测结果就可以真实反映生产环境应用的单实例性能,并为容量报警、生产扩容等提供参考依据。
2.2 压测数据示例
[
{
"aId": 160,
"bId": 859296
},
{
"aId": 160,
"bId": 1019040
},
{
"aId": 160,
"bId": 1221792
},
......
]
2.3 压测报告示例

