[Security] [Python]mod_securityの検出状態を可視化する

意外とない可視化ツール

現在、自宅環境内の統合リバースプロキシはNginx + Mod_Security + LDAP連携を用いて動かしているのですが、Mod_Securityの検出状況が捉えづらく、これを何とか可視化できないか?と思って探すものの、なかなか良い物が見つからず。

  • AuditConsole
    Mod_Securityのログ変換が対応できてない(OWASP2.9ならいけそう)
  • WAF-FLE
    どうしてもmlog形式で出力したログが、WAF-FLE上のControllerに展開できない

と言うかそもそも、こうした可視化ツールが大分昔に開発が停滞しているのか、そもそもやる気を失ったのかというところで、WAF-FLEも0.6.4が最後で、2015年頃を最後に更新が止まっているようです。

Pythonに頼ることに

で、ちょうど面白い物を見かけたのですが、これっす。

Modsecurity (3.0) + Elasticsearch + Kibanaでログを可視化する | セキュリティブログ
WAF等に用いられるModSecurityのログですが、可視化するのは意外に面倒くさいです。ここでは、ModSecurity(3.0)のログをElasticsearchに入れKibanaで可視化する方法をまとめます。具体的にはmodsecurity-to-elasticsearchのPythonスクリプトを修正して利用...

そしてこの中で紹介されている以下のもの。

theMiddleBlue/modsecurity-to-elasticsearch
Very simple and primitive Python script that sends ModSecurity JSON Audit Logs to Elasticsearch - theMiddleBlue/modsecurity-to-elasticsearch

これらを活用して、Elasticsearchで見ることが出来ないかどうかの確認をしてみました。
ちなみにこのスクリプト、Python2.xで動作する感じになってるようです。

Mod_SecurityでJSON出力を可能にする

まず、Mod_Security環境ですが、私はNginx/Mod_Security共にソースからコンパイルした物を実装しています。その際、JSON出力を想定した作りにはしていなかったので、まずはそこを再構築する必要があります。

今回、あらかじめMod_Securityのソースディレクトリへ移動した上で、configure実行時に以下の引数を加えることでこれを可能にします。YAJLというライブラリを使うことで、JSON出力が可能になるようです。

yum -y install gcc pcre-devel libxml2-devel yaji-devel libcurl-devel yajl-devel
./configure --enable-mlogc --with-yajl
make
make install

その上で、modsecurity.confに以下の設定変更を行います。

SecAuditLogFormat JSON

私はそのほか、以下の設定を加えています

SecAuditEngine RelevantOnly
SecAuditLogRelevantStatus "^[0-9]+"
SecAuditLogType concurrent
SecAuditLog /var/log/mlog2waffle/modsec_audit.log
SecAuditLogStorageDir /var/log/mlog2waffle/data

元々WAF-FLEで可視化をしようとしていたので、その名残が生きていますが、上記の設定を行うことでSecAuditLogStorageDirで指定したパスへ、ディレクトリ構造が組まれ、分単位で検知データが保存されるようになります。

JSONパーサーを作る

と言っても、先述したGithubで公開されたソースを流用しています。そのままでは動いてくれなくて、それなりに改修要素が必要でした。

#!/usr/bin/env python
#
#

import sys, os, getopt, json, time
from datetime import datetime,date
from elasticsearch5 import Elasticsearch

# Please, check the elasticsearch URL below:
es = Elasticsearch(['http://elasticsearch:9200'])

# parse arguments
opts, args = getopt.getopt(sys.argv[1:],"hd:",["help","log-directory="])
for i in opts:
        if i[0] == "-d" or i[0] == "--log-directory":
                basedir = i[1]

# set headers name to lowercase
def renameKeys(iterable):
    if type(iterable) is dict:
        for key in iterable.keys():
            iterable[key.lower()] = iterable.pop(key)
            if type(iterable[key.lower()]) is dict or type(iterable[key.lower()]) is list:
                iterable[key.lower()] = renameKeys(iterable[key.lower()])
    elif type(iterable) is list:
        for item in iterable:
            item = renameKeys(item)
    return iterable

# parsing...
def parseLogFile(file):
        # define the index mapping
        settings = {
                "settings": {
                        "number_of_shards": 1,
                        "number_of_replicas": 0
                },
                "mappings": {
                        "modsecurity": {
                                "properties": {
                                        "unixts": {
                                                "type": "date"
                                        }
                                }
                        }
                }
        }

        # set all dict keys to lower
        #d = renameKeys(json.load(open(file,'r','utf-8','ignore')))
        try:
                fi = open(file,'rb')
                fir = fi.read()
                d = renameKeys(json.loads(fir))
                # create a unixts field as a timestamp field
                d['transaction']['unixts'] = int(d['transaction']['unique_id'][0:14].replace('.',''))
        except:
                print "Cannot Analyze or Write Log"
                #print (d)
                print (fir)
        else:
                # create 1 index per day... you could change it
                # if you need to store all logs in a single index:
                index = 'modsecurity_' + str(date.today()).replace('-','')

                # because objects in array are not well supported,
                # redefine all "messages" params and values in "msg"
                new_messages = []
                new_ruleid = []
                new_tags = []
                new_file = []
                new_linenumber = []
                new_data = []
                new_match = []
                new_severity = []

                d['transaction']['msg'] = {}

                for i in d['transaction']['messages']:
                        new_messages.append(i['message'])
                        new_ruleid.append(i['details']['ruleid'])

                        for tag in i['details']['tags']:
                                if tag not in new_tags:
                                        new_tags.append(tag)

                        new_file.append(i['details']['file'])
                        new_linenumber.append(i['details']['linenumber'])
                        new_data.append(i['details']['data'])
                        new_match.append(i['details']['match'])
                        new_severity.append(i['details']['severity'])

                d['transaction']['msg']['message'] = new_messages
                d['transaction']['msg']['ruleid'] = new_ruleid
                d['transaction']['msg']['tags'] = new_tags
                d['transaction']['msg']['file'] = new_file
                d['transaction']['msg']['linenumber'] = new_linenumber
                d['transaction']['msg']['data'] = new_data
                d['transaction']['msg']['match'] = new_match
                d['transaction']['msg']['severity'] = new_severity

                # remove old messages list
                del d['transaction']['messages']

                # if index exists noop, else create it with mapping
                if es.indices.exists(index):
                        indexexists=True
                else:
                        es.indices.create(index=index, ignore=400, body=settings)

                # write the log
                res = es.index(index=index, doc_type="modsecurity", body=d['transaction'])

                # check if log has been created
                if res['result'] == 'created':
                        os.remove(file)
                        print "Parsed "+str(file)
                else:
                        print "Warning: log not created:"
                        print res

while True:
        for root, subFolders, files in os.walk(basedir):
                for file in files:
                        logfile = os.path.join(root, file)
                        parseLogFile(file=logfile)

        print "Sleeping for a while..."
        time.sleep(5)

改修した箇所は以下の通りです。

  • 今回使ったElasticsearchのバージョンが5.6系でしたので、それ用のライブラリが必要です。「pip install elasticsearch5」にてインストールを行っています。
  • JSONをファイルから取り出す際の処理として、51行目以降ですが、例外処理を追加しています。
    これは、出力されたログの中にエンコード処理が失敗したケースがあり、それが原因で処理がスタックするためです。
    失敗した場合、一体どこが問題だったのかを追及できるよう、ファイル内のJSONデータをログに出力するよう構成しています。
  • SIOSのサイトに記載されている(引用URL1)内容に従い、56行目、115行目の記述を修正しています。

サービス化

このPythonスクリプトですが、AuditStorageパス内のファイルを自立的に探索し、ファイルが存在すれば読み出すという動きをします。つまりはこのスクリプト自体待機状態を保持できるスクリプトだと言うことで、Systemdを用いてサービス化します。

以下のようなファイルを/usr/lib/systemd/system/modsec-parser.services と言うファイルとして作成します。

[Unit]
Description=Mod Security Log Parser
After=syslog.target
After=network.target

[Service]
Environment="ADTLOG=/var/log/mlog2waffle/data"
Type=simple
#Restart=on-failure
PIDFile=/run/parser-process.pid
KillMode=control-group
ExecStart=/usr/local/sbin/modsec_parser_el5.py -d $ADTLOG
#ExecReload=/bin/kill -s HUP $MAINPID
ExecStop=/bin/kill -SIGTERM $MAINPID
RestartSec=10s

[Install]
WantedBy=multi-user.target

で、systemdへ反映、開始すれば、実行ログも/var/log/messagesへ送られるようになりますし便利です。

systemctl daemon-reload
systemctl start modsec-parser.services

