Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions python/distributed-ucxx/distributed_ucxx/tests/test_ucxx.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,9 +381,15 @@ async def test_ucxx_protocol(ucxx_loop, cleanup, port):


@gen_test()
@pytest.mark.ignore_alive_references(True)
async def test_ucxx_unreachable(
ucxx_loop,
):
# It is not entirely clear why, but when attempting to reconnect
# Distributed may fail to complete async tasks, leaving UCXX references
# still alive. For now we disable those errors that only occur during the
# teardown phase of this test.

with pytest.raises(OSError, match="Timed out trying to connect to"):
await Client("ucxx://255.255.255.255:12345", timeout=1, asynchronous=True)

Expand Down
5 changes: 5 additions & 0 deletions python/distributed-ucxx/distributed_ucxx/ucxx.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,9 @@ def __init__( # type: ignore[no-untyped-def]

logger.debug("UCX.__init__ %s", self)

def __del__(self) -> None:
self.abort()

@property
def local_address(self) -> str:
return self._local_addr
Expand Down Expand Up @@ -471,6 +474,7 @@ async def close(self):
ucxx.exceptions.UCXCloseError,
ucxx.exceptions.UCXCanceledError,
ucxx.exceptions.UCXConnectionResetError,
ucxx.exceptions.UCXUnreachableError,
):
# If the other end is in the process of closing,
# UCX will sometimes raise a `Input/output` error,
Expand Down Expand Up @@ -524,6 +528,7 @@ async def connect(
ucxx.exceptions.UCXCanceledError,
ucxx.exceptions.UCXConnectionResetError,
ucxx.exceptions.UCXNotConnectedError,
ucxx.exceptions.UCXUnreachableError,
):
raise CommClosedError("Connection closed before handshake completed")
return self.comm_class(
Expand Down
27 changes: 25 additions & 2 deletions python/distributed-ucxx/distributed_ucxx/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,19 @@ def ucxx_exception_handler(event_loop, context):
# Let's make sure that UCX gets time to cancel
# progress tasks before closing the event loop.
@pytest.fixture(scope="function")
def ucxx_loop():
def ucxx_loop(request):
"""Allows UCX to cancel progress tasks before closing event loop.

When UCX tasks are not completed in time (e.g., by unexpected Endpoint
closure), clean up tasks before closing the event loop to prevent unwanted
errors from being raised.

Additionally add an `ignore_alive_references` marker that will override
checks for alive references to `ApplicationContext`. Use sparingly!
"""
marker = request.node.get_closest_marker("ignore_alive_references")
ignore_alive_references = False if marker is None else marker.args[0]

event_loop = asyncio.new_event_loop()
event_loop.set_exception_handler(ucxx_exception_handler)

Expand All @@ -75,7 +81,24 @@ def ucxx_loop():

with check_thread_leak():
yield loop
ucxx.reset()
if ignore_alive_references:
try:
ucxx.reset()
except ucxx.exceptions.UCXError as e:
if (
len(e.args) > 0
and "The following objects are still referencing ApplicationContext"
in e.args[0]
):
print(
"ApplicationContext still has alive references but this test "
f"is ignoring them. Original error:\n{e}",
flush=True,
)
else:
raise e
else:
ucxx.reset()
event_loop.close()

# Reset also Distributed's UCX initialization, i.e., revert the effects of
Expand Down
1 change: 1 addition & 0 deletions python/distributed-ucxx/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -121,5 +121,6 @@ version = {file = "distributed_ucxx/VERSION"}

[tool.pytest.ini_options]
markers = [
"ignore_alive_references",
"slow",
]
16 changes: 16 additions & 0 deletions python/ucxx/_lib/libucxx.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1107,6 +1107,8 @@ cdef class UCXEndpoint():
raise TypeError("UCXListener cannot be instantiated directly.")

def __dealloc__(self) -> None:
self.remove_close_callback()

with nogil:
self._endpoint.reset()

Expand Down Expand Up @@ -1488,6 +1490,20 @@ cdef class UCXEndpoint():
)
del func_close_callback

def remove_close_callback(self) -> None:
cdef Endpoint* endpoint

with nogil:
# Unset close callback, in case the Endpoint error callback runs
# after the Python object has been destroyed.
# Cast explicitly to prevent Cython `Cannot assign type ...` errors.
endpoint = self._endpoint.get()
if endpoint != nullptr:
endpoint.setCloseCallback(
<function[void (void *) except *]>nullptr,
nullptr,
)


cdef void _listener_callback(ucp_conn_request_h conn_request, void *args) with gil:
"""Callback function used by UCXListener"""
Expand Down
1 change: 1 addition & 0 deletions python/ucxx/_lib_async/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ def abort(self, period=10**10, max_attempts=1):
logger.debug("Endpoint.abort(): 0x%x" % self.uid)
# Wait for a maximum of `period` ns
self._ep.close(period=period, max_attempts=max_attempts)
self._ep.remove_close_callback()
self._ep = None
self._ctx = None

Expand Down