Skip to content
This repository was archived by the owner on Aug 2, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagemen
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.addpolicy.TransportAddPolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.changepolicy.ChangePolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.changepolicy.TransportChangePolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.indexpolicy.IndexPolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.indexpolicy.TransportIndexPolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.explain.ExplainAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.explain.TransportExplainAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.getpolicy.GetPolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.getpolicy.TransportGetPolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.indexpolicy.IndexPolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.indexpolicy.TransportIndexPolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.removepolicy.RemovePolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.removepolicy.TransportRemovePolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.retryfailedmanagedindex.RetryFailedManagedIndexAction
Expand Down Expand Up @@ -209,8 +211,9 @@ internal class IndexManagementPlugin : JobSchedulerExtension, ActionPlugin, Plug
ActionPlugin.ActionHandler(AddPolicyAction.INSTANCE, TransportAddPolicyAction::class.java),
ActionPlugin.ActionHandler(RetryFailedManagedIndexAction.INSTANCE, TransportRetryFailedManagedIndexAction::class.java),
ActionPlugin.ActionHandler(ChangePolicyAction.INSTANCE, TransportChangePolicyAction::class.java),
ActionPlugin.ActionHandler(ExplainAction.INSTANCE, TransportExplainAction::class.java),
ActionPlugin.ActionHandler(IndexPolicyAction.INSTANCE, TransportIndexPolicyAction::class.java),
ActionPlugin.ActionHandler(ExplainAction.INSTANCE, TransportExplainAction::class.java)
ActionPlugin.ActionHandler(GetPolicyAction.INSTANCE, TransportGetPolicyAction::class.java)
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,18 @@

package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler

import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.POLICY_BASE_URI
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.Policy
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.Policy.Companion.POLICY_TYPE
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_TYPE
import com.amazon.opendistroforelasticsearch.indexmanagement.util._ID
import com.amazon.opendistroforelasticsearch.indexmanagement.util._PRIMARY_TERM
import com.amazon.opendistroforelasticsearch.indexmanagement.util._SEQ_NO
import com.amazon.opendistroforelasticsearch.indexmanagement.util._VERSION
import org.elasticsearch.action.get.GetRequest
import org.elasticsearch.action.get.GetResponse
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.getpolicy.GetPolicyAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.getpolicy.GetPolicyRequest
import org.elasticsearch.client.node.NodeClient
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler
import org.elasticsearch.common.xcontent.XContentHelper
import org.elasticsearch.common.xcontent.XContentType
import org.elasticsearch.rest.BaseRestHandler
import org.elasticsearch.rest.BaseRestHandler.RestChannelConsumer
import org.elasticsearch.rest.RestHandler.Route
import org.elasticsearch.rest.BytesRestResponse
import org.elasticsearch.rest.RestChannel
import org.elasticsearch.rest.RestRequest
import org.elasticsearch.rest.RestRequest.Method.GET
import org.elasticsearch.rest.RestRequest.Method.HEAD
import org.elasticsearch.rest.RestResponse
import org.elasticsearch.rest.RestStatus
import org.elasticsearch.rest.action.RestActions
import org.elasticsearch.rest.action.RestResponseListener
import org.elasticsearch.rest.action.RestToXContentListener
import org.elasticsearch.search.fetch.subphase.FetchSourceContext

class RestGetPolicyAction : BaseRestHandler() {
Expand All @@ -58,46 +44,19 @@ class RestGetPolicyAction : BaseRestHandler() {

override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
val policyId = request.param("policyID")

if (policyId == null || policyId.isEmpty()) {
throw IllegalArgumentException("Missing policy ID")
}
val getRequest = GetRequest(INDEX_MANAGEMENT_INDEX, policyId)
.version(RestActions.parseVersion(request))

var fetchSrcContext: FetchSourceContext = FetchSourceContext.FETCH_SOURCE
if (request.method() == HEAD) {
getRequest.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE)
fetchSrcContext = FetchSourceContext.DO_NOT_FETCH_SOURCE
}
return RestChannelConsumer { channel -> client.get(getRequest, getPolicyResponse(channel)) }
}

