Skip to content
This repository was archived by the owner on Aug 2, 2022. It is now read-only.
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
14 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import com.amazon.opendistroforelasticsearch.alerting.core.JobSweeper
import com.amazon.opendistroforelasticsearch.alerting.core.ScheduledJobIndices
import com.amazon.opendistroforelasticsearch.alerting.core.action.node.ScheduledJobsStatsAction
import com.amazon.opendistroforelasticsearch.alerting.core.action.node.ScheduledJobsStatsTransportAction
import com.amazon.opendistroforelasticsearch.alerting.core.model.LocalUriInput
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput
import com.amazon.opendistroforelasticsearch.alerting.core.resthandler.RestScheduledJobStatsHandler
Expand Down Expand Up @@ -200,7 +201,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
}

override fun getNamedXContent(): List<NamedXContentRegistry.Entry> {
return listOf(Monitor.XCONTENT_REGISTRY, SearchInput.XCONTENT_REGISTRY)
return listOf(Monitor.XCONTENT_REGISTRY, SearchInput.XCONTENT_REGISTRY, LocalUriInput.XCONTENT_REGISTRY)
}

override fun createComponents(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertError
import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertIndices
import com.amazon.opendistroforelasticsearch.alerting.alerts.moveAlerts
import com.amazon.opendistroforelasticsearch.alerting.core.JobRunner
import com.amazon.opendistroforelasticsearch.alerting.core.model.LocalUriInput
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.InjectorContextElement
Expand Down Expand Up @@ -56,8 +57,10 @@ import com.amazon.opendistroforelasticsearch.alerting.settings.DestinationSettin
import com.amazon.opendistroforelasticsearch.alerting.settings.DestinationSettings.Companion.loadDestinationSettings
import com.amazon.opendistroforelasticsearch.alerting.util.IndexUtils
import com.amazon.opendistroforelasticsearch.alerting.util.addUserBackendRolesFilter
import com.amazon.opendistroforelasticsearch.alerting.util.executeTransportAction
import com.amazon.opendistroforelasticsearch.alerting.util.isADMonitor
import com.amazon.opendistroforelasticsearch.alerting.util.isAllowed
import com.amazon.opendistroforelasticsearch.alerting.util.toMap
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
Expand Down Expand Up @@ -336,6 +339,13 @@ class MonitorRunner(
val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) }
results += searchResponse.convertToMap()
}
is LocalUriInput -> {
logger.debug("LocalUriInput path: ${input.toConstructedUri().path}")
val response = executeTransportAction(input, client)
results += withContext(Dispatchers.IO) {
response.toMap()
}
}
else -> {
throw IllegalArgumentException("Unsupported input type: ${input.name()}.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package com.amazon.opendistroforelasticsearch.alerting.model

import com.amazon.opendistroforelasticsearch.alerting.core.model.CronSchedule
import com.amazon.opendistroforelasticsearch.alerting.core.model.Input
import com.amazon.opendistroforelasticsearch.alerting.core.model.LocalUriInput
import com.amazon.opendistroforelasticsearch.alerting.core.model.Schedule
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput
Expand All @@ -25,6 +26,7 @@ import com.amazon.opendistroforelasticsearch.alerting.elasticapi.optionalTimeFie
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.optionalUserField
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.MONITOR_MAX_INPUTS
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.MONITOR_MAX_TRIGGERS
import com.amazon.opendistroforelasticsearch.alerting.settings.SupportedApiSettings
import com.amazon.opendistroforelasticsearch.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION
import com.amazon.opendistroforelasticsearch.alerting.util._ID
import com.amazon.opendistroforelasticsearch.alerting.util._VERSION
Expand Down Expand Up @@ -197,7 +199,11 @@ data class Monitor(
INPUTS_FIELD -> {
ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_ARRAY) {
inputs.add(Input.parse(xcp))
val input = Input.parse(xcp)
if (input is LocalUriInput) {
SupportedApiSettings.validateLocalUriInput(input)
}
inputs.add(input)
}
}
TRIGGERS_FIELD -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.amazon.opendistroforelasticsearch.alerting.settings

import com.amazon.opendistroforelasticsearch.alerting.core.model.LocalUriInput

/**
* A class that supports storing a unique set of API paths that can be accessed by general users.
*/
class SupportedApiSettings {
companion object {
const val CLUSTER_HEALTH_PATH = "/_cluster/health"
const val CLUSTER_STATS_PATH = "/_cluster/stats"

/**
* Each String represents the path to call an API.
* NOTE: Paths should conform to the following pattern:
* "/_cluster/health"
*
* Each Set<String> represents the supported JSON payload for the respective API.
*/
private var supportedApiList = HashMap<String, Map<String, Set<String>>>()

/**
* Set to TRUE to enable the supportedApiList check. Set to FALSE to disable.
*/
// TODO: Currently set to TRUE for testing purposes.
// Should likely be set to FALSE by default.
private var supportedApiListEnabled = true

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just curious: why do we need this?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is essentially an on/off switch that I implemented at the beginning of the project. If a user did not want to check the API called by the monitor against the allow list, this value could be set to FALSE. That would allow users to create monitors for any API that this feature support. At the moment, it doesn't serve much of a purpose; but it could be more valuable as more API become supported.

We could remove this value if we feel it's extraneous.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand this correctly this is something intended to be specified by the user and if this is set to false then they cannot use the LocalUrlInput feature.

I am not sure I understand how this is configurable by the end user at the moment. Do you think this can be defined as a cluster setting so user can turn it on off as it seems fit for them? or if this is not configurable at the moment suggest going with removing this as it serves no purpose and in future when needed to add the customization we can introduce the cluster setting

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If configured to false, users could still use the LocalUriInput feature; but the allow list wouldn't be checked for the requested API before creating and executing the monitor. At the moment, the allow list exposes both of the supported API; but as support for more API is added, a cluster owner could essentially configure this setting to false to allow access to any supported API, regardless of whether they are on the allow list.

Your comment about configuring via settings makes sense though. This will be easy enough to reimplement if there's a need in the future.

Removed this value.


init {
supportedApiList[CLUSTER_HEALTH_PATH] = hashMapOf()
supportedApiList[CLUSTER_STATS_PATH] = hashMapOf()
}

/**
* Returns the map of all supported json payload associated with the provided path from supportedApiList.
* @param path The path for the requested API.
* @return The map of all supported json payload for the requested API.
* @throws IllegalArgumentException When supportedApiList does not contain a value for the provided key.
*/
fun getSupportedJsonPayload(path: String): Map<String, Set<String>> {
return supportedApiList[path] ?: throw IllegalArgumentException("API path not in supportedApiList: $path")
}

/**
* If [supportedApiListEnabled] is TRUE, calls [validatePath] to confirm whether the provided path
* is in supportedApiList. Will otherwise take no actions.
* @param localUriInput The [LocalUriInput] to validate.
* @return The path that was validated.
*/
fun validateLocalUriInput(localUriInput: LocalUriInput): String {
Copy link

@thalurur thalurur Apr 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think it makes sense to rename this to something like resolveToUriPath` as this method seems to be doing more than just validation, its also seems to be converting LocalUriInput`` to a path.

Also do we really need to convert to a uri path? Looks the entity being used later on is transport action request do you think it makes sense to just build and return ActionRequest from this method instead, so callers don't need to worry about building the action request with all the correct attributes?

fun resolveToActionRequest(localUriInput: LocalUriInput): ActionRequest? {
  ....
} 

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense. I'll refactor this method to resolveToActionRequest and return the ActionRequest. Thank you for the feedback! I've also made the validatePath method public for use in the Monitor class.

val path = localUriInput.toConstructedUri().path
if (supportedApiListEnabled) validatePath(path)
return path
}

/**
* Confirms whether the provided path is in supportedApiList.
* Throws an exception if the provided path is not on the list; otherwise performs no action.
* @param path The path to validate.
* @throws IllegalArgumentException When supportedApiList does not contain the provided path.
*/
private fun validatePath(path: String) {
if (!supportedApiList.contains(path)) throw IllegalArgumentException("API path not in supportedApiList: $path")
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.amazon.opendistroforelasticsearch.alerting.util

import com.amazon.opendistroforelasticsearch.alerting.core.model.LocalUriInput
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.convertToMap
import com.amazon.opendistroforelasticsearch.alerting.settings.SupportedApiSettings
import com.amazon.opendistroforelasticsearch.alerting.settings.SupportedApiSettings.Companion.validateLocalUriInput
import org.elasticsearch.action.ActionResponse
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsRequest
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse
import org.elasticsearch.client.Client

fun executeTransportAction(localUriInput: LocalUriInput, client: Client): ActionResponse {
val path = validateLocalUriInput(localUriInput)
if (path == SupportedApiSettings.CLUSTER_HEALTH_PATH) {
return client.admin().cluster().health(ClusterHealthRequest()).get()
}
if (path == SupportedApiSettings.CLUSTER_STATS_PATH) {
return client.admin().cluster().clusterStats(ClusterStatsRequest()).get()
}
throw IllegalArgumentException("Unsupported API: $path")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor:
you can use when statement instead

when (path) {
   is SupportedApiSettings.CLUSTER_HEALTH_PATH -> return ..
   is SupportedApiSettings.CLUSTER_STATS_PATH -> return ...
   else -> throw IllegalArgumentException
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the tip! The syntax is a little different from your example due to some compiler warnings, but I refactored this to when.

}

fun ActionResponse.toMap(): Map<String, Any> {
if (this is ClusterHealthResponse) {
return this.convertToMap()
}
if (this is ClusterStatsResponse) {
return this.convertToMap()
}
throw IllegalArgumentException("Unsupported ActionResponse type: ${this.javaClass.name}")
Copy link

@thalurur thalurur Apr 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: using when

when (this) {
    is ClusterHealthResponse, is ClusterStatsResponse -> return this.convertToMap()
    else -> throw IllegalArgumentException
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the tip! The syntax is a little different from your example due to some compiler warnings, but I refactored this to when.

}
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,133 @@ class MonitorRunnerIT : AlertingRestTestCase() {
}
}

fun `test LocalUriInput monitor with ClusterHealth API`() {
// GIVEN
val path = "/_cluster/health"
val clusterIndex = randomInt(clusterHosts.size - 1)
val input = randomLocalUriInput(
scheme = clusterHosts[clusterIndex].schemeName,
host = clusterHosts[clusterIndex].hostName,
port = clusterHosts[clusterIndex].port,
path = path
)
val monitor = createMonitor(randomMonitor(inputs = listOf(input)))

// WHEN
val response = executeMonitor(monitor.id)

// THEN
val output = entityAsMap(response)
val inputResults = output.stringMap("input_results")
val resultsContent = (inputResults?.get("results") as ArrayList<*>)[0]
val errorMessage = inputResults["error"]

assertEquals(monitor.name, output["monitor_name"])
assertTrue("Monitor results should contain cluster_name, but found: $resultsContent",
resultsContent.toString().contains("cluster_name"))
assertNull("There should not be an error message, but found: $errorMessage", errorMessage)
}

fun `test LocalUriInput monitor with ClusterStats API`() {
// GIVEN
val path = "/_cluster/stats"
val clusterIndex = randomInt(clusterHosts.size - 1)
val input = randomLocalUriInput(
scheme = clusterHosts[clusterIndex].schemeName,
host = clusterHosts[clusterIndex].hostName,
port = clusterHosts[clusterIndex].port,
path = path
)
val monitor = createMonitor(randomMonitor(inputs = listOf(input)))

// WHEN
val response = executeMonitor(monitor.id)

// THEN
val output = entityAsMap(response)
val inputResults = output.stringMap("input_results")
val resultsContent = (inputResults?.get("results") as ArrayList<*>)[0]
val errorMessage = inputResults["error"]

assertEquals(monitor.name, output["monitor_name"])
assertTrue("Monitor results should contain cluster_name, but found: $resultsContent",
resultsContent.toString().contains("memory_size_in_bytes"))
assertNull("There should not be an error message, but found: $errorMessage", errorMessage)
}

fun `test LocalUriInput monitor with alert triggered`() {
// GIVEN
putAlertMappings()
val trigger = randomTrigger(condition = Script("""
return ctx.results[0].number_of_pending_tasks < 1
""".trimIndent()), destinationId = createDestination().id)
val path = "/_cluster/health"
val clusterIndex = randomInt(clusterHosts.size - 1)
val input = randomLocalUriInput(
scheme = clusterHosts[clusterIndex].schemeName,
host = clusterHosts[clusterIndex].hostName,
port = clusterHosts[clusterIndex].port,
path = path
)
val monitor = createMonitor(randomMonitor(inputs = listOf(input), triggers = listOf(trigger)))

// WHEN
val response = executeMonitor(monitor.id)

// THEN
val output = entityAsMap(response)
assertEquals(monitor.name, output["monitor_name"])

val triggerResults = output.objectMap("trigger_results").values
for (triggerResult in triggerResults) {
assertTrue("This triggerResult should be triggered: $triggerResult",
triggerResult.objectMap("action_results").isNotEmpty())
}

val alerts = searchAlerts(monitor)
assertEquals("Alert not saved, $output", 1, alerts.size)
verifyAlert(alerts.single(), monitor, ACTIVE)
}

fun `test LocalUriInput monitor with no alert triggered`() {
// GIVEN
putAlertMappings()
val trigger = randomTrigger(condition = Script("""
return ctx.results[0].status.equals("red")
""".trimIndent()))
val path = "/_cluster/stats"
val clusterIndex = randomInt(clusterHosts.size - 1)
val input = randomLocalUriInput(
scheme = clusterHosts[clusterIndex].schemeName,
host = clusterHosts[clusterIndex].hostName,
port = clusterHosts[clusterIndex].port,
path = path
)
val monitor = createMonitor(randomMonitor(inputs = listOf(input), triggers = listOf(trigger)))

// WHEN
val response = executeMonitor(monitor.id)

// THEN
val output = entityAsMap(response)
assertEquals(monitor.name, output["monitor_name"])

val triggerResults = output.objectMap("trigger_results").values
for (triggerResult in triggerResults) {
assertTrue("This triggerResult should not be triggered: $triggerResult",
triggerResult.objectMap("action_results").isEmpty())
}

val alerts = searchAlerts(monitor)
assertEquals("Alert saved for test monitor, output: $output", 0, alerts.size)
}

// TODO: Once an API is implemented that supports adding/removing entries on the
// SupportedApiSettings::supportedApiList, create an test that simulates executing
// a preexisting LocalUriInput monitor for an API that has been removed from the supportedApiList.
// This will likely involve adding an API to the list before creating the monitor, and then removing
// the API from the list before executing the monitor.

private fun prepareTestAnomalyResult(detectorId: String, user: User) {
val adResultIndex = ".opendistro-anomaly-results-history-2020.10.17"
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package com.amazon.opendistroforelasticsearch.alerting

import com.amazon.opendistroforelasticsearch.alerting.core.model.Input
import com.amazon.opendistroforelasticsearch.alerting.core.model.IntervalSchedule
import com.amazon.opendistroforelasticsearch.alerting.core.model.LocalUriInput
import com.amazon.opendistroforelasticsearch.alerting.core.model.Schedule
import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.string
Expand Down Expand Up @@ -229,6 +230,19 @@ fun randomUserEmpty(): User {
return User("", listOf(), listOf(), listOf())
}

fun randomLocalUriInput(
scheme: String = if (randomInt(3) >= 2) "http" else "https",
host: String = "localhost",
port: Int = randomInt(LocalUriInput.MAX_PORT),
path: String,
queryParams: Map<String, String> = hashMapOf(),
url: String = "",
connectionTimeout: Int = 1 + randomInt(LocalUriInput.MAX_CONNECTION_TIMEOUT - 1),
socketTimeout: Int = 1 + randomInt(LocalUriInput.MAX_SOCKET_TIMEOUT - 1)
): LocalUriInput {
return LocalUriInput(scheme, host, port, path, queryParams, url, connectionTimeout, socketTimeout)
}

fun EmailAccount.toJsonString(): String {
val builder = XContentFactory.jsonBuilder()
return this.toXContent(builder).string()
Expand Down
1 change: 1 addition & 0 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dependencies {
compile "org.elasticsearch.client:elasticsearch-rest-client:${es_version}"
compile 'com.google.googlejavaformat:google-java-format:1.3'
compile "com.amazon.opendistroforelasticsearch:common-utils:1.13.0.0"
compile 'commons-validator:commons-validator:1.7'

testImplementation "org.elasticsearch.test:framework:${es_version}"
testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
Expand Down
Loading