Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions deployment/clear_opensearch/clear_opensearch.sh.tpl
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
#!/bin/bash
echo -e "\nDelete OpenSearch indexes\n"
# SigV4-signed DELETE of the production-locations index on the OpenSearch domain.
curl -X DELETE https://$OPENSEARCH_DOMAIN/production-locations --aws-sigv4 "aws:amz:eu-west-1:es" --user "$AWS_ACCESS_KEY_ID:$AWS_SECRET_ACCESS_KEY"
# NOTE(review): this message says "Remove lock file" but the very next command
# deletes the moderation-events index, not a lock file; the echo looks stale or
# misplaced (lock-file removal happens further below) — confirm and fix wording.
echo -e "\nRemove lock file\n"
curl -X DELETE https://$OPENSEARCH_DOMAIN/moderation-events --aws-sigv4 "aws:amz:eu-west-1:es" --user "$AWS_ACCESS_KEY_ID:$AWS_SECRET_ACCESS_KEY"

echo -e "\nRemove the JDBC input lock files from the EFS connected to Logstash\n"
# Mount the Logstash EFS access point, delete the JDBC last-run tracking files
# (so the pipelines re-ingest everything from scratch), then unmount.
sudo mount -t efs -o tls,accesspoint=$EFS_AP_ID $EFS_ID:/ /mnt
sudo rm /mnt/logstash_jdbc_last_run
sudo rm /mnt/production_locations_jdbc_last_run
sudo rm /mnt/moderation_events_jdbc_last_run
sudo umount /mnt
29 changes: 15 additions & 14 deletions deployment/terraform/container_service.tf
Original file line number Diff line number Diff line change
Expand Up @@ -384,20 +384,21 @@ data "template_file" "app_logstash" {
template = file("task-definitions/app_logstash.json")

vars = {
image = local.app_logstash_image
log_group_name = "log${local.short}AppLogstash"
opensearch_auth_type = var.opensearch_auth_type
aws_region = var.aws_region
opensearch_ssl = var.opensearch_ssl
opensearch_ssl_cert_verification = var.opensearch_ssl_cert_verification
opensearch_host = aws_opensearch_domain.opensearch.endpoint
opensearch_port = var.opensearch_port
postgres_host = aws_route53_record.database.name
postgres_port = module.database_enc.port
postgres_user = var.rds_database_username
postgres_password = var.rds_database_password
postgres_db = var.rds_database_name
logstash_update_interval_minutes = var.logstash_update_interval_minutes
image = local.app_logstash_image
log_group_name = "log${local.short}AppLogstash"
opensearch_auth_type = var.opensearch_auth_type
aws_region = var.aws_region
opensearch_ssl = var.opensearch_ssl
opensearch_ssl_cert_verification = var.opensearch_ssl_cert_verification
opensearch_host = aws_opensearch_domain.opensearch.endpoint
opensearch_port = var.opensearch_port
postgres_host = aws_route53_record.database.name
postgres_port = module.database_enc.port
postgres_user = var.rds_database_username
postgres_password = var.rds_database_password
postgres_db = var.rds_database_name
production_locations_pipeline_update_interval_minutes = var.production_locations_pipeline_update_interval_minutes
moderation_events_pipeline_update_interval_minutes = var.moderation_events_pipeline_update_interval_minutes
}
}

