Reading from JMS

Example: Read text messages from JMS queue and append to file

  • listens to the JMS queue “test” receiving javax.jms.Messages (1),
  • converts the JMS message to a javax.jms.TextMessage and extracts the text (3),
  • converts incoming data to akka.util.ByteString (4),
  • and appends the data to the file target/out.txt (2).
Scala
val jmsSource: Source[Message, NotUsed] = // (1)
  JmsSource(
    JmsSourceSettings(connectionFactory).withBufferSize(10).withQueue("test")
  )

val fileSink: Sink[ByteString, Future[IOResult]] = // (2)
  FileIO.toPath(Paths.get("target/out.txt"),
                Set(WRITE, TRUNCATE_EXISTING, CREATE))

val finished: Future[IOResult] =
  jmsSource
    .map(_.asInstanceOf[TextMessage].getText) // (3)
    .map(ByteString(_)) // (4)
    .runWith(fileSink)
Scala Imports
import java.nio.file.Paths
import java.nio.file.StandardOpenOption._
import javax.jms.{Message, TextMessage}

import akka.NotUsed
import akka.stream.IOResult
import akka.stream.alpakka.jms.JmsSourceSettings
import akka.stream.alpakka.jms.scaladsl.JmsSource
import akka.stream.scaladsl.{FileIO, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Future
import scalasthlm.alpakka.playground.ActiveMqBroker
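
The sink in (2) opens the file with TRUNCATE_EXISTING. As far as we understand FileIO.toPath, the file is opened once when the stream starts, so earlier content is discarded at that point and the messages of the current run are written one after another. The following sketch uses plain java.nio instead of Akka Streams to show how TRUNCATE_EXISTING and APPEND differ; OpenOptionsSketch and writeText are names made up for this illustration.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Path}
import java.nio.file.StandardOpenOption.{APPEND, CREATE, TRUNCATE_EXISTING, WRITE}

object OpenOptionsSketch {

  // Hypothetical helper: writes `text` to `path`, either keeping or
  // discarding whatever the file contained before.
  def writeText(path: Path, text: String, append: Boolean): Unit = {
    val mode = if (append) APPEND else TRUNCATE_EXISTING
    Files.write(path, text.getBytes(UTF_8), CREATE, WRITE, mode)
  }

  def main(args: Array[String]): Unit = {
    val path = Files.createTempFile("out", ".txt")

    writeText(path, "first\n", append = false)
    writeText(path, "second\n", append = true)
    // The file now holds both lines: "first" and "second".
    print(new String(Files.readAllBytes(path), UTF_8))

    writeText(path, "third\n", append = false)
    // Truncation dropped the earlier lines, only "third" is left.
    print(new String(Files.readAllBytes(path), UTF_8))
  }
}
```

If the output should survive a restart of the stream, passing Set(WRITE, APPEND, CREATE) to FileIO.toPath would be the analogous change in the example above.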

Example: Read text messages from JMS queue and create one file per message

  • listens to the JMS queue “test” receiving javax.jms.Messages (1),
  • converts the JMS message to a javax.jms.TextMessage and extracts the text (2),
  • converts incoming data to akka.util.ByteString (3),
  • combines the incoming data with a counter (4),
  • creates an intermediary stream writing the incoming data to a file using the counter value to create unique file names (5).
Scala
val jmsSource: Source[Message, NotUsed] = // (1)
  JmsSource(
    JmsSourceSettings(connectionFactory).withBufferSize(10).withQueue("test")
  )

jmsSource
  .map(_.asInstanceOf[TextMessage].getText)               // (2)
  .map(ByteString(_))                                     // (3)
  .zip(Source.fromIterator(() => Iterator.from(0)))       // (4)
  .mapAsync(parallelism = 5) { case (byteStr, number) =>
    Source                                                // (5)
      .single(byteStr)
      .runWith(FileIO.toPath(Paths.get(s"target/out-${number}.txt"),
                             Set(WRITE, TRUNCATE_EXISTING, CREATE)))
  }
  .runWith(Sink.ignore)
Scala Imports
import java.nio.file.Paths
import java.nio.file.StandardOpenOption.{CREATE, TRUNCATE_EXISTING, WRITE}
import javax.jms.{Message, TextMessage}

import akka.NotUsed
import akka.stream.alpakka.jms.JmsSourceSettings
import akka.stream.alpakka.jms.scaladsl.JmsSource
import akka.stream.scaladsl.{FileIO, Sink, Source}
import akka.util.ByteString

import scalasthlm.alpakka.playground.ActiveMqBroker
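
Stripped of the streaming machinery, steps (4) and (5) amount to pairing each payload with an increasing number and deriving a unique file name from that number. The sketch below shows that idea with plain Scala collections and java.nio; it is not the stream code itself, and FilePerMessageSketch and writeNumbered are names invented for this illustration.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Path}
import java.nio.file.StandardOpenOption.{CREATE, TRUNCATE_EXISTING, WRITE}

object FilePerMessageSketch {

  // Hypothetical helper: writes every text to its own file "out-<n>.txt"
  // inside `dir`, where <n> is the position of the text in the sequence.
  def writeNumbered(dir: Path, texts: Seq[String]): Seq[Path] =
    texts.zipWithIndex.map { case (text, number) =>
      val target = dir.resolve(s"out-$number.txt")
      Files.write(target, text.getBytes(UTF_8), WRITE, TRUNCATE_EXISTING, CREATE)
    }

  def main(args: Array[String]): Unit = {
    val dir = Files.createTempDirectory("jms-sketch")
    val written = writeNumbered(dir, Seq("first message", "second message"))
    // Prints the two paths, ending in out-0.txt and out-1.txt.
    written.foreach(println)
  }
}
```

In the stream version the counter comes from a second source that is zipped with the messages, and mapAsync allows up to five files to be written concurrently.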

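Example: Read text messages from JMS queue and send to web server

  • listens to the JMS queue “test” receiving javax.jms.Messages (1),
  • creates an Akka HTTP client connection flow to localhost on port 8080 (2),
  • converts the JMS message to a javax.jms.TextMessage and extracts the text (3),
  • converts incoming data to akka.util.ByteString (4),
  • wraps the data in an HttpRequest for the URI /hello (5),
  • sends the requests through the connection flow (6),
  • and prints each HTTP response.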
Scala
val jmsSource: Source[Message, NotUsed] = // (1)
  JmsSource(
    JmsSourceSettings(connectionFactory).withBufferSize(10).withQueue("test")
  )

val httpConnectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] = // (2)
  Http().outgoingConnection("localhost", 8080)

val finished: Future[Done] =
  jmsSource
    .map(_.asInstanceOf[TextMessage].getText) // (3)
    .map(ByteString(_)) // (4)
    .map(bs => HttpRequest(uri = Uri("/hello"), entity = HttpEntity(bs))) // (5)
    .via(httpConnectionFlow) // (6)
    .runWith(Sink.foreach(println))
Scala Imports
import javax.jms.{Message, TextMessage}

import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.alpakka.jms.JmsSourceSettings
import akka.stream.alpakka.jms.scaladsl.JmsSource
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.util.ByteString
import akka.{Done, NotUsed}

import scala.concurrent.Future
import scalasthlm.alpakka.playground.{ActiveMqBroker, WebServer}

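Example: Read text messages from JMS queue and send to web socket

  • listens to the JMS queue “test” receiving javax.jms.Messages,
  • converts the JMS message to a javax.jms.TextMessage and extracts the text,
  • wraps the text in a WebSocket text message (ws.TextMessage),
  • sends it through a WebSocket client flow connected to ws://localhost:8080/webSocket/ping,
  • converts each message received from the server to a String and prints it with the prefix “client received: ”.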
Scala
val jmsSource: Source[jms.Message, NotUsed] =
  JmsSource(
    JmsSourceSettings(connectionFactory).withBufferSize(10).withQueue("test")
  )

val webSocketFlow: Flow[ws.Message, ws.Message, Future[WebSocketUpgradeResponse]] =
  Http().webSocketClientFlow(WebSocketRequest("ws://localhost:8080/webSocket/ping"))

val (wsUpgradeResponse, finished): (Future[WebSocketUpgradeResponse], Future[Done]) =
  jmsSource
    .map(_.asInstanceOf[jms.TextMessage].getText)
    .map(ws.TextMessage(_))
    .viaMat(webSocketFlow)(Keep.right)
    .mapAsync(1)(wsMessageToString)
    .map("client received: " + _)
    .toMat(Sink.foreach(println))(Keep.both)
    .run()
Scala Imports
import javax.jms

import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws.{WebSocketRequest, WebSocketUpgradeResponse}
import akka.stream.alpakka.jms.JmsSourceSettings
import akka.stream.alpakka.jms.scaladsl.JmsSource
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.{Done, NotUsed}

import scala.collection.immutable.Seq
import scala.concurrent.Future
import scalasthlm.alpakka.playground.{ActiveMqBroker, WebServer}
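
The snippet above calls wsMessageToString, whose definition is not shown on this page. A minimal sketch of what such a helper might look like follows; it is an assumption about its behavior, not the original implementation. It needs akka-stream and akka-http on the classpath, WsMessageSketch is a made-up name, and ActorMaterializer is used because it is available in Akka 2.5 and, although deprecated, in 2.6.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object WsMessageSketch {

  // Hypothetical stand-in for the helper used in the example:
  // turns a WebSocket message into its complete text.
  def wsMessageToString(message: ws.Message)(implicit mat: Materializer): Future[String] =
    message match {
      // The whole text is already in memory.
      case ws.TextMessage.Strict(text) =>
        Future.successful(text)
      // The text arrives in chunks, so concatenate them.
      case ws.TextMessage.Streamed(textStream) =>
        textStream.runFold("")(_ + _)
      // Binary data is not expected here: drain it and fail.
      case binary: ws.BinaryMessage =>
        binary.dataStream.runWith(Sink.ignore)
        Future.failed(new IllegalArgumentException("expected a text message"))
    }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("ws-sketch")
    implicit val mat: Materializer = ActorMaterializer()
    try {
      val strict = ws.TextMessage("ping")
      val streamed = ws.TextMessage(Source(List("pi", "ng")))
      println(Await.result(wsMessageToString(strict), 3.seconds))   // prints "ping"
      println(Await.result(wsMessageToString(streamed), 3.seconds)) // prints "ping"
    } finally system.terminate()
  }
}
```

mapAsync(1) fits a helper of this shape because it returns a Future and a parallelism of 1 keeps the replies in the order they were received.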

Running the example code

This example is contained in a stand-alone runnable main and can be run from sbt like this:

Scala
sbt
> jmsToFile/run
The source code for this page can be found here.