[입 개발] Spark DataFrameWriter에서 saveAsTable 의 동작

s3 에 external table을 만들고 거기에 데이터를 넣는 작업을 하다가 이상한 현상을 경험했습니다.
다음과 같은 테이블을 만들고

create external table(
    id int,
    name varchar(22)
) LOCATION
  's3://bucket/tmp/tmp1';

아래 코드를 돌렸는데…

val tb = spark.sql(
        """
        |SELECT
        |  id, name
        |FROM original
        """
)
tb.write.mode(SaveMode.Overwrite).saveAsTable("tmp1")

당연히 tmp1 테이블은 데이터를 s3에 그리고 external 테이블로 제대로 저장이 될 것을 기대했는데, 아래와 같이, managed 테이블로 바뀌고 저장 위치로 hdfs 로 바뀌는 것이었습니다. -_-(왜왜왜)

create table(
    id int,
    name varchar(22)
) LOCATION
  'hdfs://tmp/blahblah/tmp1';

Spark DataFrameWriter의 saveAsTable 을 SaveMode.Overwrite mode로 사용하게 되면 이런 일이 벌어지게 됩니다. 왜 그런가 해서 소스 코드를 까봤습니다. DataFrameWriter.scala를 보시면 됩니다.

그냥 saveAsTable 소스를 보면 간단합니다. 아래를 보면 SaveMode.Overwrite 가 true 일때… 밑에서 dropTable, createTable 을 부르는군요. 어랏…. dropTable???, 이거 실화인가요? 즉, 이 때 테이블을 날려버립니다. 그리고 createTable로 재생성해줍니다.

  private def saveAsTable(tableIdent: TableIdentifier): Unit = {
    val catalog = df.sparkSession.sessionState.catalog
    val tableExists = catalog.tableExists(tableIdent)
    val db = tableIdent.database.getOrElse(catalog.getCurrentDatabase)
    val tableIdentWithDB = tableIdent.copy(database = Some(db))
    val tableName = tableIdentWithDB.unquotedString

    (tableExists, mode) match {
       ......
       case (true, SaveMode.Overwrite) =>
        // Get all input data source or hive relations of the query.
        val srcRelations = df.logicalPlan.collect {
          case LogicalRelation(src: BaseRelation, _, _, _) => src
          case relation: HiveTableRelation => relation.tableMeta.identifier
        }

        val tableRelation = df.sparkSession.table(tableIdentWithDB).queryExecution.analyzed
        EliminateSubqueryAliases(tableRelation) match {
          // check if the table is a data source table (the relation is a BaseRelation).
          case LogicalRelation(dest: BaseRelation, _, _, _) if srcRelations.contains(dest) =>
            throw new AnalysisException(
              s"Cannot overwrite table $tableName that is also being read from")
          // check hive table relation when overwrite mode
          case relation: HiveTableRelation
              if srcRelations.contains(relation.tableMeta.identifier) =>
            throw new AnalysisException(
              s"Cannot overwrite table $tableName that is also being read from")
          case _ => // OK
        }

        // Drop the existing table
        catalog.dropTable(tableIdentWithDB, ignoreIfNotExists = true, purge = false)
        createTable(tableIdentWithDB)
        // Refresh the cache of the table in the catalog.
        catalog.refreshTable(tableIdentWithDB)
      ......
    }
  }

그럼 external이 왜 managed가 되는지 살펴보시죠. createTable 코드를 보면 storage.locationUri.isDefined를 보고 EXTERNAL, MANAGED가 결정됩니다.(다른 글에서 쓰겠지만 Spark Sql에서 현재는 alter table 을 이용한 external, managed 변경이 안됩니다. Spark 2.4 기준)

  private def createTable(tableIdent: TableIdentifier): Unit = {
    val storage = DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
    val tableType = if (storage.locationUri.isDefined) {
      CatalogTableType.EXTERNAL
    } else {
      CatalogTableType.MANAGED
    }

    val tableDesc = CatalogTable(
      identifier = tableIdent,
      tableType = tableType,
      storage = storage,
      schema = new StructType,
      provider = Some(source),
      partitionColumnNames = partitioningColumns.getOrElse(Nil),
      bucketSpec = getBucketSpec)

    runCommand(df.sparkSession, "saveAsTable")(CreateTable(tableDesc, mode, Some(df.logicalPlan)))
  }

해당 값은 buildStorageFormatFromOptions 를 보면 가져옵니다. options map에서 path가 있으면 가져오는 군요.

  def buildStorageFormatFromOptions(options: Map[String, String]): CatalogStorageFormat = {
    val path = CaseInsensitiveMap(options).get("path")
    val optionsWithoutPath = options.filterKeys(_.toLowerCase(Locale.ROOT) != "path")
    CatalogStorageFormat.empty.copy(
      locationUri = path.map(CatalogUtils.stringToURI), properties = optionsWithoutPath)
  }

여기서 path가 있으면 그냥 locationUri 에 복사해줍니다. 그럼 이제 위의 문제를 어떻게 해결해야 할까요? storage.locationUri.isDefined 를 true로 만들어주는 방법은, 넵 path option을 설정해 주면 간단하게 해결됩니다.

val tb = spark.sql(
        """
        |SELECT
        |  id, name
        |FROM original
        """
)
tb.write
  .mode(SaveMode.Overwrite)
  .option("path", "s3://bucket/tmp/tmp1")
  .saveAsTable("tmp1")

이렇게 하는 방법말고 만약에 그냥 쉽게 덮어쓰고 싶다면, insertInto 메서드를 쓰셔도 간단하게 해결됩니다.