Saturday, November 23, 2013

DataDrivenInputFormat and DBInputFormat

Our requirement was to read data from a database and insert the records into HBase as a backup flow. For PostgreSQL this was straightforward:

        String databaseDriver = "org.postgresql.Driver";
        String databaseURL = "jdbc:postgresql://xxxx:5432/xxxxdb";
        String databaseUsername = "xxxxxx";
        String databasePassword = "xxxxxxxx";

        job.setInputFormatClass(DBInputFormat.class);
        String[] fields = {"xwxwxwxw","xwxwxwxw","xwxwxwxw","xwxwxwxw","xwxwxwwxwxw","xwxwxwxw","xwxwxwxw"};
        DBConfiguration.configureDB(conf, databaseDriver, databaseURL, databaseUsername, databasePassword);
        DBInputFormat.setInput(job, DBRecord.class, "xwxwxwxw", null, "xw", fields);
        job.setOutputFormatClass(TableOutputFormat.class);
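For context, the DBRecord class passed to setInput is the value object that maps one table row; with DBInputFormat it has to implement both Hadoop's Writable and DBWritable interfaces. Below is a minimal sketch of what such a class looks like. The column names here are hypothetical, and the Hadoop interfaces are omitted so the snippet compiles standalone, but the method signatures match the interfaces:

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// Sketch of the value object used with DBInputFormat. In the real job it
// declares "implements Writable, DBWritable"; the signatures below match
// those interfaces. Column names ("name", "count") are hypothetical.
class DBRecord {
    private String name;
    private int count;

    public DBRecord() {}  // Hadoop instantiates it reflectively, so a no-arg constructor is required

    // DBWritable: populate the object from one row of the query result
    public void readFields(ResultSet rs) throws SQLException {
        name = rs.getString("name");
        count = rs.getInt("count");
    }

    // DBWritable: bind the fields when writing back to a database
    public void write(PreparedStatement st) throws SQLException {
        st.setString(1, name);
        st.setInt(2, count);
    }

    // Writable: serialize the record between the framework's phases
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeInt(count);
    }

    // Writable: deserialize the record
    public void readFields(DataInput in) throws IOException {
        name = in.readUTF();
        count = in.readInt();
    }

    public String getName() { return name; }
    public int getCount() { return count; }
}
```

The fields mapped in readFields(ResultSet) have to line up with the fields array passed to DBInputFormat.setInput, which is what bites us further down.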

But then we had another dump which was in SQL Server, and the same job failed with:

        java.io.IOException: Incorrect syntax near 'LIMIT'.
            at org.apache.hadoop.mapreduce.lib.db.DBRecordReader.nextKeyValue(DBRecordReader.java:235)

This was because the default DBInputFormat uses LIMIT and OFFSET to page through the table when creating splits, and that syntax is not supported by SQL Server or Oracle.

There are DB-specific record readers for some databases, but in such cases we can also use DataDrivenDBInputFormat. Here we are required to give two queries: one that retrieves the data for a split, and a bounding query that retrieves the minimum and maximum values of the split column. The new configuration looked like this:

        String databaseDriver = "com.microsoft.sqlserver.jdbc.SQLServerDriver";
        String databaseURL = "jdbc:sqlserver://xxxx:1433;databaseName=xxxxdb";
        String databaseUsername = "xxxxxx";
        String databasePassword = "xxxxxxxx";
        DBConfiguration.configureDB(conf, databaseDriver, databaseURL, databaseUsername, databasePassword);

        String inputQuery = "SELECT * FROM xxxxxxxx WHERE $CONDITIONS";
        String boundQuery = "SELECT MIN(id), MAX(id) FROM xxxxxxx";
        DataDrivenDBInputFormat.setInput(job, DBRecord.class, inputQuery, boundQuery);

Now that ran fine, except that I hit another issue:

        Exception in thread "main" java.io.IOException: The index 2 is out of range.
            at org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat.getSplits(DataDrivenDBInputFormat.java:193)
            at org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:1063)

This was because the DBRecord class mapping the table to a value object was stale: it had been written for DBInputFormat and did not include the id column. With DataDrivenDBInputFormat I also had to map the id, since that is what is used for calculating the bounds and the splits. Once I incorporated the id into DBRecord.java, the problem was solved!
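Concretely, the fix was just to map the id column in the value object as well. A sketch of the relevant part (field names are hypothetical, and the Hadoop interfaces are omitted so the snippet compiles standalone):

```java
import java.sql.ResultSet;
import java.sql.SQLException;

// Sketch: the stale DBRecord only mapped the payload columns. Once the
// bounding query splits on id, the generated per-split query selects id
// too, so the value object must read it as well. Field names hypothetical.
class DBRecord {
    private long id;      // newly added: the column used by MIN(id)/MAX(id)
    private String name;

    public DBRecord() {}

    // DBWritable-style mapping from one row of the split query
    public void readFields(ResultSet rs) throws SQLException {
        id = rs.getLong("id");        // previously missing -> "The index 2 is out of range"
        name = rs.getString("name");
    }

    public long getId() { return id; }
    public String getName() { return name; }
}
```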

Happy coding

