UTMログの取り込み対応時にあったこと
ログ収集基盤をこれまで作成し、色々収集対象を増やしていってるところなんですが、Sophos UTM9の追加を行った際に色々不具合が出てきました。
- grokプラグインでパースできたのに、tagsに「_grokparsefailure」が残ってる
- @timestampがログ収集時点のタイムスタンプになっているため、ログ再取り込みをしたときに時系列が狂う
今回、これの対応を行いました。
「_grokparsefailure」が残る
ログのマッチングパターンの作成を行い、適合するかどうかを調べていたのですが、こんな出力が出ることが有りました。
{ "severity" => "info", "@timestamp" => 2018-03-23T07:06:31.506Z, "otherparam" => "srcport=\"*****\" dstport=\"*****\" tcpflags=\"ACK PSH\" ", "component" => "firewall", "out-interface" => "eth1", "dstaddr" => "***.***.***.***", "srcaddr_geoip" => {}, "@version" => "1", "utm" => "meiji", "ttl" => { "\\" => "62" }, "description" => "AFC Alert", "host" => "Ganesa", "appnum" => "****", "srcmac" => "00:50:56:**:**:**", "tags" => [ [0] "_grokparsefailure", [1] "_geoip_lookup_failure" ]
ちゃんとgrokでparseできているのに「_grokparsefailure」が残ってる・・・・これが残ってると、最後の条件分岐で「drop」の対象になってしまうため、コレをそのまま処理流すとElasticsearchに取り込まれません。これは困った。
というわけで、grokでparseできた場合にちゃんと「_grokparsefailure」が残らないよう明示的な削除処理を加えることにしました。記載例としてはこんな感じです。
grok { match => [ "message" , "<適合パターンA>", "message" , "<適合パターンB>", "message" , "<適合パターンC>" ] add_field => { "component" => "firewall" "syslog_received_at" => "%{@timestamp}" } remove_tag => ["_grokparsefailure"] }
太字箇所が該当します。これでtagsの内容から「_grokparsefailure」が消えます。
タイムスタンプをSyslogタイムスタンプに変更する
_grokparsefailureの問題から、ログの取り直しが複数回発生しました。取り直し手順は後述するんですが、これやったときにすべてのログのタイムスタンプが「logstashでの取り込み時点」となってしまい、正常な時系列が表現できません。
そこで、${SYSLOGTIMESTAMP:time}を@timestampとするように修正を行いました。
当該処理を組むに当たり、以下URLの情報を参考とさせていただきました。<(_ _)>
Logstash cheat sheet
http://www.egrep.jp/wiki/index.php/Logstash_cheat_sheet
まず、各grok文に「syslog_received_at」と言うフィールドを追加し、いつlogstashで取り込んだかの情報を格納できるようにします。先述した内容と二重記載になりますが、以下記載します。
grok { match => [ "message" , "<適合パターンA>", "message" , "<適合パターンB>", "message" , "<適合パターンC>" ] add_field => { "component" => "firewall" "syslog_received_at" => "%{@timestamp}" } remove_tag => ["_grokparsefailure"] }
続いて、変換を行います。
date { match => [ "time", "MMM d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601" ] timezone => "Asia/Tokyo" locale => "en" } mutate { replace => { "time" => "%{@timestamp}" } }
上記はgrok文と同列に置きます。つまりは、
filter { if [type] == "utm-message" { grok { <grokによる処理> } if "_grokparsefailure" in [tags] { grok { <grokによる処理> } } if "_grokparsefailure" in [tags] { drop {} } date { match => [ "time", "MMM d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601" ] timezone => "Asia/Tokyo" locale => "en" } mutate { replace => { "time" => "%{@timestamp}" } } } }
こんな感じ。
これで、SYSLOGTIMESTAMPで認識した日付時刻が@timestampとして扱われるようになり、ログの再取り込みをしても時間の不整合が起きないようにすることができます。
ログの再取り込み
ログの再取り込みですが、logstashで読み取り位置をbegginingにしている場合、「どこまでログを取り込んだか」の情報がlogstashのデータとして格納されており、コレを削除する必要があります。
■サービスの停止 # service logstash stop ■ログ読み込みポインタの削除 # cd /var/lib/logstash/plugins/inputs/file/ # ls -al 合計 17 drwxr-xr-x+ 2 logstash logstash 8 3月 25 18:25 . drwxr-xr-x+ 3 logstash logstash 3 3月 8 16:41 .. -rw-r--r--+ 1 logstash logstash 23 3月 25 18:25 .sincedb_**************** -rw-r--r--+ 1 logstash logstash 23 3月 25 18:25 .sincedb_**************** -rw-r--r--+ 1 logstash logstash 23 3月 25 18:25 .sincedb_**************** -rw-r--r--+ 1 logstash logstash 23 3月 25 18:25 .sincedb_**************** -rw-r--r--+ 1 logstash logstash 24 3月 25 18:25 .sincedb_**************** -rw-r--r--+ 1 logstash logstash 23 3月 25 18:25 .sincedb_**************** 上記の.sincedb_から始まるすべてのファイルを削除します。 # rm -f .sincedb_*
コレで読み込み位置がリセットされましたので、次にelasticsearch内のログデータを削除します。
# curl -XDELETE 'http://localhost:9200/<削除対象インデックス名>'
これを、ログインデックス分だけ実行します。ログインデックスの状態は以下コマンドで確認できます。
# curl -XGET localhost:9200/_cat/indices?v
確認できたら、logstashサービスを再開します。
# service logstash start
本当は、修正すべきインデックスだけ読み込み位置をリセットすればよいのだろうなーとは思うのですが、今回修正を全体に対して行ったため、ちょっと都合よくデータを削除することができませんでした。よって、全面的にデータの取り込み直しということになりました。
ログの初期取り込み・再取り込みは大変
実際取り込み直してみると、データ量が中々多く、チョットエライことになりました。
Elasticsearchサーバはこの対応を行った時点では4vCPU/8GB RAMで構成されていましたが、全然処理が追いつかず、これはちょっとまずいということで、スケールアップを行ってログの再取り込みを行いました。最終的には8vCPU/12GB RAMの構成で落ち着きましたが、それでもLoad Averqageの値は20を記録するほどになりました。
多分、CPUの負荷とか100%超えする光景を拝むのってこういうときぐらいじゃないですかね・・?
また、データ量も半端なく多いため、あっという間にデータ量が7GBを突破しました。そのままVMDKサイズを増加してパーティション広げても良いのですが、色々と面倒だったので、NFS領域をNexentaStor上に200GBぐらい作り、そこに放り込むように設定を変更しました。
果たして何ヶ月分のデータが取り込めるでしょーかっ・・・・と言うところですね。
予想通りというかなんというか、実際NexentaStorのディスク状況で確認してみると、ZILディスクの負荷がかなり高く表示されてました。ログの再取り込み時のIO要求処理量が恒常的に1000IOPSを超えてる感じでした。大規模環境とかだと、かなりしっかりリソース設計しておかないと、色々辛い目に遭いそうな気がしますね。
というわけで
無事UTMのログも正常に取り込めるようになり、我が家ではポリシールートを使ってインターネットに抜ける経路を2系統に振り分ける構成になっているんですが、その傾向をある程度掴むことが出来るようになりました。
上記グラフ、緑色がクライアントがアクセスするINTERLINK回線、水色がサーバがアクセスするHomenoc回線、青色が内部(ステージング環境のMastodonへのアクセス)となっており、カウントベースではありますが、どのぐらいの頻度で通信しているかがグラフで表せるようになりました。
UTMログ収集では、firewall、メールプロキシ、IPSを収集対象にしていて、それぞれどんなパケットがdropされてるかとか、UTM9のExecutive ReportではわかりにくいところをDiscover画面を使って探索するというのが主な用途にになっています。
ココ最近ドハマリしまくってるElasticsearchですが、導入自体は本当に簡単なので、一度やってみると面白いのかなと思います。これでログの取り込みの基礎等を学んでから、Splunkを学んでいくもよし、もう少し汎用な内容をおさえ、アプリ開発に適用させていくもよし、何にしても専門外の知識を得るというのは、ときに新鮮味を感じるよい機会なのかなーと感じています。
Comments are closed