feat(dataset): 实现数据集文件可见性过滤功能

- 添加派生文件识别逻辑,通过元数据中的derived_from_file_id字段判断
- 实现applyVisibleFileCounts方法为数据集批量设置可见文件数量
- 修改数据集统计接口使用过滤后的可见文件进行统计计算
- 添加normalizeFilePath工具方法统一路径格式处理
- 更新文件查询逻辑支持派生文件过滤功能
- 新增DatasetFileCount DTO用于文件计数统计返回
This commit is contained in:
2026-02-01 22:55:07 +08:00
parent 9d185bb10c
commit a0239518fb
8 changed files with 226 additions and 64 deletions

View File

@@ -19,8 +19,11 @@ import com.datamate.datamanagement.infrastructure.exception.DataManagementErrorC
import com.datamate.datamanagement.infrastructure.persistence.mapper.TagMapper;
import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetFileRepository;
import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetRepository;
import com.datamate.datamanagement.infrastructure.persistence.repository.dto.DatasetFileCount;
import com.datamate.datamanagement.interfaces.converter.DatasetConverter;
import com.datamate.datamanagement.interfaces.dto.*;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
@@ -53,6 +56,7 @@ public class DatasetApplicationService {
private static final int SIMILAR_DATASET_MAX_LIMIT = 50;
private static final int SIMILAR_DATASET_CANDIDATE_FACTOR = 5;
private static final int SIMILAR_DATASET_CANDIDATE_MAX = 100;
private static final String DERIVED_METADATA_KEY = "derived_from_file_id";
private final DatasetRepository datasetRepository;
private final TagMapper tagMapper;
private final DatasetFileRepository datasetFileRepository;
@@ -142,6 +146,7 @@ public class DatasetApplicationService {
BusinessAssert.notNull(dataset, DataManagementErrorCode.DATASET_NOT_FOUND);
List<DatasetFile> datasetFiles = datasetFileRepository.findAllByDatasetId(datasetId);
dataset.setFiles(datasetFiles);
applyVisibleFileCounts(Collections.singletonList(dataset));
return dataset;
}
@@ -153,6 +158,7 @@ public class DatasetApplicationService {
IPage<Dataset> page = new Page<>(query.getPage(), query.getSize());
page = datasetRepository.findByCriteria(page, query);
String datasetPvcName = getDatasetPvcName();
applyVisibleFileCounts(page.getRecords());
List<DatasetResponse> datasetResponses = DatasetConverter.INSTANCE.convertToResponse(page.getRecords());
datasetResponses.forEach(dataset -> dataset.setPvcName(datasetPvcName));
return PagedResponse.of(datasetResponses, page.getCurrent(), page.getTotal(), page.getPages());
@@ -200,6 +206,7 @@ public class DatasetApplicationService {
})
.limit(safeLimit)
.toList();
applyVisibleFileCounts(sorted);
List<DatasetResponse> responses = DatasetConverter.INSTANCE.convertToResponse(sorted);
responses.forEach(item -> item.setPvcName(datasetPvcName));
return responses;
@@ -345,6 +352,61 @@ public class DatasetApplicationService {
dataset.setPath(newPath);
}
/**
 * Populates each dataset's fileCount with the number of visible (non-derived)
 * files, fetched in a single batched repository call.
 *
 * @param datasets datasets to decorate; null entries and entries without an id are skipped
 */
private void applyVisibleFileCounts(List<Dataset> datasets) {
    if (CollectionUtils.isEmpty(datasets)) {
        return;
    }
    List<String> ids = datasets.stream()
            .filter(Objects::nonNull)
            .map(Dataset::getId)
            .filter(StringUtils::hasText)
            .toList();
    if (ids.isEmpty()) {
        return;
    }
    // One batched count query instead of a round-trip per dataset.
    Map<String, Long> visibleCounts = datasetFileRepository.countNonDerivedByDatasetIds(ids).stream()
            .filter(Objects::nonNull)
            .collect(Collectors.toMap(
                    DatasetFileCount::getDatasetId,
                    row -> Optional.ofNullable(row.getFileCount()).orElse(0L),
                    (first, duplicate) -> first
            ));
    // Datasets with no matching count row fall back to zero visible files.
    datasets.stream()
            .filter(dataset -> dataset != null && StringUtils.hasText(dataset.getId()))
            .forEach(dataset -> dataset.setFileCount(visibleCounts.getOrDefault(dataset.getId(), 0L)));
}
/**
 * Returns only the files that should be visible to users, i.e. those not
 * flagged as derived via their metadata.
 *
 * @param files candidate files; null or empty yields an immutable empty list
 * @return visible (non-derived) files in their original order
 */
private List<DatasetFile> filterVisibleFiles(List<DatasetFile> files) {
    if (CollectionUtils.isEmpty(files)) {
        return Collections.emptyList();
    }
    return files.stream()
            .filter(candidate -> !isDerivedFile(candidate))
            .collect(Collectors.toList());
}
/**
 * Determines whether a dataset file was produced from another file.
 * A file is considered derived when its JSON metadata carries a non-null
 * {@code derived_from_file_id} entry.
 *
 * @param datasetFile file to inspect; null is treated as not derived
 * @return true when the metadata marks the file as derived
 */
private boolean isDerivedFile(DatasetFile datasetFile) {
    String rawMetadata = datasetFile == null ? null : datasetFile.getMetadata();
    if (!StringUtils.hasText(rawMetadata)) {
        return false;
    }
    try {
        // NOTE(review): instantiating ObjectMapper on every call is costly; consider
        // a shared static final instance (ObjectMapper is thread-safe once configured).
        Map<String, Object> parsed = new ObjectMapper()
                .readValue(rawMetadata, new TypeReference<Map<String, Object>>() {});
        return parsed.get(DERIVED_METADATA_KEY) != null;
    } catch (Exception e) {
        // Unparseable metadata is deliberately treated as "not derived" (fail open).
        log.debug("Failed to parse dataset file metadata for derived detection: {}", datasetFile.getId(), e);
        return false;
    }
}
/**
* 获取数据集统计信息
*/
@@ -357,27 +419,29 @@ public class DatasetApplicationService {
Map<String, Object> statistics = new HashMap<>();
// 基础统计
Long totalFiles = datasetFileRepository.countByDatasetId(datasetId);
Long completedFiles = datasetFileRepository.countCompletedByDatasetId(datasetId);
List<DatasetFile> allFiles = datasetFileRepository.findAllByDatasetId(datasetId);
List<DatasetFile> visibleFiles = filterVisibleFiles(allFiles);
long totalFiles = visibleFiles.size();
long completedFiles = visibleFiles.stream()
.filter(file -> "COMPLETED".equalsIgnoreCase(file.getStatus()))
.count();
Long totalSize = datasetFileRepository.sumSizeByDatasetId(datasetId);
statistics.put("totalFiles", totalFiles != null ? totalFiles.intValue() : 0);
statistics.put("completedFiles", completedFiles != null ? completedFiles.intValue() : 0);
statistics.put("totalFiles", (int) totalFiles);
statistics.put("completedFiles", (int) completedFiles);
statistics.put("totalSize", totalSize != null ? totalSize : 0L);
// 完成率计算
float completionRate = 0.0f;
if (totalFiles != null && totalFiles > 0) {
completionRate = (completedFiles != null ? completedFiles.floatValue() : 0.0f) / totalFiles.floatValue() * 100.0f;
if (totalFiles > 0) {
completionRate = ((float) completedFiles) / (float) totalFiles * 100.0f;
}
statistics.put("completionRate", completionRate);
// 文件类型分布统计
Map<String, Integer> fileTypeDistribution = new HashMap<>();
List<DatasetFile> allFiles = datasetFileRepository.findAllByDatasetId(datasetId);
if (allFiles != null) {
for (DatasetFile file : allFiles) {
if (!visibleFiles.isEmpty()) {
for (DatasetFile file : visibleFiles) {
String fileType = file.getFileType() != null ? file.getFileType() : "unknown";
fileTypeDistribution.put(fileType, fileTypeDistribution.getOrDefault(fileType, 0) + 1);
}
@@ -386,8 +450,8 @@ public class DatasetApplicationService {
// 状态分布统计
Map<String, Integer> statusDistribution = new HashMap<>();
if (allFiles != null) {
for (DatasetFile file : allFiles) {
if (!visibleFiles.isEmpty()) {
for (DatasetFile file : visibleFiles) {
String status = file.getStatus() != null ? file.getStatus() : "unknown";
statusDistribution.put(status, statusDistribution.getOrDefault(status, 0) + 1);
}

View File

@@ -164,27 +164,36 @@ public class DatasetFileApplicationService {
}
String datasetPath = dataset.getPath();
Path queryPath = Path.of(dataset.getPath() + File.separator + prefix);
Map<String, DatasetFile> datasetFilesMap = datasetFileRepository.findAllByDatasetId(datasetId)
.stream().collect(Collectors.toMap(DatasetFile::getFilePath, Function.identity()));
Set<String> derivedFilePaths = excludeDerivedFiles
? datasetFilesMap.values().stream()
.filter(this::isDerivedFile)
.map(DatasetFile::getFilePath)
.filter(Objects::nonNull)
.collect(Collectors.toSet())
: Collections.emptySet();
Map<String, DatasetFile> datasetFilesMap = datasetFileRepository.findAllByDatasetId(datasetId)
.stream()
.filter(file -> file.getFilePath() != null)
.collect(Collectors.toMap(
file -> normalizeFilePath(file.getFilePath()),
Function.identity(),
(left, right) -> left
));
Set<String> derivedFilePaths = excludeDerivedFiles
? datasetFilesMap.values().stream()
.filter(this::isDerivedFile)
.map(DatasetFile::getFilePath)
.map(this::normalizeFilePath)
.filter(Objects::nonNull)
.collect(Collectors.toSet())
: Collections.emptySet();
// 如果目录不存在,直接返回空结果(数据集刚创建时目录可能还未生成)
if (!Files.exists(queryPath)) {
return new PagedResponse<>(page, size, 0, 0, Collections.emptyList());
}
try (Stream<Path> pathStream = Files.list(queryPath)) {
List<Path> allFiles = pathStream
.filter(path -> path.toString().startsWith(datasetPath))
.filter(path -> !excludeDerivedFiles || Files.isDirectory(path) || !derivedFilePaths.contains(path.toString()))
.sorted(Comparator
.comparing((Path path) -> !Files.isDirectory(path))
.thenComparing(path -> path.getFileName().toString()))
.collect(Collectors.toList());
List<Path> allFiles = pathStream
.filter(path -> path.toString().startsWith(datasetPath))
.filter(path -> !excludeDerivedFiles
|| Files.isDirectory(path)
|| !derivedFilePaths.contains(normalizeFilePath(path.toString())))
.sorted(Comparator
.comparing((Path path) -> !Files.isDirectory(path))
.thenComparing(path -> path.getFileName().toString()))
.collect(Collectors.toList());
// 计算分页
int total = allFiles.size();
@@ -199,7 +208,9 @@ public class DatasetFileApplicationService {
if (fromIndex < total) {
pageData = allFiles.subList(fromIndex, toIndex);
}
List<DatasetFile> datasetFiles = pageData.stream().map(path -> getDatasetFile(path, datasetFilesMap)).toList();
List<DatasetFile> datasetFiles = pageData.stream()
.map(path -> getDatasetFile(path, datasetFilesMap, excludeDerivedFiles, derivedFilePaths))
.toList();
return new PagedResponse<>(page, size, total, totalPages, datasetFiles);
} catch (IOException e) {
@@ -208,9 +219,12 @@ public class DatasetFileApplicationService {
}
}
private DatasetFile getDatasetFile(Path path, Map<String, DatasetFile> datasetFilesMap) {
DatasetFile datasetFile = new DatasetFile();
LocalDateTime localDateTime = LocalDateTime.now();
private DatasetFile getDatasetFile(Path path,
Map<String, DatasetFile> datasetFilesMap,
boolean excludeDerivedFiles,
Set<String> derivedFilePaths) {
DatasetFile datasetFile = new DatasetFile();
LocalDateTime localDateTime = LocalDateTime.now();
try {
localDateTime = Files.getLastModifiedTime(path).toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime();
} catch (IOException e) {
@@ -229,23 +243,32 @@ public class DatasetFileApplicationService {
long fileCount;
long totalSize;
try (Stream<Path> walk = Files.walk(path)) {
fileCount = walk.filter(Files::isRegularFile).count();
}
try (Stream<Path> walk = Files.walk(path)) {
totalSize = walk
.filter(Files::isRegularFile)
.mapToLong(p -> {
try {
return Files.size(p);
} catch (IOException e) {
log.error("get file size error", e);
return 0L;
}
})
.sum();
}
try (Stream<Path> walk = Files.walk(path)) {
Stream<Path> fileStream = walk.filter(Files::isRegularFile);
if (excludeDerivedFiles && !derivedFilePaths.isEmpty()) {
fileStream = fileStream.filter(filePath ->
!derivedFilePaths.contains(normalizeFilePath(filePath.toString())));
}
fileCount = fileStream.count();
}
try (Stream<Path> walk = Files.walk(path)) {
Stream<Path> fileStream = walk.filter(Files::isRegularFile);
if (excludeDerivedFiles && !derivedFilePaths.isEmpty()) {
fileStream = fileStream.filter(filePath ->
!derivedFilePaths.contains(normalizeFilePath(filePath.toString())));
}
totalSize = fileStream
.mapToLong(p -> {
try {
return Files.size(p);
} catch (IOException e) {
log.error("get file size error", e);
return 0L;
}
})
.sum();
}
datasetFile.setFileCount(fileCount);
datasetFile.setFileSize(totalSize);
@@ -253,20 +276,31 @@ public class DatasetFileApplicationService {
log.error("stat directory info error", e);
}
} else {
DatasetFile exist = datasetFilesMap.get(path.toString());
if (exist == null) {
datasetFile.setId("file-" + datasetFile.getFileName());
datasetFile.setFileSize(path.toFile().length());
} else {
DatasetFile exist = datasetFilesMap.get(normalizeFilePath(path.toString()));
if (exist == null) {
datasetFile.setId("file-" + datasetFile.getFileName());
datasetFile.setFileSize(path.toFile().length());
} else {
datasetFile = exist;
}
}
return datasetFile;
}
private boolean isSourceDocument(DatasetFile datasetFile) {
if (datasetFile == null) {
return false;
return datasetFile;
}
/**
 * Canonicalizes a file path for map lookups: absolute and normalized.
 * Falls back to forward-slash separator unification when the string cannot
 * be parsed as a path.
 *
 * @param filePath raw path string; may be null or blank
 * @return normalized absolute path, or null for null/blank input
 */
private String normalizeFilePath(String filePath) {
    if (filePath != null && !filePath.isBlank()) {
        try {
            return Paths.get(filePath).toAbsolutePath().normalize().toString();
        } catch (Exception ignored) {
            // Invalid path syntax: best-effort separator unification only.
            return filePath.replace("\\", "/");
        }
    }
    return null;
}
private boolean isSourceDocument(DatasetFile datasetFile) {
if (datasetFile == null) {
return false;
}
String fileType = datasetFile.getFileType();
if (fileType == null || fileType.isBlank()) {

View File

@@ -2,6 +2,7 @@ package com.datamate.datamanagement.infrastructure.persistence.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.datamate.datamanagement.domain.model.dataset.DatasetFile;
import com.datamate.datamanagement.infrastructure.persistence.repository.dto.DatasetFileCount;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.session.RowBounds;
@@ -17,6 +18,7 @@ public interface DatasetFileMapper extends BaseMapper<DatasetFile> {
Long countByDatasetId(@Param("datasetId") String datasetId);
Long countCompletedByDatasetId(@Param("datasetId") String datasetId);
Long sumSizeByDatasetId(@Param("datasetId") String datasetId);
/** Counts a dataset's files excluding derived ones (metadata lacks derived_from_file_id). */
Long countNonDerivedByDatasetId(@Param("datasetId") String datasetId);
DatasetFile findByDatasetIdAndFileName(@Param("datasetId") String datasetId, @Param("fileName") String fileName);
List<DatasetFile> findAllByDatasetId(@Param("datasetId") String datasetId);
List<DatasetFile> findByCriteria(@Param("datasetId") String datasetId,
@@ -38,4 +40,12 @@ public interface DatasetFileMapper extends BaseMapper<DatasetFile> {
* @return 源文件ID列表
*/
List<String> findSourceFileIdsWithDerivedFiles(@Param("datasetId") String datasetId);
/**
 * Batch-counts files per dataset, excluding derived files.
 *
 * @param datasetIds dataset ids to aggregate counts for
 * @return one count row per dataset id that has matching files
 */
List<DatasetFileCount> countNonDerivedByDatasetIds(@Param("datasetIds") List<String> datasetIds);
}

View File

@@ -3,6 +3,7 @@ package com.datamate.datamanagement.infrastructure.persistence.repository;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.repository.IRepository;
import com.datamate.datamanagement.domain.model.dataset.DatasetFile;
import com.datamate.datamanagement.infrastructure.persistence.repository.dto.DatasetFileCount;
import java.util.List;
@@ -15,6 +16,8 @@ import java.util.List;
public interface DatasetFileRepository extends IRepository<DatasetFile> {
Long countByDatasetId(String datasetId);
/** Counts the dataset's files excluding derived files. */
Long countNonDerivedByDatasetId(String datasetId);
Long countCompletedByDatasetId(String datasetId);
Long sumSizeByDatasetId(String datasetId);
@@ -36,4 +39,6 @@ public interface DatasetFileRepository extends IRepository<DatasetFile> {
* @return 源文件ID列表
*/
List<String> findSourceFileIdsWithDerivedFiles(String datasetId);
/** Batch variant: non-derived file counts grouped by dataset id. */
List<DatasetFileCount> countNonDerivedByDatasetIds(List<String> datasetIds);
}

View File

@@ -0,0 +1,18 @@
package com.datamate.datamanagement.infrastructure.persistence.repository.dto;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
/**
 * Aggregation row holding the number of visible (non-derived) files for one dataset.
 * Populated by MyBatis from the countNonDerivedByDatasetIds query.
 */
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class DatasetFileCount {
    // id of the dataset the count belongs to
    private String datasetId;
    // number of non-derived files; may be null, callers default to 0
    private Long fileCount;
}

View File

@@ -6,6 +6,7 @@ import com.baomidou.mybatisplus.extension.repository.CrudRepository;
import com.datamate.datamanagement.domain.model.dataset.DatasetFile;
import com.datamate.datamanagement.infrastructure.persistence.mapper.DatasetFileMapper;
import com.datamate.datamanagement.infrastructure.persistence.repository.DatasetFileRepository;
import com.datamate.datamanagement.infrastructure.persistence.repository.dto.DatasetFileCount;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Repository;
import org.springframework.util.StringUtils;
@@ -30,6 +31,11 @@ public class DatasetFileRepositoryImpl extends CrudRepository<DatasetFileMapper,
return datasetFileMapper.selectCount(new LambdaQueryWrapper<DatasetFile>().eq(DatasetFile::getDatasetId, datasetId));
}
@Override
public Long countNonDerivedByDatasetId(String datasetId) {
    // Delegates to the XML-mapped query that filters out files whose metadata
    // contains a derived_from_file_id entry.
    return datasetFileMapper.countNonDerivedByDatasetId(datasetId);
}
@Override
public Long countCompletedByDatasetId(String datasetId) {
return datasetFileMapper.countCompletedByDatasetId(datasetId);
@@ -71,4 +77,9 @@ public class DatasetFileRepositoryImpl extends CrudRepository<DatasetFileMapper,
// 使用 MyBatis 的 @Select 注解或直接调用 mapper 方法
return datasetFileMapper.findSourceFileIdsWithDerivedFiles(datasetId);
}
@Override
public List<DatasetFileCount> countNonDerivedByDatasetIds(List<String> datasetIds) {
    // Batch variant; yields one DatasetFileCount per dataset id that has rows.
    return datasetFileMapper.countNonDerivedByDatasetIds(datasetIds);
}
}

View File

@@ -42,6 +42,13 @@
SELECT COUNT(*) FROM t_dm_dataset_files WHERE dataset_id = #{datasetId}
</select>
<!-- Counts a dataset's visible files: rows whose metadata is absent or lacks a
     derived_from_file_id key.
     NOTE(review): in MySQL, JSON_EXTRACT returns a JSON null (not SQL NULL) when the
     key stores an explicit null, so such rows are excluded here while a Java-side
     Jackson null check would treat them as visible; confirm intended behavior. -->
<select id="countNonDerivedByDatasetId" parameterType="string" resultType="long">
SELECT COUNT(*)
FROM t_dm_dataset_files
WHERE dataset_id = #{datasetId}
AND (metadata IS NULL OR JSON_EXTRACT(metadata, '$.derived_from_file_id') IS NULL)
</select>
<select id="countCompletedByDatasetId" parameterType="string" resultType="long">
SELECT COUNT(*) FROM t_dm_dataset_files WHERE dataset_id = #{datasetId} AND status = 'COMPLETED'
</select>
@@ -110,4 +117,16 @@
AND metadata IS NOT NULL
AND JSON_EXTRACT(metadata, '$.derived_from_file_id') IS NOT NULL
</select>
<!-- Batch version: visible (non-derived) file counts grouped by dataset id.
     Datasets with zero matching rows produce no result row; callers must default to 0.
     NOTE(review): JSON_EXTRACT yields JSON null (not SQL NULL) for explicit null values,
     so such rows are counted as derived here; confirm intended behavior. -->
<select id="countNonDerivedByDatasetIds" resultType="com.datamate.datamanagement.infrastructure.persistence.repository.dto.DatasetFileCount">
SELECT dataset_id AS datasetId,
COUNT(*) AS fileCount
FROM t_dm_dataset_files
WHERE dataset_id IN
<foreach collection="datasetIds" item="datasetId" open="(" separator="," close=")">
#{datasetId}
</foreach>
AND (metadata IS NULL OR JSON_EXTRACT(metadata, '$.derived_from_file_id') IS NULL)
GROUP BY dataset_id
</select>
</mapper>

View File

@@ -145,9 +145,10 @@
<select id="getAllDatasetStatistics" resultType="com.datamate.datamanagement.interfaces.dto.AllDatasetStatisticsResponse">
SELECT
COUNT(*) AS total_datasets,
SUM(size_bytes) AS total_size,
SUM(file_count) AS total_files
FROM t_dm_datasets;
(SELECT COUNT(*) FROM t_dm_datasets) AS total_datasets,
(SELECT COALESCE(SUM(size_bytes), 0) FROM t_dm_datasets) AS total_size,
(SELECT COUNT(*)
FROM t_dm_dataset_files
WHERE metadata IS NULL OR JSON_EXTRACT(metadata, '$.derived_from_file_id') IS NULL) AS total_files
</select>
</mapper>