refactor: rework the data collection implementation, remove unused code, and improve the code structure (#20)

This commit is contained in:
hefanli
2025-10-23 21:10:57 +08:00
committed by GitHub
parent d58c2a0ac7
commit cc072bbf90
38 changed files with 705 additions and 971 deletions

View File

@@ -127,35 +127,6 @@
<build>
<plugins>
<!-- OpenAPI Generator Plugin -->
<plugin>
<groupId>org.openapitools</groupId>
<artifactId>openapi-generator-maven-plugin</artifactId>
<version>6.6.0</version>
<executions>
<execution>
<goals>
<goal>generate</goal>
</goals>
<configuration>
<inputSpec>${project.basedir}/../../openapi/specs/data-collection.yaml</inputSpec>
<generatorName>spring</generatorName>
<output>${project.build.directory}/generated-sources/openapi</output>
<apiPackage>com.datamate.collection.interfaces.api</apiPackage>
<modelPackage>com.datamate.collection.interfaces.dto</modelPackage>
<configOptions>
<interfaceOnly>true</interfaceOnly>
<useTags>true</useTags>
<useSpringBoot3>true</useSpringBoot3>
<documentationProvider>springdoc</documentationProvider>
<dateLibrary>java8-localdatetime</dateLibrary>
<java8>true</java8>
</configOptions>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>

View File

@@ -0,0 +1,79 @@
package com.datamate.collection.application;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.datamate.collection.domain.model.entity.CollectionTask;
import com.datamate.collection.domain.model.entity.TaskExecution;
import com.datamate.collection.common.enums.TaskStatus;
import com.datamate.collection.domain.repository.CollectionTaskRepository;
import com.datamate.collection.interfaces.dto.CollectionTaskPagingQuery;
import com.datamate.collection.common.enums.SyncMode;
import com.datamate.common.domain.utils.ChunksSaver;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;
@Slf4j
@Service
@RequiredArgsConstructor
public class CollectionTaskService {
private final TaskExecutionService taskExecutionService;
private final CollectionTaskRepository collectionTaskRepository;
@Transactional
public CollectionTask create(CollectionTask task) {
task.setStatus(TaskStatus.READY);
task.setCreatedAt(LocalDateTime.now());
task.setUpdatedAt(LocalDateTime.now());
collectionTaskRepository.save(task);
executeTaskNow(task);
return task;
}
private void executeTaskNow(CollectionTask task) {
if (Objects.equals(task.getSyncMode(), SyncMode.ONCE)) {
TaskExecution exec = taskExecutionService.createExecution(task);
int timeout = task.getTimeoutSeconds() == null ? 3600 : task.getTimeoutSeconds();
taskExecutionService.runAsync(task, exec.getId(), timeout);
log.info("Triggered DataX execution for task {} at {}, execId={}", task.getId(), LocalDateTime.now(), exec.getId());
}
}
@Transactional
public CollectionTask update(CollectionTask task) {
task.setUpdatedAt(LocalDateTime.now());
collectionTaskRepository.updateById(task);
return task;
}
@Transactional
public void delete(String id) {
CollectionTask task = collectionTaskRepository.getById(id);
if (task != null) {
ChunksSaver.deleteFolder("/dataset/local/" + task.getId());
}
collectionTaskRepository.removeById(id);
}
public CollectionTask get(String id) {
return collectionTaskRepository.getById(id);
}
public IPage<CollectionTask> getTasks(CollectionTaskPagingQuery query) {
LambdaQueryWrapper<CollectionTask> wrapper = new LambdaQueryWrapper<CollectionTask>()
.eq(query.getStatus() != null, CollectionTask::getStatus, query.getStatus())
.like(StringUtils.isNotBlank(query.getName()), CollectionTask::getName, query.getName());
return collectionTaskRepository.page(new Page<>(query.getPage(), query.getSize()), wrapper);
}
public List<CollectionTask> selectActiveTasks() {
return collectionTaskRepository.selectActiveTasks();
}
}
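
The conditional overloads used in getTasks above, eq(boolean, ...) and like(boolean, ...), only append their predicate when the guard is true, so absent filters simply drop out of the generated SQL. A minimal caller sketch; the page/size setters are assumed from PagingQuery (not shown in this diff) and the filter values are hypothetical:

// Hypothetical caller of CollectionTaskService.getTasks; values are illustrative.
CollectionTaskPagingQuery query = new CollectionTaskPagingQuery();
query.setPage(1);                     // assumed setter inherited from PagingQuery
query.setSize(10);
query.setStatus(TaskStatus.RUNNING);  // null would omit the status predicate
query.setName("nfs");                 // blank would omit the LIKE predicate
IPage<CollectionTask> result = collectionTaskService.getTasks(query);
// Roughly: SELECT ... WHERE status = 'RUNNING' AND name LIKE '%nfs%' LIMIT 0,10
log.info("total={}, pages={}", result.getTotal(), result.getPages());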

View File

@@ -0,0 +1,57 @@
package com.datamate.collection.application;
import com.datamate.collection.domain.model.entity.CollectionTask;
import com.datamate.collection.domain.model.entity.TaskExecution;
import com.datamate.collection.common.enums.TaskStatus;
import com.datamate.collection.domain.process.ProcessRunner;
import com.datamate.collection.domain.repository.CollectionTaskRepository;
import com.datamate.collection.domain.repository.TaskExecutionRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
@Slf4j
@Service
@RequiredArgsConstructor
public class TaskExecutionService {
private final ProcessRunner processRunner;
private final TaskExecutionRepository executionRepository;
private final CollectionTaskRepository collectionTaskRepository;
@Transactional
public TaskExecution createExecution(CollectionTask task) {
TaskExecution exec = TaskExecution.initTaskExecution();
exec.setTaskId(task.getId());
exec.setTaskName(task.getName());
executionRepository.save(exec);
collectionTaskRepository.updateLastExecution(task.getId(), exec.getId());
collectionTaskRepository.updateStatus(task.getId(), TaskStatus.RUNNING.name());
return exec;
}
public TaskExecution selectLatestByTaskId(String taskId) {
return executionRepository.selectLatestByTaskId(taskId);
}
@Async
public void runAsync(CollectionTask task, String executionId, int timeoutSeconds) {
try {
int code = processRunner.runJob(task, executionId, timeoutSeconds);
log.info("DataX finished with code {} for execution {}", code, executionId);
// Simplified: a clean exit is treated as success
executionRepository.completeExecution(executionId, TaskStatus.SUCCESS.name(), LocalDateTime.now(),
0, 0L, 0L, 0L, null);
collectionTaskRepository.updateStatus(task.getId(), TaskStatus.SUCCESS.name());
} catch (Exception e) {
log.error("DataX execution failed", e);
executionRepository.completeExecution(executionId, TaskStatus.FAILED.name(), LocalDateTime.now(),
0, 0L, 0L, 0L, e.getMessage());
collectionTaskRepository.updateStatus(task.getId(), TaskStatus.FAILED.name());
}
}
}
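
Note that @Async only takes effect because CollectionTaskService invokes runAsync through the injected TaskExecutionService proxy; a self-invocation within the same bean would run synchronously. A sketch of the async setup this relies on; the commit does not include one, and the pool sizing and class name here are hypothetical:

// Hypothetical async configuration; @Async requires async support to be enabled somewhere.
package com.datamate.collection.infrastructure.config;

import java.util.concurrent.Executor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class AsyncConfig {
    @Bean
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);        // hypothetical sizing
        executor.setMaxPoolSize(4);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("datax-exec-");
        executor.initialize();
        return executor;
    }
}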

View File

@@ -1,85 +0,0 @@
package com.datamate.collection.application.service;
import com.datamate.collection.domain.model.CollectionTask;
import com.datamate.collection.domain.model.TaskExecution;
import com.datamate.collection.domain.model.TaskStatus;
import com.datamate.collection.domain.model.DataxTemplate;
import com.datamate.collection.infrastructure.persistence.mapper.CollectionTaskMapper;
import com.datamate.collection.infrastructure.persistence.mapper.TaskExecutionMapper;
import com.datamate.collection.interfaces.dto.SyncMode;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@Slf4j
@Service
@RequiredArgsConstructor
public class CollectionTaskService {
private final CollectionTaskMapper taskMapper;
private final TaskExecutionMapper executionMapper;
private final DataxExecutionService dataxExecutionService;
@Transactional
public CollectionTask create(CollectionTask task) {
task.setStatus(TaskStatus.READY);
task.setCreatedAt(LocalDateTime.now());
task.setUpdatedAt(LocalDateTime.now());
taskMapper.insert(task);
executeTaskNow(task);
return task;
}
private void executeTaskNow(CollectionTask task) {
if (Objects.equals(task.getSyncMode(), SyncMode.ONCE.getValue())) {
TaskExecution exec = dataxExecutionService.createExecution(task);
int timeout = task.getTimeoutSeconds() == null ? 3600 : task.getTimeoutSeconds();
dataxExecutionService.runAsync(task, exec.getId(), timeout);
log.info("Triggered DataX execution for task {} at {}, execId={}", task.getId(), LocalDateTime.now(), exec.getId());
}
}
@Transactional
public CollectionTask update(CollectionTask task) {
task.setUpdatedAt(LocalDateTime.now());
taskMapper.update(task);
return task;
}
@Transactional
public void delete(String id) { taskMapper.deleteById(id); }
public CollectionTask get(String id) { return taskMapper.selectById(id); }
public List<CollectionTask> list(Integer page, Integer size, String status, String name) {
Map<String, Object> p = new HashMap<>();
p.put("status", status);
p.put("name", name);
if (page != null && size != null) {
p.put("offset", page * size);
p.put("limit", size);
}
return taskMapper.selectAll(p);
}
@Transactional
public TaskExecution startExecution(CollectionTask task) {
return dataxExecutionService.createExecution(task);
}
// ---- Template related merged methods ----
public List<DataxTemplate> listTemplates(String sourceType, String targetType, int page, int size) {
int offset = page * size;
return taskMapper.selectList(sourceType, targetType, offset, size);
}
public int countTemplates(String sourceType, String targetType) {
return taskMapper.countTemplates(sourceType, targetType);
}
}

View File

@@ -1,60 +0,0 @@
package com.datamate.collection.application.service;
import com.datamate.collection.domain.model.CollectionTask;
import com.datamate.collection.domain.model.TaskExecution;
import com.datamate.collection.domain.model.TaskStatus;
import com.datamate.collection.infrastructure.persistence.mapper.CollectionTaskMapper;
import com.datamate.collection.infrastructure.persistence.mapper.TaskExecutionMapper;
import com.datamate.collection.infrastructure.runtime.datax.DataxJobBuilder;
import com.datamate.collection.infrastructure.runtime.datax.DataxProcessRunner;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.nio.file.Path;
import java.time.Duration;
import java.time.LocalDateTime;
@Slf4j
@Service
@RequiredArgsConstructor
public class DataxExecutionService {
private final DataxJobBuilder jobBuilder;
private final DataxProcessRunner processRunner;
private final TaskExecutionMapper executionMapper;
private final CollectionTaskMapper taskMapper;
@Transactional
public TaskExecution createExecution(CollectionTask task) {
TaskExecution exec = TaskExecution.initTaskExecution();
exec.setTaskId(task.getId());
exec.setTaskName(task.getName());
executionMapper.insert(exec);
taskMapper.updateLastExecution(task.getId(), exec.getId());
taskMapper.updateStatus(task.getId(), TaskStatus.RUNNING.name());
return exec;
}
@Async
public void runAsync(CollectionTask task, String executionId, int timeoutSeconds) {
try {
Path job = jobBuilder.buildJobFile(task);
int code = processRunner.runJob(job.toFile(), executionId, Duration.ofSeconds(timeoutSeconds));
log.info("DataX finished with code {} for execution {}", code, executionId);
// Simplified: a clean exit is treated as success
executionMapper.completeExecution(executionId, TaskStatus.SUCCESS.name(), LocalDateTime.now(),
0, 0L, 0L, 0L, null, null);
taskMapper.updateStatus(task.getId(), TaskStatus.SUCCESS.name());
} catch (Exception e) {
log.error("DataX execution failed", e);
executionMapper.completeExecution(executionId, TaskStatus.FAILED.name(), LocalDateTime.now(),
0, 0L, 0L, 0L, e.getMessage(), null);
taskMapper.updateStatus(task.getId(), TaskStatus.FAILED.name());
}
}
}

View File

@@ -1,83 +0,0 @@
package com.datamate.collection.application.service;
import com.datamate.collection.domain.model.CollectionTask;
import com.datamate.collection.domain.model.TaskExecution;
import com.datamate.collection.domain.model.TaskStatus;
import com.datamate.collection.infrastructure.persistence.mapper.CollectionTaskMapper;
import com.datamate.collection.infrastructure.persistence.mapper.TaskExecutionMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Service
@RequiredArgsConstructor
public class TaskExecutionService {
private final TaskExecutionMapper executionMapper;
private final CollectionTaskMapper taskMapper;
public List<TaskExecution> list(String taskId, String status, LocalDateTime startDate,
LocalDateTime endDate, Integer page, Integer size) {
Map<String, Object> p = new HashMap<>();
p.put("taskId", taskId);
p.put("status", status);
p.put("startDate", startDate);
p.put("endDate", endDate);
if (page != null && size != null) {
p.put("offset", page * size);
p.put("limit", size);
}
return executionMapper.selectAll(p);
}
public long count(String taskId, String status, LocalDateTime startDate, LocalDateTime endDate) {
Map<String, Object> p = new HashMap<>();
p.put("taskId", taskId);
p.put("status", status);
p.put("startDate", startDate);
p.put("endDate", endDate);
return executionMapper.count(p);
}
// --- Added convenience methods ---
public TaskExecution get(String id) { return executionMapper.selectById(id); }
public TaskExecution getLatestByTaskId(String taskId) { return executionMapper.selectLatestByTaskId(taskId); }
@Transactional
public void complete(String executionId, boolean success, long successCount, long failedCount,
long dataSizeBytes, String errorMessage, String resultJson) {
LocalDateTime now = LocalDateTime.now();
TaskExecution exec = executionMapper.selectById(executionId);
if (exec == null) { return; }
int duration = (int) Duration.between(exec.getStartedAt(), now).getSeconds();
executionMapper.completeExecution(executionId, success ? TaskStatus.SUCCESS.name() : TaskStatus.FAILED.name(),
now, duration, successCount, failedCount, dataSizeBytes, errorMessage, resultJson);
CollectionTask task = taskMapper.selectById(exec.getTaskId());
if (task != null) {
taskMapper.updateStatus(task.getId(), success ? TaskStatus.SUCCESS.name() : TaskStatus.FAILED.name());
}
}
@Transactional
public void stop(String executionId) {
TaskExecution exec = executionMapper.selectById(executionId);
if (exec == null || exec.getStatus() != TaskStatus.RUNNING) { return; }
LocalDateTime now = LocalDateTime.now();
int duration = (int) Duration.between(exec.getStartedAt(), now).getSeconds();
// Reuse completeExecution to persist STOPPED status and timing info
executionMapper.completeExecution(exec.getId(), TaskStatus.STOPPED.name(), now, duration,
exec.getRecordsSuccess(), exec.getRecordsFailed(), exec.getDataSizeBytes(), null, exec.getResult());
taskMapper.updateStatus(exec.getTaskId(), TaskStatus.STOPPED.name());
}
@Transactional
public void stopLatestByTaskId(String taskId) {
TaskExecution latest = executionMapper.selectLatestByTaskId(taskId);
if (latest != null) { stop(latest.getId()); }
}
}

View File

@@ -0,0 +1,12 @@
package com.datamate.collection.common.enums;
/**
* Sync mode: one-time (ONCE) or scheduled (SCHEDULED)
*/
public enum SyncMode {
/** One-time (ONCE) */
ONCE,
/** Scheduled (SCHEDULED) */
SCHEDULED;
}

View File

@@ -1,7 +1,8 @@
package com.datamate.collection.domain.model;
package com.datamate.collection.common.enums;
/**
* Unified task and execution status enum
* Task and execution status enum:
* - DRAFT: draft
* - READY: ready
* - RUNNING: running
* - SUCCESS: succeeded (replaces the former COMPLETED/SUCCESS)
* - FAILED: failed
* - STOPPED: stopped
*
* @author Data Mate Platform Team
*/

View File

@@ -1,32 +1,36 @@
package com.datamate.collection.domain.model;
package com.datamate.collection.domain.model.entity;
import com.baomidou.mybatisplus.annotation.TableName;
import com.datamate.collection.common.enums.SyncMode;
import com.datamate.collection.common.enums.TaskStatus;
import com.datamate.common.domain.model.base.BaseEntity;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.Map;
@Data
public class CollectionTask {
private String id;
/**
* Data collection task entity, aligned with the t_dc_collection_tasks table
*/
@Getter
@Setter
@TableName(value = "t_dc_collection_tasks", autoResultMap = true)
public class CollectionTask extends BaseEntity<String> {
private String name;
private String description;
private String config; // DataX job JSON, including source and target settings
private TaskStatus status;
private String syncMode; // ONCE / SCHEDULED
private SyncMode syncMode; // ONCE / SCHEDULED
private String scheduleExpression;
private Integer retryCount;
private Integer timeoutSeconds;
private Long maxRecords;
private String sortField;
private String lastExecutionId;
private LocalDateTime createdAt;
private LocalDateTime updatedAt;
private String createdBy;
private String updatedBy;
public void addPath() {
try {

View File

@@ -1,4 +1,4 @@
package com.datamate.collection.domain.model;
package com.datamate.collection.domain.model.entity;
import lombok.Data;
import lombok.EqualsAndHashCode;

View File

@@ -1,13 +1,19 @@
package com.datamate.collection.domain.model;
package com.datamate.collection.domain.model.entity;
import com.baomidou.mybatisplus.annotation.TableName;
import com.datamate.collection.common.enums.TaskStatus;
import com.datamate.common.domain.model.base.BaseEntity;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import java.time.LocalDateTime;
import java.util.UUID;
@Data
public class TaskExecution {
private String id;
@Getter
@Setter
@TableName(value = "t_dc_task_executions", autoResultMap = true)
public class TaskExecution extends BaseEntity<String> {
private String taskId;
private String taskName;
private TaskStatus status;
@@ -25,7 +31,6 @@ public class TaskExecution {
private String dataxJobId;
private String config;
private String result;
private LocalDateTime createdAt;
public static TaskExecution initTaskExecution() {
TaskExecution exec = new TaskExecution();

View File

@@ -0,0 +1,21 @@
package com.datamate.collection.domain.process;
import com.datamate.collection.domain.model.entity.CollectionTask;
/**
* Collection job runner interface
*
* @since 2025/10/23
*/
public interface ProcessRunner {
/**
* Run a collection job.
*
* @param task the collection task
* @param executionId execution ID
* @param timeoutSeconds timeout in seconds
* @return the process exit code
* @throws Exception if execution fails
*/
int runJob(CollectionTask task, String executionId, int timeoutSeconds) throws Exception;
}
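
With DataX hidden behind this interface, the application services can be exercised without spawning a real process. A throwaway test double, for illustration only (not part of this commit):

// Hypothetical in-memory stub for unit tests.
package com.datamate.collection.domain.process;

import com.datamate.collection.domain.model.entity.CollectionTask;

public class NoopProcessRunner implements ProcessRunner {
    @Override
    public int runJob(CollectionTask task, String executionId, int timeoutSeconds) {
        // Pretend the job ran instantly and exited cleanly (exit code 0).
        return 0;
    }
}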

View File

@@ -0,0 +1,19 @@
package com.datamate.collection.domain.repository;
import com.baomidou.mybatisplus.extension.repository.IRepository;
import com.datamate.collection.domain.model.entity.CollectionTask;
import java.util.List;
/**
* Collection task repository
*
* @since 2025/10/23
*/
public interface CollectionTaskRepository extends IRepository<CollectionTask> {
List<CollectionTask> selectActiveTasks();
void updateStatus(String id, String status);
void updateLastExecution(String id, String lastExecutionId);
}

View File

@@ -0,0 +1,19 @@
package com.datamate.collection.domain.repository;
import com.baomidou.mybatisplus.extension.service.IService;
import com.datamate.collection.domain.model.entity.TaskExecution;
import java.time.LocalDateTime;
/**
* TaskExecutionRepository
*
* @since 2025/10/23
*/
public interface TaskExecutionRepository extends IService<TaskExecution> {
TaskExecution selectLatestByTaskId(String taskId);
void completeExecution(String executionId, String status, LocalDateTime completedAt,
Integer recordsProcessed, Long recordsTotal,
Long recordsSuccess, Long recordsFailed, String errorMessage);
}

View File

@@ -0,0 +1,124 @@
package com.datamate.collection.infrastructure.datax;
import com.datamate.collection.domain.model.entity.CollectionTask;
import com.datamate.collection.domain.process.ProcessRunner;
import com.datamate.common.infrastructure.exception.BusinessException;
import com.datamate.common.infrastructure.exception.SystemErrorCode;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.exec.*;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Slf4j
@Component
@RequiredArgsConstructor
public class DataxProcessRunner implements ProcessRunner {
private final DataxProperties props;
@Override
public int runJob(CollectionTask task, String executionId, int timeoutSeconds) throws Exception {
Path job = buildJobFile(task);
return runJob(job.toFile(), executionId, Duration.ofSeconds(timeoutSeconds));
}
private int runJob(File jobFile, String executionId, Duration timeout) throws Exception {
File logFile = new File(props.getLogPath(), String.format("datax-%s.log", executionId));
String python = props.getPythonPath();
String dataxPy = props.getHomePath() + File.separator + "bin" + File.separator + "datax.py";
String cmd = String.format("%s %s %s", python, dataxPy, jobFile.getAbsolutePath());
log.info("Execute DataX: {}", cmd);
CommandLine cl = CommandLine.parse(cmd);
DefaultExecutor executor = getExecutor(timeout, logFile);
return executor.execute(cl);
}
private static DefaultExecutor getExecutor(Duration timeout, File logFile) throws FileNotFoundException {
DefaultExecutor executor = new DefaultExecutor();
// Append process output to the log file
File parent = logFile.getParentFile();
if (!parent.exists()) {
parent.mkdirs();
}
ExecuteStreamHandler streamHandler = new PumpStreamHandler(
new org.apache.commons.io.output.TeeOutputStream(
new java.io.FileOutputStream(logFile, true), System.out),
new org.apache.commons.io.output.TeeOutputStream(
new java.io.FileOutputStream(logFile, true), System.err)
);
executor.setStreamHandler(streamHandler);
ExecuteWatchdog watchdog = new ExecuteWatchdog(timeout.toMillis());
executor.setWatchdog(watchdog);
return executor;
}
private Path buildJobFile(CollectionTask task) throws IOException {
Files.createDirectories(Paths.get(props.getJobConfigPath()));
String fileName = String.format("datax-job-%s.json", task.getId());
Path path = Paths.get(props.getJobConfigPath(), fileName);
// Simplified: use the task's config field directly as the DataX job JSON
try (FileWriter fw = new FileWriter(path.toFile())) {
if (StringUtils.isBlank(task.getConfig())) {
throw BusinessException.of(SystemErrorCode.UNKNOWN_ERROR);
}
String json = getJobConfig(task);
log.info("Job config: {}", json);
fw.write(json);
}
return path;
}
private String getJobConfig(CollectionTask task) {
try {
ObjectMapper objectMapper = new ObjectMapper();
Map<String, Object> parameter = objectMapper.readValue(
task.getConfig(),
new TypeReference<>() {
}
);
Map<String, Object> job = new HashMap<>();
Map<String, Object> content = new HashMap<>();
Map<String, Object> reader = new HashMap<>();
reader.put("name", "nfsreader");
reader.put("parameter", parameter);
content.put("reader", reader);
Map<String, Object> writer = new HashMap<>();
writer.put("name", "nfswriter");
writer.put("parameter", parameter);
content.put("writer", writer);
job.put("content", List.of(content));
Map<String, Object> setting = new HashMap<>();
Map<String, Object> channel = new HashMap<>();
channel.put("channel", 2);
setting.put("speed", channel);
job.put("setting", setting);
Map<String, Object> jobConfig = new HashMap<>();
jobConfig.put("job", job);
return objectMapper.writeValueAsString(jobConfig);
} catch (Exception e) {
log.error("Failed to parse task config", e);
throw new RuntimeException("Failed to parse task config", e);
}
}
}
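
For reference, getJobConfig above wraps the task's config map as the parameter of both an nfsreader and an nfswriter and fixes the channel count at 2. For a hypothetical config of {"path": "/data/in"}, the generated job file would look roughly like this (key order may vary, since HashMap is used):

// Illustrative shape of the generated job file; the path value is hypothetical.
String exampleJob = """
        {"job": {
          "setting": {"speed": {"channel": 2}},
          "content": [{
            "reader": {"name": "nfsreader", "parameter": {"path": "/data/in"}},
            "writer": {"name": "nfswriter", "parameter": {"path": "/data/in"}}
          }]
        }}""";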

View File

@@ -1,4 +1,4 @@
package com.datamate.collection.infrastructure.runtime.datax;
package com.datamate.collection.infrastructure.datax;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

View File

@@ -1,47 +1,15 @@
package com.datamate.collection.infrastructure.persistence.mapper;
import com.datamate.collection.domain.model.CollectionTask;
import com.datamate.collection.domain.model.DataxTemplate;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.datamate.collection.domain.model.entity.CollectionTask;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
import java.util.Map;
@Mapper
public interface CollectionTaskMapper {
int insert(CollectionTask entity);
int update(CollectionTask entity);
int deleteById(@Param("id") String id);
CollectionTask selectById(@Param("id") String id);
CollectionTask selectByName(@Param("name") String name);
List<CollectionTask> selectByStatus(@Param("status") String status);
List<CollectionTask> selectAll(Map<String, Object> params);
public interface CollectionTaskMapper extends BaseMapper<CollectionTask> {
int updateStatus(@Param("id") String id, @Param("status") String status);
int updateLastExecution(@Param("id") String id, @Param("lastExecutionId") String lastExecutionId);
List<CollectionTask> selectActiveTasks();
/**
* Query the template list
*
* @param sourceType source datasource type (optional)
* @param targetType target datasource type (optional)
* @param offset offset
* @param limit row limit
* @return template list
*/
List<DataxTemplate> selectList(@Param("sourceType") String sourceType,
@Param("targetType") String targetType,
@Param("offset") int offset,
@Param("limit") int limit);
/**
* Count templates
*
* @param sourceType source datasource type (optional)
* @param targetType target datasource type (optional)
* @return total number of templates
*/
int countTemplates(@Param("sourceType") String sourceType,
@Param("targetType") String targetType);
}

View File

@@ -1,38 +1,22 @@
package com.datamate.collection.infrastructure.persistence.mapper;
import com.datamate.collection.domain.model.TaskExecution;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.datamate.collection.domain.model.entity.TaskExecution;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
@Mapper
public interface TaskExecutionMapper {
int insert(TaskExecution entity);
int update(TaskExecution entity);
int deleteById(@Param("id") String id);
TaskExecution selectById(@Param("id") String id);
List<TaskExecution> selectByTaskId(@Param("taskId") String taskId, @Param("limit") Integer limit);
List<TaskExecution> selectByStatus(@Param("status") String status);
List<TaskExecution> selectAll(Map<String, Object> params);
long count(Map<String, Object> params);
int updateProgress(@Param("id") String id,
@Param("status") String status,
@Param("progress") Double progress,
@Param("recordsProcessed") Long recordsProcessed,
@Param("throughput") Double throughput);
int completeExecution(@Param("id") String id,
@Param("status") String status,
@Param("completedAt") LocalDateTime completedAt,
@Param("durationSeconds") Integer durationSeconds,
@Param("recordsSuccess") Long recordsSuccess,
@Param("recordsFailed") Long recordsFailed,
@Param("dataSizeBytes") Long dataSizeBytes,
@Param("errorMessage") String errorMessage,
@Param("result") String result);
List<TaskExecution> selectRunningExecutions();
public interface TaskExecutionMapper extends BaseMapper<TaskExecution> {
TaskExecution selectLatestByTaskId(@Param("taskId") String taskId);
int deleteOldExecutions(@Param("beforeDate") LocalDateTime beforeDate);
void completeExecution(@Param("executionId") String executionId,
@Param("status") String status,
@Param("completedAt") LocalDateTime completedAt,
@Param("recordsProcessed") Integer recordsProcessed,
@Param("recordsTotal") Long recordsTotal,
@Param("recordsSuccess") Long recordsSuccess,
@Param("recordsFailed") Long recordsFailed,
@Param("errorMessage") String errorMessage);
}

View File

@@ -0,0 +1,36 @@
package com.datamate.collection.infrastructure.persistence.repository;
import com.baomidou.mybatisplus.extension.repository.CrudRepository;
import com.datamate.collection.domain.model.entity.CollectionTask;
import com.datamate.collection.domain.repository.CollectionTaskRepository;
import com.datamate.collection.infrastructure.persistence.mapper.CollectionTaskMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Repository;
import java.util.List;
/**
* CollectionTaskRepositoryImpl
*
* @since 2025/10/23
*/
@Repository
@RequiredArgsConstructor
public class CollectionTaskRepositoryImpl extends CrudRepository<CollectionTaskMapper, CollectionTask> implements CollectionTaskRepository {
private final CollectionTaskMapper collectionTaskMapper;
@Override
public List<CollectionTask> selectActiveTasks() {
return collectionTaskMapper.selectActiveTasks();
}
@Override
public void updateStatus(String id, String status) {
collectionTaskMapper.updateStatus(id, status);
}
@Override
public void updateLastExecution(String id, String lastExecutionId) {
collectionTaskMapper.updateLastExecution(id, lastExecutionId);
}
}

View File

@@ -0,0 +1,37 @@
package com.datamate.collection.infrastructure.persistence.repository;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.datamate.collection.domain.model.entity.TaskExecution;
import com.datamate.collection.domain.repository.TaskExecutionRepository;
import com.datamate.collection.infrastructure.persistence.mapper.TaskExecutionMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Repository;
import java.time.LocalDateTime;
/**
* TaskExecutionRepositoryImpl
*
* @since 2025/10/23
*/
@Repository
@RequiredArgsConstructor
public class TaskExecutionRepositoryImpl extends ServiceImpl<TaskExecutionMapper, TaskExecution>
implements TaskExecutionRepository {
private final TaskExecutionMapper taskExecutionMapper;
@Override
public TaskExecution selectLatestByTaskId(String taskId) {
return taskExecutionMapper.selectLatestByTaskId(taskId);
}
@Override
public void completeExecution(String executionId, String status, LocalDateTime completedAt,
Integer recordsProcessed, Long recordsTotal,
Long recordsSuccess, Long recordsFailed, String errorMessage) {
taskExecutionMapper.completeExecution(executionId, status, completedAt,
recordsProcessed, recordsTotal,
recordsSuccess, recordsFailed, errorMessage);
}
}

View File

@@ -1,83 +0,0 @@
package com.datamate.collection.infrastructure.runtime.datax;
import com.datamate.collection.domain.model.CollectionTask;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Assembles the DataX job JSON file from the task configuration
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class DataxJobBuilder {
private final DataxProperties props;
public Path buildJobFile(CollectionTask task) throws IOException {
Files.createDirectories(Paths.get(props.getJobConfigPath()));
String fileName = String.format("datax-job-%s.json", task.getId());
Path path = Paths.get(props.getJobConfigPath(), fileName);
// Simplified: use the task's config field directly as the DataX job JSON
try (FileWriter fw = new FileWriter(path.toFile())) {
String json = task.getConfig() == null || task.getConfig().isEmpty() ?
defaultJobJson() : task.getConfig();
if (StringUtils.isNotBlank(task.getConfig())) {
json = getJobConfig(task);
}
log.info("Job config: {}", json);
fw.write(json);
}
return path;
}
private String getJobConfig(CollectionTask task) {
try {
ObjectMapper objectMapper = new ObjectMapper();
Map<String, Object> parameter = objectMapper.readValue(
task.getConfig(),
new TypeReference<>() {}
);
Map<String, Object> job = new HashMap<>();
Map<String, Object> content = new HashMap<>();
Map<String, Object> reader = new HashMap<>();
reader.put("name", "nfsreader");
reader.put("parameter", parameter);
content.put("reader", reader);
Map<String, Object> writer = new HashMap<>();
writer.put("name", "nfswriter");
writer.put("parameter", parameter);
content.put("writer", writer);
job.put("content", List.of(content));
Map<String, Object> setting = new HashMap<>();
Map<String, Object> channel = new HashMap<>();
channel.put("channel", 2);
setting.put("speed", channel);
job.put("setting", setting);
Map<String, Object> jobConfig = new HashMap<>();
jobConfig.put("job", job);
return objectMapper.writeValueAsString(jobConfig);
} catch (Exception e) {
log.error("Failed to parse task config", e);
throw new RuntimeException("Failed to parse task config", e);
}
}
private String defaultJobJson() {
// Provide a minimal runnable empty job; real tasks override it
return "{\n \"job\": {\n \"setting\": {\n \"speed\": {\n \"channel\": 1\n }\n },\n \"content\": []\n }\n}";
}
}

View File

@@ -1,46 +0,0 @@
package com.datamate.collection.infrastructure.runtime.datax;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.exec.*;
import org.springframework.stereotype.Component;
import java.io.File;
import java.time.Duration;
@Slf4j
@Component
@RequiredArgsConstructor
public class DataxProcessRunner {
private final DataxProperties props;
public int runJob(File jobFile, String executionId, Duration timeout) throws Exception {
File logFile = new File(props.getLogPath(), String.format("datax-%s.log", executionId));
String python = props.getPythonPath();
String dataxPy = props.getHomePath() + File.separator + "bin" + File.separator + "datax.py";
String cmd = String.format("%s %s %s", python, dataxPy, jobFile.getAbsolutePath());
log.info("Execute DataX: {}", cmd);
CommandLine cl = CommandLine.parse(cmd);
DefaultExecutor executor = new DefaultExecutor();
// Append process output to the log file
File parent = logFile.getParentFile();
if (!parent.exists()) parent.mkdirs();
ExecuteStreamHandler streamHandler = new PumpStreamHandler(
new org.apache.commons.io.output.TeeOutputStream(
new java.io.FileOutputStream(logFile, true), System.out),
new org.apache.commons.io.output.TeeOutputStream(
new java.io.FileOutputStream(logFile, true), System.err)
);
executor.setStreamHandler(streamHandler);
ExecuteWatchdog watchdog = new ExecuteWatchdog(timeout.toMillis());
executor.setWatchdog(watchdog);
return executor.execute(cl);
}
}

View File

@@ -1,16 +1,18 @@
package com.datamate.collection.interfaces.converter;
import com.datamate.collection.domain.model.CollectionTask;
import com.datamate.collection.domain.model.DataxTemplate;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.datamate.collection.domain.model.entity.CollectionTask;
import com.datamate.collection.interfaces.dto.*;
import com.datamate.common.infrastructure.exception.BusinessException;
import com.datamate.common.infrastructure.exception.SystemErrorCode;
import com.datamate.common.interfaces.PagedResponse;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.Named;
import org.mapstruct.factory.Mappers;
import java.util.List;
import java.util.Map;
@Mapper
@@ -20,9 +22,7 @@ public interface CollectionTaskConverter {
@Mapping(source = "config", target = "config", qualifiedByName = "parseJsonToMap")
CollectionTaskResponse toResponse(CollectionTask task);
CollectionTaskSummary toSummary(CollectionTask task);
DataxTemplateSummary toTemplateSummary(DataxTemplate template);
List<CollectionTaskResponse> toResponse(List<CollectionTask> tasks);
@Mapping(source = "config", target = "config", qualifiedByName = "mapToJsonString")
CollectionTask toCollectionTask(CreateCollectionTaskRequest request);
@@ -30,11 +30,19 @@ public interface CollectionTaskConverter {
@Mapping(source = "config", target = "config", qualifiedByName = "mapToJsonString")
CollectionTask toCollectionTask(UpdateCollectionTaskRequest request);
@Mapping(source = "current", target = "page")
@Mapping(source = "size", target = "size")
@Mapping(source = "total", target = "totalElements")
@Mapping(source = "pages", target = "totalPages")
@Mapping(source = "records", target = "content")
PagedResponse<CollectionTaskResponse> toResponse(IPage<CollectionTask> tasks);
@Named("parseJsonToMap")
default Map<String, Object> parseJsonToMap(String json) {
try {
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.readValue(json, Map.class);
} catch (Exception e) {
throw BusinessException.of(SystemErrorCode.INVALID_PARAMETER);
}

View File

@@ -0,0 +1,25 @@
package com.datamate.collection.interfaces.dto;
import com.datamate.collection.common.enums.TaskStatus;
import com.datamate.common.interfaces.PagingQuery;
import lombok.Getter;
import lombok.Setter;
/**
* Paging query parameters for collection tasks
*
* @since 2025/10/23
*/
@Getter
@Setter
public class CollectionTaskPagingQuery extends PagingQuery {
/**
* Task status
*/
private TaskStatus status;
/**
* Task name
*/
private String name;
}
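
PagingQuery itself is outside this diff; CollectionTaskService.getTasks only relies on its getPage() and getSize() accessors. A minimal reconstruction under that assumption, with guessed field types and defaults:

// Hypothetical sketch of com.datamate.common.interfaces.PagingQuery.
package com.datamate.common.interfaces;

import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
public class PagingQuery {
    /** 1-based page index, as expected by the MyBatis-Plus Page constructor */
    private long page = 1;
    /** rows per page */
    private long size = 20;
}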

View File

@@ -0,0 +1,48 @@
package com.datamate.collection.interfaces.dto;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import com.datamate.collection.common.enums.TaskStatus;
import com.datamate.collection.common.enums.SyncMode;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.springframework.format.annotation.DateTimeFormat;
import jakarta.validation.Valid;
/**
* CollectionTaskResponse
*/
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class CollectionTaskResponse {
private String id;
private String name;
private String description;
@Valid
private Map<String, Object> config = new HashMap<>();
private TaskStatus status;
private SyncMode syncMode;
private String scheduleExpression;
private String lastExecutionId;
@DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
private LocalDateTime createdAt;
@DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
private LocalDateTime updatedAt;
}

View File

@@ -0,0 +1,53 @@
package com.datamate.collection.interfaces.dto;
import com.datamate.collection.common.enums.SyncMode;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.HashMap;
import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import jakarta.validation.Valid;
import jakarta.validation.constraints.*;
import io.swagger.v3.oas.annotations.media.Schema;
/**
* CreateCollectionTaskRequest
*/
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class CreateCollectionTaskRequest {
@NotNull
@Size(min = 1, max = 100)
@Schema(name = "name", description = "任务名称", requiredMode = Schema.RequiredMode.REQUIRED)
@JsonProperty("name")
private String name;
@Size(max = 500)
@Schema(name = "description", description = "任务描述", requiredMode = Schema.RequiredMode.NOT_REQUIRED)
@JsonProperty("description")
private String description;
@Valid
@NotNull
@Schema(name = "config", description = "归集配置,包含源端和目标端配置信息", requiredMode = Schema.RequiredMode.REQUIRED)
@JsonProperty("config")
private Map<String, Object> config = new HashMap<>();
@NotNull
@Valid
@Schema(name = "syncMode", requiredMode = Schema.RequiredMode.REQUIRED)
@JsonProperty("syncMode")
private SyncMode syncMode;
@Schema(name = "scheduleExpression", description = "Cron调度表达式 (syncMode=SCHEDULED 时必填)", requiredMode = Schema.RequiredMode.NOT_REQUIRED)
@JsonProperty("scheduleExpression")
private String scheduleExpression;
}

View File

@@ -0,0 +1,50 @@
package com.datamate.collection.interfaces.dto;
import com.datamate.collection.common.enums.SyncMode;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.HashMap;
import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import jakarta.validation.Valid;
import jakarta.validation.constraints.*;
import io.swagger.v3.oas.annotations.media.Schema;
/**
* UpdateCollectionTaskRequest
*/
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class UpdateCollectionTaskRequest {
@Size(min = 1, max = 100)
@Schema(name = "name", description = "任务名称", requiredMode = Schema.RequiredMode.NOT_REQUIRED)
@JsonProperty("name")
private String name;
@Size(max = 500)
@Schema(name = "description", description = "任务描述", requiredMode = Schema.RequiredMode.NOT_REQUIRED)
@JsonProperty("description")
private String description;
@Valid
@Schema(name = "config", description = "归集配置,包含源端和目标端配置信息", requiredMode = Schema.RequiredMode.NOT_REQUIRED)
@JsonProperty("config")
private Map<String, Object> config = new HashMap<>();
@Valid
@Schema(name = "syncMode", requiredMode = Schema.RequiredMode.NOT_REQUIRED)
@JsonProperty("syncMode")
private SyncMode syncMode;
@Schema(name = "scheduleExpression", description = "Cron调度表达式 (syncMode=SCHEDULED 时必填)", requiredMode = Schema.RequiredMode.NOT_REQUIRED)
@JsonProperty("scheduleExpression")
private String scheduleExpression;
}

View File

@@ -1,38 +1,36 @@
package com.datamate.collection.interfaces.rest;
import com.datamate.collection.application.service.CollectionTaskService;
import com.datamate.collection.domain.model.CollectionTask;
import com.datamate.collection.domain.model.DataxTemplate;
import com.datamate.collection.interfaces.api.CollectionTaskApi;
import com.datamate.collection.application.CollectionTaskService;
import com.datamate.collection.domain.model.entity.CollectionTask;
import com.datamate.collection.interfaces.converter.CollectionTaskConverter;
import com.datamate.collection.interfaces.dto.*;
import com.datamate.common.interfaces.PagedResponse;
import jakarta.validation.Valid;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;
import java.util.*;
import java.util.stream.Collectors;
@Slf4j
@RestController
@RequestMapping("/data-collection/tasks")
@RequiredArgsConstructor
@Validated
public class CollectionTaskController implements CollectionTaskApi {
public class CollectionTaskController {
private final CollectionTaskService taskService;
@Override
public ResponseEntity<CollectionTaskResponse> createTask(CreateCollectionTaskRequest request) {
@PostMapping
public ResponseEntity<CollectionTaskResponse> createTask(@Valid @RequestBody CreateCollectionTaskRequest request) {
CollectionTask task = CollectionTaskConverter.INSTANCE.toCollectionTask(request);
task.setId(UUID.randomUUID().toString());
task.addPath();
return ResponseEntity.ok().body(CollectionTaskConverter.INSTANCE.toResponse(taskService.create(task)));
}
@Override
public ResponseEntity<CollectionTaskResponse> updateTask(String id, UpdateCollectionTaskRequest request) {
@PutMapping("/{id}")
public ResponseEntity<CollectionTaskResponse> updateTask(@PathVariable("id") String id, @Valid @RequestBody UpdateCollectionTaskRequest request) {
if (taskService.get(id) == null) {
return ResponseEntity.notFound().build();
}
@@ -41,43 +39,20 @@ public class CollectionTaskController implements CollectionTaskApi {
return ResponseEntity.ok(CollectionTaskConverter.INSTANCE.toResponse(taskService.update(task)));
}
@Override
public ResponseEntity<Void> deleteTask(String id) {
@DeleteMapping("/{id}")
public ResponseEntity<Void> deleteTask(@PathVariable("id") String id) {
taskService.delete(id);
return ResponseEntity.ok().build();
}
@Override
public ResponseEntity<CollectionTaskResponse> getTaskDetail(String id) {
@GetMapping("/{id}")
public ResponseEntity<CollectionTaskResponse> getTaskDetail(@PathVariable("id") String id) {
CollectionTask task = taskService.get(id);
return task == null ? ResponseEntity.notFound().build() : ResponseEntity.ok(CollectionTaskConverter.INSTANCE.toResponse(task));
}
@Override
public ResponseEntity<PagedCollectionTaskSummary> getTasks(Integer page, Integer size, TaskStatus status, String name) {
var list = taskService.list(page, size, status == null ? null : status.getValue(), name);
PagedCollectionTaskSummary response = new PagedCollectionTaskSummary();
response.setContent(list.stream().map(CollectionTaskConverter.INSTANCE::toSummary).collect(Collectors.toList()));
response.setNumber(page);
response.setSize(size);
response.setTotalElements(list.size()); // Simplified; a real implementation would issue a separate count query
response.setTotalPages(size == null || size == 0 ? 1 : (int) Math.ceil(list.size() * 1.0 / size));
return ResponseEntity.ok(response);
}
@Override
public ResponseEntity<PagedDataxTemplates> templatesGet(String sourceType, String targetType,
Integer page, Integer size) {
int pageNum = page != null ? page : 0;
int pageSize = size != null ? size : 20;
List<DataxTemplate> templates = taskService.listTemplates(sourceType, targetType, pageNum, pageSize);
int totalElements = taskService.countTemplates(sourceType, targetType);
PagedDataxTemplates response = new PagedDataxTemplates();
response.setContent(templates.stream().map(CollectionTaskConverter.INSTANCE::toTemplateSummary).collect(Collectors.toList()));
response.setNumber(pageNum);
response.setSize(pageSize);
response.setTotalElements(totalElements);
response.setTotalPages(pageSize > 0 ? (int) Math.ceil(totalElements * 1.0 / pageSize) : 1);
return ResponseEntity.ok(response);
@GetMapping
public ResponseEntity<PagedResponse<CollectionTaskResponse>> getTasks(@Valid CollectionTaskPagingQuery query) {
return ResponseEntity.ok(CollectionTaskConverter.INSTANCE.toResponse(taskService.getTasks(query)));
}
}
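
With the generated CollectionTaskApi interface removed, the endpoints are declared directly with Spring MVC annotations. A hedged MockMvc sketch of the create endpoint; the payload and the READY assertion are illustrative, and the usual @SpringBootTest plus @AutoConfigureMockMvc setup with static imports from MockMvcRequestBuilders and MockMvcResultMatchers is assumed:

// Hypothetical test snippet; not part of this commit.
mockMvc.perform(post("/data-collection/tasks")
        .contentType(MediaType.APPLICATION_JSON)
        .content("""
                {"name": "nightly-nfs-sync",
                 "config": {"path": "/data/in"},
                 "syncMode": "ONCE"}"""))
        .andExpect(status().isOk())
        .andExpect(jsonPath("$.status").value("READY")); // status set in create()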

View File

@@ -1,101 +0,0 @@
package com.datamate.collection.interfaces.rest;
import com.datamate.collection.application.service.CollectionTaskService;
import com.datamate.collection.application.service.TaskExecutionService;
import com.datamate.collection.domain.model.TaskExecution;
import com.datamate.collection.interfaces.api.TaskExecutionApi;
import com.datamate.collection.interfaces.dto.PagedTaskExecutions;
import com.datamate.collection.interfaces.dto.TaskExecutionDetail;
import com.datamate.collection.interfaces.dto.TaskExecutionResponse;
import com.datamate.collection.interfaces.dto.TaskStatus; // DTO enum
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.RestController;
import java.util.stream.Collectors;
@RestController
@RequiredArgsConstructor
@Validated
public class TaskExecutionController implements TaskExecutionApi {
private final TaskExecutionService executionService;
private final CollectionTaskService taskService;
private TaskExecutionDetail toDetail(TaskExecution e) {
TaskExecutionDetail d = new TaskExecutionDetail();
d.setId(e.getId());
d.setTaskId(e.getTaskId());
d.setTaskName(e.getTaskName());
if (e.getStatus() != null) { d.setStatus(TaskStatus.fromValue(e.getStatus().name())); }
d.setProgress(e.getProgress());
d.setRecordsTotal(e.getRecordsTotal() != null ? e.getRecordsTotal().intValue() : null);
d.setRecordsProcessed(e.getRecordsProcessed() != null ? e.getRecordsProcessed().intValue() : null);
d.setRecordsSuccess(e.getRecordsSuccess() != null ? e.getRecordsSuccess().intValue() : null);
d.setRecordsFailed(e.getRecordsFailed() != null ? e.getRecordsFailed().intValue() : null);
d.setThroughput(e.getThroughput());
d.setDataSizeBytes(e.getDataSizeBytes() != null ? e.getDataSizeBytes().intValue() : null);
d.setStartedAt(e.getStartedAt());
d.setCompletedAt(e.getCompletedAt());
d.setDurationSeconds(e.getDurationSeconds());
d.setErrorMessage(e.getErrorMessage());
return d;
}
// GET /executions/{id}
@Override
public ResponseEntity<TaskExecutionDetail> executionsIdGet(String id) {
var exec = executionService.get(id);
return exec == null ? ResponseEntity.notFound().build() : ResponseEntity.ok(toDetail(exec));
}
// DELETE /executions/{id}
@Override
public ResponseEntity<Void> executionsIdDelete(String id) {
executionService.stop(id); // Idempotent; the service checks the current status internally
return ResponseEntity.noContent().build();
}
// POST /tasks/{id}/execute -> 201
@Override
public ResponseEntity<TaskExecutionResponse> tasksIdExecutePost(String id) {
var task = taskService.get(id);
if (task == null) { return ResponseEntity.notFound().build(); }
var latestExec = executionService.getLatestByTaskId(id);
if (latestExec != null && latestExec.getStatus() == com.datamate.collection.domain.model.TaskStatus.RUNNING) {
TaskExecutionResponse r = new TaskExecutionResponse();
r.setId(latestExec.getId());
r.setTaskId(latestExec.getTaskId());
r.setTaskName(latestExec.getTaskName());
r.setStatus(TaskStatus.fromValue(latestExec.getStatus().name()));
r.setStartedAt(latestExec.getStartedAt());
return ResponseEntity.status(HttpStatus.CREATED).body(r); // return the execution that is already running
}
var exec = taskService.startExecution(task);
TaskExecutionResponse r = new TaskExecutionResponse();
r.setId(exec.getId());
r.setTaskId(exec.getTaskId());
r.setTaskName(exec.getTaskName());
r.setStatus(TaskStatus.fromValue(exec.getStatus().name()));
r.setStartedAt(exec.getStartedAt());
return ResponseEntity.status(HttpStatus.CREATED).body(r);
}
// GET /tasks/{id}/executions -> paged
@Override
public ResponseEntity<PagedTaskExecutions> tasksIdExecutionsGet(String id, Integer page, Integer size) {
if (page == null || page < 0) { page = 0; }
if (size == null || size <= 0) { size = 20; }
var list = executionService.list(id, null, null, null, page, size);
long total = executionService.count(id, null, null, null);
PagedTaskExecutions p = new PagedTaskExecutions();
p.setContent(list.stream().map(this::toDetail).collect(Collectors.toList()));
p.setNumber(page);
p.setSize(size);
p.setTotalElements((int) total);
p.setTotalPages(size == 0 ? 1 : (int) Math.ceil(total * 1.0 / size));
return ResponseEntity.ok(p);
}
}

View File

@@ -1,11 +1,10 @@
package com.datamate.collection.application.scheduler;
package com.datamate.collection.interfaces.scheduler;
import com.datamate.collection.application.service.DataxExecutionService;
import com.datamate.collection.domain.model.CollectionTask;
import com.datamate.collection.domain.model.TaskStatus;
import com.datamate.collection.domain.model.TaskExecution;
import com.datamate.collection.infrastructure.persistence.mapper.CollectionTaskMapper;
import com.datamate.collection.infrastructure.persistence.mapper.TaskExecutionMapper;
import com.datamate.collection.application.CollectionTaskService;
import com.datamate.collection.application.TaskExecutionService;
import com.datamate.collection.common.enums.TaskStatus;
import com.datamate.collection.domain.model.entity.CollectionTask;
import com.datamate.collection.domain.model.entity.TaskExecution;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
@@ -21,14 +20,13 @@ import java.util.List;
@RequiredArgsConstructor
public class TaskSchedulerInitializer {
private final CollectionTaskMapper taskMapper;
private final TaskExecutionMapper executionMapper;
private final DataxExecutionService dataxExecutionService;
private final CollectionTaskService collectionTaskService;
private final TaskExecutionService taskExecutionService;
// Periodically scan active collection tasks and use their cron expressions to decide whether a run is due
@Scheduled(fixedDelayString = "${datamate.data-collection.scheduler.scan-interval-ms:10000}")
public void scanAndTrigger() {
List<CollectionTask> tasks = taskMapper.selectActiveTasks();
List<CollectionTask> tasks = collectionTaskService.selectActiveTasks();
if (tasks == null || tasks.isEmpty()) {
return;
}
@@ -40,7 +38,7 @@ public class TaskSchedulerInitializer {
}
try {
// Skip if the latest execution is still running
TaskExecution latest = executionMapper.selectLatestByTaskId(task.getId());
TaskExecution latest = taskExecutionService.selectLatestByTaskId(task.getId());
if (latest != null && latest.getStatus() == TaskStatus.RUNNING) {
continue;
}
@@ -53,9 +51,9 @@ public class TaskSchedulerInitializer {
if (nextTime != null && !nextTime.isAfter(now)) {
// Due: trigger one execution
TaskExecution exec = dataxExecutionService.createExecution(task);
TaskExecution exec = taskExecutionService.createExecution(task);
int timeout = task.getTimeoutSeconds() == null ? 3600 : task.getTimeoutSeconds();
dataxExecutionService.runAsync(task, exec.getId(), timeout);
taskExecutionService.runAsync(task, exec.getId(), timeout);
log.info("Triggered DataX execution for task {} at {}, execId={}", task.getId(), now, exec.getId());
}
} catch (Exception ex) {
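
The computation of nextTime is elided from this hunk. One way to derive it with Spring's built-in cron parser, as a sketch (assuming spring-context's CronExpression is what the scheduler uses; the helper name is hypothetical):

// Hypothetical helper for the nextTime value used above.
import java.time.LocalDateTime;
import org.springframework.scheduling.support.CronExpression;

class NextFireTime {
    /** Next fire time strictly after 'base', or null if inputs are missing. */
    static LocalDateTime next(String scheduleExpression, LocalDateTime base) {
        if (scheduleExpression == null || base == null) {
            return null;
        }
        return CronExpression.parse(scheduleExpression).next(base);
    }
}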

View File

@@ -1,11 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.datamate.collection.infrastructure.persistence.mapper.CollectionTaskMapper">
<!-- Result Map -->
<resultMap id="CollectionTaskResultMap" type="com.datamate.collection.domain.model.CollectionTask">
<resultMap id="CollectionTaskResultMap" type="com.datamate.collection.domain.model.entity.CollectionTask">
<id property="id" column="id"/>
<result property="name" column="name"/>
<result property="description" column="description"/>
@@ -24,21 +23,6 @@
<result property="updatedBy" column="updated_by"/>
</resultMap>
<!-- 结果映射 (模板) -->
<resultMap id="DataxTemplateResultMap" type="com.datamate.collection.domain.model.DataxTemplate">
<id column="id" property="id" jdbcType="VARCHAR"/>
<result column="name" property="name" jdbcType="VARCHAR"/>
<result column="source_type" property="sourceType" jdbcType="VARCHAR"/>
<result column="target_type" property="targetType" jdbcType="VARCHAR"/>
<result column="template_content" property="templateContent" jdbcType="VARCHAR"/>
<result column="description" property="description" jdbcType="VARCHAR"/>
<result column="version" property="version" jdbcType="VARCHAR"/>
<result column="is_system" property="isSystem" jdbcType="BOOLEAN"/>
<result column="created_at" property="createdAt" jdbcType="TIMESTAMP"/>
<result column="updated_at" property="updatedAt" jdbcType="TIMESTAMP"/>
<result column="created_by" property="createdBy" jdbcType="VARCHAR"/>
</resultMap>
<!-- Base Column List (tasks) -->
<sql id="Base_Column_List">
id,
@@ -47,96 +31,6 @@
last_execution_id, created_at, updated_at, created_by, updated_by
</sql>
<!-- Template Column List -->
<sql id="Template_Column_List">
id, name, source_type, target_type, template_content, description, version, is_system, created_at, updated_at, created_by
</sql>
<!-- Insert -->
<insert id="insert" parameterType="com.datamate.collection.domain.model.CollectionTask">
INSERT INTO t_dc_collection_tasks (id, name, description, config, status, sync_mode,
schedule_expression, retry_count, timeout_seconds, max_records, sort_field,
last_execution_id, created_at, updated_at, created_by, updated_by)
VALUES (#{id}, #{name}, #{description}, #{config}, #{status}, #{syncMode},
#{scheduleExpression}, #{retryCount}, #{timeoutSeconds}, #{maxRecords}, #{sortField},
#{lastExecutionId}, #{createdAt}, #{updatedAt}, #{createdBy}, #{updatedBy})
</insert>
<!-- Update -->
<update id="update" parameterType="com.datamate.collection.domain.model.CollectionTask">
UPDATE t_dc_collection_tasks
SET name = #{name},
description = #{description},
config = #{config},
status = #{status},
sync_mode = #{syncMode},
schedule_expression = #{scheduleExpression},
retry_count = #{retryCount},
timeout_seconds = #{timeoutSeconds},
max_records = #{maxRecords},
sort_field = #{sortField},
last_execution_id = #{lastExecutionId},
updated_at = #{updatedAt},
updated_by = #{updatedBy}
WHERE id = #{id}
</update>
<!-- Delete by ID -->
<delete id="deleteById" parameterType="java.lang.String">
DELETE FROM t_dc_collection_tasks WHERE id = #{id}
</delete>
<!-- Select by ID -->
<select id="selectById" parameterType="java.lang.String" resultMap="CollectionTaskResultMap">
SELECT <include refid="Base_Column_List"/> FROM t_dc_collection_tasks WHERE id = #{id}
</select>
<!-- Select by Name -->
<select id="selectByName" parameterType="java.lang.String" resultMap="CollectionTaskResultMap">
SELECT <include refid="Base_Column_List"/> FROM t_dc_collection_tasks WHERE name = #{name}
</select>
<!-- Select by Status -->
<select id="selectByStatus" parameterType="java.lang.String" resultMap="CollectionTaskResultMap">
SELECT <include refid="Base_Column_List"/> FROM t_dc_collection_tasks WHERE status = #{status} ORDER BY created_at DESC
</select>
<!-- Select All with Pagination -->
<select id="selectAll" resultMap="CollectionTaskResultMap">
SELECT <include refid="Base_Column_List"/> FROM t_dc_collection_tasks
<where>
<if test="status != null and status != ''">
AND status = #{status}
</if>
<if test="name != null and name != ''">
AND name LIKE CONCAT('%', #{name}, '%')
</if>
</where>
ORDER BY created_at DESC
<if test="offset != null and limit != null">
LIMIT #{offset}, #{limit}
</if>
</select>
<!-- Count Total -->
<select id="count" resultType="java.lang.Long">
SELECT COUNT(*) FROM t_dc_collection_tasks
<where>
<if test="status != null and status != ''">
AND status = #{status}
</if>
<if test="name != null and name != ''">
AND name LIKE CONCAT('%', #{name}, '%')
</if>
<if test="sourceDataSourceId != null and sourceDataSourceId != ''">
AND source_datasource_id = #{sourceDataSourceId}
</if>
<if test="targetDataSourceId != null and targetDataSourceId != ''">
AND target_datasource_id = #{targetDataSourceId}
</if>
</where>
</select>
<!-- Update Status -->
<update id="updateStatus">
UPDATE t_dc_collection_tasks SET status = #{status}, updated_at = NOW() WHERE id = #{id}
@@ -154,35 +48,4 @@
AND schedule_expression IS NOT NULL
ORDER BY created_at DESC
</select>
<!-- Select Template List -->
<select id="selectList" resultMap="DataxTemplateResultMap">
SELECT <include refid="Template_Column_List"/> FROM t_dc_datax_templates
<where>
<if test="sourceType != null and sourceType != ''">
AND source_type = #{sourceType}
</if>
<if test="targetType != null and targetType != ''">
AND target_type = #{targetType}
</if>
</where>
ORDER BY is_system DESC, created_at DESC
<if test="limit > 0">
LIMIT #{offset}, #{limit}
</if>
</select>
<!-- Count Templates -->
<select id="countTemplates" resultType="java.lang.Integer">
SELECT COUNT(1) FROM t_dc_datax_templates
<where>
<if test="sourceType != null and sourceType != ''">
AND source_type = #{sourceType}
</if>
<if test="targetType != null and targetType != ''">
AND target_type = #{targetType}
</if>
</where>
</select>
</mapper>
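
Note: the hand-written paging and count statements removed above appear to be superseded by MyBatis-Plus wrappers at the service layer. A minimal sketch of the equivalent query, assuming CollectionTaskRepository extends IService&lt;CollectionTask&gt;; the class, method, and parameter names below are illustrative, not taken from this commit:

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.datamate.collection.common.enums.TaskStatus;
import com.datamate.collection.domain.model.entity.CollectionTask;
import com.datamate.collection.domain.repository.CollectionTaskRepository;
import org.apache.commons.lang3.StringUtils;

class CollectionTaskQuerySketch {

    private final CollectionTaskRepository collectionTaskRepository;

    CollectionTaskQuerySketch(CollectionTaskRepository collectionTaskRepository) {
        this.collectionTaskRepository = collectionTaskRepository;
    }

    IPage<CollectionTask> pageTasks(long pageNo, long pageSize, TaskStatus status, String name) {
        LambdaQueryWrapper<CollectionTask> wrapper = new LambdaQueryWrapper<CollectionTask>()
                // mirrors <if test="status != null and status != ''"> AND status = #{status}
                .eq(status != null, CollectionTask::getStatus, status)
                // mirrors AND name LIKE CONCAT('%', #{name}, '%')
                .like(StringUtils.isNotBlank(name), CollectionTask::getName, name)
                .orderByDesc(CollectionTask::getCreatedAt);
        // page() issues both the LIMIT query and the COUNT(*) that the deleted XML provided
        return collectionTaskRepository.page(new Page<>(pageNo, pageSize), wrapper);
    }
}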

View File

@@ -1,191 +1,28 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.datamate.collection.infrastructure.persistence.mapper.TaskExecutionMapper">
<!-- Result Map -->
<resultMap id="TaskExecutionResultMap" type="com.datamate.collection.domain.model.TaskExecution">
<id property="id" column="id"/>
<result property="taskId" column="task_id"/>
<result property="taskName" column="task_name"/>
<result property="status" column="status" typeHandler="org.apache.ibatis.type.EnumTypeHandler"/>
<result property="progress" column="progress"/>
<result property="recordsTotal" column="records_total"/>
<result property="recordsProcessed" column="records_processed"/>
<result property="recordsSuccess" column="records_success"/>
<result property="recordsFailed" column="records_failed"/>
<result property="throughput" column="throughput"/>
<result property="dataSizeBytes" column="data_size_bytes"/>
<result property="startedAt" column="started_at"/>
<result property="completedAt" column="completed_at"/>
<result property="durationSeconds" column="duration_seconds"/>
<result property="errorMessage" column="error_message"/>
<result property="dataxJobId" column="datax_job_id"/>
<result property="config" column="config"/>
<result property="result" column="result"/>
<result property="createdAt" column="created_at"/>
</resultMap>
<!-- Base Column List -->
<sql id="Base_Column_List">
id, task_id, task_name, status, progress, records_total, records_processed,
records_success, records_failed, throughput, data_size_bytes, started_at,
completed_at, duration_seconds, error_message, datax_job_id, config, result, created_at
</sql>
<!-- Insert -->
<insert id="insert" parameterType="com.datamate.collection.domain.model.TaskExecution">
INSERT INTO t_dc_task_executions (
id, task_id, task_name, status, progress, records_total, records_processed,
records_success, records_failed, throughput, data_size_bytes, started_at,
completed_at, duration_seconds, error_message, datax_job_id, config, result, created_at
) VALUES (
#{id}, #{taskId}, #{taskName}, #{status}, #{progress}, #{recordsTotal}, #{recordsProcessed},
#{recordsSuccess}, #{recordsFailed}, #{throughput}, #{dataSizeBytes}, #{startedAt},
#{completedAt}, #{durationSeconds}, #{errorMessage}, #{dataxJobId}, #{config}, #{result}, #{createdAt}
)
</insert>
<!-- Update -->
<update id="update" parameterType="com.datamate.collection.domain.model.TaskExecution">
UPDATE t_dc_task_executions
SET status = #{status},
progress = #{progress},
records_total = #{recordsTotal},
records_processed = #{recordsProcessed},
records_success = #{recordsSuccess},
records_failed = #{recordsFailed},
throughput = #{throughput},
data_size_bytes = #{dataSizeBytes},
completed_at = #{completedAt},
duration_seconds = #{durationSeconds},
error_message = #{errorMessage},
result = #{result}
WHERE id = #{id}
</update>
<!-- Delete by ID -->
<delete id="deleteById" parameterType="java.lang.String">
DELETE FROM t_dc_task_executions WHERE id = #{id}
</delete>
<!-- Select by ID -->
<select id="selectById" parameterType="java.lang.String" resultMap="TaskExecutionResultMap">
SELECT <include refid="Base_Column_List"/>
FROM t_dc_task_executions
WHERE id = #{id}
</select>
<!-- Select by Task ID -->
<select id="selectByTaskId" resultMap="TaskExecutionResultMap">
SELECT <include refid="Base_Column_List"/>
FROM t_dc_task_executions
WHERE task_id = #{taskId}
ORDER BY started_at DESC
<if test="limit != null">
LIMIT #{limit}
</if>
</select>
<!-- Select by Status -->
<select id="selectByStatus" parameterType="java.lang.String" resultMap="TaskExecutionResultMap">
SELECT <include refid="Base_Column_List"/>
FROM t_dc_task_executions
WHERE status = #{status}
ORDER BY started_at DESC
</select>
<!-- Select All with Pagination -->
<select id="selectAll" resultMap="TaskExecutionResultMap">
SELECT <include refid="Base_Column_List"/>
FROM t_dc_task_executions
<where>
<if test="taskId != null and taskId != ''">
AND task_id = #{taskId}
</if>
<if test="status != null and status != ''">
AND status = #{status}
</if>
<if test="startDate != null">
AND started_at >= #{startDate}
</if>
<if test="endDate != null">
AND started_at &lt;= #{endDate}
</if>
</where>
ORDER BY started_at DESC
<if test="offset != null and limit != null">
LIMIT #{offset}, #{limit}
</if>
</select>
<!-- Count Total -->
<select id="count" resultType="java.lang.Long">
SELECT COUNT(*)
FROM t_dc_task_executions
<where>
<if test="taskId != null and taskId != ''">
AND task_id = #{taskId}
</if>
<if test="status != null and status != ''">
AND status = #{status}
</if>
<if test="startDate != null">
AND started_at >= #{startDate}
</if>
<if test="endDate != null">
AND started_at &lt;= #{endDate}
</if>
</where>
</select>
<!-- Update Status and Progress -->
<update id="updateProgress">
UPDATE t_dc_task_executions
SET status = #{status},
progress = #{progress},
records_processed = #{recordsProcessed},
throughput = #{throughput}
WHERE id = #{id}
</update>
<!-- Complete Execution -->
<update id="completeExecution">
UPDATE t_dc_task_executions
SET status = #{status},
progress = 100.00,
completed_at = #{completedAt},
duration_seconds = #{durationSeconds},
records_success = #{recordsSuccess},
records_failed = #{recordsFailed},
data_size_bytes = #{dataSizeBytes},
error_message = #{errorMessage},
result = #{result}
WHERE id = #{id}
</update>
<!-- Select Running Executions -->
<select id="selectRunningExecutions" resultMap="TaskExecutionResultMap">
SELECT <include refid="Base_Column_List"/>
FROM t_dc_task_executions
WHERE status = 'RUNNING'
ORDER BY started_at ASC
</select>
<!-- Select Latest Execution by Task -->
<select id="selectLatestByTaskId" parameterType="java.lang.String" resultMap="TaskExecutionResultMap">
SELECT <include refid="Base_Column_List"/>
FROM t_dc_task_executions
<select id="selectLatestByTaskId" resultType="com.datamate.collection.domain.model.entity.TaskExecution">
SELECT * FROM t_dc_task_executions
WHERE task_id = #{taskId}
ORDER BY started_at DESC
LIMIT 1
</select>
<!-- Delete Old Executions -->
<delete id="deleteOldExecutions">
DELETE FROM t_dc_task_executions
WHERE started_at &lt; #{beforeDate}
</delete>
<!-- Complete Execution -->
<update id="completeExecution">
UPDATE t_dc_task_executions
SET status = #{status},
completed_at = #{completedAt},
records_processed = #{recordsProcessed},
records_total = #{recordsTotal},
records_success = #{recordsSuccess},
records_failed = #{recordsFailed},
error_message = #{errorMessage},
updated_at = NOW()
WHERE id = #{executionId}
</update>
</mapper>
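
Note: after the trim this mapper keeps only selectLatestByTaskId and completeExecution. The new completeExecution also touches an updated_at column that the old Base_Column_List did not carry, so it presumably relies on a matching schema change elsewhere in this commit. A sketch of the mapper interface the two surviving statements imply; extending MyBatis-Plus BaseMapper&lt;TaskExecution&gt; for the generic CRUD that the deleted XML used to provide is an assumption, and the signatures are inferred, not taken from this commit:

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.datamate.collection.common.enums.TaskStatus;
import com.datamate.collection.domain.model.entity.TaskExecution;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

import java.time.LocalDateTime;

@Mapper
public interface TaskExecutionMapper extends BaseMapper<TaskExecution> {

    // Backed by the selectLatestByTaskId statement above
    TaskExecution selectLatestByTaskId(@Param("taskId") String taskId);

    // Backed by the completeExecution statement above; the WHERE clause
    // binds #{executionId}, so the @Param name must match it, not "id"
    int completeExecution(@Param("executionId") String executionId,
                          @Param("status") TaskStatus status,
                          @Param("completedAt") LocalDateTime completedAt,
                          @Param("recordsProcessed") Long recordsProcessed,
                          @Param("recordsTotal") Long recordsTotal,
                          @Param("recordsSuccess") Long recordsSuccess,
                          @Param("recordsFailed") Long recordsFailed,
                          @Param("errorMessage") String errorMessage);
}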

View File

@@ -2,6 +2,7 @@ package com.datamate.datamanagement.application;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.datamate.common.domain.utils.ChunksSaver;
import com.datamate.datamanagement.interfaces.dto.*;
import com.datamate.common.infrastructure.exception.BusinessAssert;
import com.datamate.common.interfaces.PagedResponse;
@@ -100,8 +101,13 @@ public class DatasetApplicationService {
/**
* Delete a dataset
*/
@Transactional
public void deleteDataset(String datasetId) {
Dataset dataset = datasetRepository.getById(datasetId);
datasetRepository.removeById(datasetId);
if (dataset != null) {
ChunksSaver.deleteFolder(dataset.getPath());
}
}
/**

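Note on ordering: deleteDataset removes the row and then deletes the folder inside the same transaction, so a failure in deleteFolder rolls back the row while any files already removed stay gone. If that matters, one option (not in this commit) is to defer the folder removal until after commit via Spring's standard synchronization API; a sketch under that assumption:

import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;

@Transactional
public void deleteDataset(String datasetId) {
    Dataset dataset = datasetRepository.getById(datasetId);
    datasetRepository.removeById(datasetId);
    if (dataset != null) {
        String path = dataset.getPath();
        // registerSynchronization requires an active transaction,
        // which @Transactional guarantees at this point
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
            @Override
            public void afterCommit() {
                ChunksSaver.deleteFolder(path);
            }
        });
    }
}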
View File

@@ -24,7 +24,6 @@ public interface DatasetFileMapper extends BaseMapper<DatasetFile> {
@Param("status") String status,
RowBounds rowBounds);
int insert(DatasetFile file);
int update(DatasetFile file);
int deleteById(@Param("id") String id);
}

View File

@@ -60,7 +60,7 @@ public class FileService {
boolean isFinish = Objects.equals(preRequest.getUploadedFileNum(), preRequest.getTotalFileNum());
if (isFinish) {
// delete the temporary directory that stores the uploaded chunks
ChunksSaver.deleteFiles(new File(preRequest.getUploadPath(),
ChunksSaver.deleteFolder(new File(preRequest.getUploadPath(),
String.format(ChunksSaver.TEMP_DIR_NAME_FORMAT, preRequest.getId())).getPath());
chunkUploadRequestMapper.deleteById(preRequest.getId());
}

View File

@@ -2,6 +2,8 @@ package com.datamate.common.domain.utils;
import com.datamate.common.domain.model.ChunkUploadPreRequest;
import com.datamate.common.domain.model.ChunkUploadRequest;
import com.datamate.common.infrastructure.exception.BusinessException;
import com.datamate.common.infrastructure.exception.SystemErrorCode;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.springframework.web.multipart.MultipartFile;
@@ -106,29 +108,17 @@ public class ChunksSaver {
*
* @param uploadPath path of the folder to delete
*/
public static void deleteFiles(String uploadPath) {
File dic = new File(uploadPath);
if (!dic.exists()) {
return;
}
File[] files = dic.listFiles();
if (files == null || files.length == 0) {
dic.delete();
public static void deleteFolder(String uploadPath) {
File folder = new File(uploadPath);
if (!folder.exists()) {
log.info("folder {} does not exist", uploadPath);
return;
}
try {
for (File file : files) {
if (file.isDirectory()) {
deleteFiles(file.getPath());
} else {
file.delete();
}
}
if (dic.exists()) {
dic.delete();
}
} catch (SecurityException e) {
log.warn("Fail to delete file", e);
FileUtils.deleteDirectory(folder);
} catch (IOException e) {
throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR);
}
}
}
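
Note: FileUtils.deleteDirectory already recurses, which is what lets the hand-rolled recursion above be dropped. A minimal sketch of how the new deleteFolder contract could be exercised, assuming JUnit 5 on the test classpath (this test class is hypothetical, not part of the commit):

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.nio.file.Files;
import java.nio.file.Path;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertFalse;

class ChunksSaverTest {

    @Test
    void deleteFolderRemovesNestedContent(@TempDir Path tmp) throws Exception {
        // build a small nested tree: <tmp>/upload/sub/chunk.bin
        Path sub = Files.createDirectories(tmp.resolve("upload").resolve("sub"));
        Files.write(sub.resolve("chunk.bin"), new byte[] {1, 2, 3});

        ChunksSaver.deleteFolder(tmp.resolve("upload").toString());

        assertFalse(Files.exists(tmp.resolve("upload")));
    }

    @Test
    void deleteFolderIgnoresMissingPath(@TempDir Path tmp) {
        // a non-existent path is logged and skipped, not treated as an error
        assertDoesNotThrow(() -> ChunksSaver.deleteFolder(tmp.resolve("missing").toString()));
    }
}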