4 changes: 2 additions & 2 deletions docs/content.zh/docs/dev/datastream/operators/overview.md
@@ -3,7 +3,7 @@ title: 概览
weight: 1
type: docs
aliases:
- /dev/stream/operators/
- /zh/dev/stream/operators/
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
@@ -182,7 +182,7 @@ keyedStream.reduce { _ + _ }
{{< /tab >}}
{{< tab "Python" >}}
```python
data_stream = env.from_collection(collection=[(1, 'a'), (2, 'a'), (3, 'a'), (4, 'b')], type_info=Types.ROW([Types.INT(), Types.STRING()]))
data_stream = env.from_collection(collection=[(1, 'a'), (2, 'a'), (3, 'a'), (4, 'b')], type_info=Types.TUPLE([Types.INT(), Types.STRING()]))
data_stream.key_by(lambda x: x[1]).reduce(lambda a, b: (a[0] + b[0], b[1]))
```
{{< /tab >}}
119 changes: 119 additions & 0 deletions docs/content.zh/docs/dev/datastream/operators/process_function.md
@@ -246,6 +246,94 @@ class CountWithTimeoutFunction extends KeyedProcessFunction[Tuple, (String, Stri
}
```
{{< /tab >}}
{{< tab "Python" >}}
```python
import datetime

from pyflink.common import Row, WatermarkStrategy
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor
from pyflink.table import StreamTableEnvironment


class CountWithTimeoutFunction(KeyedProcessFunction):

    def __init__(self):
        self.state = None

    def open(self, runtime_context: RuntimeContext):
        self.state = runtime_context.get_state(ValueStateDescriptor(
            "my_state", Types.ROW([Types.STRING(), Types.LONG(), Types.LONG()])))

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        # retrieve the current count
        current = self.state.value()
        if current is None:
            current = Row(value.f1, 0, 0)

        # update the state's count
        current[1] += 1

        # set the state's timestamp to the record's assigned event time timestamp
        current[2] = ctx.timestamp()

        # write the state back
        self.state.update(current)

        # schedule the next timer 60 seconds from the current event time
        ctx.timer_service().register_event_time_timer(current[2] + 60000)

    def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
        # get the state for the key that scheduled the timer
        result = self.state.value()

        # check if this is an outdated timer or the latest timer
        if timestamp == result[2] + 60000:
            # emit the state on timeout
            yield result[0], result[1]


class MyTimestampAssigner(TimestampAssigner):

    def __init__(self):
        self.epoch = datetime.datetime.utcfromtimestamp(0)

    def extract_timestamp(self, value, record_timestamp) -> int:
        return (value[0] - self.epoch).total_seconds() * 1000.0
        # review suggestion: return an int rather than a float, e.g.
        # return int((value[0] - self.epoch).total_seconds() * 1000)


if __name__ == '__main__':
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env)

    t_env.execute_sql("""
            CREATE TABLE my_source (
              a TIMESTAMP(3),
              b VARCHAR,
              c VARCHAR
            ) WITH (
              'connector' = 'datagen',
              'rows-per-second' = '10'
            )
        """)

    stream = t_env.to_append_stream(
        t_env.from_path('my_source'),
        Types.ROW([Types.SQL_TIMESTAMP(), Types.STRING(), Types.STRING()]))
    watermarked_stream = stream.assign_timestamps_and_watermarks(
        WatermarkStrategy.for_monotonous_timestamps()
                         .with_timestamp_assigner(MyTimestampAssigner()))

    # apply the process function onto a keyed stream
    result = watermarked_stream.key_by(lambda value: value[1]) \
        .process(CountWithTimeoutFunction()) \
        .print()
    env.execute()
```
{{< /tab >}}
{{< /tabs >}}


@@ -281,6 +369,13 @@ override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]):
}
```
{{< /tab >}}
{{< tab "Python" >}}
```python
def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
key = ctx.get_current_key()
# ...
```
{{< /tab >}}
{{< /tabs >}}

## Timers
@@ -327,6 +422,12 @@ val coalescedTime = ((ctx.timestamp + timeout) / 1000) * 1000
ctx.timerService.registerProcessingTimeTimer(coalescedTime)
```
{{< /tab >}}
{{< tab "Python" >}}
```python
coalesced_time = ((ctx.timestamp() + timeout) / 1000) * 1000
# review suggestion: use integer division so the timestamp stays an int:
# coalesced_time = ((ctx.timestamp() + timeout) // 1000) * 1000
ctx.timer_service().register_processing_time_timer(coalesced_time)
```
{{< /tab >}}
{{< /tabs >}}

Since event-time timers only fire with watermarks coming in, you may also schedule and coalesce
@@ -345,6 +446,12 @@ val coalescedTime = ctx.timerService.currentWatermark + 1
ctx.timerService.registerEventTimeTimer(coalescedTime)
```
{{< /tab >}}
{{< tab "Python" >}}
```python
coalesced_time = ctx.timer_service().current_watermark() + 1
ctx.timer_service().register_event_time_timer(coalesced_time)
```
{{< /tab >}}
{{< /tabs >}}

Timers can also be stopped and removed as follows:
@@ -364,6 +471,12 @@ val timestampOfTimerToStop = ...
ctx.timerService.deleteProcessingTimeTimer(timestampOfTimerToStop)
```
{{< /tab >}}
{{< tab "Python" >}}
```python
timestamp_of_timer_to_stop = ...
ctx.timer_service().delete_processing_time_timer(timestamp_of_timer_to_stop)
```
{{< /tab >}}
{{< /tabs >}}

Stopping an event-time timer:
@@ -381,6 +494,12 @@ val timestampOfTimerToStop = ...
ctx.timerService.deleteEventTimeTimer(timestampOfTimerToStop)
```
{{< /tab >}}
{{< tab "Python" >}}
```python
timestamp_of_timer_to_stop = ...
ctx.timer_service().delete_event_time_timer(timestamp_of_timer_to_stop)
```
{{< /tab >}}
{{< /tabs >}}

{{< hint info >}}
119 changes: 97 additions & 22 deletions docs/content.zh/docs/dev/python/datastream/operators/overview.md
@@ -26,28 +26,24 @@ under the License.

# Operators


Operators transform one or more DataStreams into a new DataStream. Programs can combine multiple transformations into
sophisticated dataflow topologies.



# DataStream Transformations
## DataStream Transformations

DataStream programs in Flink are regular programs that implement transformations on data streams (e.g., mapping,
filtering, reducing). Please see [operators]({{< ref "docs/dev/datastream/operators/overview" >}})
for an overview of the available stream transformations in Python DataStream API.
for an overview of the available transformations in Python DataStream API.

# Functions
Most transformations require a user-defined function as input to define the functionality of the transformation. The
following describes different ways of defining user-defined functions.
## Functions
Transformations accept user-defined functions as input to define the functionality of the transformations.
The following section describes different ways of defining Python user-defined functions in Python DataStream API.

## Implementing Function Interfaces
### Implementing Function Interfaces
Different Function interfaces are provided for different transformations in the Python DataStream API. For example,
`MapFunction` is provided for the `map` transformation, `FilterFunction` is provided for the `filter` transformation, etc.
Users can implement the corresponding Function interface according to the type of the transformation. Take MapFunction for
instance:

instance:

```python
# Implementing MapFunction
@@ -60,24 +56,19 @@
data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
mapped_stream = data_stream.map(MyMapFunction(), output_type=Types.INT())
```


<span class="label label-info">Note</span> In Python DataStream API, users can specify the output type information of the transformation explicityly. If not
specified, the output type will be `Types.PICKLED_BYTE_ARRAY` so that data will be in a form of byte array generated by
the pickle seriallizer. For more details about the `Pickle Serialization`, please refer to [DataTypes]({{< ref "docs/dev/python/datastream/data_types" >}}#pickle-serialization).

## Lambda Function
As shown in the following example, all the transformations can also accept a lambda function to define the functionality of the transformation:
### Lambda Function
As shown in the following example, the transformations can also accept a lambda function to define the functionality of the transformation:

```python
data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
mapped_stream = data_stream.map(lambda x: x + 1, output_type=Types.INT())
```

<span class="label label-info">Note</span> Operations ConnectedStream.map() and ConnectedStream.flat_map() do not support
lambda function and must accept `CoMapFunction` and `CoFlatMapFunction` seperately.
<span class="label label-info">Note</span> `ConnectedStream.map()` and `ConnectedStream.flat_map()` do not support
lambda functions and must be given a `CoMapFunction` and a `CoFlatMapFunction`, respectively.
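
As an illustrative sketch (the class and stream names below are not from the original docs, and `env` is assumed to be an existing `StreamExecutionEnvironment`), connecting two streams with a `CoMapFunction` might look like this:

```python
from pyflink.common.typeinfo import Types
from pyflink.datastream.functions import CoMapFunction


class MyCoMapFunction(CoMapFunction):

    def map1(self, value):
        # called for each element of the first connected stream
        return value + 1

    def map2(self, value):
        # called for each element of the second connected stream
        return value * 2


stream_1 = env.from_collection([1, 2, 3], type_info=Types.INT())
stream_2 = env.from_collection([4, 5, 6], type_info=Types.INT())
mapped_stream = stream_1.connect(stream_2).map(MyCoMapFunction(), output_type=Types.INT())
```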

## Python Function
Users can also use Python function:
### Python Function
Users can also use a Python function to define the functionality of the transformation:

```python
def my_map_func(value):
@@ -87,3 +78,87 @@
data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
mapped_stream = data_stream.map(my_map_func, output_type=Types.INT())
```

## Output Type

Users can specify the output type information of a transformation explicitly in the Python DataStream API. If not
specified, the output type will be `Types.PICKLED_BYTE_ARRAY` by default, and the result data will be serialized with the pickle serializer.
For more details about the pickle serializer, please refer to [Pickle Serialization]({{< ref "docs/dev/python/datastream/data_types" >}}#pickle-serialization).
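
As a small illustrative sketch (not part of the original page; `data_stream` is assumed to be the integer stream from the examples above), the only difference is whether `output_type` is passed:

```python
# without an explicit output type, the results are handled as pickled byte arrays
pickled_stream = data_stream.map(lambda x: x + 1)

# with an explicit output type, the results use Flink's serializer for INT
typed_stream = data_stream.map(lambda x: x + 1, output_type=Types.INT())
```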

The output type must usually be specified in the following scenarios.
Suggested change (review comment):
- The output type must usually be specified in the following scenarios.
+ Generally, the output type must be specified in the following scenarios.


### Convert DataStream into Table

```python
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment


def data_stream_api_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env)

    t_env.execute_sql("""
            CREATE TABLE my_source (
              a INT,
              b VARCHAR
            ) WITH (
              'connector' = 'datagen',
              'number-of-rows' = '10'
            )
        """)

    ds = t_env.to_append_stream(
        t_env.from_path('my_source'),
        Types.ROW([Types.INT(), Types.STRING()]))

    def split(s):
        splits = s[1].split("|")
        for sp in splits:
            yield s[0], sp

    ds = ds.map(lambda i: (i[0] + 1, i[1])) \
           .flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \
           .key_by(lambda i: i[1]) \
           .reduce(lambda i, j: (i[0] + j[0], i[1]))

    t_env.execute_sql("""
            CREATE TABLE my_sink (
              a INT,
              b VARCHAR
            ) WITH (
              'connector' = 'print'
            )
        """)

    table = t_env.from_data_stream(ds)
    table_result = table.execute_insert("my_sink")

    # 1) wait until the job finishes; needed for local execution, otherwise the script
    #    may exit before the job finishes and the mini cluster would shut down too early
    # 2) remove this call when the job is submitted in detached mode to a remote cluster,
    #    e.g. YARN/Standalone/K8s
    table_result.wait()


if __name__ == '__main__':
    data_stream_api_demo()
```

The output type must be specified for the flat_map operation in the above example; it is also used implicitly as
the output type of the reduce operation. The reason is that
`t_env.from_data_stream(ds)` requires the output type of `ds` to be a composite type.

### Write DataStream to Sink

```python
from pyflink.common.typeinfo import Types

def split(s):
    splits = s[1].split("|")
    for sp in splits:
        yield s[0], sp

ds.map(lambda i: (i[0] + 1, i[1]), Types.TUPLE([Types.INT(), Types.STRING()])) \
  .sink_to(...)
```

The output type should usually be specified for the map operation in the above example if the sink only accepts special kinds of data, e.g. Row, etc.
Suggested change (review comment):
- The output type should usually be specified for the map operation in the above example if the sink only accepts special kinds of data, e.g. Row, etc.
+ Generally, the output type should be specified for the map operation in the above example if the sink only accepts special kinds of data, e.g. Row, etc.

@@ -0,0 +1,52 @@
---
title: "Process Function"
weight: 4
type: docs
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Process Function

## ProcessFunction

The `ProcessFunction` is a low-level stream processing operation, giving access to the basic building blocks of
all (acyclic) streaming applications:

- events (stream elements)
- state (fault-tolerant, consistent, only on keyed stream)
- timers (event time and processing time, only on keyed stream)

The `ProcessFunction` can be thought of as a `FlatMapFunction` with access to keyed state and timers. It handles events
by being invoked for each event received in the input stream(s).

Please refer to [Process Function]({{< ref "docs/dev/datastream/operators/process_function" >}})
for more details about the concept and usage of `ProcessFunction`.
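
As a minimal sketch (the class name and emitted values here are illustrative, not from the original page), a `KeyedProcessFunction` in the Python DataStream API has roughly this shape:

```python
from pyflink.datastream.functions import KeyedProcessFunction


class MyProcessFunction(KeyedProcessFunction):

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        # called once per element of the keyed input stream;
        # state and timers are accessed through ctx
        yield value

    def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
        # called when a previously registered timer fires
        yield timestamp
```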

## Execution behavior of timer

Python user-defined functions are executed in a separate Python process from Flink's operators, which run in the JVM,
so the timer registration requests made in a `ProcessFunction` are sent to the Java operator asynchronously.
Once it receives a timer registration request, the Java operator registers the timer in the underlying timer service.

If the registered timer has already passed the current time (the current system time for a processing-time timer,
or the current watermark for an event-time timer), it will be triggered immediately.

Note that, due to this asynchronous processing, a timer may be triggered a little later than the requested time.
For example, a processing-time timer registered for `10:00:00` may actually be processed at `10:00:05`.
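
A hedged sketch of what this means in practice (the 60-second delay below is illustrative and not from the original page): the registration call returns immediately on the Python side, while the actual registration, and thus the firing, happens later in the JVM operator.

```python
# inside a KeyedProcessFunction (illustrative fragment)
def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
    # the request is forwarded asynchronously to the Java operator, so the
    # timer may fire slightly later than the requested time
    ctx.timer_service().register_processing_time_timer(
        ctx.timer_service().current_processing_time() + 60 * 1000)
```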
2 changes: 1 addition & 1 deletion docs/content/docs/dev/datastream/operators/overview.md
@@ -184,7 +184,7 @@ keyedStream.reduce { _ + _ }
{{< /tab >}}
{{< tab "Python" >}}
```python
data_stream = env.from_collection(collection=[(1, 'a'), (2, 'a'), (3, 'a'), (4, 'b')], type_info=Types.ROW([Types.INT(), Types.STRING()]))
data_stream = env.from_collection(collection=[(1, 'a'), (2, 'a'), (3, 'a'), (4, 'b')], type_info=Types.TUPLE([Types.INT(), Types.STRING()]))
data_stream.key_by(lambda x: x[1]).reduce(lambda a, b: (a[0] + b[0], b[1]))
```
{{< /tab >}}