Dubbo2 巩固教程之六

大纲

前言

学习资源

Dubbo 扩展机制

SPI 扩展机制

调用拦截扩展

扩展说明

  • Dubbo 在服务提供方和服务消费方都支持调用过程拦截(Filter)。许多 Dubbo 的核心功能都是基于 Filter 扩展点实现的。每次执行远程方法调用时,都会触发 Filter 链执行,因此在自定义 Filter 时要注意其对性能的影响。
  • 约定:
    • 用户自定义 Filter 默认在内置 Filter 之后。
    • 特殊值 default,表示缺省扩展点插入的位置。比如:filter="xxx,default,yyy",表示 xxx 在缺省 Filter 之前,yyy 在缺省 Filter 之后。
    • 特殊符号 -,表示剔除。比如:filter="-foo1",剔除添加缺省扩展点 foo1。比如:filter="-default",剔除添加所有缺省扩展点。
    • 当 Provider 和 Service 同时配置的 Filter 时,会累加所有 Filter,而不是覆盖。
      • 比如:<dubbo:provider filter="xxx,yyy"/><dubbo:service filter="aaa,bbb" />,则 xxxyyyaaabbb 均会生效。
      • 如果要覆盖,则需要配置:<dubbo:service filter="-xxx,-yyy,aaa,bbb" />

扩展接口

在 Dubbo 中,Filter 的扩展接口是 org.apache.dubbo.rpc.Filter,已知的 Filter 扩展实现有以下这些:

  • org.apache.dubbo.rpc.filter.EchoFilter
  • org.apache.dubbo.rpc.filter.GenericFilter
  • org.apache.dubbo.rpc.filter.GenericImplFilter
  • org.apache.dubbo.rpc.filter.TokenFilter
  • org.apache.dubbo.rpc.filter.AccessLogFilter
  • org.apache.dubbo.rpc.filter.CountFilter
  • org.apache.dubbo.rpc.filter.ActiveLimitFilter
  • org.apache.dubbo.rpc.filter.ClassLoaderFilter
  • org.apache.dubbo.rpc.filter.ContextFilter
  • org.apache.dubbo.rpc.filter.ConsumerContextFilter
  • org.apache.dubbo.rpc.filter.ExceptionFilter
  • org.apache.dubbo.rpc.filter.ExecuteLimitFilter
  • org.apache.dubbo.rpc.filter.DeprecatedFilter

扩展配置

  • 服务消费方的调用过程拦截配置
1
2
3
4
5
<!-- 服务消费方调用过程拦截 -->
<dubbo:reference filter="xxx,yyy" />

<!-- 服务消费方调用过程缺省拦截器,将拦截所有reference -->
<dubbo:consumer filter="xxx,yyy"/>
  • 服务提供方的的调用过程拦截配置
1
2
3
4
5
<!-- 服务提供方调用过程拦截 -->
<dubbo:service filter="xxx,yyy" />

<!-- 服务提供方调用过程缺省拦截器,将拦截所有service -->
<dubbo:provider filter="xxx,yyy"/>

扩展示例

  • Maven 项目结构如下,由于 Filter 是基于 SPI 机制实现的,因此需要在 resources/META-INF/dubbo/ 目录创建一个配置文件,文件名必须是 org.apache.dubbo.rpc.Filter,而文件内容是 xxx=com.xxx.XxxFilter
1
2
3
4
5
6
7
8
9
10
src
|-main
|-java
|-com
|-xxx
|-XxxFilter.java (实现了 Filter 接口)
|-resources
|-META-INF
|-dubbo
|-org.apache.dubbo.rpc.Filter (纯文本文件,内容为:xxx=com.xxx.XxxFilter)
  • 实现 Dubbo 的 Filter 接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package com.xxx;

import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;

