RTX1210のログをelasticsearchで扱う(後編)

View_RTX_logs_on_kibana

View RTX logs on kibana

前回はタイトルに反してRTX1210のかけらもない内容でしたが、ここから肝心の部分。
どのような環境で行ったのか。前編はそんな位置づけ。
自分が特にバックアップ的に書きたいことは後編です。

RTX1210 => rsyslog => log-file <= logstash => elasticsearch <= kibana

このようなイメージでデータを流しますから、
まずrsyslogの設定から。

$ sudo cat /etc/rsyslog.conf | grep -v '^\s*$' | grep -v '^\s*#'
※変更箇所のみ
module(load="imudp")
input(type="imudp" port="514")
※RTX1210から受けるので、UDP/514を使います。

$AllowedSender UDP, 127.0.0.1, 192.168.0.0/24   # UDP
※自分のネットワークから受けるように。ルーターそのもののIPで良いと思います。

$ActionFileDefaultTemplate RSYSLOG_FileFormat
※タイムスタンプがDefaultだと大雑把なので少し細かくしました。

*.*;auth,authpriv.none,local0.none              -/var/log/syslog
*.=info;*.=notice;*.=warn;\
        auth,authpriv.none;\
        cron,daemon.none;\
        mail.none,local0.none           -/var/log/messages
※syslogとmessageには出力しないようにしました。local0の部分。

rtx1210を単独ファイルに出力する設定も作ります。

$ sudo vim /etc/rsyslog.d/75-rtx1210.conf
local0.*           -/var/log/rtx1210log.log
※センスのないファイル名は適宜修正のこと!

設定後、rsyslogを再起動すると514/UDPをリッスンしています。

$ ss -lnu4
State          Recv-Q         Send-Q                 Local Address:Port                  Peer Address:Port         Process         
UNCONN         0              0                            0.0.0.0:514                        0.0.0.0:*                      

ログローテーションも設定しました。

$ sudo vim /etc/logrotate.d/rtx1210
/var/log/rtx1210log.log {
    daily
    missingok
    rotate 90
    compress
    delaycompress
    create 644 root root
    sharedscripts
    postrotate
#    /usr/bin/systemctl restart rsyslog.service > /tmp/rtx.out 2>&1
    /usr/bin/systemctl restart rsyslog.service > /dev/null 2>/dev/null || true
    endscript
}

コメントアウト部分は、動作がおかしいときに出力する様に書いてあります。

さて、受け側の準備が出来たので、RTX1210からsyslogを飛ばします。

syslog host 192.168.xxx.xxx ※rsyslogサーバーアドレス
syslog facility local0
syslog notice on
syslog info on
syslog debug off

上記を設定するとrsyslogで受信し始めます。
特に意識することなく設定したフィルター設定はおそらくpass設定なので、ログ出力ありません。
必要に応じてpass-logに変えましょう。

さて、捕捉するログファイル出力がここまでで完了しました。
やっとElastic Stack。

$ sudo mkdir /var/lib/logstash/sincedb/
※fileプラグインがどこまで読んだかを記録するファイルを置くディレクトリを作成します。
※無くても動作しますが、概ね設定するもののようです。

$ sudo vim /etc/logstash/conf.d/rtx.conf
input {
	file {
		id => "RTX1210"
		path => "/var/log/rtx1210log.log"
		sincedb_path => "/var/lib/logstash/sincedb/rtx_accesslog"
		tags => "router"
	}
}

