mattintosh note

技術書典に出たい人生だった…

Elasticsearch を使って VTuber のデイリーレポートを作ったのでちょっとまとめておく

最近 VTuber にハマりつつある筆者です。

会社とかでたまに VTuber の話が出ることがあるんですが、だいたい「VTuber ってどれくらいいるの?」みたいに聞かれるので「これ見ればいいよ」的なものがあったらなぁと思って気がついたらウェブサイト作ってました。最初は単に YouTube Data API を試したくて Elasticsearch にデータを入れてただけなのにどうしてこうなった…。

vtubers.ga

f:id:mattintosh4:20190317214912p:plain
vtubers.ga

構成は前回の電子書籍ランキングと同じ AWS CloudFront + S3 + Route53 です。Elasticsearch は Raspberry Pi で動かしているため非力で耐えられないので予めクエリの結果を JSON ファイルでエクスポートしておいて JavaScript で整形しています。前回と違うのは SEO 無視で Vue.js にお任せしてるところです。フロントエンドはなかなか慣れないなぁ、という感じです。

YouTube のデータは YouTube Data API から取得できます。無料でも使えるので Elasticsearch 用のサンプルデータを取得するのにもいいのではないでしょうか。(一日のリクエスト回数には上限があります) 使い方は公式のドキュメントを見ればだいだいわかると思います。データを取得するだけなので使うメソッドは list です。チャンネルはいいんですが、動画になるとAPI の制限がなかなか厳しいですね。

Channels: list  |  YouTube Data API (v3)  |  Google Developers

YouTube Data API からデータを取得する

データの取り方は色々あると思うのですが、自分の場合は1回あたりの最大50件ずつで取得するようにしています。

取得する part は下記の3つです。

  • snippet
  • contentDetails
  • statistics

さらに上記から fields を使って必要な項目だけに絞ってます。この辺の項目絞りもクォータ量に影響するんだった気がしますがいまは覚えてません。

fields=items(id,snippet(title,publishedAt,thumbnails),contentDetails(relatedPlaylists/uploads),statistics(viewCount,subscriberCount))

id はカンマ区切りで複数指定が出来るので50件まとめてしまいます。現時点で119チャンネルが取得対象なんですが、これなら3回のリクエストで終わります。

https://www.googleapis.com/youtube/v3/channels?maxResults=50&id=UCWMwHoGz5QhhRDc3K8SQ6cw,UC6UwdMiDJfyjEipxJ66ceUg,UCZ1WJDkMNiZ_QwHnNrVf7Pw,UCCebk1_w5oiMUTRxdNJq0sA,UC4YaOt1yT-ZeyB0OmxHgolA,UC53UDnhAAYwvNO7j_2Ju1cQ,UCIdEIHpS0TdkqRkHL5OkLtA,UCCVwhI5trmaSxfcze_Ovzfw,UCB1s_IdO-r0nUkY2mXeti-A,UCfiy-dr0s1O6LJRV6KHomLw,UC1suqwovbL1kzsoaZgFZLKg,UCfM_A7lE6LkGrzx6_mOtI4g,UCyof-1Ko_jy2sOtivyTpc4Q,UCQ0UDLQCjY0rmuxCDE38FGg,UCpPuEfqwYbpn7e2jWdQeWew,UCT1AQFit-Eaj_YQMsfV0RhQ,UCPvGypSgfDkVe7JG2KygK7A,UCQlLqVz0RFOkFpjrJv-k-Zg,UCD-miitqNY3nyukJ4Fnf4_A,UCmUjjW5zF1MMOhYUwwwQv9Q,UCARI2g7r-PHaxrIcAYsMfmA,UCBe_jjkUHhVNAj46bukAbJA,UC2ZVDmnoZAOdLt7kI7Uaqog,UCM6ZAX8qPfCzEkKcGOFWPMw,UCbFwe3COkDrbNsbMyGNCsDg,UCAr7rLi_Wn09G-XfTA07d4g,UCmTcayoDVo7HXAAV_mquHEg,UCbxANlIBzexmsg7-eucWNoA,UCsg-YqdqQ-KFF0LNk23BY4A,UCtpB6Bvhs1Um93ziEDACQ8g,UCCvInijwD6Qg9xwdtYJcYtQ,UC_GCs6GARLxEHxy1w40d6VQ,UC1zFJrfEKvCixhsjNSb1toQ,UC7fk0CB07ly8oSl0aqKkqFg,UCfiK42sBHraMBK6eNWtsy7A,UCXTpFs_3PqI41qX2d9tL2Rw,UCD8HOxPs4Xvsm8H0ZxXGiBw,UCpnvhOIJ6BN-vPkYU9ls-Eg,UCJQMHCFjVZOVRYafR6gY04Q,UCKYPwPHjmgLWrJwkcLhGvNg,UCHTnX0CSX_KObo5I9WuZ64g,UCmgWMQkenFc72QnYkdxdoKA,UCLhUvJ_wO9hOvv_yYENu4fQ,UCYKP16oMX9KKPbrNgo_Kgag,UCp-5t9SrOQwXMU7iIjQfARg,UC_4tXjqecqox5Uc05ncxpxg,UCwRKt_raV3N5KZgxcFyC1vw,UCkPIfBOLoO0hVPG-tI2YeGg,UC48jH1ul-6HOrcSSfoR02fQ,UC8NZiqKx6fsDT3AVcMiVFyA&part=snippet,contentDetails,statistics&fields=items(id,snippet(title,publishedAt,thumbnails),contentDetails(relatedPlaylists%2Fuploads),statistics(viewCount,subscriberCount))&key=XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

50件ずつの区切り方はこんな感じですね。チャンネル数が 50 で割り切れなかったら None で埋めておきます。その後、 zip() で 50 件ごとのリストに分け、最後に None を削っています。最初は None 埋めをピッタリやろうかなと思ったんですが、zip() の時点で勝手に削られるのであまり拘らないことにしました。

MAX_LENGTH = 50

# TSV ファイルからチャンネル ID を読み込む
with open('vtuber.tsv', 'r') as f:
    channelIds = list(set([row[0] for row in csv.reader(f, delimiter='\t')]))

# リストが MAX_LENGTH で割り切れるかチェック
if len(channelIds) % MAX_LENGTH > 0:
    # 割り切れなかったら None で埋める
    channelIds += [None for i in range(MAX_LENGTH)]

# 50 件ごとのリストに分割しつつ None を除去
channelIdsSets = [[y for y in x if y is not None] for x in list(zip(*[iter(channelIds)] * MAX_LENGTH))]

あとはこのリストをまとめて urllib.parse.urlencode() とかで変換するんですが、ポイントとしては fields(),safe キーワードで指定しておかなきゃいけないところですかね。

    url = urllib.parse.urlunsplit([
        'https',
        'www.googleapis.com',
        '/youtube/v3/channels',
        urllib.parse.urlencode({
            'key'       : 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX',
            'id'        : ','.join(s),
            'part'      : ','.join(['snippet', 'contentDetails', 'statistics']),
            'fields'    : 'items(id,snippet(title,publishedAt,thumbnails),contentDetails(relatedPlaylists/uploads),statistics(viewCount,subscriberCount))',
            'maxResults': MAX_LENGTH,
        }, safe=',()'),
        None
    ])

Python のコードの方はスクラッチで書いたものなのでいまのところあんまり凝って書いてはいません。

では取れたデータを見てみます。実際には {"items": []} という配列の中に1チャンネルごとに入っています。statistics.subscriberCountstatistics.viewCount が string になってるのがちょっと注意ですかね。

{
  "id": "UCMxKcUjeTEcgHmC9Zzn3R4w",
  "snippet": {
    "thumbnails": {
      "medium": {
        "url": "https://yt3.ggpht.com/a-/AAuE7mDsZK0Vy1Ih2fGAU8nBLBBM2Y3cmupPAoBZ9w=s240-mo-c-c0xffffffff-rj-k-no",
        "height": 240,
        "width": 240
      },
      "high": {
        "url": "https://yt3.ggpht.com/a-/AAuE7mDsZK0Vy1Ih2fGAU8nBLBBM2Y3cmupPAoBZ9w=s800-mo-c-c0xffffffff-rj-k-no",
        "height": 800,
        "width": 800
      },
      "default": {
        "url": "https://yt3.ggpht.com/a-/AAuE7mDsZK0Vy1Ih2fGAU8nBLBBM2Y3cmupPAoBZ9w=s88-mo-c-c0xffffffff-rj-k-no",
        "height": 88,
        "width": 88
      }
    },
    "publishedAt": "2018-08-08T05:20:43.000Z",
    "title": "由宇霧ちゃんねる"
  },
  "@timestamp": "2019-03-17T09:00:00+09:00",
  "statistics": {
    "subscriberCount": "26961",
    "viewCount": "940158"
  },
  "contentDetails": {
    "relatedPlaylists": {
      "uploads": "UUMxKcUjeTEcgHmC9Zzn3R4w"
    }
  }
}

