AWS Kinesis Stream を大規模データで検証してわかったことの事例紹介

10分間で数億件を超えるIoT関連のデータをストリーム処理するためのPoC(概念実証)をする機会がありました。そこで経験をしてハマったことなど一部事例として紹介します。

検証の概要

まずは、何のために検証しようとしたかというと...

IoT機器からKinesis Streamを通して大量のメッセージを受け取り、分散処理で関連付け処理を検証します。

実証検証の目的としては

  • アーキテクチャ構成の妥当性を検証し
  • コスト軽減のポイントを把握
  • 運用に向けた課題を洗い出す

としました。

データと要件の特性

今回の検証用データは自前で生成し投入する必要があり、結構なデータ量となります。

  • トラフィック量 およそ数十万件/秒(数億件/10分)
  • 時間帯によってバースト(スパイク)が存在する
  • メッセージの関連付け処理を行うが、いつ終点メッセージが届くかわからない。
  • 数分後には処理結果を利用したいのでバッチでは実現が困難

毎秒平均的にトラフィックを流し続けるのはそれなりのチューニングが必要でしたので、そこに時間をかけず、波はありながらも、都合およそ数十万件/秒となるようにメッセージを10分間流し続けました。

Kinesis Streamについて

Kinesis Streamを挟む狙いとしては、アーキテクチャとして要件にあっているということに加え、下記のような特性を活かせるという狙いから採用候補としました。

  • 受け取ったストリームデータを損失しないようにする(リトライ含み)
  • ストリームからデータ取得を担うConsumerが自分のペースで処理を進められる
  • 安定的にストリーム向け分散メッセージキューを運用できる
  • ストリーム処理を繋ぐところのIn/Outを疎結合にできる
    • 疎結合にすると言っても、お互いがうまく結合できるように動作確認はしておきたい
      • Producerは大量データでもデータを全てKinesisに送信できているか
      • Consumerは大量データでもデータを全てKinesisから受信できているか

大規模データと小規模データの違いでおこったこと

規模の捉え方は、格納するメッセージ容量や要件によって様々ですが、今回は数百バイトのメッセージを大量に扱うという特性をもっていました。10分間の間に扱うメッセージ数は、

  • 小規模検証(ロジック動作確認)は数百万程度のメッセージで行う
  • 大規模検証(運用シュミレーション)は数億程度のメッセージで行う

としました。

Kinesis Stream シャード制限とリソースコストの関係

これくらいのトラフィックだと、処理の効率化でインフラコストが大きく変わってきます。いかにしてそれを抑えることができるかを探りました。

Producerの視点で

Kinesis Streamの料金は主にシャード数で決まってきます。

まず、Kinesis Streamを使うときは、自分たちがどれくらいの性能を必要としているのかを把握します。1シャードあたり1000メッセージまたは1MB/秒の制限がありますので、これを元にシャード数を指定してStreamとしてのスループットを確保します。

メッセージを投入する際、Kinesis Producer Library(KPL)を使うと便利です。シャードスループットを超えるリクエストがあった場合は、KPLが自動的にリトライをしてくれます。

Producer側からみるとより少ないシャード数で効率よくメッセージを詰め込むことで、Kinesis利用料を削減することができるのです。

今回のように小さなサイズのメッセージを大量に流したい場合、シャードあたりの容量制限に余裕があるものの、先にメッセージ数上限に達してしまい、シャード分散量が増えてしまいがちです。そういう時に使えるアプローチがユーザレコードを集約する機能です。

仮に1メッセージが100バイトである場合、10メッセージを1メッセージとして送信すれば、それだけでスループットを10倍稼ぐことが出来ます。同様に1000メッセージを1メッセージに集約させれば約100キロバイトになるので、理論上は1000倍のスループットとなります。

KPLを使ってレコードを集約しKinesis Streamに送信すると、総送信回数も減るので、負荷も軽減されメッセージ送信完了までの速度もあがります。

Producerのインスタンス性能も下げることができ、結果的にKinesis StreamとKPLを実行しているEC2両方の価格を抑えることに繋がります。

いいことづくめですね。

大規模データでKPLを使ったら...

ここまでは、AWSの公式ドキュメントをみるとわかることなのですが、数百シャードを使って分散されたProducerがレコードを集約して送信したところ、小規模で試した時より送信スループットを得ることができませんでした。

なぜこういうことが起こるのでしょうか。

Kinesisの内部実装を把握しているわけではないのですが、発生した現象をもとにした考察推測を紹介します。

※後日AWSサポートに問い合わせようかなと考えています