public class XxxFilter implements Filter {
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
// before filter ...
Result result = invoker.invoke(invocation);
// after filter ...
return result;
}
}
  • 如果自定义的 Filter 需要被 Dubbo 自动激活(不需要用户手动配置,比如 <dubbo:reference filter="xxx" />),则需要添加 @Activate 注解(自动激活扩展点)。一般情况下,@Activate 注解需要配置 group 属性(表示应用场景:Consumer 或者 Provider),还可以配置 order 属性(表示执行顺序,数字越小 Filter 越先执行,默认值是 0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package com.xxx;

import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.extension.Activate;

@Activate(group = {CommonConstants.PROVIDER, CommonConstants.CONSUMER})
public class XxxFilter implements Filter {
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
// do something before filter ...
Result result = invoker.invoke(invocation);
// do something after filter ...
return result;
}
}
  • 最终,resources/META-INF/dubbo/org.apache.dubbo.rpc.Filter 配置文件的内容如下
1
xxx=com.xxx.XxxFilter

扩展实战

案例说明

本节将通过 Dubbo 的调用过程拦截机制(Filter)来实现服务调用的熔断功能,熔断器由 Hystrix 提供支持。扩展 Hystrix 熔断机制的实现步骤如下:

  • (1) 在 POM 文件引入 Hystrix 依赖
  • (2) 实现 Dubbo 的 Filter 接口
  • (3) 通过命令模式(继承方式)集成 Hystrix
  • (4) 在 META-INF/dubbo/ 目录添加接口配置文件
  • (5) 服务消费方配置 Filter 相关参数
  • (6) 运行测试功能

特别注意

  • Hystrix 熔断器的使用和 Hystrix 线程池的配置可以看 这里 的介绍。
  • Hystrix 和 Sentinel 一样,都是 Client-Side Protection(客户端保护)。
  • Hystrix 的熔断器永远运行在 Consumer(服务消费方)侧,而不是运行在 Provider(服务提供方)侧。
案例代码

下载代码

  • 本节完整的案例代码可以直接从 GitHub 下载对应章节 dubbo-lesson-17。
服务消费方代码
  • 在 Consumer(服务消费方)中,引入 Maven 依赖
1
2
3
4
5
6
7
8
9
10
11
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-spring-boot-starter</artifactId>
<version>2.7.23</version>
</dependency>

<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-core</artifactId>
<version>1.5.18</version>
</dependency>
  • 在 Consumer(服务消费方)中,继承 Hystrix 的 HystrixCommand
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.clay.dubbo.consumer.command;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.exception.HystrixRuntimeException;
import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;

import java.util.concurrent.CompletableFuture;

