Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
381939c
Merge pull request #1 from nils-braun/main
rajagurunath Sep 22, 2020
3e106e3
Merge branch 'nils-braun:main' into main
rajagurunath May 22, 2021
4a9ee68
ML model improvement : Adding "SHOW MODELS and DESCRIBE MODEL"
May 23, 2021
f640ccc
fix typo
rajagurunath May 24, 2021
ae76edd
ML model improvement : added EXPORT MODEL
rajagurunath May 26, 2021
a484136
ML model improvement : refactoring for PR
rajagurunath May 28, 2021
6e1709e
ML model improvement : Adding stmts in notebook
rajagurunath May 28, 2021
d15657d
ML model improvement : Adding stmts in notebook
rajagurunath May 28, 2021
4a3a9ee
ML model improvement : also test the non-happy path
nils-braun May 28, 2021
c722865
ML model improvement : Added mlflow and <With> in sql for extra params
rajagurunath May 30, 2021
d0feef3
ML model improvement : Added mlflow and <With> in sql for extra params
rajagurunath May 30, 2021
3a972da
Merge branch 'feature/export-ml-models' into feature/export-model
rajagurunath May 30, 2021
e31956e
Added Test cases for Export MODEL
rajagurunath Jun 10, 2021
17a1514
Added ML documentation about the following:
rajagurunath Jun 10, 2021
fbf93cb
Merge pull request #3 from rajagurunath/feature/export-model
rajagurunath Jun 10, 2021
eb40f04
FETCH upstream changes
rajagurunath Jun 11, 2021
de7c993
Merge branch 'nils-braun-main' into main
rajagurunath Jun 11, 2021
8886d10
FETCH upstream changes for export model
rajagurunath Jun 11, 2021
e6f1e01
refactored based on PR
rajagurunath Jun 13, 2021
75888ce
Merge pull request #6 from nils-braun/main
rajagurunath Jun 15, 2021
2ef98ea
Added support only for sklearn compatible models
rajagurunath Jun 15, 2021
e173e24
Merge branch 'main' of https://github.com/rajagurunath/dask-sql into …
rajagurunath Jun 15, 2021
d3f03e1
excluded mlflow part from code coverage
rajagurunath Jun 15, 2021
6b7fa8e
install mlflow in test cluster
rajagurunath Jun 16, 2021
fff8dc4
Added test for non sklearn compatible model
rajagurunath Jun 16, 2021
ddaa0c0
get export model from main repo
rajagurunath Jun 17, 2021
e2bac80
Merge branch 'main' of https://github.com/nils-braun/dask-sql into ni…
rajagurunath Jun 22, 2021
a6512c3
upstream merge
rajagurunath Jun 22, 2021
9028003
Merge branch 'nils-braun:main' into main
rajagurunath Jul 21, 2021
aadf500
Merge branch 'nils-braun:main' into main
rajagurunath Aug 12, 2021
cd5f746
Merge branch 'dask-contrib:main' into main
rajagurunath Aug 14, 2021
2553002
Merge branch 'dask-contrib:main' into main
rajagurunath Aug 20, 2021
faef6da
Merge branch 'dask-contrib:main' into main
rajagurunath Aug 23, 2021
d917d35
Merge branch 'dask-contrib:main' into main
rajagurunath Aug 27, 2021
e94fe7c
improve dask-sql-cli:
rajagurunath Aug 29, 2021
3b8d662
Added
rajagurunath Sep 1, 2021
46f48ec
Merge branch 'dask-contrib:main' into feature/improve_cli
rajagurunath Sep 1, 2021
fceacbb
Merge branch 'dask-contrib:main' into feature/improve_cli
rajagurunath Oct 12, 2021
a73eb8c
Merge branch 'dask-contrib:main' into feature/improve_cli
rajagurunath Oct 20, 2021
87b306c
Added PR suggestions and lint fix
rajagurunath Oct 20, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions continuous_integration/recipe/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ requirements:
- prompt_toolkit >=3.0.8
- pygments >=2.7.3
- nest-asyncio >=1.0.0
- tabulate >=0.8.9
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just noting that we might change this pinning in a follow up once we do our next release - I want to match the recipe on conda-forge, so once this is added to the auto-generated recipe there I'll grab whatever pinning it has there (which may be the same). This should be good for now though 🙂


