|
30 | 30 | import java.nio.charset.Charset; |
31 | 31 | import java.util.ArrayList; |
32 | 32 | import java.util.Arrays; |
| 33 | +import java.util.Collections; |
33 | 34 | import java.util.List; |
34 | 35 | import java.util.Map; |
35 | 36 | import java.util.Map.Entry; |
@@ -159,6 +160,15 @@ protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) |
159 | 160 | static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY = |
160 | 161 | "hbase.mapreduce.use.multi.table.hfileoutputformat"; |
161 | 162 |
|
| 163 | + /** |
| 164 | + * ExtendedCell and ExtendedCellSerialization are InterfaceAudience.Private. We expose this config |
| 165 | + * package-private for internal usage for jobs like WALPlayer which need to use features of |
| 166 | + * ExtendedCell. |
| 167 | + */ |
| 168 | + static final String EXTENDED_CELL_SERIALIZATION_ENABLED_KEY = |
| 169 | + "hbase.mapreduce.hfileoutputformat.extendedcell.enabled"; |
| 170 | + static final boolean EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT = false; |
| 171 | + |
162 | 172 | public static final String REMOTE_CLUSTER_CONF_PREFIX = "hbase.hfileoutputformat.remote.cluster."; |
163 | 173 | public static final String REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY = |
164 | 174 | REMOTE_CLUSTER_CONF_PREFIX + "zookeeper.quorum"; |
@@ -619,13 +629,12 @@ static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo, |
619 | 629 | LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass()); |
620 | 630 | } |
621 | 631 |
|
622 | | - // Order matters here. Hadoop's SerializationFactory runs through serializations in the order |
623 | | - // they are registered. Register ExtendedCellSerialization before CellSerialization because both |
624 | | - // work for ExtendedCells but ExtendedCellSerialization handles them properly. |
625 | 632 | conf.setStrings("io.serializations", conf.get("io.serializations"), |
626 | 633 | MutationSerialization.class.getName(), ResultSerialization.class.getName(), |
627 | 634 | ExtendedCellSerialization.class.getName(), CellSerialization.class.getName()); |
628 | 635 |
|
| 636 | + mergeSerializations(conf); |
| 637 | + |
629 | 638 | if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) { |
630 | 639 | LOG.info("bulkload locality sensitive enabled"); |
631 | 640 | } |
@@ -673,6 +682,33 @@ static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo, |
673 | 682 | LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ",")); |
674 | 683 | } |
675 | 684 |
|
| 685 | + private static void mergeSerializations(Configuration conf) { |
| 686 | + List<String> serializations = new ArrayList<>(); |
| 687 | + |
| 688 | + // add any existing values that have been set |
| 689 | + String[] existing = conf.getStrings("io.serializations"); |
| 690 | + if (existing != null) { |
| 691 | + Collections.addAll(serializations, existing); |
| 692 | + } |
| 693 | + |
| 694 | + serializations.add(MutationSerialization.class.getName()); |
| 695 | + serializations.add(ResultSerialization.class.getName()); |
| 696 | + |
| 697 | + // Add ExtendedCellSerialization, if configured. Order matters here. Hadoop's |
| 698 | + // SerializationFactory runs through serializations in the order they are registered. |
| 699 | + // We want to register ExtendedCellSerialization before CellSerialization because both |
| 700 | + // work for ExtendedCells but only ExtendedCellSerialization handles them properly. |
| 701 | + if ( |
| 702 | + conf.getBoolean(EXTENDED_CELL_SERIALIZATION_ENABLED_KEY, |
| 703 | + EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT) |
| 704 | + ) { |
| 705 | + serializations.add(ExtendedCellSerialization.class.getName()); |
| 706 | + } |
| 707 | + serializations.add(CellSerialization.class.getName()); |
| 708 | + |
| 709 | + conf.setStrings("io.serializations", serializations.toArray(new String[0])); |
| 710 | + } |
| 711 | + |
676 | 712 | public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor) |
677 | 713 | throws IOException { |
678 | 714 | Configuration conf = job.getConfiguration(); |
|
0 commit comments