借hbase-rdd二次开发谈如何在Spark Core之上扩建自己的模块

栏目: 数据库 · 发布时间: 6年前

内容简介:借hbase-rdd二次开发谈如何在Spark Core之上扩建自己的模块

我是51CTO学院讲师张敏,在51CTO学院“4.20 IT充电节”(4月19~20日) 到来之际,和大家分享一下Spark Core之上扩建自己的模块的经验。正文来啦~~~

借hbase-rdd二次开发谈如何在Spark Core之上扩建自己的模块

hbase-rdd是一个构建在SparkContext基础之上的用于对Hbase进行增删改查的第三方开源模块,目前最新版本为0.7.1。目前该rdd在操作hbase时,默认调用隐式方法。

implicitdef stringToBytes(s: String): Array[Byte] = {  
Bytes.toBytes(s)  
} 

将RDD的key转换成字节b,然后调用Hbase的put(b)方法保存rowkey,之后将RDD的每一行存入hbase。

在轨迹图绘制项目数据计算中,我们考虑到hbase的rowkey的设计——尽量减少rowkey存储的开销。虽然hbase-rdd最终的rowkey默认都是采用字节数组,但这个地方我们希望按自己的方式组装rowkey。使用MD5(imei)+dateTime组成的字节数组作为rowkey。因此默认的hbase-rdd提供的方法是不满足我们存储需求的,需要对源代码进行修改。在toHbase方法中,有一个convert方法,该方法将对RDD中的每一行数据进行转化,使用RDD中的key生成Put(Bytes.toBytes(key))对象,该对象为之后存储Hbase提供rowkey。

在convert函数中,对其实现进行了改造,hbase-rdd默认使用stringToBytes隐式函数将RDD的String类型的key转换成字节数组,这里我们需要改造,不使stringToBytes隐式方法,而是直接生成字节数据。

protected def convert(id: String, values: Map[String, Map[String, A]], put: PutAdder[A]) = {  
val strs = id.split(",")  
val imei = strs {0}  
val dateTime = strs {1}  
val b1 = MD5Utils.computeMD5Hash(imei.getBytes())  
val b2 = Bytes.toBytes(dateTime.toLong)  
val key = b1.++(b2)  
val p = new Put(key)//改造  
var empty = true  
for {  
(family, content) <- values  
(key, value) <- content  
} {  
empty = false  
if (StrUtils.isNotEmpty(family) &&StrUtils.isNotEmpty(key)) {  
put(p, family, key, value)  
}  
}  
if (empty) None else Some(new ImmutableBytesWritable, p)  
} 

这样就实现了使用自己的方式构建rowkey,当然基于此思想我们可以使用任意的方式构建rowkey。

在使用hbase-rdd插件的过程中,我在思考,默认的RDD上是没有toHbase方法的,那为什么引入hbase-rdd包之后,RDD之上就有toHbase方法了?经过查看源码,发现hbase-rdd包中提供了两个隐式方法:

implicitdef toHBaseRDDSimple[A](rdd: RDD[(String, Map[String, A])])(implicit writer: Writes[A]): HBaseWriteRDDSimple[A] =new HBaseWriteRDDSimple(rdd, pa[A]) 
implicit def toHBaseRDDSimpleTS[A](rdd: RDD[(String, Map[String, (A, Long)])])(implicit writer: Writes[A]): HBaseWriteRDDSimple[(A, Long)] =new HBaseWriteRDDSimple(rdd, pa[A]) 

这两个方法在发现RDD上没有toHbase方法时会自动尝试调用,从隐式定义中尝试找到解决方案,尝试之后发现有定义toHBaseRDDSimple隐式方法,于是调用该隐式方法新建HBaseWriteRDDSimple类,返回hBaseWriteRDDSimple,而在hBaseWriteRDDSimple对象中是有toHbase方法的,因此在引入hbase-rdd之后,可以发现原本没有toHbase方法的RDD上有toHbase方法了。这一切都要归功于Scala强大的隐式转换功能。

那明白了原理,是否我们可以基于RDD写自己的模块,说干就干!

第一步:新建Trait

traitHaha{ 
implicitdef gaga[A](rdd: RDD[String]): Hehe= 
newHehe(rdd) 
} 

第二步:新建Hehe类

final  class Hehe(rdd:RDD[String]) { 
def wow(tableName:String,family:String): Unit ={ 
println("---------------------------------------------") 
println("tableName:"+tableName+" - family:"+family) 
println("size:"+rdd.count()) 
rdd.collect().foreach(data=>println(data)) 
println("---------------------------------------------") 
   } 
} 

第三步:新建包对象

package object test extends Haha 

第四步:新建test类

object Test{ 
def main(args: Array[String]) { 
valsparkConf = new SparkConf().setAppName("Test") 
valsc = new SparkContext(sparkConf) 
sc.makeRDD(Seq("one","two","three","four")).wow("taskDataPre","T") 
  } 
} 

项目结构图:

借hbase-rdd二次开发谈如何在Spark Core之上扩建自己的模块

运行效果图:

借hbase-rdd二次开发谈如何在Spark Core之上扩建自己的模块

希望对大家以后的开发有帮助,同时借鉴本案例,在Spark Core之上构建自己的小模块。

51CTO学院 4.20 IT充电节

(19-20号两天,100门视频课程免单抢,更有视频课程会员享6折,非会员享7折,套餐折上8折,微职位立减2000元钜惠)

活动链接: http://edu.51cto.com/activity/lists/id-47.html?wenzhang


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

京东技术解密

京东技术解密

京东研发体系 / 电子工业出版社 / 2014-11-18 / 65

京东高速的增长、闪电响应的供应链、庞大的团队规模等背后内幕,对于业界一直像谜一样神秘。随着成为中国B2C领导厂商以及在纳斯达克上市,京东越来越需要开放自己,与业界形成更好的交流与融合。《京东技术解密》的面世,就是京东技术团队首次向业界集体亮相。本书用翔实的内容为读者逐一解答——如何用技术支撑网站的综合竞争实力,如何把握技术革新的时间点,如何应对各种棘手问题及压力,如何在网站高速运转的情况下进行系统......一起来看看 《京东技术解密》 这本书的介绍吧!

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具