Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions stubs/pika/@tests/requirements-stubtest.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
twisted
tornado
5 changes: 2 additions & 3 deletions stubs/pika/@tests/stubtest_allowlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ pika.compat.StringIO.seek
pika.compat.StringIO.truncate

# Requires external libraries to be installed.
pika\.adapters\.gevent_connection.*
pika\.adapters\.tornado_connection.*
pika\.adapters\.twisted_connection.*
pika.adapters.gevent_connection
pika.adapters._twisted_shims

# Stubtest doesn't understand that a property alias is also read-only.
pika.BlockingConnection.basic_nack
Expand Down
205 changes: 205 additions & 0 deletions stubs/pika/pika/adapters/_twisted_shims.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
from _typeshed import Incomplete, Self
from asyncio import AbstractEventLoop, Future
from collections.abc import Awaitable, Callable, Coroutine, Generator, Iterable, Mapping, Sequence
from enum import Enum
from typing import Any, Generic, NoReturn, Protocol as _TypingProtocol, TypeVar
from typing_extensions import Literal, TypeAlias

_T = TypeVar("_T")

# region twisted.python.failure
class Failure(BaseException):
pickled: int
stack: Incomplete
count: Incomplete
type: Incomplete
captureVars: Incomplete
value: Incomplete
tb: Incomplete
parents: Incomplete
def __init__(
self,
exc_value: Incomplete | None = ...,
exc_type: Incomplete | None = ...,
exc_tb: Incomplete | None = ...,
captureVars: bool = ...,
) -> None: ...
def trap(self, *errorTypes): ...
def check(self, *errorTypes): ...
def raiseException(self) -> NoReturn: ...
def throwExceptionIntoGenerator(self, g): ...
__dict__: Incomplete
def cleanFailure(self) -> None: ...
def getTracebackObject(self): ...
def getErrorMessage(self) -> str: ...
def getBriefTraceback(self) -> str: ...
def getTraceback(self, elideFrameworkCode: int = ..., detail: str = ...) -> str: ...
def printTraceback(self, file: Incomplete | None = ..., elideFrameworkCode: bool = ..., detail: str = ...) -> None: ...
def printBriefTraceback(self, file: Incomplete | None = ..., elideFrameworkCode: int = ...) -> None: ...
def printDetailedTraceback(self, file: Incomplete | None = ..., elideFrameworkCode: int = ...) -> None: ...

# endregion

# region twisted.internet.interfaces

class Interface: # incomplete
def __init__(self, obj, alternate=...) -> None: ...

class _InterfaceProtocol(_TypingProtocol): # incomplete
def __init__(self, obj, alternate=...) -> None: ...

class IAddress(Interface): ...

class IDelayedCall(Interface):
def getTime(self) -> float: ...
def cancel(self) -> None: ...
def delay(self, secondsLater: float) -> None: ...
def reset(sself, econdsFromNow: float) -> None: ...
def active(self) -> bool: ...

class IReactorTime(_InterfaceProtocol):
def seconds(self) -> float: ...
def callLater(self, delay: float, callable: Callable[..., Any], *args: object, **kwargs: object) -> IDelayedCall: ...
def getDelayedCalls(self) -> Sequence[IDelayedCall]: ...

class ITransport(_InterfaceProtocol):
def write(self, data: bytes) -> None: ...
def writeSequence(self, data: Iterable[bytes]) -> None: ...
def loseConnection(self) -> None: ...
def getPeer(self) -> IAddress: ...
def getHost(self) -> IAddress: ...

# endregion

# region twisted.internet.base
class _Sentinel(Enum):
_NO_RESULT = object()
_CONTINUE = object()

DeferredCallback: TypeAlias = Callable[..., object]
DeferredErrback: TypeAlias = Callable[..., object]
_CallbackOrderedArguments: TypeAlias = tuple[object, ...]
_CallbackKeywordArguments: TypeAlias = Mapping[str, object]
_CallbackChain: TypeAlias = tuple[
tuple[DeferredCallback | Literal[_Sentinel._CONTINUE], _CallbackOrderedArguments, _CallbackKeywordArguments],
tuple[
DeferredErrback | DeferredCallback, Literal[_Sentinel._CONTINUE], _CallbackOrderedArguments, _CallbackKeywordArguments,
],
]
_DeferredResultT = TypeVar("_DeferredResultT", contravariant=True)
_NextDeferredResultT = TypeVar("_NextDeferredResultT", covariant=True)

