From d590286b13fb2ee61ff4d63c8dfee456ce029710 Mon Sep 17 00:00:00 2001 From: Jerry Yan <792602257@qq.com> Date: Mon, 1 Dec 2025 09:59:21 +0800 Subject: [PATCH] =?UTF-8?q?feat(printer):=20=E5=AE=9E=E7=8E=B0=E6=89=93?= =?UTF-8?q?=E5=8D=B0=E6=9C=BA=E4=BB=BB=E5=8A=A1WebSocket=E5=AE=9E=E6=97=B6?= =?UTF-8?q?=E6=8E=A8=E9=80=81=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增PrinterTaskPushService接口及实现,负责任务推送逻辑 - 在PrinterServiceImpl中集成WebSocket推送,在任务创建和审核通过时主动推送 - 新增WebSocket配置类和处理器,支持打印机通过WebSocket连接接收任务 - 实现连接管理器,维护打印机在线状态并支持心跳保活 - 添加相关模型类如WsMessage、WsMessageType等,规范通信协议 - 在PrinterMapper中增加查询待处理任务列表的方法 - 完善异常处理和日志记录,确保推送可靠性 --- pom.xml | 6 + .../com/ycwl/basic/mapper/PrinterMapper.java | 2 + .../printer/PrinterTaskPushService.java | 28 ++ .../printer/impl/PrinterServiceImpl.java | 30 +- .../impl/PrinterTaskPushServiceImpl.java | 110 +++++++ .../websocket/config/WebSocketConfig.java | 25 ++ .../handler/PrinterWebSocketHandler.java | 299 ++++++++++++++++++ .../manager/PrinterConnectionManager.java | 109 +++++++ .../ycwl/basic/websocket/model/ErrorData.java | 23 ++ .../basic/websocket/model/TaskAckData.java | 14 + .../ycwl/basic/websocket/model/WsMessage.java | 42 +++ .../basic/websocket/model/WsMessageType.java | 17 + src/main/resources/mapper/PrinterMapper.xml | 6 + 13 files changed, 710 insertions(+), 1 deletion(-) create mode 100644 src/main/java/com/ycwl/basic/service/printer/PrinterTaskPushService.java create mode 100644 src/main/java/com/ycwl/basic/service/printer/impl/PrinterTaskPushServiceImpl.java create mode 100644 src/main/java/com/ycwl/basic/websocket/config/WebSocketConfig.java create mode 100644 src/main/java/com/ycwl/basic/websocket/handler/PrinterWebSocketHandler.java create mode 100644 src/main/java/com/ycwl/basic/websocket/manager/PrinterConnectionManager.java create mode 100644 src/main/java/com/ycwl/basic/websocket/model/ErrorData.java create mode 100644 src/main/java/com/ycwl/basic/websocket/model/TaskAckData.java create mode 100644 src/main/java/com/ycwl/basic/websocket/model/WsMessage.java create mode 100644 src/main/java/com/ycwl/basic/websocket/model/WsMessageType.java diff --git a/pom.xml b/pom.xml index 232c3271..53af170a 100644 --- a/pom.xml +++ b/pom.xml @@ -75,6 +75,12 @@ spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-websocket + + com.alibaba.cloud diff --git a/src/main/java/com/ycwl/basic/mapper/PrinterMapper.java b/src/main/java/com/ycwl/basic/mapper/PrinterMapper.java index 996c4684..759fe5ee 100644 --- a/src/main/java/com/ycwl/basic/mapper/PrinterMapper.java +++ b/src/main/java/com/ycwl/basic/mapper/PrinterMapper.java @@ -26,6 +26,8 @@ public interface PrinterMapper { PrintTaskResp findTaskByPrinterId(Integer printerId); + List listPendingTasksByPrinterId(Integer printerId); + int updateTaskStatus(@Param("id") Integer id, @Param("status") Integer status); int compareAndSetTaskStatus(@Param("id") Integer id, diff --git a/src/main/java/com/ycwl/basic/service/printer/PrinterTaskPushService.java b/src/main/java/com/ycwl/basic/service/printer/PrinterTaskPushService.java new file mode 100644 index 00000000..bf81370d --- /dev/null +++ b/src/main/java/com/ycwl/basic/service/printer/PrinterTaskPushService.java @@ -0,0 +1,28 @@ +package com.ycwl.basic.service.printer; + +import com.ycwl.basic.model.printer.resp.PrintTaskResp; + +import java.util.List; + +/** + * 打印机任务推送服务 + */ +public interface PrinterTaskPushService { + + /** + * 推送任务到打印机 + * + * @param printerId 打印机ID + * @param taskId 任务ID + * @return 是否推送成功 + */ + boolean pushTaskToPrinter(Integer printerId, Integer taskId); + + /** + * 获取打印机的待处理任务列表 + * + * @param printerId 打印机ID + * @return 待处理任务列表 + */ + List getPendingTasksByPrinterId(Integer printerId); +} diff --git a/src/main/java/com/ycwl/basic/service/printer/impl/PrinterServiceImpl.java b/src/main/java/com/ycwl/basic/service/printer/impl/PrinterServiceImpl.java index 10cd4d3b..f8cb49ca 100644 --- a/src/main/java/com/ycwl/basic/service/printer/impl/PrinterServiceImpl.java +++ b/src/main/java/com/ycwl/basic/service/printer/impl/PrinterServiceImpl.java @@ -67,6 +67,7 @@ import com.ycwl.basic.repository.ScenicRepository; import com.ycwl.basic.service.mobile.WxPayService; import com.ycwl.basic.service.pc.FaceService; import com.ycwl.basic.service.printer.PrinterService; +import com.ycwl.basic.service.printer.PrinterTaskPushService; import com.ycwl.basic.storage.StorageFactory; import com.ycwl.basic.storage.adapters.IStorageAdapter; import com.ycwl.basic.utils.ApiResponse; @@ -140,6 +141,8 @@ public class PrinterServiceImpl implements PrinterService { private FaceService faceService; @Autowired private DeviceRepository deviceRepository; + @Autowired + private PrinterTaskPushService taskPushService; // 用于优先打印的线程池,核心线程数根据实际情况调整 private final ExecutorService preferPrintExecutor = Executors.newFixedThreadPool( @@ -961,6 +964,17 @@ public class PrinterServiceImpl implements PrinterService { task.setCreateTime(new Date()); task.setUpdateTime(new Date()); printTaskMapper.insertTask(task); + + // ========== WebSocket 推送任务 ========== + // 只推送立即可处理的任务(status=0),待审核任务(status=4)等审核通过后再推送 + if (initialStatus == TASK_STATUS_PENDING) { + try { + taskPushService.pushTaskToPrinter(printer.getId(), task.getId()); + } catch (Exception e) { + log.error("推送任务失败: printerId={}, taskId={}", printer.getId(), task.getId(), e); + // 推送失败不影响任务创建,任务会通过 HTTP 轮询获取 + } + } } }); } @@ -1001,7 +1015,21 @@ public class PrinterServiceImpl implements PrinterService { return 0; } // 将状态从4(待审核)改为0(待处理) - return printTaskMapper.batchUpdateStatus(taskIds, TASK_STATUS_PENDING); + int count = printTaskMapper.batchUpdateStatus(taskIds, TASK_STATUS_PENDING); + + // ========== WebSocket 推送审核通过的任务 ========== + for (Integer taskId : taskIds) { + try { + PrintTaskEntity task = printTaskMapper.selectById(taskId); + if (task != null && task.getPrinterId() != null) { + taskPushService.pushTaskToPrinter(task.getPrinterId(), taskId); + } + } catch (Exception e) { + log.error("推送审核任务失败: taskId={}", taskId, e); + } + } + + return count; } /** diff --git a/src/main/java/com/ycwl/basic/service/printer/impl/PrinterTaskPushServiceImpl.java b/src/main/java/com/ycwl/basic/service/printer/impl/PrinterTaskPushServiceImpl.java new file mode 100644 index 00000000..89509266 --- /dev/null +++ b/src/main/java/com/ycwl/basic/service/printer/impl/PrinterTaskPushServiceImpl.java @@ -0,0 +1,110 @@ +package com.ycwl.basic.service.printer.impl; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.ycwl.basic.mapper.PrinterMapper; +import com.ycwl.basic.model.pc.printer.entity.PrintTaskEntity; +import com.ycwl.basic.model.printer.resp.PrintTaskResp; +import com.ycwl.basic.service.printer.PrinterTaskPushService; +import com.ycwl.basic.websocket.manager.PrinterConnectionManager; +import com.ycwl.basic.websocket.model.WsMessage; +import com.ycwl.basic.websocket.model.WsMessageType; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.BeanUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketSession; + +import java.util.List; + +/** + * 打印机任务推送服务实现 + */ +@Slf4j +@Service +public class PrinterTaskPushServiceImpl implements PrinterTaskPushService { + + private static final int TASK_STATUS_PENDING = 0; + private static final int TASK_STATUS_PROCESSING = 3; + + @Autowired + private PrinterMapper printerMapper; + + @Autowired + private PrinterConnectionManager connectionManager; + + @Autowired + private ObjectMapper objectMapper; + + @Override + public boolean pushTaskToPrinter(Integer printerId, Integer taskId) { + try { + // 1. 检查打印机是否在线 + if (!connectionManager.isOnline(printerId)) { + log.debug("打印机不在线,跳过推送: printerId={}, taskId={}", printerId, taskId); + return false; + } + + // 2. 获取 WebSocket 连接 + WebSocketSession session = connectionManager.getSession(printerId); + if (session == null || !session.isOpen()) { + log.warn("打印机连接不可用: printerId={}, taskId={}", printerId, taskId); + return false; + } + + // 3. 执行 CAS 操作,确保任务只被推送一次 + int updatedRows = printerMapper.compareAndSetTaskStatus( + taskId, TASK_STATUS_PENDING, TASK_STATUS_PROCESSING + ); + + if (updatedRows != 1) { + log.debug("任务已被获取,跳过推送: printerId={}, taskId={}", printerId, taskId); + return false; + } + + // 4. 查询任务详情 + PrintTaskEntity taskEntity = printerMapper.getTaskById(taskId); + if (taskEntity == null) { + log.error("任务不存在: taskId={}", taskId); + return false; + } + + // 5. 转换为 PrintTaskResp + PrintTaskResp task = new PrintTaskResp(); + BeanUtils.copyProperties(taskEntity, task); + task.setStatus(TASK_STATUS_PROCESSING); + + // 6. 通过 WebSocket 推送任务 + WsMessage message = WsMessage.create(WsMessageType.TASK_PUSH, task); + String json = objectMapper.writeValueAsString(message); + session.sendMessage(new TextMessage(json)); + + log.info("任务推送成功: printerId={}, taskId={}", printerId, taskId); + return true; + + } catch (Exception e) { + log.error("任务推送失败: printerId={}, taskId={}", printerId, taskId, e); + // 推送失败时,恢复任务状态为待处理 + try { + printerMapper.compareAndSetTaskStatus( + taskId, TASK_STATUS_PROCESSING, TASK_STATUS_PENDING + ); + } catch (Exception ex) { + log.error("恢复任务状态失败: taskId={}", taskId, ex); + } + return false; + } + } + + @Override + public List getPendingTasksByPrinterId(Integer printerId) { + // 直接查询该打印机所有待处理任务 (status=0),最多返回 100 个 + List tasks = printerMapper.listPendingTasksByPrinterId(printerId); + + if (tasks.size() >= 100) { + log.warn("待处理任务过多,已达上限: printerId={}, count={}", printerId, tasks.size()); + } + + return tasks; + } +} diff --git a/src/main/java/com/ycwl/basic/websocket/config/WebSocketConfig.java b/src/main/java/com/ycwl/basic/websocket/config/WebSocketConfig.java new file mode 100644 index 00000000..400f98bc --- /dev/null +++ b/src/main/java/com/ycwl/basic/websocket/config/WebSocketConfig.java @@ -0,0 +1,25 @@ +package com.ycwl.basic.websocket.config; + +import com.ycwl.basic.websocket.handler.PrinterWebSocketHandler; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.config.annotation.EnableWebSocket; +import org.springframework.web.socket.config.annotation.WebSocketConfigurer; +import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; + +/** + * WebSocket 配置类 + */ +@Configuration +@EnableWebSocket +public class WebSocketConfig implements WebSocketConfigurer { + + @Autowired + private PrinterWebSocketHandler printerWebSocketHandler; + + @Override + public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { + registry.addHandler(printerWebSocketHandler, "/printer/ws") + .setAllowedOrigins("*"); // 生产环境应限制域名 + } +} diff --git a/src/main/java/com/ycwl/basic/websocket/handler/PrinterWebSocketHandler.java b/src/main/java/com/ycwl/basic/websocket/handler/PrinterWebSocketHandler.java new file mode 100644 index 00000000..12d34adf --- /dev/null +++ b/src/main/java/com/ycwl/basic/websocket/handler/PrinterWebSocketHandler.java @@ -0,0 +1,299 @@ +package com.ycwl.basic.websocket.handler; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.ycwl.basic.mapper.PrinterMapper; +import com.ycwl.basic.model.pc.printer.entity.PrinterEntity; +import com.ycwl.basic.model.printer.resp.PrintTaskResp; +import com.ycwl.basic.service.printer.PrinterTaskPushService; +import com.ycwl.basic.websocket.manager.PrinterConnectionManager; +import com.ycwl.basic.websocket.model.ErrorData; +import com.ycwl.basic.websocket.model.WsMessage; +import com.ycwl.basic.websocket.model.WsMessageType; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.PongMessage; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.handler.TextWebSocketHandler; + +import java.io.IOException; +import java.net.URLDecoder; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 打印机 WebSocket 处理器 + */ +@Slf4j +@Component +public class PrinterWebSocketHandler extends TextWebSocketHandler { + + @Autowired + private PrinterMapper printerMapper; + + @Autowired + private PrinterConnectionManager connectionManager; + + @Autowired + private PrinterTaskPushService taskPushService; + + @Autowired + private ObjectMapper objectMapper; + + /** + * sessionId → printerId 映射(用于连接关闭时清理) + */ + private final Map sessionToPrinter = new ConcurrentHashMap<>(); + + @Override + public void afterConnectionEstablished(WebSocketSession session) { + try { + // 1. 从 URL 参数获取 accessKey + String query = session.getUri().getQuery(); + Map params = parseQueryString(query); + String accessKey = params.get("accessKey"); + + if (accessKey == null || accessKey.trim().isEmpty()) { + log.warn("缺少 accessKey 参数: sessionId={}", session.getId()); + session.close(CloseStatus.POLICY_VIOLATION); + return; + } + + // 2. 验证 accessKey + PrinterEntity printer = printerMapper.findByAccessKey(accessKey); + if (printer == null) { + log.warn("打印机不存在: accessKey={}, sessionId={}", accessKey, session.getId()); + session.close(CloseStatus.NOT_ACCEPTABLE); + return; + } + + if (printer.getStatus() != 1) { + log.warn("打印机已停用: printerId={}, sessionId={}", printer.getId(), session.getId()); + session.close(CloseStatus.NOT_ACCEPTABLE); + return; + } + + // 3. 认证成功,添加连接 + Integer printerId = printer.getId(); + connectionManager.addConnection(printerId, session); + sessionToPrinter.put(session.getId(), printerId); + + log.info("打印机连接成功: printerId={}, sessionId={}, printerName={}", + printerId, session.getId(), printer.getName()); + + // 4. 推送所有待处理任务 + pushPendingTasks(printerId); + + } catch (Exception e) { + log.error("连接认证失败: sessionId={}", session.getId(), e); + closeSession(session); + } + } + + /** + * 推送待处理任务 + * + * @param printerId 打印机ID + */ + private void pushPendingTasks(Integer printerId) { + try { + List tasks = taskPushService.getPendingTasksByPrinterId(printerId); + for (PrintTaskResp task : tasks) { + taskPushService.pushTaskToPrinter(printerId, task.getId()); + } + if (!tasks.isEmpty()) { + log.info("已推送待处理任务: printerId={}, count={}", printerId, tasks.size()); + } + } catch (Exception e) { + log.error("推送待处理任务失败: printerId={}", printerId, e); + } + } + + @Override + protected void handleTextMessage(WebSocketSession session, TextMessage message) { + try { + String payload = message.getPayload(); + WsMessage wsMessage = objectMapper.readValue(payload, WsMessage.class); + + switch (wsMessage.getType()) { + case TASK_ACK: + handleTaskAck(session, wsMessage); + break; + case TASK_SUCCESS: + handleTaskSuccess(session, wsMessage); + break; + case TASK_FAIL: + handleTaskFail(session, wsMessage); + break; + default: + log.warn("未知消息类型: type={}, sessionId={}", wsMessage.getType(), session.getId()); + } + } catch (Exception e) { + log.error("处理 WebSocket 消息失败: sessionId={}", session.getId(), e); + sendError(session, 500, "消息处理失败: " + e.getMessage()); + } + } + + /** + * 处理 pong 消息(WebSocket 原生保活机制) + * 客户端发送 ping 帧后,服务器自动响应 pong 帧,并触发此方法 + * + * @param session WebSocket会话 + * @param message Pong消息 + */ + @Override + protected void handlePongMessage(WebSocketSession session, PongMessage message) { + Integer printerId = sessionToPrinter.get(session.getId()); + if (printerId != null) { + connectionManager.setOnline(printerId); // 刷新 Redis 在线状态 + log.debug("收到 pong: printerId={}, sessionId={}", printerId, session.getId()); + } + } + + /** + * 处理任务确认 + * + * @param session WebSocket会话 + * @param wsMessage 消息 + */ + private void handleTaskAck(WebSocketSession session, WsMessage wsMessage) { + Integer printerId = sessionToPrinter.get(session.getId()); + if (printerId == null) { + return; + } + + Map dataMap = (Map) wsMessage.getData(); + Integer taskId = (Integer) dataMap.get("taskId"); + log.info("收到任务确认: printerId={}, taskId={}, sessionId={}", printerId, taskId, session.getId()); + } + + /** + * 处理任务成功 + * + * @param session WebSocket会话 + * @param wsMessage 消息 + */ + private void handleTaskSuccess(WebSocketSession session, WsMessage wsMessage) { + Integer printerId = sessionToPrinter.get(session.getId()); + if (printerId == null) { + log.warn("未认证的会话发送任务成功消息: sessionId={}", session.getId()); + return; + } + + Map dataMap = (Map) wsMessage.getData(); + Integer taskId = (Integer) dataMap.get("taskId"); + + log.info("任务完成成功: printerId={}, taskId={}", printerId, taskId); + printerMapper.updateTaskStatus(taskId, 1); // 状态改为成功 + } + + /** + * 处理任务失败 + * + * @param session WebSocket会话 + * @param wsMessage 消息 + */ + private void handleTaskFail(WebSocketSession session, WsMessage wsMessage) { + Integer printerId = sessionToPrinter.get(session.getId()); + if (printerId == null) { + log.warn("未认证的会话发送任务失败消息: sessionId={}", session.getId()); + return; + } + + Map dataMap = (Map) wsMessage.getData(); + Integer taskId = (Integer) dataMap.get("taskId"); + String reason = (String) dataMap.get("reason"); + + log.warn("任务完成失败: printerId={}, taskId={}, reason={}", printerId, taskId, reason); + printerMapper.updateTaskStatus(taskId, 2); // 状态改为失败 + } + + @Override + public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { + Integer printerId = sessionToPrinter.remove(session.getId()); + if (printerId != null) { + connectionManager.removeConnection(printerId); + log.info("WebSocket 连接关闭: printerId={}, sessionId={}, status={}", + printerId, session.getId(), status); + } + } + + @Override + public void handleTransportError(WebSocketSession session, Throwable exception) { + log.error("WebSocket 传输错误: sessionId={}", session.getId(), exception); + Integer printerId = sessionToPrinter.remove(session.getId()); + if (printerId != null) { + connectionManager.removeConnection(printerId); + } + } + + // ========== 辅助方法 ========== + + /** + * 解析查询字符串 + * + * @param query 查询字符串 + * @return 参数映射 + */ + private Map parseQueryString(String query) { + Map params = new HashMap<>(); + if (query != null && !query.isEmpty()) { + for (String param : query.split("&")) { + String[] pair = param.split("=", 2); + if (pair.length == 2) { + params.put( + URLDecoder.decode(pair[0], StandardCharsets.UTF_8), + URLDecoder.decode(pair[1], StandardCharsets.UTF_8) + ); + } + } + } + return params; + } + + /** + * 发送错误消息 + * + * @param session WebSocket会话 + * @param code 错误代码 + * @param message 错误消息 + */ + private void sendError(WebSocketSession session, int code, String message) { + sendMessage(session, WsMessage.create(WsMessageType.ERROR, new ErrorData(code, message))); + } + + /** + * 发送消息 + * + * @param session WebSocket会话 + * @param message 消息 + */ + private void sendMessage(WebSocketSession session, WsMessage message) { + try { + if (session.isOpen()) { + String json = objectMapper.writeValueAsString(message); + session.sendMessage(new TextMessage(json)); + } + } catch (IOException e) { + log.error("发送消息失败: sessionId={}", session.getId(), e); + } + } + + /** + * 关闭会话 + * + * @param session WebSocket会话 + */ + private void closeSession(WebSocketSession session) { + try { + session.close(); + } catch (IOException e) { + log.error("关闭会话失败: sessionId={}", session.getId(), e); + } + } +} diff --git a/src/main/java/com/ycwl/basic/websocket/manager/PrinterConnectionManager.java b/src/main/java/com/ycwl/basic/websocket/manager/PrinterConnectionManager.java new file mode 100644 index 00000000..8c4cc7a3 --- /dev/null +++ b/src/main/java/com/ycwl/basic/websocket/manager/PrinterConnectionManager.java @@ -0,0 +1,109 @@ +package com.ycwl.basic.websocket.manager; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.WebSocketSession; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +/** + * 打印机连接管理器 + */ +@Slf4j +@Component +public class PrinterConnectionManager { + + private static final String REDIS_KEY_PREFIX = "printer:online:"; + private static final long ONLINE_EXPIRE_SECONDS = 120; // 2分钟过期 + + /** + * 内存存储: printerId → WebSocketSession + */ + private final ConcurrentHashMap connections = new ConcurrentHashMap<>(); + + @Autowired + private RedisTemplate redisTemplate; + + /** + * 添加连接 + * + * @param printerId 打印机ID + * @param session WebSocket会话 + */ + public void addConnection(Integer printerId, WebSocketSession session) { + connections.put(printerId, session); + setOnline(printerId); + log.info("打印机连接已添加: printerId={}, sessionId={}", printerId, session.getId()); + } + + /** + * 移除连接 + * + * @param printerId 打印机ID + */ + public void removeConnection(Integer printerId) { + WebSocketSession session = connections.remove(printerId); + removeOnline(printerId); + if (session != null) { + log.info("打印机连接已移除: printerId={}, sessionId={}", printerId, session.getId()); + } + } + + /** + * 获取连接 + * + * @param printerId 打印机ID + * @return WebSocket会话 + */ + public WebSocketSession getSession(Integer printerId) { + return connections.get(printerId); + } + + /** + * 检查是否在线 + * + * @param printerId 打印机ID + * @return 是否在线 + */ + public boolean isOnline(Integer printerId) { + // 优先检查内存 + if (connections.containsKey(printerId)) { + return true; + } + // 检查 Redis + String key = REDIS_KEY_PREFIX + printerId; + return Boolean.TRUE.equals(redisTemplate.hasKey(key)); + } + + /** + * 设置在线状态(刷新心跳) + * + * @param printerId 打印机ID + */ + public void setOnline(Integer printerId) { + String key = REDIS_KEY_PREFIX + printerId; + redisTemplate.opsForValue().set(key, "1", ONLINE_EXPIRE_SECONDS, TimeUnit.SECONDS); + } + + /** + * 移除在线状态 + * + * @param printerId 打印机ID + */ + private void removeOnline(Integer printerId) { + String key = REDIS_KEY_PREFIX + printerId; + redisTemplate.delete(key); + } + + /** + * 获取在线打印机数量 + * + * @return 在线数量 + */ + public int getOnlineCount() { + return connections.size(); + } +} diff --git a/src/main/java/com/ycwl/basic/websocket/model/ErrorData.java b/src/main/java/com/ycwl/basic/websocket/model/ErrorData.java new file mode 100644 index 00000000..5eb6b438 --- /dev/null +++ b/src/main/java/com/ycwl/basic/websocket/model/ErrorData.java @@ -0,0 +1,23 @@ +package com.ycwl.basic.websocket.model; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * 错误数据 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class ErrorData { + /** + * 错误代码 + */ + private Integer code; + + /** + * 错误消息 + */ + private String message; +} diff --git a/src/main/java/com/ycwl/basic/websocket/model/TaskAckData.java b/src/main/java/com/ycwl/basic/websocket/model/TaskAckData.java new file mode 100644 index 00000000..5cdcbfaa --- /dev/null +++ b/src/main/java/com/ycwl/basic/websocket/model/TaskAckData.java @@ -0,0 +1,14 @@ +package com.ycwl.basic.websocket.model; + +import lombok.Data; + +/** + * 任务确认数据 + */ +@Data +public class TaskAckData { + /** + * 任务ID + */ + private Integer taskId; +} diff --git a/src/main/java/com/ycwl/basic/websocket/model/WsMessage.java b/src/main/java/com/ycwl/basic/websocket/model/WsMessage.java new file mode 100644 index 00000000..5307d971 --- /dev/null +++ b/src/main/java/com/ycwl/basic/websocket/model/WsMessage.java @@ -0,0 +1,42 @@ +package com.ycwl.basic.websocket.model; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * WebSocket 消息基类 + * + * @param 消息数据类型 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class WsMessage { + /** + * 消息类型 + */ + private WsMessageType type; + + /** + * 消息数据 + */ + private T data; + + /** + * 时间戳 + */ + private Long timestamp; + + /** + * 创建消息 + * + * @param type 消息类型 + * @param data 消息数据 + * @param 数据类型 + * @return 消息对象 + */ + public static WsMessage create(WsMessageType type, T data) { + return new WsMessage<>(type, data, System.currentTimeMillis()); + } +} diff --git a/src/main/java/com/ycwl/basic/websocket/model/WsMessageType.java b/src/main/java/com/ycwl/basic/websocket/model/WsMessageType.java new file mode 100644 index 00000000..87d26469 --- /dev/null +++ b/src/main/java/com/ycwl/basic/websocket/model/WsMessageType.java @@ -0,0 +1,17 @@ +package com.ycwl.basic.websocket.model; + +/** + * WebSocket 消息类型枚举 + * + * 注意:保活机制使用 WebSocket 原生 ping/pong 帧,不再使用应用层心跳消息 + */ +public enum WsMessageType { + // 客户端 → 服务器 + TASK_ACK, // 任务确认 + TASK_SUCCESS, // 任务成功 + TASK_FAIL, // 任务失败 + + // 服务器 → 客户端 + TASK_PUSH, // 推送任务 + ERROR // 错误 +} diff --git a/src/main/resources/mapper/PrinterMapper.xml b/src/main/resources/mapper/PrinterMapper.xml index 350619f5..5c91398b 100644 --- a/src/main/resources/mapper/PrinterMapper.xml +++ b/src/main/resources/mapper/PrinterMapper.xml @@ -29,6 +29,12 @@ +