-----/var/log/messages を見るとこんなかんじっす
Oct 12 14:02:06 Anisakis modsec_parser_el5.py: Parsed /var/log/mlog2waffle/data/20191012/20191012-1401/20191012-140155-157085651573.800068
Oct 12 14:02:06 Anisakis modsec_parser_el5.py: Parsed /var/log/mlog2waffle/data/20191012/20191012-1401/20191012-140155-157085651557.343314
Oct 12 14:02:06 Anisakis modsec_parser_el5.py: Sleeping for a while...
Oct 12 14:02:06 Anisakis modsec_parser_el5.py: Parsed /var/log/mlog2waffle/data/20191012/20191012-1401/20191012-140158-157085651840.214759
Oct 12 14:02:06 Anisakis modsec_parser_el5.py: Sleeping for a while...
Oct 12 14:02:06 Anisakis modsec_parser_el5.py: Parsed /var/log/mlog2waffle/data/20191012/20191012-1402/20191012-140203-157085652398.515713

これでElasticsearchへ送られるはず!ということで、Kibanaで状態を確認します。

無事ログが取得でけたの図

そしてVisualizeやらDashboardやら

後は可視化しやすいと考えられるグラフやテーブルなどを使用して、アクセス統計であったり検知数であったりの細かいパーツの追加等を行っていきます。

左は自分的に必要と考えられるデータをDashboardに並べた物になります。左上から、「期間中のアクセス数」「アクセス先となるVirtual Server」「アクセス数推移」「検知ルールの件数グラフ」「検知されたルール一覧と件数」「アクセス元IP」となっています。

よく検知してたルールを読み解いてみる

今回、特に目立って検知が多かったのはOWASP 3.0の920420と言うルールでした。
これは「Request content type is not allowed by policy」というContent Typeのヘッダ設定がポリシー違反を起こしたことを示すルールに該当します。何故これが検知されたのかを読み解いてみることにしました。

Mod_Security上のmsg.matchを確認すると、こんな記述がありました。

Matched "Operator `Rx' with parameter
 `^application/x-www-form-urlencoded|
   multipart/form-data|
   text/xml|
   application/xml|
   application/soap+xml|
   application/x-amf|
   application/json|
   application/octet-stream|
   application/csp-report|
   application/xss- (26 characters omitted)'
  against variable `TX:0' (Value: `application/activity+json' )

上記、ずらずら羅列されているのは、Mod_Securityが想定しているContents-Type値であり、どうやら末尾にある「application/activity+json」という値が想定外であることを示しているようです。実はこのアクセス先はActivity Pubと言うソフトウェアを用いて作成されたSNSサイトを指していて、つまりはMod_SecurityはActivity Pubのことをどうやら知らなかったようです。(あ、ちなみにMisskeyというSNSツールが動いてます)

そのため、「なんだか知らないContent-Typeをリクエストしてきやがったぞ!」とMod_Securityは判断してRuleId920420で引っかけてしまったようです。これ自体は全くもってActivity Pubで正当なアクセスであり、920420は誤検知であると判断して良さそうなので、一旦ルールを適用対象から除外することにしました。

その後、DetectOnlyなルールをOnへ切り替え、現在正しい形でMod_Securityは動作することが出来ているように見えます。

可視化は是非積極的に行おう

こうしたセキュリティツールですが、組み立てるまでは行った後、結構おざなりになるケースが多く、自宅だとなおさら・・・・ってな感じによくなります。元々私自身セキュリティみたいに、どっから来るか分からない敵を相手にするのはどうも苦手でして・・・発想力も足りないですからねハッハッハ・・・(・∀・)

でも、可視化してみると結構動作の流れなどがつかみやすく、応用に向けた思考も働かせやすくなります。そして何より楽しいっすよ。連携などが絡むとなおさらでありまして、現在とっても便利なElasticsearchってツールもあるわけでして。

また、JSONの扱いを通じて、今をときめくKey-Valueストアの扱いも学ぶことが出来ます。これもこれでかなり色々役に立ちますので、是非一度深掘り勉強してみることをおすすめしておきます。

その他、この仕組みを作るにあたって、複数のツール連携やOS側のSystemdとの連携など、かなりいろんな物を組み合わせて作っており、その殆どをリファレンスやヘルプ情報をその時に探し出して組んだりしてますが、ノウハウも記憶力もない私でもこうして組める感じですので、着実にステップを踏んで作っていくと、恐らくは誰もが対応できるのではないかななんて。こうした工夫のノウハウというのは、今後のエンジニアの必須スキルという風になるかもしれないので、そうした意味でも早めの視野拡張をお勧めします。

タイトルとURLをコピーしました