refactor: rename and reorganize data models and repositories for clarity

This commit is contained in:
hhhhsc
2025-10-24 15:33:46 +08:00
parent d58c2a0ac7
commit 2d2419205a
60 changed files with 822 additions and 614 deletions

View File

@@ -108,7 +108,7 @@ datamate-docker-uninstall:
.PHONY: datamate-k8s-install
datamate-k8s-install: create-namespace
kubectl create configmap datamate-init-sql --from-file=scripts/db/ --dry-run=client -o yaml | kubectl apply -f - -n $(NAMESPACE)
helm install datamate deployment/helm/datamate/ -n $(NAMESPACE)
helm upgrade datamate deployment/helm/datamate/ -n $(NAMESPACE) --install
.PHONY: datamate-k8s-uninstall
datamate-k8s-uninstall:

View File

@@ -22,6 +22,11 @@
<artifactId>domain-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.datamate</groupId>
<artifactId>data-management-service</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>

View File

@@ -1,22 +1,16 @@
package com.datamate.cleaning;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
 * Spring Boot configuration class for the data cleaning service.
 *
 * <p>NOTE(review): the previous comment described this class as the configuration of a
 * DataX-based data collection/synchronization service supporting multiple data sources.
 * That does not match the class name and looks copied from the collection module —
 * confirm and correct.
 */
@SpringBootApplication
@EnableAsync
@EnableScheduling
@ComponentScan(basePackages = {
"com.datamate.cleaning",
"com.datamate.shared"
})
public class DataCleaningServiceConfiguration {
// Configuration class for JAR packaging - no main method needed
}

View File

