Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,14 @@ public Optional<Integer> getParallelism() {
return config.getOptional(SINK_PARALLELISM);
}

public String getIndexSuffixFieldName() {
return config.get(ElasticsearchConnectorOptions.INDEX_SUFFIX_FIELD_NAME_OPTION);
}

public int getIndexSuffixFieldLength() {
return config.get(ElasticsearchConnectorOptions.INDEX_SUFFIX_FIELD_LENGTH_OPTION);
}

/**
* Parse Hosts String to list.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,16 @@ public class ElasticsearchConnectorOptions {
.enumType(DeliveryGuarantee.class)
.defaultValue(DeliveryGuarantee.AT_LEAST_ONCE)
.withDescription("Optional delivery guarantee when committing.");

public static final ConfigOption<String> INDEX_SUFFIX_FIELD_NAME_OPTION =
ConfigOptions.key("index.suffix.field.name")
.stringType()
.noDefaultValue()
.withDescription("The index suffix field name");

public static final ConfigOption<Integer> INDEX_SUFFIX_FIELD_LENGTH_OPTION =
ConfigOptions.key("index.suffix.field.length")
.intType()
.defaultValue(-1)
.withDescription("The length(exclusive) of index suffix field value");
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ Function<RowData, String> createKeyExtractor() {

IndexGenerator createIndexGenerator() {
return IndexGeneratorFactory.createIndexGenerator(
config.getIndex(),
config,
DataType.getFieldNames(physicalRowDataType),
DataType.getFieldDataTypes(physicalRowDataType),
localTimeZoneId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@
import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.FORMAT_OPTION;
import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.HOSTS_OPTION;
import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION;
import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_SUFFIX_FIELD_LENGTH_OPTION;
import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_SUFFIX_FIELD_NAME_OPTION;
import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION;
import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION;
import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.SOCKET_TIMEOUT;
Expand Down Expand Up @@ -286,7 +288,9 @@ public Set<ConfigOption<?>> optionalOptions() {
PARTIAL_CACHE_EXPIRE_AFTER_WRITE,
PARTIAL_CACHE_MAX_ROWS,
PARTIAL_CACHE_CACHE_MISSING_KEY,
MAX_RETRIES)
MAX_RETRIES,
INDEX_SUFFIX_FIELD_LENGTH_OPTION,
INDEX_SUFFIX_FIELD_NAME_OPTION)
.collect(Collectors.toSet());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,56 @@ public static IndexGenerator createIndexGenerator(
return createIndexGenerator(index, fieldNames, dataTypes, ZoneId.systemDefault());
}

public static IndexGenerator createIndexGenerator(
ElasticsearchConfiguration config,
List<String> fieldNames,
List<DataType> dataTypes,
ZoneId localTimeZoneId) {
if (config.getIndexSuffixFieldName() != null) {
return createSuffixIndexGenerator(
config.getIndex(),
config.getIndexSuffixFieldName(),
config.getIndexSuffixFieldLength(),
fieldNames,
dataTypes);
} else {
return createIndexGenerator(config.getIndex(), fieldNames, dataTypes, localTimeZoneId);
}
}

private static IndexGenerator createSuffixIndexGenerator(
String indexPrefix,
String indexSuffixFieldName,
int indexSuffixFieldLength,
List<String> fieldNames,
List<DataType> fieldTypes) {
int indexFieldPos = fieldNames.indexOf(indexSuffixFieldName);
if (indexFieldPos < 0) {
throw new TableException(
String.format(
"Unknown index field '%s' of '%s', please check the field name.",
indexSuffixFieldName, String.join(",", fieldNames)));
}
final LogicalType indexFieldType = fieldTypes.get(indexFieldPos).getLogicalType();
final RowData.FieldGetter fieldGetter =
RowData.createFieldGetter(indexFieldType, indexFieldPos);
return row -> {
Object fieldOrNull = fieldGetter.getFieldOrNull(row);
final String indexSuffix;
if (fieldOrNull != null) {
if (indexSuffixFieldLength > 0) {
indexSuffix = String.valueOf(fieldOrNull).substring(0, indexSuffixFieldLength);
} else {
indexSuffix = String.valueOf(fieldOrNull);
}
} else {
throw new RuntimeException(
"Index suffix field " + indexSuffixFieldName + " is null");
}
return String.format("%s%s", indexPrefix, indexSuffix);
};
}

interface DynamicFormatter extends Serializable {
String format(@Nonnull Object fieldValue, DateTimeFormatter formatter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.connector.elasticsearch.table;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.GenericRowData;
Expand All @@ -39,6 +40,9 @@
import java.util.Arrays;
import java.util.List;

import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION;
import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_SUFFIX_FIELD_LENGTH_OPTION;
import static org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_SUFFIX_FIELD_NAME_OPTION;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
import static org.junit.jupiter.api.Assumptions.assumingThat;
Expand Down Expand Up @@ -367,4 +371,69 @@ public void testUnsupportedIndexFieldType() {
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(expectedExceptionMsg);
}

@Test
public void testSuffixIndexGenerator() {
Configuration config = new Configuration();
config.set(INDEX_OPTION, "index_");
config.set(INDEX_SUFFIX_FIELD_NAME_OPTION, "log_ts");
config.set(INDEX_SUFFIX_FIELD_LENGTH_OPTION, 10);
IndexGenerator indexGenerator =
IndexGeneratorFactory.createIndexGenerator(
new ElasticsearchConfiguration(config),
fieldNames,
dataTypes,
ZoneId.systemDefault());
// 1584504734000
assertThat(indexGenerator.generate(rows.get(0))).isEqualTo("index_1584504734");
// 1584591734000
assertThat(indexGenerator.generate(rows.get(1))).isEqualTo("index_1584591734");
}

@Test
public void testSuffixIndexGeneratorWithoutLengthLimitation() {
Configuration config = new Configuration();
config.set(INDEX_OPTION, "index_");
config.set(INDEX_SUFFIX_FIELD_NAME_OPTION, "log_ts");
IndexGenerator indexGenerator =
IndexGeneratorFactory.createIndexGenerator(
new ElasticsearchConfiguration(config),
fieldNames,
dataTypes,
ZoneId.systemDefault());
// 1584504734000
assertThat(indexGenerator.generate(rows.get(0))).isEqualTo("index_1584504734000");
// 1584591734000
assertThat(indexGenerator.generate(rows.get(1))).isEqualTo("index_1584591734000");
}

@Test
public void testStaticIndexCompatibility() {
Configuration config = new Configuration();
config.set(INDEX_OPTION, "my-index");
IndexGenerator indexGenerator =
IndexGeneratorFactory.createIndexGenerator(
new ElasticsearchConfiguration(config),
fieldNames,
dataTypes,
ZoneId.systemDefault());
indexGenerator.open();
assertThat(indexGenerator.generate(rows.get(0))).isEqualTo("my-index");
assertThat(indexGenerator.generate(rows.get(1))).isEqualTo("my-index");
}

@Test
public void testDynamicIndexFromDateCompatibility() {
Configuration config = new Configuration();
config.set(INDEX_OPTION, "my-index-{log_date|yyyy/MM/dd}");
IndexGenerator indexGenerator =
IndexGeneratorFactory.createIndexGenerator(
new ElasticsearchConfiguration(config),
fieldNames,
dataTypes,
ZoneId.systemDefault());
indexGenerator.open();
assertThat(indexGenerator.generate(rows.get(0))).isEqualTo("my-index-2020/03/18");
assertThat(indexGenerator.generate(rows.get(1))).isEqualTo("my-index-2020/03/19");
}
}