diff --git a/Makefile b/Makefile
index c83aea0..c0dcc8d 100644
--- a/Makefile
+++ b/Makefile
@@ -108,7 +108,7 @@ datamate-docker-uninstall:
.PHONY: datamate-k8s-install
datamate-k8s-install: create-namespace
kubectl create configmap datamate-init-sql --from-file=scripts/db/ --dry-run=client -o yaml | kubectl apply -f - -n $(NAMESPACE)
- helm install datamate deployment/helm/datamate/ -n $(NAMESPACE)
+ helm upgrade datamate deployment/helm/datamate/ -n $(NAMESPACE) --install
.PHONY: datamate-k8s-uninstall
datamate-k8s-uninstall:
diff --git a/backend/services/data-cleaning-service/pom.xml b/backend/services/data-cleaning-service/pom.xml
index c48f1b1..56b70fc 100644
--- a/backend/services/data-cleaning-service/pom.xml
+++ b/backend/services/data-cleaning-service/pom.xml
@@ -22,6 +22,11 @@
            <artifactId>domain-common</artifactId>
            <version>${project.version}</version>
        </dependency>
+        <dependency>
+            <groupId>com.datamate</groupId>
+            <artifactId>data-management-service</artifactId>
+            <version>${project.version}</version>
+        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/DataCleaningServiceConfiguration.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/DataCleaningServiceConfiguration.java
index 1825750..8f1dcce 100644
--- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/DataCleaningServiceConfiguration.java
+++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/DataCleaningServiceConfiguration.java
@@ -1,22 +1,16 @@
package com.datamate.cleaning;
import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
* Data collection service configuration class
- *
* DataX-based data collection and synchronization service; supports collecting and aggregating data from multiple data source types
*/
@SpringBootApplication
@EnableAsync
@EnableScheduling
-@ComponentScan(basePackages = {
- "com.datamate.cleaning",
- "com.datamate.shared"
-})
public class DataCleaningServiceConfiguration {
// Configuration class for JAR packaging - no main method needed
}
diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/service/CleaningTaskService.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTaskService.java
similarity index 62%
rename from backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/service/CleaningTaskService.java
rename to backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTaskService.java
index e367cf9..9bb2a3c 100644
--- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/service/CleaningTaskService.java
+++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTaskService.java
@@ -1,29 +1,34 @@
-package com.datamate.cleaning.application.service;
+package com.datamate.cleaning.application;
-import com.datamate.cleaning.application.httpclient.DatasetClient;
import com.datamate.cleaning.application.scheduler.CleaningTaskScheduler;
-import com.datamate.cleaning.domain.converter.OperatorInstanceConverter;
-import com.datamate.cleaning.domain.model.DatasetResponse;
-import com.datamate.cleaning.domain.model.ExecutorType;
-import com.datamate.cleaning.domain.model.OperatorInstancePo;
-import com.datamate.cleaning.domain.model.PagedDatasetFileResponse;
+import com.datamate.cleaning.common.enums.CleaningTaskStatusEnum;
+import com.datamate.cleaning.common.enums.ExecutorType;
+
import com.datamate.cleaning.domain.model.TaskProcess;
-import com.datamate.cleaning.infrastructure.persistence.mapper.CleaningResultMapper;
-import com.datamate.cleaning.infrastructure.persistence.mapper.CleaningTaskMapper;
-import com.datamate.cleaning.infrastructure.persistence.mapper.OperatorInstanceMapper;
+import com.datamate.cleaning.domain.repository.CleaningResultRepository;
+import com.datamate.cleaning.domain.repository.CleaningTaskRepository;
+import com.datamate.cleaning.domain.repository.OperatorInstanceRepository;
+
import com.datamate.cleaning.interfaces.dto.CleaningProcess;
-import com.datamate.cleaning.interfaces.dto.CleaningTask;
+import com.datamate.cleaning.interfaces.dto.CleaningTaskDto;
import com.datamate.cleaning.interfaces.dto.CreateCleaningTaskRequest;
-import com.datamate.cleaning.interfaces.dto.OperatorInstance;
+import com.datamate.cleaning.interfaces.dto.OperatorInstanceDto;
import com.datamate.common.infrastructure.exception.BusinessException;
import com.datamate.common.infrastructure.exception.SystemErrorCode;
+import com.datamate.datamanagement.application.DatasetApplicationService;
+import com.datamate.datamanagement.application.DatasetFileApplicationService;
+import com.datamate.datamanagement.common.enums.DatasetType;
+import com.datamate.datamanagement.domain.model.dataset.Dataset;
+import com.datamate.datamanagement.domain.model.dataset.DatasetFile;
+import com.datamate.datamanagement.interfaces.dto.CreateDatasetRequest;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@@ -42,58 +47,61 @@ import java.util.UUID;
@Service
@RequiredArgsConstructor
public class CleaningTaskService {
- private final CleaningTaskMapper cleaningTaskMapper;
+ private final CleaningTaskRepository cleaningTaskRepo;
- private final OperatorInstanceMapper operatorInstanceMapper;
+ private final OperatorInstanceRepository operatorInstanceRepo;
- private final CleaningResultMapper cleaningResultMapper;
+ private final CleaningResultRepository cleaningResultRepo;
private final CleaningTaskScheduler taskScheduler;
+ private final DatasetApplicationService datasetService;
+
+ private final DatasetFileApplicationService datasetFileService;
+
private final String DATASET_PATH = "/dataset";
private final String FLOW_PATH = "/flow";
- public List<CleaningTask> getTasks(String status, String keywords, Integer page, Integer size) {
- Integer offset = page * size;
- List<CleaningTask> tasks = cleaningTaskMapper.findTasks(status, keywords, size, offset);
+ public List<CleaningTaskDto> getTasks(String status, String keywords, Integer page, Integer size) {
+ List<CleaningTaskDto> tasks = cleaningTaskRepo.findTasks(status, keywords, page, size);
tasks.forEach(this::setProcess);
return tasks;
}
- private void setProcess(CleaningTask task) {
- int count = cleaningResultMapper.countByInstanceId(task.getId());
+ private void setProcess(CleaningTaskDto task) {
+ int count = cleaningResultRepo.countByInstanceId(task.getId());
task.setProgress(CleaningProcess.of(task.getFileCount(), count));
}
public int countTasks(String status, String keywords) {
- return cleaningTaskMapper.findTasks(status, keywords, null, null).size();
+ return cleaningTaskRepo.findTasks(status, keywords, null, null).size();
}
@Transactional
- public CleaningTask createTask(CreateCleaningTaskRequest request) {
- DatasetResponse destDataset = DatasetClient.createDataset(request.getDestDatasetName(),
- request.getDestDatasetType());
+ public CleaningTaskDto createTask(CreateCleaningTaskRequest request) {
+ CreateDatasetRequest createDatasetRequest = new CreateDatasetRequest();
+ createDatasetRequest.setName(request.getDestDatasetName());
+ createDatasetRequest.setDatasetType(DatasetType.valueOf(request.getDestDatasetType()));
+ Dataset destDataset = datasetService.createDataset(createDatasetRequest);
- DatasetResponse srcDataset = DatasetClient.getDataset(request.getSrcDatasetId());
+ Dataset srcDataset = datasetService.getDataset(request.getSrcDatasetId());
- CleaningTask task = new CleaningTask();
+ CleaningTaskDto task = new CleaningTaskDto();
task.setName(request.getName());
task.setDescription(request.getDescription());
- task.setStatus(CleaningTask.StatusEnum.PENDING);
+ task.setStatus(CleaningTaskStatusEnum.PENDING);
String taskId = UUID.randomUUID().toString();
task.setId(taskId);
task.setSrcDatasetId(request.getSrcDatasetId());
task.setSrcDatasetName(request.getSrcDatasetName());
task.setDestDatasetId(destDataset.getId());
task.setDestDatasetName(destDataset.getName());
- task.setBeforeSize(srcDataset.getTotalSize());
- task.setFileCount(srcDataset.getFileCount());
- cleaningTaskMapper.insertTask(task);
+ task.setBeforeSize(srcDataset.getSizeBytes());
+ task.setFileCount(srcDataset.getFileCount().intValue());
+ cleaningTaskRepo.insertTask(task);
- List<OperatorInstancePo> instancePos = request.getInstance().stream()
- .map(OperatorInstanceConverter.INSTANCE::operatorToDo).toList();
- operatorInstanceMapper.insertInstance(taskId, instancePos);
+ operatorInstanceRepo.insertInstance(taskId, request.getInstance());
prepareTask(task, request.getInstance());
scanDataset(taskId, request.getSrcDatasetId());
@@ -101,24 +109,24 @@ public class CleaningTaskService {
return task;
}
- public CleaningTask getTask(String taskId) {
- CleaningTask task = cleaningTaskMapper.findTaskById(taskId);
+ public CleaningTaskDto getTask(String taskId) {
+ CleaningTaskDto task = cleaningTaskRepo.findTaskById(taskId);
setProcess(task);
return task;
}
@Transactional
public void deleteTask(String taskId) {
- cleaningTaskMapper.deleteTask(taskId);
- operatorInstanceMapper.deleteByInstanceId(taskId);
- cleaningResultMapper.deleteByInstanceId(taskId);
+ cleaningTaskRepo.deleteTaskById(taskId);
+ operatorInstanceRepo.deleteByInstanceId(taskId);
+ cleaningResultRepo.deleteByInstanceId(taskId);
}
public void executeTask(String taskId) {
taskScheduler.executeTask(taskId);
}
- private void prepareTask(CleaningTask task, List<OperatorInstance> instances) {
+ private void prepareTask(CleaningTaskDto task, List<OperatorInstanceDto> instances) {
TaskProcess process = new TaskProcess();
process.setInstanceId(task.getId());
process.setDatasetId(task.getDestDatasetId());
@@ -153,13 +161,13 @@ public class CleaningTaskService {
int pageNumber = 0;
int pageSize = 500;
PageRequest pageRequest = PageRequest.of(pageNumber, pageSize);
- PagedDatasetFileResponse datasetFile;
+ Page<DatasetFile> datasetFiles;
do {
- datasetFile = DatasetClient.getDatasetFile(srcDatasetId, pageRequest);
- if (datasetFile.getContent() != null && datasetFile.getContent().isEmpty()) {
+ datasetFiles = datasetFileService.getDatasetFiles(srcDatasetId, null, null, pageRequest);
+ if (datasetFiles.getContent().isEmpty()) {
break;
}
- List