Skip to content

Commit 580ec16

Browse files
committed
Merge pull request #325 from mwl/refactor/292-offers-method
Reviewed and approved by @philwinder.
2 parents 523ab01 + 3af922e commit 580ec16

File tree

4 files changed

+302
-131
lines changed

4 files changed

+302
-131
lines changed

scheduler/src/main/java/org/apache/mesos/elasticsearch/scheduler/ElasticsearchScheduler.java

Lines changed: 6 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public class ElasticsearchScheduler implements Scheduler {
3030
private Observable statusUpdateWatchers = new StatusUpdateObservable();
3131
private Boolean registered = false;
3232
private ClusterState clusterState;
33+
OfferStrategy offerStrategy;
3334

3435
public ElasticsearchScheduler(Configuration configuration, TaskInfoFactory taskInfoFactory) {
3536
this.configuration = configuration;
@@ -67,6 +68,7 @@ public void registered(SchedulerDriver driver, Protos.FrameworkID frameworkId, P
6768
LOGGER.info("Framework registered as " + frameworkId.getValue());
6869

6970
clusterState = new ClusterState(configuration.getState(), frameworkState); // Must use new framework state. This is when we are allocated our FrameworkID.
71+
offerStrategy = new OfferStrategy(configuration, clusterState);
7072
clusterMonitor = new ClusterMonitor(configuration, this, driver, new StatePath(configuration.getState()));
7173
clusterState.getTaskList().forEach(clusterMonitor::startMonitoringTask); // Get all previous executors and start monitoring them.
7274
statusUpdateWatchers.addObserver(clusterState);
@@ -88,34 +90,19 @@ public void reregistered(SchedulerDriver driver, Protos.MasterInfo masterInfo) {
8890
LOGGER.info("Framework re-registered");
8991
}
9092

91-
// Todo, this massive if statement needs to be performed better.
9293
@Override
9394
public void resourceOffers(SchedulerDriver driver, List<Protos.Offer> offers) {
9495
if (!registered) {
9596
LOGGER.debug("Not registered, can't accept resource offers.");
9697
return;
9798
}
9899
for (Protos.Offer offer : offers) {
99-
if (isHostAlreadyRunningTask(offer)) {
100-
driver.declineOffer(offer.getId()); // DCOS certification 05
101-
LOGGER.info("Declined offer: Host " + offer.getHostname() + " is already running an Elastisearch task");
102-
} else if (clusterState.getTaskList().size() == configuration.getElasticsearchNodes()) {
103-
driver.declineOffer(offer.getId()); // DCOS certification 05
104-
LOGGER.info("Declined offer: Mesos runs already runs " + configuration.getElasticsearchNodes() + " Elasticsearch tasks");
105-
} else if (!containsTwoPorts(offer.getResourcesList())) {
106-
LOGGER.info("Declined offer: Offer did not contain 2 ports for Elasticsearch client and transport connection");
107-
driver.declineOffer(offer.getId());
108-
} else if (!isEnoughCPU(offer.getResourcesList())) {
109-
LOGGER.info("Declined offer: Not enough CPU resources");
110-
driver.declineOffer(offer.getId());
111-
} else if (!isEnoughRAM(offer.getResourcesList())) {
112-
LOGGER.info("Declined offer: Not enough RAM resources");
113-
driver.declineOffer(offer.getId());
114-
} else if (!isEnoughDisk(offer.getResourcesList())) {
115-
LOGGER.info("Not enough Disk resources");
100+
final OfferStrategy.OfferResult result = offerStrategy.evaluate(offer);
101+
102+
if (!result.acceptable) {
103+
LOGGER.debug("Declined offer: " + result.reason.orElse("Unknown"));
116104
driver.declineOffer(offer.getId());
117105
} else {
118-
LOGGER.info("Accepted offer: " + offer.getHostname());
119106
Protos.TaskInfo taskInfo = taskInfoFactory.createTask(configuration, offer);
120107
LOGGER.debug(taskInfo.toString());
121108
driver.launchTasks(Collections.singleton(offer.getId()), Collections.singleton(taskInfo));
@@ -126,26 +113,6 @@ public void resourceOffers(SchedulerDriver driver, List<Protos.Offer> offers) {
126113
}
127114
}
128115

129-
private boolean isEnoughDisk(List<Protos.Resource> resourcesList) {
130-
ResourceCheck resourceCheck = new ResourceCheck(Resources.RESOURCE_DISK);
131-
return resourceCheck.isEnough(resourcesList, configuration.getDisk());
132-
}
133-
134-
private boolean isEnoughCPU(List<Protos.Resource> resourcesList) {
135-
ResourceCheck resourceCheck = new ResourceCheck(Resources.RESOURCE_CPUS);
136-
return resourceCheck.isEnough(resourcesList, configuration.getCpus());
137-
}
138-
139-
private boolean isEnoughRAM(List<Protos.Resource> resourcesList) {
140-
ResourceCheck resourceCheck = new ResourceCheck(Resources.RESOURCE_MEM);
141-
return resourceCheck.isEnough(resourcesList, configuration.getMem());
142-
}
143-
144-
private boolean containsTwoPorts(List<Protos.Resource> resources) {
145-
int count = Resources.selectTwoPortsFromRange(resources).size();
146-
return count == 2;
147-
}
148-
149116
@Override
150117
public void offerRescinded(SchedulerDriver driver, Protos.OfferID offerId) {
151118
LOGGER.info("Offer " + offerId.getValue() + " rescinded");
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package org.apache.mesos.elasticsearch.scheduler;
2+
3+
import org.apache.log4j.Logger;
4+
import org.apache.mesos.Protos;
5+
import org.apache.mesos.elasticsearch.scheduler.state.ClusterState;
6+
7+
import java.util.List;
8+
import java.util.Optional;
9+
10+
import static java.util.Arrays.asList;
11+
12+
/**
13+
* Offer strategy
14+
*/
15+
public class OfferStrategy {
16+
private static final Logger LOGGER = Logger.getLogger(ElasticsearchScheduler.class.toString());
17+
private ClusterState clusterState;
18+
private Configuration configuration;
19+
20+
private List<OfferRule> acceptanceRules = asList(
21+
new OfferRule("Host already running task", this::isHostAlreadyRunningTask),
22+
new OfferRule("Cluster size already fulfilled", offer -> clusterState.getTaskList().size() == configuration.getElasticsearchNodes()),
23+
new OfferRule("Offer did not have 2 ports", offer -> !containsTwoPorts(offer.getResourcesList())),
24+
new OfferRule("Offer did not have enough CPU resources", offer -> !isEnoughCPU(configuration, offer.getResourcesList())),
25+
new OfferRule("Offer did not have enough RAM resources", offer -> !isEnoughRAM(configuration, offer.getResourcesList())),
26+
new OfferRule("Offer did not have enough disk resources", offer -> !isEnoughDisk(configuration, offer.getResourcesList()))
27+
);
28+
29+
public OfferStrategy(Configuration configuration, ClusterState clusterState) {
30+
this.clusterState = clusterState;
31+
this.configuration = configuration;
32+
}
33+
34+
public OfferResult evaluate(Protos.Offer offer) {
35+
final Optional<OfferRule> decline = acceptanceRules.stream().filter(offerRule -> offerRule.rule.accepts(offer)).limit(1).findFirst();
36+
if (decline.isPresent()) {
37+
return OfferResult.decline(decline.get().declineReason);
38+
}
39+
40+
LOGGER.info("Accepted offer: " + offer.getHostname());
41+
return OfferResult.accept();
42+
}
43+
44+
/**
45+
* Offer result
46+
*/
47+
public static class OfferResult {
48+
final boolean acceptable;
49+
final Optional<String> reason;
50+
51+
private OfferResult(boolean acceptable, Optional<String> reason) {
52+
this.acceptable = acceptable;
53+
this.reason = reason;
54+
}
55+
56+
public static OfferResult accept() {
57+
return new OfferResult(true, Optional.<String>empty());
58+
}
59+
60+
public static OfferResult decline(String reason) {
61+
return new OfferResult(false, Optional.of(reason));
62+
}
63+
}
64+
65+
private boolean isHostAlreadyRunningTask(Protos.Offer offer) {
66+
Boolean result = false;
67+
List<Protos.TaskInfo> stateList = clusterState.getTaskList();
68+
for (Protos.TaskInfo t : stateList) {
69+
if (t.getSlaveId().equals(offer.getSlaveId())) {
70+
result = true;
71+
}
72+
}
73+
return result;
74+
}
75+
private boolean isEnoughDisk(Configuration configuration, List<Protos.Resource> resourcesList) {
76+
return new ResourceCheck(Resources.RESOURCE_DISK).isEnough(resourcesList, configuration.getDisk());
77+
}
78+
79+
private boolean isEnoughCPU(Configuration configuration, List<Protos.Resource> resourcesList) {
80+
return new ResourceCheck(Resources.RESOURCE_CPUS).isEnough(resourcesList, configuration.getCpus());
81+
}
82+
83+
private boolean isEnoughRAM(Configuration configuration, List<Protos.Resource> resourcesList) {
84+
return new ResourceCheck(Resources.RESOURCE_MEM).isEnough(resourcesList, configuration.getMem());
85+
}
86+
87+
private boolean containsTwoPorts(List<Protos.Resource> resources) {
88+
return Resources.selectTwoPortsFromRange(resources).size() == 2;
89+
}
90+
91+
/**
92+
* Rule and reason container object
93+
*/
94+
private static class OfferRule {
95+
String declineReason;
96+
Rule rule;
97+
98+
public OfferRule(String declineReason, Rule rule) {
99+
this.declineReason = declineReason;
100+
this.rule = rule;
101+
}
102+
}
103+
104+
/**
105+
* Interface for checking offers
106+
*/
107+
@FunctionalInterface
108+
private interface Rule {
109+
boolean accepts(Protos.Offer offer);
110+
}
111+
}

scheduler/src/test/java/org/apache/mesos/elasticsearch/scheduler/ElasticsearchSchedulerTest.java

Lines changed: 32 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,17 @@
1212
import org.junit.Test;
1313
import org.mockito.Mockito;
1414

15+
import java.net.InetSocketAddress;
16+
import java.time.ZonedDateTime;
1517
import java.util.Date;
1618
import java.util.UUID;
1719

20+
import static java.util.Collections.singleton;
21+
import static java.util.Collections.singletonList;
1822
import static org.apache.mesos.elasticsearch.common.Offers.newOfferBuilder;
1923
import static org.apache.mesos.elasticsearch.scheduler.Resources.*;
2024
import static org.mockito.Mockito.mock;
25+
import static org.mockito.Mockito.verify;
2126
import static org.mockito.Mockito.when;
2227

2328
/**
@@ -58,16 +63,9 @@ public class ElasticsearchSchedulerTest {
5863
private TaskInfoFactory taskInfoFactory;
5964

6065
private org.apache.mesos.elasticsearch.scheduler.Configuration configuration;
61-
// private ZonedDateTime now = ZonedDateTime.now();
62-
// private InetSocketAddress transportAddress = new InetSocketAddress("localhost", 9300);
63-
// private InetSocketAddress clientAddress = new InetSocketAddress("localhost", 9200);
64-
6566

6667
@Before
6768
public void before() {
68-
Clock clock = mock(Clock.class);
69-
when(clock.now()).thenReturn(TASK1_DATE).thenReturn(TASK2_DATE);
70-
7169
frameworkID = Protos.FrameworkID.newBuilder().setValue(UUID.randomUUID().toString()).build();
7270
FrameworkState frameworkState = mock(FrameworkState.class);
7371
when(frameworkState.getFrameworkID()).thenReturn(frameworkID);
@@ -92,6 +90,7 @@ public void before() {
9290

9391
masterInfo = newMasterInfo();
9492
scheduler.registered(driver, frameworkID, masterInfo);
93+
scheduler.offerStrategy = mock(OfferStrategy.class);
9594
}
9695

9796
@Ignore
@@ -107,7 +106,7 @@ public void shouldCallOberversWhenExecutorLost() {
107106

108107
@Test
109108
public void testRegistered() {
110-
Mockito.verify(driver).requestResources(
109+
verify(driver).requestResources(
111110
Mockito.argThat(
112111
new RequestMatcher(
113112
configuration.getCpus(),
@@ -119,92 +118,33 @@ public void testRegistered() {
119118
);
120119
}
121120

122-
// TODO (pnw): This requires scheduler refactoring to work. Refactor instantiation of objects out of registered method. And use setters.
123-
// @Test
124-
// public void testResourceOffers_isSlaveAlreadyRunningTask() {
125-
// Task task1 = new Task("host1", "1", Protos.TaskState.TASK_RUNNING, now, clientAddress, transportAddress);
126-
// Task task2 = new Task("host2", "2", Protos.TaskState.TASK_RUNNING, now, clientAddress, transportAddress);
127-
// scheduler.tasks = new HashMap<>();
128-
// scheduler.tasks.put("host1", task1);
129-
// scheduler.tasks.put("host2", task2);
130-
//
131-
// Protos.Offer.Builder offer = newOffer("host1");
132-
//
133-
// scheduler.resourceOffers(driver, singletonList(offer.build()));
134-
//
135-
// verify(driver).declineOffer(offer.getId());
136-
// }
137-
//
138-
// @Test
139-
// public void testResourceOffers_enoughNodes() {
140-
// Task task1 = new Task("host1", "1", Protos.TaskState.TASK_RUNNING, now, clientAddress, transportAddress);
141-
// Task task2 = new Task("host2", "2", Protos.TaskState.TASK_RUNNING, now, clientAddress, transportAddress);
142-
// Task task3 = new Task("host3", "3", Protos.TaskState.TASK_RUNNING, now, clientAddress, transportAddress);
143-
// scheduler.tasks = new HashMap<>();
144-
// scheduler.tasks.put("host1", task1);
145-
// scheduler.tasks.put("host2", task2);
146-
// scheduler.tasks.put("host3", task3);
147-
//
148-
// Protos.Offer.Builder offer = newOffer("host4");
149-
//
150-
// scheduler.resourceOffers(driver, singletonList(offer.build()));
151-
//
152-
// verify(driver).declineOffer(offer.getId());
153-
// }
154-
//
155-
// @Test
156-
// public void testResourceOffers_noPorts() {
157-
// Task task1 = new Task("host1", "1", Protos.TaskState.TASK_RUNNING, now, clientAddress, transportAddress);
158-
// Task task2 = new Task("host2", "2", Protos.TaskState.TASK_RUNNING, now, clientAddress, transportAddress);
159-
// scheduler.tasks = new HashMap<>();
160-
// scheduler.tasks.put("host1", task1);
161-
// scheduler.tasks.put("host2", task2);
162-
//
163-
// Protos.Offer.Builder offer = newOffer("host3");
164-
//
165-
// scheduler.resourceOffers(driver, singletonList(offer.build()));
166-
//
167-
// verify(driver).declineOffer(offer.getId());
168-
// }
169-
//
170-
// @SuppressWarnings("unchecked")
171-
// @Test
172-
// public void testResourceOffers_singlePort() {
173-
// Task task = new Task("host1", "task1", Protos.TaskState.TASK_RUNNING, now, clientAddress, transportAddress);
174-
// scheduler.tasks = new HashMap<>();
175-
// scheduler.tasks.put("host1", task);
176-
//
177-
// Protos.Offer.Builder offerBuilder = newOffer("host3");
178-
// offerBuilder.addResources(portRange(9200, 9200));
179-
//
180-
// scheduler.resourceOffers(driver, singletonList(offerBuilder.build()));
181-
//
182-
// Mockito.verify(driver).declineOffer(offerBuilder.build().getId());
183-
// }
184-
//
185-
// @Test
186-
// public void testResourceOffers_launchTasks() {
187-
// scheduler.tasks = new HashMap<>();
188-
//
189-
// Protos.Offer.Builder offerBuilder = newOffer("host3");
190-
// offerBuilder.addResources(portRange(9200, 9200));
191-
// offerBuilder.addResources(portRange(9300, 9300));
192-
//
193-
// Protos.TaskInfo taskInfo = ProtoTestUtil.getDefaultTaskInfo();
194-
//
195-
// when(taskInfoFactory.createTask(configuration, offerBuilder.build())).thenReturn(taskInfo);
196-
//
197-
// scheduler.resourceOffers(driver, singletonList(offerBuilder.build()));
198-
//
199-
// verify(driver).launchTasks(Collections.singleton(offerBuilder.build().getId()), Collections.singleton(taskInfo));
200-
// }
121+
@Test
122+
public void willDeclineOfferIfStrategyDeclinesOffer() {
123+
Protos.Offer offer = newOffer("host1").build();
124+
125+
when(scheduler.offerStrategy.evaluate(offer)).thenReturn(OfferStrategy.OfferResult.decline("Test"));
126+
127+
scheduler.resourceOffers(driver, singletonList(offer));
128+
129+
verify(driver).declineOffer(offer.getId());
130+
}
131+
132+
@Test
133+
public void testResourceOffers_launchTasks() {
134+
final Protos.Offer offer = newOffer("host3").build();
135+
when(scheduler.offerStrategy.evaluate(offer)).thenReturn(OfferStrategy.OfferResult.accept());
136+
137+
Protos.TaskInfo taskInfo = ProtoTestUtil.getDefaultTaskInfo();
138+
139+
when(taskInfoFactory.createTask(configuration, offer)).thenReturn(taskInfo);
140+
141+
scheduler.resourceOffers(driver, singletonList(offer));
142+
143+
verify(driver).launchTasks(singleton(offer.getId()), singleton(taskInfo));
144+
}
201145

202146
private Protos.Offer.Builder newOffer(String hostname) {
203-
Protos.Offer.Builder builder = newOfferBuilder(UUID.randomUUID().toString(), hostname, UUID.randomUUID().toString(), frameworkID);
204-
builder.addResources(cpus(configuration.getCpus(), configuration.getFrameworkRole()));
205-
builder.addResources(mem(configuration.getMem(), configuration.getFrameworkRole()));
206-
builder.addResources(disk(configuration.getDisk(), configuration.getFrameworkRole()));
207-
return builder;
147+
return newOfferBuilder(UUID.randomUUID().toString(), hostname, UUID.randomUUID().toString(), frameworkID);
208148
}
209149

210150
private Protos.MasterInfo newMasterInfo() {

0 commit comments

Comments
 (0)