mattintosh note

技術書典に出たい人生だった…

DNS サーバのログを Elasticsearch と Kibana で可視化する

自宅で DNS サーバに Unbound を使っているのだけど立ててるだけで特に監視していないので何か遊んでみようと考えた。

久しぶりに Fluentd を使おうと思ったらバージョンが変わっていて conf の書式にハマった。

  • Unbound 1.9.0
  • fluentd 1.4.0
  • Python 3.5.3
  • Elasticsearch 6.5.2
  • Kibana 6.5.2

こんな感じで必要なものを配置していく。

fluent/
|
+-- fluent.conf
|
+-- geoip_unbound.py
|
+-- GeoLite2-City.mmdb

Fluentd tail Input Plugin で Unbound のログを解析する

まずは Unbound のログの解析。ログファイルから tail プラグインを使って読み込む。

Unbound のログは log-queries を有効にしておいて infoqueryreply の区別が付くようにしておく。log-time-ascii: yes になっていると年が入らないので log-time-ascii: no(デフォルト)でタイムスタンプとして出力されるようにしておく。

unbound.conf

log-time-ascii: no
log-queries: yes
log-tag-queryreply: yes

Unbound ログサンプル

[1551678596] unbound[12266:0] query: 127.0.0.1 yahoo.co.jp. A IN
[1551678596] unbound[12266:0] info: resolving yahoo.co.jp. A IN
[1551678596] unbound[12266:0] query: 127.0.0.1 yahoo.co.jp. AAAA IN
[1551678596] unbound[12266:0] info: resolving yahoo.co.jp. AAAA IN
[1551678596] unbound[12266:0] info: response for yahoo.co.jp. A IN
[1551678596] unbound[12266:0] info: reply from <.> 8.8.8.8#53
[1551678596] unbound[12266:0] info: query response was ANSWER
[1551678596] unbound[12266:0] info: response for yahoo.co.jp. AAAA IN
[1551678596] unbound[12266:0] info: reply from <.> 8.8.8.8#53
[1551678596] unbound[12266:0] info: query response was nodata ANSWER
[1551678596] unbound[12266:0] query: 127.0.0.1 206.135.79.183.in-addr.arpa. PTR IN
[1551678596] unbound[12266:0] info: resolving 206.135.79.183.in-addr.arpa. PTR IN
[1551678596] unbound[12266:0] info: response for 206.135.79.183.in-addr.arpa. PTR IN
[1551678596] unbound[12266:0] info: reply from <.> 8.8.4.4#53
[1551678596] unbound[12266:0] info: query response was ANSWER

Unbound のログの内、解析対象はとりあえずクエリの A レコードと PTR レコードとする。ログのすべてが解析対象にならないため、パターンに一致しない行がワーニングとして [warn]: pattern not match: とダラダラと出てきてしまう。これは @log_levelerror 以上を指定すれば出てこなくなる。expression のテストが終わったらログレベルを上げておく。

fluent.conf

<source>
  @type       tail
  @log_level  error
  tag         unbound.log
  path        /var/log/unbound/unbound.log
  pos_file    /tmp/fluentd_unbount.log.pos
  <parse>
    @type       regexp
    expression  ^\[(?<time>\d+)\] unbound\[\S\] query: (?<client>\S+) (?<host>\S+)\. (?<record_type>(A|PTR)) IN$
    time_key    time
    time_type   unixtime
  </parse>
</source>

# 出力確認用
<filter unbound.log>
  @type stdout
</filter>

🤔 @timestamp フィールドを作るか作らないか

Elasticsearch プラグインlogstash_format を有効にする場合、プラグイン側で自動的に @timestamp フィールドを作成してくれるのでここで用意しておく必要はない。外部フィルターで時間に対して何かしらの処理をしたい場合や logstash_format を使わない場合はここで @timestamp フィールドを作成しておくといいかもしれない。

🤔 時間フィールドが消えてしまう問題

time_key で時間として指定したフィールドはデフォルトだと消えてしまうので残しておきたい場合は keep_time_key を有効にする。

🤔 数値が数値型ではなく文字列型になってしまう問題

キーの型を指定したい場合は typesキー名:型 と指定する必要があるようだ。ここでは数値型にしたいので @timestamp:integer とする。

tail Input Plugin の出力サンプル

2019-03-03 21:02:31.000000000 +0900 unbound.log: {"client":"192.168.1.10","host":"ssl.gstatic.com", "record_type": "A"}

