概述
一.什么是消息推送
推送的场景比较多,比如有人关注我的公众号,这时我就会收到一条推送消息,以此来吸引我点击打开应用。
-
消息推送(push)通常是指网站的运营工作等人员,通过某种工具对用户当前网页或移动设备APP进行的主动消息推送。
-
消息推送一般又分为
web端消息推送
和移动端消息推送
。
-
上边的这种属于移动端消息推送,web端消息推送常见的诸如
站内信、未读邮件数量、监控报警数量等
,应用的也非常广泛。
-
如上图所示只要触发某个事件(主动分享了资源或者后台主动推送消息),web页面的通知小红点就会实时的+1就可以了。
-
-
通常在服务端会有
若干张消息推送表
,用来记录用户触发不同事件所推送不同类型的消息,前端主动查询(拉)
或者被动接收(推)用户所有未读的消息数
。- 因此消息推送无非是推
(push)
和拉(pull)
数据两种形式
CREATE TABLE `message_record` ( `id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT '主键', `template_id` bigint unsigned NOT NULL COMMENT '消息模板ID', `type` int NOT NULL DEFAULT '1' COMMENT '推送渠道 1短信 2邮件 3微信4APP', `receiver` varchar(128) NOT NULL DEFAULT '' COMMENT '消息接收者(手机号,邮箱号,微信openid等)', `device_info` varchar(128) NOT NULL DEFAULT '' COMMENT 'APP推送终端设备信息', `content` varchar(1024) NOT NULL COMMENT '消息推送内容', `deleted` tinyint NOT NULL DEFAULT '0' COMMENT '逻辑删除标记:1删除; O未删除', `create_by` bigint unsigned NOT NULL COMMENT '创建人', `create_time` datetime NOT NULL COMMENT '创建时间', `update_by` bigint unsigned NOT NULL COMMENT '修改人', `update_time` datetime NOT NULL COMMENT '修改时间', PRIMARY KEY (`id`), KEY `idx_template_id` (`template_id`), KEY `idx receiver` (`receiver`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='消息推送记录表'
- 因此消息推送无非是推
二.服务端推送常用方式
1.短轮询(Polling)
- 客户端定时向服务端发起请求,服务端收到请求后立即返回,客户端再做渲染显示。
- 使用
JS定时器
间隔时间拉取服务端数据
- 使用
setInterval(() => {
//发起请求、处理响应
} , 1000);
- 确定:短轮询无论服务端是否发生数据变更,客户端都会进行请求,势必会对服务端造成很大压力,
浪费带宽和服务器资源
。
2.长轮询(Long Polling)
客户端向服务端发起请求,服务器端到请求后保持连接不断开
,直到数据有更新
才返回响应并关闭连接,客户端处理完响应信息后再向服务端发送新的请求。
- 原理:是servlet的
异步长连接请求
。即异步请求中在原始的请求返回的时并没有关闭连接
,关闭的只是处理请求的那个线程
(一般是回收的线程池里了),只有在异步请求全部处理完
之后才会关闭连接。 - 具体实现:如Spring的DeferredResult可以允许
容器线程快速释放占用的资源
,不阻塞请求线程,以此接受更多的请求提升系统的吞吐量,然后启动异步工作线程处理真正的业务逻辑
,处理完成调用DeferredResult.setResult(200)
提交响应结果。- 接口返回DeferredResult
,或者调用
setResult设值时不会返回,**当前Servlet容器线程会结束,由DeferredResult另起线程来进行结果处理并setResul,如果超时或设置setResult
,接口会立即返回
。
- 接口返回DeferredResult
实例1
要求:请求http://localhost:8080/get/requestId=1时,页面处于等待状态;当访问http://localhost:8080/set/requestId=1,前面的页面会返回"处理成功 1"。
@Controller
@RequestMapping(value = "/")
public class DeferredResultController {
private Map<String, DeferredResult<String>> deferredResultMap = new ConcurrentHashMap<>();;
/**
* 为了方便测试,简单模拟一个 多个请求用同一个requestId会出问题
*/
@ResponseBody
@GetMapping("/get")
public DeferredResult<String> get(@RequestParam String requestId,
@RequestParam(value = "timeout", required = false, defaultValue = "5000") Long timeout) {
System.out.println("start get");
//初始化延时对象,超时时间为5s
DeferredResult<String> deferredResult = new DeferredResult<>(timeout);
// 请求超时的回调函数
deferredResult.onTimeout(() -> {
//返回处理超时
deferredResult.setResult("处理超时");
//超时该处理任务
deferredResultMap.remove(requestId);
});
//如果不存在的requestId直接抛异常
Optional.ofNullable(deferredResultMap)
.filter(t -> !t.containsKey(requestId))
.orElseThrow(() -> new IllegalArgumentException(String.format("requestId=%s is existing", requestId)));
deferredResultMap.put(requestId,deferredResult);
System.out.println("end get");
return deferredResult;
}
/**
* 设置DeferredResult对象的result属性,模拟异步操作
*/
@ResponseBody
@GetMapping(value = "/set")
public String settingResult(@RequestParam String requestId) {
//--------------------这里相当于异步的操作方法 设置DeferredResult对象的setResult方法--------
if (deferredResultMap.containsKey(requestId)) {
DeferredResult<String> deferredResult = deferredResultMap.get(requestId);
deferredResult.setResult("处理成功:"+requestId);
deferredResultMap.remove(requestId);
}
return "Done";
}
}
实例2
要求:接口/test 接收请求后,立即将请求入队receiveQueue
,后台线程自旋
执行队列receiveQueue任务,任务完成后将结果入队resultQueue
,如果监听器线程监听resultQueue
,如果有任务结果,则将结果赋值给DeferredResult,返回结果响应。
定义Task,封装了DeferredResult对象和收到的消息对象,以及一个是否超时标记,用于任务完成后取出每个请求消息对应的DeferredResult对象,返回消息给客户端.
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Task<T> {
//延时返回对象
private DeferredResult<String> result;
//延时消息
private T message;
//是否超时
private Boolean isTimeout;
}
定义TaskQueue,用于管理队列及处理数据:
/**
* 模拟队列类
*/
@Component
public class TaskQueue {
/**
* 接收任务队列
*/
private BlockingQueue<Task<String>> receiveQueue = new LinkedBlockingDeque<>(5000);
/**
* 任务完成结果队列
*/
private BlockingQueue<Task<String>> resultQueue = new LinkedBlockingDeque<>(5000);
/**
* 初始化任务处理线程
*/
public TaskQueue() {
this.run();
}
/**
* 存入请求任务
*
* @param task task实体
* @throws InterruptedException
*/
public void put(Task<String> task) throws InterruptedException {
receiveQueue.put(task);
}
/**
* 获取任务完成结果
*
* @return
* @throws InterruptedException
*/
public Task<String> get() throws InterruptedException {
return resultQueue.take();
}
/**
* 处理任务
* 开启一个新线程,自旋的从接收队列中取出数据,然后处理若干秒后,将成功数据放入成功队列.
* ,如果任务超时标志isTimeout超时,可以中断该任务的进行,在正常的service中,可以替换为数据库回滚等操作.
*/
private void run() {
new Thread(() -> {
while (true) {
try {
//从接收队列中取出任务,处理,然后放入成功队列
Task<String> task = receiveQueue.take();
System.out.println("队列收到数据,处理中!");
Thread.sleep(1000);
task.setMessage("成功");
//TODO:如果超时了,中断该任务-此处应该加锁
if (task.getIsTimeout()) {
System.out.println("任务超时,处理线程中断该任务");
continue;
}
resultQueue.put(task);
System.out.println("队列处理完成!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
定义队列监听线程, 当spring容器加载完毕,开启新线程,自旋的从模拟队列的完成队列中获取数据,并使用ReferredResult返回
@Component
public class QueueResultListener implements ApplicationListener<ContextRefreshedEvent> {
@Autowired
TaskQueue taskQueue;
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
new Thread(() -> {
try {
Task<String> task = taskQueue.get();
task.getResult().setResult(task.getMessage());
System.out.println("监听器获取到结果:task=" + task);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
实现Controller异步接口
@Controller
public class DeferredResultQueueController {
@Autowired
TaskQueue taskQueue;
@ResponseBody
@GetMapping("/test")
public DeferredResult<String> test(@RequestParam String requestId,
@RequestParam(value = "timeout", required = false, defaultValue = "5000") Long timeout) throws InterruptedException {
//新建延期返回对象并设置超时时间,优先级比configureAsyncSupport方法中默认配置中的高
System.out.println("start test");
//初始化延迟任务
DeferredResult<String> deferredResult = new DeferredResult<>(timeout);
//要执行的任务
Task<String> task = new Task<String>(deferredResult, "任务", false);
//设置超时后执行的任务,优先级比DeferredResultProcessingInterceptor拦截器中的高
deferredResult.onTimeout(() -> {
System.out.println("任务超时 id=" + requestId);
//TODO:告知该任务已经超时-此处应该加锁
task.setMessage("任务超时");
task.setIsTimeout(true);
});
//任务入队
taskQueue.put(task);
System.out.println("end test");
return deferredResult;
}
}
参考文章
Spring MVC3.2之后支持异步请求,能够在controller中返回一个Callable或者DeferredResult
。
- 高性能关键技术之—体验Spring MVC的异步模式(Callable、WebAsyncTask、DeferredResult) 基础使用篇
- 使用DeferredResult异步处理SpringMVC请求
3.MQTT
什么是 MQTT协议?
-
MQTT 全称(Message Queue Telemetry Transport):一种基于
发布/订阅(publish/subscribe)模式的轻量级通讯协议
,通过订阅相应的主题来获取消息,是物联网(Internet of Thing
)中的一个标准传输协议。- 该协议将消息的
发布者(publisher)
与订阅者(subscriber)
进行分离,因此可以在不可靠的网络环境中,为远程连接的设备提供可靠的消息服务,使用方式与传统的MQ有点类似。
- 该协议将消息的
-
TCP协议位于
传输层
,MQTT 协议位于应用层
,MQTT 协议构建于TCP/IP协议
上,也就是说只要支持TCP/IP协议栈的地方,都可以使用MQTT协议。
MQTT协议为什么在物联网(IOT)中如此受偏爱?而不是其它协议,比如我们更为熟悉的 HTTP协议呢?
-
首先HTTP协议它是一种
同步协议
,客户端请求后需要等待服务端的响应
。而在物联网(IOT)环境中,设备会很受制于环境影响
,比如带宽低、网络延迟高、网络通信不稳定等,显然异步消息协议
更为适合IOT应用程序。 -
HTTP是
单向
的,如果要获取消息客户端必须发起连接
,而在物联网(IOT)应用程序中,设备或传感器往往都是客户端
,这意味着它们无法被动地接收来自网络的命令。 通常需要将一条命令或者消息,发送到网络上的所有设备上
。HTTP要实现这样的功能不但很困难,而且成本极高。
springboot+rabbitmq实现智能家居实例详解
4.SSE
**SSE( Server-sent Events )**是 WebSocket 的一种轻量代替方案
,使用 HTTP 协议,在服务器和客户端之间打开一个单向通道
,只能服务器向客户端发送消息,服务端响应的不再是一次性的数据包,而是text/event-stream
类型的数据流信息,在有数据变更时从服务器流式传输
到客户端。
- 整体的实现思路有点类似于在线视频播放,视频流会连续不断的推送到浏览器,你也可以理解成,
客户端在完成一次用时很长(网络不畅)的下载
。
SSE与WebSocket作用相似,都可以建立服务端与浏览器之间的通信,实现服务端向客户端推送消息
,但还是有些许不同:
- SSE 是基于
HTTP
协议的,它们不需要特殊的协议或服务器实现即可工作;WebSocket需单独服务器来处理协议。 - SSE
单向通信
,只能由服务端向客户端单向通信
;webSocket全双工通信,即通信的双方可以同时发送和接收信息。 - SSE 实现简单开发成本低,无需引入其他组件;WebSocket传输数据需做二次解析,开发门槛高一些。
- SSE 默认支持
断线重连
;WebSocket则需要自己实现。 - SSE 只能
传输文本消息
,二进制数据需要经过编码
后传送;WebSocket默认支持传送二进制数据。
在 html5 的定义中,服务端 sse,一般需要遵循以下规范:
- Content-Type:
text/event-stream;charset=UTF-8
- Cache-Control: no-cache
- Connection:
keep-alive
SSE 如何保证数据完整性
- 客户端在每次接收到消息时,会把消息的 id 字段作为内部属性 Last-Event-ID 储存起来。
- SSE 默认支持断线重连机制,在连接断开时会 触发
EventSource
的 error 事件,同时自动重连。再次连接成功时 EventSource 会把 Last-Event-ID 属性作为请求头发送给服务器,这样服务器就可以根据这个 Last-Event-ID 作出相应的处理。- 这里需要注意的是,id 字段不是必须的,服务器有可能不会在消息中带上 id 字段,这样子
springboot集成SSE简约版
实例:客户端发送请求到服务端,服务端以流的形式不断向客户端推送数据示例,增加帅气值。
-
服务端代码(
注意响应头以及固定返回数据格式)
@Controller @RequestMapping(value = "/sse") public class SEEController { //响应头为text/event-stream;charset=UTF-8 @RequestMapping(value = "/get", produces = "text/event-stream;charset=UTF-8") public void push(HttpServletResponse response) { response.setContentType("text/event-stream"); response.setCharacterEncoding("utf-8"); int i = 0; while (true) { try { Thread.sleep(1000); PrintWriter pw = response.getWriter(); //注意返回数据必须以data:开头,"nn"结尾 pw.write("data:xdm帅气值加" + i + "nn"); pw.flush(); //检测异常时断开连接 if (pw.checkError()) { log.error("客户端断开连接"); return; } } catch (Exception e) { e.printStackTrace(); } i++; } } }
-
前端代码(重写message、open、error事件)
<html> <head> <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> <title>SSE Demo</title> </head> <body> <div id="msg_from_server">空白</div> <script type="text/javascript" src="../js/jquery.js"></script> <script type="text/javascript"> if (!!window.EventSource) { var source = new EventSource('/sse/get'); s = ''; //客户端收到服务器发来的数据 另一种写法:source.onmessage = function (event) {} source.addEventListener('message', function(e) { s += e.data + "<br/>" $("#msg_from_server").html(s); }); // 连接一旦建立,就会触发open事件 另一种写法:source.onopen = function (event) {} source.addEventListener('open', function(e) { console.log("连接打开."); }, false); // 如果发生通信错误(比如连接中断),就会触发error事件 另一种写法:source.onerror = function (event) {} source.addEventListener('error', function(e) { if (e.readyState == EventSource.CLOSED) { console.log("连接关闭"); } else { console.log(e.readyState); } }, false); } else { alert(4); console.log("没有sse"); } </script> </body> </html>
springboot集成SSE升级版
演示SSE的连接建立、接收数据和异常情况监听处理。
-
服务端
@Controller @RequestMapping(value = "/sse") @Slf4j public class SSEPlusController { private static Map<String, SseEmitter> cache = new ConcurrentHashMap<>(); String clientId; int sseId; @GetMapping("/create") public SseEmitter create(@RequestParam(name = "clientId", required = false) String clientId) { // 设置超时时间,0表示不过期。默认30000毫秒 //可以在客户端一直断网、直接关闭页面但未提醒后端的情况下,服务端在一定时间等待后自动关闭网络连接 SseEmitter sseEmitter = new SseEmitter(0L); // 是否需要给客户端推送ID if (Strings.isBlank(clientId)) { clientId = UUID.randomUUID().toString(); } this.clientId = clientId; cache.put(clientId, sseEmitter); log.info("sse连接,当前客户端:{}", clientId); return sseEmitter; } @Scheduled(cron = "0/3 * * * * ? ") public void pushMessage() { try { sseId++; SseEmitter sseEmitter = cache.get(clientId); sseEmitter.send( SseEmitter .event() .data("帅气值暴增" + sseId) .id("" + sseId) .reconnectTime(3000) ); } catch (Exception e) { log.error(e.getMessage()); sseId--; } } @GetMapping("/close") public void close(String clientId) { SseEmitter sseEmitter = cache.get(clientId); if (sseEmitter != null) { sseEmitter.complete(); cache.remove(clientId); } } }
复杂代码
/**
* SSE长链接
*/
@RestController
@RequestMapping("/sse")
public class SseEmitterController {
@Autowired
private SseEmitterService sseEmitterService;
/**
* 创建SSE长链接
*
* @param clientId 客户端唯一ID(如果为空,则由后端生成并返回给前端)
* @return org.springframework.web.servlet.mvc.method.annotation.SseEmitter
* @author re
* @date 2021/12/12
**/
@CrossOrigin //如果nginx做了跨域处理,此处可去掉
@GetMapping("/CreateSseConnect")
public SseEmitter createSseConnect(@RequestParam(name = "clientId", required = false) String clientId) {
return sseEmitterService.createSseConnect(clientId);
}
/**
* 关闭SSE连接
*
* @param clientId 客户端ID
* @author re
* @date 2021/12/13
**/
@GetMapping("/CloseSseConnect")
public Result closeSseConnect(String clientId) {
sseEmitterService.closeSseConnect(clientId);
return ResultGenerator.genSuccessResult(true);
}
}
@Service
public class SseEmitterServiceImpl implements SseEmitterService {
/**
* 容器,保存连接,用于输出返回
*/
private static Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();
@Override
public SseEmitter createSseConnect(String clientId) {
// 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
SseEmitter sseEmitter = new SseEmitter(0L);
// 是否需要给客户端推送ID
if (StringUtils.isBlank(clientId)) {
clientId = IdUtil.simpleUUID();
}
// 注册回调
sseEmitter.onCompletion(completionCallBack(clientId));
sseCache.put(clientId, sseEmitter);
logger.info("创建新的sse连接,当前用户:{}", clientId);
try {
sseEmitter.send(SseEmitter.event().id(SseEmitterConstant.CLIENT_ID).data(clientId));
} catch (IOException e) {
logger.error("SseEmitterServiceImpl[createSseConnect]: 创建长链接异常,客户端ID:{}", clientId, e);
throw new BusinessException("创建连接异常!", e);
}
return sseEmitter;
}
@Override
public void closeSseConnect(String clientId) {
SseEmitter sseEmitter = sseCache.get(clientId);
if (sseEmitter != null) {
sseEmitter.complete();
removeUser(clientId);
}
}
// 根据客户端id获取SseEmitter对象
@Override
public SseEmitter getSseEmitterByClientId(String clientId) {
return sseCache.get(clientId);
}
// 推送消息到客户端,此处结合业务代码,业务中需要推送消息处调用即可向客户端主动推送消息
@Override
public void sendMsgToClient(List<SseEmitterResultVO> sseEmitterResultVOList) {
if (CollectionUtil.isEmpty(sseCache)) {
return;
}
for (Map.Entry<String, SseEmitter> entry : sseCache.entrySet()) {
sendMsgToClientByClientId(entry.getKey(), sseEmitterResultVOList, entry.getValue());
}
}
/**
* 推送消息到客户端
* 此处做了推送失败后,重试推送机制,可根据自己业务进行修改
*
* @param clientId 客户端ID
* @param sseEmitterResultVOList 推送信息,此处结合具体业务,定义自己的返回值即可
* @author re
* @date 2022/3/30
**/
private void sendMsgToClientByClientId(String clientId, List<SseEmitterResultVO> sseEmitterResultVOList, SseEmitter sseEmitter) {
if (sseEmitter == null) {
logger.error("SseEmitterServiceImpl[sendMsgToClient]: 推送消息失败:客户端{}未创建长链接,失败消息:{}",
clientId, sseEmitterResultVOList.toString());
return;
}
SseEmitter.SseEventBuilder sendData = SseEmitter.event().id(SseEmitterConstant.TASK_RESULT).data(sseEmitterResultVOList, MediaType.APPLICATION_JSON);
try {
sseEmitter.send(sendData);
} catch (IOException e) {
// 推送消息失败,记录错误日志,进行重推
logger.error("SseEmitterServiceImpl[sendMsgToClient]: 推送消息失败:{},尝试进行重推", sseEmitterResultVOList.toString(), e);
boolean isSuccess = true;
// 推送消息失败后,每隔10s推送一次,推送5次
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(10000);
sseEmitter = sseCache.get(clientId);
if (sseEmitter == null) {
logger.error("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推失败,未创建长链接", clientId, i + 1);
continue;
}
sseEmitter.send(sendData);
} catch (Exception ex) {
logger.error("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推失败", clientId, i + 1, ex);
continue;
}
logger.info("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推成功,{}", clientId, i + 1, sseEmitterResultVOList.toString());
return;
}
}
}
/**
* 长链接完成后回调接口(即关闭连接时调用)
*
* @param clientId 客户端ID
* @return java.lang.Runnable
* @author re
* @date 2021/12/14
**/
private Runnable completionCallBack(String clientId) {
return () -> {
logger.info("结束连接:{}", clientId);
removeUser(clientId);
};
}
/**
* 连接超时时调用
*
* @param clientId 客户端ID
* @return java.lang.Runnable
* @author re
* @date 2021/12/14
**/
private Runnable timeoutCallBack(String clientId) {
return () -> {
logger.info("连接超时:{}", clientId);
removeUser(clientId);
};
}
/**
* 推送消息异常时,回调方法
*
* @param clientId 客户端ID
* @return java.util.function.Consumer<java.lang.Throwable>
**/
private Consumer<Throwable> errorCallBack(String clientId) {
return throwable -> {
logger.error("SseEmitterServiceImpl[errorCallBack]:连接异常,客户端ID:{}", clientId);
// 推送消息失败后,每隔10s推送一次,推送5次
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(10000);
SseEmitter sseEmitter = sseCache.get(clientId);
if (sseEmitter == null) {
logger.error("SseEmitterServiceImpl[errorCallBack]:第{}次消息重推失败,未获取到 {} 对应的长链接", i + 1, clientId);
continue;
}
sseEmitter.send("失败后重新推送");
} catch (Exception e) {
e.printStackTrace();
}
}
};
}
/**
* 移除用户连接
* @param clientId 客户端ID
* @author re
**/
private void removeUser(String clientId) {
sseCache.remove(clientId);
logger.info("SseEmitterServiceImpl[removeUser]:移除用户:{}", clientId);
}
当请求超过设置的超时时间,会抛出AsyncRequestTimeoutException异常
,这里直接用@ControllerAdvice全局捕获统一返回即可,前端获取约定好的状态码后再次发起长轮询请求,如此往复调用。
@ControllerAdvice
public class AsyncRequestTimeoutHandler {
@ResponseStatus(HttpStatus.NOT_MODIFIED)
@ResponseBody
@ExceptionHandler(AsyncRequestTimeoutException.class)
public String asyncRequestTimeoutHandler(AsyncRequestTimeoutException e) {
System.out.println("异步请求超时");
return "304";
}
}
SseEmitter.event()
用来得到一个记录数据的容器。
.data("帅气值暴增" + sseId)
发送给客户端的数据。
.id("" + sseId)
记录发送数据的标识,服务端可以通过HttpServletRequest的请求头中拿到这个id,判断是否中间有误漏发数据。
.reconnectTime(3000)
定义在网络连接断开后,客户端向后端发起重连的时间间隔(以毫秒为单位)。
-
客户端:
注:若浏览器不兼容在页面引入evensource.js。<script src=/eventsource-polyfill.js></script>
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title> Springboot集成SSE升级版</title> </head> <script> let source = null; const clientId = new Date().getTime(); if (!!window.EventSource) { source = new EventSource('/sse/create?clientId=' + clientId); //建立连接 source.onopen = function (event) { setMessageInnerHTML("建立连接" + event); } //接收数据 source.onmessage = function (event) { setMessageInnerHTML(event.data); } //错误监听 source.onerror = function (event) { if (event.readyState === EventSource.CLOSED) { setMessageInnerHTML("连接关闭"); } else { console.log(event); } } } else { setMessageInnerHTML("浏览器不支持SSE"); } // 监听窗口关闭事件,主动去关闭sse连接,如果服务端设置永不过期,浏览器关闭后手动清理服务端数据 window.onbeforeunload = function () { close(); }; // 关闭Sse连接 function close() { source.close(); const httpRequest = new XMLHttpRequest(); httpRequest.open('GET', '/sse/close/?clientId=' + clientId, true); httpRequest.send(); console.log("close"); } // 显示消息 function setMessageInnerHTML(innerHTML) { document.getElementById('text').innerHTML += innerHTML + '<br/>'; } </script> <body> <button onclick="close()">关闭连接</button> <div id="text"></div> </body> </html>
SSE常见问题
-
如果项目中使用nginx对后端服务做了代理,nginx代理转发后,默认会在1min的时候断掉长链接,SSE需要设置自己的长链接时间,则需要在nginx中进行配置;
在反向代理的location块中加入如下配置 proxy_set_header Host $http_host; ##proxy_set_header用来重定义发往后端服务器的请求头 proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme; proxy_buffering off; proxy_http_version 1.1; proxy_read_timeout 600s; ##设置SSE长链接保持时间为 600s
-
前端报错:EventSource’s response has a MIME type (“application/json”) that is not “text/event-stream”. Aborting the connection
前端在创建SSE长链接时,完整的请求(包括参数和参数值)都必须放在new EventSource(完整请求)中; -
创建长链接时,接口状态一直处于pending,检查后端nginx是否做相应配置;
-
推送消息失败:检查客户端创建链接时的id,和推送消息时的id是否一致;
5.websocket
特点
- WebSocket是html5出现的一种在·
TCP连接
上进行全双工通信
的协议 - 浏览器和服务器仅需
一次握手
,就可以建立持久性的连接
,并进行双向数据传输
。 - WebSocket目前支持ws和wss两种模式,对应HTTP和HTTPS。
websocket运用场景:
- 即时通讯:多媒体聊天,你可以使用该技术开个聊天室,聊个火热。可以单独2人聊个畅快。
- 互动游戏:现在多人游戏越来越火热,那么多人游戏必须是时时的,不考虑其他因素,只是时效性方面,也可以用该技术做多人游戏。
- 协同合作:开发人员会有git,svn等代码管理工具,但还是会有冲突。用此技术开发一个文档协同编辑工具,使每个人的编辑内容都时时同步,将不会有冲突发生。
- 动态数据表报:类似通知变更,如有需求,可以开发一个时时的数据报表,使用此技术,服务端数据发生变化,可在表报上立刻显示出来。如,电商平台的交易数据,每时每刻都在变化着,可以时时监控。
- 实时工具:如导航,实时查询工具等也可使用。
支持WebSocket的主流浏览器如下:
- Chrome
- Firefox
IE >= 10
- Sarafi >= 6
- Android >= 4.4
- iOS >= 8
5.1.原生WebSocket-客户端的简单示例
var ws = new WebSocket("wss://echo.websocket.org");
ws.onopen = function(evt) {
console.log("Connection open ...");
ws.send("Hello WebSockets!");
};
ws.onmessage = function(evt) {
console.log( "Received Message: " + evt.data);
ws.close();
};
ws.onclose = function(evt) {
console.log("Connection closed.");
};
5.2.原生WebSocket-客户端的 API
1.构造函数
- WebSocket 对象作为一个构造函数,用于新建 WebSocket 实例。
var ws = new WebSocket('ws://localhost:8080');
- 执行上面语句之后,客户端就会与服务器进行连接。
实例对象的所有属性和方法清单,参见这里。
2.属性
webSocket.readyState
- 实例对象的readyState属性返回实例对象的当前状态,共有四种。
CONNECTING:值为0,表示连接尚未建立
OPEN:值为1,表示连接成功,可以通信了。
CLOSING:值为2,表示连接正在关闭。
CLOSED:值为3,表示连接已经关闭,或者打开连接失败。
webSocket.bufferedAmount
- 实例对象的bufferedAmount属性,表示还有多少字节的二进制数据没有发送出去。它可以用来判断发送是否结束。
var data = new ArrayBuffer(10000000);
socket.send(data);
if (socket.bufferedAmount === 0) {
// 发送完毕
} else {
// 发送还没结束
}
3.事件
webSocket.onopen
- 实例对象的onopen属性,用于指定连接成功后的回调函数。
ws.onopen = function () {
ws.send('Hello Server!');
}
- 如果要指定多个回调函数,可以使用addEventListener方法。
ws.addEventListener('open', function (event) {
ws.send('Hello Server!');
});
webSocket.onclose
- 实例对象的onclose属性,用于指定连接关闭后的回调函数。
ws.onclose = function(event) {
var code = event.code;
var reason = event.reason;
var wasClean = event.wasClean;
// handle close event
};
ws.addEventListener("close", function(event) {
var code = event.code;
var reason = event.reason;
var wasClean = event.wasClean;
// handle close event
webSocket.onmessage
- 实例对象的onmessage属性,用于指定收到服务器数据后的回调函数。
ws.onmessage = function(event) {
var data = event.data;
// 处理数据
};
ws.addEventListener("message", function(event) {
var data = event.data;
// 处理数据
});
- 注意,服务器数据可能是文本,也可能是二进制数据(blob对象或Arraybuffer对象)。
ws.onmessage = function(event){
if(typeof event.data === String) {
console.log("Received data string");
}
if(event.data instanceof ArrayBuffer){
var buffer = event.data;
console.log("Received arraybuffer");
}
}
- 除了动态判断收到的数据类型,也可以使用binaryType属性,显式指定收到的二进制数据类型。
// 收到的是 blob 数据
ws.binaryType = "blob";
ws.onmessage = function(e) {
console.log(e.data.size);
};
// 收到的是 ArrayBuffer 数据
ws.binaryType = "arraybuffer";
ws.onmessage = function(e) {
console.log(e.data.byteLength);
};
webSocket.onerror
- 实例对象的onerror属性,用于指定报错时的回调函数。
socket.onerror = function(event) {
// handle error event
};
socket.addEventListener("error", function(event) {
// handle error event
});
4.方法
webSocket.send()
- 实例对象的send()方法用于向服务器发送数据。
发送文本的例子。
ws.send('your message');
发送 Blob 对象的例子。
var file = document
.querySelector('input[type="file"]')
.files[0];
ws.send(file);
发送 ArrayBuffer 对象的例子。
// Sending canvas ImageData as ArrayBuffer
var img = canvas_context.getImageData(0, 0, 400, 320);
var binary = new Uint8Array(img.data.length);
for (var i = 0; i < img.data.length; i++) {
binary[i] = img.data[i];
}
ws.send(binary.buffer);
webSocket.close()
- 关闭连接
6.具体实现
常用的 Node 实现
- WebSockets
- Socket.IO
- WebSocket-Node
常用的 Java实现
- 使用tomcat的
websocket
实现- Tomcat的方式需要
tomcat 7.x
,Java7的支持。
- Tomcat的方式需要
- 使用spring的websocket
- spring与websocket整合需要
spring 4.x
,并且使用了socketjs
,对不支持websocket的浏览器
可以模拟websocket使用
- spring与websocket整合需要
Tomcat实现websocket
- 使用这种方式
无需任何配置
,只需服务端一个处理类
服务端
使用@ServerEndpoint
标注当前类为一个websocket服务器
,客户端可以通过ws://localhost:8088/webSocketByTomcat/10086
来连接到WebSocket服务器端。
@ServerEndpoint("/webSocketByTomcat/{username}")
public class WebSocketServer {
//在线人数
private static int onlineCount = 0;
//存储会话
private static Map<String, WebSocketServer> clients = new ConcurrentHashMap<>();
//当前会话
private Session session;
//当前用户
private String username;
//建立连接
@OnOpen
public void onOpen(@PathParam("username") String username, Session session) throws IOException {
this.username = username;
this.session = session;
//自增在线人数
addOnlineCount();
//存储当前会话
clients.put(username, this);
System.out.println("已连接");
}
//连接关闭
@OnClose
public void onClose() throws IOException {
//移除当前会话
clients.remove(username);
//自减在线人数
subOnlineCount();
}
//发送消息客户客户端
@OnMessage
public void onMessage(String message) throws IOException {
JSONObject jsonTo = JSONObject.fromObject(message);
//单独发
if (!jsonTo.get("To").equals("All")){
sendMessageTo("给一个人", jsonTo.get("To").toString());
}
//群发
else{
sendMessageAll("给所有人");
}
}
//连接失败
@OnError
public void onError(Session session, Throwable error) {
error.printStackTrace();
}
//发送消息给指定客户端
public void sendMessageTo(String message, String to) throws IOException {
// session.getBasicRemote().sendText(message);
//session.getAsyncRemote().sendText(message);
for (WebSocketServer item : clients.values()) {
if (item.username.equals(to) ) {
item.session.getAsyncRemote().sendText(message);
}
}
}
//发送消息给所有客户端
public void sendMessageAll(String message) throws IOException {
for (WebSocketServer item : clients.values()) {
item.session.getAsyncRemote().sendText(message);
}
}
//获取在线人数
public static synchronized int getOnlineCount() {
return onlineCount;
}
//自增在线人数
public static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
//自减在线人数
public static synchronized void subOnlineCount() { WebSocketServer.onlineCount--; }
//获取所有客户端
public static synchronized Map<String, WebSocketServer> getClients() {
return clients;
}
}
前端
客户端
- 前端初始化WebSocket连接,并监听连接状态,接收服务端数据或向服务端发送数据。
- 注意导入
sockjs
时要使用地址全称,并且连接使用的是http而不是websocket的ws
<%@ page language="java" import="java.util.*" pageEncoding="utf-8"%>
<%@ taglib uri="http://java.sun.com/jsp/jstl/core" prefix="c"%>
<%@ taglib uri="http://java.sun.com/jsp/jstl/fmt" prefix="fmt"%>
<c:set var="ctx" value="${pageContext.request.contextPath}" />
<c:set var="ctxpath"
value="${pageContext.request.scheme}${'://'}${pageContext.request.serverName}${':'}${pageContext.request.serverPort}${pageContext.request.contextPath}" />
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta charset=UTF-8">
<title>登录测试</title>
</head>
<body>
<h2>Hello World!</h2>
<div>
<span>sessionId:</span>
<%
HttpSession s= request.getSession();
out.println(s.getId());
%>
</div>
<input id="sessionId" type="hidden" value="<%=session.getId() %>" />
<input id="text" type="text" />
<button onclick="send()">发送消息</button>
<hr />
<button onclick="closeWebSocket()">关闭WebSocket连接</button>
<hr />
<div id="message"></div>
</body>
<script type="text/javascript" src="http://localhost:8088/static/js/sockjs-0.3.min.js"></script>
<script type="text/javascript">
//初始化websocket连接
var websocket = null;
if('WebSocket' in window) {
websocket = new WebSocket("ws://localhost:8088/websocket/webSocketByTomcat/"+document.getElementById('sessionId').value);
} else if('MozWebSocket' in window) {
websocket = new MozWebSocket("ws://localhost:8088/websocket/webSocketByTomcat/"+document.getElementById('sessionId').value);
} else {
websocket = new SockJS("localhost:8088/websocket/webSocketByTomcat/"+document.getElementById('sessionId').value);
}
// 获取连接状态
console.log('ws连接状态:' + ws.readyState);
//连接发生错误的
websocket.onerror = function () {
setMessageInnerHTML("WebSocket连接发生错误");
};
//连接成功
websocket.onopen = function () {
setMessageInnerHTML("WebSocket连接成功");
}
//接收到服务端消息
websocket.onmessage = function (event) {
setMessageInnerHTML(event.data);
}
//连接关闭
websocket.onclose = function () {
setMessageInnerHTML("WebSocket连接关闭");
}
//监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
window.onbeforeunload = function () {
closeWebSocket();
}
//将消息显示在网页上
function setMessageInnerHTML(innerHTML) {
document.getElementById('message').innerHTML += innerHTML + '<br/>';
}
//关闭WebSocket连接
function closeWebSocket() {
websocket.close();
}
//发送消息
function send() {
var message = document.getElementById('text').value;
websocket.send(message);
}
</script>
</html>
websocket.send(“发送消息”)
,会触发服务端的onMessage()方法
- 连接建立成功时调用send(),可以在服务器端
onOpen()方法
,接收到消息。 关闭websocket时
,触发服务器端onclose()方法
,此时也可以发送消息,但是不能发送给自己,因为自己的已经关闭了连接,但是可以发送给其他人
。
SpringBoot整合websocket
引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
具体实现
- 继承TextWebSocketHandler类实现WebSocketHandler 接口进行消息处理,如是发给一个人,还是发给所有人,以及
前端连接时触发
的一些事件
服务端
/**
* WebSocket server
*/
@Service
@Slf4j
public class CustomWebSocketHandler extends TextWebSocketHandler implements WebSocketHandler {
// 在线用户列表
private static final Map<String, WebSocketSession> clients = new HashMap<>();
// 用户标识
private static final String CLIENT_ID = "mchNo";
/**
* 连接成功时候,onopen方法()
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
log.info("成功建立websocket-spring连接");
String clientId = getClientId(session);
if (StringUtils.isNotEmpty(clientId)) {
//存储会话
clients.put(clientId, session);
session.sendMessage(new TextMessage("成功建立websocket-spring连接"));
log.info("用户标识:{},Session:{}", clientId, session.toString());
}
}
/**
* 调用websocket.send()时候,会调用该方法
*/
@Override
public void handleTextMessage(WebSocketSession session, TextMessage message) {
log.info("收到客户端消息:{}", message.getPayload());
JSONObject msgJson = JSONObject.parseObject(message.getPayload());
//接受标识
String to = msgJson.getString("to");
//接受消息
String msg = msgJson.getString("msg");
WebSocketMessage<?> webSocketMessageServer = new TextMessage("server:" + message);
try {
session.sendMessage(webSocketMessageServer);
//广播到所有在线用户
if ("all".equals(to.toLowerCase())) {
sendMessageToAllUsers(new TextMessage(getClientId(session) + ":" + msg));
}
//单独发送
else {
sendMessageToUser(to, new TextMessage(getClientId(session) + ":" + msg));
}
} catch (IOException e) {
log.info("handleTextMessage method error:{}", e);
}
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
if (session.isOpen()) {
session.close();
}
log.info("连接出错");
clients.remove(getClientId(session));
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
log.info("连接已关闭:" + status);
clients.remove(getClientId(session));
}
@Override
public boolean supportsPartialMessages() {
return false;
}
public void sendMessage(String jsonData) {
log.info("收到客户端消息sendMessage:{}", jsonData);
JSONObject msgJson = JSONObject.parseObject(jsonData);
String clientId = StringUtils.isEmpty(msgJson.getString(CLIENT_ID)) ? "陌生人" : msgJson.getString(CLIENT_ID);
String to = msgJson.getString("to");
String msg = msgJson.getString("msg");
if ("all".equals(to.toLowerCase())) {
sendMessageToAllUsers(new TextMessage(clientId + ":" + msg));
}
else {
sendMessageToUser(to, new TextMessage(clientId + ":" + msg));
}
}
/**
* 发送信息给指定用户
*/
public boolean sendMessageToUser(String clientId, TextMessage message) {
if (clients.get(clientId) == null) {
return false;
}
WebSocketSession session = clients.get(clientId);
log.info("sendMessage:{} ,msg:{}", session, message.getPayload());
if (!session.isOpen()) {
log.info("客户端:{},已断开连接,发送消息失败", clientId);
return false;
}
try {
session.sendMessage(message);
} catch (IOException e) {
log.info("sendMessageToUser method error:{}", e);
return false;
}
return true;
}
/**
* 广播信息-给所有在线用户发送消息
*/
public boolean sendMessageToAllUsers(TextMessage message) {
boolean allSendSuccess = true;
Set<String> clientSet = clients.keySet();
WebSocketSession session = null;
for (String clientId : clientSet) {
try {
session = clients.get(clientId);
if (session.isOpen()) {
session.sendMessage(message);
}
else {
log.info("客户端:{},已断开连接,发送消息失败", clientId);
}
} catch (IOException e) {
log.info("sendMessageToAllUsers method error:{}", e);
allSendSuccess = false;
}
}
return allSendSuccess;
}
/**
* 获取用户标识
*/
private String getClientId(WebSocketSession session) {
try {
return session.getAttributes().get(CLIENT_ID).toString();
} catch (Exception e) {
return null;
}
}
}
- 如果把websocketSession和httpsession对应起来就能
根据当前不同的session
,定向对websocketSession
进行数据返回- spring中有一个拦截器接口,HandshakeInterceptor,通过实现该接口来拦截握手过程,向其中添加属性
/**
* WebSocket握手时的拦截器
*/
@Slf4j
public class CustomWebSocketInterceptor implements HandshakeInterceptor {
/**
* 关联HeepSession和WebSocketSession,
* beforeHandShake方法中的Map参数 就是对应websocketSession里的属性
*/
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler handler, Map<String, Object> map) throws Exception {
if (request instanceof ServletServerHttpRequest) {
log.info("*****beforeHandshake******");
HttpServletRequest httpServletRequest = ((ServletServerHttpRequest) request).getServletRequest();
HttpSession session = httpServletRequest.getSession(true);
log.info("clientId:{}", httpServletRequest.getParameter("clientId"));
if (session != null) {
map.put("sessionId",session.getId());
map.put("clientId", httpServletRequest.getParameter("clientId"));
}
}
return true;
}
@Override
public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {
log.info("******afterHandshake******");
}
}
配置类注入handler
/**
* websocket的配置类
*/
@Configuration
@EnableWebSocket
public class CustomWebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(customWebSocketHandler(), "/webSocketBySpring/customWebSocketHandler")
.addInterceptors(new CustomWebSocketInterceptor())
.setAllowedOrigins("*");
registry.addHandler(customWebSocketHandler(), "/sockjs/webSocketBySpring/customWebSocketHandler")
.addInterceptors(new CustomWebSocketInterceptor())
.setAllowedOrigins("*")
.withSockJS();
}
@Bean
public WebSocketHandler customWebSocketHandler() {
return new CustomWebSocketHandler();
}
}
setAllowedOrigins("*")
一定要加上,不然只有访问localhost,其他的不予许访问- 经查阅官方文档spring-websocket 4.1.5版本前默认支持跨域访问,之后的版本默认不支持跨域,需要设置
使用withSockJS()的原因:
- 一些浏览器中缺少对
WebSocket
的支持,因此,回退选项是必要的,而Spring框架提供了基于SockJS
协议的透明的回退选项。SockJS
的一大好处在于提供了浏览器兼容性
。·优先使用原生WebSocket,如果在不支持websocket的浏览器中,会自动降为轮询
的方式·。
如果代码中添加了withSockJS()
如下,服务器也会自动降级为轮询
。
registry.addEndpoint("/coordination").withSockJS();
前端
- 同上《Tomcat实现websocket》jsp代码,替换webSocket的请求路径即可
最后
以上就是着急猫咪为你收集整理的【JavaWeb】小白也能看懂的服务器推送技术(WebSocket和SSE)一.什么是消息推送二.服务端推送常用方式的全部内容,希望文章能够帮你解决【JavaWeb】小白也能看懂的服务器推送技术(WebSocket和SSE)一.什么是消息推送二.服务端推送常用方式所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复