string になっている数値は Python ではキャストせずに Elasticsearch のマッピングで対応しています。

{
  "mappings": {
    "_doc": {
      "dynamic_templates": [
        {
          "count": {
            "match_mapping_type": "string",
            "match": "*Count",
            "mapping": {
              "type": "long"
            }
          }
        }
      ]
    }
  }
}

デイリーの差分を Aggregations で抽出する

Elasticsearch の良いところは内部で色々な計算が出来るところですね。YouTube のデイリーデータの活用方法をあまり見い出せていませんが、とりあえず必要になるのは以下の2つでしょうか。

  • 再生回数の前日との差
  • チャンネル登録者数の前日との差

これは Aggregation Query で前日と当日の max を取り、それらの Bucket を Pipeline Aggregations の derivativeserial_diff で計算する感じですね。

{
  "size": 0,
  "query": {
    "query_string": {
      "default_field": "@timestamp",
      "query": "[now-1d/d TO now/d]" // 対象データを前日と当日に限定
    }
  },
  "aggs": {
    "channel": {
      "terms": {
        "field": "id",
        "size": 500 // 取得対象のチャンネル数に応じて調整
      },
      "aggs": {
        "daily": {
          "date_histogram": {
            "field": "@timestamp",
            "interval": "day"
          },
          "aggs": {
            // チャンネル登録者数用
            "subscriberCount": {
              "max": {
                "field": "statistics.subscriberCount",
                "missing": 0
              }
            },
            "serialdiffSubscriberCount": {
              "serial_diff": {
                "buckets_path": "subscriberCount" // 上で計算した Bucket を指定する
              }
            },
            // 再生回数用
            "viewCount": {
              "max": {
                "field": "statistics.viewCount",
                "missing": 0
              }
            },
            "serialdiffViewCount": {
              "serial_diff": {
                "buckets_path": "viewCount" // 上で計算した Bucket を指定する
              }
            }
          }
        }
      }
    }
  }
}

Aggregations の結果を絞り込む

グラフ用のデータを取り出す必要があったんですが、チャンネルの件数が多いので Aggregations の結果から「上位○○件」みたいな絞り込みをしようと思いました。やや複雑になりますが順番と書き方のコツさえ掴めばそんなに難しくはないはず。

  1. ❶: 日付ごとの値を取り出す
  2. ❷: ❶の結果から差分を計算する
  3. ❸: ❷の結果の累積和を計算する
  4. ❹: ❸の結果の最大値を計算する
  5. ❺: ❹の結果を降順でソートする
  6. ❻: ❺の結果を上位 n 件で絞り込む

date_histograminterval ごとに分割された結果では親はどの値を元にソートすればいいかわからないため、分割のひとつ上の階層(date_histogram と同じ階層)から分割された bucket の中を除き、max 等で値を取り出して単体の要素にします。で、あとは buckets_sortfromsize が指定できるので絞り込む感じ。

buckets_path> 記号を使っていますが、これは比較演算子ではなくて Aggregation Separator です。stats などを使った場合は Metric Separator の . を使って .avg のように指定するようです。色々試したら Aggregation Separator を . で置き換えても動作するみたいですが、Metric Separator を > で置き換えるとエラーになりました。詳しくは https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-pipeline.html#buckets-path-syntax を参照するとよいかと。

{
  "size": 0,
  "query": {
    "query_string": {
      "default_field": "@timestamp",
      "query": "[now-7d/d TO now/d]"
    }
  },
  "aggs": {
    "channel": {
      "terms": {
        "field": "snippet.title",
        "size": 500
      },
      "aggs": {
        "daily": {
          "date_histogram": { // ❶
            "field": "@timestamp",
            "interval": "day"
          },
          "aggs": {
            "subscriberCount": { // ❶
              "max": {
                "field": "statistics.subscriberCount",
                "missing": 0
              }
            },
            "serialdiffSubscriberCount": {
              "serial_diff": { // ❷
                "buckets_path": "subscriberCount"
              }
            },
            "cumulativesumSerialdiffSubscriberCount": {
              "cumulative_sum": { // ❸
                  "buckets_path": "serialdiffSubscriberCount"
                }
            }
          }
        },
        "maxSerialdiffSubscribersCount": { // この項目は今回条件に使っていません
          "max_bucket": {
            "buckets_path": "daily>serialdiffSubscriberCount"
          }
        },
        "maxCumulativesumSubscriberCount": {
          "max_bucket": { // ❹
            "buckets_path": "daily>cumulativesumSerialdiffSubscriberCount"
          }
        },
        "sortCondition": {
          "bucket_sort": { // ❺
            "sort": [
              {
                "maxCumulativesumSubscriberCount": {
                  "order": "desc" // ❺
                }
              }
            ],
            "from": 0, // ❻
            "size": 10 // ❻
          }
        }
      }
    }
  }
}
{
  "took" : 59,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 513,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "channel" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "由宇霧ちゃんねる",
          "doc_count" : 5,
          "daily" : {
            "buckets" : [
              {
                "key_as_string" : "2019-03-14T00:00:00.000Z",
                "key" : 1552521600000,
                "doc_count" : 1,
                "subscriberCount" : {
                  "value" : 15823.0
                },
                "cumulativesumSerialdiffSubscriberCount" : {
                  "value" : 0.0
                }
              },
              {
                "key_as_string" : "2019-03-15T00:00:00.000Z",
                "key" : 1552608000000,
                "doc_count" : 1,
                "subscriberCount" : {
                  "value" : 17682.0
                },
                "serialdiffSubscriberCount" : {
                  "value" : 1859.0
                },
                "cumulativesumSerialdiffSubscriberCount" : {
                  "value" : 1859.0
                }
              },
              {
                "key_as_string" : "2019-03-16T00:00:00.000Z",
                "key" : 1552694400000,
                "doc_count" : 1,
                "subscriberCount" : {
                  "value" : 22277.0
                },
                "serialdiffSubscriberCount" : {
                  "value" : 4595.0
                },
                "cumulativesumSerialdiffSubscriberCount" : {
                  "value" : 6454.0
                }
              },
              {
                "key_as_string" : "2019-03-17T00:00:00.000Z",
                "key" : 1552780800000,
                "doc_count" : 1,
                "subscriberCount" : {
                  "value" : 26961.0
                },
                "serialdiffSubscriberCount" : {
                  "value" : 4684.0
                },
                "cumulativesumSerialdiffSubscriberCount" : {
                  "value" : 11138.0
                }
              },
              {
                "key_as_string" : "2019-03-18T00:00:00.000Z",
                "key" : 1552867200000,
                "doc_count" : 1,
                "subscriberCount" : {
                  "value" : 35666.0
                },
                "serialdiffSubscriberCount" : {
                  "value" : 8705.0
                },
                "cumulativesumSerialdiffSubscriberCount" : {
                  "value" : 19843.0
                }
              }
            ]
          },
          "maxDerivativeSubscribersCount" : {
            "value" : 8705.0,
            "keys" : [
              "2019-03-18T00:00:00.000Z"
            ]
          },
          "maxCumulativesumSubscriberCount" : {
            "value" : 19843.0,
            "keys" : [
              "2019-03-18T00:00:00.000Z"
            ]
          }
        }
      ]
    }
  }
}

AWS と Vue.js については次の記事で書く予定。

jq コマンドで2つのファイルから配列を結合する

YouTube Data API を使っていると maxResult=50 が限界なのでそれ以上になるとどうしても JSON が分かれてしまう。Python とかなら JSON をオブジェクトに変換してしまえばいいのだけど、忘れるので jq コマンドで実行する方法をメモっておく。

1.json

{
  "items": [
    {
      "id": "UCD8HOxPs4Xvsm8H0ZxXGiBw",
      "snippet": {
        "title": "Mel Channel 夜空メルチャンネル",
        "publishedAt": "2018-04-25T02:07:54.000Z",
        "thumbnails": {
          "default": {
            "url": "https://yt3.ggpht.com/a-/AAuE7mC5-XNF0MJc4spIJdwfxY1sFIyjWB8qAHd9_A=s88-mo-c-c0xffffffff-rj-k-no",
            "width": 88,
            "height": 88
          },
          "medium": {
            "url": "https://yt3.ggpht.com/a-/AAuE7mC5-XNF0MJc4spIJdwfxY1sFIyjWB8qAHd9_A=s240-mo-c-c0xffffffff-rj-k-no",
            "width": 240,
            "height": 240
          },
          "high": {
            "url": "https://yt3.ggpht.com/a-/AAuE7mC5-XNF0MJc4spIJdwfxY1sFIyjWB8qAHd9_A=s800-mo-c-c0xffffffff-rj-k-no",
            "width": 800,
            "height": 800
          }
        }
      },
      "contentDetails": {
        "relatedPlaylists": {
          "uploads": "UUD8HOxPs4Xvsm8H0ZxXGiBw"
        }
      },
      "statistics": {
        "viewCount": "1217525",
        "subscriberCount": "32661"
      }
    }
  ]
}