exec_filter でホスト名から国や緯度経度情報を取得するフィルターを作る

以前は GeoIP モジュールを使っていたけど今回は GeoLiteCity.dat ではなく GeoLite2-City.mmdb を使いたかったので maxminddb モジュールを使うことにした。GeoIP だと record_by_name() でホスト名から情報を拾えるんだけど maxminddbget() で IP アドレスを渡すくらいしかできないので socket.gethostbyname() を通して IP アドレスを渡している。

最新の GeoLite2-City.mmdb は以下からダウンロードできる。

このスクリプトには重大な欠点がありましたが、maxminddb モジュールの使い方メモとして残しています。DNS レコードタイプ別の設定をする以前のものですのでその辺りも異なります。

my_geoip.py (ボツ)

#!/usr/bin/env python3
import sys
import json
import socket
import maxminddb
reader = maxminddb.open_database('GeoLite2-City.mmdb')
while True:
    line = sys.stdin.readline()
    d = json.loads(line)
    g = reader.get(socket.gethostbyname(d['host']))
    d.update({
        'country': {
            'iso_code': g['country']['iso_code'],
        },
        'location': {
            'lat': g['location']['latitude'],
            'lon': g['location']['longitude'],
        },
    })
    sys.stdout.write(json.dumps(d))

🤐 重大な欠点があったため作り直した

socket.gethostbyname()DNS サーバに問い合わせたらそれも Unbound のログに出力されて無限ループじゃんアホかい!」ってなったので作り直した。使うモジュールも maxminddb じゃなくて geoip2 に変更した。

geoip2 も引数にアドレスを指定しなければいけないのは変わらないのでホスト名からなんとかアドレスを得る。dnspython というモジュールを使うと問い合わせ先の DNS サーバを指定できるとのことだったのでこれを使えば Unbound を経由させずに済む。(ただし無駄は多い)

geoip2 のデータは geoip2.models.City というオブジェクトで返ってくるんだけど、raw というプロパティを持っているのでそこを引っこ抜けば JSON に変換できるものが全部出てくる。

geoip2 の使い方例

>>> from geoip2.database
>>> from pprint import pprint
>>> reader = geoip2.database.Reader('GeoLite2-City.mmdb')
>>> response = reader('54.70.157.111')
>>> pprint(response.raw)
{'city': {'geoname_id': 5714964, 'names': {'en': 'Boardman', 'ru': 'Бордман'}},
 'continent': {'code': 'NA',
               'geoname_id': 6255149,
               'names': {'de': 'Nordamerika',
                         'en': 'North America',
                         'es': 'Norteamérica',
                         'fr': 'Amérique du Nord',
                         'ja': '北アメリカ',
                         'pt-BR': 'América do Norte',
                         'ru': 'Северная Америка',
                         'zh-CN': '北美洲'}},
 'country': {'geoname_id': 6252001,
             'iso_code': 'US',
             'names': {'de': 'USA',
                       'en': 'United States',
                       'es': 'Estados Unidos',
                       'fr': 'États-Unis',
                       'ja': 'アメリカ合衆国',
                       'pt-BR': 'Estados Unidos',
                       'ru': 'США',
                       'zh-CN': '美国'}},
 'location': {'accuracy_radius': 1000,
              'latitude': 45.8491,
              'longitude': -119.7143,
              'metro_code': 810,
              'time_zone': 'America/Los_Angeles'},
 'postal': {'code': '97818'},
 'registered_country': {'geoname_id': 6252001,
                        'iso_code': 'US',
                        'names': {'de': 'USA',
                                  'en': 'United States',
                                  'es': 'Estados Unidos',
                                  'fr': 'États-Unis',
                                  'ja': 'アメリカ合衆国',
                                  'pt-BR': 'Estados Unidos',
                                  'ru': 'США',
                                  'zh-CN': '美国'}},
 'subdivisions': [{'geoname_id': 5744337,
                   'iso_code': 'OR',
                   'names': {'de': 'Oregon',
                             'en': 'Oregon',
                             'es': 'Oregón',
                             'fr': 'Oregon',
                             'ja': 'オレゴン州',
                             'pt-BR': 'Oregão',
                             'ru': 'Орегон',
                             'zh-CN': '俄勒冈州'}}],
 'traits': {'ip_address': '54.70.157.111'}}

