feature: collect MySQL databases into CSV files (#76)

* fix: ratio tasks need to be able to jump to the target dataset

* feature: add a ratio task detail API

* fix: remove the non-existent ratio detail page

* fix: use the real logic to display tags

* fix: drop the extra "-" from the parameter default value

* fix: fix ratio-task-related operations

* fix: remove unneeded log output and imports

* feature: also expose OBS and MySQL collection when creating a data collection task

* refactor: refactor the data collection code

* refactor: refactor the data collection code

* feature: implement collecting MySQL data into a CSV file
Authored by Vincent on 2025-11-12 17:05:31 +08:00, committed by GitHub
parent b8d7aca8b7
commit 2b09c7dfd1
8 changed files with 146 additions and 54 deletions

View File

@@ -1,5 +1,6 @@
 package com.datamate.collection.application;
+import com.datamate.collection.common.enums.TemplateType;
 import com.datamate.collection.domain.model.entity.CollectionTask;
 import com.datamate.collection.domain.model.entity.TaskExecution;
 import com.datamate.collection.common.enums.TaskStatus;
@@ -9,6 +10,7 @@ import com.datamate.collection.domain.repository.TaskExecutionRepository;
 import com.datamate.datamanagement.application.DatasetApplicationService;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
@@ -50,7 +52,9 @@ public class TaskExecutionService {
             executionRepository.completeExecution(executionId, TaskStatus.SUCCESS.name(), LocalDateTime.now(),
                     0, 0L, 0L, 0L, null);
             collectionTaskRepository.updateStatus(task.getId(), TaskStatus.SUCCESS.name());
+            if (StringUtils.isNotBlank(datasetId)) {
                 datasetApplicationService.processDataSourceAsync(datasetId, task.getId());
+            }
         } catch (Exception e) {
             log.error("DataX execution failed", e);
             executionRepository.completeExecution(executionId, TaskStatus.FAILED.name(), LocalDateTime.now(),

View File

@@ -1,8 +1,10 @@
+// java
 package com.datamate.collection.infrastructure.datax;
 import com.datamate.collection.common.enums.TemplateType;
 import com.datamate.collection.domain.model.entity.CollectionTask;
 import com.datamate.collection.domain.process.ProcessRunner;
+import com.datamate.collection.infrastructure.datax.config.MysqlConfig;
 import com.datamate.collection.infrastructure.datax.config.NasConfig;
 import com.datamate.common.infrastructure.exception.BusinessException;
 import com.datamate.common.infrastructure.exception.SystemErrorCode;
@@ -15,10 +17,10 @@ import org.apache.commons.lang3.StringUtils;
 import org.springframework.stereotype.Component;
 import java.io.*;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
+import java.nio.file.*;
 import java.time.Duration;
+import java.util.*;
+import java.util.regex.Pattern;
 @Slf4j
 @Component
@@ -30,7 +32,10 @@ public class DataxProcessRunner implements ProcessRunner {
     @Override
     public int runJob(CollectionTask task, String executionId, int timeoutSeconds) throws Exception {
         Path job = buildJobFile(task);
-        return runJob(job.toFile(), executionId, Duration.ofSeconds(timeoutSeconds));
+        int code = runJob(job.toFile(), executionId, Duration.ofSeconds(timeoutSeconds));
+        // Post-process after the job finishes (MYSQL type only)
+        postProcess(task);
+        return code;
     }
     private int runJob(File jobFile, String executionId, Duration timeout) throws Exception {
@@ -90,11 +95,12 @@ public class DataxProcessRunner implements ProcessRunner {
         switch (templateType) {
             case NAS:
                 // NAS-specific handling
-                // Remove the templateType field
                 NasConfig nasConfig = objectMapper.readValue(task.getConfig(), NasConfig.class);
                 return nasConfig.toJobConfig(objectMapper, task);
             case OBS:
             case MYSQL:
+                MysqlConfig mysqlConfig = objectMapper.readValue(task.getConfig(), MysqlConfig.class);
+                return mysqlConfig.toJobConfig(objectMapper, task);
             default:
                 throw BusinessException.of(SystemErrorCode.UNKNOWN_ERROR, "Unsupported template type: " + templateType);
         }
@@ -103,4 +109,35 @@ public class DataxProcessRunner implements ProcessRunner {
             throw new RuntimeException("Failed to parse task config", e);
         }
     }
+    private void postProcess(CollectionTask task) throws IOException {
+        if (task.getTaskType() != TemplateType.MYSQL) {
+            return;
+        }
+        String targetPath = task.getTargetPath();
+        // Rename every file under targetPath that does not end with .csv so that it does
+        Path dir = Paths.get(targetPath);
+        if (!Files.exists(dir) || !Files.isDirectory(dir)) {
+            log.info("Target path {} does not exist or is not a directory for task {}, skip post processing.", targetPath, task.getId());
+            return;
+        }
+        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
+            for (Path path : stream) {
+                if (!Files.isRegularFile(path)) continue;
+                String name = path.getFileName().toString();
+                if (name.toLowerCase().endsWith(".csv")) continue;
+                Path target = dir.resolve(name + ".csv");
+                try {
+                    Files.move(path, target, StandardCopyOption.REPLACE_EXISTING);
+                    log.info("Renamed file for task {}: {} -> {}", task.getId(), name, target.getFileName().toString());
+                } catch (IOException ex) {
+                    log.warn("Failed to rename file {} for task {}: {}", path, task.getId(), ex.getMessage(), ex);
+                }
+            }
+        } catch (IOException ioe) {
+            log.warn("Error scanning target directory {} for task {}: {}", targetPath, task.getId(), ioe.getMessage(), ioe);
+        }
+    }
 }
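A note on the rename step above: DataX's txtfilewriter appends a random suffix to the configured fileName and writes no file extension, so (illustrative names, assuming the default txtfilewriter naming) postProcess turns output such as

    collectionResult__3f9a1c2d  ->  collectionResult__3f9a1c2d.csv

so that the collected result is recognized as a CSV file.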

View File

@@ -0,0 +1,73 @@
+package com.datamate.collection.infrastructure.datax.config;
+import com.datamate.collection.domain.model.entity.CollectionTask;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.commons.collections4.CollectionUtils;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+@Getter
+@Setter
+public class MysqlConfig {
+    private String jdbcUrl;
+    private String username;
+    private String password;
+    private String querySql;
+    private List<String> headers;
+    /**
+     * Build the DataX job JSON string from this MYSQL config.
+     */
+    public String toJobConfig(ObjectMapper objectMapper, CollectionTask task) throws Exception {
+        Map<String, Object> mysqlParameter = new HashMap<>();
+        Map<String, Object> connection = new HashMap<>();
+        if (username != null) mysqlParameter.put("username", username);
+        if (password != null) mysqlParameter.put("password", password);
+        if (jdbcUrl != null) connection.put("jdbcUrl", Collections.singletonList(jdbcUrl));
+        if (querySql != null) connection.put("querySql", Collections.singletonList(querySql));
+        mysqlParameter.put("connection", Collections.singletonList(connection));
+        Map<String, Object> job = new HashMap<>();
+        Map<String, Object> content = new HashMap<>();
+        Map<String, Object> reader = new HashMap<>();
+        reader.put("name", "mysqlreader");
+        reader.put("parameter", mysqlParameter);
+        content.put("reader", reader);
+        Map<String, Object> writer = new HashMap<>();
+        Map<String, Object> writerParameter = new HashMap<>();
+        writer.put("name", "txtfilewriter");
+        if (CollectionUtils.isNotEmpty(headers)) {
+            writerParameter.put("header", headers);
+        }
+        writerParameter.put("path", task.getTargetPath());
+        writerParameter.put("fileName", "collectionResult");
+        writerParameter.put("writeMode", "truncate");
+        writerParameter.put("dateFormat", "yyyy-MM-dd HH:mm:ss");
+        writerParameter.put("fileFormat", "csv");
+        writerParameter.put("encoding", "UTF-8");
+        writerParameter.put("fieldDelimiter", ",");
+        writer.put("parameter", writerParameter);
+        content.put("writer", writer);
+        job.put("content", List.of(content));
+        Map<String, Object> setting = new HashMap<>();
+        Map<String, Object> channel = new HashMap<>();
+        channel.put("channel", 1);
+        setting.put("speed", channel);
+        job.put("setting", setting);
+        Map<String, Object> jobConfig = new HashMap<>();
+        jobConfig.put("job", job);
+        return objectMapper.writeValueAsString(jobConfig);
+    }
+}
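For reference, toJobConfig above assembles a standard DataX mysqlreader-to-txtfilewriter job. With illustrative values (the jdbcUrl, credentials, query, headers, and path below are placeholders, not taken from this commit), the serialized job config looks roughly like:

    {
      "job": {
        "setting": { "speed": { "channel": 1 } },
        "content": [{
          "reader": {
            "name": "mysqlreader",
            "parameter": {
              "username": "root",
              "password": "******",
              "connection": [{
                "jdbcUrl": ["jdbc:mysql://localhost:3306/mydb"],
                "querySql": ["select id, name from my_table"]
              }]
            }
          },
          "writer": {
            "name": "txtfilewriter",
            "parameter": {
              "header": ["id", "name"],
              "path": "/data/collection/task-target-path",
              "fileName": "collectionResult",
              "writeMode": "truncate",
              "dateFormat": "yyyy-MM-dd HH:mm:ss",
              "fileFormat": "csv",
              "encoding": "UTF-8",
              "fieldDelimiter": ","
            }
          }
        }]
      }
    }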

View File

@@ -41,8 +41,7 @@ public interface CollectionTaskConverter {
     default Map<String, Object> parseJsonToMap(String json) {
         try {
             ObjectMapper objectMapper = new ObjectMapper();
-            return
-                objectMapper.readValue(json, Map.class);
+            return objectMapper.readValue(json, Map.class);
         } catch (Exception e) {
             throw BusinessException.of(SystemErrorCode.INVALID_PARAMETER);
         }

View File

@@ -287,19 +287,4 @@ public class DatasetApplicationService {
log.info("获取到归集任务详情: {}", taskDetail); log.info("获取到归集任务详情: {}", taskDetail);
return Collections.singletonList(taskDetail.getTargetPath()); return Collections.singletonList(taskDetail.getTargetPath());
} }
/**
* 解析任务配置
*/
private LocalCollectionConfig parseTaskConfig(Map<String, Object> configMap) {
try {
if (configMap == null || configMap.isEmpty()) {
return null;
}
return objectMapper.convertValue(configMap, LocalCollectionConfig.class);
} catch (Exception e) {
log.error("解析任务配置失败", e);
return null;
}
}
} }

View File

@@ -321,21 +321,15 @@ export default function CollectionTaskCreate() {
             {selectedTemplate === TemplateType.MYSQL && (
               <div className="grid grid-cols-2 gap-3 px-2 bg-blue-50 rounded">
                 <Form.Item
-                  name={["config", "host"]}
-                  rules={[{ required: true, message: "请输入MYSQL主机名" }]}
-                  label="MYSQL主机名"
+                  name={["config", "jdbcUrl"]}
+                  rules={[{ required: true, message: "请输入数据库链接" }]}
+                  label="数据库链接"
+                  className="col-span-2"
                 >
-                  <Input placeholder="192.168.1.100" />
+                  <Input placeholder="jdbc:mysql://localhost:3306/mysql?useUnicode=true&characterEncoding=utf8" />
                 </Form.Item>
                 <Form.Item
-                  name={["config", "port"]}
-                  rules={[{ required: true, message: "请输入端口号" }]}
-                  label="端口号"
-                >
-                  <Input placeholder="3306" />
-                </Form.Item>
-                <Form.Item
-                  name={["config", "user"]}
+                  name={["config", "username"]}
                   rules={[{ required: true, message: "请输入用户名" }]}
                   label="用户名"
                 >
@@ -346,22 +340,22 @@ export default function CollectionTaskCreate() {
rules={[{ required: true, message: "请输入密码" }]} rules={[{ required: true, message: "请输入密码" }]}
label="密码" label="密码"
> >
<Input placeholder="" /> <Input type="password" className="h-8 text-xs" placeholder="Secret Key" />
</Form.Item> </Form.Item>
<Form.Item <Form.Item
name={["config", "schema"]} name={["config", "querySql"]}
rules={[{ required: true, message: "请输入数据库" }]}
label="数据库"
>
<Input placeholder="public" />
</Form.Item>
<Form.Item
name={["config", "sql"]}
rules={[{ required: true, message: "请输入查询语句" }]} rules={[{ required: true, message: "请输入查询语句" }]}
label="查询语句" label="查询语句"
> >
<Input placeholder="select * from your_table" /> <Input placeholder="select * from your_table" />
</Form.Item> </Form.Item>
<Form.Item
name={["config", "headers"]}
label="列名"
className="col-span-2"
>
<Select placeholder="请输入列名" mode="tags" />
</Form.Item>
</div> </div>
)} )}
</> </>
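For reference, the MYSQL form above submits a config object whose fields line up with MysqlConfig on the backend; an illustrative payload (all values are placeholders) would be:

    {
      "jdbcUrl": "jdbc:mysql://localhost:3306/mysql?useUnicode=true&characterEncoding=utf8",
      "username": "root",
      "password": "******",
      "querySql": "select * from your_table",
      "headers": ["id", "name", "created_at"]
    }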

View File

@@ -315,13 +315,13 @@
 <!--        </includes>-->
 <!--        <outputDirectory>datax</outputDirectory>-->
 <!--    </fileSet>-->
-<!--    <fileSet>-->
-<!--        <directory>txtfilewriter/target/datax/</directory>-->
-<!--        <includes>-->
-<!--            <include>**/*.*</include>-->
-<!--        </includes>-->
-<!--        <outputDirectory>datax</outputDirectory>-->
-<!--    </fileSet>-->
+    <fileSet>
+        <directory>txtfilewriter/target/datax/</directory>
+        <includes>
+            <include>**/*.*</include>
+        </includes>
+        <outputDirectory>datax</outputDirectory>
+    </fileSet>
 <!--    <fileSet>-->
 <!--        <directory>ftpwriter/target/datax/</directory>-->
 <!--        <includes>-->

View File

@@ -111,7 +111,7 @@
 <!--        <module>kuduwriter</module>-->
 <!--        <module>ftpwriter</module>-->
 <!--        <module>hdfswriter</module>-->
-<!--        <module>txtfilewriter</module>-->
+        <module>txtfilewriter</module>
 <!--        <module>streamwriter</module>-->
 <!--        <module>elasticsearchwriter</module>-->