Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions deployment/clear_opensearch/clear_opensearch.sh.tpl
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
#!/bin/bash
echo -e "\nDelete OpenSearch indexes\n"
curl -X DELETE https://$OPENSEARCH_DOMAIN/production-locations --aws-sigv4 "aws:amz:eu-west-1:es" --user "$AWS_ACCESS_KEY_ID:$AWS_SECRET_ACCESS_KEY"
echo -e "\nRemove lock file\n"
curl -X DELETE https://$OPENSEARCH_DOMAIN/moderation-events --aws-sigv4 "aws:amz:eu-west-1:es" --user "$AWS_ACCESS_KEY_ID:$AWS_SECRET_ACCESS_KEY"

echo -e "\nRemove the JDBC input lock files from the EFS connected to Logstash\n"
sudo mount -t efs -o tls,accesspoint=$EFS_AP_ID $EFS_ID:/ /mnt
sudo rm /mnt/logstash_jdbc_last_run
sudo rm /mnt/production_locations_jdbc_last_run
sudo rm /mnt/moderation_events_jdbc_last_run
sudo umount /mnt
29 changes: 15 additions & 14 deletions deployment/terraform/container_service.tf
Original file line number Diff line number Diff line change
Expand Up @@ -384,20 +384,21 @@ data "template_file" "app_logstash" {
template = file("task-definitions/app_logstash.json")

vars = {
image = local.app_logstash_image
log_group_name = "log${local.short}AppLogstash"
opensearch_auth_type = var.opensearch_auth_type
aws_region = var.aws_region
opensearch_ssl = var.opensearch_ssl
opensearch_ssl_cert_verification = var.opensearch_ssl_cert_verification
opensearch_host = aws_opensearch_domain.opensearch.endpoint
opensearch_port = var.opensearch_port
postgres_host = aws_route53_record.database.name
postgres_port = module.database_enc.port
postgres_user = var.rds_database_username
postgres_password = var.rds_database_password
postgres_db = var.rds_database_name
logstash_update_interval_minutes = var.logstash_update_interval_minutes
image = local.app_logstash_image
log_group_name = "log${local.short}AppLogstash"
opensearch_auth_type = var.opensearch_auth_type
aws_region = var.aws_region
opensearch_ssl = var.opensearch_ssl
opensearch_ssl_cert_verification = var.opensearch_ssl_cert_verification
opensearch_host = aws_opensearch_domain.opensearch.endpoint
opensearch_port = var.opensearch_port
postgres_host = aws_route53_record.database.name
postgres_port = module.database_enc.port
postgres_user = var.rds_database_username
postgres_password = var.rds_database_password
postgres_db = var.rds_database_name
production_locations_pipeline_update_interval_minutes = var.production_locations_pipeline_update_interval_minutes
moderation_events_pipeline_update_interval_minutes = var.moderation_events_pipeline_update_interval_minutes
}
}

