From 010ffceab5a93ca889726ec222b4d409e6ad24cd Mon Sep 17 00:00:00 2001
From: Jerry Yan <792602257@qq.com>
Date: Fri, 9 Jan 2026 13:49:18 +0800
Subject: [PATCH] glusterfs support
---
runtime/datax/glusterfsreader/pom.xml | 80 +++++++++++
.../src/main/assembly/package.xml | 35 +++++
.../glusterfsreader/GlusterfsMountUtil.java | 119 ++++++++++++++++
.../glusterfsreader/GlusterfsReader.java | 133 ++++++++++++++++++
.../src/main/resources/plugin.json | 6 +
.../main/resources/plugin_job_template.json | 10 ++
runtime/datax/glusterfswriter/pom.xml | 80 +++++++++++
.../src/main/assembly/package.xml | 35 +++++
.../glusterfswriter/GlusterfsMountUtil.java | 118 ++++++++++++++++
.../glusterfswriter/GlusterfsWriter.java | 123 ++++++++++++++++
.../writer/glusterfswriter/ShellUtil.java | 46 ++++++
.../src/main/resources/plugin.json | 6 +
.../main/resources/plugin_job_template.json | 10 ++
runtime/datax/package.xml | 14 ++
runtime/datax/pom.xml | 2 +
scripts/db/data-collection-init.sql | 3 +-
scripts/images/backend-python/Dockerfile | 2 +-
17 files changed, 820 insertions(+), 2 deletions(-)
create mode 100644 runtime/datax/glusterfsreader/pom.xml
create mode 100644 runtime/datax/glusterfsreader/src/main/assembly/package.xml
create mode 100644 runtime/datax/glusterfsreader/src/main/java/com/datamate/plugin/reader/glusterfsreader/GlusterfsMountUtil.java
create mode 100644 runtime/datax/glusterfsreader/src/main/java/com/datamate/plugin/reader/glusterfsreader/GlusterfsReader.java
create mode 100644 runtime/datax/glusterfsreader/src/main/resources/plugin.json
create mode 100644 runtime/datax/glusterfsreader/src/main/resources/plugin_job_template.json
create mode 100644 runtime/datax/glusterfswriter/pom.xml
create mode 100644 runtime/datax/glusterfswriter/src/main/assembly/package.xml
create mode 100644 runtime/datax/glusterfswriter/src/main/java/com/datamate/plugin/writer/glusterfswriter/GlusterfsMountUtil.java
create mode 100644 runtime/datax/glusterfswriter/src/main/java/com/datamate/plugin/writer/glusterfswriter/GlusterfsWriter.java
create mode 100644 runtime/datax/glusterfswriter/src/main/java/com/datamate/plugin/writer/glusterfswriter/ShellUtil.java
create mode 100644 runtime/datax/glusterfswriter/src/main/resources/plugin.json
create mode 100644 runtime/datax/glusterfswriter/src/main/resources/plugin_job_template.json
diff --git a/runtime/datax/glusterfsreader/pom.xml b/runtime/datax/glusterfsreader/pom.xml
new file mode 100644
index 0000000..7022cf2
--- /dev/null
+++ b/runtime/datax/glusterfsreader/pom.xml
@@ -0,0 +1,80 @@
+
+
+
+ datax-all
+ com.alibaba.datax
+ 0.0.1-SNAPSHOT
+
+
+ 4.0.0
+ glusterfsreader
+ glusterfsreader
+ jar
+
+
+
+ com.alibaba.datax
+ datax-core
+ ${datax-project-version}
+
+
+ com.alibaba.datax
+ datax-common
+ ${datax-project-version}
+
+
+ org.slf4j
+ slf4j-api
+
+
+
+ ch.qos.logback
+ logback-classic
+
+
+
+
+
+
+ src/main/java
+
+ **/*.properties
+
+
+
+
+
+
+ maven-compiler-plugin
+
+ ${jdk-version}
+ ${jdk-version}
+ ${project-sourceEncoding}
+
+
+
+
+ maven-assembly-plugin
+
+
+ src/main/assembly/package.xml
+
+ datax
+
+
+
+ dwzip
+ package
+
+ single
+
+
+
+
+
+
+
+
+
diff --git a/runtime/datax/glusterfsreader/src/main/assembly/package.xml b/runtime/datax/glusterfsreader/src/main/assembly/package.xml
new file mode 100644
index 0000000..e4a4cff
--- /dev/null
+++ b/runtime/datax/glusterfsreader/src/main/assembly/package.xml
@@ -0,0 +1,35 @@
+
+
+
+ dir
+
+ false
+
+
+ src/main/resources
+
+ plugin.json
+ plugin_job_template.json
+
+ plugin/reader/glusterfsreader
+
+
+ target/
+
+ glusterfsreader-0.0.1-SNAPSHOT.jar
+
+ plugin/reader/glusterfsreader
+
+
+
+
+
+ false
+ plugin/reader/glusterfsreader/libs
+ runtime
+
+
+
diff --git a/runtime/datax/glusterfsreader/src/main/java/com/datamate/plugin/reader/glusterfsreader/GlusterfsMountUtil.java b/runtime/datax/glusterfsreader/src/main/java/com/datamate/plugin/reader/glusterfsreader/GlusterfsMountUtil.java
new file mode 100644
index 0000000..9f784da
--- /dev/null
+++ b/runtime/datax/glusterfsreader/src/main/java/com/datamate/plugin/reader/glusterfsreader/GlusterfsMountUtil.java
@@ -0,0 +1,119 @@
+package com.datamate.plugin.reader.glusterfsreader;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.file.DirectoryNotEmptyException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+
+/**
+ * GlusterFS 挂载工具类
+ * 通过系统命令 mount -t glusterfs 进行挂载
+ */
+public final class GlusterfsMountUtil {
+ private static final Logger LOG = LoggerFactory.getLogger(GlusterfsMountUtil.class);
+
+ private GlusterfsMountUtil() {
+ }
+
+ /**
+ * 挂载 GlusterFS 卷
+ *
+ * @param remote 远程地址,格式: ip:/volume
+ * @param mountPoint 本地挂载点
+ * @param subPath 卷内子路径(可选,用于后续读取)
+ */
+ public static void mount(String remote, String mountPoint, String subPath) {
+ try {
+ Path mp = Paths.get(mountPoint);
+ if (isMounted(mountPoint)) {
+ throw new IOException("Already mounted: " + mountPoint);
+ }
+
+ Files.createDirectories(mp);
+
+ ProcessBuilder pb = new ProcessBuilder();
+ pb.command("mount", "-t", "glusterfs", remote, mountPoint);
+
+ LOG.info("Mounting GlusterFS: {}", pb.command());
+ pb.redirectErrorStream(true);
+ Process p = pb.start();
+ StringBuilder output = new StringBuilder();
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ output.append(line).append(System.lineSeparator());
+ }
+ }
+ int rc = p.waitFor();
+ if (rc != 0) {
+ throw new RuntimeException("GlusterFS mount failed, exit=" + rc + ", output: " + output);
+ }
+ LOG.info("GlusterFS mounted successfully: {} -> {}", remote, mountPoint);
+ } catch (IOException | InterruptedException e) {
+ throw new RuntimeException("Failed to mount GlusterFS: " + remote, e);
+ }
+ }
+
+ /**
+ * 卸载挂载点
+ *
+ * @param mountPoint 挂载点路径
+ * @throws IOException 卸载失败
+ * @throws InterruptedException 进程等待中断
+ */
+ public static void umount(String mountPoint) throws IOException, InterruptedException {
+ if (!isMounted(mountPoint)) {
+ return;
+ }
+
+ ProcessBuilder pb = new ProcessBuilder("umount", "-l", mountPoint);
+ pb.redirectErrorStream(true);
+ Process p = pb.start();
+ StringBuilder output = new StringBuilder();
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ output.append(line).append(System.lineSeparator());
+ }
+ }
+ int rc = p.waitFor();
+ if (rc != 0) {
+ throw new RuntimeException("GlusterFS umount failed, exit=" + rc + ", output: " + output);
+ }
+
+ // 清理空目录
+ try {
+ Files.deleteIfExists(Paths.get(mountPoint));
+ } catch (DirectoryNotEmptyException ignore) {
+ // 目录非空,保留
+ }
+ LOG.info("GlusterFS unmounted: {}", mountPoint);
+ }
+
+ /**
+ * 判断挂载点是否已挂载
+ *
+ * @param mountPoint 挂载点路径
+ * @return true 表示已挂载
+ * @throws IOException 读取 /proc/mounts 失败
+ */
+ public static boolean isMounted(String mountPoint) throws IOException {
+ Path procMounts = Paths.get("/proc/mounts");
+ if (!Files.exists(procMounts)) {
+ throw new IOException("/proc/mounts not found");
+ }
+ String expected = mountPoint.trim();
+ List lines = Files.readAllLines(procMounts);
+ return lines.stream()
+ .map(l -> l.split("\\s+"))
+ .filter(a -> a.length >= 2)
+ .anyMatch(a -> a[1].equals(expected));
+ }
+}
diff --git a/runtime/datax/glusterfsreader/src/main/java/com/datamate/plugin/reader/glusterfsreader/GlusterfsReader.java b/runtime/datax/glusterfsreader/src/main/java/com/datamate/plugin/reader/glusterfsreader/GlusterfsReader.java
new file mode 100644
index 0000000..827e60b
--- /dev/null
+++ b/runtime/datax/glusterfsreader/src/main/java/com/datamate/plugin/reader/glusterfsreader/GlusterfsReader.java
@@ -0,0 +1,133 @@
+package com.datamate.plugin.reader.glusterfsreader;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.element.StringColumn;
+import com.alibaba.datax.common.plugin.RecordSender;
+import com.alibaba.datax.common.spi.Reader;
+import com.alibaba.datax.common.util.Configuration;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * GlusterFS Reader 插件
+ * 通过 mount -t glusterfs 挂载 GlusterFS 卷,读取文件列表
+ */
+public class GlusterfsReader extends Reader {
+
+ private static final Logger LOG = LoggerFactory.getLogger(GlusterfsReader.class);
+
+ public static class Job extends Reader.Job {
+ private Configuration jobConfig = null;
+ private String mountPoint;
+
+ @Override
+ public void init() {
+ this.jobConfig = super.getPluginJobConf();
+ }
+
+ @Override
+ public void prepare() {
+ this.mountPoint = "/dataset/mount/" + UUID.randomUUID();
+ this.jobConfig.set("mountPoint", this.mountPoint);
+
+ String ip = this.jobConfig.getString("ip");
+ String volume = this.jobConfig.getString("volume");
+ String subPath = this.jobConfig.getString("path", "");
+
+ // GlusterFS mount 格式: mount -t glusterfs ip:/volume /mountpoint
+ String remote = ip + ":/" + volume;
+ GlusterfsMountUtil.mount(remote, mountPoint, subPath);
+ }
+
+ @Override
+ public List split(int adviceNumber) {
+ return Collections.singletonList(this.jobConfig);
+ }
+
+ @Override
+ public void post() {
+ try {
+ GlusterfsMountUtil.umount(this.mountPoint);
+ new File(this.mountPoint).deleteOnExit();
+ } catch (IOException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void destroy() {
+ }
+ }
+
+ public static class Task extends Reader.Task {
+
+ private Configuration jobConfig;
+ private String mountPoint;
+ private String subPath;
+ private Set fileType;
+ private List files;
+
+ @Override
+ public void init() {
+ this.jobConfig = super.getPluginJobConf();
+ this.mountPoint = this.jobConfig.getString("mountPoint");
+ this.subPath = this.jobConfig.getString("path", "");
+ this.fileType = new HashSet<>(this.jobConfig.getList("fileType", Collections.emptyList(), String.class));
+ this.files = this.jobConfig.getList("files", Collections.emptyList(), String.class);
+ }
+
+ @Override
+ public void startRead(RecordSender recordSender) {
+ String readPath = this.mountPoint;
+ if (StringUtils.isNotBlank(this.subPath)) {
+ readPath = this.mountPoint + "/" + this.subPath.replaceFirst("^/+", "");
+ }
+
+ try (Stream stream = Files.list(Paths.get(readPath))) {
+ List fileList = stream.filter(Files::isRegularFile)
+ .filter(file -> fileType.isEmpty() || fileType.contains(getFileSuffix(file)))
+ .map(path -> path.getFileName().toString())
+ .filter(fileName -> this.files.isEmpty() || this.files.contains(fileName))
+ .collect(Collectors.toList());
+
+ fileList.forEach(filePath -> {
+ Record record = recordSender.createRecord();
+ record.addColumn(new StringColumn(filePath));
+ recordSender.sendToWriter(record);
+ });
+ this.jobConfig.set("columnNumber", 1);
+ } catch (IOException e) {
+ LOG.error("Error reading files from GlusterFS mount point: {}", readPath, e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private String getFileSuffix(Path path) {
+ String fileName = path.getFileName().toString();
+ int lastDotIndex = fileName.lastIndexOf('.');
+ if (lastDotIndex == -1 || lastDotIndex == fileName.length() - 1) {
+ return "";
+ }
+ return fileName.substring(lastDotIndex + 1);
+ }
+
+ @Override
+ public void destroy() {
+ }
+ }
+}
diff --git a/runtime/datax/glusterfsreader/src/main/resources/plugin.json b/runtime/datax/glusterfsreader/src/main/resources/plugin.json
new file mode 100644
index 0000000..ddcb0cb
--- /dev/null
+++ b/runtime/datax/glusterfsreader/src/main/resources/plugin.json
@@ -0,0 +1,6 @@
+{
+ "name": "glusterfsreader",
+ "class": "com.datamate.plugin.reader.glusterfsreader.GlusterfsReader",
+ "description": "read file list from GlusterFS distributed file system",
+ "developer": "datamate"
+}
diff --git a/runtime/datax/glusterfsreader/src/main/resources/plugin_job_template.json b/runtime/datax/glusterfsreader/src/main/resources/plugin_job_template.json
new file mode 100644
index 0000000..003c053
--- /dev/null
+++ b/runtime/datax/glusterfsreader/src/main/resources/plugin_job_template.json
@@ -0,0 +1,10 @@
+{
+ "name": "glusterfsreader",
+ "parameter": {
+ "ip": "",
+ "volume": "",
+ "path": "",
+ "fileType": [],
+ "files": []
+ }
+}
diff --git a/runtime/datax/glusterfswriter/pom.xml b/runtime/datax/glusterfswriter/pom.xml
new file mode 100644
index 0000000..ca3608d
--- /dev/null
+++ b/runtime/datax/glusterfswriter/pom.xml
@@ -0,0 +1,80 @@
+
+
+
+ datax-all
+ com.alibaba.datax
+ 0.0.1-SNAPSHOT
+
+
+ 4.0.0
+ glusterfswriter
+ glusterfswriter
+ jar
+
+
+
+ com.alibaba.datax
+ datax-core
+ ${datax-project-version}
+
+
+ com.alibaba.datax
+ datax-common
+ ${datax-project-version}
+
+
+ org.slf4j
+ slf4j-api
+
+
+
+ ch.qos.logback
+ logback-classic
+
+
+
+
+
+
+ src/main/java
+
+ **/*.properties
+
+
+
+
+
+
+ maven-compiler-plugin
+
+ ${jdk-version}
+ ${jdk-version}
+ ${project-sourceEncoding}
+
+
+
+
+ maven-assembly-plugin
+
+
+ src/main/assembly/package.xml
+
+ datax
+
+
+
+ dwzip
+ package
+
+ single
+
+
+
+
+
+
+
+
+
diff --git a/runtime/datax/glusterfswriter/src/main/assembly/package.xml b/runtime/datax/glusterfswriter/src/main/assembly/package.xml
new file mode 100644
index 0000000..4856c0c
--- /dev/null
+++ b/runtime/datax/glusterfswriter/src/main/assembly/package.xml
@@ -0,0 +1,35 @@
+
+
+
+ dir
+
+ false
+
+
+ src/main/resources
+
+ plugin.json
+ plugin_job_template.json
+
+ plugin/writer/glusterfswriter
+
+
+ target/
+
+ glusterfswriter-0.0.1-SNAPSHOT.jar
+
+ plugin/writer/glusterfswriter
+
+
+
+
+
+ false
+ plugin/writer/glusterfswriter/libs
+ runtime
+
+
+
diff --git a/runtime/datax/glusterfswriter/src/main/java/com/datamate/plugin/writer/glusterfswriter/GlusterfsMountUtil.java b/runtime/datax/glusterfswriter/src/main/java/com/datamate/plugin/writer/glusterfswriter/GlusterfsMountUtil.java
new file mode 100644
index 0000000..98b36ac
--- /dev/null
+++ b/runtime/datax/glusterfswriter/src/main/java/com/datamate/plugin/writer/glusterfswriter/GlusterfsMountUtil.java
@@ -0,0 +1,118 @@
+package com.datamate.plugin.writer.glusterfswriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.file.DirectoryNotEmptyException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+
+/**
+ * GlusterFS 挂载工具类
+ * 通过系统命令 mount -t glusterfs 进行挂载
+ */
+public final class GlusterfsMountUtil {
+ private static final Logger LOG = LoggerFactory.getLogger(GlusterfsMountUtil.class);
+
+ private GlusterfsMountUtil() {
+ }
+
+ /**
+ * 挂载 GlusterFS 卷
+ *
+ * @param remote 远程地址,格式: ip:/volume
+ * @param mountPoint 本地挂载点
+ */
+ public static void mount(String remote, String mountPoint) {
+ try {
+ Path mp = Paths.get(mountPoint);
+ if (isMounted(mountPoint)) {
+ throw new IOException("Already mounted: " + mountPoint);
+ }
+
+ Files.createDirectories(mp);
+
+ ProcessBuilder pb = new ProcessBuilder();
+ pb.command("mount", "-t", "glusterfs", remote, mountPoint);
+
+ LOG.info("Mounting GlusterFS: {}", pb.command());
+ pb.redirectErrorStream(true);
+ Process p = pb.start();
+ StringBuilder output = new StringBuilder();
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ output.append(line).append(System.lineSeparator());
+ }
+ }
+ int rc = p.waitFor();
+ if (rc != 0) {
+ throw new RuntimeException("GlusterFS mount failed, exit=" + rc + ", output: " + output);
+ }
+ LOG.info("GlusterFS mounted successfully: {} -> {}", remote, mountPoint);
+ } catch (IOException | InterruptedException e) {
+ throw new RuntimeException("Failed to mount GlusterFS: " + remote, e);
+ }
+ }
+
+ /**
+ * 卸载挂载点
+ *
+ * @param mountPoint 挂载点路径
+ * @throws IOException 卸载失败
+ * @throws InterruptedException 进程等待中断
+ */
+ public static void umount(String mountPoint) throws IOException, InterruptedException {
+ if (!isMounted(mountPoint)) {
+ return;
+ }
+
+ ProcessBuilder pb = new ProcessBuilder("umount", "-l", mountPoint);
+ pb.redirectErrorStream(true);
+ Process p = pb.start();
+ StringBuilder output = new StringBuilder();
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ output.append(line).append(System.lineSeparator());
+ }
+ }
+ int rc = p.waitFor();
+ if (rc != 0) {
+ throw new RuntimeException("GlusterFS umount failed, exit=" + rc + ", output: " + output);
+ }
+
+ // 清理空目录
+ try {
+ Files.deleteIfExists(Paths.get(mountPoint));
+ } catch (DirectoryNotEmptyException ignore) {
+ // 目录非空,保留
+ }
+ LOG.info("GlusterFS unmounted: {}", mountPoint);
+ }
+
+ /**
+ * 判断挂载点是否已挂载
+ *
+ * @param mountPoint 挂载点路径
+ * @return true 表示已挂载
+ * @throws IOException 读取 /proc/mounts 失败
+ */
+ public static boolean isMounted(String mountPoint) throws IOException {
+ Path procMounts = Paths.get("/proc/mounts");
+ if (!Files.exists(procMounts)) {
+ throw new IOException("/proc/mounts not found");
+ }
+ String expected = mountPoint.trim();
+ List lines = Files.readAllLines(procMounts);
+ return lines.stream()
+ .map(l -> l.split("\\s+"))
+ .filter(a -> a.length >= 2)
+ .anyMatch(a -> a[1].equals(expected));
+ }
+}
diff --git a/runtime/datax/glusterfswriter/src/main/java/com/datamate/plugin/writer/glusterfswriter/GlusterfsWriter.java b/runtime/datax/glusterfswriter/src/main/java/com/datamate/plugin/writer/glusterfswriter/GlusterfsWriter.java
new file mode 100644
index 0000000..61eba88
--- /dev/null
+++ b/runtime/datax/glusterfswriter/src/main/java/com/datamate/plugin/writer/glusterfswriter/GlusterfsWriter.java
@@ -0,0 +1,123 @@
+package com.datamate.plugin.writer.glusterfswriter;
+
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.exception.CommonErrorCode;
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.plugin.RecordReceiver;
+import com.alibaba.datax.common.spi.Writer;
+import com.alibaba.datax.common.util.Configuration;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * GlusterFS Writer 插件
+ * 通过 mount -t glusterfs 挂载 GlusterFS 卷,将文件写入到目标位置
+ */
+public class GlusterfsWriter extends Writer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(GlusterfsWriter.class);
+
+ public static class Job extends Writer.Job {
+ private Configuration jobConfig;
+ private String mountPoint;
+
+ @Override
+ public void init() {
+ this.jobConfig = super.getPluginJobConf();
+ }
+
+ @Override
+ public void prepare() {
+ this.mountPoint = "/dataset/mount/" + UUID.randomUUID();
+ this.jobConfig.set("mountPoint", this.mountPoint);
+ new File(this.mountPoint).mkdirs();
+
+ String ip = this.jobConfig.getString("ip");
+ String volume = this.jobConfig.getString("volume");
+
+ // GlusterFS mount 格式: mount -t glusterfs ip:/volume /mountpoint
+ String remote = ip + ":/" + volume;
+ GlusterfsMountUtil.mount(remote, mountPoint);
+
+ String destPath = this.jobConfig.getString("destPath");
+ new File(destPath).mkdirs();
+ }
+
+ @Override
+ public List split(int mandatoryNumber) {
+ return Collections.singletonList(this.jobConfig);
+ }
+
+ @Override
+ public void post() {
+ try {
+ GlusterfsMountUtil.umount(this.mountPoint);
+ new File(this.mountPoint).deleteOnExit();
+ } catch (IOException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void destroy() {
+ }
+ }
+
+ public static class Task extends Writer.Task {
+ private Configuration jobConfig;
+ private String mountPoint;
+ private String subPath;
+ private String destPath;
+ private List files;
+
+ @Override
+ public void init() {
+ this.jobConfig = super.getPluginJobConf();
+ this.destPath = this.jobConfig.getString("destPath");
+ this.mountPoint = this.jobConfig.getString("mountPoint");
+ this.subPath = this.jobConfig.getString("path", "");
+ this.files = this.jobConfig.getList("files", Collections.emptyList(), String.class);
+ }
+
+ @Override
+ public void startWrite(RecordReceiver lineReceiver) {
+ String sourcePath = this.mountPoint;
+ if (StringUtils.isNotBlank(this.subPath)) {
+ sourcePath = this.mountPoint + "/" + this.subPath.replaceFirst("^/+", "");
+ }
+
+ try {
+ Record record;
+ while ((record = lineReceiver.getFromReader()) != null) {
+ String fileName = record.getColumn(0).asString();
+ if (StringUtils.isBlank(fileName)) {
+ continue;
+ }
+ if (!files.isEmpty() && !files.contains(fileName)) {
+ continue;
+ }
+
+ String filePath = sourcePath + "/" + fileName;
+ ShellUtil.runCommand("rsync", Arrays.asList("--no-links", "--chmod=754", "--", filePath,
+ this.destPath + "/" + fileName));
+ }
+ } catch (Exception e) {
+ LOG.error("Error writing files from GlusterFS: {}", e.getMessage(), e);
+ throw DataXException.asDataXException(CommonErrorCode.RUNTIME_ERROR, e);
+ }
+ }
+
+ @Override
+ public void destroy() {
+ }
+ }
+}
diff --git a/runtime/datax/glusterfswriter/src/main/java/com/datamate/plugin/writer/glusterfswriter/ShellUtil.java b/runtime/datax/glusterfswriter/src/main/java/com/datamate/plugin/writer/glusterfswriter/ShellUtil.java
new file mode 100644
index 0000000..e25d28d
--- /dev/null
+++ b/runtime/datax/glusterfswriter/src/main/java/com/datamate/plugin/writer/glusterfswriter/ShellUtil.java
@@ -0,0 +1,46 @@
+package com.datamate.plugin.writer.glusterfswriter;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Shell 命令执行工具类
+ */
+public class ShellUtil {
+ /**
+ * 执行 shell 命令
+ *
+ * @param cmd 命令
+ * @param extraArgs 额外参数,可为空
+ * @return 命令完整输出(stdout + stderr)
+ * @throws Exception 如果命令返回非 0 或发生 IO 异常
+ */
+ public static String runCommand(String cmd, List extraArgs) throws Exception {
+ List commands = new ArrayList<>();
+ commands.add(cmd);
+ if (extraArgs != null && !extraArgs.isEmpty()) {
+ commands.addAll(extraArgs);
+ }
+
+ ProcessBuilder pb = new ProcessBuilder(commands);
+ pb.redirectErrorStream(true); // 合并 stdout & stderr
+ Process p = pb.start();
+
+ StringBuilder sb = new StringBuilder();
+ try (BufferedReader br = new BufferedReader(
+ new InputStreamReader(p.getInputStream()))) {
+ String line;
+ while ((line = br.readLine()) != null) {
+ sb.append(line).append(System.lineSeparator());
+ }
+ }
+
+ int exit = p.waitFor();
+ if (exit != 0) {
+ throw new RuntimeException("Command exited with code " + exit + System.lineSeparator() + sb);
+ }
+ return sb.toString();
+ }
+}
diff --git a/runtime/datax/glusterfswriter/src/main/resources/plugin.json b/runtime/datax/glusterfswriter/src/main/resources/plugin.json
new file mode 100644
index 0000000..c1b65b1
--- /dev/null
+++ b/runtime/datax/glusterfswriter/src/main/resources/plugin.json
@@ -0,0 +1,6 @@
+{
+ "name": "glusterfswriter",
+ "class": "com.datamate.plugin.writer.glusterfswriter.GlusterfsWriter",
+ "description": "write files to GlusterFS distributed file system",
+ "developer": "datamate"
+}
diff --git a/runtime/datax/glusterfswriter/src/main/resources/plugin_job_template.json b/runtime/datax/glusterfswriter/src/main/resources/plugin_job_template.json
new file mode 100644
index 0000000..7a7ee3e
--- /dev/null
+++ b/runtime/datax/glusterfswriter/src/main/resources/plugin_job_template.json
@@ -0,0 +1,10 @@
+{
+ "name": "glusterfswriter",
+ "parameter": {
+ "ip": "",
+ "volume": "",
+ "path": "",
+ "destPath": "",
+ "files": []
+ }
+}
diff --git a/runtime/datax/package.xml b/runtime/datax/package.xml
index faf1fd9..b2124c5 100644
--- a/runtime/datax/package.xml
+++ b/runtime/datax/package.xml
@@ -271,6 +271,13 @@
datax
+
+ glusterfsreader/target/datax/
+
+ **/*.*
+
+ datax
+
@@ -581,6 +588,13 @@
datax
+
+ glusterfswriter/target/datax/
+
+ **/*.*
+
+ datax
+
obsreader/target/datax/
diff --git a/runtime/datax/pom.xml b/runtime/datax/pom.xml
index 947b6e8..5436948 100644
--- a/runtime/datax/pom.xml
+++ b/runtime/datax/pom.xml
@@ -85,6 +85,7 @@
nfsreader
+ glusterfsreader
mysqlwriter
starrockswriter
@@ -133,6 +134,7 @@
nfswriter
+ glusterfswriter
plugin-rdbms-util
plugin-unstructured-storage-util
diff --git a/scripts/db/data-collection-init.sql b/scripts/db/data-collection-init.sql
index 6eba014..2aebb29 100644
--- a/scripts/db/data-collection-init.sql
+++ b/scripts/db/data-collection-init.sql
@@ -76,4 +76,5 @@ INSERT IGNORE INTO t_dc_collection_templates(id, name, description, source_type,
VALUES ('1', 'NAS归集模板', '将NAS存储上的文件归集到DataMate平台上。', 'nfsreader', 'nfsreader', 'nfswriter', 'nfswriter', '{"parameter": {"ip": {"name": "NAS地址","description": "NAS服务的地址,可以为IP或者域名。","type": "input", "required": true, "index": 1}, "path": {"name": "共享路径","description": "NAS服务的共享路径。","type": "input", "required": true, "index": 2}, "files": {"name": "文件列表","description": "指定文件列表进行归集。","type": "selectTag", "required": false, "index": 3}}, "reader": {}, "writer": {}}', True, 'system', 'system'),
('2', 'OBS归集模板', '将OBS存储上的文件归集到DataMate平台上。', 'obsreader', 'obsreader', 'obswriter', 'obswriter', '{"parameter": {"endpoint": {"name": "服务地址","description": "OBS的服务地址。","type": "input", "required": true, "index": 1},"bucket": {"name": "存储桶名称","description": "OBS存储桶名称。","type": "input", "required": true, "index": 2},"accessKey": {"name": "AK","description": "OBS访问密钥。","type": "input", "required": true, "index": 3},"secretKey": {"name": "SK","description": "OBS密钥。","type": "password", "required": true, "index": 4},"prefix": {"name": "匹配前缀","description": "按照匹配前缀去选中OBS中的文件进行归集。","type": "input", "required": true, "index": 5}}, "reader": {}, "writer": {}}', True, 'system', 'system'),
('3', 'MYSQL归集模板', '将MYSQL数据库中的数据以csv文件的形式归集到DataMate平台上。', 'mysqlreader', 'mysqlreader', 'txtfilewriter', 'txtfilewriter', '{"parameter": {}, "reader": {"username": {"name": "用户名","description": "数据库的用户名。","type": "input", "required": true, "index": 2}, "password": {"name": "密码","description": "数据库的密码。","type": "password", "required": true, "index": 3}, "connection": {"name": "数据库连接信息", "description": "数据库连接信息。", "type": "multipleList", "size": 1, "index": 1, "properties": {"jdbcUrl": {"type": "inputList", "name": "数据库连接", "description": "数据库连接url。", "required": true, "index": 1}, "querySql": {"type": "inputList", "name": "查询sql", "description": "输入符合语法的sql查询语句。", "required": true, "index": 2}}}}, "writer": {"header": {"name": "列名","description": "查询结果的列名,最终会体现为csv文件的表头。","type": "selectTag", "required": false}}}', True, 'system', 'system'),
- ('4', 'StarRocks归集模板', '将StarRocks中的数据以csv文件的形式归集到DataMate平台上。', 'starrocksreader', 'starrocksreader', 'txtfilewriter', 'txtfilewriter', '{"parameter": {}, "reader": {"username": {"name": "用户名","description": "数据库的用户名。","type": "input", "required": true, "index": 2}, "password": {"name": "密码","description": "数据库的密码。","type": "password", "required": true, "index": 3}, "connection": {"name": "数据库连接信息", "description": "数据库连接信息。", "type": "multipleList", "size": 1, "index": 1, "properties": {"jdbcUrl": {"type": "inputList", "name": "数据库连接", "description": "数据库连接url。", "required": true, "index": 1}, "querySql": {"type": "inputList", "name": "查询sql", "description": "输入符合语法的sql查询语句。", "required": true, "index": 2}}}}, "writer": {"header": {"name": "列名","description": "查询结果的列名,最终会体现为csv文件的表头。","type": "selectTag", "required": false}}}', True, 'system', 'system');
+ ('4', 'StarRocks归集模板', '将StarRocks中的数据以csv文件的形式归集到DataMate平台上。', 'starrocksreader', 'starrocksreader', 'txtfilewriter', 'txtfilewriter', '{"parameter": {}, "reader": {"username": {"name": "用户名","description": "数据库的用户名。","type": "input", "required": true, "index": 2}, "password": {"name": "密码","description": "数据库的密码。","type": "password", "required": true, "index": 3}, "connection": {"name": "数据库连接信息", "description": "数据库连接信息。", "type": "multipleList", "size": 1, "index": 1, "properties": {"jdbcUrl": {"type": "inputList", "name": "数据库连接", "description": "数据库连接url。", "required": true, "index": 1}, "querySql": {"type": "inputList", "name": "查询sql", "description": "输入符合语法的sql查询语句。", "required": true, "index": 2}}}}, "writer": {"header": {"name": "列名","description": "查询结果的列名,最终会体现为csv文件的表头。","type": "selectTag", "required": false}}}', True, 'system', 'system'),
+ ('5', 'GlusterFS归集模板', '将GlusterFS分布式文件系统上的文件归集到DataMate平台上。', 'glusterfsreader', 'glusterfsreader', 'glusterfswriter', 'glusterfswriter', '{"parameter": {"ip": {"name": "服务器地址","description": "GlusterFS服务器的IP地址或域名。","type": "input", "required": true, "index": 1}, "volume": {"name": "卷名称","description": "GlusterFS卷名称。","type": "input", "required": true, "index": 2}, "path": {"name": "子路径","description": "卷内的子目录路径(可选)。","type": "input", "required": false, "index": 3}, "files": {"name": "文件列表","description": "指定文件列表进行归集。","type": "selectTag", "required": false, "index": 4}}, "reader": {}, "writer": {}}', True, 'system', 'system');
diff --git a/scripts/images/backend-python/Dockerfile b/scripts/images/backend-python/Dockerfile
index 052cc53..aa9ffc3 100644
--- a/scripts/images/backend-python/Dockerfile
+++ b/scripts/images/backend-python/Dockerfile
@@ -40,7 +40,7 @@ RUN if [ -f /etc/apt/sources.list.d/debian.sources ]; then \
sed -i 's/deb.debian.org/mirrors.aliyun.com/g' /etc/apt/sources.list; \
fi && \
apt-get update && \
- apt-get install -y --no-install-recommends vim openjdk-21-jre nfs-common rsync && \
+ apt-get install -y --no-install-recommends vim openjdk-21-jre nfs-common glusterfs-client rsync && \
rm -rf /var/lib/apt/lists/*
ENV PYTHONDONTWRITEBYTECODE=1 \