Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,13 @@ public void close() throws IOException {
public void sendLifeline(DatanodeRegistration registration,
StorageReport[] reports, long cacheCapacity, long cacheUsed,
int xmitsInProgress, int xceiverCount, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) throws IOException {
VolumeFailureSummary volumeFailureSummary,
float volumeUsageStdDev) throws IOException {
HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
.setRegistration(PBHelper.convert(registration))
.setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount)
.setFailedVolumes(failedVolumes);
.setFailedVolumes(failedVolumes)
.setVolumeUsageStdDev(volumeUsageStdDev);
builder.addAllReports(PBHelperClient.convertStorageReports(reports));
if (cacheCapacity != 0) {
builder.setCacheCapacity(cacheCapacity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ public LifelineResponseProto sendLifeline(RpcController controller,
impl.sendLifeline(PBHelper.convert(request.getRegistration()), report,
request.getCacheCapacity(), request.getCacheUsed(),
request.getXmitsInProgress(), request.getXceiverCount(),
request.getFailedVolumes(), volumeFailureSummary);
request.getFailedVolumes(), volumeFailureSummary,
request.getVolumeUsageStdDev());
return VOID_LIFELINE_RESPONSE_PROTO;
} catch (IOException e) {
throw new ServiceException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,15 @@ public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
VolumeFailureSummary volumeFailureSummary,
boolean requestFullBlockReportLease,
@Nonnull SlowPeerReports slowPeers,
@Nonnull SlowDiskReports slowDisks)
@Nonnull SlowDiskReports slowDisks,
float volumeUsageStdDev)
throws IOException {
HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
.setRegistration(PBHelper.convert(registration))
.setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount)
.setFailedVolumes(failedVolumes)
.setRequestFullBlockReportLease(requestFullBlockReportLease);
.setRequestFullBlockReportLease(requestFullBlockReportLease)
.setVolumeUsageStdDev(volumeUsageStdDev);
builder.addAllReports(PBHelperClient.convertStorageReports(reports));
if (cacheCapacity != 0) {
builder.setCacheCapacity(cacheCapacity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ public HeartbeatResponseProto sendHeartbeat(RpcController controller,
request.getXceiverCount(), request.getFailedVolumes(),
volumeFailureSummary, request.getRequestFullBlockReportLease(),
PBHelper.convertSlowPeerInfo(request.getSlowPeersList()),
PBHelper.convertSlowDiskInfo(request.getSlowDisksList()));
PBHelper.convertSlowDiskInfo(request.getSlowDisksList()),
request.getVolumeUsageStdDev());
} catch (IOException e) {
throw new ServiceException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2657,24 +2657,24 @@ public long getProvidedCapacity() {

void updateHeartbeat(DatanodeDescriptor node, StorageReport[] reports,
long cacheCapacity, long cacheUsed, int xceiverCount, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) {
VolumeFailureSummary volumeFailureSummary, float volumeUsageStdDev) {

for (StorageReport report: reports) {
providedStorageMap.updateStorage(node, report.getStorage());
}
node.updateHeartbeat(reports, cacheCapacity, cacheUsed, xceiverCount,
failedVolumes, volumeFailureSummary);
failedVolumes, volumeFailureSummary, volumeUsageStdDev);
}

void updateHeartbeatState(DatanodeDescriptor node,
StorageReport[] reports, long cacheCapacity, long cacheUsed,
int xceiverCount, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) {
VolumeFailureSummary volumeFailureSummary, float volumeUsageStdDev) {
for (StorageReport report: reports) {
providedStorageMap.updateStorage(node, report.getStorage());
}
node.updateHeartbeatState(reports, cacheCapacity, cacheUsed, xceiverCount,
failedVolumes, volumeFailureSummary);
failedVolumes, volumeFailureSummary, volumeUsageStdDev);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ public Type getType() {
private long lastBlocksScheduledRollTime = 0;
private int volumeFailures = 0;
private VolumeFailureSummary volumeFailureSummary = null;
private float volumeUsageStdDev;

/**
* When set to true, the node is not in include list and is not allowed
Expand Down Expand Up @@ -340,7 +341,7 @@ boolean hasStaleStorages() {
}

public void resetBlocks() {
updateStorageStats(this.getStorageReports(), 0L, 0L, 0, 0, null);
updateStorageStats(this.getStorageReports(), 0L, 0L, 0, 0, null, 0.0f);
synchronized (invalidateBlocks) {
this.invalidateBlocks.clear();
}
Expand Down Expand Up @@ -379,9 +380,9 @@ public int numBlocks() {
*/
void updateHeartbeat(StorageReport[] reports, long cacheCapacity,
long cacheUsed, int xceiverCount, int volFailures,
VolumeFailureSummary volumeFailureSummary) {
VolumeFailureSummary volumeFailureSummary, float volumeUsageSD) {
updateHeartbeatState(reports, cacheCapacity, cacheUsed, xceiverCount,
volFailures, volumeFailureSummary);
volFailures, volumeFailureSummary, volumeUsageSD);
heartbeatedSinceRegistration = true;
}

Expand All @@ -390,17 +391,17 @@ void updateHeartbeat(StorageReport[] reports, long cacheCapacity,
*/
void updateHeartbeatState(StorageReport[] reports, long cacheCapacity,
long cacheUsed, int xceiverCount, int volFailures,
VolumeFailureSummary volumeFailureSummary) {
VolumeFailureSummary volumeFailureSummary, float volumeUsageSD) {
updateStorageStats(reports, cacheCapacity, cacheUsed, xceiverCount,
volFailures, volumeFailureSummary);
volFailures, volumeFailureSummary, volumeUsageSD);
setLastUpdate(Time.now());
setLastUpdateMonotonic(Time.monotonicNow());
rollBlocksScheduled(getLastUpdateMonotonic());
}

private void updateStorageStats(StorageReport[] reports, long cacheCapacity,
long cacheUsed, int xceiverCount, int volFailures,
VolumeFailureSummary volumeFailureSummary) {
VolumeFailureSummary volumeFailureSummary, float volumeUsageSD) {
long totalCapacity = 0;
long totalRemaining = 0;
long totalBlockPoolUsed = 0;
Expand Down Expand Up @@ -454,6 +455,7 @@ private void updateStorageStats(StorageReport[] reports, long cacheCapacity,
setXceiverCount(xceiverCount);
this.volumeFailures = volFailures;
this.volumeFailureSummary = volumeFailureSummary;
this.volumeUsageStdDev = volumeUsageSD;
for (StorageReport report : reports) {

DatanodeStorageInfo storage = null;
Expand Down Expand Up @@ -945,6 +947,13 @@ public VolumeFailureSummary getVolumeFailureSummary() {
return volumeFailureSummary;
}

/**
* @return the standard deviation of volume usage
*/
public float getVolumeUsageStdDev() {
return volumeUsageStdDev;
}

/**
* @param nodeReg DatanodeID to update registration for.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1751,7 +1751,8 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
int maxTransfers, int failedVolumes,
VolumeFailureSummary volumeFailureSummary,
@Nonnull SlowPeerReports slowPeers,
@Nonnull SlowDiskReports slowDisks) throws IOException {
@Nonnull SlowDiskReports slowDisks,
float volumeUsageStdDev) throws IOException {
final DatanodeDescriptor nodeinfo;
try {
nodeinfo = getDatanode(nodeReg);
Expand All @@ -1769,7 +1770,8 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
return new DatanodeCommand[]{RegisterCommand.REGISTER};
}
heartbeatManager.updateHeartbeat(nodeinfo, reports, cacheCapacity,
cacheUsed, xceiverCount, failedVolumes, volumeFailureSummary);
cacheUsed, xceiverCount, failedVolumes, volumeFailureSummary,
volumeUsageStdDev);

// If we are in safemode, do not send back any recovery / replication
// requests. Don't even drain the existing queue of work.
Expand Down Expand Up @@ -1891,12 +1893,15 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
* @param xceiverCount estimated count of transfer threads running at DataNode
* @param failedVolumes count of failed volumes at DataNode
* @param volumeFailureSummary info on failed volumes at DataNode
* @param volumeUsageStdDev the standard deviation of volume usage
*
* @throws IOException if there is an error
*/
public void handleLifeline(DatanodeRegistration nodeReg,
StorageReport[] reports, long cacheCapacity,
long cacheUsed, int xceiverCount, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) throws IOException {
VolumeFailureSummary volumeFailureSummary,
float volumeUsageStdDev) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Received handleLifeline from nodeReg = " + nodeReg);
}
Expand All @@ -1917,7 +1922,7 @@ public void handleLifeline(DatanodeRegistration nodeReg,
return;
}
heartbeatManager.updateLifeline(nodeinfo, reports, cacheCapacity, cacheUsed,
xceiverCount, failedVolumes, volumeFailureSummary);
xceiverCount, failedVolumes, volumeFailureSummary, volumeUsageStdDev);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ synchronized void register(final DatanodeDescriptor d) {
addDatanode(d);

//update its timestamp
d.updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0, null);
d.updateHeartbeatState(StorageReport.EMPTY_ARRAY,
0L, 0L, 0, 0, null, 0.0f);
stats.add(d);
}
}
Expand Down Expand Up @@ -254,24 +255,24 @@ synchronized void removeDatanode(DatanodeDescriptor node) {
synchronized void updateHeartbeat(final DatanodeDescriptor node,
StorageReport[] reports, long cacheCapacity, long cacheUsed,
int xceiverCount, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) {
VolumeFailureSummary volumeFailureSummary, float volumeUsageStdDev) {
stats.subtract(node);
blockManager.updateHeartbeat(node, reports, cacheCapacity, cacheUsed,
xceiverCount, failedVolumes, volumeFailureSummary);
xceiverCount, failedVolumes, volumeFailureSummary, volumeUsageStdDev);
stats.add(node);
}

synchronized void updateLifeline(final DatanodeDescriptor node,
StorageReport[] reports, long cacheCapacity, long cacheUsed,
int xceiverCount, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) {
VolumeFailureSummary volumeFailureSummary, float volumeUsageStdDev) {
stats.subtract(node);
// This intentionally calls updateHeartbeatState instead of
// updateHeartbeat, because we don't want to modify the
// heartbeatedSinceRegistration flag. Arrival of a lifeline message does
// not count as arrival of the first heartbeat.
blockManager.updateHeartbeatState(node, reports, cacheCapacity, cacheUsed,
xceiverCount, failedVolumes, volumeFailureSummary);
xceiverCount, failedVolumes, volumeFailureSummary, volumeUsageStdDev);
stats.add(node);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,7 @@ HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
scheduler.scheduleNextHeartbeat();
StorageReport[] reports =
dn.getFSDataset().getStorageReports(bpos.getBlockPoolId());
float volumeUsageStdDev = dn.getFSDataset().getVolumeUsageStdDev();
if (LOG.isDebugEnabled()) {
LOG.debug("Sending heartbeat with " + reports.length +
" storage reports from service actor: " + this);
Expand Down Expand Up @@ -567,7 +568,8 @@ HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
volumeFailureSummary,
requestBlockReportLease,
slowPeers,
slowDisks);
slowDisks,
volumeUsageStdDev);

