diff --git a/runtime/datax/glusterfsreader/pom.xml b/runtime/datax/glusterfsreader/pom.xml new file mode 100644 index 0000000..7022cf2 --- /dev/null +++ b/runtime/datax/glusterfsreader/pom.xml @@ -0,0 +1,80 @@ + + + + datax-all + com.alibaba.datax + 0.0.1-SNAPSHOT + + + 4.0.0 + glusterfsreader + glusterfsreader + jar + + + + com.alibaba.datax + datax-core + ${datax-project-version} + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + org.slf4j + slf4j-api + + + + ch.qos.logback + logback-classic + + + + + + + src/main/java + + **/*.properties + + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + + + diff --git a/runtime/datax/glusterfsreader/src/main/assembly/package.xml b/runtime/datax/glusterfsreader/src/main/assembly/package.xml new file mode 100644 index 0000000..e4a4cff --- /dev/null +++ b/runtime/datax/glusterfsreader/src/main/assembly/package.xml @@ -0,0 +1,35 @@ + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/reader/glusterfsreader + + + target/ + + glusterfsreader-0.0.1-SNAPSHOT.jar + + plugin/reader/glusterfsreader + + + + + + false + plugin/reader/glusterfsreader/libs + runtime + + + diff --git a/runtime/datax/glusterfsreader/src/main/java/com/datamate/plugin/reader/glusterfsreader/GlusterfsMountUtil.java b/runtime/datax/glusterfsreader/src/main/java/com/datamate/plugin/reader/glusterfsreader/GlusterfsMountUtil.java new file mode 100644 index 0000000..9f784da --- /dev/null +++ b/runtime/datax/glusterfsreader/src/main/java/com/datamate/plugin/reader/glusterfsreader/GlusterfsMountUtil.java @@ -0,0 +1,119 @@ +package com.datamate.plugin.reader.glusterfsreader; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.file.DirectoryNotEmptyException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; + +/** + * GlusterFS 挂载工具类 + * 通过系统命令 mount -t glusterfs 进行挂载 + */ +public final class GlusterfsMountUtil { + private static final Logger LOG = LoggerFactory.getLogger(GlusterfsMountUtil.class); + + private GlusterfsMountUtil() { + } + + /** + * 挂载 GlusterFS 卷 + * + * @param remote 远程地址,格式: ip:/volume + * @param mountPoint 本地挂载点 + * @param subPath 卷内子路径(可选,用于后续读取) + */ + public static void mount(String remote, String mountPoint, String subPath) { + try { + Path mp = Paths.get(mountPoint); + if (isMounted(mountPoint)) { + throw new IOException("Already mounted: " + mountPoint); + } + + Files.createDirectories(mp); + + ProcessBuilder pb = new ProcessBuilder(); + pb.command("mount", "-t", "glusterfs", remote, mountPoint); + + LOG.info("Mounting GlusterFS: {}", pb.command()); + pb.redirectErrorStream(true); + Process p = pb.start(); + StringBuilder output = new StringBuilder(); + try (BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()))) { + String line; + while ((line = reader.readLine()) != null) { + output.append(line).append(System.lineSeparator()); + } + } + int rc = p.waitFor(); + if (rc != 0) { + throw new RuntimeException("GlusterFS mount failed, exit=" + rc + ", output: " + output); + } + LOG.info("GlusterFS mounted successfully: {} -> {}", remote, mountPoint); + } catch (IOException | InterruptedException e) { + throw new RuntimeException("Failed to mount GlusterFS: " + remote, e); + } + } + + /** + * 卸载挂载点 + * + * @param mountPoint 挂载点路径 + * @throws IOException 卸载失败 + * @throws InterruptedException 进程等待中断 + */ + public static void umount(String mountPoint) throws IOException, InterruptedException { + if (!isMounted(mountPoint)) { + return; + } + + ProcessBuilder pb = new ProcessBuilder("umount", "-l", mountPoint); + pb.redirectErrorStream(true); + Process p = pb.start(); + StringBuilder output = new StringBuilder(); + try (BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()))) { + String line; + while ((line = reader.readLine()) != null) { + output.append(line).append(System.lineSeparator()); + } + } + int rc = p.waitFor(); + if (rc != 0) { + throw new RuntimeException("GlusterFS umount failed, exit=" + rc + ", output: " + output); + } + + // 清理空目录 + try { + Files.deleteIfExists(Paths.get(mountPoint)); + } catch (DirectoryNotEmptyException ignore) { + // 目录非空,保留 + } + LOG.info("GlusterFS unmounted: {}", mountPoint); + } + + /** + * 判断挂载点是否已挂载 + * + * @param mountPoint 挂载点路径 + * @return true 表示已挂载 + * @throws IOException 读取 /proc/mounts 失败 + */ + public static boolean isMounted(String mountPoint) throws IOException { + Path procMounts = Paths.get("/proc/mounts"); + if (!Files.exists(procMounts)) { + throw new IOException("/proc/mounts not found"); + } + String expected = mountPoint.trim(); + List lines = Files.readAllLines(procMounts); + return lines.stream() + .map(l -> l.split("\\s+")) + .filter(a -> a.length >= 2) + .anyMatch(a -> a[1].equals(expected)); + } +} diff --git a/runtime/datax/glusterfsreader/src/main/java/com/datamate/plugin/reader/glusterfsreader/GlusterfsReader.java b/runtime/datax/glusterfsreader/src/main/java/com/datamate/plugin/reader/glusterfsreader/GlusterfsReader.java new file mode 100644 index 0000000..827e60b --- /dev/null +++ b/runtime/datax/glusterfsreader/src/main/java/com/datamate/plugin/reader/glusterfsreader/GlusterfsReader.java @@ -0,0 +1,133 @@ +package com.datamate.plugin.reader.glusterfsreader; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.element.StringColumn; +import com.alibaba.datax.common.plugin.RecordSender; +import com.alibaba.datax.common.spi.Reader; +import com.alibaba.datax.common.util.Configuration; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * GlusterFS Reader 插件 + * 通过 mount -t glusterfs 挂载 GlusterFS 卷,读取文件列表 + */ +public class GlusterfsReader extends Reader { + + private static final Logger LOG = LoggerFactory.getLogger(GlusterfsReader.class); + + public static class Job extends Reader.Job { + private Configuration jobConfig = null; + private String mountPoint; + + @Override + public void init() { + this.jobConfig = super.getPluginJobConf(); + } + + @Override + public void prepare() { + this.mountPoint = "/dataset/mount/" + UUID.randomUUID(); + this.jobConfig.set("mountPoint", this.mountPoint); + + String ip = this.jobConfig.getString("ip"); + String volume = this.jobConfig.getString("volume"); + String subPath = this.jobConfig.getString("path", ""); + + // GlusterFS mount 格式: mount -t glusterfs ip:/volume /mountpoint + String remote = ip + ":/" + volume; + GlusterfsMountUtil.mount(remote, mountPoint, subPath); + } + + @Override + public List split(int adviceNumber) { + return Collections.singletonList(this.jobConfig); + } + + @Override + public void post() { + try { + GlusterfsMountUtil.umount(this.mountPoint); + new File(this.mountPoint).deleteOnExit(); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + public void destroy() { + } + } + + public static class Task extends Reader.Task { + + private Configuration jobConfig; + private String mountPoint; + private String subPath; + private Set fileType; + private List files; + + @Override + public void init() { + this.jobConfig = super.getPluginJobConf(); + this.mountPoint = this.jobConfig.getString("mountPoint"); + this.subPath = this.jobConfig.getString("path", ""); + this.fileType = new HashSet<>(this.jobConfig.getList("fileType", Collections.emptyList(), String.class)); + this.files = this.jobConfig.getList("files", Collections.emptyList(), String.class); + } + + @Override + public void startRead(RecordSender recordSender) { + String readPath = this.mountPoint; + if (StringUtils.isNotBlank(this.subPath)) { + readPath = this.mountPoint + "/" + this.subPath.replaceFirst("^/+", ""); + } + + try (Stream stream = Files.list(Paths.get(readPath))) { + List fileList = stream.filter(Files::isRegularFile) + .filter(file -> fileType.isEmpty() || fileType.contains(getFileSuffix(file))) + .map(path -> path.getFileName().toString()) + .filter(fileName -> this.files.isEmpty() || this.files.contains(fileName)) + .collect(Collectors.toList()); + + fileList.forEach(filePath -> { + Record record = recordSender.createRecord(); + record.addColumn(new StringColumn(filePath)); + recordSender.sendToWriter(record); + }); + this.jobConfig.set("columnNumber", 1); + } catch (IOException e) { + LOG.error("Error reading files from GlusterFS mount point: {}", readPath, e); + throw new RuntimeException(e); + } + } + + private String getFileSuffix(Path path) { + String fileName = path.getFileName().toString(); + int lastDotIndex = fileName.lastIndexOf('.'); + if (lastDotIndex == -1 || lastDotIndex == fileName.length() - 1) { + return ""; + } + return fileName.substring(lastDotIndex + 1); + } + + @Override + public void destroy() { + } + } +} diff --git a/runtime/datax/glusterfsreader/src/main/resources/plugin.json b/runtime/datax/glusterfsreader/src/main/resources/plugin.json new file mode 100644 index 0000000..ddcb0cb --- /dev/null +++ b/runtime/datax/glusterfsreader/src/main/resources/plugin.json @@ -0,0 +1,6 @@ +{ + "name": "glusterfsreader", + "class": "com.datamate.plugin.reader.glusterfsreader.GlusterfsReader", + "description": "read file list from GlusterFS distributed file system", + "developer": "datamate" +} diff --git a/runtime/datax/glusterfsreader/src/main/resources/plugin_job_template.json b/runtime/datax/glusterfsreader/src/main/resources/plugin_job_template.json new file mode 100644 index 0000000..003c053 --- /dev/null +++ b/runtime/datax/glusterfsreader/src/main/resources/plugin_job_template.json @@ -0,0 +1,10 @@ +{ + "name": "glusterfsreader", + "parameter": { + "ip": "", + "volume": "", + "path": "", + "fileType": [], + "files": [] + } +} diff --git a/runtime/datax/glusterfswriter/pom.xml b/runtime/datax/glusterfswriter/pom.xml new file mode 100644 index 0000000..ca3608d --- /dev/null +++ b/runtime/datax/glusterfswriter/pom.xml @@ -0,0 +1,80 @@ + + + + datax-all + com.alibaba.datax + 0.0.1-SNAPSHOT + + + 4.0.0 + glusterfswriter + glusterfswriter + jar + + + + com.alibaba.datax + datax-core + ${datax-project-version} + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + org.slf4j + slf4j-api + + + + ch.qos.logback + logback-classic + + + + + + + src/main/java + + **/*.properties + + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + + + diff --git a/runtime/datax/glusterfswriter/src/main/assembly/package.xml b/runtime/datax/glusterfswriter/src/main/assembly/package.xml new file mode 100644 index 0000000..4856c0c --- /dev/null +++ b/runtime/datax/glusterfswriter/src/main/assembly/package.xml @@ -0,0 +1,35 @@ + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/writer/glusterfswriter + + + target/ + + glusterfswriter-0.0.1-SNAPSHOT.jar + + plugin/writer/glusterfswriter + + + + + + false + plugin/writer/glusterfswriter/libs + runtime + + + diff --git a/runtime/datax/glusterfswriter/src/main/java/com/datamate/plugin/writer/glusterfswriter/GlusterfsMountUtil.java b/runtime/datax/glusterfswriter/src/main/java/com/datamate/plugin/writer/glusterfswriter/GlusterfsMountUtil.java new file mode 100644 index 0000000..98b36ac --- /dev/null +++ b/runtime/datax/glusterfswriter/src/main/java/com/datamate/plugin/writer/glusterfswriter/GlusterfsMountUtil.java @@ -0,0 +1,118 @@ +package com.datamate.plugin.writer.glusterfswriter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.file.DirectoryNotEmptyException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; + +/** + * GlusterFS 挂载工具类 + * 通过系统命令 mount -t glusterfs 进行挂载 + */ +public final class GlusterfsMountUtil { + private static final Logger LOG = LoggerFactory.getLogger(GlusterfsMountUtil.class); + + private GlusterfsMountUtil() { + } + + /** + * 挂载 GlusterFS 卷 + * + * @param remote 远程地址,格式: ip:/volume + * @param mountPoint 本地挂载点 + */ + public static void mount(String remote, String mountPoint) { + try { + Path mp = Paths.get(mountPoint); + if (isMounted(mountPoint)) { + throw new IOException("Already mounted: " + mountPoint); + } + + Files.createDirectories(mp); + + ProcessBuilder pb = new ProcessBuilder(); + pb.command("mount", "-t", "glusterfs", remote, mountPoint); + + LOG.info("Mounting GlusterFS: {}", pb.command()); + pb.redirectErrorStream(true); + Process p = pb.start(); + StringBuilder output = new StringBuilder(); + try (BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()))) { + String line; + while ((line = reader.readLine()) != null) { + output.append(line).append(System.lineSeparator()); + } + } + int rc = p.waitFor(); + if (rc != 0) { + throw new RuntimeException("GlusterFS mount failed, exit=" + rc + ", output: " + output); + } + LOG.info("GlusterFS mounted successfully: {} -> {}", remote, mountPoint); + } catch (IOException | InterruptedException e) { + throw new RuntimeException("Failed to mount GlusterFS: " + remote, e); + } + } + + /** + * 卸载挂载点 + * + * @param mountPoint 挂载点路径 + * @throws IOException 卸载失败 + * @throws InterruptedException 进程等待中断 + */ + public static void umount(String mountPoint) throws IOException, InterruptedException { + if (!isMounted(mountPoint)) { + return; + } + + ProcessBuilder pb = new ProcessBuilder("umount", "-l", mountPoint); + pb.redirectErrorStream(true); + Process p = pb.start(); + StringBuilder output = new StringBuilder(); + try (BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()))) { + String line; + while ((line = reader.readLine()) != null) { + output.append(line).append(System.lineSeparator()); + } + } + int rc = p.waitFor(); + if (rc != 0) { + throw new RuntimeException("GlusterFS umount failed, exit=" + rc + ", output: " + output); + } + + // 清理空目录 + try { + Files.deleteIfExists(Paths.get(mountPoint)); + } catch (DirectoryNotEmptyException ignore) { + // 目录非空,保留 + } + LOG.info("GlusterFS unmounted: {}", mountPoint); + } + + /** + * 判断挂载点是否已挂载 + * + * @param mountPoint 挂载点路径 + * @return true 表示已挂载 + * @throws IOException 读取 /proc/mounts 失败 + */ + public static boolean isMounted(String mountPoint) throws IOException { + Path procMounts = Paths.get("/proc/mounts"); + if (!Files.exists(procMounts)) { + throw new IOException("/proc/mounts not found"); + } + String expected = mountPoint.trim(); + List lines = Files.readAllLines(procMounts); + return lines.stream() + .map(l -> l.split("\\s+")) + .filter(a -> a.length >= 2) + .anyMatch(a -> a[1].equals(expected)); + } +} diff --git a/runtime/datax/glusterfswriter/src/main/java/com/datamate/plugin/writer/glusterfswriter/GlusterfsWriter.java b/runtime/datax/glusterfswriter/src/main/java/com/datamate/plugin/writer/glusterfswriter/GlusterfsWriter.java new file mode 100644 index 0000000..61eba88 --- /dev/null +++ b/runtime/datax/glusterfswriter/src/main/java/com/datamate/plugin/writer/glusterfswriter/GlusterfsWriter.java @@ -0,0 +1,123 @@ +package com.datamate.plugin.writer.glusterfswriter; + +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.exception.CommonErrorCode; +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.spi.Writer; +import com.alibaba.datax.common.util.Configuration; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +/** + * GlusterFS Writer 插件 + * 通过 mount -t glusterfs 挂载 GlusterFS 卷,将文件写入到目标位置 + */ +public class GlusterfsWriter extends Writer { + + private static final Logger LOG = LoggerFactory.getLogger(GlusterfsWriter.class); + + public static class Job extends Writer.Job { + private Configuration jobConfig; + private String mountPoint; + + @Override + public void init() { + this.jobConfig = super.getPluginJobConf(); + } + + @Override + public void prepare() { + this.mountPoint = "/dataset/mount/" + UUID.randomUUID(); + this.jobConfig.set("mountPoint", this.mountPoint); + new File(this.mountPoint).mkdirs(); + + String ip = this.jobConfig.getString("ip"); + String volume = this.jobConfig.getString("volume"); + + // GlusterFS mount 格式: mount -t glusterfs ip:/volume /mountpoint + String remote = ip + ":/" + volume; + GlusterfsMountUtil.mount(remote, mountPoint); + + String destPath = this.jobConfig.getString("destPath"); + new File(destPath).mkdirs(); + } + + @Override + public List split(int mandatoryNumber) { + return Collections.singletonList(this.jobConfig); + } + + @Override + public void post() { + try { + GlusterfsMountUtil.umount(this.mountPoint); + new File(this.mountPoint).deleteOnExit(); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + public void destroy() { + } + } + + public static class Task extends Writer.Task { + private Configuration jobConfig; + private String mountPoint; + private String subPath; + private String destPath; + private List files; + + @Override + public void init() { + this.jobConfig = super.getPluginJobConf(); + this.destPath = this.jobConfig.getString("destPath"); + this.mountPoint = this.jobConfig.getString("mountPoint"); + this.subPath = this.jobConfig.getString("path", ""); + this.files = this.jobConfig.getList("files", Collections.emptyList(), String.class); + } + + @Override + public void startWrite(RecordReceiver lineReceiver) { + String sourcePath = this.mountPoint; + if (StringUtils.isNotBlank(this.subPath)) { + sourcePath = this.mountPoint + "/" + this.subPath.replaceFirst("^/+", ""); + } + + try { + Record record; + while ((record = lineReceiver.getFromReader()) != null) { + String fileName = record.getColumn(0).asString(); + if (StringUtils.isBlank(fileName)) { + continue; + } + if (!files.isEmpty() && !files.contains(fileName)) { + continue; + } + + String filePath = sourcePath + "/" + fileName; + ShellUtil.runCommand("rsync", Arrays.asList("--no-links", "--chmod=754", "--", filePath, + this.destPath + "/" + fileName)); + } + } catch (Exception e) { + LOG.error("Error writing files from GlusterFS: {}", e.getMessage(), e); + throw DataXException.asDataXException(CommonErrorCode.RUNTIME_ERROR, e); + } + } + + @Override + public void destroy() { + } + } +} diff --git a/runtime/datax/glusterfswriter/src/main/java/com/datamate/plugin/writer/glusterfswriter/ShellUtil.java b/runtime/datax/glusterfswriter/src/main/java/com/datamate/plugin/writer/glusterfswriter/ShellUtil.java new file mode 100644 index 0000000..e25d28d --- /dev/null +++ b/runtime/datax/glusterfswriter/src/main/java/com/datamate/plugin/writer/glusterfswriter/ShellUtil.java @@ -0,0 +1,46 @@ +package com.datamate.plugin.writer.glusterfswriter; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.List; + +/** + * Shell 命令执行工具类 + */ +public class ShellUtil { + /** + * 执行 shell 命令 + * + * @param cmd 命令 + * @param extraArgs 额外参数,可为空 + * @return 命令完整输出(stdout + stderr) + * @throws Exception 如果命令返回非 0 或发生 IO 异常 + */ + public static String runCommand(String cmd, List extraArgs) throws Exception { + List commands = new ArrayList<>(); + commands.add(cmd); + if (extraArgs != null && !extraArgs.isEmpty()) { + commands.addAll(extraArgs); + } + + ProcessBuilder pb = new ProcessBuilder(commands); + pb.redirectErrorStream(true); // 合并 stdout & stderr + Process p = pb.start(); + + StringBuilder sb = new StringBuilder(); + try (BufferedReader br = new BufferedReader( + new InputStreamReader(p.getInputStream()))) { + String line; + while ((line = br.readLine()) != null) { + sb.append(line).append(System.lineSeparator()); + } + } + + int exit = p.waitFor(); + if (exit != 0) { + throw new RuntimeException("Command exited with code " + exit + System.lineSeparator() + sb); + } + return sb.toString(); + } +} diff --git a/runtime/datax/glusterfswriter/src/main/resources/plugin.json b/runtime/datax/glusterfswriter/src/main/resources/plugin.json new file mode 100644 index 0000000..c1b65b1 --- /dev/null +++ b/runtime/datax/glusterfswriter/src/main/resources/plugin.json @@ -0,0 +1,6 @@ +{ + "name": "glusterfswriter", + "class": "com.datamate.plugin.writer.glusterfswriter.GlusterfsWriter", + "description": "write files to GlusterFS distributed file system", + "developer": "datamate" +} diff --git a/runtime/datax/glusterfswriter/src/main/resources/plugin_job_template.json b/runtime/datax/glusterfswriter/src/main/resources/plugin_job_template.json new file mode 100644 index 0000000..7a7ee3e --- /dev/null +++ b/runtime/datax/glusterfswriter/src/main/resources/plugin_job_template.json @@ -0,0 +1,10 @@ +{ + "name": "glusterfswriter", + "parameter": { + "ip": "", + "volume": "", + "path": "", + "destPath": "", + "files": [] + } +} diff --git a/runtime/datax/package.xml b/runtime/datax/package.xml index faf1fd9..b2124c5 100644 --- a/runtime/datax/package.xml +++ b/runtime/datax/package.xml @@ -271,6 +271,13 @@ datax + + glusterfsreader/target/datax/ + + **/*.* + + datax + @@ -581,6 +588,13 @@ datax + + glusterfswriter/target/datax/ + + **/*.* + + datax + obsreader/target/datax/ diff --git a/runtime/datax/pom.xml b/runtime/datax/pom.xml index 947b6e8..5436948 100644 --- a/runtime/datax/pom.xml +++ b/runtime/datax/pom.xml @@ -85,6 +85,7 @@ nfsreader + glusterfsreader mysqlwriter starrockswriter @@ -133,6 +134,7 @@ nfswriter + glusterfswriter plugin-rdbms-util plugin-unstructured-storage-util diff --git a/scripts/db/data-collection-init.sql b/scripts/db/data-collection-init.sql index 6eba014..2aebb29 100644 --- a/scripts/db/data-collection-init.sql +++ b/scripts/db/data-collection-init.sql @@ -76,4 +76,5 @@ INSERT IGNORE INTO t_dc_collection_templates(id, name, description, source_type, VALUES ('1', 'NAS归集模板', '将NAS存储上的文件归集到DataMate平台上。', 'nfsreader', 'nfsreader', 'nfswriter', 'nfswriter', '{"parameter": {"ip": {"name": "NAS地址","description": "NAS服务的地址,可以为IP或者域名。","type": "input", "required": true, "index": 1}, "path": {"name": "共享路径","description": "NAS服务的共享路径。","type": "input", "required": true, "index": 2}, "files": {"name": "文件列表","description": "指定文件列表进行归集。","type": "selectTag", "required": false, "index": 3}}, "reader": {}, "writer": {}}', True, 'system', 'system'), ('2', 'OBS归集模板', '将OBS存储上的文件归集到DataMate平台上。', 'obsreader', 'obsreader', 'obswriter', 'obswriter', '{"parameter": {"endpoint": {"name": "服务地址","description": "OBS的服务地址。","type": "input", "required": true, "index": 1},"bucket": {"name": "存储桶名称","description": "OBS存储桶名称。","type": "input", "required": true, "index": 2},"accessKey": {"name": "AK","description": "OBS访问密钥。","type": "input", "required": true, "index": 3},"secretKey": {"name": "SK","description": "OBS密钥。","type": "password", "required": true, "index": 4},"prefix": {"name": "匹配前缀","description": "按照匹配前缀去选中OBS中的文件进行归集。","type": "input", "required": true, "index": 5}}, "reader": {}, "writer": {}}', True, 'system', 'system'), ('3', 'MYSQL归集模板', '将MYSQL数据库中的数据以csv文件的形式归集到DataMate平台上。', 'mysqlreader', 'mysqlreader', 'txtfilewriter', 'txtfilewriter', '{"parameter": {}, "reader": {"username": {"name": "用户名","description": "数据库的用户名。","type": "input", "required": true, "index": 2}, "password": {"name": "密码","description": "数据库的密码。","type": "password", "required": true, "index": 3}, "connection": {"name": "数据库连接信息", "description": "数据库连接信息。", "type": "multipleList", "size": 1, "index": 1, "properties": {"jdbcUrl": {"type": "inputList", "name": "数据库连接", "description": "数据库连接url。", "required": true, "index": 1}, "querySql": {"type": "inputList", "name": "查询sql", "description": "输入符合语法的sql查询语句。", "required": true, "index": 2}}}}, "writer": {"header": {"name": "列名","description": "查询结果的列名,最终会体现为csv文件的表头。","type": "selectTag", "required": false}}}', True, 'system', 'system'), - ('4', 'StarRocks归集模板', '将StarRocks中的数据以csv文件的形式归集到DataMate平台上。', 'starrocksreader', 'starrocksreader', 'txtfilewriter', 'txtfilewriter', '{"parameter": {}, "reader": {"username": {"name": "用户名","description": "数据库的用户名。","type": "input", "required": true, "index": 2}, "password": {"name": "密码","description": "数据库的密码。","type": "password", "required": true, "index": 3}, "connection": {"name": "数据库连接信息", "description": "数据库连接信息。", "type": "multipleList", "size": 1, "index": 1, "properties": {"jdbcUrl": {"type": "inputList", "name": "数据库连接", "description": "数据库连接url。", "required": true, "index": 1}, "querySql": {"type": "inputList", "name": "查询sql", "description": "输入符合语法的sql查询语句。", "required": true, "index": 2}}}}, "writer": {"header": {"name": "列名","description": "查询结果的列名,最终会体现为csv文件的表头。","type": "selectTag", "required": false}}}', True, 'system', 'system'); + ('4', 'StarRocks归集模板', '将StarRocks中的数据以csv文件的形式归集到DataMate平台上。', 'starrocksreader', 'starrocksreader', 'txtfilewriter', 'txtfilewriter', '{"parameter": {}, "reader": {"username": {"name": "用户名","description": "数据库的用户名。","type": "input", "required": true, "index": 2}, "password": {"name": "密码","description": "数据库的密码。","type": "password", "required": true, "index": 3}, "connection": {"name": "数据库连接信息", "description": "数据库连接信息。", "type": "multipleList", "size": 1, "index": 1, "properties": {"jdbcUrl": {"type": "inputList", "name": "数据库连接", "description": "数据库连接url。", "required": true, "index": 1}, "querySql": {"type": "inputList", "name": "查询sql", "description": "输入符合语法的sql查询语句。", "required": true, "index": 2}}}}, "writer": {"header": {"name": "列名","description": "查询结果的列名,最终会体现为csv文件的表头。","type": "selectTag", "required": false}}}', True, 'system', 'system'), + ('5', 'GlusterFS归集模板', '将GlusterFS分布式文件系统上的文件归集到DataMate平台上。', 'glusterfsreader', 'glusterfsreader', 'glusterfswriter', 'glusterfswriter', '{"parameter": {"ip": {"name": "服务器地址","description": "GlusterFS服务器的IP地址或域名。","type": "input", "required": true, "index": 1}, "volume": {"name": "卷名称","description": "GlusterFS卷名称。","type": "input", "required": true, "index": 2}, "path": {"name": "子路径","description": "卷内的子目录路径(可选)。","type": "input", "required": false, "index": 3}, "files": {"name": "文件列表","description": "指定文件列表进行归集。","type": "selectTag", "required": false, "index": 4}}, "reader": {}, "writer": {}}', True, 'system', 'system'); diff --git a/scripts/images/backend-python/Dockerfile b/scripts/images/backend-python/Dockerfile index 052cc53..aa9ffc3 100644 --- a/scripts/images/backend-python/Dockerfile +++ b/scripts/images/backend-python/Dockerfile @@ -40,7 +40,7 @@ RUN if [ -f /etc/apt/sources.list.d/debian.sources ]; then \ sed -i 's/deb.debian.org/mirrors.aliyun.com/g' /etc/apt/sources.list; \ fi && \ apt-get update && \ - apt-get install -y --no-install-recommends vim openjdk-21-jre nfs-common rsync && \ + apt-get install -y --no-install-recommends vim openjdk-21-jre nfs-common glusterfs-client rsync && \ rm -rf /var/lib/apt/lists/* ENV PYTHONDONTWRITEBYTECODE=1 \