feature: add scheduled cleanup of datasets that exceed their retention period; add an API for binding data collection tasks to datasets (#24)

* refactor: rework the data collection implementation, remove dead code, and tidy up the code structure

* feature: scan all datasets at 00:00 every day, check whether each one has exceeded its configured retention days, and call the delete API for any dataset past retention (a minimal sketch of the expiry rule follows this list)

* fix: adjust dataset file deletion: files uploaded to a dataset have both the database record and the file on disk removed, while collected files only have their database record removed

* fix: add parameter validation and API definitions, and remove unused APIs

* fix: default dataset statistics to 0

* feature: add dataset status transitions: a dataset is created in DRAFT status and moves to ACTIVE once files are uploaded or collected into it

* refactor: rework the paged query for collection tasks

* fix: re-execute the task after an update; add transaction control to collection task execution

* feature: a dataset can be created together with a collection task, and updating a collection task can bind it to a specified dataset
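The expiry rule behind the daily cleanup is small enough to state in a few lines. The snippet below is only an illustration of that rule, written against the retentionDays and createdAt fields this commit exposes on DatasetResponse; the actual implementation is the DatasetScheduler class added at the end of this diff, which pages through all datasets on a cron trigger of "0 0 0 * * ?" (00:00 every day).

import java.time.LocalDateTime;

class RetentionRule {
    // Illustrative only: a dataset with no retention configured is never cleaned up;
    // otherwise it expires retentionDays after its creation time.
    static boolean isExpired(Integer retentionDays, LocalDateTime createdAt) {
        if (retentionDays == null || retentionDays <= 0 || createdAt == null) {
            return false;
        }
        return createdAt.plusDays(retentionDays).isBefore(LocalDateTime.now());
    }
}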
hefanli
2025-10-25 15:59:36 +08:00
committed by GitHub
parent 871ba5758d
commit 46dfb389f1
21 changed files with 375 additions and 212 deletions

View File

@@ -39,7 +39,7 @@ paths:
schema:
type: integer
default: 0
description: 页码,从0开始
description: 页码,从1开始
- name: size
in: query
schema:
@@ -65,7 +65,7 @@ paths:
in: query
schema:
type: string
enum: [ACTIVE, INACTIVE, PROCESSING]
enum: [DRAFT, ACTIVE, PROCESSING, ARCHIVED, PUBLISHED, DEPRECATED]
description: 数据集状态过滤
responses:
'200':
@@ -231,40 +231,6 @@ paths:
schema:
$ref: '#/components/schemas/PagedDatasetFileResponse'
post:
tags: [DatasetFile]
summary: 上传文件到数据集
operationId: uploadDatasetFile
description: 向指定数据集上传文件
parameters:
- name: datasetId
in: path
required: true
schema:
type: string
description: 数据集ID
requestBody:
required: true
content:
multipart/form-data:
schema:
type: object
properties:
file:
type: string
format: binary
description: 要上传的文件
description:
type: string
description: 文件描述
responses:
'201':
description: 上传成功
content:
application/json:
schema:
$ref: '#/components/schemas/DatasetFileResponse'
/data-management/datasets/{datasetId}/files/{fileId}:
get:
tags: [DatasetFile]
@@ -342,6 +308,78 @@ paths:
type: string
format: binary
/data-management/datasets/{datasetId}/files/download:
get:
tags: [ DatasetFile ]
operationId: downloadDatasetFileAsZip
summary: 下载文件
description: 下载数据集中全部文件
parameters:
- name: datasetId
in: path
required: true
schema:
type: string
description: 数据集ID
responses:
'200':
description: 文件内容
content:
application/octet-stream:
schema:
type: string
format: binary
/data-management/datasets/{datasetId}/files/upload/pre-upload:
post:
tags: [ DatasetFile ]
operationId: preUpload
summary: 切片上传预上传
description: 预上传接口,返回后续分片上传所需的请求ID
parameters:
- name: datasetId
in: path
required: true
schema:
type: string
description: 数据集ID
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/UploadFilesPreRequest'
responses:
'200':
description: 预上传成功,返回请求ID
content:
application/json:
schema:
type: string
/data-management/datasets/{datasetId}/files/upload/chunk:
post:
tags: [ DatasetFile ]
operationId: chunkUpload
summary: 切片上传
description: 使用预上传返回的请求ID进行分片上传
parameters:
- name: datasetId
in: path
required: true
schema:
type: string
description: 数据集ID
requestBody:
required: true
content:
multipart/form-data:
schema:
$ref: '#/components/schemas/UploadFileRequest'
responses:
'200':
description: 上传成功
/data-management/dataset-types:
get:
operationId: getDatasetTypes
@@ -548,9 +586,59 @@ components:
description: 标签列表
status:
type: string
enum: [ACTIVE, INACTIVE]
enum: [DRAFT, ACTIVE, PROCESSING, ARCHIVED, PUBLISHED, DEPRECATED]
description: 数据集状态
UploadFilesPreRequest:
type: object
description: 切片上传预上传请求
properties:
hasArchive:
type: boolean
description: 是否为压缩包上传
default: false
totalFileNum:
type: integer
format: int32
minimum: 1
description: 总文件数量
totalSize:
type: integer
format: int64
description: 总文件大小(字节)
required: [ totalFileNum ]
UploadFileRequest:
type: object
description: 分片上传请求
properties:
reqId:
type: string
description: 预上传返回的请求ID
fileNo:
type: integer
format: int32
description: 文件编号(批量中的第几个)
fileName:
type: string
description: 文件名称
totalChunkNum:
type: integer
format: int32
description: 文件总分片数量
chunkNo:
type: integer
format: int32
description: 当前分片编号(从1开始)
file:
type: string
format: binary
description: 分片二进制内容
checkSumHex:
type: string
description: 分片校验和(十六进制)
required: [ reqId, fileNo, fileName, totalChunkNum, chunkNo, file ]
DatasetTypeResponse:
type: object
properties:
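The pre-upload and chunk endpoints above imply a two-step client flow: first POST an UploadFilesPreRequest to /files/upload/pre-upload to obtain a request ID, then POST each chunk as multipart/form-data to /files/upload/chunk carrying that ID. The sketch below is not part of this commit; it is a hypothetical single-file, single-chunk client written against these paths using Spring's RestTemplate, with a placeholder host and dataset ID.

import org.springframework.core.io.ByteArrayResource;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestTemplate;

import java.nio.charset.StandardCharsets;
import java.util.Map;

public class ChunkUploadClientSketch {
    public static void main(String[] args) {
        RestTemplate rest = new RestTemplate();
        String base = "http://localhost:8080/data-management/datasets/{datasetId}/files/upload"; // placeholder host
        String datasetId = "demo-dataset-id"; // placeholder dataset

        // Step 1: declare the batch (one small file) and receive the upload request ID.
        byte[] content = "hello".getBytes(StandardCharsets.UTF_8);
        Map<String, Object> pre = Map.of("hasArchive", false, "totalFileNum", 1, "totalSize", (long) content.length);
        String reqId = rest.postForObject(base + "/pre-upload", pre, String.class, datasetId);

        // Step 2: send the single chunk as multipart/form-data, echoing the request ID.
        MultiValueMap<String, Object> form = new LinkedMultiValueMap<>();
        form.add("reqId", reqId);
        form.add("fileNo", 1);
        form.add("fileName", "hello.txt");
        form.add("totalChunkNum", 1);
        form.add("chunkNo", 1);
        form.add("file", new ByteArrayResource(content) {
            @Override
            public String getFilename() {
                return "hello.txt"; // a filename marks this part as a file upload
            }
        });
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.MULTIPART_FORM_DATA);
        rest.postForEntity(base + "/chunk", new HttpEntity<>(form, headers), Void.class, datasetId);
    }
}

Depending on how the pre-upload response is serialized, the returned request ID may arrive JSON-quoted; a real client would strip the quotes before reusing it.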

View File

@@ -87,7 +87,12 @@
<dependency>
<groupId>com.datamate</groupId>
<artifactId>domain-common</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.datamate</groupId>
<artifactId>data-management-service</artifactId>
<version>${project.version}</version>
</dependency>
<!-- OpenAPI Dependencies -->

View File

@@ -17,6 +17,7 @@ import org.springframework.transaction.annotation.EnableTransactionManagement;
@EnableTransactionManagement
@ComponentScan(basePackages = {
"com.datamate.collection",
"com.datamate.datamanagement",
"com.datamate.shared"
})
public class DataCollectionServiceConfiguration {

View File

@@ -7,12 +7,12 @@ import com.datamate.collection.domain.model.entity.CollectionTask;
import com.datamate.collection.domain.model.entity.TaskExecution;
import com.datamate.collection.common.enums.TaskStatus;
import com.datamate.collection.domain.repository.CollectionTaskRepository;
import com.datamate.collection.interfaces.dto.CollectionTaskPagingQuery;
import com.datamate.collection.common.enums.SyncMode;
import com.datamate.common.domain.utils.ChunksSaver;
import com.datamate.datamanagement.application.DatasetApplicationService;
import com.datamate.datamanagement.domain.model.dataset.Dataset;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@@ -25,31 +25,32 @@ import java.util.Objects;
@RequiredArgsConstructor
public class CollectionTaskService {
private final TaskExecutionService taskExecutionService;
private final DatasetApplicationService datasetApplicationService;
private final CollectionTaskRepository collectionTaskRepository;
@Transactional
public CollectionTask create(CollectionTask task) {
task.setStatus(TaskStatus.READY);
task.setCreatedAt(LocalDateTime.now());
task.setUpdatedAt(LocalDateTime.now());
public CollectionTask create(CollectionTask task, String datasetId) {
task.initCreateParam();
collectionTaskRepository.save(task);
executeTaskNow(task);
executeTaskNow(task, datasetId);
return task;
}
private void executeTaskNow(CollectionTask task) {
private void executeTaskNow(CollectionTask task, String datasetId) {
if (Objects.equals(task.getSyncMode(), SyncMode.ONCE)) {
TaskExecution exec = taskExecutionService.createExecution(task);
int timeout = task.getTimeoutSeconds() == null ? 3600 : task.getTimeoutSeconds();
taskExecutionService.runAsync(task, exec.getId(), timeout);
taskExecutionService.runAsync(task, exec.getId(), timeout, datasetId);
log.info("Triggered DataX execution for task {} at {}, execId={}", task.getId(), LocalDateTime.now(), exec.getId());
}
}
@Transactional
public CollectionTask update(CollectionTask task) {
public CollectionTask update(CollectionTask task, String datasetId) {
task.setUpdatedAt(LocalDateTime.now());
task.addPath();
collectionTaskRepository.updateById(task);
executeTaskNow(task, datasetId);
return task;
}
@@ -66,11 +67,8 @@ public class CollectionTaskService {
return collectionTaskRepository.getById(id);
}
public IPage<CollectionTask> getTasks(CollectionTaskPagingQuery query) {
LambdaQueryWrapper<CollectionTask> wrapper = new LambdaQueryWrapper<CollectionTask>()
.eq(query.getStatus() != null, CollectionTask::getStatus, query.getStatus())
.like(StringUtils.isNotBlank(query.getName()), CollectionTask::getName, query.getName());
return collectionTaskRepository.page(new Page<>(query.getPage(), query.getSize()), wrapper);
public IPage<CollectionTask> getTasks(Page<CollectionTask> page, LambdaQueryWrapper<CollectionTask> wrapper) {
return collectionTaskRepository.page(page, wrapper);
}
public List<CollectionTask> selectActiveTasks() {

View File

@@ -6,6 +6,7 @@ import com.datamate.collection.common.enums.TaskStatus;
import com.datamate.collection.domain.process.ProcessRunner;
import com.datamate.collection.domain.repository.CollectionTaskRepository;
import com.datamate.collection.domain.repository.TaskExecutionRepository;
import com.datamate.datamanagement.application.DatasetApplicationService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
@@ -21,6 +22,7 @@ public class TaskExecutionService {
private final ProcessRunner processRunner;
private final TaskExecutionRepository executionRepository;
private final CollectionTaskRepository collectionTaskRepository;
private final DatasetApplicationService datasetApplicationService;
@Transactional
@@ -39,7 +41,8 @@ public class TaskExecutionService {
}
@Async
public void runAsync(CollectionTask task, String executionId, int timeoutSeconds) {
@Transactional
public void runAsync(CollectionTask task, String executionId, int timeoutSeconds, String datasetId) {
try {
int code = processRunner.runJob(task, executionId, timeoutSeconds);
log.info("DataX finished with code {} for execution {}", code, executionId);
@@ -47,6 +50,7 @@ public class TaskExecutionService {
executionRepository.completeExecution(executionId, TaskStatus.SUCCESS.name(), LocalDateTime.now(),
0, 0L, 0L, 0L, null);
collectionTaskRepository.updateStatus(task.getId(), TaskStatus.SUCCESS.name());
datasetApplicationService.processDataSourceAsync(datasetId, task.getId());
} catch (Exception e) {
log.error("DataX execution failed", e);
executionRepository.completeExecution(executionId, TaskStatus.FAILED.name(), LocalDateTime.now(),

View File

@@ -10,8 +10,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter;
import lombok.Setter;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
/**
* 数据采集任务实体(与数据库表 t_dc_collection_tasks 对齐)
@@ -46,4 +48,12 @@ public class CollectionTask extends BaseEntity<String> {
throw new RuntimeException(e);
}
}
public void initCreateParam() {
this.id = UUID.randomUUID().toString();
this.addPath();
this.status = TaskStatus.READY;
this.createdAt = LocalDateTime.now();
this.updatedAt = LocalDateTime.now();
}
}

View File

@@ -6,6 +6,7 @@ import java.util.Map;
import com.datamate.collection.common.enums.TaskStatus;
import com.datamate.collection.common.enums.SyncMode;
import com.datamate.datamanagement.interfaces.dto.DatasetResponse;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -44,5 +45,7 @@ public class CollectionTaskResponse {
@DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
private LocalDateTime updatedAt;
private DatasetResponse dataset;
}

View File

@@ -1,6 +1,7 @@
package com.datamate.collection.interfaces.dto;
import com.datamate.collection.common.enums.SyncMode;
import com.datamate.datamanagement.interfaces.dto.CreateDatasetRequest;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.HashMap;
@@ -49,5 +50,9 @@ public class CreateCollectionTaskRequest {
@Schema(name = "scheduleExpression", description = "Cron调度表达式 (syncMode=SCHEDULED 时必填)", requiredMode = Schema.RequiredMode.NOT_REQUIRED)
@JsonProperty("scheduleExpression")
private String scheduleExpression;
/** 创建数据集参数 */
@Valid
private CreateDatasetRequest dataset;
}

View File

@@ -46,5 +46,8 @@ public class UpdateCollectionTaskRequest {
@Schema(name = "scheduleExpression", description = "Cron调度表达式 (syncMode=SCHEDULED 时必填)", requiredMode = Schema.RequiredMode.NOT_REQUIRED)
@JsonProperty("scheduleExpression")
private String scheduleExpression;
/** 数据集id */
private String datasetId;
}

View File

@@ -1,14 +1,21 @@
package com.datamate.collection.interfaces.rest;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.datamate.collection.application.CollectionTaskService;
import com.datamate.collection.domain.model.entity.CollectionTask;
import com.datamate.collection.interfaces.converter.CollectionTaskConverter;
import com.datamate.collection.interfaces.dto.*;
import com.datamate.common.interfaces.PagedResponse;
import com.datamate.datamanagement.application.DatasetApplicationService;
import com.datamate.datamanagement.domain.model.dataset.Dataset;
import com.datamate.datamanagement.interfaces.converter.DatasetConverter;
import jakarta.validation.Valid;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.ResponseEntity;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.*;
import java.util.*;
@@ -21,12 +28,19 @@ public class CollectionTaskController{
private final CollectionTaskService taskService;
private final DatasetApplicationService datasetService;
@PostMapping
@Transactional
public ResponseEntity<CollectionTaskResponse> createTask(@Valid @RequestBody CreateCollectionTaskRequest request) {
CollectionTask task = CollectionTaskConverter.INSTANCE.toCollectionTask(request);
task.setId(UUID.randomUUID().toString());
task.addPath();
return ResponseEntity.ok().body(CollectionTaskConverter.INSTANCE.toResponse(taskService.create(task)));
String datasetId = null;
if (Objects.nonNull(request.getDataset())) {
datasetId = datasetService.createDataset(request.getDataset()).getId();
}
CollectionTaskResponse response = CollectionTaskConverter.INSTANCE.toResponse(taskService.create(task, datasetId));
response.setDataset(DatasetConverter.INSTANCE.convertToResponse(datasetService.getDataset(datasetId)));
return ResponseEntity.ok().body(response);
}
@PutMapping("/{id}")
@@ -36,7 +50,7 @@ public class CollectionTaskController{
}
CollectionTask task = CollectionTaskConverter.INSTANCE.toCollectionTask(request);
task.setId(id);
return ResponseEntity.ok(CollectionTaskConverter.INSTANCE.toResponse(taskService.update(task)));
return ResponseEntity.ok(CollectionTaskConverter.INSTANCE.toResponse(taskService.update(task, request.getDatasetId())));
}
@DeleteMapping("/{id}")
@@ -53,6 +67,10 @@ public class CollectionTaskController{
@GetMapping
public ResponseEntity<PagedResponse<CollectionTaskResponse>> getTasks(@Valid CollectionTaskPagingQuery query) {
return ResponseEntity.ok(CollectionTaskConverter.INSTANCE.toResponse(taskService.getTasks(query)));
Page<CollectionTask> page = new Page<>(query.getPage(), query.getSize());
LambdaQueryWrapper<CollectionTask> wrapper = new LambdaQueryWrapper<CollectionTask>()
.eq(query.getStatus() != null, CollectionTask::getStatus, query.getStatus())
.like(StringUtils.isNotBlank(query.getName()), CollectionTask::getName, query.getName());
return ResponseEntity.ok(CollectionTaskConverter.INSTANCE.toResponse(taskService.getTasks(page, wrapper)));
}
}

View File

@@ -53,7 +53,7 @@ public class TaskSchedulerInitializer {
// 到期,触发一次执行
TaskExecution exec = taskExecutionService.createExecution(task);
int timeout = task.getTimeoutSeconds() == null ? 3600 : task.getTimeoutSeconds();
taskExecutionService.runAsync(task, exec.getId(), timeout);
taskExecutionService.runAsync(task, exec.getId(), timeout, null);
log.info("Triggered DataX execution for task {} at {}, execId={}", task.getId(), now, exec.getId());
}
} catch (Exception ex) {

View File

@@ -28,6 +28,8 @@ import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.StringUtils;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -220,40 +222,28 @@ public class DatasetApplicationService {
public void processDataSourceAsync(String datasetId, String dataSourceId) {
try {
log.info("开始处理数据源文件扫描,数据集ID: {}, 数据源ID: {}", datasetId, dataSourceId);
// 1. 调用数据归集服务获取任务详情
CollectionTaskDetailResponse taskDetail = collectionTaskClient.getTaskDetail(dataSourceId).getData();
if (taskDetail == null) {
log.error("获取归集任务详情失败,任务ID: {}", dataSourceId);
return;
}
log.info("获取到归集任务详情: {}", taskDetail);
// 2. 解析任务配置
LocalCollectionConfig config = parseTaskConfig(taskDetail.getConfig());
if (config == null) {
log.error("解析任务配置失败,任务ID: {}", dataSourceId);
return;
}
// 4. 获取文件路径列表
List<String> filePaths = config.getFilePaths();
List<String> filePaths = getFilePaths(dataSourceId);
if (CollectionUtils.isEmpty(filePaths)) {
log.warn("文件路径列表为空,任务ID: {}", dataSourceId);
return;
}
log.info("开始扫描文件,共 {} 个文件路径", filePaths.size());
// 5. 扫描文件元数据
List<DatasetFile> datasetFiles = fileMetadataService.scanFiles(filePaths, datasetId);
// 查询数据集中已存在的文件
List<DatasetFile> existDatasetFileList = datasetFileRepository.findAllByDatasetId(datasetId);
Map<String, DatasetFile> existDatasetFilePathMap = existDatasetFileList.stream().collect(Collectors.toMap(DatasetFile::getFilePath, Function.identity()));
Dataset dataset = datasetRepository.getById(datasetId);
dataset.setFiles(existDatasetFileList);
// 6. 批量插入数据集文件表
// 批量同步数据集文件表
asyncDatasetFile(datasetFiles, existDatasetFilePathMap, dataset, existDatasetFileList, filePaths);
datasetRepository.updateById(dataset);
} catch (Exception e) {
log.error("处理数据源文件扫描失败,数据集ID: {}, 数据源ID: {}", datasetId, dataSourceId, e);
}
}
private void asyncDatasetFile(List<DatasetFile> datasetFiles, Map<String, DatasetFile> existDatasetFilePathMap, Dataset dataset, List<DatasetFile> existDatasetFileList, List<String> filePaths) {
if (CollectionUtils.isNotEmpty(datasetFiles)) {
for (DatasetFile datasetFile : datasetFiles) {
if (existDatasetFilePathMap.containsKey(datasetFile.getFilePath())) {
@@ -261,9 +251,11 @@ public class DatasetApplicationService {
dataset.removeFile(existDatasetFile);
existDatasetFile.setFileSize(datasetFile.getFileSize());
dataset.addFile(existDatasetFile);
dataset.active();
datasetFileRepository.updateById(existDatasetFile);
} else {
dataset.addFile(datasetFile);
dataset.active();
datasetFileRepository.save(datasetFile);
}
}
@@ -271,11 +263,33 @@ public class DatasetApplicationService {
} else {
log.warn("未扫描到有效文件");
}
datasetRepository.updateById(dataset);
} catch (Exception e) {
log.error("处理数据源文件扫描失败,数据集ID: {}, 数据源ID: {}", datasetId, dataSourceId, e);
for (DatasetFile datasetFile : existDatasetFileList) {
String existFilePath = datasetFile.getFilePath();
for (String filePath : filePaths) {
if (existFilePath.equals(filePath) || existFilePath.startsWith(filePath)) {
if (Files.notExists(Paths.get(existFilePath))) {
dataset.removeFile(datasetFile);
datasetFileRepository.removeById(datasetFile.getId());
}
}
}
}
}
private List<String> getFilePaths(String dataSourceId) {
CollectionTaskDetailResponse taskDetail = collectionTaskClient.getTaskDetail(dataSourceId).getData();
if (taskDetail == null) {
log.warn("获取归集任务详情失败,任务ID: {}", dataSourceId);
return Collections.emptyList();
}
log.info("获取到归集任务详情: {}", taskDetail);
LocalCollectionConfig config = parseTaskConfig(taskDetail.getConfig());
if (config == null) {
log.warn("解析任务配置失败,任务ID: {}", dataSourceId);
return Collections.emptyList();
}
return config.getFilePaths();
}
/**
* 解析任务配置

View File

@@ -10,7 +10,6 @@ import com.datamate.datamanagement.domain.contants.DatasetConstant;
import com.datamate.datamanagement.domain.model.dataset.Dataset;
import com.datamate.datamanagement.domain.model.dataset.DatasetFile;
import com.datamate.datamanagement.domain.model.dataset.DatasetFileUploadCheckInfo;
import com.datamate.datamanagement.domain.model.dataset.StatusConstants;
import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetFileRepository;
import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetRepository;
import com.datamate.datamanagement.interfaces.converter.DatasetConverter;
@@ -31,7 +30,6 @@ import org.springframework.data.domain.Pageable;
import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.multipart.MultipartFile;
import java.io.BufferedInputStream;
import java.io.File;
@@ -41,12 +39,9 @@ import java.net.MalformedURLException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.*;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
@@ -60,7 +55,6 @@ public class DatasetFileApplicationService {
private final DatasetFileRepository datasetFileRepository;
private final DatasetRepository datasetRepository;
private final Path fileStorageLocation;
private final FileService fileService;
@Value("${dataset.base.path:/dataset}")
@@ -68,61 +62,10 @@ public class DatasetFileApplicationService {
@Autowired
public DatasetFileApplicationService(DatasetFileRepository datasetFileRepository,
DatasetRepository datasetRepository, FileService fileService,
@Value("${app.file.upload-dir:./dataset}") String uploadDir) {
DatasetRepository datasetRepository, FileService fileService) {
this.datasetFileRepository = datasetFileRepository;
this.datasetRepository = datasetRepository;
this.fileStorageLocation = Paths.get(uploadDir).toAbsolutePath().normalize();
this.fileService = fileService;
try {
Files.createDirectories(this.fileStorageLocation);
} catch (Exception ex) {
throw new RuntimeException("Could not create the directory where the uploaded files will be stored.", ex);
}
}
/**
* 上传文件到数据集
*/
public DatasetFile uploadFile(String datasetId, MultipartFile file) {
Dataset dataset = datasetRepository.getById(datasetId);
if (dataset == null) {
throw new IllegalArgumentException("Dataset not found: " + datasetId);
}
String originalFilename = file.getOriginalFilename();
String fileName = originalFilename != null ? originalFilename : "file";
try {
// 保存文件到磁盘
Path targetLocation = this.fileStorageLocation.resolve(datasetId + File.separator + fileName);
// 确保目标目录存在
Files.createDirectories(targetLocation);
Files.copy(file.getInputStream(), targetLocation, StandardCopyOption.REPLACE_EXISTING);
// 创建文件实体(UUID 主键)
DatasetFile datasetFile = new DatasetFile();
datasetFile.setId(UUID.randomUUID().toString());
datasetFile.setDatasetId(datasetId);
datasetFile.setFileName(fileName);
datasetFile.setFilePath(targetLocation.toString());
datasetFile.setFileType(getFileExtension(originalFilename));
datasetFile.setFileSize(file.getSize());
datasetFile.setUploadTime(LocalDateTime.now());
datasetFile.setStatus(StatusConstants.DatasetFileStatuses.COMPLETED);
// 保存到数据库
datasetFileRepository.save(datasetFile);
// 更新数据集统计
dataset.addFile(datasetFile);
datasetRepository.updateById(dataset);
return datasetFileRepository.findByDatasetIdAndFileName(datasetId, fileName);
} catch (IOException ex) {
log.error("Could not store file {}", fileName, ex);
throw new RuntimeException("Could not store file " + fileName, ex);
}
}
/**
@@ -155,20 +98,21 @@ public class DatasetFileApplicationService {
/**
* 删除文件
*/
@Transactional
public void deleteDatasetFile(String datasetId, String fileId) {
DatasetFile file = getDatasetFile(datasetId, fileId);
Dataset dataset = datasetRepository.getById(datasetId);
// 删除文件时,上传到数据集中的文件会同时删除数据库中的记录和文件系统中的文件,归集过来的文件仅删除数据库中的记录
if (file.getFilePath().startsWith(dataset.getPath())) {
try {
Path filePath = Paths.get(file.getFilePath());
Files.deleteIfExists(filePath);
} catch (IOException ex) {
// ignore
throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR);
}
}
datasetFileRepository.removeById(fileId);
Dataset dataset = datasetRepository.getById(datasetId);
// 简单刷新统计(精确处理可从DB统计)
dataset.setFileCount(Math.max(0, dataset.getFileCount() - 1));
dataset.setSizeBytes(Math.max(0, dataset.getSizeBytes() - (file.getFileSize() != null ? file.getFileSize() : 0)));
dataset.removeFile(file);
datasetRepository.updateById(dataset);
}
@@ -197,6 +141,7 @@ public class DatasetFileApplicationService {
@Transactional(readOnly = true)
public void downloadDatasetFileAsZip(String datasetId, HttpServletResponse response) {
List<DatasetFile> allByDatasetId = datasetFileRepository.findAllByDatasetId(datasetId);
fileRename(allByDatasetId);
response.setContentType("application/zip");
String zipName = String.format("dataset_%s.zip",
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss")));
@@ -211,6 +156,27 @@ public class DatasetFileApplicationService {
}
}
private void fileRename(List<DatasetFile> files) {
Set<String> uniqueFilenames = new HashSet<>();
for (DatasetFile file : files) {
String originalFilename = file.getFileName();
if (!uniqueFilenames.add(originalFilename)) {
String newFilename;
int counter = 1;
do {
newFilename = generateNewFilename(originalFilename, counter);
counter++;
} while (!uniqueFilenames.add(newFilename));
file.setFileName(newFilename);
}
}
}
private String generateNewFilename(String oldFilename, int counter) {
int dotIndex = oldFilename.lastIndexOf(".");
if (dotIndex < 0) {
return oldFilename + "-(" + counter + ")";
}
return oldFilename.substring(0, dotIndex) + "-(" + counter + ")" + oldFilename.substring(dotIndex);
}
private void addToZipFile(DatasetFile file, ZipOutputStream zos) throws IOException {
if (file.getFilePath() == null || !Files.exists(Paths.get(file.getFilePath()))) {
log.warn("The file hasn't been found on filesystem, id: {}", file.getId());
@@ -229,17 +195,6 @@ public class DatasetFileApplicationService {
}
}
private String getFileExtension(String fileName) {
if (fileName == null || fileName.isEmpty()) {
return null;
}
int lastDotIndex = fileName.lastIndexOf(".");
if (lastDotIndex == -1) {
return null;
}
return fileName.substring(lastDotIndex + 1);
}
/**
* 预上传
*
@@ -275,9 +230,6 @@ public class DatasetFileApplicationService {
public void chunkUpload(String datasetId, UploadFileRequest uploadFileRequest) {
FileUploadResult uploadResult = fileService.chunkUpload(DatasetConverter.INSTANCE.toChunkUploadRequest(uploadFileRequest));
saveFileInfoToDb(uploadResult, uploadFileRequest, datasetId);
if (uploadResult.isAllFilesUploaded()) {
// 解析文件,后续依据需求看是否添加校验文件元数据和解析半结构化文件的逻辑,
}
}
private void saveFileInfoToDb(FileUploadResult fileUploadResult, UploadFileRequest uploadFile, String datasetId) {
@@ -301,6 +253,7 @@ public class DatasetFileApplicationService {
datasetFileRepository.save(datasetFile);
dataset.addFile(datasetFile);
dataset.active();
datasetRepository.updateById(dataset);
}
}

View File

@@ -143,4 +143,10 @@ public class Dataset extends BaseEntity<String> {
this.updatedAt = LocalDateTime.now();
}
}
public void active() {
if (this.status == DatasetStatusType.DRAFT) {
this.status = DatasetStatusType.ACTIVE;
}
}
}

View File

@@ -7,7 +7,6 @@ import com.datamate.datamanagement.interfaces.dto.UploadFileRequest;
import com.datamate.common.domain.model.ChunkUploadRequest;
import com.datamate.datamanagement.domain.model.dataset.Dataset;
import com.datamate.datamanagement.domain.model.dataset.DatasetFile;
import com.datamate.datamanagement.interfaces.dto.*;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.factory.Mappers;

View File

@@ -10,11 +10,11 @@ import lombok.Setter;
@Setter
public class AllDatasetStatisticsResponse {
/** 总数据集数 */
private Integer totalDatasets;
private Integer totalDatasets = 0;
/** 总大小(字节) */
private Long totalSize;
private Long totalSize = 0L;
/** 总文件数 */
private Long totalFiles;
private Long totalFiles = 0L;
}

View File

@@ -3,6 +3,7 @@ package com.datamate.datamanagement.interfaces.dto;
import com.datamate.datamanagement.common.enums.DatasetType;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Size;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -19,9 +20,11 @@ import java.util.List;
@AllArgsConstructor
public class CreateDatasetRequest {
/** 数据集名称 */
@Size(min = 1, max = 100)
@NotBlank(message = "数据集名称不能为空")
private String name;
/** 数据集描述 */
@Size(max = 500)
private String description;
/** 数据集类型 */
@NotNull(message = "数据集类型不能为空")
@@ -30,6 +33,4 @@ public class CreateDatasetRequest {
private List<String> tags;
/** 数据源 */
private String dataSource;
/** 目标位置 */
private String targetLocation;
}

View File

@@ -24,6 +24,8 @@ public class DatasetResponse {
private String status;
/** 标签列表 */
private List<TagResponse> tags;
/** 数据集保留天数 */
private Integer retentionDays;
/** 数据源 */
private String dataSource;
/** 目标位置 */

View File

@@ -1,6 +1,8 @@
package com.datamate.datamanagement.interfaces.dto;
import com.datamate.datamanagement.common.enums.DatasetStatusType;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.Size;
import lombok.Getter;
import lombok.Setter;
@@ -13,8 +15,11 @@ import java.util.List;
@Setter
public class UpdateDatasetRequest {
/** 数据集名称 */
@Size(min = 1, max = 100)
@NotBlank(message = "数据集名称不能为空")
private String name;
/** 数据集描述 */
@Size(max = 500)
private String description;
/** 归集任务id */
private String dataSource;

View File

@@ -68,22 +68,6 @@ public class DatasetFileController {
return ResponseEntity.ok(Response.ok(response));
}
@PostMapping(consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
public ResponseEntity<Response<DatasetFileResponse>> uploadDatasetFile(
@PathVariable("datasetId") String datasetId,
@RequestPart(value = "file", required = false) MultipartFile file) {
try {
DatasetFile datasetFile = datasetFileApplicationService.uploadFile(datasetId, file);
return ResponseEntity.status(HttpStatus.CREATED).body(Response.ok(DatasetConverter.INSTANCE.convertToResponse(datasetFile)));
} catch (IllegalArgumentException e) {
return ResponseEntity.badRequest().body(Response.error(SystemErrorCode.UNKNOWN_ERROR, null));
} catch (Exception e) {
log.error("upload fail", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(Response.error(SystemErrorCode.UNKNOWN_ERROR, null));
}
}
@GetMapping("/{fileId}")
public ResponseEntity<Response<DatasetFileResponse>> getDatasetFileById(
@PathVariable("datasetId") String datasetId,
@@ -109,9 +93,8 @@ public class DatasetFileController {
}
@IgnoreResponseWrap
@GetMapping(value = "/{fileId}/download", produces = MediaType.APPLICATION_OCTET_STREAM_VALUE)
public ResponseEntity<Resource> downloadDatasetFileById(
@PathVariable("datasetId") String datasetId,
@GetMapping(value = "/{fileId}/download", produces = MediaType.APPLICATION_OCTET_STREAM_VALUE + ";charset=UTF-8")
public ResponseEntity<Resource> downloadDatasetFileById(@PathVariable("datasetId") String datasetId,
@PathVariable("fileId") String fileId) {
try {
DatasetFile datasetFile = datasetFileApplicationService.getDatasetFile(datasetId, fileId);
@@ -142,8 +125,8 @@ public class DatasetFileController {
* @return 批量上传请求id
*/
@PostMapping("/upload/pre-upload")
public ResponseEntity<Response<String>> preUpload(@PathVariable("datasetId") String datasetId, @RequestBody @Valid UploadFilesPreRequest request) {
public ResponseEntity<Response<String>> preUpload(@PathVariable("datasetId") String datasetId,
@RequestBody @Valid UploadFilesPreRequest request) {
return ResponseEntity.ok(Response.ok(datasetFileApplicationService.preUpload(request, datasetId)));
}
@@ -153,7 +136,7 @@ public class DatasetFileController {
* @param uploadFileRequest 上传文件请求
*/
@PostMapping("/upload/chunk")
public ResponseEntity<Void> chunkUpload(@PathVariable("datasetId") String datasetId, UploadFileRequest uploadFileRequest) {
public ResponseEntity<Void> chunkUpload(@PathVariable("datasetId") String datasetId, @Valid UploadFileRequest uploadFileRequest) {
log.info("file upload reqId:{}, fileNo:{}, total chunk num:{}, current chunkNo:{}",
uploadFileRequest.getReqId(), uploadFileRequest.getFileNo(), uploadFileRequest.getTotalChunkNum(),
uploadFileRequest.getChunkNo());

View File

@@ -0,0 +1,65 @@
package com.datamate.datamanagement.interfaces.scheduler;
import com.datamate.common.interfaces.PagedResponse;
import com.datamate.datamanagement.application.DatasetApplicationService;
import com.datamate.datamanagement.interfaces.dto.DatasetPagingQuery;
import com.datamate.datamanagement.interfaces.dto.DatasetResponse;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* 数据集定时任务触发
*
* @since 2025/10/24
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class DatasetScheduler {
private final DatasetApplicationService datasetApplicationService;
/**
* 每天凌晨 00:00 扫描并删除超出保留期的数据集
*/
@Scheduled(cron = "0 0 0 * * ?")
public void cleanupExpiredDatasets() {
int pageNo = 1;
int pageSize = 500;
while (true) {
DatasetPagingQuery datasetPagingQuery = new DatasetPagingQuery();
datasetPagingQuery.setPage(pageNo);
datasetPagingQuery.setSize(pageSize);
PagedResponse<DatasetResponse> datasets = datasetApplicationService.getDatasets(datasetPagingQuery);
if (CollectionUtils.isEmpty(datasets.getContent())) {
break;
}
datasets.getContent().forEach(dataset -> {
Integer retentionDays = dataset.getRetentionDays();
LocalDateTime createdAt = dataset.getCreatedAt();
if (retentionDays != null && retentionDays > 0 && createdAt != null) {
LocalDateTime expireAt = createdAt.plusDays(retentionDays);
if (expireAt.isBefore(LocalDateTime.now())) {
try {
log.info("Deleting dataset {}, expired at {} (retentionDays={})", dataset.getId(), expireAt, retentionDays);
datasetApplicationService.deleteDataset(dataset.getId());
} catch (Exception e) {
log.warn("Failed to delete expired dataset {}: {}", dataset.getId(), e.getMessage());
}
}
}
});
if (datasets.getPage() >= datasets.getTotalPages()) {
break;
}
pageNo++;
}
}
}