[Management][Network][Linux] Elasticsearchログ基盤を強化する

技術のお話し

ログ解析が面白い

前回の記事でElasticsearchを使ったRouterboardのFirewallのログ分析基盤を作ってみたのですが、いざKibanaを使ってDiscoverやVisualize、Dashboardなど触ってみると、色々情報が整理できて面白いなということを知りました。

そこで、今回は様々なログを収集・分析し、いろいろな通信を把握することを試みてみました。

新たにやりたい事

主なネタとしては以下のような内容です。

  • VPNルーター(RTX-3000,RTX-1100)のログ分析
    • RTX-3000は本番拠点のVPNルーターです。
      このルーターでは、Mastodon本番インスタンスである(social.bluecore.net)のゲートウェイを兼ねています。これまでこのインスタンスに対する通信状況は、時々sidekiqを見るぐらいしか行っておらず、果たしてどのぐらいの通信が発生しているかを捉えられていませんでした。
    • RTX-1100は災対拠点のゲートウェイルーター兼拠点間ルーターです。
      このルーターは災対拠点の拠点間ルーターであると共に、外部インターネットに対する境界ルーターでもあります。全体の通信把握ができてないのと、外部接続/内部接続の比率がわからないという課題が有りました。
  • WAF(Sophos XG Firewall)のログ分析
    • ブログは
      CloudFlare->WAF(Sophos XG Firewall)->BIG-IP ASM->origin(2台)
      と言う通信経路でつながっています。
      この時、CloudFlareとWAF間の疎通に関しての通信把握が困難であるという課題が有りました。原因はSophos XG Firewallのログビューアが使いこなせないことに有り、WAFの通信のみ抽出し、どのバーチャルサーバにどれだけの通信トラフィックがあるかを押さえたいという思いがありました。
  • IPv6中継ルーターのログ分析
    • Homenocとの接続はRouterboardが担保しているのですが、実際に内部とのアクセスを中継・制御しているのは基本的にVMで動作するSEIL/x86です。
      こちらのACL状況を把握する必要があり、併せてブログやMastodonインスタンスのIPv6通信状況もこちらを通じて把握する必要がありました。

構成

コンポーネントの構成としては、従来のものをそのまま踏襲します。

修正を行う必要があるのは、以下のようなポイントです。

  • VMのスケールアップ
    • 従来は2vCPU/2GB RAMで、一応これでも回っていたのですが、Indexが増えるに従いSwap使用量が上昇していることがかくにんされたので、サーバ・ストレージ基盤増強をしたことも有り、今回は4vCPU/12GB RAMに増強をしました。
  • ログ収集設定の追加
    • ログ収集対象が増えるため、設定ファイルを追加します。また、logstashの特性により、いくつか配慮する事項があるため、既存のRouterboard用設定ファイルも修正を加えます。
  • Elasticsearchおよびlogstashの設定変更
    • 主にJVMのメモリ使用量設定を修正します。より多くのメモリを確保し、多くのデータ処理に耐えられるよう構成します。

Elasticsearch、logstashのメモリ使用量設定修正

いずれもJavaを使うため、JVMのメモリ設定が必要です。私の環境では以下の通りとしました。

Elasticsearch (/etc/elasticsearch/jvm.options)

-Xms4g
-Xmx4g

logstash (/etc/logstash/jvm.options)

-Xms3g
-Xmx3g

ログ収集設定の追加

ログ収集設定は /var/log/logstash/conf.d に追加していきます。今回は以下のファイルを追加しています。

yomogi.conf -> RTX-3000用収集設定
soba.conf   -> RTX-1100用収集設定
blog.conf   -> WAF用収集設定
kaede.conf  -> SEIL/x86用収集設定

そのウチの一つであるyomogi.confの設定を以下の通り表記します。

input {
  file {
    path => "/var/log/remote_log/message_yomogi.bluecore.net"
    start_position => "beginning"
    type => "yomogi-message"
  }
}