Expand Down
3 changes: 2 additions & 1 deletion deployment/terraform/task-definitions/app_logstash.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
{ "name": "POSTGRES_PASSWORD", "value": "${postgres_password}" },
{ "name": "POSTGRES_DB", "value": "${postgres_db}" },
{ "name": "AWS_REGION", "value": "${aws_region}"},
{ "name": "LOGSTASH_UPDATE_INTERVAL_MINUTES", "value": "${logstash_update_interval_minutes}"}
{ "name": "PRODUCTION_LOCATIONS_PIPELINE_UPDATE_INTERVAL_MINUTES", "value": "${production_locations_pipeline_update_interval_minutes}"},
{ "name": "MODERATION_EVENTS_PIPELINE_UPDATE_INTERVAL_MINUTES", "value": "${moderation_events_pipeline_update_interval_minutes}" }
],
"logConfiguration": {
"logDriver": "awslogs",
Expand Down
7 changes: 6 additions & 1 deletion deployment/terraform/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -542,11 +542,16 @@ variable "opensearch_ssl_cert_verification" {
default = true
}

variable "logstash_update_interval_minutes" {
variable "production_locations_pipeline_update_interval_minutes" {
type = number
default = 15
}

variable "moderation_events_pipeline_update_interval_minutes" {
type = number
default = 1
}

variable "app_logstash_ecs_desired_count" {
type = number
default = 1
Expand Down
7 changes: 4 additions & 3 deletions doc/release/RELEASE-NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,21 @@ This project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html
* *Describe scheme changes here.*

### Code/API changes
* *Describe code/API changes here.*
* [OSDEV-1335](https://opensupplyhub.atlassian.net/browse/OSDEV-1335) - Explicitly set the number of shards and the number of replicas for the "production locations" and "moderation events" OpenSearch indexes. Based on the OpenSearch documentation, a storage size of 10–30 GB is preferred for workloads that prioritize low search latency. Additionally, having too many small shards can unnecessarily exhaust memory by storing excessive metadata. Currently, the "production locations" index utilizes 651.9 MB, including replicas, while the "moderation events" index is empty. This indicates that one shard and one replica should be sufficient for the "production locations" and "moderation events" indexes.

### Architecture/Environment changes
* The OpenSearch version has been increased to 2.15.
* [OSDEV-1335](https://opensupplyhub.atlassian.net/browse/OSDEV-1335) - The new "moderation events" Logstash pipeline has been configured and implemented to collect moderation event data from the current PostgreSQL database and save it to OpenSearch. This setup allows for fast searches on the moderation events data.

### Bugfix
* *Describe bugfix here.*
* [OSDEV-1335](https://opensupplyhub.atlassian.net/browse/OSDEV-1335) - Fixed the assertion in the test for the `country.rb` filter of the "production locations" Logstash pipeline. The main issue was with the evaluation of statements in the Ruby block. Since only the last statement is evaluated in a Ruby block, all the checks were grouped into one chain of logical statements and returned as a `result` variable at the end.

### What's new
* *Describe what's new here. The changes that can impact user experience should be listed in this section.*

### Release instructions:
* The following steps should be completed while deploying to Staging or Production:
1. Run the `[Release] Deploy` pipeline for these environments with the flag 'Clear OpenSearch indexes' set to true. This will allow Logstash to refill OpenSearch since the OpenSearch instance will be recreated due to the version increase.
1. Run the `[Release] Deploy` pipeline for these environments with the flag 'Clear OpenSearch indexes' set to true. This will allow Logstash to refill OpenSearch since the OpenSearch instance will be recreated due to the version increase. It is also necessary due to changes in the OpenSearch index settings.
2. Open the triggered `Deploy to AWS` workflow and ensure that the `apply` job is completed. **Right after** finishing the `apply` job, follow these instructions, which should be the last steps in setting up the recreated OpenSearch instance:
- Copy the ARN of the `terraform_ci` user from the AWS IAM console.
- Navigate to the AWS console's search input, type "IAM", and open the IAM console.
Expand Down
3 changes: 2 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ services:
- POSTGRES_USER=opensupplyhub
- POSTGRES_PASSWORD=opensupplyhub
- POSTGRES_DB=opensupplyhub
- LOGSTASH_UPDATE_INTERVAL_MINUTES=1
- PRODUCTION_LOCATIONS_PIPELINE_UPDATE_INTERVAL_MINUTES=1
- MODERATION_EVENTS_PIPELINE_UPDATE_INTERVAL_MINUTES=1

depends_on:
- opensearch-single-node
Expand Down
15 changes: 10 additions & 5 deletions src/logstash/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
FROM logstash:8.13.4
FROM logstash:8.15.3

ENV LS_JAVA_OPTS="-XX:MaxDirectMemorySize=512m"

# Install a specific version of the OpenSearch output plugin for Logstash.
RUN bin/logstash-plugin install --version 2.0.2 logstash-output-opensearch

# Delete configuration samples in the custom image to ensure that the example
# 1. Delete configuration samples in the custom image to ensure that the example
# configuration files from the base image are not retained.
# 2. Delete the pipelines.yml file to overwrite it and support multiple
# pipelines.
# 3. Create the folder to save the lock files for the JDBC inputs.
RUN rm -f /usr/share/logstash/pipeline/* \
&& rm -f /usr/share/logstash/config/logstash-sample.conf
&& rm -f /usr/share/logstash/config/logstash-sample.conf \
&& rm -f /usr/share/logstash/config/pipelines.yml \
&& mkdir -p /usr/share/logstash/data/plugins/inputs/jdbc

# Copy the PostgreSQL jdbc driver to the external library folder of Logstash.
COPY --chown=logstash:root ./bin/ /usr/share/logstash/logstash-core/lib/jars/

# Copy the Logstash configuration file to the config folder of Logstash.
# Copy the Logstash configuration files to the config folder of Logstash.
COPY --chown=logstash:root ./config/ /usr/share/logstash/config/

# Copy the Logstash pipeline configuration file to the pipeline folder of
# Copy the Logstash pipeline configuration files to the pipeline folder of
# Logstash.
COPY --chown=logstash:root ./pipeline/ /usr/share/logstash/pipeline/

Expand Down
1 change: 1 addition & 0 deletions src/logstash/config/logstash.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
api.enabled: false
8 changes: 8 additions & 0 deletions src/logstash/config/pipelines.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# This file is where pipelines are defined.
# For more information on multiple pipelines, see the documentation:
# https://www.elastic.co/guide/en/logstash/8.15/multiple-pipelines.html

- pipeline.id: production_locations
path.config: "/usr/share/logstash/pipeline/sync_production_locations.conf"
- pipeline.id: moderation_events
path.config: "/usr/share/logstash/pipeline/sync_moderation_events.conf"
68 changes: 68 additions & 0 deletions src/logstash/indexes/moderation_events.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
{
"index_patterns": [
"moderation-events*"
],
"template": {
"settings": {
"number_of_shards": 1,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"moderation_id": {
"type": "keyword"
},
"os_id": {
"type": "keyword"
},
"contributor_id": {
"type": "keyword"
},
"contributor_name": {
"type": "keyword"
},
"claim_id": {
"type": "keyword"
},
"request_type": {
"type": "keyword"
},
"source": {
"type": "keyword"
},
"status": {
"type": "keyword"
},
"status_change_date": {
"type": "date"
},
"created_at": {
"type": "date"
},
"updated_at": {
"type": "date"
},
"cleaned_data": {
"properties": {
"country": {
"properties": {
"name": {
"type": "keyword"
},
"alpha_2": {
"type": "keyword"
},
"alpha_3": {
"type": "keyword"
},
"numeric": {
"type": "keyword"
}
}
}
}
}
}
}
}
}
2 changes: 2 additions & 0 deletions src/logstash/indexes/production_locations.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
],
"template": {
"settings": {
"number_of_shards": 1,
"number_of_replicas": 1,
"analysis": {
"analyzer": {
"custom_asciifolding_analyzer": {
Expand Down
64 changes: 64 additions & 0 deletions src/logstash/pipeline/sync_moderation_events.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Logstash pipeline: incrementally syncs moderation events from the
# PostgreSQL database into the "moderation-events" OpenSearch index.
input{
  jdbc{
    # Connection settings are injected via container environment variables.
    jdbc_connection_string => "jdbc:postgresql://${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DB}"
    jdbc_user => "${POSTGRES_USER}"
    jdbc_password => "${POSTGRES_PASSWORD}"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_paging_enabled => true
    # Incremental sync: only rows whose updated_at exceeds the watermark
    # stored in the last-run metadata file are fetched on each run.
    use_column_value => true
    tracking_column => "updated_at"
    tracking_column_type => "timestamp"
    # Stored under the Logstash data dir so the watermark survives restarts.
    last_run_metadata_path => "/usr/share/logstash/data/plugins/inputs/jdbc/moderation_events_jdbc_last_run"
    # Cron-style schedule: every N minutes, N taken from the environment.
    schedule => "*/${MODERATION_EVENTS_PIPELINE_UPDATE_INTERVAL_MINUTES} * * * *"
    statement_filepath => "/usr/share/logstash/sql/sync_moderation_events.sql"
  }
}
filter {
  # cleaned_data_value arrives as a JSON string column; parse it in place.
  json {
    source => "cleaned_data_value"
    target => "cleaned_data_value"
  }
  # Per-field Ruby scripts that reshape the raw *_value columns into the
  # final document fields (see src/logstash/scripts/moderation_events/).
  ruby {
    path => "/usr/share/logstash/scripts/moderation_events/country.rb"
  }
  ruby {
    path => "/usr/share/logstash/scripts/moderation_events/os_id.rb"
  }
  ruby {
    path => "/usr/share/logstash/scripts/moderation_events/claim_id.rb"
  }
  ruby {
    path => "/usr/share/logstash/scripts/moderation_events/source.rb"
  }
  ruby {
    path => "/usr/share/logstash/scripts/moderation_events/status_change_date.rb"
  }
  mutate {
    # Use moderation_id as the OpenSearch document id (read by the output).
    copy => { "moderation_id" => "[@metadata][_id]" }
    # Drop Logstash bookkeeping fields and the intermediate *_value columns
    # so they are not indexed into OpenSearch.
    remove_field => [
      "@version",
      "@timestamp",
      "cleaned_data_value",
      "os_id_value",
      "claim_id_value",
      "source_value",
      "status_change_date_value"
    ]
  }
}
output {
  opensearch {
    hosts => ["${OPENSEARCH_HOST}:${OPENSEARCH_PORT}"]
    auth_type => {
      type => "${OPENSEARCH_AUTH_TYPE}"
      region => "${AWS_REGION}"
    }
    ssl => "${OPENSEARCH_SSL}"
    ssl_certificate_verification => "${OPENSEARCH_SSL_CERT_VERIFICATION}"
    index => "moderation-events"
    document_id => "%{[@metadata][_id]}"
    # Composable (non-legacy) index template carrying mappings and settings.
    template_name => "moderation_events_template"
    template => "/usr/share/logstash/indexes/moderation_events.json"
    legacy_template => false
  }
}
3 changes: 2 additions & 1 deletion src/logstash/pipeline/sync_production_locations.conf
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ input{
use_column_value => true
tracking_column => "updated_at"
tracking_column_type => "timestamp"
schedule => "*/${LOGSTASH_UPDATE_INTERVAL_MINUTES} * * * *"
last_run_metadata_path => "/usr/share/logstash/data/plugins/inputs/jdbc/production_locations_jdbc_last_run"
schedule => "*/${PRODUCTION_LOCATIONS_PIPELINE_UPDATE_INTERVAL_MINUTES} * * * *"
statement_filepath => "/usr/share/logstash/sql/sync_production_locations.sql"
}
}
Expand Down
42 changes: 42 additions & 0 deletions src/logstash/scripts/moderation_events/claim_id.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Logstash Ruby filter: promotes the raw 'claim_id_value' column onto the
# event as the 'claim_id' field.
#
# Moderation events that are not tied to a claim carry a nil
# 'claim_id_value'; in that case no 'claim_id' field is added.
#
# @param event [LogStash::Event] the event being processed
# @return [Array<LogStash::Event>] the (possibly updated) event wrapped in
#   an array, as the Logstash Ruby filter contract requires
def filter(event)
  claim_id = event.get('claim_id_value')
  event.set('claim_id', claim_id) unless claim_id.nil?
  [event]
end

# Logstash ruby-filter script test (run by the Logstash test harness, which
# provides the `test`/`in_event`/`expect` DSL): a non-nil claim_id_value must
# be copied into claim_id while the other fields pass through untouched.
test 'claim_id filter with valid claim_id_value' do
  in_event {
    {
      'claim_id_value' => 12,
      'status' => 'PENDING'
    }
  }

  expect('returns an object with claim_id set') do |events|
    # Only the last expression of this block is evaluated by the harness,
    # so all checks are combined into a single boolean chain.
    is_claim_id_value_present = events[0].get('claim_id_value') == 12
    is_status_present = events[0].get('status') == 'PENDING'
    is_claim_id_present = events[0].get('claim_id') == 12

    is_claim_id_value_present && is_status_present && is_claim_id_present
  end
end

# Logstash ruby-filter script test: when claim_id_value is nil the filter
# must NOT add a claim_id field, and must leave the event otherwise intact.
test 'claim_id filter with nil claim_id_value' do
  in_event {
    {
      'claim_id_value' => nil,
      'status' => 'PENDING'
    }
  }

  expect('returns the same object') do |events|
    # Only the last expression of this block is evaluated by the harness,
    # so all checks are combined into a single boolean chain.
    is_claim_id_value_nil = events[0].get('claim_id_value').nil?
    is_status_present = events[0].get('status') == 'PENDING'
    is_claim_id_present = events[0].to_hash.key?('claim_id')

    is_claim_id_value_nil && is_status_present && !is_claim_id_present
  end
end
Loading