Akka streamを利用したストリームクライアント

Akka stream

misatowater.hatenablog.com

以前の記事でAkka Httpを利用したストリームクライアントを作成し、NTRIP特有のICYSOURCETABLEというプロトコルに玉砕しました。
HTTPではないプロトコルに対応するため、Akka streamを利用してクライアントを作成します。

Tcp#outgoingConnection

リクエストを送信してメッセージを受信するだけならば、Akka streamの流れを利用すれば取得できます。 f:id:misatospring:20190701223942j:plain ただ今回は接続先から受信したメッセージを基にリクエストを生成するというループ構成にしたかったので、下図のフローを考えました。 f:id:misatospring:20190701223950j:plain ループの用途として

  • リクエストメッセージ送信前にHTTP Proxyとのコネクションをはる
  • ストリームに対する定期的な応答

を想定しており、今回の実装案ではHTTP Proxyとのコネクションのみが対象です。

HTTP Proxyとの接続

HTTP Proxyとの接続もAkka streamの機能を利用したかったのですが、TCP#outgoingConnectionでのProxy接続は実装完了してないようです。
またTLSでない接続は接続先をProxyとし、URLをドメインを含んだフルパスにするというのが基本ですが、Akka HTTPに合わせてconnectメソッドでの接続としています。

メッセージ処理パート

メッセージの送信と受信処理はhandler: ActorRedで一括して行っています(フロー図の左部分)。
proxy接続の場合は

  • connectメッセージを送信
  • connection established待ち
  • リクエストメッセージを送信
  • レスポンスを表示

という流れで、直接接続の場合はproxy connectのシーケンスを省いています。

object connectHandler {
  def props(url: URL, sourceActor: ActorRef): Props = Props(new connectHandler(url, sourceActor))
}

class connectHandler(url: URL, sourceActor: ActorRef) extends Actor {
  import context.{become, unbecome}

  // メッセージ受信時の動作(通常時)。
  override def receive: Receive = {
    case ConnectReady =>
      sourceActor ! makeGetMessage()
    case WaitProxy =>
      sourceActor ! makeConnectMessage()
      become(proxyStage)
    case b: ByteString =>
      println(HexOutput.dump(b))  // Tcp().outgoingConnectionで受信したByteStringの出力
  }

  // Proxy Connection待ちのときの動作
  val connectAcceptPattern: Regex = "^HTTP/1\\.1 200[\\s|\\S]*".r
  def proxyStage: Receive = {
    case b: ByteString => b.utf8String match {
      case connectAcceptPattern() =>
        sourceActor ! makeGetMessage()
        unbecome()
      case _ =>
        sourceActor ! PoisonPill
    }
  }

  val host: String = url.getHost
  val port: Int = url.getPort match {
    case -1 => url.getDefaultPort
    case i: Int => i
  }
  val path: String = url.getPath match {
    case "" => "/"
    case s: String => s
  }

  def makeConnectMessage(): ByteString = {
    ByteString(s"CONNECT $host:$port HTTP/1.1\r\nHost: $host\r\n\r\n")
  }

  def makeGetMessage(): ByteString = {
    ByteString(s"GET $path HTTP/1.1\r\nHost: $host:$port\r\nUser-Agent: NTRIP Client\r\nConnection: close\r\n\r\n")
  }
}

streamパート

TCPストリーム部分はhandlerが送信したメッセージを受信するActorとSourceを連携させています(フロー図の中から右側部分)。

object TcpClient {
  def apply(dstAddress: String) = new TcpClient(dstAddress)
  def apply(dstAddress: String, proxyAddress: String, proxyPort:Int) =
    new TcpClient(dstAddress, Option(proxyAddress), Option(proxyPort))
}

class TcpClient(dstAddress: String, proxyAddress: Option[String] = None, proxyPort: Option[Int] = None) {
  val url: URL = new URL(dstAddress)
  val host: String = proxyAddress.getOrElse(url.getHost)  // TCP connection 接続先アドレス
  val port: Int =
    proxyPort.getOrElse(if(url.getPort == -1) url.getDefaultPort else url.getPort) // TCP connection 接続先ポート

  implicit val system: ActorSystem = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  // stream SourceにActorを設定する
  val baseSource: Source[ByteString, ActorRef] = Source.actorRef[ByteString](2048, OverflowStrategy.fail)
  // Actor名を取得
  val (flowActor: ActorRef, source: baseSource.ReprMat[ByteString, NotUsed]) = baseSource.preMaterialize()
  // メッセージ処理用Actorを生成
  val handler: ActorRef = system.actorOf(connectHandler.props(url, flowActor))

  val flow: Flow[ByteString, ByteString, Future[Tcp.OutgoingConnection]] = Tcp().outgoingConnection(host, port)
  val sink: Sink[ByteString, Future[Done]] = Sink.foreach(b => handler ! b)
  val result: Future[Done] = source.via(flow).runWith(sink)

  // Proxyの設定に基づきメッセージ処理用Actorに送信
  if(proxyAddress.isEmpty)
    handler ! ConnectReady
  else
    handler ! WaitProxy

  // 終了処理
  result.onComplete {
    case Success(_) =>
      Thread.sleep(1000)
      println("connection closed")
      system.terminate()
    case Failure(e) =>
      println("Error: " + e.getMessage)
      system.terminate()
  }
}

参考: mainパート

実際に動かしてみたい人向けにmain部分も載せておきます。

package akkaGraph

import java.net.URL
import akka.{Done, NotUsed}
import akka.actor.{Actor, ActorRef, ActorSystem, PoisonPill, Props}
import akka.stream.scaladsl.{Flow, Sink, Source, Tcp}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.util.ByteString
import akkaGraph.MessageStatus.{ConnectReady, WaitProxy}
import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.util.matching.Regex
import scala.util.{Failure, Success}

object SimpleConnect {
  def main(args: Array[String]): Unit = {
    val dstAddress: String = "接続先アドレス"
    val (proxyAddress: String, proxyPort: Int) = ("プロキシアドレス", プロキシポート)

    TcpClient(dstAddress)  // 直接接続の場合
//  TcpClient(dstAddress, proxyAddress, proxyPort)  // proxy経由の場合
  }
}

object MessageStatus {
  case object ConnectReady
  case object WaitProxy
}

実装にあたって

Akka streamのSource - Flow - Sinkというクローズの流れの中でどうやってループさせるのだという問題を解決するためにずいぶん時間を要しました。
Actorを利用しているためback-puressureが効かなかったりしますが、他にSource.queueを利用したり、Graph内部でループさせる方法もあると思うのでいろいろ試してみてください。