[Management][Network][Linux] Elasticsearchを使ってログ分析基盤を組んでみた

技術のお話し

単語はよく聞くElasticsearch

ここ最近よく聞くElasticsearch。全文検索システムの名称らしいのだが、どうにもパッと頭に浮かばない。結構開発寄りな話が多いかなあぁ・・・?という気がしていて、あまり手を出してなかったのだけど、ここ最近どうやらMastodonの最新バージョンで全文検索を実装するようになったみたいで、その前段に何か学ぶことがないかなぁ?とおもったりしたら、ログ分析の話を見かけたりしたわけでして。

ログ分析では、過去にSplunkというものに触れてみたことがあるのだけど、Elasticsearchでも似たようなことが出来るんだとか。そりゃ面白そうだということで手を出してみたのが今回であります。

今回組んだ構成

ログ分析基盤を構成するにあたり、Elasticsearchはあくまで検索エンジンのコアであり、それとは別に以下のコンポーネントが必要なようです。

  • logstash:ログの加工を行い、Elasticsearchへ送り込む
  • kibana:Elasticsearchとデータのやり取りをし、分析・可視化するフロントエンド

今回はインターネットゲートウェイの一部機能を担っているRouterboard RB2011 UiASをログ分析対象にしました。
当該機器は、既存の仕組みの中でFirewallのログをログサーバへ転送をしています。ログサーバではrsyslogを使用して、ログファイルの仕分けを行っています。この仕組で結構な数の機器のログ取り込みをしているため、コレを変更せずに済む方法を考えました。

当初は、ログサーバに実装することを考えたのですが、実はログサーバがボチボチ古いサーバであり、実装の仕方を学ぶには少し不都合があるため、新規にサーバを立て、そこにElasticsearch/logstash/kibanaを導入することにしました。

既存環境ではログ格納領域をローカル領域にしていたのですが、分析サーバから一貫したデータとして参照できるようにするため、NFSマウント可能なNAS上にログデータを保存するよう構成することにしました。

構成を図に表すとこんな感じです。ログ分析サーバにすべてのコンポーネントを組み込む形とし、ユーザはkibanaを通じて統計情報を把握できるようにする形としています。

前準備

前準備として、ローカルストレージん保存していたログサーバのデータをNASへ移行します。ざっくり以下の流れで実施しました。

【ログサーバ側対応】

  • リモートログ領域を全てNFSサーバへ移行する
  • ログサーバへログインし、rsyslogを停止
  • NFS領域へリモートログデータをrsyncでコピー
  • /etc/fstab設定を修正してリモートログ領域をNFSマウント
  • rsyslog再開

【ログ分析サーバ側対応】

  • /etc/fstab設定を修正してリモートログ領域をNFSマウント

各コンポーネントのインストール

各コンポーネントのインストールに入るのですが、細かい点についてはあくまで参考程度にとどめてください。実際の手順やリポジトリの書き方としては、以下を参照することが望ましいです。(他Tipsではバージョン差異により情報が正しくないことがあったりします)
https://www.elastic.co/guide/en/elasticsearch/reference/current/rpm.html

Import the Elasticsearch PGP Keyの記載に従い、以下コマンドを実行します。

rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

Installing from the RPM repositoryの記載に従い、/etc/yum.repos.d 配下に elasticsearch.repo と言うファイルを作り、以下の通り記載します。

[elasticsearch-6.x]
name=Elasticsearch repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md

以下コマンドを実行し、インストールします。

# yum -y install elastic search
# yum -y install kibana
# yum -y install logstash

各種設定

【Elasticsearchの設定】

以下コマンドを実行し、設定ファイルを修正します。

# vi /etc/elasticsearch/elasticsearch.yml

—–以下の箇所が修正箇所です。

network.host: "0.0.0.0"
->今後Mastodonとの連携もあるため、外部から接続可能にした。

【Kibanaの設定】

以下コマンドを実行し、設定ファイルを修正する。

# vi /etc/kibana/kibana.yml

—–以下箇所が修正箇所です。

server.host: "0.0.0.0"
elasticsearch.url: "http://localhost:9200"

