こちらに記載した Akka アクターを用いて実装された汎用 HTTP フレームワークです。Spray の後継です。コアモジュールである akka-http-core は 2016/2/17 に experimental が外れました。akka-http などのいくつかのサブモジュールは 2016/3/1 現在 experimental のままですが、基本的な機能に関しては大きな仕様変更はなさそうです。基本的な機能を利用したサンプルと、関連する公式ドキュメントへのリンクを記載します。サンプルで使用した Scala のバージョンは 2.11.7 です。(Play Framework はブラウザを対象としたフレームワークであり、Akka HTTP の更に上位の存在です。試験的に Akka HTTP をバックエンドにした開発が進められています。)
IoT BaaS API (Qoosky Cloud Controller)
最新のバージョンは The Central Repository で検索してください。
build.sbt
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-http-core" % "2.4.2", // 主に低レベルのサーバーサイドおよびクライアントサイド HTTP/WebSocket API
"com.typesafe.akka" %% "akka-http-experimental" % "2.4.2", // 高レベルのサーバーサイド API (experimental)
"com.typesafe.akka" %% "akka-http-spray-json-experimental" % "2.4.2", // Akka で JSON を扱う場合はこれ (experimental)
"com.typesafe.akka" %% "akka-http-xml-experimental" % "2.4.2" // Akka で XML を扱う場合はこれ (experimental)
)
トップページを GET して h1 タグを返すだけのシンプルな内容です。
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.http.scaladsl.server.Directives._ // 下記 `get` などの Routing DSL ディレクティブが使用可能になる
import akka.http.scaladsl.marshallers.xml.ScalaXmlSupport._ // 下記 *1 の記法が使用可能になる
import akka.http.scaladsl.Http
object Main {
def main(args: Array[String]): Unit = {
// アクターシステムおよびその他「おまじない」を `implicit` で宣言
implicit val system = ActorSystem("mySystem")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
// ルーティング GET '/'
val route = path("") {
get {
complete {
<h1>Say hello to akka-http</h1>
} // ↑*1
}
}
// IP とポートを指定してリッスン開始
val bindingFuture = Http().bindAndHandle(route, "127.0.0.1", 8080)
// 簡単のためデーモン化せず、リターンを入力すれば停止するようにしています。
println("Server online at http://127.0.0.1:8080/\nPress RETURN to stop...")
scala.io.StdIn.readLine()
bindingFuture.flatMap(_.unbind()).onComplete(_ => system.terminate())
}
}
GET リクエストに対する複数のルーティングを設定する例です。
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.marshallers.xml.ScalaXmlSupport._
import akka.http.scaladsl.Http
object Main {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem("mySystem")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
// ルーティング
val route = get {
pathSingleSlash {
complete {
<html><body>Hello world!</body></html>
}
}
} ~
path("ok") {
complete("ok")
} ~
path("not_ok") {
sys.error("not_ok") // 500 エラー
}
val bindingFuture = Http().bindAndHandle(route, "127.0.0.1", 8080)
println("Server online at http://127.0.0.1:8080/\nPress RETURN to stop...")
scala.io.StdIn.readLine()
bindingFuture.flatMap(_.unbind()).onComplete(_ => system.terminate())
}
}
akka-http-core
だけで動作します。
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.HttpMethods._
import scala.concurrent.Future
object Main {
def main(args: Array[String]): Unit = {
// おまじないを三行
implicit val system = ActorSystem() // akka-http は akka-stream を用いて実装されています。akka-stream 内にはアクターシステムが存在します。
implicit val materializer = ActorMaterializer() // 「ストリームを実行(run)するためのシステム」を生成するもの。
implicit val ec = system.dispatcher
// HTTP コネクションの発生源 (Source)
val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
Http().bind(interface = "127.0.0.1", port = 8080)
// 下記 `map` で使用される関数
val requestHandler: HttpRequest => HttpResponse = {
case HttpRequest(GET, Uri.Path("/"), _, _, _) => {
HttpResponse(entity = "ok")
}
case HttpRequest(GET, Uri.Path("/not_ok"), _, _, _) => {
sys.error("not_ok")
}
case _: HttpRequest => {
HttpResponse(404, entity = "Not Found.")
}
}
// HTTP コネクションが処理される出口 (Sink)
val connectionHandler: Sink[Http.IncomingConnection, _] = Sink.foreach { connection: Http.IncomingConnection =>
println("Accepted new connection from " + connection.remoteAddress)
// コネクション内の HTTP リクエストが処理される出口 (Flow + Sink = Sink)
connection.handleWith {
Flow[HttpRequest].map(requestHandler)
}
}
// 非同期にコネクションを確立
val bindingFuture: Future[Http.ServerBinding] = serverSource.to(connectionHandler).run()
// エンターが入力されるまで待つ
println("Server online at http://127.0.0.1:8080/\nPress RETURN to stop...")
scala.io.StdIn.readLine()
// エンターが押されてここに処理がきたら、
bindingFuture
.flatMap(_.unbind()) // まずポートのバインドを解除
.onComplete(_ => system.terminate()) // 次にアクターシステムを終了
}
}
いくつかのレベルの API が提供されています。低レベルの API はカスタマイズ性は高いですが、コード量は増えます。逆に高レベルの API は柔軟性が低い代わりに簡潔なコードになります。
最も低レベルな API です。akka-http-core
だけで動作します。
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
object Main {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
// コネクション (Flow で表現)
val connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] =
Http().outgoingConnection("www.example.com")
// Http().outgoingConnectionHttps("www.example.com") // https の場合
// RunnableGraph 化して run する
val responseFuture: Future[HttpResponse] =
Source.single(HttpRequest(uri = "/"))
.via(connectionFlow)
.runWith(Sink.head) // Sink.head はストリームの最初の要素 (HttpResponse) を含む Future を返す
// 完了するのを待つ
responseFuture.foreach { response =>
println(response.status) //=> 200 OK
system.terminate() // アクターシステムを終了
}
}
}
最も高レベルな API です。これも akka-http-core
だけで動作します。
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
object Main {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
// ストリームの Source/Sink/Flow を使用せずに表現
val responseFuture: Future[HttpResponse] =
Http().singleRequest(HttpRequest(uri = "http://www.example.com"))
// Http().singleRequest(HttpRequest(uri = "https://www.example.com")) // https の場合
// 完了するのを待つ
responseFuture.foreach { response =>
println(response.status) //=> 200 OK
}
}
}
import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.Actor
import akka.event.Logging
import scala.concurrent.ExecutionContext.Implicits.global
class MyActor extends Actor {
import akka.pattern.pipe
// アクター内では明示的に system を渡します
final implicit val materializer = ActorMaterializer(ActorMaterializerSettings(context.system))
val http = Http(context.system)
val log = Logging(context.system, this)
def receive = {
case s: String => {
// 非同期処理が完了したら self にメッセージとしてパイプします。
log.info("received: %s" format s)
http.singleRequest(HttpRequest(uri = "http://www.example.com")).pipeTo(self)
// http.singleRequest(HttpRequest(uri = "https://www.example.com")).pipeTo(self) // https の場合
}
case HttpResponse(StatusCodes.OK, headers, entity, _) => {
log.info("Got response, body: " + entity.toString)
}
case HttpResponse(code, _, _, _) => {
log.info("Request failed, response code: " + code)
}
case _ => {
}
}
}
object Main {
def main(args: Array[String]): Unit = {
val system = ActorSystem("mySystem")
val props = Props[MyActor]
val actor = system.actorOf(props, name = "myActor")
while(true) {
actor ! "hi!"
Thread.sleep(1000)
}
}
}
実行例
[INFO] [03/12/2016 22:46:12.544] [mySystem-akka.actor.default-dispatcher-4] [akka://mySystem/user/myActor] received: hi!
[INFO] [03/12/2016 22:46:12.846] [mySystem-akka.actor.default-dispatcher-4] [akka://mySystem/user/myActor] received: hi!
[INFO] [03/12/2016 22:46:13.644] [mySystem-akka.actor.default-dispatcher-9] [akka://mySystem/user/myActor] Got response, body: HttpEntity.Strict(text/html; charset=UTF-8,<!doctype html>
<html>
<head>
<title>Example Domain</title>
...
Spray では WebSocket がサポートされていないのに対して Akka HTTP ではサーバーサイドおよびクライアントサイドの両方について WebSocket がサポートされています。
UpgradeToWebSocket
ヘッダーを受信したら handleMessages
によって myWebSocketService
(Flow) につなぎます。WebSocket では HTTP の場合と異なり connection はずっと一つです。一つのコネクションの中で無数の Message がやり取りされます。Message の形式はテキストまたはバイナリが選択できます。受信した Message を Source として扱い、送信する Message を Sink として扱うことで、完全な Message でなくても動作するようにすることが推奨されています。
Therefore, application code must be able to handle both streaming and strict messages and not expect certain messages to be strict.
Main.scala (akka-http-core
だけで動作します)
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._
import akka.http.scaladsl.model.HttpMethods._
import scala.concurrent.Future
object Main {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
// HTTP コネクションの発生源
val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
Http().bind(interface = "127.0.0.1", port = 8080)
// 下記 `requestHandler` で使用される Flow (Stream 版)
val myWebSocketService: Flow[Message, Message, _] =
Flow[Message].mapConcat {
// `mapConcat` は List(List(1),List(2)) を List(1,2) にする `flatMap` のようなものです。
// 参考: http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-quickstart.html
case tm: TextMessage => {
// WebSocket では TextMessage または BinaryMessage をやり取りします。
// すべて受信せずに応答を開始するためにはストリームの Source (textStream) から Message を生成します。
// - TextMessage.textStream は Source[String, _] を返します。
// - BinaryMessage.dataStream は Source[ByteString, _] を返します。
// Source の連結は `++` 演算子で行います。
TextMessage(Source.single("Hello ") ++ tm.textStream) :: Nil // `mapConcat` を利用しているためリスト化しておきます。
}
case bm: BinaryMessage => {
// バイナリメッセージを skip します (Sink.ignore に流します)
bm.dataStream.runWith(Sink.ignore)
// `map` ではなく、わざわざ `mapConcat` にしている理由は List(List(), List(1)) を List(1) にできるためです。
// Nil (= List() 空リスト) を返して、何もなかったことにします。
Nil
}
}
// 下記 `requestHandler` で使用される Flow (Strict 版)
val myWebSocketService2: Flow[Message, Message, _] =
Flow[Message].map { // `map` を使用する例を兼ねることにします。
case TextMessage.Strict(txt) => {
// 完全なメッセージを受信するまで待ちます。
// Strict (完全なメッセージ) を受信してから、完全なメッセージで応答します。
// つまり、TextMessage(textStream: Source[String, Any]) ではなく TextMessage(text: String) で応答します。
TextMessage("Hello " + txt)
}
case _ => {
// バイナリメッセージを受信
TextMessage("Message type unsupported")
}
}
// 下記 `map` で使用される関数
val requestHandler: HttpRequest => HttpResponse = {
case HttpRequest(GET, Uri.Path("/"), _, _, _) => {
HttpResponse(entity = "ok")
}
case req @ HttpRequest(GET, Uri.Path("/ws"), _, _, _) =>
// `@` で HttpRequest をまとめてバインドする Scala の文法です。
// 参考: https://www.qoosky.io/techs/c7921ddb98
req.header[UpgradeToWebSocket] match {
case Some(upgrade) => {
upgrade.handleMessages(myWebSocketService)
// upgrade.handleMessages(myWebSocketService2)
}
case None => {
HttpResponse(400, entity = "Not a valid websocket request!")
}
}
case _: HttpRequest => {
HttpResponse(404, entity = "Not Found.")
}
}
// HTTP コネクションが処理される出口 (Sink)
val connectionHandler: Sink[Http.IncomingConnection, _] = Sink.foreach { connection: Http.IncomingConnection =>
println("Accepted new connection from " + connection.remoteAddress)
// コネクション内の HTTP リクエストが処理される出口 (Flow + Sink = Sink)
connection.handleWith {
Flow[HttpRequest].map(requestHandler)
}
}
// 非同期にコネクションを確立
val bindingFuture: Future[Http.ServerBinding] = serverSource.to(connectionHandler).run()
// エンターが入力されるまで待つ
println("Server online at http://127.0.0.1:8080/\nPress RETURN to stop...")
scala.io.StdIn.readLine()
// エンターが押されてここに処理がきたら、
bindingFuture
.flatMap(_.unbind()) // まずポートのバインドを解除
.onComplete(_ => system.terminate()) // 次にアクターシステムを終了
}
}
client.html
<html>
<head>
<script type="text/javascript">
// 接続
var ws = new WebSocket("ws://127.0.0.1:8080/ws");
// コネクション確立時のイベント
ws.onopen = function(){
console.log('Initialize Success');
}
// メッセージ受信時のイベント
ws.onmessage = function(message){
console.log('received : ' + message.data);
}
// エラー時のイベント
ws.onerror = function(){
console.log('Error!');
}
// コネクション切断時のイベント
ws.onclose = function(){
console.log('Connection Close!');
}
// 別のページに遷移する前やブラウザが閉じられる前に発生するイベント
window.onbeforeunload = function(){
// コネクションを切断
ws.onclose();
}
// メッセージを送信する関数
function send(){
ws.send("qoosky");
}
</script>
</head>
<body>
<a href="#" onclick="send()">送信</a>
</body>
</html>
実行例
akka-http-core
だけでなく akka-http-experimental
を利用すると、同じ処理がより簡潔に記述できます。
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.server.Directives._ // 下記 `get` などの Routing DSL ディレクティブが使用可能になる
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws._
object Main {
def main(args: Array[String]): Unit = {
// アクターシステムおよびその他「おまじない」を `implicit` で宣言
implicit val system = ActorSystem("mySystem")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
// 下記 `handleWebSocketMessages` で使用される Flow (Strict 版)
val myWebSocketService: Flow[Message, Message, _] =
Flow[Message].map {
case TextMessage.Strict(txt) => {
// 完全なメッセージを受信するまで待ちます。
// Strict (完全なメッセージ) を受信してから、完全なメッセージで応答します。
// つまり、TextMessage(textStream: Source[String, Any]) ではなく TextMessage(text: String) で応答します。
TextMessage("Hello " + txt)
}
case _ => {
// バイナリメッセージを受信
TextMessage("Message type unsupported")
}
}
// ルーティング
val route = path("") {
get {
complete("ok")
}
} ~
path("ws") {
handleWebSocketMessages(myWebSocketService)
}
// IP とポートを指定してリッスン開始
val bindingFuture = Http().bindAndHandle(route, "127.0.0.1", 8080)
// 簡単のためデーモン化せず、リターンを入力すれば停止するようにしています。
println("Server online at http://127.0.0.1:8080/\nPress RETURN to stop...")
scala.io.StdIn.readLine()
bindingFuture.flatMap(_.unbind()).onComplete(_ => system.terminate())
}
}