public class DubboHystrixCommand extends HystrixCommand<Result> {

private final Invoker<?> invoker;
private final Invocation invocation;

public DubboHystrixCommand(Setter setter, Invoker<?> invoker, Invocation invocation) {
super(setter);
this.invoker = invoker;
this.invocation = invocation;
}

/**
* 业务逻辑
*/
@Override
protected Result run() throws Exception {
// 发起 RPC 调用
Result result = invoker.invoke(invocation);

// 如果 Provider 返回异常,主动抛出 RpcException,让 Hystrix 统计失败次数,超过阈值时触发熔断
if (result.hasException()) {
throw new HystrixRuntimeException(
HystrixRuntimeException.FailureType.COMMAND_EXCEPTION, this.getClass(),
"Hystrix detected provider exception for method " + invoker.getInterface() + "::" + invocation.getMethodName(),
result.getException(), null
);
}

return result;
}

/**
* 降级逻辑
*/
@Override
protected Result getFallback() {
// 返回兜底数据(默认数据)
CompletableFuture future = new CompletableFuture();
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(future, invocation);
asyncRpcResult.setValue("I'm default value");
return asyncRpcResult;
}

}
  • 在 Consumer(服务消费方)中,实现 Dubbo 的 Filter 接口,添加 @Activate 注解(自动激活扩展点),让 Dobbuo 自动激活 Filter(不需要用户手动配置,比如 <dubbo:reference filter="xxx" />
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package com.clay.dubbo.consumer.filter;

import com.clay.dubbo.consumer.command.DubboHystrixCommand;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixThreadPoolKey;
import com.netflix.hystrix.HystrixThreadPoolProperties;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* 自定义 Filter,基于 Hystrix 实现服务调用的熔断功能
*/
@Activate(group = CommonConstants.CONSUMER, order = 10)
public class HystrixCircuitBreakerFilter implements Filter {

// Hystrix 线程池的核心线程数
private static final int DEFAULT_THREADPOOL_CORE_SIZE = 10;

private static final String KEY_THREADPOOL_CORE_SIZE = "hystrixCoreSize";

private static final Logger logger = LoggerFactory.getLogger(HystrixCircuitBreakerFilter.class);

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
logger.info("==> start to invoke hystrix circuit breaker filter");

HystrixCommand.Setter setter = getSetter(invoker, invocation);
DubboHystrixCommand command = new DubboHystrixCommand(setter, invoker, invocation);
Result result = command.execute();

logger.info("==> end to invoke hystrix circuit breaker filter");

return result;
}

/**
* 获取 Hystrix 命令配置对象(Setter)
*/
private HystrixCommand.Setter getSetter(Invoker<?> invoker, Invocation invocation) {
// 接口全限定名
String interfaceName = invoker.getInterface().getName();

// 方法名
String methodName = invocation.getMethodName();

// Dubbo URL
URL url = invoker.getUrl();

// 获取线程池的核心线程数(可通过 Dubbo URL 参数进行配置)
int coreSize = url == null ? DEFAULT_THREADPOOL_CORE_SIZE : url.getParameter(KEY_THREADPOOL_CORE_SIZE, DEFAULT_THREADPOOL_CORE_SIZE);

// 组装 Hystrix 命令配置对象(Setter)
return HystrixCommand.Setter
// 指定命令分组名(必须项,无默认值),主要用于统计
.withGroupKey(HystrixCommandGroupKey.Factory.asKey(interfaceName))
// 指定命令名称(可自定义,如果是服务调用,这里就写具体的接口名,如果是自定义的操作,就写自己的命令名称),默认是 HystrixCommand 子类的类名
.andCommandKey(HystrixCommandKey.Factory.asKey(methodName))
// 指定线程池名称,默认是 HystrixCommandGroupKey 的名称
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(invoker.getInterface().getSimpleName()))
// 配置命令属性
.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter()
// 设置执行超时时间(默认是 1000 毫秒)
.withExecutionTimeoutInMilliseconds(500)
// 设置统计滚动时间窗口的长度(默认是 10000 毫秒)
.withMetricsRollingStatisticalWindowInMilliseconds(100000)
// 设置触发熔断所需的最小请求数。(默认是 20)
.withCircuitBreakerRequestVolumeThreshold(8)
// 设置请求错误百分比阈值(默认是 50,即使 50%)
.withCircuitBreakerErrorThresholdPercentage(40)
// 设置熔断器打开后休眠的时间窗口(默认是 5000 毫秒)
.withCircuitBreakerSleepWindowInMilliseconds(10000)
// 设置资源隔离方式(默认是线程池隔离)
.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
)
// 配置线程池属性
.andThreadPoolPropertiesDefaults(
HystrixThreadPoolProperties.Setter()
// 设置线程池的核心线程数
.withCoreSize(coreSize)
);
}

}
  • 在 Consumer(服务消费方)中,往 resources/META-INF/dubbo/ 目录里面创建一个配置文件(整体目录结果如下),文件名必须是 org.apache.dubbo.rpc.Filter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
src/
└── main
├── java
│   └── com
│   └── clay
│   └── dubbo
│   └── consumer
│   ├── command
│   │   └── DubboHystrixCommand.java
│   ├── ConsumerApplication.java
│   ├── controller
│   │   └── DemoController.java
│   └── filter
│   └── HystrixCircuitBreakerFilter.java
└── resources
├── application.yml
└── META-INF
└── dubbo
└── org.apache.dubbo.rpc.Filter
  • 在 Consumer(服务消费方)中,添加配置文件 org.apache.dubbo.rpc.Filter 的内容(如下)
