기업 시스템의 규모가 지속적으로 확장됨에 따라 시스템 로그는 점점 더 커지고 있으며, 안정적인 로그 수집 및 분석 시스템이 없으면 시스템을 효과적으로 모니터링하고 유지 관리하기가 어렵습니다. 이번 글에서는 Spring Boot와 Flume을 기반으로 효율적인 로그 수집 및 분석 시스템을 구축하는 방법을 소개합니다. PREREACESITES 시작하기 전에 다음 소프트웨어를 설치하고 설정해야합니다 이상 버전
Spring Boot 애플리케이션 구성
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-log4j2</artifactId> </dependency>
# 应用端口号 server.port=8080 # log4j2配置 logging.config=classpath:log4j2.xml # flume配置 flume.agentName=myflume flume.sourceType=avro flume.clientType=load-balancing flume.hosts=localhost:41414 # elasticsearch配置 spring.elasticsearch.rest.uris=http://localhost:9200
@Plugin(name = "Flume", category = "Core", elementType = "appender", printObject = true) public class FlumeAppender extends AbstractAppender { private static final ObjectMapper MAPPER = new ObjectMapper(); private final FlumeClient client; private final String sourceType; protected FlumeAppender(String name, Filter filter, Layout<? extends Serializable> layout, FlumeClient client, String sourceType) { super(name, filter, layout, true); this.client = client; this.sourceType = sourceType; } @PluginFactory public static FlumeAppender createAppender(@PluginAttr("name") String name, @PluginElement("Filters") Filter filter, @PluginElement("Layout") Layout<? extends Serializable> layout, @PluginAttr("sourceType") String sourceType, @PluginAttr("hosts") String hosts) { if (name == null) { LOGGER.error("FlumeAppender missing name"); return null; } if (client == null) { LOGGER.error("FlumeAppender missing client"); return null; } return new FlumeAppender(name, filter, layout, createClient(hosts), sourceType); } private static FlumeClient createClient(String hosts) { LoadBalancingRpcClient rpcClient = new LoadBalancingRpcClient(); String[] hostArray = hosts.split(","); for (String host : hostArray) { String[] hostParts = host.split(":"); rpcClient.addHost(new InetSocketAddress(hostParts[0], Integer.parseInt(hostParts[1]))); } Properties props = new Properties(); props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, "default_loadbalance"); props.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, hosts); props.setProperty(RpcClientConfigurationConstants.CONFIG_MAX_BACKOFF, "10000"); AvroEventSerializer serializer = new AvroEventSerializer(); serializer.configure(props, false); return new FlumeClient(rpcClient, serializer); } @Override public void append(LogEvent event) { try { byte[] body = ((StringLayout) this.getLayout()).toByteArray(event); Map<String, String> headers = new HashMap<>(); headers.put("timestamp", Long.toString(event.getTimeMillis())); headers.put("source", "log4j"); headers.put("sourceType", sourceType); Event flumeEvent = EventBuilder.withBody(body, headers); client.sendEvent(flumeEvent); } catch (Exception e) { LOGGER.error("Failed to send event to Flume", e); } } }
<?xml version="1.0" encoding="UTF-8"?> <Configuration> <Appenders> <Flume name="flume" sourceType="spring-boot" hosts="${flume.hosts}"> <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/> </Flume> </Appenders> <Loggers> <Root level="info"> <AppenderRef ref="flume"/> </Root> </Loggers> </Configuration>
Flume 구성
# Define the agent name and the agent sources and sinks myflume.sources = mysource myflume.sinks = mysink myflume.channels = channel1 # Define the source myflume.sources.mysource.type = avro myflume.sources.mysource.bind = myflume.sources.mysource.port = 41414 # Define the channel myflume.channels.channel1.type = memory myflume.channels.channel1.capacity = 10000 myflume.channels.channel1.transactionCapacity = 1000 # Define the sink myflume.sinks.mysink.type = org.elasticsearch.hadoop.flume.ElasticsearchSink myflume.sinks.mysink.hostNames = localhost:9200 myflume.sinks.mysink.indexName = ${type}-%{+YYYY.MM.dd} myflume.sinks.mysink.batchSize = 1000 myflume.sinks.mysink.typeName = ${type} # Link the source and sink with the channel myflume.sources.mysource.channels = channel1 myflume.sinks.mysink.channel = channel1
Elasticsearch 및 Kibana 구성
마지막으로 Elasticsearch 및 Kibana를 구성해야 합니다. Elasticsearch에서는 Flume 구성 파일에 정의된 인덱스 이름과 일치하는 인덱스를 생성해야 합니다.결론
이 글에서는 Spring Boot와 Flume을 활용하여 효율적인 로그 수집 및 분석 시스템을 구축하는 방법을 소개했습니다. 우리는 애플리케이션의 로그 이벤트를 Flume 서버로 보내기 위해 사용자 정의 log4j2 Appender를 구현했고, 로그 분석 및 시각화를 위해 Elasticsearch와 Kibana를 사용했습니다. 이 글이 여러분만의 로그 수집 및 분석 시스템을 구축하는 데 도움이 되기를 바랍니다.위 내용은 Spring Boot와 Flume을 기반으로 로그 수집 및 분석 시스템 구축의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!