Producer側ではHashKeyを明示的に指定せずにランダムのPartitionKeyを指定して実装していました。シャード数が増えたことにより、KPLによって集約された1レコード内に異なる多数のシャードに格納すべきとされたレコードが存在してしまったため、KPLからリトライするメッセージが増えてしまっていると考えられます。

チューニングすれば解消できる問題ですが、シャード数が多くなればなるほど、シャードが分散されていることを考えてどう処理されるかを意識し実装しないと、Kinesis Streamの性能やコスト削減を最大限にすることから遠ざかってしまいますね。

Consumerの視点で

続いて、Consumer処理です。こちらは、Kinesis Client Library(KCL)を使うことで、メッセージを取得する仕組み・状態管理をライブラリに任せることができます。

KCLは下記のような特徴を持っています。

  • 複数起動されたKCLプロセスごとに担当シャードが決まる
    • 均等に担当量が配分される
    • KCLがワーカースレッドを作りメッセージを取得する
      • 担当シャードの数だけワーカースレッドが立ちあがる
  • 分散起動されたKCLの1台が落ちても他のKCLがシャードを引継ぐことができる

シャード数が数百を超える規模になってくると、1台のインスタンスのCPU Coreで賄えなきれなくなることは明白で、どのインスタンスタイプでどれくらいの分散度にするのかが重要なチューニングポイントとなってきます。Consumer側は

  • 相互関係にあるパラメータ
    • Kinesis Streamシャード数
    • EC2コア数
    • EC2インスタンス数

の関係性をパタン化し測定しました。

例えば、EC2インスタンスはc4.8xlargeだと36Coreとなり、c4.4xlargeならば16Coreなので、Core数という面ではc4.8xlargeは有利なのですが、分散化して測定してみると、c4.8xlargeで少ない数のインスタンスで総Core数が多いケースより、c4.4xlargeで多い数のインスタンスで、総Core数が少ない方が処理効率は良い結果となりました。 (あくまで本検証ケースの実装とデータの結果となりますので、異なる前提条件下では同様の結果になるとは限りません。)

大規模データでKCLを使ったら...

Consumer周りでも大規模データでの検証になると問題が発生しました。

具体的にいうと、Producerのメッセージ送信量の要件にあわせてシャード数が多くなった場合、KCLの分散度やWorker数が多くなるのですが、メッセージの取りこぼしが発生しました。

調査を重ねた結果、KCLが状態管理に使用しているDynamoDBのKinesisストリームアプリケーションテーブルに起因することがわかりました。KCLでは起動時にDynamoDBにテーブルを自動生成するのですが、DynamoDBの初期設定Read・Writeともキャパシティ値を超えてKCLからアクセスが発生していたことが原因でした。

KCLが自動生成したテーブルは、Read・Writeとも10ユニットで設定されます。

測定してみたところ、数百シャードを超える場合は、ピークのアクセスとなる時に

  • Read性能はデフォルト値前後を推移
  • Write性能は10倍以上が必要

ということがわかりました。

そこで、2017年6月にリリースされたDynamoDBのオートスケール機能を使います。 KCL起動前にCLIを使って手動でテーブル作成しておきキャパシティをオートスケーリング設定にしておくことで、Consumerのメッセージ取りこぼしが発生しなくなりました。

大規模データでの検証を始める前は、管理テーブルだからそんなにアクセス量はないだろう とか KCLがいい感じにやっておいてくれるだろうと想定していました。ところが分散数が増えてくると様々なところで小規模検証では無視できた負荷が大きく現れてきて、様々な事象が発生しました。

まとめ

今回は2ケースほど大規模ならではの事例を紹介しました。他にも紹介しきれなかった事象もあり、事象から一つ一つ原因を探っていく地味な作業を繰り返したくさんの知見を得ることができました。また、検証にかかったインフラ利用料金も想定の範囲内に抑えることができ、PoCでの目的を達成することができました。

大規模検証はやってみて初めて発生する事象に出逢うことはよくあることで、逆に言うと事前検証なしで大規模に適用するアーキテクチャを決めることは、何か課題が発生した場合、アーキテクチャを根幹から変更しなくてはならないレベルの影響が及ぶことになりかねません。時間と予算をとってPoCをすることの有用性を改めて感じました。

検証を通してKinesis自身の安定性は抜群で、かつ料金も妥当と思えるものだったので、今後もマッチする要件にはKinesisを提案していく機会が増えていくことでしょう。また、こういう経験をアウトプットすることで、みなさんに少しでも役だてていただければなによりです。