filter {

# syslogtimestampを切り離し
    grok {
		patterns_dir => ["/etc/logstash/conf.d/patterns"]
        match => { 
			"message" => [
				"%{TIMESTAMP_ISO8601:time} %{ALL:message}",
				"%{SYSLOGTIMESTAMP:time} %{ALL:message}"
			]
		}
        overwrite =>  [ "message" ]
    }

# filtter系log
    grok {
		patterns_dir => ["/etc/logstash/conf.d/patterns"]
        match => {
            "message" => [
				"%{IP:syslog_host_ip}  \[INSPECT\] %{RTXIF:interface_name}\[%{NOTSPACE:direction}\]\[%{NOTSPACE:filter_number}\] %{NOTSPACE:protocol} %{IP:src_addr}[:\.]%{INT:src_port} > %{IP:dest_addr}[:\.]%{INT:dest_port} \(%{TIMESTAMP_JP:start_time}\)",
				"^%{IP:syslog_host_ip}*[ ]*%{RTXIF:interface_name} %{NOTSPACE:pass_reject} at %{NOTSPACE:direction}\(%{NOTSPACE:filter_number}\) filter: %{NOTSPACE:protocol} %{IP:src_addr}[:\.]*%{NOTSPACE:src_port}* > %{IP:dest_addr}[:\.]*%{NOTSPACE:dest_port}*( : )*%{RTX_MSG:comment}*$",
				"^%{IP:syslog_host_ip}*[ ]*%{RTXIF:interface_name} %{NOTSPACE:pass_reject} at %{NOTSPACE:direction}\(%{NOTSPACE:filter_number}\) filter: %{NOTSPACE:protocol} %{IP:src_addr}[:\.]%{NOTSPACE:src_port} > %{IP:dest_addr}[:\.]%{NOTSPACE:dest_port} \(%{RTX_MSG:packet_type}[ )]\[*%{ZONE:dns_query}*\]*\)*$"
            ]
        }
		remove_tag => ["_grokparsefailure"]
		add_tag => ["filter"]
    }

# tunnel系log
	if "_grokparsefailure" in [tags] {
    	grok {
			patterns_dir => ["/etc/logstash/conf.d/patterns"]
        	match => {
            	"message" => [
					"^%{IP:syslog_host_ip}  \[IKE\] %{NOTSPACE:status} %{RTX_MSG:infomation} \(%{NOTSPACE:kinds}\)$",
					"^%{IP:syslog_host_ip}  \[IKE\] %{NOTSPACE:status} %{NOTSPACE:protocol} phase to %{IP:dest_addr}$",
					"^%{IP:syslog_host_ip}  \[IKE2\] %{NOTSPACE:sub prefix} %{RTX_MSG:infomation}$",
					"^%{IP:syslog_host_ip}  \[L2TP\] %{RTXIF:interface_name} %{RTX_MSG:infomation}$",
					"^%{IP:syslog_host_ip}  \[L2TP\] %{RTXIF:interface_name} %{NOTSPACE:status} from %{IP:from_addr}$",
					"^IP %{RTXIF:interface_name} %{RTX_MSG:status}$",
					"^%{IP:syslog_host_ip} %{RTXIF:interface_name} %{RTX_MSG:status} user \'%{RTX_USERNAME:user_name}\'$",
					"^%{IP:syslog_host_ip} %{RTXIF:interface_name} %{RTX_MSG:status}: %{NOTSPACE:protocol} %{IP:src_addr}[:\.]*%{NOTSPACE:src_port}* > %{IP:dest_addr}[:\.]*%{NOTSPACE:dest_port}*( : )*%{RTX_MSG:comment}*$",
					"^%{IP:syslog_host_ip} %{RTXIF:interface_name} %{NOTSPACE:protocol} up  \(Local: %{IP:local_ip}, Remote: %{IP:remote_ip}\)$",
					"^%{IP:syslog_host_ip} %{RTXIF:interface_name} %{NOTSPACE:protocol} %{RTX_MSG:status}$"
            	]
        	}
			remove_tag => ["_grokparsefailure"]
			add_tag => ["tunnel"]
    	}
	}

# management系log
	if "_grokparsefailure" in [tags] {
		grok {
			patterns_dir => ["/etc/logstash/conf.d/patterns"]
			match => {
				"message" => [
					"^%{IP:syslog_host_ip}  \[DHCPD\] %{RTXIF:dhcp_allocate_interface}\(%{NOTSPACE:dhcp_allocate_interface_port}\) %{NOTSPACE:transaction} %{IP:dhcp_allocate_address}: %{MAC:dhcp_allocate_mac_address}$",
					"^%{IP:syslog_host_ip}  \[INSPECT\] %{RTX_MSG:rtx_status} %{IP:src_addr} > %{IP:dest_addr}$",
					"^%{IP:syslog_host_ip} +\[SCHEDULE\] %{RTX_CMD:command}$",
					"^%{IP:syslog_host_ip} %{TIMESTAMP_JP:start_time} +%{NOTSPACE:ntp_result}$",
					"^Login %{NOTSPACE:access_result} for %{NOTSPACE:protocol}: %{IP:from_addr} %{RTX_USERNAME:user_name}*",
					"^%{IP:syslog_host_ip} '%{RTX_USERNAME:user_name}' %{NOTSPACE:consequence} for %{NOTSPACE:protocol}: %{IP:from_addr} %{RTX_USERNAME:logon_name}*",
					"^Logout from %{NOTSPACE:protocol}: %{IP:from_addr} %{RTX_USERNAME:user_name}*",
					"^Configuration saved in \"%{NOTSPACE:config_name}\" by %{NOTSPACE:protocol}\(*%{RTX_USERNAME:user_name}*\)*$"
				]
			}
			remove_tag => ["_grokparsefailure"]
			add_tag => ["manage"]
		}
	}

# syslogtimestampを@timestampにする
    date {
      match => [ "time", "ISO8601" ]
      timezone => "Asia/Tokyo"
      locale => "en"
    }

    mutate {
      replace => { "time" => "%{@timestamp}" }
    }

# protocolcolをサービス名で統一する ※2021年10月12日追記
	if [protocol] {
		if [dest_port] {
			if [protocol] == "TCP" {
				translate {
					source => "[dest_port]"
					target => "[dest_port]"
					override => true
					dictionary_path => "/etc/logstash/conf.d/wkp_tcp.yml"
				}
			}
			if [protocol] == "UDP" {
				translate {
					source => "[dest_port]"
					target => "[dest_port]"
					override => true
					dictionary_path => "/etc/logstash/conf.d/wkp_udp.yml"
				}
			}
		}
	}

# fqdnフィールドを作成して相手先のipを引く
	if [dest_addr]{
		mutate {
			add_field => {
			"dest_addr_fqdn" => "%{dest_addr}"
			}
		}
		dns {
			reverse => [ "dest_addr_fqdn" ]
			action => "replace"
            nameserver => {
                address => [ "8.8.8.8", "1.1.1.1" ]
                search => [ "localdomain" ]
            }
			hit_cache_size => 4096
			hit_cache_ttl => 900
			failed_cache_size => 512
			failed_cache_ttl => 900
		}
	}

# geoipで位置情報を得る
    if [src_addr]{
        geoip {
        	source => "src_addr"
        	target => "src_geoip"
			tag_on_failure => "_src_geoip_lookup_failure"
        }
    }
	if ![geoip.ip] {
		if [dest_addr]{
        	geoip {
            	source => "dest_addr"
            	target => "dest_geoip"
				tag_on_failure => "_dest_geoip_lookup_failure"
				remove_tag => ["_src_geoip_lookup_failure"]
        	}
    	}
	}

    if [from_addr]{
        geoip {
            source => "from_addr"
            target => "from_geoip"
			default_database_type => "City"
			tag_on_failure => "from_geoip_lookup_failure"
			remove_tag => ["_dest_geoip_lookup_failure"]
        }
    }

}

