diff --git a/sync_diff_inspector/diff.go b/sync_diff_inspector/diff.go index 965a66ae7..bb5aad010 100644 --- a/sync_diff_inspector/diff.go +++ b/sync_diff_inspector/diff.go @@ -141,6 +141,15 @@ func (df *Diff) init(ctx context.Context, cfg *config.Config) (err error) { return errors.Trace(err) } + // check the upstream's table whether some tables are not matched with that from tableDiffs. + newDiffTables, passed := df.upstream.CheckTablesMatched(df.report) + if !passed { + df.upstream.UpdateTables(newDiffTables) + df.downstream.UpdateTables(newDiffTables) + } else { + log.Info("table match check passed!!") + } + df.workSource = df.pickSource(ctx) df.FixSQLDir = cfg.Task.FixDir df.CheckpointDir = cfg.Task.CheckpointDir diff --git a/sync_diff_inspector/report/report.go b/sync_diff_inspector/report/report.go index 684e581c1..07f080ba8 100644 --- a/sync_diff_inspector/report/report.go +++ b/sync_diff_inspector/report/report.go @@ -43,6 +43,7 @@ const ( // Fail means not all data or struct of tables are equal Fail = "fail" Error = "error" + Warn = "warn" ) // ReportConfig stores the config information for the user @@ -74,18 +75,24 @@ type ChunkResult struct { RowsDelete int `json:"rows-delete"` // `RowsDelete` is the number of rows needed to delete } +type MissingTables struct { + MissingTargetTables []string `json:"missing-target-tables"` + MissingSourceTables []string `json:"missing-source-tables"` +} + // Report saves the check results. type Report struct { sync.RWMutex - Result string `json:"-"` // Result is pass or fail - PassNum int32 `json:"-"` // The pass number of tables - FailedNum int32 `json:"-"` // The failed number of tables - TableResults map[string]map[string]*TableResult `json:"table-results"` // TableResult saved the map of `schema` => `table` => `tableResult` - StartTime time.Time `json:"start-time"` - Duration time.Duration `json:"time-duration"` - TotalSize int64 `json:"-"` // Total size of the checked tables - SourceConfig [][]byte `json:"-"` - TargetConfig []byte `json:"-"` + Result string `json:"-"` // Result is pass or fail + PassNum int32 `json:"-"` // The pass number of tables + FailedNum int32 `json:"-"` // The failed number of tables + TableResults map[string]map[string]*TableResult `json:"table-results"` // TableResult saved the map of `schema` => `table` => `tableResult` + StartTime time.Time `json:"start-time"` + Duration time.Duration `json:"time-duration"` + TotalSize int64 `json:"-"` // Total size of the checked tables + SourceConfig [][]byte `json:"-"` + TargetConfig []byte `json:"-"` + MissingTables MissingTables `json:"missing-tables"` task *config.TaskConfig `json:"-"` } @@ -220,6 +227,21 @@ func (r *Report) CommitSummary() error { table.Render() summaryFile.WriteString(tableString.String()) } + + if len(r.MissingTables.MissingSourceTables) > 0 { + summaryFile.WriteString("\nWarn: some tables from source are skipped, because the target has no table to becompared with:") + for _, tableName := range r.MissingTables.MissingSourceTables { + summaryFile.WriteString(fmt.Sprintf("\n\t%s", tableName)) + } + } + + if len(r.MissingTables.MissingTargetTables) > 0 { + summaryFile.WriteString("\nWarn: some tables from target are skipped, because the source has no table to becompared with:") + for _, tableName := range r.MissingTables.MissingTargetTables { + summaryFile.WriteString(fmt.Sprintf("\n\t%s", tableName)) + } + } + duration := r.Duration + time.Since(r.StartTime) summaryFile.WriteString(fmt.Sprintf("\nTime Cost: %s\n", duration)) summaryFile.WriteString(fmt.Sprintf("Average Speed: %fMB/s\n", float64(r.TotalSize)/(1024.0*1024.0*duration.Seconds()))) @@ -261,6 +283,20 @@ func (r *Report) Print(w io.Writer) error { } summary.WriteString(fmt.Sprintf("You can view the comparision details through '%s/%s'\n", r.task.OutputDir, config.LogFileName)) } + + if len(r.MissingTables.MissingSourceTables) > 0 { + summary.WriteString("\nWarn: some tables from source are skipped, because the target has no table to becompared with:\n") + for _, tableName := range r.MissingTables.MissingSourceTables { + summary.WriteString(fmt.Sprintf("\t%s\n", tableName)) + } + } + + if len(r.MissingTables.MissingTargetTables) > 0 { + summary.WriteString("\nWarn: some tables from target are skipped, because the source has no table to becompared with:\n") + for _, tableName := range r.MissingTables.MissingTargetTables { + summary.WriteString(fmt.Sprintf("\t%s\n", tableName)) + } + } fmt.Fprint(w, summary.String()) return nil } @@ -271,9 +307,23 @@ func NewReport(task *config.TaskConfig) *Report { TableResults: make(map[string]map[string]*TableResult), Result: Pass, task: task, + MissingTables: MissingTables{ + MissingTargetTables: make([]string, 0), + MissingSourceTables: make([]string, 0), + }, } } +func (r *Report) AddMissingTargetTable(table string) { + r.Result = Warn + r.MissingTables.MissingTargetTables = append(r.MissingTables.MissingTargetTables, table) +} + +func (r *Report) AddMissingSourceTable(table string) { + r.Result = Warn + r.MissingTables.MissingSourceTables = append(r.MissingTables.MissingSourceTables, table) +} + func (r *Report) Init(tableDiffs []*common.TableDiff, sourceConfig [][]byte, targetConfig []byte) { r.StartTime = time.Now() r.SourceConfig = sourceConfig diff --git a/sync_diff_inspector/source/mysql_shard.go b/sync_diff_inspector/source/mysql_shard.go index 004b8195e..4c5f932d1 100644 --- a/sync_diff_inspector/source/mysql_shard.go +++ b/sync_diff_inspector/source/mysql_shard.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tidb-tools/pkg/dbutil" "github.com/pingcap/tidb-tools/pkg/filter" "github.com/pingcap/tidb-tools/sync_diff_inspector/config" + "github.com/pingcap/tidb-tools/sync_diff_inspector/report" "github.com/pingcap/tidb-tools/sync_diff_inspector/source/common" "github.com/pingcap/tidb-tools/sync_diff_inspector/splitter" "github.com/pingcap/tidb-tools/sync_diff_inspector/utils" @@ -62,6 +63,10 @@ type MySQLSources struct { tableDiffs []*common.TableDiff sourceTablesMap map[string][]*common.TableShardSource + + // only for check + targetUniqueTableMap map[string]struct{} + sourceTablesAfterRoute map[string][]string } func getMatchedSourcesForTable(sourceTablesMap map[string][]*common.TableShardSource, table *common.TableDiff) []*common.TableShardSource { @@ -142,6 +147,14 @@ func (s *MySQLSources) GetTables() []*common.TableDiff { return s.tableDiffs } +func (s *MySQLSources) UpdateTables(tableDiffs []*common.TableDiff) { + s.tableDiffs = tableDiffs +} + +func (s *MySQLSources) CheckTablesMatched(report *report.Report) ([]*common.TableDiff, bool) { + return checkTableMatched(s.targetUniqueTableMap, s.sourceTablesAfterRoute, s.tableDiffs, report) +} + func (s *MySQLSources) GenerateFixSQL(t DMLType, upstreamData, downstreamData map[string]*dbutil.ColumnData, tableIndex int) string { switch t { case Insert: @@ -292,7 +305,7 @@ func NewMySQLSources(ctx context.Context, tableDiffs []*common.TableDiff, ds []* } // only used for check - sourceTablesAfterRoute := make(map[string]struct{}) + sourceTablesAfterRoute := make(map[string][]string) for i, sourceDB := range ds { sourceSchemas, err := dbutil.GetSchemas(ctx, sourceDB.Conn) @@ -323,7 +336,7 @@ func NewMySQLSources(ctx context.Context, tableDiffs []*common.TableDiff, ds []* // get all tables from all source db instance if f.MatchTable(targetSchema, targetTable) { // if match the filter, we should respect it and check target has this table later. - sourceTablesAfterRoute[uniqueId] = struct{}{} + sourceTablesAfterRoute[uniqueId] = append(sourceTablesAfterRoute[uniqueId], utils.UniqueID(schema, table)) } if _, ok := targetUniqueTableMap[uniqueId]; !ok { continue @@ -355,13 +368,12 @@ func NewMySQLSources(ctx context.Context, tableDiffs []*common.TableDiff, ds []* } - if err := checkTableMatched(targetUniqueTableMap, sourceTablesAfterRoute); err != nil { - return nil, errors.Annotatef(err, "please make sure the filter is correct.") - } - mss := &MySQLSources{ tableDiffs: tableDiffs, sourceTablesMap: sourceTablesMap, + + targetUniqueTableMap: targetUniqueTableMap, + sourceTablesAfterRoute: sourceTablesAfterRoute, } return mss, nil } diff --git a/sync_diff_inspector/source/source.go b/sync_diff_inspector/source/source.go index c7c235f41..6a0a86c84 100644 --- a/sync_diff_inspector/source/source.go +++ b/sync_diff_inspector/source/source.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb-tools/pkg/filter" tableFilter "github.com/pingcap/tidb-tools/pkg/table-filter" "github.com/pingcap/tidb-tools/sync_diff_inspector/config" + "github.com/pingcap/tidb-tools/sync_diff_inspector/report" "github.com/pingcap/tidb-tools/sync_diff_inspector/source/common" "github.com/pingcap/tidb-tools/sync_diff_inspector/splitter" "github.com/pingcap/tidb-tools/sync_diff_inspector/utils" @@ -93,6 +94,13 @@ type Source interface { // GetTables represents the tableDiffs. GetTables() []*common.TableDiff + // UpdateTables reset the tableDiffs. + UpdateTables([]*common.TableDiff) + + // check the upstream's table whether some tables are not matched with that from tableDiffs. + // and return new tablediffs. + CheckTablesMatched(*report.Report) ([]*common.TableDiff, bool) + // GetSourceStructInfo get the source table info from a given target table GetSourceStructInfo(context.Context, int) ([]*model.TableInfo, error) @@ -374,21 +382,32 @@ type RangeIterator interface { Close() } -func checkTableMatched(targetMap map[string]struct{}, sourceMap map[string]struct{}) error { +func checkTableMatched(targetMap map[string]struct{}, sourceMap map[string][]string, tableDiffs []*common.TableDiff, report *report.Report) ([]*common.TableDiff, bool) { + newTableDiffs := make([]*common.TableDiff, 0, len(tableDiffs)) + passed := true // check target exists but source not found - for tableDiff := range targetMap { + for _, tableDiff := range tableDiffs { // target table have all passed in tableFilter - if _, ok := sourceMap[tableDiff]; !ok { - return errors.Errorf("the source has no table to be compared. target-table is `%s`", tableDiff) + tableName := utils.UniqueID(tableDiff.Schema, tableDiff.Table) + if _, ok := sourceMap[tableName]; !ok { + log.Warn("the source has no table to be compared", zap.String("target-table", tableName)) + report.AddMissingTargetTable(tableName) + passed = false + } else { + newTableDiffs = append(newTableDiffs, tableDiff) } } // check source exists but target not found - for tableDiff := range sourceMap { + for tableName, sourceNames := range sourceMap { // need check source table have passd in tableFilter here - if _, ok := targetMap[tableDiff]; !ok { - return errors.Errorf("the target has no table to be compared. source-table is `%s`", tableDiff) + if _, ok := targetMap[tableName]; !ok { + for _, sourceName := range sourceNames { + log.Warn("the target has no table to be compared", zap.String("source-table", sourceName)) + report.AddMissingSourceTable(sourceName) + passed = false + } } } - log.Info("table match check passed!!") - return nil + + return newTableDiffs, passed } diff --git a/sync_diff_inspector/source/source_test.go b/sync_diff_inspector/source/source_test.go index 1bcc9fb01..7d0133e0e 100644 --- a/sync_diff_inspector/source/source_test.go +++ b/sync_diff_inspector/source/source_test.go @@ -29,6 +29,7 @@ import ( filter "github.com/pingcap/tidb-tools/pkg/table-filter" "github.com/pingcap/tidb-tools/sync_diff_inspector/chunk" "github.com/pingcap/tidb-tools/sync_diff_inspector/config" + "github.com/pingcap/tidb-tools/sync_diff_inspector/report" "github.com/pingcap/tidb-tools/sync_diff_inspector/source/common" "github.com/pingcap/tidb-tools/sync_diff_inspector/splitter" "github.com/pingcap/tidb-tools/sync_diff_inspector/utils" @@ -911,19 +912,42 @@ func TestInitTables(t *testing.T) { func TestCheckTableMatched(t *testing.T) { tmap := make(map[string]struct{}) - smap := make(map[string]struct{}) - - tmap["1"] = struct{}{} - tmap["2"] = struct{}{} - - smap["1"] = struct{}{} - smap["2"] = struct{}{} - require.NoError(t, checkTableMatched(tmap, smap)) - - delete(smap, "1") - require.Contains(t, checkTableMatched(tmap, smap).Error(), "the source has no table to be compared. target-table") - - delete(tmap, "1") - smap["1"] = struct{}{} - require.Contains(t, checkTableMatched(tmap, smap).Error(), "the target has no table to be compared. source-table") + smap := make(map[string][]string) + + tmap["`1`.`1`"] = struct{}{} + tmap["`2`.`2`"] = struct{}{} + + smap["`1`.`1`"] = []string{"1", "`1`.`1`"} + smap["`2`.`2`"] = []string{"2"} + + r := report.NewReport(nil) + tableDiff := []*common.TableDiff{ + {Schema: "1", Table: "1"}, + {Schema: "2", Table: "2"}, + } + newTableDiff, passed := checkTableMatched(tmap, smap, tableDiff, r) + require.True(t, passed) + require.Equal(t, len(newTableDiff), 2) + require.Equal(t, len(r.MissingTables.MissingSourceTables), 0) + require.Equal(t, len(r.MissingTables.MissingTargetTables), 0) + + r = report.NewReport(nil) + delete(smap, "`1`.`1`") + newTableDiff, passed = checkTableMatched(tmap, smap, tableDiff, r) + require.False(t, passed) + require.Equal(t, len(newTableDiff), 1) + require.Equal(t, len(r.MissingTables.MissingSourceTables), 0) + require.Equal(t, len(r.MissingTables.MissingTargetTables), 1) + + tableDiff = []*common.TableDiff{ + {Schema: "2", Table: "2"}, + } + r = report.NewReport(nil) + delete(tmap, "`1`.`1`") + smap["`1`.`1`"] = []string{"1", "`1`.`1`"} + newTableDiff, passed = checkTableMatched(tmap, smap, tableDiff, r) + require.False(t, passed) + require.Equal(t, len(newTableDiff), 1) + require.Equal(t, len(r.MissingTables.MissingSourceTables), 2) + require.Equal(t, len(r.MissingTables.MissingTargetTables), 0) } diff --git a/sync_diff_inspector/source/tidb.go b/sync_diff_inspector/source/tidb.go index 86ebdf826..2a33a0fe8 100644 --- a/sync_diff_inspector/source/tidb.go +++ b/sync_diff_inspector/source/tidb.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tidb-tools/pkg/dbutil" "github.com/pingcap/tidb-tools/pkg/filter" "github.com/pingcap/tidb-tools/sync_diff_inspector/config" + "github.com/pingcap/tidb-tools/sync_diff_inspector/report" "github.com/pingcap/tidb-tools/sync_diff_inspector/source/common" "github.com/pingcap/tidb-tools/sync_diff_inspector/splitter" "github.com/pingcap/tidb-tools/sync_diff_inspector/utils" @@ -91,6 +92,10 @@ type TiDBSource struct { dbConn *sql.DB version *semver.Version + + // only for check + targetUniqueTableMap map[string]struct{} + sourceTablesAfterRoute map[string][]string } func (s *TiDBSource) GetTableAnalyzer() TableAnalyzer { @@ -141,6 +146,14 @@ func (s *TiDBSource) GetTables() []*common.TableDiff { return s.tableDiffs } +func (s *TiDBSource) UpdateTables(tableDiffs []*common.TableDiff) { + s.tableDiffs = tableDiffs +} + +func (s *TiDBSource) CheckTablesMatched(report *report.Report) ([]*common.TableDiff, bool) { + return checkTableMatched(s.targetUniqueTableMap, s.sourceTablesAfterRoute, s.tableDiffs, report) +} + func (s *TiDBSource) GetSourceStructInfo(ctx context.Context, tableIndex int) ([]*model.TableInfo, error) { var err error tableInfos := make([]*model.TableInfo, 1) @@ -194,7 +207,7 @@ func (s *TiDBSource) GetSnapshot() string { return s.snapshot } -func getSourceTableMap(ctx context.Context, tableDiffs []*common.TableDiff, ds *config.DataSource, f tableFilter.Filter) (map[string]*common.TableSource, error) { +func getSourceTableMap(ctx context.Context, tableDiffs []*common.TableDiff, ds *config.DataSource, f tableFilter.Filter) (map[string]*common.TableSource, map[string]struct{}, map[string][]string, error) { sourceTableMap := make(map[string]*common.TableSource) log.Info("find router for tidb source") // we should get the real table name @@ -203,14 +216,14 @@ func getSourceTableMap(ctx context.Context, tableDiffs []*common.TableDiff, ds * for _, tableDiff := range tableDiffs { targetUniqueTableMap[utils.UniqueID(tableDiff.Schema, tableDiff.Table)] = struct{}{} } - sourceTablesAfterRoute := make(map[string]struct{}) + sourceTablesAfterRoute := make(map[string][]string) // instance -> db -> table allTablesMap := make(map[string]map[string]interface{}) sourceSchemas, err := dbutil.GetSchemas(ctx, ds.Conn) if err != nil { - return nil, errors.Annotatef(err, "get schemas from database") + return nil, nil, nil, errors.Annotatef(err, "get schemas from database") } for _, schema := range sourceSchemas { @@ -220,7 +233,7 @@ func getSourceTableMap(ctx context.Context, tableDiffs []*common.TableDiff, ds * } allTables, err := dbutil.GetTables(ctx, ds.Conn, schema) if err != nil { - return nil, errors.Annotatef(err, "get tables from %s", schema) + return nil, nil, nil, errors.Annotatef(err, "get tables from %s", schema) } allTablesMap[schema] = utils.SliceToMap(allTables) } @@ -231,14 +244,14 @@ func getSourceTableMap(ctx context.Context, tableDiffs []*common.TableDiff, ds * if ds.Router != nil { targetSchema, targetTable, err = ds.Router.Route(schema, table) if err != nil { - return nil, errors.Errorf("get route result for %s.%s failed, error %v", schema, table, err) + return nil, nil, nil, errors.Errorf("get route result for %s.%s failed, error %v", schema, table, err) } } uniqueId := utils.UniqueID(targetSchema, targetTable) if f.MatchTable(targetSchema, targetTable) { // if match the filter, we should respect it and check target has this table later. - sourceTablesAfterRoute[uniqueId] = struct{}{} + sourceTablesAfterRoute[uniqueId] = append(sourceTablesAfterRoute[uniqueId], utils.UniqueID(schema, table)) } if _, ok := targetUniqueTableMap[uniqueId]; ok { if _, ok := sourceTableMap[uniqueId]; ok { @@ -253,14 +266,11 @@ func getSourceTableMap(ctx context.Context, tableDiffs []*common.TableDiff, ds * } } - if err = checkTableMatched(targetUniqueTableMap, sourceTablesAfterRoute); err != nil { - return nil, errors.Annotatef(err, "please make sure the filter is correct.") - } - return sourceTableMap, nil + return sourceTableMap, targetUniqueTableMap, sourceTablesAfterRoute, nil } func NewTiDBSource(ctx context.Context, tableDiffs []*common.TableDiff, ds *config.DataSource, bucketSpliterPool *utils.WorkerPool, f tableFilter.Filter) (Source, error) { - sourceTableMap, err := getSourceTableMap(ctx, tableDiffs, ds, f) + sourceTableMap, targetUniqueTableMap, sourceTablesAfterRoute, err := getSourceTableMap(ctx, tableDiffs, ds, f) if err != nil { return nil, errors.Trace(err) } @@ -271,6 +281,9 @@ func NewTiDBSource(ctx context.Context, tableDiffs []*common.TableDiff, ds *conf dbConn: ds.Conn, bucketSpliterPool: bucketSpliterPool, version: utils.TryToGetVersion(ctx, ds.Conn), + + targetUniqueTableMap: targetUniqueTableMap, + sourceTablesAfterRoute: sourceTablesAfterRoute, } return ts, nil }