springboot学习(五十八) springboot中使用SseEmitter推送消息
服务端常用推送技术有:
1、客户端轮询:ajax定时拉取
2、服务端主动推送:WebSocket。全双工的,本质上是一个额外的tcp连接,建立和关闭时握手使用http协议,其他数据传输不使用http协议,更加复杂一些,适用于需要进行复杂双向数据通讯的场景。
3、服务端主动推送:SSE (Server Send Event)。html5新标准,用来从服务端实时推送数据到浏览器端,
直接建立在当前http连接上,本质上是保持一个http长连接,轻量协议简单的服务器数据推送的场景,使用服务器推送事件, SSE技术是基于单工通信模式,只是单纯的客户端向服务端发送请求,服务端不会主动发送给客户端。服务端采取的策略是抓住这个请求不放,等数据更新的时候才返回给客户端,当客户端接收到消息后,再向服务端发送请求,周而复始。
这篇文章介绍Springboot中使用SseEmitter实现消息推送。
1、服务端:
package com.iscas.biz.test.sse;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 测试SSE推送消息
*
* @author zhuquanwen
* @vesion 1.0
* @date 2021/9/11 21:01
* @since jdk1.8
*/
@RestController
@RequestMapping(path = "sse/test")
public class SseControllerTest {
private static Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();
@GetMapping(path = "subscribe", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
public SseEmitter push(String id) throws IOException {
// 超时时间设置为3s,用于演示客户端自动重连
SseEmitter sseEmitter = new SseEmitter(30000L);
// 设置前端的重试时间为1s
sseEmitter.send(SseEmitter.event().reconnectTime(1000).data("连接成功"));
sseCache.put(id, sseEmitter);
System.out.println("add " + id);
sseEmitter.onTimeout(() -> {
System.out.println(id + "超时");
sseCache.remove(id);
});
sseEmitter.onCompletion(() -> System.out.println("完成!!!"));
return sseEmitter;
}
@GetMapping(path = "push")
public String push(String id, String content) throws IOException {
SseEmitter sseEmitter = sseCache.get(id);
if (sseEmitter != null) {
sseEmitter.send(SseEmitter.event().name("msg").data("后端发送消息:" + content));
}
return "over";
}
@GetMapping(path = "over")
public String over(String id) {
SseEmitter sseEmitter = sseCache.get(id);
if (sseEmitter != null) {
sseEmitter.complete();
sseCache.remove(id);
}
return "over";
}
}
其中subscribe
是开启并订阅消息,push
是模拟触发后端推送,over
是模拟断开sse连接。
2、客户端:
<!doctype html>
<html lang="en">
<head>
<title>Sse测试</title>
<meta charset="utf-8"/>
</head>
<body>
<div>sse测试</div>
<div id="result"></div>
</body>
</html>
<script>
var source = new EventSource('http://localhost:7901/demo/sse/test/subscribe?id=qwe');
//source.onmessage = function (event) {
// text = document.getElementById('result').innerText;
// text += '\n' + event.data;
// document.getElementById('result').innerText = text;
//};
source.addEventListener("msg", function(e) {
text = document.getElementById('result').innerText;
text += '\n' + e.data;
console.log(e);
document.getElementById('result').innerText = text;
//source.close();
})
source.onerror = function(e) {
if (e.readyState == EventSource.CLOSE) {
text = document.getElementById('result').innerText;
text += '\n' + "连接关闭";
}
}
<!-- 添加一个开启回调 -->
source.onopen = function (event) {
text = document.getElementById('result').innerText;
text += '\n 开启: ';
console.log(event);
document.getElementById('result').innerText = text;
};
</script>
3、测试