Kafka Server-Side Source Code Analysis: How Follower Replicas Fetch Messages from the Leader
Published: 2019-05-10


1. AbstractFetcherThread

Each partition replica in Kafka plays one of two roles, Leader or Follower, and follower replicas periodically fetch data from the leader replica. In the Kafka source code, fetching and processing these messages is implemented by the ReplicaFetcherThread class. ReplicaFetcherThread extends AbstractFetcherThread, so we start with AbstractFetcherThread.

abstract class AbstractFetcherThread(name: String,       // thread name
                                     clientId: String,   // client id, used in log output
                                     val sourceBroker: BrokerEndPoint,  // address of the source broker
                                     fetchBackOffMs: Int = 0,           // backoff between fetch retries
                                     isInterruptible: Boolean = true)   // whether the thread may be interrupted
  extends ShutdownableThread(name, isInterruptible) {
  ...
}

AbstractFetcherThread in turn extends the ShutdownableThread class:

abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean = true)
  extends Thread(name) with Logging {

  // Implemented by subclasses: one iteration of the thread's work
  def doWork(): Unit

  override def run(): Unit = {
    info("Starting")
    try {
      while (isRunning)
        doWork()
    } catch {
      case e: FatalExitError =>
        shutdownInitiated.countDown()
        shutdownComplete.countDown()
        info("Stopped")
        Exit.exit(e.statusCode())
      case e: Throwable =>
        if (isRunning)
          error("Error due to", e)
    } finally {
      shutdownComplete.countDown()
    }
    info("Stopped")
  }
}
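run() is a classic template method: the base class owns the loop and the shutdown handling, while a subclass only supplies doWork. As a minimal sketch of that contract (hypothetical code, not from the Kafka code base, assuming kafka.utils.ShutdownableThread is on the classpath):

import kafka.utils.ShutdownableThread

// Hypothetical subclass: run() in the base class keeps invoking doWork()
// until shutdown is initiated, then performs the countdown/logging above.
class HeartbeatThread extends ShutdownableThread(name = "heartbeat-thread") {
  override def doWork(): Unit = {
    // One unit of work per loop iteration; keep it short so shutdown stays responsive
    println("tick")
    Thread.sleep(1000)
  }
}

object HeartbeatDemo extends App {
  val heartbeat = new HeartbeatThread
  heartbeat.start()     // runs the loop on its own thread
  Thread.sleep(5000)
  heartbeat.shutdown()  // initiates shutdown and waits for the loop to exit
}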

From ShutdownableThread's run method we can see that the actual execution body is the subclass's doWork method — in our case, AbstractFetcherThread.doWork:

override def doWork() {
  maybeTruncate()  // truncate the replica if needed
  maybeFetch()     // fetch messages
}

doWork does just two things: maybeTruncate handles replica truncation and maybeFetch handles message fetching.

The maybeTruncate method

private def maybeTruncate(): Unit = {
  // Group all partitions currently in the Truncating state by whether they have a leader epoch
  val (partitionsWithEpochs, partitionsWithoutEpochs) = fetchTruncatingPartitions()
  // Partitions with a leader epoch: truncate the log to the offset that epoch maps to
  if (partitionsWithEpochs.nonEmpty) {
    truncateToEpochEndOffsets(partitionsWithEpochs)
  }
  // Partitions without a leader epoch: truncate the log to the high watermark
  if (partitionsWithoutEpochs.nonEmpty) {
    truncateToHighWatermark(partitionsWithoutEpochs)
  }
}

Epoch-based truncation (introduced by KIP-101) is preferred because truncating to the high watermark can lose committed data or leave replicas diverged across rapid leadership changes; the high-watermark path is kept for partitions whose leader or message format cannot answer OffsetsForLeaderEpoch requests. Below is truncateToHighWatermark. It calls truncate, which is abstract here; the concrete implementation is in ReplicaFetcherThread.

private[server] def truncateToHighWatermark(partitions: Set[TopicPartition]): Unit = inLock(partitionMapLock) {
  val fetchOffsets = mutable.HashMap.empty[TopicPartition, OffsetTruncationState]
  val partitionsWithError = mutable.HashSet.empty[TopicPartition]
  // Iterate over every partition to be truncated
  for (tp <- partitions) {
    // Look up the partition's fetch state
    val partitionState = partitionStates.stateValue(tp)
    if (partitionState != null) {
      try {
        // The high watermark is the partition's highest readable offset
        val highWatermark = partitionState.fetchOffset
        val truncationState = OffsetTruncationState(highWatermark, truncationCompleted = true)
        info(s"Truncating partition $tp to local high watermark $highWatermark")
        // Truncate to the high watermark; the concrete implementation is in ReplicaFetcherThread
        truncate(tp, truncationState)
        fetchOffsets.put(tp, truncationState)
      } catch {
        case e: KafkaStorageException =>
          info(s"Failed to truncate $tp", e)
          partitionsWithError += tp
      }
    }
  }
  handlePartitionsWithErrors(partitionsWithError)
  // Update the fetch state of this group of partitions
  updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)
}
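The OffsetTruncationState passed around here is just a small value type pairing the target offset with a flag marking whether truncation for that partition is finished. A sketch consistent with its use above (the real definition lives in AbstractFetcherThread.scala and may differ in detail):

case class OffsetTruncationState(offset: Long, truncationCompleted: Boolean) {
  // truncationCompleted = true means the partition can move from Truncating to Fetching
  override def toString: String = s"offset:$offset-truncationCompleted:$truncationCompleted"
}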

maybeFetch calls buildFetch to construct the FetchRequest; buildFetch is abstract, with the concrete implementation in ReplicaFetcherThread.

private def maybeFetch(): Unit = {
  val (fetchStates, fetchRequestOpt) = inLock(partitionMapLock) {
    // partitionStates holds the partitions to fetch from and their current state
    val fetchStates = partitionStates.partitionStateMap.asScala
    // Build a FetchRequest for the partitions in partitionStates;
    // the concrete implementation is in ReplicaFetcherThread
    val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = buildFetch(fetchStates)
    // Handle partitions with errors, mainly by moving them to the end of the ordered map
    handlePartitionsWithErrors(partitionsWithError)
    // If no partition is currently fetchable, wait fetchBackOffMs before retrying
    if (fetchRequestOpt.isEmpty) {
      trace(s"There are no active partitions. Back off for $fetchBackOffMs ms before sending a fetch request")
      partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
    }
    (fetchStates, fetchRequestOpt)
  }
  fetchRequestOpt.foreach { fetchRequest =>
    // Send the FETCH request to the leader replica and process the response
    processFetchRequest(fetchStates, fetchRequest)
  }
}

Note that maybeFetch holds partitionMapLock only while building the request and releases it before sending, so a partition's state can change while the FETCH is in flight; that is why processFetchRequest re-checks each partition's saved fetch offset before processing its piece of the response. The response handling itself is delegated to processPartitionData, an abstract method implemented in ReplicaFetcherThread.

private def processFetchRequest(fetchStates: Map[TopicPartition, PartitionFetchState],
                                fetchRequest: FetchRequest.Builder): Unit = {
  val partitionsWithError = mutable.Set[TopicPartition]()
  var responseData: Seq[(TopicPartition, FetchData)] = Seq.empty

  try {
    trace(s"Sending fetch request $fetchRequest")
    // Fetch data from the leader partition
    responseData = fetchFromLeader(fetchRequest)
  } catch {
    case t: Throwable =>
      if (isRunning) {
        warn(s"Error in response for fetch request $fetchRequest", t)
        inLock(partitionMapLock) {
          partitionsWithError ++= partitionStates.partitionSet.asScala
          partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
        }
      }
  }
  // Update the request-rate metric
  fetcherStats.requestRate.mark()

  if (responseData.nonEmpty) {
    // Process the fetched data
    inLock(partitionMapLock) {
      responseData.foreach { case (topicPartition, partitionData) =>
        Option(partitionStates.stateValue(topicPartition)).foreach { currentFetchState =>
          val fetchState = fetchStates(topicPartition)
          // Only process the response if:
          // 1. the offset we requested matches the saved next-fetch offset, and
          // 2. the partition is currently in a fetchable state
          if (fetchState.fetchOffset == currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) {
            partitionData.error match {
              // No error
              case Errors.NONE =>
                try {
                  // Process the response; the concrete implementation is in ReplicaFetcherThread
                  val logAppendInfoOpt = processPartitionData(topicPartition, currentFetchState.fetchOffset,
                    partitionData)
                  logAppendInfoOpt.foreach { logAppendInfo =>
                    val validBytes = logAppendInfo.validBytes
                    val nextOffset = if (validBytes > 0) logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset
                    fetcherLagStats.getAndMaybePut(topicPartition).lag = Math.max(0L, partitionData.highWatermark - nextOffset)
                    if (validBytes > 0 && partitionStates.contains(topicPartition)) {
                      val newFetchState = PartitionFetchState(nextOffset, fetchState.currentLeaderEpoch, state = Fetching)
                      partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
                      fetcherStats.byteRate.mark(validBytes)
                    }
                  }
                } catch {
                  case ime: CorruptRecordException =>
                    error(s"Found invalid messages during fetch for partition $topicPartition " +
                      s"offset ${currentFetchState.fetchOffset}", ime)
                    partitionsWithError += topicPartition
                  case e: KafkaStorageException =>
                    error(s"Error while processing data for partition $topicPartition", e)
                    partitionsWithError += topicPartition
                  case e: Throwable =>
                    throw new KafkaException(s"Error processing data for partition $topicPartition " +
                      s"offset ${currentFetchState.fetchOffset}", e)
                }
              // The fetch offset is out of range, usually because the leader changed
              case Errors.OFFSET_OUT_OF_RANGE =>
                if (!handleOutOfRangeError(topicPartition, currentFetchState))
                  // Add to the error list
                  partitionsWithError += topicPartition
              // Our leader epoch is newer than the epoch known on the remote broker
              case Errors.UNKNOWN_LEADER_EPOCH =>
                debug(s"Remote broker has a smaller leader epoch for partition $topicPartition than " +
                  s"this replica's current leader epoch of ${fetchState.currentLeaderEpoch}.")
                // Add to the error list
                partitionsWithError += topicPartition
              // Our leader epoch is older than the epoch on the remote broker
              case Errors.FENCED_LEADER_EPOCH =>
                onPartitionFenced(topicPartition)
              // The leader changed
              case Errors.NOT_LEADER_FOR_PARTITION =>
                debug(s"Remote broker is not the leader for partition $topicPartition, which could indicate " +
                  "that the partition is being moved")
                // Add to the error list
                partitionsWithError += topicPartition
              case _ =>
                error(s"Error for partition $topicPartition at offset ${currentFetchState.fetchOffset}",
                  partitionData.error.exception)
                partitionsWithError += topicPartition
            }
          }
        }
      }
    }
  }

  if (partitionsWithError.nonEmpty) {
    debug(s"Handling errors for partitions $partitionsWithError")
    // Handle the partitions that hit errors
    handlePartitionsWithErrors(partitionsWithError)
  }
}
2. ReplicaFetcherThread

First, look at the ReplicaFetcherThread class definition:

class ReplicaFetcherThread(name: String,
                           fetcherId: Int,
                           sourceBroker: BrokerEndPoint,
                           brokerConfig: KafkaConfig,
                           replicaMgr: ReplicaManager,
                           metrics: Metrics,
                           time: Time,
                           quota: ReplicaQuota,
                           leaderEndpointBlockingSend: Option[BlockingSend] = None)
  extends AbstractFetcherThread(name = name,
                                clientId = name,
                                sourceBroker = sourceBroker,
                                fetchBackOffMs = brokerConfig.replicaFetchBackoffMs,
                                isInterruptible = false) {
  ...
  // Longest time the follower waits for its FETCH request to be served
  private val maxWait = brokerConfig.replicaFetchWaitMaxMs
  // Minimum number of bytes that must accumulate before a fetch response is returned
  private val minBytes = brokerConfig.replicaFetchMinBytes
  // Maximum size in bytes of a valid fetch response
  private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
  // Maximum number of bytes fetched for a single partition
  private val fetchSize = brokerConfig.replicaFetchMaxBytes
  ...
}
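For reference, these fields are populated from the following broker configuration keys (the defaults shown are from Kafka's broker configuration documentation and may vary by version):

replica.fetch.backoff.ms         -> fetchBackOffMs  (default 1000)
replica.fetch.wait.max.ms        -> maxWait         (default 500)
replica.fetch.min.bytes          -> minBytes        (default 1)
replica.fetch.response.max.bytes -> maxBytes        (default 10485760)
replica.fetch.max.bytes          -> fetchSize       (default 1048576)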

Next, let's see how ReplicaFetcherThread implements its three key methods: processPartitionData, buildFetch, and truncate.

1. The processPartitionData method

override def processPartitionData(topicPartition: TopicPartition,
                                  fetchOffset: Long,
                                  partitionData: FetchData): Option[LogAppendInfo] = {
  val replica = replicaMgr.localReplicaOrException(topicPartition)
  // Get the partition object for this topic-partition from the replica manager
  val partition = replicaMgr.getPartition(topicPartition).get
  // Convert the fetched data into a properly formatted record set
  val records = toMemoryRecords(partitionData.records)

  maybeWarnIfOversizedRecords(records, topicPartition)

  // It is an error if the fetch offset is not the local log end offset (LEO)
  if (fetchOffset != replica.logEndOffset)
    throw new IllegalStateException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(
      topicPartition, fetchOffset, replica.logEndOffset))

  if (isTraceEnabled)
    trace("Follower has replica log end offset %d for partition %s. Received %d messages and leader hw %d"
      .format(replica.logEndOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))

  // Append the records to the follower's local log
  val logAppendInfo = partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false)

  if (isTraceEnabled)
    trace("Follower has replica log end offset %d after appending %d bytes of messages for partition %s"
      .format(replica.logEndOffset, records.sizeInBytes, topicPartition))

  val followerHighWatermark = replica.logEndOffset.min(partitionData.highWatermark)
  val leaderLogStartOffset = partitionData.logStartOffset
  // Update the follower replica's high watermark
  replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)
  // Try to advance the follower replica's log start offset
  replica.maybeIncrementLogStartOffset(leaderLogStartOffset)
  if (isTraceEnabled)
    trace(s"Follower set replica high watermark for partition $topicPartition to $followerHighWatermark")

  // Throttle replica fetching if the partition is subject to a quota
  if (quota.isThrottled(topicPartition))
    quota.record(records.sizeInBytes)
  replicaMgr.brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes)

  logAppendInfo
}
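Two design points worth noting here: the follower's high watermark is taken as min(local log end offset, leader high watermark), because a follower must never advertise offsets it has not yet written locally, even if the leader has already committed them; and maybeIncrementLogStartOffset only ever moves the log start offset forward, so the follower's log start offset may lag the leader's until old segments are deleted.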

2. The buildFetch method

override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = {
  val partitionsWithError = mutable.Set[TopicPartition]()

  val builder = fetchSessionHandler.newBuilder()
  // Iterate over the partitions: add those in a fetchable state to the builder
  // for batch processing, and add any partition that hits an error to the error list
  partitionMap.foreach { case (topicPartition, fetchState) =>
    if (fetchState.isReadyForFetch && !shouldFollowerThrottle(quota, topicPartition)) {
      try {
        val logStartOffset = replicaMgr.localReplicaOrException(topicPartition).logStartOffset
        builder.add(topicPartition, new FetchRequest.PartitionData(
          fetchState.fetchOffset, logStartOffset, fetchSize, Optional.of(fetchState.currentLeaderEpoch)))
      } catch {
        case _: KafkaStorageException =>
          // The replica has already been marked offline due to log directory failure and the original failure should have already been logged.
          // This partition should be removed from ReplicaFetcherThread soon by ReplicaManager.handleLogDirFailure()
          partitionsWithError += topicPartition
      }
    }
  }

  val fetchData = builder.build()
  val fetchRequestOpt = if (fetchData.sessionPartitions.isEmpty && fetchData.toForget.isEmpty) {
    None
  } else {
    // Construct the Builder object for the FETCH request
    val requestBuilder = FetchRequest.Builder
      .forReplica(fetchRequestVersion, replicaId, maxWait, minBytes, fetchData.toSend)
      .setMaxBytes(maxBytes)
      .toForget(fetchData.toForget)
      .metadata(fetchData.metadata)
    Some(requestBuilder)
  }
  // Return the Builder object together with the error list
  ResultWithPartitions(fetchRequestOpt, partitionsWithError)
}
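The fetchSessionHandler used above implements incremental fetch sessions (KIP-227): fetchData.toSend carries only partitions that are new or whose state changed since the previous request, while fetchData.toForget lists partitions to drop from the session. This keeps steady-state FETCH requests small when a follower replicates many partitions.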

3. The truncate method

override def truncate(tp: TopicPartition, offsetTruncationState: OffsetTruncationState): Unit = {
  val replica = replicaMgr.localReplicaOrException(tp)
  // Get the partition object
  val partition = replicaMgr.getPartition(tp).get
  // Truncate to the position given by offsetTruncationState.offset
  partition.truncateTo(offsetTruncationState.offset, isFuture = false)

  if (offsetTruncationState.offset < replica.highWatermark.messageOffset)
    warn(s"Truncating $tp to offset ${offsetTruncationState.offset} below high watermark " +
      s"${replica.highWatermark.messageOffset}")

  // Mark the future replica for truncation only when we do the last truncation
  if (offsetTruncationState.truncationCompleted)
    replicaMgr.replicaAlterLogDirsManager.markPartitionsForTruncation(brokerConfig.brokerId, tp,
      offsetTruncationState.offset)
}
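The final markPartitionsForTruncation call concerns "future" replicas: copies of a partition being moved between log directories on the same broker. Marking the partition once the last truncation completes lets the corresponding ReplicaAlterLogDirsThread truncate the future copy to the same offset before it resumes catching up.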