2.json

{
  "items": [
    {
      "id": "UCsg-YqdqQ-KFF0LNk23BY4A",
      "snippet": {
        "title": "樋口楓【にじさんじ所属】",
        "publishedAt": "2018-01-31T10:47:47.000Z",
        "thumbnails": {
          "default": {
            "url": "https://yt3.ggpht.com/a-/AAuE7mCLW7sO_ERKAC41K1XInz_nT0r8Q7JHgGnV0w=s88-mo-c-c0xffffffff-rj-k-no",
            "width": 88,
            "height": 88
          },
          "medium": {
            "url": "https://yt3.ggpht.com/a-/AAuE7mCLW7sO_ERKAC41K1XInz_nT0r8Q7JHgGnV0w=s240-mo-c-c0xffffffff-rj-k-no",
            "width": 240,
            "height": 240
          },
          "high": {
            "url": "https://yt3.ggpht.com/a-/AAuE7mCLW7sO_ERKAC41K1XInz_nT0r8Q7JHgGnV0w=s800-mo-c-c0xffffffff-rj-k-no",
            "width": 800,
            "height": 800
          }
        }
      },
      "contentDetails": {
        "relatedPlaylists": {
          "uploads": "UUsg-YqdqQ-KFF0LNk23BY4A"
        }
      },
      "statistics": {
        "viewCount": "13124038",
        "subscriberCount": "172943"
      }
    }
  ]
}

トップレベルのオブジェクトが2つある場合はそれぞれ .[0].[1] のようなインデックスに入っているらしい。そこから配列部分を取り出して + で連結してあげる。あとは並び替えの条件とかを追加して最後にまた {"items": .} に入れてあげる。

メモ: 配列からピンポイントで .[0] のように取ろうとすると要素が1つの場合は配列にしてくれないようなので .[0:1] のようにスライスで取得する。

cat 1.json 2.json | jq -s '.[0].items + .[1].items | sort_by(.statistics.viewCount | tonumber) | reverse | {"items": .}'

concatenated.json

{
  "items": [
    {
      "id": "UCsg-YqdqQ-KFF0LNk23BY4A",
      "snippet": {
        "title": "樋口楓【にじさんじ所属】",
        "publishedAt": "2018-01-31T10:47:47.000Z",
        "thumbnails": {
          "default": {
            "url": "https://yt3.ggpht.com/a-/AAuE7mCLW7sO_ERKAC41K1XInz_nT0r8Q7JHgGnV0w=s88-mo-c-c0xffffffff-rj-k-no",
            "width": 88,
            "height": 88
          },
          "medium": {
            "url": "https://yt3.ggpht.com/a-/AAuE7mCLW7sO_ERKAC41K1XInz_nT0r8Q7JHgGnV0w=s240-mo-c-c0xffffffff-rj-k-no",
            "width": 240,
            "height": 240
          },
          "high": {
            "url": "https://yt3.ggpht.com/a-/AAuE7mCLW7sO_ERKAC41K1XInz_nT0r8Q7JHgGnV0w=s800-mo-c-c0xffffffff-rj-k-no",
            "width": 800,
            "height": 800
          }
        }
      },
      "contentDetails": {
        "relatedPlaylists": {
          "uploads": "UUsg-YqdqQ-KFF0LNk23BY4A"
        }
      },
      "statistics": {
        "viewCount": "13124038",
        "subscriberCount": "172943"
      }
    },
    {
      "id": "UCD8HOxPs4Xvsm8H0ZxXGiBw",
      "snippet": {
        "title": "Mel Channel 夜空メルチャンネル",
        "publishedAt": "2018-04-25T02:07:54.000Z",
        "thumbnails": {
          "default": {
            "url": "https://yt3.ggpht.com/a-/AAuE7mC5-XNF0MJc4spIJdwfxY1sFIyjWB8qAHd9_A=s88-mo-c-c0xffffffff-rj-k-no",
            "width": 88,
            "height": 88
          },
          "medium": {
            "url": "https://yt3.ggpht.com/a-/AAuE7mC5-XNF0MJc4spIJdwfxY1sFIyjWB8qAHd9_A=s240-mo-c-c0xffffffff-rj-k-no",
            "width": 240,
            "height": 240
          },
          "high": {
            "url": "https://yt3.ggpht.com/a-/AAuE7mC5-XNF0MJc4spIJdwfxY1sFIyjWB8qAHd9_A=s800-mo-c-c0xffffffff-rj-k-no",
            "width": 800,
            "height": 800
          }
        }
      },
      "contentDetails": {
        "relatedPlaylists": {
          "uploads": "UUD8HOxPs4Xvsm8H0ZxXGiBw"
        }
      },
      "statistics": {
        "viewCount": "1217525",
        "subscriberCount": "32661"
      }
    }
  ]
}

AWS Elasticache で作成した Redis に外部から接続したい

Heroku Redis を使っている方から「Redis だけ AWS を利用出来ないか?」というご相談をいただいたので検証してみた。

本記事は接続検証を目的としたものです。本記事に起因して発生したいかなるトラブルや損害等について当方は一切責任を負いません。

事前調査

  • Elasticache のエンドポイントには EIP を付与することはできない。
  • Elasticache で作成したエンドポイントにはパブリック IP が付与できないので EC2 インスタンスのように外から簡単にアクセス出来るようにはなっていない。
  • Network Load Balancer を使えば TCP のロードバランシングも出来るが、ターゲットには IP アドレスか EC2 インスタンスしか選択できないので Elasticache のエンドポイントが指定できない。(プライマリのプライベート IP アドレス決め打ちなら出来るかもしれないが問題が起きた時に変更するの面倒)

というわけでリバースプロキシを立てることにして、今回は HAProxy メインで、NGINX でも検証してみる。透過プロキシは設定が面倒なので普通のリバースプロキシで。

AWS の構成

VPC

パブリックサブネット1つとプライベートサブネットを1つ作成する。パブリックサブネットに EC2 インスタンスを配置、Elasticache のサブネットグループでプライベートサブネットを指定する。

EIP

EC2 インスタンス用に1つ作成しておく。(グローバル IP が変わってもいいなら不要)

セキュリティグループ

EC2 インスタンス用と Redis 用の2つを用意する。HAProxy で 6379 番ポートを待ち受けるのは危険なので Redis のデフォルトから変更してソースもマイ IP で絞り込んでおく。

EC2 インスタンス

タイプ プロトコル ポート範囲 ソース
SSH TCP 22 マイ IP
カスタム TCP ルール TCP 16379 マイ IP

Redis 用

タイプ プロトコル ポート範囲 ソース
カスタム TCP ルール TCP 6379 ↑で作った EC2 インスタンス用のセキュリティグループ

EC2 インスタンス

普通の EC2 インスタンスを作成する。Amazon Linux 系はバージョンがかなり古いので Ubuntu が良いかもね。SSL 対応は HAProxy 1.5 からなので Amazon Linux も一応対応してる。

amzn2-ami-hvm-2.0.20190228-x86_64-gp2 (ami-097473abce069b8e9)

HA-Proxy version 1.5.18 2016/05/10
Copyright 2000-2016 Willy Tarreau <willy@haproxy.org>

Build options :
  TARGET  = linux2628
  CPU     = generic
  CC      = gcc
  CFLAGS  = -O2 -g -fno-strict-aliasing -DTCP_USER_TIMEOUT=18
  OPTIONS = USE_LINUX_TPROXY=1 USE_ZLIB=1 USE_REGPARM=1 USE_OPENSSL=1 USE_PCRE=1

Default settings :
  maxconn = 2000, bufsize = 16384, maxrewrite = 8192, maxpollevents = 200

Encrypted password support via crypt(3): yes
Built with zlib version : 1.2.7
Compression algorithms supported : identity, deflate, gzip
Built with OpenSSL version : OpenSSL 1.0.2k-fips  26 Jan 2017
Running on OpenSSL version : OpenSSL 1.0.2k-fips  26 Jan 2017
OpenSSL library supports TLS extensions : yes
OpenSSL library supports SNI : yes
OpenSSL library supports prefer-server-ciphers : yes
Built with PCRE version : 8.32 2012-11-30
PCRE library supports JIT : no (USE_PCRE_JIT not set)
Built with transparent proxy support using: IP_TRANSPARENT IPV6_TRANSPARENT IP_FREEBIND

Available polling systems :
      epoll : pref=300,  test result OK
       poll : pref=200,  test result OK
     select : pref=150,  test result OK
Total: 3 (3 usable), will use epoll.

ubuntu/images/hvm-ssd/ubuntu-bionic-18.04-amd64-server-20190212.1 (ami-0eb48a19a8d81e20b)

HA-Proxy version 1.8.8-1ubuntu0.4 2019/01/24
Copyright 2000-2018 Willy Tarreau <willy@haproxy.org>

