Streaming data is quite a hot topic right now, so I decided to write something about it on my blog. I’m new to this area, but I don’t think it is much different from standard batch processing. Of course, I’m more focused on building models and other ML stuff, not on all the administration tasks like setting up Kafka, making everything fault tolerant, and so on.
In this post, I’ll describe a very basic app, not very different from the one described in the Spark Streaming Programming Guide (https://spark.apache.org/docs/latest/streaming-programming-guide.html); the original code is here: https://github.com/apache/spark/blob/v2.3.1/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala.
This post will also serve as a template for future posts. As I mentioned earlier, I use blogdown to render the output of my posts, so if you are on Linux (preferably Ubuntu) and have R installed, you should be able to reproduce everything (some values may vary, because the streaming component depends on Spark’s startup time).
In this post, I will be using the zstatUtils package, which can be found on my GitHub (https://github.com/zzawadz/zstatUtils). It contains a few functions that are useful for working with Scala in knitr and rmarkdown. To install the package, you can use:
devtools::install_github("zzawadz/zstatUtils")
Setting up the Scala engine
Quite recently I wrote a blog post about creating a Scala engine for knitr. It was not economical to include that code in every post, so I moved it into a package. I also realized that even creating an sbt project to access all the necessary JAR files can be integrated into this package. To automate everything, I created an sbt engine which can download dependencies from Maven and create a proper scala engine. To make it work, you need to call the make_sbt_engine function like in the chunk below, and then create two chunks: one with the required plugins (sbt-pack is required), and the second one with a standard build.sbt (however, it must contain the line enablePlugins(PackPlugin)).
library(zstatUtils)
library(knitr)
# ~/spark-streaming-basic-setup - there the project will be created
knitr::knit_engines$set(sbt = make_sbt_engine("~/spark-streaming-basic-setup"))
The first chunk, plugins.sbt, must contain at least these two lines. The // plugins.sbt comment is used by the sbt engine to determine that it is dealing with this kind of file, and addSbtPlugin("org.xerial.sbt" % "sbt-pack" % "0.11") adds an sbt plugin that makes it easy to create a directory with all the JARs required by the project. In future versions it will be added by default, but for now it’s required.
// plugins.sbt
addSbtPlugin("org.xerial.sbt" % "sbt-pack" % "0.11")
## plugins.sbt created
The second sbt chunk must contain the line // build.sbt, which is also used by the sbt engine to determine the kind of file; the other obligatory line, enablePlugins(PackPlugin), enables the sbt-pack plugin. All other lines are standard sbt dependency declarations. Note that executing this chunk for the first time might take a lot of time, because sbt needs to download everything from the remote repositories. All subsequent runs should be much faster (in fact, I cache the contents of plugins.sbt and build.sbt, so if there’s no change in those chunks, sbt won’t be called at all).
// build.sbt
scalaVersion := "2.11.12"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.3.0"
enablePlugins(PackPlugin)
## build.sbt created
## Some jars:
## - activation-1.1.1.jar
## - aircompressor-0.8.jar
## - antlr4-runtime-4.7.jar
## - aopalliance-1.0.jar
## - aopalliance-repackaged-2.4.0-b34.jar
## - apacheds-i18n-2.0.0-M15.jar
After the sbt chunk with the build information, the Scala engine is set up automatically.
println(1 + 1)
println("Hello world!")
## 2
## Hello world!
Prepare some data for Spark Streaming.
Now there’s time for the hardest part. The Spark Streaming Programming Guide (https://spark.apache.org/docs/latest/streaming-programming-guide.html) suggests that Netcat can be used to stream data into the application. However, if you pipe a file into Netcat, it will send everything in one go, which is not what anyone would want in the context of a streaming application, where the data should arrive continuously. Fortunately, there’s a great command-line tool called pv, which allows limiting the amount of data sent per second. Because of the quite linear nature of Rmarkdown documents, I cannot start the streaming process at the same time as the Spark app, so I decided to start Netcat (nc) with a very limited transfer rate and then run Spark. Some data will be lost, because it will be streamed during Spark’s startup, but in this case it’s not so important.
So let’s prepare some data in R. Every line of the file used for streaming will contain a few words from “Lorem ipsum dolor sit amet consectetur adipiscing elit”. Then I will start Netcat with pv using R’s system function. I tried to use knitr’s bash chunk, but it creates a blocking process, and I don’t want to wait for the end of the stream (if you want to reproduce this example in a terminal, you can just run pv -L4000 /tmp/lorem.txt | nc -lN localhost 9998 in another window) ;)
set.seed(123)
lorem <- "Lorem ipsum dolor sit amet consectetur adipiscing elit"
lorem <- tolower(lorem)
lorem <- strsplit(lorem, split = " ")[[1]]
lorem <- replicate(1000, paste(sample(lorem, 30, replace = TRUE), collapse = " "))
cat(lorem, file = "/tmp/lorem.txt", sep = "\n")
system("pv -L4000 /tmp/lorem.txt | nc -lN localhost 9998", wait = FALSE)
Now let’s build the application for counting word occurrences in the stream.
I create two DStream objects. The first, wordCounts, counts the occurrences of each unique word in a given batch interval, and the second, wordsInLine, counts lines by the number of words they contain (the key should always be equal to 30, because each line of /tmp/lorem.txt contains 30 words).
I know that wordCounts and wordsInLine could be defined based on a single DStream, but I kept them separate to keep this example closer to the one from the programming guide.
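If you are curious, here is a minimal sketch of how both counts could be derived from one tokenized stream. It is not part of the executed example; the names tokenized, wordCountsAlt and wordsInLineAlt are made up for this sketch, and it assumes the lines DStream created in the listing further down.
// Sketch only: split each line once and reuse the result for both counts.
val tokenized = lines.map(_.split(" "))
// Per-batch counts of each word, built from the shared tokenized stream.
val wordCountsAlt = tokenized.flatMap(_.toSeq).map(w => (w, 1)).reduceByKey(_ + _)
// Number of lines per word count (should always be keyed by 30 here).
val wordsInLineAlt = tokenized.map(ws => (ws.length, 1)).reduceByKey(_ + _)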
There’s also one more thing that must be described. I added some commented-out code containing a Timer object t, which I used inside spark-shell to terminate the streaming process after some time; without it, the application runs continuously. However, in the Rmarkdown document it’s not required, because the chunk ends with ssc.awaitTerminationOrTimeout.
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
val spark = SparkSession.builder.master("local[*]")
.appName("Simple Application")
.getOrCreate()
val ssc = new StreamingContext(spark.sparkContext, Seconds(2))
val lines = ssc.socketTextStream("localhost", 9998, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
val wordsInLine = lines.map(x => (x.split(" ").size, 1)).reduceByKey(_ + _)
wordCounts.print()
wordsInLine.print()
/*
// for spark-shell
import java.util.Timer
import java.util.TimerTask
val t = new java.util.Timer()
val task = new java.util.TimerTask {
def run() = {
ssc.stop()
t.cancel()
}
}
t.schedule(task, 10000L, 1000L)
*/
ssc.start()
ssc.awaitTerminationOrTimeout(10000L)
## -------------------------------------------
## Time: 1533196712000 ms
## -------------------------------------------
## (consectetur,307)
## (elit,290)
## (adipiscing,317)
## (amet,311)
## (ipsum,317)
## (dolor,300)
## (sit,331)
## (lorem,317)
##
## -------------------------------------------
## Time: 1533196712000 ms
## -------------------------------------------
## (30,83)
##
## -------------------------------------------
## Time: 1533196714000 ms
## -------------------------------------------
## (consectetur,158)
## (elit,135)
## (adipiscing,150)
## (amet,138)
## (ipsum,143)
## (dolor,123)
## (sit,159)
## (lorem,134)
##
## -------------------------------------------
## Time: 1533196714000 ms
## -------------------------------------------
## (30,38)
##
## -------------------------------------------
## Time: 1533196716000 ms
## -------------------------------------------
## (consectetur,149)
## (elit,141)
## (adipiscing,149)
## (amet,146)
## (ipsum,151)
## (dolor,150)
## (sit,141)
## (lorem,143)
##
## -------------------------------------------
## Time: 1533196716000 ms
## -------------------------------------------
## (30,39)
##
## -------------------------------------------
## Time: 1533196718000 ms
## -------------------------------------------
## (consectetur,133)
## (elit,144)
## (adipiscing,147)
## (amet,141)
## (ipsum,132)
## (dolor,159)
## (sit,174)
## (lorem,140)
##
## -------------------------------------------
## Time: 1533196718000 ms
## -------------------------------------------
## (30,39)
##
## -------------------------------------------
## Time: 1533196720000 ms
## -------------------------------------------
## (consectetur,149)
## (elit,139)
## (adipiscing,141)
## (amet,143)
## (ipsum,134)
## (dolor,142)
## (sit,173)
## (lorem,149)
##
## -------------------------------------------
## Time: 1533196720000 ms
## -------------------------------------------
## (30,39)
Summary
In this post, I showed a basic setup for running a Spark Streaming application inside an Rmarkdown document. It will serve as a starting point for other posts on this topic. I think that the most crucial takeaway is the use of pv to limit the amount of data sent to Netcat in one go, which makes it possible to simulate a stream without setting up Kafka or other more complicated tools.