Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 37 additions & 6 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,15 @@ dependencies {
implementation group: 'io.protostuff', name: 'protostuff-collectionschema', version: '1.8.0'
implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.17.0'

// Multi-tenant SDK Client
if (System.getenv('REMOTE_METADATA_SDK_IMPL') == 'ddb-client') {
api("org.opensearch:opensearch-remote-metadata-sdk:${opensearch_build}")
implementation("org.opensearch:opensearch-remote-metadata-sdk-ddb-client:${opensearch_build}") {
exclude group: "jakarta.json", module: "jakarta.json-api"
}
} else {
implementation("org.opensearch:opensearch-remote-metadata-sdk:${opensearch_build}")
}

implementation "org.jacoco:org.jacoco.agent:0.8.12"
implementation ("org.jacoco:org.jacoco.ant:0.8.12") {
Expand Down Expand Up @@ -219,8 +228,8 @@ allprojects {
}

java {
targetCompatibility = JavaVersion.VERSION_21
sourceCompatibility = JavaVersion.VERSION_21
targetCompatibility = JavaVersion.VERSION_21
sourceCompatibility = JavaVersion.VERSION_21
}

ext {
Expand Down Expand Up @@ -272,6 +281,28 @@ configurations.all {
force("com.fasterxml.jackson.core:jackson-core:${jacksonVersion}")
force "org.eclipse.platform:org.eclipse.core.runtime:3.29.0" // CVE for < 3.29.0
force "org.ow2.asm:asm:9.7.1"

// Additional forced dependencies for remote metadata SDK when using DDB client
if (System.getenv('REMOTE_METADATA_SDK_IMPL') == 'ddb-client') {
// OpenSearch Java client brings in different versions of the below dependencies.
// Needed for runtime in SDK but conflict with OpenSearch 3.x test dependencies.
// Jackson dependencies
force("com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}")
force("com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}")
force("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${versions.jackson_databind}")

// OpenSearch client dependencies
force("org.opensearch.client:opensearch-rest-client:${opensearch_version}")

// Jakarta and Eclipse dependencies
force("jakarta.json.bind:jakarta.json.bind-api:${jakartaJsonBindVersion}")
force("org.eclipse:yasson:${yassonVersion}")
force("org.eclipse.parsson:parsson:${parssonVersion}")
force("org.glassfish:jakarta.json:${jakartaJsonVersion}")

// Apache and Commons dependencies
force("org.apache.httpcomponents:httpcore:${versions.httpcore}")
}
}
}

Expand Down Expand Up @@ -837,7 +868,7 @@ task "${baseName}#mixedClusterTask"(type: RestIntegTestTask) {
useCluster testClusters."${baseName}0"
dependsOn "${baseName}#oldVersionClusterTask0"
doFirst {
testClusters."${baseName}0".upgradeNodeAndPluginToNextVersion(plugins)
testClusters."${baseName}0".upgradeNodeAndPluginToNextVersion(plugins)
}
filter {
includeTestsMatching "org.opensearch.ad.bwc.*IT"
Expand All @@ -856,7 +887,7 @@ task "${baseName}#twoThirdsUpgradedClusterTask"(type: RestIntegTestTask) {
dependsOn "${baseName}#mixedClusterTask"
useCluster testClusters."${baseName}0"
doFirst {
testClusters."${baseName}0".upgradeNodeAndPluginToNextVersion(plugins)
testClusters."${baseName}0".upgradeNodeAndPluginToNextVersion(plugins)
}
filter {
includeTestsMatching "org.opensearch.ad.bwc.*IT"
Expand All @@ -875,7 +906,7 @@ task "${baseName}#rollingUpgradeClusterTask"(type: RestIntegTestTask) {
dependsOn "${baseName}#twoThirdsUpgradedClusterTask"
useCluster testClusters."${baseName}0"
doFirst {
testClusters."${baseName}0".upgradeNodeAndPluginToNextVersion(plugins)
testClusters."${baseName}0".upgradeNodeAndPluginToNextVersion(plugins)
}
filter {
includeTestsMatching "org.opensearch.ad.bwc.*IT"
Expand All @@ -894,7 +925,7 @@ task "${baseName}#fullRestartClusterTask"(type: RestIntegTestTask) {
dependsOn "${baseName}#oldVersionClusterTask1"
useCluster testClusters."${baseName}1"
doFirst {
testClusters."${baseName}1".upgradeAllNodesAndPluginsToNextVersion(plugins)
testClusters."${baseName}1".upgradeAllNodesAndPluginsToNextVersion(plugins)
}
filter {
includeTestsMatching "org.opensearch.ad.bwc.*IT"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.rest.RestResponse;
import org.opensearch.rest.action.RestResponseListener;
import org.opensearch.timeseries.TimeSeriesAnalyticsPlugin;
import org.opensearch.timeseries.util.TenantAwareHelper;
import org.opensearch.transport.client.node.NodeClient;

import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -89,7 +90,10 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
detectorId = AnomalyDetector.NO_ID;
}

String tenantId = TenantAwareHelper.getTenantID(ADEnabledSetting.isADMultiTenancyEnabled(), request);

IndexAnomalyDetectorRequest indexAnomalyDetectorRequest = new IndexAnomalyDetectorRequest(
tenantId,
detectorId,
seqNo,
primaryTerm,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.remote.metadata.client.SdkClient;
import org.opensearch.rest.RestRequest;
import org.opensearch.timeseries.AnalysisType;
import org.opensearch.timeseries.common.exception.TimeSeriesException;
Expand Down Expand Up @@ -127,6 +128,8 @@ public abstract class AbstractAnomalyDetectorActionHandler<T extends ActionRespo
* @param isDryRun Whether handler is dryrun or not
* @param clock clock object to know when to timeout
* @param settings Node settings
* @param sdkClient remote metadata client
* @param tenantId tenant id
*/
public AbstractAnomalyDetectorActionHandler(
ClusterService clusterService,
Expand All @@ -152,7 +155,9 @@ public AbstractAnomalyDetectorActionHandler(
String validationType,
boolean isDryRun,
Clock clock,
Settings settings
Settings settings,
SdkClient sdkClient,
String tenantId
) {
super(
anomalyDetector,
Expand Down Expand Up @@ -183,7 +188,9 @@ public AbstractAnomalyDetectorActionHandler(
clock,
settings,
ValidationAspect.DETECTOR,
ADCommonName.CONFIG_INDEX
ADCommonName.CONFIG_INDEX,
sdkClient,
tenantId
);

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.commons.authuser.User;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.remote.metadata.client.SdkClient;
import org.opensearch.rest.RestRequest;
import org.opensearch.timeseries.feature.SearchFeatureDao;
import org.opensearch.timeseries.util.SecurityClientUtil;
Expand Down Expand Up @@ -58,6 +59,8 @@ public class IndexAnomalyDetectorActionHandler extends AbstractAnomalyDetectorAc
* @param adTaskManager AD Task manager
* @param searchFeatureDao Search feature dao
* @param settings Node settings
* @param sdkClient remote metadata client
* @param tenantId tenant id
*/
public IndexAnomalyDetectorActionHandler(
ClusterService clusterService,
Expand All @@ -80,7 +83,9 @@ public IndexAnomalyDetectorActionHandler(
User user,
ADTaskManager adTaskManager,
SearchFeatureDao searchFeatureDao,
Settings settings
Settings settings,
SdkClient sdkClient,
String tenantId
) {
super(
clusterService,
Expand All @@ -106,7 +111,9 @@ public IndexAnomalyDetectorActionHandler(
null,
false,
null,
settings
settings,
sdkClient,
tenantId
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.commons.authuser.User;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.remote.metadata.client.SdkClient;
import org.opensearch.rest.RestRequest;
import org.opensearch.timeseries.feature.SearchFeatureDao;
import org.opensearch.timeseries.model.Config;
Expand Down Expand Up @@ -52,6 +53,8 @@ public class ValidateAnomalyDetectorActionHandler extends AbstractAnomalyDetecto
* @param validationType Specified type for validation
* @param clock Clock object to know when to timeout
* @param settings Node settings
* @param sdkClient remote metadata client
* @param tenantId tenant id
*/
public ValidateAnomalyDetectorActionHandler(
ClusterService clusterService,
Expand All @@ -70,7 +73,9 @@ public ValidateAnomalyDetectorActionHandler(
SearchFeatureDao searchFeatureDao,
String validationType,
Clock clock,
Settings settings
Settings settings,
SdkClient sdkClient,
String tenantId
) {
super(
clusterService,
Expand All @@ -96,7 +101,9 @@ public ValidateAnomalyDetectorActionHandler(
validationType,
true,
clock,
settings
settings,
sdkClient,
tenantId
);
}
}
42 changes: 42 additions & 0 deletions src/main/java/org/opensearch/ad/settings/ADEnabledSetting.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,40 @@ public class ADEnabledSetting extends DynamicNumericSetting {

public static final String DOOR_KEEPER_IN_CACHE_ENABLED = "plugins.anomaly_detection.door_keeper_in_cache.enabled";

/**
* Indicates whether multi-tenancy is enabled in Anomaly Detection.
*
* This is a static setting that must be configured before starting OpenSearch. The corresponding setting {@code plugins.ml_commons.multi_tenancy_enabled} in the ML Commons plugin should match.
*
* It can be set in the following ways, in priority order:
*
* <ol>
* <li>As a command-line argument using the <code>-E</code> flag (this overrides other options):
* <pre>
* ./bin/opensearch -Eplugins.anomaly_detection.multi_tenancy_enabled=true
* </pre>
* </li>
* <li>As a system property using <code>OPENSEARCH_JAVA_OPTS</code> (this overrides <code>opensearch.yml</code>):
* <pre>
* export OPENSEARCH_JAVA_OPTS="-Dplugins.anomaly_detection.multi_tenancy_enabled=true"
* ./bin/opensearch
* </pre>
* Or inline when starting OpenSearch:
* <pre>
* OPENSEARCH_JAVA_OPTS="-Dplugins.anomaly_detection.multi_tenancy_enabled=true" ./bin/opensearch
* </pre>
* </li>
* <li>In the <code>opensearch.yml</code> configuration file:
* <pre>
* plugins.anomaly_detection.multi_tenancy_enabled: true
* </pre>
* </li>
* </ol>
*
* After setting this option, a full cluster restart is required for the changes to take effect.
*/
public static final String AD_MULTI_TENANCY_ENABLED = "plugins.anomaly_detection.multi_tenancy_enabled";

public static final Map<String, Setting<?>> settings = unmodifiableMap(new HashMap<String, Setting<?>>() {
{
Setting LegacyADEnabledSetting = Setting.boolSetting(LEGACY_OPENDISTRO_AD_ENABLED, true, NodeScope, Dynamic, Deprecated);
Expand Down Expand Up @@ -131,4 +165,12 @@ public static boolean isInterpolationInColdStartEnabled() {
public static boolean isDoorKeeperInCacheEnabled() {
return ADEnabledSetting.getInstance().getSettingValue(ADEnabledSetting.DOOR_KEEPER_IN_CACHE_ENABLED);
}

/**
* Whether AD multi-tenancy is enabled or not.
* @return whether AD multi-tenancy is enabled.
*/
public static boolean isADMultiTenancyEnabled() {
return ADEnabledSetting.getInstance().getSettingValue(ADEnabledSetting.AD_MULTI_TENANCY_ENABLED);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

package org.opensearch.ad.settings;

import static org.opensearch.remote.metadata.common.CommonValue.*;

import org.opensearch.common.settings.Setting;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.timeseries.settings.TimeSeriesSettings;
Expand Down Expand Up @@ -671,4 +673,20 @@ private AnomalyDetectorSettings() {}
// The reason we need a max is because user could give an arbitrarly large interval where we don't know even
// with multiplying the interval down how many intervals will be tried.
public static final int MAX_TIMES_DECREASING_INTERVAL = 10;

/** This setting sets the remote metadata type */
public static final Setting<String> REMOTE_METADATA_TYPE = Setting
.simpleString("plugins.anomaly_detection." + REMOTE_METADATA_TYPE_KEY, Setting.Property.NodeScope, Setting.Property.Final);

/** This setting sets the remote metadata endpoint */
public static final Setting<String> REMOTE_METADATA_ENDPOINT = Setting
.simpleString("plugins.anomaly_detection." + REMOTE_METADATA_ENDPOINT_KEY, Setting.Property.NodeScope, Setting.Property.Final);

/** This setting sets the remote metadata region */
public static final Setting<String> REMOTE_METADATA_REGION = Setting
.simpleString("plugins.anomaly_detection." + REMOTE_METADATA_REGION_KEY, Setting.Property.NodeScope, Setting.Property.Final);

/** This setting sets the remote metadata service name */
public static final Setting<String> REMOTE_METADATA_SERVICE_NAME = Setting
.simpleString("plugins.anomaly_detection." + REMOTE_METADATA_SERVICE_NAME_KEY, Setting.Property.NodeScope, Setting.Property.Final);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

public class IndexAnomalyDetectorRequest extends ActionRequest {

private String tenantId;
private String detectorID;
private long seqNo;
private long primaryTerm;
Expand All @@ -40,6 +41,7 @@ public class IndexAnomalyDetectorRequest extends ActionRequest {

public IndexAnomalyDetectorRequest(StreamInput in) throws IOException {
super(in);
tenantId = in.readOptionalString();
detectorID = in.readString();
seqNo = in.readLong();
primaryTerm = in.readLong();
Expand All @@ -54,6 +56,7 @@ public IndexAnomalyDetectorRequest(StreamInput in) throws IOException {
}

public IndexAnomalyDetectorRequest(
String tenantId,
String detectorID,
long seqNo,
long primaryTerm,
Expand All @@ -67,6 +70,7 @@ public IndexAnomalyDetectorRequest(
Integer maxCategoricalFields
) {
super();
this.tenantId = tenantId;
this.detectorID = detectorID;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
Expand All @@ -80,6 +84,10 @@ public IndexAnomalyDetectorRequest(
this.maxCategoricalFields = maxCategoricalFields;
}

public String getTenantId() {
return tenantId;
}

public String getDetectorID() {
return detectorID;
}
Expand Down Expand Up @@ -127,6 +135,7 @@ public Integer getMaxCategoricalFields() {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalString(tenantId);
out.writeString(detectorID);
out.writeLong(seqNo);
out.writeLong(primaryTerm);
Expand Down
Loading
Loading