遇到的问题
在try
语句中的flatMap()
做写入数据库的操作,在finally
中执行Connection.close()
方法释放资源,结果出现了ConnectionClosedException
。
先验知识
- Scala 的
finally
语句块默认没有返回值(返回值类型为Unit),除非显示使用return关键字。 try
和catch
块中的最后一行默认是返回值,并且返回值会先暂存到缓存中,等待finally中的语句执行完之后再执行。 即try catch finally
+return
的执行顺序:
/**
* finally 中不显示指定return关键字
*/
def main(args: Array[String]): Unit = {
println(test())
}
def test(): Int = {
try {
throw new Exception
1
} catch {
case e: Exception => 2
}
finally {
println("finally func ...")
// finally 默认没有返回值,除非显示使用return关键字
3
}
}
输出结果
2
如果finally
中使用return
关键字return 3
,输出结果是3。非常不建议在finally
语句中写return
语句。假设finally
有return
,那么try
和catch
中的return
语句就永远不会执行,这种方式太不科学了。
问题抽象
val sparkSession = SparkSession.builder().master("local[2]").appName("test").getOrCreate()
val snRDD = sparkSession.sparkContext.makeRDD(Array("Just for Test")).cache
snRDD
.repartition(1)
.mapPartitions[String] { snOfPartition =>
try {
println(s"try func ... initial connection.")
val result = snOfPartition.toArray // Array 的算子操作
.groupBy { x =>
println(s"try - groupBy func ...")
x
}
.toIterator // Array => Iterator,之后是Iterator的算子操作
.map { x =>
println(s"try - map func ...")
x
}
.flatMap { x =>
println(s"try - flatMap func ...")
Array("A return just for test.")
}
println("try - just for example")
result // return 语句需要调用 flatMap 操作返回的对象,此刻去执行 map 和 flatMap 中的代码。而 return 语句是在 finally 块之后执行的。
} finally {
println(s"finally func ...")
println(s"connection.close()")
}
}
.collect() // Spark Action
这段代码的执行顺序如下
try func ...
try - groupBy func ...
try - just for example
finally func ...
try - map func ...
try - flatMap func ...
将代码改成
val sparkSession = SparkSession.builder().master("local[2]").appName("test").getOrCreate()
val snRDD = sparkSession.sparkContext.makeRDD(Array("Just for Test")).cache
snRDD
.repartition(1)
.mapPartitions[String] { snOfPartition =>
try {
println(s"try func ...")
val result = snOfPartition.toArray
.groupBy { x =>
println(s"try - groupBy func ...")
x
}
.map { x =>
println(s"try - map func ...")
x
}
.flatMap { x =>
println(s"try - flatMap func ...")
Array("Just for test.")
}
.toIterator // Array => Iterator
println("try - just for example")
result
} finally {
println(s"finally func ...")
}
}
.collect() // Spark Action
执行顺序如下
try func ...
try - groupBy func ...
try - map func ...
try - flatMap func ...
try - just for example
finally func ...
总结
- Scala
scala.collection.Iterator
的算子操作是lazy
操作,只有当Iterator的算子操作返回的集合对象被调用(执行,赋值不算)时,才会去执行算子当中的代码,而Array
的算子操作不是lazy
操作。 - Spark中的算子操作也是
lazy
操作,当触发Action操作的时候,才会去执行。