软件编程
位置:首页>> 软件编程>> java编程>> Java应用层协议WebSocket实现消息推送

Java应用层协议WebSocket实现消息推送

作者:尚少  发布时间:2022-05-09 07:26:29 

标签:Java,WebSocket,消息推送

前言

  大部分的web开发者,开发的业务都是基于Http协议的:前端请求后端接口,携带参数,后端执行业务代码,再返回结果给前端。作者参与开发的项目,有一个报警推送的功能,服务端实时推送报警信息给浏览器端;还有像抖音里面,如果有人关注、回复你的评论时,抖音就会推送相关消息给你了,你就会收到一条消息。

  有些同学会说了,基于Http协议也能实现啊:前端定时访问后端(每隔3s或者几秒),后端返回消息数据,前端拿到后弹出消息。这种方式太low了,而且每个浏览器都这样,使用系统的人一多,服务器的压力就太大了些。那到底用什么技术手段实现呢?我们的主角就登场了。

  WebSocket是在单个TCP连接上进行全双工通信的应用层协议(Http协议也是应用层),浏览器端和服务端都可主动发送数据给另一端。这样是不是比Http协议更适合消息推送这种场景。

浏览器端

  作者建了一个SpringBoot项目,Html放在src\main\resources\static下:

<!DOCTYPE html>
<html lang="zh" xmlns:th="http://www.thymeleaf.org">
<head>
<!--    解决中文乱码-->
   <meta charset="UTF-8"/>
   <title></title>
   <script type="text/javascript" src="./js/jquery.min.js"></script>
</head>
<body>
   <input id="input1" type="text" /><br/>
   <input type="button" value="浏览器发送服务端" onclick="btnClick()" />
   <input type="button" value="服务端发送浏览器" onclick="btnClick1()" />
   <input type="button" value="重新打开连接" onclick="btnClick2()" />
   <br/>
   <textarea id="textArea" style="height: 50px"></textarea>
<script>
   var ws;
   webSocketInit();
   function webSocketInit() {
       ws =new WebSocket('ws://localhost:8080/bootdemo/webSocket/10086');
       // 获取连接状态
       console.log('ws连接状态[初始]:' + ws.readyState);
       //监听是否连接成功
       ws.onopen = function () {
           console.log('ws连接状态[成功]:' + ws.readyState);
       };
       // 接听服务器发回的信息并处理展示
       ws.onmessage = function (obj) {
           console.log('接收到来自服务器的消息:');
           var txt = $("#textArea").val();
           $("#textArea").val(txt + "\n" + obj.data);
           $("#textArea").scrollTop($("#textArea")[0].scrollHeight);
           //完成通信后关闭WebSocket连接
           // ws.close();
       };
       // 监听连接关闭事件
       ws.onclose = function () {
           // 监听整个过程中websocket的状态
           console.log('ws连接状态[关闭]:' + ws.readyState);
       };
       // 监听并处理error事件
       ws.onerror = function (error) {
           console.log(error);
       };
   }
   function btnClick() {
       console.log("浏览器端发送消息:");
       //连接成功则发送一个数据
       ws.send($("#input1").val());
   }
   function btnClick1() {
       $.ajax({
           url: 'http://localhost:8080/bootdemo/pushWebSocket/publish?' +
           'userId=10086&message=' + $("#input1").val(),
           type: 'GET',
           success: function (data) {
               // console.log(data);
           }
       });
   }
   function btnClick2() {
       webSocketInit();
   }
</script>
</body>
</html>

服务器端

&emsp;&emsp;先引入依赖:

<dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-thymeleaf</artifactId>
   </dependency>
   <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-websocket</artifactId>
   </dependency>
   <dependency>
       <groupId>org.projectlombok</groupId>
       <artifactId>lombok</artifactId>
       <scope>provided</scope>
   </dependency>

&emsp;&emsp;bean上添加@ServerEndpoint,作为WebSocket的服务端。

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
@Component
@Slf4j
@ServerEndpoint("/webSocket/{userId}")
public class WebSocketServer {
   //与某个客户端的连接会话,需要通过它来给客户端发送数据
   private Session session;
   private static final CopyOnWriteArraySet<WebSocketServer> webSockets =
   new CopyOnWriteArraySet<>();
   // 用来存在线连接数
   private static final Map<String, Session> sessionPool =
   new HashMap<String, Session>();
   /**
    * 连接成功调用的方法
    */
   @OnOpen
   public void onOpen(Session session, @PathParam(value = "userId")
   String userId) {
       try {
           this.session = session;
           webSockets.add(this);
           sessionPool.put(userId, session);
       }
       catch (Exception e) {
       }
   }
   /**
    * 收到客户端消息后调用的方法
    */
   @OnMessage
   public void onMessage(String message) {
       log.info("websocket消息: 收到客户端消息:" + message);
   }
   public void sendOneMessage(String userId, String message) {
       Session session = sessionPool.get(userId);
       if (session != null && session.isOpen()) {
           try {
               log.info("服务端推送消息:" + message);
               session.getAsyncRemote().sendText(message);
           }
           catch (Exception e) {
               e.printStackTrace();
           }
       }
   }
}

&emsp;&emsp;进行注册:

@Configuration
public class WebSocketConfigOne {
   /**
    * 这个bean会自动注册使用了@ServerEndpoint注解声明的对象
    * 没有的话会报404
    *
    * @return
    */
   @Bean
   public ServerEndpointExporter serverEndpointExporter() {
       return new ServerEndpointExporter();
   }
}

