diff --git a/google-cloud-clients/google-cloud-storage/src/main/java/com/google/cloud/storage/Blob.java b/google-cloud-clients/google-cloud-storage/src/main/java/com/google/cloud/storage/Blob.java index 722df42e9477..7cb7e1a91b95 100644 --- a/google-cloud-clients/google-cloud-storage/src/main/java/com/google/cloud/storage/Blob.java +++ b/google-cloud-clients/google-cloud-storage/src/main/java/com/google/cloud/storage/Blob.java @@ -16,9 +16,11 @@ package com.google.cloud.storage; +import static com.google.cloud.RetryHelper.runWithRetries; import static com.google.cloud.storage.Blob.BlobSourceOption.toGetOptions; import static com.google.cloud.storage.Blob.BlobSourceOption.toSourceOptions; import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.concurrent.Executors.callable; import com.google.api.services.storage.model.StorageObject; import com.google.auth.ServiceAccountSigner; @@ -34,13 +36,11 @@ import com.google.cloud.storage.spi.v1.StorageRpc; import com.google.common.base.Function; import com.google.common.io.BaseEncoding; +import com.google.common.io.CountingOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.OutputStream; import java.net.URL; -import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.channels.WritableByteChannel; import java.nio.file.Files; import java.nio.file.Path; import java.security.Key; @@ -211,20 +211,40 @@ static Storage.BlobGetOption[] toGetOptions(BlobInfo blobInfo, BlobSourceOption. * @throws StorageException upon failure */ public void downloadTo(Path path, BlobSourceOption... options) { - try (OutputStream outputStream = Files.newOutputStream(path); - ReadChannel reader = reader(options)) { - WritableByteChannel channel = Channels.newChannel(outputStream); - ByteBuffer bytes = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE); - while (reader.read(bytes) > 0) { - bytes.flip(); - channel.write(bytes); - bytes.clear(); - } + try (OutputStream outputStream = Files.newOutputStream(path)) { + downloadTo(outputStream, options); } catch (IOException e) { throw new StorageException(e); } } + /** + * Downloads this blob to the given output stream using specified blob read options. + * + * @param outputStream + * @param options + */ + public void downloadTo(OutputStream outputStream, BlobSourceOption... options) { + final CountingOutputStream countingOutputStream = new CountingOutputStream(outputStream); + final StorageRpc storageRpc = this.options.getStorageRpcV1(); + final Map requestOptions = StorageImpl.optionMap(getBlobId(), options); + runWithRetries( + callable( + new Runnable() { + @Override + public void run() { + storageRpc.read( + getBlobId().toPb(), + requestOptions, + countingOutputStream.getCount(), + countingOutputStream); + } + }), + this.options.getRetrySettings(), + StorageImpl.EXCEPTION_HANDLER, + this.options.getClock()); + } + /** * Downloads this blob to the given file path. * diff --git a/google-cloud-clients/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobTest.java b/google-cloud-clients/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobTest.java index d68bf965f1f4..6f91f968a03e 100644 --- a/google-cloud-clients/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobTest.java +++ b/google-cloud-clients/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobTest.java @@ -33,6 +33,9 @@ import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import com.google.api.core.ApiClock; +import com.google.api.gax.retrying.RetrySettings; +import com.google.api.services.storage.model.StorageObject; import com.google.cloud.ReadChannel; import com.google.cloud.storage.Acl.Project; import com.google.cloud.storage.Acl.Project.ProjectRole; @@ -41,12 +44,13 @@ import com.google.cloud.storage.Blob.BlobSourceOption; import com.google.cloud.storage.Storage.BlobWriteOption; import com.google.cloud.storage.Storage.CopyRequest; +import com.google.cloud.storage.spi.v1.StorageRpc; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.io.BaseEncoding; import java.io.File; +import java.io.OutputStream; import java.net.URL; -import java.nio.ByteBuffer; import java.nio.file.Files; import java.security.Key; import java.util.List; @@ -131,6 +135,24 @@ public class BlobTest { private static final Key KEY = new SecretKeySpec(BaseEncoding.base64().decode(BASE64_KEY), "AES256"); + // This retrying setting is used by test testDownloadWithRetries. This unit test is setup + // to write one byte and then throw retryable exception, it then writes another bytes on + // second call succeeds. + private static final RetrySettings RETRY_SETTINGS = + RetrySettings.newBuilder().setMaxAttempts(2).build(); + private static final ApiClock API_CLOCK = + new ApiClock() { + @Override + public long nanoTime() { + return 42_000_000_000L; + } + + @Override + public long millisTime() { + return 42_000L; + } + }; + private Storage storage; private Blob blob; private Blob expectedBlob; @@ -566,28 +588,75 @@ public void testBuilder() { @Test public void testDownload() throws Exception { final byte[] expected = {1, 2}; + StorageRpc mockStorageRpc = createNiceMock(StorageRpc.class); + expect(storage.getOptions()).andReturn(mockOptions).times(1); + replay(storage); + expect(mockOptions.getStorageRpcV1()).andReturn(mockStorageRpc); + expect(mockOptions.getRetrySettings()).andReturn(RETRY_SETTINGS); + expect(mockOptions.getClock()).andReturn(API_CLOCK); + replay(mockOptions); + blob = new Blob(storage, new BlobInfo.BuilderImpl(BLOB_INFO)); + expect( + mockStorageRpc.read( + anyObject(StorageObject.class), + anyObject(Map.class), + eq(0l), + anyObject(OutputStream.class))) + .andAnswer( + new IAnswer() { + @Override + public Long answer() throws Throwable { + ((OutputStream) getCurrentArguments()[3]).write(expected); + return 2l; + } + }); + replay(mockStorageRpc); + File file = File.createTempFile("blob", ".tmp"); + blob.downloadTo(file.toPath()); + byte actual[] = Files.readAllBytes(file.toPath()); + assertArrayEquals(expected, actual); + } - initializeExpectedBlob(2); - ReadChannel channel = createNiceMock(ReadChannel.class); + @Test + public void testDownloadWithRetries() throws Exception { + final byte[] expected = {1, 2}; + StorageRpc mockStorageRpc = createNiceMock(StorageRpc.class); expect(storage.getOptions()).andReturn(mockOptions); - expect(storage.reader(BLOB_INFO.getBlobId())).andReturn(channel); replay(storage); - // First read should return 2 bytes. - expect(channel.read(anyObject(ByteBuffer.class))) + expect(mockOptions.getStorageRpcV1()).andReturn(mockStorageRpc); + expect(mockOptions.getRetrySettings()).andReturn(RETRY_SETTINGS); + expect(mockOptions.getClock()).andReturn(API_CLOCK); + replay(mockOptions); + blob = new Blob(storage, new BlobInfo.BuilderImpl(BLOB_INFO)); + expect( + mockStorageRpc.read( + anyObject(StorageObject.class), + anyObject(Map.class), + eq(0l), + anyObject(OutputStream.class))) .andAnswer( - new IAnswer() { + new IAnswer() { @Override - public Integer answer() throws Throwable { - // Modify the argument to match the expected behavior of `read`. - ((ByteBuffer) getCurrentArguments()[0]).put(expected); - return 2; + public Long answer() throws Throwable { + ((OutputStream) getCurrentArguments()[3]).write(expected[0]); + throw new StorageException(504, "error"); } }); - // Second read should return 0 bytes. - expect(channel.read(anyObject(ByteBuffer.class))).andReturn(0); - replay(channel); - initializeBlob(); - + expect( + mockStorageRpc.read( + anyObject(StorageObject.class), + anyObject(Map.class), + eq(1l), + anyObject(OutputStream.class))) + .andAnswer( + new IAnswer() { + @Override + public Long answer() throws Throwable { + ((OutputStream) getCurrentArguments()[3]).write(expected[1]); + return 1l; + } + }); + replay(mockStorageRpc); File file = File.createTempFile("blob", ".tmp"); blob.downloadTo(file.toPath()); byte actual[] = Files.readAllBytes(file.toPath());