@@ -72,6 +72,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
7272 private static final Logger LOG = LoggerFactory .getLogger (DefaultMobStoreCompactor .class );
7373 protected long mobSizeThreshold ;
7474 protected HMobStore mobStore ;
75+ protected boolean ioOptimizedMode = false ;
7576
7677 /*
7778 * MOB file reference set thread local variable. It contains set of a MOB file names, which newly
@@ -99,15 +100,15 @@ protected Boolean initialValue() {
99100 };
100101
101102 /*
102- * Map : MOB file name - file length Can be expensive for large amount of MOB files?
103+ * Map : MOB file name - file length Can be expensive for large amount of MOB files.
103104 */
104105 static ThreadLocal <HashMap <String , Long >> mobLengthMap =
105- new ThreadLocal <HashMap <String , Long >>() {
106- @ Override
107- protected HashMap <String , Long > initialValue () {
108- return new HashMap <String , Long >();
109- }
110- };
106+ new ThreadLocal <HashMap <String , Long >>() {
107+ @ Override
108+ protected HashMap <String , Long > initialValue () {
109+ return new HashMap <String , Long >();
110+ }
111+ };
111112
112113 private final InternalScannerFactory scannerFactory = new InternalScannerFactory () {
113114
@@ -145,34 +146,45 @@ public DefaultMobStoreCompactor(Configuration conf, HStore store) {
145146 if (!(store instanceof HMobStore )) {
146147 throw new IllegalArgumentException ("The store " + store + " is not a HMobStore" );
147148 }
148- mobStore = (HMobStore ) store ;
149- mobSizeThreshold = store .getColumnFamilyDescriptor ().getMobThreshold ();
149+ this .mobStore = (HMobStore ) store ;
150+ this .mobSizeThreshold = store .getColumnFamilyDescriptor ().getMobThreshold ();
151+ this .ioOptimizedMode = conf .get (MobConstants .MOB_COMPACTION_TYPE_KEY ,
152+ MobConstants .DEFAULT_MOB_COMPACTION_TYPE ).
153+ equals (MobConstants .OPTIMIZED_MOB_COMPACTION_TYPE );
154+
150155 }
151156
152157 @ Override
153158 public List <Path > compact (CompactionRequestImpl request ,
154159 ThroughputController throughputController , User user ) throws IOException {
155- LOG .info ("Mob compaction: major=" + request .isMajor () + " isAll=" + request .isAllFiles ()
156- + " priority=" + request .getPriority ());
160+ String tableName = store .getTableName ().toString ();
161+ String regionName = store .getRegionInfo ().getRegionNameAsString ();
162+ String familyName = store .getColumnFamilyName ();
163+ LOG .info ("MOB compaction: major={} isAll={} priority={} throughput controller={}" +
164+ " table={} cf={} region={}" ,
165+ request .isMajor (), request .isAllFiles (), request .getPriority (),
166+ throughputController , tableName , familyName , regionName );
157167 if (request .getPriority () == HStore .PRIORITY_USER ) {
158168 userRequest .set (Boolean .TRUE );
159169 } else {
160170 userRequest .set (Boolean .FALSE );
161171 }
162- LOG .info ("Mob compaction files: " + request .getFiles ());
172+ LOG .debug ("MOB compaction table={} cf={} region={} files: " , tableName , familyName ,
173+ regionName , request .getFiles ());
163174 // Check if I/O optimized MOB compaction
164- if (conf .get (MobConstants .MOB_COMPACTION_TYPE_KEY , MobConstants .DEFAULT_MOB_COMPACTION_TYPE )
165- .equals (MobConstants .IO_OPTIMIZED_MOB_COMPACTION_TYPE )) {
175+ if (ioOptimizedMode ) {
166176 if (request .isMajor () && request .getPriority () == HStore .PRIORITY_USER ) {
167177 Path mobDir =
168178 MobUtils .getMobFamilyPath (conf , store .getTableName (), store .getColumnFamilyName ());
169179 List <Path > mobFiles = MobUtils .getReferencedMobFiles (request .getFiles (), mobDir );
170180 if (mobFiles .size () > 0 ) {
171181 calculateMobLengthMap (mobFiles );
172182 }
173- LOG .info ("I/O optimized MOB compaction. Total referenced MOB files: {}" , mobFiles .size ());
183+ LOG .info ("Table={} cf={} region={}. I/O optimized MOB compaction. " +
184+ "Total referenced MOB files: {}" , tableName , familyName , regionName , mobFiles .size ());
174185 }
175186 }
187+
176188 return compact (request , scannerFactory , writerFactory , throughputController , user );
177189 }
178190
@@ -183,7 +195,7 @@ private void calculateMobLengthMap(List<Path> mobFiles) throws IOException {
183195 for (Path p : mobFiles ) {
184196 FileStatus st = fs .getFileStatus (p );
185197 long size = st .getLen ();
186- LOG .info ( "Ref MOB file={} size={}" , p , size );
198+ LOG .debug ( "Referenced MOB file={} size={}" , p , size );
187199 map .put (p .getName (), fs .getFileStatus (p ).getLen ());
188200 }
189201 }
@@ -234,20 +246,18 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
234246 mobRefSet .get ().clear ();
235247 boolean isUserRequest = userRequest .get ();
236248 boolean compactMOBs = major && isUserRequest ;
237- boolean ioOptimizedMode =
238- conf .get (MobConstants .MOB_COMPACTION_TYPE_KEY , MobConstants .DEFAULT_MOB_COMPACTION_TYPE )
239- .equals (MobConstants .IO_OPTIMIZED_MOB_COMPACTION_TYPE );
240-
241249 boolean discardMobMiss = conf .getBoolean (MobConstants .MOB_UNSAFE_DISCARD_MISS_KEY ,
242250 MobConstants .DEFAULT_MOB_DISCARD_MISS );
243-
251+ if (discardMobMiss ) {
252+ LOG .warn ("{}=true. This is unsafe setting recommended only" +
253+ " during upgrade process from MOB 1.0 to MOB 2.0 versions." ,
254+ MobConstants .MOB_UNSAFE_DISCARD_MISS_KEY );
255+ }
244256 long maxMobFileSize = conf .getLong (MobConstants .MOB_COMPACTION_MAX_FILE_SIZE_KEY ,
245257 MobConstants .DEFAULT_MOB_COMPACTION_MAX_FILE_SIZE );
246- LOG .info ("Compact MOB={} optimized={} maximum MOB file size={} major={}" , compactMOBs ,
247- ioOptimizedMode , maxMobFileSize , major );
248-
258+ LOG .info ("Compact MOB={} optimized={} maximum MOB file size={} major={} store={}" , compactMOBs ,
259+ ioOptimizedMode , maxMobFileSize , major , getStoreInfo ());
249260 FileSystem fs = FileSystem .get (conf );
250-
251261 // Since scanner.next() can return 'false' but still be delivering data,
252262 // we have to use a do/while loop.
253263 List <Cell > cells = new ArrayList <>();
@@ -298,19 +308,20 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
298308 mobCell = mobStore .resolve (c , true , false ).getCell ();
299309 } catch (FileNotFoundException fnfe ) {
300310 if (discardMobMiss ) {
301- LOG .debug ("Missing MOB cell: file={} not found cell={}" , pp , c );
311+ LOG .error ("Missing MOB cell: file={} not found cell={}" , fName , c );
302312 continue ;
303313 } else {
304314 throw fnfe ;
305315 }
306316 }
307317
308318 if (discardMobMiss && mobCell .getValueLength () == 0 ) {
309- LOG .error ("Missing MOB cell value: file=" + pp + " cell=" + mobCell );
319+ LOG .error ("Missing MOB cell value: file={} cell={}" , pp , mobCell );
310320 continue ;
311321 } else if (mobCell .getValueLength () == 0 ) {
312- // TODO: what to do here? This is data corruption?
313- LOG .warn ("Found 0 length MOB cell in a file={} cell={}" , pp , mobCell );
322+ String errMsg = String .format ("Found 0 length MOB cell in a file=%s cell=%s" ,
323+ fName , mobCell );
324+ throw new IOException (errMsg );
314325 }
315326
316327 if (mobCell .getValueLength () > mobSizeThreshold ) {
@@ -329,8 +340,8 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
329340 if (size == null ) {
330341 // FATAL error, abort compaction
331342 String msg = String .format (
332- "Found unreferenced MOB file during compaction %s, aborting." , fName );
333- LOG . error ( msg );
343+ "Found unreferenced MOB file during compaction %s, aborting compaction %s" ,
344+ fName , getStoreInfo () );
334345 throw new IOException (msg );
335346 }
336347 // Can not be null
@@ -344,11 +355,10 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
344355 MobUtils .createMobRefCell (mobCell , fileName , this .mobStore .getRefCellTags ()));
345356 // Update total size of the output (we do not take into account
346357 // file compression yet)
347- long len = getLength (mobFileWriter );
348-
358+ long len = mobFileWriter .getPos ();
349359 if (len > maxMobFileSize ) {
350- LOG .debug ("Closing output MOB File, length={} file={}" , len ,
351- Bytes .toString (fileName ));
360+ LOG .debug ("Closing output MOB File, length={} file={}, store= " , len ,
361+ Bytes .toString (fileName ), getStoreInfo () );
352362 commitOrAbortMobWriter (mobFileWriter , fd .maxSeqId , mobCells , major );
353363 mobFileWriter = newMobWriter (fd );
354364 fileName = Bytes .toBytes (mobFileWriter .getPath ().getName ());
@@ -384,7 +394,7 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
384394 if (ioOptimizedMode ) {
385395 // Update total size of the output (we do not take into account
386396 // file compression yet)
387- long len = getLength ( mobFileWriter );
397+ long len = mobFileWriter . getPos ( );
388398 if (len > maxMobFileSize ) {
389399 commitOrAbortMobWriter (mobFileWriter , fd .maxSeqId , mobCells , major );
390400 mobFileWriter = newMobWriter (fd );
@@ -411,9 +421,8 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
411421 // Add MOB reference to a MOB reference set
412422 mobRefSet .get ().add (MobUtils .getMobFileName (c ));
413423 } else {
414- // TODO ????
415- LOG .error ("Corrupted MOB reference: " + c );
416- writer .append (c );
424+ String errMsg = String .format ("Corrupted MOB reference: %s" , c .toString ());
425+ throw new IOException (errMsg );
417426 }
418427 } else if (c .getValueLength () <= mobSizeThreshold ) {
419428 // If the value size of a cell is not larger than the threshold, directly write it to
@@ -431,7 +440,7 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
431440 cellsCountCompactedToMob ++;
432441 cellsSizeCompactedToMob += c .getValueLength ();
433442 if (ioOptimizedMode ) {
434- long len = getLength ( mobFileWriter );
443+ long len = mobFileWriter . getPos ( );
435444 if (len > maxMobFileSize ) {
436445 commitOrAbortMobWriter (mobFileWriter , fd .maxSeqId , mobCells , major );
437446 mobFileWriter = newMobWriter (fd );
@@ -486,8 +495,9 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
486495 throw new InterruptedIOException (
487496 "Interrupted while control throughput of compacting " + compactionName );
488497 } catch (IOException t ) {
489- LOG .error ("Mob compaction failed for region:{} " , store .getRegionInfo ().getEncodedName ());
490- throw t ;
498+ String msg = "Mob compaction failed for region: " +
499+ store .getRegionInfo ().getEncodedName ();
500+ throw new IOException (msg , t );
491501 } finally {
492502 // Clone last cell in the final because writer will append last cell when committing. If
493503 // don't clone here and once the scanner get closed, then the memory of last cell will be
@@ -498,15 +508,15 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
498508 // Remove all MOB references because compaction failed
499509 mobRefSet .get ().clear ();
500510 // Abort writer
501- LOG .debug ("Aborting writer for {} because of a compaction failure" ,
502- mobFileWriter .getPath ());
511+ LOG .debug ("Aborting writer for {} because of a compaction failure, Store {} " ,
512+ mobFileWriter .getPath (), getStoreInfo () );
503513 abortWriter (mobFileWriter );
504514 }
505515 }
506516
507517 // Commit last MOB writer
508518 commitOrAbortMobWriter (mobFileWriter , fd .maxSeqId , mobCells , major );
509-
519+ clearThreadLocals ();
510520 mobStore .updateCellsCountCompactedFromMob (cellsCountCompactedFromMob );
511521 mobStore .updateCellsCountCompactedToMob (cellsCountCompactedToMob );
512522 mobStore .updateCellsSizeCompactedFromMob (cellsSizeCompactedFromMob );
@@ -515,11 +525,20 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
515525 return true ;
516526 }
517527
518- private long getLength (StoreFileWriter mobFileWriter ) throws IOException {
519- return mobFileWriter .getPos ();
528+ private String getStoreInfo () {
529+ return String .format ("[table=%s family=%s region=%s]" , store .getTableName ().getNameAsString (),
530+ store .getColumnFamilyName (), store .getRegionInfo ().getEncodedName ()) ;
520531 }
521532
522- private StoreFileWriter newMobWriter (FileDetails fd /* , boolean compactMOBs */ )
533+ private void clearThreadLocals () {
534+ Set <String > set = mobRefSet .get ();
535+ if (set != null ) set .clear ();
536+ HashMap <String , Long > map = mobLengthMap .get ();
537+ if (map != null ) map .clear ();
538+ }
539+
540+
541+ private StoreFileWriter newMobWriter (FileDetails fd )
523542 throws IOException {
524543 try {
525544 StoreFileWriter mobFileWriter = mobStore .createWriterInTmp (new Date (fd .latestPutTs ),
@@ -530,8 +549,8 @@ private StoreFileWriter newMobWriter(FileDetails fd/* , boolean compactMOBs */)
530549 return mobFileWriter ;
531550 } catch (IOException e ) {
532551 // Bailing out
533- LOG . error ("Failed to create mob writer, " , e );
534- throw e ;
552+ throw new IOException ( String . format ("Failed to create mob writer, store=%s" ,
553+ getStoreInfo ()), e ) ;
535554 }
536555 }
537556
@@ -544,8 +563,9 @@ private void commitOrAbortMobWriter(StoreFileWriter mobFileWriter, long maxSeqId
544563 // become orphans and will be deleted during next MOB cleaning chore cycle
545564
546565 if (mobFileWriter != null ) {
547- LOG .info ("Commit or abort size={} mobCells={} major={} file={}" , mobFileWriter .getPos (),
548- mobCells , major , mobFileWriter .getPath ().getName ());
566+ LOG .debug ("Commit or abort size={} mobCells={} major={} file={}, store={}" ,
567+ mobFileWriter .getPos (), mobCells , major , mobFileWriter .getPath ().getName (),
568+ getStoreInfo ());
549569 Path path =
550570 MobUtils .getMobFamilyPath (conf , store .getTableName (), store .getColumnFamilyName ());
551571 if (mobCells > 0 ) {
@@ -555,20 +575,18 @@ private void commitOrAbortMobWriter(StoreFileWriter mobFileWriter, long maxSeqId
555575 mobStore .commitFile (mobFileWriter .getPath (), path );
556576 } else {
557577 // If the mob file is empty, delete it instead of committing.
558- LOG .debug ("Aborting writer for {} because there are no MOB cells" , mobFileWriter .getPath ());
578+ LOG .debug ("Aborting writer for {} because there are no MOB cells, store={}" ,
579+ mobFileWriter .getPath (), getStoreInfo ());
559580 // Remove MOB file from reference set
560581 mobRefSet .get ().remove (mobFileWriter .getPath ().getName ());
561582 abortWriter (mobFileWriter );
562583 }
563584 } else {
564- LOG .info ("Mob file writer is null, skipping commit/abort." );
585+ LOG .debug ("Mob file writer is null, skipping commit/abort, store=" ,
586+ getStoreInfo ());
565587 }
566588 }
567589
568- protected static String createKey (TableName tableName , String encodedName ,
569- String columnFamilyName ) {
570- return tableName .getNameAsString () + "_" + encodedName + "_" + columnFamilyName ;
571- }
572590
573591 @ Override
574592 protected List <Path > commitWriter (StoreFileWriter writer , FileDetails fd ,
0 commit comments