df2 = df1.selectExpr("split(value, ',')[0] as Dept", "split(value, ',')[1] as Age")
df3 = df2.groupBy("Dept").avg("Age")
# show() is not supported on a streaming DataFrame; write to the console sink instead
df3.writeStream.outputMode("complete").format("console").start().awaitTermination()
Windowed aggregation: Sometimes we need to aggregate over a time window rather than compute a running aggregation. Spark Structured Streaming provides this capability as well. Suppose we want to count the events that occurred in the last 5 minutes; the window function combined with an aggregation helps us do that.
Python
from pyspark.sql import SparkSession
import pyspark.sql.functions as sf

spark = SparkSession.builder.master('local').appName('StructuredStreamingApp').getOrCreate()

df = (spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "test_topic")
      .load())
df1 = df.selectExpr("CAST(value AS STRING)")
df2 = df1.selectExpr("split(value, ',')[0] as Dept", "split(value, ',')[1] as Age")
df3 = df2.withColumn("Age", df2.Age.cast('int'))
# tag each record with its arrival time, since the Kafka payload carries no event time
df4 = df3.withColumn("eventTime", sf.current_timestamp())
# tumbling 5-minute windows: each event falls into exactly one window
df_final = df4.groupBy(sf.window("eventTime", "5 minutes")).count()
# show() is not supported on a streaming DataFrame; write to the console sink instead
df_final.writeStream.outputMode("complete").format("console").start().awaitTermination()
Aggregation over overlapping windows: In the example below, each window is a group over which the aggregation is performed. There is also a provision to define overlapping windows by specifying both the window length and the sliding interval; for windows to actually overlap, the sliding interval must be shorter than the window length. This is useful for handling late-arriving data in window aggregations. The code below counts events over a 10-minute window that slides every 5 minutes.
Python
from pyspark.sql import SparkSession
import pyspark.sql.functions as sf

spark = SparkSession.builder.master('local').appName('StructuredStreamingApp').getOrCreate()

df = (spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "test_topic")
      .load())
df1 = df.selectExpr("CAST(value AS STRING)")
df2 = df1.selectExpr("split(value, ',')[0] as Dept", "split(value, ',')[1] as Age")
df3 = df2.withColumn("Age", df2.Age.cast('int'))
df4 = df3.withColumn("eventTime", sf.current_timestamp())
# overlapping windows: 10-minute window length, 5-minute slide,
# so every event is counted in two consecutive windows
df_final = df4.groupBy(sf.window("eventTime", "10 minutes", "5 minutes")).count()
df_final.writeStream.outputMode("complete").format("console").start().awaitTermination()
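To make the overlap concrete, the window assignment Spark performs can be sketched in plain Python. `assign_windows` below is a hypothetical helper, not part of the PySpark API; it mimics Spark's epoch-aligned sliding windows, where window start times are multiples of the slide interval:

```python
def assign_windows(event_ts, window_sec, slide_sec):
    """Return the (start, end) pairs of every window containing event_ts.

    Window starts are aligned to multiples of slide_sec, and each
    window spans window_sec seconds.
    """
    windows = []
    # latest window start at or before the event
    start = event_ts - (event_ts % slide_sec)
    # walk backwards while the window still covers the event
    while start > event_ts - window_sec:
        windows.append((start, start + window_sec))
        start -= slide_sec
    return sorted(windows)

# a 10-minute window sliding every 5 minutes: the event lands in two windows
print(assign_windows(720, 600, 300))  # → [(300, 900), (600, 1200)]
# a tumbling window (slide == length): exactly one window
print(assign_windows(720, 300, 300))  # → [(600, 900)]
```

This shows why each event contributes to `window_sec / slide_sec` groups when the slide is shorter than the window, and to exactly one group in the tumbling case.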