[입 개발] Kafka 에서 auto.offset.reset 의 사용법

최근에 서버로그를 컨슈밍해서 뭔가 작업할 일이 생겼습니다. 데이터를 매일 보면서 실제 얼마나 이벤트가 있었는지 볼려고 하니, 실제 로그의 수와 저장된 이벤트의 수가 엄청 다른 것입니다.

확인을 해보니, 로그가… 몇일 전꺼부터 계속 돌고 있었습니다. -_-;;; 즉, 몇일 치가 하루데이터라고 생각하고 계속 저장되고 있었으니, 생각하는 값보다 훨씬 많이 오버된 T.T

저만 그런 문제가 있는가 해서 뒤져보니, auto.offset.reset 이라는 환경설정 값을 찾을 수 있었습니다. auto.offset.reset 은 다음과 같이 설명이 되어 있습니다.

What to do when there is no initial offset in Zookeeper or if an offset is out of range:
* smallest : automatically reset the offset to the smallest offset
* largest : automatically reset the offset to the largest offset
* anything else: throw exception to the consumer. If this is set to largest, the consumer may lose some messages when the number of partitions, for the topics it subscribes to, changes on the broker. To prevent data loss during partition addition, set auto.offset.reset to smallest

일단 설정이 안되었을 때의 기본 값은 largest 입니다.(아주 옛날 소스는 autooffset.reset 이라는 설정에, latest 라는 설정으로 되어있었지만… 현재는 바뀌어 있습니다.)

smallest 는 가지고 있는 오프셋 값 중에 가장 작은 값을 사용합니다. largest는 반대로 가장 최신 offset 을 사용하게 됩니다.

handleOffsetOutOfRange 에서는 autoOffsetReset 값을 통해서 startTimestamp를 설정하고 실제 newOffset 을 earliestOrLatestOffset 을 통해서 가져오게 됩니다.

<pre>def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long = {
  val startTimestamp = config.autoOffsetReset match {
    case OffsetRequest.SmallestTimeString =&gt; OffsetRequest.EarliestTime
    case OffsetRequest.LargestTimeString =&gt; OffsetRequest.LatestTime
    case _ =&gt; OffsetRequest.LatestTime
  }
  val newOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, startTimestamp, Request.OrdinaryConsumerId)
  val pti = partitionMap(topicAndPartition)
  pti.resetFetchOffset(newOffset)
  pti.resetConsumeOffset(newOffset)
  newOffset
}</pre>

그럼 이 옵션을 언제 적용하게 되느냐? 즉, 바꿔말하면 handleOffsetOutOfRange이 언제 호출되는가를 알아야 합니다. AbstractFetcherThread.scala를 보게 되면 Errors.NONE 일 때는 해당 결과 offset 으로 현재 offset을 업데이트 합니다. partitionMap에 PartitionFetchState 로 그 offset을 저장합니다.

<pre>private def processFetchRequest(fetchRequest: REQ) {
  val partitionsWithError = new mutable.HashSet[TopicAndPartition]
  var responseData: Map[TopicAndPartition, PD] = Map.empty

  try {
    trace("Issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
    responseData = fetch(fetchRequest)
  } catch {
    case t: Throwable =&gt;
      if (isRunning.get) {
        warn(s"Error in fetch $fetchRequest", t)
        inLock(partitionMapLock) {
          partitionsWithError ++= partitionMap.keys
          // there is an error occurred while fetching partitions, sleep a while
          partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
        }
      }
  }
  fetcherStats.requestRate.mark()

  if (responseData.nonEmpty) {
    // process fetched data
    inLock(partitionMapLock) {

      responseData.foreach { case (topicAndPartition, partitionData) =&gt;
        val TopicAndPartition(topic, partitionId) = topicAndPartition
        partitionMap.get(topicAndPartition).foreach(currentPartitionFetchState =&gt;
          // we append to the log if the current offset is defined and it is the same as the offset requested during fetch
          if (fetchRequest.offset(topicAndPartition) == currentPartitionFetchState.offset) {
            Errors.forCode(partitionData.errorCode) match {
              case Errors.NONE =&gt;
                try {
                  val messages = partitionData.toByteBufferMessageSet
                  val validBytes = messages.validBytes
                  val newOffset = messages.shallowIterator.toSeq.lastOption match {
                    case Some(m: MessageAndOffset) =&gt; m.nextOffset
                    case None =&gt; currentPartitionFetchState.offset
                  }
                  partitionMap.put(topicAndPartition, new PartitionFetchState(newOffset))
                  fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = Math.max(0L, partitionData.highWatermark - newOffset)
                  fetcherStats.byteRate.mark(validBytes)
                  // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
                  processPartitionData(topicAndPartition, currentPartitionFetchState.offset, partitionData)
                } catch {
                  case ime: CorruptRecordException =&gt;
                    // we log the error and continue. This ensures two things
                    // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
                    // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
                    // should get fixed in the subsequent fetches
                    logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentPartitionFetchState.offset  + " error " + ime.getMessage)
                  case e: Throwable =&gt;
                    throw new KafkaException("error processing data for partition [%s,%d] offset %d"
                      .format(topic, partitionId, currentPartitionFetchState.offset), e)
                }
              case Errors.OFFSET_OUT_OF_RANGE =&gt;
                try {
                  val newOffset = handleOffsetOutOfRange(topicAndPartition)
                  partitionMap.put(topicAndPartition, new PartitionFetchState(newOffset))
                  error("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
                    .format(currentPartitionFetchState.offset, topic, partitionId, newOffset))
                } catch {
                  case e: Throwable =&gt;
                    error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
                    partitionsWithError += topicAndPartition
                }
              case _ =&gt;
                if (isRunning.get) {
                  error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id,
                    partitionData.exception.get))
                  partitionsWithError += topicAndPartition
                }
            }
          })
      }
    }
  }

  if (partitionsWithError.nonEmpty) {
    debug("handling partitions with error for %s".format(partitionsWithError))
    handlePartitionsWithErrors(partitionsWithError)
  }
}</pre>

