Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions google/cloud/storage/_media/_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,12 @@ class ResumableUpload(UploadBase):
"""

def __init__(
self, upload_url, chunk_size, checksum="auto", headers=None, retry=DEFAULT_RETRY
self,
upload_url,
chunk_size,
checksum="auto",
headers=None,
retry=DEFAULT_RETRY,
):
super(ResumableUpload, self).__init__(upload_url, headers=headers, retry=retry)
if chunk_size % UPLOAD_CHUNK_SIZE != 0:
Expand Down Expand Up @@ -472,7 +477,12 @@ def total_bytes(self):
return self._total_bytes

def _prepare_initiate_request(
self, stream, metadata, content_type, total_bytes=None, stream_final=True
self,
stream,
metadata,
content_type,
total_bytes=None,
stream_final=True,
):
"""Prepare the contents of HTTP request to initiate upload.

Expand Down Expand Up @@ -955,7 +965,12 @@ class XMLMPUContainer(UploadBase):
"""

def __init__(
self, upload_url, filename, headers=None, upload_id=None, retry=DEFAULT_RETRY
self,
upload_url,
filename,
headers=None,
upload_id=None,
retry=DEFAULT_RETRY,
):
super().__init__(upload_url, headers=headers, retry=retry)
self._filename = filename
Expand Down
60 changes: 49 additions & 11 deletions google/cloud/storage/blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@

# pylint: disable=too-many-lines

"""Create / interact with Google Cloud Storage blobs.
"""
"""Create / interact with Google Cloud Storage blobs."""

import base64
import copy
Expand Down Expand Up @@ -142,8 +141,8 @@
r"(?P<scheme>gs)://(?P<bucket_name>[a-z0-9_.-]+)/(?P<object_name>.+)"
)

_DEFAULT_CHUNKSIZE = 104857600 # 1024 * 1024 B * 100 = 100 MB
_MAX_MULTIPART_SIZE = 8388608 # 8 MB
_DEFAULT_CHUNKSIZE = 104857600 # 1024 * 1024 B * 100 = 100 MiB
_MAX_MULTIPART_SIZE = 8388608 # 8 MiB

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -181,6 +180,14 @@ class Blob(_PropertyMixin):
:type generation: long
:param generation:
(Optional) If present, selects a specific revision of this object.

:type crc32c_checksum: str
:param crc32c_checksum:
(Optional) If set, the CRC32C checksum of the blob's content.
CRC32c checksum, as described in RFC 4960, Appendix B; encoded using
base64 in big-endian byte order. See
Appendix B: https://datatracker.ietf.org/doc/html/rfc4960#appendix-B
base64: https://datatracker.ietf.org/doc/html/rfc4648#section-4
"""

