diff --git a/docs/content.zh/docs/dev/datastream/operators/overview.md b/docs/content.zh/docs/dev/datastream/operators/overview.md
index 9c4ac57f7e450..c26ac984feb5e 100644
--- a/docs/content.zh/docs/dev/datastream/operators/overview.md
+++ b/docs/content.zh/docs/dev/datastream/operators/overview.md
@@ -3,7 +3,7 @@ title: 概览
weight: 1
type: docs
aliases:
- - /dev/stream/operators/
+ - /zh/dev/stream/operators/
---
+
+# Process Function
+
+## ProcessFunction
+
+The `ProcessFunction` is a low-level stream processing operation, giving access to the basic building blocks of
+all (acyclic) streaming applications:
+
+- events (stream elements)
+- state (fault-tolerant, consistent, only on keyed stream)
+- timers (event time and processing time, only on keyed stream)
+
+The `ProcessFunction` can be thought of as a `FlatMapFunction` with access to keyed state and timers. It handles events
+by being invoked for each event received in the input stream(s).
+
+Please refer to [Process Function]({{< ref "docs/dev/datastream/operators/process_function" >}})
+for more details about the concept and usage of `ProcessFunction`.
+
+## Execution Behavior of Timers
+
+Python user-defined functions are executed in a Python process separate from the Flink operators, which run in the JVM.
+As a result, timer registration requests made in a `ProcessFunction` are sent to the Java operator asynchronously.
+Upon receiving a timer registration request, the Java operator registers the timer in the underlying timer service.
+
+If the registered timer has already passed the current time (the current system time for a processing time timer,
+or the current watermark for an event time timer), it will be triggered immediately.
+
+Note that, due to this asynchronous processing, a timer may be triggered a little later than the registered time.
+For example, a processing time timer registered for `10:00:00` may not actually be processed until `10:00:05`.
diff --git a/docs/content/docs/dev/datastream/operators/overview.md b/docs/content/docs/dev/datastream/operators/overview.md
index c41bd3b67b3d2..93e32b891a30b 100644
--- a/docs/content/docs/dev/datastream/operators/overview.md
+++ b/docs/content/docs/dev/datastream/operators/overview.md
@@ -184,7 +184,7 @@ keyedStream.reduce { _ + _ }
{{< /tab >}}
{{< tab "Python" >}}
```python
-data_stream = env.from_collection(collection=[(1, 'a'), (2, 'a'), (3, 'a'), (4, 'b')], type_info=Types.ROW([Types.INT(), Types.STRING()]))
+data_stream = env.from_collection(collection=[(1, 'a'), (2, 'a'), (3, 'a'), (4, 'b')], type_info=Types.TUPLE([Types.INT(), Types.STRING()]))
data_stream.key_by(lambda x: x[1]).reduce(lambda a, b: (a[0] + b[0], b[1]))
```
{{< /tab >}}
diff --git a/docs/content/docs/dev/datastream/operators/process_function.md b/docs/content/docs/dev/datastream/operators/process_function.md
index 90d9ed1602be9..4268009742b0f 100644
--- a/docs/content/docs/dev/datastream/operators/process_function.md
+++ b/docs/content/docs/dev/datastream/operators/process_function.md
@@ -246,6 +246,94 @@ class CountWithTimeoutFunction extends KeyedProcessFunction[Tuple, (String, Stri
}
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+import datetime
+
+from pyflink.common import Row, WatermarkStrategy
+from pyflink.common.typeinfo import Types
+from pyflink.common.watermark_strategy import TimestampAssigner
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
+from pyflink.datastream.state import ValueStateDescriptor
+from pyflink.table import StreamTableEnvironment
+
+
+class CountWithTimeoutFunction(KeyedProcessFunction):
+
+    def __init__(self):
+        self.state = None
+
+    def open(self, runtime_context: RuntimeContext):
+        self.state = runtime_context.get_state(ValueStateDescriptor(
+            "my_state", Types.ROW([Types.STRING(), Types.LONG(), Types.LONG()])))
+
+    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
+        # retrieve the current count
+        current = self.state.value()
+        if current is None:
+            current = Row(value.f1, 0, 0)
+
+        # update the state's count
+        current[1] += 1
+
+        # set the state's timestamp to the record's assigned event time timestamp
+        current[2] = ctx.timestamp()
+
+        # write the state back
+        self.state.update(current)
+
+        # schedule the next timer 60 seconds from the current event time
+        ctx.timer_service().register_event_time_timer(current[2] + 60000)
+
+    def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
+        # get the state for the key that scheduled the timer
+        result = self.state.value()
+
+        # check if this is an outdated timer or the latest timer
+        if timestamp == result[2] + 60000:
+            # emit the state on timeout
+            yield result[0], result[1]
+
+
+class MyTimestampAssigner(TimestampAssigner):
+
+    def __init__(self):
+        self.epoch = datetime.datetime.utcfromtimestamp(0)
+
+    def extract_timestamp(self, value, record_timestamp) -> int:
+        return int((value[0] - self.epoch).total_seconds() * 1000)
+
+
+if __name__ == '__main__':
+    env = StreamExecutionEnvironment.get_execution_environment()
+    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
+
+    t_env.execute_sql("""
+        CREATE TABLE my_source (
+          a TIMESTAMP(3),
+          b VARCHAR,
+          c VARCHAR
+        ) WITH (
+          'connector' = 'datagen',
+          'rows-per-second' = '10'
+        )
+    """)
+
+    stream = t_env.to_append_stream(
+        t_env.from_path('my_source'),
+        Types.ROW([Types.SQL_TIMESTAMP(), Types.STRING(), Types.STRING()]))
+    watermarked_stream = stream.assign_timestamps_and_watermarks(
+        WatermarkStrategy.for_monotonous_timestamps()
+                         .with_timestamp_assigner(MyTimestampAssigner()))
+
+    # apply the process function onto a keyed stream
+    watermarked_stream.key_by(lambda value: value[1]) \
+        .process(CountWithTimeoutFunction()) \
+        .print()
+    env.execute()
+```
+{{< /tab >}}
{{< /tabs >}}
@@ -281,6 +369,13 @@ override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]):
}
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
+    key = ctx.get_current_key()
+    # ...
+```
+{{< /tab >}}
{{< /tabs >}}
## Timers
@@ -327,6 +422,12 @@ val coalescedTime = ((ctx.timestamp + timeout) / 1000) * 1000
ctx.timerService.registerProcessingTimeTimer(coalescedTime)
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+coalesced_time = ((ctx.timestamp() + timeout) // 1000) * 1000
+ctx.timer_service().register_processing_time_timer(coalesced_time)
+```
+{{< /tab >}}
{{< /tabs >}}
Since event-time timers only fire with watermarks coming in, you may also schedule and coalesce
@@ -345,6 +446,12 @@ val coalescedTime = ctx.timerService.currentWatermark + 1
ctx.timerService.registerEventTimeTimer(coalescedTime)
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+coalesced_time = ctx.timer_service().current_watermark() + 1
+ctx.timer_service().register_event_time_timer(coalesced_time)
+```
+{{< /tab >}}
{{< /tabs >}}
Timers can also be stopped and removed as follows:
@@ -364,6 +471,12 @@ val timestampOfTimerToStop = ...
ctx.timerService.deleteProcessingTimeTimer(timestampOfTimerToStop)
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+timestamp_of_timer_to_stop = ...
+ctx.timer_service().delete_processing_time_timer(timestamp_of_timer_to_stop)
+```
+{{< /tab >}}
{{< /tabs >}}
Stopping an event-time timer:
@@ -381,6 +494,12 @@ val timestampOfTimerToStop = ...
ctx.timerService.deleteEventTimeTimer(timestampOfTimerToStop)
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+timestamp_of_timer_to_stop = ...
+ctx.timer_service().delete_event_time_timer(timestamp_of_timer_to_stop)
+```
+{{< /tab >}}
{{< /tabs >}}
{{< hint info >}}
diff --git a/docs/content/docs/dev/python/datastream/operators/overview.md b/docs/content/docs/dev/python/datastream/operators/overview.md
index 30cd642abebc5..da01cb016fa40 100644
--- a/docs/content/docs/dev/python/datastream/operators/overview.md
+++ b/docs/content/docs/dev/python/datastream/operators/overview.md
@@ -26,28 +26,24 @@ under the License.
# Operators
-
Operators transform one or more DataStreams into a new DataStream. Programs can combine multiple transformations into
sophisticated dataflow topologies.
-
-
-# DataStream Transformations
+## DataStream Transformations
DataStream programs in Flink are regular programs that implement transformations on data streams (e.g., mapping,
filtering, reducing). Please see [operators]({{< ref "docs/dev/datastream/operators/overview" >}})
-for an overview of the available stream transformations in Python DataStream API.
+for an overview of the available transformations in the Python DataStream API.
-# Functions
-Most transformations require a user-defined function as input to define the functionality of the transformation. The
-following describes different ways of defining user-defined functions.
+## Functions
+Transformations accept user-defined functions as input to define their functionality.
+The following section describes the different ways of defining Python user-defined functions in the Python DataStream API.
-## Implementing Function Interfaces
+### Implementing Function Interfaces
Different Function interfaces are provided for different transformations in the Python DataStream API. For example,
`MapFunction` is provided for the `map` transformation, `FilterFunction` is provided for the `filter` transformation, etc.
Users can implement the corresponding Function interface according to the type of the transformation. Take MapFunction for
-instance:
-
+instance:
```python
# Implementing MapFunction
@@ -60,24 +56,19 @@ data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
mapped_stream = data_stream.map(MyMapFunction(), output_type=Types.INT())
```
-
-Note In Python DataStream API, users can specify the output type information of the transformation explicityly. If not
-specified, the output type will be `Types.PICKLED_BYTE_ARRAY` so that data will be in a form of byte array generated by
-the pickle seriallizer. For more details about the `Pickle Serialization`, please refer to [DataTypes]({{< ref "docs/dev/python/datastream/data_types" >}}#pickle-serialization).
-
-## Lambda Function
-As shown in the following example, all the transformations can also accept a lambda function to define the functionality of the transformation:
+### Lambda Function
+As shown in the following example, most transformations can also accept a lambda function to define the functionality of the transformation:
```python
data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
mapped_stream = data_stream.map(lambda x: x + 1, output_type=Types.INT())
```
-Note Operations ConnectedStream.map() and ConnectedStream.flat_map() do not support
-lambda function and must accept `CoMapFunction` and `CoFlatMapFunction` seperately.
+Note that `ConnectedStream.map()` and `ConnectedStream.flat_map()` do not support
+lambda functions and must accept a `CoMapFunction` and a `CoFlatMapFunction`, respectively, as shown in the sketch below.
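+
+A minimal sketch of how this could look, assuming two illustrative input streams built with `from_collection` (the element values, types and the `output_type` below are chosen only for the example):
+
+```python
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.functions import CoMapFunction
+
+
+class MyCoMapFunction(CoMapFunction):
+
+    def map1(self, value):
+        # invoked for each element of the first input stream
+        return str(value)
+
+    def map2(self, value):
+        # invoked for each element of the second input stream
+        return value
+
+
+env = StreamExecutionEnvironment.get_execution_environment()
+# two hypothetical input streams, used for illustration only
+int_stream = env.from_collection([1, 2, 3], type_info=Types.INT())
+str_stream = env.from_collection(['a', 'b', 'c'], type_info=Types.STRING())
+
+connected_stream = int_stream.connect(str_stream)
+mapped_stream = connected_stream.map(MyCoMapFunction(), output_type=Types.STRING())
+```
+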
-## Python Function
-Users can also use Python function:
+### Python Function
+Users can also use an ordinary Python function to define the functionality of the transformation:
```python
def my_map_func(value):
@@ -87,3 +78,88 @@ data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
mapped_stream = data_stream.map(my_map_func, output_type=Types.INT())
```
+## Output Type
+
+Users can specify the output type information of a transformation explicitly in the Python DataStream API. If it is not
+specified, the output type defaults to `Types.PICKLED_BYTE_ARRAY`, and the result data is serialized using the pickle serializer.
+For more details about the pickle serializer, please refer to [Pickle Serialization]({{< ref "docs/dev/python/datastream/data_types" >}}#pickle-serialization).
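+
+For example, the following sketch (reusing the `map` transformation from the examples above, with illustrative input data) contrasts the default pickled output with an explicitly specified output type:
+
+```python
+from pyflink.common.typeinfo import Types
+
+data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
+
+# no output type specified: the results are handled as Types.PICKLED_BYTE_ARRAY
+pickled_stream = data_stream.map(lambda x: x + 1)
+
+# output type specified explicitly: the results are handled as Types.INT()
+typed_stream = data_stream.map(lambda x: x + 1, output_type=Types.INT())
+```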
+
+The output type must usually be specified in the following scenarios.
+
+### Convert DataStream into Table
+
+```python
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import StreamTableEnvironment
+
+
+def data_stream_api_demo():
+    env = StreamExecutionEnvironment.get_execution_environment()
+    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
+
+    t_env.execute_sql("""
+        CREATE TABLE my_source (
+          a INT,
+          b VARCHAR
+        ) WITH (
+          'connector' = 'datagen',
+          'number-of-rows' = '10'
+        )
+    """)
+
+    ds = t_env.to_append_stream(
+        t_env.from_path('my_source'),
+        Types.ROW([Types.INT(), Types.STRING()]))
+
+    def split(s):
+        splits = s[1].split("|")
+        for sp in splits:
+            yield s[0], sp
+
+    ds = ds.map(lambda i: (i[0] + 1, i[1])) \
+           .flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \
+           .key_by(lambda i: i[1]) \
+           .reduce(lambda i, j: (i[0] + j[0], i[1]))
+
+    t_env.execute_sql("""
+        CREATE TABLE my_sink (
+          a INT,
+          b VARCHAR
+        ) WITH (
+          'connector' = 'print'
+        )
+    """)
+
+    table = t_env.from_data_stream(ds)
+    table_result = table.execute_insert("my_sink")
+
+    # 1) wait for the job to finish; this is only needed for local execution, otherwise
+    #    the script may exit while the job is still running
+    # 2) remove this line when submitting the job in detached mode to a remote cluster
+    #    such as YARN, standalone or K8s
+    table_result.wait()
+
+
+if __name__ == '__main__':
+    data_stream_api_demo()
+```
+
+The output type must be specified for the flat_map operation in the above example, and it is also used implicitly as
+the output type of the reduce operation. The reason is that
+`t_env.from_data_stream(ds)` requires the output type of `ds` to be a composite type.
+
+### Write DataStream to Sink
+
+```python
+from pyflink.common.typeinfo import Types
+
+def split(s):
+    splits = s[1].split("|")
+    for sp in splits:
+        yield s[0], sp
+
+ds.map(lambda i: (i[0] + 1, i[1]), Types.TUPLE([Types.INT(), Types.STRING()])) \
+  .sink_to(...)
+```
+
+The output type should be specified for the map operation in the above example when the sink only accepts specific kinds of data, e.g. `Row`.
+
diff --git a/docs/content/docs/dev/python/datastream/operators/process_function.md b/docs/content/docs/dev/python/datastream/operators/process_function.md
new file mode 100644
index 0000000000000..d9f772b22f913
--- /dev/null
+++ b/docs/content/docs/dev/python/datastream/operators/process_function.md
@@ -0,0 +1,52 @@
+---
+title: "Process Function"
+weight: 4
+type: docs
+---
+
+
+# Process Function
+
+## ProcessFunction
+
+The `ProcessFunction` is a low-level stream processing operation, giving access to the basic building blocks of
+all (acyclic) streaming applications:
+
+- events (stream elements)
+- state (fault-tolerant, consistent, only on keyed stream)
+- timers (event time and processing time, only on keyed stream)
+
+The `ProcessFunction` can be thought of as a `FlatMapFunction` with access to keyed state and timers. It handles events
+by being invoked for each event received in the input stream(s).
+
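+The following is a minimal sketch of this idea, assuming a hypothetical `CountFunction` that simply counts the elements seen per key using keyed state:
+
+```python
+from pyflink.common.typeinfo import Types
+from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
+from pyflink.datastream.state import ValueStateDescriptor
+
+
+class CountFunction(KeyedProcessFunction):
+
+    def open(self, runtime_context: RuntimeContext):
+        # keyed state: one counter per key
+        self.count_state = runtime_context.get_state(
+            ValueStateDescriptor("count", Types.LONG()))
+
+    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
+        count = self.count_state.value()
+        if count is None:
+            count = 0
+        count += 1
+        self.count_state.update(count)
+        # emit the element together with the current count for its key
+        yield value, count
+```
+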
+Please refer to [Process Function]({{< ref "docs/dev/datastream/operators/process_function" >}})
+for more details about the concept and usage of `ProcessFunction`.
+
+## Execution Behavior of Timers
+
+Python user-defined functions are executed in a Python process separate from the Flink operators, which run in the JVM.
+As a result, timer registration requests made in a `ProcessFunction` are sent to the Java operator asynchronously.
+Upon receiving a timer registration request, the Java operator registers the timer in the underlying timer service.
+
+If the registered timer has already passed the current time (the current system time for a processing time timer,
+or the current watermark for an event time timer), it will be triggered immediately.
+
+Note that, due to this asynchronous processing, a timer may be triggered a little later than the registered time.
+For example, a processing time timer registered for `10:00:00` may be actually processed at `10:00:05`.
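+
+The following sketch illustrates this behavior, assuming a hypothetical function that registers a processing time timer 5 seconds into the future for every element (the delay and the emitted values are only examples):
+
+```python
+from pyflink.datastream.functions import KeyedProcessFunction
+
+
+class TimerRegistrationFunction(KeyedProcessFunction):
+
+    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
+        # the registration request is forwarded to the Java operator asynchronously
+        current_time = ctx.timer_service().current_processing_time()
+        ctx.timer_service().register_processing_time_timer(current_time + 5000)
+        yield value
+
+    def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
+        # 'timestamp' is the time the timer was registered for; because of the
+        # asynchronous round trip it may be processed slightly later than that
+        yield ctx.get_current_key(), timestamp
+```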