Hystrix 入门教程 - 基础篇

服务雪崩效应

服务雪崩概述

微服务之间进行 RPC 或者 HTTP 调用时,一般都会设置 调用超时失败重试等机制来确保服务的成功执行,这看上去很美好,但如果不考虑服务的熔断和限流,它就是造成服务雪崩的元凶。假设有两个访问量比较大的服务 A 和 B,这两个服务分别依赖 C 和 D,其中 C 和 D 服务都依赖 E 服务(如下图),这就是所谓的扇出。A 和 B 不断地调用 C 和 D,处理客户请求和返回需要的数据;当 E 服务不能提供服务的时候,C 和 D 的 超时重试机制会被执行;由于新的请求不断的产生,会导致 C 和 D 对 E 服务的调用大量的积压,产生大量的调用等待和重试调用,会慢慢耗尽 C 和 D 的系统资源(CPU 或者内存等),然后 C 和 D 服务跟着也 down 掉。A 和 B 服务会重复 C 和 D 的遭遇,导致系统资源耗尽,然后服务也 down 掉了,最终整个服务都不可访问,造成了服务雪崩。

hystrix-snow

服务雪崩原因分析

  • 访问量的突然激增
  • 硬件故障,如机器宕机,机房断电,光纤被挖断等
  • 数据库存在严重瓶颈,如:长事务、SQL 查询超时等
  • 缓存击穿,导致请求全部落到某个服务,导致服务宕掉
  • 程序有 Bug,导致服务不可用或者运行缓慢,如内存泄漏、线程同步等待等

服务雪崩解决方案

  • 隔离:将不同类型的接口隔离部署,单个类型接口的失败甚至进程池被耗尽了,也不会影响其他接口的正常访问
  • 限流:当发现服务失败数量达到某个阈值,拒绝访问,以此限制更多流量进来,防止过多失败的请求将资源耗尽
  • 熔断:从接口请求连接时就拒绝访问,类似家里用的保险丝,当使用的电器总和超过了电压就熔断保险丝,保护整个区域的电路防止更多的损失
  • 降级:对于简单的展示功能,如果有失败的请求,返回默认值;对于整个站点或客户端,如果服务器负载过高,则将其他非核心业务停掉,以让出更多资源给其他服务使用

熔断与降级的区别

熔断与降级的相同点:

  • 最终表现类似,对于两者来说,最终让用户体验到的是某些功能暂时不可达或不可用
  • 目的很一致,都是从可用性可靠性着想,为防止系统的整体缓慢甚至崩溃而采用的技术手段
  • 粒度一般都是服务级别,当然,业界也有不少更细粒度的做法,比如做到数据持久层(允许查询,不允许增删改)
  • 自治性要求很高,熔断模式一般都是服务基于策略的自动触发,降级虽说可人工干预,但在微服务架构下,完全靠人显然不可能,开关预置、配置中心都是必要手段

熔断与降级的不同点:

  • 实现方式不太一样,降级具有代码侵入性 (由控制器完成或者自动降级),熔断一般称为自我熔断
  • 触发原因不太一样,服务熔断一般是某个服务(下游服务)故障引起,而服务降级一般是从整体负荷考虑
  • 管理目标的层次不太一样,熔断其实是一个框架级的处理,每个微服务都需要(无层级之分),而降级一般需要对业务有层级之分(比如降级一般是从最外围服务开始)

资源隔离的级别

  • 应用级别隔离:线程池隔离、信号量隔离、连接池隔离;Hystrix 实现了前两种,其各自优缺点如下图:

hystrix-isolation-level

  • 硬件级别隔离:虚拟机、Docker,比如 Docker 的资源隔离和资源限制,其通过 CGroup 来控制容器使用的资源配额,包括 CPU、内存、磁盘 IO、网络

Hystrix 介绍

Hystrix 是什么

Hystrix 是由 Netflix 开源的一个针对分布式系统容错处理的开源组件,2011 - 2012 年相继诞生和成熟,在 2018 年 11 月 20 日之后已经停止维护,最后一个正式版本为 1.5.18。Hystrix 单词意为 “豪猪”,浑身有刺保护自己,Hystrix 就是这样一个用来捍卫应用程序健康的利器。进一步说,Hystrix 是一个延迟和容错库,用在隔离远程系统、服务和第三方库,阻止级连故障,在复杂的分布式系统中实现恢复能力,以提高分布式系统的弹性。Hystrix 底层大量使用了 RxJava,而 Spring Cloud Hystrix 对 Hystrix 进行了二次封装,将其整合进 Spring Cloud 生态,更多介绍可参考:Hystrix 项目Hystrix 官方英文教程Spring Cloud Hystrix 官方中文文档

Hystrix 的设计目标

  • 通过客户端库对延迟和故障进行保护和控制
  • 在一个复杂的分布式系统中停止级联故障
  • 快速失败和迅速恢复
  • 在合理的情况下回退和优雅地降级
  • 开启近实时监控、告警和操作控制

Hystrix 的特性

服务熔断

熔断机制是应对服务雪崩效应的一种微服务链路保护机制。日常在各种场景下都会接触到熔断这两个字,高压电路中,如果某个地方的电压过高,熔断器就会熔断,对电路进行保护。股票交易中,如果股票指数过高,也会采用熔断机制,暂停股票的交易。同样,在微服务架构中,熔断机制也是起着类似的作用。当扇出链路的某个微服务不可用或者响应时间太长时,会进行服务的降级,进而熔断该节点微服务的调用,快速返回” 错误” 的响应信息。当检测到该节点微服务调用响应正常后恢复调用链路。在 Spring Cloud 框架里熔断机制通过 Hystrix 实现,Hystrix 会监控微服务间调用的状况,当失败的调用到一定阈值,缺省是 5 秒内 20 次调用失败就会启动熔断机制。服务熔断是在服务端(服务提供者)实现的,Hybstrix 熔断机制的注解是 @HystrixCommand

服务降级

