您好,欢迎来到12图资源库!分享精神,快乐你我!我们只是素材的搬运工!!
  • 首 页
  • 当前位置:首页 > 开发 > WEB开发 >
    用Spark,Kafka和k8s构建下一代数据管道(4)
    时间:2021-08-09 08:00 来源:网络整理 作者:网络 浏览:收藏 挑错 推荐 打印

    df2 = df1.selectExpr("split(value, ',')[0] as Dept","split(value, ',')[1] as Age"

    df3 = df2.groupBy("Dept").avg("Age"

    df3.show() 

    窗口聚合

    有时我们需求在某个时间窗口内停止聚合,而不是运转聚合。SparkStructured Streaming 也提供了这样的功用。假定我们要计算过去 5 分钟内的事情数。这个带聚合的窗口函数将协助我们。

    Python 

    from pyspark.sql importSparkSession 

    from pyspark.streaming import StreamingContext 

    import pyspark.sql.functions as sf 

    import datetime 

    import time 

    spark=SparkSession.builder.master('local').appName('StructuredStreamingApp').getOrCreate() 

    df =Spark.readStream.format("kafka").option("kafka.bootstrap.servers""localhost:9092"

                                         .option("subscribe""test_topic").load() 

    df1 = df.selectExpr("CAST(value AS STRING)"

    df2 = df1.selectExpr("split(value, ',')[0] as Dept","split(value, ',')[1] as Age"

    df3 = df2.withColumn("Age", df2.Age.cast('int')) 

    df4 = df3.withColumn("eventTime",sf.current_timestamp()) 

    df_final = df4.groupBy(sf.window("eventTime""5 minute")).count() 

    df_final.show() 

    堆叠窗口上的聚合

    在下面的例子中,每个窗口都是一个完成聚合的组。还提供了经过提及窗口长度和滑动距离来定义堆叠窗口的规则。它在窗口聚合中的前期数据处置中十分有用。下面的代码基于 5 分钟窗口计算事情数,滑动距离为 10 分钟。

    Python 

    from pyspark.sql importSparkSession 

    from pyspark.streaming import StreamingContext 

    import pyspark.sql.functions as sf 

    import datetime 

    import time 

    spark=SparkSession.builder.master('local').appName('StructuredStreamingApp').getOrCreate() 

    df =Spark.readStream.format("kafka").option("kafka.bootstrap.servers""localhost:9092"

                                         .option("subscribe""test_topic").load() 

    df1 = df.selectExpr("CAST(value AS STRING)"

    df2 = df1.selectExpr("split(value, ',')[0] as Dept","split(value, ',')[1] as Age"

    df3 = df2.withColumn("Age", df2.Age.cast('int')) 

    df4 = df3.withColumn("eventTime",sf.current_timestamp()) 

    (责任编辑:admin)