diff --git a/server/plugins/task-plugin-ob-mysql/src/main/java/com/oceanbase/odc/plugin/task/obmysql/datatransfer/factory/DumpParameterFactory.java b/server/plugins/task-plugin-ob-mysql/src/main/java/com/oceanbase/odc/plugin/task/obmysql/datatransfer/factory/DumpParameterFactory.java index 4cb08d33fc..1b34d55bef 100644 --- a/server/plugins/task-plugin-ob-mysql/src/main/java/com/oceanbase/odc/plugin/task/obmysql/datatransfer/factory/DumpParameterFactory.java +++ b/server/plugins/task-plugin-ob-mysql/src/main/java/com/oceanbase/odc/plugin/task/obmysql/datatransfer/factory/DumpParameterFactory.java @@ -82,6 +82,8 @@ protected DumpParameter doGenerate(File workingDir) throws IOException { parameter.setMaxFileSize(transferConfig.getMaxDumpSizeBytes()); } parameter.setSchemaless(true); + // ob-dumper would use `show create table` for dump schema ddl + parameter.setCompactSchema(true); setFetchSize(parameter); setTransferFormat(parameter, transferConfig); setDumpObjects(parameter, transferConfig); diff --git a/server/plugins/task-plugin-ob-mysql/src/main/java/com/oceanbase/odc/plugin/task/obmysql/datatransfer/task/BaseOceanBaseTransferJob.java b/server/plugins/task-plugin-ob-mysql/src/main/java/com/oceanbase/odc/plugin/task/obmysql/datatransfer/task/BaseOceanBaseTransferJob.java index 8d17003668..cb67603fd6 100644 --- a/server/plugins/task-plugin-ob-mysql/src/main/java/com/oceanbase/odc/plugin/task/obmysql/datatransfer/task/BaseOceanBaseTransferJob.java +++ b/server/plugins/task-plugin-ob-mysql/src/main/java/com/oceanbase/odc/plugin/task/obmysql/datatransfer/task/BaseOceanBaseTransferJob.java @@ -23,6 +23,7 @@ import java.util.stream.Collectors; import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,6 +36,7 @@ import com.oceanbase.tools.loaddump.common.enums.ObjectType; import com.oceanbase.tools.loaddump.common.model.BaseParameter; import com.oceanbase.tools.loaddump.common.model.DumpParameter; +import com.oceanbase.tools.loaddump.common.model.LoadParameter; import com.oceanbase.tools.loaddump.common.model.ObjectStatus; import com.oceanbase.tools.loaddump.common.model.TaskDetail; import com.oceanbase.tools.loaddump.context.TaskContext; @@ -152,7 +154,7 @@ public DataTransferTaskResult call() throws Exception { if (schemaContext == null) { throw new NullPointerException("Data task context is null"); } - syncWaitFinished(schemaContext); + syncWaitFinished(schemaContext, true); } if (transferData) { @@ -162,7 +164,7 @@ public DataTransferTaskResult call() throws Exception { if (dataContext == null) { throw new NullPointerException("Data task context is null"); } - syncWaitFinished(dataContext); + syncWaitFinished(dataContext, false); } LOGGER.info("Transfer task finished by ob-loader-dumper!"); @@ -181,8 +183,28 @@ public DataTransferTaskResult call() throws Exception { return new DataTransferTaskResult(getDataObjectsStatus(), getSchemaObjectsStatus()); } + /** + * When user configured {@link DataTransferConfig#replaceSchemaWhenExists} is false, we would set + * {@link LoadParameter#replaceObjectIfExists} to false. But ob-loader would still report error if + * an object exists already. So we should eat these exceptions. Our strategy for handling exceptions + * is as follows: + * + *
+     * +----------------+--------------------------+----------------------------+-----------------------------+
+     * | stopWhenError | replaceSchemaWhenExists | Error Message Contains "already exists" | Handling Behavior |
+     * +----------------+--------------------------+----------------------------+-----------------------------+
+     * | true          |          N/A            |            N/A             |   Always skip errors        |
+     * +----------------+--------------------------+----------------------------+-----------------------------+
+     * | false         |         true            |            N/A             |   Throw exception           |
+     * +----------------+--------------------------+----------------------------+-----------------------------+
+     * | false         |         false           |          Yes               |   Filter out, not throw     |
+     * +----------------+--------------------------+----------------------------+-----------------------------+
+     * | false         |         false           |           No               |   Throw exception           |
+     * +----------------+--------------------------+----------------------------+-----------------------------+
+     * 
+ */ @SuppressWarnings("all") - private void syncWaitFinished(@NonNull TaskContext context) throws InterruptedException { + private void syncWaitFinished(@NonNull TaskContext context, boolean isTransferSchema) throws InterruptedException { while (!Thread.currentThread().isInterrupted() && !status.isTerminated()) { if (context.isAllTasksSuccessed()) { shutdownContext(context); @@ -193,6 +215,18 @@ private void syncWaitFinished(@NonNull TaskContext context) throws InterruptedEx if (CollectionUtils.isEmpty(failedTasks)) { throw new IllegalStateException("No failed task details"); } + if (isTransferSchema && parameter instanceof LoadParameter) { + if (((LoadParameter) parameter).getMaxErrors() == -1) { + return; + } + if (!((LoadParameter) parameter).isReplaceObjectIfExists()) { + failedTasks = failedTasks.stream() + .filter(i -> !isCreateExistsObjectError(i.getError())).collect(Collectors.toList()); + if (failedTasks.isEmpty()) { + return; + } + } + } String errorMsg = failedTasks.stream() .map(i -> i.getSchemaTable() + ": " + i.getError()) .collect(Collectors.joining("\n")); @@ -255,4 +289,9 @@ private List transformStatus(List origin) { .collect(Collectors.toList()); } + private boolean isCreateExistsObjectError(String error) { + return StringUtils.containsIgnoreCase(error, "already exist") || + StringUtils.containsIgnoreCase(error, "already used by an existing object"); + } + } diff --git a/server/plugins/task-plugin-oracle/src/main/java/com/oceanbase/odc/plugin/task/oracle/datatransfer/job/factory/OracleTransferJobFactory.java b/server/plugins/task-plugin-oracle/src/main/java/com/oceanbase/odc/plugin/task/oracle/datatransfer/job/factory/OracleTransferJobFactory.java index c32d905a02..14e9529614 100644 --- a/server/plugins/task-plugin-oracle/src/main/java/com/oceanbase/odc/plugin/task/oracle/datatransfer/job/factory/OracleTransferJobFactory.java +++ b/server/plugins/task-plugin-oracle/src/main/java/com/oceanbase/odc/plugin/task/oracle/datatransfer/job/factory/OracleTransferJobFactory.java @@ -20,6 +20,7 @@ import java.sql.Connection; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import javax.sql.DataSource; @@ -112,7 +113,11 @@ protected AbstractJob generateDataXImportJob(ObjectResult object, JobConfigurati MySQLWriterPluginParameter parameter = (MySQLWriterPluginParameter) writer.getParameter(); parameter.setSession(getSessionOptions()); parameter.setWriteMode(null); - parameter.setPreSql(null); + if (transferConfig.isTruncateTableBeforeImport()) { + parameter.setPreSql(Collections.singletonList("TRUNCATE TABLE " + object.getName())); + } else { + parameter.setPreSql(null); + } parameter.setPostSql(null); writer.setName("oraclewriter"); return new DataXTransferJob(object, jobConfiguration, workingDir, logDir);