您好,欢迎来到12图资源库!分享精神,快乐你我!我们只是素材的搬运工!!
  • 首 页
  • 当前位置:首页 > 开发 > WEB开发 >
    用Spark,Kafka和k8s构建下一代数据管道(2)
    时间:2021-08-09 08:00 来源:网络整理 作者:网络 浏览:收藏 挑错 推荐 打印

    SQL 效劳:SparkSQL 效劳是协助我们创立数据框和保存关系数据以停止进一步转换的主要元素。这是我们运用SparkSQL 时Batch层转换的入口点。在转换进程中,可以运用python、R、Scala或Java中的不同API,也可以直接执行SQL来转换数据。

    下面是一些Batch的代码示例:

    假定有两个表:一个是 PRODUCT,另一个是 TRANSACTION。PRODUCT 表包含商店特定产品的一切信息,Transaction 表包含针对每个产品的一切买卖。我们可以经过转换和聚合失掉以下信息。

    产品明智的总销售量

    分部明智的总支出

    经过在Spark数据帧上编写纯 SQL 或运用聚合函数可以取得相反的结果。

    Python 

    from pyspark.sql importSparkSession 

    from pyspark.sql.functions import * 

    Spark=SparkSession.builder.master("local").appName("Superstore").getOrCreate() 

    df1 =Spark.read.csv("Product.csv"

    df2 =Spark.read.csv("Transaction.csv"

    df3 = df1.filter(df1.Segment != 'Electric'

    df4 = df2.withColumn("OrderDate",df2.OrderDate[7:10]) 

    result_df1 = df3.join(df4, on= ['ProductCode'], how='inner'

    result_df2 = result_df1.groupBy('ProductName').sum('Quantity'

    result_df2.show() 

    # Display segment wise revenue generated 

    result_df3 = result_df1.groupBy('Segment').sum('Price'

    result_df3.show() 

    Python 

    from pyspark.sql importSparkSession 

    from pyspark.sql.functions import * 

    Spark=SparkSession.builder.master("local").appName("Superstore").getOrCreate() 

    df1 =Spark.read.csv("Product.csv"

    df2 =Spark.read.csv("Transaction.csv"

    df3 = df1.filter(df1.Segment != 'Electric'

    df4 = df2.withColumn("OrderDate",df2.OrderDate[7:10]) 

    result_df1 = df3.join(df4, on= ['ProductCode'], how='inner'

    result_df1.createOrReplaceTempView("SuperStore"

    # Display product wise quantity sold 

    result_df2 =Spark.sql("select ProductName , Sum(Quantity) from Superstore group by ProductName"

    result_df2.show() 

    # Display segment wise revenue earned 

    result_df3 =Spark.sql("select Segment , Sum(Price) from Superstore group by Segment"

    result_df2.show() 

    (责任编辑:admin)