54 changes: 54 additions & 0 deletions docs/en/connectors/source/Jdbc.md
@@ -231,6 +231,60 @@ If a dialect is not supported by SeaTunnel, it will use the default dialect `Gene
| IRIS | Inceptor | Highgo |


### Postgres Copy Method Read

The `Postgres` database supports reading data using the `COPY` protocol, which is generally much faster than using `SELECT` queries.

To enable this feature, configure the following options:

| Name | Type | Required | Default Value | Description |
| --- | --- | --- | --- | --- |
| use_copy_statement | Boolean | No | false | Whether to use the COPY method for reading. |
| binary | Boolean | No | false | Whether to use the binary format for COPY reading. Only takes effect when use_copy_statement=true. |
| pg_copy_buffer_size | Int | No | 1048576 | Buffer size, in bytes, for COPY reading. Only takes effect when use_copy_statement=true. |

> Note: Currently, this feature is only available for the Postgres database.

#### Compatibility and SQL Construction

This feature is fully compatible with the existing sharding mechanism of the JDBC Source. When `use_copy_statement=true` is enabled, SeaTunnel automatically constructs efficient COPY SQL statements based on your configured `query`, `partition_column`, and `where_condition`.

**SQL Generation Logic Example:**

1. **Sharding and Filtering**: If `partition_column` or `where_condition` is configured, the COPY statement includes a subquery to support data splitting (see the filled-in example after this list):
```sql
COPY (
SELECT * FROM (
SELECT * FROM "schema"."table_name"
WHERE "partition_column" >= ? AND "partition_column" < ?
) tmp <where_condition>
) TO STDOUT WITH [BINARY|CSV];
```

2. **Custom Query**: If only `query` is configured, the COPY statement is built directly from the query:
```sql
COPY (
SELECT * FROM (<query>) tmp
) TO STDOUT WITH [BINARY|CSV];
```
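
A filled-in example of the first form, using a hypothetical table, bounds, and filter (and assuming that the configured `where_condition` carries its own `where` keyword):

```sql
COPY (
    SELECT * FROM (
        SELECT * FROM "public"."orders"
        WHERE "id" >= 0 AND "id" < 100000
    ) tmp where order_status = 'paid'
) TO STDOUT WITH BINARY;
```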

Example configuration:

```hocon
Jdbc {
url = "jdbc:postgresql://localhost:5432/test"
driver = "org.postgresql.Driver"
user = "postgres"
password = "password"
# Enable COPY method
use_copy_statement = true
# Enable binary COPY method
binary = true
# Buffer size
pg_copy_buffer_size = 1048576
}
```
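
For reference, below is a minimal sketch of what COPY-protocol reading looks like at the driver level, using the PostgreSQL JDBC driver's `CopyManager`. This is not SeaTunnel's internal code; the connection details, table name, and query are illustrative assumptions. Streaming the COPY output in bulk this way avoids the per-row overhead of a `ResultSet`, which is where the speedup over plain `SELECT` reads comes from.

```java
// A sketch of driver-level COPY reading with the PostgreSQL JDBC driver.
// NOT SeaTunnel code; the URL, credentials, and query are illustrative.
import java.io.ByteArrayOutputStream;
import java.sql.Connection;
import java.sql.DriverManager;

import org.postgresql.PGConnection;
import org.postgresql.copy.CopyManager;

public class CopyReadSketch {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/test", "postgres", "password")) {
            // CopyManager speaks the COPY protocol directly, bypassing the
            // row-by-row ResultSet path a plain SELECT would go through.
            CopyManager copyManager = conn.unwrap(PGConnection.class).getCopyAPI();
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            long rows = copyManager.copyOut(
                    "COPY (SELECT * FROM \"public\".\"orders\") TO STDOUT WITH CSV", out);
            System.out.println("Rows copied: " + rows + ", bytes: " + out.size());
        }
    }
}
```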

## Parallel Reader

The JDBC Source connector supports reading data from tables in parallel. SeaTunnel splits the table data according to a set of rules and hands the resulting splits to readers; the number of readers is determined by the `parallelism` option.
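
A minimal sketch of a parallel-read configuration, assuming a numeric `id` column; the table name and bound values are illustrative:

```hocon
Jdbc {
    url = "jdbc:postgresql://localhost:5432/test"
    driver = "org.postgresql.Driver"
    user = "postgres"
    password = "password"
    query = "select * from orders"
    # Column used to split the table into ranges, one range per split
    partition_column = "id"
    # Number of splits to generate
    partition_num = 10
    # Optional explicit bounds; discovered from the table when omitted
    partition_lower_bound = 0
    partition_upper_bound = 100000
}
```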
54 changes: 54 additions & 0 deletions docs/zh/connectors/source/Jdbc.md
@@ -224,6 +224,60 @@ int_type_narrowing = false
| Vertica | OceanBase | XUGU |
| IRIS | Inceptor | Highgo |

### Postgres Copy Method Read

The `Postgres` database supports reading data using the `COPY` protocol, which is generally much faster than using `SELECT` queries.

To enable this feature, configure the following options:

| Name | Type | Required | Default Value | Description |
| --- | --- | --- | --- | --- |
| use_copy_statement | Boolean | No | false | Whether to use the COPY method for reading. |
| binary | Boolean | No | false | Whether to use the binary format for COPY reading. Only takes effect when use_copy_statement=true. |
| pg_copy_buffer_size | Int | No | 1048576 | Buffer size, in bytes, for COPY reading. Only takes effect when use_copy_statement=true. |

> Note: Currently, this feature is only available for the Postgres database.

#### Compatibility and SQL Construction

This feature is fully compatible with the existing sharding mechanism of the JDBC Source. When `use_copy_statement=true` is enabled, SeaTunnel automatically constructs efficient COPY SQL statements based on your configured `query`, `partition_column`, and `where_condition`.

**SQL Generation Logic Example:**

1. **Sharding and Filtering**: If `partition_column` or `where_condition` is configured, the COPY statement includes a subquery to support data splitting:
```sql
COPY (
SELECT * FROM (
SELECT * FROM "schema"."table_name"
WHERE "partition_column" >= ? AND "partition_column" < ?
) tmp <where_condition>
) TO STDOUT WITH [BINARY|CSV];
```

2. **Custom Query**: If only `query` is configured, the COPY statement is built directly from the query:
```sql
COPY (
SELECT * FROM (<query>) tmp
) TO STDOUT WITH [BINARY|CSV];
```

Example configuration:

```hocon
Jdbc {
url = "jdbc:postgresql://localhost:5432/test"
driver = "org.postgresql.Driver"
user = "postgres"
password = "password"
# Enable COPY method
use_copy_statement = true
# Enable binary COPY format
binary = true
# Buffer size
pg_copy_buffer_size = 1048576
}
```

## Parallel Reader

The JDBC Source connector supports reading data from tables in parallel. SeaTunnel splits the table data according to a set of rules and hands the resulting splits to readers; the number of readers is determined by the `parallelism` option.
7 changes: 7 additions & 0 deletions seatunnel-common/pom.xml
@@ -76,6 +76,13 @@
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>

        <!-- Disabled dependency on the shaded guava module (version assumed):
        <dependency>
            <groupId>org.apache.seatunnel</groupId>
            <artifactId>seatunnel-guava</artifactId>
            <version>${project.version}</version>
            <scope>compile</scope>
        </dependency>
        -->
</dependencies>

</project>
142 changes: 56 additions & 86 deletions seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -41,25 +41,22 @@
<sqlite.version>3.39.3.0</sqlite.version>
<tablestore.version>5.13.9</tablestore.version>
<teradata.version>17.20.00.12</teradata.version>
<redshift.version>2.1.0.30</redshift.version>
<saphana.version>2.23.10</saphana.version>
<redshift.version>2.1.0.9</redshift.version>
<saphana.version>2.14.7</saphana.version>
<snowflake.version>3.13.29</snowflake.version>
<vertica.version>12.0.3-0</vertica.version>
<hikari.version>4.0.3</hikari.version>
<postgis.jdbc.version>2.5.1</postgis.jdbc.version>
<kingbase8.version>8.6.0</kingbase8.version>
<hive.jdbc.version>3.1.3</hive.jdbc.version>
<oceanbase.jdbc.version>2.4.12</oceanbase.jdbc.version>
<oceanbase.jdbc.version>2.4.11</oceanbase.jdbc.version>
<xugu.jdbc.version>12.2.0</xugu.jdbc.version>
<iris.jdbc.version>3.0.0</iris.jdbc.version>
<tikv.version>3.2.0</tikv.version>
<opengauss.jdbc.version>5.1.0-og</opengauss.jdbc.version>
<mariadb.jdbc.version>3.5.1</mariadb.jdbc.version>
<mariadb.version>2.7.6</mariadb.version>
<highgo.version>6.2.3</highgo.version>
<presto.version>0.279</presto.version>
<trino.version>460</trino.version>
<aws.sdk.version>2.31.30</aws.sdk.version>
<duckdb.version>1.3.1.0</duckdb.version>
</properties>

<dependencyManagement>
@@ -210,48 +207,13 @@
<version>${iris.jdbc.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.tikv</groupId>
<artifactId>tikv-client-java</artifactId>
<version>${tikv.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.opengauss</groupId>
<artifactId>opengauss-jdbc</artifactId>
<version>${opengauss.jdbc.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
<version>${mariadb.jdbc.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.highgo</groupId>
<artifactId>HgdbJdbc</artifactId>
<version>${highgo.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-jdbc</artifactId>
<version>${presto.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-jdbc</artifactId>
<version>${trino.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.duckdb</groupId>
<artifactId>duckdb_jdbc</artifactId>
<version>${duckdb.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</dependencyManagement>

@@ -263,10 +225,9 @@
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-hikari</artifactId>
<version>${project.version}</version>
<classifier>optional</classifier>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>${hikari.version}</version>
</dependency>

<dependency>
@@ -362,63 +323,72 @@
<dependency>
<groupId>org.tikv</groupId>
<artifactId>tikv-client-java</artifactId>
</dependency>
<dependency>
<groupId>org.opengauss</groupId>
<artifactId>opengauss-jdbc</artifactId>
<version>${tikv.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
<version>${mariadb.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.highgo</groupId>
<artifactId>HgdbJdbc</artifactId>
<version>${highgo.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-jdbc</artifactId>
<version>${presto.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.duckdb</groupId>
<artifactId>duckdb_jdbc</artifactId>
<version>${trino.version}</version>
<scope>provided</scope>
</dependency>
<!-- AWS SDK for DSQL -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>dsql</artifactId>
<version>${aws.sdk.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>auth</artifactId>
<version>${aws.sdk.version}</version>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>regions</artifactId>
<version>${aws.sdk.version}</version>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sts</artifactId>
<version>${aws.sdk.version}</version>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-common</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>shade</goal>
</goals>
<phase>package</phase>
<configuration>
<createSourcesJar>false</createSourcesJar>
<shadeSourcesContent>true</shadeSourcesContent>
<shadedArtifactAttached>false</shadedArtifactAttached>
<createDependencyReducedPom>false</createDependencyReducedPom>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<relocations>
<!-- rename hikari to avoid jar conflict from spark -->
<relocation>
<pattern>com.zaxxer.hikari</pattern>
<shadedPattern>${seatunnel.shade.package}.com.zaxxer.hikari</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>