ストリームクライアント
NtripプロトコルはHTTPライクなシーケンスの後、ストリームでRTCMが送信されます。
新たに学び始めたscalaで、本シーケンスをシミュレートするクライアントを作成していきたいと思います。
Akka streaming / HTTP
scalaにはHTTPスタックを実装したAkka httpモジュールが存在します。
HTTPスタックの実装は大変なので、まずは本モジュールを利用してHTTP GET送信、HTTP Response受信のベースを作成します。
Request送信 -> Response受信のサンプル(HTTP Proxy経由)
※自己学習を兼ねているので返り値の型指定や関数の分割を行っています。
import java.net.InetSocketAddress import akka.Done import akka.util.ByteString import akka.actor.ActorSystem import akka.http.scaladsl.{ClientTransport, Http} import akka.http.scaladsl.model.{HttpRequest, HttpResponse} import akka.http.scaladsl.model.headers.`User-Agent` import akka.http.scaladsl.model.headers.Connection import akka.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings} import akka.stream.scaladsl.{Source, Sink} import akka.stream.ActorMaterializer import scala.util.{Failure, Success} import scala.concurrent.Future object TestClient { def main(args: Array[String]): Unit = { implicit val system: ActorSystem = ActorSystem("stream") implicit val materializer: ActorMaterializer = ActorMaterializer() implicit val executionContext: ExecutionContextExecutor = system.dispatcher // HTTP Proxy設定 val proxyHost: String = "プロキシのアドレス" val proxyPort: Int = 8080 val httpsProxyTransport: ClientTransport = ClientTransport.httpsProxy(InetSocketAddress.createUnresolved(proxyHost, proxyPort)) val settings: ConnectionPoolSettings = ConnectionPoolSettings(system) .withConnectionSettings(ClientConnectionSettings(system) .withTransport(httpsProxyTransport)) // HTTP Requestを生成 val connectUri: String = "http://接続先のURI" val request: HttpRequest = HttpRequest(uri = connectUri).addHeader(`User-Agent`.apply("Stream Client 0.1")).addHeader(Connection.apply("close")) // HTTP Requestを送信し、responseをFutureにセット val response: Future[HttpResponse] = Http().singleRequest(request, settings = settings) // Response受信処理 response.onComplete { case Success(res: HttpResponse) => println("protocol: " + res.protocol) println("status: " + res.status) // HTTP body/ストリームはHttpResponseのentity.dataByteにSourceで収容されている val bodies: Source[ByteString, Any] = res.entity.dataBytes // ストリームのbyte列受信時にイベントが発生するので、foreachで取得する val result: Future[Done] = bodies.runWith(Sink.foreach{body: ByteString => println(HexOutput.dump(body))}) // ストリーム終了処理 result.onComplete { _ => println("shutting down connection") system.terminate() } case Failure(e: Throwable) => println("could not connect: " + connectUri + s" (${e.getMessage})") system.terminate() } } }
このHTTPスタックではproxy経由の接続はuriの変更ではなく、connectでの接続になります。 手を抜いている感がありますが、接続に支障はないので進めます。
HexOutput.dump()
の部分は、ByteStringをバイナリとアスキーで出力するコードとなりますが、以下のコードが参考になります。
Proxyを経由しない場合
// 以下の行を変更 // val response: Future[HttpResponse] = Http().singleRequest(request, settings = settings) val response: Future[HttpResponse] = Http().singleRequest(request)
サンプルをNtripCasterに接続するとどうなるか
接続先をNtripCasterに向けて接続するとHTTP Proxy経由ならば動作しますが(Proxyの仕様による)、直接続の場合はakka.http.scaladsl.model.IllegalResponseException
の例外が発生します。
これはNtripの仕様でResponseのプロトコルがICY
やSOURCETABLE
となっているため、HTTPプロトコルでなくなってしまうためです(sourcetableに関してはリクエストヘッダのuser-agentが`Ntrip Client"になっていない場合、NtripCasterがブラウザと判断してHTTPで返信してくるケースもあります)。
ちょっと調べた限りではサポートプロトコルを増やす方法は分かりませんでした。ご存じの方がいらっしゃればぜひ教えていただきたいです。
今後
akka HTTPの基礎は理解できましたが、接続部分を解決しないとプロトコル解析に進めないので、低レベルAPIを利用したコード作成に取り掛かる予定です。