CSV samples

Example: Fetch CSV from Internet and publish the data as JSON

This example uses Akka HTTP to send the HTTP request and Akka HTTP's primary JSON support via Spray JSON to convert each map into a JSON structure.

  • (1) trigger an HTTP request every 30 seconds,
  • (2) send it to the web server,
  • (3) continue with the response body as a stream of ByteString,
  • (4) scan the stream for CSV lines,
  • (5) convert the CSV lines into maps with the header line as keys,
  • (6) apply local logic to clean the data and convert the values to Strings,
  • (7) convert the maps to JSON with Spray JSON from Akka HTTP.
Scala
Source                                              // stream element type
  .tick(1.seconds, 30.seconds, httpRequest)         //: HttpRequest (1)
  .mapAsync(1)(Http().singleRequest(_))             //: HttpResponse (2)
  .flatMapConcat(extractEntityData)                 //: ByteString (3)
  .via(CsvParsing.lineScanner())                    //: List[ByteString] (4)
  .via(CsvToMap.toMap())                            //: Map[String, ByteString] (5)
  .map(cleanseCsvData)                              //: Map[String, String] (6)
  .map(toJson)                                      //: JsValue (7)
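
Steps (4) to (7) can be tried without network access by feeding the CSV flows from an in-memory `ByteString`. The following is a minimal sketch, not part of the sample itself: the object name `CsvToJsonOffline`, the sample rows, and the timeout are made up for illustration, and it assumes Akka 2.6 or later, where an implicit `ActorSystem` is enough to run a stream.

```scala
import akka.actor.ActorSystem
import akka.stream.alpakka.csv.scaladsl.{CsvParsing, CsvToMap}
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import spray.json._
import spray.json.DefaultJsonProtocol._

import scala.concurrent.Await
import scala.concurrent.duration._

object CsvToJsonOffline {

  // Hypothetical sample data standing in for the HTTP response body.
  val csv: ByteString =
    ByteString("Symbol,Name\nAAPL,Apple Inc.\nMSFT,Microsoft Corporation\n")

  def run(): Seq[JsValue] = {
    // Assumption: Akka 2.6+, so the system also provides the materializer.
    implicit val system: ActorSystem = ActorSystem("csv-offline")
    try {
      val result = Source
        .single(csv)                                                 //: ByteString
        .via(CsvParsing.lineScanner())                               //: List[ByteString]
        .via(CsvToMap.toMap())                                       //: Map[String, ByteString]
        .map(_.map { case (key, value) => key -> value.utf8String }) //: Map[String, String]
        .map(_.toJson)                                               //: JsValue
        .runWith(Sink.seq)
      // Blocking is acceptable in a small demo; avoid it in production code.
      Await.result(result, 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    run().foreach(json => println(json.compactPrint))
}
```

The first CSV line supplies the keys, so each later line becomes one JSON object with the header names as field names.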

Helper code

Scala
val httpRequest = HttpRequest(uri = "http://www.nasdaq.com/screening/companies-by-name.aspx?exchange=NASDAQ&render=download")

def extractEntityData(response: HttpResponse): Source[ByteString, _] =
  response match {
    case HttpResponse(OK, _, entity, _) => entity.dataBytes
    case notOkResponse =>
      Source.failed(new RuntimeException(s"illegal response $notOkResponse"))
  }

def cleanseCsvData(csvData: Map[String, ByteString]): Map[String, String] =
  csvData
    .filterNot { case (key, _) => key.isEmpty }
    .map { case (key, value) => key -> value.utf8String } // yields a strict Map on Scala 2.12 and 2.13

def toJson(map: Map[String, String])(
    implicit jsWriter: JsonWriter[Map[String, String]]): JsValue = jsWriter.write(map)
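
To show what the two data helpers do in isolation, the sketch below applies them to a hand-built row. The object name `HelperUsage` and the row contents are hypothetical; the empty key stands in for a column without a header name. The implicit `JsonWriter` is assumed to come from importing `DefaultJsonProtocol._`, which is one of several ways to bring it into scope.

```scala
import akka.util.ByteString
import spray.json.{JsValue, JsonWriter}
import spray.json.DefaultJsonProtocol._

object HelperUsage {

  def cleanseCsvData(csvData: Map[String, ByteString]): Map[String, String] =
    csvData
      .filterNot { case (key, _) => key.isEmpty }
      // `.map` gives a strict Map on both Scala 2.12 and 2.13
      .map { case (key, value) => key -> value.utf8String }

  def toJson(map: Map[String, String])(
      implicit jsWriter: JsonWriter[Map[String, String]]): JsValue = jsWriter.write(map)

  // Hypothetical parsed row; the "" key mimics a column with no header.
  val row: Map[String, ByteString] = Map(
    "Symbol" -> ByteString("AAPL"),
    "Name" -> ByteString("Apple Inc."),
    "" -> ByteString("")
  )

  // The entry with the empty key is dropped, the values are decoded as UTF-8.
  val cleansed: Map[String, String] = cleanseCsvData(row)

  // The implicit JsonWriter is resolved from DefaultJsonProtocol's map format.
  val json: JsValue = toJson(cleansed)

  def main(args: Array[String]): Unit =
    println(json.compactPrint)
}
```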

Imports

Scala
import akka.http.scaladsl._
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.alpakka.csv.scaladsl.{CsvParsing, CsvToMap}
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.util.ByteString
import spray.json.{DefaultJsonProtocol, JsValue, JsonWriter}

import scala.concurrent.duration.DurationInt

Running the example code

This example is contained in a stand-alone runnable main and can be run from sbt like this:

Scala
sbt
> csvSamples/run
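
Because the stream is driven by `Source.tick`, it keeps running until it is cancelled or fails. The sketch below is not the sample's actual main; it only illustrates, under the assumption of Akka 2.6 or later, how a tick-driven stream might be materialized with `Keep.both` and then stopped. The object name `TickLifecycle`, the string element, and the short intervals are placeholders for the HTTP pipeline and its 30-second schedule.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object TickLifecycle {

  def run(): Done = {
    // Assumption: Akka 2.6+, so the system also provides the materializer.
    implicit val system: ActorSystem = ActorSystem("tick-lifecycle")
    try {
      // Keep.both retains the Cancellable from Source.tick and the
      // Future[Done] from Sink.foreach.
      val (ticks, done) =
        Source
          .tick(0.seconds, 100.millis, "tick") // placeholder for the HttpRequest
          .toMat(Sink.foreach(println))(Keep.both)
          .run()

      Thread.sleep(350) // let a few ticks pass
      ticks.cancel()    // completes the source, and with it the whole stream
      Await.result(done, 3.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = {
    run()
    ()
  }
}
```

Cancelling the tick source completes the stream normally, so the `Future[Done]` from the sink can be used to decide when to terminate the actor system.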