Akka HTTPを利用したストリームクライアント

ストリームクライアント

NtripプロトコルはHTTPライクなシーケンスの後、ストリームでRTCMが送信されます。
新たに学び始めたscalaで、本シーケンスをシミュレートするクライアントを作成していきたいと思います。

Akka streaming / HTTP

scalaにはHTTPスタックを実装したAkka httpモジュールが存在します。
HTTPスタックの実装は大変なので、まずは本モジュールを利用してHTTP GET送信、HTTP Response受信のベースを作成します。

Request送信 -> Response受信のサンプル(HTTP Proxy経由)

※自己学習を兼ねているので返り値の型指定や関数の分割を行っています。

import java.net.InetSocketAddress
import akka.Done
import akka.util.ByteString
import akka.actor.ActorSystem
import akka.http.scaladsl.{ClientTransport, Http}
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.model.headers.`User-Agent`
import akka.http.scaladsl.model.headers.Connection
import akka.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings}
import akka.stream.scaladsl.{Source, Sink}
import akka.stream.ActorMaterializer

import scala.util.{Failure, Success}
import scala.concurrent.Future

object TestClient {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("stream")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    implicit val executionContext: ExecutionContextExecutor = system.dispatcher

    // HTTP Proxy設定
    val proxyHost: String = "プロキシのアドレス"
    val proxyPort: Int = 8080
    val httpsProxyTransport: ClientTransport = ClientTransport.httpsProxy(InetSocketAddress.createUnresolved(proxyHost, proxyPort))
    val settings: ConnectionPoolSettings = ConnectionPoolSettings(system)
      .withConnectionSettings(ClientConnectionSettings(system)
        .withTransport(httpsProxyTransport))

    // HTTP Requestを生成
    val connectUri: String = "http://接続先のURI"
    val request: HttpRequest = HttpRequest(uri = connectUri).addHeader(`User-Agent`.apply("Stream Client 0.1")).addHeader(Connection.apply("close"))

    // HTTP Requestを送信し、responseをFutureにセット
    val response: Future[HttpResponse] = Http().singleRequest(request, settings = settings)

    // Response受信処理
    response.onComplete {
      case Success(res: HttpResponse) =>
        println("protocol: " + res.protocol)
        println("status: " + res.status)

        // HTTP body/ストリームはHttpResponseのentity.dataByteにSourceで収容されている
        val bodies: Source[ByteString, Any] = res.entity.dataBytes
        // ストリームのbyte列受信時にイベントが発生するので、foreachで取得する
        val result: Future[Done] = bodies.runWith(Sink.foreach{body: ByteString => println(HexOutput.dump(body))})
        // ストリーム終了処理
        result.onComplete { _ =>
          println("shutting down connection")
          system.terminate()
        }
      case Failure(e: Throwable) =>
        println("could not connect: " + connectUri + s" (${e.getMessage})")
        system.terminate()
    }
  }
}

このHTTPスタックではproxy経由の接続はuriの変更ではなく、connectでの接続になります。 手を抜いている感がありますが、接続に支障はないので進めます。

HexOutput.dump()の部分は、ByteStringをバイナリとアスキーで出力するコードとなりますが、以下のコードが参考になります。

Proxyを経由しない場合

    // 以下の行を変更
    //  val response: Future[HttpResponse] = Http().singleRequest(request, settings = settings)
    val response: Future[HttpResponse] = Http().singleRequest(request)

サンプルをNtripCasterに接続するとどうなるか

接続先をNtripCasterに向けて接続するとHTTP Proxy経由ならば動作しますが(Proxyの仕様による)、直接続の場合はakka.http.scaladsl.model.IllegalResponseExceptionの例外が発生します。

これはNtripの仕様でResponseのプロトコルがICYSOURCETABLEとなっているため、HTTPプロトコルでなくなってしまうためです(sourcetableに関してはリクエストヘッダのuser-agentが`Ntrip Client"になっていない場合、NtripCasterがブラウザと判断してHTTPで返信してくるケースもあります)。

ちょっと調べた限りではサポートプロトコルを増やす方法は分かりませんでした。ご存じの方がいらっしゃればぜひ教えていただきたいです。

今後

akka HTTPの基礎は理解できましたが、接続部分を解決しないとプロトコル解析に進めないので、低レベルAPIを利用したコード作成に取り掛かる予定です。

misatowater.hatenablog.com