【logstashの設定】

logstashの設定箇所は特にありません。
具体的なコンフィグは/etc/logstash/conf.dに実装します。

elasticsearch/kibanaの起動

以下コマンドを実行し、elasticsearchとkibanaを起動します。

# systemctl restart elasticsearch
# systemctl enable elasticsearch

# systemctl start kibana
# systemctl enable kibana

kibanaにアクセスする

ブラウザで http://<IPアドレス>:5601 へアクセスし、トップページの表示を確認します。

上図のようなページが表示されたらOKです。

elasticsearchへのアクセス

ブラウザで http://<IPアドレス>:9200 へアクセスします。

上図のようなJSON応答が帰ってきたらOKです。

ログのパターン当て込み

では、Routerboardのログパターンを解析し、logstashで要素ごとに振り分けができるようにします。Routerboardのfirewallログを見る限り、以下のようなパターンが有ることがわかりました。

[パターン1:TCP/UDP以外のプロトコルを使用しているパターン(OSPF等)]
Mar 8 14:01:15 10.5.1.1 firewall,info input: in:******** out:********, src-mac **:**:**:**:**:**, proto ***, ***.***.***.***->***.***.***.***, len 72

[パターン2:ICMP通信を行っているパターン]
Mar 9 11:37:03 10.5.1.1 firewall,info input: in:******** out:********, src-mac **:**:**:**:**:**, proto ICMP (type 136, code 0), fe80::****:****:****:****->fe80::****:****:****:****, len 24

[パターン3:TCP/UDP通信をIPv4で行っているパターン]
Mar 9 11:55:49 10.5.1.1 firewall,info input: in:******** out:********, proto TCP (SYN), ***.***.***.***:***->***.***.***.***:***, len 44

[パターン4:TCP/UDP通信をIPv6アドレスで行っているパターン]
Mar 9 11:37:03 10.5.1.1 firewall,info forward: in:******** out:********, src-mac **:**:**:**:**:**, proto TCP (ACK,RST), [****:****:****:****:****:****:****:****]:***->[****:****:****:****:****:****:****:****]:***, len 32

[パターン5:src-mac情報がないパターン]
Mar 8 14:28:55 10.5.1.1 firewall,info input: in:******** out:********, proto TCP (SYN), ***.***.***.***:***->***.***.***.***:***, len **

さて、こうしたパターンをlogstashの設定に当てはめていくわけですが、この時使用するのがglokプラグインというものです。読み込んだメッセージからパターンに一致するものをフィールド名と紐付けをすることが出来るようです。ざっくりイメージを書いてみました。

具体的にどのようなパターンが定義されているかについては、以下URLを確認すると良いと思います。
https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns

さて、この形で先述したログのパターンに当て込んでみたところ、以下のように組むことで振り分けができるということがわかりました。

[パターン1および2に対応する定義]
"%{SYSLOGTIMESTAMP:time} %{IPV4:router} firewall,info %{DATA:rule}: in:%{DATA:inbridge} out:%{DATA:outbridge}, src-mac %{COMMONMAC:srcmac}, proto %{DATA:protocol}, %{IP:srcaddr}-\>%{IP:dstaddr}, len %{NUMBER:pktlength}"

[パターン3に対応する定義]
"%{SYSLOGTIMESTAMP:time} %{IPV4:router} firewall,info %{DATA:rule}: in:%{DATA:inbridge} out:%{DATA:outbridge}, src-mac %{COMMONMAC:srcmac}, proto %{DATA:protocol}, %{IP:srcaddr}:%{DATA:srcport}-\>%{IP:dstaddr}:%{DATA:dstport}, len %{NUMBER:pktlength}"

[パターン4に対応する定義]
"%{SYSLOGTIMESTAMP:time} %{IPV4:router} firewall,info %{DATA:rule}: in:%{DATA:inbridge} out:%{DATA:outbridge}, proto %{DATA:protocol}, \[%{IP:srcaddr}\]:%{DATA:srcport}-\>\[%{IP:dstaddr}\]:%{DATA:dstport}, len %{NUMBER:pktlength}"

