flink-table-sql-demo2

栏目: 编程工具 · 发布时间: 7年前

阅读更多

一.背景

这个和demo1类似,只是提供另外一种实现方式,类似kafka 利用streamTableSource 来做。

二.代码

@Data
@ToString
public class UserInfo implements Serializable {
    private Timestamp pTime;
    private String userId;
    private String itemId;

    public UserInfo() {
    }

    public UserInfo(String userId, String itemId) {
        this.userId = userId;
        this.itemId = itemId;
        this.pTime = new Timestamp(System.currentTimeMillis());
    }
}
public class UserTableSource implements StreamTableSource<UserInfo>, DefinedRowtimeAttributes {
    /**
     * 返回类型
     * @return
     */
    @Override
    public TypeInformation<UserInfo> getReturnType() {
        return TypeInformation.of(UserInfo.class);
    }

    @Override
    public TableSchema getTableSchema() {
          // 可以 这样定义
//        TableSchema schema = new TableSchema(
//                new String[]{"pTime","userId","itemId"},
//                new TypeInformation[]{Types.SQL_TIMESTAMP,Types.STRING,Types.STRING});
        return TableSchema.fromTypeInfo(getReturnType());
    }



    @Override
    public String explainSource() {
        return "userSource";
    }


    @Override
    public DataStream<UserInfo> getDataStream(StreamExecutionEnvironment execEnv) {
        UserDataSource source = new UserDataSource();
        DataStream<UserInfo> userInfoDataStream = execEnv.addSource(source);
        return userInfoDataStream;
    }

    @Override
    public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
        RowtimeAttributeDescriptor descriptor =
                new RowtimeAttributeDescriptor("pTime",
                new ExistingField("pTime"),
                new AscendingTimestamps());
        return Collections.singletonList(descriptor);
    }


}
public class UserStreamTableApp {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // create a TableEnvironment
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        tEnv.registerTableSource("test", new UserTableSource());


        Table result = tEnv.sqlQuery("SELECT userId,TUMBLE_END(pTime, INTERVAL '5' SECOND) as pTime,count(1) as cnt FROM  test" +
                " GROUP BY TUMBLE(pTime, INTERVAL '5' SECOND),userId ");
        // deal with (Tuple2<Boolean, Row> value) -> out.collect(row)
        SingleOutputStreamOperator allClick = tEnv.toRetractStream(result, Row.class)
                .flatMap((Tuple2<Boolean, Row> value, Collector<Row> out) -> {
                    out.collect(value.f1);
                }).returns(Row.class);
        // add sink or print
        allClick.print();
        env.execute("test");

    }

}

0顶

0踩

分享到:

flink-table-sql-demo1

评论


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

XSS跨站脚本攻击剖析与防御

XSS跨站脚本攻击剖析与防御

邱永华 / 人民邮电出版社 / 2013-9-1 / 49.00元

《XSS跨站脚本攻击剖析与防御》是一本专门剖析XSS安全的专业书,总共8章,主要包括的内容如下。第1章 XSS初探,主要阐述了XSS的基础知识,包括XSS的攻击原理和危害。第2章 XSS利用方式,就当前比较流行的XSS利用方式做了深入的剖析,这些攻击往往基于客户端,从挂马、窃取Cookies、会话劫持到钓鱼欺骗,各种攻击都不容忽视。第3章 XSS测试和利用工具,介绍了一些常见的XSS测试工具。第4......一起来看看 《XSS跨站脚本攻击剖析与防御》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具