Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
8c6dfa1
Combine commits for anyio upgrade
Graeme22 Aug 5, 2025
0c1d856
Merge branch 'master' into anyio
Graeme22 Sep 26, 2025
4d2d563
make tests necessary again
Graeme22 Sep 26, 2025
6bc2b9d
Merge branch 'master' into anyio
Graeme22 Sep 26, 2025
c618d0e
fix blocking command bug
Graeme22 Sep 27, 2025
daeb683
sync w/ master
Graeme22 Oct 11, 2025
b98bb2c
use anyio TLS wrapper
Graeme22 Oct 14, 2025
97657aa
improve blocking pool
Graeme22 Oct 22, 2025
bbf66e6
blocking logic tweak
Graeme22 Oct 27, 2025
d92800c
Add implicit pytest_mode marker for all tests
alisaifee Oct 27, 2025
01483d0
Sync all test fixtures to use async with before yielding client
alisaifee Oct 27, 2025
2fde017
Fix typing ambiguity
alisaifee Oct 28, 2025
5ccd9b3
Merge pull request #3 from alisaifee/anyio
Graeme22 Oct 29, 2025
d20e432
uncouple pool & cxn, restructure pool
Graeme22 Oct 30, 2025
dfc91f9
fix corrupted pubsub issue
Graeme22 Oct 30, 2025
9966498
pubsub reconnection logic
Graeme22 Oct 31, 2025
b7db708
Allow gather to return exceptions
alisaifee Oct 29, 2025
227e5d5
Add utility AsyncQueue
alisaifee Oct 29, 2025
7103ffe
Refactor cluster client/connection pool for anyio
alisaifee Oct 29, 2025
ce67ba9
Simplify implementation of gather
alisaifee Oct 31, 2025
1c2eb64
Ensure per request encoding is respected
alisaifee Nov 1, 2025
215cc57
Ensure noreply commands do not enqueue requests to be parsed
alisaifee Nov 1, 2025
885c999
uncouple pool & cxn, restructure pool
Graeme22 Oct 30, 2025
fd96d9a
fix pool logic
Graeme22 Nov 3, 2025
b0642a1
use deque for free connections, cleaner pubsub cleanup
Graeme22 Nov 4, 2025
754feac
Merge remote-tracking branch 'graeme/anyio' into anyio
alisaifee Nov 4, 2025
74a557b
Change cluster pool internals o LIFO and remove non blocking
alisaifee Nov 4, 2025
c63b462
Remove unused cluster pool disconnect method
alisaifee Nov 4, 2025
591563d
Restore cluster connection pool get_connection public scope
alisaifee Nov 4, 2025
ea0dfcc
Non functional cleanups in cluster pool
alisaifee Nov 4, 2025
23ff1f8
Remove references to blocking fixtures
alisaifee Nov 4, 2025
dbc2a43
Narrow definition of async queue for connections
alisaifee Nov 5, 2025
5dbea9f
Fix incorrectly wrapped pipeline response
alisaifee Nov 5, 2025
680c367
Fix incorrect construction of basic redis fixture
alisaifee Nov 5, 2025
0ad1b98
Fix cluster command tests to correctly use node clients
alisaifee Nov 5, 2025
91cdf6e
Fix server command tests to correctly user cloner
alisaifee Nov 5, 2025
2bdd022
Fix incorrect use of acquire method in generic command tests
alisaifee Nov 5, 2025
b35373a
Fix connection command tests
alisaifee Nov 5, 2025
2e83648
Simplify basic connection pool to use ConnectionQueue
alisaifee Nov 5, 2025
7c9c021
Improve connection error handling
alisaifee Nov 5, 2025
453d29f
Update stream tests to use anyio task group instead of threads
alisaifee Nov 6, 2025
3df2a05
Disable object idle time test for redict
alisaifee Nov 6, 2025
a4ca602
axe multiplexing, re-add transaction/watch helper
Graeme22 Nov 6, 2025
3c2aeb1
remove separate fn for blocking commands
Graeme22 Nov 6, 2025
f0e1dc3
Remove unused cluster in use connection tracking
alisaifee Nov 7, 2025
1b423ff
Reintroduce connection.disconnect method
alisaifee Nov 7, 2025
5ca950b
Add implementation for cluster pubsub
alisaifee Nov 7, 2025
b6d3d4b
Fix parser tests
alisaifee Nov 7, 2025
a0efa78
Fix cluster scan_iter method
alisaifee Nov 7, 2025
37a744d
Ensure cluster pipeline transaction response is awaitable
alisaifee Nov 7, 2025
06ef8dc
Update module tests to use correct pipeline syntax
alisaifee Nov 7, 2025
15f510b
Merge branch 'anyio' into anyio
Graeme22 Nov 7, 2025
156244d
remove disconnect fn and problematic raise
Graeme22 Nov 7, 2025
f5bed44
rename max_block_time
Graeme22 Nov 7, 2025
5caf99a
Merge pull request #6 from alisaifee/anyio
Graeme22 Nov 7, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions coredis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,8 @@
Connection,
UnixDomainSocketConnection,
)
from coredis.pool import (
BlockingClusterConnectionPool,
BlockingConnectionPool,
ClusterConnectionPool,
ConnectionPool,
)
from coredis.pool import ClusterConnectionPool, ConnectionPool
from coredis.sentinel import Sentinel
from coredis.tokens import PureToken

