Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.InternalEventHandle;
import org.opensearch.dataprepper.model.event.exceptions.EventKeyNotFoundException;
import org.opensearch.dataprepper.model.failures.DlqObject;
import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
Expand Down Expand Up @@ -77,6 +79,8 @@
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexTemplateAPIWrapper;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexTemplateAPIWrapperFactory;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexType;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.CustomDocumentBuilder;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.CustomDocumentBuilderFactory;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.TemplateStrategy;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.ServerlessOptions;
import org.slf4j.Logger;
Expand Down Expand Up @@ -171,6 +175,8 @@ public class OpenSearchSink extends AbstractSink<Record<Event>> {

private ExistingDocumentQueryManager existingDocumentQueryManager;

private final CustomDocumentBuilder customDocumentBuilder;

private final ExecutorService queryExecutorService;

private final int processWorkerThreads;
Expand Down Expand Up @@ -219,6 +225,7 @@ public OpenSearchSink(final PluginSetting pluginSetting,
this.lastFlushTimeMap = new ConcurrentHashMap<>();
this.pluginConfigObservable = pluginConfigObservable;
this.objectMapper = new ObjectMapper();
this.customDocumentBuilder = new CustomDocumentBuilderFactory().create(this.indexType);
this.queryExecutorService = openSearchSinkConfig.getIndexConfiguration().getQueryTerm() != null ?
Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("existing-document-query-manager")) : null;

Expand Down Expand Up @@ -470,6 +477,32 @@ public void doOutput(final Collection<Record<Event>> records) {
}

dataStreamIndex.ensureTimestamp(event, indexName);

if (customDocumentBuilder != null) {
try {
final List<String> tsdbDocs = customDocumentBuilder.buildDocuments(event);
final String tsdbAction = resolveEventAction(event);
final EventHandle eventHandle = event.getEventHandle();
if (tsdbDocs.size() > 1 && eventHandle instanceof InternalEventHandle) {
for (int i = 0; i < tsdbDocs.size() - 1; i++) {
((InternalEventHandle) eventHandle).acquireReference();
}
}
for (final String tsdbDoc : tsdbDocs) {
final SerializedJson doc = SerializedJson.fromStringAndOptionals(tsdbDoc, null, null, null);
final BulkOperation op = getBulkOperationForAction(tsdbAction, doc, null, indexName, null);
final BulkOperationWrapper wrapper = new BulkOperationWrapper(op, eventHandle, null, null);
bulkRequest = flushBatch(bulkRequest, wrapper, lastFlushTime);
bulkRequest.addOperation(wrapper);
}
} catch (final Exception e) {
LOG.error("Failed to build TSDB documents for event: {}", e.getMessage(), e);
dynamicIndexDroppedEvents.increment();
logFailureForDlqObjects(List.of(createDlqObjectFromEvent(event, indexName, e.getMessage())), e);
}
continue;
}

final SerializedJson document = getDocument(event);

Long version = null;
Expand Down Expand Up @@ -502,20 +535,7 @@ public void doOutput(final Collection<Record<Event>> records) {
}
}

String eventAction = action;
if (actions != null) {
for (final ActionConfiguration actionEntry: actions) {
final String condition = actionEntry.getWhen();
eventAction = actionEntry.getType();
if (condition != null &&
expressionEvaluator.evaluateConditional(condition, event)) {
break;
}
}
}
if (eventAction.contains("${")) {
eventAction = event.formatString(eventAction, expressionEvaluator);
}
String eventAction = resolveEventAction(event);

