Spark Streaming--应用与实战(二)

栏目: 编程工具 · 发布时间: 6年前

内容简介:Spark Streaming--应用与实战(二)
  • 总体思路就是:
  1. put数据构造json数据,写入kafka;
  2. spark streaming任务启动后首先去zookeeper中去读取offset,组装成fromOffsets;
  3. spark streaming 获取到fromOffsets后通过KafkaUtils.createDirectStream去消费Kafka的数据;
  4. 读取kafka数据返回一个InputDStream的信息,foreachRDD遍历,同时记录读取到的offset到zk中;
  5. 写入数据到HBase
    Spark Streaming--应用与实战(二)

初始化与配置加载

  • 下面是一些接收参数,加载配置,获取配置中的topic,还有初始化配置,代码如下:
    //接收参数
    val Array(kafka_topic, timeWindow, maxRatePerPartition) = args
    
    //加载配置
    val prop: Properties = new Properties()
    prop.load(this.getClass().getResourceAsStream("/kafka.properties"))
    
    val groupName = prop.getProperty("group.id")
    
    //获取配置文件中的topic
    val kafkaTopics: String = prop.getProperty("kafka.topic." + kafka_topic)
    if (kafkaTopics == null || kafkaTopics.length <= 0) {
        System.err.println("Usage: KafkaDataStream <kafka_topic> is number from kafka.properties")
        System.exit(1)
    }
    
    val topics: Set[String] = kafkaTopics.split(",").toSet
    
    val kafkaParams = scala.collection.immutable.Map[String, String](
        "metadata.broker.list" -> prop.getProperty("bootstrap.servers"),
        "group.id" -> groupName,
        "auto.offset.reset" -> "largest")
    
    val kc = new KafkaCluster(kafkaParams)
    
    //初始化配置
    val sparkConf = new SparkConf()
            .setAppName(KafkaDataStream.getClass.getSimpleName + topics.toString())
            .set("spark.yarn.am.memory", prop.getProperty("am.memory"))
            .set("spark.yarn.am.memoryOverhead", prop.getProperty("am.memoryOverhead"))
            .set("spark.yarn.executor.memoryOverhead", prop.getProperty("executor.memoryOverhead"))
            .set("spark.streaming.kafka.maxRatePerPartition", maxRatePerPartition) //此处为每秒每个partition的条数
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .set("spark.reducer.maxSizeInFlight", "1m")
    
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(timeWindow.toInt)) //多少秒处理一次请求
    

只是需要注意一下,这里的KafkaCluster,需要把源码拷贝过来,修改一下,因为里面有些方法是私有的。copy过来后改为public 即可。

链接ZK

  • 注意:这里的ZKStringSerializer,需要把源码拷贝过来,修改一下
    //zk
    val zkClient = new ZkClient(prop.getProperty("zk.connect"), Integer.MAX_VALUE, 100000, ZKStringSerializer)
    val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
    

组装fromOffsets

  • 组装fromOffsets,createDirectStream接收的是一个map的结构,所以可以支持多个topic的消费
    var fromOffsets: Map[TopicAndPartition, Long] = Map() //多个partition的offset
    
    
    //支持多个topic : Set[String]
    topics.foreach(topicName => {
    
        //去brokers中获取partition数量,注意:新增partition后需要重启
        val children = zkClient.countChildren(ZkUtils.getTopicPartitionsPath(topicName))
        for (i <- 0 until children) {
    
            //kafka consumer 中是否有该partition的消费记录,如果没有设置为0
            val tp = TopicAndPartition(topicName, i)
            val path: String = s"${new ZKGroupTopicDirs(groupName, topicName).consumerOffsetDir}/$i"
            if (zkClient.exists(path)) {
                fromOffsets += (tp -> zkClient.readData[String](path).toLong)
            } else {
                fromOffsets += (tp -> 0)
            }
        }
    })
    

通过createDirectStream接受数据

  • 使用KafkaUtils里面的createDirectStream方法去消费kafka数据,createDirectStream使用的是kafka简单的Consumer API,所以需要自己去管理offset,我们把offset写入到zk中,这样也方便了一些监控软件读取记录
    //创建Kafka持续读取流,通过zk中记录的offset
    val messages: InputDStream[(String, String)] =
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
    

入库

  • 入库HBase

    //数据操作
            messages.foreachRDD(rdd => {
                val offsetsList: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    
                //data 处理
                rdd.foreachPartition(partitionRecords => {
                    //TaskContext 上下文
                    val offsetRange: OffsetRange = offsetsList(TaskContext.get.partitionId)
                    logger.info(s"${offsetRange.topic} ${offsetRange.partition} ${offsetRange.fromOffset} ${offsetRange.untilOffset}")
    
                    //TopicAndPartition 主构造参数第一个是topic,第二个是Kafka partition id
                    val topicAndPartition = TopicAndPartition(offsetRange.topic, offsetRange.partition)
                    val either = kc.setConsumerOffsets(groupName, Map((topicAndPartition, offsetRange.untilOffset))) //是
                    if (either.isLeft) {
                        logger.info(s"Error updating the offset to Kafka cluster: ${either.left.get}")
                    }
    
                    partitionRecords.foreach(data => {
                        HBaseDao.insert(data)
                    })
                })
    
            })
    
  • 插入数据到具体HBase数据库

    /**
      * 
      * 插入数据到 HBase
      *
      * 参数( tableName ,  json ) ):
      * 
      * Json格式:
      *     {
      *         "rowKey": "00000-0",
      *         "family:qualifier": "value",
      *         "family:qualifier": "value",
      *         ......
      *     }
      *
      * @param data
      * @return
      */
    def insert(data: (String, String)): Boolean = {
    
        val t: HTable = getTable(data._1) //HTable
        try {
            val map: mutable.HashMap[String, Object] = JsonUtils.json2Map(data._2)
            val rowKey: Array[Byte] = String.valueOf(map.get("rowKey")).getBytes //rowKey
            val put = new Put(rowKey)
    
            for ((k, v) <- map) {
                val keys: Array[String] = k.split(":")
                if (keys.length == 2){
                    put.addColumn(keys(0).getBytes, keys(1).getBytes, String.valueOf(v).getBytes)
                }
            }
    
            Try(t.put(put)).getOrElse(t.close())
            true
        } catch {
            case e: Exception =>
                e.printStackTrace()
                false
        }
    }
    

运行并查看结果

  • 运行命令:

    /opt/cloudera/parcels/CDH/bin/spark-submit --master yarn-client --class com.xiaoxiaomo.streaming.KafkaDataStream hspark-1.0.jar 1 3 1000

    运行后可以去spark UI中去查看相关运行情况,UI中具体细节见下篇博客

    Spark Streaming--应用与实战(二) Spark Streaming--应用与实战(二)

1.除非注明,博文均为原创,转载请标明地址: http://blog.xiaoxiaomo.com/2017/06/10/SparkStreaming-应用与实战-二/

2.文章作者:小小默

3.发布时间:2017年06月10日 - 16时56分

4.如果本文帮到了您,不妨点一下右下角的 分享到 按钮,您的鼓励是博主写作最大的动力。


以上所述就是小编给大家介绍的《Spark Streaming--应用与实战(二)》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Beginning Google Maps API 3

Beginning Google Maps API 3

Gabriel Svennerberg / Apress / 2010-07-27 / $39.99

This book is about the next generation of the Google Maps API. It will provide the reader with the skills and knowledge necessary to incorporate Google Maps v3 on web pages in both desktop and mobile ......一起来看看 《Beginning Google Maps API 3》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试