From 2b09c7dfd1ce0d1afeb472a419be2c2ffd16fd4c Mon Sep 17 00:00:00 2001 From: Vincent <84168298+szc0616@users.noreply.github.com> Date: Wed, 12 Nov 2025 17:05:31 +0800 Subject: [PATCH] =?UTF-8?q?feature=EF=BC=9Amysql=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E5=BA=93=E5=BD=92=E9=9B=86=E4=B8=BAcsv=E6=96=87=E4=BB=B6=20(#7?= =?UTF-8?q?6)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix:配比任务需要能够跳转到目标数据集 * feature:增加配比任务详情接口 * fix:删除不存在的配比详情页面 * fix:使用正式的逻辑来展示标签 * fix:参数默认值去掉多余的- * fix:修复配比任务相关操作 * fix:去除不需要的日志打印和import * feature:数据归集创建时将obs、mysql归集也放出 * refactor:重构数据归集的代码 * refactor:重构数据归集的代码 * feature:增加实现mysql归集为csv文件 --- .../application/TaskExecutionService.java | 6 +- .../datax/DataxProcessRunner.java | 47 ++++++++++-- .../datax/config/MysqlConfig.java | 73 +++++++++++++++++++ .../converter/CollectionTaskConverter.java | 3 +- .../DatasetApplicationService.java | 15 ---- .../DataCollection/Create/CreateTask.tsx | 36 ++++----- runtime/datax/package.xml | 16 ++-- runtime/datax/pom.xml | 4 +- 8 files changed, 146 insertions(+), 54 deletions(-) create mode 100644 backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/config/MysqlConfig.java diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/TaskExecutionService.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/TaskExecutionService.java index 2d4c874..6d27a65 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/TaskExecutionService.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/application/TaskExecutionService.java @@ -1,5 +1,6 @@ package com.datamate.collection.application; +import com.datamate.collection.common.enums.TemplateType; import com.datamate.collection.domain.model.entity.CollectionTask; import 
com.datamate.collection.domain.model.entity.TaskExecution; import com.datamate.collection.common.enums.TaskStatus; @@ -9,6 +10,7 @@ import com.datamate.collection.domain.repository.TaskExecutionRepository; import com.datamate.datamanagement.application.DatasetApplicationService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -50,7 +52,9 @@ public class TaskExecutionService { executionRepository.completeExecution(executionId, TaskStatus.SUCCESS.name(), LocalDateTime.now(), 0, 0L, 0L, 0L, null); collectionTaskRepository.updateStatus(task.getId(), TaskStatus.SUCCESS.name()); - datasetApplicationService.processDataSourceAsync(datasetId, task.getId()); + if (StringUtils.isNotBlank(datasetId)) { + datasetApplicationService.processDataSourceAsync(datasetId, task.getId()); + } } catch (Exception e) { log.error("DataX execution failed", e); executionRepository.completeExecution(executionId, TaskStatus.FAILED.name(), LocalDateTime.now(), diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/DataxProcessRunner.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/DataxProcessRunner.java index e74f16f..41858a0 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/DataxProcessRunner.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/DataxProcessRunner.java @@ -1,8 +1,10 @@ +// java package com.datamate.collection.infrastructure.datax; import com.datamate.collection.common.enums.TemplateType; import com.datamate.collection.domain.model.entity.CollectionTask; import 
com.datamate.collection.domain.process.ProcessRunner;
+import com.datamate.collection.infrastructure.datax.config.MysqlConfig;
 import com.datamate.collection.infrastructure.datax.config.NasConfig;
 import com.datamate.common.infrastructure.exception.BusinessException;
 import com.datamate.common.infrastructure.exception.SystemErrorCode;
@@ -15,10 +17,10 @@ import org.apache.commons.lang3.StringUtils;
 import org.springframework.stereotype.Component;
 
 import java.io.*;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
+import java.nio.file.*;
 import java.time.Duration;
+import java.util.*;
+import java.util.regex.Pattern;
 
 @Slf4j
 @Component
@@ -30,7 +32,10 @@ public class DataxProcessRunner implements ProcessRunner {
     @Override
     public int runJob(CollectionTask task, String executionId, int timeoutSeconds) throws Exception {
         Path job = buildJobFile(task);
-        return runJob(job.toFile(), executionId, Duration.ofSeconds(timeoutSeconds));
+        int code = runJob(job.toFile(), executionId, Duration.ofSeconds(timeoutSeconds));
+        // Post-process only when the run returned exit code 0; postProcess is a no-op for non-MYSQL tasks.
+        if (code == 0) postProcess(task);
+        return code;
     }
 
     private int runJob(File jobFile, String executionId, Duration timeout) throws Exception {
@@ -90,11 +95,12 @@ public class DataxProcessRunner implements ProcessRunner {
             switch (templateType) {
                 case NAS:
                     // NAS 特殊处理
-                    // 移除 templateType 字段
                     NasConfig nasConfig = objectMapper.readValue(task.getConfig(), NasConfig.class);
                     return nasConfig.toJobConfig(objectMapper, task);
-                case OBS:
                 case MYSQL:
+                    MysqlConfig mysqlConfig = objectMapper.readValue(task.getConfig(), MysqlConfig.class);
+                    return mysqlConfig.toJobConfig(objectMapper, task);
+                case OBS: // no OBS job builder yet: fall through to the unsupported-type error
                 default:
                     throw BusinessException.of(SystemErrorCode.UNKNOWN_ERROR, "Unsupported template type: " + templateType);
             }
@@ -103,4 +109,50 @@ public class DataxProcessRunner implements ProcessRunner {
             throw new RuntimeException("Failed to parse task config", e);
         }
     }
+
+    /**
+     * Appends a {@code .csv} extension to every regular file in the task's target directory that
+     * lacks one. Runs only for {@link TemplateType#MYSQL} tasks and is best effort: a missing
+     * directory, a failed rename or a failed directory scan is logged and skipped.
+     *
+     * @param task collection task whose target directory is post-processed
+     * @throws IOException not thrown by the current body (I/O failures are logged); kept in the signature
+     */
+    private void postProcess(CollectionTask task) throws IOException {
+        if (task.getTaskType() != TemplateType.MYSQL) {
+            return;
+        }
+        String targetPath = task.getTargetPath();
+        // Paths.get(null) would throw; treat a missing target path like a missing directory.
+        if (StringUtils.isBlank(targetPath)) {
+            log.info("Target path is blank for task {}, skip post processing.", task.getId());
+            return;
+        }
+        Path dir = Paths.get(targetPath);
+        // Files.isDirectory is false for a non-existent path, so it covers both cases.
+        if (!Files.isDirectory(dir)) {
+            log.info("Target path {} does not exist or is not a directory for task {}, skip post processing.", targetPath, task.getId());
+            return;
+        }
+
+        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
+            for (Path path : stream) {
+                if (!Files.isRegularFile(path)) continue;
+                String name = path.getFileName().toString();
+                // Locale.ROOT keeps the extension check independent of the JVM default locale.
+                if (name.toLowerCase(Locale.ROOT).endsWith(".csv")) continue;
+
+                Path target = dir.resolve(name + ".csv");
+                try {
+                    Files.move(path, target, StandardCopyOption.REPLACE_EXISTING);
+                    log.info("Renamed file for task {}: {} -> {}", task.getId(), name, target.getFileName().toString());
+                } catch (IOException ex) {
+                    log.warn("Failed to rename file {} for task {}: {}", path, task.getId(), ex.getMessage(), ex);
+                }
+            }
+        } catch (IOException | DirectoryIteratorException ioe) {
+            // Iteration failures surface as the unchecked DirectoryIteratorException; handle them like scan errors.
+            log.warn("Error scanning target directory {} for task {}: {}", targetPath, task.getId(), ioe.getMessage(), ioe);
+        }
+    }
 }
diff --git a/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/config/MysqlConfig.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/config/MysqlConfig.java
new file mode 100644
index 0000000..4ba0067
--- /dev/null
+++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/infrastructure/datax/config/MysqlConfig.java
@@ -0,0 +1,73 @@
+package com.datamate.collection.infrastructure.datax.config;
+
+import com.datamate.collection.domain.model.entity.CollectionTask;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** MySQL collection settings, bound from the task's config JSON and rendered as a DataX job. */
+@Getter
+@Setter
+public class MysqlConfig {
+    /** JDBC URL of the source database; emitted as a single-element {@code jdbcUrl} list. */
+    private String jdbcUrl;
+    /** Database account name passed to the reader. */
+    private String username;
+    /** Database account password; copied as-is into the generated job JSON. */
+    private String password;
+    /** SQL whose result is exported; emitted as a single-element {@code querySql} list. */
+    private String querySql;
+    /** Optional header titles; forwarded to the writer as {@code header} only when non-empty. */
+    private List<String> headers;
+
+    /**
+     * Renders this configuration as a DataX job JSON string: a {@code mysqlreader} feeding a
+     * {@code txtfilewriter} that is configured for comma-delimited UTF-8 CSV under the task's target path.
+     *
+     * @param objectMapper mapper used to serialize the job structure
+     * @param task         collection task; its {@code getTargetPath()} becomes the writer's {@code path}
+     * @return the DataX job definition as JSON
+     * @throws Exception if serialization fails
+     */
+    public String toJobConfig(ObjectMapper objectMapper, CollectionTask task) throws Exception {
+        Map<String, Object> connection = new HashMap<>();
+        if (jdbcUrl != null) connection.put("jdbcUrl", Collections.singletonList(jdbcUrl));
+        if (querySql != null) connection.put("querySql", Collections.singletonList(querySql));
+        Map<String, Object> mysqlParameter = new HashMap<>();
+        if (username != null) mysqlParameter.put("username", username);
+        if (password != null) mysqlParameter.put("password", password);
+        mysqlParameter.put("connection", Collections.singletonList(connection));
+        Map<String, Object> reader = new HashMap<>();
+        reader.put("name", "mysqlreader");
+        reader.put("parameter", mysqlParameter);
+        // Writer side: fixed CSV settings; only the output path and the optional header vary per task.
+        Map<String, Object> writerParameter = new HashMap<>();
+        if (CollectionUtils.isNotEmpty(headers)) {
+            writerParameter.put("header", headers);
+        }
+        writerParameter.put("path", task.getTargetPath());
+        writerParameter.put("fileName", "collectionResult");
+        writerParameter.put("writeMode", "truncate");
+        writerParameter.put("dateFormat", "yyyy-MM-dd HH:mm:ss");
+        writerParameter.put("fileFormat", "csv");
+        writerParameter.put("encoding", "UTF-8");
+        writerParameter.put("fieldDelimiter", ",");
+        Map<String, Object> writer = new HashMap<>();
+        writer.put("name", "txtfilewriter");
+        writer.put("parameter", writerParameter);
+        // Job envelope: one reader/writer pair and a speed setting of a single channel.
+        Map<String, Object> content = new HashMap<>();
+        content.put("reader", reader);
+        content.put("writer", writer);
+        Map<String, Object> job = new HashMap<>();
+        job.put("content", Collections.singletonList(content));
+        job.put("setting", Map.of("speed", Map.of("channel", 1)));
+        return objectMapper.writeValueAsString(Map.of("job", job));
+    }
+}
diff --git
a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/converter/CollectionTaskConverter.java b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/converter/CollectionTaskConverter.java index 2b45383..b549319 100644 --- a/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/converter/CollectionTaskConverter.java +++ b/backend/services/data-collection-service/src/main/java/com/datamate/collection/interfaces/converter/CollectionTaskConverter.java @@ -41,8 +41,7 @@ public interface CollectionTaskConverter { default Map parseJsonToMap(String json) { try { ObjectMapper objectMapper = new ObjectMapper(); - return - objectMapper.readValue(json, Map.class); + return objectMapper.readValue(json, Map.class); } catch (Exception e) { throw BusinessException.of(SystemErrorCode.INVALID_PARAMETER); } diff --git a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java index f4c61d8..0c13bb0 100644 --- a/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java +++ b/backend/services/data-management-service/src/main/java/com/datamate/datamanagement/application/DatasetApplicationService.java @@ -287,19 +287,4 @@ public class DatasetApplicationService { log.info("获取到归集任务详情: {}", taskDetail); return Collections.singletonList(taskDetail.getTargetPath()); } - - /** - * 解析任务配置 - */ - private LocalCollectionConfig parseTaskConfig(Map configMap) { - try { - if (configMap == null || configMap.isEmpty()) { - return null; - } - return objectMapper.convertValue(configMap, LocalCollectionConfig.class); - } catch (Exception e) { - log.error("解析任务配置失败", e); - return null; - } - } } diff --git 
a/frontend/src/pages/DataCollection/Create/CreateTask.tsx b/frontend/src/pages/DataCollection/Create/CreateTask.tsx index ed208af..8b23b6d 100644 --- a/frontend/src/pages/DataCollection/Create/CreateTask.tsx +++ b/frontend/src/pages/DataCollection/Create/CreateTask.tsx @@ -321,21 +321,15 @@ export default function CollectionTaskCreate() { {selectedTemplate === TemplateType.MYSQL && (
- + - - - @@ -346,22 +340,22 @@ export default function CollectionTaskCreate() { rules={[{ required: true, message: "请输入密码" }]} label="密码" > - + - - - + +