Skip to content

Commit 0f8adc0

Browse files
committed
[FLINK-20720][python][docs] Add documentation for ProcessFunction in Python DataStream API
This closes #15733.
1 parent a886497 commit 0f8adc0

4 files changed

Lines changed: 342 additions & 0 deletions

File tree

docs/content.zh/docs/dev/datastream/operators/process_function.md

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,94 @@ class CountWithTimeoutFunction extends KeyedProcessFunction[Tuple, (String, Stri
246246
}
247247
```
248248
{{< /tab >}}
249+
{{< tab "Python" >}}
250+
```python
251+
import datetime
252+
253+
from pyflink.common import Row, WatermarkStrategy
254+
from pyflink.common.typeinfo import Types
255+
from pyflink.common.watermark_strategy import TimestampAssigner
256+
from pyflink.datastream import StreamExecutionEnvironment
257+
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
258+
from pyflink.datastream.state import ValueStateDescriptor
259+
from pyflink.table import StreamTableEnvironment
260+
261+
262+
class CountWithTimeoutFunction(KeyedProcessFunction):
263+
264+
def __init__(self):
265+
self.state = None
266+
267+
def open(self, runtime_context: RuntimeContext):
268+
self.state = runtime_context.get_state(ValueStateDescriptor(
269+
"my_state", Types.ROW([Types.STRING(), Types.LONG(), Types.LONG()])))
270+
271+
def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
272+
# retrieve the current count
273+
current = self.state.value()
274+
if current is None:
275+
current = Row(value.f1, 0, 0)
276+
277+
# update the state's count
278+
current[1] += 1
279+
280+
# set the state's timestamp to the record's assigned event time timestamp
281+
current[2] = ctx.timestamp()
282+
283+
# write the state back
284+
self.state.update(current)
285+
286+
# schedule the next timer 60 seconds from the current event time
287+
ctx.timer_service().register_event_time_timer(current[2] + 60000)
288+
289+
def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
290+
# get the state for the key that scheduled the timer
291+
result = self.state.value()
292+
293+
# check if this is an outdated timer or the latest timer
294+
if timestamp == result[2] + 60000:
295+
# emit the state on timeout
296+
yield result[0], result[1]
297+
298+
299+
class MyTimestampAssigner(TimestampAssigner):
300+
301+
def __init__(self):
302+
self.epoch = datetime.datetime.utcfromtimestamp(0)
303+
304+
def extract_timestamp(self, value, record_timestamp) -> int:
305+
return int((value[0] - self.epoch).total_seconds() * 1000)
306+
307+
308+
if __name__ == '__main__':
309+
env = StreamExecutionEnvironment.get_execution_environment()
310+
t_env = StreamTableEnvironment.create(stream_execution_environment=env)
311+
312+
t_env.execute_sql("""
313+
CREATE TABLE my_source (
314+
a TIMESTAMP(3),
315+
b VARCHAR,
316+
c VARCHAR
317+
) WITH (
318+
'connector' = 'datagen',
319+
'rows-per-second' = '10'
320+
)
321+
""")
322+
323+
stream = t_env.to_append_stream(
324+
t_env.from_path('my_source'),
325+
Types.ROW([Types.SQL_TIMESTAMP(), Types.STRING(), Types.STRING()]))
326+
watermarked_stream = stream.assign_timestamps_and_watermarks(
327+
WatermarkStrategy.for_monotonous_timestamps()
328+
.with_timestamp_assigner(MyTimestampAssigner()))
329+
330+
# apply the process function onto a keyed stream
331+
result = watermarked_stream.key_by(lambda value: value[1]) \
332+
.process(CountWithTimeoutFunction()) \
333+
.print()
334+
env.execute()
335+
```
336+
{{< /tab >}}
249337
{{< /tabs >}}
250338

251339

@@ -281,6 +369,13 @@ override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]):
281369
}
282370
```
283371
{{< /tab >}}
372+
{{< tab "Python" >}}
373+
```python
374+
def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
375+
key = ctx.get_current_key()
376+
# ...
377+
```
378+
{{< /tab >}}
284379
{{< /tabs >}}
285380

286381
## Timers
@@ -327,6 +422,12 @@ val coalescedTime = ((ctx.timestamp + timeout) / 1000) * 1000
327422
ctx.timerService.registerProcessingTimeTimer(coalescedTime)
328423
```
329424
{{< /tab >}}
425+
{{< tab "Python" >}}
426+
```python
427+
coalesced_time = ((ctx.timestamp() + timeout) // 1000) * 1000
428+
ctx.timer_service().register_processing_time_timer(coalesced_time)
429+
```
430+
{{< /tab >}}
330431
{{< /tabs >}}
331432

332433
Since event-time timers only fire with watermarks coming in, you may also schedule and coalesce
@@ -345,6 +446,12 @@ val coalescedTime = ctx.timerService.currentWatermark + 1
345446
ctx.timerService.registerEventTimeTimer(coalescedTime)
346447
```
347448
{{< /tab >}}
449+
{{< tab "Python" >}}
450+
```python
451+
coalesced_time = ctx.timer_service().current_watermark() + 1
452+
ctx.timer_service().register_event_time_timer(coalesced_time)
453+
```
454+
{{< /tab >}}
348455
{{< /tabs >}}
349456

350457
Timers can also be stopped and removed as follows:
@@ -364,6 +471,12 @@ val timestampOfTimerToStop = ...
364471
ctx.timerService.deleteProcessingTimeTimer(timestampOfTimerToStop)
365472
```
366473
{{< /tab >}}
474+
{{< tab "Python" >}}
475+
```python
476+
timestamp_of_timer_to_stop = ...
477+
ctx.timer_service().delete_processing_time_timer(timestamp_of_timer_to_stop)
478+
```
479+
{{< /tab >}}
367480
{{< /tabs >}}
368481

369482
Stopping an event-time timer:
@@ -381,6 +494,12 @@ val timestampOfTimerToStop = ...
381494
ctx.timerService.deleteEventTimeTimer(timestampOfTimerToStop)
382495
```
383496
{{< /tab >}}
497+
{{< tab "Python" >}}
498+
```python
499+
timestamp_of_timer_to_stop = ...
500+
ctx.timer_service().delete_event_time_timer(timestamp_of_timer_to_stop)
501+
```
502+
{{< /tab >}}
384503
{{< /tabs >}}
385504

386505
{{< hint info >}}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
---
2+
title: "Process Function"
3+
weight: 4
4+
type: docs
5+
---
6+
<!--
7+
Licensed to the Apache Software Foundation (ASF) under one
8+
or more contributor license agreements. See the NOTICE file
9+
distributed with this work for additional information
10+
regarding copyright ownership. The ASF licenses this file
11+
to you under the Apache License, Version 2.0 (the
12+
"License"); you may not use this file except in compliance
13+
with the License. You may obtain a copy of the License at
14+
15+
http://www.apache.org/licenses/LICENSE-2.0
16+
17+
Unless required by applicable law or agreed to in writing,
18+
software distributed under the License is distributed on an
19+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
20+
KIND, either express or implied. See the License for the
21+
specific language governing permissions and limitations
22+
under the License.
23+
-->
24+
25+
# Process Function
26+
27+
## ProcessFunction
28+
29+
The `ProcessFunction` is a low-level stream processing operation, giving access to the basic building blocks of
30+
all (acyclic) streaming applications:
31+
32+
- events (stream elements)
33+
- state (fault-tolerant, consistent, only on keyed stream)
34+
- timers (event time and processing time, only on keyed stream)
35+
36+
The `ProcessFunction` can be thought of as a `FlatMapFunction` with access to keyed state and timers. It handles events
37+
by being invoked for each event received in the input stream(s).
38+
39+
Please refer to [Process Function]({{< ref "docs/dev/datastream/operators/process_function" >}})
40+
for more details about the concept and usage of `ProcessFunction`.
41+
42+
## Execution behavior of timer
43+
44+
Python user-defined functions are executed in a separate Python process from Flink's operators which run in a JVM,
45+
the timer registration requests made in `ProcessFunction` will be sent to the Java operator asynchronously.
46+
Once received timer registration requests, the Java operator will register it into the underlying timer service.
47+
48+
If the registered timer has already passed the current time (the current system time for processing time timer,
49+
or the current watermark for event time), it will be triggered immediately.
50+
51+
Note that, due to the asynchronous processing characteristics, it may happen that the timer was triggered a little later than the actual time.
52+
For example, a registered processing time timer of `10:00:00` may be actually processed at `10:00:05`.

docs/content/docs/dev/datastream/operators/process_function.md

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,94 @@ class CountWithTimeoutFunction extends KeyedProcessFunction[Tuple, (String, Stri
246246
}
247247
```
248248
{{< /tab >}}
249+
{{< tab "Python" >}}
250+
```python
251+
import datetime
252+
253+
from pyflink.common import Row, WatermarkStrategy
254+
from pyflink.common.typeinfo import Types
255+
from pyflink.common.watermark_strategy import TimestampAssigner
256+
from pyflink.datastream import StreamExecutionEnvironment
257+
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
258+
from pyflink.datastream.state import ValueStateDescriptor
259+
from pyflink.table import StreamTableEnvironment
260+
261+
262+
class CountWithTimeoutFunction(KeyedProcessFunction):
263+
264+
def __init__(self):
265+
self.state = None
266+
267+
def open(self, runtime_context: RuntimeContext):
268+
self.state = runtime_context.get_state(ValueStateDescriptor(
269+
"my_state", Types.ROW([Types.STRING(), Types.LONG(), Types.LONG()])))
270+
271+
def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
272+
# retrieve the current count
273+
current = self.state.value()
274+
if current is None:
275+
current = Row(value.f1, 0, 0)
276+
277+
# update the state's count
278+
current[1] += 1
279+
280+
# set the state's timestamp to the record's assigned event time timestamp
281+
current[2] = ctx.timestamp()
282+
283+
# write the state back
284+
self.state.update(current)
285+
286+
# schedule the next timer 60 seconds from the current event time
287+
ctx.timer_service().register_event_time_timer(current[2] + 60000)
288+
289+
def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
290+
# get the state for the key that scheduled the timer
291+
result = self.state.value()
292+
293+
# check if this is an outdated timer or the latest timer
294+
if timestamp == result[2] + 60000:
295+
# emit the state on timeout
296+
yield result[0], result[1]
297+
298+
299+
class MyTimestampAssigner(TimestampAssigner):
300+
301+
def __init__(self):
302+
self.epoch = datetime.datetime.utcfromtimestamp(0)
303+
304+
def extract_timestamp(self, value, record_timestamp) -> int:
305+
return int((value[0] - self.epoch).total_seconds() * 1000)
306+
307+
308+
if __name__ == '__main__':
309+
env = StreamExecutionEnvironment.get_execution_environment()
310+
t_env = StreamTableEnvironment.create(stream_execution_environment=env)
311+
312+
t_env.execute_sql("""
313+
CREATE TABLE my_source (
314+
a TIMESTAMP(3),
315+
b VARCHAR,
316+
c VARCHAR
317+
) WITH (
318+
'connector' = 'datagen',
319+
'rows-per-second' = '10'
320+
)
321+
""")
322+
323+
stream = t_env.to_append_stream(
324+
t_env.from_path('my_source'),
325+
Types.ROW([Types.SQL_TIMESTAMP(), Types.STRING(), Types.STRING()]))
326+
watermarked_stream = stream.assign_timestamps_and_watermarks(
327+
WatermarkStrategy.for_monotonous_timestamps()
328+
.with_timestamp_assigner(MyTimestampAssigner()))
329+
330+
# apply the process function onto a keyed stream
331+
result = watermarked_stream.key_by(lambda value: value[1]) \
332+
.process(CountWithTimeoutFunction()) \
333+
.print()
334+
env.execute()
335+
```
336+
{{< /tab >}}
249337
{{< /tabs >}}
250338

251339

@@ -281,6 +369,13 @@ override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]):
281369
}
282370
```
283371
{{< /tab >}}
372+
{{< tab "Python" >}}
373+
```python
374+
def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
375+
key = ctx.get_current_key()
376+
# ...
377+
```
378+
{{< /tab >}}
284379
{{< /tabs >}}
285380

286381
## Timers
@@ -327,6 +422,12 @@ val coalescedTime = ((ctx.timestamp + timeout) / 1000) * 1000
327422
ctx.timerService.registerProcessingTimeTimer(coalescedTime)
328423
```
329424
{{< /tab >}}
425+
{{< tab "Python" >}}
426+
```python
427+
coalesced_time = ((ctx.timestamp() + timeout) // 1000) * 1000
428+
ctx.timer_service().register_processing_time_timer(coalesced_time)
429+
```
430+
{{< /tab >}}
330431
{{< /tabs >}}
331432

332433
Since event-time timers only fire with watermarks coming in, you may also schedule and coalesce
@@ -345,6 +446,12 @@ val coalescedTime = ctx.timerService.currentWatermark + 1
345446
ctx.timerService.registerEventTimeTimer(coalescedTime)
346447
```
347448
{{< /tab >}}
449+
{{< tab "Python" >}}
450+
```python
451+
coalesced_time = ctx.timer_service().current_watermark() + 1
452+
ctx.timer_service().register_event_time_timer(coalesced_time)
453+
```
454+
{{< /tab >}}
348455
{{< /tabs >}}
349456

350457
Timers can also be stopped and removed as follows:
@@ -364,6 +471,12 @@ val timestampOfTimerToStop = ...
364471
ctx.timerService.deleteProcessingTimeTimer(timestampOfTimerToStop)
365472
```
366473
{{< /tab >}}
474+
{{< tab "Python" >}}
475+
```python
476+
timestamp_of_timer_to_stop = ...
477+
ctx.timer_service().delete_processing_time_timer(timestamp_of_timer_to_stop)
478+
```
479+
{{< /tab >}}
367480
{{< /tabs >}}
368481

369482
Stopping an event-time timer:
@@ -381,6 +494,12 @@ val timestampOfTimerToStop = ...
381494
ctx.timerService.deleteEventTimeTimer(timestampOfTimerToStop)
382495
```
383496
{{< /tab >}}
497+
{{< tab "Python" >}}
498+
```python
499+
timestamp_of_timer_to_stop = ...
500+
ctx.timer_service().delete_event_time_timer(timestamp_of_timer_to_stop)
501+
```
502+
{{< /tab >}}
384503
{{< /tabs >}}
385504

386505
{{< hint info >}}

0 commit comments

Comments
 (0)