在这两种状况下,第一个数据是从两个不同的来源加载的,并且产品数据针对一切非电气产品停止过滤。买卖数据依据订单日期的某种格式停止更改。然后,将两个数据帧衔接起来,并生成该超市中细分支出和产品销售数量的结果。
当然,这是加载、验证、转换和聚合的复杂示例。运用SparkSQL 可以停止更复杂的操作。要了解有关SparkSQL 效劳的更多信息,请参阅此处的文档。
Sparkfor Speed 层SparkStreaming 是一个库,用于中心Spark框架之上。它确保实时数据流处置的可扩展性、高吞吐量和容错性。
图 5:SparkStreaming 架构(来源:https : //spark.apache.org)
如上图所示,Spark将输入数据流转换为批量输入数据。这种团圆Batch有两种完成方式:a) Dstreams 或团圆化流和 b) 结构化流。前者十分受欢迎,直到后者作为更初级的版本出现。但是,Dstream 还没有完全过时,为了残缺起见,将其保留在本文中。
· Discretized Streams:这提供了对火花流库的笼统。它是 RDD 的集合,代表一个延续的数据流。它将数据团圆成小批量并运转小作业来处置这些小批量。义务依据数据的位置分配给任务节点。因此,经过 Dstream 的这个概念,Spark可以并行读取数据,执行小批量处置流并确保流处置的有效节点分配。
· 结构化流:这是运用Spark引擎的最先进和现代的流处置办法。它与SparkDataframe API(在下面的Batch部分中讨论)很好地集成在一同,用于对流数据的各种操作。结构化流可以增量和延续地处置数据。基于特定窗口和水印的近实时聚合也是能够的。
Spark结构化流可以处置不同的流处置用例,如下面的示例所示:
复杂的结构化流媒体复杂的结构化流只会转换和加载来自流的数据,并且不包括特定时间范围内的任何聚合。例如,系统从 Apache Kafka 获取数据,并经过Spark流和SparkSQL 近乎实时地对其停止转换(请参阅下面的代码片段)。
Python
from pyspark.sql importSparkSession
from pyspark.streaming import StreamingContext
import pyspark.sql.functions as sf
spark=SparkSession.builder.master('local').appName('StructuredStreamingApp').getOrCreate()
df =Spark.readStream.format("kafka").option("kafka.bootstrap.servers,"localhost:9092")
.option("subscribe", "test_topic").load()
df1 = df.selectExpr("CAST(value AS STRING)")
df2 = df1.selectExpr("split(value, ',')[0] as Dept","split(value, ',')[1] as Age")
df2.show()
SparkSession 对象的ReadStream函数用于衔接特定的 Kafka 主题。正如下面选项中的代码片段一样,我们需求提供 Kafka 集群代理的 IP 和 Kafka 主题称号。此代码的输入是一个表,有两列:Dept 和 Age。
结构化流媒体聚合可以经过 Structured Streaming 对流数据停止聚合,它可以在新事情抵达的基础上计算滚动聚合结果。这是对整个数据流的运转聚合。请参考下面的代码片段,它在整个数据流上推导出部门明智的平均年龄。
Python
from pyspark.sql importSparkSession
from pyspark.streaming import StreamingContext
import pyspark.sql.functions as sf
spark=SparkSession.builder.master('local').appName('StructuredStreamingApp').getOrCreate()
df =Spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test_topic").load()
df1 = df.selectExpr("CAST(value AS STRING)")
(责任编辑:admin)