feat(printer): 实现打印机任务WebSocket实时推送功能

- 新增PrinterTaskPushService接口及实现,负责任务推送逻辑
- 在PrinterServiceImpl中集成WebSocket推送,在任务创建和审核通过时主动推送
- 新增WebSocket配置类和处理器,支持打印机通过WebSocket连接接收任务
- 实现连接管理器,维护打印机在线状态并支持心跳保活
- 添加相关模型类如WsMessage、WsMessageType等,规范通信协议
- 在PrinterMapper中增加查询待处理任务列表的方法
- 完善异常处理和日志记录,确保推送可靠性
This commit is contained in:
2025-12-01 09:59:21 +08:00
parent 1de760fc87
commit d590286b13
13 changed files with 710 additions and 1 deletions

View File

@@ -0,0 +1,299 @@
package com.ycwl.basic.websocket.handler;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ycwl.basic.mapper.PrinterMapper;
import com.ycwl.basic.model.pc.printer.entity.PrinterEntity;
import com.ycwl.basic.model.printer.resp.PrintTaskResp;
import com.ycwl.basic.service.printer.PrinterTaskPushService;
import com.ycwl.basic.websocket.manager.PrinterConnectionManager;
import com.ycwl.basic.websocket.model.ErrorData;
import com.ycwl.basic.websocket.model.WsMessage;
import com.ycwl.basic.websocket.model.WsMessageType;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.PongMessage;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.io.IOException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 打印机 WebSocket 处理器
*/
@Slf4j
@Component
public class PrinterWebSocketHandler extends TextWebSocketHandler {
@Autowired
private PrinterMapper printerMapper;
@Autowired
private PrinterConnectionManager connectionManager;
@Autowired
private PrinterTaskPushService taskPushService;
@Autowired
private ObjectMapper objectMapper;
/**
* sessionId → printerId 映射(用于连接关闭时清理)
*/
private final Map<String, Integer> sessionToPrinter = new ConcurrentHashMap<>();
@Override
public void afterConnectionEstablished(WebSocketSession session) {
try {
// 1. 从 URL 参数获取 accessKey
String query = session.getUri().getQuery();
Map<String, String> params = parseQueryString(query);
String accessKey = params.get("accessKey");
if (accessKey == null || accessKey.trim().isEmpty()) {
log.warn("缺少 accessKey 参数: sessionId={}", session.getId());
session.close(CloseStatus.POLICY_VIOLATION);
return;
}
// 2. 验证 accessKey
PrinterEntity printer = printerMapper.findByAccessKey(accessKey);
if (printer == null) {
log.warn("打印机不存在: accessKey={}, sessionId={}", accessKey, session.getId());
session.close(CloseStatus.NOT_ACCEPTABLE);
return;
}
if (printer.getStatus() != 1) {
log.warn("打印机已停用: printerId={}, sessionId={}", printer.getId(), session.getId());
session.close(CloseStatus.NOT_ACCEPTABLE);
return;
}
// 3. 认证成功,添加连接
Integer printerId = printer.getId();
connectionManager.addConnection(printerId, session);
sessionToPrinter.put(session.getId(), printerId);
log.info("打印机连接成功: printerId={}, sessionId={}, printerName={}",
printerId, session.getId(), printer.getName());
// 4. 推送所有待处理任务
pushPendingTasks(printerId);
} catch (Exception e) {
log.error("连接认证失败: sessionId={}", session.getId(), e);
closeSession(session);
}
}
/**
* 推送待处理任务
*
* @param printerId 打印机ID
*/
private void pushPendingTasks(Integer printerId) {
try {
List<PrintTaskResp> tasks = taskPushService.getPendingTasksByPrinterId(printerId);
for (PrintTaskResp task : tasks) {
taskPushService.pushTaskToPrinter(printerId, task.getId());
}
if (!tasks.isEmpty()) {
log.info("已推送待处理任务: printerId={}, count={}", printerId, tasks.size());
}
} catch (Exception e) {
log.error("推送待处理任务失败: printerId={}", printerId, e);
}
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) {
try {
String payload = message.getPayload();
WsMessage<?> wsMessage = objectMapper.readValue(payload, WsMessage.class);
switch (wsMessage.getType()) {
case TASK_ACK:
handleTaskAck(session, wsMessage);
break;
case TASK_SUCCESS:
handleTaskSuccess(session, wsMessage);
break;
case TASK_FAIL:
handleTaskFail(session, wsMessage);
break;
default:
log.warn("未知消息类型: type={}, sessionId={}", wsMessage.getType(), session.getId());
}
} catch (Exception e) {
log.error("处理 WebSocket 消息失败: sessionId={}", session.getId(), e);
sendError(session, 500, "消息处理失败: " + e.getMessage());
}
}
/**
* 处理 pong 消息(WebSocket 原生保活机制)
* 客户端发送 ping 帧后,服务器自动响应 pong 帧,并触发此方法
*
* @param session WebSocket会话
* @param message Pong消息
*/
@Override
protected void handlePongMessage(WebSocketSession session, PongMessage message) {
Integer printerId = sessionToPrinter.get(session.getId());
if (printerId != null) {
connectionManager.setOnline(printerId); // 刷新 Redis 在线状态
log.debug("收到 pong: printerId={}, sessionId={}", printerId, session.getId());
}
}
/**
* 处理任务确认
*
* @param session WebSocket会话
* @param wsMessage 消息
*/
private void handleTaskAck(WebSocketSession session, WsMessage<?> wsMessage) {
Integer printerId = sessionToPrinter.get(session.getId());
if (printerId == null) {
return;
}
Map<String, Object> dataMap = (Map<String, Object>) wsMessage.getData();
Integer taskId = (Integer) dataMap.get("taskId");
log.info("收到任务确认: printerId={}, taskId={}, sessionId={}", printerId, taskId, session.getId());
}
/**
* 处理任务成功
*
* @param session WebSocket会话
* @param wsMessage 消息
*/
private void handleTaskSuccess(WebSocketSession session, WsMessage<?> wsMessage) {
Integer printerId = sessionToPrinter.get(session.getId());
if (printerId == null) {
log.warn("未认证的会话发送任务成功消息: sessionId={}", session.getId());
return;
}
Map<String, Object> dataMap = (Map<String, Object>) wsMessage.getData();
Integer taskId = (Integer) dataMap.get("taskId");
log.info("任务完成成功: printerId={}, taskId={}", printerId, taskId);
printerMapper.updateTaskStatus(taskId, 1); // 状态改为成功
}
/**
* 处理任务失败
*
* @param session WebSocket会话
* @param wsMessage 消息
*/
private void handleTaskFail(WebSocketSession session, WsMessage<?> wsMessage) {
Integer printerId = sessionToPrinter.get(session.getId());
if (printerId == null) {
log.warn("未认证的会话发送任务失败消息: sessionId={}", session.getId());
return;
}
Map<String, Object> dataMap = (Map<String, Object>) wsMessage.getData();
Integer taskId = (Integer) dataMap.get("taskId");
String reason = (String) dataMap.get("reason");
log.warn("任务完成失败: printerId={}, taskId={}, reason={}", printerId, taskId, reason);
printerMapper.updateTaskStatus(taskId, 2); // 状态改为失败
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
Integer printerId = sessionToPrinter.remove(session.getId());
if (printerId != null) {
connectionManager.removeConnection(printerId);
log.info("WebSocket 连接关闭: printerId={}, sessionId={}, status={}",
printerId, session.getId(), status);
}
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) {
log.error("WebSocket 传输错误: sessionId={}", session.getId(), exception);
Integer printerId = sessionToPrinter.remove(session.getId());
if (printerId != null) {
connectionManager.removeConnection(printerId);
}
}
// ========== 辅助方法 ==========
/**
* 解析查询字符串
*
* @param query 查询字符串
* @return 参数映射
*/
private Map<String, String> parseQueryString(String query) {
Map<String, String> params = new HashMap<>();
if (query != null && !query.isEmpty()) {
for (String param : query.split("&")) {
String[] pair = param.split("=", 2);
if (pair.length == 2) {
params.put(
URLDecoder.decode(pair[0], StandardCharsets.UTF_8),
URLDecoder.decode(pair[1], StandardCharsets.UTF_8)
);
}
}
}
return params;
}
/**
* 发送错误消息
*
* @param session WebSocket会话
* @param code 错误代码
* @param message 错误消息
*/
private void sendError(WebSocketSession session, int code, String message) {
sendMessage(session, WsMessage.create(WsMessageType.ERROR, new ErrorData(code, message)));
}
/**
* 发送消息
*
* @param session WebSocket会话
* @param message 消息
*/
private void sendMessage(WebSocketSession session, WsMessage<?> message) {
try {
if (session.isOpen()) {
String json = objectMapper.writeValueAsString(message);
session.sendMessage(new TextMessage(json));
}
} catch (IOException e) {
log.error("发送消息失败: sessionId={}", session.getId(), e);
}
}
/**
* 关闭会话
*
* @param session WebSocket会话
*/
private void closeSession(WebSocketSession session) {
try {
session.close();
} catch (IOException e) {
log.error("关闭会话失败: sessionId={}", session.getId(), e);
}
}
}