2727from twisted .python .failure import Failure
2828from twisted .python .threadpool import ThreadPool
2929
30- from synapse .logging .context import LoggingContext , make_deferred_yieldable
3130from synapse .rest .media .v1 ._base import Responder
3231from synapse .rest .media .v1 .storage_provider import StorageProvider
33-
34- # Synapse 1.13.0 moved current_context to a module-level function.
35- try :
36- from synapse .logging .context import current_context
37- except ImportError :
38- current_context = LoggingContext .current_context
32+ from synapse .module_api import run_in_background
3933
4034logger = logging .getLogger ("synapse.s3" )
4135
@@ -121,34 +115,29 @@ def _get_s3_client(self):
121115 def store_file (self , path , file_info ):
122116 """See StorageProvider.store_file"""
123117
124- parent_logcontext = current_context ()
125-
126118 def _store_file ():
127- with LoggingContext (parent_context = parent_logcontext ):
128- self ._get_s3_client ().upload_file (
129- Filename = os .path .join (self .cache_directory , path ),
130- Bucket = self .bucket ,
131- Key = self .prefix + path ,
132- ExtraArgs = self .extra_args ,
133- )
134-
135- return make_deferred_yieldable (
136- threads .deferToThreadPool (reactor , self ._s3_pool , _store_file )
119+ self ._get_s3_client ().upload_file (
120+ Filename = os .path .join (self .cache_directory , path ),
121+ Bucket = self .bucket ,
122+ Key = self .prefix + path ,
123+ ExtraArgs = self .extra_args ,
124+ )
125+
126+ return run_in_background (
127+ threads .deferToThreadPool , reactor , self ._s3_pool , _store_file
137128 )
138129
139130 def fetch (self , path , file_info ):
140131 """See StorageProvider.fetch"""
141- logcontext = current_context ()
142-
143132 d = defer .Deferred ()
144133
145134 def _get_file ():
146135 s3_download_task (
147- self ._get_s3_client (), self .bucket , self .prefix + path , self .extra_args , d , logcontext
136+ self ._get_s3_client (), self .bucket , self .prefix + path , self .extra_args , d
148137 )
149138
150- self ._s3_pool .callInThread ( _get_file )
151- return make_deferred_yieldable ( d )
139+ run_in_background ( self ._s3_pool .callInThread , _get_file )
140+ return d
152141
153142 @staticmethod
154143 def parse_config (config ):
@@ -196,7 +185,7 @@ def parse_config(config):
196185 return result
197186
198187
199- def s3_download_task (s3_client , bucket , key , extra_args , deferred , parent_logcontext ):
188+ def s3_download_task (s3_client , bucket , key , extra_args , deferred ):
200189 """Attempts to download a file from S3.
201190
202191 Args:
@@ -206,35 +195,31 @@ def s3_download_task(s3_client, bucket, key, extra_args, deferred, parent_logcon
206195 deferred (Deferred[_S3Responder|None]): If file exists
207196 resolved with an _S3Responder instance, if it doesn't
208197 exist then resolves with None.
209- parent_logcontext (LoggingContext): the logcontext to report logs and metrics
210- against.
211198 """
212- with LoggingContext (parent_context = parent_logcontext ):
213- logger .info ("Fetching %s from S3" , key )
214-
215- try :
216- if "SSECustomerKey" in extra_args and "SSECustomerAlgorithm" in extra_args :
217- resp = s3_client .get_object (
218- Bucket = bucket ,
219- Key = key ,
220- SSECustomerKey = extra_args ["SSECustomerKey" ],
221- SSECustomerAlgorithm = extra_args ["SSECustomerAlgorithm" ],
222- )
223- else :
224- resp = s3_client .get_object (Bucket = bucket , Key = key )
225-
226- except botocore .exceptions .ClientError as e :
227- if e .response ["Error" ]["Code" ] in ("404" , "NoSuchKey" ,):
228- logger .info ("Media %s not found in S3" , key )
229- reactor .callFromThread (deferred .callback , None )
230- return
199+ logger .info ("Fetching %s from S3" , key )
231200
232- reactor .callFromThread (deferred .errback , Failure ())
201+ try :
202+ if "SSECustomerKey" in extra_args and "SSECustomerAlgorithm" in extra_args :
203+ resp = s3_client .get_object (
204+ Bucket = bucket ,
205+ Key = key ,
206+ SSECustomerKey = extra_args ["SSECustomerKey" ],
207+ SSECustomerAlgorithm = extra_args ["SSECustomerAlgorithm" ],
208+ )
209+ else :
210+ resp = s3_client .get_object (Bucket = bucket , Key = key )
211+
212+ except botocore .exceptions .ClientError as e :
213+ if e .response ["Error" ]["Code" ] in ("404" , "NoSuchKey" ,):
214+ logger .info ("Media %s not found in S3" , key )
233215 return
234216
235- producer = _S3Responder ()
236- reactor .callFromThread (deferred .callback , producer )
237- _stream_to_producer (reactor , producer , resp ["Body" ], timeout = 90.0 )
217+ reactor .callFromThread (deferred .errback , Failure ())
218+ return
219+
220+ producer = _S3Responder ()
221+ reactor .callFromThread (deferred .callback , producer )
222+ _stream_to_producer (reactor , producer , resp ["Body" ], timeout = 90.0 )
238223
239224
240225def _stream_to_producer (reactor , producer , body , status = None , timeout = None ):
0 commit comments