京东金融数据分析案例(二)

栏目: 数据库 · 发布时间: 5年前

内容简介:版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/mingyunxiaohai/article/details/82703827

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/mingyunxiaohai/article/details/82703827

接着上一篇 京东金融数据分析案例(一)

任务 5

利用 spark streaming 实时分析每个页面点击次数和不同年龄段消费总金额

步骤:编写 Kafka produer 程序读取hdfs上的文件每隔一段时间产生数据,然后使用spark streaming读取kafka中的数据进行分析,分析结果写入到 redis 中。

(1)将 t_click 数据依次写入 kafka 中的 t_click 主题中,每条数据写入间隔为10 毫秒,其中 uid 为 key,click_time+” ,” +pid 为 value

public class ClickProducer {
    private static KafkaProducer<String,String> producer;
    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("producer.ClickProducer");

        JavaSparkContext sc = new JavaSparkContext(conf);
        //从hdfs上读取数据
        JavaRDD<String> input = sc.textFile("hdfs://master:9000/warehouse/t_click");

        //kafka的broker的地址(localhost:9092)这里设置常量
        String brokers = KafkaRedisConfig.KAFKA_ADDR;

        Map<String, Object> props = new HashMap<String, Object>();
        props.put("bootstrap.servers", brokers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        producer = new KafkaProducer<String,String>(props);

        //循环读取到的数据,每隔十秒发送一条
        input.foreach(new VoidFunction<String>() {
            public void call(String s) throws Exception {
                String[] split = s.split(",");
                JSONObject event = new JSONObject();
                event.put("uid",split[1]+","+split[2]);
                ProducerRecord<String, String> msg =
                        new ProducerRecord<String, String>(KafkaRedisConfig.KAFKA_CLICK_TOPIC, event.toString());
                producer.send(msg);

                System.out.println("Message sent: " + event);

                Thread.sleep(1000);
            }
        });


    }
}

运行结果