Expand Down
3 changes: 2 additions & 1 deletion deployment/terraform/task-definitions/app_logstash.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
{ "name": "POSTGRES_PASSWORD", "value": "${postgres_password}" },
{ "name": "POSTGRES_DB", "value": "${postgres_db}" },
{ "name": "AWS_REGION", "value": "${aws_region}"},
{ "name": "LOGSTASH_UPDATE_INTERVAL_MINUTES", "value": "${logstash_update_interval_minutes}"}
{ "name": "PRODUCTION_LOCATIONS_PIPELINE_UPDATE_INTERVAL_MINUTES", "value": "${production_locations_pipeline_update_interval_minutes}"},
{ "name": "MODERATION_EVENTS_PIPELINE_UPDATE_INTERVAL_MINUTES", "value": "${moderation_events_pipeline_update_interval_minutes}" }
],
"logConfiguration": {
"logDriver": "awslogs",
Expand Down
7 changes: 6 additions & 1 deletion deployment/terraform/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -542,11 +542,16 @@ variable "opensearch_ssl_cert_verification" {
default = true
}

variable "logstash_update_interval_minutes" {
variable "production_locations_pipeline_update_interval_minutes" {
type = number
default = 15
}

variable "moderation_events_pipeline_update_interval_minutes" {
type = number
default = 1
}

variable "app_logstash_ecs_desired_count" {
type = number
default = 1
Expand Down
3 changes: 2 additions & 1 deletion doc/release/RELEASE-NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ This project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html

### Architecture/Environment changes
* The OpenSearch version has been increased to 2.15.
* [OSDEV-1335](https://opensupplyhub.atlassian.net/browse/OSDEV-1335) - The new "moderation events" Logstash pipeline has been configured and implemented to collect moderation event data from the current PostgreSQL database and save it to OpenSearch. This setup allows for fast searches on the moderation events data.

### Bugfix
* *Describe bugfix here.*
* [OSDEV-1335](https://opensupplyhub.atlassian.net/browse/OSDEV-1335) - Fixed the assertion in the test for the `country.rb` filter of the "production locations" Logstash pipeline. The root cause was that only a Ruby block's final expression is returned as its value, so all but the last check were silently ignored. The checks are now combined into a single chain of logical conditions whose result is returned at the end of the block.

### What's new
* *Describe what's new here. The changes that can impact user experience should be listed in this section.*
Expand Down
3 changes: 2 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ services:
- POSTGRES_USER=opensupplyhub
- POSTGRES_PASSWORD=opensupplyhub
- POSTGRES_DB=opensupplyhub
- LOGSTASH_UPDATE_INTERVAL_MINUTES=1
- PRODUCTION_LOCATIONS_PIPELINE_UPDATE_INTERVAL_MINUTES=1
- MODERATION_EVENTS_PIPELINE_UPDATE_INTERVAL_MINUTES=1

depends_on:
- opensearch-single-node
Expand Down
15 changes: 10 additions & 5 deletions src/logstash/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
FROM logstash:8.13.4
FROM logstash:8.15.3

ENV LS_JAVA_OPTS="-XX:MaxDirectMemorySize=512m"

# Install a specific version of the OpenSearch output plugin for Logstash.
RUN bin/logstash-plugin install --version 2.0.2 logstash-output-opensearch

# Delete configuration samples in the custom image to ensure that the example
# 1. Delete configuration samples in the custom image to ensure that the example
# configuration files from the base image are not retained.
# 2. Delete the pipelines.yml file to overwrite it and support multiple
# pipelines.
# 3. Create the folder to save the lock files for the JDBC inputs.
RUN rm -f /usr/share/logstash/pipeline/* \
&& rm -f /usr/share/logstash/config/logstash-sample.conf
&& rm -f /usr/share/logstash/config/logstash-sample.conf \
&& rm -f /usr/share/logstash/config/pipelines.yml \
&& mkdir -p /usr/share/logstash/data/plugins/inputs/jdbc

# Copy the PostgreSQL jdbc driver to the external library folder of Logstash.
COPY --chown=logstash:root ./bin/ /usr/share/logstash/logstash-core/lib/jars/

# Copy the Logstash configuration file to the config folder of Logstash.
# Copy the Logstash configuration files to the config folder of Logstash.
COPY --chown=logstash:root ./config/ /usr/share/logstash/config/

# Copy the Logstash pipeline configuration file to the pipeline folder of
# Copy the Logstash pipeline configuration files to the pipeline folder of
# Logstash.
COPY --chown=logstash:root ./pipeline/ /usr/share/logstash/pipeline/

Expand Down
1 change: 1 addition & 0 deletions src/logstash/config/logstash.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
api.enabled: false
8 changes: 8 additions & 0 deletions src/logstash/config/pipelines.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# This file is where pipelines are defined.
# For more information on multiple pipelines, see the documentation:
# https://www.elastic.co/guide/en/logstash/8.15/multiple-pipelines.html

- pipeline.id: production_locations
path.config: "/usr/share/logstash/pipeline/sync_production_locations.conf"
- pipeline.id: moderation_events
path.config: "/usr/share/logstash/pipeline/sync_moderation_events.conf"
64 changes: 64 additions & 0 deletions src/logstash/indexes/moderation_events.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
{
"index_patterns": [
"moderation-events*"
],
"template": {
"mappings": {
"properties": {
"moderation_id": {
"type": "keyword"
},
"os_id": {
"type": "keyword"
},
"contributor_id": {
"type": "keyword"
},
"contributor_name": {
"type": "keyword"
},
"claim_id": {
"type": "keyword"
},
"request_type": {
"type": "keyword"
},
"source": {
"type": "keyword"
},
"status": {
"type": "keyword"
},
"status_change_date": {
"type": "date"
},
"created_at": {
"type": "date"
},
"updated_at": {
"type": "date"
},
"cleaned_data": {
"properties": {
"country": {
"properties": {
"name": {
"type": "keyword"
},
"alpha_2": {
"type": "keyword"
},
"alpha_3": {
"type": "keyword"
},
"numeric": {
"type": "keyword"
}
}
}
}
}
}
}
}
}
64 changes: 64 additions & 0 deletions src/logstash/pipeline/sync_moderation_events.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Logstash pipeline: incrementally syncs moderation events from PostgreSQL
# into the OpenSearch "moderation-events" index.
input{
jdbc{
# Connection settings are injected via environment variables.
jdbc_connection_string => "jdbc:postgresql://${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DB}"
jdbc_user => "${POSTGRES_USER}"
jdbc_password => "${POSTGRES_PASSWORD}"
jdbc_driver_class => "org.postgresql.Driver"
jdbc_paging_enabled => true
# Incremental sync: only rows whose updated_at is newer than the value
# persisted in the last-run metadata file are fetched on each run.
use_column_value => true
tracking_column => "updated_at"
tracking_column_type => "timestamp"
# Stored under data/plugins/inputs/jdbc so it survives on the mounted EFS.
last_run_metadata_path => "/usr/share/logstash/data/plugins/inputs/jdbc/moderation_events_jdbc_last_run"
# Cron-style schedule; the minute interval is environment-configurable.
schedule => "*/${MODERATION_EVENTS_PIPELINE_UPDATE_INTERVAL_MINUTES} * * * *"
statement_filepath => "/usr/share/logstash/sql/sync_moderation_events.sql"
}
}
filter {
# Parse the raw JSON string column in place into a structured field.
json {
source => "cleaned_data_value"
target => "cleaned_data_value"
}
# Per-field Ruby transformations; each script promotes/normalizes one
# raw *_value column (see scripts/moderation_events/*.rb).
ruby {
path => "/usr/share/logstash/scripts/moderation_events/country.rb"
}
ruby {
path => "/usr/share/logstash/scripts/moderation_events/os_id.rb"
}
ruby {
path => "/usr/share/logstash/scripts/moderation_events/claim_id.rb"
}
ruby {
path => "/usr/share/logstash/scripts/moderation_events/source.rb"
}
ruby {
path => "/usr/share/logstash/scripts/moderation_events/status_change_date.rb"
}
mutate {
# Use moderation_id as the OpenSearch document id so repeated syncs
# upsert documents instead of duplicating them.
copy => { "moderation_id" => "[@metadata][_id]" }
# Drop Logstash bookkeeping fields and the raw *_value columns that
# have already been consumed by the Ruby filters above.
remove_field => [
"@version",
"@timestamp",
"cleaned_data_value",
"os_id_value",
"claim_id_value",
"source_value",
"status_change_date_value"
]
}
}
output {
opensearch {
hosts => ["${OPENSEARCH_HOST}:${OPENSEARCH_PORT}"]
auth_type => {
type => "${OPENSEARCH_AUTH_TYPE}"
region => "${AWS_REGION}"
}
ssl => "${OPENSEARCH_SSL}"
ssl_certificate_verification => "${OPENSEARCH_SSL_CERT_VERIFICATION}"
index => "moderation-events"
document_id => "%{[@metadata][_id]}"
# Apply the composable index template defined alongside this pipeline
# (indexes/moderation_events.json) instead of a legacy template.
template_name => "moderation_events_template"
template => "/usr/share/logstash/indexes/moderation_events.json"
legacy_template => false
}
}
3 changes: 2 additions & 1 deletion src/logstash/pipeline/sync_production_locations.conf
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ input{
use_column_value => true
tracking_column => "updated_at"
tracking_column_type => "timestamp"
schedule => "*/${LOGSTASH_UPDATE_INTERVAL_MINUTES} * * * *"
last_run_metadata_path => "/usr/share/logstash/data/plugins/inputs/jdbc/production_locations_jdbc_last_run"
schedule => "*/${PRODUCTION_LOCATIONS_PIPELINE_UPDATE_INTERVAL_MINUTES} * * * *"
statement_filepath => "/usr/share/logstash/sql/sync_production_locations.sql"
}
}
Expand Down
42 changes: 42 additions & 0 deletions src/logstash/scripts/moderation_events/claim_id.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Logstash ruby-filter entry point: promotes the raw `claim_id_value`
# column onto the `claim_id` event field. A nil value means the
# moderation event has no associated claim, in which case the event
# passes through unchanged. Always returns the event wrapped in an
# array, as the Logstash ruby filter contract requires.
def filter(event)
  value = event.get('claim_id_value')
  event.set('claim_id', value) unless value.nil?

  [event]
end

# A non-nil `claim_id_value` must be copied to `claim_id` while the
# other fields pass through untouched.
test 'claim_id filter with valid claim_id_value' do
  in_event {
    {
      'claim_id_value' => 12,
      'status' => 'PENDING'
    }
  }

  expect('returns an object with claim_id set') do |events|
    event = events[0]

    # Only the block's final expression is used as the assertion result,
    # so all checks are combined into a single boolean chain.
    event.get('claim_id_value') == 12 &&
      event.get('status') == 'PENDING' &&
      event.get('claim_id') == 12
  end
end

# A nil `claim_id_value` must leave the event unchanged: no `claim_id`
# field is added and existing fields are preserved.
test 'claim_id filter with nil claim_id_value' do
  in_event {
    {
      'claim_id_value' => nil,
      'status' => 'PENDING'
    }
  }

  expect('returns the same object') do |events|
    event = events[0]

    # Only the block's final expression is used as the assertion result,
    # so all checks are combined into a single boolean chain.
    event.get('claim_id_value').nil? &&
      event.get('status') == 'PENDING' &&
      !event.to_hash.key?('claim_id')
  end
end
Loading