自宅で DNS サーバに Unbound を使っているのだけど立ててるだけで特に監視していないので何か遊んでみようと考えた。
久しぶりに Fluentd を使おうと思ったらバージョンが変わっていて conf の書式にハマった。
- Unbound 1.9.0
- fluentd 1.4.0
- Python 3.5.3
- Elasticsearch 6.5.2
- Kibana 6.5.2
こんな感じで必要なものを配置していく。
fluent/ | +-- fluent.conf | +-- geoip_unbound.py | +-- GeoLite2-City.mmdb
Fluentd tail Input Plugin で Unbound のログを解析する
まずは Unbound のログの解析。ログファイルから tail プラグインを使って読み込む。
Unbound のログは log-queries
を有効にしておいて info
、query
、reply
の区別が付くようにしておく。log-time-ascii: yes
になっていると年が入らないので log-time-ascii: no
(デフォルト)でタイムスタンプとして出力されるようにしておく。
log-time-ascii: no log-queries: yes log-tag-queryreply: yes
[1551678596] unbound[12266:0] query: 127.0.0.1 yahoo.co.jp. A IN [1551678596] unbound[12266:0] info: resolving yahoo.co.jp. A IN [1551678596] unbound[12266:0] query: 127.0.0.1 yahoo.co.jp. AAAA IN [1551678596] unbound[12266:0] info: resolving yahoo.co.jp. AAAA IN [1551678596] unbound[12266:0] info: response for yahoo.co.jp. A IN [1551678596] unbound[12266:0] info: reply from <.> 8.8.8.8#53 [1551678596] unbound[12266:0] info: query response was ANSWER [1551678596] unbound[12266:0] info: response for yahoo.co.jp. AAAA IN [1551678596] unbound[12266:0] info: reply from <.> 8.8.8.8#53 [1551678596] unbound[12266:0] info: query response was nodata ANSWER [1551678596] unbound[12266:0] query: 127.0.0.1 206.135.79.183.in-addr.arpa. PTR IN [1551678596] unbound[12266:0] info: resolving 206.135.79.183.in-addr.arpa. PTR IN [1551678596] unbound[12266:0] info: response for 206.135.79.183.in-addr.arpa. PTR IN [1551678596] unbound[12266:0] info: reply from <.> 8.8.4.4#53 [1551678596] unbound[12266:0] info: query response was ANSWER
Unbound のログの内、解析対象はとりあえずクエリの A レコードと PTR レコードとする。ログのすべてが解析対象にならないため、パターンに一致しない行がワーニングとして [warn]: pattern not match:
とダラダラと出てきてしまう。これは @log_level
で error
以上を指定すれば出てこなくなる。expression
のテストが終わったらログレベルを上げておく。
<source>
@type tail
@log_level error
tag unbound.log
path /var/log/unbound/unbound.log
pos_file /tmp/fluentd_unbount.log.pos
<parse>
@type regexp
expression ^\[(?<time>\d+)\] unbound\[\S\] query: (?<client>\S+) (?<host>\S+)\. (?<record_type>(A|PTR)) IN$
time_key time
time_type unixtime
</parse>
</source>
# 出力確認用
<filter unbound.log>
@type stdout
</filter>
🤔 @timestamp フィールドを作るか作らないか
Elasticsearch プラグインで logstash_format
を有効にする場合、プラグイン側で自動的に @timestamp
フィールドを作成してくれるのでここで用意しておく必要はない。外部フィルターで時間に対して何かしらの処理をしたい場合や logstash_format
を使わない場合はここで @timestamp
フィールドを作成しておくといいかもしれない。
🤔 時間フィールドが消えてしまう問題
time_key
で時間として指定したフィールドはデフォルトだと消えてしまうので残しておきたい場合は keep_time_key
を有効にする。
🤔 数値が数値型ではなく文字列型になってしまう問題
キーの型を指定したい場合は types
で キー名:型
と指定する必要があるようだ。ここでは数値型にしたいので @timestamp:integer
とする。
2019-03-03 21:02:31.000000000 +0900 unbound.log: {"client":"192.168.1.10","host":"ssl.gstatic.com", "record_type": "A"}
exec_filter でホスト名から国や緯度経度情報を取得するフィルターを作る
以前は GeoIP
モジュールを使っていたけど今回は GeoLiteCity.dat ではなく GeoLite2-City.mmdb を使いたかったので maxminddb
モジュールを使うことにした。GeoIP
だと record_by_name()
でホスト名から情報を拾えるんだけど maxminddb
は get()
で IP アドレスを渡すくらいしかできないので socket.gethostbyname()
を通して IP アドレスを渡している。
最新の GeoLite2-City.mmdb は以下からダウンロードできる。
maxminddb
モジュールの使い方メモとして残しています。DNS レコードタイプ別の設定をする以前のものですのでその辺りも異なります。#!/usr/bin/env python3 import sys import json import socket import maxminddb reader = maxminddb.open_database('GeoLite2-City.mmdb') while True: line = sys.stdin.readline() d = json.loads(line) g = reader.get(socket.gethostbyname(d['host'])) d.update({ 'country': { 'iso_code': g['country']['iso_code'], }, 'location': { 'lat': g['location']['latitude'], 'lon': g['location']['longitude'], }, }) sys.stdout.write(json.dumps(d))
🤐 重大な欠点があったため作り直した
「socket.gethostbyname()
で DNS サーバに問い合わせたらそれも Unbound のログに出力されて無限ループじゃんアホかい!」ってなったので作り直した。使うモジュールも maxminddb
じゃなくて geoip2
に変更した。
geoip2
も引数にアドレスを指定しなければいけないのは変わらないのでホスト名からなんとかアドレスを得る。dnspython
というモジュールを使うと問い合わせ先の DNS サーバを指定できるとのことだったのでこれを使えば Unbound を経由させずに済む。(ただし無駄は多い)
geoip2
のデータは geoip2.models.City
というオブジェクトで返ってくるんだけど、raw
というプロパティを持っているのでそこを引っこ抜けば JSON に変換できるものが全部出てくる。
>>> from geoip2.database >>> from pprint import pprint >>> reader = geoip2.database.Reader('GeoLite2-City.mmdb') >>> response = reader('54.70.157.111') >>> pprint(response.raw) {'city': {'geoname_id': 5714964, 'names': {'en': 'Boardman', 'ru': 'Бордман'}}, 'continent': {'code': 'NA', 'geoname_id': 6255149, 'names': {'de': 'Nordamerika', 'en': 'North America', 'es': 'Norteamérica', 'fr': 'Amérique du Nord', 'ja': '北アメリカ', 'pt-BR': 'América do Norte', 'ru': 'Северная Америка', 'zh-CN': '北美洲'}}, 'country': {'geoname_id': 6252001, 'iso_code': 'US', 'names': {'de': 'USA', 'en': 'United States', 'es': 'Estados Unidos', 'fr': 'États-Unis', 'ja': 'アメリカ合衆国', 'pt-BR': 'Estados Unidos', 'ru': 'США', 'zh-CN': '美国'}}, 'location': {'accuracy_radius': 1000, 'latitude': 45.8491, 'longitude': -119.7143, 'metro_code': 810, 'time_zone': 'America/Los_Angeles'}, 'postal': {'code': '97818'}, 'registered_country': {'geoname_id': 6252001, 'iso_code': 'US', 'names': {'de': 'USA', 'en': 'United States', 'es': 'Estados Unidos', 'fr': 'États-Unis', 'ja': 'アメリカ合衆国', 'pt-BR': 'Estados Unidos', 'ru': 'США', 'zh-CN': '美国'}}, 'subdivisions': [{'geoname_id': 5744337, 'iso_code': 'OR', 'names': {'de': 'Oregon', 'en': 'Oregon', 'es': 'Oregón', 'fr': 'Oregon', 'ja': 'オレゴン州', 'pt-BR': 'Oregão', 'ru': 'Орегон', 'zh-CN': '俄勒冈州'}}], 'traits': {'ip_address': '54.70.157.111'}}
geoip2
で得られる位置情報は location.latitude
、location.longitude
となっているのでこれを location.lat
と location.lon
に変更する必要がある。また、Elasticsearch で geo_type
に指定したフィールドは lat
、lon
以外のフィールドを入れるとエラーになってしまうようなので time_zone
などのフィールドを削除するか、geo_point
用のフィールドを別に作成する必要がある。
今回はとりあえず raw
から location
を引っこ抜いて latitude
と longitude
だけ戻すことにした。
#!/usr/bin/env python3 import sys import json import geoip2.database reader = geoip2.database.Reader('GeoLite2-City.mmdb') from dns import resolver resolv = resolver.Resolver() resolv.nameservers = ['8.8.8.8'] while True: line = sys.stdin.readline() if len(line) < 1: continue source = json.loads(line) if None: pass elif source['record_type'] == 'A': ip = [x.address for x in resolv.query(source['host'])][0] elif source['record_type'] == 'PTR': ip = '.'.join(reversed(source['host'].split('.')[0:4])) geo = reader.city(ip) geo_raw = geo.raw location = geo_raw.pop('location') geo_raw.update({ 'location': { 'lat': location['latitude'], 'lon': location['longitude'], } }) source.update(geo_raw) sys.stdout.write(json.dumps(source))
GeoIP フィルターとやりとりする部分を書いていく。バッファはメモリに配置。<format></format>
や <parse></parse>
が以前のバージョンになかったので悩んだ。chunk_limit_records
は控えめに 100
くらいからスタート。
- https://docs.fluentd.org/v1.0/articles/format-section
- https://docs.fluentd.org/v1.0/articles/parse-section
Elasticsearch プラグインで logstash_format
を使う場合、外部フィルターから返ってきた JSON から再度時間を抽出する必要がある。
<match unbound.log>
@type exec_filter
tag exec.unbound
command ./geoip_unbound.py
child_respawn inf
<format>
@type json
</format>
<parse>
@type json
</parse>
<buffer>
@type memory
retry_max_times 0
chunk_limit_records 100
</buffer>
</match>
# 出力確認用
<filter exec.unbound>
@type stdout
</filter>
🤔 何かデータ入ってこないけど、もしかして Python 死んでない?
暫く Fluentd を動かしているとリトライが発生し、それが発生するとその後 Elasticsearch にまったくデータが入らなくなるということがあった。最初は Elasticsearch のパフォーマンスの問題かと思い Fluentd を都度再起動していたが、実は外部コマンドの Python でエラーが発生し、しかも異常終了や再起動せずに Fluentd がひたすらリトライをするためだった。(ps
コマンドで見ても Python が生きているように見えてしまう)
child_respawn inf
を設定しておけばプロセスが終了してしまった場合に立ち上げなおしてくれる。一度止まった場合、今回は再試行する意味がないので retry_max_times 0
で即時再起動するようにしている。
Respawn command when command exit. Default is disabled. If you specify a positive number, try to respawn until specified times. If you specifyinf
or-1
, try to respawn forever.
Elasticsearch のマッピングをする
Fluentd から Logstash 形式でデータを投入する場合、日付ベースで自動的にインデックスが作成されるが、マッピングをしたい場合は template_file
などを使うらしい。今回は緯度経度情報を扱うため、位置情報フィールドに対して geo_point
型の指定が必須になる。
Fluentd の Elasticsearch プラグインの仕様が変わると面倒だったので、Elasticsearch 側にテンプレートを用意して自動的に適用されるようにしておくことにした。データ量が多くなることに備えて refresh_interval
をデフォルトの 1s
から 30s
に変更。string
型のデータは解析する必要もないので text
ではなく keyword
で入るようにしておく。_all
や field_names
のフィールドは使わないので無効にしている。
@timestamp
に epoch_second
を使う場合はここで設定しておけばよい。
PUT _template/unbound { "index_patterns": "unbound-*", "settings": { "number_of_shards": 1, "number_of_replicas": 0, "refresh_interval": "30s" }, "mappings": { "_doc": { "_all": { "enabled": false }, "_field_names": { "enabled": false }, "dynamic_templates": [ { "strings": { "match_mapping_type": "string", "mapping": { "type": "keyword" } } }, { "geo_point": { "match": "location", "mapping": { "type": "geo_point" } } } ] } } }
Fluentd から Elasticsearch にデータを投入する部分を書く
マシンが非力なので request_timeout
をデフォルトの 5s
から 30s
に変更。type_name
を指定しないと fluentd
という名前になってしまうので type_name _doc
を指定しておく。今回は logstash_format
を有効にするのでレコードの @timestamp
フィールドは自動的に作成される。
<match exec.unbound> @type elasticsearch hosts localhost:9200 type_name _doc logstash_format true logstash_prefix unbound request_timeout 30s <buffer> flush_thread_count 2 </buffer> </match>
{ "_index": "unbound-2019.03.04", "_type": "_doc", "_id": "JrzHR2kBVp0AiN9YlA3_", "_score": 1, "_source": { "city": { "geoname_id": 5714964, "names": { "en": "Boardman", "ru": "Бордман" } }, "country": { "geoname_id": 6252001, "iso_code": "US", "names": { "en": "United States", "de": "USA", "es": "Estados Unidos", "ja": "アメリカ合衆国", "ru": "США", "pt-BR": "Estados Unidos", "zh-CN": "美国", "fr": "États-Unis" } }, "continent": { "geoname_id": 6255149, "code": "NA", "names": { "en": "North America", "de": "Nordamerika", "es": "Norteamérica", "ja": "北アメリカ", "ru": "Северная Америка", "pt-BR": "América do Norte", "zh-CN": "北美洲", "fr": "Amérique du Nord" } }, "subdivisions": [ { "geoname_id": 5744337, "iso_code": "OR", "names": { "en": "Oregon", "de": "Oregon", "es": "Oregón", "ja": "オレゴン州", "ru": "Орегон", "pt-BR": "Oregão", "zh-CN": "俄勒冈州", "fr": "Oregon" } } ], "record_type": "A", "host": "elastic.co", "location": { "lon": -119.7143, "lat": 45.8491 }, "registered_country": { "geoname_id": 6252001, "iso_code": "US", "names": { "en": "United States", "de": "USA", "es": "Estados Unidos", "ja": "アメリカ合衆国", "ru": "США", "pt-BR": "Estados Unidos", "zh-CN": "美国", "fr": "États-Unis" } }, "traits": { "ip_address": "54.70.157.111" }, "client": "127.0.0.1", "postal": { "code": "97818" }, "@timestamp": "2019-03-04T17:17:19.731231232+09:00" }, "fields": { "@timestamp": [ "2019-03-04T08:17:19.731Z" ] } }
Kibana でダッシュボードを作る
Fluentd から送られてきたデータを Coordinate Map、Heat Map、Line で可視化する。
こうして見てみるとただブラウザで調べごとをしていたりするだけでも案外いろんな国にまで行っているのだなぁと感じる。
しばらく監視してみておかしなサイトに繋ぎに行ってないかとか発見出来れば面白いかな。
プロキシサーバのログも解析したいけど今日はもう疲れたのでまた今度にする…( ˘ω˘)スヤァ
以下後日談
ホストだと分割されすぎるので tldextract でドメイン名だけを抜き出す
Google だけでも
とか色々あるのでサブドメインを取り除いたドメイン名だけ欲しくなった。Stack Overflow を見ると「.
でスプリットして後ろから2つ取ればいいのでは?」みたいな回答多かったけどそれじゃ .co.jp
とか困るやろ、ってなったのでちゃんと解析してくれるモジュールを探したら tldextract っていうのがあった。
pip3 install tldextract
属性はこんなものがある様子。欲しいのは registered_domain
。
count domain fqdn index ipv4 registered_domain subdomain suffix
import tldextract rd = tldextract.extract('www.yahoo.co.jp').registered_domain
'yahoo.co.jp'
GJ