Akka HTTP サンプルコード (Scala)
[最終更新] (2019/06/03 00:39:40)
最近の投稿
注目の記事

概要

こちらに記載した Akka アクターを用いて実装された汎用 HTTP フレームワークです。Spray の後継です。コアモジュールである akka-http-core は 2016/2/17 に experimental が外れました。akka-http などのいくつかのサブモジュールは 2016/3/1 現在 experimental のままですが、基本的な機能に関しては大きな仕様変更はなさそうです。基本的な機能を利用したサンプルと、関連する公式ドキュメントへのリンクを記載します。サンプルで使用した Scala のバージョンは 2.11.7 です。(Play Framework はブラウザを対象としたフレームワークであり、Akka HTTP の更に上位の存在です。試験的に Akka HTTP をバックエンドにした開発が進められています。)

応用例

IoT BaaS API (Qoosky Cloud Controller)

Akka HTTP 公式ドキュメント (2.4.2)

Akka Streams 公式ドキュメント (2.4.2)

インストール

最新のバージョンは The Central Repository で検索してください。

build.sbt

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-http-core" % "2.4.2", // 主に低レベルのサーバーサイドおよびクライアントサイド HTTP/WebSocket API
  "com.typesafe.akka" %% "akka-http-experimental" % "2.4.2", // 高レベルのサーバーサイド API (experimental)
  "com.typesafe.akka" %% "akka-http-spray-json-experimental" % "2.4.2",  // Akka で JSON を扱う場合はこれ (experimental)
  "com.typesafe.akka" %% "akka-http-xml-experimental" % "2.4.2" // Akka で XML を扱う場合はこれ (experimental)
)

Hello world!

トップページを GET して h1 タグを返すだけのシンプルな内容です。

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.http.scaladsl.server.Directives._ // 下記 `get` などの Routing DSL ディレクティブが使用可能になる
import akka.http.scaladsl.marshallers.xml.ScalaXmlSupport._ // 下記 *1 の記法が使用可能になる
import akka.http.scaladsl.Http

object Main {
  def main(args: Array[String]): Unit = {

    // アクターシステムおよびその他「おまじない」を `implicit` で宣言
    implicit val system = ActorSystem("mySystem")
    implicit val materializer = ActorMaterializer()
    implicit val ec = system.dispatcher

    // ルーティング GET '/'
    val route = path("") {
      get {
        complete {
          <h1>Say hello to akka-http</h1>
        } // ↑*1
      }
    }

    // IP とポートを指定してリッスン開始
    val bindingFuture = Http().bindAndHandle(route, "127.0.0.1", 8080)

    // 簡単のためデーモン化せず、リターンを入力すれば停止するようにしています。
    println("Server online at http://127.0.0.1:8080/\nPress RETURN to stop...")
    scala.io.StdIn.readLine()
    bindingFuture.flatMap(_.unbind()).onComplete(_ => system.terminate())
  }
}

GET リクエストに対する複数のルーティングを設定する例です。

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.marshallers.xml.ScalaXmlSupport._
import akka.http.scaladsl.Http

object Main {
  def main(args: Array[String]): Unit = {

    implicit val system = ActorSystem("mySystem")
    implicit val materializer = ActorMaterializer()
    implicit val ec = system.dispatcher

    // ルーティング
    val route = get {
      pathSingleSlash {
        complete {
          <html><body>Hello world!</body></html>
        }
      }
    } ~
    path("ok") {
      complete("ok")
    } ~
    path("not_ok") {
      sys.error("not_ok") // 500 エラー
    }

    val bindingFuture = Http().bindAndHandle(route, "127.0.0.1", 8080)

    println("Server online at http://127.0.0.1:8080/\nPress RETURN to stop...")
    scala.io.StdIn.readLine()
    bindingFuture.flatMap(_.unbind()).onComplete(_ => system.terminate())
  }
}

Akka Streams の理解が捗るサンプル

akka-http-core だけで動作します。

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.HttpMethods._
import scala.concurrent.Future

