#input {
# stdin {}
#}
# 从filebeat接受数据
input {
beats {
port => 5044
host =>
"0.0.0.0"
}
}
filter {
# 添加一个调试的开关
mutate{add_field => {
"[@metadata][debug]"
=>true}}
grok {
# 过滤nginx日志
#match => {
"message"
=>
"%{nginxaccess_test2}"
}
#match => {
"message"
=> '%{iporhost:clientip} # (?<http_x_forwarded_for>[^\#]*) # \[%{httpdate:[@metadata][webtime]}\] # %{notspace:hostname} # %{word:verb} %{uripathparam:request} http/%{number:httpversion} # %{number:response} # (?:%{number:bytes}|-) # (?:
"(?:%{notspace:referrer}|-)"
|%{notspace:referrer}|-) # (?:
"(?<http_user_agent>[^#]*)"
) # (?:
"(?:%{number:connection}|-)"
|%{number:connection}|-) # (?:
"(?<cookies>[^#]*)"
) # %{number:request_time:float} # (?:%{number:upstream_response_time:float}|-)' }
#match => {
"message"
=> '(?:%{iporhost:clientip}|-) (?:%{two_ip:http_x_forwarded_for}|%{ipv4:http_x_forwarded_for}|-) \[%{httpdate:[@metadata][webtime]}\] (?:%{hostname:hostname}|-) %{word:method} %{uripathparam:request} http/%{number:httpversion} %{number:response} (?:%{number:bytes}|-) (?:
"(?:%{notspace:referrer}|-)"
|%{notspace:referrer}|-) %{qs:agent} (?:
"(?:%{number:connection}|-)"
|%{number:connection}|-) (?:
"(?<cookies>[^#]*)"
) %{number:request_time:float} (?:%{number:upstream_response_time:float}|-)' }
match => {
"message"
=> '(?:%{iporhost:clientip}|-) %{forword:http_x_forwarded_for} \[%{httpdate:[@metadata][webtime]}\] (?:%{hostname:hostname}|-) %{word:method} %{uripathparam:request} http/%{number:httpversion} %{number:response} (?:%{number:bytes}|-) (?:
"(?:%{notspace:referrer}|-)"
|%{notspace:referrer}|-) %{qs:agent} (?:
"(?:%{number:connection}|-)"
|%{number:connection}|-) %{qs:cookie} %{number:request_time:float} (?:%{number:upstream_response_time:float}|-)' }
}
# 将默认的@timestamp(beats收集日志的时间)的值赋值给新字段@read_tiimestamp
ruby {
#code =>
"event.set('@read_timestamp',event.get('@timestamp'))"
#将时区改为东8区
code =>
"event.set('@read_timestamp',event.get('@timestamp').time.localtime + 8*60*60)"
}
# 将nginx的日志记录时间格式化
# 格式化时间 20/may/2015:21:05:56 +0000
date
{
locale =>
"en"
match => [
"[@metadata][webtime]"
,
"dd/mmm/yyyy:hh:mm:ss z"
]
}
# 将bytes字段由字符串转换为数字
mutate {
convert => {
"bytes"
=>
"integer"
}
}
# 将cookie字段解析成一个json
#mutate {
# gsub => [
"cookies"
,'\;',',']
#}
# 如果有使用到cdn加速http_x_forwarded_for会有多个ip,第一个ip是用户真实ip
if
[http_x_forwarded_for] =~
", "
{
ruby {
code => 'event.set(
"http_x_forwarded_for"
, event.get(
"http_x_forwarded_for"
).split(
","
)[0])'
}
}
# 解析ip,获得ip的地理位置
geoip {
source =>
"http_x_forwarded_for"
# # 只获取ip的经纬度、国家、城市、时区
fields => [
"location"
,
"country_name"
,
"city_name"
,
"region_name"
]
}
# 将agent字段解析,获得浏览器、系统版本等具体信息
useragent {
source =>
"agent"
target =>
"useragent"
}
#指定要删除的数据
#mutate{remove_field=>[
"message"
]}
# 根据日志名设置索引名的前缀
ruby {
code => 'event.set(
"@[metadata][index_pre]"
,event.get(
"source"
).split(
"/"
)[-1])'
}
# 将@timestamp 格式化为2019.04.23
ruby {
code => 'event.set(
"@[metadata][index_day]"
,event.get(
"@timestamp"
).time.localtime.
strftime
(
"%y.%m.%d"
))'
}
# 设置输出的默认索引名
mutate {
add_field => {
#
"[@metadata][index]"
=>
"%{@[metadata][index_pre]}_%{+yyyy.mm.dd}"
"[@metadata][index]"
=>
"%{@[metadata][index_pre]}_%{@[metadata][index_day]}"
}
}
# 将cookies字段解析成json
# mutate {
# gsub => [
#
"cookies"
,
";"
,
","
,
#
"cookies"
,
"="
,
":"
# ]
# #split => {
"cookies"
=>
","
}
# }
# json_encode {
# source =>
"cookies"
# target =>
"cookies_json"
# }
# mutate {
# gsub => [
#
"cookies_json"
, ',', '
","
',
#
"cookies_json"
, ':', '
":"
'
# ]
# }
# json {
# source =>
"cookies_json"
# target =>
"cookies2"
# }
# 如果grok解析存在错误,将错误独立写入一个索引
if
"_grokparsefailure"
in [tags] {
#
if
"_dateparsefailure"
in [tags] {
mutate {
replace => {
#
"[@metadata][index]"
=>
"%{@[metadata][index_pre]}_failure_%{+yyyy.mm.dd}"
"[@metadata][index]"
=>
"%{@[metadata][index_pre]}_failure_%{@[metadata][index_day]}"
}
}
# 如果不存在错误就删除message
}
else
{
mutate{remove_field=>[
"message"
]}
}
}
output {
if
[@metadata][debug]{
# 输出到rubydebuyg并输出metadata
stdout{codec => rubydebug{metadata => true}}
}
else
{
# 将输出内容转换成
"."
stdout{codec => dots}
# 将输出到指定的es
elasticsearch {
hosts => [
"192.168.15.160:9200"
]
index =>
"%{[@metadata][index]}"
document_type =>
"doc"
}
}
}