12 changes: 12 additions & 0 deletions dask_sql/server/app.py
@@ -10,6 +10,7 @@
from uvicorn import Config, Server

from dask_sql.context import Context
from dask_sql.server.presto_jdbc import create_meta_data
from dask_sql.server.responses import DataResults, ErrorResults, QueryResults

app = FastAPI()
@@ -74,6 +75,12 @@ async def query(request: Request):
"""
try:
sql = (await request.body()).decode().strip()
# required for PrestoDB JDBC driver compatibility:
# replaces queries against the unsupported `system` catalog with queries against the
# `system_jdbc` schema created by `create_meta_data(context)` when `jdbc_metadata=True`
# TODO: explore Trino, which should make JDBC compatibility easier but requires
# changing response headers (see https://github.com/dask-contrib/dask-sql/pull/351)
sql = sql.replace("system.jdbc", "system_jdbc")
df = request.app.c.sql(sql)

if df is None:
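For illustration: with this rewrite, a driver-issued query such as SELECT * FROM system.jdbc.tables executes as SELECT * FROM system_jdbc.tables. Since this is a plain string replacement, it assumes the literal text system.jdbc only ever appears as the metadata-catalog reference.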
@@ -102,6 +109,7 @@ def run_server(
startup=False,
log_level=None,
blocking: bool = True,
jdbc_metadata: bool = False,
): # pragma: no cover
"""
Run an HTTP server for answering SQL queries using ``dask-sql``.
@@ -128,6 +136,8 @@
log_level: (:obj:`str`): The log level of the server and dask-sql
blocking: (:obj:`bool`): If running in an environment with an event loop (e.g. a jupyter notebook),
do not block. The server can be stopped with `context.stop_server()` afterwards.
jdbc_metadata: (:obj:`bool`): If enabled, create JDBC metadata tables from the schemas and
tables in the current dask-sql context

Example:
It is possible to run an SQL server by using the CLI script ``dask-sql-server``
@@ -179,6 +189,8 @@

"""
_init_app(app, context=context, client=client)
if jdbc_metadata:
create_meta_data(context)

if startup:
app.c.sql("SELECT 1 + 1").compute()
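A minimal usage sketch of the new flag (the `sales` table and its data are illustrative assumptions, not part of this PR):

import pandas as pd

from dask_sql import Context
from dask_sql.server.app import run_server

c = Context()
# a hypothetical table; system_jdbc.{schemas,tables,columns} will describe it
c.create_table("sales", pd.DataFrame({"id": [1, 2], "amount": [9.5, 3.2]}))

# with jdbc_metadata=True, create_meta_data(c) runs before the server starts,
# so a PrestoDB JDBC client such as DBeaver can browse schemas, tables and columns
run_server(context=c, jdbc_metadata=True)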
149 changes: 149 additions & 0 deletions dask_sql/server/presto_jdbc.py
@@ -0,0 +1,149 @@
import logging

import pandas as pd

from dask_sql.context import Context

logger = logging.getLogger(__name__)


def create_meta_data(c: Context):
"""
Creates the schema, table, and column metadata for the PrestoDB JDBC driver so that data can
be viewed in a database tool like DBeaver. It doesn't create a catalog entry, although JDBC
expects one, because dask-sql doesn't support catalogs. For both catalogs and procedures,
empty placeholder tables are created.

The metadata appears in a separate schema called system_jdbc, largely because the JDBC driver
tries to access system.jdbc, and the name is sufficiently distinct that it shouldn't clash
with other schemas.

A rewrite is required in the /v1/statement endpoint to change system.jdbc to system_jdbc and
to ignore ORDER BY statements from the driver (see the query rewriting in app.py).

:param c: Context containing created tables
:return:
"""

if c is None:
logger.warning("Context is None: JDBC metadata not created")
return
catalog = ""
system_schema = "system_jdbc"
c.create_schema(system_schema)

# TODO: add support for catalogs in presto interface
# see https://github.com/dask-contrib/dask-sql/pull/351
# if catalog and len(catalog.strip()) > 0:
# catalogs = pd.DataFrame([create_catalog_row(catalog)])
# c.create_table("catalogs", catalogs, schema_name=system_schema)

schemas = pd.DataFrame([create_schema_row()])
c.create_table("schemas", schemas, schema_name=system_schema)
schema_rows = []

tables = pd.DataFrame([create_table_row()])
c.create_table("tables", tables, schema_name=system_schema)
table_rows = []

columns = pd.DataFrame([create_column_row()])
c.create_table("columns", columns, schema_name=system_schema)
column_rows = []

for schema_name, schema in c.schema.items():
schema_rows.append(create_schema_row(catalog, schema_name))
for table_name, dc in schema.tables.items():
df = dc.df
logger.info(f"schema ${schema_name}, table {table_name}, {df}")
table_rows.append(create_table_row(catalog, schema_name, table_name))
for pos, column in enumerate(df.columns, start=1):
logger.debug(f"column {column}")
dtype = "VARCHAR"
if df[column].dtype == "int64" or df[column].dtype == "int":
dtype = "INTEGER"
elif df[column].dtype == "float64" or df[column].dtype == "float":
dtype = "FLOAT"
elif (
df[column].dtype == "datetime"
or df[column].dtype == "datetime64[ns]"
):
dtype = "TIMESTAMP"
column_rows.append(
create_column_row(
catalog,
schema_name,
table_name,
dtype,
df[column].name,
str(pos),
)
)

schemas = pd.DataFrame(schema_rows)
c.create_table("schemas", schemas, schema_name=system_schema)
tables = pd.DataFrame(table_rows)
c.create_table("tables", tables, schema_name=system_schema)
columns = pd.DataFrame(column_rows)
c.create_table("columns", columns, schema_name=system_schema)

logger.info(f"jdbc meta data ready for {len(table_rows)} tables")


def create_catalog_row(catalog: str = ""):
return {"TABLE_CAT": catalog}


def create_schema_row(catalog: str = "", schema: str = ""):
return {"TABLE_CATALOG": catalog, "TABLE_SCHEM": schema}


def create_table_row(catalog: str = "", schema: str = "", table: str = ""):
return {
"TABLE_CAT": catalog,
"TABLE_SCHEM": schema,
"TABLE_NAME": table,
"TABLE_TYPE": "",
"REMARKS": "",
"TYPE_CAT": "",
"TYPE_SCHEM": "",
"TYPE_NAME": "",
"SELF_REFERENCING_COL_NAME": "",
"REF_GENERATION": "",
}


def create_column_row(
catalog: str = "",
schema: str = "",
table: str = "",
dtype: str = "",
column: str = "",
pos: str = "",
):
return {
"TABLE_CAT": catalog,
"TABLE_SCHEM": schema,
"TABLE_NAME": table,
"COLUMN_NAME": column,
"DATA_TYPE": dtype,
"TYPE_NAME": dtype,
"COLUMN_SIZE": "",
"BUFFER_LENGTH": "",
"DECIMAL_DIGITS": "",
"NUM_PREC_RADIX": "",
"NULLABLE": "",
"REMARKS": "",
"COLUMN_DEF": "",
"SQL_DATA_TYPE": dtype,
"SQL_DATETIME_SUB": "",
"CHAR_OCTET_LENGTH": "",
"ORDINAL_POSITION": pos,
"IS_NULLABLE": "",
"SCOPE_CATALOG": "",
"SCOPE_SCHEMA": "",
"SCOPE_TABLE": "",
"SOURCE_DATA_TYPE": "",
"IS_AUTOINCREMENT": "",
"IS_GENERATEDCOLUMN": "",
}
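A hedged sketch of exercising create_meta_data directly, without starting the server (the `orders` table is an assumption, and the schema-qualified SELECT relies on dask-sql's multi-schema support):

import pandas as pd

from dask_sql.context import Context
from dask_sql.server.presto_jdbc import create_meta_data

c = Context()
c.create_table("orders", pd.DataFrame({"id": [1], "total": [10.0]}))
create_meta_data(c)

# the generated metadata lives in ordinary dask-sql tables in the system_jdbc schema
print(c.sql("SELECT * FROM system_jdbc.columns").compute())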