test:
commands:
Expand Down
133 changes: 125 additions & 8 deletions dask_sql/cmd.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
import logging
import os
import sys
import tempfile
import traceback
from argparse import ArgumentParser
from functools import partial
from typing import Union

import pandas as pd
from dask.datasets import timeseries
from dask.distributed import Client
from dask.distributed import Client, as_completed
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory
from prompt_toolkit.completion import WordCompleter
from prompt_toolkit.history import FileHistory
from prompt_toolkit.shortcuts import ProgressBar
from pygments.lexers.sql import SqlLexer

try:
Expand All @@ -17,6 +25,10 @@

from dask_sql.context import Context

# Tab-completion vocabulary for the interactive prompt: every backslash
# meta command understood by _meta_commands, plus the plain "quit" word.
meta_command_completer = WordCompleter(
    ["\\l", "\\d?", "\\dt", "\\df", "\\de", "\\dm", "\\conninfo", "quit"]
)


class CompatiblePromptSession:
"""
Expand All @@ -32,17 +44,106 @@ class CompatiblePromptSession:
"""

def __init__(self, lexer) -> None:  # pragma: no cover
    """Build a ``prompt`` callable that works across prompt_toolkit versions.

    Args:
        lexer: syntax-highlighting lexer passed through to prompt_toolkit
            (a SqlLexer in this CLI).

    After construction, ``self.prompt`` behaves like
    ``PromptSession.prompt`` regardless of the installed version.
    """
    # Shared prompt configuration: persistent history in a fixed file under
    # the system temp dir (so every dask-sql run reuses the same history),
    # suggestions drawn from that history, and meta-command tab completion.
    kwargs = {
        "lexer": lexer,
        "history": FileHistory(
            os.path.join(tempfile.gettempdir(), "dask-sql-history")
        ),
        "auto_suggest": AutoSuggestFromHistory(),
        "completer": meta_command_completer,
    }
    try:
        # Version >= 2.0.1: we can use the session object
        from prompt_toolkit import PromptSession

        session = PromptSession(**kwargs)
        self.prompt = session.prompt
    except ImportError:
        # Version < 2.0: there is no session object; fall back to the
        # module-level prompt function with the same configuration bound in.
        from prompt_toolkit.shortcuts import prompt

        self.prompt = partial(prompt, **kwargs)


def _display_markdown(content, **kwargs):
    """Print *content* as a fancy-grid table on stdout.

    ``content`` plus any keyword arguments are handed straight to
    ``pandas.DataFrame``; the frame is rendered with
    ``DataFrame.to_markdown`` (which relies on the ``tabulate`` package).
    """
    rendered = pd.DataFrame(content, **kwargs).to_markdown(tablefmt="fancy_grid")
    print(rendered)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great, I love this - looks really good.
Unfortunately, it needs an additional package (tabulate). I see three possibilities:

  • try it, and on failure do a normal print
  • try it, and on failure do a normal print and print a warning
  • add the package to setup.py and make it a dependency.

I would favor 2 or 3 (with a small tendency towards 3), but it's your choice!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added tabulate to setup.py — basically picked the third option.



def _parse_meta_command(sql):
command, _, arg = sql.partition(" ")
return command, arg.strip()


