@@ -143,19 +143,22 @@ public void setup(Context context) throws IOException {
143143 * A mapper that just writes out Cells. This one can be used together with {@link CellSortReducer}
144144 */
145145 static class WALCellMapper extends Mapper <WALKey , WALEdit , ImmutableBytesWritable , Cell > {
146- private byte [] table ;
146+ private Set <String > tableSet = new HashSet <>();
147+ private boolean multiTableSupport = false ;
147148
148149 @ Override
149150 public void map (WALKey key , WALEdit value , Context context ) throws IOException {
150151 try {
151- // skip all other tables
152- if (Bytes . equals (table , key . getTableName (). getName ())) {
152+ TableName table = key . getTableName ();
153+ if (tableSet . contains (table . getNameAsString ())) {
153154 for (Cell cell : value .getCells ()) {
154155 if (WALEdit .isMetaEditFamily (cell )) {
155156 continue ;
156157 }
157- context .write (new ImmutableBytesWritable (CellUtil .cloneRow (cell )),
158- new MapReduceExtendedCell (cell ));
158+ byte [] outKey = multiTableSupport
159+ ? Bytes .add (table .getName (), Bytes .toBytes (tableSeparator ), CellUtil .cloneRow (cell ))
160+ : CellUtil .cloneRow (cell );
161+ context .write (new ImmutableBytesWritable (outKey ), new MapReduceExtendedCell (cell ));
159162 }
160163 }
161164 } catch (InterruptedException e ) {
@@ -165,13 +168,12 @@ public void map(WALKey key, WALEdit value, Context context) throws IOException {
165168
166169 @ Override
167170 public void setup (Context context ) throws IOException {
168- // only a single table is supported when HFiles are generated with HFileOutputFormat
169- String [] tables = context . getConfiguration () .getStrings (TABLES_KEY );
170- if ( tables == null || tables . length != 1 ) {
171- // this can only happen when WALMapper is used directly by a class other than WALPlayer
172- throw new IOException ( "Exactly one table must be specified for bulk HFile case." );
171+ Configuration conf = context . getConfiguration ();
172+ String [] tables = conf .getStrings (TABLES_KEY );
173+ this . multiTableSupport = conf . getBoolean ( MULTI_TABLES_SUPPORT , false );
174+ for ( String table : tables ) {
175+ tableSet . add ( table );
173176 }
174- table = Bytes .toBytes (tables [0 ]);
175177 }
176178 }
177179
@@ -363,7 +365,7 @@ public Job createSubmittableJob(String[] args) throws IOException {
363365 FileOutputFormat .setOutputPath (job , outputDir );
364366 job .setMapOutputValueClass (KeyValue .class );
365367 try (Connection conn = ConnectionFactory .createConnection (conf );) {
366- List <TableInfo > tableInfoList = new ArrayList <TableInfo >();
368+ List <TableInfo > tableInfoList = new ArrayList <>();
367369 for (TableName tableName : tableNames ) {
368370 Table table = conn .getTable (tableName );
369371 RegionLocator regionLocator = conn .getRegionLocator (tableName );
0 commit comments