Spark-StructuredStreaming的checkpointLocation如何对接Grafana监控并提交Kafka Lag监控?

摘要:一、Spark-StructuredStreaming checkpointLocation 介绍 Structured Streaming 在 Spark 2.0 版本于 2016 年引入, 是基于 Spark SQL 引擎构建的可扩展且
一、Spark-StructuredStreaming checkpointLocation 介绍 Structured Streaming 在 Spark 2.0 版本于 2016 年引入, 是基于 Spark SQL 引擎构建的可扩展且容错的流处理引擎,对比传统的 Spark Streaming,由于复用了 Spark SQL 引擎,代码的写法和批处理 API (基于 Dataframe 和 Dataset API)一样,而且这些 API 非常的简单。 Structured Streaming 还支持使用 event time,通过设置 watermark 来处理延时到达的数据;而 Spark Streaming 只能基于 process time 做计算,显然是不够用的。 比如.withWatermark("timestamp", "10 minutes")表示用 DataFrame 里面的timestamp字段作为 event time,如果 event time 比 process time 落后超过 10 分钟,那么就不会处理这些数据。 Structured Streaming 默认情况下还是使用 micro batch 模式处理数据,不过从 Spark 2.3 开始提供了一种叫做Continuous Processing的模式,可以在至少一次语义下数据端到端只需 1ms 。 不过 Structured Streaming 的 Web UI 并没有和 Spark Streaming 一样的监控指标。 Checkpoint目录的结构: 1、checkpointLocation 在源码调用链 分析源码查看 StructuredStreaming 启动流程发现,DataStreamWriter#start 方法启动一个 StreamingQuery。 同时将 checkpointLocation配置参数传递给StreamingQuery管理。 StreamingQuery 接口实现关系如下: StreamingQueryWrapper 仅包装了一个不可序列化的StreamExecution StreamExecution 管理Spark SQL查询的执行器 MicroBatchExecution 微批处理执行器 ContinuousExecution 连续处理(流式)执行器 因此我们仅需要分析 checkpointLocation 在 StreamExecution中调用即可。 备注:StreamExecution 中 protected def checkpointFile(name: String): String 方法为所有与 checkpointLocation 有关逻辑,返回 $checkpointFile/name 路径 2、MetadataLog(元数据日志接口) spark 提供了org.apache.spark.sql.execution.streaming.MetadataLog接口用于统一处理元数据日志信息。 checkpointLocation 文件内容均使用 MetadataLog进行维护。
阅读全文