Develop code in a Scala Maven project to monitor a folder in HDFS in real time such that any new file in the folder will be processed. For each RDD in the stream, the following subtasks are performed concurrently:
(a) Count the word frequency and save the output in HDFS. Note: for each word, make sure space (" "), comma (","), semicolon (";"), colon (":"), period ("."), apostrophe ("'"), quotation mark ("\""), exclamation mark ("!"), question mark ("?"), and brackets ("[", "{", "(", "<", "]", ")", "}", ">") are trimmed.
(b) Filter out the short words (i.e., < 5 characters) and save the output in HDFS.
(c) Count the co-occurrence of words in each RDD where the context is the same line; and save the output in HDFS.
Answer:
* Introduction:
* For each RDD in the stream, subtasks (a)-(c) are performed concurrently.
* The Resilient Distributed Dataset (RDD), a distributed collection of items, is Spark's primary abstraction.
* RDDs can be created from Hadoop InputFormats (such as HDFS files) or by transforming other RDDs, as sketched below.
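For illustration, a minimal sketch of both creation routes (the HDFS path is a placeholder, and a SparkContext named sc is assumed):
val lines = sc.textFile("hdfs:///spark/sparkdata.txt") // RDD created from a Hadoop InputFormat (an HDFS text file)
val upper = lines.map(_.toUpperCase) // RDD created by transforming another RDD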
Step 1: Initial Configuration
· To read data from Hadoop's HDFS, add a dependency on hadoop-client for your version of HDFS (the line below uses sbt syntax; for this Maven project, the equivalent <dependency> entries appear in the pom.xml sketch after this list):
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "<your-hdfs-version>"
· Open Scala IDE. From the menu, choose File -> New -> Project -> Maven, selecting "Create a simple project". Use the default Workspace location.
· Click Next to reach the POM settings page.
· Enter Group Id = ca.sparkera.spark and Artifact Id = WordCount.
· Click Finish.
· Open the pom.xml file in the working area and replace its contents (a sketch is given after this list). Save the file. The jar files are downloaded automatically and the workspace is built.
· Right click on the project -> Configure -> Add Scala Nature.
· Right click on the project -> Properties -> Scala Compiler -> Use Project Settings -> Scala Installation: Latest 2.10 bundle (dynamic) to update the Scala compiler version.
· Right click on the source folder src/main/java -> Refactor -> Rename to rename it to src/main/scala. Create a package under it, say ca.sparkera.spark.
· Right click on the package -> New -> Scala Object to create a Scala object under the package created above; give WordCount as the object name (creating WordCount.scala).
· Paste the code below into WordCount.scala.
· Create a directory in HDFS where the text file will be stored: $ hdfs dfs -mkdir /spark
· Upload the text file to that folder in HDFS: $ hdfs dfs -put /home/sparkdata.txt /spark
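A minimal pom.xml sketch for the project above (the Spark, Scala, and Hadoop versions are assumptions; match them to your cluster — the _2.10 suffix matches the compiler setting chosen earlier):
<project xmlns="http://maven.apache.org/POM/4.0.0">
  <modelVersion>4.0.0</modelVersion>
  <groupId>ca.sparkera.spark</groupId>
  <artifactId>WordCount</artifactId>
  <version>1.0-SNAPSHOT</version>
  <dependencies>
    <!-- hadoop-client for your version of HDFS; 2.7.3 is an assumption -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.7.3</version>
    </dependency>
    <!-- Spark core and streaming, built for Scala 2.10; 1.6.3 is an assumption -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.6.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.6.3</version>
    </dependency>
  </dependencies>
</project>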
Step 2: Code
package ca.sparkera.spark

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object WordCount {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: WordCount <file>")
      System.exit(1)
    }
    val conf = new SparkConf()
    conf.setAppName("SparkWordCount").setMaster("local")
    val sc = new SparkContext(conf)
    val rdd = sc.textFile(args(0))
    rdd.flatMap(_.split(" ")).   // split each line into words
      map((_, 1)).               // pair each word with a count of 1
      reduceByKey(_ + _).        // sum the counts per word
      map(x => (x._2, x._1)).    // swap to (count, word) to sort by frequency
      sortByKey(false).          // descending order of count
      map(x => (x._2, x._1)).    // swap back to (word, count)
      saveAsTextFile("SparkWordCountResult")
    sc.stop()
  }
}
Step 3: Run the Project
Right click on WordCount.scala -> Run As -> Run Configurations -> Arguments to run the application. Add the input file path, e.g. /Users/will/Downloads/testdata/, as the argument. The output files containing the word-count result can be found at the location reported in the console output.
Step 4: Output:
$ ls -l /Users/will/workspace/WordCount/SparkWordCountResult
total 1248
-rw-r--r-- 1 will staff 0 17 Oct 23:10 _SUCCESS
-rw-r--r-- 1 will staff 42669 17 Oct 23:10 part-00000
-rw-r--r-- 1 will staff 15600 17 Oct 23:10 part-00001
-rw-r--r-- 1 will staff 129997 17 Oct 23:10 part-00002
-rw-r--r-- 1 will staff 443519 17 Oct 23:10 part-00003
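The steps above build and run a batch word count over a static file. The original task, however, asks to monitor an HDFS folder in real time and to run subtasks (a)-(c) on each RDD of the stream. Below is a minimal Spark Streaming sketch of that behavior; the object name StreamTasks, the 10-second batch interval, and the output-path scheme are assumptions, and punctuation is trimmed from both ends of each word per subtask (a):

package ca.sparkera.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamTasks {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: StreamTasks <input-dir> <output-dir>")
      System.exit(1)
    }
    val inputDir = args(0)
    val outputDir = args(1)

    val conf = new SparkConf().setAppName("StreamTasks") // master is supplied via spark-submit
    val ssc = new StreamingContext(conf, Seconds(10))    // batch interval is an arbitrary choice

    // Characters to trim from each word, per subtask (a)
    val punct = " ,;:.'\"!?[]{}()<>".toSet
    def trim(w: String) = w.dropWhile(punct).reverse.dropWhile(punct).reverse

    // Any new file that appears in inputDir becomes part of the stream
    val lines = ssc.textFileStream(inputDir)

    lines.foreachRDD { (rdd, time) =>
      if (!rdd.isEmpty()) {
        val stamp = time.milliseconds // time-stamp output dirs so batches do not collide

        // (a) word frequency with punctuation trimmed, saved to HDFS
        val words = rdd.flatMap(_.split("\\s+")).map(trim).filter(_.nonEmpty).cache()
        words.map((_, 1)).reduceByKey(_ + _)
          .saveAsTextFile(s"$outputDir/wordcount-$stamp")

        // (b) filter out short words (< 5 characters) and save
        words.filter(_.length >= 5)
          .saveAsTextFile(s"$outputDir/longwords-$stamp")

        // (c) co-occurrence of word pairs, where the context is the same line
        rdd.map(_.split("\\s+").map(trim).filter(_.nonEmpty))
          .flatMap(ws => for (a <- ws; b <- ws if a < b) yield ((a, b), 1))
          .reduceByKey(_ + _)
          .saveAsTextFile(s"$outputDir/cooccurrence-$stamp")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

As written, the three save jobs run one after another within each batch; to launch them truly concurrently, each job could be wrapped in a scala.concurrent.Future, since Spark's scheduler accepts jobs from multiple threads.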