User login |
Using Scala with Hadoop

Jonhnny Weslley wrote an interesting post about how to use Scala with Hadoop. However, because of Hadoop API changes and other reasons, Weslley's code did not work when I tried it with hadoop-0.20.0, so I wrote a version that works with hadoop-0.20.0. The trickiest part is dealing with Java generics in Scala.
package shadoop
import SHadoop._
import java.util.Iterator
import org.apache.hadoop.fs._
import org.apache.hadoop.io._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.OutputCollector
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.Mapper
import org.apache.hadoop.mapreduce.MapContext
import org.apache.hadoop.mapreduce.Reducer
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.util.GenericOptionsParser
/**
 * Word count written against the new `org.apache.hadoop.mapreduce` API.
 *
 * The mapper emits `(word, 1)` for every whitespace-separated token of each
 * input value, and the reducer sums those counts per word. The reducer is
 * also registered as the combiner.
 *
 * Writables are constructed and unwrapped explicitly, so this object does not
 * rely on implicit `String`/`Int` <-> `Text`/`IntWritable` conversions being
 * in scope.
 */
object WordCount {

  /** Emits `(token, 1)` for each whitespace-separated token of the input text. */
  class MyMap extends Mapper[LongWritable, Text, Text, IntWritable] {

    /** Count emitted for each occurrence of a token. */
    val one = 1

    /**
     * @param ky     input key; not used by this mapper
     * @param value  input text to tokenize
     * @param output context that receives the `(token, count)` pairs
     */
    override def map(ky: LongWritable, value: Text,
                     output: Mapper[LongWritable, Text, Text, IntWritable]#Context): Unit = {
      // Split on runs of whitespace (spaces, tabs, ...) and drop the empty
      // leading token produced when the text starts with whitespace.
      // Splitting on a single " " would count "" as a word wherever two
      // spaces are adjacent, and would not separate tab-delimited words.
      value.toString.split("\\s+").filter(_.length > 0).foreach { token =>
        output.write(new Text(token), new IntWritable(one))
      }
    }
  }

  /** Sums the counts for one word; used as both the combiner and the reducer. */
  class MyReduce extends Reducer[Text, IntWritable, Text, IntWritable] {

    /**
     * @param key    the word being counted
     * @param values partial counts for `key`
     * @param output context that receives `(key, total)`
     */
    override def reduce(key: Text, values: java.lang.Iterable[IntWritable],
                        output: Reducer[Text, IntWritable, Text, IntWritable]#Context): Unit = {
      // Sum straight off the Java iterator. This needs no collection converter
      // or implicit, and an empty iterable yields 0 instead of the exception
      // that reduceLeft would throw.
      var sum = 0
      val iter = values.iterator()
      while (iter.hasNext) sum += iter.next().get
      output.write(key, new IntWritable(sum))
    }
  }

  /**
   * Configures and runs the word-count job, then exits with 0 on success and
   * 1 on failure (2 on a usage error).
   *
   * @param args generic Hadoop options followed by exactly two paths: `<in> <out>`
   */
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs()
    // Report a usage error instead of failing later with an
    // ArrayIndexOutOfBoundsException on otherArgs(0) / otherArgs(1).
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>")
      System.exit(2)
    }

    val job = new Job(conf, "word count")
    job.setJarByClass(WordCount.getClass)
    job.setMapperClass(classOf[WordCount.MyMap])
    job.setCombinerClass(classOf[WordCount.MyReduce])
    job.setReducerClass(classOf[WordCount.MyReduce])
    job.setMapOutputKeyClass(classOf[Text])
    job.setMapOutputValueClass(classOf[IntWritable])
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[IntWritable])
    FileInputFormat.addInputPath(job, new Path(otherArgs(0)))
    FileOutputFormat.setOutputPath(job, new Path(otherArgs(1)))

    System.exit(if (job.waitForCompletion(true)) 0 else 1)
  }
}
By viirya at 2009-08-31 17:15 | login to post comments
|