- 新增CLAUDE.md文件,详细描述Pipeline框架的设计理念与使用方法 - 介绍责任链模式、Builder模式在Pipeline中的应用 - 说明动态Stage添加、降级策略、配置驱动等核心特性 - 提供完整的业务实现示例(人脸匹配、图片处理) - 详细阐述Pipeline、PipelineContext、PipelineStage等核心组件 - 描述StageResult状态管理及@StageConfig注解使用方式 - 展示PipelineBuilder构建器模式的灵活用法 - 提供从Context创建到Stage实现再到Pipeline组装的全流程指导 - 总结最佳实践,包括错误处理策略、性能优化建议和测试方法 - 回答常见问题,如跳过Stage、动态添加Stage及超时处理等场景
49 KiB
Pipeline 通用管线框架
概述
Pipeline 是一个通用的、可扩展的管线框架,实现了责任链模式(Chain of Responsibility Pattern),用于组织和执行一系列处理阶段(Stage)。该框架支持动态Stage添加、多级降级策略、配置驱动的Stage控制,以及完善的错误处理机制。
核心特性
- 责任链模式: 将复杂流程拆分为独立的Stage,按顺序执行
- Builder模式: 灵活组装管线,支持条件性添加Stage
- 动态Stage添加: 支持在运行时根据条件动态插入后续Stage
- 降级策略: 支持多级降级执行,确保管线在异常情况下的稳定性
- 配置驱动: 支持通过外部配置控制Stage的启用/禁用
- 类型安全: 使用泛型确保类型安全
- 解耦设计: Context独立于业务模型,支持多种使用场景
- 自动清理: 无论成功或失败都会在finally中调用
context.cleanup()
包结构
com.ycwl.basic.pipeline
├── annotation/ # 注解定义
│ └── StageConfig # Stage配置注解
├── core/ # 核心类
│ ├── AbstractPipelineStage # Stage抽象基类
│ ├── PipelineContext # 管线上下文接口
│ ├── Pipeline # 管线执行器
│ ├── PipelineBuilder # 管线构建器
│ ├── PipelineStage # Stage接口
│ └── StageResult # Stage执行结果
├── enums/ # 枚举定义
│ └── StageOptionalMode # Stage可选模式枚举
└── exception/ # 异常类
└── PipelineException # 管线异常
业务实现包
框架提供了两个完整的业务实现示例:
1. 人脸匹配管线 (com.ycwl.basic.face.pipeline)
- 支持自动人脸匹配、自定义人脸匹配、仅识别三种场景
- 20个Stage实现完整的人脸识别和匹配流程
- 详见 FaceMatchingPipeline 文档
2. 图片处理管线 (com.ycwl.basic.image.pipeline)
- 支持打印、拼图、超分辨率增强等场景
- 包含下载、水印、旋转、质量检测、上传等Stage
- 详见 ImagePipeline 文档
核心组件
1. Pipeline - 管线执行器
职责: 按顺序执行一系列Stage,管理执行流程和异常处理。
关键特性:
- 顺序执行所有Stage
- 支持动态添加后续Stage
- 循环检测(最大执行100个Stage,防止无限循环)
- 详细的日志输出(带状态图标: ✓成功 ○跳过 △降级 ✗失败)
- 异常安全保证(finally块中cleanup)
源码位置: Pipeline.java:23-89
执行流程:
1. context.beforePipeline()
2. for (stage : stages)
2.1 if (!stage.shouldExecute(context)) continue
2.2 result = stage.execute(context)
2.3 if (result.hasNextStages()) 动态插入Stage
2.4 if (result.isFailed()) 终止执行
3. context.afterPipeline()
4. finally: context.cleanup()
使用示例:
// 方式1: 使用Builder构建
Pipeline<FaceMatchingContext> pipeline = new PipelineBuilder<>("FaceMatching")
.addStage(prepareContextStage)
.addStage(faceRecognitionStage)
.addStage(updateFaceResultStage)
.sortByPriority() // 按优先级排序(可选)
.build();
// 方式2: 直接构造
List<PipelineStage<MyContext>> stages = Arrays.asList(stage1, stage2, stage3);
Pipeline<MyContext> pipeline = new Pipeline<>("MyPipeline", stages);
// 执行管线
boolean success = pipeline.execute(context);
if (success) {
log.info("管线执行成功");
} else {
log.error("管线执行失败");
}
重要方法:
execute(C context): 执行管线,返回成功/失败getName(): 获取管线名称getStageCount(): 获取Stage总数getStageNames(): 获取所有Stage名称列表
日志输出示例:
[FaceMatching] Pipeline 开始执行, Stage 数量: 13
[FaceMatching] ✓ Stage PrepareContext - SUCCESS (耗时: 15ms)
[FaceMatching] ○ Stage FaceRecovery - SKIPPED (耗时: 2ms)
[FaceMatching] △ Stage VideoRecreation - DEGRADED (耗时: 120ms)
[FaceMatching] Pipeline 执行完成, 总Stage数: 13, 实际执行: 11, 耗时: 450ms
2. PipelineContext - 管线上下文接口
职责: 定义Stage之间传递状态和数据的通用接口,提供生命周期钩子和Stage开关控制。
源码位置: PipelineContext.java:7-50
核心方法:
| 方法 | 职责 | 调用时机 |
|---|---|---|
beforePipeline() |
Pipeline开始执行前的钩子 | Pipeline.execute()开始时 |
afterPipeline() |
Pipeline全部Stage执行完成后的钩子 | Pipeline.execute()成功完成时 |
cleanup() |
Pipeline结束时的清理钩子 | Pipeline.execute() finally块中 |
isStageEnabled(stageId, defaultEnabled) |
判断指定Stage是否启用 | AbstractPipelineStage.shouldExecute()中 |
isStageEnabled(stageId) |
判断指定Stage是否启用(默认false) | AbstractPipelineStage.shouldExecute()中 |
实现示例:
@Getter
@Setter
public class FaceMatchingContext implements PipelineContext {
// ==================== 核心字段 ====================
private final Long faceId;
private final boolean isNew;
// ==================== Stage配置 ====================
private Map<String, Boolean> stageEnabledMap = new HashMap<>();
// ==================== 实现PipelineContext接口 ====================
@Override
public boolean isStageEnabled(String stageId, boolean defaultEnabled) {
return stageEnabledMap.getOrDefault(stageId, defaultEnabled);
}
@Override
public void beforePipeline() {
log.info("开始人脸匹配流程: faceId={}, isNew={}", faceId, isNew);
}
@Override
public void afterPipeline() {
log.info("完成人脸匹配流程: faceId={}, 结果={}", faceId, finalResult);
}
@Override
public void cleanup() {
// 清理临时资源
tempFileManager.cleanup();
}
// ==================== 业务方法 ====================
public FaceMatchingContext setStageState(String stageId, boolean enabled) {
stageEnabledMap.put(stageId, enabled);
return this;
}
public FaceMatchingContext enableStage(String stageId) {
stageEnabledMap.put(stageId, true);
return this;
}
public FaceMatchingContext disableStage(String stageId) {
stageEnabledMap.put(stageId, false);
return this;
}
}
使用建议:
- 所有业务Context都应实现此接口
- 通过
stageEnabledMap集中管理Stage开关配置 - 在
cleanup()中释放所有临时资源(文件、连接等) - 在
beforePipeline()中进行埋点、日志记录 - 在
afterPipeline()中进行结果汇总、通知等
3. PipelineStage - Stage接口
职责: 定义Stage的通用行为和执行流程。
源码位置: PipelineStage.java
核心方法:
| 方法 | 职责 | 返回值 |
|---|---|---|
getName() |
获取Stage名称 | String |
shouldExecute(C context) |
判断是否应该执行 | boolean |
execute(C context) |
执行Stage逻辑 | StageResult |
getPriority() |
获取优先级(默认0) | int |
getStageConfig() |
获取Stage配置注解 | StageConfig |
优先级说明:
- 数值越小优先级越高
- 默认优先级为0
- 可通过
PipelineBuilder.sortByPriority()对Stage排序 - 常见优先级设置:
-100: 极高优先级(如PrepareContext)0: 正常优先级(大多数Stage)999: 清理Stage(如Cleanup)
4. AbstractPipelineStage - Stage抽象基类
职责: 提供Stage的通用实现和模板方法,封装可选性判断和钩子逻辑。
源码位置: AbstractPipelineStage.java:10-65
执行流程:
shouldExecute(context)
↓ true
beforeExecute(context)
↓
doExecute(context)
↓
afterExecute(context, result)
↓
return result
关键方法:
| 方法 | 访问修饰符 | 子类需要实现 | 职责 |
|---|---|---|---|
getName() |
abstract | ✅ | 返回Stage名称 |
doExecute(C context) |
protected abstract | ✅ | 实现具体处理逻辑 |
shouldExecuteByBusinessLogic(C context) |
protected | ❌ (默认true) | 实现业务逻辑判断 |
beforeExecute(C context) |
protected | ❌ | 执行前钩子 |
afterExecute(C context, StageResult<C> result) |
protected | ❌ | 执行后钩子 |
shouldExecute(C context) |
public final | ❌ | Stage执行判断(封装) |
execute(C context) |
public final | ❌ | Stage执行入口(封装) |
shouldExecute()执行判断逻辑:
@Override
public final boolean shouldExecute(C context) {
StageConfig config = getStageConfig();
if (config != null) {
String stageId = config.stageId();
StageOptionalMode mode = config.optionalMode();
// 1. FORCE_ON: 强制执行,只检查业务逻辑
if (mode == StageOptionalMode.FORCE_ON) {
return shouldExecuteByBusinessLogic(context);
}
// 2. SUPPORT: 检查外部配置
if (mode == StageOptionalMode.SUPPORT) {
boolean externalEnabled = context.isStageEnabled(stageId, config.defaultEnabled());
if (!externalEnabled) {
log.debug("[{}] Stage被外部配置禁用", stageId);
return false;
}
}
// 3. UNSUPPORT: 不检查外部配置,直接检查业务逻辑
}
// 最终检查业务逻辑
return shouldExecuteByBusinessLogic(context);
}
实现示例:
@Slf4j
@Component
@StageConfig(
stageId = "prepare_context",
optionalMode = StageOptionalMode.FORCE_ON,
description = "准备人脸匹配上下文数据"
)
public class PrepareContextStage extends AbstractPipelineStage<FaceMatchingContext> {
@Autowired
private FaceRepository faceRepository;
@Override
public String getName() {
return "PrepareContext";
}
@Override
protected boolean shouldExecuteByBusinessLogic(FaceMatchingContext context) {
// 可选:实现业务逻辑判断
return true;
}
@Override
protected void beforeExecute(FaceMatchingContext context) {
// 可选:执行前钩子
log.debug("准备加载人脸数据: faceId={}", context.getFaceId());
}
@Override
protected StageResult<FaceMatchingContext> doExecute(FaceMatchingContext context) {
Long faceId = context.getFaceId();
// 1. 加载人脸实体
FaceEntity face = faceRepository.getFace(faceId);
if (face == null) {
return StageResult.failed("人脸不存在,faceId: " + faceId);
}
// 2. 更新Context
context.setFace(face);
// 3. 返回成功结果
return StageResult.success("上下文准备完成");
}
@Override
protected void afterExecute(FaceMatchingContext context, StageResult<FaceMatchingContext> result) {
// 可选:执行后钩子
if (result.isSuccess()) {
log.info("上下文准备成功: memberId={}", context.getFace().getMemberId());
}
}
}
防御性编程模式 ⚠️:
对于在shouldExecuteByBusinessLogic()中有条件判断的Stage,必须在doExecute()开头添加相同的防御性检查,以防止单元测试直接调用execute()时跳过条件检查:
@Override
protected boolean shouldExecuteByBusinessLogic(FaceMatchingContext context) {
// 业务逻辑判断
if (context.getSearchResult() == null) {
return false;
}
return true;
}
@Override
protected StageResult<FaceMatchingContext> doExecute(FaceMatchingContext context) {
// ⚠️ 防御性检查:必须添加!
// 防止单元测试直接调用execute()时跳过shouldExecute()的检查
if (context.getSearchResult() == null) {
log.debug("searchResult为空,跳过处理");
return StageResult.skipped("searchResult为空");
}
// 正常处理逻辑
// ...
}
原因: Pipeline执行时会先调用shouldExecute()检查,但单元测试可能直接调用stage.execute(context),导致跳过shouldExecuteByBusinessLogic()的检查,直接进入doExecute()。
5. StageResult - Stage执行结果
职责: 封装Stage执行结果,包括状态、消息、异常和后续Stage。
源码位置: StageResult.java:13-93
状态类型:
| 状态 | 枚举值 | 含义 | Pipeline行为 | 使用场景 |
|---|---|---|---|---|
| SUCCESS | Status.SUCCESS |
执行成功 | 继续执行 | 正常完成处理 |
| SKIPPED | Status.SKIPPED |
跳过执行 | 继续执行 | 条件不满足,跳过当前Stage |
| DEGRADED | Status.DEGRADED |
降级执行 | 继续执行(记录警告) | 使用备用方案或部分失败 |
| FAILED | Status.FAILED |
执行失败 | 立即终止Pipeline | 关键错误,无法继续 |
核心字段:
| 字段 | 类型 | 说明 |
|---|---|---|
status |
Status |
执行状态 |
message |
String |
结果消息(可选) |
exception |
Throwable |
异常信息(可选) |
nextStages |
List<PipelineStage<C>> |
后续Stage列表(动态添加) |
静态工厂方法:
// ==================== SUCCESS ====================
// 简单成功
StageResult.success();
// 成功并附带消息
StageResult.success("处理完成");
// 成功并动态添加单个后续Stage
StageResult.successWithNext("质量不佳,添加增强", new ImageEnhanceStage());
// 成功并动态添加多个后续Stage(可变参数)
StageResult.successWithNext("检测到异常,添加修复Stage",
repairStage1, repairStage2);
// 成功并动态添加多个后续Stage(List)
List<PipelineStage<C>> stages = Arrays.asList(stage1, stage2, stage3);
StageResult.successWithNext("批量添加Stage", stages);
// ==================== SKIPPED ====================
// 简单跳过
StageResult.skipped(); // 默认消息: "条件不满足,跳过执行"
// 跳过并附带原因
StageResult.skipped("memberSourceList为空");
StageResult.skipped("非自定义匹配场景");
// ==================== FAILED ====================
// 失败并附带消息
StageResult.failed("人脸不存在");
// 失败并附带消息和异常
StageResult.failed("处理失败", exception);
// ==================== DEGRADED ====================
// 降级并附带原因
StageResult.degraded("使用备用水印方案");
StageResult.degraded("视频重切处理失败");
判断方法:
StageResult<MyContext> result = stage.execute(context);
// 状态判断
if (result.isSuccess()) { }
if (result.isSkipped()) { }
if (result.isFailed()) { }
if (result.isDegraded()) { }
// 是否可以继续执行(SUCCESS、SKIPPED、DEGRADED都返回true)
if (result.canContinue()) { }
// 获取字段
String message = result.getMessage();
Throwable exception = result.getException();
List<PipelineStage<C>> nextStages = result.getNextStages();
使用示例 - 动态添加Stage:
@Override
protected StageResult<PhotoProcessContext> doExecute(PhotoProcessContext context) {
File currentFile = context.getCurrentFile();
// 检测图片质量
double quality = detectQuality(currentFile);
// 如果质量低于阈值,动态添加增强Stage
if (quality < 0.7) {
ImageEnhanceStage enhanceStage = new ImageEnhanceStage(config);
return StageResult.successWithNext(
String.format("质量%.2f低于阈值,添加增强Stage", quality),
enhanceStage
);
}
return StageResult.success(String.format("质量%.2f良好", quality));
}
使用示例 - 降级策略:
@Override
protected StageResult<PhotoProcessContext> doExecute(PhotoProcessContext context) {
// 尝试高级水印
try {
applyAdvancedWatermark(context);
return StageResult.success("应用高级水印");
} catch (Exception e) {
log.warn("高级水印失败,尝试降级", e);
}
// 降级到基础水印
try {
applyBasicWatermark(context);
return StageResult.degraded("降级: 应用基础水印");
} catch (Exception e) {
log.warn("基础水印也失败,跳过水印", e);
}
// 完全跳过水印
return StageResult.degraded("降级: 跳过水印处理");
}
6. @StageConfig - Stage配置注解
职责: 统一声明Stage的元信息与可选性控制。
源码位置: StageConfig.java:17-38
注解字段:
| 字段 | 类型 | 必填 | 默认值 | 说明 |
|---|---|---|---|---|
stageId |
String | ✅ | - | Stage唯一标识,用于外部配置引用 |
optionalMode |
StageOptionalMode | ❌ | UNSUPPORT | 可选性模式 |
description |
String | ❌ | "" | 描述信息,便于文档与日志 |
defaultEnabled |
boolean | ❌ | true | 默认是否启用(仅SUPPORT模式生效) |
StageOptionalMode枚举:
| 模式 | 说明 | 外部配置 | 业务逻辑 | 使用场景 |
|---|---|---|---|---|
UNSUPPORT |
不支持外部配置 | ❌ | ✅ | Stage完全由代码逻辑控制 |
SUPPORT |
支持外部配置 | ✅ | ✅ | Stage可通过配置启用/禁用 |
FORCE_ON |
强制开启 | ❌ | ✅ | Stage不允许被外部禁用(如Prepare、Cleanup) |
判断流程:
FORCE_ON:
→ 跳过外部配置检查
→ 检查业务逻辑 (shouldExecuteByBusinessLogic)
SUPPORT:
→ 检查外部配置 (context.isStageEnabled)
→ 如果禁用: 返回false
→ 如果启用: 检查业务逻辑
UNSUPPORT:
→ 跳过外部配置检查
→ 检查业务逻辑
使用示例:
// 1. 强制执行的Stage(如上下文准备)
@StageConfig(
stageId = "prepare_context",
optionalMode = StageOptionalMode.FORCE_ON,
description = "准备人脸匹配上下文数据"
)
public class PrepareContextStage extends AbstractPipelineStage<FaceMatchingContext> {
// 总是执行,不允许外部禁用
}
// 2. 支持外部配置的Stage(如水印、增强)
@StageConfig(
stageId = "watermark",
optionalMode = StageOptionalMode.SUPPORT,
description = "水印处理",
defaultEnabled = true // 默认启用,可通过配置禁用
)
public class WatermarkStage extends AbstractPipelineStage<PhotoProcessContext> {
// 可通过context.disableStage("watermark")禁用
}
// 3. 不支持外部配置的Stage(如内部逻辑判断)
@StageConfig(
stageId = "restore_orientation",
optionalMode = StageOptionalMode.UNSUPPORT,
description = "恢复图片方向"
)
public class RestoreOrientationStage extends AbstractPipelineStage<PhotoProcessContext> {
// 完全由业务逻辑控制(如是否旋转过)
@Override
protected boolean shouldExecuteByBusinessLogic(PhotoProcessContext context) {
return context.isRotationApplied();
}
}
7. PipelineBuilder - 管线构建器
职责: 使用Builder模式灵活组装管线。
源码位置: PipelineBuilder.java:10-59
核心方法:
| 方法 | 参数 | 返回值 | 说明 |
|---|---|---|---|
name(String) |
管线名称 | PipelineBuilder | 设置管线名称 |
addStage(PipelineStage) |
单个Stage | PipelineBuilder | 添加Stage |
addStages(List<PipelineStage>) |
Stage列表 | PipelineBuilder | 批量添加Stage |
addStageIf(boolean, PipelineStage) |
条件, Stage | PipelineBuilder | 条件性添加Stage |
sortByPriority() |
- | PipelineBuilder | 按优先级排序Stage |
build() |
- | Pipeline | 构建管线 |
使用示例:
// 1. 基础用法
Pipeline<MyContext> pipeline = new PipelineBuilder<>("MyPipeline")
.addStage(stage1)
.addStage(stage2)
.addStage(stage3)
.build();
// 2. 带名称
Pipeline<MyContext> pipeline = new PipelineBuilder<>()
.name("CustomPipeline")
.addStage(stage1)
.build();
// 3. 条件性添加Stage
Pipeline<PhotoProcessContext> pipeline = new PipelineBuilder<>("PrintPipeline")
.addStage(new DownloadStage())
.addStageIf(needWatermark, new WatermarkStage(config))
.addStageIf(needEnhance, new ImageEnhanceStage())
.addStage(new UploadStage())
.build();
// 4. 批量添加
List<PipelineStage<MyContext>> coreStages = Arrays.asList(
stage1, stage2, stage3
);
Pipeline<MyContext> pipeline = new PipelineBuilder<>("BatchPipeline")
.addStages(coreStages)
.addStage(cleanupStage)
.build();
// 5. 按优先级排序
Pipeline<MyContext> pipeline = new PipelineBuilder<>("SortedPipeline")
.addStage(normalStage) // priority = 0
.addStage(prepareStage) // priority = -100
.addStage(cleanupStage) // priority = 999
.sortByPriority() // 排序后: prepareStage → normalStage → cleanupStage
.build();
8. PipelineException - 管线异常
职责: 封装管线执行过程中的异常。
源码位置: PipelineException.java
使用场景:
- Stage执行数量超过最大限制(100个)
- Pipeline构建时Stage列表为空
- 其他管线级别的异常
抛出示例:
// Pipeline.java:37
if (executedCount >= maxStages) {
log.error("[{}] Stage执行数量超过最大限制({}), 可能存在循环依赖", name, maxStages);
throw new PipelineException("Stage执行数量超过最大限制,可能存在循环依赖");
}
// Pipeline.java:85
catch (Exception e) {
log.error("[{}] Pipeline执行异常", name, e);
throw new PipelineException("管线执行失败: " + e.getMessage(), e);
}
使用指南
1. 创建自定义Context
@Getter
@Setter
public class OrderProcessContext implements PipelineContext {
// ==================== 核心字段 ====================
private final Long orderId;
private final Long userId;
// ==================== 中间状态 ====================
private Order order;
private List<OrderItem> items;
private PaymentResult paymentResult;
// ==================== Stage配置 ====================
private Map<String, Boolean> stageEnabledMap = new HashMap<>();
// ==================== 构造函数 ====================
private OrderProcessContext(Builder builder) {
this.orderId = builder.orderId;
this.userId = builder.userId;
}
// ==================== 静态工厂方法 ====================
public static Builder builder() {
return new Builder();
}
public static OrderProcessContext forNewOrder(Long orderId, Long userId) {
return builder().orderId(orderId).userId(userId).build();
}
// ==================== 实现PipelineContext接口 ====================
@Override
public boolean isStageEnabled(String stageId, boolean defaultEnabled) {
return stageEnabledMap.getOrDefault(stageId, defaultEnabled);
}
@Override
public void beforePipeline() {
log.info("开始处理订单: orderId={}, userId={}", orderId, userId);
}
@Override
public void afterPipeline() {
log.info("完成处理订单: orderId={}, 支付结果={}", orderId, paymentResult);
}
@Override
public void cleanup() {
// 清理临时资源
}
// ==================== 业务方法 ====================
public OrderProcessContext setStageState(String stageId, boolean enabled) {
stageEnabledMap.put(stageId, enabled);
return this;
}
// ==================== Builder ====================
public static class Builder {
private Long orderId;
private Long userId;
public Builder orderId(Long orderId) {
this.orderId = orderId;
return this;
}
public Builder userId(Long userId) {
this.userId = userId;
return this;
}
public OrderProcessContext build() {
if (orderId == null) {
throw new IllegalArgumentException("orderId is required");
}
if (userId == null) {
throw new IllegalArgumentException("userId is required");
}
return new OrderProcessContext(this);
}
}
}
2. 创建自定义Stage
@Slf4j
@Component
@StageConfig(
stageId = "validate_order",
optionalMode = StageOptionalMode.FORCE_ON,
description = "验证订单有效性"
)
public class ValidateOrderStage extends AbstractPipelineStage<OrderProcessContext> {
@Autowired
private OrderRepository orderRepository;
@Override
public String getName() {
return "ValidateOrder";
}
@Override
protected boolean shouldExecuteByBusinessLogic(OrderProcessContext context) {
// 可选:业务逻辑判断
return true;
}
@Override
protected StageResult<OrderProcessContext> doExecute(OrderProcessContext context) {
Long orderId = context.getOrderId();
try {
// 1. 加载订单
Order order = orderRepository.findById(orderId);
if (order == null) {
return StageResult.failed("订单不存在: " + orderId);
}
// 2. 验证订单状态
if (order.getStatus() != OrderStatus.PENDING) {
return StageResult.failed("订单状态异常: " + order.getStatus());
}
// 3. 验证订单金额
if (order.getAmount() <= 0) {
return StageResult.failed("订单金额异常: " + order.getAmount());
}
// 4. 更新Context
context.setOrder(order);
// 5. 返回成功
return StageResult.success("订单验证通过");
} catch (Exception e) {
log.error("验证订单失败: orderId={}", orderId, e);
return StageResult.failed("验证订单失败: " + e.getMessage(), e);
}
}
}
3. 实现降级策略
@Slf4j
@StageConfig(
stageId = "send_notification",
optionalMode = StageOptionalMode.SUPPORT,
description = "发送通知",
defaultEnabled = true
)
public class SendNotificationStage extends AbstractPipelineStage<OrderProcessContext> {
@Autowired
private EmailService emailService;
@Autowired
private SmsService smsService;
@Override
public String getName() {
return "SendNotification";
}
@Override
protected StageResult<OrderProcessContext> doExecute(OrderProcessContext context) {
// 降级链: Email → SMS → 跳过
List<NotificationStrategy> strategies = Arrays.asList(
NotificationStrategy.EMAIL,
NotificationStrategy.SMS,
null // 跳过通知
);
for (int i = 0; i < strategies.size(); i++) {
NotificationStrategy strategy = strategies.get(i);
// 最后一级:跳过通知
if (strategy == null) {
log.warn("所有通知方式均失败,跳过通知");
return StageResult.degraded("降级: 跳过通知");
}
try {
// 尝试发送通知
sendNotification(context, strategy);
// 如果不是第一个策略,说明发生了降级
if (i > 0) {
String degradeMsg = String.format("降级: %s → %s",
NotificationStrategy.EMAIL, strategy);
log.warn(degradeMsg);
return StageResult.degraded(degradeMsg);
}
// 第一个策略成功
return StageResult.success("通知发送成功: " + strategy);
} catch (Exception e) {
log.warn("通知方式 {} 失败: {}", strategy, e.getMessage());
// 如果是最后一个策略的前一个,记录详细错误
if (i == strategies.size() - 2) {
log.warn("所有通知方式均失败,准备跳过通知", e);
}
}
}
return StageResult.degraded("降级: 跳过通知");
}
private void sendNotification(OrderProcessContext context, NotificationStrategy strategy)
throws Exception {
switch (strategy) {
case EMAIL:
emailService.send(context.getUserId(), buildEmailContent(context));
break;
case SMS:
smsService.send(context.getUserId(), buildSmsContent(context));
break;
}
}
private enum NotificationStrategy {
EMAIL, SMS
}
}
4. 动态添加Stage
@Slf4j
@StageConfig(
stageId = "risk_check",
optionalMode = StageOptionalMode.FORCE_ON,
description = "风险检测"
)
public class RiskCheckStage extends AbstractPipelineStage<OrderProcessContext> {
@Autowired
private RiskService riskService;
@Override
public String getName() {
return "RiskCheck";
}
@Override
protected StageResult<OrderProcessContext> doExecute(OrderProcessContext context) {
Order order = context.getOrder();
// 1. 执行风险检测
RiskResult riskResult = riskService.check(order);
// 2. 根据风险等级动态添加后续Stage
if (riskResult.getLevel() == RiskLevel.HIGH) {
// 高风险:添加人工审核Stage
ManualReviewStage reviewStage = new ManualReviewStage();
return StageResult.successWithNext(
"高风险订单,添加人工审核",
reviewStage
);
} else if (riskResult.getLevel() == RiskLevel.MEDIUM) {
// 中风险:添加额外验证Stage
ExtraVerificationStage verifyStage = new ExtraVerificationStage();
return StageResult.successWithNext(
"中风险订单,添加额外验证",
verifyStage
);
}
// 3. 低风险:直接通过
return StageResult.success("低风险订单,直接通过");
}
}
5. 组装完整Pipeline
@Component
public class OrderPipelineFactory {
@Autowired
private ValidateOrderStage validateOrderStage;
@Autowired
private LoadOrderItemsStage loadOrderItemsStage;
@Autowired
private RiskCheckStage riskCheckStage;
@Autowired
private CalculateAmountStage calculateAmountStage;
@Autowired
private ProcessPaymentStage processPaymentStage;
@Autowired
private SendNotificationStage sendNotificationStage;
@Autowired
private UpdateOrderStatusStage updateOrderStatusStage;
/**
* 创建新订单处理Pipeline
*/
public Pipeline<OrderProcessContext> createNewOrderPipeline() {
return new PipelineBuilder<OrderProcessContext>("NewOrderPipeline")
.addStage(validateOrderStage) // 1. 验证订单
.addStage(loadOrderItemsStage) // 2. 加载订单项
.addStage(riskCheckStage) // 3. 风险检测(可能动态添加审核Stage)
.addStage(calculateAmountStage) // 4. 计算金额
.addStage(processPaymentStage) // 5. 处理支付
.addStage(sendNotificationStage) // 6. 发送通知(可降级)
.addStage(updateOrderStatusStage) // 7. 更新订单状态
.build();
}
/**
* 创建订单取消Pipeline
*/
public Pipeline<OrderProcessContext> createCancelOrderPipeline() {
return new PipelineBuilder<OrderProcessContext>("CancelOrderPipeline")
.addStage(validateOrderStage) // 1. 验证订单
.addStage(new RefundStage()) // 2. 退款
.addStage(sendNotificationStage) // 3. 发送通知
.addStage(updateOrderStatusStage) // 4. 更新状态
.build();
}
/**
* 根据订单类型创建Pipeline
*/
public Pipeline<OrderProcessContext> createPipeline(OrderType type) {
return switch (type) {
case NEW -> createNewOrderPipeline();
case CANCEL -> createCancelOrderPipeline();
case REFUND -> createRefundPipeline();
};
}
}
6. 执行Pipeline
@Service
public class OrderService {
@Autowired
private OrderPipelineFactory pipelineFactory;
public void processNewOrder(Long orderId, Long userId) {
// 1. 创建Context
OrderProcessContext context = OrderProcessContext.forNewOrder(orderId, userId);
// 2. 可选:配置Stage开关
context.disableStage("send_notification"); // 禁用通知
// 3. 创建Pipeline
Pipeline<OrderProcessContext> pipeline = pipelineFactory.createNewOrderPipeline();
// 4. 执行Pipeline
try {
boolean success = pipeline.execute(context);
if (success) {
log.info("订单处理成功: orderId={}, 支付结果={}",
orderId, context.getPaymentResult());
} else {
log.error("订单处理失败: orderId={}", orderId);
// 处理失败逻辑
}
} catch (PipelineException e) {
log.error("Pipeline执行异常: orderId={}", orderId, e);
// 异常处理逻辑
}
}
}
最佳实践
1. Context设计原则
DO ✅:
- 使用Builder模式构建Context
- 提供静态工厂方法简化创建(如
forNewOrder()) - 实现
cleanup()方法清理临时资源 - 使用
Map<String, Boolean>统一管理Stage开关 - 在
beforePipeline()中进行埋点、日志记录 - 在
afterPipeline()中进行结果汇总、通知
DON'T ❌:
- 不要在Context中包含业务逻辑
- 不要在Context中直接持有Spring Bean
- 不要在Context中持有大量临时数据(应及时清理)
- 不要在多个Pipeline之间共享Context实例
2. Stage设计原则
DO ✅:
- 遵循单一职责原则,每个Stage只做一件事
- 在
shouldExecuteByBusinessLogic()有判断时,必须在doExecute()开头添加防御性检查 - 使用
@StageConfig注解声明Stage元信息 - 合理选择
optionalMode:- 必需的Stage使用
FORCE_ON - 可选的Stage使用
SUPPORT - 内部逻辑控制的Stage使用
UNSUPPORT
- 必需的Stage使用
- 捕获异常并返回
StageResult.failed()或StageResult.degraded() - 在关键操作处记录debug/info日志
- 实现幂等性(相同输入产生相同输出)
DON'T ❌:
- 不要在Stage之间直接调用
- 不要在Stage中修改其他Stage的状态
- 不要在Stage中抛出未捕获的异常
- 不要在Stage中执行耗时的同步操作(考虑异步)
- 不要在Stage中硬编码业务配置
3. 错误处理策略
| 场景 | 返回值 | Pipeline行为 | 使用时机 |
|---|---|---|---|
| 关键错误,无法继续 | StageResult.failed() |
立即终止 | 订单不存在、支付失败 |
| 可降级处理 | StageResult.degraded() |
继续执行,记录警告 | 通知发送失败、水印添加失败 |
| 条件不满足 | StageResult.skipped() |
继续执行 | 非VIP用户跳过优惠Stage |
| 正常完成 | StageResult.success() |
继续执行 | 所有正常情况 |
降级策略示例:
// 三级降级: 高级方案 → 标准方案 → 基础方案 → 跳过
List<Strategy> strategies = Arrays.asList(
Strategy.ADVANCED,
Strategy.STANDARD,
Strategy.BASIC,
null // 跳过
);
for (int i = 0; i < strategies.size(); i++) {
Strategy strategy = strategies.get(i);
if (strategy == null) {
return StageResult.degraded("所有方案失败,跳过处理");
}
try {
doProcess(context, strategy);
if (i > 0) {
return StageResult.degraded("降级到: " + strategy);
}
return StageResult.success("使用: " + strategy);
} catch (Exception e) {
log.warn("方案 {} 失败: {}", strategy, e.getMessage());
}
}
4. 性能优化建议
减少不必要的执行:
- 在
shouldExecuteByBusinessLogic()中尽早返回false - 使用
@StageConfig(optionalMode = SUPPORT)允许外部禁用 - 合理设置Stage优先级,避免无效排序
避免重复操作:
- 在Context中缓存加载的数据(如
context.setOrder(order)) - 后续Stage直接使用缓存数据(如
context.getOrder())
异步处理:
@Override
protected StageResult<MyContext> doExecute(MyContext context) {
// 对于非关键操作,使用异步执行
CompletableFuture.runAsync(() -> {
sendNotification(context);
});
return StageResult.success("异步发送通知");
}
监控耗时Stage:
@Override
protected void afterExecute(MyContext context, StageResult<MyContext> result) {
long duration = result.getDuration();
if (duration > 1000) {
log.warn("Stage执行耗时过长: {}ms", duration);
}
}
5. 测试最佳实践
单元测试结构:
@ExtendWith(MockitoExtension.class)
class ValidateOrderStageTest {
@Mock
private OrderRepository orderRepository;
@InjectMocks
private ValidateOrderStage stage;
private OrderProcessContext context;
@BeforeEach
void setUp() {
context = OrderProcessContext.forNewOrder(1L, 100L);
}
@Test
void testExecute_Success() {
// Given
Order order = new Order();
order.setId(1L);
order.setStatus(OrderStatus.PENDING);
order.setAmount(100.0);
when(orderRepository.findById(1L)).thenReturn(order);
// When
StageResult<OrderProcessContext> result = stage.execute(context);
// Then
assertTrue(result.isSuccess());
assertEquals(order, context.getOrder());
verify(orderRepository, times(1)).findById(1L);
}
@Test
void testExecute_OrderNotFound_Failed() {
// Given
when(orderRepository.findById(1L)).thenReturn(null);
// When
StageResult<OrderProcessContext> result = stage.execute(context);
// Then
assertTrue(result.isFailed());
assertTrue(result.getMessage().contains("订单不存在"));
}
@Test
void testExecute_InvalidStatus_Failed() {
// Given
Order order = new Order();
order.setStatus(OrderStatus.COMPLETED);
when(orderRepository.findById(1L)).thenReturn(order);
// When
StageResult<OrderProcessContext> result = stage.execute(context);
// Then
assertTrue(result.isFailed());
assertTrue(result.getMessage().contains("订单状态异常"));
}
@Test
void testExecute_Exception_Failed() {
// Given
when(orderRepository.findById(1L))
.thenThrow(new RuntimeException("Database error"));
// When
StageResult<OrderProcessContext> result = stage.execute(context);
// Then
assertTrue(result.isFailed());
assertNotNull(result.getException());
}
}
Pipeline集成测试:
@SpringBootTest
class OrderPipelineIntegrationTest {
@Autowired
private OrderPipelineFactory pipelineFactory;
@Test
void testNewOrderPipeline_Success() {
// Given
OrderProcessContext context = OrderProcessContext.forNewOrder(1L, 100L);
// When
Pipeline<OrderProcessContext> pipeline = pipelineFactory.createNewOrderPipeline();
boolean success = pipeline.execute(context);
// Then
assertTrue(success);
assertNotNull(context.getOrder());
assertNotNull(context.getPaymentResult());
assertEquals(OrderStatus.COMPLETED, context.getOrder().getStatus());
}
@Test
void testNewOrderPipeline_RiskCheckTriggersReview() {
// Given
OrderProcessContext context = OrderProcessContext.forNewOrder(999L, 100L);
// 999是高风险订单ID
// When
Pipeline<OrderProcessContext> pipeline = pipelineFactory.createNewOrderPipeline();
boolean success = pipeline.execute(context);
// Then
assertTrue(success);
// 验证动态添加了人工审核Stage
assertTrue(context.isReviewed());
}
}
常见问题
Q1: 如何跳过某个Stage?
A: 有三种方式:
- 外部配置(适用于
optionalMode = SUPPORT的Stage):
context.disableStage("watermark");
- 业务逻辑判断(在
shouldExecuteByBusinessLogic()中返回false):
@Override
protected boolean shouldExecuteByBusinessLogic(MyContext context) {
return context.needsProcessing(); // 返回false则跳过
}
- 构建时不添加:
builder.addStageIf(needWatermark, new WatermarkStage());
Q2: 如何在运行时决定是否添加某个Stage?
A: 使用动态Stage添加:
@Override
protected StageResult<MyContext> doExecute(MyContext context) {
if (needsExtraProcessing(context)) {
ExtraStage extraStage = new ExtraStage();
return StageResult.successWithNext("添加额外处理", extraStage);
}
return StageResult.success();
}
或使用PipelineBuilder.addStageIf():
Pipeline<MyContext> pipeline = new PipelineBuilder<>("MyPipeline")
.addStage(stage1)
.addStageIf(condition, stage2) // 条件性添加
.addStage(stage3)
.build();
Q3: Stage执行失败后Pipeline会怎样?
A:
StageResult.failed(): Pipeline立即终止,不执行后续StageStageResult.degraded(): Pipeline继续执行,但记录警告日志StageResult.skipped(): Pipeline继续执行,视为正常跳过
Q4: 如何处理Stage执行超时?
A: 当前框架不直接支持超时控制,可以在Stage内部实现:
@Override
protected StageResult<MyContext> doExecute(MyContext context) {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Result> future = executor.submit(() -> doHeavyWork(context));
try {
Result result = future.get(5, TimeUnit.SECONDS);
return StageResult.success("处理完成");
} catch (TimeoutException e) {
future.cancel(true);
log.warn("Stage执行超时");
return StageResult.degraded("超时,使用降级方案");
} finally {
executor.shutdown();
}
}
Q5: 临时资源什么时候被清理?
A:
- 推荐: 在Pipeline最后添加CleanupStage
- 兜底: Pipeline在finally块中会自动调用
context.cleanup() - 手动: 也可以在业务代码中手动调用
context.cleanup()
Pipeline<MyContext> pipeline = new PipelineBuilder<>("MyPipeline")
.addStage(stage1)
.addStage(stage2)
.addStage(new CleanupStage()) // 显式添加清理Stage
.build();
// Pipeline会在finally中再次调用cleanup(),确保清理
Q6: 如何获取Pipeline执行结果?
A:
- Pipeline返回
boolean(成功/失败) - 具体结果数据存储在Context中
boolean success = pipeline.execute(context);
if (success) {
// 从Context获取结果
PaymentResult result = context.getPaymentResult();
Order order = context.getOrder();
}
Q7: 如何调试Pipeline执行?
A:
- 查看Pipeline日志(包含每个Stage的执行状态和耗时)
- 使用
pipeline.getStageNames()查看Stage列表 - 在Context中添加调试字段
// 启用DEBUG日志
logging:
level:
com.ycwl.basic.pipeline: DEBUG
// 日志输出示例
[MyPipeline] Pipeline 开始执行, Stage 数量: 5
[MyPipeline] [1/5] 准备执行 Stage: ValidateOrder
[MyPipeline] ✓ Stage ValidateOrder - SUCCESS (耗时: 15ms)
[MyPipeline] [2/5] 准备执行 Stage: RiskCheck
[MyPipeline] Stage RiskCheck 动态添加了 1 个后续 Stage
[MyPipeline] ✓ Stage RiskCheck - SUCCESS (耗时: 120ms)
Q8: 如何支持并行执行Stage?
A: 当前框架不直接支持并行执行,所有Stage都是顺序执行。如果需要并行,可以:
- 在单个Stage内部使用
CompletableFuture并行处理 - 将独立的子任务拆分为异步Stage
@Override
protected StageResult<MyContext> doExecute(MyContext context) {
// 并行执行多个独立任务
CompletableFuture<Void> task1 = CompletableFuture.runAsync(() -> doTask1(context));
CompletableFuture<Void> task2 = CompletableFuture.runAsync(() -> doTask2(context));
CompletableFuture<Void> task3 = CompletableFuture.runAsync(() -> doTask3(context));
// 等待所有任务完成
CompletableFuture.allOf(task1, task2, task3).join();
return StageResult.success("并行任务完成");
}
Q9: Stage的优先级如何设置?
A:
- 实现
PipelineStage.getPriority()方法(默认返回0) - 调用
PipelineBuilder.sortByPriority()排序
@Override
public int getPriority() {
return -100; // 数值越小优先级越高
}
Pipeline<MyContext> pipeline = new PipelineBuilder<>("MyPipeline")
.addStage(normalStage) // priority = 0
.addStage(prepareStage) // priority = -100
.addStage(cleanupStage) // priority = 999
.sortByPriority() // 排序: prepareStage → normalStage → cleanupStage
.build();
Q10: 如何避免Stage循环依赖?
A:
- Pipeline内置了循环检测(最大执行100个Stage)
- 如果超过限制会抛出
PipelineException - 设计Stage时避免互相添加对方作为后续Stage
// BAD ❌ - 会导致无限循环
@Override
protected StageResult<MyContext> doExecute(MyContext context) {
return StageResult.successWithNext("添加自己", this); // 无限循环!
}
// GOOD ✅ - 有明确的终止条件
@Override
protected StageResult<MyContext> doExecute(MyContext context) {
if (context.getRetryCount() < 3) {
context.incrementRetryCount();
return StageResult.successWithNext("重试", this);
}
return StageResult.success("重试完成");
}
架构设计
设计模式
| 模式 | 应用 | 说明 |
|---|---|---|
| 责任链模式 | Pipeline + Stage | Stage按顺序处理,每个Stage决定是否处理和传递 |
| Builder模式 | PipelineBuilder | 灵活组装Pipeline |
| 模板方法模式 | AbstractPipelineStage | 定义执行流程,子类实现具体逻辑 |
| 策略模式 | StageOptionalMode | 不同的Stage执行策略(FORCE_ON/SUPPORT/UNSUPPORT) |
| 工厂模式 | PipelineFactory | 创建不同场景的Pipeline |
| 状态模式 | StageResult.Status | 四种执行状态(SUCCESS/SKIPPED/DEGRADED/FAILED) |
核心流程图
Pipeline.execute(context)
│
├─→ context.beforePipeline()
│
├─→ for (stage : stages)
│ │
│ ├─→ shouldExecute(context)?
│ │ │
│ │ ├─→ false: continue
│ │ │
│ │ └─→ true:
│ │ ├─→ beforeExecute(context)
│ │ ├─→ doExecute(context) → result
│ │ ├─→ afterExecute(context, result)
│ │ │
│ │ ├─→ result.hasNextStages()?
│ │ │ └─→ yes: 动态插入Stage
│ │ │
│ │ └─→ result.isFailed()?
│ │ └─→ yes: 终止Pipeline
│ │
│ └─→ 记录日志(✓成功 ○跳过 △降级 ✗失败)
│
├─→ context.afterPipeline()
│
└─→ finally: context.cleanup()
Stage执行判断流程
AbstractPipelineStage.shouldExecute(context)
│
├─→ 获取@StageConfig注解
│
├─→ optionalMode == FORCE_ON?
│ └─→ yes: 跳过外部配置检查
│
├─→ optionalMode == SUPPORT?
│ │
│ └─→ yes:
│ ├─→ context.isStageEnabled(stageId, defaultEnabled)
│ │ │
│ │ ├─→ false: 返回false(外部禁用)
│ │ │
│ │ └─→ true: 继续检查业务逻辑
│
└─→ shouldExecuteByBusinessLogic(context)
│
├─→ true: 执行Stage
│
└─→ false: 跳过Stage
已知限制
- 不支持Stage并行执行: 所有Stage都是顺序执行,需要并行的场景需在Stage内部实现
- 不支持Stage执行超时: 需要在Stage内部自行实现超时控制
- 不支持Pipeline暂停/恢复: Pipeline执行一旦开始就会运行到结束或失败
- 动态添加的Stage无法排序:
sortByPriority()只对初始Stage有效 - Context不支持并发访问: 需要在业务代码中保证线程安全
未来改进方向
- 🔄 支持Stage并行执行(ParallelStage)
- 🔄 支持Stage执行超时控制(@StageConfig.timeout)
- 🔄 支持Pipeline执行的暂停/恢复
- 🔄 支持更细粒度的性能监控(Metrics集成)
- 🔄 支持Stage执行的重试机制(@StageConfig.retry)
- 🔄 支持Pipeline执行的可视化追踪
- 🔄 支持Stage间的数据传递验证(Schema验证)
相关文档
- 人脸匹配Pipeline文档 - 完整的人脸识别业务实现
- 图片处理Pipeline文档 - 完整的图片处理业务实现
- Integration包文档 - 微服务集成框架
维护者
- Pipeline通用框架 - 基础架构团队