From b8d7aca8b7f847a91119d5628b43ab7b4ae1d73c Mon Sep 17 00:00:00 2001 From: Vincent <84168298+szc0616@users.noreply.github.com> Date: Wed, 12 Nov 2025 09:34:50 +0800 Subject: [PATCH] =?UTF-8?q?refactor=EF=BC=9A=E9=87=8D=E6=9E=84=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=BD=92=E9=9B=86=E9=83=A8=E5=88=86=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=20(#75)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix:配比任务需要能够跳转到目标数据集 * feature:增加配比任务详情接口 * fix:删除不存在的配比详情页面 * fix:使用正式的逻辑来展示标签 * fix:参数默认值去掉多余的- * fix:修复配比任务相关操作 * fix:去除不需要的日志打印和import * feature:数据归集创建时将obs、mysql归集也放出 * refactor:重构数据归集的代码 * refactor:重构数据归集的代码 --- .../application/CollectionTaskService.java | 4 - .../collection/common/enums/SyncMode.java | 2 +- .../collection/common/enums/TemplateType.java | 11 + .../domain/model/entity/CollectionTask.java | 5 +- .../datax/DataxProcessRunner.java | 54 ++--- .../datax/config/BaseConfig.java | 4 + .../datax/config/NasConfig.java | 54 +++++ .../dto/CollectionTaskResponse.java | 3 +- .../dto/CreateCollectionTaskRequest.java | 6 + .../DatasetApplicationService.java | 7 +- .../domain/model/dataset/DatasetFile.java | 3 - .../dto/CollectionTaskDetailResponse.java | 1 + .../DataCollection/Create/CreateTask.tsx | 201 +++++++++++++----- scripts/db/data-collection-init.sql | 2 + 14 files changed, 247 insertions(+), 110 deletions(-) create mode 100644 backend/services/data-collection-service/src/main/java/com/datamate/collection/common/enums/TemplateType.java create mode 100644 backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/config/BaseConfig.java create mode 100644 backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/config/NasConfig.java diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/CollectionTaskService.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/CollectionTaskService.java index 6ee20b8..97d8fdc 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/CollectionTaskService.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/CollectionTaskService.java @@ -5,12 +5,9 @@ import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.datamate.collection.domain.model.entity.CollectionTask; import com.datamate.collection.domain.model.entity.TaskExecution; -import com.datamate.collection.common.enums.TaskStatus; import com.datamate.collection.domain.repository.CollectionTaskRepository; import com.datamate.collection.common.enums.SyncMode; import com.datamate.common.domain.utils.ChunksSaver; -import com.datamate.datamanagement.application.DatasetApplicationService; -import com.datamate.datamanagement.domain.model.dataset.Dataset; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @@ -25,7 +22,6 @@ import java.util.Objects; @RequiredArgsConstructor public class CollectionTaskService { private final TaskExecutionService taskExecutionService; - private final DatasetApplicationService datasetApplicationService; private final CollectionTaskRepository collectionTaskRepository; @Transactional diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/common/enums/SyncMode.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/common/enums/SyncMode.java index 28399a2..85c961d 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/common/enums/SyncMode.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/common/enums/SyncMode.java @@ -7,6 +7,6 @@ public enum SyncMode { /** 一次性(ONCE) */ ONCE, /// 定时(SCHEDULED) - SCHEDULED; + SCHEDULED } diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/common/enums/TemplateType.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/common/enums/TemplateType.java new file mode 100644 index 0000000..33fd230 --- /dev/null +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/common/enums/TemplateType.java @@ -0,0 +1,11 @@ +package com.datamate.collection.common.enums; + +/** + * 模板类型枚举 + * + */ +public enum TemplateType { + NAS, + OBS, + MYSQL +} diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/entity/CollectionTask.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/entity/CollectionTask.java index 773731b..1c51e00 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/entity/CollectionTask.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/domain/model/entity/CollectionTask.java @@ -3,6 +3,7 @@ package com.datamate.collection.domain.model.entity; import com.baomidou.mybatisplus.annotation.TableName; import com.datamate.collection.common.enums.SyncMode; import com.datamate.collection.common.enums.TaskStatus; +import com.datamate.collection.common.enums.TemplateType; import com.datamate.common.domain.model.base.BaseEntity; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; @@ -24,6 +25,8 @@ import java.util.UUID; public class CollectionTask extends BaseEntity { private String name; private String description; + private TemplateType taskType; // 任务类型 + private String targetPath; // 目标存储路径 private String config; // DataX JSON 配置,包含源端和目标端配置信息 private TaskStatus status; private SyncMode syncMode; // ONCE / SCHEDULED @@ -51,7 +54,7 @@ public class CollectionTask extends BaseEntity { public void initCreateParam() { this.id = UUID.randomUUID().toString(); - this.addPath(); + this.targetPath = "/dataset/local/" + id; this.status = TaskStatus.READY; this.createdAt = LocalDateTime.now(); this.updatedAt = LocalDateTime.now(); diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/DataxProcessRunner.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/DataxProcessRunner.java index 29ffcd2..e74f16f 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/DataxProcessRunner.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/DataxProcessRunner.java @@ -1,28 +1,24 @@ package com.datamate.collection.infrastructure.datax; +import com.datamate.collection.common.enums.TemplateType; import com.datamate.collection.domain.model.entity.CollectionTask; import com.datamate.collection.domain.process.ProcessRunner; +import com.datamate.collection.infrastructure.datax.config.NasConfig; import com.datamate.common.infrastructure.exception.BusinessException; import com.datamate.common.infrastructure.exception.SystemErrorCode; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.exec.*; +import org.apache.commons.io.output.TeeOutputStream; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Component; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileWriter; -import java.io.IOException; +import java.io.*; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; -import java.util.HashMap; -import java.util.List; -import java.util.Map; @Slf4j @Component @@ -61,10 +57,8 @@ public class DataxProcessRunner implements ProcessRunner { } ExecuteStreamHandler streamHandler = new PumpStreamHandler( - new org.apache.commons.io.output.TeeOutputStream( - new java.io.FileOutputStream(logFile, true), System.out), - new org.apache.commons.io.output.TeeOutputStream( - new java.io.FileOutputStream(logFile, true), System.err) + new TeeOutputStream(new FileOutputStream(logFile, true), System.out), + new TeeOutputStream(new FileOutputStream(logFile, true), System.err) ); executor.setStreamHandler(streamHandler); @@ -92,30 +86,18 @@ public class DataxProcessRunner implements ProcessRunner { private String getJobConfig(CollectionTask task) { try { ObjectMapper objectMapper = new ObjectMapper(); - Map parameter = objectMapper.readValue( - task.getConfig(), - new TypeReference<>() { - } - ); - Map job = new HashMap<>(); - Map content = new HashMap<>(); - Map reader = new HashMap<>(); - reader.put("name", "nfsreader"); - reader.put("parameter", parameter); - content.put("reader", reader); - Map writer = new HashMap<>(); - writer.put("name", "nfswriter"); - writer.put("parameter", parameter); - content.put("writer", writer); - job.put("content", List.of(content)); - Map setting = new HashMap<>(); - Map channel = new HashMap<>(); - channel.put("channel", 2); - setting.put("speed", channel); - job.put("setting", setting); - Map jobConfig = new HashMap<>(); - jobConfig.put("job", job); - return objectMapper.writeValueAsString(jobConfig); + TemplateType templateType = task.getTaskType(); + switch (templateType) { + case NAS: + // NAS 特殊处理 + // 移除 templateType 字段 + NasConfig nasConfig = objectMapper.readValue(task.getConfig(), NasConfig.class); + return nasConfig.toJobConfig(objectMapper, task); + case OBS: + case MYSQL: + default: + throw BusinessException.of(SystemErrorCode.UNKNOWN_ERROR, "Unsupported template type: " + templateType); + } } catch (Exception e) { log.error("Failed to parse task config", e); throw new RuntimeException("Failed to parse task config", e); diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/config/BaseConfig.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/config/BaseConfig.java new file mode 100644 index 0000000..ea5ecf0 --- /dev/null +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/config/BaseConfig.java @@ -0,0 +1,4 @@ +package com.datamate.collection.infrastructure.datax.config; + +public interface BaseConfig { +} diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/config/NasConfig.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/config/NasConfig.java new file mode 100644 index 0000000..c91398e --- /dev/null +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/config/NasConfig.java @@ -0,0 +1,54 @@ +package com.datamate.collection.infrastructure.datax.config; + +import com.datamate.collection.domain.model.entity.CollectionTask; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.Getter; +import lombok.Setter; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Getter +@Setter +public class NasConfig implements BaseConfig{ + private String ip; + + private String path; + + private List files; + + /** + * 将当前 NAS 配置构造成 DataX 所需的 job JSON 字符串。 + */ + public String toJobConfig(ObjectMapper objectMapper, CollectionTask task) throws Exception { + Map parameter = new HashMap<>(); + if (ip != null) parameter.put("ip", ip); + if (path != null) parameter.put("path", path); + if (files != null) parameter.put("files", files); + parameter.put("destPath", task.getTargetPath()); + + Map job = new HashMap<>(); + Map content = new HashMap<>(); + Map reader = new HashMap<>(); + reader.put("name", "nfsreader"); + reader.put("parameter", parameter); + content.put("reader", reader); + + Map writer = new HashMap<>(); + writer.put("name", "nfswriter"); + writer.put("parameter", parameter); + content.put("writer", writer); + + job.put("content", List.of(content)); + Map setting = new HashMap<>(); + Map channel = new HashMap<>(); + channel.put("channel", 2); + setting.put("speed", channel); + job.put("setting", setting); + + Map jobConfig = new HashMap<>(); + jobConfig.put("job", job); + return objectMapper.writeValueAsString(jobConfig); + } +} diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CollectionTaskResponse.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CollectionTaskResponse.java index 2b0801e..caf913e 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CollectionTaskResponse.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CollectionTaskResponse.java @@ -29,7 +29,8 @@ public class CollectionTaskResponse { private String description; - @Valid + private String targetPath; + private Map config = new HashMap<>(); private TaskStatus status; diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CreateCollectionTaskRequest.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CreateCollectionTaskRequest.java index 74a2f2c..ca7e9bb 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CreateCollectionTaskRequest.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/dto/CreateCollectionTaskRequest.java @@ -1,6 +1,7 @@ package com.datamate.collection.interfaces.dto; import com.datamate.collection.common.enums.SyncMode; +import com.datamate.collection.common.enums.TemplateType; import com.datamate.datamanagement.interfaces.dto.CreateDatasetRequest; import com.fasterxml.jackson.annotation.JsonProperty; @@ -35,6 +36,11 @@ public class CreateCollectionTaskRequest { @JsonProperty("description") private String description; + @NotNull + @Schema(name = "taskType", description = "任务类型", requiredMode = Schema.RequiredMode.REQUIRED) + @JsonProperty("taskType") + private TemplateType taskType; + @Valid @NotNull @Schema(name = "config", description = "归集配置,包含源端和目标端配置信息", requiredMode = Schema.RequiredMode.REQUIRED) diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java index 8c9bca5..f4c61d8 100644 --- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java @@ -285,12 +285,7 @@ public class DatasetApplicationService { return Collections.emptyList(); } log.info("获取到归集任务详情: {}", taskDetail); - LocalCollectionConfig config = parseTaskConfig(taskDetail.getConfig()); - if (config == null) { - log.warn("解析任务配置失败,任务ID: {}", dataSourceId); - return Collections.emptyList(); - } - return config.getFilePaths(); + return Collections.singletonList(taskDetail.getTargetPath()); } /** diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/domain/model/dataset/DatasetFile.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/domain/model/dataset/DatasetFile.java index 608c422..e0df444 100644 --- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/domain/model/dataset/DatasetFile.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/domain/model/dataset/DatasetFile.java @@ -6,10 +6,8 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.*; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; import java.time.LocalDateTime; -import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -50,7 +48,6 @@ public class DatasetFile { ObjectMapper mapper = new ObjectMapper(); return mapper.readValue(tags, new TypeReference>() {}); } catch (Exception e) { - log.error(e.getMessage(), e); return Collections.emptyList(); } } diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/infrastructure/client/dto/CollectionTaskDetailResponse.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/infrastructure/client/dto/CollectionTaskDetailResponse.java index 5e38d8e..e5f86b2 100644 --- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/infrastructure/client/dto/CollectionTaskDetailResponse.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/infrastructure/client/dto/CollectionTaskDetailResponse.java @@ -13,6 +13,7 @@ public class CollectionTaskDetailResponse { private String id; private String name; private String description; + private String targetPath; private Map config; private String status; private String syncMode; diff --git a/frontend/src/pages/DataCollection/Create/CreateTask.tsx b/frontend/src/pages/DataCollection/Create/CreateTask.tsx index 8284e49..ed208af 100644 --- a/frontend/src/pages/DataCollection/Create/CreateTask.tsx +++ b/frontend/src/pages/DataCollection/Create/CreateTask.tsx @@ -14,7 +14,7 @@ const { TextArea } = Input; const defaultTemplates = [ { - id: "nas", + id: "NAS", name: "NAS到本地", description: "从NAS文件系统导入数据到本地文件系统", config: { @@ -23,7 +23,7 @@ const defaultTemplates = [ }, }, { - id: "obs", + id: "OBS", name: "OBS到本地", description: "从OBS文件系统导入数据到本地文件系统", config: { @@ -32,11 +32,11 @@ const defaultTemplates = [ }, }, { - id: "web", - name: "Web到本地", - description: "从Web URL导入数据到本地文件系统", + id: "MYSQL", + name: "Mysql到本地", + description: "从Mysql中导入数据到本地文件系统", config: { - reader: "webreader", + reader: "mysqlreader", writer: "localwriter", }, }, @@ -45,9 +45,9 @@ const defaultTemplates = [ const syncModeOptions = Object.values(SyncModeMap); enum TemplateType { - NAS = "nas", - OBS = "obs", - WEB = "web", + NAS = "NAS", + OBS = "OBS", + MYSQL = "MYSQL", } export default function CollectionTaskCreate() { @@ -58,16 +58,22 @@ export default function CollectionTaskCreate() { const [templateType, setTemplateType] = useState<"default" | "custom">( "default" ); - const [selectedTemplate, setSelectedTemplate] = useState("nas"); + // 默认模板类型设为 NAS + const [selectedTemplate, setSelectedTemplate] = useState( + TemplateType.NAS + ); const [customConfig, setCustomConfig] = useState(""); - const [newTask, setNewTask] = useState({ + // 将 newTask 设为 any,并初始化 config.templateType 为 NAS + const [newTask, setNewTask] = useState({ name: "", description: "", syncMode: SyncMode.ONCE, cronExpression: "", maxRetries: 10, - dataset: {}, + dataset: null, + config: { templateType: TemplateType.NAS }, + createDataset: false, }); const [scheduleExpression, setScheduleExpression] = useState({ type: SyncMode.SCHEDULED, @@ -79,7 +85,7 @@ export default function CollectionTaskCreate() { const handleSubmit = async () => { try { - const formData = await form.validateFields(); + await form.validateFields(); if (templateType === "default" && !selectedTemplate) { window.alert("请选择默认模板"); return; @@ -88,8 +94,21 @@ export default function CollectionTaskCreate() { window.alert("请填写自定义配置"); return; } - // Create task logic here - await createTaskUsingPost(newTask); + + // 构建最终 payload,不依赖异步 setState + const payload = { + ...newTask, + taskType: + templateType === "default" ? selectedTemplate : "CUSTOM", + config: { + ...((newTask && newTask.config) || {}), + ...(templateType === "custom" ? { dataxJson: customConfig } : {}), + }, + }; + + console.log("创建任务 payload:", payload); + + await createTaskUsingPost(payload); message.success("任务创建成功"); navigate("/data/collection"); } catch (error) { @@ -192,32 +211,47 @@ export default function CollectionTaskCreate() { */} {templateType === "default" && ( <> - {/* -
- {defaultTemplates.map((template) => ( -
setSelectedTemplate(template.id)} - > -
{template.name}
-
- {template.description} + { + +
+ {defaultTemplates.map((template) => ( +
{ + setSelectedTemplate(template.id as TemplateType); + // 使用函数式更新,合并之前的 config + setNewTask((prev: any) => ({ + ...prev, + config: { + templateType: template.id, + }, + })); + // 同步表单显示 + form.setFieldsValue({ + config: { templateType: template.id }, + }); + }} + > +
{template.name}
+
+ {template.description} +
+
+ {template.config.reader} → {template.config.writer} +
-
- {template.config.reader} → {template.config.writer} -
-
- ))} -
- */} + ))} +
+ + } {/* nas import */} {selectedTemplate === TemplateType.NAS && ( -
+
@@ -256,21 +290,21 @@ export default function CollectionTaskCreate() { /> @@ -282,6 +316,54 @@ export default function CollectionTaskCreate() {
)} + + {/* mysql import */} + {selectedTemplate === TemplateType.MYSQL && ( +
+ + + + + + + + + + + + + + + + + + +
+ )} )} @@ -313,15 +395,17 @@ export default function CollectionTaskCreate() { value={isCreateDataset} onChange={(e) => { const value = e.target.value; - if (value === false) { - form.setFieldsValue({ - dataset: {}, - }); - setNewTask({ - ...newTask, - dataset: {}, - }); + let datasetInit = null; + if (value === true) { + datasetInit = {}; } + form.setFieldsValue({ + dataset: datasetInit, + }); + setNewTask((prev: any) => ({ + ...prev, + dataset: datasetInit, + })); setIsCreateDataset(e.target.value); }} > @@ -339,13 +423,13 @@ export default function CollectionTaskCreate() { { - setNewTask({ - ...newTask, + setNewTask((prev: any) => ({ + ...prev, dataset: { - ...newTask.dataset, + ...(prev.dataset || {}), name: e.target.value, }, - }); + })); }} /> @@ -356,15 +440,16 @@ export default function CollectionTaskCreate() { > { form.setFieldValue(["dataset", "datasetType"], type); - setNewTask({ - ...newTask, + setNewTask((prev: any) => ({ + ...prev, dataset: { + ...(prev.dataset || {}), datasetType: type as DatasetSubType, }, - }); + })); }} /> diff --git a/scripts/db/data-collection-init.sql b/scripts/db/data-collection-init.sql index 62a611a..d28a109 100644 --- a/scripts/db/data-collection-init.sql +++ b/scripts/db/data-collection-init.sql @@ -46,6 +46,8 @@ CREATE TABLE t_dc_collection_tasks ( name VARCHAR(255) NOT NULL COMMENT '任务名称', description TEXT COMMENT '任务描述', sync_mode VARCHAR(20) DEFAULT 'ONCE' COMMENT '同步模式:ONCE/SCHEDULED', + task_type VARCHAR(20) DEFAULT 'NAS' COMMENT '任务类型:NAS/OBS/MYSQL/CUSTOM', + target_path VARCHAR(1000) DEFAULT '' COMMENT '目标存储路径', config TEXT NOT NULL COMMENT '归集配置(DataX配置),包含源端和目标端配置信息', schedule_expression VARCHAR(255) COMMENT 'Cron调度表达式', status VARCHAR(20) DEFAULT 'DRAFT' COMMENT '任务状态:DRAFT/READY/RUNNING/SUCCESS/FAILED/STOPPED',