SQL Service: The SparkSQL service is the main component that helps us create DataFrames and persist relational data for further transformation. It is the entry point for Batch-layer transformations when using SparkSQL. During a transformation you can use the APIs available in Python, R, Scala, or Java, or execute SQL directly to transform the data.
Below are some Batch-layer code examples.
Suppose there are two tables: PRODUCT and TRANSACTION. The PRODUCT table contains all the information about the store's products, and the TRANSACTION table contains every transaction recorded against each product. Through transformation and aggregation we can derive the following:
Total quantity sold per product
Total revenue per segment
The same results can be obtained either by writing plain SQL or by using aggregate functions on Spark DataFrames.
Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.master("local").appName("Superstore").getOrCreate()

# Read the source files; header=True keeps the CSV column names,
# inferSchema=True gives Quantity/Price numeric types for aggregation
df1 = spark.read.csv("Product.csv", header=True, inferSchema=True)
df2 = spark.read.csv("Transaction.csv", header=True, inferSchema=True)

# Drop products belonging to the 'Electric' segment
df3 = df1.filter(df1.Segment != 'Electric')

# Keep a substring of OrderDate (Column slicing maps to substr(start, length))
df4 = df2.withColumn("OrderDate", df2.OrderDate[7:10])

result_df1 = df3.join(df4, on=['ProductCode'], how='inner')

# Display product wise quantity sold
result_df2 = result_df1.groupBy('ProductName').sum('Quantity')
result_df2.show()

# Display segment wise revenue generated
result_df3 = result_df1.groupBy('Segment').sum('Price')
result_df3.show()
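To make clear what `groupBy(...).sum(...)` computes, here is a minimal plain-Python sketch of the same aggregation, with no Spark required. The sample rows, column names, and the `group_sum` helper are invented for illustration only:

```python
from collections import defaultdict

# Hypothetical rows mimicking the joined Product/Transaction data
rows = [
    {"ProductName": "Desk", "Segment": "Furniture", "Quantity": 2, "Price": 150.0},
    {"ProductName": "Desk", "Segment": "Furniture", "Quantity": 1, "Price": 75.0},
    {"ProductName": "Pen",  "Segment": "Office",    "Quantity": 10, "Price": 12.5},
]

def group_sum(rows, key, value):
    """Rough equivalent of df.groupBy(key).sum(value)."""
    totals = defaultdict(float)
    for row in rows:
        totals[row[key]] += row[value]
    return dict(totals)

qty_by_product = group_sum(rows, "ProductName", "Quantity")
revenue_by_segment = group_sum(rows, "Segment", "Price")
print(qty_by_product)        # total quantity per product
print(revenue_by_segment)    # total revenue per segment
```

Spark performs the same per-key summation, but distributes the rows across partitions and merges the partial sums.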
Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.master("local").appName("Superstore").getOrCreate()

df1 = spark.read.csv("Product.csv", header=True, inferSchema=True)
df2 = spark.read.csv("Transaction.csv", header=True, inferSchema=True)

df3 = df1.filter(df1.Segment != 'Electric')
df4 = df2.withColumn("OrderDate", df2.OrderDate[7:10])

result_df1 = df3.join(df4, on=['ProductCode'], how='inner')

# Register the joined DataFrame as a temp view so it can be queried with SQL
result_df1.createOrReplaceTempView("SuperStore")

# Display product wise quantity sold
result_df2 = spark.sql("select ProductName, sum(Quantity) from SuperStore group by ProductName")
result_df2.show()

# Display segment wise revenue earned
result_df3 = spark.sql("select Segment, sum(Price) from SuperStore group by Segment")
result_df3.show()
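The temp-view approach simply runs GROUP BY queries through a SQL engine. As a self-contained illustration that needs no Spark cluster, the equivalent queries can be run against Python's built-in sqlite3 on invented sample data (the table contents below are hypothetical):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE SuperStore (ProductName TEXT, Segment TEXT, Quantity INTEGER, Price REAL)"
)
conn.executemany(
    "INSERT INTO SuperStore VALUES (?, ?, ?, ?)",
    [
        ("Desk", "Furniture", 2, 150.0),  # hypothetical sample rows
        ("Desk", "Furniture", 1, 75.0),
        ("Pen",  "Office",    10, 12.5),
    ],
)

# Same shape as the Spark SQL queries against the temp view
qty_by_product = conn.execute(
    "SELECT ProductName, SUM(Quantity) FROM SuperStore GROUP BY ProductName"
).fetchall()
revenue_by_segment = conn.execute(
    "SELECT Segment, SUM(Price) FROM SuperStore GROUP BY Segment"
).fetchall()
print(qty_by_product)
print(revenue_by_segment)
```

The SQL is portable between the two engines here; in Spark the view is distributed and lazily evaluated, while sqlite3 executes eagerly on local data.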