Skip to content
This repository was archived by the owner on Aug 2, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ import org.elasticsearch.threadpool.ThreadPool
import org.elasticsearch.transport.TransportInterceptor
import org.elasticsearch.watcher.ResourceWatcherService
import java.util.function.Supplier
import org.elasticsearch.common.component.Lifecycle
import org.elasticsearch.common.component.LifecycleComponent
import org.elasticsearch.common.component.LifecycleListener
import org.elasticsearch.common.inject.Inject
import org.elasticsearch.transport.RemoteClusterService
import org.elasticsearch.transport.TransportService

internal class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin, Plugin() {

Expand All @@ -129,6 +135,7 @@ internal class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, Act
lateinit var clusterService: ClusterService
lateinit var indexNameExpressionResolver: IndexNameExpressionResolver
lateinit var rollupInterceptor: RollupInterceptor
lateinit var fieldCapsFilter: FieldCapsFilter

companion object {
const val PLUGIN_NAME = "opendistro-im"
Expand All @@ -148,6 +155,10 @@ internal class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, Act

override fun getJobRunner(): ScheduledJobRunner = IndexManagementRunner

override fun getGuiceServiceClasses(): Collection<Class<out LifecycleComponent?>> {
return mutableListOf<Class<out LifecycleComponent?>>(GuiceHolder::class.java)
}

override fun getJobParser(): ScheduledJobParser {
return ScheduledJobParser { xcp, id, jobDocVersion ->
ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp)
Expand Down Expand Up @@ -237,6 +248,7 @@ internal class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, Act
.registerMetadataServices(RollupMetadataService(client, xContentRegistry))
.registerConsumers()
rollupInterceptor = RollupInterceptor(clusterService, settings, indexNameExpressionResolver)
fieldCapsFilter = FieldCapsFilter(clusterService, settings, indexNameExpressionResolver)
this.indexNameExpressionResolver = indexNameExpressionResolver

val skipFlag = SkipExecution(client, clusterService)
Expand Down Expand Up @@ -293,7 +305,8 @@ internal class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, Act
RollupSettings.ROLLUP_SEARCH_BACKOFF_MILLIS,
RollupSettings.ROLLUP_INDEX,
RollupSettings.ROLLUP_ENABLED,
RollupSettings.ROLLUP_SEARCH_ENABLED
RollupSettings.ROLLUP_SEARCH_ENABLED,
RollupSettings.ROLLUP_DASHBOARDS
)
}

Expand Down Expand Up @@ -326,6 +339,28 @@ internal class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, Act
}

override fun getActionFilters(): List<ActionFilter> {
return listOf(FieldCapsFilter(clusterService, indexNameExpressionResolver))
return listOf(fieldCapsFilter)
}
}

