1515# KIND, either express or implied. See the License for the
1616# specific language governing permissions and limitations
1717# under the License.
18- #
1918"""This module contains a Google Cloud Storage hook."""
19+ from __future__ import annotations
20+
2021import functools
2122import gzip as gz
2223import os
2829from io import BytesIO
2930from os import path
3031from tempfile import NamedTemporaryFile
31- from typing import (
32- IO ,
33- Callable ,
34- Generator ,
35- List ,
36- Optional ,
37- Sequence ,
38- Set ,
39- Tuple ,
40- TypeVar ,
41- Union ,
42- cast ,
43- overload ,
44- )
32+ from typing import IO , Callable , Generator , Sequence , TypeVar , cast , overload
4533from urllib .parse import urlparse
4634
4735from google .api_core .exceptions import NotFound
6048RT = TypeVar ('RT' )
6149T = TypeVar ("T" , bound = Callable )
6250
51+ # GCSHook has a method named 'list' (to junior devs: please don't do this), which
52+ # shadows the builtin in the class body; alias the builtin so Mypy is not confused.
53+ List = list
54+
6355# Use default timeout from google-cloud-storage
6456DEFAULT_TIMEOUT = 60
6557
@@ -80,7 +72,7 @@ def _fallback_object_url_to_object_name_and_bucket_name(
8072
8173 def _wrapper (func : T ):
8274 @functools .wraps (func )
83- def _inner_wrapper (self : " GCSHook" , * args , ** kwargs ) -> RT :
75+ def _inner_wrapper (self : GCSHook , * args , ** kwargs ) -> RT :
8476 if args :
8577 raise AirflowException (
8678 "You must use keyword arguments in this methods rather than positional"
@@ -139,13 +131,13 @@ class GCSHook(GoogleBaseHook):
139131 connection.
140132 """
141133
142- _conn = None # type: Optional[ storage.Client]
134+ _conn : storage .Client | None = None
143135
144136 def __init__ (
145137 self ,
146138 gcp_conn_id : str = "google_cloud_default" ,
147- delegate_to : Optional [ str ] = None ,
148- impersonation_chain : Optional [ Union [ str , Sequence [str ]]] = None ,
139+ delegate_to : str | None = None ,
140+ impersonation_chain : str | Sequence [str ] | None = None ,
149141 ) -> None :
150142 super ().__init__ (
151143 gcp_conn_id = gcp_conn_id ,
@@ -166,8 +158,8 @@ def copy(
166158 self ,
167159 source_bucket : str ,
168160 source_object : str ,
169- destination_bucket : Optional [ str ] = None ,
170- destination_object : Optional [ str ] = None ,
161+ destination_bucket : str | None = None ,
162+ destination_object : str | None = None ,
171163 ) -> None :
172164 """
173165 Copies an object from a bucket to another, with renaming if requested.
@@ -215,7 +207,7 @@ def rewrite(
215207 source_bucket : str ,
216208 source_object : str ,
217209 destination_bucket : str ,
218- destination_object : Optional [ str ] = None ,
210+ destination_object : str | None = None ,
219211 ) -> None :
220212 """
221213 Has the same functionality as copy, except that it will work on files
@@ -270,9 +262,9 @@ def download(
270262 bucket_name : str ,
271263 object_name : str ,
272264 filename : None = None ,
273- chunk_size : Optional [ int ] = None ,
274- timeout : Optional [ int ] = DEFAULT_TIMEOUT ,
275- num_max_attempts : Optional [ int ] = 1 ,
265+ chunk_size : int | None = None ,
266+ timeout : int | None = DEFAULT_TIMEOUT ,
267+ num_max_attempts : int | None = 1 ,
276268 ) -> bytes :
277269 ...
278270
@@ -282,21 +274,21 @@ def download(
282274 bucket_name : str ,
283275 object_name : str ,
284276 filename : str ,
285- chunk_size : Optional [ int ] = None ,
286- timeout : Optional [ int ] = DEFAULT_TIMEOUT ,
287- num_max_attempts : Optional [ int ] = 1 ,
277+ chunk_size : int | None = None ,
278+ timeout : int | None = DEFAULT_TIMEOUT ,
279+ num_max_attempts : int | None = 1 ,
288280 ) -> str :
289281 ...
290282
291283 def download (
292284 self ,
293285 bucket_name : str ,
294286 object_name : str ,
295- filename : Optional [ str ] = None ,
296- chunk_size : Optional [ int ] = None ,
297- timeout : Optional [ int ] = DEFAULT_TIMEOUT ,
298- num_max_attempts : Optional [ int ] = 1 ,
299- ) -> Union [ str , bytes ] :
287+ filename : str | None = None ,
288+ chunk_size : int | None = None ,
289+ timeout : int | None = DEFAULT_TIMEOUT ,
290+ num_max_attempts : int | None = 1 ,
291+ ) -> str | bytes :
300292 """
301293 Downloads a file from Google Cloud Storage.
302294
@@ -351,9 +343,9 @@ def download_as_byte_array(
351343 self ,
352344 bucket_name : str ,
353345 object_name : str ,
354- chunk_size : Optional [ int ] = None ,
355- timeout : Optional [ int ] = DEFAULT_TIMEOUT ,
356- num_max_attempts : Optional [ int ] = 1 ,
346+ chunk_size : int | None = None ,
347+ timeout : int | None = DEFAULT_TIMEOUT ,
348+ num_max_attempts : int | None = 1 ,
357349 ) -> bytes :
358350 """
359351 Downloads a file from Google Cloud Storage.
@@ -383,9 +375,9 @@ def download_as_byte_array(
383375 def provide_file (
384376 self ,
385377 bucket_name : str = PROVIDE_BUCKET ,
386- object_name : Optional [ str ] = None ,
387- object_url : Optional [ str ] = None ,
388- dir : Optional [ str ] = None ,
378+ object_name : str | None = None ,
379+ object_url : str | None = None ,
380+ dir : str | None = None ,
389381 ) -> Generator [IO [bytes ], None , None ]:
390382 """
391383 Downloads the file to a temporary directory and returns a file handle
@@ -412,8 +404,8 @@ def provide_file(
412404 def provide_file_and_upload (
413405 self ,
414406 bucket_name : str = PROVIDE_BUCKET ,
415- object_name : Optional [ str ] = None ,
416- object_url : Optional [ str ] = None ,
407+ object_name : str | None = None ,
408+ object_url : str | None = None ,
417409 ) -> Generator [IO [bytes ], None , None ]:
418410 """
419411 Creates a temporary file, returns a file handle and uploads the file's content
@@ -440,15 +432,15 @@ def upload(
440432 self ,
441433 bucket_name : str ,
442434 object_name : str ,
443- filename : Optional [ str ] = None ,
444- data : Optional [ Union [ str , bytes ]] = None ,
445- mime_type : Optional [ str ] = None ,
435+ filename : str | None = None ,
436+ data : str | bytes | None = None ,
437+ mime_type : str | None = None ,
446438 gzip : bool = False ,
447439 encoding : str = 'utf-8' ,
448- chunk_size : Optional [ int ] = None ,
449- timeout : Optional [ int ] = DEFAULT_TIMEOUT ,
440+ chunk_size : int | None = None ,
441+ timeout : int | None = DEFAULT_TIMEOUT ,
450442 num_max_attempts : int = 1 ,
451- metadata : Optional [ dict ] = None ,
443+ metadata : dict | None = None ,
452444 ) -> None :
453445 """
454446 Uploads a local file or file data as string or bytes to Google Cloud Storage.
@@ -683,7 +675,7 @@ def delete_bucket(self, bucket_name: str, force: bool = False) -> None:
683675 except NotFound :
684676 self .log .info ("Bucket %s not exists" , bucket_name )
685677
686- def list (self , bucket_name , versions = None , max_results = None , prefix = None , delimiter = None ) -> list :
678+ def list (self , bucket_name , versions = None , max_results = None , prefix = None , delimiter = None ) -> List :
687679 """
688680 List all objects from the bucket with the given string prefix in name
689681
@@ -730,10 +722,10 @@ def list_by_timespan(
730722 bucket_name : str ,
731723 timespan_start : datetime ,
732724 timespan_end : datetime ,
733- versions : Optional [ bool ] = None ,
734- max_results : Optional [ int ] = None ,
735- prefix : Optional [ str ] = None ,
736- delimiter : Optional [ str ] = None ,
725+ versions : bool | None = None ,
726+ max_results : int | None = None ,
727+ prefix : str | None = None ,
728+ delimiter : str | None = None ,
737729 ) -> List [str ]:
738730 """
739731 List all objects from the bucket with the given string prefix in name that were
@@ -838,11 +830,11 @@ def get_md5hash(self, bucket_name: str, object_name: str) -> str:
838830 def create_bucket (
839831 self ,
840832 bucket_name : str ,
841- resource : Optional [ dict ] = None ,
833+ resource : dict | None = None ,
842834 storage_class : str = 'MULTI_REGIONAL' ,
843835 location : str = 'US' ,
844- project_id : Optional [ str ] = None ,
845- labels : Optional [ dict ] = None ,
836+ project_id : str | None = None ,
837+ labels : dict | None = None ,
846838 ) -> str :
847839 """
848840 Creates a new bucket. Google Cloud Storage uses a flat namespace, so
@@ -900,7 +892,7 @@ def create_bucket(
900892 return bucket .id
901893
902894 def insert_bucket_acl (
903- self , bucket_name : str , entity : str , role : str , user_project : Optional [ str ] = None
895+ self , bucket_name : str , entity : str , role : str , user_project : str | None = None
904896 ) -> None :
905897 """
906898 Creates a new ACL entry on the specified bucket_name.
@@ -933,8 +925,8 @@ def insert_object_acl(
933925 object_name : str ,
934926 entity : str ,
935927 role : str ,
936- generation : Optional [ int ] = None ,
937- user_project : Optional [ str ] = None ,
928+ generation : int | None = None ,
929+ user_project : str | None = None ,
938930 ) -> None :
939931 """
940932 Creates a new ACL entry on the specified object.
@@ -967,7 +959,7 @@ def insert_object_acl(
967959
968960 self .log .info ('A new ACL entry created for object: %s in bucket: %s' , object_name , bucket_name )
969961
970- def compose (self , bucket_name : str , source_objects : List , destination_object : str ) -> None :
962+ def compose (self , bucket_name : str , source_objects : List [ str ] , destination_object : str ) -> None :
971963 """
972964 Composes a list of existing objects into a new object in the same storage bucket_name
973965
@@ -1002,8 +994,8 @@ def sync(
1002994 self ,
1003995 source_bucket : str ,
1004996 destination_bucket : str ,
1005- source_object : Optional [ str ] = None ,
1006- destination_object : Optional [ str ] = None ,
997+ source_object : str | None = None ,
998+ destination_object : str | None = None ,
1007999 recursive : bool = True ,
10081000 allow_overwrite : bool = False ,
10091001 delete_extra_files : bool = False ,
@@ -1104,7 +1096,7 @@ def sync(
11041096 self .log .info ("Synchronization finished." )
11051097
11061098 def _calculate_sync_destination_path (
1107- self , blob : storage .Blob , destination_object : Optional [ str ] , source_object_prefix_len : int
1099+ self , blob : storage .Blob , destination_object : str | None , source_object_prefix_len : int
11081100 ) -> str :
11091101 return (
11101102 path .join (destination_object , blob .name [source_object_prefix_len :])
@@ -1116,10 +1108,10 @@ def _calculate_sync_destination_path(
11161108 def _prepare_sync_plan (
11171109 source_bucket : storage .Bucket ,
11181110 destination_bucket : storage .Bucket ,
1119- source_object : Optional [ str ] ,
1120- destination_object : Optional [ str ] ,
1111+ source_object : str | None ,
1112+ destination_object : str | None ,
11211113 recursive : bool ,
1122- ) -> Tuple [ Set [storage .Blob ], Set [storage .Blob ], Set [storage .Blob ]]:
1114+ ) -> tuple [ set [storage .Blob ], set [storage .Blob ], set [storage .Blob ]]:
11231115 # Calculate the number of characters to remove from the name, because they contain information
11241116 # about the parent's path
11251117 source_object_prefix_len = len (source_object ) if source_object else 0
@@ -1139,11 +1131,11 @@ def _prepare_sync_plan(
11391131 # Determine objects to copy and delete
11401132 to_copy = source_names - destination_names
11411133 to_delete = destination_names - source_names
1142- to_copy_blobs = {source_names_index [a ] for a in to_copy } # type: Set[storage.Blob]
1143- to_delete_blobs = {destination_names_index [a ] for a in to_delete } # type: Set[storage.Blob]
1134+ to_copy_blobs : set [ storage . Blob ] = {source_names_index [a ] for a in to_copy }
1135+ to_delete_blobs : set [ storage . Blob ] = {destination_names_index [a ] for a in to_delete }
11441136 # Find names that are in both buckets
11451137 names_to_check = source_names .intersection (destination_names )
1146- to_rewrite_blobs = set () # type: Set [storage.Blob]
1138+ to_rewrite_blobs : set [storage .Blob ] = set ()
11471139 # Compare objects based on crc32
11481140 for current_name in names_to_check :
11491141 source_blob = source_names_index [current_name ]
@@ -1164,7 +1156,7 @@ def gcs_object_is_directory(bucket: str) -> bool:
11641156 return len (blob ) == 0 or blob .endswith ('/' )
11651157
11661158
1167- def _parse_gcs_url (gsurl : str ) -> Tuple [str , str ]:
1159+ def _parse_gcs_url (gsurl : str ) -> tuple [str , str ]:
11681160 """
11691161 Given a Google Cloud Storage URL (gs://<bucket>/<blob>), returns a
11701162 tuple containing the corresponding bucket and blob.
0 commit comments