Build options :
  TARGET  = linux2628
  CPU     = generic
  CC      = gcc
  CFLAGS  = -g -O2 -fdebug-prefix-map=/build/haproxy-Mxbbv4/haproxy-1.8.8=. -fstack-protector-strong -Wformat -Werror=format-security -Wdate-time -D_FORTIFY_SOURCE=2
  OPTIONS = USE_GETADDRINFO=1 USE_ZLIB=1 USE_REGPARM=1 USE_OPENSSL=1 USE_LUA=1 USE_SYSTEMD=1 USE_PCRE=1 USE_PCRE_JIT=1 USE_NS=1

Default settings :
  maxconn = 2000, bufsize = 16384, maxrewrite = 1024, maxpollevents = 200

Built with OpenSSL version : OpenSSL 1.1.0g  2 Nov 2017
Running on OpenSSL version : OpenSSL 1.1.0g  2 Nov 2017
OpenSSL library supports TLS extensions : yes
OpenSSL library supports SNI : yes
OpenSSL library supports : TLSv1.0 TLSv1.1 TLSv1.2
Built with Lua version : Lua 5.3.3
Built with transparent proxy support using: IP_TRANSPARENT IPV6_TRANSPARENT IP_FREEBIND
Encrypted password support via crypt(3): yes
Built with multi-threading support.
Built with PCRE version : 8.39 2016-06-14
Running on PCRE version : 8.39 2016-06-14
PCRE library supports JIT : yes
Built with zlib version : 1.2.11
Running on zlib version : 1.2.11
Compression algorithms supported : identity("identity"), deflate("deflate"), raw-deflate("deflate"), gzip("gzip")
Built with network namespace support.

Available polling systems :
      epoll : pref=300,  test result OK
       poll : pref=200,  test result OK
     select : pref=150,  test result OK
Total: 3 (3 usable), will use epoll.

Available filters :
        [SPOE] spoe
        [COMP] compression
        [TRACE] trace

Elasticache Redis

クラスタ構成無しで作成。HAProxy が SSL オフローダーになるので SSL 対応不要な気がするけど実験なので「送信中の暗号化」を有効にしておく。他は以下のように設定。

  • 疎通確認なので「Redis AUTH」は使わない。
  • サブネットグループにはプライベートサブネットを選択。
  • EC2 に付与してあるセキュリティグループからのみ 6379 番ポートのインバウンド接続を受け付けるセキュリティグループを用意する。

f:id:mattintosh4:20190313191141p:plain
AWS Elasticache

HAProxy の設定

まずはインストールから。

Amazon Linux

yum install haproxy
service haproxy start
chkconfig haproxy on

Ubuntu

apt install haproxy

設定はどちらも /etc/haproxy/haproxy.cfg くらいしか使わない。

自己署名証明書を作成する

HAProxy を SSL 対応させるために証明書を作成するんだけど実験なので自己署名で作成する。ドメイン持ってるなら Let's Encrypt で作成した privkey.pemfullchain.pem を結合したものを用意すれば良い。

ファイルの配置は以下のようにするけど人それぞれ。

/etc/haproxy/
|
+-- haproxy.cfg
|
+-- ssl/
    |
    +-- haproxy.pem

自己署名なので openssl req コマンドで作成するけどいつも忘れるのでここらでちゃんと覚えておく。

openssl req -help

