您好,欢迎来到12图资源库!分享精神,快乐你我!我们只是素材的搬运工!!
  • 首 页
  • 当前位置:首页 > 开发 > WEB开发 >
    用Spark,Kafka和k8s构建下一代数据管道(3)
    时间:2021-08-09 08:00 来源:网络整理 作者:网络 浏览:收藏 挑错 推荐 打印

    在这两种状况下,第一个数据是从两个不同的来源加载的,并且产品数据针对一切非电气产品停止过滤。买卖数据依据订单日期的某种格式停止更改。然后,将两个数据帧衔接起来,并生成该超市中细分支出和产品销售数量的结果。

    当然,这是加载、验证、转换和聚合的复杂示例。运用SparkSQL 可以停止更复杂的操作。要了解有关SparkSQL 效劳的更多信息,请参阅此处的文档。

    Sparkfor Speed 层

    SparkStreaming 是一个库,用于中心Spark框架之上。它确保实时数据流处置的可扩展性、高吞吐量和容错性。

    用Spark,Kafka和k8s构建下一代数据管道

    图 5:SparkStreaming 架构(来源:https : //spark.apache.org)

    如上图所示,Spark将输入数据流转换为批量输入数据。这种团圆Batch有两种完成方式:a) Dstreams 或团圆化流和 b) 结构化流。前者十分受欢迎,直到后者作为更初级的版本出现。但是,Dstream 还没有完全过时,为了残缺起见,将其保留在本文中。

    · Discretized Streams:这提供了对火花流库的笼统。它是 RDD 的集合,代表一个延续的数据流。它将数据团圆成小批量并运转小作业来处置这些小批量。义务依据数据的位置分配给任务节点。因此,经过 Dstream 的这个概念,Spark可以并行读取数据,执行小批量处置流并确保流处置的有效节点分配。

    · 结构化流:这是运用Spark引擎的最先进和现代的流处置办法。它与SparkDataframe API(在下面的Batch部分中讨论)很好地集成在一同,用于对流数据的各种操作。结构化流可以增量和延续地处置数据。基于特定窗口和水印的近实时聚合也是能够的。

    Spark结构化流可以处置不同的流处置用例,如下面的示例所示:

    复杂的结构化流媒体

    复杂的结构化流只会转换和加载来自流的数据,并且不包括特定时间范围内的任何聚合。例如,系统从 Apache Kafka 获取数据,并经过Spark流和SparkSQL 近乎实时地对其停止转换(请参阅下面的代码片段)。

    Python 

    from pyspark.sql importSparkSession 

    from pyspark.streaming import StreamingContext 

    import pyspark.sql.functions as sf 

    spark=SparkSession.builder.master('local').appName('StructuredStreamingApp').getOrCreate() 

    df =Spark.readStream.format("kafka").option("kafka.bootstrap.servers,"localhost:9092") 

                                                 .option("subscribe""test_topic").load() 

    df1 = df.selectExpr("CAST(value AS STRING)"

    df2 = df1.selectExpr("split(value, ',')[0] as Dept","split(value, ',')[1] as Age"

    df2.show() 

    SparkSession 对象的ReadStream函数用于衔接特定的 Kafka 主题。正如下面选项中的代码片段一样,我们需求提供 Kafka 集群代理的 IP 和 Kafka 主题称号。此代码的输入是一个表,有两列:Dept 和 Age。

    结构化流媒体聚合

    可以经过 Structured Streaming 对流数据停止聚合,它可以在新事情抵达的基础上计算滚动聚合结果。这是对整个数据流的运转聚合。请参考下面的代码片段,它在整个数据流上推导出部门明智的平均年龄。

    Python 

    from pyspark.sql importSparkSession 

    from pyspark.streaming import StreamingContext 

    import pyspark.sql.functions as sf 

    spark=SparkSession.builder.master('local').appName('StructuredStreamingApp').getOrCreate() 

    df =Spark.readStream.format("kafka").option("kafka.bootstrap.servers""localhost:9092"

                                         .option("subscribe""test_topic").load() 

    df1 = df.selectExpr("CAST(value AS STRING)"

    (责任编辑:admin)