Commit e788c26

HBASE-28456 HBase Restore restores old data if data for the same timestamp is in different hfiles

1 parent 38aef80

7 files changed

Lines changed: 328 additions & 15 deletions

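Why the fix matters: when two hfiles each contain a cell with identical row, family, qualifier, and timestamp, only the cell's sequenceId (the bulk-load id embedded in the file name as _SeqId_<n>_) can break the tie. The restore pipeline dropped that sequenceId while splitting and re-sorting hfiles, so the older value could win. Below is a minimal standalone sketch of the tie-break (an illustrative demo class, not part of the commit; it assumes CellComparatorImpl's default comparator consults sequenceIds, with higher ids sorting first):

import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;

public class SeqIdTieBreakDemo {
  public static void main(String[] args) throws Exception {
    long ts = 1700000000000L; // both cells deliberately share this timestamp
    KeyValue oldCell = new KeyValue(Bytes.toBytes(1), Bytes.toBytes("0"), Bytes.toBytes("data"),
      ts, Bytes.toBytes("data"));
    KeyValue newCell = new KeyValue(Bytes.toBytes(1), Bytes.toBytes("0"), Bytes.toBytes("data"),
      ts, Bytes.toBytes("changed_data"));
    oldCell.setSequenceId(5); // as if from the older bulk-loaded hfile
    newCell.setSequenceId(9); // as if from the newer bulk-loaded hfile
    // With sequenceIds present, the newer cell should sort first (expect a negative
    // number). With both ids left at 0, the comparator cannot tell the cells apart,
    // which is exactly the ambiguity this commit removes from the restore path.
    System.out.println(CellComparatorImpl.COMPARATOR.compare(newCell, oldCell));
  }
}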

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java

Lines changed: 4 additions & 0 deletions
@@ -99,6 +99,10 @@ public Job createSubmittableJob(String[] args) throws IOException {
     conf.set(FileInputFormat.INPUT_DIR, inputDirs);
     Job job = Job.getInstance(conf,
       conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime()));
+    // MapReduceHFileSplitter needs ExtendedCellSerialization so that sequenceId can be propagated
+    // when sorting cells in CellSortReducer
+    job.getConfiguration().setBoolean(HFileOutputFormat2.EXTENDED_CELL_SERIALIZATION_ENABLED_KEY,
+      true);
     job.setJarByClass(MapReduceHFileSplitterJob.class);
     job.setInputFormatClass(HFileInputFormat.class);
     job.setMapOutputKeyClass(ImmutableBytesWritable.class);
New file: TestBackupRestoreWithModifications.java

Lines changed: 259 additions & 0 deletions
@@ -0,0 +1,259 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup;

import static org.apache.hadoop.hbase.backup.BackupInfo.BackupState.COMPLETE;
import static org.apache.hadoop.hbase.backup.BackupType.FULL;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testing.TestingHBaseCluster;
import org.apache.hadoop.hbase.testing.TestingHBaseClusterOption;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Verifies that a restore surfaces the latest data when newer values were written with the same
 * timestamp as older ones (HBASE-28456), for both the put and the bulk-load write paths.
 */
@Category(MediumTests.class)
@RunWith(Parameterized.class)
public class TestBackupRestoreWithModifications {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBackupRestoreWithModifications.class);

  @Parameterized.Parameters(name = "{index}: useBulkLoad={0}")
  public static Iterable<Object[]> data() {
    return HBaseCommonTestingUtil.BOOLEAN_PARAMETERIZED;
  }

  @Parameterized.Parameter(0)
  public boolean useBulkLoad;

  private TestingHBaseCluster cluster;

  private static final TableName SOURCE_TABLE_NAME = TableName.valueOf("A_TABLE");
  private static final TableName TARGET_TABLE_NAME = TableName.valueOf("ANOTHER_TABLE");
  private static final List<TableName> ALL_TABLES =
    Arrays.asList(SOURCE_TABLE_NAME, TARGET_TABLE_NAME);
  private static final Path BACKUP_ROOT_DIR = new Path("backupIT");
  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("0");

  private static final Logger LOG =
    LoggerFactory.getLogger(TestBackupRestoreWithModifications.class);

  @Before
  public void setUpCluster() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Same key as HFileOutputFormat2.EXTENDED_CELL_SERIALIZATION_ENABLED_KEY, which this
    // commit makes public (see the hbase-mapreduce change below).
    conf.setBoolean("hbase.mapreduce.hfileoutputformat.extendedcell.enabled", true);
    enableBackup(conf);
    cluster = TestingHBaseCluster.create(TestingHBaseClusterOption.builder().conf(conf).build());
    cluster.start();
    createTable(SOURCE_TABLE_NAME);
    createTable(TARGET_TABLE_NAME);
  }

  @After
  public void tearDown() throws Exception {
    cluster.stop();
  }

  @Test
  public void testModificationsOnTable() throws Exception {
    Instant timestamp = Instant.now();

    // load some data
    load(SOURCE_TABLE_NAME, timestamp, "data");

    String backupId = backup(FULL, ALL_TABLES);
    BackupInfo backupInfo = verifyBackup(backupId, FULL, COMPLETE);
    assertTrue(backupInfo.getTables().contains(SOURCE_TABLE_NAME));

    restore(backupId, SOURCE_TABLE_NAME, TARGET_TABLE_NAME);
    validateDataEquals(SOURCE_TABLE_NAME, "data");
    validateDataEquals(TARGET_TABLE_NAME, "data");

    // load new data on the same timestamp
    load(SOURCE_TABLE_NAME, timestamp, "changed_data");

    backupId = backup(FULL, ALL_TABLES);
    backupInfo = verifyBackup(backupId, FULL, COMPLETE);
    assertTrue(backupInfo.getTables().contains(SOURCE_TABLE_NAME));

    restore(backupId, SOURCE_TABLE_NAME, TARGET_TABLE_NAME);
    validateDataEquals(SOURCE_TABLE_NAME, "changed_data");
    validateDataEquals(TARGET_TABLE_NAME, "changed_data");
  }

  private void createTable(TableName tableName) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(COLUMN_FAMILY));
    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
      Admin admin = connection.getAdmin()) {
      admin.createTable(builder.build());
    }
  }

  private void load(TableName tableName, Instant timestamp, String data) throws IOException {
    if (useBulkLoad) {
      hFileBulkLoad(tableName, timestamp, data);
    } else {
      putLoad(tableName, timestamp, data);
    }
  }

  private void putLoad(TableName tableName, Instant timestamp, String data) throws IOException {
    LOG.info("Writing new data to HBase: " + data);
    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
      Table table = connection.getTable(tableName)) {
      List<Put> puts = new ArrayList<>();
      for (int i = 0; i < 10; i++) {
        Put put = new Put(Bytes.toBytes(i), timestamp.toEpochMilli());
        put.addColumn(COLUMN_FAMILY, Bytes.toBytes("data"), Bytes.toBytes(data));
        puts.add(put);

        if (i % 100 == 0) {
          table.put(puts);
          puts.clear();
        }
      }
      if (!puts.isEmpty()) {
        table.put(puts);
      }
      connection.getAdmin().flush(tableName);
    }
  }

  private void hFileBulkLoad(TableName tableName, Instant timestamp, String data)
    throws IOException {
    FileSystem fs = FileSystem.get(cluster.getConf());
    LOG.info("Writing new data to HBase: " + data);
    // HFiles require this strict directory structure (a family directory under the
    // bulk-load root) to be loadable
    Path hFileRootPath = new Path("/tmp/hfiles_" + UUID.randomUUID());
    fs.mkdirs(hFileRootPath);
    Path hFileFamilyPath = new Path(hFileRootPath, Bytes.toString(COLUMN_FAMILY));
    fs.mkdirs(hFileFamilyPath);
    try (HFile.Writer writer = HFile.getWriterFactoryNoCache(cluster.getConf())
      .withPath(fs, new Path(hFileFamilyPath, "hfile_" + UUID.randomUUID()))
      .withFileContext(new HFileContextBuilder().withTableName(tableName.toBytes())
        .withColumnFamily(COLUMN_FAMILY).build())
      .create()) {
      for (int i = 0; i < 10; i++) {
        writer.append(new KeyValue(Bytes.toBytes(i), COLUMN_FAMILY, Bytes.toBytes("data"),
          timestamp.toEpochMilli(), Bytes.toBytes(data)));
      }
    }
    Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> result =
      BulkLoadHFiles.create(cluster.getConf()).bulkLoad(tableName, hFileRootPath);
    assertFalse(result.isEmpty());
  }

  private String backup(BackupType backupType, List<TableName> tables) throws IOException {
    LOG.info("Creating the backup ...");

    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
      BackupAdmin backupAdmin = new BackupAdminImpl(connection)) {
      BackupRequest backupRequest =
        new BackupRequest.Builder().withTargetRootDir(BACKUP_ROOT_DIR.toString())
          .withTableList(new ArrayList<>(tables)).withBackupType(backupType).build();
      return backupAdmin.backupTables(backupRequest);
    }
  }

  private void restore(String backupId, TableName sourceTableName, TableName targetTableName)
    throws IOException {
    LOG.info("Restoring data ...");
    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
      BackupAdmin backupAdmin = new BackupAdminImpl(connection)) {
      // withOvewrite is the actual (misspelled) method name on RestoreRequest.Builder
      RestoreRequest restoreRequest = new RestoreRequest.Builder().withBackupId(backupId)
        .withBackupRootDir(BACKUP_ROOT_DIR.toString()).withOvewrite(true)
        .withFromTables(new TableName[] { sourceTableName })
        .withToTables(new TableName[] { targetTableName }).build();
      backupAdmin.restore(restoreRequest);
    }
  }

  private void validateDataEquals(TableName tableName, String expectedData) throws IOException {
    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
      Table table = connection.getTable(tableName)) {
      Scan scan = new Scan();
      scan.readAllVersions();
      scan.setRaw(true);
      scan.setBatch(100);
      for (Result result : table.getScanner(scan)) {
        for (Cell cell : result.listCells()) {
          assertEquals(expectedData, Bytes.toStringBinary(cell.getValueArray(),
            cell.getValueOffset(), cell.getValueLength()));
        }
      }
    }
  }

  private BackupInfo verifyBackup(String backupId, BackupType expectedType,
    BackupInfo.BackupState expectedState) throws IOException {
    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
      BackupAdmin backupAdmin = new BackupAdminImpl(connection)) {
      BackupInfo backupInfo = backupAdmin.getBackupInfo(backupId);

      // Verify the backup as tracked by HBase
      assertEquals(backupId, backupInfo.getBackupId());
      assertEquals(expectedState, backupInfo.getState());
      assertEquals(expectedType, backupInfo.getType());
      return backupInfo;
    }
  }

  private void enableBackup(Configuration conf) {
    // Enable backup
    conf.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true);
    BackupManager.decorateMasterConfiguration(conf);
    BackupManager.decorateRegionServerConfiguration(conf);
  }

}

hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java

Lines changed: 8 additions & 0 deletions
@@ -21,15 +21,18 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.OptionalLong;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -78,6 +81,7 @@ private static class HFileRecordReader extends RecordReader<NullWritable, Cell>
     private Cell value = null;
     private long count;
     private boolean seeked = false;
+    private OptionalLong bulkloadSeqId;

     @Override
     public void initialize(InputSplit split, TaskAttemptContext context)
@@ -88,6 +92,7 @@ public void initialize(InputSplit split, TaskAttemptContext context)
       FileSystem fs = path.getFileSystem(conf);
       LOG.info("Initialize HFileRecordReader for {}", path);
       this.in = HFile.createReader(fs, path, conf);
+      this.bulkloadSeqId = StoreFileInfo.getBulkloadSeqId(path);

       // The file info must be loaded before the scanner can be used.
       // This seems like a bug in HBase, but it's easily worked around.
@@ -109,6 +114,9 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
         return false;
       }
       value = scanner.getCell();
+      if (value != null && bulkloadSeqId.isPresent()) {
+        PrivateCellUtil.setSequenceId(value, bulkloadSeqId.getAsLong());
+      }
       count++;
       return true;
     }
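HFileInputFormat recovers the bulk-load sequenceId through new StoreFileInfo helpers. The StoreFileInfo diff is one of the 7 changed files but is not reproduced in this capture, so the following is only a hedged sketch: the method names come from the call sites in this commit, and the behaviour is inferred from the parsing logic removed from HStoreFile (last section below).

import java.util.OptionalLong;
import org.apache.hadoop.fs.Path;

// Hypothetical stand-ins for the StoreFileInfo helpers added by this commit.
public final class BulkloadSeqIdHelpersSketch {
  public static String formatBulkloadSeqId(long seqId) {
    return "_SeqId_" + seqId + "_";
  }

  public static boolean hasBulkloadSeqId(Path path) {
    return path.getName().contains("SeqId_");
  }

  public static OptionalLong getBulkloadSeqId(Path path) {
    String fileName = path.getName();
    // lastIndexOf() picks the most recent bulk-load seqId, mirroring the logic
    // previously inlined in HStoreFile#open.
    int startPos = fileName.lastIndexOf("SeqId_");
    if (startPos == -1) {
      return OptionalLong.empty();
    }
    int from = startPos + "SeqId_".length();
    return OptionalLong.of(Long.parseLong(fileName.substring(from, fileName.indexOf('_', from))));
  }
}

Under these assumptions, formatBulkloadSeqId(42) yields "_SeqId_42_", which HRegionFileSystem (below) appends to the committed store file name, and getBulkloadSeqId recovers 42 from that name.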

hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java

Lines changed: 1 addition & 1 deletion
@@ -165,7 +165,7 @@ protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix)
    * package-private for internal usage for jobs like WALPlayer which need to use features of
    * ExtendedCell.
    */
-  static final String EXTENDED_CELL_SERIALIZATION_ENABLED_KEY =
+  public static final String EXTENDED_CELL_SERIALIZATION_ENABLED_KEY =
     "hbase.mapreduce.hfileoutputformat.extendedcell.enabled";
   static final boolean EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT = false;
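Making the key public lets jobs outside the hbase-mapreduce package, such as MapReduceHFileSplitterJob above, opt in. A minimal sketch of such a job setup (the class and job names are illustrative, not from the commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.mapreduce.Job;

public class ExtendedCellJobSetup {
  public static Job createJob() throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "hfile-splitting-job"); // illustrative name
    // Carry sequenceIds through the shuffle so CellSortReducer can order cells
    // that tie on row, column, and timestamp.
    job.getConfiguration().setBoolean(HFileOutputFormat2.EXTENDED_CELL_SERIALIZATION_ENABLED_KEY,
      true);
    return job;
  }
}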

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java

Lines changed: 1 addition & 1 deletion
@@ -490,7 +490,7 @@ private Path preCommitStoreFile(final String familyName, final Path buildPath, f

     String name = buildPath.getName();
     if (generateNewName) {
-      name = generateUniqueName((seqNum < 0) ? null : "_SeqId_" + seqNum + "_");
+      name = generateUniqueName((seqNum < 0) ? null : StoreFileInfo.formatBulkloadSeqId(seqNum));
     }
     Path dstPath = new Path(storeDir, name);
     if (!fs.exists(buildPath)) {

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java

Lines changed: 6 additions & 13 deletions
@@ -329,13 +329,8 @@ public byte[] getMetadataValue(byte[] key) {

   @Override
   public boolean isBulkLoadResult() {
-    boolean bulkLoadedHFile = false;
-    String fileName = this.getPath().getName();
-    int startPos = fileName.indexOf("SeqId_");
-    if (startPos != -1) {
-      bulkLoadedHFile = true;
-    }
-    return bulkLoadedHFile || (metadataMap != null && metadataMap.containsKey(BULKLOAD_TIME_KEY));
+    return StoreFileInfo.hasBulkloadSeqId(this.getPath())
+      || (metadataMap != null && metadataMap.containsKey(BULKLOAD_TIME_KEY));
   }

   public boolean isCompactedAway() {
@@ -415,17 +410,15 @@ private void open() throws IOException {
     if (isBulkLoadResult()) {
       // generate the sequenceId from the fileName
       // fileName is of the form <randomName>_SeqId_<id-when-loaded>_
-      String fileName = this.getPath().getName();
-      // Use lastIndexOf() to get the last, most recent bulk load seqId.
-      int startPos = fileName.lastIndexOf("SeqId_");
-      if (startPos != -1) {
-        this.sequenceid =
-          Long.parseLong(fileName.substring(startPos + 6, fileName.indexOf('_', startPos + 6)));
+      OptionalLong sequenceId = StoreFileInfo.getBulkloadSeqId(this.getPath());
+      if (sequenceId.isPresent()) {
+        this.sequenceid = sequenceId.getAsLong();
         // Handle reference files as done above.
         if (fileInfo.isTopReference()) {
           this.sequenceid += 1;
         }
       }
+
       // SKIP_RESET_SEQ_ID only works in bulk loaded file.
       // In mob compaction, the hfile where the cells contain the path of a new mob file is bulk
       // loaded to hbase, these cells have the same seqIds with the old ones. We do not want
