50 changes: 48 additions & 2 deletions docs/content.zh/docs/connectors/table/filesystem.md
@@ -225,6 +225,12 @@ To define when to commit a partition, providing partition commit trigger:
<td>Duration</td>
<td>The partition will not commit until the delay time. If it is a daily partition, it should be '1 d'; if it is an hourly partition, it should be '1 h'.</td>
</tr>
<tr>
<td><h5>sink.partition-commit.watermark-time-zone</h5></td>
<td style="word-wrap: break-word;">UTC</td>
<td>String</td>
<td>The time zone used to parse the long watermark value into a TIMESTAMP value; the parsed watermark timestamp is compared with the partition time to decide whether the partition should be committed. This option only takes effect when `sink.partition-commit.trigger` is set to 'partition-time'. If this option is not configured correctly, e.g. the source rowtime is defined on a TIMESTAMP_LTZ column but this config is not set, users may see the partition committed only after several hours. The default value is 'UTC', which means the watermark is defined on a TIMESTAMP column or not defined at all. If the watermark is defined on a TIMESTAMP_LTZ column, the time zone of the watermark is the session time zone. The option value is either a full name such as 'America/Los_Angeles', or a custom time zone id such as 'GMT-8:00'.</td>
</tr>
</tbody>
</table>
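
To make the time zone semantics concrete, below is a minimal sketch, assuming a Flink SQL client session (the `SET` syntax may vary slightly between client versions, and the epoch value is an arbitrary example). It shows how the same long watermark value renders as two different wall-clock timestamps depending on the configured zone:

```sql
-- A watermark is a long epoch-milliseconds value; the configured time zone
-- decides which wall-clock timestamp it becomes before being compared with
-- the extracted partition time.
SET table.local-time-zone = 'UTC';
SELECT TO_TIMESTAMP_LTZ(1589958000000, 3); -- 2020-05-20 07:00:00.000
SET table.local-time-zone = 'Asia/Shanghai';
SELECT TO_TIMESTAMP_LTZ(1589958000000, 3); -- 2020-05-20 15:00:00.000
```

If event timestamps are produced in Shanghai time but the option is left at its 'UTC' default, the parsed watermark trails the partition time by eight hours, which is exactly the multi-hour commit delay the description above warns about.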

@@ -401,15 +407,15 @@ The parallelism of writing files into external file system (including Hive) can

## Full Example

The below shows how the file system connector can be used to write a streaming query to write data from Kafka into a file system and runs a batch query to read that data back out.
The below examples show how the file system connector can be used to write a streaming query that writes data from Kafka into a file system, and then run a batch query to read that data back out.

```sql

CREATE TABLE kafka_table (
user_id STRING,
order_amount DOUBLE,
log_ts TIMESTAMP(3),
WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- Define watermark on TIMESTAMP column
) WITH (...);

CREATE TABLE fs_table (
@@ -438,4 +444,44 @@ FROM kafka_table;
SELECT * FROM fs_table WHERE dt='2020-05-20' and `hour`='12';
```

If the watermark is defined on a TIMESTAMP_LTZ column and `partition-time` is used to commit, the `sink.partition-commit.watermark-time-zone` option must be set to the session time zone; otherwise partitions may be committed a few hours late.
```sql

CREATE TABLE kafka_table (
user_id STRING,
order_amount DOUBLE,
ts BIGINT, -- time in epoch milliseconds
ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- Define watermark on TIMESTAMP_LTZ column
) WITH (...);

CREATE TABLE fs_table (
user_id STRING,
order_amount DOUBLE,
dt STRING,
`hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
'connector'='filesystem',
'path'='...',
'format'='parquet',
'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
'sink.partition-commit.delay'='1 h',
'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- Assume the user's configured time zone is 'Asia/Shanghai'
'sink.partition-commit.policy.kind'='success-file'
);

-- streaming sql, insert into file system table
INSERT INTO fs_table
SELECT
user_id,
order_amount,
DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'),
DATE_FORMAT(ts_ltz, 'HH')
FROM kafka_table;

-- batch sql, select with partition pruning
SELECT * FROM fs_table WHERE dt='2020-05-20' and `hour`='12';
```
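
The example above assumes the session time zone is 'Asia/Shanghai' but does not show how that is configured. A minimal sketch, assuming the Flink SQL client (the exact `SET` syntax may vary by version):

```sql
-- Set the session time zone so that TIMESTAMP_LTZ values, including the
-- watermark defined on ts_ltz, are interpreted in 'Asia/Shanghai', matching
-- the 'sink.partition-commit.watermark-time-zone' option above.
SET table.local-time-zone = 'Asia/Shanghai';
```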

{{< top >}}
37 changes: 35 additions & 2 deletions docs/content.zh/docs/connectors/table/hive/hive_read_write.md
@@ -346,7 +346,7 @@ Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1') SELECT 'Tom
visible - incrementally. Users control when/how to trigger commits with several properties. Insert
overwrite is not supported for streaming write.

The below shows how the streaming sink can be used to write a streaming query to write data from Kafka into a Hive table with partition-commit,
The below examples show how the streaming sink can be used to write a streaming query that writes data from Kafka into a Hive table with partition commit,
and then run a batch query to read that data back out.

Please see the [streaming sink]({{< ref "docs/connectors/table/filesystem" >}}#streaming-sink) for a full list of available configurations.
@@ -369,7 +369,7 @@ CREATE TABLE kafka_table (
user_id STRING,
order_amount DOUBLE,
log_ts TIMESTAMP(3),
WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- Define watermark on TIMESTAMP column
) WITH (...);

-- streaming sql, insert into hive table
@@ -382,6 +382,39 @@ SELECT * FROM hive_table WHERE dt='2020-05-20' and hr='12';

```

If the watermark is defined on a TIMESTAMP_LTZ column and `partition-time` is used to commit, the `sink.partition-commit.watermark-time-zone` option must be set to the session time zone; otherwise partitions may be committed a few hours late.
```sql

SET table.sql-dialect=hive;
CREATE TABLE hive_table (
user_id STRING,
order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- Assume the user's configured time zone is 'Asia/Shanghai'
'sink.partition-commit.policy.kind'='metastore,success-file'
);

SET table.sql-dialect=default;
CREATE TABLE kafka_table (
user_id STRING,
order_amount DOUBLE,
ts BIGINT, -- time in epoch milliseconds
ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- Define watermark on TIMESTAMP_LTZ column
) WITH (...);

-- streaming sql, insert into hive table
INSERT INTO TABLE hive_table
SELECT user_id, order_amount, DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'), DATE_FORMAT(ts_ltz, 'HH')
FROM kafka_table;

-- batch sql, select with partition pruning
SELECT * FROM hive_table WHERE dt='2020-05-20' and hr='12';

```
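
As a sanity check, a hedged sketch (assuming the tables above and that the Hive dialect of this Flink version supports the statement) of verifying that partitions are actually being committed once the watermark passes the partition time plus the delay:

```sql
-- List the partitions of the sink table; each committed partition is
-- registered in the metastore and marked with a _SUCCESS file, per
-- 'sink.partition-commit.policy.kind'='metastore,success-file'.
SET table.sql-dialect=hive;
SHOW PARTITIONS hive_table;
```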

By default, for streaming writes, Flink only supports renaming committers, meaning the S3 filesystem
cannot support exactly-once streaming writes.
48 changes: 47 additions & 1 deletion docs/content/docs/connectors/table/filesystem.md
@@ -225,6 +225,12 @@ To define when to commit a partition, providing partition commit trigger:
<td>Duration</td>
<td>The partition will not commit until the delay time. If it is a daily partition, it should be '1 d'; if it is an hourly partition, it should be '1 h'.</td>
</tr>
<tr>
<td><h5>sink.partition-commit.watermark-time-zone</h5></td>
**Review comment (Member):** Should we call it `time-zone-of-partition-time`? I find it difficult to understand why a watermark has time zone semantics and how to infer it. If we call it the time zone of the partition time, then the inference can simply forward the rowtime's time zone, e.g. if the rowtime is NTZ it should be UTC; if the rowtime is LTZ it should be the session time zone.

Besides, we should also remind users:

  1. This option only takes effect when `sink.partition-commit.trigger` is set to 'partition-time'.
  2. If this option is not configured correctly, e.g. the source rowtime is LTZ but this config is not set, users may see the partition committed a few hours late.

**Reply (Contributor Author):** The partition time is a string with TIMESTAMP meaning, and I think it's not correct to say the partition time has a time zone. The watermark, on the other hand, is a long value that can be interpreted as UTC timestamp millis or Instant millis, so I tend to keep the current option name. (The sketch after this table shows how the same long value renders in different zones.)

<td style="word-wrap: break-word;">UTC</td>
<td>String</td>
<td>The time zone used to parse the long watermark value into a TIMESTAMP value; the parsed watermark timestamp is compared with the partition time to decide whether the partition should be committed. This option only takes effect when `sink.partition-commit.trigger` is set to 'partition-time'. If this option is not configured correctly, e.g. the source rowtime is defined on a TIMESTAMP_LTZ column but this config is not set, users may see the partition committed only after several hours. The default value is 'UTC', which means the watermark is defined on a TIMESTAMP column or not defined at all. If the watermark is defined on a TIMESTAMP_LTZ column, the time zone of the watermark is the session time zone. The option value is either a full name such as 'America/Los_Angeles', or a custom time zone id such as 'GMT-8:00'.</td>
</tr>
</tbody>
</table>
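
To illustrate the reply above, that a watermark is a plain long which only becomes a wall-clock timestamp once a zone is chosen, here is a minimal sketch, assuming a Flink SQL client session (the `SET` syntax may vary by version; the epoch value is an arbitrary example):

```sql
-- The same long watermark value renders as two different local timestamps
-- depending on the configured time zone, and it is the rendered timestamp
-- that is compared with the extracted partition time.
SET table.local-time-zone = 'UTC';
SELECT TO_TIMESTAMP_LTZ(1589958000000, 3); -- 2020-05-20 07:00:00.000
SET table.local-time-zone = 'Asia/Shanghai';
SELECT TO_TIMESTAMP_LTZ(1589958000000, 3); -- 2020-05-20 15:00:00.000
```

With the default 'UTC' value and a rowtime defined on TIMESTAMP_LTZ, the parsed watermark would trail Shanghai-time partition values by eight hours, producing the multi-hour commit delay the description warns about.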

@@ -401,7 +407,7 @@ The parallelism of writing files into external file system (including Hive) can

## Full Example

The below shows how the file system connector can be used to write a streaming query to write data from Kafka into a file system and runs a batch query to read that data back out.
The below examples show how the file system connector can be used to write a streaming query that writes data from Kafka into a file system, and then run a batch query to read that data back out.

```sql

@@ -438,4 +444,44 @@ FROM kafka_table;
SELECT * FROM fs_table WHERE dt='2020-05-20' and `hour`='12';
```

If the watermark is defined on a TIMESTAMP_LTZ column and `partition-time` is used to commit, the `sink.partition-commit.watermark-time-zone` option must be set to the session time zone; otherwise partitions may be committed a few hours late.
```sql

CREATE TABLE kafka_table (
user_id STRING,
order_amount DOUBLE,
ts BIGINT, -- time in epoch milliseconds
ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- Define watermark on TIMESTAMP_LTZ column
) WITH (...);

CREATE TABLE fs_table (
user_id STRING,
order_amount DOUBLE,
dt STRING,
`hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
'connector'='filesystem',
'path'='...',
'format'='parquet',
'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
'sink.partition-commit.delay'='1 h',
'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- Assume the user's configured time zone is 'Asia/Shanghai'
'sink.partition-commit.policy.kind'='success-file'
);

-- streaming sql, insert into file system table
INSERT INTO fs_table
SELECT
user_id,
order_amount,
DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'),
DATE_FORMAT(ts_ltz, 'HH')
FROM kafka_table;

-- batch sql, select with partition pruning
SELECT * FROM fs_table WHERE dt='2020-05-20' and `hour`='12';
```
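
The example above assumes the session time zone is 'Asia/Shanghai' but does not show how that is configured. A minimal sketch, assuming the Flink SQL client (the exact `SET` syntax may vary by version):

```sql
-- Set the session time zone so that TIMESTAMP_LTZ values, including the
-- watermark defined on ts_ltz, are interpreted in 'Asia/Shanghai', matching
-- the 'sink.partition-commit.watermark-time-zone' option above.
SET table.local-time-zone = 'Asia/Shanghai';
```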

{{< top >}}
37 changes: 35 additions & 2 deletions docs/content/docs/connectors/table/hive/hive_read_write.md
@@ -346,7 +346,7 @@ Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1') SELECT 'Tom
visible - incrementally. Users control when/how to trigger commits with several properties. Insert
overwrite is not supported for streaming write.

The below shows how the streaming sink can be used to write a streaming query to write data from Kafka into a Hive table with partition-commit,
The below examples show how the streaming sink can be used to write a streaming query that writes data from Kafka into a Hive table with partition commit,
and then run a batch query to read that data back out.

Please see the [streaming sink]({{< ref "docs/connectors/table/filesystem" >}}#streaming-sink) for a full list of available configurations.
@@ -369,7 +369,7 @@ CREATE TABLE kafka_table (
user_id STRING,
order_amount DOUBLE,
log_ts TIMESTAMP(3),
WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- Define watermark on TIMESTAMP column
) WITH (...);

-- streaming sql, insert into hive table
@@ -382,6 +382,39 @@ SELECT * FROM hive_table WHERE dt='2020-05-20' and hr='12';

```

If the watermark is defined on a TIMESTAMP_LTZ column and `partition-time` is used to commit, the `sink.partition-commit.watermark-time-zone` option must be set to the session time zone; otherwise partitions may be committed a few hours late.
```sql

SET table.sql-dialect=hive;
CREATE TABLE hive_table (
user_id STRING,
order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- Assume the user's configured time zone is 'Asia/Shanghai'
'sink.partition-commit.policy.kind'='metastore,success-file'
);

SET table.sql-dialect=default;
CREATE TABLE kafka_table (
user_id STRING,
order_amount DOUBLE,
ts BIGINT, -- time in epoch milliseconds
ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- Define watermark on TIMESTAMP_LTZ column
) WITH (...);

-- streaming sql, insert into hive table
INSERT INTO TABLE hive_table
SELECT user_id, order_amount, DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'), DATE_FORMAT(ts_ltz, 'HH')
FROM kafka_table;

-- batch sql, select with partition pruning
SELECT * FROM hive_table WHERE dt='2020-05-20' and hr='12';

```
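
As a sanity check, a hedged sketch (assuming the tables above and that the Hive dialect of this Flink version supports the statement) of verifying that partitions are actually being committed once the watermark passes the partition time plus the delay:

```sql
-- List the partitions of the sink table; each committed partition is
-- registered in the metastore and marked with a _SUCCESS file, per
-- 'sink.partition-commit.policy.kind'='metastore,success-file'.
SET table.sql-dialect=hive;
SHOW PARTITIONS hive_table;
```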

By default, for streaming writes, Flink only supports renaming committers, meaning the S3 filesystem
cannot support exactly-once streaming writes.