1
hystrix=com.clay.dubbo.consumer.filter.HystrixCircuitBreakerFilter
  • 在 Consumer(服务消费方)中,创建 YML 配置文件(application.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
server:
port: 9091

spring:
application:
name: dubbo-consumer-application

dubbo:
# 服务信息
application:
name: ${spring.application.name}
# 注册中心地址
registry:
address: zookeeper://192.168.2.1.3:2181
# 扫描 Dubbo 相关的注解
scan:
base-packages: com.clay.dubbo.consumer
# 消费行为配置
consumer:
# 关闭了启动检查,这样消费者启动时,不会到注册中心里面检查服务提供者是否存在
check: false
# 建议统一配置为不重试请求,对于查询等幂等操作来说可以在代码中单独配置重试次数
retries: 0
# 默认情况下限制请求必须在 1000 毫秒内完成,对于具体服务可以在代码中单独配置
timeout: 1000
  • 在 Consumer(服务消费方)中,创建用于测试的 Controller
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.clay.dubbo.consumer.controller;

import com.clay.dubbo.service.DemoService;
import lombok.extern.slf4j.Slf4j;
import org.apache.dubbo.config.annotation.DubboReference;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
public class DemoController {

/**
* 引用 Dubbo 服务
*/
@DubboReference(parameters = {"hystrixCoreSize=15"}) // 指定Hystrix线程池的核心线程数
private DemoService demoService;

@GetMapping("/sayHello/{name}")
public String sayHello(@PathVariable("name") String name) {
String result = demoService.sayHello(name);
log.info("===> " + result);
return result;
}

}

如果 Dubbo 使用的是 XML 配置文件,可以参考以下配置内容:

1
2
3
4
5
<!-- 引用 Dubbo 服务 -->
<dubbo:reference id="demoService" interface="com.clay.dubbo.service.DemoService">
<!-- 指定 Hystrix 线程池的核心线程数 -->
<dubbo:parameter key="hystrixCoreSize" value="15"/>
</dubbo:reference>
服务提供方代码
  • 在 Provider(服务提供方)中,引入 Maven 依赖
1
2
3
4
5
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-spring-boot-starter</artifactId>
<version>2.7.23</version>
</dependency>
  • 在 Provider(服务提供方)中,创建服务类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 暴露服务
*/
@DubboService
public class DemoServiceImpl implements DemoService {

@Override
public String sayHello(String name) {
// 模拟业务执行出错
if (name.length() <= 5) {
throw new RuntimeException("name is to short");
}
return "Hello " + name;
}

}
  • 在 Provider(服务提供方)中,创建 YML 配置文件(application.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
server:
port: 9090

spring:
application:
name: dubbo-provider-application

dubbo:
# 服务信息
application:
name: ${spring.application.name}
# 注册中心地址
registry:
address: zookeeper://192.168.2.1.3:2181
# 服务提供者的协议
protocol:
name: dubbo
port: 20880
# 扫描 Dubbo 相关的注解
scan:
base-packages: com.clay.dubbo.provider
案例测试
  • (1) 频繁调用接口触发异常

    • 浏览器多次访问 http://127.0.0.1:9091/sayHello/jim 接口。
    • 由于参数 name 的长度过短,Provider 会抛出 RuntimeException 异常。
    • Hystrix 捕获异常后,会触发服务降级,返回 "I'm default value" 给浏览器。
    • 当多次调用接口失败(例如 10 秒内至少有 8 次请求,其中 40% 请求处理失败)时,Hystrix 的断路器会打开(OPEN 状态)。
    • 断路器在打开状态下会持续 10 秒,之后会尝试放行部分请求以检测服务是否恢复。
  • (2) 断路器打开期间的请求

    • 浏览器在 Hystrix 断路器打开期间多次访问 http://127.0.0.1:9091/sayHello/caradoc 接口。
    • 即使参数 name 的长度满足要求,Hystrix 也会直接走降级逻辑,返回 "I'm default value" 给浏览器。
    • 此时,Hystrix 不会将请求发送给 Provider,因为断路器处于打开状态(OPEN)。
  • (3) 断路器关闭后的请求

    • 等待一段时间(例如 10 秒)后,浏览器再次频繁访问 http://127.0.0.1:9091/sayHello/caradoc 接口。
    • 可以发现接口调用正常,并返回 "Hello caradoc" 字符串,说明 Hystrix 的断路器已关闭(CLOSE),服务调用恢复正常。

参考资料