玩转大数据系列之Apache Pig如何与MySQL集成(三)

栏目: Apache · 发布时间: 7年前

上篇介绍了如何把Pig的结果存储到Solr中,那么可能就会有朋友问了,为什么不存到数据库呢? 不支持还是? 其实只要我们愿意,我们可以存储它的结果集到任何地方,只需要重写我们自己的StoreFunc类即可。

关于如何将Pig分析完的结果存储到数据库,在pig的piggy贡献组织里,已经有了对应的UDF了,piggybank是非apache官方提供的 工具 函数,里面的大部分的UDF都是,其他公司或着个人在后来使用时贡献的,这些工具类,虽然没有正式划入pig的源码包里,但是pig每次发行的时候,都会以扩展库的形式附带,编译后会放在pig根目录下一个叫contrib的目录下,

piggybank的地址是

https://cwiki.apache.org/confluence/display/PIG/PiggyBank

,感兴趣的朋友们,可以看一看。

将pig分析完的结果存入到数据库,也是非常简单的,需要的条件有:

(1)piggybank.jar的jar包

(2)依赖数据库的对应的驱动jar

有一点需要注意下,在将结果存储到数据库之前,一定要确保有访问和写入数据库的权限,否则任务就会失败!

散仙在存储到远程的 MySQL 上,就是由于权限的问题,而写入失败了,具体的异常是这样描述的:

Access denied for user 'root'@'localhost'

当出现上面异常的时候,就意味着权限写入有问题,我们使用以下的授权方法,来给目标机赋予权限:

(1)允许所有的机器ip访问

GRANT ALL PRIVILEGES ON *.* TO 'myuser'@'%' IDENTIFIED BY 'mypassword' WITH GRANT OPTION; 

(2)允许指定的机器ip访问:

1. GRANT ALL PRIVILEGES ON *.* TO 'myuser'@'192.168.1.3' IDENTIFIED BY    'mypassword' WITH GRANT OPTION;  

确定有权限之后,我们就可以造一份数据,测试是否可以将HDFS上的数据存储到数据库中,测试数据如下:

1,2,3
1,2,4
2,2,4
3,4,2
8,2,4

提前在对应的MySQL上,建库建表建字段,看下散仙测试表的结构:

玩转大数据系列之Apache Pig如何与MySQL集成(三)

最后,在来看下我们的pig脚本是如何定义和使用的:

--注册数据库驱动包和piggybank的jar
register ./dependfiles/mysql-connector-java-5.1.23-bin.jar;
register ./dependfiles/piggybank.jar

--为了能使schemal和数据库对应起来,建议在这个地方给数据加上列名
a = load '/tmp/dongliang/g.txt' using PigStorage(',') as (id:int,name:chararray,count:int) ;


--过滤出id大于2的数据

a = filter a by id > 2;

--存储结果到数据库里
STORE a INTO '/tmp/dbtest' using org.apache.pig.piggybank.storage.DBStorage('com.mysql.jdbc.Driver', 'jdbc:mysql://192.168.146.63/user', 'root', 'pwd',
    'INSERT into pig(id,name,count) values (?,?,?)');
~

执行成功后,我们再去查看数据库发现已经将pig处理后的数据正确的写入到了数据库中:

玩转大数据系列之Apache Pig如何与MySQL集成(三)

最后,附上DBStore类的源码:

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.pig.piggybank.storage;

import org.joda.time.DateTime;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.pig.StoreFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;

import java.io.IOException;
import java.sql.*;

public class DBStorage extends StoreFunc {
  private final Log log = LogFactory.getLog(getClass());

  private PreparedStatement ps;
  private Connection con;
  private String jdbcURL;
  private String user;
  private String pass;
  private int batchSize;
  private int count = 0;
  private String insertQuery;

  public DBStorage(String driver, String jdbcURL, String insertQuery) {
    this(driver, jdbcURL, null, null, insertQuery, "100");
  }

  public DBStorage(String driver, String jdbcURL, String user, String pass,
      String insertQuery) throws SQLException {
    this(driver, jdbcURL, user, pass, insertQuery, "100");
  }

  public DBStorage(String driver, String jdbcURL, String user, String pass,
      String insertQuery, String batchSize) throws RuntimeException {
    log.debug("DBStorage(" + driver + "," + jdbcURL + "," + user + ",XXXX,"
        + insertQuery + ")");
    try {
      Class.forName(driver);
    } catch (ClassNotFoundException e) {
      log.error("can't load DB driver:" + driver, e);
      throw new RuntimeException("Can't load DB Driver", e);
    }
    this.jdbcURL = jdbcURL;
    this.user = user;
    this.pass = pass;
    this.insertQuery = insertQuery;
    this.batchSize = Integer.parseInt(batchSize);
  }

