[Feature]Add Kingbase Catalog Support#10427
Conversation
…NT type in MySQL compatibility mode
|
Hi @LeonYoah, thanks for this contribution! Adding Catalog support for Kingbase is a great enhancement. After reviewing the code, I have a few suggestions to improve code consistency and compatibility: 1. Naming InconsistencyThe existing classes in the project typically use Suggestion: Please rename the new classes to match the existing convention:
2. Compatibility Risk with
|
Thank you for your review. I will make the changes. |
Issue 1: All Tests DisabledLocation: @Disabled("Please Test it in your local environment")
class KingbaseCatalogTest {
// All test methods are disabled
}Problem Description: Potential Risks:
Scope of Impact:
Severity: CRITICAL Improvement Suggestions: // Remove @Disabled or use Testcontainers
@Testcontainers
class KingbaseCatalogTest {
@Container
private static final PostgreSQLContainer<?> kingbaseContainer =
new PostgreSQLContainer<>("kingbase/kes:latest")
.withDatabaseName("test")
.withUsername("kingbase")
.withPassword("kingbase");
@Test
void databaseExists() {
// Test using real containers
}
}Reason: Apache projects require all code changes to be covered by executable tests. Issue 2: Hardcoded Chinese Exception MessagesLocation: } catch (SQLException e) {
throw new CatalogException("查询数据库是否存在失败: " + databaseName, e);
}Problem Description: Related Context:
Potential Risks:
Scope of Impact:
Severity: MAJOR Improvement Suggestions: } catch (SQLException e) {
throw new CatalogException(
String.format("Failed to check if database exists: %s", databaseName),
e
);
}Reason: Complies with Apache project internationalization standards. Issue 3: Missing Null CheckLocation: @Override
protected Column buildColumn(ResultSet resultSet) throws SQLException {
String columnName = resultSet.getString("COLUMN_NAME");
String typeName = resultSet.getString("TYPE_NAME");
String fullTypeName = resultSet.getString("FULL_TYPE_NAME");
// ...
Object defaultValue = resultSet.getObject("DEFAULT_VALUE");
boolean isNullable = resultSet.getString("IS_NULLABLE").equals("YES");
// ...
}Problem Description: Related Context:
Potential Risks:
Scope of Impact:
Severity: MAJOR Improvement Suggestions: @Override
protected Column buildColumn(ResultSet resultSet) throws SQLException {
String columnName = resultSet.getString("COLUMN_NAME");
if (columnName == null) {
throw new SQLException("COLUMN_NAME cannot be null");
}
String typeName = resultSet.getString("TYPE_NAME");
String fullTypeName = resultSet.getString("FULL_TYPE_NAME");
long columnLength = resultSet.getLong("COLUMN_LENGTH");
long columnPrecision = resultSet.getLong("COLUMN_PRECISION");
int columnScale = resultSet.getInt("COLUMN_SCALE");
String columnComment = resultSet.getString("COLUMN_COMMENT");
Object defaultValue = resultSet.getObject("DEFAULT_VALUE");
String isNullableStr = resultSet.getString("IS_NULLABLE");
boolean isNullable = "YES".equalsIgnoreCase(isNullableStr);
BasicTypeDefine typeDefine = BasicTypeDefine.builder()
.name(columnName)
.columnType(typeName)
.dataType(typeName)
.length(columnLength)
.precision(columnPrecision)
.scale(columnScale)
.nullable(isNullable)
.defaultValue(defaultValue)
.comment(columnComment)
.build();
return KingbaseTypeConverter.INSTANCE.convert(typeDefine);
}Reason: Defensive programming to avoid NPE. Issue 4: SQL Injection Risk (Potential)Location: @Override
protected String getSelectColumnsSql(TablePath tablePath) {
return String.format(
SELECT_COLUMNS_SQL_TEMPLATE,
tablePath.getSchemaName(),
tablePath.getTableName());
}Problem Description: Related Context:
Potential Risks:
Scope of Impact:
Severity: MAJOR Improvement Suggestions: @Override
protected String getSelectColumnsSql(TablePath tablePath) {
String schemaName = escapeSqlIdentifier(tablePath.getSchemaName());
String tableName = escapeSqlIdentifier(tablePath.getTableName());
return String.format(SELECT_COLUMNS_SQL_TEMPLATE, schemaName, tableName);
}
private String escapeSqlIdentifier(String identifier) {
return identifier.replace("'", "''");
}Reason: Prevent SQL injection, comply with secure coding standards. Issue 5: KB_CLOB Type Sets columnLength TwiceLocation: case KB_CLOB:
builder.dataType(BasicType.STRING_TYPE);
builder.columnLength(typeDefine.getLength());
builder.columnLength((long) (1024 * 1024 * 1024)); // Second setup
break;Problem Description: Related Context:
Potential Risks:
Scope of Impact:
Severity: MINOR Improvement Suggestions: case KB_CLOB:
builder.dataType(BasicType.STRING_TYPE);
if (typeDefine.getLength() != null && typeDefine.getLength() > 0) {
builder.columnLength(typeDefine.getLength());
} else {
builder.columnLength((long) (1024 * 1024 * 1024));
}
break;Reason: Fix logic error, preserve user-defined length. Issue 6: MySqlTypeConverter Constant Visibility Modification Affects EncapsulationLocation: // Before modification
-static final String MYSQL_INT = "INT";
-static final String MYSQL_DATETIME = "DATETIME";
// ... 38 constants
// After modification
+public static final String MYSQL_INT = "INT";
+public static final String MYSQL_DATETIME = "DATETIME";
// ... 38 constantsProblem Description: Related Context:
Potential Risks:
Scope of Impact:
Severity: MINOR Improvement Suggestions: // Create new file: JdbcCompatibilityTypeConstants.java
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
public final class JdbcCompatibilityTypeConstants {
// MySQL compatibility types
public static final String MYSQL_INT = "INT";
public static final String MYSQL_DATETIME = "DATETIME";
// ...
// Oracle compatibility types
public static final String ORACLE_NUMBER = "NUMBER";
// ...
// SQLServer compatibility types
public static final String SQLSERVER_DATETIME2 = "DATETIME2";
// ...
private JdbcCompatibilityTypeConstants() {}
}Then reference in Kingbase: import static org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcCompatibilityTypeConstants.*;
case MYSQL_INT:
builder.dataType(BasicType.INT_TYPE);
break;Reason: Centrally manage compatibility type constants, reduce coupling, improve maintainability. Issue 8: seatunnel-dist/pom.xml Not UpdatedLocation: Problem Description: Related Context:
Potential Risks:
Scope of Impact:
Severity: MAJOR Improvement Suggestions: <!-- 在 seatunnel-dist/pom.xml 中 -->
<dependency>
<groupId>com.kingbase</groupId>
<artifactId>kingbase8</artifactId>
<version>${kingbase.driver.version}</version>
</dependency>Reason: Ensure releases include all necessary dependencies. Issue 9: Incomplete DocumentationLocation: Problem Description: Related Context:
Potential Risks:
Scope of Impact:
Severity: MAJOR Improvement Suggestions: ## Catalog Support
Kingbase connector supports catalog functionality for metadata querying and automatic table creation.
### Supported Data Types
| Kingbase Type | SeaTunnel Type | Notes |
|---------------|----------------|-------|
| INT | INT | MySQL compatibility mode |
| INTEGER | INT | Native type |
| VARCHAR | STRING | |
| NUMBER | DECIMAL | Oracle compatibility mode |
| ... | ... | |
### MySQL Compatibility Mode
When Kingbase is running in MySQL compatibility mode, the connector will automatically map MySQL types (e.g., INT, DATETIME) to appropriate SeaTunnel types.
### Example
```hocon
source {
KingbaseCatalog {
catalog {
database = "test_db"
schema = "public"
}
table_path = "public.my_table"
}
}Reason: Complies with PR Checklist requirements, ensures correct CI operation. |
|
|
||
| import java.util.List; | ||
|
|
||
| @Disabled("Please Test it in your local environment") |
There was a problem hiding this comment.
Unit testing introduces containers, plus dialect and catalog testing, reference: #10210
There was a problem hiding this comment.
OK, I will modify the Kingbase unit test support and add container test support.
| BasicTypeDefine typeDefine = | ||
| BasicTypeDefine.builder() | ||
| .name(columnName) | ||
| .columnType(typeName) |
There was a problem hiding this comment.
FULL_TYPE_NAME is read but not used; columnType/sourceType loses information such as VARCHAR(255), CHAR(10), and NUMERIC(38,18).
| .append(CatalogUtils.quoteIdentifier(column.getName(), fieldIde, "\"")) | ||
| .append(CatalogUtils.quoteIdentifier(" IS '", fieldIde)) | ||
| .append(column.getComment()) | ||
| .append("'"); |
There was a problem hiding this comment.
Unescaped single quotes in comment concatenation: it is recommended to use replace("'", "''").
There was a problem hiding this comment.
Thank you for your review. Please note that this feature is already available in other connectors. I forgot to add it and will include it later.
…comment and fullTypeName
…ue where the source type may be null
There was a problem hiding this comment.
Please obtain KINGBASE_LICENSE through environment variables
There was a problem hiding this comment.
Could you give an example? Which variable is it placed in? Does it need to be set in the GitHub repository, such as in GitHub Secrets?
…btain the license
…btain the license
…btain the license
.github/workflows/backend.yml
Outdated
| ./mvnw -B -T 1 clean verify -DskipUT=false -DskipIT=true -D"license.skipAddThirdParty"=true -D"skip.ui"=true --no-snapshot-updates | ||
| env: | ||
| MAVEN_OPTS: -Xmx4096m | ||
| KINGBASE_LICENSE: ${{ vars.KINGBASE_LICENSE }} |
There was a problem hiding this comment.
If it is not used, please delete it
Purpose of this pull request
Added support for the catalog functionality of the domestic database Kingbase, enabling metadata querying and auto table creation. Additionally, a related fix was implemented: When using query parameters in JDBC source, if the Kingbase instance is running in MySQL compatibility mode, the operation previously failed due to unsupported INT type errors. This occurred because the Kingbase JDBC driver's compatibility adaptations returned INT types, prompting this corrective update
Does this PR introduce any user-facing change?
How was this patch tested?
Check list
New License Guide
incompatible-changes.mdto describe the incompatibility caused by this PR.