How can we detect faults in a stream of sensor data being transmitted from an embedded device mounted for measuring temperature variations in an energy plant? Videos are becoming popular more than single text posts in social media like twitter and how can we perform a market analysis using these streamed data. It is apparent that, amount of streaming data, we use is increasing, and Apache Spark Streaming, an extension on Spark core, comes as a promising utility for processing large-scale streaming data. In this chapter, we will look into basics involved in this and look into an example of log analyser with Spark.
Analysis of Stream Data
Three main steps are included in the pipeline of stream data processing:
Before moving into the SparkStreaming API details, let us see about its dependency linking and StamingContextAPI which is the entry point to any application with SparkStreaming.
Linking Spark Streaming
Both Spark and Spark Streaming can be imported from the Maven Repository. Before writing any Spark Streaming application, dependencies should be configured in Maven project as below.
Or, for sbt as below.
libraryDependencies += “org.apache.spark” % “spark-streaming_2.12” % “1.3.1” If the application will be reading input from external source like Kafka, twitter, etc. the relevant libraries to handle the data receipt and buffering, should be added accordingly.
Streaming Context Object
The first step in any Spark Streaming application is to initialize a StreamingContext object from the SparkConf object.
Once initialized, the normal flow of the application will be:
1. Create input DStreams to define sources of input.
2. Apply transformations and output operations to DStream to define the necessary computations.
3. Receive data and process them by calling start() method on the initialized, StreamingContext object.
4. Wait until the processing ends either due to an error or by calling the function stop() on initialized StreamingContext object. This waiting is done by calling the function, awaitTermination on the same object.
StreamingContext object is initialized as below:
import org.apache.spark._
import org.apache.spark.streaming._
val configurations = new
SparkConf().setAppName(applicationName).setMaster(masterURL)
val streamingContextObject= new StreamingContext(configurations, Seconds(2))
The second argument for creating the StreamingContext object is the batch interval (about which we will see the details in next section)
This internally generates a SparkContext (referred from streaming ContextObject, spark Contest ),which initializes all Spark functionalities.
SparkStreaming for Scalable, Fault Tolerant, Efficient Stream Data Processing
Apache spark comes with high level abstractions for reading streamed data from various sources, semantics to maintain the fault tolerance in data processing and finally support for integrating results with various data storages. Internally it can be illustrated as below.
Spark processing pipeline, includes two main components as SparkStreaming and Spark Engine. Receivers objects in SparkStreaming divide the data streams into input data batches of small time frames and Spark Engine processes these batches send the results to required data storages. We will dig into main components in SparkStreaming model
and the application development and deployment.
First, let us look into above components in detail and the fundamentals of big streaming data analysis application with SparkStreaming.
DStreams: Distretized Streams
Distretized Streams shortened as Dstreams are the core representation of a stream of data in SparkStreaming. A DStream can refer to either the stream of input data or the results generated after processing.
A DStream is nothing but an infinite sequence of RDDs providing an abstraction to handle large stream data. A given RDD in a DStream, holds the data for a given time step in the continuous stream. DStream API is similar to the RDD API we discussed in chapter 1. Any function called upon a DStream will be converted to a transformation on its RDDs. This transformation is handled by the SparkEngine. With this design, DStream API provides a convenient high level abstraction for the developer.
Let us further see details on input data streams, transformations and output operations.
Input DStreams, Receivers and Streaming Resources
DStreams objects which hold information on received stream of data are termed as Input DStreams Associated with each Input DStream except streams from files, there is an object which handles buffering the data into Spark memory, chopping them into batches of data for processing.
An input source to SparkStreaming can be of two forms:
1. Basic Source
2. Advanced Source
Basic Sources are the input sources which can be directly accessed using SparkContext API. Examples are sockets, file systems and AKKA actors. Accessing Advanced Sources, require using additional supporting utilities or dependencies which should be explicitly linked with the application. Examples are Twitter, Kaffka and Flume, etc.
Users can use multiple forms of above inputs by creating multiple input DStreams. Multiple receivers will be generated in this case, which will simultaneously handle the multiple inputs.
Spark analysis pipeline utilizes, one core of the cores allocated for SparkStreaming for its execution. Therefore, Special note should be taken that, SparkStreaming application should have enough cores allocated to it for both data processing and running the receivers.
Basic Sources
StreamingContext API comes with methods for handling these sources. They are file streams, Akka Actors and socket connections.
Let’s consider filestreams first. A DStream object to read input from an HDFS API compatible file system such as HDFS, NFS, S3, etc. can be instantiated as below.
var dStreamobj = sstreamingContextObj.filestream[keyClass, valueClass, InputFormatClass](inputDataDirectory)
Above tells the Spark Streaming to monitor the input DataDirectory and process as files are being created in it. It should be noted that, files within nested directories cannot be monitored, file should be in same format. Renaming or moving the files to input DataDirectory should be an atomic operation and any modification done after moving the file, would not take effect as files are read continuously.
Alternative method to access text files is text FileStream which can be called simply as below:
var dStreamTxtobj = streamingContextObj.textFilestream(inputDataDirectory)
It is not necessary to allocate additional cores for receivers since filstreams don’t need any receivers.
Connection to a stream of data from a socket is done as below:
var dStreamobj = streamingContextObj.socketTextStream(“host_name”,port)
The method actorStream(actorProps, actor-name) can be used to create aDStream object , receiving data from custom actors such as Akka actors. Actors are supported in Java and Scala only. Hence this method is not available for Python.
There is also an option available for reading a queue of RDDs into a DStream object, which can be useful to perform tests on an application with various test data. This can be done as below:
var testDStreamobj = streamingContextObj.queuestream(RDDqueue)
As RDDs are being pushed to the queue continuously, it will be received and processed similar to a stream.
Those are the categories which fall under, basic sources, supported by SparkContextAPI.
Advanced Sources
Sources sending input data from, sources not supported by Spark core and should be interfaced with external APIs are termed as advanced sources. Since they are not part of Spark and the interfaces may vary with new releases, handling the dependencies and instantiating the DStreams are to be done through external supporting library, which can be linked and imported into the application as needed. Examples are sources from Kafka, Flume or Twitter. In these cases, the necessary libraries should be linked as dependencies, DStream object should be instantiated importing necessary utilities and the dependency libraries should be bundled in the final uber jar when deploying. Still, these applications cannot be executed on Spark Shell and hence we cannot test them on Shell.
In addition to above main two sources, we can also work with DStreams to read data from custom inputs with custom receivers being implemented to read data from input and forward it to Spark.
Reliable and Unreliable Receivers
Depending on their reliability, data sources can be of two kinds. Some data sources (for an example Flume and Kafka) supports acknowledging the data receipt. If the correct data receipt can be acknowledged by the receiver, it ensures that, all the data will be received without any loss. Depending on receiver’s ability to send the acknowledgement they are of two kinds as reliable receivers and unreliable receivers. Reliable receiver can acknowledge properly to the source, once the data is received and buffered in Spark application. Unreliable receiver will not acknowledge the data receipt. In case you don’t want to mess with the complexity involved in acknowledgement or are working with an unreliable source which does not support the acknowledgement, an unreliable receiver could be used.
DStream Transformations
Data stored in DStream object could be modified through transformations. Both RDD and DStreams share many similar transformations. In this section, we will see details on quite frequently used set of transformations.
Map(function): This transformation, calls the given function upon source DStream and returns a DStream
FlatMap(function): Functions similar to above, but can map each item in the input DStream to zero or more outputs.
Reduce(function): Aggregates elements in source DStream based on an associative function computable in parallel and returns a new DStream holding an RDD with the result.
Repartition (Number Of Partitions): Decreases or increases the number of partitions, hence changing the paralleled operations by DStream object.
Union (Other DStream): Returns a DStream containing elements from both source and other DStreams (taking a union).
Filter(function): Filters items from the DStream, based on the function passed as argument and returns a DStream with filtered data.
Join(other DStream, [number Of Tasks]): From the source DStream and other DStream of the form (key,value1), (key, value2), returns a DStream object in the form of (key,(value1,value2)) .
Transform(function): Applies the function in the form of RDD->RDD , to each element of the source DStream. Any RDD operation can be performed DStream using this transformation.
Count(): Returns a DStream of one RDD holding the count of the elements in source DStream.
From the input DStreams, let us now see the operations on output DStreams:
Output DStreams
In above, we saw input operations on DStreams. Similarly, DStreams’ output operations can be used for forwarding generated results into data storages, such as file systems or databases. The real execution of all the transformation on DStreams will be initiated by output operations’ call, since it is the time when the results will be read from an external source. Details of output operations are as below:
For each RDD (function): This operation executes the given function on each RDD being forwarding the results data to the external storage. For an example the function passed, could be either writing to a database through a network or a save to a file, etc.
Print(): Can be used for debugging purposes and development to show the first ten items in every set of data in a DStream on the main application.
The calling DStream’s content will be saved in to text files. Name of the text file generated at each time interval will be in the form of “prefix-time_in_milli_seconds [suffix]”.
Checkout Apache Spark Interview Questions
With the tools in hand, let’s build an application to collect data from twitter data stream:
This example application demonstrates how we can read from stream of data from Twitter and process and write the collected tweets into a file. The collected tweets will be written in JSON(JavaScript Object Notation) format.
Below is the implementation in Scala:
Package com.spark.examples.twitterdata
/**
* Collect the given number of tweets and write into a text file.
* The twee
*/
object TweetCollector {
private var numofPartitions = 0
private var collectedTweetsCount = 0L
private var gson = new Gson()
def main(args: Array[String]) {
// Raed from application arguments and assign them accordingly
if (args.length < 3) {
System.err.println(“Arguments to ” + this.getClass.getSimpleName +
“should be in the form of
”)
System.exit(1)
}
val Array(resultsDirectory, Utils.IntParam(numOfTweetsToRead),
Utils.IntParam(timeIntervalInSecs), Utils.IntParam(outPutFileCountInEachInterval)) =
Utils.parseCommandLineWithTwitterCredentials(args)
val outputDirectory = new File(resultsDirectory.toString)
if (outputDirectory.exists()) {
System.err.println(“ERROR – %s Directory exists already,Provide another directory
or delete the existing”.format(
resultsDirectory))
System.exit(1)
}
outputDirectory.mkdirs()
val configuration = new SparkConf().setAppName(this.getClass.getSimpleName)
val streamingContextObject = new StreamingContext(configuration,
Seconds(timeIntervalInSecs))
val tweetsDataStream = TwitterUtils.createStream(streamingContextObject,
Utils.getAuth)
.map(gson.toJson(_))
tweetsDataStream.foreachRDD((rddElement, time) => {
val inputsCount = rddElement.inputsCount()
if (inputsCount > 0) {
val resutlRDD = rddElement.repartition(outPutFileCountInEachInterval)
resutlRDD.saveAsTextFile(resultsDirectory + “/tweetsRead_” +
time.milliseconds.toString)
collectedTweetsCount += inputsCount
if (collectedTweetsCount > numOfTweetsToRead) {
System.exit(0)
}
}
})
streamingContextObject.start()
streamingContextObject.awaitTermination()
}
}
Hadoop Adminstartion | MapReduce |
Big Data On AWS | Informatica Big Data Integration |
Bigdata Greenplum DBA | Informatica Big Data Edition |
Hadoop Hive | Impala |
Hadoop Testing | Apache Mahout |
Are you looking to get trained on Apache Spark, we have the right course designed according to your needs. Our expert trainers help you gain the essential knowledge required for the latest industry needs. Join our Apache Spark Certification Training program from your nearest city.
Learn Apache Spark Online Training Bangalore
These courses are equipped with Live Instructor-Led Training, Industry Use cases, and hands-on live projects. Additionally, you get access to Free Mock Interviews, Job and Certification Assistance by Certified Apache Spark Trainer
Our work-support plans provide precise options as per your project tasks. Whether you are a newbie or an experienced professional seeking assistance in completing project tasks, we are here with the following plans to meet your custom needs:
Name | Dates | |
---|---|---|
Apache Spark Training | Nov 19 to Dec 04 | View Details |
Apache Spark Training | Nov 23 to Dec 08 | View Details |
Apache Spark Training | Nov 26 to Dec 11 | View Details |
Apache Spark Training | Nov 30 to Dec 15 | View Details |
Ravindra Savaram is a Technical Lead at Mindmajix.com. His passion lies in writing articles on the most popular IT platforms including Machine learning, DevOps, Data Science, Artificial Intelligence, RPA, Deep Learning, and so on. You can stay up to date on all these technologies by following him on LinkedIn and Twitter.