Spark Memory Management
Memory in an Executor is managed centrally by a MemoryManager. When a Task is assigned to an Executor, a TaskMemoryManager is created for that Task. The TaskMemoryManager only tracks the memory usage of its own Task; the actual requesting, allocating, and freeing of memory is delegated to the MemoryManager implementation, which in turn uses a MemoryAllocator to perform the physical allocation. How many TaskMemoryManagers an Executor has live at any moment is determined by executor-cores, since each concurrently running Task gets its own.
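A rough sketch of this delegation chain (illustrative only: these are private[spark] internals, and taskAttemptId is a made-up value here; normally the scheduler supplies it):
import org.apache.spark.SparkEnv
import org.apache.spark.memory.TaskMemoryManager

// One executor-wide MemoryManager lives in SparkEnv...
val memoryManager = SparkEnv.get.memoryManager
// ...and each task attempt gets its own TaskMemoryManager that delegates to it.
val taskAttemptId = 0L // hypothetical id
val taskMemoryManager = new TaskMemoryManager(memoryManager, taskAttemptId)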
Spark uses the abstract class MemoryConsumer to represent a consumer of memory. It defines concrete methods for operating on memory (acquireMemory to request it, allocatePage to allocate a page, freePage to release one) plus the abstract spill method for spilling in-memory data to disk. In practice a MemoryConsumer implementation mainly implements spill; requesting, allocating, and freeing memory are delegated to the TaskMemoryManager. One TaskMemoryManager manages any number of MemoryConsumers. An illustrative consumer is sketched after the source below.
MemoryConsumer
package org.apache.spark.memory;
/**
* A memory consumer of {@link TaskMemoryManager} that supports spilling.
*
* Note: this only supports allocation / spilling of Tungsten memory.
*/
public abstract class MemoryConsumer {
protected final TaskMemoryManager taskMemoryManager;
private final long pageSize;
private final MemoryMode mode;
protected long used;
protected MemoryConsumer(TaskMemoryManager taskMemoryManager, long pageSize, MemoryMode mode) {
this.taskMemoryManager = taskMemoryManager;
this.pageSize = pageSize;
this.mode = mode;
}
protected MemoryConsumer(TaskMemoryManager taskMemoryManager) {
this(taskMemoryManager, taskMemoryManager.pageSizeBytes(), MemoryMode.ON_HEAP);
}
/**
* Spill some data to disk to release memory, which will be called by TaskMemoryManager
* when there is not enough memory for the task.
*
* This should be implemented by subclass.
*
* Note: In order to avoid possible deadlock, should not call acquireMemory() from spill().
*
* Note: today, this only frees Tungsten-managed pages.
*
* @param size the amount of memory should be released
* @param trigger the MemoryConsumer that trigger this spilling
* @return the amount of released memory in bytes
* @throws IOException
*/
// Must be implemented by concrete subclasses
public abstract long spill(long size, MemoryConsumer trigger) throws IOException;
/**
* Allocate a memory block with at least `required` bytes.
*
* @throws OutOfMemoryError
*/
protected MemoryBlock allocatePage(long required) {
// Delegated to taskMemoryManager
MemoryBlock page = taskMemoryManager.allocatePage(Math.max(pageSize, required), this);
if (page == null || page.size() < required) {
throwOom(page, required);
}
used += page.size();
return page;
}
/**
* Free a memory block.
*/
protected void freePage(MemoryBlock page) {
used -= page.size();
// Delegated to taskMemoryManager
taskMemoryManager.freePage(page, this);
}
/**
* Allocates memory of `size`.
*/
public long acquireMemory(long size) {
// Delegated to taskMemoryManager
long granted = taskMemoryManager.acquireExecutionMemory(size, this);
used += granted;
return granted;
}
}
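To make the division of labor concrete, here is a minimal hypothetical consumer (a sketch against the constructor shown above, not from the original post): it only implements spill, while acquisition and release go through the inherited helpers, which delegate to TaskMemoryManager.
import java.io.IOException
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.memory.{MemoryConsumer, MemoryMode, TaskMemoryManager}
import org.apache.spark.unsafe.memory.MemoryBlock

// Hypothetical consumer that holds records in Tungsten pages.
class ExampleConsumer(tmm: TaskMemoryManager)
  extends MemoryConsumer(tmm, tmm.pageSizeBytes(), MemoryMode.ON_HEAP) {

  private val pages = new ArrayBuffer[MemoryBlock]

  // Acquire one more page; allocatePage delegates to TaskMemoryManager and
  // may first force other consumers (or this one) to spill.
  def grow(requiredBytes: Long): MemoryBlock = {
    val page = allocatePage(requiredBytes)
    pages += page
    page
  }

  // Called by TaskMemoryManager when a task runs short of execution memory.
  @throws[IOException]
  override def spill(size: Long, trigger: MemoryConsumer): Long = {
    var released = 0L
    while (released < size && pages.nonEmpty) {
      val page = pages.remove(pages.length - 1)
      // A real consumer would first persist the page's records to disk here.
      released += page.size()
      freePage(page) // hands the page back through TaskMemoryManager
    }
    released
  }
}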
TaskMemoryManager
package org.apache.spark.memory;
/**
* Manages the memory allocated by an individual task.
* <p>
* Most of the complexity in this class deals with encoding of off-heap addresses into 64-bit longs.
* In off-heap mode, memory can be directly addressed with 64-bit longs. In on-heap mode, memory is
* addressed by the combination of a base Object reference and a 64-bit offset within that object.
* This is a problem when we want to store pointers to data structures inside of other structures,
* such as record pointers inside hashmaps or sorting buffers. Even if we decided to use 128 bits
* to address memory, we can't just store the address of the base object since it's not guaranteed
* to remain stable as the heap gets reorganized due to GC.
* <p>
* Instead, we use the following approach to encode record pointers in 64-bit longs: for off-heap
* mode, just store the raw address, and for on-heap mode use the upper 13 bits of the address to
* store a "page number" and the lower 51 bits to store an offset within this page. These page
* numbers are used to index into a "page table" array inside of the MemoryManager in order to
* retrieve the base object.
* <p>
* This allows us to address 8192 pages. In on-heap mode, the maximum page size is limited by the
* maximum size of a long[] array, allowing us to address 8192 * (2^31 - 1) * 8 bytes, which is
* approximately 140 terabytes of memory.
*/
public class TaskMemoryManager {
/** The number of bits used to address the page table. */
private static final int PAGE_NUMBER_BITS = 13;
/** The number of bits used to encode offsets in data pages. */
@VisibleForTesting
static final int OFFSET_BITS = 64 - PAGE_NUMBER_BITS; // 51
/** The number of entries in the page table. */
private static final int PAGE_TABLE_SIZE = 1 << PAGE_NUMBER_BITS;
/**
* Maximum supported data page size (in bytes). In principle, the maximum addressable page size is
* (1L << OFFSET_BITS) bytes, which is 2+ petabytes. However, the on-heap allocator's
* maximum page size is limited by the maximum amount of data that can be stored in a long[]
* array, which is (2^31 - 1) * 8 bytes (or about 17 gigabytes). Therefore, we cap this at 17
* gigabytes.
*/
public static final long MAXIMUM_PAGE_SIZE_BYTES = ((1L << 31) - 1) * 8L;
/** Bit mask for the lower 51 bits of a long. */
private static final long MASK_LONG_LOWER_51_BITS = 0x7FFFFFFFFFFFFL;
/**
* Similar to an operating system's page table, this array maps page numbers into base object
* pointers, allowing us to translate between the hashtable's internal 64-bit address
* representation and the baseObject+offset representation which we use to support both in- and
* off-heap addresses. When using an off-heap allocator, every entry in this map will be `null`.
* When using an in-heap allocator, the entries in this map will point to pages' base objects.
* Entries are added to this map as new data pages are allocated.
*/
// Tracks the memory pages allocated to the current Task via a MemoryBlock[]
private final MemoryBlock[] pageTable = new MemoryBlock[PAGE_TABLE_SIZE];
/**
* Bitmap for tracking free pages.
*/
private final BitSet allocatedPages = new BitSet(PAGE_TABLE_SIZE);
private final MemoryManager memoryManager;
private final long taskAttemptId;
/**
* Tracks whether we're in-heap or off-heap. For off-heap, we short-circuit most of these methods
* without doing any masking or lookups. Since this branching should be well-predicted by the JIT,
* this extra layer of indirection / abstraction hopefully shouldn't be too expensive.
*/
final MemoryMode tungstenMemoryMode;
/**
* Tracks spillable memory consumers.
*/
@GuardedBy("this")
private final HashSet<MemoryConsumer> consumers;
/**
* The amount of memory that is acquired but not used.
*/
private volatile long acquiredButNotUsed = 0L;
/**
* Construct a new TaskMemoryManager.
*/
public TaskMemoryManager(MemoryManager memoryManager, long taskAttemptId) {
this.tungstenMemoryMode = memoryManager.tungstenMemoryMode();
this.memoryManager = memoryManager;
this.taskAttemptId = taskAttemptId;
this.consumers = new HashSet<>();
}
/**
* Acquire N bytes of memory for a consumer. If there is not enough memory, it will call
* spill() of consumers to release more memory.
*
* @return number of bytes successfully granted (<= N).
*/
public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
assert(required >= 0);
assert(consumer != null);
MemoryMode mode = consumer.getMode();
// If we are allocating Tungsten pages off-heap and receive a request to allocate on-heap
// memory here, then it may not make sense to spill since that would only end up freeing
// off-heap memory. This is subject to change, though, so it may be risky to make this
// optimization now in case we forget to undo it late when making changes.
synchronized (this) {
// Request memory from the MemoryManager
long got = memoryManager.acquireExecutionMemory(required, taskAttemptId, mode);
// Try to release memory from other consumers first, then we can reduce the frequency of
// spilling, avoid to have too many spilled files.
// If the grant fell short, first call spill() on other consumers to release memory
if (got < required) {
// Call spill() on other consumers to release memory
// Sort the consumers according their memory usage. So we avoid spilling the same consumer
// which is just spilled in last few times and re-spilling on it will produce many small
// spill files.
TreeMap<Long, List<MemoryConsumer>> sortedConsumers = new TreeMap<>();
for (MemoryConsumer c: consumers) {
if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
long key = c.getUsed();
List<MemoryConsumer> list =
sortedConsumers.computeIfAbsent(key, k -> new ArrayList<>(1));
list.add(c);
}
}
while (!sortedConsumers.isEmpty()) {
// Get the consumer using the least memory more than the remaining required memory.
Map.Entry<Long, List<MemoryConsumer>> currentEntry =
sortedConsumers.ceilingEntry(required - got);
// No consumer has used memory more than the remaining required memory.
// Get the consumer of largest used memory.
if (currentEntry == null) {
currentEntry = sortedConsumers.lastEntry();
}
List<MemoryConsumer> cList = currentEntry.getValue();
MemoryConsumer c = cList.remove(cList.size() - 1);
if (cList.isEmpty()) {
sortedConsumers.remove(currentEntry.getKey());
}
try {
// The MemoryConsumer implementation spills its in-memory data to disk to release memory
long released = c.spill(required - got, consumer);
if (released > 0) {
logger.debug("Task {} released {} from {} for {}", taskAttemptId,
Utils.bytesToString(released), c, consumer);
got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
if (got >= required) {
break;
}
}
} catch (ClosedByInterruptException e) {
// This is thrown when the user kills a task (e.g., a speculative task).
logger.error("error while calling spill() on " + c, e);
throw new RuntimeException(e.getMessage());
} catch (IOException e) {
logger.error("error while calling spill() on " + c, e);
throw new SparkOutOfMemoryError("error while calling spill() on " + c + " : "
+ e.getMessage());
}
}
}
// call spill() on itself
// If still short, call spill() on the current consumer itself
if (got < required) {
try {
// The MemoryConsumer implementation spills its in-memory data to disk to release memory
long released = consumer.spill(required - got, consumer);
if (released > 0) {
logger.debug("Task {} released {} from itself ({})", taskAttemptId,
Utils.bytesToString(released), consumer);
got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
}
} catch (ClosedByInterruptException e) {
// This is thrown when the user kills a task (e.g., a speculative task).
logger.error("error while calling spill() on " + consumer, e);
throw new RuntimeException(e.getMessage());
} catch (IOException e) {
logger.error("error while calling spill() on " + consumer, e);
throw new SparkOutOfMemoryError("error while calling spill() on " + consumer + " : "
+ e.getMessage());
}
}
consumers.add(consumer);
logger.debug("Task {} acquired {} for {}", taskAttemptId, Utils.bytesToString(got), consumer);
return got;
}
}
/**
* Allocate a block of memory that will be tracked in the MemoryManager's page table; this is
* intended for allocating large blocks of Tungsten memory that will be shared between operators.
*
* Returns `null` if there was not enough memory to allocate the page. May return a page that
* contains fewer bytes than requested, so callers should verify the size of returned pages.
*
* @throws TooLargePageException
*/
public MemoryBlock allocatePage(long size, MemoryConsumer consumer) {
assert(consumer != null);
assert(consumer.getMode() == tungstenMemoryMode);
if (size > MAXIMUM_PAGE_SIZE_BYTES) {
throw new TooLargePageException(size);
}
long acquired = acquireExecutionMemory(size, consumer);
if (acquired <= 0) {
return null;
}
final int pageNumber;
synchronized (this) {
pageNumber = allocatedPages.nextClearBit(0);
if (pageNumber >= PAGE_TABLE_SIZE) {
releaseExecutionMemory(acquired, consumer);
throw new IllegalStateException(
"Have already allocated a maximum of " + PAGE_TABLE_SIZE + " pages");
}
allocatedPages.set(pageNumber);
}
MemoryBlock page = null;
try {
// tungstenMemoryAllocator() returns the MemoryAllocator
page = memoryManager.tungstenMemoryAllocator().allocate(acquired);
} catch (OutOfMemoryError e) {
logger.warn("Failed to allocate a page ({} bytes), try again.", acquired);
// there is not enough memory actually, it means the actual free memory is smaller than
// MemoryManager thought, we should keep the acquired memory.
synchronized (this) {
acquiredButNotUsed += acquired;
allocatedPages.clear(pageNumber);
}
// this could trigger spilling to free some pages.
return allocatePage(size, consumer);
}
page.pageNumber = pageNumber;
pageTable[pageNumber] = page;
if (logger.isTraceEnabled()) {
logger.trace("Allocate page number {} ({} bytes)", pageNumber, acquired);
}
return page;
}
}
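To make the encoding described in the class comment concrete, here is a self-contained sketch of the 13-bit page number / 51-bit in-page offset scheme (mirroring the logic of TaskMemoryManager's encodePageNumberAndOffset and decode helpers, not quoting them):
object PageAddressing {
  private val PageNumberBits = 13
  private val OffsetBits = 64 - PageNumberBits // 51
  private val MaskLower51Bits = 0x7FFFFFFFFFFFFL

  // High 13 bits: page number; low 51 bits: offset within the page.
  def encode(pageNumber: Int, offsetInPage: Long): Long =
    (pageNumber.toLong << OffsetBits) | (offsetInPage & MaskLower51Bits)

  def decodePageNumber(address: Long): Int =
    ((address & ~MaskLower51Bits) >>> OffsetBits).toInt

  def decodeOffset(address: Long): Long =
    address & MaskLower51Bits
}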
MemoryManager
package org.apache.spark.memory
/**
* An abstract memory manager that enforces how memory is shared between execution and storage.
*
* In this context, execution memory refers to that used for computation in shuffles, joins,
* sorts and aggregations, while storage memory refers to that used for caching and propagating
* internal data across the cluster. There exists one MemoryManager per JVM.
*/
private[spark] abstract class MemoryManager(
conf: SparkConf,
numCores: Int,
onHeapStorageMemory: Long,
onHeapExecutionMemory: Long) extends Logging {
// -- Methods related to memory allocation policies and bookkeeping ------------------------------
// On-heap storage memory pool
@GuardedBy("this")
protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP)
// Off-heap storage memory pool
@GuardedBy("this")
protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP)
// On-heap execution memory pool
@GuardedBy("this")
protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP)
// Off-heap execution memory pool
@GuardedBy("this")
protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP)
onHeapStorageMemoryPool.incrementPoolSize(onHeapStorageMemory)
onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)
// Read the maximum off-heap memory size from the configuration
protected[this] val maxOffHeapMemory = conf.get(MEMORY_OFFHEAP_SIZE)
// Off-heap storage memory = max off-heap memory * spark.memory.storageFraction (default 0.5)
protected[this] val offHeapStorageMemory =
(maxOffHeapMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong
offHeapExecutionMemoryPool.incrementPoolSize(maxOffHeapMemory - offHeapStorageMemory)
offHeapStorageMemoryPool.incrementPoolSize(offHeapStorageMemory)
/**
* Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
*
* @return whether all N bytes were successfully granted.
*/
// Acquire storage memory
def acquireStorageMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
/**
* Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary.
*
* This extra method allows subclasses to differentiate behavior between acquiring storage
* memory and acquiring unroll memory. For instance, the memory management model in Spark
* 1.5 and before places a limit on the amount of space that can be freed from unrolling.
*
* @return whether all N bytes were successfully granted.
*/
// Acquire memory to unroll a block
def acquireUnrollMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
/**
* Try to acquire up to `numBytes` of execution memory for the current task and return the
* number of bytes obtained, or 0 if none can be allocated.
*
* This call may block until there is enough free memory in some situations, to make sure each
* task has a chance to ramp up to at least 1 / 2N of the total memory pool (where N is the # of
* active tasks) before it is forced to spill. This can happen if the number of tasks increase
* but an older task had a lot of memory already.
*/
// Acquire execution memory
private[memory]
def acquireExecutionMemory(
numBytes: Long,
taskAttemptId: Long,
memoryMode: MemoryMode): Long
// -- Fields related to Tungsten managed memory -------------------------------------------------
/**
* Tracks whether Tungsten memory will be allocated on the JVM heap or off-heap using
* sun.misc.Unsafe.
*/
final val tungstenMemoryMode: MemoryMode = {
// Is off-heap memory enabled?
if (conf.get(MEMORY_OFFHEAP_ENABLED)) {
require(conf.get(MEMORY_OFFHEAP_SIZE) > 0,
"spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")
require(Platform.unaligned(),
"No support for unaligned Unsafe. Set spark.memory.offHeap.enabled to false.")
MemoryMode.OFF_HEAP
} else {
MemoryMode.ON_HEAP
}
}
/**
* Allocates memory for use by Unsafe/Tungsten code.
*/
// Pick the MemoryAllocator matching the MemoryMode
private[memory] final val tungstenMemoryAllocator: MemoryAllocator = {
tungstenMemoryMode match {
case MemoryMode.ON_HEAP => MemoryAllocator.HEAP
case MemoryMode.OFF_HEAP => MemoryAllocator.UNSAFE
}
}
}
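For instance (sizes chosen purely for illustration), with off-heap memory enabled the pool-sizing code above splits the configured total like this:
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", (2L * 1024 * 1024 * 1024).toString) // 2 GiB
  .set("spark.memory.storageFraction", "0.5") // the default
// offHeapStorageMemoryPool size   = 2 GiB * 0.5   = 1 GiB
// offHeapExecutionMemoryPool size = 2 GiB - 1 GiB = 1 GiB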
Spark Shuffle
Spark Shuffle is split into two phases: shuffle write and shuffle read. A ShuffleWriter takes a map task's data and writes it into the shuffle system. A ShuffleReader reads a reduce task's data out of the shuffle system: the data is fetched from the mappers and then merged on the reduce side. A minimal job that exercises both phases is sketched below.
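A minimal illustration (not from the original post; the input path is hypothetical): the map tasks produced by reduceByKey shuffle-write their output, and the reduce tasks shuffle-read it.
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setAppName("shuffle-demo").setMaster("local[2]"))
val counts = sc
  .textFile("input.txt") // hypothetical input path
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1))
  .reduceByKey(_ + _) // map side: shuffle write; reduce side: shuffle read
counts.collect()
sc.stop()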
ShuffleWriter
The write phase sorts records (at minimum by partition), aggregates them if a combiner function is present, and merges the spill files if more than one was written. In the end each ShuffleWriter produces one data file (possibly spanning many partitions) and one index file. The data file is laid out by partition, with each partition's records stored contiguously; the index file records every partition's start and end offset within the data file (a reader sketch follows). For the shuffle writer, Spark 2.0+ ships three implementations: BypassMergeSortShuffleWriter, UnsafeShuffleWriter, and SortShuffleWriter.
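The index file format makes the "start and end offset per partition" idea concrete: IndexShuffleBlockResolver writes numPartitions + 1 cumulative offsets as big-endian longs, so partition i occupies bytes [offsets(i), offsets(i + 1)) of the data file. A minimal reader sketch (the file name is hypothetical):
import java.io.{DataInputStream, FileInputStream}

// Returns the byte range [start, end) of one partition inside the data file.
def partitionRange(indexFile: String, partitionId: Int): (Long, Long) = {
  val in = new DataInputStream(new FileInputStream(indexFile))
  try {
    in.skipBytes(partitionId * 8) // each offset is one 8-byte big-endian long
    val start = in.readLong()
    val end = in.readLong()
    (start, end)
  } finally in.close()
}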
BypassMergeSortShuffleWriter
BypassMergeSortShuffleWriter is the hash-style writer inside the sort-based shuffle: it writes each partition's records to a separate file, then concatenates those per-partition files into a single data file, and passes the region of each partition within the final file on to the reducers. Records are not buffered in memory; they are written out in the format expected by IndexShuffleBlockResolver.
This write path is very inefficient when there are many reduce partitions, because it simultaneously opens a separate serializer and file stream for every partition.
Since this path neither sorts nor combines (when a combine is required, this writer is not used at all), BypassMergeSortShuffleWriter consumes very little memory overall. Its selection criteria are sketched below.
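The choice is made by SortShuffleWriter.shouldBypassMergeSort; paraphrased as a sketch (not the verbatim source), the predicate boils down to:
import org.apache.spark.{ShuffleDependency, SparkConf}

// Bypass only when there is no map-side combine and the partition count is at
// or below spark.shuffle.sort.bypassMergeThreshold (default 200).
def shouldBypass(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean =
  !dep.mapSideCombine &&
    dep.partitioner.numPartitions <= conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)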
SortShuffleWriter
SortShuffleWriter is the most general implementation and the one exercised most often in practice. It delegates most of the work to ExternalSorter: inserting records, sorting, merging, combining, and finally writing the data and index files. ExternalSorter extends the MemoryConsumer abstract class introduced earlier, which is how this path's memory usage is tracked and, when necessary, spilled to disk.
UnsafeShuffleWriter
ShuffleManager
ShuffleManager is the pluggable interface for shuffle systems. Based on the spark.shuffle.manager setting (don't bother searching; there is only one implementation these days), a ShuffleManager is created in the SparkEnv on the driver and on each executor. The driver registers shuffles with it, and executors (or tasks running locally in the driver) ask it to read and write data. A configuration note follows the trait source.
package org.apache.spark.shuffle
import org.apache.spark.{ShuffleDependency, TaskContext}
/**
* Pluggable interface for shuffle systems. A ShuffleManager is created in SparkEnv on the driver
* and on each executor, based on the spark.shuffle.manager setting. The driver registers shuffles
* with it, and executors (or tasks running locally in the driver) can ask to read and write data.
*
* NOTE: this will be instantiated by SparkEnv so its constructor can take a SparkConf and
* boolean isDriver as parameters.
*/
private[spark] trait ShuffleManager {
/**
* Register a shuffle with the manager and obtain a handle for it to pass to tasks.
*/
def registerShuffle[K, V, C](
shuffleId: Int,
numMaps: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle
/** Get a writer for a given partition. Called on executors by map tasks. */
// One writer per map task
def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter[K, V]
/**
* Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
* Called on executors by reduce tasks.
*/
def getReader[K, C](
handle: ShuffleHandle,
startPartition: Int,
endPartition: Int,
context: TaskContext): ShuffleReader[K, C]
/**
* Remove a shuffle's metadata from the ShuffleManager.
* @return true if the metadata removed successfully, otherwise false.
*/
def unregisterShuffle(shuffleId: Int): Boolean
/**
* Return a resolver capable of retrieving shuffle block data based on block coordinates.
*/
// Returns a resolver that can retrieve shuffle block data by block coordinates
def shuffleBlockResolver: ShuffleBlockResolver
/** Shut down this ShuffleManager. */
def stop(): Unit
}
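Since the sort-based manager is the only built-in implementation, the setting is mostly of historical interest; for illustration (to my knowledge, both built-in short names resolve to SortShuffleManager in Spark 2.x):
import org.apache.spark.SparkConf

// "sort" and "tungsten-sort" both map to
// org.apache.spark.shuffle.sort.SortShuffleManager.
val conf = new SparkConf().set("spark.shuffle.manager", "sort")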
SortShuffleManager
Currently this is the only ShuffleManager implementation in Spark: the sort-based SortShuffleManager. It sorts incoming records according to their target partitionId and writes them to a single map output file; reducers fetch contiguous regions of that file to read their portion of the map output. When the map output is too large to fit in memory, sorted subsets of the output are spilled to disk and those spill files are merged to produce the final output file.
Sort-based shuffle has two different write paths for producing its map output files:
- Serialized sorting: used when all three of the following conditions hold:
  1. The ShuffleDependency specifies no aggregation and no output ordering.
  2. The shuffle serializer supports relocation of serialized values (KryoSerializer and Spark SQL's custom serializers support this).
  3. The shuffle produces fewer than 16777216 output partitions.
- Deserialized sorting: used in all other cases.
In serialized sorting mode, incoming records are serialized as soon as they are passed to the ShuffleWriter and are buffered in serialized form during sorting. This write path implements several optimizations:
- It sorts serialized binary data rather than Java objects, which reduces memory consumption and GC overhead. This optimization requires the record serializer to have certain properties so that serialized records can be reordered without being deserialized. See SPARK-4550, where this optimization was first proposed and implemented.
- It uses a specialized cache-efficient sorter (ShuffleExternalSorter) that sorts arrays of compressed record pointers and partition ids. Each record takes only 8 bytes in the sorting array, so far more of the array fits in cache (see the packing sketch after this list).
- The spill-merging procedure operates on blocks of serialized records that belong to the same partition and never needs to deserialize records during the merge.
- When the spill compression codec supports concatenation of compressed data, the spill merge simply concatenates the serialized, compressed spill partitions to produce the final output partition. This allows efficient data-copying methods such as NIO's transferTo to be used, and avoids allocating decompression or copy buffers during the merge.
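The 8-bytes-per-record claim (second bullet above) can be made concrete: PackedRecordPointer packs a partition id, a page number, and an in-page offset into one long. A self-contained sketch of that layout (mirroring the layout documented in PackedRecordPointer, not its exact code):
// Layout of one 8-byte sorter entry:
// [ 24 bits partition id | 13 bits page number | 27 bits offset in page ]
// Sorting the raw longs groups records by partition id for free, because the
// partition id occupies the most significant bits.
object PackedPointerSketch {
  private val OffsetBits = 27
  private val PageBits = 13
  private val OffsetMask = (1L << OffsetBits) - 1

  def pack(partitionId: Int, pageNumber: Int, offsetInPage: Long): Long =
    (partitionId.toLong << (PageBits + OffsetBits)) |
      (pageNumber.toLong << OffsetBits) |
      (offsetInPage & OffsetMask)

  def partitionId(packed: Long): Int = (packed >>> (PageBits + OffsetBits)).toInt
}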
For more detail on these optimizations, see SPARK-7081. Now the source:
package org.apache.spark.shuffle.sort
import java.util.concurrent.ConcurrentHashMap
import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.shuffle._
/**
* In sort-based shuffle, incoming records are sorted according to their target partition ids, then
* written to a single map output file. Reducers fetch contiguous regions of this file in order to
* read their portion of the map output. In cases where the map output data is too large to fit in
* memory, sorted subsets of the output can be spilled to disk and those on-disk files are merged
* to produce the final output file.
*
* Sort-based shuffle has two different write paths for producing its map output files:
*
* - Serialized sorting: used when all three of the following conditions hold:
* 1. The shuffle dependency specifies no aggregation or output ordering.
* 2. The shuffle serializer supports relocation of serialized values (this is currently
* supported by KryoSerializer and Spark SQL's custom serializers).
* 3. The shuffle produces fewer than 16777216 output partitions.
* - Deserialized sorting: used to handle all other cases.
*
* -----------------------
* Serialized sorting mode
* -----------------------
*
* In the serialized sorting mode, incoming records are serialized as soon as they are passed to the
* shuffle writer and are buffered in a serialized form during sorting. This write path implements
* several optimizations:
*
* - Its sort operates on serialized binary data rather than Java objects, which reduces memory
* consumption and GC overheads. This optimization requires the record serializer to have certain
* properties to allow serialized records to be re-ordered without requiring deserialization.
* See SPARK-4550, where this optimization was first proposed and implemented, for more details.
*
* - It uses a specialized cache-efficient sorter ([[ShuffleExternalSorter]]) that sorts
* arrays of compressed record pointers and partition ids. By using only 8 bytes of space per
* record in the sorting array, this fits more of the array into cache.
*
* - The spill merging procedure operates on blocks of serialized records that belong to the same
* partition and does not need to deserialize records during the merge.
*
* - When the spill compression codec supports concatenation of compressed data, the spill merge
* simply concatenates the serialized and compressed spill partitions to produce the final output
* partition. This allows efficient data copying methods, like NIO's `transferTo`, to be used
* and avoids the need to allocate decompression or copying buffers during the merge.
*
* For more details on these optimizations, see SPARK-7081.
*/
private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
if (!conf.getBoolean("spark.shuffle.spill", true)) {
logWarning(
"spark.shuffle.spill was set to false, but this configuration is ignored as of Spark 1.6+." +
" Shuffle will continue to spill to disk when necessary.")
}
/**
* A mapping from shuffle ids to the number of mappers producing output for those shuffles.
*/
private[this] val numMapsForShuffle = new ConcurrentHashMap[Int, Int]()
override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf)
/**
* Obtains a [[ShuffleHandle]] to pass to tasks.
*/
override def registerShuffle[K, V, C](
shuffleId: Int,
numMaps: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
// Use the ShuffleDependency to decide whether merge-sort can be bypassed
if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
// If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
// need map-side aggregation, then write numPartitions files directly and just concatenate
// them at the end. This avoids doing serialization and deserialization twice to merge
// together the spilled files, which would happen with the normal code path. The downside is
// having multiple files open at a time and thus more memory allocated to buffers.
new BypassMergeSortShuffleHandle[K, V](
shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
}
// If merge-sort cannot be bypassed, can the serialized shuffle path be used?
else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
// Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
new SerializedShuffleHandle[K, V](
shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
}
// Otherwise handle it in deserialized form
else {
// Otherwise, buffer map outputs in a deserialized form:
new BaseShuffleHandle(shuffleId, numMaps, dependency)
}
}
/**
* Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
* Called on executors by reduce tasks.
*/
override def getReader[K, C](
handle: ShuffleHandle,
startPartition: Int,
endPartition: Int,
context: TaskContext): ShuffleReader[K, C] = {
new BlockStoreShuffleReader(
handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
}
/** Get a writer for a given partition. Called on executors by map tasks. */
override def getWriter[K, V](
handle: ShuffleHandle,
mapId: Int,
context: TaskContext): ShuffleWriter[K, V] = {
numMapsForShuffle.putIfAbsent(
handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
val env = SparkEnv.get
handle match {
case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
new UnsafeShuffleWriter(
env.blockManager,
shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
context.taskMemoryManager(),
unsafeShuffleHandle,
mapId,
context,
env.conf)
case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
new BypassMergeSortShuffleWriter(
env.blockManager,
shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
bypassMergeSortHandle,
mapId,
context,
env.conf)
case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
}
}
/** Remove a shuffle's metadata from the ShuffleManager. */
override def unregisterShuffle(shuffleId: Int): Boolean = {
Option(numMapsForShuffle.remove(shuffleId)).foreach { numMaps =>
(0 until numMaps).foreach { mapId =>
shuffleBlockResolver.removeDataByMap(shuffleId, mapId)
}
}
true
}
/** Shut down this ShuffleManager. */
override def stop(): Unit = {
shuffleBlockResolver.stop()
}
}
private[spark] object SortShuffleManager extends Logging {
/**
* The maximum number of shuffle output partitions that SortShuffleManager supports when
* buffering map outputs in a serialized form. This is an extreme defensive programming measure,
* since it's extremely unlikely that a single shuffle produces over 16 million output partitions.
*/
val MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE =
PackedRecordPointer.MAXIMUM_PARTITION_ID + 1
/**
* Helper method for determining whether a shuffle should use an optimized serialized shuffle
* path or whether it should fall back to the original path that operates on deserialized objects.
*/
// Decide whether serialized sorting can be used
def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
val shufId = dependency.shuffleId
val numPartitions = dependency.partitioner.numPartitions
// Does the serializer support sorting serialized binary data (object relocation)?
if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " +
s"${dependency.serializer.getClass.getName}, does not support object relocation")
false
}
// Is map-side aggregation required?
else if (dependency.mapSideCombine) {
log.debug(s"Can't use serialized shuffle for shuffle $shufId because we need to do " +
s"map-side aggregation")
false
}
// Is the number of shuffle output partitions within the 16777216 cap?
else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
log.debug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " +
s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")
false
}
else {
log.debug(s"Can use serialized shuffle for shuffle $shufId")
true
}
}
}
/**
* Subclass of [[BaseShuffleHandle]], used to identify when we've chosen to use the
* serialized shuffle.
*/
private[spark] class SerializedShuffleHandle[K, V](
shuffleId: Int,
numMaps: Int,
dependency: ShuffleDependency[K, V, V])
extends BaseShuffleHandle(shuffleId, numMaps, dependency) {
}
/**
* Subclass of [[BaseShuffleHandle]], used to identify when we've chosen to use the
* bypass merge sort shuffle path.
*/
private[spark] class BypassMergeSortShuffleHandle[K, V](
shuffleId: Int,
numMaps: Int,
dependency: ShuffleDependency[K, V, V])
extends BaseShuffleHandle(shuffleId, numMaps, dependency) {
}