Scala 算子执行顺序

遇到的问题

try语句中的flatMap()做写入数据库的操作,在finally中执行Connection.close()方法释放资源,结果出现了ConnectionClosedException

先验知识

  • Scala 的finally语句块默认没有返回值(返回值类型为Unit),除非显示使用return关键字。
  • trycatch块中的最后一行默认是返回值,并且返回值会先暂存到缓存中,等待finally中的语句执行完之后再执行。 即try catch finally+return的执行顺序:
/**
 *  finally 中不显示指定return关键字
 */
def main(args: Array[String]): Unit = {
  println(test())
}

def test(): Int = {
  try {
    throw new Exception
    1
  } catch {
    case e: Exception => 2
  }
  finally {
    println("finally func ...")
    // finally 默认没有返回值,除非显示使用return关键字
    3
  }
}

输出结果

2

如果finally中使用return关键字return 3,输出结果是3。非常不建议在finally语句中写return语句。假设finallyreturn,那么trycatch中的return语句就永远不会执行,这种方式太不科学了。

问题抽象

val sparkSession = SparkSession.builder().master("local[2]").appName("test").getOrCreate()
val snRDD = sparkSession.sparkContext.makeRDD(Array("Just for Test")).cache

snRDD
  .repartition(1)
  .mapPartitions[String] { snOfPartition =>
  try {
    println(s"try func ... initial connection.")
    val result = snOfPartition.toArray // Array 的算子操作
      .groupBy { x =>
        println(s"try - groupBy func ...")
        x
      }
      .toIterator // Array => Iterator,之后是Iterator的算子操作
      .map { x =>
        println(s"try - map func ...")
        x
      }
      .flatMap { x =>
        println(s"try - flatMap func ...")
        Array("A return just for test.")
      }
      println("try - just for example")
    result // return 语句需要调用 flatMap 操作返回的对象,此刻去执行 map 和 flatMap 中的代码。而 return 语句是在 finally 块之后执行的。
  } finally {
    println(s"finally func ...")
    println(s"connection.close()")
  }
}
  .collect() // Spark Action

这段代码的执行顺序如下

try func ...
try - groupBy func ...
try - just for example
finally func ...
try - map func ...
try - flatMap func ...

将代码改成

val sparkSession = SparkSession.builder().master("local[2]").appName("test").getOrCreate()
val snRDD = sparkSession.sparkContext.makeRDD(Array("Just for Test")).cache
snRDD
  .repartition(1)
  .mapPartitions[String] { snOfPartition =>
  try {
    println(s"try func ...")
    val result = snOfPartition.toArray
      .groupBy { x =>
        println(s"try - groupBy func ...")
        x
      }
      .map { x =>
        println(s"try - map func ...")
        x
       }
      .flatMap { x =>
        println(s"try - flatMap func ...")
        Array("Just for test.")
      }
      .toIterator // Array => Iterator
    println("try - just for example")
    result 
  } finally {
    println(s"finally func ...")
  }
}
  .collect() // Spark Action

执行顺序如下

try func ...
try - groupBy func ...
try - map func ...
try - flatMap func ...
try - just for example
finally func ...

总结

  • Scalascala.collection.Iterator的算子操作是lazy操作,只有当Iterator的算子操作返回的集合对象被调用(执行,赋值不算)时,才会去执行算子当中的代码,而Array的算子操作不是lazy操作。
  • Spark中的算子操作也是lazy操作,当触发Action操作的时候,才会去执行。

打赏一个呗

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码支持
扫码打赏,一毛也是爱

打开支付宝扫一扫,即可进行扫码打赏哦