Spark Memory Management

Spark Memory

After a job is submitted, the total memory requested is (spark.driver.memory + spark.driver.memoryOverhead) + spark.executor.instances * (spark.executor.memory + spark.executor.memoryOverhead), where:

  • spark.driver.memory <=> --driver-memory, default 1G
  • spark.driver.memoryOverhead, default spark.driver.memory * MEMORY_OVERHEAD_FACTOR, with a minimum of 384 MiB; MEMORY_OVERHEAD_FACTOR = 0.1 is hard-coded in the Spark source
  • spark.executor.instances <=> --num-executors, default 2
  • spark.executor.memory <=> --executor-memory, default 1G
  • spark.executor.memoryOverhead, default spark.executor.memory * MEMORY_OVERHEAD_FACTOR, with a minimum of 384 MiB
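As a quick illustration, the sketch below (plain Scala; the object and helper names are made up, not Spark API) simply plugs the defaults above into the formula:

// Minimal sketch of the total-memory formula using the defaults listed above.
object TotalRequestedMemory extends App {
  val MEMORY_OVERHEAD_FACTOR = 0.1   // hard-coded in Spark
  val MEMORY_OVERHEAD_MIN    = 384L  // MiB

  def overhead(memoryMiB: Long): Long =
    math.max((memoryMiB * MEMORY_OVERHEAD_FACTOR).toLong, MEMORY_OVERHEAD_MIN)

  val driverMemory   = 1024L         // spark.driver.memory = 1G
  val executorMemory = 1024L         // spark.executor.memory = 1G
  val numExecutors   = 2             // spark.executor.instances = 2

  val total = (driverMemory + overhead(driverMemory)) +
    numExecutors * (executorMemory + overhead(executorMemory))

  println(s"total requested: $total MiB")  // 1408 + 2 * 1408 = 4224 MiB
}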

spark.executor.memoryOverhead: Amount of additional memory to be allocated per executor process in cluster mode, in MiB unless otherwise specified. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%). This option is currently supported on YARN and Kubernetes. Note: Additional memory includes PySpark executor memory (when spark.executor.pyspark.memory is not configured) and memory used by other non-executor processes running in the same container.

When the job is submitted to YARN via spark-submit, each Executor runs inside a YARN container, and the maximum memory it may request is bounded by yarn.scheduler.maximum-allocation-mb; if an Executor asks for more than that, the request fails with an error.

In Spark 2.4.5 and earlier, when off-heap memory is enabled, spark.yarn.executor.memoryOverhead has to include spark.memory.offHeap.size so that enough memory is requested from YARN to launch the Executor. In other words, if you set spark.memory.offHeap.size=4G, then spark.yarn.executor.memoryOverhead must be larger than 4G when submitting to YARN. Since Spark 3.0, spark.executor.memoryOverhead no longer needs to include spark.memory.offHeap.size; see Difference between spark.yarn.executor.memoryOverhead and spark.memory.offHeap.size for details.

The maximum memory size of the container running an executor is determined by the sum of spark.executor.memoryOverhead, spark.executor.memory, spark.memory.offHeap.size and spark.executor.pyspark.memory.
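For example, assuming a hypothetical executor with 4 GiB of heap, 2 GiB of off-heap memory and no PySpark memory, the per-container request under Spark 3.x works out as in the sketch below (illustrative values only; the object name is made up):

// Sketch: the memory YARN is asked for per executor container in Spark 3.x.
object ContainerRequest extends App {
  val executorMemory = 4096L                                          // spark.executor.memory = 4g (MiB)
  val memoryOverhead = math.max((executorMemory * 0.1).toLong, 384L)  // default overhead -> 409 MiB
  val offHeapSize    = 2048L                                          // spark.memory.offHeap.size = 2g
  val pysparkMemory  = 0L                                             // spark.executor.pyspark.memory not set

  val containerRequest = executorMemory + memoryOverhead + offHeapSize + pysparkMemory
  println(s"container request: $containerRequest MiB")                // 6553 MiB
  // This value must not exceed yarn.scheduler.maximum-allocation-mb, or the request fails.
}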

Starting with Spark 3.0, the memory requested by Spark can be divided into three parts:

  • On-Heap Memory: spark.executor.memory
  • Off-Heap Memory: spark.memory.offHeap.size
  • Additional memory: spark.executor.memoryOverhead, which covers extra overhead such as VM overheads, interned strings and other native overheads. If spark.executor.pyspark.memory is not set, PySpark executor memory is also taken from this portion, as is the memory used by other non-executor processes running in the same container.

On-Heap Memory

On-heap memory can be divided into three parts:

  • Spark Memory: (spark.executor.memory - 300MB) * spark.memory.fraction. This region is used by the Spark runtime itself and falls roughly into two categories, Storage and Execution; under the UnifiedMemoryManager they share this region and can borrow from each other. The exact borrowing rules are described below, so for now just keep the split in mind
    • On-Heap Storage Memory: Spark Memory * spark.memory.storageFraction, mainly used for Spark's cached data and for internal data that must be propagated across the cluster, e.g. RDD cache, broadcast data and unroll data
    • On-Heap Execution Memory: Spark Memory * (1 - spark.memory.storageFraction), mainly used for Shuffle, Join, Sort and Aggregation
  • Other/User Memory: (spark.executor.memory - 300MB) * (1 - spark.memory.fraction), reserved for user data structures, internal metadata in Spark, and safeguarding against OOM errors in the case of sparse and unusually large records
  • Reserved Memory: 300MB, hard-coded in Spark and reserved for the system, e.g. for storing Spark's internal objects. The first version of unified memory management had no fixed 300MB reservation, but with a small heap, e.g. spark.executor.memory = 1G and spark.memory.fraction = 0.75, only about 250MB was left for non-storage, non-execution use and executors could OOM at startup; the Other/User Memory calculation was therefore changed to carve out 300MB first. See SPARK-12081 for details
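As an illustration, the sketch below (plain Scala; the names are made up) simply evaluates the formulas above for spark.executor.memory = 1g with today's defaults spark.memory.fraction = 0.6 and spark.memory.storageFraction = 0.5:

// Sketch: on-heap memory regions for spark.executor.memory = 1g with default fractions.
object OnHeapBreakdown extends App {
  val executorMemory  = 1024.0        // MiB
  val reservedMemory  = 300.0         // hard-coded RESERVED_SYSTEM_MEMORY_BYTES
  val memoryFraction  = 0.6           // spark.memory.fraction
  val storageFraction = 0.5           // spark.memory.storageFraction

  val usableMemory    = executorMemory - reservedMemory        // 724 MiB
  val sparkMemory     = usableMemory * memoryFraction          // 434.4 MiB
  val storageMemory   = sparkMemory * storageFraction          // 217.2 MiB
  val executionMemory = sparkMemory - storageMemory            // 217.2 MiB
  val userMemory      = usableMemory * (1 - memoryFraction)    // 289.6 MiB

  println(f"spark=$sparkMemory%.1f storage=$storageMemory%.1f " +
          f"execution=$executionMemory%.1f user=$userMemory%.1f MiB")
}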

Two parameters are particularly important for Spark heap memory:

  • spark.memory.fraction (default 0.6, since 1.6.0): Fraction of (heap space - 300MB) used for execution and storage. The lower this is, the more frequently spills and cached data eviction occur. The purpose of this config is to set aside memory for internal metadata, user data structures, and imprecise size estimation in the case of sparse, unusually large records. Leaving this at the default value is recommended. For more detail, including important information about correctly tuning JVM garbage collection when increasing this value, see this description.
  • spark.memory.storageFraction (default 0.5, since 1.6.0): Amount of storage memory immune to eviction, expressed as a fraction of the size of the region set aside by spark.memory.fraction. The higher this is, the less working memory may be available to execution and tasks may spill to disk more often. Leaving this at the default value is recommended. For more detail, see this description.

Off-Heap Memory

For data-intensive programs running on the JVM, poor memory management can add long GC pauses, which is a considerable overhead. Since Spark 1.6, Spark has supported off-heap memory (see SPARK-11389) to reduce this impact with memory-optimized code and off-heap storage. In this mode Spark uses Java's Unsafe API to request memory directly from the operating system; off-heap memory can be allocated and released precisely, which avoids frequent GC overhead and improves performance, and the footprint of serialized data can be computed exactly, making it easier to manage than on-heap memory. The obvious downside is that the code has to manage allocation and release of this memory itself.

Off-heap memory is disabled by default and is enabled with spark.memory.offHeap.enabled=true; when it is enabled, make sure spark.memory.offHeap.size > 0. Since Spark 1.6, Spark uses the UnifiedMemoryManager instead of the StaticMemoryManager.

Compared with on-heap memory, off-heap memory consists only of Off-Heap Storage Memory and Off-Heap Execution Memory, and the split is likewise controlled by spark.memory.storageFraction. Once off-heap memory is enabled, an Executor has both on-heap and off-heap memory at the same time; then Storage Memory = On-Heap Storage Memory + Off-Heap Storage Memory, and likewise Execution Memory = On-Heap Execution Memory + Off-Heap Execution Memory.
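A minimal configuration sketch, assuming a Scala application or spark-shell session (the values are examples only; the same settings can also be passed as --conf options to spark-submit). These are static configs and must be set before the SparkContext is created:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Enable off-heap memory; spark.memory.offHeap.size must be > 0 when enabled.
val conf = new SparkConf()
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "2g")

val spark = SparkSession.builder().config(conf).getOrCreate()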

Unified Memory Management

In Spark's unified memory management, Storage and Execution share a unified region (M). When no Execution memory is in use, Storage can take all of the available memory, and vice versa. If necessary, Execution may evict memory occupied by Storage, but only until total Storage memory usage drops below a certain threshold (R). In other words, R describes a sub-region of M in which cached blocks are never evicted (although Execution may use it while it is free). Storage cannot evict memory occupied by Execution: doing so would have to account for many aspects of the shuffle process and would be complex to implement, and the files produced during a shuffle will definitely be used later, whereas data cached in memory may never be used again. See Unified Memory Management in Spark 1.6 for details.

This design ensures several desirable properties. First, applications that do not use caching can use the entire space for execution, avoiding unnecessary disk spills. Second, applications that do use caching can reserve a minimum storage space (R) where their data blocks are immune to eviction. Last, this approach provides reasonable out-of-the-box behaviour for a variety of workloads without requiring the user to understand how memory is divided internally; the configuration only needs to change when necessary.

  • M = (spark.executor.memory - 300MB) * spark.memory.fraction
  • R = M * spark.memory.storageFraction
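To make the eviction rule concrete, here is a simplified, hypothetical sketch in Scala (this is not Spark's actual UnifiedMemoryManager; the class and method names are made up) that encodes the policy described above: execution may evict cached blocks, but only down to the protected region R, and storage never evicts execution:

// Simplified model of the unified region M with the protected storage sub-region R.
final class UnifiedRegion(m: Long, r: Long) {
  private var storageUsed   = 0L
  private var executionUsed = 0L
  private def free: Long = m - storageUsed - executionUsed

  // Execution may evict cached blocks, but never pushes storage below R.
  def acquireExecution(bytes: Long): Boolean = {
    if (free < bytes) {
      val evictable = math.max(0L, storageUsed - r)  // only the part above R is evictable
      storageUsed -= math.min(evictable, bytes - free)
    }
    if (free >= bytes) { executionUsed += bytes; true } else false
  }

  // Storage may only use memory that is currently free; it never evicts execution.
  def acquireStorage(bytes: Long): Boolean =
    if (free >= bytes) { storageUsed += bytes; true } else false
}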

Note that the borrowing described above only happens between memory of the same kind: both sides must be on-heap, or both off-heap. On-heap memory that runs short cannot borrow off-heap space.

Storage Memory


Storage Memory = On-Heap Storage Memory + Off-Heap Storage Memory

  • On-Heap Storage Memory = (spark.executor.memory - 300MB) * spark.memory.fraction * spark.memory.storageFraction
  • Off-Heap Storage Memory = spark.memory.offHeap.size * spark.memory.storageFraction
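Plugging hypothetical values into these formulas (4 GiB executor heap, 2 GiB off-heap, default fractions; the object name is made up):

// Sketch: total Storage Memory when off-heap is enabled (default fractions).
object StorageMemoryTotal extends App {
  val executorMemory  = 4096.0   // spark.executor.memory = 4g (MiB)
  val offHeapSize     = 2048.0   // spark.memory.offHeap.size = 2g (MiB)
  val memoryFraction  = 0.6
  val storageFraction = 0.5

  val onHeapStorage  = (executorMemory - 300) * memoryFraction * storageFraction  // 1138.8 MiB
  val offHeapStorage = offHeapSize * storageFraction                              // 1024.0 MiB
  println(f"storage memory = ${onHeapStorage + offHeapStorage}%.1f MiB")          // 2162.8 MiB
}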

Practice

Spark JMX

$ vim $SPARK_HOME/conf/metrics.properties
# *.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink
executor.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink
# master, worker, driver, executor
executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource

# Enable the JMX port
$ vim $SPARK_HOME/conf/spark-defaults.conf
# port=0 picks a random port; with a fixed port, two Executors scheduled on the same node would conflict
spark.executor.extraJavaOptions         -Dcom.sun.management.jmxremote \
                                        -Dcom.sun.management.jmxremote.authenticate=false \
                                        -Dcom.sun.management.jmxremote.ssl=false \
                                        -Dcom.sun.management.jmxremote.port=0

Locating the JMX port

Approach 1

# Get the Application-Id
$ yarn application -list
Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):4
                Application-Id	    Application-Name	    Application-Type	      User	     Queue	             State	       Final-State	       Progress	                       Tracking-URL
application_1620462634102_0480	Bigdata - One Data Storage Metadata Extractor	               SPARK	sre.bigdata	     other	           RUNNING	         UNDEFINED	            10%	http://slave007.hadoop-shnew.data.sensetime.com:34050
# Use the Application-Id to get the ApplicationAttempt-Id
$ yarn applicationattempt -list application_1620462634102_0480
Total number of application attempts :1
         ApplicationAttempt-Id	               State	                    AM-Container-Id	                       Tracking-URL
appattempt_1620462634102_0480_000001	             RUNNING	container_e09_1620462634102_0480_01_000001	https://master001.hadoop-shnew.data.sensetime.com:8090/proxy/application_1620462634102_0480/
# Use the ApplicationAttempt-Id to list the Container-Ids
$ yarn container -list appattempt_1620462634102_0480_000001
Total number of containers :81
                  Container-Id	          Start Time	         Finish Time	               State	                Host	   Node Http Address	                            LOG-URL
container_e09_1620462634102_0480_01_000066	Wed Jun 30 15:18:16 +0800 2021	                 N/A	             RUNNING	slave020.hadoop-shnew.data.sensetime.com:45454	https://slave020.hadoop-shnew.data.sensetime.com:8044	https://slave020.hadoop-shnew.data.sensetime.com:8044/node/containerlogs/container_e09_1620462634102_0480_01_000066/sre.bigdata
...
# On the corresponding node, inspect the processes to find the container's pid
$ ps -aux | grep container_e09_1620462634102_0480_01_000066
yarn      21218  0.0  0.0   2376   600 ?        S    15:18   0:00 /opt/hadoop-2.7.7/bin/container-executor sre.bigdata sre.bigdata 1 application_1620462634102_0480 container_e09_1620462634102_0480_01_000066 /hadoop-data/nm-local-dir/usercache/sre.bigdata/appcache/application_1620462634102_0480/container_e09_1620462634102_0480_01_000066 /hadoop-data/nm-local-dir/nmPrivate/application_1620462634102_0480/container_e09_1620462634102_0480_01_000066/launch_container.sh /hadoop-data/nm-local-dir/nmPrivate/application_1620462634102_0480/container_e09_1620462634102_0480_01_000066/container_e09_1620462634102_0480_01_000066.tokens /hadoop-data/nm-local-dir/nmPrivate/application_1620462634102_0480/container_e09_1620462634102_0480_01_000066/container_e09_1620462634102_0480_01_000066.pid /hadoop-data/nm-local-dir /hadoop-data/logs/userlogs cgroups=none
sre.big+  21224  0.0  0.0   5396   856 ?        Ss   15:18   0:00 /bin/bash -c /usr/local/openjdk-8//bin/java -server -Xmx20480m '-Dcom.sun.management.jmxremote' '-Dcom.sun.management.jmxremote.authenticate=false' '-Dcom.sun.management.jmxremote.ssl=false' '-Dcom.sun.management.jmxremote.port=0' -Djava.io.tmpdir=/hadoop-data/nm-local-dir/usercache/sre.bigdata/appcache/application_1620462634102_0480/container_e09_1620462634102_0480_01_000066/tmp '-Dspark.history.ui.port=18080' '-Dspark.ui.port=0' '-Dspark.driver.port=42416' -Dspark.yarn.app.container.log.dir=/hadoop-data/logs/userlogs/application_1620462634102_0480/container_e09_1620462634102_0480_01_000066 -XX:OnOutOfMemoryError='kill %p' org.apache.spark.executor.YarnCoarseGrainedExecutorBackend --driver-url spark://[email protected]:42416 --executor-id 65 --hostname slave020.hadoop-shnew.data.sensetime.com --cores 2 --app-id application_1620462634102_0480 --resourceProfileId 0 --user-class-path file:/hadoop-data/nm-local-dir/usercache/sre.bigdata/appcache/application_1620462634102_0480/container_e09_1620462634102_0480_01_000066/__app__.jar 1>/hadoop-data/logs/userlogs/application_1620462634102_0480/container_e09_1620462634102_0480_01_000066/stdout 2>/hadoop-data/logs/userlogs/application_1620462634102_0480/container_e09_1620462634102_0480_01_000066/stderr
sre.big+  21236  163  2.2 23220828 5860980 ?    Sl   15:18 114:44 /usr/local/openjdk-8//bin/java -server -Xmx20480m -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=0 -Djava.io.tmpdir=/hadoop-data/nm-local-dir/usercache/sre.bigdata/appcache/application_1620462634102_0480/container_e09_1620462634102_0480_01_000066/tmp -Dspark.history.ui.port=18080 -Dspark.ui.port=0 -Dspark.driver.port=42416 -Dspark.yarn.app.container.log.dir=/hadoop-data/logs/userlogs/application_1620462634102_0480/container_e09_1620462634102_0480_01_000066 -XX:OnOutOfMemoryError=kill %p org.apache.spark.executor.YarnCoarseGrainedExecutorBackend --driver-url spark://[email protected]:42416 --executor-id 65 --hostname slave020.hadoop-shnew.data.sensetime.com --cores 2 --app-id application_1620462634102_0480 --resourceProfileId 0 --user-class-path file:/hadoop-data/nm-local-dir/usercache/sre.bigdata/appcache/application_1620462634102_0480/container_e09_1620462634102_0480_01_000066/__app__.jar

# List the listening ports of the process to spot the randomly assigned JMX port
$ sudo netstat -antp | grep 21224

Approach 2

// Run a small job so the closure executes on every executor; inside it, read the
// executor JVM's own remote JMX connector address and print it to the executor log.
fileRdd.foreachPartition { iterator =>
  val executorJMX = sun.management.ConnectorAddressLink
    .importRemoteFrom(0) // vmid 0 = the current JVM
    .get("sun.management.JMXConnectorServer.0.remoteAddress")
  // ====> JMX Address: service:jmx:rmi:///jndi/rmi://slave023.hadoop-shnew.data.example.com:37806/jmxrmi
  println(s"====> JMX Address: $executorJMX")
  ...
}
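Once the JMX address or port of an executor is known, a JMX client such as jconsole or VisualVM can be attached to it to inspect the executor's heap and non-heap usage (for example via the java.lang:type=Memory MBean) together with the metrics exposed by the JmxSink configured above.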

Reference

  • Unified Memory Management in Spark 1.6
  • SPARK-12081
  • SPARK-11389
  • Difference between spark.yarn.executor.memoryOverhead and spark.memory.offHeap.size