class DelayedCall(_TypingProtocol):
debug: bool
creator: Sequence[str] | None
resetter: Incomplete
canceller: Incomplete
seconds: Incomplete
cancelled: int
delayed_time: float
def __init__(
self,
time: float,
func: Callable[..., Any],
args: Sequence[object],
kw: dict[str, object],
cancel: Callable[[DelayedCall], None],
reset: Callable[[DelayedCall], None],
seconds: Callable[[], float] = ...,
) -> None: ...
def getTime(self) -> float: ...
def cancel(self) -> None: ...
time: Incomplete
def reset(self, secondsFromNow: float) -> None: ...
def delay(self, secondsLater: float) -> None: ...
def activate_delay(self) -> None: ...
def active(self) -> bool: ...
def __le__(self, other: object) -> bool: ...
def __lt__(self, other: object) -> bool: ...

# Variance: This is how it's typed in source
class Deferred(Awaitable[_DeferredResultT]): # type: ignore[type-var] # pyright: ignore[reportGeneralTypeIssues]
called: bool
paused: int
debug: bool
callbacks: list[_CallbackChain]
def __init__(self, canceller: Callable[[Deferred[Any]], None] | None = ...) -> None: ...
def addCallbacks(
self,
callback: Callable[..., _NextDeferredResultT | Deferred[_NextDeferredResultT]],
errback: Callable[..., Failure | _NextDeferredResultT | Deferred[_NextDeferredResultT]] = ...,
callbackArgs: _CallbackOrderedArguments = ...,
callbackKeywords: _CallbackKeywordArguments = ...,
errbackArgs: _CallbackOrderedArguments = ...,
errbackKeywords: _CallbackKeywordArguments = ...,
) -> Deferred[_NextDeferredResultT]: ...
def addCallback(
self, callback: Callable[..., _NextDeferredResultT | Deferred[_NextDeferredResultT]], *args: object, **kwargs: object
) -> Deferred[_NextDeferredResultT]: ...
def addErrback(
self,
errback: Callable[..., Failure | _NextDeferredResultT | Deferred[_NextDeferredResultT]],
*args: object,
**kwargs: object,
) -> Deferred[_DeferredResultT | _NextDeferredResultT]: ...
def addBoth(
self, callback: Callable[..., _NextDeferredResultT | Deferred[_NextDeferredResultT]], *args: object, **kwargs: object
) -> Deferred[_NextDeferredResultT]: ...
def addTimeout(
self, timeout: float, clock: IReactorTime, onTimeoutCancel: Callable[[object, float], object] | None = ...
) -> Deferred[_DeferredResultT]: ...
def chainDeferred(self, d: Deferred[_DeferredResultT]) -> Deferred[None]: ...
def callback(self, result: _DeferredResultT | Failure) -> None: ...
def errback(self, fail: Failure | BaseException | None = ...) -> None: ...
def pause(self) -> None: ...
def unpause(self) -> None: ...
def cancel(self) -> None: ...
def __iter__(self) -> Deferred[_DeferredResultT]: ...
def send(self, value: object = ...) -> Deferred[_DeferredResultT]: ...
def __await__(self) -> Generator[Any, None, _DeferredResultT]: ...
__next__: Incomplete
def asFuture(self, loop: AbstractEventLoop) -> Future[_DeferredResultT]: ...
@classmethod
def fromFuture(cls, future: Future[Incomplete]) -> Deferred[Any]: ...
@classmethod
def fromCoroutine(cls, coro: Coroutine[Deferred[_T], Any, _T] | Generator[Deferred[_T], Any, _T]) -> Deferred[_T]: ...