class GuiceHolder @Inject constructor(
remoteClusterService: TransportService
) : LifecycleComponent {
override fun close() {}
override fun lifecycleState(): Lifecycle.State? {
return null
}

override fun addLifecycleListener(listener: LifecycleListener) {}
override fun removeLifecycleListener(listener: LifecycleListener) {}
override fun start() {}
override fun stop() {}

companion object {
lateinit var remoteClusterService: RemoteClusterService
}

init {
Companion.remoteClusterService = remoteClusterService.remoteClusterService
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package com.amazon.opendistroforelasticsearch.indexmanagement.rollup.actionfilter

import com.amazon.opendistroforelasticsearch.indexmanagement.GuiceHolder
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.model.Rollup
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.model.RollupFieldMapping
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.settings.RollupSettings
Expand All @@ -33,57 +34,85 @@ import org.elasticsearch.action.support.ActionFilterChain
import org.elasticsearch.action.support.IndicesOptions
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.Strings
import org.elasticsearch.common.xcontent.DeprecationHandler
import org.elasticsearch.common.xcontent.NamedXContentRegistry
import org.elasticsearch.common.xcontent.XContentFactory
import org.elasticsearch.common.xcontent.json.JsonXContent
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.tasks.Task
import org.elasticsearch.transport.RemoteClusterAware

private val logger = LogManager.getLogger(FieldCapsFilter::class.java)

@Suppress("UNCHECKED_CAST", "SpreadOperator", "TooManyFunctions")
@Suppress("UNCHECKED_CAST", "SpreadOperator", "TooManyFunctions", "ComplexMethod", "NestedBlockDepth")
class FieldCapsFilter(
val clusterService: ClusterService,
val indexNameExpressionResolver: IndexNameExpressionResolver
val settings: Settings,
private val indexNameExpressionResolver: IndexNameExpressionResolver
) : ActionFilter {

@Volatile private var shouldIntercept = RollupSettings.ROLLUP_DASHBOARDS.get(settings)

init {
clusterService.clusterSettings.addSettingsUpdateConsumer(RollupSettings.ROLLUP_DASHBOARDS) {
flag -> shouldIntercept = flag
}
}

override fun <Request : ActionRequest?, Response : ActionResponse?> apply(
task: Task,
action: String,
request: Request,
listener: ActionListener<Response>,
chain: ActionFilterChain<Request, Response>
) {
if (request is FieldCapabilitiesRequest) {
if (request is FieldCapabilitiesRequest && shouldIntercept) {
val indices = request.indices().map { it.toString() }.toTypedArray()
val concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterService.state(), request.indicesOptions(), *indices)
val rollupIndices = mutableSetOf<String>()
val nonRollupIndices = mutableSetOf<String>()
for (index in concreteIndices) {
val isRollupIndex = RollupSettings.ROLLUP_INDEX.get(clusterService.state().metadata.index(index).settings)
if (isRollupIndex) {
rollupIndices.add(index)
} else {
nonRollupIndices.add(index)
val remoteClusterIndices = GuiceHolder.remoteClusterService.groupIndices(request.indicesOptions(), indices) {
idx: String? -> indexNameExpressionResolver.hasIndexAbstraction(idx, clusterService.state())
}
val localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)

localIndices?.let {
val concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterService.state(), request.indicesOptions(), it)
for (index in concreteIndices) {
val isRollupIndex = RollupSettings.ROLLUP_INDEX.get(clusterService.state().metadata.index(index).settings)
if (isRollupIndex) {
rollupIndices.add(index)
} else {
nonRollupIndices.add(index)
}
}
}

remoteClusterIndices.entries.forEach {
val cluster = it.key
val clusterIndices = it.value
clusterIndices.indices().forEach { index ->
nonRollupIndices.add("$cluster${RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR}$index")
}
}
logger.debug("Resolved into rollup $rollupIndices and non rollup $nonRollupIndices indices")

if (rollupIndices.isEmpty()) {
return chain.proceed(task, action, request, listener)
}

if (nonRollupIndices.isEmpty()) {
val rewrittenResponse = rewriteResponse(mapOf(), arrayOf(), rollupIndices)
return listener.onResponse(rewrittenResponse as Response)
/**
* The request can be one of two cases:
* 1 Just rollup indices
* 2 Rollup + NonRollup indices
* If 1 we forward the request to chain and discard the whole response from chain when rewriting.
* If 2 we forward the request to chain with only non rollup indices and append rollup data to response when rewriting.
* We are calling with rollup indices in 1 instead of an empty request since empty is defaulted to returning all indices in cluster.
**/
if (nonRollupIndices.isNotEmpty()) {
request.indices(*nonRollupIndices.toTypedArray())
}

request.indices(*nonRollupIndices.toTypedArray())
chain.proceed(task, action, request, object : ActionListener<Response> {
override fun onResponse(response: Response) {
logger.info("Has rollup indices will rewrite field caps response")
response as FieldCapabilitiesResponse
val rewrittenResponse = rewriteResponse(response.get(), response.indices, rollupIndices)
val rewrittenResponse = rewriteResponse(response, rollupIndices, nonRollupIndices.isEmpty())
listener.onResponse(rewrittenResponse as Response)
}

Expand All @@ -96,30 +125,63 @@ class FieldCapsFilter(
}
}

/**
* The FieldCapabilitiesResponse can contain merged or unmerged data. The response will hold unmerged data if its a cross cluster search.
*
* There is a boolean available in the FieldCapabilitiesRequest `isMergeResults` which indicates if the response is merged/unmerged.
* Unfortunately this is package private and when rewriting we can't access it from request. Instead will be relying on the response.
* If response has indexResponses then its unmerged else merged.
*/
private fun rewriteResponse(response: FieldCapabilitiesResponse, rollupIndices: Set<String>, shouldDiscardResponse: Boolean): ActionResponse {
val ismFieldCapabilitiesResponse = ISMFieldCapabilitiesResponse.fromFieldCapabilitiesResponse(response)
val isMergedResponse = ismFieldCapabilitiesResponse.indexResponses.isEmpty()

// if original response contained only rollup indices we should discard it
val fields = if (shouldDiscardResponse) mapOf() else response.get()
val indices = if (shouldDiscardResponse) arrayOf() else response.indices
val indexResponses = if (shouldDiscardResponse) listOf() else ismFieldCapabilitiesResponse.indexResponses

return if (isMergedResponse) {
rewriteResponse(indices, fields, rollupIndices)
} else {
val rollupIndexResponses = populateRollupIndexResponses(rollupIndices)
val mergedIndexResponses = indexResponses + rollupIndexResponses

val rewrittenISMResponse = ISMFieldCapabilitiesResponse(arrayOf(), mapOf(), mergedIndexResponses)
rewrittenISMResponse.toFieldCapabilitiesResponse()
}
}

private fun populateRollupIndexResponses(rollupIndices: Set<String>): List<ISMFieldCapabilitiesIndexResponse> {
val indexResponses = mutableListOf<ISMFieldCapabilitiesIndexResponse>()
rollupIndices.forEach { rollupIndex ->
val rollupIsmFieldCapabilities = mutableMapOf<String, ISMIndexFieldCapabilities>()
val rollupFieldMappings = populateSourceFieldMappingsForRollupIndex(rollupIndex)

rollupFieldMappings.forEach { rollupFieldMapping ->
val fieldName = rollupFieldMapping.fieldName
val type = rollupFieldMapping.sourceType!!
val isSearchable = rollupFieldMapping.fieldType == RollupFieldMapping.Companion.FieldType.DIMENSION
rollupIsmFieldCapabilities[fieldName] = ISMIndexFieldCapabilities(fieldName, type, isSearchable, true, mapOf())
}

indexResponses.add(ISMFieldCapabilitiesIndexResponse(rollupIndex, rollupIsmFieldCapabilities, true))
}

return indexResponses
}

private fun rewriteResponse(
fields: Map<String, Map<String, FieldCapabilities>>,
indices: Array<String>,
fields: Map<String, Map<String, FieldCapabilities>>,
rollupIndices: Set<String>
): ActionResponse {
val filteredIndicesFields = expandIndicesInFields(indices, fields)
val rollupIndicesFields = populateRollupIndicesFields(rollupIndices)
val mergedFields = mergeFields(filteredIndicesFields, rollupIndicesFields)
val mergedIndices = indices + rollupIndices.toTypedArray()

return buildFieldCapsResponse(mergedIndices, mergedFields)
}

private fun buildFieldCapsResponse(indices: Array<String>, fields: Map<String, Map<String, FieldCapabilities>>): ActionResponse {
val builder = XContentFactory.jsonBuilder().prettyPrint()
builder.startObject()
builder.field("indices", indices)
builder.field("fields", fields as Map<String, Any>?)
builder.endObject()

val parser = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler
.THROW_UNSUPPORTED_OPERATION, Strings.toString(builder))

return FieldCapabilitiesResponse.fromXContent(parser)
return FieldCapabilitiesResponse(mergedIndices, mergedFields)
}

private fun populateRollupIndicesFields(rollupIndices: Set<String>): Map<String, Map<String, FieldCapabilities>> {
Expand All @@ -134,7 +196,7 @@ class FieldCapsFilter(
}
val isSearchable = fieldMapping.fieldType == RollupFieldMapping.Companion.FieldType.DIMENSION
response[fieldName]!![type] = FieldCapabilities(fieldName, type, isSearchable, true, fieldMappingIndexMap.getValue(fieldMapping)
.toTypedArray(), null, null, mapOf<String, Set<String>>())
.toTypedArray(), null, null, mapOf<String, Set<String>>())
}

return response
Expand All @@ -156,14 +218,11 @@ class FieldCapsFilter(
return rollupFieldMappings
}

private fun populateSourceFieldMappingsForRollupIndex(rollupIndex: String): Map<String, Set<RollupFieldMapping>> {
val fieldMappings = mutableMapOf<String, MutableSet<RollupFieldMapping>>()
private fun populateSourceFieldMappingsForRollupIndex(rollupIndex: String): Set<RollupFieldMapping> {
val fieldMappings = mutableSetOf<RollupFieldMapping>()
val rollupJobs = clusterService.state().metadata.index(rollupIndex).getRollupJobs() ?: return fieldMappings
rollupJobs.forEach { rollup ->
if (fieldMappings[rollup.targetIndex] == null) {
fieldMappings[rollup.targetIndex] = mutableSetOf()
}
fieldMappings[rollup.targetIndex]!!.addAll(populateSourceFieldMappingsForRollupJob(rollup))
fieldMappings.addAll(populateSourceFieldMappingsForRollupJob(rollup))
}
return fieldMappings
}
Expand All @@ -174,13 +233,11 @@ class FieldCapsFilter(

rollupIndices.forEach { rollupIndex ->
val fieldMappings = populateSourceFieldMappingsForRollupIndex(rollupIndex)
fieldMappings.forEach { rollupIndexFieldMappings ->
rollupIndexFieldMappings.value.forEach { fieldMapping ->
if (fieldMappingsMap[fieldMapping] == null) {
fieldMappingsMap[fieldMapping] = mutableSetOf()
}
fieldMappingsMap[fieldMapping]!!.add(rollupIndexFieldMappings.key)
fieldMappings.forEach { fieldMapping ->
if (fieldMappingsMap[fieldMapping] == null) {
fieldMappingsMap[fieldMapping] = mutableSetOf()
}
fieldMappingsMap[fieldMapping]!!.add(rollupIndex)
}
}

Expand All @@ -205,7 +262,7 @@ class FieldCapsFilter(
val fieldCaps = fields.getValue(field).getValue(type)
val rewrittenIndices = if (fieldCaps.indices() != null && fieldCaps.indices().isNotEmpty()) fieldCaps.indices() else indices
expandedResponse[field]!![type] = FieldCapabilities(fieldCaps.name, fieldCaps.type, fieldCaps.isSearchable, fieldCaps
.isAggregatable, rewrittenIndices, fieldCaps.nonSearchableIndices(), fieldCaps.nonAggregatableIndices(), fieldCaps.meta())
.isAggregatable, rewrittenIndices, fieldCaps.nonSearchableIndices(), fieldCaps.nonAggregatableIndices(), fieldCaps.meta())
}
}

Expand Down Expand Up @@ -257,12 +314,12 @@ class FieldCapsFilter(
val nonAggregatableIndices = mergeNonAggregatableIndices(fc1, fc2)
val nonSearchableIndices = mergeNonSearchableIndices(fc1, fc2)
val meta = (fc1.meta().keys + fc2.meta().keys)
.associateWith {
val data = mutableSetOf<String>()
data.addAll(fc1.meta().getOrDefault(it, mutableSetOf()))
data.addAll(fc2.meta().getOrDefault(it, mutableSetOf()))
data
}
.associateWith {
val data = mutableSetOf<String>()
data.addAll(fc1.meta().getOrDefault(it, mutableSetOf()))
data.addAll(fc2.meta().getOrDefault(it, mutableSetOf()))
data
}

return FieldCapabilities(name, type, isSearchable, isAggregatable, indices, nonSearchableIndices, nonAggregatableIndices, meta)
}
Expand Down
Loading