if (dataStreamDetector.isDataStream(indexName)) {
eventAction = dataStreamIndex.determineAction(eventAction, indexName);
Expand Down Expand Up @@ -623,7 +643,7 @@ void successfulOperationsHandler(final List<BulkOperationWrapper> successfulOper
if (bulkOperation.getEvent() != null) {
bulkOperation.getEvent().getEventHandle().release(true);
} else {
bulkOperation.getEventHandle().release(true);
bulkOperation.releaseEventHandle(true);
}
}
return;
Expand Down Expand Up @@ -776,6 +796,24 @@ private DlqObject createDlqObjectFromEvent(final Event event,
return builder.build();
}

private String resolveEventAction(final Event event) {
String resolvedAction = action;
if (actions != null) {
for (final ActionConfiguration actionEntry : actions) {
final String condition = actionEntry.getWhen();
resolvedAction = actionEntry.getType();
if (condition != null &&
expressionEvaluator.evaluateConditional(condition, event)) {
break;
}
}
}
if (resolvedAction.contains("${")) {
resolvedAction = event.formatString(resolvedAction, expressionEvaluator);
}
return resolvedAction;
}

/**
* This function is used for update and upsert bulk actions to determine whether the original JsonNode needs to be filtered down
* based on the user's sink configuration. If a new parameter manipulates the document before sending to OpenSearch, it needs to be added to
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.dataprepper.plugins.sink.opensearch.index;

import org.opensearch.dataprepper.model.event.Event;

import java.util.List;

/**
* Converts a Data Prepper {@link Event} into one or more JSON document strings
* suitable for indexing into OpenSearch. Implementations handle index-type-specific
* document transformations such as TSDB metric expansion.
*/
public interface CustomDocumentBuilder {
List<String> buildDocuments(Event event);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.dataprepper.plugins.sink.opensearch.index;

public final class CustomDocumentBuilderFactory {

public CustomDocumentBuilder create(final IndexType indexType) {
if (indexType == IndexType.TSDB) {
return new TSDBDocumentBuilder();
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,8 @@ private Map<String, Object> readIndexTemplate(final String templateFile, final I
templateURL = loadExistingTemplate(templateType, IndexConstants.METRICS_DEFAULT_TEMPLATE_FILE);
} else if (indexType.equals(IndexType.METRIC_ANALYTICS_PLAIN)) {
templateURL = loadExistingTemplate(templateType, IndexConstants.METRICS_STANDARD_TEMPLATE_FILE);
} else if (indexType.equals(IndexType.TSDB)) {
templateURL = loadExistingTemplate(templateType, IndexConstants.TSDB_DEFAULT_TEMPLATE_FILE);
} else if (templateFile != null) {
if (templateFile.toLowerCase().startsWith(S3_PREFIX)) {
FileReader s3FileReader = new S3FileReader(s3Client);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class IndexConstants {
public static final String OTEL_APM_SERVICE_MAP_ISM_FILE_NO_ISM_TEMPLATE = "otel-v2-apm-service-map-policy-no-ism-template.json";
public static final String OTEL_APM_SERVICE_MAP_ISM_FILE_WITH_ISM_TEMPLATE = "otel-v2-apm-service-map-policy-with-ism-template.json";

public static final String TSDB_DEFAULT_TEMPLATE_FILE = "tsdb-index-template.json";

static {
// TODO: extract out version number into version enum
TYPE_TO_DEFAULT_ALIAS.put(IndexType.TRACE_ANALYTICS_SERVICE_MAP, "otel-v1-apm-service-map");
Expand All @@ -52,5 +54,6 @@ public class IndexConstants {
TYPE_TO_DEFAULT_ALIAS.put(IndexType.LOG_ANALYTICS_PLAIN, "logs-otel-v1");
TYPE_TO_DEFAULT_ALIAS.put(IndexType.METRIC_ANALYTICS, "metrics-otel-v1");
TYPE_TO_DEFAULT_ALIAS.put(IndexType.METRIC_ANALYTICS_PLAIN, "metrics-otel-v1");
TYPE_TO_DEFAULT_ALIAS.put(IndexType.TSDB, "metrics-tsdb-v1");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public enum IndexType {
LOG_ANALYTICS_PLAIN("log-analytics-plain"),
METRIC_ANALYTICS("metric-analytics"),
METRIC_ANALYTICS_PLAIN("metric-analytics-plain"),
TSDB("tsdb"),
CUSTOM("custom"),
MANAGEMENT_DISABLED("management_disabled");

Expand Down
Loading
Loading