diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java index a5b5b35..aa4004f 100644 --- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java @@ -1,1512 +1,1515 @@ -package com.datamate.datamanagement.application; - -import com.baomidou.mybatisplus.core.metadata.IPage; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; -import com.datamate.common.domain.model.ChunkUploadPreRequest; -import com.datamate.common.domain.model.FileUploadResult; -import com.datamate.common.domain.service.FileService; -import com.datamate.common.domain.utils.AnalyzerUtils; -import com.datamate.common.domain.utils.ArchiveAnalyzer; -import com.datamate.common.infrastructure.exception.BusinessAssert; -import com.datamate.common.infrastructure.exception.BusinessException; -import com.datamate.common.infrastructure.exception.CommonErrorCode; -import com.datamate.common.infrastructure.exception.SystemErrorCode; -import com.datamate.common.interfaces.PagedResponse; -import com.datamate.common.interfaces.PagingQuery; -import com.datamate.datamanagement.common.enums.DuplicateMethod; -import com.datamate.datamanagement.common.enums.DatasetType; -import com.datamate.datamanagement.domain.contants.DatasetConstant; -import com.datamate.datamanagement.domain.model.dataset.Dataset; -import com.datamate.datamanagement.domain.model.dataset.DatasetFile; -import com.datamate.datamanagement.domain.model.dataset.DatasetFileUploadCheckInfo; -import com.datamate.datamanagement.infrastructure.exception.DataManagementErrorCode; -import 
com.datamate.datamanagement.infrastructure.persistence.repository.DatasetFileRepository; -import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetRepository; -import com.datamate.datamanagement.interfaces.converter.DatasetConverter; -import com.datamate.datamanagement.interfaces.dto.AddFilesRequest; -import com.datamate.datamanagement.interfaces.dto.CopyFilesRequest; -import com.datamate.datamanagement.interfaces.dto.CreateDirectoryRequest; -import com.datamate.datamanagement.interfaces.dto.UploadFileRequest; -import com.datamate.datamanagement.interfaces.dto.UploadFilesPreRequest; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import jakarta.servlet.http.HttpServletResponse; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.compress.archivers.zip.ZipArchiveEntry; -import org.apache.commons.compress.archivers.zip.ZipArchiveOutputStream; -import org.apache.commons.io.IOUtils; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.core.io.Resource; -import org.springframework.core.io.UrlResource; -import org.springframework.http.HttpHeaders; -import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; -import org.springframework.transaction.support.TransactionSynchronization; -import org.springframework.transaction.support.TransactionSynchronizationManager; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.net.MalformedURLException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.attribute.BasicFileAttributes; -import java.time.LocalDateTime; -import java.time.ZoneId; -import java.time.format.DateTimeFormatter; -import java.util.*; -import 
java.util.function.Function; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -/** - * 数据集文件应用服务 - */ -@Slf4j -@Service -@Transactional -public class DatasetFileApplicationService { - private static final String PDF_FILE_TYPE = "pdf"; - private static final String DOC_FILE_TYPE = "doc"; - private static final String DOCX_FILE_TYPE = "docx"; - private static final String XLS_FILE_TYPE = "xls"; - private static final String XLSX_FILE_TYPE = "xlsx"; - private static final Set DOCUMENT_TEXT_FILE_TYPES = Set.of( - PDF_FILE_TYPE, - DOC_FILE_TYPE, - DOCX_FILE_TYPE, - XLS_FILE_TYPE, - XLSX_FILE_TYPE - ); - private static final String DERIVED_METADATA_KEY = "derived_from_file_id"; - private static final String FILE_STATUS_ACTIVE = "ACTIVE"; - private static final String FILE_STATUS_ARCHIVED = "ARCHIVED"; - private static final String INTERNAL_DIR_NAME = ".datamate"; - private static final String INTERNAL_UPLOAD_DIR_NAME = "uploading"; - private static final String INTERNAL_VERSIONS_DIR_NAME = "versions"; - - private final DatasetFileRepository datasetFileRepository; - private final DatasetRepository datasetRepository; - private final FileService fileService; - private final PdfTextExtractAsyncService pdfTextExtractAsyncService; - private final DatasetFilePreviewService datasetFilePreviewService; - - @Value("${datamate.data-management.base-path:/dataset}") - private String datasetBasePath; - - @Value("${datamate.data-management.file.duplicate:VERSION}") - private DuplicateMethod duplicateMethod; - - @Autowired - public DatasetFileApplicationService(DatasetFileRepository datasetFileRepository, - DatasetRepository datasetRepository, - FileService fileService, - PdfTextExtractAsyncService pdfTextExtractAsyncService, - DatasetFilePreviewService datasetFilePreviewService) { - this.datasetFileRepository = datasetFileRepository; - this.datasetRepository = datasetRepository; - this.fileService = fileService; - this.pdfTextExtractAsyncService = 
pdfTextExtractAsyncService; - this.datasetFilePreviewService = datasetFilePreviewService; - } - - /** - * 获取数据集文件列表 - */ - @Transactional(readOnly = true) - public PagedResponse getDatasetFiles(String datasetId, String fileType, String status, String name, - Boolean hasAnnotation, PagingQuery pagingQuery) { - return getDatasetFiles(datasetId, fileType, status, name, hasAnnotation, false, pagingQuery); - } - - /** - * 获取数据集文件列表,支持排除已被转换为TXT的源文档文件 - * - * @param datasetId 数据集ID - * @param fileType 文件类型过滤 - * @param status 状态过滤 - * @param name 文件名模糊查询 - * @param hasAnnotation 是否有标注 - * @param excludeSourceDocuments 是否排除源文档(PDF/DOC/DOCX/XLS/XLSX) - * @param pagingQuery 分页参数 - * @return 分页文件列表 - */ - @Transactional(readOnly = true) - public PagedResponse getDatasetFiles(String datasetId, String fileType, String status, String name, - Boolean hasAnnotation, boolean excludeSourceDocuments, PagingQuery pagingQuery) { - IPage page = new Page<>(pagingQuery.getPage(), pagingQuery.getSize()); - IPage files = datasetFileRepository.findByCriteria(datasetId, fileType, status, name, hasAnnotation, page); - - if (excludeSourceDocuments) { - // 过滤掉源文档文件(PDF/DOC/DOCX/XLS/XLSX),用于标注场景只展示派生文件 - List filteredRecords = files.getRecords().stream() - .filter(file -> !isSourceDocument(file)) - .collect(Collectors.toList()); - - // 重新构建分页结果 - Page filteredPage = new Page<>(files.getCurrent(), files.getSize(), files.getTotal()); - filteredPage.setRecords(filteredRecords); - return PagedResponse.of(filteredPage); - } - - return PagedResponse.of(files); - } - - /** - * 获取数据集文件列表 - */ - @Transactional(readOnly = true) - public PagedResponse getDatasetFilesWithDirectory(String datasetId, String prefix, boolean excludeDerivedFiles, PagingQuery pagingQuery) { - Dataset dataset = datasetRepository.getById(datasetId); - int page = Math.max(pagingQuery.getPage(), 1); - int size = pagingQuery.getSize() == null || pagingQuery.getSize() < 0 ? 
20 : pagingQuery.getSize(); - if (dataset == null) { - return PagedResponse.of(new Page<>(page, size)); - } - Path datasetRoot = Paths.get(dataset.getPath()).toAbsolutePath().normalize(); - prefix = Optional.ofNullable(prefix).orElse("").trim().replace("\\", "/"); - while (prefix.startsWith("/")) { - prefix = prefix.substring(1); - } - if (prefix.equals(INTERNAL_DIR_NAME) || prefix.startsWith(INTERNAL_DIR_NAME + "/")) { - return new PagedResponse<>(page, size, 0, 0, Collections.emptyList()); - } - Path queryPath = datasetRoot.resolve(prefix.replace("/", File.separator)).normalize(); - if (!queryPath.startsWith(datasetRoot)) { - return new PagedResponse<>(page, size, 0, 0, Collections.emptyList()); - } - Map datasetFilesMap = datasetFileRepository.findAllVisibleByDatasetId(datasetId) - .stream() - .filter(file -> file.getFilePath() != null) - .collect(Collectors.toMap( - file -> normalizeFilePath(file.getFilePath()), - Function.identity(), - (left, right) -> left - )); - Set derivedFilePaths = excludeDerivedFiles - ? 
datasetFilesMap.values().stream() - .filter(this::isDerivedFile) - .map(DatasetFile::getFilePath) - .map(this::normalizeFilePath) - .filter(Objects::nonNull) - .collect(Collectors.toSet()) - : Collections.emptySet(); - // 如果目录不存在,直接返回空结果(数据集刚创建时目录可能还未生成) - if (!Files.exists(queryPath)) { - return new PagedResponse<>(page, size, 0, 0, Collections.emptyList()); - } - try (Stream pathStream = Files.list(queryPath)) { - List allFiles = pathStream - .filter(path -> path.toAbsolutePath().normalize().startsWith(datasetRoot)) - .filter(path -> !isInternalDatasetPath(datasetRoot, path)) - .filter(path -> !excludeDerivedFiles - || Files.isDirectory(path) - || !derivedFilePaths.contains(normalizeFilePath(path.toString()))) - .sorted(Comparator - .comparing((Path path) -> !Files.isDirectory(path)) - .thenComparing(path -> path.getFileName().toString())) - .collect(Collectors.toList()); - - // 计算分页 - int total = allFiles.size(); - int totalPages = (int) Math.ceil((double) total / size); - - // 获取当前页数据 - int fromIndex = (page - 1) * size; - fromIndex = Math.max(fromIndex, 0); - int toIndex = Math.min(fromIndex + size, total); - - List pageData = new ArrayList<>(); - if (fromIndex < total) { - pageData = allFiles.subList(fromIndex, toIndex); - } - List datasetFiles = pageData.stream() - .map(path -> getDatasetFile(path, datasetFilesMap, excludeDerivedFiles, derivedFilePaths)) - .toList(); - - return new PagedResponse<>(page, size, total, totalPages, datasetFiles); - } catch (IOException e) { - log.error("list dataset path error", e); - return PagedResponse.of(new Page<>(page, size)); - } - } - - private DatasetFile getDatasetFile(Path path, - Map datasetFilesMap, - boolean excludeDerivedFiles, - Set derivedFilePaths) { - DatasetFile datasetFile = new DatasetFile(); - LocalDateTime localDateTime = LocalDateTime.now(); - try { - localDateTime = Files.getLastModifiedTime(path).toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime(); - } catch (IOException e) { - log.error("get 
last modified time error", e); - } - datasetFile.setFileName(path.getFileName().toString()); - datasetFile.setUploadTime(localDateTime); - - // 目录与普通文件区分处理 - if (Files.isDirectory(path)) { - datasetFile.setId("directory-" + datasetFile.getFileName()); - datasetFile.setDirectory(true); - - // 统计目录下文件数量和总大小 - try { - long fileCount; - long totalSize; - - try (Stream walk = Files.walk(path)) { - Stream fileStream = walk.filter(Files::isRegularFile); - if (excludeDerivedFiles && !derivedFilePaths.isEmpty()) { - fileStream = fileStream.filter(filePath -> - !derivedFilePaths.contains(normalizeFilePath(filePath.toString()))); - } - fileCount = fileStream.count(); - } - - try (Stream walk = Files.walk(path)) { - Stream fileStream = walk.filter(Files::isRegularFile); - if (excludeDerivedFiles && !derivedFilePaths.isEmpty()) { - fileStream = fileStream.filter(filePath -> - !derivedFilePaths.contains(normalizeFilePath(filePath.toString()))); - } - totalSize = fileStream - .mapToLong(p -> { - try { - return Files.size(p); - } catch (IOException e) { - log.error("get file size error", e); - return 0L; - } - }) - .sum(); - } - - datasetFile.setFileCount(fileCount); - datasetFile.setFileSize(totalSize); - } catch (IOException e) { - log.error("stat directory info error", e); - } - } else { - DatasetFile exist = datasetFilesMap.get(normalizeFilePath(path.toString())); - if (exist == null) { - datasetFile.setId("file-" + datasetFile.getFileName()); - datasetFile.setFileSize(path.toFile().length()); - } else { - datasetFile = exist; - } - } - return datasetFile; - } - - private String normalizeFilePath(String filePath) { - if (filePath == null || filePath.isBlank()) { - return null; - } - try { - return Paths.get(filePath).toAbsolutePath().normalize().toString(); - } catch (Exception e) { - return filePath.replace("\\", "/"); - } - } - - private boolean isSameNormalizedPath(String left, String right) { - String normalizedLeft = normalizeFilePath(left); - String normalizedRight = 
normalizeFilePath(right); - if (normalizedLeft == null || normalizedRight == null) { - return false; - } - return normalizedLeft.equals(normalizedRight); - } - - private boolean isInternalDatasetPath(Path datasetRoot, Path path) { - if (datasetRoot == null || path == null) { - return false; - } - try { - Path normalizedRoot = datasetRoot.toAbsolutePath().normalize(); - Path normalizedPath = path.toAbsolutePath().normalize(); - if (!normalizedPath.startsWith(normalizedRoot)) { - return false; - } - Path relative = normalizedRoot.relativize(normalizedPath); - if (relative.getNameCount() == 0) { - return false; - } - return INTERNAL_DIR_NAME.equals(relative.getName(0).toString()); - } catch (Exception e) { - return false; - } - } - - private String normalizeLogicalPrefix(String prefix) { - if (prefix == null) { - return ""; - } - String normalized = prefix.trim().replace("\\", "/"); - while (normalized.startsWith("/")) { - normalized = normalized.substring(1); - } - while (normalized.endsWith("/")) { - normalized = normalized.substring(0, normalized.length() - 1); - } - while (normalized.contains("//")) { - normalized = normalized.replace("//", "/"); - } - return normalized; - } - - private String normalizeLogicalPath(String logicalPath) { - return normalizeLogicalPrefix(logicalPath); - } - - private String joinLogicalPath(String prefix, String relativePath) { - String normalizedPrefix = normalizeLogicalPrefix(prefix); - String normalizedRelative = normalizeLogicalPath(relativePath); - if (normalizedPrefix.isEmpty()) { - return normalizedRelative; - } - if (normalizedRelative.isEmpty()) { - return normalizedPrefix; - } - return normalizeLogicalPath(normalizedPrefix + "/" + normalizedRelative); - } - - private void assertNotInternalPrefix(String prefix) { - if (prefix == null || prefix.isBlank()) { - return; - } - String normalized = normalizeLogicalPrefix(prefix); - if (normalized.equals(INTERNAL_DIR_NAME) || normalized.startsWith(INTERNAL_DIR_NAME + "/")) { - throw 
BusinessException.of(CommonErrorCode.PARAM_ERROR); - } - } - - private boolean isArchivedStatus(DatasetFile datasetFile) { - if (datasetFile == null) { - return false; - } - String status = datasetFile.getStatus(); - return status != null && FILE_STATUS_ARCHIVED.equalsIgnoreCase(status); - } - - private boolean isSourceDocument(DatasetFile datasetFile) { - if (datasetFile == null) { - return false; - } - String fileType = datasetFile.getFileType(); - if (fileType == null || fileType.isBlank()) { - return false; - } - return DOCUMENT_TEXT_FILE_TYPES.contains(fileType.toLowerCase(Locale.ROOT)); - } - - private boolean isDerivedFile(DatasetFile datasetFile) { - if (datasetFile == null) { - return false; - } - String metadata = datasetFile.getMetadata(); - if (metadata == null || metadata.isBlank()) { - return false; - } - try { - ObjectMapper mapper = new ObjectMapper(); - Map metadataMap = mapper.readValue(metadata, new TypeReference>() {}); - return metadataMap.get(DERIVED_METADATA_KEY) != null; - } catch (Exception e) { - log.debug("Failed to parse dataset file metadata for derived detection: {}", datasetFile.getId(), e); - return false; - } - } - - private Path resolveDatasetRootPath(Dataset dataset, String datasetId) { - String datasetPath = dataset == null ? 
null : dataset.getPath(); - if (datasetPath == null || datasetPath.isBlank()) { - datasetPath = datasetBasePath + File.separator + datasetId; - if (dataset != null) { - dataset.setPath(datasetPath); - datasetRepository.updateById(dataset); - } - } - Path datasetRoot = Paths.get(datasetPath).toAbsolutePath().normalize(); - try { - Files.createDirectories(datasetRoot); - } catch (IOException e) { - log.error("Failed to create dataset root dir: {}", datasetRoot, e); - throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR); - } - return datasetRoot; - } - - private Path resolveStagingRootPath(Path datasetRoot, - DatasetFileUploadCheckInfo checkInfo, - List uploadedFiles) { - if (datasetRoot == null) { - return null; - } - String stagingPath = checkInfo == null ? null : checkInfo.getStagingPath(); - if (stagingPath != null && !stagingPath.isBlank()) { - try { - Path stagingRoot = Paths.get(stagingPath).toAbsolutePath().normalize(); - if (!stagingRoot.startsWith(datasetRoot)) { - log.warn("Staging root out of dataset root, datasetId={}, stagingRoot={}, datasetRoot={}", - checkInfo == null ? null : checkInfo.getDatasetId(), stagingRoot, datasetRoot); - return null; - } - Path relative = datasetRoot.relativize(stagingRoot); - if (relative.getNameCount() < 3) { - return null; - } - if (!INTERNAL_DIR_NAME.equals(relative.getName(0).toString()) - || !INTERNAL_UPLOAD_DIR_NAME.equals(relative.getName(1).toString())) { - return null; - } - return stagingRoot; - } catch (Exception e) { - log.warn("Invalid staging path: {}", stagingPath, e); - return null; - } - } - if (uploadedFiles == null || uploadedFiles.isEmpty()) { - return null; - } - FileUploadResult firstResult = uploadedFiles.get(0); - File firstFile = firstResult == null ? 
null : firstResult.getSavedFile(); - if (firstFile == null) { - return null; - } - try { - return Paths.get(firstFile.getParent()).toAbsolutePath().normalize(); - } catch (Exception e) { - return null; - } - } - - private void scheduleCleanupStagingDirAfterCommit(Path stagingRoot) { - if (stagingRoot == null) { - return; - } - Runnable cleanup = () -> deleteDirectoryRecursivelyQuietly(stagingRoot); - if (TransactionSynchronizationManager.isSynchronizationActive()) { - TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { - @Override - public void afterCommit() { - cleanup.run(); - } - }); - return; - } - cleanup.run(); - } - - private void deleteDirectoryRecursivelyQuietly(Path directory) { - if (directory == null) { - return; - } - if (!Files.exists(directory)) { - return; - } - try (Stream paths = Files.walk(directory)) { - paths.sorted(Comparator.reverseOrder()).forEach(path -> { - try { - Files.deleteIfExists(path); - } catch (IOException e) { - log.debug("Failed to delete: {}", path, e); - } - }); - } catch (IOException e) { - log.debug("Failed to cleanup staging dir: {}", directory, e); - } - } - - private String sanitizeArchiveFileName(String fileName) { - String input = fileName == null ? "" : fileName.trim(); - if (input.isBlank()) { - return "file"; - } - StringBuilder builder = new StringBuilder(input.length()); - for (int i = 0; i < input.length(); i++) { - char c = input.charAt(i); - if (c <= 31 || c == 127) { - builder.append('_'); - continue; - } - if (c == '/' || c == '\\' || c == ':' || c == '*' || c == '?' || c == '\"' - || c == '<' || c == '>' || c == '|') { - builder.append('_'); - continue; - } - builder.append(c); - } - String sanitized = builder.toString().trim(); - return sanitized.isEmpty() ? "file" : sanitized; - } - - private String sha256Hex(String value) { - String input = value == null ? 
"" : value; - try { - java.security.MessageDigest digest = java.security.MessageDigest.getInstance("SHA-256"); - byte[] hashed = digest.digest(input.getBytes(java.nio.charset.StandardCharsets.UTF_8)); - StringBuilder builder = new StringBuilder(hashed.length * 2); - for (byte b : hashed) { - builder.append(String.format("%02x", b)); - } - return builder.toString(); - } catch (Exception e) { - return Integer.toHexString(input.hashCode()); - } - } - - /** - * 获取文件详情 - */ - @Transactional(readOnly = true) - public DatasetFile getDatasetFile(String datasetId, String fileId) { - DatasetFile file = datasetFileRepository.getById(fileId); - if (file == null) { - throw new IllegalArgumentException("File not found: " + fileId); - } - if (!file.getDatasetId().equals(datasetId)) { - throw new IllegalArgumentException("File does not belong to the specified dataset"); - } - return file; - } - - /** - * 删除文件 - */ - @Transactional - public void deleteDatasetFile(String datasetId, String fileId) { - DatasetFile file = getDatasetFile(datasetId, fileId); - if (file == null) { - log.warn("File not found: datasetId={}, fileId={}", datasetId, fileId); - return; - } - String logicalPath = file.getLogicalPath(); - - // 如果 logicalPath 为 null,直接删除当前文件(兼容旧数据) - if (logicalPath == null) { - deleteDatasetFileInternal(datasetId, file); - return; - } - - List allVersions = datasetFileRepository.findAllByDatasetIdAndLogicalPath(datasetId, logicalPath); - - for (DatasetFile versionFile : allVersions) { - deleteDatasetFileInternal(datasetId, versionFile); - } - } - - private void deleteDatasetFileInternal(String datasetId, DatasetFile file) { - Dataset dataset = datasetRepository.getById(datasetId); - if (file == null || dataset == null) { - return; - } - - if (isSourceDocument(file)) { - deleteDerivedTextFileQuietly(datasetId, file.getId()); - } - - try { - datasetFileRepository.removeById(file.getId()); - } catch (Exception e) { - log.error("Failed to delete file record from database: fileId={}", 
file.getId(), e); - // 数据库删除失败时,跳过后续清理以避免数据不一致 - return; - } - - if (!isArchivedStatus(file)) { - try { - dataset.setFiles(new ArrayList<>(Collections.singleton(file))); - dataset.removeFile(file); - datasetRepository.updateById(dataset); - } catch (Exception e) { - log.error("Failed to update dataset: datasetId={}", datasetId, e); - } - } - - datasetFilePreviewService.deletePreviewFileQuietly(datasetId, file.getId()); - - if (file.getFilePath() != null && file.getFilePath().startsWith(dataset.getPath())) { - try { - Path filePath = Paths.get(file.getFilePath()); - Files.deleteIfExists(filePath); - } catch (IOException ex) { - log.error("Failed to delete physical file: filePath={}", file.getFilePath(), ex); - } - } - } - - private void deleteDerivedTextFileQuietly(String datasetId, String sourceFileId) { - if (sourceFileId == null || sourceFileId.isBlank()) { - return; - } - - try { - List derivedFiles = datasetFileRepository.findAllByDatasetId(datasetId).stream() - .filter(f -> isDerivedFileFromSource(f, sourceFileId)) - .toList(); - - for (DatasetFile derivedFile : derivedFiles) { - deleteDatasetFileInternal(datasetId, derivedFile); - } - } catch (Exception e) { - log.error("Failed to delete derived text files for sourceFileId: {}", sourceFileId, e); - } - } - - private boolean isDerivedFileFromSource(DatasetFile file, String sourceFileId) { - if (file == null || file.getMetadata() == null || file.getMetadata().isBlank()) { - return false; - } - try { - ObjectMapper mapper = new ObjectMapper(); - Map metadataMap = mapper.readValue(file.getMetadata(), new TypeReference>() {}); - Object derivedFromFileId = metadataMap.get(DERIVED_METADATA_KEY); - return derivedFromFileId != null && sourceFileId.equals(String.valueOf(derivedFromFileId)); - } catch (Exception e) { - log.debug("Failed to parse metadata for derived detection: fileId={}", file.getId(), e); - return false; - } - } - - /** - * 下载文件 - */ - @Transactional(readOnly = true) - public Resource 
downloadFile(String datasetId, String fileId) { - DatasetFile file = getDatasetFile(datasetId, fileId); - try { - Path filePath = Paths.get(file.getFilePath()).normalize(); - Resource resource = new UrlResource(filePath.toUri()); - if (resource.exists()) { - return resource; - } else { - throw new RuntimeException("File not found: " + file.getFileName()); - } - } catch (MalformedURLException ex) { - throw new RuntimeException("File not found: " + file.getFileName(), ex); - } - } - - /** - * 下载文件 - */ - @Transactional(readOnly = true) - public void downloadDatasetFileAsZip(String datasetId, HttpServletResponse response) { - Dataset dataset = datasetRepository.getById(datasetId); - if (Objects.isNull(dataset)) { - throw BusinessException.of(DataManagementErrorCode.DATASET_NOT_FOUND); - } - Path datasetRoot = Paths.get(dataset.getPath()).toAbsolutePath().normalize(); - Set filePaths = datasetFileRepository.findAllVisibleByDatasetId(datasetId).stream() - .map(DatasetFile::getFilePath) - .filter(Objects::nonNull) - .map(path -> Paths.get(path).toAbsolutePath().normalize()) - .filter(path -> path.startsWith(datasetRoot)) - .filter(path -> !isInternalDatasetPath(datasetRoot, path)) - .collect(Collectors.toSet()); - Path downloadPath = datasetRoot; - response.setContentType("application/zip"); - String zipName = String.format("dataset_%s.zip", - LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss"))); - response.setHeader(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=" + zipName); - try (ZipArchiveOutputStream zos = new ZipArchiveOutputStream(response.getOutputStream())) { - try (Stream pathStream = Files.walk(downloadPath)) { - List allPaths = pathStream - .map(path -> path.toAbsolutePath().normalize()) - .filter(path -> path.startsWith(datasetRoot)) - .filter(path -> !isInternalDatasetPath(datasetRoot, path)) - .filter(path -> filePaths.stream().anyMatch(filePath -> filePath.startsWith(path))) - .toList(); - for (Path path : allPaths) { - 
addToZipFile(path, downloadPath, zos); - } - } - } catch (IOException e) { - log.error("Failed to download files in batches.", e); - throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR); - } - } - - private void addToZipFile(Path path, Path basePath, ZipArchiveOutputStream zos) throws IOException { - String entryName = basePath.relativize(path) - .toString() - .replace(File.separator, "/"); - - // 处理目录 - if (Files.isDirectory(path)) { - if (!entryName.isEmpty()) { - entryName += "/"; - ZipArchiveEntry dirEntry = new ZipArchiveEntry(entryName); - zos.putArchiveEntry(dirEntry); - zos.closeArchiveEntry(); - } - } else { - // 处理文件 - ZipArchiveEntry fileEntry = new ZipArchiveEntry(path.toFile(), entryName); - - // 设置更多属性 - BasicFileAttributes attrs = Files.readAttributes(path, BasicFileAttributes.class); - fileEntry.setSize(attrs.size()); - fileEntry.setLastModifiedTime(attrs.lastModifiedTime()); - - zos.putArchiveEntry(fileEntry); - - try (InputStream is = Files.newInputStream(path)) { - IOUtils.copy(is, zos); - } - zos.closeArchiveEntry(); - } - } - - /** - * 预上传 - * - * @param chunkUploadRequest 上传请求 - * @param datasetId 数据集id - * @return 请求id - */ - @Transactional - public String preUpload(UploadFilesPreRequest chunkUploadRequest, String datasetId) { - Dataset dataset = datasetRepository.getById(datasetId); - if (Objects.isNull(dataset)) { - throw BusinessException.of(DataManagementErrorCode.DATASET_NOT_FOUND); - } - - String prefix = normalizeLogicalPrefix(chunkUploadRequest == null ? 
null : chunkUploadRequest.getPrefix()); - assertNotInternalPrefix(prefix); - - Path datasetRoot = resolveDatasetRootPath(dataset, datasetId); - Path stagingRoot = datasetRoot - .resolve(INTERNAL_DIR_NAME) - .resolve(INTERNAL_UPLOAD_DIR_NAME) - .resolve(UUID.randomUUID().toString()) - .toAbsolutePath() - .normalize(); - BusinessAssert.isTrue(stagingRoot.startsWith(datasetRoot), CommonErrorCode.PARAM_ERROR); - try { - Files.createDirectories(stagingRoot); - } catch (IOException e) { - log.error("Failed to create staging dir: {}", stagingRoot, e); - throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR); - } - - ChunkUploadPreRequest request = ChunkUploadPreRequest.builder().build(); - request.setUploadPath(stagingRoot.toString()); - request.setTotalFileNum(chunkUploadRequest.getTotalFileNum()); - request.setServiceId(DatasetConstant.SERVICE_ID); - DatasetFileUploadCheckInfo checkInfo = new DatasetFileUploadCheckInfo(); - checkInfo.setDatasetId(datasetId); - checkInfo.setHasArchive(chunkUploadRequest.isHasArchive()); - checkInfo.setPrefix(prefix); - checkInfo.setStagingPath(stagingRoot.toString()); - try { - ObjectMapper objectMapper = new ObjectMapper(); - String checkInfoJson = objectMapper.writeValueAsString(checkInfo); - request.setCheckInfo(checkInfoJson); - } catch (JsonProcessingException e) { - log.warn("Failed to serialize checkInfo to JSON", e); - } - return fileService.preUpload(request); - } - - /** - * 切片上传 - * - * @param uploadFileRequest 上传请求 - */ - @Transactional - public void chunkUpload(String datasetId, UploadFileRequest uploadFileRequest) { - FileUploadResult uploadResult = fileService.chunkUpload(DatasetConverter.INSTANCE.toChunkUploadRequest(uploadFileRequest)); - saveFileInfoToDb(uploadResult, datasetId); - } - - /** - * 取消上传 - */ - @Transactional - public void cancelUpload(String reqId) { - fileService.cancelUpload(reqId); - } - - private void saveFileInfoToDb(FileUploadResult fileUploadResult, String datasetId) { - if 
(Objects.isNull(fileUploadResult.getSavedFile())) { - // 文件切片上传没有完成 - return; - } - DatasetFileUploadCheckInfo checkInfo; - try { - ObjectMapper objectMapper = new ObjectMapper(); - checkInfo = objectMapper.readValue(fileUploadResult.getCheckInfo(), DatasetFileUploadCheckInfo.class); - if (!Objects.equals(checkInfo.getDatasetId(), datasetId)) { - throw BusinessException.of(DataManagementErrorCode.DATASET_NOT_FOUND); - } - } catch (IllegalArgumentException | JsonProcessingException e) { - log.warn("Failed to convert checkInfo to DatasetFileUploadCheckInfo", e); - throw BusinessException.of(CommonErrorCode.PRE_UPLOAD_REQUEST_NOT_EXIST); - } - List files; - if (checkInfo.isHasArchive() && AnalyzerUtils.isPackage(fileUploadResult.getSavedFile().getPath())) { - files = ArchiveAnalyzer.process(fileUploadResult); - } else { - files = Collections.singletonList(fileUploadResult); - } - commitUploadedFiles(datasetId, checkInfo, files, fileUploadResult.isAllFilesUploaded()); - } - - private void commitUploadedFiles(String datasetId, - DatasetFileUploadCheckInfo checkInfo, - List uploadedFiles, - boolean cleanupStagingAfterCommit) { - Dataset dataset = datasetRepository.getById(datasetId); - BusinessAssert.notNull(dataset, DataManagementErrorCode.DATASET_NOT_FOUND); - - Path datasetRoot = resolveDatasetRootPath(dataset, datasetId); - String prefix = checkInfo == null ? 
"" : normalizeLogicalPrefix(checkInfo.getPrefix()); - assertNotInternalPrefix(prefix); - - Path stagingRoot = resolveStagingRootPath(datasetRoot, checkInfo, uploadedFiles); - BusinessAssert.notNull(stagingRoot, CommonErrorCode.PARAM_ERROR); - - dataset.setFiles(new ArrayList<>(datasetFileRepository.findAllVisibleByDatasetId(datasetId))); - for (FileUploadResult fileResult : uploadedFiles) { - commitSingleUploadedFile(dataset, datasetRoot, stagingRoot, prefix, fileResult); - } - - dataset.active(); - datasetRepository.updateById(dataset); - - if (cleanupStagingAfterCommit) { - scheduleCleanupStagingDirAfterCommit(stagingRoot); - } - } - - private void commitSingleUploadedFile(Dataset dataset, - Path datasetRoot, - Path stagingRoot, - String prefix, - FileUploadResult fileResult) { - if (dataset == null || fileResult == null || fileResult.getSavedFile() == null) { - return; - } - Path incomingPath = Paths.get(fileResult.getSavedFile().getPath()).toAbsolutePath().normalize(); - BusinessAssert.isTrue(incomingPath.startsWith(stagingRoot), CommonErrorCode.PARAM_ERROR); - - String relativePath = stagingRoot.relativize(incomingPath).toString().replace(File.separator, "/"); - String logicalPath = joinLogicalPath(prefix, relativePath); - assertNotInternalPrefix(logicalPath); - - commitNewFileVersion(dataset, datasetRoot, logicalPath, incomingPath, true); - } - - private DatasetFile commitNewFileVersion(Dataset dataset, - Path datasetRoot, - String logicalPath, - Path incomingFilePath, - boolean moveIncoming) { - BusinessAssert.notNull(dataset, CommonErrorCode.PARAM_ERROR); - BusinessAssert.isTrue(datasetRoot != null && Files.exists(datasetRoot), CommonErrorCode.PARAM_ERROR); - - String normalizedLogicalPath = normalizeLogicalPath(logicalPath); - BusinessAssert.isTrue(!normalizedLogicalPath.isEmpty(), CommonErrorCode.PARAM_ERROR); - assertNotInternalPrefix(normalizedLogicalPath); - - Path targetFilePath = datasetRoot.resolve(normalizedLogicalPath.replace("/", File.separator)) 
- .toAbsolutePath() - .normalize(); - BusinessAssert.isTrue(targetFilePath.startsWith(datasetRoot), CommonErrorCode.PARAM_ERROR); - - DuplicateMethod effectiveDuplicateMethod = resolveEffectiveDuplicateMethod(); - DatasetFile latest = datasetFileRepository.findLatestByDatasetIdAndLogicalPath(dataset.getId(), normalizedLogicalPath); - if (latest == null && dataset.getFiles() != null) { - latest = dataset.getFiles().stream() - .filter(existing -> isSameNormalizedPath(existing == null ? null : existing.getFilePath(), targetFilePath.toString())) - .findFirst() - .orElse(null); - } - if (latest != null && effectiveDuplicateMethod == DuplicateMethod.ERROR) { - throw BusinessException.of(DataManagementErrorCode.DATASET_FILE_ALREADY_EXISTS); - } - - long nextVersion = 1L; - if (latest != null) { - long latestVersion = Optional.ofNullable(latest.getVersion()).orElse(1L); - if (latest.getVersion() == null) { - latest.setVersion(latestVersion); - } - if (latest.getLogicalPath() == null || latest.getLogicalPath().isBlank()) { - latest.setLogicalPath(normalizedLogicalPath); - } - nextVersion = latestVersion + 1L; - } - - if (latest != null && effectiveDuplicateMethod == DuplicateMethod.VERSION) { - Path archivedPath = archiveDatasetFileVersion(datasetRoot, normalizedLogicalPath, latest); - if (archivedPath != null) { - latest.setFilePath(archivedPath.toString()); - } else if (Files.exists(targetFilePath)) { - log.error("Failed to archive latest file, refuse to overwrite. 
datasetId={}, fileId={}, logicalPath={}, targetPath={}", - dataset.getId(), latest.getId(), normalizedLogicalPath, targetFilePath); - throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR); - } - latest.setStatus(FILE_STATUS_ARCHIVED); - datasetFileRepository.updateById(latest); - dataset.removeFile(latest); - } else if (latest == null && Files.exists(targetFilePath)) { - archiveOrphanTargetFile(datasetRoot, normalizedLogicalPath, targetFilePath); - } - - try { - Files.createDirectories(targetFilePath.getParent()); - if (moveIncoming) { - Files.move(incomingFilePath, targetFilePath, java.nio.file.StandardCopyOption.REPLACE_EXISTING); - } else { - Files.copy(incomingFilePath, targetFilePath, java.nio.file.StandardCopyOption.REPLACE_EXISTING); - } - } catch (IOException e) { - log.error("Failed to write dataset file, datasetId={}, logicalPath={}, targetPath={}", - dataset.getId(), normalizedLogicalPath, targetFilePath, e); - throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR); - } - - LocalDateTime currentTime = LocalDateTime.now(); - String fileName = targetFilePath.getFileName().toString(); - long fileSize; - try { - fileSize = Files.size(targetFilePath); - } catch (IOException e) { - fileSize = 0L; - } - - DatasetFile datasetFile = DatasetFile.builder() - .id(UUID.randomUUID().toString()) - .datasetId(dataset.getId()) - .fileName(fileName) - .fileType(AnalyzerUtils.getExtension(fileName)) - .fileSize(fileSize) - .filePath(targetFilePath.toString()) - .logicalPath(normalizedLogicalPath) - .version(nextVersion) - .status(FILE_STATUS_ACTIVE) - .uploadTime(currentTime) - .lastAccessTime(currentTime) - .build(); - - datasetFileRepository.saveOrUpdate(datasetFile); - dataset.addFile(datasetFile); - triggerPdfTextExtraction(dataset, datasetFile); - return datasetFile; - } - - private DuplicateMethod resolveEffectiveDuplicateMethod() { - if (duplicateMethod == null) { - return DuplicateMethod.VERSION; - } - if (duplicateMethod == DuplicateMethod.COVER) { 
- log.warn("duplicateMethod=COVER 会导致标注引用的 fileId 对应内容被覆盖,已强制按 VERSION 处理。"); - return DuplicateMethod.VERSION; - } - return duplicateMethod; - } - - private Path archiveDatasetFileVersion(Path datasetRoot, String logicalPath, DatasetFile latest) { - if (latest == null || latest.getId() == null || latest.getId().isBlank()) { - return null; - } - Path currentPath; - try { - currentPath = Paths.get(latest.getFilePath()).toAbsolutePath().normalize(); - } catch (Exception e) { - log.warn("Invalid latest file path, skip archiving. datasetId={}, fileId={}, filePath={}", - latest.getDatasetId(), latest.getId(), latest.getFilePath()); - return null; - } - - if (!Files.exists(currentPath) || !Files.isRegularFile(currentPath)) { - log.warn("Latest file not found on disk, skip archiving. datasetId={}, fileId={}, filePath={}", - latest.getDatasetId(), latest.getId(), currentPath); - return null; - } - if (!currentPath.startsWith(datasetRoot)) { - log.warn("Latest file path out of dataset root, skip archiving. 
datasetId={}, fileId={}, filePath={}", - latest.getDatasetId(), latest.getId(), currentPath); - return null; - } - - long latestVersion = Optional.ofNullable(latest.getVersion()).orElse(1L); - String logicalPathHash = sha256Hex(logicalPath); - Path archiveDir = datasetRoot - .resolve(INTERNAL_DIR_NAME) - .resolve(INTERNAL_VERSIONS_DIR_NAME) - .resolve(logicalPathHash) - .resolve("v" + latestVersion) - .toAbsolutePath() - .normalize(); - BusinessAssert.isTrue(archiveDir.startsWith(datasetRoot), CommonErrorCode.PARAM_ERROR); - - try { - Files.createDirectories(archiveDir); - } catch (IOException e) { - log.error("Failed to create archive dir: {}", archiveDir, e); - throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR); - } - - String fileName = sanitizeArchiveFileName(Optional.ofNullable(latest.getFileName()).orElse(currentPath.getFileName().toString())); - Path archivedPath = archiveDir.resolve(latest.getId() + "__" + fileName).toAbsolutePath().normalize(); - BusinessAssert.isTrue(archivedPath.startsWith(archiveDir), CommonErrorCode.PARAM_ERROR); - - try { - Files.move(currentPath, archivedPath, java.nio.file.StandardCopyOption.REPLACE_EXISTING); - return archivedPath; - } catch (IOException e) { - log.error("Failed to archive latest file, datasetId={}, fileId={}, from={}, to={}", - latest.getDatasetId(), latest.getId(), currentPath, archivedPath, e); - throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR); - } - } - - private void archiveOrphanTargetFile(Path datasetRoot, String logicalPath, Path targetFilePath) { - if (datasetRoot == null || targetFilePath == null) { - return; - } - if (!Files.exists(targetFilePath) || !Files.isRegularFile(targetFilePath)) { - return; - } - String logicalPathHash = sha256Hex(logicalPath); - Path orphanDir = datasetRoot - .resolve(INTERNAL_DIR_NAME) - .resolve(INTERNAL_VERSIONS_DIR_NAME) - .resolve(logicalPathHash) - .resolve("orphan") - .toAbsolutePath() - .normalize(); - if (!orphanDir.startsWith(datasetRoot)) { - 
return; - } - try { - Files.createDirectories(orphanDir); - String safeName = sanitizeArchiveFileName(targetFilePath.getFileName().toString()); - Path orphanPath = orphanDir.resolve("orphan_" + System.currentTimeMillis() + "__" + safeName) - .toAbsolutePath() - .normalize(); - if (!orphanPath.startsWith(orphanDir)) { - return; - } - Files.move(targetFilePath, orphanPath, java.nio.file.StandardCopyOption.REPLACE_EXISTING); - } catch (Exception e) { - log.warn("Failed to archive orphan target file, logicalPath={}, targetPath={}", logicalPath, targetFilePath, e); - } - } - - /** - * 在数据集下创建子目录 - */ - @Transactional - public void createDirectory(String datasetId, CreateDirectoryRequest req) { - Dataset dataset = datasetRepository.getById(datasetId); - if (dataset == null) { - throw BusinessException.of(DataManagementErrorCode.DATASET_NOT_FOUND); - } - String datasetPath = dataset.getPath(); - String parentPrefix = Optional.ofNullable(req.getParentPrefix()).orElse("").trim(); - parentPrefix = parentPrefix.replace("\\", "/"); - while (parentPrefix.startsWith("/")) { - parentPrefix = parentPrefix.substring(1); - } - parentPrefix = normalizeLogicalPrefix(parentPrefix); - assertNotInternalPrefix(parentPrefix); - - String directoryName = Optional.ofNullable(req.getDirectoryName()).orElse("").trim(); - if (directoryName.isEmpty()) { - throw BusinessException.of(CommonErrorCode.PARAM_ERROR); - } - if (INTERNAL_DIR_NAME.equals(directoryName)) { - throw BusinessException.of(CommonErrorCode.PARAM_ERROR); - } - if (directoryName.contains("..") || directoryName.contains("/") || directoryName.contains("\\")) { - throw BusinessException.of(CommonErrorCode.PARAM_ERROR); - } - - Path basePath = Paths.get(datasetPath); - Path targetPath = parentPrefix.isEmpty() - ? 
basePath.resolve(directoryName) - : basePath.resolve(parentPrefix).resolve(directoryName); - - Path normalized = targetPath.normalize(); - if (!normalized.startsWith(basePath)) { - throw BusinessException.of(CommonErrorCode.PARAM_ERROR); - } - - try { - Files.createDirectories(normalized); - } catch (IOException e) { - log.error("Failed to create directory {} for dataset {}", normalized, datasetId, e); - throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR); - } - } - - /** - * 下载目录为 ZIP 文件 - */ - @Transactional(readOnly = true) - public void downloadDirectory(String datasetId, String prefix, HttpServletResponse response) { - Dataset dataset = datasetRepository.getById(datasetId); - if (dataset == null) { - throw BusinessException.of(DataManagementErrorCode.DATASET_NOT_FOUND); - } - - String datasetPath = dataset.getPath(); - prefix = Optional.ofNullable(prefix).orElse("").trim(); - prefix = prefix.replace("\\", "/"); - while (prefix.startsWith("/")) { - prefix = prefix.substring(1); - } - while (prefix.endsWith("/")) { - prefix = prefix.substring(0, prefix.length() - 1); - } - if (prefix.equals(INTERNAL_DIR_NAME) || prefix.startsWith(INTERNAL_DIR_NAME + "/")) { - throw BusinessException.of(CommonErrorCode.PARAM_ERROR); - } - - Path basePath = Paths.get(datasetPath); - Path targetPath = prefix.isEmpty() ? basePath : basePath.resolve(prefix); - Path normalized = targetPath.normalize(); - - if (!normalized.startsWith(basePath)) { - throw BusinessException.of(CommonErrorCode.PARAM_ERROR); - } - - if (!Files.exists(normalized) || !Files.isDirectory(normalized)) { - throw BusinessException.of(DataManagementErrorCode.DIRECTORY_NOT_FOUND); - } - - String zipFileName = prefix.isEmpty() ? 
dataset.getName() : prefix.replace("/", "_"); - zipFileName = zipFileName + "_" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd_HHmmss")) + ".zip"; - - try { - response.setContentType("application/zip"); - response.setHeader(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + zipFileName + "\""); - - try (ZipArchiveOutputStream zipOut = new ZipArchiveOutputStream(response.getOutputStream())) { - zipDirectory(normalized, normalized, zipOut); - zipOut.finish(); - } - } catch (IOException e) { - log.error("Failed to download directory {} for dataset {}", normalized, datasetId, e); - throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR); - } - } - - /** - * 递归压缩目录 - */ - private void zipDirectory(Path sourceDir, Path basePath, ZipArchiveOutputStream zipOut) throws IOException { - try (Stream paths = Files.walk(sourceDir)) { - paths.filter(path -> !Files.isDirectory(path)) - .filter(path -> !isInternalDatasetPath(basePath.toAbsolutePath().normalize(), path)) - .forEach(path -> { - try { - Path relativePath = basePath.relativize(path); - ZipArchiveEntry zipEntry = new ZipArchiveEntry(relativePath.toString()); - zipOut.putArchiveEntry(zipEntry); - try (InputStream fis = Files.newInputStream(path)) { - IOUtils.copy(fis, zipOut); - } - zipOut.closeArchiveEntry(); - } catch (IOException e) { - log.error("Failed to add file to zip: {}", path, e); - } - }); - } - } - - /** - * 删除目录及其所有内容 - */ - @Transactional - public void deleteDirectory(String datasetId, String prefix) { - Dataset dataset = datasetRepository.getById(datasetId); - if (dataset == null) { - throw BusinessException.of(DataManagementErrorCode.DATASET_NOT_FOUND); - } - - prefix = Optional.ofNullable(prefix).orElse("").trim(); - prefix = prefix.replace("\\", "/"); - while (prefix.startsWith("/")) { - prefix = prefix.substring(1); - } - while (prefix.endsWith("/")) { - prefix = prefix.substring(0, prefix.length() - 1); - } - - if (prefix.isEmpty()) { - throw 
BusinessException.of(CommonErrorCode.PARAM_ERROR); - } - if (prefix.equals(INTERNAL_DIR_NAME) || prefix.startsWith(INTERNAL_DIR_NAME + "/")) { - throw BusinessException.of(CommonErrorCode.PARAM_ERROR); - } - - String datasetPath = dataset.getPath(); - Path basePath = Paths.get(datasetPath); - Path targetPath = basePath.resolve(prefix); - Path normalized = targetPath.normalize(); - - if (!normalized.startsWith(basePath)) { - throw BusinessException.of(CommonErrorCode.PARAM_ERROR); - } - - if (!Files.exists(normalized) || !Files.isDirectory(normalized)) { - throw BusinessException.of(DataManagementErrorCode.DIRECTORY_NOT_FOUND); - } - - // 删除数据库中该目录下的所有文件记录(基于数据集内相对路径判断) - String datasetPathNorm = datasetPath.replace("\\", "/"); - String logicalPrefix = prefix; // 已经去掉首尾斜杠 - List filesToDelete = datasetFileRepository.findAllByDatasetId(datasetId).stream() - .filter(file -> { - if (file.getFilePath() == null) { - return false; - } - String filePath = file.getFilePath().replace("\\", "/"); - if (!filePath.startsWith(datasetPathNorm)) { - return false; - } - String relative = filePath.substring(datasetPathNorm.length()); - while (relative.startsWith("/")) { - relative = relative.substring(1); - } - return relative.equals(logicalPrefix) || relative.startsWith(logicalPrefix + "/"); - }) - .collect(Collectors.toList()); - - for (DatasetFile file : filesToDelete) { - datasetFileRepository.removeById(file.getId()); - datasetFilePreviewService.deletePreviewFileQuietly(datasetId, file.getId()); - } - - // 删除文件系统中的目录 - try { - deleteDirectoryRecursively(normalized); - } catch (IOException e) { - log.error("Failed to delete directory {} for dataset {}", normalized, datasetId, e); - throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR); - } - - // 更新数据集 - dataset.setFiles(filesToDelete); - for (DatasetFile file : filesToDelete) { - dataset.removeFile(file); - } - datasetRepository.updateById(dataset); - } - - /** - * 递归删除目录 - */ - private void 
deleteDirectoryRecursively(Path directory) throws IOException { - try (Stream paths = Files.walk(directory)) { - paths.sorted(Comparator.reverseOrder()) - .forEach(path -> { - try { - Files.delete(path); - } catch (IOException e) { - log.error("Failed to delete: {}", path, e); - } - }); - } - } - - /** - * 复制文件到数据集目录 - * - * @param datasetId 数据集id - * @param req 复制文件请求 - * @return 复制的文件列表 - */ - @Transactional - public List copyFilesToDatasetDir(String datasetId, CopyFilesRequest req) { - Dataset dataset = datasetRepository.getById(datasetId); - BusinessAssert.notNull(dataset, SystemErrorCode.RESOURCE_NOT_FOUND); - Path datasetRoot = resolveDatasetRootPath(dataset, datasetId); - dataset.setFiles(new ArrayList<>(datasetFileRepository.findAllVisibleByDatasetId(datasetId))); - List copiedFiles = new ArrayList<>(); - for (String sourceFilePath : req.sourcePaths()) { - Path sourcePath = Paths.get(sourceFilePath).toAbsolutePath().normalize(); - if (!Files.exists(sourcePath) || !Files.isRegularFile(sourcePath)) { - log.warn("Source file does not exist or is not a regular file: {}", sourceFilePath); - continue; - } - String logicalPath = sourcePath.getFileName().toString(); - DatasetFile datasetFile = commitNewFileVersion(dataset, datasetRoot, logicalPath, sourcePath, false); - copiedFiles.add(datasetFile); - } - dataset.active(); - datasetRepository.updateById(dataset); - return copiedFiles; - } - - /** - * 复制文件到数据集目录(保留相对路径,适用于数据源导入) - * - * @param datasetId 数据集id - * @param sourceRoot 数据源根目录 - * @param sourcePaths 源文件路径列表 - * @return 复制的文件列表 - */ - @Transactional - public List copyFilesToDatasetDirWithSourceRoot(String datasetId, Path sourceRoot, List sourcePaths) { - Dataset dataset = datasetRepository.getById(datasetId); - BusinessAssert.notNull(dataset, SystemErrorCode.RESOURCE_NOT_FOUND); - Path datasetRoot = resolveDatasetRootPath(dataset, datasetId); - Path normalizedRoot = sourceRoot.toAbsolutePath().normalize(); - dataset.setFiles(new 
ArrayList<>(datasetFileRepository.findAllVisibleByDatasetId(datasetId))); - - List copiedFiles = new ArrayList<>(); - for (String sourceFilePath : sourcePaths) { - if (sourceFilePath == null || sourceFilePath.isBlank()) { - continue; - } - Path sourcePath = Paths.get(sourceFilePath).toAbsolutePath().normalize(); - if (!sourcePath.startsWith(normalizedRoot)) { - log.warn("Source file path is out of root: {}", sourceFilePath); - continue; - } - if (!Files.exists(sourcePath) || !Files.isRegularFile(sourcePath)) { - log.warn("Source file does not exist or is not a regular file: {}", sourceFilePath); - continue; - } - Path relativePath = normalizedRoot.relativize(sourcePath); - String logicalPath = relativePath.toString().replace("\\", "/"); - DatasetFile datasetFile = commitNewFileVersion(dataset, datasetRoot, logicalPath, sourcePath, false); - copiedFiles.add(datasetFile); - } - dataset.active(); - datasetRepository.updateById(dataset); - return copiedFiles; - } - - /** - * 添加文件到数据集(仅创建数据库记录,不执行文件系统操作) - * - * @param datasetId 数据集id - * @param req 添加文件请求 - * @return 添加的文件列表 - */ - @Transactional - public List addFilesToDataset(String datasetId, AddFilesRequest req) { - Dataset dataset = datasetRepository.getById(datasetId); - BusinessAssert.notNull(dataset, SystemErrorCode.RESOURCE_NOT_FOUND); - List addedFiles = new ArrayList<>(); - dataset.setFiles(new ArrayList<>(datasetFileRepository.findAllVisibleByDatasetId(datasetId))); - - boolean softAdd = req.softAdd(); - String metadata; - try { - Map metadataMap = Map.of("softAdd", softAdd); - ObjectMapper objectMapper = new ObjectMapper(); - metadata = objectMapper.writeValueAsString(metadataMap); - } catch (JsonProcessingException e) { - log.error("Failed to serialize metadataMap", e); - throw BusinessException.of(SystemErrorCode.UNKNOWN_ERROR); - } - - for (String sourceFilePath : req.sourcePaths()) { - Path sourcePath = Paths.get(sourceFilePath); - String fileName = sourcePath.getFileName().toString(); - File 
sourceFile = sourcePath.toFile(); - String logicalPath = normalizeLogicalPath(fileName); - assertNotInternalPrefix(logicalPath); - - DatasetFile latest = datasetFileRepository.findLatestByDatasetIdAndLogicalPath(datasetId, logicalPath); - if (latest == null && dataset.getFiles() != null) { - latest = dataset.getFiles().stream() - .filter(existing -> existing != null - && !isArchivedStatus(existing) - && Objects.equals(existing.getFileName(), fileName)) - .findFirst() - .orElse(null); - } - - DuplicateMethod effectiveDuplicateMethod = resolveEffectiveDuplicateMethod(); - if (latest != null && effectiveDuplicateMethod == DuplicateMethod.ERROR) { - throw BusinessException.of(DataManagementErrorCode.DATASET_FILE_ALREADY_EXISTS); - } - - long nextVersion = 1L; - if (latest != null) { - long latestVersion = Optional.ofNullable(latest.getVersion()).orElse(1L); - if (latest.getVersion() == null) { - latest.setVersion(latestVersion); - } - if (latest.getLogicalPath() == null || latest.getLogicalPath().isBlank()) { - latest.setLogicalPath(logicalPath); - } - nextVersion = latestVersion + 1L; - } - - if (latest != null && effectiveDuplicateMethod == DuplicateMethod.VERSION) { - latest.setStatus(FILE_STATUS_ARCHIVED); - datasetFileRepository.updateById(latest); - dataset.removeFile(latest); - } - - LocalDateTime currentTime = LocalDateTime.now(); - DatasetFile datasetFile = DatasetFile.builder() - .id(UUID.randomUUID().toString()) - .datasetId(datasetId) - .fileName(fileName) - .fileType(AnalyzerUtils.getExtension(fileName)) - .fileSize(sourceFile.length()) - .filePath(sourceFilePath) - .logicalPath(logicalPath) - .version(nextVersion) - .status(FILE_STATUS_ACTIVE) - .uploadTime(currentTime) - .lastAccessTime(currentTime) - .metadata(metadata) - .build(); - - datasetFileRepository.saveOrUpdate(datasetFile); - dataset.addFile(datasetFile); - addedFiles.add(datasetFile); - triggerPdfTextExtraction(dataset, datasetFile); - } - dataset.active(); - 
datasetRepository.updateById(dataset); - // Note: addFilesToDataset only creates DB records, no file system operations - // If file copy is needed, use copyFilesToDatasetDir endpoint instead - return addedFiles; - } - - private void triggerPdfTextExtraction(Dataset dataset, DatasetFile datasetFile) { - if (dataset == null || datasetFile == null) { - return; - } - if (dataset.getDatasetType() != DatasetType.TEXT) { - return; - } - String fileType = datasetFile.getFileType(); - if (fileType == null || !DOCUMENT_TEXT_FILE_TYPES.contains(fileType.toLowerCase(Locale.ROOT))) { - return; - } - String datasetId = dataset.getId(); - String fileId = datasetFile.getId(); - if (datasetId == null || fileId == null) { - return; - } - if (TransactionSynchronizationManager.isSynchronizationActive()) { - TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { - @Override - public void afterCommit() { - pdfTextExtractAsyncService.extractPdfText(datasetId, fileId); - } - }); - return; - } - pdfTextExtractAsyncService.extractPdfText(datasetId, fileId); - } -} +package com.datamate.datamanagement.application; + +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.datamate.common.domain.model.ChunkUploadPreRequest; +import com.datamate.common.domain.model.FileUploadResult; +import com.datamate.common.domain.service.FileService; +import com.datamate.common.domain.utils.AnalyzerUtils; +import com.datamate.common.domain.utils.ArchiveAnalyzer; +import com.datamate.common.infrastructure.exception.BusinessAssert; +import com.datamate.common.infrastructure.exception.BusinessException; +import com.datamate.common.infrastructure.exception.CommonErrorCode; +import com.datamate.common.infrastructure.exception.SystemErrorCode; +import com.datamate.common.interfaces.PagedResponse; +import com.datamate.common.interfaces.PagingQuery; +import 
com.datamate.datamanagement.common.enums.DuplicateMethod; +import com.datamate.datamanagement.common.enums.DatasetType; +import com.datamate.datamanagement.domain.contants.DatasetConstant; +import com.datamate.datamanagement.domain.model.dataset.Dataset; +import com.datamate.datamanagement.domain.model.dataset.DatasetFile; +import com.datamate.datamanagement.domain.model.dataset.DatasetFileUploadCheckInfo; +import com.datamate.datamanagement.infrastructure.exception.DataManagementErrorCode; +import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetFileRepository; +import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetRepository; +import com.datamate.datamanagement.interfaces.converter.DatasetConverter; +import com.datamate.datamanagement.interfaces.dto.AddFilesRequest; +import com.datamate.datamanagement.interfaces.dto.CopyFilesRequest; +import com.datamate.datamanagement.interfaces.dto.CreateDirectoryRequest; +import com.datamate.datamanagement.interfaces.dto.UploadFileRequest; +import com.datamate.datamanagement.interfaces.dto.UploadFilesPreRequest; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import jakarta.servlet.http.HttpServletResponse; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.compress.archivers.zip.ZipArchiveEntry; +import org.apache.commons.compress.archivers.zip.ZipArchiveOutputStream; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.core.io.Resource; +import org.springframework.core.io.UrlResource; +import org.springframework.http.HttpHeaders; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; +import 
org.springframework.transaction.support.TransactionSynchronization; +import org.springframework.transaction.support.TransactionSynchronizationManager; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.MalformedURLException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.attribute.BasicFileAttributes; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.*; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * 数据集文件应用服务 + */ +@Slf4j +@Service +@Transactional +public class DatasetFileApplicationService { + private static final String PDF_FILE_TYPE = "pdf"; + private static final String DOC_FILE_TYPE = "doc"; + private static final String DOCX_FILE_TYPE = "docx"; + private static final String XLS_FILE_TYPE = "xls"; + private static final String XLSX_FILE_TYPE = "xlsx"; + private static final Set DOCUMENT_TEXT_FILE_TYPES = Set.of( + PDF_FILE_TYPE, + DOC_FILE_TYPE, + DOCX_FILE_TYPE, + XLS_FILE_TYPE, + XLSX_FILE_TYPE + ); + private static final String DERIVED_METADATA_KEY = "derived_from_file_id"; + private static final String FILE_STATUS_ACTIVE = "ACTIVE"; + private static final String FILE_STATUS_ARCHIVED = "ARCHIVED"; + private static final String INTERNAL_DIR_NAME = ".datamate"; + private static final String INTERNAL_UPLOAD_DIR_NAME = "uploading"; + private static final String INTERNAL_VERSIONS_DIR_NAME = "versions"; + + private final DatasetFileRepository datasetFileRepository; + private final DatasetRepository datasetRepository; + private final FileService fileService; + private final PdfTextExtractAsyncService pdfTextExtractAsyncService; + private final DatasetFilePreviewService datasetFilePreviewService; + + @Value("${datamate.data-management.base-path:/dataset}") + private String datasetBasePath; + + 
@Value("${datamate.data-management.file.duplicate:VERSION}") + private DuplicateMethod duplicateMethod; + + @Autowired + public DatasetFileApplicationService(DatasetFileRepository datasetFileRepository, + DatasetRepository datasetRepository, + FileService fileService, + PdfTextExtractAsyncService pdfTextExtractAsyncService, + DatasetFilePreviewService datasetFilePreviewService) { + this.datasetFileRepository = datasetFileRepository; + this.datasetRepository = datasetRepository; + this.fileService = fileService; + this.pdfTextExtractAsyncService = pdfTextExtractAsyncService; + this.datasetFilePreviewService = datasetFilePreviewService; + } + + /** + * 获取数据集文件列表 + */ + @Transactional(readOnly = true) + public PagedResponse getDatasetFiles(String datasetId, String fileType, String status, String name, + Boolean hasAnnotation, PagingQuery pagingQuery) { + return getDatasetFiles(datasetId, fileType, status, name, hasAnnotation, false, pagingQuery); + } + + /** + * 获取数据集文件列表,支持排除已被转换为TXT的源文档文件 + * + * @param datasetId 数据集ID + * @param fileType 文件类型过滤 + * @param status 状态过滤 + * @param name 文件名模糊查询 + * @param hasAnnotation 是否有标注 + * @param excludeSourceDocuments 是否排除源文档(PDF/DOC/DOCX/XLS/XLSX) + * @param pagingQuery 分页参数 + * @return 分页文件列表 + */ + @Transactional(readOnly = true) + public PagedResponse getDatasetFiles(String datasetId, String fileType, String status, String name, + Boolean hasAnnotation, boolean excludeSourceDocuments, PagingQuery pagingQuery) { + IPage page = new Page<>(pagingQuery.getPage(), pagingQuery.getSize()); + IPage files = datasetFileRepository.findByCriteria(datasetId, fileType, status, name, hasAnnotation, page); + + if (excludeSourceDocuments) { + // 过滤掉源文档文件(PDF/DOC/DOCX/XLS/XLSX),用于标注场景只展示派生文件 + List filteredRecords = files.getRecords().stream() + .filter(file -> !isSourceDocument(file)) + .collect(Collectors.toList()); + + // 重新构建分页结果 + Page filteredPage = new Page<>(files.getCurrent(), files.getSize(), files.getTotal()); + 
filteredPage.setRecords(filteredRecords); + return PagedResponse.of(filteredPage); + } + + return PagedResponse.of(files); + } + + /** + * 获取数据集文件列表 + */ + @Transactional(readOnly = true) + public PagedResponse getDatasetFilesWithDirectory(String datasetId, String prefix, boolean excludeDerivedFiles, PagingQuery pagingQuery) { + Dataset dataset = datasetRepository.getById(datasetId); + int page = Math.max(pagingQuery.getPage(), 1); + int size = pagingQuery.getSize() == null || pagingQuery.getSize() < 0 ? 20 : pagingQuery.getSize(); + if (dataset == null) { + return PagedResponse.of(new Page<>(page, size)); + } + Path datasetRoot = Paths.get(dataset.getPath()).toAbsolutePath().normalize(); + prefix = Optional.ofNullable(prefix).orElse("").trim().replace("\\", "/"); + while (prefix.startsWith("/")) { + prefix = prefix.substring(1); + } + if (prefix.equals(INTERNAL_DIR_NAME) || prefix.startsWith(INTERNAL_DIR_NAME + "/")) { + return new PagedResponse<>(page, size, 0, 0, Collections.emptyList()); + } + Path queryPath = datasetRoot.resolve(prefix.replace("/", File.separator)).normalize(); + if (!queryPath.startsWith(datasetRoot)) { + return new PagedResponse<>(page, size, 0, 0, Collections.emptyList()); + } + Map datasetFilesMap = datasetFileRepository.findAllVisibleByDatasetId(datasetId) + .stream() + .filter(file -> file.getFilePath() != null) + .collect(Collectors.toMap( + file -> normalizeFilePath(file.getFilePath()), + Function.identity(), + (left, right) -> left + )); + Set derivedFilePaths = excludeDerivedFiles + ? 
datasetFilesMap.values().stream()
        // NOTE(review): generic type arguments in this file appear stripped by tooling
        // (e.g. "Stream" was presumably "Stream<Path>") — confirm against VCS history.
        .filter(this::isDerivedFile)
        .map(DatasetFile::getFilePath)
        .map(this::normalizeFilePath)
        .filter(Objects::nonNull)
        .collect(Collectors.toSet())
        : Collections.emptySet();
    // If the directory does not exist yet, return an empty page directly
    // (right after dataset creation the directory may not have been generated).
    if (!Files.exists(queryPath)) {
        return new PagedResponse<>(page, size, 0, 0, Collections.emptyList());
    }
    try (Stream pathStream = Files.list(queryPath)) {
        // Keep only entries inside the dataset root, skip the internal bookkeeping
        // directory, and optionally drop derived (auto-generated) files.
        List allFiles = pathStream
                .filter(path -> path.toAbsolutePath().normalize().startsWith(datasetRoot))
                .filter(path -> !isInternalDatasetPath(datasetRoot, path))
                .filter(path -> !excludeDerivedFiles
                        || Files.isDirectory(path)
                        || !derivedFilePaths.contains(normalizeFilePath(path.toString())))
                // Directories sort before regular files, then by file name.
                .sorted(Comparator
                        .comparing((Path path) -> !Files.isDirectory(path))
                        .thenComparing(path -> path.getFileName().toString()))
                .collect(Collectors.toList());

        // Compute pagination.
        int total = allFiles.size();
        int totalPages = (int) Math.ceil((double) total / size);

        // Slice out the requested page (page index is 1-based).
        int fromIndex = (page - 1) * size;
        fromIndex = Math.max(fromIndex, 0);
        int toIndex = Math.min(fromIndex + size, total);

        List pageData = new ArrayList<>();
        if (fromIndex < total) {
            pageData = allFiles.subList(fromIndex, toIndex);
        }
        List datasetFiles = pageData.stream()
                .map(path -> getDatasetFile(path, datasetFilesMap, excludeDerivedFiles, derivedFilePaths))
                .toList();

        return new PagedResponse<>(page, size, total, totalPages, datasetFiles);
    } catch (IOException e) {
        log.error("list dataset path error", e);
        return PagedResponse.of(new Page<>(page, size));
    }
    }

    /**
     * Builds a DatasetFile view for one directory entry: directories get an
     * aggregated file count/total size; regular files reuse the persisted DB
     * record when one exists, otherwise a synthetic entry is returned.
     */
    private DatasetFile getDatasetFile(Path path,
                                       Map datasetFilesMap,
                                       boolean excludeDerivedFiles,
                                       Set derivedFilePaths) {
        DatasetFile datasetFile = new DatasetFile();
        LocalDateTime localDateTime = LocalDateTime.now();
        try {
            localDateTime = Files.getLastModifiedTime(path).toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime();
        } catch (IOException e) {
            log.error("get last modified time error", e);
        }
        datasetFile.setFileName(path.getFileName().toString());
        datasetFile.setUploadTime(localDateTime);

        // Directories and regular files are handled separately.
        if (Files.isDirectory(path)) {
            datasetFile.setId("directory-" + datasetFile.getFileName());
            datasetFile.setDirectory(true);

            // Aggregate the file count and total size under this directory.
            try {
                long fileCount;
                long totalSize;

                try (Stream walk = Files.walk(path)) {
                    Stream fileStream = walk.filter(Files::isRegularFile);
                    if (excludeDerivedFiles && !derivedFilePaths.isEmpty()) {
                        fileStream = fileStream.filter(filePath ->
                                !derivedFilePaths.contains(normalizeFilePath(filePath.toString())));
                    }
                    fileCount = fileStream.count();
                }

                // Streams are one-shot, so the size pass walks the tree a second time.
                try (Stream walk = Files.walk(path)) {
                    Stream fileStream = walk.filter(Files::isRegularFile);
                    if (excludeDerivedFiles && !derivedFilePaths.isEmpty()) {
                        fileStream = fileStream.filter(filePath ->
                                !derivedFilePaths.contains(normalizeFilePath(filePath.toString())));
                    }
                    totalSize = fileStream
                            .mapToLong(p -> {
                                try {
                                    return Files.size(p);
                                } catch (IOException e) {
                                    log.error("get file size error", e);
                                    return 0L;
                                }
                            })
                            .sum();
                }

                datasetFile.setFileCount(fileCount);
                datasetFile.setFileSize(totalSize);
            } catch (IOException e) {
                log.error("stat directory info error", e);
            }
        } else {
            // Prefer the persisted record; fall back to a synthetic entry for
            // on-disk files that have no DB row.
            DatasetFile exist = datasetFilesMap.get(normalizeFilePath(path.toString()));
            if (exist == null) {
                datasetFile.setId("file-" + datasetFile.getFileName());
                datasetFile.setFileSize(path.toFile().length());
            } else {
                datasetFile = exist;
            }
        }
        return datasetFile;
    }

    /**
     * Normalizes a file-system path to absolute + normalized string form;
     * returns null for null/blank input, and falls back to separator
     * normalization when the path cannot be parsed.
     */
    private String normalizeFilePath(String filePath) {
        if (filePath == null || filePath.isBlank()) {
            return null;
        }
        try {
            return Paths.get(filePath).toAbsolutePath().normalize().toString();
        } catch (Exception e) {
            return filePath.replace("\\", "/");
        }
    }

    /** True when both strings normalize to the same absolute path (false if either is blank). */
    private boolean isSameNormalizedPath(String left, String right) {
        String normalizedLeft = normalizeFilePath(left);
        String normalizedRight =
                normalizeFilePath(right);
        if (normalizedLeft == null || normalizedRight == null) {
            return false;
        }
        return normalizedLeft.equals(normalizedRight);
    }

    /**
     * True when {@code path} lies under the dataset's internal bookkeeping
     * directory ({@code INTERNAL_DIR_NAME}) inside {@code datasetRoot}.
     * Defensive: returns false on nulls, paths outside the root, or errors.
     */
    private boolean isInternalDatasetPath(Path datasetRoot, Path path) {
        if (datasetRoot == null || path == null) {
            return false;
        }
        try {
            Path normalizedRoot = datasetRoot.toAbsolutePath().normalize();
            Path normalizedPath = path.toAbsolutePath().normalize();
            if (!normalizedPath.startsWith(normalizedRoot)) {
                return false;
            }
            Path relative = normalizedRoot.relativize(normalizedPath);
            if (relative.getNameCount() == 0) {
                return false;
            }
            // Internal iff the first path segment is the internal dir name.
            return INTERNAL_DIR_NAME.equals(relative.getName(0).toString());
        } catch (Exception e) {
            return false;
        }
    }

    /**
     * Normalizes a logical prefix: trims, converts backslashes to slashes,
     * strips leading/trailing slashes and collapses duplicate slashes.
     * Null maps to the empty string.
     */
    private String normalizeLogicalPrefix(String prefix) {
        if (prefix == null) {
            return "";
        }
        String normalized = prefix.trim().replace("\\", "/");
        while (normalized.startsWith("/")) {
            normalized = normalized.substring(1);
        }
        while (normalized.endsWith("/")) {
            normalized = normalized.substring(0, normalized.length() - 1);
        }
        while (normalized.contains("//")) {
            normalized = normalized.replace("//", "/");
        }
        return normalized;
    }

    /** Logical paths share the same normalization rules as logical prefixes. */
    private String normalizeLogicalPath(String logicalPath) {
        return normalizeLogicalPrefix(logicalPath);
    }

    /** Joins a logical prefix and a relative path with "/", normalizing both sides. */
    private String joinLogicalPath(String prefix, String relativePath) {
        String normalizedPrefix = normalizeLogicalPrefix(prefix);
        String normalizedRelative = normalizeLogicalPath(relativePath);
        if (normalizedPrefix.isEmpty()) {
            return normalizedRelative;
        }
        if (normalizedRelative.isEmpty()) {
            return normalizedPrefix;
        }
        return normalizeLogicalPath(normalizedPrefix + "/" + normalizedRelative);
    }

    /**
     * Rejects prefixes that point at (or into) the internal bookkeeping
     * directory; blank prefixes are allowed.
     */
    private void assertNotInternalPrefix(String prefix) {
        if (prefix == null || prefix.isBlank()) {
            return;
        }
        String normalized = normalizeLogicalPrefix(prefix);
        if (normalized.equals(INTERNAL_DIR_NAME) || normalized.startsWith(INTERNAL_DIR_NAME + "/")) {
            throw
                    BusinessException.of(CommonErrorCode.PARAM_ERROR);
        }
    }

    /** True when the file record's status equals FILE_STATUS_ARCHIVED (case-insensitive). */
    private boolean isArchivedStatus(DatasetFile datasetFile) {
        if (datasetFile == null) {
            return false;
        }
        String status = datasetFile.getStatus();
        return status != null && FILE_STATUS_ARCHIVED.equalsIgnoreCase(status);
    }

    /** True when the file's type is one of the source-document text types (e.g. has derived text). */
    private boolean isSourceDocument(DatasetFile datasetFile) {
        if (datasetFile == null) {
            return false;
        }
        String fileType = datasetFile.getFileType();
        if (fileType == null || fileType.isBlank()) {
            return false;
        }
        return DOCUMENT_TEXT_FILE_TYPES.contains(fileType.toLowerCase(Locale.ROOT));
    }

    /**
     * True when the file's JSON metadata carries the derived-file marker key.
     * Parsing failures are treated as "not derived".
     */
    private boolean isDerivedFile(DatasetFile datasetFile) {
        if (datasetFile == null) {
            return false;
        }
        String metadata = datasetFile.getMetadata();
        if (metadata == null || metadata.isBlank()) {
            return false;
        }
        try {
            ObjectMapper mapper = new ObjectMapper();
            // NOTE(review): "TypeReference>" looks like a stripped
            // "TypeReference<Map<String, Object>>" — confirm against VCS.
            Map metadataMap = mapper.readValue(metadata, new TypeReference>() {});
            return metadataMap.get(DERIVED_METADATA_KEY) != null;
        } catch (Exception e) {
            log.debug("Failed to parse dataset file metadata for derived detection: {}", datasetFile.getId(), e);
            return false;
        }
    }

    /**
     * Resolves (and creates if necessary) the dataset's root directory.
     * When the dataset has no stored path, a default of
     * datasetBasePath/datasetId is assigned and persisted.
     */
    private Path resolveDatasetRootPath(Dataset dataset, String datasetId) {
        String datasetPath = dataset == null ?
null : dataset.getPath();
        if (datasetPath == null || datasetPath.isBlank()) {
            // No stored path: derive the default location and persist it.
            datasetPath = datasetBasePath + File.separator + datasetId;
            if (dataset != null) {
                dataset.setPath(datasetPath);
                datasetRepository.updateById(dataset);
            }
        }
        Path datasetRoot = Paths.get(datasetPath).toAbsolutePath().normalize();
        try {
            Files.createDirectories(datasetRoot);
        } catch (IOException e) {
            log.error("Failed to create dataset root dir: {}", datasetRoot, e);
            throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR);
        }
        return datasetRoot;
    }

    /**
     * Resolves the staging directory for an upload batch. Prefers the path
     * recorded in checkInfo (validated to be datasetRoot/INTERNAL/UPLOAD/...),
     * otherwise falls back to the parent directory of the first uploaded file.
     * Returns null when no trustworthy staging root can be determined.
     */
    private Path resolveStagingRootPath(Path datasetRoot,
                                        DatasetFileUploadCheckInfo checkInfo,
                                        List uploadedFiles) {
        if (datasetRoot == null) {
            return null;
        }
        String stagingPath = checkInfo == null ? null : checkInfo.getStagingPath();
        if (stagingPath != null && !stagingPath.isBlank()) {
            try {
                Path stagingRoot = Paths.get(stagingPath).toAbsolutePath().normalize();
                if (!stagingRoot.startsWith(datasetRoot)) {
                    log.warn("Staging root out of dataset root, datasetId={}, stagingRoot={}, datasetRoot={}",
                            checkInfo == null ? null : checkInfo.getDatasetId(), stagingRoot, datasetRoot);
                    return null;
                }
                Path relative = datasetRoot.relativize(stagingRoot);
                // Expect at least INTERNAL/UPLOAD/<uuid> below the dataset root.
                if (relative.getNameCount() < 3) {
                    return null;
                }
                if (!INTERNAL_DIR_NAME.equals(relative.getName(0).toString())
                        || !INTERNAL_UPLOAD_DIR_NAME.equals(relative.getName(1).toString())) {
                    return null;
                }
                return stagingRoot;
            } catch (Exception e) {
                log.warn("Invalid staging path: {}", stagingPath, e);
                return null;
            }
        }
        if (uploadedFiles == null || uploadedFiles.isEmpty()) {
            return null;
        }
        // Fallback: infer the staging root from the first saved file's parent dir.
        FileUploadResult firstResult = uploadedFiles.get(0);
        File firstFile = firstResult == null ?
                null : firstResult.getSavedFile();
        if (firstFile == null) {
            return null;
        }
        try {
            return Paths.get(firstFile.getParent()).toAbsolutePath().normalize();
        } catch (Exception e) {
            return null;
        }
    }

    /**
     * Deletes the staging directory after the surrounding transaction commits;
     * runs immediately when no transaction synchronization is active.
     */
    private void scheduleCleanupStagingDirAfterCommit(Path stagingRoot) {
        if (stagingRoot == null) {
            return;
        }
        Runnable cleanup = () -> deleteDirectoryRecursivelyQuietly(stagingRoot);
        if (TransactionSynchronizationManager.isSynchronizationActive()) {
            TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
                @Override
                public void afterCommit() {
                    cleanup.run();
                }
            });
            return;
        }
        cleanup.run();
    }

    /** Best-effort recursive delete: failures are logged at debug, never thrown. */
    private void deleteDirectoryRecursivelyQuietly(Path directory) {
        if (directory == null) {
            return;
        }
        if (!Files.exists(directory)) {
            return;
        }
        try (Stream paths = Files.walk(directory)) {
            // Reverse order so children are removed before their parents.
            paths.sorted(Comparator.reverseOrder()).forEach(path -> {
                try {
                    Files.deleteIfExists(path);
                } catch (IOException e) {
                    log.debug("Failed to delete: {}", path, e);
                }
            });
        } catch (IOException e) {
            log.debug("Failed to cleanup staging dir: {}", directory, e);
        }
    }

    /**
     * Replaces control characters and file-system-unsafe characters with '_';
     * blank input (or an all-unsafe result) collapses to "file".
     */
    private String sanitizeArchiveFileName(String fileName) {
        String input = fileName == null ? "" : fileName.trim();
        if (input.isBlank()) {
            return "file";
        }
        StringBuilder builder = new StringBuilder(input.length());
        for (int i = 0; i < input.length(); i++) {
            char c = input.charAt(i);
            if (c <= 31 || c == 127) {
                builder.append('_');
                continue;
            }
            if (c == '/' || c == '\\' || c == ':' || c == '*' || c == '?' || c == '\"'
                    || c == '<' || c == '>' || c == '|') {
                builder.append('_');
                continue;
            }
            builder.append(c);
        }
        String sanitized = builder.toString().trim();
        return sanitized.isEmpty() ? "file" : sanitized;
    }

    /**
     * Lower-case hex SHA-256 of the UTF-8 bytes of {@code value} (null treated
     * as empty); falls back to the string's hashCode in hex if SHA-256 is
     * unavailable.
     */
    private String sha256Hex(String value) {
        String input = value == null ?
                "" : value;
        try {
            java.security.MessageDigest digest = java.security.MessageDigest.getInstance("SHA-256");
            byte[] hashed = digest.digest(input.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            StringBuilder builder = new StringBuilder(hashed.length * 2);
            for (byte b : hashed) {
                builder.append(String.format("%02x", b));
            }
            return builder.toString();
        } catch (Exception e) {
            return Integer.toHexString(input.hashCode());
        }
    }

    /**
     * Fetches a single file record and verifies it belongs to the given dataset.
     *
     * @throws IllegalArgumentException when the file is missing or belongs to another dataset
     */
    @Transactional(readOnly = true)
    public DatasetFile getDatasetFile(String datasetId, String fileId) {
        DatasetFile file = datasetFileRepository.getById(fileId);
        if (file == null) {
            throw new IllegalArgumentException("File not found: " + fileId);
        }
        if (!file.getDatasetId().equals(datasetId)) {
            throw new IllegalArgumentException("File does not belong to the specified dataset");
        }
        return file;
    }

    /**
     * Deletes a file. When the record has a logical path, every version
     * sharing that logical path is deleted as well.
     */
    @Transactional
    public void deleteDatasetFile(String datasetId, String fileId) {
        DatasetFile file = getDatasetFile(datasetId, fileId);
        if (file == null) {
            log.warn("File not found: datasetId={}, fileId={}", datasetId, fileId);
            return;
        }
        String logicalPath = file.getLogicalPath();

        // If logicalPath is null, empty or whitespace-only, delete just this
        // record (compatibility with legacy data that predates logical paths).
        if (StringUtils.isBlank(logicalPath)) {
            deleteDatasetFileInternal(datasetId, file);
            return;
        }

        List allVersions = datasetFileRepository.findAllByDatasetIdAndLogicalPath(datasetId, logicalPath);

        for (DatasetFile versionFile : allVersions) {
            deleteDatasetFileInternal(datasetId, versionFile);
        }
    }

    /**
     * Deletes one file record plus its derived files, preview, dataset
     * membership and the physical file (when it lives under the dataset path).
     */
    private void deleteDatasetFileInternal(String datasetId, DatasetFile file) {
        Dataset dataset = datasetRepository.getById(datasetId);
        if (file == null || dataset == null) {
            return;
        }

        // Delete the DB record first; only clean up derived artifacts after
        // the database operation succeeded.
        try {
            datasetFileRepository.removeById(file.getId());
        } catch (Exception e) {
            log.error("Failed to delete file record from database: fileId={}", file.getId(), e);
            // If the DB delete failed, skip the remaining cleanup to avoid
            // leaving the data in an inconsistent state.
            return;
        }

        // DB delete succeeded — now remove derived files.
        if (isSourceDocument(file)) {
            deleteDerivedTextFileQuietly(datasetId, file.getId());
        }

        if (!isArchivedStatus(file)) {
            try {
                dataset.setFiles(new ArrayList<>(Collections.singleton(file)));
                dataset.removeFile(file);
                datasetRepository.updateById(dataset);
            } catch (Exception e) {
                log.error("Failed to update dataset: datasetId={}", datasetId, e);
            }
        }

        datasetFilePreviewService.deletePreviewFileQuietly(datasetId, file.getId());

        // Only remove the physical file when it is located under the dataset path.
        if (file.getFilePath() != null && file.getFilePath().startsWith(dataset.getPath())) {
            try {
                Path filePath = Paths.get(file.getFilePath());
                Files.deleteIfExists(filePath);
            } catch (IOException ex) {
                log.error("Failed to delete physical file: filePath={}", file.getFilePath(), ex);
            }
        }
    }

    /** Best-effort deletion of all files derived from the given source file. */
    private void deleteDerivedTextFileQuietly(String datasetId, String sourceFileId) {
        if (sourceFileId == null || sourceFileId.isBlank()) {
            return;
        }

        try {
            List derivedFiles = datasetFileRepository.findAllByDatasetId(datasetId).stream()
                    .filter(f -> isDerivedFileFromSource(f, sourceFileId))
                    .toList();

            for (DatasetFile derivedFile : derivedFiles) {
                deleteDatasetFileInternal(datasetId, derivedFile);
            }
        } catch (Exception e) {
            log.error("Failed to delete derived text files for sourceFileId: {}", sourceFileId, e);
        }
    }

    /**
     * True when the file's metadata marks it as derived from the given source
     * file id; parsing failures are treated as "not derived".
     */
    private boolean isDerivedFileFromSource(DatasetFile file, String sourceFileId) {
        if (file == null || file.getMetadata() == null || file.getMetadata().isBlank()) {
            return false;
        }
        try {
            ObjectMapper mapper = new ObjectMapper();
            Map metadataMap = mapper.readValue(file.getMetadata(), new TypeReference>() {});
            Object derivedFromFileId = metadataMap.get(DERIVED_METADATA_KEY);
            return derivedFromFileId != null && sourceFileId.equals(String.valueOf(derivedFromFileId));
        } catch (Exception e) {
            log.debug("Failed to parse metadata for derived detection: fileId={}", file.getId(), e);
            return false;
        }
}

    /**
     * Downloads a single file as a Spring Resource.
     */
    @Transactional(readOnly = true)
    public Resource downloadFile(String datasetId, String fileId) {
        DatasetFile file = getDatasetFile(datasetId, fileId);
        try {
            Path filePath = Paths.get(file.getFilePath()).normalize();
            Resource resource = new UrlResource(filePath.toUri());
            if (resource.exists()) {
                return resource;
            } else {
                throw new RuntimeException("File not found: " + file.getFileName());
            }
        } catch (MalformedURLException ex) {
            throw new RuntimeException("File not found: " + file.getFileName(), ex);
        }
    }

    /**
     * Streams all visible dataset files to the HTTP response as a single ZIP,
     * excluding the internal bookkeeping directory.
     */
    @Transactional(readOnly = true)
    public void downloadDatasetFileAsZip(String datasetId, HttpServletResponse response) {
        Dataset dataset = datasetRepository.getById(datasetId);
        if (Objects.isNull(dataset)) {
            throw BusinessException.of(DataManagementErrorCode.DATASET_NOT_FOUND);
        }
        Path datasetRoot = Paths.get(dataset.getPath()).toAbsolutePath().normalize();
        // Normalized physical paths of all visible files inside the dataset root.
        Set filePaths = datasetFileRepository.findAllVisibleByDatasetId(datasetId).stream()
                .map(DatasetFile::getFilePath)
                .filter(Objects::nonNull)
                .map(path -> Paths.get(path).toAbsolutePath().normalize())
                .filter(path -> path.startsWith(datasetRoot))
                .filter(path -> !isInternalDatasetPath(datasetRoot, path))
                .collect(Collectors.toSet());
        Path downloadPath = datasetRoot;
        response.setContentType("application/zip");
        String zipName = String.format("dataset_%s.zip",
                LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss")));
        response.setHeader(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=" + zipName);
        try (ZipArchiveOutputStream zos = new ZipArchiveOutputStream(response.getOutputStream())) {
            try (Stream pathStream = Files.walk(downloadPath)) {
                // Keep a walked path only if some visible file lives at or below it
                // (this also keeps the ancestor directories of each file).
                List allPaths = pathStream
                        .map(path -> path.toAbsolutePath().normalize())
                        .filter(path -> path.startsWith(datasetRoot))
                        .filter(path -> !isInternalDatasetPath(datasetRoot, path))
                        .filter(path -> filePaths.stream().anyMatch(filePath ->
                                filePath.startsWith(path)))
                        .toList();
                for (Path path : allPaths) {
                    addToZipFile(path, downloadPath, zos);
                }
            }
        } catch (IOException e) {
            log.error("Failed to download files in batches.", e);
            throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR);
        }
    }

    /**
     * Adds one path (directory entry or file contents) to the ZIP stream,
     * using its path relative to {@code basePath} as the entry name.
     */
    private void addToZipFile(Path path, Path basePath, ZipArchiveOutputStream zos) throws IOException {
        String entryName = basePath.relativize(path)
                .toString()
                .replace(File.separator, "/");

        // Directory entries.
        if (Files.isDirectory(path)) {
            if (!entryName.isEmpty()) {
                entryName += "/";
                ZipArchiveEntry dirEntry = new ZipArchiveEntry(entryName);
                zos.putArchiveEntry(dirEntry);
                zos.closeArchiveEntry();
            }
        } else {
            // Regular file entries.
            ZipArchiveEntry fileEntry = new ZipArchiveEntry(path.toFile(), entryName);

            // Carry over size and last-modified time.
            BasicFileAttributes attrs = Files.readAttributes(path, BasicFileAttributes.class);
            fileEntry.setSize(attrs.size());
            fileEntry.setLastModifiedTime(attrs.lastModifiedTime());

            zos.putArchiveEntry(fileEntry);

            try (InputStream is = Files.newInputStream(path)) {
                IOUtils.copy(is, zos);
            }
            zos.closeArchiveEntry();
        }
    }

    /**
     * Pre-upload: allocates a unique staging directory under the dataset's
     * internal upload area and registers the batch with the file service.
     *
     * @param chunkUploadRequest upload request
     * @param datasetId dataset id
     * @return upload request id
     */
    @Transactional
    public String preUpload(UploadFilesPreRequest chunkUploadRequest, String datasetId) {
        Dataset dataset = datasetRepository.getById(datasetId);
        if (Objects.isNull(dataset)) {
            throw BusinessException.of(DataManagementErrorCode.DATASET_NOT_FOUND);
        }

        String prefix = normalizeLogicalPrefix(chunkUploadRequest == null ?
                null : chunkUploadRequest.getPrefix());
        assertNotInternalPrefix(prefix);

        Path datasetRoot = resolveDatasetRootPath(dataset, datasetId);
        // Staging layout: <datasetRoot>/<internal>/<upload>/<random-uuid>
        Path stagingRoot = datasetRoot
                .resolve(INTERNAL_DIR_NAME)
                .resolve(INTERNAL_UPLOAD_DIR_NAME)
                .resolve(UUID.randomUUID().toString())
                .toAbsolutePath()
                .normalize();
        BusinessAssert.isTrue(stagingRoot.startsWith(datasetRoot), CommonErrorCode.PARAM_ERROR);
        try {
            Files.createDirectories(stagingRoot);
        } catch (IOException e) {
            log.error("Failed to create staging dir: {}", stagingRoot, e);
            throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR);
        }

        ChunkUploadPreRequest request = ChunkUploadPreRequest.builder().build();
        request.setUploadPath(stagingRoot.toString());
        request.setTotalFileNum(chunkUploadRequest.getTotalFileNum());
        request.setServiceId(DatasetConstant.SERVICE_ID);
        // checkInfo is round-tripped through the file service and validated on commit.
        DatasetFileUploadCheckInfo checkInfo = new DatasetFileUploadCheckInfo();
        checkInfo.setDatasetId(datasetId);
        checkInfo.setHasArchive(chunkUploadRequest.isHasArchive());
        checkInfo.setPrefix(prefix);
        checkInfo.setStagingPath(stagingRoot.toString());
        try {
            ObjectMapper objectMapper = new ObjectMapper();
            String checkInfoJson = objectMapper.writeValueAsString(checkInfo);
            request.setCheckInfo(checkInfoJson);
        } catch (JsonProcessingException e) {
            log.warn("Failed to serialize checkInfo to JSON", e);
        }
        return fileService.preUpload(request);
    }

    /**
     * Chunked upload: forwards one chunk to the file service and persists the
     * resulting file(s) once complete.
     *
     * @param uploadFileRequest upload request
     */
    @Transactional
    public void chunkUpload(String datasetId, UploadFileRequest uploadFileRequest) {
        FileUploadResult uploadResult = fileService.chunkUpload(DatasetConverter.INSTANCE.toChunkUploadRequest(uploadFileRequest));
        saveFileInfoToDb(uploadResult, datasetId);
    }

    /**
     * Cancels an in-flight upload request.
     */
    @Transactional
    public void cancelUpload(String reqId) {
        fileService.cancelUpload(reqId);
    }

    /**
     * Persists a completed upload: validates the round-tripped checkInfo,
     * expands archives when requested, and commits the resulting files.
     */
    private void saveFileInfoToDb(FileUploadResult fileUploadResult, String datasetId) {
        if
                (Objects.isNull(fileUploadResult.getSavedFile())) {
            // The chunked upload of this file has not completed yet.
            return;
        }
        DatasetFileUploadCheckInfo checkInfo;
        try {
            ObjectMapper objectMapper = new ObjectMapper();
            checkInfo = objectMapper.readValue(fileUploadResult.getCheckInfo(), DatasetFileUploadCheckInfo.class);
            // The checkInfo must reference the dataset being uploaded to.
            if (!Objects.equals(checkInfo.getDatasetId(), datasetId)) {
                throw BusinessException.of(DataManagementErrorCode.DATASET_NOT_FOUND);
            }
        } catch (IllegalArgumentException | JsonProcessingException e) {
            log.warn("Failed to convert checkInfo to DatasetFileUploadCheckInfo", e);
            throw BusinessException.of(CommonErrorCode.PRE_UPLOAD_REQUEST_NOT_EXIST);
        }
        List files;
        if (checkInfo.isHasArchive() && AnalyzerUtils.isPackage(fileUploadResult.getSavedFile().getPath())) {
            // Expand the archive into its individual files.
            files = ArchiveAnalyzer.process(fileUploadResult);
        } else {
            files = Collections.singletonList(fileUploadResult);
        }
        commitUploadedFiles(datasetId, checkInfo, files, fileUploadResult.isAllFilesUploaded());
    }

    /**
     * Commits a batch of staged uploads into the dataset, optionally cleaning
     * up the staging directory after the transaction commits.
     */
    private void commitUploadedFiles(String datasetId,
                                     DatasetFileUploadCheckInfo checkInfo,
                                     List uploadedFiles,
                                     boolean cleanupStagingAfterCommit) {
        Dataset dataset = datasetRepository.getById(datasetId);
        BusinessAssert.notNull(dataset, DataManagementErrorCode.DATASET_NOT_FOUND);

        Path datasetRoot = resolveDatasetRootPath(dataset, datasetId);
        String prefix = checkInfo == null ?
"" : normalizeLogicalPrefix(checkInfo.getPrefix());
        assertNotInternalPrefix(prefix);

        Path stagingRoot = resolveStagingRootPath(datasetRoot, checkInfo, uploadedFiles);
        BusinessAssert.notNull(stagingRoot, CommonErrorCode.PARAM_ERROR);

        // Refresh the in-memory file list before applying the batch.
        dataset.setFiles(new ArrayList<>(datasetFileRepository.findAllVisibleByDatasetId(datasetId)));
        for (FileUploadResult fileResult : uploadedFiles) {
            commitSingleUploadedFile(dataset, datasetRoot, stagingRoot, prefix, fileResult);
        }

        dataset.active();
        datasetRepository.updateById(dataset);

        if (cleanupStagingAfterCommit) {
            scheduleCleanupStagingDirAfterCommit(stagingRoot);
        }
    }

    /**
     * Moves one staged file into the dataset under its logical path
     * (prefix + path relative to the staging root).
     */
    private void commitSingleUploadedFile(Dataset dataset,
                                          Path datasetRoot,
                                          Path stagingRoot,
                                          String prefix,
                                          FileUploadResult fileResult) {
        if (dataset == null || fileResult == null || fileResult.getSavedFile() == null) {
            return;
        }
        Path incomingPath = Paths.get(fileResult.getSavedFile().getPath()).toAbsolutePath().normalize();
        // The staged file must live inside the staging root (path-traversal guard).
        BusinessAssert.isTrue(incomingPath.startsWith(stagingRoot), CommonErrorCode.PARAM_ERROR);

        String relativePath = stagingRoot.relativize(incomingPath).toString().replace(File.separator, "/");
        String logicalPath = joinLogicalPath(prefix, relativePath);
        assertNotInternalPrefix(logicalPath);

        commitNewFileVersion(dataset, datasetRoot, logicalPath, incomingPath, true);
    }

    /**
     * Core versioned commit: places {@code incomingFilePath} at the physical
     * location derived from {@code logicalPath}, archiving any previous
     * version (or orphan file) first, then persists a new DatasetFile record.
     *
     * @param moveIncoming move the incoming file when true, copy when false
     * @return the newly created DatasetFile record
     */
    private DatasetFile commitNewFileVersion(Dataset dataset,
                                             Path datasetRoot,
                                             String logicalPath,
                                             Path incomingFilePath,
                                             boolean moveIncoming) {
        BusinessAssert.notNull(dataset, CommonErrorCode.PARAM_ERROR);
        BusinessAssert.isTrue(datasetRoot != null && Files.exists(datasetRoot), CommonErrorCode.PARAM_ERROR);

        String normalizedLogicalPath = normalizeLogicalPath(logicalPath);
        BusinessAssert.isTrue(!normalizedLogicalPath.isEmpty(), CommonErrorCode.PARAM_ERROR);
        assertNotInternalPrefix(normalizedLogicalPath);

        // Physical target must resolve inside the dataset root (traversal guard).
        Path targetFilePath = datasetRoot.resolve(normalizedLogicalPath.replace("/", File.separator))
                .toAbsolutePath()
                .normalize();
        BusinessAssert.isTrue(targetFilePath.startsWith(datasetRoot), CommonErrorCode.PARAM_ERROR);

        DuplicateMethod effectiveDuplicateMethod = resolveEffectiveDuplicateMethod();
        DatasetFile latest = datasetFileRepository.findLatestByDatasetIdAndLogicalPath(dataset.getId(), normalizedLogicalPath);
        if (latest == null && dataset.getFiles() != null) {
            // Legacy fallback: match an existing record by physical path.
            latest = dataset.getFiles().stream()
                    .filter(existing -> isSameNormalizedPath(existing == null ? null : existing.getFilePath(), targetFilePath.toString()))
                    .findFirst()
                    .orElse(null);
        }
        if (latest != null && effectiveDuplicateMethod == DuplicateMethod.ERROR) {
            throw BusinessException.of(DataManagementErrorCode.DATASET_FILE_ALREADY_EXISTS);
        }

        long nextVersion = 1L;
        if (latest != null) {
            // Backfill missing version/logicalPath on legacy records.
            long latestVersion = Optional.ofNullable(latest.getVersion()).orElse(1L);
            if (latest.getVersion() == null) {
                latest.setVersion(latestVersion);
            }
            if (latest.getLogicalPath() == null || latest.getLogicalPath().isBlank()) {
                latest.setLogicalPath(normalizedLogicalPath);
            }
            nextVersion = latestVersion + 1L;
        }

        if (latest != null && effectiveDuplicateMethod == DuplicateMethod.VERSION) {
            // Move the previous version into the internal versions area before
            // overwriting its physical location.
            Path archivedPath = archiveDatasetFileVersion(datasetRoot, normalizedLogicalPath, latest);
            if (archivedPath != null) {
                latest.setFilePath(archivedPath.toString());
            } else if (Files.exists(targetFilePath)) {
                log.error("Failed to archive latest file, refuse to overwrite. datasetId={}, fileId={}, logicalPath={}, targetPath={}",
                        dataset.getId(), latest.getId(), normalizedLogicalPath, targetFilePath);
                throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR);
            }
            latest.setStatus(FILE_STATUS_ARCHIVED);
            datasetFileRepository.updateById(latest);
            dataset.removeFile(latest);
        } else if (latest == null && Files.exists(targetFilePath)) {
            // A file exists on disk without a DB record: stash it aside.
            archiveOrphanTargetFile(datasetRoot, normalizedLogicalPath, targetFilePath);
        }

        try {
            Files.createDirectories(targetFilePath.getParent());
            if (moveIncoming) {
                Files.move(incomingFilePath, targetFilePath, java.nio.file.StandardCopyOption.REPLACE_EXISTING);
            } else {
                Files.copy(incomingFilePath, targetFilePath, java.nio.file.StandardCopyOption.REPLACE_EXISTING);
            }
        } catch (IOException e) {
            log.error("Failed to write dataset file, datasetId={}, logicalPath={}, targetPath={}",
                    dataset.getId(), normalizedLogicalPath, targetFilePath, e);
            throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR);
        }

        LocalDateTime currentTime = LocalDateTime.now();
        String fileName = targetFilePath.getFileName().toString();
        long fileSize;
        try {
            fileSize = Files.size(targetFilePath);
        } catch (IOException e) {
            fileSize = 0L;
        }

        DatasetFile datasetFile = DatasetFile.builder()
                .id(UUID.randomUUID().toString())
                .datasetId(dataset.getId())
                .fileName(fileName)
                .fileType(AnalyzerUtils.getExtension(fileName))
                .fileSize(fileSize)
                .filePath(targetFilePath.toString())
                .logicalPath(normalizedLogicalPath)
                .version(nextVersion)
                .status(FILE_STATUS_ACTIVE)
                .uploadTime(currentTime)
                .lastAccessTime(currentTime)
                .build();

        datasetFileRepository.saveOrUpdate(datasetFile);
        dataset.addFile(datasetFile);
        triggerPdfTextExtraction(dataset, datasetFile);
        return datasetFile;
    }

    /**
     * Effective duplicate policy: defaults to VERSION; COVER is downgraded to
     * VERSION because overwriting would corrupt annotation references.
     */
    private DuplicateMethod resolveEffectiveDuplicateMethod() {
        if (duplicateMethod == null) {
            return DuplicateMethod.VERSION;
        }
        if (duplicateMethod == DuplicateMethod.COVER) {
            log.warn("duplicateMethod=COVER 会导致标注引用的 fileId 对应内容被覆盖,已强制按 VERSION 处理。");
            return DuplicateMethod.VERSION;
        }
        return duplicateMethod;
    }

    /**
     * Moves the current physical file of {@code latest} into the internal
     * versions area: <root>/<internal>/<versions>/<sha256(logicalPath)>/v<N>/.
     * Returns the archived path, or null when archiving is skipped.
     */
    private Path archiveDatasetFileVersion(Path datasetRoot, String logicalPath, DatasetFile latest) {
        if (latest == null || latest.getId() == null || latest.getId().isBlank()) {
            return null;
        }
        Path currentPath;
        try {
            currentPath = Paths.get(latest.getFilePath()).toAbsolutePath().normalize();
        } catch (Exception e) {
            log.warn("Invalid latest file path, skip archiving. datasetId={}, fileId={}, filePath={}",
                    latest.getDatasetId(), latest.getId(), latest.getFilePath());
            return null;
        }

        if (!Files.exists(currentPath) || !Files.isRegularFile(currentPath)) {
            log.warn("Latest file not found on disk, skip archiving. datasetId={}, fileId={}, filePath={}",
                    latest.getDatasetId(), latest.getId(), currentPath);
            return null;
        }
        if (!currentPath.startsWith(datasetRoot)) {
            log.warn("Latest file path out of dataset root, skip archiving. datasetId={}, fileId={}, filePath={}",
                    latest.getDatasetId(), latest.getId(), currentPath);
            return null;
        }

        long latestVersion = Optional.ofNullable(latest.getVersion()).orElse(1L);
        String logicalPathHash = sha256Hex(logicalPath);
        Path archiveDir = datasetRoot
                .resolve(INTERNAL_DIR_NAME)
                .resolve(INTERNAL_VERSIONS_DIR_NAME)
                .resolve(logicalPathHash)
                .resolve("v" + latestVersion)
                .toAbsolutePath()
                .normalize();
        BusinessAssert.isTrue(archiveDir.startsWith(datasetRoot), CommonErrorCode.PARAM_ERROR);

        try {
            Files.createDirectories(archiveDir);
        } catch (IOException e) {
            log.error("Failed to create archive dir: {}", archiveDir, e);
            throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR);
        }

        // Archive entry name: <fileId>__<sanitized original name>.
        String fileName = sanitizeArchiveFileName(Optional.ofNullable(latest.getFileName()).orElse(currentPath.getFileName().toString()));
        Path archivedPath = archiveDir.resolve(latest.getId() + "__" + fileName).toAbsolutePath().normalize();
        BusinessAssert.isTrue(archivedPath.startsWith(archiveDir), CommonErrorCode.PARAM_ERROR);

        try {
            Files.move(currentPath, archivedPath, java.nio.file.StandardCopyOption.REPLACE_EXISTING);
            return archivedPath;
        } catch (IOException e) {
            log.error("Failed to archive latest file, datasetId={}, fileId={}, from={}, to={}",
                    latest.getDatasetId(), latest.getId(), currentPath, archivedPath, e);
            throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR);
        }
    }

    /**
     * Best-effort stash of a file that exists on disk at the target location
     * without a DB record, into an "orphan" folder in the versions area.
     */
    private void archiveOrphanTargetFile(Path datasetRoot, String logicalPath, Path targetFilePath) {
        if (datasetRoot == null || targetFilePath == null) {
            return;
        }
        if (!Files.exists(targetFilePath) || !Files.isRegularFile(targetFilePath)) {
            return;
        }
        String logicalPathHash = sha256Hex(logicalPath);
        Path orphanDir = datasetRoot
                .resolve(INTERNAL_DIR_NAME)
                .resolve(INTERNAL_VERSIONS_DIR_NAME)
                .resolve(logicalPathHash)
                .resolve("orphan")
                .toAbsolutePath()
                .normalize();
        if (!orphanDir.startsWith(datasetRoot)) {
return;
        }
        try {
            Files.createDirectories(orphanDir);
            String safeName = sanitizeArchiveFileName(targetFilePath.getFileName().toString());
            // Timestamped name avoids collisions between successive orphans.
            Path orphanPath = orphanDir.resolve("orphan_" + System.currentTimeMillis() + "__" + safeName)
                    .toAbsolutePath()
                    .normalize();
            if (!orphanPath.startsWith(orphanDir)) {
                return;
            }
            Files.move(targetFilePath, orphanPath, java.nio.file.StandardCopyOption.REPLACE_EXISTING);
        } catch (Exception e) {
            log.warn("Failed to archive orphan target file, logicalPath={}, targetPath={}", logicalPath, targetFilePath, e);
        }
    }

    /**
     * Creates a sub-directory under the dataset root. Rejects blank names,
     * the internal directory name, and any separators/".." in the name.
     */
    @Transactional
    public void createDirectory(String datasetId, CreateDirectoryRequest req) {
        Dataset dataset = datasetRepository.getById(datasetId);
        if (dataset == null) {
            throw BusinessException.of(DataManagementErrorCode.DATASET_NOT_FOUND);
        }
        String datasetPath = dataset.getPath();
        String parentPrefix = Optional.ofNullable(req.getParentPrefix()).orElse("").trim();
        parentPrefix = parentPrefix.replace("\\", "/");
        while (parentPrefix.startsWith("/")) {
            parentPrefix = parentPrefix.substring(1);
        }
        parentPrefix = normalizeLogicalPrefix(parentPrefix);
        assertNotInternalPrefix(parentPrefix);

        String directoryName = Optional.ofNullable(req.getDirectoryName()).orElse("").trim();
        if (directoryName.isEmpty()) {
            throw BusinessException.of(CommonErrorCode.PARAM_ERROR);
        }
        if (INTERNAL_DIR_NAME.equals(directoryName)) {
            throw BusinessException.of(CommonErrorCode.PARAM_ERROR);
        }
        // The directory name must be a single path segment.
        if (directoryName.contains("..") || directoryName.contains("/") || directoryName.contains("\\")) {
            throw BusinessException.of(CommonErrorCode.PARAM_ERROR);
        }

        Path basePath = Paths.get(datasetPath);
        Path targetPath = parentPrefix.isEmpty()
                ?
                basePath.resolve(directoryName)
                : basePath.resolve(parentPrefix).resolve(directoryName);

        // Path-traversal guard: the result must stay under the dataset root.
        Path normalized = targetPath.normalize();
        if (!normalized.startsWith(basePath)) {
            throw BusinessException.of(CommonErrorCode.PARAM_ERROR);
        }

        try {
            Files.createDirectories(normalized);
        } catch (IOException e) {
            log.error("Failed to create directory {} for dataset {}", normalized, datasetId, e);
            throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR);
        }
    }

    /**
     * Downloads a directory (or the whole dataset when the prefix is empty)
     * as a ZIP file streamed to the HTTP response.
     */
    @Transactional(readOnly = true)
    public void downloadDirectory(String datasetId, String prefix, HttpServletResponse response) {
        Dataset dataset = datasetRepository.getById(datasetId);
        if (dataset == null) {
            throw BusinessException.of(DataManagementErrorCode.DATASET_NOT_FOUND);
        }

        String datasetPath = dataset.getPath();
        // Normalize the prefix: unify separators, strip leading/trailing slashes.
        prefix = Optional.ofNullable(prefix).orElse("").trim();
        prefix = prefix.replace("\\", "/");
        while (prefix.startsWith("/")) {
            prefix = prefix.substring(1);
        }
        while (prefix.endsWith("/")) {
            prefix = prefix.substring(0, prefix.length() - 1);
        }
        // The internal bookkeeping directory is not downloadable.
        if (prefix.equals(INTERNAL_DIR_NAME) || prefix.startsWith(INTERNAL_DIR_NAME + "/")) {
            throw BusinessException.of(CommonErrorCode.PARAM_ERROR);
        }

        Path basePath = Paths.get(datasetPath);
        Path targetPath = prefix.isEmpty() ? basePath : basePath.resolve(prefix);
        Path normalized = targetPath.normalize();

        if (!normalized.startsWith(basePath)) {
            throw BusinessException.of(CommonErrorCode.PARAM_ERROR);
        }

        if (!Files.exists(normalized) || !Files.isDirectory(normalized)) {
            throw BusinessException.of(DataManagementErrorCode.DIRECTORY_NOT_FOUND);
        }

        // ZIP name: dataset name (whole dataset) or the flattened prefix, plus a timestamp.
        String zipFileName = prefix.isEmpty() ?
                dataset.getName() : prefix.replace("/", "_");
        zipFileName = zipFileName + "_" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd_HHmmss")) + ".zip";

        try {
            response.setContentType("application/zip");
            response.setHeader(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + zipFileName + "\"");

            try (ZipArchiveOutputStream zipOut = new ZipArchiveOutputStream(response.getOutputStream())) {
                zipDirectory(normalized, normalized, zipOut);
                zipOut.finish();
            }
        } catch (IOException e) {
            log.error("Failed to download directory {} for dataset {}", normalized, datasetId, e);
            throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR);
        }
    }

    /**
     * Recursively zips all regular files under {@code sourceDir}, skipping the
     * internal bookkeeping directory; entry names are relative to basePath.
     */
    private void zipDirectory(Path sourceDir, Path basePath, ZipArchiveOutputStream zipOut) throws IOException {
        try (Stream paths = Files.walk(sourceDir)) {
            paths.filter(path -> !Files.isDirectory(path))
                    .filter(path -> !isInternalDatasetPath(basePath.toAbsolutePath().normalize(), path))
                    .forEach(path -> {
                        try {
                            Path relativePath = basePath.relativize(path);
                            ZipArchiveEntry zipEntry = new ZipArchiveEntry(relativePath.toString());
                            zipOut.putArchiveEntry(zipEntry);
                            try (InputStream fis = Files.newInputStream(path)) {
                                IOUtils.copy(fis, zipOut);
                            }
                            zipOut.closeArchiveEntry();
                        } catch (IOException e) {
                            // Per-file failures are logged; the rest of the ZIP continues.
                            log.error("Failed to add file to zip: {}", path, e);
                        }
                    });
        }
    }

    /**
     * Deletes a directory and everything under it: DB records, preview files,
     * the on-disk tree, and the dataset's file membership.
     */
    @Transactional
    public void deleteDirectory(String datasetId, String prefix) {
        Dataset dataset = datasetRepository.getById(datasetId);
        if (dataset == null) {
            throw BusinessException.of(DataManagementErrorCode.DATASET_NOT_FOUND);
        }

        // Normalize the prefix: unify separators, strip leading/trailing slashes.
        prefix = Optional.ofNullable(prefix).orElse("").trim();
        prefix = prefix.replace("\\", "/");
        while (prefix.startsWith("/")) {
            prefix = prefix.substring(1);
        }
        while (prefix.endsWith("/")) {
            prefix = prefix.substring(0, prefix.length() - 1);
        }

        // Deleting the dataset root via an empty prefix is not allowed.
        if (prefix.isEmpty()) {
            throw
BusinessException.of(CommonErrorCode.PARAM_ERROR);
        }
        // The internal bookkeeping directory is protected.
        if (prefix.equals(INTERNAL_DIR_NAME) || prefix.startsWith(INTERNAL_DIR_NAME + "/")) {
            throw BusinessException.of(CommonErrorCode.PARAM_ERROR);
        }

        String datasetPath = dataset.getPath();
        Path basePath = Paths.get(datasetPath);
        Path targetPath = basePath.resolve(prefix);
        Path normalized = targetPath.normalize();

        // Path-traversal guard.
        if (!normalized.startsWith(basePath)) {
            throw BusinessException.of(CommonErrorCode.PARAM_ERROR);
        }

        if (!Files.exists(normalized) || !Files.isDirectory(normalized)) {
            throw BusinessException.of(DataManagementErrorCode.DIRECTORY_NOT_FOUND);
        }

        // Delete all DB file records under this directory (matched by the
        // file's path relative to the dataset root).
        String datasetPathNorm = datasetPath.replace("\\", "/");
        String logicalPrefix = prefix; // leading/trailing slashes already stripped
        List filesToDelete = datasetFileRepository.findAllByDatasetId(datasetId).stream()
                .filter(file -> {
                    if (file.getFilePath() == null) {
                        return false;
                    }
                    String filePath = file.getFilePath().replace("\\", "/");
                    if (!filePath.startsWith(datasetPathNorm)) {
                        return false;
                    }
                    String relative = filePath.substring(datasetPathNorm.length());
                    while (relative.startsWith("/")) {
                        relative = relative.substring(1);
                    }
                    return relative.equals(logicalPrefix) || relative.startsWith(logicalPrefix + "/");
                })
                .collect(Collectors.toList());

        for (DatasetFile file : filesToDelete) {
            datasetFileRepository.removeById(file.getId());
            datasetFilePreviewService.deletePreviewFileQuietly(datasetId, file.getId());
        }

        // Delete the directory tree on disk.
        try {
            deleteDirectoryRecursively(normalized);
        } catch (IOException e) {
            log.error("Failed to delete directory {} for dataset {}", normalized, datasetId, e);
            throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR);
        }

        // Update the dataset's file membership.
        dataset.setFiles(filesToDelete);
        for (DatasetFile file : filesToDelete) {
            dataset.removeFile(file);
        }
        datasetRepository.updateById(dataset);
    }

    /**
     * Recursively deletes a directory tree; individual failures are logged and
     * do not abort the walk.
     */
    private void
            deleteDirectoryRecursively(Path directory) throws IOException {
        try (Stream paths = Files.walk(directory)) {
            // Reverse order removes children before their parents.
            paths.sorted(Comparator.reverseOrder())
                    .forEach(path -> {
                        try {
                            Files.delete(path);
                        } catch (IOException e) {
                            log.error("Failed to delete: {}", path, e);
                        }
                    });
        }
    }

    /**
     * Copies files into the dataset directory (flat, by file name).
     *
     * @param datasetId dataset id
     * @param req copy-files request
     * @return the copied file records
     */
    @Transactional
    public List copyFilesToDatasetDir(String datasetId, CopyFilesRequest req) {
        Dataset dataset = datasetRepository.getById(datasetId);
        BusinessAssert.notNull(dataset, SystemErrorCode.RESOURCE_NOT_FOUND);
        Path datasetRoot = resolveDatasetRootPath(dataset, datasetId);
        dataset.setFiles(new ArrayList<>(datasetFileRepository.findAllVisibleByDatasetId(datasetId)));
        List copiedFiles = new ArrayList<>();
        for (String sourceFilePath : req.sourcePaths()) {
            Path sourcePath = Paths.get(sourceFilePath).toAbsolutePath().normalize();
            if (!Files.exists(sourcePath) || !Files.isRegularFile(sourcePath)) {
                log.warn("Source file does not exist or is not a regular file: {}", sourceFilePath);
                continue;
            }
            // The logical path is just the file name (no sub-directories).
            String logicalPath = sourcePath.getFileName().toString();
            DatasetFile datasetFile = commitNewFileVersion(dataset, datasetRoot, logicalPath, sourcePath, false);
            copiedFiles.add(datasetFile);
        }
        dataset.active();
        datasetRepository.updateById(dataset);
        return copiedFiles;
    }

    /**
     * Copies files into the dataset directory preserving their paths relative
     * to a source root (used for data-source imports).
     *
     * @param datasetId dataset id
     * @param sourceRoot data-source root directory
     * @param sourcePaths source file paths
     * @return the copied file records
     */
    @Transactional
    public List copyFilesToDatasetDirWithSourceRoot(String datasetId, Path sourceRoot, List sourcePaths) {
        Dataset dataset = datasetRepository.getById(datasetId);
        BusinessAssert.notNull(dataset, SystemErrorCode.RESOURCE_NOT_FOUND);
        Path datasetRoot = resolveDatasetRootPath(dataset, datasetId);
        Path normalizedRoot = sourceRoot.toAbsolutePath().normalize();
        dataset.setFiles(new
                ArrayList<>(datasetFileRepository.findAllVisibleByDatasetId(datasetId)));

        List copiedFiles = new ArrayList<>();
        for (String sourceFilePath : sourcePaths) {
            if (sourceFilePath == null || sourceFilePath.isBlank()) {
                continue;
            }
            Path sourcePath = Paths.get(sourceFilePath).toAbsolutePath().normalize();
            // Skip anything that escapes the declared source root.
            if (!sourcePath.startsWith(normalizedRoot)) {
                log.warn("Source file path is out of root: {}", sourceFilePath);
                continue;
            }
            if (!Files.exists(sourcePath) || !Files.isRegularFile(sourcePath)) {
                log.warn("Source file does not exist or is not a regular file: {}", sourceFilePath);
                continue;
            }
            // Preserve the path relative to the source root as the logical path.
            Path relativePath = normalizedRoot.relativize(sourcePath);
            String logicalPath = relativePath.toString().replace("\\", "/");
            DatasetFile datasetFile = commitNewFileVersion(dataset, datasetRoot, logicalPath, sourcePath, false);
            copiedFiles.add(datasetFile);
        }
        dataset.active();
        datasetRepository.updateById(dataset);
        return copiedFiles;
    }

    /**
     * Adds files to the dataset by creating database records only — no
     * file-system operations are performed.
     *
     * @param datasetId dataset id
     * @param req add-files request
     * @return the added file records
     */
    @Transactional
    public List addFilesToDataset(String datasetId, AddFilesRequest req) {
        Dataset dataset = datasetRepository.getById(datasetId);
        BusinessAssert.notNull(dataset, SystemErrorCode.RESOURCE_NOT_FOUND);
        List addedFiles = new ArrayList<>();
        dataset.setFiles(new ArrayList<>(datasetFileRepository.findAllVisibleByDatasetId(datasetId)));

        // Record the soft-add flag in each file's metadata.
        boolean softAdd = req.softAdd();
        String metadata;
        try {
            Map metadataMap = Map.of("softAdd", softAdd);
            ObjectMapper objectMapper = new ObjectMapper();
            metadata = objectMapper.writeValueAsString(metadataMap);
        } catch (JsonProcessingException e) {
            log.error("Failed to serialize metadataMap", e);
            throw BusinessException.of(SystemErrorCode.UNKNOWN_ERROR);
        }

        for (String sourceFilePath : req.sourcePaths()) {
            Path sourcePath = Paths.get(sourceFilePath);
            String fileName = sourcePath.getFileName().toString();
            File
sourceFile = sourcePath.toFile(); + String logicalPath = normalizeLogicalPath(fileName); + assertNotInternalPrefix(logicalPath); + + DatasetFile latest = datasetFileRepository.findLatestByDatasetIdAndLogicalPath(datasetId, logicalPath); + if (latest == null && dataset.getFiles() != null) { + latest = dataset.getFiles().stream() + .filter(existing -> existing != null + && !isArchivedStatus(existing) + && Objects.equals(existing.getFileName(), fileName)) + .findFirst() + .orElse(null); + } + + DuplicateMethod effectiveDuplicateMethod = resolveEffectiveDuplicateMethod(); + if (latest != null && effectiveDuplicateMethod == DuplicateMethod.ERROR) { + throw BusinessException.of(DataManagementErrorCode.DATASET_FILE_ALREADY_EXISTS); + } + + long nextVersion = 1L; + if (latest != null) { + long latestVersion = Optional.ofNullable(latest.getVersion()).orElse(1L); + if (latest.getVersion() == null) { + latest.setVersion(latestVersion); + } + if (latest.getLogicalPath() == null || latest.getLogicalPath().isBlank()) { + latest.setLogicalPath(logicalPath); + } + nextVersion = latestVersion + 1L; + } + + if (latest != null && effectiveDuplicateMethod == DuplicateMethod.VERSION) { + latest.setStatus(FILE_STATUS_ARCHIVED); + datasetFileRepository.updateById(latest); + dataset.removeFile(latest); + } + + LocalDateTime currentTime = LocalDateTime.now(); + DatasetFile datasetFile = DatasetFile.builder() + .id(UUID.randomUUID().toString()) + .datasetId(datasetId) + .fileName(fileName) + .fileType(AnalyzerUtils.getExtension(fileName)) + .fileSize(sourceFile.length()) + .filePath(sourceFilePath) + .logicalPath(logicalPath) + .version(nextVersion) + .status(FILE_STATUS_ACTIVE) + .uploadTime(currentTime) + .lastAccessTime(currentTime) + .metadata(metadata) + .build(); + + datasetFileRepository.saveOrUpdate(datasetFile); + dataset.addFile(datasetFile); + addedFiles.add(datasetFile); + triggerPdfTextExtraction(dataset, datasetFile); + } + dataset.active(); + 
datasetRepository.updateById(dataset); + // Note: addFilesToDataset only creates DB records, no file system operations + // If file copy is needed, use copyFilesToDatasetDir endpoint instead + return addedFiles; + } + + private void triggerPdfTextExtraction(Dataset dataset, DatasetFile datasetFile) { + if (dataset == null || datasetFile == null) { + return; + } + if (dataset.getDatasetType() != DatasetType.TEXT) { + return; + } + String fileType = datasetFile.getFileType(); + if (fileType == null || !DOCUMENT_TEXT_FILE_TYPES.contains(fileType.toLowerCase(Locale.ROOT))) { + return; + } + String datasetId = dataset.getId(); + String fileId = datasetFile.getId(); + if (datasetId == null || fileId == null) { + return; + } + if (TransactionSynchronizationManager.isSynchronizationActive()) { + TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { + @Override + public void afterCommit() { + pdfTextExtractAsyncService.extractPdfText(datasetId, fileId); + } + }); + return; + } + pdfTextExtractAsyncService.extractPdfText(datasetId, fileId); + } +}