private fun getPolicyResponse(channel: RestChannel): RestResponseListener<GetResponse> {
return object : RestResponseListener<GetResponse>(channel) {
@Throws(Exception::class)
override fun buildResponse(response: GetResponse): RestResponse {
if (!response.isExists) {
return BytesRestResponse(RestStatus.NOT_FOUND, channel.newBuilder())
}
val getPolicyRequest = GetPolicyRequest(policyId, RestActions.parseVersion(request), fetchSrcContext)

val builder = channel.newBuilder()
.startObject()
.field(_ID, response.id)
.field(_VERSION, response.version)
.field(_SEQ_NO, response.seqNo)
.field(_PRIMARY_TERM, response.primaryTerm)
if (!response.isSourceEmpty) {
XContentHelper.createParser(
channel.request().xContentRegistry,
LoggingDeprecationHandler.INSTANCE,
response.sourceAsBytesRef,
XContentType.JSON
).use { xcp ->
val policy = Policy.parseWithType(xcp, response.id, response.seqNo, response.primaryTerm)
builder.field(POLICY_TYPE, policy, XCONTENT_WITHOUT_TYPE)
}
}
builder.endObject()
return BytesRestResponse(RestStatus.OK, builder)
}
return RestChannelConsumer { channel ->
client.execute(GetPolicyAction.INSTANCE, getPolicyRequest, RestToXContentListener(channel))
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.getpolicy

import org.elasticsearch.action.ActionType

class GetPolicyAction private constructor() : ActionType<GetPolicyResponse>(NAME, ::GetPolicyResponse) {
companion object {
val INSTANCE = GetPolicyAction()
val NAME = "cluster:admin/opendistro/ism/policy/read"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.getpolicy

import org.elasticsearch.action.ActionRequest
import org.elasticsearch.action.ActionRequestValidationException
import org.elasticsearch.action.ValidateActions
import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.io.stream.StreamOutput
import org.elasticsearch.search.fetch.subphase.FetchSourceContext
import java.io.IOException

class GetPolicyRequest : ActionRequest {

val policyID: String
val version: Long
val fetchSrcContext: FetchSourceContext

constructor(
policyID: String,
version: Long,
fetchSrcContext: FetchSourceContext
) : super() {
this.policyID = policyID
this.version = version
this.fetchSrcContext = fetchSrcContext
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
policyID = sin.readString(),
version = sin.readLong(),
fetchSrcContext = FetchSourceContext(sin)
)

override fun validate(): ActionRequestValidationException? {
var validationException: ActionRequestValidationException? = null
if (policyID.isBlank()) {
validationException = ValidateActions.addValidationError(
"Missing policy ID",
validationException
)
}
return validationException
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(policyID)
out.writeLong(version)
fetchSrcContext.writeTo(out)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.getpolicy

import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.Policy
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_TYPE
import com.amazon.opendistroforelasticsearch.indexmanagement.util._ID
import com.amazon.opendistroforelasticsearch.indexmanagement.util._PRIMARY_TERM
import com.amazon.opendistroforelasticsearch.indexmanagement.util._SEQ_NO
import com.amazon.opendistroforelasticsearch.indexmanagement.util._VERSION
import org.elasticsearch.action.ActionResponse
import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.io.stream.StreamOutput
import org.elasticsearch.common.xcontent.ToXContent
import org.elasticsearch.common.xcontent.ToXContentObject
import org.elasticsearch.common.xcontent.XContentBuilder
import java.io.IOException

class GetPolicyResponse : ActionResponse, ToXContentObject {

val id: String
val version: Long
val seqNo: Long
val primaryTerm: Long
val policy: Policy?

constructor(
id: String,
version: Long,
seqNo: Long,
primaryTerm: Long,
policy: Policy?
) : super() {
this.id = id
this.version = version
this.seqNo = seqNo
this.primaryTerm = primaryTerm
this.policy = policy
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
id = sin.readString(),
version = sin.readLong(),
seqNo = sin.readLong(),
primaryTerm = sin.readLong(),
policy = sin.readOptionalWriteable(::Policy)
)

override fun writeTo(out: StreamOutput) {
out.writeString(id)
out.writeLong(version)
out.writeLong(seqNo)
out.writeLong(primaryTerm)
out.writeOptionalWriteable(policy)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
.field(_ID, id)
.field(_VERSION, version)
.field(_SEQ_NO, seqNo)
.field(_PRIMARY_TERM, primaryTerm)
if (policy != null) {
builder.field(Policy.POLICY_TYPE, policy, XCONTENT_WITHOUT_TYPE)
}

return builder.endObject()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.getpolicy

import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.Policy
import org.elasticsearch.ElasticsearchStatusException
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.get.GetRequest
import org.elasticsearch.action.get.GetResponse
import org.elasticsearch.action.support.ActionFilters
import org.elasticsearch.action.support.HandledTransportAction
import org.elasticsearch.client.node.NodeClient
import org.elasticsearch.common.inject.Inject
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler
import org.elasticsearch.common.xcontent.NamedXContentRegistry
import org.elasticsearch.common.xcontent.XContentHelper
import org.elasticsearch.common.xcontent.XContentType
import org.elasticsearch.rest.RestStatus
import org.elasticsearch.tasks.Task
import org.elasticsearch.transport.TransportService

class TransportGetPolicyAction @Inject constructor(
val client: NodeClient,
transportService: TransportService,
actionFilters: ActionFilters,
val xContentRegistry: NamedXContentRegistry
) : HandledTransportAction<GetPolicyRequest, GetPolicyResponse>(
GetPolicyAction.NAME, transportService, actionFilters, ::GetPolicyRequest
) {
override fun doExecute(task: Task, request: GetPolicyRequest, listener: ActionListener<GetPolicyResponse>) {
GetPolicyHandler(client, listener, request).start()
}

inner class GetPolicyHandler(
private val client: NodeClient,
private val actionListener: ActionListener<GetPolicyResponse>,
private val request: GetPolicyRequest
) {
fun start() {
val getRequest = GetRequest(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX, request.policyID)
.version(request.version)
.fetchSourceContext(request.fetchSrcContext)

client.get(getRequest, object : ActionListener<GetResponse> {
override fun onResponse(response: GetResponse) {
onGetResponse(response)
}

override fun onFailure(t: Exception) {
actionListener.onFailure(t)
}
})
}

fun onGetResponse(response: GetResponse) {
if (!response.isExists) {
actionListener.onFailure(ElasticsearchStatusException("Policy not found", RestStatus.NOT_FOUND))
return
}

var policy: Policy? = null
if (!response.isSourceEmpty) {
XContentHelper.createParser(
xContentRegistry,
LoggingDeprecationHandler.INSTANCE,
response.sourceAsBytesRef,
XContentType.JSON
).use { xcp ->
policy = Policy.parseWithType(xcp, response.id, response.seqNo, response.primaryTerm)
}
}

actionListener.onResponse(
GetPolicyResponse(response.id, response.version, response.seqNo, response.primaryTerm, policy)
)
}
}
}
Loading