Skip to content

Commit 0a24178

Browse files
author
Ramkrishna
committed
HBASE-18649 Deprecate KV Usage in MR to move to Cells in 3.0 (ram)
1 parent 16d483f commit 0a24178

20 files changed

Lines changed: 638 additions & 155 deletions

File tree

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java

Lines changed: 12 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,29 +24,29 @@
2424
import org.apache.hadoop.conf.Configuration;
2525
import org.apache.hadoop.conf.Configured;
2626
import org.apache.hadoop.fs.Path;
27+
import org.apache.hadoop.hbase.Cell;
2728
import org.apache.hadoop.hbase.CellUtil;
2829
import org.apache.hadoop.hbase.HBaseConfiguration;
29-
import org.apache.hadoop.hbase.KeyValue;
30-
import org.apache.hadoop.hbase.KeyValue.Type;
3130
import org.apache.hadoop.hbase.TableName;
32-
import org.apache.yetus.audience.InterfaceAudience;
3331
import org.apache.hadoop.hbase.client.Connection;
3432
import org.apache.hadoop.hbase.client.ConnectionFactory;
3533
import org.apache.hadoop.hbase.client.RegionLocator;
3634
import org.apache.hadoop.hbase.client.Table;
3735
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
36+
import org.apache.hadoop.hbase.mapreduce.CellSortReducer;
3837
import org.apache.hadoop.hbase.mapreduce.HFileInputFormat;
3938
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
40-
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
4139
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
4240
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
41+
import org.apache.hadoop.hbase.util.MapReduceCell;
4342
import org.apache.hadoop.io.NullWritable;
4443
import org.apache.hadoop.mapreduce.Job;
4544
import org.apache.hadoop.mapreduce.Mapper;
4645
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
4746
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
4847
import org.apache.hadoop.util.Tool;
4948
import org.apache.hadoop.util.ToolRunner;
49+
import org.apache.yetus.audience.InterfaceAudience;
5050

5151
/**
5252
* A tool to split HFiles into new region boundaries as a MapReduce job. The tool generates HFiles
@@ -70,24 +70,15 @@ protected MapReduceHFileSplitterJob(final Configuration c) {
7070

7171
/**
7272
* A mapper that just writes out cells. This one can be used together with
73-
* {@link KeyValueSortReducer}
73+
* {@link CellSortReducer}
7474
*/
7575
static class HFileCellMapper extends
76-
Mapper<NullWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
76+
Mapper<NullWritable, Cell, ImmutableBytesWritable, Cell> {
7777

7878
@Override
79-
public void map(NullWritable key, KeyValue value, Context context) throws IOException,
80-
InterruptedException {
81-
// Convert value to KeyValue if subclass
82-
if (!value.getClass().equals(KeyValue.class)) {
83-
value =
84-
new KeyValue(value.getRowArray(), value.getRowOffset(), value.getRowLength(),
85-
value.getFamilyArray(), value.getFamilyOffset(), value.getFamilyLength(),
86-
value.getQualifierArray(), value.getQualifierOffset(), value.getQualifierLength(),
87-
value.getTimestamp(), Type.codeToType(value.getTypeByte()), value.getValueArray(),
88-
value.getValueOffset(), value.getValueLength());
89-
}
90-
context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)), value);
79+
public void map(NullWritable key, Cell value, Context context)
80+
throws IOException, InterruptedException {
81+
context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)), new MapReduceCell(value));
9182
}
9283

9384
@Override
@@ -119,14 +110,14 @@ public Job createSubmittableJob(String[] args) throws IOException {
119110
LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);
120111
TableName tableName = TableName.valueOf(tabName);
121112
job.setMapperClass(HFileCellMapper.class);
122-
job.setReducerClass(KeyValueSortReducer.class);
113+
job.setReducerClass(CellSortReducer.class);
123114
Path outputDir = new Path(hfileOutPath);
124115
FileOutputFormat.setOutputPath(job, outputDir);
125-
job.setMapOutputValueClass(KeyValue.class);
116+
job.setMapOutputValueClass(MapReduceCell.class);
126117
try (Connection conn = ConnectionFactory.createConnection(conf);
127118
Table table = conn.getTable(tableName);
128119
RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
129-
HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
120+
HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
130121
}
131122
LOG.debug("success configuring load incremental job");
132123

hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java

Lines changed: 72 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import static org.apache.hadoop.hbase.KeyValue.getDelimiter;
2525
import static org.apache.hadoop.hbase.KeyValue.COLUMN_FAMILY_DELIM_ARRAY;
2626

27+
import java.io.DataOutput;
2728
import java.io.DataOutputStream;
2829
import java.io.IOException;
2930
import java.io.OutputStream;
@@ -1465,9 +1466,12 @@ public static boolean isDeleteColumnOrFamily(Cell cell) {
14651466
}
14661467

