You've already forked DataMate
s3-compatible-fs support
This commit is contained in:
@@ -285,6 +285,13 @@
|
|||||||
</includes>
|
</includes>
|
||||||
<outputDirectory>datax</outputDirectory>
|
<outputDirectory>datax</outputDirectory>
|
||||||
</fileSet>
|
</fileSet>
|
||||||
|
<fileSet>
|
||||||
|
<directory>s3reader/target/datax/</directory>
|
||||||
|
<includes>
|
||||||
|
<include>**/*.*</include>
|
||||||
|
</includes>
|
||||||
|
<outputDirectory>datax</outputDirectory>
|
||||||
|
</fileSet>
|
||||||
|
|
||||||
<!-- writer -->
|
<!-- writer -->
|
||||||
<fileSet>
|
<fileSet>
|
||||||
@@ -609,6 +616,13 @@
|
|||||||
</includes>
|
</includes>
|
||||||
<outputDirectory>datax</outputDirectory>
|
<outputDirectory>datax</outputDirectory>
|
||||||
</fileSet>
|
</fileSet>
|
||||||
|
<fileSet>
|
||||||
|
<directory>s3writer/target/datax/</directory>
|
||||||
|
<includes>
|
||||||
|
<include>**/*.*</include>
|
||||||
|
</includes>
|
||||||
|
<outputDirectory>datax</outputDirectory>
|
||||||
|
</fileSet>
|
||||||
<fileSet>
|
<fileSet>
|
||||||
<directory>obsreader/target/datax/</directory>
|
<directory>obsreader/target/datax/</directory>
|
||||||
<includes>
|
<includes>
|
||||||
|
|||||||
@@ -87,6 +87,7 @@
|
|||||||
<module>nfsreader</module>
|
<module>nfsreader</module>
|
||||||
<module>glusterfsreader</module>
|
<module>glusterfsreader</module>
|
||||||
<module>localreader</module>
|
<module>localreader</module>
|
||||||
|
<module>s3reader</module>
|
||||||
<!-- writer -->
|
<!-- writer -->
|
||||||
<module>mysqlwriter</module>
|
<module>mysqlwriter</module>
|
||||||
<module>starrockswriter</module>
|
<module>starrockswriter</module>
|
||||||
@@ -137,6 +138,7 @@
|
|||||||
<module>nfswriter</module>
|
<module>nfswriter</module>
|
||||||
<module>glusterfswriter</module>
|
<module>glusterfswriter</module>
|
||||||
<module>localwriter</module>
|
<module>localwriter</module>
|
||||||
|
<module>s3writer</module>
|
||||||
<!-- common support module -->
|
<!-- common support module -->
|
||||||
<module>plugin-rdbms-util</module>
|
<module>plugin-rdbms-util</module>
|
||||||
<module>plugin-unstructured-storage-util</module>
|
<module>plugin-unstructured-storage-util</module>
|
||||||
|
|||||||
79
runtime/datax/s3reader/pom.xml
Normal file
79
runtime/datax/s3reader/pom.xml
Normal file
@@ -0,0 +1,79 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the s3reader DataX plugin module. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Inherits shared build settings from the datax-all aggregator. -->
    <parent>
        <groupId>com.alibaba.datax</groupId>
        <artifactId>datax-all</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>

    <artifactId>s3reader</artifactId>
    <name>s3reader</name>
    <packaging>jar</packaging>

    <dependencies>
        <!-- DataX framework modules, pinned to the project-wide version property. -->
        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>datax-core</artifactId>
            <version>${datax-project-version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>datax-common</artifactId>
            <version>${datax-project-version}</version>
        </dependency>

        <!-- No version is declared for the dependencies below; presumably they are
             managed by the parent POM's dependencyManagement (verify there). -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
        </dependency>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>s3</artifactId>
        </dependency>
    </dependencies>

    <build>
        <!-- Only *.properties files that live next to the Java sources are packaged as resources. -->
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.properties</include>
                </includes>
            </resource>
        </resources>
        <plugins>
            <!-- Compiler level and source encoding come from inherited properties. -->
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>${jdk-version}</source>
                    <target>${jdk-version}</target>
                    <encoding>${project-sourceEncoding}</encoding>
                </configuration>
            </plugin>
            <!-- Builds the plugin directory layout described by package.xml during the package phase. -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptors>
                        <descriptor>src/main/assembly/package.xml</descriptor>
                    </descriptors>
                    <finalName>datax</finalName>
                </configuration>
                <executions>
                    <execution>
                        <id>dwzip</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
|
||||||
35
runtime/datax/s3reader/src/main/assembly/package.xml
Normal file
35
runtime/datax/s3reader/src/main/assembly/package.xml
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
<!-- Assembly descriptor that lays out the s3reader plugin as a directory tree
     under plugin/reader/s3reader. -->
<assembly
        xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
    <id></id>
    <formats>
        <format>dir</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <fileSets>
        <!-- Plugin descriptor and job template. -->
        <fileSet>
            <directory>src/main/resources</directory>
            <includes>
                <include>plugin.json</include>
                <include>plugin_job_template.json</include>
            </includes>
            <outputDirectory>plugin/reader/s3reader</outputDirectory>
        </fileSet>
        <!-- The module's own jar. The name is derived from the project coordinates
             rather than hard-coded, so a version bump cannot silently drop the jar
             from the assembled plugin (a non-matching include is not an error). -->
        <fileSet>
            <directory>target/</directory>
            <includes>
                <include>${project.artifactId}-${project.version}.jar</include>
            </includes>
            <outputDirectory>plugin/reader/s3reader</outputDirectory>
        </fileSet>
    </fileSets>

    <!-- Runtime dependencies go to libs/; the project artifact itself is excluded
         here because the fileSet above already copies it. -->
    <dependencySets>
        <dependencySet>
            <useProjectArtifact>false</useProjectArtifact>
            <outputDirectory>plugin/reader/s3reader/libs</outputDirectory>
            <scope>runtime</scope>
        </dependencySet>
    </dependencySets>
</assembly>
|
||||||
@@ -0,0 +1,222 @@
|
|||||||
|
package com.datamate.plugin.reader.s3reader;
|
||||||
|
|
||||||
|
import java.net.URI;
|
||||||
|
import java.nio.file.Paths;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import com.alibaba.datax.common.element.Record;
|
||||||
|
import com.alibaba.datax.common.element.StringColumn;
|
||||||
|
import com.alibaba.datax.common.exception.CommonErrorCode;
|
||||||
|
import com.alibaba.datax.common.exception.DataXException;
|
||||||
|
import com.alibaba.datax.common.plugin.RecordSender;
|
||||||
|
import com.alibaba.datax.common.spi.Reader;
|
||||||
|
import com.alibaba.datax.common.util.Configuration;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
|
||||||
|
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
|
||||||
|
import software.amazon.awssdk.regions.Region;
|
||||||
|
import software.amazon.awssdk.services.s3.S3Client;
|
||||||
|
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
|
||||||
|
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
|
||||||
|
import software.amazon.awssdk.services.s3.model.S3Object;
|
||||||
|
import software.amazon.awssdk.services.s3.S3Configuration;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* S3兼容对象存储读取器
|
||||||
|
* 支持自定义 S3 兼容对象存储(如 MinIO、Ceph 等)
|
||||||
|
*/
|
||||||
|
public class S3Reader extends Reader {
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(S3Reader.class);
|
||||||
|
|
||||||
|
public static class Job extends Reader.Job {
|
||||||
|
private Configuration jobConfig = null;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init() {
|
||||||
|
this.jobConfig = super.getPluginJobConf();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void prepare() {
|
||||||
|
String endpoint = this.jobConfig.getString("endpoint");
|
||||||
|
String bucket = this.jobConfig.getString("bucket");
|
||||||
|
String accessKey = this.jobConfig.getString("accessKey");
|
||||||
|
String secretKey = this.jobConfig.getString("secretKey");
|
||||||
|
|
||||||
|
if (StringUtils.isBlank(endpoint)) {
|
||||||
|
throw new RuntimeException("endpoint is required for s3reader");
|
||||||
|
}
|
||||||
|
if (StringUtils.isBlank(bucket)) {
|
||||||
|
throw new RuntimeException("bucket is required for s3reader");
|
||||||
|
}
|
||||||
|
if (StringUtils.isBlank(accessKey)) {
|
||||||
|
throw new RuntimeException("accessKey is required for s3reader");
|
||||||
|
}
|
||||||
|
if (StringUtils.isBlank(secretKey)) {
|
||||||
|
throw new RuntimeException("secretKey is required for s3reader");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<Configuration> split(int adviceNumber) {
|
||||||
|
return Collections.singletonList(this.jobConfig);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void post() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void destroy() {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Task extends Reader.Task {
|
||||||
|
|
||||||
|
private Configuration jobConfig;
|
||||||
|
private Set<String> fileType;
|
||||||
|
private String endpoint;
|
||||||
|
private String accessKey;
|
||||||
|
private String secretKey;
|
||||||
|
private String bucket;
|
||||||
|
private String prefix;
|
||||||
|
private String region;
|
||||||
|
private S3Client s3;
|
||||||
|
private String effectivePrefix;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init() {
|
||||||
|
this.jobConfig = super.getPluginJobConf();
|
||||||
|
this.fileType = new HashSet<>(this.jobConfig.getList("fileType", Collections.emptyList(), String.class));
|
||||||
|
this.endpoint = this.jobConfig.getString("endpoint");
|
||||||
|
this.accessKey = this.jobConfig.getString("accessKey");
|
||||||
|
this.secretKey = this.jobConfig.getString("secretKey");
|
||||||
|
this.bucket = this.jobConfig.getString("bucket");
|
||||||
|
this.prefix = this.jobConfig.getString("prefix");
|
||||||
|
this.region = this.jobConfig.getString("region", "us-east-1");
|
||||||
|
this.s3 = getS3Client();
|
||||||
|
this.effectivePrefix = getEffectivePrefix();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void startRead(RecordSender recordSender) {
|
||||||
|
try {
|
||||||
|
List<String> files = listFiles().stream()
|
||||||
|
.filter(file -> fileType.isEmpty() || fileType.contains(getFileSuffix(file)))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
files.forEach(filePath -> {
|
||||||
|
Record record = recordSender.createRecord();
|
||||||
|
record.addColumn(new StringColumn(filePath));
|
||||||
|
recordSender.sendToWriter(record);
|
||||||
|
});
|
||||||
|
this.jobConfig.set("columnNumber", 1);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Error reading files from S3 compatible storage: {}", this.endpoint, e);
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 列举 S3 对象
|
||||||
|
* 非递归:只列举 prefix 当前目录下的对象(通过 delimiter="/" 实现)
|
||||||
|
*/
|
||||||
|
private List<String> listFiles() {
|
||||||
|
if (StringUtils.isBlank(endpoint) || StringUtils.isBlank(bucket)) {
|
||||||
|
throw new IllegalArgumentException("endpoint and bucket must be provided");
|
||||||
|
}
|
||||||
|
List<String> keys = new ArrayList<>();
|
||||||
|
String continuationToken = null;
|
||||||
|
try {
|
||||||
|
do {
|
||||||
|
ListObjectsV2Request.Builder reqBuilder = ListObjectsV2Request.builder()
|
||||||
|
.bucket(bucket)
|
||||||
|
.prefix(effectivePrefix)
|
||||||
|
.delimiter("/");
|
||||||
|
if (continuationToken != null) {
|
||||||
|
reqBuilder.continuationToken(continuationToken);
|
||||||
|
}
|
||||||
|
ListObjectsV2Response res = s3.listObjectsV2(reqBuilder.build());
|
||||||
|
for (S3Object obj : res.contents()) {
|
||||||
|
String key = obj.key();
|
||||||
|
if (isInValid(key)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
keys.add(key);
|
||||||
|
}
|
||||||
|
continuationToken = res.isTruncated() ? res.nextContinuationToken() : null;
|
||||||
|
} while (continuationToken != null);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.warn("Failed to list S3 objects: {}", e.getMessage(), e);
|
||||||
|
}
|
||||||
|
return keys;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isInValid(String key) {
|
||||||
|
if (!effectivePrefix.isEmpty() && !key.startsWith(effectivePrefix)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (key.equals(effectivePrefix) || key.endsWith("/")) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getEffectivePrefix() {
|
||||||
|
String effectivePrefix = "";
|
||||||
|
if (prefix != null) {
|
||||||
|
effectivePrefix = prefix.startsWith("/") ? prefix.substring(1) : prefix;
|
||||||
|
if (!effectivePrefix.isEmpty() && !effectivePrefix.endsWith("/")) {
|
||||||
|
effectivePrefix = effectivePrefix + "/";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return effectivePrefix;
|
||||||
|
}
|
||||||
|
|
||||||
|
private S3Client getS3Client() {
|
||||||
|
try {
|
||||||
|
AwsBasicCredentials creds = AwsBasicCredentials.create(accessKey, secretKey);
|
||||||
|
S3Configuration serviceConfig = S3Configuration.builder()
|
||||||
|
.pathStyleAccessEnabled(true)
|
||||||
|
.build();
|
||||||
|
return S3Client.builder()
|
||||||
|
.endpointOverride(new URI(endpoint))
|
||||||
|
.region(Region.of(region))
|
||||||
|
.serviceConfiguration(serviceConfig)
|
||||||
|
.credentialsProvider(StaticCredentialsProvider.create(creds))
|
||||||
|
.build();
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Error init S3 client: {}", this.endpoint, e);
|
||||||
|
throw DataXException.asDataXException(CommonErrorCode.RUNTIME_ERROR, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getFileSuffix(String key) {
|
||||||
|
String fileName = Paths.get(key).getFileName().toString();
|
||||||
|
int lastDotIndex = fileName.lastIndexOf('.');
|
||||||
|
if (lastDotIndex == -1 || lastDotIndex == fileName.length() - 1) {
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
return fileName.substring(lastDotIndex + 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void destroy() {
|
||||||
|
if (s3 != null) {
|
||||||
|
try {
|
||||||
|
s3.close();
|
||||||
|
} catch (Exception ignore) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
6
runtime/datax/s3reader/src/main/resources/plugin.json
Normal file
6
runtime/datax/s3reader/src/main/resources/plugin.json
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
{
|
||||||
|
"name": "s3reader",
|
||||||
|
"class": "com.datamate.plugin.reader.s3reader.S3Reader",
|
||||||
|
"description": "read from S3 compatible object storage",
|
||||||
|
"developer": "datamate"
|
||||||
|
}
|
||||||
@@ -0,0 +1,11 @@
|
|||||||
|
{
|
||||||
|
"name": "s3reader",
|
||||||
|
"parameter": {
|
||||||
|
"endpoint": "http://127.0.0.1:9000",
|
||||||
|
"bucket": "test-bucket",
|
||||||
|
"accessKey": "ak-xxx",
|
||||||
|
"secretKey": "sk-xxx",
|
||||||
|
"prefix": "/test",
|
||||||
|
"region": "us-east-1"
|
||||||
|
}
|
||||||
|
}
|
||||||
79
runtime/datax/s3writer/pom.xml
Normal file
79
runtime/datax/s3writer/pom.xml
Normal file
@@ -0,0 +1,79 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the s3writer DataX plugin module. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Inherits shared build settings from the datax-all aggregator. -->
    <parent>
        <groupId>com.alibaba.datax</groupId>
        <artifactId>datax-all</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>

    <artifactId>s3writer</artifactId>
    <name>s3writer</name>
    <packaging>jar</packaging>

    <dependencies>
        <!-- DataX framework modules, pinned to the project-wide version property. -->
        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>datax-core</artifactId>
            <version>${datax-project-version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>datax-common</artifactId>
            <version>${datax-project-version}</version>
        </dependency>

        <!-- No version is declared for the dependencies below; presumably they are
             managed by the parent POM's dependencyManagement (verify there). -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
        </dependency>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>s3</artifactId>
        </dependency>
    </dependencies>

    <build>
        <!-- Only *.properties files that live next to the Java sources are packaged as resources. -->
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.properties</include>
                </includes>
            </resource>
        </resources>
        <plugins>
            <!-- Compiler level and source encoding come from inherited properties. -->
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>${jdk-version}</source>
                    <target>${jdk-version}</target>
                    <encoding>${project-sourceEncoding}</encoding>
                </configuration>
            </plugin>
            <!-- Builds the plugin directory layout described by package.xml during the package phase. -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptors>
                        <descriptor>src/main/assembly/package.xml</descriptor>
                    </descriptors>
                    <finalName>datax</finalName>
                </configuration>
                <executions>
                    <execution>
                        <id>dwzip</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
|
||||||
35
runtime/datax/s3writer/src/main/assembly/package.xml
Normal file
35
runtime/datax/s3writer/src/main/assembly/package.xml
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
<!-- Assembly descriptor that lays out the s3writer plugin as a directory tree
     under plugin/writer/s3writer. -->
<assembly
        xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
    <id></id>
    <formats>
        <format>dir</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <fileSets>
        <!-- Plugin descriptor and job template. -->
        <fileSet>
            <directory>src/main/resources</directory>
            <includes>
                <include>plugin.json</include>
                <include>plugin_job_template.json</include>
            </includes>
            <outputDirectory>plugin/writer/s3writer</outputDirectory>
        </fileSet>
        <!-- The module's own jar. The name is derived from the project coordinates
             rather than hard-coded, so a version bump cannot silently drop the jar
             from the assembled plugin (a non-matching include is not an error). -->
        <fileSet>
            <directory>target/</directory>
            <includes>
                <include>${project.artifactId}-${project.version}.jar</include>
            </includes>
            <outputDirectory>plugin/writer/s3writer</outputDirectory>
        </fileSet>
    </fileSets>

    <!-- Runtime dependencies go to libs/; the project artifact itself is excluded
         here because the fileSet above already copies it. -->
    <dependencySets>
        <dependencySet>
            <useProjectArtifact>false</useProjectArtifact>
            <outputDirectory>plugin/writer/s3writer/libs</outputDirectory>
            <scope>runtime</scope>
        </dependencySet>
    </dependencySets>
</assembly>
|
||||||
@@ -0,0 +1,181 @@
|
|||||||
|
package com.datamate.plugin.writer.s3writer;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.nio.file.Paths;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import com.alibaba.datax.common.element.Record;
|
||||||
|
import com.alibaba.datax.common.exception.CommonErrorCode;
|
||||||
|
import com.alibaba.datax.common.exception.DataXException;
|
||||||
|
import com.alibaba.datax.common.plugin.RecordReceiver;
|
||||||
|
import com.alibaba.datax.common.spi.Writer;
|
||||||
|
import com.alibaba.datax.common.util.Configuration;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
|
||||||
|
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
|
||||||
|
import software.amazon.awssdk.core.sync.ResponseTransformer;
|
||||||
|
import software.amazon.awssdk.regions.Region;
|
||||||
|
import software.amazon.awssdk.services.s3.S3Client;
|
||||||
|
import software.amazon.awssdk.services.s3.S3Configuration;
|
||||||
|
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* S3兼容对象存储写入器
|
||||||
|
* 从S3兼容存储下载文件到本地目标目录
|
||||||
|
*/
|
||||||
|
public class S3Writer extends Writer {
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(S3Writer.class);
|
||||||
|
|
||||||
|
public static class Job extends Writer.Job {
|
||||||
|
private Configuration jobConfig = null;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init() {
|
||||||
|
this.jobConfig = super.getPluginJobConf();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void prepare() {
|
||||||
|
String destPath = this.jobConfig.getString("destPath");
|
||||||
|
if (StringUtils.isBlank(destPath)) {
|
||||||
|
throw new RuntimeException("destPath is required for s3writer");
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
Files.createDirectories(Paths.get(destPath));
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException("Failed to create destination directory: " + destPath, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<Configuration> split(int adviceNumber) {
|
||||||
|
return Collections.singletonList(this.jobConfig);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void post() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void destroy() {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Task extends Writer.Task {
|
||||||
|
|
||||||
|
private Configuration jobConfig;
|
||||||
|
private Set<String> fileType;
|
||||||
|
private String endpoint;
|
||||||
|
private String accessKey;
|
||||||
|
private String secretKey;
|
||||||
|
private String bucket;
|
||||||
|
private String destPath;
|
||||||
|
private String region;
|
||||||
|
private S3Client s3;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init() {
|
||||||
|
this.jobConfig = super.getPluginJobConf();
|
||||||
|
this.fileType = new HashSet<>(this.jobConfig.getList("fileType", Collections.emptyList(), String.class));
|
||||||
|
this.endpoint = this.jobConfig.getString("endpoint");
|
||||||
|
this.accessKey = this.jobConfig.getString("accessKey");
|
||||||
|
this.secretKey = this.jobConfig.getString("secretKey");
|
||||||
|
this.bucket = this.jobConfig.getString("bucket");
|
||||||
|
this.destPath = this.jobConfig.getString("destPath");
|
||||||
|
this.region = this.jobConfig.getString("region", "us-east-1");
|
||||||
|
this.s3 = getS3Client();
|
||||||
|
}
|
||||||
|
|
||||||
|
private S3Client getS3Client() {
|
||||||
|
try {
|
||||||
|
AwsBasicCredentials creds = AwsBasicCredentials.create(accessKey, secretKey);
|
||||||
|
S3Configuration serviceConfig = S3Configuration.builder()
|
||||||
|
.pathStyleAccessEnabled(true)
|
||||||
|
.build();
|
||||||
|
return S3Client.builder()
|
||||||
|
.endpointOverride(new URI(endpoint))
|
||||||
|
.region(Region.of(region))
|
||||||
|
.serviceConfiguration(serviceConfig)
|
||||||
|
.credentialsProvider(StaticCredentialsProvider.create(creds))
|
||||||
|
.build();
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Error init S3 client: {}", this.endpoint, e);
|
||||||
|
throw DataXException.asDataXException(CommonErrorCode.RUNTIME_ERROR, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void startWrite(RecordReceiver lineReceiver) {
|
||||||
|
try {
|
||||||
|
Record record;
|
||||||
|
while ((record = lineReceiver.getFromReader()) != null) {
|
||||||
|
String key = record.getColumn(0).asString();
|
||||||
|
if (StringUtils.isBlank(key)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
copyFileFromS3(key);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Error writing files from S3 compatible storage: {}", this.endpoint, e);
|
||||||
|
throw DataXException.asDataXException(CommonErrorCode.RUNTIME_ERROR, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void copyFileFromS3(String key) throws IOException {
|
||||||
|
if (StringUtils.isBlank(endpoint) || StringUtils.isBlank(bucket)) {
|
||||||
|
throw new IllegalArgumentException("endpoint and bucket must be provided");
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
Path targetDir = Paths.get(destPath);
|
||||||
|
try {
|
||||||
|
Files.createDirectories(targetDir);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Create dest dir {} failed: {}", targetDir, e.getMessage(), e);
|
||||||
|
}
|
||||||
|
|
||||||
|
String fileName = Paths.get(key).getFileName().toString();
|
||||||
|
if (StringUtils.isBlank(fileName)) {
|
||||||
|
LOG.warn("Skip object with empty file name for key {}", key);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Path target = targetDir.resolve(fileName);
|
||||||
|
try {
|
||||||
|
if (Files.exists(target)) {
|
||||||
|
Files.delete(target);
|
||||||
|
}
|
||||||
|
GetObjectRequest getReq = GetObjectRequest.builder()
|
||||||
|
.bucket(bucket)
|
||||||
|
.key(key)
|
||||||
|
.build();
|
||||||
|
s3.getObject(getReq, ResponseTransformer.toFile(target));
|
||||||
|
LOG.info("Downloaded S3 object {} to {}", key, target.toString());
|
||||||
|
} catch (Exception ex) {
|
||||||
|
LOG.warn("Failed to download object {}: {}", key, ex.getMessage(), ex);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.warn("Failed to download object {}: {}", key, e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void destroy() {
|
||||||
|
if (s3 != null) {
|
||||||
|
try {
|
||||||
|
s3.close();
|
||||||
|
} catch (Exception ignore) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
6
runtime/datax/s3writer/src/main/resources/plugin.json
Normal file
6
runtime/datax/s3writer/src/main/resources/plugin.json
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
{
|
||||||
|
"name": "s3writer",
|
||||||
|
"class": "com.datamate.plugin.writer.s3writer.S3Writer",
|
||||||
|
"description": "write S3 compatible object storage files to local",
|
||||||
|
"developer": "datamate"
|
||||||
|
}
|
||||||
@@ -0,0 +1,12 @@
|
|||||||
|
{
|
||||||
|
"name": "s3writer",
|
||||||
|
"parameter": {
|
||||||
|
"endpoint": "http://127.0.0.1:9000",
|
||||||
|
"bucket": "test-bucket",
|
||||||
|
"accessKey": "ak-xxx",
|
||||||
|
"secretKey": "sk-xxx",
|
||||||
|
"prefix": "/test",
|
||||||
|
"region": "us-east-1",
|
||||||
|
"destPath": "/data/dest"
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -78,4 +78,5 @@ VALUES ('1', 'NAS归集模板', '将NAS存储上的文件归集到DataMate平台
|
|||||||
('3', 'MYSQL归集模板', '将MYSQL数据库中的数据以csv文件的形式归集到DataMate平台上。', 'mysqlreader', 'mysqlreader', 'txtfilewriter', 'txtfilewriter', '{"parameter": {}, "reader": {"username": {"name": "用户名","description": "数据库的用户名。","type": "input", "required": true, "index": 2}, "password": {"name": "密码","description": "数据库的密码。","type": "password", "required": true, "index": 3}, "connection": {"name": "数据库连接信息", "description": "数据库连接信息。", "type": "multipleList", "size": 1, "index": 1, "properties": {"jdbcUrl": {"type": "inputList", "name": "数据库连接", "description": "数据库连接url。", "required": true, "index": 1}, "querySql": {"type": "inputList", "name": "查询sql", "description": "输入符合语法的sql查询语句。", "required": true, "index": 2}}}}, "writer": {"header": {"name": "列名","description": "查询结果的列名,最终会体现为csv文件的表头。","type": "selectTag", "required": false}}}', True, 'system', 'system'),
|
('3', 'MYSQL归集模板', '将MYSQL数据库中的数据以csv文件的形式归集到DataMate平台上。', 'mysqlreader', 'mysqlreader', 'txtfilewriter', 'txtfilewriter', '{"parameter": {}, "reader": {"username": {"name": "用户名","description": "数据库的用户名。","type": "input", "required": true, "index": 2}, "password": {"name": "密码","description": "数据库的密码。","type": "password", "required": true, "index": 3}, "connection": {"name": "数据库连接信息", "description": "数据库连接信息。", "type": "multipleList", "size": 1, "index": 1, "properties": {"jdbcUrl": {"type": "inputList", "name": "数据库连接", "description": "数据库连接url。", "required": true, "index": 1}, "querySql": {"type": "inputList", "name": "查询sql", "description": "输入符合语法的sql查询语句。", "required": true, "index": 2}}}}, "writer": {"header": {"name": "列名","description": "查询结果的列名,最终会体现为csv文件的表头。","type": "selectTag", "required": false}}}', True, 'system', 'system'),
|
||||||
('4', 'StarRocks归集模板', '将StarRocks中的数据以csv文件的形式归集到DataMate平台上。', 'starrocksreader', 'starrocksreader', 'txtfilewriter', 'txtfilewriter', '{"parameter": {}, "reader": {"username": {"name": "用户名","description": "数据库的用户名。","type": "input", "required": true, "index": 2}, "password": {"name": "密码","description": "数据库的密码。","type": "password", "required": true, "index": 3}, "connection": {"name": "数据库连接信息", "description": "数据库连接信息。", "type": "multipleList", "size": 1, "index": 1, "properties": {"jdbcUrl": {"type": "inputList", "name": "数据库连接", "description": "数据库连接url。", "required": true, "index": 1}, "querySql": {"type": "inputList", "name": "查询sql", "description": "输入符合语法的sql查询语句。", "required": true, "index": 2}}}}, "writer": {"header": {"name": "列名","description": "查询结果的列名,最终会体现为csv文件的表头。","type": "selectTag", "required": false}}}', True, 'system', 'system'),
|
('4', 'StarRocks归集模板', '将StarRocks中的数据以csv文件的形式归集到DataMate平台上。', 'starrocksreader', 'starrocksreader', 'txtfilewriter', 'txtfilewriter', '{"parameter": {}, "reader": {"username": {"name": "用户名","description": "数据库的用户名。","type": "input", "required": true, "index": 2}, "password": {"name": "密码","description": "数据库的密码。","type": "password", "required": true, "index": 3}, "connection": {"name": "数据库连接信息", "description": "数据库连接信息。", "type": "multipleList", "size": 1, "index": 1, "properties": {"jdbcUrl": {"type": "inputList", "name": "数据库连接", "description": "数据库连接url。", "required": true, "index": 1}, "querySql": {"type": "inputList", "name": "查询sql", "description": "输入符合语法的sql查询语句。", "required": true, "index": 2}}}}, "writer": {"header": {"name": "列名","description": "查询结果的列名,最终会体现为csv文件的表头。","type": "selectTag", "required": false}}}', True, 'system', 'system'),
|
||||||
('5', 'GlusterFS归集模板', '将GlusterFS分布式文件系统上的文件归集到DataMate平台上。', 'glusterfsreader', 'glusterfsreader', 'glusterfswriter', 'glusterfswriter', '{"parameter": {"ip": {"name": "服务器地址","description": "GlusterFS服务器的IP地址或域名。","type": "input", "required": true, "index": 1}, "volume": {"name": "卷名称","description": "GlusterFS卷名称。","type": "input", "required": true, "index": 2}, "path": {"name": "子路径","description": "卷内的子目录路径(可选)。","type": "input", "required": false, "index": 3}, "files": {"name": "文件列表","description": "指定文件列表进行归集。","type": "selectTag", "required": false, "index": 4}}, "reader": {}, "writer": {}}', True, 'system', 'system'),
|
('5', 'GlusterFS归集模板', '将GlusterFS分布式文件系统上的文件归集到DataMate平台上。', 'glusterfsreader', 'glusterfsreader', 'glusterfswriter', 'glusterfswriter', '{"parameter": {"ip": {"name": "服务器地址","description": "GlusterFS服务器的IP地址或域名。","type": "input", "required": true, "index": 1}, "volume": {"name": "卷名称","description": "GlusterFS卷名称。","type": "input", "required": true, "index": 2}, "path": {"name": "子路径","description": "卷内的子目录路径(可选)。","type": "input", "required": false, "index": 3}, "files": {"name": "文件列表","description": "指定文件列表进行归集。","type": "selectTag", "required": false, "index": 4}}, "reader": {}, "writer": {}}', True, 'system', 'system'),
|
||||||
('6', '本地文件夹归集模板', '将本地文件系统上的文件归集到DataMate平台上。', 'localreader', 'localreader', 'localwriter', 'localwriter', '{"parameter": {"path": {"name": "源目录路径","description": "本地文件系统的源目录绝对路径。","type": "input", "required": true, "index": 1}, "files": {"name": "文件列表","description": "指定文件列表进行归集。","type": "selectTag", "required": false, "index": 2}}, "reader": {}, "writer": {}}', True, 'system', 'system');
|
('6', '本地文件夹归集模板', '将本地文件系统上的文件归集到DataMate平台上。', 'localreader', 'localreader', 'localwriter', 'localwriter', '{"parameter": {"path": {"name": "源目录路径","description": "本地文件系统的源目录绝对路径。","type": "input", "required": true, "index": 1}, "files": {"name": "文件列表","description": "指定文件列表进行归集。","type": "selectTag", "required": false, "index": 2}}, "reader": {}, "writer": {}}', True, 'system', 'system'),
|
||||||
|
('7', 'S3兼容存储归集模板', '将S3兼容对象存储(如MinIO、Ceph等)上的文件归集到DataMate平台上。', 's3reader', 's3reader', 's3writer', 's3writer', '{"parameter": {"endpoint": {"name": "服务地址","description": "S3兼容存储的服务地址(如http://minio.example.com:9000)。","type": "input", "required": true, "index": 1}, "bucket": {"name": "存储桶名称","description": "S3存储桶名称。","type": "input", "required": true, "index": 2}, "accessKey": {"name": "Access Key","description": "S3访问密钥(Access Key ID)。","type": "input", "required": true, "index": 3}, "secretKey": {"name": "Secret Key","description": "S3密钥(Secret Access Key)。","type": "password", "required": true, "index": 4}, "prefix": {"name": "匹配前缀","description": "按照匹配前缀选中S3中的文件进行归集。","type": "input", "required": false, "index": 5}, "region": {"name": "区域","description": "S3区域(默认us-east-1)。","type": "input", "required": false, "index": 6}}, "reader": {}, "writer": {}}', True, 'system', 'system');
|
||||||
|
|||||||
Reference in New Issue
Block a user