@@ -332,12 +332,12 @@ def get_redis_connection(self, key: str, transaction: bool = True) -> Pipeline:
332332 pipe = conn .pipeline (transaction = transaction )
333333 return pipe
334334
335- def _execute_redis_operation (
335+ def _execute_redis_operation_no_txn (
336336 self , key : str , operation : RedisOperation , * args : Any , ** kwargs : Any
337337 ) -> Any :
338338 metrics_str = f"redis_buffer.{ operation .value } "
339339 metrics .incr (metrics_str )
340- pipe = self .get_redis_connection (self .pending_key )
340+ pipe = self .get_redis_connection (self .pending_key , transaction = False )
341341 getattr (pipe , operation .value )(key , * args , ** kwargs )
342342 if args :
343343 pipe .expire (key , self .key_expire )
@@ -356,7 +356,7 @@ def _execute_sharded_redis_operation(
356356
357357 metrics_str = f"redis_buffer.{ operation .value } "
358358 metrics .incr (metrics_str , amount = len (keys ))
359- pipe = self .get_redis_connection (self .pending_key )
359+ pipe = self .get_redis_connection (self .pending_key , transaction = False )
360360 for key in keys :
361361 getattr (pipe , operation .value )(key , * args , ** kwargs )
362362 if args :
@@ -369,10 +369,10 @@ def push_to_sorted_set(self, key: str, value: list[int] | int) -> None:
369369 value_dict = {v : now for v in value }
370370 else :
371371 value_dict = {value : now }
372- self ._execute_redis_operation (key , RedisOperation .SORTED_SET_ADD , value_dict )
372+ self ._execute_redis_operation_no_txn (key , RedisOperation .SORTED_SET_ADD , value_dict )
373373
374374 def get_sorted_set (self , key : str , min : float , max : float ) -> list [tuple [int , float ]]:
375- redis_set = self ._execute_redis_operation (
375+ redis_set = self ._execute_redis_operation_no_txn (
376376 key ,
377377 RedisOperation .SORTED_SET_GET_RANGE ,
378378 min = min ,
@@ -410,7 +410,9 @@ def bulk_get_sorted_set(
410410 return data_to_timestamps
411411
412412 def delete_key (self , key : str , min : float , max : float ) -> None :
413- self ._execute_redis_operation (key , RedisOperation .SORTED_SET_DELETE_RANGE , min = min , max = max )
413+ self ._execute_redis_operation_no_txn (
414+ key , RedisOperation .SORTED_SET_DELETE_RANGE , min = min , max = max
415+ )
414416
415417 def delete_keys (self , keys : list [str ], min : float , max : float ) -> None :
416418 self ._execute_sharded_redis_operation (
@@ -427,7 +429,7 @@ def delete_hash(
427429 fields : list [str ],
428430 ) -> None :
429431 key = self ._make_key (model , filters )
430- pipe = self .get_redis_connection (self .pending_key )
432+ pipe = self .get_redis_connection (self .pending_key , transaction = False )
431433 for field in fields :
432434 getattr (pipe , RedisOperation .HASH_DELETE .value )(key , field )
433435 pipe .expire (key , self .key_expire )
@@ -441,7 +443,7 @@ def push_to_hash(
441443 value : str ,
442444 ) -> None :
443445 key = self ._make_key (model , filters )
444- self ._execute_redis_operation (key , RedisOperation .HASH_ADD , field , value )
446+ self ._execute_redis_operation_no_txn (key , RedisOperation .HASH_ADD , field , value )
445447
446448 def push_to_hash_bulk (
447449 self ,
@@ -450,11 +452,11 @@ def push_to_hash_bulk(
450452 data : dict [str , str ],
451453 ) -> None :
452454 key = self ._make_key (model , filters )
453- self ._execute_redis_operation (key , RedisOperation .HASH_ADD_BULK , data )
455+ self ._execute_redis_operation_no_txn (key , RedisOperation .HASH_ADD_BULK , data )
454456
455457 def get_hash (self , model : type [models .Model ], field : dict [str , BufferField ]) -> dict [str , str ]:
456458 key = self ._make_key (model , field )
457- redis_hash = self ._execute_redis_operation (key , RedisOperation .HASH_GET_ALL )
459+ redis_hash = self ._execute_redis_operation_no_txn (key , RedisOperation .HASH_GET_ALL )
458460 decoded_hash = {}
459461 for k , v in redis_hash .items ():
460462 if isinstance (k , bytes ):
@@ -467,7 +469,7 @@ def get_hash(self, model: type[models.Model], field: dict[str, BufferField]) ->
467469
468470 def get_hash_length (self , model : type [models .Model ], field : dict [str , BufferField ]) -> int :
469471 key = self ._make_key (model , field )
470- return self ._execute_redis_operation (key , RedisOperation .HASH_LENGTH )
472+ return self ._execute_redis_operation_no_txn (key , RedisOperation .HASH_LENGTH )
471473
472474 def incr (
473475 self ,
0 commit comments