output {
	elasticsearch {
		hosts => ["http://localhost:9200"]
		index => "rtx-%{+YYYY.MM.dd}"
		user => "elastic"
		password => "elastic-pass"
	}
}

こちらが苦労したrtx用のlogstashコンフィグです。
grokプラグインの中で、オリジナルパターンを使っているため、下記の様にパターンファイルを作ります。

$ sudo vim /etc/logstash/conf.d/patterns
ALL .+$
RTXIF %{NOTSPACE}((ANONYMOUS)*\[[0-9][0-9]\])*
RTX_MSG [a-zA-Z0-9 ]+
RTX_CMD [a-zA-Z0-9 -_\.]+
RTX_USERNAME [a-zA-Z0-9_-]+
ZONE [a-zA-Z0-9/. _-]+
DATE_JP %{YEAR}[/-]%{MONTHNUM}[/-]%{MONTHDAY}
TIMESTAMP_JP %{DATE_JP}[ ,]%{TIME}

※2021/10/12日追記
プロトコル表記がサービス名とポート番号と混合して出力されるため、443/TCPとhttpsのように同じ意味の項目が区分されて扱われていたので、サービス名に置き換えるtranslateフィルター設定を追加しました。
80/TCPはサービス名で統一してhttpとしたかったのですが、rtxからwwwで出力されるログがあるのでwwwとしました。(番号出力への切替方法をご存じの方、教えてください。)
dictionary fileは次の通りです。