[パターン5に対応する定義]
"%{SYSLOGTIMESTAMP:time} %{IPV4:router} firewall,info %{DATA:rule}: in:%{DATA:inbridge} out:%{DATA:outbridge}, proto %{DATA:protocol}, %{IP:srcaddr}:%{DATA:srcport}-\>%{IP:dstaddr}:%{DATA:dstport}, len %{NUMBER:pktlength}"

実際に作成したlogstashの設定(/etc/logstash/conf.d/rb2011.conf)

では、実際にConfigを作成します。作成したコンフィグはこんな感じになりました。

input {
  file {
    path => "/var/log/remote_log/rb2011_messages"
    start_position => "beginning"
  }
}

filter {
  grok {
    match => [
      "message", "%{SYSLOGTIMESTAMP:time} %{IPV4:router} firewall,info %{DATA:rule}: in:%{DATA:inbridge} out:%{DATA:outbridge}, src-mac %{COMMONMAC:srcmac}, proto %{DATA:protocol}, %{IP:srcaddr}-\>%{IP:dstaddr}, len %{NUMBER:pktlength}",
      "message", "%{SYSLOGTIMESTAMP:time} %{IPV4:router} firewall,info %{DATA:rule}: in:%{DATA:inbridge} out:%{DATA:outbridge}, src-mac %{COMMONMAC:srcmac}, proto %{DATA:protocol}, %{IP:srcaddr}:%{DATA:srcport}-\>%{IP:dstaddr}:%{DATA:dstport}, len %{NUMBER:pktlength}",
      "message", "%{SYSLOGTIMESTAMP:time} %{IPV4:router} firewall,info %{DATA:rule}: in:%{DATA:inbridge} out:%{DATA:outbridge}, proto %{DATA:protocol}, %{IP:srcaddr}:%{DATA:srcport}-\>%{IP:dstaddr}:%{DATA:dstport}, len %{NUMBER:pktlength}",
      "message", "%{SYSLOGTIMESTAMP:time} %{IPV4:router} firewall,info %{DATA:rule}: in:%{DATA:inbridge} out:%{DATA:outbridge}, proto %{DATA:protocol}, \[%{IP:srcaddr}\]:%{DATA:srcport}-\>\[%{IP:dstaddr}\]:%{DATA:dstport}, len %{NUMBER:pktlength}"
     ]
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "rb2011-log"
  }
}

注意を要するところとしては、以下の様なところがありました。

  • glokプラグインのメッセージ取り込みの記述は、単一パターン時と複数パターン時で異なるので注意
    • 単一パターンだと{}で囲うが、複数パターンだと[]で囲う
    • 単一パターンだとmessageとパターンの間を=>でつなぐが、複数パターンだと,(カンマ)でつなぐ
  • output文内のelasticsearchプラグインの記述は、バージョン5.xを境に変わっている
    • 他Tipsサイトではバージョン差異で記述が異なっていたりするので注意すること
    • /var/log/logstash/logstash-plain.logにエラー出力されるので、比較的わかりやすいと思う

試行錯誤の仕方

私、いきなり先述したコンフィグが書けてたわけじゃありませんで、当然試行錯誤しています。ただ、いきなりelasticsearchに放り込んだらその後どうしたらよいかわからないため、事前にちゃんと振り分けができてるかどうかの確認を行いました。

ログの一部を取り敢えず拾っときます。

# tail /var/log/remote_log/rb2011_messages > ~/rb2011_messages

そして、コンフィグもどきを作ります。これを、test.confとかの名前で保存します。

input {
  stdin {}
}

filter {
  grok {
    match => [
      "message", "(何かしらのルール)",
      "message", "(何かしらのルール)",
      "message", "(何かしらのルール)",
      "message", "(何かしらのルール)"
    ]
  }
}

output {
  stdout { codec => rubydebug }
}

そして、以下のように取得した一部ログをlogstashに噛ませます。

# cat ~/rb2011_messages | /usr/share/logstash/bin/logstash -f test.conf

結果が標準出力に出てきます。少し時間がかかります。また、そもそも設定記載に問題があれば、1件も処理されずにエラー出力されます。

