
Commit d2bab39

Merge branch 'master' into scaling-visualization

2 parents: 0164105 + 580ec16

25 files changed (+843 additions, -295 deletions)

README.md

Lines changed: 22 additions & 4 deletions
@@ -209,22 +209,40 @@ Usage: (Options preceded by an asterisk are required) [options]
 The web based user interface is available on port 31100 of the scheduler by default. It displays real time information about the tasks running in the cluster and a basic configuration overview of the cluster.

-The user interface uses REST API of the Elasticsearch Mesos Framework. You can find the API documentation here: [docs.elasticsearchmesosui.apiary.io](http://docs.elasticsearchmesosui.apiary.io/).
+The user interface uses REST API of the Elasticsearch Mesos Framework. You can find the API documentation here: [docs.elasticsearchmesos.apiary.io](http://docs.elasticsearchmesos.apiary.io/).

 #### Cluster Overview

-![Tasks List](docs/screenshot-cluster.png)
+![Cluster Overview](docs/screenshot-cluster.png)

-Cluster overview page shows on the top the number of Elasticsearch nodes in the cluster, the overall amount of RAM and disk space allocated by the cluster. State of individual nodes is displayed in a bar, one color representing each state and the percentage of nodes being in this state.
+Cluster page shows on the top the number of Elasticsearch nodes in the cluster, the overall amount of RAM and disk space allocated by the cluster. State of individual nodes is displayed in a bar, one color representing each state and the percentage of nodes being in this state.

-Below you can find Configuration Overview section and Query Browser, that allows you to examine data stored on individual Elasticsearch nodes.
+Below you can see Performance Overview with the following metrics over time: number of indices, number of shards, number of documents in the cluster and the cluster data size.
+
+#### Scaling
+
+![Scaling](docs/screenshot-scaling.png)
+
+This simple interface allows you to specify a number of nodes to scale to.

 #### Tasks List

 ![Tasks List](docs/screenshot-tasks.png)

 Tasks list displays detailed information about all tasks in the cluster, not only those currently running, but also tasks being staged, finished or failed. Click through individual tasks to get access to Elasticsearch REST API.

+#### Configuration
+
+![Configuration](docs/screenshot-configuration.png)
+
+This is a read-only interface displaying an overview of the framework configuration.
+
+#### Query Browser
+
+![Query Browser](docs/screenshot-query-browser.png)
+
+Query Browser allows you to examine data stored on individual Elasticsearch nodes. In this example we searched for the word "Love" on `slave1` node. You can toggle between tabular view and raw results view mode, which displays the raw data returned from Elasticsearch `/_search` API endpoint.
+
 ### Known issues

 - Issue [#188](https://github.com/mesos/elasticsearch/issues/188): Database data IS NOT persisted to disk. Data storage is wholly reliant on cluster redundancy. This means that the framework is not yet recommended for production use.
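
The Query Browser described above reads its raw results from the Elasticsearch /_search endpoint. As a minimal sketch, assuming the node is reachable as slave1 on the default Elasticsearch HTTP port 9200 (in practice the framework assigns the HTTP port from the Mesos offer), the same "Love" query could be issued directly like this:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

// Minimal sketch: fetch the same raw JSON the Query Browser's raw view shows,
// straight from an Elasticsearch node's /_search endpoint.
// "slave1" and port 9200 are illustrative assumptions, not values from this commit.
public class SearchSketch {
    public static void main(String[] args) throws Exception {
        URL url = new URL("http://slave1:9200/_search?q=Love");
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        connection.setRequestMethod("GET");
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(connection.getInputStream()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line); // raw Elasticsearch search response
            }
        }
    }
}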

docs/screenshot-cluster.png (-224 KB)
docs/screenshot-configuration.png (140 KB)
docs/screenshot-query-browser.png (114 KB)
docs/screenshot-scaling.png (54.4 KB)
docs/screenshot-tasks.png (-118 KB)

scheduler/src/main/java/org/apache/mesos/elasticsearch/scheduler/ElasticsearchScheduler.java

Lines changed: 24 additions & 44 deletions
@@ -7,7 +7,9 @@
 import org.apache.mesos.SchedulerDriver;
 import org.apache.mesos.elasticsearch.scheduler.cluster.ClusterMonitor;
 import org.apache.mesos.elasticsearch.scheduler.state.ClusterState;
+import org.apache.mesos.elasticsearch.scheduler.state.ESTaskStatus;
 import org.apache.mesos.elasticsearch.scheduler.state.FrameworkState;
+import org.apache.mesos.elasticsearch.scheduler.state.StatePath;

 import java.util.*;

@@ -28,6 +30,7 @@ public class ElasticsearchScheduler implements Scheduler {
     private Observable statusUpdateWatchers = new StatusUpdateObservable();
     private Boolean registered = false;
     private ClusterState clusterState;
+    OfferStrategy offerStrategy;

     public ElasticsearchScheduler(Configuration configuration, TaskInfoFactory taskInfoFactory) {
         this.configuration = configuration;
@@ -65,7 +68,10 @@ public void registered(SchedulerDriver driver, Protos.FrameworkID frameworkId, P
         LOGGER.info("Framework registered as " + frameworkId.getValue());

         clusterState = new ClusterState(configuration.getState(), frameworkState); // Must use new framework state. This is when we are allocated our FrameworkID.
-        clusterMonitor = new ClusterMonitor(configuration, this, driver, clusterState);
+        offerStrategy = new OfferStrategy(configuration, clusterState);
+        clusterMonitor = new ClusterMonitor(configuration, this, driver, new StatePath(configuration.getState()));
+        clusterState.getTaskList().forEach(clusterMonitor::startMonitoringTask); // Get all previous executors and start monitoring them.
+        statusUpdateWatchers.addObserver(clusterState);
         statusUpdateWatchers.addObserver(clusterMonitor);

         List<Protos.Resource> resources = Resources.buildFrameworkResources(configuration);
@@ -84,62 +90,29 @@ public void reregistered(SchedulerDriver driver, Protos.MasterInfo masterInfo) {
         LOGGER.info("Framework re-registered");
     }

-    // Todo, this massive if statement needs to be performed better.
     @Override
     public void resourceOffers(SchedulerDriver driver, List<Protos.Offer> offers) {
         if (!registered) {
             LOGGER.debug("Not registered, can't accept resource offers.");
             return;
         }
         for (Protos.Offer offer : offers) {
-            if (isHostAlreadyRunningTask(offer)) {
-                driver.declineOffer(offer.getId()); // DCOS certification 05
-                LOGGER.info("Declined offer: Host " + offer.getHostname() + " is already running an Elastisearch task");
-            } else if (clusterMonitor.getClusterState().getTaskList().size() == configuration.getElasticsearchNodes()) {
-                driver.declineOffer(offer.getId()); // DCOS certification 05
-                LOGGER.info("Declined offer: Mesos runs already runs " + configuration.getElasticsearchNodes() + " Elasticsearch tasks");
-            } else if (!containsTwoPorts(offer.getResourcesList())) {
-                LOGGER.info("Declined offer: Offer did not contain 2 ports for Elasticsearch client and transport connection");
-                driver.declineOffer(offer.getId());
-            } else if (!isEnoughCPU(offer.getResourcesList())) {
-                LOGGER.info("Declined offer: Not enough CPU resources");
-                driver.declineOffer(offer.getId());
-            } else if (!isEnoughRAM(offer.getResourcesList())) {
-                LOGGER.info("Declined offer: Not enough RAM resources");
-                driver.declineOffer(offer.getId());
-            } else if (!isEnoughDisk(offer.getResourcesList())) {
-                LOGGER.info("Not enough Disk resources");
+            final OfferStrategy.OfferResult result = offerStrategy.evaluate(offer);
+
+            if (!result.acceptable) {
+                LOGGER.debug("Declined offer: " + result.reason.orElse("Unknown"));
                 driver.declineOffer(offer.getId());
             } else {
-                LOGGER.info("Accepted offer: " + offer.getHostname());
                 Protos.TaskInfo taskInfo = taskInfoFactory.createTask(configuration, offer);
                 LOGGER.debug(taskInfo.toString());
                 driver.launchTasks(Collections.singleton(offer.getId()), Collections.singleton(taskInfo));
-                clusterMonitor.monitorTask(taskInfo); // Add task to cluster monitor
+                ESTaskStatus esTask = new ESTaskStatus(configuration.getState(), configuration.getFrameworkId(), taskInfo, new StatePath(configuration.getState())); // Write staging state to zk
+                clusterState.addTask(esTask); // Add tasks to cluster state and write to zk
+                clusterMonitor.startMonitoringTask(esTask); // Add task to cluster monitor
             }
         }
     }

-    private boolean isEnoughDisk(List<Protos.Resource> resourcesList) {
-        ResourceCheck resourceCheck = new ResourceCheck(Resources.RESOURCE_DISK);
-        return resourceCheck.isEnough(resourcesList, configuration.getDisk());
-    }
-
-    private boolean isEnoughCPU(List<Protos.Resource> resourcesList) {
-        ResourceCheck resourceCheck = new ResourceCheck(Resources.RESOURCE_CPUS);
-        return resourceCheck.isEnough(resourcesList, configuration.getCpus());
-    }
-
-    private boolean isEnoughRAM(List<Protos.Resource> resourcesList) {
-        ResourceCheck resourceCheck = new ResourceCheck(Resources.RESOURCE_MEM);
-        return resourceCheck.isEnough(resourcesList, configuration.getMem());
-    }
-
-    private boolean containsTwoPorts(List<Protos.Resource> resources) {
-        int count = Resources.selectTwoPortsFromRange(resources).size();
-        return count == 2;
-    }
-
     @Override
     public void offerRescinded(SchedulerDriver driver, Protos.OfferID offerId) {
         LOGGER.info("Offer " + offerId.getValue() + " rescinded");
@@ -171,8 +144,15 @@ public void executorLost(SchedulerDriver driver, Protos.ExecutorID executorId, P
         // This is never called by Mesos, so we have to call it ourselves via a healthcheck
         // https://issues.apache.org/jira/browse/MESOS-313
         LOGGER.info("Executor lost: " + executorId.getValue() +
-                "on slave " + slaveId.getValue() +
-                "with status " + status);
+                " on slave " + slaveId.getValue() +
+                " with status " + status);
+        try {
+            Protos.TaskInfo taskInfo = clusterState.getTask(executorId);
+            statusUpdate(driver, Protos.TaskStatus.newBuilder().setExecutorId(executorId).setSlaveId(slaveId).setTaskId(taskInfo.getTaskId()).setState(Protos.TaskState.TASK_LOST).build());
+            driver.killTask(taskInfo.getTaskId()); // It may not actually be lost, it may just have hanged. So Kill, just in case.
+        } catch (IllegalArgumentException e) {
+            LOGGER.warn("Unable to find TaskInfo with the given Executor ID", e);
+        }
     }

     @Override
@@ -182,7 +162,7 @@ public void error(SchedulerDriver driver, String message) {

     private boolean isHostAlreadyRunningTask(Protos.Offer offer) {
         Boolean result = false;
-        List<Protos.TaskInfo> stateList = clusterMonitor.getClusterState().getTaskList();
+        List<Protos.TaskInfo> stateList = clusterState.getTaskList();
         for (Protos.TaskInfo t : stateList) {
             if (t.getSlaveId().equals(offer.getSlaveId())) {
                 result = true;
scheduler/src/main/java/org/apache/mesos/elasticsearch/scheduler/OfferStrategy.java

Lines changed: 111 additions & 0 deletions
@@ -0,0 +1,111 @@
+package org.apache.mesos.elasticsearch.scheduler;
+
+import org.apache.log4j.Logger;
+import org.apache.mesos.Protos;
+import org.apache.mesos.elasticsearch.scheduler.state.ClusterState;
+
+import java.util.List;
+import java.util.Optional;
+
+import static java.util.Arrays.asList;
+
+/**
+ * Offer strategy
+ */
+public class OfferStrategy {
+    private static final Logger LOGGER = Logger.getLogger(ElasticsearchScheduler.class.toString());
+    private ClusterState clusterState;
+    private Configuration configuration;
+
+    private List<OfferRule> acceptanceRules = asList(
+            new OfferRule("Host already running task", this::isHostAlreadyRunningTask),
+            new OfferRule("Cluster size already fulfilled", offer -> clusterState.getTaskList().size() == configuration.getElasticsearchNodes()),
+            new OfferRule("Offer did not have 2 ports", offer -> !containsTwoPorts(offer.getResourcesList())),
+            new OfferRule("Offer did not have enough CPU resources", offer -> !isEnoughCPU(configuration, offer.getResourcesList())),
+            new OfferRule("Offer did not have enough RAM resources", offer -> !isEnoughRAM(configuration, offer.getResourcesList())),
+            new OfferRule("Offer did not have enough disk resources", offer -> !isEnoughDisk(configuration, offer.getResourcesList()))
+    );
+
+    public OfferStrategy(Configuration configuration, ClusterState clusterState) {
+        this.clusterState = clusterState;
+        this.configuration = configuration;
+    }
+
+    public OfferResult evaluate(Protos.Offer offer) {
+        final Optional<OfferRule> decline = acceptanceRules.stream().filter(offerRule -> offerRule.rule.accepts(offer)).limit(1).findFirst();
+        if (decline.isPresent()) {
+            return OfferResult.decline(decline.get().declineReason);
+        }
+
+        LOGGER.info("Accepted offer: " + offer.getHostname());
+        return OfferResult.accept();
+    }
+
+    /**
+     * Offer result
+     */
+    public static class OfferResult {
+        final boolean acceptable;
+        final Optional<String> reason;
+
+        private OfferResult(boolean acceptable, Optional<String> reason) {
+            this.acceptable = acceptable;
+            this.reason = reason;
+        }
+
+        public static OfferResult accept() {
+            return new OfferResult(true, Optional.<String>empty());
+        }
+
+        public static OfferResult decline(String reason) {
+            return new OfferResult(false, Optional.of(reason));
+        }
+    }
+
+    private boolean isHostAlreadyRunningTask(Protos.Offer offer) {
+        Boolean result = false;
+        List<Protos.TaskInfo> stateList = clusterState.getTaskList();
+        for (Protos.TaskInfo t : stateList) {
+            if (t.getSlaveId().equals(offer.getSlaveId())) {
+                result = true;
+            }
+        }
+        return result;
+    }
+    private boolean isEnoughDisk(Configuration configuration, List<Protos.Resource> resourcesList) {
+        return new ResourceCheck(Resources.RESOURCE_DISK).isEnough(resourcesList, configuration.getDisk());
+    }
+
+    private boolean isEnoughCPU(Configuration configuration, List<Protos.Resource> resourcesList) {
+        return new ResourceCheck(Resources.RESOURCE_CPUS).isEnough(resourcesList, configuration.getCpus());
+    }
+
+    private boolean isEnoughRAM(Configuration configuration, List<Protos.Resource> resourcesList) {
+        return new ResourceCheck(Resources.RESOURCE_MEM).isEnough(resourcesList, configuration.getMem());
+    }
+
+    private boolean containsTwoPorts(List<Protos.Resource> resources) {
+        return Resources.selectTwoPortsFromRange(resources).size() == 2;
+    }
+
+    /**
+     * Rule and reason container object
+     */
+    private static class OfferRule {
+        String declineReason;
+        Rule rule;
+
+        public OfferRule(String declineReason, Rule rule) {
+            this.declineReason = declineReason;
+            this.rule = rule;
+        }
+    }
+
+    /**
+     * Interface for checking offers
+     */
+    @FunctionalInterface
+    private interface Rule {
+        boolean accepts(Protos.Offer offer);
+    }
+}
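
OfferStrategy replaces the scheduler's nested if/else chain with a declarative list of decline rules evaluated in order: the first rule that matches an offer supplies the decline reason, otherwise the offer is accepted. Below is a self-contained sketch of that pattern, assuming simplified stand-in types in place of Protos.Offer and Configuration; FakeOffer and the thresholds are hypothetical and for illustration only.

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Sketch of the rule-list pattern used by OfferStrategy: each rule pairs a
// decline reason with a predicate, and the first matching rule decides the outcome.
// FakeOffer is a hypothetical stand-in for Protos.Offer.
public class RuleListSketch {
    static final class FakeOffer {
        final double cpus;
        final double memMb;
        FakeOffer(double cpus, double memMb) { this.cpus = cpus; this.memMb = memMb; }
    }

    static final class Rule {
        final String declineReason;
        final Predicate<FakeOffer> declines;
        Rule(String declineReason, Predicate<FakeOffer> declines) {
            this.declineReason = declineReason;
            this.declines = declines;
        }
    }

    // Example thresholds; the real class delegates to ResourceCheck and Configuration.
    static final List<Rule> RULES = Arrays.asList(
            new Rule("Offer did not have enough CPU resources", offer -> offer.cpus < 1.0),
            new Rule("Offer did not have enough RAM resources", offer -> offer.memMb < 256)
    );

    // Returns the decline reason of the first matching rule, or empty if acceptable.
    static Optional<String> evaluate(FakeOffer offer) {
        return RULES.stream()
                .filter(rule -> rule.declines.test(offer))
                .findFirst()
                .map(rule -> rule.declineReason);
    }

    public static void main(String[] args) {
        System.out.println(evaluate(new FakeOffer(0.5, 512))); // CPU rule matches -> declined
        System.out.println(evaluate(new FakeOffer(2.0, 512))); // no rule matches -> accepted
    }
}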
