您好,欢迎来到12图资源库!分享精神,快乐你我!我们只是素材的搬运工!!
  • 首 页
  • 当前位置:首页 > 开发 > WEB开发 >
    用Spark,Kafka和k8s构建下一代数据管道(5)
    时间:2021-08-09 08:00 来源:网络整理 作者:网络 浏览:收藏 挑错 推荐 打印

    df_final = df4.groupBy("Dept",sf.window("eventTime","10 minutes""5 minute")).count() 

    df_final.show() 

    带水印和堆叠窗口的聚合

    数据迟到会在近实时系统的聚合中产生成绩。我们可以运用堆叠窗口来处置这个错误。但成绩是:系统等候迟到的数据需求多长时间?这可以经过水印处置。经过这种办法,我们在堆叠窗口之上定义了一个特定的时间段。之后,系统丢弃该事情。

    Python 

    from pyspark.sql importSparkSession 

    from pyspark.streaming import StreamingContext 

    import pyspark.sql.functions as sf 

    import datetime 

    import time 

    spark=SparkSession.builder.master('local').appName('StructuredStreamingApp').getOrCreate() 

    df =Spark.readStream.format("kafka").option("kafka.bootstrap.servers""localhost:9092"

                                         .option("subscribe""test_topic").load() 

    df1 = df.selectExpr("CAST(value AS STRING)"

    df2 = df1.selectExpr("split(value, ',')[0] as Dept","split(value, ',')[1] as Age"

    df3 = df2.withColumn("Age", df2.Age.cast('int')) 

    df4 = df3.withColumn("eventTime",sf.current_timestamp()) 

    df_final = df4.withWatermark("eventTime","10 Minutes").groupBy("Dept",sf.window("eventTime","10 minutes""5 minute")).count() 

    df_final.show() 

    下面的代码表示关于延迟事情,10 分钟后,旧窗口结果将不会更新。

    Kafka + k8s - Speed层的另一种处置方案

    托管在 Kubernetes 集群上的 Pod 构成了 Kafka 流的消费者组,是另一种近乎实时数据处置的办法。经过运用这种组合,我们可以轻松取得散布式计算的优势。

    用Spark,Kafka和k8s构建下一代数据管道

    图 6:经过 Kafka + Kubernetes 完成的Speed层示例

    在下面例子中的事情驱动系统中,数据正在从 Kafka 主题加载到基于 Python 的处置单元中。假设 Kafka 集群中的分区数量与 Pod 的复制因子婚配,则 Pod 一同组成一个消费者组,音讯被无缝消费。

    这是构建散布式数据处置系统的经典示例,仅运用Kafka+k8s组合即可确保并行处置。

    运用 Python 创立 Kafka 消费者的两个十分盛行的库是:

    Python_Kafka 库

    Confluent_Kafka 库 

    Python_Kafka 

    Python 

    from kafka import KafkaConsumer 

    consumer = KafkaConsumer(TopicName, 

                             bootstrap_servers= <broker-list>, 

                             group_id=<GroupName>, 

                             enable_auto_commit=True

                             auto_offset_reset='earliest'

    consumer.poll() 

    Confluent_Kafka  

    Python 

    from confluent_kafka import Consumer 

    consumer = Consumer({'bootstrap.servers': <broker-list>, 

    (责任编辑:admin)