From 85962d70c8d0aed588c2b0735c049afe67efeebe Mon Sep 17 00:00:00 2001 From: Sam Day Date: Fri, 18 Mar 2016 22:19:37 +1000 Subject: [PATCH] Allow DynamoDB table name to be specified --- .../worker/KinesisClientLibConfiguration.java | 18 ++++++++++++++++++ .../clientlibrary/lib/worker/Worker.java | 4 ++-- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 510565f51..7deb49dcb 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -155,6 +155,7 @@ public class KinesisClientLibConfiguration { private String applicationName; + private String tableName; private String streamName; private String kinesisEndpoint; private InitialPositionInStream initialPositionInStream; @@ -300,6 +301,7 @@ public KinesisClientLibConfiguration(String applicationName, checkIsValuePositive("MetricsMaxQueueSize", (long) metricsMaxQueueSize); checkIsRegionNameValid(regionName); this.applicationName = applicationName; + this.tableName = applicationName; this.streamName = streamName; this.kinesisEndpoint = kinesisEndpoint; this.initialPositionInStream = initialPositionInStream; @@ -366,6 +368,13 @@ public String getApplicationName() { return applicationName; } + /** + * @return Name of the table to use in DynamoDB + */ + public String getTableName() { + return tableName; + } + /** * @return Time within which a worker should renew a lease (else it is assumed dead) */ @@ -572,6 +581,15 @@ public int getInitialLeaseTableWriteCapacity() { } // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 190 LINES + /** + * @param tableName name of the lease table in DynamoDB + * @return KinesisClientLibConfiguration + */ + public KinesisClientLibConfiguration withTableName(String tableName) { + this.tableName = tableName; + return this; + } + /** * @param kinesisEndpoint Kinesis endpoint * @return KinesisClientLibConfiguration diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index dbbf934df..f1ca9bc17 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -219,7 +219,7 @@ public Worker( config.shouldCleanupLeasesUponShardCompletion(), null, new KinesisClientLibLeaseCoordinator( - new KinesisClientLeaseManager(config.getApplicationName(), dynamoDBClient), + new KinesisClientLeaseManager(config.getTableName(), config.getApplicationName(), dynamoDBClient), config.getWorkerIdentifier(), config.getFailoverTimeMillis(), config.getEpsilonMillis(), @@ -952,7 +952,7 @@ public Worker build() { config.getShardSyncIntervalMillis(), config.shouldCleanupLeasesUponShardCompletion(), null, - new KinesisClientLibLeaseCoordinator(new KinesisClientLeaseManager(config.getApplicationName(), + new KinesisClientLibLeaseCoordinator(new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient), config.getWorkerIdentifier(), config.getFailoverTimeMillis(),