こんな風にできれば成功。ちゃんとデータを区切れています。

{
 "message" => "(ログメッセージ本体)",
 "time" => "Mar 8 14:28:41",
 "protocol" => "ICMP (type 130, code 0)",
 "srcaddr" => "****::****:****:****:****",
 "@version" => "1",
 "host" => "Ganesa",
 "outbridge" => "(unknown 0)",
 "pktlength" => "**",
 "dstaddr" => "****::1",
 "router" => "***.***.***.***",
 "rule" => "*****",
 "inbridge" => "*****",
 "srcmac" => "**:**:**:**:**:**",
 "@timestamp" => 2018-03-08T07:01:40.603Z
}

こんなふうに出た場合、何れのパターンにも当てはまらなかったことを示しています。
ログメッセージ本体を取り出し、定義したパターンのどこが当てはまらないかを確認する必要があると思われます。

{
 "message" => "(ログメッセージ本体)",
 "@version" => "1",
 "host" => "Ganesa",
 "@timestamp" => 2018-03-08T07:01:40.604Z,
 "tags" => [
 [0] "_grokparsefailure"
 ]
}

こうして試行錯誤を繰り返したら、config内容をFIXします。

logstashサービスの開始・自動起動有効化

logstashサービスを開始・自動起動を有効化します。これにより/var/logstash/conf.dの内容を読み取りログをさばいてくれるようになります。

# systemctl start logstash
# systemctl enable logstash

サービスが起動したら、ログを確認して状態を見てみませう。

# tail -f /var/log/logstash/logstash-plain.log

さてさて・・・

[2018-03-09T10:42:35,128][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.2.2"}
[2018-03-09T10:42:35,363][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2018-03-09T10:42:36,874][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2018-03-09T10:42:37,535][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2018-03-09T10:42:37,542][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://localhost:9200/, :path=>"/"}
[2018-03-09T10:42:37,946][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://localhost:9200/"}
 :
 :

と言う具合にINFOログを中心に流れれば取り敢えず大丈夫そう。
以下のようなログが出力された場合は、Configに何らかのミスが有ると思われます。

[2018-03-08T16:40:45,953][ERROR][logstash.agent ] 
 Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, 
 :exception=>"LogStash::ConfigurationError", 
 :message=>"Something is wrong with your configuration."
 (以下略)

私の場合、elasticsearchプラグインの記述がバージョン差異で変わっていることに気づいてないことが原因で、そこの構文エラーが発行されたようでした。

最終的には

本日(3/10)時点ですでにRB2011のある程度の解析ができるようになっていて、下図のようにログの要素から内容を掘り下げてみたりとか出来るようになりますた。何となく単純にログを出力させている箇所はSplunkにそっくりですね。

下図のようにKibanaを使ってVisualize設定を作成したり、Dashboardを作成したりすることによって、視覚的にトラフィックを把握することが出来るようになります。

Splunkはデータの取り込みなんかは比較的簡単にできますが、そこから機械学習とか使ってパターン学ばせたり、正規表現を直接書いたりなど、そうした所はめんどくさいのかなと。その点、glokがかなり便利で、結構直感的にパターン書きができるので、そういう所は楽です。

ただ、取り込みするための設定を書くのが個別パターンになると色々めんどくさいかなーと感じたりもしました。商用パッケージであるX-Packを組み込むことで、さらに色んな機能をもたせることができ、ログ解析基盤としての可能性は結構あるように思えました。(Splunkと違って、ログ解析にとどまらず様々な検索・分析機能を有するものなので、そもそもSplunkと同じ土俵に載せる自体が間違ってる気もしますが・・・てへ。)

気が向いたらKibanaの取扱についてもかければ書きたいと思うのですが・・。それにしてもElasticsearch面白いっす。もう少し他に色んなログを取り込んでみたいっすね。

また、Mastodon 2.3.0の正式版がタグ化されたら、全文検索を実装したいですね。サーバはこいつを使って出来るはずなので、やってみる価値はあるとおもってます。はい。

Tags:

Comments are closed

PAGE TOP