服务压力剧增的时候,根据当前的业务情况及流量对一些服务和页面有策略的降级,缓解服务器的压力,以保证核心任务的进行,同时保证部分甚至大部分请求能得到正确的响应。也就是当前的请求处理不了或者出错了,给一个默认的返回结果。服务降级处理是在客户端(服务消费者)实现的,与服务端(服务提供者)没有关系。

准实时的调用监控

Hystrix 除了隔离依赖服务的调用以外,还提供了准实时的调用监控(Hystrix Dashboard)。Hystrix 会持续地记录所有通过 Hystrix 发起的请求的执行信息,并以统计报表和图形的形式展示给用户,包括每秒执行多少请求、多少成功、多少失败等。Netflix 通过 hystrix-metrics-event-stream 项目实现了对以上指标的监控,而 Spring Cloud 也提供了 Hystrix Dashboard 的整合,对监控内容转化成可视化界面。

Hystrix 入门案例

1. 版本说明

在下面的的教程中,使用的 Spring Cloud 版本是 Finchley.RELEASE,对应的 Spring Boot 版本是 2.0.3,点击下载完整的案例代码

2. 创建 Maven 父级 Pom 工程

在父工程里面配置好工程需要的父级依赖,目的是为了更方便管理与简化配置,具体 Maven 配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.3.RELEASE</version>
</parent>

<!-- 利用传递依赖,公共部分 -->
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>

<!-- 管理依赖 -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Finchley.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<!--注意:这里需要添加以下配置,否则可能会有各种依赖问题 -->
<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/libs-milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>

3. 创建 Eureka Server 工程

创建 Eureka Server 的 Maven 工程,配置工程里的 pom.xml 文件,需要引入 spring-cloud-starter-netflix-eureka-server

1
2
3
4
5
6
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>
</dependencies>

创建 Eureka Server 的启动主类,这里添加相应注解,作为程序的入口:

1
2
3
4
5
6
7
8
@SpringBootApplication
@EnableEurekaServer
public class EurekaServerApplication {

public static void main(String[] args){
SpringApplication.run(EurekaServerApplication.class, args);
}
}

添加 Eureka Server 需要的 application.yml 配置文件到工程中

1
2
3
4
5
6
7
8
9
10
11
server:
port: 8090

eureka:
instance:
hostname: 127.0.0.1
client:
registerWithEureka: false
fetchRegistry: false
serviceUrl:
defaultZone: http://${eureka.instance.hostname}:${server.port}/eureka/

4. 创建 Provider 源服务工程

创建 Provider 的 Maven 工程,配置工程里的 pom.xml 文件,需要引入 spring-cloud-starter-netflix-hystrixspring-cloud-starter-netflix-eureka-client

1
2
3
4
5
6
7
8
9
10
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
</dependencies>

创建 Provider 的启动主类,添加注解 @EnableHystrix@EnableDiscoveryClient

1
2
3
4
5
6
7
8
9
@EnableHystrix
@SpringBootApplication
@EnableDiscoveryClient
public class ProviderApplication {

public static void main(String[] args) {
SpringApplication.run(ProviderApplication.class, args);
}
}

application.yml 文件中指定服务名称(provider)、注册中心地址与端口号:

1
2
3
4
5
6
7
8
9
10
11
12
13
server:
port: 8080

spring:
application:
name: provider

eureka:
client:
service-url:
defaultZone: http://127.0.0.1:8090/eureka
instance:
prefer-ip-address: true

创建用于测试的 Controller 类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@RestController
@RequestMapping("/user")
public class UserController {

@GetMapping("/getUser")
@HystrixCommand(fallbackMethod = "defaultUser")
public String getUser(String userName) {
if (userName.equals("Jim")) {
return "this is real user";
} else {
throw new RuntimeException("user is not exist");
}
}

public String defaultUser(String userName) {
return "the user not exist in this system";
}
}

5. 测试

  1. 启动 Eureka Server 与 Provider 应用
  2. 浏览器访问 http://127.0.0.1:8080/user/getUser?userName=Jim,当用户名为 Jim 时会返回正确的信息
  3. 当用户名不为 Jim 时,则会抛出运行时异常,同时 Hystrix 会降级处理返回友好的提示

Hystrix 实战应用

Feign 中使用 Hystrix

在 Feign 中,默认是自带 Hystrix 功能的,在很老的版本中默认是打开的,从最近的几个版本开始默认被关闭了,因此需要通过配置文件打开它,点击下载完整的案例代码。

在 Provider 源服务工程里,创建用于测试的 Controller 类:

1
2
3
4
5
6
7
8
9
@RestController
@RequestMapping("/dept")
public class DeptController {

@RequestMapping("/getDept")
public String getDept(String deptName) {
throw new RuntimeException("dept is not exist");
}
}

创建 Feign Client 工程,使用 @FeignClient 定义接口,并配置降级回退类:

1
2
3
4
5
6
@FeignClient(name = "PROVIDER", fallbackFactory = DeptClientFallbackServiceFactory.class)
public interface DeptClientService {

@RequestMapping("/dept/getDept")
public String getDept(@RequestParam("deptName") String deptName);
}

在 Feign Client 工程里,创建降级回退类,实现 FallbackFactory 接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Component
public class DeptClientFallbackServiceFactory implements FallbackFactory<DeptClientService> {

@Override
public DeptClientService create(Throwable throwable) {
return new DeptClientService() {

@Override
public String getDept(String deptName) {
return "the dept not exist in this system, please confirm deptName";
}
};
}
}

在 Feign Client 工程里,创建启动主类:

1
2
3
4
5
6
7
8
9
@EnableFeignClients
@EnableDiscoveryClient
@SpringBootApplication
public class FeignClientApplication {

public static void main(String[] args) {
SpringApplication.run(FeignClientApplication.class, args);
}
}

在 Feign Client 工程里,创建用于测试的 Controller 类:

1
2
3
4
5
6
7
8
9
10
11
12
@RestController
@RequestMapping("/dept")
public class DeptController {

@Autowired
private DeptClientService clientService;

@GetMapping("/get")
public String get(String deptName) {
return clientService.getDept(deptName);
}
}

