diff --git a/nutkit/backend/backend.py b/nutkit/backend/backend.py index 096bc0895..dc0cf9174 100644 --- a/nutkit/backend/backend.py +++ b/nutkit/backend/backend.py @@ -1,3 +1,4 @@ +from contextlib import contextmanager import inspect import json import os @@ -59,6 +60,7 @@ def __init__(self, address, port): self._encoder = Encoder() self._reader = self._socket.makefile(mode="r", encoding="utf-8") self._writer = self._socket.makefile(mode="w", encoding="utf-8") + self.default_timeout = DEFAULT_TIMEOUT def close(self): self._reader.close() @@ -79,7 +81,9 @@ def send(self, req, hooks=None): self._writer.write("#request end\n") self._writer.flush() - def receive(self, timeout=DEFAULT_TIMEOUT, hooks=None): + def receive(self, timeout=None, hooks=None): + if timeout is None: + timeout = self.default_timeout self._socket.settimeout(timeout) response = "" in_response = False @@ -128,6 +132,14 @@ def receive(self, timeout=DEFAULT_TIMEOUT, hooks=None): elif DEBUG_MESSAGES: print("[BACKEND]: %s" % line) - def send_and_receive(self, req, timeout=DEFAULT_TIMEOUT, hooks=None): + def send_and_receive(self, req, timeout=None, hooks=None): self.send(req, hooks=hooks) return self.receive(timeout, hooks=hooks) + + +@contextmanager +def backend_timeout_adjustment(backend, timeout): + old_timeout = backend.default_timeout + backend.default_timeout = timeout + yield + backend.default_timeout = old_timeout diff --git a/nutkit/frontend/driver.py b/nutkit/frontend/driver.py index 680c2596a..766e285e6 100644 --- a/nutkit/frontend/driver.py +++ b/nutkit/frontend/driver.py @@ -6,7 +6,8 @@ class Driver: def __init__(self, backend, uri, auth_token, user_agent=None, resolver_fn=None, domain_name_resolver_fn=None, connection_timeout_ms=None, fetch_size=None, - max_tx_retry_time_ms=None, encrypted=None, + max_tx_retry_time_ms=None, session_connection_timeout_ms=None, + update_routing_table_timeout_ms=None, encrypted=None, trusted_certificates=None, liveness_check_timeout_ms=None, max_connection_pool_size=None, connection_acquisition_timeout_ms=None): @@ -18,6 +19,8 @@ def __init__(self, backend, uri, auth_token, user_agent=None, resolverRegistered=resolver_fn is not None, domainNameResolverRegistered=domain_name_resolver_fn is not None, connectionTimeoutMs=connection_timeout_ms, + sessionConnectionTimeoutMs=session_connection_timeout_ms, + updateRoutingTableTimeoutMs=update_routing_table_timeout_ms, fetchSize=fetch_size, maxTxRetryTimeMs=max_tx_retry_time_ms, encrypted=encrypted, trustedCertificates=trusted_certificates, liveness_check_timeout_ms=liveness_check_timeout_ms, diff --git a/nutkit/protocol/feature.py b/nutkit/protocol/feature.py index 8fc7eb231..6c58544e0 100644 --- a/nutkit/protocol/feature.py +++ b/nutkit/protocol/feature.py @@ -4,9 +4,18 @@ class Feature(Enum): # === FUNCTIONAL FEATURES === + # The driver offers a configuration option to limit time it spends at most, + # trying to acquire a connection from the pool. + # The connection acquisition timeout must account for the whole acquisition + # execution time, whether a new connection is created, an idle connection + # is picked up instead or we need to wait until the full pool depletes. + API_CONNECTION_ACQUISITION_TIMEOUT = \ + "Feature:API:ConnectionAcquisitionTimeout" # The driver offers a method for driver objects to report if they were # configured with a or without encryption. API_DRIVER_IS_ENCRYPTED = "Feature:API:Driver.IsEncrypted" + # The driver supports connection liveness check. + API_LIVENESS_CHECK = "Feature:API:Liveness.Check" # The driver offers a method for the result to return all records as a list # or array. This method should exhaust the result. API_RESULT_LIST = "Feature:API:Result.List" @@ -18,8 +27,13 @@ class Feature(Enum): # This methods asserts that exactly one record in left in the result # stream, else it will raise an exception. API_RESULT_SINGLE = "Feature:API:Result.Single" - # The driver supports connection liveness check. - API_LIVENESS_CHECK = "Feature:API:Liveness.Check" + # The driver offers a configuration option to limit time it spends at most, + # trying to acquire a usable read/write connection for any session. + # The connection acquisition timeout must account for the whole acquisition + # execution time, whether a new connection is created, an idle connection + # is picked up instead, we need to wait until the full pool depletes, or + # a routing table must be fetched. + API_SESSION_CONNECTION_TIMEOUT = "Feature:API:SessionConnectionTimeout" # The driver implements explicit configuration options for SSL. # - enable / disable SSL # - verify signature against system store / custom cert / not at all @@ -31,6 +45,9 @@ class Feature(Enum): API_SSL_SCHEMES = "Feature:API:SSLSchemes" # The driver supports sending and receiving temporal data types. API_TYPE_TEMPORAL = "Feature:API:Type.Temporal" + # The driver offers a configuration option to limit time it spends at most, + # trying to update the routing table whenever needed. + API_UPDATE_ROUTING_TABLE_TIMEOUT = "Feature:API:UpdateRoutingTableTimeout" # The driver supports single-sign-on (SSO) by providing a bearer auth token # API. AUTH_BEARER = "Feature:Auth:Bearer" diff --git a/nutkit/protocol/requests.py b/nutkit/protocol/requests.py index 283069d97..d541d8f7b 100644 --- a/nutkit/protocol/requests.py +++ b/nutkit/protocol/requests.py @@ -48,6 +48,7 @@ class NewDriver: def __init__( self, uri, authToken, userAgent=None, resolverRegistered=False, domainNameResolverRegistered=False, connectionTimeoutMs=None, + sessionConnectionTimeoutMs=None, updateRoutingTableTimeoutMs=None, fetchSize=None, maxTxRetryTimeMs=None, encrypted=None, trustedCertificates=None, liveness_check_timeout_ms=None, max_connection_pool_size=None, @@ -62,6 +63,8 @@ def __init__( self.resolverRegistered = resolverRegistered self.domainNameResolverRegistered = domainNameResolverRegistered self.connectionTimeoutMs = connectionTimeoutMs + self.sessionConnectionTimeoutMs = sessionConnectionTimeoutMs + self.updateRoutingTableTimeoutMs = updateRoutingTableTimeoutMs self.fetchSize = fetchSize self.maxTxRetryTimeMs = maxTxRetryTimeMs self.livenessCheckTimeoutMs = liveness_check_timeout_ms diff --git a/tests/stub/driver_parameters/__init__.py b/tests/stub/driver_parameters/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/stub/driver_parameters/scripts/router.script b/tests/stub/driver_parameters/scripts/router.script new file mode 100644 index 000000000..fc8836f36 --- /dev/null +++ b/tests/stub/driver_parameters/scripts/router.script @@ -0,0 +1,12 @@ +!: BOLT 4.4 +!: AUTO RESET +!: ALLOW RESTART + +A: HELLO {"{}": "*"} +*: RESET +{+ + C: ROUTE "*" "*" "*" + S: SUCCESS { "rt": { "ttl": 1000, "servers": [{"addresses": ["#HOST#:9000"], "role":"ROUTE"}, {"addresses": ["#HOST#:9010"], "role":"READ"}, {"addresses": ["#HOST#:9010"], "role":"WRITE"}]}} + *: RESET ++} +?: GOODBYE diff --git a/tests/stub/driver_parameters/scripts/router_hello_delay.script b/tests/stub/driver_parameters/scripts/router_hello_delay.script new file mode 100644 index 000000000..b146cb073 --- /dev/null +++ b/tests/stub/driver_parameters/scripts/router_hello_delay.script @@ -0,0 +1,16 @@ +!: BOLT 4.4 +!: AUTO RESET +!: ALLOW RESTART + +C: HELLO {"{}": "*"} +S: 2 + + 2 + SUCCESS {"server": "Neo4j/5.0.0", "connection_id": "bolt-123456789"} +*: RESET +{+ + C: ROUTE "*" "*" "*" + S: SUCCESS { "rt": { "ttl": 1000, "servers": [{"addresses": ["#HOST#:9000"], "role":"ROUTE"}, {"addresses": ["#HOST#:9010"], "role":"READ"}, {"addresses": ["#HOST#:9010"], "role":"WRITE"}]}} + *: RESET ++} +?: GOODBYE diff --git a/tests/stub/driver_parameters/scripts/router_route_delay.script b/tests/stub/driver_parameters/scripts/router_route_delay.script new file mode 100644 index 000000000..60358a03e --- /dev/null +++ b/tests/stub/driver_parameters/scripts/router_route_delay.script @@ -0,0 +1,15 @@ +!: BOLT 4.4 +!: AUTO RESET +!: ALLOW RESTART + +A: HELLO {"{}": "*"} +*: RESET +{+ + C: ROUTE "*" "*" "*" + S: 2 + + 2 + SUCCESS { "rt": { "ttl": 1000, "servers": [{"addresses": ["#HOST#:9000"], "role":"ROUTE"}, {"addresses": ["#HOST#:9010"], "role":"READ"}, {"addresses": ["#HOST#:9010"], "role":"WRITE"}]}} + *: RESET ++} +?: GOODBYE diff --git a/tests/stub/driver_parameters/scripts/session_run.script b/tests/stub/driver_parameters/scripts/session_run.script new file mode 100644 index 000000000..890c92993 --- /dev/null +++ b/tests/stub/driver_parameters/scripts/session_run.script @@ -0,0 +1,12 @@ +!: BOLT 4.4 +!: ALLOW CONCURRENT + +A: HELLO {"{}": "*"} +*: RESET +C: RUN "*" "*" "*" +S: SUCCESS {"fields": ["n"]} +C: PULL {"n": "*"} +S: RECORD [1] + SUCCESS {"type": "r"} +*: RESET +?: GOODBYE diff --git a/tests/stub/driver_parameters/scripts/session_run_auth_delay.script b/tests/stub/driver_parameters/scripts/session_run_auth_delay.script new file mode 100644 index 000000000..c5fa948ff --- /dev/null +++ b/tests/stub/driver_parameters/scripts/session_run_auth_delay.script @@ -0,0 +1,16 @@ +!: BOLT 4.4 +!: ALLOW CONCURRENT + +C: HELLO {"{}": "*"} +S: 2 + + 2 + SUCCESS {"server": "Neo4j/5.0.0", "connection_id": "bolt-123456789"} +*: RESET +C: RUN "*" "*" "*" +S: SUCCESS {"fields": ["n"]} +C: PULL {"n": "*"} +S: RECORD [1] + SUCCESS {"type": "r"} +*: RESET +?: GOODBYE diff --git a/tests/stub/driver_parameters/scripts/tx_without_commit_or_rollback.script b/tests/stub/driver_parameters/scripts/tx_without_commit_or_rollback.script new file mode 100644 index 000000000..21f83003a --- /dev/null +++ b/tests/stub/driver_parameters/scripts/tx_without_commit_or_rollback.script @@ -0,0 +1,14 @@ +!: BOLT 4.4 +!: ALLOW CONCURRENT + +A: HELLO {"{}": "*"} +*: RESET + +C: BEGIN {"[mode]": "*"} +S: SUCCESS {} +C: RUN "*" "*" "*" +S: SUCCESS {"fields": ["n"]} +C: PULL {"n": "*"} +S: RECORD [1] + SUCCESS {"type": "r"} +?: GOODBYE diff --git a/tests/stub/driver_parameters/test_connection_acquisition_timeout_ms.py b/tests/stub/driver_parameters/test_connection_acquisition_timeout_ms.py new file mode 100644 index 000000000..07c7f268f --- /dev/null +++ b/tests/stub/driver_parameters/test_connection_acquisition_timeout_ms.py @@ -0,0 +1,254 @@ +from nutkit.frontend import Driver +import nutkit.protocol as types +from tests.shared import ( + driver_feature, + TestkitTestCase, +) +from tests.stub.shared import StubServer + + +class TestConnectionAcquisitionTimeoutMs(TestkitTestCase): + """ + Connection Acquisition Timeout Tests. + + The connection acquisition timeout must account for the + whole acquisition execution time, whether a new connection is created, + an idle connection is picked up instead or we need to wait + until the full pool depletes. + + In particular, the connection acquisition timeout (CAT) has precedence + over the socket connection timeout (SCT). + + If the SCT is set to 2 hours and CAT to 50ms, + the connection acquisition should time out after 50ms, + even if the connection is successfully created within the SCT period. + + The CAT must NOT be replaced by the lowest of the two values (CAT and SCT). + Indeed, even if SCT is lower than CAT, there could be situations + where the pool takes longer to borrow an _idle_ connection than the SCT. + Such a scenario should work as long as the overall acquisition happens + within the CAT. + This is unfortunately hard to translate into a test. + """ + + required_features = ( + types.Feature.BOLT_4_4, + types.Feature.API_CONNECTION_ACQUISITION_TIMEOUT, + ) + + def setUp(self): + super().setUp() + self._server = StubServer(9010) + self._router = StubServer(9000) + self._driver = None + self._session = None + self._sessions = [] + self._txs = [] + + def tearDown(self): + self._server.reset() + self._router.reset() + for tx in self._txs: + with self.assertRaises(types.DriverError): + # The server does not accept ending the transaction. + # We still call it to potentially free resources. + tx.commit() + + for s in self._sessions: + s.close() + + if self._session: + self._session.close() + + if self._driver: + self._driver.close() + + return super().tearDown() + + def _start_server(self, server, script): + server.start(self.script_path(script), + vars_={"#HOST#": self._router.host}) + + def test_should_work_when_every_step_is_done_in_time(self): + """ + Everything in time scenario. + + This test scenario tests the case where: + + 1. the connection acquisition timeout is higher than + the connection creation timeout + 2. the connection is successfully created and in due time + + Then the query is executed successfully + """ + self._start_server(self._server, "session_run_auth_delay.script") + + auth = types.AuthorizationToken("basic", principal="neo4j", + credentials="pass") + uri = "bolt://%s" % self._server.address + self._driver = Driver(self._backend, uri, auth, + connection_acquisition_timeout_ms=10000, + connection_timeout_ms=5000) + + self._session = self._driver.session("r") + + list(self._session.run("RETURN 1 AS n")) + + def test_should_encompass_the_handshake_time(self): + """ + Handshake takes longer scenario. + + This test scenario tests the case where: + + 1. the connection acquisition timeout is smaller than + the connection creation timeout + 2. the connection is successfully created and in due time + 3. the handshake takes longer than the connection acquisition timeout + + Then the query is not executed since the connection acquisition + timed out. + """ + self._start_server(self._server, "session_run_auth_delay.script") + + auth = types.AuthorizationToken("basic", principal="neo4j", + credentials="pass") + uri = "bolt://%s" % self._server.address + self._driver = Driver(self._backend, uri, auth, + connection_acquisition_timeout_ms=2000, + connection_timeout_ms=720000) + + self._session = self._driver.session("r") + + with self.assertRaises(types.DriverError): + list(self._session.run("RETURN 1 AS n")) + + def test_should_fail_when_acquisition_timeout_is_reached_first(self): + """ + Connection creation bigger than acquisition timeout scenario. + + This test scenario tests the case where: + + 1. the connection acquisition timeout is smaller than + the connection creation timeout + 2. the connection takes longer than the + acquisition timeout to be created + + Then the query is not executed since the connection acquisition + times out. + """ + auth = types.AuthorizationToken("basic", principal="neo4j", + credentials="pass") + + # Non routable address + uri = "bolt://10.255.255.255" + self._driver = Driver(self._backend, uri, auth, + connection_acquisition_timeout_ms=2000, + connection_timeout_ms=720000) + + self._session = self._driver.session("r") + + with self.assertRaises(types.DriverError): + list(self._session.run("RETURN 1 AS n")) + + def test_should_fail_when_connection_timeout_is_reached_first(self): + """ + Acquisition timeout bigger than connection creation timeout scenario. + + This test scenario tests the case where: + + 1. the connection acquisition timeout is bigger than + the connection creation timeout + 2. the connection is successfully takes longer than the + connection timeout to be created + + Then the query is not executed since the connection creation + times out. + """ + auth = types.AuthorizationToken("basic", principal="neo4j", + credentials="pass") + + # Non routable address + uri = "bolt://10.255.255.255" + self._driver = Driver(self._backend, uri, auth, + connection_acquisition_timeout_ms=72000, + connection_timeout_ms=2000) + + self._session = self._driver.session("r") + + with self.assertRaises(types.DriverError): + list(self._session.run("RETURN 1 AS n")) + + def test_does_not_encompass_router_handshake(self): + self._start_server(self._router, "router_hello_delay.script") + self._start_server(self._server, "session_run.script") + + uri = "neo4j://%s" % self._router.address + auth = types.AuthorizationToken("basic", principal="neo4j", + credentials="pass") + self._driver = Driver(self._backend, uri, auth, + connection_acquisition_timeout_ms=2000, + connection_timeout_ms=720000) + self._session = self._driver.session("r") + list(self._session.run("RETURN 1 AS n")) + + self._session.close() + self._session = None + self._driver.close() + self._driver = None + self._router.done() + self._server.done() + + def test_does_not_encompass_router_route_response(self): + self._start_server(self._router, "router_route_delay.script") + self._start_server(self._server, "session_run.script") + + uri = "neo4j://%s" % self._router.address + auth = types.AuthorizationToken("basic", principal="neo4j", + credentials="pass") + self._driver = Driver(self._backend, uri, auth, + connection_acquisition_timeout_ms=2000, + connection_timeout_ms=720000) + self._session = self._driver.session("r") + list(self._session.run("RETURN 1 AS n")) + + self._session.close() + self._session = None + self._driver.close() + self._driver = None + self._router.done() + self._server.done() + + @driver_feature(types.Feature.OPT_EAGER_TX_BEGIN) + def test_should_regulate_the_time_for_acquiring_connections(self): + """ + No connection available scenario. + + This test scenario tests the case where: + 1. The connection pool is configured for max 1 connection + 2. A connection is acquired and locked by another transaction + 3. When the new session try to acquire a connection, the connection + pool doesn't have connections available in suitable time + + Then the begin transaction is not executed + since the connection acquisition times out. + """ + self._start_server(self._server, + "tx_without_commit_or_rollback.script") + + auth = types.AuthorizationToken("basic", principal="neo4j", + credentials="pass") + uri = "bolt://%s" % self._server.address + self._driver = Driver(self._backend, uri, auth, + connection_acquisition_timeout_ms=2000, + connection_timeout_ms=720000, + max_connection_pool_size=1) + + self._sessions = [ + self._driver.session("r"), + self._driver.session("r"), + ] + + self._txs = [self._sessions[0].begin_transaction()] + + with self.assertRaises(types.DriverError): + self._txs.append(self._sessions[1].begin_transaction()) diff --git a/tests/stub/driver_parameters/test_max_connection_pool_size.py b/tests/stub/driver_parameters/test_max_connection_pool_size.py new file mode 100644 index 000000000..8ebf73c59 --- /dev/null +++ b/tests/stub/driver_parameters/test_max_connection_pool_size.py @@ -0,0 +1,100 @@ +from contextlib import contextmanager + +from nutkit.backend.backend import backend_timeout_adjustment +from nutkit.frontend import Driver +import nutkit.protocol as types +from tests.shared import TestkitTestCase +from tests.stub.shared import StubServer + + +class TestMaxConnectionPoolSize(TestkitTestCase): + + required_features = types.Feature.BOLT_4_4, + + def setUp(self): + super().setUp() + # This needs to be a port that's not used by other tests. + # Else, when testing the javascript driver in a browser (specifically + # Firefox), the browser might block this port for the driver after this + # test for security reasons. + self._server = StubServer(9999) + self._server.start( + self.script_path("tx_without_commit_or_rollback.script") + ) + self._driver = None + self._sessions = [] + self._transactions = [] + self._last_exc = None + + def tearDown(self): + # If test raised an exception this will make sure that the stub server + # is killed, and it's output is dumped for analysis. + self._server.reset() + for tx in self._transactions: + with self.assertRaises(types.DriverError): + # The server does not accept ending the transaction. + # We still call it to potentially free resources. + tx.commit() + for s in self._sessions: + s.close() + if self._driver: + self._driver.close() + super().tearDown() + + def _open_driver(self, max_pool_size=None): + assert self._driver is None + kwargs = {} + if self.driver_supports_features( + types.Feature.API_CONNECTION_ACQUISITION_TIMEOUT + ): + kwargs["connection_acquisition_timeout_ms"] = 500 + if self.driver_supports_features( + types.Feature.API_SESSION_CONNECTION_TIMEOUT + ): + kwargs["session_connection_timeout_ms"] = 1000 + if max_pool_size is not None: + kwargs["max_connection_pool_size"] = max_pool_size + auth = types.AuthorizationToken("basic", principal="neo4j", + credentials="pass") + uri = "bolt://%s" % self._server.address + self._driver = Driver(self._backend, uri, auth, **kwargs) + + @contextmanager + def _backend_timeout_adjustment(self): + if any( + self.driver_supports_features(feature) + for feature in ( + types.Feature.API_CONNECTION_ACQUISITION_TIMEOUT, + types.Feature.API_SESSION_CONNECTION_TIMEOUT, + ) + ): + yield + else: + with backend_timeout_adjustment(self._backend, 70): + yield + + def _open_connections(self, n): + for _ in range(n): + self._sessions.append(self._driver.session("r")) + self._transactions.append( + self._sessions[-1].begin_transaction() + ) + list(self._transactions[-1].run("RETURN 1 AS n")) + + def test_connection_pool_maxes_out_at_100_by_default(self): + self._open_driver() + self._open_connections(100) + with self._backend_timeout_adjustment(): + with self.assertRaises(types.DriverError): + self._open_connections(1) + self.assertEqual(self._server.count_responses(""), 0) + self.assertEqual(self._server.count_responses(""), 100) + + def test_connection_pool_custom_max_size(self): + self._open_driver(2) + self._open_connections(2) + with self._backend_timeout_adjustment(): + with self.assertRaises(types.DriverError): + self._open_connections(1) + self.assertEqual(self._server.count_responses(""), 0) + self.assertEqual(self._server.count_responses(""), 2) diff --git a/tests/stub/driver_parameters/test_session_connection_timeout.py b/tests/stub/driver_parameters/test_session_connection_timeout.py new file mode 100644 index 000000000..2375fa1f3 --- /dev/null +++ b/tests/stub/driver_parameters/test_session_connection_timeout.py @@ -0,0 +1,175 @@ +from nutkit.frontend import Driver +import nutkit.protocol as types +from tests.shared import TestkitTestCase +from tests.stub.shared import StubServer + + +class TestConnectionAcquisitionTimeoutMs(TestkitTestCase): + """ + Connection Acquisition Timeout Tests. + + The connection acquisition timeout must account for the + whole acquisition execution time, whether a new connection is created, + an idle connection is picked up instead or we need to wait + until the full pool depletes. + + In particular, the connection acquisition timeout (CAT) has precedence + over the socket connection timeout (SCT). + + If the SCT is set to 2 hours and CAT to 50ms, + the connection acquisition should time out after 50ms, + even if the connection is successfully created within the SCT period. + + The CAT must NOT be replaced by the lowest of the two values (CAT and SCT). + Indeed, even if SCT is lower than CAT, there could be situations + where the pool takes longer to borrow an _idle_ connection than the SCT. + Such a scenario should work as long as the overall acquisition happens + within the CAT. + This is unfortunately hard to translate into a test. + """ + + required_features = ( + types.Feature.BOLT_4_4, + types.Feature.API_SESSION_CONNECTION_TIMEOUT + ) + + def setUp(self): + super().setUp() + self._server = StubServer(9010) + self._router = StubServer(9000) + self._driver = None + self._session = None + self._sessions = [] + self._txs = [] + + def tearDown(self): + self._server.reset() + self._router.reset() + for tx in self._txs: + with self.assertRaises(types.DriverError): + # The server does not accept ending the transaction. + # We still call it to potentially free resources. + tx.commit() + + for s in self._sessions: + s.close() + + if self._session: + self._session.close() + + if self._driver: + self._driver.close() + + return super().tearDown() + + def _start_server(self, server, script): + server.start(self.script_path(script), + vars_={"#HOST#": self._router.host}) + + def test_should_work_when_every_step_is_done_in_time(self): + self._start_server(self._server, "session_run_auth_delay.script") + + auth = types.AuthorizationToken("basic", principal="neo4j", + credentials="pass") + uri = "bolt://%s" % self._server.address + self._driver = Driver(self._backend, uri, auth, + session_connection_timeout_ms=10000) + + self._session = self._driver.session("r") + + def test_should_work_when_every_step_is_done_in_time_with_routing(self): + self._start_server(self._server, "session_run_auth_delay.script") + self._start_server(self._router, "router_route_delay.script") + + auth = types.AuthorizationToken("basic", principal="neo4j", + credentials="pass") + uri = "bolt://%s" % self._server.address + self._driver = Driver(self._backend, uri, auth, + session_connection_timeout_ms=10000) + + self._session = self._driver.session("r") + + def test_encompasses_router_connection_time(self): + """Router connection times out.""" + uri = "neo4j://10.255.255.255" + auth = types.AuthorizationToken("basic", principal="neo4j", + credentials="pass") + self._driver = Driver(self._backend, uri, auth, + session_connection_timeout_ms=2000) + + self._session = self._driver.session("r") + + with self.assertRaises(types.DriverError): + list(self._session.run("RETURN 1 as n")) + + def test_encompasses_router_handshake(self): + """Router available but with delayed HELLO response.""" + self._start_server(self._router, "router_hello_delay.script") + self._start_server(self._server, "session_run.script") + + uri = "neo4j://%s" % self._router.address + auth = types.AuthorizationToken("basic", principal="neo4j", + credentials="pass") + self._driver = Driver(self._backend, uri, auth, + session_connection_timeout_ms=2000) + + self._session = self._driver.session("r") + + with self.assertRaises(types.DriverError): + list(self._session.run("RETURN 1 as n")) + + self._session.close() + self._session = None + self._driver.close() + self._driver = None + self._server.reset() + reader_connections = self._server.count_responses("") + self.assertEqual(0, reader_connections) + + def test_encompasses_router_route_response(self): + """Router available but with delayed ROUTE response.""" + self._start_server(self._router, "router_route_delay.script") + self._start_server(self._server, "session_run.script") + + uri = "neo4j://%s" % self._router.address + auth = types.AuthorizationToken("basic", principal="neo4j", + credentials="pass") + self._driver = Driver(self._backend, uri, auth, + session_connection_timeout_ms=2000) + + self._session = self._driver.session("r") + + with self.assertRaises(types.DriverError): + list(self._session.run("RETURN 1 as n")) + + self._session.close() + self._session = None + self._driver.close() + self._driver = None + self._server.reset() + reader_connections = self._server.count_responses("") + self.assertEqual(0, reader_connections) + + def test_combined_router_and_reader_delay(self): + """Slow but in time router + slow but in time router == too slow.""" + self._start_server(self._router, "router_hello_delay.script") + self._start_server(self._server, "session_run_auth_delay.script") + + uri = "neo4j://%s" % self._router.address + auth = types.AuthorizationToken("basic", principal="neo4j", + credentials="pass") + self._driver = Driver(self._backend, uri, auth, + session_connection_timeout_ms=6000) + + self._session = self._driver.session("r") + + with self.assertRaises(types.DriverError): + list(self._session.run("RETURN 1 as n")) + + self._session.close() + self._session = None + self._driver.close() + self._driver = None + self._server.reset() + reader_connections = self._server.count_responses("") + self.assertEqual(1, reader_connections) diff --git a/tests/stub/driver_parameters/test_update_routing_table_timeout_ms.py b/tests/stub/driver_parameters/test_update_routing_table_timeout_ms.py new file mode 100644 index 000000000..d53a22798 --- /dev/null +++ b/tests/stub/driver_parameters/test_update_routing_table_timeout_ms.py @@ -0,0 +1,143 @@ +from nutkit.frontend import Driver +import nutkit.protocol as types +from tests.shared import TestkitTestCase +from tests.stub.shared import StubServer + + +class TestUpdateRoutingTableTimeoutMs(TestkitTestCase): + + required_features = ( + types.Feature.BOLT_4_4, + types.Feature.API_UPDATE_ROUTING_TABLE_TIMEOUT, + ) + + def setUp(self): + super().setUp() + self._server = StubServer(9010) + self._router = StubServer(9000) + self._driver = None + self._session = None + self._sessions = [] + self._txs = [] + + def tearDown(self): + self._server.reset() + self._router.reset() + for tx in self._txs: + with self.assertRaises(types.DriverError): + # The server does not accept ending the transaction. + # We still call it to potentially free resources. + tx.commit() + + for s in self._sessions: + s.close() + + if self._session: + self._session.close() + + if self._driver: + self._driver.close() + + return super().tearDown() + + def _start_server(self, server, script): + server.start(self.script_path(script), + vars_={"#HOST#": self._router.host}) + + def test_should_work_when_every_step_is_done_in_time(self): + """Everything in time.""" + self._start_server(self._router, "router.script") + self._start_server(self._server, "session_run.script") + uri = "neo4j://10.255.255.255" + auth = types.AuthorizationToken("basic", principal="neo4j", + credentials="pass") + + uri = "neo4j://%s" % self._router.address + self._driver = Driver(self._backend, uri, auth, + update_routing_table_timeout_ms=2000) + + self._session = self._driver.session("r") + + list(self._session.run("RETURN 1 AS n")) + + def test_encompasses_router_connection_time(self): + """Router connection times out.""" + uri = "neo4j://10.255.255.255" + auth = types.AuthorizationToken("basic", principal="neo4j", + credentials="pass") + self._driver = Driver(self._backend, uri, auth, + update_routing_table_timeout_ms=2000) + + self._session = self._driver.session("r") + + with self.assertRaises(types.DriverError): + list(self._session.run("RETURN 1 as n")) + + def test_encompasses_router_handshake(self): + """Router available but with delayed HELLO response.""" + self._start_server(self._router, "router_hello_delay.script") + self._start_server(self._server, "session_run.script") + + uri = "neo4j://%s" % self._router.address + auth = types.AuthorizationToken("basic", principal="neo4j", + credentials="pass") + self._driver = Driver(self._backend, uri, auth, + update_routing_table_timeout_ms=2000) + + self._session = self._driver.session("r") + + with self.assertRaises(types.DriverError): + list(self._session.run("RETURN 1 as n")) + + self._session.close() + self._session = None + self._driver.close() + self._driver = None + self._server.reset() + reader_connections = self._server.count_responses("") + self.assertEqual(0, reader_connections) + + def test_encompasses_router_route_response(self): + """Router available but with delayed ROUTE response.""" + self._start_server(self._router, "router_route_delay.script") + self._start_server(self._server, "session_run.script") + + uri = "neo4j://%s" % self._router.address + auth = types.AuthorizationToken("basic", principal="neo4j", + credentials="pass") + self._driver = Driver(self._backend, uri, auth, + update_routing_table_timeout_ms=2000) + + self._session = self._driver.session("r") + + with self.assertRaises(types.DriverError): + list(self._session.run("RETURN 1 as n")) + + self._session.close() + self._session = None + self._driver.close() + self._driver = None + self._server.reset() + reader_connections = self._server.count_responses("") + self.assertEqual(0, reader_connections) + + def test_does_not_encompass_reader_connection_time(self): + self._start_server(self._router, "router.script") + self._start_server(self._server, "session_run_auth_delay.script") + + uri = "neo4j://%s" % self._router.address + auth = types.AuthorizationToken("basic", principal="neo4j", + credentials="pass") + self._driver = Driver(self._backend, uri, auth, + update_routing_table_timeout_ms=2000) + + self._session = self._driver.session("r") + + list(self._session.run("RETURN 1 AS n")) + + self._session.close() + self._session = None + self._driver.close() + self._driver = None + self._router.done() + self._server.done() diff --git a/tests/stub/routing/test_routing_v4x4.py b/tests/stub/routing/test_routing_v4x4.py index ea28ef5df..045c69aa0 100644 --- a/tests/stub/routing/test_routing_v4x4.py +++ b/tests/stub/routing/test_routing_v4x4.py @@ -2776,7 +2776,7 @@ def test_should_drop_connections_failing_liveness_check(self): def test_should_enforce_pool_size_per_cluster_member(self): driver = Driver(self._backend, self._uri_with_context, self._auth, self._userAgent, max_connection_pool_size=1, - connection_acquisition_timeout_ms=10) + connection_acquisition_timeout_ms=250) self.start_server(self._routingServer1, "router_adb_multi_no_bookmarks.script") self.start_server(self._writeServer1, "writer_tx.script") @@ -2804,9 +2804,9 @@ def test_should_enforce_pool_size_per_cluster_member(self): "org.neo4j.driver.exceptions.ClientException", exc.exception.errorType ) - self.assertTrue("Unable to acquire connection from the " - "pool within configured maximum time of 10ms" - in exc.exception.msg) + self.assertIn("Unable to acquire connection from the " + "pool within configured maximum time of 250ms", + exc.exception.msg) session2.close()