Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ public final class StoreStatisticNames {
/** {@value}. */
public static final String OP_MODIFY_ACL_ENTRIES = "op_modify_acl_entries";

/** {@value}. */
public static final String OP_MSYNC = "op_msync";

/** {@value}. */
public static final String OP_OPEN = "op_open";

Expand Down Expand Up @@ -172,6 +175,9 @@ public final class StoreStatisticNames {
public static final String STORE_IO_THROTTLED
= "store_io_throttled";

/** Rate limiting was reported {@value}. */
public static final String STORE_IO_RATE_LIMITED = "store_io_rate_limited";

/** Requests made of a store: {@value}. */
public static final String STORE_IO_REQUEST
= "store_io_request";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -450,12 +451,37 @@ public static <B> B trackDuration(
* @param factory factory of duration trackers
* @param statistic statistic key
* @param input input callable.
* @throws IOException IO failure.
*/
public static void trackDurationOfInvocation(
DurationTrackerFactory factory,
String statistic,
InvocationRaisingIOE input) throws IOException {

measureDurationOfInvocation(factory, statistic, input);
}

/**
* Given an IOException raising callable/lambda expression,
* execute it and update the relevant statistic,
* returning the measured duration.
*
* {@link #trackDurationOfInvocation(DurationTrackerFactory, String, InvocationRaisingIOE)}
* with the duration returned for logging etc.; added as a new
* method to avoid linking problems with any code calling the existing
* method.
*
* @param factory factory of duration trackers
* @param statistic statistic key
* @param input input callable.
* @return the duration of the operation, as measured by the duration tracker.
* @throws IOException IO failure.
*/
public static Duration measureDurationOfInvocation(
DurationTrackerFactory factory,
String statistic,
InvocationRaisingIOE input) throws IOException {

// create the tracker outside try-with-resources so
// that failures can be set in the catcher.
DurationTracker tracker = createTracker(factory, statistic);
Expand All @@ -473,6 +499,7 @@ public static void trackDurationOfInvocation(
// set the failed flag.
tracker.close();
}
return tracker.asDuration();
}

/**
Expand Down Expand Up @@ -622,7 +649,7 @@ public static <B> B trackDurationOfSupplier(
* @param statistic statistic to track
* @return a duration tracker.
*/
private static DurationTracker createTracker(
public static DurationTracker createTracker(
@Nullable final DurationTrackerFactory factory,
final String statistic) {
return factory != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,4 +255,15 @@ default long incrementCounter(String key) {
*/
void addTimedOperation(String prefix, Duration duration);

/**
* Add a statistics sample as a min, max and mean and count.
* @param key key to add.
* @param count count.
*/
default void addSample(String key, long count) {
incrementCounter(key, count);
addMeanStatisticSample(key, count);
addMaximumSample(key, count);
addMinimumSample(key, count);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ public void close() {
public Duration asDuration() {
return firstDuration.asDuration();
}

@Override
public String toString() {
return firstDuration.toString();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,11 @@ public void close() {
}
iostats.addTimedOperation(name, asDuration());
}

@Override
public String toString() {
return " Duration of " +
(failed? (key + StoreStatisticNames.SUFFIX_FAILURES) : key)
+ ": " + super.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ public synchronized T fromJsonStream(InputStream stream) throws IOException {
@SuppressWarnings("unchecked")
public synchronized T load(File jsonFile)
throws IOException, JsonParseException, JsonMappingException {
if (!jsonFile.exists()) {
throw new FileNotFoundException("No such file: " + jsonFile);
}
if (!jsonFile.isFile()) {
throw new FileNotFoundException("Not a file: " + jsonFile);
}
Expand All @@ -181,7 +184,7 @@ public synchronized T load(File jsonFile)
try {
return mapper.readValue(jsonFile, classType);
} catch (IOException e) {
LOG.error("Exception while parsing json file {}", jsonFile, e);
LOG.warn("Exception while parsing json file {}", jsonFile, e);
throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.util;

import java.time.Duration;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
* Minimal subset of google rate limiter class.
* Can be used to throttle use of object stores where excess load
* will trigger cluster-wide throttling, backoff etc. and so collapse
* performance.
* The time waited is returned as a Duration type.
* The google rate limiter implements this by allowing a caller to ask for
* more capacity than is available. This will be granted
* but the subsequent request will be blocked if the bucket of
* capacity hasn't let refilled to the point where there is
* capacity again.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface RateLimiting {

/**
* Acquire rate limiter capacity.
* If there is not enough space, the permits will be acquired,
* but the subsequent call will block until the capacity has been
* refilled.
* @param requestedCapacity capacity to acquire.
* @return time spent waiting for output.
*/
Duration acquire(int requestedCapacity);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.util;

import java.time.Duration;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.RateLimiter;

/**
* Factory for Rate Limiting.
* This should be only place in the code where the guava RateLimiter is imported.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class RateLimitingFactory {

private static final RateLimiting UNLIMITED = new NoRateLimiting();

/**
* No waiting took place.
*/
private static final Duration INSTANTLY = Duration.ofMillis(0);

private RateLimitingFactory() {
}

/**
* No Rate Limiting.
*/
private static class NoRateLimiting implements RateLimiting {


@Override
public Duration acquire(int requestedCapacity) {
return INSTANTLY;
}
}

/**
* Rate limiting restricted to that of a google rate limiter.
*/
private static final class RestrictedRateLimiting implements RateLimiting {
private final RateLimiter limiter;

/**
* Constructor.
* @param capacityPerSecond capacity in permits/second.
*/
private RestrictedRateLimiting(int capacityPerSecond) {
this.limiter = RateLimiter.create(capacityPerSecond);
}

@Override
public Duration acquire(int requestedCapacity) {
final double delayMillis = limiter.acquire(requestedCapacity);
return delayMillis == 0
? INSTANTLY
: Duration.ofMillis((long) (delayMillis * 1000));
}

}

/**
* Get the unlimited rate.
* @return a rate limiter which always has capacity.
*/
public static RateLimiting unlimitedRate() {
return UNLIMITED;
}

/**
* Create an instance.
* If the rate is 0; return the unlimited rate.
* @param capacity capacity in permits/second.
* @return limiter restricted to the given capacity.
*/
public static RateLimiting create(int capacity) {

return capacity == 0
? unlimitedRate()
: new RestrictedRateLimiting(capacity);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.util.functional;

import java.io.Closeable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

import static java.util.Objects.requireNonNull;

/**
* A task submitter which is closeable, and whose close() call
* shuts down the pool. This can help manage
* thread pool lifecycles.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public final class CloseableTaskPoolSubmitter implements TaskPool.Submitter,
Closeable {

/** Executors. */
private ExecutorService pool;

/**
* Constructor.
* @param pool non-null executor.
*/
public CloseableTaskPoolSubmitter(final ExecutorService pool) {
this.pool = requireNonNull(pool);
}

/**
* Get the pool.
* @return the pool.
*/
public ExecutorService getPool() {
return pool;
}

/**
* Shut down the pool.
*/
@Override
public void close() {
if (pool != null) {
pool.shutdown();
pool = null;
}
}

@Override
public Future<?> submit(final Runnable task) {
return pool.submit(task);
}
}
Loading