class DeferredQueue(Generic[_T]):
waiting: list[Deferred[_T]]
pending: list[_T]
size: int | None
backlog: int | None
def __init__(self, size: int | None = ..., backlog: int | None = ...) -> None: ...
def put(self, obj: _T) -> None: ...
def get(self) -> Deferred[_T]: ...

# endregion

# region twisted.internet.protocol

class Factory:
protocol: Callable[[], Protocol] | None
numPorts: int
noisy: bool
@classmethod
def forProtocol(cls: type[Self], protocol: Protocol, *args, **kwargs) -> Self: ...
def logPrefix(self): ...
def doStart(self) -> None: ...
def doStop(self) -> None: ...
def startFactory(self) -> None: ...
def stopFactory(self) -> None: ...
def buildProtocol(self, addr: IAddress) -> Protocol | None: ...

class BaseProtocol:
connected: int
transport: ITransport | None
def makeConnection(self, transport) -> None: ...
def connectionMade(self) -> None: ...

class Protocol(BaseProtocol):
factory: Factory | None
def logPrefix(self): ...
def dataReceived(self, data: bytes): ...
def connectionLost(self, reason: BaseException = ...): ...

# endregion
89 changes: 51 additions & 38 deletions stubs/pika/pika/adapters/twisted_connection.pyi
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
from _typeshed import Incomplete
from typing import Any, NamedTuple
from typing_extensions import TypeAlias
from typing import NamedTuple, TypeVar

import pika.connection
from pika.adapters.utils import nbio_interface

DeferredQueue: TypeAlias = Any # TODO: twisted.internet.defer.DeferredQueue
Protocol: TypeAlias = Any # TODO: twisted.internet.protocol.Protocol
from ._twisted_shims import Deferred, DeferredQueue, DelayedCall, Failure, ITransport, Protocol

_T = TypeVar("_T")

LOGGER: Incomplete

class ClosableDeferredQueue(DeferredQueue):
closed: Incomplete
class ClosableDeferredQueue(DeferredQueue[_T]):
closed: Failure | BaseException | None
def __init__(self, size: Incomplete | None = ..., backlog: Incomplete | None = ...) -> None: ...
def put(self, obj): ...
def get(self): ...
# Returns a Deferred with an error if fails. None if success
def put(self, obj: _T) -> Deferred[Failure | BaseException] | None: ... # type: ignore[override]
def get(self) -> Deferred[Failure | BaseException | _T]: ...
pending: Incomplete
def close(self, reason) -> None: ...
def close(self, reason: BaseException | None) -> None: ...

class ReceivedMessage(NamedTuple):
channel: Incomplete
Expand All @@ -25,7 +26,7 @@ class ReceivedMessage(NamedTuple):
body: Incomplete

