软件编程
位置:首页>> 软件编程>> java编程>> Springboot实现高吞吐量异步处理详解(适用于高并发场景)

Springboot实现高吞吐量异步处理详解(适用于高并发场景)

作者:诚信天下  发布时间:2023-08-20 23:40:35 

标签:Springboot,高吞吐量,异步

技术要点


org.springframework.web.context.request.async.DeferredResult<T>

示例如下:

1.   新建Maven项目  async

2.   pom.xml


<project xmlns="http://maven.apache.org/POM/4.0.0"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
   http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>
 <groupId>com.java</groupId>
 <artifactId>async</artifactId>
 <version>1.0.0</version>

<parent>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-parent</artifactId>
   <version>2.0.5.RELEASE</version>
 </parent>

<dependencies>

<!-- Spring Boot -->
   <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-web</artifactId>
   </dependency>

<!-- 热部署 -->
   <dependency>
     <groupId>org.springframework</groupId>
     <artifactId>springloaded</artifactId>
     <version>1.2.8.RELEASE</version>
     <scope>provided</scope>
   </dependency>
   <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-devtools</artifactId>
     <scope>provided</scope>
   </dependency>

</dependencies>

<build>
   <finalName>${project.artifactId}</finalName>
   <plugins>
     <plugin>
       <groupId>org.apache.maven.plugins</groupId>
       <artifactId>maven-compiler-plugin</artifactId>
       <configuration>
         <source>1.8</source>
         <target>1.8</target>
         <encoding>UTF-8</encoding>
       </configuration>
     </plugin>

<plugin>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-maven-plugin</artifactId>
       <executions>
         <execution>
           <goals>
             <goal>repackage</goal>
           </goals>
         </execution>
       </executions>
     </plugin>
   </plugins>
 </build>
</project>

3.   AsyncStarter.java


package com.java;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class AsyncStarter {

public static void main(String[] args) {
   SpringApplication.run(AsyncStarter.class, args);
 }
}

4.   AsyncVo.java


package com.java.vo;

import org.springframework.web.context.request.async.DeferredResult;

/**
* 存储异步处理信息
*
* @author Logen
*
* @param <I> 接口输入参数
* @param <O> 接口返回参数
*/
public class AsyncVo<I, O> {

/**
  * 请求参数
  */
 private I params;

/**
  * 响应结果
  */
 private DeferredResult<O> result;

public I getParams() {
   return params;
 }

public void setParams(I params) {
   this.params = params;
 }

public DeferredResult<O> getResult() {
   return result;
 }

public void setResult(DeferredResult<O> result) {
   this.result = result;
 }
}

5.   RequestQueue.java


package com.java.queue;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.springframework.stereotype.Component;

import com.java.vo.AsyncVo;

/**
* 存放所有异步处理接口请求队列的对象,一个接口对应一个队列
*
* @author Logen
*
*/
@Component
public class RequestQueue {

/**
  * 处理下订单接口的队列,设置缓冲容量为50
  */
 private BlockingQueue<AsyncVo<String, Object>> orderQueue = new LinkedBlockingQueue<>(50);

public BlockingQueue<AsyncVo<String, Object>> getOrderQueue() {
   return orderQueue;
 }
}

6.   OrderTask.java


package com.java.task;

import java.util.HashMap;
import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.java.queue.RequestQueue;
import com.java.vo.AsyncVo;

/**
* 处理订单接口的任务,每个任务类处理一种接口
*
* @author Logen
*
*/
@Component
public class OrderTask extends Thread {

@Autowired
 private RequestQueue queue;

private boolean running = true;

@Override
 public void run() {
   while (running) {
     try {
       AsyncVo<String, Object> vo = queue.getOrderQueue().take();
       System.out.println("[ OrderTask ]开始处理订单");

String params = vo.getParams();
       Thread.sleep(3000);
       Map<String, Object> map = new HashMap<>();
       map.put("params", params);
       map.put("time", System.currentTimeMillis());

vo.getResult().setResult(map);

System.out.println("[ OrderTask ]订单处理完成");
     } catch (InterruptedException e) {
       e.printStackTrace();
       running = false;
     }

}
 }

public void setRunning(boolean running) {
   this.running = running;
 }
}

7.   QueueListener.java


package com.java.listener;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.java.task.OrderTask;

/**
* 队列 * ,初始化启动所有监听任务
*
* @author Logen
*
*/
@Component
public class QueueListener {

@Autowired
 private OrderTask orderTask;

/**
  * 初始化时启动监听请求队列
  */
 @PostConstruct
 public void init() {
   orderTask.start();
 }

/**
  * 销毁容器时停止监听任务
  */
 @PreDestroy
 public void destory() {
   orderTask.setRunning(false);
 }

}

8.   OrderController.java


package com.java.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;

import com.java.queue.RequestQueue;
import com.java.vo.AsyncVo;

/**
* <blockquote>
*
* <pre>
*
* 模拟下单处理,实现高吞吐量异步处理请求
*
* 1、 Controller层接口只接收请求,不进行处理,而是把请求信息放入到对应该接口的请求队列中
* 2、 该接口对应的任务类监听对应接口的请求队列,从队列中顺序取出请求信息并进行处理
*
* 优点:接口几乎在收到请求的同时就已经返回,处理程序在后台异步进行处理,大大提高吞吐量
*
*
* </pre>
*
* </blockquote>
*
* @author Logen
*
*/
@RestController
public class OrderController {

@Autowired
 private RequestQueue queue;

@GetMapping("/order")
 public DeferredResult<Object> order(String number) throws InterruptedException {
   System.out.println("[ OrderController ] 接到下单请求");
   System.out.println("当前待处理订单数: " + queue.getOrderQueue().size());

AsyncVo<String, Object> vo = new AsyncVo<>();
   DeferredResult<Object> result = new DeferredResult<>();

vo.setParams(number);
   vo.setResult(result);

queue.getOrderQueue().put(vo);
   System.out.println("[ OrderController ] 返回下单结果");
   return result;
 }
}

 9.   运行 AsyncStarter.java ,启动测试

浏览器输入 http://localhost:8080/order?number=10001

正常情况处理3秒返回,返回结果如下

{"time":1548241500718,"params":"10001"}

观察控制台打印日志,如下所示:


[ OrderController ] 接到下单请求
当前待处理订单数: 0
[ OrderController ] 返回下单结果
[ OrderTask ]开始处理订单
[ OrderTask ]订单处理完成

结论:Controller层几乎在接收到请求的同时就已经返回,处理程序在后台异步处理任务。 

快速多次刷新浏览器,目的为了高并发测试,观察控制台打印信息

现象:Controller层快速返回,待处理请求在队列中开始增加,异步处理程序在按顺序处理请求。

优点:对客户端响应时间不变,但提高了服务端的吞吐量。大大提升高并发处理性能!

来源:https://www.cnblogs.com/jonban/p/async.html

0
投稿

猜你喜欢

手机版 软件编程 asp之家 www.aspxhome.com