在 Feign Client 工程里,配置 pom.xml 文件,让 Feign 启用 Hystrix:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
server:
port: 8082

spring:
application:
name: feign-client

eureka:
client:
service-url:
defaultZone: http://127.0.0.1:8090/eureka
instance:
prefer-ip-address: true

feign:
hystrix:
enabled: true

测试 Feign 使用 Hystrix 的效果:

  1. 启动 Eureka Server、Provider 应用
  2. 当设置 feign.hystrix.enabled=false 时,启动 Feign-Client 应用,访问 http://127.0.0.1:8082/dept/get?deptName=IT,服务端返回 500 错误页面
  3. 当设置 feign.hystrix.enabled=true 时,启动 Feign-Client 应用,访问 http://127.0.0.1:8082/dept/get?deptName=IT ,服务端返回 the dept not exist in this system, please confirm deptName,这时说明 Hystrix 已经产生作用

Hystrix Dashboard

Hystrix Dashboard 仪表盘是根据系统一段时间内发生的请求情况来展示的可视化面板,这些信息是每个 HystrixCommand 执行过程中的信息,这些信息是一个指标集合和具体的系统运行情况。创建 eureka-server、provider-service、feign-client 工程,其中 provider-service 提供了一个接口返回信息。由于 Hystrix 的指标是需要端口进行支撑的,因此 provider-service 工程需要增加 actuator 依赖,并公开 hystrix.stream 端点以便能被访问到,点击下载完整的案例代码。

配置 provider-service 工程里的 pom.xml,加入以下依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
</dependencies>

配置 provider-service 工程里的 application.yml,当 Spring Cloud 的版本高于 Dalston 时,建议确认 management.endpoints.web.exposure.include 包含的有 hystrix.stream 或者直接为 *;否则访问 http://127.0.0.1:8080/actuator/hystrix.stream 时可能会返回 404 错误页面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
server:
port: 8080

spring:
application:
name: provider

eureka:
client:
service-url:
defaultZone: http://127.0.0.1:8090/eureka
instance:
prefer-ip-address: true

management:
endpoints:
web:
exposure:
include: hystrix.stream

创建 hystrix-dashboard 工程,引入 spring-cloud-starter-netflix-hystrix-dashboard

1
2
3
4
5
6
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix-dashboard</artifactId>
</dependency>
</dependencies>

在 hystrix-dashboard 工程里,创建启动主类,添加 @EnableHystrixDashboard 注解:

1
2
3
4
5
6
7
8
@SpringBootApplication
@EnableHystrixDashboard
public class DashboardApplication {

public static void main(String[] args) {
SpringApplication.run(DashboardApplication.class, args);
}
}

在 hystrix-dashboard 工程里,添加 application.yml 文件

1
2
server:
port: 8000

测试 Hystrix Dashboard 的运行效果:

  1. 分别启动 eureka-server、provider-service、feign-client、hystrix-dashboard 应用
  2. 访问 Hystrix Dashboard 的首页:http://127.0.0.1:8091/hystrix查看首页截图
  3. 查看 provider-server 应用的监控信息: http://127.0.0.1:8080/actuator/hystrix.stream,目前 Spring Cloud Finchley 版的 SpringBoot 版本是 2.0,所以访问路径需要加上 /actuator,否则会访问不到监控页面,查看监控信息截图
  4. 在 Hystrix Dashboard 的首页中,填写 provider-server 应用的监控地址 http://127.0.0.1:8080/actuator/hystrix.stream,点击 Monitor Stream 按钮,跳转到监控图表页面,查看图表页面截图
  5. 调用 feigh-client 的接口:http://127.0.0.1:8082/dept/get?deptName=IT,更换不同的 deptName 参数值,观察 Hystrix Dashboard 监控页面上的图表变化

Hystrix Dashboard 各项指标参数的含义:

hystrix-dashboard-means

Turbine 聚合 Hystrix

上面讲的是单个实例的 Hystrix Dashboard,但在整个系统和集群的情况下不是特别有用,所以需要一种方式来聚合整个集群下的监控状况,Turbine 就是用来聚合所有相关的 hystrix.stream 流的方案,然后在 Hystrix Dashboard 中显示,具体原理如下图:

hystrix-turbine

创建 eureka-server、provider-service-user、provider-service-dept、hystrix-dashboard 工程后,再创建 hystrix-turbine 工程,用来聚合集群里的 hystrix.stream 流。为了学习方便,也可以将 hystrix-turbine 工程整合到 hystrix-dashboard 工程里,点击下载完整的案例代码。

配置 hystrix-turbine 工程里的 pom.xml 文件,由于 Turbine 依赖 Eureka 的服务注册发现,因此需要另外引入 spring-cloud-starter-netflix-eureka-client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-turbine</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
</dependencies>

配置 hystrix-turbine 工程的 application.yml 文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
server:
port: 8093

spring:
application:
name: turbine-service

eureka:
client:
service-url:
defaultZone: http://127.0.0.1:8090/eureka
instance:
prefer-ip-address: true

turbine:
aggregator:
clusterConfig: default #指定聚合哪些集群,多个使用","分割,默认为default。可使用http://.../turbine.stream?cluster={clusterConfig之一}访问
appConfig: provider-dept,provider-user #配置Eureka中的serviceId列表,表明监控哪些服务
clusterNameExpression: "'default'"
# 1.当clusterNameExpression: default时,turbine.aggregator.clusterConfig可以不写,因为默认就是default
# 2.当clusterNameExpression指定集群名称,默认表达式appName;此时:turbine.aggregator.clusterConfig需要配置想要监控的应用名称
# 3.当clusterNameExpression: metadata['cluster']时,假设想要监控的应用配置了eureka.instance.metadata-map.cluster: ABC,则需要配置,同时turbine.aggregator.clusterConfig: ABC

创建 hystrix-turbine 工程里的启动主类,添加 @EnableTurbine 注解后,会自动启用 Eureka Client:

1
2
3
4
5
6
7
8
@EnableTurbine
@SpringBootApplication
public class TurbineApplication {

public static void main(String[] args) {
SpringApplication.run(TurbineApplication.class, args);
}
}

