
Thursday, August 22, 2013

Degrees of Separation from Kevin Bacon using Cascading


Motivation


This post came about as a result of two events. First, I finished reading Paco Nathan's "Enterprise Data Workflows with Cascading" book (see my review on Amazon). Second, I started learning about the Enterprise Control Language (ECL) on the LexisNexis High Performance Computing Cluster (HPCC). ECL is a bit like Pig, which is a bit like Cascading, and one of the examples in the ECL tutorial was the Kevin Bacon Six Degrees of Separation problem. So I decided to build the example with Cascading, both to get some experience with the Cascading API and to compare it with the ECL solution.

Data Loading


The input to the problem is a set of (actor, movie) tuples from the IMDB database. This document (PDF) on the HPCC site contains links to FTP sites where you can download the actors and actresses files. The (actor, movie) tuples can be derived from these files using the following Java code.

// Source: src/main/java/com/mycompany/kevinbacon/load/Imdb2Csv.java
package com.mycompany.kevinbacon.load;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.PrintWriter;
import java.util.regex.Matcher;
import java.util.regex.Pattern;


public class Imdb2Csv {
  
  public static void main(String[] args) {
    
    try {
      Pattern actorPattern = Pattern.compile(
        "(.*?)\\s\\(.*?\\)"); 
      Pattern moviePattern = 
        Pattern.compile("(.*?)\\s\\(\\d{4}\\).*$");
      String[] inputs = new String[] {
        "data/landing/actors.list",
        "data/landing/actresses.list"
      };
      PrintWriter output = new PrintWriter(
        new FileWriter("data/input/actor-movie.csv"), true);
      for (String input : inputs) {
        boolean header = true;
        boolean data = false;
        boolean footer = false;
        String actor = null;
        String movie = null;
        BufferedReader reader = new BufferedReader(
          new FileReader(new File(input)));
        String line = null;
        while ((line = reader.readLine()) != null) {
          // loop through lines until we hit this pattern
          // Name\tTitles
          // ----\t-------
          if (line.startsWith("----\t")) header = false;
          // skip the footer, it occurs after a long 40 dash
          // or so standalone line (pattern below works if
          // you are already in the data area).
          if (data && line.startsWith("--------------------")) 
            footer = true;
          if (! header && ! footer) {
            data = true;
            if (line.trim().length() > 0 && ! line.startsWith("----\t")) {
              String[] cols = line.replaceAll("\t+", "\t").split("\t");
              if (! line.startsWith("\t")) {
                Matcher ma = actorPattern.matcher(cols[0]);
                if (ma.matches()) actor = ma.group(1);
                Matcher mm = moviePattern.matcher(cols[1]);
                if (mm.matches()) movie = mm.group(1);
              } else {
                Matcher mm = moviePattern.matcher(cols[1]);
                if (mm.matches()) movie = mm.group(1);
              }
              // if line contains non-ascii chars, skip this line
              // the reasoning is that this is perhaps non-English
              // movie which we don't care about.
              if (isNonAscii(actor) || isNonAscii(movie)) continue;
              if (actor != null && movie != null)
                output.println(dequote(actor) + "\t" + dequote(movie));
            }
          }
        }
        reader.close();
      }
      output.flush();
      output.close();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
  
  private static boolean isNonAscii(String s) {
    if (s == null) return true;
    char[] cs = s.toCharArray();
    for (int i = 0; i < cs.length; i++) {
      if (cs[i] > 127) return true;
    }
    return false;
  }

  private static String dequote(String s) {
    String st = s.trim();
    if (st.startsWith("\"") && st.endsWith("\"")) 
      return st.substring(1, st.length()-1);
    else return st;
  }
}
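
To see what the two regular expressions in Imdb2Csv capture, here is a small standalone sketch that applies them to a single line. The sample line and the class name ImdbLineDemo are made up for illustration, and the layout (name, tabs, title with year) is assumed from the parsing code above rather than taken from the actual IMDB files.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class ImdbLineDemo {

  // Same expressions as in Imdb2Csv above.
  static final Pattern ACTOR = Pattern.compile("(.*?)\\s\\(.*?\\)");
  static final Pattern MOVIE = Pattern.compile("(.*?)\\s\\(\\d{4}\\).*$");

  /** Returns group 1 if the pattern matches the whole input, else null. */
  static String extract(Pattern pattern, String input) {
    Matcher m = pattern.matcher(input);
    return m.matches() ? m.group(1) : null;
  }

  public static void main(String[] args) {
    // Hypothetical sample line: actor name, tabs, then title and year.
    String line = "Bacon, Kevin (I)\t\tApollo 13 (1995)  [Jack Swigert]  <4>";
    // Collapse runs of tabs, then split into name and title columns.
    String[] cols = line.replaceAll("\t+", "\t").split("\t");
    System.out.println(extract(ACTOR, cols[0]) + "\t" + extract(MOVIE, cols[1]));
  }
}
```

On the sample line, the actor pattern captures the name without its parenthesized suffix and the movie pattern captures the title without the year. Note that extract returns null for a name that has no parenthesized suffix, since the actor pattern requires one. In Imdb2Csv as written, that case leaves the previously seen actor in place.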

For this project, I decided to use Gradle, a relatively new (at least to me) build system that uses Groovy as the underlying language (as opposed to XML for Maven and Ant, or Scala for SBT). The decision was driven in part by the fact that the Cascading developers use Gradle, so their solutions to build-level problems are usually Gradle based as well, and I figured this would make my learning curve easier. In reality, I ended up spending quite some time wrestling with Gradle, but I think I now know enough about it to get by. In any case, to run the above code using Gradle, put your IMDB files under data/landing and run the following command to generate the (actor, movie) tuples file at data/input/actor-movie.csv (tab-separated, despite the .csv extension).

sujit@cyclone:cascading-kevinbacon$ gradle run \
    -DmainClass=com.mycompany.kevinbacon.load.Imdb2Csv

Gradle Build File


Here is my Gradle build file, adapted from examples in the Cascading book, the Cascading for the Impatient series, and a lot of Googling. I can use it to generate my Eclipse .classpath file, compile, run JUnit tests, and build a fat JAR that can be used both locally and on Amazon's Elastic MapReduce (EMR) platform. It is included here as an example of a fully functioning build file (at least for my purposes).

// Source: build.gradle
apply plugin: "java"
apply plugin: "idea"
apply plugin: "eclipse"
apply plugin: "application"

mainClassName = System.getProperty("mainClass")

archivesBaseName = "kevinbacon"

repositories {
  mavenLocal()
  mavenCentral()
  mavenRepo name: "conjars", url: "http://conjars.org/repo/"
}

configurations {
  provided
  compile.extendsFrom provided
}

ext.cascadingVer = "2.1.6"

dependencies {
  compile(group: "cascading", name: "cascading-core", version: cascadingVer)
  compile(group: "cascading", name: "cascading-local", version: cascadingVer)
  compile(group: "cascading", name: "cascading-hadoop", version: cascadingVer)
  provided(group: "org.apache.hadoop", name: "hadoop-core", version: "1.0.3")
  testCompile(group: "cascading", name: "cascading-platform", 
              version: cascadingVer)
  testCompile("org.apache.hadoop:hadoop-test:1.0.3")
  testCompile("cascading:cascading-test:2.0.8")
  testCompile("junit:junit:4.8.+")
  testCompile("org.slf4j:slf4j-api:1.7.2")
  testCompile("commons-io:commons-io:2.1")
  testRuntime("org.slf4j:slf4j-log4j12:1.7.2")
  testRuntime("log4j:log4j:1.2.16")
}

test {
  testLogging.showStandardStreams = true
  beforeTest {
    descriptor -> logger.lifecycle("Running test: " + descriptor)
  }
  onOutput {
    descriptor, event ->
      logger.lifecycle("Test " + descriptor + " produced error: " + 
        event.message)
  }
}

jar {
  description = "Assembles a JAR file"
  from {
    (configurations.runtime - configurations.provided).collect {
      it.isDirectory() ? it : zipTree(it)
    }
  }
  {
    exclude "META-INF/*.SF"
    exclude "META-INF/*.DSA"
    exclude "META-INF/*.RSA"
  }
  manifest {
    attributes("Main-Class": "com.mycompany.kevinbacon.flow.Main")
  }
}

The Cascading Flow



The DOT file generated by the Cascading flow planner is shown on the left. Essentially, the job consists of 7 iterations. At each iteration, the input is the (actor, movie) tuple collection and the Kevin Bacon costars from the previous degree of separation. For example, at the first iteration, we are looking for direct costars of Kevin Bacon (0 degrees of separation), so the input actor set is the single tuple containing Kevin Bacon. We join it against the (actor, movie) tuples to find all movies Kevin Bacon worked in, then join the (actor, movie) tuples against this (movie) tuple set to find the list of Kevin Bacon costars. We also annotate each costar with the current degree of separation. In the next iteration, this set of costar tuples is used to find the costars of the costars, and so on. Finally, the costar tuples from all 7 iterations are merged and grouped so the minimum degree of separation is stored against each costar. We then group these tuples on the "Bacon number" to find a count of costars at each degree of separation.
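
The final merge-and-group stage can be illustrated outside Cascading with plain Java collections. This is only a sketch under simplifying assumptions: the per-degree costar sets are passed in as in-memory sets, and the class name BaconSummary and the single-letter actor names (with "K" standing in for Kevin Bacon) are hypothetical. It mirrors the steps described above: drop Kevin Bacon, keep the minimum degree per actor, then count actors per degree.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

class BaconSummary {

  /** For each actor, keep the smallest degree at which the actor appears. */
  static Map<String, Integer> minDegree(List<Set<String>> costarsByDegree, String root) {
    Map<String, Integer> best = new TreeMap<String, Integer>();
    for (int degree = 0; degree < costarsByDegree.size(); degree++) {
      for (String actor : costarsByDegree.get(degree)) {
        if (actor.equals(root)) continue;  // drop the root actor
        Integer seen = best.get(actor);
        if (seen == null || degree < seen) best.put(actor, degree);
      }
    }
    return best;
  }

  /** Count how many actors ended up at each degree. */
  static Map<Integer, Integer> countByDegree(Map<String, Integer> degreeByActor) {
    Map<Integer, Integer> counts = new TreeMap<Integer, Integer>();
    for (Integer degree : degreeByActor.values()) {
      Integer c = counts.get(degree);
      counts.put(degree, c == null ? 1 : c + 1);
    }
    return counts;
  }

  /** Three hypothetical iterations; each set is one iteration's costars. */
  static Map<Integer, Integer> demo() {
    List<Set<String>> byDegree = new ArrayList<Set<String>>();
    byDegree.add(new TreeSet<String>(Arrays.asList("K", "A", "B")));
    byDegree.add(new TreeSet<String>(Arrays.asList("K", "A", "B", "C")));
    byDegree.add(new TreeSet<String>(Arrays.asList("A", "B", "C", "D")));
    return countByDegree(minDegree(byDegree, "K"));
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

In the demo data, actors A and B show up in every iteration but are recorded at degree 0, which is why the minimum is taken after merging.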

The code below represents the Cascading flow depicted in the diagrams above. It translates to 18 Hadoop MapReduce jobs.

// Source: src/main/java/com/mycompany/kevinbacon/flow/Main.java
package com.mycompany.kevinbacon.flow;

import java.util.Properties;

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.Filter;
import cascading.operation.Identity;
import cascading.operation.aggregator.Min;
import cascading.operation.expression.ExpressionFilter;
import cascading.operation.filter.Not;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Merge;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.CountBy;
import cascading.pipe.assembly.Rename;
import cascading.pipe.assembly.Retain;
import cascading.pipe.assembly.Unique;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

public class Main {
  
  public static void main(String[] args) {

    String input = args[0];
    String detailOutput = args[1];
    String summaryOutput = args[2];
    
    Tap<?,?,?> tin = new Hfs(new TextDelimited(
      Constants.inputFields, false, false, "\t"), input);
    Tap<?,?,?> toutDetail = new Hfs(new TextDelimited(
      Constants.detailFields, false, false, "\t"), detailOutput);
    Tap<?,?,?> toutSummary = new Hfs(new TextDelimited(
      Constants.summaryFields, false, false, "\t"), summaryOutput);
    
    Pipe allPairs = new Pipe("allPairs");
    
    // create a pipe with only Kevin Bacon
    Pipe kevinBacon = new Pipe("kevinBacon", allPairs);
    Filter<?> kevinBaconFilter = new ExpressionFilter(
      "! actor.equals(\"Bacon, Kevin\")", String.class);
    kevinBacon = new Each(kevinBacon, kevinBaconFilter);
    kevinBacon = new Retain(kevinBacon, Constants.actorField);
    kevinBacon = new Unique(kevinBacon, Constants.actorField);
    
    // At each degree of separation, find the costars of 
    // actors in the actor pipe (second arg to FindCostars)
    // by joining on actor to find movies, then joining on
    // movie to find costars.
    Pipe kevinBaconCostars0 = new FindCostars(
      allPairs, kevinBacon, 0);
      
    Pipe kevinBaconCostars1 = new FindCostars(
      allPairs, kevinBaconCostars0, 1);
      
    Pipe kevinBaconCostars2 = new FindCostars(
      allPairs, kevinBaconCostars1, 2);
      
    Pipe kevinBaconCostars3 = new FindCostars(
      allPairs, kevinBaconCostars2, 3);
      
    Pipe kevinBaconCostars4 = new FindCostars(
      allPairs, kevinBaconCostars3, 4);
      
    Pipe kevinBaconCostars5 = new FindCostars(
      allPairs, kevinBaconCostars4, 5);
      
    Pipe kevinBaconCostars6 = new FindCostars(
      allPairs, kevinBaconCostars5, 6);

    // merge pipes together, then filter out Kevin Bacon, 
    // group by actors and choose the minimum Bacon number
    // for each actor, and finally rename the min column
    // back to the Bacon number field.
    Pipe merged = new Merge("merged", Pipe.pipes(
      kevinBaconCostars0, kevinBaconCostars1, kevinBaconCostars2,
      kevinBaconCostars3, kevinBaconCostars4, kevinBaconCostars5,
      kevinBaconCostars6));
    merged = new Each(merged, new Not(kevinBaconFilter));
    merged = new GroupBy(merged, Constants.actorField);
    merged = new Every(merged, Constants.kbnumField, new Min());
    merged = new Rename(merged, new Fields("min"), Constants.kbnumField);
    
    // split the merged pipe into detail and summary pipes.
    // This is needed to avoid "duplicate pipe" errors from
    // Cascading when trying to set two tail sinks. The 
    // merged pipe already contains the information needed
    // for the detail pipe, and the summary pipe needs a bit
    // more processing, detailed in the comment block below.
    Pipe details = new Pipe("details", merged);
    details = new Each(details, new Identity());
    
    // generate summary stats - retain only the bacon number
    // column and group by it and count the number of costars
    // in each bacon number group.
    Pipe summary = new Pipe("summary", merged);
    summary = new Retain(summary, Constants.kbnumField);
    summary = new CountBy(summary, 
      Constants.kbnumField, Constants.countField);
    
    FlowDef fd = FlowDef.flowDef().
      addSource(allPairs, tin).
      addTailSink(details, toutDetail).
      addTailSink(summary, toutSummary);

    Properties props = new Properties();
    AppProps.setApplicationJarClass(props, Main.class);
    FlowConnector fc = new HadoopFlowConnector(props);
    
    Flow<?> flow = fc.connect(fd);
    flow.writeDOT("data/kevinbacon.dot");
    flow.writeStepsDOT("data/kevinbacon-steps.dot");
    flow.complete();
  }
}

We have broken off the work to find costars at each degree of separation into a sub-assembly, FindCostars, shown below:

// Source: src/main/java/com/mycompany/kevinbacon/flow/FindCostars.java
package com.mycompany.kevinbacon.flow;

import cascading.operation.Insert;
import cascading.pipe.Each;
import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.pipe.assembly.Retain;
import cascading.pipe.assembly.Unique;
import cascading.pipe.joiner.InnerJoin;
import cascading.tuple.Fields;

/**
 * This subassembly returns a pipe containing costars at 
 * the next degree of separation. Following functionality
 * is implemented.
 * 
 * (1) Join with original pipe of (actor, movie) tuples and
 *     the pipe containing actors found in previous step
 *     to find all movies acted in by the actors.
 * (2) Dedup the movies pipe.
 * (3) Join with original pipe of (actor, movie) tuples and
 *     the movies pipe to find all costars of the actors.
 * (4) Dedup the actors pipe.
 * (5) Add a new column with the current Kevin Bacon number
 *     (degree of separation).
 */
public class FindCostars extends SubAssembly {

  private static final long serialVersionUID = 3450219986636439710L;

  private Fields movieResultFields = 
    new Fields("actor", "movie", "actor1");
  private Fields actorResultFields = 
    new Fields("actor", "movie", "movie1");

  public FindCostars(Pipe allPairs, Pipe actors, int kbNumber) {
    // join with original pipe on actor to produce pipe of
    // all movies acted on by the actors in pipe actor
    actors = new Retain(actors, Constants.actorField);
    Pipe movies = new HashJoin(
      allPairs, Constants.actorField, 
      actors, Constants.actorField,
      movieResultFields, new InnerJoin());
    movies = new Retain(movies, Constants.movieField);
    movies = new Unique(movies, Constants.movieField);
    // now find all the actors for these movies, these
    // will be the costars for the incoming actors in 
    // actorPipe. Finally insert the Bacon number for
    // costars at this degree of separation.
    Pipe costars = new HashJoin(
      allPairs, Constants.movieField, 
      movies, Constants.movieField, 
      actorResultFields, new InnerJoin());
    costars = new Retain(costars, Constants.actorField);
    costars = new Unique(costars, Constants.actorField);
    Insert insfun = new Insert(Constants.kbnumField, kbNumber);
    costars = new Each(costars, insfun, Constants.detailFields);
    setTails(costars);
  }
}
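
The two joins inside FindCostars can be mimicked with in-memory sets, which may make the data flow easier to follow. This is an illustrative sketch, not the subassembly itself: the class name CostarStep, the String[] {actor, movie} representation, and the sample data are assumptions made for the example.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

class CostarStep {

  /**
   * pairs holds {actor, movie} arrays. Returns every actor who shares
   * at least one movie with someone in the given actor set.
   */
  static Set<String> findCostars(List<String[]> pairs, Set<String> actors) {
    // Steps (1) and (2): join on actor, keep the distinct movies.
    Set<String> movies = new HashSet<String>();
    for (String[] pair : pairs) {
      if (actors.contains(pair[0])) movies.add(pair[1]);
    }
    // Steps (3) and (4): join on movie, keep the distinct actors.
    Set<String> costars = new TreeSet<String>();
    for (String[] pair : pairs) {
      if (movies.contains(pair[1])) costars.add(pair[0]);
    }
    return costars;
  }

  public static void main(String[] args) {
    // Hypothetical data: K and A share M1, A and B share M2, C is isolated.
    List<String[]> pairs = Arrays.asList(
      new String[] {"K", "M1"}, new String[] {"A", "M1"},
      new String[] {"A", "M2"}, new String[] {"B", "M2"},
      new String[] {"C", "M3"});
    Set<String> first = findCostars(pairs, Collections.singleton("K"));
    Set<String> second = findCostars(pairs, first);
    System.out.println(first + " " + second);
  }
}
```

Note that the result includes the input actors themselves, since they appear in their own movies. The same holds for the Cascading version, which is why the merged output is later reduced to the minimum Bacon number per actor.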

Unit Testing


The Cascading user guide recommends that all sub-assemblies, flows, operations, etc. be unit tested. Unit tests should extend the PlatformTestCase class, and the tuples participating in test cases are populated from test files. Here is the JUnit test for the FindCostars subassembly. In addition, since I was unfamiliar with some operations in the Cascading API, I built another test case to help me experiment with various strategies (not included here, it's on the GitHub site).

// Source: src/test/java/com/mycompany/kevinbacon/flow/FindCostarsTest.java
package com.mycompany.kevinbacon.flow;

import java.io.File;

import org.apache.commons.io.FileUtils;
import org.junit.Before;
import org.junit.Test;

import cascading.PlatformTestCase;
import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.pipe.Pipe;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.test.LocalPlatform;
import cascading.test.PlatformRunner.Platform;

@Platform(LocalPlatform.class)
public class FindCostarsTest extends PlatformTestCase {
  
  private static final long serialVersionUID = 8950872097793074273L;

  @Before
  public void setup() throws Exception {
    File output = new File("src/test/data/output");
    FileUtils.deleteDirectory(output);
  }
  
  @Test
  public void testFindCostars() throws Exception {
    String allPairsFilename = "src/test/data/find_costars_allpairs.csv";
    String actorsFilename = "src/test/data/find_costars_actors.csv";
    String costarFilename = "src/test/data/output/costars.csv";
    
    getPlatform().copyFromLocal(allPairsFilename);
    getPlatform().copyFromLocal(actorsFilename);
    
    Tap<?,?,?> tapAllPairs = getPlatform().getDelimitedFile(
      Constants.inputFields, "\t", allPairsFilename, SinkMode.KEEP);
    Tap<?,?,?> tapActors = getPlatform().getDelimitedFile(
      Constants.detailFields, "\t", actorsFilename, SinkMode.KEEP);
    Tap<?,?,?> tapCostars = getPlatform().getDelimitedFile(
      Constants.detailFields, "\t", costarFilename, SinkMode.REPLACE);
    
    Pipe allPairs = new Pipe("allPairs");
    Pipe actors = new Pipe("actors");
    Pipe costars = new FindCostars(allPairs, actors, 2);
    
    FlowDef flowDef = FlowDef.flowDef().
      addSource(allPairs, tapAllPairs).
      addSource(actors, tapActors).
      addTailSink(costars, tapCostars);
    
    Flow<?> flow = getPlatform().getFlowConnector().connect(flowDef);
    flow.complete();
    validateLength(flow, 7);
  }
}

To run it, we can use the gradle test task, as shown below:

sujit@cyclone:cascading-kevinbacon$ gradle test \
    -Dtest.single=path.to.test.class

Running on Amazon EMR


Before running on Amazon EMR, I made sure that the job ran correctly (with a tiny input file I hand-built) against my local Hadoop instance.

sujit@cyclone:cascading-kevinbacon$ gradle clean jar
sujit@cyclone:cascading-kevinbacon$ rm -rf data/output 
sujit@cyclone:cascading-kevinbacon$ /opt/hadoop-1.2.1/bin/hadoop \
    jar build/libs/kevinbacon.jar \
    src/test/data/test.csv \
    data/output/detail \
    data/output/summary

I then used the EMR browser interface to start the job. The input file and fat JAR first needed to be uploaded into S3, then a new job flow had to be created in EMR - the prompts are pretty easy to understand. My input file had 9438301 (actor, movie) tuples. It took 58 minutes on a 1+4 node m1.medium cluster to produce the following summary:

0    43405
1    208829
3    635

Conclusion


Building this project was a lot of fun. I ended up learning enough about the Cascading API to be reasonably confident about being able to implement real-world applications in future. Also, the Cascading code clearly came out ahead in terms of readability compared to the ECL example code - although that may be at least partly because I am more proficient in Java than ECL.

In any case, hope you found this interesting as well. If you would like to replicate this or improve upon it, you can find the code on my GitHub page for this project.

Wednesday, April 03, 2013

A Newspaper Clipping Service with Cascading


This post describes a possible implementation for an automated Newspaper Clipping Service. The end-user is a researcher (or team of researchers) in a particular discipline who registers an interest in a set of topics (or web pages). An assistant (or team of assistants) then scours information sources to find more documents of interest to the researcher based on the identified topics. In this particular case, the information sources were limited to a set of "approved" newspapers, hence the name "Newspaper Clipping Service". The goal is to replace the assistants with an automated system.

The solution I came up with was to analyze the original web pages and treat keywords extracted from these pages as topics, then, for each keyword, query a popular search engine and gather the top 10 results. The search engine can be customized so that the sites it looks at are restricted to the list of approved newspapers. Finally, the URLs of the results are aggregated, and only URLs that were returned for more than one keyword topic are given back to the user.
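
The final aggregation step can be sketched in plain Java as follows. The class name UrlAggregator, the keyword strings, and the example.com URLs are placeholders. The sketch also assumes each URL should count at most once per keyword, which the description above does not spell out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class UrlAggregator {

  /** Returns, in sorted order, the URLs returned for more than one keyword. */
  static List<String> sharedUrls(Map<String, List<String>> resultsByKeyword) {
    Map<String, Integer> counts = new TreeMap<String, Integer>();
    for (List<String> urls : resultsByKeyword.values()) {
      // Count each URL once per keyword, even if a result list repeats it.
      for (String url : new HashSet<String>(urls)) {
        Integer c = counts.get(url);
        counts.put(url, c == null ? 1 : c + 1);
      }
    }
    List<String> shared = new ArrayList<String>();
    for (Map.Entry<String, Integer> entry : counts.entrySet()) {
      if (entry.getValue() > 1) shared.add(entry.getKey());
    }
    return shared;
  }

  public static void main(String[] args) {
    Map<String, List<String>> results = new LinkedHashMap<String, List<String>>();
    results.put("solar power",
      Arrays.asList("http://example.com/a", "http://example.com/b"));
    results.put("wind farm",
      Arrays.asList("http://example.com/b", "http://example.com/c"));
    System.out.println(sharedUrls(results));
  }
}
```

Only the URL returned for both keywords survives the filter. Raising the threshold above one would make the service stricter at the cost of fewer suggestions.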

The entire flow can be thought of as a series of Hadoop Map-Reduce jobs, to first download, extract and count keywords from (web pages corresponding to) URLs, and then to extract and count search result URLs from the keywords. I've been wanting to play with Cascading for a while, and this seemed like a good candidate, so the solution is implemented with Cascading.

I have used Scalding in the past, but it seems to me that while Scalding's collection-like interface is easier to work with, it's harder to extend. So even though I think I could have done this in Scalding without any problems, the objective was to learn Cascading, so I used that instead. I initially started using Cascading with Scala (I write enough Java code at work :-)), but Cascading's use of generics (at least some of it) is too complicated for Scala's type inference system, so I fell back to using Java instead*.

One can write Cascading code in local mode, which uses in-memory data structures and the local filesystem, or in Hadoop mode, which uses Hadoop and HDFS. Since this was a learning exercise, I decided to use local mode. To move it to Hadoop, one would have to use the Hadoop-specific FlowConnector and Taps instead. Here is the code for the Main (callable) class. The entire Maven project is available on my GitHub page.

// Source: src/main/java/com/mycompany/newsclip/Main.java
package com.mycompany.newsclip;

import java.util.Properties;

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.FlowDef;
import cascading.flow.local.LocalFlowConnector;
import cascading.operation.Aggregator;
import cascading.operation.Filter;
import cascading.operation.Function;
import cascading.operation.Identity;
import cascading.operation.aggregator.Count;
import cascading.operation.expression.ExpressionFilter;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.local.TextLine;
import cascading.tap.Tap;
import cascading.tap.local.FileTap;
import cascading.tuple.Fields;

public class Main {

  @SuppressWarnings("rawtypes")
  public static void main(String[] args) {
    // handle input parameters
    String input = args[0];
    String output = args[1];

    Fields urlFields = new Fields("num", "line");
    Tap iTap = new FileTap(new TextLine(urlFields), input);
    
    Fields kFields = new Fields("kword");
    Tap oTap = new FileTap(new TextLine(kFields), output);

    Pipe pipe = new Pipe("keyword");
    
    // read urls, download, clean and extract keywords (1:n)
    Function kFun = new KeywordExtractFunction(kFields);
    pipe = new Each(pipe, urlFields,  kFun);
    
    // group by word and count it
    pipe = new GroupBy(pipe, kFields);
    Aggregator kCount = new Count(new Fields("count"));
    pipe = new Every(pipe, kCount);
    
    // filter out words that occur only once (count <= 1)
    Filter kCountFilter = new ExpressionFilter("$1 <= 1", Integer.class);
    pipe = new Each(pipe, kCountFilter);
    
    // pass the keywords to our custom google search
    Fields kcFields = new Fields("kword", "count");
    Fields uFields = new Fields("url");
    Function uFun = new UrlExtractFunction(uFields);
    pipe = new Each(pipe, kcFields, uFun);
    
    // group by url and count it
    pipe = new GroupBy(pipe, uFields);
    Aggregator uCount = new Count(new Fields("count"));
    pipe = new Every(pipe, uCount);
    
    // filter out urls that occur once
    Filter uCountFilter = new ExpressionFilter("$1 <= 1", Integer.class);
    pipe = new Each(pipe, uCountFilter);
    
    // remove the count value
    pipe = new Each(pipe, Fields.ALL, new Identity(), Fields.FIRST);
    
    FlowDef flowDef = FlowDef.flowDef().
      setName("newsclip").
      addSource(pipe, iTap).
      addTailSink(pipe,  oTap);
    
    Properties props = new Properties();
    AppProps.setApplicationJarClass(props, Main.class);
    FlowConnector flowConnector = new LocalFlowConnector(props);

    Flow flow = flowConnector.connect(flowDef);
    flow.writeDOT("data/newsclip.dot");
    flow.complete();
  }
}




The corresponding Graphviz DOT file for the assembly (generated by flow.writeDOT in the code above) is shown at left. I converted it to a web-displayable PNG file using the command "dot -Tpng newsclip.dot -o newsclip.png".

The code above uses built-in functions and filters where available, but the core operations are done by custom functions. The KeywordExtractFunction extracts a set of keywords from a web page given its URL. It uses Boilerpipe to extract the relevant plain text from a web page, and my implementation of the RAKE algorithm to extract keywords from the plain text.

// Source: src/main/java/com/mycompany/newsclip/KeywordExtractFunction.java
package com.mycompany.newsclip;

import java.io.InputStream;
import java.net.URL;
import java.util.Collections;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import de.l3s.boilerpipe.BoilerpipeProcessingException;
import de.l3s.boilerpipe.extractors.DefaultExtractor;

@SuppressWarnings("rawtypes")
public class KeywordExtractFunction extends BaseOperation 
    implements Function {

  private static final long serialVersionUID = -7122434545764806604L;
  private static final Logger LOGGER = 
    LoggerFactory.getLogger(KeywordExtractFunction.class);

  public KeywordExtractFunction(Fields fields) {
    super(2, fields);
  }
  
  @Override
  public void operate(FlowProcess flowProcess, FunctionCall funCall) {
    TupleEntry args = funCall.getArguments();
    String url = args.getString(1);
    String rawText = download(url);
    String plainText = parse(rawText);
    List<String> keywords = extractKeywords(plainText);
    for (String keyword : keywords) {
      Tuple t = new Tuple();
      t.add(keyword);
      funCall.getOutputCollector().add(t);
    }
  }

  protected String download(String url) {
    try {
      URL u = new URL(url);
      u.openConnection();
      InputStream istream = u.openStream();
      StringBuilder buf = new StringBuilder();
      byte[] b = new byte[1024];
      int bytesRead = 0;
      while ((bytesRead = istream.read(b)) > 0) {
        buf.append(new String(b, 0, bytesRead));
        b = new byte[1024];
      }
      istream.close();
      return buf.toString();
    } catch (Exception e) {
      LOGGER.error(e.getMessage(), e);
      return null;
    }
  }

  protected String parse(String rawText) {
    if (StringUtils.isEmpty(rawText)) return null;
    else {
      try {
        return DefaultExtractor.INSTANCE.getText(rawText);
      } catch (BoilerpipeProcessingException e) {
        LOGGER.error(e.getMessage(), e);
        return null;
      }
    }
  }

  protected List<String> extractKeywords(String plainText) {
    try {
      return RakeExtractor.INSTANCE.extract(plainText);
    } catch (Exception e) {
      LOGGER.error(e.getMessage(), e);
      return Collections.emptyList();
    }
  }
}
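
The download method above decodes each 1024-byte chunk into a String separately, using the platform default charset, so a multi-byte character that straddles a chunk boundary could be decoded incorrectly. One possible alternative, sketched below, collects the raw bytes first and decodes them once with an explicit charset. The class name StreamText and the choice of UTF-8 in the usage line are assumptions. A real crawler would probably take the charset from the HTTP response.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;

class StreamText {

  /** Reads the stream to the end, closes it, and decodes all bytes in one step. */
  static String readAll(InputStream in, Charset charset) {
    try (InputStream stream = in) {
      ByteArrayOutputStream buf = new ByteArrayOutputStream();
      byte[] chunk = new byte[1024];
      int n;
      // read() returns -1 at end of stream
      while ((n = stream.read(chunk)) != -1) {
        buf.write(chunk, 0, n);
      }
      return new String(buf.toByteArray(), charset);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    Charset utf8 = Charset.forName("UTF-8");
    // In-memory stand-in for the stream returned by URL.openStream().
    InputStream in = new ByteArrayInputStream("caf\u00e9".getBytes(utf8));
    System.out.println(readAll(in, utf8));
  }
}
```

Because the stream is wrapped in a try-with-resources block, it is closed even when reading fails, which the original loop does not guarantee.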

The other custom function is the UrlExtractFunction, which takes each keyword and hands it off to Google's Custom Search API, and returns the URLs of the top 10 search results returned. The Custom Search instance you set up can be customized to only allow results from a list of websites (or the entire web). The KEY and CX values are parameters that identify your client to the Google Search API, and you will need to populate a file with these values in src/main/resources/google.lic (the one in GitHub has placeholders).

// Source: src/main/java/com/mycompany/newsclip/UrlExtractFunction.java
package com.mycompany.newsclip;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ArrayNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

/**
 * Function to take a keyword and use Google's custom search
 * service to retrieve the top 10 URLs.
 */
@SuppressWarnings("rawtypes")
public class UrlExtractFunction extends BaseOperation implements Function {

  private static final long serialVersionUID = 1622228905563317614L;
  private static final Logger LOGGER = 
    LoggerFactory.getLogger(UrlExtractFunction.class);
  
  private static final String CUSTOM_SEARCH_URL_TEMPLATE =
    "https://www.googleapis.com/customsearch/v1?key={KEY}&cx={CX}&q={Q}";
  private String key;
  private String cx;
  private ObjectMapper objectMapper;
  
  public UrlExtractFunction(Fields fields) {
    super(2, fields);
    Properties props = new Properties();
    try {
      props.load(new FileInputStream("src/main/resources/google.lic"));
    } catch (IOException e) {
      LOGGER.error(e.getMessage(), e);
    }
    key = props.getProperty("key");
    cx = props.getProperty("cx");
    objectMapper = new ObjectMapper();
  }
  
  @Override
  public void operate(FlowProcess flowProcess, FunctionCall funCall) {
    TupleEntry args = funCall.getArguments();
    String keyword = args.getString(0);
    List<String> urls = parseSearchResult(keyword);
    for (String url : urls) {
      Tuple t = new Tuple();
      t.add(url);
      funCall.getOutputCollector().add(t);
    }
  }

  protected List<String> parseSearchResult(String keyword) {
    try {
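      // String.replace treats the placeholder as a literal; replaceAll
      // would parse "{KEY}" as a (malformed) regular expression
      String url = CUSTOM_SEARCH_URL_TEMPLATE.
        replace("{KEY}", key).
        replace("{CX}", cx).
        replace("{Q}", URLEncoder.encode(keyword, "UTF-8"));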
      URL u = new URL(url);
      u.openConnection();
      InputStream istream = u.openStream();
      StringBuilder buf = new StringBuilder();
      byte[] b = new byte[1024];
      int bytesRead = 0;
      while ((bytesRead = istream.read(b)) > 0) {
        buf.append(new String(b, 0, bytesRead));
        b = new byte[1024];
      }
      istream.close();
      return parseJson(buf.toString());
    } catch (Exception e) {
      LOGGER.error(e.getMessage(), e);
      return Collections.emptyList();
    }
  }

  protected List<String> parseJson(String json) throws Exception {
    List<String> urls = new ArrayList<String>();
    JsonParser parser = objectMapper.getJsonFactory().
      createJsonParser(json);
    JsonNode root = objectMapper.readTree(parser);
    ArrayNode items = (ArrayNode) root.get("items");
    for (JsonNode item : items) {
      urls.add(item.get("link").getTextValue());
    }
    return urls;
  }
}
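
The URL template substitution can be tried in isolation with the sketch below. It uses String.replace, which treats the placeholder as a literal string, whereas String.replaceAll interprets its first argument as a regular expression in which "{" is a metacharacter. The class name SearchUrlBuilder and the key and cx values are made up for the example. Real values come from your own Custom Search setup.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

class SearchUrlBuilder {

  // Same template as in UrlExtractFunction above.
  static final String TEMPLATE =
    "https://www.googleapis.com/customsearch/v1?key={KEY}&cx={CX}&q={Q}";

  /** Fills in the template, URL-encoding only the keyword. */
  static String build(String key, String cx, String keyword) {
    try {
      return TEMPLATE
        .replace("{KEY}", key)
        .replace("{CX}", cx)
        .replace("{Q}", URLEncoder.encode(keyword, "UTF-8"));
    } catch (UnsupportedEncodingException e) {
      // UTF-8 is required to be available on every Java platform.
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    // Placeholder credentials, not real ones.
    System.out.println(build("my-key", "my-cx", "kevin bacon"));
  }
}
```

URLEncoder encodes the space in a multi-word keyword as "+", so keywords extracted from the page text can be passed through unchanged.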

And that's pretty much it. Put the list of your "interesting pages", one per line, into data/urls.txt, and run the Cascading job locally using the mvn exec:java command, as shown below. The output of the job is written to data/new_urls.txt. The new data can be used to feed URLs back into the original list (perhaps after some sort of manual vetting by the researcher).

sujit@cyclone:cascading-newsclip$ mvn exec:java \
  -Dexec.mainClass="com.mycompany.newsclip.Main" \
  -Dexec.args="data/urls.txt data/new_urls.txt"

As you can see from the diagram, the Cascading code is running 11 Hadoop Map-Reduce jobs in sequence. This translates to a lot of Hadoop code. So Cascading, like Pig, is a huge time saver. Pig does allow Java UDFs, but I think Cascading's all-Java approach is easier to work with.

[*] Update 2013-04-16: I came across Tommy Chheng's post where he shows how to write Cascading code in Scala on this GitHub page. So great news, it appears that it may be possible to do this after all :-).