Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ async def main() -> None:
max_db_pool_size=2,
)

res: QueryResult = await db_pool.execute(
"SELECT * FROM users",
)
async with db_pool.acquire() as conn:
res: QueryResult = await conn.execute(
"SELECT * FROM users",
)

print(res.result())
db_pool.close()
Expand Down
49 changes: 0 additions & 49 deletions docs/components/connection_pool.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,55 +178,6 @@ It has 4 parameters:
- `available` - available connection in the connection pool.
- `waiting` - waiting requests to retrieve connection from connection pool.

### Execute

#### Parameters:

- `querystring`: Statement string.
- `parameters`: List of parameters for the statement string.
- `prepared`: Prepare statement before execution or not.

You can execute any query directly from Connection Pool.
This method supports parameters, each parameter must be marked as `$<number>` (number starts with 1).
Parameters must be passed as list after querystring.
::: caution
You must use `ConnectionPool.execute` method in high-load production code wisely!
It pulls connection from the pool each time you execute query.
Preferable way to execute statements with [Connection](./../components/connection.md) or [Transaction](./../components/transaction.md)
:::

```python
async def main() -> None:
...
results: QueryResult = await db_pool.execute(
"SELECT * FROM users WHERE id = $1 and username = $2",
[100, "Alex"],
)

dict_results: list[dict[str, Any]] = results.result()
```

### Fetch

#### Parameters:

- `querystring`: Statement string.
- `parameters`: List of parameters for the statement string.
- `prepared`: Prepare statement before execution or not.

The same as the `execute` method, for some people this naming is preferable.

```python
async def main() -> None:
...
results: QueryResult = await db_pool.fetch(
"SELECT * FROM users WHERE id = $1 and username = $2",
[100, "Alex"],
)

dict_results: list[dict[str, Any]] = results.result()
```

### Acquire

Get single connection for async context manager.
Expand Down
8 changes: 8 additions & 0 deletions docs/integrations/opentelemetry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
title: Integration with OpenTelemetry
---

# OTLP-PSQLPy

