Scala MapReduce Framework giving Type Mismatch

J Calbreath

I have a MapReduce framework in Scala that is based on several of the org.apache.hadoop libraries. It works great with a simple word count program. However, I want to apply it to something useful and am hitting a roadblock. I want to take a CSV file (or a file with any delimiter, really), use whatever is in the first column as the key, and then count the occurrences of each key.
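
For reference, the intended result can be sketched without Hadoop, using plain Scala collections. This is only an illustration of the goal, not the framework code: the object name `FirstColumnCount` and the helper `countFirstColumn` are hypothetical, and the delimiter is quoted because `String.split` treats its argument as a regular expression.

```scala
import java.util.regex.Pattern

object FirstColumnCount {
  // Hypothetical helper: counts how often each first-column value occurs.
  // The delimiter is quoted so characters like "|" are matched literally.
  def countFirstColumn(lines: Seq[String], delimiter: String = ","): Map[String, Long] =
    lines
      .map(_.split(Pattern.quote(delimiter), -1)(0)) // keep only the first column
      .groupBy(identity)                             // group identical keys together
      .map { case (key, occurrences) => key -> occurrences.size.toLong }

  def main(args: Array[String]): Unit = {
    val sample = Seq("a,1,x", "b,2,y", "a,3,z")
    println(countFirstColumn(sample))
  }
}
```

In the MapReduce version, the mapper would emit each first-column value with a count of 1 and the reducer would do the summing that `groupBy` does here.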

The mapper code looks like this:

class WordCountMapper extends Mapper[LongWritable, Text, Text, LongWritable] with HImplicits {
  protected override def map(lnNumber: LongWritable, line: Text, context: Mapper[LongWritable, Text, Text, LongWritable]#Context): Unit = {
    line.split(",", -1)(0) foreach (context.write(_, 1)) // Splits the line and writes the first column; this is the line that fails to compile
  }
}

The problem comes in the 'line.split' code. When I try to compile it, I get an error that says:

found: char

line.split... should return a String that is passed to the _ in write(_,1), but for some reason the compiler thinks it is a Char. I've even added .toString to explicitly make it a String, but that didn't work either.

Any ideas are appreciated. Let me know what additional details I can provide.


Here is the list of imports:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.{Reducer, Job, Mapper}
import org.apache.hadoop.conf.{Configured}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import scala.collection.JavaConversions._
import org.apache.hadoop.util.{ToolRunner, Tool}

Here is the build.sbt code:

import AssemblyKeys._ // put this at the top of the file


organization := "scala"

name := "WordCount"

version := "1.0"

scalaVersion := "2.11.2"

scalacOptions ++= Seq("-no-specialization", "-deprecation")

libraryDependencies ++= Seq("org.apache.hadoop" % "hadoop-client" % "1.2.1",
                        "org.apache.hadoop" % "hadoop-core" % "latest.integration" exclude ("hadoop-core", "org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.class") ,
                        "org.apache.hadoop" % "hadoop-common" % "2.5.1",
                        "org.apache.hadoop" % "hadoop-mapreduce-client-core" % "2.5.1",
                        "commons-configuration" % "commons-configuration" % "1.9",
                        "org.apache.hadoop" % "hadoop-hdfs" % "latest.integration")

 jarName in assembly := "WordCount.jar"

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    case s if s.endsWith(".class") => MergeStrategy.last
    case s if s.endsWith(".xsd") => MergeStrategy.last
    case s if s.endsWith(".dtd") => MergeStrategy.last
    case s if s.endsWith(".xml") => MergeStrategy.last
    case s if s.endsWith(".properties") => MergeStrategy.last
    case x => old(x)
  }
}

I actually solved this by not using the _ notation and instead passing the value directly to context.write. So instead of:

line.split(",", -1)(0) foreach (context.write(_,1))

I used:

context.write(line.split(",", -1)(0), 1)

I found an item online that said Scala sometimes gets confused about data types when using the _, and it recommended explicitly specifying the value in place instead. I'm not sure if that is true, but it solved the problem in this case.
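
A more likely explanation is the foreach rather than the _ itself. Assuming HImplicits converts the Text to a String, split(",", -1)(0) already yields a single String, and calling foreach on a String visits its characters one at a time, so the _ is a Char. Dropping the foreach is what fixes it. The sketch below shows this with plain strings; the names `ForeachOnString`, `firstColumn`, and `visitedByForeach` are made up for illustration.

```scala
import scala.collection.mutable.ListBuffer

object ForeachOnString {
  // Returns the first column of a comma-delimited line as a single String.
  def firstColumn(line: String): String = line.split(",", -1)(0)

  // Collects what foreach hands to its function when called on a String:
  // one Char per call, not the whole String.
  def visitedByForeach(s: String): List[Char] = {
    val seen = ListBuffer.empty[Char]
    s.foreach(c => seen += c) // c is inferred as Char here
    seen.toList
  }

  def main(args: Array[String]): Unit = {
    val key = firstColumn("foo,1,2")
    println(key)                   // prints "foo"
    println(visitedByForeach(key)) // prints "List(f, o, o)"
  }
}
```

This would also explain why adding .toString did not help: the value before foreach was already a String, and foreach still iterated over its characters.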

