Changes from 3 commits
@@ -132,6 +132,12 @@ public final class StreamStatisticNames {
public static final String STREAM_READ_OPERATIONS =
"stream_read_operations";

/** GET requests made by the analytics stream. */
public static final String STREAM_READ_ANALYTICS_GET_REQUESTS = "stream_read_analytics_get_requests";
Contributor: nit: update these comments to say: GET requests made by the analytics stream.

Contributor: Actually, do we really need new statistics? Why can't we re-use the existing ones, ACTION_HTTP_GET_REQUEST and ACTION_HTTP_HEAD_REQUEST?

Author: The new analytics-specific statistics provide isolated tracking. If the S3A and Analytics streams are ever used simultaneously, separate metrics give precise per-stream-type tracking.

Review comment: Why do we need isolated tracking? Can we ever mix and match streams?


/** HEAD requests made by the analytics stream. */
public static final String STREAM_READ_ANALYTICS_HEAD_REQUESTS = "stream_read_analytics_head_requests";

/**
* Count of readVectored() operations in an input stream.
* Value: {@value}.
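Regarding the thread above on re-using ACTION_HTTP_GET_REQUEST versus adding analytics-specific counters: a hedged sketch of how both counters could be read back from a stream's IOStatistics, which is what keeps them distinguishable if stream types are ever mixed. The class and method names are illustrative only; the counter key strings are the patch's new stream_read_analytics_get_requests and, to the best of my knowledge, the existing action_http_get_request.

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.statistics.IOStatistics;
    import org.apache.hadoop.fs.statistics.IOStatisticsSupport;

    final class AnalyticsCounterSketch {

      /** Print the analytics-specific and shared GET counters of a stream, if published. */
      static void printGetCounters(FSDataInputStream in) {
        final IOStatistics stats = IOStatisticsSupport.retrieveIOStatistics(in);
        if (stats == null) {
          return; // the stream does not publish IOStatistics
        }
        // counter added by this patch, incremented only by the analytics stream
        System.out.println("analytics GETs: "
            + stats.counters().get("stream_read_analytics_get_requests"));
        // existing counter shared with the classic input stream
        System.out.println("all HTTP GETs: "
            + stats.counters().get("action_http_get_request"));
      }

      private AnalyticsCounterSketch() {
      }
    }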
@@ -872,6 +872,8 @@ private InputStreamStatistics(
StreamStatisticNames.STREAM_READ_CLOSE_OPERATIONS,
StreamStatisticNames.STREAM_READ_OPENED,
StreamStatisticNames.STREAM_READ_BYTES,
StreamStatisticNames.STREAM_READ_ANALYTICS_GET_REQUESTS,
StreamStatisticNames.STREAM_READ_ANALYTICS_HEAD_REQUESTS,
StreamStatisticNames.STREAM_READ_EXCEPTIONS,
StreamStatisticNames.STREAM_READ_FULLY_OPERATIONS,
StreamStatisticNames.STREAM_READ_OPERATIONS,
@@ -1128,6 +1130,15 @@ public void readVectoredBytesDiscarded(int discarded) {
bytesDiscardedInVectoredIO.addAndGet(discarded);
}

@Override
public void incrementAnalyticsGetRequests() {
increment(StreamStatisticNames.STREAM_READ_ANALYTICS_GET_REQUESTS);
}

@Override
public void incrementAnalyticsHeadRequests() {
increment(StreamStatisticNames.STREAM_READ_ANALYTICS_HEAD_REQUESTS);
}

@Override
public void executorAcquired(Duration timeInQueue) {
// update the duration fields in the IOStatistics.
@@ -459,6 +459,14 @@ public enum Statistic {
"Gauge of active memory in use",
TYPE_GAUGE),

ANALYTICS_GET_REQUESTS(
StreamStatisticNames.STREAM_READ_ANALYTICS_GET_REQUESTS,
"GET requests made by analytics streams",
TYPE_COUNTER),
ANALYTICS_HEAD_REQUESTS(
StreamStatisticNames.STREAM_READ_ANALYTICS_HEAD_REQUESTS,
"HEAD requests made by analytics streams",
TYPE_COUNTER),

Review comment: This is a nit but I believe an important one. We have to make sure we are giving the right name and description, as well as doing this in the right place.
1/ Why are these entries here, at these lines? This is neither lexicographic nor following a logical separation. We probably need to move them up to the Object IO section.
2/ Next is the name: for stream reads we had STREAM_READ_OPENED and we added STREAM_READ_ANALYTICS_OPENED, so following the same schema we probably need to rename this to OBJECT_GET_ANALYTICS_REQUESTS (there is no GET equivalent currently because stream read opens = gets). Likewise for HEAD, we have OBJECT_METADATA_REQUESTS, to which we can add OBJECT_METADATA_ANALYTICS_REQUESTS.
3/ We should make sure the messages are similar to the others. For example, we should call this "Count of object get requests made by analytics stream", and for HEAD, "Count of requests for object metadata made by analytics stream".
/* Stream Write statistics */

STREAM_WRITE_EXCEPTIONS(
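To make the renaming proposed in the review comment above concrete, here is a hedged, standalone sketch of the suggested names and descriptions. The enum below is a toy illustration, not Hadoop's Statistic class, and OBJECT_GET_ANALYTICS_REQUESTS / OBJECT_METADATA_ANALYTICS_REQUESTS are only proposed names from that comment:

    import org.apache.hadoop.fs.statistics.StreamStatisticNames;

    enum ProposedAnalyticsStatisticSketch {
      // Proposed replacement for ANALYTICS_GET_REQUESTS, following the OBJECT_* schema.
      OBJECT_GET_ANALYTICS_REQUESTS(
          StreamStatisticNames.STREAM_READ_ANALYTICS_GET_REQUESTS,
          "Count of object get requests made by analytics stream"),
      // Proposed replacement for ANALYTICS_HEAD_REQUESTS, mirroring OBJECT_METADATA_REQUESTS.
      OBJECT_METADATA_ANALYTICS_REQUESTS(
          StreamStatisticNames.STREAM_READ_ANALYTICS_HEAD_REQUESTS,
          "Count of requests for object metadata made by analytics stream");

      private final String symbol;
      private final String description;

      ProposedAnalyticsStatisticSketch(String symbol, String description) {
        this.symbol = symbol;
        this.description = description;
      }

      String getSymbol() {
        return symbol;
      }

      String getDescription() {
        return description;
      }
    }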
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.s3a.impl.streams;

import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.statistics.DurationTracker;
import software.amazon.s3.analyticsaccelerator.util.RequestCallback;

Check failure on line 23 in hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java (ASF Cloudbees Jenkins ci-hadoop / Apache Yetus): javadoc: error: cannot find symbol

/**
* Implementation of AAL's RequestCallback interface that tracks analytics operations.
*/
public class AnalyticsRequestCallback implements RequestCallback {

Check failure on line 28 in hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java (ASF Cloudbees Jenkins ci-hadoop / Apache Yetus): javadoc: error: cannot find symbol
private final S3AInputStreamStatistics statistics;

/**
* Create a new callback instance.
* @param statistics the statistics to update
*/
public AnalyticsRequestCallback(S3AInputStreamStatistics statistics) {
this.statistics = statistics;
}

@Override
public void onGetRequest() {
statistics.incrementAnalyticsGetRequests();
// Update ACTION_HTTP_GET_REQUEST statistic
DurationTracker tracker = statistics.initiateGetRequest();
Contributor: do you really need a DurationTracker here?

Author: It is the way to increment the ACTION_HTTP_GET_REQUEST statistic: the statistics.initiateGetRequest() call increments the counter, and tracker.close() completes the measurement.

tracker.close();
}

@Override
public void onHeadRequest() {
statistics.incrementAnalyticsHeadRequests();
}
}

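On the DurationTracker question above: as written, onGetRequest() opens and closes the tracker back-to-back, so it bumps the ACTION_HTTP_GET_REQUEST count but records an essentially zero duration. For contrast, a hedged sketch of the fuller idiom in which the tracker brackets the whole request and records failures; trackedGet and GetAction are hypothetical names introduced only for illustration:

    import java.io.IOException;

    import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
    import org.apache.hadoop.fs.statistics.DurationTracker;

    final class DurationTrackingSketch {

      /** Stand-in for the code that actually issues the GET request. */
      interface GetAction {
        void run() throws IOException;
      }

      /** Run the action with its duration tracked, marking the tracker as failed on error. */
      static void trackedGet(S3AInputStreamStatistics statistics, GetAction action)
          throws IOException {
        final DurationTracker tracker = statistics.initiateGetRequest();
        try {
          action.run();
        } catch (IOException e) {
          tracker.failed(); // record the failure against the duration statistics
          throw e;
        } finally {
          tracker.close(); // always complete the measurement
        }
      }

      private DurationTrackingSketch() {
      }
    }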
@@ -28,6 +28,7 @@
import software.amazon.s3.analyticsaccelerator.util.InputPolicy;
import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
import software.amazon.s3.analyticsaccelerator.util.S3URI;
import software.amazon.s3.analyticsaccelerator.util.RequestCallback;

Check failure on line 31 in hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java (ASF Cloudbees Jenkins ci-hadoop / Apache Yetus): javadoc: error: cannot find symbol

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,13 +49,17 @@
private S3SeekableInputStream inputStream;
private long lastReadCurrentPos = 0;
private volatile boolean closed;
private final long contentLength;
private final long lengthLimit;

public static final Logger LOG = LoggerFactory.getLogger(AnalyticsStream.class);

public AnalyticsStream(final ObjectReadParameters parameters,
final S3SeekableInputStreamFactory s3SeekableInputStreamFactory) throws IOException {
super(InputStreamType.Analytics, parameters);
S3ObjectAttributes s3Attributes = parameters.getObjectAttributes();
this.contentLength = s3Attributes.getLen();
Contributor: why do you need this?

Author: getLen() is needed for length limiting: it ensures AnalyticsStream respects the declared file length from the openFile() options rather than reading to the end of the S3 object.

this.lengthLimit = s3Attributes.getLen();
this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(s3Attributes.getBucket(),
s3Attributes.getKey()), buildOpenStreamInformation(parameters));
getS3AStreamStatistics().streamOpened(InputStreamType.Analytics);
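On the length-limit discussion above: a hedged usage sketch of how a caller declares the object length through the openFile() builder, which is where the value picked up by getLen() can come from. FS_OPTION_OPENFILE_LENGTH is the standard Hadoop option key; the class and method names below are illustrative only:

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.util.functional.FutureIO;

    import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;

    final class OpenFileLengthSketch {

      /** Open a file while declaring its length up front; the stream should not read past it. */
      static FSDataInputStream openWithDeclaredLength(FileSystem fs, Path path, long declaredLength)
          throws IOException {
        return FutureIO.awaitFuture(
            fs.openFile(path)
                // caller-declared length; S3A can use this instead of probing the object itself
                .opt(FS_OPTION_OPENFILE_LENGTH, Long.toString(declaredLength))
                .build());
      }

      private OpenFileLengthSketch() {
      }
    }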
@@ -63,13 +68,23 @@
@Override
public int read() throws IOException {
throwIfClosed();
if (getPos() >= lengthLimit) {
return -1; // EOF reached due to length limit

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to close underlying stream here before returning? We might check this from other Stream implementations.

}
getS3AStreamStatistics().readOperationStarted(getPos(), 1);

int bytesRead;
try {
bytesRead = inputStream.read();
} catch (IOException ioe) {
onReadFailure(ioe);
throw ioe;
}

if (bytesRead != -1) {
incrementBytesRead(1);
}

return bytesRead;
}

@@ -105,26 +120,47 @@
*/
public int readTail(byte[] buf, int off, int len) throws IOException {
throwIfClosed();
getS3AStreamStatistics().readOperationStarted(getPos(), len);

int bytesRead;
try {
bytesRead = inputStream.readTail(buf, off, len);
} catch (IOException ioe) {
onReadFailure(ioe);
throw ioe;
}

if (bytesRead > 0) {
incrementBytesRead(bytesRead);
}

return bytesRead;
}

@Override
public int read(byte[] buf, int off, int len) throws IOException {
throwIfClosed();
long pos = getPos();
if (pos >= lengthLimit) {
return -1; // EOF reached due to length limit
}

Check failure on line 147 in hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java (ASF Cloudbees Jenkins ci-hadoop / Apache Yetus): blanks: end of line
// Limit read length to not exceed the length limit
int maxRead = (int) Math.min(len, lengthLimit - pos);
getS3AStreamStatistics().readOperationStarted(pos, maxRead);

int bytesRead;
try {
bytesRead = inputStream.read(buf, off, len);
bytesRead = inputStream.read(buf, off, maxRead);
} catch (IOException ioe) {
onReadFailure(ioe);
throw ioe;
}

if (bytesRead > 0) {
incrementBytesRead(bytesRead);
}

return bytesRead;
}

@@ -194,10 +230,13 @@
}

private OpenStreamInformation buildOpenStreamInformation(ObjectReadParameters parameters) {

final RequestCallback requestCallback = new AnalyticsRequestCallback(getS3AStreamStatistics());

OpenStreamInformation.OpenStreamInformationBuilder openStreamInformationBuilder =
OpenStreamInformation.builder()
.inputPolicy(mapS3AInputPolicyToAAL(parameters.getContext()
.getInputPolicy()));
.getInputPolicy())).requestCallback(requestCallback);

if (parameters.getObjectAttributes().getETag() != null) {
openStreamInformationBuilder.objectMetadata(ObjectMetadata.builder()
@@ -235,4 +274,16 @@
throw new IOException(getKey() + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
}
}

/**
* Increment the bytes read counter if there is a stats instance
* and the number of bytes read is more than zero.
* @param bytesRead number of bytes read
*/
private void incrementBytesRead(long bytesRead) {
getS3AStreamStatistics().bytesRead(bytesRead);
if (getContext().getStats() != null && bytesRead > 0) {
getContext().getStats().incrementBytesRead(bytesRead);
}
}
}
@@ -190,7 +190,14 @@ void readVectoredOperationStarted(int numIncomingRanges,
long getVersionMismatches();

long getInputPolicy();

/**
* Increment the counter for GET requests made by the Analytics Accelerator Library.
*/
void incrementAnalyticsGetRequests();

/**
* Increment the counter for HEAD requests made by the Analytics Accelerator Library.
*/
void incrementAnalyticsHeadRequests();

/**
* Get the value of a counter.
* @param name counter name
@@ -159,7 +159,12 @@ public void seekForwards(final long skipped,
final long bytesReadInSeek) {

}

@Override
public void incrementAnalyticsGetRequests() {
}

@Override
public void incrementAnalyticsHeadRequests() {
}

@Override
public long streamOpened() {
return 0;
Expand Down