class TwistedChannel:
on_closed: Incomplete
on_closed: Deferred[Incomplete | Failure | BaseException | None]
def __init__(self, channel) -> None: ...
@property
def channel_number(self): ...
Expand All @@ -44,24 +45,30 @@ class TwistedChannel:
def callback_deferred(self, deferred, replies) -> None: ...
def add_on_return_callback(self, callback): ...
def basic_ack(self, delivery_tag: int = ..., multiple: bool = ...): ...
def basic_cancel(self, consumer_tag: str = ...): ...
def basic_cancel(self, consumer_tag: str = ...) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def basic_consume(
self,
queue,
auto_ack: bool = ...,
exclusive: bool = ...,
consumer_tag: Incomplete | None = ...,
arguments: Incomplete | None = ...,
): ...
def basic_get(self, queue, auto_ack: bool = ...): ...
) -> Deferred[Incomplete | Failure | BaseException]: ...
def basic_get(self, queue, auto_ack: bool = ...) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def basic_nack(self, delivery_tag: Incomplete | None = ..., multiple: bool = ..., requeue: bool = ...): ...
def basic_publish(self, exchange, routing_key, body, properties: Incomplete | None = ..., mandatory: bool = ...): ...
def basic_qos(self, prefetch_size: int = ..., prefetch_count: int = ..., global_qos: bool = ...): ...
def basic_publish(
self, exchange, routing_key, body, properties: Incomplete | None = ..., mandatory: bool = ...
) -> Deferred[Incomplete | Failure | BaseException]: ...
def basic_qos(
self, prefetch_size: int = ..., prefetch_count: int = ..., global_qos: bool = ...
) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def basic_reject(self, delivery_tag, requeue: bool = ...): ...
def basic_recover(self, requeue: bool = ...): ...
def basic_recover(self, requeue: bool = ...) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def close(self, reply_code: int = ..., reply_text: str = ...): ...
def confirm_delivery(self): ...
def exchange_bind(self, destination, source, routing_key: str = ..., arguments: Incomplete | None = ...): ...
def confirm_delivery(self) -> Deferred[Incomplete | None]: ...
def exchange_bind(
self, destination, source, routing_key: str = ..., arguments: Incomplete | None = ...
) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def exchange_declare(
self,
exchange,
Expand All @@ -71,18 +78,22 @@ class TwistedChannel:
auto_delete: bool = ...,
internal: bool = ...,
arguments: Incomplete | None = ...,
): ...
def exchange_delete(self, exchange: Incomplete | None = ..., if_unused: bool = ...): ...
) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def exchange_delete(
self, exchange: Incomplete | None = ..., if_unused: bool = ...
) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def exchange_unbind(
self,
destination: Incomplete | None = ...,
source: Incomplete | None = ...,
routing_key: str = ...,
arguments: Incomplete | None = ...,
): ...
def flow(self, active): ...
) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def flow(self, active) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def open(self): ...
def queue_bind(self, queue, exchange, routing_key: Incomplete | None = ..., arguments: Incomplete | None = ...): ...
def queue_bind(
self, queue, exchange, routing_key: Incomplete | None = ..., arguments: Incomplete | None = ...
) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def queue_declare(
self,
queue,
Expand All @@ -91,37 +102,39 @@ class TwistedChannel:
exclusive: bool = ...,
auto_delete: bool = ...,
arguments: Incomplete | None = ...,
): ...
def queue_delete(self, queue, if_unused: bool = ..., if_empty: bool = ...): ...
def queue_purge(self, queue): ...
) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def queue_delete(
self, queue, if_unused: bool = ..., if_empty: bool = ...
) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def queue_purge(self, queue) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def queue_unbind(
self, queue, exchange: Incomplete | None = ..., routing_key: Incomplete | None = ..., arguments: Incomplete | None = ...
): ...
def tx_commit(self): ...
def tx_rollback(self): ...
def tx_select(self): ...
) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def tx_commit(self) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def tx_rollback(self) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def tx_select(self) -> Deferred[Incomplete | Failure | BaseException | None]: ...

class _TwistedConnectionAdapter(pika.connection.Connection):
def __init__(self, parameters, on_open_callback, on_open_error_callback, on_close_callback, custom_reactor) -> None: ...
def connection_made(self, transport) -> None: ...
def connection_lost(self, error) -> None: ...
def connection_made(self, transport: ITransport) -> None: ...
def connection_lost(self, error: Exception) -> None: ...
def data_received(self, data) -> None: ...

class TwistedProtocolConnection(Protocol):
ready: Incomplete
closed: Incomplete
ready: Deferred[None] | None
closed: Deferred[None] | Failure | BaseException | None
def __init__(self, parameters: Incomplete | None = ..., custom_reactor: Incomplete | None = ...) -> None: ...
def channel(self, channel_number: Incomplete | None = ...): ...
@property
def is_open(self): ...
@property
def is_closed(self): ...
def close(self, reply_code: int = ..., reply_text: str = ...): ...
def close(self, reply_code: int = ..., reply_text: str = ...) -> Deferred[None] | Failure | BaseException | None: ...
def dataReceived(self, data) -> None: ...
def connectionLost(self, reason=...) -> None: ...
def makeConnection(self, transport) -> None: ...
def connectionLost(self, reason: Failure | BaseException = ...) -> None: ...
def makeConnection(self, transport: ITransport) -> None: ...
def connectionReady(self): ...

class _TimerHandle(nbio_interface.AbstractTimerReference):
def __init__(self, handle) -> None: ...
def __init__(self, handle: DelayedCall) -> None: ...
def cancel(self) -> None: ...