-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Storage: Fix slow download performance #5791
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
c00cb4c
615dab7
ae1ffab
1bdc864
48777c2
0aebf4c
58469f5
cb65519
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -33,6 +33,9 @@ | |
| import static org.junit.Assert.assertSame; | ||
| import static org.junit.Assert.assertTrue; | ||
|
|
||
| import com.google.api.core.ApiClock; | ||
| import com.google.api.gax.retrying.RetrySettings; | ||
| import com.google.api.services.storage.model.StorageObject; | ||
| import com.google.cloud.ReadChannel; | ||
| import com.google.cloud.storage.Acl.Project; | ||
| import com.google.cloud.storage.Acl.Project.ProjectRole; | ||
|
|
@@ -41,12 +44,13 @@ | |
| import com.google.cloud.storage.Blob.BlobSourceOption; | ||
| import com.google.cloud.storage.Storage.BlobWriteOption; | ||
| import com.google.cloud.storage.Storage.CopyRequest; | ||
| import com.google.cloud.storage.spi.v1.StorageRpc; | ||
| import com.google.common.collect.ImmutableList; | ||
| import com.google.common.collect.ImmutableMap; | ||
| import com.google.common.io.BaseEncoding; | ||
| import java.io.File; | ||
| import java.io.OutputStream; | ||
| import java.net.URL; | ||
| import java.nio.ByteBuffer; | ||
| import java.nio.file.Files; | ||
| import java.security.Key; | ||
| import java.util.List; | ||
|
|
@@ -58,6 +62,7 @@ | |
| import org.junit.After; | ||
| import org.junit.Before; | ||
| import org.junit.Test; | ||
| import org.threeten.bp.Duration; | ||
|
||
|
|
||
| public class BlobTest { | ||
|
|
||
|
|
@@ -130,6 +135,28 @@ public class BlobTest { | |
| private static final String BASE64_KEY = "JVzfVl8NLD9FjedFuStegjRfES5ll5zc59CIXw572OA="; | ||
| private static final Key KEY = | ||
| new SecretKeySpec(BaseEncoding.base64().decode(BASE64_KEY), "AES256"); | ||
| private static final RetrySettings RETRY_SETTINGS = | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There's heavy retry settings in the tests. Is this by design and how do they help?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. updated retry settings needed for test. |
||
| RetrySettings.newBuilder() | ||
| .setInitialRetryDelay(Duration.ofMillis(100L)) | ||
| .setRetryDelayMultiplier(1.3) | ||
| .setMaxRetryDelay(Duration.ofMillis(60000L)) | ||
| .setInitialRpcTimeout(Duration.ofMillis(20000L)) | ||
| .setRpcTimeoutMultiplier(1.0) | ||
| .setMaxRpcTimeout(Duration.ofMillis(20000L)) | ||
| .setTotalTimeout(Duration.ofMillis(600000L)) | ||
| .build(); | ||
| private static final ApiClock API_CLOCK = | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please document how retries are being exercised in these unit tests. It will help for posterity. |
||
| new ApiClock() { | ||
| @Override | ||
| public long nanoTime() { | ||
| return 42_000_000_000L; | ||
| } | ||
|
|
||
| @Override | ||
| public long millisTime() { | ||
| return 42_000L; | ||
| } | ||
| }; | ||
|
|
||
| private Storage storage; | ||
| private Blob blob; | ||
|
|
@@ -566,28 +593,75 @@ public void testBuilder() { | |
| @Test | ||
| public void testDownload() throws Exception { | ||
| final byte[] expected = {1, 2}; | ||
| StorageRpc mockStorageRpc = createNiceMock(StorageRpc.class); | ||
| expect(storage.getOptions()).andReturn(mockOptions).times(1); | ||
| replay(storage); | ||
| expect(mockOptions.getStorageRpcV1()).andReturn(mockStorageRpc); | ||
| expect(mockOptions.getRetrySettings()).andReturn(RETRY_SETTINGS); | ||
| expect(mockOptions.getClock()).andReturn(API_CLOCK); | ||
| replay(mockOptions); | ||
| blob = new Blob(storage, new BlobInfo.BuilderImpl(BLOB_INFO)); | ||
| expect( | ||
| mockStorageRpc.read( | ||
| anyObject(StorageObject.class), | ||
| anyObject(Map.class), | ||
| eq(0l), | ||
| anyObject(OutputStream.class))) | ||
| .andAnswer( | ||
| new IAnswer<Long>() { | ||
| @Override | ||
| public Long answer() throws Throwable { | ||
| ((OutputStream) getCurrentArguments()[3]).write(expected); | ||
| return 2l; | ||
| } | ||
| }); | ||
| replay(mockStorageRpc); | ||
| File file = File.createTempFile("blob", ".tmp"); | ||
| blob.downloadTo(file.toPath()); | ||
| byte actual[] = Files.readAllBytes(file.toPath()); | ||
| assertArrayEquals(expected, actual); | ||
| } | ||
|
|
||
| initializeExpectedBlob(2); | ||
| ReadChannel channel = createNiceMock(ReadChannel.class); | ||
| @Test | ||
| public void testDownloadWithRetries() throws Exception { | ||
| final byte[] expected = {1, 2}; | ||
| StorageRpc mockStorageRpc = createNiceMock(StorageRpc.class); | ||
| expect(storage.getOptions()).andReturn(mockOptions); | ||
| expect(storage.reader(BLOB_INFO.getBlobId())).andReturn(channel); | ||
| replay(storage); | ||
| // First read should return 2 bytes. | ||
| expect(channel.read(anyObject(ByteBuffer.class))) | ||
| expect(mockOptions.getStorageRpcV1()).andReturn(mockStorageRpc); | ||
| expect(mockOptions.getRetrySettings()).andReturn(RETRY_SETTINGS); | ||
| expect(mockOptions.getClock()).andReturn(API_CLOCK); | ||
| replay(mockOptions); | ||
| blob = new Blob(storage, new BlobInfo.BuilderImpl(BLOB_INFO)); | ||
| expect( | ||
| mockStorageRpc.read( | ||
| anyObject(StorageObject.class), | ||
| anyObject(Map.class), | ||
| eq(0l), | ||
| anyObject(OutputStream.class))) | ||
| .andAnswer( | ||
| new IAnswer<Integer>() { | ||
| new IAnswer<Long>() { | ||
| @Override | ||
| public Integer answer() throws Throwable { | ||
| // Modify the argument to match the expected behavior of `read`. | ||
| ((ByteBuffer) getCurrentArguments()[0]).put(expected); | ||
| return 2; | ||
| public Long answer() throws Throwable { | ||
| ((OutputStream) getCurrentArguments()[3]).write(expected[0]); | ||
| throw new StorageException(504, "error"); | ||
| } | ||
| }); | ||
| // Second read should return 0 bytes. | ||
| expect(channel.read(anyObject(ByteBuffer.class))).andReturn(0); | ||
| replay(channel); | ||
| initializeBlob(); | ||
|
|
||
| expect( | ||
| mockStorageRpc.read( | ||
| anyObject(StorageObject.class), | ||
| anyObject(Map.class), | ||
| eq(1l), | ||
| anyObject(OutputStream.class))) | ||
| .andAnswer( | ||
| new IAnswer<Long>() { | ||
| @Override | ||
| public Long answer() throws Throwable { | ||
| ((OutputStream) getCurrentArguments()[3]).write(expected[1]); | ||
| return 1l; | ||
| } | ||
| }); | ||
| replay(mockStorageRpc); | ||
| File file = File.createTempFile("blob", ".tmp"); | ||
| blob.downloadTo(file.toPath()); | ||
| byte actual[] = Files.readAllBytes(file.toPath()); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This overloaded method doesn't look necessary. My varargs are rusty but if no Options are specified in downloadTo(OutputStream os, Options..) it should still work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes its not needed. removed it.