You've already forked DataMate
refactor(data-import): 优化数据源文件扫描和复制逻辑
- 修改数据源文件扫描方法,直接在主流程中获取任务详情和路径 - 移除独立的getFilePaths方法,将路径扫描逻辑整合到scanFilePaths方法中 - 新增copyFilesToDatasetDirWithSourceRoot方法支持保留相对路径的文件复制 - 更新数据集文件应用服务中的文件复制逻辑,支持相对路径处理 - 修改Python后端项目接口中的文件查询逻辑,移除注释掉的编辑器服务引用 - 调整文件过滤逻辑,基于元数据中的派生源ID进行文件筛选 - 移除编辑器服务中已废弃的源文档过滤条件
This commit is contained in:
@@ -414,33 +414,32 @@ public class DatasetApplicationService {
|
|||||||
public void processDataSourceAsync(String datasetId, String dataSourceId) {
|
public void processDataSourceAsync(String datasetId, String dataSourceId) {
|
||||||
try {
|
try {
|
||||||
log.info("Initiating data source file scanning, dataset ID: {}, collection task ID: {}", datasetId, dataSourceId);
|
log.info("Initiating data source file scanning, dataset ID: {}, collection task ID: {}", datasetId, dataSourceId);
|
||||||
List<String> filePaths = getFilePaths(dataSourceId);
|
CollectionTaskDetailResponse taskDetail = collectionTaskClient.getTaskDetail(dataSourceId).getData();
|
||||||
|
if (taskDetail == null) {
|
||||||
|
log.warn("Fail to get collection task detail, task ID: {}", dataSourceId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Path targetPath = Paths.get(taskDetail.getTargetPath());
|
||||||
|
if (!Files.exists(targetPath) || !Files.isDirectory(targetPath)) {
|
||||||
|
log.warn("Target path not exists or is not a directory: {}", taskDetail.getTargetPath());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
List<String> filePaths = scanFilePaths(targetPath);
|
||||||
if (CollectionUtils.isEmpty(filePaths)) {
|
if (CollectionUtils.isEmpty(filePaths)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
datasetFileApplicationService.copyFilesToDatasetDir(datasetId, new CopyFilesRequest(filePaths));
|
datasetFileApplicationService.copyFilesToDatasetDirWithSourceRoot(datasetId, targetPath, filePaths);
|
||||||
log.info("Success file scan, total files: {}", filePaths.size());
|
log.info("Success file scan, total files: {}", filePaths.size());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("处理数据源文件扫描失败,数据集ID: {}, 数据源ID: {}", datasetId, dataSourceId, e);
|
log.error("处理数据源文件扫描失败,数据集ID: {}, 数据源ID: {}", datasetId, dataSourceId, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<String> getFilePaths(String dataSourceId) {
|
private List<String> scanFilePaths(Path targetPath) {
|
||||||
CollectionTaskDetailResponse taskDetail = collectionTaskClient.getTaskDetail(dataSourceId).getData();
|
try (Stream<Path> paths = Files.walk(targetPath)) {
|
||||||
if (taskDetail == null) {
|
|
||||||
log.warn("Fail to get collection task detail, task ID: {}", dataSourceId);
|
|
||||||
return Collections.emptyList();
|
|
||||||
}
|
|
||||||
Path targetPath = Paths.get(taskDetail.getTargetPath());
|
|
||||||
if (!Files.exists(targetPath) || !Files.isDirectory(targetPath)) {
|
|
||||||
log.warn("Target path not exists or is not a directory: {}", taskDetail.getTargetPath());
|
|
||||||
return Collections.emptyList();
|
|
||||||
}
|
|
||||||
|
|
||||||
try (Stream<Path> paths = Files.walk(targetPath, 1)) {
|
|
||||||
return paths
|
return paths
|
||||||
.filter(Files::isRegularFile) // 只保留文件,排除目录
|
.filter(Files::isRegularFile)
|
||||||
.map(Path::toString) // 转换为字符串路径
|
.map(Path::toString)
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
log.error("Fail to scan directory: {}", targetPath, e);
|
log.error("Fail to scan directory: {}", targetPath, e);
|
||||||
|
|||||||
@@ -695,17 +695,17 @@ public class DatasetFileApplicationService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 复制文件到数据集目录
|
* 复制文件到数据集目录
|
||||||
*
|
*
|
||||||
* @param datasetId 数据集id
|
* @param datasetId 数据集id
|
||||||
* @param req 复制文件请求
|
* @param req 复制文件请求
|
||||||
* @return 复制的文件列表
|
* @return 复制的文件列表
|
||||||
*/
|
*/
|
||||||
@Transactional
|
@Transactional
|
||||||
public List<DatasetFile> copyFilesToDatasetDir(String datasetId, CopyFilesRequest req) {
|
public List<DatasetFile> copyFilesToDatasetDir(String datasetId, CopyFilesRequest req) {
|
||||||
Dataset dataset = datasetRepository.getById(datasetId);
|
Dataset dataset = datasetRepository.getById(datasetId);
|
||||||
BusinessAssert.notNull(dataset, SystemErrorCode.RESOURCE_NOT_FOUND);
|
BusinessAssert.notNull(dataset, SystemErrorCode.RESOURCE_NOT_FOUND);
|
||||||
List<DatasetFile> copiedFiles = new ArrayList<>();
|
List<DatasetFile> copiedFiles = new ArrayList<>();
|
||||||
List<DatasetFile> existDatasetFiles = datasetFileRepository.findAllByDatasetId(datasetId);
|
List<DatasetFile> existDatasetFiles = datasetFileRepository.findAllByDatasetId(datasetId);
|
||||||
dataset.setFiles(existDatasetFiles);
|
dataset.setFiles(existDatasetFiles);
|
||||||
@@ -735,15 +735,80 @@ public class DatasetFileApplicationService {
|
|||||||
datasetFileRepository.saveOrUpdateBatch(copiedFiles, 100);
|
datasetFileRepository.saveOrUpdateBatch(copiedFiles, 100);
|
||||||
dataset.active();
|
dataset.active();
|
||||||
datasetRepository.updateById(dataset);
|
datasetRepository.updateById(dataset);
|
||||||
CompletableFuture.runAsync(() -> copyFilesToDatasetDir(req.sourcePaths(), dataset));
|
CompletableFuture.runAsync(() -> copyFilesToDatasetDir(req.sourcePaths(), dataset));
|
||||||
return copiedFiles;
|
return copiedFiles;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void copyFilesToDatasetDir(List<String> sourcePaths, Dataset dataset) {
|
/**
|
||||||
for (String sourcePath : sourcePaths) {
|
* 复制文件到数据集目录(保留相对路径,适用于数据源导入)
|
||||||
Path sourceFilePath = Paths.get(sourcePath);
|
*
|
||||||
Path targetFilePath = Paths.get(dataset.getPath(), sourceFilePath.getFileName().toString());
|
* @param datasetId 数据集id
|
||||||
try {
|
* @param sourceRoot 数据源根目录
|
||||||
|
* @param sourcePaths 源文件路径列表
|
||||||
|
* @return 复制的文件列表
|
||||||
|
*/
|
||||||
|
@Transactional
|
||||||
|
public List<DatasetFile> copyFilesToDatasetDirWithSourceRoot(String datasetId, Path sourceRoot, List<String> sourcePaths) {
|
||||||
|
Dataset dataset = datasetRepository.getById(datasetId);
|
||||||
|
BusinessAssert.notNull(dataset, SystemErrorCode.RESOURCE_NOT_FOUND);
|
||||||
|
|
||||||
|
Path normalizedRoot = sourceRoot.toAbsolutePath().normalize();
|
||||||
|
List<DatasetFile> copiedFiles = new ArrayList<>();
|
||||||
|
List<DatasetFile> existDatasetFiles = datasetFileRepository.findAllByDatasetId(datasetId);
|
||||||
|
dataset.setFiles(existDatasetFiles);
|
||||||
|
Map<String, DatasetFile> copyTargets = new LinkedHashMap<>();
|
||||||
|
|
||||||
|
for (String sourceFilePath : sourcePaths) {
|
||||||
|
if (sourceFilePath == null || sourceFilePath.isBlank()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Path sourcePath = Paths.get(sourceFilePath).toAbsolutePath().normalize();
|
||||||
|
if (!sourcePath.startsWith(normalizedRoot)) {
|
||||||
|
log.warn("Source file path is out of root: {}", sourceFilePath);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (!Files.exists(sourcePath) || !Files.isRegularFile(sourcePath)) {
|
||||||
|
log.warn("Source file does not exist or is not a regular file: {}", sourceFilePath);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
Path relativePath = normalizedRoot.relativize(sourcePath);
|
||||||
|
String fileName = sourcePath.getFileName().toString();
|
||||||
|
File sourceFile = sourcePath.toFile();
|
||||||
|
LocalDateTime currentTime = LocalDateTime.now();
|
||||||
|
Path targetPath = Paths.get(dataset.getPath(), relativePath.toString());
|
||||||
|
|
||||||
|
DatasetFile datasetFile = DatasetFile.builder()
|
||||||
|
.id(UUID.randomUUID().toString())
|
||||||
|
.datasetId(datasetId)
|
||||||
|
.fileName(fileName)
|
||||||
|
.fileType(AnalyzerUtils.getExtension(fileName))
|
||||||
|
.fileSize(sourceFile.length())
|
||||||
|
.filePath(targetPath.toString())
|
||||||
|
.uploadTime(currentTime)
|
||||||
|
.lastAccessTime(currentTime)
|
||||||
|
.build();
|
||||||
|
setDatasetFileId(datasetFile, dataset);
|
||||||
|
dataset.addFile(datasetFile);
|
||||||
|
copiedFiles.add(datasetFile);
|
||||||
|
copyTargets.put(sourceFilePath, datasetFile);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (copiedFiles.isEmpty()) {
|
||||||
|
return copiedFiles;
|
||||||
|
}
|
||||||
|
datasetFileRepository.saveOrUpdateBatch(copiedFiles, 100);
|
||||||
|
dataset.active();
|
||||||
|
datasetRepository.updateById(dataset);
|
||||||
|
CompletableFuture.runAsync(() -> copyFilesToDatasetDirWithRelativePath(copyTargets, dataset, normalizedRoot));
|
||||||
|
return copiedFiles;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void copyFilesToDatasetDir(List<String> sourcePaths, Dataset dataset) {
|
||||||
|
for (String sourcePath : sourcePaths) {
|
||||||
|
Path sourceFilePath = Paths.get(sourcePath);
|
||||||
|
Path targetFilePath = Paths.get(dataset.getPath(), sourceFilePath.getFileName().toString());
|
||||||
|
try {
|
||||||
Files.createDirectories(Path.of(dataset.getPath()));
|
Files.createDirectories(Path.of(dataset.getPath()));
|
||||||
Files.copy(sourceFilePath, targetFilePath);
|
Files.copy(sourceFilePath, targetFilePath);
|
||||||
DatasetFile datasetFile = datasetFileRepository.findByDatasetIdAndFileName(
|
DatasetFile datasetFile = datasetFileRepository.findByDatasetIdAndFileName(
|
||||||
@@ -753,10 +818,39 @@ public class DatasetFileApplicationService {
|
|||||||
triggerPdfTextExtraction(dataset, datasetFile);
|
triggerPdfTextExtraction(dataset, datasetFile);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
log.error("Failed to copy file from {} to {}", sourcePath, targetFilePath, e);
|
log.error("Failed to copy file from {} to {}", sourcePath, targetFilePath, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void copyFilesToDatasetDirWithRelativePath(
|
||||||
|
Map<String, DatasetFile> copyTargets,
|
||||||
|
Dataset dataset,
|
||||||
|
Path sourceRoot
|
||||||
|
) {
|
||||||
|
Path datasetRoot = Paths.get(dataset.getPath()).toAbsolutePath().normalize();
|
||||||
|
Path normalizedRoot = sourceRoot.toAbsolutePath().normalize();
|
||||||
|
for (Map.Entry<String, DatasetFile> entry : copyTargets.entrySet()) {
|
||||||
|
Path sourcePath = Paths.get(entry.getKey()).toAbsolutePath().normalize();
|
||||||
|
if (!sourcePath.startsWith(normalizedRoot)) {
|
||||||
|
log.warn("Source file path is out of root: {}", sourcePath);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Path relativePath = normalizedRoot.relativize(sourcePath);
|
||||||
|
Path targetFilePath = datasetRoot.resolve(relativePath).normalize();
|
||||||
|
if (!targetFilePath.startsWith(datasetRoot)) {
|
||||||
|
log.warn("Target file path is out of dataset path: {}", targetFilePath);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
Files.createDirectories(targetFilePath.getParent());
|
||||||
|
Files.copy(sourcePath, targetFilePath);
|
||||||
|
triggerPdfTextExtraction(dataset, entry.getValue());
|
||||||
|
} catch (IOException e) {
|
||||||
|
log.error("Failed to copy file from {} to {}", sourcePath, targetFilePath, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 添加文件到数据集(仅创建数据库记录,不执行文件系统操作)
|
* 添加文件到数据集(仅创建数据库记录,不执行文件系统操作)
|
||||||
*
|
*
|
||||||
|
|||||||
@@ -12,7 +12,6 @@ from app.module.shared.schema import StandardResponse, PaginatedData
|
|||||||
from app.module.dataset import DatasetManagementService
|
from app.module.dataset import DatasetManagementService
|
||||||
from app.core.logging import get_logger
|
from app.core.logging import get_logger
|
||||||
|
|
||||||
from app.module.annotation.service.editor import AnnotationEditorService
|
|
||||||
from ..service.mapping import DatasetMappingService
|
from ..service.mapping import DatasetMappingService
|
||||||
from ..service.template import AnnotationTemplateService
|
from ..service.template import AnnotationTemplateService
|
||||||
from ..schema import (
|
from ..schema import (
|
||||||
@@ -118,15 +117,30 @@ async def create_mapping(
|
|||||||
configuration=project_configuration or None,
|
configuration=project_configuration or None,
|
||||||
)
|
)
|
||||||
|
|
||||||
file_query = select(DatasetFiles.id).where(
|
file_result = await db.execute(
|
||||||
DatasetFiles.dataset_id == request.dataset_id
|
select(DatasetFiles).where(DatasetFiles.dataset_id == request.dataset_id)
|
||||||
)
|
)
|
||||||
|
file_records = file_result.scalars().all()
|
||||||
|
snapshot_file_ids: list[str] = []
|
||||||
if dataset_type == TEXT_DATASET_TYPE:
|
if dataset_type == TEXT_DATASET_TYPE:
|
||||||
file_query = file_query.where(
|
derived_source_ids = set()
|
||||||
~AnnotationEditorService._build_source_document_filter()
|
for file_record in file_records:
|
||||||
)
|
metadata = getattr(file_record, "dataset_filemetadata", None)
|
||||||
file_result = await db.execute(file_query)
|
if isinstance(metadata, dict):
|
||||||
snapshot_file_ids = [str(fid) for fid in file_result.scalars().all()]
|
source_id = metadata.get("derived_from_file_id")
|
||||||
|
if source_id:
|
||||||
|
derived_source_ids.add(str(source_id))
|
||||||
|
snapshot_file_ids = [
|
||||||
|
str(file_record.id)
|
||||||
|
for file_record in file_records
|
||||||
|
if file_record.id and str(file_record.id) not in derived_source_ids
|
||||||
|
]
|
||||||
|
else:
|
||||||
|
snapshot_file_ids = [
|
||||||
|
str(file_record.id)
|
||||||
|
for file_record in file_records
|
||||||
|
if file_record.id
|
||||||
|
]
|
||||||
|
|
||||||
# 创建映射关系并写入快照
|
# 创建映射关系并写入快照
|
||||||
mapping = await mapping_service.create_mapping_with_snapshot(
|
mapping = await mapping_service.create_mapping_with_snapshot(
|
||||||
|
|||||||
@@ -429,19 +429,10 @@ class AnnotationEditorService:
|
|||||||
exclude_source_documents: Optional[bool] = None,
|
exclude_source_documents: Optional[bool] = None,
|
||||||
) -> EditorTaskListResponse:
|
) -> EditorTaskListResponse:
|
||||||
project = await self._get_project_or_404(project_id)
|
project = await self._get_project_or_404(project_id)
|
||||||
dataset_type = self._normalize_dataset_type(await self._get_dataset_type(project.dataset_id))
|
|
||||||
should_exclude_source_documents = False
|
|
||||||
if dataset_type == DATASET_TYPE_TEXT:
|
|
||||||
should_exclude_source_documents = (
|
|
||||||
exclude_source_documents if exclude_source_documents is not None else True
|
|
||||||
)
|
|
||||||
|
|
||||||
base_conditions = [
|
base_conditions = [
|
||||||
LabelingProjectFile.project_id == project_id,
|
LabelingProjectFile.project_id == project_id,
|
||||||
DatasetFiles.dataset_id == project.dataset_id,
|
DatasetFiles.dataset_id == project.dataset_id,
|
||||||
]
|
]
|
||||||
if should_exclude_source_documents:
|
|
||||||
base_conditions.append(~self._build_source_document_filter())
|
|
||||||
|
|
||||||
count_result = await self.db.execute(
|
count_result = await self.db.execute(
|
||||||
select(func.count())
|
select(func.count())
|
||||||
|
|||||||
Reference in New Issue
Block a user