Implementing Service Circuit Breaking with Resilience4j in Spring Cloud
Author: dk168 Published: 2021-11-09 12:36:10
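CircuitBreaker
Circuit breaking exists to protect our services. When a service runs into trouble, we throttle the traffic sent to it so that it has time to recover, or we allow only a fixed number of requests to reach it within a given period. I have run into this at work: a database failed, which brought down the web service that depended on it, which in turn brought down services that did not depend on the database at all, a chain of failures. In the course 《玩转 Spring 全家桶》, the instructor Ding Xuefeng (丁雪丰) gives an example that uses resilience4j. His example dates from 2019 and some of the framework's methods have changed since, so I spent some time working out the current usage and am recording the process here.
First, add the dependency to the POM file: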
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-spring-boot2</artifactId>
<version>2.0.2</version>
</dependency>
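Next, rework the earlier Controller:
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

@RestController
@RequestMapping("/customer")
@Slf4j
public class BookController {
    private final BookService bookService;
    private final CircuitBreaker circuitBreaker;

    public BookController(BookService bookService, CircuitBreakerRegistry registry) {
        this.bookService = bookService;
        this.circuitBreaker = registry.circuitBreaker("menu");
        // Register the event logger once; registering it inside the handler
        // would add another consumer (and duplicate log lines) on every request.
        this.circuitBreaker.getEventPublisher()
                .onEvent(event -> log.info(event.toString()));
    }

    @GetMapping("/menu")
    public List<Book> readMenu() {
        Supplier<List<Book>> supplier = () -> bookService.getAll();
        try {
            return circuitBreaker.executeSupplier(supplier);
        } catch (Exception ex) {
            // Reached when the remote call fails or when the open breaker rejects the call
            log.error(ex.getMessage());
            return Collections.emptyList();
        }
    }
}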
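The only difference from before is that we introduce a CircuitBreaker and use it to wrap our call to "bookService.getAll()".
Then add the following settings to the configuration file: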
resilience4j.circuitbreaker.backends.menu.failure-rate-threshold=50
resilience4j.circuitbreaker.backends.menu.wait-duration-in-open-state=60000
resilience4j.circuitbreaker.backends.menu.sliding-window-size=5
resilience4j.circuitbreaker.backends.menu.permitted-number-of-calls-in-half-open-state=2
resilience4j.circuitbreaker.backends.menu.minimum-number-of-calls=2
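A brief explanation of these settings:
failure-rate-threshold=50: the breaker opens once the failure rate reaches 50%.
wait-duration-in-open-state=60000: after opening, the breaker waits 60 s (60000 ms) before it allows calls again.
sliding-window-size=5: the failure rate is computed over the most recent 5 calls.
permitted-number-of-calls-in-half-open-state=2: the number of trial calls allowed while the breaker is half-open.
minimum-number-of-calls=2: the minimum number of recorded calls before the failure rate is evaluated.
I chose these values for experimenting; adjust them to your actual needs. There are many more parameters; see the official documentation for details: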
https://resilience4j.readme.io/docs/circuitbreaker
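To make the interplay of sliding-window-size, minimum-number-of-calls, and failure-rate-threshold more concrete, here is a minimal sketch in plain Java. It is a simplified illustration under my own assumptions, not Resilience4j's implementation: the class FailureWindow and its methods are hypothetical names, and only a count-based window is modeled.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical helper: a count-based sliding window over the most recent call outcomes.
class FailureWindow {
    private final int windowSize;      // plays the role of sliding-window-size
    private final int minimumCalls;    // plays the role of minimum-number-of-calls
    private final double thresholdPct; // plays the role of failure-rate-threshold
    private final Deque<Boolean> outcomes = new ArrayDeque<>(); // true = failed call

    FailureWindow(int windowSize, int minimumCalls, double thresholdPct) {
        this.windowSize = windowSize;
        this.minimumCalls = minimumCalls;
        this.thresholdPct = thresholdPct;
    }

    void record(boolean failed) {
        if (outcomes.size() == windowSize) {
            outcomes.removeFirst(); // forget the oldest outcome
        }
        outcomes.addLast(failed);
    }

    // Returns -1 while fewer than minimumCalls outcomes have been recorded.
    double failureRate() {
        if (outcomes.size() < minimumCalls) {
            return -1.0;
        }
        long failures = outcomes.stream().filter(failed -> failed).count();
        return 100.0 * failures / outcomes.size();
    }

    // The breaker should open once the rate is known and has reached the threshold.
    boolean shouldOpen() {
        double rate = failureRate();
        return rate >= 0 && rate >= thresholdPct;
    }
}

class FailureWindowDemo {
    public static void main(String[] args) {
        FailureWindow window = new FailureWindow(5, 2, 50.0);
        window.record(false); // one successful call: too few calls to judge
        System.out.println(window.shouldOpen());
        window.record(true);  // one failed call: 1 of 2 calls failed
        System.out.println(window.failureRate() + " " + window.shouldOpen());
    }
}
```

With the values from the configuration above (window 5, minimum 2, threshold 50), one success followed by one failure already yields a 50% rate, which is why a single failed request is enough to trip the breaker in the experiment that follows.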
Let's look at the actual behavior by calling the endpoint from a browser.
First, with BookService running, make one successful request. The log shows:
CircuitBreaker 'menu' recorded a successful call.
Then shut BookService down so that the request fails. The log shows:
CircuitBreaker 'menu' recorded an error: 'feign.RetryableException: Connection refused: no further information executing GET http://bookshop-service/book/getAll'. Elapsed time: 2050 ms
CircuitBreaker 'menu' exceeded failure rate threshold. Current failure rate: 50.0
CircuitBreaker 'menu' changed state from CLOSED to OPEN
The circuit breaker is now open.
If we keep sending requests, we see:
CircuitBreaker 'menu' recorded a call which was not permitted.
At this point requests no longer reach BookService, even if BookService has recovered in the meantime.
After the 60 s wait, the breaker enters the HALF_OPEN state:
CircuitBreaker 'menu' changed state from OPEN to HALF_OPEN
With BookService back to normal, our requests get normal responses again:
CircuitBreaker 'menu' recorded a successful call
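After a few more successful requests (two trial calls are permitted in this configuration), the breaker goes from HALF_OPEN to CLOSED: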
CircuitBreaker 'menu' changed state from HALF_OPEN to CLOSED
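The official state diagram describes these transitions.
The circuit breaker has three states: CLOSED, OPEN, and HALF_OPEN.
CLOSED is the initial state, in which traffic passes through normally. When the failure rate reaches the threshold, the breaker switches to OPEN and traffic is rejected. After the configured wait, the breaker enters HALF_OPEN and lets a limited number of trial calls through. If their failure rate is below the threshold, the breaker returns to CLOSED; if it is at or above the threshold, the breaker goes back to OPEN and waits again, and the cycle repeats.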
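The cycle described above can be sketched as a small state machine in plain Java. This is a simplified, single-threaded illustration and not Resilience4j's code: the class TinyBreaker is hypothetical, it trips on consecutive failures instead of a failure rate, and the clock is injected so the wait can be simulated without sleeping.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical sketch of the CLOSED -> OPEN -> HALF_OPEN -> CLOSED cycle.
class TinyBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failuresToOpen; // consecutive failures that trip the breaker (simplification)
    private final long waitMillis;    // how long to stay OPEN
    private final int trialCalls;     // successful trial calls needed to close again
    private final LongSupplier clock; // injected time source, in milliseconds
    private State state = State.CLOSED;
    private int failures;
    private int trialSuccesses;
    private long openedAt;

    TinyBreaker(int failuresToOpen, long waitMillis, int trialCalls, LongSupplier clock) {
        this.failuresToOpen = failuresToOpen;
        this.waitMillis = waitMillis;
        this.trialCalls = trialCalls;
        this.clock = clock;
    }

    State state() {
        return state;
    }

    <T> T call(Supplier<T> action, T fallback) {
        if (state == State.OPEN) {
            if (clock.getAsLong() - openedAt < waitMillis) {
                return fallback;     // still open: reject without calling the service
            }
            state = State.HALF_OPEN; // wait elapsed: let trial calls through
            trialSuccesses = 0;
        }
        try {
            T result = action.get();
            onSuccess();
            return result;
        } catch (RuntimeException ex) {
            onFailure();
            return fallback;
        }
    }

    private void onSuccess() {
        if (state == State.HALF_OPEN) {
            if (++trialSuccesses >= trialCalls) {
                state = State.CLOSED;
                failures = 0;
            }
        } else {
            failures = 0;
        }
    }

    private void onFailure() {
        // A failed trial call reopens the breaker immediately.
        if (state == State.HALF_OPEN || ++failures >= failuresToOpen) {
            state = State.OPEN;
            openedAt = clock.getAsLong();
            failures = 0;
        }
    }
}

class TinyBreakerDemo {
    public static void main(String[] args) {
        long[] now = {0};
        TinyBreaker breaker = new TinyBreaker(2, 60_000, 2, () -> now[0]);
        Supplier<String> failing = () -> { throw new IllegalStateException("down"); };
        breaker.call(failing, "fallback");
        breaker.call(failing, "fallback");
        System.out.println(breaker.state()); // OPEN
        now[0] = 60_000;                      // simulate the 60 s wait
        breaker.call(() -> "ok", "fallback");
        System.out.println(breaker.state()); // HALF_OPEN
        breaker.call(() -> "ok", "fallback");
        System.out.println(breaker.state()); // CLOSED
    }
}
```

Note that in this sketch the move from OPEN to HALF_OPEN happens lazily, on the first call after the wait has elapsed, rather than on a timer.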
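The circuit breaker now also supports slow-call settings, which are fairly convenient to use. If the meaning of a parameter is unclear, a unit test is a good way to get a better feel for it. The following unit test is adapted from the example at https://github.com/eugenp/tutorials/blob/master/libraries-6/src/test/java/com/baeldung/resilence4j/Resilience4jUnitTest.java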
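// Imports used by this test (JUnit 5 is assumed here; use org.junit.Test with JUnit 4).
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import org.junit.jupiter.api.Test;
import java.util.function.Function;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

// The members below belong to a test class annotated with Lombok's @Slf4j, which provides `log`.
interface RemoteService {
    int process(int i);
}

private RemoteService service;

@Test
public void whenCircuitBreakerIsUsed_thenItWorksAsExpected() {
    service = mock(RemoteService.class);
    CircuitBreakerConfig config = CircuitBreakerConfig.custom()
            // Failure rate (in percent) at which the breaker opens
            .failureRateThreshold(20)
            // Evaluate the failure rate only after 5 recorded calls
            .minimumNumberOfCalls(5)
            .build();
    CircuitBreakerRegistry registry = CircuitBreakerRegistry.of(config);
    CircuitBreaker circuitBreaker = registry.circuitBreaker("my");
    Function<Integer, Integer> decorated = CircuitBreaker.decorateFunction(circuitBreaker, service::process);
    // Every call to the mocked service fails
    when(service.process(anyInt())).thenThrow(new RuntimeException());
    circuitBreaker.getEventPublisher()
            .onEvent(event -> log.info(event.toString()));
    for (int i = 0; i < 10; i++) {
        try {
            decorated.apply(i);
        } catch (Exception ignore) {
        }
    }
    // Only the first 5 calls reach the service; the open breaker rejects the other 5
    verify(service, times(5)).process(any(Integer.class));
}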
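Here the minimum number of calls is 5 and the breaker opens once the failure rate reaches 20%. We made 10 requests, but the service was actually invoked only 5 times.
You can adjust the other parameters and verify in the same way whether the behavior matches your expectations. The log output is: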
CircuitBreaker 'my' recorded an error: 'java.lang.RuntimeException'. Elapsed time: 2 ms
CircuitBreaker 'my' recorded an error: 'java.lang.RuntimeException'. Elapsed time: 0 ms
CircuitBreaker 'my' recorded an error: 'java.lang.RuntimeException'. Elapsed time: 0 ms
CircuitBreaker 'my' recorded an error: 'java.lang.RuntimeException'. Elapsed time: 0 ms
CircuitBreaker 'my' recorded an error: 'java.lang.RuntimeException'. Elapsed time: 0 ms
CircuitBreaker 'my' exceeded failure rate threshold. Current failure rate: 100.0
CircuitBreaker 'my' changed state from CLOSED to OPEN
CircuitBreaker 'my' recorded a call which was not permitted.
CircuitBreaker 'my' recorded a call which was not permitted.
CircuitBreaker 'my' recorded a call which was not permitted.
CircuitBreaker 'my' recorded a call which was not permitted.
CircuitBreaker 'my' recorded a call which was not permitted.
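After 5 calls the breaker opens, and the subsequent calls are not permitted.
Bulkhead
A Bulkhead in Resilience4j can be understood simply as a cap on the number of concurrent calls. Again, we use a unit test to demonstrate it: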
@Test
public void whenBulkheadIsUsed_thenItWorksAsExpected() throws InterruptedException {
    service = mock(RemoteService.class);
    // Allow at most 2 concurrent calls
    BulkheadConfig config = BulkheadConfig.custom().maxConcurrentCalls(2).build();
    BulkheadRegistry registry = BulkheadRegistry.of(config);
    Bulkhead bulkhead = registry.bulkhead("my");
    Function<Integer, Integer> decorated = Bulkhead.decorateFunction(bulkhead, service::process);
    try {
        callAndBlock(decorated);
    } catch (BulkheadFullException ex) {
        log.error("isfull");
    } finally {
        verify(service, times(2)).process(any(Integer.class));
    }
}

private void callAndBlock(Function<Integer, Integer> decoratedService) throws InterruptedException {
    when(service.process(anyInt())).thenAnswer(invocation -> {
        log.info("service called");
        return null;
    });
    ArrayList<Integer> numberList = new ArrayList<Integer>();
    for (int i = 0; i < 10; i++) {
        numberList.add(i);
    }
    // Issue the 10 calls concurrently through a parallel stream
    numberList.parallelStream().forEach((i) -> {
        try {
            decoratedService.apply(i);
        } catch (Exception ex) {
            // A rejected call ends up here with the bulkhead's "is full" message
            log.error("meet error " + ex.getMessage());
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
    });
}
First, a look at callAndBlock: it invokes a function concurrently. Without the bulkhead, the output looks like this:
2022-12-28T15:22:52.010+08:00 INFO 37276 --- [onPool-worker-4] c.k.r.bookcustomer.Resilience4jUnitTest : service called
2022-12-28T15:22:52.010+08:00 INFO 37276 --- [onPool-worker-9] c.k.r.bookcustomer.Resilience4jUnitTest : service called
2022-12-28T15:22:52.010+08:00 INFO 37276 --- [onPool-worker-5] c.k.r.bookcustomer.Resilience4jUnitTest : service called
2022-12-28T15:22:52.010+08:00 INFO 37276 --- [onPool-worker-3] c.k.r.bookcustomer.Resilience4jUnitTest : service called
2022-12-28T15:22:52.010+08:00 INFO 37276 --- [onPool-worker-1] c.k.r.bookcustomer.Resilience4jUnitTest : service called
2022-12-28T15:22:52.010+08:00 INFO 37276 --- [onPool-worker-6] c.k.r.bookcustomer.Resilience4jUnitTest : service called
2022-12-28T15:22:52.010+08:00 INFO 37276 --- [onPool-worker-7] c.k.r.bookcustomer.Resilience4jUnitTest : service called
2022-12-28T15:22:52.010+08:00 INFO 37276 --- [onPool-worker-8] c.k.r.bookcustomer.Resilience4jUnitTest : service called
2022-12-28T15:22:52.010+08:00 INFO 37276 --- [onPool-worker-2] c.k.r.bookcustomer.Resilience4jUnitTest : service called
2022-12-28T15:22:52.011+08:00 INFO 37276 --- [ main] c.k.r.bookcustomer.Resilience4jUnitTest : service called
The ten calls ran on ten threads. With the bulkhead in place, which allows only two calls at a time, the output is:
2022-12-28T15:33:48.648+08:00 ERROR 32256 --- [onPool-worker-4] c.k.r.bookcustomer.Resilience4jUnitTest : meet error Bulkhead 'my' is full and does not permit further calls
2022-12-28T15:33:48.648+08:00 ERROR 32256 --- [onPool-worker-6] c.k.r.bookcustomer.Resilience4jUnitTest : meet error Bulkhead 'my' is full and does not permit further calls
2022-12-28T15:33:48.648+08:00 ERROR 32256 --- [onPool-worker-7] c.k.r.bookcustomer.Resilience4jUnitTest : meet error Bulkhead 'my' is full and does not permit further calls
2022-12-28T15:33:48.648+08:00 ERROR 32256 --- [onPool-worker-4] c.k.r.bookcustomer.Resilience4jUnitTest : meet error Bulkhead 'my' is full and does not permit further calls
2022-12-28T15:33:48.648+08:00 ERROR 32256 --- [onPool-worker-8] c.k.r.bookcustomer.Resilience4jUnitTest : meet error Bulkhead 'my' is full and does not permit further calls
2022-12-28T15:33:48.648+08:00 ERROR 32256 --- [onPool-worker-5] c.k.r.bookcustomer.Resilience4jUnitTest : meet error Bulkhead 'my' is full and does not permit further calls
2022-12-28T15:33:48.648+08:00 ERROR 32256 --- [onPool-worker-2] c.k.r.bookcustomer.Resilience4jUnitTest : meet error Bulkhead 'my' is full and does not permit further calls
2022-12-28T15:33:48.648+08:00 ERROR 32256 --- [onPool-worker-3] c.k.r.bookcustomer.Resilience4jUnitTest : meet error Bulkhead 'my' is full and does not permit further calls
2022-12-28T15:33:48.650+08:00 INFO 32256 --- [onPool-worker-1] c.k.r.bookcustomer.Resilience4jUnitTest : service called
2022-12-28T15:33:48.650+08:00 INFO 32256 --- [ main] c.k.r.bookcustomer.Resilience4jUnitTest : service called
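Only two calls succeeded; all the others were rejected by the bulkhead. Because the mocked call returns almost immediately, the exact number of calls that get through can vary from run to run.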
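The idea of capping concurrent calls can be sketched with a plain java.util.concurrent.Semaphore. This is an illustration under my own assumptions, not the library's code: TinyBulkhead is a hypothetical class, and it rejects excess calls immediately instead of waiting. The demo nests calls so that the outer ones are still in flight when the third is attempted, which keeps the result deterministic without threads.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical sketch: at most maxConcurrentCalls callers may be inside call() at once.
class TinyBulkhead {
    private final Semaphore permits;

    TinyBulkhead(int maxConcurrentCalls) {
        this.permits = new Semaphore(maxConcurrentCalls);
    }

    <T> T call(Supplier<T> action) {
        if (!permits.tryAcquire()) {
            // No free slot: fail fast instead of queueing
            throw new IllegalStateException("bulkhead is full");
        }
        try {
            return action.get();
        } finally {
            permits.release(); // always free the slot, even if the action throws
        }
    }
}

class TinyBulkheadDemo {
    public static void main(String[] args) {
        TinyBulkhead bulkhead = new TinyBulkhead(2);
        // The third call is attempted while two calls are still in flight.
        Supplier<String> third = () -> {
            try {
                return bulkhead.call(() -> "third call ran");
            } catch (IllegalStateException ex) {
                return "third call rejected: " + ex.getMessage();
            }
        };
        Supplier<String> second = () -> bulkhead.call(third);
        System.out.println(bulkhead.call(second));
        // Both permits were released, so a new call is admitted again.
        System.out.println(bulkhead.call(() -> "permits were released"));
    }
}
```

Releasing the permit in a finally block is the important design point: a slot that is not returned after a failed call would shrink the bulkhead permanently.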
RateLimiter
A RateLimiter restricts how many calls are allowed within a period of time. We reuse the same example as for the Bulkhead:
@Test
public void whenRateLimiterInUse_thenItWorksAsExpected() throws InterruptedException {
    service = mock(RemoteService.class);
    RateLimiterConfig config = RateLimiterConfig.custom()
            // Permissions are refreshed every 1000 ms
            .limitRefreshPeriod(Duration.ofMillis(1000))
            // 4 calls are permitted per refresh period
            .limitForPeriod(4)
            // A caller waits at most 25 ms for a permission before being rejected
            .timeoutDuration(Duration.ofMillis(25))
            .build();
    RateLimiterRegistry rateLimiterRegistry = RateLimiterRegistry.of(config);
    RateLimiter rateLimiter = rateLimiterRegistry
            .rateLimiter("name1");
    CheckedFunction<Integer, Integer> decorated = RateLimiter
            .decorateCheckedFunction(rateLimiter, service::process);
    try {
        callAndBlock(decorated);
    } catch (Exception ex) {
        log.error("isfull");
    } finally {
        verify(service, times(4)).process(any(Integer.class));
    }
}

private void callAndBlock(CheckedFunction<Integer, Integer> decoratedService) throws InterruptedException {
    when(service.process(anyInt())).thenAnswer(invocation -> {
        log.info("service called");
        return null;
    });
    ArrayList<Integer> numberList = new ArrayList<Integer>();
    for (int i = 0; i < 10; i++) {
        numberList.add(i);
    }
    // Issue the 10 calls concurrently through a parallel stream
    numberList.parallelStream().forEach((i) -> {
        try {
            decoratedService.apply(i);
        } catch (Exception ex) {
            // A call that gets no permission in time ends up here
            log.error("meet error " + ex.getMessage());
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
    });
}
We deliberately allow 4 calls per second, and in the actual run only 4 calls were permitted. The log output is:
2022-12-28T15:39:52.027+08:00 INFO 35236 --- [onPool-worker-2] c.k.r.bookcustomer.Resilience4jUnitTest : service called
2022-12-28T15:39:52.027+08:00 INFO 35236 --- [onPool-worker-5] c.k.r.bookcustomer.Resilience4jUnitTest : service called
2022-12-28T15:39:52.027+08:00 INFO 35236 --- [ main] c.k.r.bookcustomer.Resilience4jUnitTest : service called
2022-12-28T15:39:52.027+08:00 INFO 35236 --- [onPool-worker-7] c.k.r.bookcustomer.Resilience4jUnitTest : service called
2022-12-28T15:39:52.053+08:00 ERROR 35236 --- [onPool-worker-6] c.k.r.bookcustomer.Resilience4jUnitTest : meet error RateLimiter 'name1' does not permit further calls
2022-12-28T15:39:52.060+08:00 ERROR 35236 --- [onPool-worker-3] c.k.r.bookcustomer.Resilience4jUnitTest : meet error RateLimiter 'name1' does not permit further calls
2022-12-28T15:39:52.060+08:00 ERROR 35236 --- [onPool-worker-9] c.k.r.bookcustomer.Resilience4jUnitTest : meet error RateLimiter 'name1' does not permit further calls
2022-12-28T15:39:52.060+08:00 ERROR 35236 --- [onPool-worker-1] c.k.r.bookcustomer.Resilience4jUnitTest : meet error RateLimiter 'name1' does not permit further calls
2022-12-28T15:39:52.060+08:00 ERROR 35236 --- [onPool-worker-8] c.k.r.bookcustomer.Resilience4jUnitTest : meet error RateLimiter 'name1' does not permit further calls
2022-12-28T15:39:52.075+08:00 ERROR 35236 --- [onPool-worker-4] c.k.r.bookcustomer.Resilience4jUnitTest : meet error RateLimiter 'name1' does not permit further calls
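A single rate limiter like this only caps overall throughput; on its own it cannot limit how many times one particular user may call within a period.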
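If a per-user limit is what you need, one possible approach is to keep a separate counter per user. The following plain-Java sketch shows the idea with a fixed window per key. It is an assumption-laden illustration, not part of Resilience4j: the class PerUserLimiter, its parameters, and the user names are hypothetical, and entries for inactive users are never evicted.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Hypothetical sketch: one fixed-window call counter per user key.
class PerUserLimiter {
    private static final class Window {
        long start; // start of the current period, in milliseconds
        int used;   // calls already permitted in the current period
    }

    private final int limitForPeriod;
    private final long periodMillis;
    private final LongSupplier clock; // injected time source, in milliseconds
    private final Map<String, Window> windows = new ConcurrentHashMap<>();

    PerUserLimiter(int limitForPeriod, long periodMillis, LongSupplier clock) {
        this.limitForPeriod = limitForPeriod;
        this.periodMillis = periodMillis;
        this.clock = clock;
    }

    boolean tryAcquire(String userId) {
        Window window = windows.computeIfAbsent(userId, id -> new Window());
        synchronized (window) {
            long now = clock.getAsLong();
            if (now - window.start >= periodMillis) {
                window.start = now; // a new period begins: reset the counter
                window.used = 0;
            }
            if (window.used >= limitForPeriod) {
                return false;       // this user has used up the current period
            }
            window.used++;
            return true;
        }
    }
}

class PerUserLimiterDemo {
    public static void main(String[] args) {
        long[] now = {0};
        PerUserLimiter limiter = new PerUserLimiter(4, 1000, () -> now[0]);
        for (int i = 1; i <= 5; i++) {
            System.out.println("alice call " + i + ": " + limiter.tryAcquire("alice"));
        }
        System.out.println("bob call 1: " + limiter.tryAcquire("bob"));
        now[0] = 1000; // simulate the start of the next period
        System.out.println("alice after refresh: " + limiter.tryAcquire("alice"));
    }
}
```

In the demo the fifth call by "alice" is refused while "bob" is unaffected, and "alice" is admitted again once the next period starts.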
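Resilience4j also provides Retry, TimeLimiter, and Cache. I don't find them essential: Spring already offers retry functionality, so there is no need to add another package just for that. TimeLimiter and Cache strike me as features that get little attention; I did not find worked examples for them in the documentation, so I consider them of limited value.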
Source: https://www.cnblogs.com/dk168/archive/2022/12/28/17010315.html