本文共 15270 字,大约阅读时间需要 50 分钟。
kafka每个分区的副本分为Leader和Follower两种角色,Follower副本会定期从Leader副本拉取数据。在kafka源码中是通过ReplicaFetcherThread类实现消息的拉取与处理,ReplicaFetcherThread继承自AbstractFetcherThread,因此首先分析AbstractFetcherThread。
abstract class AbstractFetcherThread(name: String, // 线程名称 clientId: String, // Client Id,用于日志输出 val sourceBroker: BrokerEndPoint, // 数据源Broker地址 fetchBackOffMs: Int = 0, // 获取操作重试间隔 isInterruptible: Boolean = true) // 线程是否允许被中断 extends ShutdownableThread(name, isInterruptible){ ...}
AbstractFetcherThread又继承自ShutdownableThread类
abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean = true) extends Thread(name) with Logging { def doWork(): Unit override def run(): Unit = { info("Starting") try { while (isRunning) doWork() } catch { case e: FatalExitError => shutdownInitiated.countDown() shutdownComplete.countDown() info("Stopped") Exit.exit(e.statusCode()) case e: Throwable => if (isRunning) error("Error due to", e) } finally { shutdownComplete.countDown() } info("Stopped") }}
从ShutdownableThread的run方法中可以得出实际的执行体是子类AbstractFetcherThread的doWork方法。
override def doWork() { maybeTruncate() // 执行副本截断操作 maybeFetch() // 执行消息获取操作 }
doWork方法中只做两件事,分别是maybeTruncate负责副本截断和maybeFetch负责消息获取。
maybeTruncate方法
private def maybeTruncate(): Unit = { // 将所有处于截断中状态的分区依据有无Leader Epoch值进行分组 val (partitionsWithEpochs, partitionsWithoutEpochs) = fetchTruncatingPartitions() // 对于有Leader Epoch值的分区,将日志截断到Leader Epoch值对应的位移值处 if (partitionsWithEpochs.nonEmpty) { truncateToEpochEndOffsets(partitionsWithEpochs) } // 对于没有Leader Epoch值的分区,将日志截断到高水位值处 if (partitionsWithoutEpochs.nonEmpty) { truncateToHighWatermark(partitionsWithoutEpochs) } }
下面来看truncateToHighWatermark方法的实现,truncateToHighWatermark方法中调用了truncate方法,truncate是抽象方法,具体的实现在ReplicaFetcherThread类中。
private[server] def truncateToHighWatermark(partitions: Set[TopicPartition]): Unit = inLock(partitionMapLock) { val fetchOffsets = mutable.HashMap.empty[TopicPartition, OffsetTruncationState] val partitionsWithError = mutable.HashSet.empty[TopicPartition] // 遍历每个要执行截断操作的分区对象 for (tp <- partitions) { // 获取分区的分区读取状态 val partitionState = partitionStates.stateValue(tp) if (partitionState != null) { try { // 取出高水位值,分区的最大可读取位移值就是高水位值 val highWatermark = partitionState.fetchOffset val truncationState = OffsetTruncationState(highWatermark, truncationCompleted = true) info(s"Truncating partition $tp to local high watermark $highWatermark") // 执行截断到高水位值,具体实现在ReplicaFetcherThread类中 truncate(tp, truncationState) fetchOffsets.put(tp, truncationState) } catch { case e: KafkaStorageException => info(s"Failed to truncate $tp", e) partitionsWithError += tp } } } handlePartitionsWithErrors(partitionsWithError) // 更新这组分区的分区读取状态 updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets) }
maybeFetch方法中会调用buildFetch构造FetchRequest,buildFetch是抽象方法,具体实现在ReplicaFetcherThread类中。
private def maybeFetch(): Unit = { val (fetchStates, fetchRequestOpt) = inLock(partitionMapLock) { // partitionStates中保存的是要去获取消息的分区以及对应的状态 val fetchStates = partitionStates.partitionStateMap.asScala // 为partitionStates中的分区构造FetchRequest,具体实现在ReplicaFetcherThread类中 val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = buildFetch(fetchStates) // 处理出错的分区,处理方式主要是将这个分区加入到有序Map末尾 handlePartitionsWithErrors(partitionsWithError) // 如果当前没有可读取的分区,则等待fetchBackOffMs时间等候后续重试 if (fetchRequestOpt.isEmpty) { trace(s"There are no active partitions. Back off for $fetchBackOffMs ms before sending a fetch request") partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS) } (fetchStates, fetchRequestOpt) } fetchRequestOpt.foreach { fetchRequest => // 发送FETCH请求给Leader副本,并处理Response processFetchRequest(fetchStates, fetchRequest) } }
processFetchRequest方法会调用processPartitionData完成Response的处理,processPartitionData方法是抽象方法,具体实现在ReplicaFetcherThread类中。
private def processFetchRequest(fetchStates: Map[TopicPartition, PartitionFetchState], fetchRequest: FetchRequest.Builder): Unit = { val partitionsWithError = mutable.Set[TopicPartition]() var responseData: Seq[(TopicPartition, FetchData)] = Seq.empty try { trace(s"Sending fetch request $fetchRequest") // 从leader 分区拉取数据 responseData = fetchFromLeader(fetchRequest) } catch { case t: Throwable => if (isRunning) { warn(s"Error in response for fetch request $fetchRequest", t) inLock(partitionMapLock) { partitionsWithError ++= partitionStates.partitionSet.asScala partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS) } } } // 更新请求发送速率指标 fetcherStats.requestRate.mark() if (responseData.nonEmpty) { // process fetched data inLock(partitionMapLock) { responseData.foreach { case (topicPartition, partitionData) => Option(partitionStates.stateValue(topicPartition)).foreach { currentFetchState => val fetchState = fetchStates(topicPartition) // 处理Response的条件: // 1. 要获取的位移值和之前已保存的下一条待获取位移值相等 // 2. 当前分区处于可获取状态 if (fetchState.fetchOffset == currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) { partitionData.error match { // 没有错误 case Errors.NONE => try { // 完成Response的处理,具体实现在ReplicaFetcherThread类中 val logAppendInfoOpt = processPartitionData(topicPartition, currentFetchState.fetchOffset, partitionData) logAppendInfoOpt.foreach { logAppendInfo => val validBytes = logAppendInfo.validBytes val nextOffset = if (validBytes > 0) logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset fetcherLagStats.getAndMaybePut(topicPartition).lag = Math.max(0L, partitionData.highWatermark - nextOffset) if (validBytes > 0 && partitionStates.contains(topicPartition)) { val newFetchState = PartitionFetchState(nextOffset, fetchState.currentLeaderEpoch, state = Fetching) partitionStates.updateAndMoveToEnd(topicPartition, newFetchState) fetcherStats.byteRate.mark(validBytes) } } } catch { case ime: CorruptRecordException => error(s"Found invalid messages during fetch for partition $topicPartition " + s"offset ${currentFetchState.fetchOffset}", ime) partitionsWithError += topicPartition case e: KafkaStorageException => error(s"Error while processing data for partition $topicPartition", e) partitionsWithError += topicPartition case e: Throwable => throw new KafkaException(s"Error processing data for partition $topicPartition " + s"offset ${currentFetchState.fetchOffset}", e) } // 如果读取位移值越界,通常是因为Leader发生变更 case Errors.OFFSET_OUT_OF_RANGE => if (!handleOutOfRangeError(topicPartition, currentFetchState)) // 加入到出错分区列表 partitionsWithError += topicPartition // 如果Leader Epoch值比Leader所在Broker上的Epoch值要新 case Errors.UNKNOWN_LEADER_EPOCH => debug(s"Remote broker has a smaller leader epoch for partition $topicPartition than " + s"this replica's current leader epoch of ${fetchState.currentLeaderEpoch}.") // 加入到出错分区列表 partitionsWithError += topicPartition // 如果Leader Epoch值比Leader所在Broker上的Epoch值要旧 case Errors.FENCED_LEADER_EPOCH => onPartitionFenced(topicPartition) // 如果Leader发生变更 case Errors.NOT_LEADER_FOR_PARTITION => debug(s"Remote broker is not the leader for partition $topicPartition, which could indicate " + "that the partition is being moved") // 加入到出错分区列表 partitionsWithError += topicPartition case _ => error(s"Error for partition $topicPartition at offset ${currentFetchState.fetchOffset}", partitionData.error.exception) partitionsWithError += topicPartition } } } } } } if (partitionsWithError.nonEmpty) { debug(s"Handling errors for partitions $partitionsWithError") // 处理出错分区列表 handlePartitionsWithErrors(partitionsWithError) } }
首先看下ReplicaFetcherThread类定义
class ReplicaFetcherThread(name: String, fetcherId: Int, sourceBroker: BrokerEndPoint, brokerConfig: KafkaConfig, replicaMgr: ReplicaManager, metrics: Metrics, time: Time, quota: ReplicaQuota, leaderEndpointBlockingSend: Option[BlockingSend] = None) extends AbstractFetcherThread(name = name, clientId = name, sourceBroker = sourceBroker, fetchBackOffMs = brokerConfig.replicaFetchBackoffMs, isInterruptible = false) { ... // follower发送tetch请求被处理返回前的最长等待时间 private val maxWait = brokerConfig.replicaFetchWaitMaxMs // 每个fetch Response返回前必须要累积的最少字节数 private val minBytes = brokerConfig.replicaFetchMinBytes // 每个合法fetch Response的最大字节数 private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes // 单个分区能够获取到的最大字节数 private val fetchSize = brokerConfig.replicaFetchMaxBytes ... }
接下来看ReplicaFetcherThread 的 3 个重要方法如何实现:processPartitionData、buildFetch 和 truncate。
1.processPartitionData方法
override def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: FetchData): Option[LogAppendInfo] = { val replica = replicaMgr.localReplicaOrException(topicPartition) // 从副本管理器获取指定主题分区对象 val partition = replicaMgr.getPartition(topicPartition).get // 将获取到的数据转换成符合格式要求的消息集合 val records = toMemoryRecords(partitionData.records) maybeWarnIfOversizedRecords(records, topicPartition) // 要读取的起始位移值如果不是本地日志LEO值则视为异常情况 if (fetchOffset != replica.logEndOffset) throw new IllegalStateException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format( topicPartition, fetchOffset, replica.logEndOffset)) if (isTraceEnabled) trace("Follower has replica log end offset %d for partition %s. Received %d messages and leader hw %d" .format(replica.logEndOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark)) // 写入Follower副本本地日志 val logAppendInfo = partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false) if (isTraceEnabled) trace("Follower has replica log end offset %d after appending %d bytes of messages for partition %s" .format(replica.logEndOffset, records.sizeInBytes, topicPartition)) val followerHighWatermark = replica.logEndOffset.min(partitionData.highWatermark) val leaderLogStartOffset = partitionData.logStartOffset // 更新Follower副本的高水位值 replica.highWatermark = new LogOffsetMetadata(followerHighWatermark) // 尝试更新Follower副本的Log Start Offset值 replica.maybeIncrementLogStartOffset(leaderLogStartOffset) if (isTraceEnabled) trace(s"Follower set replica high watermark for partition $topicPartition to $followerHighWatermark") // 副本消息拉取限流 if (quota.isThrottled(topicPartition)) quota.record(records.sizeInBytes) replicaMgr.brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes) logAppendInfo }
2.buildFetch方法
override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = { val partitionsWithError = mutable.Set[TopicPartition]() val builder = fetchSessionHandler.newBuilder() // 遍历每个分区,将处于可获取状态的分区添加到builder后续统一处理 // 对于有错误的分区加入到出错分区列表 partitionMap.foreach { case (topicPartition, fetchState) => if (fetchState.isReadyForFetch && !shouldFollowerThrottle(quota, topicPartition)) { try { val logStartOffset = replicaMgr.localReplicaOrException(topicPartition).logStartOffset builder.add(topicPartition, new FetchRequest.PartitionData( fetchState.fetchOffset, logStartOffset, fetchSize, Optional.of(fetchState.currentLeaderEpoch))) } catch { case _: KafkaStorageException => // The replica has already been marked offline due to log directory failure and the original failure should have already been logged. // This partition should be removed from ReplicaFetcherThread soon by ReplicaManager.handleLogDirFailure() partitionsWithError += topicPartition } } } val fetchData = builder.build() val fetchRequestOpt = if (fetchData.sessionPartitions.isEmpty && fetchData.toForget.isEmpty) { None } else { // 构造FETCH请求的Builder对象 val requestBuilder = FetchRequest.Builder .forReplica(fetchRequestVersion, replicaId, maxWait, minBytes, fetchData.toSend) .setMaxBytes(maxBytes) .toForget(fetchData.toForget) .metadata(fetchData.metadata) Some(requestBuilder) } // 返回Builder对象以及出错分区列表 ResultWithPartitions(fetchRequestOpt, partitionsWithError) }
3.truncate方法
override def truncate(tp: TopicPartition, offsetTruncationState: OffsetTruncationState): Unit = { val replica = replicaMgr.localReplicaOrException(tp) // 拿到分区对象 val partition = replicaMgr.getPartition(tp).get // 执行截断操作,截断到的位置由offsetTruncationState的offset指定 partition.truncateTo(offsetTruncationState.offset, isFuture = false) if (offsetTruncationState.offset < replica.highWatermark.messageOffset) warn(s"Truncating $tp to offset ${offsetTruncationState.offset} below high watermark " + s"${replica.highWatermark.messageOffset}") // mark the future replica for truncation only when we do last truncation if (offsetTruncationState.truncationCompleted) replicaMgr.replicaAlterLogDirsManager.markPartitionsForTruncation(brokerConfig.brokerId, tp, offsetTruncationState.offset) }
转载地址:http://ercmb.baihongyu.com/