feat(data-management): 实现数据集文件版本管理和内部路径保护

- 将数据集文件查询方法替换为只查询可见文件的版本
- 引入文件状态管理(ACTIVE/ARCHIVED)和内部目录结构
- 实现文件重复处理策略,支持版本控制模式而非覆盖
- 添加内部数据目录保护,防止访问.datamate等系统目录
- 重构文件上传流程,引入暂存目录和事务后清理机制
- 实现文件版本归档功能,保留历史版本到专用存储位置
- 优化文件路径规范化和安全验证逻辑
- 修复文件删除逻辑,确保归档文件不会被错误移除
- 更新数据集压缩下载功能以排除内部系统文件
This commit is contained in:
2026-02-04 23:53:35 +08:00
parent 473f4e717f
commit d0972cbc9d
16 changed files with 1141 additions and 484 deletions

View File

@@ -164,7 +164,7 @@ public class DatasetApplicationService {
public Dataset getDataset(String datasetId) {
Dataset dataset = datasetRepository.getById(datasetId);
BusinessAssert.notNull(dataset, DataManagementErrorCode.DATASET_NOT_FOUND);
List<DatasetFile> datasetFiles = datasetFileRepository.findAllByDatasetId(datasetId);
List<DatasetFile> datasetFiles = datasetFileRepository.findAllVisibleByDatasetId(datasetId);
dataset.setFiles(datasetFiles);
applyVisibleFileCounts(Collections.singletonList(dataset));
return dataset;
@@ -439,7 +439,7 @@ public class DatasetApplicationService {
Map<String, Object> statistics = new HashMap<>();
List<DatasetFile> allFiles = datasetFileRepository.findAllByDatasetId(datasetId);
List<DatasetFile> allFiles = datasetFileRepository.findAllVisibleByDatasetId(datasetId);
List<DatasetFile> visibleFiles = filterVisibleFiles(allFiles);
long totalFiles = visibleFiles.size();
long completedFiles = visibleFiles.stream()

View File

@@ -58,7 +58,6 @@ import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -83,6 +82,11 @@ public class DatasetFileApplicationService {
XLSX_FILE_TYPE
);
private static final String DERIVED_METADATA_KEY = "derived_from_file_id";
private static final String FILE_STATUS_ACTIVE = "ACTIVE";
private static final String FILE_STATUS_ARCHIVED = "ARCHIVED";
private static final String INTERNAL_DIR_NAME = ".datamate";
private static final String INTERNAL_UPLOAD_DIR_NAME = "uploading";
private static final String INTERNAL_VERSIONS_DIR_NAME = "versions";
private final DatasetFileRepository datasetFileRepository;
private final DatasetRepository datasetRepository;
@@ -93,7 +97,7 @@ public class DatasetFileApplicationService {
@Value("${datamate.data-management.base-path:/dataset}")
private String datasetBasePath;
@Value("${datamate.data-management.file.duplicate:COVER}")
@Value("${datamate.data-management.file.duplicate:VERSION}")
private DuplicateMethod duplicateMethod;
@Autowired
@@ -162,9 +166,19 @@ public class DatasetFileApplicationService {
if (dataset == null) {
return PagedResponse.of(new Page<>(page, size));
}
String datasetPath = dataset.getPath();
Path queryPath = Path.of(dataset.getPath() + File.separator + prefix);
Map<String, DatasetFile> datasetFilesMap = datasetFileRepository.findAllByDatasetId(datasetId)
Path datasetRoot = Paths.get(dataset.getPath()).toAbsolutePath().normalize();
prefix = Optional.ofNullable(prefix).orElse("").trim().replace("\\", "/");
while (prefix.startsWith("/")) {
prefix = prefix.substring(1);
}
if (prefix.equals(INTERNAL_DIR_NAME) || prefix.startsWith(INTERNAL_DIR_NAME + "/")) {
return new PagedResponse<>(page, size, 0, 0, Collections.emptyList());
}
Path queryPath = datasetRoot.resolve(prefix.replace("/", File.separator)).normalize();
if (!queryPath.startsWith(datasetRoot)) {
return new PagedResponse<>(page, size, 0, 0, Collections.emptyList());
}
Map<String, DatasetFile> datasetFilesMap = datasetFileRepository.findAllVisibleByDatasetId(datasetId)
.stream()
.filter(file -> file.getFilePath() != null)
.collect(Collectors.toMap(
@@ -186,7 +200,8 @@ public class DatasetFileApplicationService {
}
try (Stream<Path> pathStream = Files.list(queryPath)) {
List<Path> allFiles = pathStream
.filter(path -> path.toString().startsWith(datasetPath))
.filter(path -> path.toAbsolutePath().normalize().startsWith(datasetRoot))
.filter(path -> !isInternalDatasetPath(datasetRoot, path))
.filter(path -> !excludeDerivedFiles
|| Files.isDirectory(path)
|| !derivedFilePaths.contains(normalizeFilePath(path.toString())))
@@ -298,6 +313,86 @@ public class DatasetFileApplicationService {
}
}
/**
 * Tells whether two file paths are equal after each has been passed through
 * {@code normalizeFilePath}.
 *
 * @param left  first path, may be {@code null}
 * @param right second path, may be {@code null}
 * @return {@code true} only when both normalized values are non-null and equal
 */
private boolean isSameNormalizedPath(String left, String right) {
    String lhs = normalizeFilePath(left);
    String rhs = normalizeFilePath(right);
    // A path that cannot be normalized never matches anything.
    return lhs != null && rhs != null && lhs.equals(rhs);
}
/**
 * Tells whether {@code path} lies inside the dataset's internal directory
 * ({@code INTERNAL_DIR_NAME} directly under {@code datasetRoot}).
 *
 * @param datasetRoot dataset root directory, may be {@code null}
 * @param path        path to classify, may be {@code null}
 * @return {@code true} when the first path element below the root is the internal
 *         directory name; {@code false} for null arguments, for paths outside the
 *         root, and when path handling throws
 */
private boolean isInternalDatasetPath(Path datasetRoot, Path path) {
    if (datasetRoot == null || path == null) {
        return false;
    }
    try {
        Path root = datasetRoot.toAbsolutePath().normalize();
        Path candidate = path.toAbsolutePath().normalize();
        if (!candidate.startsWith(root)) {
            return false;
        }
        Path belowRoot = root.relativize(candidate);
        return belowRoot.getNameCount() != 0
                && INTERNAL_DIR_NAME.equals(belowRoot.getName(0).toString());
    } catch (Exception e) {
        // Any failure while resolving the path is reported as "not internal".
        return false;
    }
}
/**
 * Normalizes a logical (dataset-relative) prefix: trims surrounding whitespace,
 * converts backslashes to forward slashes, strips leading and trailing slashes
 * and collapses repeated slashes into one.
 *
 * @param prefix raw prefix, may be {@code null}
 * @return the normalized prefix; the empty string for {@code null} input
 */
private String normalizeLogicalPrefix(String prefix) {
    if (prefix == null) {
        return "";
    }
    String unified = prefix.trim().replace("\\", "/");
    int start = 0;
    int end = unified.length();
    while (start < end && unified.charAt(start) == '/') {
        start++;
    }
    while (end > start && unified.charAt(end - 1) == '/') {
        end--;
    }
    // Copy the trimmed range, skipping any slash that directly follows another slash.
    StringBuilder collapsed = new StringBuilder(end - start);
    for (int i = start; i < end; i++) {
        char current = unified.charAt(i);
        boolean repeatedSlash = current == '/'
                && collapsed.length() > 0
                && collapsed.charAt(collapsed.length() - 1) == '/';
        if (!repeatedSlash) {
            collapsed.append(current);
        }
    }
    return collapsed.toString();
}
/**
 * Normalizes a logical file path using the same rules as
 * {@code normalizeLogicalPrefix}.
 *
 * @param logicalPath raw logical path, may be {@code null}
 * @return the normalized logical path
 */
private String normalizeLogicalPath(String logicalPath) {
    final String normalized = normalizeLogicalPrefix(logicalPath);
    return normalized;
}
/**
 * Joins a logical prefix and a relative path with a single forward slash.
 * Both parts are normalized first; an empty part is ignored.
 *
 * @param prefix       logical directory prefix, may be {@code null}
 * @param relativePath path below the prefix, may be {@code null}
 * @return the normalized joined logical path
 */
private String joinLogicalPath(String prefix, String relativePath) {
    String head = normalizeLogicalPrefix(prefix);
    String tail = normalizeLogicalPath(relativePath);
    if (head.isEmpty()) {
        return tail;
    }
    return tail.isEmpty() ? head : normalizeLogicalPath(head + "/" + tail);
}
/**
 * Rejects logical paths that would address the dataset's internal directory
 * ({@code INTERNAL_DIR_NAME}).
 *
 * <p>The check is made on path segments rather than on the raw string, because
 * callers later resolve the logical path against the dataset root and normalize
 * it. A string-prefix check alone would let {@code a/../.datamate/x},
 * {@code ./.datamate} or {@code ../<datasetDir>/.datamate} through even though
 * they resolve into the internal directory. Therefore:
 * <ul>
 *   <li>{@code "."} and empty segments are ignored;</li>
 *   <li>any {@code ".."} segment is rejected outright;</li>
 *   <li>the first remaining segment must not be the internal directory name.</li>
 * </ul>
 *
 * @param prefix logical prefix or path to validate; {@code null} or blank is accepted
 * @throws BusinessException with {@code CommonErrorCode.PARAM_ERROR} when the
 *         path contains a {@code ".."} segment or starts at the internal directory
 */
private void assertNotInternalPrefix(String prefix) {
    if (prefix == null || prefix.isBlank()) {
        return;
    }
    String normalized = normalizeLogicalPrefix(prefix);
    for (String segment : normalized.split("/")) {
        if (segment.isEmpty() || ".".equals(segment)) {
            continue;
        }
        // Parent traversal is never legitimate in a dataset-relative path.
        if ("..".equals(segment)) {
            throw BusinessException.of(CommonErrorCode.PARAM_ERROR);
        }
        // Only the first effective segment decides whether the path is internal.
        if (INTERNAL_DIR_NAME.equals(segment)) {
            throw BusinessException.of(CommonErrorCode.PARAM_ERROR);
        }
        return;
    }
}
/**
 * Tells whether the given file record is marked as archived.
 *
 * @param datasetFile file record, may be {@code null}
 * @return {@code true} when the record's status equals {@code FILE_STATUS_ARCHIVED},
 *         ignoring case; {@code false} for a null record or null status
 */
private boolean isArchivedStatus(DatasetFile datasetFile) {
    // equalsIgnoreCase(null) is false, so a missing status needs no separate check.
    return datasetFile != null
            && FILE_STATUS_ARCHIVED.equalsIgnoreCase(datasetFile.getStatus());
}
private boolean isSourceDocument(DatasetFile datasetFile) {
if (datasetFile == null) {
return false;
@@ -327,6 +422,144 @@ public class DatasetFileApplicationService {
}
}
/**
 * Resolves the dataset's root directory as an absolute, normalized path and
 * makes sure the directory exists.
 *
 * <p>When the dataset has no path, {@code datasetBasePath/datasetId} is used;
 * if a dataset object was supplied, that path is stored on it and saved through
 * the repository.
 *
 * @param dataset   dataset entity, may be {@code null}
 * @param datasetId dataset identifier used for the default path
 * @return the absolute, normalized root directory
 * @throws BusinessException with {@code SystemErrorCode.FILE_SYSTEM_ERROR} when
 *         the directory cannot be created
 */
private Path resolveDatasetRootPath(Dataset dataset, String datasetId) {
    String configuredPath = (dataset != null) ? dataset.getPath() : null;
    boolean pathMissing = configuredPath == null || configuredPath.isBlank();
    if (pathMissing) {
        configuredPath = datasetBasePath + File.separator + datasetId;
        if (dataset != null) {
            dataset.setPath(configuredPath);
            datasetRepository.updateById(dataset);
        }
    }
    Path root = Paths.get(configuredPath).toAbsolutePath().normalize();
    try {
        Files.createDirectories(root);
        return root;
    } catch (IOException e) {
        log.error("Failed to create dataset root dir: {}", root, e);
        throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR);
    }
}
/**
 * Determines the staging directory of an upload.
 *
 * <p>The staging path recorded in {@code checkInfo} is preferred. It is accepted
 * only when it lies under {@code datasetRoot/.datamate/uploading/} with at least
 * one further path element. When no staging path was recorded, the directory is
 * derived from the first uploaded file, which must itself sit inside
 * {@code datasetRoot/.datamate/uploading/<id>/}; the returned root is then that
 * {@code <id>} directory.
 *
 * <p>The fallback is validated on purpose: the returned directory may later be
 * deleted recursively, so a file stored elsewhere (for example directly in the
 * dataset directory) must never make its parent directory the "staging root".
 * Using the {@code <id>} directory rather than the file's parent also keeps
 * files in nested sub-directories below the same root.
 *
 * @param datasetRoot   absolute, normalized dataset root; {@code null} yields {@code null}
 * @param checkInfo     upload check info carrying the staging path, may be {@code null}
 * @param uploadedFiles uploaded files used for the fallback, may be {@code null} or empty
 * @return the staging root, or {@code null} when none can be determined safely
 */
private Path resolveStagingRootPath(Path datasetRoot,
                                    DatasetFileUploadCheckInfo checkInfo,
                                    List<FileUploadResult> uploadedFiles) {
    if (datasetRoot == null) {
        return null;
    }
    String stagingPath = checkInfo == null ? null : checkInfo.getStagingPath();
    if (stagingPath != null && !stagingPath.isBlank()) {
        try {
            Path stagingRoot = Paths.get(stagingPath).toAbsolutePath().normalize();
            if (!stagingRoot.startsWith(datasetRoot)) {
                log.warn("Staging root out of dataset root, datasetId={}, stagingRoot={}, datasetRoot={}",
                        checkInfo.getDatasetId(), stagingRoot, datasetRoot);
                return null;
            }
            Path relative = datasetRoot.relativize(stagingRoot);
            if (relative.getNameCount() < 3) {
                return null;
            }
            if (!INTERNAL_DIR_NAME.equals(relative.getName(0).toString())
                    || !INTERNAL_UPLOAD_DIR_NAME.equals(relative.getName(1).toString())) {
                return null;
            }
            return stagingRoot;
        } catch (Exception e) {
            log.warn("Invalid staging path: {}", stagingPath, e);
            return null;
        }
    }

    if (uploadedFiles == null || uploadedFiles.isEmpty()) {
        return null;
    }
    FileUploadResult firstResult = uploadedFiles.get(0);
    File firstFile = firstResult == null ? null : firstResult.getSavedFile();
    if (firstFile == null) {
        return null;
    }
    try {
        Path firstPath = firstFile.toPath().toAbsolutePath().normalize();
        if (!firstPath.startsWith(datasetRoot)) {
            log.warn("Uploaded file out of dataset root, file={}, datasetRoot={}", firstPath, datasetRoot);
            return null;
        }
        Path relative = datasetRoot.relativize(firstPath);
        // Expected layout: .datamate/uploading/<id>/<file...> -> at least four elements.
        if (relative.getNameCount() < 4
                || !INTERNAL_DIR_NAME.equals(relative.getName(0).toString())
                || !INTERNAL_UPLOAD_DIR_NAME.equals(relative.getName(1).toString())) {
            log.warn("Uploaded file is not inside a staging dir, file={}, datasetRoot={}", firstPath, datasetRoot);
            return null;
        }
        return datasetRoot.resolve(relative.subpath(0, 3)).normalize();
    } catch (Exception e) {
        log.warn("Failed to derive staging root from uploaded file: {}", firstFile, e);
        return null;
    }
}
/**
 * Removes the staging directory once the surrounding transaction has committed.
 * When no transaction synchronization is active, the directory is removed
 * immediately.
 *
 * @param stagingRoot staging directory to delete; {@code null} is ignored
 */
private void scheduleCleanupStagingDirAfterCommit(Path stagingRoot) {
    if (stagingRoot == null) {
        return;
    }
    if (!TransactionSynchronizationManager.isSynchronizationActive()) {
        deleteDirectoryRecursivelyQuietly(stagingRoot);
        return;
    }
    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
        @Override
        public void afterCommit() {
            deleteDirectoryRecursivelyQuietly(stagingRoot);
        }
    });
}
/**
 * Deletes a directory tree on a best-effort basis without ever throwing an
 * I/O-related exception.
 *
 * <p>Entries are deleted in reverse path order so that children are removed
 * before their parent directories. A failure to delete a single entry is logged
 * at debug level and the remaining entries are still attempted.
 *
 * @param directory directory to delete; {@code null} or a non-existing path is ignored
 */
private void deleteDirectoryRecursivelyQuietly(Path directory) {
    if (directory == null) {
        return;
    }
    if (!Files.exists(directory)) {
        return;
    }
    try (Stream<Path> paths = Files.walk(directory)) {
        paths.sorted(Comparator.reverseOrder()).forEach(path -> {
            try {
                Files.deleteIfExists(path);
            } catch (IOException e) {
                log.debug("Failed to delete: {}", path, e);
            }
        });
    } catch (IOException | java.io.UncheckedIOException e) {
        // Files.walk is lazy: errors hit while traversing surface as
        // UncheckedIOException from the terminal operation, not as IOException.
        log.debug("Failed to cleanup staging dir: {}", directory, e);
    }
}
/**
 * Produces a file name that is safe to use inside the archive directory.
 *
 * <p>Control characters (code points 0-31 and 127) and the characters
 * {@code / \ : * ? " < > |} are each replaced by an underscore. The input and
 * the result are trimmed.
 *
 * @param fileName original file name, may be {@code null}
 * @return the sanitized name, or {@code "file"} when nothing usable remains
 */
private String sanitizeArchiveFileName(String fileName) {
    final String fallback = "file";
    if (fileName == null) {
        return fallback;
    }
    String trimmed = fileName.trim();
    if (trimmed.isBlank()) {
        return fallback;
    }
    final String reserved = "/\\:*?\"<>|";
    char[] chars = trimmed.toCharArray();
    for (int i = 0; i < chars.length; i++) {
        char c = chars[i];
        boolean control = c <= 31 || c == 127;
        if (control || reserved.indexOf(c) >= 0) {
            chars[i] = '_';
        }
    }
    String cleaned = new String(chars).trim();
    return cleaned.isEmpty() ? fallback : cleaned;
}
/**
 * Computes the lower-case hexadecimal SHA-256 digest of a string's UTF-8 bytes.
 *
 * @param value text to hash; {@code null} is treated as the empty string
 * @return the 64-character hex digest; if the digest cannot be computed, the hex
 *         form of the string's {@code hashCode()} is returned instead
 */
private String sha256Hex(String value) {
    final String text = (value != null) ? value : "";
    final byte[] digestBytes;
    try {
        digestBytes = java.security.MessageDigest.getInstance("SHA-256")
                .digest(text.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    } catch (Exception e) {
        return Integer.toHexString(text.hashCode());
    }
    char[] hex = new char[digestBytes.length * 2];
    for (int i = 0; i < digestBytes.length; i++) {
        int unsigned = digestBytes[i] & 0xFF;
        hex[2 * i] = Character.forDigit(unsigned >>> 4, 16);
        hex[2 * i + 1] = Character.forDigit(unsigned & 0x0F, 16);
    }
    return new String(hex);
}
/**
* 获取文件详情
*/
@@ -349,10 +582,12 @@ public class DatasetFileApplicationService {
public void deleteDatasetFile(String datasetId, String fileId) {
DatasetFile file = getDatasetFile(datasetId, fileId);
Dataset dataset = datasetRepository.getById(datasetId);
dataset.setFiles(new ArrayList<>(Collections.singleton(file)));
datasetFileRepository.removeById(fileId);
dataset.removeFile(file);
datasetRepository.updateById(dataset);
if (!isArchivedStatus(file)) {
dataset.setFiles(new ArrayList<>(Collections.singleton(file)));
dataset.removeFile(file);
datasetRepository.updateById(dataset);
}
datasetFilePreviewService.deletePreviewFileQuietly(datasetId, fileId);
// 删除文件时,上传到数据集中的文件会同时删除数据库中的记录和文件系统中的文件,归集过来的文件仅删除数据库中的记录
if (file.getFilePath().startsWith(dataset.getPath())) {
@@ -393,18 +628,26 @@ public class DatasetFileApplicationService {
if (Objects.isNull(dataset)) {
throw BusinessException.of(DataManagementErrorCode.DATASET_NOT_FOUND);
}
List<DatasetFile> allByDatasetId = datasetFileRepository.findAllByDatasetId(datasetId);
Set<String> filePaths = allByDatasetId.stream().map(DatasetFile::getFilePath).collect(Collectors.toSet());
String datasetPath = dataset.getPath();
Path downloadPath = Path.of(datasetPath);
Path datasetRoot = Paths.get(dataset.getPath()).toAbsolutePath().normalize();
Set<Path> filePaths = datasetFileRepository.findAllVisibleByDatasetId(datasetId).stream()
.map(DatasetFile::getFilePath)
.filter(Objects::nonNull)
.map(path -> Paths.get(path).toAbsolutePath().normalize())
.filter(path -> path.startsWith(datasetRoot))
.filter(path -> !isInternalDatasetPath(datasetRoot, path))
.collect(Collectors.toSet());
Path downloadPath = datasetRoot;
response.setContentType("application/zip");
String zipName = String.format("dataset_%s.zip",
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss")));
response.setHeader(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=" + zipName);
try (ZipArchiveOutputStream zos = new ZipArchiveOutputStream(response.getOutputStream())) {
try (Stream<Path> pathStream = Files.walk(downloadPath)) {
List<Path> allPaths = pathStream.filter(path -> path.toString().startsWith(datasetPath))
.filter(path -> filePaths.stream().anyMatch(filePath -> filePath.startsWith(path.toString())))
List<Path> allPaths = pathStream
.map(path -> path.toAbsolutePath().normalize())
.filter(path -> path.startsWith(datasetRoot))
.filter(path -> !isInternalDatasetPath(datasetRoot, path))
.filter(path -> filePaths.stream().anyMatch(filePath -> filePath.startsWith(path)))
.toList();
for (Path path : allPaths) {
addToZipFile(path, downloadPath, zos);
@@ -461,29 +704,33 @@ public class DatasetFileApplicationService {
throw BusinessException.of(DataManagementErrorCode.DATASET_NOT_FOUND);
}
// 构建上传路径,如果有 prefix 则追加到路径中
String prefix = Optional.ofNullable(chunkUploadRequest.getPrefix()).orElse("").trim();
prefix = prefix.replace("\\", "/");
while (prefix.startsWith("/")) {
prefix = prefix.substring(1);
}
String prefix = normalizeLogicalPrefix(chunkUploadRequest == null ? null : chunkUploadRequest.getPrefix());
assertNotInternalPrefix(prefix);
String uploadPath = dataset.getPath();
if (uploadPath == null || uploadPath.isBlank()) {
uploadPath = datasetBasePath + File.separator + datasetId;
}
if (!prefix.isEmpty()) {
uploadPath = uploadPath + File.separator + prefix.replace("/", File.separator);
Path datasetRoot = resolveDatasetRootPath(dataset, datasetId);
Path stagingRoot = datasetRoot
.resolve(INTERNAL_DIR_NAME)
.resolve(INTERNAL_UPLOAD_DIR_NAME)
.resolve(UUID.randomUUID().toString())
.toAbsolutePath()
.normalize();
BusinessAssert.isTrue(stagingRoot.startsWith(datasetRoot), CommonErrorCode.PARAM_ERROR);
try {
Files.createDirectories(stagingRoot);
} catch (IOException e) {
log.error("Failed to create staging dir: {}", stagingRoot, e);
throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR);
}
ChunkUploadPreRequest request = ChunkUploadPreRequest.builder().build();
request.setUploadPath(uploadPath);
request.setUploadPath(stagingRoot.toString());
request.setTotalFileNum(chunkUploadRequest.getTotalFileNum());
request.setServiceId(DatasetConstant.SERVICE_ID);
DatasetFileUploadCheckInfo checkInfo = new DatasetFileUploadCheckInfo();
checkInfo.setDatasetId(datasetId);
checkInfo.setHasArchive(chunkUploadRequest.isHasArchive());
checkInfo.setPrefix(prefix);
checkInfo.setStagingPath(stagingRoot.toString());
try {
ObjectMapper objectMapper = new ObjectMapper();
String checkInfoJson = objectMapper.writeValueAsString(checkInfo);
@@ -535,32 +782,251 @@ public class DatasetFileApplicationService {
} else {
files = Collections.singletonList(fileUploadResult);
}
addFileToDataset(datasetId, files);
commitUploadedFiles(datasetId, checkInfo, files, fileUploadResult.isAllFilesUploaded());
}
private void addFileToDataset(String datasetId, List<FileUploadResult> unpacked) {
private void commitUploadedFiles(String datasetId,
DatasetFileUploadCheckInfo checkInfo,
List<FileUploadResult> uploadedFiles,
boolean cleanupStagingAfterCommit) {
Dataset dataset = datasetRepository.getById(datasetId);
dataset.setFiles(datasetFileRepository.findAllByDatasetId(datasetId));
for (FileUploadResult file : unpacked) {
File savedFile = file.getSavedFile();
LocalDateTime currentTime = LocalDateTime.now();
DatasetFile datasetFile = DatasetFile.builder()
.id(UUID.randomUUID().toString())
.datasetId(datasetId)
.fileSize(savedFile.length())
.uploadTime(currentTime)
.lastAccessTime(currentTime)
.fileName(file.getFileName())
.filePath(savedFile.getPath())
.fileType(AnalyzerUtils.getExtension(file.getFileName()))
.build();
setDatasetFileId(datasetFile, dataset);
datasetFileRepository.saveOrUpdate(datasetFile);
dataset.addFile(datasetFile);
triggerPdfTextExtraction(dataset, datasetFile);
BusinessAssert.notNull(dataset, DataManagementErrorCode.DATASET_NOT_FOUND);
Path datasetRoot = resolveDatasetRootPath(dataset, datasetId);
String prefix = checkInfo == null ? "" : normalizeLogicalPrefix(checkInfo.getPrefix());
assertNotInternalPrefix(prefix);
Path stagingRoot = resolveStagingRootPath(datasetRoot, checkInfo, uploadedFiles);
BusinessAssert.notNull(stagingRoot, CommonErrorCode.PARAM_ERROR);
dataset.setFiles(new ArrayList<>(datasetFileRepository.findAllVisibleByDatasetId(datasetId)));
for (FileUploadResult fileResult : uploadedFiles) {
commitSingleUploadedFile(dataset, datasetRoot, stagingRoot, prefix, fileResult);
}
dataset.active();
datasetRepository.updateById(dataset);
if (cleanupStagingAfterCommit) {
scheduleCleanupStagingDirAfterCommit(stagingRoot);
}
}
/**
 * Commits one staged upload into the dataset as a new file version.
 *
 * <p>The file's location relative to the staging root, joined with the upload
 * prefix, becomes its logical path inside the dataset. The staged file is moved
 * (not copied) to its final location.
 *
 * @param dataset     target dataset; {@code null} makes the call a no-op
 * @param datasetRoot absolute, normalized dataset root
 * @param stagingRoot staging directory the file must be located in
 * @param prefix      normalized logical prefix chosen for the upload
 * @param fileResult  upload result; ignored when {@code null} or without a saved file
 */
private void commitSingleUploadedFile(Dataset dataset,
                                      Path datasetRoot,
                                      Path stagingRoot,
                                      String prefix,
                                      FileUploadResult fileResult) {
    boolean nothingToCommit = dataset == null
            || fileResult == null
            || fileResult.getSavedFile() == null;
    if (nothingToCommit) {
        return;
    }
    String savedLocation = fileResult.getSavedFile().getPath();
    Path stagedFile = Paths.get(savedLocation).toAbsolutePath().normalize();
    BusinessAssert.isTrue(stagedFile.startsWith(stagingRoot), CommonErrorCode.PARAM_ERROR);

    Path insideStaging = stagingRoot.relativize(stagedFile);
    String targetLogicalPath = joinLogicalPath(prefix, insideStaging.toString().replace(File.separator, "/"));
    assertNotInternalPrefix(targetLogicalPath);
    commitNewFileVersion(dataset, datasetRoot, targetLogicalPath, stagedFile, true);
}
/**
 * Stores an incoming file at {@code datasetRoot/<logicalPath>} and records it
 * as a new ACTIVE {@code DatasetFile}.
 *
 * <p>If a previous record exists for the same logical path (or, failing that,
 * for the same target file path among {@code dataset.getFiles()}), the outcome
 * depends on the effective duplicate method:
 * ERROR raises {@code DATASET_FILE_ALREADY_EXISTS}; VERSION archives the
 * previous file, marks its record ARCHIVED and removes it from the in-memory
 * dataset. The new record's version is the previous version plus one (a missing
 * previous version counts as 1); without a previous record it is 1.
 *
 * @param dataset          dataset that receives the file
 * @param datasetRoot      existing dataset root directory
 * @param logicalPath      dataset-relative path of the file; must not be empty or internal
 * @param incomingFilePath file to store
 * @param moveIncoming     {@code true} to move the incoming file, {@code false} to copy it
 * @return the newly persisted file record
 * @throws BusinessException {@code DATASET_FILE_ALREADY_EXISTS} as described above;
 *         {@code FILE_SYSTEM_ERROR} when the previous file could not be archived
 *         while the target still exists, or when writing the target fails.
 *         NOTE(review): the {@code BusinessAssert} checks presumably raise the
 *         given error code as well - confirm against {@code BusinessAssert}.
 */
private DatasetFile commitNewFileVersion(Dataset dataset,
Path datasetRoot,
String logicalPath,
Path incomingFilePath,
boolean moveIncoming) {
BusinessAssert.notNull(dataset, CommonErrorCode.PARAM_ERROR);
BusinessAssert.isTrue(datasetRoot != null && Files.exists(datasetRoot), CommonErrorCode.PARAM_ERROR);
String normalizedLogicalPath = normalizeLogicalPath(logicalPath);
BusinessAssert.isTrue(!normalizedLogicalPath.isEmpty(), CommonErrorCode.PARAM_ERROR);
assertNotInternalPrefix(normalizedLogicalPath);
// Resolve the physical target and make sure it cannot escape the dataset root.
Path targetFilePath = datasetRoot.resolve(normalizedLogicalPath.replace("/", File.separator))
.toAbsolutePath()
.normalize();
BusinessAssert.isTrue(targetFilePath.startsWith(datasetRoot), CommonErrorCode.PARAM_ERROR);
DuplicateMethod effectiveDuplicateMethod = resolveEffectiveDuplicateMethod();
DatasetFile latest = datasetFileRepository.findLatestByDatasetIdAndLogicalPath(dataset.getId(), normalizedLogicalPath);
// Fallback: no record found by logical path, so match on the physical target path
// among the files currently attached to the dataset.
if (latest == null && dataset.getFiles() != null) {
latest = dataset.getFiles().stream()
.filter(existing -> isSameNormalizedPath(existing == null ? null : existing.getFilePath(), targetFilePath.toString()))
.findFirst()
.orElse(null);
}
if (latest != null && effectiveDuplicateMethod == DuplicateMethod.ERROR) {
throw BusinessException.of(DataManagementErrorCode.DATASET_FILE_ALREADY_EXISTS);
}
long nextVersion = 1L;
if (latest != null) {
// Fill in version / logical path on the previous record when they are missing.
long latestVersion = Optional.ofNullable(latest.getVersion()).orElse(1L);
if (latest.getVersion() == null) {
latest.setVersion(latestVersion);
}
if (latest.getLogicalPath() == null || latest.getLogicalPath().isBlank()) {
latest.setLogicalPath(normalizedLogicalPath);
}
nextVersion = latestVersion + 1L;
}
if (latest != null && effectiveDuplicateMethod == DuplicateMethod.VERSION) {
Path archivedPath = archiveDatasetFileVersion(datasetRoot, normalizedLogicalPath, latest);
if (archivedPath != null) {
latest.setFilePath(archivedPath.toString());
} else if (Files.exists(targetFilePath)) {
// The previous file was not archived but something still occupies the target:
// refuse to overwrite it.
log.error("Failed to archive latest file, refuse to overwrite. datasetId={}, fileId={}, logicalPath={}, targetPath={}",
dataset.getId(), latest.getId(), normalizedLogicalPath, targetFilePath);
throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR);
}
// NOTE(review): when archiving returned null and the target does not exist, the
// previous record keeps its old filePath while being marked ARCHIVED.
latest.setStatus(FILE_STATUS_ARCHIVED);
datasetFileRepository.updateById(latest);
dataset.removeFile(latest);
} else if (latest == null && Files.exists(targetFilePath)) {
// A file without any record occupies the target; try to move it aside first.
archiveOrphanTargetFile(datasetRoot, normalizedLogicalPath, targetFilePath);
}
try {
Files.createDirectories(targetFilePath.getParent());
if (moveIncoming) {
Files.move(incomingFilePath, targetFilePath, java.nio.file.StandardCopyOption.REPLACE_EXISTING);
} else {
Files.copy(incomingFilePath, targetFilePath, java.nio.file.StandardCopyOption.REPLACE_EXISTING);
}
} catch (IOException e) {
log.error("Failed to write dataset file, datasetId={}, logicalPath={}, targetPath={}",
dataset.getId(), normalizedLogicalPath, targetFilePath, e);
throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR);
}
LocalDateTime currentTime = LocalDateTime.now();
String fileName = targetFilePath.getFileName().toString();
long fileSize;
try {
fileSize = Files.size(targetFilePath);
} catch (IOException e) {
// The size is recorded as 0 when it cannot be read.
fileSize = 0L;
}
DatasetFile datasetFile = DatasetFile.builder()
.id(UUID.randomUUID().toString())
.datasetId(dataset.getId())
.fileName(fileName)
.fileType(AnalyzerUtils.getExtension(fileName))
.fileSize(fileSize)
.filePath(targetFilePath.toString())
.logicalPath(normalizedLogicalPath)
.version(nextVersion)
.status(FILE_STATUS_ACTIVE)
.uploadTime(currentTime)
.lastAccessTime(currentTime)
.build();
datasetFileRepository.saveOrUpdate(datasetFile);
dataset.addFile(datasetFile);
triggerPdfTextExtraction(dataset, datasetFile);
return datasetFile;
}
/**
 * Returns the duplicate-handling method that is actually applied.
 *
 * <p>An unset configuration and {@code COVER} both result in {@code VERSION};
 * for {@code COVER} a warning is logged. Any other configured value is returned
 * unchanged.
 *
 * @return the effective duplicate method, never {@code null}
 */
private DuplicateMethod resolveEffectiveDuplicateMethod() {
    if (duplicateMethod != null && duplicateMethod != DuplicateMethod.COVER) {
        return duplicateMethod;
    }
    if (duplicateMethod == DuplicateMethod.COVER) {
        log.warn("duplicateMethod=COVER 会导致标注引用的 fileId 对应内容被覆盖,已强制按 VERSION 处理。");
    }
    return DuplicateMethod.VERSION;
}
/**
 * Moves the file of the given record into the dataset's version archive at
 * {@code datasetRoot/.datamate/versions/<sha256(logicalPath)>/v<version>/<fileId>__<sanitized name>}.
 *
 * <p>This method only moves the file; it does not modify {@code latest}.
 *
 * @param datasetRoot dataset root directory
 * @param logicalPath logical path whose SHA-256 hex digest names the archive directory
 * @param latest      record whose file is archived
 * @return the archived file's path, or {@code null} when nothing was archived:
 *         the record is {@code null} or has no id, its file path is invalid, the
 *         file is missing or not a regular file, or it lies outside {@code datasetRoot}
 * @throws BusinessException {@code FILE_SYSTEM_ERROR} when the archive directory
 *         cannot be created or the move fails
 */
private Path archiveDatasetFileVersion(Path datasetRoot, String logicalPath, DatasetFile latest) {
if (latest == null || latest.getId() == null || latest.getId().isBlank()) {
return null;
}
Path currentPath;
try {
currentPath = Paths.get(latest.getFilePath()).toAbsolutePath().normalize();
} catch (Exception e) {
// Also reached when the record has no file path at all.
log.warn("Invalid latest file path, skip archiving. datasetId={}, fileId={}, filePath={}",
latest.getDatasetId(), latest.getId(), latest.getFilePath());
return null;
}
if (!Files.exists(currentPath) || !Files.isRegularFile(currentPath)) {
log.warn("Latest file not found on disk, skip archiving. datasetId={}, fileId={}, filePath={}",
latest.getDatasetId(), latest.getId(), currentPath);
return null;
}
// Files outside the dataset root are never moved.
if (!currentPath.startsWith(datasetRoot)) {
log.warn("Latest file path out of dataset root, skip archiving. datasetId={}, fileId={}, filePath={}",
latest.getDatasetId(), latest.getId(), currentPath);
return null;
}
// A record without a version is archived as v1.
long latestVersion = Optional.ofNullable(latest.getVersion()).orElse(1L);
String logicalPathHash = sha256Hex(logicalPath);
Path archiveDir = datasetRoot
.resolve(INTERNAL_DIR_NAME)
.resolve(INTERNAL_VERSIONS_DIR_NAME)
.resolve(logicalPathHash)
.resolve("v" + latestVersion)
.toAbsolutePath()
.normalize();
BusinessAssert.isTrue(archiveDir.startsWith(datasetRoot), CommonErrorCode.PARAM_ERROR);
try {
Files.createDirectories(archiveDir);
} catch (IOException e) {
log.error("Failed to create archive dir: {}", archiveDir, e);
throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR);
}
// Prefer the recorded file name; fall back to the name on disk.
String fileName = sanitizeArchiveFileName(Optional.ofNullable(latest.getFileName()).orElse(currentPath.getFileName().toString()));
Path archivedPath = archiveDir.resolve(latest.getId() + "__" + fileName).toAbsolutePath().normalize();
BusinessAssert.isTrue(archivedPath.startsWith(archiveDir), CommonErrorCode.PARAM_ERROR);
try {
Files.move(currentPath, archivedPath, java.nio.file.StandardCopyOption.REPLACE_EXISTING);
return archivedPath;
} catch (IOException e) {
log.error("Failed to archive latest file, datasetId={}, fileId={}, from={}, to={}",
latest.getDatasetId(), latest.getId(), currentPath, archivedPath, e);
throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR);
}
}
/**
 * Moves a file that occupies a target location into
 * {@code datasetRoot/.datamate/versions/<sha256(logicalPath)>/orphan/}, under
 * the name {@code orphan_<currentTimeMillis>__<sanitized name>}.
 *
 * <p>Best effort: every failure is logged as a warning and otherwise ignored.
 *
 * @param datasetRoot    dataset root directory; {@code null} makes the call a no-op
 * @param logicalPath    logical path whose hash names the archive directory
 * @param targetFilePath file to move aside; ignored unless it is an existing regular file
 */
private void archiveOrphanTargetFile(Path datasetRoot, String logicalPath, Path targetFilePath) {
    if (datasetRoot == null || targetFilePath == null) {
        return;
    }
    boolean existingRegularFile = Files.exists(targetFilePath) && Files.isRegularFile(targetFilePath);
    if (!existingRegularFile) {
        return;
    }
    String pathHash = sha256Hex(logicalPath);
    Path orphanDir = datasetRoot.resolve(INTERNAL_DIR_NAME)
            .resolve(INTERNAL_VERSIONS_DIR_NAME)
            .resolve(pathHash)
            .resolve("orphan")
            .toAbsolutePath()
            .normalize();
    if (!orphanDir.startsWith(datasetRoot)) {
        return;
    }
    try {
        Files.createDirectories(orphanDir);
        String cleanedName = sanitizeArchiveFileName(targetFilePath.getFileName().toString());
        String archivedName = "orphan_" + System.currentTimeMillis() + "__" + cleanedName;
        Path destination = orphanDir.resolve(archivedName).toAbsolutePath().normalize();
        if (destination.startsWith(orphanDir)) {
            Files.move(targetFilePath, destination, java.nio.file.StandardCopyOption.REPLACE_EXISTING);
        }
    } catch (Exception e) {
        log.warn("Failed to archive orphan target file, logicalPath={}, targetPath={}", logicalPath, targetFilePath, e);
    }
}
/**
@@ -578,11 +1044,16 @@ public class DatasetFileApplicationService {
while (parentPrefix.startsWith("/")) {
parentPrefix = parentPrefix.substring(1);
}
parentPrefix = normalizeLogicalPrefix(parentPrefix);
assertNotInternalPrefix(parentPrefix);
String directoryName = Optional.ofNullable(req.getDirectoryName()).orElse("").trim();
if (directoryName.isEmpty()) {
throw BusinessException.of(CommonErrorCode.PARAM_ERROR);
}
if (INTERNAL_DIR_NAME.equals(directoryName)) {
throw BusinessException.of(CommonErrorCode.PARAM_ERROR);
}
if (directoryName.contains("..") || directoryName.contains("/") || directoryName.contains("\\")) {
throw BusinessException.of(CommonErrorCode.PARAM_ERROR);
}
@@ -624,6 +1095,9 @@ public class DatasetFileApplicationService {
while (prefix.endsWith("/")) {
prefix = prefix.substring(0, prefix.length() - 1);
}
if (prefix.equals(INTERNAL_DIR_NAME) || prefix.startsWith(INTERNAL_DIR_NAME + "/")) {
throw BusinessException.of(CommonErrorCode.PARAM_ERROR);
}
Path basePath = Paths.get(datasetPath);
Path targetPath = prefix.isEmpty() ? basePath : basePath.resolve(prefix);
@@ -660,6 +1134,7 @@ public class DatasetFileApplicationService {
private void zipDirectory(Path sourceDir, Path basePath, ZipArchiveOutputStream zipOut) throws IOException {
try (Stream<Path> paths = Files.walk(sourceDir)) {
paths.filter(path -> !Files.isDirectory(path))
.filter(path -> !isInternalDatasetPath(basePath.toAbsolutePath().normalize(), path))
.forEach(path -> {
try {
Path relativePath = basePath.relativize(path);
@@ -698,6 +1173,9 @@ public class DatasetFileApplicationService {
if (prefix.isEmpty()) {
throw BusinessException.of(CommonErrorCode.PARAM_ERROR);
}
if (prefix.equals(INTERNAL_DIR_NAME) || prefix.startsWith(INTERNAL_DIR_NAME + "/")) {
throw BusinessException.of(CommonErrorCode.PARAM_ERROR);
}
String datasetPath = dataset.getPath();
Path basePath = Paths.get(datasetPath);
@@ -769,28 +1247,6 @@ public class DatasetFileApplicationService {
}
}
/**
 * Reconciles a new file record with an existing record of the dataset that has
 * the same file path.
 *
 * <p>Nothing happens when no existing record matches. Otherwise, with
 * {@code DuplicateMethod.ERROR} the call fails, and with
 * {@code DuplicateMethod.COVER} the existing record is removed from the dataset
 * and its id is reused for the new record.
 *
 * <p>The match is a direct search rather than a {@code Collectors.toMap} lookup:
 * {@code toMap} throws {@code IllegalStateException} as soon as two existing
 * records share a file path. When several records match, the first one is used.
 *
 * @param datasetFile file record that may receive the existing id
 * @param dataset     dataset holding the list of existing files
 * @throws BusinessException {@code DATASET_FILE_ALREADY_EXISTS} when a record
 *         with the same path exists and the duplicate method is {@code ERROR}
 */
private void setDatasetFileId(DatasetFile datasetFile, Dataset dataset) {
    List<DatasetFile> existingFiles = dataset.getFiles();
    if (existingFiles == null) {
        return;
    }
    String targetPath = datasetFile.getFilePath();
    DatasetFile existDatasetFile = existingFiles.stream()
            .filter(Objects::nonNull)
            .filter(existing -> Objects.equals(existing.getFilePath(), targetPath))
            .findFirst()
            .orElse(null);
    if (Objects.isNull(existDatasetFile)) {
        return;
    }
    if (duplicateMethod == DuplicateMethod.ERROR) {
        log.error("file {} already exists in dataset {}", datasetFile.getFileName(), datasetFile.getDatasetId());
        throw BusinessException.of(DataManagementErrorCode.DATASET_FILE_ALREADY_EXISTS);
    }
    if (duplicateMethod == DuplicateMethod.COVER) {
        dataset.removeFile(existDatasetFile);
        datasetFile.setId(existDatasetFile.getId());
    }
}
/**
* 复制文件到数据集目录
*
@@ -802,36 +1258,21 @@ public class DatasetFileApplicationService {
public List<DatasetFile> copyFilesToDatasetDir(String datasetId, CopyFilesRequest req) {
Dataset dataset = datasetRepository.getById(datasetId);
BusinessAssert.notNull(dataset, SystemErrorCode.RESOURCE_NOT_FOUND);
Path datasetRoot = resolveDatasetRootPath(dataset, datasetId);
dataset.setFiles(new ArrayList<>(datasetFileRepository.findAllVisibleByDatasetId(datasetId)));
List<DatasetFile> copiedFiles = new ArrayList<>();
List<DatasetFile> existDatasetFiles = datasetFileRepository.findAllByDatasetId(datasetId);
dataset.setFiles(existDatasetFiles);
for (String sourceFilePath : req.sourcePaths()) {
Path sourcePath = Paths.get(sourceFilePath);
Path sourcePath = Paths.get(sourceFilePath).toAbsolutePath().normalize();
if (!Files.exists(sourcePath) || !Files.isRegularFile(sourcePath)) {
log.warn("Source file does not exist or is not a regular file: {}", sourceFilePath);
continue;
}
String fileName = sourcePath.getFileName().toString();
File sourceFile = sourcePath.toFile();
LocalDateTime currentTime = LocalDateTime.now();
DatasetFile datasetFile = DatasetFile.builder()
.id(UUID.randomUUID().toString())
.datasetId(datasetId)
.fileName(fileName)
.fileType(AnalyzerUtils.getExtension(fileName))
.fileSize(sourceFile.length())
.filePath(Paths.get(dataset.getPath(), fileName).toString())
.uploadTime(currentTime)
.lastAccessTime(currentTime)
.build();
setDatasetFileId(datasetFile, dataset);
dataset.addFile(datasetFile);
String logicalPath = sourcePath.getFileName().toString();
DatasetFile datasetFile = commitNewFileVersion(dataset, datasetRoot, logicalPath, sourcePath, false);
copiedFiles.add(datasetFile);
}
datasetFileRepository.saveOrUpdateBatch(copiedFiles, 100);
dataset.active();
datasetRepository.updateById(dataset);
CompletableFuture.runAsync(() -> copyFilesToDatasetDir(req.sourcePaths(), dataset));
return copiedFiles;
}
@@ -847,13 +1288,11 @@ public class DatasetFileApplicationService {
public List<DatasetFile> copyFilesToDatasetDirWithSourceRoot(String datasetId, Path sourceRoot, List<String> sourcePaths) {
Dataset dataset = datasetRepository.getById(datasetId);
BusinessAssert.notNull(dataset, SystemErrorCode.RESOURCE_NOT_FOUND);
Path datasetRoot = resolveDatasetRootPath(dataset, datasetId);
Path normalizedRoot = sourceRoot.toAbsolutePath().normalize();
List<DatasetFile> copiedFiles = new ArrayList<>();
List<DatasetFile> existDatasetFiles = datasetFileRepository.findAllByDatasetId(datasetId);
dataset.setFiles(existDatasetFiles);
Map<String, DatasetFile> copyTargets = new LinkedHashMap<>();
dataset.setFiles(new ArrayList<>(datasetFileRepository.findAllVisibleByDatasetId(datasetId)));
List<DatasetFile> copiedFiles = new ArrayList<>();
for (String sourceFilePath : sourcePaths) {
if (sourceFilePath == null || sourceFilePath.isBlank()) {
continue;
@@ -867,86 +1306,16 @@ public class DatasetFileApplicationService {
log.warn("Source file does not exist or is not a regular file: {}", sourceFilePath);
continue;
}
Path relativePath = normalizedRoot.relativize(sourcePath);
String fileName = sourcePath.getFileName().toString();
File sourceFile = sourcePath.toFile();
LocalDateTime currentTime = LocalDateTime.now();
Path targetPath = Paths.get(dataset.getPath(), relativePath.toString());
DatasetFile datasetFile = DatasetFile.builder()
.id(UUID.randomUUID().toString())
.datasetId(datasetId)
.fileName(fileName)
.fileType(AnalyzerUtils.getExtension(fileName))
.fileSize(sourceFile.length())
.filePath(targetPath.toString())
.uploadTime(currentTime)
.lastAccessTime(currentTime)
.build();
setDatasetFileId(datasetFile, dataset);
dataset.addFile(datasetFile);
String logicalPath = relativePath.toString().replace("\\", "/");
DatasetFile datasetFile = commitNewFileVersion(dataset, datasetRoot, logicalPath, sourcePath, false);
copiedFiles.add(datasetFile);
copyTargets.put(sourceFilePath, datasetFile);
}
if (copiedFiles.isEmpty()) {
return copiedFiles;
}
datasetFileRepository.saveOrUpdateBatch(copiedFiles, 100);
dataset.active();
datasetRepository.updateById(dataset);
CompletableFuture.runAsync(() -> copyFilesToDatasetDirWithRelativePath(copyTargets, dataset, normalizedRoot));
return copiedFiles;
}
/**
 * Copies each listed source file into the dataset directory under its bare file name
 * (no sub-directories are preserved), then looks up the dataset file record with that
 * name and passes it to {@code triggerPdfTextExtraction}.
 *
 * <p>An {@link IOException} raised while handling one file is logged and the loop
 * continues with the next file.
 *
 * @param sourcePaths paths of the files to copy
 * @param dataset     dataset whose {@code getPath()} is the copy destination
 */
private void copyFilesToDatasetDir(List<String> sourcePaths, Dataset dataset) {
    for (String rawSource : sourcePaths) {
        Path origin = Paths.get(rawSource);
        Path destination = Paths.get(dataset.getPath(), origin.getFileName().toString());
        try {
            // The dataset directory is (re)created on every iteration, exactly as before.
            Path datasetDir = Path.of(dataset.getPath());
            Files.createDirectories(datasetDir);
            Files.copy(origin, destination);

            String datasetId = dataset.getId();
            String copiedName = origin.getFileName().toString();
            DatasetFile storedFile = datasetFileRepository.findByDatasetIdAndFileName(datasetId, copiedName);
            triggerPdfTextExtraction(dataset, storedFile);
        } catch (IOException copyFailure) {
            log.error("Failed to copy file from {} to {}", rawSource, destination, copyFailure);
        }
    }
}
/**
 * Copies files into the dataset directory while preserving each file's path relative
 * to {@code sourceRoot}, then calls {@code triggerPdfTextExtraction} for the associated
 * dataset file record.
 *
 * <p>An entry is skipped with a warning when its normalized source path is not under
 * the normalized source root, or when the resolved target would fall outside the
 * dataset directory. An {@link IOException} for one entry is logged and does not stop
 * the remaining entries.
 *
 * @param copyTargets map from source file path to the dataset file record created for it
 * @param dataset     dataset whose {@code getPath()} is the copy destination root
 * @param sourceRoot  root directory the source paths are relativized against
 */
private void copyFilesToDatasetDirWithRelativePath(
        Map<String, DatasetFile> copyTargets,
        Dataset dataset,
        Path sourceRoot
) {
    Path targetBase = Paths.get(dataset.getPath()).toAbsolutePath().normalize();
    Path sourceBase = sourceRoot.toAbsolutePath().normalize();
    copyTargets.forEach((rawSource, targetFile) -> {
        Path origin = Paths.get(rawSource).toAbsolutePath().normalize();
        if (!origin.startsWith(sourceBase)) {
            log.warn("Source file path is out of root: {}", origin);
            return;
        }
        Path relative = sourceBase.relativize(origin);
        Path destination = targetBase.resolve(relative).normalize();
        if (!destination.startsWith(targetBase)) {
            log.warn("Target file path is out of dataset path: {}", destination);
            return;
        }
        try {
            Files.createDirectories(destination.getParent());
            Files.copy(origin, destination);
            triggerPdfTextExtraction(dataset, targetFile);
        } catch (IOException copyFailure) {
            log.error("Failed to copy file from {} to {}", origin, destination, copyFailure);
        }
    });
}
/**
* 添加文件到数据集(仅创建数据库记录,不执行文件系统操作)
*
@@ -959,8 +1328,7 @@ public class DatasetFileApplicationService {
Dataset dataset = datasetRepository.getById(datasetId);
BusinessAssert.notNull(dataset, SystemErrorCode.RESOURCE_NOT_FOUND);
List<DatasetFile> addedFiles = new ArrayList<>();
List<DatasetFile> existDatasetFiles = datasetFileRepository.findAllByDatasetId(datasetId);
dataset.setFiles(existDatasetFiles);
dataset.setFiles(new ArrayList<>(datasetFileRepository.findAllVisibleByDatasetId(datasetId)));
boolean softAdd = req.softAdd();
String metadata;
@@ -977,8 +1345,43 @@ public class DatasetFileApplicationService {
Path sourcePath = Paths.get(sourceFilePath);
String fileName = sourcePath.getFileName().toString();
File sourceFile = sourcePath.toFile();
LocalDateTime currentTime = LocalDateTime.now();
String logicalPath = normalizeLogicalPath(fileName);
assertNotInternalPrefix(logicalPath);
DatasetFile latest = datasetFileRepository.findLatestByDatasetIdAndLogicalPath(datasetId, logicalPath);
if (latest == null && dataset.getFiles() != null) {
latest = dataset.getFiles().stream()
.filter(existing -> existing != null
&& !isArchivedStatus(existing)
&& Objects.equals(existing.getFileName(), fileName))
.findFirst()
.orElse(null);
}
DuplicateMethod effectiveDuplicateMethod = resolveEffectiveDuplicateMethod();
if (latest != null && effectiveDuplicateMethod == DuplicateMethod.ERROR) {
throw BusinessException.of(DataManagementErrorCode.DATASET_FILE_ALREADY_EXISTS);
}
long nextVersion = 1L;
if (latest != null) {
long latestVersion = Optional.ofNullable(latest.getVersion()).orElse(1L);
if (latest.getVersion() == null) {
latest.setVersion(latestVersion);
}
if (latest.getLogicalPath() == null || latest.getLogicalPath().isBlank()) {
latest.setLogicalPath(logicalPath);
}
nextVersion = latestVersion + 1L;
}
if (latest != null && effectiveDuplicateMethod == DuplicateMethod.VERSION) {
latest.setStatus(FILE_STATUS_ARCHIVED);
datasetFileRepository.updateById(latest);
dataset.removeFile(latest);
}
LocalDateTime currentTime = LocalDateTime.now();
DatasetFile datasetFile = DatasetFile.builder()
.id(UUID.randomUUID().toString())
.datasetId(datasetId)
@@ -986,16 +1389,19 @@ public class DatasetFileApplicationService {
.fileType(AnalyzerUtils.getExtension(fileName))
.fileSize(sourceFile.length())
.filePath(sourceFilePath)
.logicalPath(logicalPath)
.version(nextVersion)
.status(FILE_STATUS_ACTIVE)
.uploadTime(currentTime)
.lastAccessTime(currentTime)
.metadata(metadata)
.build();
setDatasetFileId(datasetFile, dataset);
datasetFileRepository.saveOrUpdate(datasetFile);
dataset.addFile(datasetFile);
addedFiles.add(datasetFile);
triggerPdfTextExtraction(dataset, datasetFile);
}
datasetFileRepository.saveOrUpdateBatch(addedFiles, 100);
dataset.active();
datasetRepository.updateById(dataset);
// Note: addFilesToDataset only creates DB records, no file system operations

View File

@@ -7,5 +7,6 @@ package com.datamate.datamanagement.common.enums;
*/
public enum DuplicateMethod {
ERROR,
COVER
COVER,
VERSION
}

View File

@@ -152,11 +152,19 @@ public class Dataset extends BaseEntity<String> {
}
public void removeFile(DatasetFile file) {
if (this.files.remove(file)) {
this.fileCount = Math.max(0, this.fileCount - 1);
this.sizeBytes = Math.max(0, this.sizeBytes - (file.getFileSize() != null ? file.getFileSize() : 0L));
this.updatedAt = LocalDateTime.now();
if (file == null) {
return;
}
boolean removed = this.files.remove(file);
if (!removed && file.getId() != null) {
removed = this.files.removeIf(existing -> Objects.equals(existing.getId(), file.getId()));
}
if (!removed) {
return;
}
this.fileCount = Math.max(0, this.fileCount - 1);
this.sizeBytes = Math.max(0, this.sizeBytes - (file.getFileSize() != null ? file.getFileSize() : 0L));
this.updatedAt = LocalDateTime.now();
}
public void active() {

View File

@@ -28,12 +28,16 @@ public class DatasetFile {
private String datasetId; // UUID
private String fileName;
private String filePath;
/** 文件逻辑路径(相对数据集根目录,包含子目录) */
private String logicalPath;
/** 文件版本号(同一个 logicalPath 下递增) */
private Long version;
private String fileType; // JPG/PNG/DCM/TXT
private Long fileSize; // bytes
private String checkSum;
private String tags;
private String metadata;
private String status; // UPLOADED, PROCESSING, COMPLETED, ERROR
private String status; // ACTIVE/ARCHIVED/DELETED/PROCESSING...
private LocalDateTime uploadTime;
private LocalDateTime lastAccessTime;
private LocalDateTime createdAt;

View File

@@ -21,4 +21,7 @@ public class DatasetFileUploadCheckInfo {
/** 目标子目录前缀,例如 "images/",为空表示数据集根目录 */
private String prefix;
/** 上传临时落盘目录(仅服务端使用,不对外暴露) */
private String stagingPath;
}

View File

@@ -24,8 +24,19 @@ public interface DatasetFileRepository extends IRepository<DatasetFile> {
List<DatasetFile> findAllByDatasetId(String datasetId);
/**
* 查询数据集内“可见文件”(默认不包含历史归档版本)。
* 约定:status 为 NULL 视为可见;status = ARCHIVED 视为历史版本。
*/
List<DatasetFile> findAllVisibleByDatasetId(String datasetId);
DatasetFile findByDatasetIdAndFileName(String datasetId, String fileName);
/**
* 查询指定逻辑路径的最新版本(ACTIVE/NULL)。
*/
DatasetFile findLatestByDatasetIdAndLogicalPath(String datasetId, String logicalPath);
IPage<DatasetFile> findByCriteria(String datasetId, String fileType, String status, String name,
Boolean hasAnnotation, IPage<DatasetFile> page);

View File

@@ -25,6 +25,8 @@ public class DatasetFileRepositoryImpl extends CrudRepository<DatasetFileMapper,
private final DatasetFileMapper datasetFileMapper;
private static final String ANNOTATION_EXISTS_SQL =
"SELECT 1 FROM t_dm_annotation_results ar WHERE ar.file_id = t_dm_dataset_files.id";
private static final String FILE_STATUS_ARCHIVED = "ARCHIVED";
private static final String FILE_STATUS_ACTIVE = "ACTIVE";
@Override
public Long countByDatasetId(String datasetId) {
@@ -51,19 +53,54 @@ public class DatasetFileRepositoryImpl extends CrudRepository<DatasetFileMapper,
return datasetFileMapper.findAllByDatasetId(datasetId);
}
/**
 * Returns the files of the given dataset whose status is either {@code NULL} or
 * anything other than {@code ARCHIVED}, newest upload first.
 *
 * @param datasetId id of the dataset to list
 * @return matching files ordered by upload time descending
 */
@Override
public List<DatasetFile> findAllVisibleByDatasetId(String datasetId) {
    LambdaQueryWrapper<DatasetFile> visibleFiles = new LambdaQueryWrapper<>();
    visibleFiles.eq(DatasetFile::getDatasetId, datasetId);
    // Explicit IS NULL branch: a plain "<> ARCHIVED" comparison would drop NULL-status rows.
    visibleFiles.and(notArchived -> notArchived
            .isNull(DatasetFile::getStatus)
            .or()
            .ne(DatasetFile::getStatus, FILE_STATUS_ARCHIVED));
    visibleFiles.orderByDesc(DatasetFile::getUploadTime);
    return datasetFileMapper.selectList(visibleFiles);
}
/**
 * Looks up a dataset file by dataset id and file name by delegating to the mapper
 * query of the same name.
 *
 * @param datasetId id of the dataset to search in
 * @param fileName  file name to match
 * @return whatever the mapper query returns for this pair
 */
@Override
public DatasetFile findByDatasetIdAndFileName(String datasetId, String fileName) {
return datasetFileMapper.findByDatasetIdAndFileName(datasetId, fileName);
}
/**
 * Finds the newest non-archived record for a logical path inside a dataset.
 *
 * <p>Only rows whose status is {@code NULL} or {@code ACTIVE} are considered. Rows are
 * ordered by version descending, then upload time descending, and limited to one.
 *
 * @param datasetId   id of the dataset to search in
 * @param logicalPath logical path of the file
 * @return the matching record, or {@code null} when either argument has no text or no
 *         row matches
 */
@Override
public DatasetFile findLatestByDatasetIdAndLogicalPath(String datasetId, String logicalPath) {
    if (StringUtils.hasText(datasetId) && StringUtils.hasText(logicalPath)) {
        LambdaQueryWrapper<DatasetFile> latestQuery = new LambdaQueryWrapper<>();
        latestQuery.eq(DatasetFile::getDatasetId, datasetId);
        latestQuery.eq(DatasetFile::getLogicalPath, logicalPath);
        latestQuery.and(current -> current
                .isNull(DatasetFile::getStatus)
                .or()
                .eq(DatasetFile::getStatus, FILE_STATUS_ACTIVE));
        latestQuery.orderByDesc(DatasetFile::getVersion);
        latestQuery.orderByDesc(DatasetFile::getUploadTime);
        latestQuery.last("LIMIT 1");
        return datasetFileMapper.selectOne(latestQuery);
    }
    return null;
}
public IPage<DatasetFile> findByCriteria(String datasetId, String fileType, String status, String name,
Boolean hasAnnotation, IPage<DatasetFile> page) {
return datasetFileMapper.selectPage(page, new LambdaQueryWrapper<DatasetFile>()
.eq(DatasetFile::getDatasetId, datasetId)
.eq(StringUtils.hasText(fileType), DatasetFile::getFileType, fileType)
.eq(StringUtils.hasText(status), DatasetFile::getStatus, status)
.like(StringUtils.hasText(name), DatasetFile::getFileName, name)
.exists(Boolean.TRUE.equals(hasAnnotation), ANNOTATION_EXISTS_SQL));
LambdaQueryWrapper<DatasetFile> wrapper = new LambdaQueryWrapper<DatasetFile>()
.eq(DatasetFile::getDatasetId, datasetId)
.eq(StringUtils.hasText(fileType), DatasetFile::getFileType, fileType)
.like(StringUtils.hasText(name), DatasetFile::getFileName, name)
.exists(Boolean.TRUE.equals(hasAnnotation), ANNOTATION_EXISTS_SQL);
if (StringUtils.hasText(status)) {
wrapper.eq(DatasetFile::getStatus, status);
} else {
wrapper.and(visibility -> visibility.isNull(DatasetFile::getStatus)
.or()
.ne(DatasetFile::getStatus, FILE_STATUS_ARCHIVED));
}
return datasetFileMapper.selectPage(page, wrapper);
}
@Override

View File

@@ -3,7 +3,7 @@
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.datamate.datamanagement.infrastructure.persistence.mapper.DatasetFileMapper">
<sql id="Base_Column_List">
id, dataset_id, file_name, file_path, file_type, file_size, check_sum, tags, metadata, status,
id, dataset_id, file_name, file_path, logical_path, version, file_type, file_size, check_sum, tags, metadata, status,
upload_time, last_access_time, created_at, updated_at
</sql>
@@ -39,13 +39,17 @@
</select>
<select id="countByDatasetId" parameterType="string" resultType="long">
SELECT COUNT(*) FROM t_dm_dataset_files WHERE dataset_id = #{datasetId}
SELECT COUNT(*)
FROM t_dm_dataset_files
WHERE dataset_id = #{datasetId}
AND (status IS NULL OR status <> 'ARCHIVED')
</select>
<select id="countNonDerivedByDatasetId" parameterType="string" resultType="long">
SELECT COUNT(*)
FROM t_dm_dataset_files
WHERE dataset_id = #{datasetId}
AND (status IS NULL OR status <> 'ARCHIVED')
AND (metadata IS NULL OR JSON_EXTRACT(metadata, '$.derived_from_file_id') IS NULL)
</select>
@@ -54,13 +58,19 @@
</select>
<select id="sumSizeByDatasetId" parameterType="string" resultType="long">
SELECT COALESCE(SUM(file_size), 0) FROM t_dm_dataset_files WHERE dataset_id = #{datasetId}
SELECT COALESCE(SUM(file_size), 0)
FROM t_dm_dataset_files
WHERE dataset_id = #{datasetId}
AND (status IS NULL OR status <> 'ARCHIVED')
</select>
<select id="findByDatasetIdAndFileName" resultType="com.datamate.datamanagement.domain.model.dataset.DatasetFile">
SELECT <include refid="Base_Column_List"/>
FROM t_dm_dataset_files
WHERE dataset_id = #{datasetId} AND file_name = #{fileName}
WHERE dataset_id = #{datasetId}
AND file_name = #{fileName}
AND (status IS NULL OR status <> 'ARCHIVED')
ORDER BY version DESC, upload_time DESC
LIMIT 1
</select>
@@ -91,6 +101,8 @@
UPDATE t_dm_dataset_files
SET file_name = #{fileName},
file_path = #{filePath},
logical_path = #{logicalPath},
version = #{version},
file_type = #{fileType},
file_size = #{fileSize},
upload_time = #{uploadTime},
@@ -126,6 +138,7 @@
<foreach collection="datasetIds" item="datasetId" open="(" separator="," close=")">
#{datasetId}
</foreach>
AND (status IS NULL OR status <> 'ARCHIVED')
AND (metadata IS NULL OR JSON_EXTRACT(metadata, '$.derived_from_file_id') IS NULL)
GROUP BY dataset_id
</select>

View File

@@ -0,0 +1,147 @@
package com.datamate.datamanagement.application;
import com.datamate.common.domain.service.FileService;
import com.datamate.datamanagement.domain.model.dataset.Dataset;
import com.datamate.datamanagement.domain.model.dataset.DatasetFile;
import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetFileRepository;
import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetRepository;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
 * Unit test for the file-versioning behaviour of {@code DatasetFileApplicationService}.
 *
 * <p>Repositories and collaborating services are Mockito mocks; file system effects are
 * checked against a JUnit-managed temporary directory.
 */
@ExtendWith(MockitoExtension.class)
class DatasetFileApplicationServiceVersioningTest {

    @TempDir
    Path tempDir;

    @Mock
    DatasetFileRepository datasetFileRepository;

    @Mock
    DatasetRepository datasetRepository;

    @Mock
    FileService fileService;

    @Mock
    PdfTextExtractAsyncService pdfTextExtractAsyncService;

    @Mock
    DatasetFilePreviewService datasetFilePreviewService;

    /**
     * Copying a file whose logical path already exists in the dataset must move the old
     * content to {@code .datamate/versions/<sha256(logicalPath)>/v1/<oldId>__<name>},
     * mark the old record ARCHIVED (version 1), and save a new ACTIVE record (version 2)
     * that points at the original location.
     */
    @Test
    void copyFilesToDatasetDirWithSourceRoot_shouldArchiveOldFileAndCreateNewVersionWhenDuplicateLogicalPath()
            throws Exception {
        String datasetId = "dataset-1";

        Path datasetRoot = tempDir.resolve("dataset-root");
        Files.createDirectories(datasetRoot);

        Path sourceRoot = tempDir.resolve("source-root");
        Files.createDirectories(sourceRoot);

        // Same relative name on both sides, so the incoming file collides with the existing one.
        Path existingPath = datasetRoot.resolve("a.txt");
        Files.writeString(existingPath, "old-content", StandardCharsets.UTF_8);

        Path incomingPath = sourceRoot.resolve("a.txt");
        Files.writeString(incomingPath, "new-content", StandardCharsets.UTF_8);

        Dataset dataset = new Dataset();
        dataset.setId(datasetId);
        dataset.setPath(datasetRoot.toString());

        // Existing record with no logicalPath/version/status, i.e. none of the versioning fields set.
        DatasetFile oldRecord = DatasetFile.builder()
                .id("old-file-id")
                .datasetId(datasetId)
                .fileName("a.txt")
                .filePath(existingPath.toString())
                .logicalPath(null)
                .version(null)
                .status(null)
                .fileSize(Files.size(existingPath))
                .build();

        when(datasetRepository.getById(datasetId)).thenReturn(dataset);
        when(datasetFileRepository.findAllVisibleByDatasetId(datasetId)).thenReturn(List.of(oldRecord));
        // The logical-path lookup finds nothing, so the service has to match the old record another way.
        when(datasetFileRepository.findLatestByDatasetIdAndLogicalPath(anyString(), anyString())).thenReturn(null);

        DatasetFileApplicationService service = new DatasetFileApplicationService(
                datasetFileRepository,
                datasetRepository,
                fileService,
                pdfTextExtractAsyncService,
                datasetFilePreviewService
        );

        List<DatasetFile> copied = service.copyFilesToDatasetDirWithSourceRoot(
                datasetId,
                sourceRoot,
                List.of(incomingPath.toString())
        );

        assertThat(copied).hasSize(1);
        // The visible location now holds the new content.
        assertThat(Files.readString(existingPath, StandardCharsets.UTF_8)).isEqualTo("new-content");

        String logicalPathHash = sha256Hex("a.txt");
        Path archivedPath = datasetRoot
                .resolve(".datamate")
                .resolve("versions")
                .resolve(logicalPathHash)
                .resolve("v1")
                .resolve("old-file-id__a.txt")
                .toAbsolutePath()
                .normalize();

        assertThat(Files.exists(archivedPath)).isTrue();
        assertThat(Files.readString(archivedPath, StandardCharsets.UTF_8)).isEqualTo("old-content");

        ArgumentCaptor<DatasetFile> archivedCaptor = ArgumentCaptor.forClass(DatasetFile.class);
        verify(datasetFileRepository).updateById(archivedCaptor.capture());
        DatasetFile archivedRecord = archivedCaptor.getValue();
        assertThat(archivedRecord.getId()).isEqualTo("old-file-id");
        assertThat(archivedRecord.getStatus()).isEqualTo("ARCHIVED");
        assertThat(archivedRecord.getLogicalPath()).isEqualTo("a.txt");
        assertThat(archivedRecord.getVersion()).isEqualTo(1L);
        assertThat(Paths.get(archivedRecord.getFilePath()).toAbsolutePath().normalize()).isEqualTo(archivedPath);

        ArgumentCaptor<DatasetFile> createdCaptor = ArgumentCaptor.forClass(DatasetFile.class);
        verify(datasetFileRepository).saveOrUpdate(createdCaptor.capture());
        DatasetFile newRecord = createdCaptor.getValue();
        assertThat(newRecord.getId()).isNotEqualTo("old-file-id");
        assertThat(newRecord.getStatus()).isEqualTo("ACTIVE");
        assertThat(newRecord.getLogicalPath()).isEqualTo("a.txt");
        assertThat(newRecord.getVersion()).isEqualTo(2L);
        assertThat(Paths.get(newRecord.getFilePath()).toAbsolutePath().normalize()).isEqualTo(existingPath.toAbsolutePath().normalize());
    }

    /**
     * Returns the lowercase hex SHA-256 digest of the UTF-8 bytes of {@code value}
     * ({@code null} is hashed as the empty string).
     *
     * @param value text to hash, may be {@code null}
     * @return 64-character lowercase hex digest
     * @throws IllegalStateException if the JVM provides no SHA-256 implementation
     */
    private static String sha256Hex(String value) {
        byte[] input = (value == null ? "" : value).getBytes(StandardCharsets.UTF_8);
        MessageDigest digest;
        try {
            digest = MessageDigest.getInstance("SHA-256");
        } catch (java.security.NoSuchAlgorithmException e) {
            // Fail loudly: a hashCode-based fallback would produce a different directory
            // name and turn an environment problem into a confusing path assertion failure.
            throw new IllegalStateException("SHA-256 is not available on this JVM", e);
        }
        byte[] hashed = digest.digest(input);
        StringBuilder builder = new StringBuilder(hashed.length * 2);
        for (byte b : hashed) {
            builder.append(String.format("%02x", b));
        }
        return builder.toString();
    }
}

View File

@@ -143,7 +143,20 @@ public class ArchiveAnalyzer {
private static Optional<FileUploadResult> extractEntity(ArchiveInputStream<?> archiveInputStream, ArchiveEntry archiveEntry, Path archivePath)
throws IOException {
byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
Path path = Paths.get(archivePath.getParent().toString(), archiveEntry.getName());
Path archiveRoot = archivePath.getParent().toAbsolutePath().normalize();
String entryName = archiveEntry.getName();
if (entryName == null || entryName.isBlank()) {
return Optional.empty();
}
entryName = entryName.replace("\\", "/");
while (entryName.startsWith("/")) {
entryName = entryName.substring(1);
}
Path path = archiveRoot.resolve(entryName).normalize();
if (!path.startsWith(archiveRoot)) {
log.warn("Skip unsafe archive entry path traversal: {}", archiveEntry.getName());
return Optional.empty();
}
File file = path.toFile();
long fileSize = 0L;
FileUtils.createParentDirectories(file);

View File

@@ -13,7 +13,10 @@ public class CommonUtils {
* @return 文件名(带后缀)
*/
public static String trimFilePath(String filePath) {
int lastSlashIndex = filePath.lastIndexOf(File.separator);
if (filePath == null || filePath.isBlank()) {
return "";
}
int lastSlashIndex = Math.max(filePath.lastIndexOf('/'), filePath.lastIndexOf('\\'));
String filename = filePath;
if (lastSlashIndex != -1) {

View File

@@ -61,13 +61,15 @@ class DatasetFiles(Base):
dataset_id = Column(String(36), nullable=False, comment="所属数据集ID(UUID)")
file_name = Column(String(255), nullable=False, comment="文件名")
file_path = Column(String(1000), nullable=False, comment="文件路径")
logical_path = Column(String(1000), nullable=False, comment="文件逻辑路径(相对数据集根目录)")
version = Column(BigInteger, nullable=False, default=1, comment="文件版本号(同 logical_path 递增)")
file_type = Column(String(50), nullable=True, comment="文件格式:JPG/PNG/DCM/TXT等")
file_size = Column(BigInteger, default=0, comment="文件大小(字节)")
check_sum = Column(String(64), nullable=True, comment="文件校验和")
tags = Column(JSON, nullable=True, comment="文件标签信息")
tags_updated_at = Column(TIMESTAMP, nullable=True, comment="标签最后更新时间")
dataset_filemetadata = Column("metadata", JSON, nullable=True, comment="文件元数据")
status = Column(String(50), default='ACTIVE', comment="文件状态:ACTIVE/DELETED/PROCESSING")
status = Column(String(50), default='ACTIVE', comment="文件状态:ACTIVE/ARCHIVED/DELETED/PROCESSING")
upload_time = Column(TIMESTAMP, server_default=func.current_timestamp(), comment="上传时间")
last_access_time = Column(TIMESTAMP, nullable=True, comment="最后访问时间")
created_at = Column(TIMESTAMP, server_default=func.current_timestamp(), comment="创建时间")

View File

@@ -375,9 +375,9 @@ def _register_output_dataset(
insert_file_sql = text(
"""
INSERT INTO t_dm_dataset_files (
id, dataset_id, file_name, file_path, file_type, file_size, status
id, dataset_id, file_name, file_path, logical_path, version, file_type, file_size, status
) VALUES (
:id, :dataset_id, :file_name, :file_path, :file_type, :file_size, :status
:id, :dataset_id, :file_name, :file_path, :logical_path, :version, :file_type, :file_size, :status
)
"""
)
@@ -395,6 +395,7 @@ def _register_output_dataset(
for file_name, file_path, file_size in image_files:
ext = os.path.splitext(file_name)[1].lstrip(".").upper() or None
logical_path = os.path.relpath(file_path, output_dir).replace("\\", "/")
conn.execute(
insert_file_sql,
{
@@ -402,6 +403,8 @@ def _register_output_dataset(
"dataset_id": output_dataset_id,
"file_name": file_name,
"file_path": file_path,
"logical_path": logical_path,
"version": 1,
"file_type": ext,
"file_size": int(file_size),
"status": "ACTIVE",
@@ -411,6 +414,7 @@ def _register_output_dataset(
for file_name, file_path, file_size in annotation_files:
ext = os.path.splitext(file_name)[1].lstrip(".").upper() or None
logical_path = os.path.relpath(file_path, output_dir).replace("\\", "/")
conn.execute(
insert_file_sql,
{
@@ -418,6 +422,8 @@ def _register_output_dataset(
"dataset_id": output_dataset_id,
"file_name": file_name,
"file_path": file_path,
"logical_path": logical_path,
"version": 1,
"file_type": ext,
"file_size": int(file_size),
"status": "ACTIVE",

View File

@@ -1,9 +1,9 @@
{
"query_sql": "SELECT * FROM t_task_instance_info WHERE instance_id IN (:instance_id)",
"insert_sql": "INSERT INTO t_task_instance_info (instance_id, meta_file_name, meta_file_type, meta_file_id, meta_file_size, file_id, file_size, file_type, file_name, file_path, status, operator_id, error_code, incremental, child_id, slice_num) VALUES (:instance_id, :meta_file_name, :meta_file_type, :meta_file_id, :meta_file_size, :file_id, :file_size, :file_type, :file_name, :file_path, :status, :operator_id, :error_code, :incremental, :child_id, :slice_num)",
"insert_dataset_file_sql": "INSERT INTO t_dm_dataset_files (id, dataset_id, file_name, file_path, file_type, file_size, status, upload_time, last_access_time, created_at, updated_at) VALUES (:id, :dataset_id, :file_name, :file_path, :file_type, :file_size, :status, :upload_time, :last_access_time, :created_at, :updated_at)",
"insert_dataset_file_sql": "INSERT INTO t_dm_dataset_files (id, dataset_id, file_name, file_path, logical_path, version, file_type, file_size, status, upload_time, last_access_time, created_at, updated_at) VALUES (:id, :dataset_id, :file_name, :file_path, :logical_path, :version, :file_type, :file_size, :status, :upload_time, :last_access_time, :created_at, :updated_at)",
"insert_clean_result_sql": "INSERT INTO t_clean_result (instance_id, src_file_id, dest_file_id, src_name, dest_name, src_type, dest_type, src_size, dest_size, status, result) VALUES (:instance_id, :src_file_id, :dest_file_id, :src_name, :dest_name, :src_type, :dest_type, :src_size, :dest_size, :status, :result)",
"query_dataset_sql": "SELECT file_size FROM t_dm_dataset_files WHERE dataset_id = :dataset_id",
"query_dataset_sql": "SELECT file_size FROM t_dm_dataset_files WHERE dataset_id = :dataset_id AND (status IS NULL OR status <> 'ARCHIVED')",
"update_dataset_sql": "UPDATE t_dm_datasets SET size_bytes = :total_size, file_count = :file_count WHERE id = :dataset_id;",
"update_task_sql": "UPDATE t_clean_task SET status = :status, after_size = :total_size, finished_at = :finished_time WHERE id = :task_id",
"create_tables_sql": "CREATE TABLE IF NOT EXISTS t_task_instance_info (instance_id VARCHAR(255), meta_file_name TEXT, meta_file_type VARCHAR(100), meta_file_id BIGINT, meta_file_size VARCHAR(100), file_id BIGINT, file_size VARCHAR(100), file_type VARCHAR(100), file_name TEXT, file_path TEXT, status INT, operator_id VARCHAR(255), error_code VARCHAR(100), incremental VARCHAR(50), child_id BIGINT, slice_num INT DEFAULT 0);",

View File

@@ -54,19 +54,22 @@ CREATE TABLE IF NOT EXISTS t_dm_dataset_files (
dataset_id VARCHAR(36) NOT NULL COMMENT '所属数据集ID(UUID)',
file_name VARCHAR(255) NOT NULL COMMENT '文件名',
file_path VARCHAR(1000) NOT NULL COMMENT '文件路径',
logical_path VARCHAR(1000) NOT NULL COMMENT '文件逻辑路径(相对数据集根目录)',
version BIGINT NOT NULL DEFAULT 1 COMMENT '文件版本号(同 logical_path 递增)',
file_type VARCHAR(50) COMMENT '文件格式:JPG/PNG/DCM/TXT等',
file_size BIGINT DEFAULT 0 COMMENT '文件大小(字节)',
check_sum VARCHAR(64) COMMENT '文件校验和',
tags JSON COMMENT '文件标签信息',
tags_updated_at TIMESTAMP NULL COMMENT '标签最后更新时间',
metadata JSON COMMENT '文件元数据',
status VARCHAR(50) DEFAULT 'ACTIVE' COMMENT '文件状态:ACTIVE/DELETED/PROCESSING',
status VARCHAR(50) DEFAULT 'ACTIVE' COMMENT '文件状态:ACTIVE/ARCHIVED/DELETED/PROCESSING',
upload_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '上传时间',
last_access_time TIMESTAMP NULL COMMENT '最后访问时间',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
FOREIGN KEY (dataset_id) REFERENCES t_dm_datasets(id) ON DELETE CASCADE,
INDEX idx_dm_dataset (dataset_id),
INDEX idx_dm_dataset_logical_path (dataset_id, logical_path, version),
INDEX idx_dm_file_type (file_type),
INDEX idx_dm_file_status (status),
INDEX idx_dm_upload_time (upload_time)