diff --git a/src/main/java/com/ycwl/basic/pipeline/CLAUDE.md b/src/main/java/com/ycwl/basic/pipeline/CLAUDE.md new file mode 100644 index 00000000..13ed9bc4 --- /dev/null +++ b/src/main/java/com/ycwl/basic/pipeline/CLAUDE.md @@ -0,0 +1,1611 @@ +# Pipeline 通用管线框架 + +## 概述 + +Pipeline 是一个通用的、可扩展的管线框架,实现了责任链模式(Chain of Responsibility Pattern),用于组织和执行一系列处理阶段(Stage)。该框架支持动态Stage添加、多级降级策略、配置驱动的Stage控制,以及完善的错误处理机制。 + +### 核心特性 + +- **责任链模式**: 将复杂流程拆分为独立的Stage,按顺序执行 +- **Builder模式**: 灵活组装管线,支持条件性添加Stage +- **动态Stage添加**: 支持在运行时根据条件动态插入后续Stage +- **降级策略**: 支持多级降级执行,确保管线在异常情况下的稳定性 +- **配置驱动**: 支持通过外部配置控制Stage的启用/禁用 +- **类型安全**: 使用泛型确保类型安全 +- **解耦设计**: Context独立于业务模型,支持多种使用场景 +- **自动清理**: 无论成功或失败都会在finally中调用`context.cleanup()` + +## 包结构 + +``` +com.ycwl.basic.pipeline +├── annotation/ # 注解定义 +│ └── StageConfig # Stage配置注解 +├── core/ # 核心类 +│ ├── AbstractPipelineStage # Stage抽象基类 +│ ├── PipelineContext # 管线上下文接口 +│ ├── Pipeline # 管线执行器 +│ ├── PipelineBuilder # 管线构建器 +│ ├── PipelineStage # Stage接口 +│ └── StageResult # Stage执行结果 +├── enums/ # 枚举定义 +│ └── StageOptionalMode # Stage可选模式枚举 +└── exception/ # 异常类 + └── PipelineException # 管线异常 +``` + +### 业务实现包 + +框架提供了两个完整的业务实现示例: + +**1. 人脸匹配管线** (`com.ycwl.basic.face.pipeline`) +- 支持自动人脸匹配、自定义人脸匹配、仅识别三种场景 +- 20个Stage实现完整的人脸识别和匹配流程 +- 详见 [FaceMatchingPipeline 文档](../face/pipeline/CLAUDE.md) + +**2. 图片处理管线** (`com.ycwl.basic.image.pipeline`) +- 支持打印、拼图、超分辨率增强等场景 +- 包含下载、水印、旋转、质量检测、上传等Stage +- 详见 [ImagePipeline 文档](../image/pipeline/CLAUDE.md) + +## 核心组件 + +### 1. Pipeline - 管线执行器 + +**职责**: 按顺序执行一系列Stage,管理执行流程和异常处理。 + +**关键特性**: +- 顺序执行所有Stage +- 支持动态添加后续Stage +- 循环检测(最大执行100个Stage,防止无限循环) +- 详细的日志输出(带状态图标: ✓成功 ○跳过 △降级 ✗失败) +- 异常安全保证(finally块中cleanup) + +**源码位置**: `Pipeline.java:23-89` + +**执行流程**: +``` +1. context.beforePipeline() +2. for (stage : stages) + 2.1 if (!stage.shouldExecute(context)) continue + 2.2 result = stage.execute(context) + 2.3 if (result.hasNextStages()) 动态插入Stage + 2.4 if (result.isFailed()) 终止执行 +3. context.afterPipeline() +4. finally: context.cleanup() +``` + +**使用示例**: +```java +// 方式1: 使用Builder构建 +Pipeline pipeline = new PipelineBuilder<>("FaceMatching") + .addStage(prepareContextStage) + .addStage(faceRecognitionStage) + .addStage(updateFaceResultStage) + .sortByPriority() // 按优先级排序(可选) + .build(); + +// 方式2: 直接构造 +List> stages = Arrays.asList(stage1, stage2, stage3); +Pipeline pipeline = new Pipeline<>("MyPipeline", stages); + +// 执行管线 +boolean success = pipeline.execute(context); +if (success) { + log.info("管线执行成功"); +} else { + log.error("管线执行失败"); +} +``` + +**重要方法**: +- `execute(C context)`: 执行管线,返回成功/失败 +- `getName()`: 获取管线名称 +- `getStageCount()`: 获取Stage总数 +- `getStageNames()`: 获取所有Stage名称列表 + +**日志输出示例**: +``` +[FaceMatching] Pipeline 开始执行, Stage 数量: 13 +[FaceMatching] ✓ Stage PrepareContext - SUCCESS (耗时: 15ms) +[FaceMatching] ○ Stage FaceRecovery - SKIPPED (耗时: 2ms) +[FaceMatching] △ Stage VideoRecreation - DEGRADED (耗时: 120ms) +[FaceMatching] Pipeline 执行完成, 总Stage数: 13, 实际执行: 11, 耗时: 450ms +``` + +### 2. PipelineContext - 管线上下文接口 + +**职责**: 定义Stage之间传递状态和数据的通用接口,提供生命周期钩子和Stage开关控制。 + +**源码位置**: `PipelineContext.java:7-50` + +**核心方法**: + +| 方法 | 职责 | 调用时机 | +|------|------|---------| +| `beforePipeline()` | Pipeline开始执行前的钩子 | Pipeline.execute()开始时 | +| `afterPipeline()` | Pipeline全部Stage执行完成后的钩子 | Pipeline.execute()成功完成时 | +| `cleanup()` | Pipeline结束时的清理钩子 | Pipeline.execute() finally块中 | +| `isStageEnabled(stageId, defaultEnabled)` | 判断指定Stage是否启用 | AbstractPipelineStage.shouldExecute()中 | +| `isStageEnabled(stageId)` | 判断指定Stage是否启用(默认false) | AbstractPipelineStage.shouldExecute()中 | + +**实现示例**: + +```java +@Getter +@Setter +public class FaceMatchingContext implements PipelineContext { + + // ==================== 核心字段 ==================== + private final Long faceId; + private final boolean isNew; + + // ==================== Stage配置 ==================== + private Map stageEnabledMap = new HashMap<>(); + + // ==================== 实现PipelineContext接口 ==================== + + @Override + public boolean isStageEnabled(String stageId, boolean defaultEnabled) { + return stageEnabledMap.getOrDefault(stageId, defaultEnabled); + } + + @Override + public void beforePipeline() { + log.info("开始人脸匹配流程: faceId={}, isNew={}", faceId, isNew); + } + + @Override + public void afterPipeline() { + log.info("完成人脸匹配流程: faceId={}, 结果={}", faceId, finalResult); + } + + @Override + public void cleanup() { + // 清理临时资源 + tempFileManager.cleanup(); + } + + // ==================== 业务方法 ==================== + + public FaceMatchingContext setStageState(String stageId, boolean enabled) { + stageEnabledMap.put(stageId, enabled); + return this; + } + + public FaceMatchingContext enableStage(String stageId) { + stageEnabledMap.put(stageId, true); + return this; + } + + public FaceMatchingContext disableStage(String stageId) { + stageEnabledMap.put(stageId, false); + return this; + } +} +``` + +**使用建议**: +- 所有业务Context都应实现此接口 +- 通过`stageEnabledMap`集中管理Stage开关配置 +- 在`cleanup()`中释放所有临时资源(文件、连接等) +- 在`beforePipeline()`中进行埋点、日志记录 +- 在`afterPipeline()`中进行结果汇总、通知等 + +### 3. PipelineStage - Stage接口 + +**职责**: 定义Stage的通用行为和执行流程。 + +**源码位置**: `PipelineStage.java` + +**核心方法**: + +| 方法 | 职责 | 返回值 | +|------|------|--------| +| `getName()` | 获取Stage名称 | String | +| `shouldExecute(C context)` | 判断是否应该执行 | boolean | +| `execute(C context)` | 执行Stage逻辑 | StageResult | +| `getPriority()` | 获取优先级(默认0) | int | +| `getStageConfig()` | 获取Stage配置注解 | StageConfig | + +**优先级说明**: +- 数值越小优先级越高 +- 默认优先级为0 +- 可通过`PipelineBuilder.sortByPriority()`对Stage排序 +- 常见优先级设置: + - `-100`: 极高优先级(如PrepareContext) + - `0`: 正常优先级(大多数Stage) + - `999`: 清理Stage(如Cleanup) + +### 4. AbstractPipelineStage - Stage抽象基类 + +**职责**: 提供Stage的通用实现和模板方法,封装可选性判断和钩子逻辑。 + +**源码位置**: `AbstractPipelineStage.java:10-65` + +**执行流程**: +``` +shouldExecute(context) + ↓ true +beforeExecute(context) + ↓ +doExecute(context) + ↓ +afterExecute(context, result) + ↓ +return result +``` + +**关键方法**: + +| 方法 | 访问修饰符 | 子类需要实现 | 职责 | +|------|-----------|------------|------| +| `getName()` | abstract | ✅ | 返回Stage名称 | +| `doExecute(C context)` | protected abstract | ✅ | 实现具体处理逻辑 | +| `shouldExecuteByBusinessLogic(C context)` | protected | ❌ (默认true) | 实现业务逻辑判断 | +| `beforeExecute(C context)` | protected | ❌ | 执行前钩子 | +| `afterExecute(C context, StageResult result)` | protected | ❌ | 执行后钩子 | +| `shouldExecute(C context)` | public final | ❌ | Stage执行判断(封装) | +| `execute(C context)` | public final | ❌ | Stage执行入口(封装) | + +**shouldExecute()执行判断逻辑**: + +```java +@Override +public final boolean shouldExecute(C context) { + StageConfig config = getStageConfig(); + if (config != null) { + String stageId = config.stageId(); + StageOptionalMode mode = config.optionalMode(); + + // 1. FORCE_ON: 强制执行,只检查业务逻辑 + if (mode == StageOptionalMode.FORCE_ON) { + return shouldExecuteByBusinessLogic(context); + } + + // 2. SUPPORT: 检查外部配置 + if (mode == StageOptionalMode.SUPPORT) { + boolean externalEnabled = context.isStageEnabled(stageId, config.defaultEnabled()); + if (!externalEnabled) { + log.debug("[{}] Stage被外部配置禁用", stageId); + return false; + } + } + + // 3. UNSUPPORT: 不检查外部配置,直接检查业务逻辑 + } + + // 最终检查业务逻辑 + return shouldExecuteByBusinessLogic(context); +} +``` + +**实现示例**: + +```java +@Slf4j +@Component +@StageConfig( + stageId = "prepare_context", + optionalMode = StageOptionalMode.FORCE_ON, + description = "准备人脸匹配上下文数据" +) +public class PrepareContextStage extends AbstractPipelineStage { + + @Autowired + private FaceRepository faceRepository; + + @Override + public String getName() { + return "PrepareContext"; + } + + @Override + protected boolean shouldExecuteByBusinessLogic(FaceMatchingContext context) { + // 可选:实现业务逻辑判断 + return true; + } + + @Override + protected void beforeExecute(FaceMatchingContext context) { + // 可选:执行前钩子 + log.debug("准备加载人脸数据: faceId={}", context.getFaceId()); + } + + @Override + protected StageResult doExecute(FaceMatchingContext context) { + Long faceId = context.getFaceId(); + + // 1. 加载人脸实体 + FaceEntity face = faceRepository.getFace(faceId); + if (face == null) { + return StageResult.failed("人脸不存在,faceId: " + faceId); + } + + // 2. 更新Context + context.setFace(face); + + // 3. 返回成功结果 + return StageResult.success("上下文准备完成"); + } + + @Override + protected void afterExecute(FaceMatchingContext context, StageResult result) { + // 可选:执行后钩子 + if (result.isSuccess()) { + log.info("上下文准备成功: memberId={}", context.getFace().getMemberId()); + } + } +} +``` + +**防御性编程模式** ⚠️: + +对于在`shouldExecuteByBusinessLogic()`中有条件判断的Stage,**必须在`doExecute()`开头添加相同的防御性检查**,以防止单元测试直接调用`execute()`时跳过条件检查: + +```java +@Override +protected boolean shouldExecuteByBusinessLogic(FaceMatchingContext context) { + // 业务逻辑判断 + if (context.getSearchResult() == null) { + return false; + } + return true; +} + +@Override +protected StageResult doExecute(FaceMatchingContext context) { + // ⚠️ 防御性检查:必须添加! + // 防止单元测试直接调用execute()时跳过shouldExecute()的检查 + if (context.getSearchResult() == null) { + log.debug("searchResult为空,跳过处理"); + return StageResult.skipped("searchResult为空"); + } + + // 正常处理逻辑 + // ... +} +``` + +**原因**: Pipeline执行时会先调用`shouldExecute()`检查,但单元测试可能直接调用`stage.execute(context)`,导致跳过`shouldExecuteByBusinessLogic()`的检查,直接进入`doExecute()`。 + +### 5. StageResult - Stage执行结果 + +**职责**: 封装Stage执行结果,包括状态、消息、异常和后续Stage。 + +**源码位置**: `StageResult.java:13-93` + +**状态类型**: + +| 状态 | 枚举值 | 含义 | Pipeline行为 | 使用场景 | +|------|--------|------|-------------|---------| +| SUCCESS | `Status.SUCCESS` | 执行成功 | 继续执行 | 正常完成处理 | +| SKIPPED | `Status.SKIPPED` | 跳过执行 | 继续执行 | 条件不满足,跳过当前Stage | +| DEGRADED | `Status.DEGRADED` | 降级执行 | 继续执行(记录警告) | 使用备用方案或部分失败 | +| FAILED | `Status.FAILED` | 执行失败 | **立即终止Pipeline** | 关键错误,无法继续 | + +**核心字段**: + +| 字段 | 类型 | 说明 | +|------|------|------| +| `status` | `Status` | 执行状态 | +| `message` | `String` | 结果消息(可选) | +| `exception` | `Throwable` | 异常信息(可选) | +| `nextStages` | `List>` | 后续Stage列表(动态添加) | + +**静态工厂方法**: + +```java +// ==================== SUCCESS ==================== + +// 简单成功 +StageResult.success(); + +// 成功并附带消息 +StageResult.success("处理完成"); + +// 成功并动态添加单个后续Stage +StageResult.successWithNext("质量不佳,添加增强", new ImageEnhanceStage()); + +// 成功并动态添加多个后续Stage(可变参数) +StageResult.successWithNext("检测到异常,添加修复Stage", + repairStage1, repairStage2); + +// 成功并动态添加多个后续Stage(List) +List> stages = Arrays.asList(stage1, stage2, stage3); +StageResult.successWithNext("批量添加Stage", stages); + +// ==================== SKIPPED ==================== + +// 简单跳过 +StageResult.skipped(); // 默认消息: "条件不满足,跳过执行" + +// 跳过并附带原因 +StageResult.skipped("memberSourceList为空"); +StageResult.skipped("非自定义匹配场景"); + +// ==================== FAILED ==================== + +// 失败并附带消息 +StageResult.failed("人脸不存在"); + +// 失败并附带消息和异常 +StageResult.failed("处理失败", exception); + +// ==================== DEGRADED ==================== + +// 降级并附带原因 +StageResult.degraded("使用备用水印方案"); +StageResult.degraded("视频重切处理失败"); +``` + +**判断方法**: + +```java +StageResult result = stage.execute(context); + +// 状态判断 +if (result.isSuccess()) { } +if (result.isSkipped()) { } +if (result.isFailed()) { } +if (result.isDegraded()) { } + +// 是否可以继续执行(SUCCESS、SKIPPED、DEGRADED都返回true) +if (result.canContinue()) { } + +// 获取字段 +String message = result.getMessage(); +Throwable exception = result.getException(); +List> nextStages = result.getNextStages(); +``` + +**使用示例 - 动态添加Stage**: + +```java +@Override +protected StageResult doExecute(PhotoProcessContext context) { + File currentFile = context.getCurrentFile(); + + // 检测图片质量 + double quality = detectQuality(currentFile); + + // 如果质量低于阈值,动态添加增强Stage + if (quality < 0.7) { + ImageEnhanceStage enhanceStage = new ImageEnhanceStage(config); + return StageResult.successWithNext( + String.format("质量%.2f低于阈值,添加增强Stage", quality), + enhanceStage + ); + } + + return StageResult.success(String.format("质量%.2f良好", quality)); +} +``` + +**使用示例 - 降级策略**: + +```java +@Override +protected StageResult doExecute(PhotoProcessContext context) { + // 尝试高级水印 + try { + applyAdvancedWatermark(context); + return StageResult.success("应用高级水印"); + } catch (Exception e) { + log.warn("高级水印失败,尝试降级", e); + } + + // 降级到基础水印 + try { + applyBasicWatermark(context); + return StageResult.degraded("降级: 应用基础水印"); + } catch (Exception e) { + log.warn("基础水印也失败,跳过水印", e); + } + + // 完全跳过水印 + return StageResult.degraded("降级: 跳过水印处理"); +} +``` + +### 6. @StageConfig - Stage配置注解 + +**职责**: 统一声明Stage的元信息与可选性控制。 + +**源码位置**: `StageConfig.java:17-38` + +**注解字段**: + +| 字段 | 类型 | 必填 | 默认值 | 说明 | +|------|------|------|--------|------| +| `stageId` | String | ✅ | - | Stage唯一标识,用于外部配置引用 | +| `optionalMode` | StageOptionalMode | ❌ | UNSUPPORT | 可选性模式 | +| `description` | String | ❌ | "" | 描述信息,便于文档与日志 | +| `defaultEnabled` | boolean | ❌ | true | 默认是否启用(仅SUPPORT模式生效) | + +**StageOptionalMode枚举**: + +| 模式 | 说明 | 外部配置 | 业务逻辑 | 使用场景 | +|------|------|---------|---------|---------| +| `UNSUPPORT` | 不支持外部配置 | ❌ | ✅ | Stage完全由代码逻辑控制 | +| `SUPPORT` | 支持外部配置 | ✅ | ✅ | Stage可通过配置启用/禁用 | +| `FORCE_ON` | 强制开启 | ❌ | ✅ | Stage不允许被外部禁用(如Prepare、Cleanup) | + +**判断流程**: + +``` +FORCE_ON: + → 跳过外部配置检查 + → 检查业务逻辑 (shouldExecuteByBusinessLogic) + +SUPPORT: + → 检查外部配置 (context.isStageEnabled) + → 如果禁用: 返回false + → 如果启用: 检查业务逻辑 + +UNSUPPORT: + → 跳过外部配置检查 + → 检查业务逻辑 +``` + +**使用示例**: + +```java +// 1. 强制执行的Stage(如上下文准备) +@StageConfig( + stageId = "prepare_context", + optionalMode = StageOptionalMode.FORCE_ON, + description = "准备人脸匹配上下文数据" +) +public class PrepareContextStage extends AbstractPipelineStage { + // 总是执行,不允许外部禁用 +} + +// 2. 支持外部配置的Stage(如水印、增强) +@StageConfig( + stageId = "watermark", + optionalMode = StageOptionalMode.SUPPORT, + description = "水印处理", + defaultEnabled = true // 默认启用,可通过配置禁用 +) +public class WatermarkStage extends AbstractPipelineStage { + // 可通过context.disableStage("watermark")禁用 +} + +// 3. 不支持外部配置的Stage(如内部逻辑判断) +@StageConfig( + stageId = "restore_orientation", + optionalMode = StageOptionalMode.UNSUPPORT, + description = "恢复图片方向" +) +public class RestoreOrientationStage extends AbstractPipelineStage { + // 完全由业务逻辑控制(如是否旋转过) + @Override + protected boolean shouldExecuteByBusinessLogic(PhotoProcessContext context) { + return context.isRotationApplied(); + } +} +``` + +### 7. PipelineBuilder - 管线构建器 + +**职责**: 使用Builder模式灵活组装管线。 + +**源码位置**: `PipelineBuilder.java:10-59` + +**核心方法**: + +| 方法 | 参数 | 返回值 | 说明 | +|------|------|--------|------| +| `name(String)` | 管线名称 | PipelineBuilder | 设置管线名称 | +| `addStage(PipelineStage)` | 单个Stage | PipelineBuilder | 添加Stage | +| `addStages(List)` | Stage列表 | PipelineBuilder | 批量添加Stage | +| `addStageIf(boolean, PipelineStage)` | 条件, Stage | PipelineBuilder | 条件性添加Stage | +| `sortByPriority()` | - | PipelineBuilder | 按优先级排序Stage | +| `build()` | - | Pipeline | 构建管线 | + +**使用示例**: + +```java +// 1. 基础用法 +Pipeline pipeline = new PipelineBuilder<>("MyPipeline") + .addStage(stage1) + .addStage(stage2) + .addStage(stage3) + .build(); + +// 2. 带名称 +Pipeline pipeline = new PipelineBuilder<>() + .name("CustomPipeline") + .addStage(stage1) + .build(); + +// 3. 条件性添加Stage +Pipeline pipeline = new PipelineBuilder<>("PrintPipeline") + .addStage(new DownloadStage()) + .addStageIf(needWatermark, new WatermarkStage(config)) + .addStageIf(needEnhance, new ImageEnhanceStage()) + .addStage(new UploadStage()) + .build(); + +// 4. 批量添加 +List> coreStages = Arrays.asList( + stage1, stage2, stage3 +); +Pipeline pipeline = new PipelineBuilder<>("BatchPipeline") + .addStages(coreStages) + .addStage(cleanupStage) + .build(); + +// 5. 按优先级排序 +Pipeline pipeline = new PipelineBuilder<>("SortedPipeline") + .addStage(normalStage) // priority = 0 + .addStage(prepareStage) // priority = -100 + .addStage(cleanupStage) // priority = 999 + .sortByPriority() // 排序后: prepareStage → normalStage → cleanupStage + .build(); +``` + +### 8. PipelineException - 管线异常 + +**职责**: 封装管线执行过程中的异常。 + +**源码位置**: `PipelineException.java` + +**使用场景**: +- Stage执行数量超过最大限制(100个) +- Pipeline构建时Stage列表为空 +- 其他管线级别的异常 + +**抛出示例**: + +```java +// Pipeline.java:37 +if (executedCount >= maxStages) { + log.error("[{}] Stage执行数量超过最大限制({}), 可能存在循环依赖", name, maxStages); + throw new PipelineException("Stage执行数量超过最大限制,可能存在循环依赖"); +} + +// Pipeline.java:85 +catch (Exception e) { + log.error("[{}] Pipeline执行异常", name, e); + throw new PipelineException("管线执行失败: " + e.getMessage(), e); +} +``` + +## 使用指南 + +### 1. 创建自定义Context + +```java +@Getter +@Setter +public class OrderProcessContext implements PipelineContext { + + // ==================== 核心字段 ==================== + private final Long orderId; + private final Long userId; + + // ==================== 中间状态 ==================== + private Order order; + private List items; + private PaymentResult paymentResult; + + // ==================== Stage配置 ==================== + private Map stageEnabledMap = new HashMap<>(); + + // ==================== 构造函数 ==================== + private OrderProcessContext(Builder builder) { + this.orderId = builder.orderId; + this.userId = builder.userId; + } + + // ==================== 静态工厂方法 ==================== + public static Builder builder() { + return new Builder(); + } + + public static OrderProcessContext forNewOrder(Long orderId, Long userId) { + return builder().orderId(orderId).userId(userId).build(); + } + + // ==================== 实现PipelineContext接口 ==================== + @Override + public boolean isStageEnabled(String stageId, boolean defaultEnabled) { + return stageEnabledMap.getOrDefault(stageId, defaultEnabled); + } + + @Override + public void beforePipeline() { + log.info("开始处理订单: orderId={}, userId={}", orderId, userId); + } + + @Override + public void afterPipeline() { + log.info("完成处理订单: orderId={}, 支付结果={}", orderId, paymentResult); + } + + @Override + public void cleanup() { + // 清理临时资源 + } + + // ==================== 业务方法 ==================== + public OrderProcessContext setStageState(String stageId, boolean enabled) { + stageEnabledMap.put(stageId, enabled); + return this; + } + + // ==================== Builder ==================== + public static class Builder { + private Long orderId; + private Long userId; + + public Builder orderId(Long orderId) { + this.orderId = orderId; + return this; + } + + public Builder userId(Long userId) { + this.userId = userId; + return this; + } + + public OrderProcessContext build() { + if (orderId == null) { + throw new IllegalArgumentException("orderId is required"); + } + if (userId == null) { + throw new IllegalArgumentException("userId is required"); + } + return new OrderProcessContext(this); + } + } +} +``` + +### 2. 创建自定义Stage + +```java +@Slf4j +@Component +@StageConfig( + stageId = "validate_order", + optionalMode = StageOptionalMode.FORCE_ON, + description = "验证订单有效性" +) +public class ValidateOrderStage extends AbstractPipelineStage { + + @Autowired + private OrderRepository orderRepository; + + @Override + public String getName() { + return "ValidateOrder"; + } + + @Override + protected boolean shouldExecuteByBusinessLogic(OrderProcessContext context) { + // 可选:业务逻辑判断 + return true; + } + + @Override + protected StageResult doExecute(OrderProcessContext context) { + Long orderId = context.getOrderId(); + + try { + // 1. 加载订单 + Order order = orderRepository.findById(orderId); + if (order == null) { + return StageResult.failed("订单不存在: " + orderId); + } + + // 2. 验证订单状态 + if (order.getStatus() != OrderStatus.PENDING) { + return StageResult.failed("订单状态异常: " + order.getStatus()); + } + + // 3. 验证订单金额 + if (order.getAmount() <= 0) { + return StageResult.failed("订单金额异常: " + order.getAmount()); + } + + // 4. 更新Context + context.setOrder(order); + + // 5. 返回成功 + return StageResult.success("订单验证通过"); + + } catch (Exception e) { + log.error("验证订单失败: orderId={}", orderId, e); + return StageResult.failed("验证订单失败: " + e.getMessage(), e); + } + } +} +``` + +### 3. 实现降级策略 + +```java +@Slf4j +@StageConfig( + stageId = "send_notification", + optionalMode = StageOptionalMode.SUPPORT, + description = "发送通知", + defaultEnabled = true +) +public class SendNotificationStage extends AbstractPipelineStage { + + @Autowired + private EmailService emailService; + + @Autowired + private SmsService smsService; + + @Override + public String getName() { + return "SendNotification"; + } + + @Override + protected StageResult doExecute(OrderProcessContext context) { + + // 降级链: Email → SMS → 跳过 + List strategies = Arrays.asList( + NotificationStrategy.EMAIL, + NotificationStrategy.SMS, + null // 跳过通知 + ); + + for (int i = 0; i < strategies.size(); i++) { + NotificationStrategy strategy = strategies.get(i); + + // 最后一级:跳过通知 + if (strategy == null) { + log.warn("所有通知方式均失败,跳过通知"); + return StageResult.degraded("降级: 跳过通知"); + } + + try { + // 尝试发送通知 + sendNotification(context, strategy); + + // 如果不是第一个策略,说明发生了降级 + if (i > 0) { + String degradeMsg = String.format("降级: %s → %s", + NotificationStrategy.EMAIL, strategy); + log.warn(degradeMsg); + return StageResult.degraded(degradeMsg); + } + + // 第一个策略成功 + return StageResult.success("通知发送成功: " + strategy); + + } catch (Exception e) { + log.warn("通知方式 {} 失败: {}", strategy, e.getMessage()); + + // 如果是最后一个策略的前一个,记录详细错误 + if (i == strategies.size() - 2) { + log.warn("所有通知方式均失败,准备跳过通知", e); + } + } + } + + return StageResult.degraded("降级: 跳过通知"); + } + + private void sendNotification(OrderProcessContext context, NotificationStrategy strategy) + throws Exception { + switch (strategy) { + case EMAIL: + emailService.send(context.getUserId(), buildEmailContent(context)); + break; + case SMS: + smsService.send(context.getUserId(), buildSmsContent(context)); + break; + } + } + + private enum NotificationStrategy { + EMAIL, SMS + } +} +``` + +### 4. 动态添加Stage + +```java +@Slf4j +@StageConfig( + stageId = "risk_check", + optionalMode = StageOptionalMode.FORCE_ON, + description = "风险检测" +) +public class RiskCheckStage extends AbstractPipelineStage { + + @Autowired + private RiskService riskService; + + @Override + public String getName() { + return "RiskCheck"; + } + + @Override + protected StageResult doExecute(OrderProcessContext context) { + Order order = context.getOrder(); + + // 1. 执行风险检测 + RiskResult riskResult = riskService.check(order); + + // 2. 根据风险等级动态添加后续Stage + if (riskResult.getLevel() == RiskLevel.HIGH) { + // 高风险:添加人工审核Stage + ManualReviewStage reviewStage = new ManualReviewStage(); + return StageResult.successWithNext( + "高风险订单,添加人工审核", + reviewStage + ); + } else if (riskResult.getLevel() == RiskLevel.MEDIUM) { + // 中风险:添加额外验证Stage + ExtraVerificationStage verifyStage = new ExtraVerificationStage(); + return StageResult.successWithNext( + "中风险订单,添加额外验证", + verifyStage + ); + } + + // 3. 低风险:直接通过 + return StageResult.success("低风险订单,直接通过"); + } +} +``` + +### 5. 组装完整Pipeline + +```java +@Component +public class OrderPipelineFactory { + + @Autowired + private ValidateOrderStage validateOrderStage; + + @Autowired + private LoadOrderItemsStage loadOrderItemsStage; + + @Autowired + private RiskCheckStage riskCheckStage; + + @Autowired + private CalculateAmountStage calculateAmountStage; + + @Autowired + private ProcessPaymentStage processPaymentStage; + + @Autowired + private SendNotificationStage sendNotificationStage; + + @Autowired + private UpdateOrderStatusStage updateOrderStatusStage; + + /** + * 创建新订单处理Pipeline + */ + public Pipeline createNewOrderPipeline() { + return new PipelineBuilder("NewOrderPipeline") + .addStage(validateOrderStage) // 1. 验证订单 + .addStage(loadOrderItemsStage) // 2. 加载订单项 + .addStage(riskCheckStage) // 3. 风险检测(可能动态添加审核Stage) + .addStage(calculateAmountStage) // 4. 计算金额 + .addStage(processPaymentStage) // 5. 处理支付 + .addStage(sendNotificationStage) // 6. 发送通知(可降级) + .addStage(updateOrderStatusStage) // 7. 更新订单状态 + .build(); + } + + /** + * 创建订单取消Pipeline + */ + public Pipeline createCancelOrderPipeline() { + return new PipelineBuilder("CancelOrderPipeline") + .addStage(validateOrderStage) // 1. 验证订单 + .addStage(new RefundStage()) // 2. 退款 + .addStage(sendNotificationStage) // 3. 发送通知 + .addStage(updateOrderStatusStage) // 4. 更新状态 + .build(); + } + + /** + * 根据订单类型创建Pipeline + */ + public Pipeline createPipeline(OrderType type) { + return switch (type) { + case NEW -> createNewOrderPipeline(); + case CANCEL -> createCancelOrderPipeline(); + case REFUND -> createRefundPipeline(); + }; + } +} +``` + +### 6. 执行Pipeline + +```java +@Service +public class OrderService { + + @Autowired + private OrderPipelineFactory pipelineFactory; + + public void processNewOrder(Long orderId, Long userId) { + // 1. 创建Context + OrderProcessContext context = OrderProcessContext.forNewOrder(orderId, userId); + + // 2. 可选:配置Stage开关 + context.disableStage("send_notification"); // 禁用通知 + + // 3. 创建Pipeline + Pipeline pipeline = pipelineFactory.createNewOrderPipeline(); + + // 4. 执行Pipeline + try { + boolean success = pipeline.execute(context); + + if (success) { + log.info("订单处理成功: orderId={}, 支付结果={}", + orderId, context.getPaymentResult()); + } else { + log.error("订单处理失败: orderId={}", orderId); + // 处理失败逻辑 + } + + } catch (PipelineException e) { + log.error("Pipeline执行异常: orderId={}", orderId, e); + // 异常处理逻辑 + } + } +} +``` + +## 最佳实践 + +### 1. Context设计原则 + +**DO ✅**: +- 使用Builder模式构建Context +- 提供静态工厂方法简化创建(如`forNewOrder()`) +- 实现`cleanup()`方法清理临时资源 +- 使用`Map`统一管理Stage开关 +- 在`beforePipeline()`中进行埋点、日志记录 +- 在`afterPipeline()`中进行结果汇总、通知 + +**DON'T ❌**: +- 不要在Context中包含业务逻辑 +- 不要在Context中直接持有Spring Bean +- 不要在Context中持有大量临时数据(应及时清理) +- 不要在多个Pipeline之间共享Context实例 + +### 2. Stage设计原则 + +**DO ✅**: +- 遵循单一职责原则,每个Stage只做一件事 +- 在`shouldExecuteByBusinessLogic()`有判断时,必须在`doExecute()`开头添加防御性检查 +- 使用`@StageConfig`注解声明Stage元信息 +- 合理选择`optionalMode`: + - 必需的Stage使用`FORCE_ON` + - 可选的Stage使用`SUPPORT` + - 内部逻辑控制的Stage使用`UNSUPPORT` +- 捕获异常并返回`StageResult.failed()`或`StageResult.degraded()` +- 在关键操作处记录debug/info日志 +- 实现幂等性(相同输入产生相同输出) + +**DON'T ❌**: +- 不要在Stage之间直接调用 +- 不要在Stage中修改其他Stage的状态 +- 不要在Stage中抛出未捕获的异常 +- 不要在Stage中执行耗时的同步操作(考虑异步) +- 不要在Stage中硬编码业务配置 + +### 3. 错误处理策略 + +| 场景 | 返回值 | Pipeline行为 | 使用时机 | +|------|--------|-------------|---------| +| 关键错误,无法继续 | `StageResult.failed()` | 立即终止 | 订单不存在、支付失败 | +| 可降级处理 | `StageResult.degraded()` | 继续执行,记录警告 | 通知发送失败、水印添加失败 | +| 条件不满足 | `StageResult.skipped()` | 继续执行 | 非VIP用户跳过优惠Stage | +| 正常完成 | `StageResult.success()` | 继续执行 | 所有正常情况 | + +**降级策略示例**: + +```java +// 三级降级: 高级方案 → 标准方案 → 基础方案 → 跳过 +List strategies = Arrays.asList( + Strategy.ADVANCED, + Strategy.STANDARD, + Strategy.BASIC, + null // 跳过 +); + +for (int i = 0; i < strategies.size(); i++) { + Strategy strategy = strategies.get(i); + + if (strategy == null) { + return StageResult.degraded("所有方案失败,跳过处理"); + } + + try { + doProcess(context, strategy); + + if (i > 0) { + return StageResult.degraded("降级到: " + strategy); + } + + return StageResult.success("使用: " + strategy); + + } catch (Exception e) { + log.warn("方案 {} 失败: {}", strategy, e.getMessage()); + } +} +``` + +### 4. 性能优化建议 + +**减少不必要的执行**: +- 在`shouldExecuteByBusinessLogic()`中尽早返回false +- 使用`@StageConfig(optionalMode = SUPPORT)`允许外部禁用 +- 合理设置Stage优先级,避免无效排序 + +**避免重复操作**: +- 在Context中缓存加载的数据(如`context.setOrder(order)`) +- 后续Stage直接使用缓存数据(如`context.getOrder()`) + +**异步处理**: +```java +@Override +protected StageResult doExecute(MyContext context) { + // 对于非关键操作,使用异步执行 + CompletableFuture.runAsync(() -> { + sendNotification(context); + }); + + return StageResult.success("异步发送通知"); +} +``` + +**监控耗时Stage**: +```java +@Override +protected void afterExecute(MyContext context, StageResult result) { + long duration = result.getDuration(); + if (duration > 1000) { + log.warn("Stage执行耗时过长: {}ms", duration); + } +} +``` + +### 5. 测试最佳实践 + +**单元测试结构**: + +```java +@ExtendWith(MockitoExtension.class) +class ValidateOrderStageTest { + + @Mock + private OrderRepository orderRepository; + + @InjectMocks + private ValidateOrderStage stage; + + private OrderProcessContext context; + + @BeforeEach + void setUp() { + context = OrderProcessContext.forNewOrder(1L, 100L); + } + + @Test + void testExecute_Success() { + // Given + Order order = new Order(); + order.setId(1L); + order.setStatus(OrderStatus.PENDING); + order.setAmount(100.0); + + when(orderRepository.findById(1L)).thenReturn(order); + + // When + StageResult result = stage.execute(context); + + // Then + assertTrue(result.isSuccess()); + assertEquals(order, context.getOrder()); + verify(orderRepository, times(1)).findById(1L); + } + + @Test + void testExecute_OrderNotFound_Failed() { + // Given + when(orderRepository.findById(1L)).thenReturn(null); + + // When + StageResult result = stage.execute(context); + + // Then + assertTrue(result.isFailed()); + assertTrue(result.getMessage().contains("订单不存在")); + } + + @Test + void testExecute_InvalidStatus_Failed() { + // Given + Order order = new Order(); + order.setStatus(OrderStatus.COMPLETED); + + when(orderRepository.findById(1L)).thenReturn(order); + + // When + StageResult result = stage.execute(context); + + // Then + assertTrue(result.isFailed()); + assertTrue(result.getMessage().contains("订单状态异常")); + } + + @Test + void testExecute_Exception_Failed() { + // Given + when(orderRepository.findById(1L)) + .thenThrow(new RuntimeException("Database error")); + + // When + StageResult result = stage.execute(context); + + // Then + assertTrue(result.isFailed()); + assertNotNull(result.getException()); + } +} +``` + +**Pipeline集成测试**: + +```java +@SpringBootTest +class OrderPipelineIntegrationTest { + + @Autowired + private OrderPipelineFactory pipelineFactory; + + @Test + void testNewOrderPipeline_Success() { + // Given + OrderProcessContext context = OrderProcessContext.forNewOrder(1L, 100L); + + // When + Pipeline pipeline = pipelineFactory.createNewOrderPipeline(); + boolean success = pipeline.execute(context); + + // Then + assertTrue(success); + assertNotNull(context.getOrder()); + assertNotNull(context.getPaymentResult()); + assertEquals(OrderStatus.COMPLETED, context.getOrder().getStatus()); + } + + @Test + void testNewOrderPipeline_RiskCheckTriggersReview() { + // Given + OrderProcessContext context = OrderProcessContext.forNewOrder(999L, 100L); + // 999是高风险订单ID + + // When + Pipeline pipeline = pipelineFactory.createNewOrderPipeline(); + boolean success = pipeline.execute(context); + + // Then + assertTrue(success); + // 验证动态添加了人工审核Stage + assertTrue(context.isReviewed()); + } +} +``` + +## 常见问题 + +### Q1: 如何跳过某个Stage? + +**A**: 有三种方式: + +1. **外部配置**(适用于`optionalMode = SUPPORT`的Stage): +```java +context.disableStage("watermark"); +``` + +2. **业务逻辑判断**(在`shouldExecuteByBusinessLogic()`中返回false): +```java +@Override +protected boolean shouldExecuteByBusinessLogic(MyContext context) { + return context.needsProcessing(); // 返回false则跳过 +} +``` + +3. **构建时不添加**: +```java +builder.addStageIf(needWatermark, new WatermarkStage()); +``` + +### Q2: 如何在运行时决定是否添加某个Stage? + +**A**: 使用动态Stage添加: + +```java +@Override +protected StageResult doExecute(MyContext context) { + if (needsExtraProcessing(context)) { + ExtraStage extraStage = new ExtraStage(); + return StageResult.successWithNext("添加额外处理", extraStage); + } + return StageResult.success(); +} +``` + +或使用`PipelineBuilder.addStageIf()`: + +```java +Pipeline pipeline = new PipelineBuilder<>("MyPipeline") + .addStage(stage1) + .addStageIf(condition, stage2) // 条件性添加 + .addStage(stage3) + .build(); +``` + +### Q3: Stage执行失败后Pipeline会怎样? + +**A**: +- `StageResult.failed()`: Pipeline**立即终止**,不执行后续Stage +- `StageResult.degraded()`: Pipeline**继续执行**,但记录警告日志 +- `StageResult.skipped()`: Pipeline**继续执行**,视为正常跳过 + +### Q4: 如何处理Stage执行超时? + +**A**: 当前框架不直接支持超时控制,可以在Stage内部实现: + +```java +@Override +protected StageResult doExecute(MyContext context) { + ExecutorService executor = Executors.newSingleThreadExecutor(); + Future future = executor.submit(() -> doHeavyWork(context)); + + try { + Result result = future.get(5, TimeUnit.SECONDS); + return StageResult.success("处理完成"); + } catch (TimeoutException e) { + future.cancel(true); + log.warn("Stage执行超时"); + return StageResult.degraded("超时,使用降级方案"); + } finally { + executor.shutdown(); + } +} +``` + +### Q5: 临时资源什么时候被清理? + +**A**: +- **推荐**: 在Pipeline最后添加CleanupStage +- **兜底**: Pipeline在finally块中会自动调用`context.cleanup()` +- **手动**: 也可以在业务代码中手动调用`context.cleanup()` + +```java +Pipeline pipeline = new PipelineBuilder<>("MyPipeline") + .addStage(stage1) + .addStage(stage2) + .addStage(new CleanupStage()) // 显式添加清理Stage + .build(); + +// Pipeline会在finally中再次调用cleanup(),确保清理 +``` + +### Q6: 如何获取Pipeline执行结果? + +**A**: +- Pipeline返回`boolean`(成功/失败) +- 具体结果数据存储在Context中 + +```java +boolean success = pipeline.execute(context); +if (success) { + // 从Context获取结果 + PaymentResult result = context.getPaymentResult(); + Order order = context.getOrder(); +} +``` + +### Q7: 如何调试Pipeline执行? + +**A**: +- 查看Pipeline日志(包含每个Stage的执行状态和耗时) +- 使用`pipeline.getStageNames()`查看Stage列表 +- 在Context中添加调试字段 + +```java +// 启用DEBUG日志 +logging: + level: + com.ycwl.basic.pipeline: DEBUG + +// 日志输出示例 +[MyPipeline] Pipeline 开始执行, Stage 数量: 5 +[MyPipeline] [1/5] 准备执行 Stage: ValidateOrder +[MyPipeline] ✓ Stage ValidateOrder - SUCCESS (耗时: 15ms) +[MyPipeline] [2/5] 准备执行 Stage: RiskCheck +[MyPipeline] Stage RiskCheck 动态添加了 1 个后续 Stage +[MyPipeline] ✓ Stage RiskCheck - SUCCESS (耗时: 120ms) +``` + +### Q8: 如何支持并行执行Stage? + +**A**: 当前框架不直接支持并行执行,所有Stage都是顺序执行。如果需要并行,可以: + +1. 在单个Stage内部使用`CompletableFuture`并行处理 +2. 将独立的子任务拆分为异步Stage + +```java +@Override +protected StageResult doExecute(MyContext context) { + // 并行执行多个独立任务 + CompletableFuture task1 = CompletableFuture.runAsync(() -> doTask1(context)); + CompletableFuture task2 = CompletableFuture.runAsync(() -> doTask2(context)); + CompletableFuture task3 = CompletableFuture.runAsync(() -> doTask3(context)); + + // 等待所有任务完成 + CompletableFuture.allOf(task1, task2, task3).join(); + + return StageResult.success("并行任务完成"); +} +``` + +### Q9: Stage的优先级如何设置? + +**A**: +- 实现`PipelineStage.getPriority()`方法(默认返回0) +- 调用`PipelineBuilder.sortByPriority()`排序 + +```java +@Override +public int getPriority() { + return -100; // 数值越小优先级越高 +} + +Pipeline pipeline = new PipelineBuilder<>("MyPipeline") + .addStage(normalStage) // priority = 0 + .addStage(prepareStage) // priority = -100 + .addStage(cleanupStage) // priority = 999 + .sortByPriority() // 排序: prepareStage → normalStage → cleanupStage + .build(); +``` + +### Q10: 如何避免Stage循环依赖? + +**A**: +- Pipeline内置了循环检测(最大执行100个Stage) +- 如果超过限制会抛出`PipelineException` +- 设计Stage时避免互相添加对方作为后续Stage + +```java +// BAD ❌ - 会导致无限循环 +@Override +protected StageResult doExecute(MyContext context) { + return StageResult.successWithNext("添加自己", this); // 无限循环! +} + +// GOOD ✅ - 有明确的终止条件 +@Override +protected StageResult doExecute(MyContext context) { + if (context.getRetryCount() < 3) { + context.incrementRetryCount(); + return StageResult.successWithNext("重试", this); + } + return StageResult.success("重试完成"); +} +``` + +## 架构设计 + +### 设计模式 + +| 模式 | 应用 | 说明 | +|------|------|------| +| 责任链模式 | Pipeline + Stage | Stage按顺序处理,每个Stage决定是否处理和传递 | +| Builder模式 | PipelineBuilder | 灵活组装Pipeline | +| 模板方法模式 | AbstractPipelineStage | 定义执行流程,子类实现具体逻辑 | +| 策略模式 | StageOptionalMode | 不同的Stage执行策略(FORCE_ON/SUPPORT/UNSUPPORT) | +| 工厂模式 | PipelineFactory | 创建不同场景的Pipeline | +| 状态模式 | StageResult.Status | 四种执行状态(SUCCESS/SKIPPED/DEGRADED/FAILED) | + +### 核心流程图 + +``` +Pipeline.execute(context) + │ + ├─→ context.beforePipeline() + │ + ├─→ for (stage : stages) + │ │ + │ ├─→ shouldExecute(context)? + │ │ │ + │ │ ├─→ false: continue + │ │ │ + │ │ └─→ true: + │ │ ├─→ beforeExecute(context) + │ │ ├─→ doExecute(context) → result + │ │ ├─→ afterExecute(context, result) + │ │ │ + │ │ ├─→ result.hasNextStages()? + │ │ │ └─→ yes: 动态插入Stage + │ │ │ + │ │ └─→ result.isFailed()? + │ │ └─→ yes: 终止Pipeline + │ │ + │ └─→ 记录日志(✓成功 ○跳过 △降级 ✗失败) + │ + ├─→ context.afterPipeline() + │ + └─→ finally: context.cleanup() +``` + +### Stage执行判断流程 + +``` +AbstractPipelineStage.shouldExecute(context) + │ + ├─→ 获取@StageConfig注解 + │ + ├─→ optionalMode == FORCE_ON? + │ └─→ yes: 跳过外部配置检查 + │ + ├─→ optionalMode == SUPPORT? + │ │ + │ └─→ yes: + │ ├─→ context.isStageEnabled(stageId, defaultEnabled) + │ │ │ + │ │ ├─→ false: 返回false(外部禁用) + │ │ │ + │ │ └─→ true: 继续检查业务逻辑 + │ + └─→ shouldExecuteByBusinessLogic(context) + │ + ├─→ true: 执行Stage + │ + └─→ false: 跳过Stage +``` + +## 已知限制 + +1. **不支持Stage并行执行**: 所有Stage都是顺序执行,需要并行的场景需在Stage内部实现 +2. **不支持Stage执行超时**: 需要在Stage内部自行实现超时控制 +3. **不支持Pipeline暂停/恢复**: Pipeline执行一旦开始就会运行到结束或失败 +4. **动态添加的Stage无法排序**: `sortByPriority()`只对初始Stage有效 +5. **Context不支持并发访问**: 需要在业务代码中保证线程安全 + +## 未来改进方向 + +- 🔄 支持Stage并行执行(ParallelStage) +- 🔄 支持Stage执行超时控制(@StageConfig.timeout) +- 🔄 支持Pipeline执行的暂停/恢复 +- 🔄 支持更细粒度的性能监控(Metrics集成) +- 🔄 支持Stage执行的重试机制(@StageConfig.retry) +- 🔄 支持Pipeline执行的可视化追踪 +- 🔄 支持Stage间的数据传递验证(Schema验证) + +## 相关文档 + +- [人脸匹配Pipeline文档](../face/pipeline/CLAUDE.md) - 完整的人脸识别业务实现 +- [图片处理Pipeline文档](../image/pipeline/CLAUDE.md) - 完整的图片处理业务实现 +- [Integration包文档](../integration/CLAUDE.md) - 微服务集成框架 + +## 维护者 + +- Pipeline通用框架 - 基础架构团队 diff --git a/src/test/java/com/ycwl/basic/face/pipeline/integration/FaceMatchingPipelineIntegrationTest.java b/src/test/java/com/ycwl/basic/face/pipeline/integration/FaceMatchingPipelineIntegrationTest.java index 4bcf4d9c..e73e9c08 100644 --- a/src/test/java/com/ycwl/basic/face/pipeline/integration/FaceMatchingPipelineIntegrationTest.java +++ b/src/test/java/com/ycwl/basic/face/pipeline/integration/FaceMatchingPipelineIntegrationTest.java @@ -1,7 +1,7 @@ package com.ycwl.basic.face.pipeline.integration; import com.ycwl.basic.face.pipeline.core.FaceMatchingContext; -import com.ycwl.basic.face.pipeline.core.Pipeline; +import com.ycwl.basic.pipeline.core.Pipeline; import com.ycwl.basic.face.pipeline.enums.FaceMatchingScene; import com.ycwl.basic.face.pipeline.factory.FaceMatchingPipelineFactory; import org.junit.jupiter.api.Test;