/etc/logstash/conf.d/wkp_tcp.yml
[
"443": "https",
"80": "http",
"995": "pop3s",
"993": "imaps",
"465": "smtps",
"123": "ntp",
"22": "ssh",
"21": "ftp",
"20": "ftp-data",
"23": ":telnet"
]

/etc/logstash/conf.d/wkp_udp.yml
[
"53": "domain",
"443": "https",
"546": "dhcpv6-client",
"547": "dhcpv6-server",
"123": "ntp",
"137": "netbios-ns",
"138": "netbios-dgm",
"139": "netbios-ssn"
]

※並びが妙なことになっているのは出力が大手あろう順にしてみたからです。
※2021/10/12追記ここまで

logstashの再起動はelasticsearchにインデックステンプレートを作成してからです。
右も左もわからない私はここで大分苦戦していました。

猫でも出来そうな手順で示します。

1.kibanaで、Management->スタック管理
2.データのインデックス管理から、インデックステンプレートを選び、テンプレートを作成
3.適宜名前をつけ、インデックスパターンを設定します。インデックスパターンは上記の紺フィルファイルの場合、rtx-*となります。
4.コンポーネントテンプレートはそのまま次へ
5.インデックス設定は下記

{
  "index": {
    "number_of_shards": "1",
    "refresh_interval": "5s"
  }
}

これはlogstashのテンプレートをベースにしています。
6.マッピングではJSONの読み込みを押してから下記内容を記入します。

{
  "_doc": {
    "dynamic_templates": [
      {
        "message_field": {
          "path_match": "message",
          "mapping": {
            "norms": false,
            "type": "text"
          },
          "match_mapping_type": "string"
        }
      },
      {
        "string_fields": {
          "mapping": {
            "norms": false,
            "type": "text",
            "fields": {
              "keyword": {
                "ignore_above": 256,
                "type": "keyword"
              }
            }
          },
          "match_mapping_type": "string",
          "match": "*"
        }
      }
    ],
    "properties": {
      "@timestamp": {
        "type": "date"
      },
      "geoip": {
        "dynamic": true,
        "properties": {
          "ip": {
            "type": "ip"
          },
          "latitude": {
            "type": "half_float"
          },
          "location": {
            "type": "geo_point"
          },
          "longitude": {
            "type": "half_float"
          }
        }
      },
      "src_geoip": {
        "dynamic": true,
        "properties": {
          "ip": {
            "type": "ip"
          },
          "latitude": {
            "type": "half_float"
          },
          "location": {
            "type": "geo_point"
          },
          "longitude": {
            "type": "half_float"
          }
        }
      },
      "dest_geoip": {
        "dynamic": true,
        "properties": {
          "ip": {
            "type": "ip"
          },
          "latitude": {
            "type": "half_float"
          },
          "location": {
            "type": "geo_point"
          },
          "longitude": {
            "type": "half_float"
          }
        }
      },
      "from_geoip": {
        "dynamic": true,
        "properties": {
          "ip": {
            "type": "ip"
          },
          "latitude": {
            "type": "half_float"
          },
          "location": {
            "type": "geo_point"
          },
          "longitude": {
            "type": "half_float"
          }
        }
      },
      "@version": {
        "type": "keyword"
      }
    }
  }
}

よく見ていただくとわかりますが、geoip項目をlogstashコンフィグで設定したロケーションを示すフィールド名で同様に定義します。
インデックステンプレートは別冊の方ですが、logstashについて網羅的に情報を得たいときは書籍を買うのが手っ取り早いです。なかなかのお値段ですが、たぶん役立つと思います。

これで一通り設定が出来たので、logstashを再起動しますと
データが流れ込んでくるはずです。

kibanaで様子を見る

こうしてログを見やすくしていると、rtxそのものの設定ミスなどがちらほらわかってくるものですね。

コメントを残す

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です