Usage: req [options]
Valid options are:
 -help               Display this summary
 -inform PEM|DER     Input format - DER or PEM
 -outform PEM|DER    Output format - DER or PEM
 -in infile          Input file
 -out outfile        Output file
 -key val            Private key to use
 -keyform format     Key file format
 -pubkey             Output public key
 -new                New request
 -config infile      Request template file
 -keyout outfile     File to send the key to
 -passin val         Private key password source
 -passout val        Output file pass phrase source
 -rand val           Load the file(s) into the random number generator
 -newkey val         Specify as type:bits
 -pkeyopt val        Public key options as opt:value
 -sigopt val         Signature parameter in n:v form
 -batch              Do not ask anything during request generation
 -newhdr             Output "NEW" in the header lines
 -modulus            RSA modulus
 -verify             Verify signature on REQ
 -nodes              Don't encrypt the output key
 -noout              Do not output REQ
 -verbose            Verbose output
 -utf8               Input characters are UTF8 (default ASCII)
 -nameopt val        Various certificate name options
 -reqopt val         Various request text options
 -text               Text form of request
 -x509               Output a x509 structure instead of a cert request
                     (Required by some CA's)
 -subj val           Set or modify request subject
 -subject            Output the request's subject
 -multivalue-rdn     Enable support for multivalued RDNs
 -days +int          Number of days cert is valid for
 -set_serial val     Serial number to use
 -extensions val     Cert extension section (override value in config file)
 -reqexts val        Request extension section (override value in config file)
 -*                  Any supported digest
 -engine val         Use engine, possibly a hardware device
 -keygen_engine val  Specify engine to be used for key generation operations

使うオプションは以下の通り。対話形式で作成したいなら -subj は省略。

オプション 動作
-new 新規リクエス
-x509 証明書要求の代わりに X509 構造体を出力する
-nodes 鍵を暗号化しない(パスワード不要で作成する)
-newkey type:bits 鍵のタイプを指定する(例:rsa:2048
-keyout /path/to/file 鍵の出力先
-out /path/to/file 出力先(-x509 と併用しているので証明書の出力先)
-subj val 対話形式での回答を省略
-days val 証明書の有効期間を指定

Common Name を EIP の IP アドレスにしておく。秘密鍵と証明書を分けて作成してあとから結合してもいいけど、実験なので最初からまとめてしまう。

install -d -m 0700 /etc/haproxy/ssl
openssl req -new -x509 -nodes \
  -newkey rsa:2048 \
  -keyout /etc/haproxy/ssl/haproxy.pem \
  -out /etc/haproxy/ssl/haproxy.pem \
  -subj "/C=JP/CN=192.168.1.254" \
  -days 365
chmod 0600 /etc/haproxy/ssl/haproxy.pem

HAProxy の SSL 設定

Amazon LinuxUbuntu でバージョンが異なるため初期設定も異なる。以下には必要なところだけ記載しているので詳しくは各バージョンごとの公式ドキュメントを参照。frontend ブロックで入力を作って backend で出力を作ってやればよい。

/etc/haproxy/haproxy.cfg

global
  ssl-default-bind-ciphers ECDH+AESGCM:DH+AESGCM:ECDH+AES256:DH+AES256:ECDH+AES128:DH+AES:RSA+AESGCM:RSA+AES:!aNULL:!MD5:!DSS
# ssl-default-bind-options no-sslv3
# tune.ssl.default-dh-param 2048

defaults
  log    global
  mode   tcp
  option tcplog
  option dontlognull

## 非 SSL 用設定例
frontend nossl
  bind *:16379
  default_backend redis-nossl

backend redis-nossl
  server redis-1 名前.apne1.cache.amazonaws.com:6379 check

## SSL 用設定例(フロントエンド、バックエンドともに SSL 対応)
frontend ssl
  bind *:16380 ssl crt /etc/haproxy/ssl/haproxy.pem
  default_backend redis-ssl

backend redis-ssl
  server redis-1 master.名前.apne1.cache.amazonaws.com:6379 check ssl verify none

Amazon Linux の場合、ssl-default-bind-options no-sslv3 だとオプションが存在しないためエラーになる。また、tune.ssl.default-dh-param 2048 を書いておかないと起動時に警告がでる。

Amazon LinuxUbuntu の haproxy.cfg のソース

インスタンス作って確認するの面倒なので残しておく。

Amazon Linux

#---------------------------------------------------------------------
# Example configuration for a possible web application.  See the
# full configuration options online.
#
#   http://haproxy.1wt.eu/download/1.4/doc/configuration.txt
#
#---------------------------------------------------------------------

#---------------------------------------------------------------------
# Global settings
#---------------------------------------------------------------------
global
    # to have these messages end up in /var/log/haproxy.log you will
    # need to:
    #
    # 1) configure syslog to accept network log events.  This is done
    #    by adding the '-r' option to the SYSLOGD_OPTIONS in
    #    /etc/sysconfig/syslog
    #
    # 2) configure local2 events to go to the /var/log/haproxy.log
    #   file. A line like the following can be added to
    #   /etc/sysconfig/syslog
    #
    #    local2.*                       /var/log/haproxy.log
    #
    log         127.0.0.1 local2

    chroot      /var/lib/haproxy
    pidfile     /var/run/haproxy.pid
    maxconn     4000
    user        haproxy
    group       haproxy
    daemon

    # turn on stats unix socket
    stats socket /var/lib/haproxy/stats

#---------------------------------------------------------------------
# common defaults that all the 'listen' and 'backend' sections will
# use if not designated in their block
#---------------------------------------------------------------------
defaults
    mode                    http
    log                     global
    option                  httplog
    option                  dontlognull
    option http-server-close
    option forwardfor       except 127.0.0.0/8
    option                  redispatch
    retries                 3
    timeout http-request    10s
    timeout queue           1m
    timeout connect         10s
    timeout client          1m
    timeout server          1m
    timeout http-keep-alive 10s
    timeout check           10s
    maxconn                 3000

#---------------------------------------------------------------------
# main frontend which proxys to the backends
#---------------------------------------------------------------------
frontend  main *:5000
    acl url_static       path_beg       -i /static /images /javascript /stylesheets
    acl url_static       path_end       -i .jpg .gif .png .css .js

    use_backend static          if url_static
    default_backend             app

#---------------------------------------------------------------------
# static backend for serving up images, stylesheets and such
#---------------------------------------------------------------------
backend static
    balance     roundrobin
    server      static 127.0.0.1:4331 check

#---------------------------------------------------------------------
# round robin balancing between the various backends
#---------------------------------------------------------------------
backend app
    balance     roundrobin
    server  app1 127.0.0.1:5001 check
    server  app2 127.0.0.1:5002 check
    server  app3 127.0.0.1:5003 check
    server  app4 127.0.0.1:5004 check

Ubuntu

global
    log /dev/log    local0
    log /dev/log    local1 notice
    chroot /var/lib/haproxy
    stats socket /run/haproxy/admin.sock mode 660 level admin expose-fd listeners
    stats timeout 30s
    user haproxy
    group haproxy
    daemon

    # Default SSL material locations
    ca-base /etc/ssl/certs
    crt-base /etc/ssl/private

    # Default ciphers to use on SSL-enabled listening sockets.
    # For more information, see ciphers(1SSL). This list is from:
    #  https://hynek.me/articles/hardening-your-web-servers-ssl-ciphers/
    # An alternative list with additional directives can be obtained from
    #  https://mozilla.github.io/server-side-tls/ssl-config-generator/?server=haproxy
    ssl-default-bind-ciphers ECDH+AESGCM:DH+AESGCM:ECDH+AES256:DH+AES256:ECDH+AES128:DH+AES:RSA+AESGCM:RSA+AES:!aNULL:!MD5:!DSS
    ssl-default-bind-options no-sslv3

defaults
    log global
    mode    http
    option  httplog
    option  dontlognull
        timeout connect 5000
        timeout client  50000
        timeout server  50000
    errorfile 400 /etc/haproxy/errors/400.http
    errorfile 403 /etc/haproxy/errors/403.http
    errorfile 408 /etc/haproxy/errors/408.http
    errorfile 500 /etc/haproxy/errors/500.http
    errorfile 502 /etc/haproxy/errors/502.http
    errorfile 503 /etc/haproxy/errors/503.http
    errorfile 504 /etc/haproxy/errors/504.http

Amazon Linux でのログファイルの出力設定

rsyslog の設定

Amazon Linux ではログが rsyslog に飛ぶようになっているので UDP を有効にして local2.*/var/log/haproxy.log にでも出力されるようにしておけば良い。

/etc/rsyslog.conf

--- /dev/fd/63  2019-03-13 11:21:15.779833034 +0000
+++ /etc/rsyslog.conf   2019-03-11 04:56:02.570224255 +0000
@@ -10,8 +10,8 @@
 #$ModLoad immark  # provides --MARK-- message capability
 
 # Provides UDP syslog reception
-#$ModLoad imudp
-#$UDPServerRun 514
+$ModLoad imudp
+$UDPServerRun 514
 
 # Provides TCP syslog reception
 #$ModLoad imtcp
@@ -55,6 +55,7 @@
 
 # Save boot messages also to boot.log
 local7.*                                                /var/log/boot.log
+local2.*                                                /var/log/haproxy.log
 
 
 # ### begin forwarding rule ###

AWS Logs の設定

CloudWatch Logs にログを飛ばす場合は以下のように書いておく。インスタンスにロールを付与しておくのを忘れずに。

/etc/awslogs/awslogs.conf

[/var/log/haproxy.log]
datetime_format = %b %d %H:%M:%S
file = /var/log/haproxy.log
log_stream_name = /var/log/haproxy.log
initial_position = start_of_file
log_group_name = foo

サービスの再起動

Amazon Linux

sudo service haproxy restart

Ubuntu

sudo systemctl restart haproxy

HAProxy Statistics Report を使う場合

ロードバランシングの状態が確認できる。haproxy.cfg のどこかに listen ディレクティブを作って軽く定義するだけで使える。

f:id:mattintosh4:20190313201702p:plain
HAProxy Statistics Report

HAProxy の状態 haproxy.cfglisten というディレクティブで設定を書いていく。書き方は色々あるけど Datadog さんのサンプルがわかりやすいかな。

listen stats # Define a listen section called "stats"
  bind :9000 # Listen on localhost:9000
  mode http
  stats enable  # Enable stats page
  stats hide-version  # Hide HAProxy version
  stats realm Haproxy\ Statistics  # Title text for popup window
  stats uri /haproxy_stats  # Stats URI
  stats auth Username:Password  # Authentication credentials

NGINX で同じことやってみる

NGINX にもリバースプロキシの機能があるので試してみた。ポイントは秘密鍵と証明書が別ってところと、stream 内では access_log が使えないので error_log 使うくらいかな。

/etc/nginx/nginx.conf

stream {
        server {
                listen 16379 ssl;
                ssl_protocols TLSv1 TLSv1.1 TLSv1.2;  
                ssl_prefer_server_ciphers on;
                proxy_pass redis-ssl;
                proxy_ssl on;
                ssl_certificate     /etc/nginx/ssl/cert.pem;
                ssl_certificate_key /etc/nginx/ssl/privkey.pem;
                error_log /var/log/nginx/proxy.log info;
        }
        upstream redis-ssl {
                server master.名前.apne1.cache.amazonaws.com:6379;
        }
}

接続テスト

192.168.1.254 は EIP のアドレスに置き換えてください。

openssl コマンド

openssl s_client -connect 192.168.1.254:16379
ping
set foo bar
get foo

Python

import redis
r = redis.Redis(host='192.168.1.254', port=16379, ssl=True)
r.ping()
r.set('foo', 'bar')
r.get('foo')

Ruby

require "redis"
r = Redis.new(url: "rediss://192.168.1.254:16379")
r.ping()
r.set("foo", "bar")
r.get("foo")

Python GeoIP 系のメモ

最近 Python で GeoIP を使うことが多いけど、なんか色々種類があってわからなくなってきたのでちょっとまとめておく。

  • GeoIP2
  • maxminddb
  • GeoIP

データベースファイル(GeoLite2-City.mmdb)は下記からダウンロードできる。

GeoIP2

GeoLite2-City.mmdb が使えるモジュール。インストールは apt または pip から。

Console

sudo apt install python3-geoip2
pip3 install geoip2

Reader クラスのインスタンスを作って city() で IP アドレスを渡せばよい。Reader クラスは __init__(self, fileish, locales=None, mode=0) となっているので mode に以下のいずれかを渡すことでモードの指定が可能。

MODE_MMAP_EXT - use the C extension with memory map.
MODE_MMAP     - read from memory map. Pure Python.
MODE_FILE     - read database as standard file. Pure Python.
MODE_MEMORY   - load database into memory. Pure Python.
MODE_FD       - the param passed via fileish is a file descriptor, not a path. This mode implies MODE_MEMORY. Pure Python.
MODE_AUTO     - try MODE_MMAP_EXT, MODE_MMAP, MODE_FILE in that order. Default.

書き方は2種類かな?

Python3

import geoip2.database
reader = geoip2.database.Reader('GeoLite2-City.mmdb')
response = reader.city('54.70.157.111')
reader.close()

Python3

import geoip2.database
with geoip2.database.Reader('GeoLite2-City.mmdb') as reader:
    response = reader.city('54.70.157.111')

戻り値は geoip2.models.City オブジェクト。

Response

<class 'geoip2.models.City'>
geoip2.models.City({'country': {'names': {'ru': 'США', 'es': 'Estados Unidos', 'fr': 'États-Unis', 'zh-CN': '美国', 'en': 'United States', 'pt-BR': 'Estados Unidos', 'de': 'USA', 'ja': 'アメリカ合衆国'}, 'geoname_id': 6252001, 'iso_code': 'US'}, 'location': {'accuracy_radius': 1000, 'time_zone': 'America/Los_Angeles', 'metro_code': 810, 'longitude': -119.7143, 'latitude': 45.8491}, 'continent': {'names': {'ru': 'Северная Америка', 'es': 'Norteamérica', 'fr': 'Amérique du Nord', 'zh-CN': '北美洲', 'en': 'North America', 'pt-BR': 'América do Norte', 'de': 'Nordamerika', 'ja': '北アメリカ'}, 'code': 'NA', 'geoname_id': 6255149}, 'traits': {'ip_address': '54.70.157.111'}, 'registered_country': {'names': {'ru': 'США', 'es': 'Estados Unidos', 'fr': 'États-Unis', 'zh-CN': '美国', 'en': 'United States', 'pt-BR': 'Estados Unidos', 'de': 'USA', 'ja': 'アメリカ合衆国'}, 'geoname_id': 6252001, 'iso_code': 'US'}, 'subdivisions': [{'names': {'ru': 'Орегон', 'es': 'Oregón', 'fr': 'Oregon', 'zh-CN': '俄勒冈州', 'en': 'Oregon', 'pt-BR': 'Oregão', 'de': 'Oregon', 'ja': 'オレ ゴン州'}, 'geoname_id': 5744337, 'iso_code': 'OR'}], 'postal': {'code': '97818'}, 'city': {'names': {'ru': 'Бордман', 'en': 'Boardman'}, 'geoname_id': 5714964}}, ['en'])

各キーに入っている値を見て見ると geoip2.records というオブジェクトで入っている。(_locales を除いて)raw は辞書で入っているので全体が欲しければこれを取り出すのが簡単。

Python Console

>>> pprint({k: type(v) for k, v in response.__dict__.items()})
{'_locales': <class 'list'>,
 'city': <class 'geoip2.records.City'>,
 'continent': <class 'geoip2.records.Continent'>,
 'country': <class 'geoip2.records.Country'>,
 'location': <class 'geoip2.records.Location'>,
 'maxmind': <class 'geoip2.records.MaxMind'>,
 'postal': <class 'geoip2.records.Postal'>,
 'raw': <class 'dict'>,
 'registered_country': <class 'geoip2.records.Country'>,
 'represented_country': <class 'geoip2.records.RepresentedCountry'>,
 'subdivisions': <class 'geoip2.records.Subdivisions'>,
 'traits': <class 'geoip2.records.Traits'>}

location には latitudelongitude 以外にもデータが入っているので Kibana で geo_point として扱う場合は編集が必要。また、Region Map を使って都道府県別にマッピングしたい場合は {国コード}-{区域コード} を組み合わせるので {country.iso_code}-{subdivisions.iso_code}(例: JP-01)みたいになるのだけど、subdivisions のキーが存在しないことがあるので確認した方がいいかも。

response.raw

{'city': {'geoname_id': 5714964, 'names': {'en': 'Boardman', 'ru': 'Бордман'}},
 'continent': {'code': 'NA',
               'geoname_id': 6255149,
               'names': {'de': 'Nordamerika',
                         'en': 'North America',
                         'es': 'Norteamérica',
                         'fr': 'Amérique du Nord',
                         'ja': '北アメリカ',
                         'pt-BR': 'América do Norte',
                         'ru': 'Северная Америка',
                         'zh-CN': '北美洲'}},
 'country': {'geoname_id': 6252001,
             'iso_code': 'US',
             'names': {'de': 'USA',
                       'en': 'United States',
                       'es': 'Estados Unidos',
                       'fr': 'États-Unis',
                       'ja': 'アメリカ合衆国',
                       'pt-BR': 'Estados Unidos',
                       'ru': 'США',
                       'zh-CN': '美国'}},
 'location': {'accuracy_radius': 1000,
              'latitude': 45.8491,
              'longitude': -119.7143,
              'metro_code': 810,
              'time_zone': 'America/Los_Angeles'},
 'postal': {'code': '97818'},
 'registered_country': {'geoname_id': 6252001,
                        'iso_code': 'US',
                        'names': {'de': 'USA',
                                  'en': 'United States',
                                  'es': 'Estados Unidos',
                                  'fr': 'États-Unis',
                                  'ja': 'アメリカ合衆国',
                                  'pt-BR': 'Estados Unidos',
                                  'ru': 'США',
                                  'zh-CN': '美国'}},
 'subdivisions': [{'geoname_id': 5744337,
                   'iso_code': 'OR',
                   'names': {'de': 'Oregon',
                             'en': 'Oregon',
                             'es': 'Oregón',
                             'fr': 'Oregon',
                             'ja': 'オレゴン州',
                             'pt-BR': 'Oregão',
                             'ru': 'Орегон',
                             'zh-CN': '俄勒冈州'}}],
 'traits': {'ip_address': '54.70.157.111'}}

ついでに geoip2.models.City オブジェクトの属性も見てみる。

Python Console

>>> print(*dir(response), sep='\n')
__class__
__delattr__
__dict__
__dir__
__doc__
__eq__
__format__
__ge__
__getattribute__
__gt__
__hash__
__init__
__le__
__lt__
__metaclass__
__module__
__ne__
__new__
__reduce__
__reduce_ex__
__repr__
__setattr__
__sizeof__
__str__
__subclasshook__
__weakref__
_locales
city
continent
country
location
maxmind
postal
raw
registered_country
represented_country
subdivisions
traits

🤔 ホスト名はわかるけど IP アドレスがわからない

socket.gethostbyname() で変換するとか。

Python3

import socket
socket.gethostbyname('elastic.co')

Response

'54.70.157.111'

dnspython を使うとか。こっちは nameservers 属性で問い合わせ先の変更ができる。戻り値は dns.resolver.Answer オブジェクトで、リストに出来るのでそれぞれから address 属性を取り出す。

Python3

import dns.resolver
resolver = dns.resolver.Resolver()
resolver.nameservers = ['1.1.1.1']
response = resolver.query('elastic.co')
list(response)
[x.address for x in response]

Response

[<DNS IN A rdata: 52.11.225.213>, <DNS IN A rdata: 54.70.157.111>]
['52.11.225.213', '54.70.157.111']

maxminddb

GeoIP2 よりもシンプル。

Console

sudo apt install python3-maxminddb
pip3 install maxminddb

Reader() または open_database() でデータベースファイルのパスを与えてインスタンスを作成する。open_database() ではモードの指定が出来る。モードについては help(maxminddb) を参照。

Python3

from pprint import pprint
import maxminddb
reader = maxminddb.Reader('GeoLite2-City.mmdb')
# reader = maxminddb.open_database('GeoLite2-City.mmdb', maxminddb.MODE_MMAP
response = reader.get('54.70.157.111')
print(type(response))
pprint(response)

戻り値は辞書型。

Response

<class 'dict'>

出力は GeoIP2 の raw とほぼ一緒。ただし、問い合わせた IP アドレスは含まれない。

Response

{'city': {'geoname_id': 5714964, 'names': {'en': 'Boardman', 'ru': 'Бордман'}},
 'continent': {'code': 'NA',
               'geoname_id': 6255149,
               'names': {'de': 'Nordamerika',
                         'en': 'North America',
                         'es': 'Norteamérica',
                         'fr': 'Amérique du Nord',
                         'ja': '北アメリカ',
                         'pt-BR': 'América do Norte',
                         'ru': 'Северная Америка',
                         'zh-CN': '北美洲'}},
 'country': {'geoname_id': 6252001,
             'iso_code': 'US',
             'names': {'de': 'USA',
                       'en': 'United States',
                       'es': 'Estados Unidos',
                       'fr': 'États-Unis',
                       'ja': 'アメリカ合衆国',
                       'pt-BR': 'Estados Unidos',
                       'ru': 'США',
                       'zh-CN': '美国'}},
 'location': {'accuracy_radius': 1000,
              'latitude': 45.8491,
              'longitude': -119.7143,
              'metro_code': 810,
              'time_zone': 'America/Los_Angeles'},
 'postal': {'code': '97818'},
 'registered_country': {'geoname_id': 6252001,
                        'iso_code': 'US',
                        'names': {'de': 'USA',
                                  'en': 'United States',
                                  'es': 'Estados Unidos',
                                  'fr': 'États-Unis',
                                  'ja': 'アメリカ合衆国',
                                  'pt-BR': 'Estados Unidos',
                                  'ru': 'США',
                                  'zh-CN': '美国'}},
 'subdivisions': [{'geoname_id': 5744337,
                   'iso_code': 'OR',
                   'names': {'de': 'Oregon',
                             'en': 'Oregon',
                             'es': 'Oregón',
                             'fr': 'Oregon',
                             'ja': 'オレゴン州',
                             'pt-BR': 'Oregão',
                             'ru': 'Орегон',
                             'zh-CN': '俄勒冈州'}}]}

GeoIP

(古い?)dat 形式のデータベースを使う方。new() または open()インスタンスを作成する。 open() の場合はデータベースファイルとモードを指定する。GeoIP2 と異なり、IP アドレスだけでなく record_by_name() でホスト名を与えることが出来る。

Python3

from pprint import pprint
import GeoIP
gi = GeoIP.open('/usr/share/GeoIP/GeoIPCity.dat', GeoIP.GEOIP_MEMORY_CACHE)
response = gi.record_by_name('elastic.co')
# response = gi.record_by_addr('54.70.157.111')
print(type(response))
pprint(response)

戻り値は辞書型。

Response

<class 'dict'>

GeoIP2 に比べると情報が少ないが、country_code3 を持っていたり、緯度経度情報が細かったりする。(ただしこれは日本の場合は都庁や皇居などであることが多く、ピンポイントで建物を示しているわけではない)

Response

{'area_code': 541,
 'city': 'Boardman',
 'country_code': 'US',
 'country_code3': 'USA',
 'country_name': 'United States',
 'dma_code': 810,
 'latitude': 45.869598388671875,
 'longitude': -119.68800354003906,
 'metro_code': 810,
 'postal_code': '97818',
 'region': 'OR',
 'region_name': 'Oregon',
 'time_zone': 'America/Los_Angeles'}

DNS サーバのログを Elasticsearch と Kibana で可視化する

自宅で DNS サーバに Unbound を使っているのだけど立ててるだけで特に監視していないので何か遊んでみようと考えた。

久しぶりに Fluentd を使おうと思ったらバージョンが変わっていて conf の書式にハマった。

  • Unbound 1.9.0
  • fluentd 1.4.0
  • Python 3.5.3
  • Elasticsearch 6.5.2
  • Kibana 6.5.2

こんな感じで必要なものを配置していく。

fluent/
|
+-- fluent.conf
|
+-- geoip_unbound.py
|
+-- GeoLite2-City.mmdb

Fluentd tail Input Plugin で Unbound のログを解析する

まずは Unbound のログの解析。ログファイルから tail プラグインを使って読み込む。

Unbound のログは log-queries を有効にしておいて infoqueryreply の区別が付くようにしておく。log-time-ascii: yes になっていると年が入らないので log-time-ascii: no(デフォルト)でタイムスタンプとして出力されるようにしておく。

unbound.conf

log-time-ascii: no
log-queries: yes
log-tag-queryreply: yes

Unbound ログサンプル

[1551678596] unbound[12266:0] query: 127.0.0.1 yahoo.co.jp. A IN
[1551678596] unbound[12266:0] info: resolving yahoo.co.jp. A IN
[1551678596] unbound[12266:0] query: 127.0.0.1 yahoo.co.jp. AAAA IN
[1551678596] unbound[12266:0] info: resolving yahoo.co.jp. AAAA IN
[1551678596] unbound[12266:0] info: response for yahoo.co.jp. A IN
[1551678596] unbound[12266:0] info: reply from <.> 8.8.8.8#53
[1551678596] unbound[12266:0] info: query response was ANSWER
[1551678596] unbound[12266:0] info: response for yahoo.co.jp. AAAA IN
[1551678596] unbound[12266:0] info: reply from <.> 8.8.8.8#53
[1551678596] unbound[12266:0] info: query response was nodata ANSWER
[1551678596] unbound[12266:0] query: 127.0.0.1 206.135.79.183.in-addr.arpa. PTR IN
[1551678596] unbound[12266:0] info: resolving 206.135.79.183.in-addr.arpa. PTR IN
[1551678596] unbound[12266:0] info: response for 206.135.79.183.in-addr.arpa. PTR IN
[1551678596] unbound[12266:0] info: reply from <.> 8.8.4.4#53
[1551678596] unbound[12266:0] info: query response was ANSWER

Unbound のログの内、解析対象はとりあえずクエリの A レコードと PTR レコードとする。ログのすべてが解析対象にならないため、パターンに一致しない行がワーニングとして [warn]: pattern not match: とダラダラと出てきてしまう。これは @log_levelerror 以上を指定すれば出てこなくなる。expression のテストが終わったらログレベルを上げておく。

fluent.conf

<source>
  @type       tail
  @log_level  error
  tag         unbound.log
  path        /var/log/unbound/unbound.log
  pos_file    /tmp/fluentd_unbount.log.pos
  <parse>
    @type       regexp
    expression  ^\[(?<time>\d+)\] unbound\[\S\] query: (?<client>\S+) (?<host>\S+)\. (?<record_type>(A|PTR)) IN$
    time_key    time
    time_type   unixtime
  </parse>
</source>

# 出力確認用
<filter unbound.log>
  @type stdout
</filter>

🤔 @timestamp フィールドを作るか作らないか

Elasticsearch プラグインlogstash_format を有効にする場合、プラグイン側で自動的に @timestamp フィールドを作成してくれるのでここで用意しておく必要はない。外部フィルターで時間に対して何かしらの処理をしたい場合や logstash_format を使わない場合はここで @timestamp フィールドを作成しておくといいかもしれない。

🤔 時間フィールドが消えてしまう問題

time_key で時間として指定したフィールドはデフォルトだと消えてしまうので残しておきたい場合は keep_time_key を有効にする。

🤔 数値が数値型ではなく文字列型になってしまう問題

キーの型を指定したい場合は typesキー名:型 と指定する必要があるようだ。ここでは数値型にしたいので @timestamp:integer とする。

tail Input Plugin の出力サンプル

2019-03-03 21:02:31.000000000 +0900 unbound.log: {"client":"192.168.1.10","host":"ssl.gstatic.com", "record_type": "A"}

exec_filter でホスト名から国や緯度経度情報を取得するフィルターを作る

以前は GeoIP モジュールを使っていたけど今回は GeoLiteCity.dat ではなく GeoLite2-City.mmdb を使いたかったので maxminddb モジュールを使うことにした。GeoIP だと record_by_name() でホスト名から情報を拾えるんだけど maxminddbget() で IP アドレスを渡すくらいしかできないので socket.gethostbyname() を通して IP アドレスを渡している。

最新の GeoLite2-City.mmdb は以下からダウンロードできる。

このスクリプトには重大な欠点がありましたが、maxminddb モジュールの使い方メモとして残しています。DNS レコードタイプ別の設定をする以前のものですのでその辺りも異なります。

my_geoip.py (ボツ)

#!/usr/bin/env python3
import sys
import json
import socket
import maxminddb
reader = maxminddb.open_database('GeoLite2-City.mmdb')
while True:
    line = sys.stdin.readline()
    d = json.loads(line)
    g = reader.get(socket.gethostbyname(d['host']))
    d.update({
        'country': {
            'iso_code': g['country']['iso_code'],
        },
        'location': {
            'lat': g['location']['latitude'],
            'lon': g['location']['longitude'],
        },
    })
    sys.stdout.write(json.dumps(d))

🤐 重大な欠点があったため作り直した

socket.gethostbyname()DNS サーバに問い合わせたらそれも Unbound のログに出力されて無限ループじゃんアホかい!」ってなったので作り直した。使うモジュールも maxminddb じゃなくて geoip2 に変更した。

geoip2 も引数にアドレスを指定しなければいけないのは変わらないのでホスト名からなんとかアドレスを得る。dnspython というモジュールを使うと問い合わせ先の DNS サーバを指定できるとのことだったのでこれを使えば Unbound を経由させずに済む。(ただし無駄は多い)

geoip2 のデータは geoip2.models.City というオブジェクトで返ってくるんだけど、raw というプロパティを持っているのでそこを引っこ抜けば JSON に変換できるものが全部出てくる。

geoip2 の使い方例

>>> from geoip2.database
>>> from pprint import pprint
>>> reader = geoip2.database.Reader('GeoLite2-City.mmdb')
>>> response = reader('54.70.157.111')
>>> pprint(response.raw)
{'city': {'geoname_id': 5714964, 'names': {'en': 'Boardman', 'ru': 'Бордман'}},
 'continent': {'code': 'NA',
               'geoname_id': 6255149,
               'names': {'de': 'Nordamerika',
                         'en': 'North America',
                         'es': 'Norteamérica',
                         'fr': 'Amérique du Nord',
                         'ja': '北アメリカ',
                         'pt-BR': 'América do Norte',
                         'ru': 'Северная Америка',
                         'zh-CN': '北美洲'}},
 'country': {'geoname_id': 6252001,
             'iso_code': 'US',
             'names': {'de': 'USA',
                       'en': 'United States',
                       'es': 'Estados Unidos',
                       'fr': 'États-Unis',
                       'ja': 'アメリカ合衆国',
                       'pt-BR': 'Estados Unidos',
                       'ru': 'США',
                       'zh-CN': '美国'}},
 'location': {'accuracy_radius': 1000,
              'latitude': 45.8491,
              'longitude': -119.7143,
              'metro_code': 810,
              'time_zone': 'America/Los_Angeles'},
 'postal': {'code': '97818'},
 'registered_country': {'geoname_id': 6252001,
                        'iso_code': 'US',
                        'names': {'de': 'USA',
                                  'en': 'United States',
                                  'es': 'Estados Unidos',
                                  'fr': 'États-Unis',
                                  'ja': 'アメリカ合衆国',
                                  'pt-BR': 'Estados Unidos',
                                  'ru': 'США',
                                  'zh-CN': '美国'}},
 'subdivisions': [{'geoname_id': 5744337,
                   'iso_code': 'OR',
                   'names': {'de': 'Oregon',
                             'en': 'Oregon',
                             'es': 'Oregón',
                             'fr': 'Oregon',
                             'ja': 'オレゴン州',
                             'pt-BR': 'Oregão',
                             'ru': 'Орегон',
                             'zh-CN': '俄勒冈州'}}],
 'traits': {'ip_address': '54.70.157.111'}}

geoip2 で得られる位置情報は location.latitudelocation.longitude となっているのでこれを location.latlocation.lon に変更する必要がある。また、Elasticsearch で geo_type に指定したフィールドは latlon 以外のフィールドを入れるとエラーになってしまうようなので time_zone などのフィールドを削除するか、geo_point 用のフィールドを別に作成する必要がある。

今回はとりあえず raw から location を引っこ抜いて latitudelongitude だけ戻すことにした。

my_geoip.py

#!/usr/bin/env python3
import sys
import json

import geoip2.database
reader = geoip2.database.Reader('GeoLite2-City.mmdb')

from dns import resolver
resolv = resolver.Resolver()
resolv.nameservers = ['8.8.8.8']

while True:
    line = sys.stdin.readline()
    if len(line) < 1:
        continue
    source = json.loads(line)
    if None:
        pass
    elif source['record_type'] == 'A':
        ip = [x.address for x in resolv.query(source['host'])][0]
    elif source['record_type'] == 'PTR':
        ip = '.'.join(reversed(source['host'].split('.')[0:4]))

    geo      = reader.city(ip)
    geo_raw  = geo.raw
    location = geo_raw.pop('location')
    geo_raw.update({
        'location': {
            'lat': location['latitude'],
            'lon': location['longitude'],
        }
    })
    source.update(geo_raw)
    sys.stdout.write(json.dumps(source))

GeoIP フィルターとやりとりする部分を書いていく。バッファはメモリに配置。<format></format><parse></parse> が以前のバージョンになかったので悩んだ。chunk_limit_records は控えめに 100 くらいからスタート。

Elasticsearch プラグインlogstash_format を使う場合、外部フィルターから返ってきた JSON から再度時間を抽出する必要がある。

fluent.conf

<match unbound.log>
  @type   exec_filter
  tag     exec.unbound
  command ./geoip_unbound.py
  child_respawn inf
  <format>
    @type json
  </format>
  <parse>
    @type json
  </parse>
  <buffer>
    @type memory
    retry_max_times 0
    chunk_limit_records 100
  </buffer>
</match>

# 出力確認用
<filter exec.unbound>
  @type stdout
</filter>

🤔 何かデータ入ってこないけど、もしかして Python 死んでない?

暫く Fluentd を動かしているとリトライが発生し、それが発生するとその後 Elasticsearch にまったくデータが入らなくなるということがあった。最初は Elasticsearch のパフォーマンスの問題かと思い Fluentd を都度再起動していたが、実は外部コマンドの Python でエラーが発生し、しかも異常終了や再起動せずに Fluentd がひたすらリトライをするためだった。(ps コマンドで見ても Python が生きているように見えてしまう)

child_respawn inf を設定しておけばプロセスが終了してしまった場合に立ち上げなおしてくれる。一度止まった場合、今回は再試行する意味がないので retry_max_times 0 で即時再起動するようにしている。

Respawn command when command exit. Default is disabled. If you specify a positive number, try to respawn until specified times. If you specify inf or -1, try to respawn forever.
一部のレコードでエラーが発生しても動き続けてしまうのでしっかりデバッグを行ってから設定の有無を決めた方が良い。

Elasticsearch のマッピングをする

Fluentd から Logstash 形式でデータを投入する場合、日付ベースで自動的にインデックスが作成されるが、マッピングをしたい場合は template_file などを使うらしい。今回は緯度経度情報を扱うため、位置情報フィールドに対して geo_point 型の指定が必須になる。

Fluentd の Elasticsearch プラグインの仕様が変わると面倒だったので、Elasticsearch 側にテンプレートを用意して自動的に適用されるようにしておくことにした。データ量が多くなることに備えて refresh_interval をデフォルトの 1s から 30s に変更。string 型のデータは解析する必要もないので text ではなく keyword で入るようにしておく。_allfield_names のフィールドは使わないので無効にしている。

@timestampepoch_second を使う場合はここで設定しておけばよい。

Kibana - DevTool

PUT _template/unbound
{
  "index_patterns": "unbound-*",
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0,
    "refresh_interval": "30s"
  },
  "mappings": {
    "_doc": {
      "_all": {
        "enabled": false
      },
      "_field_names": {
        "enabled": false
      },
      "dynamic_templates": [
        {
          "strings": {
            "match_mapping_type": "string",
            "mapping": {
              "type": "keyword"
            }
          }
        },
        {
          "geo_point": {
            "match": "location",
            "mapping": {
              "type": "geo_point"
            }
          }
        }
      ]
    }
  }
}

Fluentd から Elasticsearch にデータを投入する部分を書く

マシンが非力なので request_timeout をデフォルトの 5s から 30s に変更。type_name を指定しないと fluentd という名前になってしまうので type_name _doc を指定しておく。今回は logstash_format を有効にするのでレコードの @timestamp フィールドは自動的に作成される。

fluent.conf

<match exec.unbound>
  @type           elasticsearch
  hosts           localhost:9200
  type_name       _doc
  logstash_format true
  logstash_prefix unbound
  request_timeout 30s
  <buffer>
    flush_thread_count 2
  </buffer>
</match>

Elasticsearch に投入されたデータサンプル

{
  "_index": "unbound-2019.03.04",
  "_type": "_doc",
  "_id": "JrzHR2kBVp0AiN9YlA3_",
  "_score": 1,
  "_source": {
    "city": {
      "geoname_id": 5714964,
      "names": {
        "en": "Boardman",
        "ru": "Бордман"
      }
    },
    "country": {
      "geoname_id": 6252001,
      "iso_code": "US",
      "names": {
        "en": "United States",
        "de": "USA",
        "es": "Estados Unidos",
        "ja": "アメリカ合衆国",
        "ru": "США",
        "pt-BR": "Estados Unidos",
        "zh-CN": "美国",
        "fr": "États-Unis"
      }
    },
    "continent": {
      "geoname_id": 6255149,
      "code": "NA",
      "names": {
        "en": "North America",
        "de": "Nordamerika",
        "es": "Norteamérica",
        "ja": "北アメリカ",
        "ru": "Северная Америка",
        "pt-BR": "América do Norte",
        "zh-CN": "北美洲",
        "fr": "Amérique du Nord"
      }
    },
    "subdivisions": [
      {
        "geoname_id": 5744337,
        "iso_code": "OR",
        "names": {
          "en": "Oregon",
          "de": "Oregon",
          "es": "Oregón",
          "ja": "オレゴン州",
          "ru": "Орегон",
          "pt-BR": "Oregão",
          "zh-CN": "俄勒冈州",
          "fr": "Oregon"
        }
      }
    ],
    "record_type": "A",
    "host": "elastic.co",
    "location": {
      "lon": -119.7143,
      "lat": 45.8491
    },
    "registered_country": {
      "geoname_id": 6252001,
      "iso_code": "US",
      "names": {
        "en": "United States",
        "de": "USA",
        "es": "Estados Unidos",
        "ja": "アメリカ合衆国",
        "ru": "США",
        "pt-BR": "Estados Unidos",
        "zh-CN": "美国",
        "fr": "États-Unis"
      }
    },
    "traits": {
      "ip_address": "54.70.157.111"
    },
    "client": "127.0.0.1",
    "postal": {
      "code": "97818"
    },
    "@timestamp": "2019-03-04T17:17:19.731231232+09:00"
  },
  "fields": {
    "@timestamp": [
      "2019-03-04T08:17:19.731Z"
    ]
  }
}

Kibana でダッシュボードを作る

Fluentd から送られてきたデータを Coordinate Map、Heat Map、Line で可視化する。

こうして見てみるとただブラウザで調べごとをしていたりするだけでも案外いろんな国にまで行っているのだなぁと感じる。

f:id:mattintosh4:20190304190438p:plain
Kibana - Dashboard

しばらく監視してみておかしなサイトに繋ぎに行ってないかとか発見出来れば面白いかな。

プロキシサーバのログも解析したいけど今日はもう疲れたのでまた今度にする…( ˘ω˘)スヤァ


以下後日談

ホストだと分割されすぎるので tldextract でドメイン名だけを抜き出す

Google だけでも

とか色々あるのでサブドメインを取り除いたドメイン名だけ欲しくなった。Stack Overflow を見ると「. でスプリットして後ろから2つ取ればいいのでは?」みたいな回答多かったけどそれじゃ .co.jp とか困るやろ、ってなったのでちゃんと解析してくれるモジュールを探したら tldextract っていうのがあった。

github.com

Console

pip3 install tldextract

属性はこんなものがある様子。欲しいのは registered_domain

count
domain
fqdn
index
ipv4
registered_domain
subdomain
suffix

Python3

import tldextract
rd = tldextract.extract('www.yahoo.co.jp').registered_domain

Return

'yahoo.co.jp'

GJ