feature: add mysql collection and starrocks collection (#222)

* fix: fix the path for backend-python image building

* feature: add mysql collection and starrocks collection

* feature: add mysql collection and starrocks collection

* fix: change the permission of those files which were collected from NFS to 754

* fix: delete collected files, config files and log files while deleting collection task

* fix: add the collection task detail api

* fix: change the log of collecting for dataset

* fix: add collection task selecting while creating and updating dataset

* fix: set the umask value to 0022 for java process
This commit is contained in:
hefanli
2026-01-04 19:05:08 +08:00
committed by GitHub
parent 8d61eb28c3
commit ccfb84c034
13 changed files with 208 additions and 115 deletions

View File

@@ -4,14 +4,14 @@ on:
push:
branches: [ "main" ]
paths:
- 'scripts/images/datamate-python/**'
- 'scripts/images/backend-python/**'
- 'runtime/datamate-python/**'
- '.github/workflows/docker-image-backend-python.yml'
- '.github/workflows/docker-images-reusable.yml'
pull_request:
branches: [ "main" ]
paths:
- 'scripts/images/datamate-python/**'
- 'scripts/images/backend-python/**'
- 'runtime/datamate-python/**'
- '.github/workflows/docker-image-backend-python.yml'
- '.github/workflows/docker-images-reusable.yml'

View File

@@ -237,8 +237,8 @@ public class DatasetApplicationService {
if (CollectionUtils.isEmpty(filePaths)) {
return;
}
log.info("Starting file scan, total files: {}", filePaths.size());
datasetFileApplicationService.copyFilesToDatasetDir(datasetId, new CopyFilesRequest(filePaths));
log.info("Success file scan, total files: {}", filePaths.size());
} catch (Exception e) {
log.error("处理数据源文件扫描失败,数据集ID: {}, 数据源ID: {}", datasetId, dataSourceId, e);
}

View File

@@ -81,19 +81,7 @@ export default function CollectionTaskCreate() {
const handleSubmit = async () => {
try {
await form.validateFields();
const values = form.getFieldsValue(true);
const payload = {
name: values.name,
description: values.description,
syncMode: values.syncMode,
scheduleExpression: values.scheduleExpression,
timeoutSeconds: values.timeoutSeconds,
templateId: values.templateId,
config: values.config,
};
await createTaskUsingPost(payload);
await createTaskUsingPost(newTask);
message.success("任务创建成功");
navigate("/data/collection");
} catch (error) {
@@ -104,88 +92,108 @@ export default function CollectionTaskCreate() {
const selectedTemplate = templates.find((t) => t.id === selectedTemplateId);
const renderTemplateFields = (
section: "parameter" | "reader" | "writer",
section: any[],
defs: Record<string, TemplateFieldDef> | undefined
) => {
if (!defs || typeof defs !== "object") return null;
let items_ = []
const items = Object.entries(defs).map(([key, def]) => {
Object.entries(defs).sort(([key1, def1], [key2, def2]) => {
const def1Order = def1?.index || 0;
const def2Order = def2?.index || 0;
return def1Order - def2Order;
}).forEach(([key, def]) => {
const label = def?.name || key;
const description = def?.description;
const fieldType = (def?.type || "input").toLowerCase();
const required = def?.required !== false;
const rules = required
? [{ required: true, message: `请输入${label}` }]
: undefined;
const name = section.concat(key)
if (fieldType === "password") {
return (
switch (fieldType) {
case "password":
items_.push((
<Form.Item
key={`${section}.${key}`}
name={["config", section, key]}
name={name}
label={label}
tooltip={description}
rules={rules}
>
<Input.Password placeholder={description || `请输入${label}`} />
</Form.Item>
);
}
if (fieldType === "textarea") {
return (
));
break;
case "selecttag":
items_.push((
<Form.Item
key={`${section}.${key}`}
name={["config", section, key]}
name={name}
label={label}
tooltip={description}
rules={rules}
className="md:col-span-2"
>
<TextArea rows={4} placeholder={description || `请输入${label}`} />
<Select placeholder={description || `请输入${label}`} mode="tags" />
</Form.Item>
);
}
if (fieldType === "select") {
));
break;
case "select":
const options = (def?.options || []).map((opt: any) => {
if (typeof opt === "string" || typeof opt === "number") {
return { label: String(opt), value: opt };
}
return { label: opt?.label ?? String(opt?.value), value: opt?.value };
});
return (
items_.push((
<Form.Item
key={`${section}.${key}`}
name={["config", section, key]}
name={name}
label={label}
tooltip={description}
rules={rules}
>
<Select placeholder={description || `请选择${label}`} options={options} />
</Form.Item>
);
}
return (
));
break;
case "multiple":
const itemsMultiple = renderTemplateFields(name, def?.properties)
items_.push(itemsMultiple)
break;
case "multiplelist":
const realName = name.concat(0)
const itemsMultipleList = renderTemplateFields(realName, def?.properties)
items_.push(itemsMultipleList)
break;
case "inputlist":
items_.push((
<Form.Item
key={`${section}.${key}`}
name={["config", section, key]}
name={name.concat(0)}
label={label}
tooltip={description}
rules={rules}
>
<Input placeholder={description || `请输入${label}`} />
</Form.Item>
);
});
));
break;
default:
items_.push((
<Form.Item
key={`${section}.${key}`}
name={name}
label={label}
tooltip={description}
rules={rules}
>
<Input placeholder={description || `请输入${label}`} />
</Form.Item>
));
}
})
return (
<div className="grid grid-cols-1 md:grid-cols-2 gap-x-4 gap-y-2">
{items}
</div>
);
return items_
};
const getPropertyCountSafe = (obj: any) => {
@@ -342,10 +350,12 @@ export default function CollectionTaskCreate() {
<h3 className="font-medium text-gray-900 pt-2 mb-2">
</h3>
<div className="grid grid-cols-1 md:grid-cols-2 gap-x-4 gap-y-2">
{renderTemplateFields(
"parameter",
["config", "parameter"],
selectedTemplate.templateContent?.parameter as Record<string, TemplateFieldDef>
)}
</div>
</>
): null}
@@ -354,10 +364,12 @@ export default function CollectionTaskCreate() {
<h3 className="font-medium text-gray-900 pt-2 mb-2">
</h3>
<div className="grid grid-cols-1 md:grid-cols-2 gap-x-4 gap-y-2">
{renderTemplateFields(
"reader",
["config", "reader"],
selectedTemplate.templateContent?.reader as Record<string, TemplateFieldDef>
)}
</div>
</>
) : null}
@@ -366,10 +378,12 @@ export default function CollectionTaskCreate() {
<h3 className="font-medium text-gray-900 pt-2 mb-2">
</h3>
<div className="grid grid-cols-1 md:grid-cols-2 gap-x-4 gap-y-2">
{renderTemplateFields(
"writer",
["config", "writer"],
selectedTemplate.templateContent?.writer as Record<string, TemplateFieldDef>
)}
</div>
</>
) : null}
</>

View File

@@ -3,6 +3,7 @@ import { Input, Select, Form } from "antd";
import { datasetTypes } from "../../dataset.const";
import { useEffect, useState } from "react";
import { queryDatasetTagsUsingGet } from "../../dataset.api";
import {queryTasksUsingGet} from "@/pages/DataCollection/collection.apis.ts";
export default function BasicInformation({
data,
@@ -20,6 +21,7 @@ export default function BasicInformation({
options: { label: JSX.Element; value: string }[];
}[]
>([]);
const [collectionOptions, setCollectionOptions] = useState([]);
// 获取标签
const fetchTags = async () => {
@@ -36,8 +38,23 @@ export default function BasicInformation({
}
};
// 获取归集任务
const fetchCollectionTasks = async () => {
try {
const res = await queryTasksUsingGet({ page: 0, size: 100 });
const options = res.data.content.map((task: any) => ({
label: task.name,
value: task.id,
}));
setCollectionOptions(options);
} catch (error) {
console.error("Error fetching collection tasks:", error);
}
};
useEffect(() => {
fetchTags();
fetchCollectionTasks();
}, []);
return (
<>
@@ -78,6 +95,11 @@ export default function BasicInformation({
/>
</Form.Item>
)}
{!hidden.includes("dataSource") && (
<Form.Item name="dataSource" label="关联归集任务">
<Select placeholder="请选择归集任务" options={collectionOptions} />
</Form.Item>
)}
</>
);
}

View File

@@ -13,9 +13,10 @@ from app.module.shared.schema import TaskStatus
logger = get_logger(__name__)
class DataxClient:
def __init__(self, task: CollectionTask, execution: TaskExecution):
def __init__(self, task: CollectionTask, execution: TaskExecution, template: CollectionTemplate):
self.execution = execution
self.task = task
self.template = template
self.config_file_path = f"/flow/data-collection/{task.id}/config.json"
self.python_path = "python"
self.datax_main = "/opt/datax/bin/datax.py"
@@ -53,10 +54,21 @@ class DataxClient:
**(task_config.parameter if task_config.parameter else {}),
**(task_config.reader if task_config.reader else {})
}
dest_parameter = {}
if template.target_type == "txtfilewriter":
dest_parameter = {
"path": target_path,
"fileName": "collection_result",
"writeMode": "truncate"
}
elif template.target_type == "nfswriter" or template.target_type == "obswriter":
dest_parameter = {
"destPath": target_path
}
writer_parameter = {
**(task_config.parameter if task_config.parameter else {}),
**(task_config.writer if task_config.writer else {}),
"destPath": target_path
**dest_parameter
}
# 生成任务运行配置
job_config = {
@@ -128,6 +140,7 @@ class DataxClient:
logger.info(f"DataX 任务执行成功: {self.execution.id}")
logger.info(f"执行耗时: {self.execution.duration_seconds:.2f}")
self.execution.status = TaskStatus.COMPLETED.name
self.rename_collection_result()
else:
self.execution.error_message = self.execution.error_message or f"DataX 任务执行失败,退出码: {exit_code}"
self.execution.status = TaskStatus.FAILED.name
@@ -141,6 +154,23 @@ class DataxClient:
if self.task.sync_mode == SyncMode.ONCE:
self.task.status = self.execution.status
def rename_collection_result(self):
if self.template.target_type != "txtfilewriter":
return
target_path = Path(self.task.target_path)
if not target_path.exists():
logger.warning(f"Target path does not exist: {target_path}")
return
# If it's a directory, find all files without extensions
for file_path in target_path.iterdir():
if file_path.is_file() and not file_path.suffix:
new_path = file_path.with_suffix('.csv')
try:
file_path.rename(new_path)
logger.info(f"Renamed {file_path} to {new_path}")
except Exception as e:
logger.error(f"Failed to rename {file_path} to {new_path}: {str(e)}")
def _run_process(self, cmd: list[str], log_f) -> int:
# 启动进程
process = subprocess.Popen(

View File

@@ -1,5 +1,7 @@
import math
import uuid
import shutil
import os
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query
@@ -138,6 +140,13 @@ async def delete_collection_tasks(
.where(TaskExecution.task_id == task_id)
)
target_path = f"/dataset/local/{task_id}"
if os.path.exists(target_path):
shutil.rmtree(target_path)
job_path = f"/flow/data-collection/{task_id}"
if os.path.exists(job_path):
shutil.rmtree(job_path)
# 删除任务
await db.delete(task)
await db.commit()
@@ -155,3 +164,29 @@ async def delete_collection_tasks(
await db.rollback()
logger.error(f"Failed to delete collection task: {str(e)}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/{task_id}", response_model=StandardResponse[CollectionTaskBase])
async def get_task(
task_id: str,
db: AsyncSession = Depends(get_db)
):
"""获取归集任务详情"""
try:
# Query the task by ID
task = await db.get(CollectionTask, task_id)
if not task:
raise HTTPException(
status_code=404,
detail=f"Task with ID {task_id} not found"
)
return StandardResponse(
code=200,
message="Success",
data=converter_to_response(task)
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to get task {task_id}: {str(e)}", e)
raise HTTPException(status_code=500, detail="Internal server error")

View File

@@ -58,6 +58,7 @@ class CollectionTaskService:
logger.error(f"task {task_id} not exist")
return
template = await session.execute(select(CollectionTemplate).where(CollectionTemplate.id == task.template_id))
template = template.scalar_one_or_none()
if not template:
logger.error(f"template {task.template_name} not exist")
return
@@ -65,6 +66,6 @@ class CollectionTaskService:
session.add(task_execution)
await session.commit()
await asyncio.to_thread(
DataxClient(execution=task_execution, task=task).run_datax_job
DataxClient(execution=task_execution, task=task, template=template).run_datax_job
)
await session.commit()

View File

@@ -69,12 +69,14 @@ public class NfsReader extends Reader {
private Configuration jobConfig;
private String mountPoint;
private Set<String> fileType;
private List<String> files;
@Override
public void init() {
this.jobConfig = super.getPluginJobConf();
this.mountPoint = this.jobConfig.getString("mountPoint");
this.fileType = new HashSet<>(this.jobConfig.getList("fileType", Collections.emptyList(), String.class));
this.files = this.jobConfig.getList("files", Collections.emptyList(), String.class);
}
@Override
@@ -83,6 +85,7 @@ public class NfsReader extends Reader {
List<String> files = stream.filter(Files::isRegularFile)
.filter(file -> fileType.isEmpty() || fileType.contains(getFileSuffix(file)))
.map(path -> path.getFileName().toString())
.filter(fileName -> this.files.isEmpty() || this.files.contains(fileName))
.collect(Collectors.toList());
files.forEach(filePath -> {
Record record = recordSender.createRecord();

View File

@@ -85,7 +85,7 @@ public class NfsWriter extends Writer {
}
String filePath = this.mountPoint + "/" + fileName;
ShellUtil.runCommand("rsync", Arrays.asList("--no-links", "--chmod=750", "--", filePath,
ShellUtil.runCommand("rsync", Arrays.asList("--no-links", "--chmod=754", "--", filePath,
this.destPath + "/" + fileName));
}
} catch (Exception e) {

View File

@@ -73,5 +73,7 @@ CREATE TABLE t_dc_collection_templates (
) COMMENT='数据归集模板配置表';
INSERT IGNORE INTO t_dc_collection_templates(id, name, description, source_type, source_name, target_type, target_name, template_content, built_in, created_by, updated_by)
VALUES ('1', 'NAS归集模板', '将NAS存储上的文件归集到DataMate平台上。', 'nfsreader', 'nfsreader', 'nfswriter', 'nfswriter', '{"parameter": {}, "reader": {}, "writer": {}}', True, 'system', 'system'),
('2', 'OBS归集模板', '将OBS存储上的文件归集到DataMate平台上。', 'obsreader', 'obsreader', 'obswriter', 'obswriter', '{"parameter": {"endpoint": {"name": "服务地址","description": "OBS的服务地址。","type": "input"},"bucket": {"name": "存储桶名称","description": "OBS存储桶名称。","type": "input"},"accessKey": {"name": "访问密钥","description": "OBS访问密钥。","type": "input"},"secretKey": {"name": "密钥","description": "OBS密钥。","type": "input"},"prefix": {"name": "匹配前缀","description": "按照匹配前缀去选中OBS中的文件进行归集。","type": "input"}}, "reader": {}, "writer": {}}', True, 'system', 'system');
VALUES ('1', 'NAS归集模板', '将NAS存储上的文件归集到DataMate平台上。', 'nfsreader', 'nfsreader', 'nfswriter', 'nfswriter', '{"parameter": {"ip": {"name": "NAS地址","description": "NAS服务的地址,可以为IP或者域名。","type": "input", "required": true, "index": 1}, "path": {"name": "共享路径","description": "NAS服务的共享路径。","type": "input", "required": true, "index": 2}, "files": {"name": "文件列表","description": "指定文件列表进行归集。","type": "selectTag", "required": false, "index": 3}}, "reader": {}, "writer": {}}', True, 'system', 'system'),
('2', 'OBS归集模板', '将OBS存储上的文件归集到DataMate平台上。', 'obsreader', 'obsreader', 'obswriter', 'obswriter', '{"parameter": {"endpoint": {"name": "服务地址","description": "OBS的服务地址。","type": "input", "required": true, "index": 1},"bucket": {"name": "存储桶名称","description": "OBS存储桶名称。","type": "input", "required": true, "index": 2},"accessKey": {"name": "AK","description": "OBS访问密钥。","type": "input", "required": true, "index": 3},"secretKey": {"name": "SK","description": "OBS密钥。","type": "password", "required": true, "index": 4},"prefix": {"name": "匹配前缀","description": "按照匹配前缀去选中OBS中的文件进行归集。","type": "input", "required": true, "index": 5}}, "reader": {}, "writer": {}}', True, 'system', 'system'),
('3', 'MYSQL归集模板', '将MYSQL数据库中的数据以csv文件的形式归集到DataMate平台上。', 'mysqlreader', 'mysqlreader', 'txtfilewriter', 'txtfilewriter', '{"parameter": {}, "reader": {"username": {"name": "用户名","description": "数据库的用户名。","type": "input", "required": true, "index": 2}, "password": {"name": "密码","description": "数据库的密码。","type": "password", "required": true, "index": 3}, "connection": {"name": "数据库连接信息", "description": "数据库连接信息。", "type": "multipleList", "size": 1, "index": 1, "properties": {"jdbcUrl": {"type": "inputList", "name": "数据库连接", "description": "数据库连接url。", "required": true, "index": 1}, "querySql": {"type": "inputList", "name": "查询sql", "description": "输入符合语法的sql查询语句。", "required": true, "index": 2}}}}, "writer": {"header": {"name": "列名","description": "查询结果的列名,最终会体现为csv文件的表头。","type": "selectTag", "required": false}}}', True, 'system', 'system'),
('4', 'StarRocks归集模板', '将StarRocks中的数据以csv文件的形式归集到DataMate平台上。', 'starrocksreader', 'starrocksreader', 'txtfilewriter', 'txtfilewriter', '{"parameter": {}, "reader": {"username": {"name": "用户名","description": "数据库的用户名。","type": "input", "required": true, "index": 2}, "password": {"name": "密码","description": "数据库的密码。","type": "password", "required": true, "index": 3}, "connection": {"name": "数据库连接信息", "description": "数据库连接信息。", "type": "multipleList", "size": 1, "index": 1, "properties": {"jdbcUrl": {"type": "inputList", "name": "数据库连接", "description": "数据库连接url。", "required": true, "index": 1}, "querySql": {"type": "inputList", "name": "查询sql", "description": "输入符合语法的sql查询语句。", "required": true, "index": 2}}}}, "writer": {"header": {"name": "列名","description": "查询结果的列名,最终会体现为csv文件的表头。","type": "selectTag", "required": false}}}', True, 'system', 'system');

View File

@@ -17,19 +17,18 @@ FROM python:3.12-slim
# Note: to use the cache mount syntax you must build with BuildKit enabled:
# DOCKER_BUILDKIT=1 docker build . -f scripts/images/datamate-python/Dockerfile -t datamate-backend-python
RUN apt-get update \
&& apt-get install -y --no-install-recommends openjdk-21-jre-headless \
&& rm -rf /var/lib/apt/lists/*
RUN apt-get update && \
apt-get install -y --no-install-recommends vim openjdk-21-jre nfs-common rsync && \
rm -rf /var/lib/apt/lists/*
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
# Poetry configuration
POETRY_VERSION=2.2.1 \
POETRY_NO_INTERACTION=1 \
POETRY_VIRTUALENVS_CREATE=false \
POETRY_CACHE_DIR=/tmp/poetry_cache
ENV JAVA_HOME=/usr/lib/jvm/java-21-openjdk-amd64
ENV JAVA_HOME=/usr/lib/jvm/java-21-openjdk
ENV PATH="/root/.local/bin:$JAVA_HOME/bin:$PATH"
@@ -42,6 +41,7 @@ RUN --mount=type=cache,target=/root/.cache/pip \
&& pipx install "poetry==$POETRY_VERSION"
COPY --from=datax-builder /DataX/target/datax/datax /opt/datax
RUN cp /opt/datax/plugin/reader/mysqlreader/libs/mysql* /opt/datax/plugin/reader/starrocksreader/libs/
# Copy only dependency files first (leverages layer caching when dependencies don't change)
COPY runtime/datamate-python/pyproject.toml runtime/datamate-python/poetry.lock* /app/

View File

@@ -1,16 +1,3 @@
FROM maven:3-eclipse-temurin-8 AS datax-builder
RUN apt-get update && \
apt-get install -y git && \
git clone https://github.com/alibaba/DataX.git
COPY runtime/datax/ DataX/
RUN cd DataX && \
sed -i "s/com.mysql.jdbc.Driver/com.mysql.cj.jdbc.Driver/g" \
plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DataBaseType.java && \
mvn -U clean package assembly:assembly -Dmaven.test.skip=true
FROM maven:3-eclipse-temurin-21 AS builder
COPY backend/ /opt/backend
@@ -22,12 +9,11 @@ RUN cd /opt/backend/services && \
FROM eclipse-temurin:21-jdk
RUN apt-get update && \
apt-get install -y vim wget curl nfs-common rsync python3 python3-pip python-is-python3 dos2unix && \
apt-get install -y vim wget curl rsync python3 python3-pip python-is-python3 dos2unix && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*
COPY --from=builder /opt/backend/services/main-application/target/datamate.jar /opt/backend/datamate.jar
COPY --from=datax-builder /DataX/target/datax/datax /opt/datax
COPY scripts/images/backend/start.sh /opt/backend/start.sh

View File

@@ -2,7 +2,7 @@
set -e
rpcbind
umask 0022
echo "Starting main application..."
exec "$@"