__all__ = [
Expand All @@ -33,10 +29,9 @@
"Connection",
"UnixDomainSocketConnection",
"ClusterConnection",
"BlockingConnectionPool",
"ConnectionPool",
"BlockingClusterConnectionPool",
"ClusterConnectionPool",
"PureToken",
"Sentinel",
"__version__",
]
5 changes: 2 additions & 3 deletions coredis/_protocols.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from __future__ import annotations

import asyncio

from anyio.streams.memory import MemoryObjectSendStream
from typing_extensions import runtime_checkable

from coredis.response._callbacks import NoopCallback
Expand Down Expand Up @@ -47,4 +46,4 @@ def create_request(
class ConnectionP(Protocol):
decode_responses: bool
encoding: str
push_messages: asyncio.Queue[ResponseType]
push_messages: MemoryObjectSendStream[ResponseType]
114 changes: 0 additions & 114 deletions coredis/_sidecar.py

This file was deleted.

114 changes: 112 additions & 2 deletions coredis/_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from __future__ import annotations

import logging
from collections import UserDict
from typing import Any
from typing import Any, Awaitable, overload

from anyio import create_task_group

from coredis.typing import (
Hashable,
Expand All @@ -13,6 +16,9 @@
TypeVar,
)

logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

T = TypeVar("T")
U = TypeVar("U")

Expand Down Expand Up @@ -138,7 +144,7 @@ def make_hashable(*args: Any) -> tuple[Hashable, ...]:
)


def query_param_to_bool(value: Any | None) -> bool | None:
def query_param_to_bool(value: Any) -> bool | None:
if value is None or value in ("", b""):
return None
if isinstance(value, (int, float, bool, str, bytes)):
Expand Down Expand Up @@ -434,6 +440,110 @@ def hash_slot(key: bytes) -> int:
return crc16(key) % 16384


T1 = TypeVar("T1")
T2 = TypeVar("T2")
T3 = TypeVar("T3")
T4 = TypeVar("T4")
T5 = TypeVar("T5")
T6 = TypeVar("T6")


@overload
async def gather(
awaitable1: Awaitable[T1],
awaitable2: Awaitable[T2],
/,
*,
return_exceptions: bool = False,
) -> tuple[T1, T2]: ...


@overload
async def gather(
awaitable1: Awaitable[T1],
awaitable2: Awaitable[T2],
awaitable3: Awaitable[T3],
/,
*,
return_exceptions: bool = False,
) -> tuple[T1, T2, T3]: ...


@overload
async def gather(
awaitable1: Awaitable[T1],
awaitable2: Awaitable[T2],
awaitable3: Awaitable[T3],
awaitable4: Awaitable[T4],
/,
*,
return_exceptions: bool = False,
) -> tuple[T1, T2, T3, T4]: ...


@overload
async def gather(
awaitable1: Awaitable[T1],
awaitable2: Awaitable[T2],
awaitable3: Awaitable[T3],
awaitable4: Awaitable[T4],
awaitable5: Awaitable[T5],
/,
*,
return_exceptions: bool = False,
) -> tuple[T1, T2, T3, T4, T5]: ...


@overload
async def gather(
awaitable1: Awaitable[T1],
awaitable2: Awaitable[T2],
awaitable3: Awaitable[T3],
awaitable4: Awaitable[T4],
awaitable5: Awaitable[T5],
awaitable6: Awaitable[T6],
/,
*,
return_exceptions: bool = False,
) -> tuple[T1, T2, T3, T4, T5, T6]: ...


@overload
async def gather(
*awaitables: Awaitable[T1],
return_exceptions: bool = False,
) -> tuple[T1, ...]: ...


async def gather(*awaitables: Awaitable[Any], return_exceptions: bool = False) -> tuple[Any, ...]:
if not awaitables:
return ()
if len(awaitables) == 1:
try:
return (await awaitables[0],)
except Exception as exc:
if return_exceptions:
return (exc,)
else:
raise

results: list[Any] = [None] * len(awaitables)

async def runner(awaitable: Awaitable[Any], i: int) -> None:
try:
results[i] = await awaitable
except Exception as exc:
if not return_exceptions:
raise
results[i] = exc

async with create_task_group() as tg:
for i, awaitable in enumerate(awaitables):
tg.start_soon(runner, awaitable, i)

return tuple(results)


__all__ = [
"hash_slot",
"EncodingInsensitiveDict",
Expand Down
Loading
Loading