geoip2 で得られる位置情報は location.latitudelocation.longitude となっているのでこれを location.latlocation.lon に変更する必要がある。また、Elasticsearch で geo_type に指定したフィールドは latlon 以外のフィールドを入れるとエラーになってしまうようなので time_zone などのフィールドを削除するか、geo_point 用のフィールドを別に作成する必要がある。

今回はとりあえず raw から location を引っこ抜いて latitudelongitude だけ戻すことにした。

my_geoip.py

#!/usr/bin/env python3
import sys
import json

import geoip2.database
reader = geoip2.database.Reader('GeoLite2-City.mmdb')

from dns import resolver
resolv = resolver.Resolver()
resolv.nameservers = ['8.8.8.8']

while True:
    line = sys.stdin.readline()
    if len(line) < 1:
        continue
    source = json.loads(line)
    if None:
        pass
    elif source['record_type'] == 'A':
        ip = [x.address for x in resolv.query(source['host'])][0]
    elif source['record_type'] == 'PTR':
        ip = '.'.join(reversed(source['host'].split('.')[0:4]))

    geo      = reader.city(ip)
    geo_raw  = geo.raw
    location = geo_raw.pop('location')
    geo_raw.update({
        'location': {
            'lat': location['latitude'],
            'lon': location['longitude'],
        }
    })
    source.update(geo_raw)
    sys.stdout.write(json.dumps(source))

GeoIP フィルターとやりとりする部分を書いていく。バッファはメモリに配置。<format></format><parse></parse> が以前のバージョンになかったので悩んだ。chunk_limit_records は控えめに 100 くらいからスタート。

Elasticsearch プラグインlogstash_format を使う場合、外部フィルターから返ってきた JSON から再度時間を抽出する必要がある。

fluent.conf

<match unbound.log>
  @type   exec_filter
  tag     exec.unbound
  command ./geoip_unbound.py
  child_respawn inf
  <format>
    @type json
  </format>
  <parse>
    @type json
  </parse>
  <buffer>
    @type memory
    retry_max_times 0
    chunk_limit_records 100
  </buffer>
</match>

# 出力確認用
<filter exec.unbound>
  @type stdout
</filter>

🤔 何かデータ入ってこないけど、もしかして Python 死んでない?

暫く Fluentd を動かしているとリトライが発生し、それが発生するとその後 Elasticsearch にまったくデータが入らなくなるということがあった。最初は Elasticsearch のパフォーマンスの問題かと思い Fluentd を都度再起動していたが、実は外部コマンドの Python でエラーが発生し、しかも異常終了や再起動せずに Fluentd がひたすらリトライをするためだった。(ps コマンドで見ても Python が生きているように見えてしまう)

child_respawn inf を設定しておけばプロセスが終了してしまった場合に立ち上げなおしてくれる。一度止まった場合、今回は再試行する意味がないので retry_max_times 0 で即時再起動するようにしている。

Respawn command when command exit. Default is disabled. If you specify a positive number, try to respawn until specified times. If you specify inf or -1, try to respawn forever.
一部のレコードでエラーが発生しても動き続けてしまうのでしっかりデバッグを行ってから設定の有無を決めた方が良い。

Elasticsearch のマッピングをする

Fluentd から Logstash 形式でデータを投入する場合、日付ベースで自動的にインデックスが作成されるが、マッピングをしたい場合は template_file などを使うらしい。今回は緯度経度情報を扱うため、位置情報フィールドに対して geo_point 型の指定が必須になる。

Fluentd の Elasticsearch プラグインの仕様が変わると面倒だったので、Elasticsearch 側にテンプレートを用意して自動的に適用されるようにしておくことにした。データ量が多くなることに備えて refresh_interval をデフォルトの 1s から 30s に変更。string 型のデータは解析する必要もないので text ではなく keyword で入るようにしておく。_allfield_names のフィールドは使わないので無効にしている。

@timestampepoch_second を使う場合はここで設定しておけばよい。

Kibana - DevTool

PUT _template/unbound
{
  "index_patterns": "unbound-*",
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0,
    "refresh_interval": "30s"
  },
  "mappings": {
    "_doc": {
      "_all": {
        "enabled": false
      },
      "_field_names": {
        "enabled": false
      },
      "dynamic_templates": [
        {
          "strings": {
            "match_mapping_type": "string",
            "mapping": {
              "type": "keyword"
            }
          }
        },
        {
          "geo_point": {
            "match": "location",
            "mapping": {
              "type": "geo_point"
            }
          }
        }
      ]
    }
  }
}