if (outliersReportDue) {
// If the report was due and successfully sent, schedule the next one.
Expand Down Expand Up @@ -1117,14 +1119,16 @@ private void sendLifeline() throws IOException {
.getVolumeFailureSummary();
int numFailedVolumes = volumeFailureSummary != null ?
volumeFailureSummary.getFailedStorageLocations().length : 0;
float volumeUsageStdDev = dn.getFSDataset().getVolumeUsageStdDev();
lifelineNamenode.sendLifeline(bpRegistration,
reports,
dn.getFSDataset().getCacheCapacity(),
dn.getFSDataset().getCacheUsed(),
dn.getXmitsInProgress(),
dn.getXceiverCount(),
numFailedVolumes,
volumeFailureSummary);
volumeFailureSummary,
volumeUsageStdDev);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,9 @@ StorageReport[] getStorageReports(String bpid)
/** @return a volume information map (name {@literal =>} info). */
Map<String, Object> getVolumeInfoMap();

/** @return the standard deviation of a volume usage. */
float getVolumeUsageStdDev();

/**
* Returns info about volume failures.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,10 @@ public long getDfsUsed() throws IOException {
return volumes.getDfsUsed();
}

public long setDfsUsed() throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this API doesn't appear to be used.

return volumes.getDfsUsed();
}

/**
* Return the total space used by dfs datanode
*/
Expand Down Expand Up @@ -3129,6 +3133,7 @@ private static class VolumeInfo {
final String directory;
final long usedSpace; // size of space used by HDFS
final long freeSpace; // size of free space excluding reserved space
final float volumeUsagePercent; // usage of volume
final long reservedSpace; // size of space reserved for non-HDFS
final long reservedSpaceForReplicas; // size of space reserved RBW or
// re-replication
Expand All @@ -3139,6 +3144,7 @@ private static class VolumeInfo {
this.directory = v.toString();
this.usedSpace = usedSpace;
this.freeSpace = freeSpace;
this.volumeUsagePercent = (usedSpace * 100.0f) / (usedSpace + freeSpace);
this.reservedSpace = v.getReserved();
this.reservedSpaceForReplicas = v.getReservedForReplicas();
this.numBlocks = v.getNumBlocks();
Expand Down Expand Up @@ -3175,6 +3181,7 @@ public Map<String, Object> getVolumeInfoMap() {
final Map<String, Object> innerInfo = new HashMap<String, Object>();
innerInfo.put("usedSpace", v.usedSpace);
innerInfo.put("freeSpace", v.freeSpace);
innerInfo.put("volumeUsagePercent", v.volumeUsagePercent);
innerInfo.put("reservedSpace", v.reservedSpace);
innerInfo.put("reservedSpaceForReplicas", v.reservedSpaceForReplicas);
innerInfo.put("numBlocks", v.numBlocks);
Expand All @@ -3184,6 +3191,26 @@ public Map<String, Object> getVolumeInfoMap() {
return info;
}

@Override
public float getVolumeUsageStdDev() {
Collection<VolumeInfo> volumeInfos = getVolumeInfo();
ArrayList<Float> usages = new ArrayList<Float>();
float totalDfsUsed = 0;
float dev = 0;
for (VolumeInfo v : volumeInfos) {
usages.add(v.volumeUsagePercent);
totalDfsUsed += v.volumeUsagePercent;
}

totalDfsUsed /= volumeInfos.size();
Collections.sort(usages);
for (Float usage : usages) {
dev += (usage - totalDfsUsed) * (usage - totalDfsUsed);
}
dev = (float) Math.sqrt(dev / usages.size());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a check to ensure usages.size() never returns 0?

return dev;
}

@Override //FsDatasetSpi
public void deleteBlockPool(String bpid, boolean force)
throws IOException {
Expand Down
Loading