全部版块 我的主页
论坛 数据科学与人工智能 大数据分析 Hadoop论坛
1168 3
2014-06-13

Hadoop应用实例

从今天开始做一个自己的hadoop的例子,例子的目标是完成如下任务。
任务目标:
有一个informix数据库,其中有一个表有100万的数据,任务完成通过MapRed的方式将这100万数据导入到HDFS中,可以使用informix的分页sql达到并发查询数据库的目的。
任务开始:
1
、编写MapRed的实现类起名称为InformixLoader
大致代码如下

Java代码  [url=][/url]

1.             LOG.info("SqlMapper");  

2.             String url = context.getConfiguration().get("informix.url");  

3.             String[] str = value.toString().split("_");  

4.             long start = Long.parseLong(str[0]);  

5.             long length = Long.parseLong(str[1]);  

6.             Connection conn = null;  

7.             Statement st = null;  

8.             ResultSet rs = null;  

9.             String sql = "";  

10.            try {  

11.                Class.forName("com.informix.jdbc.IfxDriver");  

12.                conn = DriverManager.getConnection(url, "niosuser", "db");  

13.                st = conn.createStatement();  

14.                sql = "select skip " + start + " first " + length  

15.                        + " int_id from tpa_sts_cell_ne";  

16.                LOG.info("SqlMapper sql:" + sql);  

17.                rs = st.executeQuery(sql);  

18.                dataKey.set(value.toString());  

19.                Path file = new Path(StringUtils.unEscapeString(TMP_MAP_DIR  

20.                        + value.toString()));  

21.                FileSystem fs = file.getFileSystem(context.getConfiguration());  

22.                fs.createNewFile(file);  

23.                FSDataOutputStream output = fs.create(file);  

24.                LOG.info("SqlMapper createNewFile OK!");  

25.                while (rs.next()) {  

26.                    String c1 = rs.getString(1)+"\n";  

27.                    output.write(c1.getBytes());  

28.                    output.flush();  

29.                }  

30.                output.close();  

31.//              fs.close();  

32.                data.set(value.toString());  

33.                context.write(dataKey, value);  

34.                LOG.info("SqlMapper OK!");  

35.            } catch (Exception e) {  

36.                throw new IOException(sql, e.fillInStackTrace());  

37.            } finally {  

38.                if (rs != null) {  

39.                    try {  

40.                        rs.close();  

41.                    } catch (SQLException e) {  

42.                        throw new IOException(e.fillInStackTrace());  

43.                    }  

44.                }  

45.                if (st != null) {  

46.                    try {  

47.                        st.close();  

48.                    } catch (SQLException e) {  

49.                        throw new IOException(e.fillInStackTrace());  

50.                    }  

51.                }  

52.                if (conn != null) {  

53.                    try {  

54.                        conn.close();  

55.                    } catch (SQLException e) {  

56.                        throw new IOException(e.fillInStackTrace());  

57.                    }  

58.                }  

59.            }  

60.         




2
、编写InputFormater
代码如下:

Java代码  [url=][/url]

1. LOG.info("InputFormarter");  

2. String url = context.getConfiguration().get("informix.url");  

3. Connection conn = null;  

4. Statement st = null;  

5. ResultSet rs = null;  

6. try {  

7.     Class.forName("com.informix.jdbc.IfxDriver");  

8.     conn = DriverManager.getConnection(url, "niosuser", "db");  

9.     st = conn.createStatement();  

10.    String sql = "select count(*) from tpa_sts_cell_ne";  

11.    rs = st.executeQuery(sql);  

12.    rs.next();  

13.    int count = rs.getInt(1);  

14.    List<InputSplit> splits = new ArrayList<InputSplit>();  

15.    int size = 50000;  

16.    int inv = count / size;  

17.    int last = count % size;  

18.    for (int i = 0; i < inv; i++) {  

19.            SqlSplit s = new SqlSplit(i * size, size);  

20.            splits.add(s);  

21.    }  

22.    if (last!=0){  

23.        SqlSplit s = new SqlSplit(inv * size, last);  

24.        splits.add(s);  

25.    }  

26.    return splits;  

27.} catch (Exception e) {  

28.    throw new IOException(e.fillInStackTrace());  

29.} finally {  

30.    if (rs != null) {  

31.        try {  

32.            rs.close();  

33.        } catch (SQLException e) {  

34.            throw new IOException(e.fillInStackTrace());  

35.        }  

36.    }  

37.    if (st != null) {  

38.        try {  

39.            st.close();  

40.        } catch (SQLException e) {  

41.            throw new IOException(e.fillInStackTrace());  

42.        }  

43.    }  

44.    if (conn != null) {  

45.        try {  

46.            conn.close();  

47.        } catch (SQLException e) {  

48.            throw new IOException(e.fillInStackTrace());  

49.        }  

50.    }  

51.}  



3
、编写reducer
大致代码如下

Java代码  [url=][/url]

1.             String keyStr = key.toString();  

2.             Path outFile = new Path(StringUtils.unEscapeString(TMP_RED_DIR  

3.                     + keyStr));  

4.             LOG.info("SqlReducer outfile:"+outFile.getName());  

5.             FileSystem outfs = outFile  

6.                     .getFileSystem(context.getConfiguration());  

7.             outfs.createNewFile(outFile);  

8.             FSDataOutputStream output = outfs.create(outFile);  

