Collect nginx logs through filebeat and transfer them to logstash. After processing by logstash, they are written to elasticsearch. Filebeat is only responsible for collection work, while logstash completes log formatting, data replacement, splitting, and creation of indexes after writing logs to elasticsearch.
1. Configure nginx log format
log_format main '$remote_addr $http_x_forwarded_for [$time_local] $server_name $request ' '$status $body_bytes_sent $http_referer ' '"$http_user_agent" ' '"$connection" ' '"$http_cookie" ' '$request_time ' '$upstream_response_time';
2. Install and configure filebeat, enable nginx module
tar -zxvf filebeat-6.2.4-linux-x86_64.tar.gz -c /usr/local cd /usr/local;ln -s filebeat-6.2.4-linux-x86_64 filebeat cd /usr/local/filebeat
Enable nginx module
./filebeat modules enable nginx
View module
./filebeat modules list
Create configuration file
vim /usr/local/filebeat/blog_module_logstash.yml filebeat.modules: - module: nginx access: enabled: true var.paths: ["/home/weblog/blog.cnfol.com_access.log"] #error: # enabled: true # var.paths: ["/home/weblogerr/blog.cnfol.com_error.log"] output.logstash: hosts: ["192.168.15.91:5044"]
Start filebeat
./filebeat -c blog_module_logstash.yml -e
3. Configure logstash
tar -zxvf logstash-6.2.4.tar.gz /usr/local cd /usr/local;ln -s logstash-6.2.4 logstash 创建一个nginx日志的pipline文件 cd /usr/local/logstash
logstash built-in template directory
vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns
Edit grok-patterns to add a regular pattern that supports multiple IPs
forword (?:%{ipv4}[,]?[ ]?)+|%{word}
Official grok
#Create logstash pipline Configuration file
#input { # stdin {} #} # 从filebeat接受数据 input { beats { port => 5044 host => "0.0.0.0" } } filter { # 添加一个调试的开关 mutate{add_field => {"[@metadata][debug]"=>true}} grok { # 过滤nginx日志 #match => { "message" => "%{nginxaccess_test2}" } #match => { "message" => '%{iporhost:clientip} # (?<http_x_forwarded_for>[^\#]*) # \[%{httpdate:[@metadata][webtime]}\] # %{notspace:hostname} # %{word:verb} %{uripathparam:request} http/%{number:httpversion} # %{number:response} # (?:%{number:bytes}|-) # (?:"(?:%{notspace:referrer}|-)"|%{notspace:referrer}|-) # (?:"(?<http_user_agent>[^#]*)") # (?:"(?:%{number:connection}|-)"|%{number:connection}|-) # (?:"(?<cookies>[^#]*)") # %{number:request_time:float} # (?:%{number:upstream_response_time:float}|-)' } #match => { "message" => '(?:%{iporhost:clientip}|-) (?:%{two_ip:http_x_forwarded_for}|%{ipv4:http_x_forwarded_for}|-) \[%{httpdate:[@metadata][webtime]}\] (?:%{hostname:hostname}|-) %{word:method} %{uripathparam:request} http/%{number:httpversion} %{number:response} (?:%{number:bytes}|-) (?:"(?:%{notspace:referrer}|-)"|%{notspace:referrer}|-) %{qs:agent} (?:"(?:%{number:connection}|-)"|%{number:connection}|-) (?:"(?<cookies>[^#]*)") %{number:request_time:float} (?:%{number:upstream_response_time:float}|-)' } match => { "message" => '(?:%{iporhost:clientip}|-) %{forword:http_x_forwarded_for} \[%{httpdate:[@metadata][webtime]}\] (?:%{hostname:hostname}|-) %{word:method} %{uripathparam:request} http/%{number:httpversion} %{number:response} (?:%{number:bytes}|-) (?:"(?:%{notspace:referrer}|-)"|%{notspace:referrer}|-) %{qs:agent} (?:"(?:%{number:connection}|-)"|%{number:connection}|-) %{qs:cookie} %{number:request_time:float} (?:%{number:upstream_response_time:float}|-)' } } # 将默认的@timestamp(beats收集日志的时间)的值赋值给新字段@read_tiimestamp ruby { #code => "event.set('@read_timestamp',event.get('@timestamp'))" #将时区改为东8区 code => "event.set('@read_timestamp',event.get('@timestamp').time.localtime + 8*60*60)" } # 将nginx的日志记录时间格式化 # 格式化时间 20/may/2015:21:05:56 +0000 date { locale => "en" match => ["[@metadata][webtime]","dd/mmm/yyyy:hh:mm:ss z"] } # 将bytes字段由字符串转换为数字 mutate { convert => {"bytes" => "integer"} } # 将cookie字段解析成一个json #mutate { # gsub => ["cookies",'\;',','] #} # 如果有使用到cdn加速http_x_forwarded_for会有多个ip,第一个ip是用户真实ip if[http_x_forwarded_for] =~ ", "{ ruby { code => 'event.set("http_x_forwarded_for", event.get("http_x_forwarded_for").split(",")[0])' } } # 解析ip,获得ip的地理位置 geoip { source => "http_x_forwarded_for" # # 只获取ip的经纬度、国家、城市、时区 fields => ["location","country_name","city_name","region_name"] } # 将agent字段解析,获得浏览器、系统版本等具体信息 useragent { source => "agent" target => "useragent" } #指定要删除的数据 #mutate{remove_field=>["message"]} # 根据日志名设置索引名的前缀 ruby { code => 'event.set("@[metadata][index_pre]",event.get("source").split("/")[-1])' } # 将@timestamp 格式化为2019.04.23 ruby { code => 'event.set("@[metadata][index_day]",event.get("@timestamp").time.localtime.strftime("%y.%m.%d"))' } # 设置输出的默认索引名 mutate { add_field => { #"[@metadata][index]" => "%{@[metadata][index_pre]}_%{+yyyy.mm.dd}" "[@metadata][index]" => "%{@[metadata][index_pre]}_%{@[metadata][index_day]}" } } # 将cookies字段解析成json # mutate { # gsub => [ # "cookies", ";", ",", # "cookies", "=", ":" # ] # #split => {"cookies" => ","} # } # json_encode { # source => "cookies" # target => "cookies_json" # } # mutate { # gsub => [ # "cookies_json", ',', '","', # "cookies_json", ':', '":"' # ] # } # json { # source => "cookies_json" # target => "cookies2" # } # 如果grok解析存在错误,将错误独立写入一个索引 if "_grokparsefailure" in [tags] { #if "_dateparsefailure" in [tags] { mutate { replace => { #"[@metadata][index]" => "%{@[metadata][index_pre]}_failure_%{+yyyy.mm.dd}" "[@metadata][index]" => "%{@[metadata][index_pre]}_failure_%{@[metadata][index_day]}" } } # 如果不存在错误就删除message }else{ mutate{remove_field=>["message"]} } } output { if [@metadata][debug]{ # 输出到rubydebuyg并输出metadata stdout{codec => rubydebug{metadata => true}} }else{ # 将输出内容转换成 "." stdout{codec => dots} # 将输出到指定的es elasticsearch { hosts => ["192.168.15.160:9200"] index => "%{[@metadata][index]}" document_type => "doc" } } }
Start logstash
nohup bin/logstash -f test_pipline2.conf &
The above is the detailed content of How to import nginx logs into elasticsearch. For more information, please follow other related articles on the PHP Chinese website!