Spark 集群监控
Spark UI StandAlone模式监控管理界面的入口:http://master:8080,可以在$SPARK_HOME/conf/spark-env.sh
中修改参数SPARK_MASTER_PORT
或者SPARK_MASTER_WEBUI_PORT
来改变端口号。
除此之外,Spark会给每一个SparkContext启动一个默认的Web界面,默认端口是4040,该界面显示了关于程序运行情况的信息,包括:
- Stages 和 Task列表
- RDD 的信息统计和内存情况
- 环境变量
- 正在运行Executors的相关信息
访问地址:http://driver:4040。如果同时运行多个SparkContext,则端口会顺延到4041、4042…默认情况下,这些信息只能在程序运行期间被访问。程序运行结束后仍要访问的话,需要将APP运行期间的事件日志保存下来,并且配置 Spark-History-Server
需要注意的是APP运行期间的事件日志,即eventLog
和用户输出的日志并不是同一种东西。
对于用户输出的日志信息可以在监控管理页面的Executor中查看,日志文件默认存储在每个节点$SPARK_HOME/work/
目录下,各个节点下每个程序的日志文件名字相同。另外需要注意的是,各个节点存储的日志信息是分发到各个节点的任务执行时的信息,即在map
等算子中执行的日志输出信息。而在client
执行的日志输出信息不会被存储到里面,它默认在控制台输出,可以通过修改$SPARK_HOME/conf/log4j.properties
,追加文件输出的appender
。
Spark 集群历史监控
当我们执行stop-all.sh
关闭集群,在重启集群之后,Web页面的任务信息会被清空。为了防止这种情况,我们可以配置 Spark-History-Sever,这样还可以查看已经运行结束的任务的事件信息。
官方配置说明:http://spark.apache.org/docs/latest/monitoring.html
通过SparkUI监控日志,能更直观的查询任务情况,如内存占用、响应时间、完成情况。当Spark运行在集群,如YARN或者Mesos中,spark-history-sever仍然可以满足我们的需求。
首先进行如下配置,将Spark任务运行期间的事件日志保存下来。
vim $SPARK_HOME/conf/spark-default.conf
# 这样Spark就会将程序运行情况信息编码并以文件的形式持久化,打开会发现是json格式的信息。
spark.eventLog.enabled true
# 用来指定SparkAPP运行情况信息日志的目录。注意是程序运行情况信息,并不是用户输出的日志信息。
spark.eventLog.dir hdfs://s121202:8020/sparkLogs
# 是否压缩记录Spark事件,默认值是snappy
spark.eventLog.compress true
在启用Spark-History-Server之前,确保这个目录已经被创建,否则运行./start-history-server.sh
的时候会报错。
# 提前在hdfs中创建文件夹
su - hdfs
hadoop fs -mkdir /sparkLogs
除此之外,SPARK_PID_DIR
的默认路径是/tmp/spark-events
,如果此路径不存在,在启用的时候也会报错,信息如下。
# 启用spark-history-server出现下面的错误
starting org.apache.spark.deploy.history.HistoryServer, logging to /opt/neu/spark-2.1.1-bin-hadoop2.6/logs/spark-neu-org.apache.spark.deploy.history.HistoryServer-1-s121202.out
failed to launch: nice -n 0 /opt/neu/spark-2.1.1-bin-hadoop2.6/bin/spark-class org.apache.spark.deploy.history.HistoryServer
at org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$startPolling(FsHistoryProvider.scala:204)
... 9 more
full log in /opt/neu/spark-2.1.1-bin-hadoop2.6/logs/spark-neu-org.apache.spark.deploy.history.HistoryServer-1-s121202.out
# 查看错误日志
tail -500f /opt/neu/spark-2.1.1-bin-hadoop2.6/logs/spark-neu-org.apache.spark.deploy.history.HistoryServer-1-s121202.out
# 完整报错信息如下:
Spark Command: /usr/java/latest/bin/java -cp /opt/neu/spark-2.1.1-bin-hadoop2.6/conf/:/opt/neu/spark-2.1.1-bin-hadoop2.6/jars/*:/etc/hadoop/conf/ -Xmx1g org.apache.spark.deploy.history.HistoryServer
========================================
17/07/06 16:03:00 INFO history.HistoryServer: Started daemon with process name: [email protected]
17/07/06 16:03:00 INFO util.SignalUtils: Registered signal handler for TERM
17/07/06 16:03:00 INFO util.SignalUtils: Registered signal handler for HUP
17/07/06 16:03:00 INFO util.SignalUtils: Registered signal handler for INT
17/07/06 16:03:01 INFO spark.SecurityManager: Changing view acls to: neu
17/07/06 16:03:01 INFO spark.SecurityManager: Changing modify acls to: neu
17/07/06 16:03:01 INFO spark.SecurityManager: Changing view acls groups to:
17/07/06 16:03:01 INFO spark.SecurityManager: Changing modify acls groups to:
17/07/06 16:03:01 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(neu); groups with view permissions: Set(); users with modify permissions: Set(neu); groups with modify permissions: Set()
17/07/06 16:03:01 INFO history.FsHistoryProvider: History server ui acls disabled; users with admin permissions: ; groups with admin permissions
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.spark.deploy.history.HistoryServer$.main(HistoryServer.scala:278)
at org.apache.spark.deploy.history.HistoryServer.main(HistoryServer.scala)
Caused by: java.io.FileNotFoundException: Log directory specified does not exist: file:/tmp/spark-events Did you configure the correct one through spark.history.fs.logDirectory?
at org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$startPolling(FsHistoryProvider.scala:214)
at org.apache.spark.deploy.history.FsHistoryProvider.initialize(FsHistoryProvider.scala:160)
at org.apache.spark.deploy.history.FsHistoryProvider.<init>(FsHistoryProvider.scala:156)
at org.apache.spark.deploy.history.FsHistoryProvider.<init>(FsHistoryProvider.scala:77)
... 6 more
Caused by: java.io.FileNotFoundException: File file:/tmp/spark-events does not exist
at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:537)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:750)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:527)
at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409)
at org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$startPolling(FsHistoryProvider.scala:204)
... 9 more
SparkAPP的事件日志信息所在的目录可以在执行启动脚本的时候通过传参来指定,例如./start-history-server.sh hdfs://s121202:8020/sparkLogs
,也可以通过spark.history.fs.logDirectory
参数来指定,指定方式如下。
也可以配置在$SPARK_HOME/conf/spark-env.sh
中:
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=6666 -Dspark.history.retainedApplications=3 -Dspark.history.fs.logDirectory=hdfs://s121202:8020/sparkLogs"
# -Dspark.history.ui.port=6666 指定WebUI访问端口,端口号必须是1024~65535之间的数,或者指定0,表示随机取一个空闲的port,在启动日志信息中可以看到它。
# -Dspark.history.retainedApplications=3 指定保存Application历史记录个数
# -Dspark.history.fs.logDirectory=hdfs://s121202:8020/sparkLogs 指定包含要加载的应用程序事件日志的目录URL,也可以是本地文件路径file://
spark.eventLog.dir
和spark.history.fs.logDirectory
的区别:
spark.eventLog.dir用来指定SparkAPP运行时生成事件日志的目录,spark.history.fs.logDirectory是spark-history-server发现日志事件的位置
启动 Spark-History-Server:
# 进入sbin目录下
cd $SPARK_HOME/sbin
# 执行启动脚本
./start-history-server.sh
spark-history-server的默认端口是18080
,启动完成之后,通过地址:http://localhost:18080
访问
Task Failure Monitor
在 Spark 程序中,task
会根据spark.task.maxFailures(default 4)
失败后进行会先进行重试,而不是让整个 Spark App 死掉,只有重试次数超过阈值的时候才会杀死 App。另外,如果是 Spark on YARN,那么程序还会受 YARN 的重试机制尝试重启 Spark App,通过参数yarn.resourcemanager.am.max-attempts(default 2)
控制。
Catch Task Failure Event
- 在
Executor
中,不管task
成功与否都会向execBackend
报告task
的状态execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
CoarseGrainedExecutorBackend
会向driver
发送StatusUpdate
状态变更信息override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) { val msg = StatusUpdate(executorId, taskId, state, data) driver match { case Some(driverRef) => driverRef.send(msg) case None => logWarning(s"Drop $msg because has not yet connected to driver") } }
CoarseGrainedSchedulerBackend
收到信息后调用scheduler.statusUpdate()
```scala // 1 override def receive: PartialFunction[Any, Unit] = { case StatusUpdate(executorId, taskId, state, data) => scheduler.statusUpdate(taskId, state, data.value) …..
// 2 taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData) // 3 scheduler.handleFailedTask(taskSetManager, tid, taskState, reason) // 4 taskSetManager.handleFailedTask(tid, taskState, reason) // 5 sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info) // 6 eventProcessLoop.post(CompletionEvent(task, reason, result, accumUpdates, taskInfo)) ```