glusterfs support

This commit is contained in:
2026-01-09 13:49:18 +08:00
parent fa755faf72
commit 010ffceab5
17 changed files with 820 additions and 2 deletions

View File

@@ -0,0 +1,80 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>datax-all</artifactId>
<groupId>com.alibaba.datax</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>glusterfswriter</artifactId>
<name>glusterfswriter</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-core</artifactId>
<version>${datax-project-version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.properties</include>
</includes>
</resource>
</resources>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<!-- assembly plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,35 @@
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/writer/glusterfswriter</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>glusterfswriter-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/writer/glusterfswriter</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/writer/glusterfswriter/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>

View File

@@ -0,0 +1,118 @@
package com.datamate.plugin.writer.glusterfswriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
/**
* GlusterFS 挂载工具类
* 通过系统命令 mount -t glusterfs 进行挂载
*/
public final class GlusterfsMountUtil {
private static final Logger LOG = LoggerFactory.getLogger(GlusterfsMountUtil.class);
private GlusterfsMountUtil() {
}
/**
* 挂载 GlusterFS 卷
*
* @param remote 远程地址,格式: ip:/volume
* @param mountPoint 本地挂载点
*/
public static void mount(String remote, String mountPoint) {
try {
Path mp = Paths.get(mountPoint);
if (isMounted(mountPoint)) {
throw new IOException("Already mounted: " + mountPoint);
}
Files.createDirectories(mp);
ProcessBuilder pb = new ProcessBuilder();
pb.command("mount", "-t", "glusterfs", remote, mountPoint);
LOG.info("Mounting GlusterFS: {}", pb.command());
pb.redirectErrorStream(true);
Process p = pb.start();
StringBuilder output = new StringBuilder();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
String line;
while ((line = reader.readLine()) != null) {
output.append(line).append(System.lineSeparator());
}
}
int rc = p.waitFor();
if (rc != 0) {
throw new RuntimeException("GlusterFS mount failed, exit=" + rc + ", output: " + output);
}
LOG.info("GlusterFS mounted successfully: {} -> {}", remote, mountPoint);
} catch (IOException | InterruptedException e) {
throw new RuntimeException("Failed to mount GlusterFS: " + remote, e);
}
}
/**
* 卸载挂载点
*
* @param mountPoint 挂载点路径
* @throws IOException 卸载失败
* @throws InterruptedException 进程等待中断
*/
public static void umount(String mountPoint) throws IOException, InterruptedException {
if (!isMounted(mountPoint)) {
return;
}
ProcessBuilder pb = new ProcessBuilder("umount", "-l", mountPoint);
pb.redirectErrorStream(true);
Process p = pb.start();
StringBuilder output = new StringBuilder();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
String line;
while ((line = reader.readLine()) != null) {
output.append(line).append(System.lineSeparator());
}
}
int rc = p.waitFor();
if (rc != 0) {
throw new RuntimeException("GlusterFS umount failed, exit=" + rc + ", output: " + output);
}
// 清理空目录
try {
Files.deleteIfExists(Paths.get(mountPoint));
} catch (DirectoryNotEmptyException ignore) {
// 目录非空,保留
}
LOG.info("GlusterFS unmounted: {}", mountPoint);
}
/**
* 判断挂载点是否已挂载
*
* @param mountPoint 挂载点路径
* @return true 表示已挂载
* @throws IOException 读取 /proc/mounts 失败
*/
public static boolean isMounted(String mountPoint) throws IOException {
Path procMounts = Paths.get("/proc/mounts");
if (!Files.exists(procMounts)) {
throw new IOException("/proc/mounts not found");
}
String expected = mountPoint.trim();
List<String> lines = Files.readAllLines(procMounts);
return lines.stream()
.map(l -> l.split("\\s+"))
.filter(a -> a.length >= 2)
.anyMatch(a -> a[1].equals(expected));
}
}

View File

