fix(core): 修复 StageResult 中 nextStages 的不可变性问题

- 将 nextStages 初始化改为使用 Collections.unmodifiableList 包装
- 防止外部代码修改 nextStages 列表内容
- 保证 StageResult 的不可变性和线程安全性
- 添加完整的单元测试覆盖各种构造场景
This commit is contained in:
2025-11-26 09:03:36 +08:00
parent 7b18d7c2af
commit d2846e6d8e
14 changed files with 2275 additions and 1 deletions

View File

@@ -0,0 +1,595 @@
# Image Pipeline 图片处理管线
## 概述
Image Pipeline 是一个通用的、可扩展的图片处理管线框架,用于组织和执行一系列图片处理操作(Stage)。
### 核心特性
- **责任链模式**: 将图片处理流程拆分为独立的 Stage,按顺序执行
- **Builder 模式**: 灵活组装管线,支持条件性添加 Stage
- **动态 Stage 添加**: 支持在运行时根据条件动态添加后续 Stage
- **降级策略**: 支持多级降级执行,确保管线在异常情况下的稳定性
- **配置驱动**: 支持通过外部配置控制 Stage 的启用/禁用
- **类型安全**: 使用泛型和枚举确保类型安全
- **解耦设计**: Context 独立于业务模型,支持多种使用场景
## 包结构
```
com.ycwl.basic.image.pipeline
├── annotation/ # 注解定义
│ └── StageConfig # Stage 配置注解
├── core/ # 核心类
│ ├── AbstractPipelineStage # Stage 抽象基类
│ ├── PhotoProcessContext # 管线上下文
│ ├── Pipeline # 管线执行器
│ ├── PipelineBuilder # 管线构建器
│ ├── PipelineStage # Stage 接口
│ └── StageResult # Stage 执行结果
├── enums/ # 枚举定义
│ ├── ImageSource # 图片来源枚举
│ ├── ImageType # 图片类型枚举
│ ├── PipelineScene # 管线场景枚举
│ └── StageOptionalMode # Stage 可选模式枚举
├── exception/ # 异常类
│ ├── PipelineException # 管线异常
│ └── StageExecutionException # Stage 执行异常
├── stages/ # 具体 Stage 实现
│ ├── CleanupStage # 清理临时文件
│ ├── ConditionalRotateStage # 条件性旋转
│ ├── DownloadStage # 下载图片
│ ├── ImageEnhanceStage # 图像增强(超分)
│ ├── ImageOrientationStage # 图像方向检测
│ ├── ImageQualityCheckStage # 图像质量检测
│ ├── PuzzleBorderStage # 拼图边框处理
│ ├── RestoreOrientationStage # 恢复图片方向
│ ├── SourcePhotoUpdateStage # 源图片更新
│ ├── UploadStage # 上传图片
│ └── WatermarkStage # 水印处理
└── util/ # 工具类
└── TempFileManager # 临时文件管理器
```
## 核心组件
### 1. Pipeline - 管线执行器
**职责**: 按顺序执行一系列 Stage,管理执行流程和异常处理。
**关键特性**:
- 顺序执行所有 Stage
- 支持动态添加后续 Stage
- 循环检测(最大执行 100 个 Stage)
- 详细的日志输出(带状态图标)
**使用示例**:
```java
Pipeline<PhotoProcessContext> pipeline = new PipelineBuilder<>("MyPipeline")
.addStage(new DownloadStage())
.addStage(new WatermarkStage())
.addStage(new UploadStage())
.addStage(new CleanupStage())
.build();
boolean success = pipeline.execute(context);
```
### 2. PhotoProcessContext - 管线上下文
**职责**: 在各个 Stage 之间传递状态和数据,提供临时文件管理。
**核心字段**:
- `processId`: 处理过程唯一标识,用于隔离临时文件
- `originalUrl`: 原图 URL
- `scenicId`: 景区 ID
- `imageType`: 图片类型(普通照片/拼图/手机上传)
- `tempFileManager`: 临时文件管理器
**静态工厂方法**:
```java
// 从打印订单创建(打印场景)
PhotoProcessContext context = PhotoProcessContext.fromPrinterOrderItem(orderItem, scenicId);
// 为超分辨率场景创建
PhotoProcessContext context = PhotoProcessContext.forSuperResolution(itemId, url, scenicId);
// 使用 Builder 自定义创建
PhotoProcessContext context = PhotoProcessContext.builder()
.processId("custom-id")
.originalUrl("https://example.com/image.jpg")
.scenicId(12345L)
.imageType(ImageType.NORMAL_PHOTO)
.source(ImageSource.IPC)
.scene(PipelineScene.IMAGE_PRINT)
.build();
```
**重要方法**:
- `getCurrentFile()`: 获取当前处理中的文件
- `updateProcessedFile(File)`: 更新处理后的文件
- `setResultUrl(String)`: 设置最终结果 URL(会触发回调)
- `cleanup()`: 清理所有临时文件
- `isStageEnabled(stageId, default)`: 判断 Stage 是否启用
### 3. AbstractPipelineStage - Stage 抽象基类
**职责**: 提供 Stage 的通用实现和模板方法。
**执行流程**:
```
shouldExecute() → beforeExecute() → doExecute() → afterExecute()
```
**子类需要实现**:
- `getName()`: 返回 Stage 名称
- `doExecute(context)`: 实现具体处理逻辑
- `shouldExecuteByBusinessLogic(context)`: (可选)实现条件判断
**Stage 执行判断逻辑**:
1. 检查 `@StageConfig` 注解
2. 根据 `optionalMode` 决定是否检查外部配置
- `FORCE_ON`: 强制执行,不检查外部配置
- `SUPPORT`: 检查外部配置(`context.isStageEnabled()`
- `UNSUPPORT`: 不检查外部配置
3. 执行业务逻辑判断(`shouldExecuteByBusinessLogic()`
### 4. StageResult - Stage 执行结果
**状态类型**:
- `SUCCESS`: 执行成功
- `SKIPPED`: 跳过执行
- `FAILED`: 执行失败(会终止管线)
- `DEGRADED`: 降级执行(继续管线但记录警告)
**静态工厂方法**:
```java
// 成功
StageResult.success();
StageResult.success("处理完成");
// 成功并动态添加后续 Stage
StageResult.successWithNext("质量不佳,添加增强", new ImageEnhanceStage());
// 跳过
StageResult.skipped("条件不满足");
// 失败
StageResult.failed("下载失败");
StageResult.failed("处理失败", exception);
// 降级
StageResult.degraded("使用备用方案");
```
### 5. @StageConfig - Stage 配置注解
**字段**:
- `stageId`: Stage 唯一标识(用于外部配置控制)
- `optionalMode`: 可选模式
- `FORCE_ON`: 强制执行(如 DownloadStage、CleanupStage)
- `SUPPORT`: 支持外部控制(如 WatermarkStage、ImageEnhanceStage)
- `UNSUPPORT`: 不支持外部控制(如 RestoreOrientationStage)
- `defaultEnabled`: 默认是否启用
- `description`: 描述信息
**示例**:
```java
@StageConfig(
stageId = "watermark",
optionalMode = StageOptionalMode.SUPPORT,
description = "水印处理",
defaultEnabled = true
)
public class WatermarkStage extends AbstractPipelineStage<PhotoProcessContext> {
// ...
}
```
## Stage 列表
### 核心 Stage
| Stage | 职责 | Optional Mode | 执行条件 |
|-------|------|---------------|---------|
| DownloadStage | 从 URL 下载图片到本地 | FORCE_ON | 总是执行 |
| CleanupStage | 清理所有临时文件 | FORCE_ON | 总是执行(优先级 999) |
### 图片处理 Stage
| Stage | 职责 | Optional Mode | 执行条件 |
|-------|------|---------------|---------|
| ImageOrientationStage | 检测图片方向(横竖) | UNSUPPORT | 仅普通照片 |
| ConditionalRotateStage | 条件性旋转(竖图变横图) | UNSUPPORT | 仅竖图 |
| RestoreOrientationStage | 恢复图片方向(横图变回竖图) | UNSUPPORT | 需要旋转的照片 |
| WatermarkStage | 添加水印 | SUPPORT | 仅普通照片 |
| PuzzleBorderStage | 处理拼图边框 | UNSUPPORT | 仅拼图 |
| ImageEnhanceStage | 图像增强(超分) | SUPPORT | 可配置 |
| ImageQualityCheckStage | 图像质量检测 | SUPPORT | 仅普通照片 |
### 存储 Stage
| Stage | 职责 | Optional Mode | 执行条件 |
|-------|------|---------------|---------|
| UploadStage | 上传图片到存储服务 | FORCE_ON | 总是执行 |
| SourcePhotoUpdateStage | 更新源图片记录 | UNSUPPORT | 总是执行 |
## 典型管线示例
### 1. 打印照片处理管线
```java
Pipeline<PhotoProcessContext> pipeline = new PipelineBuilder<>("PrintPipeline")
.addStage(new DownloadStage()) // 1. 下载
.addStage(new ImageOrientationStage()) // 2. 检测方向
.addStage(new ConditionalRotateStage()) // 3. 旋转竖图
.addStage(new WatermarkStage()) // 4. 添加水印
.addStage(new RestoreOrientationStage()) // 5. 恢复方向
.addStage(new UploadStage()) // 6. 上传
.addStage(new CleanupStage()) // 7. 清理
.build();
```
### 2. 拼图处理管线
```java
Pipeline<PhotoProcessContext> pipeline = new PipelineBuilder<>("PuzzlePipeline")
.addStage(new DownloadStage()) // 1. 下载
.addStage(new PuzzleBorderStage()) // 2. 添加拼图边框
.addStage(new UploadStage()) // 3. 上传
.addStage(new CleanupStage()) // 4. 清理
.build();
```
### 3. 超分辨率增强管线
```java
Pipeline<PhotoProcessContext> pipeline = new PipelineBuilder<>("SuperResolutionPipeline")
.addStage(new DownloadStage()) // 1. 下载
.addStage(new ImageEnhanceStage(config)) // 2. 超分增强
.addStage(new SourcePhotoUpdateStage(sourceService, sourceId)) // 3. 更新记录
.addStage(new CleanupStage()) // 4. 清理
.build();
```
### 4. 带质量检测的管线(动态 Stage)
```java
Pipeline<PhotoProcessContext> pipeline = new PipelineBuilder<>("QualityCheckPipeline")
.addStage(new DownloadStage()) // 1. 下载
.addStage(new ImageQualityCheckStage()) // 2. 质量检测(可能动态添加 ImageEnhanceStage)
.addStage(new WatermarkStage()) // 3. 水印
.addStage(new UploadStage()) // 4. 上传
.addStage(new CleanupStage()) // 5. 清理
.build();
```
## 扩展指南
### 如何创建新的 Stage
1. **继承 AbstractPipelineStage**:
```java
@Slf4j
@StageConfig(
stageId = "my_stage",
optionalMode = StageOptionalMode.SUPPORT,
description = "我的自定义 Stage",
defaultEnabled = true
)
public class MyStage extends AbstractPipelineStage<PhotoProcessContext> {
@Override
public String getName() {
return "MyStage";
}
@Override
protected boolean shouldExecuteByBusinessLogic(PhotoProcessContext context) {
// 实现条件判断
return context.getImageType() == ImageType.NORMAL_PHOTO;
}
@Override
protected StageResult doExecute(PhotoProcessContext context) {
File currentFile = context.getCurrentFile();
try {
// 1. 获取输入文件
if (currentFile == null || !currentFile.exists()) {
return StageResult.failed("当前文件不存在");
}
// 2. 处理图片
File outputFile = context.getTempFileManager()
.createTempFile("my_output", ".jpg");
// 执行具体处理逻辑
doSomethingWithImage(currentFile, outputFile);
// 3. 更新 Context
context.updateProcessedFile(outputFile);
// 4. 返回成功结果
return StageResult.success("处理完成");
} catch (Exception e) {
log.error("处理失败", e);
return StageResult.failed("处理失败: " + e.getMessage(), e);
}
}
private void doSomethingWithImage(File input, File output) {
// 具体实现
}
}
```
2. **添加到管线**:
```java
pipeline.addStage(new MyStage());
```
### 如何实现降级策略
参考 `WatermarkStage` 的实现,使用循环尝试多种方案:
```java
@Override
protected StageResult doExecute(PhotoProcessContext context) {
List<Strategy> strategies = Arrays.asList(
Strategy.ADVANCED,
Strategy.STANDARD,
Strategy.BASIC
);
for (int i = 0; i < strategies.size(); i++) {
Strategy strategy = strategies.get(i);
try {
StageResult result = tryStrategy(context, strategy);
if (i > 0) {
// 使用了降级策略
return StageResult.degraded("降级到: " + strategy);
}
return result;
} catch (Exception e) {
log.warn("策略 {} 失败: {}", strategy, e.getMessage());
}
}
// 所有策略都失败
return StageResult.degraded("所有策略失败,跳过处理");
}
```
### 如何动态添加后续 Stage
参考 `ImageQualityCheckStage` 的实现:
```java
@Override
protected StageResult doExecute(PhotoProcessContext context) {
boolean needsEnhancement = checkQuality(context.getCurrentFile());
if (needsEnhancement) {
ImageEnhanceStage enhanceStage = new ImageEnhanceStage();
return StageResult.successWithNext("质量不佳,添加增强", enhanceStage);
}
return StageResult.success("质量良好");
}
```
## 最佳实践
### 1. Context 管理
- **总是使用静态工厂方法或 Builder**: 避免直接调用构造函数
- **及时清理临时文件**: 在 finally 块或使用 CleanupStage
- **使用回调更新外部状态**: 通过 `resultUrlCallback` 而非直接操作业务对象
### 2. Stage 设计
- **单一职责**: 每个 Stage 只做一件事
- **可组合**: Stage 应该可以灵活组合
- **幂等性**: 相同输入应产生相同输出
- **异常处理**: 捕获异常并返回 `StageResult.failed()``StageResult.degraded()`
- **日志记录**: 在关键操作处记录 debug/info 日志
### 3. 管线构建
- **CleanupStage 总是最后**: 确保临时文件总是被清理
- **DownloadStage 总是最前**: 确保有本地文件可用
- **合理使用 optionalMode**:
- 必需的 Stage 使用 `FORCE_ON`
- 可选的 Stage 使用 `SUPPORT`
- 内部逻辑控制的 Stage 使用 `UNSUPPORT`
### 4. 性能优化
- **复用 TempFileManager**: 自动管理临时文件生命周期
- **避免重复下载**: 使用 `context.getCurrentFile()` 获取最新文件
- **及时更新 processedFile**: 使用 `context.updateProcessedFile()` 通知下一个 Stage
### 5. 错误处理
- **失败即停止**: 使用 `StageResult.failed()` 终止管线
- **降级继续执行**: 使用 `StageResult.degraded()` 记录问题但继续执行
- **跳过非关键 Stage**: 使用 `StageResult.skipped()` 表示条件不满足
- **携带异常信息**: `StageResult.failed(message, exception)` 便于排查问题
## 配置控制
### 1. 景区级配置
通过 `ScenicConfigManager` 加载景区配置:
```java
context.setScenicConfigManager(scenicConfigManager);
```
Stage 内部可以获取配置:
```java
ScenicConfigManager config = context.getScenicConfigManager();
Boolean enabled = config.getBoolean("my_feature_enabled");
String value = config.getString("my_setting");
```
### 2. 请求级配置
通过 `loadStageConfig()` 加载请求参数:
```java
Map<String, Boolean> stageConfig = new HashMap<>();
stageConfig.put("watermark", false); // 禁用水印
stageConfig.put("image_enhance", true); // 启用增强
context.loadStageConfig(scenicConfigManager, stageConfig);
```
### 3. Stage 启用判断
`AbstractPipelineStage.shouldExecute()` 中自动处理:
```java
// 对于 optionalMode = SUPPORT 的 Stage
@StageConfig(
stageId = "watermark",
optionalMode = StageOptionalMode.SUPPORT,
defaultEnabled = true
)
public class WatermarkStage extends AbstractPipelineStage<PhotoProcessContext> {
// 如果外部配置禁用了 watermark,则不执行
}
```
## 测试指南
### 单元测试结构
```java
@Test
public void testStageSuccess() {
// 1. 准备 Context
PhotoProcessContext context = PhotoProcessContext.builder()
.processId("test-1")
.originalUrl("https://example.com/test.jpg")
.scenicId(123L)
.build();
// 2. 创建 Stage
MyStage stage = new MyStage();
// 3. 执行
StageResult result = stage.execute(context);
// 4. 断言
assertTrue(result.isSuccess());
assertNotNull(context.getCurrentFile());
}
@Test
public void testStageSkipped() {
// 测试条件不满足时跳过
}
@Test
public void testStageFailed() {
// 测试异常情况
}
@Test
public void testStageDegraded() {
// 测试降级情况
}
```
### 管线集成测试
```java
@Test
public void testPipelineExecution() {
Pipeline<PhotoProcessContext> pipeline = new PipelineBuilder<>("TestPipeline")
.addStage(new DownloadStage())
.addStage(new WatermarkStage())
.addStage(new UploadStage())
.addStage(new CleanupStage())
.build();
PhotoProcessContext context = createTestContext();
boolean success = pipeline.execute(context);
assertTrue(success);
assertNotNull(context.getResultUrl());
}
```
## 常见问题
### Q: 如何跳过某个 Stage?
A: 有三种方式:
1. 使用外部配置(适用于 `optionalMode = SUPPORT` 的 Stage)
2.`shouldExecuteByBusinessLogic()` 中返回 false
3. 构建管线时不添加该 Stage
### Q: 如何在运行时决定是否添加某个 Stage?
A: 使用 `PipelineBuilder.addStageIf()`:
```java
builder.addStageIf(needWatermark, new WatermarkStage());
```
或者使用动态 Stage 添加(`StageResult.successWithNext()`)。
### Q: 如何处理 Stage 执行失败?
A: 返回 `StageResult.failed()`,管线会立即终止。如果希望继续执行,使用 `StageResult.degraded()`
### Q: 临时文件什么时候被清理?
A: 由 `CleanupStage` 负责,通常放在管线最后。也可以手动调用 `context.cleanup()`
### Q: 如何获取最终处理结果?
A: 使用 `context.getResultUrl()`,或者在构建 Context 时提供 `resultUrlCallback`
### Q: 如何支持新的图片来源或场景?
A: 扩展 `ImageSource``PipelineScene` 枚举,然后在 Stage 中添加相应的判断逻辑。
## 架构演进
### 已实现的特性
- ✅ 责任链模式的基础管线框架
- ✅ Builder 模式的管线构建
- ✅ 动态 Stage 添加
- ✅ 多级降级策略
- ✅ 配置驱动的 Stage 控制
- ✅ Context 与业务模型解耦
- ✅ 类型安全的图片分类
### 未来可能的改进
- 🔄 支持并行执行某些 Stage
- 🔄 支持 Stage 执行超时控制
- 🔄 支持管线执行的暂停/恢复
- 🔄 支持更细粒度的性能监控
- 🔄 支持 Stage 执行的重试机制
- 🔄 支持管线执行的可视化追踪
## 相关文档
- [ImageUtils 工具类](../utils/ImageUtils.java)
- [StorageFactory 存储工厂](../storage/StorageFactory.java)
- [WatermarkFactory 水印工厂](../image/watermark/ImageWatermarkFactory.java)
## 维护者
- 图片处理管线 - 基础架构团队

View File

@@ -29,7 +29,9 @@ public class StageResult {
this.status = status;
this.message = message;
this.exception = exception;
this.nextStages = nextStages != null ? new ArrayList<>(nextStages) : Collections.emptyList();
this.nextStages = nextStages != null
? Collections.unmodifiableList(new ArrayList<>(nextStages))
: Collections.emptyList();
}
public static StageResult success() {