这创立了一个字符串类型的 DataStream。DataStream 是 Flink 中做流处置的中心 API,下面定义了十分多常见的操作(如,过滤、转换、聚合、窗口、关联等)。在本示例中,我们感兴味的是每个单词在特定时间窗口中出现的次数,比如说5秒窗口。为此,我们首先要将字符串数据解析成单词和次数(运用Tuple2表示),第一个字段是单词,第二个字段是次数,次数初始值都设置成了1。我们完成了一个 flatmap 来做解析的任务,由于一行数据中能够有多个单词。
DataStream<Tuple2<String, Integer>> wordCounts = text
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
for (String word : value.split("\\s")) {
out.collect(Tuple2.of(word, 1));
}
}
});
接着我们将数据流按照单词字段(即0号索引字段)做分组,这里可以复杂地运用 keyBy(int index) 办法,失掉一个以单词为 key 的Tuple2数据流。然后我们可以在流上指定想要的窗口,并依据窗口中的数据计算结果。在我们的例子中,我们想要每5秒聚合一次单词数,每个窗口都是从零末尾统计的。
DataStream<Tuple2<String, Integer>> windowCounts = wordCounts
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
第二个调用的 .timeWindow() 指定我们想要5秒的翻腾窗口(Tumble)。第三个调用为每个key每个窗口指定了sum聚合函数,在我们的例子中是按照次数字段(即1号索引字段)相加。失掉的结果数据流,将每5秒输入一次这5秒内每个单词出现的次数。
最后一件事就是将数据流打印到控制台,并末尾执行:
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
最后的 env.execute 调用是启动实践Flink作业所必需的。一切算子操作(例如创立源、聚合、打印)只是构建了外部算子操作的图形。只要在execute()被调用时才会在提交到集群上或本地计算机上执行。
下面是残缺的代码,部分代码经过简化(代码在 GitHub 上也能拜访到):
package myflink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class SocketWindowWordCount {
public static void main(String[] args) throws Exception {
// 创立 execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 经过衔接 socket 获取输入数据,这里衔接到本地9000端口,假设9000端口已被占用,请换一个端口
DataStream<String> text = env.socketTextStream("localhost", 9000, "\n");
// 解析数据,按 word 分组,开窗,聚合
DataStream<Tuple2<String, Integer>> windowCounts = text
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
(责任编辑:admin)