Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.exceptions.EventKeyNotFoundException;
import org.opensearch.dataprepper.model.failures.DlqObject;
import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
Expand Down Expand Up @@ -77,6 +78,7 @@
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexTemplateAPIWrapper;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexTemplateAPIWrapperFactory;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexType;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.TSDBDocumentBuilder;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.TemplateStrategy;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.ServerlessOptions;
import org.slf4j.Logger;
Expand Down Expand Up @@ -171,6 +173,8 @@ public class OpenSearchSink extends AbstractSink<Record<Event>> {

private ExistingDocumentQueryManager existingDocumentQueryManager;

private final TSDBDocumentBuilder tsdbDocumentBuilder;

private final ExecutorService queryExecutorService;

private final int processWorkerThreads;
Expand Down Expand Up @@ -219,6 +223,7 @@ public OpenSearchSink(final PluginSetting pluginSetting,
this.lastFlushTimeMap = new ConcurrentHashMap<>();
this.pluginConfigObservable = pluginConfigObservable;
this.objectMapper = new ObjectMapper();
this.tsdbDocumentBuilder = (this.indexType == IndexType.TSDB) ? new TSDBDocumentBuilder() : null;
this.queryExecutorService = openSearchSinkConfig.getIndexConfiguration().getQueryTerm() != null ?
Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("existing-document-query-manager")) : null;

Expand Down Expand Up @@ -470,6 +475,32 @@ public void doOutput(final Collection<Record<Event>> records) {
}

dataStreamIndex.ensureTimestamp(event, indexName);

if (indexType == IndexType.TSDB) {
try {
final List<String> tsdbDocs = tsdbDocumentBuilder.build(event);
final String tsdbAction = resolveEventAction(event);
final List<BulkOperationWrapper> wrappers = new ArrayList<>(tsdbDocs.size());
for (int i = 0; i < tsdbDocs.size(); i++) {
final SerializedJson doc = SerializedJson.fromStringAndOptionals(tsdbDocs.get(i), null, null, null);
final BulkOperation op = getBulkOperationForAction(tsdbAction, doc, null, indexName, null);
final BulkOperationWrapper wrapper = (i == tsdbDocs.size() - 1)
? new BulkOperationWrapper(op, event.getEventHandle(), null, null)
: new BulkOperationWrapper(op, (EventHandle) null, null, null);
wrappers.add(wrapper);
}
for (final BulkOperationWrapper wrapper : wrappers) {
bulkRequest = flushBatch(bulkRequest, wrapper, lastFlushTime);
bulkRequest.addOperation(wrapper);
}
} catch (final Exception e) {
LOG.error("Failed to build TSDB documents for event: {}", e.getMessage(), e);
dynamicIndexDroppedEvents.increment();
logFailureForDlqObjects(List.of(createDlqObjectFromEvent(event, indexName, e.getMessage())), e);
}
continue;
}

final SerializedJson document = getDocument(event);

Long version = null;
Expand Down Expand Up @@ -502,20 +533,7 @@ public void doOutput(final Collection<Record<Event>> records) {
}
}

String eventAction = action;
if (actions != null) {
for (final ActionConfiguration actionEntry: actions) {
final String condition = actionEntry.getWhen();
eventAction = actionEntry.getType();
if (condition != null &&
expressionEvaluator.evaluateConditional(condition, event)) {
break;
}
}
}
if (eventAction.contains("${")) {
eventAction = event.formatString(eventAction, expressionEvaluator);
}
String eventAction = resolveEventAction(event);

