[입 개발] Spark Structured Streaming 에서 Offset 은 어떻게 관리되는가(아주 간략한 버전)?

DStream 만 열심히 쓰다가, Structured Streaming 을 강제로 써야할 일이 생겼습니다. 그런데 DStream을 쓸 때는 Offset 을 명확하게 가져오거나 볼 수 있었는데, Structured Streaming 을 보니, 아는게 없었습니다. -_-;;; 분명히 까만건 글씨고 하얀건 바탕인데…

먼저 Structured Streaming 에서 offset 을 지정하는 방법은 startingOffsets 을 지정하는 방법이 있습니다. 보통 “earliest” 와 “latest” 를 지정하지만, 시작 Offset을 명시적으로 지정할 수도 있습니다.

  val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaServer)   // comma separated list of broker:host
      .option("subscribe", topic)    // comma separated list of topics
      .option("startingOffsets", "earliest") //earliest, latest
      .load()

earliest 는 현재 Kafka에서 해당 토픽이 가지고 있는 가장 빠른 offset, latest 는 가장 마지막 offset 입니다. 다음과 같이 Offset 을 명시적으로 지정할 수 도 있습니다.

  val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaServer)   // comma separated list of broker:host
      .option("subscribe", topic)    // comma separated list of topics
      .option("startingOffsets", """{"test_logs": {"0": 100, "1": 200}""")
      .load()

일단 위의 offset 지정 방법은 checkpoint가 생성되기 전에 최초에만 지정되는 형태입니다. 즉 checkpoint 가 생성되면 위의 staringOffsets는 checkpoint 의 값을 사용하게 됩니다.

일단 Structured Streaming 을 하면 checkpoint 를 사용하게 되어 있습니다. checkpoint 를 설정하면, 다음과 같은 기본 디렉토리 들이 생깁니다.

  • commits – 완료된 batchId 가 저장된다.
    • commits/0, commits/1 이런식으로 파일이 저장된다.
  • metadata
    • 잘 모릅니다.
  • offsets
    • 실제 offsets 이 저장됩니다.
    • offsets/0, offsets/1, offsets/2 이런식으로 Offset 이 저장되어 있다.
  • sources
    • 잘 모릅니다.

일단 잘 모르는 것들을 제외하고 나서 남아있는 offsets 와 commits 입니다. 여기서 offsets 안에는 batchId 가 있는데, 이 파일 안에 실제로 offset이 들어가 있습니다. 다음과 같이 버전 {정보} {오프셋} 형태로 들어가 있습니다. 실제 오프셋은 {topic_name: {partitionId: offset} 구조 입니다. 마지막줄의 내용이, 위에서 보았던, Specific Offset 지정 방법과 동일한 형태인 것을 볼 수 있습니다.

v1
{“batchWatermarkMs”:0,”batchTimestampMs”:1615293540745,”conf”:{“spark.sql.streaming.stateStore.providerClass”:”org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider”,”spark.sql.streaming.join.stateFormatVersion”:”2″,”spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion”:”2″,”spark.sql.streaming.multipleWatermarkPolicy”:”min”,”spark.sql.streaming.aggregation.stateFormatVersion”:”2″,”spark.sql.shuffle.partitions”:”200″}}
{“test_logs”:{“0″:1058826043,”1″:1058853622,”2”:1058871214}}

실제로 이 offset 을 남기는 부분은 external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceInitialOffsetWriter.scala 파일을 보시면 됩니다.

여기서 Offset 이 어떻게 관리되고 처리되는지 살짝 알아보도록 하겠습니다. 먼저 offsets/{nextBatchId} 로 파일이 먼저 생성됩니다. 이 파일이 있으면 현재 offset 부터 nextBatchId 에 있는 offset 까지 처리가 되게 됩니다. 해당 파일이 없으면 latest offset까지가 다음 offset 이 되면서 해당 파일이 만들어지게 됩니다. nextBatchId 는 일종의 WAL(Write ahead Log) 의 역할을 합니다.

그리고 하나의 MicroBatch 가 완료되면 commits 에 해당 batchId 가 생성되면서 완료를 확인합니다. 즉 commits/3 까지 있고 offsets/3 이 있다면, batchId 3번 까지 완료가 되었다는 뜻입니다.

그럼 이제 실제로 예를 한번 들어보겠습니다.

startingOffsets 를 earliest 로 설정하고 해당 값이 100 이고 처음 batch가 시작된다고 하면 offsets/0 파일에 {“test_logs”:{“0”: 200}} 이렇게 저장되어 있다면, 첫 배치에서 Offset 100 ~ Offset 199 까지 처리하게 됩니다. 그래서 혹시나 Offset 을 조작할 일이 있다면, 처리된 commits/{batchId} 를 지우거나, N번째와 N-1번째를 수정해주면 동작할 것 같습니다.(지우기는 해봤는데… 막 수정까지는 하지 안해봤습니다. 다만 Validation 검증 코드가 없는 걸로 봐서 동작할꺼 같습니다. 둘 다 바꾸기는 귀찮아서…)

처음에 살짝 당황했던건 offsets/0 에 있는 값이 실제로 준 offset 보다 이후 값이었는데, 왜 그런지 몰라서 한참 고민을 했습니다. offsets/0 에 있는 값이 DStream 의 untilOffset과 유사하다고 보시면 됩니다.

그리고 여기 offsets 안에 저장하는 값은 KafkaSourceOffset 과 KafkaSourceInitialOffsetWriter(HDFSMetadataLog) 을 보시면됩니다. RateLimit 가 설정이 되면 ./external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala 의 latestOffset 함수를 보면 설정에 따라서 다음 offset이 설정이 됩니다.

삽질(실험?) 과 소스를 보면서 이해한 Offset에 대한 아주 간략한 내용입니다. 위의 설명을 보면 아시겠지만, 위의 내용은 거의 Kafka 전용이고, 다른 것들은 내용이 다를 것으로 보입니다.