From c90afcb99d662794f290a7686b07fd699a987a90 Mon Sep 17 00:00:00 2001 From: Archana Katiyar Date: Wed, 31 Oct 2018 22:09:04 +0530 Subject: [PATCH] Basic heat map implementation at region level --- .../hbase/regionserver/HRegionServer.java | 101 +++++- .../hbase/regionserver/stats/AccessStats.java | 131 +++++++ .../stats/AccessStatsFactory.java | 48 +++ .../stats/AccessStatsRecorderConstants.java | 60 ++++ .../stats/AccessStatsRecorderTableImpl.java | 227 ++++++++++++ .../stats/AccessStatsRecorderUtils.java | 126 +++++++ .../stats/IAccessStatsRecorder.java | 55 +++ .../regionserver/stats/RegionAccessStats.java | 75 ++++ .../regionserver/stats/UidGenerator.java | 326 ++++++++++++++++++ .../hbase-webapps/master/accessStats.jsp | 214 ++++++++++++ .../hbase-webapps/master/tablesDetailed.jsp | 4 + .../hbase/regionserver/TestAccessStats.java | 251 ++++++++++++++ 12 files changed, 1616 insertions(+), 2 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/stats/AccessStats.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/stats/AccessStatsFactory.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/stats/AccessStatsRecorderConstants.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/stats/AccessStatsRecorderTableImpl.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/stats/AccessStatsRecorderUtils.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/stats/IAccessStatsRecorder.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/stats/RegionAccessStats.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/stats/UidGenerator.java create mode 100644 hbase-server/src/main/resources/hbase-webapps/master/accessStats.jsp create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAccessStats.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 68eb006f46ff..bab8193f6488 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -30,6 +30,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -131,6 +132,12 @@ import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler; import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler; import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler; +import org.apache.hadoop.hbase.regionserver.stats.AccessStats; +import org.apache.hadoop.hbase.regionserver.stats.AccessStats.AccessStatsType; +import org.apache.hadoop.hbase.regionserver.stats.AccessStatsRecorderTableImpl; +import org.apache.hadoop.hbase.regionserver.stats.AccessStatsRecorderUtils; +import org.apache.hadoop.hbase.regionserver.stats.IAccessStatsRecorder; +import org.apache.hadoop.hbase.regionserver.stats.RegionAccessStats; import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import 
org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; @@ -385,6 +392,11 @@ public class HRegionServer extends HasThread implements * Check for flushes */ ScheduledChore periodicFlusher; + + /* + * Records periodic region counters (read and write) + */ + ScheduledChore regionStatsRecorder; protected volatile WALFactory walFactory; @@ -1818,6 +1830,83 @@ protected void chore() { } } } + + /* + * Chore that periodically stores region access stats: for every online region it records the + * delta of the read and write request counts since its previous run. + */ + private static class RegionStatsRecorder extends ScheduledChore { + private final HRegionServer instance; + IAccessStatsRecorder accessStatsRecorder; + + Map<String, Long> regionLastReadCountMap = new HashMap<>(); + Map<String, Long> regionLastWriteCountMap = new HashMap<>(); + + RegionStatsRecorder(final HRegionServer h, final Stoppable stopper, final int durationInMinutes, + final Configuration configuration) { + super("RegionStatsRecorder", stopper, durationInMinutes * 60 * 1000); + this.instance = h; + + accessStatsRecorder = new AccessStatsRecorderTableImpl(configuration); + + LOG.info(this.getName() + " runs every " + durationInMinutes + + " minutes with initial delay of " + durationInMinutes + " minutes."); + } + + @Override + protected void chore() { + List<AccessStats> accessStatsList = new ArrayList<>(); + + for (Region region : this.instance.onlineRegions.values()) { + if (region == null) continue; + + String regionName = region.getRegionInfo().getRegionNameAsString(); + + long currentReadRequestCount = region.getReadRequestsCount(); + long currentWriteRequestCount = region.getWriteRequestsCount(); + + Long lastReadRequestCount = regionLastReadCountMap.get(regionName); + Long lastWriteRequestCount = regionLastWriteCountMap.get(regionName); + + // on the first iteration there is no history, so the full cumulative count is reported + long readRequestCountSinceLastIteration = currentReadRequestCount; + long writeRequestCountSinceLastIteration = currentWriteRequestCount; + + if (lastReadRequestCount != null) { + readRequestCountSinceLastIteration = currentReadRequestCount - lastReadRequestCount; + } + + if (lastWriteRequestCount != null) { + writeRequestCountSinceLastIteration = currentWriteRequestCount - lastWriteRequestCount; + } + + RegionAccessStats regionAccessStats = + new RegionAccessStats(region.getRegionInfo().getTable(), AccessStatsType.READCOUNT, + region.getRegionInfo().getStartKey(), region.getRegionInfo().getEndKey(), + readRequestCountSinceLastIteration); + regionAccessStats.setRegionName(regionName); + accessStatsList.add(regionAccessStats); + + regionAccessStats = new RegionAccessStats(region.getRegionInfo().getTable(), + AccessStatsType.WRITECOUNT, region.getRegionInfo().getStartKey(), + region.getRegionInfo().getEndKey(), writeRequestCountSinceLastIteration); + regionAccessStats.setRegionName(regionName); + accessStatsList.add(regionAccessStats); + + regionLastReadCountMap.put(regionName, currentReadRequestCount); + regionLastWriteCountMap.put(regionName, currentWriteRequestCount); + } + + accessStatsRecorder.writeAccessStats(accessStatsList); + } + + @Override + protected synchronized void cleanup() { + try { + accessStatsRecorder.close(); + } catch (IOException e) { + LOG.error("Exception in cleanup of RegionStatsRecorder - " + e.getMessage()); + } + } + } /** * Report the status of the server.
A server is online once all the startup is @@ -1977,6 +2066,7 @@ private void startServices() throws IOException { if (this.nonceManagerChore != null) choreService.scheduleChore(nonceManagerChore); if (this.storefileRefresher != null) choreService.scheduleChore(storefileRefresher); if (this.movedRegionsCleaner != null) choreService.scheduleChore(movedRegionsCleaner); + if (this.regionStatsRecorder != null) choreService.scheduleChore(regionStatsRecorder); if (this.fsUtilizationChore != null) choreService.scheduleChore(fsUtilizationChore); // Leases is not a Thread. Internally it runs a daemon thread. If it gets @@ -2019,6 +2109,11 @@ private void initializeThreads() throws IOException { // in a while. It will take care of not checking too frequently on store-by-store basis. this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this); this.periodicFlusher = new PeriodicMemStoreFlusher(this.threadWakeFrequency, this); + + AccessStatsRecorderUtils.createInstance(conf); + this.regionStatsRecorder = new RegionStatsRecorder(this, this, + AccessStatsRecorderUtils.getInstance().getIterationDuration(), conf); + this.leases = new Leases(this.threadWakeFrequency); // Create the thread to clean the moved regions list @@ -2131,7 +2226,8 @@ private boolean isHealthy() { && (this.cacheFlusher == null || this.cacheFlusher.isAlive()) && (this.walRoller == null || this.walRoller.isAlive()) && (this.compactionChecker == null || this.compactionChecker.isScheduled()) - && (this.periodicFlusher == null || this.periodicFlusher.isScheduled()); + && (this.periodicFlusher == null || this.periodicFlusher.isScheduled()) + && (this.regionStatsRecorder == null || this.regionStatsRecorder.isScheduled()); if (!healthy) { stop("One or more threads are no longer alive -- stop"); } @@ -2473,6 +2569,7 @@ protected void stopServiceThreads() { choreService.cancelChore(healthCheckChore); choreService.cancelChore(storefileRefresher); choreService.cancelChore(movedRegionsCleaner); + choreService.cancelChore(regionStatsRecorder); choreService.cancelChore(fsUtilizationChore); // clean up the remaining scheduled chores (in case we missed out any) choreService.shutdown(); @@ -3837,4 +3934,4 @@ public void run() { System.exit(1); } } -} +} \ No newline at end of file
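The chore reports per-iteration deltas rather than cumulative counters. A minimal, self-contained sketch of just that bookkeeping (the DeltaTracker class and its method are illustrative names, not part of the patch):

import java.util.HashMap;
import java.util.Map;

class DeltaTracker {
  private final Map<String, Long> lastSeen = new HashMap<>();

  // Returns the increase since the previous call for this key. On the first
  // observation the full cumulative count is returned, matching what
  // RegionStatsRecorder does for a region with no recorded history yet.
  long delta(String key, long cumulativeCount) {
    Long previous = lastSeen.put(key, cumulativeCount);
    return previous == null ? cumulativeCount : cumulativeCount - previous;
  }
}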
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/stats/AccessStats.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/stats/AccessStats.java new file mode 100644 index 000000000000..f4e2a5277686 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/stats/AccessStats.java @@ -0,0 +1,131 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.stats; + +import java.util.List; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Generic class that holds AccessStats information together. It is abstract; certain methods + * need to be implemented for each granularity of access stats. Currently it is implemented only + * for REGION. + */ +@InterfaceAudience.Private +public abstract class AccessStats { + protected AccessStatsType accessStatsType; + TableName table; + protected long normalizedTimeInEpoch; + protected long value; + protected byte[] keyRangeStart; + protected byte[] keyRangeEnd; + + public AccessStats(TableName table, AccessStatsType accessStatsType, byte[] keyRangeStart, + byte[] keyRangeEnd, long value) { + this(table, accessStatsType, keyRangeStart, keyRangeEnd, + AccessStatsRecorderUtils.getInstance().getNormalizedTimeCurrent(), value); + } + + public AccessStats(TableName table, AccessStatsType accessStatsType, byte[] keyRangeStart, + byte[] keyRangeEnd, long time, long value) { + this.accessStatsType = accessStatsType; + this.table = table; + this.normalizedTimeInEpoch = time; + this.value = value; + this.keyRangeStart = keyRangeStart; + this.keyRangeEnd = keyRangeEnd; + } + + public long getEpochTime() { + return normalizedTimeInEpoch; + } + + public byte[] getValueInBytes() { + return Bytes.toBytes(value); + } + + public long getValue() { + return value; + } + + public byte[] getAccessStatsType() { + return Bytes.toBytes(accessStatsType.toString()); + } + + public TableName getTable() { + return table; + } + + public byte[] getKeyRangeStart() { + return keyRangeStart; + } + + public byte[] getKeyRangeEnd() { + return keyRangeEnd; + } + + /* + * Each AccessStats record is uniquely identified by a row key that is a combination of table + * name and time, followed by a fixed set of KeyPartDescriptors, all encoded in fixed-length + * binary encoding except time. For a given granularity, this method returns the list of + * KeyPartDescriptors used to generate the row key. + */ + protected abstract List<KeyPartDescriptor> getKeyPartDescriptors(); + + /* + * Since only this class knows how to divide the byte-encoded row key suffix into its parts, + * this method does exactly that. + */ + protected abstract byte[][] convertKeySuffixToByteArrayList(byte[] keyByteArray, int indexStart); + + /* + * This method should set the fields specific to the given granularity using the decoded row + * key parts. + */ + protected abstract void setFieldsUsingKeyParts(List<String> strings);
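To make the abstract contract concrete, this is roughly what a second granularity could look like; TableOnlyAccessStats is invented purely for illustration and is not part of this patch:

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.TableName;

class TableOnlyAccessStats extends AccessStats {
  TableOnlyAccessStats(TableName table, AccessStatsType type, byte[] start, byte[] end,
      long time, long value) {
    super(table, type, start, end, time, value);
  }

  @Override
  protected List<KeyPartDescriptor> getKeyPartDescriptors() {
    // A single key part: the literal granularity tag, encoded as a 1-byte UID.
    List<KeyPartDescriptor> parts = new ArrayList<>();
    parts.add(new KeyPartDescriptor("Table", 1));
    return parts;
  }

  @Override
  protected byte[][] convertKeySuffixToByteArrayList(byte[] keyByteArray, int indexStart) {
    // The suffix holds exactly the one 1-byte UID declared above.
    return new byte[][] { new byte[] { keyByteArray[indexStart] } };
  }

  @Override
  protected void setFieldsUsingKeyParts(List<String> strings) {
    // Nothing beyond the table name to restore at this granularity.
  }
}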
+ + class KeyPartDescriptor { + private String keyStr; + private int uidLengthInBytes; + + public KeyPartDescriptor(String keyStr, int uidLengthInBytes) { + this.keyStr = keyStr; + this.uidLengthInBytes = uidLengthInBytes; + } + + public String getKeyStr() { + return keyStr; + } + + public int getUidLengthInBytes() { + return uidLengthInBytes; + } + + } + + public enum AccessStatsType { + READCOUNT, WRITECOUNT + } + + public enum AccessStatsGranularity { + REGION + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/stats/AccessStatsFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/stats/AccessStatsFactory.java new file mode 100644 index 000000000000..d42e7e0dcb89 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/stats/AccessStatsFactory.java @@ -0,0 +1,48 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.stats; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.regionserver.stats.AccessStats.AccessStatsGranularity; +import org.apache.hadoop.hbase.regionserver.stats.AccessStats.AccessStatsType; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public class AccessStatsFactory { + public static AccessStats getAccessStatsObj(TableName table, + AccessStatsGranularity accessStatsGranularity, AccessStatsType accessStatsType, + byte[] keyRangeStart, byte[] keyRangeEnd, long timeInEpoch, long value) { + AccessStats accessStats = null; + + switch (accessStatsGranularity) { + case REGION: + accessStats = new RegionAccessStats(table, accessStatsType, keyRangeStart, keyRangeEnd, + timeInEpoch, value); + break; + default: + // REGION is the only granularity implemented today, so it also serves as the default + accessStats = new RegionAccessStats(table, accessStatsType, keyRangeStart, keyRangeEnd, + timeInEpoch, value); + break; + } + + return accessStats; + } + +} \ No newline at end of file
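readAccessStats relies on this factory to rehydrate stored rows into the right concrete type. Called directly it looks like the following sketch (table name and values are made up; the 6-argument path does not touch the AccessStatsRecorderUtils singleton):

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.stats.AccessStats;
import org.apache.hadoop.hbase.regionserver.stats.AccessStats.AccessStatsGranularity;
import org.apache.hadoop.hbase.regionserver.stats.AccessStats.AccessStatsType;
import org.apache.hadoop.hbase.regionserver.stats.AccessStatsFactory;
import org.apache.hadoop.hbase.util.Bytes;

public class FactoryExample {
  public static void main(String[] args) {
    AccessStats stats = AccessStatsFactory.getAccessStatsObj(TableName.valueOf("MyTable"),
        AccessStatsGranularity.REGION, AccessStatsType.WRITECOUNT,
        Bytes.toBytes("k1"), Bytes.toBytes("k9"), System.currentTimeMillis(), 42L);
    System.out.println(stats.getTable() + " -> " + stats.getValue());
  }
}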
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/stats/AccessStatsRecorderConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/stats/AccessStatsRecorderConstants.java new file mode 100644 index 000000000000..0580f3165d36 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/stats/AccessStatsRecorderConstants.java @@ -0,0 +1,60 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.stats; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public class AccessStatsRecorderConstants { + // config key to specify the iteration period for recording periodic region counters + public final static String REGION_STATS_RECORDER_IN_MINUTES_KEY = + "hbase.region.stats.duration.minutes"; + + public final static String ACCESS_STATS_TABLE_KEY = "hbase.region.stats.table"; + + public final static String ACCESS_STATS_TABLE_NAME_DEFAULT = "ACCESS_STATS"; + + public final static byte[] STATS_COLUMN_FAMILY = Bytes.toBytes("S"); + public final static int STATS_COLUMN_FAMILY_TTL = 30*24*60*60; // 30 days + public final static int STATS_COLUMN_FAMILY_MAXVERSIONS = 1; // 1 version is sufficient + + public final static byte[] STATS_COLUMN_QUALIFIER_START_KEY = Bytes.toBytes("SK"); + public final static byte[] STATS_COLUMN_QUALIFIER_END_KEY = Bytes.toBytes("EK"); + + public final static byte[] UID_COLUMN_FAMILY = Bytes.toBytes("U"); + public final static byte[] UID_COLUMN_QUALIFIER = { (byte) 1 }; + + public final static byte[] UID_AUTO_INCREMENT_COLUMN_FAMILY = Bytes.toBytes("UA"); + + public final static byte[] UID_AUTO_INCREMENT_ROW_KEY = + Bytes.toBytes("ROW_FOR_AUTOINCREMENT_COLUMNS"); + + public final static byte[] UID_AUTO_INCREMENT_COLUMN_QUALIFIER_1 = { (byte) 10 }; + public final static byte[] UID_AUTO_INCREMENT_COLUMN_QUALIFIER_2 = { (byte) 20 }; + public final static byte[] UID_AUTO_INCREMENT_COLUMN_QUALIFIER_3 = { (byte) 30 }; + public final static byte[] UID_AUTO_INCREMENT_COLUMN_QUALIFIER_4 = { (byte) 40 }; + + public final static int MAX_UID_BYTE_ARRAY_LENGTH_SUPPORTED = 4; + + public final static int NUM_RETRIES_FOR_GENERATING_UID = 5; + public final static long WAIT_TIME_BEFORE_RETRY_FOR_GENERATING_UID = 30*1000; + +} \ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/stats/AccessStatsRecorderTableImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/stats/AccessStatsRecorderTableImpl.java new file mode 100644 index 000000000000..a677ad8b0115 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/stats/AccessStatsRecorderTableImpl.java @@ -0,0 +1,227 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.stats; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.stats.AccessStats.AccessStatsGranularity; +import org.apache.hadoop.hbase.regionserver.stats.AccessStats.AccessStatsType; +import org.apache.hadoop.hbase.regionserver.stats.AccessStats.KeyPartDescriptor; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public class AccessStatsRecorderTableImpl implements IAccessStatsRecorder { + + private static final Log LOG = LogFactory.getLog(AccessStatsRecorderTableImpl.class); + + private Connection connection; + private TableName tableNameToStoreStats; + private UidGenerator uidGenerator; + + private boolean isInitializationDoneCorrectly = false; + + public AccessStatsRecorderTableImpl(Configuration configuration) { + this.tableNameToStoreStats = AccessStatsRecorderUtils.getInstance().getTableNameToRecordStats(); + + try { + connection = ConnectionFactory.createConnection(configuration); + uidGenerator = new UidGenerator(configuration); + isInitializationDoneCorrectly = true; + } catch (Exception e) { + LOG.error( + "Exception while creating connection in AccessStatsRecorderTableImpl " + e.getMessage()); + } + } + + @Override + public void writeAccessStats(List<AccessStats> listAccessStats) { + if (!isInitializationDoneCorrectly) { + LOG.error( + "AccessStatsRecorderTableImpl is not initialized properly so skipping writeAccessStats call"); + + return; + } + + try (HTable tableToStoreStats = (HTable) connection.getTable(tableNameToStoreStats)) { + List<Put> puts = new ArrayList<>(); + + for (AccessStats accessStats : listAccessStats) { + byte[] rowKey = getRowKey(accessStats.getTable(), accessStats.getKeyPartDescriptors(), + accessStats.normalizedTimeInEpoch); + + Put put = new Put(rowKey); + put.addColumn(AccessStatsRecorderConstants.STATS_COLUMN_FAMILY, + accessStats.getAccessStatsType(), accessStats.getValueInBytes()); + put.addColumn(AccessStatsRecorderConstants.STATS_COLUMN_FAMILY, + AccessStatsRecorderConstants.STATS_COLUMN_QUALIFIER_START_KEY, + accessStats.getKeyRangeStart()); + put.addColumn(AccessStatsRecorderConstants.STATS_COLUMN_FAMILY, + AccessStatsRecorderConstants.STATS_COLUMN_QUALIFIER_END_KEY, + accessStats.getKeyRangeEnd()); + puts.add(put); + + LOG.info("Access stats details " + accessStats.getTable().getNameAsString() + " " + + accessStats.normalizedTimeInEpoch + " " + + Bytes.toString(accessStats.getAccessStatsType()) + " " + accessStats.getValue()); + } + + 
tableToStoreStats.put(puts); + + LOG.info("Access stats written to the table"); + + } catch (Exception e) { + LOG.error("Exception in writeAccessStats", e); + } + + } + + @Override + public List<AccessStats> readAccessStats(TableName table, AccessStatsType accessStatsType, + AccessStatsGranularity accessStatsGranularity, long epochTimeTo, int numIterations) { + int iterationDurationInMinutes = AccessStatsRecorderUtils.getInstance().getIterationDuration(); + epochTimeTo = AccessStatsRecorderUtils.getInstance().getNormalizedTime(epochTimeTo); + + List<AccessStats> accessStatsList = new ArrayList<>(); + + try (HTable tableToStoreStats = (HTable) connection.getTable(tableNameToStoreStats)) { + long epochCurrent; + + for (int i = 0; i < numIterations; i++) { + epochCurrent = epochTimeTo - i * iterationDurationInMinutes * 60 * 1000; + + Scan scan = new Scan(); + byte[] rowKeyPrefix = getRowKeyPrefix(table, epochCurrent); + scan.setRowPrefixFilter(rowKeyPrefix); + ResultScanner scanner = tableToStoreStats.getScanner(scan); + + for (Result result = scanner.next(); result != null; result = scanner.next()) { + byte[] rowKey = result.getRow(); + AccessStats accessStats = AccessStatsFactory.getAccessStatsObj(table, + accessStatsGranularity, accessStatsType, + readValue(AccessStatsRecorderConstants.STATS_COLUMN_QUALIFIER_START_KEY, result), + readValue(AccessStatsRecorderConstants.STATS_COLUMN_QUALIFIER_END_KEY, result), + epochCurrent, Bytes.toLong(readValue(Bytes.toBytes(accessStatsType.toString()), result))); + + byte[][] byteArrayOfArrays = + accessStats.convertKeySuffixToByteArrayList(rowKey, rowKeyPrefix.length); + + List<String> stringList = new ArrayList<>(); + for (int j = 0; j < byteArrayOfArrays.length; j++) { + stringList.add(uidGenerator.getStringForUID(byteArrayOfArrays[j])); + } + + accessStats.setFieldsUsingKeyParts(stringList); + + accessStatsList.add(accessStats); + } + + scanner.close(); + } + } catch (Exception e) { + LOG.error("Exception in readAccessStats", e); + } + return accessStatsList; + } + + private byte[] readValue(byte[] columnQualifier, Result result) { + return result.getValue(AccessStatsRecorderConstants.STATS_COLUMN_FAMILY, columnQualifier); + } + + /* + * Row key is a byte-encoded collection of UIDs - the fixed-length prefix built in + * getRowKeyPrefix() followed by one UID per granularity-specific key part. + */ + private byte[] getRowKey(TableName table, List<KeyPartDescriptor> keyPartDescriptors, + long epochTime) throws Exception { + epochTime = AccessStatsRecorderUtils.getInstance().getNormalizedTime(epochTime); + int currentPointer = 0; + byte[] rowKeyPrefix = getRowKeyPrefix(table, epochTime); + + // first count the number of bytes needed in the byte array + int numBytes = rowKeyPrefix.length; + + for (KeyPartDescriptor keyPartDescriptor : keyPartDescriptors) { + numBytes += keyPartDescriptor.getUidLengthInBytes(); + } + + byte[] rowKey = new byte[numBytes]; + currentPointer = copyByteArray(currentPointer, rowKey, rowKeyPrefix); + + for (KeyPartDescriptor keyPartDescriptor : keyPartDescriptors) { + byte[] uid = uidGenerator.getUIDForString(keyPartDescriptor.getKeyStr(), + keyPartDescriptor.getUidLengthInBytes()); + + currentPointer = copyByteArray(currentPointer, rowKey, uid); + } + + return rowKey; + } + + 
/* + * Row key prefix is a fixed-length byte array: a 1-byte UID for the table name, a 3-byte UID + * for the namespace, and a 4-byte timestamp (epoch seconds, big-endian). + */ + private byte[] getRowKeyPrefix(TableName table, long epochTime) throws Exception { + epochTime = AccessStatsRecorderUtils.getInstance().getNormalizedTime(epochTime); + int numBytes = 1 + 3 + 4; // table UID + namespace UID + timestamp + int currentPointer = 0; + + byte[] rowKeyPrefix = new byte[numBytes]; + byte[] uid = uidGenerator.getUIDForString(table.getNameAsString(), 1); + currentPointer = copyByteArray(currentPointer, rowKeyPrefix, uid); + uid = uidGenerator.getUIDForString(table.getNamespaceAsString(), 3); + currentPointer = copyByteArray(currentPointer, rowKeyPrefix, uid); + + int unixTime = (int) (epochTime / 1000); + byte[] timeStampInBytes = new byte[] { (byte) (unixTime >> 24), (byte) (unixTime >> 16), + (byte) (unixTime >> 8), (byte) unixTime }; + + currentPointer = copyByteArray(currentPointer, rowKeyPrefix, timeStampInBytes); + + return rowKeyPrefix; + } + + private int copyByteArray(int startPointerInDestArray, byte[] destArray, byte[] sourceArray) { + System.arraycopy(sourceArray, 0, destArray, startPointerInDestArray, sourceArray.length); + return startPointerInDestArray + sourceArray.length; + } + + @Override + public void close() throws IOException { + if (uidGenerator != null) { + uidGenerator.close(); // also closes the generator's own connection and ZK watcher + } + if (connection != null) { + connection.close(); + } + } +} \ No newline at end of file
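The 4-byte timestamp in the prefix is the epoch time in seconds, big-endian (an int holds it until 2038). A self-contained round trip of that encoding:

public class TimestampCodecExample {
  public static void main(String[] args) {
    int unixTime = (int) (System.currentTimeMillis() / 1000);
    byte[] ts = { (byte) (unixTime >> 24), (byte) (unixTime >> 16),
        (byte) (unixTime >> 8), (byte) unixTime };
    int decoded = ((ts[0] & 0xff) << 24) | ((ts[1] & 0xff) << 16)
        | ((ts[2] & 0xff) << 8) | (ts[3] & 0xff);
    System.out.println(decoded == unixTime); // true
  }
}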
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/stats/AccessStatsRecorderUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/stats/AccessStatsRecorderUtils.java new file mode 100644 index 000000000000..21d1549a84b1 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/stats/AccessStatsRecorderUtils.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.stats; + +import java.util.Calendar; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public class AccessStatsRecorderUtils { + + private static final Log LOG = LogFactory.getLog(AccessStatsRecorderUtils.class); + + private Configuration conf = null; + + private static volatile AccessStatsRecorderUtils instance; + private static final Object mutex = new Object(); + + private AccessStatsRecorderUtils(Configuration configuration) { + this.conf = configuration; + } + + public static AccessStatsRecorderUtils createInstance(Configuration configuration) { + AccessStatsRecorderUtils result = instance; + if (result == null) { + synchronized (mutex) { + result = instance; + if (result == null) { + instance = result = new AccessStatsRecorderUtils(configuration); + } + } + } + return result; + } + + public static AccessStatsRecorderUtils getInstance() { + return instance; + } + + public int getIterationDuration() { + int durationInMinutes = conf.getInt(AccessStatsRecorderConstants.REGION_STATS_RECORDER_IN_MINUTES_KEY, 15); + if (durationInMinutes > 60) { + durationInMinutes = 60; // cap at one hour; normalization works on the minute-of-hour + } + return durationInMinutes; + } + + public TableName getTableNameToRecordStats() { + String[] tableNames = conf.getStrings(AccessStatsRecorderConstants.ACCESS_STATS_TABLE_KEY); + + if (tableNames != null && tableNames.length >= 1) return TableName.valueOf(tableNames[0]); + + return TableName.valueOf(AccessStatsRecorderConstants.ACCESS_STATS_TABLE_NAME_DEFAULT); + } + + public long getNormalizedTime(long epochTime) { + Calendar calendar = Calendar.getInstance(); + calendar.setTimeInMillis(epochTime); + + return getNormalizedTimeInternal(calendar); + } + + public long getNormalizedTimeCurrent() { + Calendar calendar = Calendar.getInstance(); + return getNormalizedTimeInternal(calendar); + } + + private long getNormalizedTimeInternal(Calendar calendar) { + calendar.set(Calendar.SECOND, 0); + calendar.set(Calendar.MILLISECOND, 0); + + // floor the minute to the iteration boundary; e.g. with a 15-minute iteration, + // 10:47:23.512 normalizes to 10:45:00.000 + int modulo = (int) (calendar.get(Calendar.MINUTE) % getIterationDuration()); + if (modulo > 0) { + calendar.add(Calendar.MINUTE, -modulo); + } + + return calendar.getTimeInMillis(); + } + + // Intended to be used only from tests, or once after cluster deployment.
+ public void createTableToStoreStats() throws Exception { + try (Connection connection = ConnectionFactory.createConnection(conf)) { + HTableDescriptor tableDescriptor = new HTableDescriptor(getTableNameToRecordStats()); + + HColumnDescriptor columnDescriptorStats = new HColumnDescriptor(AccessStatsRecorderConstants.STATS_COLUMN_FAMILY); + columnDescriptorStats.setTimeToLive(AccessStatsRecorderConstants.STATS_COLUMN_FAMILY_TTL); + columnDescriptorStats.setMaxVersions(AccessStatsRecorderConstants.STATS_COLUMN_FAMILY_MAXVERSIONS); + + tableDescriptor.addFamily(columnDescriptorStats); + + HColumnDescriptor columnDescriptorUID = new HColumnDescriptor(AccessStatsRecorderConstants.UID_COLUMN_FAMILY); + columnDescriptorUID.setMaxVersions(AccessStatsRecorderConstants.STATS_COLUMN_FAMILY_MAXVERSIONS); + + tableDescriptor.addFamily(columnDescriptorUID); + + HColumnDescriptor columnDescriptorAutoIncrement = new HColumnDescriptor(AccessStatsRecorderConstants.UID_AUTO_INCREMENT_COLUMN_FAMILY); + columnDescriptorAutoIncrement.setMaxVersions(AccessStatsRecorderConstants.STATS_COLUMN_FAMILY_MAXVERSIONS); + + tableDescriptor.addFamily(columnDescriptorAutoIncrement); + + try (Admin admin = connection.getAdmin()) { + admin.createTable(tableDescriptor); + } + + LOG.info("Table created with name - " + tableDescriptor.getNameAsString()); + } + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/stats/IAccessStatsRecorder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/stats/IAccessStatsRecorder.java new file mode 100644 index 000000000000..8902cf5a0fb4 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/stats/IAccessStatsRecorder.java @@ -0,0 +1,55 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.stats; + +import java.io.Closeable; +import java.util.List; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.regionserver.stats.AccessStats.AccessStatsGranularity; +import org.apache.hadoop.hbase.regionserver.stats.AccessStats.AccessStatsType; +import org.apache.yetus.audience.InterfaceAudience; + + +/* + * Interface to store and read access stats. + */
+@InterfaceAudience.Private +public interface IAccessStatsRecorder extends Closeable { + + /** + * @param table TableName for which access stats are to be read + * @param accessStatsType read stats or write stats + * @param accessStatsGranularity only REGION-level granularity is supported currently + * @param epochTimeTo latest time up to which access stats are required + * @param numIterations number of iterations (and hence records per region) to be returned + * @return List of AccessStats + */ + public List<AccessStats> readAccessStats(TableName table, AccessStatsType accessStatsType, AccessStatsGranularity accessStatsGranularity, + long epochTimeTo, int numIterations); + + /** + * @param listAccessStats List of AccessStats to store in DB + */ + public void writeAccessStats(List<AccessStats> listAccessStats); + +} \ No newline at end of file
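A sketch of the intended call pattern, mirroring how the chore and the JSP use this interface (table name and iteration count are made up; AccessStatsRecorderUtils must be initialized first because AccessStatsRecorderTableImpl reads the stats table name from it):

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.stats.AccessStats;
import org.apache.hadoop.hbase.regionserver.stats.AccessStats.AccessStatsGranularity;
import org.apache.hadoop.hbase.regionserver.stats.AccessStats.AccessStatsType;
import org.apache.hadoop.hbase.regionserver.stats.AccessStatsRecorderTableImpl;
import org.apache.hadoop.hbase.regionserver.stats.AccessStatsRecorderUtils;
import org.apache.hadoop.hbase.regionserver.stats.IAccessStatsRecorder;

public class RecorderUsageExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    AccessStatsRecorderUtils.createInstance(conf);
    try (IAccessStatsRecorder recorder = new AccessStatsRecorderTableImpl(conf)) {
      List<AccessStats> stats = recorder.readAccessStats(TableName.valueOf("MyTable"),
          AccessStatsType.READCOUNT, AccessStatsGranularity.REGION,
          System.currentTimeMillis(), 4); // the four most recent iterations
      for (AccessStats s : stats) {
        System.out.println(s.getEpochTime() + " -> " + s.getValue());
      }
    } // close() releases the underlying Connection
  }
}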
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/stats/RegionAccessStats.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/stats/RegionAccessStats.java new file mode 100644 index 000000000000..9425fc8b2566 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/stats/RegionAccessStats.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.stats; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hbase.TableName; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public class RegionAccessStats extends AccessStats { + + public RegionAccessStats(TableName table, AccessStatsType accessStatsType, byte[] keyRangeStart, + byte[] keyRangeEnd, long value) { + super(table, accessStatsType, keyRangeStart, keyRangeEnd, value); + } + + public RegionAccessStats(TableName table, AccessStatsType accessStatsType, byte[] keyRangeStart, + byte[] keyRangeEnd, long time, long value) { + super(table, accessStatsType, keyRangeStart, keyRangeEnd, time, value); + } + + private String regionName = null; + + public void setRegionName(String regionName) { + this.regionName = regionName; + } + + public String getRegionName() { + return regionName; + } + + @Override + protected List<KeyPartDescriptor> getKeyPartDescriptors() { + List<KeyPartDescriptor> keyList = new ArrayList<>(); + keyList.add(new KeyPartDescriptor("Region", 1)); + keyList.add(new KeyPartDescriptor(regionName, 4)); + + return keyList; + } + + @Override + protected byte[][] convertKeySuffixToByteArrayList(byte[] keyByteArray, int indexStart) { + byte[][] byteArrayOfArrays = new byte[2][]; + + byte[] regionUid = new byte[1]; + regionUid[0] = keyByteArray[indexStart]; + + byte[] regionNameUid = new byte[4]; + for (int i = 0; i < 4; i++) { + regionNameUid[i] = keyByteArray[i + indexStart + 1]; + } + + byteArrayOfArrays[0] = regionUid; + byteArrayOfArrays[1] = regionNameUid; + + return byteArrayOfArrays; + } + + @Override + protected void setFieldsUsingKeyParts(List<String> strings) { + regionName = strings.get(1); // second key part is the region name + } + +} \ No newline at end of file
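Given the two descriptors above, the region-level key suffix is always five bytes. A small decode sketch equivalent to convertKeySuffixToByteArrayList (the suffix bytes are invented):

import java.util.Arrays;

public class RegionKeySuffixExample {
  public static void main(String[] args) {
    byte[] suffix = { 1, 0, 0, 0, 7 };                        // [tag][region-name UID]
    byte[] granularityUid = Arrays.copyOfRange(suffix, 0, 1); // 1-byte UID of "Region"
    byte[] regionNameUid = Arrays.copyOfRange(suffix, 1, 5);  // 4-byte UID of the name
    System.out.println(Arrays.toString(granularityUid) + " " + Arrays.toString(regionNameUid));
  }
}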
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/stats/UidGenerator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/stats/UidGenerator.java new file mode 100644 index 000000000000..0202c50cabb0 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/stats/UidGenerator.java @@ -0,0 +1,326 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.stats; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.UUID; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs.Ids; + +/* + * This class generates UIDs for storing access stats. One special row in the access stats table + * is used to store columns for generating monotonically increasing and unique ids. The idea is to + * use HBase's incrementColumnValue feature to generate monotonically increasing numbers and use + * those numbers as uids. The class supports generation of UIDs of a given length; for each length, + * a specific column qualifier is used to make sure no collision happens across different + * byte-array lengths. + */ +@InterfaceAudience.Private +public class UidGenerator implements Closeable { + + private static final Log LOG = LogFactory.getLog(UidGenerator.class); + + private Connection connection; + private TableName tableName; + + String zookeeperParentNode = "/hbase/uid-generator"; + ZKWatcher zooKeeperWatcher; + + private HashMap<Integer, HashMap<String, Long>> stringToByteArrayHashMap = new HashMap<>(); + private HashMap<Integer, HashMap<String, String>> byteArrayToStringHashMap = new HashMap<>(); + + boolean storeUIDsInMemory = true; + + public UidGenerator(Configuration configuration) throws IOException, KeeperException { + this.tableName = AccessStatsRecorderUtils.getInstance().getTableNameToRecordStats(); + connection = ConnectionFactory.createConnection(configuration); + + zooKeeperWatcher = new ZKWatcher(configuration, "UidGenerator", null); + LOG.info("Creating the parent lock node:" + zookeeperParentNode); + ZKUtil.createWithParents(zooKeeperWatcher, zookeeperParentNode); + + for (int i = 1; i <= AccessStatsRecorderConstants.MAX_UID_BYTE_ARRAY_LENGTH_SUPPORTED; i++) { + stringToByteArrayHashMap.put(i, new HashMap<>()); + byteArrayToStringHashMap.put(i, new HashMap<>()); + } + } + + /* + * If UIDs need not be cached in memory, use this constructor. + * Currently it is used only for testing purposes.
+ */ + public UidGenerator(Configuration configuration, boolean storeUIDsInMemory) throws IOException, KeeperException { + this(configuration); + this.storeUIDsInMemory = storeUIDsInMemory; + } + + public byte[] getUIDForString(String keyStr, int uidLengthInBytes) + throws Exception { + if (uidLengthInBytes > AccessStatsRecorderConstants.MAX_UID_BYTE_ARRAY_LENGTH_SUPPORTED) { + throw new Exception("Only UIDs of up to " + + AccessStatsRecorderConstants.MAX_UID_BYTE_ARRAY_LENGTH_SUPPORTED + " bytes are supported."); + } + + byte[] uid = null; + for (int i = 0; i < AccessStatsRecorderConstants.NUM_RETRIES_FOR_GENERATING_UID; i++) { + uid = getUIDForStringInternal(keyStr, uidLengthInBytes); + if (uid != null) { + break; + } + + LOG.info("getUIDForString returned null, retry count: " + i); + Thread.sleep(AccessStatsRecorderConstants.WAIT_TIME_BEFORE_RETRY_FOR_GENERATING_UID); + } + + if (uid == null) { + throw new Exception("Couldn't generate UID even after multiple retries."); + } + + return uid; + } + + public String getStringForUID(byte[] uidByteArray) throws IOException { + String string = byteArrayToStringHashMap.get(uidByteArray.length).get(Bytes.toString(uidByteArray)); + + if (string == null) { + LOG.info("String for UID not found in memory, looking in table."); + try (HTable table = (HTable) connection.getTable(tableName)) { + Get getVal = new Get(uidByteArray); + Result result = table.get(getVal); + byte[] value = result.getValue(AccessStatsRecorderConstants.UID_COLUMN_FAMILY, + getColumnQualifierBasedOnUidLength(uidByteArray.length)); + if (value != null) { + string = Bytes.toString(value); + } + } + } + + return string; + } + + @Override + public void close() throws IOException { + if (connection != null) { + connection.close(); + } + + if (zooKeeperWatcher != null) { + zooKeeperWatcher.close(); + } + } + + private byte[] getColumnQualifierBasedOnUidLength(int uidLengthInBytes) { + byte[] columnQualifierByteArray = + AccessStatsRecorderConstants.UID_AUTO_INCREMENT_COLUMN_QUALIFIER_1; + switch (uidLengthInBytes) { + case 1: + columnQualifierByteArray = AccessStatsRecorderConstants.UID_AUTO_INCREMENT_COLUMN_QUALIFIER_1; + break; + + case 2: + columnQualifierByteArray = AccessStatsRecorderConstants.UID_AUTO_INCREMENT_COLUMN_QUALIFIER_2; + break; + + case 3: + columnQualifierByteArray = AccessStatsRecorderConstants.UID_AUTO_INCREMENT_COLUMN_QUALIFIER_3; + break; + + case 4: + columnQualifierByteArray = AccessStatsRecorderConstants.UID_AUTO_INCREMENT_COLUMN_QUALIFIER_4; + break; + + default: + break; + } + return columnQualifierByteArray; + } + + private byte[] readUIDFromTable(String keyStr, HTable table, int uidLengthInBytes) + throws IOException { + byte[] byteArray = null; + + Get getVal = new Get(Bytes.toBytes(keyStr)); + Result result = table.get(getVal); + + byteArray = result.getValue(AccessStatsRecorderConstants.UID_COLUMN_FAMILY, + getColumnQualifierBasedOnUidLength(uidLengthInBytes)); + + return byteArray; + } + + /* + * A rough sketch of the steps followed - + + * 1. Check whether there is an entry in the table with the given string as key. + * 2. If yes, use the value as uid. + * 3. If not, follow the next set of steps - + * 4. Take a lock at the zookeeper level for the given string. + * 5. Check again whether there is an entry in the table with the given string as key; + * this is important as someone might have taken the lock, finished the work and + * released the lock between steps #1 and #4. If an entry is present, + * simply use that value as uid. If no entry is present, move to the next steps - + * 6. Get a uid via HTable#incrementColumnValue. + * 7.
store string equivalent as row key and uid as value + * 8. store uid as row key and string equivalent as value + * 9. release zookeeper lock + */ + private byte[] getUIDForStringInternal(String keyStr, int uidLengthInBytes) throws Exception { + byte[] byteArray = null; + + Long byteArrayinLong = stringToByteArrayHashMap.get(uidLengthInBytes).get(keyStr); + if(byteArrayinLong != null) + { + LOG.info("UID found in memory; keyStr - " + keyStr); + + byteArray = convertLongToByteArray(uidLengthInBytes, byteArrayinLong); + } + + if (byteArray == null) { + LOG.info("UID not found in memory; reading table. keyStr - " + keyStr); + long nextUid =0; + try (HTable table = (HTable) connection.getTable(tableName)) { + byteArray = readUIDFromTable(keyStr, table, uidLengthInBytes); + + if (byteArray == null) { + LOG.info("UID not found in table; need to generate. keyStr - " + keyStr); + + if (acquireLock(uidLengthInBytes + "_" + keyStr)) { + LOG.info("Acquired zookeeper lock."); + // check the table again + byteArray = readUIDFromTable(keyStr, table, uidLengthInBytes); + + if (byteArray == null) { // generate uid + LOG.info("Generating UID for keyStr - " + keyStr); + + byte[] columnQualifierByteArray = + getColumnQualifierBasedOnUidLength(uidLengthInBytes); + + nextUid = table.incrementColumnValue( + AccessStatsRecorderConstants.UID_AUTO_INCREMENT_ROW_KEY, + AccessStatsRecorderConstants.UID_AUTO_INCREMENT_COLUMN_FAMILY, + columnQualifierByteArray, 1); + + byteArray = convertLongToByteArray(uidLengthInBytes, nextUid); + + List putList = new ArrayList<>(); + + Put put = new Put(Bytes.toBytes(keyStr)); + put.addColumn(AccessStatsRecorderConstants.UID_COLUMN_FAMILY, + columnQualifierByteArray, byteArray); + putList.add(put); + + put = new Put(byteArray); + put.addColumn(AccessStatsRecorderConstants.UID_COLUMN_FAMILY, + columnQualifierByteArray, Bytes.toBytes(keyStr)); + putList.add(put); + + table.put(putList); + } else { + LOG.info("Someone else had already generated required UID; no worries. keyStr - "+ keyStr); + } + releaseLock(uidLengthInBytes + "_" + keyStr); + } else { + LOG.warn( + "Couldn't acquire zookeeper lock; this means someone is already generating UID for this keyStr. Please wait and retry. 
keyStr - " + keyStr); + } + } + } + if (storeUIDsInMemory && byteArray != null) { // fill the in-memory hashmap + stringToByteArrayHashMap.get(uidLengthInBytes).put(keyStr, convertByteArrayToLong(byteArray)); + byteArrayToStringHashMap.get(uidLengthInBytes).put(Bytes.toString(byteArray), keyStr); + } + } + + return byteArray; + } + + private long convertByteArrayToLong(byte[] byteArray) { + long value = 0; + for (int i = 0; i < byteArray.length; i++) { + value += ((long) byteArray[i] & 0xffL) << (8 * i); + } + + return value; + } + + private byte[] convertLongToByteArray(int uidLengthInBytes, Long byteArrayinLong) { + byte[] byteArray = new byte[uidLengthInBytes]; + for (int i = 0; i < uidLengthInBytes; i++) { + byteArray[i] = (byte) (byteArrayinLong >> (i * 8)); + } + return byteArray; + } + + private boolean acquireLock(String lockName) throws KeeperException, InterruptedException { + String lockNode = zookeeperParentNode + "/" + lockName; + String nodeValue = UUID.randomUUID().toString(); + LOG.info("Trying to acquire the lock by creating node:" + lockNode + " value:" + nodeValue); + try { + zooKeeperWatcher.getRecoverableZooKeeper().create(lockNode, Bytes.toBytes(nodeValue), + Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + } catch (KeeperException.NodeExistsException e) { + LOG.info("Node " + lockName + " already exists; another process has the lock. " + + "This may not be an error condition. " + e.getMessage()); + return false; + } + LOG.info("Obtained the lock :" + lockNode); + return true; + } + + private boolean releaseLock(String lockName) throws KeeperException, InterruptedException { + String lockNode = zookeeperParentNode + "/" + lockName; + LOG.info("Releasing lock node:" + lockNode); + try { + zooKeeperWatcher.getRecoverableZooKeeper().delete(lockNode, 0); + } catch (KeeperException.NoNodeException e) { + LOG.info("Node " + lockName + " does not exist; the lock may already have been released. " + + "This may not be an error condition. " + e.getMessage()); + return false; + } + LOG.info("Deleted the lock :" + lockNode); + return true; + } +} \ No newline at end of file
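Note that UIDs are serialized least-significant byte first, while the timestamp in the row-key prefix is big-endian. A round trip of the conversion pair defined above (value arbitrary):

public class UidBytesExample {
  public static void main(String[] args) {
    long uid = 0x0A0B0C0DL;
    byte[] bytes = new byte[4];
    for (int i = 0; i < 4; i++) {
      bytes[i] = (byte) (uid >> (i * 8)); // bytes[0] = 0x0D ... bytes[3] = 0x0A
    }
    long back = 0;
    for (int i = 0; i < 4; i++) {
      back += ((long) bytes[i] & 0xffL) << (8 * i);
    }
    System.out.println(back == uid); // true
  }
}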
diff --git a/hbase-server/src/main/resources/hbase-webapps/master/accessStats.jsp b/hbase-server/src/main/resources/hbase-webapps/master/accessStats.jsp new file mode 100644 index 000000000000..c58f0923ab32 --- /dev/null +++ b/hbase-server/src/main/resources/hbase-webapps/master/accessStats.jsp @@ -0,0 +1,214 @@ +<%-- +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +--%> +<%@page + import="org.apache.hadoop.hbase.regionserver.stats.RegionAccessStats"%> +<%@page import="java.time.Instant"%> +<%@page + import="org.apache.hadoop.hbase.regionserver.stats.AccessStats.AccessStatsGranularity"%> +<%@page + import="org.apache.hadoop.hbase.regionserver.stats.AccessStats.AccessStatsType"%> +<%@page import="org.apache.hadoop.hbase.TableName"%> +<%@page import="org.apache.hadoop.hbase.regionserver.stats.AccessStats"%> +<%@page + import="org.apache.hadoop.hbase.regionserver.stats.AccessStatsRecorderTableImpl"%> +<%@page + import="org.apache.hadoop.hbase.regionserver.stats.IAccessStatsRecorder"%> +<%@page + import="org.apache.hadoop.hbase.regionserver.stats.AccessStatsRecorderUtils"%> +<%@ page contentType="text/html;charset=UTF-8" + import="static org.apache.commons.lang.StringEscapeUtils.escapeXml" + import="java.net.URLEncoder" import="java.util.TreeMap" + import="java.util.Map" import="java.util.*" + import="org.apache.commons.lang.StringEscapeUtils" + import="org.apache.hadoop.conf.Configuration" + import="org.apache.hadoop.hbase.Cell" + import="org.apache.hadoop.hbase.client.HTable" + import="org.apache.hadoop.hbase.client.Admin" + import="org.apache.hadoop.hbase.client.Result" + import="org.apache.hadoop.hbase.client.ResultScanner" + import="org.apache.hadoop.hbase.client.Scan" + import="org.apache.hadoop.hbase.HRegionInfo" + import="org.apache.hadoop.hbase.master.HMaster" + import="org.apache.hadoop.hbase.util.Bytes" + import="org.apache.hadoop.hbase.HBaseConfiguration"%> +<% + HMaster master = (HMaster) getServletContext().getAttribute(HMaster.MASTER); + Configuration conf = master.getConfiguration(); + + String fqtn = request.getParameter("name"); + final String escaped_fqtn = StringEscapeUtils.escapeHtml(fqtn); + + AccessStatsRecorderUtils.createInstance(conf); + int durationInMinutes = AccessStatsRecorderUtils.getInstance().getIterationDuration(); + + int iterations = 100; //TODO take this as a parameter + + String countType = request.getParameter("countType"); + if (countType == null) { + countType = "WRITECOUNT"; //default + } +%> + + + + + +Table: <%=escaped_fqtn%> + + + + + + + + + <%!private String getColor(long count, long maxCount, long minCount) { + long range = maxCount - minCount; + + if (range <= 0) { + range = 1; //to avoid divide by zero when all counts are equal + } + + // map the count onto 0..254 relative to the observed min, then repeat the + // zero-padded hex pair to build a grayscale color + int shade = (int) (((count - minCount) * 254) / range); + + String colorPart = String.format("%02x", shade); + String color = "#" + colorPart + colorPart + colorPart; + + return color; + }%> + <% + if (fqtn != null) { + + Map<String, Map<Long, RegionAccessStats>> regionDetails = new HashMap<>(); + + long maxCount = 0; + long minCount = 0; + + Date endTime = new Date(); + Date startTime = new Date(); + + try (IAccessStatsRecorder accessStatsRecorder = new AccessStatsRecorderTableImpl(conf)) { + List<AccessStats> accessStatsList = accessStatsRecorder.readAccessStats( + TableName.valueOf(escaped_fqtn), AccessStatsType.valueOf(countType), + AccessStatsGranularity.REGION, Instant.now().toEpochMilli(), iterations); + + for (AccessStats accessStats : accessStatsList) { + RegionAccessStats regionAccessStats = (RegionAccessStats) accessStats; + long value = regionAccessStats.getValue(); + String regionName = regionAccessStats.getRegionName(); + + if (regionDetails.get(regionName) == null) { + regionDetails.put(regionName, new HashMap<Long, RegionAccessStats>()); + } + + regionDetails.get(regionName).put(regionAccessStats.getEpochTime(), regionAccessStats); + + if (maxCount < value) { + maxCount = value; + } + + if (value != -1 && minCount > value) { + minCount = value; + } + } 
+ + if (!accessStatsList.isEmpty()) { + // readAccessStats returns the newest iteration first + endTime = new Date(accessStatsList.get(0).getEpochTime()); + startTime = new Date(accessStatsList.get(accessStatsList.size() - 1).getEpochTime()); + } + } + %>
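For reference, the grayscale mapping in getColor works out as follows (counts invented; higher counts render lighter):

public class HeatColorExample {
  public static void main(String[] args) {
    long minCount = 0, maxCount = 200, count = 150;
    int shade = (int) (((count - minCount) * 254) / (maxCount - minCount)); // 190
    String part = String.format("%02x", shade);                             // "be"
    System.out.println("#" + part + part + part);                           // "#bebebe"
  }
}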
+
+ + + + + + + + + + + + + + + + +
<%=escaped_fqtn%>Metric name : + Metric interval : <%=durationInMinutes%> minutes +
(Minimum value) <%=minCount%> + <%=maxCount%> (Maximum value)
<%=startTime.toString()%> +     to     <%=endTime.toString()%> +
+
+
+ + <% + for (Map.Entry<String, Map<Long, RegionAccessStats>> regionDetail : regionDetails.entrySet()) { + String regionName = regionDetail.getKey(); + %> + + <% + Map<Long, RegionAccessStats> countTimeDetails = regionDetail.getValue(); + long epochCurrent = 0; + for (int i = 0; i < iterations; i++) { + epochCurrent = endTime.getTime() - i * durationInMinutes * 60 * 1000; + RegionAccessStats regionAccessStats = countTimeDetails.get(epochCurrent); + + if (regionAccessStats != null) { + long count = regionAccessStats.getValue(); + + String hoverText = (new Date(epochCurrent)).toString() + + ", Region:" + regionName + + ", StartKey:" + Bytes.toString(regionAccessStats.getKeyRangeStart()) + + ", EndKey:" + Bytes.toString(regionAccessStats.getKeyRangeEnd()) + + ", Count:" + count; + + %> + + <% + } + } + %> + 
   
+ <% + } + %> +
+
+ + + + \ No newline at end of file diff --git a/hbase-server/src/main/resources/hbase-webapps/master/tablesDetailed.jsp b/hbase-server/src/main/resources/hbase-webapps/master/tablesDetailed.jsp index 2ad44a3d68bd..20afe9799d9d 100644 --- a/hbase-server/src/main/resources/hbase-webapps/master/tablesDetailed.jsp +++ b/hbase-server/src/main/resources/hbase-webapps/master/tablesDetailed.jsp @@ -51,6 +51,7 @@ + <% for (TableDescriptor htDesc : tables) { %> @@ -59,6 +60,9 @@ <%= escapeXml( htDesc.getTableName().getNameAsString()) %> + diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAccessStats.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAccessStats.java new file mode 100644 index 000000000000..8df7dadc872e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAccessStats.java @@ -0,0 +1,251 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.stats.AccessStats; +import org.apache.hadoop.hbase.regionserver.stats.AccessStats.AccessStatsGranularity; +import org.apache.hadoop.hbase.regionserver.stats.AccessStats.AccessStatsType; +import org.apache.hadoop.hbase.regionserver.stats.AccessStatsRecorderConstants; +import org.apache.hadoop.hbase.regionserver.stats.AccessStatsRecorderTableImpl; +import org.apache.hadoop.hbase.regionserver.stats.AccessStatsRecorderUtils; +import org.apache.hadoop.hbase.regionserver.stats.IAccessStatsRecorder; +import org.apache.hadoop.hbase.regionserver.stats.RegionAccessStats; +import org.apache.hadoop.hbase.regionserver.stats.UidGenerator; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category;; + 
+@Category(LargeTests.class)
+public class TestAccessStats {
+  private static Configuration conf = HBaseConfiguration.create();
+  private static HBaseTestingUtility utility;
+  private static UidGenerator uidGenerator;
+  private static final int iterationDurationInMinutes = 1;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf.setInt(AccessStatsRecorderConstants.REGION_STATS_RECORDER_IN_MINUTES_KEY,
+      iterationDurationInMinutes);
+
+    utility = new HBaseTestingUtility(conf);
+    utility.startMiniCluster();
+
+    AccessStatsRecorderUtils.createInstance(conf);
+    AccessStatsRecorderUtils.getInstance().createTableToStoreStats();
+
+    uidGenerator = new UidGenerator(conf);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    uidGenerator.close();
+
+    utility.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testStatsRecordWriteAndGet() throws Exception {
+    TableName testTable = TableName.valueOf("TestTable");
+    try (IAccessStatsRecorder accessStatsRecorder = new AccessStatsRecorderTableImpl(conf)) {
+      long epochTime = AccessStatsRecorderUtils.getInstance().getNormalizedTimeCurrent();
+      Random random = new Random();
+      List<AccessStats> accessStatsList = new ArrayList<>();
+
+      String regionName = "Region 1";
+
+      // nextInt(99) already returns a value in [0, 98], so no sign correction is needed.
+      long count = random.nextInt(99);
+
+      RegionAccessStats regionAccessStats =
+          new RegionAccessStats(testTable, AccessStatsType.READCOUNT, Bytes.toBytes("StartKey"),
+              Bytes.toBytes("EndKey"), epochTime, count);
+      regionAccessStats.setRegionName(regionName);
+      accessStatsList.add(regionAccessStats);
+
+      accessStatsRecorder.writeAccessStats(accessStatsList);
+
+      accessStatsList = accessStatsRecorder.readAccessStats(testTable, AccessStatsType.READCOUNT,
+        AccessStatsGranularity.REGION, Instant.now().toEpochMilli(), 10);
+
+      Assert.assertEquals("AccessStatsList size is not as expected", 1, accessStatsList.size());
+      for (AccessStats accessStats : accessStatsList) {
+        regionAccessStats = (RegionAccessStats) accessStats;
+        Assert.assertEquals("AccessStats type is not as expected",
+          AccessStatsType.READCOUNT.toString(),
+          Bytes.toString(regionAccessStats.getAccessStatsType()));
+        Assert.assertEquals("AccessStats value is not as expected", count,
+          regionAccessStats.getValue());
+        Assert.assertEquals("AccessStats regionName is not as expected", regionName,
+          regionAccessStats.getRegionName());
+        Assert.assertEquals("AccessStats StartKey is not as expected", "StartKey",
+          Bytes.toString(regionAccessStats.getKeyRangeStart()));
+        Assert.assertEquals("AccessStats EndKey is not as expected", "EndKey",
+          Bytes.toString(regionAccessStats.getKeyRangeEnd()));
+      }
+    }
+  }
+
+  @Test
+  public void testUIDGeneration() throws Exception {
+    // A UID must round-trip back to the original string and stay stable across
+    // calls, for UID widths of one to four bytes.
+    generateAndVerifyUIDs("String1", 1);
+    generateAndVerifyUIDs("String2", 1);
+
+    generateAndVerifyUIDs("String1", 2);
+
+    generateAndVerifyUIDs("String2", 3);
+    generateAndVerifyUIDs("String3", 3);
+    generateAndVerifyUIDs("String4", 3);
+
+    generateAndVerifyUIDs("String2", 4);
+
+    generateAndVerifyUIDs("String5", 1);
+  }
+
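+  /*
+   * The chore records deltas per recording interval; after performing a known
+   * number of reads and writes and sleeping past one interval, the persisted
+   * per-iteration counts can be verified. readAccessStats returns the most
+   * recent iteration first, so the expected-count arrays run newest to oldest.
+   */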
+  @Test
+  public void testStatsGeneration() throws Exception {
+    // Use a dedicated table so stats written for "TestTable" by other tests
+    // cannot interfere with the counts validated here.
+    TableName tableName = TableName.valueOf("TestTableStatsGeneration");
+
+    HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
+    HColumnDescriptor columnDescriptor = new HColumnDescriptor(Bytes.toBytes("CF"));
+    tableDescriptor.addFamily(columnDescriptor);
+
+    try (Connection connection = ConnectionFactory.createConnection(conf)) {
+      try (Admin admin = connection.getAdmin()) {
+        admin.createTable(tableDescriptor);
+      }
+
+      try (HTable table = (HTable) connection.getTable(tableName)) {
+        sleep();
+
+        int readOperation = 2;
+        int writeOperation = 3;
+
+        performReadAndWriteOperations(table, readOperation, writeOperation);
+
+        sleep();
+
+        int[] expectedReadCountArray1 = { readOperation };
+        int[] expectedWriteCountArray1 = { writeOperation };
+
+        validateStatsStored(tableName, expectedReadCountArray1, AccessStatsType.READCOUNT, 1);
+        validateStatsStored(tableName, expectedWriteCountArray1, AccessStatsType.WRITECOUNT, 1);
+
+        readOperation = 7;
+        writeOperation = 10;
+
+        performReadAndWriteOperations(table, readOperation, writeOperation);
+
+        sleep();
+
+        int[] expectedReadCountArray2 = { readOperation, 2 };
+        int[] expectedWriteCountArray2 = { writeOperation, 3 };
+
+        validateStatsStored(tableName, expectedReadCountArray2, AccessStatsType.READCOUNT, 2);
+        validateStatsStored(tableName, expectedWriteCountArray2, AccessStatsType.WRITECOUNT, 2);
+      }
+    }
+  }
+
+  private void validateStatsStored(TableName tableName, int[] expectedCounts,
+      AccessStatsType accessStatsType, int iteration) throws IOException {
+    try (IAccessStatsRecorder accessStatsRecorder = new AccessStatsRecorderTableImpl(conf)) {
+      List<AccessStats> accessStatsList = accessStatsRecorder.readAccessStats(tableName,
+        accessStatsType, AccessStatsGranularity.REGION, Instant.now().toEpochMilli(), iteration);
+
+      // Guard against an empty result; the loop below would otherwise pass vacuously.
+      Assert.assertEquals("Number of recorded iterations is not as expected",
+        expectedCounts.length, accessStatsList.size());
+
+      int i = 0;
+      for (AccessStats accessStats : accessStatsList) {
+        RegionAccessStats regionAccessStats = (RegionAccessStats) accessStats;
+        long value = regionAccessStats.getValue();
+
+        Assert.assertEquals("Stats is not as expected.", expectedCounts[i++], value);
+      }
+    }
+  }
+
+  private void performReadAndWriteOperations(HTable table, int readOperation, int writeOperation)
+      throws IOException {
+    for (int i = 0; i < readOperation; i++) {
+      performReadOperation(table);
+    }
+
+    for (int i = 0; i < writeOperation; i++) {
+      performWriteOperation(table);
+    }
+  }
+
+  private void sleep() throws InterruptedException {
+    // Wait out one full recording interval plus a ten second buffer so the
+    // RegionStatsRecorder chore is guaranteed to have run.
+    Thread.sleep(iterationDurationInMinutes * 60 * 1000 + 10 * 1000);
+  }
+
+  private void performWriteOperation(HTable table) throws IOException {
+    long now = Instant.now().toEpochMilli();
+
+    Put put = new Put(Bytes.toBytes(now));
+    put.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("C"), Bytes.toBytes(now));
+
+    table.put(put);
+  }
+
+  private void performReadOperation(HTable table) throws IOException {
+    table.get(new Get(Bytes.toBytes(Instant.now().toEpochMilli())));
+  }
+
+  private void generateAndVerifyUIDs(String str, int uidLengthInBytes) throws Exception {
+    byte[] uidByteArray = uidGenerator.getUIDForString(str, uidLengthInBytes);
+    String strEquivalentUid = uidGenerator.getStringForUID(uidByteArray);
+
+    Assert.assertEquals("Generated UID byte array is not of expected length", uidLengthInBytes,
+      uidByteArray.length);
+    Assert.assertEquals("String and corresponding UID equivalent are not same", str,
+      strEquivalentUid);
+
+    // Requesting a UID for the same string again must return the identical bytes.
+    byte[] uidByteArraySecond = uidGenerator.getUIDForString(str, uidLengthInBytes);
+    Assert.assertEquals("Different UID has been returned for same string",
+      Bytes.toString(uidByteArray), Bytes.toString(uidByteArraySecond));
+  }
+}
\ No newline at end of file