From 6bbde0ec5622cec9b2bd879da1bbaef627261e80 Mon Sep 17 00:00:00 2001 From: hhhhsc701 <56435672+hhhhsc701@users.noreply.github.com> Date: Wed, 12 Nov 2025 18:00:19 +0800 Subject: [PATCH] =?UTF-8?q?feature:=20=E6=B8=85=E6=B4=97=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E8=AF=A6=E6=83=85=E9=A1=B5=20(#73)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feature: 清洗任务详情 * fix: 取消构建镜像,改为直接拉取 * fix: 增加清洗任务详情页 * fix: 增加清洗任务详情页 * fix: 算子列表可点击 * fix: 模板详情和更新 --- .github/workflows/docker-image-save.yml | 27 ++ .github/workflows/docker-images-reusable.yml | 16 +- .github/workflows/package.yml | 22 +- .../application/CleaningTaskService.java | 74 +++- .../application/CleaningTemplateService.java | 30 +- .../scheduler/CleaningTaskScheduler.java | 6 +- .../domain/model/entity/Operator.java | 36 -- .../repository/CleaningResultRepository.java | 7 +- .../OperatorInstanceRepository.java | 3 + .../converter/CleaningResultConverter.java | 15 + .../converter/OperatorInstanceConverter.java | 17 +- .../httpclient/RuntimeClient.java | 32 +- .../Impl/CleaningResultRepositoryImpl.java | 21 +- .../Impl/OperatorInstanceRepositoryImpl.java | 5 + .../mapper/OperatorInstanceMapper.java | 13 + .../interfaces/dto/CleaningProcess.java | 22 +- .../interfaces/dto/CleaningResultDto.java | 30 ++ .../interfaces/dto/CleaningTaskLog.java | 12 + .../rest/CleaningTaskController.java | 12 + .../operator/application/OperatorService.java | 4 +- .../repository/OperatorViewRepository.java | 5 +- .../converter/OperatorConverter.java | 2 + .../Impl/OperatorViewRepositoryImpl.java | 13 +- .../{CreateTempate.tsx => CreateTemplate.tsx} | 35 +- .../components/CreateTemplateStepOne.tsx | 6 + .../Create/components/OperatorLibrary.tsx | 56 +-- .../components/OperatorOrchestration.tsx | 23 +- .../Create/hooks/useCreateStepTwo.tsx | 1 + .../Create/hooks/useOperatorOperations.ts | 34 +- .../pages/DataCleansing/Detail/TaskDetail.tsx | 103 +++-- .../DataCleansing/Detail/TemplateDetail.tsx | 122 ++++++ .../Detail/components/BasicInfo.tsx | 53 +-- .../Detail/components/FileTable.tsx | 396 ++++++------------ .../Detail/components/LogsTable.tsx | 143 ++----- .../Detail/components/OperatorTable.tsx | 118 +----- .../Home/components/TaskList.tsx | 27 +- .../Home/components/TemplateList.tsx | 153 +++++-- .../src/pages/DataCleansing/cleansing.api.ts | 8 + .../pages/DataCleansing/cleansing.const.tsx | 5 + .../pages/DataCleansing/cleansing.model.ts | 21 +- .../OperatorMarket/Home/components/List.tsx | 3 - .../pages/OperatorMarket/operator.model.ts | 4 +- frontend/src/routes/routes.ts | 11 +- .../python-executor/datamate/core/base_op.py | 2 +- .../datamate/scheduler/cmd_task_scheduler.py | 102 +++-- scripts/images/runtime/Dockerfile | 10 +- 46 files changed, 1065 insertions(+), 795 deletions(-) create mode 100644 .github/workflows/docker-image-save.yml delete mode 100644 backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/model/entity/Operator.java create mode 100644 backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/converter/CleaningResultConverter.java create mode 100644 backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/dto/CleaningResultDto.java create mode 100644 backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/dto/CleaningTaskLog.java rename frontend/src/pages/DataCleansing/Create/{CreateTempate.tsx => CreateTemplate.tsx} (73%) create mode 100644 frontend/src/pages/DataCleansing/Detail/TemplateDetail.tsx diff --git a/.github/workflows/docker-image-save.yml b/.github/workflows/docker-image-save.yml new file mode 100644 index 0000000..5294f2e --- /dev/null +++ b/.github/workflows/docker-image-save.yml @@ -0,0 +1,27 @@ +name: docker-image-save.yml +on: + workflow_call: + inputs: + service_name: + required: true + type: string + +jobs: + pull-and-save: + runs-on: ubuntu-latest + steps: + - name: Pull Docker Image + run: | + LOWERCASE_REPO=$(echo "${{ github.repository_owner }}" | tr '[:upper:]' '[:lower:]') + docker pull ghcr.io/$LOWERCASE_REPO/datamate-${{ inputs.service_name }}:latest + docker tag ghcr.io/$LOWERCASE_REPO/datamate-${{ inputs.service_name }}:latest datamate-${{ inputs.service_name }}:latest + + - name: Save Docker Image + run: | + docker save -o datamate-${{ inputs.service_name }}.tar datamate-${{ inputs.service_name }}:latest + + - name: Upload Docker Image + uses: actions/upload-artifact@v4 + with: + name: datamate-${{ inputs.service_name }} + path: datamate-${{ inputs.service_name }}.tar diff --git a/.github/workflows/docker-images-reusable.yml b/.github/workflows/docker-images-reusable.yml index 0e7ed7a..73cdbd6 100644 --- a/.github/workflows/docker-images-reusable.yml +++ b/.github/workflows/docker-images-reusable.yml @@ -47,19 +47,7 @@ jobs: make build-${{ inputs.service_name }} VERSION=latest - name: Tag & Push Docker Image - if: github.event_name != 'pull_request' && !startsWith(github.workflow, 'Package') + if: github.event_name != 'pull_request' run: | docker tag datamate-${{ inputs.service_name }}:latest ${{ steps.set-tag.outputs.TAGS }} - docker push ${{ steps.set-tag.outputs.TAGS }} - - - name: Save Docker Image - if: startsWith(github.workflow, 'Package') - run: | - docker save -o datamate-${{ inputs.service_name }}.tar datamate-${{ inputs.service_name }}:latest - - - name: Upload Docker Image - if: startsWith(github.workflow, 'Package') - uses: actions/upload-artifact@v4 - with: - name: datamate-${{ inputs.service_name }} - path: datamate-${{ inputs.service_name }}.tar \ No newline at end of file + docker push ${{ steps.set-tag.outputs.TAGS }} \ No newline at end of file diff --git a/.github/workflows/package.yml b/.github/workflows/package.yml index 2b591d1..7320fd1 100644 --- a/.github/workflows/package.yml +++ b/.github/workflows/package.yml @@ -6,23 +6,33 @@ on: jobs: backend-docker-build: name: Build and Push Backend Docker Image - uses: ./.github/workflows/docker-image-backend.yml + uses: ./.github/workflows/docker-image-save.yml + with: + service_name: backend frontend-docker-build: name: Build and Push Frontend Docker Image - uses: ./.github/workflows/docker-image-frontend.yml + uses: ./.github/workflows/docker-image-save.yml + with: + service_name: frontend database-docker-build: name: Build and Push Database Docker Image - uses: ./.github/workflows/docker-image-database.yml + uses: ./.github/workflows/docker-image-save.yml + with: + service_name: database runtime-docker-build: name: Build and Push Runtime Docker Image - uses: ./.github/workflows/docker-image-runtime.yml + uses: ./.github/workflows/docker-image-save.yml + with: + service_name: runtime backend-python-docker-build: name: Build and Push Backend Python Docker Image - uses: ./.github/workflows/docker-image-backend-python.yml + uses: ./.github/workflows/docker-image-save.yml + with: + service_name: backend-python package-all: needs: @@ -54,7 +64,7 @@ jobs: - name: Upload Package uses: actions/upload-artifact@v4 with: - name: datamate + name: DataMate include-hidden-files: true path: | deployment/ diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTaskService.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTaskService.java index 6b1d4d4..7b02817 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTaskService.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTaskService.java @@ -11,10 +11,7 @@ import com.datamate.cleaning.domain.repository.CleaningTaskRepository; import com.datamate.cleaning.domain.repository.OperatorInstanceRepository; import com.datamate.cleaning.infrastructure.validator.CleanTaskValidator; -import com.datamate.cleaning.interfaces.dto.CleaningProcess; -import com.datamate.cleaning.interfaces.dto.CleaningTaskDto; -import com.datamate.cleaning.interfaces.dto.CreateCleaningTaskRequest; -import com.datamate.cleaning.interfaces.dto.OperatorInstanceDto; +import com.datamate.cleaning.interfaces.dto.*; import com.datamate.common.infrastructure.exception.BusinessException; import com.datamate.common.infrastructure.exception.SystemErrorCode; import com.datamate.datamanagement.application.DatasetApplicationService; @@ -40,15 +37,19 @@ import java.io.BufferedWriter; import java.io.File; import java.io.FileWriter; import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.UUID; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.*; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Stream; @Slf4j @Service @RequiredArgsConstructor public class CleaningTaskService { - private final CleaningTaskRepository CleaningTaskRepo; + private final CleaningTaskRepository cleaningTaskRepo; private final OperatorInstanceRepository operatorInstanceRepo; @@ -66,19 +67,24 @@ public class CleaningTaskService { private final String FLOW_PATH = "/flow"; + private final Pattern LEVEL_PATTERN = Pattern.compile( + "\\b(TRACE|DEBUG|INFO|WARN|WARNING|ERROR|FATAL)\\b", + Pattern.CASE_INSENSITIVE + ); + public List getTasks(String status, String keywords, Integer page, Integer size) { - List tasks = CleaningTaskRepo.findTasks(status, keywords, page, size); + List tasks = cleaningTaskRepo.findTasks(status, keywords, page, size); tasks.forEach(this::setProcess); return tasks; } private void setProcess(CleaningTaskDto task) { - int count = cleaningResultRepo.countByInstanceId(task.getId()); - task.setProgress(CleaningProcess.of(task.getFileCount(), count)); + int[] count = cleaningResultRepo.countByInstanceId(task.getId()); + task.setProgress(CleaningProcess.of(task.getFileCount(), count[0], count[1])); } public int countTasks(String status, String keywords) { - return CleaningTaskRepo.findTasks(status, keywords, null, null).size(); + return cleaningTaskRepo.findTasks(status, keywords, null, null).size(); } @Transactional @@ -105,7 +111,7 @@ public class CleaningTaskService { task.setDestDatasetName(destDataset.getName()); task.setBeforeSize(srcDataset.getSizeBytes()); task.setFileCount(srcDataset.getFileCount().intValue()); - CleaningTaskRepo.insertTask(task); + cleaningTaskRepo.insertTask(task); operatorInstanceRepo.insertInstance(taskId, request.getInstance()); @@ -116,14 +122,50 @@ public class CleaningTaskService { } public CleaningTaskDto getTask(String taskId) { - CleaningTaskDto task = CleaningTaskRepo.findTaskById(taskId); + CleaningTaskDto task = cleaningTaskRepo.findTaskById(taskId); setProcess(task); + task.setInstance(operatorInstanceRepo.findOperatorByInstanceId(taskId)); return task; } + public List getTaskResults(String taskId) { + return cleaningResultRepo.findByInstanceId(taskId); + } + + public List getTaskLog(String taskId) { + String logPath = FLOW_PATH + "/" + taskId + "/output.log"; + try (Stream lines = Files.lines(Paths.get(logPath))) { + List logs = new ArrayList<>(); + AtomicReference lastLevel = new AtomicReference<>("INFO"); + lines.forEach(line -> { + lastLevel.set(getLogLevel(line, lastLevel.get())); + CleaningTaskLog log = new CleaningTaskLog(); + log.setLevel(lastLevel.get()); + log.setMessage(line); + logs.add(log); + }); + return logs; + } catch (IOException e) { + log.error("Fail to read log file {}", logPath, e); + return Collections.emptyList(); + } + } + + private String getLogLevel(String logLine, String defaultLevel) { + if (logLine == null || logLine.trim().isEmpty()) { + return defaultLevel; + } + + Matcher matcher = LEVEL_PATTERN.matcher(logLine); + if (matcher.find()) { + return matcher.group(1).toUpperCase(); + } + return defaultLevel; + } + @Transactional public void deleteTask(String taskId) { - CleaningTaskRepo.deleteTaskById(taskId); + cleaningTaskRepo.deleteTaskById(taskId); operatorInstanceRepo.deleteByInstanceId(taskId); cleaningResultRepo.deleteByInstanceId(taskId); } @@ -190,7 +232,7 @@ public class CleaningTaskService { try (BufferedWriter writer = new BufferedWriter(new FileWriter(fileName))) { if (!mapList.isEmpty()) { // 检查列表是否为空,避免异常 - String jsonString = objectMapper.writeValueAsString(mapList.get(0)); + String jsonString = objectMapper.writeValueAsString(mapList.getFirst()); writer.write(jsonString); for (int i = 1; i < mapList.size(); i++) { diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTemplateService.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTemplateService.java index bfd499b..cc20169 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTemplateService.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTemplateService.java @@ -5,7 +5,7 @@ import com.datamate.cleaning.domain.repository.CleaningTemplateRepository; import com.datamate.cleaning.domain.repository.OperatorInstanceRepository; import com.datamate.cleaning.interfaces.dto.*; import com.datamate.cleaning.domain.model.entity.TemplateWithInstance; -import com.datamate.operator.domain.repository.OperatorRepository; +import com.datamate.operator.domain.repository.OperatorViewRepository; import com.datamate.operator.interfaces.dto.OperatorDto; import lombok.RequiredArgsConstructor; import org.apache.commons.lang3.StringUtils; @@ -26,10 +26,11 @@ public class CleaningTemplateService { private final OperatorInstanceRepository operatorInstanceRepo; - private final OperatorRepository operatorRepo; + private final OperatorViewRepository operatorViewRepo; public List getTemplates(String keywords) { - List allOperators = operatorRepo.findAllOperators(); + List allOperators = + operatorViewRepo.findOperatorsByCriteria(null, null, null, null, null); Map operatorsMap = allOperators.stream() .collect(Collectors.toMap(OperatorDto::getId, Function.identity())); List allTemplates = cleaningTemplateRepo.findAllTemplates(keywords); @@ -39,8 +40,8 @@ public class CleaningTemplateService { List value = twi.getValue(); CleaningTemplateDto template = new CleaningTemplateDto(); template.setId(twi.getKey()); - template.setName(value.get(0).getName()); - template.setDescription(value.get(0).getDescription()); + template.setName(value.getFirst().getName()); + template.setDescription(value.getFirst().getDescription()); template.setInstance(value.stream().filter(v -> StringUtils.isNotBlank(v.getOperatorId())) .sorted(Comparator.comparingInt(TemplateWithInstance::getOpIndex)) .map(v -> { @@ -50,8 +51,8 @@ public class CleaningTemplateService { } return operator; }).toList()); - template.setCreatedAt(value.get(0).getCreatedAt()); - template.setUpdatedAt(value.get(0).getUpdatedAt()); + template.setCreatedAt(value.getFirst().getCreatedAt()); + template.setUpdatedAt(value.getFirst().getUpdatedAt()); return template; }).toList(); } @@ -70,17 +71,22 @@ public class CleaningTemplateService { } public CleaningTemplateDto getTemplate(String templateId) { - return cleaningTemplateRepo.findTemplateById(templateId); + CleaningTemplateDto template = cleaningTemplateRepo.findTemplateById(templateId); + template.setInstance(operatorInstanceRepo.findOperatorByInstanceId(templateId)); + return template; } @Transactional public CleaningTemplateDto updateTemplate(String templateId, UpdateCleaningTemplateRequest request) { CleaningTemplateDto template = cleaningTemplateRepo.findTemplateById(templateId); - if (template != null) { - template.setName(request.getName()); - template.setDescription(request.getDescription()); - cleaningTemplateRepo.updateTemplate(template); + if (template == null) { + return null; } + template.setName(request.getName()); + template.setDescription(request.getDescription()); + cleaningTemplateRepo.updateTemplate(template); + operatorInstanceRepo.deleteByInstanceId(templateId); + operatorInstanceRepo.insertInstance(templateId, request.getInstance()); return template; } diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/scheduler/CleaningTaskScheduler.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/scheduler/CleaningTaskScheduler.java index 93b2562..6692ffa 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/scheduler/CleaningTaskScheduler.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/scheduler/CleaningTaskScheduler.java @@ -16,6 +16,8 @@ import java.util.concurrent.Executors; public class CleaningTaskScheduler { private final CleaningTaskRepository cleaningTaskRepo; + private final RuntimeClient runtimeClient; + private final ExecutorService taskExecutor = Executors.newFixedThreadPool(5); public void executeTask(String taskId) { @@ -28,11 +30,11 @@ public class CleaningTaskScheduler { task.setStatus(CleaningTaskStatusEnum.RUNNING); task.setStartedAt(LocalDateTime.now()); cleaningTaskRepo.updateTask(task); - RuntimeClient.submitTask(taskId); + runtimeClient.submitTask(taskId); } public void stopTask(String taskId) { - RuntimeClient.stopTask(taskId); + runtimeClient.stopTask(taskId); CleaningTaskDto task = new CleaningTaskDto(); task.setId(taskId); task.setStatus(CleaningTaskStatusEnum.STOPPED); diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/model/entity/Operator.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/model/entity/Operator.java deleted file mode 100644 index 93513a6..0000000 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/model/entity/Operator.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.datamate.cleaning.domain.model.entity; - -import com.baomidou.mybatisplus.annotation.TableId; -import com.baomidou.mybatisplus.annotation.TableName; -import lombok.Getter; -import lombok.Setter; - -import java.time.LocalDateTime; - -@Getter -@Setter -@TableName(value = "t_operator", autoResultMap = true) -public class Operator { - @TableId - private String id; - - private String name; - - private String description; - - private String version; - - private String inputs; - - private String outputs; - - private String runtime; - - private String settings; - - private Boolean isStar; - - private LocalDateTime createdAt; - - private LocalDateTime updatedAt; -} diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/repository/CleaningResultRepository.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/repository/CleaningResultRepository.java index 31ca1ac..2c018ba 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/repository/CleaningResultRepository.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/repository/CleaningResultRepository.java @@ -3,9 +3,14 @@ package com.datamate.cleaning.domain.repository; import com.baomidou.mybatisplus.extension.repository.IRepository; import com.datamate.cleaning.domain.model.entity.CleaningResult; +import com.datamate.cleaning.interfaces.dto.CleaningResultDto; + +import java.util.List; public interface CleaningResultRepository extends IRepository { void deleteByInstanceId(String instanceId); - int countByInstanceId(String instanceId); + int[] countByInstanceId(String instanceId); + + List findByInstanceId(String instanceId); } diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/repository/OperatorInstanceRepository.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/repository/OperatorInstanceRepository.java index 4fbcc2a..c956c17 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/repository/OperatorInstanceRepository.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/repository/OperatorInstanceRepository.java @@ -3,6 +3,7 @@ package com.datamate.cleaning.domain.repository; import com.baomidou.mybatisplus.extension.repository.IRepository; import com.datamate.cleaning.interfaces.dto.OperatorInstanceDto; import com.datamate.cleaning.domain.model.entity.OperatorInstance; +import com.datamate.operator.interfaces.dto.OperatorDto; import java.util.List; @@ -10,4 +11,6 @@ public interface OperatorInstanceRepository extends IRepository instances); void deleteByInstanceId(String instanceId); + + List findOperatorByInstanceId(String instanceId); } diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/converter/CleaningResultConverter.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/converter/CleaningResultConverter.java new file mode 100644 index 0000000..b8866a0 --- /dev/null +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/converter/CleaningResultConverter.java @@ -0,0 +1,15 @@ +package com.datamate.cleaning.infrastructure.converter; + +import com.datamate.cleaning.domain.model.entity.CleaningResult; +import com.datamate.cleaning.interfaces.dto.CleaningResultDto; +import org.mapstruct.Mapper; +import org.mapstruct.factory.Mappers; + +import java.util.List; + +@Mapper +public interface CleaningResultConverter { + CleaningResultConverter INSTANCE = Mappers.getMapper(CleaningResultConverter.class); + + List convertEntityToDto(List cleaningResult); +} diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/converter/OperatorInstanceConverter.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/converter/OperatorInstanceConverter.java index eebeeec..556a4d5 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/converter/OperatorInstanceConverter.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/converter/OperatorInstanceConverter.java @@ -2,10 +2,10 @@ package com.datamate.cleaning.infrastructure.converter; import com.datamate.cleaning.domain.model.entity.OperatorInstance; -import com.datamate.cleaning.domain.model.entity.Operator; import com.datamate.cleaning.interfaces.dto.OperatorInstanceDto; import com.datamate.common.infrastructure.exception.BusinessException; import com.datamate.common.infrastructure.exception.SystemErrorCode; +import com.datamate.operator.domain.model.OperatorView; import com.datamate.operator.interfaces.dto.OperatorDto; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -14,6 +14,8 @@ import org.mapstruct.Mapping; import org.mapstruct.Named; import org.mapstruct.factory.Mappers; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -35,5 +37,16 @@ public interface OperatorInstanceConverter { } } - List fromEntityToDto(List operator); + @Mapping(target = "categories", source = "categories", qualifiedByName = "stringToList") + OperatorDto fromEntityToDto(OperatorView operator); + + List fromEntityToDto(List operator); + + @Named("stringToList") + default List stringToList(String input) { + if (input == null || input.isEmpty()) { + return Collections.emptyList(); + } + return Arrays.stream(input.split(",")).toList(); + } } diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/httpclient/RuntimeClient.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/httpclient/RuntimeClient.java index d3b65e5..0c71356 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/httpclient/RuntimeClient.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/httpclient/RuntimeClient.java @@ -3,6 +3,8 @@ package com.datamate.cleaning.infrastructure.httpclient; import com.datamate.common.infrastructure.exception.BusinessException; import com.datamate.common.infrastructure.exception.SystemErrorCode; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; import java.io.IOException; import java.net.URI; @@ -13,24 +15,36 @@ import java.text.MessageFormat; import java.time.Duration; @Slf4j +@Component public class RuntimeClient { - private static final String BASE_URL = "http://datamate-runtime:8081/api"; + private final String CREATE_TASK_URL = "/api/task/{0}/submit"; - private static final String CREATE_TASK_URL = BASE_URL + "/task/{0}/submit"; + private final String STOP_TASK_URL = "/api/task/{0}/stop"; - private static final String STOP_TASK_URL = BASE_URL + "/task/{0}/stop"; + @Value("${runtime.protocol:http}") + private String protocol; - private static final HttpClient CLIENT = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(10)).build(); + @Value("${runtime.host:datamate-runtime}") + private String host; - public static void submitTask(String taskId) { - send(MessageFormat.format(CREATE_TASK_URL, taskId)); + @Value("${runtime.port:8081}") + private int port; + + private final HttpClient CLIENT = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(10)).build(); + + public void submitTask(String taskId) { + send(MessageFormat.format(getRequestUrl(CREATE_TASK_URL), taskId)); } - public static void stopTask(String taskId) { - send(MessageFormat.format(STOP_TASK_URL, taskId)); + public void stopTask(String taskId) { + send(MessageFormat.format(getRequestUrl(STOP_TASK_URL), taskId)); } - private static void send(String url) { + private String getRequestUrl(String url) { + return protocol + "://" + host + ":" + port + url; + } + + private void send(String url) { HttpRequest request = HttpRequest.newBuilder() .uri(URI.create(url)) .timeout(Duration.ofSeconds(30)) diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/persistence/Impl/CleaningResultRepositoryImpl.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/persistence/Impl/CleaningResultRepositoryImpl.java index e69be66..9e012ba 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/persistence/Impl/CleaningResultRepositoryImpl.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/persistence/Impl/CleaningResultRepositoryImpl.java @@ -2,12 +2,18 @@ package com.datamate.cleaning.infrastructure.persistence.Impl; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.repository.CrudRepository; +import com.datamate.cleaning.common.enums.CleaningTaskStatusEnum; import com.datamate.cleaning.domain.model.entity.CleaningResult; import com.datamate.cleaning.domain.repository.CleaningResultRepository; +import com.datamate.cleaning.infrastructure.converter.CleaningResultConverter; import com.datamate.cleaning.infrastructure.persistence.mapper.CleaningResultMapper; +import com.datamate.cleaning.interfaces.dto.CleaningResultDto; import lombok.RequiredArgsConstructor; +import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Repository; +import java.util.List; + @Repository @RequiredArgsConstructor public class CleaningResultRepositoryImpl extends CrudRepository @@ -22,9 +28,20 @@ public class CleaningResultRepositoryImpl extends CrudRepository lambdaWrapper = new LambdaQueryWrapper<>(); lambdaWrapper.eq(CleaningResult::getInstanceId, instanceId); - return mapper.selectCount(lambdaWrapper).intValue(); + List cleaningResults = mapper.selectList(lambdaWrapper); + int succeed = Math.toIntExact(cleaningResults.stream() + .filter(result -> + StringUtils.equals(result.getStatus(), CleaningTaskStatusEnum.COMPLETED.getValue())) + .count()); + return new int[] {succeed, cleaningResults.size() - succeed}; + } + + public List findByInstanceId(String instanceId) { + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); + queryWrapper.eq(CleaningResult::getInstanceId, instanceId); + return CleaningResultConverter.INSTANCE.convertEntityToDto(mapper.selectList(queryWrapper)); } } diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/persistence/Impl/OperatorInstanceRepositoryImpl.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/persistence/Impl/OperatorInstanceRepositoryImpl.java index a3b197f..e5fe038 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/persistence/Impl/OperatorInstanceRepositoryImpl.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/persistence/Impl/OperatorInstanceRepositoryImpl.java @@ -7,6 +7,7 @@ import com.datamate.cleaning.interfaces.dto.OperatorInstanceDto; import com.datamate.cleaning.domain.model.entity.OperatorInstance; import com.datamate.cleaning.domain.repository.OperatorInstanceRepository; import com.datamate.cleaning.infrastructure.persistence.mapper.OperatorInstanceMapper; +import com.datamate.operator.interfaces.dto.OperatorDto; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Repository; @@ -37,4 +38,8 @@ public class OperatorInstanceRepositoryImpl extends CrudRepository findOperatorByInstanceId(String instanceId) { + return OperatorInstanceConverter.INSTANCE.fromEntityToDto(mapper.findOperatorByInstanceId(instanceId)); + } } diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/persistence/mapper/OperatorInstanceMapper.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/persistence/mapper/OperatorInstanceMapper.java index 0b0699d..f0e81b2 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/persistence/mapper/OperatorInstanceMapper.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/persistence/mapper/OperatorInstanceMapper.java @@ -2,9 +2,22 @@ package com.datamate.cleaning.infrastructure.persistence.mapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.datamate.cleaning.domain.model.entity.OperatorInstance; +import com.datamate.operator.domain.model.OperatorView; import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Select; + +import java.util.List; @Mapper public interface OperatorInstanceMapper extends BaseMapper { + @Select("SELECT o.operator_id as id, o.operator_name as name, description, version, inputs, outputs, runtime, " + + " settings, created_at, updated_at, " + + "GROUP_CONCAT(category_id ORDER BY created_at DESC SEPARATOR ',') AS categories " + + "FROM t_operator_instance toi LEFT JOIN v_operator o ON toi.operator_id = o.operator_id " + + "WHERE toi.instance_id = #{instanceId} " + + "GROUP BY o.operator_id, o.operator_name, description, version, inputs, outputs, runtime," + + " settings, created_at, updated_at, op_index " + + "ORDER BY toi.op_index") + List findOperatorByInstanceId(String instanceId); } diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/dto/CleaningProcess.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/dto/CleaningProcess.java index 1decd56..0f5c7f3 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/dto/CleaningProcess.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/dto/CleaningProcess.java @@ -16,23 +16,37 @@ import java.math.RoundingMode; public class CleaningProcess { private Float process; + private Float successRate; + private Integer totalFileNum; + private Integer succeedFileNum; + + private Integer failedFileNum; + private Integer finishedFileNum; - public CleaningProcess(int totalFileNum, int finishedFileNum) { + public CleaningProcess(int totalFileNum, int succeedFileNum, int failedFileNum) { this.totalFileNum = totalFileNum; - this.finishedFileNum = finishedFileNum; + this.succeedFileNum = succeedFileNum; + this.failedFileNum = failedFileNum; + this.finishedFileNum = succeedFileNum + failedFileNum; if (totalFileNum == 0) { this.process = 0.0f; } else { this.process = BigDecimal.valueOf(finishedFileNum * 100L) .divide(BigDecimal.valueOf(totalFileNum), 2, RoundingMode.HALF_UP).floatValue(); } + if (finishedFileNum == 0) { + this.successRate = 0f; + } else { + this.successRate = BigDecimal.valueOf(succeedFileNum * 100L) + .divide(BigDecimal.valueOf(finishedFileNum), 2, RoundingMode.HALF_UP).floatValue(); + } } - public static CleaningProcess of(int totalFileNum, int finishedFileNum) { - return new CleaningProcess(totalFileNum, finishedFileNum); + public static CleaningProcess of(int totalFileNum, int succeedFileNum, int failedFileNum) { + return new CleaningProcess(totalFileNum, succeedFileNum, failedFileNum); } } diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/dto/CleaningResultDto.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/dto/CleaningResultDto.java new file mode 100644 index 0000000..151abe0 --- /dev/null +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/dto/CleaningResultDto.java @@ -0,0 +1,30 @@ +package com.datamate.cleaning.interfaces.dto; + +import lombok.Getter; +import lombok.Setter; + +@Getter +@Setter +public class CleaningResultDto { + private String instanceId; + + private String srcFileId; + + private String destFileId; + + private String srcName; + + private String destName; + + private String srcType; + + private String destType; + + private long srcSize; + + private long destSize; + + private String status; + + private String result; +} diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/dto/CleaningTaskLog.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/dto/CleaningTaskLog.java new file mode 100644 index 0000000..b5d45ea --- /dev/null +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/dto/CleaningTaskLog.java @@ -0,0 +1,12 @@ +package com.datamate.cleaning.interfaces.dto; + +import lombok.Getter; +import lombok.Setter; + +@Getter +@Setter +public class CleaningTaskLog { + private String level; + + private String message; +} diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/rest/CleaningTaskController.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/rest/CleaningTaskController.java index 811f53e..bdca1dc 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/rest/CleaningTaskController.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/rest/CleaningTaskController.java @@ -1,7 +1,9 @@ package com.datamate.cleaning.interfaces.rest; import com.datamate.cleaning.application.CleaningTaskService; +import com.datamate.cleaning.interfaces.dto.CleaningResultDto; import com.datamate.cleaning.interfaces.dto.CleaningTaskDto; +import com.datamate.cleaning.interfaces.dto.CleaningTaskLog; import com.datamate.cleaning.interfaces.dto.CreateCleaningTaskRequest; import com.datamate.common.interfaces.PagedResponse; import lombok.RequiredArgsConstructor; @@ -54,4 +56,14 @@ public class CleaningTaskController { cleaningTaskService.deleteTask(taskId); return taskId; } + + @GetMapping("/{taskId}/result") + public List cleaningTasksTaskIdGetResult(@PathVariable("taskId") String taskId) { + return cleaningTaskService.getTaskResults(taskId); + } + + @GetMapping("/{taskId}/log") + public List cleaningTasksTaskIdGetLog(@PathVariable("taskId") String taskId) { + return cleaningTaskService.getTaskLog(taskId); + } } diff --git a/backend/services/operator-market-service/src/main/java/com/datamate/operator/application/OperatorService.java b/backend/services/operator-market-service/src/main/java/com/datamate/operator/application/OperatorService.java index 3442b0c..da0a883 100644 --- a/backend/services/operator-market-service/src/main/java/com/datamate/operator/application/OperatorService.java +++ b/backend/services/operator-market-service/src/main/java/com/datamate/operator/application/OperatorService.java @@ -50,9 +50,7 @@ public class OperatorService { public List getOperators(Integer page, Integer size, List categories, String operatorName, Boolean isStar) { - List filteredOperators = operatorViewRepo.findOperatorsByCriteria(page, size, operatorName, - categories, isStar); - return filteredOperators.stream().map(OperatorConverter.INSTANCE::fromEntityToDto).toList(); + return operatorViewRepo.findOperatorsByCriteria(page, size, operatorName, categories, isStar); } public int getOperatorsCount(List categories, String operatorName, Boolean isStar) { diff --git a/backend/services/operator-market-service/src/main/java/com/datamate/operator/domain/repository/OperatorViewRepository.java b/backend/services/operator-market-service/src/main/java/com/datamate/operator/domain/repository/OperatorViewRepository.java index fbbdf64..a1c0688 100644 --- a/backend/services/operator-market-service/src/main/java/com/datamate/operator/domain/repository/OperatorViewRepository.java +++ b/backend/services/operator-market-service/src/main/java/com/datamate/operator/domain/repository/OperatorViewRepository.java @@ -2,12 +2,13 @@ package com.datamate.operator.domain.repository; import com.baomidou.mybatisplus.extension.repository.IRepository; import com.datamate.operator.domain.model.OperatorView; +import com.datamate.operator.interfaces.dto.OperatorDto; import java.util.List; public interface OperatorViewRepository extends IRepository { - List findOperatorsByCriteria(Integer page, Integer size, String operatorName, - List categories, Boolean isStar); + List findOperatorsByCriteria(Integer page, Integer size, String operatorName, + List categories, Boolean isStar); Integer countOperatorsByCriteria(String operatorName, List categories, Boolean isStar); diff --git a/backend/services/operator-market-service/src/main/java/com/datamate/operator/infrastructure/converter/OperatorConverter.java b/backend/services/operator-market-service/src/main/java/com/datamate/operator/infrastructure/converter/OperatorConverter.java index 791356e..f531ce3 100644 --- a/backend/services/operator-market-service/src/main/java/com/datamate/operator/infrastructure/converter/OperatorConverter.java +++ b/backend/services/operator-market-service/src/main/java/com/datamate/operator/infrastructure/converter/OperatorConverter.java @@ -21,6 +21,8 @@ public interface OperatorConverter { @Mapping(target = "categories", source = "categories", qualifiedByName = "stringToList") OperatorDto fromEntityToDto(OperatorView operator); + List fromEntityViewToDto(List operator); + List fromEntityToDto(List operator); @Named("stringToList") diff --git a/backend/services/operator-market-service/src/main/java/com/datamate/operator/infrastructure/persistence/Impl/OperatorViewRepositoryImpl.java b/backend/services/operator-market-service/src/main/java/com/datamate/operator/infrastructure/persistence/Impl/OperatorViewRepositoryImpl.java index 7ec6a05..1fe41e0 100644 --- a/backend/services/operator-market-service/src/main/java/com/datamate/operator/infrastructure/persistence/Impl/OperatorViewRepositoryImpl.java +++ b/backend/services/operator-market-service/src/main/java/com/datamate/operator/infrastructure/persistence/Impl/OperatorViewRepositoryImpl.java @@ -7,7 +7,9 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.repository.CrudRepository; import com.datamate.operator.domain.model.OperatorView; import com.datamate.operator.domain.repository.OperatorViewRepository; +import com.datamate.operator.infrastructure.converter.OperatorConverter; import com.datamate.operator.infrastructure.persistence.mapper.OperatorViewMapper; +import com.datamate.operator.interfaces.dto.OperatorDto; import io.micrometer.common.util.StringUtils; import lombok.RequiredArgsConstructor; import org.apache.commons.collections4.CollectionUtils; @@ -21,20 +23,23 @@ public class OperatorViewRepositoryImpl extends CrudRepository findOperatorsByCriteria(Integer page, Integer size, String operatorName, - List categories, Boolean isStar) { + public List findOperatorsByCriteria(Integer page, Integer size, String operatorName, + List categories, Boolean isStar) { QueryWrapper queryWrapper = Wrappers.query(); queryWrapper.in(CollectionUtils.isNotEmpty(categories), "category_id", categories) .like(StringUtils.isNotBlank(operatorName), "operator_name", operatorName) .eq(isStar != null, "is_star", isStar) .groupBy("operator_id") .orderByDesc("created_at"); - Page queryPage = null; + Page queryPage; if (size != null && page != null) { queryPage = new Page<>(page + 1, size); + } else { + queryPage = new Page<>(1, -1); } IPage operators = mapper.findOperatorsByCriteria(queryPage, queryWrapper); - return operators.getRecords(); + + return OperatorConverter.INSTANCE.fromEntityViewToDto(operators.getRecords()); } @Override diff --git a/frontend/src/pages/DataCleansing/Create/CreateTempate.tsx b/frontend/src/pages/DataCleansing/Create/CreateTemplate.tsx similarity index 73% rename from frontend/src/pages/DataCleansing/Create/CreateTempate.tsx rename to frontend/src/pages/DataCleansing/Create/CreateTemplate.tsx index 2f7876e..bb67588 100644 --- a/frontend/src/pages/DataCleansing/Create/CreateTempate.tsx +++ b/frontend/src/pages/DataCleansing/Create/CreateTemplate.tsx @@ -1,13 +1,18 @@ -import { useState } from "react"; -import { Card, Button, Steps, Form, Divider } from "antd"; -import { Link, useNavigate } from "react-router"; +import {useEffect, useState} from "react"; +import {Button, Steps, Form, message} from "antd"; +import {Link, useNavigate, useParams} from "react-router"; import { ArrowLeft } from "lucide-react"; -import { createCleaningTemplateUsingPost } from "../cleansing.api"; +import { + createCleaningTemplateUsingPost, + queryCleaningTemplateByIdUsingGet, + updateCleaningTemplateByIdUsingPut +} from "../cleansing.api"; import CleansingTemplateStepOne from "./components/CreateTemplateStepOne"; import { useCreateStepTwo } from "./hooks/useCreateStepTwo"; export default function CleansingTemplateCreate() { + const { id = "" } = useParams() const navigate = useNavigate(); const [form] = Form.useForm(); const [templateConfig, setTemplateConfig] = useState({ @@ -15,6 +20,21 @@ export default function CleansingTemplateCreate() { description: "", }); + const fetchTemplateDetail = async () => { + if (!id) return; + try { + const { data } = await queryCleaningTemplateByIdUsingGet(id); + setTemplateConfig(data); + } catch (error) { + message.error("获取任务详情失败"); + navigate("/data/cleansing"); + } + }; + + useEffect(() => { + fetchTemplateDetail() + }, [id]); + const handleSave = async () => { const template = { ...templateConfig, @@ -27,7 +47,8 @@ export default function CleansingTemplateCreate() { })), }; - await createCleaningTemplateUsingPost(template); + !id && await createCleaningTemplateUsingPost(template) && message.success("模板创建成功"); + id && await updateCleaningTemplateByIdUsingPut(id, template) && message.success("模板更新成功"); navigate("/data/cleansing?view=template"); }; @@ -79,7 +100,7 @@ export default function CleansingTemplateCreate() { -

创建清洗模板

+

{id ? '更新清洗模板' : '创建清洗模板'}

- 创建模板 + {id ? '更新模板' : '创建模板'} ) : (
- {showPoppular && operator.isStar && ( - - 热门 - - )} { - e.stopPropagation(); - toggleFavorite(operator.id); - }} + onClick={() => handleStar(operator, toggleFavorite)} > {favorites.has(operator.id) ? ( @@ -156,10 +148,9 @@ const OperatorLibrary: React.FC = ({ // 过滤算子 const filteredOperators = useMemo(() => { - const filtered = Object.values(groupedOperators).flatMap( + return Object.values(groupedOperators).flatMap( (category) => category.operators ); - return filtered; }, [groupedOperators]); // 收藏切换 @@ -173,6 +164,18 @@ const OperatorLibrary: React.FC = ({ setFavorites(newFavorites); }; + const fetchFavorite = async () => { + const newFavorites = new Set(favorites); + operatorList.forEach(item => { + item.isStar && newFavorites.add(item.id); + }); + setFavorites(newFavorites); + } + + useEffect(() => { + fetchFavorite() + }, [operatorList]); + // 全选分类算子 const handleSelectAll = (operators: OperatorI[]) => { const newSelected = [...selectedOperators]; @@ -257,7 +260,6 @@ const OperatorLibrary: React.FC = ({ } > void; removeOperator: (id: string) => void; setSelectedOperators: (operators: OperatorI[]) => void; @@ -33,6 +34,7 @@ const OperatorFlow: React.FC = ({ configOperator, templates, currentTemplate, + categoryOptions, setSelectedOperators, setConfigOperator, removeOperator, @@ -47,6 +49,16 @@ const OperatorFlow: React.FC = ({ }) => { const [editingIndex, setEditingIndex] = useState(null); + const categoryMap = useMemo(() => { + const map: { [key: string]: CategoryI } = {}; + categoryOptions.forEach((cat: any) => { + map[cat.id] = { + ...cat, + }; + }); + return map; + }, [categoryOptions]); + // 添加编号修改处理函数 const handleIndexChange = (operatorId: string, newIndex: string) => { const index = Number.parseInt(newIndex); @@ -167,8 +179,9 @@ const OperatorFlow: React.FC = ({ {operator.name} - {/* 分类标签 */} - 分类 + {operator?.categories?.map((categoryId) => { + return {categoryMap[categoryId].name} + })} {/* 参数状态指示 */} {Object.values(operator.configs).some( (param: any) => @@ -192,7 +205,7 @@ const OperatorFlow: React.FC = ({ ))} {selectedOperators.length === 0 && (
- +
开始构建您的算子流程
从左侧算子库拖拽算子到此处,或点击算子添加 diff --git a/frontend/src/pages/DataCleansing/Create/hooks/useCreateStepTwo.tsx b/frontend/src/pages/DataCleansing/Create/hooks/useCreateStepTwo.tsx index 63354bc..cae4d6b 100644 --- a/frontend/src/pages/DataCleansing/Create/hooks/useCreateStepTwo.tsx +++ b/frontend/src/pages/DataCleansing/Create/hooks/useCreateStepTwo.tsx @@ -55,6 +55,7 @@ export function useCreateStepTwo() { configOperator={configOperator} templates={templates} currentTemplate={currentTemplate} + categoryOptions={categoryOptions} setSelectedOperators={setSelectedOperators} setConfigOperator={setConfigOperator} setCurrentTemplate={setCurrentTemplate} diff --git a/frontend/src/pages/DataCleansing/Create/hooks/useOperatorOperations.ts b/frontend/src/pages/DataCleansing/Create/hooks/useOperatorOperations.ts index 45220df..ba257a7 100644 --- a/frontend/src/pages/DataCleansing/Create/hooks/useOperatorOperations.ts +++ b/frontend/src/pages/DataCleansing/Create/hooks/useOperatorOperations.ts @@ -1,13 +1,15 @@ import { useEffect, useState } from "react"; import { OperatorI } from "@/pages/OperatorMarket/operator.model"; import { CleansingTemplate } from "../../cleansing.model"; -import { queryCleaningTemplatesUsingGet } from "../../cleansing.api"; +import {queryCleaningTemplateByIdUsingGet, queryCleaningTemplatesUsingGet} from "../../cleansing.api"; import { queryCategoryTreeUsingGet, queryOperatorsUsingPost, } from "@/pages/OperatorMarket/operator.api"; +import {useParams} from "react-router"; export function useOperatorOperations() { + const { id = "" } = useParams(); const [currentStep, setCurrentStep] = useState(1); const [operators, setOperators] = useState([]); @@ -21,7 +23,7 @@ export function useOperatorOperations() { // 将后端返回的算子数据映射为前端需要的格式 const mapOperator = (op: OperatorI) => { const configs = - op.settings && typeof op.settings === "string" + op.settings ? JSON.parse(op.settings) : {}; const defaultParams: Record = {}; @@ -64,14 +66,26 @@ export function useOperatorOperations() { }; const initTemplates = async () => { - const { data } = await queryCleaningTemplatesUsingGet(); - const newTemplates = - data.content?.map?.((item) => ({ - ...item, - label: item.name, - value: item.id, - })) || []; - setTemplates(newTemplates); + if (id) { + const { data } = await queryCleaningTemplateByIdUsingGet(id); + const template = { + ...data, + label: data.name, + value: data.id, + } + setTemplates([template]) + setCurrentTemplate(template) + } else { + const { data } = await queryCleaningTemplatesUsingGet(); + const newTemplates = + data.content?.map?.((item) => ({ + ...item, + label: item.name, + value: item.id, + })) || []; + setTemplates(newTemplates); + setCurrentTemplate(newTemplates?.[0]) + } }; useEffect(() => { diff --git a/frontend/src/pages/DataCleansing/Detail/TaskDetail.tsx b/frontend/src/pages/DataCleansing/Detail/TaskDetail.tsx index ea5a5b2..745b701 100644 --- a/frontend/src/pages/DataCleansing/Detail/TaskDetail.tsx +++ b/frontend/src/pages/DataCleansing/Detail/TaskDetail.tsx @@ -1,29 +1,30 @@ import { useEffect, useState } from "react"; -import { Card, Breadcrumb, App } from "antd"; +import {Breadcrumb, App, Tabs} from "antd"; import { Play, Pause, Clock, CheckCircle, AlertCircle, - Database, Trash2, - Activity, + Activity, LayoutList, } from "lucide-react"; import DetailHeader from "@/components/DetailHeader"; import { Link, useNavigate, useParams } from "react-router"; import { deleteCleaningTaskByIdUsingDelete, executeCleaningTaskUsingPost, - queryCleaningTaskByIdUsingGet, + queryCleaningTaskByIdUsingGet, queryCleaningTaskLogByIdUsingGet, queryCleaningTaskResultByIdUsingGet, stopCleaningTaskUsingPost, } from "../cleansing.api"; -import { TaskStatusMap } from "../cleansing.const"; -import { TaskStatus } from "@/pages/DataCleansing/cleansing.model"; +import {mapTask, TaskStatusMap} from "../cleansing.const"; +import {CleansingResult, TaskStatus} from "@/pages/DataCleansing/cleansing.model"; import BasicInfo from "./components/BasicInfo"; import OperatorTable from "./components/OperatorTable"; import FileTable from "./components/FileTable"; import LogsTable from "./components/LogsTable"; +import {formatExecutionDuration} from "@/utils/unit.ts"; +import {ReloadOutlined} from "@ant-design/icons"; // 任务详情页面组件 export default function CleansingTaskDetail() { @@ -35,7 +36,7 @@ export default function CleansingTaskDetail() { if (!id) return; try { const { data } = await queryCleaningTaskByIdUsingGet(id); - setTask(data); + setTask(mapTask(data)); } catch (error) { message.error("获取任务详情失败"); navigate("/data/cleansing"); @@ -60,6 +61,38 @@ export default function CleansingTaskDetail() { navigate("/data/cleansing"); }; + const [result, setResult] = useState(); + + const fetchTaskResult = async () => { + if (!id) return; + try { + const { data } = await queryCleaningTaskResultByIdUsingGet(id); + setResult(data); + } catch (error) { + message.error("获取清洗结果失败"); + navigate("/data/cleansing/task-detail/" + id); + } + }; + + const [taskLog, setTaskLog] = useState(); + + const fetchTaskLog = async () => { + if (!id) return; + try { + const { data } = await queryCleaningTaskLogByIdUsingGet(id); + setTaskLog(data); + } catch (error) { + message.error("获取清洗日志失败"); + navigate("/data/cleansing/task-detail/" + id); + } + }; + + const handleRefresh = async () => { + fetchTaskDetail(); + {activeTab === "files" && await fetchTaskResult()} + {activeTab === "logs" && await fetchTaskLog()} + }; + useEffect(() => { fetchTaskDetail(); }, [id]); @@ -69,9 +102,9 @@ export default function CleansingTaskDetail() { const headerData = { ...task, - icon: , + icon: , status: TaskStatusMap[task?.status], - createdAt: task?.startTime, + createdAt: task?.createdAt, lastUpdated: task?.updatedAt, }; @@ -79,22 +112,24 @@ export default function CleansingTaskDetail() { { icon: , label: "总耗时", - value: task?.duration || "--", + value: formatExecutionDuration(task?.startedAt, task?.finishedAt) || "--", }, { icon: , label: "成功文件", - value: task?.successFiles || "--", + value: task?.progress?.succeedFileNum || "0", }, { icon: , label: "失败文件", - value: task?.failedFiles || "--", + value: (task?.status.value === TaskStatus.RUNNING || task?.status.value === TaskStatus.PENDING) ? + task?.progress.failedFileNum : + task?.progress?.totalFileNum - task?.progress.succeedFileNum, }, { icon: , label: "成功率", - value: `${task?.progress}%`, + value: task?.progress?.successRate ? task?.progress?.successRate + "%" : "--", }, ]; @@ -109,7 +144,7 @@ export default function CleansingTaskDetail() { }, ] : []), - ...(task?.status === TaskStatus.PENDING + ...([TaskStatus.PENDING, TaskStatus.STOPPED, TaskStatus.FAILED].includes(task?.status?.value) ? [ { key: "start", @@ -119,6 +154,12 @@ export default function CleansingTaskDetail() { }, ] : []), + { + key: "refresh", + label: "更新任务", + icon: , + onClick: handleRefresh, + }, { key: "delete", label: "删除任务", @@ -131,20 +172,20 @@ export default function CleansingTaskDetail() { const tabList = [ { key: "basic", - tab: "基本信息", - children: , + label: "基本信息", }, { key: "operators", - tab: "处理算子", - children: , + label: "处理算子", }, { key: "files", - tab: "处理文件", - children: , + label: "处理文件", + }, + { + key: "logs", + label: "运行日志", }, - { key: "logs", tab: "运行日志", children: }, ]; const breadItems = [ @@ -157,7 +198,7 @@ export default function CleansingTaskDetail() { ]; return ( -
+ <>
- -
+
+ +
+ {activeTab === "basic" && ( + + )} + {activeTab === "operators" && } + {activeTab === "files" && } + {activeTab === "logs" && } +
+
+ ); } diff --git a/frontend/src/pages/DataCleansing/Detail/TemplateDetail.tsx b/frontend/src/pages/DataCleansing/Detail/TemplateDetail.tsx new file mode 100644 index 0000000..ae6ae27 --- /dev/null +++ b/frontend/src/pages/DataCleansing/Detail/TemplateDetail.tsx @@ -0,0 +1,122 @@ +import { useEffect, useState } from "react"; +import {Breadcrumb, App, Tabs} from "antd"; +import { + Trash2, + LayoutList, +} from "lucide-react"; +import DetailHeader from "@/components/DetailHeader"; +import { Link, useNavigate, useParams } from "react-router"; +import { + deleteCleaningTemplateByIdUsingDelete, + queryCleaningTemplateByIdUsingGet, +} from "../cleansing.api"; +import {mapTemplate} from "../cleansing.const"; +import OperatorTable from "./components/OperatorTable"; +import {EditOutlined, ReloadOutlined, NumberOutlined} from "@ant-design/icons"; + +// 任务详情页面组件 +export default function CleansingTemplateDetail() { + const { id = "" } = useParams(); // 获取动态路由参数 + const { message } = App.useApp(); + const navigate = useNavigate(); + const [template, setTemplate] = useState(); + + const fetchTemplateDetail = async () => { + if (!id) return; + try { + const { data } = await queryCleaningTemplateByIdUsingGet(id); + setTemplate(mapTemplate(data)); + } catch (error) { + message.error("获取任务详情失败"); + navigate("/data/cleansing"); + } + }; + + const deleteTemplate = async () => { + await deleteCleaningTemplateByIdUsingDelete(id); + message.success("模板已删除"); + navigate("/data/cleansing"); + }; + + const handleRefresh = async () => { + fetchTemplateDetail(); + }; + + useEffect(() => { + fetchTemplateDetail(); + }, [id]); + + const [activeTab, setActiveTab] = useState("operators"); + + const headerData = { + ...template, + icon: , + createdAt: template?.createdAt, + lastUpdated: template?.updatedAt, + }; + + const statistics = [ + { + icon: , + label: "算子数量", + value: template?.instance?.length || 0, + }, + ]; + + const operations = [ + { + key: "update", + label: "更新任务", + icon: , + onClick: () => navigate(`/data/cleansing/update-template/${id}`), + }, + { + key: "refresh", + label: "更新任务", + icon: , + onClick: handleRefresh, + }, + { + key: "delete", + label: "删除任务", + icon: , + danger: true, + onClick: deleteTemplate, + }, + ]; + + const tabList = [ + { + key: "operators", + label: "处理算子", + }, + ]; + + const breadItems = [ + { + title: 数据清洗, + }, + { + title: "模板详情", + }, + ]; + + return ( + <> + +
+ +
+
+ +
+ +
+
+ + ); +} diff --git a/frontend/src/pages/DataCleansing/Detail/components/BasicInfo.tsx b/frontend/src/pages/DataCleansing/Detail/components/BasicInfo.tsx index c605f49..f658271 100644 --- a/frontend/src/pages/DataCleansing/Detail/components/BasicInfo.tsx +++ b/frontend/src/pages/DataCleansing/Detail/components/BasicInfo.tsx @@ -1,8 +1,8 @@ -import type { CleansingTask } from "@/pages/DataCleansing/cleansing.model"; -import { OperatorI } from "@/pages/OperatorMarket/operator.model"; -import { Button, Card, Descriptions, Progress, Tag } from "antd"; +import {CleansingTask, TaskStatus} from "@/pages/DataCleansing/cleansing.model"; +import { Button, Card, Descriptions, Progress } from "antd"; import { Activity, AlertCircle, CheckCircle, Clock } from "lucide-react"; import { useNavigate } from "react-router"; +import {formatExecutionDuration} from "@/utils/unit.ts"; export default function BasicInfo({ task }: { task: CleansingTask }) { const navigate = useNavigate(); @@ -11,7 +11,7 @@ export default function BasicInfo({ task }: { task: CleansingTask }) { { key: "id", label: "任务ID", - children: #{task?.id}, + children: {task?.id}, }, { key: "name", label: "任务名称", children: task?.name }, { @@ -19,6 +19,7 @@ export default function BasicInfo({ task }: { task: CleansingTask }) { label: "源数据集", children: ( ), }, - { key: "template", label: "使用模板", children: task?.template }, { key: "startTime", label: "开始时间", children: task?.startedAt }, - { key: "estimatedTime", label: "预计用时", children: task?.estimatedTime }, { key: "description", label: "任务描述", children: ( - {task?.description || "暂无描述"} - ), - span: 2, - }, - { - key: "rules", - label: "处理算子", - children: ( -
- {task?.instance?.map?.((op: OperatorI) => ( - {op.name} - ))} -
+ {task?.description || "--"} ), span: 2, }, @@ -77,28 +65,30 @@ export default function BasicInfo({ task }: { task: CleansingTask }) {
- {task?.duration || "--"} + {formatExecutionDuration(task?.startedAt, task?.finishedAt) || "--"}
总耗时
- {task?.successFiles || "--"} + {task?.progress?.succeedFileNum || "0"}
成功文件
- {task?.failedFiles || "--"} + {(task?.status.value === TaskStatus.RUNNING || task?.status.value === TaskStatus.PENDING) ? + task?.progress.failedFileNum : + task?.progress?.totalFileNum - task?.progress.succeedFileNum}
失败文件
- {task?.progress || "--"} + {task?.progress?.successRate ? task?.progress?.successRate + "%" : "--"}
成功率
@@ -120,25 +110,22 @@ export default function BasicInfo({ task }: { task: CleansingTask }) { {/* 处理进度 */}

处理进度

- +
- 已完成: {task?.processedFiles || "--"} + 已完成: {task?.progress?.succeedFileNum || "0"}
- 处理中: {task?.processingFiles || "--"} -
-
- - - 待处理: {task?.totalFiles - task?.processedFiles || "--"} - + 处理中: {(task?.status.value === TaskStatus.RUNNING || task?.status.value === TaskStatus.PENDING) ? + task?.progress?.totalFileNum - task?.progress.succeedFileNum : 0}
- 失败: {task?.failedFiles || "--"} + 失败: {(task?.status.value === TaskStatus.RUNNING || task?.status.value === TaskStatus.PENDING) ? + task?.progress.failedFileNum : + task?.progress?.totalFileNum - task?.progress.succeedFileNum}
diff --git a/frontend/src/pages/DataCleansing/Detail/components/FileTable.tsx b/frontend/src/pages/DataCleansing/Detail/components/FileTable.tsx index d1554dd..d078091 100644 --- a/frontend/src/pages/DataCleansing/Detail/components/FileTable.tsx +++ b/frontend/src/pages/DataCleansing/Detail/components/FileTable.tsx @@ -1,70 +1,30 @@ -import { Button, Modal, Table, Badge, Input } from "antd"; -import { Download, FileText } from "lucide-react"; -import { useState } from "react"; +import {Button, Modal, Table, Badge, Input} from "antd"; +import { Download } from "lucide-react"; +import {useEffect, useState} from "react"; +import {useParams} from "react-router"; +import {TaskStatus} from "@/pages/DataCleansing/cleansing.model.ts"; +import {TaskStatusMap} from "@/pages/DataCleansing/cleansing.const.tsx"; // 模拟文件列表数据 -const fileList = [ - { - id: 1, - fileName: "lung_cancer_001.svs", - originalSize: "15.2MB", - processedSize: "8.5MB", - status: "已完成", - duration: "2分15秒", - processedAt: "2024-01-20 09:32:40", - }, - { - id: 2, - fileName: "lung_cancer_002.svs", - originalSize: "18.7MB", - processedSize: "10.2MB", - status: "已完成", - duration: "2分38秒", - processedAt: "2024-01-20 09:35:18", - }, - { - id: 3, - fileName: "lung_cancer_003.svs", - originalSize: "12.3MB", - processedSize: "6.8MB", - status: "已完成", - duration: "1分52秒", - processedAt: "2024-01-20 09:37:10", - }, - { - id: 4, - fileName: "lung_cancer_004.svs", - originalSize: "20.1MB", - processedSize: "-", - status: "失败", - duration: "0分45秒", - processedAt: "2024-01-20 09:38:55", - }, - { - id: 5, - fileName: "lung_cancer_005.svs", - originalSize: "16.8MB", - processedSize: "9.3MB", - status: "已完成", - duration: "2分22秒", - processedAt: "2024-01-20 09:41:17", - }, -]; - -export default function FileTable() { +export default function FileTable({result, fetchTaskResult}) { + const { id = "" } = useParams(); const [showFileCompareDialog, setShowFileCompareDialog] = useState(false); - const [showFileLogDialog, setShowFileLogDialog] = useState(false); const [selectedFile, setSelectedFile] = useState(null); - const [selectedFileIds, setSelectedFileIds] = useState([]); + const [selectedFileIds, setSelectedFileIds] = useState([]); + + useEffect(() => { + fetchTaskResult(); + }, [id]); + const handleSelectAllFiles = (checked: boolean) => { if (checked) { - setSelectedFileIds(fileList.map((file) => file.id)); + setSelectedFileIds(result.map((file) => file.instanceId)); } else { setSelectedFileIds([]); } }; - const handleSelectFile = (fileId: number, checked: boolean) => { + const handleSelectFile = (fileId: string, checked: boolean) => { if (checked) { setSelectedFileIds([...selectedFileIds, fileId]); } else { @@ -79,116 +39,16 @@ export default function FileTable() { // 实际下载逻辑 }; - const handleBatchDeleteFiles = () => { - // 实际删除逻辑 - setSelectedFileIds([]); - }; - const handleViewFileLog = (file: any) => { - setSelectedFile(file); - setShowFileLogDialog(true); - }; + function formatFileSize(bytes: number, decimals: number = 2): string { + if (bytes === 0) return '0 Bytes'; - // 模拟单个文件的处理日志 - const getFileProcessLog = (fileName: string) => [ - { - time: "09:30:18", - step: "开始处理", - operator: "格式转换", - status: "INFO", - message: `开始处理文件: ${fileName}`, - }, - { - time: "09:30:19", - step: "文件验证", - operator: "格式转换", - status: "INFO", - message: "验证文件格式和完整性", - }, - { - time: "09:30:20", - step: "格式解析", - operator: "格式转换", - status: "INFO", - message: "解析SVS格式文件", - }, - { - time: "09:30:25", - step: "格式转换", - operator: "格式转换", - status: "SUCCESS", - message: "成功转换为JPEG格式", - }, - { - time: "09:30:26", - step: "噪声检测", - operator: "噪声去除", - status: "INFO", - message: "检测图像噪声水平", - }, - { - time: "09:30:28", - step: "噪声去除", - operator: "噪声去除", - status: "INFO", - message: "应用高斯滤波去除噪声", - }, - { - time: "09:30:31", - step: "噪声去除完成", - operator: "噪声去除", - status: "SUCCESS", - message: "噪声去除处理完成", - }, - { - time: "09:30:32", - step: "尺寸检测", - operator: "尺寸标准化", - status: "INFO", - message: "检测当前图像尺寸: 2048x1536", - }, - { - time: "09:30:33", - step: "尺寸调整", - operator: "尺寸标准化", - status: "INFO", - message: "调整图像尺寸至512x512", - }, - { - time: "09:30:35", - step: "尺寸标准化完成", - operator: "尺寸标准化", - status: "SUCCESS", - message: "图像尺寸标准化完成", - }, - { - time: "09:30:36", - step: "质量检查", - operator: "质量检查", - status: "INFO", - message: "检查图像质量指标", - }, - { - time: "09:30:38", - step: "分辨率检查", - operator: "质量检查", - status: "SUCCESS", - message: "分辨率符合要求", - }, - { - time: "09:30:39", - step: "清晰度检查", - operator: "质量检查", - status: "SUCCESS", - message: "图像清晰度良好", - }, - { - time: "09:30:40", - step: "处理完成", - operator: "质量检查", - status: "SUCCESS", - message: `文件 ${fileName} 处理完成`, - }, - ]; + const k = 1024; + const sizes = ['Bytes', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']; + + const i = Math.floor(Math.log(bytes) / Math.log(k)); + + return parseFloat((bytes / Math.pow(k, i)).toFixed(decimals)) + ' ' + sizes[i]; + } const fileColumns = [ { @@ -196,7 +56,7 @@ export default function FileTable() { 0 + selectedFileIds.length === result?.length && result?.length > 0 } onChange={(e) => handleSelectAllFiles(e.target.checked)} className="w-4 h-4" @@ -205,7 +65,7 @@ export default function FileTable() { dataIndex: "select", key: "select", width: 50, - render: (text: string, record: any) => ( + render: (_text: string, record: any) => ( ), onFilter: (value: string, record: any) => - record.fileName.toLowerCase().includes(value.toLowerCase()), + record.srcName.toLowerCase().includes(value.toLowerCase()), + render: (text: string) => ( + {text?.replace(/\.[^/.]+$/, "")} + ), + }, + { + title: "文件类型", + dataIndex: "srcType", + key: "srcType", + filterDropdown: ({ + setSelectedKeys, + selectedKeys, + confirm, + clearFilters, + }: any) => ( +
+ + setSelectedKeys(e.target.value ? [e.target.value] : []) + } + onPressEnter={() => confirm()} + className="mb-2" + /> +
+ + +
+
+ ), + onFilter: (value: string, record: any) => + record.srcType.toLowerCase().includes(value.toLowerCase()), + render: (text: string) => ( + {text} + ), + }, + { + title: "清洗后文件类型", + dataIndex: "destType", + key: "destType", + filterDropdown: ({ + setSelectedKeys, + selectedKeys, + confirm, + clearFilters, + }: any) => ( +
+ + setSelectedKeys(e.target.value ? [e.target.value] : []) + } + onPressEnter={() => confirm()} + className="mb-2" + /> +
+ + +
+
+ ), + onFilter: (value: string, record: any) => + record.destType.toLowerCase().includes(value.toLowerCase()), render: (text: string) => ( {text} ), }, { title: "清洗前大小", - dataIndex: "originalSize", - key: "originalSize", + dataIndex: "srcSize", + key: "srcSize", sorter: (a: any, b: any) => { const getSizeInBytes = (size: string) => { if (!size || size === "-") return 0; @@ -265,11 +197,14 @@ export default function FileTable() { }; return getSizeInBytes(a.originalSize) - getSizeInBytes(b.originalSize); }, + render: (number: number) => ( + {formatFileSize(number)} + ), }, { title: "清洗后大小", - dataIndex: "processedSize", - key: "processedSize", + dataIndex: "destSize", + key: "destSize", sorter: (a: any, b: any) => { const getSizeInBytes = (size: string) => { if (!size || size === "-") return 0; @@ -283,6 +218,9 @@ export default function FileTable() { getSizeInBytes(a.processedSize) - getSizeInBytes(b.processedSize) ); }, + render: (number: number) => ( + {formatFileSize(number)} + ), }, { title: "状态", @@ -297,43 +235,22 @@ export default function FileTable() { render: (status: string) => ( ), }, - { - title: "执行耗时", - dataIndex: "duration", - key: "duration", - sorter: (a: any, b: any) => { - const getTimeInSeconds = (duration: string) => { - const parts = duration.split(/[分秒]/); - const minutes = Number.parseInt(parts[0]) || 0; - const seconds = Number.parseInt(parts[1]) || 0; - return minutes * 60 + seconds; - }; - return getTimeInSeconds(a.duration) - getTimeInSeconds(b.duration); - }, - }, { title: "操作", key: "action", - render: (text: string, record: any) => ( + render: (_text: string, record: any) => (
- - {record.status === "已完成" && ( + {record.status === "COMPLETED" && (
- 文件格式: SVS -
-
- 分辨率: 2048x1536 -
-
- 色彩空间: RGB -
-
- 压缩方式: 无压缩 + 文件格式: {selectedFile?.srcType}
@@ -460,22 +327,13 @@ export default function FileTable() {
处理后文件预览
- 大小: {selectedFile?.processedSize} + 大小: {formatFileSize(selectedFile?.destSize)}
- 文件格式: JPEG -
-
- 分辨率: 512x512 -
-
- 色彩空间: RGB -
-
- 压缩方式: JPEG压缩 + 文件格式: {selectedFile?.destType}
@@ -485,15 +343,7 @@ export default function FileTable() {
文件大小优化
-
减少了 44.1%
-
-
-
处理时间
-
{selectedFile?.duration}
-
-
-
质量评分
-
优秀 (9.2/10)
+
减少了 {(100 * (selectedFile?.srcSize - selectedFile?.destSize) / selectedFile?.srcSize).toFixed(2)}%
diff --git a/frontend/src/pages/DataCleansing/Detail/components/LogsTable.tsx b/frontend/src/pages/DataCleansing/Detail/components/LogsTable.tsx index 8b9baa2..4e9376a 100644 --- a/frontend/src/pages/DataCleansing/Detail/components/LogsTable.tsx +++ b/frontend/src/pages/DataCleansing/Detail/components/LogsTable.tsx @@ -1,110 +1,43 @@ -export default function LogsTable({ task }: { task: any }) { - // 模拟运行日志 - const runLogs = [ - { - time: "09:30:15", - level: "INFO", - message: "开始执行数据清洗任务: 肺癌WSI图像清洗任务", - }, - { - time: "09:30:16", - level: "INFO", - message: "加载源数据集: 肺癌WSI病理图像数据集 (1250 文件)", - }, - { time: "09:30:17", level: "INFO", message: "初始化算子: 格式转换" }, - { - time: "09:30:18", - level: "INFO", - message: "开始处理文件: lung_cancer_001.svs", - }, - { - time: "09:30:25", - level: "SUCCESS", - message: "文件处理成功: lung_cancer_001.svs -> lung_cancer_001.jpg", - }, - { - time: "09:30:26", - level: "INFO", - message: "开始处理文件: lung_cancer_002.svs", - }, - { - time: "09:30:33", - level: "SUCCESS", - message: "文件处理成功: lung_cancer_002.svs -> lung_cancer_002.jpg", - }, - { - time: "09:58:42", - level: "INFO", - message: "格式转换完成,成功处理 1250/1250 文件", - }, - { time: "09:58:43", level: "INFO", message: "初始化算子: 噪声去除" }, - { - time: "09:58:44", - level: "INFO", - message: "开始处理文件: lung_cancer_001.jpg", - }, - { - time: "09:58:51", - level: "SUCCESS", - message: "噪声去除成功: lung_cancer_001.jpg", - }, - { - time: "10:15:23", - level: "WARNING", - message: "文件质量较低,跳过处理: lung_cancer_156.jpg", - }, - { - time: "10:35:18", - level: "INFO", - message: "噪声去除完成,成功处理 1228/1250 文件", - }, - { time: "10:35:19", level: "INFO", message: "初始化算子: 尺寸标准化" }, - { - time: "11:12:05", - level: "INFO", - message: "尺寸标准化完成,成功处理 1222/1228 文件", - }, - { time: "11:12:06", level: "INFO", message: "初始化算子: 质量检查" }, - { - time: "11:25:33", - level: "ERROR", - message: "质量检查失败: lung_cancer_089.jpg - 分辨率过低", - }, - { - time: "11:45:32", - level: "INFO", - message: "质量检查完成,成功处理 1198/1222 文件", - }, - { - time: "11:45:33", - level: "SUCCESS", - message: "数据清洗任务完成!总成功率: 95.8%", - }, - ]; +import {useEffect} from "react"; +import {useParams} from "react-router"; +import {FileClock} from "lucide-react"; - return ( -
-
- {runLogs?.map?.((log, index) => ( -
- {log.time} - - [{log.level}] - - {log.message} -
- ))} +export default function LogsTable({taskLog, fetchTaskLog} : {taskLog: any[], fetchTaskLog: () => Promise}) { + const { id = "" } = useParams(); + + useEffect(() => { + fetchTaskLog(); + }, [id]); + + return taskLog?.length > 0 ? ( + <> +
+
+ {taskLog?.map?.((log, index) => ( +
+ + [{log.level}] + + {log.message} +
+ ))} +
+ + ) : ( +
+ +

+ 当前任务无可用日志 +

); } diff --git a/frontend/src/pages/DataCleansing/Detail/components/OperatorTable.tsx b/frontend/src/pages/DataCleansing/Detail/components/OperatorTable.tsx index d998bc7..dc556bb 100644 --- a/frontend/src/pages/DataCleansing/Detail/components/OperatorTable.tsx +++ b/frontend/src/pages/DataCleansing/Detail/components/OperatorTable.tsx @@ -1,103 +1,25 @@ -import { Button, Input, Table } from "antd"; +import {Steps, Typography} from "antd"; +import {useNavigate} from "react-router"; -const operators = [ - { - name: "格式转换", - startTime: "09:30:15", - endTime: "09:58:42", - duration: "28分27秒", - status: "成功", - processedFiles: 1250, - successRate: 100, - }, - { - name: "噪声去除", - startTime: "09:58:42", - endTime: "10:35:18", - duration: "36分36秒", - status: "成功", - processedFiles: 1250, - successRate: 98.2, - }, - { - name: "尺寸标准化", - startTime: "10:35:18", - endTime: "11:12:05", - duration: "36分47秒", - status: "成功", - processedFiles: 1228, - successRate: 99.5, - }, - { - name: "质量检查", - startTime: "11:12:05", - endTime: "11:45:32", - duration: "33分27秒", - status: "成功", - processedFiles: 1222, - successRate: 97.8, - }, -]; export default function OperatorTable({ task }: { task: any }) { - const operatorColumns = [ - { - title: "算子名称", - dataIndex: "name", - key: "name", - fixed: "left", - width: 200, - filterDropdown: ({ - setSelectedKeys, - selectedKeys, - confirm, - clearFilters, - }: any) => ( -
- - setSelectedKeys(e.target.value ? [e.target.value] : []) - } - onPressEnter={() => confirm()} - className="mb-2" - /> -
- - -
-
- ), - onFilter: (value: string, record: any) => - record.name.toLowerCase().includes(value.toLowerCase()), - }, - { - title: "版本", - dataIndex: "version", - key: "version", - }, - { - title: "创建时间", - dataIndex: "createdAt", - key: "createdAt", - }, - { - title: "更新时间", - dataIndex: "updatedAt", - key: "updatedAt", - }, - ]; + const navigate = useNavigate(); - return ( - + return task?.instance?.length > 0 && ( + <> + ({ + title: navigate(`/data/operator-market/plugin-detail/${item?.id}`)} + > + {item?.name} + , + description: item?.description, + status: "finish" + }))} + className="overflow-auto" + /> + ); } diff --git a/frontend/src/pages/DataCleansing/Home/components/TaskList.tsx b/frontend/src/pages/DataCleansing/Home/components/TaskList.tsx index ef446a2..a0d4fae 100644 --- a/frontend/src/pages/DataCleansing/Home/components/TaskList.tsx +++ b/frontend/src/pages/DataCleansing/Home/components/TaskList.tsx @@ -43,10 +43,6 @@ export default function TaskList() { handleFiltersChange, } = useFetchData(queryCleaningTasksUsingGet, mapTask); - const handleViewTask = (task: any) => { - navigate("/data/cleansing/task-detail/" + task.id); - }; - const pauseTask = async (item: CleansingTask) => { await stopCleaningTaskUsingPost(item.id); message.success("任务已暂停"); @@ -86,8 +82,12 @@ export default function TaskList() { onClick: startTask, // implement pause/play logic }; return [ - isRunning && pauseBtn, - showStart && startBtn, + ...(isRunning + ? [ pauseBtn ] + : []), + ...(showStart + ? [ startBtn ] + : []), { key: "delete", label: "删除", @@ -106,6 +106,18 @@ export default function TaskList() { fixed: "left", width: 150, ellipsis: true, + render: (_, task: CleansingTask) => { + return ( + + ); + }, }, { title: "任务ID", @@ -273,6 +285,9 @@ export default function TaskList() { data={tableData} operations={taskOperations} pagination={pagination} + onView={(tableData) => { + navigate("/data/cleansing/task-detail/" + tableData.id) + }} /> ) : ( diff --git a/frontend/src/pages/DataCleansing/Home/components/TemplateList.tsx b/frontend/src/pages/DataCleansing/Home/components/TemplateList.tsx index ba26a18..e396e92 100644 --- a/frontend/src/pages/DataCleansing/Home/components/TemplateList.tsx +++ b/frontend/src/pages/DataCleansing/Home/components/TemplateList.tsx @@ -1,21 +1,102 @@ -import { DeleteOutlined } from "@ant-design/icons"; +import {DeleteOutlined, EditOutlined} from "@ant-design/icons"; import CardView from "@/components/CardView"; import { - deleteCleaningTemplateByIdUsingDelete, - queryCleaningTemplatesUsingGet, + deleteCleaningTemplateByIdUsingDelete, queryCleaningTemplatesUsingGet, } from "../../cleansing.api"; import useFetchData from "@/hooks/useFetchData"; -import { mapTemplate } from "../../cleansing.const"; -import { App } from "antd"; -import { CleansingTemplate } from "../../cleansing.model"; +import {mapTemplate} from "../../cleansing.const"; +import {App, Button, Card, Table, Tooltip} from "antd"; +import {CleansingTemplate} from "../../cleansing.model"; +import {SearchControls} from "@/components/SearchControls.tsx"; +import {useNavigate} from "react-router"; +import {useState} from "react"; export default function TemplateList() { + const navigate = useNavigate(); const { message } = App.useApp(); + const [viewMode, setViewMode] = useState<"card" | "list">("list"); - const { tableData, pagination, fetchData } = useFetchData( - queryCleaningTemplatesUsingGet, - mapTemplate - ); + const { + loading, + tableData, + pagination, + searchParams, + setSearchParams, + fetchData, + handleFiltersChange, + } = useFetchData(queryCleaningTemplatesUsingGet, mapTemplate); + + const templateOperations = () => { + return [ + { + key: "update", + label: "编辑", + icon: , + onClick: (template: CleansingTemplate) => navigate(`/data/cleansing/update-template/${template.id}`) + }, + { + key: "delete", + label: "删除", + danger: true, + icon: , + onClick: deleteTemplate, // implement delete logic + }, + ]; + }; + + const templateColumns = [ + { + title: "模板名称", + dataIndex: "name", + key: "name", + fixed: "left", + width: 150, + ellipsis: true, + render: (_, template: CleansingTemplate) => { + return ( + + ); + }}, + { + title: "算子数量", + dataIndex: "num", + key: "num", + width: 100, + ellipsis: true, + render: (_, template: CleansingTemplate) => { + return template.instance?.length ?? 0; + }, + }, + { + title: "操作", + key: "action", + fixed: "right", + width: 20, + render: (text: string, record: any) => ( +
+ {templateOperations(record).map((op) => + op ? ( + +
+ ), + }, + ] const deleteTemplate = async (template: CleansingTemplate) => { if (!template.id) { @@ -27,21 +108,43 @@ export default function TemplateList() { message.success("模板删除成功"); }; - const operations = [ - { - key: "delete", - label: "删除模板", - danger: true, - icon: , - onClick: (template: CleansingTemplate) => deleteTemplate(template), // 可实现删除逻辑 - }, - ]; - return ( - + <> + {/* Search and Filters */} + + setSearchParams({ ...searchParams, keyword }) + } + searchPlaceholder="搜索模板名称、描述" + onFiltersChange={handleFiltersChange} + viewMode={viewMode} + onViewModeChange={setViewMode} + showViewToggle={true} + onReload={fetchData} + onClearFilters={() => setSearchParams({ ...searchParams, filter: {} })} + /> + {viewMode === "card" ? ( + { + navigate("/data/cleansing/template-detail/" + tableData.id) + }} + /> + ) : ( + +
+ + )} + ); } diff --git a/frontend/src/pages/DataCleansing/cleansing.api.ts b/frontend/src/pages/DataCleansing/cleansing.api.ts index 56084db..d96bed0 100644 --- a/frontend/src/pages/DataCleansing/cleansing.api.ts +++ b/frontend/src/pages/DataCleansing/cleansing.api.ts @@ -13,6 +13,14 @@ export function queryCleaningTaskByIdUsingGet(taskId: string | number) { return get(`/api/cleaning/tasks/${taskId}`); } +export function queryCleaningTaskResultByIdUsingGet(taskId: string | number) { + return get(`/api/cleaning/tasks/${taskId}/result`); +} + +export function queryCleaningTaskLogByIdUsingGet(taskId: string | number) { + return get(`/api/cleaning/tasks/${taskId}/log`); +} + export function updateCleaningTaskByIdUsingPut(taskId: string | number, data: any) { return put(`/api/cleaning/tasks/${taskId}`, data); } diff --git a/frontend/src/pages/DataCleansing/cleansing.const.tsx b/frontend/src/pages/DataCleansing/cleansing.const.tsx index 5bf3f69..f2027c2 100644 --- a/frontend/src/pages/DataCleansing/cleansing.const.tsx +++ b/frontend/src/pages/DataCleansing/cleansing.const.tsx @@ -98,6 +98,11 @@ export const mapTask = (task: CleansingTask) => { createdAt, startedAt, finishedAt, + updatedAt: formatDateTime( + new Date(Math.max(...[ + new Date(task.finishedAt).getTime(), + new Date(task.startedAt).getTime(), + new Date(task.createdAt).getTime()])).toISOString()), icon: , status, duration, diff --git a/frontend/src/pages/DataCleansing/cleansing.model.ts b/frontend/src/pages/DataCleansing/cleansing.model.ts index 58cdce0..a695252 100644 --- a/frontend/src/pages/DataCleansing/cleansing.model.ts +++ b/frontend/src/pages/DataCleansing/cleansing.model.ts @@ -18,10 +18,13 @@ export interface CleansingTask { startedAt: string; progress: { finishedFileNum: number; - process: 100, + succeedFileNum: number; + failedFileNum: number; + process: 100; totalFileNum: number; + successRate: 100; }; - operators: OperatorI[]; + instance: OperatorI[]; createdAt: string; updatedAt: string; finishedAt: string; @@ -70,3 +73,17 @@ export enum TemplateType { AUDIO = "AUDIO", IMAGE2TEXT = "IMAGE2TEXT", } + +export interface CleansingResult { + instanceId: string; + srcFileId: string; + destFileId: string; + srcName: string; + destName: string; + srcType: string; + destType: string; + srcSize: number; + destSize: number; + status: string; + result: string; +} \ No newline at end of file diff --git a/frontend/src/pages/OperatorMarket/Home/components/List.tsx b/frontend/src/pages/OperatorMarket/Home/components/List.tsx index 62e51a1..ea81148 100644 --- a/frontend/src/pages/OperatorMarket/Home/components/List.tsx +++ b/frontend/src/pages/OperatorMarket/Home/components/List.tsx @@ -110,9 +110,6 @@ export function ListView({ operators = [], pagination, operations }) { {operator.name} v{operator.version} - - {getStatusBadge(operator.status).label} - } description={ diff --git a/frontend/src/pages/OperatorMarket/operator.model.ts b/frontend/src/pages/OperatorMarket/operator.model.ts index 5f5b9a8..ccd0164 100644 --- a/frontend/src/pages/OperatorMarket/operator.model.ts +++ b/frontend/src/pages/OperatorMarket/operator.model.ts @@ -33,7 +33,7 @@ export interface OperatorI { tags: string[]; isStar?: boolean; originalId?: string; // 用于标识原始算子ID,便于去重 - categories: number[]; // 分类列表 + categories: string[]; // 分类列表 settings: string; overrides?: { [key: string]: any }; // 用户配置的参数 defaultParams?: { [key: string]: any }; // 默认参数 @@ -50,6 +50,8 @@ export interface CategoryI { count: number; // 该分类下的算子数量 type: string; // e.g., "数据源", "数据清洗", "数据分析", "数据可视化" parentId?: number; // 父分类ID,若无父分类则为null + value: string; + createdAt: string; } export interface CategoryTreeI { diff --git a/frontend/src/routes/routes.ts b/frontend/src/routes/routes.ts index 09ffe0d..38d662b 100644 --- a/frontend/src/routes/routes.ts +++ b/frontend/src/routes/routes.ts @@ -12,7 +12,7 @@ import DatasetDetail from "@/pages/DataManagement/Detail/DatasetDetail"; import DataCleansing from "@/pages/DataCleansing/Home/DataCleansing"; import CleansingTaskCreate from "@/pages/DataCleansing/Create/CreateTask"; import CleansingTaskDetail from "@/pages/DataCleansing/Detail/TaskDetail"; -import CleansingTemplateCreate from "@/pages/DataCleansing/Create/CreateTempate"; +import CleansingTemplateCreate from "@/pages/DataCleansing/Create/CreateTemplate"; import DataAnnotation from "@/pages/DataAnnotation/Home/DataAnnotation"; import AnnotationTaskCreate from "@/pages/DataAnnotation/Create/CreateTask"; @@ -39,6 +39,7 @@ import OrchestrationPage from "@/pages/Orchestration/Orchestration"; import WorkflowEditor from "@/pages/Orchestration/WorkflowEditor"; import { withErrorBoundary } from "@/components/ErrorBoundary"; import AgentPage from "@/pages/Agent/Agent.tsx"; +import CleansingTemplateDetail from "@/pages/DataCleansing/Detail/TemplateDetail.tsx"; import RatioTaskDetail from "@/pages/RatioTask/Detail/RatioTaskDetail"; const router = createBrowserRouter([ @@ -120,6 +121,14 @@ const router = createBrowserRouter([ path: "create-template", Component: CleansingTemplateCreate, }, + { + path: "template-detail/:id", + Component: CleansingTemplateDetail, + }, + { + path: "update-template/:id", + Component: CleansingTemplateCreate, + }, ], }, { diff --git a/runtime/python-executor/datamate/core/base_op.py b/runtime/python-executor/datamate/core/base_op.py index 63a51a2..68202d6 100644 --- a/runtime/python-executor/datamate/core/base_op.py +++ b/runtime/python-executor/datamate/core/base_op.py @@ -160,7 +160,7 @@ class Mapper(BaseOp): sample["execute_status"] = execute_status task_info = TaskInfoPersistence() task_info.persistence_task_info(sample) - return sample + raise e sample["execute_status"] = execute_status # 加载文件成功执行信息到数据库 diff --git a/runtime/python-executor/datamate/scheduler/cmd_task_scheduler.py b/runtime/python-executor/datamate/scheduler/cmd_task_scheduler.py index d5ba7be..011f05a 100644 --- a/runtime/python-executor/datamate/scheduler/cmd_task_scheduler.py +++ b/runtime/python-executor/datamate/scheduler/cmd_task_scheduler.py @@ -10,14 +10,13 @@ from .scheduler import Task, TaskStatus, TaskResult, TaskScheduler class CommandTask(Task): """命令任务包装类""" - def __init__(self, task_id: str, command: str, shell: bool = True, + def __init__(self, task_id: str, command: str, log_path = None, shell: bool = True, timeout: Optional[int] = None, *args, **kwargs): super().__init__(task_id, *args, **kwargs) + self.log_path = log_path self.command = command self.shell = shell self.timeout = timeout - self.stdout = None - self.stderr = None self.return_code = None self._process = None @@ -35,56 +34,54 @@ class CommandTask(Task): self.status = TaskStatus.RUNNING self.started_at = datetime.now() - # 使用 asyncio.create_subprocess_shell 或 create_subprocess_exec - if self.shell: - process = await asyncio.create_subprocess_shell( - self.command, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - **self.kwargs - ) - else: - process = await asyncio.create_subprocess_exec( - *self.command.split(), - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - **self.kwargs - ) - - self._process = process - - # 等待进程完成(带超时) - try: - if self.timeout: - stdout, stderr = await asyncio.wait_for( - process.communicate(), - timeout=self.timeout + with open(self.log_path, 'a') as f: + # 使用 asyncio.create_subprocess_shell 或 create_subprocess_exec + if self.shell: + process = await asyncio.create_subprocess_shell( + self.command, + stdout=f, + stderr=asyncio.subprocess.STDOUT, + **self.kwargs ) else: - stdout, stderr = await process.communicate() + process = await asyncio.create_subprocess_exec( + *self.command.split(), + stdout=f, + stderr=asyncio.subprocess.STDOUT, + **self.kwargs + ) - self.stdout = stdout.decode() if stdout else "" - self.stderr = stderr.decode() if stderr else "" - self.return_code = process.returncode + self._process = process - if self._cancelled: - self.status = TaskStatus.CANCELLED - elif process.returncode == 0: - self.status = TaskStatus.COMPLETED - else: - self.status = TaskStatus.FAILED - - except asyncio.TimeoutError: - # 超时处理 - self._process.terminate() + # 等待进程完成(带超时) try: - await asyncio.wait_for(self._process.wait(), timeout=5.0) - except asyncio.TimeoutError: - self._process.kill() - await self._process.wait() + if self.timeout: + await asyncio.wait_for( + process.wait(), + timeout=self.timeout + ) + else: + await process.wait() + self.return_code = process.returncode - self.status = TaskStatus.FAILED - self.stderr = f"Command timed out after {self.timeout} seconds" + if self._cancelled: + self.status = TaskStatus.CANCELLED + elif process.returncode == 0: + self.status = TaskStatus.COMPLETED + else: + self.status = TaskStatus.FAILED + + except asyncio.TimeoutError: + # 超时处理 + self._process.terminate() + try: + await asyncio.wait_for(self._process.wait(), timeout=5.0) + except asyncio.TimeoutError: + self._process.kill() + await self._process.wait() + + self.status = TaskStatus.FAILED + f.write(f"\nCommand timed out after {self.timeout} seconds\n") except asyncio.CancelledError: # 任务被取消 @@ -101,7 +98,7 @@ class CommandTask(Task): except Exception as e: self.status = TaskStatus.FAILED - self.stderr = str(e) + logger.error(f"Task(id: {self.task_id}) run failed. Cause: {e}") finally: self.completed_at = datetime.now() @@ -127,8 +124,6 @@ class CommandTask(Task): """转换为结果对象""" self.result = { "command": self.command, - "stdout": self.stdout, - "stderr": self.stderr, "return_code": self.return_code, } return super().to_result() @@ -140,10 +135,13 @@ class CommandScheduler(TaskScheduler): def __init__(self, max_concurrent: int = 5): super().__init__(max_concurrent) - async def submit(self, task_id, command: str, shell: bool = True, + async def submit(self, task_id, command: str, log_path = None, shell: bool = True, timeout: Optional[int] = None, **kwargs) -> str: + if log_path is None: + log_path = f"/flow/{task_id}/output.log" + """提交命令任务""" - task = CommandTask(task_id, command, shell, timeout, **kwargs) + task = CommandTask(task_id, command, log_path, shell, timeout, **kwargs) self.tasks[task_id] = task # 使用信号量限制并发 diff --git a/scripts/images/runtime/Dockerfile b/scripts/images/runtime/Dockerfile index bff8c4e..2b1ddf0 100644 --- a/scripts/images/runtime/Dockerfile +++ b/scripts/images/runtime/Dockerfile @@ -1,5 +1,10 @@ FROM ghcr.io/astral-sh/uv:python3.11-bookworm +RUN --mount=type=cache,target=/var/cache/apt \ + --mount=type=cache,target=/var/lib/apt \ + apt update \ + && apt install -y libgl1 libglib2.0-0 vim libmagic1 libreoffice dos2unix + COPY runtime/python-executor /opt/runtime COPY runtime/ops /opt/runtime/datamate/ops COPY runtime/ops/user /opt/runtime/user @@ -7,11 +12,6 @@ COPY scripts/images/runtime/start.sh /opt/runtime/start.sh ENV PYTHONPATH=/opt/runtime/datamate/ -RUN --mount=type=cache,target=/var/cache/apt \ - --mount=type=cache,target=/var/lib/apt \ - apt update \ - && apt install -y libgl1 libglib2.0-0 vim libmagic1 libreoffice dos2unix - WORKDIR /opt/runtime RUN --mount=type=cache,target=/root/.cache/uv \