if (dataStreamDetector.isDataStream(indexName)) {
eventAction = dataStreamIndex.determineAction(eventAction, indexName);
Expand Down Expand Up @@ -623,7 +641,7 @@ void successfulOperationsHandler(final List<BulkOperationWrapper> successfulOper
if (bulkOperation.getEvent() != null) {
bulkOperation.getEvent().getEventHandle().release(true);
} else {
bulkOperation.getEventHandle().release(true);
bulkOperation.releaseEventHandle(true);
}
}
return;
Expand Down Expand Up @@ -776,6 +794,24 @@ private DlqObject createDlqObjectFromEvent(final Event event,
return builder.build();
}

private String resolveEventAction(final Event event) {
String resolvedAction = action;
if (actions != null) {
for (final ActionConfiguration actionEntry : actions) {
final String condition = actionEntry.getWhen();
resolvedAction = actionEntry.getType();
if (condition != null &&
expressionEvaluator.evaluateConditional(condition, event)) {
break;
}
}
}
if (resolvedAction.contains("${")) {
resolvedAction = event.formatString(resolvedAction, expressionEvaluator);
}
return resolvedAction;
}

/**
* This function is used for update and upsert bulk actions to determine whether the original JsonNode needs to be filtered down
* based on the user's sink configuration. If a new parameter manipulates the document before sending to OpenSearch, it needs to be added to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,8 @@ private Map<String, Object> readIndexTemplate(final String templateFile, final I
templateURL = loadExistingTemplate(templateType, IndexConstants.METRICS_DEFAULT_TEMPLATE_FILE);
} else if (indexType.equals(IndexType.METRIC_ANALYTICS_PLAIN)) {
templateURL = loadExistingTemplate(templateType, IndexConstants.METRICS_STANDARD_TEMPLATE_FILE);
} else if (indexType.equals(IndexType.TSDB)) {
templateURL = loadExistingTemplate(templateType, IndexConstants.TSDB_DEFAULT_TEMPLATE_FILE);
} else if (templateFile != null) {
if (templateFile.toLowerCase().startsWith(S3_PREFIX)) {
FileReader s3FileReader = new S3FileReader(s3Client);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class IndexConstants {
public static final String OTEL_APM_SERVICE_MAP_ISM_FILE_NO_ISM_TEMPLATE = "otel-v2-apm-service-map-policy-no-ism-template.json";
public static final String OTEL_APM_SERVICE_MAP_ISM_FILE_WITH_ISM_TEMPLATE = "otel-v2-apm-service-map-policy-with-ism-template.json";

public static final String TSDB_DEFAULT_TEMPLATE_FILE = "tsdb-index-template.json";

static {
// TODO: extract out version number into version enum
TYPE_TO_DEFAULT_ALIAS.put(IndexType.TRACE_ANALYTICS_SERVICE_MAP, "otel-v1-apm-service-map");
Expand All @@ -52,5 +54,6 @@ public class IndexConstants {
TYPE_TO_DEFAULT_ALIAS.put(IndexType.LOG_ANALYTICS_PLAIN, "logs-otel-v1");
TYPE_TO_DEFAULT_ALIAS.put(IndexType.METRIC_ANALYTICS, "metrics-otel-v1");
TYPE_TO_DEFAULT_ALIAS.put(IndexType.METRIC_ANALYTICS_PLAIN, "metrics-otel-v1");
TYPE_TO_DEFAULT_ALIAS.put(IndexType.TSDB, "metrics-tsdb-v1");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public final IndexManager getIndexManager(final IndexType indexType,
restHighLevelClient, openSearchClient, openSearchSinkConfiguration, clusterSettingsParser, templateStrategy, indexAlias);
break;
case TRACE_ANALYTICS_SERVICE_MAP:
case TSDB:
indexManager = new TraceAnalyticsServiceMapIndexManager(
restHighLevelClient, openSearchClient, openSearchSinkConfiguration, clusterSettingsParser, templateStrategy, indexAlias);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public enum IndexType {
LOG_ANALYTICS_PLAIN("log-analytics-plain"),
METRIC_ANALYTICS("metric-analytics"),
METRIC_ANALYTICS_PLAIN("metric-analytics-plain"),
TSDB("tsdb"),
CUSTOM("custom"),
MANAGEMENT_DISABLED("management_disabled");

Expand Down
Loading
Loading