Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.cloud.storage.spi.v1.StorageRpc;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.math.BigInteger;
import java.nio.file.FileAlreadyExistsException;
import java.util.ArrayList;
Expand Down Expand Up @@ -297,6 +298,37 @@ public Tuple<String, byte[]> read(
return Tuple.of("etag-goes-here", ret);
}

@Override
public long read(
StorageObject from, Map<Option, ?> options, long position, OutputStream outputStream) {
// if non-null, then we check the file's at that generation.
Long generationMatch = null;
for (Option op : options.keySet()) {
if (op.equals(StorageRpc.Option.IF_GENERATION_MATCH)) {
generationMatch = (Long) options.get(op);
} else {
throw new UnsupportedOperationException("Unknown option: " + op);
}
}
String key = fullname(from);
if (!contents.containsKey(key)) {
throw new StorageException(404, "File not found: " + key);
}
checkGeneration(key, generationMatch);
if (position < 0) {
position = 0;
}
byte[] full = contents.get(key);
int bytes = (int) (full.length - position);
if (bytes <= 0) {
// special case: you're trying to read past the end
return 0;
}
byte[] ret = new byte[bytes];
System.arraycopy(full, (int) position, ret, 0, bytes);
return bytes;
}

@Override
public String open(StorageObject object, Map<Option, ?> options) throws StorageException {
String key = fullname(object);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.LinkedList;
Expand Down Expand Up @@ -640,30 +641,59 @@ public RpcBatch createBatch() {
return new DefaultRpcBatch(storage);
}

private Get createReadRequest(StorageObject from, Map<Option, ?> options) throws IOException {
Get req =
storage
.objects()
.get(from.getBucket(), from.getName())
.setGeneration(from.getGeneration())
.setIfMetagenerationMatch(Option.IF_METAGENERATION_MATCH.getLong(options))
.setIfMetagenerationNotMatch(Option.IF_METAGENERATION_NOT_MATCH.getLong(options))
.setIfGenerationMatch(Option.IF_GENERATION_MATCH.getLong(options))
.setIfGenerationNotMatch(Option.IF_GENERATION_NOT_MATCH.getLong(options))
.setUserProject(Option.USER_PROJECT.getString(options));
setEncryptionHeaders(req.getRequestHeaders(), ENCRYPTION_KEY_PREFIX, options);
req.setReturnRawInputStream(true);
return req;
}

@Override
public long read(
StorageObject from, Map<Option, ?> options, long position, OutputStream outputStream) {
Span span = startSpan(HttpStorageRpcSpans.SPAN_NAME_READ);
Scope scope = tracer.withSpan(span);
try {
Get req = createReadRequest(from, options);
req.getMediaHttpDownloader().setBytesDownloaded(position);
req.getMediaHttpDownloader().setDirectDownloadEnabled(true);
req.executeMediaAndDownloadTo(outputStream);
return req.getMediaHttpDownloader().getNumBytesDownloaded();
} catch (IOException ex) {
span.setStatus(Status.UNKNOWN.withDescription(ex.getMessage()));
StorageException serviceException = translate(ex);
if (serviceException.getCode() == SC_REQUESTED_RANGE_NOT_SATISFIABLE) {
return 0;
}
throw serviceException;
} finally {
scope.close();
span.end();
}
}

@Override
public Tuple<String, byte[]> read(
StorageObject from, Map<Option, ?> options, long position, int bytes) {
Span span = startSpan(HttpStorageRpcSpans.SPAN_NAME_READ);
Scope scope = tracer.withSpan(span);
try {
Get req =
storage
.objects()
.get(from.getBucket(), from.getName())
.setGeneration(from.getGeneration())
.setIfMetagenerationMatch(Option.IF_METAGENERATION_MATCH.getLong(options))
.setIfMetagenerationNotMatch(Option.IF_METAGENERATION_NOT_MATCH.getLong(options))
.setIfGenerationMatch(Option.IF_GENERATION_MATCH.getLong(options))
.setIfGenerationNotMatch(Option.IF_GENERATION_NOT_MATCH.getLong(options))
.setUserProject(Option.USER_PROJECT.getString(options));
checkArgument(position >= 0, "Position should be non-negative, is %d", position);
Get req = createReadRequest(from, options);
StringBuilder range = new StringBuilder();
range.append("bytes=").append(position).append("-").append(position + bytes - 1);
HttpHeaders requestHeaders = req.getRequestHeaders();
requestHeaders.setRange(range.toString());
setEncryptionHeaders(requestHeaders, ENCRYPTION_KEY_PREFIX, options);
ByteArrayOutputStream output = new ByteArrayOutputStream(bytes);
req.setReturnRawInputStream(true);
req.executeMedia().download(output);
String etag = req.getLastResponseHeaders().getETag();
return Tuple.of(etag, output.toByteArray());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.cloud.Tuple;
import com.google.cloud.storage.StorageException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -282,6 +283,15 @@ StorageObject compose(
*/
Tuple<String, byte[]> read(StorageObject from, Map<Option, ?> options, long position, int bytes);

/**
* Reads all the bytes from a storage object at the given position in to outputstream using direct
* download.
*
* @return number of bytes downloaded, returns 0 if position higher than length.
* @throws StorageException upon failure
*/
long read(StorageObject from, Map<Option, ?> options, long position, OutputStream outputStream);

/**
* Opens a resumable upload channel for a given storage object.
*
Expand Down