filter {
  if [type] == "yomogi-message" {
    grok {
      match => [
        "message", "%{SYSLOGTIMESTAMP:time} %{DATA:router} \[INSPECT\] %{DATA:interface}\[%{DATA:direction}\]\[%{DATA:rule}\] %{DATA:protocol} %{IP:srcaddr}:%{NUMBER:srcport} \> %{IP:dstaddr}:%{NUMBER:dstport} \(%{DATA}\)",
        "message", "%{SYSLOGTIMESTAMP:time} %{DATA} %{DATA:interface} %{DATA:action} at %{DATA:direction}\(%{DATA:rule}\) filter: %{DATA:protocol} %{IP:srcaddr}:%{NUMBER:srcport} \> %{IP:dstaddr}:%{NUMBER:dstport}",
        "message", "%{SYSLOGTIMESTAMP:time} %{DATA:interface} %{DATA:action} at %{DATA:direction}\(%{DATA:rule}\) filter: %{DATA:protocol} %{IP:srcaddr}:%{NUMBER:srcport} \> %{IP:dstaddr}:%{NUMBER:dstport}",
        "message", "%{SYSLOGTIMESTAMP:time} %{DATA:router} \[IKE\] %{DATA:ipsecmsg} from %{IP:srcaddr}",
        "message", "%{SYSLOGTIMESTAMP:time} %{DATA:router} \[IKE\] %{DATA:ipsecmsg} : no message",
        "message", "%{SYSLOGTIMESTAMP:time} %{DATA:router} \[IKE\] SA\[%{NUMBER}\] %{DATA:ipsecmsg}",
        "message", "%{SYSLOGTIMESTAMP:time} %{DATA:router} \[IKE\] respond ISAKMP phase to %{IP:srcaddr}"
      ]
    }
  }
}

output {
  if [type] == "yomogi-message" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "log-yomogi"
    }
  }
}

ここで重要なのは、以下の部分です。

input {
 file {
 path => "/var/log/remote_log/message_yomogi.bluecore.net"
 start_position => "beginning"
 type => "yomogi-message"
 }
}

typeと言う変数を定義し、それが何であるかを指定します。そして、filterおよびoutputセクションでif分岐をさせています。
filterセクションでは以下のように。

