Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ class RegionFlusherAndCompactor implements Closeable {
flushThread.start();
compactExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
.setNameFormat("Procedure-Region-Store-Compactor").setDaemon(true).build());
LOG.info("Constructor flushSize={}, flushPerChanges={}, flushIntervalMs={}, " +
"compactMin=", flushSize, flushPerChanges, flushIntervalMs, compactMin);
}

// inject our flush related configurations
Expand All @@ -130,6 +132,8 @@ static void setupConf(Configuration conf) {
conf.setLong(HRegion.MEMSTORE_FLUSH_PER_CHANGES, flushPerChanges);
long flushIntervalMs = conf.getLong(FLUSH_INTERVAL_MS_KEY, DEFAULT_FLUSH_INTERVAL_MS);
conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, flushIntervalMs);
LOG.info("Injected flushSize={}, flushPerChanges={}, flushIntervalMs={}", flushSize,
flushPerChanges, flushIntervalMs);
}

private void compact() {
Expand Down Expand Up @@ -180,6 +184,7 @@ private void flushLoop() {
changesAfterLastFlush.set(0);
try {
region.flush(true);
lastFlushTime = EnvironmentEdgeManager.currentTime();
} catch (IOException e) {
LOG.error(HBaseMarkers.FATAL, "Failed to flush procedure store region, aborting...", e);
abortable.abort("Failed to flush procedure store region", e);
Expand Down Expand Up @@ -207,8 +212,14 @@ private void flushLoop() {
}

private boolean shouldFlush(long changes) {
return region.getMemStoreHeapSize() + region.getMemStoreOffHeapSize() >= flushSize ||
changes > flushPerChanges;
long heapSize = region.getMemStoreHeapSize();
long offHeapSize = region.getMemStoreOffHeapSize();
boolean flush = heapSize + offHeapSize >= flushSize || changes > flushPerChanges;
if (flush && LOG.isTraceEnabled()) {
LOG.trace("shouldFlush totalMemStoreSize={}, flushSize={}, changes={}, flushPerChanges={}",
heapSize + offHeapSize, flushSize, changes, flushPerChanges);
}
return flush;
}

void onUpdate() {
Expand Down Expand Up @@ -237,4 +248,4 @@ public void close() {
flushThread.interrupt();
compactExecutor.shutdown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ private void tryMigrate(FileSystem fs) throws IOException {
if (!fs.exists(procWALDir)) {
return;
}
LOG.info("The old procedure wal directory {} exists, start migrating", procWALDir);
LOG.info("The old WALProcedureStore wal directory {} exists, migrating...", procWALDir);
WALProcedureStore store = new WALProcedureStore(conf, leaseRecovery);
store.start(numThreads);
store.recoverLease();
Expand Down Expand Up @@ -347,7 +347,7 @@ public void handleCorrupted(ProcedureIterator procIter) throws IOException {
}
}
});
LOG.info("The max pid is {}, and the max pid of all loaded procedures is {}",
LOG.info("The WALProcedureStore max pid is {}, and the max pid of all loaded procedures is {}",
maxProcIdSet.longValue(), maxProcIdFromProcs.longValue());
// Theoretically, the maxProcIdSet should be greater than or equal to maxProcIdFromProcs, but
// anyway, let's do a check here.
Expand All @@ -358,12 +358,13 @@ public void handleCorrupted(ProcedureIterator procIter) throws IOException {
PROC_QUALIFIER, EMPTY_BYTE_ARRAY));
}
} else if (maxProcIdSet.longValue() < maxProcIdFromProcs.longValue()) {
LOG.warn("The max pid is less than the max pid of all loaded procedures");
LOG.warn("The WALProcedureStore max pid is less than the max pid of all loaded procedures");
}
if (!fs.delete(procWALDir, true)) {
throw new IOException("Failed to delete the migrated proc wal directory " + procWALDir);
throw new IOException("Failed to delete the WALProcedureStore migrated proc wal directory " +
procWALDir);
}
LOG.info("Migration finished");
LOG.info("Migration of WALProcedureStore finished");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2401,7 +2401,7 @@ public FlushResultImpl flushcache(boolean forceFlushAllStores, boolean writeFlus
flushesQueued.reset();
}

status.markComplete("Flush successful");
status.markComplete("Flush successful " + fs.toString());
return fs;
} finally {
synchronized (writestate) {
Expand Down Expand Up @@ -8871,4 +8871,4 @@ static void decorateRegionConfiguration(Configuration conf) {
}
}
}
}
}