springboot整合SSE

2023-09-14 15:09:34

SSE简介

SSE(Server Sent Event),是一种可以主动从服务端推送消息的技术。SSE的本质其实就是一个HTTP的长连接,只不过它给客户端发送的不是一次性的数据包,而是一个stream流,格式为text/event-stream。所以客户端不会关闭连接,会一直等着服务器发过来的新的数据流。

SSE服务端代码

springboot中封装了sse代码,不需要额外的依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
public class SseEmitterServer {

    private static final Logger LOGGER = LoggerFactory.getLogger(SseEmitterServer.class);

    /**
     * 当前连接数
     */
    private static AtomicInteger count = new AtomicInteger(0);

    private static Map<Integer, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

    public static SseEmitter connect(Integer userId){
        // 设置超时日期,0表示不过期
        SseEmitter sseEmitter = new SseEmitter(0L);

        // 注册回调
        sseEmitter.onCompletion(completionCallBack(userId));
        sseEmitter.onError(errorCallBack(userId));
        sseEmitter.onTimeout(timeoutCallBack(userId));
        sseEmitterMap.put(userId,sseEmitter);
        count.getAndIncrement();
        LOGGER.info("创建新SSE连接,连接用户编号:{}",userId);
        LOGGER.info("现有连接用户:"+sseEmitterMap.keySet());
        return sseEmitter;
    }

    /**
     * 给指定用户发信息
     */
    public static void sendMessage(Integer userId,String message){
        if (!sseEmitterMap.containsKey(userId)) {
            connect(userId);
        }
        try {
            sseEmitterMap.get(userId).send(message);
            LOGGER.info("给" + userId + "号发送消息:" + message);
        } catch (IOException e) {
            LOGGER.error("userId:{},发送信息出错:{}", userId, e.getMessage());
            e.printStackTrace();
        }
    }

    /**
     * 群发消息
     */
    public static void batchSendMessage(String message){
        if (sseEmitterMap != null&&!sseEmitterMap.isEmpty()) {
            sseEmitterMap.forEach((k,v)->{
                try {
                    v.send(message, MediaType.APPLICATION_JSON);
                } catch (IOException e) {
                    LOGGER.error("userId:{},发送信息出错:{}",k,e.getMessage());
                    e.printStackTrace();
                }
            });
        }
    }

    public static void batchSendMessage(Set<Integer> userIds,String message){
        userIds.forEach(userId->sendMessage(userId,message));
    }

    /**
     * 移出用户
     */
    public static void removeUser(Integer userId){
        sseEmitterMap.remove(userId);
        count.getAndDecrement();
        LOGGER.info("remove user id:{}",userId);
        LOGGER.info("remain user id:"+sseEmitterMap.keySet());
    }

    public static List<Integer> getIds(){
        return new ArrayList<>(sseEmitterMap.keySet());
    }

    public static int getUserCount(){
        return count.intValue();
    }

    private static Runnable completionCallBack(Integer userId){
        return ()->{
          LOGGER.info("结束连接,{}",userId);
          removeUser(userId);
        };
    }

    private static Runnable timeoutCallBack(Integer userId){
        return ()->{
            LOGGER.info("连接超时,{}",userId);
            removeUser(userId);
        };
    }

    private static Consumer<Throwable> errorCallBack(Integer userId){
        return throwable -> {
            LOGGER.error("连接异常,{}",userId);
            removeUser(userId);
        };
    }
}
@RestController
@CrossOrigin(maxAge = 3600)
public class SseController {

    @RequestMapping(value = "/sse/connect/{id}",method = RequestMethod.GET)
    public SseEmitter connect(@PathVariable Integer id){
        SseEmitter sseEmitter = SseEmitterServer.connect(id);
        return sseEmitter;
    }

