CSV samples
Example: Fetch CSV from Internet and publish the data as JSON
This example uses Akka HTTP to send the HTTP request and Akka HTTPs primary JSON support via Spray JSON to convert the map into a JSON structure.
- (1) trigger an HTTP request every 30 seconds,
- (2) send it to web server,
- (3) continue with the response body as a stream of
ByteString
, - (4) scan the stream for CSV lines,
- (5) convert the CSV lines into maps with the header line as keys,
- (6) local logic to clean the data and convert values to Strings,
- (7) convert the maps to JSON with Spray JSON from Akka HTTP
- Scala
-
Source // stream element type .tick(1.seconds, 30.seconds, httpRequest) //: HttpRequest (1) .mapAsync(1)(Http().singleRequest(_)) //: HttpResponse (2) .flatMapConcat(extractEntityData) //: ByteString (3) .via(CsvParsing.lineScanner()) //: List[ByteString] (4) .via(CsvToMap.toMap()) //: Map[String, ByteString] (5) .map(cleanseCsvData) //: Map[String, String] (6) .map(toJson) //: JsValue (7)
Helper code
- Scala
-
val httpRequest = HttpRequest(uri = "http://www.nasdaq.com/screening/companies-by-name.aspx?exchange=NASDAQ&render=download") def extractEntityData(response: HttpResponse): Source[ByteString, _] = response match { case HttpResponse(OK, _, entity, _) => entity.dataBytes case notOkResponse => Source.failed(new RuntimeException(s"illegal response $notOkResponse")) } def cleanseCsvData(csvData: Map[String, ByteString]): Map[String, String] = csvData .filterNot { case (key, _) => key.isEmpty } .mapValues(_.utf8String) def toJson(map: Map[String, String])( implicit jsWriter: JsonWriter[Map[String, String]]): JsValue = jsWriter.write(map)
Imports
- Scala
-
import akka.http.scaladsl._ import akka.http.scaladsl.model.StatusCodes._ import akka.http.scaladsl.model.{HttpRequest, HttpResponse} import akka.stream.alpakka.csv.scaladsl.{CsvParsing, CsvToMap} import akka.stream.scaladsl.{Keep, Sink, Source} import akka.util.ByteString import spray.json.{DefaultJsonProtocol, JsValue, JsonWriter} import scala.concurrent.duration.DurationInt
Running the example code
This example is contained in a stand-alone runnable main, it can be run from sbt
like this:
- Scala
-
sbt > csvSamples/run
The source code for this page can be found here.