Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.server.service.GrpcService;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import static org.apache.bigtop.manager.common.constants.CacheFiles.CLUSTER_INFO;
import static org.apache.bigtop.manager.common.constants.CacheFiles.COMPONENTS_INFO;
import static org.apache.bigtop.manager.common.constants.CacheFiles.CONFIGURATIONS_INFO;
import static org.apache.bigtop.manager.common.constants.CacheFiles.HOSTS_INFO;
import static org.apache.bigtop.manager.common.constants.CacheFiles.REPOS_INFO;
Expand All @@ -48,17 +50,21 @@ public class JobCacheServiceGrpcImpl extends JobCacheServiceGrpc.JobCacheService
public void save(JobCacheRequest request, StreamObserver<JobCacheReply> responseObserver) {
try {
JobCachePayload payload = JsonUtils.readFromString(request.getPayload(), JobCachePayload.class);
String cacheDir = ProjectPathUtils.getAgentCachePath();
String cacheDir = ProjectPathUtils.getAgentCachePath() + File.separator + payload.getClusterId();
Path p = Paths.get(cacheDir);
if (!Files.exists(p)) {
Files.createDirectories(p);
}

String dir = p.getParent().toFile().getAbsolutePath();
JsonUtils.writeToFile(dir + "/current", payload.getCurrentClusterId());

JsonUtils.writeToFile(cacheDir + CONFIGURATIONS_INFO, payload.getConfigurations());
JsonUtils.writeToFile(cacheDir + HOSTS_INFO, payload.getComponentHosts());
JsonUtils.writeToFile(cacheDir + COMPONENTS_INFO, payload.getComponentHosts());
JsonUtils.writeToFile(cacheDir + USERS_INFO, payload.getUserInfo());
JsonUtils.writeToFile(cacheDir + REPOS_INFO, payload.getRepoInfo());
JsonUtils.writeToFile(cacheDir + CLUSTER_INFO, payload.getClusterInfo());
JsonUtils.writeToFile(cacheDir + HOSTS_INFO, payload.getHosts());

JobCacheReply reply = JobCacheReply.newBuilder()
.setCode(MessageConstants.SUCCESS_CODE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class FileUtils {

/**
 * Reads the entire content of the file at the given path into a string.
 *
 * @param filename path of the file to read
 * @return the file content as a string
 */
public static String readFile2Str(String filename) {
    // Delegate to the File-based overload so the actual reading logic lives in one place.
    return readFile2Str(new File(filename));
}

/**
* Get Content
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import lombok.Data;

import java.util.List;
import java.util.Map;

@Data
public class ComponentCommandPayload {
Expand All @@ -46,10 +45,4 @@ public class ComponentCommandPayload {
private List<PackageSpecificInfo> packageSpecifics;

private List<TemplateInfo> templates;

/**
* This field is exclusively used for Prometheus and Grafana within the infra services.
* Includes cluster and corresponding hostname.
*/
private Map<String, List<String>> clusterHosts;
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
@Data
public class JobCachePayload {

private Long currentClusterId;

private Long clusterId;

private ClusterInfo clusterInfo;
Expand All @@ -40,4 +42,6 @@ public class JobCachePayload {
private Map<String, Map<String, String>> configurations;

private Map<String, List<String>> componentHosts;

private List<String> hosts;
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static org.apache.bigtop.manager.common.constants.Constants.ALL_HOST_KEY;

public class JobCacheHelper {

private static ClusterDao clusterDao;
Expand Down Expand Up @@ -90,19 +88,31 @@ public static void sendJobCache(Long jobId, List<String> hostnames) {

List<CompletableFuture<Boolean>> futures = new ArrayList<>();
for (HostPO hostPO : hostPOList) {
genClusterPayload(payload, hostPO.getClusterId());
JobCacheRequest request = JobCacheRequest.newBuilder()
.setJobId(jobId)
.setPayload(JsonUtils.writeAsString(payload))
.build();
futures.add(CompletableFuture.supplyAsync(() -> {
JobCacheServiceGrpc.JobCacheServiceBlockingStub stub = GrpcClient.getBlockingStub(
hostPO.getHostname(),
hostPO.getGrpcPort(),
JobCacheServiceGrpc.JobCacheServiceBlockingStub.class);
JobCacheReply reply = stub.save(request);
return reply != null && reply.getCode() == MessageConstants.SUCCESS_CODE;
}));
payload.setCurrentClusterId(hostPO.getClusterId());

List<Long> clusterIds = new ArrayList<>();
if (hostRequiresAllData(hostPO.getHostname())) {
clusterIds.addAll(
clusterDao.findAll().stream().map(ClusterPO::getId).toList());
} else {
clusterIds.add(hostPO.getClusterId());
}

for (Long clusterId : clusterIds) {
genClusterPayload(payload, clusterId);
JobCacheRequest request = JobCacheRequest.newBuilder()
.setJobId(jobId)
.setPayload(JsonUtils.writeAsString(payload))
.build();
futures.add(CompletableFuture.supplyAsync(() -> {
JobCacheServiceGrpc.JobCacheServiceBlockingStub stub = GrpcClient.getBlockingStub(
hostPO.getHostname(),
hostPO.getGrpcPort(),
JobCacheServiceGrpc.JobCacheServiceBlockingStub.class);
JobCacheReply reply = stub.save(request);
return reply != null && reply.getCode() == MessageConstants.SUCCESS_CODE;
}));
}
}

List<Boolean> results = futures.stream()
Expand Down Expand Up @@ -139,22 +149,22 @@ private static void genClusterPayload(JobCachePayload payload, Long clusterId) {
Map<String, List<String>> componentHostMap = payload.getComponentHosts();
componentHostMap.putAll(getComponentHostMap(clusterId));

List<String> hosts = hostDao.findAllByClusterId(clusterId).stream()
.map(HostPO::getHostname)
.toList();

payload.setClusterId(clusterId);
payload.setClusterInfo(clusterInfo);
payload.setConfigurations(serviceConfigMap);
payload.setComponentHosts(componentHostMap);
payload.setHosts(hosts);
}

private static void genGlobalPayload(JobCachePayload payload) {
List<RepoPO> repoPOList = repoDao.findAll();
List<HostPO> hostPOList = hostDao.findAll();

Map<String, Map<String, String>> serviceConfigMap = getServiceConfigMap(0L);

Map<String, List<String>> componentHostMap = new HashMap<>();
List<String> allHostnames = hostPOList.stream().map(HostPO::getHostname).toList();
componentHostMap.put(ALL_HOST_KEY, allHostnames);
componentHostMap.putAll(getComponentHostMap(0L));
Map<String, List<String>> componentHostMap = new HashMap<>(getComponentHostMap(0L));

List<RepoInfo> repoList = new ArrayList<>();
repoPOList.forEach(repoPO -> {
Expand Down Expand Up @@ -222,6 +232,16 @@ private static Map<String, List<String>> getComponentHostMap(Long clusterId) {

/**
 * Decides whether a host must receive cache data for every cluster rather
 * than only its own. Services in the "infra" stack (e.g. Prometheus) need
 * information about all clusters to collect metrics.
 *
 * @param hostname the host being checked
 * @return true if any component on the host belongs to the "infra" stack
 */
private static Boolean hostRequiresAllData(String hostname) {
    ComponentQuery query = ComponentQuery.builder().hostname(hostname).build();
    for (ComponentPO componentPO : componentDao.findByQuery(query)) {
        ServiceDTO serviceDTO = StackUtils.getServiceDTOByComponentName(componentPO.getName());
        StackDTO stackDTO = StackUtils.getServiceStack(serviceDTO.getName());
        if (stackDTO.getStackName().equals("infra")) {
            return true;
        }
    }

    return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,13 @@
package org.apache.bigtop.manager.server.command.stage;

import org.apache.bigtop.manager.dao.po.ClusterPO;
import org.apache.bigtop.manager.dao.po.HostPO;
import org.apache.bigtop.manager.dao.repository.ClusterDao;
import org.apache.bigtop.manager.server.command.task.TaskContext;
import org.apache.bigtop.manager.server.holder.SpringContextHolder;
import org.apache.bigtop.manager.server.model.dto.ComponentDTO;
import org.apache.bigtop.manager.server.model.dto.ServiceDTO;
import org.apache.bigtop.manager.server.utils.StackUtils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public abstract class AbstractComponentStage extends AbstractStage {

private ClusterDao clusterDao;
Expand Down Expand Up @@ -78,23 +72,6 @@ protected TaskContext createTaskContext(String hostname) {
taskContext.setServiceUser(serviceDTO.getUser());
taskContext.setUserGroup(clusterPO == null ? null : clusterPO.getUserGroup());
taskContext.setRootDir(clusterPO == null ? null : clusterPO.getRootDir());

Map<String, Object> properties = new HashMap<>();
properties.put("clusterHosts", getClusterHosts());
taskContext.setProperties(properties);
return taskContext;
}

protected Map<String, List<String>> getClusterHosts() {
Map<String, List<String>> clusterHosts = new HashMap<>();
for (ClusterPO clusterPO : clusterDao.findAll()) {
List<String> hosts = new ArrayList<>();
for (HostPO hostPO : hostDao.findAllByClusterId(clusterPO.getId())) {
String host = hostPO.getHostname();
hosts.add(host);
}
clusterHosts.put(clusterPO.getName(), hosts);
}
return clusterHosts;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public abstract class AbstractComponentTask extends AbstractTask {

Expand Down Expand Up @@ -83,12 +82,6 @@ protected ComponentCommandRequest getComponentCommandRequest() {
payload.setTemplates(convertTemplateInfo(serviceDTO.getName(), serviceDTO.getTemplates()));
payload.setPackageSpecifics(convertPackageSpecificInfo(serviceDTO.getPackageSpecifics()));

Map<String, Object> properties = taskContext.getProperties();
if (stackDTO.getStackName().equals("infra")) {
Map<String, List<String>> clusterHosts = (Map<String, List<String>>) properties.get("clusterHosts");
payload.setClusterHosts(clusterHosts);
}

ComponentCommandRequest.Builder requestBuilder = ComponentCommandRequest.newBuilder();
requestBuilder.setPayload(JsonUtils.writeAsString(payload));
requestBuilder.setTaskId(getTaskPO().getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,20 @@ private static void parseServiceConfigurations(File file, String serviceName) {
private static void parseServiceTemplates(File file, String serviceName) {
File templateFolder = new File(file.getAbsolutePath(), TEMPLATE_FOLDER);
if (templateFolder.exists()) {
for (File templateFile :
Optional.ofNullable(templateFolder.listFiles()).orElse(new File[0])) {
String filename = templateFile.getName();
String content = FileUtils.readFile2Str(templateFile);
Map<String, String> map = SERVICE_TEMPLATE_MAP.computeIfAbsent(serviceName, k -> new HashMap<>());
map.put(filename, content);
Map<String, String> map = SERVICE_TEMPLATE_MAP.computeIfAbsent(serviceName, k -> new HashMap<>());
parseTemplateFiles(templateFolder, templateFolder, map);
}
}

/**
 * Recursively collects every template file under {@code currentFolder},
 * storing its content keyed by the path relative to {@code templateRoot}
 * (so templates in nested folders keep their folder prefix, e.g.
 * {@code rules/zookeeper.yml}).
 *
 * @param templateRoot  the top-level template folder used as the key base
 * @param currentFolder the folder currently being traversed
 * @param templateMap   destination map of relative path to file content
 */
private static void parseTemplateFiles(File templateRoot, File currentFolder, Map<String, String> templateMap) {
    File[] entries = currentFolder.listFiles();
    // listFiles() returns null on I/O error or non-directory; treat as empty.
    if (entries == null) {
        return;
    }
    for (File entry : entries) {
        if (entry.isDirectory()) {
            parseTemplateFiles(templateRoot, entry, templateMap);
        } else {
            // URI relativization yields a '/'-separated relative path on all platforms.
            String relativePath = templateRoot.toURI().relativize(entry.toURI()).getPath();
            templateMap.put(relativePath, FileUtils.readFile2Str(entry));
        }
    }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,11 @@
<templates>
<template>
<src>cluster-dashboard.json</src>
<dest>conf/provisioning/dashboards/cluster</dest>
<dest>conf/provisioning/dashboards/cluster/cluster-dashboard.json</dest>
</template>
<template>
<src>host-dashboard.json</src>
<dest>conf/provisioning/dashboards/host</dest>
<dest>conf/provisioning/dashboards/host/host-dashboard.json</dest>
</template>
</templates>

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ global:
# Rule files specifies a list of globs. Rules and alerts are read from
# all matching files.
rule_files:
<#if rules_file_name??>
- ${rules_file_name}
</#if>
- rules/zookeeper.yml

# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,5 +71,12 @@
</packages>
</package-specific>
</package-specifics>

<templates>
<template>
<src>rules/zookeeper.yml</src>
<dest>rules/zookeeper.yml</dest>
</template>
</templates>
</service>
</metainfo>
</metainfo>
Loading
Loading