测试 Turbine 的运行效果:

  1. 分别启动 eureka-server、provider-service-user、provider-service-dept 应用
  2. 启动 hystrix-turbine 应用,访问 http://127.0.0.1:8093/turbine.stream,观察是否能获取到集群监控信息
  3. 启动 hystrix-dashboard 应用,访问 Hystrix Dashboard 的首页 http://127.0.0.1:8094/hystrix,在页面上填写 hystrix-turbine 应用的监控地址 http://127.0.0.1:8093/turbine.stream,然后点击 Monitor Stream 按钮,跳转到监控图表页面,查看图表页面截图
  4. 分别访问 provider-service-user 应用:http://127.0.0.1:8092/user/getUser?userName=Jim、provider-service-dept 应用:http://127.0.0.1:8091/dept/getDept?deptName=IT,观察 Hystrix Dashboard 监控页面上的图表变化

Hystrix 进阶

Hystrix 配置说明

Hystrix 的配置比较多,具体可以参考:官方英文文档第三方中文文档

Hystrix 命令注解的区别

Hystrix 在使用过程中除了 HystrixCommand 还有 HystrixObservableCommand,这两个命令有很多共同点,如都支持故障和延迟容错、断路器、指标统计,两者的区别如下:

  • HystrixCommand 默认是阻塞式的,可以提供同步和异步两种方式,但 HystrixObservableCommand 是非阻塞式的,默认只能是异步的
  • HystrixCommand 执行的方法是 run,HystrixObservableCommand 执行的是 construct
  • HystrixCommand 一个实例一次只能发一条数据出去,HystrixObservableCommand 可以发送多条数据

Hystrix 异常机制和处理

5 种会被 fallback 截获的情况

Hystrix 的异常处理中,有 5 种出错的情况会被 fallback 所截获,从而触发 fallback,这些情况分别是:

hystrix-handle-fallback

有一种异常是不会触发 fallback 的,且不会被计数进入熔断,它是 BAD_REQUEST,会抛出 HystrixBadRequestException,这种异常一般对应的是由非法参数或者一些非系统异常引起的,对于这种异常可以根据响应创建对应的异常进行异常封装或者直接处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* HystrixBadRequestException 不会触发 fallback
*/
@RestController
@RequestMapping("/user")
public class UserController {

@GetMapping("/getUser")
@HystrixCommand(fallbackMethod = "defaultUser")
public String getUser(String userName) {
throw new HystrixBadRequestException("HystrixBadRequestException Error");
}

public String defaultUser(String userName) {
return "the user not exist in this system";
}
}

获取 fallback 里的异常信息

若想在 @HystrixCommand 里获取异常信息,只需要在方法内指定 Throwable 参数;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@RestController
@RequestMapping("/user")
public class UserController {

@GetMapping("/getUser")
@HystrixCommand(fallbackMethod = "defaultUser")
public String getUser(String userName) {
throw new RuntimeException("the user not exist");
}

public String defaultUser(String userName, Throwable throwable) {
System.out.println(throwable.getMessage());
return "the user not exist in this system";
}
}

或者继承 @HystrixCommand 的命令,通过方法来获取异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@RestController
public class ExceptionController {

@GetMapping("/getPSFallbackOtherExpcetion")
public String pSFallbackOtherExpcetion(){
String result = new PSFallbackOtherExpcetion().execute();
return result;
}
}

/**
* 继承HystrixCommand
*/
public class PSFallbackOtherExpcetion extends HystrixCommand<String>{

public PSFallbackOtherExpcetion() {
super(HystrixCommandGroupKey.Factory.asKey("GroupOE"));
}

@Override
protected String run() throws Exception {
throw new Exception("this command will trigger fallback");
}

@Override
protected String getFallback() {
System.out.println(getFailedExecutionException().getMessage());
return "invoke PSFallbackOtherExpcetion fallback method";
}
}

在 Feign Client 中可以用 ErrorDecoder 实现对这类异常的包装,在实际的使用中,很多时候调用接口会抛出这些 400-500 之间的错误,此时可以通过它进行封装:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Component
public class FeignErrorDecoder implements feign.codec.ErrorDecoder {

@Override
public Exception decode(String methodKey, Response response) {
try {
if (response.status() >= 400 && response.status() <= 499) {
String error = Util.toString(response.body().asReader());
return new HystrixBadRequestException(error);
}
} catch (IOException e) {
System.out.println(e);
}
return feign.FeignException.errorStatus(methodKey, response);
}
}

fallback 会抛出异常的情况

fallback-throw-exception

Hystrix 请求缓存

Hystrix 请求缓存是指 Hystrix 在同一个上下文请求中缓存请求结果,它与传统理解的缓存有一定区别;Hystrix 的请求缓存是在同一个请求中进行,在进行第一次调用结束后对结果缓存,然后接下来同参数的请求将会使用第一次缓存的结果,缓存的生命周期只在这一次请求中有效。使用 HystrixCommand 有两种方式,第一次种是继承,第二种是直接注解,缓存也同时支持这两种使用方式。具体使用例子如下,点击下载完整的案例代码。

使用类来开启缓存

Hystrix 的缓存是在一次请求内有效,这要求请求要在一个 Hystrix 上下文里,不然在使用缓存的时候 Hystrix 会报一个没有初始化上下文的异常;可以使用 filter 过滤器或者 Interceptor 拦截器进行初始化,下面将使用一个拦截器来举例。使用类的方式很简单,只需要继承 HystrixCommand,然后重写它的 getCacheKey 方法即可,保证对于同一个请求返回同样的键值;对于缓存的清除,则可以调用 HystrixRequestCache 类的 clean 方法即可。

拦截器类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class CacheContextInterceptor implements HandlerInterceptor {

private HystrixRequestContext context;

@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
this.context = HystrixRequestContext.initializeContext();
return true;
}

@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {

}

@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
this.context.shutdown();
}
}

