diff --git a/lib/logstash/inputs/cloudwatch_logs.rb b/lib/logstash/inputs/cloudwatch_logs.rb index 38d3cc6..3c438fa 100644 --- a/lib/logstash/inputs/cloudwatch_logs.rb +++ b/lib/logstash/inputs/cloudwatch_logs.rb @@ -8,6 +8,7 @@ require "aws-sdk" require "logstash/inputs/cloudwatch_logs/patch" require "fileutils" +require "json" Aws.eager_autoload! @@ -51,6 +52,9 @@ class LogStash::Inputs::CloudWatch_Logs < LogStash::Inputs::Base config :start_position, :default => 'beginning' + # use dynamodb to store the sincedb + config :dynamodb_sincedb_table, :validate => :string, :default => nil + # def register public def register @@ -60,11 +64,33 @@ def register @sincedb = {} check_start_position_validity - - Aws::ConfigService::Client.new(aws_options_hash) - @cloudwatch = Aws::CloudWatchLogs::Client.new(aws_options_hash) - - if @sincedb_path.nil? + aws_options = aws_options_hash + aws_options[:region]=ENV["AWS_REGION"] + + Aws::ConfigService::Client.new(aws_options) + @cloudwatch = Aws::CloudWatchLogs::Client.new(aws_options) + + if !@dynamodb_sincedb_table.nil? + # use dynamodb for sincedb stuff + @ddb=Aws::DynamoDB::Client.new(aws_options) + begin + resp = @ddb.describe_table( + { + table_name: @dynamodb_sincedb_table, + }) + if resp.nil? + @logger.debug("dynamodb table not found - create it #{@dynamodb_sincedb_table}") + create_ddb_table + else + @logger.debug("dynamodb table found #{@dynamodb_sincedb_table} - #{resp.to_json}") + end + rescue Aws::DynamoDB::Errors::ResourceNotFoundException + # table doesn't exist - create it + @logger.debug("dynamodb table needs creating #{@dynamodb_sincedb_table}") + create_ddb_table + end + end + if @dynamodb_sincedb_table.nil? && @sincedb_path.nil? if settings datapath = File.join(settings.get_value("path.data"), "plugins", "inputs", "cloudwatch_logs") # Ensure that the filepath exists before writing, since it's deeply nested. @@ -75,7 +101,7 @@ def register # This section is going to be deprecated eventually, as path.data will be # the default, not an environment variable (SINCEDB_DIR or HOME) - if @sincedb_path.nil? # If it is _still_ nil... + if @dynamodb_sincedb_table.nil? && @sincedb_path.nil? # If it is _still_ nil... if ENV["SINCEDB_DIR"].nil? && ENV["HOME"].nil? @logger.error("No SINCEDB_DIR or HOME environment variable set, I don't know where " \ "to keep track of the files I'm watching. Either set " \ @@ -158,6 +184,54 @@ def priority_of(group) @priority.index(group) || -1 end + public + def create_ddb_table + @logger.debug("Creating dynamodb table: #{@dynamodb_sincedb_table}") + resp = @ddb.create_table( + { + attribute_definitions: [ + { + attribute_name: "GroupName", + attribute_type: "S", + }, + ], + key_schema: [ + { + attribute_name: "GroupName", + key_type: "HASH", + }, + ], + provisioned_throughput: { + read_capacity_units: 1, + write_capacity_units: 1, + }, + table_name: @dynamodb_sincedb_table, + }) + begin + resp = @ddb.describe_table( + { + table_name: @dynamodb_sincedb_table, + }) + rescue Aws::DynamoDB::Errors::ResourceNotFoundException + @logger.debug("not found checking dynamodb table: #{@dynamodb_sincedb_table}") + + end + + until !resp.nil? && resp.table.table_status == "ACTIVE" + @logger.debug("waiting before checking dynamodb table: #{@dynamodb_sincedb_table}") + sleep 5 + begin + resp = @ddb.describe_table( + { + table_name: @dynamodb_sincedb_table, + }) + rescue Aws::DynamoDB::Errors::ResourceNotFoundException + @logger.debug("not found again checking dynamodb table: #{@dynamodb_sincedb_table}") + end + end + @logger.debug("Done creating dynamodb table: #{@dynamodb_sincedb_table}") + end + public def determine_start_position(groups, sincedb) groups.each do |group| @@ -195,8 +269,12 @@ def process_group(group) process_log(event, group) end - _sincedb_write - + unless @dynamodb_sincedb_table.nil? + update_ddb_group(group) + else + _sincedb_write + end + next_token = resp.next_token break if next_token.nil? end @@ -229,23 +307,56 @@ def parse_time(data) private def _sincedb_open - begin - File.open(@sincedb_path) do |db| - @logger.debug? && @logger.debug("_sincedb_open: reading from #{@sincedb_path}") - db.each do |line| - group, pos = line.split(" ", 2) - @logger.debug? && @logger.debug("_sincedb_open: setting #{group} to #{pos.to_i}") - @sincedb[group] = pos.to_i + unless @dynamodb_sincedb_table.nil? + #get the sincedb from ddb + begin + @logger.debug? && @logger.debug("_sincedb_open: reading from DynmanoDB: #{@dynamodb_sincedb_table}") + resp = @ddb.scan( + { + table_name: @dynamodb_sincedb_table, + } + ) + resp.items.each do |item| + @logger.debug? && @logger.debug("_sincedb_open: setting #{item["GroupName"]} to #{item["StartPosition"]}") + @sincedb[item["GroupName"]] = item["StartPosition"].to_i + end + rescue + #problem opening DynamoDB + @logger.debug? && @logger.debug("_sincedb_open: error: #{@dynamodb_sincedb_table}: #{$!}") + end + else + begin + File.open(@sincedb_path) do |db| + @logger.debug? && @logger.debug("_sincedb_open: reading from #{@sincedb_path}") + db.each do |line| + group, pos = line.split(" ", 2) + @logger.debug? && @logger.debug("_sincedb_open: setting #{group} to #{pos.to_i}") + @sincedb[group] = pos.to_i + end end + rescue + #No existing sincedb to load + @logger.debug? && @logger.debug("_sincedb_open: error: #{@sincedb_path}: #{$!}") end - rescue - #No existing sincedb to load - @logger.debug? && @logger.debug("_sincedb_open: error: #{@sincedb_path}: #{$!}") end end # def _sincedb_open + private + def update_ddb_group(group) + resp = @ddb.put_item( + { + table_name: @dynamodb_sincedb_table, + item: { + "GroupName" => group, + "StartPosition" => "#{@sincedb[group]}" + } + } + ) + end + private def _sincedb_write + begin IO.write(@sincedb_path, serialize_sincedb, 0) rescue Errno::EACCES