filter {
 if [type] == "yomogi-message" {
 grok {

outputセクションでは以下のようにしています。

output {
 if [type] == "yomogi-message" {

どうやら、logstashはサービスとして起動すると、自動的に /etc/logstash/conf.d 配下のすべてのファイルを自動的に読み込んでログ収集処理を開始するようなのですが、設定自体はファイル単位に分かれてるわけではなく、logstashとしては一つにマージして動いてしまうようです。

よって、このようにtypeを定義し、コレを用いてif分岐をさせないと、個別の設定が効かないようです。そのため、既存コンフィグであるrb2011.conf(Routerboard用収集設定)も同じロジックに基づいて修正しています。

WAFのログ収集設定

WAFのログ収集設定であるblog.confの中身は以下のようにしています。

input {
  file {
    path => "/var/log/remote_log/message_taisho.bluecore.net"
    start_position => "beginning"
    type => "waf-message"
  }
}

filter {
  if [type] == "waf-message" {
    grok {
      match => {
        "message" => "%{SYSLOGTIMESTAMP:time} %{DATA:utm} device=%{GREEDYDATA} log_type=\"WAF\" log_component=\"%{DATA}\" priority=%{DATA:servirity} user_name=\"%{DATA:username}\" server=%{DATA:vserver} sourceip=%{IP:srcaddr} localip=%{IP:dstaddr} ws_protocol=\"%{DATA:wsproto}\" url=%{DATA:urldata} querystring=%{GREEDYDATA} referer=%{DATA:referer} method=%{DATA:method} httpstatus=%{NUMBER:status} reason=\"%{DATA:reason}\" extra=\"%{DATA}\" contenttype=\"%{DATA:contenttype}\" useragent=\"%{DATA:useragent}\" host=%{IP} responsetime=%{NUMBER:restime} bytessent=%{NUMBER:sentbyte} bytesrcv=%{NUMBER:rcvbyte} fw_rule_id=%{NUMBER:ruleid}"
      }
      add_field => {
        "ipversion" => "4"
      }
    }
    if "_grokparsefailure" in [tags] {
      grok {
        match => {
          "message" => "%{SYSLOGTIMESTAMP:time} %{DATA:utm} device=%{GREEDYDATA} log_type=\"Firewall\" log_component=\"%{DATA}\" log_subtype=\"Allowed\" status=\"Allow\" priority=%{DATA:servirity} dulation=%{NUMBER} fw_rule=17 policy_type=%{NUMBER} user_name=%{GREEDYDATA} src_ip=%{IP:srcaddr} src_country_code=%{GREEDYDATA} dst_ip=%{IP:dstaddr} dst_country_code=%{GREEDYDATA} sent_bytes=%{NUMBER:sentbyte} recv_bytes=%{NUMBER:rcvbyte} %{GREEDYDATA}"
        }
        add_field => {
          "vserver" => "www.bluecore.net"
          "ipversion" => "6"
        }
      }
    }
    if "_grokparsefailure" in [tags] {
      drop {}
    }
    geoip {
      source => ["srcaddr"]
    }
    useragent {
      source => "useragent"
      target => "ua-desc"
    }
  }
}

output {
  if [type] == "waf-message" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "log-waf"
    }
  }
}

これは、以下点に留意しています。

  • 読み飛ばししたい所は%{GREEDYDATA}ですっ飛ばす
  • WAFやIPv6 NAT以外の情報は不要なので、そもそもElasticsearchに送り込まない
  • ipversionと言うフィールドを作り、IPv4とIPv6のアクセス比率が見れるようにする
  • IPv6 NATをした通信ではvserverフィールドに格納する情報がないので、NATされた情報に対してvserverフィールドにブログの仮想サーバFQDNを追加する
  • geoipを使って、そのIPアドレスがどこの国のものなのかを抽出し、データとして投入する
  • useragentの詳細情報を収集する

これらについて、順を追って説明しようかなと思います。

GREEDYDATAによる複数項目読み飛ばし

まず、WAFの出力ログの例を書いておきます。

Mar 20 03:26:47 taisho device="SFW" date=2018-03-20 time=03:26:47 
 timezone="JST" device_name="SFVH" device_id=C0100********8B log_id=07500061071
 log_type="WAF" log_component="Web Application Firewall" priority=Information
 user_name=\"-\" server=ma.bluecore.net sourceip=***.***.***.*** 
 localip=103.247.181.39 ws_protocol=\"HTTP/1.1\" 
 url=(省略) 
 querystring= cookie=\"(省略)" referer=- metho=PROPFIND httpstatus=207 
 reason="-" extra="-" contenttype="application/xml" 
 useragent="Mozilla/5.0 (Windows) mirall/2.4.1 (build 9270)" 
 host=103.247.181.140 respnsetime=360412 bytessent=1402 bytesrcv=1410
 fw_rule_id=13

コレに対するマッチングパターンはこうしています。

"%{SYSLOGTIMESTAMP:time} %{DATA:utm} device=%{GREEDYDATA}
 log_type=\"WAF\" log_component=\"%{DATA}\" priority=%{DATA:servirity}
 user_name=\"%{DATA:username}\" server=%{DATA:vserver}
 sourceip=%{IP:srcaddr} localip=%{IP:dstaddr}
 ws_protocol=\"%{DATA:wsproto}\" url=%{DATA:urldata}
 querystring=%{GREEDYDATA}
 referer=%{DATA:referer} method=%{DATA:method}
 httpstatus=%{NUMBER:status} reason=\"%{DATA:reason}\"
 extra=\"%{DATA}\" contenttype=\"%{DATA:contenttype}\"
 useragent=\"%{DATA:useragent}\" host=%{IP}
 responsetime=%{NUMBER:restime} bytessent=%{NUMBER:sentbyte}
 bytesrcv=%{NUMBER:rcvbyte} fw_rule_id=%{NUMBER:ruleid}"

ログ例にある太字箇所が「これはいらない」と判断した箇所で、そこを複数項目に渡って「GREEDYDATA」として取り扱い、変数定義しないことでElasticsearchの登録対象からまるっと外すことが可能となりました。こうして、マッチングパターンの文字数を少なくすることが可能になります。

フィールドの追加・値の投入

grokでマッチした文字列や値をフィールドへ放り込むのは以前の記事でも書いたのですが、そこにない値、例えば「このパターンに合致したログにフラグを付与したい」という場合、フィールドを追加して、文字列や数値を組み込むことが可能だったりします。

 grok {
   match => {
     "message" => "(IPv4でWAFにて受け付けるパターン)"
   }
   add_field => {
     "ipversion" => "4"
   }
 }
 if "_grokparsefailure" in [tags] {
   grok {
     match => {
       "message" => "(IPv6でNATにて受け付けるパターン)"
     }
     add_field => {
       "vserver" => "www.bluecore.net"
       "ipversion" => "6"
     }
   }
(以下略)

流れとしては以下のような感じです。

  • まず取り込まれたログはWAFログパターンと照合されます。
    • 該当すれば、IPv4で受け付けたアクセスになるので、ipversionと言うフィールドに4と言う値を放り込みます。
    • 該当しなければ、tagsフィールドに「_grokparsefailure」というメッセージが付与されます。
  • 引き続き、処理された情報のtagsフィールドに「_grokparsefailure」というメッセージが付与されているかどうか確認します。
    • 付与されている場合、NATログパターンと照合されます。
      • 照合した場合、tagsフィールドの「_grokparsefailure」は消去されます。
      • ipversionと言うフィールドに6と言う値、vserverと言うフィールドに「www.bluecore.net」を付与します。
      • 該当しなければ、tagsフィールドに「_grokparsefailure」と言うメッセージが付与されたままになります。

これにより、IPv4とIPv6、どのようなデータが来てるかを振り分けられるようになります。

無駄なデータを読み込まない

何れのパターンにも当てはまらなかった場合、tagsフィールドに「_grokparsefailure」と言うメッセージが残ったままとなり、必要最低限のフィールドしかもたない・・・けれども、クソ長いmessageフィールド値を持ったレコードが爆誕します。

ファイアウォールのログ量というのは中々強烈で、コレをまともにElasticsearchに取り込んだらリソースがいくらあっても足りません。ので、それらは取り込まないようにします。

 if "_grokparsefailure" in [tags] {
   drop {}
 }

geoipによる国情報を取得

logstashにはgeoipと言う処理を使って、IPアドレスがどこの国に属するのかを調べ、データに追加することが可能なようです。dropされずに出てきたデータに対してこの処理を実施します。

 geoip {
   source => ["srcaddr"]
 }

useragentの詳細化

useragentそれ自体が結構きちんとしたブラウザ情報を提供してくれていますが、より直感的に詳細の情報をデータに追加することが可能なようです。それを行います。既存情報のフィールドを上書きするのは避けたいため、別名のフィールドを構成し、そこに情報を格納するようにします。

 useragent {
   source => "useragent"
   target => "ua-desc"
 }

Kibanaなりを使用してデータ取り込み状況を確認する

完了したら、logstashを再起動し、Indexパターンを登録した上でDiscovery画面を開きます。そして、取り込んだログが正常に分類されているかどうかを確認します。

後は・・・・

Visualizeでグラフ作成などを行い、Dashboardを作れば、それなりの可視化が出来るようになりますし、細かい所はDiscover画面でサーチできるようになり、分析が比較的楽になると思います。

特に、geoipで世界地図にアクセス状況を出すのとか、何だかちょっぴりかっこよく、それっぽい感じになりますね。

これまで、CloudFlareが受けていたアクセスや、ページビュー情報等については、Google Analyticsを使ってある程度掴むことができていたんですが、自宅側のオリジンサーバに対するアクセスについては、コレを使ってある程度把握できたんでよかったなぁーって感じです。

他にも色んな事ができそうなので、ボチボチ試してみたいと思います。

負荷状況・参照系コマンド

負荷状況ですが、こんな感じになってます。

logstashの起動時が一番CPU負荷が高く、%CPUが395ということで、4vCPUをほぼ全部使い切ってました。最初の起動がすぎれば、その後はだいぶ負荷が下がります。ただその後もログは流れ続けるので、CPU使用率は10%前後をキープするという感じになっています。

例えば、大規模環境で大量のログを集積するサーバなどは、この辺りのCPUチューニングが必要なのかなと感じています。

参照系コマンドですが、以下のものが非常に便利でした。

インデックス一覧の表示

コマンドとしては以下のとおりです。

# curl -XGET <ElasticsearchのIP>:<ポート>/_cat/indices?v

出力結果はこんな感じになります。

件数や容量もわかるので、かなーり便利です。ちなみに登録された情報は /var/lib/elasticsearch 配下に溜まるみたいです。

インデックスの削除

コマンドとしては以下のとおりです。

# curl -XDELETE 'http://<ElasticsearchのIP>:<ポート>/<削除するインデックス名>'

インデックス名の指定はワイルドカードとか使えます。私の場合、途中で構成を見直して取り込み直しをしたり、Elasticsearchの中身を整理したかったりなど色々あったため、この削除コマンドはすごく役に立ちました。

正直、Elasticsearchに関して当初は全く興味がなく、「一体何をそんなに盛り上がってるんだろう?」的な気分でみていたものですが、ハマると面白いなーと思います。特にログ分析とかを手作業でやって死にそうになってる人は、一度こういうのをやってみると良いかもしれません。

logstashは先の記事でも書いてるように、logstashとしてはstdinから取り込むようにして、外部からもってきたログファイルをcatしてインポートすることにより、Elasticsearchで分析するということもできたりするので、色々応用が効くんじゃないかなと思います。

 

Tags:

Comments are closed

PAGE TOP