9.             for (Text val : values) {  

10.                LOG.info("SqlReducer");  

11.                String str = val.toString();  

12.                LOG.info("file:"+str);  

13.                Path inputFile = new Path(StringUtils  

14.                        .unEscapeString(TMP_MAP_DIR + str));  

15.                FileSystem fs = inputFile.getFileSystem(context  

16.                        .getConfiguration());  

17.                FSDataInputStream input = fs.open(inputFile);  

18.                BufferedInputStream bi = new BufferedInputStream(input);  

19.                byte[] buffer=new byte[1024];  

20.                int length=bi.read(buffer);  

21.                while (length!=-1) {  

22.                    if (length==1024){  

23.                    output.write(buffer);  

24.                    }else{  

25.                        byte[] tmp=new byte[length];  

26.                        for(int i=0;i<tmp.length;i++){  

27.                            tmp=buffer;  

28.                        }  

29.                        output.write(buffer);  

30.                    }  

31.                    length=bi.read(buffer);  

32.                }  

33.                bi.close();  

34.                input.close();  

35.//              fs.close();  

36.                output.flush();  

37.            }  

38.            output.close();  

39.            result.set(key.toString());  

40.            context.write(key, result);  

41.         



4
、编写outformat
大致代码如下:

Java代码  [url=][/url]

1.             Path outFilePath = getDefaultWorkFile(context, "");  

2.             final FileSystem fs = outFilePath.getFileSystem(context  

3.                     .getConfiguration());  

4.             final FSDataOutputStream output = fs.create(outFilePath);  

5.             return new RecordWriter<Text, Text>() {  

6.   

7.                 @Override  

8.                 public void close(TaskAttemptContext context)  

9.                         throws IOException, InterruptedException {  

10.                    output.flush();  

11.                    output.close();  

12.//                  fs.close();  

13.  

14.                }  

15.  

16.                @Override  

17.                public void write(Text key, Text value) throws IOException,  

18.                        InterruptedException {  

19.                    LOG.info("RecordWriter filename:"+value.toString());  

20.                    Path file = new Path(StringUtils.unEscapeString(TMP_RED_DIR  

21.                            + value.toString()));  

22.                    FileSystem fs = file.getFileSystem(context  

23.                            .getConfiguration());  

24.                    FSDataInputStream input = fs.open(file);  

25.                    BufferedInputStream bi = new BufferedInputStream(input);  

26.                    byte[] buffer=new byte[1024];  

27.                    int length=bi.read(buffer);  

28.                    while (length!=-1) {  

29.                        if (length==1024){  

30.                        output.write(buffer);  

31.                        }else{  

32.                            byte[] tmp=new byte[length];  

33.                            for(int i=0;i<tmp.length;i++){  

34.                                tmp=buffer;  

35.                            }  

36.                            output.write(buffer);  

37.                        }  

38.                        length=bi.read(buffer);  

39.                    }  

40.                    bi.close();  

41.                    input.close();  

42.//                  fs.close();  

43.                     

44.                }  

45.            };  

46.         



5
、编写启动代码
大致代码如下:

Java代码  [url=][/url]

1. File jarFile = EJob.createTempJar("bin");  

2.         EJob.addClasspath("I:\\work\\hadoop\\hadoop\\hadoop-site.xml");  

3.         ClassLoader classLoader = EJob.getClassLoader();  

4.         Thread.currentThread().setContextClassLoader(classLoader);  

5.         args = new String[] { "/tmp/sqlloader10/" };  

6.   

7.         Configuration conf = new Configuration();  

8.         String[] otherArgs = new GenericOptionsParser(conf, args)  

9.                 .getRemainingArgs();  

10.  

11.        if (otherArgs.length != 1) {  

12.            System.err.println("Usage: informixLoader <out>");  

13.            System.exit(2);  

14.        }  

15.  

16.        conf  

17.                .set("informix.url",  

18.                        "jdbc:informix-sqli://10.0.2.36:8001/niosdb:INFORMIXSERVER=niosserver");  

19.  

20.        Job job = new Job(conf, "informix loader");  

21.        // And add this statement. XXX  

22.        ((JobConf) job.getConfiguration()).setJar(jarFile.toString());  

23.        job.setInputFormatClass(InputFormarter.class);  

24.        job.setJarByClass(InformixLoader.class);  

25.        job.setMapperClass(SqlMapper.class);  

26.        job.setReducerClass(SqlReducer.class);  

27.        job.setOutputKeyClass(Text.class);  

28.        job.setOutputValueClass(Text.class);  

29.        job.setOutputFormatClass(OutputFormater.class);  

30.        // FileInputFormat.addInputPath(job, new Path(otherArgs[0]));  

31.        FileOutputFormat.setOutputPath(job, new Path(otherArgs[0]));  

32.        System.exit(job.waitForCompletion(true) ? 0 : 1);  



二维码

扫码加我 拉你入群

请注明:姓名-公司-职位

以便审核进群资格,未注明则拒绝

全部回复
2014-12-10 15:07:01
不错,实践中!
二维码

扫码加我 拉你入群

请注明:姓名-公司-职位

以便审核进群资格,未注明则拒绝

2014-12-10 15:07:53
不错,实践中!
二维码

扫码加我 拉你入群

请注明:姓名-公司-职位

以便审核进群资格,未注明则拒绝

2014-12-13 19:45:36
看到代码里有windows路径
二维码

扫码加我 拉你入群

请注明:姓名-公司-职位

以便审核进群资格,未注明则拒绝

相关推荐
栏目导航
热门文章
推荐文章

分享

扫码加好友,拉您进群