    /**
     * 向指定用户发送消息
     */
    @RequestMapping(value = "/sse/send/{id}", method = RequestMethod.GET)
    public EiInfo sendMsg(@PathVariable Integer id,@RequestParam("message") String message) {
        EiInfo eiInfo = new EiInfo();
        SseEmitterServer.sendMessage(id,message);
        eiInfo.sysSetMsg("向"+id+"号用户发送信息,"+message+",消息发送成功");
        return eiInfo;
    }

    /**
     * 向所有用户发送消息
     */
    @RequestMapping(value = "/sse/send/all", method = RequestMethod.GET)
    public EiInfo sendMsg2AllUser(@RequestParam("message") String message) {
        EiInfo eiInfo = new EiInfo();
        SseEmitterServer.batchSendMessage(message);
        eiInfo.sysSetMsg("向所有用户发送信息,"+message+",消息发送成功");
        return eiInfo;
    }

    /**
     * 关闭用户连接
     */
    @RequestMapping(value = "/sse/close/{id}", method = RequestMethod.GET)
    public EiInfo closeSse(@PathVariable Integer id) {
        EiInfo eiInfo = new EiInfo();
        SseEmitterServer.removeUser(id);
        eiInfo.sysSetMsg("关闭"+id+"号连接。当前连接用户有:"+SseEmitterServer.getIds());
        return eiInfo;
    }
}

SSE测试

连接服务端

创建三个用户,分别连接服务端
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

给指定用户发送消息

使用接口测试工具,只给1号用户发送消息
在这里插入图片描述

浏览器中,1号用户接收到消息,2号3号未接收到消息
在这里插入图片描述
在这里插入图片描述

群发消息

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

关闭连接

在这里插入图片描述

SSE连接超时

当SSE客户端连接后,如果长时间不断开,它会保持连接状态。SSE(Server-Sent Events)是一种基于HTTP的推送技术,允许服务器实时向客户端发送数据。当客户端连接到服务器的SSE端点时,它会创建一个EventSource对象,该对象将与服务器进行长期连接,并接收服务器发送的事件。只有在客户端手动关闭连接或连接发生错误时,才会断开SSE连接。

请注意,由于SSE是基于HTTP的技术,因此它可能受到浏览器或服务器的超时设置的影响。如果在长时间没有收到服务器的数据时,连接可能会断开。如果您希望在长时间不断开连接的情况下保持SSE连接,请确保服务器发送数据以保持连接活跃,或者调整相关超时设置。

在使用SSE的客户端连接中,如果长时间不断开连接,可能会出现以下情况:

  1. 连接超时。如果服务器端没有发送新的事件数据,而客户端也没有重新建立连接,可能会超过服务器的连接超时时间。这可能导致服务器关闭连接,客户端需要重新建立连接才能接收新的事件数据。
  2. 网络异常。长时间不断开连接可能会导致网络异常,例如连接中断、丢包等问题。在这种情况下,客户端可能需要重新建立连接,以恢复和服务器的通信。
    为了避免长时间不断开连接的问题,建议在合适的时机关闭连接。

在初始化SseEmitter对象时,需要指定超时时间。
SseEmitter sseEmitter = new SseEmitter(10006060L);
0L表示不超时,1000L表示1秒超时。假设设置为30s超时连接,在客户端连接服务端后开始计时,如果在这30s内,服务端有向该客户端发送数据,那么在30s时间到了之后,服务端会先断开客户端的连接然后重新连接客户端,并开始新一轮计时;如果在这30s内,服务端没有向客户端发送数据,那么30s后服务端会断开客户端的连接,不再重连。

更多推荐

爬虫入门基础-Selenium反爬

在网络时代,爬虫作为一种强大的数据采集工具,被广泛应用于各行各业。然而,许多网站为了防止被恶意爬取数据,采取了各种反爬虫机制。为了能够成功地绕过这些机制,Selenium成为了爬虫领域的一把利器。本文将为你介绍爬虫入门基础,并重点探讨如何使用Selenium应对反爬虫的挑战。一、爬虫入门基础1、定义:爬虫是一种模拟浏览

minikube搭建k8s

