Skip to content
This repository was archived by the owner on Aug 2, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,31 +27,36 @@ import org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken

/** Properties that will persist across steps of a single Action. Will be stored in the [ActionMetaData]. */
data class ActionProperties(
val maxNumSegments: Int? = null
val maxNumSegments: Int? = null,
val snapshotName: String? = null
) : Writeable, ToXContentFragment {

/**
 * Serializes both optional properties to [out].
 * The write order (segments first, then snapshot name) matches the read order in [fromStreamInput].
 */
override fun writeTo(out: StreamOutput) {
    with(out) {
        writeOptionalInt(maxNumSegments)
        writeOptionalString(snapshotName)
    }
}

/**
 * Writes each non-null property as a field on [builder] and returns that same builder.
 * Properties that are null are omitted entirely instead of being written as explicit nulls.
 */
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
    maxNumSegments?.let { segments -> builder.field(MAX_NUM_SEGMENTS, segments) }
    snapshotName?.let { name -> builder.field(SNAPSHOT_NAME, name) }
    return builder
}

companion object {
const val ACTION_PROPERTIES = "action_properties"
const val MAX_NUM_SEGMENTS = "max_num_segments"
const val SNAPSHOT_NAME = "snapshot_name"

/**
 * Reads an [ActionProperties] from [si].
 *
 * Values are read in the same order [writeTo] writes them: the optional segment count
 * first, then the optional snapshot name. Both values are passed to the constructor so
 * the snapshot name survives a stream round trip.
 */
fun fromStreamInput(si: StreamInput): ActionProperties {
    val maxNumSegments: Int? = si.readOptionalInt()
    val snapshotName: String? = si.readOptionalString()

    return ActionProperties(maxNumSegments, snapshotName)
}

fun parse(xcp: XContentParser): ActionProperties {
var maxNumSegments: Int? = null
var snapshotName: String? = null

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp::getTokenLocation)
while (xcp.nextToken() != Token.END_OBJECT) {
Expand All @@ -60,10 +65,11 @@ data class ActionProperties(

when (fieldName) {
MAX_NUM_SEGMENTS -> maxNumSegments = xcp.intValue()
SNAPSHOT_NAME -> snapshotName = xcp.text()
}
}

return ActionProperties(maxNumSegments)
return ActionProperties(maxNumSegments, snapshotName)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package com.amazon.opendistroforelasticsearch.indexstatemanagement.step.snapshot
import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.suspendUntil
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.SnapshotActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.ActionProperties
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step
import org.apache.logging.log4j.LogManager
Expand All @@ -27,6 +28,7 @@ import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.rest.RestStatus
import org.elasticsearch.snapshots.ConcurrentSnapshotExecutionException
import org.elasticsearch.transport.RemoteTransportException
import java.time.LocalDateTime
import java.time.ZoneId
import java.time.format.DateTimeFormatter
Expand All @@ -42,20 +44,21 @@ class AttemptSnapshotStep(
private val logger = LogManager.getLogger(javaClass)
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null
private var snapshotName: String? = null

override fun isIdempotent() = false

@Suppress("TooGenericExceptionCaught")
@Suppress("TooGenericExceptionCaught", "ComplexMethod")
override suspend fun execute() {
try {
logger.info("Executing snapshot on ${managedIndexMetaData.index}")
val snapshotName = config
snapshotName = config
.snapshot
.plus("-")
.plus(LocalDateTime
.now(ZoneId.of("UTC"))
.format(DateTimeFormatter.ofPattern("uuuu.MM.dd-HH:mm:ss.SSS", Locale.ROOT)))
val mutableInfo = mutableMapOf("snapshotName" to snapshotName)
val mutableInfo = mutableMapOf<String, String>()

val createSnapshotRequest = CreateSnapshotRequest()
.userMetadata(mapOf("snapshot_created" to "Open Distro for Elasticsearch Index Management"))
Expand All @@ -68,7 +71,7 @@ class AttemptSnapshotStep(
when (response.status()) {
RestStatus.ACCEPTED -> {
stepStatus = StepStatus.COMPLETED
mutableInfo["message"] = "Snapshot creation started and is still in progress for index: ${managedIndexMetaData.index}"
mutableInfo["message"] = "Snapshot creation started for index: ${managedIndexMetaData.index}"
}
RestStatus.OK -> {
stepStatus = StepStatus.COMPLETED
Expand All @@ -81,24 +84,40 @@ class AttemptSnapshotStep(
}
}
info = mutableInfo.toMap()
} catch (e: RemoteTransportException) {
if (e.cause is ConcurrentSnapshotExecutionException) {
resolveSnapshotException(e.cause as ConcurrentSnapshotExecutionException)
} else {
resolveException(e)
}
} catch (e: ConcurrentSnapshotExecutionException) {
val message = "Snapshot creation already in progress."
logger.debug(message, e)
stepStatus = StepStatus.CONDITION_NOT_MET
info = mapOf("message" to message)
resolveSnapshotException(e)
} catch (e: Exception) {
val message = "Failed to create snapshot for index: ${managedIndexMetaData.index}"
logger.error(message, e)
stepStatus = StepStatus.FAILED
val mutableInfo = mutableMapOf("message" to message)
val errorMessage = e.message
if (errorMessage != null) mutableInfo["cause"] = errorMessage
info = mutableInfo.toMap()
resolveException(e)
}
}

/**
 * Handles the case where a snapshot is already running: logs [e] at debug level,
 * marks the step as [StepStatus.CONDITION_NOT_MET] and stores an explanatory message in [info].
 */
private fun resolveSnapshotException(e: ConcurrentSnapshotExecutionException) {
    val inProgressMessage = "Snapshot creation already in progress."
    logger.debug(inProgressMessage, e)
    info = mapOf("message" to inProgressMessage)
    stepStatus = StepStatus.CONDITION_NOT_MET
}

/**
 * Handles a generic failure while creating the snapshot: logs [e] at error level,
 * marks the step as [StepStatus.FAILED] and stores the failure message in [info].
 * A "cause" entry is added only when the exception carries a message.
 */
private fun resolveException(e: Exception) {
    val failureMessage = "Failed to create snapshot for index: ${managedIndexMetaData.index}"
    logger.error(failureMessage, e)
    stepStatus = StepStatus.FAILED
    info = listOfNotNull(
        "message" to failureMessage,
        e.message?.let { cause -> "cause" to cause }
    ).toMap()
}

override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData {
val currentActionMetaData = currentMetaData.actionMetaData
return currentMetaData.copy(
actionMetaData = currentActionMetaData?.copy(actionProperties = ActionProperties(snapshotName = snapshotName)),
stepMetaData = StepMetaData(name, getStepStartTime().toEpochMilli(), stepStatus),
transitionTo = null,
info = info
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ package com.amazon.opendistroforelasticsearch.indexstatemanagement.step.snapshot
import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.suspendUntil
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.SnapshotActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.ActionProperties
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step
import org.apache.logging.log4j.LogManager
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.SnapshotsInProgress.State
import org.elasticsearch.cluster.service.ClusterService

class WaitForSnapshotStep(
Expand All @@ -40,31 +42,61 @@ class WaitForSnapshotStep(
override fun isIdempotent() = true

/**
 * Looks up the status of the snapshot recorded in the action properties and
 * translates it into a [StepStatus] plus an [info] message.
 *
 * - INIT / STARTED / WAITING -> [StepStatus.CONDITION_NOT_MET]
 * - SUCCESS                  -> [StepStatus.COMPLETED]
 * - any other state, or no matching snapshot in the response -> [StepStatus.FAILED]
 *
 * Any exception raised while fetching the status is logged and also results in
 * [StepStatus.FAILED], with the exception message (when present) stored under "cause".
 */
override suspend fun execute() {
    try {
        logger.info("Waiting for snapshot to complete...")
        // getSnapshotName() has already set stepStatus/info when it returns null, so just stop here.
        val snapshotName = getSnapshotName() ?: return
        val request = SnapshotsStatusRequest()
            .snapshots(arrayOf(snapshotName))
            .repository(config.repository)
        val response: SnapshotsStatusResponse = client.admin().cluster().suspendUntil { snapshotsStatus(request, it) }
        // Only accept a status entry that matches both the snapshot name and the configured repository.
        val status: SnapshotStatus? = response
            .snapshots
            .find { snapshotStatus ->
                snapshotStatus.snapshot.snapshotId.name == snapshotName &&
                    snapshotStatus.snapshot.repository == config.repository
            }
        if (status != null) {
            when (status.state) {
                State.INIT, State.STARTED, State.WAITING -> {
                    stepStatus = StepStatus.CONDITION_NOT_MET
                    info = mapOf("message" to "Creating snapshot in progress for index: ${managedIndexMetaData.index}", "state" to status.state)
                }
                State.SUCCESS -> {
                    stepStatus = StepStatus.COMPLETED
                    info = mapOf("message" to "Snapshot successfully created for index: ${managedIndexMetaData.index}", "state" to status.state)
                }
                else -> { // State.FAILED, State.ABORTED, State.MISSING, null
                    stepStatus = StepStatus.FAILED
                    info = mapOf("message" to "Snapshot doesn't exist for index: ${managedIndexMetaData.index}", "state" to status.state)
                }
            }
        } else {
            stepStatus = StepStatus.FAILED
            info = mapOf("message" to "Snapshot doesn't exist for index: ${managedIndexMetaData.index}")
        }
    } catch (e: Exception) {
        val message = "Failed to get status of snapshot for index: ${managedIndexMetaData.index}"
        logger.error(message, e)
        stepStatus = StepStatus.FAILED
        val mutableInfo = mutableMapOf("message" to message)
        val errorMessage = e.message
        if (errorMessage != null) mutableInfo["cause"] = errorMessage
        info = mutableInfo.toMap()
    }
}

/**
 * Returns the snapshot name stored in the current action's [ActionProperties].
 *
 * When no name is available (missing action metadata, missing properties, or a null name)
 * the step is marked [StepStatus.FAILED], [info] is set to an explanatory message,
 * and null is returned.
 */
private fun getSnapshotName(): String? {
    val properties = managedIndexMetaData.actionMetaData?.actionProperties
    val storedName = properties?.snapshotName
    if (storedName != null) return storedName

    stepStatus = StepStatus.FAILED
    info = mapOf("message" to "Unable to retrieve [${ActionProperties.SNAPSHOT_NAME}] from ActionProperties=$properties")
    return null
}

override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData {
return currentMetaData.copy(
stepMetaData = StepMetaData(name, getStepStartTime().toEpochMilli(), stepStatus),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,11 @@ abstract class IndexStateManagementRestTestCase : ESRestTestCase() {
}
}

/** Issues a DELETE for [snapshotName] in [repository] and asserts that the request returned 200 OK. */
protected fun deleteSnapshot(repository: String, snapshotName: String) {
    val endpoint = "_snapshot/$repository/$snapshotName"
    val deleteStatus = client().makeRequest("DELETE", endpoint).restStatus()
    assertEquals("Unable to delete snapshot", RestStatus.OK, deleteStatus)
}

@Suppress("UNCHECKED_CAST")
protected fun assertSnapshotExists(
repository: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,104 @@ class SnapshotActionIT : IndexStateManagementRestTestCase() {
waitFor { assertSnapshotExists(repository, snapshot) }
waitFor { assertSnapshotFinishedWithSuccess(repository, snapshot) }
}

/**
 * Runs a policy with a single snapshot action through both steps and verifies that the
 * wait step reports success, that the snapshot name is persisted in the action properties,
 * and that the snapshot exists and finished successfully.
 */
fun `test successful wait for snapshot step`() {
    val indexName = "${testIndexName}_index_1"
    val policyID = "${testIndexName}_testPolicyName_1"
    val repository = "repository"
    val snapshot = "snapshot_success_test"
    val actionConfig = SnapshotActionConfig(repository, snapshot, 0)
    val states = listOf(
        State("Snapshot", listOf(actionConfig), listOf())
    )

    createRepository(repository)

    val policy = Policy(
        id = policyID,
        description = "$testIndexName description",
        schemaVersion = 1L,
        lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
        errorNotification = randomErrorNotification(),
        defaultState = states[0].name,
        states = states
    )
    createPolicy(policy, policyID)
    createIndex(indexName, policyID)

    val managedIndexConfig = getExistingManagedIndexConfig(indexName)

    // Change the start time so the job will initialize the policy
    updateManagedIndexConfigStartTime(managedIndexConfig)
    waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) }

    // Change the start time so attempt snapshot step will execute
    updateManagedIndexConfigStartTime(managedIndexConfig)
    waitFor { assertEquals("Snapshot creation started for index: $indexName", getExplainManagedIndexMetaData(indexName).info?.get("message")) }

    // Change the start time so wait for snapshot step will execute
    updateManagedIndexConfigStartTime(managedIndexConfig)
    waitFor { assertEquals("Snapshot successfully created for index: $indexName", getExplainManagedIndexMetaData(indexName).info?.get("message")) }

    // verify we set snapshotName in action properties
    // assertTrue is used instead of Kotlin's assert(), which is skipped when JVM assertions are disabled.
    waitFor {
        assertTrue(
            "Snapshot name in action properties does not contain [$snapshot]",
            getExplainManagedIndexMetaData(indexName).actionMetaData?.actionProperties?.snapshotName?.contains(snapshot) == true
        )
    }

    waitFor { assertSnapshotExists(repository, snapshot) }
    waitFor { assertSnapshotFinishedWithSuccess(repository, snapshot) }
}

/**
 * Starts a snapshot, deletes it once it has completed, and then verifies that the
 * wait step fails with a "missing snapshot" cause when it tries to read the status.
 */
fun `test failed wait for snapshot step`() {
    val indexName = "${testIndexName}_index_1"
    val policyID = "${testIndexName}_testPolicyName_1"
    val repository = "repository"
    val snapshot = "snapshot_failed_test"
    val actionConfig = SnapshotActionConfig(repository, snapshot, 0)
    val states = listOf(
        State("Snapshot", listOf(actionConfig), listOf())
    )

    createRepository(repository)

    val policy = Policy(
        id = policyID,
        description = "$testIndexName description",
        schemaVersion = 1L,
        lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
        errorNotification = randomErrorNotification(),
        defaultState = states[0].name,
        states = states
    )
    createPolicy(policy, policyID)
    createIndex(indexName, policyID)

    val managedIndexConfig = getExistingManagedIndexConfig(indexName)

    // Change the start time so the job will initialize the policy
    updateManagedIndexConfigStartTime(managedIndexConfig)
    waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) }

    // Change the start time so attempt snapshot step will execute
    updateManagedIndexConfigStartTime(managedIndexConfig)
    waitFor { assertEquals("Snapshot creation started for index: $indexName", getExplainManagedIndexMetaData(indexName).info?.get("message")) }

    // Confirm successful snapshot creation
    waitFor { assertSnapshotExists(repository, snapshot) }
    waitFor { assertSnapshotFinishedWithSuccess(repository, snapshot) }

    // Delete the snapshot so wait for step will fail with missing snapshot exception
    val snapshotName: String = getExplainManagedIndexMetaData(indexName).actionMetaData?.actionProperties?.snapshotName
        ?: throw AssertionError("Snapshot name is null")
    deleteSnapshot(repository, snapshotName)

    // Change the start time so wait for snapshot step will execute where we should see a missing snapshot exception
    updateManagedIndexConfigStartTime(managedIndexConfig)
    waitFor {
        // Fetch the metadata once so both assertions are checked against the same state.
        val explainInfo = getExplainManagedIndexMetaData(indexName).info
        assertEquals("Failed to get status of snapshot for index: $indexName", explainInfo?.get("message"))
        assertEquals("[$repository:$snapshotName] is missing", explainInfo?.get("cause"))
    }
}
}