From 07029d07ff1fa676d34b0b455f84c12a84a0f901 Mon Sep 17 00:00:00 2001 From: hhhhsc701 <56435672+hhhhsc701@users.noreply.github.com> Date: Fri, 28 Nov 2025 15:28:10 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=B8=85=E6=B4=97=E9=87=8D?= =?UTF-8?q?=E8=AF=95=E6=9C=BA=E5=88=B6=EF=BC=8C=E4=BC=98=E5=8C=96=E6=B8=85?= =?UTF-8?q?=E6=B4=97=E8=BF=9B=E5=BA=A6=E5=B1=95=E7=A4=BA=EF=BC=8C=E4=BF=AE?= =?UTF-8?q?=E5=A4=8D=E6=A8=A1=E6=9D=BF=E6=97=A0=E6=B3=95=E5=B1=95=E7=A4=BA?= =?UTF-8?q?=E5=8F=82=E6=95=B0=20(#113)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * bugfix: 模板无法展示参数 * bugfix: 优化清洗进度展示 * bugfix: 优化清洗重试机制 --- .../application/CleaningTaskService.java | 31 ++++++++++++++++++- .../application/CleaningTemplateService.java | 16 +++++++++- .../repository/CleaningResultRepository.java | 4 +++ .../Impl/CleaningResultRepositoryImpl.java | 15 +++++++-- .../operator/application/OperatorService.java | 2 +- .../Detail/components/BasicInfo.tsx | 5 ++- .../Detail/components/FileTable.tsx | 10 +++++- .../Home/components/TaskList.tsx | 11 ++++--- .../Home/components/TemplateList.tsx | 14 ++++----- .../img_duplicated_images_cleaner/process.py | 4 +-- .../python-executor/datamate/core/base_op.py | 2 +- 11 files changed, 93 insertions(+), 21 deletions(-) diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTaskService.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTaskService.java index 58c417d..11aba02 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTaskService.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTaskService.java @@ -41,6 +41,7 @@ import java.util.*; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; import java.util.stream.Stream; @Slf4j @@ -116,7 +117,7 @@ public class CleaningTaskService { prepareTask(task, request.getInstance()); scanDataset(taskId, request.getSrcDatasetId()); - executeTask(taskId); + taskScheduler.executeTask(taskId); return task; } @@ -170,6 +171,11 @@ public class CleaningTaskService { } public void executeTask(String taskId) { + List failed = cleaningResultRepo.findByInstanceId(taskId, "FAILED"); + Set failedSet = failed.stream().map(CleaningResultDto::getSrcFileId).collect(Collectors.toSet()); + CleaningTaskDto task = cleaningTaskRepo.findTaskById(taskId); + scanDataset(taskId, task.getSrcDatasetId(), failedSet); + cleaningResultRepo.deleteByInstanceId(taskId, "FAILED"); taskScheduler.executeTask(taskId); } @@ -226,6 +232,29 @@ public class CleaningTaskService { } while (pageNumber < datasetFiles.getTotalPages()); } + private void scanDataset(String taskId, String srcDatasetId, Set failedFiles) { + int pageNumber = 0; + int pageSize = 500; + PagingQuery pageRequest = new PagingQuery(pageNumber, pageSize); + PagedResponse datasetFiles; + do { + datasetFiles = datasetFileService.getDatasetFiles(srcDatasetId, null, null,null, pageRequest); + if (datasetFiles.getContent().isEmpty()) { + break; + } + List> files = datasetFiles.getContent().stream() + .filter(content -> failedFiles.contains(content.getId())) + .map(content -> Map.of("fileName", (Object) content.getFileName(), + "fileSize", content.getFileSize(), + "filePath", content.getFilePath(), + "fileType", content.getFileType(), + "fileId", content.getId())) + .toList(); + writeListMapToJsonlFile(files, FLOW_PATH + "/" + taskId + "/dataset.jsonl"); + pageNumber += 1; + } while (pageNumber < datasetFiles.getTotalPages()); + } + private void writeListMapToJsonlFile(List> mapList, String fileName) { ObjectMapper objectMapper = new ObjectMapper(); diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTemplateService.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTemplateService.java index 1303137..ed8c9d9 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTemplateService.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTemplateService.java @@ -6,8 +6,13 @@ import com.datamate.cleaning.domain.repository.OperatorInstanceRepository; import com.datamate.cleaning.infrastructure.validator.CleanTaskValidator; import com.datamate.cleaning.interfaces.dto.*; import com.datamate.cleaning.domain.model.entity.TemplateWithInstance; +import com.datamate.common.infrastructure.exception.BusinessException; +import com.datamate.operator.application.OperatorService; import com.datamate.operator.domain.repository.OperatorViewRepository; +import com.datamate.operator.infrastructure.exception.OperatorErrorCode; import com.datamate.operator.interfaces.dto.OperatorDto; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import lombok.RequiredArgsConstructor; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Service; @@ -31,6 +36,10 @@ public class CleaningTemplateService { private final CleanTaskValidator cleanTaskValidator; + private final OperatorService operatorService; + + private final ObjectMapper objectMapper = new ObjectMapper(); + public List getTemplates(String keywords) { List allOperators = operatorViewRepo.findOperatorsByCriteria(null, null, null, null, null); @@ -50,7 +59,12 @@ public class CleaningTemplateService { .map(v -> { OperatorDto operator = operatorsMap.get(v.getOperatorId()); if (StringUtils.isNotBlank(v.getSettingsOverride())) { - operator.setSettings(v.getSettingsOverride()); + try { + operator.setOverrides(objectMapper.readValue(v.getSettingsOverride(), Map.class)); + } catch (JsonProcessingException e) { + throw BusinessException.of(OperatorErrorCode.SETTINGS_PARSE_FAILED, e.getMessage()); + } + operatorService.overrideSettings(operator); } return operator; }).toList()); diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/repository/CleaningResultRepository.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/repository/CleaningResultRepository.java index 2c018ba..b735839 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/repository/CleaningResultRepository.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/repository/CleaningResultRepository.java @@ -10,7 +10,11 @@ import java.util.List; public interface CleaningResultRepository extends IRepository { void deleteByInstanceId(String instanceId); + void deleteByInstanceId(String instanceId, String status); + int[] countByInstanceId(String instanceId); List findByInstanceId(String instanceId); + + List findByInstanceId(String instanceId, String status); } diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/persistence/Impl/CleaningResultRepositoryImpl.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/persistence/Impl/CleaningResultRepositoryImpl.java index 9e012ba..0ed303b 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/persistence/Impl/CleaningResultRepositoryImpl.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/persistence/Impl/CleaningResultRepositoryImpl.java @@ -22,8 +22,14 @@ public class CleaningResultRepositoryImpl extends CrudRepository queryWrapper = new LambdaQueryWrapper<>(); - queryWrapper.eq(CleaningResult::getInstanceId, instanceId); + queryWrapper.eq(CleaningResult::getInstanceId, instanceId) + .eq(StringUtils.isNotBlank(status), CleaningResult::getStatus, status); mapper.delete(queryWrapper); } @@ -40,8 +46,13 @@ public class CleaningResultRepositoryImpl extends CrudRepository findByInstanceId(String instanceId) { + return findByInstanceId(instanceId, null); + } + + public List findByInstanceId(String instanceId, String status) { LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); - queryWrapper.eq(CleaningResult::getInstanceId, instanceId); + queryWrapper.eq(CleaningResult::getInstanceId, instanceId) + .eq(StringUtils.isNotBlank(status), CleaningResult::getStatus, status); return CleaningResultConverter.INSTANCE.convertEntityToDto(mapper.selectList(queryWrapper)); } } diff --git a/backend/services/operator-market-service/src/main/java/com/datamate/operator/application/OperatorService.java b/backend/services/operator-market-service/src/main/java/com/datamate/operator/application/OperatorService.java index a43e723..abc82a7 100644 --- a/backend/services/operator-market-service/src/main/java/com/datamate/operator/application/OperatorService.java +++ b/backend/services/operator-market-service/src/main/java/com/datamate/operator/application/OperatorService.java @@ -128,7 +128,7 @@ public class OperatorService { return operatorBasePath + File.separator + "extract" + File.separator + fileName; } - private void overrideSettings(OperatorDto operatorDto) { + public void overrideSettings(OperatorDto operatorDto) { if (StringUtils.isBlank(operatorDto.getSettings()) || MapUtils.isEmpty(operatorDto.getOverrides())) { return; } diff --git a/frontend/src/pages/DataCleansing/Detail/components/BasicInfo.tsx b/frontend/src/pages/DataCleansing/Detail/components/BasicInfo.tsx index f658271..cfa2572 100644 --- a/frontend/src/pages/DataCleansing/Detail/components/BasicInfo.tsx +++ b/frontend/src/pages/DataCleansing/Detail/components/BasicInfo.tsx @@ -110,7 +110,10 @@ export default function BasicInfo({ task }: { task: CleansingTask }) { {/* 处理进度 */}

处理进度

- + { task?.status?.value === TaskStatus.FAILED ? + + : + }
diff --git a/frontend/src/pages/DataCleansing/Detail/components/FileTable.tsx b/frontend/src/pages/DataCleansing/Detail/components/FileTable.tsx index c879046..1d95fe8 100644 --- a/frontend/src/pages/DataCleansing/Detail/components/FileTable.tsx +++ b/frontend/src/pages/DataCleansing/Detail/components/FileTable.tsx @@ -250,7 +250,7 @@ export default function FileTable({result, fetchTaskResult}) { key: "action", render: (_text: string, record: any) => (
- {record.status === "COMPLETED" && ( + {record.status === "COMPLETED" ? ( + ) : ( + )} diff --git a/frontend/src/pages/DataCleansing/Home/components/TaskList.tsx b/frontend/src/pages/DataCleansing/Home/components/TaskList.tsx index deeed9a..8bdd1de 100644 --- a/frontend/src/pages/DataCleansing/Home/components/TaskList.tsx +++ b/frontend/src/pages/DataCleansing/Home/components/TaskList.tsx @@ -177,10 +177,13 @@ export default function TaskList() { title: "进度", dataIndex: "process", key: "process", - width: 200, - render: (progress: number) => ( - - ), + width: 150, + render: (_, record: CleansingTask) => { + if (record?.status?.value == TaskStatus.FAILED) { + return ; + } + return ; + }, }, { title: "已处理文件数", diff --git a/frontend/src/pages/DataCleansing/Home/components/TemplateList.tsx b/frontend/src/pages/DataCleansing/Home/components/TemplateList.tsx index ed0fdb3..696e1cf 100644 --- a/frontend/src/pages/DataCleansing/Home/components/TemplateList.tsx +++ b/frontend/src/pages/DataCleansing/Home/components/TemplateList.tsx @@ -45,13 +45,6 @@ export default function TemplateList() { }; const templateColumns = [ - { - title: "模板ID", - dataIndex: "id", - key: "id", - fixed: "left", - width: 100, - }, { title: "模板名称", dataIndex: "name", @@ -71,6 +64,13 @@ export default function TemplateList() { ); }}, + { + title: "模板ID", + dataIndex: "id", + key: "id", + fixed: "left", + width: 150, + }, { title: "算子数量", dataIndex: "num", diff --git a/runtime/ops/filter/img_duplicated_images_cleaner/process.py b/runtime/ops/filter/img_duplicated_images_cleaner/process.py index f0a175b..578a145 100644 --- a/runtime/ops/filter/img_duplicated_images_cleaner/process.py +++ b/runtime/ops/filter/img_duplicated_images_cleaner/process.py @@ -91,10 +91,10 @@ class ImgDuplicatedImagesCleaner(Filter): with self.conn as connection: connection.execute(text(create_tables_sql)) # 判断是否有重复文件 - result = connection.execute(text(query_sql, query_sql_params)).fetchall() + result = connection.execute(text(query_sql), query_sql_params).fetchall() # 查询记录为空,无重复图片, 插入新文件特征 if not result: - connection.execute(text(insert_sql, insert_sql_params)) + connection.execute(text(insert_sql), insert_sql_params) return img_bytes logger.info(f"taskId: {self.task_uuid} fileName: {file_name}, method: Duplicate ImagesCleaner. " f"The image is duplicated and filtered ") diff --git a/runtime/python-executor/datamate/core/base_op.py b/runtime/python-executor/datamate/core/base_op.py index 68202d6..1db019a 100644 --- a/runtime/python-executor/datamate/core/base_op.py +++ b/runtime/python-executor/datamate/core/base_op.py @@ -288,7 +288,7 @@ class Filter(BaseOp): f"{str(get_exception_info(e))}") task_info = TaskInfoPersistence() task_info.persistence_task_info(sample) - return False + raise e sample["execute_status"] = execute_status # 文件无内容会被过滤