Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.factories;

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

import static org.apache.flink.configuration.ConfigOptions.key;

/** {@link ConfigOption}s for {@link DataGenTableSourceFactory}. */
@Internal
public class DataGenOptions {

public static final Long ROWS_PER_SECOND_DEFAULT_VALUE = 10000L;

public static final String FIELDS = "fields";
public static final String KIND = "kind";
public static final String START = "start";
public static final String END = "end";
public static final String MIN = "min";
public static final String MAX = "max";
public static final String LENGTH = "length";

public static final String SEQUENCE = "sequence";
public static final String RANDOM = "random";

public static final ConfigOption<Long> ROWS_PER_SECOND =
key("rows-per-second")
.longType()
.defaultValue(ROWS_PER_SECOND_DEFAULT_VALUE)
.withDescription("Rows per second to control the emit rate.");

public static final ConfigOption<Long> NUMBER_OF_ROWS =
key("number-of-rows")
.longType()
.noDefaultValue()
.withDescription(
"Total number of rows to emit. By default, the source is unbounded.");

/** Placeholder {@link ConfigOption}. Not used for retrieving values. */
public static final ConfigOption<String> FIELD_KIND =
ConfigOptions.key(String.format("%s.#.%s", FIELDS, KIND))
.stringType()
.defaultValue("random")
.withDescription("Generator of this '#' field. Can be 'sequence' or 'random'.");

/** Placeholder {@link ConfigOption}. Not used for retrieving values. */
public static final ConfigOption<String> FIELD_MIN =
ConfigOptions.key(String.format("%s.#.%s", FIELDS, MIN))
.stringType()
.noDefaultValue()
.withDescription(
"Minimum value to generate for fields of kind 'random'. Minimum value possible for the type of the field.");

/** Placeholder {@link ConfigOption}. Not used for retrieving values. */
public static final ConfigOption<String> FIELD_MAX =
ConfigOptions.key(String.format("%s.#.%s", FIELDS, MAX))
.stringType()
.noDefaultValue()
.withDescription(
"Maximum value to generate for fields of kind 'random'. Maximum value possible for the type of the field.");

/** Placeholder {@link ConfigOption}. Not used for retrieving values. */
public static final ConfigOption<Integer> FIELD_LENGTH =
ConfigOptions.key(String.format("%s.#.%s", FIELDS, LENGTH))
.intType()
.defaultValue(100)
.withDescription(
"Size or length of the collection for generating char/varchar/string/array/map/multiset types.");

/** Placeholder {@link ConfigOption}. Not used for retrieving values. */
public static final ConfigOption<String> FIELD_START =
ConfigOptions.key(String.format("%s.#.%s", FIELDS, START))
.stringType()
.noDefaultValue()
.withDescription("Start value of sequence generator.");

/** Placeholder {@link ConfigOption}. Not used for retrieving values. */
public static final ConfigOption<String> FIELD_END =
ConfigOptions.key(String.format("%s.#.%s", FIELDS, END))
.stringType()
.noDefaultValue()
.withDescription("End value of sequence generator.");

private DataGenOptions() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,31 +46,6 @@
public class DataGenTableSourceFactory implements DynamicTableSourceFactory {

public static final String IDENTIFIER = "datagen";
public static final Long ROWS_PER_SECOND_DEFAULT_VALUE = 10000L;

public static final ConfigOption<Long> ROWS_PER_SECOND =
key("rows-per-second")
.longType()
.defaultValue(ROWS_PER_SECOND_DEFAULT_VALUE)
.withDescription("Rows per second to control the emit rate.");

public static final ConfigOption<Long> NUMBER_OF_ROWS =
key("number-of-rows")
.longType()
.noDefaultValue()
.withDescription(
"Total number of rows to emit. By default, the source is unbounded.");

public static final String FIELDS = "fields";
public static final String KIND = "kind";
public static final String START = "start";
public static final String END = "end";
public static final String MIN = "min";
public static final String MAX = "max";
public static final String LENGTH = "length";

public static final String SEQUENCE = "sequence";
public static final String RANDOM = "random";

@Override
public String factoryIdentifier() {
Expand All @@ -85,8 +60,17 @@ public Set<ConfigOption<?>> requiredOptions() {
@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(ROWS_PER_SECOND);
options.add(NUMBER_OF_ROWS);
options.add(DataGenOptions.ROWS_PER_SECOND);
options.add(DataGenOptions.NUMBER_OF_ROWS);

// Placeholder options
options.add(DataGenOptions.FIELD_KIND);
options.add(DataGenOptions.FIELD_MIN);
options.add(DataGenOptions.FIELD_MAX);
options.add(DataGenOptions.FIELD_LENGTH);
options.add(DataGenOptions.FIELD_START);
options.add(DataGenOptions.FIELD_END);

return options;
}

Expand All @@ -105,7 +89,9 @@ public DynamicTableSource createDynamicTableSource(Context context) {
DataType type = schema.getFieldDataTypes()[i];

ConfigOption<String> kind =
key(FIELDS + "." + name + "." + KIND).stringType().defaultValue(RANDOM);
key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.KIND)
.stringType()
.defaultValue(DataGenOptions.RANDOM);
DataGeneratorContainer container =
createContainer(name, type, options.get(kind), options);
fieldGenerators[i] = container.getGenerator();
Expand All @@ -118,8 +104,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {

Set<String> consumedOptionKeys = new HashSet<>();
consumedOptionKeys.add(CONNECTOR.key());
consumedOptionKeys.add(ROWS_PER_SECOND.key());
consumedOptionKeys.add(NUMBER_OF_ROWS.key());
consumedOptionKeys.add(DataGenOptions.ROWS_PER_SECOND.key());
consumedOptionKeys.add(DataGenOptions.NUMBER_OF_ROWS.key());
optionalOptions.stream().map(ConfigOption::key).forEach(consumedOptionKeys::add);
FactoryUtil.validateUnconsumedKeys(
factoryIdentifier(), options.keySet(), consumedOptionKeys);
Expand All @@ -129,16 +115,16 @@ public DynamicTableSource createDynamicTableSource(Context context) {
fieldGenerators,
name,
schema,
options.get(ROWS_PER_SECOND),
options.get(NUMBER_OF_ROWS));
options.get(DataGenOptions.ROWS_PER_SECOND),
options.get(DataGenOptions.NUMBER_OF_ROWS));
}

private DataGeneratorContainer createContainer(
String name, DataType type, String kind, ReadableConfig options) {
switch (kind) {
case RANDOM:
case DataGenOptions.RANDOM:
return type.getLogicalType().accept(new RandomGeneratorVisitor(name, options));
case SEQUENCE:
case DataGenOptions.SEQUENCE:
return type.getLogicalType().accept(new SequenceGeneratorVisitor(name, options));
default:
throw new ValidationException("Unsupported generator kind: " + kind);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.factories.DataGenOptions;
import org.apache.flink.table.factories.datagen.types.DataGeneratorMapper;
import org.apache.flink.table.factories.datagen.types.DecimalDataRandomGenerator;
import org.apache.flink.table.factories.datagen.types.RowDataGenerator;
Expand Down Expand Up @@ -55,10 +56,6 @@
import java.util.stream.Collectors;

import static org.apache.flink.configuration.ConfigOptions.key;
import static org.apache.flink.table.factories.DataGenTableSourceFactory.FIELDS;
import static org.apache.flink.table.factories.DataGenTableSourceFactory.LENGTH;
import static org.apache.flink.table.factories.DataGenTableSourceFactory.MAX;
import static org.apache.flink.table.factories.DataGenTableSourceFactory.MIN;

/** Creates a random {@link DataGeneratorContainer} for a particular logical type. */
@Internal
Expand All @@ -76,8 +73,8 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
public RandomGeneratorVisitor(String name, ReadableConfig config) {
super(name, config);

this.minKey = key(FIELDS + "." + name + "." + MIN);
this.maxKey = key(FIELDS + "." + name + "." + MAX);
this.minKey = key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.MIN);
this.maxKey = key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.MAX);
}

@Override
Expand All @@ -88,7 +85,7 @@ public DataGeneratorContainer visit(BooleanType booleanType) {
@Override
public DataGeneratorContainer visit(CharType booleanType) {
ConfigOption<Integer> lenOption =
key(FIELDS + "." + name + "." + LENGTH)
key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.LENGTH)
.intType()
.defaultValue(RANDOM_STRING_LENGTH_DEFAULT);
return DataGeneratorContainer.of(
Expand All @@ -98,7 +95,7 @@ public DataGeneratorContainer visit(CharType booleanType) {
@Override
public DataGeneratorContainer visit(VarCharType booleanType) {
ConfigOption<Integer> lenOption =
key(FIELDS + "." + name + "." + LENGTH)
key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.LENGTH)
.intType()
.defaultValue(RANDOM_STRING_LENGTH_DEFAULT);
return DataGeneratorContainer.of(
Expand Down Expand Up @@ -188,7 +185,7 @@ public DataGeneratorContainer visit(DayTimeIntervalType dayTimeIntervalType) {
@Override
public DataGeneratorContainer visit(ArrayType arrayType) {
ConfigOption<Integer> lenOption =
key(FIELDS + "." + name + "." + LENGTH)
key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.LENGTH)
.intType()
.defaultValue(RANDOM_COLLECTION_LENGTH_DEFAULT);

Expand All @@ -206,7 +203,7 @@ public DataGeneratorContainer visit(ArrayType arrayType) {
@Override
public DataGeneratorContainer visit(MultisetType multisetType) {
ConfigOption<Integer> lenOption =
key(FIELDS + "." + name + "." + LENGTH)
key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.LENGTH)
.intType()
.defaultValue(RANDOM_COLLECTION_LENGTH_DEFAULT);

Expand All @@ -228,7 +225,7 @@ public DataGeneratorContainer visit(MultisetType multisetType) {
@Override
public DataGeneratorContainer visit(MapType mapType) {
ConfigOption<Integer> lenOption =
key(FIELDS + "." + name + "." + LENGTH)
key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.LENGTH)
.intType()
.defaultValue(RANDOM_COLLECTION_LENGTH_DEFAULT);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.streaming.api.functions.source.datagen.SequenceGenerator;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.factories.DataGenOptions;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.CharType;
Expand All @@ -38,9 +39,6 @@
import org.apache.flink.table.types.logical.VarCharType;

import static org.apache.flink.configuration.ConfigOptions.key;
import static org.apache.flink.table.factories.DataGenTableSourceFactory.END;
import static org.apache.flink.table.factories.DataGenTableSourceFactory.FIELDS;
import static org.apache.flink.table.factories.DataGenTableSourceFactory.START;

/** Creates a sequential {@link DataGeneratorContainer} for a particular logical type. */
@Internal
Expand All @@ -65,8 +63,8 @@ public SequenceGeneratorVisitor(String name, ReadableConfig config) {

this.config = config;

this.startKeyStr = FIELDS + "." + name + "." + START;
this.endKeyStr = FIELDS + "." + name + "." + END;
this.startKeyStr = DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.START;
this.endKeyStr = DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.END;

ConfigOptions.OptionBuilder startKey = key(startKeyStr);
ConfigOptions.OptionBuilder endKey = key(endKeyStr);
Expand Down
Loading