def _meta_commands(sql: str, context: Context, client: Client) -> Union[bool, Client]:
    """Detect and execute a CLI meta command (``\\l``, ``\\dt``, ``quit``, ...).

    Returns ``True`` when *sql* was a meta command (its output has already
    been printed), ``False`` when it looks like plain SQL, or a freshly
    connected ``Client`` when ``\\dsc`` switched Dask clusters.
    """
    command, argument = _parse_meta_command(sql)
    help_rows = [
        ["\\l", "List schemas"],
        ["\\d?, help, ?", "Show available commands"],
        ["\\conninfo", "Show Dask cluster info"],
        ["\\dt [schema]", "List tables"],
        ["\\df [schema]", "List functions"],
        ["\\dm [schema]", "List models"],
        ["\\de [schema]", "List experiments"],
        ["\\dss [schema]", "Switch schema"],
        ["\\dsc [dask scheduler address]", "Switch Dask cluster"],
        ["quit", "Quits dask-sql-cli"],
    ]
    if command == "\\dsc":
        # Switch Dask cluster: the argument is the scheduler address.
        _, scheduler_address = _parse_meta_command(sql)
        client = Client(scheduler_address)
        return client  # pragma: no cover

    # Commands taking an optional schema fall back to the current one.
    schema = argument or context.schema_name

    if command in ("\\d?", "help", "?"):
        _display_markdown(help_rows, columns=["Commands", "Description"])
    elif command == "\\l":
        _display_markdown(context.schema.keys(), columns=["Schemas"])
    elif command == "\\dt":
        _display_markdown(context.schema[schema].tables.keys(), columns=["Tables"])
    elif command == "\\df":
        _display_markdown(
            context.schema[schema].functions.keys(), columns=["Functions"]
        )
    elif command == "\\de":
        _display_markdown(
            context.schema[schema].experiments.keys(), columns=["Experiments"]
        )
    elif command == "\\dm":
        _display_markdown(context.schema[schema].models.keys(), columns=["Models"])
    elif command == "\\conninfo":
        cluster_info = [
            ["Dask scheduler", client.scheduler.__dict__["addr"]],
            ["Dask dashboard", client.dashboard_link],
            ["Cluster status", client.status],
            ["Dask workers", len(client.cluster.workers)],
        ]
        _display_markdown(
            cluster_info, columns=["components", "value"]
        )  # pragma: no cover
    elif command == "\\dss":
        if schema in context.schema:
            context.schema_name = schema
        else:
            print(f"Schema {schema} not available")
    elif command == "quit":
        print("Quitting dask-sql ...")
        client.close()  # for safer side
        sys.exit()
    elif command.startswith("\\"):
        # Anything backslash-prefixed that we did not recognise above.
        print(
            f"The meta command {command} not available, please use commands from below list"
        )
        _display_markdown(help_rows, columns=["Commands", "Description"])
    else:
        # nothing detected probably not a meta command
        return False
    return True


