Akka stream
以前の記事でAkka Httpを利用したストリームクライアントを作成し、NTRIP特有のICY
やSOURCETABLE
というプロトコルに玉砕しました。
HTTPではないプロトコルに対応するため、Akka streamを利用してクライアントを作成します。
Tcp#outgoingConnection
リクエストを送信してメッセージを受信するだけならば、Akka streamの流れを利用すれば取得できます。 ただ今回は接続先から受信したメッセージを基にリクエストを生成するというループ構成にしたかったので、下図のフローを考えました。 ループの用途として
- リクエストメッセージ送信前にHTTP Proxyとのコネクションをはる
- ストリームに対する定期的な応答
を想定しており、今回の実装案ではHTTP Proxyとのコネクションのみが対象です。
HTTP Proxyとの接続
HTTP Proxyとの接続もAkka streamの機能を利用したかったのですが、TCP#outgoingConnectionでのProxy接続は実装完了してないようです。
またTLSでない接続は接続先をProxyとし、URLをドメインを含んだフルパスにするというのが基本ですが、Akka HTTPに合わせてconnectメソッドでの接続としています。
メッセージ処理パート
メッセージの送信と受信処理はhandler: ActorRedで一括して行っています(フロー図の左部分)。
proxy接続の場合は
- connectメッセージを送信
- connection established待ち
- リクエストメッセージを送信
- レスポンスを表示
という流れで、直接接続の場合はproxy connectのシーケンスを省いています。
object connectHandler { def props(url: URL, sourceActor: ActorRef): Props = Props(new connectHandler(url, sourceActor)) } class connectHandler(url: URL, sourceActor: ActorRef) extends Actor { import context.{become, unbecome} // メッセージ受信時の動作(通常時)。 override def receive: Receive = { case ConnectReady => sourceActor ! makeGetMessage() case WaitProxy => sourceActor ! makeConnectMessage() become(proxyStage) case b: ByteString => println(HexOutput.dump(b)) // Tcp().outgoingConnectionで受信したByteStringの出力 } // Proxy Connection待ちのときの動作 val connectAcceptPattern: Regex = "^HTTP/1\\.1 200[\\s|\\S]*".r def proxyStage: Receive = { case b: ByteString => b.utf8String match { case connectAcceptPattern() => sourceActor ! makeGetMessage() unbecome() case _ => sourceActor ! PoisonPill } } val host: String = url.getHost val port: Int = url.getPort match { case -1 => url.getDefaultPort case i: Int => i } val path: String = url.getPath match { case "" => "/" case s: String => s } def makeConnectMessage(): ByteString = { ByteString(s"CONNECT $host:$port HTTP/1.1\r\nHost: $host\r\n\r\n") } def makeGetMessage(): ByteString = { ByteString(s"GET $path HTTP/1.1\r\nHost: $host:$port\r\nUser-Agent: NTRIP Client\r\nConnection: close\r\n\r\n") } }
streamパート
TCPストリーム部分はhandlerが送信したメッセージを受信するActorとSourceを連携させています(フロー図の中から右側部分)。
object TcpClient { def apply(dstAddress: String) = new TcpClient(dstAddress) def apply(dstAddress: String, proxyAddress: String, proxyPort:Int) = new TcpClient(dstAddress, Option(proxyAddress), Option(proxyPort)) } class TcpClient(dstAddress: String, proxyAddress: Option[String] = None, proxyPort: Option[Int] = None) { val url: URL = new URL(dstAddress) val host: String = proxyAddress.getOrElse(url.getHost) // TCP connection 接続先アドレス val port: Int = proxyPort.getOrElse(if(url.getPort == -1) url.getDefaultPort else url.getPort) // TCP connection 接続先ポート implicit val system: ActorSystem = ActorSystem() implicit val materializer: ActorMaterializer = ActorMaterializer() implicit val executionContext: ExecutionContextExecutor = system.dispatcher // stream SourceにActorを設定する val baseSource: Source[ByteString, ActorRef] = Source.actorRef[ByteString](2048, OverflowStrategy.fail) // Actor名を取得 val (flowActor: ActorRef, source: baseSource.ReprMat[ByteString, NotUsed]) = baseSource.preMaterialize() // メッセージ処理用Actorを生成 val handler: ActorRef = system.actorOf(connectHandler.props(url, flowActor)) val flow: Flow[ByteString, ByteString, Future[Tcp.OutgoingConnection]] = Tcp().outgoingConnection(host, port) val sink: Sink[ByteString, Future[Done]] = Sink.foreach(b => handler ! b) val result: Future[Done] = source.via(flow).runWith(sink) // Proxyの設定に基づきメッセージ処理用Actorに送信 if(proxyAddress.isEmpty) handler ! ConnectReady else handler ! WaitProxy // 終了処理 result.onComplete { case Success(_) => Thread.sleep(1000) println("connection closed") system.terminate() case Failure(e) => println("Error: " + e.getMessage) system.terminate() } }
参考: mainパート
実際に動かしてみたい人向けにmain部分も載せておきます。
package akkaGraph import java.net.URL import akka.{Done, NotUsed} import akka.actor.{Actor, ActorRef, ActorSystem, PoisonPill, Props} import akka.stream.scaladsl.{Flow, Sink, Source, Tcp} import akka.stream.{ActorMaterializer, OverflowStrategy} import akka.util.ByteString import akkaGraph.MessageStatus.{ConnectReady, WaitProxy} import scala.concurrent.{ExecutionContextExecutor, Future} import scala.util.matching.Regex import scala.util.{Failure, Success} object SimpleConnect { def main(args: Array[String]): Unit = { val dstAddress: String = "接続先アドレス" val (proxyAddress: String, proxyPort: Int) = ("プロキシアドレス", プロキシポート) TcpClient(dstAddress) // 直接接続の場合 // TcpClient(dstAddress, proxyAddress, proxyPort) // proxy経由の場合 } } object MessageStatus { case object ConnectReady case object WaitProxy }
実装にあたって
Akka streamのSource - Flow - Sinkというクローズの流れの中でどうやってループさせるのだという問題を解決するためにずいぶん時間を要しました。
Actorを利用しているためback-puressureが効かなかったりしますが、他にSource.queueを利用したり、Graph内部でループさせる方法もあると思うのでいろいろ試してみてください。