Scala先駆者インタビュー 最終回 ChatWork かとじゅんさん 〜前編〜
めぐりめぐってたどり着いた最終回
-- 過去7人の先駆者をご紹介しましたが今回で最終回を迎えます。最後は水島さんからのご紹介で、かとじゅんさんです。水島さんとかとじゅんさんの出逢いの頃からお話をきかせてください。
かとじゅん:水島さんとの出会いは、ちょうどScalaを実践投入しようと思っていた時でした。
水島:実は、そもそも自分がかとじゅんさんといつどこで出会ったのかをあまり憶えていないんですよね......(笑) かとじゅんさんが書いていたブログに時々出没してコメントしていたのは憶えています。
かとじゅん:ブログにもよくコメントいただいてましたよね。僕が初めて水島さんと出会ったのは、確か僕がD社時代にまだScalaを実践投入してない段階で、Scalaを勉強するために一度社内勉強会を開いたことがあって、そこに水島さんを呼んだ覚えがあります。
水島:なるほど、そう言われると加藤さんに呼ばれた記憶はあります。
かとじゅん:あとは、吉田さんやゆろよろさんも来てくれた気がします。ただ、今みたいには盛り上がっていない状況ではありました。僕が最初に水島さんを見かけたのは、2007年頃に筑波かどこかで開かれたJJUGでScalaの紹介をしていた時だったと思います。
水島:はい、していましたね。
かとじゅん:その頃の僕は、Java寄りの考えがすごく強かったので、Scalaを見た時は「全然訳がわからないな」と感じて1回身を引いたことを憶えています。そこから、3年くらい時間を空けた2010年頃、ちょうどドメイン駆動設計をやり始めて、Viewモデルとドメインモデルやドメインモデルとテーブルモデルの、レイヤをまたいだモデルの変換をmap, foldなどの高階関数を使って考えていた時にScalaをやり始めました。きっかけとしては、Asakusa Frameworkを開発している荒川さんから、そういった問題解決には関数型の考え方が役に立つのでScalaをやってみると良い、というアドバイスをもらってそこからアクセルをかけてScalaを実践的に始めました。
色々な障壁がありながらもScala導入の土台作りに取り組んでいました
水島:加藤さんがScalaを使い始めたのは、どちらかというと、プログラミングの面よりも設計手法として見たところからなんですね。
かとじゅん:そうですね。Scalaを学びたいというよりも問題解決のための設計手法を学びたかったという気持ちがあって、オブジェクト指向をベースにしながら関数型の考え方を取り入れているところは、僕の中ではすごくフィットしてました。実際にやってみるとすごくやりやすかった印象があります。Scalaでは、Javaの資産との結合部分にnullが登場して扱いが辛いなどという話はありますが、mapやreduce、foldなどミクロな解決手法を組み合わせて大きな問題を解くというコンセプトがドメイン駆動設計をやるときにマッチした気がします(ただ、高階関数の名前はユビキタス言語には対応付かないことが多いので、ドメイン層のI/Fではなく、その内部実装で使えそうという話です)。そこが出発点で、そこからはひたすらそういうことばかり考えていた気がします。この時は、まだD社の社内で導入できるかどうかというところではありました。
水島:2011年8月頃に第1回Scala会議を、かとじゅんさんが開催しましたよね。
かとじゅん:やりましたね!いろんな人に助けてもらって開催できましたね。感謝しかないです。記憶が定かではないですが、その年の12月に第二回も開催していたようです…。Scala人口を増やすためにScalaのコミュニティーを育てて、Scalaを使うための土壌を広げたかったという思いがありました。手段を目的化してしまっているような側面もあったのですが、実用言語として実力のある言語だとその当時も感じていたので、課題感が同じ人達が集まって情報共有できると良いなということでやっていました。
-- その当時で何人くらい集まっていたのでしょうか。
かとじゅん:80人くらいです。
水島:今でこそ、ScalaMatsuri(2017年開催はこちら)は海外の人も含めて大勢の方に参加していただけるようになりましたが、当時の浸透度や認知度からすると、かなりの人数が集まっていたと思います。
かとじゅん:今では、ScalaMatsuriの規模感が当たり前みたいになってしまっていますが、僕からすると、あの当時と比べても短期間で劇的に変わったなという印象です。
-- それは日本だけでなく、海外で見ても同じような盛り上がりなのでしょうか。
水島:そうですね。最近はApache Sparkを使うというところからScalaを使い始めたという人達も多くなっていますよね。
かとじゅん:Sparkを使いたいからScalaを使うのは当たり前みたいな話もあるようですから、違和感なく使い始める人も多くなっていると思います。
水島:Scalaに関して言えば、Sparkが一つの火付け役になって、キラーミドルウェアとして良い影響を与えてくれたと思います。
かとじゅん:Scalaをやり始めた当時は、Scalaをプロダクトで採用する時にはすごく苦労しましたね。PHPとJavaの運用実績があるなかで、それ以外を導入すると運用が大変になるという理由から、新しいモノを導入することについては、インフラを担当する方達が簡単には首を縦に振りませんでした。ただ、Scalaが良かった点として、伝統的なJavaのランタイム上で動くということがかなりのアドバンテージになっていたのは事実です。あの当時は、Javaであるということが導入障壁を下げたという感じです。コンパイルが遅いなどの辛さはありますが、JVM上で動くのであればインフラの人も既存の知識とツールで対応できる、というところがハードルを下げましたね。Scalaと言わずに「Javaのアプリケーションです」と言って使っていました(笑)
水島:今のD社だとチーム単位に裁量があって責任を持つという空気感がありますが、当時は違った雰囲気だったのですね。
かとじゅん:インフラの最高責任者の人が首を縦にふらないと新しい言語やミドルウェアは採用できないという風潮がありました。これは、会社とかインフラ部門が悪という話ではなく、全体最適を考えた場合にすごく妥当な判断だったと思うんですよね。でも、僕らは個別最適も必要と考えていたので、そういう難しいゲームのルールを把握し、インフラ部門の協力者も得て、Scalaの導入に成功したという感じです。ちなみに、D社でScalaを最初に取り入れたのが、友人で元同僚のヨシオリで、彼が当時のチームでKestrelを導入しました。僕がPlay FrameworkでRESTサーバを書いていたより前に本番環境に乗っかっていたようです。だから、一緒に飲むと、彼に”最初に道を作ったのは俺だぞ”とツッコミをもらうことがあります(笑)
ただ、Scalaが黎明期だったということもあって、そういうことを大手を振って発言できないカルチャーではありました。なので、D社でScalaを採用しますと最初に表立って発言したのは僕かもしれないですが、それ以前にも隠密で実践している人達はいました(笑)
水島:ひっそりとですか(笑)
かとじゅん:今でこそ、非同期・ノンブロッキングで処理をしたいとなったら一つの選択肢としてScalaは有力候補になるのですが、当時はPHPでもメッセージキューを使えばできるだろうということを言われましたね…。
土壌がない状態でやるからには覚悟を決めて実践してました
-- 何もないさら地からScalaを導入したと言った感じでしょうか。
かとじゅん:最初はScalaの土壌が無い状態で導入を図っていたこともあってか、上司には"ほんとに大丈夫?"などと心配されるようなことがありました。まぁ、上司としては適正な振る舞いだと思います(笑)。そういった事情もあって、Javaアプリケーションだということだけを伝えて(ScalaコンパイラとJVMという分離された信頼性という担保があったから)、表立ってはScalaとは言わずに採用するということをしていました。「許可を求めるのなら謝罪しろ」ということではないですが、エンジニアが良いと思ったものを裁量で導入して、上手くいかなければ謝る覚悟を決めて実践していました。これを真似すればうまくいくとはいいませんが、技術的に裏打ちされた、現場の強い覚悟みたいなものがありましたね…。
-- 一方で、チーム内での評判はいかがだったんでしょうか。
かとじゅん:チームメンバーの理解が得られやすいものという一定の基準は当然ありますね。実際に、メンバーには新しいものが好きな人が多くて、そもそも大学で関数型の研究をやってた人やOCamlができる人が自分の周りにいたので抵抗なくやっていました。たまに、なぜScalaは引数を型推論できないのかといったような内容の質問を受ると、困るので、「そういうのは水島さんに聞こう」と言って水島さんに質問したりしていました(笑) そういう意味では、無理なく浸透した感はありました。
-- 水島さんは結構そういう質問を受けたりするんですか。
水島:そういうのはしょっちゅう聞かれていました(笑)引数の型推論の話について言えば、nominalとstructureのタイプの引数の推論について、OCamlの例などを挙げてモヒカンばりに突っ込んだりしていました。
最近では問題解決手法の一つとしてのScalaが段々と認知されてきたように感じます
かとじゅん:さきほどモヒカンという言葉が出ましたが、当時は、社内勉強会に参加してくれていたモヒカン達が勉強会でモナドに代表される型クラスの話を普通にしていて、そういう話を聞いたScala入門者の人達がカルチャーショックを受けて良い刺激になる、というのが何回か繰り返されるのを見てきました。もちろん、実際にScalaを実践投入するならPlay FrameworkやFinagleが良いといった実用的な面での情報交換もありました。ただ、単に仕事で使う言語というだけでなく、自分がもっと上手く扱うことができればコードの表現力を向上させることができると話す人達が結構いて、実用でありながら工学的なことも考えるというコミュニティーのカルチャーにはJavaコミュニティーには感じられなかった学ぶべき世界観があったと感じています。
水島:それもありますが、実用で使っている人でもScalaから何かを学び取ろうという感じで触っているというコミュニティーのカルチャーはあるかと思います。
かとじゅん:そうですね。実用もさることながら、Scalaを使って何かやってみよう、学びを得ようといった入り方は共通認識として通じていましたね。
-- 関数型から何かを学びたい活かしたいという流れから、数年前にFunctional Reactive Programming(FRP)というプログラミングモデルが話題になりましたよね。
かとじゅん:言わずもがな、FRPを学ぶには関数型プログラミングの知識は必要ですね。ChatWork社の大阪のメンバーも数年前からFunctional Programming in Scala(和訳版はScala関数型デザイン&プログラミング ― Scalazコントリビューターによる関数型徹底ガイド)の勉強会を月1回のペースでやってます。色々な人が集まる中で、Haskellなどの関数型プログラミングに詳しい方からサポートを受けながら進めているそうです。
余談ですが、関数型はパラダイムが違うので難しいと言う人は多いですが、モダンなJavaScriptを書く人達は、高階関数なんて当たり前に使っているので、Scalaにも応用できる広い世界観を持っていると感じています。core.jsのインタフェースを見てるとmapやreduceは元々あって、Scalaに慣れている僕からするとすごくやりやすいです。ファーストクラスファンクションなカルチャーだから自然に慣れてしまうんだなと思いました。逆に、兼ねてからJavaを使っている人達からは、JavaSE 8で導入されたStream APIで記述されたコードを見ると何をやってるか分からないという、声も聞きいたりします。処理の効率性や宣言的な記述としては意味はあるんだろうけど、理解し難いというメンタリティーはあると思います。
ただ、Microsoftが.NET向けのRx(Reactive Extensions)を開発したきっかけで、.NETだけでなくJavaScriptをはじめ様々な言語に広まったことは有名ですが、Springも力を入れているようでReactive StreamsのAPIに準拠したReactorというプロダクトを開発しているようです。そういう意味ではリアクティブシステムに関連する話題が増えました。つまるところ、最近の、ハードウェア・ソフトウェアに関連するエコシステムの変化は、マーケットの要求に追従するためにそういう方向に向かっていることは間違いないと思っています。だた、コミュニティーとしては、そこまでのメンタリティーを持って追従している人は少数で、多くの人がまだまだこれからという印象があります。
水島:私にもそう見えます。
かとじゅん:サーバサイドをやってるJavaエンジニアからすると、JavaScriptはできるだけ避けるという考え方もあると思いますが、少し触ってみると新しい発見があるかもしれませんし、それによってScalaに対しても意外に悪くないなという印象はもってもらえるかもしれません。少し語弊がありますが「JavaScriptに型がついた言語でしょ?」という感じで、すんなりScalaを使えるようになったJavaScriptエンジニアを何人も知っています。 あ、誤解しないで欲しいのですが、Javaは素晴らしい言語だし、Scalaを学ぶのにJavaScriptを学ぶことが必須と言っているわけではなく、他のパラダイムにも触れることに意味があるって話です(笑)
僕がそうだったのですが、一つの言語で長く育ったエンジニアは、その言語によって価値観が固定化されやすい傾向にあると思っています。これは無理もないことですが、その前提で他の言語を見た時に違和感を覚えたりしますが、少しでも他の言語にも触れていくと、自分が普段使っている言語を違った視点で見ることもできます。エンジニアとして視野が広いということは、そのような多様な価値観を把握し、どういう使い方をすれば、役に立つか提案できることではないかと思っています。
水島:Springでも、Functional Web FrameworkというJava8のラムダに基づいたアノテーションベースでないものが出ていているので、そういう風がJavaにも来てるのかなと思います。
かとじゅん:Spring ReactorはFunctionalな感じがしますね。
水島:この辺りを見ると従来のSpringと世界観が違いますね。
かとじゅん:関数型プログラミングやリアクティブシステムなどの考え方がオブジェクト指向言語のなかでも問題解決の1つの手法として認知されてきたのかなと思います。
リアクティブシステムなどの文脈と絡めてScalaの知見がもっと広まると良い流れができそうです
-- リアクティブシステムをSIで適用しようとした場合に、今まではSpringベースでやっていたものをリアクティブシステムに変えるとなると壮大なプロジェクトになりそうですよね。逆に、規模が小さすぎるとメリットを活かしきれなさそうでもありますし、どういう風に導入していければ良いのかなと悩みがあります。それと、どうやって引き継いでいくといいのかという懸念があり、そんな中でも実践を重ねていきたい気持ちもあります。
かとじゅん:教育というところは確かに難しいですね。でも、メッセージパッシングやSupervisorだってErlang/OTPの歴史を紐解けば別段新しくはないですし、リアクティブシステムを構成する一つ一つの考え方(メッセージ駆動、耐障害性、弾力性、即応性)は昔からあったものですよね。ただ、コンセプトとして名前をつけて体系化して共通理解を得やすくなったのは割と最近になってからという気がします。エコシステムとして理解して実用できるまでが難しいって感じかもしれません。
-- ひと言で言って分かりやすくマニフェスト風にまとめたリアクティブ宣言もそういう助けになっていますよね。
かとじゅん:Many Core時代と言われて久しいですが「サーバを跨いでコアを使い切りたい」「サーバのダウンタイムを限りなく0にしたい」「レスポンスタイムを限りなく短くしたい」「終わりのない大量データを効率的に処理したい」など、文脈に合わせて新しい価値観として作り上げたところに大きな意味があります。リアクティブ宣言のコンセプトに賛同している企業も多く、今後も少なからずITを通じていろいろな業界に影響を与えていくのではないかと思っています。そして、その道を最初に作ったJonas Bonerさんはすごいなって思います。
-- SIにもそういった流れが訪れて欲しいものです。
かとじゅん:SIの場合だと、リアクティブシステムを採用して事業的にも良いフィードバックを出せるのかもしれませんが、事業が成功するための変数は他にもあるので導入は簡単ではないですよね。ともすれば手段が目的化してしまうので。リアクティブシステムは考え方がガラッと変わるので、設計、運用面、人材確保など諸々難しい問題はありますね。先ほどおっしゃったように小さすぎる案件だとすぐ終わってしまうので、費用対効果が合わないという問題もありますね。まぁエンジニア個人としては素振りしておきたいというのはありますが、ビジネスとなるとKPIにインパクトを与えられないと難しいですね。
-- ビジネスのことや運用上のことも考えて価値を出していくというのが大事なので、そういうような知見があるかどうかの差が大きいのかなと思います。
かとじゅん:そういう意味だと、Web業界で大規模なサービスをやってるドワンゴさんやLINEさんといったところが、AkkaやErlangの実用経験を通して溜まった知見をコミュニティーの中でフィードバックしていければ、助けが得られて良い形で広がっていくのかなと思います。個人的にはTISさんに頑張ってもらいたいなという思いもあります。
水島:TISさんは実際にLightbendとパートナーシップを持ったり色々試行錯誤もされていますね。
かとじゅん:前出さんは最近akka.ioの日本語訳サイトを立ち上げたので、そこにも段々と人が集まってきて、Akkaを使う人が増えていくのではないかと思っています。
-- 前出さんが以前言われていたのは、TISさんだけでなく他の企業さんにも頑張ってもらうと、Scalaの導入障壁を下げることにも繋がると仰られていました。
かとじゅん:最近は、僕も利用者の裾野を広げるためにScalaと組み合わせてドメイン駆動設計(以下 DDD)やリアクティブの話をしているのですが、理解を深めてもらうための活動としてこれからもやっていきたいと思っています。
コンピュータリソースをフル活用するためのプログラミングモデルとは
-- 実際にそういう知見を共有していただける機会などは予定されているんでしょうか。
かとじゅん:仕事ではAkkaを使い込んでいるので、今まさに溜まっている知見をScala Matsuriの時期には共有できる見込みです。実際に、ビジネスロジックでもAkka Streamsで書いているので、プログラミングモデルが今までとは全く変わってしまうことを目の当たりにしています。これは個人的な興味の範囲ですが、PersistentActor(永続化可能なアクター)とActorの位置透過性を利用して、ドメインモデルや集約をどのノードに配置されていてもActorを利用できるようにすることで、スケールアップもスケールアウトも同じプログラミングモデルで実現できるようなシステムを作ってみたいと考えています。
-- 6年ほど前に、「マルチコアをフル活用するためのプログラミングができる人でなければこの先生き残っていけないし、リソースをフル活用できないと未来はない。」といった話を丸山先生が言っていたのを聞いて衝撃を受けました。最近になってクラウドが台頭してきて、今まさに直面しているという状況ですよね。
かとじゅん:CPUリソースの面でいうと、マルチコア化以降では、単体のコアのクロック数が変わらないか、下がっていく傾向にあるので、それを意識しないプログラミングをしてしまうとスループットが下がるという問題に直結します。単一サーバでマルチコアをいかに使い切るかという話も当然ありますが、さらにサーバを跨いでコアをいくつ使えるかという視点に移っていると思います。CPUリソースの規模の指標が、今まではサーバ台数でしたが、今ではサーバを跨いだコア数に変わってきています。
水島:たとえばIntelのCore i7の性能を見ても、実は最新モデルの1コアあたりの処理性能はあまり伸びていないですよね。8コア16スレッドなど、コアが増える傾向にあるように見えます。
-- AWSのインスタンスでも1TBのメモリを活用できるようになったので、メモリもCPUコアも増やせるようになりましたね。性能の良いインスタンスでスケールアップできて、性能がそこそこのインスタンスもスケールアウトが出来るので、コアやリソースを使い切るというところがコストに響いてきてお客さんに提供出来る価値にも影響してくるのかなと思います。
かとじゅん:そうですね。スケール戦略を実現する実装手段はいろいろありますが、その1つであるアクターモデルで考えるならば、今までメソッドベースでやってた人達が、アクターが持つ、Fire-and-Forgetのカルチャーに馴染むことができるか否かがポイントになります。
水島:語弊があるかもしれませんが、Javaの視点のまま学んでしまうと途中でつまづくところがあるかもしれないです。
かとじゅん:必ずしもそうではないですが、つまずく傾向にはあるかと思います。
そういう意味では、非同期に処理すると一口にいっても、Promise(Future)、Actor、Reactive Streamsなどの複数の選択肢がありますね。それぞれの目的と向き不向きを実際に試しながら理解することが必要だと思います。
-- 常にいろんなものを触ってみたり新しいことを身につけていくということが大切ということでしょうか。
かとじゅん:新しいものでも古いものでも、そういうスタンスに違いはないと思います。複数の選択肢があると目的に合わせて選ぶのが難しい側面がありますが、解決方法は解く問題に合わせて選ぶことが大切ですね。エンジニアの基本的なスタンスとして必要じゃないかと。まぁ、これはScalaに限った話ではなく、どの言語でも共通であって、そこで思考停止しない方がよいという意味になると思います。
ChatWork社では常に新しいことをやらせてもらっていて最近はAkkaと戯れる日々です
水島:現在の話になりますが、ChatWork社での、かとじゅんさんはどんな感じで日々過ごされているのでしょうか。
かとじゅん:まだ、あまり具体的なことは話せないのですが、ChatWork社での役割は次のプロダクトの開発で、現行の開発には関わっていないです。言うまでもなくScalaは使っていますが、そこそこ大きいサービスになってきたこともあって問題解決の手法も変わってきました。なので、メッセージ基盤として、ストレージはHBaseを使いながら、アプリケーションはAkkaのActorベースでやっています。前職ではFinagleを利用していたのですが、他と違ってAkkaの良いところはErlang/OTP由来のSupervisionがあるところです。Actorヒエラルキーの下位層のActorが障害を起こしても、上の監督者としてのActor(Supervisor)がそれらを管理することでリカバリもしやすくなる利点があります。耐障害性という面では魅力的な機構だと思っています。
-- レジリエンスのような話でしょうか。
かとじゅん: レジリエンスは回復力とか治癒力などという意味ですね。先ほども簡単に説明したように、下位層のActorで起きた障害はSuprvisorに通知され、必要に応じて再起動・停止・さらに親のSupervisorにエスカレートすることができます。例えば、子アクターでネットワークやディスクなどのIO例外が発生した場合は、起きた例外をSupervisorに通知し、Supervisorが子アクターに再起動を命じます。そうすると子アクターは破棄され、再び起動(リトライ)できるようになります。これは、障害が起きた部分を正常な部分から隔離するという可用性に優れた考え方で、let-it-crashとも呼ばれています。
無論、要件に応じて、無限にリトライしないようにもできますし、BackoffSupervisorを利用することで指数関数的にリトライまでの待ち時間を調整し、タイトなリトライループを回避して、過度な負荷がDBやネットワークなどにかからないようにすることもできます。
蛇足ですが、この際、アクターのインスタンスは入れ替わりますが、その参照であるActorRefは同じ値を示します。これはなかなかすごいことですが、プログラミング言語の標準機能で同様の実装をするのは骨が折れますが、Actor Systemの恩恵にあずかることで僕らエンジニアは本題に集中できるようになります。
今は実戦でやりきるだけのノウハウを溜めているところで、自らフィードバックできれば良いかなと思っています。話が少しそれましたが、僕の役割は次の事業の基盤になるような開発をすることです。
-- 普段は開発もこなしつつまとめ役のようなポジションでお仕事されているのですか。
かとじゅん:マネージャーやリーダーなど、チームをまとめる立場の人は別にいます。基本的にはチームで作業していて、その中でも僕は毎日Akkaと戯れているという感じです(笑)日々、Actorヒエラルキーをどう設計するかといったような会話をチームメンバーとしています。
ここから話が少し変わってしまいますが、個人的にもAkka Streamsが好きなので、個人活動としてRedisクライアントを題材に「車輪の再発明」をしてます。
Akka Streamsを知らない人に簡単にまず前提をお話すると、Akka Streamsではストリームの上流でデータを提供するSourceと、下流でデータを消費するSinkというAPIが提供されています。例えば、Sourceは、Source#singleで単一の値を取ったり、Source#applyでコレクションを取ったり、Source#fromFutureでFutureのインスタンスを取ったりできます。また、SinkはSink#foreachであったり、Sink#reduceなどがあります。これらを結合すると、ストリームとしての実行可能なRunnableGraphというオブジェクトが手に入ります。そして、RunnableGraph#runメソッドを実行するとストリームを実行できますが、SourceやSinkが実行中に内部で持つマテリアライズド・バリュー(以下 MVと略す)という値を取得することができます。
あまり内部がどのような仕組みで動いているか細かく説明しませんが、たとえば、数列を合計するようなストリーム処理は以下のようになります。
implicit val system = ActorSystem("myActorSystem")
implicit val actorMaterializer = ActorMaterializer()
// コレクションとして値を持つSource。MVはNotUsed。つまり使わない。
val source: Source[Int, NotUsed] = Source(1 to 10)
// ストリームデータをReduceするSink。MVはFuture[Int]
val sink: Sink[Int, Future[Int]] = Sink.reduce[Int](_ + _)
// 実行可能なグラフを生成。Keep.rightはストリームの右側にあるSinkが実行時に持つMVを取得することを意味する。
// Keep.leftはSourceのMVだがNotUsedなので取得しても意味がない。
val runnableGraph: RunnableGraph[Future[Int]] = source.toMat(sink)(Keep.right)
// 実行可能なグラフを実行すると指定したMVのインスタンスが取得できる。
val future: Future[Int] = runnableGraph.run()
// MVからSinkが得た計算結果を得る。
val value = Await.result(future, Duration.Inf)
println(value)
また、以下のようにActorをSourceとして利用するストリームも作れます。少し難しく見えますが、実行すると前述と同じ計算結果になります。ストリームのデータサイズが固定できない場合や、アクターをインターフェイスにしたい場合などに役に立ちます。
// グラフを構築・実行し、Source.actorRefのMVであるActorRefとSinkのMVであるFutureを取得する。
val (actorRef, future) = Source.actorRef[Int](Int.MaxValue, OverflowStrategy.fail)
.toMat(Sink.reduce[Int](_ + _))(Keep.both).run()
// SourceとしてのActorRefを使ってストリームにデータを流す
for {n <- 1 to 10} {
actorRef ! n
}
// ストリームの完了を通知する。
actorRef ! Status.Success(1)
// SinkのMVから結果を取得する。
val value = Await.result(future, Duration.Inf)
println(value)
さらに、ちょっとコードが長いですが、以下のように、このグラフをアクター内部で構築・実行することも可能です(サンプルコードなので、Supervisionは考慮していません)。 Akka StreamのTcpにはoutgoingConnectionというFlowを返すAPIがあって、これを使うと非同期・ノンブロッキングI/Oができるので、これを使ってRedisクライアントを作って遊んでいます。
ちなみに、Flowは、Source, Sinkに並ぶコンポーネントで、上流から受け取った値に何かを行って下流に流すことができます。また、FlowをSourceに結合するとSourceに、Sinkに結合するとSinkになる特徴を持っています。なので、Source -> Flow -> Sinkのようなグラフを作ることができます。以下の例では、リクエストとしてByteStringを流すとレスポンスとしてByteStringが返ってくる Source -> Flow -> Sink なグラフを作っています。
class ClientActor(remoteAddress: InetSocketAddress) extends Actor {
// requestIdとsenderを紐づける
private def putSender(requestId: Long, sender: ActorRef): Unit = ???
// requestIdからsenderを取得する
private def getSender(requestId: Long): ActorRef = ???
// Source#actorRef
private val sourceActorRef: Source[(Long, ByteString), ActorRef] =
Source.actorRef[(Long, ByteString)](Int.MaxValue, OverflowStrategy.fail)
// RedisにTCPで接続するフロー
private val tcpFlow: Flow[ByteString, ByteString, Future[OutgoingConnection]] =
Tcp().outgoingConnection(remoteAddress)
// Sink#foreach
private val sinkForeach: Sink[(ByteString, Long), Future[Done]] =
Sink.foreach[(ByteString, Long)](responseWithRequestId => self ! responseWithRequestId)
// 内部でtcpFlowを使うが、(requestId, request) -> (response, requestId)にするFlow
private val connectionFlow: Flow[(Long, ByteString), (ByteString, Long), NotUsed] =
Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val requestFlow = b.add(Flow[(Long, ByteString)].map {
case (requestId, request) => (request.concat(ByteString("\r\n")), requestId)
})
val unzip = b.add(Unzip[ByteString, Long]())
val zip = b.add(Zip[ByteString, Long]())
requestFlow.out ~> unzip.in
unzip.out0 ~> tcpFlow ~> zip.in0 // request -> response
unzip.out1 ~> zip.in1 // requestId をそのまま引き継ぐ
FlowShape(requestFlow.in, zip.out)
})
// ストリームの構築・実行
private val internalClientRef: ActorRef = sourceActorRef
.via(connectionFlow)
.toMat(sinkForeach)(Keep.left)
.run()
override def receive: Receive = {
case (requestId: Long, request: String) =>
putSender(requestId, sender())
internalClientRef ! (requestId, ByteString.fromString(request))
case (response: ByteString, requestId: Long) =>
getSender(requestId) ! (requestId, response.toString())
}
}
-- その素振りは面白いですね。
これは、Akka Streamsの勉強をするために作っているので、実用では既に存在するOSSなどを使えば良いと思います。それなりに使い込まないと自分で理解できないので、大事かなと思ってひたすらAkka StreamsとActorを使いこんでいます。あとは、最近あまり参加できていないのですが、、、Akka in Actionの読書会を開いてもらったり、Reactive Messaging Patterns with the Actor Modelsの読書会に参加したりしています。Akka StreamsのActorMaterializer(RunnableGraphをActor上で実行可能にするオブジェクト)の詳細な説明や、コードを読むときの糸口になるようなヒントが書いてあるので、見ておくとコードも書きやすくなるかなと思います。
というか、ほとんど仕事の話をしていませんね(笑)。詳しくはまた時期をみてということで。ChatWork社では常に新しいことをやっています、って感じです。
-- ズバリ、楽しいですか。
かとじゅん:これで楽しくないと言ったら怒られてしまいますね(笑)実際楽しくやらせてもらっています。Scala化するという宣言をしてから暫くリリース物が出ていませんが、結果を出すまでは絶対にやらないといけないので最後までやり通したいなと思います。
出席者
- インタビューイー ChatWork株式会社 かとじゅんさん
- インタビューワー アットウェア 浅野・三嶋、 株式会社ドワンゴ/一般社団法人Japan Scala Association 水島さん