创建配置类,用于注册拦截器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Configuration
public class CommonConfiguration {

@Bean
@LoadBalanced
public RestTemplate restTemplate() {
return new RestTemplate();
}

@Bean
public CacheContextInterceptor userContextInterceptor() {
return new CacheContextInterceptor();
}
}
1
2
3
4
5
6
7
8
9
10
11
@Configuration
public class WebMvcConfiguration extends WebMvcConfigurerAdapter {

@Autowired
CacheContextInterceptor userContextInterceptor;

@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(userContextInterceptor);
}
}

继承 HystrixCommand 类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class UserCommand extends HystrixCommand<String> {

private String userName;
private RestTemplate restTemplate;
private static final Logger logger = LoggerFactory.getLogger(UserCommand.class);
private static final HystrixCommandKey KEY = HystrixCommandKey.Factory.asKey("CommandKey");

public UserCommand(String userName, RestTemplate restTemplate) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("CacheGroup")).andCommandKey(KEY));
this.userName = userName;
this.restTemplate = restTemplate;
}

@Override
protected String run() throws Exception {
String result = restTemplate.getForObject("http://PROVIDER/user/getUser?userName={1}", String.class, this.userName);
logger.info(result);
return result;
}

@Override
protected String getFallback() {
return super.getFallback();
}

@Override
protected String getCacheKey() {
return this.userName;
}

public static void cleanCache(String userName) {
HystrixRequestCache.getInstance(KEY, HystrixConcurrencyStrategyDefault.getInstance()).clear(userName);
}
}

用于测试的 Controller 类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@RestController
@RequestMapping("/user")
public class UserController {

private static final Logger logger = LoggerFactory.getLogger(UserController.class);

@Autowired
private RestTemplate restTemplate;

@GetMapping("/get")
public String get(String userName) {
UserCommand commandOne = new UserCommand(userName, restTemplate);
commandOne.execute();
logger.info("from cache: " + commandOne.isResponseFromCache());

UserCommand commandTwo = new UserCommand(userName, restTemplate);
commandTwo.execute();
logger.info("from cache: " + commandTwo.isResponseFromCache());
return "cache test finished";
}
}

启动主类:

1
2
3
4
5
6
7
8
9
@EnableHystrix
@EnableDiscoveryClient
@SpringBootApplication
public class CacheApplication {

public static void main(String[] args) {
SpringApplication.run(CacheApplication.class, args);
}
}

访问 http://127.0.0.1:8082/user/get?userName=Tom,调用了两次 execute 方法,使用 Hystrix 的默认方法 isResponseFromCache 来判断请求结果是否来自于缓存,从以下输出可以看出第二次请求确实来自于缓存,此时说明 Hystrix 的缓存生效了。

1
2
c.s.study.controller.CacheController     : from cache: false
c.s.study.controller.CacheController : from cache: true

使用注解开启缓存

Hystrix 提供了注解来使用缓存机制,且更为方便和快捷,使用 @CacheResult@CacheRemove 即可缓存数据和清除缓存。

使用注解缓存数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Service
public class DeptService {

private static final Logger logger = LoggerFactory.getLogger(DeptService.class);

@Autowired
private RestTemplate restTemplate;

@CacheResult
@HystrixCommand
public String getDept(String deptName) {
String result = restTemplate.getForObject("http://PROVIDER/dept/getDept?deptName={1}", String.class, deptName);
logger.info(result);
return result;
}
}

用于测试的 Controller 类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@RestController
@RequestMapping("/dept")
public class DeptController {

@Autowired
private DeptService deptService;

@GetMapping("/get")
public String get(String deptName) {
deptService.getDept("IT");
deptService.getDept("IT");
return "annotation cache test finished";
}
}

访问 http://127.0.0.1:8082/dept/get?deptName=IT,调用了两次 get 方法,发现只打印了一条数据,说明第二次的请求是从缓存中读取,即 Hystrix 的缓存生效了。

使用注解清除缓存

使用 commandKey 参数来指定 HystrixCommand 的 key,在清除缓存时,可以直接附加这个值来清除指定的参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Service
public class DeptService {

private static final Logger logger = LoggerFactory.getLogger(DeptService.class);

@Autowired
private RestTemplate restTemplate;

@CacheResult
@HystrixCommand(commandKey = "findDept")
public String findDept(@CacheKey String deptName) {
String result = restTemplate.getForObject("http://PROVIDER/dept/getDept?deptName={1}", String.class, deptName);
logger.info(result);
return result;
}

@CacheRemove(commandKey = "findDept")
@HystrixCommand
public String updateDept(@CacheKey String deptName) {
logger.info("delete dept cache");
return "update dept success";
}
}

用于测试的 Controller 类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@RestController
@RequestMapping("/dept")
public class DeptController {

@Autowired
private DeptService deptService;

@GetMapping("/find")
public String find(String deptName) {
// 调用接口并缓存数据
deptService.findDept("IT");
deptService.findDept("IT");
// 清除缓存
deptService.updateDept(deptName);
// 再调用接口
deptService.findDept("IT");
deptService.findDept("IT");
return "annotation cache test finished";
}
}

访问 http://127.0.0.1:8082/dept/find?deptName=IT,运行结果如下;在没有缓存的情况下,打印了一次,第二次取的是缓存数据,然后清除缓存后又打印了一次,最后一次又从缓存里取数据:

1
2
3
DeptService  : {"id":1,"deptName":"IT"}
DeptService : delete dept cache
DeptService : {"id":1,"deptName":"IT"}

缓存使用注意事项

Hystrix 常用缓存注解:

  • @CacheResult:使用该注解后结果会被缓存,同时它需要和 @HystrixCommand 注解一起使用,注解参数为 cacheKeyMethod
  • @CacheRemove:清除缓存,需要指定 commandKey,注解参数为 commandKeycacheKeyMethod
  • @CacheKey:指定请求命令参数,默认使用方法里的所有参数作为 Key,注解参数为 value
  • 一般在查询接口上使用 @CacheResult,在更新、删除接口上使用 @CacheRemove 删除缓存

