Skip to content

Commit e9c8de0

Browse files
committed
vdk-core: add new managed db_connection_execute_operation hook
Currently we cannot track the full execution of a query (before and after it) . This is necessary in order to be able to generate proper lineage events (query start and end) or to track its duration (we had to currently had to add this to vdk-core (#804) but that's really better implemented as plugin (think aspect-oriented programming). In the future we should consider adding hook for fetch (fetchMany, fetchAll) as well as it's there where the data is returned (plugins can take stats - count number of rows, validate sensitive columns, do result validation tests, etc.) Testing Done: unit tests, functional test. Signed-off-by: Antoni Ivanov <[email protected]>
1 parent 13d334f commit e9c8de0

File tree

18 files changed

+380
-44
lines changed

18 files changed

+380
-44
lines changed

projects/vdk-core/src/vdk/api/plugin/connection_hook_spec.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@
55

66
from vdk.api.plugin.hook_markers import hookspec
77
from vdk.internal.builtin_plugins.connection.decoration_cursor import DecorationCursor
8+
from vdk.internal.builtin_plugins.connection.execution_cursor import (
9+
ExecuteOperationResult,
10+
)
11+
from vdk.internal.builtin_plugins.connection.execution_cursor import ExecutionCursor
812
from vdk.internal.builtin_plugins.connection.recovery_cursor import RecoveryCursor
913

1014

@@ -68,6 +72,68 @@ def db_connection_decorate_operation(
6872
"""
6973
pass
7074

75+
@hookspec(firstresult=True)
76+
def db_connection_execute_operation(
77+
self, execution_cursor: ExecutionCursor
78+
) -> Optional[ExecuteOperationResult]:
79+
"""
80+
The method that executes the actual SQL query using execution cursor.
81+
82+
When the hook is called the call loop will only invoke up to
83+
the first hookimpl which returns a result other then None.
84+
It is then taken as result of the overall hook call.
85+
86+
You can see the default implementation at DefaultConnectionHookImpl.
87+
88+
If you want to override the default implementation return a non-None result using tryfirst=True or no decorator.
89+
But in most cases you would not need to overwrite the default implementation (hence return None).
90+
If some hookimpl function raises an exception, the behaviour is as outlined in hook_markers module
91+
92+
Often it may be needed to wrap around it so that you can track the execution of the query.
93+
94+
For example:
95+
96+
.. code-block::
97+
98+
@hookimpl(hookwrapper=True)
99+
db_connection_execute_operation(execution_cursor: ExecutionCursor) -> Optional[int]:
100+
# let's track duration of the query
101+
start = time.time()
102+
log.info(f"Starting query: {execution_cursor.get_managed_operation().get_operation()}")
103+
outcome: pluggy.callers._Result
104+
outcome = yield # we yield the execution to other implementations (including default one)
105+
is_success: bool = outcome.excinfo is None
106+
log.info(f"Query finished. duration: {time.time() - start}. is_success: {is_success}")
107+
# no return!
108+
109+
Another example - we want to change the result
110+
111+
.. code-block::
112+
113+
@hookimpl(hookwrapper=True)
114+
db_connection_execute_operation(execution_cursor: ExecutionCursor) -> Optional[int]:
115+
outcome: pluggy.callers._Result
116+
outcome = yield #
117+
outcome.force_result(new_result) # set new return result
118+
119+
Another example - let's say we are writing vdk-impala plugin and want to print more debug info
120+
which is available from the Impala native cursor (provided by impyla library)
121+
122+
.. code-block::
123+
124+
@hookimpl(hookwrapper=True)
125+
db_connection_execute_operation(execution_cursor: ExecutionCursor) -> Optional[int]:
126+
yield # let the query execute first
127+
c = cast(impala.interface.Cursor, execution_cursor)
128+
log.info(f"Query {execution_cursor.get_managed_operation().get_operation()} debug info:"
129+
f"summary: {c.get_summary()}, profile: {c.get_profile()}")
130+
131+
132+
:param execution_cursor: ExecutionCursor
133+
A PEP249Cursor implementation purposed for actual query execution.
134+
"""
135+
pass
136+
71137
@hookspec
72138
def db_connection_recover_operation(self, recovery_cursor: RecoveryCursor) -> None:
73139
"""

projects/vdk-core/src/vdk/api/plugin/hook_markers.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@
2727
The method name must match one of those defined as hookspec
2828
The method signature - the arguments need to be subset of arguments defined in hookspec
2929
30+
If any hookimpl errors with an exception no further callbacks are invoked
31+
and the exception is packaged up and delivered to any wrappers
32+
before being re-raised at the hook invocation point.
33+
3034
Plugin execution order can be configured in the decorator with following variables:
3135
3236
If ``optionalhook`` is ``True`` a missing matching hook specification will not result
@@ -55,5 +59,7 @@ def func():
5559
after_other_hooks_executed(outcome.get_result())
5660
outcome.force_result(new_res) # set new return result
5761
62+
For more details see https://pluggy.readthedocs.io
63+
5864
"""
5965
hookimpl = pluggy.HookimplMarker(GROUP_NAME)
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
# Copyright 2021 VMware, Inc.
2+
# SPDX-License-Identifier: Apache-2.0
3+
from typing import Any
4+
from typing import cast
5+
6+
from vdk.api.plugin.connection_hook_spec import ConnectionHookSpec
7+
from vdk.api.plugin.hook_markers import hookimpl
8+
from vdk.api.plugin.plugin_registry import IPluginRegistry
9+
from vdk.internal.builtin_plugins.connection.execution_cursor import (
10+
ExecuteOperationResult,
11+
)
12+
from vdk.internal.builtin_plugins.connection.execution_cursor import ExecutionCursor
13+
from vdk.internal.core.errors import ErrorMessage
14+
from vdk.internal.core.errors import PlatformServiceError
15+
16+
17+
class DefaultConnectionHookImpl:
18+
"""
19+
The default implementation of execute operation.
20+
Generally it should not be overridden.
21+
To "override" with new implementation, make a new hook which returns non-None ExecuteOperationResult.
22+
See ConnectionHookSpec documentation for more details.
23+
"""
24+
25+
@hookimpl(trylast=True)
26+
def db_connection_execute_operation(self, execution_cursor: ExecutionCursor) -> Any:
27+
managed_operation = execution_cursor.get_managed_operation()
28+
native_cursor = execution_cursor.get_native_cursor()
29+
if managed_operation.get_parameters():
30+
native_result = native_cursor.execute(
31+
managed_operation.get_operation(), managed_operation.get_parameters()
32+
)
33+
else:
34+
native_result = native_cursor.execute(managed_operation.get_operation())
35+
return ExecuteOperationResult(native_result)
36+
37+
38+
class ConnectionHookSpecFactory:
39+
"""
40+
Class used to create properly initialized ConnectionHookSpec instance to use to execute the underlying hooks
41+
"""
42+
43+
def __init__(self, plugin_registry: IPluginRegistry):
44+
self.__plugin_registry = plugin_registry
45+
46+
def get_connection_hook_spec(self) -> ConnectionHookSpec:
47+
"""
48+
Returns ConnectionHookSpec class which would act as a relay and invoke the underlying implemented hooks
49+
It's generally a ConnectionHookSpec cast of plugin_registry.PluginHookRelay to enable easier hook invocations.
50+
It also initializes some of the hooks (now only db_connection_execute_operation) with default implementations.
51+
:return: ConnectionHookSpec
52+
"""
53+
if self.__plugin_registry:
54+
if not self.__plugin_registry.has_plugin(
55+
DefaultConnectionHookImpl.__name__
56+
):
57+
self.__plugin_registry.load_plugin_with_hooks_impl(
58+
DefaultConnectionHookImpl(), DefaultConnectionHookImpl.__name__
59+
)
60+
return cast(ConnectionHookSpec, self.__plugin_registry.hook())
61+
else:
62+
raise PlatformServiceError(
63+
ErrorMessage(
64+
"Managed Cursor not initialized properly",
65+
"Cannot connect to database using vdk managed cursor",
66+
"Plugin registry is not initialized. That seems like a bug.",
67+
"Without plugin registry the connection cannot be started",
68+
"Open a vdk github issue "
69+
"and/or revert to previous version of vdk-core.",
70+
)
71+
)
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
# Copyright 2021 VMware, Inc.
2+
# SPDX-License-Identifier: Apache-2.0
3+
import logging
4+
from typing import Any
5+
6+
from vdk.internal.builtin_plugins.connection.decoration_cursor import ManagedOperation
7+
from vdk.internal.builtin_plugins.connection.pep249.interfaces import PEP249Cursor
8+
9+
10+
class ExecuteOperationResult:
11+
def __init__(self, native_result: Any):
12+
self.__native_result = native_result
13+
14+
def get_native_result(self) -> Any:
15+
"""
16+
Returns the result of PEP249Cursor.execute invocation using the native cursor.
17+
Since this is vendor specific the result type is any - but usually it's number of selected or updated rows.
18+
:return: Any
19+
"""
20+
return self.__native_result
21+
22+
23+
class ExecutionCursor(PEP249Cursor):
24+
"""
25+
Extends PEP249Cursor to provide:
26+
* ability to directly access and execute operations with the native cursor.
27+
Generally it should be used only if default implementation does not work (which should be almost never)
28+
or more likely - to use some vendor specific features.
29+
30+
See connection_hook_spec#db_connection_execute_operation for more details and examples how to use it.
31+
"""
32+
33+
def __init__(
34+
self,
35+
native_cursor: PEP249Cursor,
36+
managed_operation: ManagedOperation,
37+
log=logging.getLogger(__name__),
38+
):
39+
super().__init__(native_cursor, log)
40+
self.__managed_operation = managed_operation
41+
self.__native_cursor = native_cursor
42+
43+
def get_native_cursor(self) -> PEP249Cursor:
44+
"""
45+
Get the underlying native cursor. Used to actually execute the operation.
46+
Generally should not be accessed directly.
47+
But it's useful to be exposed in case the native library used provide some extra features.
48+
For example Impala (impyla driver) provides a way to get the profile of a query after it's executed
49+
which can be printed to aid debugging.
50+
51+
Check the example usages in docstring of connection_hook_spec#db_connection_execute_operation .
52+
53+
:return: PEP249Cursor
54+
the underlying native cursor being managed.
55+
"""
56+
return self.__native_cursor
57+
58+
def get_managed_operation(self) -> ManagedOperation:
59+
"""
60+
Retrieve an object that contains information about the query and query parameters used in
61+
the database operation. The retrieved Data Transfer Object (DTO) is purposed
62+
to curate the query and parameters.
63+
64+
:return: ManagedOperation
65+
Query and parameters DTO
66+
"""
67+
return self.__managed_operation

projects/vdk-core/src/vdk/internal/builtin_plugins/connection/impl/router.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@
55
from typing import Dict
66
from typing import Union
77

8-
from vdk.api.plugin.connection_hook_spec import (
9-
ConnectionHookSpec,
10-
)
118
from vdk.api.plugin.plugin_input import IManagedConnectionRegistry
129
from vdk.internal.builtin_plugins.config.vdk_config import DB_DEFAULT_TYPE
10+
from vdk.internal.builtin_plugins.connection.connection_hooks import (
11+
ConnectionHookSpecFactory,
12+
)
1313
from vdk.internal.builtin_plugins.connection.impl.wrapped_connection import (
1414
WrappedConnection,
1515
)
@@ -29,9 +29,13 @@ class ManagedConnectionRouter(IManagedConnectionRegistry):
2929
In both cases dbtype must match the string in which the plugin register itself with.
3030
"""
3131

32-
def __init__(self, cfg: Configuration, connection_hook_spec: ConnectionHookSpec):
32+
def __init__(
33+
self,
34+
cfg: Configuration,
35+
connection_hook_spec_factory: ConnectionHookSpecFactory,
36+
):
3337
self._cfg: Configuration = cfg
34-
self._connection_hook_spec = connection_hook_spec
38+
self._connection_hook_spec_factory = connection_hook_spec_factory
3539
self._log: logging.Logger = logging.getLogger(__name__)
3640
self._connections: Dict[str, ManagedConnectionBase] = dict()
3741
self._connection_builders: Dict[
@@ -94,8 +98,8 @@ def __create_connection(self, dbtype):
9498
conn = self._connection_builders[dbtype]()
9599
if isinstance(conn, ManagedConnectionBase):
96100
self._connections[dbtype] = conn
97-
if not conn._connection_hook_spec:
98-
conn._connection_hook_spec = self._connection_hook_spec
101+
if not conn._connection_hook_spec_factory:
102+
conn._connection_hook_spec_factory = self._connection_hook_spec_factory
99103
elif conn is None:
100104
errors.log_and_throw(
101105
to_be_fixed_by=errors.ResolvableBy.CONFIG_ERROR,
@@ -111,6 +115,8 @@ def __create_connection(self, dbtype):
111115
log = logging.getLogger(conn.__class__.__name__)
112116
conn.close() # we will let ManagedConnection to open it when needed.
113117
self._connections[dbtype] = WrappedConnection(
114-
log, self._connection_builders[dbtype], self._connection_hook_spec
118+
log,
119+
self._connection_builders[dbtype],
120+
self._connection_hook_spec_factory,
115121
)
116122
return self._connections[dbtype]

projects/vdk-core/src/vdk/internal/builtin_plugins/connection/impl/wrapped_connection.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
from typing import Any
55
from typing import Callable
66

7-
from vdk.api.plugin.connection_hook_spec import (
8-
ConnectionHookSpec,
7+
from vdk.internal.builtin_plugins.connection.connection_hooks import (
8+
ConnectionHookSpecFactory,
99
)
1010
from vdk.internal.builtin_plugins.connection.managed_connection_base import (
1111
ManagedConnectionBase,
@@ -22,17 +22,17 @@ def __init__(
2222
self,
2323
log: logging.Logger,
2424
new_connection_builder_function: Callable[[], PEP249Connection],
25-
connection_hook_spec: ConnectionHookSpec,
25+
connection_hook_spec_factory: ConnectionHookSpecFactory,
2626
) -> None:
2727
"""
2828
:param new_connection_builder_function: method that returns a new (e.g. SAP Hana) connection
2929
Example
3030
def connection() -> ManagedConnectionBase:
3131
db = pyhdb.connect(host='hana-prod-d1.northpole.com', port=30015, user='claus', password='hohoho')
3232
return db
33-
:param connection_hook_spec: connection hook implementations from plugins
33+
:param connection_hook_spec_factory: ConnectionHookSpecFactory
3434
"""
35-
super().__init__(log, None, connection_hook_spec)
35+
super().__init__(log, None, connection_hook_spec_factory)
3636
self._log = logging.getLogger(__name__)
3737
self._new_connection_builder_function = new_connection_builder_function
3838

projects/vdk-core/src/vdk/internal/builtin_plugins/connection/managed_connection_base.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
from tenacity import stop_after_attempt
1717
from tenacity import wait_exponential
1818
from vdk.api.job_input import IManagedConnection
19-
from vdk.api.plugin.connection_hook_spec import (
20-
ConnectionHookSpec,
19+
from vdk.internal.builtin_plugins.connection.connection_hooks import (
20+
ConnectionHookSpecFactory,
2121
)
2222
from vdk.internal.builtin_plugins.connection.managed_cursor import ManagedCursor
2323
from vdk.internal.builtin_plugins.connection.pep249.interfaces import PEP249Connection
@@ -40,7 +40,7 @@ def __init__(
4040
self,
4141
log: logging.Logger = logger,
4242
db_con: Optional[PEP249Connection] = None,
43-
connection_hook_spec: ConnectionHookSpec = None,
43+
connection_hook_spec_factory: ConnectionHookSpecFactory = None,
4444
):
4545
"""
4646
this constructor MUST be called by inheritors
@@ -51,7 +51,7 @@ def __init__(
5151
self._log = logging.getLogger(__name__)
5252
self._is_db_con_open: bool = db_con is not None
5353
self._db_con: Optional[PEP249Connection] = db_con
54-
self._connection_hook_spec: ConnectionHookSpec = connection_hook_spec
54+
self._connection_hook_spec_factory = connection_hook_spec_factory
5555

5656
def __getattr__(self, attr):
5757
"""
@@ -96,13 +96,14 @@ def method(*args, **kwargs):
9696
)
9797
def connect(self) -> PEP249Connection:
9898
"""
99-
:return: PEP249 Connection object (unmanaged)
99+
:return: PEP249 Connection object (managed)
100100
"""
101101
if not self._is_db_con_open:
102102
db_con = self._connect()
103103
self._log.debug(f"Established {str(db_con)}")
104104
self._is_db_con_open = True
105105
self._db_con = db_con
106+
106107
return self
107108

108109
# def get_managed_connection(self) -> PEP249Connection:
@@ -160,7 +161,7 @@ def cursor(self, *args, **kwargs):
160161
return ManagedCursor(
161162
self._db_con.cursor(*args, **kwargs),
162163
self._log,
163-
self._connection_hook_spec,
164+
self._connection_hook_spec_factory,
164165
)
165166
return super().cursor()
166167

0 commit comments

Comments
 (0)