14671468
/**
1468-
* Estimate based on keyvalue's serialization format.
1469+
* Estimate based on keyvalue's serialization format in the RPC layer. Note that there is an extra
1470+
* SIZEOF_INT added to the size here that indicates the actual length of the cell for cases where
1471+
* cells are serialized in a contiguous format (e.g., in RPCs).
14691472
* @param cell
1470-
* @return Estimate of the <code>cell</code> size in bytes.
1473+
* @return Estimate of the <code>cell</code> size in bytes plus an extra SIZEOF_INT indicating the
1474+
* actual cell length.
14711475
*/
14721476
public static int estimatedSerializedSizeOf(final Cell cell) {
14731477
if (cell instanceof ExtendedCell) {
@@ -1764,7 +1768,7 @@ public static boolean updateLatestStamp(Cell cell, byte[] ts, int tsOffset) thro
17641768
* @param out
17651769
* @throws IOException
17661770
*/
1767-
public static void writeFlatKey(Cell cell, DataOutputStream out) throws IOException {
1771+
public static void writeFlatKey(Cell cell, DataOutput out) throws IOException {
17681772
short rowLen = cell.getRowLength();
17691773
byte fLen = cell.getFamilyLength();
17701774
int qLen = cell.getQualifierLength();
@@ -1790,6 +1794,69 @@ public static void writeFlatKey(Cell cell, DataOutputStream out) throws IOExcept
17901794
out.writeByte(cell.getTypeByte());
17911795
}
17921796

1797+
/**
1798+
* Deep clones the given cell if the cell supports deep cloning
1799+
* @param cell the cell to be cloned
1800+
* @return the cloned cell
1801+
* @throws CloneNotSupportedException
1802+
*/
1803+
public static Cell deepClone(Cell cell) throws CloneNotSupportedException {
1804+
if (cell instanceof ExtendedCell) {
1805+
return ((ExtendedCell) cell).deepClone();
1806+
}
1807+
throw new CloneNotSupportedException();
1808+
}
1809+
1810+
/**
1811+
* Writes the cell to the given OutputStream
1812+
* @param cell the cell to be written
1813+
* @param out the outputstream
1814+
* @param withTags if tags are to be written or not
1815+
* @return the total bytes written
1816+
* @throws IOException
1817+
*/
1818+
public static int writeCell(Cell cell, OutputStream out, boolean withTags) throws IOException {
1819+
if (cell instanceof ExtendedCell) {
1820+
return ((ExtendedCell) cell).write(out, withTags);
1821+
} else {
1822+
ByteBufferUtils.putInt(out, CellUtil.estimatedSerializedSizeOfKey(cell));
1823+
ByteBufferUtils.putInt(out, cell.getValueLength());
1824+
CellUtil.writeFlatKey(cell, out);
1825+
CellUtil.writeValue(out, cell, cell.getValueLength());
1826+
int tagsLength = cell.getTagsLength();
1827+
if (withTags) {
1828+
byte[] len = new byte[Bytes.SIZEOF_SHORT];
1829+
Bytes.putAsShort(len, 0, tagsLength);
1830+
out.write(len);
1831+
if (tagsLength > 0) {
1832+
CellUtil.writeTags(out, cell, tagsLength);
1833+
}
1834+
}
1835+
int lenWritten = (2 * Bytes.SIZEOF_INT) + CellUtil.estimatedSerializedSizeOfKey(cell)
1836+
+ cell.getValueLength();
1837+
if (withTags) {
1838+
lenWritten += Bytes.SIZEOF_SHORT + tagsLength;
1839+
}
1840+
return lenWritten;
1841+
}
1842+
}
1843+
1844+
/**
1845+
* Writes a cell to the buffer at the given offset
1846+
* @param cell the cell to be written
1847+
* @param buf the buffer to which the cell has to be wrriten
1848+
* @param offset the offset at which the cell should be written
1849+
*/
1850+
public static void writeCellToBuffer(Cell cell, ByteBuffer buf, int offset) {
1851+
if (cell instanceof ExtendedCell) {
1852+
((ExtendedCell) cell).write(buf, offset);
1853+
} else {
1854+
// Using the KVUtil
1855+
byte[] bytes = KeyValueUtil.copyToNewByteArray(cell);
1856+
ByteBufferUtils.copyFromArrayToBuffer(buf, offset, bytes, 0, bytes.length);
1857+
}
1858+
}
1859+
17931860
public static int writeFlatKey(Cell cell, OutputStream out) throws IOException {
17941861
short rowLen = cell.getRowLength();
17951862
byte fLen = cell.getFamilyLength();
@@ -1844,7 +1911,7 @@ public static void writeRow(OutputStream out, Cell cell, short rlength) throws I
18441911
public static void writeRowSkippingBytes(DataOutputStream out, Cell cell, short rlength,
18451912
int commonPrefix) throws IOException {
18461913
if (cell instanceof ByteBufferCell) {
1847-
ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getRowByteBuffer(),
1914+
ByteBufferUtils.copyBufferToStream((DataOutput)out, ((ByteBufferCell) cell).getRowByteBuffer(),
18481915
((ByteBufferCell) cell).getRowPosition() + commonPrefix, rlength - commonPrefix);
18491916
} else {
18501917
out.write(cell.getRowArray(), cell.getRowOffset() + commonPrefix, rlength - commonPrefix);
@@ -1894,7 +1961,7 @@ public static void writeQualifier(OutputStream out, Cell cell, int qlength)
18941961
public static void writeQualifierSkippingBytes(DataOutputStream out, Cell cell,
18951962
int qlength, int commonPrefix) throws IOException {
18961963
if (cell instanceof ByteBufferCell) {
1897-
ByteBufferUtils.copyBufferToStream(out, ((ByteBufferCell) cell).getQualifierByteBuffer(),
1964+
ByteBufferUtils.copyBufferToStream((DataOutput)out, ((ByteBufferCell) cell).getQualifierByteBuffer(),
18981965
((ByteBufferCell) cell).getQualifierPosition() + commonPrefix, qlength - commonPrefix);
18991966
} else {
19001967
out.write(cell.getQualifierArray(), cell.getQualifierOffset() + commonPrefix,

hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.apache.hadoop.hbase.io.encoding;
1818

1919
import java.io.DataInputStream;
20+
import java.io.DataOutput;
2021
import java.io.DataOutputStream;
2122
import java.io.IOException;
2223
import java.nio.ByteBuffer;
@@ -262,7 +263,7 @@ private int compressSingleKeyValue(DataOutputStream out, Cell cell, Cell prevCel
262263
ByteBufferUtils.putCompressedInt(out, kLength);
263264
ByteBufferUtils.putCompressedInt(out, vLength);
264265
ByteBufferUtils.putCompressedInt(out, 0);
265-
CellUtil.writeFlatKey(cell, out);
266+
CellUtil.writeFlatKey(cell, (DataOutput)out);
266267
// Write the value part
267268
CellUtil.writeValue(out, cell, cell.getValueLength());
268269
} else {

hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.apache.hadoop.hbase.io.encoding;
1818

1919
import java.io.DataInputStream;
20+
import java.io.DataOutput;
2021
import java.io.DataOutputStream;
2122
import java.io.IOException;
2223
import java.nio.ByteBuffer;
@@ -59,7 +60,7 @@ public int internalEncode(Cell cell, HFileBlockDefaultEncodingContext encodingCo
5960
ByteBufferUtils.putCompressedInt(out, klength);
6061
ByteBufferUtils.putCompressedInt(out, vlength);
6162
ByteBufferUtils.putCompressedInt(out, 0);
62-
CellUtil.writeFlatKey(cell, out);
63+
CellUtil.writeFlatKey(cell, (DataOutput)out);
6364
} else {
6465
// find a common prefix and skip it
6566
int common = CellUtil.findCommonPrefixInFlatKey(cell, state.prevCell, true, true);

hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.io.ByteArrayOutputStream;
2121
import java.io.DataInput;
2222
import java.io.DataInputStream;
23+
import java.io.DataOutput;
2324
import java.io.IOException;
2425
import java.io.InputStream;
2526
import java.io.OutputStream;
@@ -193,6 +194,28 @@ public static void copyBufferToStream(OutputStream out, ByteBuffer in,
193194
}
194195
}
195196

197+
/**
198+
* Copy data from a buffer to an output stream. Does not update the position
199+
* in the buffer.
200+
* @param out the output stream to write bytes to
201+
* @param in the buffer to read bytes from
202+
* @param offset the offset in the buffer (from the buffer's array offset)
203+
* to start copying bytes from
204+
* @param length the number of bytes to copy
205+
*/
206+
public static void copyBufferToStream(DataOutput out, ByteBuffer in, int offset, int length)
207+
throws IOException {
208+
if (out instanceof ByteBufferWriter) {
209+
((ByteBufferWriter) out).write(in, offset, length);
210+
} else if (in.hasArray()) {
211+
out.write(in.array(), in.arrayOffset() + offset, length);
212+
} else {
213+
for (int i = 0; i < length; ++i) {
214+
out.write(toByte(in, offset + i));
215+
}
216+
}
217+
}
218+
196219
public static int putLong(OutputStream out, final long value,
197220
final int fitInBytes) throws IOException {
198221
long tmpValue = value;

0 commit comments

Comments
 (0)