Skip to content

Commit 79fb5b7

Browse files
committed
Adds rollover conditions into info object (opendistro-for-elasticsearch#208)
1 parent 7054b4b commit 79fb5b7

5 files changed

Lines changed: 136 additions & 89 deletions

File tree

src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/rollover/AttemptRolloverStep.kt

Lines changed: 41 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse
3030
import org.elasticsearch.client.Client
3131
import org.elasticsearch.cluster.service.ClusterService
3232
import org.elasticsearch.common.unit.ByteSizeValue
33+
import org.elasticsearch.common.unit.TimeValue
3334
import org.elasticsearch.rest.RestStatus
3435
import java.time.Instant
3536

@@ -67,24 +68,49 @@ class AttemptRolloverStep(
6768
statsResponse ?: return
6869

6970
val indexCreationDate = clusterService.state().metaData().index(index).creationDate
70-
val indexCreationDateInstant = Instant.ofEpochMilli(indexCreationDate)
71-
if (indexCreationDate == -1L) {
71+
val indexAgeTimeValue = if (indexCreationDate == -1L) {
7272
logger.warn("$index had an indexCreationDate=-1L, cannot use for comparison")
73+
// since we cannot use for comparison, we can set it to 0 as minAge will never be <= 0
74+
TimeValue.timeValueMillis(0)
75+
} else {
76+
TimeValue.timeValueMillis(Instant.now().toEpochMilli() - indexCreationDate)
7377
}
7478
val numDocs = statsResponse.primaries.docs?.count ?: 0
7579
val indexSize = ByteSizeValue(statsResponse.primaries.docs?.totalSizeInBytes ?: 0)
76-
77-
if (config.evaluateConditions(indexCreationDateInstant, numDocs, indexSize)) {
80+
val conditions = listOfNotNull(
81+
config.minAge?.let {
82+
RolloverActionConfig.MIN_INDEX_AGE_FIELD to mapOf(
83+
"condition" to it.toString(),
84+
"current" to indexAgeTimeValue.toString(),
85+
"creationDate" to indexCreationDate
86+
)
87+
},
88+
config.minDocs?.let {
89+
RolloverActionConfig.MIN_DOC_COUNT_FIELD to mapOf(
90+
"condition" to it,
91+
"current" to numDocs
92+
)
93+
},
94+
config.minSize?.let {
95+
RolloverActionConfig.MIN_SIZE_FIELD to mapOf(
96+
"condition" to it.toString(),
97+
"current" to indexSize.toString()
98+
)
99+
}
100+
).toMap()
101+
102+
if (config.evaluateConditions(indexAgeTimeValue, numDocs, indexSize)) {
78103
logger.info("$index rollover conditions evaluated to true [indexCreationDate=$indexCreationDate," +
79104
" numDocs=$numDocs, indexSize=${indexSize.bytes}]")
80-
executeRollover(alias)
105+
executeRollover(alias, conditions)
81106
} else {
82107
stepStatus = StepStatus.CONDITION_NOT_MET
83-
info = mapOf("message" to "Attempting to rollover")
108+
info = mapOf("message" to "Attempting to rollover", "conditions" to conditions)
84109
}
85110
}
86111

87-
private suspend fun executeRollover(alias: String) {
112+
@Suppress("ComplexMethod")
113+
private suspend fun executeRollover(alias: String, conditions: Map<String, Map<String, Any?>>) {
88114
try {
89115
val request = RolloverRequest(alias, null)
90116
val response: RolloverResponse = client.admin().indices().suspendUntil { rolloverIndex(request, it) }
@@ -95,12 +121,18 @@ class AttemptRolloverStep(
95121
// If response isAcknowledged it means the index was created and alias was added to new index
96122
if (response.isAcknowledged) {
97123
stepStatus = StepStatus.COMPLETED
98-
info = mapOf("message" to "Rolled over index")
124+
info = listOfNotNull(
125+
"message" to "Rolled over index",
126+
if (conditions.isEmpty()) null else "conditions" to conditions // don't show empty conditions object if no conditions specified
127+
).toMap()
99128
} else {
100129
// If the alias update response is NOT acknowledged we will get back isAcknowledged=false
101130
// This means the new index was created but we failed to swap the alias
102131
stepStatus = StepStatus.FAILED
103-
info = mapOf("message" to "New index created (${response.newIndex}), but failed to update alias")
132+
info = listOfNotNull(
133+
"message" to "New index created (${response.newIndex}), but failed to update alias",
134+
if (conditions.isEmpty()) null else "conditions" to conditions // don't show empty conditions object if no conditions specified
135+
).toMap()
104136
}
105137
} catch (e: Exception) {
106138
logger.error("Failed to rollover index [index=${managedIndexMetaData.index}]", e)

src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/ManagedIndexUtils.kt

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import org.elasticsearch.action.update.UpdateRequest
4646
import org.elasticsearch.client.Client
4747
import org.elasticsearch.cluster.service.ClusterService
4848
import org.elasticsearch.common.unit.ByteSizeValue
49+
import org.elasticsearch.common.unit.TimeValue
4950
import org.elasticsearch.common.xcontent.ToXContent
5051
import org.elasticsearch.common.xcontent.XContentFactory
5152
import org.elasticsearch.index.query.BoolQueryBuilder
@@ -216,7 +217,7 @@ fun Transition.hasStatsConditions(): Boolean = this.conditions?.docCount != null
216217

217218
@Suppress("ReturnCount")
218219
fun RolloverActionConfig.evaluateConditions(
219-
indexCreationDate: Instant,
220+
indexAgeTimeValue: TimeValue,
220221
numDocs: Long,
221222
indexSize: ByteSizeValue
222223
): Boolean {
@@ -232,11 +233,7 @@ fun RolloverActionConfig.evaluateConditions(
232233
}
233234

234235
if (this.minAge != null) {
235-
val indexCreationDateMilli = indexCreationDate.toEpochMilli()
236-
if (indexCreationDateMilli != -1L) {
237-
val elapsedTime = Instant.now().toEpochMilli() - indexCreationDateMilli
238-
if (this.minAge.millis <= elapsedTime) return true
239-
}
236+
if (this.minAge.millis <= indexAgeTimeValue.millis) return true
240237
}
241238

242239
if (this.minSize != null) {

src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/ActionTimeoutIT.kt

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedI
2020
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.ActionConfig
2121
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.ActionMetaData
2222
import com.amazon.opendistroforelasticsearch.indexstatemanagement.waitFor
23+
import org.hamcrest.collection.IsMapContaining
2324
import java.time.Instant
2425
import java.util.Locale
2526

@@ -54,13 +55,11 @@ class ActionTimeoutIT : IndexStateManagementRestTestCase() {
5455

5556
// the second execution we move into rollover action, we won't hit the timeout as this is the execution that sets the startTime
5657
updateManagedIndexConfigStartTime(managedIndexConfig)
57-
58-
val expectedInfoString = mapOf("message" to "Attempting to rollover").toString()
5958
waitFor {
60-
assertPredicatesOnMetaData(
61-
listOf(indexName to listOf(ManagedIndexMetaData.INFO to fun(info: Any?): Boolean = expectedInfoString == info.toString())),
62-
getExplainMap(indexName),
63-
strict = false
59+
assertThat(
60+
"Should be attempting to rollover",
61+
getExplainManagedIndexMetaData(indexName).info,
62+
IsMapContaining.hasEntry("message", "Attempting to rollover" as Any?)
6463
)
6564
}
6665

@@ -122,13 +121,11 @@ class ActionTimeoutIT : IndexStateManagementRestTestCase() {
122121
// the third execution we move into rollover action, we should not hit the timeout yet because its the first execution of rollover
123122
// but there was a bug before where it would use the startTime from the previous actions metadata and immediately fail
124123
updateManagedIndexConfigStartTime(managedIndexConfig)
125-
126-
val expectedRolloverInfoString = mapOf("message" to "Attempting to rollover").toString()
127124
waitFor {
128-
assertPredicatesOnMetaData(
129-
listOf(indexName to listOf(ManagedIndexMetaData.INFO to fun(info: Any?): Boolean = expectedRolloverInfoString == info.toString())),
130-
getExplainMap(indexName),
131-
strict = false
125+
assertThat(
126+
"Should be attempting to rollover",
127+
getExplainManagedIndexMetaData(indexName).info,
128+
IsMapContaining.hasEntry("message", "Attempting to rollover" as Any?)
132129
)
133130
}
134131
}

src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/RolloverActionIT.kt

Lines changed: 67 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,15 @@
1616
package com.amazon.opendistroforelasticsearch.indexstatemanagement.action
1717

1818
import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementRestTestCase
19-
import com.amazon.opendistroforelasticsearch.indexstatemanagement.makeRequest
2019
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Policy
2120
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.State
2221
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.RolloverActionConfig
2322
import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomErrorNotification
2423
import com.amazon.opendistroforelasticsearch.indexstatemanagement.waitFor
25-
import org.apache.http.entity.ContentType
26-
import org.apache.http.entity.StringEntity
2724
import org.elasticsearch.common.unit.ByteSizeUnit
2825
import org.elasticsearch.common.unit.ByteSizeValue
29-
import org.elasticsearch.rest.RestRequest
26+
import org.elasticsearch.common.unit.TimeValue
27+
import org.hamcrest.core.Is.isA
3028
import org.junit.Assert
3129
import java.time.Instant
3230
import java.time.temporal.ChronoUnit
@@ -36,6 +34,7 @@ class RolloverActionIT : IndexStateManagementRestTestCase() {
3634

3735
private val testIndexName = javaClass.simpleName.toLowerCase(Locale.ROOT)
3836

37+
@Suppress("UNCHECKED_CAST")
3938
fun `test rollover no condition`() {
4039
val aliasName = "${testIndexName}_alias"
4140
val indexNameBase = "${testIndexName}_index"
@@ -65,10 +64,15 @@ class RolloverActionIT : IndexStateManagementRestTestCase() {
6564

6665
// Need to speed up to second execution where it will trigger the first execution of the action
6766
updateManagedIndexConfigStartTime(managedIndexConfig)
68-
waitFor { assertEquals("Index did not rollover.", mapOf("message" to "Rolled over index"), getExplainManagedIndexMetaData(firstIndex).info) }
67+
waitFor {
68+
val info = getExplainManagedIndexMetaData(firstIndex).info as Map<String, Any?>
69+
assertEquals("Index did not rollover.", "Rolled over index", info["message"])
70+
assertNull("Should not have conditions if none specified", info["conditions"])
71+
}
6972
Assert.assertTrue("New rollover index does not exist.", indexExists("$indexNameBase-000002"))
7073
}
7174

75+
@Suppress("UNCHECKED_CAST")
7276
fun `test rollover multi condition byte size`() {
7377
val aliasName = "${testIndexName}_byte_alias"
7478
val indexNameBase = "${testIndexName}_index_byte"
@@ -98,36 +102,47 @@ class RolloverActionIT : IndexStateManagementRestTestCase() {
98102

99103
// Need to speed up to second execution where it will trigger the first execution of the action
100104
updateManagedIndexConfigStartTime(managedIndexConfig)
101-
waitFor { assertEquals("Index rollover before it met the condition.", mapOf("message" to "Attempting to rollover"), getExplainManagedIndexMetaData(firstIndex).info) }
102-
103-
client().makeRequest(
104-
RestRequest.Method.PUT.toString(),
105-
"$firstIndex/_doc/1111",
106-
StringEntity("{ \"testkey\": \"some valueaaaaaaa\" }", ContentType.APPLICATION_JSON)
107-
)
108-
client().makeRequest(
109-
RestRequest.Method.PUT.toString(),
110-
"$firstIndex/_doc/2222",
111-
StringEntity("{ \"testkey1\": \"some value1\" }", ContentType.APPLICATION_JSON)
112-
)
113-
client().makeRequest(
114-
RestRequest.Method.PUT.toString(),
115-
"$firstIndex/_doc/3333",
116-
StringEntity("{ \"testkey2\": \"some value2\" }", ContentType.APPLICATION_JSON)
117-
)
105+
waitFor {
106+
val info = getExplainManagedIndexMetaData(firstIndex).info as Map<String, Any?>
107+
assertEquals("Index rollover before it met the condition.", "Attempting to rollover", info["message"])
108+
val conditions = info["conditions"] as Map<String, Any?>
109+
assertEquals("Did not have exclusively min size and min doc count conditions",
110+
setOf(RolloverActionConfig.MIN_SIZE_FIELD, RolloverActionConfig.MIN_DOC_COUNT_FIELD), conditions.keys)
111+
val minSize = conditions[RolloverActionConfig.MIN_SIZE_FIELD] as Map<String, Any?>
112+
val minDocCount = conditions[RolloverActionConfig.MIN_DOC_COUNT_FIELD] as Map<String, Any?>
113+
assertEquals("Did not have min size condition", "10b", minSize["condition"])
114+
assertThat("Did not have min size current", minSize["current"], isA(String::class.java))
115+
assertEquals("Did not have min doc count condition", 1000000, minDocCount["condition"])
116+
assertEquals("Did not have min doc count current", 0, minDocCount["current"])
117+
}
118+
119+
insertSampleData(index = firstIndex, docCount = 5, delay = 0)
118120

119121
// Need to speed up to second execution where it will trigger the first execution of the action
120122
updateManagedIndexConfigStartTime(managedIndexConfig)
121-
waitFor { assertEquals("Index did not rollover.", mapOf("message" to "Rolled over index"), getExplainManagedIndexMetaData(firstIndex).info) }
123+
waitFor {
124+
val info = getExplainManagedIndexMetaData(firstIndex).info as Map<String, Any?>
125+
assertEquals("Index did not rollover", "Rolled over index", info["message"])
126+
val conditions = info["conditions"] as Map<String, Any?>
127+
assertEquals("Did not have exclusively min size and min doc count conditions",
128+
setOf(RolloverActionConfig.MIN_SIZE_FIELD, RolloverActionConfig.MIN_DOC_COUNT_FIELD), conditions.keys)
129+
val minSize = conditions[RolloverActionConfig.MIN_SIZE_FIELD] as Map<String, Any?>
130+
val minDocCount = conditions[RolloverActionConfig.MIN_DOC_COUNT_FIELD] as Map<String, Any?>
131+
assertEquals("Did not have min size condition", "10b", minSize["condition"])
132+
assertThat("Did not have min size current", minSize["current"], isA(String::class.java))
133+
assertEquals("Did not have min doc count condition", 1000000, minDocCount["condition"])
134+
assertEquals("Did not have min doc count current", 5, minDocCount["current"])
135+
}
122136
Assert.assertTrue("New rollover index does not exist.", indexExists("$indexNameBase-000002"))
123137
}
124138

139+
@Suppress("UNCHECKED_CAST")
125140
fun `test rollover multi condition doc size`() {
126141
val aliasName = "${testIndexName}_doc_alias"
127142
val indexNameBase = "${testIndexName}_index_doc"
128143
val firstIndex = "$indexNameBase-1"
129144
val policyID = "${testIndexName}_testPolicyName_doc_1"
130-
val actionConfig = RolloverActionConfig(ByteSizeValue(10, ByteSizeUnit.TB), 3, null, 0)
145+
val actionConfig = RolloverActionConfig(null, 3, TimeValue.timeValueDays(2), 0)
131146
val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf()))
132147
val policy = Policy(
133148
id = policyID,
@@ -151,27 +166,37 @@ class RolloverActionIT : IndexStateManagementRestTestCase() {
151166

152167
// Need to speed up to second execution where it will trigger the first execution of the action
153168
updateManagedIndexConfigStartTime(managedIndexConfig)
154-
waitFor { assertEquals("Index rollover before it met the condition.", mapOf("message" to "Attempting to rollover"), getExplainManagedIndexMetaData(firstIndex).info) }
155-
156-
client().makeRequest(
157-
RestRequest.Method.PUT.toString(),
158-
"$firstIndex/_doc/1111",
159-
StringEntity("{ \"testkey\": \"some value\" }", ContentType.APPLICATION_JSON)
160-
)
161-
client().makeRequest(
162-
RestRequest.Method.PUT.toString(),
163-
"$firstIndex/_doc/2222",
164-
StringEntity("{ \"testkey1\": \"some value1\" }", ContentType.APPLICATION_JSON)
165-
)
166-
client().makeRequest(
167-
RestRequest.Method.PUT.toString(),
168-
"$firstIndex/_doc/3333",
169-
StringEntity("{ \"testkey2\": \"some value2\" }", ContentType.APPLICATION_JSON)
170-
)
169+
waitFor {
170+
val info = getExplainManagedIndexMetaData(firstIndex).info as Map<String, Any?>
171+
assertEquals("Index rollover before it met the condition.", "Attempting to rollover", info["message"])
172+
val conditions = info["conditions"] as Map<String, Any?>
173+
assertEquals("Did not have exclusively min age and min doc count conditions",
174+
setOf(RolloverActionConfig.MIN_INDEX_AGE_FIELD, RolloverActionConfig.MIN_DOC_COUNT_FIELD), conditions.keys)
175+
val minAge = conditions[RolloverActionConfig.MIN_INDEX_AGE_FIELD] as Map<String, Any?>
176+
val minDocCount = conditions[RolloverActionConfig.MIN_DOC_COUNT_FIELD] as Map<String, Any?>
177+
assertEquals("Did not have min age condition", "2d", minAge["condition"])
178+
assertThat("Did not have min age current", minAge["current"], isA(String::class.java))
179+
assertEquals("Did not have min doc count condition", 3, minDocCount["condition"])
180+
assertEquals("Did not have min doc count current", 0, minDocCount["current"])
181+
}
182+
183+
insertSampleData(index = firstIndex, docCount = 5, delay = 0)
171184

172185
// Need to speed up to second execution where it will trigger the first execution of the action
173186
updateManagedIndexConfigStartTime(managedIndexConfig)
174-
waitFor { assertEquals("Index did not rollover.", mapOf("message" to "Rolled over index"), getExplainManagedIndexMetaData(firstIndex).info) }
187+
waitFor {
188+
val info = getExplainManagedIndexMetaData(firstIndex).info as Map<String, Any?>
189+
assertEquals("Index did not rollover", "Rolled over index", info["message"])
190+
val conditions = info["conditions"] as Map<String, Any?>
191+
assertEquals("Did not have exclusively min age and min doc count conditions",
192+
setOf(RolloverActionConfig.MIN_INDEX_AGE_FIELD, RolloverActionConfig.MIN_DOC_COUNT_FIELD), conditions.keys)
193+
val minAge = conditions[RolloverActionConfig.MIN_INDEX_AGE_FIELD] as Map<String, Any?>
194+
val minDocCount = conditions[RolloverActionConfig.MIN_DOC_COUNT_FIELD] as Map<String, Any?>
195+
assertEquals("Did not have min age condition", "2d", minAge["condition"])
196+
assertThat("Did not have min age current", minAge["current"], isA(String::class.java))
197+
assertEquals("Did not have min doc count condition", 3, minDocCount["condition"])
198+
assertEquals("Did not have min doc count current", 5, minDocCount["current"])
199+
}
175200
Assert.assertTrue("New rollover index does not exist.", indexExists("$indexNameBase-000002"))
176201
}
177202
}

0 commit comments

Comments
 (0)