diff --git a/titan-es/src/main/java/com/thinkaurelius/titan/diskstorage/es/ElasticSearchIndex.java b/titan-es/src/main/java/com/thinkaurelius/titan/diskstorage/es/ElasticSearchIndex.java index ad83222b97..5d4a1b5fc5 100644 --- a/titan-es/src/main/java/com/thinkaurelius/titan/diskstorage/es/ElasticSearchIndex.java +++ b/titan-es/src/main/java/com/thinkaurelius/titan/diskstorage/es/ElasticSearchIndex.java @@ -29,7 +29,9 @@ import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; +import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequestBuilder; @@ -46,6 +48,7 @@ import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.node.Node; import org.elasticsearch.node.NodeBuilder; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.sort.FieldSortBuilder; @@ -561,7 +564,10 @@ public void mutate(Map> mutations, KeyInforma } } - if (bulkrequests > 0) brb.execute().actionGet(); + if (bulkrequests > 0) { + BulkResponse bulkResponse = brb.execute().actionGet(); + checkBulkResponseForFailures(bulkResponse); + } } catch (Exception e) { throw convert(e); } @@ -595,13 +601,38 @@ public void restore(Map>> documents, KeyInfo } } - if (requests > 0) - bulk.execute().actionGet(); + if (requests > 0) { + BulkResponse bulkResponse = bulk.execute().actionGet(); + checkBulkResponseForFailures(bulkResponse); + } } catch (Exception e) { throw convert(e); } } + /** + * If a BulkResponse has a failure except for {@link RestStatus#NOT_FOUND}, throw an exception. + * + * @param bulkResponse + * a BulkResponse to be analyzed + * + * @throws Exception + * if any of the requests failed + */ + private static void checkBulkResponseForFailures(BulkResponse bulkResponse) throws Exception { + if (bulkResponse.hasFailures()) { + for (BulkItemResponse bulkItemResponse : bulkResponse.getItems()) { + // We don't need to update a document if it's been deleted + // If it failed for a different reason, this is an "actual" failure + boolean actualFailure = bulkItemResponse.isFailed() && + bulkItemResponse.getFailure().getStatus() != RestStatus.NOT_FOUND; + if (actualFailure) { + throw new Exception(bulkResponse.buildFailureMessage()); + } + } + } + } + public FilterBuilder getFilter(Condition condition, KeyInformation.StoreRetriever informations) { if (condition instanceof PredicateCondition) { PredicateCondition atom = (PredicateCondition) condition;