&emsp;&emsp;推送消息的控制器:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import java.util.HashMap;
import java.util.Map;
@Controller
@RequestMapping("/pushWebSocket")
public class WebSocketController {
   @Autowired
   private WebSocketServer webSocketServer;
   @GetMapping("/publish")
   @ResponseBody
   public Map publish(String userId, String message) {
       webSocketServer.sendOneMessage(userId, message);
       HashMap<String, Object> map = new HashMap<>();
       map.put("code", 200);
       return map;
   }
}

&emsp;&emsp;还有我的配置文件application.properties:

&emsp;&emsp;# web port

&emsp;&emsp;server.port=8080

&emsp;&emsp;server.servlet.context-path=/bootdemo

&emsp;&emsp;运行启动类后,访问html(localhost:8080/bootdemo/index.html)如下:

Java应用层协议WebSocket实现消息推送

&emsp;&emsp;有的同学一思索,点击图中的第2个按钮"服务端发送浏览器",你这好像也是前端先请求,再推送的消息;我们的WebSocketController#publish方法,在真实的场景下,可以在后端的定时任务中、消息中间件的消费者端调用,不用前端先发送请求。

&emsp;&emsp;当然SpringBoot有专门构建WebSocket服务端的方式。

&emsp;&emsp;核心配置类:

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.servlet.HandlerMapping;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import org.springframework.web.socket.server.HandshakeInterceptor;
import javax.servlet.http.HttpServletRequest;
import java.util.Map;
@Configuration
@EnableWebSocket
@Slf4j
public class WebSocketConfig1 implements WebSocketConfigurer {
   @Override
   public void registerWebSocketHandlers(WebSocketHandlerRegistry
   registry) {
       registry.addHandler(new MyWebSocketHandler(), "/webSocket/{userId}")//设置连接路径和处理
               .setAllowedOrigins("*")
               .addInterceptors(new MyWebSocketInterceptor());//设置 *
   }
   class MyWebSocketInterceptor implements HandshakeInterceptor {
       //前置拦截一般用来注册用户信息,绑定 WebSocketSession
       @Override
       public boolean beforeHandshake(ServerHttpRequest request,
       ServerHttpResponse response, WebSocketHandler wsHandler,
       Map<String, Object> attributes) throws Exception {
           log.info("前置拦截~~");
           if (!(request instanceof ServletServerHttpRequest)) {
               return true;
           }
           HttpServletRequest servletRequest =
           ((ServletServerHttpRequest)request).getServletRequest();
           Map map = (Map)servletRequest.getAttribute(HandlerMapping.
           URI_TEMPLATE_VARIABLES_ATTRIBUTE);
           String userId = (String)map.get("userId");
           attributes.put("userId", userId);
           return true;
       }
       @Override
       public void afterHandshake(ServerHttpRequest request,
       ServerHttpResponse response, WebSocketHandler wsHandler,
       Exception exception) {
           log.info("后置拦截~~");
       }
   }
}

&emsp;&emsp;核心处理器:

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.*;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Component
public class MyWebSocketHandler implements WebSocketHandler {
   private static final Map<String, WebSocketSession> SESSIONS =
   new ConcurrentHashMap<>();
/**
* 建立新的socket连接后回调的方法
*/
   @Override
   public void afterConnectionEstablished(WebSocketSession session)
   throws Exception {
       String userId = (String) session.getAttributes().get("userId");
       SESSIONS.put(userId, session);
   }
/**
* 接收到浏览器端的消息后回调的方法
*/
   @Override
   public void handleMessage(WebSocketSession session,
   WebSocketMessage<?> message) throws Exception {
       String msg = message.getPayload().toString();
       log.info("收到客户端消息:" + msg);
   }
/**
* 连接出错时回调的方法
*/
   @Override
   public void handleTransportError(WebSocketSession session,
   Throwable exception) throws Exception {
       log.info("连接出错");
       if (session.isOpen()) {
           session.close();
       }
   }
/**
* 连接关闭时回调的方法
*/
   @Override
   public void afterConnectionClosed(WebSocketSession session,
   CloseStatus closeStatus) throws Exception {
       log.info("连接关闭:status:" + closeStatus);
   }
/**
* 是否处理部分消息,返回false就行
*/
   @Override
   public boolean supportsPartialMessages() {
       return false;
   }
/**
* 推送消息给浏览器端
*/
   public void sendMessage(String userId, String message) {
       WebSocketSession webSocketSession = SESSIONS.get(userId);
       if (webSocketSession == null || !webSocketSession.isOpen()) {
           return;
       }
       try {
           webSocketSession.sendMessage(new TextMessage(message));
       }
       catch (Exception ex) {
           log.error("推送消息异常:" + ex);
       }
   }
}

&emsp;&emsp;控制器也改造下:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import java.util.HashMap;
import java.util.Map;
@Controller
@RequestMapping("/pushWebSocket")
public class WebSocketController {
   @Autowired
   private MyWebSocketHandler handler;
   @GetMapping("/publish")
   @ResponseBody
   public Map publish(String userId, String message) {
       handler.sendMessage(userId, message);
       HashMap<String, Object> map = new HashMap<>();
       map.put("code", 200);
       return map;
   }
}

&emsp;&emsp;前端部分不用做修改,和之前一样的代码。

来源:https://blog.csdn.net/gs2436/article/details/127351318

0
投稿

猜你喜欢

手机版 软件编程 asp之家 www.aspxhome.com