object Main {
  def main(args: Array[String]): Unit = {

    // おまじないを三行
    implicit val system = ActorSystem() // akka-http は akka-stream を用いて実装されています。akka-stream 内にはアクターシステムが存在します。
    implicit val materializer = ActorMaterializer() // 「ストリームを実行(run)するためのシステム」を生成するもの。
    implicit val ec = system.dispatcher

    // HTTP コネクションの発生源 (Source)
    val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
      Http().bind(interface = "127.0.0.1", port = 8080)

    // 下記 `map` で使用される関数
    val requestHandler: HttpRequest => HttpResponse = {
      case HttpRequest(GET, Uri.Path("/"), _, _, _) => {
        HttpResponse(entity = "ok")
      }
      case HttpRequest(GET, Uri.Path("/not_ok"), _, _, _) => {
        sys.error("not_ok")
      }
      case _: HttpRequest => {
        HttpResponse(404, entity = "Not Found.")
      }
    }

    // HTTP コネクションが処理される出口 (Sink)
    val connectionHandler: Sink[Http.IncomingConnection, _] = Sink.foreach { connection: Http.IncomingConnection =>
      println("Accepted new connection from " + connection.remoteAddress)

      // コネクション内の HTTP リクエストが処理される出口 (Flow + Sink = Sink)
      connection.handleWith {
        Flow[HttpRequest].map(requestHandler)
      }
    }

    // 非同期にコネクションを確立
    val bindingFuture: Future[Http.ServerBinding] = serverSource.to(connectionHandler).run()

    // エンターが入力されるまで待つ
    println("Server online at http://127.0.0.1:8080/\nPress RETURN to stop...")
    scala.io.StdIn.readLine()

    // エンターが押されてここに処理がきたら、
    bindingFuture
      .flatMap(_.unbind()) // まずポートのバインドを解除
      .onComplete(_ => system.terminate()) // 次にアクターシステムを終了
  }
}

クライアントサイド API

いくつかのレベルの API が提供されています。低レベルの API はカスタマイズ性は高いですが、コード量は増えます。逆に高レベルの API は柔軟性が低い代わりに簡潔なコードになります。

コネクションレベル API

最も低レベルな API です。akka-http-core だけで動作します。

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object Main {
  def main(args: Array[String]): Unit = {

    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()

    // コネクション (Flow で表現)
    val connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] =
      Http().outgoingConnection("www.example.com")
      // Http().outgoingConnectionHttps("www.example.com") // https の場合

    // RunnableGraph 化して run する
    val responseFuture: Future[HttpResponse] =
      Source.single(HttpRequest(uri = "/"))
        .via(connectionFlow)
        .runWith(Sink.head) // Sink.head はストリームの最初の要素 (HttpResponse) を含む Future を返す

    // 完了するのを待つ
    responseFuture.foreach { response =>
      println(response.status) //=> 200 OK
      system.terminate() // アクターシステムを終了
    }
  }
}

リクエストレベル API

最も高レベルな API です。これも akka-http-core だけで動作します。

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object Main {
  def main(args: Array[String]): Unit = {

    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()

    // ストリームの Source/Sink/Flow を使用せずに表現
    val responseFuture: Future[HttpResponse] =
      Http().singleRequest(HttpRequest(uri = "http://www.example.com"))
      // Http().singleRequest(HttpRequest(uri = "https://www.example.com")) // https の場合

    // 完了するのを待つ
    responseFuture.foreach { response =>
      println(response.status) //=> 200 OK
    }
  }
}

アクター内での使用

import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.Actor
import akka.event.Logging
import scala.concurrent.ExecutionContext.Implicits.global

class MyActor extends Actor {
  import akka.pattern.pipe

  // アクター内では明示的に system を渡します
  final implicit val materializer = ActorMaterializer(ActorMaterializerSettings(context.system))
  val http = Http(context.system)
  val log = Logging(context.system, this)

  def receive = {
    case s: String => {
      // 非同期処理が完了したら self にメッセージとしてパイプします。
      log.info("received: %s" format s)
      http.singleRequest(HttpRequest(uri = "http://www.example.com")).pipeTo(self)
      // http.singleRequest(HttpRequest(uri = "https://www.example.com")).pipeTo(self) // https の場合
    }
    case HttpResponse(StatusCodes.OK, headers, entity, _) => {
      log.info("Got response, body: " + entity.toString)
    }
    case HttpResponse(code, _, _, _) => {
      log.info("Request failed, response code: " + code)
    }
    case _ => {
    }
  }
}

object Main {
  def main(args: Array[String]): Unit = {

    val system = ActorSystem("mySystem")
    val props = Props[MyActor]
    val actor = system.actorOf(props, name = "myActor")

    while(true) {
      actor ! "hi!"
      Thread.sleep(1000)
    }
  }
}

実行例

