Skip to content

optimize:优化seata-server日志采集配置 #4648

@lingxiao-wu

Description

@lingxiao-wu

Why you need it?

Is your feature request related to a problem? Please describe in detail
目前将seata-server日志采集到Kafka或者logstash需要手动修改logback-spring.xml,当需要采集到kafka时,kafka生产者的一些配置也需要在kafka-appender.xml中修改,配置较为分散,不好统一管理,可以考虑将配置统一收归到logging.extend下管理。

How it could be?

A clear and concise description of what you want to happen. You can explain more about input of the feature, and output of it.
配置统一管理后,如果需要开启或关闭日志采集功能,不需要再手动修改logback-spring.xml,只需修改application.yml重新打包即可,也可以通过java -jar -Dxxx 开启或关闭日志上报功能,如果接入配置中心,只需修改配置中心配置,再重启seata-server服务即可,配置管理将十分方便。

Other related information

Add any other context or screenshots about the feature request here.
可以参照spring boot整合logback原理实现,伪代码如下:

package io.seata.server.logging.logback;

import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.encoder.PatternLayoutEncoder;
import com.github.danielwegener.logback.kafka.KafkaAppender;
import io.seata.common.util.StringUtils;
import io.seata.server.logging.logback.appender.EnhancedLogstashEncoder;
import net.logstash.logback.appender.LogstashTcpSocketAppender;
import org.slf4j.ILoggerFactory;
import org.slf4j.impl.StaticLoggerBinder;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.context.event.ApplicationEnvironmentPreparedEvent;
import org.springframework.boot.context.logging.LoggingApplicationListener;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.event.GenericApplicationListener;
import org.springframework.core.ResolvableType;
import org.springframework.core.env.ConfigurableEnvironment;

import java.util.Objects;

/**
 * Application listener that attaches optional Logstash and Kafka appenders to the logback
 * ROOT logger, driven by {@code logging.extend.*} properties of the Spring environment.
 *
 * <p>An appender is only added when its required properties are present and non-empty:
 * <ul>
 *   <li>Logstash: {@code logging.extend.logstash-appender.destination}</li>
 *   <li>Kafka: {@code logging.extend.kafka-appender.bootstrap-servers} and
 *       {@code logging.extend.kafka-appender.topic}</li>
 * </ul>
 *
 * @author wlx
 * @date 2022/5/27 11:18 PM
 */
public class SeataLogbackLoggingExtendListener implements GenericApplicationListener {

    /** Name used to look up the logback root logger. */
    private static final String ROOT = "ROOT";

    /** Environment property holding the Kafka topic that log events are sent to. */
    private static final String KAFKA_TOPIC = "logging.extend.kafka-appender.topic";

    /** Environment property holding the Kafka bootstrap server list. */
    private static final String KAFKA_BOOTSTRAP_SERVERS = "logging.extend.kafka-appender.bootstrap-servers";

    /** Environment property holding the Logstash destination. */
    private static final String LOGSTASH_DESTINATION = "logging.extend.logstash-appender.destination";

    /** Environment property holding the application name. */
    private static final String APP_NAME = "spring.application.name";

    /** Application name used when {@link #APP_NAME} is not set. */
    private static final String DEFAULT_APP_NAME = "seata-server";

    /** Name given to the Logstash appender. */
    private static final String LOGSTASH = "LOGSTASH";

    /** Name given to the Kafka appender. */
    private static final String KAFKA = "KAFKA";

    /**
     * JSON-shaped logback pattern used by the Kafka appender's encoder.
     *
     * <p>NOTE(review): the {@code ${...}} placeholders are normally resolved by logback's XML
     * configurator; confirm they are substituted when the pattern is set programmatically.
     */
    private static final String KAFKA_KAFKA_PATTERN = "{\n" +
            "    \"@timestamp\": \"%d{yyyy-MM-dd HH:mm:ss.SSS}\",\n" +
            "    \"level\":\"%p\",\n" +
            "    \"app_name\":\"${APPLICATION_NAME:-seata-server}\",\n" +
            "    \"PORT\": ${RPC_PORT:-0},\n" +
            "    \"thread_name\": \"%t\",\n" +
            "    \"logger_name\": \"%logger\",\n" +
            "    \"X-TX-XID\": \"%X{X-TX-XID:-}\",\n" +
            "    \"X-TX-BRANCH-ID\": \"%X{X-TX-BRANCH-ID:-}\",\n" +
            "    \"message\": \"%m\",\n" +
            "    \"stack_trace\": \"%wex\"\n" +
            "}";

    /**
     * Accepts events whose source is a {@link SpringApplication} or an {@link ApplicationContext}.
     *
     * @param sourceType the event source type, may be {@code null}
     * @return {@code true} if the source type is supported, {@code false} otherwise
     */
    @Override
    public boolean supportsSourceType(Class<?> sourceType) {
        if (Objects.isNull(sourceType)) {
            return false;
        }
        return SpringApplication.class.isAssignableFrom(sourceType)
                || ApplicationContext.class.isAssignableFrom(sourceType);
    }

    /**
     * Accepts only {@link ApplicationEnvironmentPreparedEvent} (and subtypes).
     *
     * @param eventType the resolvable event type
     * @return {@code true} if the raw event class is an environment-prepared event
     */
    @Override
    public boolean supportsEventType(ResolvableType eventType) {
        Class<?> typeRawClass = eventType.getRawClass();
        if (Objects.isNull(typeRawClass)) {
            return false;
        }
        return ApplicationEnvironmentPreparedEvent.class.isAssignableFrom(typeRawClass);
    }