두 번째로는 Errors.OFFSET_OUT_OF_RANGE 에러가 발생했을 때입니다. 이 에러가 발생하면, 아까 본 handleOffsetOutOfRange 함수를 통해서 다시 시작 newOffset을 설정하게 됩니다. 그럼 이 에러는 언제 발생하게 될까요?

Kafka는 consumer 가 자신이 처리한 위치 까지의 offset을 가지고 있는 구조입니다.(보통 주키퍼에 /consumers/{groupid} 형태로 기록이 되지요.) 그러니, 주키퍼의 해당 consumers 목록에 없다면 해당 설정대로 값을 읽게 됩니다. 두 번째로 이미 데이터가 접근할 수 없는 offset 즉, 이미 지워진 상태면… 해당 값을 읽어올 수 없다면, auto.commit.reset의 설정이 동작하게 됩니다.

저는 처음에 이것이 kafka 내에 설정값이 있고 해당 값보다 크면, 항상 새로 읽을 줄 알았습니다. T.T

그럼, 어떻게 항상 최신 데이터부터만 읽을 것인가? 가장 쉬운 방법은 해당 groupid에 속하는 오프셋을 필요할 때마다 지워주는 것입니다. 이러면 항상 최신의 데이터 부터 읽어오는 것을 알 수 있습니다. 두 번째는 굉장히 빨리 데이터를 지우도록 설정하는 것인데, 이러면 의도하지 않은 동작(그 주기 이전부터 읽어오게 되므로) 원하는 형태는 아닙니다.

Advertisements

[입 개발] kafka 0.8 설치 방법

해당 블로그는 KT Olleh UCloud Biz의 지원을 받고 있습니다.

Kafka는 LinkedIn에서 만든 분산 메시지 큐로, 디스크를 이용함에도 꽤 빠른 처리속도를 보장하는(더 복잡한 부분은 논문을 보셔야 쿨럭…) 소프트웨어입니다. 주로, 로그를 수집하기 전에, 이벤트 처리를 위해서, 데이터 버스로써, 또는 일종의 버퍼로 HBase나, 다른 툴들의 앞에서 이벤트 전달을 여과해주는 역할을 주로 합니다.

0.8로 진행하면서 약간의 사용방법이 바뀌어서 설치 방법을 정리합니다.

1. 준비
여기서는 KT UCloud에 8Core, 16GB 메모리 장비를 세대 생성했습니다.
server1, server2, server3

2. Zookeeper 설치
Zookeeper 의 설치는 기본이기 때문에 넘어가도록 하겠습니다. Zookeeper 설치 방법은 뭘 봐도 동일합니다.

3. kafka 다운로드
$> git clone https://github.com/apache/kafka.git

4. kafka 빌드
$> ./sbt update
$> ./sbt package
$> ./sbt assembly-package-dependency

5. config/consumer.properties 수정

zookeeper.connect=server1:2181,server2:2181,server3:2181

6. config/producer.properties 수정

metadata.broker.list=1:server1:9092,2:172.27.174.20:9092,3:172.27.92.32:9092

7. config/server.properties 수정

#서버에 맞춰서 unique 한 값을 준다.
broker.id=0

log.dir=/home/charsyam/kafka/log

8. kafka 서버 실행

bin/kafka-server-start.sh config/server.properties

9. topic 생성
topic을 안 만들어주면 제대로 동작을 하지 않는다.

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic test

10. 샘플 실행
* consumer

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

* producer

bin/kafka-console-producer.sh --broker-list server1:9092,server2:9092,server3:9092 --topic test

이제 Kafka가 정상적으로 동작하는 것을 볼 수 있다. 이제는 이를 이용해서 어떤 작업을 할 수 있는지 알아보자.