使用 Hystrix 缓存时有几方面需要注意:

  • 需要使用 @EnableHystrix 注解启用 Hystrix
  • 需要初始化 HystrixRequestContext,无论是使用继承类还是注解的方式来开启缓存
  • 在指定了 HystrixCommand 的 commandKey 后,在 @CacheRemove 也要指定 commandKey

Hystrix Request Collapser

Request Collapser 介绍

Request Collapser 是 Hystrix 推出的针对多个请求调用单个后端依赖做的一种优化和节约网络开销的方法。引用官方的这张图,当发起 5 个请求时,在请求没有聚合和合并的情况下,是每个请求单独开启一个线程,并开启一个网络链接进行调用,这都会加重应用程序的负担和开销,并占用 Hystrix 的线程连接池。当使用 Collapser 把请求都合并起来时,则只需要一个线程和一个连接的开销,这大大减少了并发和请求执行所需要的线程数和网络连接数,尤其在一个时间段内有非常多请求的情况下能极大地提高资源利用率。特别注意:若使用 Feign 调用的话,目前还不支持 Collapser。具体使用例子如下,点击下载完整的案例代码。

hystrix-request-collapser

使用注解进行请求合并

使用 Request Collapser 也可以通过继承类和注解的形式来实现,下面主要介绍注解的使用方式。

Request Collapser 和 Hystrix 缓存的使用类似,需要实现 Hystrix 上下文的初始化和关闭,这里使用拦截器来实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class HystrixContextInterceptor implements HandlerInterceptor {

private HystrixRequestContext context;

@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
this.context = HystrixRequestContext.initializeContext();
return true;
}

@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {

}

@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
this.context.shutdown();
}
}

创建配置类,用于注册拦截器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Configuration
public class CommonConfiguration {

@Bean
@LoadBalanced
public RestTemplate restTemplate() {
return new RestTemplate();
}

@Bean
public CacheContextInterceptor userContextInterceptor() {
return new CacheContextInterceptor();
}
}
1
2
3
4
5
6
7
8
9
10
11
@Configuration
public class WebMvcConfiguration extends WebMvcConfigurerAdapter {

@Autowired
CacheContextInterceptor userContextInterceptor;

@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(userContextInterceptor);
}
}

实现一个 Future 异步返回值的方法,在这个方法上配置请求合并的注解,之后外部通过调用这个方法来实现请求的合并。注意:这个方法必须是 Future 异步返回值的,否则无法合并请求。其中 @HystrixCollapser 注解代表开启请求合并,调用该方法时,实际上运行的是 collapsingList 方法,且利用 HystrixProperty 指定 timerDelayInMilliseconds,这属性代表合并多少毫秒(ms)内的请求,如果不配置的话,默认是 10ms。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Service
public class CollapsingService implements ICollapsingService {

private static final Logger logger = LoggerFactory.getLogger(CollapsingService.class);

@HystrixCollapser(batchMethod = "collapsingList", collapserProperties = {
@HystrixProperty(name = "timerDelayInMilliseconds", value = "1000")
})
public Future<User> collapsing(Integer id) {
return null;
}

@HystrixCommand
public List<User> collapsingList(List<Integer> userParam) {
logger.info("collapsingList当前线程: " + Thread.currentThread().getName());
logger.info("当前请求参数个数:" + userParam.size());
List<User> userList = new ArrayList<User>();
for (Integer userNumber : userParam) {
User user = new User();
user.setUserName("User - " + userNumber);
user.setAge(userNumber);
userList.add(user);
}
return userList;
}
}

创建接口测试类,在 getUser 接口内连续调用两次 collapsing 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@RestController
@RequestMapping("/user")
public class CollapsingController {

private static final Logger logger = LoggerFactory.getLogger(CollapsingController.class);

@Autowired
private ICollapsingService collapsingService;

/**
* 请求聚合/合并
*
* @return
* @throws Exception
*/
@RequestMapping("/getUser")
public String getUser() throws Exception {
Future<User> user = collapsingService.collapsing(1);
Future<User> user2 = collapsingService.collapsing(2);
logger.info(user.get().getUserName());
logger.info(user2.get().getUserName());
return "Success";
}
}

启动主类:

1
2
3
4
5
6
7
8
9
@EnableHystrix
@EnableDiscoveryClient
@SpringBootApplication
public class CollapsingApplication {

public static void main(String[] args) {
SpringApplication.run(CollapsingApplication.class, args);

}

启动应用后访问 http://127.0.0.1:8082/user/getUser,可以看到实际调用了 collapsingList 方法,并打印了当前线程的名称、请求的参数和运行结果,一共合并了两个请求,达到了预期效果:

1
2
3
4
CollapsingService      : collapsingList当前线程: hystrix-CollapsingService-1
CollapsingService : 当前请求参数个数:2
CollapsingController : User - 1
CollapsingController : User - 2

使用注解进行请求合并(全局)

上面讲了多个请求是如何合并的,但是都是在同一请求(单一线程)中发起的调用,如果两次请求接口都是在不同线程运行的,那么如何合并整个应用中的请求呢?即如何对所有线程请求中的多次服务调用进行合并呢? @HystrixCollapser 注解的 scope 属性有个两个值,分别是:Request(默认值)Global。下面的代码中,增加了一个 scope 属性为 Global 的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Service
public class CollapsingService implements ICollapsingService {

private static final Logger logger = LoggerFactory.getLogger(CollapsingService.class);

@HystrixCollapser(batchMethod = "collapsingListGlobal", scope = Scope.GLOBAL, collapserProperties = {
@HystrixProperty(name = "timerDelayInMilliseconds", value = "10000")
})
public Future<User> collapsingGlobal(Integer id) {
return null;
}

@HystrixCommand
public List<User> collapsingListGlobal(List<Integer> userParam) {
logger.info("collapsingListGlobal当前线程: " + Thread.currentThread().getName());
logger.info("当前请求参数个数:" + userParam.size());
List<User> userList = new ArrayList<User>();
for (Integer userNumber : userParam) {
User user = new User();
user.setUserName("User- " + userNumber);
user.setAge(userNumber);
userList.add(user);
}
return userList;
}
}

增加一个调用接口来调用上述方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@RestController
@RequestMapping("/user")
public class CollapsingController {

private static final Logger logger = LoggerFactory.getLogger(CollapsingController.class);

@Autowired
private ICollapsingService collapsingService;

/**
* 请求聚合/合并,整个应用的
*
* @return
* @throws Exception
*/
@RequestMapping("/getUserGolbal")
public String getUserGolbal() throws Exception {
Future<User> user = collapsingService.collapsingGlobal(1);
Future<User> user2 = collapsingService.collapsingGlobal(2);
logger.info(user.get().getUserName());
logger.info(user2.get().getUserName());
return "Success";
}
}

连续访问 http://127.0.0.1:8082/user/getUserGolbal 两次,会发现所有请求都合并在一个线程中;若改为 Request 作用域,Hystrix 则会运行两个线程来分别处理两次请求。

1
2
3
4
5
6
CollapsingService      : collapsingListGlobal当前线程: hystrix-CollapsingService-10
CollapsingService : 当前请求参数个数:4
CollapsingController : User- 1
CollapsingController : User- 1
CollapsingController : User- 2
CollapsingController : User- 2

请求合并总结

Hystrix Request Collapser 主要用于请求合并的场景,在一个简单的系统中,这种场景可能很少碰到,所以对于请求合并,一般的使用场景是:当在某个时间段内有大量或并发的相同请求时,则适用使用请求合并;而如果在某个时间段内只有很少的请求,且延迟也不高,此时使用请求合并反而会增加复杂度和延迟,因为对于 Collapser 本身,Hystrix 也是需要时间进行批处理的。

Hystrix 线程传递及并发策略

Hystrix 线程传递介绍

Hystrix 会对请求进行封装,然后管理请求的调用,从而实现断路器等多种功能。Hystrix 提供了两种隔离模式来进行请求的操作,一种是信号量隔离,一种是线程池隔离。如果是信号量,Hystrix 则在请求的时候会获取到一个信号量,如果成功拿到,则继续进行请求,请求在同一个线程中执行完毕。如果是线程池隔离,Hystrix 会把请求放入线程池中执行,这时就有可能产生线程的变化,从而导致线程 1 的上下文数据在线程 2 里不能正常拿到。下面通过一个例子来说明,点击下载完整的案例代码。

Hystrix 线程传递问题重现

建立一个 ThreadLocal 来保存用户的信息,通常在微服务里,会把当前请求的上下文数据放入本地线程变量,便于后续使用和销毁:

1
2
3
4
public class HystrixThreadLocal {

public static ThreadLocal<String> threadLocal = new ThreadLocal<>();
}

定义测试接口,打印当前线程的 ID,并利用 ThreadLocal 存放用户信息;为了兼容其他情况,例如在使用 Feign 调用的时候,通常会使用 RequestContextHolder 拿到上下文属性,在此也进行测试一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@RestController
@RequestMapping("/user")
public class UserController {

private static final Logger logger = LoggerFactory.getLogger(UserController.class);

@Autowired
private UserService userService;

@GetMapping("/get/{id}")
public String get(@PathVariable("id") Integer id) {
HystrixThreadLocal.threadLocal.set("userId: " + id);
RequestContextHolder.currentRequestAttributes().setAttribute("userId", "userId: " + id, RequestAttributes.SCOPE_REQUEST);
logger.info("current thread: " + Thread.currentThread().getId());
logger.info("thread local: " + HystrixThreadLocal.threadLocal.get());
logger.info("RequestContextHolder: " + RequestContextHolder.currentRequestAttributes().getAttribute("userId", RequestAttributes.SCOPE_REQUEST));
return userService.get(id);
}
}

定义服务类,测试在没有使用线程池隔离模式的情况下,获取用户信息:

1
2
3
4
5
6
7
8
9
10
11
12
@Service
public class UserService {

private static final Logger logger = LoggerFactory.getLogger(UserService.class);

public String get(Integer id) {
logger.info("current thread: " + Thread.currentThread().getId());
logger.info("thread local: " + HystrixThreadLocal.threadLocal.get());
logger.info("RequestContextHolder: " + RequestContextHolder.currentRequestAttributes().getAttribute("userId", RequestAttributes.SCOPE_REQUEST).toString());
return "Success";
}
}

启动应用后访问 http://127.0.0.1:8082/user/get/2 后,可以看到打印的线程 ID 都是一样的,线程变量也是传入 2,请求上下文的持有对象也可以顺利拿到:

1
2
3
4
5
6
7
UserController      : current thread: 59
UserController : thread local: userId: 2
UserController : RequestContextHolder: userId: 2

UserService : current thread: 59
UserService : thread local: userId: 2
UserService : RequestContextHolder: userId: 2

服务类添加 @HystrixCommand 注解,测试在使用线程池隔离模式的情况下,获取用户信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Service
public class UserService {

private static final Logger logger = LoggerFactory.getLogger(UserService.class);

@HystrixCommand
public String get(Integer id) {
logger.info("current thread: " + Thread.currentThread().getId());
logger.info("thread local: " + HystrixThreadLocal.threadLocal.get());
logger.info("RequestContextHolder: " + RequestContextHolder.currentRequestAttributes().getAttribute("userId", RequestAttributes.SCOPE_REQUEST).toString());
return "Success";
}
}

启动应用后访问 http://127.0.0.1:8082/user/get/2 后,会发现进入的线程池 ID 是 57,当达到后台服务的时候,线程 ID 变成 82,说明线程池的隔离已经生效,是重新启动的线程处理请求的,然后线程的变量也丢失了,RequestContextHolder 中也抛出了异常,意思是没有绑定线程变量,至此成功地重现了父子线程数据传递的问题。

1
2
3
4
5
6
7
UserController      : current thread: 57
UserController : thread local: userId: 2
UserController : RequestContextHolder: userId: 2

UserService : current thread: 82
UserService : thread local: null
java.lang.IllegalStateException: No thread-bound request found:

Hystrix 线程传递问题解决方案

解决 Hystrix 的线程传递问题有两种方法:

  • 第一种:修改 Hystrix 的隔离策略,使用信号量隔离,直接修改配置文件即可,但 Hystrix 默认是线程池隔离,加上从真实的项目情况看,大部分都是使用线程池隔离,因此此方案不太推荐,对应属性为:hystrix.command.default.execution.isolation.strategy
  • 第二种:Hystrix 官方推荐的一种方式,就是使用继承 HystrixConcurrencyStrategy 类覆盖 wrapCallable 方法,下面将介绍此方法的使用例子

创建 HystrixThreadCallable 类,该类的构造函数是希望传递 RequestContextHolder 和自定义的 HystrixThreadLocal 对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class HystrixThreadCallable<S> implements Callable<S> {

private final RequestAttributes requestAttributes;
private final Callable<S> delegate;
private String params;

public HystrixThreadCallable(Callable<S> callable, RequestAttributes requestAttributes, String params) {
this.delegate = callable;
this.requestAttributes = requestAttributes;
this.params = params;
}

@Override
public S call() throws Exception {
try {
RequestContextHolder.setRequestAttributes(requestAttributes);
HystrixThreadLocal.threadLocal.set(params);
return delegate.call();
} finally {
RequestContextHolder.resetRequestAttributes();
HystrixThreadLocal.threadLocal.remove();
}
}
}

重写 HystrixConcurrencyStrategy 类的 wrapCallable 方法,在执行请求前包装 HystrixThreadCallable 对象,将需要的对象信息设置进去,这样在下一个线程中就可以拿到了:

1
2
3
4
5
6
7
public class SpringCloudHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {

@Override
public <T> Callable<T> wrapCallable(Callable<T> callable) {
return new HystrixThreadCallable<>(callable, RequestContextHolder.getRequestAttributes(), HystrixThreadLocal.threadLocal.get());
}
}

配置类:

1
2
3
4
5
6
7
8
@Configuration
public class HystrixThreadContextConfiguration {

@Bean
public SpringCloudHystrixConcurrencyStrategy springCloudHystrixConcurrencyStrategy() {
return new SpringCloudHystrixConcurrencyStrategy();
}
}

启动应用后访问 http://127.0.0.1:8082/user/get/2 后,可以发现即使使用了 Hystrix 的线程池隔离模式,不同的线程也能顺利拿到上一个线程传递过来的信息:

1
2
3
4
5
6
7
UserController      : current thread: 59
UserController : thread local: userId: 2
UserController : RequestContextHolder: userId: 2

UserService : current thread: 84
UserService : thread local: userId: 2
UserService : RequestContextHolder: userId: 2

并发策略共存

由于 HystrixPlugins 的 registerConcurrencyStrategy 方法只能被调用一次,即 Hystrix 不允许注册多个 Hystrix 并发策略,不然就会报错,这就导致了无法和其他并发策略一起使用,因此需要将其他并发策略注入进去,达到并存的目的,如 sleuth 的并发策略也是做了同样的事情。具体的做法就是在构造此并发策略时,找到之前已经存在的并发策略,并保留在类的属性中,在调用过程中,返回之前并发策略的相关信息,如请求变量、连接池、阻塞队列;等请求进来时,既不影响之前的并发策略,也可以包装需要的请求信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public class SpringCloudHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {

private HystrixConcurrencyStrategy delegateHystrixConcurrencyStrategy;

@Override
public <T> Callable<T> wrapCallable(Callable<T> callable) {
return new HystrixThreadCallable<>(callable, RequestContextHolder.getRequestAttributes(), HystrixThreadLocal.threadLocal.get());
}

public SpringCloudHystrixConcurrencyStrategy() {
init();
}

private void init() {
try {
this.delegateHystrixConcurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
if (this.delegateHystrixConcurrencyStrategy instanceof SpringCloudHystrixConcurrencyStrategy) {
return;
}

HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance().getCommandExecutionHook();
HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance().getMetricsPublisher();
HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();

HystrixPlugins.reset();
HystrixPlugins.getInstance().registerConcurrencyStrategy(this);
HystrixPlugins.getInstance().registerCommandExecutionHook(commandExecutionHook);
HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
} catch (Exception e) {
throw e;
}
}

@Override
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
HystrixProperty<Integer> corePoolSize,
HystrixProperty<Integer> maximumPoolSize,
HystrixProperty<Integer> keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
return this.delegateHystrixConcurrencyStrategy.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize,
keepAliveTime, unit, workQueue);
}

@Override
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
return this.delegateHystrixConcurrencyStrategy.getThreadPool(threadPoolKey, threadPoolProperties);
}

@Override
public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
return this.delegateHystrixConcurrencyStrategy.getBlockingQueue(maxQueueSize);
}

@Override
public <T> HystrixRequestVariable<T> getRequestVariable(HystrixRequestVariableLifecycle<T> rv) {
return this.delegateHystrixConcurrencyStrategy.getRequestVariable(rv);
}
}

补充内容

Hystrix 的优势

Hytrix 支持异步调用,支持线程池级别的隔离

这种方式就是通过 RxJava 进行调用,等待完成后进行异步通知调用,但在 HTTP 这种请求中,主线程还是阻塞在等待中。带来的收益无非就是 Hytrix 能对超时进行控制。但缺点也很明显,如果是每个接口创建一个线程池的话,如果接口过多,机器中会创建大量线程,而在 Java 中,线程是属于轻量级的进程,对应是内核线程,进而造成线程的切换。而线程切换的成本也比较高。再者还需要预先给各个资源做线程池大小的分配,并且对于一些使用了 ThreadLocal 的场景不友好。

Hytrix 支持百分比 + 连续错误比率的条件进行降级

这确实比 Sentinel 单纯的统计异常率,或异常数更精细,技术选型具体根据业务去取舍。正如阿里巴巴自己比较的,Sentinel 侧重于流控,而熔断的话 Hytrix 更灵活和专业的,虽然 Hystrix 已经停止开发了,但一般情况下用 Sentinel 代替 Hytrix 也足够了。