Akka HTTP サンプルコード (Scala)
こちらに記載した Akka アクターを用いて実装された汎用 HTTP フレームワークです。Spray の後継です。コアモジュールである akka-http-core は 2016/2/17 に experimental が外れました。akka-http などのいくつかのサブモジュールは 2016/3/1 現在 experimental のままですが、基本的な機能に関しては大きな仕様変更はなさそうです。基本的な機能を利用したサンプルと、関連する公式ドキュメントへのリンクを記載します。サンプルで使用した Scala のバージョンは 2.11.7 です。(Play Framework はブラウザを対象としたフレームワークであり、Akka HTTP の更に上位の存在です。試験的に Akka HTTP をバックエンドにした開発が進められています。)


libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-http-core" % "2.4.2", // 主に低レベルのサーバーサイドおよびクライアントサイド HTTP/WebSocket API
  "com.typesafe.akka" %% "akka-http-experimental" % "2.4.2", // 高レベルのサーバーサイド API (experimental)
  "com.typesafe.akka" %% "akka-http-spray-json-experimental" % "2.4.2",  // Akka で JSON を扱う場合はこれ (experimental)
  "com.typesafe.akka" %% "akka-http-xml-experimental" % "2.4.2" // Akka で XML を扱う場合はこれ (experimental)

Hello world!

トップページを GET して h1 タグを返すだけのシンプルな内容です。

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.http.scaladsl.server.Directives._ // 下記 `get` などの Routing DSL ディレクティブが使用可能になる
import akka.http.scaladsl.marshallers.xml.ScalaXmlSupport._ // 下記 *1 の記法が使用可能になる
import akka.http.scaladsl.Http

