您好,欢迎来到12图资源库!分享精神,快乐你我!我们只是素材的搬运工!!
  • 首 页
  • 当前位置:首页 > 开发 > WEB开发 >
    5分钟从零构建第一个 Flink 运用(2)
    时间:2018-11-08 21:03 来源:网络整理 作者:网络 浏览:收藏 挑错 推荐 打印

    这创立了一个字符串类型的 DataStream。DataStream 是 Flink 中做流处置的中心 API,下面定义了十分多常见的操作(如,过滤、转换、聚合、窗口、关联等)。在本示例中,我们感兴味的是每个单词在特定时间窗口中出现的次数,比如说5秒窗口。为此,我们首先要将字符串数据解析成单词和次数(运用Tuple2表示),第一个字段是单词,第二个字段是次数,次数初始值都设置成了1。我们完成了一个 flatmap 来做解析的任务,由于一行数据中能够有多个单词。

    DataStream<Tuple2<String, Integer>> wordCounts = text 

            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { 

              @Override 

              public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { 

                for (String word : value.split("\\s")) { 

                  out.collect(Tuple2.of(word, 1)); 

                } 

              } 

            }); 

    接着我们将数据流按照单词字段(即0号索引字段)做分组,这里可以复杂地运用 keyBy(int index) 办法,失掉一个以单词为 key 的Tuple2数据流。然后我们可以在流上指定想要的窗口,并依据窗口中的数据计算结果。在我们的例子中,我们想要每5秒聚合一次单词数,每个窗口都是从零末尾统计的。

    DataStream<Tuple2<String, Integer>> windowCounts = wordCounts 

            .keyBy(0) 

            .timeWindow(Time.seconds(5)) 

            .sum(1); 

    第二个调用的 .timeWindow() 指定我们想要5秒的翻腾窗口(Tumble)。第三个调用为每个key每个窗口指定了sum聚合函数,在我们的例子中是按照次数字段(即1号索引字段)相加。失掉的结果数据流,将每5秒输入一次这5秒内每个单词出现的次数。

    最后一件事就是将数据流打印到控制台,并末尾执行:

    windowCounts.print().setParallelism(1); 

    env.execute("Socket Window WordCount"); 

    最后的 env.execute 调用是启动实践Flink作业所必需的。一切算子操作(例如创立源、聚合、打印)只是构建了外部算子操作的图形。只要在execute()被调用时才会在提交到集群上或本地计算机上执行。

    下面是残缺的代码,部分代码经过简化(代码在 GitHub 上也能拜访到):

    package myflink; 

    import org.apache.flink.api.common.functions.FlatMapFunction; 

    import org.apache.flink.api.java.tuple.Tuple2; 

    import org.apache.flink.streaming.api.datastream.DataStream; 

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 

    import org.apache.flink.streaming.api.windowing.time.Time

    import org.apache.flink.util.Collector; 

    public class SocketWindowWordCount { 

      public static void main(String[] args) throws Exception { 

        // 创立 execution environment 

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

        // 经过衔接 socket 获取输入数据,这里衔接到本地9000端口,假设9000端口已被占用,请换一个端口 

        DataStream<String> text = env.socketTextStream("localhost", 9000, "\n"); 

        // 解析数据,按 word 分组,开窗,聚合 

        DataStream<Tuple2<String, Integer>> windowCounts = text 

            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { 

              @Override 

    (责任编辑:admin)