Monday, December 3, 2012

Hadoop with Spring Data and Spring Batch

This is a beta post :)

Following the blogs - http://www.petrikainulainen.net/programming/apache-hadoop/creating-hadoop-mapreduce-job-with-spring-data-apache-hadoop/
http://bighadoop.wordpress.com/2012/03/25/spring-data-apache-hadoop/
http://static.springsource.org/spring-hadoop/docs/current/reference/html/index.html
My use case: I needed to run a Hadoop job triggered from my application. I could have called the Hadoop job directly, but my framework relies heavily on Spring, so, keeping to Spring best practice, I have integrated everything via Spring.
Check this for an update on the features provided by the Spring-Hadoop integration - http://www.springsource.org/spring-data/hadoop
So the BigData integration is also done via spring-data-hadoop.
Add the following to your POM:

<dependencies>
  <dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-hadoop</artifactId>
    <version>1.0.0.RC1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version>1.0.0</version>
  </dependency>
</dependencies>

<repositories>
  <repository>
    <id>repository.springsource.milestone</id>
    <name>SpringSource Milestone Repository</name>
    <url>http://repo.springsource.org/milestone</url>
  </repository>
</repositories>

As mentioned in the Spring Data Hadoop documentation, it is built for Hadoop versions above 0.20. So if you have been writing your MapReduce programs by implementing the Mapper interface from the mapred package, they may not work with spring-data-hadoop. My sample MapReduce program was written that way, so I had to change the implementation to extend the Mapper class from the mapreduce package.
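For reference, a minimal sketch of what such a new-API mapper looks like (the names here are illustrative; the real one is the TokenizerMapper2 referenced in the job definition further down):

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// New-API mapper: extends org.apache.hadoop.mapreduce.Mapper
// instead of implementing org.apache.hadoop.mapred.Mapper.
public class TokenizerMapper2 extends Mapper<Object, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, ONE);   // emit (word, 1); the reducer sums the counts
        }
    }
}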

In my case my spring-data application was running under one account and my Hadoop installation under another. This was done to isolate the complexities of installing and hosting Hadoop. But when I ran my Hadoop job from spring-data in my application, I got an exception like:
org.apache.hadoop.security.AccessControlException: org.apache.hadoop.security.AccessControlException: Permission denied:  access=write
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:199)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:180)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:128)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:5212)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:5
For info on how to tackle this, refer to: http://hadoop.apache.org/docs/r0.20.2/hdfs_permissions_guide.html
When the MapReduce job runs, it accesses files in HDFS, and for every access it checks the permissions of those files against the account from which you are running the job. So to allow a new user to access HDFS, you just need to add that user to the supergroup. For this, create a group (in my case 'hadoop') and add every user who needs to access HDFS to it:
sudo groupadd hadoop

usermod -a -G hadoop hadoopadmin
usermod -a -G hadoop jobadmin

Here the Hadoop administrator is hadoopadmin, and the user running the job from the spring-data-hadoop application is jobadmin.

Then set the hadoop group as the supergroup by adding the following to hdfs-site.xml and restarting HDFS:

<property>
<name>dfs.permissions.supergroup</name>
<value>hadoop</value>
</property>
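A quick sanity check I would suggest (a sketch only, not anything official): try writing a file to HDFS as the job account before running the real job. It assumes the same NameNode address that appears in the hdp:configuration below.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsWriteCheck {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://hosta:54410"); // NameNode, same as in the config below
        FileSystem fs = FileSystem.get(conf);
        Path probe = new Path("/sample/permission-check.tmp");
        fs.create(probe).close();   // throws AccessControlException if the group change didn't take effect
        System.out.println("HDFS write OK as user " + System.getProperty("user.name"));
        fs.delete(probe, false);
        fs.close();
    }
}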

The next thing to get right is mapred.job.tracker:

By setting the tracker, you delegate the mapper/reducer code to run on
that tracker (if you're using the single-node cluster). Most likely on
that tracker you don't have hadoop-examples available in the classpath
and thus the Wordcount$TokenizerMapper is not found. Make sure these
classes are on the classpath or use the jar/jar-by-class attributes and
specify a jar enclosing the class (such as hadoop-examples.jar).

Thus in my case the hdp:configuration looks like:

<hdp:configuration>
fs.default.name=hdfs://hosta:54410
mapred.job.tracker=hosta:54411
</hdp:configuration>

This ensures that the job I run from Spring Hadoop is delegated to the map/reduce runtime at hosta:54411, which is a multi-node Hadoop cluster.

Finally, to run my job on the remote multi-node cluster, my job definition looks something like:

<hdp:job id="wordcount-job"
input-path="/sample/test-data.txt" output-path="/sample/out2"
mapper="xxx.social.mine.mapr.WordCount2.TokenizerMapper2"
reducer="xxx.social.mine.mapr.WordCount2.IntSumReducer2"
input-format="org.apache.hadoop.mapreduce.lib.input.TextInputFormat"
jar-by-class="xxxx.social.mine.mapr.WordCount2"
jar="classpath:xxx-job.jar"
key="org.apache.hadoop.io.Text"
value="org.apache.hadoop.io.IntWritable"
/>

I have added the job classes as a jar, 'xxx-job.jar', to my classpath; this is what jar="classpath:xxx-job.jar" refers to. :)
And that should solve your problems...

But if we need to write more relevant MapReduce code, a simple HDFS file read and word count is not enough. We need much more, such as DBInputFormat, which is used for reading from a database inside Hadoop code. When we need to integrate code that uses DBInputFormat, the TextInputFormat (the default and, as far as I can see, the only input format supported by the spring-data XML) becomes a pain point. However, thanks to Spring's flexibility, we have alternative options:

Option one: create the Hadoop Job in code and add it to the JobRunner. In that case, though, the jar handling (such as adding the job jar to the classpath) has to be done by us. A rough sketch follows below.
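Here is a rough sketch of that first option, using only the standard Hadoop API. The table, columns and connection details are placeholders, and SocialRecord/DBMapper stand in for your own record and mapper classes:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;

public class DbJobBuilder {

    // Builds a Job in code instead of via <hdp:job>. The resulting Job can be
    // submitted directly with job.waitForCompletion(true) or handed to the runner.
    public static Job buildDbJob(Configuration conf) throws Exception {
        Job job = new Job(conf, "db-read-job");
        job.setJarByClass(DbJobBuilder.class);   // the enclosing jar still has to reach the cluster

        // Placeholder connection details, table and columns
        DBConfiguration.configureDB(job.getConfiguration(),
                "org.postgresql.Driver",
                "jdbc:postgresql://dbhost:5432/testdb",
                "dbuser", "dbpassword");
        DBInputFormat.setInput(job, SocialRecord.class,
                "social_record", null, "id", "id", "content");

        job.setInputFormatClass(DBInputFormat.class);
        job.setMapperClass(DBMapper.class);       // your mapper over SocialRecord values
        return job;
    }
}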

The second, and more efficient, approach is to let Spring build the Job object for you and then modify that Job object to add support for DBInputFormat (this is what the test class at the end of this post does). But in this case I came across an issue: the spring-hadoop.xsd insists that you set input-path and output-path, which are mandatory for text/file input but can be ignored when reading from a database. So I had to change the xsd to remove the required attribute for them.

After a little research I found that Spring Data Hadoop primarily targets the later versions of Hadoop. Hadoop 1.0.0 lacks DBInputFormat in the mapreduce package, which is the approach going forward since DBInputFormat in the mapred package is deprecated; moreover, spring-data-hadoop has been built for the later versions that use the mapreduce package. So I needed to migrate to Hadoop 1.1.0.

However, when my map/reduce job started running against PostgreSQL, I got this error:

java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: org.postgresql.Driver
at org.apache.hadoop.mapreduce.lib.db.DBInputFormat.setConf(DBInputFormat.java:164)
at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:62)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:117)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:723)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1136)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: org.postgresql.Driver
at org.apache.hadoop.mapreduce.lib.db.DBInputFormat.getConnection(DBInputFormat.java:190)
at org.apache.hadoop.mapreduce.lib.db.DBInputFormat.setConf(DBInputFormat.java:158)
... 9 more
Caused by: java.lang.ClassNotFoundException: org.postgresql.Driver
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:186)
at org.apache.hadoop.mapreduce.lib.db.DBConfiguration.getConnection(DBConfiguration.java:148)
at org.apache.hadoop.mapreduce.lib.db.DBInputFormat.getConnection(DBInputFormat.java:184)
... 10 more

I suspect this is because the nodes running the job don't have postgresql.jar; I may have to put it into the distributed cache so that every node has the jar.

I solved it by adding the libs attribute:

<hdp:job id="wordcountJob"
output-path="/sample/out2"
jar-by-class="xxx.mine.mapr.WordCount2"
jar="classpath:dbjob-job.jar"
key="org.apache.hadoop.io.LongWritable"
value="xxx.mine.mapr.SocialRecord"
libs="file://repository/postgresql/postgresql/9.0-801.jdbc4/postgresql-9.0-801.jdbc4.jar"
/>

And my job configuration code looks something like this:

import javax.inject.Inject;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.data.hadoop.mapreduce.JobRunner;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

// SocialRecord and DBMapR are my own classes (under xxx.mine.mapr); the xxxxx values are redacted.

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"/WEB-INF/spring/ApplicationContext.xml"})
public class UpdateJobServiceImplTest {

    @Inject
    JobRunner jobRunner;

    @Inject
    Job wordcountJob;

    @Test
    public void testUpdateMine() {
        System.out.println("Hello!!");

        // Take the Job that Spring built from <hdp:job> and rewire it for DB input
        Configuration conf = wordcountJob.getConfiguration();

        String[] fields = {"xxxxx", "xxxxxx"};
        DBConfiguration.configureDB(conf, "org.postgresql.Driver",
                "jdbc:postgresql://xx.xx.xx.xx:5432/testdb/postgres", "xxxxx", "xxxxx");
        DBInputFormat.setInput(wordcountJob, SocialRecord.class, "xxx", null, "xxxx", fields);
        wordcountJob.setInputFormatClass(DBInputFormat.class);
        wordcountJob.setMapperClass(DBMapR.DBMapper.class);

        try {
            jobRunner.call();   // submits the configured job to the cluster
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Now it ran successfully!!!!
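One detail that matters for DBInputFormat: the value class (SocialRecord in my job above) must implement DBWritable as well as Writable, so Hadoop can populate it from a ResultSet and serialise it between tasks. The column names below are made up (the real ones are redacted in the job configuration above), so treat this as a sketch:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class SocialRecord implements Writable, DBWritable {

    private long id;
    private String content;

    // Read one row from the DB; columns in the order passed to DBInputFormat.setInput
    public void readFields(ResultSet rs) throws SQLException {
        id = rs.getLong(1);
        content = rs.getString(2);
    }

    // Used when writing back to the DB (not needed for read-only jobs)
    public void write(PreparedStatement stmt) throws SQLException {
        stmt.setLong(1, id);
        stmt.setString(2, content);
    }

    // Standard Writable serialisation between map and reduce tasks
    public void readFields(DataInput in) throws IOException {
        id = in.readLong();
        content = in.readUTF();
    }

    public void write(DataOutput out) throws IOException {
        out.writeLong(id);
        out.writeUTF(content);
    }
}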


NB: if the jobtracker is local, the number of map tasks defaults to one (mapred.map.tasks).
If you accidentally start a Hadoop job, the only way to stop it is to kill it:
hadoop job -kill job_201212061636_0013