object Main {
  def main(args: Array[String]): Unit = {

    // アクターシステムおよびその他「おまじない」を `implicit` で宣言
    implicit val system = ActorSystem("mySystem")
    implicit val materializer = ActorMaterializer()
    implicit val ec = system.dispatcher

    // ルーティング GET '/'
    val route = path("") {
      get {
        complete {
          <h1>Say hello to akka-http</h1>
        } // ↑*1

    // IP とポートを指定してリッスン開始
    val bindingFuture = Http().bindAndHandle(route, "", 8080)

    // 簡単のためデーモン化せず、リターンを入力すれば停止するようにしています。
    println("Server online at\nPress RETURN to stop...")
    bindingFuture.flatMap(_.unbind()).onComplete(_ => system.terminate())

GET リクエストに対する複数のルーティングを設定する例です。

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.marshallers.xml.ScalaXmlSupport._
import akka.http.scaladsl.Http

object Main {
  def main(args: Array[String]): Unit = {

    implicit val system = ActorSystem("mySystem")
    implicit val materializer = ActorMaterializer()
    implicit val ec = system.dispatcher

    // ルーティング
    val route = get {
      pathSingleSlash {
        complete {
          <html><body>Hello world!</body></html>
    } ~
    path("ok") {
    } ~
    path("not_ok") {
      sys.error("not_ok") // 500 エラー

    val bindingFuture = Http().bindAndHandle(route, "", 8080)

    println("Server online at\nPress RETURN to stop...")
    bindingFuture.flatMap(_.unbind()).onComplete(_ => system.terminate())

Akka Streams の理解が捗るサンプル

akka-http-core だけで動作します。

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.HttpMethods._
import scala.concurrent.Future

object Main {
  def main(args: Array[String]): Unit = {

    // おまじないを三行
    implicit val system = ActorSystem() // akka-http は akka-stream を用いて実装されています。akka-stream 内にはアクターシステムが存在します。
    implicit val materializer = ActorMaterializer() // 「ストリームを実行(run)するためのシステム」を生成するもの。
    implicit val ec = system.dispatcher

    // HTTP コネクションの発生源 (Source)
    val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
      Http().bind(interface = "", port = 8080)

    // 下記 `map` で使用される関数
    val requestHandler: HttpRequest => HttpResponse = {
      case HttpRequest(GET, Uri.Path("/"), _, _, _) => {
        HttpResponse(entity = "ok")
      case HttpRequest(GET, Uri.Path("/not_ok"), _, _, _) => {
      case _: HttpRequest => {
        HttpResponse(404, entity = "Not Found.")

    // HTTP コネクションが処理される出口 (Sink)
    val connectionHandler: Sink[Http.IncomingConnection, _] = Sink.foreach { connection: Http.IncomingConnection =>
      println("Accepted new connection from " + connection.remoteAddress)

      // コネクション内の HTTP リクエストが処理される出口 (Flow + Sink = Sink)
      connection.handleWith {

    // 非同期にコネクションを確立
    val bindingFuture: Future[Http.ServerBinding] = serverSource.to(connectionHandler).run()

    // エンターが入力されるまで待つ
    println("Server online at\nPress RETURN to stop...")

    // エンターが押されてここに処理がきたら、
      .flatMap(_.unbind()) // まずポートのバインドを解除
      .onComplete(_ => system.terminate()) // 次にアクターシステムを終了

クライアントサイド API

いくつかのレベルの API が提供されています。低レベルの API はカスタマイズ性は高いですが、コード量は増えます。逆に高レベルの API は柔軟性が低い代わりに簡潔なコードになります。

コネクションレベル API

最も低レベルな API です。akka-http-core だけで動作します。

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object Main {
  def main(args: Array[String]): Unit = {

    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()

    // コネクション (Flow で表現)
    val connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] =
      // Http().outgoingConnectionHttps("www.example.com") // https の場合

    // RunnableGraph 化して run する
    val responseFuture: Future[HttpResponse] =
      Source.single(HttpRequest(uri = "/"))
        .runWith(Sink.head) // Sink.head はストリームの最初の要素 (HttpResponse) を含む Future を返す

    // 完了するのを待つ
    responseFuture.foreach { response =>
      println(response.status) //=> 200 OK
      system.terminate() // アクターシステムを終了

リクエストレベル API

最も高レベルな API です。これも akka-http-core だけで動作します。

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object Main {
  def main(args: Array[String]): Unit = {

    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()

    // ストリームの Source/Sink/Flow を使用せずに表現
    val responseFuture: Future[HttpResponse] =
      Http().singleRequest(HttpRequest(uri = "http://www.example.com"))
      // Http().singleRequest(HttpRequest(uri = "https://www.example.com")) // https の場合

    // 完了するのを待つ
    responseFuture.foreach { response =>
      println(response.status) //=> 200 OK


import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.Actor
import akka.event.Logging
import scala.concurrent.ExecutionContext.Implicits.global

class MyActor extends Actor {
  import akka.pattern.pipe

  // アクター内では明示的に system を渡します
  final implicit val materializer = ActorMaterializer(ActorMaterializerSettings(context.system))
  val http = Http(context.system)
  val log = Logging(context.system, this)

  def receive = {
    case s: String => {
      // 非同期処理が完了したら self にメッセージとしてパイプします。
      log.info("received: %s" format s)
      http.singleRequest(HttpRequest(uri = "http://www.example.com")).pipeTo(self)
      // http.singleRequest(HttpRequest(uri = "https://www.example.com")).pipeTo(self) // https の場合
    case HttpResponse(StatusCodes.OK, headers, entity, _) => {
      log.info("Got response, body: " + entity.toString)
    case HttpResponse(code, _, _, _) => {
      log.info("Request failed, response code: " + code)
    case _ => {

object Main {
  def main(args: Array[String]): Unit = {

    val system = ActorSystem("mySystem")
    val props = Props[MyActor]
    val actor = system.actorOf(props, name = "myActor")

    while(true) {
      actor ! "hi!"


[INFO] [03/12/2016 22:46:12.544] [mySystem-akka.actor.default-dispatcher-4] [akka://mySystem/user/myActor] received: hi!
[INFO] [03/12/2016 22:46:12.846] [mySystem-akka.actor.default-dispatcher-4] [akka://mySystem/user/myActor] received: hi!
[INFO] [03/12/2016 22:46:13.644] [mySystem-akka.actor.default-dispatcher-9] [akka://mySystem/user/myActor] Got response, body: HttpEntity.Strict(text/html; charset=UTF-8,<!doctype html>
    <title>Example Domain</title>


Spray では WebSocket がサポートされていないのに対して Akka HTTP ではサーバーサイドおよびクライアントサイドの両方について WebSocket がサポートされています。


上記 Akka Streams の理解が捗るサンプルの応用

UpgradeToWebSocket ヘッダーを受信したら handleMessages によって myWebSocketService (Flow) につなぎます。WebSocket では HTTP の場合と異なり connection はずっと一つです。一つのコネクションの中で無数の Message がやり取りされます。Message の形式はテキストまたはバイナリが選択できます。受信した Message を Source として扱い、送信する Message を Sink として扱うことで、完全な Message でなくても動作するようにすることが推奨されています

Therefore, application code must be able to handle both streaming and strict messages and not expect certain messages to be strict.

Main.scala (akka-http-core だけで動作します)

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._
import akka.http.scaladsl.model.HttpMethods._
import scala.concurrent.Future

object Main {
  def main(args: Array[String]): Unit = {

    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    implicit val ec = system.dispatcher

    // HTTP コネクションの発生源
    val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
      Http().bind(interface = "", port = 8080)

    // 下記 `requestHandler` で使用される Flow (Stream 版)
    val myWebSocketService: Flow[Message, Message, _] =
      Flow[Message].mapConcat {
        // `mapConcat` は List(List(1),List(2)) を List(1,2) にする `flatMap` のようなものです。
        // 参考: http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-quickstart.html
        case tm: TextMessage => {
          // WebSocket では TextMessage または BinaryMessage をやり取りします。
          // すべて受信せずに応答を開始するためにはストリームの Source (textStream) から Message を生成します。
          // - TextMessage.textStream は Source[String, _] を返します。
          // - BinaryMessage.dataStream は Source[ByteString, _] を返します。
          // Source の連結は `++` 演算子で行います。
          TextMessage(Source.single("Hello ") ++ tm.textStream) :: Nil // `mapConcat` を利用しているためリスト化しておきます。
        case bm: BinaryMessage => {
          // バイナリメッセージを skip します (Sink.ignore に流します)
          // `map` ではなく、わざわざ `mapConcat` にしている理由は List(List(), List(1)) を List(1) にできるためです。
          // Nil (= List() 空リスト) を返して、何もなかったことにします。

    // 下記 `requestHandler` で使用される Flow (Strict 版)
    val myWebSocketService2: Flow[Message, Message, _] =
      Flow[Message].map { // `map` を使用する例を兼ねることにします。
        case TextMessage.Strict(txt) => {
          // 完全なメッセージを受信するまで待ちます。
          // Strict (完全なメッセージ) を受信してから、完全なメッセージで応答します。
          // つまり、TextMessage(textStream: Source[String, Any]) ではなく TextMessage(text: String) で応答します。
          TextMessage("Hello " + txt)
        case _ => {
          // バイナリメッセージを受信
          TextMessage("Message type unsupported")

    // 下記 `map` で使用される関数
    val requestHandler: HttpRequest => HttpResponse = {
      case HttpRequest(GET, Uri.Path("/"), _, _, _) => {
        HttpResponse(entity = "ok")
      case req @ HttpRequest(GET, Uri.Path("/ws"), _, _, _) =>
        // `@` で HttpRequest をまとめてバインドする Scala の文法です。
        // 参考: https://www.qoosky.io/techs/c7921ddb98
        req.header[UpgradeToWebSocket] match {
          case Some(upgrade) => {
            // upgrade.handleMessages(myWebSocketService2)
          case None => {
            HttpResponse(400, entity = "Not a valid websocket request!")
      case _: HttpRequest => {
        HttpResponse(404, entity = "Not Found.")

    // HTTP コネクションが処理される出口 (Sink)
    val connectionHandler: Sink[Http.IncomingConnection, _] = Sink.foreach { connection: Http.IncomingConnection =>
      println("Accepted new connection from " + connection.remoteAddress)

      // コネクション内の HTTP リクエストが処理される出口 (Flow + Sink = Sink)
      connection.handleWith {

    // 非同期にコネクションを確立
    val bindingFuture: Future[Http.ServerBinding] = serverSource.to(connectionHandler).run()

    // エンターが入力されるまで待つ
    println("Server online at\nPress RETURN to stop...")

    // エンターが押されてここに処理がきたら、
      .flatMap(_.unbind()) // まずポートのバインドを解除
      .onComplete(_ => system.terminate()) // 次にアクターシステムを終了


    <script type="text/javascript">
     // 接続
     var ws = new WebSocket("ws://");

     // コネクション確立時のイベント
     ws.onopen = function(){
       console.log('Initialize Success');

     // メッセージ受信時のイベント
     ws.onmessage = function(message){
       console.log('received : ' + message.data);

     // エラー時のイベント
     ws.onerror = function(){

     // コネクション切断時のイベント
     ws.onclose = function(){
       console.log('Connection Close!');

     // 別のページに遷移する前やブラウザが閉じられる前に発生するイベント
     window.onbeforeunload = function(){
       // コネクションを切断

     // メッセージを送信する関数
     function send(){
    <a href="#" onclick="send()">送信</a>


上記 Hello world! サンプルの応用

akka-http-core だけでなく akka-http-experimental を利用すると、同じ処理がより簡潔に記述できます。

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.server.Directives._ // 下記 `get` などの Routing DSL ディレクティブが使用可能になる
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws._

object Main {
  def main(args: Array[String]): Unit = {

    // アクターシステムおよびその他「おまじない」を `implicit` で宣言
    implicit val system = ActorSystem("mySystem")
    implicit val materializer = ActorMaterializer()
    implicit val ec = system.dispatcher

    // 下記 `handleWebSocketMessages` で使用される Flow (Strict 版)
    val myWebSocketService: Flow[Message, Message, _] =
      Flow[Message].map {
        case TextMessage.Strict(txt) => {
          // 完全なメッセージを受信するまで待ちます。
          // Strict (完全なメッセージ) を受信してから、完全なメッセージで応答します。
          // つまり、TextMessage(textStream: Source[String, Any]) ではなく TextMessage(text: String) で応答します。
          TextMessage("Hello " + txt)
        case _ => {
          // バイナリメッセージを受信
          TextMessage("Message type unsupported")

    // ルーティング
    val route = path("") {
      get {
    } ~
    path("ws") {

    // IP とポートを指定してリッスン開始
    val bindingFuture = Http().bindAndHandle(route, "", 8080)

    // 簡単のためデーモン化せず、リターンを入力すれば停止するようにしています。
    println("Server online at\nPress RETURN to stop...")
    bindingFuture.flatMap(_.unbind()).onComplete(_ => system.terminate())



