Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

package com.google.cloud.storage;

import static com.google.cloud.RetryHelper.runWithRetries;
import static com.google.cloud.storage.Blob.BlobSourceOption.toGetOptions;
import static com.google.cloud.storage.Blob.BlobSourceOption.toSourceOptions;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.concurrent.Executors.callable;

import com.google.api.services.storage.model.StorageObject;
import com.google.auth.ServiceAccountSigner;
Expand All @@ -34,13 +36,11 @@
import com.google.cloud.storage.spi.v1.StorageRpc;
import com.google.common.base.Function;
import com.google.common.io.BaseEncoding;
import com.google.common.io.CountingOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.OutputStream;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.Key;
Expand Down Expand Up @@ -211,20 +211,40 @@ static Storage.BlobGetOption[] toGetOptions(BlobInfo blobInfo, BlobSourceOption.
* @throws StorageException upon failure
*/
public void downloadTo(Path path, BlobSourceOption... options) {
try (OutputStream outputStream = Files.newOutputStream(path);
ReadChannel reader = reader(options)) {
WritableByteChannel channel = Channels.newChannel(outputStream);
ByteBuffer bytes = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE);
while (reader.read(bytes) > 0) {
bytes.flip();
channel.write(bytes);
bytes.clear();
}
try (OutputStream outputStream = Files.newOutputStream(path)) {
downloadTo(outputStream, options);
} catch (IOException e) {
throw new StorageException(e);
}
}

/**
* Downloads this blob to the given output stream using specified blob read options.
*
* @param outputStream
* @param options
*/
public void downloadTo(OutputStream outputStream, BlobSourceOption... options) {
final CountingOutputStream countingOutputStream = new CountingOutputStream(outputStream);
final StorageRpc storageRpc = this.options.getStorageRpcV1();
final Map<StorageRpc.Option, ?> requestOptions = StorageImpl.optionMap(getBlobId(), options);
runWithRetries(
callable(
new Runnable() {
@Override
public void run() {
storageRpc.read(
getBlobId().toPb(),
requestOptions,
countingOutputStream.getCount(),
countingOutputStream);
}
}),
this.options.getRetrySettings(),
StorageImpl.EXCEPTION_HANDLER,
this.options.getClock());
}

/**
* Downloads this blob to the given file path.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;

import com.google.api.core.ApiClock;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.services.storage.model.StorageObject;
import com.google.cloud.ReadChannel;
import com.google.cloud.storage.Acl.Project;
import com.google.cloud.storage.Acl.Project.ProjectRole;
Expand All @@ -41,12 +44,13 @@
import com.google.cloud.storage.Blob.BlobSourceOption;
import com.google.cloud.storage.Storage.BlobWriteOption;
import com.google.cloud.storage.Storage.CopyRequest;
import com.google.cloud.storage.spi.v1.StorageRpc;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.BaseEncoding;
import java.io.File;
import java.io.OutputStream;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.security.Key;
import java.util.List;
Expand Down Expand Up @@ -131,6 +135,24 @@ public class BlobTest {
private static final Key KEY =
new SecretKeySpec(BaseEncoding.base64().decode(BASE64_KEY), "AES256");

// This retrying setting is used by test testDownloadWithRetries. This unit test is setup
// to write one byte and then throw retryable exception, it then writes another bytes on
// second call succeeds.
private static final RetrySettings RETRY_SETTINGS =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's heavy retry settings in the tests. Is this by design and how do they help?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated retry settings needed for test.

RetrySettings.newBuilder().setMaxAttempts(2).build();
private static final ApiClock API_CLOCK =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please document how retries are being exercised in these unit tests. It will help for posterity.

new ApiClock() {
@Override
public long nanoTime() {
return 42_000_000_000L;
}

@Override
public long millisTime() {
return 42_000L;
}
};

private Storage storage;
private Blob blob;
private Blob expectedBlob;
Expand Down Expand Up @@ -566,28 +588,75 @@ public void testBuilder() {
@Test
public void testDownload() throws Exception {
final byte[] expected = {1, 2};
StorageRpc mockStorageRpc = createNiceMock(StorageRpc.class);
expect(storage.getOptions()).andReturn(mockOptions).times(1);
replay(storage);
expect(mockOptions.getStorageRpcV1()).andReturn(mockStorageRpc);
expect(mockOptions.getRetrySettings()).andReturn(RETRY_SETTINGS);
expect(mockOptions.getClock()).andReturn(API_CLOCK);
replay(mockOptions);
blob = new Blob(storage, new BlobInfo.BuilderImpl(BLOB_INFO));
expect(
mockStorageRpc.read(
anyObject(StorageObject.class),
anyObject(Map.class),
eq(0l),
anyObject(OutputStream.class)))
.andAnswer(
new IAnswer<Long>() {
@Override
public Long answer() throws Throwable {
((OutputStream) getCurrentArguments()[3]).write(expected);
return 2l;
}
});
replay(mockStorageRpc);
File file = File.createTempFile("blob", ".tmp");
blob.downloadTo(file.toPath());
byte actual[] = Files.readAllBytes(file.toPath());
assertArrayEquals(expected, actual);
}

initializeExpectedBlob(2);
ReadChannel channel = createNiceMock(ReadChannel.class);
@Test
public void testDownloadWithRetries() throws Exception {
final byte[] expected = {1, 2};
StorageRpc mockStorageRpc = createNiceMock(StorageRpc.class);
expect(storage.getOptions()).andReturn(mockOptions);
expect(storage.reader(BLOB_INFO.getBlobId())).andReturn(channel);
replay(storage);
// First read should return 2 bytes.
expect(channel.read(anyObject(ByteBuffer.class)))
expect(mockOptions.getStorageRpcV1()).andReturn(mockStorageRpc);
expect(mockOptions.getRetrySettings()).andReturn(RETRY_SETTINGS);
expect(mockOptions.getClock()).andReturn(API_CLOCK);
replay(mockOptions);
blob = new Blob(storage, new BlobInfo.BuilderImpl(BLOB_INFO));
expect(
mockStorageRpc.read(
anyObject(StorageObject.class),
anyObject(Map.class),
eq(0l),
anyObject(OutputStream.class)))
.andAnswer(
new IAnswer<Integer>() {
new IAnswer<Long>() {
@Override
public Integer answer() throws Throwable {
// Modify the argument to match the expected behavior of `read`.
((ByteBuffer) getCurrentArguments()[0]).put(expected);
return 2;
public Long answer() throws Throwable {
((OutputStream) getCurrentArguments()[3]).write(expected[0]);
throw new StorageException(504, "error");
}
});
// Second read should return 0 bytes.
expect(channel.read(anyObject(ByteBuffer.class))).andReturn(0);
replay(channel);
initializeBlob();

expect(
mockStorageRpc.read(
anyObject(StorageObject.class),
anyObject(Map.class),
eq(1l),
anyObject(OutputStream.class)))
.andAnswer(
new IAnswer<Long>() {
@Override
public Long answer() throws Throwable {
((OutputStream) getCurrentArguments()[3]).write(expected[1]);
return 1l;
}
});
replay(mockStorageRpc);
File file = File.createTempFile("blob", ".tmp");
blob.downloadTo(file.toPath());
byte actual[] = Files.readAllBytes(file.toPath());
Expand Down