大纲 Hystrix 请求缓存 Hystrix 的请求缓存(Request Cache)并不同于传统意义上的缓存。它只在同一个请求上下文中生效:当某个命令(Command)第一次以特定参数执行完成后,其结果会被缓存;在该命令的后续执行中,如果再次调用相同参数的命令(Command),将直接返回缓存的结果。缓存的生命周期仅限于这一次请求上下文中有效。使用 HystrixCommand 有两种方式,第一种是使用继承,第二种是使用注解,请求缓存也同时支持这两种使用方式。具体使用例子如下,点击下载 完整的案例代码。
Hystrix 请求缓存与传统缓存的区别
(1) 作用范围:只在单个请求上下文(HystrixRequestContext 生命周期)中有效。 (2) 缓存内容:对于同一个 HystrixCommand,当 cacheKey 相同且请求缓存开启时,第二次执行不会真正发起远程调用,而是直接返回第一次的调用结果。 (3) 使用目的:避免在同一个请求中,对下游服务重复发起相同调用,提高性能并减少负载。 使用类来开启请求缓存 Hystrix 的请求缓存是在一次请求内有效,这要求请求必须在一个 Hystrix 上下文里,不然在使用请求缓存的时候 Hystrix 会报一个没有初始化上下文的异常;可以使用 filter 过滤器或者 Interceptor 拦截器来初始化上下文,然后在请求结束之后关闭上下文。使用类来开启请求缓存的方式很简单,只需要继承 HystrixCommand 类,然后重写它的 getCacheKey 方法即可,保证对于同一个请求返回同样的键值;对于请求缓存的清除,直接调用 HystrixRequestCache 类的 clear() 方法即可。
创建拦截器类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class CacheContextInterceptor implements HandlerInterceptor { private HystrixRequestContext context; @Override public boolean preHandle (HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { this .context = HystrixRequestContext.initializeContext(); return true ; } @Override public void postHandle (HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception { } @Override public void afterCompletion (HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception { this .context.shutdown(); } }
创建配置类,用于注册拦截器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Configuration public class CommonConfiguration { @Bean @LoadBalanced public RestTemplate restTemplate () { return new RestTemplate(); } @Bean public CacheContextInterceptor userContextInterceptor () { return new CacheContextInterceptor(); } }
1 2 3 4 5 6 7 8 9 10 11 @Configuration public class WebMvcConfiguration extends WebMvcConfigurerAdapter { @Autowired CacheContextInterceptor userContextInterceptor; @Override public void addInterceptors (InterceptorRegistry registry) { registry.addInterceptor(userContextInterceptor); } }
继承 HystrixCommand 类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class UserCommand extends HystrixCommand <String > { private String userName; private RestTemplate restTemplate; private static final Logger logger = LoggerFactory.getLogger(UserCommand.class); private static final HystrixCommandKey KEY = HystrixCommandKey.Factory.asKey("CommandKey" ); public UserCommand (String userName, RestTemplate restTemplate) { super (Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("CacheGroup" )).andCommandKey(KEY)); this .userName = userName; this .restTemplate = restTemplate; } @Override protected String run () throws Exception { String result = restTemplate.getForObject("http://PROVIDER/user/getUser?userName={1}" , String.class, this .userName); logger.info(result); return result; } @Override protected String getFallback () { return super .getFallback(); } @Override protected String getCacheKey () { return this .userName; } public static void cleanCache (String userName) { HystrixRequestCache.getInstance(KEY, HystrixConcurrencyStrategyDefault.getInstance()).clear(userName); } }
用于测试的 Controller 类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @RestController @RequestMapping("/user") public class UserController { private static final Logger logger = LoggerFactory.getLogger(UserController.class); @Autowired private RestTemplate restTemplate; @GetMapping("/get") public String get (String userName) { UserCommand commandOne = new UserCommand(userName, restTemplate); commandOne.execute(); logger.info("from cache: " + commandOne.isResponseFromCache()); UserCommand commandTwo = new UserCommand(userName, restTemplate); commandTwo.execute(); logger.info("from cache: " + commandTwo.isResponseFromCache()); return "cache test finished" ; } }
启动主类:
1 2 3 4 5 6 7 8 9 @EnableHystrix @EnableDiscoveryClient @SpringBootApplication public class CacheApplication { public static void main (String[] args) { SpringApplication.run(CacheApplication.class, args); } }
访问 http://127.0.0.1:8082/user/get?userName=Tom,调用了两次 execute 方法,使用 Hystrix 的默认方法 isResponseFromCache 来判断请求结果是否来自于缓存,从以下输出可以看出第二次请求确实来自于缓存,此时说明 Hystrix 的请求缓存生效了。
1 2 c.s.study.controller.CacheController : from cache: false c.s.study.controller.CacheController : from cache: true
使用注解开启请求缓存 Hystrix 提供了 @CacheResult 注解来启用请求缓存机制,且更为方便和快捷;使用 @CacheResult 注解就可以缓存数据,还可以选择配合 @CacheKey 注解来指定缓存 Key。
引入 Maven 依赖:
1 2 3 4 5 6 7 8 9 10 <dependency > <groupId > com.netflix.hystrix</groupId > <artifactId > hystrix-core</artifactId > </dependency > <dependency > <groupId > com.netflix.hystrix</groupId > <artifactId > hystrix-javanica</artifactId > </dependency >
使用注解缓存数据:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Service public class DeptService { private static final Logger logger = LoggerFactory.getLogger(DeptService.class); @Autowired private RestTemplate restTemplate; @CacheResult @HystrixCommand public String getDept (@CacheKey String deptName) { String result = restTemplate.getForObject("http://PROVIDER/dept/getDept?deptName={1}" , String.class, deptName); logger.info(result); return result; } }
用于测试的 Controller 类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @RestController @RequestMapping("/dept") public class DeptController { @Autowired private DeptService deptService; @GetMapping("/get") public String get (String deptName) { deptService.getDept("IT" ); deptService.getDept("IT" ); return "annotation cache test finished" ; } }
访问 http://127.0.0.1:8082/dept/get?deptName=IT,调用了两次 get 方法,发现只打印了一条数据,说明第二次的请求是从缓存中读取,即 Hystrix 的请求缓存生效了。
1 DeptService : {"id":1,"deptName":"IT"}
使用注解清除请求缓存 Hystrix 提供了 @CacheRemove 注解来清除缓存数据,通常需要通过 commandKey 参数来指定 HystrixCommand 的 commandKey(除非没有配置 commandKey);在清除缓存时,还可以选择配合 @CacheKey 注解来指定缓存 Key,以此清除指定的缓存数据。
引入 Maven 依赖:
1 2 3 4 5 6 7 8 9 10 <dependency > <groupId > com.netflix.hystrix</groupId > <artifactId > hystrix-core</artifactId > </dependency > <dependency > <groupId > com.netflix.hystrix</groupId > <artifactId > hystrix-javanica</artifactId > </dependency >
使用注解清理缓存:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Service public class DeptService { private static final Logger logger = LoggerFactory.getLogger(DeptService.class); @Autowired private RestTemplate restTemplate; @CacheResult @HystrixCommand(commandKey = "findDept") public String findDept (@CacheKey String deptName) { String result = restTemplate.getForObject("http://PROVIDER/dept/getDept?deptName={1}" , String.class, deptName); logger.info(result); return result; } @CacheRemove(commandKey = "findDept") @HystrixCommand public String updateDept (@CacheKey String deptName) { logger.info("delete dept cache" ); return "update dept success" ; } }
用于测试的 Controller 类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @RestController @RequestMapping("/dept") public class DeptController { @Autowired private DeptService deptService; @GetMapping("/find") public String find (String deptName) { deptService.findDept("IT" ); deptService.findDept("IT" ); deptService.updateDept(deptName); deptService.findDept("IT" ); deptService.findDept("IT" ); return "annotation cache test finished" ; } }
访问 http://127.0.0.1:8082/dept/find?deptName=IT,运行结果如下;在没有缓存的情况下,打印了一次,第二次取的是缓存数据,然后清除缓存后又打印了一次,最后一次又从缓存里取数据:
1 2 3 DeptService : {"id":1,"deptName":"IT"} DeptService : delete dept cache DeptService : {"id":1,"deptName":"IT"}
特别注意
如果在对应的 HystrixCommand 上显式指定过 commandKey,那么在 @CacheRemove 注解上也必须指定相同的 commandKey,否则它不知道清哪个缓存。 如果没有在对应的 HystrixCommand 上显式指定过 commandKey,Hystrix 默认会用方法签名作为 commandKey,这种情况下 @CacheRemove 注解可以不指定 commandKey,Hystrix 会自动匹配。 请求缓存使用注意事项 Hystrix 常用的请求缓存注解:
@CacheResult:使用该注解后调用结果会被缓存,同时它需要和 @HystrixCommand 注解一起使用,注解参数为 cacheKeyMethod。 @CacheRemove:清除缓存,通常需要指定 commandKey 参数,注解参数为 commandKey、cacheKeyMethod。 在指定了 HystrixCommand 的 commandKey 后,在 @CacheRemove 注解上也要指定 commandKey 参数,否则它不知道清哪个缓存。 @CacheKey:指定缓存 Key,如果不指定,默认会将所有方法参数拼接起来作为缓存 Key,注解参数为 value@CacheKey 标记的参数会被拼接成缓存 Key,用 toString() 拼接;相同 cacheKey 的 HystrixCommand 在同一个上下文内多次执行时,才会命中缓存; 缓存是否命中,取决于用户如何重写 HystrixCommand 的 getCacheKey() 方法或使用 @CacheKey。 一般在查询接口上使用 @CacheResult 注解,在更新、删除接口上使用 @CacheRemove 注解来删除缓存。 使用 Hystrix 请求缓存时有几方面需要注意:
必须使用 @EnableHystrix 注解来启用 Hystrix。 必须初始化 HystrixRequestContext,无论是使用继承类还是注解的方式来开启请求缓存。 如果在对应的 HystrixCommand 上显式指定过 commandKey,那么在 @CacheRemove 注解上也必须指定相同的 commandKey,否则它不知道清哪个缓存。 通过继承 HystrixCommand 类来使用请求缓存时,HystrixCommand 的 getCacheKey() 方法必须返回非 Null,否则请求缓存不会生效。 Hystrix 请求合并 Request Collapser 介绍 Request Collapser 是 Hystrix 推出的针对多个请求调用单个后端依赖做的一种优化和节约网络开销的方法。引用官方的这张图,当发起 5 个请求时,在请求没有聚合和合并的情况下,是每个请求单独开启一个线程,并开启一个网络链接进行调用,这都会加重应用程序的负担和开销,并占用 Hystrix 的线程连接池。当使用 Collapser 把请求都合并起来时,则只需要一个线程和一个连接的开销,这大大减少了并发和请求执行所需要的线程数和网络连接数,尤其在一个时间段内有非常多请求的情况下能极大地提高资源利用率。特别注意:若使用 Feign 调用的话,目前还不支持 Collapser。具体使用例子如下,点击下载 完整的案例代码。
使用注解进行请求合并 使用 Request Collapser 也可以通过继承类和注解的形式来实现,下面主要介绍注解的使用方式。
Request Collapser 和 Hystrix 缓存的使用类似,需要实现 Hystrix 上下文的初始化和关闭,这里使用拦截器来实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class HystrixContextInterceptor implements HandlerInterceptor { private HystrixRequestContext context; @Override public boolean preHandle (HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { this .context = HystrixRequestContext.initializeContext(); return true ; } @Override public void postHandle (HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception { } @Override public void afterCompletion (HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception { this .context.shutdown(); } }
创建配置类,用于注册拦截器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Configuration public class CommonConfiguration { @Bean @LoadBalanced public RestTemplate restTemplate () { return new RestTemplate(); } @Bean public CacheContextInterceptor userContextInterceptor () { return new CacheContextInterceptor(); } }
1 2 3 4 5 6 7 8 9 10 11 @Configuration public class WebMvcConfiguration extends WebMvcConfigurerAdapter { @Autowired CacheContextInterceptor userContextInterceptor; @Override public void addInterceptors (InterceptorRegistry registry) { registry.addInterceptor(userContextInterceptor); } }
实现一个 Future 异步返回值的方法,在这个方法上配置请求合并的注解,之后外部通过调用这个方法来实现请求的合并。注意:这个方法必须是 Future 异步返回值的,否则无法合并请求。其中 @HystrixCollapser 注解代表开启请求合并,调用该方法时,实际上运行的是 collapsingList 方法,且利用 HystrixProperty 指定 timerDelayInMilliseconds,这属性代表合并多少毫秒(ms)内的请求,如果不配置的话,默认是 10ms。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @Service public class CollapsingService implements ICollapsingService { private static final Logger logger = LoggerFactory.getLogger(CollapsingService.class); @HystrixCollapser(batchMethod = "collapsingList", collapserProperties = { @HystrixProperty(name = "timerDelayInMilliseconds", value = "1000") }) public Future<User> collapsing (Integer id) { return null ; } @HystrixCommand public List<User> collapsingList (List<Integer> userParam) { logger.info("collapsingList当前线程: " + Thread.currentThread().getName()); logger.info("当前请求参数个数:" + userParam.size()); List<User> userList = new ArrayList<User>(); for (Integer userNumber : userParam) { User user = new User(); user.setUserName("User - " + userNumber); user.setAge(userNumber); userList.add(user); } return userList; } }
创建接口测试类,在 getUser 接口内连续调用两次 collapsing 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @RestController @RequestMapping("/user") public class CollapsingController { private static final Logger logger = LoggerFactory.getLogger(CollapsingController.class); @Autowired private ICollapsingService collapsingService; @RequestMapping("/getUser") public String getUser () throws Exception { Future<User> user = collapsingService.collapsing(1 ); Future<User> user2 = collapsingService.collapsing(2 ); logger.info(user.get().getUserName()); logger.info(user2.get().getUserName()); return "Success" ; } }
启动主类:
1 2 3 4 5 6 7 8 9 @EnableHystrix @EnableDiscoveryClient @SpringBootApplication public class CollapsingApplication { public static void main (String[] args) { SpringApplication.run(CollapsingApplication.class, args); }
启动应用后访问 http://127.0.0.1:8082/user/getUser,可以看到实际调用了 collapsingList 方法,并打印了当前线程的名称、请求的参数和运行结果,一共合并了两个请求,达到了预期效果:
1 2 3 4 CollapsingService : collapsingList当前线程: hystrix-CollapsingService-1 CollapsingService : 当前请求参数个数:2 CollapsingController : User - 1 CollapsingController : User - 2
使用注解进行请求合并(全局) 上面讲了多个请求是如何合并的,但是都是在同一请求(单一线程)中发起的调用,如果两次请求接口都是在不同线程运行的,那么如何合并整个应用中的请求呢?即如何对所有线程请求中的多次服务调用进行合并呢? @HystrixCollapser 注解的 scope 属性有个两个值,分别是:Request(默认值)、Global。下面的代码中,增加了一个 scope 属性为 Global 的方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @Service public class CollapsingService implements ICollapsingService { private static final Logger logger = LoggerFactory.getLogger(CollapsingService.class); @HystrixCollapser(batchMethod = "collapsingListGlobal", scope = Scope.GLOBAL, collapserProperties = { @HystrixProperty(name = "timerDelayInMilliseconds", value = "10000") }) public Future<User> collapsingGlobal (Integer id) { return null ; } @HystrixCommand public List<User> collapsingListGlobal (List<Integer> userParam) { logger.info("collapsingListGlobal当前线程: " + Thread.currentThread().getName()); logger.info("当前请求参数个数:" + userParam.size()); List<User> userList = new ArrayList<User>(); for (Integer userNumber : userParam) { User user = new User(); user.setUserName("User- " + userNumber); user.setAge(userNumber); userList.add(user); } return userList; } }
增加一个调用接口来调用上述方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @RestController @RequestMapping("/user") public class CollapsingController { private static final Logger logger = LoggerFactory.getLogger(CollapsingController.class); @Autowired private ICollapsingService collapsingService; @RequestMapping("/getUserGolbal") public String getUserGolbal () throws Exception { Future<User> user = collapsingService.collapsingGlobal(1 ); Future<User> user2 = collapsingService.collapsingGlobal(2 ); logger.info(user.get().getUserName()); logger.info(user2.get().getUserName()); return "Success" ; } }
连续访问 http://127.0.0.1:8082/user/getUserGolbal 两次,会发现所有请求都合并在一个线程中;若改为 Request 作用域,Hystrix 则会运行两个线程来分别处理两次请求。
1 2 3 4 5 6 CollapsingService : collapsingListGlobal当前线程: hystrix-CollapsingService-10 CollapsingService : 当前请求参数个数:4 CollapsingController : User- 1 CollapsingController : User- 1 CollapsingController : User- 2 CollapsingController : User- 2
请求合并总结
Hystrix Request Collapser 主要用于请求合并的场景,在一个简单的系统中,这种场景可能很少碰到,所以对于请求合并,一般的使用场景是:当在某个时间段内有大量或并发的相同请求时,则适用使用请求合并;而如果在某个时间段内只有很少的请求,且延迟也不高,此时使用请求合并反而会增加复杂度和延迟,因为对于 Collapser 本身,Hystrix 也是需要时间进行批处理的。
Hystrix 线程传递 Hystrix 线程传递介绍 Hystrix 会对请求进行封装,然后管理请求的调用,从而实现断路器等多种功能。Hystrix 提供了两种隔离策略来处理请求,一种是线程池隔离(默认),一种是信号量隔离。如果使用信号量隔离,Hystrix 会在处理请求之前获取到一个信号量,如果成功拿到,则继续处理请求,请求在同一个业务调用线程中执行完毕。如果使用线程池隔离,Hystrix 会将请求提交到线程池中执行,由于执行线程和业务调用线程不同,就可能会产生线程切换,导致业务调用线程中的上下文数据(例如 ThreadLocal 中存放的信息)无法在执行线程中正常获取,这就是所谓的父子线程共享传递(线程上下文共享传递)问题。下面通过一个例子来说明,点击下载 完整的案例代码。
Hystrix 线程传递问题重现 建立一个 ThreadLocal 来保存用户的信息,通常在微服务里,会把当前请求的上下文数据放入本地线程变量,便于后续使用和销毁:
1 2 3 4 public class HystrixThreadLocal { public static ThreadLocal<String> threadLocal = new ThreadLocal<>(); }
定义测试接口,打印当前线程的 ID,并利用 ThreadLocal 存放用户信息;为了兼容其他情况,例如在使用 Feign 调用的时候,通常会使用 RequestContextHolder 拿到上下文属性,在此也进行测试一下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @RestController @RequestMapping("/user") public class UserController { private static final Logger logger = LoggerFactory.getLogger(UserController.class); @Autowired private UserService userService; @GetMapping("/get/{id}") public String get (@PathVariable("id") Integer id) { HystrixThreadLocal.threadLocal.set("userId: " + id); RequestContextHolder.currentRequestAttributes().setAttribute("userId" , "userId: " + id, RequestAttributes.SCOPE_REQUEST); logger.info("current thread: " + Thread.currentThread().getId()); logger.info("thread local: " + HystrixThreadLocal.threadLocal.get()); logger.info("RequestContextHolder: " + RequestContextHolder.currentRequestAttributes().getAttribute("userId" , RequestAttributes.SCOPE_REQUEST)); return userService.get(id); } }
定义服务类,测试在没有使用线程池隔离模式的情况下,获取用户信息:
1 2 3 4 5 6 7 8 9 10 11 12 @Service public class UserService { private static final Logger logger = LoggerFactory.getLogger(UserService.class); public String get (Integer id) { logger.info("current thread: " + Thread.currentThread().getId()); logger.info("thread local: " + HystrixThreadLocal.threadLocal.get()); logger.info("RequestContextHolder: " + RequestContextHolder.currentRequestAttributes().getAttribute("userId" , RequestAttributes.SCOPE_REQUEST).toString()); return "Success" ; } }
启动应用后访问 http://127.0.0.1:8082/user/get/2 后,可以看到打印的线程 ID 都是一样的,线程变量也是传入 2,请求上下文的持有对象也可以顺利拿到:
1 2 3 4 5 6 7 UserController : current thread: 59 UserController : thread local: userId: 2 UserController : RequestContextHolder: userId: 2 UserService : current thread: 59 UserService : thread local: userId: 2 UserService : RequestContextHolder: userId: 2
服务类添加 @HystrixCommand 注解,测试在使用线程池隔离模式的情况下,获取用户信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 @Service public class UserService { private static final Logger logger = LoggerFactory.getLogger(UserService.class); @HystrixCommand public String get (Integer id) { logger.info("current thread: " + Thread.currentThread().getId()); logger.info("thread local: " + HystrixThreadLocal.threadLocal.get()); logger.info("RequestContextHolder: " + RequestContextHolder.currentRequestAttributes().getAttribute("userId" , RequestAttributes.SCOPE_REQUEST).toString()); return "Success" ; } }
启动应用后访问 http://127.0.0.1:8082/user/get/2 后,会发现进入的线程 ID 是 57,当达到后台服务的时候,线程 ID 变成 82,说明线程池的隔离已经生效,是重新启动的线程处理请求的,然后线程的变量也丢失了,RequestContextHolder 中也抛出了异常,意思是没有绑定线程变量,至此成功地重现了父子线程数据传递的问题。
1 2 3 4 5 6 7 UserController : current thread: 57 UserController : thread local: userId: 2 UserController : RequestContextHolder: userId: 2 UserService : current thread: 82 UserService : thread local: null java.lang.IllegalStateException: No thread-bound request found:
Hystrix 线程传递问题解决 解决 Hystrix 的父子线程共享传递(线程上下文共享传递)问题有两种方法:
第一种方法:
修改 Hystrix 的隔离策略,使用信号量隔离,直接修改配置文件即可 但 Hystrix 默认是线程池隔离,加上从真实的项目情况看,大部分都是使用线程池隔离,因此此方案不太推荐,对应配置属性为:hystrix.command.default.execution.isolation.strategy。 第二种方法:
Hystrix 官方推荐的一种方式,就是继承 HystrixConcurrencyStrategy 类并覆盖 wrapCallable() 方法,下面将介绍此方法的使用例子。 创建 HystrixThreadCallable 类,该类的构造函数是希望传递 RequestContextHolder 和自定义的 HystrixThreadLocal 对象:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class HystrixThreadCallable <S > implements Callable <S > { private final RequestAttributes requestAttributes; private final Callable<S> delegate; private String params; public HystrixThreadCallable (Callable<S> callable, RequestAttributes requestAttributes, String params) { this .delegate = callable; this .requestAttributes = requestAttributes; this .params = params; } @Override public S call () throws Exception { try { RequestContextHolder.setRequestAttributes(requestAttributes); HystrixThreadLocal.threadLocal.set(params); return delegate.call(); } finally { RequestContextHolder.resetRequestAttributes(); HystrixThreadLocal.threadLocal.remove(); } } }
重写 HystrixConcurrencyStrategy 类的 wrapCallable 方法,在执行请求前包装 HystrixThreadCallable 对象,将需要的对象信息设置进去,这样在下一个线程中就可以拿到了:
1 2 3 4 5 6 7 public class SpringCloudHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy { @Override public <T> Callable<T> wrapCallable (Callable<T> callable) { return new HystrixThreadCallable<>(callable, RequestContextHolder.getRequestAttributes(), HystrixThreadLocal.threadLocal.get()); } }
配置类:
1 2 3 4 5 6 7 8 @Configuration public class HystrixThreadContextConfiguration { @Bean public SpringCloudHystrixConcurrencyStrategy springCloudHystrixConcurrencyStrategy () { return new SpringCloudHystrixConcurrencyStrategy(); } }
启动应用后访问 http://127.0.0.1:8082/user/get/2 后,可以发现即使使用了 Hystrix 的线程池隔离模式,不同的线程也能顺利拿到上一个线程传递过来的信息:
1 2 3 4 5 6 7 UserController : current thread: 59 UserController : thread local: userId: 2 UserController : RequestContextHolder: userId: 2 UserService : current thread: 84 UserService : thread local: userId: 2 UserService : RequestContextHolder: userId: 2
多个并发策略共存 由于 HystrixPlugins 类的 registerConcurrencyStrategy() 方法只能被调用一次,即 Hystrix 不允许注册多个 Hystrix 并发策略,不然就会报错,这就导致了无法和其他并发策略一起使用,因此需要将其他并发策略注入进去,达到并存的目的,如 Sleuth 的并发策略也是做了同样的事情。具体的做法就是在构造此并发策略时,找到之前已经存在的并发策略,并保留在类的属性中,在调用过程中,返回之前并发策略的相关信息,如请求变量、连接池、阻塞队列;等请求进来时,既不影响之前的并发策略,也可以包装需要的请求信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 public class SpringCloudHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy { private HystrixConcurrencyStrategy delegateHystrixConcurrencyStrategy; @Override public <T> Callable<T> wrapCallable (Callable<T> callable) { return new HystrixThreadCallable<>(callable, RequestContextHolder.getRequestAttributes(), HystrixThreadLocal.threadLocal.get()); } public SpringCloudHystrixConcurrencyStrategy () { init(); } private void init () { try { this .delegateHystrixConcurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy(); if (this .delegateHystrixConcurrencyStrategy instanceof SpringCloudHystrixConcurrencyStrategy) { return ; } HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance().getCommandExecutionHook(); HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance().getEventNotifier(); HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance().getMetricsPublisher(); HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy(); HystrixPlugins.reset(); HystrixPlugins.getInstance().registerConcurrencyStrategy(this ); HystrixPlugins.getInstance().registerCommandExecutionHook(commandExecutionHook); HystrixPlugins.getInstance().registerEventNotifier(eventNotifier); HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher); HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy); } catch (Exception e) { throw e; } } @Override public ThreadPoolExecutor getThreadPool (HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize, HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { return this .delegateHystrixConcurrencyStrategy.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override public ThreadPoolExecutor getThreadPool (HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) { return this .delegateHystrixConcurrencyStrategy.getThreadPool(threadPoolKey, threadPoolProperties); } @Override public BlockingQueue<Runnable> getBlockingQueue (int maxQueueSize) { return this .delegateHystrixConcurrencyStrategy.getBlockingQueue(maxQueueSize); } @Override public <T> HystrixRequestVariable<T> getRequestVariable (HystrixRequestVariableLifecycle<T> rv) { return this .delegateHystrixConcurrencyStrategy.getRequestVariable(rv); } }
Hystrix 工作原理
Hystrix 的执行本质是保护下游依赖服务,通过线程池隔离、断路器、超时机制、降级、请求缓存、度量统计等手段,确保系统的稳定性。Hystrix 的工作原理(八大步骤)如下:
参考资料