From d0972cbc9db2e8f90f5fc14d780f68ca2bf5a7d5 Mon Sep 17 00:00:00 2001
From: Jerry Yan <792602257@qq.com>
Date: Wed, 4 Feb 2026 23:53:35 +0800
Subject: [PATCH] feat(data-management): implement dataset file versioning and
 internal path protection
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- Swap dataset file lookups over to query variants that only return visible files
- Introduce file status management (ACTIVE/ARCHIVED) and an internal directory structure
- Implement a duplicate-file handling policy with a versioning mode instead of overwrite
- Add internal data directory protection to block access to system directories such as .datamate
- Refactor the upload flow around a staging directory with post-transaction cleanup
- Implement file version archiving, retaining historical versions in a dedicated storage location
- Improve file path normalization and security validation logic
- Fix file deletion logic so archived files are never removed by mistake
- Update the dataset zip download to exclude internal system files
---
 .../DatasetApplicationService.java | 4 +-
 .../DatasetFileApplicationService.java | 1192 +++++++++++------
 .../common/enums/DuplicateMethod.java | 3 +-
 .../domain/model/dataset/Dataset.java | 16 +-
 .../domain/model/dataset/DatasetFile.java | 36 +-
 .../dataset/DatasetFileUploadCheckInfo.java | 23 +-
 .../repository/DatasetFileRepository.java | 11 +
 .../impl/DatasetFileRepositoryImpl.java | 49 +-
 .../resources/mappers/DatasetFileMapper.xml | 21 +-
 ...tFileApplicationServiceVersioningTest.java | 147 ++
 .../common/domain/utils/ArchiveAnalyzer.java | 15 +-
 .../common/domain/utils/CommonUtils.java | 5 +-
 .../app/db/models/dataset_management.py | 6 +-
 .../datamate/auto_annotation_worker.py | 86 +-
 .../datamate/sql_manager/sql/sql_config.json | 6 +-
 scripts/db/data-management-init.sql | 5 +-
 16 files changed, 1141 insertions(+), 484 deletions(-)
 create mode 100644 backend/services/data-management-service/src/test/java/com/datamate/datamanagement/application/DatasetFileApplicationServiceVersioningTest.java

diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java
index 756ae58..b1174c3 100644
--- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java
+++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java
@@ -164,7 +164,7 @@ public class DatasetApplicationService {
     public Dataset getDataset(String datasetId) {
         Dataset dataset = datasetRepository.getById(datasetId);
         BusinessAssert.notNull(dataset, DataManagementErrorCode.DATASET_NOT_FOUND);
-        List datasetFiles = datasetFileRepository.findAllByDatasetId(datasetId);
+        List datasetFiles = datasetFileRepository.findAllVisibleByDatasetId(datasetId);
         dataset.setFiles(datasetFiles);
         applyVisibleFileCounts(Collections.singletonList(dataset));
         return dataset;
     }
@@ -439,7 +439,7 @@ public class DatasetApplicationService {
         Map statistics = new HashMap<>();
-        List allFiles = datasetFileRepository.findAllByDatasetId(datasetId);
+        List allFiles = datasetFileRepository.findAllVisibleByDatasetId(datasetId);
         List visibleFiles = filterVisibleFiles(allFiles);
         long totalFiles = visibleFiles.size();
         long completedFiles = visibleFiles.stream()
diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java
index 7920838..869a174 100644
---
a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java @@ -58,10 +58,9 @@ import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.*; -import java.util.concurrent.CompletableFuture; -import java.util.function.Function; -import java.util.stream.Collectors; -import java.util.stream.Stream; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * 数据集文件应用服务 @@ -75,14 +74,19 @@ public class DatasetFileApplicationService { private static final String DOCX_FILE_TYPE = "docx"; private static final String XLS_FILE_TYPE = "xls"; private static final String XLSX_FILE_TYPE = "xlsx"; - private static final Set DOCUMENT_TEXT_FILE_TYPES = Set.of( - PDF_FILE_TYPE, - DOC_FILE_TYPE, - DOCX_FILE_TYPE, - XLS_FILE_TYPE, - XLSX_FILE_TYPE - ); - private static final String DERIVED_METADATA_KEY = "derived_from_file_id"; + private static final Set DOCUMENT_TEXT_FILE_TYPES = Set.of( + PDF_FILE_TYPE, + DOC_FILE_TYPE, + DOCX_FILE_TYPE, + XLS_FILE_TYPE, + XLSX_FILE_TYPE + ); + private static final String DERIVED_METADATA_KEY = "derived_from_file_id"; + private static final String FILE_STATUS_ACTIVE = "ACTIVE"; + private static final String FILE_STATUS_ARCHIVED = "ARCHIVED"; + private static final String INTERNAL_DIR_NAME = ".datamate"; + private static final String INTERNAL_UPLOAD_DIR_NAME = "uploading"; + private static final String INTERNAL_VERSIONS_DIR_NAME = "versions"; private final DatasetFileRepository datasetFileRepository; private final DatasetRepository datasetRepository; @@ -93,8 +97,8 @@ public class DatasetFileApplicationService { @Value("${datamate.data-management.base-path:/dataset}") private String datasetBasePath; - @Value("${datamate.data-management.file.duplicate:COVER}") - private DuplicateMethod duplicateMethod; + @Value("${datamate.data-management.file.duplicate:VERSION}") + private DuplicateMethod duplicateMethod; @Autowired public DatasetFileApplicationService(DatasetFileRepository datasetFileRepository, @@ -155,16 +159,26 @@ public class DatasetFileApplicationService { * 获取数据集文件列表 */ @Transactional(readOnly = true) - public PagedResponse getDatasetFilesWithDirectory(String datasetId, String prefix, boolean excludeDerivedFiles, PagingQuery pagingQuery) { - Dataset dataset = datasetRepository.getById(datasetId); - int page = Math.max(pagingQuery.getPage(), 1); - int size = pagingQuery.getSize() == null || pagingQuery.getSize() < 0 ? 20 : pagingQuery.getSize(); - if (dataset == null) { - return PagedResponse.of(new Page<>(page, size)); - } - String datasetPath = dataset.getPath(); - Path queryPath = Path.of(dataset.getPath() + File.separator + prefix); - Map datasetFilesMap = datasetFileRepository.findAllByDatasetId(datasetId) + public PagedResponse getDatasetFilesWithDirectory(String datasetId, String prefix, boolean excludeDerivedFiles, PagingQuery pagingQuery) { + Dataset dataset = datasetRepository.getById(datasetId); + int page = Math.max(pagingQuery.getPage(), 1); + int size = pagingQuery.getSize() == null || pagingQuery.getSize() < 0 ? 
20 : pagingQuery.getSize(); + if (dataset == null) { + return PagedResponse.of(new Page<>(page, size)); + } + Path datasetRoot = Paths.get(dataset.getPath()).toAbsolutePath().normalize(); + prefix = Optional.ofNullable(prefix).orElse("").trim().replace("\\", "/"); + while (prefix.startsWith("/")) { + prefix = prefix.substring(1); + } + if (prefix.equals(INTERNAL_DIR_NAME) || prefix.startsWith(INTERNAL_DIR_NAME + "/")) { + return new PagedResponse<>(page, size, 0, 0, Collections.emptyList()); + } + Path queryPath = datasetRoot.resolve(prefix.replace("/", File.separator)).normalize(); + if (!queryPath.startsWith(datasetRoot)) { + return new PagedResponse<>(page, size, 0, 0, Collections.emptyList()); + } + Map datasetFilesMap = datasetFileRepository.findAllVisibleByDatasetId(datasetId) .stream() .filter(file -> file.getFilePath() != null) .collect(Collectors.toMap( @@ -180,13 +194,14 @@ public class DatasetFileApplicationService { .filter(Objects::nonNull) .collect(Collectors.toSet()) : Collections.emptySet(); - // 如果目录不存在,直接返回空结果(数据集刚创建时目录可能还未生成) - if (!Files.exists(queryPath)) { - return new PagedResponse<>(page, size, 0, 0, Collections.emptyList()); - } - try (Stream pathStream = Files.list(queryPath)) { + // 如果目录不存在,直接返回空结果(数据集刚创建时目录可能还未生成) + if (!Files.exists(queryPath)) { + return new PagedResponse<>(page, size, 0, 0, Collections.emptyList()); + } + try (Stream pathStream = Files.list(queryPath)) { List allFiles = pathStream - .filter(path -> path.toString().startsWith(datasetPath)) + .filter(path -> path.toAbsolutePath().normalize().startsWith(datasetRoot)) + .filter(path -> !isInternalDatasetPath(datasetRoot, path)) .filter(path -> !excludeDerivedFiles || Files.isDirectory(path) || !derivedFilePaths.contains(normalizeFilePath(path.toString()))) @@ -298,10 +313,90 @@ public class DatasetFileApplicationService { } } + private boolean isSameNormalizedPath(String left, String right) { + String normalizedLeft = normalizeFilePath(left); + String normalizedRight = normalizeFilePath(right); + if (normalizedLeft == null || normalizedRight == null) { + return false; + } + return normalizedLeft.equals(normalizedRight); + } + + private boolean isInternalDatasetPath(Path datasetRoot, Path path) { + if (datasetRoot == null || path == null) { + return false; + } + try { + Path normalizedRoot = datasetRoot.toAbsolutePath().normalize(); + Path normalizedPath = path.toAbsolutePath().normalize(); + if (!normalizedPath.startsWith(normalizedRoot)) { + return false; + } + Path relative = normalizedRoot.relativize(normalizedPath); + if (relative.getNameCount() == 0) { + return false; + } + return INTERNAL_DIR_NAME.equals(relative.getName(0).toString()); + } catch (Exception e) { + return false; + } + } + + private String normalizeLogicalPrefix(String prefix) { + if (prefix == null) { + return ""; + } + String normalized = prefix.trim().replace("\\", "/"); + while (normalized.startsWith("/")) { + normalized = normalized.substring(1); + } + while (normalized.endsWith("/")) { + normalized = normalized.substring(0, normalized.length() - 1); + } + while (normalized.contains("//")) { + normalized = normalized.replace("//", "/"); + } + return normalized; + } + + private String normalizeLogicalPath(String logicalPath) { + return normalizeLogicalPrefix(logicalPath); + } + + private String joinLogicalPath(String prefix, String relativePath) { + String normalizedPrefix = normalizeLogicalPrefix(prefix); + String normalizedRelative = normalizeLogicalPath(relativePath); + if (normalizedPrefix.isEmpty()) { + return 
normalizedRelative; + } + if (normalizedRelative.isEmpty()) { + return normalizedPrefix; + } + return normalizeLogicalPath(normalizedPrefix + "/" + normalizedRelative); + } + + private void assertNotInternalPrefix(String prefix) { + if (prefix == null || prefix.isBlank()) { + return; + } + String normalized = normalizeLogicalPrefix(prefix); + if (normalized.equals(INTERNAL_DIR_NAME) || normalized.startsWith(INTERNAL_DIR_NAME + "/")) { + throw BusinessException.of(CommonErrorCode.PARAM_ERROR); + } + } + + private boolean isArchivedStatus(DatasetFile datasetFile) { + if (datasetFile == null) { + return false; + } + String status = datasetFile.getStatus(); + return status != null && FILE_STATUS_ARCHIVED.equalsIgnoreCase(status); + } + private boolean isSourceDocument(DatasetFile datasetFile) { if (datasetFile == null) { return false; - } + } String fileType = datasetFile.getFileType(); if (fileType == null || fileType.isBlank()) { return false; @@ -309,10 +404,10 @@ public class DatasetFileApplicationService { return DOCUMENT_TEXT_FILE_TYPES.contains(fileType.toLowerCase(Locale.ROOT)); } - private boolean isDerivedFile(DatasetFile datasetFile) { - if (datasetFile == null) { - return false; - } + private boolean isDerivedFile(DatasetFile datasetFile) { + if (datasetFile == null) { + return false; + } String metadata = datasetFile.getMetadata(); if (metadata == null || metadata.isBlank()) { return false; @@ -321,16 +416,154 @@ public class DatasetFileApplicationService { ObjectMapper mapper = new ObjectMapper(); Map metadataMap = mapper.readValue(metadata, new TypeReference>() {}); return metadataMap.get(DERIVED_METADATA_KEY) != null; - } catch (Exception e) { - log.debug("Failed to parse dataset file metadata for derived detection: {}", datasetFile.getId(), e); - return false; - } - } - - /** - * 获取文件详情 - */ - @Transactional(readOnly = true) + } catch (Exception e) { + log.debug("Failed to parse dataset file metadata for derived detection: {}", datasetFile.getId(), e); + return false; + } + } + + private Path resolveDatasetRootPath(Dataset dataset, String datasetId) { + String datasetPath = dataset == null ? null : dataset.getPath(); + if (datasetPath == null || datasetPath.isBlank()) { + datasetPath = datasetBasePath + File.separator + datasetId; + if (dataset != null) { + dataset.setPath(datasetPath); + datasetRepository.updateById(dataset); + } + } + Path datasetRoot = Paths.get(datasetPath).toAbsolutePath().normalize(); + try { + Files.createDirectories(datasetRoot); + } catch (IOException e) { + log.error("Failed to create dataset root dir: {}", datasetRoot, e); + throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR); + } + return datasetRoot; + } + + private Path resolveStagingRootPath(Path datasetRoot, + DatasetFileUploadCheckInfo checkInfo, + List uploadedFiles) { + if (datasetRoot == null) { + return null; + } + String stagingPath = checkInfo == null ? null : checkInfo.getStagingPath(); + if (stagingPath != null && !stagingPath.isBlank()) { + try { + Path stagingRoot = Paths.get(stagingPath).toAbsolutePath().normalize(); + if (!stagingRoot.startsWith(datasetRoot)) { + log.warn("Staging root out of dataset root, datasetId={}, stagingRoot={}, datasetRoot={}", + checkInfo == null ? 
null : checkInfo.getDatasetId(), stagingRoot, datasetRoot); + return null; + } + Path relative = datasetRoot.relativize(stagingRoot); + if (relative.getNameCount() < 3) { + return null; + } + if (!INTERNAL_DIR_NAME.equals(relative.getName(0).toString()) + || !INTERNAL_UPLOAD_DIR_NAME.equals(relative.getName(1).toString())) { + return null; + } + return stagingRoot; + } catch (Exception e) { + log.warn("Invalid staging path: {}", stagingPath, e); + return null; + } + } + if (uploadedFiles == null || uploadedFiles.isEmpty()) { + return null; + } + FileUploadResult firstResult = uploadedFiles.get(0); + File firstFile = firstResult == null ? null : firstResult.getSavedFile(); + if (firstFile == null) { + return null; + } + try { + return Paths.get(firstFile.getParent()).toAbsolutePath().normalize(); + } catch (Exception e) { + return null; + } + } + + private void scheduleCleanupStagingDirAfterCommit(Path stagingRoot) { + if (stagingRoot == null) { + return; + } + Runnable cleanup = () -> deleteDirectoryRecursivelyQuietly(stagingRoot); + if (TransactionSynchronizationManager.isSynchronizationActive()) { + TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { + @Override + public void afterCommit() { + cleanup.run(); + } + }); + return; + } + cleanup.run(); + } + + private void deleteDirectoryRecursivelyQuietly(Path directory) { + if (directory == null) { + return; + } + if (!Files.exists(directory)) { + return; + } + try (Stream paths = Files.walk(directory)) { + paths.sorted(Comparator.reverseOrder()).forEach(path -> { + try { + Files.deleteIfExists(path); + } catch (IOException e) { + log.debug("Failed to delete: {}", path, e); + } + }); + } catch (IOException e) { + log.debug("Failed to cleanup staging dir: {}", directory, e); + } + } + + private String sanitizeArchiveFileName(String fileName) { + String input = fileName == null ? "" : fileName.trim(); + if (input.isBlank()) { + return "file"; + } + StringBuilder builder = new StringBuilder(input.length()); + for (int i = 0; i < input.length(); i++) { + char c = input.charAt(i); + if (c <= 31 || c == 127) { + builder.append('_'); + continue; + } + if (c == '/' || c == '\\' || c == ':' || c == '*' || c == '?' || c == '\"' + || c == '<' || c == '>' || c == '|') { + builder.append('_'); + continue; + } + builder.append(c); + } + String sanitized = builder.toString().trim(); + return sanitized.isEmpty() ? "file" : sanitized; + } + + private String sha256Hex(String value) { + String input = value == null ? 
"" : value; + try { + java.security.MessageDigest digest = java.security.MessageDigest.getInstance("SHA-256"); + byte[] hashed = digest.digest(input.getBytes(java.nio.charset.StandardCharsets.UTF_8)); + StringBuilder builder = new StringBuilder(hashed.length * 2); + for (byte b : hashed) { + builder.append(String.format("%02x", b)); + } + return builder.toString(); + } catch (Exception e) { + return Integer.toHexString(input.hashCode()); + } + } + + /** + * 获取文件详情 + */ + @Transactional(readOnly = true) public DatasetFile getDatasetFile(String datasetId, String fileId) { DatasetFile file = datasetFileRepository.getById(fileId); if (file == null) { @@ -345,14 +578,16 @@ public class DatasetFileApplicationService { /** * 删除文件 */ - @Transactional + @Transactional public void deleteDatasetFile(String datasetId, String fileId) { DatasetFile file = getDatasetFile(datasetId, fileId); Dataset dataset = datasetRepository.getById(datasetId); - dataset.setFiles(new ArrayList<>(Collections.singleton(file))); datasetFileRepository.removeById(fileId); - dataset.removeFile(file); - datasetRepository.updateById(dataset); + if (!isArchivedStatus(file)) { + dataset.setFiles(new ArrayList<>(Collections.singleton(file))); + dataset.removeFile(file); + datasetRepository.updateById(dataset); + } datasetFilePreviewService.deletePreviewFileQuietly(datasetId, fileId); // 删除文件时,上传到数据集中的文件会同时删除数据库中的记录和文件系统中的文件,归集过来的文件仅删除数据库中的记录 if (file.getFilePath().startsWith(dataset.getPath())) { @@ -388,29 +623,37 @@ public class DatasetFileApplicationService { * 下载文件 */ @Transactional(readOnly = true) - public void downloadDatasetFileAsZip(String datasetId, HttpServletResponse response) { - Dataset dataset = datasetRepository.getById(datasetId); - if (Objects.isNull(dataset)) { - throw BusinessException.of(DataManagementErrorCode.DATASET_NOT_FOUND); - } - List allByDatasetId = datasetFileRepository.findAllByDatasetId(datasetId); - Set filePaths = allByDatasetId.stream().map(DatasetFile::getFilePath).collect(Collectors.toSet()); - String datasetPath = dataset.getPath(); - Path downloadPath = Path.of(datasetPath); - response.setContentType("application/zip"); - String zipName = String.format("dataset_%s.zip", - LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss"))); - response.setHeader(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=" + zipName); - try (ZipArchiveOutputStream zos = new ZipArchiveOutputStream(response.getOutputStream())) { - try (Stream pathStream = Files.walk(downloadPath)) { - List allPaths = pathStream.filter(path -> path.toString().startsWith(datasetPath)) - .filter(path -> filePaths.stream().anyMatch(filePath -> filePath.startsWith(path.toString()))) - .toList(); - for (Path path : allPaths) { - addToZipFile(path, downloadPath, zos); - } - } - } catch (IOException e) { + public void downloadDatasetFileAsZip(String datasetId, HttpServletResponse response) { + Dataset dataset = datasetRepository.getById(datasetId); + if (Objects.isNull(dataset)) { + throw BusinessException.of(DataManagementErrorCode.DATASET_NOT_FOUND); + } + Path datasetRoot = Paths.get(dataset.getPath()).toAbsolutePath().normalize(); + Set filePaths = datasetFileRepository.findAllVisibleByDatasetId(datasetId).stream() + .map(DatasetFile::getFilePath) + .filter(Objects::nonNull) + .map(path -> Paths.get(path).toAbsolutePath().normalize()) + .filter(path -> path.startsWith(datasetRoot)) + .filter(path -> !isInternalDatasetPath(datasetRoot, path)) + .collect(Collectors.toSet()); + Path downloadPath = datasetRoot; + 
response.setContentType("application/zip"); + String zipName = String.format("dataset_%s.zip", + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss"))); + response.setHeader(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=" + zipName); + try (ZipArchiveOutputStream zos = new ZipArchiveOutputStream(response.getOutputStream())) { + try (Stream pathStream = Files.walk(downloadPath)) { + List allPaths = pathStream + .map(path -> path.toAbsolutePath().normalize()) + .filter(path -> path.startsWith(datasetRoot)) + .filter(path -> !isInternalDatasetPath(datasetRoot, path)) + .filter(path -> filePaths.stream().anyMatch(filePath -> filePath.startsWith(path))) + .toList(); + for (Path path : allPaths) { + addToZipFile(path, downloadPath, zos); + } + } + } catch (IOException e) { log.error("Failed to download files in batches.", e); throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR); } @@ -454,41 +697,45 @@ public class DatasetFileApplicationService { * @param datasetId 数据集id * @return 请求id */ - @Transactional - public String preUpload(UploadFilesPreRequest chunkUploadRequest, String datasetId) { - Dataset dataset = datasetRepository.getById(datasetId); - if (Objects.isNull(dataset)) { - throw BusinessException.of(DataManagementErrorCode.DATASET_NOT_FOUND); - } - - // 构建上传路径,如果有 prefix 则追加到路径中 - String prefix = Optional.ofNullable(chunkUploadRequest.getPrefix()).orElse("").trim(); - prefix = prefix.replace("\\", "/"); - while (prefix.startsWith("/")) { - prefix = prefix.substring(1); - } - - String uploadPath = dataset.getPath(); - if (uploadPath == null || uploadPath.isBlank()) { - uploadPath = datasetBasePath + File.separator + datasetId; - } - if (!prefix.isEmpty()) { - uploadPath = uploadPath + File.separator + prefix.replace("/", File.separator); - } - - ChunkUploadPreRequest request = ChunkUploadPreRequest.builder().build(); - request.setUploadPath(uploadPath); - request.setTotalFileNum(chunkUploadRequest.getTotalFileNum()); - request.setServiceId(DatasetConstant.SERVICE_ID); - DatasetFileUploadCheckInfo checkInfo = new DatasetFileUploadCheckInfo(); - checkInfo.setDatasetId(datasetId); - checkInfo.setHasArchive(chunkUploadRequest.isHasArchive()); - checkInfo.setPrefix(prefix); - try { - ObjectMapper objectMapper = new ObjectMapper(); - String checkInfoJson = objectMapper.writeValueAsString(checkInfo); - request.setCheckInfo(checkInfoJson); - } catch (JsonProcessingException e) { + @Transactional + public String preUpload(UploadFilesPreRequest chunkUploadRequest, String datasetId) { + Dataset dataset = datasetRepository.getById(datasetId); + if (Objects.isNull(dataset)) { + throw BusinessException.of(DataManagementErrorCode.DATASET_NOT_FOUND); + } + + String prefix = normalizeLogicalPrefix(chunkUploadRequest == null ? 
null : chunkUploadRequest.getPrefix()); + assertNotInternalPrefix(prefix); + + Path datasetRoot = resolveDatasetRootPath(dataset, datasetId); + Path stagingRoot = datasetRoot + .resolve(INTERNAL_DIR_NAME) + .resolve(INTERNAL_UPLOAD_DIR_NAME) + .resolve(UUID.randomUUID().toString()) + .toAbsolutePath() + .normalize(); + BusinessAssert.isTrue(stagingRoot.startsWith(datasetRoot), CommonErrorCode.PARAM_ERROR); + try { + Files.createDirectories(stagingRoot); + } catch (IOException e) { + log.error("Failed to create staging dir: {}", stagingRoot, e); + throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR); + } + + ChunkUploadPreRequest request = ChunkUploadPreRequest.builder().build(); + request.setUploadPath(stagingRoot.toString()); + request.setTotalFileNum(chunkUploadRequest.getTotalFileNum()); + request.setServiceId(DatasetConstant.SERVICE_ID); + DatasetFileUploadCheckInfo checkInfo = new DatasetFileUploadCheckInfo(); + checkInfo.setDatasetId(datasetId); + checkInfo.setHasArchive(chunkUploadRequest.isHasArchive()); + checkInfo.setPrefix(prefix); + checkInfo.setStagingPath(stagingRoot.toString()); + try { + ObjectMapper objectMapper = new ObjectMapper(); + String checkInfoJson = objectMapper.writeValueAsString(checkInfo); + request.setCheckInfo(checkInfoJson); + } catch (JsonProcessingException e) { log.warn("Failed to serialize checkInfo to JSON", e); } return fileService.preUpload(request); @@ -513,55 +760,274 @@ public class DatasetFileApplicationService { fileService.cancelUpload(reqId); } - private void saveFileInfoToDb(FileUploadResult fileUploadResult, String datasetId) { - if (Objects.isNull(fileUploadResult.getSavedFile())) { - // 文件切片上传没有完成 - return; - } - DatasetFileUploadCheckInfo checkInfo; - try { - ObjectMapper objectMapper = new ObjectMapper(); - checkInfo = objectMapper.readValue(fileUploadResult.getCheckInfo(), DatasetFileUploadCheckInfo.class); - if (!Objects.equals(checkInfo.getDatasetId(), datasetId)) { - throw BusinessException.of(DataManagementErrorCode.DATASET_NOT_FOUND); - } - } catch (IllegalArgumentException | JsonProcessingException e) { - log.warn("Failed to convert checkInfo to DatasetFileUploadCheckInfo", e); - throw BusinessException.of(CommonErrorCode.PRE_UPLOAD_REQUEST_NOT_EXIST); - } - List files; - if (checkInfo.isHasArchive() && AnalyzerUtils.isPackage(fileUploadResult.getSavedFile().getPath())) { - files = ArchiveAnalyzer.process(fileUploadResult); - } else { - files = Collections.singletonList(fileUploadResult); - } - addFileToDataset(datasetId, files); - } - - private void addFileToDataset(String datasetId, List unpacked) { - Dataset dataset = datasetRepository.getById(datasetId); - dataset.setFiles(datasetFileRepository.findAllByDatasetId(datasetId)); - for (FileUploadResult file : unpacked) { - File savedFile = file.getSavedFile(); - LocalDateTime currentTime = LocalDateTime.now(); - DatasetFile datasetFile = DatasetFile.builder() - .id(UUID.randomUUID().toString()) - .datasetId(datasetId) - .fileSize(savedFile.length()) - .uploadTime(currentTime) - .lastAccessTime(currentTime) - .fileName(file.getFileName()) - .filePath(savedFile.getPath()) - .fileType(AnalyzerUtils.getExtension(file.getFileName())) - .build(); - setDatasetFileId(datasetFile, dataset); - datasetFileRepository.saveOrUpdate(datasetFile); - dataset.addFile(datasetFile); - triggerPdfTextExtraction(dataset, datasetFile); - } - dataset.active(); - datasetRepository.updateById(dataset); - } + private void saveFileInfoToDb(FileUploadResult fileUploadResult, String datasetId) { + if 
(Objects.isNull(fileUploadResult.getSavedFile())) { + // 文件切片上传没有完成 + return; + } + DatasetFileUploadCheckInfo checkInfo; + try { + ObjectMapper objectMapper = new ObjectMapper(); + checkInfo = objectMapper.readValue(fileUploadResult.getCheckInfo(), DatasetFileUploadCheckInfo.class); + if (!Objects.equals(checkInfo.getDatasetId(), datasetId)) { + throw BusinessException.of(DataManagementErrorCode.DATASET_NOT_FOUND); + } + } catch (IllegalArgumentException | JsonProcessingException e) { + log.warn("Failed to convert checkInfo to DatasetFileUploadCheckInfo", e); + throw BusinessException.of(CommonErrorCode.PRE_UPLOAD_REQUEST_NOT_EXIST); + } + List files; + if (checkInfo.isHasArchive() && AnalyzerUtils.isPackage(fileUploadResult.getSavedFile().getPath())) { + files = ArchiveAnalyzer.process(fileUploadResult); + } else { + files = Collections.singletonList(fileUploadResult); + } + commitUploadedFiles(datasetId, checkInfo, files, fileUploadResult.isAllFilesUploaded()); + } + + private void commitUploadedFiles(String datasetId, + DatasetFileUploadCheckInfo checkInfo, + List uploadedFiles, + boolean cleanupStagingAfterCommit) { + Dataset dataset = datasetRepository.getById(datasetId); + BusinessAssert.notNull(dataset, DataManagementErrorCode.DATASET_NOT_FOUND); + + Path datasetRoot = resolveDatasetRootPath(dataset, datasetId); + String prefix = checkInfo == null ? "" : normalizeLogicalPrefix(checkInfo.getPrefix()); + assertNotInternalPrefix(prefix); + + Path stagingRoot = resolveStagingRootPath(datasetRoot, checkInfo, uploadedFiles); + BusinessAssert.notNull(stagingRoot, CommonErrorCode.PARAM_ERROR); + + dataset.setFiles(new ArrayList<>(datasetFileRepository.findAllVisibleByDatasetId(datasetId))); + for (FileUploadResult fileResult : uploadedFiles) { + commitSingleUploadedFile(dataset, datasetRoot, stagingRoot, prefix, fileResult); + } + + dataset.active(); + datasetRepository.updateById(dataset); + + if (cleanupStagingAfterCommit) { + scheduleCleanupStagingDirAfterCommit(stagingRoot); + } + } + + private void commitSingleUploadedFile(Dataset dataset, + Path datasetRoot, + Path stagingRoot, + String prefix, + FileUploadResult fileResult) { + if (dataset == null || fileResult == null || fileResult.getSavedFile() == null) { + return; + } + Path incomingPath = Paths.get(fileResult.getSavedFile().getPath()).toAbsolutePath().normalize(); + BusinessAssert.isTrue(incomingPath.startsWith(stagingRoot), CommonErrorCode.PARAM_ERROR); + + String relativePath = stagingRoot.relativize(incomingPath).toString().replace(File.separator, "/"); + String logicalPath = joinLogicalPath(prefix, relativePath); + assertNotInternalPrefix(logicalPath); + + commitNewFileVersion(dataset, datasetRoot, logicalPath, incomingPath, true); + } + + private DatasetFile commitNewFileVersion(Dataset dataset, + Path datasetRoot, + String logicalPath, + Path incomingFilePath, + boolean moveIncoming) { + BusinessAssert.notNull(dataset, CommonErrorCode.PARAM_ERROR); + BusinessAssert.isTrue(datasetRoot != null && Files.exists(datasetRoot), CommonErrorCode.PARAM_ERROR); + + String normalizedLogicalPath = normalizeLogicalPath(logicalPath); + BusinessAssert.isTrue(!normalizedLogicalPath.isEmpty(), CommonErrorCode.PARAM_ERROR); + assertNotInternalPrefix(normalizedLogicalPath); + + Path targetFilePath = datasetRoot.resolve(normalizedLogicalPath.replace("/", File.separator)) + .toAbsolutePath() + .normalize(); + BusinessAssert.isTrue(targetFilePath.startsWith(datasetRoot), CommonErrorCode.PARAM_ERROR); + + DuplicateMethod effectiveDuplicateMethod 
= resolveEffectiveDuplicateMethod(); + DatasetFile latest = datasetFileRepository.findLatestByDatasetIdAndLogicalPath(dataset.getId(), normalizedLogicalPath); + if (latest == null && dataset.getFiles() != null) { + latest = dataset.getFiles().stream() + .filter(existing -> isSameNormalizedPath(existing == null ? null : existing.getFilePath(), targetFilePath.toString())) + .findFirst() + .orElse(null); + } + if (latest != null && effectiveDuplicateMethod == DuplicateMethod.ERROR) { + throw BusinessException.of(DataManagementErrorCode.DATASET_FILE_ALREADY_EXISTS); + } + + long nextVersion = 1L; + if (latest != null) { + long latestVersion = Optional.ofNullable(latest.getVersion()).orElse(1L); + if (latest.getVersion() == null) { + latest.setVersion(latestVersion); + } + if (latest.getLogicalPath() == null || latest.getLogicalPath().isBlank()) { + latest.setLogicalPath(normalizedLogicalPath); + } + nextVersion = latestVersion + 1L; + } + + if (latest != null && effectiveDuplicateMethod == DuplicateMethod.VERSION) { + Path archivedPath = archiveDatasetFileVersion(datasetRoot, normalizedLogicalPath, latest); + if (archivedPath != null) { + latest.setFilePath(archivedPath.toString()); + } else if (Files.exists(targetFilePath)) { + log.error("Failed to archive latest file, refuse to overwrite. datasetId={}, fileId={}, logicalPath={}, targetPath={}", + dataset.getId(), latest.getId(), normalizedLogicalPath, targetFilePath); + throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR); + } + latest.setStatus(FILE_STATUS_ARCHIVED); + datasetFileRepository.updateById(latest); + dataset.removeFile(latest); + } else if (latest == null && Files.exists(targetFilePath)) { + archiveOrphanTargetFile(datasetRoot, normalizedLogicalPath, targetFilePath); + } + + try { + Files.createDirectories(targetFilePath.getParent()); + if (moveIncoming) { + Files.move(incomingFilePath, targetFilePath, java.nio.file.StandardCopyOption.REPLACE_EXISTING); + } else { + Files.copy(incomingFilePath, targetFilePath, java.nio.file.StandardCopyOption.REPLACE_EXISTING); + } + } catch (IOException e) { + log.error("Failed to write dataset file, datasetId={}, logicalPath={}, targetPath={}", + dataset.getId(), normalizedLogicalPath, targetFilePath, e); + throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR); + } + + LocalDateTime currentTime = LocalDateTime.now(); + String fileName = targetFilePath.getFileName().toString(); + long fileSize; + try { + fileSize = Files.size(targetFilePath); + } catch (IOException e) { + fileSize = 0L; + } + + DatasetFile datasetFile = DatasetFile.builder() + .id(UUID.randomUUID().toString()) + .datasetId(dataset.getId()) + .fileName(fileName) + .fileType(AnalyzerUtils.getExtension(fileName)) + .fileSize(fileSize) + .filePath(targetFilePath.toString()) + .logicalPath(normalizedLogicalPath) + .version(nextVersion) + .status(FILE_STATUS_ACTIVE) + .uploadTime(currentTime) + .lastAccessTime(currentTime) + .build(); + + datasetFileRepository.saveOrUpdate(datasetFile); + dataset.addFile(datasetFile); + triggerPdfTextExtraction(dataset, datasetFile); + return datasetFile; + } + + private DuplicateMethod resolveEffectiveDuplicateMethod() { + if (duplicateMethod == null) { + return DuplicateMethod.VERSION; + } + if (duplicateMethod == DuplicateMethod.COVER) { + log.warn("duplicateMethod=COVER 会导致标注引用的 fileId 对应内容被覆盖,已强制按 VERSION 处理。"); + return DuplicateMethod.VERSION; + } + return duplicateMethod; + } + + private Path archiveDatasetFileVersion(Path datasetRoot, String logicalPath, DatasetFile latest) 
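// Editor's sketch of the resulting on-disk layout (derived from the method body below;
// the example names are hypothetical): archiving version 3 of logical path "images/cat.png"
// for a row with id "abc" moves the file to
//   <datasetRoot>/.datamate/versions/<sha256("images/cat.png")>/v3/abc__cat.png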
{ + if (latest == null || latest.getId() == null || latest.getId().isBlank()) { + return null; + } + Path currentPath; + try { + currentPath = Paths.get(latest.getFilePath()).toAbsolutePath().normalize(); + } catch (Exception e) { + log.warn("Invalid latest file path, skip archiving. datasetId={}, fileId={}, filePath={}", + latest.getDatasetId(), latest.getId(), latest.getFilePath()); + return null; + } + + if (!Files.exists(currentPath) || !Files.isRegularFile(currentPath)) { + log.warn("Latest file not found on disk, skip archiving. datasetId={}, fileId={}, filePath={}", + latest.getDatasetId(), latest.getId(), currentPath); + return null; + } + if (!currentPath.startsWith(datasetRoot)) { + log.warn("Latest file path out of dataset root, skip archiving. datasetId={}, fileId={}, filePath={}", + latest.getDatasetId(), latest.getId(), currentPath); + return null; + } + + long latestVersion = Optional.ofNullable(latest.getVersion()).orElse(1L); + String logicalPathHash = sha256Hex(logicalPath); + Path archiveDir = datasetRoot + .resolve(INTERNAL_DIR_NAME) + .resolve(INTERNAL_VERSIONS_DIR_NAME) + .resolve(logicalPathHash) + .resolve("v" + latestVersion) + .toAbsolutePath() + .normalize(); + BusinessAssert.isTrue(archiveDir.startsWith(datasetRoot), CommonErrorCode.PARAM_ERROR); + + try { + Files.createDirectories(archiveDir); + } catch (IOException e) { + log.error("Failed to create archive dir: {}", archiveDir, e); + throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR); + } + + String fileName = sanitizeArchiveFileName(Optional.ofNullable(latest.getFileName()).orElse(currentPath.getFileName().toString())); + Path archivedPath = archiveDir.resolve(latest.getId() + "__" + fileName).toAbsolutePath().normalize(); + BusinessAssert.isTrue(archivedPath.startsWith(archiveDir), CommonErrorCode.PARAM_ERROR); + + try { + Files.move(currentPath, archivedPath, java.nio.file.StandardCopyOption.REPLACE_EXISTING); + return archivedPath; + } catch (IOException e) { + log.error("Failed to archive latest file, datasetId={}, fileId={}, from={}, to={}", + latest.getDatasetId(), latest.getId(), currentPath, archivedPath, e); + throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR); + } + } + + private void archiveOrphanTargetFile(Path datasetRoot, String logicalPath, Path targetFilePath) { + if (datasetRoot == null || targetFilePath == null) { + return; + } + if (!Files.exists(targetFilePath) || !Files.isRegularFile(targetFilePath)) { + return; + } + String logicalPathHash = sha256Hex(logicalPath); + Path orphanDir = datasetRoot + .resolve(INTERNAL_DIR_NAME) + .resolve(INTERNAL_VERSIONS_DIR_NAME) + .resolve(logicalPathHash) + .resolve("orphan") + .toAbsolutePath() + .normalize(); + if (!orphanDir.startsWith(datasetRoot)) { + return; + } + try { + Files.createDirectories(orphanDir); + String safeName = sanitizeArchiveFileName(targetFilePath.getFileName().toString()); + Path orphanPath = orphanDir.resolve("orphan_" + System.currentTimeMillis() + "__" + safeName) + .toAbsolutePath() + .normalize(); + if (!orphanPath.startsWith(orphanDir)) { + return; + } + Files.move(targetFilePath, orphanPath, java.nio.file.StandardCopyOption.REPLACE_EXISTING); + } catch (Exception e) { + log.warn("Failed to archive orphan target file, logicalPath={}, targetPath={}", logicalPath, targetFilePath, e); + } + } /** * 在数据集下创建子目录 @@ -573,19 +1039,24 @@ public class DatasetFileApplicationService { throw BusinessException.of(DataManagementErrorCode.DATASET_NOT_FOUND); } String datasetPath = dataset.getPath(); - String 
parentPrefix = Optional.ofNullable(req.getParentPrefix()).orElse("").trim(); - parentPrefix = parentPrefix.replace("\\", "/"); - while (parentPrefix.startsWith("/")) { - parentPrefix = parentPrefix.substring(1); - } - - String directoryName = Optional.ofNullable(req.getDirectoryName()).orElse("").trim(); - if (directoryName.isEmpty()) { - throw BusinessException.of(CommonErrorCode.PARAM_ERROR); - } - if (directoryName.contains("..") || directoryName.contains("/") || directoryName.contains("\\")) { - throw BusinessException.of(CommonErrorCode.PARAM_ERROR); - } + String parentPrefix = Optional.ofNullable(req.getParentPrefix()).orElse("").trim(); + parentPrefix = parentPrefix.replace("\\", "/"); + while (parentPrefix.startsWith("/")) { + parentPrefix = parentPrefix.substring(1); + } + parentPrefix = normalizeLogicalPrefix(parentPrefix); + assertNotInternalPrefix(parentPrefix); + + String directoryName = Optional.ofNullable(req.getDirectoryName()).orElse("").trim(); + if (directoryName.isEmpty()) { + throw BusinessException.of(CommonErrorCode.PARAM_ERROR); + } + if (INTERNAL_DIR_NAME.equals(directoryName)) { + throw BusinessException.of(CommonErrorCode.PARAM_ERROR); + } + if (directoryName.contains("..") || directoryName.contains("/") || directoryName.contains("\\")) { + throw BusinessException.of(CommonErrorCode.PARAM_ERROR); + } Path basePath = Paths.get(datasetPath); Path targetPath = parentPrefix.isEmpty() @@ -621,13 +1092,16 @@ public class DatasetFileApplicationService { while (prefix.startsWith("/")) { prefix = prefix.substring(1); } - while (prefix.endsWith("/")) { - prefix = prefix.substring(0, prefix.length() - 1); - } - - Path basePath = Paths.get(datasetPath); - Path targetPath = prefix.isEmpty() ? basePath : basePath.resolve(prefix); - Path normalized = targetPath.normalize(); + while (prefix.endsWith("/")) { + prefix = prefix.substring(0, prefix.length() - 1); + } + if (prefix.equals(INTERNAL_DIR_NAME) || prefix.startsWith(INTERNAL_DIR_NAME + "/")) { + throw BusinessException.of(CommonErrorCode.PARAM_ERROR); + } + + Path basePath = Paths.get(datasetPath); + Path targetPath = prefix.isEmpty() ? 
basePath : basePath.resolve(prefix); + Path normalized = targetPath.normalize(); if (!normalized.startsWith(basePath)) { throw BusinessException.of(CommonErrorCode.PARAM_ERROR); @@ -657,14 +1131,15 @@ public class DatasetFileApplicationService { /** * 递归压缩目录 */ - private void zipDirectory(Path sourceDir, Path basePath, ZipArchiveOutputStream zipOut) throws IOException { - try (Stream paths = Files.walk(sourceDir)) { - paths.filter(path -> !Files.isDirectory(path)) - .forEach(path -> { - try { - Path relativePath = basePath.relativize(path); - ZipArchiveEntry zipEntry = new ZipArchiveEntry(relativePath.toString()); - zipOut.putArchiveEntry(zipEntry); + private void zipDirectory(Path sourceDir, Path basePath, ZipArchiveOutputStream zipOut) throws IOException { + try (Stream paths = Files.walk(sourceDir)) { + paths.filter(path -> !Files.isDirectory(path)) + .filter(path -> !isInternalDatasetPath(basePath.toAbsolutePath().normalize(), path)) + .forEach(path -> { + try { + Path relativePath = basePath.relativize(path); + ZipArchiveEntry zipEntry = new ZipArchiveEntry(relativePath.toString()); + zipOut.putArchiveEntry(zipEntry); try (InputStream fis = Files.newInputStream(path)) { IOUtils.copy(fis, zipOut); } @@ -695,13 +1170,16 @@ public class DatasetFileApplicationService { prefix = prefix.substring(0, prefix.length() - 1); } - if (prefix.isEmpty()) { - throw BusinessException.of(CommonErrorCode.PARAM_ERROR); - } - - String datasetPath = dataset.getPath(); - Path basePath = Paths.get(datasetPath); - Path targetPath = basePath.resolve(prefix); + if (prefix.isEmpty()) { + throw BusinessException.of(CommonErrorCode.PARAM_ERROR); + } + if (prefix.equals(INTERNAL_DIR_NAME) || prefix.startsWith(INTERNAL_DIR_NAME + "/")) { + throw BusinessException.of(CommonErrorCode.PARAM_ERROR); + } + + String datasetPath = dataset.getPath(); + Path basePath = Paths.get(datasetPath); + Path targetPath = basePath.resolve(prefix); Path normalized = targetPath.normalize(); if (!normalized.startsWith(basePath)) { @@ -769,28 +1247,6 @@ public class DatasetFileApplicationService { } } - /** - * 为数据集文件设置文件id - * - * @param datasetFile 要设置id的文件 - * @param dataset 数据集(包含文件列表) - */ - private void setDatasetFileId(DatasetFile datasetFile, Dataset dataset) { - Map existDatasetFilMap = dataset.getFiles().stream().collect(Collectors.toMap(DatasetFile::getFilePath, Function.identity())); - DatasetFile existDatasetFile = existDatasetFilMap.get(datasetFile.getFilePath()); - if (Objects.isNull(existDatasetFile)) { - return; - } - if (duplicateMethod == DuplicateMethod.ERROR) { - log.error("file {} already exists in dataset {}", datasetFile.getFileName(), datasetFile.getDatasetId()); - throw BusinessException.of(DataManagementErrorCode.DATASET_FILE_ALREADY_EXISTS); - } - if (duplicateMethod == DuplicateMethod.COVER) { - dataset.removeFile(existDatasetFile); - datasetFile.setId(existDatasetFile.getId()); - } - } - /** * 复制文件到数据集目录 * @@ -798,42 +1254,27 @@ public class DatasetFileApplicationService { * @param req 复制文件请求 * @return 复制的文件列表 */ - @Transactional - public List copyFilesToDatasetDir(String datasetId, CopyFilesRequest req) { - Dataset dataset = datasetRepository.getById(datasetId); - BusinessAssert.notNull(dataset, SystemErrorCode.RESOURCE_NOT_FOUND); - List copiedFiles = new ArrayList<>(); - List existDatasetFiles = datasetFileRepository.findAllByDatasetId(datasetId); - dataset.setFiles(existDatasetFiles); - for (String sourceFilePath : req.sourcePaths()) { - Path sourcePath = Paths.get(sourceFilePath); - if 
(!Files.exists(sourcePath) || !Files.isRegularFile(sourcePath)) { - log.warn("Source file does not exist or is not a regular file: {}", sourceFilePath); - continue; - } - String fileName = sourcePath.getFileName().toString(); - File sourceFile = sourcePath.toFile(); - LocalDateTime currentTime = LocalDateTime.now(); - DatasetFile datasetFile = DatasetFile.builder() - .id(UUID.randomUUID().toString()) - .datasetId(datasetId) - .fileName(fileName) - .fileType(AnalyzerUtils.getExtension(fileName)) - .fileSize(sourceFile.length()) - .filePath(Paths.get(dataset.getPath(), fileName).toString()) - .uploadTime(currentTime) - .lastAccessTime(currentTime) - .build(); - setDatasetFileId(datasetFile, dataset); - dataset.addFile(datasetFile); - copiedFiles.add(datasetFile); - } - datasetFileRepository.saveOrUpdateBatch(copiedFiles, 100); - dataset.active(); - datasetRepository.updateById(dataset); - CompletableFuture.runAsync(() -> copyFilesToDatasetDir(req.sourcePaths(), dataset)); - return copiedFiles; - } + @Transactional + public List copyFilesToDatasetDir(String datasetId, CopyFilesRequest req) { + Dataset dataset = datasetRepository.getById(datasetId); + BusinessAssert.notNull(dataset, SystemErrorCode.RESOURCE_NOT_FOUND); + Path datasetRoot = resolveDatasetRootPath(dataset, datasetId); + dataset.setFiles(new ArrayList<>(datasetFileRepository.findAllVisibleByDatasetId(datasetId))); + List copiedFiles = new ArrayList<>(); + for (String sourceFilePath : req.sourcePaths()) { + Path sourcePath = Paths.get(sourceFilePath).toAbsolutePath().normalize(); + if (!Files.exists(sourcePath) || !Files.isRegularFile(sourcePath)) { + log.warn("Source file does not exist or is not a regular file: {}", sourceFilePath); + continue; + } + String logicalPath = sourcePath.getFileName().toString(); + DatasetFile datasetFile = commitNewFileVersion(dataset, datasetRoot, logicalPath, sourcePath, false); + copiedFiles.add(datasetFile); + } + dataset.active(); + datasetRepository.updateById(dataset); + return copiedFiles; + } /** * 复制文件到数据集目录(保留相对路径,适用于数据源导入) @@ -843,109 +1284,37 @@ public class DatasetFileApplicationService { * @param sourcePaths 源文件路径列表 * @return 复制的文件列表 */ - @Transactional - public List copyFilesToDatasetDirWithSourceRoot(String datasetId, Path sourceRoot, List sourcePaths) { - Dataset dataset = datasetRepository.getById(datasetId); - BusinessAssert.notNull(dataset, SystemErrorCode.RESOURCE_NOT_FOUND); - - Path normalizedRoot = sourceRoot.toAbsolutePath().normalize(); - List copiedFiles = new ArrayList<>(); - List existDatasetFiles = datasetFileRepository.findAllByDatasetId(datasetId); - dataset.setFiles(existDatasetFiles); - Map copyTargets = new LinkedHashMap<>(); - - for (String sourceFilePath : sourcePaths) { - if (sourceFilePath == null || sourceFilePath.isBlank()) { - continue; - } - Path sourcePath = Paths.get(sourceFilePath).toAbsolutePath().normalize(); - if (!sourcePath.startsWith(normalizedRoot)) { - log.warn("Source file path is out of root: {}", sourceFilePath); - continue; - } - if (!Files.exists(sourcePath) || !Files.isRegularFile(sourcePath)) { - log.warn("Source file does not exist or is not a regular file: {}", sourceFilePath); - continue; - } - - Path relativePath = normalizedRoot.relativize(sourcePath); - String fileName = sourcePath.getFileName().toString(); - File sourceFile = sourcePath.toFile(); - LocalDateTime currentTime = LocalDateTime.now(); - Path targetPath = Paths.get(dataset.getPath(), relativePath.toString()); - - DatasetFile datasetFile = DatasetFile.builder() - 
.id(UUID.randomUUID().toString()) - .datasetId(datasetId) - .fileName(fileName) - .fileType(AnalyzerUtils.getExtension(fileName)) - .fileSize(sourceFile.length()) - .filePath(targetPath.toString()) - .uploadTime(currentTime) - .lastAccessTime(currentTime) - .build(); - setDatasetFileId(datasetFile, dataset); - dataset.addFile(datasetFile); - copiedFiles.add(datasetFile); - copyTargets.put(sourceFilePath, datasetFile); - } - - if (copiedFiles.isEmpty()) { - return copiedFiles; - } - datasetFileRepository.saveOrUpdateBatch(copiedFiles, 100); - dataset.active(); - datasetRepository.updateById(dataset); - CompletableFuture.runAsync(() -> copyFilesToDatasetDirWithRelativePath(copyTargets, dataset, normalizedRoot)); - return copiedFiles; - } - - private void copyFilesToDatasetDir(List sourcePaths, Dataset dataset) { - for (String sourcePath : sourcePaths) { - Path sourceFilePath = Paths.get(sourcePath); - Path targetFilePath = Paths.get(dataset.getPath(), sourceFilePath.getFileName().toString()); - try { - Files.createDirectories(Path.of(dataset.getPath())); - Files.copy(sourceFilePath, targetFilePath); - DatasetFile datasetFile = datasetFileRepository.findByDatasetIdAndFileName( - dataset.getId(), - sourceFilePath.getFileName().toString() - ); - triggerPdfTextExtraction(dataset, datasetFile); - } catch (IOException e) { - log.error("Failed to copy file from {} to {}", sourcePath, targetFilePath, e); - } - } - } - - private void copyFilesToDatasetDirWithRelativePath( - Map copyTargets, - Dataset dataset, - Path sourceRoot - ) { - Path datasetRoot = Paths.get(dataset.getPath()).toAbsolutePath().normalize(); - Path normalizedRoot = sourceRoot.toAbsolutePath().normalize(); - for (Map.Entry entry : copyTargets.entrySet()) { - Path sourcePath = Paths.get(entry.getKey()).toAbsolutePath().normalize(); - if (!sourcePath.startsWith(normalizedRoot)) { - log.warn("Source file path is out of root: {}", sourcePath); - continue; - } - Path relativePath = normalizedRoot.relativize(sourcePath); - Path targetFilePath = datasetRoot.resolve(relativePath).normalize(); - if (!targetFilePath.startsWith(datasetRoot)) { - log.warn("Target file path is out of dataset path: {}", targetFilePath); - continue; - } - try { - Files.createDirectories(targetFilePath.getParent()); - Files.copy(sourcePath, targetFilePath); - triggerPdfTextExtraction(dataset, entry.getValue()); - } catch (IOException e) { - log.error("Failed to copy file from {} to {}", sourcePath, targetFilePath, e); - } - } - } + @Transactional + public List copyFilesToDatasetDirWithSourceRoot(String datasetId, Path sourceRoot, List sourcePaths) { + Dataset dataset = datasetRepository.getById(datasetId); + BusinessAssert.notNull(dataset, SystemErrorCode.RESOURCE_NOT_FOUND); + Path datasetRoot = resolveDatasetRootPath(dataset, datasetId); + Path normalizedRoot = sourceRoot.toAbsolutePath().normalize(); + dataset.setFiles(new ArrayList<>(datasetFileRepository.findAllVisibleByDatasetId(datasetId))); + + List copiedFiles = new ArrayList<>(); + for (String sourceFilePath : sourcePaths) { + if (sourceFilePath == null || sourceFilePath.isBlank()) { + continue; + } + Path sourcePath = Paths.get(sourceFilePath).toAbsolutePath().normalize(); + if (!sourcePath.startsWith(normalizedRoot)) { + log.warn("Source file path is out of root: {}", sourceFilePath); + continue; + } + if (!Files.exists(sourcePath) || !Files.isRegularFile(sourcePath)) { + log.warn("Source file does not exist or is not a regular file: {}", sourceFilePath); + continue; + } + Path relativePath = 
normalizedRoot.relativize(sourcePath); + String logicalPath = relativePath.toString().replace("\\", "/"); + DatasetFile datasetFile = commitNewFileVersion(dataset, datasetRoot, logicalPath, sourcePath, false); + copiedFiles.add(datasetFile); + } + dataset.active(); + datasetRepository.updateById(dataset); + return copiedFiles; + } /** * 添加文件到数据集(仅创建数据库记录,不执行文件系统操作) @@ -954,52 +1323,89 @@ public class DatasetFileApplicationService { * @param req 添加文件请求 * @return 添加的文件列表 */ - @Transactional - public List addFilesToDataset(String datasetId, AddFilesRequest req) { - Dataset dataset = datasetRepository.getById(datasetId); - BusinessAssert.notNull(dataset, SystemErrorCode.RESOURCE_NOT_FOUND); - List addedFiles = new ArrayList<>(); - List existDatasetFiles = datasetFileRepository.findAllByDatasetId(datasetId); - dataset.setFiles(existDatasetFiles); - - boolean softAdd = req.softAdd(); - String metadata; - try { - Map metadataMap = Map.of("softAdd", softAdd); + @Transactional + public List addFilesToDataset(String datasetId, AddFilesRequest req) { + Dataset dataset = datasetRepository.getById(datasetId); + BusinessAssert.notNull(dataset, SystemErrorCode.RESOURCE_NOT_FOUND); + List addedFiles = new ArrayList<>(); + dataset.setFiles(new ArrayList<>(datasetFileRepository.findAllVisibleByDatasetId(datasetId))); + + boolean softAdd = req.softAdd(); + String metadata; + try { + Map metadataMap = Map.of("softAdd", softAdd); ObjectMapper objectMapper = new ObjectMapper(); metadata = objectMapper.writeValueAsString(metadataMap); } catch (JsonProcessingException e) { log.error("Failed to serialize metadataMap", e); throw BusinessException.of(SystemErrorCode.UNKNOWN_ERROR); - } - - for (String sourceFilePath : req.sourcePaths()) { - Path sourcePath = Paths.get(sourceFilePath); - String fileName = sourcePath.getFileName().toString(); - File sourceFile = sourcePath.toFile(); - LocalDateTime currentTime = LocalDateTime.now(); - - DatasetFile datasetFile = DatasetFile.builder() - .id(UUID.randomUUID().toString()) - .datasetId(datasetId) - .fileName(fileName) - .fileType(AnalyzerUtils.getExtension(fileName)) - .fileSize(sourceFile.length()) - .filePath(sourceFilePath) - .uploadTime(currentTime) - .lastAccessTime(currentTime) - .metadata(metadata) - .build(); - setDatasetFileId(datasetFile, dataset); - dataset.addFile(datasetFile); - addedFiles.add(datasetFile); - triggerPdfTextExtraction(dataset, datasetFile); - } - datasetFileRepository.saveOrUpdateBatch(addedFiles, 100); - dataset.active(); - datasetRepository.updateById(dataset); - // Note: addFilesToDataset only creates DB records, no file system operations - // If file copy is needed, use copyFilesToDatasetDir endpoint instead + } + + for (String sourceFilePath : req.sourcePaths()) { + Path sourcePath = Paths.get(sourceFilePath); + String fileName = sourcePath.getFileName().toString(); + File sourceFile = sourcePath.toFile(); + String logicalPath = normalizeLogicalPath(fileName); + assertNotInternalPrefix(logicalPath); + + DatasetFile latest = datasetFileRepository.findLatestByDatasetIdAndLogicalPath(datasetId, logicalPath); + if (latest == null && dataset.getFiles() != null) { + latest = dataset.getFiles().stream() + .filter(existing -> existing != null + && !isArchivedStatus(existing) + && Objects.equals(existing.getFileName(), fileName)) + .findFirst() + .orElse(null); + } + + DuplicateMethod effectiveDuplicateMethod = resolveEffectiveDuplicateMethod(); + if (latest != null && effectiveDuplicateMethod == DuplicateMethod.ERROR) { + throw 
BusinessException.of(DataManagementErrorCode.DATASET_FILE_ALREADY_EXISTS); + } + + long nextVersion = 1L; + if (latest != null) { + long latestVersion = Optional.ofNullable(latest.getVersion()).orElse(1L); + if (latest.getVersion() == null) { + latest.setVersion(latestVersion); + } + if (latest.getLogicalPath() == null || latest.getLogicalPath().isBlank()) { + latest.setLogicalPath(logicalPath); + } + nextVersion = latestVersion + 1L; + } + + if (latest != null && effectiveDuplicateMethod == DuplicateMethod.VERSION) { + latest.setStatus(FILE_STATUS_ARCHIVED); + datasetFileRepository.updateById(latest); + dataset.removeFile(latest); + } + + LocalDateTime currentTime = LocalDateTime.now(); + DatasetFile datasetFile = DatasetFile.builder() + .id(UUID.randomUUID().toString()) + .datasetId(datasetId) + .fileName(fileName) + .fileType(AnalyzerUtils.getExtension(fileName)) + .fileSize(sourceFile.length()) + .filePath(sourceFilePath) + .logicalPath(logicalPath) + .version(nextVersion) + .status(FILE_STATUS_ACTIVE) + .uploadTime(currentTime) + .lastAccessTime(currentTime) + .metadata(metadata) + .build(); + + datasetFileRepository.saveOrUpdate(datasetFile); + dataset.addFile(datasetFile); + addedFiles.add(datasetFile); + triggerPdfTextExtraction(dataset, datasetFile); + } + dataset.active(); + datasetRepository.updateById(dataset); + // Note: addFilesToDataset only creates DB records, no file system operations + // If file copy is needed, use copyFilesToDatasetDir endpoint instead return addedFiles; } diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/common/enums/DuplicateMethod.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/common/enums/DuplicateMethod.java index 56c886e..e6a04b4 100644 --- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/common/enums/DuplicateMethod.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/common/enums/DuplicateMethod.java @@ -7,5 +7,6 @@ package com.datamate.datamanagement.common.enums; */ public enum DuplicateMethod { ERROR, - COVER + COVER, + VERSION } diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/domain/model/dataset/Dataset.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/domain/model/dataset/Dataset.java index cf5375f..510ca2a 100644 --- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/domain/model/dataset/Dataset.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/domain/model/dataset/Dataset.java @@ -152,11 +152,19 @@ public class Dataset extends BaseEntity { } public void removeFile(DatasetFile file) { - if (this.files.remove(file)) { - this.fileCount = Math.max(0, this.fileCount - 1); - this.sizeBytes = Math.max(0, this.sizeBytes - (file.getFileSize() != null ? file.getFileSize() : 0L)); - this.updatedAt = LocalDateTime.now(); + if (file == null) { + return; } + boolean removed = this.files.remove(file); + if (!removed && file.getId() != null) { + removed = this.files.removeIf(existing -> Objects.equals(existing.getId(), file.getId())); + } + if (!removed) { + return; + } + this.fileCount = Math.max(0, this.fileCount - 1); + this.sizeBytes = Math.max(0, this.sizeBytes - (file.getFileSize() != null ? 
file.getFileSize() : 0L)); + this.updatedAt = LocalDateTime.now(); } public void active() { diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/domain/model/dataset/DatasetFile.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/domain/model/dataset/DatasetFile.java index 45a58a1..31e1e67 100644 --- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/domain/model/dataset/DatasetFile.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/domain/model/dataset/DatasetFile.java @@ -22,22 +22,26 @@ import java.util.List; @NoArgsConstructor @AllArgsConstructor @TableName("t_dm_dataset_files") -public class DatasetFile { - @TableId - private String id; // UUID - private String datasetId; // UUID - private String fileName; - private String filePath; - private String fileType; // JPG/PNG/DCM/TXT - private Long fileSize; // bytes - private String checkSum; - private String tags; - private String metadata; - private String status; // UPLOADED, PROCESSING, COMPLETED, ERROR - private LocalDateTime uploadTime; - private LocalDateTime lastAccessTime; - private LocalDateTime createdAt; - private LocalDateTime updatedAt; +public class DatasetFile { + @TableId + private String id; // UUID + private String datasetId; // UUID + private String fileName; + private String filePath; + /** 文件逻辑路径(相对数据集根目录,包含子目录) */ + private String logicalPath; + /** 文件版本号(同一个 logicalPath 下递增) */ + private Long version; + private String fileType; // JPG/PNG/DCM/TXT + private Long fileSize; // bytes + private String checkSum; + private String tags; + private String metadata; + private String status; // ACTIVE/ARCHIVED/DELETED/PROCESSING... 
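// Editor's note (behavior inferred from DatasetFileApplicationService, not stated on the
// entity itself): at most one row per (datasetId, logicalPath) is ACTIVE or NULL at a time;
// superseded rows are flipped to ARCHIVED but keep their id, so existing fileId references
// remain resolvable.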
diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/domain/model/dataset/DatasetFile.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/domain/model/dataset/DatasetFile.java
index 45a58a1..31e1e67 100644
--- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/domain/model/dataset/DatasetFile.java
+++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/domain/model/dataset/DatasetFile.java
@@ -22,22 +22,26 @@ import java.util.List;
 @NoArgsConstructor
 @AllArgsConstructor
 @TableName("t_dm_dataset_files")
-public class DatasetFile {
-    @TableId
-    private String id; // UUID
-    private String datasetId; // UUID
-    private String fileName;
-    private String filePath;
-    private String fileType; // JPG/PNG/DCM/TXT
-    private Long fileSize; // bytes
-    private String checkSum;
-    private String tags;
-    private String metadata;
-    private String status; // UPLOADED, PROCESSING, COMPLETED, ERROR
-    private LocalDateTime uploadTime;
-    private LocalDateTime lastAccessTime;
-    private LocalDateTime createdAt;
-    private LocalDateTime updatedAt;
+public class DatasetFile {
+    @TableId
+    private String id; // UUID
+    private String datasetId; // UUID
+    private String fileName;
+    private String filePath;
+    /** Logical file path (relative to the dataset root, including subdirectories) */
+    private String logicalPath;
+    /** File version number (increments per logicalPath) */
+    private Long version;
+    private String fileType; // JPG/PNG/DCM/TXT
+    private Long fileSize; // bytes
+    private String checkSum;
+    private String tags;
+    private String metadata;
+    private String status; // ACTIVE/ARCHIVED/DELETED/PROCESSING...
+    private LocalDateTime uploadTime;
+    private LocalDateTime lastAccessTime;
+    private LocalDateTime createdAt;
+    private LocalDateTime updatedAt;
 
     /** Marks whether this entry is a directory (non-persistent field) */
     @TableField(exist = false)
diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/domain/model/dataset/DatasetFileUploadCheckInfo.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/domain/model/dataset/DatasetFileUploadCheckInfo.java
index edd5294..53ab49c 100644
--- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/domain/model/dataset/DatasetFileUploadCheckInfo.java
+++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/domain/model/dataset/DatasetFileUploadCheckInfo.java
@@ -12,13 +12,16 @@ import lombok.Setter;
 @Setter
 @NoArgsConstructor
 @AllArgsConstructor
-public class DatasetFileUploadCheckInfo {
-    /** Dataset id */
-    private String datasetId;
-
-    /** Whether the upload is an archive */
-    private boolean hasArchive;
-
-    /** Target subdirectory prefix, e.g. "images/"; empty means the dataset root */
-    private String prefix;
-}
+public class DatasetFileUploadCheckInfo {
+    /** Dataset id */
+    private String datasetId;
+
+    /** Whether the upload is an archive */
+    private boolean hasArchive;
+
+    /** Target subdirectory prefix, e.g. "images/"; empty means the dataset root */
+    private String prefix;
+
+    /** Temporary staging directory for uploads (server-side only, never exposed) */
+    private String stagingPath;
+}
diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/infrastructure/persistence/repository/DatasetFileRepository.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/infrastructure/persistence/repository/DatasetFileRepository.java
index 3313a64..5d50fd6 100644
--- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/infrastructure/persistence/repository/DatasetFileRepository.java
+++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/infrastructure/persistence/repository/DatasetFileRepository.java
@@ -24,8 +24,19 @@ public interface DatasetFileRepository extends IRepository<DatasetFile> {
 
     List<DatasetFile> findAllByDatasetId(String datasetId);
 
+    /**
+     * Query the dataset's "visible" files (archived historical versions are excluded by default).
+     * Convention: a NULL status counts as visible; status = ARCHIVED marks a historical version.
+     */
+    List<DatasetFile> findAllVisibleByDatasetId(String datasetId);
+
     DatasetFile findByDatasetIdAndFileName(String datasetId, String fileName);
 
+    /**
+     * Query the latest version (status ACTIVE or NULL) for the given logical path.
+     */
+    DatasetFile findLatestByDatasetIdAndLogicalPath(String datasetId, String logicalPath);
+
     IPage<DatasetFile> findByCriteria(String datasetId, String fileType, String status, String name,
                                       Boolean hasAnnotation, IPage<DatasetFile> page);
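The visibility convention documented on the interface boils down to one predicate. A sketch (illustration only, not part of this patch):

    // Sketch: a NULL status is visible; only ARCHIVED rows are hidden from default listings.
    public final class VisibilitySketch {
        static boolean isVisible(String status) {
            return status == null || !"ARCHIVED".equals(status);
        }

        public static void main(String[] args) {
            for (String status : new String[]{null, "ACTIVE", "PROCESSING", "ARCHIVED"}) {
                System.out.println(status + " -> " + isVisible(status));
            }
        }
    }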
diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/infrastructure/persistence/repository/impl/DatasetFileRepositoryImpl.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/infrastructure/persistence/repository/impl/DatasetFileRepositoryImpl.java
index d29f811..db68750 100644
--- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/infrastructure/persistence/repository/impl/DatasetFileRepositoryImpl.java
+++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/infrastructure/persistence/repository/impl/DatasetFileRepositoryImpl.java
@@ -25,6 +25,8 @@ public class DatasetFileRepositoryImpl extends CrudRepository<DatasetFileMapper, DatasetFile>
+    private static final String FILE_STATUS_ACTIVE = "ACTIVE";
+    private static final String FILE_STATUS_ARCHIVED = "ARCHIVED";
+
+    @Override
+    public List<DatasetFile> findAllVisibleByDatasetId(String datasetId) {
+        return datasetFileMapper.selectList(new LambdaQueryWrapper<DatasetFile>()
+                .eq(DatasetFile::getDatasetId, datasetId)
+                .and(wrapper -> wrapper.isNull(DatasetFile::getStatus)
+                        .or()
+                        .ne(DatasetFile::getStatus, FILE_STATUS_ARCHIVED))
+                .orderByDesc(DatasetFile::getUploadTime));
+    }
+
     @Override
     public DatasetFile findByDatasetIdAndFileName(String datasetId, String fileName) {
         return datasetFileMapper.findByDatasetIdAndFileName(datasetId, fileName);
     }
 
+    @Override
+    public DatasetFile findLatestByDatasetIdAndLogicalPath(String datasetId, String logicalPath) {
+        if (!StringUtils.hasText(datasetId) || !StringUtils.hasText(logicalPath)) {
+            return null;
+        }
+        return datasetFileMapper.selectOne(new LambdaQueryWrapper<DatasetFile>()
+                .eq(DatasetFile::getDatasetId, datasetId)
+                .eq(DatasetFile::getLogicalPath, logicalPath)
+                .and(wrapper -> wrapper.isNull(DatasetFile::getStatus)
+                        .or()
+                        .eq(DatasetFile::getStatus, FILE_STATUS_ACTIVE))
+                .orderByDesc(DatasetFile::getVersion)
+                .orderByDesc(DatasetFile::getUploadTime)
+                .last("LIMIT 1"));
+    }
+
     public IPage<DatasetFile> findByCriteria(String datasetId, String fileType, String status, String name,
                                              Boolean hasAnnotation, IPage<DatasetFile> page) {
-        return datasetFileMapper.selectPage(page, new LambdaQueryWrapper<DatasetFile>()
-                .eq(DatasetFile::getDatasetId, datasetId)
-                .eq(StringUtils.hasText(fileType), DatasetFile::getFileType, fileType)
-                .eq(StringUtils.hasText(status), DatasetFile::getStatus, status)
-                .like(StringUtils.hasText(name), DatasetFile::getFileName, name)
-                .exists(Boolean.TRUE.equals(hasAnnotation), ANNOTATION_EXISTS_SQL));
+        LambdaQueryWrapper<DatasetFile> wrapper = new LambdaQueryWrapper<DatasetFile>()
+                .eq(DatasetFile::getDatasetId, datasetId)
+                .eq(StringUtils.hasText(fileType), DatasetFile::getFileType, fileType)
+                .like(StringUtils.hasText(name), DatasetFile::getFileName, name)
+                .exists(Boolean.TRUE.equals(hasAnnotation), ANNOTATION_EXISTS_SQL);
+
+        if (StringUtils.hasText(status)) {
+            wrapper.eq(DatasetFile::getStatus, status);
+        } else {
+            wrapper.and(visibility -> visibility.isNull(DatasetFile::getStatus)
+                    .or()
+                    .ne(DatasetFile::getStatus, FILE_STATUS_ARCHIVED));
+        }
+
+        return datasetFileMapper.selectPage(page, wrapper);
     }
 
     @Override
diff --git a/backend/services/data-management-service/src/main/resources/mappers/DatasetFileMapper.xml b/backend/services/data-management-service/src/main/resources/mappers/DatasetFileMapper.xml
index 872f61a..407f8ff 100644
--- a/backend/services/data-management-service/src/main/resources/mappers/DatasetFileMapper.xml
+++ b/backend/services/data-management-service/src/main/resources/mappers/DatasetFileMapper.xml
@@ -3,7 +3,7 @@
         "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
-        id, dataset_id, file_name, file_path, file_type, file_size, check_sum, tags, metadata, status,
+        id, dataset_id, file_name, file_path, logical_path, version, file_type, file_size, check_sum, tags, metadata, status,
         upload_time, last_access_time, created_at, updated_at
@@ -39,13 +39,17 @@
@@ -54,13 +58,19 @@
@@ -91,6 +101,8 @@
         UPDATE t_dm_dataset_files SET
         file_name = #{fileName},
         file_path = #{filePath},
+        logical_path = #{logicalPath},
+        version = #{version},
         file_type = #{fileType},
         file_size = #{fileSize},
         upload_time = #{uploadTime},
@@ -126,6 +138,7 @@
         #{datasetId}
+        AND (status IS NULL OR status &lt;&gt; 'ARCHIVED')
         AND (metadata IS NULL OR JSON_EXTRACT(metadata, '$.derived_from_file_id') IS NULL)
         GROUP BY dataset_id
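A sketch of the default-vs-explicit status filtering in findByCriteria above, expressed as a plain predicate instead of a LambdaQueryWrapper (illustration only, not part of this patch):

    import java.util.function.Predicate;

    public final class StatusFilterSketch {
        static Predicate<String> statusFilter(String requestedStatus) {
            if (requestedStatus != null && !requestedStatus.isBlank()) {
                return requestedStatus::equals; // an explicit status wins
            }
            // Default: hide archived history only; NULL and every other status stay visible.
            return status -> status == null || !"ARCHIVED".equals(status);
        }

        public static void main(String[] args) {
            System.out.println(statusFilter(null).test("ARCHIVED"));       // false
            System.out.println(statusFilter(null).test(null));             // true
            System.out.println(statusFilter("ARCHIVED").test("ARCHIVED")); // true: history stays queryable
        }
    }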
diff --git a/backend/services/data-management-service/src/test/java/com/datamate/datamanagement/application/DatasetFileApplicationServiceVersioningTest.java b/backend/services/data-management-service/src/test/java/com/datamate/datamanagement/application/DatasetFileApplicationServiceVersioningTest.java
new file mode 100644
index 0000000..ea9287c
--- /dev/null
+++ b/backend/services/data-management-service/src/test/java/com/datamate/datamanagement/application/DatasetFileApplicationServiceVersioningTest.java
@@ -0,0 +1,147 @@
+package com.datamate.datamanagement.application;
+
+import com.datamate.common.domain.service.FileService;
+import com.datamate.datamanagement.domain.model.dataset.Dataset;
+import com.datamate.datamanagement.domain.model.dataset.DatasetFile;
+import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetFileRepository;
+import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetRepository;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.MessageDigest;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class DatasetFileApplicationServiceVersioningTest {
+
+    @TempDir
+    Path tempDir;
+
+    @Mock
+    DatasetFileRepository datasetFileRepository;
+
+    @Mock
+    DatasetRepository datasetRepository;
+
+    @Mock
+    FileService fileService;
+
+    @Mock
+    PdfTextExtractAsyncService pdfTextExtractAsyncService;
+
+    @Mock
+    DatasetFilePreviewService datasetFilePreviewService;
+
+    @Test
+    void copyFilesToDatasetDirWithSourceRoot_shouldArchiveOldFileAndCreateNewVersionWhenDuplicateLogicalPath()
+            throws Exception {
+        String datasetId = "dataset-1";
+
+        Path datasetRoot = tempDir.resolve("dataset-root");
+        Files.createDirectories(datasetRoot);
+
+        Path sourceRoot = tempDir.resolve("source-root");
+        Files.createDirectories(sourceRoot);
+
+        Path existingPath = datasetRoot.resolve("a.txt");
+        Files.writeString(existingPath, "old-content", StandardCharsets.UTF_8);
+
+        Path incomingPath = sourceRoot.resolve("a.txt");
+        Files.writeString(incomingPath, "new-content", StandardCharsets.UTF_8);
+
+        Dataset dataset = new Dataset();
+        dataset.setId(datasetId);
+        dataset.setPath(datasetRoot.toString());
+
+        DatasetFile oldRecord = DatasetFile.builder()
+                .id("old-file-id")
+                .datasetId(datasetId)
+                .fileName("a.txt")
+                .filePath(existingPath.toString())
+                .logicalPath(null)
+                .version(null)
+                .status(null)
+                .fileSize(Files.size(existingPath))
+                .build();
+
+        when(datasetRepository.getById(datasetId)).thenReturn(dataset);
+        when(datasetFileRepository.findAllVisibleByDatasetId(datasetId)).thenReturn(List.of(oldRecord));
+        when(datasetFileRepository.findLatestByDatasetIdAndLogicalPath(anyString(), anyString())).thenReturn(null);
+
+        DatasetFileApplicationService service = new DatasetFileApplicationService(
+                datasetFileRepository,
+                datasetRepository,
+                fileService,
+                pdfTextExtractAsyncService,
+                datasetFilePreviewService
+        );
+
+        List<DatasetFile> copied = service.copyFilesToDatasetDirWithSourceRoot(
+                datasetId,
+                sourceRoot,
+                List.of(incomingPath.toString())
+        );
+
+        assertThat(copied).hasSize(1);
+        assertThat(Files.readString(existingPath, StandardCharsets.UTF_8)).isEqualTo("new-content");
+
+        String logicalPathHash = sha256Hex("a.txt");
+        Path archivedPath = datasetRoot
+                .resolve(".datamate")
+                .resolve("versions")
+                .resolve(logicalPathHash)
+                .resolve("v1")
+                .resolve("old-file-id__a.txt")
+                .toAbsolutePath()
+                .normalize();
+
+        assertThat(Files.exists(archivedPath)).isTrue();
+        assertThat(Files.readString(archivedPath, StandardCharsets.UTF_8)).isEqualTo("old-content");
+
+        ArgumentCaptor<DatasetFile> archivedCaptor = ArgumentCaptor.forClass(DatasetFile.class);
+        verify(datasetFileRepository).updateById(archivedCaptor.capture());
+        DatasetFile archivedRecord = archivedCaptor.getValue();
+        assertThat(archivedRecord.getId()).isEqualTo("old-file-id");
+        assertThat(archivedRecord.getStatus()).isEqualTo("ARCHIVED");
+        assertThat(archivedRecord.getLogicalPath()).isEqualTo("a.txt");
+        assertThat(archivedRecord.getVersion()).isEqualTo(1L);
+        assertThat(Paths.get(archivedRecord.getFilePath()).toAbsolutePath().normalize()).isEqualTo(archivedPath);
+
+        ArgumentCaptor<DatasetFile> createdCaptor = ArgumentCaptor.forClass(DatasetFile.class);
+        verify(datasetFileRepository).saveOrUpdate(createdCaptor.capture());
+        DatasetFile newRecord = createdCaptor.getValue();
+        assertThat(newRecord.getId()).isNotEqualTo("old-file-id");
+        assertThat(newRecord.getStatus()).isEqualTo("ACTIVE");
+        assertThat(newRecord.getLogicalPath()).isEqualTo("a.txt");
+        assertThat(newRecord.getVersion()).isEqualTo(2L);
+        assertThat(Paths.get(newRecord.getFilePath()).toAbsolutePath().normalize()).isEqualTo(existingPath.toAbsolutePath().normalize());
+    }
+
+    private static String sha256Hex(String value) {
+        try {
+            MessageDigest digest = MessageDigest.getInstance("SHA-256");
+            byte[] hashed = digest.digest((value == null ? "" : value).getBytes(StandardCharsets.UTF_8));
+            StringBuilder builder = new StringBuilder(hashed.length * 2);
+            for (byte b : hashed) {
+                builder.append(String.format("%02x", b));
+            }
+            return builder.toString();
+        } catch (Exception e) {
+            return Integer.toHexString((value == null ? "" : value).hashCode());
+        }
+    }
+}
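The test pins down the on-disk archive layout. A sketch of the same path construction (inferred from the assertions above, not from a public spec; illustration only):

    import java.nio.charset.StandardCharsets;
    import java.nio.file.Path;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;

    public final class ArchivePathSketch {
        static Path archivedPath(Path datasetRoot, String logicalPath, long version,
                                 String fileId, String fileName) throws NoSuchAlgorithmException {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hashed = digest.digest(logicalPath.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(hashed.length * 2);
            for (byte b : hashed) {
                hex.append(String.format("%02x", b));
            }
            return datasetRoot
                    .resolve(".datamate").resolve("versions") // internal dir, hidden from listings
                    .resolve(hex.toString())                  // one folder per logical path
                    .resolve("v" + version)                   // one folder per archived version
                    .resolve(fileId + "__" + fileName);       // id prefix avoids name clashes
        }

        public static void main(String[] args) throws NoSuchAlgorithmException {
            System.out.println(archivedPath(Path.of("/dataset/demo"), "a.txt", 1L, "old-file-id", "a.txt"));
            // /dataset/demo/.datamate/versions/<sha256("a.txt") hex>/v1/old-file-id__a.txt
        }
    }

Hashing the logical path keeps the versions tree flat and avoids re-encoding arbitrary subdirectory names inside the internal directory.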
"" : value).hashCode()); + } + } +} + diff --git a/backend/shared/domain-common/src/main/java/com/datamate/common/domain/utils/ArchiveAnalyzer.java b/backend/shared/domain-common/src/main/java/com/datamate/common/domain/utils/ArchiveAnalyzer.java index d3b6e07..3db0bc2 100644 --- a/backend/shared/domain-common/src/main/java/com/datamate/common/domain/utils/ArchiveAnalyzer.java +++ b/backend/shared/domain-common/src/main/java/com/datamate/common/domain/utils/ArchiveAnalyzer.java @@ -143,7 +143,20 @@ public class ArchiveAnalyzer { private static Optional extractEntity(ArchiveInputStream archiveInputStream, ArchiveEntry archiveEntry, Path archivePath) throws IOException { byte[] buffer = new byte[DEFAULT_BUFFER_SIZE]; - Path path = Paths.get(archivePath.getParent().toString(), archiveEntry.getName()); + Path archiveRoot = archivePath.getParent().toAbsolutePath().normalize(); + String entryName = archiveEntry.getName(); + if (entryName == null || entryName.isBlank()) { + return Optional.empty(); + } + entryName = entryName.replace("\\", "/"); + while (entryName.startsWith("/")) { + entryName = entryName.substring(1); + } + Path path = archiveRoot.resolve(entryName).normalize(); + if (!path.startsWith(archiveRoot)) { + log.warn("Skip unsafe archive entry path traversal: {}", archiveEntry.getName()); + return Optional.empty(); + } File file = path.toFile(); long fileSize = 0L; FileUtils.createParentDirectories(file); diff --git a/backend/shared/domain-common/src/main/java/com/datamate/common/domain/utils/CommonUtils.java b/backend/shared/domain-common/src/main/java/com/datamate/common/domain/utils/CommonUtils.java index 4a1fe47..bf6c4d1 100644 --- a/backend/shared/domain-common/src/main/java/com/datamate/common/domain/utils/CommonUtils.java +++ b/backend/shared/domain-common/src/main/java/com/datamate/common/domain/utils/CommonUtils.java @@ -13,7 +13,10 @@ public class CommonUtils { * @return 文件名(带后缀) */ public static String trimFilePath(String filePath) { - int lastSlashIndex = filePath.lastIndexOf(File.separator); + if (filePath == null || filePath.isBlank()) { + return ""; + } + int lastSlashIndex = Math.max(filePath.lastIndexOf('/'), filePath.lastIndexOf('\\')); String filename = filePath; if (lastSlashIndex != -1) { diff --git a/runtime/datamate-python/app/db/models/dataset_management.py b/runtime/datamate-python/app/db/models/dataset_management.py index d5e24a1..a44482e 100644 --- a/runtime/datamate-python/app/db/models/dataset_management.py +++ b/runtime/datamate-python/app/db/models/dataset_management.py @@ -61,13 +61,15 @@ class DatasetFiles(Base): dataset_id = Column(String(36), nullable=False, comment="所属数据集ID(UUID)") file_name = Column(String(255), nullable=False, comment="文件名") file_path = Column(String(1000), nullable=False, comment="文件路径") + logical_path = Column(String(1000), nullable=False, comment="文件逻辑路径(相对数据集根目录)") + version = Column(BigInteger, nullable=False, default=1, comment="文件版本号(同 logical_path 递增)") file_type = Column(String(50), nullable=True, comment="文件格式:JPG/PNG/DCM/TXT等") file_size = Column(BigInteger, default=0, comment="文件大小(字节)") check_sum = Column(String(64), nullable=True, comment="文件校验和") tags = Column(JSON, nullable=True, comment="文件标签信息") tags_updated_at = Column(TIMESTAMP, nullable=True, comment="标签最后更新时间") dataset_filemetadata = Column("metadata", JSON, nullable=True, comment="文件元数据") - status = Column(String(50), default='ACTIVE', comment="文件状态:ACTIVE/DELETED/PROCESSING") + status = Column(String(50), default='ACTIVE', 
comment="文件状态:ACTIVE/ARCHIVED/DELETED/PROCESSING") upload_time = Column(TIMESTAMP, server_default=func.current_timestamp(), comment="上传时间") last_access_time = Column(TIMESTAMP, nullable=True, comment="最后访问时间") created_at = Column(TIMESTAMP, server_default=func.current_timestamp(), comment="创建时间") @@ -112,4 +114,4 @@ class Tag(Base): updated_at = Column(TIMESTAMP, server_default=func.current_timestamp(), onupdate=func.current_timestamp(), comment="更新时间") def __repr__(self): - return f"" \ No newline at end of file + return f"" diff --git a/runtime/python-executor/datamate/auto_annotation_worker.py b/runtime/python-executor/datamate/auto_annotation_worker.py index 15757bf..b7fe36e 100644 --- a/runtime/python-executor/datamate/auto_annotation_worker.py +++ b/runtime/python-executor/datamate/auto_annotation_worker.py @@ -372,15 +372,15 @@ def _register_output_dataset( ) return - insert_file_sql = text( - """ - INSERT INTO t_dm_dataset_files ( - id, dataset_id, file_name, file_path, file_type, file_size, status - ) VALUES ( - :id, :dataset_id, :file_name, :file_path, :file_type, :file_size, :status - ) - """ - ) + insert_file_sql = text( + """ + INSERT INTO t_dm_dataset_files ( + id, dataset_id, file_name, file_path, logical_path, version, file_type, file_size, status + ) VALUES ( + :id, :dataset_id, :file_name, :file_path, :logical_path, :version, :file_type, :file_size, :status + ) + """ + ) update_dataset_stat_sql = text( """ UPDATE t_dm_datasets @@ -393,37 +393,43 @@ def _register_output_dataset( with SQLManager.create_connect() as conn: added_count = 0 - for file_name, file_path, file_size in image_files: - ext = os.path.splitext(file_name)[1].lstrip(".").upper() or None - conn.execute( - insert_file_sql, - { - "id": str(uuid.uuid4()), - "dataset_id": output_dataset_id, - "file_name": file_name, - "file_path": file_path, - "file_type": ext, - "file_size": int(file_size), - "status": "ACTIVE", - }, - ) - added_count += 1 - - for file_name, file_path, file_size in annotation_files: - ext = os.path.splitext(file_name)[1].lstrip(".").upper() or None - conn.execute( - insert_file_sql, - { - "id": str(uuid.uuid4()), - "dataset_id": output_dataset_id, - "file_name": file_name, - "file_path": file_path, - "file_type": ext, - "file_size": int(file_size), - "status": "ACTIVE", - }, - ) - added_count += 1 + for file_name, file_path, file_size in image_files: + ext = os.path.splitext(file_name)[1].lstrip(".").upper() or None + logical_path = os.path.relpath(file_path, output_dir).replace("\\", "/") + conn.execute( + insert_file_sql, + { + "id": str(uuid.uuid4()), + "dataset_id": output_dataset_id, + "file_name": file_name, + "file_path": file_path, + "logical_path": logical_path, + "version": 1, + "file_type": ext, + "file_size": int(file_size), + "status": "ACTIVE", + }, + ) + added_count += 1 + + for file_name, file_path, file_size in annotation_files: + ext = os.path.splitext(file_name)[1].lstrip(".").upper() or None + logical_path = os.path.relpath(file_path, output_dir).replace("\\", "/") + conn.execute( + insert_file_sql, + { + "id": str(uuid.uuid4()), + "dataset_id": output_dataset_id, + "file_name": file_name, + "file_path": file_path, + "logical_path": logical_path, + "version": 1, + "file_type": ext, + "file_size": int(file_size), + "status": "ACTIVE", + }, + ) + added_count += 1 if added_count > 0: conn.execute( diff --git a/runtime/python-executor/datamate/sql_manager/sql/sql_config.json b/runtime/python-executor/datamate/sql_manager/sql/sql_config.json index 8937b81..f6a19cf 100644 --- 
a/runtime/python-executor/datamate/sql_manager/sql/sql_config.json
+++ b/runtime/python-executor/datamate/sql_manager/sql/sql_config.json
@@ -1,9 +1,9 @@
 {
   "query_sql": "SELECT * FROM t_task_instance_info WHERE instance_id IN (:instance_id)",
   "insert_sql": "INSERT INTO t_task_instance_info (instance_id, meta_file_name, meta_file_type, meta_file_id, meta_file_size, file_id, file_size, file_type, file_name, file_path, status, operator_id, error_code, incremental, child_id, slice_num) VALUES (:instance_id, :meta_file_name, :meta_file_type, :meta_file_id, :meta_file_size, :file_id, :file_size, :file_type, :file_name, :file_path, :status, :operator_id, :error_code, :incremental, :child_id, :slice_num)",
-  "insert_dataset_file_sql": "INSERT INTO t_dm_dataset_files (id, dataset_id, file_name, file_path, file_type, file_size, status, upload_time, last_access_time, created_at, updated_at) VALUES (:id, :dataset_id, :file_name, :file_path, :file_type, :file_size, :status, :upload_time, :last_access_time, :created_at, :updated_at)",
+  "insert_dataset_file_sql": "INSERT INTO t_dm_dataset_files (id, dataset_id, file_name, file_path, logical_path, version, file_type, file_size, status, upload_time, last_access_time, created_at, updated_at) VALUES (:id, :dataset_id, :file_name, :file_path, :logical_path, :version, :file_type, :file_size, :status, :upload_time, :last_access_time, :created_at, :updated_at)",
   "insert_clean_result_sql": "INSERT INTO t_clean_result (instance_id, src_file_id, dest_file_id, src_name, dest_name, src_type, dest_type, src_size, dest_size, status, result) VALUES (:instance_id, :src_file_id, :dest_file_id, :src_name, :dest_name, :src_type, :dest_type, :src_size, :dest_size, :status, :result)",
-  "query_dataset_sql": "SELECT file_size FROM t_dm_dataset_files WHERE dataset_id = :dataset_id",
+  "query_dataset_sql": "SELECT file_size FROM t_dm_dataset_files WHERE dataset_id = :dataset_id AND (status IS NULL OR status <> 'ARCHIVED')",
   "update_dataset_sql": "UPDATE t_dm_datasets SET size_bytes = :total_size, file_count = :file_count WHERE id = :dataset_id;",
   "update_task_sql": "UPDATE t_clean_task SET status = :status, after_size = :total_size, finished_at = :finished_time WHERE id = :task_id",
   "create_tables_sql": "CREATE TABLE IF NOT EXISTS t_task_instance_info (instance_id VARCHAR(255), meta_file_name TEXT, meta_file_type VARCHAR(100), meta_file_id BIGINT, meta_file_size VARCHAR(100), file_id BIGINT, file_size VARCHAR(100), file_type VARCHAR(100), file_name TEXT, file_path TEXT, status INT, operator_id VARCHAR(255), error_code VARCHAR(100), incremental VARCHAR(50), child_id BIGINT, slice_num INT DEFAULT 0);",
@@ -14,4 +14,4 @@
   "delete_similar_img_tables_sql": "DELETE FROM operator_similar_img_features WHERE flow_id = :flow_id",
   "create_similar_text_tables_sql": "CREATE TABLE IF NOT EXISTS operators_similar_text_features (id INT AUTO_INCREMENT PRIMARY KEY, task_uuid VARCHAR(255),file_feature TEXT,file_name TEXT,timestamp DATETIME);",
-  "delete_similar_text_tables_sql": "DELETE FROM operators_similar_text_features WHERE flow_id = :flow_id"
-}
\ No newline at end of file
+  "delete_similar_text_tables_sql": "DELETE FROM operators_similar_text_features WHERE flow_id = :flow_id"
+}
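The adjusted query_dataset_sql sums file sizes over visible rows only. A sketch of the same computation with an in-memory stand-in for the table (illustration only, not part of this patch):

    import java.util.List;

    public final class VisibleSizeSketch {
        record Row(long fileSize, String status) {}

        public static void main(String[] args) {
            List<Row> rows = List.of(
                    new Row(100, "ACTIVE"),
                    new Row(100, null),         // NULL status still counts toward the size
                    new Row(100, "ARCHIVED"));  // archived history is excluded
            long total = rows.stream()
                    .filter(r -> r.status() == null || !"ARCHIVED".equals(r.status()))
                    .mapToLong(Row::fileSize)
                    .sum();
            System.out.println(total); // 200
        }
    }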
logical_path VARCHAR(1000) NOT NULL COMMENT 'Logical file path (relative to the dataset root)',
+    version BIGINT NOT NULL DEFAULT 1 COMMENT 'File version number (increments per logical_path)',
     file_type VARCHAR(50) COMMENT 'File format: JPG/PNG/DCM/TXT, etc.',
     file_size BIGINT DEFAULT 0 COMMENT 'File size (bytes)',
     check_sum VARCHAR(64) COMMENT 'File checksum',
     tags JSON COMMENT 'File tag info',
     tags_updated_at TIMESTAMP NULL COMMENT 'Time tags were last updated',
     metadata JSON COMMENT 'File metadata',
-    status VARCHAR(50) DEFAULT 'ACTIVE' COMMENT 'File status: ACTIVE/DELETED/PROCESSING',
+    status VARCHAR(50) DEFAULT 'ACTIVE' COMMENT 'File status: ACTIVE/ARCHIVED/DELETED/PROCESSING',
     upload_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT 'Upload time',
     last_access_time TIMESTAMP NULL COMMENT 'Last access time',
     created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
     updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Update time',
     FOREIGN KEY (dataset_id) REFERENCES t_dm_datasets(id) ON DELETE CASCADE,
     INDEX idx_dm_dataset (dataset_id),
+    INDEX idx_dm_dataset_logical_path (dataset_id, logical_path, version),
     INDEX idx_dm_file_type (file_type),
     INDEX idx_dm_file_status (status),
     INDEX idx_dm_upload_time (upload_time)