There is a library for OpenTelemetry support.
Please follow the [link](https://github.com/psqlpy-python/otlp-psqlpy)
8 changes: 8 additions & 0 deletions docs/integrations/taskiq.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
title: Integration with TaskIQ
---

# TaskIQ-PSQLPy

There is integration with [TaskIQ](https://github.com/taskiq-python/taskiq-psqlpy).
You can use PSQLPy for result backend.
16 changes: 6 additions & 10 deletions docs/introduction/lets_start.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,16 @@ async def main() -> None:
# It uses default connection parameters
db_pool: Final = ConnectionPool()

results: Final[QueryResult] = await db_pool.execute(
"SELECT * FROM users WHERE id = $1",
[2],
)
async with db_pool.acquire() as conn:
results: Final[QueryResult] = await conn.execute(
"SELECT * FROM users WHERE id = $1",
[2],
)

dict_results: Final[list[dict[Any, Any]]] = results.result()
db_pool.close()
```

::: tip
You must call `close()` on database pool when you application is shutting down.
:::
::: caution
You must not use `ConnectionPool.execute` method in high-load production code!
It pulls new connection from connection pull each call.
Recommended way to make queries is executing them with `Connection`, `Transaction` or `Cursor`.
It's better to call `close()` on database pool when you application is shutting down.
:::
6 changes: 4 additions & 2 deletions psqlpy-stress/psqlpy_stress/mocker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ def get_pool() -> psqlpy.ConnectionPool:
async def fill_users() -> None:
pool = get_pool()
users_amount = 10000000
connection = await pool.connection()
for _ in range(users_amount):
await pool.execute(
await connection.execute(
querystring="INSERT INTO users (username) VALUES($1)",
parameters=[str(uuid.uuid4())],
)
Expand All @@ -35,8 +36,9 @@ def generate_random_dict() -> dict[str, str]:
async def fill_big_table() -> None:
pool = get_pool()
big_table_amount = 10000000
connection = await pool.connection()
for _ in range(big_table_amount):
await pool.execute(
await connection.execute(
"INSERT INTO big_table (string_field, integer_field, json_field, array_field) VALUES($1, $2, $3, $4)",
parameters=[
str(uuid.uuid4()),
Expand Down
76 changes: 22 additions & 54 deletions python/psqlpy/_internal/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,16 @@ class Cursor:
It can be used as an asynchronous iterator.
"""

cursor_name: str
querystring: str
parameters: Sequence[Any]
prepared: bool | None
conn_dbname: str | None
user: str | None
host_addrs: list[str]
hosts: list[str]
ports: list[int]

def __aiter__(self: Self) -> Self: ...
async def __anext__(self: Self) -> QueryResult: ...
async def __aenter__(self: Self) -> Self: ...
Expand Down Expand Up @@ -424,6 +434,12 @@ class Transaction:
`.transaction()`.
"""

conn_dbname: str | None
user: str | None
host_addrs: list[str]
hosts: list[str]
ports: list[int]

async def __aenter__(self: Self) -> Self: ...
async def __aexit__(
self: Self,
Expand Down Expand Up @@ -874,6 +890,12 @@ class Connection:
It can be created only from connection pool.
"""

conn_dbname: str | None
user: str | None
host_addrs: list[str]
hosts: list[str]
ports: list[int]

async def __aenter__(self: Self) -> Self: ...
async def __aexit__(
self: Self,
Expand Down Expand Up @@ -1284,60 +1306,6 @@ class ConnectionPool:
### Parameters:
- `new_max_size`: new size for the connection pool.
"""
async def execute(
self: Self,
querystring: str,
parameters: Sequence[Any] | None = None,
prepared: bool = True,
) -> QueryResult:
"""Execute the query.

Querystring can contain `$<number>` parameters
for converting them in the driver side.

### Parameters:
- `querystring`: querystring to execute.
- `parameters`: list of parameters to pass in the query.
- `prepared`: should the querystring be prepared before the request.
By default any querystring will be prepared.

### Example:
```python
import asyncio

from psqlpy import PSQLPool, QueryResult

async def main() -> None:
db_pool = PSQLPool()
query_result: QueryResult = await psqlpy.execute(
"SELECT username FROM users WHERE id = $1",
[100],
)
dict_result: List[Dict[Any, Any]] = query_result.result()
# you don't need to close the pool,
# it will be dropped on Rust side.
```
"""
async def fetch(
self: Self,
querystring: str,
parameters: Sequence[Any] | None = None,
prepared: bool = True,
) -> QueryResult:
"""Fetch the result from database.

It's the same as `execute` method, we made it because people are used
to `fetch` method name.

Querystring can contain `$<number>` parameters
for converting them in the driver side.

### Parameters:
- `querystring`: querystring to execute.
- `parameters`: list of parameters to pass in the query.
- `prepared`: should the querystring be prepared before the request.
By default any querystring will be prepared.
"""
async def connection(self: Self) -> Connection:
"""Create new connection.

Expand Down
12 changes: 7 additions & 5 deletions python/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,18 +126,19 @@ async def create_default_data_for_tests(
table_name: str,
number_database_records: int,
) -> AsyncGenerator[None, None]:
await psql_pool.execute(
connection = await psql_pool.connection()
await connection.execute(
f"CREATE TABLE {table_name} (id SERIAL, name VARCHAR(255))",
)

for table_id in range(1, number_database_records + 1):
new_name = random_string()
await psql_pool.execute(
await connection.execute(
querystring=f"INSERT INTO {table_name} VALUES ($1, $2)",
parameters=[table_id, new_name],
)
yield
await psql_pool.execute(
await connection.execute(
f"DROP TABLE {table_name}",
)

Expand All @@ -147,14 +148,15 @@ async def create_table_for_listener_tests(
psql_pool: ConnectionPool,
listener_table_name: str,
) -> AsyncGenerator[None, None]:
await psql_pool.execute(
connection = await psql_pool.connection()
await connection.execute(
f"CREATE TABLE {listener_table_name}"
f"(id SERIAL, payload VARCHAR(255),"
f"channel VARCHAR(255), process_id INT)",
)

yield
await psql_pool.execute(
await connection.execute(
f"DROP TABLE {listener_table_name}",
)

Expand Down
25 changes: 14 additions & 11 deletions python/tests/test_binary_copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ async def test_binary_copy_to_table_in_connection(
) -> None:
"""Test binary copy in connection."""
table_name: typing.Final = "cars"
await psql_pool.execute(f"DROP TABLE IF EXISTS {table_name}")
await psql_pool.execute(
connection = await psql_pool.connection()
await connection.execute(f"DROP TABLE IF EXISTS {table_name}")
await connection.execute(
"""
CREATE TABLE IF NOT EXISTS cars (
model VARCHAR,
Expand Down Expand Up @@ -46,17 +47,16 @@ async def test_binary_copy_to_table_in_connection(
buf.write(encoder.finish())
buf.seek(0)

async with psql_pool.acquire() as connection:
inserted_rows = await connection.binary_copy_to_table(
source=buf,
table_name=table_name,
)
inserted_rows = await connection.binary_copy_to_table(
source=buf,
table_name=table_name,
)

expected_inserted_row: typing.Final = 32

assert inserted_rows == expected_inserted_row

real_table_rows: typing.Final = await psql_pool.execute(
real_table_rows: typing.Final = await connection.execute(
f"SELECT COUNT(*) AS rows_count FROM {table_name}",
)
assert real_table_rows.result()[0]["rows_count"] == expected_inserted_row
Expand All @@ -67,8 +67,10 @@ async def test_binary_copy_to_table_in_transaction(
) -> None:
"""Test binary copy in transaction."""
table_name: typing.Final = "cars"
await psql_pool.execute(f"DROP TABLE IF EXISTS {table_name}")
await psql_pool.execute(

connection = await psql_pool.connection()
await connection.execute(f"DROP TABLE IF EXISTS {table_name}")
await connection.execute(
"""
CREATE TABLE IF NOT EXISTS cars (
model VARCHAR,
Expand Down Expand Up @@ -108,7 +110,8 @@ async def test_binary_copy_to_table_in_transaction(

assert inserted_rows == expected_inserted_row

real_table_rows: typing.Final = await psql_pool.execute(
connection = await psql_pool.connection()
real_table_rows: typing.Final = await connection.execute(
f"SELECT COUNT(*) AS rows_count FROM {table_name}",
)
assert real_table_rows.result()[0]["rows_count"] == expected_inserted_row
5 changes: 3 additions & 2 deletions python/tests/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,9 @@ async def test_closed_connection_error(

async def test_execute_batch_method(psql_pool: ConnectionPool) -> None:
"""Test `execute_batch` method."""
await psql_pool.execute(querystring="DROP TABLE IF EXISTS execute_batch")
await psql_pool.execute(querystring="DROP TABLE IF EXISTS execute_batch2")
connection = await psql_pool.connection()
await connection.execute(querystring="DROP TABLE IF EXISTS execute_batch")
await connection.execute(querystring="DROP TABLE IF EXISTS execute_batch2")
query = "CREATE TABLE execute_batch (name VARCHAR);CREATE TABLE execute_batch2 (name VARCHAR);"
async with psql_pool.acquire() as conn:
await conn.execute_batch(querystring=query)
Expand Down
Loading
Loading