Skip to content
This repository was archived by the owner on Aug 2, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexstatemanagement.action

import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.ActionConfig.ActionType
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.IndexPriorityActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step
import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.indexpriority.AttemptSetIndexPriorityStep
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService

class IndexPriorityAction(
clusterService: ClusterService,
client: Client,
managedIndexMetaData: ManagedIndexMetaData,
config: IndexPriorityActionConfig
) : Action(ActionType.INDEX_PRIORITY, config, managedIndexMetaData) {

private val attemptSetIndexPriorityStep = AttemptSetIndexPriorityStep(clusterService, client, config, managedIndexMetaData)
private val steps = listOf(attemptSetIndexPriorityStep)

override fun getSteps(): List<Step> = steps

override fun getStepToExecute(): Step = attemptSetIndexPriorityStep
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ abstract class ActionConfig(
REPLICA_COUNT("replica_count"),
FORCE_MERGE("force_merge"),
NOTIFICATION("notification"),
SNAPSHOT("snapshot");
SNAPSHOT("snapshot"),
INDEX_PRIORITY("index_priority");

override fun toString(): String {
return type
Expand Down Expand Up @@ -96,6 +97,7 @@ abstract class ActionConfig(
ActionType.FORCE_MERGE.type -> actionConfig = ForceMergeActionConfig.parse(xcp, index)
ActionType.NOTIFICATION.type -> actionConfig = NotificationActionConfig.parse(xcp, index)
ActionType.SNAPSHOT.type -> actionConfig = SnapshotActionConfig.parse(xcp, index)
ActionType.INDEX_PRIORITY.type -> actionConfig = IndexPriorityActionConfig.parse(xcp, index)
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in Action.")
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action

import com.amazon.opendistroforelasticsearch.indexstatemanagement.action.Action
import com.amazon.opendistroforelasticsearch.indexstatemanagement.action.IndexPriorityAction
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.xcontent.ToXContent
import org.elasticsearch.common.xcontent.ToXContentObject
import org.elasticsearch.common.xcontent.XContentBuilder
import org.elasticsearch.common.xcontent.XContentParser
import org.elasticsearch.common.xcontent.XContentParser.Token
import org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.elasticsearch.script.ScriptService
import java.io.IOException

data class IndexPriorityActionConfig(
val indexPriority: Int,
val index: Int
) : ToXContentObject, ActionConfig(ActionType.INDEX_PRIORITY, index) {

init {
require(indexPriority >= 0) { "IndexPriorityActionConfig index_priority value must be a non-negative number" }
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
super.toXContent(builder, params).startObject(ActionType.INDEX_PRIORITY.type)
builder.field(INDEX_PRIORITY_FIELD, indexPriority)
return builder.endObject().endObject()
}

override fun isFragment(): Boolean = super<ToXContentObject>.isFragment()

override fun toAction(
clusterService: ClusterService,
scriptService: ScriptService,
client: Client,
managedIndexMetaData: ManagedIndexMetaData
): Action = IndexPriorityAction(clusterService, client, managedIndexMetaData, this)

companion object {
const val INDEX_PRIORITY_FIELD = "priority"

@JvmStatic
@Throws(IOException::class)
fun parse(xcp: XContentParser, index: Int): IndexPriorityActionConfig {
var indexPriority: Int? = null

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp::getTokenLocation)
while (xcp.nextToken() != Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
INDEX_PRIORITY_FIELD -> indexPriority = xcp.intValue()
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in IndexPriorityActionConfig.")
}
}

return IndexPriorityActionConfig(
indexPriority = requireNotNull(indexPriority) { "$INDEX_PRIORITY_FIELD is null" },
index = index
)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexstatemanagement.step.indexpriority

import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.suspendUntil
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.IndexPriorityActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step
import org.apache.logging.log4j.LogManager
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest
import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.settings.Settings

class AttemptSetIndexPriorityStep(
val clusterService: ClusterService,
val client: Client,
val config: IndexPriorityActionConfig,
managedIndexMetaData: ManagedIndexMetaData
) : Step("attempt_set_index_priority", managedIndexMetaData) {

private val logger = LogManager.getLogger(javaClass)
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null

override fun isIdempotent() = true

@Suppress("TooGenericExceptionCaught")
override suspend fun execute() {
val indexPriority = config.indexPriority
try {
logger.info("Executing $name on ${managedIndexMetaData.index}")
val updateSettingsRequest = UpdateSettingsRequest()
.indices(managedIndexMetaData.index)
.settings(Settings.builder().put("index.priority", indexPriority))
val response: AcknowledgedResponse = client.admin().indices()
.suspendUntil { updateSettings(updateSettingsRequest, it) }

if (response.isAcknowledged) {
logger.info("Successfully executed $name on ${managedIndexMetaData.index}")
stepStatus = StepStatus.COMPLETED
info = mapOf("message" to "Successfully set index priority to $indexPriority")
} else {
stepStatus = StepStatus.FAILED
info = mapOf("message" to "Failed to set index priority to $indexPriority")
}
} catch (e: Exception) {
logger.error("Failed to set index priority [index=${managedIndexMetaData.index}]", e)
stepStatus = StepStatus.FAILED
val mutableInfo = mutableMapOf("message" to "Failed to set index priority to $indexPriority")
val errorMessage = e.message
if (errorMessage != null) mutableInfo["cause"] = errorMessage
info = mutableInfo.toMap()
}
}

override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData {
return currentMetaData.copy(
stepMetaData = StepMetaData(name, getStepStartTime().toEpochMilli(), stepStatus),
transitionTo = null,
info = info
)
}
}
9 changes: 8 additions & 1 deletion src/main/resources/mappings/opendistro-ism-config.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"_meta" : {
"schema_version": 2
"schema_version": 3
},
"dynamic": "strict",
"properties": {
Expand Down Expand Up @@ -175,6 +175,13 @@
}
}
},
"index_priority": {
"properties": {
"priority": {
"type": "integer"
}
}
},
"close": {
"type": "object"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ class IndexStateManagementIndicesIT : IndexStateManagementRestTestCase() {
val policyId = ESTestCase.randomAlphaOfLength(10)
client().makeRequest("PUT", "$POLICY_BASE_URI/$policyId", emptyMap(), policy.toHttpEntity())
assertIndexExists(INDEX_STATE_MANAGEMENT_INDEX)
verifyIndexSchemaVersion(INDEX_STATE_MANAGEMENT_INDEX, 2)
verifyIndexSchemaVersion(INDEX_STATE_MANAGEMENT_INDEX, 3)
}

fun `test update management index mapping with new schema version`() {
assertIndexDoesNotExist(INDEX_STATE_MANAGEMENT_INDEX)

val mapping = indexStateManagementMappings.trimStart('{').trimEnd('}')
.replace("\"schema_version\": 2", "\"schema_version\": 0")
.replace("\"schema_version\": 3", "\"schema_version\": 0")

createIndex(INDEX_STATE_MANAGEMENT_INDEX, Settings.builder().put("index.hidden", true).build(), mapping)
assertIndexExists(INDEX_STATE_MANAGEMENT_INDEX)
Expand All @@ -43,7 +43,7 @@ class IndexStateManagementIndicesIT : IndexStateManagementRestTestCase() {
client().makeRequest("PUT", "$POLICY_BASE_URI/$policyId", emptyMap(), policy.toHttpEntity())

assertIndexExists(INDEX_STATE_MANAGEMENT_INDEX)
verifyIndexSchemaVersion(INDEX_STATE_MANAGEMENT_INDEX, 2)
verifyIndexSchemaVersion(INDEX_STATE_MANAGEMENT_INDEX, 3)
}

fun `test changing policy on an index that hasn't initialized yet check schema version`() {
Expand All @@ -58,7 +58,7 @@ class IndexStateManagementIndicesIT : IndexStateManagementRestTestCase() {
assertEquals("Policy id does not match", policy.id, managedIndexConfig.policyID)

val mapping = "{" + indexStateManagementMappings.trimStart('{').trimEnd('}')
.replace("\"schema_version\": 2", "\"schema_version\": 0")
.replace("\"schema_version\": 3", "\"schema_version\": 0")

val entity = StringEntity(mapping, ContentType.APPLICATION_JSON)
client().makeRequest(RestRequest.Method.PUT.toString(),
Expand All @@ -72,7 +72,7 @@ class IndexStateManagementIndicesIT : IndexStateManagementRestTestCase() {
RestRequest.Method.POST.toString(),
"${RestChangePolicyAction.CHANGE_POLICY_BASE_URI}/$index", emptyMap(), changePolicy.toHttpEntity())

verifyIndexSchemaVersion(INDEX_STATE_MANAGEMENT_INDEX, 2)
verifyIndexSchemaVersion(INDEX_STATE_MANAGEMENT_INDEX, 3)

assertAffectedIndicesResponseIsEqual(mapOf(FAILURES to false, FAILED_INDICES to emptyList<Any>(), UPDATED_INDICES to 1), response.asMap())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,12 @@ abstract class IndexStateManagementRestTestCase : ESRestTestCase() {
return (indexSettings[indexName]!!["settings"]!!["index.number_of_replicas"] as String).toInt()
}

@Suppress("UNCHECKED_CAST")
protected fun getIndexPrioritySetting(indexName: String): Int {
val indexSettings = getIndexSettings(indexName) as Map<String, Map<String, Map<String, Any?>>>
return (indexSettings[indexName]!!["settings"]!!["index.priority"] as String).toInt()
}

@Suppress("UNCHECKED_CAST")
protected fun getUuid(indexName: String): String {
val indexSettings = getIndexSettings(indexName) as Map<String, Map<String, Map<String, Any?>>>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.N
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.ReadOnlyActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.ReadWriteActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.ReplicaCountActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.IndexPriorityActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.RolloverActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.SnapshotActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.coordinator.ClusterStateManagedIndexConfig
Expand Down Expand Up @@ -145,6 +146,10 @@ fun randomReplicaCountActionConfig(numOfReplicas: Int = ESRestTestCase.randomInt
return ReplicaCountActionConfig(index = 0, numOfReplicas = numOfReplicas)
}

fun randomIndexPriorityActionConfig(indexPriority: Int = ESRestTestCase.randomIntBetween(0, 100)): IndexPriorityActionConfig {
return IndexPriorityActionConfig(index = 0, indexPriority = indexPriority)
}

fun randomForceMergeActionConfig(
maxNumSegments: Int = ESRestTestCase.randomIntBetween(1, 50)
): ForceMergeActionConfig {
Expand Down Expand Up @@ -345,6 +350,11 @@ fun ReplicaCountActionConfig.toJsonString(): String {
return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string()
}

fun IndexPriorityActionConfig.toJsonString(): String {
val builder = XContentFactory.jsonBuilder()
return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string()
}

fun ForceMergeActionConfig.toJsonString(): String {
val builder = XContentFactory.jsonBuilder()
return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexstatemanagement.action

import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementRestTestCase
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Policy
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.State
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.IndexPriorityActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomErrorNotification
import com.amazon.opendistroforelasticsearch.indexstatemanagement.waitFor
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.Locale

class IndexPriorityActionIT : IndexStateManagementRestTestCase() {

private val testIndexName = javaClass.simpleName.toLowerCase(Locale.ROOT)

fun `test basic index priority`() {
val indexName = "${testIndexName}_index_1"
val policyID = "${testIndexName}_testPolicyName_1"
val actionConfig = IndexPriorityActionConfig(50, 0)
val states = listOf(State(name = "SetPriorityState", actions = listOf(actionConfig), transitions = listOf()))
val policy = Policy(
id = policyID,
description = "$testIndexName description",
schemaVersion = 1L,
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
errorNotification = randomErrorNotification(),
defaultState = states[0].name,
states = states
)

createPolicy(policy, policyID)

createIndex(indexName, policyID)

val managedIndexConfig = getExistingManagedIndexConfig(indexName)
// Change the runJob start time so the job will trigger in 2 seconds
updateManagedIndexConfigStartTime(managedIndexConfig)

// ism policy initialized
waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) }

// change the runJob start time to change index priority
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor { assertEquals("Index did not set index_priority to ${actionConfig.indexPriority}", actionConfig.indexPriority, getIndexPrioritySetting(indexName)) }
}
}
Loading