PySpark Word Count Program
What is WORD COUNT:
Word Count reads text files and counts how often words occur. The input is text files and the output is text files, each line of which contains a word and the count of how often it occurred, separated by a tab.
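For example, given the input line "to be or not to be", the output would contain lines such as (word and count separated by a tab):
to    2
be    2
or    1
not   1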
PYSPARK:
PySpark is the Python binding for the Spark platform and API and is not much different from the Java/Scala versions. Python is dynamically typed, so RDDs can hold objects of multiple types. PySpark does not yet support a few API calls, such as lookup and non-text input files, though these will be added in future releases.
SPARK:
Spark is an open source big data processing framework built around speed, ease of use, and sophisticated analytics. Spark can be up to 100x faster than Hadoop for large-scale data processing by exploiting in-memory computing and other optimizations. It has easy-to-use APIs for operating on large datasets. It comes packaged with higher-level libraries, including support for SQL queries, streaming data, machine learning and graph processing.
PYTHON:
Python is an interpreted, object-oriented, high-level programming language with dynamic semantics. Its high-level built in data structures, combined with dynamic typing and dynamic binding, make it very attractive for Rapid Application Development, as well as for use as a scripting or glue language to connect existing components together.
Create a Spark application using Python that reads a file, counts the number of times each word occurs in the file, and ignores all empty lines.
Step-1: Enter the PySpark shell
(Open a terminal and type the following command)
pyspark
Step-2: Create a Spark Application
(First, import SparkContext and SparkConf into pyspark)
from pyspark import SparkContext, SparkConf
Step-3: Create Configuration object and set App name
conf = SparkConf().setAppName("Pyspark Pgm")
sc = SparkContext(conf = conf)
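If you run Spark locally and want to set the cluster manager explicitly, the configuration can also carry a master URL. This is an optional sketch; the master URL local[*] (use all local cores) is an assumption and not part of the original program:
conf = SparkConf().setAppName("Pyspark Pgm").setMaster("local[*]")
sc = SparkContext(conf=conf)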
Step-4: Load data from HDFS
(i). First, create a text file and load it into HDFS
Here is the Example File:
Save the following into PySpark.txt
PySpark is the python binding for the Spark Platform and API and not much different from the Java/Scala versions. A good starting point is the official page i.e Examples | Apache Spark. Python is dynamically typed, so RDDs can hold objects of multiple types. PySpark does not yet support a few API calls, such as lookup and non-text input files, though these will be added in future releases.
Load the file into HDFS
hdfs dfs -put /home/geouser/Documents/PySpark.txt /user/geouser/
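(Optional check: list the target directory to confirm the upload)
hdfs dfs -ls /user/geouser/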
(ii). Next, load the PySpark.txt file (from HDFS) into pyspark
contentRDD = sc.textFile("hdfs://localhost:9000/user/geouser/PySpark.txt")
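(Optional check: count the lines of the RDD to confirm the file was read; this extra step is not part of the original program)
contentRDD.count()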
Step-5: Filter out empty lines from the loaded file (PySpark.txt)
nonempty_lines = contentRDD.filter(lambda x: len(x) > 0)
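Note that a line containing only spaces still has a length greater than zero. If you also want to drop such whitespace-only lines, a slightly stricter filter (a sketch, not used in the rest of this program) is:
nonempty_lines = contentRDD.filter(lambda x: len(x.strip()) > 0)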
Step-6: Split the content based on space
words = nonempty_lines.flatMap(lambda x: x.split(' '))
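flatMap() flattens the lists produced by split() into a single RDD of words, whereas map() would keep one list per line. For illustration (the lines are made up):
Input lines:     ["hello spark", "hello python"]
flatMap(split):  ["hello", "spark", "hello", "python"]
map(split):      [["hello", "spark"], ["hello", "python"]]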
Step-7: Count the words
wordcount = words.map(lambda x:(x,1)) \
.reduceByKey(lambda x,y: x+y) \
.map(lambda x: (x[1], x[0])).sortByKey(False)
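(Optional: because the pairs are sorted by count in descending order, take(5) previews the five most frequent words without collecting the whole RDD)
wordcount.take(5)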
Step-8: View the word counts after filtering
for word in wordcount.collect():
    print(word)
(Give 4 spaces before the print statement)
Step-9: Save the Final Data to HDFS
wordcount.saveAsTextFile("hdfs://localhost:9000/user/geouser/Wordcount")
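saveAsTextFile() writes one part file per partition inside the Wordcount directory (the part file names are generated by Spark). You can inspect the saved result with:
hdfs dfs -cat /user/geouser/Wordcount/part-*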
Description of the Commands:
from pyspark import SparkContext, SparkConf
-> The first step of any Spark driver application is to create a SparkContext.
The SparkContext allows your Spark driver application to access the cluster through a resource manager. The resource manager can be YARN, or Spark’s cluster manager.
In order to create a SparkContext you should first create a SparkConf. The SparkConf stores configuration parameters that your Spark driver application will pass to SparkContext.
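Besides the application name, a SparkConf can carry any Spark property through set(). For example (the memory value here is only an illustrative assumption, not something the word count program requires):
conf = SparkConf().setAppName("Pyspark Pgm").set("spark.executor.memory", "1g")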
conf = SparkConf().setAppName("Pyspark Pgm")
-> setAppName() gives your Spark driver application a name so you can identify it in the Spark or YARN UI.
sc = SparkContext(conf = conf)
-> Creates the SparkContext from the configuration object. (In Scala, the equivalent would be val sc = new SparkContext(conf).)
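In PySpark, if a SparkContext might already exist (for example, the interactive pyspark shell already provides one named sc), SparkContext.getOrCreate can be used instead of constructing a new one. This is an optional sketch, not used in the program above:
sc = SparkContext.getOrCreate(conf)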
contentRDD = sc.textFile("hdfs://localhost:9000/user/geouser/PySpark.txt")
sc.textFile() -> method used to load a file into an RDD
"hdfs://localhost:9000/user/geouser/PySpark.txt" -> Path of the HDFS file that should be loaded.
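textFile() can also read from the local file system when given a file:// URI, provided the file is accessible where Spark runs. For example, using the same local path from Step-4:
contentRDD = sc.textFile("file:///home/geouser/Documents/PySpark.txt")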
nonempty_lines = contentRDD.filter(lambda x: len(x) > 0)
-> Keeps only the non-empty lines (i.e., omits empty lines)
words = nonempty_lines.flatMap(lambda x: x.split(' '))
-> Splits each line into words on spaces (' ')
wordcount = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y).map(lambda x: (x[1], x[0])).sortByKey(False)
-> Maps each word to (word, 1), sums the counts with reduceByKey, swaps each pair to (count, word), and sorts by count in descending order.
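Step by step, for illustration (assuming the input contains the words "spark" and "python" as shown):
after map:              ("spark", 1), ("python", 1), ("spark", 1)
after reduceByKey:      ("spark", 2), ("python", 1)
after map (swap):       (2, "spark"), (1, "python")
after sortByKey(False): (2, "spark"), (1, "python")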
for word in wordcount.collect():
-> collect() returns all elements of the RDD to the driver program so they can be printed
print(word)
-> Prints each (count, word) pair
wordcount.saveAsTextFile("hdfs://localhost:9000/user/geouser/Wordcount")
saveAsTextFile() -> Saves the final word-count output as text files
"hdfs://localhost:9000/user/geouser/Wordcount" -> HDFS path where the output should be saved.
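Putting all the steps together, the same program can also be saved as a standalone script and run with spark-submit. The file name wordcount.py is an assumption; the HDFS paths are the ones used above:

from pyspark import SparkContext, SparkConf

# Configure and create the Spark context
conf = SparkConf().setAppName("Pyspark Pgm")
sc = SparkContext(conf=conf)

# Load the input file from HDFS
contentRDD = sc.textFile("hdfs://localhost:9000/user/geouser/PySpark.txt")

# Drop empty lines, split into words, and count each word
nonempty_lines = contentRDD.filter(lambda x: len(x) > 0)
words = nonempty_lines.flatMap(lambda x: x.split(' '))
wordcount = words.map(lambda x: (x, 1)) \
                 .reduceByKey(lambda x, y: x + y) \
                 .map(lambda x: (x[1], x[0])) \
                 .sortByKey(False)

# Print the (count, word) pairs and save them to HDFS
for word in wordcount.collect():
    print(word)
wordcount.saveAsTextFile("hdfs://localhost:9000/user/geouser/Wordcount")

Run it with:
spark-submit wordcount.py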