```
{"uid":"2016-10-04 12:22:30,1"}
{"uid":"2016-08-09 11:24:13,9"}
{"uid":"2016-09-27 14:40:37,10"}
{"uid":"2016-10-04 12:18:42,6"}
......

(2)将 t_order 数据依次写入 kafka 中的 t_order 主题中,每条数据写入间隔为10 毫秒,其中 uid 为 key,uid+” ,” +price + “,” + discount 为value

public class OrderProducer {
    private static KafkaProducer<String,String> producer;
    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("producer.OrderProducer");

        JavaSparkContext sc = new JavaSparkContext(conf);
         //从hdfs上读取数据
        JavaRDD<String> input = sc.textFile("hdfs://master:9000/warehouse/t_order");
        //t_order 主题 使用常量定义
        final String topic = KafkaRedisConfig.KAFKA_ORDER_TOPIC;
         //kafka的broker的地址(localhost:9092)这里设置常量
        String brokers = KafkaRedisConfig.KAFKA_ADDR;

        Map<String, Object> props = new HashMap<String, Object>();
        props.put("bootstrap.servers", brokers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        producer = new KafkaProducer<String,String>(props);
        //循环读取到的数据,每隔十秒发送一条
        input.foreach(new VoidFunction<String>() {
            public void call(String s) throws Exception {
                String[] split = s.split(",");
                JSONObject event = new JSONObject();
                if(!split[0].contains("uid")){
                    event.put("uid",split[0]+","+split[2]+","+split[5]);
                    ProducerRecord<String, String> msg = new ProducerRecord<String, String>(topic, event.toString());
                    producer.send(msg);
                    System.out.println("Message sent: " + event);
                }

                Thread.sleep(2000);
            }
        });
    }
}

运行结果

{"uid":"55792,1.4898961024,0"}
{"uid":"45370,3.9950093311,0"}
{"uid":"85278,0.6658123361,0"}
......

(3)编写 spark streaming 程序,依次读取 kafka 中 t_click 主题数据,并统计: 每个页面累计点击次数,并存入 redis,其中 key 为” click+pid” ,value 为累计的次数

public class ClickStreamingAnalysis {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("ClickStreamingAnalysis");

        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));
        ssc.sparkContext().setLogLevel("WARN");
        // Kafka configurations
        String[] topics = KafkaRedisConfig.KAFKA_CLICK_TOPIC.split("\\,");
        System.out.println("Topics: " + Arrays.toString(topics));
        String brokers = KafkaRedisConfig.KAFKA_ADDR;

        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", brokers);
        kafkaParams.put("serializer.class", "kafka.serializer.StringEncoder");

        // Create a direct stream 这里使用spark-streaming-kafka-0-8_2.11中的kafkautil
        JavaPairInputDStream<String, String> kafkaStream = KafkaUtils.createDirectStream(ssc,
                String.class, String.class,
                StringDecoder.class, StringDecoder.class,
                kafkaParams,
                new HashSet<String>(Arrays.asList(topics)));
        //接收到的数据格式为{"uid":"2016-10-04 12:22:30,1"} 创建json对象
        JavaDStream<JSONObject> events = kafkaStream.map(new Function<Tuple2<String, String>, JSONObject>() {
            public JSONObject call(Tuple2<String, String> line) throws Exception {
                System.out.println("line:" + line._2());
                JSONObject data = JSON.parseObject(line._2());
                return data;
            }
        });
        //取出pid 并map成(pid,1)的格式,然后聚合即可算出此批次该pid的点击次数
        JavaPairDStream<String, Long> clickDs = events.mapToPair(new PairFunction<JSONObject, String, Long>() {
            public Tuple2<String, Long> call(JSONObject json) throws Exception {
//                System.out.println("clickUID:" + json.getString("uid"));
                return new Tuple2<String, Long>(json.getString("uid").split(",")[1], 1L);
            }
        }).reduceByKey(new Function2<Long, Long, Long>() {
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });
        //定义一个redis的hashkey
        final String clickHashKey = "pid::click";
        clickDs.foreachRDD(new VoidFunction<JavaPairRDD<String, Long>>() {
            public void call(JavaPairRDD<String, Long> rdd) throws Exception {
                rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Long>>>() {
                    public void call(Iterator<Tuple2<String, Long>> tuple2Iterator) throws Exception {                       //在foreachPartition中创建jdeis的连接可以减少连接
                        Jedis jedis = JavaRedisClient.get().getResource();
                        try{
                            while (tuple2Iterator.hasNext()){
                                Tuple2<String, Long> next = tuple2Iterator.next();
                                String pid = "click"+next._1();
                                Long clickCount = next._2();
                                jedis.hincrBy(clickHashKey, pid, clickCount);
                                System.out.println(pid+":"+clickCount);
                            }
                        }catch (Exception e){
                            System.out.println("error:"+e);
                        }
                        //用完一定要关了,不然连接池泄露程序就卡主了
                        jedis.close();
                    }
                });
            }
        });
        ssc.start();
        ssc.awaitTermination();
    }
}

运行结果redis

127.0.0.1:6379> HGETALL age::money
 1) "30"
 2) "88.71581602079999521"
 3) "40"
 4) "33.95183371870000115"
 5) "35"
 ......

(4)编写 spark streaming 程序,依次读取 kafka 中 t_order 主题数据,并统计:不同年龄段消费总金额,并存入 redis,其中 key 为” age” ,value 为累计的消费金额

public class OrderStreamingAnalysis {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("OrderStreamingAnalysis");
//        final SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(2));
        //kafka中接收到的数据格式{"uid":"55792,1.4898961024,0"} 里面没有年龄,所以需要跟user表做join,
        //因为一个程序中只能有一个sparkcontext所以通过JavaStreamingContext 获得
        final SQLContext sqlcontext = new SQLContext(ssc.sparkContext());

        ssc.sparkContext().setLogLevel("WARN");

        final Dataset<Row> userDs = sqlcontext.read().csv("hdfs://master:9000/warehouse/t_user");
        //设置schema信息
        StructType userSchema = new StructType()
                .add("uid", "string", false)
                .add("age", "string", false)
                .add("six", "string", true)
                .add("active_date", "string", false)
                .add("limit", "string", false);
        final Dataset<Row> userDf = sqlcontext.createDataFrame(userDs.toJavaRDD(), userSchema);


        // Kafka configurations
        String[] topics = KafkaRedisConfig.KAFKA_ORDER_TOPIC.split("\\,");
        System.out.println("Topics: " + Arrays.toString(topics));
        String brokers = KafkaRedisConfig.KAFKA_ADDR;

        Set<String> topicsSet = new HashSet<String>(Arrays.asList(topics));
        Map<String, Object> kafkaParams = new HashMap<String, Object>();
        kafkaParams.put("bootstrap.servers", brokers);
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);
        //初始化dstream 这里使用spark-streaming-kafka-0-10_2.11中的kafkautil
        JavaInputDStream<ConsumerRecord<String, String>> kafkaStream =
                KafkaUtils.createDirectStream(
                        ssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaParams)
                );
        //读取到的string转化为json对象
        JavaDStream<JSONObject> events = kafkaStream.map(new Function<ConsumerRecord<String, String>, JSONObject>() {
            public JSONObject call(ConsumerRecord<String, String> line) throws Exception {
                System.out.println("line:" + line.value());
                return JSON.parseObject(line.value());
            }
        });
        //取出uid和金额
        JavaPairDStream<String, Double> orderDs = events.mapToPair(new PairFunction<JSONObject, String, Double>() {
            public Tuple2<String, Double> call(JSONObject json) throws Exception {
                String[] strs = json.getString("uid").split(",");
                return new Tuple2<String, Double>(strs[0], Double.parseDouble(strs[1]) - Double.parseDouble(strs[2]));
            }
        });
        orderDs.foreachRDD(new VoidFunction<JavaPairRDD<String, Double>>() {
            public void call(JavaPairRDD<String, Double> rdd) throws Exception {
                JavaRDD<Row> mapRow = rdd.map(new Function<Tuple2<String, Double>, Row>() {
                    public Row call(Tuple2<String, Double> v1) throws Exception {
                        String uid = v1._1();
                        Double money = v1._2();
//                        System.out.println("orderUID:" + uid+":"+money);
                        return RowFactory.create(uid, money);
                    }
                });
                StructType orderSchema = new StructType()
                        .add("uid", "string", false)
                        .add("money", "Double", false);
                Dataset<Row> orderDf = sqlcontext.createDataFrame(mapRow, orderSchema);
              //定义一个redis的hashkey
                final String moneyHashKey = "age::money";
              //查询
                Dataset<Row> count = orderDf.join(userDf, orderDf.col("uid").equalTo(userDf.col("uid")))
                        .select("age", "money")
                        .groupBy("age")
                        .sum("money");

                count.printSchema();
                count.repartition(3).foreachPartition(new ForeachPartitionFunction<Row>() {
                    public void call(Iterator<Row> t) throws Exception {
                        Jedis jedis = JavaRedisClient.get().getResource();
                        try {
                            if(t.hasNext()){
                                Row row = t.next();
                                String age = row.getString(0);
                                Double money = row.getDouble(1);
                                System.out.println(age+"::::"+money);
                                jedis.hincrByFloat(moneyHashKey, age, money);
                            }
                        }catch (Exception e){
                            System.out.println("error"+e);
                        }
                        jedis.close();
                    }
                });          
            }
        });
        ssc.start();
        ssc.awaitTermination();
    }

运行结果redis

127.0.0.1:6379> HGETALL age::money
 1) "30"
 2) "107.51128448799999422"
 3) "40"
 4) "39.40304406300000115"
 5) "35"
 6) "83.02971674589999735"
 ......

OK完成


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

软件调试

软件调试

张银奎 / 电子工业出版社 / 2008-6 / 128.00元

围绕如何实现高效调试这一主题,本书深入系统地介绍了以调试器为核心的各种软件调试技术。本书共30章,分为6篇。第1篇介绍了软件调试的概况和简要历史。第2篇以英特尔架构(IA)的CPU为例,介绍了计算机系统的硬件核心所提供的调试支持,包括异常、断点指令、单步执行标志、分支监视、JTAG和MCE等。第3篇以Windows操作系统为例,介绍了计算机系统的软件核心中的调试设施,包括内核调试引擎、用户态调试子......一起来看看 《软件调试》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具