@@ -1,29 +1,34 @@
package com.datamate.cleaning.application.service;
package com.datamate.cleaning.application;
import com.datamate.cleaning.application.httpclient.DatasetClient;
import com.datamate.cleaning.application.scheduler.CleaningTaskScheduler;
import com.datamate.cleaning.domain.converter.OperatorInstanceConverter;
import com.datamate.cleaning.domain.model.DatasetResponse;
import com.datamate.cleaning.domain.model.ExecutorType;
import com.datamate.cleaning.domain.model.OperatorInstancePo;
import com.datamate.cleaning.domain.model.PagedDatasetFileResponse;
import com.datamate.cleaning.common.enums.CleaningTaskStatusEnum;
import com.datamate.cleaning.common.enums.ExecutorType;
import com.datamate.cleaning.domain.model.TaskProcess;
import com.datamate.cleaning.infrastructure.persistence.mapper.CleaningResultMapper;
import com.datamate.cleaning.infrastructure.persistence.mapper.CleaningTaskMapper;
import com.datamate.cleaning.infrastructure.persistence.mapper.OperatorInstanceMapper;
import com.datamate.cleaning.domain.repository.CleaningResultRepository;
import com.datamate.cleaning.domain.repository.CleaningTaskRepository;
import com.datamate.cleaning.domain.repository.OperatorInstanceRepository;
import com.datamate.cleaning.interfaces.dto.CleaningProcess;
import com.datamate.cleaning.interfaces.dto.CleaningTask;
import com.datamate.cleaning.interfaces.dto.CleaningTaskDto;
import com.datamate.cleaning.interfaces.dto.CreateCleaningTaskRequest;
import com.datamate.cleaning.interfaces.dto.OperatorInstance;
import com.datamate.cleaning.interfaces.dto.OperatorInstanceDto;
import com.datamate.common.infrastructure.exception.BusinessException;
import com.datamate.common.infrastructure.exception.SystemErrorCode;
import com.datamate.datamanagement.application.DatasetApplicationService;
import com.datamate.datamanagement.application.DatasetFileApplicationService;
import com.datamate.datamanagement.common.enums.DatasetType;
import com.datamate.datamanagement.domain.model.dataset.Dataset;
import com.datamate.datamanagement.domain.model.dataset.DatasetFile;
import com.datamate.datamanagement.interfaces.dto.CreateDatasetRequest;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@@ -42,58 +47,61 @@ import java.util.UUID;
@Service
@RequiredArgsConstructor
public class CleaningTaskService {
private final CleaningTaskMapper cleaningTaskMapper;
private final CleaningTaskRepository CleaningTaskRepo;
private final OperatorInstanceMapper operatorInstanceMapper;
private final OperatorInstanceRepository operatorInstanceRepo;
private final CleaningResultMapper cleaningResultMapper;
private final CleaningResultRepository cleaningResultRepo;
private final CleaningTaskScheduler taskScheduler;
private final DatasetApplicationService datasetService;
private final DatasetFileApplicationService datasetFileService;
private final String DATASET_PATH = "/dataset";
private final String FLOW_PATH = "/flow";
public List<CleaningTask> getTasks(String status, String keywords, Integer page, Integer size) {
Integer offset = page * size;
List<CleaningTask> tasks = cleaningTaskMapper.findTasks(status, keywords, size, offset);
public List<CleaningTaskDto> getTasks(String status, String keywords, Integer page, Integer size) {
List<CleaningTaskDto> tasks = CleaningTaskRepo.findTasks(status, keywords, page, size);
tasks.forEach(this::setProcess);
return tasks;
}
private void setProcess(CleaningTask task) {
int count = cleaningResultMapper.countByInstanceId(task.getId());
private void setProcess(CleaningTaskDto task) {
int count = cleaningResultRepo.countByInstanceId(task.getId());
task.setProgress(CleaningProcess.of(task.getFileCount(), count));
}
public int countTasks(String status, String keywords) {
return cleaningTaskMapper.findTasks(status, keywords, null, null).size();
return CleaningTaskRepo.findTasks(status, keywords, null, null).size();
}
@Transactional
public CleaningTask createTask(CreateCleaningTaskRequest request) {
DatasetResponse destDataset = DatasetClient.createDataset(request.getDestDatasetName(),
request.getDestDatasetType());
public CleaningTaskDto createTask(CreateCleaningTaskRequest request) {
CreateDatasetRequest createDatasetRequest = new CreateDatasetRequest();
createDatasetRequest.setName(request.getDestDatasetName());
createDatasetRequest.setDatasetType(DatasetType.valueOf(request.getDestDatasetType()));
Dataset destDataset = datasetService.createDataset(createDatasetRequest);
DatasetResponse srcDataset = DatasetClient.getDataset(request.getSrcDatasetId());
Dataset srcDataset = datasetService.getDataset(request.getSrcDatasetId());
CleaningTask task = new CleaningTask();
CleaningTaskDto task = new CleaningTaskDto();
task.setName(request.getName());
task.setDescription(request.getDescription());
task.setStatus(CleaningTask.StatusEnum.PENDING);
task.setStatus(CleaningTaskStatusEnum.PENDING);
String taskId = UUID.randomUUID().toString();
task.setId(taskId);
task.setSrcDatasetId(request.getSrcDatasetId());
task.setSrcDatasetName(request.getSrcDatasetName());
task.setDestDatasetId(destDataset.getId());
task.setDestDatasetName(destDataset.getName());
task.setBeforeSize(srcDataset.getTotalSize());
task.setFileCount(srcDataset.getFileCount());
cleaningTaskMapper.insertTask(task);
task.setBeforeSize(srcDataset.getSizeBytes());
task.setFileCount(srcDataset.getFileCount().intValue());
CleaningTaskRepo.insertTask(task);
List<OperatorInstancePo> instancePos = request.getInstance().stream()
.map(OperatorInstanceConverter.INSTANCE::operatorToDo).toList();
operatorInstanceMapper.insertInstance(taskId, instancePos);
operatorInstanceRepo.insertInstance(taskId, request.getInstance());
prepareTask(task, request.getInstance());
scanDataset(taskId, request.getSrcDatasetId());
@@ -101,24 +109,24 @@ public class CleaningTaskService {
return task;
}
public CleaningTask getTask(String taskId) {
CleaningTask task = cleaningTaskMapper.findTaskById(taskId);
public CleaningTaskDto getTask(String taskId) {
CleaningTaskDto task = CleaningTaskRepo.findTaskById(taskId);
setProcess(task);
return task;
}
@Transactional
public void deleteTask(String taskId) {
cleaningTaskMapper.deleteTask(taskId);
operatorInstanceMapper.deleteByInstanceId(taskId);
cleaningResultMapper.deleteByInstanceId(taskId);
CleaningTaskRepo.deleteTaskById(taskId);
operatorInstanceRepo.deleteByInstanceId(taskId);
cleaningResultRepo.deleteByInstanceId(taskId);
}
public void executeTask(String taskId) {
taskScheduler.executeTask(taskId);
}
private void prepareTask(CleaningTask task, List<OperatorInstance> instances) {
private void prepareTask(CleaningTaskDto task, List<OperatorInstanceDto> instances) {
TaskProcess process = new TaskProcess();
process.setInstanceId(task.getId());
process.setDatasetId(task.getDestDatasetId());
@@ -153,13 +161,13 @@ public class CleaningTaskService {
int pageNumber = 0;
int pageSize = 500;
PageRequest pageRequest = PageRequest.of(pageNumber, pageSize);
PagedDatasetFileResponse datasetFile;
Page<DatasetFile> datasetFiles;
do {
datasetFile = DatasetClient.getDatasetFile(srcDatasetId, pageRequest);
if (datasetFile.getContent() != null && datasetFile.getContent().isEmpty()) {
datasetFiles = datasetFileService.getDatasetFiles(srcDatasetId, null, null, pageRequest);
if (datasetFiles.getContent().isEmpty()) {
break;
}
List<Map<String, Object>> files = datasetFile.getContent().stream()
List<Map<String, Object>> files = datasetFiles.getContent().stream()
.map(content -> Map.of("fileName", (Object) content.getFileName(),
"fileSize", content.getFileSize(),
"filePath", content.getFilePath(),
@@ -168,7 +176,7 @@ public class CleaningTaskService {
.toList();
writeListMapToJsonlFile(files, FLOW_PATH + "/" + taskId + "/dataset.jsonl");
pageNumber += 1;
} while (pageNumber < datasetFile.getTotalPages());
} while (pageNumber < datasetFiles.getTotalPages());
}
private void writeListMapToJsonlFile(List<Map<String, Object>> mapList, String fileName) {

View File

@@ -0,0 +1,88 @@
package com.datamate.cleaning.application;
import com.datamate.cleaning.domain.repository.CleaningTemplateRepository;
import com.datamate.cleaning.domain.repository.OperatorInstanceRepository;
import com.datamate.cleaning.interfaces.dto.*;
import com.datamate.cleaning.domain.model.entity.TemplateWithInstance;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
 * Application service for cleaning templates: listing, creating, reading, updating and
 * deleting templates together with the operator instances attached to them.
 */
@Service
@RequiredArgsConstructor
public class CleaningTemplateService {
    private final CleaningTemplateRepository cleaningTemplateRepo;

    private final OperatorInstanceRepository operatorInstanceRepo;

    /**
     * Lists the templates matching {@code keywords}, each with its operators sorted by
     * operator index.
     *
     * <p>Templates are returned in the order in which the repository first yields a row
     * for each template id (the previous implementation iterated a {@code HashMap} and
     * therefore returned them in arbitrary order).
     *
     * @param keywords filter forwarded unchanged to the repository query
     * @return one DTO per distinct template id
     */
    public List<CleaningTemplateDto> getTemplates(String keywords) {
        Map<String, OperatorDto> operatorsMap = operatorInstanceRepo.findAllOperators().stream()
                .collect(Collectors.toMap(OperatorDto::getId, Function.identity()));

        List<TemplateWithInstance> allTemplates = cleaningTemplateRepo.findAllTemplates(keywords);
        Map<String, List<TemplateWithInstance>> templatesMap = allTemplates.stream()
                .collect(Collectors.groupingBy(TemplateWithInstance::getId));

        // distinct() keeps the encounter order of the ordered source list, so the result
        // follows the repository ordering instead of the hash order of the grouping map.
        return allTemplates.stream()
                .map(TemplateWithInstance::getId)
                .distinct()
                .map(id -> toTemplateDto(id, templatesMap.get(id), operatorsMap))
                .toList();
    }

    /**
     * Creates a template with a freshly generated UUID and stores its operator instances.
     *
     * @param request name, description and operator instances of the new template
     * @return the DTO that was inserted (id, name and description populated)
     */
    @Transactional
    public CleaningTemplateDto createTemplate(CreateCleaningTemplateRequest request) {
        CleaningTemplateDto template = new CleaningTemplateDto();
        String templateId = UUID.randomUUID().toString();
        template.setId(templateId);
        template.setName(request.getName());
        template.setDescription(request.getDescription());
        cleaningTemplateRepo.insertTemplate(template);

        operatorInstanceRepo.insertInstance(templateId, request.getInstance());
        return template;
    }

    /**
     * Looks up a single template.
     *
     * @param templateId id of the template
     * @return whatever the repository returns for that id
     */
    public CleaningTemplateDto getTemplate(String templateId) {
        return cleaningTemplateRepo.findTemplateById(templateId);
    }

    /**
     * Updates name and description of an existing template.
     *
     * @param templateId id of the template to update
     * @param request    new name and description
     * @return the updated DTO, or {@code null} when the repository found no such template
     */
    @Transactional
    public CleaningTemplateDto updateTemplate(String templateId, UpdateCleaningTemplateRequest request) {
        CleaningTemplateDto template = cleaningTemplateRepo.findTemplateById(templateId);
        if (template != null) {
            template.setName(request.getName());
            template.setDescription(request.getDescription());
            cleaningTemplateRepo.updateTemplate(template);
        }
        return template;
    }

    /**
     * Deletes a template and the operator instances registered under its id.
     *
     * @param templateId id of the template to delete
     */
    @Transactional
    public void deleteTemplate(String templateId) {
        cleaningTemplateRepo.deleteTemplate(templateId);
        operatorInstanceRepo.deleteByInstanceId(templateId);
    }

    /** Builds one template DTO from all joined rows that share the same template id. */
    private static CleaningTemplateDto toTemplateDto(String templateId, List<TemplateWithInstance> rows,
            Map<String, OperatorDto> operatorsMap) {
        TemplateWithInstance first = rows.get(0);
        CleaningTemplateDto template = new CleaningTemplateDto();
        template.setId(templateId);
        template.setName(first.getName());
        template.setDescription(first.getDescription());
        template.setInstance(rows.stream()
                .filter(row -> StringUtils.isNotBlank(row.getOperatorId()))
                // A row may reference an operator that is not in the operator catalogue;
                // skip it instead of failing with a NullPointerException below.
                .filter(row -> operatorsMap.containsKey(row.getOperatorId()))
                .sorted(Comparator.comparingInt(TemplateWithInstance::getOpIndex))
                .map(row -> applyOverride(operatorsMap.get(row.getOperatorId()), row.getSettingsOverride()))
                .toList());
        template.setCreatedAt(first.getCreatedAt());
        template.setUpdatedAt(first.getUpdatedAt());
        return template;
    }

    /** Replaces the operator's settings with the template-specific override when one is present. */
    private static OperatorDto applyOverride(OperatorDto operator, String settingsOverride) {
        // NOTE(review): the operator instance comes from the shared catalogue map, so an
        // override written here is visible to every other template that references the same
        // operator. Copy the DTO before mutating once a copy mechanism is available.
        if (StringUtils.isNotBlank(settingsOverride)) {
            operator.setSettings(settingsOverride);
        }
        return operator;
    }
}

View File

@@ -1,120 +0,0 @@
package com.datamate.cleaning.application.httpclient;
import com.datamate.cleaning.domain.model.CreateDatasetRequest;
import com.datamate.cleaning.domain.model.DatasetResponse;
import com.datamate.cleaning.domain.model.PagedDatasetFileResponse;
import com.datamate.common.infrastructure.exception.BusinessException;
import com.datamate.common.infrastructure.exception.ErrorCodeImpl;
import com.datamate.common.infrastructure.exception.SystemErrorCode;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.domain.PageRequest;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.Map;
import java.util.stream.Collectors;
/**
 * Static HTTP client for the data-management REST API (create dataset, get dataset, list
 * dataset files).
 *
 * <p>NOTE(review): the base URL is hard-coded to {@code http://localhost:8080/api}; consider
 * making it configurable.
 */
@Slf4j
public class DatasetClient {
    private static final String BASE_URL = "http://localhost:8080/api";

    private static final String CREATE_DATASET_URL = BASE_URL + "/data-management/datasets";

    private static final String GET_DATASET_URL = BASE_URL + "/data-management/datasets/{0}";

    private static final String GET_DATASET_FILE_URL = BASE_URL + "/data-management/datasets/{0}/files";

    private static final HttpClient CLIENT = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(10)).build();

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    static {
        OBJECT_MAPPER.registerModule(new JavaTimeModule());
    }

    /**
     * Creates a dataset by POSTing a JSON payload to the datasets endpoint.
     *
     * @param name dataset name
     * @param type dataset type
     * @return the {@code data} element of the response mapped to {@link DatasetResponse}
     * @throws BusinessException when serialization or the request fails, or the server
     *                           answers with a non-2xx status
     */
    public static DatasetResponse createDataset(String name, String type) {
        CreateDatasetRequest createDatasetRequest = new CreateDatasetRequest();
        createDatasetRequest.setName(name);
        createDatasetRequest.setDatasetType(type);

        String jsonPayload;
        try {
            jsonPayload = OBJECT_MAPPER.writeValueAsString(createDatasetRequest);
        } catch (IOException e) {
            log.error("Error occurred while converting the object.", e);
            throw BusinessException.of(SystemErrorCode.UNKNOWN_ERROR);
        }

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(CREATE_DATASET_URL))
                .timeout(Duration.ofSeconds(30))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonPayload))
                .build();

        return sendAndReturn(request, DatasetResponse.class);
    }

    /**
     * Fetches a single dataset.
     *
     * @param datasetId id substituted into the dataset URL
     * @return the {@code data} element of the response mapped to {@link DatasetResponse}
     * @throws BusinessException when the request fails or the server answers with a non-2xx status
     */
    public static DatasetResponse getDataset(String datasetId) {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(MessageFormat.format(GET_DATASET_URL, datasetId)))
                .timeout(Duration.ofSeconds(30))
                .header("Content-Type", "application/json")
                .GET()
                .build();

        return sendAndReturn(request, DatasetResponse.class);
    }

    /**
     * Fetches one page of the files of a dataset.
     *
     * @param datasetId id substituted into the files URL
     * @param page      page number and page size, sent as {@code page} and {@code size}
     *                  query parameters
     * @return the {@code data} element of the response mapped to {@link PagedDatasetFileResponse}
     * @throws BusinessException when the request fails or the server answers with a non-2xx status
     */
    public static PagedDatasetFileResponse getDatasetFile(String datasetId, PageRequest page) {
        String url = buildQueryParams(MessageFormat.format(GET_DATASET_FILE_URL, datasetId),
                Map.of("page", page.getPageNumber(), "size", page.getPageSize()));

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(url))
                .timeout(Duration.ofSeconds(30))
                .header("Content-Type", "application/json")
                .GET()
                .build();

        return sendAndReturn(request, PagedDatasetFileResponse.class);
    }

    /**
     * Sends the request, parses the body as JSON and maps its {@code data} element to
     * {@code clazz}. A non-2xx status is turned into a {@link BusinessException} built from
     * the {@code code} and {@code message} fields of the body.
     */
    private static <T> T sendAndReturn(HttpRequest request, Class<T> clazz) {
        try {
            HttpResponse<String> response = CLIENT.send(request, HttpResponse.BodyHandlers.ofString());
            int statusCode = response.statusCode();
            JsonNode jsonNode = OBJECT_MAPPER.readTree(response.body());

            if (statusCode < 200 || statusCode >= 300) {
                // path() never returns null, so an error body without these fields yields
                // empty strings instead of a NullPointerException.
                String code = jsonNode.path("code").asText();
                String message = jsonNode.path("message").asText();
                throw BusinessException.of(ErrorCodeImpl.of(code, message));
            }

            JsonNode data = jsonNode.get("data");
            if (data == null) {
                // Successful response without a "data" field: nothing to map.
                return null;
            }
            return OBJECT_MAPPER.treeToValue(data, clazz);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            log.error("Error occurred while making the request.", e);
            throw BusinessException.of(SystemErrorCode.UNKNOWN_ERROR);
        } catch (IOException e) {
            log.error("Error occurred while making the request.", e);
            throw BusinessException.of(SystemErrorCode.UNKNOWN_ERROR);
        }
    }

    /**
     * Appends {@code params} to {@code baseUrl} as a {@code key=value} query string joined
     * with {@code &}. Values are not URL-encoded; callers pass plain numeric values.
     */
    private static String buildQueryParams(String baseUrl, Map<String, Object> params) {
        if (params == null || params.isEmpty()) {
            return baseUrl;
        }

        // The separator between key and value was previously missing ("page0&size500").
        String queryString = params.entrySet().stream()
                .map(entry -> entry.getKey() + "=" + entry.getValue())
                .collect(Collectors.joining("&"));

        return baseUrl + (baseUrl.contains("?") ? "&" : "?") + queryString;
    }
}

View File

@@ -1,8 +1,9 @@
package com.datamate.cleaning.application.scheduler;
import com.datamate.cleaning.application.httpclient.RuntimeClient;
import com.datamate.cleaning.infrastructure.persistence.mapper.CleaningTaskMapper;
import com.datamate.cleaning.interfaces.dto.CleaningTask;
import com.datamate.cleaning.infrastructure.httpclient.RuntimeClient;
import com.datamate.cleaning.common.enums.CleaningTaskStatusEnum;
import com.datamate.cleaning.domain.repository.CleaningTaskRepository;
import com.datamate.cleaning.interfaces.dto.CleaningTaskDto;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
@@ -13,7 +14,7 @@ import java.util.concurrent.Executors;
@Service
@RequiredArgsConstructor
public class CleaningTaskScheduler {
private final CleaningTaskMapper cleaningTaskMapper;
private final CleaningTaskRepository cleaningTaskRepo;
private final ExecutorService taskExecutor = Executors.newFixedThreadPool(5);
@@ -22,19 +23,19 @@ public class CleaningTaskScheduler {
}
private void submitTask(String taskId) {
CleaningTask task = new CleaningTask();
CleaningTaskDto task = new CleaningTaskDto();
task.setId(taskId);
task.setStatus(CleaningTask.StatusEnum.RUNNING);
task.setStatus(CleaningTaskStatusEnum.RUNNING);
task.setStartedAt(LocalDateTime.now());
cleaningTaskMapper.updateTask(task);
cleaningTaskRepo.updateTask(task);
RuntimeClient.submitTask(taskId);
}
public void stopTask(String taskId) {
RuntimeClient.stopTask(taskId);
CleaningTask task = new CleaningTask();
CleaningTaskDto task = new CleaningTaskDto();
task.setId(taskId);
task.setStatus(CleaningTask.StatusEnum.STOPPED);
cleaningTaskMapper.updateTask(task);
task.setStatus(CleaningTaskStatusEnum.STOPPED);
cleaningTaskRepo.updateTask(task);
}
}

View File

@@ -1,95 +0,0 @@
package com.datamate.cleaning.application.service;
import com.datamate.cleaning.domain.converter.OperatorInstanceConverter;
import com.datamate.cleaning.domain.model.OperatorInstancePo;
import com.datamate.cleaning.domain.model.TemplateWithInstance;
import com.datamate.cleaning.infrastructure.persistence.mapper.CleaningTemplateMapper;
import com.datamate.cleaning.infrastructure.persistence.mapper.OperatorInstanceMapper;
import com.datamate.cleaning.interfaces.dto.CleaningTemplate;
import com.datamate.cleaning.interfaces.dto.CreateCleaningTemplateRequest;
import com.datamate.cleaning.interfaces.dto.OperatorResponse;
import com.datamate.cleaning.interfaces.dto.UpdateCleaningTemplateRequest;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
 * Application service for cleaning templates: listing, creating, reading, updating and
 * deleting templates together with the operator instances attached to them.
 */
@Service
@RequiredArgsConstructor
public class CleaningTemplateService {
    private final CleaningTemplateMapper cleaningTemplateMapper;

    private final OperatorInstanceMapper operatorInstanceMapper;

    /**
     * Lists the templates matching {@code keywords}, each with its operators sorted by
     * operator index.
     *
     * <p>Templates are returned in the order in which the mapper first yields a row for
     * each template id (the previous implementation iterated a {@code HashMap} and
     * therefore returned them in arbitrary order).
     *
     * @param keywords filter forwarded unchanged to the mapper query
     * @return one template per distinct template id
     */
    public List<CleaningTemplate> getTemplates(String keywords) {
        Map<String, OperatorResponse> operatorsMap = cleaningTemplateMapper.findAllOperators().stream()
                .collect(Collectors.toMap(OperatorResponse::getId, Function.identity()));

        List<TemplateWithInstance> allTemplates = cleaningTemplateMapper.findAllTemplates(keywords);
        Map<String, List<TemplateWithInstance>> templatesMap = allTemplates.stream()
                .collect(Collectors.groupingBy(TemplateWithInstance::getId));

        // distinct() keeps the encounter order of the ordered source list, so the result
        // follows the query ordering instead of the hash order of the grouping map.
        return allTemplates.stream()
                .map(TemplateWithInstance::getId)
                .distinct()
                .map(id -> toTemplate(id, templatesMap.get(id), operatorsMap))
                .toList();
    }

    /**
     * Creates a template with a freshly generated UUID and stores its operator instances.
     *
     * @param request name, description and operator instances of the new template
     * @return the template that was inserted (id, name and description populated)
     */
    @Transactional
    public CleaningTemplate createTemplate(CreateCleaningTemplateRequest request) {
        CleaningTemplate template = new CleaningTemplate();
        String templateId = UUID.randomUUID().toString();
        template.setId(templateId);
        template.setName(request.getName());
        template.setDescription(request.getDescription());
        cleaningTemplateMapper.insertTemplate(template);

        List<OperatorInstancePo> instancePos = request.getInstance().stream()
                .map(OperatorInstanceConverter.INSTANCE::operatorToDo).toList();
        operatorInstanceMapper.insertInstance(templateId, instancePos);
        return template;
    }

    /**
     * Looks up a single template.
     *
     * @param templateId id of the template
     * @return whatever the mapper returns for that id
     */
    public CleaningTemplate getTemplate(String templateId) {
        return cleaningTemplateMapper.findTemplateById(templateId);
    }

    /**
     * Updates name and description of an existing template.
     *
     * @param templateId id of the template to update
     * @param request    new name and description
     * @return the updated template, or {@code null} when the mapper found no such template
     */
    @Transactional
    public CleaningTemplate updateTemplate(String templateId, UpdateCleaningTemplateRequest request) {
        CleaningTemplate template = cleaningTemplateMapper.findTemplateById(templateId);
        if (template != null) {
            template.setName(request.getName());
            template.setDescription(request.getDescription());
            cleaningTemplateMapper.updateTemplate(template);
        }
        return template;
    }

    /**
     * Deletes a template and the operator instances registered under its id.
     *
     * @param templateId id of the template to delete
     */
    @Transactional
    public void deleteTemplate(String templateId) {
        cleaningTemplateMapper.deleteTemplate(templateId);
        operatorInstanceMapper.deleteByInstanceId(templateId);
    }

    /** Builds one template from all joined rows that share the same template id. */
    private static CleaningTemplate toTemplate(String templateId, List<TemplateWithInstance> rows,
            Map<String, OperatorResponse> operatorsMap) {
        TemplateWithInstance first = rows.get(0);
        CleaningTemplate template = new CleaningTemplate();
        template.setId(templateId);
        template.setName(first.getName());
        template.setDescription(first.getDescription());
        template.setInstance(rows.stream()
                .filter(row -> StringUtils.isNotBlank(row.getOperatorId()))
                // A row may reference an operator that is not in the operator catalogue;
                // skip it instead of failing with a NullPointerException below.
                .filter(row -> operatorsMap.containsKey(row.getOperatorId()))
                .sorted(Comparator.comparingInt(TemplateWithInstance::getOpIndex))
                .map(row -> applyOverride(operatorsMap.get(row.getOperatorId()), row.getSettingsOverride()))
                .toList());
        template.setCreatedAt(first.getCreatedAt());
        template.setUpdatedAt(first.getUpdatedAt());
        return template;
    }

    /** Replaces the operator's settings with the template-specific override when one is present. */
    private static OperatorResponse applyOverride(OperatorResponse operator, String settingsOverride) {
        // NOTE(review): the operator instance comes from the shared catalogue map, so an
        // override written here is visible to every other template that references the same
        // operator. Copy the object before mutating once a copy mechanism is available.
        if (StringUtils.isNotBlank(settingsOverride)) {
            operator.setSettings(settingsOverride);
        }
        return operator;
    }
}

View File

@@ -0,0 +1,39 @@
package com.datamate.cleaning.common.enums;
import com.datamate.common.infrastructure.exception.BusinessException;
import com.datamate.common.infrastructure.exception.SystemErrorCode;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
/**
 * Status of a cleaning task. Each constant carries the string used as its JSON
 * representation.
 */
public enum CleaningTaskStatusEnum {
    PENDING("PENDING"),

    RUNNING("RUNNING"),

    COMPLETED("COMPLETED"),

    STOPPED("STOPPED"),

    FAILED("FAILED");

    private final String value;

    CleaningTaskStatusEnum(String value) {
        this.value = value;
    }

    /**
     * @return the string this constant is serialized as
     */
    @JsonValue
    public String getValue() {
        return value;
    }

    /**
     * Resolves a status from its string form.
     *
     * @param value string to look up (compared case-sensitively)
     * @return the constant whose value equals {@code value}
     * @throws BusinessException with {@code INVALID_PARAMETER} when nothing matches
     */
    @JsonCreator
    public static CleaningTaskStatusEnum fromValue(String value) {
        CleaningTaskStatusEnum[] candidates = CleaningTaskStatusEnum.values();
        for (int idx = 0; idx < candidates.length; idx++) {
            CleaningTaskStatusEnum candidate = candidates[idx];
            if (candidate.value.equals(value)) {
                return candidate;
            }
        }
        throw BusinessException.of(SystemErrorCode.INVALID_PARAMETER);
    }
}

View File

@@ -1,4 +1,4 @@
package com.datamate.cleaning.infrastructure.exception;
package com.datamate.cleaning.common.exception;
import com.datamate.common.infrastructure.exception.ErrorCode;
import lombok.AllArgsConstructor;

View File

@@ -1,13 +0,0 @@
package com.datamate.cleaning.domain.model;
import lombok.Getter;
import lombok.Setter;
/**
 * Persistence-side representation of an operator instance: the operator id plus its
 * overrides as a string.
 */
@Getter
@Setter
public class OperatorInstancePo {
// Operator id.
private String id;
// Overrides in string form — presumably JSON produced by the converter; confirm.
private String overrides;
}

View File

@@ -0,0 +1,32 @@
package com.datamate.cleaning.domain.model.entity;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Getter;
import lombok.Setter;
/**
 * Entity mapped to table {@code t_clean_result}: one row per source/destination file pair
 * recorded for an instance.
 */
@Getter
@Setter
@TableName(value = "t_clean_result", autoResultMap = true)
public class CleaningResult {
// Id of the owning instance — presumably the cleaning task id; confirm against callers.
private String instanceId;
// Id of the source file.
private String srcFileId;
// Id of the destination file.
private String destFileId;
// Name of the source file.
private String srcName;
// Name of the destination file.
private String destName;
// Type of the source file.
private String srcType;
// Type of the destination file.
private String destType;
// Size of the source file — presumably in bytes; confirm.
private long srcSize;
// Size of the destination file — presumably in bytes; confirm.
private long destSize;
// Status of this result row — allowed values are not visible here.
private String status;
// Result payload — format is not visible here.
private String result;
}

View File

@@ -0,0 +1,46 @@
package com.datamate.cleaning.domain.model.entity;
import com.baomidou.mybatisplus.annotation.TableName;
import com.datamate.cleaning.common.enums.CleaningTaskStatusEnum;
import lombok.Getter;
import lombok.Setter;
import java.time.LocalDateTime;
/**
 * Entity mapped to table {@code t_clean_task}: a cleaning task that reads from a source
 * dataset and writes to a destination dataset.
 */
@Getter
@Setter
@TableName(value = "t_clean_task", autoResultMap = true)
public class CleaningTask {
// Task id.
private String id;
// Task name.
private String name;
// Free-text description.
private String description;
// Current status of the task.
private CleaningTaskStatusEnum status;
// Id of the source dataset.
private String srcDatasetId;
// Name of the source dataset.
private String srcDatasetName;
// Id of the destination dataset.
private String destDatasetId;
// Name of the destination dataset.
private String destDatasetName;
// Size before cleaning — presumably bytes of the source dataset; confirm.
private Long beforeSize;
// Size after cleaning — presumably bytes; confirm.
private Long afterSize;
// Number of files covered by the task.
private Integer fileCount;
// Creation timestamp.
private LocalDateTime createdAt;
// Timestamp at which the task was started.
private LocalDateTime startedAt;
// Timestamp at which the task finished.
private LocalDateTime finishedAt;
}

View File

@@ -0,0 +1,26 @@
package com.datamate.cleaning.domain.model.entity;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Getter;
import lombok.Setter;
import java.time.LocalDateTime;
/**
 * Entity mapped to table {@code t_clean_template}: a named, reusable cleaning template.
 */
@Getter
@Setter
@TableName(value = "t_clean_template", autoResultMap = true)
public class CleaningTemplate {
// Primary key.
@TableId
private String id;
// Template name.
private String name;
// Free-text description.
private String description;
// Creation timestamp.
private LocalDateTime createdAt;
// Last-update timestamp.
private LocalDateTime updatedAt;
// Creator of the template — identifier format is not visible here.
private String createdBy;
}

View File

@@ -0,0 +1,36 @@
package com.datamate.cleaning.domain.model.entity;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Getter;
import lombok.Setter;
import java.time.LocalDateTime;
/**
 * Entity mapped to table {@code t_operator}: the definition of an operator.
 */
@Getter
@Setter
@TableName(value = "t_operator", autoResultMap = true)
public class Operator {
// Primary key.
@TableId
private String id;
// Operator name.
private String name;
// Free-text description.
private String description;
// Operator version string.
private String version;
// Inputs of the operator — string format is not visible here.
private String inputs;
// Outputs of the operator — string format is not visible here.
private String outputs;
// Runtime information — string format is not visible here.
private String runtime;
// Settings of the operator — string format is not visible here.
private String settings;
// Star flag — presumably marks a favourite operator; confirm.
private Boolean isStar;
// Creation timestamp.
private LocalDateTime createdAt;
// Last-update timestamp.
private LocalDateTime updatedAt;
}

View File

@@ -0,0 +1,18 @@
package com.datamate.cleaning.domain.model.entity;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Getter;
import lombok.Setter;
/**
 * Entity mapped to table {@code t_operator_instance}: one operator placed at a given
 * position inside an owning instance.
 */
@Getter
@Setter
@TableName(value = "t_operator_instance", autoResultMap = true)
public class OperatorInstance {
// Id of the owner; the services pass both cleaning task ids and template ids here.
private String instanceId;
// Id of the referenced operator.
private String operatorId;
// Position of the operator; the template service sorts operators by this value.
private int opIndex;
// Settings override in string form; when non-blank it replaces the operator's settings.
private String settingsOverride;
}

View File

@@ -1,8 +1,7 @@
package com.datamate.cleaning.domain.model;
package com.datamate.cleaning.domain.model.entity;
import lombok.Getter;
import lombok.Setter;
import org.springframework.format.annotation.DateTimeFormat;
import java.time.LocalDateTime;
@@ -16,10 +15,8 @@ public class TemplateWithInstance {
private String description;
@DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
private LocalDateTime createdAt;
@DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
private LocalDateTime updatedAt;
private String operatorId;

View File

@@ -0,0 +1,11 @@
package com.datamate.cleaning.domain.repository;
import com.baomidou.mybatisplus.extension.repository.IRepository;
import com.datamate.cleaning.domain.model.entity.CleaningResult;
/**
 * Repository for {@link CleaningResult} rows.
 */
public interface CleaningResultRepository extends IRepository<CleaningResult> {
/**
 * Deletes the results recorded for the given instance id.
 *
 * @param instanceId id of the owning instance (the task service passes the task id)
 */
void deleteByInstanceId(String instanceId);
/**
 * Counts the results recorded for the given instance id; the task service uses this
 * count together with the task's file count to compute progress.
 *
 * @param instanceId id of the owning instance (the task service passes the task id)
 * @return number of matching results
 */
int countByInstanceId(String instanceId);
}

View File

@@ -0,0 +1,19 @@
package com.datamate.cleaning.domain.repository;
import com.baomidou.mybatisplus.extension.repository.IRepository;
import com.datamate.cleaning.domain.model.entity.CleaningTask;
import com.datamate.cleaning.interfaces.dto.CleaningTaskDto;
import java.util.List;
/**
 * Repository for cleaning tasks. Stores {@link CleaningTask} entities but exchanges
 * {@link CleaningTaskDto} objects with callers.
 */
public interface CleaningTaskRepository extends IRepository<CleaningTask> {
/**
 * Finds tasks by status and keywords.
 *
 * @param status   status filter
 * @param keywords keyword filter
 * @param page     page number; the task service passes {@code null} for page and size when
 *                 counting — presumably that disables paging, confirm in the implementation
 * @param size     page size; see {@code page}
 * @return the matching tasks
 */
List<CleaningTaskDto> findTasks(String status, String keywords, Integer page, Integer size);
/**
 * Finds a task by id.
 *
 * @param taskId id of the task
 * @return the task — behavior for an unknown id is not visible here
 */
CleaningTaskDto findTaskById(String taskId);
/**
 * Inserts a new task.
 *
 * @param task task to insert
 */
void insertTask(CleaningTaskDto task);
/**
 * Updates an existing task; the scheduler passes DTOs carrying only id, status and
 * optionally the start time.
 *
 * @param task task data to write
 */
void updateTask(CleaningTaskDto task);
/**
 * Deletes a task by id.
 *
 * @param taskId id of the task to delete
 */
void deleteTaskById(String taskId);
}

View File

@@ -0,0 +1,20 @@
package com.datamate.cleaning.domain.repository;
import com.baomidou.mybatisplus.extension.repository.IRepository;
import com.datamate.cleaning.domain.model.entity.TemplateWithInstance;
import com.datamate.cleaning.domain.model.entity.CleaningTemplate;
import com.datamate.cleaning.interfaces.dto.CleaningTemplateDto;
import java.util.List;
/**
 * Repository for cleaning templates. Stores {@link CleaningTemplate} entities but
 * exchanges {@link CleaningTemplateDto} objects with callers.
 */
public interface CleaningTemplateRepository extends IRepository<CleaningTemplate> {
/**
 * Returns template rows joined with their operator instances; the template service
 * groups the rows by template id, so one template may appear in several rows.
 *
 * @param keywords keyword filter
 * @return the matching rows
 */
List<TemplateWithInstance> findAllTemplates(String keywords);
/**
 * Finds a template by id.
 *
 * @param templateId id of the template
 * @return the template; the template service treats {@code null} as "not found"
 */
CleaningTemplateDto findTemplateById(String templateId);
/**
 * Inserts a new template.
 *
 * @param template template to insert
 */
void insertTemplate(CleaningTemplateDto template);
/**
 * Updates an existing template.
 *
 * @param template template data to write
 */
void updateTemplate(CleaningTemplateDto template);
/**
 * Deletes a template by id.
 *
 * @param templateId id of the template to delete
 */
void deleteTemplate(String templateId);
}

View File

@@ -0,0 +1,16 @@
package com.datamate.cleaning.domain.repository;
import com.baomidou.mybatisplus.extension.repository.IRepository;
import com.datamate.cleaning.interfaces.dto.OperatorDto;
import com.datamate.cleaning.interfaces.dto.OperatorInstanceDto;
import com.datamate.cleaning.domain.model.entity.OperatorInstance;
import java.util.List;
/**
 * Repository for {@link OperatorInstance} rows; also exposes the list of all operators.
 */
public interface OperatorInstanceRepository extends IRepository<OperatorInstance> {
/**
 * Returns all operators; the template service indexes them by operator id.
 *
 * @return all operators
 */
List<OperatorDto> findAllOperators();
/**
 * Stores the operator instances of an owner.
 *
 * @param instanceId id of the owner (callers pass a task id or a template id)
 * @param instances  operator instances to store
 */
void insertInstance(String instanceId, List<OperatorInstanceDto> instances);
/**
 * Deletes the operator instances of an owner.
 *
 * @param instanceId id of the owner (callers pass a task id or a template id)
 */
void deleteByInstanceId(String instanceId);
}

View File

@@ -0,0 +1,19 @@
package com.datamate.cleaning.infrastructure.converter;
import com.datamate.cleaning.domain.model.entity.CleaningTask;
import com.datamate.cleaning.interfaces.dto.CleaningTaskDto;
import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers;
import java.util.List;
/**
 * MapStruct mapper between {@link CleaningTask} entities and {@link CleaningTaskDto}
 * objects.
 */
@Mapper
public interface CleaningTaskConverter {
// Shared mapper instance obtained from the MapStruct factory.
CleaningTaskConverter INSTANCE = Mappers.getMapper(CleaningTaskConverter.class);
/**
 * Converts one entity to its DTO.
 *
 * @param source entity to convert
 * @return the corresponding DTO
 */
CleaningTaskDto fromEntityToDto(CleaningTask source);
/**
 * Converts a list of entities to DTOs.
 *
 * @param source entities to convert
 * @return the corresponding DTOs
 */
List<CleaningTaskDto> fromEntityToDto(List<CleaningTask> source);
/**
 * Converts one DTO to its entity.
 *
 * @param source DTO to convert
 * @return the corresponding entity
 */
CleaningTask fromDtoToEntity(CleaningTaskDto source);
}

View File

@@ -0,0 +1,15 @@
package com.datamate.cleaning.infrastructure.converter;
import com.datamate.cleaning.domain.model.entity.CleaningTemplate;
import com.datamate.cleaning.interfaces.dto.CleaningTemplateDto;
import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers;
/**
 * MapStruct mapper between {@link CleaningTemplate} entities and
 * {@link CleaningTemplateDto} objects.
 */
@Mapper
public interface CleaningTemplateConverter {
// Shared mapper instance obtained from the MapStruct factory.
CleaningTemplateConverter INSTANCE = Mappers.getMapper(CleaningTemplateConverter.class);
/**
 * Converts one DTO to its entity.
 *
 * @param dto DTO to convert
 * @return the corresponding entity
 */
CleaningTemplate fromDtoToEntity(CleaningTemplateDto dto);
/**
 * Converts one entity to its DTO.
 *
 * @param entity entity to convert
 * @return the corresponding DTO
 */
CleaningTemplateDto fromEntityToDto(CleaningTemplate entity);
}

View File

@@ -1,8 +1,10 @@
package com.datamate.cleaning.domain.converter;
package com.datamate.cleaning.infrastructure.converter;
import com.datamate.cleaning.domain.model.OperatorInstancePo;
import com.datamate.cleaning.interfaces.dto.OperatorInstance;
import com.datamate.cleaning.domain.model.entity.OperatorInstance;
import com.datamate.cleaning.domain.model.entity.Operator;
import com.datamate.cleaning.interfaces.dto.OperatorDto;
import com.datamate.cleaning.interfaces.dto.OperatorInstanceDto;
import com.datamate.common.infrastructure.exception.BusinessException;
import com.datamate.common.infrastructure.exception.SystemErrorCode;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -12,17 +14,19 @@ import org.mapstruct.Mapping;
import org.mapstruct.Named;
import org.mapstruct.factory.Mappers;
import java.util.List;
import java.util.Map;
@Mapper
public interface OperatorInstanceConverter {
OperatorInstanceConverter INSTANCE = Mappers.getMapper(OperatorInstanceConverter.class);
@Mapping(target = "overrides", source = "overrides", qualifiedByName = "mapToJson")
OperatorInstancePo operatorToDo(OperatorInstance instance);
@Mapping(target = "settingsOverride", source = "overrides", qualifiedByName = "mapToString")
@Mapping(target = "operatorId", source = "id")
OperatorInstance fromDtoToEntity(OperatorInstanceDto instance);
@Named("mapToJson")
static String mapToJson(Map<String, Object> objects) {
@Named("mapToString")
static String mapToString(Map<String, Object> objects) {
ObjectMapper objectMapper = new ObjectMapper();
try {
return objectMapper.writeValueAsString(objects);
@@ -30,4 +34,6 @@ public interface OperatorInstanceConverter {
throw BusinessException.of(SystemErrorCode.UNKNOWN_ERROR);
}
}
List<OperatorDto> fromEntityToDto(List<Operator> operator);
}

View File

@@ -1,4 +1,4 @@
package com.datamate.cleaning.application.httpclient;
package com.datamate.cleaning.infrastructure.httpclient;
import com.datamate.common.infrastructure.exception.BusinessException;
import com.datamate.common.infrastructure.exception.SystemErrorCode;

View File

@@ -0,0 +1,30 @@
package com.datamate.cleaning.infrastructure.persistence.Impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.repository.CrudRepository;
import com.datamate.cleaning.domain.model.entity.CleaningResult;
import com.datamate.cleaning.domain.repository.CleaningResultRepository;
import com.datamate.cleaning.infrastructure.persistence.mapper.CleaningResultMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Repository;
/**
 * MyBatis-Plus backed implementation of {@link CleaningResultRepository}.
 */
@Repository
@RequiredArgsConstructor
public class CleaningResultRepositoryImpl extends CrudRepository<CleaningResultMapper, CleaningResult>
        implements CleaningResultRepository {
    private final CleaningResultMapper mapper;

    /**
     * Deletes every cleaning result whose instance id equals {@code instanceId}.
     *
     * @param instanceId instance id to match
     */
    @Override
    public void deleteByInstanceId(String instanceId) {
        mapper.delete(byInstanceId(instanceId));
    }

    /**
     * Counts the cleaning results whose instance id equals {@code instanceId}.
     *
     * @param instanceId instance id to match
     * @return number of matching rows, narrowed to {@code int}
     */
    @Override
    public int countByInstanceId(String instanceId) {
        return mapper.selectCount(byInstanceId(instanceId)).intValue();
    }

    /** Builds the "instance id equals" condition shared by the delete and the count. */
    private static LambdaQueryWrapper<CleaningResult> byInstanceId(String instanceId) {
        LambdaQueryWrapper<CleaningResult> condition = new LambdaQueryWrapper<>();
        condition.eq(CleaningResult::getInstanceId, instanceId);
        return condition;
    }
}

View File

@@ -0,0 +1,54 @@
package com.datamate.cleaning.infrastructure.persistence.Impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.repository.CrudRepository;
import com.datamate.cleaning.domain.model.entity.CleaningTask;
import com.datamate.cleaning.domain.repository.CleaningTaskRepository;
import com.datamate.cleaning.infrastructure.converter.CleaningTaskConverter;
import com.datamate.cleaning.infrastructure.persistence.mapper.CleaningTaskMapper;
import com.datamate.cleaning.interfaces.dto.CleaningTaskDto;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Repository;
import java.util.List;
/**
 * MyBatis-Plus backed repository for cleaning tasks; accepts and returns {@link CleaningTaskDto}.
 */
@Repository
@RequiredArgsConstructor
public class CleaningTaskRepositoryImpl extends CrudRepository<CleaningTaskMapper, CleaningTask>
        implements CleaningTaskRepository {
    private final CleaningTaskMapper mapper;

    /**
     * Finds tasks ordered by creation time, newest first.
     *
     * @param status   exact status to match; ignored when blank
     * @param keywords text to match against the task name with LIKE; ignored when blank
     * @param page     page index; one is added before it is handed to MyBatis-Plus
     * @param size     page size; when either {@code page} or {@code size} is null, all matches are returned
     * @return the matching tasks as DTOs
     */
    public List<CleaningTaskDto> findTasks(String status, String keywords, Integer page, Integer size) {
        LambdaQueryWrapper<CleaningTask> criteria = new LambdaQueryWrapper<>();
        criteria.eq(StringUtils.isNotBlank(status), CleaningTask::getStatus, status);
        criteria.like(StringUtils.isNotBlank(keywords), CleaningTask::getName, keywords);
        criteria.orderByDesc(CleaningTask::getCreatedAt);

        List<CleaningTask> rows;
        if (size == null || page == null) {
            // No paging requested: fetch every match.
            rows = mapper.selectList(criteria);
        } else {
            Page<CleaningTask> pageRequest = new Page<>(page + 1, size);
            rows = mapper.selectPage(pageRequest, criteria).getRecords();
        }
        return CleaningTaskConverter.INSTANCE.fromEntityToDto(rows);
    }

    /**
     * Loads one task by primary key.
     *
     * @param taskId id of the task
     * @return the task as a DTO
     */
    public CleaningTaskDto findTaskById(String taskId) {
        CleaningTask found = mapper.selectById(taskId);
        return CleaningTaskConverter.INSTANCE.fromEntityToDto(found);
    }

    /**
     * Inserts the task described by the DTO.
     *
     * @param task task to insert
     */
    public void insertTask(CleaningTaskDto task) {
        CleaningTask entity = CleaningTaskConverter.INSTANCE.fromDtoToEntity(task);
        mapper.insert(entity);
    }

    /**
     * Updates the stored task that has the DTO's id.
     *
     * @param task task carrying the id and the new values
     */
    public void updateTask(CleaningTaskDto task) {
        CleaningTask entity = CleaningTaskConverter.INSTANCE.fromDtoToEntity(task);
        mapper.updateById(entity);
    }

    /**
     * Deletes one task by primary key.
     *
     * @param taskId id of the task
     */
    public void deleteTaskById(String taskId) {
        mapper.deleteById(taskId);
    }
}

View File

@@ -0,0 +1,52 @@
package com.datamate.cleaning.infrastructure.persistence.Impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.repository.CrudRepository;
import com.datamate.cleaning.domain.model.entity.TemplateWithInstance;
import com.datamate.cleaning.domain.model.entity.CleaningTemplate;
import com.datamate.cleaning.domain.repository.CleaningTemplateRepository;
import com.datamate.cleaning.infrastructure.converter.CleaningTemplateConverter;
import com.datamate.cleaning.infrastructure.persistence.mapper.CleaningTemplateMapper;
import com.datamate.cleaning.interfaces.dto.CleaningTemplateDto;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Repository;
import java.util.List;
/**
 * MyBatis-Plus backed implementation of {@link CleaningTemplateRepository}.
 */
@Repository
@RequiredArgsConstructor
public class CleaningTemplateRepositoryImpl extends CrudRepository<CleaningTemplateMapper, CleaningTemplate>
        implements CleaningTemplateRepository {
    private final CleaningTemplateMapper mapper;

    /**
     * Lists templates through the mapper's custom query, newest first.
     *
     * @param keywords text to match against the {@code name} column with LIKE; ignored when blank
     * @return the rows returned by the mapper
     */
    @Override
    public List<TemplateWithInstance> findAllTemplates(String keywords) {
        QueryWrapper<TemplateWithInstance> filter = new QueryWrapper<>();
        filter.like(StringUtils.isNotBlank(keywords), "name", keywords);
        filter.orderByDesc("created_at");
        return mapper.findAllTemplates(filter);
    }

    /**
     * Loads one template by primary key.
     *
     * @param templateId id of the template
     * @return the template as a DTO
     */
    @Override
    public CleaningTemplateDto findTemplateById(String templateId) {
        CleaningTemplate found = mapper.selectById(templateId);
        return CleaningTemplateConverter.INSTANCE.fromEntityToDto(found);
    }

    /**
     * Inserts the template described by the DTO.
     *
     * @param template template to insert
     */
    @Override
    public void insertTemplate(CleaningTemplateDto template) {
        CleaningTemplate entity = CleaningTemplateConverter.INSTANCE.fromDtoToEntity(template);
        mapper.insert(entity);
    }

    /**
     * Updates the stored template that has the DTO's id.
     *
     * @param template template carrying the id and the new values
     */
    @Override
    public void updateTemplate(CleaningTemplateDto template) {
        CleaningTemplate entity = CleaningTemplateConverter.INSTANCE.fromDtoToEntity(template);
        mapper.updateById(entity);
    }

    /**
     * Deletes one template by primary key.
     *
     * @param templateId id of the template
     */
    @Override
    public void deleteTemplate(String templateId) {
        mapper.deleteById(templateId);
    }
}

View File

@@ -0,0 +1,46 @@
package com.datamate.cleaning.infrastructure.persistence.Impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.repository.CrudRepository;
import com.datamate.cleaning.infrastructure.converter.OperatorInstanceConverter;
import com.datamate.cleaning.interfaces.dto.OperatorDto;
import com.datamate.cleaning.interfaces.dto.OperatorInstanceDto;
import com.datamate.cleaning.domain.model.entity.OperatorInstance;
import com.datamate.cleaning.domain.repository.OperatorInstanceRepository;
import com.datamate.cleaning.infrastructure.persistence.mapper.OperatorInstanceMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Repository;
import java.util.ArrayList;
import java.util.List;
/**
 * MyBatis-Plus backed implementation of {@link OperatorInstanceRepository}.
 */
@Repository
@RequiredArgsConstructor
public class OperatorInstanceRepositoryImpl extends CrudRepository<OperatorInstanceMapper, OperatorInstance>
        implements OperatorInstanceRepository {
    private final OperatorInstanceMapper mapper;

    /**
     * Lists the operators returned by the mapper, converted to DTOs.
     *
     * @return the operator DTOs
     */
    @Override
    public List<OperatorDto> findAllOperators() {
        return OperatorInstanceConverter.INSTANCE.fromEntityToDto(mapper.findAllOperators());
    }

    /**
     * Converts the DTOs to entities, stamps each with {@code instanceId} and a 1-based
     * {@code opIndex} following list order, then inserts them in one mapper call.
     *
     * @param instanceId id assigned to every inserted row
     * @param instances  operator instances in execution order
     */
    @Override
    public void insertInstance(String instanceId, List<OperatorInstanceDto> instances) {
        List<OperatorInstance> entities = new ArrayList<>();
        int position = 0;
        for (OperatorInstanceDto dto : instances) {
            position++;
            OperatorInstance entity = OperatorInstanceConverter.INSTANCE.fromDtoToEntity(dto);
            entity.setInstanceId(instanceId);
            entity.setOpIndex(position);
            entities.add(entity);
        }
        mapper.insert(entities);
    }

    /**
     * Deletes every operator instance whose instance id equals {@code instanceId}.
     *
     * @param instanceId instance id to match
     */
    @Override
    public void deleteByInstanceId(String instanceId) {
        LambdaQueryWrapper<OperatorInstance> condition = new LambdaQueryWrapper<>();
        condition.eq(OperatorInstance::getInstanceId, instanceId);
        mapper.delete(condition);
    }
}

View File

@@ -1,11 +1,9 @@
package com.datamate.cleaning.infrastructure.persistence.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.datamate.cleaning.domain.model.entity.CleaningResult;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
@Mapper
public interface CleaningResultMapper {
void deleteByInstanceId(@Param("instanceId") String instanceId);
int countByInstanceId(@Param("instanceId") String instanceId);
public interface CleaningResultMapper extends BaseMapper<CleaningResult> {
}

View File

@@ -1,21 +1,9 @@
package com.datamate.cleaning.infrastructure.persistence.mapper;
import com.datamate.cleaning.interfaces.dto.CleaningTask;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.datamate.cleaning.domain.model.entity.CleaningTask;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
@Mapper
public interface CleaningTaskMapper {
List<CleaningTask> findTasks(@Param("status") String status, @Param("keywords") String keywords,
@Param("size") Integer size, @Param("offset") Integer offset);
CleaningTask findTaskById(@Param("taskId") String taskId);
void insertTask(CleaningTask task);
void updateTask(CleaningTask task);
void deleteTask(@Param("taskId") String taskId);
public interface CleaningTaskMapper extends BaseMapper<CleaningTask> {
}

View File

@@ -1,25 +1,20 @@
package com.datamate.cleaning.infrastructure.persistence.mapper;
import com.datamate.cleaning.domain.model.TemplateWithInstance;
import com.datamate.cleaning.interfaces.dto.CleaningTemplate;
import com.datamate.cleaning.interfaces.dto.OperatorResponse;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.toolkit.Constants;
import com.datamate.cleaning.domain.model.entity.TemplateWithInstance;
import com.datamate.cleaning.domain.model.entity.CleaningTemplate;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import java.util.List;
@Mapper
public interface CleaningTemplateMapper {
List<TemplateWithInstance> findAllTemplates(@Param("keywords") String keywords);
List<OperatorResponse> findAllOperators();
CleaningTemplate findTemplateById(@Param("templateId") String templateId);
void insertTemplate(CleaningTemplate template);
void updateTemplate(CleaningTemplate template);
void deleteTemplate(@Param("templateId") String templateId);
public interface CleaningTemplateMapper extends BaseMapper<CleaningTemplate> {
@Select("SELECT t.id AS id, name, description, created_at, updated_at, created_by, operator_id, op_index, " +
"settings_override FROM t_clean_template t LEFT JOIN t_operator_instance o ON t.id = o.instance_id " +
"${ew.customSqlSegment}")
List<TemplateWithInstance> findAllTemplates(@Param(Constants.WRAPPER) Wrapper<TemplateWithInstance> queryWrapper);
}

View File

@@ -1,17 +1,17 @@
package com.datamate.cleaning.infrastructure.persistence.mapper;
import com.datamate.cleaning.domain.model.OperatorInstancePo;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.datamate.cleaning.domain.model.entity.Operator;
import com.datamate.cleaning.domain.model.entity.OperatorInstance;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import java.util.List;
@Mapper
public interface OperatorInstanceMapper {
void insertInstance(@Param("instanceId") String instanceId,
@Param("instances") List<OperatorInstancePo> instances);
void deleteByInstanceId(@Param("instanceId") String instanceId);
public interface OperatorInstanceMapper extends BaseMapper<OperatorInstance> {
@Select("SELECT id, name, description, version, inputs, outputs, runtime, settings, is_star, created_at, " +
"updated_at FROM t_operator")
List<Operator> findAllOperators();
}

View File

@@ -1,94 +0,0 @@
package com.datamate.cleaning.interfaces.dto;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
import java.time.LocalDateTime;
import java.util.List;
import lombok.Getter;
import lombok.Setter;
import org.springframework.format.annotation.DateTimeFormat;
/**
 * Data transfer object for a cleaning task.
 * Getters and setters are generated by Lombok.
 */
@Getter
@Setter
public class CleaningTask {
private String id;
private String name;
private String description;
private String srcDatasetId;
private String srcDatasetName;
private String destDatasetId;
private String destDatasetName;
private long beforeSize;
private long afterSize;
private int fileCount;
/**
 * Current status of the task.
 */
public enum StatusEnum {
PENDING("PENDING"),
RUNNING("RUNNING"),
COMPLETED("COMPLETED"),
STOPPED("STOPPED"),
FAILED("FAILED");
private final String value;
StatusEnum(String value) {
this.value = value;
}
/**
 * Returns the string form of this status; used as the JSON value.
 */
@JsonValue
public String getValue() {
return value;
}
/**
 * Resolves a status from its string form; used when reading JSON.
 *
 * @param value string form of the status
 * @return the constant whose value equals {@code value}
 * @throws IllegalArgumentException if no constant matches
 */
@JsonCreator
public static StatusEnum fromValue(String value) {
for (StatusEnum b : StatusEnum.values()) {
if (b.value.equals(value)) {
return b;
}
}
throw new IllegalArgumentException("Unexpected value '" + value + "'");
}
}
private StatusEnum status;
private String templateId;
private List<OperatorResponse> instance;
private CleaningProcess progress;
@DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
private LocalDateTime createdAt;
@DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
private LocalDateTime startedAt;
@DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
private LocalDateTime finishedAt;
}

View File

@@ -0,0 +1,57 @@
package com.datamate.cleaning.interfaces.dto;
import com.datamate.cleaning.common.enums.CleaningTaskStatusEnum;
import java.time.LocalDateTime;
import java.util.List;
import lombok.Getter;
import lombok.Setter;
import org.springframework.format.annotation.DateTimeFormat;
/**
 * Data transfer object for a cleaning task.
 * Getters and setters are generated by Lombok.
 */
@Getter
@Setter
public class CleaningTaskDto {
private String id;
private String name;
private String description;
private String srcDatasetId;
private String srcDatasetName;
private String destDatasetId;
private String destDatasetName;
// Sizes and the file count are boxed types, so they can be null.
private Long beforeSize;
private Long afterSize;
private Integer fileCount;
private CleaningTaskStatusEnum status;
private String templateId;
private List<OperatorDto> instance;
private CleaningProcess progress;
// Timestamps are formatted/parsed as ISO date-time.
@DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
private LocalDateTime createdAt;
@DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
private LocalDateTime startedAt;
@DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
private LocalDateTime finishedAt;
}

View File

@@ -14,7 +14,7 @@ import org.springframework.format.annotation.DateTimeFormat;
@Getter
@Setter
public class CleaningTemplate {
public class CleaningTemplateDto {
private String id;
@@ -22,7 +22,7 @@ public class CleaningTemplate {
private String description;
private List<OperatorResponse> instance = new ArrayList<>();
private List<OperatorDto> instance = new ArrayList<>();
@DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
private LocalDateTime createdAt;

View File

@@ -27,6 +27,6 @@ public class CreateCleaningTaskRequest {
private String destDatasetType;
private List<OperatorInstance> instance = new ArrayList<>();
private List<OperatorInstanceDto> instance = new ArrayList<>();
}

View File

@@ -18,6 +18,6 @@ public class CreateCleaningTemplateRequest {
private String description;
private List<OperatorInstance> instance = new ArrayList<>();
private List<OperatorInstanceDto> instance = new ArrayList<>();
}

View File

@@ -7,12 +7,12 @@ import lombok.Setter;
import org.springframework.format.annotation.DateTimeFormat;
/**
* OperatorResponse
* OperatorDto
*/
@Getter
@Setter
public class OperatorResponse {
public class OperatorDto {
private String id;

View File

@@ -13,7 +13,7 @@ import lombok.Setter;
@Getter
@Setter
public class OperatorInstance {
public class OperatorInstanceDto {
private String id;

View File

@@ -21,6 +21,6 @@ public class UpdateCleaningTemplateRequest {
private String description;
private List<OperatorInstance> instance = new ArrayList<>();
private List<OperatorInstanceDto> instance = new ArrayList<>();
}

View File

@@ -1,7 +1,7 @@
package com.datamate.cleaning.interfaces.api;
package com.datamate.cleaning.interfaces.rest;
import com.datamate.cleaning.application.service.CleaningTaskService;
import com.datamate.cleaning.interfaces.dto.CleaningTask;
import com.datamate.cleaning.application.CleaningTaskService;
import com.datamate.cleaning.interfaces.dto.CleaningTaskDto;
import com.datamate.cleaning.interfaces.dto.CreateCleaningTaskRequest;
import com.datamate.common.infrastructure.common.Response;
import com.datamate.common.interfaces.PagedResponse;
@@ -19,18 +19,18 @@ public class CleaningTaskController {
private final CleaningTaskService cleaningTaskService;
@GetMapping
public ResponseEntity<Response<PagedResponse<CleaningTask>>> cleaningTasksGet(
public ResponseEntity<Response<PagedResponse<CleaningTaskDto>>> cleaningTasksGet(
@RequestParam("page") Integer page,
@RequestParam("size") Integer size, @RequestParam(value = "status", required = false) String status,
@RequestParam(value = "keywords", required = false) String keywords) {
List<CleaningTask> tasks = cleaningTaskService.getTasks(status, keywords, page, size);
List<CleaningTaskDto> tasks = cleaningTaskService.getTasks(status, keywords, page, size);
int count = cleaningTaskService.countTasks(status, keywords);
int totalPages = (count + size + 1) / size;
return ResponseEntity.ok(Response.ok(PagedResponse.of(tasks, page, count, totalPages)));
}
@PostMapping
public ResponseEntity<Response<CleaningTask>> cleaningTasksPost(@RequestBody CreateCleaningTaskRequest request) {
public ResponseEntity<Response<CleaningTaskDto>> cleaningTasksPost(@RequestBody CreateCleaningTaskRequest request) {
return ResponseEntity.ok(Response.ok(cleaningTaskService.createTask(request)));
}
@@ -47,7 +47,7 @@ public class CleaningTaskController {
}
@GetMapping("/{taskId}")
public ResponseEntity<Response<CleaningTask>> cleaningTasksTaskIdGet(@PathVariable("taskId") String taskId) {
public ResponseEntity<Response<CleaningTaskDto>> cleaningTasksTaskIdGet(@PathVariable("taskId") String taskId) {
return ResponseEntity.ok(Response.ok(cleaningTaskService.getTask(taskId)));
}

View File

@@ -1,7 +1,7 @@
package com.datamate.cleaning.interfaces.api;
package com.datamate.cleaning.interfaces.rest;
import com.datamate.cleaning.application.service.CleaningTemplateService;
import com.datamate.cleaning.interfaces.dto.CleaningTemplate;
import com.datamate.cleaning.application.CleaningTemplateService;
import com.datamate.cleaning.interfaces.dto.CleaningTemplateDto;
import com.datamate.cleaning.interfaces.dto.CreateCleaningTemplateRequest;
import com.datamate.cleaning.interfaces.dto.UpdateCleaningTemplateRequest;
import com.datamate.common.infrastructure.common.Response;
@@ -29,38 +29,38 @@ public class CleaningTemplateController {
private final CleaningTemplateService cleaningTemplateService;
@GetMapping
public ResponseEntity<Response<PagedResponse<CleaningTemplate>>> cleaningTemplatesGet(
public ResponseEntity<Response<PagedResponse<CleaningTemplateDto>>> cleaningTemplatesGet(
@RequestParam(value = "page", required = false) Integer page,
@RequestParam(value = "size", required = false) Integer size,
@RequestParam(value = "keywords", required = false) String keyword) {
List<CleaningTemplate> templates = cleaningTemplateService.getTemplates(keyword);
List<CleaningTemplateDto> templates = cleaningTemplateService.getTemplates(keyword);
if (page == null || size == null) {
return ResponseEntity.ok(Response.ok(PagedResponse.of(templates.stream()
.sorted(Comparator.comparing(CleaningTemplate::getCreatedAt).reversed()).toList())));
.sorted(Comparator.comparing(CleaningTemplateDto::getCreatedAt).reversed()).toList())));
}
int count = templates.size();
int totalPages = (count + size + 1) / size;
List<CleaningTemplate> limitTemplates = templates.stream()
.sorted(Comparator.comparing(CleaningTemplate::getCreatedAt).reversed())
List<CleaningTemplateDto> limitTemplates = templates.stream()
.sorted(Comparator.comparing(CleaningTemplateDto::getCreatedAt).reversed())
.skip((long) page * size)
.limit(size).toList();
return ResponseEntity.ok(Response.ok(PagedResponse.of(limitTemplates, page, count, totalPages)));
}
@PostMapping
public ResponseEntity<Response<CleaningTemplate>> cleaningTemplatesPost(
public ResponseEntity<Response<CleaningTemplateDto>> cleaningTemplatesPost(
@RequestBody CreateCleaningTemplateRequest request) {
return ResponseEntity.ok(Response.ok(cleaningTemplateService.createTemplate(request)));
}
@GetMapping("/{templateId}")
public ResponseEntity<Response<CleaningTemplate>> cleaningTemplatesTemplateIdGet(
public ResponseEntity<Response<CleaningTemplateDto>> cleaningTemplatesTemplateIdGet(
@PathVariable("templateId") String templateId) {
return ResponseEntity.ok(Response.ok(cleaningTemplateService.getTemplate(templateId)));
}
@PutMapping("/{templateId}")
public ResponseEntity<Response<CleaningTemplate>> cleaningTemplatesTemplateIdPut(
public ResponseEntity<Response<CleaningTemplateDto>> cleaningTemplatesTemplateIdPut(
@PathVariable("templateId") String templateId, @RequestBody UpdateCleaningTemplateRequest request) {
return ResponseEntity.ok(Response.ok(cleaningTemplateService.updateTemplate(templateId, request)));
}

View File

@@ -1,12 +0,0 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.datamate.cleaning.infrastructure.persistence.mapper.CleaningResultMapper">
<delete id="deleteByInstanceId">
DELETE FROM t_clean_result WHERE instance_id = #{instanceId}
</delete>
<select id="countByInstanceId" resultType="java.lang.Integer">
SELECT COUNT(1) FROM t_clean_result WHERE instance_id = #{instanceId}
</select>
</mapper>

View File

@@ -1,56 +0,0 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.datamate.cleaning.infrastructure.persistence.mapper.CleaningTaskMapper">
<sql id="Base_Column_List">
id, name, description, src_dataset_id, src_dataset_name, dest_dataset_id, dest_dataset_name, before_size,
after_size, file_count, status, created_at, started_at, finished_at
</sql>
<!-- Lists cleaning tasks, newest first, with optional status/keyword filters and optional
     LIMIT/OFFSET paging. The keyword filter is guarded by the keywords parameter itself;
     it previously tested status, so a keyword was ignored whenever status was empty. -->
<select id="findTasks" resultType="com.datamate.cleaning.interfaces.dto.CleaningTask">
SELECT <include refid="Base_Column_List"/> FROM t_clean_task
<where>
<if test="status != null and status != ''">
AND status = #{status}
</if>
<if test="keywords != null and keywords != ''">
AND name LIKE CONCAT('%', #{keywords}, '%')
</if>
</where>
ORDER BY created_at DESC
<if test="size != null and offset != null">
LIMIT ${size} OFFSET ${offset}
</if>
</select>
<select id="findTaskById" resultType="com.datamate.cleaning.interfaces.dto.CleaningTask">
SELECT <include refid="Base_Column_List"/> FROM t_clean_task WHERE id = #{taskId}
</select>
<insert id="insertTask">
INSERT INTO t_clean_task (id, name, description, status, src_dataset_id, src_dataset_name, dest_dataset_id,
dest_dataset_name, before_size, after_size, file_count, created_at)
VALUES (#{id}, #{name}, #{description}, #{status}, #{srcDatasetId}, #{srcDatasetName}, #{destDatasetId},
#{destDatasetName}, #{beforeSize}, #{afterSize}, #{fileCount}, NOW())
</insert>
<update id="updateTask">
UPDATE t_clean_task
<set>
<if test="status != null">
status = #{status.value},
</if>
<if test="startedAt != null">
started_at = #{startedAt},
</if>
<if test="finishedAt != null">
finished_at = #{finishedAt},
</if>
</set>
WHERE id = #{id}
</update>
<delete id="deleteTask">
DELETE FROM t_clean_task WHERE id = #{taskId}
</delete>
</mapper>

View File

@@ -1,38 +0,0 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.datamate.cleaning.infrastructure.persistence.mapper.CleaningTemplateMapper">
<!-- Lists templates joined with their operator instances, newest first, with an optional
     keyword filter on the name. The filter is guarded by the keywords parameter itself;
     it previously tested status, which is not a parameter of this statement. -->
<select id="findAllTemplates" resultType="com.datamate.cleaning.domain.model.TemplateWithInstance">
SELECT t.id AS id, name, description, created_at, updated_at, created_by, operator_id, op_index, settings_override
FROM t_clean_template t LEFT JOIN t_operator_instance o ON t.id = o.instance_id
<where>
<if test="keywords != null and keywords != ''">
AND name LIKE CONCAT('%', #{keywords}, '%')
</if>
</where>
ORDER BY created_at DESC
</select>
<select id="findAllOperators" resultType="com.datamate.cleaning.interfaces.dto.OperatorResponse">
SELECT id, name, description, version, inputs, outputs, runtime, settings, is_star, created_at, updated_at
FROM t_operator
</select>
<select id="findTemplateById" resultType="com.datamate.cleaning.interfaces.dto.CleaningTemplate">
SELECT * FROM t_clean_template WHERE id = #{templateId}
</select>
<insert id="insertTemplate">
INSERT INTO t_clean_template (id, name, description, created_at)
VALUES (#{id}, #{name}, #{description}, NOW())
</insert>
<update id="updateTemplate">
UPDATE t_clean_template SET name = #{name}, description = #{description}, updated_at = NOW() WHERE id = #{id}
</update>
<delete id="deleteTemplate">
DELETE FROM t_clean_template WHERE id = #{templateId}
</delete>
</mapper>

View File

@@ -1,16 +0,0 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.datamate.cleaning.infrastructure.persistence.mapper.OperatorInstanceMapper">
<insert id="insertInstance">
INSERT INTO t_operator_instance(instance_id, operator_id, op_index, settings_override)
VALUES
<foreach collection="instances" item="operator" separator="," index="index">
(#{instanceId}, #{operator.id}, #{index} + 1, #{operator.overrides})
</foreach>
</insert>
<delete id="deleteByInstanceId">
DELETE FROM t_operator_instance
WHERE instance_id = #{instanceId};
</delete>
</mapper>

View File

@@ -14,7 +14,7 @@ spring:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://datamate-database:3306/datamate?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
username: ${DB_USERNAME:root}
password: ${DB_PASSWORD:Huawei@123}
password: ${DB_PASSWORD:password}
hikari:
maximum-pool-size: 20
minimum-idle: 5

View File

@@ -34,7 +34,7 @@ services:
image: mysql:8
restart: on-failure
environment:
MYSQL_ROOT_PASSWORD: Huawei@123
MYSQL_ROOT_PASSWORD: password
ports:
- "3306"
command: |
@@ -61,7 +61,7 @@ services:
MYSQL_HOST: "datamate-database"
MYSQL_PORT: "3306"
MYSQL_USER: "root"
MYSQL_PASSWORD: "Huawei@123"
MYSQL_PASSWORD: "password"
MYSQL_DATABASE: "datamate"
ports:
- "8081"

View File

@@ -32,6 +32,28 @@ spec:
securityContext:
{{- toYaml . | nindent 8 }}
{{- end }}
initContainers:
{{- range .Values.initContainers }}
- name: {{ .name }}
{{- if .image }}
image: {{ .image }}
{{- else }}
image: {{ include "database.image" $ }}
{{- end }}
{{- if .imagePullPolicy }}
imagePullPolicy: {{ .imagePullPolicy }}
{{- else }}
imagePullPolicy: {{ default $.Values.global.image.pullPolicy $.Values.image.pullPolicy }}
{{- end }}
command:
{{- toYaml .command | nindent 12 }}
args:
{{- toYaml .args | nindent 12 }}
{{- with .volumeMounts }}
volumeMounts:
{{- toYaml . | nindent 12 }}
{{- end }}
{{- end }}
containers:
- name: {{ .Chart.Name }}
{{- with .Values.securityContext }}

View File

@@ -21,7 +21,21 @@ fullnameOverride: "datamate-database"
env:
- name: MYSQL_ROOT_PASSWORD
value: "Huawei@123"
value: "password"
initContainers:
- name: init-log
command:
- sh
- -c
args:
- |
chown mysql:mysql /var/log/datamate/database
chmod 755 /var/log/datamate/database
volumeMounts:
- name: log-volume
mountPath: /var/log/datamate/database
subPath: database
# This section builds out the service account more information can be found here: https://kubernetes.io/docs/concepts/security/service-accounts/
serviceAccount:

View File

@@ -74,7 +74,7 @@ head:
- name: MYSQL_USER
value: "root"
- name: MYSQL_PASSWORD
value: "Huawei@123"
value: "password"
- name: MYSQL_DATABASE
value: "datamate"
# - name: EXAMPLE_ENV
@@ -151,7 +151,7 @@ head:
- name: MYSQL_USER
value: "root"
- name: MYSQL_PASSWORD
value: "Huawei@123"
value: "password"
- name: MYSQL_DATABASE
value: "datamate"
ports:
@@ -218,7 +218,7 @@ worker:
- name: MYSQL_USER
value: "root"
- name: MYSQL_PASSWORD
value: "Huawei@123"
value: "password"
- name: MYSQL_DATABASE
value: "datamate"
# - name: EXAMPLE_ENV

View File

@@ -40,7 +40,7 @@ spec:
imagePullPolicy: IfNotPresent
env:
- name: MYSQL_ROOT_PASSWORD
value: "Huawei@123"
value: "password"
ports:
- containerPort: 3306
volumeMounts:

View File

@@ -14,7 +14,7 @@ spring:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://datamate-database:3306/datamate?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
username: ${DB_USERNAME:root}
password: ${DB_PASSWORD:Huawei@123}
password: ${DB_PASSWORD:password}
hikari:
maximum-pool-size: 20
minimum-idle: 5

View File

@@ -14,7 +14,7 @@ spring:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://mysql:3306/datamate?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
username: ${DB_USERNAME:root}
password: ${DB_PASSWORD:Huawei@123}
password: ${DB_PASSWORD:password}
hikari:
maximum-pool-size: 20
minimum-idle: 5

View File

@@ -27,15 +27,15 @@ def ensure_database_and_user():
# 只在 MySQL 配置时执行
if not settings.mysql_host:
return
mysql_root_password = os.getenv('MYSQL_ROOT_PASSWORD', 'Huawei@123')
mysql_root_password = os.getenv('MYSQL_ROOT_PASSWORD', 'password')
# URL 编码密码以处理特殊字符
encoded_password = quote_plus(mysql_root_password)
# 使用 root 用户连接(不指定数据库)
root_url = f"mysql+pymysql://root:{encoded_password}@{settings.mysql_host}:{settings.mysql_port}/"
try:
root_engine = create_engine(root_url, poolclass=pool.NullPool)
with root_engine.connect() as conn:
@@ -45,24 +45,24 @@ def ensure_database_and_user():
f"CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci"
))
conn.commit()
# 创建用户(如果不存在)- 使用 MySQL 8 默认的 caching_sha2_password
conn.execute(text(
f"CREATE USER IF NOT EXISTS '{settings.mysql_user}'@'%' "
f"IDENTIFIED BY '{settings.mysql_password}'"
))
conn.commit()
# 授予权限
conn.execute(text(
f"GRANT ALL PRIVILEGES ON `{settings.mysql_database}`.* TO '{settings.mysql_user}'@'%'"
))
conn.commit()
# 刷新权限
conn.execute(text("FLUSH PRIVILEGES"))
conn.commit()
root_engine.dispose()
print(f"✓ Database '{settings.mysql_database}' and user '{settings.mysql_user}' are ready")
except Exception as e:
@@ -123,7 +123,7 @@ def run_migrations_online() -> None:
"""
# 先确保数据库和用户存在
ensure_database_and_user()
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",

View File

@@ -22,7 +22,7 @@ class SQLManager:
connection_url = URL.create(
drivername="mysql+pymysql",
username=os.getenv("MYSQL_USER", "root"),
password=os.getenv("MYSQL_PASSWORD", "Huawei@123"),
password=os.getenv("MYSQL_PASSWORD", "password"),
host=os.getenv("MYSQL_HOST", "mysql"),
port=os.getenv("MYSQL_PORT", 3306),
database=os.getenv("MYSQL_DATABASE", "datamate"),

View File

@@ -55,8 +55,7 @@ CREATE TABLE IF NOT EXISTS t_clean_result
);
INSERT IGNORE INTO t_clean_template(id, name, description)
VALUES ('ac2f2582-a990-11f0-9768-00155d09c825', '模板', '模板'),
('26ae585c-8310-4679-adc0-e53215e6e69b', 'text文本清洗模板', 'text文本清洗模板'),
VALUES ('26ae585c-8310-4679-adc0-e53215e6e69b', '文本清洗模板', '文本清洗模板'),
('4421504e-c6c9-4760-b55a-509d17429597', '图片清洗模板', '图片清洗模板');
INSERT IGNORE INTO t_operator_instance(instance_id, operator_id, op_index, settings_override)

View File

@@ -26,7 +26,7 @@ FROM openjdk:21-jdk-slim
# Switch apt to the Aliyun mirror, install the runtime tools (dos2unix is used later to
# normalise start.sh line endings), then drop the apt caches to keep the layer small.
# The cache path was misspelled "/var/lib/apy/lists", so the package lists were never removed.
RUN sed -i 's/deb.debian.org/mirrors.aliyun.com/g' /etc/apt/sources.list.d/debian.sources && \
    apt-get update && \
    apt-get install -y vim wget curl nfs-common rsync python3 python3-pip python-is-python3 dos2unix && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*
@@ -37,7 +37,8 @@ COPY editions/community/config/application.yml /opt/backend/application.yml
COPY editions/community/config/log4j2.xml /opt/backend/log4j2.xml
COPY scripts/images/backend/start.sh /opt/backend/start.sh
RUN chmod +x /opt/backend/start.sh \
RUN dos2unix /opt/backend/start.sh \
&& chmod +x /opt/backend/start.sh \
&& ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
ENTRYPOINT ["/opt/backend/start.sh"]