Develop op (#35)

* refactor: enhance CleaningTaskService and related components with validation and repository updates
* feature: 支持算子上传创建
This commit is contained in:
hhhhsc701
2025-10-30 17:17:00 +08:00
committed by GitHub
parent 8d2b41ed94
commit b9b97c1ac2
63 changed files with 1190 additions and 1177 deletions

View File

@@ -1,6 +1,6 @@
package com.datamate.cleaning;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
@@ -8,9 +8,11 @@ import org.springframework.scheduling.annotation.EnableScheduling;
* 数据归集服务配置类
* 基于DataX的数据归集和同步服务,支持多种数据源的数据采集和归集
*/
// Spring configuration holder for the cleaning module (package com.datamate.cleaning).
// Enables asynchronous execution (@EnableAsync) and scheduled tasks (@EnableScheduling).
// NOTE(review): the Javadoc directly above describes a DataX-based data collection service,
// which does not match this class's name or package — looks copy-pasted; confirm and update.
@SpringBootApplication
@EnableAsync
@EnableScheduling
// Explicitly scans the com.datamate.cleaning package for Spring components.
@ComponentScan(basePackages = {
"com.datamate.cleaning"
})
public class DataCleaningServiceConfiguration {
// Configuration class for JAR packaging - no main method needed
}

View File

@@ -10,6 +10,7 @@ import com.datamate.cleaning.domain.repository.CleaningResultRepository;
import com.datamate.cleaning.domain.repository.CleaningTaskRepository;
import com.datamate.cleaning.domain.repository.OperatorInstanceRepository;
import com.datamate.cleaning.infrastructure.validator.CleanTaskValidator;
import com.datamate.cleaning.interfaces.dto.CleaningProcess;
import com.datamate.cleaning.interfaces.dto.CleaningTaskDto;
import com.datamate.cleaning.interfaces.dto.CreateCleaningTaskRequest;
@@ -59,6 +60,8 @@ public class CleaningTaskService {
private final DatasetFileApplicationService datasetFileService;
private final CleanTaskValidator cleanTaskValidator;
private final String DATASET_PATH = "/dataset";
private final String FLOW_PATH = "/flow";
@@ -80,6 +83,9 @@ public class CleaningTaskService {
@Transactional
public CleaningTaskDto createTask(CreateCleaningTaskRequest request) {
cleanTaskValidator.checkNameDuplication(request.getName());
cleanTaskValidator.checkInputAndOutput(request.getInstance());
CreateDatasetRequest createDatasetRequest = new CreateDatasetRequest();
createDatasetRequest.setName(request.getDestDatasetName());
createDatasetRequest.setDatasetType(DatasetType.valueOf(request.getDestDatasetType()));

View File

@@ -5,6 +5,8 @@ import com.datamate.cleaning.domain.repository.CleaningTemplateRepository;
import com.datamate.cleaning.domain.repository.OperatorInstanceRepository;
import com.datamate.cleaning.interfaces.dto.*;
import com.datamate.cleaning.domain.model.entity.TemplateWithInstance;
import com.datamate.operator.domain.repository.OperatorRepository;
import com.datamate.operator.interfaces.dto.OperatorDto;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
@@ -24,8 +26,10 @@ public class CleaningTemplateService {
private final OperatorInstanceRepository operatorInstanceRepo;
private final OperatorRepository operatorRepo;
public List<CleaningTemplateDto> getTemplates(String keywords) {
List<OperatorDto> allOperators = operatorInstanceRepo.findAllOperators();
List<OperatorDto> allOperators = operatorRepo.findAllOperators();
Map<String, OperatorDto> operatorsMap = allOperators.stream()
.collect(Collectors.toMap(OperatorDto::getId, Function.identity()));
List<TemplateWithInstance> allTemplates = cleaningTemplateRepo.findAllTemplates(keywords);

View File

@@ -12,7 +12,7 @@ public enum CleanErrorCode implements ErrorCode {
*/
DUPLICATE_TASK_NAME("clean.0001", "清洗任务名称重复"),
CREATE_DATASET_FAILED("clean.0002", "创建数据集失败");
IN_AND_OUT_NOT_MATCH("clean.0002", "算子输入输出不匹配");
private final String code;
private final String message;

View File

@@ -1,26 +0,0 @@
package com.datamate.cleaning.domain.model;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import java.util.List;
/**
 * Plain data holder describing a dataset to be created.
 * Getters, setters and the no-arg constructor are generated by Lombok.
 */
@Getter
@Setter
@NoArgsConstructor
public class CreateDatasetRequest {
/** Dataset name. */
private String name;
/** Dataset description. */
private String description;
/** Dataset type. */
private String datasetType;
/** List of tags. */
private List<String> tags;
/** Data source. */
private String dataSource;
/** Target location. */
private String targetLocation;
}

View File

@@ -1,36 +0,0 @@
package com.datamate.cleaning.domain.model;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import java.time.LocalDateTime;
/**
 * Plain data holder describing a single dataset file.
 * Getters, setters and the no-arg constructor are generated by Lombok.
 */
@Getter
@Setter
@NoArgsConstructor
public class DatasetFileResponse {
/** File ID. */
private String id;
/** File name. */
private String fileName;
/** Original file name. */
private String originalName;
/** File type. */
private String fileType;
/** File size in bytes. */
private Long fileSize;
/** File status. */
private String status;
/** File description. */
private String description;
/** File path. */
private String filePath;
/** Upload time. */
private LocalDateTime uploadTime;
/**
 * Last update time (as stated by the original comment).
 * NOTE(review): the field is named lastAccessTime, so comment and name disagree — confirm which is intended.
 */
private LocalDateTime lastAccessTime;
/** Uploader. */
private String uploadedBy;
}

View File

@@ -1,44 +0,0 @@
package com.datamate.cleaning.domain.model;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import java.time.LocalDateTime;
/**
 * Dataset entity (aligned with the database table t_dm_datasets).
 * Unknown JSON properties are ignored on deserialization; accessors are generated by Lombok.
 */
@Getter
@Setter
@NoArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class DatasetResponse {
/** Dataset ID. */
private String id;
/** Dataset name. */
private String name;
/** Dataset description. */
private String description;
/** Dataset type. */
private String datasetType;
/** Dataset status. */
private String status;
/** Data source. */
private String dataSource;
/** Target location. */
private String targetLocation;
/** Number of files. */
private Integer fileCount;
/** Total size in bytes. */
private Long totalSize;
/** Completion rate (0-100). */
private Float completionRate;
/** Creation time. */
private LocalDateTime createdAt;
/** Last update time. */
private LocalDateTime updatedAt;
/** Creator. */
private String createdBy;
}

View File

@@ -1,23 +0,0 @@
package com.datamate.cleaning.domain.model;
import lombok.Getter;
import lombok.Setter;
import java.util.List;
/**
 * Dataset type response DTO. Accessors are generated by Lombok.
 */
@Getter
@Setter
public class DatasetTypeResponse {
/** Type code. */
private String code;
/** Type name. */
private String name;
/** Type description. */
private String description;
/** Supported file formats. */
private List<String> supportedFormats;
/** Icon. */
private String icon;
}

View File

@@ -1,28 +0,0 @@
package com.datamate.cleaning.domain.model;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import java.util.List;
/**
 * One page of {@link DatasetFileResponse} entries together with paging metadata.
 * Getters, setters and the no-arg constructor are generated by Lombok.
 */
@Getter
@Setter
@NoArgsConstructor
public class PagedDatasetFileResponse {
/** File entries on this page. */
private List<DatasetFileResponse> content;
/** Current page number. */
private Integer page;
/** Page size. */
private Integer size;
/** Total number of elements. */
private Integer totalElements;
/** Total number of pages. */
private Integer totalPages;
/** Whether this is the first page. */
private Boolean first;
/** Whether this is the last page. */
private Boolean last;
}

View File

@@ -16,4 +16,6 @@ public interface CleaningTaskRepository extends IRepository<CleaningTask> {
void updateTask(CleaningTaskDto task);
void deleteTaskById(String taskId);
boolean isNameExist(String name);
}

View File

@@ -1,15 +1,12 @@
package com.datamate.cleaning.domain.repository;
import com.baomidou.mybatisplus.extension.repository.IRepository;
import com.datamate.cleaning.interfaces.dto.OperatorDto;
import com.datamate.cleaning.interfaces.dto.OperatorInstanceDto;
import com.datamate.cleaning.domain.model.entity.OperatorInstance;
import java.util.List;
public interface OperatorInstanceRepository extends IRepository<OperatorInstance> {
List<OperatorDto> findAllOperators();
void insertInstance(String instanceId, List<OperatorInstanceDto> instances);
void deleteByInstanceId(String instanceId);

View File

@@ -3,10 +3,10 @@ package com.datamate.cleaning.infrastructure.converter;
import com.datamate.cleaning.domain.model.entity.OperatorInstance;
import com.datamate.cleaning.domain.model.entity.Operator;
import com.datamate.cleaning.interfaces.dto.OperatorDto;
import com.datamate.cleaning.interfaces.dto.OperatorInstanceDto;
import com.datamate.common.infrastructure.exception.BusinessException;
import com.datamate.common.infrastructure.exception.SystemErrorCode;
import com.datamate.operator.interfaces.dto.OperatorDto;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.mapstruct.Mapper;

View File

@@ -51,4 +51,10 @@ public class CleaningTaskRepositoryImpl extends CrudRepository<CleaningTaskMappe
/**
 * Deletes the cleaning task row whose primary key equals {@code taskId} via the mapper.
 *
 * @param taskId ID of the task to delete
 */
public void deleteTaskById(String taskId) {
mapper.deleteById(taskId);
}
/**
 * Reports whether a cleaning task with exactly the given name is already stored.
 *
 * @param name task name to look up (matched with an equality condition)
 * @return the result of the mapper's {@code exists} query for that name
 */
public boolean isNameExist(String name) {
    return mapper.exists(new LambdaQueryWrapper<CleaningTask>().eq(CleaningTask::getName, name));
}
}

View File

@@ -3,7 +3,6 @@ package com.datamate.cleaning.infrastructure.persistence.Impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.repository.CrudRepository;
import com.datamate.cleaning.infrastructure.converter.OperatorInstanceConverter;
import com.datamate.cleaning.interfaces.dto.OperatorDto;
import com.datamate.cleaning.interfaces.dto.OperatorInstanceDto;
import com.datamate.cleaning.domain.model.entity.OperatorInstance;
import com.datamate.cleaning.domain.repository.OperatorInstanceRepository;
@@ -20,11 +19,6 @@ public class OperatorInstanceRepositoryImpl extends CrudRepository<OperatorInsta
implements OperatorInstanceRepository {
private final OperatorInstanceMapper mapper;
/**
 * Loads every operator through the mapper and converts the entities to DTOs.
 *
 * @return the converted operator DTOs
 */
@Override
public List<OperatorDto> findAllOperators() {
    var operatorEntities = mapper.findAllOperators();
    return OperatorInstanceConverter.INSTANCE.fromEntityToDto(operatorEntities);
}
@Override
public void insertInstance(String instanceId, List<OperatorInstanceDto> instances) {
List<OperatorInstance> operatorInstances = new ArrayList<>();

View File

@@ -1,17 +1,10 @@
package com.datamate.cleaning.infrastructure.persistence.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.datamate.cleaning.domain.model.entity.Operator;
import com.datamate.cleaning.domain.model.entity.OperatorInstance;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;
import java.util.List;
/**
 * MyBatis mapper for {@link OperatorInstance}; the standard CRUD operations come from {@link BaseMapper}.
 */
@Mapper
public interface OperatorInstanceMapper extends BaseMapper<OperatorInstance> {

    /**
     * Selects the listed columns of every row in {@code t_operator}.
     *
     * @return all operators mapped to {@link Operator}
     */
    @Select("SELECT id, name, description, version, inputs, outputs, runtime, settings, is_star, created_at, updated_at FROM t_operator")
    List<Operator> findAllOperators();
}

View File

@@ -0,0 +1,40 @@
package com.datamate.cleaning.infrastructure.validator;
import com.datamate.cleaning.common.exception.CleanErrorCode;
import com.datamate.cleaning.domain.repository.CleaningTaskRepository;
import com.datamate.cleaning.interfaces.dto.OperatorInstanceDto;
import com.datamate.common.infrastructure.exception.BusinessException;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Locale;
/**
 * Validation checks applied to cleaning-task input: task-name uniqueness and
 * input/output compatibility between consecutive operators.
 */
@Component
@RequiredArgsConstructor
public class CleanTaskValidator {
    private final CleaningTaskRepository cleaningTaskRepo;

    /**
     * Rejects a task name that the repository reports as already existing.
     *
     * @param name task name to check
     * @throws BusinessException with {@code DUPLICATE_TASK_NAME} when the name exists
     */
    public void checkNameDuplication(String name) {
        boolean alreadyUsed = cleaningTaskRepo.isNameExist(name);
        if (alreadyUsed) {
            throw BusinessException.of(CleanErrorCode.DUPLICATE_TASK_NAME);
        }
    }

    /**
     * Verifies that, for each pair of adjacent operators, the earlier operator's
     * {@code outputs} string equals the later operator's {@code inputs} string
     * (null-safe comparison). Lists that are null or have fewer than two entries pass.
     *
     * @param operators operators in list order
     * @throws BusinessException with {@code IN_AND_OUT_NOT_MATCH} on the first mismatching pair
     */
    public void checkInputAndOutput(List<OperatorInstanceDto> operators) {
        if (operators == null || operators.size() < 2) {
            return;
        }
        // Walk the list once, always comparing the previous element with the current one.
        OperatorInstanceDto upstream = operators.get(0);
        for (OperatorInstanceDto downstream : operators.subList(1, operators.size())) {
            boolean compatible = StringUtils.equals(upstream.getOutputs(), downstream.getInputs());
            if (!compatible) {
                String detail = String.format(Locale.ROOT,
                        "ops(name: [%s, %s]) inputs and outputs does not match",
                        upstream.getName(), downstream.getName());
                throw BusinessException.of(CleanErrorCode.IN_AND_OUT_NOT_MATCH, detail);
            }
            upstream = downstream;
        }
    }
}

View File

@@ -5,6 +5,7 @@ import com.datamate.cleaning.common.enums.CleaningTaskStatusEnum;
import java.time.LocalDateTime;
import java.util.List;
import com.datamate.operator.interfaces.dto.OperatorDto;
import lombok.Getter;
import lombok.Setter;
import org.springframework.format.annotation.DateTimeFormat;

View File

@@ -4,6 +4,7 @@ import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import com.datamate.operator.interfaces.dto.OperatorDto;
import lombok.Getter;
import lombok.Setter;
import org.springframework.format.annotation.DateTimeFormat;

View File

@@ -1,41 +0,0 @@
package com.datamate.cleaning.interfaces.dto;
import java.time.LocalDateTime;
import lombok.Getter;
import lombok.Setter;
import org.springframework.format.annotation.DateTimeFormat;
/**
 * Data transfer object carrying an operator's attributes.
 * Accessors are generated by Lombok; both timestamps are formatted as ISO date-time.
 */
@Getter
@Setter
public class OperatorDto {
// Operator identifier.
private String id;
// Operator name.
private String name;
// Operator description.
private String description;
// Operator version.
private String version;
// Input descriptor, held as a string.
private String inputs;
// Output descriptor, held as a string.
private String outputs;
// Runtime value, held as a string.
private String runtime;
// Settings value, held as a string — presumably serialized configuration; confirm format.
private String settings;
// Star flag.
private Boolean isStar;
// Creation timestamp.
@DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
private LocalDateTime createdAt;
// Last update timestamp.
@DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME)
private LocalDateTime updatedAt;
}

View File

@@ -1,6 +1,7 @@
package com.datamate.cleaning.interfaces.dto;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -17,6 +18,14 @@ public class OperatorInstanceDto {
private String id;
private String name;
private String inputs;
private String outputs;
private List<Integer> categories;
private Map<String, Object> overrides = new HashMap<>();
}

View File

@@ -3,10 +3,8 @@ package com.datamate.cleaning.interfaces.rest;
import com.datamate.cleaning.application.CleaningTaskService;
import com.datamate.cleaning.interfaces.dto.CleaningTaskDto;
import com.datamate.cleaning.interfaces.dto.CreateCleaningTaskRequest;
import com.datamate.common.infrastructure.common.Response;
import com.datamate.common.interfaces.PagedResponse;
import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.List;
@@ -19,41 +17,41 @@ public class CleaningTaskController {
private final CleaningTaskService cleaningTaskService;
// Returns one page of cleaning tasks, optionally filtered by status and keywords.
// Diff view: the ResponseEntity<Response<...>> signature/return lines appear to be the removed
// version and the plain PagedResponse lines the added version; both are shown here.
@GetMapping
public ResponseEntity<Response<PagedResponse<CleaningTaskDto>>> cleaningTasksGet(
public PagedResponse<CleaningTaskDto> cleaningTasksGet(
@RequestParam("page") Integer page,
@RequestParam("size") Integer size, @RequestParam(value = "status", required = false) String status,
@RequestParam(value = "keywords", required = false) String keywords) {
List<CleaningTaskDto> tasks = cleaningTaskService.getTasks(status, keywords, page, size);
int count = cleaningTaskService.countTasks(status, keywords);
// NOTE(review): this is not a ceiling division — count=10, size=10 gives 2 and count=0 gives 1.
// The usual form is (count + size - 1) / size. size == 0 also throws ArithmeticException here.
int totalPages = (count + size + 1) / size;
return ResponseEntity.ok(Response.ok(PagedResponse.of(tasks, page, count, totalPages)));
return PagedResponse.of(tasks, page, count, totalPages);
}
// Creates a cleaning task from the request body by delegating to cleaningTaskService.createTask.
// Diff view: the ResponseEntity-based lines appear to be the removed version, the plain-return lines the added one.
@PostMapping
public ResponseEntity<Response<CleaningTaskDto>> cleaningTasksPost(@RequestBody CreateCleaningTaskRequest request) {
return ResponseEntity.ok(Response.ok(cleaningTaskService.createTask(request)));
public CleaningTaskDto cleaningTasksPost(@RequestBody CreateCleaningTaskRequest request) {
return cleaningTaskService.createTask(request);
}
// Stops the task identified by the path variable via cleaningTaskService.stopTask.
// Diff view: the older lines return an empty wrapped response; the newer lines return the task ID itself.
@PostMapping("/{taskId}/stop")
public ResponseEntity<Response<Object>> cleaningTasksStop(@PathVariable("taskId") String taskId) {
public String cleaningTasksStop(@PathVariable("taskId") String taskId) {
cleaningTaskService.stopTask(taskId);
return ResponseEntity.ok(Response.ok(null));
return taskId;
}
// Starts execution of the task identified by the path variable via cleaningTaskService.executeTask.
// Diff view: the older lines return an empty wrapped response; the newer lines return the task ID itself.
@PostMapping("/{taskId}/execute")
public ResponseEntity<Response<Object>> cleaningTasksStart(@PathVariable("taskId") String taskId) {
public String cleaningTasksStart(@PathVariable("taskId") String taskId) {
cleaningTaskService.executeTask(taskId);
return ResponseEntity.ok(Response.ok(null));
return taskId;
}
// Fetches a single cleaning task by ID via cleaningTaskService.getTask.
// Diff view: the ResponseEntity-based lines appear to be the removed version, the plain-return lines the added one.
@GetMapping("/{taskId}")
public ResponseEntity<Response<CleaningTaskDto>> cleaningTasksTaskIdGet(@PathVariable("taskId") String taskId) {
return ResponseEntity.ok(Response.ok(cleaningTaskService.getTask(taskId)));
public CleaningTaskDto cleaningTasksTaskIdGet(@PathVariable("taskId") String taskId) {
return cleaningTaskService.getTask(taskId);
}
// Deletes the task identified by the path variable via cleaningTaskService.deleteTask.
// Diff view: the older lines return an empty wrapped response; the newer lines return the task ID itself.
@DeleteMapping("/{taskId}")
public ResponseEntity<Response<Object>> cleaningTasksTaskIdDelete(@PathVariable("taskId") String taskId) {
public String cleaningTasksTaskIdDelete(@PathVariable("taskId") String taskId) {
cleaningTaskService.deleteTask(taskId);
return ResponseEntity.ok(Response.ok(null));
return taskId;
}
}

View File

@@ -4,10 +4,8 @@ import com.datamate.cleaning.application.CleaningTemplateService;
import com.datamate.cleaning.interfaces.dto.CleaningTemplateDto;
import com.datamate.cleaning.interfaces.dto.CreateCleaningTemplateRequest;
import com.datamate.cleaning.interfaces.dto.UpdateCleaningTemplateRequest;
import com.datamate.common.infrastructure.common.Response;
import com.datamate.common.interfaces.PagedResponse;
import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
@@ -29,14 +27,14 @@ public class CleaningTemplateController {
private final CleaningTemplateService cleaningTemplateService;
// Lists cleaning templates matching the optional keyword, newest first (sorted by createdAt, reversed).
// When page or size is missing, all templates are returned; otherwise the list is paged in memory
// with skip/limit. Diff view: ResponseEntity-based lines appear to be the removed version and the
// plain PagedResponse lines the added one; a hunk boundary below hides some original lines.
@GetMapping
public ResponseEntity<Response<PagedResponse<CleaningTemplateDto>>> cleaningTemplatesGet(
public PagedResponse<CleaningTemplateDto> cleaningTemplatesGet(
@RequestParam(value = "page", required = false) Integer page,
@RequestParam(value = "size", required = false) Integer size,
@RequestParam(value = "keywords", required = false) String keyword) {
List<CleaningTemplateDto> templates = cleaningTemplateService.getTemplates(keyword);
if (page == null || size == null) {
return ResponseEntity.ok(Response.ok(PagedResponse.of(templates.stream()
.sorted(Comparator.comparing(CleaningTemplateDto::getCreatedAt).reversed()).toList())));
return PagedResponse.of(templates.stream()
.sorted(Comparator.comparing(CleaningTemplateDto::getCreatedAt).reversed()).toList());
}
int count = templates.size();
// NOTE(review): this is not a ceiling division — count=10, size=10 gives 2 and count=0 gives 1.
// The usual form is (count + size - 1) / size. size == 0 also throws ArithmeticException here.
int totalPages = (count + size + 1) / size;
@@ -44,31 +42,31 @@ public class CleaningTemplateController {
.sorted(Comparator.comparing(CleaningTemplateDto::getCreatedAt).reversed())
// The long cast keeps page * size from overflowing int before it is passed to skip.
.skip((long) page * size)
.limit(size).toList();
return ResponseEntity.ok(Response.ok(PagedResponse.of(limitTemplates, page, count, totalPages)));
return PagedResponse.of(limitTemplates, page, count, totalPages);
}
// Creates a cleaning template from the request body via cleaningTemplateService.createTemplate.
// Diff view: the ResponseEntity-based lines appear to be the removed version, the plain-return lines the added one.
@PostMapping
public ResponseEntity<Response<CleaningTemplateDto>> cleaningTemplatesPost(
public CleaningTemplateDto cleaningTemplatesPost(
@RequestBody CreateCleaningTemplateRequest request) {
return ResponseEntity.ok(Response.ok(cleaningTemplateService.createTemplate(request)));
return cleaningTemplateService.createTemplate(request);
}
// Fetches a single cleaning template by ID via cleaningTemplateService.getTemplate.
// Diff view: the ResponseEntity-based lines appear to be the removed version, the plain-return lines the added one.
@GetMapping("/{templateId}")
public ResponseEntity<Response<CleaningTemplateDto>> cleaningTemplatesTemplateIdGet(
public CleaningTemplateDto cleaningTemplatesTemplateIdGet(
@PathVariable("templateId") String templateId) {
return ResponseEntity.ok(Response.ok(cleaningTemplateService.getTemplate(templateId)));
return cleaningTemplateService.getTemplate(templateId);
}
// Updates the template identified by the path variable with the request body via
// cleaningTemplateService.updateTemplate and returns the service's result.
// Diff view: the ResponseEntity-based lines appear to be the removed version, the plain-return lines the added one.
@PutMapping("/{templateId}")
public ResponseEntity<Response<CleaningTemplateDto>> cleaningTemplatesTemplateIdPut(
public CleaningTemplateDto cleaningTemplatesTemplateIdPut(
@PathVariable("templateId") String templateId, @RequestBody UpdateCleaningTemplateRequest request) {
return ResponseEntity.ok(Response.ok(cleaningTemplateService.updateTemplate(templateId, request)));
return cleaningTemplateService.updateTemplate(templateId, request);
}
// Deletes the template identified by the path variable via cleaningTemplateService.deleteTemplate.
// Diff view: the older lines answer with an empty "no content" response; the newer lines return the template ID.
@DeleteMapping("/{templateId}")
public ResponseEntity<Response<Object>> cleaningTemplatesTemplateIdDelete(
public String cleaningTemplatesTemplateIdDelete(
@PathVariable("templateId") String templateId) {
cleaningTemplateService.deleteTemplate(templateId);
return ResponseEntity.noContent().build();
return templateId;
}
}