Akka アクターを用いて実装された汎用 HTTP フレームワークです。Spray の後継です。コアモジュールである akka-http-core は 2016/2/17 に experimental が外れました。akka-http などのいくつかのサブモジュールは 2016/3/1 現在 experimental のままですが、基本的な機能に関しては大きな仕様変更はなさそうです。基本的な機能を利用したサンプルと、関連する公式ドキュメントへのリンクを記載します。サンプルで使用した Scala のバージョンは 2.11.7 です。(Play Framework はブラウザを対象としたフレームワークであり、Akka HTTP の更に上位の存在です。試験的に Akka HTTP をバックエンドにした開発が進められています。)
応用例
IoT BaaS API (Qoosky Cloud Controller)
Akka HTTP 公式ドキュメント (2.4.2)
- Low-Level Server-Side API
- High-level Server-Side API
- Consuming HTTP-based Services (Client-Side)
- Common Abstractions (Client- and Server-Side)
- Configuration
Akka Streams 公式ドキュメント (2.4.2)
インストール
最新のバージョンは The Central Repository で検索してください。
build.sbt
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-http-core" % "2.4.2", // 主に低レベルのサーバーサイドおよびクライアントサイド HTTP/WebSocket API
"com.typesafe.akka" %% "akka-http-experimental" % "2.4.2", // 高レベルのサーバーサイド API (experimental)
"com.typesafe.akka" %% "akka-http-spray-json-experimental" % "2.4.2", // Akka で JSON を扱う場合はこれ (experimental)
"com.typesafe.akka" %% "akka-http-xml-experimental" % "2.4.2" // Akka で XML を扱う場合はこれ (experimental)
)
Hello world!
トップページを GET して h1 タグを返すだけのシンプルな内容です。
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.http.scaladsl.server.Directives._ // 下記 `get` などの Routing DSL ディレクティブが使用可能になる
import akka.http.scaladsl.marshallers.xml.ScalaXmlSupport._ // 下記 *1 の記法が使用可能になる
import akka.http.scaladsl.Http
object Main {
def main(args: Array[String]): Unit = {
// アクターシステムおよびその他「おまじない」を `implicit` で宣言
implicit val system = ActorSystem("mySystem")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
// ルーティング GET '/'
val route = path("") {
get {
complete {
<h1>Say hello to akka-http</h1>
} // ↑*1
}
}
// IP とポートを指定してリッスン開始
val bindingFuture = Http().bindAndHandle(route, "127.0.0.1", 8080)
// 簡単のためデーモン化せず、リターンを入力すれば停止するようにしています。
println("Server online at http://127.0.0.1:8080/\nPress RETURN to stop...")
scala.io.StdIn.readLine()
bindingFuture.flatMap(_.unbind()).onComplete(_ => system.terminate())
}
}
GET リクエストに対する複数のルーティングを設定する例です。
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.marshallers.xml.ScalaXmlSupport._
import akka.http.scaladsl.Http
object Main {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem("mySystem")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
// ルーティング
val route = get {
pathSingleSlash {
complete {
<html><body>Hello world!</body></html>
}
}
} ~
path("ok") {
complete("ok")
} ~
path("not_ok") {
sys.error("not_ok") // 500 エラー
}
val bindingFuture = Http().bindAndHandle(route, "127.0.0.1", 8080)
println("Server online at http://127.0.0.1:8080/\nPress RETURN to stop...")
scala.io.StdIn.readLine()
bindingFuture.flatMap(_.unbind()).onComplete(_ => system.terminate())
}
}
Akka Streams の理解が捗るサンプル
akka-http-core
だけで動作します。
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.HttpMethods._
import scala.concurrent.Future
object Main {
def main(args: Array[String]): Unit = {
// おまじないを三行
implicit val system = ActorSystem() // akka-http は akka-stream を用いて実装されています。akka-stream 内にはアクターシステムが存在します。
implicit val materializer = ActorMaterializer() // 「ストリームを実行(run)するためのシステム」を生成するもの。
implicit val ec = system.dispatcher
// HTTP コネクションの発生源 (Source)
val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
Http().bind(interface = "127.0.0.1", port = 8080)
// 下記 `map` で使用される関数
val requestHandler: HttpRequest => HttpResponse = {
case HttpRequest(GET, Uri.Path("/"), _, _, _) => {
HttpResponse(entity = "ok")
}
case HttpRequest(GET, Uri.Path("/not_ok"), _, _, _) => {
sys.error("not_ok")
}
case _: HttpRequest => {
HttpResponse(404, entity = "Not Found.")
}
}
// HTTP コネクションが処理される出口 (Sink)
val connectionHandler: Sink[Http.IncomingConnection, _] = Sink.foreach { connection: Http.IncomingConnection =>
println("Accepted new connection from " + connection.remoteAddress)
// コネクション内の HTTP リクエストが処理される出口 (Flow + Sink = Sink)
connection.handleWith {
Flow[HttpRequest].map(requestHandler)
}
}
// 非同期にコネクションを確立
val bindingFuture: Future[Http.ServerBinding] = serverSource.to(connectionHandler).run()
// エンターが入力されるまで待つ
println("Server online at http://127.0.0.1:8080/\nPress RETURN to stop...")
scala.io.StdIn.readLine()
// エンターが押されてここに処理がきたら、
bindingFuture
.flatMap(_.unbind()) // まずポートのバインドを解除
.onComplete(_ => system.terminate()) // 次にアクターシステムを終了
}
}
クライアントサイド API
いくつかのレベルの API が提供されています。低レベルの API はカスタマイズ性は高いですが、コード量は増えます。逆に高レベルの API は柔軟性が低い代わりに簡潔なコードになります。
コネクションレベル API
最も低レベルな API です。akka-http-core
だけで動作します。
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
object Main {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
// コネクション (Flow で表現)
val connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] =
Http().outgoingConnection("www.example.com")
// Http().outgoingConnectionHttps("www.example.com") // https の場合
// RunnableGraph 化して run する
val responseFuture: Future[HttpResponse] =
Source.single(HttpRequest(uri = "/"))
.via(connectionFlow)
.runWith(Sink.head) // Sink.head はストリームの最初の要素 (HttpResponse) を含む Future を返す
// 完了するのを待つ
responseFuture.foreach { response =>
println(response.status) //=> 200 OK
system.terminate() // アクターシステムを終了
}
}
}
リクエストレベル API
最も高レベルな API です。これも akka-http-core
だけで動作します。
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
object Main {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
// ストリームの Source/Sink/Flow を使用せずに表現
val responseFuture: Future[HttpResponse] =
Http().singleRequest(HttpRequest(uri = "http://www.example.com"))
// Http().singleRequest(HttpRequest(uri = "https://www.example.com")) // https の場合
// 完了するのを待つ
responseFuture.foreach { response =>
println(response.status) //=> 200 OK
}
}
}
アクター内での使用
import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.Actor
import akka.event.Logging
import scala.concurrent.ExecutionContext.Implicits.global
class MyActor extends Actor {
import akka.pattern.pipe
// アクター内では明示的に system を渡します
final implicit val materializer = ActorMaterializer(ActorMaterializerSettings(context.system))
val http = Http(context.system)
val log = Logging(context.system, this)
def receive = {
case s: String => {
// 非同期処理が完了したら self にメッセージとしてパイプします。
log.info("received: %s" format s)
http.singleRequest(HttpRequest(uri = "http://www.example.com")).pipeTo(self)
// http.singleRequest(HttpRequest(uri = "https://www.example.com")).pipeTo(self) // https の場合
}
case HttpResponse(StatusCodes.OK, headers, entity, _) => {
log.info("Got response, body: " + entity.toString)
}
case HttpResponse(code, _, _, _) => {
log.info("Request failed, response code: " + code)
}
case _ => {
}
}
}
object Main {
def main(args: Array[String]): Unit = {
val system = ActorSystem("mySystem")
val props = Props[MyActor]
val actor = system.actorOf(props, name = "myActor")
while(true) {
actor ! "hi!"
Thread.sleep(1000)
}
}
}
実行例
[INFO] [03/12/2016 22:46:12.544] [mySystem-akka.actor.default-dispatcher-4] [akka://mySystem/user/myActor] received: hi!
[INFO] [03/12/2016 22:46:12.846] [mySystem-akka.actor.default-dispatcher-4] [akka://mySystem/user/myActor] received: hi!
[INFO] [03/12/2016 22:46:13.644] [mySystem-akka.actor.default-dispatcher-9] [akka://mySystem/user/myActor] Got response, body: HttpEntity.Strict(text/html; charset=UTF-8,<!doctype html>
<html>
<head>
<title>Example Domain</title>
...
WebSocket
Spray では WebSocket がサポートされていないのに対して Akka HTTP ではサーバーサイドおよびクライアントサイドの両方について WebSocket がサポートされています。
サーバーサイド
上記 Akka Streams の理解が捗るサンプルの応用
UpgradeToWebSocket
ヘッダーを受信したら handleMessages
によって myWebSocketService
(Flow) につなぎます。WebSocket では HTTP の場合と異なり connection はずっと一つです。一つのコネクションの中で無数の Message がやり取りされます。Message の形式はテキストまたはバイナリが選択できます。受信した Message を Source として扱い、送信する Message を Sink として扱うことで、完全な Message でなくても動作するようにすることが推奨されています。
Therefore, application code must be able to handle both streaming and strict messages and not expect certain messages to be strict.
Main.scala (akka-http-core
だけで動作します)
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._
import akka.http.scaladsl.model.HttpMethods._
import scala.concurrent.Future
object Main {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
// HTTP コネクションの発生源
val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
Http().bind(interface = "127.0.0.1", port = 8080)
// 下記 `requestHandler` で使用される Flow (Stream 版)
val myWebSocketService: Flow[Message, Message, _] =
Flow[Message].mapConcat {
// `mapConcat` は List(List(1),List(2)) を List(1,2) にする `flatMap` のようなものです。
// 参考: http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-quickstart.html
case tm: TextMessage => {
// WebSocket では TextMessage または BinaryMessage をやり取りします。
// すべて受信せずに応答を開始するためにはストリームの Source (textStream) から Message を生成します。
// - TextMessage.textStream は Source[String, _] を返します。
// - BinaryMessage.dataStream は Source[ByteString, _] を返します。
// Source の連結は `++` 演算子で行います。
TextMessage(Source.single("Hello ") ++ tm.textStream) :: Nil // `mapConcat` を利用しているためリスト化しておきます。
}
case bm: BinaryMessage => {
// バイナリメッセージを skip します (Sink.ignore に流します)
bm.dataStream.runWith(Sink.ignore)
// `map` ではなく、わざわざ `mapConcat` にしている理由は List(List(), List(1)) を List(1) にできるためです。
// Nil (= List() 空リスト) を返して、何もなかったことにします。
Nil
}
}
// 下記 `requestHandler` で使用される Flow (Strict 版)
val myWebSocketService2: Flow[Message, Message, _] =
Flow[Message].map { // `map` を使用する例を兼ねることにします。
case TextMessage.Strict(txt) => {
// 完全なメッセージを受信するまで待ちます。
// Strict (完全なメッセージ) を受信してから、完全なメッセージで応答します。
// つまり、TextMessage(textStream: Source[String, Any]) ではなく TextMessage(text: String) で応答します。
TextMessage("Hello " + txt)
}
case _ => {
// バイナリメッセージを受信
TextMessage("Message type unsupported")
}
}
// 下記 `map` で使用される関数
val requestHandler: HttpRequest => HttpResponse = {
case HttpRequest(GET, Uri.Path("/"), _, _, _) => {
HttpResponse(entity = "ok")
}
case req @ HttpRequest(GET, Uri.Path("/ws"), _, _, _) =>
// `@` で HttpRequest をまとめてバインドする Scala の文法です。
req.header[UpgradeToWebSocket] match {
case Some(upgrade) => {
upgrade.handleMessages(myWebSocketService)
// upgrade.handleMessages(myWebSocketService2)
}
case None => {
HttpResponse(400, entity = "Not a valid websocket request!")
}
}
case _: HttpRequest => {
HttpResponse(404, entity = "Not Found.")
}
}
// HTTP コネクションが処理される出口 (Sink)
val connectionHandler: Sink[Http.IncomingConnection, _] = Sink.foreach { connection: Http.IncomingConnection =>
println("Accepted new connection from " + connection.remoteAddress)
// コネクション内の HTTP リクエストが処理される出口 (Flow + Sink = Sink)
connection.handleWith {
Flow[HttpRequest].map(requestHandler)
}
}
// 非同期にコネクションを確立
val bindingFuture: Future[Http.ServerBinding] = serverSource.to(connectionHandler).run()
// エンターが入力されるまで待つ
println("Server online at http://127.0.0.1:8080/\nPress RETURN to stop...")
scala.io.StdIn.readLine()
// エンターが押されてここに処理がきたら、
bindingFuture
.flatMap(_.unbind()) // まずポートのバインドを解除
.onComplete(_ => system.terminate()) // 次にアクターシステムを終了
}
}
client.html
<html>
<head>
<script type="text/javascript">
// 接続
var ws = new WebSocket("ws://127.0.0.1:8080/ws");
// コネクション確立時のイベント
ws.onopen = function(){
console.log('Initialize Success');
}
// メッセージ受信時のイベント
ws.onmessage = function(message){
console.log('received : ' + message.data);
}
// エラー時のイベント
ws.onerror = function(){
console.log('Error!');
}
// コネクション切断時のイベント
ws.onclose = function(){
console.log('Connection Close!');
}
// 別のページに遷移する前やブラウザが閉じられる前に発生するイベント
window.onbeforeunload = function(){
// コネクションを切断
ws.onclose();
}
// メッセージを送信する関数
function send(){
ws.send("qoosky");
}
</script>
</head>
<body>
<a href="#" onclick="send()">送信</a>
</body>
</html>
実行例
上記 Hello world! サンプルの応用
akka-http-core
だけでなく akka-http-experimental
を利用すると、同じ処理がより簡潔に記述できます。
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.server.Directives._ // 下記 `get` などの Routing DSL ディレクティブが使用可能になる
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws._
object Main {
def main(args: Array[String]): Unit = {
// アクターシステムおよびその他「おまじない」を `implicit` で宣言
implicit val system = ActorSystem("mySystem")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
// 下記 `handleWebSocketMessages` で使用される Flow (Strict 版)
val myWebSocketService: Flow[Message, Message, _] =
Flow[Message].map {
case TextMessage.Strict(txt) => {
// 完全なメッセージを受信するまで待ちます。
// Strict (完全なメッセージ) を受信してから、完全なメッセージで応答します。
// つまり、TextMessage(textStream: Source[String, Any]) ではなく TextMessage(text: String) で応答します。
TextMessage("Hello " + txt)
}
case _ => {
// バイナリメッセージを受信
TextMessage("Message type unsupported")
}
}
// ルーティング
val route = path("") {
get {
complete("ok")
}
} ~
path("ws") {
handleWebSocketMessages(myWebSocketService)
}
// IP とポートを指定してリッスン開始
val bindingFuture = Http().bindAndHandle(route, "127.0.0.1", 8080)
// 簡単のためデーモン化せず、リターンを入力すれば停止するようにしています。
println("Server online at http://127.0.0.1:8080/\nPress RETURN to stop...")
scala.io.StdIn.readLine()
bindingFuture.flatMap(_.unbind()).onComplete(_ => system.terminate())
}
}
クライアントサイド
akka-http-core
だけで動作します。singleWebSocketRequest
を利用する方法と webSocketClientFlow
を利用する方法があります。視点が異なるだけで、処理内容に差はありません。
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._
import akka.{ Done, NotUsed }
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
object Main {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
// 確立した WebSocket コネクション上で
// サーバーから受け取るメッセージを処理する出口 (Sink)
// 一般に Sink は値を返します。`Sink.foreach` では `Future[Done]` です。
// 無限個数の Message でない限り、いつか必ず foreach の終わりがきます。
val printSink: Sink[Message, Future[Done]] =
Sink.foreach {
// 完全 (Strict) なメッセージを受信できるまで待つ
case message: TextMessage.Strict => {
println(message.text)
}
case _ => {
println("Message type unsupported")
}
}
// 確立した WebSocket コネクション上で
// サーバーの送信するメッセージの発生源 (Source)
// 一つのメッセージだけを生成します。
val helloSource: Source[Message, NotUsed] =
Source.single(TextMessage("qoosky"))
//// 方法 その1
// WebSocket(Source) ~> myFlow ~> WebSocket(Sink)
// 「Message を printSink に流す」
// 「Message を helloSource から生成する」
// これらを合わせると Flow[Message, Message, *1]
// 更に Flow の materialized value を Sink と Source のどちらから採用するか決定 (*1)
// - (Keep.left) とすれば Sink の Future[Done]
// - (Keep.right) とすれば Source の NotUsed
val flow: Flow[Message, Message, Future[Done]] =
Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left)
// `singleWebSocketRequest` で「WebSocket コネクションを確立して flow につなぐ」
// WebSocket ではコネクションは張りっぱなしが基本
// - upgradeResponse: Future[WebSocketUpgradeResponse]
// コネクション確立/失敗時に WebSocketUpgradeResponse が得られる
// - closed: Future[Done]
// WebSocket コネクションが切断されて
// メッセージがストリームで流れなくなったときに Done が得られる
val (upgradeResponse, closed) =
Http().singleWebSocketRequest(WebSocketRequest("ws://127.0.0.1:8080/ws"), flow)
// ------------------------------------------------------------
//// 方法 その2
// mySource ~> WebSocket(Flow) ~> mySink
// val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://127.0.0.1:8080/ws"))
// val (upgradeResponse, closed) =
// helloSource
// .viaMat(webSocketFlow)(Keep.right) // materialized value としては webSocketFlow の Future[WebSocketUpgradeResponse] を採用
// .toMat(printSink)(Keep.both) // left: webSocketFlow の Future[WebSocketUpgradeResponse], right: printSink の Future[Done]
// .run()
// ------------------------------------------------------------
// `upgradeResponse` は Future です。
// 値が得られたら、それが成功なのか失敗なのか判定します。
val connected = upgradeResponse.map { upgrade: WebSocketUpgradeResponse =>
if (upgrade.response.status == StatusCodes.OK) {
// 200 ならばコネクション確立が成功
Done
} else {
// 404 などの場合
// `s""` とすると "" の中に ${式} を挿入できます。Scala の文法です。
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
}
}
// コネクションが確立または失敗したら実行される処理を登録
connected.onComplete(_ => println("Initialize Success"))
// メッセージのやり取りが終了したら (コネクションが切断されたら) 実行される処理を登録
closed.foreach{ _ =>
println("closed")
system.terminate // アクターシステムを終了
}
}
}
実行例
Initialize Success
Hello qoosky
closed
関連記事
- Spring Security フォームログインのサンプルコードSpring フレームワークによる Web アプリケーション開発で、ログイン処理を実装する際は Spring Security が便利です。ここでは特に Spring Boot で Web アプリケーションを開発する場合を対象とし、フォームによる ID/Password ログインを行うためのサンプルコードをまとめます。 公式ドキュメント [Spring Security チュートリアル](http...
- Java配列の宣言方法 (C/C++との違い)Javaの配列 Javaの配列宣言方法はC/C++と似ているようで若干異なる。 初期化しない場合 C/C++の int array[10]; はJavaでは int array[] = new int[10]; となる。同様にC/C++の int array[3][3]; はJavaでは int array[][] = new int[3][3]; となる。 初期化
- PlantUML による UML 図の描き方PlantUML はテキスト形式で表現されたシーケンス図やクラス図といった UML (Unified Modeling Language) 図の情報から画像を生成するためのツールです。簡単な使い方をまとめます。 インストール方法の選択 Atom や Eclipse のプラグインをインストールしてエディタから利用する方法、JAR をダウンロードして Java コマンドで実行する方法、Redmine ...
- Kestrel の使用例Kestrel は Message Queue (MQ) の実装のひとつです。一般に MQ はアプリケーション間やプロセス間、スレッド間で非同期に通信するために用いられます。メッセージの送信側は MQ に書き込めば受信側の応答を待たずに次の処理に非同期に進むことができます。Kestrel はわずか 2500 行程の Scala で実装されており JVM で動作します。MQ 自体はメモリ上に存在する...
- Android Activity ライフサイクルの様子を把握するための簡易サンプルコードAndroid アプリケーションのコンポーネントの一つである Activity についてライフサイクルの様子を把握するための簡易サンプルコードです。 The Activity Lifecycle [ライフサイクルの状態遷移図](https://developer.android.com/guide/components/