feature:数据集导入数据集支持选择归集任务导入 (#92)

* feature: 实现obs归集

* feature: 增加数据集中出现同名文件时的处理方式

* feature: 前端数据集导入数据时增加可以选择归集任务导入
This commit is contained in:
hefanli
2025-11-19 11:05:33 +08:00
committed by GitHub
parent 4506fa8a91
commit a07fba23f2
12 changed files with 168 additions and 209 deletions

View File

@@ -11,14 +11,12 @@ import com.datamate.datamanagement.domain.model.dataset.DatasetFile;
import com.datamate.datamanagement.domain.model.dataset.Tag;
import com.datamate.datamanagement.infrastructure.client.CollectionTaskClient;
import com.datamate.datamanagement.infrastructure.client.dto.CollectionTaskDetailResponse;
import com.datamate.datamanagement.infrastructure.client.dto.LocalCollectionConfig;
import com.datamate.datamanagement.infrastructure.exception.DataManagementErrorCode;
import com.datamate.datamanagement.infrastructure.persistence.mapper.TagMapper;
import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetFileRepository;
import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetRepository;
import com.datamate.datamanagement.interfaces.converter.DatasetConverter;
import com.datamate.datamanagement.interfaces.dto.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
@@ -28,11 +26,13 @@ import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.StringUtils;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* 数据集应用服务(对齐 DB schema,使用 UUID 字符串主键)
@@ -46,8 +46,7 @@ public class DatasetApplicationService {
private final TagMapper tagMapper;
private final DatasetFileRepository datasetFileRepository;
private final CollectionTaskClient collectionTaskClient;
private final FileMetadataService fileMetadataService;
private final ObjectMapper objectMapper;
private final DatasetFileApplicationService datasetFileApplicationService;
@Value("${datamate.data-management.base-path:/dataset}")
private String datasetBasePath;
@@ -223,68 +222,38 @@ public class DatasetApplicationService {
/**
 * Imports the files of a collection task into a dataset; annotated with {@code @Async}.
 *
 * <p>The file paths are resolved through {@code getFilePaths(dataSourceId)}; when that list is
 * empty the method returns without touching the dataset. Any exception raised while processing
 * is caught and logged instead of being propagated to the caller.
 *
 * <p>NOTE(review): as shown, this body contains both a scan-and-sync sequence
 * ({@code fileMetadataService.scanFiles} followed by {@code asyncDatasetFile}) and a call to
 * {@code datasetFileApplicationService.copyFilesToDatasetDir}, and the start/scan log statements
 * appear twice in different languages. This looks like pre- and post-change lines rendered
 * together - confirm which sequence is intended before relying on it.
 *
 * @param datasetId ID of the dataset that receives the files
 * @param dataSourceId ID handed to {@code getFilePaths} to look up the collection task
 */
@Async
public void processDataSourceAsync(String datasetId, String dataSourceId) {
try {
log.info("开始处理数据源文件扫描,数据集ID: {}, 数据源ID: {}", datasetId, dataSourceId);
log.info("Initiating data source file scanning, dataset ID: {}, collection task ID: {}", datasetId, dataSourceId);
List<String> filePaths = getFilePaths(dataSourceId);
if (CollectionUtils.isEmpty(filePaths)) {
return;
}
log.info("开始扫描文件,共 {} 个文件路径", filePaths.size());
List<DatasetFile> datasetFiles = fileMetadataService.scanFiles(filePaths, datasetId);
// Load the files already recorded for this dataset
List<DatasetFile> existDatasetFileList = datasetFileRepository.findAllByDatasetId(datasetId);
Map<String, DatasetFile> existDatasetFilePathMap = existDatasetFileList.stream().collect(Collectors.toMap(DatasetFile::getFilePath, Function.identity()));
Dataset dataset = datasetRepository.getById(datasetId);
dataset.setFiles(existDatasetFileList);
// Synchronize the dataset file table with the scan result
asyncDatasetFile(datasetFiles, existDatasetFilePathMap, dataset, existDatasetFileList, filePaths);
datasetRepository.updateById(dataset);
log.info("Starting file scan, total files: {}", filePaths.size());
datasetFileApplicationService.copyFilesToDatasetDir(datasetId, new CopyFilesRequest(filePaths));
} catch (Exception e) {
log.error("处理数据源文件扫描失败,数据集ID: {}, 数据源ID: {}", datasetId, dataSourceId, e);
}
}
/**
 * Reconciles freshly scanned files with the files already recorded for a dataset.
 *
 * <p>A scanned file whose path is already recorded refreshes the size of the recorded entry;
 * every other scanned file is added to the dataset and saved. Afterwards, recorded files whose
 * path starts with one of the scanned source paths are removed when they no longer exist on disk.
 *
 * @param datasetFiles files produced by the scan; a null or empty list only logs a warning
 * @param existDatasetFilePathMap recorded files keyed by file path
 * @param dataset dataset whose file list is updated
 * @param existDatasetFileList recorded files of the dataset
 * @param filePaths source paths that were scanned
 */
private void asyncDatasetFile(List<DatasetFile> datasetFiles, Map<String, DatasetFile> existDatasetFilePathMap, Dataset dataset, List<DatasetFile> existDatasetFileList, List<String> filePaths) {
    if (CollectionUtils.isNotEmpty(datasetFiles)) {
        for (DatasetFile scannedFile : datasetFiles) {
            DatasetFile recordedFile = existDatasetFilePathMap.get(scannedFile.getFilePath());
            if (recordedFile != null) {
                // Remove and re-add so the dataset sees the refreshed size.
                dataset.removeFile(recordedFile);
                recordedFile.setFileSize(scannedFile.getFileSize());
                dataset.addFile(recordedFile);
                dataset.active();
                datasetFileRepository.updateById(recordedFile);
            } else {
                dataset.addFile(scannedFile);
                dataset.active();
                datasetFileRepository.save(scannedFile);
            }
        }
        log.info("文件元数据写入完成,共写入 {} 条记录", datasetFiles.size());
    } else {
        log.warn("未扫描到有效文件");
    }

    // Walk a snapshot: the caller passes this same list to dataset.setFiles(...), so
    // dataset.removeFile(...) may mutate the list while it is being iterated.
    for (DatasetFile recordedFile : new ArrayList<>(existDatasetFileList)) {
        String recordedPath = recordedFile.getFilePath();
        // startsWith already covers the exact-match case; anyMatch stops at the first hit so
        // a file matching several source paths is removed only once.
        boolean underScannedPath = filePaths.stream().anyMatch(recordedPath::startsWith);
        if (underScannedPath && Files.notExists(Paths.get(recordedPath))) {
            dataset.removeFile(recordedFile);
            datasetFileRepository.removeById(recordedFile.getId());
        }
    }
}
/**
 * Lists the regular files located directly inside the target directory of a collection task.
 *
 * @param dataSourceId collection task ID used to fetch the task detail
 * @return string paths of the regular files found at depth one; an empty list when the task
 *         detail is missing, the target path is absent or not a directory, or the scan fails
 */
private List<String> getFilePaths(String dataSourceId) {
    CollectionTaskDetailResponse taskDetail = collectionTaskClient.getTaskDetail(dataSourceId).getData();
    if (taskDetail == null) {
        // NOTE(review): both warnings carry the same message in two languages - confirm one can go.
        log.warn("获取归集任务详情失败,任务ID: {}", dataSourceId);
        log.warn("Fail to get collection task detail, task ID: {}", dataSourceId);
        return Collections.emptyList();
    }

    String targetPathText = taskDetail.getTargetPath();
    // Paths.get(null) would throw; treat a missing target path like a missing directory.
    // Files.isDirectory is false for non-existent paths, so no separate exists() check is needed.
    if (targetPathText == null || !Files.isDirectory(Paths.get(targetPathText))) {
        log.warn("Target path not exists or is not a directory: {}", targetPathText);
        return Collections.emptyList();
    }

    Path targetPath = Paths.get(targetPathText);
    // Depth 1 = the directory itself plus its direct children; the stream must be closed.
    try (Stream<Path> paths = Files.walk(targetPath, 1)) {
        return paths
            .filter(Files::isRegularFile) // keep files only, drop directories
            .map(Path::toString)          // convert to string paths
            .collect(Collectors.toList());
    } catch (IOException e) {
        log.error("Fail to scan directory: {}", targetPath, e);
        return Collections.emptyList();
    }
    // The statements that followed here were unreachable (both branches above return).
}
}

View File

@@ -7,10 +7,12 @@ import com.datamate.common.domain.utils.AnalyzerUtils;
import com.datamate.common.infrastructure.exception.BusinessAssert;
import com.datamate.common.infrastructure.exception.BusinessException;
import com.datamate.common.infrastructure.exception.SystemErrorCode;
import com.datamate.datamanagement.common.enums.DuplicateMethod;
import com.datamate.datamanagement.domain.contants.DatasetConstant;
import com.datamate.datamanagement.domain.model.dataset.Dataset;
import com.datamate.datamanagement.domain.model.dataset.DatasetFile;
import com.datamate.datamanagement.domain.model.dataset.DatasetFileUploadCheckInfo;
import com.datamate.datamanagement.infrastructure.exception.DataManagementErrorCode;
import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetFileRepository;
import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetRepository;
import com.datamate.datamanagement.interfaces.converter.DatasetConverter;
@@ -45,6 +47,8 @@ import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
@@ -63,6 +67,9 @@ public class DatasetFileApplicationService {
@Value("${datamate.data-management.base-path:/dataset}")
private String datasetBasePath;
@Value("${datamate.data-management.file.duplicate:COVER}")
private DuplicateMethod duplicateMethod;
@Autowired
public DatasetFileApplicationService(DatasetFileRepository datasetFileRepository,
DatasetRepository datasetRepository, FileService fileService) {
@@ -254,13 +261,36 @@ public class DatasetFileApplicationService {
.filePath(savedFile.getPath())
.fileType(AnalyzerUtils.getExtension(uploadFile.getFileName()))
.build();
datasetFileRepository.save(datasetFile);
dataset.setFiles(datasetFileRepository.findAllByDatasetId(datasetId));
setDatasetFileId(datasetFile, dataset);
datasetFileRepository.saveOrUpdate(datasetFile);
dataset.addFile(datasetFile);
dataset.active();
datasetRepository.updateById(dataset);
}
/**
 * Applies the configured duplicate-handling policy to a file about to be stored in a dataset.
 *
 * <p>When the dataset already holds a file with the same path: under
 * {@code DuplicateMethod.ERROR} a {@code BusinessException} is thrown; under
 * {@code DuplicateMethod.COVER} the existing entry is removed from the dataset and its id is
 * copied onto {@code datasetFile}. When no file with that path exists, nothing changes.
 *
 * @param datasetFile file that is about to be stored; its id may be overwritten
 * @param dataset dataset whose file list has already been populated
 * @throws BusinessException if the path already exists and the policy is {@code ERROR}
 */
private void setDatasetFileId(DatasetFile datasetFile, Dataset dataset) {
    String filePath = datasetFile.getFilePath();
    // Look the path up directly instead of collecting into a map: Collectors.toMap throws
    // IllegalStateException when two existing files share a path, and the map was rebuilt
    // on every call.
    DatasetFile existDatasetFile = dataset.getFiles().stream()
        .filter(file -> Objects.equals(file.getFilePath(), filePath))
        .findFirst()
        .orElse(null);
    if (existDatasetFile == null) {
        return;
    }
    if (duplicateMethod == DuplicateMethod.ERROR) {
        log.error("file {} already exists in dataset {}", datasetFile.getFileName(), datasetFile.getDatasetId());
        throw BusinessException.of(DataManagementErrorCode.DATASET_FILE_ALREADY_EXISTS);
    }
    if (duplicateMethod == DuplicateMethod.COVER) {
        // Reuse the existing id so the later saveOrUpdate overwrites the old record.
        dataset.removeFile(existDatasetFile);
        datasetFile.setId(existDatasetFile.getId());
    }
}
/**
* 复制文件到数据集目录
*
@@ -273,6 +303,8 @@ public class DatasetFileApplicationService {
Dataset dataset = datasetRepository.getById(datasetId);
BusinessAssert.notNull(dataset, SystemErrorCode.RESOURCE_NOT_FOUND);
List<DatasetFile> copiedFiles = new ArrayList<>();
List<DatasetFile> existDatasetFiles = datasetFileRepository.findAllByDatasetId(datasetId);
dataset.setFiles(existDatasetFiles);
for (String sourceFilePath : req.sourcePaths()) {
Path sourcePath = Paths.get(sourceFilePath);
if (!Files.exists(sourcePath) || !Files.isRegularFile(sourcePath)) {
@@ -292,10 +324,11 @@ public class DatasetFileApplicationService {
.uploadTime(currentTime)
.lastAccessTime(currentTime)
.build();
setDatasetFileId(datasetFile, dataset);
dataset.addFile(datasetFile);
copiedFiles.add(datasetFile);
}
datasetFileRepository.saveBatch(copiedFiles, 100);
datasetFileRepository.saveOrUpdateBatch(copiedFiles, 100);
dataset.active();
datasetRepository.updateById(dataset);
CompletableFuture.runAsync(() -> copyFilesToDatasetDir(req.sourcePaths(), dataset));

View File

@@ -1,127 +0,0 @@
package com.datamate.datamanagement.application;
import com.datamate.datamanagement.domain.model.dataset.DatasetFile;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
/**
 * File metadata scanning service.
 *
 * <p>Turns file-system paths into {@link DatasetFile} records by reading each file's name,
 * size and extension.
 */
@Slf4j
@Service
public class FileMetadataService {

    /**
     * Scans a list of paths and extracts metadata for every regular file found.
     *
     * <p>A path that is a directory contributes the regular files directly inside it
     * (non-recursive). Paths that do not exist are skipped with a warning, and a failure on
     * one path is logged without aborting the remaining paths.
     *
     * @param filePaths paths to scan; null or empty yields an empty result
     * @param datasetId dataset ID stamped on every produced record
     * @return the extracted dataset files, never null
     */
    public List<DatasetFile> scanFiles(List<String> filePaths, String datasetId) {
        List<DatasetFile> datasetFiles = new ArrayList<>();

        if (filePaths == null || filePaths.isEmpty()) {
            log.warn("文件路径列表为空,跳过扫描");
            return datasetFiles;
        }

        for (String filePath : filePaths) {
            try {
                Path path = Paths.get(filePath);

                if (!Files.exists(path)) {
                    log.warn("路径不存在: {}", filePath);
                    continue;
                }

                if (Files.isDirectory(path)) {
                    scanDirectory(datasetId, filePath, path, datasetFiles);
                } else {
                    // A plain file is processed directly.
                    DatasetFile datasetFile = extractFileMetadata(filePath, datasetId);
                    if (datasetFile != null) {
                        datasetFiles.add(datasetFile);
                    }
                }
            } catch (Exception e) {
                // Best effort: one bad path must not stop the scan of the others.
                log.error("扫描路径失败: {}, 错误: {}", filePath, e.getMessage(), e);
            }
        }

        log.info("文件扫描完成,共扫描 {} 个文件", datasetFiles.size());
        return datasetFiles;
    }

    /**
     * Collects metadata for the regular files directly inside a directory (non-recursive).
     *
     * @param datasetId dataset ID stamped on every produced record
     * @param filePath directory path as given by the caller, used for logging
     * @param path the same directory as a {@link Path}
     * @param datasetFiles accumulator that receives the extracted records
     * @throws IOException if the directory cannot be listed
     */
    private void scanDirectory(String datasetId, String filePath, Path path,
                               List<DatasetFile> datasetFiles) throws IOException {
        List<Path> filesInDir;
        // Files.list holds an open directory handle until the stream is closed.
        try (var entries = Files.list(path)) {
            filesInDir = entries.filter(Files::isRegularFile).toList();
        }

        for (Path file : filesInDir) {
            try {
                DatasetFile datasetFile = extractFileMetadata(file.toString(), datasetId);
                if (datasetFile != null) {
                    datasetFiles.add(datasetFile);
                }
            } catch (Exception e) {
                log.error("处理目录中的文件失败: {}, 错误: {}", file, e.getMessage(), e);
            }
        }
        log.info("已扫描目录 {} 下的 {} 个文件", filePath, filesInDir.size());
    }

    /**
     * Builds a {@link DatasetFile} from a single file on disk.
     *
     * @param filePath path of the file
     * @param datasetId dataset ID stamped on the record
     * @return the dataset file, or null when the path does not exist or is not a regular file
     * @throws IOException if the file size cannot be read
     */
    private DatasetFile extractFileMetadata(String filePath, String datasetId) throws IOException {
        Path path = Paths.get(filePath);

        if (!Files.exists(path)) {
            log.warn("文件不存在: {}", filePath);
            return null;
        }

        if (!Files.isRegularFile(path)) {
            log.warn("路径不是文件: {}", filePath);
            return null;
        }

        String fileName = path.getFileName().toString();
        long fileSize = Files.size(path);
        String fileType = getFileExtension(fileName);
        // Read the clock once so upload time and last access time are identical.
        LocalDateTime now = LocalDateTime.now();

        return DatasetFile.builder()
            .id(UUID.randomUUID().toString())
            .datasetId(datasetId)
            .fileName(fileName)
            .filePath(filePath)
            .fileSize(fileSize)
            .fileType(fileType)
            .uploadTime(now)
            .lastAccessTime(now)
            .status("ACTIVE")
            .build();
    }

    /**
     * Returns the lower-cased extension of a file name.
     *
     * @param fileName file name without directory part
     * @return the text after the last dot, or {@code "unknown"} when there is no dot, the dot
     *         is the first character, or the dot is the last character
     */
    private String getFileExtension(String fileName) {
        int lastDotIndex = fileName.lastIndexOf('.');
        if (lastDotIndex > 0 && lastDotIndex < fileName.length() - 1) {
            // Locale.ROOT keeps the result independent of the JVM default locale.
            return fileName.substring(lastDotIndex + 1).toLowerCase(java.util.Locale.ROOT);
        }
        return "unknown";
    }
}

View File

@@ -0,0 +1,11 @@
package com.datamate.datamanagement.common.enums;
/**
* How to handle a file whose name duplicates one that already exists.
*
* @since 2025/11/18
*/
public enum DuplicateMethod {
// Reject the duplicate; DatasetFileApplicationService#setDatasetFileId throws a BusinessException for it.
ERROR,
// Overwrite; setDatasetFileId drops the existing entry from the dataset and reuses its id for the new file.
COVER
}

View File

@@ -32,7 +32,11 @@ public enum DataManagementErrorCode implements ErrorCode {
/**
* 数据集标签已存在
*/
DATASET_TAG_ALREADY_EXISTS("data_management.0005", "数据集标签已存在");
DATASET_TAG_ALREADY_EXISTS("data_management.0005", "数据集标签已存在"),
/**
* Dataset file already exists
*/
DATASET_FILE_ALREADY_EXISTS("data_management.0006", "数据集文件已存在");
private final String code;
private final String message;

View File

@@ -33,4 +33,6 @@ public class CreateDatasetRequest {
private List<String> tags;
/** 数据源 */
private String dataSource;
/** 保留天数 */
private Integer retentionDays;
}