[INFO] [03/12/2016 22:46:12.544] [mySystem-akka.actor.default-dispatcher-4] [akka://mySystem/user/myActor] received: hi!
[INFO] [03/12/2016 22:46:12.846] [mySystem-akka.actor.default-dispatcher-4] [akka://mySystem/user/myActor] received: hi!
[INFO] [03/12/2016 22:46:13.644] [mySystem-akka.actor.default-dispatcher-9] [akka://mySystem/user/myActor] Got response, body: HttpEntity.Strict(text/html; charset=UTF-8,<!doctype html>
<html>
<head>
    <title>Example Domain</title>
...

WebSocket

Spray では WebSocket がサポートされていないのに対して Akka HTTP ではサーバーサイドおよびクライアントサイドの両方について WebSocket がサポートされています。

サーバーサイド

上記 Akka Streams の理解が捗るサンプルの応用

UpgradeToWebSocket ヘッダーを受信したら handleMessages によって myWebSocketService (Flow) につなぎます。WebSocket では HTTP の場合と異なり connection はずっと一つです。一つのコネクションの中で無数の Message がやり取りされます。Message の形式はテキストまたはバイナリが選択できます。受信した Message を Source として扱い、送信する Message を Sink として扱うことで、完全な Message でなくても動作するようにすることが推奨されています

Therefore, application code must be able to handle both streaming and strict messages and not expect certain messages to be strict.

Main.scala (akka-http-core だけで動作します)

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._
import akka.http.scaladsl.model.HttpMethods._
import scala.concurrent.Future

object Main {
  def main(args: Array[String]): Unit = {

    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    implicit val ec = system.dispatcher

    // HTTP コネクションの発生源
    val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
      Http().bind(interface = "127.0.0.1", port = 8080)

    // 下記 `requestHandler` で使用される Flow (Stream 版)
    val myWebSocketService: Flow[Message, Message, _] =
      Flow[Message].mapConcat {
        // `mapConcat` は List(List(1),List(2)) を List(1,2) にする `flatMap` のようなものです。
        // 参考: http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-quickstart.html
        case tm: TextMessage => {
          // WebSocket では TextMessage または BinaryMessage をやり取りします。
          // すべて受信せずに応答を開始するためにはストリームの Source (textStream) から Message を生成します。
          // - TextMessage.textStream は Source[String, _] を返します。
          // - BinaryMessage.dataStream は Source[ByteString, _] を返します。
          // Source の連結は `++` 演算子で行います。
          TextMessage(Source.single("Hello ") ++ tm.textStream) :: Nil // `mapConcat` を利用しているためリスト化しておきます。
        }
        case bm: BinaryMessage => {
          // バイナリメッセージを skip します (Sink.ignore に流します)
          bm.dataStream.runWith(Sink.ignore)
          // `map` ではなく、わざわざ `mapConcat` にしている理由は List(List(), List(1)) を List(1) にできるためです。
          // Nil (= List() 空リスト) を返して、何もなかったことにします。
          Nil
        }
      }

    // 下記 `requestHandler` で使用される Flow (Strict 版)
    val myWebSocketService2: Flow[Message, Message, _] =
      Flow[Message].map { // `map` を使用する例を兼ねることにします。
        case TextMessage.Strict(txt) => {
          // 完全なメッセージを受信するまで待ちます。
          // Strict (完全なメッセージ) を受信してから、完全なメッセージで応答します。
          // つまり、TextMessage(textStream: Source[String, Any]) ではなく TextMessage(text: String) で応答します。
          TextMessage("Hello " + txt)
        }
        case _ => {
          // バイナリメッセージを受信
          TextMessage("Message type unsupported")
        }
      }

    // 下記 `map` で使用される関数
    val requestHandler: HttpRequest => HttpResponse = {
      case HttpRequest(GET, Uri.Path("/"), _, _, _) => {
        HttpResponse(entity = "ok")
      }
      case req @ HttpRequest(GET, Uri.Path("/ws"), _, _, _) =>
        // `@` で HttpRequest をまとめてバインドする Scala の文法です。
        // 参考: https://www.qoosky.io/techs/c7921ddb98
        req.header[UpgradeToWebSocket] match {
          case Some(upgrade) => {
            upgrade.handleMessages(myWebSocketService)
            // upgrade.handleMessages(myWebSocketService2)
          }
          case None => {
            HttpResponse(400, entity = "Not a valid websocket request!")
          }
        }
      case _: HttpRequest => {
        HttpResponse(404, entity = "Not Found.")
      }
    }

    // HTTP コネクションが処理される出口 (Sink)
    val connectionHandler: Sink[Http.IncomingConnection, _] = Sink.foreach { connection: Http.IncomingConnection =>
      println("Accepted new connection from " + connection.remoteAddress)

      // コネクション内の HTTP リクエストが処理される出口 (Flow + Sink = Sink)
      connection.handleWith {
        Flow[HttpRequest].map(requestHandler)
      }
    }

    // 非同期にコネクションを確立
    val bindingFuture: Future[Http.ServerBinding] = serverSource.to(connectionHandler).run()

    // エンターが入力されるまで待つ
    println("Server online at http://127.0.0.1:8080/\nPress RETURN to stop...")
    scala.io.StdIn.readLine()

    // エンターが押されてここに処理がきたら、
    bindingFuture
      .flatMap(_.unbind()) // まずポートのバインドを解除
      .onComplete(_ => system.terminate()) // 次にアクターシステムを終了
  }
}

client.html

<html>
  <head>
    <script type="text/javascript">
     // 接続
     var ws = new WebSocket("ws://127.0.0.1:8080/ws");

     // コネクション確立時のイベント
     ws.onopen = function(){
       console.log('Initialize Success');
     }

     // メッセージ受信時のイベント
     ws.onmessage = function(message){
       console.log('received : ' + message.data);
     }

     // エラー時のイベント
     ws.onerror = function(){
       console.log('Error!');
     }

     // コネクション切断時のイベント
     ws.onclose = function(){
       console.log('Connection Close!');
     }

     // 別のページに遷移する前やブラウザが閉じられる前に発生するイベント
     window.onbeforeunload = function(){
       // コネクションを切断
       ws.onclose();
     }

     // メッセージを送信する関数
     function send(){
       ws.send("qoosky");
     }
    </script>
  </head>
  <body>
    <a href="#" onclick="send()">送信</a>
  </body>
</html>

実行例

Uploaded Image

上記 Hello world! サンプルの応用

akka-http-core だけでなく akka-http-experimental を利用すると、同じ処理がより簡潔に記述できます。

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.server.Directives._ // 下記 `get` などの Routing DSL ディレクティブが使用可能になる
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws._

object Main {
  def main(args: Array[String]): Unit = {

    // アクターシステムおよびその他「おまじない」を `implicit` で宣言
    implicit val system = ActorSystem("mySystem")
    implicit val materializer = ActorMaterializer()
    implicit val ec = system.dispatcher

    // 下記 `handleWebSocketMessages` で使用される Flow (Strict 版)
    val myWebSocketService: Flow[Message, Message, _] =
      Flow[Message].map {
        case TextMessage.Strict(txt) => {
          // 完全なメッセージを受信するまで待ちます。
          // Strict (完全なメッセージ) を受信してから、完全なメッセージで応答します。
          // つまり、TextMessage(textStream: Source[String, Any]) ではなく TextMessage(text: String) で応答します。
          TextMessage("Hello " + txt)
        }
        case _ => {
          // バイナリメッセージを受信
          TextMessage("Message type unsupported")
        }
      }

    // ルーティング
    val route = path("") {
      get {
        complete("ok")
      }
    } ~
    path("ws") {
      handleWebSocketMessages(myWebSocketService)
    }

    // IP とポートを指定してリッスン開始
    val bindingFuture = Http().bindAndHandle(route, "127.0.0.1", 8080)

    // 簡単のためデーモン化せず、リターンを入力すれば停止するようにしています。
    println("Server online at http://127.0.0.1:8080/\nPress RETURN to stop...")
    scala.io.StdIn.readLine()
    bindingFuture.flatMap(_.unbind()).onComplete(_ => system.terminate())
  }
}

クライアントサイド

この続きが気になる方は

Akka HTTP サンプルコード (Scala)

残り文字数は全体の約 16 %
tybot
100 円
関連ページ
    概要 Akka 2.4.2 を用いたサンプルコード集です。動作には Java 8 以降が必要です。 Akka requires that you have Java 8 or later installed on your machine. インストール方法は複数提供されています。その一部を記載します。
    概要 こちらで基本的な使用方法をまとめた Akka HTTP Scala アプリケーションをデーモン化します。 参考サイト Apache Commons Daemonを使ってJavaのデーモンプログラムを作る Start and stop a Scala application in production
    概要 Go 言語に関する基本的な事項を記載します。 モジュール、パッケージ、ライブラリ、アプリケーション 一つ以上の関数をまとめたものをパッケージとして利用します。更に一つ以上のパッケージをまとめたものをモジュールとして利用します。モジュールにはライブラリとして機能するものと、アプリケーションとして機能するものがあります。