Skip to content

Commit 7439032

Browse files
shanthooshBoris S
authored andcommitted
SAMZA-1952: StreamPartitionCountMonitor for standalone.
This patch adds the capability to detect the partition change of the input streams of a stateless standalone jobs and trigger a re-balancing phase(which will essentially account for new partitions from input stream and distribute it to the live processors of the group). Existing partition count detection of input streams is broken in yarn for stateful jobs. This will be addressed for both yarn and standalone as a part of apache#622 Author: Shanthoosh Venkataraman <[email protected]> Reviewers: Boris Shkolnik <[email protected]> Closes apache#726 from shanthoosh/stream_partition_count_monitor_for_standalone
1 parent a528cc6 commit 7439032

3 files changed

Lines changed: 185 additions & 17 deletions

File tree

samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java

Lines changed: 46 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import com.google.common.annotations.VisibleForTesting;
2222

23-
import java.util.ArrayList;
2423
import java.util.HashSet;
2524
import java.util.HashMap;
2625
import java.util.List;
@@ -38,12 +37,14 @@
3837
import org.apache.samza.config.MapConfig;
3938
import org.apache.samza.config.MetricsConfig;
4039
import org.apache.samza.config.TaskConfigJava;
40+
import org.apache.samza.config.StorageConfig;
4141
import org.apache.samza.config.ZkConfig;
4242
import org.apache.samza.container.TaskName;
4343
import org.apache.samza.coordinator.JobCoordinator;
4444
import org.apache.samza.coordinator.JobCoordinatorListener;
4545
import org.apache.samza.coordinator.JobModelManager;
4646
import org.apache.samza.coordinator.LeaderElectorListener;
47+
import org.apache.samza.coordinator.StreamPartitionCountMonitor;
4748
import org.apache.samza.job.model.ContainerModel;
4849
import org.apache.samza.job.model.JobModel;
4950
import org.apache.samza.metrics.MetricsRegistry;
@@ -53,6 +54,7 @@
5354
import org.apache.samza.storage.ChangelogStreamManager;
5455
import org.apache.samza.system.StreamMetadataCache;
5556
import org.apache.samza.system.SystemAdmins;
57+
import org.apache.samza.system.SystemStream;
5658
import org.apache.samza.util.MetricsReporterLoader;
5759
import org.apache.samza.util.SystemClock;
5860
import org.apache.samza.util.Util;
@@ -105,9 +107,11 @@ public class ZkJobCoordinator implements JobCoordinator {
105107
@VisibleForTesting
106108
ScheduleAfterDebounceTime debounceTimer;
107109

110+
@VisibleForTesting
111+
StreamPartitionCountMonitor streamPartitionCountMonitor = null;
112+
108113
ZkJobCoordinator(Config config, MetricsRegistry metricsRegistry, ZkUtils zkUtils) {
109114
this.config = config;
110-
111115
this.metrics = new ZkJobCoordinatorMetrics(metricsRegistry);
112116

113117
this.processorId = createProcessorId(config);
@@ -179,6 +183,10 @@ public void stop() {
179183
LOG.debug("Shutting down metrics.");
180184
shutdownMetrics();
181185

186+
if (streamPartitionCountMonitor != null) {
187+
streamPartitionCountMonitor.stop();
188+
}
189+
182190
if (coordinatorListener != null) {
183191
coordinatorListener.onCoordinatorStop();
184192
}
@@ -233,13 +241,11 @@ public String getProcessorId() {
233241
public void onProcessorChange(List<String> processors) {
234242
if (leaderElector.amILeader()) {
235243
LOG.info("ZkJobCoordinator::onProcessorChange - list of processors changed. List size=" + processors.size());
236-
debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, debounceTimeMs, () -> doOnProcessorChange(processors));
244+
debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, debounceTimeMs, this::doOnProcessorChange);
237245
}
238246
}
239247

240-
void doOnProcessorChange(List<String> processors) {
241-
// if list of processors is empty - it means we are called from 'onBecomeLeader'
242-
// TODO: Handle empty currentProcessorIds.
248+
void doOnProcessorChange() {
243249
List<String> currentProcessorIds = zkUtils.getSortedActiveProcessorsIDs();
244250
Set<String> uniqueProcessorIds = new HashSet<>(currentProcessorIds);
245251

@@ -320,16 +326,39 @@ private JobModel generateNewJobModel(List<String> processors) {
320326
return new JobModel(new MapConfig(), model.getContainers());
321327
}
322328

329+
@VisibleForTesting
330+
StreamPartitionCountMonitor getPartitionCountMonitor() {
331+
StreamMetadataCache streamMetadata = new StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
332+
Set<SystemStream> inputStreamsToMonitor = new TaskConfigJava(config).getAllInputStreams();
333+
334+
return new StreamPartitionCountMonitor(
335+
inputStreamsToMonitor,
336+
streamMetadata,
337+
metrics.getMetricsRegistry(),
338+
new JobConfig(config).getMonitorPartitionChangeFrequency(),
339+
streamsChanged -> {
340+
if (leaderElector.amILeader()) {
341+
debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, 0, this::doOnProcessorChange);
342+
}
343+
});
344+
}
345+
323346
class LeaderElectorListenerImpl implements LeaderElectorListener {
324347
@Override
325348
public void onBecomingLeader() {
326349
LOG.info("ZkJobCoordinator::onBecomeLeader - I became the leader");
327350
metrics.isLeader.set(true);
328351
zkUtils.subscribeToProcessorChange(new ProcessorChangeHandler(zkUtils));
329-
debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, debounceTimeMs, () -> {
330-
// actual actions to do are the same as onProcessorChange
331-
doOnProcessorChange(new ArrayList<>());
332-
});
352+
if (!new StorageConfig(config).hasDurableStores()) {
353+
// 1. Stop if there's a existing StreamPartitionCountMonitor running.
354+
if (streamPartitionCountMonitor != null) {
355+
streamPartitionCountMonitor.stop();
356+
}
357+
// 2. Start a new instance of StreamPartitionCountMonitor.
358+
streamPartitionCountMonitor = getPartitionCountMonitor();
359+
streamPartitionCountMonitor.start();
360+
}
361+
debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, debounceTimeMs, ZkJobCoordinator.this::doOnProcessorChange);
333362
}
334363
}
335364

@@ -371,10 +400,8 @@ public void onBarrierStateChanged(final String version, ZkBarrierForVersionUpgra
371400
LOG.warn("Barrier for version " + version + " timed out.");
372401
if (leaderElector.amILeader()) {
373402
LOG.info("Leader will schedule a new job model generation");
374-
debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, debounceTimeMs, () -> {
375-
// actual actions to do are the same as onProcessorChange
376-
doOnProcessorChange(new ArrayList<>());
377-
});
403+
// actual actions to do are the same as onProcessorChange
404+
debounceTimer.scheduleAfterDebounceTime(ON_PROCESSOR_CHANGE, debounceTimeMs, ZkJobCoordinator.this::doOnProcessorChange);
378405
}
379406
}
380407
}
@@ -477,6 +504,11 @@ public void handleStateChanged(Watcher.Event.KeeperState state) {
477504
if (leaderElector.amILeader()) {
478505
leaderElector.resignLeadership();
479506
}
507+
508+
if (streamPartitionCountMonitor != null) {
509+
streamPartitionCountMonitor.stop();
510+
}
511+
480512
/**
481513
* After this event, one amongst the following two things could potentially happen:
482514
* A. On successful reconnect to another zookeeper server in ensemble, this processor is going to
@@ -513,7 +545,6 @@ public void handleStateChanged(Watcher.Event.KeeperState state) {
513545
default:
514546
// received SyncConnected, ConnectedReadOnly, and SaslAuthenticated. NoOp
515547
LOG.info("Got ZK event " + state.toString() + " for processor=" + processorId + ". Continue");
516-
return;
517548
}
518549
}
519550

samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
import org.I0Itec.zkclient.ZkClient;
2626
import org.apache.samza.config.MapConfig;
27+
import org.apache.samza.coordinator.StreamPartitionCountMonitor;
2728
import org.apache.samza.job.model.JobModel;
2829
import org.apache.samza.util.NoOpMetricsRegistry;
2930
import org.apache.samza.zk.ZkJobCoordinator.ZkSessionStateChangedListener;
@@ -36,7 +37,6 @@
3637
import static junit.framework.Assert.assertTrue;
3738
import static org.mockito.Mockito.*;
3839

39-
4040
public class TestZkJobCoordinator {
4141
private static final String TEST_BARRIER_ROOT = "/testBarrierRoot";
4242
private static final String TEST_JOB_MODEL_VERSION = "1";
@@ -90,4 +90,78 @@ public void testShouldRemoveBufferedEventsInDebounceQueueOnSessionExpiration() {
9090
verify(mockDebounceTimer).cancelAllScheduledActions();
9191
verify(mockDebounceTimer).scheduleAfterDebounceTime(Mockito.eq("ZK_SESSION_EXPIRED"), Mockito.eq(0L), Mockito.any(Runnable.class));
9292
}
93+
94+
@Test
95+
public void testShouldStopPartitionCountMonitorOnSessionExpiration() {
96+
ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
97+
ZkClient mockZkClient = Mockito.mock(ZkClient.class);
98+
when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
99+
100+
ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
101+
when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
102+
when(zkUtils.getZkClient()).thenReturn(mockZkClient);
103+
when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));
104+
105+
ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);
106+
107+
ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator(new MapConfig(), new NoOpMetricsRegistry(), zkUtils));
108+
StreamPartitionCountMonitor monitor = Mockito.mock(StreamPartitionCountMonitor.class);
109+
zkJobCoordinator.debounceTimer = mockDebounceTimer;
110+
zkJobCoordinator.streamPartitionCountMonitor = monitor;
111+
112+
ZkSessionStateChangedListener zkSessionStateChangedListener = zkJobCoordinator.new ZkSessionStateChangedListener();
113+
zkSessionStateChangedListener.handleStateChanged(Watcher.Event.KeeperState.Expired);
114+
Mockito.verify(monitor).stop();
115+
}
116+
117+
@Test
118+
public void testShouldStartPartitionCountMonitorOnBecomingLeader() {
119+
ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
120+
ZkClient mockZkClient = Mockito.mock(ZkClient.class);
121+
when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
122+
123+
ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
124+
when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
125+
when(zkUtils.getZkClient()).thenReturn(mockZkClient);
126+
when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));
127+
128+
ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);
129+
130+
ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator(new MapConfig(), new NoOpMetricsRegistry(), zkUtils));
131+
132+
StreamPartitionCountMonitor monitor = Mockito.mock(StreamPartitionCountMonitor.class);
133+
zkJobCoordinator.debounceTimer = mockDebounceTimer;
134+
zkJobCoordinator.streamPartitionCountMonitor = monitor;
135+
when(zkJobCoordinator.getPartitionCountMonitor()).thenReturn(monitor);
136+
137+
ZkJobCoordinator.LeaderElectorListenerImpl listener = zkJobCoordinator.new LeaderElectorListenerImpl();
138+
139+
listener.onBecomingLeader();
140+
141+
Mockito.verify(monitor).start();
142+
}
143+
144+
@Test
145+
public void testShouldStopPartitionCountMonitorWhenStoppingTheJobCoordinator() {
146+
ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
147+
ZkClient mockZkClient = Mockito.mock(ZkClient.class);
148+
when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
149+
150+
ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
151+
when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
152+
when(zkUtils.getZkClient()).thenReturn(mockZkClient);
153+
when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));
154+
155+
ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);
156+
157+
ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator(new MapConfig(), new NoOpMetricsRegistry(), zkUtils));
158+
159+
StreamPartitionCountMonitor monitor = Mockito.mock(StreamPartitionCountMonitor.class);
160+
zkJobCoordinator.debounceTimer = mockDebounceTimer;
161+
zkJobCoordinator.streamPartitionCountMonitor = monitor;
162+
163+
zkJobCoordinator.stop();
164+
165+
Mockito.verify(monitor).stop();
166+
}
93167
}

samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,17 @@
2929
import java.util.Map;
3030
import java.util.Properties;
3131
import java.util.UUID;
32+
import java.util.Objects;
33+
import java.util.Set;
34+
import java.util.HashSet;
3235
import java.util.concurrent.CountDownLatch;
3336
import java.util.concurrent.atomic.AtomicBoolean;
3437
import kafka.admin.AdminUtils;
38+
import kafka.admin.RackAwareMode;
3539
import kafka.utils.TestUtils;
3640
import org.I0Itec.zkclient.ZkClient;
3741
import org.apache.kafka.clients.producer.KafkaProducer;
3842
import org.apache.kafka.clients.producer.ProducerRecord;
39-
import org.apache.samza.SamzaException;
4043
import org.apache.samza.config.ApplicationConfig;
4144
import org.apache.samza.config.Config;
4245
import org.apache.samza.config.JobConfig;
@@ -51,6 +54,8 @@
5154
import org.apache.samza.job.model.TaskModel;
5255
import org.apache.samza.runtime.ApplicationRunner;
5356
import org.apache.samza.runtime.ApplicationRunners;
57+
import org.apache.samza.SamzaException;
58+
import org.apache.samza.system.SystemStreamPartition;
5459
import org.apache.samza.test.StandaloneIntegrationTestHarness;
5560
import org.apache.samza.test.StandaloneTestUtils;
5661
import org.apache.samza.util.NoOpMetricsRegistry;
@@ -203,6 +208,7 @@ private Map<String, String> buildStreamApplicationConfigMap(String systemName, S
203208
.put(TaskConfigJava.TASK_SHUTDOWN_MS, TASK_SHUTDOWN_MS)
204209
.put(TaskConfig.DROP_PRODUCER_ERROR(), "true")
205210
.put(JobConfig.JOB_DEBOUNCE_TIME_MS(), JOB_DEBOUNCE_TIME_MS)
211+
.put(JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS(), "1000")
206212
.build();
207213
Map<String, String> applicationConfig = Maps.newHashMap(samzaContainerConfig);
208214

@@ -646,4 +652,61 @@ public void testShouldStopStreamApplicationWhenShutdownTimeOutIsLessThanContaine
646652
assertEquals(ApplicationStatus.SuccessfulFinish, appRunner3.status());
647653
}
648654

655+
/**
656+
* A. Create a kafka topic with partition count set to 5.
657+
* B. Create and launch a samza application which consumes events from the kafka topic.
658+
* C. Validate that the {@link JobModel} contains 5 {@link SystemStreamPartition}'s.
659+
* D. Increase the partition count of the input kafka topic to 100.
660+
* E. Validate that the new {@link JobModel} contains 100 {@link SystemStreamPartition}'s.
661+
*/
662+
@Test
663+
public void testShouldGenerateJobModelOnPartitionCountChange() throws Exception {
664+
publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
665+
666+
// Create StreamApplication from configuration.
667+
CountDownLatch kafkaEventsConsumedLatch1 = new CountDownLatch(NUM_KAFKA_EVENTS);
668+
CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
669+
670+
ApplicationRunner appRunner1 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
671+
TEST_SYSTEM, inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch1,
672+
applicationConfig1), applicationConfig1);
673+
674+
appRunner1.run();
675+
processedMessagesLatch1.await();
676+
677+
String jobModelVersion = zkUtils.getJobModelVersion();
678+
JobModel jobModel = zkUtils.getJobModel(jobModelVersion);
679+
Set<SystemStreamPartition> ssps = getSystemStreamPartitions(jobModel);
680+
681+
// Validate that the input partition count is 5 in the JobModel.
682+
Assert.assertEquals(5, ssps.size());
683+
684+
// Increase the partition count of input kafka topic to 100.
685+
AdminUtils.addPartitions(zkUtils(), inputKafkaTopic, 100, "", true, RackAwareMode.Enforced$.MODULE$);
686+
687+
long jobModelWaitTimeInMillis = 10;
688+
while (Objects.equals(zkUtils.getJobModelVersion(), jobModelVersion)) {
689+
LOGGER.info("Waiting for new jobModel to be published");
690+
Thread.sleep(jobModelWaitTimeInMillis);
691+
jobModelWaitTimeInMillis = jobModelWaitTimeInMillis * 2;
692+
}
693+
694+
String newJobModelVersion = zkUtils.getJobModelVersion();
695+
JobModel newJobModel = zkUtils.getJobModel(newJobModelVersion);
696+
ssps = getSystemStreamPartitions(newJobModel);
697+
698+
// Validate that the input partition count is 100 in the new JobModel.
699+
Assert.assertEquals(100, ssps.size());
700+
appRunner1.kill();
701+
appRunner1.waitForFinish();
702+
assertEquals(ApplicationStatus.SuccessfulFinish, appRunner1.status());
703+
}
704+
705+
private static Set<SystemStreamPartition> getSystemStreamPartitions(JobModel jobModel) {
706+
Set<SystemStreamPartition> ssps = new HashSet<>();
707+
jobModel.getContainers().forEach((containerName, containerModel) -> {
708+
containerModel.getTasks().forEach((taskName, taskModel) -> ssps.addAll(taskModel.getSystemStreamPartitions()));
709+
});
710+
return ssps;
711+
}
649712
}

0 commit comments

Comments
 (0)