@@ -0,0 +1,123 @@
package com.datamate.plugin.writer.glusterfswriter;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.CommonErrorCode;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
/**
* GlusterFS Writer 插件
* 通过 mount -t glusterfs 挂载 GlusterFS 卷,将文件写入到目标位置
*/
public class GlusterfsWriter extends Writer {
private static final Logger LOG = LoggerFactory.getLogger(GlusterfsWriter.class);
public static class Job extends Writer.Job {
private Configuration jobConfig;
private String mountPoint;
@Override
public void init() {
this.jobConfig = super.getPluginJobConf();
}
@Override
public void prepare() {
this.mountPoint = "/dataset/mount/" + UUID.randomUUID();
this.jobConfig.set("mountPoint", this.mountPoint);
new File(this.mountPoint).mkdirs();
String ip = this.jobConfig.getString("ip");
String volume = this.jobConfig.getString("volume");
// GlusterFS mount 格式: mount -t glusterfs ip:/volume /mountpoint
String remote = ip + ":/" + volume;
GlusterfsMountUtil.mount(remote, mountPoint);
String destPath = this.jobConfig.getString("destPath");
new File(destPath).mkdirs();
}
@Override
public List<Configuration> split(int mandatoryNumber) {
return Collections.singletonList(this.jobConfig);
}
@Override
public void post() {
try {
GlusterfsMountUtil.umount(this.mountPoint);
new File(this.mountPoint).deleteOnExit();
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public void destroy() {
}
}
public static class Task extends Writer.Task {
private Configuration jobConfig;
private String mountPoint;
private String subPath;
private String destPath;
private List<String> files;
@Override
public void init() {
this.jobConfig = super.getPluginJobConf();
this.destPath = this.jobConfig.getString("destPath");
this.mountPoint = this.jobConfig.getString("mountPoint");
this.subPath = this.jobConfig.getString("path", "");
this.files = this.jobConfig.getList("files", Collections.emptyList(), String.class);
}
@Override
public void startWrite(RecordReceiver lineReceiver) {
String sourcePath = this.mountPoint;
if (StringUtils.isNotBlank(this.subPath)) {
sourcePath = this.mountPoint + "/" + this.subPath.replaceFirst("^/+", "");
}
try {
Record record;
while ((record = lineReceiver.getFromReader()) != null) {
String fileName = record.getColumn(0).asString();
if (StringUtils.isBlank(fileName)) {
continue;
}
if (!files.isEmpty() && !files.contains(fileName)) {
continue;
}
String filePath = sourcePath + "/" + fileName;
ShellUtil.runCommand("rsync", Arrays.asList("--no-links", "--chmod=754", "--", filePath,
this.destPath + "/" + fileName));
}
} catch (Exception e) {
LOG.error("Error writing files from GlusterFS: {}", e.getMessage(), e);
throw DataXException.asDataXException(CommonErrorCode.RUNTIME_ERROR, e);
}
}
@Override
public void destroy() {
}
}
}

View File

@@ -0,0 +1,46 @@
package com.datamate.plugin.writer.glusterfswriter;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
/**
* Shell 命令执行工具类
*/
public class ShellUtil {
/**
* 执行 shell 命令
*
* @param cmd 命令
* @param extraArgs 额外参数,可为空
* @return 命令完整输出(stdout + stderr)
* @throws Exception 如果命令返回非 0 或发生 IO 异常
*/
public static String runCommand(String cmd, List<String> extraArgs) throws Exception {
List<String> commands = new ArrayList<>();
commands.add(cmd);
if (extraArgs != null && !extraArgs.isEmpty()) {
commands.addAll(extraArgs);
}
ProcessBuilder pb = new ProcessBuilder(commands);
pb.redirectErrorStream(true); // 合并 stdout & stderr
Process p = pb.start();
StringBuilder sb = new StringBuilder();
try (BufferedReader br = new BufferedReader(
new InputStreamReader(p.getInputStream()))) {
String line;
while ((line = br.readLine()) != null) {
sb.append(line).append(System.lineSeparator());
}
}
int exit = p.waitFor();
if (exit != 0) {
throw new RuntimeException("Command exited with code " + exit + System.lineSeparator() + sb);
}
return sb.toString();
}
}

View File

@@ -0,0 +1,6 @@
{
"name": "glusterfswriter",
"class": "com.datamate.plugin.writer.glusterfswriter.GlusterfsWriter",
"description": "write files to GlusterFS distributed file system",
"developer": "datamate"
}

View File

@@ -0,0 +1,10 @@
{
"name": "glusterfswriter",
"parameter": {
"ip": "",
"volume": "",
"path": "",
"destPath": "",
"files": []
}
}