_chunk_size = None # Default value for each instance.
Expand Down Expand Up @@ -214,6 +221,7 @@ def __init__(
encryption_key=None,
kms_key_name=None,
generation=None,
crc32c_checksum=None,
):
"""
property :attr:`name`
Expand All @@ -237,6 +245,9 @@ def __init__(
if generation is not None:
self._properties["generation"] = generation

if crc32c_checksum is not None:
self._properties["crc32c"] = crc32c_checksum

@property
def bucket(self):
"""Bucket which contains the object.
Expand Down Expand Up @@ -1643,7 +1654,9 @@ def download_as_string(
:raises: :class:`google.cloud.exceptions.NotFound`
"""
warnings.warn(
_DOWNLOAD_AS_STRING_DEPRECATED, PendingDeprecationWarning, stacklevel=2
_DOWNLOAD_AS_STRING_DEPRECATED,
PendingDeprecationWarning,
stacklevel=2,
)
with create_trace_span(name="Storage.Blob.downloadAsString"):
return self.download_as_bytes(
Expand Down Expand Up @@ -1999,12 +2012,18 @@ def _do_multipart_upload(
transport = self._get_transport(client)
if "metadata" in self._properties and "metadata" not in self._changes:
self._changes.add("metadata")

info = self._get_upload_arguments(client, content_type, command=command)
headers, object_metadata, content_type = info

if "crc32c" in self._properties:
object_metadata["crc32c"] = self._properties["crc32c"]

hostname = _get_host_name(client._connection)
base_url = _MULTIPART_URL_TEMPLATE.format(
hostname=hostname, bucket_path=self.bucket.path, api_version=_API_VERSION
hostname=hostname,
bucket_path=self.bucket.path,
api_version=_API_VERSION,
)
name_value_pairs = []

Expand Down Expand Up @@ -2195,9 +2214,14 @@ def _initiate_resumable_upload(
if extra_headers is not None:
headers.update(extra_headers)

if "crc32c" in self._properties:
object_metadata["crc32c"] = self._properties["crc32c"]

hostname = _get_host_name(client._connection)
base_url = _RESUMABLE_URL_TEMPLATE.format(
hostname=hostname, bucket_path=self.bucket.path, api_version=_API_VERSION
hostname=hostname,
bucket_path=self.bucket.path,
api_version=_API_VERSION,
)
name_value_pairs = []

Expand Down Expand Up @@ -2234,7 +2258,11 @@ def _initiate_resumable_upload(

upload_url = _add_query_parameters(base_url, name_value_pairs)
upload = ResumableUpload(
upload_url, chunk_size, headers=headers, checksum=checksum, retry=retry
upload_url,
chunk_size,
headers=headers,
checksum=checksum,
retry=retry,
)

upload.initiate(
Expand Down Expand Up @@ -3426,7 +3454,11 @@ def set_iam_policy(
return Policy.from_api_repr(info)

def test_iam_permissions(
self, permissions, client=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY
self,
permissions,
client=None,
timeout=_DEFAULT_TIMEOUT,
retry=DEFAULT_RETRY,
):
"""API call: test permissions

Expand Down Expand Up @@ -3693,7 +3725,10 @@ def compose(

source_objects = []
for source, source_generation in zip(sources, if_source_generation_match):
source_object = {"name": source.name, "generation": source.generation}
source_object = {
"name": source.name,
"generation": source.generation,
}

preconditions = {}
if source_generation is not None:
Expand Down Expand Up @@ -4154,7 +4189,10 @@ def open(
"encoding, errors and newline arguments are for text mode only"
)
return BlobWriter(
self, chunk_size=chunk_size, ignore_flush=ignore_flush, **kwargs
self,
chunk_size=chunk_size,
ignore_flush=ignore_flush,
**kwargs,
)
elif mode in ("r", "rt"):
if ignore_flush:
Expand Down
26 changes: 23 additions & 3 deletions google/cloud/storage/bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,10 @@ class LifecycleRuleSetStorageClass(dict):
def __init__(self, storage_class, **kw):
conditions = LifecycleRuleConditions(**kw)
rule = {
"action": {"type": "SetStorageClass", "storageClass": storage_class},
"action": {
"type": "SetStorageClass",
"storageClass": storage_class,
},
"condition": dict(conditions),
}
super().__init__(rule)
Expand Down Expand Up @@ -846,6 +849,7 @@ def blob(
encryption_key=None,
kms_key_name=None,
generation=None,
crc32c_checksum=None,
):
"""Factory constructor for blob object.

Expand Down Expand Up @@ -873,6 +877,14 @@ def blob(
:param generation: (Optional) If present, selects a specific revision of
this object.

:type crc32c_checksum: str
:param crc32c_checksum:
(Optional) If set, the CRC32C checksum of the blob's content.
CRC32c checksum, as described in RFC 4960, Appendix B; encoded using
base64 in big-endian byte order. See
Appendix B: https://datatracker.ietf.org/doc/html/rfc4960#appendix-B
base64: https://datatracker.ietf.org/doc/html/rfc4648#section-4

:rtype: :class:`google.cloud.storage.blob.Blob`
:returns: The blob object created.
"""
Expand All @@ -883,6 +895,7 @@ def blob(
encryption_key=encryption_key,
kms_key_name=kms_key_name,
generation=generation,
crc32c_checksum=crc32c_checksum,
)

def notification(
Expand Down Expand Up @@ -3253,7 +3266,10 @@ def configure_website(self, main_page_suffix=None, not_found_page=None):
:type not_found_page: str
:param not_found_page: The file to use when a page isn't found.
"""
data = {"mainPageSuffix": main_page_suffix, "notFoundPage": not_found_page}
data = {
"mainPageSuffix": main_page_suffix,
"notFoundPage": not_found_page,
}
self._patch_property("website", data)

def disable_website(self):
Expand Down Expand Up @@ -3385,7 +3401,11 @@ def set_iam_policy(
return Policy.from_api_repr(info)

def test_iam_permissions(
self, permissions, client=None, timeout=_DEFAULT_TIMEOUT, retry=DEFAULT_RETRY
self,
permissions,
client=None,
timeout=_DEFAULT_TIMEOUT,
retry=DEFAULT_RETRY,
):
"""API call: test permissions

Expand Down
Binary file added tests/data/random_9_MiB_file
Binary file not shown.
1 change: 1 addition & 0 deletions tests/system/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
("logo", "CloudPlatform_128px_Retina.png"),
("big", "five-point-one-mb-file.zip"),
("simple", "simple.txt"),
("big_9MiB", "random_9_MiB_file"),
]
_file_data = {
key: {"path": os.path.join(data_dirname, file_name)}
Expand Down
34 changes: 34 additions & 0 deletions tests/system/test_blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,40 @@ def _check_blob_hash(blob, info):
assert md5_hash == info["hash"]


def test_large_file_write_from_stream_w_user_provided_checksum(
    shared_bucket,
    blobs_to_delete,
    file_data,
    service_account,
):
    """Upload a large file when the caller supplies the CRC32C checksum.

    The checksum is expected to match the fixture file's content, so the
    upload should complete without error.
    """
    source = file_data["big_9MiB"]
    new_blob = shared_bucket.blob(
        f"LargeFile{uuid.uuid4().hex}", crc32c_checksum="20tD7w=="
    )
    with open(source["path"], "rb") as stream:
        new_blob.upload_from_file(stream)
    # Register the object so the test teardown removes it.
    blobs_to_delete.append(new_blob)


def test_large_file_write_from_stream_w_user_provided_wrong_checksum(
    shared_bucket,
    blobs_to_delete,
    file_data,
    service_account,
):
    """Uploading with a mismatched CRC32C checksum must fail with HTTP 400."""
    source = file_data["big_9MiB"]
    new_blob = shared_bucket.blob(
        f"LargeFile{uuid.uuid4().hex}", crc32c_checksum="A0tD7w=="
    )
    with pytest.raises(exceptions.BadRequest) as excep_info:
        with open(source["path"], "rb") as stream:
            new_blob.upload_from_file(stream)
    # NOTE(review): registered for cleanup even though the upload is expected
    # to fail — presumably teardown tolerates a missing object; confirm.
    blobs_to_delete.append(new_blob)
    assert excep_info.value.code == 400


def test_large_file_write_from_stream(
shared_bucket,
blobs_to_delete,
Expand Down
26 changes: 25 additions & 1 deletion tests/unit/test_blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -2762,12 +2762,21 @@ def _initiate_resumable_helper(
metadata=None,
mtls=False,
retry=None,
crc32c_checksum=None,
):
from google.cloud.storage._media.requests import ResumableUpload
from google.cloud.storage.blob import _DEFAULT_CHUNKSIZE

bucket = _Bucket(name="whammy", user_project=user_project)
blob = self._make_one("blob-name", bucket=bucket, kms_key_name=kms_key_name)
if crc32c_checksum is None:
blob = self._make_one("blob-name", bucket=bucket, kms_key_name=kms_key_name)
else:
blob = self._make_one(
"blob-name",
bucket=bucket,
kms_key_name=kms_key_name,
crc32c_checksum=crc32c_checksum,
)
if metadata:
self.assertIsNone(blob.metadata)
blob._properties["metadata"] = metadata
Expand Down Expand Up @@ -2919,6 +2928,10 @@ def _initiate_resumable_helper(
else:
# Check the mocks.
blob._get_writable_metadata.assert_called_once_with()

if "crc32c" in blob._properties:
object_metadata["crc32c"] = blob._properties["crc32c"]

payload = json.dumps(object_metadata).encode("utf-8")

with patch.object(
Expand All @@ -2945,6 +2958,17 @@ def _initiate_resumable_helper(
def test__initiate_resumable_upload_with_metadata(self):
    """Exercise resumable-upload initiation with caller-supplied metadata."""
    self._initiate_resumable_helper(metadata={"test": "test"})

def test__initiate_resumable_upload_with_user_provided_checksum(self):
    """Exercise resumable-upload initiation with a user-provided crc32c value.

    The helper asserts the checksum ends up in the object metadata payload;
    the value itself need not be a valid checksum for this unit test.
    """
    self._initiate_resumable_helper(
        crc32c_checksum="this-is-a-fake-checksum-for-unit-tests",
    )

def test__initiate_resumable_upload_w_metadata_and_user_provided_checksum(self):
    """Exercise initiation with both custom metadata and a crc32c checksum."""
    self._initiate_resumable_helper(
        crc32c_checksum="test-checksum",
        metadata={"my-fav-key": "my-fav-value"},
    )

def test__initiate_resumable_upload_with_custom_timeout(self):
    """Exercise resumable-upload initiation with a non-default timeout."""
    self._initiate_resumable_helper(timeout=9.58)

Expand Down