Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ protected DumpParameter doGenerate(File workingDir) throws IOException {
parameter.setMaxFileSize(transferConfig.getMaxDumpSizeBytes());
}
parameter.setSchemaless(true);
// ob-dumper would use `show create table` for dump schema ddl
parameter.setCompactSchema(true);
setFetchSize(parameter);
setTransferFormat(parameter, transferConfig);
setDumpObjects(parameter, transferConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.stream.Collectors;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -35,6 +36,7 @@
import com.oceanbase.tools.loaddump.common.enums.ObjectType;
import com.oceanbase.tools.loaddump.common.model.BaseParameter;
import com.oceanbase.tools.loaddump.common.model.DumpParameter;
import com.oceanbase.tools.loaddump.common.model.LoadParameter;
import com.oceanbase.tools.loaddump.common.model.ObjectStatus;
import com.oceanbase.tools.loaddump.common.model.TaskDetail;
import com.oceanbase.tools.loaddump.context.TaskContext;
Expand Down Expand Up @@ -152,7 +154,7 @@ public DataTransferTaskResult call() throws Exception {
if (schemaContext == null) {
throw new NullPointerException("Data task context is null");
}
syncWaitFinished(schemaContext);
syncWaitFinished(schemaContext, true);
}

if (transferData) {
Expand All @@ -162,7 +164,7 @@ public DataTransferTaskResult call() throws Exception {
if (dataContext == null) {
throw new NullPointerException("Data task context is null");
}
syncWaitFinished(dataContext);
syncWaitFinished(dataContext, false);
}

LOGGER.info("Transfer task finished by ob-loader-dumper!");
Expand All @@ -181,8 +183,28 @@ public DataTransferTaskResult call() throws Exception {
return new DataTransferTaskResult(getDataObjectsStatus(), getSchemaObjectsStatus());
}

/**
* When user configured {@link DataTransferConfig#replaceSchemaWhenExists} is false, we would set
* {@link LoadParameter#replaceObjectIfExists} to false. But ob-loader would still report error if
* an object exists already. So we should eat these exceptions. Our strategy for handling exceptions
* is as follows:
*
* <pre>
* +----------------+--------------------------+----------------------------+-----------------------------+
* | stopWhenError | replaceSchemaWhenExists | Error Message Contains "already exists" | Handling Behavior |
* +----------------+--------------------------+----------------------------+-----------------------------+
* | true | N/A | N/A | Always skip errors |
* +----------------+--------------------------+----------------------------+-----------------------------+
* | false | true | N/A | Throw exception |
* +----------------+--------------------------+----------------------------+-----------------------------+
* | false | false | Yes | Filter out, not throw |
* +----------------+--------------------------+----------------------------+-----------------------------+
* | false | false | No | Throw exception |
* +----------------+--------------------------+----------------------------+-----------------------------+
* </pre>
*/
@SuppressWarnings("all")
private void syncWaitFinished(@NonNull TaskContext context) throws InterruptedException {
private void syncWaitFinished(@NonNull TaskContext context, boolean isTransferSchema) throws InterruptedException {
while (!Thread.currentThread().isInterrupted() && !status.isTerminated()) {
if (context.isAllTasksSuccessed()) {
shutdownContext(context);
Expand All @@ -193,6 +215,18 @@ private void syncWaitFinished(@NonNull TaskContext context) throws InterruptedEx
if (CollectionUtils.isEmpty(failedTasks)) {
throw new IllegalStateException("No failed task details");
}
if (isTransferSchema && parameter instanceof LoadParameter) {
if (((LoadParameter) parameter).getMaxErrors() == -1) {
return;
}
if (!((LoadParameter) parameter).isReplaceObjectIfExists()) {
failedTasks = failedTasks.stream()
.filter(i -> !isCreateExistsObjectError(i.getError())).collect(Collectors.toList());
if (failedTasks.isEmpty()) {
return;
}
}
}
String errorMsg = failedTasks.stream()
.map(i -> i.getSchemaTable() + ": " + i.getError())
.collect(Collectors.joining("\n"));
Expand Down Expand Up @@ -255,4 +289,9 @@ private List<ObjectResult> transformStatus(List<ObjectStatus> origin) {
.collect(Collectors.toList());
}

private boolean isCreateExistsObjectError(String error) {
return StringUtils.containsIgnoreCase(error, "already exist") ||
StringUtils.containsIgnoreCase(error, "already used by an existing object");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.sql.Connection;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import javax.sql.DataSource;
Expand Down Expand Up @@ -112,7 +113,11 @@ protected AbstractJob generateDataXImportJob(ObjectResult object, JobConfigurati
MySQLWriterPluginParameter parameter = (MySQLWriterPluginParameter) writer.getParameter();
parameter.setSession(getSessionOptions());
parameter.setWriteMode(null);
parameter.setPreSql(null);
if (transferConfig.isTruncateTableBeforeImport()) {
parameter.setPreSql(Collections.singletonList("TRUNCATE TABLE " + object.getName()));
} else {
parameter.setPreSql(null);
}
parameter.setPostSql(null);
writer.setName("oraclewriter");
return new DataXTransferJob(object, jobConfiguration, workingDir, logDir);
Expand Down