    /**
     * On {@link ApplicationEnvironmentPreparedEvent}, reads the environment and registers the
     * Logstash and Kafka appenders that are configured. Other events are ignored.
     *
     * @param event the published application event
     */
    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        if (!(event instanceof ApplicationEnvironmentPreparedEvent)) {
            return;
        }
        ILoggerFactory loggerFactory = StaticLoggerBinder.getSingleton().getLoggerFactory();
        if (!(loggerFactory instanceof LoggerContext)) {
            // The bound SLF4J implementation is not logback; there is nothing to attach appenders to.
            return;
        }
        LoggerContext loggerContext = (LoggerContext) loggerFactory;

        ConfigurableEnvironment environment = ((ApplicationEnvironmentPreparedEvent) event).getEnvironment();
        // Add the Logstash appender (no-op when not configured).
        appendLogstashAppender(loggerContext, environment);
        // Add the Kafka appender (no-op when not configured).
        appendKafkaAppender(loggerContext, environment);
    }

    /**
     * Uses an order value two above {@link LoggingApplicationListener#DEFAULT_ORDER}, so this
     * listener is expected to run after Spring Boot's own logging listener.
     *
     * @return the listener order
     */
    @Override
    public int getOrder() {
        return LoggingApplicationListener.DEFAULT_ORDER + 2;
    }

    /**
     * Builds, starts and attaches a Kafka appender to the ROOT logger.
     *
     * <p>Does nothing unless both the bootstrap servers and the topic are configured.
     *
     * @param loggerContext the logback context that owns the ROOT logger
     * @param environment   the Spring environment to read {@code logging.extend.kafka-appender.*} from
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void appendKafkaAppender(LoggerContext loggerContext, ConfigurableEnvironment environment) {
        String kafkaBootstrapServer = environment.getProperty(KAFKA_BOOTSTRAP_SERVERS);
        String kafkaTopic = environment.getProperty(KAFKA_TOPIC);
        if (StringUtils.isNullOrEmpty(kafkaBootstrapServer) || StringUtils.isNullOrEmpty(kafkaTopic)) {
            return;
        }
        Logger root = loggerContext.getLogger(ROOT);

        KafkaAppender kafkaAppender = new KafkaAppender();
        kafkaAppender.setContext(loggerContext);
        kafkaAppender.setName(KAFKA);
        kafkaAppender.setTopic(kafkaTopic);
        kafkaAppender.setKeyingStrategy(new com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy());
        kafkaAppender.setDeliveryStrategy(new com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy());
        // Use the resolved server list, not the property key constant.
        kafkaAppender.addProducerConfigValue("bootstrap.servers", kafkaBootstrapServer);
        // Producer settings chosen so that logging never waits on Kafka:
        // no broker acknowledgement, batch for up to 1s, never block in send().
        kafkaAppender.addProducerConfigValue("acks", 0);
        kafkaAppender.addProducerConfigValue("linger.ms", 1000);
        kafkaAppender.addProducerConfigValue("max.block.ms", 0);

        PatternLayoutEncoder encoder = new PatternLayoutEncoder();
        encoder.setPattern(KAFKA_KAFKA_PATTERN);
        encoder.setContext(loggerContext);
        kafkaAppender.setEncoder(encoder);
        encoder.start();
        kafkaAppender.start();

        root.addAppender(kafkaAppender);
    }

    /**
     * Builds, starts and attaches a Logstash TCP appender to the ROOT logger.
     *
     * <p>Does nothing unless the Logstash destination is configured.
     *
     * @param loggerContext the logback context that owns the ROOT logger
     * @param environment   the Spring environment to read the destination and application name from
     */
    private void appendLogstashAppender(LoggerContext loggerContext, ConfigurableEnvironment environment) {
        String logstashDestination = environment.getProperty(LOGSTASH_DESTINATION);
        if (StringUtils.isNullOrEmpty(logstashDestination)) {
            return;
        }
        Logger root = loggerContext.getLogger(ROOT);
        LogstashTcpSocketAppender logstashTcpSocketAppender = new LogstashTcpSocketAppender();
        logstashTcpSocketAppender.setName(LOGSTASH);
        logstashTcpSocketAppender.addDestination(logstashDestination);
        logstashTcpSocketAppender.setContext(loggerContext);

        EnhancedLogstashEncoder encoder = new EnhancedLogstashEncoder();
        String appName = environment.getProperty(APP_NAME, DEFAULT_APP_NAME);
        // NOTE(review): logstash-logback-encoder documents customFields as a JSON object string
        // (e.g. {"app_name":"seata-server"}); confirm that a bare application name is accepted here.
        encoder.setCustomFields(appName);
        // Drop the default providers that should not appear in the emitted JSON.
        encoder.setExcludeProvider("net.logstash.logback.composite.LogstashVersionJsonProvider");
        encoder.setExcludeProvider("net.logstash.logback.composite.loggingevent.JsonMessageJsonProvider");
        encoder.setExcludeProvider("net.logstash.logback.composite.loggingevent.TagsJsonProvider");
        encoder.setExcludeProvider("net.logstash.logback.composite.loggingevent.LogstashMarkersJsonProvider");
        encoder.setExcludeProvider("net.logstash.logback.composite.loggingevent.ArgumentsJsonProvider");
        encoder.setContext(loggerContext);
        encoder.start();
        logstashTcpSocketAppender.setEncoder(encoder);

        logstashTcpSocketAppender.start();
        root.addAppender(logstashTcpSocketAppender);
    }

}

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions