From 3e0d896b418ab77a46cd6333dd512d034b56c30e Mon Sep 17 00:00:00 2001
From: huangxingbo
Date: Mon, 26 Apr 2021 10:47:07 +0800
Subject: [PATCH] [FLINK-22445][python][docs] Add more examples of row-based operations in PyFlink doc

---
 .../docs/dev/python/table/python_types.md     |   4 +-
 .../dev/python/table/row_based_operations.md  | 275 ++++++++++++++++++
 docs/content.zh/docs/dev/table/tableApi.md    |   9 +-
 .../docs/dev/python/table/python_types.md     |   2 +-
 .../dev/python/table/row_based_operations.md  | 275 ++++++++++++++++++
 docs/content/docs/dev/table/tableApi.md       |   9 +-
 6 files changed, 559 insertions(+), 15 deletions(-)
 create mode 100644 docs/content.zh/docs/dev/python/table/row_based_operations.md
 create mode 100644 docs/content/docs/dev/python/table/row_based_operations.md

diff --git a/docs/content.zh/docs/dev/python/table/python_types.md b/docs/content.zh/docs/dev/python/table/python_types.md
index c65c646cc095b..b61989f119931 100644
--- a/docs/content.zh/docs/dev/python/table/python_types.md
+++ b/docs/content.zh/docs/dev/python/table/python_types.md
@@ -1,6 +1,6 @@
 ---
 title: "数据类型"
-weight: 31
+weight: 32
 type: docs
 aliases:
   - /zh/dev/python/table-api-users-guide/python_types.html
@@ -69,4 +69,4 @@ Python Table API的用户可以在Python Table API中，或者定义Python用户
 | `ARRAY` | `list` | `numpy.ndarray` |
 | `MULTISET` | `list` | `Not Supported Yet` |
 | `MAP` | `dict` | `Not Supported Yet` |
-| `ROW` | `Row` | `dict` |
\ No newline at end of file
+| `ROW` | `Row` | `dict` |
diff --git a/docs/content.zh/docs/dev/python/table/row_based_operations.md b/docs/content.zh/docs/dev/python/table/row_based_operations.md
new file mode 100644
index 0000000000000..e69ed9f363901
--- /dev/null
+++ b/docs/content.zh/docs/dev/python/table/row_based_operations.md
@@ -0,0 +1,275 @@
---
title: "Row-based Operations"
weight: 31
type: docs
---

# Row-based Operations

This page describes how to use row-based operations in the PyFlink Table API.

## Map

Performs a map operation with a Python [general scalar function]({{< ref "docs/dev/python/table/udfs/python_udfs" >}}#scalar-functions) or [vectorized scalar function]({{< ref "docs/dev/python/table/udfs/vectorized_python_udfs" >}}#vectorized-scalar-functions).
The output will be flattened if the output type is a composite type.

Note If you do not specify the input columns of your scalar function, all input columns will be passed to it, merged into a single Row or pandas.DataFrame.

```python
from pyflink.common import Row
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col
from pyflink.table.types import DataTypes
from pyflink.table.udf import udf

env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
table_env = TableEnvironment.create(env_settings)

table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])

# 1. Specify the input columns explicitly
@udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
                                DataTypes.FIELD("data", DataTypes.STRING())]))
def func1(id: int, data: str) -> Row:
    return Row(id, data * 2)

table.map(func1(col('id'), col('data'))).to_pandas()
# result is
#    _c0         _c1
# 0    1        HiHi
# 1    2  HelloHello


# 2. Don't specify the input columns in a general scalar function
@udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
                                DataTypes.FIELD("data", DataTypes.STRING())]))
def func2(data: Row) -> Row:
    return Row(data[0], data[1] * 2)

table.map(func2).alias('id', 'data').to_pandas()
# result is
#    id        data
# 0   1        HiHi
# 1   2  HelloHello

# 3. Don't specify the input columns in a pandas (vectorized) scalar function
import pandas as pd

@udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
                                DataTypes.FIELD("data", DataTypes.STRING())]),
     func_type='pandas')
def func3(data: pd.DataFrame) -> pd.DataFrame:
    res = pd.concat([data.id, data.data * 2], axis=1)
    return res

table.map(func3).alias('id', 'data').to_pandas()
# result is
#    id        data
# 0   1        HiHi
# 1   2  HelloHello
```

## FlatMap

Performs a `flat_map` operation with a Python [table function]({{< ref "docs/dev/python/table/udfs/python_udfs" >}}#table-functions).

```python
from pyflink.common import Row
from pyflink.table.udf import udtf
from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = TableEnvironment.create(env_settings)

table = table_env.from_elements([(1, 'Hi,Flink'), (2, 'Hello')], ['id', 'data'])

@udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
def split(x: Row) -> Row:
    for s in x[1].split(","):
        yield x[0], s

# use the table function split in `flat_map`
table.flat_map(split).to_pandas()
# result is
#    f0     f1
# 0   1     Hi
# 1   1  Flink
# 2   2  Hello

# use the table function in `join_lateral` or `left_outer_join_lateral`
table.join_lateral(split.alias('a', 'b')).to_pandas()
# result is
#    id      data  a      b
# 0   1  Hi,Flink  1     Hi
# 1   1  Hi,Flink  1  Flink
# 2   2     Hello  2  Hello
```

## Aggregate

Performs an aggregate operation with a Python general aggregate function or vectorized aggregate function.
You have to close the "aggregate" with a select statement, and the select statement does not support aggregate functions.
The output of aggregate will be flattened if the output type is a composite type.

Note If you do not specify the input columns of your aggregate function, all input columns, including the group key, will be passed to it, merged into a single Row or pandas.DataFrame.
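For contrast, here is a minimal sketch of an aggregate function whose input column is passed explicitly, so that it receives only that column rather than the merged row (the `my_sum` name and the call pattern in the comment are illustrative, not part of this patch):

```python
from pyflink.table import DataTypes
from pyflink.table.expressions import col
from pyflink.table.udf import udaf

# Illustrative sketch: because the column is passed explicitly at the call
# site, the function receives just that column (a pandas.Series here),
# not a Row/DataFrame that also contains the group key.
@udaf(result_type=DataTypes.BIGINT(), func_type="pandas")
def my_sum(b):
    return b.sum()

# Assuming a table t with columns a and b, as in the examples below:
# t.group_by(col('a')).aggregate(my_sum(col('b')).alias('s')).select(col('a'), col('s'))
```

The examples below use the merged form instead, which is why the general aggregate's `accumulate` indexes into the input row.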

```python
from pyflink.common import Row
from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col
from pyflink.table.udf import AggregateFunction, udaf

class CountAndSumAggregateFunction(AggregateFunction):

    def get_value(self, accumulator):
        return Row(accumulator[0], accumulator[1])

    def create_accumulator(self):
        return Row(0, 0)

    def accumulate(self, accumulator, *args):
        accumulator[0] += 1
        accumulator[1] += args[0][1]

    def retract(self, accumulator, *args):
        accumulator[0] -= 1
        accumulator[1] -= args[0][1]

    def merge(self, accumulator, accumulators):
        for other_acc in accumulators:
            accumulator[0] += other_acc[0]
            accumulator[1] += other_acc[1]

    def get_accumulator_type(self):
        return DataTypes.ROW(
            [DataTypes.FIELD("a", DataTypes.BIGINT()),
             DataTypes.FIELD("b", DataTypes.BIGINT())])

    def get_result_type(self):
        return DataTypes.ROW(
            [DataTypes.FIELD("a", DataTypes.BIGINT()),
             DataTypes.FIELD("b", DataTypes.BIGINT())])

function = CountAndSumAggregateFunction()
agg = udaf(function,
           result_type=function.get_result_type(),
           accumulator_type=function.get_accumulator_type(),
           name=str(function.__class__.__name__))

# aggregate with a Python general aggregate function

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = TableEnvironment.create(env_settings)
t = table_env.from_elements([(1, 2), (2, 1), (1, 3)], ['a', 'b'])

result = t.group_by(col('a')) \
    .aggregate(agg.alias("c", "d")) \
    .select(col('a'), col('c'), col('d'))
result.to_pandas()

# the result is
#    a  c  d
# 0  1  2  5
# 1  2  1  1


# aggregate with a Python vectorized aggregate function
env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
table_env = TableEnvironment.create(env_settings)

t = table_env.from_elements([(1, 2), (2, 1), (1, 3)], ['a', 'b'])

pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
                   result_type=DataTypes.ROW(
                       [DataTypes.FIELD("a", DataTypes.FLOAT()),
                        DataTypes.FIELD("b", DataTypes.INT())]),
                   func_type="pandas")
t.aggregate(pandas_udaf.alias("a", "b")) \
    .select(col('a'), col('b')).to_pandas()

# the result is
#      a  b
# 0  2.0  3
```

## FlatAggregate

Performs a flat_aggregate operation with a Python general [table aggregate function]({{< ref "docs/dev/python/table/udfs/python_udfs" >}}#table-aggregate-functions).

It is similar to a **GroupBy Aggregation**: the rows are grouped on the grouping keys and a table aggregate function is applied to each group. The difference from an AggregateFunction is that a TableAggregateFunction may return 0 or more records per group. You have to close the "flat_aggregate" with a select statement, and the select statement does not support aggregate functions.
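Before the complete example, here is a minimal sketch of a table aggregate function that emits two rows per group, the minimum and the maximum (the `MinMax` class and the field name `v` are illustrative, not part of this patch):

```python
from pyflink.common import Row
from pyflink.table import DataTypes
from pyflink.table.expressions import col
from pyflink.table.udf import TableAggregateFunction, udtaf

class MinMax(TableAggregateFunction):
    """Illustrative sketch: emits the min and the max of each group as two rows."""

    def create_accumulator(self):
        return [None, None]  # [current min, current max]

    def accumulate(self, accumulator, *args):
        # Called without explicit input columns, so args[0] is the merged input row.
        v = args[0][0]
        if v is not None:
            if accumulator[0] is None or v < accumulator[0]:
                accumulator[0] = v
            if accumulator[1] is None or v > accumulator[1]:
                accumulator[1] = v

    def emit_value(self, accumulator):
        yield Row(accumulator[0])
        yield Row(accumulator[1])

    def get_accumulator_type(self):
        return DataTypes.ARRAY(DataTypes.BIGINT())

    def get_result_type(self):
        return DataTypes.ROW([DataTypes.FIELD("v", DataTypes.BIGINT())])

# retract and merge are omitted for brevity; streaming jobs with
# retractions would need them.
min_max = udtaf(MinMax())
# The flat_aggregate has to be closed by a plain select, e.g. assuming a
# table t with a grouping column c and a numeric first column:
# t.group_by(col('c')).flat_aggregate(min_max.alias('v')).select(col('v'))
```

The complete Top2 example below follows the same structure and additionally implements retract and merge.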

```python
from pyflink.common import Row
from pyflink.table.expressions import col
from pyflink.table.udf import TableAggregateFunction, udtaf
from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment

class Top2(TableAggregateFunction):

    def emit_value(self, accumulator):
        yield Row(accumulator[0])
        yield Row(accumulator[1])

    def create_accumulator(self):
        return [None, None]

    def accumulate(self, accumulator, *args):
        if args[0][0] is not None:
            if accumulator[0] is None or args[0][0] > accumulator[0]:
                accumulator[1] = accumulator[0]
                accumulator[0] = args[0][0]
            elif accumulator[1] is None or args[0][0] > accumulator[1]:
                accumulator[1] = args[0][0]

    def retract(self, accumulator, *args):
        accumulator[0] = accumulator[0] - 1

    def merge(self, accumulator, accumulators):
        for other_acc in accumulators:
            self.accumulate(accumulator, other_acc[0])
            self.accumulate(accumulator, other_acc[1])

    def get_accumulator_type(self):
        return DataTypes.ARRAY(DataTypes.BIGINT())

    def get_result_type(self):
        return DataTypes.ROW(
            [DataTypes.FIELD("a", DataTypes.BIGINT())])

mytop = udtaf(Top2())

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = TableEnvironment.create(env_settings)
t = table_env.from_elements([(1, 'Hi', 'Hello'),
                             (3, 'Hi', 'hi'),
                             (5, 'Hi2', 'hi'),
                             (7, 'Hi', 'Hello'),
                             (2, 'Hi', 'Hello')], ['a', 'b', 'c'])
result = t.select(col('a'), col('c')) \
    .group_by(col('c')) \
    .flat_aggregate(mytop) \
    .select(col('a')) \
    .flat_aggregate(mytop.alias("b")) \
    .select(col('b'))

result.to_pandas()
# the result is
#    b
# 0  7
# 1  5
```
diff --git a/docs/content.zh/docs/dev/table/tableApi.md b/docs/content.zh/docs/dev/table/tableApi.md
index 9725c15b6c377..80300768d7c18 100644
--- a/docs/content.zh/docs/dev/table/tableApi.md
+++ b/docs/content.zh/docs/dev/table/tableApi.md
@@ -2211,8 +2211,8 @@ def map_function(a: Row) -> Row:
 # map operation with a python general scalar function
 func = udf(map_function, result_type=DataTypes.ROW(
                                      [DataTypes.FIELD("a", DataTypes.BIGINT()),
-                                      DataTypes.FIELD("b", DataTypes.BIGINT()))]))
-table = input.map(map_function).alias('a', 'b')
+                                      DataTypes.FIELD("b", DataTypes.BIGINT())]))
+table = input.map(func).alias('a', 'b')

 # map operation with a python vectorized scalar function
 pandas_func = udf(lambda x: x * 2, result_type=DataTypes.ROW(
@@ -2414,11 +2414,9 @@ from pyflink.table.udf import AggregateFunction, udaf
 class CountAndSumAggregateFunction(AggregateFunction):

     def get_value(self, accumulator):
-        from pyflink.common import Row
         return Row(accumulator[0], accumulator[1])

     def create_accumulator(self):
-        from pyflink.common import Row
         return Row(0, 0)

     def accumulate(self, accumulator, *args):
@@ -2461,8 +2459,7 @@ pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
                        [DataTypes.FIELD("a", DataTypes.FLOAT()),
                         DataTypes.FIELD("b", DataTypes.INT())]),
                    func_type="pandas")
-t.select(t.b) \
-    .aggregate(pandas_udaf.alias("a", "b")) \
+t.aggregate(pandas_udaf.alias("a", "b")) \
     .select("a, b")
 ```
diff --git a/docs/content/docs/dev/python/table/python_types.md b/docs/content/docs/dev/python/table/python_types.md
index ff5e101b41121..f1ec4fadec848 100644
--- a/docs/content/docs/dev/python/table/python_types.md
+++ b/docs/content/docs/dev/python/table/python_types.md
@@ -1,6 +1,6 @@
 ---
 title: "Data Types"
-weight: 31
+weight: 32
 type: docs
 aliases:
   - /dev/python/table-api-users-guide/python_types.html
diff --git a/docs/content/docs/dev/python/table/row_based_operations.md b/docs/content/docs/dev/python/table/row_based_operations.md
new file mode 100644
index 0000000000000..e69ed9f363901
--- /dev/null
+++ b/docs/content/docs/dev/python/table/row_based_operations.md
@@ -0,0 +1,275 @@
---
title: "Row-based Operations"
weight: 31
type: docs
---

# Row-based Operations

This page describes how to use row-based operations in the PyFlink Table API.

## Map

Performs a map operation with a Python [general scalar function]({{< ref "docs/dev/python/table/udfs/python_udfs" >}}#scalar-functions) or [vectorized scalar function]({{< ref "docs/dev/python/table/udfs/vectorized_python_udfs" >}}#vectorized-scalar-functions).
The output will be flattened if the output type is a composite type.

Note If you do not specify the input columns of your scalar function, all input columns will be passed to it, merged into a single Row or pandas.DataFrame.

```python
from pyflink.common import Row
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col
from pyflink.table.types import DataTypes
from pyflink.table.udf import udf

env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
table_env = TableEnvironment.create(env_settings)

table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])

# 1. Specify the input columns explicitly
@udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
                                DataTypes.FIELD("data", DataTypes.STRING())]))
def func1(id: int, data: str) -> Row:
    return Row(id, data * 2)

table.map(func1(col('id'), col('data'))).to_pandas()
# result is
#    _c0         _c1
# 0    1        HiHi
# 1    2  HelloHello


# 2. Don't specify the input columns in a general scalar function
@udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
                                DataTypes.FIELD("data", DataTypes.STRING())]))
def func2(data: Row) -> Row:
    return Row(data[0], data[1] * 2)

table.map(func2).alias('id', 'data').to_pandas()
# result is
#    id        data
# 0   1        HiHi
# 1   2  HelloHello

# 3. Don't specify the input columns in a pandas (vectorized) scalar function
import pandas as pd

@udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
                                DataTypes.FIELD("data", DataTypes.STRING())]),
     func_type='pandas')
def func3(data: pd.DataFrame) -> pd.DataFrame:
    res = pd.concat([data.id, data.data * 2], axis=1)
    return res

table.map(func3).alias('id', 'data').to_pandas()
# result is
#    id        data
# 0   1        HiHi
# 1   2  HelloHello
```

## FlatMap

Performs a `flat_map` operation with a Python [table function]({{< ref "docs/dev/python/table/udfs/python_udfs" >}}#table-functions).
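A table function can also be called with explicit input columns instead of receiving the merged row; a minimal sketch (the `split_explicit` name is illustrative, and the commented call assumes the `table` defined in the example below):

```python
from pyflink.table import DataTypes
from pyflink.table.expressions import col
from pyflink.table.udf import udtf

# Illustrative sketch: with explicit input columns, the table function
# receives the column values directly instead of a merged Row.
@udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
def split_explicit(id: int, data: str):
    for s in data.split(","):
        yield id, s

# Assuming the `table` defined in the example below:
# table.flat_map(split_explicit(col('id'), col('data'))).to_pandas()
```

The example below uses the merged form, where the single `Row` parameter carries all input columns.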

```python
from pyflink.common import Row
from pyflink.table.udf import udtf
from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = TableEnvironment.create(env_settings)

table = table_env.from_elements([(1, 'Hi,Flink'), (2, 'Hello')], ['id', 'data'])

@udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
def split(x: Row) -> Row:
    for s in x[1].split(","):
        yield x[0], s

# use the table function split in `flat_map`
table.flat_map(split).to_pandas()
# result is
#    f0     f1
# 0   1     Hi
# 1   1  Flink
# 2   2  Hello

# use the table function in `join_lateral` or `left_outer_join_lateral`
table.join_lateral(split.alias('a', 'b')).to_pandas()
# result is
#    id      data  a      b
# 0   1  Hi,Flink  1     Hi
# 1   1  Hi,Flink  1  Flink
# 2   2     Hello  2  Hello
```

## Aggregate

Performs an aggregate operation with a Python general aggregate function or vectorized aggregate function.
You have to close the "aggregate" with a select statement, and the select statement does not support aggregate functions.
The output of aggregate will be flattened if the output type is a composite type.

Note If you do not specify the input columns of your aggregate function, all input columns, including the group key, will be passed to it, merged into a single Row or pandas.DataFrame.

```python
from pyflink.common import Row
from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col
from pyflink.table.udf import AggregateFunction, udaf

class CountAndSumAggregateFunction(AggregateFunction):

    def get_value(self, accumulator):
        return Row(accumulator[0], accumulator[1])

    def create_accumulator(self):
        return Row(0, 0)

    def accumulate(self, accumulator, *args):
        accumulator[0] += 1
        accumulator[1] += args[0][1]

    def retract(self, accumulator, *args):
        accumulator[0] -= 1
        accumulator[1] -= args[0][1]

    def merge(self, accumulator, accumulators):
        for other_acc in accumulators:
            accumulator[0] += other_acc[0]
            accumulator[1] += other_acc[1]

    def get_accumulator_type(self):
        return DataTypes.ROW(
            [DataTypes.FIELD("a", DataTypes.BIGINT()),
             DataTypes.FIELD("b", DataTypes.BIGINT())])

    def get_result_type(self):
        return DataTypes.ROW(
            [DataTypes.FIELD("a", DataTypes.BIGINT()),
             DataTypes.FIELD("b", DataTypes.BIGINT())])

function = CountAndSumAggregateFunction()
agg = udaf(function,
           result_type=function.get_result_type(),
           accumulator_type=function.get_accumulator_type(),
           name=str(function.__class__.__name__))

# aggregate with a Python general aggregate function

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = TableEnvironment.create(env_settings)
t = table_env.from_elements([(1, 2), (2, 1), (1, 3)], ['a', 'b'])

result = t.group_by(col('a')) \
    .aggregate(agg.alias("c", "d")) \
    .select(col('a'), col('c'), col('d'))
result.to_pandas()

# the result is
#    a  c  d
# 0  1  2  5
# 1  2  1  1


# aggregate with a Python vectorized aggregate function
env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
table_env = TableEnvironment.create(env_settings)

t = table_env.from_elements([(1, 2), (2, 1), (1, 3)], ['a', 'b'])

pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
                   result_type=DataTypes.ROW(
                       [DataTypes.FIELD("a", DataTypes.FLOAT()),
                        DataTypes.FIELD("b", DataTypes.INT())]),
                   func_type="pandas")
t.aggregate(pandas_udaf.alias("a", "b")) \
    .select(col('a'), col('b')).to_pandas()

# the result is
#      a  b
# 0  2.0  3
```

## FlatAggregate

Performs a flat_aggregate operation with a Python general [table aggregate function]({{< ref "docs/dev/python/table/udfs/python_udfs" >}}#table-aggregate-functions).

It is similar to a **GroupBy Aggregation**: the rows are grouped on the grouping keys and a table aggregate function is applied to each group. The difference from an AggregateFunction is that a TableAggregateFunction may return 0 or more records per group. You have to close the "flat_aggregate" with a select statement, and the select statement does not support aggregate functions.

```python
from pyflink.common import Row
from pyflink.table.expressions import col
from pyflink.table.udf import TableAggregateFunction, udtaf
from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment

class Top2(TableAggregateFunction):

    def emit_value(self, accumulator):
        yield Row(accumulator[0])
        yield Row(accumulator[1])

    def create_accumulator(self):
        return [None, None]

    def accumulate(self, accumulator, *args):
        if args[0][0] is not None:
            if accumulator[0] is None or args[0][0] > accumulator[0]:
                accumulator[1] = accumulator[0]
                accumulator[0] = args[0][0]
            elif accumulator[1] is None or args[0][0] > accumulator[1]:
                accumulator[1] = args[0][0]

    def retract(self, accumulator, *args):
        accumulator[0] = accumulator[0] - 1

    def merge(self, accumulator, accumulators):
        for other_acc in accumulators:
            self.accumulate(accumulator, other_acc[0])
            self.accumulate(accumulator, other_acc[1])

    def get_accumulator_type(self):
        return DataTypes.ARRAY(DataTypes.BIGINT())

    def get_result_type(self):
        return DataTypes.ROW(
            [DataTypes.FIELD("a", DataTypes.BIGINT())])

mytop = udtaf(Top2())

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = TableEnvironment.create(env_settings)
t = table_env.from_elements([(1, 'Hi', 'Hello'),
                             (3, 'Hi', 'hi'),
                             (5, 'Hi2', 'hi'),
                             (7, 'Hi', 'Hello'),
                             (2, 'Hi', 'Hello')], ['a', 'b', 'c'])
result = t.select(col('a'), col('c')) \
    .group_by(col('c')) \
    .flat_aggregate(mytop) \
    .select(col('a')) \
    .flat_aggregate(mytop.alias("b")) \
    .select(col('b'))

result.to_pandas()
# the result is
#    b
# 0  7
# 1  5
```
diff --git a/docs/content/docs/dev/table/tableApi.md b/docs/content/docs/dev/table/tableApi.md
index 6f253898c3239..4ca942b44c182 100644
--- a/docs/content/docs/dev/table/tableApi.md
+++ b/docs/content/docs/dev/table/tableApi.md
@@ -2211,8 +2211,8 @@ def map_function(a: Row) -> Row:
 # map operation with a python general scalar function
 func = udf(map_function, result_type=DataTypes.ROW(
                                      [DataTypes.FIELD("a", DataTypes.BIGINT()),
-                                      DataTypes.FIELD("b", DataTypes.BIGINT()))]))
-table = input.map(map_function).alias('a', 'b')
+                                      DataTypes.FIELD("b", DataTypes.BIGINT())]))
+table = input.map(func).alias('a', 'b')

 # map operation with a python vectorized scalar function
 pandas_func = udf(lambda x: x * 2, result_type=DataTypes.ROW(
@@ -2414,11 +2414,9 @@ from pyflink.table.udf import AggregateFunction, udaf
 class CountAndSumAggregateFunction(AggregateFunction):

     def get_value(self, accumulator):
-        from pyflink.common import Row
         return Row(accumulator[0], accumulator[1])

     def create_accumulator(self):
-        from pyflink.common import Row
         return Row(0, 0)

     def accumulate(self, accumulator, *args):
@@ -2461,8 +2459,7 @@ pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
                        [DataTypes.FIELD("a", DataTypes.FLOAT()),
                         DataTypes.FIELD("b", DataTypes.INT())]),
                    func_type="pandas")
-t.select(t.b) \
-    .aggregate(pandas_udaf.alias("a", "b")) \
+t.aggregate(pandas_udaf.alias("a", "b")) \
     .select("a, b")
 ```