Files
DataMate/backend/services/data-collection-service
hefanli 08bd4eca5c feature:增加数据配比功能 (#52)
* refactor: 修改调整数据归集实现,删除无用代码,优化代码结构

* feature: 每天凌晨00:00扫描所有数据集,检查数据集是否超过了预设的保留天数,超出保留天数的数据集调用删除接口进行删除

* fix: 修改删除数据集文件的逻辑,上传到数据集中的文件会同时删除数据库中的记录和文件系统中的文件,归集过来的文件仅删除数据库中的记录

* fix: 增加参数校验和接口定义,删除不使用的接口

* fix: 数据集统计数据默认为0

* feature: 数据集状态增加流转,创建时为草稿状态,上传文件或者归集文件后修改为活动状态

* refactor: 修改分页查询归集任务的代码

* fix: 更新后重新执行;归集任务执行增加事务控制

* feature: 创建归集任务时能够同步创建数据集,更新归集任务时能更新到指定数据集

* fix: 创建归集任务不需要创建数据集时不应该报错

* fix: 修复删除文件时数据集的统计数据不变动

* feature: 查询数据集详情时能够获取到文件标签分布

* fix: tags为空时不进行分析

* fix: 状态修改为ACTIVE

* fix: 修改解析tag的方法

* feature: 实现创建、分页查询、删除配比任务

* feature: 实现创建、分页查询、删除配比任务的前端交互

* fix: 修复进度计算异常导致的页面报错
2025-11-03 10:17:39 +08:00
..
2025-10-21 23:00:48 +08:00
2025-10-21 23:00:48 +08:00
2025-10-21 23:00:48 +08:00
2025-10-21 23:00:48 +08:00
2025-10-28 11:03:01 +08:00
2025-10-21 23:00:48 +08:00

数据归集服务 (Data Collection Service)

基于DataX的数据归集和同步服务,提供多数据源之间的数据同步功能。

功能特性

  • 🔗 多数据源支持: 支持MySQL、PostgreSQL、Oracle、SQL Server等主流数据库
  • 📊 任务管理: 创建、配置、执行和监控数据同步任务
  • 定时调度: 支持Cron表达式的定时任务
  • 📈 实时监控: 任务执行进度、状态和性能指标监控
  • 📝 执行日志: 详细的任务执行日志记录
  • 🔌 插件化: DataX Reader/Writer插件化集成

技术架构

  • 框架: Spring Boot 3.x
  • 数据库: MySQL + MyBatis
  • 同步引擎: DataX
  • API: OpenAPI 3.0 自动生成
  • 架构模式: DDD (领域驱动设计)

项目结构

src/main/java/com/datamate/collection/
├── DataCollectionApplication.java          # 应用启动类
├── domain/                                  # 领域层
│   ├── model/                              # 领域模型
│   │   ├── DataSource.java                 # 数据源实体
│   │   ├── CollectionTask.java             # 归集任务实体
│   │   ├── TaskExecution.java              # 任务执行记录
│   │   └── ExecutionLog.java               # 执行日志
│   └── service/                            # 领域服务
│       ├── DataSourceService.java
│       ├── CollectionTaskService.java
│       ├── TaskExecutionService.java
│       └── impl/                           # 服务实现
├── infrastructure/                          # 基础设施层
│   ├── config/                             # 配置类
│   ├── datax/                              # DataX执行引擎
│   │   └── DataXExecutionEngine.java
│   └── persistence/                        # 持久化
│       ├── mapper/                         # MyBatis Mapper
│       └── typehandler/                    # 类型处理器
└── interfaces/                             # 接口层
    ├── api/                                # OpenAPI生成的接口
    ├── dto/                                # OpenAPI生成的DTO
    └── rest/                               # REST控制器
        ├── DataSourceController.java
        ├── CollectionTaskController.java
        ├── TaskExecutionController.java
        └── exception/                      # 异常处理

src/main/resources/
├── mappers/                                # MyBatis XML映射文件
├── application.properties                  # 应用配置
└── ...

环境要求

  • Java 17+
  • Maven 3.6+
  • MySQL 8.0+
  • DataX 3.0+
  • Redis (可选,用于缓存)

配置说明

应用配置 (application.properties)

# 服务端口
server.port=8090

# 数据库配置
spring.datasource.url=jdbc:mysql://localhost:3306/knowledge_base
spring.datasource.username=root
spring.datasource.password=123456

# DataX配置
datax.home=/runtime/datax
datax.python.path=/runtime/datax/bin/datax.py
datax.job.timeout=7200
datax.job.memory=2g

DataX配置

确保DataX已正确安装并配置:

  1. 下载DataX到 /runtime/datax 目录
  2. 配置相关Reader/Writer插件
  3. 确保Python环境可用

数据库初始化

执行数据库初始化脚本:

mysql -u root -p knowledge_base < scripts/db/data-collection-init.sql

构建和运行

1. 编译项目

cd backend/services/data-collection-service
mvn clean compile

这将触发OpenAPI代码生成。

2. 打包

mvn clean package -DskipTests

3. 运行

作为独立服务运行:

java -jar target/data-collection-service-1.0.0-SNAPSHOT.jar

或通过main-application统一启动:

cd backend/services/main-application
mvn spring-boot:run

API文档

服务启动后,可通过以下地址访问API文档:

主要API端点

数据源管理

  • GET /api/v1/collection/datasources - 获取数据源列表
  • POST /api/v1/collection/datasources - 创建数据源
  • GET /api/v1/collection/datasources/{id} - 获取数据源详情
  • PUT /api/v1/collection/datasources/{id} - 更新数据源
  • DELETE /api/v1/collection/datasources/{id} - 删除数据源
  • POST /api/v1/collection/datasources/{id}/test - 测试连接

归集任务管理

  • GET /api/v1/collection/tasks - 获取任务列表
  • POST /api/v1/collection/tasks - 创建任务
  • GET /api/v1/collection/tasks/{id} - 获取任务详情
  • PUT /api/v1/collection/tasks/{id} - 更新任务
  • DELETE /api/v1/collection/tasks/{id} - 删除任务

任务执行管理

  • POST /api/v1/collection/tasks/{id}/execute - 执行任务
  • POST /api/v1/collection/tasks/{id}/stop - 停止任务
  • GET /api/v1/collection/executions - 获取执行历史
  • GET /api/v1/collection/executions/{executionId} - 获取执行详情
  • GET /api/v1/collection/executions/{executionId}/logs - 获取执行日志

监控统计

  • GET /api/v1/collection/monitor/statistics - 获取统计信息

开发指南

添加新的数据源类型

  1. DataSource.DataSourceType 枚举中添加新类型
  2. DataXExecutionEngine 中添加对应的Reader/Writer映射
  3. 更新数据库表结构和初始化数据

自定义DataX插件

  1. 将插件放置在 /runtime/datax/plugin 目录下
  2. DataXExecutionEngine 中配置插件映射关系
  3. 根据插件要求调整配置模板

扩展监控指标

  1. StatisticsService 中添加新的统计逻辑
  2. 更新 CollectionStatistics DTO
  3. 在数据库中添加相应的统计表或字段

故障排查

常见问题

  1. DataX执行失败

    • 检查DataX安装路径和Python环境
    • 确认数据源连接配置正确
    • 查看执行日志获取详细错误信息
  2. 数据库连接失败

    • 检查数据库配置和网络连通性
    • 确认数据库用户权限
  3. API调用失败

    • 检查请求参数格式
    • 查看应用日志获取详细错误信息

日志查看

# 应用日志
tail -f logs/data-collection-service.log

# 任务执行日志
curl http://localhost:8090/api/v1/collection/executions/{executionId}/logs

贡献指南

  1. Fork项目
  2. 创建特性分支: git checkout -b feature/new-feature
  3. 提交更改: git commit -am 'Add new feature'
  4. 推送分支: git push origin feature/new-feature
  5. 提交Pull Request

许可证

MIT License