2 changes: 1 addition & 1 deletion docs/pages/cmd.rst
@@ -21,7 +21,7 @@ or by running these lines of code
cmd_loop()

Some options can be set, e.g. to preload some testdata.
Have a look into :func:`dask_sql.cmd_loop` or call
Have a look into :func:`~dask_sql.cmd_loop` or call

.. code-block:: bash

4 changes: 2 additions & 2 deletions docs/pages/custom.rst
@@ -11,7 +11,7 @@ Scalar Functions
----------------

A scalar function (such as :math:`x \to x^2`) turns a given column into another column of the same length.
It can be registered for usage in SQL with the :func:`dask_sql.Context.register_function` method.
It can be registered for usage in SQL with the :func:`~dask_sql.Context.register_function` method.

Example:
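A minimal sketch of what such a registration could look like (the context ``c``, the table ``my_table`` and the types are illustrative and not taken from the original example; the ``register_function`` signature is assumed):

.. code-block:: python

import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask_sql import Context

c = Context()
c.create_table("my_table", dd.from_pandas(pd.DataFrame({"x": [1, 2, 3]}), npartitions=1))

def f(x):
    return x ** 2

# make f callable as "F" in SQL; parameter and return types are declared
# explicitly so that the SQL layer knows the column types
c.register_function(f, "f", [("x", np.int64)], np.int64)

result = c.sql("SELECT F(x) FROM my_table")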

@@ -38,7 +38,7 @@ Aggregation Functions

Aggregation functions run on a single column and turn it into a single value.
This means they can only be used in ``GROUP BY`` aggregations.
They can be registered with the :func:`dask_sql.Context.register_aggregation` method.
They can be registered with the :func:`~dask_sql.Context.register_aggregation` method.
This time however, an instance of a :class:`dask.dataframe.Aggregation` needs to be passed
instead of a plain function.
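A minimal sketch of what this could look like (assuming a :class:`~dask_sql.Context` named ``c`` with a registered table ``my_table`` containing columns ``my_group`` and ``x``; the names and types are illustrative):

.. code-block:: python

import numpy as np
import dask.dataframe as dd

# a dask Aggregation bundles the per-partition step and the final
# aggregation step of a custom aggregation
my_sum = dd.Aggregation("my_sum", lambda s: s.sum(), lambda s: s.sum())

c.register_aggregation(my_sum, "my_sum", [("x", np.float64)], np.float64)

result = c.sql("SELECT my_group, MY_SUM(x) FROM my_table GROUP BY my_group")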
More information on dask aggregations can be found in the
12 changes: 6 additions & 6 deletions docs/pages/data_input.rst
@@ -3,14 +3,14 @@
Data Loading and Input
======================

Before data can be queried with ``dask-sql``, it needs to be loaded into the dask cluster (or local instance) and registered with the :class:`dask_sql.Context`.
Before data can be queried with ``dask-sql``, it needs to be loaded into the dask cluster (or local instance) and registered with the :class:`~dask_sql.Context`.
For this, ``dask-sql`` uses the wide field of possible `input formats <https://docs.dask.org/en/latest/dataframe-create.html>`_ of ``dask``, plus some additional formats only suitable for `dask-sql`.
There are multiple possibilities to load input data in ``dask-sql``:

1. Load it via python
-------------------------------

You can either use already created dask dataframes or create one by using the :func:`create_table` function.
You can either use already created dask dataframes or create one by using the :func:`~dask_sql.Context.create_table` function.
Chances are high that a function to load your favorite format or location (e.g. s3 or hdfs) already exists.
See below for all formats understood by ``dask-sql``.
Make sure to install required libraries both on the driver and worker machines.
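A minimal sketch (the file paths and table names are illustrative):

.. code-block:: python

import dask.dataframe as dd
from dask_sql import Context

c = Context()

# either create a dask dataframe yourself ...
df = dd.read_csv("path/to/your/data-*.csv")
c.create_table("my_data", df)

# ... or pass a location string and let dask-sql choose the right reader
c.create_table("my_other_data", "path/to/your/other_data.parquet")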
@@ -58,7 +58,7 @@ In ``dask``, you can publish datasets with names into the cluster memory.
This allows the same data to be reused by multiple clients/users across multiple sessions.

For example, you can publish your data using the ``client.publish_dataset`` function of the ``distributed.Client``,
and then later register it in the :class:`dask_sql.Context` via SQL:
and then later register it in the :class:`~dask_sql.Context` via SQL:

.. code-block:: python

@@ -93,7 +93,7 @@ Input Formats
* All formats and locations mentioned in `the Dask documentation <https://docs.dask.org/en/latest/dataframe-create.html>`_, including csv, parquet, json.
Just pass in the location as a string (and possibly the format, e.g. "csv", if it is not clear from the file extension).
The data can come from local disk or many remote locations (S3, hdfs, Azure Filesystem, http, Google Filesystem, ...) - just prefix the path with the matching protocol.
Additional arguments passed to :func:`create_table` or ``CREATE TABLE`` are given to the ``read_<format>`` calls.
Additional arguments passed to :func:`~dask_sql.Context.create_table` or ``CREATE TABLE`` are given to the ``read_<format>`` calls.

Example:

@@ -113,7 +113,7 @@ Input Formats
)

* If your data is already in Pandas (or Dask) DataFrames format, you can just use it as it is via the Python API
by giving it to :ref:`create_table` directly.
by giving it to :func:`~dask_sql.Context.create_table` directly.
* You can connect ``dask-sql`` to an `intake <https://intake.readthedocs.io/en/latest/index.html>`_ catalog and
use the data registered there. Assuming you have an intake catalog stored in "catalog.yaml" (can also be
the URL of an intake server), you can read in a stored table "data_table" either via Python
@@ -161,7 +161,7 @@ Input Formats
c.create_table("my_data", cursor, hive_table_name="the_name_in_hive")

Again, ``hive_table_name`` is optional and defaults to the table name in ``dask-sql``.
You can also control the database used in Hive via the ``hive_schema_name```parameter.
You can also control the database used in Hive via the ``hive_schema_name`` parameter.
Additional arguments are pushed to the internally called ``read_<format>`` functions.

.. note::
118 changes: 113 additions & 5 deletions docs/pages/how_does_it_work.rst
@@ -7,8 +7,116 @@ At the core, ``dask-sql`` does two things:
which is specified as a tree of java objects - similar to many other SQL engines (Hive, Flink, ...)
- convert this description of the query from java objects into dask API calls (and execute them) - returning a dask dataframe.

For the first step, Apache Calcite needs to know about the columns and types of the dask dataframes;
therefore, some Java classes that store this information are defined in ``planner``.
After the translation to a relational algebra is done (using ``RelationalAlgebraGenerator.getRelationalAlgebra``),
the python methods defined in ``dask_sql.physical`` turn this into a physical dask execution plan by converting
each piece of the relational algebra one-by-one.
The following example explains this in quite some technical detail.
For most users, this level of technical understanding is not needed.

1. SQL enters the library
-------------------------

No matter whether it arrives via the Python API (:ref:`api`), the command line client (:ref:`cmd`) or the server (:ref:`server`), the SQL statement entered by the user will eventually end up as a string in the function :func:`~dask_sql.Context.sql`.
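In code, all of these routes boil down to a call like the following sketch (the test data roughly mirrors what the command line tool loads with ``--load-test-data``):

.. code-block:: python

from dask.datasets import timeseries
from dask_sql import Context

c = Context()
c.create_table("timeseries", timeseries())

# no matter whether it was typed into the command line client, sent to
# the server or written in Python, the query arrives here as a plain string
df = c.sql("SELECT x + y FROM timeseries WHERE x > 0")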

2. SQL is parsed
----------------

This function will first give the SQL string to the implemented Java classes (especially :class:`RelationalAlgebraGenerator`) via the ``jpype`` library.
Inside this class, Apache Calcite is used to first parse the SQL string and then turn it into a relational algebra.
For this, Apache Calcite uses the SQL language description specified in the Calcite library itself and the additional definitions in the ``.ftl`` files in the ``dask-sql`` repository.
They specify custom language features, such as the ``CREATE MODEL`` statement.

.. note::

``.ftl`` stands for FreeMarker Template Language and is one of the standard templating languages used in the Java ecosystem.
Each of the "functions" defined in the documents defines a part of the (extended) SQL language in ``javacc`` format.
FreeMarker is used to combine these parser definitions with the ones from Apache Calcite. Have a look into the ``config.fmpp`` file for more information.

For example, the following ``javacc`` code

.. code-block::

SqlNode SqlShowTables() :
{
    final Span s;
    final SqlIdentifier schema;
}
{
    <SHOW> { s = span(); } <TABLES> <FROM>
    schema = CompoundIdentifier()
    {
        return new SqlShowTables(s.end(this), schema);
    }
}

describes a parser rule, which understands SQL statements such as

.. code-block:: sql

SHOW TABLES FROM "schema"

While parsing the SQL, such statements are turned into an instance of the Java class :class:`SqlShowTables` (which is also defined in this project).
The :class:`Span` is used internally in Apache Calcite to store the position in the parsed SQL statement (e.g. for better error output).
The ``SqlShowTables`` javacc function (not the Java class ``SqlShowTables``) is listed in ``config.fmpp`` under ``statementParserMethods``, which makes it parsable as a main SQL statement (similar to any normal ``SELECT ...`` statement).
All Java classes used as parser return values inherit from the Calcite class :class:`SqlNode` or a derived subclass (where it makes sense). Those classes are merely containers storing the information from the parsed SQL statements (such as the schema name in the example above) and do not contain any business logic themselves.

3. SQL is (maybe) optimized
---------------------------

Once the SQL string is parsed into an instance of a :class:`SqlNode` (or a subclass of it), Apache Calcite can convert it into a relational algebra and optimize it. As this is only implemented for Calcite's own classes (and not for custom classes such as :class:`SqlCreateModel`), this conversion and optimization is not triggered for all SQL statements (have a look into :func:`Context._get_ral`).

After optimization, the resulting Java instance will be an instance of one of the :class:`Logical*` classes in Apache Calcite (such as :class:`LogicalJoin`). Each of those can contain other instances as "inputs", creating a tree of the different steps in the SQL statement (see below for an example).

In the end, the result is either an optimized tree of steps in the relational algebra (represented by instances of the :class:`Logical*` classes) or an instance of a :class:`SqlNode` (sub)class.

4. Translation to Dask API calls
--------------------------------

Depending on the type of the resulting Java class, it is converted into calls to Python functions using different Python "converters". For each Java class, there exists a converter class in the ``dask_sql.physical.rel`` folder, which is registered at the :class:`dask_sql.physical.rel.convert.RelConverter` class.
Its job is to use the information stored in the Java class instance and turn it into calls to Python functions (see the example below for more information).

As many SQL statements contain calculations using literals and/or columns, these are split into their own functionality (``dask_sql.physical.rex``) following a similar plugin-based converter system.
Have a look into the specific classes to understand how the conversion of a specific SQL language feature is implemented.
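The following is a self-contained toy version of this plugin mechanism; the class names and attributes are simplified and do not match the real classes in ``dask_sql.physical.rel`` exactly:

.. code-block:: python

class ToyRelConverter:
    """Toy registry dispatching relational algebra steps to converter plugins."""

    _plugins = {}

    @classmethod
    def add_plugin(cls, plugin):
        # each plugin announces which (Java) class it is responsible for
        cls._plugins[plugin.class_name] = plugin

    @classmethod
    def convert(cls, rel, context):
        # pick the plugin registered for this step and let it produce a dataframe
        return cls._plugins[rel.class_name].convert(rel, context)

class ToyTableScanPlugin:
    class_name = "org.apache.calcite.rel.logical.LogicalTableScan"

    def convert(self, rel, context):
        # a table scan simply looks up the already registered dataframe
        return context.tables[rel.table_name]

ToyRelConverter.add_plugin(ToyTableScanPlugin())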

5. Result
---------

The result of each of the conversions is a :class:`dask.DataFrame`, which is given to the user. In the case of the command line tool or the SQL server, it is evaluated immediately; otherwise it can be used for further calculations by the user.
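Continuing the sketch from step 1, this distinction looks roughly like this:

.. code-block:: python

df = c.sql("SELECT x + y FROM timeseries WHERE x > 0")

# df is a lazy dask dataframe - nothing has been computed yet;
# the command line tool and the server trigger the computation immediately,
# in Python the user decides when to call .compute()
result = df.compute()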

Example
-------

Let's walk through the steps above using the example SQL statement

.. code-block:: sql

SELECT x + y FROM timeseries WHERE x > 0

assuming the table "timeseries" is already registered.
If you want to follow along with the steps outlined below, start the command line tool in debug mode

.. code-block:: bash

dask-sql --load-test-data --startup --log-level DEBUG

and enter the SQL statement above.

First, the SQL is parsed by Apache Calcite and (as it is not a custom statement) transformed into a tree of relational algebra objects.

.. code-block:: none

LogicalProject(EXPR$0=[+($3, $4)])
  LogicalFilter(condition=[>($3, 0)])
    LogicalTableScan(table=[[schema, timeseries]])

The tree output above means that the outer instance (:class:`LogicalProject`) needs the output of the inner instance (:class:`LogicalFilter`) as its input, and so on.

Therefore, the conversion to Python API calls happens recursively (depth-first). First, the :class:`LogicalTableScan` is converted using the :class:`rel.logical.table_scan.LogicalTableScanPlugin` plugin. It just fetches the correct :class:`dask.DataFrame` from the dictionary of already registered tables of the context.
Next, the :class:`LogicalFilter` (having the dataframe as input), is converted via the :class:`rel.logical.filter.LogicalFilterPlugin`.
The filter expression ``>($3, 0)`` is converted into ``df["x"] > 0`` using a combination of REX plugins (have a look into the debug output to learn more) and applied to the dataframe.
The resulting dataframe is then passed to the converter :class:`rel.logical.project.LogicalProjectPlugin` for the :class:`LogicalProject`.
This will calculate the expression ``df["x"] + df["y"]`` (after having converted it via the :class:`RexCallPlugin` plugin) and return the final result to the user.

.. code-block:: python

df_table_scan = context.tables["timeseries"]
df_filter = df_table_scan[df_table_scan["x"] > 0]
df_project = df_filter.assign(col=df_filter["x"] + df_filter["y"])
return df_project[["col"]]
4 changes: 2 additions & 2 deletions docs/pages/machine_learning.rst
@@ -19,7 +19,7 @@ Please also see :ref:`ml` for more information on the SQL statements used on thi
-------------------------------------------------------------

If you are familiar with Python and the ML ecosystem in Python, this one is probably
the simplest possibility. You can use the :func:`Context.sql` call as described
the simplest possibility. You can use the :func:`~dask_sql.Context.sql` call as described
before to extract the data for your training or ML prediction.
The result will be a Dask dataframe, which you can either directly feed into your model
or convert to a pandas dataframe with `.compute()` before.
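A minimal sketch of this workflow (the table, the columns and the scikit-learn estimator are illustrative):

.. code-block:: python

from dask.datasets import timeseries
from dask_sql import Context
from sklearn.linear_model import LinearRegression

c = Context()
c.create_table("my_data", timeseries())  # has float columns "x" and "y"

# extract the training data via SQL - the result is a (lazy) dask dataframe
training_df = c.sql("SELECT x, y FROM my_data WHERE x > 0")

# convert to pandas for a non-dask model (or feed the dask dataframe
# directly into a dask-aware model)
X = training_df[["x"]].compute()
y = training_df["y"].compute()

model = LinearRegression().fit(X, y)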
@@ -49,7 +49,7 @@ automatically. The syntax is similar to the `BigQuery Predict Syntax <https://cl
This call will first collect the data from the inner ``SELECT`` call (which can be any valid
``SELECT`` call, including ``JOIN``, ``WHERE``, ``GROUP BY``, custom tables and views etc.)
and will then apply the model with the name "my_model" for prediction.
The model needs to be registered at the context before using :func:`register_model`.
The model needs to be registered at the context before using :func:`~dask_sql.Context.register_model`.

.. code-block:: python

2 changes: 1 addition & 1 deletion docs/pages/quickstart.rst
@@ -41,7 +41,7 @@ Read more on the data input part in :ref:`data_input`.
--------------------

If we want to work with the data in SQL, we need to give the data frame a unique name.
We do this by registering the data at an instance of a :class:`dask_sql.Context`.
We do this by registering the data at an instance of a :class:`~dask_sql.Context`.
Typically, you only have a single context per application.

.. code-block:: python
4 changes: 2 additions & 2 deletions docs/pages/server.rst
@@ -32,7 +32,7 @@ or by using the created docker image
docker run --rm -it -p 8080:8080 nbraun/dask-sql

This will spin up a server on port 8080 (by default).
The port and bind interfaces can be controlled with the ``--port`` and ``--host`` command line arguments (or options to :func:`dask_sql.run_server`).
The port and bind interfaces can be controlled with the ``--port`` and ``--host`` command line arguments (or options to :func:`~dask_sql.run_server`).

The running server looks similar to a normal presto database to any presto client and can therefore be used
with any library, e.g. the `presto CLI client <https://prestosql.io/docs/current/installation/cli.html>`_ or
@@ -68,7 +68,7 @@ commands.
Preregister your own data sources
---------------------------------

The python function :func:`dask_sql.run_server` accepts an already created :class:`dask_sql.Context`.
The python function :func:`~dask_sql.run_server` accepts an already created :class:`~dask_sql.Context`.
This means you can preload your data sources and register them with a context before starting your server.
This way, your server will already have data to query:
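A minimal sketch of such a setup (the data is illustrative, and the exact keyword arguments of :func:`~dask_sql.run_server` are assumed):

.. code-block:: python

from dask.datasets import timeseries
from dask_sql import Context, run_server

c = Context()
c.create_table("my_data", timeseries())

# start the SQL server with the preloaded context (blocks until stopped)
run_server(context=c)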

4 changes: 2 additions & 2 deletions docs/pages/sql/ml.rst
@@ -13,7 +13,7 @@ As all SQL statements in ``dask-sql`` are eventually converted to Python calls,
any custom Python function and library, e.g. Machine Learning libraries. Although it would be possible to
register custom functions (see :ref:`custom`) for this and use them, it is much more convenient if this functionality
is already included in the core SQL language.
These three statements help in training and using models. Every :class:`Context` has a registry for models, which
These three statements help in training and using models. Every :class:`~dask_sql.Context` has a registry for models, which
can be used for training or prediction.
For a full example, see :ref:`machine_learning`.

@@ -128,7 +128,7 @@ Predict the target using the given model and dataframe from the ``SELECT`` query
The return value is the input dataframe with an additional column named
"target", which contains the predicted values.
The model needs to be registered at the context before using it in this function,
either by calling :func:`Context.register_model` explicitly or by training
either by calling :func:`~dask_sql.Context.register_model` explicitly or by training
a model using the ``CREATE MODEL`` SQL statement above.

A model can be anything which has a ``predict`` function.
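A minimal sketch of registering a model by hand (assuming a :class:`~dask_sql.Context` named ``c`` with a registered table ``my_data``; the training data and the scikit-learn estimator are illustrative, and the exact keyword arguments of :func:`~dask_sql.Context.register_model` are assumptions that may differ between versions):

.. code-block:: python

import pandas as pd
from sklearn.linear_model import LogisticRegression

# any object with a ``predict`` method can be used as a model
training_df = pd.DataFrame({"x": [0.1, 0.5, 1.5, 2.5], "label": [0, 0, 1, 1]})
model = LogisticRegression().fit(training_df[["x"]], training_df["label"])

# register it under the name used in the PREDICT statement
c.register_model("my_model", model, training_columns=["x"])

c.sql("""
    SELECT * FROM PREDICT (
        MODEL my_model,
        SELECT x FROM my_data
    )
""")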