环境:centos7,docker18minikube搭建k8s说明minikube是最接近原生kubernetes的本地单机版集群,支持大部分kubernetes功能,用于学习和开发k8s。支持Linux、Windows、Mac官网安装文档安装前提一台物理机或者虚拟机,物理机CPU大于2核vcpu、2G内存、20G磁

Java堆(Java Heap)

对于Java应用程序来说,Java堆(JavaHeap)是虚拟机所管理的内存中最大的一块。Java堆是被所有线程共享的一块内存区域,在虚拟机启动时创建。此内存区域的唯一目的就是存放对象实例,Java世界里“几乎”所有的对象实例都在这里分配内存。在《Java虚拟机规范》中对Java堆的描述是:“所有的对象实例以及数组都应

ZMQ协议

一、ZMQ特点普通的socket是端对端的关系,ZMQ是N:M的关系,socket的连接需要显式地建立连接,销毁连接,选择协议(TCP/UDP)和错误处理,ZMQ屏蔽了这些细节,像是一个封装了的socket库,让网络编程变得更简单。ZMQ不光用于主机与主机之间的socket通信,还可以是线程和进程之间的通信。ZMQ提供

Zookeeper集群 + Kafka集群

ZookeeperZookeeper概述Zookeeper是一个开源的分布式的,为分布式框架提供协调服务的Apache项目Zookeeper工作机制Zookeeper从设计模式角度来理解:是一个,它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper就将负责通知已经在Zo

Vue路由与nodes的环境配置

一,Vue路由什么是Vue路由?Vue路由是Vue.js的一部分,是一个官方的Vue.js插件,用于管理单页面应用程序的路由。它允许您在应用程序中使用URL路径和参数来管理不同的页面视图,而无需刷新页面。使用Vue路由,您可以轻松地设置和管理客户端路由,包括嵌套和动态路由。它还提供了一些高级功能,例如路由守卫,用于在页

leetcode646. 最长数对链(java)

最长数对链题目描述贪心解法二动态规划dp题目描述难度-中等leetcode646.最长数对链(java)给你一个由n个数对组成的数对数组pairs,其中pairs[i]=[lefti,righti]且lefti<righti。现在,我们定义一种跟随关系,当且仅当b<c时,数对p2=[c,d]才可以跟在p1=[a,b]后

HTML 学习笔记(基础)

它是超文本标记语言,由一大堆约定俗成的标签组成,而其标签里一般又有一些属性值可以设置。W3C标准:网页主要三大部分结构:HTML表现:CSS行为:JavaScript<!DOCTYPEhtml><htmllang="zh-CN"><head><metacharset="UTF-8"><metaname="viewpor

配置 iSCSI 服务并实现客户端自动挂载块设备

文章目录前言1.iSCSI简介2.iSCSIServer端配置2.1.添加磁盘2.2.安装targetcli软件包2.3.创建块设备2.4.创建Target2.5.创建LUN2.6.创建ACL2.7.配置门户创建监听2.8.查看全部配置信息并保存退出2.9.启用Target服务3.iSCSIClient端配置3.1.安

C# 扫描并读取图片中的文字(.NET Core)

本文介绍如何通过C#程序来扫描并读取图片中的文字,这里以创建一个.NetCore程序为例。下面是具体步骤,供参考。程序测试环境:VisualStudio版本要求不低于2017图片扫描工具:Spire.OCRfor.NET图片格式:png(这里的图片格式支持JPG、PNG、GIF、BMP、TIFF等格式)扫描的图片文字:

HT for Web (Hightopo) 使用心得(2)- 2D 图纸、节点、连线 与基本动画

概括来说,用HTforWeb做可视化主要分为两部分,也就是2D和3D。这两部分需要单独创建。在它们被创建完成后,我们再把它们集成到一起。HTforWeb的2D部分主要是指ht.graph.GraphView(简称GraphView,也就是2D图纸)。所谓2D图纸其本质是一个canvas。我们可以在上面进行基本图形的绘制

热文推荐