From 9a205919d7f4f68ae6a7ac0464db0bc89fbe3c13 Mon Sep 17 00:00:00 2001 From: Jerry Yan <792602257@qq.com> Date: Fri, 30 Jan 2026 18:58:34 +0800 Subject: [PATCH] =?UTF-8?q?refactor(data-import):=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E6=BA=90=E6=96=87=E4=BB=B6=E6=89=AB=E6=8F=8F?= =?UTF-8?q?=E5=92=8C=E5=A4=8D=E5=88=B6=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 修改数据源文件扫描方法,直接在主流程中获取任务详情和路径 - 移除独立的getFilePaths方法,将路径扫描逻辑整合到scanFilePaths方法中 - 新增copyFilesToDatasetDirWithSourceRoot方法支持保留相对路径的文件复制 - 更新数据集文件应用服务中的文件复制逻辑,支持相对路径处理 - 修改Python后端项目接口中的文件查询逻辑,移除注释掉的编辑器服务引用 - 调整文件过滤逻辑,基于元数据中的派生源ID进行文件筛选 - 移除编辑器服务中已废弃的源文档过滤条件 --- .../DatasetApplicationService.java | 33 ++-- .../DatasetFileApplicationService.java | 142 +++++++++++++++--- .../module/annotation/interface/project.py | 30 +++- .../app/module/annotation/service/editor.py | 9 -- 4 files changed, 156 insertions(+), 58 deletions(-) diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java index 9f987ea..c31cacf 100644 --- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java @@ -414,33 +414,32 @@ public class DatasetApplicationService { public void processDataSourceAsync(String datasetId, String dataSourceId) { try { log.info("Initiating data source file scanning, dataset ID: {}, collection task ID: {}", datasetId, dataSourceId); - List filePaths = getFilePaths(dataSourceId); + CollectionTaskDetailResponse taskDetail = collectionTaskClient.getTaskDetail(dataSourceId).getData(); + if (taskDetail == null) { + log.warn("Fail to get collection task detail, task ID: {}", dataSourceId); + return; + } + Path targetPath = Paths.get(taskDetail.getTargetPath()); + if (!Files.exists(targetPath) || !Files.isDirectory(targetPath)) { + log.warn("Target path not exists or is not a directory: {}", taskDetail.getTargetPath()); + return; + } + List filePaths = scanFilePaths(targetPath); if (CollectionUtils.isEmpty(filePaths)) { return; } - datasetFileApplicationService.copyFilesToDatasetDir(datasetId, new CopyFilesRequest(filePaths)); + datasetFileApplicationService.copyFilesToDatasetDirWithSourceRoot(datasetId, targetPath, filePaths); log.info("Success file scan, total files: {}", filePaths.size()); } catch (Exception e) { log.error("处理数据源文件扫描失败,数据集ID: {}, 数据源ID: {}", datasetId, dataSourceId, e); } } - private List getFilePaths(String dataSourceId) { - CollectionTaskDetailResponse taskDetail = collectionTaskClient.getTaskDetail(dataSourceId).getData(); - if (taskDetail == null) { - log.warn("Fail to get collection task detail, task ID: {}", dataSourceId); - return Collections.emptyList(); - } - Path targetPath = Paths.get(taskDetail.getTargetPath()); - if (!Files.exists(targetPath) || !Files.isDirectory(targetPath)) { - log.warn("Target path not exists or is not a directory: {}", taskDetail.getTargetPath()); - return Collections.emptyList(); - } - - try (Stream paths = Files.walk(targetPath, 1)) { + private List scanFilePaths(Path targetPath) { + try (Stream paths = Files.walk(targetPath)) { return paths - .filter(Files::isRegularFile) // 只保留文件,排除目录 - .map(Path::toString) // 转换为字符串路径 + .filter(Files::isRegularFile) + .map(Path::toString) .collect(Collectors.toList()); } catch (IOException e) { log.error("Fail to scan directory: {}", targetPath, e); diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java index 9366f48..2fcb373 100644 --- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetFileApplicationService.java @@ -695,17 +695,17 @@ public class DatasetFileApplicationService { } } - /** - * 复制文件到数据集目录 - * - * @param datasetId 数据集id - * @param req 复制文件请求 - * @return 复制的文件列表 - */ - @Transactional - public List copyFilesToDatasetDir(String datasetId, CopyFilesRequest req) { - Dataset dataset = datasetRepository.getById(datasetId); - BusinessAssert.notNull(dataset, SystemErrorCode.RESOURCE_NOT_FOUND); + /** + * 复制文件到数据集目录 + * + * @param datasetId 数据集id + * @param req 复制文件请求 + * @return 复制的文件列表 + */ + @Transactional + public List copyFilesToDatasetDir(String datasetId, CopyFilesRequest req) { + Dataset dataset = datasetRepository.getById(datasetId); + BusinessAssert.notNull(dataset, SystemErrorCode.RESOURCE_NOT_FOUND); List copiedFiles = new ArrayList<>(); List existDatasetFiles = datasetFileRepository.findAllByDatasetId(datasetId); dataset.setFiles(existDatasetFiles); @@ -735,15 +735,80 @@ public class DatasetFileApplicationService { datasetFileRepository.saveOrUpdateBatch(copiedFiles, 100); dataset.active(); datasetRepository.updateById(dataset); - CompletableFuture.runAsync(() -> copyFilesToDatasetDir(req.sourcePaths(), dataset)); - return copiedFiles; - } - - private void copyFilesToDatasetDir(List sourcePaths, Dataset dataset) { - for (String sourcePath : sourcePaths) { - Path sourceFilePath = Paths.get(sourcePath); - Path targetFilePath = Paths.get(dataset.getPath(), sourceFilePath.getFileName().toString()); - try { + CompletableFuture.runAsync(() -> copyFilesToDatasetDir(req.sourcePaths(), dataset)); + return copiedFiles; + } + + /** + * 复制文件到数据集目录(保留相对路径,适用于数据源导入) + * + * @param datasetId 数据集id + * @param sourceRoot 数据源根目录 + * @param sourcePaths 源文件路径列表 + * @return 复制的文件列表 + */ + @Transactional + public List copyFilesToDatasetDirWithSourceRoot(String datasetId, Path sourceRoot, List sourcePaths) { + Dataset dataset = datasetRepository.getById(datasetId); + BusinessAssert.notNull(dataset, SystemErrorCode.RESOURCE_NOT_FOUND); + + Path normalizedRoot = sourceRoot.toAbsolutePath().normalize(); + List copiedFiles = new ArrayList<>(); + List existDatasetFiles = datasetFileRepository.findAllByDatasetId(datasetId); + dataset.setFiles(existDatasetFiles); + Map copyTargets = new LinkedHashMap<>(); + + for (String sourceFilePath : sourcePaths) { + if (sourceFilePath == null || sourceFilePath.isBlank()) { + continue; + } + Path sourcePath = Paths.get(sourceFilePath).toAbsolutePath().normalize(); + if (!sourcePath.startsWith(normalizedRoot)) { + log.warn("Source file path is out of root: {}", sourceFilePath); + continue; + } + if (!Files.exists(sourcePath) || !Files.isRegularFile(sourcePath)) { + log.warn("Source file does not exist or is not a regular file: {}", sourceFilePath); + continue; + } + + Path relativePath = normalizedRoot.relativize(sourcePath); + String fileName = sourcePath.getFileName().toString(); + File sourceFile = sourcePath.toFile(); + LocalDateTime currentTime = LocalDateTime.now(); + Path targetPath = Paths.get(dataset.getPath(), relativePath.toString()); + + DatasetFile datasetFile = DatasetFile.builder() + .id(UUID.randomUUID().toString()) + .datasetId(datasetId) + .fileName(fileName) + .fileType(AnalyzerUtils.getExtension(fileName)) + .fileSize(sourceFile.length()) + .filePath(targetPath.toString()) + .uploadTime(currentTime) + .lastAccessTime(currentTime) + .build(); + setDatasetFileId(datasetFile, dataset); + dataset.addFile(datasetFile); + copiedFiles.add(datasetFile); + copyTargets.put(sourceFilePath, datasetFile); + } + + if (copiedFiles.isEmpty()) { + return copiedFiles; + } + datasetFileRepository.saveOrUpdateBatch(copiedFiles, 100); + dataset.active(); + datasetRepository.updateById(dataset); + CompletableFuture.runAsync(() -> copyFilesToDatasetDirWithRelativePath(copyTargets, dataset, normalizedRoot)); + return copiedFiles; + } + + private void copyFilesToDatasetDir(List sourcePaths, Dataset dataset) { + for (String sourcePath : sourcePaths) { + Path sourceFilePath = Paths.get(sourcePath); + Path targetFilePath = Paths.get(dataset.getPath(), sourceFilePath.getFileName().toString()); + try { Files.createDirectories(Path.of(dataset.getPath())); Files.copy(sourceFilePath, targetFilePath); DatasetFile datasetFile = datasetFileRepository.findByDatasetIdAndFileName( @@ -753,10 +818,39 @@ public class DatasetFileApplicationService { triggerPdfTextExtraction(dataset, datasetFile); } catch (IOException e) { log.error("Failed to copy file from {} to {}", sourcePath, targetFilePath, e); - } - } - } - + } + } + } + + private void copyFilesToDatasetDirWithRelativePath( + Map copyTargets, + Dataset dataset, + Path sourceRoot + ) { + Path datasetRoot = Paths.get(dataset.getPath()).toAbsolutePath().normalize(); + Path normalizedRoot = sourceRoot.toAbsolutePath().normalize(); + for (Map.Entry entry : copyTargets.entrySet()) { + Path sourcePath = Paths.get(entry.getKey()).toAbsolutePath().normalize(); + if (!sourcePath.startsWith(normalizedRoot)) { + log.warn("Source file path is out of root: {}", sourcePath); + continue; + } + Path relativePath = normalizedRoot.relativize(sourcePath); + Path targetFilePath = datasetRoot.resolve(relativePath).normalize(); + if (!targetFilePath.startsWith(datasetRoot)) { + log.warn("Target file path is out of dataset path: {}", targetFilePath); + continue; + } + try { + Files.createDirectories(targetFilePath.getParent()); + Files.copy(sourcePath, targetFilePath); + triggerPdfTextExtraction(dataset, entry.getValue()); + } catch (IOException e) { + log.error("Failed to copy file from {} to {}", sourcePath, targetFilePath, e); + } + } + } + /** * 添加文件到数据集(仅创建数据库记录,不执行文件系统操作) * diff --git a/runtime/datamate-python/app/module/annotation/interface/project.py b/runtime/datamate-python/app/module/annotation/interface/project.py index 3aac08a..1ce14ef 100644 --- a/runtime/datamate-python/app/module/annotation/interface/project.py +++ b/runtime/datamate-python/app/module/annotation/interface/project.py @@ -12,7 +12,6 @@ from app.module.shared.schema import StandardResponse, PaginatedData from app.module.dataset import DatasetManagementService from app.core.logging import get_logger -from app.module.annotation.service.editor import AnnotationEditorService from ..service.mapping import DatasetMappingService from ..service.template import AnnotationTemplateService from ..schema import ( @@ -118,15 +117,30 @@ async def create_mapping( configuration=project_configuration or None, ) - file_query = select(DatasetFiles.id).where( - DatasetFiles.dataset_id == request.dataset_id + file_result = await db.execute( + select(DatasetFiles).where(DatasetFiles.dataset_id == request.dataset_id) ) + file_records = file_result.scalars().all() + snapshot_file_ids: list[str] = [] if dataset_type == TEXT_DATASET_TYPE: - file_query = file_query.where( - ~AnnotationEditorService._build_source_document_filter() - ) - file_result = await db.execute(file_query) - snapshot_file_ids = [str(fid) for fid in file_result.scalars().all()] + derived_source_ids = set() + for file_record in file_records: + metadata = getattr(file_record, "dataset_filemetadata", None) + if isinstance(metadata, dict): + source_id = metadata.get("derived_from_file_id") + if source_id: + derived_source_ids.add(str(source_id)) + snapshot_file_ids = [ + str(file_record.id) + for file_record in file_records + if file_record.id and str(file_record.id) not in derived_source_ids + ] + else: + snapshot_file_ids = [ + str(file_record.id) + for file_record in file_records + if file_record.id + ] # 创建映射关系并写入快照 mapping = await mapping_service.create_mapping_with_snapshot( diff --git a/runtime/datamate-python/app/module/annotation/service/editor.py b/runtime/datamate-python/app/module/annotation/service/editor.py index 8d0945c..07824bb 100644 --- a/runtime/datamate-python/app/module/annotation/service/editor.py +++ b/runtime/datamate-python/app/module/annotation/service/editor.py @@ -429,19 +429,10 @@ class AnnotationEditorService: exclude_source_documents: Optional[bool] = None, ) -> EditorTaskListResponse: project = await self._get_project_or_404(project_id) - dataset_type = self._normalize_dataset_type(await self._get_dataset_type(project.dataset_id)) - should_exclude_source_documents = False - if dataset_type == DATASET_TYPE_TEXT: - should_exclude_source_documents = ( - exclude_source_documents if exclude_source_documents is not None else True - ) - base_conditions = [ LabelingProjectFile.project_id == project_id, DatasetFiles.dataset_id == project.dataset_id, ] - if should_exclude_source_documents: - base_conditions.append(~self._build_source_document_filter()) count_result = await self.db.execute( select(func.count())