Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,22 @@ private static ManagedChannel getChannel(String host, Integer grpcPort) {
}
}

/**
 * Removes and shuts down the cached {@link ManagedChannel} for the given host,
 * and evicts all stubs that were bound to it so they get rebuilt lazily on
 * next use. No-op if no channel is cached for the host.
 *
 * @param host the hostname whose channel and stubs should be evicted
 */
public static void removeChannel(String host) {
    ManagedChannel channel = CHANNELS.remove(host);
    if (channel != null) {
        try {
            // Graceful shutdown: let in-flight RPCs finish, but bound the wait.
            if (!channel.shutdown().awaitTermination(5, TimeUnit.SECONDS)) {
                // Did not terminate in time — force-close remaining calls so the
                // channel is not left half-shutdown holding resources.
                channel.shutdownNow();
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption
            // (swallowing it would silently break cooperative cancellation).
            Thread.currentThread().interrupt();
            channel.shutdownNow();
            log.warn("Channel shutdown interrupted", e);
        }

        // Stubs hold a reference to the removed channel and must be evicted too.
        BLOCKING_STUBS.remove(host);
        ASYNC_STUBS.remove(host);
        FUTURE_STUBS.remove(host);
        log.info("Channel to host: {} removed.", host);
    }
}

private static <T extends AbstractStub<T>> AbstractStub.StubFactory<T> getFactory(Class<T> clazz) {
return (channel, callOptions) -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.bigtop.manager.server.enums.HostAuthTypeEnum;
import org.apache.bigtop.manager.server.enums.InstalledStatusEnum;
import org.apache.bigtop.manager.server.exception.ApiException;
import org.apache.bigtop.manager.server.grpc.GrpcClient;
import org.apache.bigtop.manager.server.model.converter.ComponentConverter;
import org.apache.bigtop.manager.server.model.converter.HostConverter;
import org.apache.bigtop.manager.server.model.dto.HostDTO;
Expand Down Expand Up @@ -337,6 +338,9 @@ private Boolean execAgentScript(Long hostId, String action) {
String command = path + "/bigtop-manager-agent/bin/agent.sh " + action;
command = "export GRPC_PORT=" + grpcPort + " ; " + command;

// Remove channel before operations
GrpcClient.removeChannel(hostname);

ShellResult result = execCommandOnRemoteHost(hostDTO, hostname, command);
if (result.getExitCode() != MessageConstants.SUCCESS_CODE) {
log.error("Unable to {} agent, hostname: {}, msg: {}", action, hostname, result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.bigtop.manager.server.utils;

import org.apache.bigtop.manager.common.utils.JsonUtils;
import org.apache.bigtop.manager.server.enums.PropertyAction;
import org.apache.bigtop.manager.server.model.dto.AttrsDTO;
import org.apache.bigtop.manager.server.model.dto.PropertyDTO;
Expand All @@ -29,8 +30,6 @@
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

import org.springframework.beans.BeanUtils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -79,8 +78,9 @@ public static List<ServiceConfigDTO> mergeServiceConfigs(
// To avoid to change the original configs, we use cloned object
List<ServiceConfigDTO> mergedConfigs = new ArrayList<>();
for (ServiceConfigDTO oriConfig : oriConfigs) {
ServiceConfigDTO mergedConfig = new ServiceConfigDTO();
BeanUtils.copyProperties(oriConfig, mergedConfig);
// Deep clone via JSON serialization/deserialization
ServiceConfigDTO mergedConfig =
JsonUtils.readFromString(JsonUtils.writeAsString(oriConfig), ServiceConfigDTO.class);
mergedConfigs.add(mergedConfig);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,35 +20,25 @@

<configuration>
<property>
<name>jobmanager.archive.fs.dir</name>
<name>jobmanager_archive_fs_dir</name>
<value>hdfs:///completed-jobs/</value>
<description>Directory for JobManager to store the archives of completed jobs.</description>
</property>
<property>
<name>historyserver.archive.fs.dir</name>
<name>historyserver_archive_fs_dir</name>
<value>hdfs:///completed-jobs/</value>
<description>Comma separated list of directories to fetch archived jobs from.</description>
</property>
<property>
<name>historyserver.web.port</name>
<name>historyserver_web_port</name>
<value>8082</value>
<description>The port under which the web-based HistoryServer listens.</description>
</property>
<property>
<name>historyserver.archive.fs.refresh-interval</name>
<name>historyserver_archive_fs_refresh_interval</name>
<value>10000</value>
<description>Interval in milliseconds for refreshing the monitored directories.</description>
</property>
<property>
<name>security.kerberos.login.keytab</name>
<description>Flink keytab path</description>
<value>none</value>
</property>
<property>
<name>security.kerberos.login.principal</name>
<description>Flink principal name</description>
<value>none</value>
</property>
<!-- flink-conf.yaml -->
<property>
<name>content</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@
# Set Hadoop-specific environment variables here.

USER="$(whoami)"
export JAVA_HOME=${java_home!}
export HADOOP_HOME=${hadoop_home!}
export HADOOP_CONF_DIR=${hadoop_conf_dir!}
export JAVA_HOME=${java_home}
export HADOOP_HOME=${hadoop_home}
export HADOOP_CONF_DIR=${hadoop_conf_dir}
export HADOOP_LOG_DIR=${hadoop_log_dir}
export HADOOP_PID_DIR=${hadoop_pid_dir}
export HADOOP_HEAPSIZE_MAX=${hadoop_heapsize_max}
Expand All @@ -79,7 +79,7 @@ export HADOOP_OS_TYPE=${HADOOP_OS_TYPE:-$(uname -s)}

export HADOOP_ROOT_LOGGER=${hadoop_root_logger}
export HADOOP_LIBEXEC_DIR=${hadoop_libexec_dir}
export LD_LIBRARY_PATH=${hadoop_home!}/lib/native
export LD_LIBRARY_PATH=${hadoop_home}/lib/native
]]>
</value>
<attrs>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
<display-name>mapred-env template</display-name>
<description>This is the freemarker template for mapred-env.sh file</description>
<value><![CDATA[
export HADOOP_JOB_HISTORYSERVER_HEAPSIZE=${jobhistory_heapsize!}
export HADOOP_JOB_HISTORYSERVER_HEAPSIZE=${jobhistory_heapsize}

export HADOOP_ROOT_LOGGER=INFO,RFA

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@
</property>
<property>
<name>mapreduce.application.classpath</name>
<value>${hadoop_conf_dir!},${hadoop_home!}/*,${hadoop_home!}/lib/*</value>
<value>${hadoop_conf_dir},${hadoop_home}/*,${hadoop_home}/lib/*</value>
<description>CLASSPATH for MR applications. A comma-separated list of CLASSPATH entries.</description>
</property>
<property>
Expand Down Expand Up @@ -296,7 +296,7 @@
</property>
<property>
<name>mapreduce.admin.user.env</name>
<value>LD_LIBRARY_PATH=${hadoop_home!}/lib/native</value>
<value>LD_LIBRARY_PATH=${hadoop_home}/lib/native</value>
<description>
Additional execution environment entries for map and reduce task processes.
This is not an additive property. You must preserve the original value if
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@
</property>
<property>
<name>yarn.application.classpath</name>
<value>${hadoop_conf_dir!},${hadoop_home!}/*,${hadoop_home!}/lib/*</value>
<value>${hadoop_conf_dir},${hadoop_home}/*,${hadoop_home}/lib/*</value>
<description>Classpath for typical applications.</description>
</property>
<property>
Expand Down Expand Up @@ -308,7 +308,7 @@
</property>
<property>
<name>yarn.timeline-service.enabled</name>
<value>true</value>
<value>false</value>
<description>
Indicate to clients whether timeline service is enabled or not.
If enabled, clients will put entities and events to the timeline server.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@
# Set environment variables here.

# The java implementation to use. Java 1.6 required.
export JAVA_HOME=${java_home!}
export JAVA_HOME=${java_home}

# HBase Configuration directory
export HBASE_CONF_DIR=${hbase_conf_dir}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@
# Set KAFKA specific environment variables here.

# The java implementation to use.
export JAVA_HOME=${java_home!}
export JAVA_HOME=${java_home}
<#noparse>export PATH=$PATH:${JAVA_HOME}/bin</#noparse>
export PID_DIR=${kafka_pid_dir!}
export LOG_DIR=${kafka_log_dir!}
export PID_DIR=${kafka_pid_dir}
export LOG_DIR=${kafka_log_dir}
export CLASSPATH=$CLASSPATH:${kafka_conf_dir}
]]>
</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@
<description>This is the freemarker template for tez-env.sh file</description>
<value><![CDATA[
# Tez specific configuration
export TEZ_CONF_DIR=${tez_conf_dir!}
export TEZ_CONF_DIR=${tez_conf_dir}

# Set HADOOP_HOME to point to a specific hadoop install directory
export HADOOP_HOME=${hadoop_home!}
export HADOOP_HOME=${hadoop_home}

# The java implementation to use.
export JAVA_HOME=${java_home!}
export JAVA_HOME=${java_home}
]]>
</value>
<attrs>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@
<property>
<name>tez.lib.uris.classpath</name>
<display-name>TEZ Lib URIs Classpath</display-name>
<value>${hadoop_conf_dir!},${hadoop_home!}/*,${hadoop_home!}/lib/*,${tez_home!}/*,${tez_home!}/lib/*,${tez_conf_dir!}</value>
<value>${hadoop_conf_dir},${hadoop_home}/*,${hadoop_home}/lib/*,${tez_home}/*,${tez_home}/lib/*,${tez_conf_dir}</value>
<description>
Comma-delimited list of the location of the Tez libraries Classpath which will be localized for DAGs.
</description>
</property>
<property>
<name>tez.lib.uris</name>
<display-name>TEZ Lib URIs</display-name>
<value>${tez_lib_uris!}</value>
<value>${tez_lib_uris}</value>
<description>Comma-delimited list of the location of the Tez libraries which will be localized for DAGs.
Specifying a single .tar.gz or .tgz assumes that a compressed version of the tez libs is being used. This is
uncompressed into a tezlibs directory when running containers, and tezlibs/;tezlibs/lib/ are added to the
Expand Down Expand Up @@ -69,7 +69,7 @@
</property>
<property>
<name>tez.am.launch.cmd-opts</name>
<value>-XX:+PrintGCDetails -verbose:gc -XX:+PrintGCTimeStamps -XX:+UseNUMA -XX:+UseG1GC -XX:+ResizeTLAB ${heap_dump_opts!}</value>
<value>-XX:+PrintGCDetails -verbose:gc -XX:+PrintGCTimeStamps -XX:+UseNUMA -XX:+UseG1GC -XX:+ResizeTLAB ${heap_dump_opts}</value>
<description>Java options for the Tez AppMaster process. The Xmx value is derived based on
tez.am.resource.memory.mb and is 80% of the value by default.
Used only if the value is not specified explicitly by the DAG definition.
Expand All @@ -84,7 +84,7 @@
</property>
<property>
<name>tez.am.launch.env</name>
<value>LD_LIBRARY_PATH=${hadoop_home!}/lib/native</value>
<value>LD_LIBRARY_PATH=${hadoop_home}/lib/native</value>
<description>
Additional execution environment entries for tez. This is not an additive property. You must preserve the
original value if
Expand All @@ -101,7 +101,7 @@
</property>
<property>
<name>tez.task.launch.cmd-opts</name>
<value>-XX:+PrintGCDetails -verbose:gc -XX:+PrintGCTimeStamps -XX:+UseNUMA -XX:+UseG1GC -XX:+ResizeTLAB ${heap_dump_opts!}</value>
<value>-XX:+PrintGCDetails -verbose:gc -XX:+PrintGCTimeStamps -XX:+UseNUMA -XX:+UseG1GC -XX:+ResizeTLAB ${heap_dump_opts}</value>
<description>Java options for tasks. The Xmx value is derived based on tez.task.resource.memory.mb and is 80% of
this value by default.
Used only if the value is not specified explicitly by the DAG definition.
Expand All @@ -116,7 +116,7 @@
</property>
<property>
<name>tez.task.launch.env</name>
<value>LD_LIBRARY_PATH=${hadoop_home!}/lib/native</value>
<value>LD_LIBRARY_PATH=${hadoop_home}/lib/native</value>
<description>
Additional execution environment entries for tez. This is not an additive property. You must preserve the
original value if
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,17 @@
<display-name>zookeeper-env template</display-name>
<description>This is the freemarker template for zookeeper-env.sh file</description>
<value><![CDATA[
export JAVA_HOME=${java_home!}
export ZOOKEEPER_HOME=${zookeeper_home!}
export ZOO_LOG_DIR=${zookeeper_log_dir!}
export ZOOPIDFILE=${zookeeper_pid_file!}
export SERVER_JVMFLAGS=${SERVER_JVMFLAGS!}
export JAVA_HOME=${java_home}
export ZOOKEEPER_HOME=${zookeeper_home}
export ZOO_LOG_DIR=${zookeeper_log_dir}
export ZOOPIDFILE=${zookeeper_pid_file}
export SERVER_JVMFLAGS=${SERVER_JVMFLAGS}
<#noparse>export JAVA=${JAVA_HOME}/bin/java</#noparse>
export CLASSPATH=$CLASSPATH:/usr/share/zookeeper/*

<#if securityEnabled?? && securityEnabled >
export SERVER_JVMFLAGS="$SERVER_JVMFLAGS -Djava.security.auth.login.config=${zk_server_jaas_file!}"
export CLIENT_JVMFLAGS="$CLIENT_JVMFLAGS -Djava.security.auth.login.config=${zk_client_jaas_file!} -Dzookeeper.sasl.client.username=${zk_principal_user!}"
export SERVER_JVMFLAGS="$SERVER_JVMFLAGS -Djava.security.auth.login.config=${zk_server_jaas_file}"
export CLIENT_JVMFLAGS="$CLIENT_JVMFLAGS -Djava.security.auth.login.config=${zk_client_jaas_file} -Dzookeeper.sasl.client.username=${zk_principal_user}"
</#if>
]]>
</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@
# Set KAFKA specific environment variables here.

# The java implementation to use.
export JAVA_HOME=${java_home!}
export JAVA_HOME=${java_home}
<#noparse>export PATH=$PATH:${JAVA_HOME}/bin</#noparse>
export PID_DIR=${kafka_pid_dir!}
export LOG_DIR=${kafka_log_dir!}
export PID_DIR=${kafka_pid_dir}
export LOG_DIR=${kafka_log_dir}
export CLASSPATH=$CLASSPATH:${kafka_conf_dir}
]]>
</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,17 @@
<display-name>zookeeper-env template</display-name>
<description>This is the freemarker template for zookeeper-env.sh file</description>
<value><![CDATA[
export JAVA_HOME=${java_home!}
export ZOOKEEPER_HOME=${zookeeper_home!}
export ZOO_LOG_DIR=${zookeeper_log_dir!}
export ZOOPIDFILE=${zookeeper_pid_file!}
export SERVER_JVMFLAGS=${SERVER_JVMFLAGS!}
export JAVA_HOME=${java_home}
export ZOOKEEPER_HOME=${zookeeper_home}
export ZOO_LOG_DIR=${zookeeper_log_dir}
export ZOOPIDFILE=${zookeeper_pid_file}
export SERVER_JVMFLAGS=${SERVER_JVMFLAGS}
<#noparse>export JAVA=${JAVA_HOME}/bin/java</#noparse>
export CLASSPATH=$CLASSPATH:/usr/share/zookeeper/*

<#if securityEnabled?? && securityEnabled >
export SERVER_JVMFLAGS="$SERVER_JVMFLAGS -Djava.security.auth.login.config=${zk_server_jaas_file!}"
export CLIENT_JVMFLAGS="$CLIENT_JVMFLAGS -Djava.security.auth.login.config=${zk_client_jaas_file!} -Dzookeeper.sasl.client.username=${zk_principal_user!}"
export SERVER_JVMFLAGS="$SERVER_JVMFLAGS -Djava.security.auth.login.config=${zk_server_jaas_file}"
export CLIENT_JVMFLAGS="$CLIENT_JVMFLAGS -Djava.security.auth.login.config=${zk_client_jaas_file} -Dzookeeper.sasl.client.username=${zk_principal_user}"
</#if>
]]>
</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,35 +47,19 @@ public class FlinkParams extends BigtopParams {
private String flinkLog4jConsolePropertiesContent;
private String flinkLog4jSessionPropertiesContent;

private String jobManagerArchiveFsDir;
private String historyServerWebPort;
private String historyServerArchiveFsDir;
private String historyServerArchiveFsRefreshInterval;

public FlinkParams(ComponentCommandPayload componentCommandPayload) {
super(componentCommandPayload);
globalParamsMap.put("flink_user", user());
globalParamsMap.put("flink_group", group());
globalParamsMap.put("java_home", javaHome());
globalParamsMap.put("hadoop_home", hadoopHome());
globalParamsMap.put("hadoop_conf_dir", hadoopConfDir());

globalParamsMap.put("jobmanager_archive_fs_dir", jobManagerArchiveFsDir);
globalParamsMap.put("historyserver_web_port", historyServerWebPort);
globalParamsMap.put("historyserver_archive_fs_dir", historyServerArchiveFsDir);
globalParamsMap.put("historyserver_archive_fs_refresh_interval", historyServerArchiveFsRefreshInterval);
}

@GlobalParams
public Map<String, Object> flinkConf() {
Map<String, Object> configurations = LocalSettings.configurations(getServiceName(), "flink-conf");
flinkConfContent = (String) configurations.get("content");

jobManagerArchiveFsDir = (String) configurations.get("jobmanager.archive.fs.dir");
historyServerWebPort = (String) configurations.get("historyserver.web.port");
historyServerArchiveFsDir = (String) configurations.get("historyserver.archive.fs.dir");
historyServerArchiveFsRefreshInterval =
(String) configurations.get("historyserver.archive.fs.refresh-interval");
return configurations;
}

Expand Down
Loading