Skip to content

Commit 032b99e

Browse files
HBASE-25839 Bulk Import fails with java.io.IOException: Type mismatch in value from map
1 parent 9932b00 commit 032b99e

2 files changed

Lines changed: 49 additions & 4 deletions

File tree

hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java

Lines changed: 8 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@
2121
import java.io.DataInput;
2222
import java.io.DataInputStream;
2323
import java.io.DataOutput;
24+
import java.io.DataOutputStream;
2425
import java.io.IOException;
2526
import java.lang.reflect.InvocationTargetException;
2627
import java.lang.reflect.Method;
@@ -202,9 +203,12 @@ public CellWritableComparable(Cell kv) {
202203

203204
@Override
204205
public void write(DataOutput out) throws IOException {
205-
out.writeInt(PrivateCellUtil.estimatedSerializedSizeOfKey(kv));
206-
out.writeInt(0);
207-
PrivateCellUtil.writeFlatKey(kv, out);
206+
int keyLen = CellUtil.estimatedSerializedSizeOfKey(kv);
207+
int valueLen = 0; // We avoid writing value here. So just serialize as if an empty value.
208+
out.writeInt(keyLen + valueLen + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE);
209+
out.writeInt(keyLen);
210+
out.writeInt(valueLen);
211+
CellUtil.writeFlatKey(kv, (DataOutputStream) out);
208212
}
209213

210214
@Override
@@ -413,7 +417,7 @@ public void map(ImmutableBytesWritable row, Result value, Context context) throw
413417
// skip if we filtered it out
414418
if (kv == null) continue;
415419
Cell ret = convertKv(kv, cfRenameMap);
416-
context.write(new CellWritableComparable(ret), ret);
420+
context.write(new CellWritableComparable(ret), new MapReduceExtendedCell(ret));
417421
}
418422
}
419423
} catch (InterruptedException e) {

hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java

Lines changed: 41 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -508,6 +508,47 @@ public void testWithFilter() throws Throwable {
508508
importTable.close();
509509
}
510510

511+
/**
512+
* Create a simple table, run an Export Job on it, Import with bulk output and enable largeResult
513+
*/
514+
@Test
515+
public void testBulkImportAndLargeResult() throws Throwable {
516+
// Create simple table to export
517+
TableDescriptor desc = TableDescriptorBuilder
518+
.newBuilder(TableName.valueOf(name.getMethodName()))
519+
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5).build())
520+
.build();
521+
UTIL.getAdmin().createTable(desc);
522+
Table exportTable = UTIL.getConnection().getTable(desc.getTableName());
523+
524+
Put p1 = new Put(ROW1);
525+
p1.addColumn(FAMILYA, QUAL, now, QUAL);
526+
527+
// Having another row would actually test the filter.
528+
Put p2 = new Put(ROW2);
529+
p2.addColumn(FAMILYA, QUAL, now, QUAL);
530+
531+
exportTable.put(Arrays.asList(p1, p2));
532+
533+
// Export the simple table
534+
String[] args = new String[] { name.getMethodName(), FQ_OUTPUT_DIR, "1000" };
535+
assertTrue(runExport(args));
536+
537+
// Import to a new table
538+
final String IMPORT_TABLE = name.getMethodName() + "import";
539+
desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(IMPORT_TABLE))
540+
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5).build())
541+
.build();
542+
UTIL.getAdmin().createTable(desc);
543+
544+
String O_OUTPUT_DIR =
545+
new Path(OUTPUT_DIR + 1).makeQualified(FileSystem.get(UTIL.getConfiguration())).toString();
546+
547+
args = new String[] { "-D" + Import.BULK_OUTPUT_CONF_KEY + "=" + O_OUTPUT_DIR,
548+
"-D" + Import.HAS_LARGE_RESULT + "=" + true, IMPORT_TABLE, FQ_OUTPUT_DIR, "1000" };
549+
assertTrue(runImport(args));
550+
}
551+
511552
/**
512553
* Count the number of keyvalues in the specified table with the given filter
513554
* @param table the table to scan

0 commit comments

Comments (0)