Search This Blog

Word Count using Cascading

In this Post we learn how to write word count program using Cascading.And we run it in local mode on windows within Eclipse.Cascading is a platform for developing big data applications on Hadoop.It has got many benefits over other mapreduce based tools.Cascading has plumbing terminology (taps,pipes etc ...)  to develop applications.

Assume we have input data in c:\data\in\data.txt

This is a Hadoop input file
Hadoop is a bigdata technology 



1 . Define input details

We define input details like path and schema using source tap.Cascading reads the data from source tap .Here We use FileTap to read data from local file.it takes file path and scheme (schema or columns).

String inputPath = "c:\\data\\in\\data.txt";
Tap srctap = new FileTap( new TextLine( new Fields("line" )) , inputPath );


2. Convert line into wrods.

Now we convert line data into words by applying regex filter. 

This
 is
 a
 Hadoop
 input 
file
Hadoop
 is
 a
 bigdata 
technology 

Regex filter converts line data into words using Space delimiter.We use RegexSplitGenerator function withing Each filter.start is the name of it.

Pipe words=new Each("start",new RegexSplitGenerator("\\s+"));



3. Apply group by

We use groupby pipe class to convert words into groups.

Pipe group=new GroupBy(words);

We apply this GroupBy on our last Pipe words.



4. Calculate Count of words.


Count count=new Count();
Pipe wcount=new Every(group, count);

We use count function of Cascading to generate count of words here. and we are applying it on our last group.

 a 2
bigdata 1 
 Hadoop 2
file 1
 input 1
 is 2 
technology 1 
This 1


5. Declare output path.

Now We have generated count and we have to write that output to a path.

String outputPath = "c:\\data\\out";
Tap sinkTap =new FileTap(  new TextLine( new Fields("word" ,"count")), outputPath, SinkMode.REPLACE );

With in FileTap we have declared columns word and count , output path and sink mode as replace.

we are asking cascading to replace existing data of output path .

6. Set properties.

We set main class using AppProps class.


Properties properties = new Properties();
AppProps.setApplicationJarClass( properties, WordCount.class );


7. Create a flow

We create a flow by connecting source tap,sink tap and last operation we performed.

we use LocalFlowConnector  to run the program in local mode.

LocalFlowConnector flowConnector = new LocalFlowConnector();
Flow flow = flowConnector.connect( "wordcount", srctap, sinkTap, wcount );
flow.complete();

Below are jar files required for running this program in eclipse.

Displaying jars.png

Below is the complete program for the same.

import java.util.Properties;

import cascading.flow.Flow;
import cascading.flow.local.LocalFlowConnector;
import cascading.operation.aggregator.Count;
import cascading.operation.regex.RegexSplitGenerator;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.local.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.local.FileTap;
import cascading.tuple.Fields;

/** * Wordcount example in Cascading */
public class WordCount {
public static void main(String[] args) {
String inputPath = "c:\\data\\in\\data.txt";
String outputPath = "c:\\data\\out";
Tap srctap = new FileTap( new TextLine( new Fields("line" )) , inputPath );
Tap sinkTap =new FileTap(  new TextLine( new Fields("word" ,"count")), outputPath, SinkMode.REPLACE );
Pipe words=new Each("start",new RegexSplitGenerator("\\s+"));
Pipe group=new GroupBy(words);
Count count=new Count();
Pipe wcount=new Every(group, count);
Properties properties = new Properties();


AppProps.setApplicationJarClass( properties, WordCount.class );
LocalFlowConnector flowConnector = new LocalFlowConnector();


Flow flow = flowConnector.connect( "wordcount", srctap, sinkTap, wcount );
flow.complete();

}
}