refactor: refactor the data collection code (#75)

* fix: ratio tasks should be able to jump to the target dataset

* feature: add a ratio task detail API

* fix: remove the nonexistent ratio detail page

* fix: use the real logic to display tags

* fix: remove the extra '-' from parameter default values

* fix: fix operations related to ratio tasks

* fix: remove unneeded log output and imports

* feature: also expose OBS and MySQL collection when creating a data collection task

* refactor: refactor the data collection code

* refactor: refactor the data collection code
Vincent
2025-11-12 09:34:50 +08:00
committed by GitHub
parent aa01f52535
commit b8d7aca8b7
14 changed files with 247 additions and 110 deletions

View File

@@ -5,12 +5,9 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.datamate.collection.domain.model.entity.CollectionTask;
import com.datamate.collection.domain.model.entity.TaskExecution;
import com.datamate.collection.common.enums.TaskStatus;
import com.datamate.collection.domain.repository.CollectionTaskRepository;
import com.datamate.collection.common.enums.SyncMode;
import com.datamate.common.domain.utils.ChunksSaver;
import com.datamate.datamanagement.application.DatasetApplicationService;
import com.datamate.datamanagement.domain.model.dataset.Dataset;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@@ -25,7 +22,6 @@ import java.util.Objects;
@RequiredArgsConstructor
public class CollectionTaskService {
private final TaskExecutionService taskExecutionService;
private final DatasetApplicationService datasetApplicationService;
private final CollectionTaskRepository collectionTaskRepository;
@Transactional

View File

@@ -7,6 +7,6 @@ public enum SyncMode {
/** One-time (ONCE) */
ONCE,
/** Scheduled (SCHEDULED) */
SCHEDULED;
SCHEDULED
}

View File

@@ -0,0 +1,11 @@
package com.datamate.collection.common.enums;
/**
 * Template type enumeration.
 */
public enum TemplateType {
NAS,
OBS,
MYSQL
}

View File

@@ -3,6 +3,7 @@ package com.datamate.collection.domain.model.entity;
import com.baomidou.mybatisplus.annotation.TableName;
import com.datamate.collection.common.enums.SyncMode;
import com.datamate.collection.common.enums.TaskStatus;
import com.datamate.collection.common.enums.TemplateType;
import com.datamate.common.domain.model.base.BaseEntity;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -24,6 +25,8 @@ import java.util.UUID;
public class CollectionTask extends BaseEntity<String> {
private String name;
private String description;
private TemplateType taskType; // task type
private String targetPath; // target storage path
private String config; // DataX JSON config, containing source and target configuration
private TaskStatus status;
private SyncMode syncMode; // ONCE / SCHEDULED
@@ -51,7 +54,7 @@ public class CollectionTask extends BaseEntity<String> {
public void initCreateParam() {
this.id = UUID.randomUUID().toString();
this.addPath();
this.targetPath = "/dataset/local/" + id;
this.status = TaskStatus.READY;
this.createdAt = LocalDateTime.now();
this.updatedAt = LocalDateTime.now();

View File

@@ -1,28 +1,24 @@
package com.datamate.collection.infrastructure.datax;
import com.datamate.collection.common.enums.TemplateType;
import com.datamate.collection.domain.model.entity.CollectionTask;
import com.datamate.collection.domain.process.ProcessRunner;
import com.datamate.collection.infrastructure.datax.config.NasConfig;
import com.datamate.common.infrastructure.exception.BusinessException;
import com.datamate.common.infrastructure.exception.SystemErrorCode;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.exec.*;
import org.apache.commons.io.output.TeeOutputStream;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.IOException;
import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Slf4j
@Component
@@ -61,10 +57,8 @@ public class DataxProcessRunner implements ProcessRunner {
}
ExecuteStreamHandler streamHandler = new PumpStreamHandler(
new org.apache.commons.io.output.TeeOutputStream(
new java.io.FileOutputStream(logFile, true), System.out),
new org.apache.commons.io.output.TeeOutputStream(
new java.io.FileOutputStream(logFile, true), System.err)
new TeeOutputStream(new FileOutputStream(logFile, true), System.out),
new TeeOutputStream(new FileOutputStream(logFile, true), System.err)
);
executor.setStreamHandler(streamHandler);
@@ -92,30 +86,18 @@ public class DataxProcessRunner implements ProcessRunner {
private String getJobConfig(CollectionTask task) {
try {
ObjectMapper objectMapper = new ObjectMapper();
Map<String, Object> parameter = objectMapper.readValue(
task.getConfig(),
new TypeReference<>() {
}
);
Map<String, Object> job = new HashMap<>();
Map<String, Object> content = new HashMap<>();
Map<String, Object> reader = new HashMap<>();
reader.put("name", "nfsreader");
reader.put("parameter", parameter);
content.put("reader", reader);
Map<String, Object> writer = new HashMap<>();
writer.put("name", "nfswriter");
writer.put("parameter", parameter);
content.put("writer", writer);
job.put("content", List.of(content));
Map<String, Object> setting = new HashMap<>();
Map<String, Object> channel = new HashMap<>();
channel.put("channel", 2);
setting.put("speed", channel);
job.put("setting", setting);
Map<String, Object> jobConfig = new HashMap<>();
jobConfig.put("job", job);
return objectMapper.writeValueAsString(jobConfig);
TemplateType templateType = task.getTaskType();
switch (templateType) {
case NAS:
// NAS-specific handling
// remove the templateType field
NasConfig nasConfig = objectMapper.readValue(task.getConfig(), NasConfig.class);
return nasConfig.toJobConfig(objectMapper, task);
case OBS:
case MYSQL:
default:
throw BusinessException.of(SystemErrorCode.UNKNOWN_ERROR, "Unsupported template type: " + templateType);
}
} catch (Exception e) {
log.error("Failed to parse task config", e);
throw new RuntimeException("Failed to parse task config", e);

View File

@@ -0,0 +1,4 @@
package com.datamate.collection.infrastructure.datax.config;
public interface BaseConfig {
}
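
BaseConfig is introduced here as an empty marker interface for DataX template configs; only NasConfig implements it in this commit, and the switch in DataxProcessRunner still rejects OBS and MYSQL. As a hypothetical sketch (not part of this commit; the field names and the "obsreader" plugin name are assumptions for illustration), an OBS template could slot in the same way NasConfig does:

// Hypothetical sketch only — ObsConfig does not exist in this commit.
// It mirrors NasConfig: deserialize task.getConfig() into typed fields,
// then assemble the DataX job JSON for the matching reader/writer plugins.
package com.datamate.collection.infrastructure.datax.config;

import com.datamate.collection.domain.model.entity.CollectionTask;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter;
import lombok.Setter;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Getter
@Setter
public class ObsConfig implements BaseConfig {
    private String endpoint;      // assumed field: OBS endpoint
    private String bucket;        // assumed field: source bucket
    private List<String> objects; // assumed field: object keys to copy

    public String toJobConfig(ObjectMapper objectMapper, CollectionTask task) throws Exception {
        Map<String, Object> parameter = new HashMap<>();
        if (endpoint != null) parameter.put("endpoint", endpoint);
        if (bucket != null) parameter.put("bucket", bucket);
        if (objects != null) parameter.put("objects", objects);
        parameter.put("destPath", task.getTargetPath());

        Map<String, Object> reader = new HashMap<>();
        reader.put("name", "obsreader"); // assumed plugin name
        reader.put("parameter", parameter);
        Map<String, Object> writer = new HashMap<>();
        writer.put("name", "nfswriter"); // writes into the local target path, as in NasConfig
        writer.put("parameter", parameter);

        Map<String, Object> content = new HashMap<>();
        content.put("reader", reader);
        content.put("writer", writer);

        Map<String, Object> job = new HashMap<>();
        job.put("content", List.of(content));
        job.put("setting", Map.of("speed", Map.of("channel", 2)));
        return objectMapper.writeValueAsString(Map.of("job", job));
    }
}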

View File

@@ -0,0 +1,54 @@
package com.datamate.collection.infrastructure.datax.config;
import com.datamate.collection.domain.model.entity.CollectionTask;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter;
import lombok.Setter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Getter
@Setter
public class NasConfig implements BaseConfig {
private String ip;
private String path;
private List<String> files;
/**
 * Builds the DataX job JSON string from the current NAS configuration.
 */
public String toJobConfig(ObjectMapper objectMapper, CollectionTask task) throws Exception {
Map<String, Object> parameter = new HashMap<>();
if (ip != null) parameter.put("ip", ip);
if (path != null) parameter.put("path", path);
if (files != null) parameter.put("files", files);
parameter.put("destPath", task.getTargetPath());
Map<String, Object> job = new HashMap<>();
Map<String, Object> content = new HashMap<>();
Map<String, Object> reader = new HashMap<>();
reader.put("name", "nfsreader");
reader.put("parameter", parameter);
content.put("reader", reader);
Map<String, Object> writer = new HashMap<>();
writer.put("name", "nfswriter");
writer.put("parameter", parameter);
content.put("writer", writer);
job.put("content", List.of(content));
Map<String, Object> setting = new HashMap<>();
Map<String, Object> channel = new HashMap<>();
channel.put("channel", 2);
setting.put("speed", channel);
job.put("setting", setting);
Map<String, Object> jobConfig = new HashMap<>();
jobConfig.put("job", job);
return objectMapper.writeValueAsString(jobConfig);
}
}
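
For reference, with an illustrative NAS source (placeholder values; "<taskId>" stands for the UUID assigned in CollectionTask.initCreateParam), toJobConfig emits a DataX job of this shape. Note that the reader and writer share the same parameter map:

{
  "job": {
    "content": [{
      "reader": {
        "name": "nfsreader",
        "parameter": {
          "ip": "192.168.1.10",
          "path": "/export/share",
          "files": ["a.csv", "b.csv"],
          "destPath": "/dataset/local/<taskId>"
        }
      },
      "writer": {
        "name": "nfswriter",
        "parameter": {
          "ip": "192.168.1.10",
          "path": "/export/share",
          "files": ["a.csv", "b.csv"],
          "destPath": "/dataset/local/<taskId>"
        }
      }
    }],
    "setting": { "speed": { "channel": 2 } }
  }
}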

View File

@@ -29,7 +29,8 @@ public class CollectionTaskResponse {
private String description;
@Valid
private String targetPath;
private Map<String, Object> config = new HashMap<>();
private TaskStatus status;

View File

@@ -1,6 +1,7 @@
package com.datamate.collection.interfaces.dto;
import com.datamate.collection.common.enums.SyncMode;
import com.datamate.collection.common.enums.TemplateType;
import com.datamate.datamanagement.interfaces.dto.CreateDatasetRequest;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -35,6 +36,11 @@ public class CreateCollectionTaskRequest {
@JsonProperty("description")
private String description;
@NotNull
@Schema(name = "taskType", description = "任务类型", requiredMode = Schema.RequiredMode.REQUIRED)
@JsonProperty("taskType")
private TemplateType taskType;
@Valid
@NotNull
@Schema(name = "config", description = "归集配置,包含源端和目标端配置信息", requiredMode = Schema.RequiredMode.REQUIRED)

View File

@@ -285,12 +285,7 @@ public class DatasetApplicationService {
return Collections.emptyList();
}
log.info("获取到归集任务详情: {}", taskDetail);
LocalCollectionConfig config = parseTaskConfig(taskDetail.getConfig());
if (config == null) {
log.warn("解析任务配置失败,任务ID: {}", dataSourceId);
return Collections.emptyList();
}
return config.getFilePaths();
return Collections.singletonList(taskDetail.getTargetPath());
}
/**

View File

@@ -6,10 +6,8 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -50,7 +48,6 @@ public class DatasetFile {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(tags, new TypeReference<List<FileTag>>() {});
} catch (Exception e) {
log.error(e.getMessage(), e);
return Collections.emptyList();
}
}

View File

@@ -13,6 +13,7 @@ public class CollectionTaskDetailResponse {
private String id;
private String name;
private String description;
private String targetPath;
private Map<String, Object> config;
private String status;
private String syncMode;