
01
—
简单介绍
suishen-esb中,提供了Dubbo对Hystrix的集成;
- Hystrix内部使用了线程池完成具体的任务执行;
- 每一个远程service使用独立的线程池;
- 内部封装中,线程池的核心线程数和最大线程数默认为30,等待队列使用SynchronousQueue(不接受等待任务),拒绝策略为AbortPolicy(线程池无法接受时抛出异常);
- 当瞬时并发数超出最大线程数时,dubbo调用执行异常。
02
—
事件脉络
- 用户反馈使用异常,紧急查看日志:
org.apache.dubbo.rpc.RpcException: Failed to invoke the method validLoginAuthentication in the service weli.wormhole.rpc.user.center.api.IAuthenticationService. Tried 1 times of the providers [10.65.0.205:11090] (1/4) from the registry node1.zk.all.platform.wtc.hwhosts.com:2181 on the consumer 10.65.0.34 using the dubbo version 2.7.3-SNAPSHOT. Last error is: validLoginAuthentication_1 could not be queued for execution and fallback failed.at org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:113)at org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke(AbstractClusterInvoker.java:248)at org.apache.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker.invoke(MockClusterInvoker.java:78)at org.apache.dubbo.rpc.proxy.InvokerInvocationHandler.invoke(InvokerInvocationHandler.java:55)at org.apache.dubbo.common.bytecode.proxy14.validLoginAuthentication(proxy14.java)at weli.peanut.web.interceptor.VerifyLoginInterceptor.preHandle(VerifyLoginInterceptor.java:134)at org.springframework.web.servlet.HandlerExecutionChain.applyPreHandle(HandlerExecutionChain.java:134)at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:958)at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:897)at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:970)at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:861)at javax.servlet.http.HttpServlet.service(HttpServlet.java:620)at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:846)at javax.servlet.http.HttpServlet.service(HttpServlet.java:727)at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:303)at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)at 
org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:197)at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:220)at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:122)at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:501)at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:170)at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:98)at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:116)at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:408)at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1040)at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:607)at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1721)at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1679)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)Caused by: com.netflix.hystrix.exception.HystrixRuntimeException: validLoginAuthentication_1 could not be queued for execution and fallback failed.at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:818)at 
com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:793)at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:140)at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)at com.netflix.hystrix.AbstractCommand$DeprecatedOnFallbackHookApplication$1.onError(AbstractCommand.java:1454)at com.netflix.hystrix.AbstractCommand$FallbackHookApplication$1.onError(AbstractCommand.java:1379)at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)at rx.observers.Subscribers$5.onError(Subscribers.java:230)at rx.internal.operators.OnSubscribeThrow.call(OnSubscribeThrow.java:44)at rx.internal.operators.OnSubscribeThrow.call(OnSubscribeThrow.java:28)at rx.Observable.unsafeSubscribe(Observable.java:10151)at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51)at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)at rx.Observable.unsafeSubscribe(Observable.java:10151)at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)at rx.Observable.unsafeSubscribe(Observable.java:10151)at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)at rx.Observable.unsafeSubscribe(Observable.java:10151)at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)at 
rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)at rx.Observable.unsafeSubscribe(Observable.java:10151)at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)at rx.Observable.unsafeSubscribe(Observable.java:10151)at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:142)at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87)at rx.Observable.unsafeSubscribe(Observable.java:10158)at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)at rx.Observable.unsafeSubscribe(Observable.java:10151)at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)at rx.Observable.unsafeSubscribe(Observable.java:10151)at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)at rx.Observable.unsafeSubscribe(Observable.java:10151)at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)at rx.Observable.unsafeSubscribe(Observable.java:10151)at 
rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)at rx.Observable.unsafeSubscribe(Observable.java:10151)at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51)at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)at rx.Observable.unsafeSubscribe(Observable.java:10151)at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)at rx.Observable.unsafeSubscribe(Observable.java:10151)at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)at rx.Observable.unsafeSubscribe(Observable.java:10151)at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)at rx.Observable.unsafeSubscribe(Observable.java:10151)at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51)at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)at rx.Observable.subscribe(Observable.java:10247)at rx.Observable.subscribe(Observable.java:10214)at rx.internal.operators.BlockingOperatorToFuture.toFuture(BlockingOperatorToFuture.java:51)at rx.observables.BlockingObservable.toFuture(BlockingObservable.java:411)at com.netflix.hystrix.HystrixCommand.queue(HystrixCommand.java:377)at com.netflix.hystrix.HystrixCommand.execute(HystrixCommand.java:343)at 
suishen.esb.hystrix.dubbo.filter.HystrixFilter.invoke(HystrixFilter.java:46)at com.alibaba.dubbo.rpc.Filter.invoke(Filter.java:29)at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:82)at org.apache.dubbo.monitor.support.MonitorFilter.invoke(MonitorFilter.java:92)at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:82)at org.apache.dubbo.rpc.protocol.dubbo.filter.FutureFilter.invoke(FutureFilter.java:54)at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:82)at org.apache.dubbo.rpc.filter.ConsumerContextFilter.invoke(ConsumerContextFilter.java:58)at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:82)at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$CallbackRegistrationInvoker.invoke(ProtocolFilterWrapper.java:157)at org.apache.dubbo.rpc.protocol.InvokerWrapper.invoke(InvokerWrapper.java:56)at org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:82)... 
36 common frames omittedCaused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@1bf63b71 rejected from java.util.concurrent.ThreadPoolExecutor@6fb1f813[Running, pool size = 30, active threads = 30, queued tasks = 0, completed tasks = 0]at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)at com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler$ThreadPoolWorker.schedule(HystrixContextScheduler.java:172)at com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler$HystrixContextSchedulerWorker.schedule(HystrixContextScheduler.java:106)at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:45)at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:30)at rx.Observable.unsafeSubscribe(Observable.java:10151)... 91 common frames omitted
根据日志,发现是由于线程池打满造成了任务的拒绝执行,起初认为是提供方的dubbo线程池被打满,迅速排查中台日志;
{"@timestamp":"2022-04-30T22:45:00.000+08:00","@version":1,"message":"dubbo监控10.65.3.23 核心线程数:400,历史最高线程数:400,最大线程数:400,活跃线程数:0,当前线程数:400,队列大小:0,任务总量:48363297,已完成任务量:48363297","logger_name":"wormholeBusiness","thread_name":"qbScheduler-6","level":"INFO","level_value":20000}
{"@timestamp":"2022-04-30T22:45:00.000+08:00","@version":1,"message":"dubbo监控10.65.3.44 核心线程数:400,历史最高线程数:400,最大线程数:400,活跃线程数:2,当前线程数:400,队列大小:0,任务总量:48371189,已完成任务量:48371187","logger_name":"wormholeBusiness","thread_name":"qbScheduler-3","level":"INFO","level_value":20000}
- 发现中台服务正常,dubbo空闲线程也比较充裕;
- 再回头看调用方异常信息,发现调用方使用了Hystrix,抛出异常的是Hystrix内部的线程池;
- 此时紧急增加节点,重启服务后,业务恢复正常。
03
—
问题分析
根据日志分析,是由HystrixFilter执行了HystrixCommand.execute()造成了异常。
@Activate(group = Constants.CONSUMER)public class HystrixFilter implements Filter {public HystrixFilter() {ApplicationContext springContext = ApplicationContextHolder.getContext();if (springContext != null && !springContext.containsBean(HystrixSpringService.class.getSimpleName())) {BeanFactory beanFactory = springContext.getAutowireCapableBeanFactory();if (beanFactory instanceof DefaultListableBeanFactory) {BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.rootBeanDefinition(HystrixSpringService.class);beanDefinitionBuilder.setDestroyMethodName("preDestroy");beanDefinitionBuilder.setScope("singleton");((DefaultListableBeanFactory) beanFactory).registerBeanDefinition(HystrixSpringService.class.getSimpleName(), beanDefinitionBuilder.getBeanDefinition());//触发初始化beanFactory.getBean(HystrixSpringService.class.getSimpleName());}}}@Overridepublic Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {// 异步调用使用hystrix做熔断没有意义if ("true".equalsIgnoreCase(invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY))) {return invoker.invoke(invocation);}DubboHystrixCommand command = new DubboHystrixCommand(invoker, invocation);return command.execute();}}
当执行的dubbo调用为同步调用时,会创建DubboHystrixCommand,交由Hystrix执行远程调用。
public class DubboHystrixCommand extends HystrixCommand<Result> {private static Logger logger = Logger.getLogger(DubboHystrixCommand.class);private static final int DEFAULT_THREADPOOL_CORE_SIZE = 30;private static final int CIRCUIT_BREAKER_SLEEP_WINDOW_IN_MILLISECONDS = 30000;private static final int DEFAULT_FAIL_QPS_THRESHOLD = 20;private Invoker invoker;private Invocation invocation;public DubboHystrixCommand(Invoker<?> invoker, Invocation invocation) {super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(invoker.getInterface().getName())).andCommandKey(HystrixCommandKey.Factory.asKey(String.format("%s_%d", invocation.getMethodName(),invocation.getArguments() == null ? 0 : invocation.getArguments().length))).andCommandPropertiesDefaults(HystrixCommandProperties.Setter()//10秒钟内请求失败上限值,超过此值熔断器发挥作用.withCircuitBreakerRequestVolumeThreshold(getFailQpsThreshold(invoker.getUrl()) * 10)//熔断器中断请求30秒后会进入半打开状态,放部分流量过去重试.withCircuitBreakerSleepWindowInMilliseconds(getCircuitBreakerSleepWindowInMilliseconds(invoker.getUrl()))//错误率达到50开启熔断保护.withCircuitBreakerErrorThresholdPercentage(50)//使用dubbo的超时,禁用这里的超时.withExecutionTimeoutEnabled(false))//根据dubbo配置设置线程池大小.andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(getThreadPoolCoreSize(invoker.getUrl()))));this.invoker = invoker;this.invocation = invocation;}/*** 获取每秒请求失败的阈值,超过此阈值熔断器开始生效** @param url* @return*/private static int getFailQpsThreshold(URL url) {if (url != null) {int threshold = url.getParameter("FailQpsThreshold", DEFAULT_FAIL_QPS_THRESHOLD);if (logger.isDebugEnabled()) {logger.debug("FailQpsThreshold: " + threshold);}return threshold;}return DEFAULT_FAIL_QPS_THRESHOLD;}/*** 获取熔断器中断请求窗口大小** @param url* @return*/private static int getCircuitBreakerSleepWindowInMilliseconds(URL url) {if (url != null) {int circuitBreakerSleepWindowInMilliseconds = url.getParameter("CircuitBreakerSleepWindowInMilliseconds", CIRCUIT_BREAKER_SLEEP_WINDOW_IN_MILLISECONDS);if (logger.isDebugEnabled()) 
{logger.debug("circuitBreakerSleepWindowInMilliseconds: " + circuitBreakerSleepWindowInMilliseconds);}return circuitBreakerSleepWindowInMilliseconds;}return CIRCUIT_BREAKER_SLEEP_WINDOW_IN_MILLISECONDS;}/*** 获取线程池大小** @param url* @return*/private static int getThreadPoolCoreSize(URL url) {if (url != null) {int size = url.getParameter("ThreadPoolCoreSize", DEFAULT_THREADPOOL_CORE_SIZE);if (logger.isDebugEnabled()) {logger.debug("ThreadPoolCoreSize: " + size);}return size;}return DEFAULT_THREADPOOL_CORE_SIZE;}@Overrideprotected Result run() throws Exception {Throwable exception = null;Result result = null;try {result = invoker.invoke(invocation);exception = result.getException();} catch (Exception e) {exception = e;}// 这里打印异常是为了记录异常,再抛出异常是为了触发fallbackif (exception != null) {Logs.error("Dubbo Exception: ", exception);throw new Exception(exception);}return result;}@Overrideprotected Result getFallback() {return new RpcResult((Object) null);}}public class DubboHystrixCommand extends HystrixCommand<Result> {private static Logger logger = Logger.getLogger(DubboHystrixCommand.class);private static final int DEFAULT_THREADPOOL_CORE_SIZE = 30;private static final int CIRCUIT_BREAKER_SLEEP_WINDOW_IN_MILLISECONDS = 30000;private static final int DEFAULT_FAIL_QPS_THRESHOLD = 20;private Invoker invoker;private Invocation invocation;public DubboHystrixCommand(Invoker<?> invoker, Invocation invocation) {super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(invoker.getInterface().getName())).andCommandKey(HystrixCommandKey.Factory.asKey(String.format("%s_%d", invocation.getMethodName(),invocation.getArguments() == null ? 
0 : invocation.getArguments().length))).andCommandPropertiesDefaults(HystrixCommandProperties.Setter()//10秒钟内请求失败上限值,超过此值熔断器发挥作用.withCircuitBreakerRequestVolumeThreshold(getFailQpsThreshold(invoker.getUrl()) * 10)//熔断器中断请求30秒后会进入半打开状态,放部分流量过去重试.withCircuitBreakerSleepWindowInMilliseconds(getCircuitBreakerSleepWindowInMilliseconds(invoker.getUrl()))//错误率达到50开启熔断保护.withCircuitBreakerErrorThresholdPercentage(50)//使用dubbo的超时,禁用这里的超时.withExecutionTimeoutEnabled(false))//根据dubbo配置设置线程池大小.andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(getThreadPoolCoreSize(invoker.getUrl()))));this.invoker = invoker;this.invocation = invocation;}/*** 获取每秒请求失败的阈值,超过此阈值熔断器开始生效** @param url* @return*/private static int getFailQpsThreshold(URL url) {if (url != null) {int threshold = url.getParameter("FailQpsThreshold", DEFAULT_FAIL_QPS_THRESHOLD);if (logger.isDebugEnabled()) {logger.debug("FailQpsThreshold: " + threshold);}return threshold;}return DEFAULT_FAIL_QPS_THRESHOLD;}/*** 获取熔断器中断请求窗口大小** @param url* @return*/private static int getCircuitBreakerSleepWindowInMilliseconds(URL url) {if (url != null) {int circuitBreakerSleepWindowInMilliseconds = url.getParameter("CircuitBreakerSleepWindowInMilliseconds", CIRCUIT_BREAKER_SLEEP_WINDOW_IN_MILLISECONDS);if (logger.isDebugEnabled()) {logger.debug("circuitBreakerSleepWindowInMilliseconds: " + circuitBreakerSleepWindowInMilliseconds);}return circuitBreakerSleepWindowInMilliseconds;}return CIRCUIT_BREAKER_SLEEP_WINDOW_IN_MILLISECONDS;}/*** 获取线程池大小** @param url* @return*/private static int getThreadPoolCoreSize(URL url) {if (url != null) {int size = url.getParameter("ThreadPoolCoreSize", DEFAULT_THREADPOOL_CORE_SIZE);if (logger.isDebugEnabled()) {logger.debug("ThreadPoolCoreSize: " + size);}return size;}return DEFAULT_THREADPOOL_CORE_SIZE;}@Overrideprotected Result run() throws Exception {Throwable exception = null;Result result = null;try {result = invoker.invoke(invocation);exception = result.getException();} 
catch (Exception e) {exception = e;}// 这里打印异常是为了记录异常,再抛出异常是为了触发fallbackif (exception != null) {Logs.error("Dubbo Exception: ", exception);throw new Exception(exception);}return result;}@Overrideprotected Result getFallback() {return new RpcResult((Object) null);}}
- 内部封装中,线程池的核心线程数和最大线程数默认为30,等待队列使用SynchronousQueue(不接受等待任务),拒绝策略为AbortPolicy(线程池无法接受时抛出异常);
- 当瞬时并发数超出最大线程数时,dubbo调用执行异常。
04
—
处理
1、修改HystrixFilter,提供是否使用Hystrix开关,对于签名验证等核心接口,选择同步执行。
(group = Constants.CONSUMER)public class HystrixFilter implements Filter {public HystrixFilter() {ApplicationContext springContext = ApplicationContextHolder.getContext();if (springContext != null && !springContext.containsBean(HystrixSpringService.class.getSimpleName())) {BeanFactory beanFactory = springContext.getAutowireCapableBeanFactory();if (beanFactory instanceof DefaultListableBeanFactory) {BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.rootBeanDefinition(HystrixSpringService.class);beanDefinitionBuilder.setDestroyMethodName("preDestroy");beanDefinitionBuilder.setScope("singleton");((DefaultListableBeanFactory) beanFactory).registerBeanDefinition(HystrixSpringService.class.getSimpleName(), beanDefinitionBuilder.getBeanDefinition());//触发初始化beanFactory.getBean(HystrixSpringService.class.getSimpleName());}}}public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {// 异步调用使用hystrix做熔断没有意义if ("true".equalsIgnoreCase(invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY))|| "false".equalsIgnoreCase(invoker.getUrl().getMethodParameter(invocation.getMethodName(), "hystrixOpen"))) {return invoker.invoke(invocation);}DubboHystrixCommand command = new DubboHystrixCommand(invoker, invocation);return command.execute();}}
public class DubboHystrixCommand extends HystrixCommand<Result> {private static Logger logger = Logger.getLogger(DubboHystrixCommand.class);private static final int DEFAULT_THREAD_POOL_CORE_SIZE = 30;private static final int DEFAULT_THREAD_POOL_MAX_SIZE = 50;private static final int DEFAULT_THREAD_POOL_KEEP_ALIVE_TIME_MINUTES = 5;private static final int DEFAULT_FAIL_QPS_THRESHOLD = 20;private Invoker invoker;private Invocation invocation;public DubboHystrixCommand(Invoker<?> invoker, Invocation invocation) {super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(invoker.getInterface().getName())).andCommandKey(HystrixCommandKey.Factory.asKey(String.format("%s_%d", invocation.getMethodName(),invocation.getArguments() == null ? 0 : invocation.getArguments().length))).andCommandPropertiesDefaults(HystrixCommandProperties.Setter()//10秒钟内请求失败上限值,超过此值熔断器发挥作用.withCircuitBreakerRequestVolumeThreshold(getFailQpsThreshold(invoker.getUrl()) * 10)//熔断器中断请求30秒后会进入半打开状态,放部分流量过去重试.withCircuitBreakerSleepWindowInMilliseconds(30000)//错误率达到50开启熔断保护.withCircuitBreakerErrorThresholdPercentage(50)//使用dubbo的超时,禁用这里的超时.withExecutionTimeoutEnabled(false))//根据dubbo配置设置线程池大小.andThreadPoolPropertiesDefaults(getThreadPoolSetter(invoker.getUrl())));this.invoker = invoker;this.invocation = invocation;}/*** 获取每秒请求失败的阈值,超过此阈值熔断器开始生效** @param url* @return*/private static int getFailQpsThreshold(URL url) {if (url != null) {int threshold = url.getParameter("FailQpsThreshold", DEFAULT_FAIL_QPS_THRESHOLD);if (logger.isDebugEnabled()) {logger.debug("FailQpsThreshold: " + threshold);}return threshold;}return DEFAULT_FAIL_QPS_THRESHOLD;}private static HystrixThreadPoolProperties.Setter getThreadPoolSetter(URL url) {return HystrixThreadPoolProperties.Setter().withCoreSize(getThreadPoolProperties(url, "threadPoolCoreSize", DEFAULT_THREAD_POOL_CORE_SIZE)).withMaximumSize(getThreadPoolProperties(url, "threadPoolMaxSize", 
DEFAULT_THREAD_POOL_MAX_SIZE)).withKeepAliveTimeMinutes(getThreadPoolProperties(url, "threadPoolKeepAliveTimeMinutes",DEFAULT_THREAD_POOL_KEEP_ALIVE_TIME_MINUTES));}/*** 获取线程池大小** @param url* @return*/private static int getThreadPoolProperties(URL url, String name, int defaultProperties) {if (url != null) {int size = url.getParameter(name, defaultProperties);if (logger.isDebugEnabled()) {logger.debug(name + ": " + size);}return size;}return defaultProperties;}protected Result run() throws Exception {Throwable exception;Result result = null;try {result = invoker.invoke(invocation);exception = result.getException();} catch (Exception e) {exception = e;}// 有异常抛出if (exception != null) {Logs.error("dubbo exception: ", exception);throw new RuntimeException(exception);}return result;}}

