Reading from JMS
Example: Read text messages from JMS queue and append to file
- listens to the JMS queue “test” receiving
javax.jms.Message
s (1), - converts the JMS message to a
javax.jms.TextMessage
and extracts the text (3), - converts incoming data to
akka.util.ByteString
(4), - and appends the data to the file
target/out
(2).
- Scala
-
val jmsSource: Source[Message, NotUsed] = // (1) JmsSource( JmsSourceSettings(connectionFactory).withBufferSize(10).withQueue("test") ) val fileSink: Sink[ByteString, Future[IOResult]] = // (2) FileIO.toPath(Paths.get("target/out.txt"), Set(WRITE, TRUNCATE_EXISTING, CREATE)) val finished: Future[IOResult] = jmsSource .map(_.asInstanceOf[TextMessage].getText) // (3) .map(ByteString(_)) // (4) .runWith(fileSink)
- Scala Imports
-
import java.nio.file.Paths import java.nio.file.StandardOpenOption._ import javax.jms.{Message, TextMessage} import akka.NotUsed import akka.stream.IOResult import akka.stream.alpakka.jms.JmsSourceSettings import akka.stream.alpakka.jms.scaladsl.JmsSource import akka.stream.scaladsl.{FileIO, Sink, Source} import akka.util.ByteString import scala.concurrent.Future import scalasthlm.alpakka.playground.ActiveMqBroker
Example: Read text messages from JMS queue and create one file per message
- listens to the JMS queue “test” receiving
javax.jms.Message
s (1), - converts the JMS message to a
javax.jms.TextMessage
and extracts the text (2), - converts incoming data to
akka.util.ByteString
(3), - combines the incoming data with a counter (4),
- creates an intermediary stream writing the incoming data to a file using the counter value to create unique file names (5).
- Scala
-
val jmsSource: Source[Message, NotUsed] = // (1) JmsSource( JmsSourceSettings(connectionFactory).withBufferSize(10).withQueue("test") ) jmsSource .map(_.asInstanceOf[TextMessage].getText) // (2) .map(ByteString(_)) // (3) .zip(Source.fromIterator(() => Iterator.from(0))) // (4) .mapAsync(parallelism = 5) { case (byteStr, number) => Source // (5) .single(byteStr) .runWith(FileIO.toPath(Paths.get(s"target/out-${number}.txt"), Set(WRITE, TRUNCATE_EXISTING, CREATE))) } .runWith(Sink.ignore)
- Scala Imports
-
import java.nio.file.Paths import java.nio.file.StandardOpenOption.{CREATE, TRUNCATE_EXISTING, WRITE} import javax.jms.{Message, TextMessage} import akka.NotUsed import akka.stream.alpakka.jms.JmsSourceSettings import akka.stream.alpakka.jms.scaladsl.JmsSource import akka.stream.scaladsl.{FileIO, Sink, Source} import akka.util.ByteString import scalasthlm.alpakka.playground.ActiveMqBroker
Example: Read text messages from JMS queue and send to web server
- Scala
-
val jmsSource: Source[Message, NotUsed] = // (1) JmsSource( JmsSourceSettings(connectionFactory).withBufferSize(10).withQueue("test") ) val httpConnectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] = // (2) Http().outgoingConnection("localhost", 8080) val finished: Future[Done] = jmsSource .map(_.asInstanceOf[TextMessage].getText) // (3) .map(ByteString(_)) // (4) .map(bs => HttpRequest(uri = Uri("/hello"), entity = HttpEntity(bs))) // (5) .via(httpConnectionFlow) // (6) .runWith(Sink.foreach(println))
- Scala Imports
-
import javax.jms.{Message, TextMessage} import akka.http.scaladsl.Http import akka.http.scaladsl.model._ import akka.stream.alpakka.jms.JmsSourceSettings import akka.stream.alpakka.jms.scaladsl.JmsSource import akka.stream.scaladsl.{Flow, Sink, Source} import akka.util.ByteString import akka.{Done, NotUsed} import scala.concurrent.Future import scalasthlm.alpakka.playground.{ActiveMqBroker, WebServer}
Example: Read text messages from JMS queue and send to web socket
- Scala
-
val jmsSource: Source[jms.Message, NotUsed] = JmsSource( JmsSourceSettings(connectionFactory).withBufferSize(10).withQueue("test") ) val webSocketFlow: Flow[ws.Message, ws.Message, Future[WebSocketUpgradeResponse]] = Http().webSocketClientFlow(WebSocketRequest("ws://localhost:8080/webSocket/ping")) val (wsUpgradeResponse, finished): (Future[WebSocketUpgradeResponse], Future[Done]) = jmsSource .map(_.asInstanceOf[jms.TextMessage].getText) .map(ws.TextMessage(_)) .viaMat(webSocketFlow)(Keep.right) .mapAsync(1)(wsMessageToString) .map("client received: " + _) .toMat(Sink.foreach(println))(Keep.both) .run()
- Scala Imports
-
import javax.jms import akka.http.scaladsl.Http import akka.http.scaladsl.model._ import akka.http.scaladsl.model.ws.{WebSocketRequest, WebSocketUpgradeResponse} import akka.stream.alpakka.jms.JmsSourceSettings import akka.stream.alpakka.jms.scaladsl.JmsSource import akka.stream.scaladsl.{Flow, Keep, Sink, Source} import akka.{Done, NotUsed} import scala.collection.immutable.Seq import scala.concurrent.Future import scalasthlm.alpakka.playground.{ActiveMqBroker, WebServer}
Running the example code
This example is contained in a stand-alone runnable main, it can be run from sbt
like this:
- Scala
-
sbt > jmsToFile/run
The source code for this page can be found here.