ELKで得られる情報を監視する
ElasticsearchとLogstash、Kibanaを組み合わせたインフラをELKと呼ぶそうです。
Elasticsearchとlogstashを使用してログの吸い上げ、解析を行い、Kibanaで可視化することをコレまでやってきてるわけですが、これに監視を加えようと思います。本来、X-Packという有償ツールを使えばそれが叶うようなのですが、それはコストの面から難しいため、なにかツールはないかと探したところ、Elastalertなるものを見つけました。お、何だかこれは便利そうだ!
Elastalertとは
Elasticsearchを監視し、特定の条件に従いアラート発報をするツールのようです。Pythonベースで作られてるようで、通知先としてはEmailに限らずSlackなんかにも通知が行えるようです。
今回、WAFのログ情報を監視し、ブログサイトへ何かしらの攻撃が来た際に通知をするようにしてみました。実は、セキュリティ保護・維持目的というよりは、攻撃が来た時WAFがどんなシグニチャに反応し、応答するのかの情報を抑えること(WAFの仕組みの理解)が目的です。もちろん、適切にブロックできてるかどうかというのを知るのにも役立てることを考えていました。
Python環境等、周辺環境の整備
ElastalertはPythonベースなので、この環境整備をします。プリインストールされてるPython2.xで動作可能なようなので、周辺環境で必要となるものを入れていきます。
■pipのインストール # curl -kL https://bootstrap.pypa.io/get-pip.py | python ■pipの動作確認 # pip -V pip 9.0.3 from /usr/lib/python2.7/site-packages (python 2.7) ■setuptoolsのアップグレード # pip install --upgrade setuptools ■その他必要となるコンポーネントのインストール # yum install gcc python-devel
Elastalertのインストール
Elastalertをインストールします。どうやらElasticsearchのバージョンを指定する必要があるようです。当方環境は6.2.2なので、6.x系のElastalertをインストールします。
# pip install "elasticsearch==6.0.0." elastalert
こんなふうに出てきたら取り敢えずインストール完了みたいです。
Successfully installed PyJWT-1.6.1 PyStaticConfiguration-0.10.3 PyYAML-3.12 argparse-1.4.0 blist-1.3.6 boto3-1.6.21 botocore-1.9.21 configparser-3.5.0 defusedxml-0.5.0 docopt-0.6.2 docutils-0.14 elastalert-0.1.29 envparse-0.2.0 exotel-0.1.5 funcsigs-1.0.2 functools32-3.2.3.post2 futures-3.2.0 jira-1.0.14 jmespath-0.9.3 jsonschema-2.6.0 mock-2.0.0 oauthlib-2.0.7 ordereddict-1.1 pbr-4.0.0 pyOpenSSL-17.5.0 requests-oauthlib-0.8.0 requests-toolbelt-0.8.0 s3transfer-0.1.13 simplejson-3.13.2 stomp.py-4.1.20 texttable-1.2.1 twilio-6.0.0
試しにコマンド実行し、ちゃんと動きそうだということを確認しときます。
# elastalert -h usage: elastalert [-h] [--config CONFIG] [--debug] [--rule RULE] [--silence SILENCE] [--start START] [--end END] [--verbose] [--patience TIMEOUT] [--pin_rules] [--es_debug] [--es_debug_trace ES_DEBUG_TRACE] optional arguments: (以下略)
Elastalertの設定
Elastalertに必要となるディレクトリを作ります。
# mkdir /var/lib/elastalert/rules # mkdir /etc/elastalert
コンフィグファイルを作成します。
------/etc/elastalert/config.yml # The Elasticsearch hostname for metadata writeback # Note that every rule can have its own Elasticsearch host es_host: localhost # The Elasticsearch port es_port: 9200 # Connect with TLS to Elasticsearch use_ssl: false # This is the folder that contains the rule yaml files # Any .yaml file will be loaded as a rule rules_folder: /var/lib/elastalert/rules # How often ElastAlert will query Elasticsearch # The unit can be anything from weeks to seconds run_every: minutes: 2 # ElastAlert will buffer results from the most recent # period of time, in case some log sources are not in real time buffer_time: minutes: 5 # The index on es_host which is used for metadata storage # This can be a unmapped index, but it is recommended that you run # elastalert-create-index to set a mapping writeback_index: elastalert_status # If an alert fails for some reason, ElastAlert will retry # sending the alert until this time period has elapsed alert_time_limit: minutes: 60 -----
Elastalertに必要となるインデックスの作成
# elastalert-create-index
こんなインデックスが作られていました。
- elastalert_status_past
- elastalert_status
- elastalert_status_status
- elastalert_status_error
- elastalert_status_silence
Elastalertのサービス化
Elastalert単独ではフォアグラウンドでしか動かないため、コレをサービス化します。どうやらこれはsupervisordというツールを使ってサービス化するようです。
# pip install supervisor
supervisordの設定を行います。
-----/etc/supervisord.conf [unix_http_server] file=/var/run/supervisor.sock [supervisord] logfile=/var/log/supervisord.log logfile_maxbytes=1MB logfile_backups=2 loglevel=warn nodaemon=false directory=%(here)s [inet_http_server] port = 127.0.0.1:9001 [rpcinterface:supervisor] supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface [supervisorctl] serverurl=unix:///var/run/supervisor.sock [program:elastalert] command=elastalert --config /etc/elastalert/config.yml --verbose process_name=elastalert autorestart=true startsecs=15 stopsignal=TERM stopasgroup=true killasgroup=true redirect_stderr=false stderr_logfile=/var/log/supervisord/elastalert_stderr.log stderr_logfile_maxbytes=1MB stderr_logfile_backups=5 -----
supervisord自体もinitdでサービス化させます。
■supervisordの起動スクリプトを入手 # curl -o /etc/rc.d/init.d/supervisord https://raw.githubusercontent.com/Supervisor/initscripts/master/redhat-init-equeffelec # chmod 755 supervisord ■サービスとして登録する # chkconfig --add supervisord ■このようにして起動する(実際は設定等突っ込んでテストしたあとで良いと思う) # service supervisord start # supervisorctl start elastalert
ルールの作成
ルールは/var/lib/elastalert/rules配下に配置します。サービス起動した後、ここに配置されているYAMLファイルを読み込み、Elastalertが動くことになります。
記載例を以下の通り示します。
----- /var/lib/elastalert/rules/asm_alert.yaml index: log-virocana-asm filter: [] name: "Attack to WWW.BLUECORE.NET" type: blacklist compare_key: "violation_rating" blacklist: - "1" - "2" - "3" - "4" - "5" timeframe: minutes: 1 timestamp_field: "time" alert: - "email" alert_subject: "[WWW.BLUECORE.NET] ASMからの攻撃通知" email: - "***@bluecore.net" - "***@***.com" email_from_field: "Elasticsearch" email_add_domain: "@bluecore.net" smtp_host: "192.168.***.***" -----
内容としては以下のような感じです。
- index: 監視対象インデックスを指定します。
- filter: フィルタ条件があれば記載します。なければ[](大かっこ開き閉じ)を記載します。
- name: 任意のルール名を記載します。
- type: 監視形式を記載します。
- 今回は「該当したキーワードを拾ったら発報する」というblacklistを使っています。
- compare_key: blacklistにマッチするかを調べるフィールド名を記載します。
- blacklist: ブラックリストをリスト記載します。
- 今回violation_rateを監視し、0以外を発報するようにします。
- 本当はwhitelistが望ましいのですが、うまく動かなかったことから渋々blacklistにしています。
- timeframe: 監視間隔を提議します。ここでは1分間隔にしています。
- timestamp_field: timestampとして取り扱うフィールドを指定します。
- ここではSYSLOG受信時刻を記録するフィールドである「time」を指定しています。
- これは、logstashで定義するフィールド名によって左右されます。
- alert: 発報形式を指定します。ここでは電子メールを指定しています。
- alert_subject: 電子メールの件名を指定します。日本語いけます。
- email: 電子メール送信先を指定します。
- リスト記載することで複数の送信先指定が可能です。
- email_from_field: 電子メールのfrom欄を指定します。
- email_add_domain: 電子メールのfromにドメイン名を付与します。
- smtp_host: SMTPサーバを指定します。
ルールのテスト
ルールのテストコマンドがあります。
■ディレクトリへ移動 # cd /var/lib/elastalert/rules ■テスト実行 # elastalert-test-rule --config /etc/elastalert/config.yml big-ip_asm_alert.yaml --day 13
すると、こんな出力が行われます。上記コマンドでは、過去13日間のログを対象にelasalertの処理を実行します。オプションに「–alert」を加えることで、実際に発報テストをすることも可能です。
・・・・・・(中略)・・・・・・ INFO:elastalert:Queried rule Attack to WWW.BLUECORE.NET from 2018-04-07 10:21 JST to 2018-04-07 10:26 JST: 5836 / 0 hits INFO:elastalert:Queried rule Attack to WWW.BLUECORE.NET from 2018-04-07 10:26 JST to 2018-04-07 10:31 JST: 5837 / 1 hits INFO:elastalert:Queried rule Attack to WWW.BLUECORE.NET from 2018-04-07 10:31 JST to 2018-04-07 10:36 JST: 5841 / 4 hits INFO:elastalert:Queried rule Attack to WWW.BLUECORE.NET from 2018-04-07 10:36 JST to 2018-04-07 10:41 JST: 5845 / 4 hits INFO:elastalert:Queried rule Attack to WWW.BLUECORE.NET from 2018-04-07 10:41 JST to 2018-04-07 10:46 JST: 5845 / 0 hits INFO:elastalert:Queried rule Attack to WWW.BLUECORE.NET from 2018-04-07 10:46 JST to 2018-04-07 10:51 JST: 5845 / 0 hits INFO:elastalert:Queried rule Attack to WWW.BLUECORE.NET from 2018-04-07 10:51 JST to 2018-04-07 10:56 JST: 5845 / 0 hits INFO:elastalert:Queried rule Attack to WWW.BLUECORE.NET from 2018-04-07 10:56 JST to 2018-04-07 11:01 JST: 5846 / 1 hits INFO:elastalert:Queried rule Attack to WWW.BLUECORE.NET from 2018-04-07 11:01 JST to 2018-04-07 11:06 JST: 5847 / 1 hits INFO:elastalert:Alert for Attack to WWW.BLUECORE.NET at 2018-04-03T14:44:23Z: INFO:elastalert:Attack to WWW.BLUECORE.NET @timestamp: 2018-04-03T14:44:23.000Z @version: 1 _id: 3Tz4i2IBown-vw_HIzsT _index: log-virocana-asm _type: doc : : : (中略) : : : INFO:elastalert:Ignoring match for silenced rule Attack to WWW.BLUECORE.NET INFO:elastalert:Ignoring match for silenced rule Attack to WWW.BLUECORE.NET Would have written the following documents to writeback index (default is elastalert_status): silence - {'rule_name': 'Attack to WWW.BLUECORE.NET', '@timestamp': datetime.datetime(2018, 4, 7, 2, 7, 21, 22673, tzinfo=tzutc()), 'exponent': 0, 'until': datetime.datetime(2018, 4, 7, 2, 8, 21, 22654, tzinfo=tzutc())} elastalert_status - {'hits': 5847, 'matches': 3, '@timestamp': datetime.datetime(2018, 4, 7, 2, 7, 21, 66200, tzinfo=tzutc()), 'rule_name': 'Attack to WWW.BLUECORE.NET', 'starttime': datetime.datetime(2018, 3, 25, 2, 6, 47, 269394, tzinfo=tzutc()), 'endtime': datetime.datetime(2018, 4, 7, 2, 6, 47, 269394, tzinfo=tzutc()), 'time_taken': 33.78481388092041}
まず対象インデックスの検索を行い、ログを洗います。最終的に、パターンマッチしたデータの列挙と結果の表示が行われます。ちゃんと動いてそうだったら、サービス起動して自動起動をONにしてはい、できあがり。
実際に発報させてみた
実際に発報させた結果はこんな感じです。
Attack to www.bluecore.net @timestamp: 2018-04-06T05:07:14.000Z @version: 1 _id: IN9amWIBH69oBVFjpMgB _index: log-virocana-asm _type: doc attack_type: Predictable Resource Location date_time: 2018-04-06 14:07:13 dest_ip: 10.5.0.152 dest_port: 80 geo_location: N/A geoip: { "city_name": "San Diego", (省略) } host: Ganesa ip_client: 10.5.0.1 ipversion: 4 message: (省略) method: GET num_hits: 4 num_matches: 1 path: /var/log/remote_log/asm_messages policy_name: (省略) protocol: HTTP request: GET (省略) request_status: blocked response: Request was blocked response_code: 0 route_domain: 0 severity: Error sig_names: Unix hidden (dot-file) access sig_set_names: {Systems: PHP, Apache, MySQL...} src_port: 43741 syslog_received_at: 2018-04-06T05:07:56.449Z time: 2018-04-06T05:07:14Z type: asm-message uri: /.well-known/security.txt utm: virocana.bluecore.net violation_details: (省略) violation_rating: 1 violations: Attack signature detected virus_name: N/A x_forward_for_utm: 10.5.0.1 x_forwarded_for_cloudflare: (省略) x_forwarded_for_fqdn: census12.shodan.io x_forwarded_for_realip:(省略)
一通りの情報がメールに載ってやってきます。上記はセキュリティ脆弱性の情報を提供するshodanが脆弱性スキャンをした形跡のようです。上記の場合、violation_rateは1、ドット付きファイルへのアクセスを試みている点がシグニチャにひっかかったようです。
実際動かしてみて
現状、コレ含めて2件ほど反応、発報していることを確認しています。こうして形跡とかを様々眺めることで、どんな攻撃が来てるかを掴んだりすることが可能になりました。分析基盤だけだとどうしても「そもそもどんな物があるのか」を見つけるのが大変なのですが、この発報構成を組むことで、取っ掛かりの対応がだいぶ楽になった感じです。
当方のブログはCloudFlareのフリー版を使ってますが、今使ってるWAFはx_forwarded_forヘッダを使ってアクセス元、対応したCloudFlareリバースプロキシ、UTMのアドレスというふうに全経路を抑えることが可能なので、そこから分割してアクセス元を割り出して対応しています。
logstashのgrokフィルタをもう少しネリネリすると、もっとディープに色々な情報が掘れるのかもしれませんね。そういう意味で、こうして発報された情報をもとに、logstashの設定を見直す、或いは負荷低減のためのチューニングをするなどしつつフィードバックしていくと、ELK基盤もより良い感じになっていくんじゃないかと思います。
Comments are closed