Merge branch 'ModelEngine-Group:main' into main

Author: chenghh-9609, 2025-10-24 09:41:06 +08:00, committed by GitHub

99 changed files with 3861 additions and 1955 deletions


@@ -18,7 +18,7 @@ create-namespace:
 install-%:
 ifeq ($(origin INSTALLER), undefined)
     @echo "Choose a deployment method:"
-    @echo "1. Docker"
+    @echo "1. Docker/Docker-Compose"
     @echo "2. Kubernetes/Helm"
     @echo -n "Enter choice: "
     @read choice; \
@@ -39,7 +39,7 @@ install: install-datamate
 uninstall-%:
 ifeq ($(origin INSTALLER), undefined)
     @echo "Choose a deployment method:"
-    @echo "1. Docker"
+    @echo "1. Docker/Docker-Compose"
     @echo "2. Kubernetes/Helm"
     @echo -n "Enter choice: "
     @read choice; \
@@ -97,52 +97,6 @@ runtime-docker-install:
 runtime-docker-uninstall:
     cd deployment/docker/datamate && docker-compose down runtime
-.PHONY: runtime-k8s-install
-runtime-k8s-install: create-namespace
-    helm upgrade datamate-kuberay-operator deployment/helm/ray/kuberay-operator --install -n $(NAMESPACE)
-    helm upgrade datamate-raycluster deployment/helm/ray/ray-cluster/ --install -n $(NAMESPACE)
-    kubectl apply -f deployment/helm/ray/service.yaml -n $(NAMESPACE)
-.PHONY: runtime-k8s-uninstall
-runtime-k8s-uninstall:
-    helm uninstall datamate-raycluster -n $(NAMESPACE)
-    helm uninstall datamate-kuberay-operator -n $(NAMESPACE)
-    kubectl delete -f deployment/helm/ray/service.yaml -n $(NAMESPACE)
-.PHONY: mysql-k8s-install
-mysql-k8s-install: create-namespace
-    kubectl create configmap datamate-init-sql --from-file=scripts/db/ --dry-run=client -o yaml | kubectl apply -f - -n $(NAMESPACE)
-    kubectl apply -f deployment/kubernetes/mysql/configmap.yaml -n $(NAMESPACE)
-    kubectl apply -f deployment/kubernetes/mysql/deploy.yaml -n $(NAMESPACE)
-.PHONY: mysql-k8s-uninstall
-mysql-k8s-uninstall:
-    kubectl delete configmap datamate-init-sql -n $(NAMESPACE) --ignore-not-found
-    kubectl delete -f deployment/kubernetes/mysql/configmap.yaml -n $(NAMESPACE) --ignore-not-found
-    kubectl delete -f deployment/kubernetes/mysql/deploy.yaml -n $(NAMESPACE) --ignore-not-found
-.PHONY: database-k8s-install
-database-k8s-install: mysql-k8s-install
-.PHONY: database-k8s-uninstall
-database-k8s-uninstall: mysql-k8s-uninstall
-.PHONY: backend-k8s-install
-backend-k8s-install: create-namespace
-    kubectl apply -f deployment/kubernetes/backend/deploy.yaml -n $(NAMESPACE)
-.PHONY: backend-k8s-uninstall
-backend-k8s-uninstall:
-    kubectl delete -f deployment/kubernetes/backend/deploy.yaml -n $(NAMESPACE) --ignore-not-found
-.PHONY: frontend-k8s-install
-frontend-k8s-install: create-namespace
-    kubectl apply -f deployment/kubernetes/frontend/deploy.yaml -n $(NAMESPACE)
-.PHONY: frontend-k8s-uninstall
-frontend-k8s-uninstall:
-    kubectl delete -f deployment/kubernetes/frontend/deploy.yaml -n $(NAMESPACE) --ignore-not-found
 .PHONY: datamate-docker-install
 datamate-docker-install:
     cd deployment/docker/datamate && docker-compose up -d
@@ -152,7 +106,11 @@ datamate-docker-uninstall:
     cd deployment/docker/datamate && docker-compose down
 .PHONY: datamate-k8s-install
-datamate-k8s-install: create-namespace database-k8s-install backend-k8s-install frontend-k8s-install runtime-k8s-install
+datamate-k8s-install: create-namespace
+    kubectl create configmap datamate-init-sql --from-file=scripts/db/ --dry-run=client -o yaml | kubectl apply -f - -n $(NAMESPACE)
+    helm install datamate deployment/helm/datamate/ -n $(NAMESPACE)
.PHONY: datamate-k8s-uninstall
-datamate-k8s-uninstall: database-k8s-uninstall backend-k8s-uninstall frontend-k8s-uninstall runtime-k8s-uninstall
+datamate-k8s-uninstall:
+    helm uninstall datamate -n $(NAMESPACE) --ignore-not-found
+    kubectl delete configmap datamate-init-sql -n $(NAMESPACE) --ignore-not-found


@@ -38,6 +38,7 @@
 ```bash
 git clone git@github.com:ModelEngine-Group/DataMate.git
+cd DataMate
 ```
 ### Build Images


@@ -41,6 +41,7 @@ If you like this project, please give it a Star⭐️!
 ```bash
 git clone git@github.com:ModelEngine-Group/DataMate.git
+cd DataMate
 ```
 ### Build Images

File diff suppressed because it is too large.

File diff suppressed because it is too large.

Three binary image files deleted (previews not shown; sizes 134 KiB, 48 KiB, and 91 KiB).


@@ -127,35 +127,6 @@
 <build>
 <plugins>
-    <!-- OpenAPI Generator Plugin -->
-    <plugin>
-        <groupId>org.openapitools</groupId>
-        <artifactId>openapi-generator-maven-plugin</artifactId>
-        <version>6.6.0</version>
-        <executions>
-            <execution>
-                <goals>
-                    <goal>generate</goal>
-                </goals>
-                <configuration>
-                    <inputSpec>${project.basedir}/../../openapi/specs/data-collection.yaml</inputSpec>
-                    <generatorName>spring</generatorName>
-                    <output>${project.build.directory}/generated-sources/openapi</output>
-                    <apiPackage>com.datamate.collection.interfaces.api</apiPackage>
-                    <modelPackage>com.datamate.collection.interfaces.dto</modelPackage>
-                    <configOptions>
-                        <interfaceOnly>true</interfaceOnly>
-                        <useTags>true</useTags>
-                        <useSpringBoot3>true</useSpringBoot3>
-                        <documentationProvider>springdoc</documentationProvider>
-                        <dateLibrary>java8-localdatetime</dateLibrary>
-                        <java8>true</java8>
-                    </configOptions>
-                </configuration>
-            </execution>
-        </executions>
-    </plugin>
     <plugin>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-maven-plugin</artifactId>


@@ -0,0 +1,79 @@
package com.datamate.collection.application;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.datamate.collection.domain.model.entity.CollectionTask;
import com.datamate.collection.domain.model.entity.TaskExecution;
import com.datamate.collection.common.enums.TaskStatus;
import com.datamate.collection.domain.repository.CollectionTaskRepository;
import com.datamate.collection.interfaces.dto.CollectionTaskPagingQuery;
import com.datamate.collection.common.enums.SyncMode;
import com.datamate.common.domain.utils.ChunksSaver;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;

@Slf4j
@Service
@RequiredArgsConstructor
public class CollectionTaskService {
    private final TaskExecutionService taskExecutionService;
    private final CollectionTaskRepository collectionTaskRepository;

    @Transactional
    public CollectionTask create(CollectionTask task) {
        task.setStatus(TaskStatus.READY);
        task.setCreatedAt(LocalDateTime.now());
        task.setUpdatedAt(LocalDateTime.now());
        collectionTaskRepository.save(task);
        executeTaskNow(task);
        return task;
    }

    private void executeTaskNow(CollectionTask task) {
        if (Objects.equals(task.getSyncMode(), SyncMode.ONCE)) {
            TaskExecution exec = taskExecutionService.createExecution(task);
            int timeout = task.getTimeoutSeconds() == null ? 3600 : task.getTimeoutSeconds();
            taskExecutionService.runAsync(task, exec.getId(), timeout);
            log.info("Triggered DataX execution for task {} at {}, execId={}", task.getId(), LocalDateTime.now(), exec.getId());
        }
    }

    @Transactional
    public CollectionTask update(CollectionTask task) {
        task.setUpdatedAt(LocalDateTime.now());
        collectionTaskRepository.updateById(task);
        return task;
    }

    @Transactional
    public void delete(String id) {
        CollectionTask task = collectionTaskRepository.getById(id);
        if (task != null) {
            ChunksSaver.deleteFolder("/dataset/local/" + task.getId());
        }
        collectionTaskRepository.removeById(id);
    }

    public CollectionTask get(String id) {
        return collectionTaskRepository.getById(id);
    }

    public IPage<CollectionTask> getTasks(CollectionTaskPagingQuery query) {
        LambdaQueryWrapper<CollectionTask> wrapper = new LambdaQueryWrapper<CollectionTask>()
            .eq(query.getStatus() != null, CollectionTask::getStatus, query.getStatus())
            .like(StringUtils.isNotBlank(query.getName()), CollectionTask::getName, query.getName());
        return collectionTaskRepository.page(new Page<>(query.getPage(), query.getSize()), wrapper);
    }

    public List<CollectionTask> selectActiveTasks() {
        return collectionTaskRepository.selectActiveTasks();
    }
}
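For orientation, a minimal caller sketch of the new paging API. The names come from this diff; the surrounding wiring and the page/size setters on PagingQuery are assumptions:

```java
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.datamate.collection.application.CollectionTaskService;
import com.datamate.collection.common.enums.TaskStatus;
import com.datamate.collection.domain.model.entity.CollectionTask;
import com.datamate.collection.interfaces.dto.CollectionTaskPagingQuery;

// Hypothetical caller; in the real application this would be a Spring bean
// with CollectionTaskService injected.
public class CollectionTaskServiceSketch {
    private final CollectionTaskService taskService;

    public CollectionTaskServiceSketch(CollectionTaskService taskService) {
        this.taskService = taskService;
    }

    public void listRunningTasks() {
        CollectionTaskPagingQuery query = new CollectionTaskPagingQuery();
        query.setStatus(TaskStatus.RUNNING);
        query.setName("sync");   // matched with LIKE; blank names are ignored
        query.setPage(1);        // page/size setters assumed on PagingQuery
        query.setSize(20);
        IPage<CollectionTask> page = taskService.getTasks(query);
        page.getRecords().forEach(t -> System.out.println(t.getName() + " -> " + t.getStatus()));
    }
}
```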


@@ -0,0 +1,57 @@
package com.datamate.collection.application;

import com.datamate.collection.domain.model.entity.CollectionTask;
import com.datamate.collection.domain.model.entity.TaskExecution;
import com.datamate.collection.common.enums.TaskStatus;
import com.datamate.collection.domain.process.ProcessRunner;
import com.datamate.collection.domain.repository.CollectionTaskRepository;
import com.datamate.collection.domain.repository.TaskExecutionRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.time.LocalDateTime;

@Slf4j
@Service
@RequiredArgsConstructor
public class TaskExecutionService {
    private final ProcessRunner processRunner;
    private final TaskExecutionRepository executionRepository;
    private final CollectionTaskRepository collectionTaskRepository;

    @Transactional
    public TaskExecution createExecution(CollectionTask task) {
        TaskExecution exec = TaskExecution.initTaskExecution();
        exec.setTaskId(task.getId());
        exec.setTaskName(task.getName());
        executionRepository.save(exec);
        collectionTaskRepository.updateLastExecution(task.getId(), exec.getId());
        collectionTaskRepository.updateStatus(task.getId(), TaskStatus.RUNNING.name());
        return exec;
    }

    public TaskExecution selectLatestByTaskId(String taskId) {
        return executionRepository.selectLatestByTaskId(taskId);
    }

    @Async
    public void runAsync(CollectionTask task, String executionId, int timeoutSeconds) {
        try {
            int code = processRunner.runJob(task, executionId, timeoutSeconds);
            log.info("DataX finished with code {} for execution {}", code, executionId);
            // Simplified: a normal exit is recorded as success
            executionRepository.completeExecution(executionId, TaskStatus.SUCCESS.name(), LocalDateTime.now(),
                0, 0L, 0L, 0L, null);
            collectionTaskRepository.updateStatus(task.getId(), TaskStatus.SUCCESS.name());
        } catch (Exception e) {
            log.error("DataX execution failed", e);
            executionRepository.completeExecution(executionId, TaskStatus.FAILED.name(), LocalDateTime.now(),
                0, 0L, 0L, 0L, e.getMessage());
            collectionTaskRepository.updateStatus(task.getId(), TaskStatus.FAILED.name());
        }
    }
}
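Note that `@Async` only moves `runAsync` off the caller's thread when async support is enabled and the call crosses a bean boundary, which holds here: CollectionTaskService invokes `taskExecutionService.runAsync` through the Spring proxy. A minimal sketch of the configuration this assumes; the class name and package are hypothetical, and an equivalent may already exist elsewhere in the project:

```java
package com.datamate.collection.config; // hypothetical location

import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;

/** Enables @Async so TaskExecutionService.runAsync runs on a task-executor thread. */
@Configuration
@EnableAsync
public class AsyncConfig {
}
```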


@@ -1,85 +0,0 @@
package com.datamate.collection.application.service;
import com.datamate.collection.domain.model.CollectionTask;
import com.datamate.collection.domain.model.TaskExecution;
import com.datamate.collection.domain.model.TaskStatus;
import com.datamate.collection.domain.model.DataxTemplate;
import com.datamate.collection.infrastructure.persistence.mapper.CollectionTaskMapper;
import com.datamate.collection.infrastructure.persistence.mapper.TaskExecutionMapper;
import com.datamate.collection.interfaces.dto.SyncMode;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@Slf4j
@Service
@RequiredArgsConstructor
public class CollectionTaskService {
private final CollectionTaskMapper taskMapper;
private final TaskExecutionMapper executionMapper;
private final DataxExecutionService dataxExecutionService;
@Transactional
public CollectionTask create(CollectionTask task) {
task.setStatus(TaskStatus.READY);
task.setCreatedAt(LocalDateTime.now());
task.setUpdatedAt(LocalDateTime.now());
taskMapper.insert(task);
executeTaskNow(task);
return task;
}
private void executeTaskNow(CollectionTask task) {
if (Objects.equals(task.getSyncMode(), SyncMode.ONCE.getValue())) {
TaskExecution exec = dataxExecutionService.createExecution(task);
int timeout = task.getTimeoutSeconds() == null ? 3600 : task.getTimeoutSeconds();
dataxExecutionService.runAsync(task, exec.getId(), timeout);
log.info("Triggered DataX execution for task {} at {}, execId={}", task.getId(), LocalDateTime.now(), exec.getId());
}
}
@Transactional
public CollectionTask update(CollectionTask task) {
task.setUpdatedAt(LocalDateTime.now());
taskMapper.update(task);
return task;
}
@Transactional
public void delete(String id) { taskMapper.deleteById(id); }
public CollectionTask get(String id) { return taskMapper.selectById(id); }
public List<CollectionTask> list(Integer page, Integer size, String status, String name) {
Map<String, Object> p = new HashMap<>();
p.put("status", status);
p.put("name", name);
if (page != null && size != null) {
p.put("offset", page * size);
p.put("limit", size);
}
return taskMapper.selectAll(p);
}
@Transactional
public TaskExecution startExecution(CollectionTask task) {
return dataxExecutionService.createExecution(task);
}
// ---- Template related merged methods ----
public List<DataxTemplate> listTemplates(String sourceType, String targetType, int page, int size) {
int offset = page * size;
return taskMapper.selectList(sourceType, targetType, offset, size);
}
public int countTemplates(String sourceType, String targetType) {
return taskMapper.countTemplates(sourceType, targetType);
}
}


@@ -1,60 +0,0 @@
package com.datamate.collection.application.service;
import com.datamate.collection.domain.model.CollectionTask;
import com.datamate.collection.domain.model.TaskExecution;
import com.datamate.collection.domain.model.TaskStatus;
import com.datamate.collection.infrastructure.persistence.mapper.CollectionTaskMapper;
import com.datamate.collection.infrastructure.persistence.mapper.TaskExecutionMapper;
import com.datamate.collection.infrastructure.runtime.datax.DataxJobBuilder;
import com.datamate.collection.infrastructure.runtime.datax.DataxProcessRunner;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.nio.file.Path;
import java.time.Duration;
import java.time.LocalDateTime;
@Slf4j
@Service
@RequiredArgsConstructor
public class DataxExecutionService {
private final DataxJobBuilder jobBuilder;
private final DataxProcessRunner processRunner;
private final TaskExecutionMapper executionMapper;
private final CollectionTaskMapper taskMapper;
@Transactional
public TaskExecution createExecution(CollectionTask task) {
TaskExecution exec = TaskExecution.initTaskExecution();
exec.setTaskId(task.getId());
exec.setTaskName(task.getName());
executionMapper.insert(exec);
taskMapper.updateLastExecution(task.getId(), exec.getId());
taskMapper.updateStatus(task.getId(), TaskStatus.RUNNING.name());
return exec;
}
@Async
public void runAsync(CollectionTask task, String executionId, int timeoutSeconds) {
try {
Path job = jobBuilder.buildJobFile(task);
int code = processRunner.runJob(job.toFile(), executionId, Duration.ofSeconds(timeoutSeconds));
log.info("DataX finished with code {} for execution {}", code, executionId);
// Simplified: a normal exit is treated as success
executionMapper.completeExecution(executionId, TaskStatus.SUCCESS.name(), LocalDateTime.now(),
0, 0L, 0L, 0L, null, null);
taskMapper.updateStatus(task.getId(), TaskStatus.SUCCESS.name());
} catch (Exception e) {
log.error("DataX execution failed", e);
executionMapper.completeExecution(executionId, TaskStatus.FAILED.name(), LocalDateTime.now(),
0, 0L, 0L, 0L, e.getMessage(), null);
taskMapper.updateStatus(task.getId(), TaskStatus.FAILED.name());
}
}
}


@@ -1,83 +0,0 @@
package com.datamate.collection.application.service;
import com.datamate.collection.domain.model.CollectionTask;
import com.datamate.collection.domain.model.TaskExecution;
import com.datamate.collection.domain.model.TaskStatus;
import com.datamate.collection.infrastructure.persistence.mapper.CollectionTaskMapper;
import com.datamate.collection.infrastructure.persistence.mapper.TaskExecutionMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Service
@RequiredArgsConstructor
public class TaskExecutionService {
private final TaskExecutionMapper executionMapper;
private final CollectionTaskMapper taskMapper;
public List<TaskExecution> list(String taskId, String status, LocalDateTime startDate,
LocalDateTime endDate, Integer page, Integer size) {
Map<String, Object> p = new HashMap<>();
p.put("taskId", taskId);
p.put("status", status);
p.put("startDate", startDate);
p.put("endDate", endDate);
if (page != null && size != null) {
p.put("offset", page * size);
p.put("limit", size);
}
return executionMapper.selectAll(p);
}
public long count(String taskId, String status, LocalDateTime startDate, LocalDateTime endDate) {
Map<String, Object> p = new HashMap<>();
p.put("taskId", taskId);
p.put("status", status);
p.put("startDate", startDate);
p.put("endDate", endDate);
return executionMapper.count(p);
}
// --- Added convenience methods ---
public TaskExecution get(String id) { return executionMapper.selectById(id); }
public TaskExecution getLatestByTaskId(String taskId) { return executionMapper.selectLatestByTaskId(taskId); }
@Transactional
public void complete(String executionId, boolean success, long successCount, long failedCount,
long dataSizeBytes, String errorMessage, String resultJson) {
LocalDateTime now = LocalDateTime.now();
TaskExecution exec = executionMapper.selectById(executionId);
if (exec == null) { return; }
int duration = (int) Duration.between(exec.getStartedAt(), now).getSeconds();
executionMapper.completeExecution(executionId, success ? TaskStatus.SUCCESS.name() : TaskStatus.FAILED.name(),
now, duration, successCount, failedCount, dataSizeBytes, errorMessage, resultJson);
CollectionTask task = taskMapper.selectById(exec.getTaskId());
if (task != null) {
taskMapper.updateStatus(task.getId(), success ? TaskStatus.SUCCESS.name() : TaskStatus.FAILED.name());
}
}
@Transactional
public void stop(String executionId) {
TaskExecution exec = executionMapper.selectById(executionId);
if (exec == null || exec.getStatus() != TaskStatus.RUNNING) { return; }
LocalDateTime now = LocalDateTime.now();
int duration = (int) Duration.between(exec.getStartedAt(), now).getSeconds();
// Reuse completeExecution to persist STOPPED status and timing info
executionMapper.completeExecution(exec.getId(), TaskStatus.STOPPED.name(), now, duration,
exec.getRecordsSuccess(), exec.getRecordsFailed(), exec.getDataSizeBytes(), null, exec.getResult());
taskMapper.updateStatus(exec.getTaskId(), TaskStatus.STOPPED.name());
}
@Transactional
public void stopLatestByTaskId(String taskId) {
TaskExecution latest = executionMapper.selectLatestByTaskId(taskId);
if (latest != null) { stop(latest.getId()); }
}
}


@@ -0,0 +1,12 @@
package com.datamate.collection.common.enums;

/**
 * Sync mode: one-shot (ONCE) or scheduled (SCHEDULED).
 */
public enum SyncMode {
    /** One-shot execution. */
    ONCE,
    /** Scheduled execution. */
    SCHEDULED
}


@@ -1,7 +1,8 @@
-package com.datamate.collection.domain.model;
+package com.datamate.collection.common.enums;
 /**
  * Unified task and execution status enum.
+ * Statuses: DRAFT (draft), READY (ready), RUNNING (running), SUCCESS (replaces the old COMPLETED/SUCCESS), FAILED (failed), STOPPED (stopped).
  *
  * @author Data Mate Platform Team
  */


@@ -1,32 +1,36 @@
-package com.datamate.collection.domain.model;
+package com.datamate.collection.domain.model.entity;
+import com.baomidou.mybatisplus.annotation.TableName;
+import com.datamate.collection.common.enums.SyncMode;
+import com.datamate.collection.common.enums.TaskStatus;
+import com.datamate.common.domain.model.base.BaseEntity;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import lombok.Data;
+import lombok.Getter;
+import lombok.Setter;
-import java.time.LocalDateTime;
 import java.util.Collections;
 import java.util.Map;
-@Data
-public class CollectionTask {
-    private String id;
+/**
+ * Data collection task entity, aligned with database table t_dc_collection_tasks.
+ */
+@Getter
+@Setter
+@TableName(value = "t_dc_collection_tasks", autoResultMap = true)
+public class CollectionTask extends BaseEntity<String> {
     private String name;
     private String description;
     private String config; // DataX job JSON holding both source and target settings
     private TaskStatus status;
-    private String syncMode; // ONCE / SCHEDULED
+    private SyncMode syncMode; // ONCE / SCHEDULED
     private String scheduleExpression;
     private Integer retryCount;
     private Integer timeoutSeconds;
     private Long maxRecords;
     private String sortField;
     private String lastExecutionId;
-    private LocalDateTime createdAt;
-    private LocalDateTime updatedAt;
-    private String createdBy;
-    private String updatedBy;
     public void addPath() {
         try {


@@ -1,4 +1,4 @@
-package com.datamate.collection.domain.model;
+package com.datamate.collection.domain.model.entity;
 import lombok.Data;
 import lombok.EqualsAndHashCode;


@@ -1,13 +1,19 @@
-package com.datamate.collection.domain.model;
+package com.datamate.collection.domain.model.entity;
+import com.baomidou.mybatisplus.annotation.TableName;
+import com.datamate.collection.common.enums.TaskStatus;
+import com.datamate.common.domain.model.base.BaseEntity;
 import lombok.Data;
+import lombok.Getter;
+import lombok.Setter;
 import java.time.LocalDateTime;
 import java.util.UUID;
-@Data
-public class TaskExecution {
-    private String id;
+@Getter
+@Setter
+@TableName(value = "t_dc_task_executions", autoResultMap = true)
+public class TaskExecution extends BaseEntity<String> {
     private String taskId;
     private String taskName;
     private TaskStatus status;
@@ -25,7 +31,6 @@ public class TaskExecution {
     private String dataxJobId;
     private String config;
     private String result;
-    private LocalDateTime createdAt;
     public static TaskExecution initTaskExecution() {
         TaskExecution exec = new TaskExecution();


@@ -0,0 +1,21 @@
package com.datamate.collection.domain.process;

import com.datamate.collection.domain.model.entity.CollectionTask;

/**
 * Collection job runner interface.
 *
 * @since 2025/10/23
 */
public interface ProcessRunner {
    /**
     * Run a collection job.
     *
     * @param task           the task to run
     * @param executionId    execution ID
     * @param timeoutSeconds timeout in seconds
     * @return process exit code
     * @throws Exception if execution fails
     */
    int runJob(CollectionTask task, String executionId, int timeoutSeconds) throws Exception;
}
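The interface keeps the application layer free of DataX specifics; any runner that honors the contract can be swapped in. A throwaway test double (hypothetical, not part of this diff) illustrates the contract: return the process exit code, throw on failure:

```java
import com.datamate.collection.domain.model.entity.CollectionTask;
import com.datamate.collection.domain.process.ProcessRunner;

/** Hypothetical stand-in for tests: pretends every job exits successfully. */
public class FakeProcessRunner implements ProcessRunner {
    @Override
    public int runJob(CollectionTask task, String executionId, int timeoutSeconds) {
        return 0; // exit code 0 = success, mirroring what DataxProcessRunner returns
    }
}
```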


@@ -0,0 +1,19 @@
package com.datamate.collection.domain.repository;

import com.baomidou.mybatisplus.extension.repository.IRepository;
import com.datamate.collection.domain.model.entity.CollectionTask;

import java.util.List;

/**
 * Collection task repository.
 *
 * @since 2025/10/23
 */
public interface CollectionTaskRepository extends IRepository<CollectionTask> {
    List<CollectionTask> selectActiveTasks();

    void updateStatus(String id, String status);

    void updateLastExecution(String id, String lastExecutionId);
}


@@ -0,0 +1,19 @@
package com.datamate.collection.domain.repository;

import com.baomidou.mybatisplus.extension.service.IService;
import com.datamate.collection.domain.model.entity.TaskExecution;

import java.time.LocalDateTime;

/**
 * TaskExecutionRepository
 *
 * @since 2025/10/23
 */
public interface TaskExecutionRepository extends IService<TaskExecution> {
    TaskExecution selectLatestByTaskId(String taskId);

    void completeExecution(String executionId, String status, LocalDateTime completedAt,
                           Integer recordsProcessed, Long recordsTotal,
                           Long recordsSuccess, Long recordsFailed, String errorMessage);
}


@@ -0,0 +1,124 @@
package com.datamate.collection.infrastructure.datax;

import com.datamate.collection.domain.model.entity.CollectionTask;
import com.datamate.collection.domain.process.ProcessRunner;
import com.datamate.common.infrastructure.exception.BusinessException;
import com.datamate.common.infrastructure.exception.SystemErrorCode;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.exec.*;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Slf4j
@Component
@RequiredArgsConstructor
public class DataxProcessRunner implements ProcessRunner {
    private final DataxProperties props;

    @Override
    public int runJob(CollectionTask task, String executionId, int timeoutSeconds) throws Exception {
        Path job = buildJobFile(task);
        return runJob(job.toFile(), executionId, Duration.ofSeconds(timeoutSeconds));
    }

    private int runJob(File jobFile, String executionId, Duration timeout) throws Exception {
        File logFile = new File(props.getLogPath(), String.format("datax-%s.log", executionId));
        String python = props.getPythonPath();
        String dataxPy = props.getHomePath() + File.separator + "bin" + File.separator + "datax.py";
        String cmd = String.format("%s %s %s", python, dataxPy, jobFile.getAbsolutePath());
        log.info("Execute DataX: {}", cmd);
        CommandLine cl = CommandLine.parse(cmd);
        DefaultExecutor executor = getExecutor(timeout, logFile);
        return executor.execute(cl);
    }

    private static DefaultExecutor getExecutor(Duration timeout, File logFile) throws FileNotFoundException {
        DefaultExecutor executor = new DefaultExecutor();
        // Append process output to the log file as well as the console
        File parent = logFile.getParentFile();
        if (!parent.exists()) {
            parent.mkdirs();
        }
        ExecuteStreamHandler streamHandler = new PumpStreamHandler(
            new org.apache.commons.io.output.TeeOutputStream(
                new java.io.FileOutputStream(logFile, true), System.out),
            new org.apache.commons.io.output.TeeOutputStream(
                new java.io.FileOutputStream(logFile, true), System.err)
        );
        executor.setStreamHandler(streamHandler);
        ExecuteWatchdog watchdog = new ExecuteWatchdog(timeout.toMillis());
        executor.setWatchdog(watchdog);
        return executor;
    }

    private Path buildJobFile(CollectionTask task) throws IOException {
        Files.createDirectories(Paths.get(props.getJobConfigPath()));
        String fileName = String.format("datax-job-%s.json", task.getId());
        Path path = Paths.get(props.getJobConfigPath(), fileName);
        // Simplified: the task's config field is used directly as the DataX job JSON
        try (FileWriter fw = new FileWriter(path.toFile())) {
            if (StringUtils.isBlank(task.getConfig())) {
                throw BusinessException.of(SystemErrorCode.UNKNOWN_ERROR);
            }
            String json = getJobConfig(task);
            log.info("Job config: {}", json);
            fw.write(json);
        }
        return path;
    }

    private String getJobConfig(CollectionTask task) {
        try {
            ObjectMapper objectMapper = new ObjectMapper();
            Map<String, Object> parameter = objectMapper.readValue(
                task.getConfig(),
                new TypeReference<>() {
                }
            );
            Map<String, Object> job = new HashMap<>();
            Map<String, Object> content = new HashMap<>();
            Map<String, Object> reader = new HashMap<>();
            reader.put("name", "nfsreader");
            reader.put("parameter", parameter);
            content.put("reader", reader);
            Map<String, Object> writer = new HashMap<>();
            writer.put("name", "nfswriter");
            writer.put("parameter", parameter);
            content.put("writer", writer);
            job.put("content", List.of(content));
            Map<String, Object> setting = new HashMap<>();
            Map<String, Object> channel = new HashMap<>();
            channel.put("channel", 2);
            setting.put("speed", channel);
            job.put("setting", setting);
            Map<String, Object> jobConfig = new HashMap<>();
            jobConfig.put("job", job);
            return objectMapper.writeValueAsString(jobConfig);
        } catch (Exception e) {
            log.error("Failed to parse task config", e);
            throw new RuntimeException("Failed to parse task config", e);
        }
    }
}
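For orientation, the job document that getJobConfig assembles has the shape sketched below; the field values are made up, and the parameter keys are whatever the nfsreader/nfswriter plugins expect. Note that the task's entire config map is handed unchanged to both the reader and the writer:

```java
public class DataxJobShapeExample {
    public static void main(String[] args) {
        // Illustrative output of DataxProcessRunner.getJobConfig, pretty-printed.
        String jobJson = """
            {
              "job": {
                "setting": { "speed": { "channel": 2 } },
                "content": [{
                  "reader": { "name": "nfsreader", "parameter": { "path": "/mnt/source" } },
                  "writer": { "name": "nfswriter", "parameter": { "path": "/mnt/source" } }
                }]
              }
            }
            """;
        System.out.println(jobJson);
    }
}
```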


@@ -1,4 +1,4 @@
-package com.datamate.collection.infrastructure.runtime.datax;
+package com.datamate.collection.infrastructure.datax;
 import lombok.Data;
 import org.springframework.boot.context.properties.ConfigurationProperties;


@@ -1,47 +1,15 @@
 package com.datamate.collection.infrastructure.persistence.mapper;
-import com.datamate.collection.domain.model.CollectionTask;
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
-import com.datamate.collection.domain.model.DataxTemplate;
+import com.datamate.collection.domain.model.entity.CollectionTask;
 import org.apache.ibatis.annotations.Mapper;
 import org.apache.ibatis.annotations.Param;
 import java.util.List;
-import java.util.Map;
 @Mapper
-public interface CollectionTaskMapper {
+public interface CollectionTaskMapper extends BaseMapper<CollectionTask> {
-    int insert(CollectionTask entity);
-    int update(CollectionTask entity);
-    int deleteById(@Param("id") String id);
-    CollectionTask selectById(@Param("id") String id);
-    CollectionTask selectByName(@Param("name") String name);
-    List<CollectionTask> selectByStatus(@Param("status") String status);
-    List<CollectionTask> selectAll(Map<String, Object> params);
     int updateStatus(@Param("id") String id, @Param("status") String status);
     int updateLastExecution(@Param("id") String id, @Param("lastExecutionId") String lastExecutionId);
     List<CollectionTask> selectActiveTasks();
-    /**
-     * Query the template list.
-     *
-     * @param sourceType source datasource type (optional)
-     * @param targetType target datasource type (optional)
-     * @param offset     offset
-     * @param limit      page size
-     * @return template list
-     */
-    List<DataxTemplate> selectList(@Param("sourceType") String sourceType,
-                                   @Param("targetType") String targetType,
-                                   @Param("offset") int offset,
-                                   @Param("limit") int limit);
-    /**
-     * Count templates.
-     *
-     * @param sourceType source datasource type (optional)
-     * @param targetType target datasource type (optional)
-     * @return total number of templates
-     */
-    int countTemplates(@Param("sourceType") String sourceType,
-                       @Param("targetType") String targetType);
 }


@@ -1,38 +1,22 @@
 package com.datamate.collection.infrastructure.persistence.mapper;
-import com.datamate.collection.domain.model.TaskExecution;
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import com.datamate.collection.domain.model.entity.TaskExecution;
 import org.apache.ibatis.annotations.Mapper;
 import org.apache.ibatis.annotations.Param;
 import java.time.LocalDateTime;
-import java.util.List;
-import java.util.Map;
 @Mapper
-public interface TaskExecutionMapper {
+public interface TaskExecutionMapper extends BaseMapper<TaskExecution> {
-    int insert(TaskExecution entity);
-    int update(TaskExecution entity);
-    int deleteById(@Param("id") String id);
-    TaskExecution selectById(@Param("id") String id);
-    List<TaskExecution> selectByTaskId(@Param("taskId") String taskId, @Param("limit") Integer limit);
-    List<TaskExecution> selectByStatus(@Param("status") String status);
-    List<TaskExecution> selectAll(Map<String, Object> params);
-    long count(Map<String, Object> params);
-    int updateProgress(@Param("id") String id,
-                       @Param("status") String status,
-                       @Param("progress") Double progress,
-                       @Param("recordsProcessed") Long recordsProcessed,
-                       @Param("throughput") Double throughput);
-    int completeExecution(@Param("id") String id,
-                          @Param("status") String status,
-                          @Param("completedAt") LocalDateTime completedAt,
-                          @Param("durationSeconds") Integer durationSeconds,
-                          @Param("recordsSuccess") Long recordsSuccess,
-                          @Param("recordsFailed") Long recordsFailed,
-                          @Param("dataSizeBytes") Long dataSizeBytes,
-                          @Param("errorMessage") String errorMessage,
-                          @Param("result") String result);
-    List<TaskExecution> selectRunningExecutions();
     TaskExecution selectLatestByTaskId(@Param("taskId") String taskId);
-    int deleteOldExecutions(@Param("beforeDate") LocalDateTime beforeDate);
+    void completeExecution(@Param("executionId") String executionId,
+                           @Param("status") String status,
+                           @Param("completedAt") LocalDateTime completedAt,
+                           @Param("recordsProcessed") Integer recordsProcessed,
+                           @Param("recordsTotal") Long recordsTotal,
+                           @Param("recordsSuccess") Long recordsSuccess,
+                           @Param("recordsFailed") Long recordsFailed,
+                           @Param("errorMessage") String errorMessage);
 }


@@ -0,0 +1,36 @@
package com.datamate.collection.infrastructure.persistence.repository;

import com.baomidou.mybatisplus.extension.repository.CrudRepository;
import com.datamate.collection.domain.model.entity.CollectionTask;
import com.datamate.collection.domain.repository.CollectionTaskRepository;
import com.datamate.collection.infrastructure.persistence.mapper.CollectionTaskMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Repository;

import java.util.List;

/**
 * CollectionTaskRepositoryImpl
 *
 * @since 2025/10/23
 */
@Repository
@RequiredArgsConstructor
public class CollectionTaskRepositoryImpl extends CrudRepository<CollectionTaskMapper, CollectionTask>
        implements CollectionTaskRepository {
    private final CollectionTaskMapper collectionTaskMapper;

    @Override
    public List<CollectionTask> selectActiveTasks() {
        return collectionTaskMapper.selectActiveTasks();
    }

    @Override
    public void updateStatus(String id, String status) {
        collectionTaskMapper.updateStatus(id, status);
    }

    @Override
    public void updateLastExecution(String id, String lastExecutionId) {
        collectionTaskMapper.updateLastExecution(id, lastExecutionId);
    }
}


@@ -0,0 +1,37 @@
package com.datamate.collection.infrastructure.persistence.repository;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.datamate.collection.domain.model.entity.TaskExecution;
import com.datamate.collection.domain.repository.TaskExecutionRepository;
import com.datamate.collection.infrastructure.persistence.mapper.TaskExecutionMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Repository;

import java.time.LocalDateTime;

/**
 * TaskExecutionRepositoryImpl
 *
 * @since 2025/10/23
 */
@Repository
@RequiredArgsConstructor
public class TaskExecutionRepositoryImpl extends ServiceImpl<TaskExecutionMapper, TaskExecution>
        implements TaskExecutionRepository {
    private final TaskExecutionMapper taskExecutionMapper;

    @Override
    public TaskExecution selectLatestByTaskId(String taskId) {
        return taskExecutionMapper.selectLatestByTaskId(taskId);
    }

    @Override
    public void completeExecution(String executionId, String status, LocalDateTime completedAt,
                                  Integer recordsProcessed, Long recordsTotal,
                                  Long recordsSuccess, Long recordsFailed, String errorMessage) {
        taskExecutionMapper.completeExecution(executionId, status, completedAt,
                recordsProcessed, recordsTotal,
                recordsSuccess, recordsFailed, errorMessage);
    }
}


@@ -1,83 +0,0 @@
package com.datamate.collection.infrastructure.runtime.datax;
import com.datamate.collection.domain.model.CollectionTask;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Assembles the DataX job JSON file from the task configuration.
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class DataxJobBuilder {
private final DataxProperties props;
public Path buildJobFile(CollectionTask task) throws IOException {
Files.createDirectories(Paths.get(props.getJobConfigPath()));
String fileName = String.format("datax-job-%s.json", task.getId());
Path path = Paths.get(props.getJobConfigPath(), fileName);
// Simplified: the task's config field is used directly as the DataX job JSON
try (FileWriter fw = new FileWriter(path.toFile())) {
String json = task.getConfig() == null || task.getConfig().isEmpty() ?
defaultJobJson() : task.getConfig();
if (StringUtils.isNotBlank(task.getConfig())) {
json = getJobConfig(task);
}
log.info("Job config: {}", json);
fw.write(json);
}
return path;
}
private String getJobConfig(CollectionTask task) {
try {
ObjectMapper objectMapper = new ObjectMapper();
Map<String, Object> parameter = objectMapper.readValue(
task.getConfig(),
new TypeReference<>() {}
);
Map<String, Object> job = new HashMap<>();
Map<String, Object> content = new HashMap<>();
Map<String, Object> reader = new HashMap<>();
reader.put("name", "nfsreader");
reader.put("parameter", parameter);
content.put("reader", reader);
Map<String, Object> writer = new HashMap<>();
writer.put("name", "nfswriter");
writer.put("parameter", parameter);
content.put("writer", writer);
job.put("content", List.of(content));
Map<String, Object> setting = new HashMap<>();
Map<String, Object> channel = new HashMap<>();
channel.put("channel", 2);
setting.put("speed", channel);
job.put("setting", setting);
Map<String, Object> jobConfig = new HashMap<>();
jobConfig.put("job", job);
return objectMapper.writeValueAsString(jobConfig);
} catch (Exception e) {
log.error("Failed to parse task config", e);
throw new RuntimeException("Failed to parse task config", e);
}
}
private String defaultJobJson() {
// Provide a minimal runnable empty job; real tasks override it
return "{\n \"job\": {\n \"setting\": {\n \"speed\": {\n \"channel\": 1\n }\n },\n \"content\": []\n }\n}";
}
}


@@ -1,46 +0,0 @@
package com.datamate.collection.infrastructure.runtime.datax;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.exec.*;
import org.springframework.stereotype.Component;
import java.io.File;
import java.time.Duration;
@Slf4j
@Component
@RequiredArgsConstructor
public class DataxProcessRunner {
private final DataxProperties props;
public int runJob(File jobFile, String executionId, Duration timeout) throws Exception {
File logFile = new File(props.getLogPath(), String.format("datax-%s.log", executionId));
String python = props.getPythonPath();
String dataxPy = props.getHomePath() + File.separator + "bin" + File.separator + "datax.py";
String cmd = String.format("%s %s %s", python, dataxPy, jobFile.getAbsolutePath());
log.info("Execute DataX: {}", cmd);
CommandLine cl = CommandLine.parse(cmd);
DefaultExecutor executor = new DefaultExecutor();
// Append the process output to the log file
File parent = logFile.getParentFile();
if (!parent.exists()) parent.mkdirs();
ExecuteStreamHandler streamHandler = new PumpStreamHandler(
new org.apache.commons.io.output.TeeOutputStream(
new java.io.FileOutputStream(logFile, true), System.out),
new org.apache.commons.io.output.TeeOutputStream(
new java.io.FileOutputStream(logFile, true), System.err)
);
executor.setStreamHandler(streamHandler);
ExecuteWatchdog watchdog = new ExecuteWatchdog(timeout.toMillis());
executor.setWatchdog(watchdog);
return executor.execute(cl);
}
}


@@ -1,16 +1,18 @@
 package com.datamate.collection.interfaces.converter;
-import com.datamate.collection.domain.model.CollectionTask;
+import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.datamate.collection.domain.model.DataxTemplate;
+import com.datamate.collection.domain.model.entity.CollectionTask;
 import com.datamate.collection.interfaces.dto.*;
 import com.datamate.common.infrastructure.exception.BusinessException;
 import com.datamate.common.infrastructure.exception.SystemErrorCode;
+import com.datamate.common.interfaces.PagedResponse;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.mapstruct.Mapper;
 import org.mapstruct.Mapping;
 import org.mapstruct.Named;
 import org.mapstruct.factory.Mappers;
+import java.util.List;
 import java.util.Map;
 @Mapper
@@ -20,9 +22,7 @@ public interface CollectionTaskConverter {
     @Mapping(source = "config", target = "config", qualifiedByName = "parseJsonToMap")
     CollectionTaskResponse toResponse(CollectionTask task);
-    CollectionTaskSummary toSummary(CollectionTask task);
+    List<CollectionTaskResponse> toResponse(List<CollectionTask> tasks);
-    DataxTemplateSummary toTemplateSummary(DataxTemplate template);
     @Mapping(source = "config", target = "config", qualifiedByName = "mapToJsonString")
     CollectionTask toCollectionTask(CreateCollectionTaskRequest request);
@@ -30,11 +30,19 @@ public interface CollectionTaskConverter {
     @Mapping(source = "config", target = "config", qualifiedByName = "mapToJsonString")
     CollectionTask toCollectionTask(UpdateCollectionTaskRequest request);
+    @Mapping(source = "current", target = "page")
+    @Mapping(source = "size", target = "size")
+    @Mapping(source = "total", target = "totalElements")
+    @Mapping(source = "pages", target = "totalPages")
+    @Mapping(source = "records", target = "content")
+    PagedResponse<CollectionTaskResponse> toResponse(IPage<CollectionTask> tasks);
     @Named("parseJsonToMap")
     default Map<String, Object> parseJsonToMap(String json) {
         try {
             ObjectMapper objectMapper = new ObjectMapper();
             return objectMapper.readValue(json, Map.class);
         } catch (Exception e) {
             throw BusinessException.of(SystemErrorCode.INVALID_PARAMETER);
         }


@@ -0,0 +1,25 @@
package com.datamate.collection.interfaces.dto;

import com.datamate.collection.common.enums.TaskStatus;
import com.datamate.common.interfaces.PagingQuery;
import lombok.Getter;
import lombok.Setter;

/**
 * Paging query parameters for collection tasks.
 *
 * @since 2025/10/23
 */
@Getter
@Setter
public class CollectionTaskPagingQuery extends PagingQuery {
    /** Task status. */
    private TaskStatus status;

    /** Task name. */
    private String name;
}


@@ -0,0 +1,48 @@
package com.datamate.collection.interfaces.dto;

import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;

import com.datamate.collection.common.enums.TaskStatus;
import com.datamate.collection.common.enums.SyncMode;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.springframework.format.annotation.DateTimeFormat;
import jakarta.validation.Valid;

/**
 * CollectionTaskResponse
 */
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class CollectionTaskResponse {
    private String id;
    private String name;
    private String description;

    @Valid
    private Map<String, Object> config = new HashMap<>();

    private TaskStatus status;
    private SyncMode syncMode;
    private String scheduleExpression;
    private String lastExecutionId;

    @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
    private LocalDateTime createdAt;

    @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
    private LocalDateTime updatedAt;
}


@@ -0,0 +1,53 @@
package com.datamate.collection.interfaces.dto;

import com.datamate.collection.common.enums.SyncMode;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.HashMap;
import java.util.Map;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import jakarta.validation.Valid;
import jakarta.validation.constraints.*;
import io.swagger.v3.oas.annotations.media.Schema;

/**
 * CreateCollectionTaskRequest
 */
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class CreateCollectionTaskRequest {
    @NotNull
    @Size(min = 1, max = 100)
    @Schema(name = "name", description = "Task name", requiredMode = Schema.RequiredMode.REQUIRED)
    @JsonProperty("name")
    private String name;

    @Size(max = 500)
    @Schema(name = "description", description = "Task description", requiredMode = Schema.RequiredMode.NOT_REQUIRED)
    @JsonProperty("description")
    private String description;

    @Valid
    @NotNull
    @Schema(name = "config", description = "Collection config holding both source and target settings", requiredMode = Schema.RequiredMode.REQUIRED)
    @JsonProperty("config")
    private Map<String, Object> config = new HashMap<>();

    @NotNull
    @Valid
    @Schema(name = "syncMode", requiredMode = Schema.RequiredMode.REQUIRED)
    @JsonProperty("syncMode")
    private SyncMode syncMode;

    @Schema(name = "scheduleExpression", description = "Cron schedule expression (required when syncMode=SCHEDULED)", requiredMode = Schema.RequiredMode.NOT_REQUIRED)
    @JsonProperty("scheduleExpression")
    private String scheduleExpression;
}
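A sketch of building a valid create payload with the all-args constructor from this diff; the config keys are hypothetical placeholders for whatever the DataX plugins expect:

```java
import com.datamate.collection.common.enums.SyncMode;
import com.datamate.collection.interfaces.dto.CreateCollectionTaskRequest;

import java.util.Map;

public class CreateRequestSketch {
    public static void main(String[] args) {
        // Constructor argument order follows the field order above:
        // name, description, config, syncMode, scheduleExpression.
        CreateCollectionTaskRequest request = new CreateCollectionTaskRequest(
                "nfs-import",                        // 1-100 chars, required
                "One-shot import from the NFS share",
                Map.of("sourcePath", "/mnt/share"),  // hypothetical plugin keys
                SyncMode.ONCE,
                null);                               // cron only when SCHEDULED
        System.out.println(request.getName());
    }
}
```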


@@ -0,0 +1,50 @@
package com.datamate.collection.interfaces.dto;

import com.datamate.collection.common.enums.SyncMode;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.HashMap;
import java.util.Map;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import jakarta.validation.Valid;
import jakarta.validation.constraints.*;
import io.swagger.v3.oas.annotations.media.Schema;

/**
 * UpdateCollectionTaskRequest
 */
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class UpdateCollectionTaskRequest {
    @Size(min = 1, max = 100)
    @Schema(name = "name", description = "Task name", requiredMode = Schema.RequiredMode.NOT_REQUIRED)
    @JsonProperty("name")
    private String name;

    @Size(max = 500)
    @Schema(name = "description", description = "Task description", requiredMode = Schema.RequiredMode.NOT_REQUIRED)
    @JsonProperty("description")
    private String description;

    @Valid
    @Schema(name = "config", description = "Collection config holding both source and target settings", requiredMode = Schema.RequiredMode.NOT_REQUIRED)
    @JsonProperty("config")
    private Map<String, Object> config = new HashMap<>();

    @Valid
    @Schema(name = "syncMode", requiredMode = Schema.RequiredMode.NOT_REQUIRED)
    @JsonProperty("syncMode")
    private SyncMode syncMode;

    @Schema(name = "scheduleExpression", description = "Cron schedule expression (required when syncMode=SCHEDULED)", requiredMode = Schema.RequiredMode.NOT_REQUIRED)
    @JsonProperty("scheduleExpression")
    private String scheduleExpression;
}


@@ -1,38 +1,36 @@
 package com.datamate.collection.interfaces.rest;
-import com.datamate.collection.application.service.CollectionTaskService;
+import com.datamate.collection.application.CollectionTaskService;
-import com.datamate.collection.domain.model.CollectionTask;
+import com.datamate.collection.domain.model.entity.CollectionTask;
-import com.datamate.collection.domain.model.DataxTemplate;
-import com.datamate.collection.interfaces.api.CollectionTaskApi;
 import com.datamate.collection.interfaces.converter.CollectionTaskConverter;
 import com.datamate.collection.interfaces.dto.*;
+import com.datamate.common.interfaces.PagedResponse;
+import jakarta.validation.Valid;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.http.ResponseEntity;
-import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.*;
-import org.springframework.web.bind.annotation.RestController;
 import java.util.*;
-import java.util.stream.Collectors;
 @Slf4j
 @RestController
+@RequestMapping("/data-collection/tasks")
 @RequiredArgsConstructor
-@Validated
-public class CollectionTaskController implements CollectionTaskApi {
+public class CollectionTaskController {
     private final CollectionTaskService taskService;
-    @Override
-    public ResponseEntity<CollectionTaskResponse> createTask(CreateCollectionTaskRequest request) {
+    @PostMapping
+    public ResponseEntity<CollectionTaskResponse> createTask(@Valid @RequestBody CreateCollectionTaskRequest request) {
         CollectionTask task = CollectionTaskConverter.INSTANCE.toCollectionTask(request);
         task.setId(UUID.randomUUID().toString());
         task.addPath();
         return ResponseEntity.ok().body(CollectionTaskConverter.INSTANCE.toResponse(taskService.create(task)));
     }
-    @Override
-    public ResponseEntity<CollectionTaskResponse> updateTask(String id, UpdateCollectionTaskRequest request) {
+    @PutMapping("/{id}")
+    public ResponseEntity<CollectionTaskResponse> updateTask(@PathVariable("id") String id, @Valid @RequestBody UpdateCollectionTaskRequest request) {
         if (taskService.get(id) == null) {
             return ResponseEntity.notFound().build();
         }
@@ -41,43 +39,20 @@ public class CollectionTaskController implements CollectionTaskApi {
         return ResponseEntity.ok(CollectionTaskConverter.INSTANCE.toResponse(taskService.update(task)));
     }
-    @Override
-    public ResponseEntity<Void> deleteTask(String id) {
+    @DeleteMapping("/{id}")
+    public ResponseEntity<Void> deleteTask(@PathVariable("id") String id) {
         taskService.delete(id);
         return ResponseEntity.ok().build();
     }
-    @Override
-    public ResponseEntity<CollectionTaskResponse> getTaskDetail(String id) {
+    @GetMapping("/{id}")
+    public ResponseEntity<CollectionTaskResponse> getTaskDetail(@PathVariable("id") String id) {
         CollectionTask task = taskService.get(id);
         return task == null ? ResponseEntity.notFound().build() : ResponseEntity.ok(CollectionTaskConverter.INSTANCE.toResponse(task));
     }
-    @Override
-    public ResponseEntity<PagedCollectionTaskSummary> getTasks(Integer page, Integer size, TaskStatus status, String name) {
+    @GetMapping
+    public ResponseEntity<PagedResponse<CollectionTaskResponse>> getTasks(@Valid CollectionTaskPagingQuery query) {
-        var list = taskService.list(page, size, status == null ? null : status.getValue(), name);
+        return ResponseEntity.ok(CollectionTaskConverter.INSTANCE.toResponse(taskService.getTasks(query)));
-        PagedCollectionTaskSummary response = new PagedCollectionTaskSummary();
-        response.setContent(list.stream().map(CollectionTaskConverter.INSTANCE::toSummary).collect(Collectors.toList()));
-        response.setNumber(page);
-        response.setSize(size);
-        response.setTotalElements(list.size()); // simplified; a real project would run a separate count query
-        response.setTotalPages(size == null || size == 0 ? 1 : (int) Math.ceil(list.size() * 1.0 / size));
-        return ResponseEntity.ok(response);
-    }
-    @Override
-    public ResponseEntity<PagedDataxTemplates> templatesGet(String sourceType, String targetType,
-                                                            Integer page, Integer size) {
-        int pageNum = page != null ? page : 0;
-        int pageSize = size != null ? size : 20;
-        List<DataxTemplate> templates = taskService.listTemplates(sourceType, targetType, pageNum, pageSize);
-        int totalElements = taskService.countTemplates(sourceType, targetType);
-        PagedDataxTemplates response = new PagedDataxTemplates();
-        response.setContent(templates.stream().map(CollectionTaskConverter.INSTANCE::toTemplateSummary).collect(Collectors.toList()));
-        response.setNumber(pageNum);
-        response.setSize(pageSize);
-        response.setTotalElements(totalElements);
-        response.setTotalPages(pageSize > 0 ? (int) Math.ceil(totalElements * 1.0 / pageSize) : 1);
-        return ResponseEntity.ok(response);
-    }
     }
 }
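With the OpenAPI-generated interface gone, the routes are now declared directly on the controller. A smoke-test sketch against the new paths (base URL and port are assumptions):

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CollectionTaskApiSmokeTest {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // POST /data-collection/tasks -> creates and (for ONCE) immediately runs the task
        HttpRequest create = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8080/data-collection/tasks"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(
                        "{\"name\":\"demo\",\"config\":{\"sourcePath\":\"/mnt/share\"},\"syncMode\":\"ONCE\"}"))
                .build();
        System.out.println(client.send(create, HttpResponse.BodyHandlers.ofString()).body());

        // GET /data-collection/tasks?page=1&size=10&status=RUNNING -> PagedResponse
        HttpRequest list = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8080/data-collection/tasks?page=1&size=10&status=RUNNING"))
                .build();
        System.out.println(client.send(list, HttpResponse.BodyHandlers.ofString()).body());
    }
}
```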


@@ -1,101 +0,0 @@
package com.datamate.collection.interfaces.rest;
import com.datamate.collection.application.service.CollectionTaskService;
import com.datamate.collection.application.service.TaskExecutionService;
import com.datamate.collection.domain.model.TaskExecution;
import com.datamate.collection.interfaces.api.TaskExecutionApi;
import com.datamate.collection.interfaces.dto.PagedTaskExecutions;
import com.datamate.collection.interfaces.dto.TaskExecutionDetail;
import com.datamate.collection.interfaces.dto.TaskExecutionResponse;
import com.datamate.collection.interfaces.dto.TaskStatus; // DTO enum
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.RestController;
import java.util.stream.Collectors;
@RestController
@RequiredArgsConstructor
@Validated
public class TaskExecutionController implements TaskExecutionApi {
private final TaskExecutionService executionService;
private final CollectionTaskService taskService;
private TaskExecutionDetail toDetail(TaskExecution e) {
TaskExecutionDetail d = new TaskExecutionDetail();
d.setId(e.getId());
d.setTaskId(e.getTaskId());
d.setTaskName(e.getTaskName());
if (e.getStatus() != null) { d.setStatus(TaskStatus.fromValue(e.getStatus().name())); }
d.setProgress(e.getProgress());
d.setRecordsTotal(e.getRecordsTotal() != null ? e.getRecordsTotal().intValue() : null);
d.setRecordsProcessed(e.getRecordsProcessed() != null ? e.getRecordsProcessed().intValue() : null);
d.setRecordsSuccess(e.getRecordsSuccess() != null ? e.getRecordsSuccess().intValue() : null);
d.setRecordsFailed(e.getRecordsFailed() != null ? e.getRecordsFailed().intValue() : null);
d.setThroughput(e.getThroughput());
d.setDataSizeBytes(e.getDataSizeBytes() != null ? e.getDataSizeBytes().intValue() : null);
d.setStartedAt(e.getStartedAt());
d.setCompletedAt(e.getCompletedAt());
d.setDurationSeconds(e.getDurationSeconds());
d.setErrorMessage(e.getErrorMessage());
return d;
}
// GET /executions/{id}
@Override
public ResponseEntity<TaskExecutionDetail> executionsIdGet(String id) {
var exec = executionService.get(id);
return exec == null ? ResponseEntity.notFound().build() : ResponseEntity.ok(toDetail(exec));
}
// DELETE /executions/{id}
@Override
public ResponseEntity<Void> executionsIdDelete(String id) {
executionService.stop(id); // Idempotent: the service checks the execution status internally
return ResponseEntity.noContent().build();
}
// POST /tasks/{id}/execute -> 201
@Override
public ResponseEntity<TaskExecutionResponse> tasksIdExecutePost(String id) {
var task = taskService.get(id);
if (task == null) { return ResponseEntity.notFound().build(); }
var latestExec = executionService.getLatestByTaskId(id);
if (latestExec != null && latestExec.getStatus() == com.datamate.collection.domain.model.TaskStatus.RUNNING) {
TaskExecutionResponse r = new TaskExecutionResponse();
r.setId(latestExec.getId());
r.setTaskId(latestExec.getTaskId());
r.setTaskName(latestExec.getTaskName());
r.setStatus(TaskStatus.fromValue(latestExec.getStatus().name()));
r.setStartedAt(latestExec.getStartedAt());
return ResponseEntity.status(HttpStatus.CREATED).body(r); // Return the already-running execution
}
var exec = taskService.startExecution(task);
TaskExecutionResponse r = new TaskExecutionResponse();
r.setId(exec.getId());
r.setTaskId(exec.getTaskId());
r.setTaskName(exec.getTaskName());
r.setStatus(TaskStatus.fromValue(exec.getStatus().name()));
r.setStartedAt(exec.getStartedAt());
return ResponseEntity.status(HttpStatus.CREATED).body(r);
}
// GET /tasks/{id}/executions -> paged
@Override
public ResponseEntity<PagedTaskExecutions> tasksIdExecutionsGet(String id, Integer page, Integer size) {
if (page == null || page < 0) { page = 0; }
if (size == null || size <= 0) { size = 20; }
var list = executionService.list(id, null, null, null, page, size);
long total = executionService.count(id, null, null, null);
PagedTaskExecutions p = new PagedTaskExecutions();
p.setContent(list.stream().map(this::toDetail).collect(Collectors.toList()));
p.setNumber(page);
p.setSize(size);
p.setTotalElements((int) total);
p.setTotalPages(size == 0 ? 1 : (int) Math.ceil(total * 1.0 / size));
return ResponseEntity.ok(p);
}
}

View File

@@ -1,11 +1,10 @@
-package com.datamate.collection.application.scheduler;
+package com.datamate.collection.interfaces.scheduler;
-import com.datamate.collection.application.service.DataxExecutionService;
-import com.datamate.collection.domain.model.CollectionTask;
-import com.datamate.collection.domain.model.TaskStatus;
-import com.datamate.collection.domain.model.TaskExecution;
-import com.datamate.collection.infrastructure.persistence.mapper.CollectionTaskMapper;
-import com.datamate.collection.infrastructure.persistence.mapper.TaskExecutionMapper;
+import com.datamate.collection.application.CollectionTaskService;
+import com.datamate.collection.application.TaskExecutionService;
+import com.datamate.collection.common.enums.TaskStatus;
+import com.datamate.collection.domain.model.entity.CollectionTask;
+import com.datamate.collection.domain.model.entity.TaskExecution;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.scheduling.annotation.Scheduled;
@@ -21,14 +20,13 @@ import java.util.List;
 @RequiredArgsConstructor
 public class TaskSchedulerInitializer {
-    private final CollectionTaskMapper taskMapper;
-    private final TaskExecutionMapper executionMapper;
-    private final DataxExecutionService dataxExecutionService;
+    private final CollectionTaskService collectionTaskService;
+    private final TaskExecutionService taskExecutionService;
     // Periodically scan active collection tasks and use the cron expression to decide whether each is due to run
     @Scheduled(fixedDelayString = "${datamate.data-collection.scheduler.scan-interval-ms:10000}")
     public void scanAndTrigger() {
-        List<CollectionTask> tasks = taskMapper.selectActiveTasks();
+        List<CollectionTask> tasks = collectionTaskService.selectActiveTasks();
         if (tasks == null || tasks.isEmpty()) {
             return;
         }
@@ -40,7 +38,7 @@ public class TaskSchedulerInitializer {
         }
         try {
             // Skip if the most recent execution is still running
-            TaskExecution latest = executionMapper.selectLatestByTaskId(task.getId());
+            TaskExecution latest = taskExecutionService.selectLatestByTaskId(task.getId());
             if (latest != null && latest.getStatus() == TaskStatus.RUNNING) {
                 continue;
             }
@@ -53,9 +51,9 @@ public class TaskSchedulerInitializer {
             if (nextTime != null && !nextTime.isAfter(now)) {
                 // Due: trigger one execution
-                TaskExecution exec = dataxExecutionService.createExecution(task);
+                TaskExecution exec = taskExecutionService.createExecution(task);
                 int timeout = task.getTimeoutSeconds() == null ? 3600 : task.getTimeoutSeconds();
-                dataxExecutionService.runAsync(task, exec.getId(), timeout);
+                taskExecutionService.runAsync(task, exec.getId(), timeout);
                 log.info("Triggered DataX execution for task {} at {}, execId={}", task.getId(), now, exec.getId());
             }
         } catch (Exception ex) {
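The hunk above uses nextTime without showing how it is derived; the scheduler's actual helper is not part of this diff. A minimal standalone sketch of one way to compute it, assuming Spring Framework 5.3+'s CronExpression and a LocalDateTime startedAt field:

import java.time.LocalDateTime;
import org.springframework.scheduling.support.CronExpression;

// Hedged sketch: derive the next fire time from the task's cron expression.
// Basing the calculation on the last execution's start time (when present)
// avoids re-triggering a task that already ran in the current window.
LocalDateTime now = LocalDateTime.now();
CronExpression cron = CronExpression.parse(task.getScheduleExpression());
LocalDateTime base = (latest != null && latest.getStartedAt() != null)
        ? latest.getStartedAt()
        : now.minusSeconds(1);
LocalDateTime nextTime = cron.next(base);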

View File

@@ -1,11 +1,10 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
         "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
 <mapper namespace="com.datamate.collection.infrastructure.persistence.mapper.CollectionTaskMapper">
     <!-- Result Map -->
-    <resultMap id="CollectionTaskResultMap" type="com.datamate.collection.domain.model.CollectionTask">
+    <resultMap id="CollectionTaskResultMap" type="com.datamate.collection.domain.model.entity.CollectionTask">
         <id property="id" column="id"/>
         <result property="name" column="name"/>
         <result property="description" column="description"/>
@@ -24,21 +23,6 @@
        <result property="updatedBy" column="updated_by"/>
    </resultMap>
<!-- Result map (templates) -->
<resultMap id="DataxTemplateResultMap" type="com.datamate.collection.domain.model.DataxTemplate">
<id column="id" property="id" jdbcType="VARCHAR"/>
<result column="name" property="name" jdbcType="VARCHAR"/>
<result column="source_type" property="sourceType" jdbcType="VARCHAR"/>
<result column="target_type" property="targetType" jdbcType="VARCHAR"/>
<result column="template_content" property="templateContent" jdbcType="VARCHAR"/>
<result column="description" property="description" jdbcType="VARCHAR"/>
<result column="version" property="version" jdbcType="VARCHAR"/>
<result column="is_system" property="isSystem" jdbcType="BOOLEAN"/>
<result column="created_at" property="createdAt" jdbcType="TIMESTAMP"/>
<result column="updated_at" property="updatedAt" jdbcType="TIMESTAMP"/>
<result column="created_by" property="createdBy" jdbcType="VARCHAR"/>
</resultMap>
    <!-- Base Column List (tasks) -->
    <sql id="Base_Column_List">
        id,
@@ -47,96 +31,6 @@
        last_execution_id, created_at, updated_at, created_by, updated_by
    </sql>
<!-- Template Column List -->
<sql id="Template_Column_List">
id, name, source_type, target_type, template_content, description, version, is_system, created_at, updated_at, created_by
</sql>
<!-- Insert -->
<insert id="insert" parameterType="com.datamate.collection.domain.model.CollectionTask">
INSERT INTO t_dc_collection_tasks (id, name, description, config, status, sync_mode,
schedule_expression, retry_count, timeout_seconds, max_records, sort_field,
last_execution_id, created_at, updated_at, created_by, updated_by)
VALUES (#{id}, #{name}, #{description}, #{config}, #{status}, #{syncMode},
#{scheduleExpression}, #{retryCount}, #{timeoutSeconds}, #{maxRecords}, #{sortField},
#{lastExecutionId}, #{createdAt}, #{updatedAt}, #{createdBy}, #{updatedBy})
</insert>
<!-- Update -->
<update id="update" parameterType="com.datamate.collection.domain.model.CollectionTask">
UPDATE t_dc_collection_tasks
SET name = #{name},
description = #{description},
config = #{config},
status = #{status},
sync_mode = #{syncMode},
schedule_expression = #{scheduleExpression},
retry_count = #{retryCount},
timeout_seconds = #{timeoutSeconds},
max_records = #{maxRecords},
sort_field = #{sortField},
last_execution_id = #{lastExecutionId},
updated_at = #{updatedAt},
updated_by = #{updatedBy}
WHERE id = #{id}
</update>
<!-- Delete by ID -->
<delete id="deleteById" parameterType="java.lang.String">
DELETE FROM t_dc_collection_tasks WHERE id = #{id}
</delete>
<!-- Select by ID -->
<select id="selectById" parameterType="java.lang.String" resultMap="CollectionTaskResultMap">
SELECT <include refid="Base_Column_List"/> FROM t_dc_collection_tasks WHERE id = #{id}
</select>
<!-- Select by Name -->
<select id="selectByName" parameterType="java.lang.String" resultMap="CollectionTaskResultMap">
SELECT <include refid="Base_Column_List"/> FROM t_dc_collection_tasks WHERE name = #{name}
</select>
<!-- Select by Status -->
<select id="selectByStatus" parameterType="java.lang.String" resultMap="CollectionTaskResultMap">
SELECT <include refid="Base_Column_List"/> FROM t_dc_collection_tasks WHERE status = #{status} ORDER BY created_at DESC
</select>
<!-- Select All with Pagination -->
<select id="selectAll" resultMap="CollectionTaskResultMap">
SELECT <include refid="Base_Column_List"/> FROM t_dc_collection_tasks
<where>
<if test="status != null and status != ''">
AND status = #{status}
</if>
<if test="name != null and name != ''">
AND name LIKE CONCAT('%', #{name}, '%')
</if>
</where>
ORDER BY created_at DESC
<if test="offset != null and limit != null">
LIMIT #{offset}, #{limit}
</if>
</select>
<!-- Count Total -->
<select id="count" resultType="java.lang.Long">
SELECT COUNT(*) FROM t_dc_collection_tasks
<where>
<if test="status != null and status != ''">
AND status = #{status}
</if>
<if test="name != null and name != ''">
AND name LIKE CONCAT('%', #{name}, '%')
</if>
<if test="sourceDataSourceId != null and sourceDataSourceId != ''">
AND source_datasource_id = #{sourceDataSourceId}
</if>
<if test="targetDataSourceId != null and targetDataSourceId != ''">
AND target_datasource_id = #{targetDataSourceId}
</if>
</where>
</select>
    <!-- Update Status -->
    <update id="updateStatus">
        UPDATE t_dc_collection_tasks SET status = #{status}, updated_at = NOW() WHERE id = #{id}
@@ -154,35 +48,4 @@
        AND schedule_expression IS NOT NULL
        ORDER BY created_at DESC
    </select>
<!-- List templates -->
<select id="selectList" resultMap="DataxTemplateResultMap">
SELECT <include refid="Template_Column_List"/> FROM t_dc_datax_templates
<where>
<if test="sourceType != null and sourceType != ''">
AND source_type = #{sourceType}
</if>
<if test="targetType != null and targetType != ''">
AND target_type = #{targetType}
</if>
</where>
ORDER BY is_system DESC, created_at DESC
<if test="limit > 0">
LIMIT #{offset}, #{limit}
</if>
</select>
<!-- Count templates -->
<select id="countTemplates" resultType="java.lang.Integer">
SELECT COUNT(1) FROM t_dc_datax_templates
<where>
<if test="sourceType != null and sourceType != ''">
AND source_type = #{sourceType}
</if>
<if test="targetType != null and targetType != ''">
AND target_type = #{targetType}
</if>
</where>
</select>
</mapper>

View File

@@ -1,191 +1,28 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.datamate.collection.infrastructure.persistence.mapper.TaskExecutionMapper">
<!-- Result Map -->
<resultMap id="TaskExecutionResultMap" type="com.datamate.collection.domain.model.TaskExecution">
<id property="id" column="id"/>
<result property="taskId" column="task_id"/>
<result property="taskName" column="task_name"/>
<result property="status" column="status" typeHandler="org.apache.ibatis.type.EnumTypeHandler"/>
<result property="progress" column="progress"/>
<result property="recordsTotal" column="records_total"/>
<result property="recordsProcessed" column="records_processed"/>
<result property="recordsSuccess" column="records_success"/>
<result property="recordsFailed" column="records_failed"/>
<result property="throughput" column="throughput"/>
<result property="dataSizeBytes" column="data_size_bytes"/>
<result property="startedAt" column="started_at"/>
<result property="completedAt" column="completed_at"/>
<result property="durationSeconds" column="duration_seconds"/>
<result property="errorMessage" column="error_message"/>
<result property="dataxJobId" column="datax_job_id"/>
<result property="config" column="config"/>
<result property="result" column="result"/>
<result property="createdAt" column="created_at"/>
</resultMap>
<!-- Base Column List -->
<sql id="Base_Column_List">
id, task_id, task_name, status, progress, records_total, records_processed,
records_success, records_failed, throughput, data_size_bytes, started_at,
completed_at, duration_seconds, error_message, datax_job_id, config, result, created_at
</sql>
<!-- Insert -->
<insert id="insert" parameterType="com.datamate.collection.domain.model.TaskExecution">
INSERT INTO t_dc_task_executions (
id, task_id, task_name, status, progress, records_total, records_processed,
records_success, records_failed, throughput, data_size_bytes, started_at,
completed_at, duration_seconds, error_message, datax_job_id, config, result, created_at
) VALUES (
#{id}, #{taskId}, #{taskName}, #{status}, #{progress}, #{recordsTotal}, #{recordsProcessed},
#{recordsSuccess}, #{recordsFailed}, #{throughput}, #{dataSizeBytes}, #{startedAt},
#{completedAt}, #{durationSeconds}, #{errorMessage}, #{dataxJobId}, #{config}, #{result}, #{createdAt}
)
</insert>
<!-- Update -->
<update id="update" parameterType="com.datamate.collection.domain.model.TaskExecution">
UPDATE t_dc_task_executions
SET status = #{status},
progress = #{progress},
records_total = #{recordsTotal},
records_processed = #{recordsProcessed},
records_success = #{recordsSuccess},
records_failed = #{recordsFailed},
throughput = #{throughput},
data_size_bytes = #{dataSizeBytes},
completed_at = #{completedAt},
duration_seconds = #{durationSeconds},
error_message = #{errorMessage},
result = #{result}
WHERE id = #{id}
</update>
<!-- Delete by ID -->
<delete id="deleteById" parameterType="java.lang.String">
DELETE FROM t_dc_task_executions WHERE id = #{id}
</delete>
<!-- Select by ID -->
<select id="selectById" parameterType="java.lang.String" resultMap="TaskExecutionResultMap">
SELECT <include refid="Base_Column_List"/>
FROM t_dc_task_executions
WHERE id = #{id}
</select>
<!-- Select by Task ID -->
<select id="selectByTaskId" resultMap="TaskExecutionResultMap">
SELECT <include refid="Base_Column_List"/>
FROM t_dc_task_executions
WHERE task_id = #{taskId}
ORDER BY started_at DESC
<if test="limit != null">
LIMIT #{limit}
</if>
</select>
<!-- Select by Status -->
<select id="selectByStatus" parameterType="java.lang.String" resultMap="TaskExecutionResultMap">
SELECT <include refid="Base_Column_List"/>
FROM t_dc_task_executions
WHERE status = #{status}
ORDER BY started_at DESC
</select>
<!-- Select All with Pagination -->
<select id="selectAll" resultMap="TaskExecutionResultMap">
SELECT <include refid="Base_Column_List"/>
FROM t_dc_task_executions
<where>
<if test="taskId != null and taskId != ''">
AND task_id = #{taskId}
</if>
<if test="status != null and status != ''">
AND status = #{status}
</if>
<if test="startDate != null">
AND started_at >= #{startDate}
</if>
<if test="endDate != null">
AND started_at &lt;= #{endDate}
</if>
</where>
ORDER BY started_at DESC
<if test="offset != null and limit != null">
LIMIT #{offset}, #{limit}
</if>
</select>
<!-- Count Total -->
<select id="count" resultType="java.lang.Long">
SELECT COUNT(*)
FROM t_dc_task_executions
<where>
<if test="taskId != null and taskId != ''">
AND task_id = #{taskId}
</if>
<if test="status != null and status != ''">
AND status = #{status}
</if>
<if test="startDate != null">
AND started_at >= #{startDate}
</if>
<if test="endDate != null">
AND started_at &lt;= #{endDate}
</if>
</where>
</select>
<!-- Update Status and Progress -->
<update id="updateProgress">
UPDATE t_dc_task_executions
SET status = #{status},
progress = #{progress},
records_processed = #{recordsProcessed},
throughput = #{throughput}
WHERE id = #{id}
</update>
<!-- Complete Execution -->
<update id="completeExecution">
UPDATE t_dc_task_executions
SET status = #{status},
progress = 100.00,
completed_at = #{completedAt},
duration_seconds = #{durationSeconds},
records_success = #{recordsSuccess},
records_failed = #{recordsFailed},
data_size_bytes = #{dataSizeBytes},
error_message = #{errorMessage},
result = #{result}
WHERE id = #{id}
</update>
<!-- Select Running Executions -->
<select id="selectRunningExecutions" resultMap="TaskExecutionResultMap">
SELECT <include refid="Base_Column_List"/>
FROM t_dc_task_executions
WHERE status = 'RUNNING'
ORDER BY started_at ASC
</select>
     <!-- Select Latest Execution by Task -->
-    <select id="selectLatestByTaskId" parameterType="java.lang.String" resultMap="TaskExecutionResultMap">
-        SELECT <include refid="Base_Column_List"/>
-        FROM t_dc_task_executions
+    <select id="selectLatestByTaskId" resultType="com.datamate.collection.domain.model.entity.TaskExecution">
+        SELECT * FROM t_dc_task_executions
         WHERE task_id = #{taskId}
         ORDER BY started_at DESC
         LIMIT 1
     </select>
-    <!-- Delete Old Executions -->
-    <delete id="deleteOldExecutions">
-        DELETE FROM t_dc_task_executions
-        WHERE started_at &lt; #{beforeDate}
-    </delete>
+    <!-- Complete Execution -->
+    <update id="completeExecution">
+        UPDATE t_dc_task_executions
+        SET status = #{status},
+            completed_at = #{completedAt},
+            records_processed = #{recordsProcessed},
+            records_total = #{recordsTotal},
+            records_success = #{recordsSuccess},
+            records_failed = #{recordsFailed},
+            error_message = #{errorMessage},
+            updated_at = NOW()
+        WHERE id = #{executionId}
+    </update>
 </mapper>
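Note that the new selectLatestByTaskId relies on resultType auto-mapping instead of the deleted resultMap: with SELECT *, snake_case columns such as records_processed only land on camelCase entity fields if MyBatis' map-underscore-to-camel-case setting is enabled, which this diff presumably relies on being configured elsewhere.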

View File

@@ -2,6 +2,7 @@ package com.datamate.datamanagement.application;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.datamate.common.domain.utils.ChunksSaver;
 import com.datamate.datamanagement.interfaces.dto.*;
 import com.datamate.common.infrastructure.exception.BusinessAssert;
 import com.datamate.common.interfaces.PagedResponse;
@@ -100,8 +101,13 @@ public class DatasetApplicationService {
     /**
      * Delete a dataset
      */
+    @Transactional
     public void deleteDataset(String datasetId) {
+        Dataset dataset = datasetRepository.getById(datasetId);
         datasetRepository.removeById(datasetId);
+        if (dataset != null) {
+            ChunksSaver.deleteFolder(dataset.getPath());
+        }
     }
     /**

View File

@@ -24,7 +24,6 @@ public interface DatasetFileMapper extends BaseMapper<DatasetFile> {
             @Param("status") String status,
             RowBounds rowBounds);
-    int insert(DatasetFile file);
     int update(DatasetFile file);
     int deleteById(@Param("id") String id);
 }

Binary file not shown (removed image; was 143 KiB)

Binary file not shown (removed image; was 45 KiB)

View File

@@ -60,7 +60,7 @@ public class FileService {
         boolean isFinish = Objects.equals(preRequest.getUploadedFileNum(), preRequest.getTotalFileNum());
         if (isFinish) {
             // Delete the temporary directory that stores the chunks
-            ChunksSaver.deleteFiles(new File(preRequest.getUploadPath(),
+            ChunksSaver.deleteFolder(new File(preRequest.getUploadPath(),
                     String.format(ChunksSaver.TEMP_DIR_NAME_FORMAT, preRequest.getId())).getPath());
             chunkUploadRequestMapper.deleteById(preRequest.getId());
         }

View File

@@ -2,6 +2,8 @@ package com.datamate.common.domain.utils;
 import com.datamate.common.domain.model.ChunkUploadPreRequest;
 import com.datamate.common.domain.model.ChunkUploadRequest;
+import com.datamate.common.infrastructure.exception.BusinessException;
+import com.datamate.common.infrastructure.exception.SystemErrorCode;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.io.FileUtils;
 import org.springframework.web.multipart.MultipartFile;
@@ -106,29 +108,17 @@ public class ChunksSaver {
      *
      * @param uploadPath path of the folder to delete
      */
-    public static void deleteFiles(String uploadPath) {
-        File dic = new File(uploadPath);
-        if (!dic.exists()) {
-            return;
-        }
-        File[] files = dic.listFiles();
-        if (files == null || files.length == 0) {
-            dic.delete();
-            return;
-        }
-        try {
-            for (File file : files) {
-                if (file.isDirectory()) {
-                    deleteFiles(file.getPath());
-                } else {
-                    file.delete();
-                }
-            }
-            if (dic.exists()) {
-                dic.delete();
-            }
-        } catch (SecurityException e) {
-            log.warn("Fail to delete file", e);
-        }
+    public static void deleteFolder(String uploadPath) {
+        File folder = new File(uploadPath);
+        if (!folder.exists()) {
+            log.info("folder {} does not exist", uploadPath);
+            return;
+        }
+        try {
+            FileUtils.deleteDirectory(folder);
+        } catch (IOException e) {
+            throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR);
+        }
     }
 }
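For reference, a minimal standalone sketch of the commons-io call the rewrite delegates to; the path here is hypothetical:

import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;

public class DeleteFolderDemo {
    public static void main(String[] args) throws IOException {
        File folder = new File("/tmp/chunks-demo"); // hypothetical path
        FileUtils.forceMkdir(folder);               // create the directory tree
        FileUtils.deleteDirectory(folder);          // recursive delete; throws IOException on failure
        System.out.println(folder.exists());        // prints: false
    }
}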

View File

@@ -0,0 +1,23 @@
# Patterns to ignore when building packages.
# This supports shell glob matching, relative path matching, and
# negation (prefixed with !). Only one pattern per line.
.DS_Store
# Common VCS dirs
.git/
.gitignore
.bzr/
.bzrignore
.hg/
.hgignore
.svn/
# Common backup files
*.swp
*.bak
*.tmp
*.orig
*~
# Various IDEs
.project
.idea/
*.tmproj
.vscode/

View File

@@ -0,0 +1,24 @@
apiVersion: v2
name: datamate
description: A Helm chart for Kubernetes
# A chart can be either an 'application' or a 'library' chart.
#
# Application charts are a collection of templates that can be packaged into versioned archives
# to be deployed.
#
# Library charts provide useful utilities or functions for the chart developer. They're included as
# a dependency of application charts to inject those utilities and functions into the rendering
# pipeline. Library charts do not define any templates and therefore cannot be deployed.
type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.0.1
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "0.0.1"

View File

@@ -0,0 +1,23 @@
# Patterns to ignore when building packages.
# This supports shell glob matching, relative path matching, and
# negation (prefixed with !). Only one pattern per line.
.DS_Store
# Common VCS dirs
.git/
.gitignore
.bzr/
.bzrignore
.hg/
.hgignore
.svn/
# Common backup files
*.swp
*.bak
*.tmp
*.orig
*~
# Various IDEs
.project
.idea/
*.tmproj
.vscode/

View File

@@ -0,0 +1,29 @@
apiVersion: v2
name: backend
description: A Helm chart for Kubernetes
# A chart can be either an 'application' or a 'library' chart.
#
# Application charts are a collection of templates that can be packaged into versioned archives
# to be deployed.
#
# Library charts provide useful utilities or functions for the chart developer. They're included as
# a dependency of application charts to inject those utilities and functions into the rendering
# pipeline. Library charts do not define any templates and therefore cannot be deployed.
type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.0.1
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "0.0.1"
dependencies:
- name: database
repository: file://../database
version: 0.0.1

View File

@@ -0,0 +1,75 @@
{{/*
Expand the name of the chart.
*/}}
{{- define "backend.name" -}}
{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" -}}
{{- end }}
{{/*
Create a default fully qualified app name.
We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec).
If release name contains chart name it will be used as a full name.
*/}}
{{- define "backend.fullname" -}}
{{- if .Values.fullnameOverride }}
{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" -}}
{{- else }}
{{- $name := default .Chart.Name .Values.nameOverride }}
{{- if contains $name .Release.Name }}
{{- .Release.Name | trunc 63 | trimSuffix "-" -}}
{{- else }}
{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" -}}
{{- end }}
{{- end }}
{{- end }}
{{/*
Create chart name and version as used by the chart label.
*/}}
{{- define "backend.chart" -}}
{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" -}}
{{- end }}
{{/*
Common labels
*/}}
{{- define "backend.labels" -}}
helm.sh/chart: {{ include "backend.chart" . }}
{{ include "backend.selectorLabels" . }}
{{- if .Chart.AppVersion }}
app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
{{- end }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
{{- end }}
{{/*
Selector labels
*/}}
{{- define "backend.selectorLabels" -}}
app.kubernetes.io/name: {{ include "backend.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
{{- end }}
{{/*
Create the name of the service account to use
*/}}
{{- define "backend.serviceAccountName" -}}
{{- if .Values.serviceAccount.create }}
{{- default (include "backend.fullname" .) .Values.serviceAccount.name -}}
{{- else }}
{{- default "default" .Values.serviceAccount.name -}}
{{- end }}
{{- end }}
{{/*
Name of image
*/}}
{{- define "backend.image" -}}
{{- $name := default .Values.image.repository .Values.global.image.backend.name }}
{{- $tag := default .Values.image.tag .Values.global.image.backend.tag }}
{{- if .Values.global.image.repository }}
{{- .Values.global.image.repository | trimSuffix "/" }}/{{ $name }}:{{ $tag }}
{{- else }}
{{- $name }}:{{ $tag }}
{{- end }}
{{- end }}
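In practice this helper gives global values precedence: sprig's default returns its second argument when non-empty, so global.image.backend.name and .tag override the chart-local image.repository and image.tag. With global.image.repository set to, say, registry.example.com (a hypothetical value) the helper renders registry.example.com/datamate-backend:latest; with it left empty, just datamate-backend:latest.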

View File

@@ -0,0 +1,82 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ include "backend.fullname" . }}
labels:
{{- include "backend.labels" . | nindent 4 }}
spec:
{{- if not .Values.autoscaling.enabled }}
replicas: {{ .Values.replicaCount }}
{{- end }}
selector:
matchLabels:
{{- include "backend.selectorLabels" . | nindent 6 }}
template:
metadata:
{{- with .Values.podAnnotations }}
annotations:
{{- toYaml . | nindent 8 }}
{{- end }}
labels:
{{- include "backend.labels" . | nindent 8 }}
{{- with .Values.podLabels }}
{{- toYaml . | nindent 8 }}
{{- end }}
spec:
{{- with .Values.imagePullSecrets }}
imagePullSecrets:
{{- toYaml . | nindent 8 }}
{{- end }}
serviceAccountName: {{ include "backend.serviceAccountName" . }}
{{- with .Values.podSecurityContext }}
securityContext:
{{- toYaml . | nindent 8 }}
{{- end }}
containers:
- name: {{ .Chart.Name }}
{{- with .Values.securityContext }}
securityContext:
{{- toYaml . | nindent 12 }}
{{- end }}
image: "{{ include "backend.image" . }}"
imagePullPolicy: {{ default .Values.global.image.pullPolicy .Values.image.pullPolicy }}
ports:
- name: http
containerPort: {{ .Values.service.port }}
protocol: TCP
{{- with .Values.livenessProbe }}
livenessProbe:
{{- toYaml . | nindent 12 }}
{{- end }}
{{- with .Values.readinessProbe }}
readinessProbe:
{{- toYaml . | nindent 12 }}
{{- end }}
{{- with .Values.resources }}
resources:
{{- toYaml . | nindent 12 }}
{{- end }}
{{- with .Values.env }}
env:
{{- toYaml . | nindent 12 }}
{{- end }}
{{- with .Values.volumeMounts }}
volumeMounts:
{{- toYaml . | nindent 12 }}
{{- end }}
{{- with .Values.volumes }}
volumes:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.affinity }}
affinity:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.tolerations }}
tolerations:
{{- toYaml . | nindent 8 }}
{{- end }}

View File

@@ -0,0 +1,15 @@
apiVersion: v1
kind: Service
metadata:
name: {{ include "backend.fullname" . }}
labels:
{{- include "backend.labels" . | nindent 4 }}
spec:
type: {{ .Values.service.type }}
ports:
- port: {{ .Values.service.port }}
targetPort: {{ .Values.service.port }}
protocol: TCP
name: {{ .Chart.Name }}
selector:
{{- include "backend.selectorLabels" . | nindent 4 }}

View File

@@ -0,0 +1,13 @@
{{- if .Values.serviceAccount.create -}}
apiVersion: v1
kind: ServiceAccount
metadata:
name: {{ include "backend.serviceAccountName" . }}
labels:
{{- include "backend.labels" . | nindent 4 }}
{{- with .Values.serviceAccount.annotations }}
annotations:
{{- toYaml . | nindent 4 }}
{{- end }}
automountServiceAccountToken: {{ .Values.serviceAccount.automount }}
{{- end }}

View File

@@ -0,0 +1,114 @@
# Default values for datamate.
# This is a YAML-formatted file.
# Declare variables to be passed into your templates.
# This will set the replicaset count more information can be found here: https://kubernetes.io/docs/concepts/workloads/controllers/replicaset/
replicaCount: 1
# This sets the container image more information can be found here: https://kubernetes.io/docs/concepts/containers/images/
image:
repository: "datamate-backend"
# This sets the pull policy for images.
pullPolicy: "IfNotPresent"
# Overrides the image tag whose default is the chart appVersion.
tag: "latest"
# This is for the secrets for pulling an image from a private repository more information can be found here: https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/
imagePullSecrets: []
# This is to override the chart name.
nameOverride: "datamate-backend"
fullnameOverride: "datamate-backend"
env:
- name: namespace
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: SPRING_CONFIG_LOCATION
value: file:/opt/backend/application.yml
# This section builds out the service account more information can be found here: https://kubernetes.io/docs/concepts/security/service-accounts/
serviceAccount:
# Specifies whether a service account should be created
create: true
# Automatically mount a ServiceAccount's API credentials?
automount: true
# Annotations to add to the service account
annotations: {}
# The name of the service account to use.
# If not set and create is true, a name is generated using the fullname template
name: ""
# This is for setting Kubernetes Annotations to a Pod.
# For more information checkout: https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/
podAnnotations: {}
# This is for setting Kubernetes Labels to a Pod.
# For more information checkout: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/
podLabels: {}
podSecurityContext: {}
# fsGroup: 2000
securityContext: {}
# capabilities:
# drop:
# - ALL
# readOnlyRootFilesystem: true
# runAsNonRoot: true
# runAsUser: 1000
# This is for setting up a service more information can be found here: https://kubernetes.io/docs/concepts/services-networking/service/
service:
# This sets the service type more information can be found here: https://kubernetes.io/docs/concepts/services-networking/service/#publishing-services-service-types
type: ClusterIP
# This sets the ports more information can be found here: https://kubernetes.io/docs/concepts/services-networking/service/#field-spec-ports
port: 8080
resources: {}
# We usually recommend not to specify default resources and to leave this as a conscious
# choice for the user. This also increases chances charts run on environments with little
# resources, such as Minikube. If you do want to specify resources, uncomment the following
# lines, adjust them as necessary, and remove the curly braces after 'resources:'.
# limits:
# cpu: 100m
# memory: 128Mi
# requests:
# cpu: 100m
# memory: 128Mi
# This is to setup the liveness and readiness probes more information can be found here: https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/
# livenessProbe:
# httpGet:
# path: /
# port: http
# readinessProbe:
# httpGet:
# path: /
# port: http
# This section is for setting up autoscaling more information can be found here: https://kubernetes.io/docs/concepts/workloads/autoscaling/
autoscaling:
enabled: false
minReplicas: 1
maxReplicas: 100
targetCPUUtilizationPercentage: 80
# targetMemoryUtilizationPercentage: 80
# Additional volumes on the output Deployment definition.
volumes: []
# - name: foo
# secret:
# secretName: mysecret
# optional: false
# Additional volumeMounts on the output Deployment definition.
volumeMounts: []
# - name: foo
# mountPath: "/etc/foo"
# readOnly: true
nodeSelector: {}
tolerations: []
affinity: {}

View File

@@ -0,0 +1,23 @@
# Patterns to ignore when building packages.
# This supports shell glob matching, relative path matching, and
# negation (prefixed with !). Only one pattern per line.
.DS_Store
# Common VCS dirs
.git/
.gitignore
.bzr/
.bzrignore
.hg/
.hgignore
.svn/
# Common backup files
*.swp
*.bak
*.tmp
*.orig
*~
# Various IDEs
.project
.idea/
*.tmproj
.vscode/

View File

@@ -0,0 +1,24 @@
apiVersion: v2
name: database
description: A Helm chart for Kubernetes
# A chart can be either an 'application' or a 'library' chart.
#
# Application charts are a collection of templates that can be packaged into versioned archives
# to be deployed.
#
# Library charts provide useful utilities or functions for the chart developer. They're included as
# a dependency of application charts to inject those utilities and functions into the rendering
# pipeline. Library charts do not define any templates and therefore cannot be deployed.
type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.0.1
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "0.0.1"

View File

@@ -0,0 +1,75 @@
{{/*
Expand the name of the chart.
*/}}
{{- define "database.name" -}}
{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" -}}
{{- end }}
{{/*
Create a default fully qualified app name.
We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec).
If release name contains chart name it will be used as a full name.
*/}}
{{- define "database.fullname" -}}
{{- if .Values.fullnameOverride }}
{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" -}}
{{- else }}
{{- $name := default .Chart.Name .Values.nameOverride -}}
{{- if contains $name .Release.Name }}
{{- .Release.Name | trunc 63 | trimSuffix "-" -}}
{{- else }}
{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" -}}
{{- end }}
{{- end }}
{{- end }}
{{/*
Create chart name and version as used by the chart label.
*/}}
{{- define "database.chart" -}}
{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" -}}
{{- end }}
{{/*
Common labels
*/}}
{{- define "database.labels" -}}
helm.sh/chart: {{ include "database.chart" . }}
{{ include "database.selectorLabels" . }}
{{- if .Chart.AppVersion }}
app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
{{- end }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
{{- end }}
{{/*
Selector labels
*/}}
{{- define "database.selectorLabels" -}}
app.kubernetes.io/name: {{ include "database.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
{{- end }}
{{/*
Create the name of the service account to use
*/}}
{{- define "database.serviceAccountName" -}}
{{- if .Values.serviceAccount.create }}
{{- default (include "database.fullname" .) .Values.serviceAccount.name -}}
{{- else }}
{{- default "default" .Values.serviceAccount.name -}}
{{- end }}
{{- end }}
{{/*
Name of image
*/}}
{{- define "database.image" -}}
{{- $name := default .Values.image.repository .Values.global.image.database.name }}
{{- $tag := default .Values.image.tag .Values.global.image.database.tag }}
{{- if .Values.global.image.repository }}
{{- .Values.global.image.repository | trimSuffix "/" }}/{{ $name }}:{{ $tag }}
{{- else }}
{{- $name }}:{{ $tag }}
{{- end }}
{{- end }}

View File

@@ -0,0 +1,22 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: datamate-mysql-utf8-config
data:
  utf8.cnf: |
    [mysqld]
    # Set the server default character set to utf8mb4 (recommended: full UTF-8 support, including emoji)
    character-set-server = utf8mb4
    # Set the default collation
    collation-server = utf8mb4_unicode_ci
    # utf8mb4_general_ci is an alternative (slightly faster, but with looser sorting rules)
    default-time-zone = 'Asia/Shanghai'
    log_error=/var/log/datamate/database/error.log
    [client]
    # Default character set for client connections
    default-character-set = utf8mb4
    [mysql]
    # Default character set for the mysql command-line client
    default-character-set = utf8mb4
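Once the pod is up, the effective settings can be verified from a mysql client with SHOW VARIABLES LIKE 'character_set%'; and SHOW VARIABLES LIKE 'collation%';.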

View File

@@ -0,0 +1,82 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ include "database.fullname" . }}
labels:
{{- include "database.labels" . | nindent 4 }}
spec:
{{- if not .Values.autoscaling.enabled }}
replicas: {{ .Values.replicaCount }}
{{- end }}
selector:
matchLabels:
{{- include "database.selectorLabels" . | nindent 6 }}
template:
metadata:
{{- with .Values.podAnnotations }}
annotations:
{{- toYaml . | nindent 8 }}
{{- end }}
labels:
{{- include "database.labels" . | nindent 8 }}
{{- with .Values.podLabels }}
{{- toYaml . | nindent 8 }}
{{- end }}
spec:
{{- with .Values.imagePullSecrets }}
imagePullSecrets:
{{- toYaml . | nindent 8 }}
{{- end }}
serviceAccountName: {{ include "database.serviceAccountName" . }}
{{- with .Values.podSecurityContext }}
securityContext:
{{- toYaml . | nindent 8 }}
{{- end }}
containers:
- name: {{ .Chart.Name }}
{{- with .Values.securityContext }}
securityContext:
{{- toYaml . | nindent 12 }}
{{- end }}
image: "{{ include "database.image" . }}"
imagePullPolicy: {{ default .Values.global.image.pullPolicy .Values.image.pullPolicy }}
ports:
- name: http
containerPort: {{ .Values.service.port }}
protocol: TCP
{{- with .Values.livenessProbe }}
livenessProbe:
{{- toYaml . | nindent 12 }}
{{- end }}
{{- with .Values.readinessProbe }}
readinessProbe:
{{- toYaml . | nindent 12 }}
{{- end }}
{{- with .Values.resources }}
resources:
{{- toYaml . | nindent 12 }}
{{- end }}
{{- with .Values.env }}
env:
{{- toYaml . | nindent 12 }}
{{- end }}
{{- with .Values.volumeMounts }}
volumeMounts:
{{- toYaml . | nindent 12 }}
{{- end }}
{{- with .Values.volumes }}
volumes:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.affinity }}
affinity:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.tolerations }}
tolerations:
{{- toYaml . | nindent 8 }}
{{- end }}

View File

@@ -0,0 +1,15 @@
apiVersion: v1
kind: Service
metadata:
name: {{ include "database.fullname" . }}
labels:
{{- include "database.labels" . | nindent 4 }}
spec:
type: {{ .Values.service.type }}
ports:
- port: {{ .Values.service.port }}
targetPort: {{ .Values.service.port }}
protocol: TCP
name: {{ .Chart.Name }}
selector:
{{- include "database.selectorLabels" . | nindent 4 }}

View File

@@ -0,0 +1,13 @@
{{- if .Values.serviceAccount.create -}}
apiVersion: v1
kind: ServiceAccount
metadata:
name: {{ include "database.serviceAccountName" . }}
labels:
{{- include "database.labels" . | nindent 4 }}
{{- with .Values.serviceAccount.annotations }}
annotations:
{{- toYaml . | nindent 4 }}
{{- end }}
automountServiceAccountToken: {{ .Values.serviceAccount.automount }}
{{- end }}

View File

@@ -0,0 +1,110 @@
# Default values for datamate.
# This is a YAML-formatted file.
# Declare variables to be passed into your templates.
# This will set the replicaset count more information can be found here: https://kubernetes.io/docs/concepts/workloads/controllers/replicaset/
replicaCount: 1
# This sets the container image more information can be found here: https://kubernetes.io/docs/concepts/containers/images/
image:
repository: "mysql"
# This sets the pull policy for images.
pullPolicy: "IfNotPresent"
# Overrides the image tag whose default is the chart appVersion.
tag: "8"
# This is for the secrets for pulling an image from a private repository more information can be found here: https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/
imagePullSecrets: []
# This is to override the chart name.
nameOverride: "datamate-database"
fullnameOverride: "datamate-database"
env:
- name: MYSQL_ROOT_PASSWORD
value: "Huawei@123"
# This section builds out the service account more information can be found here: https://kubernetes.io/docs/concepts/security/service-accounts/
serviceAccount:
# Specifies whether a service account should be created
create: true
# Automatically mount a ServiceAccount's API credentials?
automount: true
# Annotations to add to the service account
annotations: {}
# The name of the service account to use.
# If not set and create is true, a name is generated using the fullname template
name: ""
# This is for setting Kubernetes Annotations to a Pod.
# For more information checkout: https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/
podAnnotations: {}
# This is for setting Kubernetes Labels to a Pod.
# For more information checkout: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/
podLabels: {}
podSecurityContext: {}
# fsGroup: 2000
securityContext: {}
# capabilities:
# drop:
# - ALL
# readOnlyRootFilesystem: true
# runAsNonRoot: true
# runAsUser: 1000
# This is for setting up a service more information can be found here: https://kubernetes.io/docs/concepts/services-networking/service/
service:
# This sets the service type more information can be found here: https://kubernetes.io/docs/concepts/services-networking/service/#publishing-services-service-types
type: ClusterIP
# This sets the ports more information can be found here: https://kubernetes.io/docs/concepts/services-networking/service/#field-spec-ports
port: 3306
resources: {}
# We usually recommend not to specify default resources and to leave this as a conscious
# choice for the user. This also increases chances charts run on environments with little
# resources, such as Minikube. If you do want to specify resources, uncomment the following
# lines, adjust them as necessary, and remove the curly braces after 'resources:'.
# limits:
# cpu: 100m
# memory: 128Mi
# requests:
# cpu: 100m
# memory: 128Mi
# This is to setup the liveness and readiness probes more information can be found here: https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/
# livenessProbe:
# httpGet:
# path: /
# port: http
# readinessProbe:
# httpGet:
# path: /
# port: http
# This section is for setting up autoscaling more information can be found here: https://kubernetes.io/docs/concepts/workloads/autoscaling/
autoscaling:
enabled: false
minReplicas: 1
maxReplicas: 100
targetCPUUtilizationPercentage: 80
# targetMemoryUtilizationPercentage: 80
# Additional volumes on the output Deployment definition.
volumes: []
# - name: foo
# secret:
# secretName: mysecret
# optional: false
# Additional volumeMounts on the output Deployment definition.
volumeMounts: []
# - name: foo
# mountPath: "/etc/foo"
# readOnly: true
nodeSelector: {}
tolerations: []
affinity: {}

View File

@@ -0,0 +1,23 @@
# Patterns to ignore when building packages.
# This supports shell glob matching, relative path matching, and
# negation (prefixed with !). Only one pattern per line.
.DS_Store
# Common VCS dirs
.git/
.gitignore
.bzr/
.bzrignore
.hg/
.hgignore
.svn/
# Common backup files
*.swp
*.bak
*.tmp
*.orig
*~
# Various IDEs
.project
.idea/
*.tmproj
.vscode/

View File

@@ -0,0 +1,29 @@
apiVersion: v2
name: frontend
description: A Helm chart for Kubernetes
# A chart can be either an 'application' or a 'library' chart.
#
# Application charts are a collection of templates that can be packaged into versioned archives
# to be deployed.
#
# Library charts provide useful utilities or functions for the chart developer. They're included as
# a dependency of application charts to inject those utilities and functions into the rendering
# pipeline. Library charts do not define any templates and therefore cannot be deployed.
type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.0.1
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "0.0.1"
dependencies:
- name: backend
repository: file://../backend
version: 0.0.1

View File

@@ -0,0 +1,75 @@
{{/*
Expand the name of the chart.
*/}}
{{- define "frontend.name" -}}
{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" -}}
{{- end }}
{{/*
Create a default fully qualified app name.
We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec).
If release name contains chart name it will be used as a full name.
*/}}
{{- define "frontend.fullname" -}}
{{- if .Values.fullnameOverride }}
{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" -}}
{{- else }}
{{- $name := default .Chart.Name .Values.nameOverride }}
{{- if contains $name .Release.Name }}
{{- .Release.Name | trunc 63 | trimSuffix "-" -}}
{{- else }}
{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" -}}
{{- end }}
{{- end }}
{{- end }}
{{/*
Create chart name and version as used by the chart label.
*/}}
{{- define "frontend.chart" -}}
{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" -}}
{{- end }}
{{/*
Common labels
*/}}
{{- define "frontend.labels" -}}
helm.sh/chart: {{ include "frontend.chart" . }}
{{ include "frontend.selectorLabels" . }}
{{- if .Chart.AppVersion }}
app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
{{- end }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
{{- end }}
{{/*
Selector labels
*/}}
{{- define "frontend.selectorLabels" -}}
app.kubernetes.io/name: {{ include "frontend.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
{{- end }}
{{/*
Create the name of the service account to use
*/}}
{{- define "frontend.serviceAccountName" -}}
{{- if .Values.serviceAccount.create }}
{{- default (include "frontend.fullname" .) .Values.serviceAccount.name -}}
{{- else }}
{{- default "default" .Values.serviceAccount.name -}}
{{- end }}
{{- end }}
{{/*
Name of image
*/}}
{{- define "frontend.image" -}}
{{- $name := default .Values.image.repository .Values.global.image.frontend.name }}
{{- $tag := default .Values.image.tag .Values.global.image.frontend.tag }}
{{- if .Values.global.image.repository }}
{{- .Values.global.image.repository | trimSuffix "/" }}/{{ $name }}:{{ $tag }}
{{- else }}
{{- $name }}:{{ $tag }}
{{- end }}
{{- end }}

View File

@@ -0,0 +1,82 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ include "frontend.fullname" . }}
labels:
{{- include "frontend.labels" . | nindent 4 }}
spec:
{{- if not .Values.autoscaling.enabled }}
replicas: {{ .Values.replicaCount }}
{{- end }}
selector:
matchLabels:
{{- include "frontend.selectorLabels" . | nindent 6 }}
template:
metadata:
{{- with .Values.podAnnotations }}
annotations:
{{- toYaml . | nindent 8 }}
{{- end }}
labels:
{{- include "frontend.labels" . | nindent 8 }}
{{- with .Values.podLabels }}
{{- toYaml . | nindent 8 }}
{{- end }}
spec:
{{- with .Values.imagePullSecrets }}
imagePullSecrets:
{{- toYaml . | nindent 8 }}
{{- end }}
serviceAccountName: {{ include "frontend.serviceAccountName" . }}
{{- with .Values.podSecurityContext }}
securityContext:
{{- toYaml . | nindent 8 }}
{{- end }}
containers:
- name: {{ .Chart.Name }}
{{- with .Values.securityContext }}
securityContext:
{{- toYaml . | nindent 12 }}
{{- end }}
image: "{{ include "frontend.image" . }}"
imagePullPolicy: {{ default .Values.global.image.pullPolicy .Values.image.pullPolicy }}
ports:
- name: http
containerPort: {{ .Values.service.port }}
protocol: TCP
{{- with .Values.livenessProbe }}
livenessProbe:
{{- toYaml . | nindent 12 }}
{{- end }}
{{- with .Values.readinessProbe }}
readinessProbe:
{{- toYaml . | nindent 12 }}
{{- end }}
{{- with .Values.resources }}
resources:
{{- toYaml . | nindent 12 }}
{{- end }}
{{- with .Values.env }}
env:
{{- toYaml . | nindent 12 }}
{{- end }}
{{- with .Values.volumeMounts }}
volumeMounts:
{{- toYaml . | nindent 12 }}
{{- end }}
{{- with .Values.volumes }}
volumes:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.affinity }}
affinity:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.tolerations }}
tolerations:
{{- toYaml . | nindent 8 }}
{{- end }}

View File

@@ -0,0 +1,18 @@
apiVersion: v1
kind: Service
metadata:
name: {{ include "frontend.fullname" . }}
labels:
{{- include "frontend.labels" . | nindent 4 }}
spec:
type: {{ .Values.service.type }}
ports:
- port: {{ .Values.service.port }}
targetPort: {{ .Values.service.port }}
protocol: TCP
name: {{ .Chart.Name }}
{{- if eq .Values.service.type "NodePort" }}
nodePort: {{ .Values.service.nodePort }}
{{- end }}
selector:
{{- include "frontend.selectorLabels" . | nindent 4 }}
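The nodePort field is only emitted when service.type is NodePort, as the guard above enforces; the default of 30000 set in the frontend values.yaml sits at the bottom of Kubernetes' default NodePort range (30000-32767).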

View File

@@ -0,0 +1,13 @@
{{- if .Values.serviceAccount.create -}}
apiVersion: v1
kind: ServiceAccount
metadata:
name: {{ include "frontend.serviceAccountName" . }}
labels:
{{- include "frontend.labels" . | nindent 4 }}
{{- with .Values.serviceAccount.annotations }}
annotations:
{{- toYaml . | nindent 4 }}
{{- end }}
automountServiceAccountToken: {{ .Values.serviceAccount.automount }}
{{- end }}

View File

@@ -0,0 +1,109 @@
# Default values for datamate.
# This is a YAML-formatted file.
# Declare variables to be passed into your templates.
# This will set the replicaset count more information can be found here: https://kubernetes.io/docs/concepts/workloads/controllers/replicaset/
replicaCount: 1
# This sets the container image more information can be found here: https://kubernetes.io/docs/concepts/containers/images/
image:
repository: "datamate-frontend"
# This sets the pull policy for images.
pullPolicy: "IfNotPresent"
# Overrides the image tag whose default is the chart appVersion.
tag: "latest"
# This is for the secrets for pulling an image from a private repository more information can be found here: https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/
imagePullSecrets: []
# This is to override the chart name.
nameOverride: "datamate-frontend"
fullnameOverride: "datamate-frontend"
env: []
# This section builds out the service account more information can be found here: https://kubernetes.io/docs/concepts/security/service-accounts/
serviceAccount:
# Specifies whether a service account should be created
create: true
# Automatically mount a ServiceAccount's API credentials?
automount: true
# Annotations to add to the service account
annotations: {}
# The name of the service account to use.
# If not set and create is true, a name is generated using the fullname template
name: ""
# This is for setting Kubernetes Annotations to a Pod.
# For more information checkout: https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/
podAnnotations: {}
# This is for setting Kubernetes Labels to a Pod.
# For more information checkout: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/
podLabels: {}
podSecurityContext: {}
# fsGroup: 2000
securityContext: {}
# capabilities:
# drop:
# - ALL
# readOnlyRootFilesystem: true
# runAsNonRoot: true
# runAsUser: 1000
# This is for setting up a service more information can be found here: https://kubernetes.io/docs/concepts/services-networking/service/
service:
# This sets the service type more information can be found here: https://kubernetes.io/docs/concepts/services-networking/service/#publishing-services-service-types
type: NodePort
# This sets the ports more information can be found here: https://kubernetes.io/docs/concepts/services-networking/service/#field-spec-ports
port: 80
nodePort: 30000
resources: {}
# We usually recommend not to specify default resources and to leave this as a conscious
# choice for the user. This also increases chances charts run on environments with little
# resources, such as Minikube. If you do want to specify resources, uncomment the following
# lines, adjust them as necessary, and remove the curly braces after 'resources:'.
# limits:
# cpu: 100m
# memory: 128Mi
# requests:
# cpu: 100m
# memory: 128Mi
# This is to setup the liveness and readiness probes more information can be found here: https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/
# livenessProbe:
# httpGet:
# path: /
# port: http
# readinessProbe:
# httpGet:
# path: /
# port: http
# This section is for setting up autoscaling more information can be found here: https://kubernetes.io/docs/concepts/workloads/autoscaling/
autoscaling:
enabled: false
minReplicas: 1
maxReplicas: 100
targetCPUUtilizationPercentage: 80
# targetMemoryUtilizationPercentage: 80
# Additional volumes on the output Deployment definition.
volumes: []
# - name: foo
# secret:
# secretName: mysecret
# optional: false
# Additional volumeMounts on the output Deployment definition.
volumeMounts: []
# - name: foo
# mountPath: "/etc/foo"
# readOnly: true
nodeSelector: {}
tolerations: []
affinity: {}

View File

@@ -0,0 +1,83 @@
# Default values for datamate.
# This is a YAML-formatted file.
# Declare variables to be passed into your templates.
global:
image:
repository: ""
pullPolicy: "IfNotPresent"
backend:
name: "datamate-backend"
tag: "latest"
frontend:
name: "datamate-frontend"
tag: "latest"
database:
name: "mysql"
tag: "8"
datasetVolume: &datasetVolume
name: dataset-volume
hostPath:
path: /opt/datamate/data/dataset
type: DirectoryOrCreate
flowVolume: &flowVolume
name: flow-volume
hostPath:
path: /opt/datamate/data/flow
type: DirectoryOrCreate
logVolume: &logVolume
name: log-volume
hostPath:
path: /opt/datamate/data/log
type: DirectoryOrCreate
dataVolume: &dataVolume
name: data-volume
hostPath:
path: /opt/datamate/data/mysql
type: DirectoryOrCreate
backend:
volumes:
- *datasetVolume
- *flowVolume
- *logVolume
volumeMounts:
- name: dataset-volume
mountPath: /dataset
- name: flow-volume
mountPath: /flow
- name: log-volume
mountPath: /var/log/datamate
frontend:
volumes:
- *logVolume
volumeMounts:
- name: log-volume
mountPath: /var/log/datamate/frontend
subPath: frontend
database:
volumes:
- *dataVolume
- *logVolume
- name: init-sql
configMap:
name: datamate-init-sql
- name: mysql-utf8-config
configMap:
name: datamate-mysql-utf8-config
volumeMounts:
- name: data-volume
mountPath: /var/lib/mysql
- name: log-volume
mountPath: /var/log/datamate/database
subPath: database
- name: init-sql
mountPath: /docker-entrypoint-initdb.d
- name: mysql-utf8-config
mountPath: /etc/mysql/conf.d
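The &name/*name pairs above are standard YAML anchors and aliases: each volume is defined once at the top of the file and re-used by reference, so the backend, frontend, and database sections all expand to the same hostPath definitions when Helm parses the values file.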

View File

@@ -1,33 +1,4 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
labels:
app: datamate
tier: backend
name: datamate-backend
rules:
- verbs:
- create
- list
- get
- delete
apiGroups:
- batch
resources:
- jobs
- verbs:
- list
apiGroups:
- ""
resources:
- pods
- verbs:
- get
- list
apiGroups:
- ""
resources:
- pods/log
---
apiVersion: v1
@@ -39,20 +10,7 @@ metadata:
  name: datamate-backend
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
labels:
app: datamate
tier: backend
name: datamate-backend
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: Role
name: datamate-backend
subjects:
- kind: ServiceAccount
name: datamate-backend
---
apiVersion: apps/v1

View File

@@ -31,7 +31,10 @@ CREATE TABLE t_dc_task_executions (
     error_message TEXT COMMENT 'error message',
     datax_job_id TEXT COMMENT 'DataX job ID',
     result TEXT COMMENT 'execution result',
-    created_at TIMESTAMP NULL COMMENT 'creation time',
+    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
+    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
+    created_by VARCHAR(255) COMMENT 'created by',
+    updated_by VARCHAR(255) COMMENT 'updated by',
     INDEX idx_task_id (task_id),
     INDEX idx_status (status),
     INDEX idx_started_at (started_at)
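With these defaults, an INSERT that omits created_at records the current time automatically, and MySQL refreshes updated_at on every UPDATE thanks to the ON UPDATE CURRENT_TIMESTAMP clause.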