  /**
   * Write the tuple to Database directly here.
   */
  public void putNext(Tuple tuple) throws IOException {
    int sqlPos = 1;
    try {
      int size = tuple.size();
      for (int i = 0; i < size; i++) {
        try {
          Object field = tuple.get(i);

          switch (DataType.findType(field)) {
          case DataType.NULL:
            ps.setNull(sqlPos, java.sql.Types.VARCHAR);
            sqlPos++;
            break;

          case DataType.BOOLEAN:
            ps.setBoolean(sqlPos, (Boolean) field);
            sqlPos++;
            break;

          case DataType.INTEGER:
            ps.setInt(sqlPos, (Integer) field);
            sqlPos++;
            break;

          case DataType.LONG:
            ps.setLong(sqlPos, (Long) field);
            sqlPos++;
            break;

          case DataType.FLOAT:
            ps.setFloat(sqlPos, (Float) field);
            sqlPos++;
            break;

          case DataType.DOUBLE:
            ps.setDouble(sqlPos, (Double) field);
            sqlPos++;
            break;

          case DataType.DATETIME:
            ps.setDate(sqlPos, new Date(((DateTime) field).getMillis()));
            sqlPos++;
            break;

          case DataType.BYTEARRAY:
            byte[] b = ((DataByteArray) field).get();
            ps.setBytes(sqlPos, b);

            sqlPos++;
            break;
          case DataType.CHARARRAY:
            ps.setString(sqlPos, (String) field);
            sqlPos++;
            break;
          case DataType.BYTE:
            ps.setByte(sqlPos, (Byte) field);
            sqlPos++;
            break;

          case DataType.MAP:
          case DataType.TUPLE:
          case DataType.BAG:
            throw new RuntimeException("Cannot store a non-flat tuple "
                + "using DbStorage");

          default:
            throw new RuntimeException("Unknown datatype "
                + DataType.findType(field));

          }

        } catch (ExecException ee) {
          throw new RuntimeException(ee);
        }

      }
      ps.addBatch();
      count++;
      if (count > batchSize) {
        count = 0;
        ps.executeBatch();
        ps.clearBatch();
        ps.clearParameters();
      }
    } catch (SQLException e) {
      try {
        log
            .error("Unable to insert record:" + tuple.toDelimitedString("\t"),
                e);
      } catch (ExecException ee) {
        // do nothing
      }
      if (e.getErrorCode() == 1366) {
        // errors that come due to utf-8 character encoding
        // ignore these kind of errors TODO: Temporary fix - need to find a
        // better way of handling them in the argument statement itself
      } else {
        throw new RuntimeException("JDBC error", e);
      }
    }
  }

  class MyDBOutputFormat extends OutputFormat<NullWritable, NullWritable> {

    @Override
    public void checkOutputSpecs(JobContext context) throws IOException,
        InterruptedException {
      // IGNORE
    }

    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context)
        throws IOException, InterruptedException {
      return new OutputCommitter() {

        @Override
        public void abortTask(TaskAttemptContext context) throws IOException {
          try {
            if (ps != null) {
              ps.close();
            }
            if (con != null) {
              con.rollback();
              con.close();
            }
          } catch (SQLException sqe) {
            throw new IOException(sqe);
          }
        }

        @Override
        public void commitTask(TaskAttemptContext context) throws IOException {
          if (ps != null) {
            try {
              ps.executeBatch();
              con.commit();
              ps.close();
              con.close();
              ps = null;
              con = null;
            } catch (SQLException e) {
              log.error("ps.close", e);
              throw new IOException("JDBC Error", e);
            }
          }
        }

        @Override
        public boolean needsTaskCommit(TaskAttemptContext context)
            throws IOException {
          return true;
        }

        @Override
        public void cleanupJob(JobContext context) throws IOException {
          // IGNORE
        }

        @Override
        public void setupJob(JobContext context) throws IOException {
          // IGNORE
        }

        @Override
        public void setupTask(TaskAttemptContext context) throws IOException {
          // IGNORE
        }
      };
    }

    @Override
    public RecordWriter<NullWritable, NullWritable> getRecordWriter(
        TaskAttemptContext context) throws IOException, InterruptedException {
      // We don't use a record writer to write to database
    	return new RecordWriter<NullWritable, NullWritable>() {
    		   	  @Override
    		   	  public void close(TaskAttemptContext context) {
    		   		  // Noop
    		    	  }
    		    	  @Override
    		    	  public void write(NullWritable k, NullWritable v) {
    		    		  // Noop
    		    	  }
    		      };
    }

  }

  @SuppressWarnings("unchecked")
  @Override
  public OutputFormat getOutputFormat()
      throws IOException {
    return new MyDBOutputFormat();
  }

  /**
   * Initialise the database connection and prepared statement here.
   */
  @SuppressWarnings("unchecked")
  @Override
  public void prepareToWrite(RecordWriter writer)
      throws IOException {
    ps = null;
    con = null;
    if (insertQuery == null) {
      throw new IOException("SQL Insert command not specified");
    }
    try {
      if (user == null || pass == null) {
        con = DriverManager.getConnection(jdbcURL);
      } else {
        con = DriverManager.getConnection(jdbcURL, user, pass);
      }
      con.setAutoCommit(false);
      ps = con.prepareStatement(insertQuery);
    } catch (SQLException e) {
      log.error("Unable to connect to JDBC @" + jdbcURL);
      throw new IOException("JDBC Error", e);
    }
    count = 0;
  }

  @Override
  public void setStoreLocation(String location, Job job) throws IOException {
    // IGNORE since we are writing records to DB.
  }
}

欢迎扫码关注微信公众号:我是攻城师(woshigcs)

本公众号的内容是有关搜索和大数据技术和互联网等方面内容的分享,也是一个温馨的技术互动交流的小家园,有什么问题随时都可以留言,欢迎大家来访!

玩转大数据系列之Apache Pig如何与MySQL集成(三)

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

云攻略

云攻略

马克·贝尼奥夫、卡莱尔·阿德勒 / 徐杰 / 海天出版社 / 2010年8月 / 36.00元

Apple、Google、甲骨文、腾讯 都已投入了云的怀抱, 你还在等什么? 快来加入我们! 最初,Salesforce.com 只是一间小小的租赁公寓 在短短10年内 它已成长为 世界上发展最快、最具创新力的 产业变革领导者 曾经,这是个软件为王的时代。 现在,这是个云计算的新时代。 NO SOFTWARE 抛弃软件的......一起来看看 《云攻略》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

随机密码生成器
随机密码生成器

多种字符组合密码

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具