diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java index f681789301fe6..cf283ed0b1367 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java @@ -303,6 +303,8 @@ private Function2>, Iterator, Iterator> updateL final long startTimeForPutsTask = DateTime.now().getMillis(); LOG.info("startTimeForPutsTask for this task: " + startTimeForPutsTask); + final RateLimiter limiter = RateLimiter.create(multiPutBatchSize, TimeUnit.SECONDS); try (BufferedMutator mutator = hbaseConnection.getBufferedMutator(TableName.valueOf(tableName))) { - final RateLimiter limiter = RateLimiter.create(multiPutBatchSize, TimeUnit.SECONDS); while (statusIterator.hasNext()) { WriteStatus writeStatus = statusIterator.next(); List mutations = new ArrayList<>(); @@ -387,6 +389,8 @@ private Function2, Iterator> updateL LOG.info("hbase puts task time for this task: " + (endPutsTime - startTimeForPutsTask)); } catch (IOException e) { throw new HoodieIndexException("Failed to Update Index locations because of exception with HBase Client", e); + } finally { + limiter.stop(); } return writeStatusList.iterator(); }; @@ -586,10 +590,9 @@ public boolean rollbackCommit(String instantTime) { hbaseConnection = getHBaseConnection(); } } + final RateLimiter limiter = RateLimiter.create(multiPutBatchSize, TimeUnit.SECONDS); try (HTable hTable = (HTable) hbaseConnection.getTable(TableName.valueOf(tableName)); BufferedMutator mutator = hbaseConnection.getBufferedMutator(TableName.valueOf(tableName))) { - final RateLimiter limiter = RateLimiter.create(multiPutBatchSize, TimeUnit.SECONDS); - Long rollbackTime = HoodieActiveTimeline.parseDateFromInstantTime(instantTime).getTime(); Long currentTime = new Date().getTime(); Scan scan = new Scan(); @@ -638,6 +641,8 @@ public boolean rollbackCommit(String instantTime) { } catch (Exception e) { LOG.error("hbase index roll back failed", e); return false; + } finally { + limiter.stop(); } return true; }