diff --git a/python/psqlpy/__init__.py b/python/psqlpy/__init__.py index 98a7c154..baaa76f8 100644 --- a/python/psqlpy/__init__.py +++ b/python/psqlpy/__init__.py @@ -4,9 +4,11 @@ ConnRecyclingMethod, Cursor, IsolationLevel, + LoadBalanceHosts, QueryResult, ReadVariant, SingleQueryResult, + TargetSessionAttrs, Transaction, connect, ) @@ -22,4 +24,6 @@ "IsolationLevel", "ReadVariant", "connect", + "LoadBalanceHosts", + "TargetSessionAttrs", ] diff --git a/python/psqlpy/_internal/__init__.pyi b/python/psqlpy/_internal/__init__.pyi index df1b05d5..af6a8374 100644 --- a/python/psqlpy/_internal/__init__.pyi +++ b/python/psqlpy/_internal/__init__.pyi @@ -1,6 +1,6 @@ import types from enum import Enum -from typing import Any, Callable, Optional, TypeVar +from typing import Any, Callable, List, Optional, TypeVar from typing_extensions import Self @@ -104,6 +104,24 @@ class IsolationLevel(Enum): RepeatableRead = 3 Serializable = 4 +class LoadBalanceHosts(Enum): + """Load balancing configuration.""" + + # Make connection attempts to hosts in the order provided. + Disable = 1 + # Make connection attempts to hosts in a random order. + Random = 2 + +class TargetSessionAttrs(Enum): + """Properties required of a session.""" + + # No special properties are required. + Any = 1 + # The session must allow writes. + ReadWrite = 2 + # The session allow only reads. + ReadOnly = 3 + class ReadVariant(Enum): """Class for Read Variant for transaction.""" @@ -869,8 +887,24 @@ class ConnectionPool: username: Optional[str] = None, password: Optional[str] = None, host: Optional[str] = None, + hosts: Optional[List[str]] = None, port: Optional[int] = None, + ports: Optional[List[int]] = None, db_name: Optional[str] = None, + target_session_attrs: Optional[TargetSessionAttrs] = None, + options: Optional[str] = None, + application_name: Optional[str] = None, + connect_timeout_sec: Optional[int] = None, + connect_timeout_nanosec: Optional[int] = None, + tcp_user_timeout_sec: Optional[int] = None, + tcp_user_timeout_nanosec: Optional[int] = None, + keepalives: Optional[bool] = None, + keepalives_idle_sec: Optional[int] = None, + keepalives_idle_nanosec: Optional[int] = None, + keepalives_interval_sec: Optional[int] = None, + keepalives_interval_nanosec: Optional[int] = None, + keepalives_retries: Optional[int] = None, + load_balance_hosts: Optional[LoadBalanceHosts] = None, max_db_pool_size: int = 2, conn_recycling_method: Optional[ConnRecyclingMethod] = None, ) -> None: @@ -879,22 +913,67 @@ class ConnectionPool: It connects to the database and create pool. You cannot set the minimum size for the connection - pool, by default it is 1. + pool, by it is 0. + `ConnectionPool` doesn't create connections on startup. + It makes new connection on demand. - This connection pool can: - - Startup itself with `startup` method - - Execute queries and return `QueryResult` class as a result - - Create new instance of `Transaction` + If you specify `dsn` parameter then `username`, `password`, + `host`, `hosts`, `port`, `ports`, `db_name` and `target_session_attrs` + parameters will be ignored. ### Parameters: - - `dsn`: full dsn connection string. + - `dsn`: Full dsn connection string. `postgres://postgres:postgres@localhost:5432/postgres?target_session_attrs=read-write` - - `username`: username of the user in postgres - - `password`: password of the user in postgres - - `host`: host of postgres - - `port`: port of postgres - - `db_name`: name of the database in postgres - - `max_db_pool_size`: maximum size of the connection pool + - `username`: Username of the user in the PostgreSQL + - `password`: Password of the user in the PostgreSQL + - `host`: Host of the PostgreSQL + - `hosts`: Hosts of the PostgreSQL + - `port`: Port of the PostgreSQL + - `ports`: Ports of the PostgreSQL + - `db_name`: Name of the database in PostgreSQL + - `target_session_attrs`: Specifies requirements of the session. + - `options`: Command line options used to configure the server + - `application_name`: Sets the application_name parameter on the server. + - `connect_timeout_sec`: The time limit in seconds applied to each socket-level + connection attempt. + Note that hostnames can resolve to multiple IP addresses, + and this limit is applied to each address. Defaults to no timeout. + - `connect_timeout_nanosec`: nanosec for connection timeout, + can be used only with connect_timeout_sec. + - `tcp_user_timeout_sec`: The time limit that + transmitted data may remain unacknowledged + before a connection is forcibly closed. + This is ignored for Unix domain socket connections. + It is only supported on systems where TCP_USER_TIMEOUT + is available and will default to the system default if omitted + or set to 0; on other systems, it has no effect. + - `tcp_user_timeout_nanosec`: nanosec for cp_user_timeout, + can be used only with tcp_user_timeout_sec. + - `keepalives`: Controls the use of TCP keepalive. + This option is ignored when connecting with Unix sockets. + Defaults to on. + - `keepalives_idle_sec`: The number of seconds of inactivity after + which a keepalive message is sent to the server. + This option is ignored when connecting with Unix sockets. + Defaults to 2 hours. + - `keepalives_idle_nanosec`: Nanosec for keepalives_idle_sec. + - `keepalives_interval_sec`: The time interval between TCP keepalive probes. + This option is ignored when connecting with Unix sockets. + - `keepalives_interval_nanosec`: Nanosec for keepalives_interval_sec. + - `keepalives_retries`: The maximum number of TCP keepalive probes + that will be sent before dropping a connection. + This option is ignored when connecting with Unix sockets. + - `load_balance_hosts`: Controls the order in which the client tries to connect + to the available hosts and addresses. + Once a connection attempt is successful no other + hosts and addresses will be tried. + This parameter is typically used in combination with multiple host names + or a DNS record that returns multiple IPs. + If set to disable, hosts and addresses will be tried in the order provided. + If set to random, hosts will be tried in a random order, and the IP addresses + resolved from a hostname will also be tried in a random order. + Defaults to disable. + - `max_db_pool_size`: maximum size of the connection pool. - `conn_recycling_method`: how a connection is recycled. """ async def execute( @@ -945,21 +1024,92 @@ def connect( username: Optional[str] = None, password: Optional[str] = None, host: Optional[str] = None, + hosts: Optional[List[str]] = None, port: Optional[int] = None, + ports: Optional[List[int]] = None, db_name: Optional[str] = None, + target_session_attrs: Optional[TargetSessionAttrs] = None, + options: Optional[str] = None, + application_name: Optional[str] = None, + connect_timeout_sec: Optional[int] = None, + connect_timeout_nanosec: Optional[int] = None, + tcp_user_timeout_sec: Optional[int] = None, + tcp_user_timeout_nanosec: Optional[int] = None, + keepalives: Optional[bool] = None, + keepalives_idle_sec: Optional[int] = None, + keepalives_idle_nanosec: Optional[int] = None, + keepalives_interval_sec: Optional[int] = None, + keepalives_interval_nanosec: Optional[int] = None, + keepalives_retries: Optional[int] = None, + load_balance_hosts: Optional[LoadBalanceHosts] = None, max_db_pool_size: int = 2, conn_recycling_method: Optional[ConnRecyclingMethod] = None, ) -> ConnectionPool: - """Create new connection pool. + """Create new PostgreSQL connection pool. + + It connects to the database and create pool. + + You cannot set the minimum size for the connection + pool, by it is 0. + `ConnectionPool` doesn't create connections on startup. + It makes new connection on demand. + + If you specify `dsn` parameter then `username`, `password`, + `host`, `hosts`, `port`, `ports`, `db_name` and `target_session_attrs` + parameters will be ignored. ### Parameters: - - `dsn`: full dsn connection string. + - `dsn`: Full dsn connection string. `postgres://postgres:postgres@localhost:5432/postgres?target_session_attrs=read-write` - - `username`: username of the user in postgres - - `password`: password of the user in postgres - - `host`: host of postgres - - `port`: port of postgres - - `db_name`: name of the database in postgres - - `max_db_pool_size`: maximum size of the connection pool + - `username`: Username of the user in the PostgreSQL + - `password`: Password of the user in the PostgreSQL + - `host`: Host of the PostgreSQL + - `hosts`: Hosts of the PostgreSQL + - `port`: Port of the PostgreSQL + - `ports`: Ports of the PostgreSQL + - `db_name`: Name of the database in PostgreSQL + - `target_session_attrs`: Specifies requirements of the session. + - `options`: Command line options used to configure the server + - `application_name`: Sets the application_name parameter on the server. + - `connect_timeout_sec`: The time limit in seconds applied to each socket-level + connection attempt. + Note that hostnames can resolve to multiple IP addresses, + and this limit is applied to each address. Defaults to no timeout. + - `connect_timeout_nanosec`: nanosec for connection timeout, + can be used only with connect_timeout_sec. + - `tcp_user_timeout_sec`: The time limit that + transmitted data may remain unacknowledged + before a connection is forcibly closed. + This is ignored for Unix domain socket connections. + It is only supported on systems where TCP_USER_TIMEOUT + is available and will default to the system default if omitted + or set to 0; on other systems, it has no effect. + - `tcp_user_timeout_nanosec`: nanosec for cp_user_timeout, + can be used only with tcp_user_timeout_sec. + - `keepalives`: Controls the use of TCP keepalive. + This option is ignored when connecting with Unix sockets. + Defaults to on. + - `keepalives_idle_sec`: The number of seconds of inactivity after + which a keepalive message is sent to the server. + This option is ignored when connecting with Unix sockets. + Defaults to 2 hours. + - `keepalives_idle_nanosec`: Nanosec for keepalives_idle_sec. + - `keepalives_interval_sec`: The time interval between TCP keepalive probes. + This option is ignored when connecting with Unix sockets. + - `keepalives_interval_nanosec`: Nanosec for keepalives_interval_sec. + - `keepalives_retries`: The maximum number of TCP keepalive probes + that will be sent before dropping a connection. + This option is ignored when connecting with Unix sockets. + - `load_balance_hosts`: Controls the order in which the client tries to connect + to the available hosts and addresses. + Once a connection attempt is successful no other + hosts and addresses will be tried. + This parameter is typically used in combination with multiple host names + or a DNS record that returns multiple IPs. + If set to disable, hosts and addresses will be tried in the order provided. + If set to random, hosts will be tried in a random order, and the IP addresses + resolved from a hostname will also be tried in a random order. + Defaults to disable. + - `max_db_pool_size`: maximum size of the connection pool. - `conn_recycling_method`: how a connection is recycled. """ diff --git a/python/tests/test_connection_pool.py b/python/tests/test_connection_pool.py index 467b2899..e2962dbf 100644 --- a/python/tests/test_connection_pool.py +++ b/python/tests/test_connection_pool.py @@ -1,7 +1,15 @@ import pytest -from psqlpy import Connection, ConnectionPool, ConnRecyclingMethod, QueryResult, connect -from psqlpy.exceptions import RustPSQLDriverPyBaseError +from psqlpy import ( + Connection, + ConnectionPool, + ConnRecyclingMethod, + LoadBalanceHosts, + QueryResult, + TargetSessionAttrs, + connect, +) +from psqlpy.exceptions import DBPoolConfigurationError, RustPSQLDriverPyBaseError pytestmark = pytest.mark.anyio @@ -68,6 +76,73 @@ async def test_pool_conn_recycling_method( await pg_pool.execute("SELECT 1") +async def test_build_pool_failure() -> None: + with pytest.raises(expected_exception=DBPoolConfigurationError): + ConnectionPool( + dsn="postgres://postgres:postgres@localhost:5432/psqlpy_test", + connect_timeout_nanosec=12, + ) + with pytest.raises(expected_exception=DBPoolConfigurationError): + ConnectionPool( + dsn="postgres://postgres:postgres@localhost:5432/psqlpy_test", + connect_timeout_nanosec=12, + ) + with pytest.raises(expected_exception=DBPoolConfigurationError): + ConnectionPool( + dsn="postgres://postgres:postgres@localhost:5432/psqlpy_test", + keepalives_idle_nanosec=12, + ) + with pytest.raises(expected_exception=DBPoolConfigurationError): + ConnectionPool( + dsn="postgres://postgres:postgres@localhost:5432/psqlpy_test", + keepalives_interval_nanosec=12, + ) + + +@pytest.mark.parametrize( + "target_session_attrs", + [ + TargetSessionAttrs.Any, + TargetSessionAttrs.ReadWrite, + TargetSessionAttrs.ReadOnly, + ], +) +async def test_pool_target_session_attrs( + target_session_attrs: TargetSessionAttrs, +) -> None: + pg_pool = ConnectionPool( + db_name="psqlpy_test", + host="localhost", + username="postgres", + password="postgres", # noqa: S106 + target_session_attrs=target_session_attrs, + ) + + if target_session_attrs == TargetSessionAttrs.ReadOnly: + with pytest.raises(expected_exception=RustPSQLDriverPyBaseError): + await pg_pool.execute("SELECT 1") + else: + await pg_pool.execute("SELECT 1") + + +@pytest.mark.parametrize( + "load_balance_hosts", + [ + LoadBalanceHosts.Disable, + LoadBalanceHosts.Random, + ], +) +async def test_pool_load_balance_hosts( + load_balance_hosts: LoadBalanceHosts, +) -> None: + pg_pool = ConnectionPool( + dsn="postgres://postgres:postgres@localhost:5432/psqlpy_test", + load_balance_hosts=load_balance_hosts, + ) + + await pg_pool.execute("SELECT 1") + + async def test_close_connection_pool() -> None: """Test that `close` method closes connection pool.""" pg_pool = ConnectionPool( diff --git a/src/driver/common_options.rs b/src/driver/common_options.rs index cd8cb362..2d906234 100644 --- a/src/driver/common_options.rs +++ b/src/driver/common_options.rs @@ -19,3 +19,44 @@ impl ConnRecyclingMethod { } } } + +#[pyclass] +#[derive(Clone, Copy)] +pub enum LoadBalanceHosts { + /// Make connection attempts to hosts in the order provided. + Disable, + /// Make connection attempts to hosts in a random order. + Random, +} + +impl LoadBalanceHosts { + #[must_use] + pub fn to_internal(&self) -> tokio_postgres::config::LoadBalanceHosts { + match self { + LoadBalanceHosts::Disable => tokio_postgres::config::LoadBalanceHosts::Disable, + LoadBalanceHosts::Random => tokio_postgres::config::LoadBalanceHosts::Random, + } + } +} + +#[pyclass] +#[derive(Clone, Copy)] +pub enum TargetSessionAttrs { + /// No special properties are required. + Any, + /// The session must allow writes. + ReadWrite, + /// The session allow only reads. + ReadOnly, +} + +impl TargetSessionAttrs { + #[must_use] + pub fn to_internal(&self) -> tokio_postgres::config::TargetSessionAttrs { + match self { + TargetSessionAttrs::Any => tokio_postgres::config::TargetSessionAttrs::Any, + TargetSessionAttrs::ReadWrite => tokio_postgres::config::TargetSessionAttrs::ReadWrite, + TargetSessionAttrs::ReadOnly => tokio_postgres::config::TargetSessionAttrs::ReadOnly, + } + } +} diff --git a/src/driver/connection_pool.rs b/src/driver/connection_pool.rs index fad7cb9e..caa36e32 100644 --- a/src/driver/connection_pool.rs +++ b/src/driver/connection_pool.rs @@ -1,7 +1,7 @@ use crate::runtime::tokio_runtime; use deadpool_postgres::{Manager, ManagerConfig, Object, Pool, RecyclingMethod}; use pyo3::{pyclass, pyfunction, pymethods, PyAny}; -use std::{str::FromStr, vec}; +use std::vec; use tokio_postgres::{NoTls, Row}; use crate::{ @@ -10,7 +10,11 @@ use crate::{ value_converter::{convert_parameters, PythonDTO, QueryParameter}, }; -use super::{common_options::ConnRecyclingMethod, connection::Connection}; +use super::{ + common_options::{ConnRecyclingMethod, LoadBalanceHosts, TargetSessionAttrs}, + connection::Connection, + utils::build_connection_config, +}; /// Make new connection pool. /// @@ -23,8 +27,25 @@ pub fn connect( username: Option, password: Option, host: Option, + hosts: Option>, port: Option, + ports: Option>, db_name: Option, + target_session_attrs: Option, + options: Option, + application_name: Option, + connect_timeout_sec: Option, + connect_timeout_nanosec: Option, + tcp_user_timeout_sec: Option, + tcp_user_timeout_nanosec: Option, + keepalives: Option, + keepalives_idle_sec: Option, + keepalives_idle_nanosec: Option, + keepalives_interval_sec: Option, + keepalives_interval_nanosec: Option, + keepalives_retries: Option, + load_balance_hosts: Option, + max_db_pool_size: Option, conn_recycling_method: Option, ) -> RustPSQLDriverPyResult { @@ -36,27 +57,30 @@ pub fn connect( } } - let mut pg_config: tokio_postgres::Config; - if let Some(dsn_string) = dsn { - pg_config = tokio_postgres::Config::from_str(&dsn_string)?; - } else { - pg_config = tokio_postgres::Config::new(); - if let (Some(password), Some(username)) = (password, username) { - pg_config.password(&password); - pg_config.user(&username); - } - if let Some(host) = host { - pg_config.host(&host); - } - - if let Some(port) = port { - pg_config.port(port); - } - - if let Some(db_name) = db_name { - pg_config.dbname(&db_name); - } - } + let pg_config = build_connection_config( + dsn, + username, + password, + host, + hosts, + port, + ports, + db_name, + target_session_attrs, + options, + application_name, + connect_timeout_sec, + connect_timeout_nanosec, + tcp_user_timeout_sec, + tcp_user_timeout_nanosec, + keepalives, + keepalives_idle_sec, + keepalives_idle_nanosec, + keepalives_interval_sec, + keepalives_interval_nanosec, + keepalives_retries, + load_balance_hosts, + )?; let mgr_config: ManagerConfig; if let Some(conn_recycling_method) = conn_recycling_method { @@ -96,8 +120,24 @@ impl ConnectionPool { username: Option, password: Option, host: Option, + hosts: Option>, port: Option, + ports: Option>, db_name: Option, + target_session_attrs: Option, + options: Option, + application_name: Option, + connect_timeout_sec: Option, + connect_timeout_nanosec: Option, + tcp_user_timeout_sec: Option, + tcp_user_timeout_nanosec: Option, + keepalives: Option, + keepalives_idle_sec: Option, + keepalives_idle_nanosec: Option, + keepalives_interval_sec: Option, + keepalives_interval_nanosec: Option, + keepalives_retries: Option, + load_balance_hosts: Option, max_db_pool_size: Option, conn_recycling_method: Option, ) -> RustPSQLDriverPyResult { @@ -106,8 +146,24 @@ impl ConnectionPool { username, password, host, + hosts, port, + ports, db_name, + target_session_attrs, + options, + application_name, + connect_timeout_sec, + connect_timeout_nanosec, + tcp_user_timeout_sec, + tcp_user_timeout_nanosec, + keepalives, + keepalives_idle_sec, + keepalives_idle_nanosec, + keepalives_interval_sec, + keepalives_interval_nanosec, + keepalives_retries, + load_balance_hosts, max_db_pool_size, conn_recycling_method, ) diff --git a/src/driver/mod.rs b/src/driver/mod.rs index aec33d5b..1ba0c203 100644 --- a/src/driver/mod.rs +++ b/src/driver/mod.rs @@ -4,3 +4,4 @@ pub mod connection_pool; pub mod cursor; pub mod transaction; pub mod transaction_options; +pub mod utils; diff --git a/src/driver/utils.rs b/src/driver/utils.rs new file mode 100644 index 00000000..df4cf1b2 --- /dev/null +++ b/src/driver/utils.rs @@ -0,0 +1,155 @@ +use std::{str::FromStr, time::Duration}; + +use crate::exceptions::rust_errors::{RustPSQLDriverError, RustPSQLDriverPyResult}; + +use super::common_options::{LoadBalanceHosts, TargetSessionAttrs}; + +/// Create new config. +/// +/// # Errors +/// May return Err Result if cannot build new config. +#[allow(clippy::too_many_arguments)] +pub fn build_connection_config( + dsn: Option, + username: Option, + password: Option, + host: Option, + hosts: Option>, + port: Option, + ports: Option>, + db_name: Option, + target_session_attrs: Option, + options: Option, + application_name: Option, + connect_timeout_sec: Option, + connect_timeout_nanosec: Option, + tcp_user_timeout_sec: Option, + tcp_user_timeout_nanosec: Option, + keepalives: Option, + keepalives_idle_sec: Option, + keepalives_idle_nanosec: Option, + keepalives_interval_sec: Option, + keepalives_interval_nanosec: Option, + keepalives_retries: Option, + load_balance_hosts: Option, +) -> RustPSQLDriverPyResult { + if tcp_user_timeout_nanosec.is_some() && tcp_user_timeout_sec.is_none() { + return Err(RustPSQLDriverError::DataBasePoolConfigurationError( + "tcp_user_timeout_nanosec must be used with tcp_user_timeout_sec param.".into(), + )); + } + + if connect_timeout_nanosec.is_some() && connect_timeout_sec.is_none() { + return Err(RustPSQLDriverError::DataBasePoolConfigurationError( + "connect_timeout_nanosec must be used with connect_timeout_sec param.".into(), + )); + } + + if keepalives_idle_nanosec.is_some() && keepalives_idle_sec.is_none() { + return Err(RustPSQLDriverError::DataBasePoolConfigurationError( + "keepalives_idle_nanosec must be used with keepalives_idle_sec param.".into(), + )); + } + + if keepalives_interval_nanosec.is_some() && keepalives_interval_sec.is_none() { + return Err(RustPSQLDriverError::DataBasePoolConfigurationError( + "keepalives_interval_nanosec must be used with keepalives_interval_sec param.".into(), + )); + } + + let mut pg_config: tokio_postgres::Config; + + if let Some(dsn_string) = dsn { + pg_config = tokio_postgres::Config::from_str(&dsn_string)?; + } else { + pg_config = tokio_postgres::Config::new(); + + if let Some(password) = password { + pg_config.password(&password); + } + + if let Some(username) = username { + pg_config.user(&username); + } + + if let Some(hosts) = hosts { + for single_host in hosts { + pg_config.host(&single_host); + } + } + + if let Some(host) = host { + pg_config.host(&host); + } + + if let Some(ports) = ports { + for single_port in ports { + pg_config.port(single_port); + } + } + + if let Some(port) = port { + pg_config.port(port); + } + + if let Some(db_name) = db_name { + pg_config.dbname(&db_name); + } + + if let Some(target_session_attrs) = target_session_attrs { + pg_config.target_session_attrs(target_session_attrs.to_internal()); + } + } + + if let Some(options) = options { + pg_config.options(&options); + } + + if let Some(application_name) = application_name { + pg_config.application_name(&application_name); + } + + if let Some(connect_timeout_sec) = connect_timeout_sec { + pg_config.connect_timeout(Duration::new( + connect_timeout_sec, + connect_timeout_nanosec.unwrap_or_default(), + )); + } + + if let Some(tcp_user_timeout_sec) = tcp_user_timeout_sec { + pg_config.tcp_user_timeout(Duration::new( + tcp_user_timeout_sec, + tcp_user_timeout_nanosec.unwrap_or_default(), + )); + } + + if let Some(keepalives) = keepalives { + if keepalives { + pg_config.keepalives(keepalives); + + if let Some(keepalives_idle_sec) = keepalives_idle_sec { + pg_config.keepalives_idle(Duration::new( + keepalives_idle_sec, + keepalives_idle_nanosec.unwrap_or_default(), + )); + } + + if let Some(keepalives_interval_sec) = keepalives_interval_sec { + pg_config.keepalives_interval(Duration::new( + keepalives_interval_sec, + keepalives_interval_nanosec.unwrap_or_default(), + )); + } + + if let Some(keepalives_retries) = keepalives_retries { + pg_config.keepalives_retries(keepalives_retries); + } + } + } + + if let Some(load_balance_hosts) = load_balance_hosts { + pg_config.load_balance_hosts(load_balance_hosts.to_internal()); + } + + Ok(pg_config) +} diff --git a/src/lib.rs b/src/lib.rs index 2392f098..cf26c4da 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,6 +23,8 @@ fn psqlpy(py: Python<'_>, pymod: &Bound<'_, PyModule>) -> PyResult<()> { pymod.add_class::()?; pymod.add_class::()?; pymod.add_class::()?; + pymod.add_class::()?; + pymod.add_class::()?; pymod.add_class::()?; pymod.add_class::()?; add_module(py, pymod, "extra_types", extra_types_module)?;