df_final = df4.groupBy("Dept",sf.window("eventTime","10 minutes", "5 minute")).count()
df_final.show()
带水印和堆叠窗口的聚合数据迟到会在近实时系统的聚合中产生成绩。我们可以运用堆叠窗口来处置这个错误。但成绩是:系统等候迟到的数据需求多长时间?这可以经过水印处置。经过这种办法,我们在堆叠窗口之上定义了一个特定的时间段。之后,系统丢弃该事情。
Python
from pyspark.sql importSparkSession
from pyspark.streaming import StreamingContext
import pyspark.sql.functions as sf
import datetime
import time
spark=SparkSession.builder.master('local').appName('StructuredStreamingApp').getOrCreate()
df =Spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test_topic").load()
df1 = df.selectExpr("CAST(value AS STRING)")
df2 = df1.selectExpr("split(value, ',')[0] as Dept","split(value, ',')[1] as Age")
df3 = df2.withColumn("Age", df2.Age.cast('int'))
df4 = df3.withColumn("eventTime",sf.current_timestamp())
df_final = df4.withWatermark("eventTime","10 Minutes").groupBy("Dept",sf.window("eventTime","10 minutes", "5 minute")).count()
df_final.show()
下面的代码表示关于延迟事情,10 分钟后,旧窗口结果将不会更新。
Kafka + k8s - Speed层的另一种处置方案托管在 Kubernetes 集群上的 Pod 构成了 Kafka 流的消费者组,是另一种近乎实时数据处置的办法。经过运用这种组合,我们可以轻松取得散布式计算的优势。
图 6:经过 Kafka + Kubernetes 完成的Speed层示例
在下面例子中的事情驱动系统中,数据正在从 Kafka 主题加载到基于 Python 的处置单元中。假设 Kafka 集群中的分区数量与 Pod 的复制因子婚配,则 Pod 一同组成一个消费者组,音讯被无缝消费。
这是构建散布式数据处置系统的经典示例,仅运用Kafka+k8s组合即可确保并行处置。
运用 Python 创立 Kafka 消费者的两个十分盛行的库是:
Python_Kafka 库
Confluent_Kafka 库
Python_Kafka
Python
from kafka import KafkaConsumer
consumer = KafkaConsumer(TopicName,
bootstrap_servers= <broker-list>,
group_id=<GroupName>,
enable_auto_commit=True,
auto_offset_reset='earliest')
consumer.poll()
Confluent_Kafka
Python
from confluent_kafka import Consumer
consumer = Consumer({'bootstrap.servers': <broker-list>,
(责任编辑:admin)