diff --git a/Makefile b/Makefile index cf0bca7..f57014e 100644 --- a/Makefile +++ b/Makefile @@ -82,8 +82,8 @@ deer-flow-docker-build: git clone git@github.com:bytedance/deer-flow.git ../deer-flow; \ fi sed -i "s/dark/light/g" "../deer-flow/web/src/components/deer-flow/theme-provider-wrapper.tsx" - cp deployment/docker/deer-flow/.env.example ../deer-flow/.env - cp deployment/docker/deer-flow/conf.yaml.example ../deer-flow/conf.yaml + cp -n deployment/docker/deer-flow/.env.example ../deer-flow/.env + cp -n deployment/docker/deer-flow/conf.yaml.example ../deer-flow/conf.yaml cd ../deer-flow && docker compose build .PHONY: mineru-docker-build @@ -131,16 +131,16 @@ mineru-k8s-uninstall: .PHONY: datamate-docker-install datamate-docker-install: - cd deployment/docker/datamate && cp .env.example .env && docker compose -f docker-compose.yml up -d + cd deployment/docker/datamate && cp -n .env.example .env && docker compose -f docker-compose.yml up -d .PHONY: datamate-docker-uninstall datamate-docker-uninstall: - cd deployment/docker/datamate && docker compose -f docker-compose.yml down + cd deployment/docker/datamate && docker compose -f docker-compose.yml down -v .PHONY: deer-flow-docker-install deer-flow-docker-install: - cd deployment/docker/datamate && cp .env.deer-flow.example .env && docker compose -f docker-compose.yml up -d - cd deployment/docker/deer-flow && cp .env.example .env && cp conf.yaml.example conf.yaml && docker compose -f docker-compose.yml up -d + cd deployment/docker/datamate && cp -n .env.deer-flow.example .env && docker compose -f docker-compose.yml up -d + cd deployment/docker/deer-flow && cp -n .env.example .env && cp -n conf.yaml.example conf.yaml && docker compose -f docker-compose.yml up -d .PHONY: deer-flow-docker-uninstall deer-flow-docker-uninstall: diff --git a/backend/openapi/specs/operator-market.yaml b/backend/openapi/specs/operator-market.yaml index 44d353e..1f52368 100644 --- a/backend/openapi/specs/operator-market.yaml +++ b/backend/openapi/specs/operator-market.yaml @@ -43,88 +43,81 @@ paths: schema: type: "object" properties: - code: - type: "string" - description: "状态码" - message: - type: "string" - description: "消息" - data: - type: "object" - properties: - page: - type: "integer" - description: "" - format: "int64" - size: - type: "integer" - description: "" - format: "int64" - totalElements: - type: "integer" - description: "" - format: "int64" - totalPages: - type: "integer" - description: "" - format: "int64" - content: - type: "array" - description: "" - items: - type: "object" - properties: - id: - type: "string" - description: "" - name: - type: "string" - description: "" - description: - type: "string" - description: "" - version: - type: "string" - description: "" - inputs: - type: "string" - description: "" - outputs: - type: "string" - description: "" - categories: - type: "array" - description: "" - items: - type: "integer" - runtime: - type: "string" - description: "" - settings: - type: "string" - description: "" - isStar: - type: "boolean" - description: "" - createdAt: - type: "string" - description: "" - updatedAt: - type: "string" - description: "" - description: "OperatorResponse" - description: "数据" + page: + type: "integer" + description: "" + format: "int64" + size: + type: "integer" + description: "" + format: "int64" + totalElements: + type: "integer" + description: "" + format: "int64" + totalPages: + type: "integer" + description: "" + format: "int64" + content: + type: "array" + description: "" + items: + type: "object" + properties: + id: + type: "string" + description: "" + name: + type: "string" + description: "" + description: + type: "string" + description: "" + version: + type: "string" + description: "" + inputs: + type: "string" + description: "" + outputs: + type: "string" + description: "" + categories: + type: "array" + description: "" + items: + type: "string" + runtime: + type: "string" + description: "" + settings: + type: "string" + description: "" + fileName: + type: "string" + description: "" + isStar: + type: "boolean" + description: "" + createdAt: + type: "string" + description: "" + updatedAt: + type: "string" + description: "" + description: "OperatorDto" /operators/{id}: get: summary: "operatorsIdGet" description: "operatorsIdGet" parameters: - - name: "id" - in: "path" - description: "" - required: true - schema: - type: "string" + - name: "id" + in: "path" + description: "" + required: true + schema: + type: "string" responses: "200": description: "" @@ -133,70 +126,66 @@ paths: schema: type: "object" properties: - code: + id: type: "string" - description: "状态码" - message: + description: "" + name: type: "string" - description: "消息" - data: - type: "object" - properties: - id: - type: "string" - description: "" - name: - type: "string" - description: "" - description: - type: "string" - description: "" - version: - type: "string" - description: "" - inputs: - type: "string" - description: "" - outputs: - type: "string" - description: "" - categories: - type: "array" - description: "" - items: - type: "integer" - runtime: - type: "string" - description: "" - settings: - type: "string" - description: "" - isStar: - type: "boolean" - description: "" - createdAt: - type: "string" - description: "" - updatedAt: - type: "string" - description: "" - description: "数据" + description: "" + description: + type: "string" + description: "" + version: + type: "string" + description: "" + inputs: + type: "string" + description: "" + outputs: + type: "string" + description: "" + categories: + type: "array" + description: "" + items: + type: "string" + runtime: + type: "string" + description: "" + settings: + type: "string" + description: "" + fileName: + type: "string" + description: "" + isStar: + type: "boolean" + description: "" + createdAt: + type: "string" + description: "" + updatedAt: + type: "string" + description: "" put: summary: "operatorsIdPut" description: "operatorsIdPut" parameters: - - name: "id" - in: "path" - description: "" - required: true - schema: - type: "string" + - name: "id" + in: "path" + description: "" + required: true + schema: + type: "string" requestBody: content: application/json: schema: type: "object" properties: + id: + type: "string" + description: "" name: type: "string" description: "" @@ -216,13 +205,25 @@ paths: type: "array" description: "" items: - type: "integer" + type: "string" runtime: type: "string" description: "" settings: type: "string" description: "" + fileName: + type: "string" + description: "" + isStar: + type: "boolean" + description: "" + createdAt: + type: "string" + description: "" + updatedAt: + type: "string" + description: "" description: "" responses: "200": @@ -232,54 +233,47 @@ paths: schema: type: "object" properties: - code: + id: type: "string" - description: "状态码" - message: + description: "" + name: type: "string" - description: "消息" - data: - type: "object" - properties: - id: - type: "string" - description: "" - name: - type: "string" - description: "" - description: - type: "string" - description: "" - version: - type: "string" - description: "" - inputs: - type: "string" - description: "" - outputs: - type: "string" - description: "" - categories: - type: "array" - description: "" - items: - type: "integer" - runtime: - type: "string" - description: "" - settings: - type: "string" - description: "" - isStar: - type: "boolean" - description: "" - createdAt: - type: "string" - description: "" - updatedAt: - type: "string" - description: "" - description: "数据" + description: "" + description: + type: "string" + description: "" + version: + type: "string" + description: "" + inputs: + type: "string" + description: "" + outputs: + type: "string" + description: "" + categories: + type: "array" + description: "" + items: + type: "string" + runtime: + type: "string" + description: "" + settings: + type: "string" + description: "" + fileName: + type: "string" + description: "" + isStar: + type: "boolean" + description: "" + createdAt: + type: "string" + description: "" + updatedAt: + type: "string" + description: "" /operators/create: post: summary: "operatorsCreatePost" @@ -312,7 +306,7 @@ paths: type: "array" description: "" items: - type: "integer" + type: "string" runtime: type: "string" description: "" @@ -322,230 +316,13 @@ paths: fileName: type: "string" description: "" - description: "" - responses: - "200": - description: "" - content: - application/json: - schema: - type: "object" - properties: - code: - type: "string" - description: "状态码" - message: - type: "string" - description: "消息" - data: - type: "object" - properties: - id: - type: "string" - description: "" - name: - type: "string" - description: "" - description: - type: "string" - description: "" - version: - type: "string" - description: "" - inputs: - type: "string" - description: "" - outputs: - type: "string" - description: "" - categories: - type: "array" - description: "" - items: - type: "integer" - runtime: - type: "string" - description: "" - settings: - type: "string" - description: "" - isStar: - type: "boolean" - description: "" - createdAt: - type: "string" - description: "" - updatedAt: - type: "string" - description: "" - description: "数据" - /operators/upload: - post: - summary: "operatorsUploadPost" - description: "operatorsUploadPost" - parameters: - - name: "description" - in: "query" - description: "" - required: true - schema: - type: "string" - nullable: false - requestBody: - content: - multipart/form-data: - schema: - required: - - "file" - type: "object" - properties: - file: + isStar: + type: "boolean" + description: "" + createdAt: type: "string" description: "" - format: "binary" - required: true - responses: - "200": - description: "" - content: - application/json: - schema: - type: "object" - properties: - code: - type: "string" - description: "状态码" - message: - type: "string" - description: "消息" - data: - type: "object" - properties: - id: - type: "string" - description: "" - name: - type: "string" - description: "" - description: - type: "string" - description: "" - version: - type: "string" - description: "" - inputs: - type: "string" - description: "" - outputs: - type: "string" - description: "" - categories: - type: "array" - description: "" - items: - type: "integer" - runtime: - type: "string" - description: "" - settings: - type: "string" - description: "" - isStar: - type: "boolean" - description: "" - createdAt: - type: "string" - description: "" - updatedAt: - type: "string" - description: "" - description: "数据" - /labels: - get: - summary: "labelsGet" - description: "labelsGet" - parameters: - - name: "page" - in: "query" - description: "" - required: true - schema: - type: "integer" - nullable: false - - name: "size" - in: "query" - description: "" - required: true - schema: - type: "integer" - nullable: false - - name: "keyword" - in: "query" - description: "" - required: true - schema: - type: "string" - nullable: false - responses: - "200": - description: "" - content: - application/json: - schema: - type: "object" - properties: - code: - type: "string" - description: "状态码" - message: - type: "string" - description: "消息" - data: - type: "object" - properties: - page: - type: "integer" - description: "" - format: "int64" - size: - type: "integer" - description: "" - format: "int64" - totalElements: - type: "integer" - description: "" - format: "int64" - totalPages: - type: "integer" - description: "" - format: "int64" - content: - type: "array" - description: "" - items: - type: "object" - properties: - id: - type: "string" - description: "" - name: - type: "string" - description: "" - description: "com.datamate.operator.interfaces.dto.Label" - description: "数据" - post: - summary: "labelsPost" - description: "labelsPost" - requestBody: - content: - application/json: - schema: - type: "object" - properties: - id: - type: "string" - description: "" - name: + updatedAt: type: "string" description: "" description: "" @@ -555,36 +332,6 @@ paths: content: application/json: schema: - type: "object" - properties: - code: - type: "string" - description: "状态码" - message: - type: "string" - description: "消息" - data: - type: "object" - properties: {} - description: "数据" - /labels/{id}: - put: - summary: "labelsIdPut" - description: "labelsIdPut" - parameters: - - name: "id" - in: "path" - description: "" - required: true - schema: - type: "string" - requestBody: - content: - application/json: - schema: - type: "array" - description: "" - items: type: "object" properties: id: @@ -593,6 +340,73 @@ paths: name: type: "string" description: "" + description: + type: "string" + description: "" + version: + type: "string" + description: "" + inputs: + type: "string" + description: "" + outputs: + type: "string" + description: "" + categories: + type: "array" + description: "" + items: + type: "string" + runtime: + type: "string" + description: "" + settings: + type: "string" + description: "" + fileName: + type: "string" + description: "" + isStar: + type: "boolean" + description: "" + createdAt: + type: "string" + description: "" + updatedAt: + type: "string" + description: "" + /operators/upload: + post: + summary: "operatorsUploadPost" + description: "operatorsUploadPost" + requestBody: + content: + application/json: + schema: + type: "object" + properties: + reqId: + type: "string" + description: "预上传返回的id,用来确认同一个任务" + fileNo: + type: "integer" + description: "文件编号,用于标识批量上传中的第几个文件" + fileName: + type: "string" + description: "文件名称" + totalChunkNum: + type: "integer" + description: "文件总分块数量" + chunkNo: + type: "integer" + description: "当前分块编号,从1开始" + file: + type: "string" + description: "上传的文件分块内容" + checkSumHex: + type: "string" + description: "文件分块的校验和(十六进制字符串),用于验证文件完整性" + description: "" responses: "200": description: "" @@ -601,99 +415,96 @@ paths: schema: type: "object" properties: - code: + id: type: "string" - description: "状态码" - message: + description: "" + name: type: "string" - description: "消息" - data: - type: "object" - properties: {} - description: "数据" - /categories/tree: - get: - summary: "categoryTreeGet" - description: "categoryTreeGet" + description: "" + description: + type: "string" + description: "" + version: + type: "string" + description: "" + inputs: + type: "string" + description: "" + outputs: + type: "string" + description: "" + categories: + type: "array" + description: "" + items: + type: "string" + runtime: + type: "string" + description: "" + settings: + type: "string" + description: "" + fileName: + type: "string" + description: "" + isStar: + type: "boolean" + description: "" + createdAt: + type: "string" + description: "" + updatedAt: + type: "string" + description: "" + /operators/upload/pre-upload: + post: + summary: "preUpload" + description: "preUpload" responses: "200": description: "" content: application/json: schema: - type: "object" - properties: - code: - type: "string" - description: "状态码" - message: - type: "string" - description: "消息" - data: - type: "object" - properties: - page: - type: "integer" - description: "" - format: "int64" - size: - type: "integer" - description: "" - format: "int64" - totalElements: - type: "integer" - description: "" - format: "int64" - totalPages: - type: "integer" - description: "" - format: "int64" - content: - type: "array" - description: "" - items: - type: "object" - properties: - id: - type: "integer" - description: "" - name: - type: "string" - description: "" - count: - type: "integer" - description: "" - categories: - type: "array" - description: "" - items: - type: "object" - properties: - id: - type: "integer" - description: "" - format: "int64" - name: - type: "string" - description: "" - count: - type: "integer" - description: "" - format: "int64" - type: - type: "string" - description: "" - parentId: - type: "integer" - description: "" - format: "int64" - description: "com.datamate.operator.interfaces.dto.SubCategory" - default: "new ArrayList<>()" - description: "com.datamate.operator.interfaces.dto.CategoryTreeResponse" - description: "数据" + type: "string" + /operators/upload/chunk: + post: + summary: "chunkUpload" + description: "chunkUpload" + requestBody: + content: + application/json: + schema: + type: "object" + properties: + reqId: + type: "string" + description: "预上传返回的id,用来确认同一个任务" + fileNo: + type: "integer" + description: "文件编号,用于标识批量上传中的第几个文件" + fileName: + type: "string" + description: "文件名称" + totalChunkNum: + type: "integer" + description: "文件总分块数量" + chunkNo: + type: "integer" + description: "当前分块编号,从1开始" + file: + type: "string" + description: "上传的文件分块内容" + checkSumHex: + type: "string" + description: "文件分块的校验和(十六进制字符串),用于验证文件完整性" + description: "" + responses: + "200": + description: "No Content" components: schemas: - OperatorResponse: + OperatorDto: type: "object" properties: id: @@ -718,13 +529,16 @@ components: type: "array" description: "" items: - type: "integer" + type: "string" runtime: type: "string" description: "" settings: type: "string" description: "" + fileName: + type: "string" + description: "" isStar: type: "boolean" description: "" @@ -734,79 +548,4 @@ components: updatedAt: type: "string" description: "" - description: "数据" - com.datamate.operator.interfaces.dto.Label: - type: "object" - properties: - id: - type: "string" - description: "" - name: - type: "string" - description: "" - description: "com.datamate.operator.interfaces.dto.Label" - java.lang.Object: - type: "object" - properties: {} - description: "数据" - com.datamate.operator.interfaces.dto.SubCategory: - type: "object" - properties: - id: - type: "integer" - description: "" - format: "int64" - name: - type: "string" - description: "" - count: - type: "integer" - description: "" - format: "int64" - type: - type: "string" - description: "" - parentId: - type: "integer" - description: "" - format: "int64" - description: "com.datamate.operator.interfaces.dto.SubCategory" - com.datamate.operator.interfaces.dto.CategoryTreeResponse: - type: "object" - properties: - id: - type: "integer" - description: "" - name: - type: "string" - description: "" - count: - type: "integer" - description: "" - categories: - type: "array" - description: "" - items: - type: "object" - properties: - id: - type: "integer" - description: "" - format: "int64" - name: - type: "string" - description: "" - count: - type: "integer" - description: "" - format: "int64" - type: - type: "string" - description: "" - parentId: - type: "integer" - description: "" - format: "int64" - description: "com.datamate.operator.interfaces.dto.SubCategory" - default: "new ArrayList<>()" - description: "com.datamate.operator.interfaces.dto.CategoryTreeResponse" + description: "OperatorDto" diff --git a/backend/services/data-cleaning-service/pom.xml b/backend/services/data-cleaning-service/pom.xml index 6ec182f..aed8ad6 100644 --- a/backend/services/data-cleaning-service/pom.xml +++ b/backend/services/data-cleaning-service/pom.xml @@ -27,6 +27,11 @@ data-management-service ${project.version} + + com.datamate + operator-market-service + ${project.version} + org.springframework.boot spring-boot-starter-test diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/DataCleaningServiceConfiguration.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/DataCleaningServiceConfiguration.java index 8f1dcce..1ea5ad0 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/DataCleaningServiceConfiguration.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/DataCleaningServiceConfiguration.java @@ -1,6 +1,6 @@ package com.datamate.cleaning; -import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.ComponentScan; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableScheduling; @@ -8,9 +8,11 @@ import org.springframework.scheduling.annotation.EnableScheduling; * 数据归集服务配置类 * 基于DataX的数据归集和同步服务,支持多种数据源的数据采集和归集 */ -@SpringBootApplication @EnableAsync @EnableScheduling +@ComponentScan(basePackages = { + "com.datamate.cleaning" +}) public class DataCleaningServiceConfiguration { // Configuration class for JAR packaging - no main method needed } diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTaskService.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTaskService.java index 9bb2a3c..6b1d4d4 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTaskService.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTaskService.java @@ -10,6 +10,7 @@ import com.datamate.cleaning.domain.repository.CleaningResultRepository; import com.datamate.cleaning.domain.repository.CleaningTaskRepository; import com.datamate.cleaning.domain.repository.OperatorInstanceRepository; +import com.datamate.cleaning.infrastructure.validator.CleanTaskValidator; import com.datamate.cleaning.interfaces.dto.CleaningProcess; import com.datamate.cleaning.interfaces.dto.CleaningTaskDto; import com.datamate.cleaning.interfaces.dto.CreateCleaningTaskRequest; @@ -59,6 +60,8 @@ public class CleaningTaskService { private final DatasetFileApplicationService datasetFileService; + private final CleanTaskValidator cleanTaskValidator; + private final String DATASET_PATH = "/dataset"; private final String FLOW_PATH = "/flow"; @@ -80,6 +83,9 @@ public class CleaningTaskService { @Transactional public CleaningTaskDto createTask(CreateCleaningTaskRequest request) { + cleanTaskValidator.checkNameDuplication(request.getName()); + cleanTaskValidator.checkInputAndOutput(request.getInstance()); + CreateDatasetRequest createDatasetRequest = new CreateDatasetRequest(); createDatasetRequest.setName(request.getDestDatasetName()); createDatasetRequest.setDatasetType(DatasetType.valueOf(request.getDestDatasetType())); diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTemplateService.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTemplateService.java index 66e7600..bfd499b 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTemplateService.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTemplateService.java @@ -5,6 +5,8 @@ import com.datamate.cleaning.domain.repository.CleaningTemplateRepository; import com.datamate.cleaning.domain.repository.OperatorInstanceRepository; import com.datamate.cleaning.interfaces.dto.*; import com.datamate.cleaning.domain.model.entity.TemplateWithInstance; +import com.datamate.operator.domain.repository.OperatorRepository; +import com.datamate.operator.interfaces.dto.OperatorDto; import lombok.RequiredArgsConstructor; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Service; @@ -24,8 +26,10 @@ public class CleaningTemplateService { private final OperatorInstanceRepository operatorInstanceRepo; + private final OperatorRepository operatorRepo; + public List getTemplates(String keywords) { - List allOperators = operatorInstanceRepo.findAllOperators(); + List allOperators = operatorRepo.findAllOperators(); Map operatorsMap = allOperators.stream() .collect(Collectors.toMap(OperatorDto::getId, Function.identity())); List allTemplates = cleaningTemplateRepo.findAllTemplates(keywords); diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/common/exception/CleanErrorCode.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/common/exception/CleanErrorCode.java index 9548d39..ff7e2d4 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/common/exception/CleanErrorCode.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/common/exception/CleanErrorCode.java @@ -12,7 +12,7 @@ public enum CleanErrorCode implements ErrorCode { */ DUPLICATE_TASK_NAME("clean.0001", "清洗任务名称重复"), - CREATE_DATASET_FAILED("clean.0002", "创建数据集失败"); + IN_AND_OUT_NOT_MATCH("clean.0002", "算子输入输出不匹配"); private final String code; private final String message; diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/model/CreateDatasetRequest.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/model/CreateDatasetRequest.java deleted file mode 100644 index 5c19006..0000000 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/model/CreateDatasetRequest.java +++ /dev/null @@ -1,26 +0,0 @@ -package com.datamate.cleaning.domain.model; - -import lombok.Getter; -import lombok.NoArgsConstructor; -import lombok.Setter; - -import java.util.List; - - -@Getter -@Setter -@NoArgsConstructor -public class CreateDatasetRequest { - /** 数据集名称 */ - private String name; - /** 数据集描述 */ - private String description; - /** 数据集类型 */ - private String datasetType; - /** 标签列表 */ - private List tags; - /** 数据源 */ - private String dataSource; - /** 目标位置 */ - private String targetLocation; -} diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/model/DatasetFileResponse.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/model/DatasetFileResponse.java deleted file mode 100644 index e313373..0000000 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/model/DatasetFileResponse.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.datamate.cleaning.domain.model; - -import lombok.Getter; -import lombok.NoArgsConstructor; -import lombok.Setter; - -import java.time.LocalDateTime; - - -@Getter -@Setter -@NoArgsConstructor -public class DatasetFileResponse { - /** 文件ID */ - private String id; - /** 文件名 */ - private String fileName; - /** 原始文件名 */ - private String originalName; - /** 文件类型 */ - private String fileType; - /** 文件大小(字节) */ - private Long fileSize; - /** 文件状态 */ - private String status; - /** 文件描述 */ - private String description; - /** 文件路径 */ - private String filePath; - /** 上传时间 */ - private LocalDateTime uploadTime; - /** 最后更新时间 */ - private LocalDateTime lastAccessTime; - /** 上传者 */ - private String uploadedBy; -} diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/model/DatasetResponse.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/model/DatasetResponse.java deleted file mode 100644 index 2c63a0e..0000000 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/model/DatasetResponse.java +++ /dev/null @@ -1,44 +0,0 @@ -package com.datamate.cleaning.domain.model; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import lombok.Getter; -import lombok.NoArgsConstructor; -import lombok.Setter; - -import java.time.LocalDateTime; - -/** - * 数据集实体(与数据库表 t_dm_datasets 对齐) - */ -@Getter -@Setter -@NoArgsConstructor -@JsonIgnoreProperties(ignoreUnknown = true) -public class DatasetResponse { - /** 数据集ID */ - private String id; - /** 数据集名称 */ - private String name; - /** 数据集描述 */ - private String description; - /** 数据集类型 */ - private String datasetType; - /** 数据集状态 */ - private String status; - /** 数据源 */ - private String dataSource; - /** 目标位置 */ - private String targetLocation; - /** 文件数量 */ - private Integer fileCount; - /** 总大小(字节) */ - private Long totalSize; - /** 完成率(0-100) */ - private Float completionRate; - /** 创建时间 */ - private LocalDateTime createdAt; - /** 更新时间 */ - private LocalDateTime updatedAt; - /** 创建者 */ - private String createdBy; -} diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/model/DatasetTypeResponse.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/model/DatasetTypeResponse.java deleted file mode 100644 index b085583..0000000 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/model/DatasetTypeResponse.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.datamate.cleaning.domain.model; - -import lombok.Getter; -import lombok.Setter; -import java.util.List; - -/** - * 数据集类型响应DTO - */ -@Getter -@Setter -public class DatasetTypeResponse { - /** 类型编码 */ - private String code; - /** 类型名称 */ - private String name; - /** 类型描述 */ - private String description; - /** 支持的文件格式 */ - private List supportedFormats; - /** 图标 */ - private String icon; -} diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/model/PagedDatasetFileResponse.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/model/PagedDatasetFileResponse.java deleted file mode 100644 index da33f17..0000000 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/model/PagedDatasetFileResponse.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.datamate.cleaning.domain.model; - -import lombok.Getter; -import lombok.NoArgsConstructor; -import lombok.Setter; - -import java.util.List; - - -@Getter -@Setter -@NoArgsConstructor -public class PagedDatasetFileResponse { - /** 文件内容列表 */ - private List content; - /** 当前页码 */ - private Integer page; - /** 每页大小 */ - private Integer size; - /** 总元素数 */ - private Integer totalElements; - /** 总页数 */ - private Integer totalPages; - /** 是否为第一页 */ - private Boolean first; - /** 是否为最后一页 */ - private Boolean last; -} diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/repository/CleaningTaskRepository.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/repository/CleaningTaskRepository.java index 58a06a0..e8aeb1f 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/repository/CleaningTaskRepository.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/repository/CleaningTaskRepository.java @@ -16,4 +16,6 @@ public interface CleaningTaskRepository extends IRepository { void updateTask(CleaningTaskDto task); void deleteTaskById(String taskId); + + boolean isNameExist(String name); } diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/repository/OperatorInstanceRepository.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/repository/OperatorInstanceRepository.java index 50b42e2..4fbcc2a 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/repository/OperatorInstanceRepository.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/repository/OperatorInstanceRepository.java @@ -1,15 +1,12 @@ package com.datamate.cleaning.domain.repository; import com.baomidou.mybatisplus.extension.repository.IRepository; -import com.datamate.cleaning.interfaces.dto.OperatorDto; import com.datamate.cleaning.interfaces.dto.OperatorInstanceDto; import com.datamate.cleaning.domain.model.entity.OperatorInstance; import java.util.List; public interface OperatorInstanceRepository extends IRepository { - List findAllOperators(); - void insertInstance(String instanceId, List instances); void deleteByInstanceId(String instanceId); diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/converter/OperatorInstanceConverter.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/converter/OperatorInstanceConverter.java index 6bdda57..eebeeec 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/converter/OperatorInstanceConverter.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/converter/OperatorInstanceConverter.java @@ -3,10 +3,10 @@ package com.datamate.cleaning.infrastructure.converter; import com.datamate.cleaning.domain.model.entity.OperatorInstance; import com.datamate.cleaning.domain.model.entity.Operator; -import com.datamate.cleaning.interfaces.dto.OperatorDto; import com.datamate.cleaning.interfaces.dto.OperatorInstanceDto; import com.datamate.common.infrastructure.exception.BusinessException; import com.datamate.common.infrastructure.exception.SystemErrorCode; +import com.datamate.operator.interfaces.dto.OperatorDto; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.mapstruct.Mapper; diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/persistence/Impl/CleaningTaskRepositoryImpl.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/persistence/Impl/CleaningTaskRepositoryImpl.java index 8cfedf9..455f8d5 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/persistence/Impl/CleaningTaskRepositoryImpl.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/persistence/Impl/CleaningTaskRepositoryImpl.java @@ -51,4 +51,10 @@ public class CleaningTaskRepositoryImpl extends CrudRepository queryWrapper = new LambdaQueryWrapper<>(); + queryWrapper.eq(CleaningTask::getName, name); + return mapper.exists(queryWrapper); + } } diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/persistence/Impl/OperatorInstanceRepositoryImpl.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/persistence/Impl/OperatorInstanceRepositoryImpl.java index 295d7c0..a3b197f 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/persistence/Impl/OperatorInstanceRepositoryImpl.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/persistence/Impl/OperatorInstanceRepositoryImpl.java @@ -3,7 +3,6 @@ package com.datamate.cleaning.infrastructure.persistence.Impl; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.repository.CrudRepository; import com.datamate.cleaning.infrastructure.converter.OperatorInstanceConverter; -import com.datamate.cleaning.interfaces.dto.OperatorDto; import com.datamate.cleaning.interfaces.dto.OperatorInstanceDto; import com.datamate.cleaning.domain.model.entity.OperatorInstance; import com.datamate.cleaning.domain.repository.OperatorInstanceRepository; @@ -20,11 +19,6 @@ public class OperatorInstanceRepositoryImpl extends CrudRepository findAllOperators() { - return OperatorInstanceConverter.INSTANCE.fromEntityToDto(mapper.findAllOperators()); - } - @Override public void insertInstance(String instanceId, List instances) { List operatorInstances = new ArrayList<>(); diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/persistence/mapper/OperatorInstanceMapper.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/persistence/mapper/OperatorInstanceMapper.java index 2649d86..0b0699d 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/persistence/mapper/OperatorInstanceMapper.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/persistence/mapper/OperatorInstanceMapper.java @@ -1,17 +1,10 @@ package com.datamate.cleaning.infrastructure.persistence.mapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper; -import com.datamate.cleaning.domain.model.entity.Operator; import com.datamate.cleaning.domain.model.entity.OperatorInstance; import org.apache.ibatis.annotations.Mapper; -import org.apache.ibatis.annotations.Select; - -import java.util.List; @Mapper public interface OperatorInstanceMapper extends BaseMapper { - @Select("SELECT id, name, description, version, inputs, outputs, runtime, settings, is_star, created_at, " + - "updated_at FROM t_operator") - List findAllOperators(); } diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/validator/CleanTaskValidator.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/validator/CleanTaskValidator.java new file mode 100644 index 0000000..92f3d29 --- /dev/null +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/infrastructure/validator/CleanTaskValidator.java @@ -0,0 +1,40 @@ +package com.datamate.cleaning.infrastructure.validator; + +import com.datamate.cleaning.common.exception.CleanErrorCode; +import com.datamate.cleaning.domain.repository.CleaningTaskRepository; +import com.datamate.cleaning.interfaces.dto.OperatorInstanceDto; +import com.datamate.common.infrastructure.exception.BusinessException; +import lombok.RequiredArgsConstructor; +import org.apache.commons.lang3.StringUtils; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.Locale; + + +@Component +@RequiredArgsConstructor +public class CleanTaskValidator { + private final CleaningTaskRepository cleaningTaskRepo; + + public void checkNameDuplication (String name) { + if (cleaningTaskRepo.isNameExist(name)) { + throw BusinessException.of(CleanErrorCode.DUPLICATE_TASK_NAME); + } + } + + public void checkInputAndOutput (List operators) { + if (operators == null || operators.size() <= 1) { + return; + } + for (int i = 1; i < operators.size(); i++) { + OperatorInstanceDto front = operators.get(i - 1); + OperatorInstanceDto back = operators.get(i); + if (!StringUtils.equals(front.getOutputs(), back.getInputs())) { + throw BusinessException.of(CleanErrorCode.IN_AND_OUT_NOT_MATCH, + String.format(Locale.ROOT, "ops(name: [%s, %s]) inputs and outputs does not match", + front.getName(), back.getName())); + } + } + } +} diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/dto/CleaningTaskDto.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/dto/CleaningTaskDto.java index be6fbc7..f2cc965 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/dto/CleaningTaskDto.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/dto/CleaningTaskDto.java @@ -5,6 +5,7 @@ import com.datamate.cleaning.common.enums.CleaningTaskStatusEnum; import java.time.LocalDateTime; import java.util.List; +import com.datamate.operator.interfaces.dto.OperatorDto; import lombok.Getter; import lombok.Setter; import org.springframework.format.annotation.DateTimeFormat; diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/dto/CleaningTemplateDto.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/dto/CleaningTemplateDto.java index b86bb3e..29c7102 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/dto/CleaningTemplateDto.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/dto/CleaningTemplateDto.java @@ -4,6 +4,7 @@ import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; +import com.datamate.operator.interfaces.dto.OperatorDto; import lombok.Getter; import lombok.Setter; import org.springframework.format.annotation.DateTimeFormat; diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/dto/OperatorDto.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/dto/OperatorDto.java deleted file mode 100644 index 7b07fd7..0000000 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/dto/OperatorDto.java +++ /dev/null @@ -1,41 +0,0 @@ -package com.datamate.cleaning.interfaces.dto; - -import java.time.LocalDateTime; - -import lombok.Getter; -import lombok.Setter; -import org.springframework.format.annotation.DateTimeFormat; - -/** - * OperatorDto - */ - -@Getter -@Setter -public class OperatorDto { - - private String id; - - private String name; - - private String description; - - private String version; - - private String inputs; - - private String outputs; - - private String runtime; - - private String settings; - - private Boolean isStar; - - @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) - private LocalDateTime createdAt; - - @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) - private LocalDateTime updatedAt; -} - diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/dto/OperatorInstanceDto.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/dto/OperatorInstanceDto.java index 64439b5..5ea8216 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/dto/OperatorInstanceDto.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/dto/OperatorInstanceDto.java @@ -1,6 +1,7 @@ package com.datamate.cleaning.interfaces.dto; import java.util.HashMap; +import java.util.List; import java.util.Map; @@ -17,6 +18,14 @@ public class OperatorInstanceDto { private String id; + private String name; + + private String inputs; + + private String outputs; + + private List categories; + private Map overrides = new HashMap<>(); } diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/rest/CleaningTaskController.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/rest/CleaningTaskController.java index 04674b7..811f53e 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/rest/CleaningTaskController.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/rest/CleaningTaskController.java @@ -3,10 +3,8 @@ package com.datamate.cleaning.interfaces.rest; import com.datamate.cleaning.application.CleaningTaskService; import com.datamate.cleaning.interfaces.dto.CleaningTaskDto; import com.datamate.cleaning.interfaces.dto.CreateCleaningTaskRequest; -import com.datamate.common.infrastructure.common.Response; import com.datamate.common.interfaces.PagedResponse; import lombok.RequiredArgsConstructor; -import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; import java.util.List; @@ -19,41 +17,41 @@ public class CleaningTaskController { private final CleaningTaskService cleaningTaskService; @GetMapping - public ResponseEntity>> cleaningTasksGet( + public PagedResponse cleaningTasksGet( @RequestParam("page") Integer page, @RequestParam("size") Integer size, @RequestParam(value = "status", required = false) String status, @RequestParam(value = "keywords", required = false) String keywords) { List tasks = cleaningTaskService.getTasks(status, keywords, page, size); int count = cleaningTaskService.countTasks(status, keywords); int totalPages = (count + size + 1) / size; - return ResponseEntity.ok(Response.ok(PagedResponse.of(tasks, page, count, totalPages))); + return PagedResponse.of(tasks, page, count, totalPages); } @PostMapping - public ResponseEntity> cleaningTasksPost(@RequestBody CreateCleaningTaskRequest request) { - return ResponseEntity.ok(Response.ok(cleaningTaskService.createTask(request))); + public CleaningTaskDto cleaningTasksPost(@RequestBody CreateCleaningTaskRequest request) { + return cleaningTaskService.createTask(request); } @PostMapping("/{taskId}/stop") - public ResponseEntity> cleaningTasksStop(@PathVariable("taskId") String taskId) { + public String cleaningTasksStop(@PathVariable("taskId") String taskId) { cleaningTaskService.stopTask(taskId); - return ResponseEntity.ok(Response.ok(null)); + return taskId; } @PostMapping("/{taskId}/execute") - public ResponseEntity> cleaningTasksStart(@PathVariable("taskId") String taskId) { + public String cleaningTasksStart(@PathVariable("taskId") String taskId) { cleaningTaskService.executeTask(taskId); - return ResponseEntity.ok(Response.ok(null)); + return taskId; } @GetMapping("/{taskId}") - public ResponseEntity> cleaningTasksTaskIdGet(@PathVariable("taskId") String taskId) { - return ResponseEntity.ok(Response.ok(cleaningTaskService.getTask(taskId))); + public CleaningTaskDto cleaningTasksTaskIdGet(@PathVariable("taskId") String taskId) { + return cleaningTaskService.getTask(taskId); } @DeleteMapping("/{taskId}") - public ResponseEntity> cleaningTasksTaskIdDelete(@PathVariable("taskId") String taskId) { + public String cleaningTasksTaskIdDelete(@PathVariable("taskId") String taskId) { cleaningTaskService.deleteTask(taskId); - return ResponseEntity.ok(Response.ok(null)); + return taskId; } } diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/rest/CleaningTemplateController.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/rest/CleaningTemplateController.java index ffa74cc..5079500 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/rest/CleaningTemplateController.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/rest/CleaningTemplateController.java @@ -4,10 +4,8 @@ import com.datamate.cleaning.application.CleaningTemplateService; import com.datamate.cleaning.interfaces.dto.CleaningTemplateDto; import com.datamate.cleaning.interfaces.dto.CreateCleaningTemplateRequest; import com.datamate.cleaning.interfaces.dto.UpdateCleaningTemplateRequest; -import com.datamate.common.infrastructure.common.Response; import com.datamate.common.interfaces.PagedResponse; import lombok.RequiredArgsConstructor; -import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.DeleteMapping; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; @@ -29,14 +27,14 @@ public class CleaningTemplateController { private final CleaningTemplateService cleaningTemplateService; @GetMapping - public ResponseEntity>> cleaningTemplatesGet( + public PagedResponse cleaningTemplatesGet( @RequestParam(value = "page", required = false) Integer page, @RequestParam(value = "size", required = false) Integer size, @RequestParam(value = "keywords", required = false) String keyword) { List templates = cleaningTemplateService.getTemplates(keyword); if (page == null || size == null) { - return ResponseEntity.ok(Response.ok(PagedResponse.of(templates.stream() - .sorted(Comparator.comparing(CleaningTemplateDto::getCreatedAt).reversed()).toList()))); + return PagedResponse.of(templates.stream() + .sorted(Comparator.comparing(CleaningTemplateDto::getCreatedAt).reversed()).toList()); } int count = templates.size(); int totalPages = (count + size + 1) / size; @@ -44,31 +42,31 @@ public class CleaningTemplateController { .sorted(Comparator.comparing(CleaningTemplateDto::getCreatedAt).reversed()) .skip((long) page * size) .limit(size).toList(); - return ResponseEntity.ok(Response.ok(PagedResponse.of(limitTemplates, page, count, totalPages))); + return PagedResponse.of(limitTemplates, page, count, totalPages); } @PostMapping - public ResponseEntity> cleaningTemplatesPost( + public CleaningTemplateDto cleaningTemplatesPost( @RequestBody CreateCleaningTemplateRequest request) { - return ResponseEntity.ok(Response.ok(cleaningTemplateService.createTemplate(request))); + return cleaningTemplateService.createTemplate(request); } @GetMapping("/{templateId}") - public ResponseEntity> cleaningTemplatesTemplateIdGet( + public CleaningTemplateDto cleaningTemplatesTemplateIdGet( @PathVariable("templateId") String templateId) { - return ResponseEntity.ok(Response.ok(cleaningTemplateService.getTemplate(templateId))); + return cleaningTemplateService.getTemplate(templateId); } @PutMapping("/{templateId}") - public ResponseEntity> cleaningTemplatesTemplateIdPut( + public CleaningTemplateDto cleaningTemplatesTemplateIdPut( @PathVariable("templateId") String templateId, @RequestBody UpdateCleaningTemplateRequest request) { - return ResponseEntity.ok(Response.ok(cleaningTemplateService.updateTemplate(templateId, request))); + return cleaningTemplateService.updateTemplate(templateId, request); } @DeleteMapping("/{templateId}") - public ResponseEntity> cleaningTemplatesTemplateIdDelete( + public String cleaningTemplatesTemplateIdDelete( @PathVariable("templateId") String templateId) { cleaningTemplateService.deleteTemplate(templateId); - return ResponseEntity.noContent().build(); + return templateId; } } diff --git a/backend/services/operator-market-service/src/main/java/com/datamate/operator/application/CategoryService.java b/backend/services/operator-market-service/src/main/java/com/datamate/operator/application/CategoryService.java index 77f9c63..190aed9 100644 --- a/backend/services/operator-market-service/src/main/java/com/datamate/operator/application/CategoryService.java +++ b/backend/services/operator-market-service/src/main/java/com/datamate/operator/application/CategoryService.java @@ -6,13 +6,16 @@ import com.datamate.operator.domain.repository.CategoryRepository; import com.datamate.operator.interfaces.dto.CategoryDto; import com.datamate.operator.interfaces.dto.CategoryRelationDto; import com.datamate.operator.interfaces.dto.CategoryTreeResponse; -import com.datamate.operator.interfaces.dto.SubCategory; import lombok.RequiredArgsConstructor; +import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Service; +import java.time.LocalDateTime; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import java.util.stream.Collectors; @Service @@ -26,37 +29,40 @@ public class CategoryService { List allCategories = categoryRepo.findAllCategories(); List allRelations = categoryRelationRepo.findAllRelation(); - Map relationMap = allRelations.stream() + Map relationMap = allRelations.stream() .collect(Collectors.groupingBy( CategoryRelationDto::getCategoryId, Collectors.collectingAndThen(Collectors.counting(), Math::toIntExact))); - Map nameMap = allCategories.stream() - .collect(Collectors.toMap(CategoryDto::getId, CategoryDto::getName)); - Map> groupedByParentId = allCategories.stream() - .filter(relation -> relation.getParentId() > 0) + Map nameMap = allCategories.stream() + .collect(Collectors.toMap(CategoryDto::getId, Function.identity())); + Map> groupedByParentId = allCategories.stream() + .filter(relation -> !StringUtils.equals(relation.getParentId(), "0")) .collect(Collectors.groupingBy(CategoryDto::getParentId)); return groupedByParentId.entrySet().stream() - .sorted(Map.Entry.comparingByKey()) + .sorted(categoryComparator(nameMap)) .map(entry -> { - Integer parentId = entry.getKey(); + String parentId = entry.getKey(); List group = entry.getValue(); CategoryTreeResponse response = new CategoryTreeResponse(); response.setId(parentId); - response.setName(nameMap.get(parentId)); + response.setName(nameMap.get(parentId).getName()); AtomicInteger totalCount = new AtomicInteger(); - response.setCategories(group.stream().map(category -> { - SubCategory subCategory = new SubCategory(); - subCategory.setId(category.getId()); - subCategory.setName(category.getName()); - subCategory.setCount(relationMap.getOrDefault(category.getId(), 0)); + response.setCategories(group.stream().peek(category -> { + category.setCount(relationMap.getOrDefault(category.getId(), 0)); totalCount.getAndAdd(relationMap.getOrDefault(category.getId(), 0)); - subCategory.setParentId(parentId); - return subCategory; - }).toList()); + }).sorted(Comparator.comparing(CategoryDto::getCreatedAt)).toList()); response.setCount(totalCount.get()); return response; }).toList(); } + + private Comparator>> categoryComparator(Map categoryMap) { + return (entry1, entry2) -> { + LocalDateTime index1 = categoryMap.get(entry1.getKey()).getCreatedAt(); + LocalDateTime index2 = categoryMap.get(entry2.getKey()).getCreatedAt(); + return index1.compareTo(index2); + }; + } } diff --git a/backend/services/operator-market-service/src/main/java/com/datamate/operator/application/LabelService.java b/backend/services/operator-market-service/src/main/java/com/datamate/operator/application/LabelService.java deleted file mode 100644 index f9fe8be..0000000 --- a/backend/services/operator-market-service/src/main/java/com/datamate/operator/application/LabelService.java +++ /dev/null @@ -1,21 +0,0 @@ -package com.datamate.operator.application; - -import com.datamate.operator.interfaces.dto.LabelDto; -import org.springframework.stereotype.Service; -import java.util.List; -import java.util.Collections; - -@Service -public class LabelService { - public List getLabels(Integer page, Integer size, String keyword) { - // TODO: 查询标签列表 - return Collections.emptyList(); - } - public void updateLabel(String id, List updateLabelDtoRequest) { - // TODO: 更新标签 - } - public void createLabels(LabelDto labelsPostRequest) { - // TODO: 批量创建标签 - } -} - diff --git a/backend/services/operator-market-service/src/main/java/com/datamate/operator/application/OperatorService.java b/backend/services/operator-market-service/src/main/java/com/datamate/operator/application/OperatorService.java index 54ff858..5cd1041 100644 --- a/backend/services/operator-market-service/src/main/java/com/datamate/operator/application/OperatorService.java +++ b/backend/services/operator-market-service/src/main/java/com/datamate/operator/application/OperatorService.java @@ -1,15 +1,22 @@ package com.datamate.operator.application; +import com.datamate.common.domain.model.ChunkUploadPreRequest; +import com.datamate.common.domain.service.FileService; +import com.datamate.operator.domain.contants.OperatorConstant; import com.datamate.operator.infrastructure.converter.OperatorConverter; import com.datamate.operator.domain.model.OperatorView; import com.datamate.operator.domain.repository.CategoryRelationRepository; import com.datamate.operator.domain.repository.OperatorRepository; import com.datamate.operator.domain.repository.OperatorViewRepository; +import com.datamate.operator.infrastructure.parser.ParserHolder; import com.datamate.operator.interfaces.dto.OperatorDto; +import com.datamate.operator.interfaces.dto.UploadOperatorRequest; import lombok.RequiredArgsConstructor; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; -import org.springframework.web.multipart.MultipartFile; +import org.springframework.transaction.annotation.Transactional; +import java.io.File; import java.util.List; @Service @@ -21,14 +28,21 @@ public class OperatorService { private final CategoryRelationRepository relationRepo; - public List getOperators(Integer page, Integer size, List categories, + private final ParserHolder parserHolder; + + private final FileService fileService; + + @Value("${operator.base.path:/operator}") + private String operatorBasePath; + + public List getOperators(Integer page, Integer size, List categories, String operatorName, Boolean isStar) { List filteredOperators = operatorViewRepo.findOperatorsByCriteria(page, size, operatorName, categories, isStar); return filteredOperators.stream().map(OperatorConverter.INSTANCE::fromEntityToDto).toList(); } - public int getOperatorsCount(List categories, String operatorName, Boolean isStar) { + public int getOperatorsCount(List categories, String operatorName, Boolean isStar) { return operatorViewRepo.countOperatorsByCriteria(operatorName, categories, isStar); } @@ -37,20 +51,60 @@ public class OperatorService { return OperatorConverter.INSTANCE.fromEntityToDto(operator); } + @Transactional public OperatorDto createOperator(OperatorDto req) { operatorRepo.insertOperator(req); relationRepo.batchInsert(req.getId(), req.getCategories()); + parserHolder.extractTo(getFileType(req.getFileName()), getUploadPath(req.getFileName()), + getExtractPath(getFileNameWithoutExtension(req.getFileName()))); return getOperatorById(req.getId()); } + @Transactional public OperatorDto updateOperator(String id, OperatorDto req) { operatorRepo.updateOperator(req); relationRepo.batchInsert(id, req.getCategories()); + parserHolder.extractTo(getFileType(req.getFileName()), getUploadPath(req.getFileName()), + getExtractPath(getFileNameWithoutExtension(req.getFileName()))); return getOperatorById(id); } - public OperatorDto uploadOperator(MultipartFile file, String description) { - // TODO: 文件上传与解析 - return new OperatorDto(); + @Transactional + public void deleteOperator(String id) { + operatorRepo.deleteOperator(id); + relationRepo.deleteByOperatorId(id); + } + + public OperatorDto uploadOperator(String fileName) { + return parserHolder.parseYamlFromArchive(getFileType(fileName), new File(getUploadPath(fileName)), + OperatorConstant.YAML_PATH); + } + + public String preUpload() { + ChunkUploadPreRequest request = ChunkUploadPreRequest.builder().build(); + request.setUploadPath(operatorBasePath + File.separator + "upload"); + request.setTotalFileNum(1); + request.setServiceId(OperatorConstant.SERVICE_ID); + return fileService.preUpload(request); + } + + public void chunkUpload(UploadOperatorRequest request) { + fileService.chunkUpload(OperatorConverter.INSTANCE.toChunkRequest(request)); + } + + private String getFileType(String fileName) { + return fileName.substring(fileName.lastIndexOf('.') + 1); + } + + private String getFileNameWithoutExtension(String fileName) { + return fileName.substring(0, fileName.lastIndexOf('.')); + } + + private String getUploadPath(String fileName) { + return operatorBasePath + File.separator + "upload" + File.separator + fileName; + } + + private String getExtractPath(String fileName) { + return operatorBasePath + File.separator + "extract" + File.separator + fileName; } } diff --git a/backend/services/operator-market-service/src/main/java/com/datamate/operator/domain/contants/OperatorConstant.java b/backend/services/operator-market-service/src/main/java/com/datamate/operator/domain/contants/OperatorConstant.java new file mode 100644 index 0000000..35a871d --- /dev/null +++ b/backend/services/operator-market-service/src/main/java/com/datamate/operator/domain/contants/OperatorConstant.java @@ -0,0 +1,42 @@ +package com.datamate.operator.domain.contants; + +import java.util.HashMap; +import java.util.Map; + +public class OperatorConstant { + public static String SERVICE_ID = "operator"; + + public static String YAML_PATH = "metadata.yml"; + + public static String CATEGORY_PYTHON = "python"; + + public static String CATEGORY_PYTHON_ID = "9eda9d5d-072b-499b-916c-797a0a8750e1"; + + public static String CATEGORY_JAVA = "java"; + + public static String CATEGORY_JAVA_ID = "b5bfc548-8ef6-417c-b8a6-a4197c078249"; + + public static String CATEGORY_CUSTOMIZED_ID = "ec2cdd17-8b93-4a81-88c4-ac9e98d10757"; + + public static String CATEGORY_TEXT_ID = "d8a5df7a-52a9-42c2-83c4-01062e60f597"; + + public static String CATEGORY_IMAGE_ID = "de36b61c-9e8a-4422-8c31-d30585c7100f"; + + public static String CATEGORY_AUDIO_ID = "42dd9392-73e4-458c-81ff-41751ada47b5"; + + public static String CATEGORY_VIDEO_ID = "a233d584-73c8-4188-ad5d-8f7c8dda9c27"; + + public static String CATEGORY_ALL_ID = "4d7dbd77-0a92-44f3-9056-2cd62d4a71e4"; + + public static Map CATEGORY_MAP = new HashMap<>(); + + static { + CATEGORY_MAP.put(CATEGORY_PYTHON, CATEGORY_PYTHON_ID); + CATEGORY_MAP.put(CATEGORY_JAVA, CATEGORY_JAVA_ID); + CATEGORY_MAP.put("text", CATEGORY_TEXT_ID); + CATEGORY_MAP.put("image", CATEGORY_IMAGE_ID); + CATEGORY_MAP.put("audio", CATEGORY_AUDIO_ID); + CATEGORY_MAP.put("video", CATEGORY_VIDEO_ID); + CATEGORY_MAP.put("all", CATEGORY_ALL_ID); + } +} diff --git a/backend/services/operator-market-service/src/main/java/com/datamate/operator/domain/model/Category.java b/backend/services/operator-market-service/src/main/java/com/datamate/operator/domain/model/Category.java index c8f89c6..cd52f1a 100644 --- a/backend/services/operator-market-service/src/main/java/com/datamate/operator/domain/model/Category.java +++ b/backend/services/operator-market-service/src/main/java/com/datamate/operator/domain/model/Category.java @@ -4,15 +4,21 @@ import com.baomidou.mybatisplus.annotation.TableName; import lombok.Getter; import lombok.Setter; +import java.time.LocalDateTime; + @Setter @Getter @TableName(value = "t_operator_category", autoResultMap = true) public class Category { - private Integer id; + private String id; private String name; + private String value; + private String type; - private Integer parentId; + private String parentId; + + private LocalDateTime createdAt; } diff --git a/backend/services/operator-market-service/src/main/java/com/datamate/operator/domain/model/CategoryRelation.java b/backend/services/operator-market-service/src/main/java/com/datamate/operator/domain/model/CategoryRelation.java index 085b078..9d40cb9 100644 --- a/backend/services/operator-market-service/src/main/java/com/datamate/operator/domain/model/CategoryRelation.java +++ b/backend/services/operator-market-service/src/main/java/com/datamate/operator/domain/model/CategoryRelation.java @@ -10,7 +10,7 @@ import lombok.Setter; @AllArgsConstructor @TableName(value = "t_operator_category_relation", autoResultMap = true) public class CategoryRelation { - private Integer categoryId; + private String categoryId; private String operatorId; } diff --git a/backend/services/operator-market-service/src/main/java/com/datamate/operator/domain/repository/CategoryRelationRepository.java b/backend/services/operator-market-service/src/main/java/com/datamate/operator/domain/repository/CategoryRelationRepository.java index 962b071..aac1c05 100644 --- a/backend/services/operator-market-service/src/main/java/com/datamate/operator/domain/repository/CategoryRelationRepository.java +++ b/backend/services/operator-market-service/src/main/java/com/datamate/operator/domain/repository/CategoryRelationRepository.java @@ -3,7 +3,6 @@ package com.datamate.operator.domain.repository; import com.baomidou.mybatisplus.extension.repository.IRepository; import com.datamate.operator.domain.model.CategoryRelation; import com.datamate.operator.interfaces.dto.CategoryRelationDto; -import org.apache.ibatis.annotations.Param; import java.util.List; @@ -11,5 +10,7 @@ public interface CategoryRelationRepository extends IRepository findAllRelation(); - void batchInsert(@Param("operatorId") String operatorId, @Param("categories") List categories); + void batchInsert(String operatorId, List categories); + + void deleteByOperatorId(String operatorId); } diff --git a/backend/services/operator-market-service/src/main/java/com/datamate/operator/domain/repository/OperatorRepository.java b/backend/services/operator-market-service/src/main/java/com/datamate/operator/domain/repository/OperatorRepository.java index 546df44..298fd05 100644 --- a/backend/services/operator-market-service/src/main/java/com/datamate/operator/domain/repository/OperatorRepository.java +++ b/backend/services/operator-market-service/src/main/java/com/datamate/operator/domain/repository/OperatorRepository.java @@ -7,9 +7,11 @@ import com.datamate.operator.interfaces.dto.OperatorDto; import java.util.List; public interface OperatorRepository extends IRepository { - List findAllOperators(); + List findAllOperators(); void updateOperator(OperatorDto operator); void insertOperator(OperatorDto operator); + + void deleteOperator(String id); } diff --git a/backend/services/operator-market-service/src/main/java/com/datamate/operator/domain/repository/OperatorViewRepository.java b/backend/services/operator-market-service/src/main/java/com/datamate/operator/domain/repository/OperatorViewRepository.java index cacba83..fbbdf64 100644 --- a/backend/services/operator-market-service/src/main/java/com/datamate/operator/domain/repository/OperatorViewRepository.java +++ b/backend/services/operator-market-service/src/main/java/com/datamate/operator/domain/repository/OperatorViewRepository.java @@ -7,9 +7,9 @@ import java.util.List; public interface OperatorViewRepository extends IRepository { List findOperatorsByCriteria(Integer page, Integer size, String operatorName, - List categories, Boolean isStar); + List categories, Boolean isStar); - Integer countOperatorsByCriteria(String operatorName, List categories, Boolean isStar); + Integer countOperatorsByCriteria(String operatorName, List categories, Boolean isStar); OperatorView findOperatorById(String id); } diff --git a/backend/services/operator-market-service/src/main/java/com/datamate/operator/infrastructure/converter/OperatorConverter.java b/backend/services/operator-market-service/src/main/java/com/datamate/operator/infrastructure/converter/OperatorConverter.java index ddccf0f..791356e 100644 --- a/backend/services/operator-market-service/src/main/java/com/datamate/operator/infrastructure/converter/OperatorConverter.java +++ b/backend/services/operator-market-service/src/main/java/com/datamate/operator/infrastructure/converter/OperatorConverter.java @@ -1,8 +1,10 @@ package com.datamate.operator.infrastructure.converter; +import com.datamate.common.domain.model.ChunkUploadRequest; import com.datamate.operator.domain.model.Operator; import com.datamate.operator.domain.model.OperatorView; import com.datamate.operator.interfaces.dto.OperatorDto; +import com.datamate.operator.interfaces.dto.UploadOperatorRequest; import org.mapstruct.Mapper; import org.mapstruct.Mapping; import org.mapstruct.Named; @@ -19,13 +21,17 @@ public interface OperatorConverter { @Mapping(target = "categories", source = "categories", qualifiedByName = "stringToList") OperatorDto fromEntityToDto(OperatorView operator); + List fromEntityToDto(List operator); + @Named("stringToList") - static List stringToList(String input) { + static List stringToList(String input) { if (input == null || input.isEmpty()) { return Collections.emptyList(); } - return Arrays.stream(input.split(",")).map(Integer::valueOf).toList(); + return Arrays.stream(input.split(",")).map(String::valueOf).toList(); } Operator fromDtoToEntity(OperatorDto operator); + + ChunkUploadRequest toChunkRequest(UploadOperatorRequest request); } diff --git a/backend/services/operator-market-service/src/main/java/com/datamate/operator/infrastructure/exception/OperatorErrorCode.java b/backend/services/operator-market-service/src/main/java/com/datamate/operator/infrastructure/exception/OperatorErrorCode.java new file mode 100644 index 0000000..8026ade --- /dev/null +++ b/backend/services/operator-market-service/src/main/java/com/datamate/operator/infrastructure/exception/OperatorErrorCode.java @@ -0,0 +1,21 @@ +package com.datamate.operator.infrastructure.exception; + +import com.datamate.common.infrastructure.exception.ErrorCode; +import lombok.AllArgsConstructor; +import lombok.Getter; + +@Getter +@AllArgsConstructor +public enum OperatorErrorCode implements ErrorCode { + /** + * 不支持的文件类型 + */ + UNSUPPORTED_FILE_TYPE("op.0001", "不支持的文件类型"), + + YAML_NOT_FOUND("op.0002", "算子中缺少元数据文件"), + + FIELD_NOT_FOUND("op.0003", "缺少必要的字段"); + + private final String code; + private final String message; +} diff --git a/backend/services/operator-market-service/src/main/java/com/datamate/operator/infrastructure/parser/AbstractParser.java b/backend/services/operator-market-service/src/main/java/com/datamate/operator/infrastructure/parser/AbstractParser.java new file mode 100644 index 0000000..c4d0953 --- /dev/null +++ b/backend/services/operator-market-service/src/main/java/com/datamate/operator/infrastructure/parser/AbstractParser.java @@ -0,0 +1,80 @@ +package com.datamate.operator.infrastructure.parser; + +import com.datamate.common.infrastructure.exception.BusinessException; +import com.datamate.common.infrastructure.exception.SystemErrorCode; +import com.datamate.operator.domain.contants.OperatorConstant; +import com.datamate.operator.infrastructure.exception.OperatorErrorCode; +import com.datamate.operator.interfaces.dto.OperatorDto; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.yaml.snakeyaml.LoaderOptions; +import org.yaml.snakeyaml.Yaml; +import org.yaml.snakeyaml.constructor.SafeConstructor; + +import java.io.File; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +public abstract class AbstractParser { + protected ObjectMapper objectMapper = new ObjectMapper(); + + protected OperatorDto parseYaml(InputStream yamlContent) { + Yaml yaml = new Yaml(new SafeConstructor(new LoaderOptions())); + Map content = yaml.load(yamlContent); + OperatorDto operator = new OperatorDto(); + operator.setId(toStringIfNotNull(content.get("raw_id"))); + operator.setName(toStringIfNotNull(content.get("name"))); + operator.setDescription(toStringIfNotNull(content.get("description"))); + operator.setVersion(toStringIfNotNull(content.get("version"))); + operator.setInputs(toStringIfNotNull(content.get("inputs"))); + operator.setOutputs(toStringIfNotNull(content.get("outputs"))); + operator.setRuntime(toJsonIfNotNull(content.get("runtime"))); + operator.setSettings(toJsonIfNotNull(content.get("settings"))); + List categories = new ArrayList<>(); + categories.add(OperatorConstant.CATEGORY_MAP.get(toLowerCaseIfNotNull(content.get("language")))); + categories.add(OperatorConstant.CATEGORY_MAP.get(toLowerCaseIfNotNull(content.get("modal")))); + categories.add(OperatorConstant.CATEGORY_CUSTOMIZED_ID); + operator.setCategories(categories); + return operator; + }; + + /** + * 从压缩包内读取指定路径的 yaml 文件并解析为指定类型 + * @param archive 压缩包路径(zip 或 tar) + * @param entryPath 压缩包内部的文件路径,例如 "config/app.yaml" 或 "./config/app.yaml" + * @return 解析后的对象 + */ + public abstract OperatorDto parseYamlFromArchive(File archive, String entryPath); + + /** + * 将压缩包解压到目标目录(保持相对路径) + * @param archive 压缩包路径 + * @param targetDir 目标目录 + */ + public abstract void extractTo(File archive, String targetDir); + + private String toStringIfNotNull(Object obj) { + if (obj == null) { + throw BusinessException.of(OperatorErrorCode.FIELD_NOT_FOUND); + } + return obj.toString(); + } + + private String toLowerCaseIfNotNull(Object obj) { + if (obj == null) { + throw BusinessException.of(OperatorErrorCode.FIELD_NOT_FOUND); + } + return obj.toString().toLowerCase(Locale.ROOT); + } + + private String toJsonIfNotNull(Object obj) { + try { + return obj == null ? null : objectMapper.writeValueAsString(obj); + } catch (JsonProcessingException e) { + throw BusinessException.of(SystemErrorCode.UNKNOWN_ERROR, e.getMessage()); + } + } +} diff --git a/backend/services/operator-market-service/src/main/java/com/datamate/operator/infrastructure/parser/ParserHolder.java b/backend/services/operator-market-service/src/main/java/com/datamate/operator/infrastructure/parser/ParserHolder.java new file mode 100644 index 0000000..374d8c5 --- /dev/null +++ b/backend/services/operator-market-service/src/main/java/com/datamate/operator/infrastructure/parser/ParserHolder.java @@ -0,0 +1,62 @@ +package com.datamate.operator.infrastructure.parser; + +import com.datamate.common.infrastructure.exception.BusinessException; +import com.datamate.common.infrastructure.exception.SystemErrorCode; +import com.datamate.operator.infrastructure.exception.OperatorErrorCode; +import com.datamate.operator.interfaces.dto.OperatorDto; +import jakarta.annotation.PostConstruct; +import org.springframework.stereotype.Component; + +import java.io.File; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +@Component +public class ParserHolder { + // 存放 parser:key 为 parser 类型标识(例如 "zip" 或 "tar"),value 为 parser 实例 + private final Map parserMap = new ConcurrentHashMap<>(); + + // 注册 parser(可在启动时调用) + public void registerParser(String type, AbstractParser parser) { + if (type == null || parser == null) { + throw BusinessException.of(SystemErrorCode.UNKNOWN_ERROR); + } + parserMap.put(type, parser); + } + + // 根据类型获取 parser(可能为 null) + public AbstractParser getParser(String type) { + return parserMap.get(type); + } + + // 便捷代理:从指定类型的压缩包中读取 entry 并解析为 clazz + public OperatorDto parseYamlFromArchive(String type, File archive, String entryPath) { + AbstractParser parser = getParser(type); + if (parser == null) { + throw BusinessException.of(OperatorErrorCode.UNSUPPORTED_FILE_TYPE, + "No parser registered for type: " + type); + } + return parser.parseYamlFromArchive(archive, entryPath); + } + + // 便捷代理:将指定类型的压缩包解压到目标目录 + public void extractTo(String type, File archive, String targetDir) { + AbstractParser parser = getParser(type); + if (parser == null) { + throw BusinessException.of(OperatorErrorCode.UNSUPPORTED_FILE_TYPE, + "No parser registered for type: " + type); + } + parser.extractTo(archive, targetDir); + } + + public void extractTo(String type, String sourceDir, String targetDir) { + extractTo(type, new File(sourceDir), targetDir); + } + + @PostConstruct + public void init() { + // 注册 zip 和 tar parser,key 可根据需要调整(例如 "zip"/"tar") + registerParser("zip", new ZipParser()); + registerParser("tar", new TarParser()); + } +} diff --git a/backend/services/operator-market-service/src/main/java/com/datamate/operator/infrastructure/parser/TarParser.java b/backend/services/operator-market-service/src/main/java/com/datamate/operator/infrastructure/parser/TarParser.java new file mode 100644 index 0000000..b363025 --- /dev/null +++ b/backend/services/operator-market-service/src/main/java/com/datamate/operator/infrastructure/parser/TarParser.java @@ -0,0 +1,77 @@ +package com.datamate.operator.infrastructure.parser; + +import com.datamate.common.infrastructure.exception.BusinessException; +import com.datamate.common.infrastructure.exception.SystemErrorCode; +import com.datamate.operator.infrastructure.exception.OperatorErrorCode; +import com.datamate.operator.interfaces.dto.OperatorDto; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Objects; + +public class TarParser extends AbstractParser { + + @Override + public OperatorDto parseYamlFromArchive(File archive, String entryPath) { + // 允许带或不带前导 "./" + String normalized = entryPath.startsWith("./") ? entryPath.substring(2) : entryPath; + try (InputStream fis = Files.newInputStream(archive.toPath()); + TarArchiveInputStream tis = new TarArchiveInputStream(fis)) { + TarArchiveEntry entry; + while ((entry = tis.getNextEntry()) != null) { + String name = entry.getName(); + if (Objects.equals(name, entryPath) || Objects.equals(name, normalized)) { + // 使用 SnakeYAML 解析当前 entry 的内容到目标类型 + return parseYaml(tis); + } + } + } catch (IOException e) { + throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR, e.getMessage()); + } + throw BusinessException.of(OperatorErrorCode.YAML_NOT_FOUND, "Entry not found in tar: " + entryPath); + } + + @Override + public void extractTo(File archive, String targetDir) { + Path targetPath = Paths.get(targetDir); + try (InputStream fis = Files.newInputStream(archive.toPath()); + TarArchiveInputStream tis = new TarArchiveInputStream(fis)) { + Files.createDirectories(targetPath); + TarArchiveEntry entry; + while ((entry = tis.getNextEntry()) != null) { + String entryName = entry.getName(); + // 去掉可能的前导 "./" + if (entryName.startsWith("./")) { + entryName = entryName.substring(2); + } + + Path resolved = targetPath.resolve(entryName).toAbsolutePath().normalize(); + if (!resolved.startsWith(targetPath.toAbsolutePath().normalize())) { + throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR, "Bad tar entry: " + entryName); + } + + if (entry.isDirectory()) { + Files.createDirectories(resolved); + } else { + Files.createDirectories(resolved.getParent()); + try (OutputStream os = Files.newOutputStream(resolved)) { + byte[] buffer = new byte[8192]; + int len; + while ((len = tis.read(buffer)) != -1) { + os.write(buffer, 0, len); + } + } + } + } + } catch (IOException e) { + throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR, e.getMessage()); + } + } +} diff --git a/backend/services/operator-market-service/src/main/java/com/datamate/operator/infrastructure/parser/ZipParser.java b/backend/services/operator-market-service/src/main/java/com/datamate/operator/infrastructure/parser/ZipParser.java new file mode 100644 index 0000000..f02bc25 --- /dev/null +++ b/backend/services/operator-market-service/src/main/java/com/datamate/operator/infrastructure/parser/ZipParser.java @@ -0,0 +1,77 @@ +package com.datamate.operator.infrastructure.parser; + +import com.datamate.common.infrastructure.exception.BusinessException; +import com.datamate.common.infrastructure.exception.SystemErrorCode; +import com.datamate.operator.infrastructure.exception.OperatorErrorCode; +import com.datamate.operator.interfaces.dto.OperatorDto; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.LinkOption; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Enumeration; +import java.util.zip.ZipEntry; +import java.util.zip.ZipFile; + +public class ZipParser extends AbstractParser { + + @Override + public OperatorDto parseYamlFromArchive(File archive, String entryPath) { + try (ZipFile zipFile = new ZipFile(archive)) { + // 允许带或不带前导 "./" + String normalized = entryPath.startsWith("./") ? entryPath.substring(2) : entryPath; + ZipEntry entry = zipFile.getEntry(entryPath); + if (entry == null) { + entry = zipFile.getEntry(normalized); + } + if (entry == null) { + throw BusinessException.of(OperatorErrorCode.YAML_NOT_FOUND, "Entry not found in zip: " + entryPath); + } + try (InputStream is = zipFile.getInputStream(entry)) { + // 使用 SnakeYAML 解析为目标类型 + return parseYaml(is); + } + } catch (IOException e) { + throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR, e.getMessage()); + } + } + + @Override + public void extractTo(File archive, String targetDir) { + Path targetPath = Paths.get(targetDir); + try (ZipFile zipFile = new ZipFile(archive)) { + Files.createDirectories(targetPath); + Enumeration entries = zipFile.entries(); + while (entries.hasMoreElements()) { + ZipEntry entry = entries.nextElement(); + String entryName = entry.getName(); + + // 防止 Zip Slip:确保解压路径仍在 targetDir 下 + Path resolved = targetPath.resolve(entryName).toAbsolutePath().normalize(); + if (!resolved.startsWith(targetPath.toAbsolutePath().normalize())) { + throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR, "Bad zip entry: " + entryName); + } + + if (entry.isDirectory()) { + Files.createDirectories(resolved); + } else { + Files.createDirectories(resolved.getParent()); + try (InputStream is = zipFile.getInputStream(entry); + OutputStream os = Files.newOutputStream(resolved)) { + byte[] buffer = new byte[8192]; + int len; + while ((len = is.read(buffer)) != -1) { + os.write(buffer, 0, len); + } + } + } + } + } catch (IOException e) { + throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR, e.getMessage()); + } + } +} diff --git a/backend/services/operator-market-service/src/main/java/com/datamate/operator/infrastructure/persistence/Impl/CategoryRelationRepositoryImpl.java b/backend/services/operator-market-service/src/main/java/com/datamate/operator/infrastructure/persistence/Impl/CategoryRelationRepositoryImpl.java index e807cb4..8184ea6 100644 --- a/backend/services/operator-market-service/src/main/java/com/datamate/operator/infrastructure/persistence/Impl/CategoryRelationRepositoryImpl.java +++ b/backend/services/operator-market-service/src/main/java/com/datamate/operator/infrastructure/persistence/Impl/CategoryRelationRepositoryImpl.java @@ -1,5 +1,6 @@ package com.datamate.operator.infrastructure.persistence.Impl; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.repository.CrudRepository; import com.datamate.operator.domain.model.CategoryRelation; import com.datamate.operator.domain.repository.CategoryRelationRepository; @@ -23,10 +24,17 @@ public class CategoryRelationRepositoryImpl extends CrudRepository categories) { + public void batchInsert(String operatorId, List categories) { List categoryRelations = categories.stream() .map(category -> new CategoryRelation(category, operatorId)) .toList(); mapper.insert(categoryRelations); } + + @Override + public void deleteByOperatorId(String operatorId) { + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); + queryWrapper.eq(CategoryRelation::getOperatorId, operatorId); + mapper.delete(queryWrapper); + } } diff --git a/backend/services/operator-market-service/src/main/java/com/datamate/operator/infrastructure/persistence/Impl/OperatorRepositoryImpl.java b/backend/services/operator-market-service/src/main/java/com/datamate/operator/infrastructure/persistence/Impl/OperatorRepositoryImpl.java index 58a6961..0eabdd4 100644 --- a/backend/services/operator-market-service/src/main/java/com/datamate/operator/infrastructure/persistence/Impl/OperatorRepositoryImpl.java +++ b/backend/services/operator-market-service/src/main/java/com/datamate/operator/infrastructure/persistence/Impl/OperatorRepositoryImpl.java @@ -17,8 +17,8 @@ public class OperatorRepositoryImpl extends CrudRepository findAllOperators() { - return mapper.selectList(null); + public List findAllOperators() { + return OperatorConverter.INSTANCE.fromEntityToDto(mapper.selectList(null)); } @Override @@ -30,4 +30,9 @@ public class OperatorRepositoryImpl extends CrudRepository findOperatorsByCriteria(Integer page, Integer size, String operatorName, - List categories, Boolean isStar) { + List categories, Boolean isStar) { QueryWrapper queryWrapper = Wrappers.query(); queryWrapper.in(CollectionUtils.isNotEmpty(categories), "category_id", categories) .like(StringUtils.isNotBlank(operatorName), "operator_name", operatorName) @@ -37,7 +37,7 @@ public class OperatorViewRepositoryImpl extends CrudRepository categories, Boolean isStar) { + public Integer countOperatorsByCriteria(String operatorName, List categories, Boolean isStar) { QueryWrapper queryWrapper = Wrappers.query(); queryWrapper.in(CollectionUtils.isNotEmpty(categories),"category_id", categories) .like(StringUtils.isNotBlank(operatorName), "operator_name", operatorName) diff --git a/backend/services/operator-market-service/src/main/java/com/datamate/operator/interfaces/dto/CategoryDto.java b/backend/services/operator-market-service/src/main/java/com/datamate/operator/interfaces/dto/CategoryDto.java index 9681aa3..bd29df0 100644 --- a/backend/services/operator-market-service/src/main/java/com/datamate/operator/interfaces/dto/CategoryDto.java +++ b/backend/services/operator-market-service/src/main/java/com/datamate/operator/interfaces/dto/CategoryDto.java @@ -3,14 +3,22 @@ package com.datamate.operator.interfaces.dto; import lombok.Getter; import lombok.Setter; +import java.time.LocalDateTime; + @Setter @Getter public class CategoryDto { - private Integer id; + private String id; private String name; + private String value; + + private long count; + private String type; - private Integer parentId; + private String parentId; + + private LocalDateTime createdAt; } diff --git a/backend/services/operator-market-service/src/main/java/com/datamate/operator/interfaces/dto/CategoryRelationDto.java b/backend/services/operator-market-service/src/main/java/com/datamate/operator/interfaces/dto/CategoryRelationDto.java index 7f95760..e573d6b 100644 --- a/backend/services/operator-market-service/src/main/java/com/datamate/operator/interfaces/dto/CategoryRelationDto.java +++ b/backend/services/operator-market-service/src/main/java/com/datamate/operator/interfaces/dto/CategoryRelationDto.java @@ -6,7 +6,7 @@ import lombok.Setter; @Setter @Getter public class CategoryRelationDto { - private Integer categoryId; + private String categoryId; private String operatorId; } diff --git a/backend/services/operator-market-service/src/main/java/com/datamate/operator/interfaces/dto/CategoryTreeResponse.java b/backend/services/operator-market-service/src/main/java/com/datamate/operator/interfaces/dto/CategoryTreeResponse.java index be6d6a6..4e80506 100644 --- a/backend/services/operator-market-service/src/main/java/com/datamate/operator/interfaces/dto/CategoryTreeResponse.java +++ b/backend/services/operator-market-service/src/main/java/com/datamate/operator/interfaces/dto/CategoryTreeResponse.java @@ -12,11 +12,11 @@ import java.util.List; @Setter @NoArgsConstructor public class CategoryTreeResponse { - private Integer id; + private String id; private String name; private Integer count; - private List categories = new ArrayList<>(); + private List categories = new ArrayList<>(); } diff --git a/backend/services/operator-market-service/src/main/java/com/datamate/operator/interfaces/dto/OperatorDto.java b/backend/services/operator-market-service/src/main/java/com/datamate/operator/interfaces/dto/OperatorDto.java index 810ba0b..249745a 100644 --- a/backend/services/operator-market-service/src/main/java/com/datamate/operator/interfaces/dto/OperatorDto.java +++ b/backend/services/operator-market-service/src/main/java/com/datamate/operator/interfaces/dto/OperatorDto.java @@ -14,32 +14,32 @@ import java.util.List; @Getter @Setter public class OperatorDto { - private String id; + private String id; - private String name; + private String name; - private String description; + private String description; - private String version; + private String version; - private String inputs; + private String inputs; - private String outputs; + private String outputs; - private List categories; + private List categories; - private String runtime; + private String runtime; - private String settings; + private String settings; - private String fileName; + private String fileName; - private Boolean isStar; + private Boolean isStar; - @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) - private LocalDateTime createdAt; + @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) + private LocalDateTime createdAt; - @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) - private LocalDateTime updatedAt; + @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) + private LocalDateTime updatedAt; } diff --git a/backend/services/operator-market-service/src/main/java/com/datamate/operator/interfaces/dto/OperatorsListPostRequest.java b/backend/services/operator-market-service/src/main/java/com/datamate/operator/interfaces/dto/OperatorsListPostRequest.java index 9731579..fde4507 100644 --- a/backend/services/operator-market-service/src/main/java/com/datamate/operator/interfaces/dto/OperatorsListPostRequest.java +++ b/backend/services/operator-market-service/src/main/java/com/datamate/operator/interfaces/dto/OperatorsListPostRequest.java @@ -19,7 +19,7 @@ public class OperatorsListPostRequest { private Integer size; - private List categories = new ArrayList<>(); + private List categories = new ArrayList<>(); private String operatorName; diff --git a/backend/services/operator-market-service/src/main/java/com/datamate/operator/interfaces/dto/SubCategory.java b/backend/services/operator-market-service/src/main/java/com/datamate/operator/interfaces/dto/SubCategory.java deleted file mode 100644 index dc9408a..0000000 --- a/backend/services/operator-market-service/src/main/java/com/datamate/operator/interfaces/dto/SubCategory.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.datamate.operator.interfaces.dto; - -import lombok.Getter; -import lombok.Setter; - -@Getter -@Setter -public class SubCategory { - private long id; - - private String name; - - private long count; - - private String type; - - private long parentId; -} diff --git a/backend/services/operator-market-service/src/main/java/com/datamate/operator/interfaces/dto/UploadOperatorRequest.java b/backend/services/operator-market-service/src/main/java/com/datamate/operator/interfaces/dto/UploadOperatorRequest.java new file mode 100644 index 0000000..c6ebb61 --- /dev/null +++ b/backend/services/operator-market-service/src/main/java/com/datamate/operator/interfaces/dto/UploadOperatorRequest.java @@ -0,0 +1,34 @@ +package com.datamate.operator.interfaces.dto; + +import lombok.Getter; +import lombok.Setter; +import org.springframework.web.multipart.MultipartFile; + +/** + * 上传文件请求 + * 用于分块上传文件时的请求参数封装,支持大文件分片上传功能 + */ +@Getter +@Setter +public class UploadOperatorRequest { + /** 预上传返回的id,用来确认同一个任务 */ + private String reqId; + + /** 文件编号,用于标识批量上传中的第几个文件 */ + private int fileNo; + + /** 文件名称 */ + private String fileName; + + /** 文件总分块数量 */ + private int totalChunkNum; + + /** 当前分块编号,从1开始 */ + private int chunkNo; + + /** 上传的文件分块内容 */ + private MultipartFile file; + + /** 文件分块的校验和(十六进制字符串),用于验证文件完整性 */ + private String checkSumHex; +} diff --git a/backend/services/operator-market-service/src/main/java/com/datamate/operator/interfaces/rest/CategoryController.java b/backend/services/operator-market-service/src/main/java/com/datamate/operator/interfaces/rest/CategoryController.java index d9bd462..2a7dca8 100644 --- a/backend/services/operator-market-service/src/main/java/com/datamate/operator/interfaces/rest/CategoryController.java +++ b/backend/services/operator-market-service/src/main/java/com/datamate/operator/interfaces/rest/CategoryController.java @@ -1,11 +1,9 @@ package com.datamate.operator.interfaces.rest; -import com.datamate.common.infrastructure.common.Response; import com.datamate.common.interfaces.PagedResponse; import com.datamate.operator.application.CategoryService; import com.datamate.operator.interfaces.dto.CategoryTreeResponse; import lombok.RequiredArgsConstructor; -import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @@ -20,8 +18,8 @@ public class CategoryController { private final CategoryService categoryService; @GetMapping("/tree") - public ResponseEntity>> categoryTreeGet() { + public PagedResponse categoryTreeGet() { List allCategories = categoryService.getAllCategories(); - return ResponseEntity.ok(Response.ok(PagedResponse.of(allCategories))); + return PagedResponse.of(allCategories); } } diff --git a/backend/services/operator-market-service/src/main/java/com/datamate/operator/interfaces/rest/LabelController.java b/backend/services/operator-market-service/src/main/java/com/datamate/operator/interfaces/rest/LabelController.java deleted file mode 100644 index ec07a8b..0000000 --- a/backend/services/operator-market-service/src/main/java/com/datamate/operator/interfaces/rest/LabelController.java +++ /dev/null @@ -1,40 +0,0 @@ -package com.datamate.operator.interfaces.rest; - -import com.datamate.common.infrastructure.common.Response; -import com.datamate.common.interfaces.PagedResponse; -import com.datamate.operator.application.LabelService; -import com.datamate.operator.interfaces.dto.LabelDto; -import lombok.RequiredArgsConstructor; -import org.springframework.http.ResponseEntity; -import org.springframework.web.bind.annotation.*; - -import java.util.List; - -@RestController -@RequestMapping("/labels") -@RequiredArgsConstructor -public class LabelController { - private final LabelService labelService; - - @GetMapping - public ResponseEntity>> labelsGet(@RequestParam("page") Integer page, - @RequestParam("size") Integer size, - @RequestParam("keyword") String keyword) { - return ResponseEntity.ok(Response.ok(PagedResponse.of(labelService.getLabels(page, size, keyword)))); - } - - @PutMapping("/{id}") - public ResponseEntity> labelsIdPut(@PathVariable("id") String id, - @RequestBody List updateLabelDtoRequest) { - labelService.updateLabel(id, updateLabelDtoRequest); - return ResponseEntity.ok(Response.ok(null)); - } - - @PostMapping - public ResponseEntity> labelsPost(@RequestBody LabelDto labelsPostRequest) { - labelService.createLabels(labelsPostRequest); - return ResponseEntity.ok(Response.ok(null)); - } -} - - diff --git a/backend/services/operator-market-service/src/main/java/com/datamate/operator/interfaces/rest/OperatorController.java b/backend/services/operator-market-service/src/main/java/com/datamate/operator/interfaces/rest/OperatorController.java index a201b96..ab07fd4 100644 --- a/backend/services/operator-market-service/src/main/java/com/datamate/operator/interfaces/rest/OperatorController.java +++ b/backend/services/operator-market-service/src/main/java/com/datamate/operator/interfaces/rest/OperatorController.java @@ -1,14 +1,13 @@ package com.datamate.operator.interfaces.rest; -import com.datamate.common.infrastructure.common.Response; import com.datamate.common.interfaces.PagedResponse; import com.datamate.operator.application.OperatorService; import com.datamate.operator.interfaces.dto.OperatorDto; import com.datamate.operator.interfaces.dto.OperatorsListPostRequest; +import com.datamate.operator.interfaces.dto.UploadOperatorRequest; import lombok.RequiredArgsConstructor; -import org.springframework.http.ResponseEntity; +import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.*; -import org.springframework.web.multipart.MultipartFile; import java.util.List; @@ -19,34 +18,48 @@ public class OperatorController { private final OperatorService operatorService; @PostMapping("/list") - public ResponseEntity>> operatorsListPost(@RequestBody OperatorsListPostRequest request) { + public PagedResponse operatorsListPost(@RequestBody OperatorsListPostRequest request) { List responses = operatorService.getOperators(request.getPage(), request.getSize(), request.getCategories(), request.getOperatorName(), request.getIsStar()); int count = operatorService.getOperatorsCount(request.getCategories(), request.getOperatorName(), request.getIsStar()); int totalPages = (count + request.getSize() + 1) / request.getSize(); - return ResponseEntity.ok(Response.ok(PagedResponse.of(responses, request.getPage(), count, totalPages))); + return PagedResponse.of(responses, request.getPage(), count, totalPages); } @GetMapping("/{id}") - public ResponseEntity> operatorsIdGet(@PathVariable("id") String id) { - return ResponseEntity.ok(Response.ok(operatorService.getOperatorById(id))); + public OperatorDto operatorsIdGet(@PathVariable("id") String id) { + return operatorService.getOperatorById(id); } @PutMapping("/{id}") - public ResponseEntity> operatorsIdPut(@PathVariable("id") String id, + public OperatorDto operatorsIdPut(@PathVariable("id") String id, @RequestBody OperatorDto updateOperatorRequest) { - return ResponseEntity.ok(Response.ok(operatorService.updateOperator(id, updateOperatorRequest))); + return operatorService.updateOperator(id, updateOperatorRequest); } @PostMapping("/create") - public ResponseEntity> operatorsCreatePost(@RequestBody OperatorDto createOperatorRequest) { - return ResponseEntity.ok(Response.ok(operatorService.createOperator(createOperatorRequest))); + public OperatorDto operatorsCreatePost(@RequestBody OperatorDto createOperatorRequest) { + return operatorService.createOperator(createOperatorRequest); } @PostMapping("/upload") - public ResponseEntity> operatorsUploadPost(@RequestPart(value = "file") MultipartFile file, - @RequestParam(value = "description") String description) { - return ResponseEntity.ok(Response.ok(operatorService.uploadOperator(file, description))); + public OperatorDto operatorsUploadPost(@RequestBody UploadOperatorRequest request) { + return operatorService.uploadOperator(request.getFileName()); + } + + @PostMapping(value = "/upload/pre-upload", produces = MediaType.APPLICATION_JSON_VALUE) + public String preUpload() { + return operatorService.preUpload(); + } + + @PostMapping("/upload/chunk") + public void chunkUpload(@ModelAttribute UploadOperatorRequest request) { + operatorService.chunkUpload(request); + } + + @DeleteMapping("/{id}") + public void operatorDelete(@PathVariable("id") String id) { + operatorService.deleteOperator(id); } } diff --git a/deployment/docker/datamate/docker-compose.yml b/deployment/docker/datamate/docker-compose.yml index ba21622..d3f4f3d 100644 --- a/deployment/docker/datamate/docker-compose.yml +++ b/deployment/docker/datamate/docker-compose.yml @@ -9,6 +9,8 @@ services: - dataset_volume:/dataset - flow_volume:/flow - log_volume:/var/log/datamate + - operator-upload-volume:/operators/upload + - operator-runtime-volume:/operators/extract networks: [ datamate ] depends_on: - datamate-database @@ -71,6 +73,7 @@ services: - log_volume:/var/log/datamate - dataset_volume:/dataset - flow_volume:/flow + - operator-runtime-volume:/opt/runtime/datamate/ops/user networks: [ datamate ] # 4) mineru @@ -109,6 +112,10 @@ volumes: name: datamate-frontend-log-volume database_log_volume: name: datamate-database-log-volume + operator-upload-volume: + name: datamate-operator-upload-volume + operator-runtime-volume: + name: datamate-operator-runtime-volume networks: datamate: diff --git a/deployment/helm/datamate/charts/frontend/templates/configmap.yaml b/deployment/helm/datamate/charts/frontend/templates/configmap.yaml new file mode 100644 index 0000000..dd1c304 --- /dev/null +++ b/deployment/helm/datamate/charts/frontend/templates/configmap.yaml @@ -0,0 +1,28 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: datamate-nginx-conf +data: + backend.conf: | + server { + listen 80; + server_name 0.0.0.0; + + access_log /var/log/datamate/frontend/access.log main; + error_log /var/log/datamate/frontend/error.log notice; + + client_max_body_size 1024M; + + location /api/ { + proxy_pass http://datamate-backend:8080/api/; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + } + + location / { + root /opt/frontend; + try_files $uri $uri/ /index.html; + } + } + diff --git a/deployment/helm/datamate/charts/ray-cluster/values.yaml b/deployment/helm/datamate/charts/ray-cluster/values.yaml index 49278db..d316aa4 100644 --- a/deployment/helm/datamate/charts/ray-cluster/values.yaml +++ b/deployment/helm/datamate/charts/ray-cluster/values.yaml @@ -113,63 +113,11 @@ head: securityContext: {} # Optional: The following volumes/volumeMounts configurations are optional but recommended because # Ray writes logs to /tmp/ray/session_latests/logs instead of stdout/stderr. - volumes: - - name: log-volume - hostPath: - path: /opt/datamate/data/log - type: DirectoryOrCreate - - name: dataset-volume - hostPath: - path: /opt/datamate/data/dataset - type: DirectoryOrCreate - - name: flow-volume - hostPath: - path: /opt/datamate/data/flow - type: DirectoryOrCreate - volumeMounts: - - mountPath: /tmp/ray - name: log-volume - subPath: ray/head - - mountPath: /dataset - name: dataset-volume - - mountPath: /flow - name: flow-volume + volumes: [] + volumeMounts: [] # sidecarContainers specifies additional containers to attach to the Ray pod. # Follows standard K8s container spec. - sidecarContainers: - - name: runtime - image: datamate-runtime - imagePullPolicy: IfNotPresent - command: - - python - - /opt/runtime/datamate/operator_runtime.py - - --port - - "8081" - env: - - name: MYSQL_HOST - value: "datamate-database" - - name: MYSQL_PORT - value: "3306" - - name: MYSQL_USER - value: "root" - - name: MYSQL_PASSWORD - value: "password" - - name: MYSQL_DATABASE - value: "datamate" - - name: PDF_FORMATTER_BASE_URL - value: "http://datamate-mineru:9001" - ports: - - containerPort: 8081 - volumeMounts: - - mountPath: /tmp/ray - name: log-volume - subPath: ray/head - - mountPath: /var/log/datamate - name: log-volume - - mountPath: /dataset - name: dataset-volume - - mountPath: /flow - name: flow-volume + sidecarContainers: [] # See docs/guidance/pod-command.md for more details about how to specify # container command for head Pod. command: [] @@ -260,27 +208,8 @@ worker: securityContext: {} # Optional: The following volumes/volumeMounts configurations are optional but recommended because # Ray writes logs to /tmp/ray/session_latests/logs instead of stdout/stderr. - volumes: - - name: log-volume - hostPath: - path: /opt/datamate/data/log - type: DirectoryOrCreate - - name: dataset-volume - hostPath: - path: /opt/datamate/data/dataset - type: DirectoryOrCreate - - name: flow-volume - hostPath: - path: /opt/datamate/data/flow - type: DirectoryOrCreate - volumeMounts: - - mountPath: /tmp/ray - name: log-volume - subPath: ray/worker - - mountPath: /dataset - name: dataset-volume - - mountPath: /flow - name: flow-volume + volumes: [] + volumeMounts: [] # sidecarContainers specifies additional containers to attach to the Ray pod. # Follows standard K8s container spec. sidecarContainers: [] diff --git a/deployment/helm/datamate/values.yaml b/deployment/helm/datamate/values.yaml index 2bf849e..97aba1f 100644 --- a/deployment/helm/datamate/values.yaml +++ b/deployment/helm/datamate/values.yaml @@ -40,11 +40,17 @@ dataVolume: &dataVolume path: /opt/datamate/data/mysql type: DirectoryOrCreate +operatorVolume: &operatorVolume + name: operator-volume + hostPath: + path: /opt/datamate/data/operator + backend: volumes: - *datasetVolume - *flowVolume - *logVolume + - *operatorVolume volumeMounts: - name: dataset-volume mountPath: /dataset @@ -52,14 +58,22 @@ backend: mountPath: /flow - name: log-volume mountPath: /var/log/datamate + - name: operator-volume + mountPath: /operators frontend: volumes: - *logVolume + - name: datamate-nginx-conf + configMap: + name: datamate-nginx-conf volumeMounts: - name: log-volume mountPath: /var/log/datamate/frontend subPath: frontend + - mountPath: /etc/nginx/conf.d/backend.conf + name: datamate-nginx-conf + subPath: backend.conf database: volumes: @@ -81,3 +95,76 @@ database: mountPath: /docker-entrypoint-initdb.d - name: mysql-utf8-config mountPath: /etc/mysql/conf.d + +ray-cluster: + head: + volumes: + - *datasetVolume + - *flowVolume + - *logVolume + - *operatorVolume + volumeMounts: + - mountPath: /tmp/ray + name: log-volume + subPath: ray/head + - mountPath: /dataset + name: dataset-volume + - mountPath: /flow + name: flow-volume + - mountPath: /opt/runtime/datamate/ops/user + name: operator-volume + subPath: extract + sidecarContainers: + - name: runtime + image: datamate-runtime + imagePullPolicy: IfNotPresent + args: + - python + - /opt/runtime/datamate/operator_runtime.py + - --port + - "8081" + env: + - name: MYSQL_HOST + value: "datamate-database" + - name: MYSQL_PORT + value: "3306" + - name: MYSQL_USER + value: "root" + - name: MYSQL_PASSWORD + value: "password" + - name: MYSQL_DATABASE + value: "datamate" + - name: PDF_FORMATTER_BASE_URL + value: "http://datamate-mineru:9001" + ports: + - containerPort: 8081 + volumeMounts: + - mountPath: /tmp/ray + name: log-volume + subPath: ray/head + - mountPath: /var/log/datamate + name: log-volume + - mountPath: /dataset + name: dataset-volume + - mountPath: /flow + name: flow-volume + - mountPath: /opt/runtime/datamate/ops/user + name: operator-volume + subPath: extract + worker: + volumes: + - *datasetVolume + - *flowVolume + - *logVolume + - *operatorVolume + volumeMounts: + - mountPath: /tmp/ray + name: log-volume + subPath: ray/worker + - mountPath: /dataset + name: dataset-volume + - mountPath: /flow + name: flow-volume + - mountPath: /opt/runtime/datamate/ops/user + name: operator-volume + subPath: extract diff --git a/scripts/db/data-cleaning-init.sql b/scripts/db/data-cleaning-init.sql index 47a42ba..3ce98ef 100644 --- a/scripts/db/data-cleaning-init.sql +++ b/scripts/db/data-cleaning-init.sql @@ -3,7 +3,7 @@ USE datamate; CREATE TABLE IF NOT EXISTS t_clean_template ( id varchar(64) primary key not null unique, - name varchar(64), + name varchar(64) unique, description varchar(256), created_at timestamp default current_timestamp, updated_at timestamp default current_timestamp, @@ -13,7 +13,7 @@ CREATE TABLE IF NOT EXISTS t_clean_template CREATE TABLE IF NOT EXISTS t_clean_task ( id varchar(64) primary key, - name varchar(64), + name varchar(64) unique, description varchar(256), status varchar(256), src_dataset_id varchar(64), diff --git a/scripts/db/data-operator-init.sql b/scripts/db/data-operator-init.sql index b611759..0b51a42 100644 --- a/scripts/db/data-operator-init.sql +++ b/scripts/db/data-operator-init.sql @@ -3,7 +3,7 @@ USE datamate; CREATE TABLE IF NOT EXISTS t_operator ( id varchar(64) primary key, - name varchar(64), + name varchar(64) unique, description varchar(256), version varchar(256), inputs varchar(256), @@ -18,15 +18,17 @@ CREATE TABLE IF NOT EXISTS t_operator CREATE TABLE IF NOT EXISTS t_operator_category ( - id int primary key auto_increment, - name varchar(64), + id varchar(64) primary key, + name varchar(64) unique , + value varchar(64) unique , type varchar(64), - parent_id int + parent_id varchar(64), + created_at timestamp default current_timestamp ); CREATE TABLE IF NOT EXISTS t_operator_category_relation ( - category_id int, + category_id varchar(64), operator_id varchar(64), primary key (category_id, operator_id) ); @@ -41,7 +43,7 @@ SELECT o.id AS operator_id, runtime, settings, is_star, - created_at, + o.created_at AS created_at, updated_at, toc.id AS category_id, toc.name AS category_name @@ -49,21 +51,21 @@ FROM t_operator_category_relation tocr LEFT JOIN t_operator o ON tocr.operator_id = o.id LEFT JOIN t_operator_category toc ON tocr.category_id = toc.id; -INSERT IGNORE INTO t_operator_category(id, name, type, parent_id) -VALUES (1, '模态', 'predefined', 0), - (2, '语言', 'predefined', 0), - (3, '文本', 'predefined', 1), - (4, '图片', 'predefined', 1), - (5, '音频', 'predefined', 1), - (6, '视频', 'predefined', 1), - (7, '多模态', 'predefined', 1), - (8, 'Python', 'predefined', 2), - (9, 'Java', 'predefined', 2), - (10, '来源', 'predefined', 0), - (11, '系统预置', 'predefined', 10), - (12, '用户上传', 'predefined', 10), - (13, '收藏状态', 'predefined', 0), - (14, '已收藏', 'predefined', 13); +INSERT IGNORE INTO t_operator_category(id, name, value, type, parent_id) +VALUES ('64465bec-b46b-11f0-8291-00155d0e4808', '模态', 'modal', 'predefined', '0'), + ('873000a2-65b3-474b-8ccc-4813c08c76fb', '语言', 'language', 'predefined', '0'), + ('d8a5df7a-52a9-42c2-83c4-01062e60f597', '文本', 'text', 'predefined', '64465bec-b46b-11f0-8291-00155d0e4808'), + ('de36b61c-9e8a-4422-8c31-d30585c7100f', '图片', 'image', 'predefined', '64465bec-b46b-11f0-8291-00155d0e4808'), + ('42dd9392-73e4-458c-81ff-41751ada47b5', '音频', 'audio', 'predefined', '64465bec-b46b-11f0-8291-00155d0e4808'), + ('a233d584-73c8-4188-ad5d-8f7c8dda9c27', '视频', 'video', 'predefined', '64465bec-b46b-11f0-8291-00155d0e4808'), + ('4d7dbd77-0a92-44f3-9056-2cd62d4a71e4', '多模态', 'multimodal', 'predefined', '64465bec-b46b-11f0-8291-00155d0e4808'), + ('9eda9d5d-072b-499b-916c-797a0a8750e1', 'Python', 'python', 'predefined', '873000a2-65b3-474b-8ccc-4813c08c76fb'), + ('b5bfc548-8ef6-417c-b8a6-a4197c078249', 'Java', 'java', 'predefined', '873000a2-65b3-474b-8ccc-4813c08c76fb'), + ('16e2d99e-eafb-44fc-acd0-f35a2bad28f8', '来源', 'origin', 'predefined', '0'), + ('96a3b07a-3439-4557-a835-525faad60ca3', '系统预置', 'predefined', 'predefined', '16e2d99e-eafb-44fc-acd0-f35a2bad28f8'), + ('ec2cdd17-8b93-4a81-88c4-ac9e98d10757', '用户上传', 'customized', 'predefined', '16e2d99e-eafb-44fc-acd0-f35a2bad28f8'), + ('d8482257-7ee6-41a0-a914-8363c7db1db0', '收藏状态', 'starStatus', 'predefined', '0'), + ('79f2d35a-3b6c-4846-a892-2f2015f48f24', '已收藏', 'isStar', 'predefined', 'd8482257-7ee6-41a0-a914-8363c7db1db0'); INSERT IGNORE INTO t_operator (id, name, description, version, inputs, outputs, runtime, settings, file_name, is_star) @@ -116,7 +118,7 @@ INSERT IGNORE INTO t_operator_category_relation(category_id, operator_id) SELECT c.id, o.id FROM t_operator_category c CROSS JOIN t_operator o -WHERE c.id IN (3, 8, 11) +WHERE c.id IN ('d8a5df7a-52a9-42c2-83c4-01062e60f597', '9eda9d5d-072b-499b-916c-797a0a8750e1', '96a3b07a-3439-4557-a835-525faad60ca3') AND o.id IN ('TextFormatter', 'FileWithShortOrLongLengthFilter', 'FileWithHighRepeatPhraseRateFilter', 'FileWithHighRepeatWordRateFilter', 'FileWithHighSpecialCharRateFilter', 'FileWithManySensitiveWordsFilter', 'DuplicateFilesFilter', 'DuplicateSentencesFilter', 'AnonymizedCreditCardNumber', 'AnonymizedIdNumber', @@ -129,7 +131,7 @@ INSERT IGNORE INTO t_operator_category_relation(category_id, operator_id) SELECT c.id, o.id FROM t_operator_category c CROSS JOIN t_operator o -WHERE c.id IN (4, 8, 11) +WHERE c.id IN ('de36b61c-9e8a-4422-8c31-d30585c7100f', '9eda9d5d-072b-499b-916c-797a0a8750e1', '96a3b07a-3439-4557-a835-525faad60ca3') AND o.id IN ('ImgFormatter', 'ImgBlurredImagesCleaner', 'ImgBrightness', 'ImgContrast', 'ImgDenoise', 'ImgDuplicatedImagesCleaner', 'ImgPerspectiveTransformation', 'ImgResize', 'ImgSaturation', 'ImgShadowRemove', 'ImgSharpness', 'ImgSimilarImagesCleaner', 'ImgTypeUnify'); @@ -138,5 +140,5 @@ INSERT IGNORE INTO t_operator_category_relation(category_id, operator_id) SELECT c.id, o.id FROM t_operator_category c CROSS JOIN t_operator o -WHERE c.id IN (7, 8, 11) - AND o.id IN ('FileExporter', 'UnstructuredFormatter', 'ExternalPDFFormatter'); +WHERE c.id IN ('4d7dbd77-0a92-44f3-9056-2cd62d4a71e4', '9eda9d5d-072b-499b-916c-797a0a8750e1', '96a3b07a-3439-4557-a835-525faad60ca3') + AND o.id IN ('FileExporter', 'UnstructuredFormatter'); diff --git a/scripts/images/backend/Dockerfile b/scripts/images/backend/Dockerfile index b06aad2..3a1cc63 100644 --- a/scripts/images/backend/Dockerfile +++ b/scripts/images/backend/Dockerfile @@ -14,10 +14,9 @@ RUN cd DataX && \ FROM maven:3-amazoncorretto-21-debian AS builder COPY backend/ /opt/backend -COPY scripts/images/backend/settings.xml /opt/backend RUN cd /opt/backend && \ - mvn -U clean package -s settings.xml -Dmaven.test.skip=true + mvn -U clean package -Dmaven.test.skip=true FROM openjdk:21-jdk-slim @@ -25,7 +24,7 @@ FROM openjdk:21-jdk-slim RUN apt-get update && \ apt-get install -y vim wget curl nfs-common rsync python3 python3-pip python-is-python3 dos2unix && \ apt-get clean && \ - rm -rf /var/lib/apy/lists/* + rm -rf /var/lib/apt/lists/* COPY --from=builder /opt/backend/services/main-application/target/data-mate.jar /opt/backend/data-mate.jar COPY --from=datax-builder /DataX/target/datax/datax /opt/datax diff --git a/scripts/images/backend/settings.xml b/scripts/images/backend/settings.xml deleted file mode 100644 index 12d71f0..0000000 --- a/scripts/images/backend/settings.xml +++ /dev/null @@ -1,68 +0,0 @@ - - - - ${user.home}/.m2/repository - - - - - aliyun-maven - Aliyun Maven Repository - https://maven.aliyun.com/repository/public - central,jcenter,google,spring,spring-plugin,gradle-plugin - - - - - - - java21 - - true - 21 - - - 21 - 21 - UTF-8 - - - - - aliyun-repos - - - aliyun-public - Aliyun Public Repository - https://maven.aliyun.com/repository/public - - true - - - false - - - - - - aliyun-plugin - Aliyun Plugin Repository - https://maven.aliyun.com/repository/public - - true - - - false - - - - - - - - aliyun-repos - java21 - - diff --git a/scripts/images/runtime/Dockerfile b/scripts/images/runtime/Dockerfile index a9171e7..92900df 100644 --- a/scripts/images/runtime/Dockerfile +++ b/scripts/images/runtime/Dockerfile @@ -2,6 +2,8 @@ FROM python:3.11 COPY runtime/python-executor /opt/runtime COPY runtime/ops /opt/runtime/datamate/ops +COPY runtime/ops/user /opt/runtime/user +COPY scripts/images/runtime/start.sh /opt/runtime/start.sh ENV PYTHONPATH=/opt/runtime/datamate/ @@ -12,12 +14,13 @@ RUN apt update \ WORKDIR /opt/runtime -ENV HF_HUB_DISABLE_XET=1 - -RUN pip install -e . \ - && pip install -r /opt/runtime/datamate/ops/requirements.txt \ +RUN pip install -e . --trusted-host mirrors.huaweicloud.com -i https://mirrors.huaweicloud.com/repository/pypi/simple \ + && pip install -r /opt/runtime/datamate/ops/requirements.txt --trusted-host mirrors.huaweicloud.com -i https://mirrors.huaweicloud.com/repository/pypi/simple \ && pip cache purge -RUN ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime +RUN ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime \ + && chmod +x /opt/runtime/start.sh EXPOSE 8081 + +ENTRYPOINT ["/opt/runtime/start.sh"] diff --git a/scripts/images/runtime/start.sh b/scripts/images/runtime/start.sh new file mode 100644 index 0000000..d454349 --- /dev/null +++ b/scripts/images/runtime/start.sh @@ -0,0 +1,8 @@ +#!/bin/bash + +set -e + +cp -r /opt/runtime/user/* /opt/runtime/datamate/ops/user + +echo "Starting main application..." +exec "$@"