def cmd_loop(
Expand Down Expand Up @@ -103,11 +204,27 @@ def cmd_loop(
if not text:
continue

try:
df = context.sql(text, return_futures=False)
print(df)
except Exception:
traceback.print_exc()
meta_command_detected = _meta_commands(text, context=context, client=client)
if isinstance(meta_command_detected, Client):
client = meta_command_detected

if not meta_command_detected:
try:
df = context.sql(text, return_futures=True)
if df is not None: # some sql commands returns None
df = df.persist()
# Now turn it into a list of futures
futures = client.futures_of(df)
with ProgressBar() as pb:
for _ in pb(
as_completed(futures), total=len(futures), label="Executing"
):
continue
df = df.compute()
print(df.to_markdown(tablefmt="fancy_grid"))

except Exception:
traceback.print_exc()


def main(): # pragma: no cover
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ def run(self):
"tzlocal>=2.1",
"prompt_toolkit",
"pygments",
"tabulate",
"nest-asyncio",
# backport for python versions without importlib.metadata
"importlib_metadata; python_version < '3.8.0'",
Expand Down
137 changes: 137 additions & 0 deletions tests/integration/test_cmd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import pytest
from mock import MagicMock, patch
from prompt_toolkit.application import create_app_session
from prompt_toolkit.input import create_pipe_input
from prompt_toolkit.output import DummyOutput
from prompt_toolkit.shortcuts import PromptSession

from dask_sql.cmd import _meta_commands


@pytest.fixture(autouse=True, scope="function")
def mock_prompt_input():
    """Route prompt_toolkit through a pipe input and a dummy output.

    Applied automatically to every test in this module so CLI code never
    touches a real terminal; yields the pipe so tests can feed keystrokes.
    """
    pipe_input = create_pipe_input()
    try:
        with create_app_session(input=pipe_input, output=DummyOutput()):
            yield pipe_input
    finally:
        # Always release the pipe, even if the test body raised.
        pipe_input.close()


def _feed_cli_with_input(
    text,
    editing_mode=None,
    clipboard=None,
    history=None,
    multiline=False,
    check_line_ending=True,
    key_bindings=None,
):
    """
    Create a PromptSession, feed it the given user input, and run it.

    Returns a ``(Document, Application)`` tuple: the final content of the
    session's default buffer and the prompt application that processed the
    input.  (The docstring previously claimed a "(result, Application)"
    tuple, but the prompt's return string was never used.)

    Raises:
        AssertionError: if ``check_line_ending`` is set and *text* does not
            end with a carriage return — without it the prompt never
            finishes.
    """
    # If the given text doesn't end with a newline, the interface won't finish.
    if check_line_ending:
        assert text.endswith("\r")

    inp = create_pipe_input()

    try:
        inp.send_text(text)
        session = PromptSession(
            input=inp,
            output=DummyOutput(),
            editing_mode=editing_mode,
            history=history,
            multiline=multiline,
            clipboard=clipboard,
            key_bindings=key_bindings,
        )

        # Run the prompt to completion; only the resulting session state is
        # needed, so the returned string is deliberately discarded (the
        # original bound it to an unused local).
        session.prompt()
        return session.default_buffer.document, session.app

    finally:
        inp.close()


def test_meta_commands(c, client, capsys):
    """Exercise every meta command against the ``c`` context fixture."""
    # "?", "help" and "\d?" are synonyms: all print the command table.
    _meta_commands("?", context=c, client=client)
    captured = capsys.readouterr()
    assert "Commands" in captured.out

    _meta_commands("help", context=c, client=client)
    captured = capsys.readouterr()
    assert "Commands" in captured.out

    _meta_commands("\\d?", context=c, client=client)
    captured = capsys.readouterr()
    assert "Commands" in captured.out

    _meta_commands("\\l", context=c, client=client)
    captured = capsys.readouterr()
    assert "Schemas" in captured.out

    _meta_commands("\\dt", context=c, client=client)
    captured = capsys.readouterr()
    assert "Tables" in captured.out

    _meta_commands("\\dm", context=c, client=client)
    captured = capsys.readouterr()
    assert "Models" in captured.out

    _meta_commands("\\df", context=c, client=client)
    captured = capsys.readouterr()
    assert "Functions" in captured.out

    _meta_commands("\\de", context=c, client=client)
    captured = capsys.readouterr()
    assert "Experiments" in captured.out

    # \dss switches to an existing schema ...
    c.create_schema("test_schema")
    _meta_commands("\\dss test_schema", context=c, client=client)
    assert c.schema_name == "test_schema"

    # ... and only warns for an unknown one.
    _meta_commands("\\dss not_exists", context=c, client=client)
    captured = capsys.readouterr()
    assert "Schema not_exists not available\n" == captured.out

    # NOTE(review): this match string hard-codes the connect timeout ("5 s"),
    # so the test is sensitive to distributed's default timeout — confirm.
    with pytest.raises(
        OSError,
        match="Timed out during handshake while "
        "connecting to tcp://localhost:8787 after 5 s",
    ):
        client = _meta_commands("\\dsc localhost:8787", context=c, client=client)
        # NOTE(review): unreachable — the line above raises, so control never
        # reaches this assert inside the pytest.raises block.
        assert client.scheduler.__dict__["addr"] == "localhost:8787"


def test_connection_info(c, client, capsys):
    """``\\conninfo`` should report the (mocked) scheduler address."""
    dummy_client = MagicMock()
    dummy_client.scheduler.__dict__["addr"] = "somewhereonearth:8787"
    # Fix: _meta_commands reads ``client.cluster.workers`` — the original
    # set the unused ``cluster.worker`` (typo), so the worker count came
    # from an auto-created MagicMock (len() == 0) instead of this list.
    dummy_client.cluster.workers = ["worker1", "worker2"]

    _meta_commands("\\conninfo", context=c, client=dummy_client)
    captured = capsys.readouterr()
    assert "somewhereonearth" in captured.out


def test_quit(c, client, capsys):
    """``quit`` must print its farewell message (with sys.exit patched out)."""
    with patch("sys.exit", return_value=lambda: "exit"):
        _meta_commands("quit", context=c, client=client)
        printed = capsys.readouterr()
        assert printed.out == "Quitting dask-sql ...\n"


def test_non_meta_commands(c, client, capsys):
    """Unknown backslash commands warn; plain SQL is not a meta command."""
    _meta_commands("\\x", context=c, client=client)
    printed = capsys.readouterr()
    expected = "The meta command \\x not available, please use commands from below list"
    assert expected in printed.out

    outcome = _meta_commands("Select 42 as answer", context=c, client=client)
    printed = capsys.readouterr()
    assert outcome is False