RTX1210のログをelasticsearchで扱う(後編)
前回はタイトルに反してRTX1210のかけらもない内容でしたが、ここから肝心の部分。
どのような環境で行ったのか。前編はそんな位置づけ。
自分が特にバックアップ的に書きたいことは後編です。
RTX1210 => rsyslog => log-file <= logstash => elasticsearch <= kibana
このようなイメージでデータを流しますから、
まずrsyslogの設定から。
$ sudo cat /etc/rsyslog.conf | grep -v '^\s*$' | grep -v '^\s*#'
※変更箇所のみ
module(load="imudp")
input(type="imudp" port="514")
※RTX1210から受けるので、UDP/514を使います。
$AllowedSender UDP, 127.0.0.1, 192.168.0.0/24 # UDP
※自分のネットワークから受けるように。ルーターそのもののIPで良いと思います。
$ActionFileDefaultTemplate RSYSLOG_FileFormat
※タイムスタンプがDefaultだと大雑把なので少し細かくしました。
*.*;auth,authpriv.none,local0.none -/var/log/syslog
*.=info;*.=notice;*.=warn;\
auth,authpriv.none;\
cron,daemon.none;\
mail.none,local0.none -/var/log/messages
※syslogとmessageには出力しないようにしました。local0の部分。
rtx1210を単独ファイルに出力する設定も作ります。
$ sudo vim /etc/rsyslog.d/75-rtx1210.conf
local0.* -/var/log/rtx1210log.log
※センスのないファイル名は適宜修正のこと!
設定後、rsyslogを再起動すると514/UDPをリッスンしています。
$ ss -lnu4
State Recv-Q Send-Q Local Address:Port Peer Address:Port Process
UNCONN 0 0 0.0.0.0:514 0.0.0.0:*
ログローテーションも設定しました。
$ sudo vim /etc/logrotate.d/rtx1210
/var/log/rtx1210log.log {
daily
missingok
rotate 90
compress
delaycompress
create 644 root root
sharedscripts
postrotate
# /usr/bin/systemctl restart rsyslog.service > /tmp/rtx.out 2>&1
/usr/bin/systemctl restart rsyslog.service > /dev/null 2>/dev/null || true
endscript
}
コメントアウト部分は、動作がおかしいときに出力する様に書いてあります。
さて、受け側の準備が出来たので、RTX1210からsyslogを飛ばします。
syslog host 192.168.xxx.xxx ※rsyslogサーバーアドレス
syslog facility local0
syslog notice on
syslog info on
syslog debug off
上記を設定するとrsyslogで受信し始めます。
特に意識することなく設定したフィルター設定はおそらくpass設定なので、ログ出力ありません。
必要に応じてpass-logに変えましょう。
さて、捕捉するログファイル出力がここまでで完了しました。
やっとElastic Stack。
$ sudo mkdir /var/lib/logstash/sincedb/
※fileプラグインがどこまで読んだかを記録するファイルを置くディレクトリを作成します。
※無くても動作しますが、概ね設定するもののようです。
$ sudo vim /etc/logstash/conf.d/rtx.conf
input {
file {
id => "RTX1210"
path => "/var/log/rtx1210log.log"
sincedb_path => "/var/lib/logstash/sincedb/rtx_accesslog"
tags => "router"
}
}
filter {
# syslogtimestampを切り離し
grok {
patterns_dir => ["/etc/logstash/conf.d/patterns"]
match => {
"message" => [
"%{TIMESTAMP_ISO8601:time} %{ALL:message}",
"%{SYSLOGTIMESTAMP:time} %{ALL:message}"
]
}
overwrite => [ "message" ]
}
# filtter系log
grok {
patterns_dir => ["/etc/logstash/conf.d/patterns"]
match => {
"message" => [
"%{IP:syslog_host_ip} \[INSPECT\] %{RTXIF:interface_name}\[%{NOTSPACE:direction}\]\[%{NOTSPACE:filter_number}\] %{NOTSPACE:protocol} %{IP:src_addr}[:\.]%{INT:src_port} > %{IP:dest_addr}[:\.]%{INT:dest_port} \(%{TIMESTAMP_JP:start_time}\)",
"^%{IP:syslog_host_ip}*[ ]*%{RTXIF:interface_name} %{NOTSPACE:pass_reject} at %{NOTSPACE:direction}\(%{NOTSPACE:filter_number}\) filter: %{NOTSPACE:protocol} %{IP:src_addr}[:\.]*%{NOTSPACE:src_port}* > %{IP:dest_addr}[:\.]*%{NOTSPACE:dest_port}*( : )*%{RTX_MSG:comment}*$",
"^%{IP:syslog_host_ip}*[ ]*%{RTXIF:interface_name} %{NOTSPACE:pass_reject} at %{NOTSPACE:direction}\(%{NOTSPACE:filter_number}\) filter: %{NOTSPACE:protocol} %{IP:src_addr}[:\.]%{NOTSPACE:src_port} > %{IP:dest_addr}[:\.]%{NOTSPACE:dest_port} \(%{RTX_MSG:packet_type}[ )]\[*%{ZONE:dns_query}*\]*\)*$"
]
}
remove_tag => ["_grokparsefailure"]
add_tag => ["filter"]
}
# tunnel系log
if "_grokparsefailure" in [tags] {
grok {
patterns_dir => ["/etc/logstash/conf.d/patterns"]
match => {
"message" => [
"^%{IP:syslog_host_ip} \[IKE\] %{NOTSPACE:status} %{RTX_MSG:infomation} \(%{NOTSPACE:kinds}\)$",
"^%{IP:syslog_host_ip} \[IKE\] %{NOTSPACE:status} %{NOTSPACE:protocol} phase to %{IP:dest_addr}$",
"^%{IP:syslog_host_ip} \[IKE2\] %{NOTSPACE:sub prefix} %{RTX_MSG:infomation}$",
"^%{IP:syslog_host_ip} \[L2TP\] %{RTXIF:interface_name} %{RTX_MSG:infomation}$",
"^%{IP:syslog_host_ip} \[L2TP\] %{RTXIF:interface_name} %{NOTSPACE:status} from %{IP:from_addr}$",
"^IP %{RTXIF:interface_name} %{RTX_MSG:status}$",
"^%{IP:syslog_host_ip} %{RTXIF:interface_name} %{RTX_MSG:status} user \'%{RTX_USERNAME:user_name}\'$",
"^%{IP:syslog_host_ip} %{RTXIF:interface_name} %{RTX_MSG:status}: %{NOTSPACE:protocol} %{IP:src_addr}[:\.]*%{NOTSPACE:src_port}* > %{IP:dest_addr}[:\.]*%{NOTSPACE:dest_port}*( : )*%{RTX_MSG:comment}*$",
"^%{IP:syslog_host_ip} %{RTXIF:interface_name} %{NOTSPACE:protocol} up \(Local: %{IP:local_ip}, Remote: %{IP:remote_ip}\)$",
"^%{IP:syslog_host_ip} %{RTXIF:interface_name} %{NOTSPACE:protocol} %{RTX_MSG:status}$"
]
}
remove_tag => ["_grokparsefailure"]
add_tag => ["tunnel"]
}
}
# management系log
if "_grokparsefailure" in [tags] {
grok {
patterns_dir => ["/etc/logstash/conf.d/patterns"]
match => {
"message" => [
"^%{IP:syslog_host_ip} \[DHCPD\] %{RTXIF:dhcp_allocate_interface}\(%{NOTSPACE:dhcp_allocate_interface_port}\) %{NOTSPACE:transaction} %{IP:dhcp_allocate_address}: %{MAC:dhcp_allocate_mac_address}$",
"^%{IP:syslog_host_ip} \[INSPECT\] %{RTX_MSG:rtx_status} %{IP:src_addr} > %{IP:dest_addr}$",
"^%{IP:syslog_host_ip} +\[SCHEDULE\] %{RTX_CMD:command}$",
"^%{IP:syslog_host_ip} %{TIMESTAMP_JP:start_time} +%{NOTSPACE:ntp_result}$",
"^Login %{NOTSPACE:access_result} for %{NOTSPACE:protocol}: %{IP:from_addr} %{RTX_USERNAME:user_name}*",
"^%{IP:syslog_host_ip} '%{RTX_USERNAME:user_name}' %{NOTSPACE:consequence} for %{NOTSPACE:protocol}: %{IP:from_addr} %{RTX_USERNAME:logon_name}*",
"^Logout from %{NOTSPACE:protocol}: %{IP:from_addr} %{RTX_USERNAME:user_name}*",
"^Configuration saved in \"%{NOTSPACE:config_name}\" by %{NOTSPACE:protocol}\(*%{RTX_USERNAME:user_name}*\)*$"
]
}
remove_tag => ["_grokparsefailure"]
add_tag => ["manage"]
}
}
# syslogtimestampを@timestampにする
date {
match => [ "time", "ISO8601" ]
timezone => "Asia/Tokyo"
locale => "en"
}
mutate {
replace => { "time" => "%{@timestamp}" }
}
# protocolcolをサービス名で統一する ※2021年10月12日追記
if [protocol] {
if [dest_port] {
if [protocol] == "TCP" {
translate {
source => "[dest_port]"
target => "[dest_port]"
override => true
dictionary_path => "/etc/logstash/conf.d/wkp_tcp.yml"
}
}
if [protocol] == "UDP" {
translate {
source => "[dest_port]"
target => "[dest_port]"
override => true
dictionary_path => "/etc/logstash/conf.d/wkp_udp.yml"
}
}
}
}
# fqdnフィールドを作成して相手先のipを引く
if [dest_addr]{
mutate {
add_field => {
"dest_addr_fqdn" => "%{dest_addr}"
}
}
dns {
reverse => [ "dest_addr_fqdn" ]
action => "replace"
nameserver => {
address => [ "8.8.8.8", "1.1.1.1" ]
search => [ "localdomain" ]
}
hit_cache_size => 4096
hit_cache_ttl => 900
failed_cache_size => 512
failed_cache_ttl => 900
}
}
# geoipで位置情報を得る
if [src_addr]{
geoip {
source => "src_addr"
target => "src_geoip"
tag_on_failure => "_src_geoip_lookup_failure"
}
}
if ![geoip.ip] {
if [dest_addr]{
geoip {
source => "dest_addr"
target => "dest_geoip"
tag_on_failure => "_dest_geoip_lookup_failure"
remove_tag => ["_src_geoip_lookup_failure"]
}
}
}
if [from_addr]{
geoip {
source => "from_addr"
target => "from_geoip"
default_database_type => "City"
tag_on_failure => "from_geoip_lookup_failure"
remove_tag => ["_dest_geoip_lookup_failure"]
}
}
}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "rtx-%{+YYYY.MM.dd}"
user => "elastic"
password => "elastic-pass"
}
}
こちらが苦労したrtx用のlogstashコンフィグです。
grokプラグインの中で、オリジナルパターンを使っているため、下記の様にパターンファイルを作ります。
$ sudo vim /etc/logstash/conf.d/patterns
ALL .+$
RTXIF %{NOTSPACE}((ANONYMOUS)*\[[0-9][0-9]\])*
RTX_MSG [a-zA-Z0-9 ]+
RTX_CMD [a-zA-Z0-9 -_\.]+
RTX_USERNAME [a-zA-Z0-9_-]+
ZONE [a-zA-Z0-9/. _-]+
DATE_JP %{YEAR}[/-]%{MONTHNUM}[/-]%{MONTHDAY}
TIMESTAMP_JP %{DATE_JP}[ ,]%{TIME}
※2021/10/12日追記
プロトコル表記がサービス名とポート番号と混合して出力されるため、443/TCPとhttpsのように同じ意味の項目が区分されて扱われていたので、サービス名に置き換えるtranslateフィルター設定を追加しました。
80/TCPはサービス名で統一してhttpとしたかったのですが、rtxからwwwで出力されるログがあるのでwwwとしました。(番号出力への切替方法をご存じの方、教えてください。)
dictionary fileは次の通りです。
/etc/logstash/conf.d/wkp_tcp.yml
[
"443": "https",
"80": "http",
"995": "pop3s",
"993": "imaps",
"465": "smtps",
"123": "ntp",
"22": "ssh",
"21": "ftp",
"20": "ftp-data",
"23": ":telnet"
]
/etc/logstash/conf.d/wkp_udp.yml
[
"53": "domain",
"443": "https",
"546": "dhcpv6-client",
"547": "dhcpv6-server",
"123": "ntp",
"137": "netbios-ns",
"138": "netbios-dgm",
"139": "netbios-ssn"
]
※並びが妙なことになっているのは出力が大手あろう順にしてみたからです。
※2021/10/12追記ここまで
logstashの再起動はelasticsearchにインデックステンプレートを作成してからです。
右も左もわからない私はここで大分苦戦していました。
猫でも出来そうな手順で示します。
1.kibanaで、Management->スタック管理
2.データのインデックス管理から、インデックステンプレートを選び、テンプレートを作成
3.適宜名前をつけ、インデックスパターンを設定します。インデックスパターンは上記のコンフィグファイルの場合、rtx-*となります。
4.コンポーネントテンプレートはそのまま次へ
5.インデックス設定は下記
{
"index": {
"number_of_shards": "1",
"refresh_interval": "5s"
}
}
これはlogstashのテンプレートをベースにしています。
6.マッピングではJSONの読み込みを押してから下記内容を記入します。
{
"_doc": {
"dynamic_templates": [
{
"message_field": {
"path_match": "message",
"mapping": {
"norms": false,
"type": "text"
},
"match_mapping_type": "string"
}
},
{
"string_fields": {
"mapping": {
"norms": false,
"type": "text",
"fields": {
"keyword": {
"ignore_above": 256,
"type": "keyword"
}
}
},
"match_mapping_type": "string",
"match": "*"
}
}
],
"properties": {
"@timestamp": {
"type": "date"
},
"geoip": {
"dynamic": true,
"properties": {
"ip": {
"type": "ip"
},
"latitude": {
"type": "half_float"
},
"location": {
"type": "geo_point"
},
"longitude": {
"type": "half_float"
}
}
},
"src_geoip": {
"dynamic": true,
"properties": {
"ip": {
"type": "ip"
},
"latitude": {
"type": "half_float"
},
"location": {
"type": "geo_point"
},
"longitude": {
"type": "half_float"
}
}
},
"dest_geoip": {
"dynamic": true,
"properties": {
"ip": {
"type": "ip"
},
"latitude": {
"type": "half_float"
},
"location": {
"type": "geo_point"
},
"longitude": {
"type": "half_float"
}
}
},
"from_geoip": {
"dynamic": true,
"properties": {
"ip": {
"type": "ip"
},
"latitude": {
"type": "half_float"
},
"location": {
"type": "geo_point"
},
"longitude": {
"type": "half_float"
}
}
},
"@version": {
"type": "keyword"
}
}
}
}
よく見ていただくとわかりますが、geoip項目をlogstashコンフィグで設定したロケーションを示すフィールド名で同様に定義します。
インデックステンプレートは別冊の方ですが、logstashについて網羅的に情報を得たいときは書籍を買うのが手っ取り早いです。なかなかのお値段ですが、たぶん役立つと思います。
これで一通り設定が出来たので、logstashを再起動しますと
データが流れ込んでくるはずです。
こうしてログを見やすくしていると、rtxそのものの設定ミスなどがちらほらわかってくるものですね。