Fluentd から Elasticsearch にデータを投入する部分を書く

マシンが非力なので request_timeout をデフォルトの 5s から 30s に変更。type_name を指定しないと fluentd という名前になってしまうので type_name _doc を指定しておく。今回は logstash_format を有効にするのでレコードの @timestamp フィールドは自動的に作成される。

fluent.conf

<match exec.unbound>
  @type           elasticsearch
  hosts           localhost:9200
  type_name       _doc
  logstash_format true
  logstash_prefix unbound
  request_timeout 30s
  <buffer>
    flush_thread_count 2
  </buffer>
</match>

Elasticsearch に投入されたデータサンプル

{
  "_index": "unbound-2019.03.04",
  "_type": "_doc",
  "_id": "JrzHR2kBVp0AiN9YlA3_",
  "_score": 1,
  "_source": {
    "city": {
      "geoname_id": 5714964,
      "names": {
        "en": "Boardman",
        "ru": "Бордман"
      }
    },
    "country": {
      "geoname_id": 6252001,
      "iso_code": "US",
      "names": {
        "en": "United States",
        "de": "USA",
        "es": "Estados Unidos",
        "ja": "アメリカ合衆国",
        "ru": "США",
        "pt-BR": "Estados Unidos",
        "zh-CN": "美国",
        "fr": "États-Unis"
      }
    },
    "continent": {
      "geoname_id": 6255149,
      "code": "NA",
      "names": {
        "en": "North America",
        "de": "Nordamerika",
        "es": "Norteamérica",
        "ja": "北アメリカ",
        "ru": "Северная Америка",
        "pt-BR": "América do Norte",
        "zh-CN": "北美洲",
        "fr": "Amérique du Nord"
      }
    },
    "subdivisions": [
      {
        "geoname_id": 5744337,
        "iso_code": "OR",
        "names": {
          "en": "Oregon",
          "de": "Oregon",
          "es": "Oregón",
          "ja": "オレゴン州",
          "ru": "Орегон",
          "pt-BR": "Oregão",
          "zh-CN": "俄勒冈州",
          "fr": "Oregon"
        }
      }
    ],
    "record_type": "A",
    "host": "elastic.co",
    "location": {
      "lon": -119.7143,
      "lat": 45.8491
    },
    "registered_country": {
      "geoname_id": 6252001,
      "iso_code": "US",
      "names": {
        "en": "United States",
        "de": "USA",
        "es": "Estados Unidos",
        "ja": "アメリカ合衆国",
        "ru": "США",
        "pt-BR": "Estados Unidos",
        "zh-CN": "美国",
        "fr": "États-Unis"
      }
    },
    "traits": {
      "ip_address": "54.70.157.111"
    },
    "client": "127.0.0.1",
    "postal": {
      "code": "97818"
    },
    "@timestamp": "2019-03-04T17:17:19.731231232+09:00"
  },
  "fields": {
    "@timestamp": [
      "2019-03-04T08:17:19.731Z"
    ]
  }
}

Kibana でダッシュボードを作る

Fluentd から送られてきたデータを Coordinate Map、Heat Map、Line で可視化する。

こうして見てみるとただブラウザで調べごとをしていたりするだけでも案外いろんな国にまで行っているのだなぁと感じる。

f:id:mattintosh4:20190304190438p:plain
Kibana - Dashboard

しばらく監視してみておかしなサイトに繋ぎに行ってないかとか発見出来れば面白いかな。

プロキシサーバのログも解析したいけど今日はもう疲れたのでまた今度にする…( ˘ω˘)スヤァ


以下後日談

ホストだと分割されすぎるので tldextract でドメイン名だけを抜き出す

Google だけでも

とか色々あるのでサブドメインを取り除いたドメイン名だけ欲しくなった。Stack Overflow を見ると「. でスプリットして後ろから2つ取ればいいのでは?」みたいな回答多かったけどそれじゃ .co.jp とか困るやろ、ってなったのでちゃんと解析してくれるモジュールを探したら tldextract っていうのがあった。

github.com

Console

pip3 install tldextract

属性はこんなものがある様子。欲しいのは registered_domain

count
domain
fqdn
index
ipv4
registered_domain
subdomain
suffix

Python3

import tldextract
rd = tldextract.extract('www.yahoo.co.jp').registered_domain

Return

'yahoo.co.jp'

GJ