Spark Monitor

Spark Cluster Monitoring

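Spark UI: in standalone mode, the monitoring and management UI is available at http://master:8080. The web UI port can be changed by setting SPARK_MASTER_WEBUI_PORT in $SPARK_HOME/conf/spark-env.sh. SPARK_MASTER_PORT changes the master's service port (7077 by default), not the UI port.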
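A minimal sketch of such a port change follows. It writes to a scratch file instead of a real $SPARK_HOME/conf/spark-env.sh, and the port values 7078 and 8081 are arbitrary examples, not recommendations.

```shell
# Scratch copy so the sketch does not touch a real installation;
# on a cluster the target would be $SPARK_HOME/conf/spark-env.sh.
ENV_FILE="$(mktemp -d)/spark-env.sh"

cat >> "$ENV_FILE" <<'EOF'
# Port the master listens on for workers and applications (default 7077)
export SPARK_MASTER_PORT=7078
# Port of the master web UI (default 8080)
export SPARK_MASTER_WEBUI_PORT=8081
EOF

# Load the file, then print the resulting UI address
. "$ENV_FILE"
echo "Master web UI: http://master:${SPARK_MASTER_WEBUI_PORT}"
```

The master has to be restarted before new port settings take effect.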

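In addition, Spark starts a web UI for every SparkContext, on port 4040 by default. It shows information about the running application, including:

  • the list of stages and tasks
  • RDD statistics and memory usage
  • environment settings
  • information about the running executors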

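It is available at http://driver:4040. If several SparkContexts run at the same time, the ports are assigned in sequence: 4041, 4042, and so on. By default this information is only accessible while the application is running. To view it after the application has finished, the event log written during the run must be persisted and the Spark History Server must be configured.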

Note that the event log (eventLog) written while an application runs is not the same thing as the log output produced by user code.

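User log output can be viewed on the Executors page of the monitoring UI. The log files are stored by default under $SPARK_HOME/work/ on each node, and an application's log files have the same names on every node. Each node only stores the output of the tasks dispatched to it, that is, log statements executed inside operators such as map. Log output produced on the client is not stored there; by default it goes to the console. To write it to a file as well, add a file appender in $SPARK_HOME/conf/log4j.properties.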
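As a rough illustration of such a file appender, the sketch below writes log4j 1.x settings (the format used by Spark 2.x) into a scratch file rather than the real log4j.properties. The appender name `file`, the path /var/log/spark/driver.log, and the size limits are assumptions chosen for the example. Spark releases that ship Log4j 2 use a different syntax.

```shell
# Scratch file; on a real client the target would be
# $SPARK_HOME/conf/log4j.properties.
LOG4J_FILE="$(mktemp)"

cat > "$LOG4J_FILE" <<'EOF'
# Send log output to both the console and the file appender defined below.
# The "console" appender itself is assumed to be defined as in Spark's template.
log4j.rootCategory=INFO, console, file

# Rolling file appender (log4j 1.x); path and limits are hypothetical
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=/var/log/spark/driver.log
log4j.appender.file.MaxFileSize=50MB
log4j.appender.file.MaxBackupIndex=5
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
EOF

# Show the generated settings
cat "$LOG4J_FILE"
```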

Spark Cluster History Monitoring

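After the cluster is shut down with stop-all.sh and restarted, the job information on the web UI is gone. To avoid this, configure the Spark History Server, which also makes the event information of finished jobs available.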

Official configuration reference: http://spark.apache.org/docs/latest/monitoring.html

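Monitoring through the Spark UI gives a more direct view of how jobs are doing: memory usage, response time, and completion status. The Spark History Server also works when Spark runs on a cluster manager such as YARN or Mesos.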

First, configure Spark as follows so that the event log written while jobs run is persisted.

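vim $SPARK_HOME/conf/spark-defaults.conf

# Makes Spark encode information about the application's run and persist it as files; the contents are JSON.
spark.eventLog.enabled        true
# Directory for the application's event logs. This is run information, not the log output of user code.
spark.eventLog.dir            hdfs://s121202:8020/sparkLogs
# Whether to compress logged events (default: false). When enabled, the codec depends on the Spark version and on spark.io.compression.codec.
spark.eventLog.compress       true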

Before starting the Spark History Server, make sure this directory exists; otherwise ./start-history-server.sh fails.

# Create the directory in HDFS beforehand
su - hdfs
hadoop fs -mkdir /sparkLogs

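In addition, spark.history.fs.logDirectory defaults to file:/tmp/spark-events. If that path does not exist and no other directory is configured, startup fails with the following error.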

# Starting the Spark History Server produces the following error
starting org.apache.spark.deploy.history.HistoryServer, logging to /opt/neu/spark-2.1.1-bin-hadoop2.6/logs/spark-neu-org.apache.spark.deploy.history.HistoryServer-1-s121202.out
failed to launch: nice -n 0 /opt/neu/spark-2.1.1-bin-hadoop2.6/bin/spark-class org.apache.spark.deploy.history.HistoryServer
  	at org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$startPolling(FsHistoryProvider.scala:204)
  	... 9 more
full log in /opt/neu/spark-2.1.1-bin-hadoop2.6/logs/spark-neu-org.apache.spark.deploy.history.HistoryServer-1-s121202.out

# Inspect the error log
tail -500f /opt/neu/spark-2.1.1-bin-hadoop2.6/logs/spark-neu-org.apache.spark.deploy.history.HistoryServer-1-s121202.out
# The full error output is:
Spark Command: /usr/java/latest/bin/java -cp /opt/neu/spark-2.1.1-bin-hadoop2.6/conf/:/opt/neu/spark-2.1.1-bin-hadoop2.6/jars/*:/etc/hadoop/conf/ -Xmx1g org.apache.spark.deploy.history.HistoryServer
========================================
17/07/06 16:03:00 INFO history.HistoryServer: Started daemon with process name: [email protected]
17/07/06 16:03:00 INFO util.SignalUtils: Registered signal handler for TERM
17/07/06 16:03:00 INFO util.SignalUtils: Registered signal handler for HUP
17/07/06 16:03:00 INFO util.SignalUtils: Registered signal handler for INT
17/07/06 16:03:01 INFO spark.SecurityManager: Changing view acls to: neu
17/07/06 16:03:01 INFO spark.SecurityManager: Changing modify acls to: neu
17/07/06 16:03:01 INFO spark.SecurityManager: Changing view acls groups to: 
17/07/06 16:03:01 INFO spark.SecurityManager: Changing modify acls groups to: 
17/07/06 16:03:01 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(neu); groups with view permissions: Set(); users  with modify permissions: Set(neu); groups with modify permissions: Set()
17/07/06 16:03:01 INFO history.FsHistoryProvider: History server ui acls disabled; users with admin permissions: ; groups with admin permissions
Exception in thread "main" java.lang.reflect.InvocationTargetException
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at org.apache.spark.deploy.history.HistoryServer$.main(HistoryServer.scala:278)
	at org.apache.spark.deploy.history.HistoryServer.main(HistoryServer.scala)
Caused by: java.io.FileNotFoundException: Log directory specified does not exist: file:/tmp/spark-events Did you configure the correct one through spark.history.fs.logDirectory?
	at org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$startPolling(FsHistoryProvider.scala:214)
	at org.apache.spark.deploy.history.FsHistoryProvider.initialize(FsHistoryProvider.scala:160)
	at org.apache.spark.deploy.history.FsHistoryProvider.<init>(FsHistoryProvider.scala:156)
	at org.apache.spark.deploy.history.FsHistoryProvider.<init>(FsHistoryProvider.scala:77)
	... 6 more
Caused by: java.io.FileNotFoundException: File file:/tmp/spark-events does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:537)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:750)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:527)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409)
	at org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$startPolling(FsHistoryProvider.scala:204)
	... 9 more
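The stack trace shows the History Server falling back to file:/tmp/spark-events because no log directory was configured. A small check along these lines can reveal which directory would be used before the server is started. It is only a sketch: the function name `history_log_dir` is hypothetical, the demo uses a scratch file instead of the real spark-defaults.conf, and it assumes whitespace-separated "key value" lines. Settings passed through SPARK_HISTORY_OPTS are not considered.

```shell
# Print the event-log directory the History Server would read,
# given the path to a spark-defaults.conf file.
history_log_dir() {
  conf_file="$1"
  dir=""
  if [ -f "$conf_file" ]; then
    # Take the last occurrence of the property, if any
    dir="$(awk '$1 == "spark.history.fs.logDirectory" { print $2 }' "$conf_file" | tail -n 1)"
  fi
  # Fall back to Spark's built-in default when the property is not set
  echo "${dir:-file:/tmp/spark-events}"
}

# Demo with a scratch config file; a real check would pass
# $SPARK_HOME/conf/spark-defaults.conf instead.
demo_conf="$(mktemp)"
printf '%s\n' \
  'spark.eventLog.enabled            true' \
  'spark.history.fs.logDirectory     hdfs://s121202:8020/sparkLogs' > "$demo_conf"

history_log_dir "$demo_conf"
history_log_dir /nonexistent/spark-defaults.conf
```

If the function prints file:/tmp/spark-events, either create that directory (mkdir -p /tmp/spark-events) or set the property to the directory used for spark.eventLog.dir.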

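The directory holding the applications' event logs can be passed as an argument to the start script, for example ./start-history-server.sh hdfs://s121202:8020/sparkLogs, or set through the spark.history.fs.logDirectory property. Passing the directory on the command line has been deprecated since Spark 1.1.0, so the property is the safer choice.

The property can be set in $SPARK_HOME/conf/spark-env.sh: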

export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=6666 -Dspark.history.retainedApplications=3 -Dspark.history.fs.logDirectory=hdfs://s121202:8020/sparkLogs"
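# -Dspark.history.ui.port=6666 sets the web UI port. The port must be between 1024 and 65535, or 0 to pick a random free port, which is then shown in the startup log.
# -Dspark.history.retainedApplications=3 sets how many applications keep their UI data in the cache; older ones are evicted and reloaded from disk when accessed again.
# -Dspark.history.fs.logDirectory=hdfs://s121202:8020/sparkLogs sets the URL of the directory containing the event logs to load; a local file:// path also works.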

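Difference between spark.eventLog.dir and spark.history.fs.logDirectory:

spark.eventLog.dir is the directory where a running Spark application writes its event log; spark.history.fs.logDirectory is where the Spark History Server looks for event logs. For finished applications to show up, both must point to the same location.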
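A quick way to catch a mismatch between the two properties is to compare them. The sketch below does this against a scratch copy of the settings used in this post; the helper name `conf_value` is hypothetical, and whitespace-separated "key value" lines are assumed.

```shell
# Scratch copy of the relevant spark-defaults.conf entries
DEFAULTS_FILE="$(mktemp)"
cat > "$DEFAULTS_FILE" <<'EOF'
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs://s121202:8020/sparkLogs
spark.history.fs.logDirectory    hdfs://s121202:8020/sparkLogs
EOF

# Helper: print the value of one property from the scratch file
conf_value() {
  awk -v key="$1" '$1 == key { print $2 }' "$DEFAULTS_FILE"
}

WRITE_DIR="$(conf_value spark.eventLog.dir)"             # written by applications
READ_DIR="$(conf_value spark.history.fs.logDirectory)"   # read by the History Server

if [ "$WRITE_DIR" = "$READ_DIR" ]; then
  echo "OK: applications write to and the History Server reads from $WRITE_DIR"
else
  echo "Mismatch: applications write to $WRITE_DIR but the History Server reads $READ_DIR"
fi
```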

Start the Spark History Server:

    # Change to the sbin directory
    cd $SPARK_HOME/sbin
    # Run the start script
    ./start-history-server.sh

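The Spark History Server listens on port 18080 by default. Once it is running, open http://localhost:18080 (or the port set through spark.history.ui.port, if it was changed).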

Task Failure Monitor

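In a Spark application, a failed task is retried first instead of bringing down the whole application. spark.task.maxFailures (default 4) sets how many times a single task may fail; the application is killed only when that limit is reached. With Spark on YARN, the application is additionally subject to YARN's retry mechanism, which tries to restart the Spark application; this is controlled by yarn.resourcemanager.am.max-attempts (default 2).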
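The retry rule can be pictured with a small local analogy. This is not Spark code: `run_with_max_failures` and `flaky_task` are hypothetical names, and the function only mimics the idea that a task is attempted again until its failure count reaches the limit.

```shell
# Run a command until it succeeds or has failed max_failures times.
run_with_max_failures() {
  max_failures="$1"; shift
  failures=0
  while [ "$failures" -lt "$max_failures" ]; do
    if "$@"; then
      echo "succeeded after $failures failure(s)"
      return 0
    fi
    failures=$((failures + 1))
    echo "attempt failed ($failures/$max_failures)"
  done
  echo "giving up: failure limit reached"
  return 1
}

# Hypothetical flaky task: fails on the first two calls, then succeeds
calls=0
flaky_task() { calls=$((calls + 1)); [ "$calls" -ge 3 ]; }

# Limit of 4, matching the default of spark.task.maxFailures
run_with_max_failures 4 flaky_task
```

With a limit of 4, a task is attempted at most four times, which corresponds to three retries after the first failure.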

Catch Task Failure Event

  • In the Executor, every task reports its state to execBackend, whether it succeeded or not
    execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
    
  • CoarseGrainedExecutorBackend sends a StatusUpdate message to the driver
    override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
      val msg = StatusUpdate(executorId, taskId, state, data)
      driver match {
        case Some(driverRef) => driverRef.send(msg)
        case None => logWarning(s"Drop $msg because has not yet connected to driver")
      }
    }
    
  • On receiving the message, CoarseGrainedSchedulerBackend calls scheduler.statusUpdate(); a failed task is then passed along the call chain numbered 1 to 6
    ```scala
    // 1
    override def receive: PartialFunction[Any, Unit] = {
      case StatusUpdate(executorId, taskId, state, data) =>
        scheduler.statusUpdate(taskId, state, data.value)
        // …..
    }

    // 2
    taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
    // 3
    scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
    // 4
    taskSetManager.handleFailedTask(tid, taskState, reason)
    // 5
    sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info)
    // 6
    eventProcessLoop.post(CompletionEvent(task, reason, result, accumUpdates, taskInfo))
    ```
