This commit is contained in:
louzefeng
2024-07-11 05:50:32 +00:00
parent bf99793fd0
commit d3828a7aee
6071 changed files with 0 additions and 0 deletions

View File

@@ -0,0 +1,297 @@
<audio id="audio" title="01 | 日志段:保存消息文件的对象是怎么实现的?" controls="" preload="none"><source id="mp3" src="https://static001.geekbang.org/resource/audio/94/e0/94846052e0ed70ff96b1b021e449bfe0.mp3"></audio>
你好,我是胡夕。
今天我们开始学习Kafka源代码分析的第一模块日志Log、日志段LogSegment以及索引Index源码。
日志段及其相关代码是Kafka服务器源码中最为重要的组件代码之一。你可能会非常关心在Kafka中消息是如何被保存和组织在一起的。毕竟**不管是学习任何消息引擎,弄明白消息建模方式都是首要的问题**。因此,你非常有必要学习日志段这个重要的子模块的源码实现。
除此之外了解日志段也有很多实际意义比如说你一定对Kafka底层日志文件00000000000000012345.log的命名感到很好奇。学过日志段之后我相信这个问题一定会迎刃而解的。
今天我会带你详细看下日志段部分的源码。不过在此之前你需要先了解一下Kafka的日志结构。
## Kafka日志结构概览
Kafka日志在磁盘上的组织架构如下图所示
<img src="https://static001.geekbang.org/resource/image/72/4b/72fb27cb49e41a61524322ab6bd1cb4b.jpg" alt="">
日志是Kafka服务器端代码的重要组件之一很多其他的核心组件都是以日志为基础的比如后面要讲到的状态管理机和副本管理器等。
总的来说Kafka日志对象由多个日志段对象组成而每个日志段对象会在磁盘上创建一组文件包括消息日志文件.log、位移索引文件.index、时间戳索引文件.timeindex以及已中止Aborted事务的索引文件.txnindex。当然如果你没有使用Kafka事务已中止事务的索引文件是不会被创建出来的。图中的一串数字0是该日志段的起始位移值Base Offset也就是该日志段中所存的第一条消息的位移值。
一般情况下一个Kafka主题有很多分区每个分区就对应一个Log对象在物理磁盘上则对应于一个子目录。比如你创建了一个双分区的主题test-topic那么Kafka在磁盘上会创建两个子目录test-topic-0和test-topic-1。而在服务器端这就是两个Log对象。每个子目录下存在多组日志段也就是多组.log、.index、.timeindex文件组合只不过文件名不同因为每个日志段的起始位移不同。
## 日志段代码解析
阅读日志段源码是很有必要的因为日志段是Kafka保存消息的最小载体。也就是说消息是保存在日志段中的。然而官网对于日志段的描述少得可怜以至于很多人对于这么重要的概念都知之甚少。
但是,不熟悉日志段的话,如果在生产环境出现相应的问题,我们是没有办法快速找到解决方案的。我跟你分享一个真实案例。
我们公司之前碰到过一个问题当时大面积日志段同时间切分导致瞬时打满磁盘I/O带宽。对此所有人都束手无策最终只能求助于日志段源码。
最后我们在LogSegment的shouldRoll方法中找到了解决方案设置Broker端参数log.roll.jitter.ms值大于0即通过给日志段切分执行时间加一个扰动值的方式来避免大量日志段在同一时刻执行切分动作从而显著降低磁盘I/O。
后来在复盘的时候我们一致认为阅读LogSegment源码是非常正确的决定。否则单纯查看官网对该参数的说明我们不一定能够了解它的真实作用。那log.roll.jitter.ms参数的具体作用是啥呢下面咱们说日志段的时候我会给你详细解释下。
那话不多说现在我们就来看一下日志段源码。我会重点给你讲一下日志段类声明、append方法、read方法和recover方法。
你首先要知道的是,日志段源码位于 Kafka 的 core 工程下,具体文件位置是 core/src/main/scala/kafka/log/LogSegment.scala。实际上所有日志结构部分的源码都在 core 的 kafka.log 包下。
该文件下定义了三个 Scala 对象:
- LogSegment class
- LogSegment object
- LogFlushStats object。LogFlushStats 结尾有个 Stats它是做统计用的主要负责为日志落盘进行计时。
我们主要关心的是 **LogSegment class 和 object**。在 Scala 语言里,在一个源代码文件中同时定义相同名字的 class 和 object 的用法被称为伴生Companion。Class 对象被称为伴生类,它和 Java 中的类是一样的;而 Object 对象是一个单例对象,用于保存一些静态变量或静态方法。如果用 Java 来做类比的话我们必须要编写两个类才能实现这两个类也就是LogSegment 和 LogSegmentUtils。在 Scala 中,你直接使用伴生就可以了。
对了值得一提的是Kafka 中的源码注释写得非常详细。我不打算把注释也贴出来,但我特别推荐你要读一读源码中的注释。比如,今天我们要学习的日志段文件开头的一大段注释写得就非常精彩。我截一个片段让你感受下:
A segment of the log. Each segment has two components: a log and an index. The log is a FileRecords containing the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each segment has a base offset which is an offset &lt;= the least offset of any message in this segment and &gt; any offset in any previous segment.
这段文字清楚地说明了每个日志段由两个核心组件构成日志和索引。当然这里的索引泛指广义的索引文件。另外这段注释还给出了一个重要的事实每个日志段都有一个起始位移值Base Offset而该位移值是此日志段所有消息中最小的位移值同时该值却又比前面任何日志段中消息的位移值都大。看完这个注释我们就能够快速地了解起始位移值在日志段中的作用了。
### 日志段类声明
下面,我分批次给出比较关键的代码片段,并对其进行解释。首先,我们看下 LogSegment 的定义:
```
class LogSegment private[log] (val log: FileRecords,
val lazyOffsetIndex: LazyIndex[OffsetIndex],
val lazyTimeIndex: LazyIndex[TimeIndex],
val txnIndex: TransactionIndex,
val baseOffset: Long,
val indexIntervalBytes: Int,
val rollJitterMs: Long,
val time: Time) extends Logging { … }
```
就像我前面说的,一个日志段包含**消息日志文件**、**位移索引文件**、**时间戳索引文件**、**已中止事务索引文件**等。这里的 FileRecords 就是实际保存 Kafka 消息的对象。专栏后面我将专门讨论 Kafka 是如何保存具体消息的,也就是 FileRecords 及其家族的实现方式。同时,我还会给你介绍一下社区在持久化消息这块是怎么演进的,你一定不要错过那部分的内容。
下面的 lazyOffsetIndex、lazyTimeIndex 和 txnIndex 分别对应于刚才所说的 3 个索引文件。不过,在实现方式上,前两种使用了延迟初始化的原理,降低了初始化时间成本。后面我们在谈到索引的时候再详细说。
每个日志段对象保存自己的起始位移 **baseOffset**——这是非常重要的属性事实上你在磁盘上看到的文件名就是baseOffset的值。每个LogSegment对象实例一旦被创建它的起始位移就是固定的了不能再被更改。
indexIntervalBytes 值其实就是 Broker 端参数 log.index.interval.bytes 值,它控制了**日志段对象新增索引项的频率**。默认情况下,日志段至少新写入 4KB 的消息数据才会新增一条索引项。而 rollJitterMs 是日志段对象新增倒计时的“扰动值”。因为目前 Broker 端日志段新增倒计时是全局设置,这就是说,在未来的某个时刻可能同时创建多个日志段对象,这将极大地增加物理磁盘 I/O 压力。有了 rollJitterMs 值的干扰,每个新增日志段在创建时会彼此岔开一小段时间,这样可以缓解物理磁盘的 I/O 负载瓶颈。
至于最后的 time 参数,它就是用于统计计时的一个实现类,在 Kafka 源码中普遍出现,我就不详细展开讲了。
下面我来说一些重要的方法。
对于一个日志段而言,最重要的方法就是写入消息和读取消息了,它们分别对应着源码中的 append 方法和 read 方法。另外recover方法同样很关键它是Broker重启后恢复日志段的操作逻辑。
### append方法
我们先来看append 方法,了解下写入消息的具体操作。
```
def append(largestOffset: Long,
largestTimestamp: Long,
shallowOffsetOfMaxTimestamp: Long,
records: MemoryRecords): Unit = {
if (records.sizeInBytes &gt; 0) {
trace(s&quot;Inserting ${records.sizeInBytes} bytes at end offset $largestOffset at position ${log.sizeInBytes} &quot; +
s&quot;with largest timestamp $largestTimestamp at shallow offset $shallowOffsetOfMaxTimestamp&quot;)
val physicalPosition = log.sizeInBytes()
if (physicalPosition == 0)
rollingBasedTimestamp = Some(largestTimestamp)
ensureOffsetInRange(largestOffset)
// append the messages
val appendedBytes = log.append(records)
trace(s&quot;Appended $appendedBytes to ${log.file} at end offset $largestOffset&quot;)
// Update the in memory max timestamp and corresponding offset.
if (largestTimestamp &gt; maxTimestampSoFar) {
maxTimestampSoFar = largestTimestamp
offsetOfMaxTimestampSoFar = shallowOffsetOfMaxTimestamp
}
// append an entry to the index (if needed)
if (bytesSinceLastIndexEntry &gt; indexIntervalBytes) {
offsetIndex.append(largestOffset, physicalPosition)
timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
bytesSinceLastIndexEntry = 0
}
bytesSinceLastIndexEntry += records.sizeInBytes
}
}
```
append 方法接收 4 个参数,分别表示待写入消息批次中消息的**最大位移值**、**最大时间戳**、**最大时间戳对应消息的位移**以及**真正要写入的消息集合**。下面这张图展示了 append 方法的完整执行流程:
<img src="https://static001.geekbang.org/resource/image/67/5c/6700570d3052fcadda54767ed8dc385c.jpg" alt="">
**第一步:**
在源码中,首先调用 log.sizeInBytes 方法判断该日志段是否为空,如果是空的话, Kafka 需要记录要写入消息集合的最大时间戳,并将其作为后面新增日志段倒计时的依据。
**第二步:**
代码调用 ensureOffsetInRange 方法确保输入参数最大位移值是合法的。那怎么判断是不是合法呢?标准就是看它与日志段起始位移的差值是否在整数范围内,即 largestOffset - baseOffset的值是不是介于 [0Int.MAXVALUE] 之间。在极个别的情况下这个差值可能会越界这时append 方法就会抛出异常,阻止后续的消息写入。一旦你碰到这个问题,你需要做的是升级你的 Kafka 版本,因为这是由已知的 Bug 导致的。
**第三步:**
待这些做完之后append 方法调用 FileRecords 的 append 方法执行真正的写入。前面说过了,专栏后面我们会详细介绍 FileRecords 类。这里你只需要知道它的工作是将内存中的消息对象写入到操作系统的页缓存就可以了。
**第四步:**
再下一步,就是更新日志段的最大时间戳以及最大时间戳所属消息的位移值属性。每个日志段都要保存当前最大时间戳信息和所属消息的位移信息。
还记得 Broker 端提供定期删除日志的功能吗?比如我只想保留最近 7 天的日志,没错,当前最大时间戳这个值就是判断的依据;而最大时间戳对应的消息的位移值则用于时间戳索引项。虽然后面我会详细介绍,这里我还是稍微提一下:**时间戳索引项保存时间戳与消息位移的对应关系**。在这步操作中Kafka会更新并保存这组对应关系。
**第五步:**
append 方法的最后一步就是更新索引项和写入的字节数了。我在前面说过,日志段每写入 4KB 数据就要写入一个索引项。当已写入字节数超过了 4KB 之后append 方法会调用索引对象的 append 方法新增索引项,同时清空已写入字节数,以备下次重新累积计算。
### read 方法
好了append 方法我就解释完了。下面我们来看read方法了解下读取日志段的具体操作。
```
def read(startOffset: Long,
maxSize: Int,
maxPosition: Long = size,
minOneMessage: Boolean = false): FetchDataInfo = {
if (maxSize &lt; 0)
throw new IllegalArgumentException(s&quot;Invalid max size $maxSize for log read from segment $log&quot;)
val startOffsetAndSize = translateOffset(startOffset)
// if the start position is already off the end of the log, return null
if (startOffsetAndSize == null)
return null
val startPosition = startOffsetAndSize.position
val offsetMetadata = LogOffsetMetadata(startOffset, this.baseOffset, startPosition)
val adjustedMaxSize =
if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
else maxSize
// return a log segment but with zero size in the case below
if (adjustedMaxSize == 0)
return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)
// calculate the length of the message set to read based on whether or not they gave us a maxOffset
val fetchSize: Int = min((maxPosition - startPosition).toInt, adjustedMaxSize)
FetchDataInfo(offsetMetadata, log.slice(startPosition, fetchSize),
firstEntryIncomplete = adjustedMaxSize &lt; startOffsetAndSize.size)
}
```
read 方法接收 4 个输入参数。
- startOffset要读取的第一条消息的位移
- maxSize能读取的最大字节数
- maxPosition :能读到的最大文件位置;
- minOneMessage是否允许在消息体过大时至少返回第一条消息。
前3个参数的含义很好理解我重点说下第 4 个。当这个参数为 true 时,即使出现消息体字节数超过了 maxSize 的情形read 方法依然能返回至少一条消息。引入这个参数主要是为了确保不出现消费饿死的情况。
下图展示了 read 方法的完整执行逻辑:
<img src="https://static001.geekbang.org/resource/image/61/45/61c97ee41b52e63e771cf5503e0ee345.jpg" alt="">
逻辑很简单,我们一步步来看下。
第一步是调用 translateOffset 方法定位要读取的起始文件位置 startPosition。输入参数 startOffset 仅仅是位移值Kafka 需要根据索引信息找到对应的物理文件位置才能开始读取消息。
待确定了读取起始位置,日志段代码需要根据这部分信息以及 maxSize 和 maxPosition 参数共同计算要读取的总字节数。举个例子,假设 maxSize=100maxPosition=300startPosition=250那么 read 方法只能读取 50 字节,因为 maxPosition - startPosition = 50。我们把它和maxSize参数相比较其中的最小值就是最终能够读取的总字节数。
最后一步是调用 FileRecords 的 slice 方法,从指定位置读取指定大小的消息集合。
### recover 方法
除了append 和read 方法LogSegment 还有一个重要的方法需要我们关注,它就是 recover方法用于**恢复日志段**。
下面的代码是 recover 方法源码。什么是恢复日志段呢?其实就是说, Broker 在启动时会从磁盘上加载所有日志段信息到内存中,并创建相应的 LogSegment 对象实例。在这个过程中,它需要执行一系列的操作。
```
def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = {
offsetIndex.reset()
timeIndex.reset()
txnIndex.reset()
var validBytes = 0
var lastIndexEntry = 0
maxTimestampSoFar = RecordBatch.NO_TIMESTAMP
try {
for (batch &lt;- log.batches.asScala) {
batch.ensureValid()
ensureOffsetInRange(batch.lastOffset)
// The max timestamp is exposed at the batch level, so no need to iterate the records
if (batch.maxTimestamp &gt; maxTimestampSoFar) {
maxTimestampSoFar = batch.maxTimestamp
offsetOfMaxTimestampSoFar = batch.lastOffset
}
// Build offset index
if (validBytes - lastIndexEntry &gt; indexIntervalBytes) {
offsetIndex.append(batch.lastOffset, validBytes)
timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
lastIndexEntry = validBytes
}
validBytes += batch.sizeInBytes()
if (batch.magic &gt;= RecordBatch.MAGIC_VALUE_V2) {
leaderEpochCache.foreach { cache =&gt;
if (batch.partitionLeaderEpoch &gt; 0 &amp;&amp; cache.latestEpoch.forall(batch.partitionLeaderEpoch &gt; _))
cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
}
updateProducerState(producerStateManager, batch)
}
}
} catch {
case e@ (_: CorruptRecordException | _: InvalidRecordException) =&gt;
warn(&quot;Found invalid messages in log segment %s at byte offset %d: %s. %s&quot;
.format(log.file.getAbsolutePath, validBytes, e.getMessage, e.getCause))
}
val truncated = log.sizeInBytes - validBytes
if (truncated &gt; 0)
debug(s&quot;Truncated $truncated invalid bytes at the end of segment ${log.file.getAbsoluteFile} during recovery&quot;)
log.truncateTo(validBytes)
offsetIndex.trimToValidSize()
// A normally closed segment always appends the biggest timestamp ever seen into log segment, we do this as well.
timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar, skipFullCheck = true)
timeIndex.trimToValidSize()
truncated
}
```
我依然使用一张图来说明 recover 的处理逻辑:
<img src="https://static001.geekbang.org/resource/image/eb/6c/eb5bd324685ee393e8a3072fc4b4276c.jpg" alt="">
recover 开始时,代码依次调用索引对象的 reset 方法清空所有的索引文件之后会开始遍历日志段中的所有消息集合或消息批次RecordBatch。对于读取到的每个消息集合日志段必须要确保它们是合法的这主要体现在两个方面
1. 该集合中的消息必须要符合 Kafka 定义的二进制格式;
1. 该集合中最后一条消息的位移值不能越界,即它与日志段起始位移的差值必须是一个正整数值。
校验完消息集合之后代码会更新遍历过程中观测到的最大时间戳以及所属消息的位移值。同样这两个数据用于后续构建索引项。再之后就是不断累加当前已读取的消息字节数并根据该值有条件地写入索引项。最后是更新事务型Producer的状态以及Leader Epoch缓存。不过这两个并不是理解Kafka日志结构所必需的组件因此我们可以忽略它们。
遍历执行完成后Kafka 会将日志段当前总字节数和刚刚累加的已读取字节数进行比较,如果发现前者比后者大,说明日志段写入了一些非法消息,需要执行截断操作,将日志段大小调整回合法的数值。同时, Kafka 还必须相应地调整索引文件的大小。把这些都做完之后,日志段恢复的操作也就宣告结束了。
## 总结
今天我们对Kafka日志段源码进行了重点的分析包括日志段的append方法、read方法和recover方法。
1. append方法我重点分析了源码是如何写入消息到日志段的。你要重点关注一下写操作过程中更新索引的时机是如何设定的。
1. read方法我重点分析了源码底层读取消息的完整流程。你要关注下Kafka计算待读取消息字节数的逻辑也就是maxSize、maxPosition和startOffset是如何共同影响read方法的。
1. recover方法这个操作会读取日志段文件然后重建索引文件。再强调一下**这个操作在执行过程中要读取日志段文件**。因此如果你的环境上有很多日志段文件你又发现Broker重启很慢那你现在就知道了这是因为Kafka在执行recover的过程中需要读取大量的磁盘文件导致的。你看这就是我们读取源码的收获。
<img src="https://static001.geekbang.org/resource/image/15/80/158bed3c92e7205fc450bb8b2d136480.jpg" alt="">
这三个方法是日志段对象最重要的功能。你一定要仔细阅读它们尽量做到对源码中每行代码的作用都了然于心。没有什么代码是读一遍不能理解的如果有那就再多读几遍。另外我希望你特别关注下append和read方法它们将是后面我们讨论日志对象时重点会用到的两个方法。毕竟读写日志是Kafka最常用的操作而日志读取底层调用的就是日志段的这两个方法。
## 课后讨论
如果你查看日志段源码的话你会发现还有一个比较重要的方法我没有提到那就是truncateTo方法这个方法会将日志段中的数据强制截断到指定的位移处。该方法只有20几行代码我希望你可以自己去阅读下然后思考这样一个问题如果指定的位移值特别特别大以至于超过了日志段本身保存的最大位移值该方法对执行效果是怎么样的
欢迎你在留言区畅所欲言,跟我交流讨论,也欢迎你把文章分享给你的朋友。

View File

@@ -0,0 +1,586 @@
<audio id="audio" title="02 | 日志(上):日志究竟是如何加载日志段的?" controls="" preload="none"><source id="mp3" src="https://static001.geekbang.org/resource/audio/59/d4/59545895a0902d128c0f04221eced3d4.mp3"></audio>
你好我是胡夕。今天我来讲讲Kafka源码的日志Log对象。
上节课,我们学习了日志段部分的源码,你可以认为,**日志是日志段的容器,里面定义了很多管理日志段的操作**。坦率地说如果看Kafka源码却不看Log就跟你买了这门课却不知道作者是谁一样。在我看来Log对象是Kafka源码特别是Broker端最核心的部分没有之一。
它到底有多重要呢我和你分享一个例子你先感受下。我最近正在修复一个Kafka的Bug[KAFKA-9157](https://issues.apache.org/jira/browse/KAFKA-9157)在某些情况下Kafka的Compaction操作会产生很多空的日志段文件。如果要避免这些空日志段文件被创建出来就必须搞懂创建日志段文件的原理而这些代码恰恰就在Log源码中。
既然Log源码要管理日志段对象那么它就必须先把所有日志段对象加载到内存里面。这个过程是怎么实现的呢今天我就带你学习下日志加载日志段的过程。
首先我们来看下Log对象的源码结构。
## Log源码结构
Log源码位于Kafka core工程的log源码包下文件名是Log.scala。总体上该文件定义了10个类和对象如下图所示
<img src="https://static001.geekbang.org/resource/image/81/ce/8126a191f63d9abea860d71992b0aece.jpg" alt="">
那么这10个类和对象都是做什么的呢我先给你简单介绍一下你可以对它们有个大致的了解。
不过在介绍之前我先提一句图中括号里的C表示ClassO表示Object。还记得我在上节课提到过的伴生对象吗没错同时定义同名的Class和Object就属于Scala中的伴生对象用法。
我们先来看伴生对象也就是LogAppendInfo、Log和RollParams。
**1.LogAppendInfo**
- LogAppendInfoC保存了一组待写入消息的各种元数据信息。比如这组消息中第一条消息的位移值是多少、最后一条消息的位移值是多少再比如这组消息中最大的消息时间戳又是多少。总之这里面的数据非常丰富下节课我再具体说说
- LogAppendInfoO: 可以理解为其对应伴生类的工厂方法类里面定义了一些工厂方法用于创建特定的LogAppendInfo实例。
**2.Log**
- LogC: Log源码中最核心的代码。这里我先卖个关子一会儿细聊。
- LogO同理Log伴生类的工厂方法定义了很多常量以及一些辅助方法。
**3.RollParams**
- RollParamsC定义用于控制日志段是否切分Roll的数据结构。
- RollParamsO同理RollParams伴生类的工厂方法。
除了这3组伴生对象之外还有4类源码。
- LogMetricNames定义了Log对象的监控指标。
- LogOffsetSnapshot封装分区所有位移元数据的容器类。
- LogReadInfo封装读取日志返回的数据及其元数据。
- CompletedTxn记录已完成事务的元数据主要用于构建事务索引。
## Log Class &amp; Object
下面我会按照这些类和对象的重要程度对它们一一进行拆解。首先咱们先说说Log类及其伴生对象。
考虑到伴生对象多用于保存静态变量和静态方法比如静态工厂方法等因此我们先看伴生对象即Log Object的实现。毕竟柿子先找软的捏
```
object Log {
val LogFileSuffix = &quot;.log&quot;
val IndexFileSuffix = &quot;.index&quot;
val TimeIndexFileSuffix = &quot;.timeindex&quot;
val ProducerSnapshotFileSuffix = &quot;.snapshot&quot;
val TxnIndexFileSuffix = &quot;.txnindex&quot;
val DeletedFileSuffix = &quot;.deleted&quot;
val CleanedFileSuffix = &quot;.cleaned&quot;
val SwapFileSuffix = &quot;.swap&quot;
val CleanShutdownFile = &quot;.kafka_cleanshutdown&quot;
val DeleteDirSuffix = &quot;-delete&quot;
val FutureDirSuffix = &quot;-future&quot;
……
}
```
这是Log Object定义的所有常量。如果有面试官问你Kafka中定义了多少种文件类型你可以自豪地把这些说出来。耳熟能详的.log、.index、.timeindex和.txnindex我就不解释了我们来了解下其他几种文件类型。
- .snapshot是Kafka为幂等型或事务型Producer所做的快照文件。鉴于我们现在还处于阅读源码的初级阶段事务或幂等部分的源码我就不详细展开讲了。
- .deleted是删除日志段操作创建的文件。目前删除日志段文件是异步操作Broker端把日志段文件从.log后缀修改为.deleted后缀。如果你看到一大堆.deleted后缀的文件名别慌这是Kafka在执行日志段文件删除。
- .cleaned和.swap都是Compaction操作的产物等我们讲到Cleaner的时候再说。
- -delete则是应用于文件夹的。当你删除一个主题的时候主题的分区文件夹会被加上这个后缀。
- -future是用于变更主题分区文件夹地址的属于比较高阶的用法。
总之记住这些常量吧。记住它们的主要作用是以后不要被面试官唬住开玩笑其实这些常量最重要的地方就在于它们能够让你了解Kafka定义的各种文件类型。
Log Object还定义了超多的工具类方法。由于它们都很简单这里我只给出一个方法的源码我们一起读一下。
```
def filenamePrefixFromOffset(offset: Long): String = {
val nf = NumberFormat.getInstance()
nf.setMinimumIntegerDigits(20)
nf.setMaximumFractionDigits(0)
nf.setGroupingUsed(false)
nf.format(offset)
}
```
这个方法的作用是**通过给定的位移值计算出对应的日志段文件名**。Kafka日志文件固定是20位的长度filenamePrefixFromOffset方法就是用前面补0的方式把给定位移值扩充成一个固定20位长度的字符串。
举个例子我们给定一个位移值是12345那么Broker端磁盘上对应的日志段文件名就应该是00000000000000012345.log。怎么样很简单吧其他的工具类方法也很简单我就不一一展开说了。
下面我们来看Log源码部分的重头戏**Log类**。这是一个2000多行的大类。放眼整个Kafka源码像Log这么大的类也不多见足见它的重要程度。我们先来看这个类的定义
```
class Log(@volatile var dir: File,
@volatile var config: LogConfig,
@volatile var logStartOffset: Long,
@volatile var recoveryPoint: Long,
scheduler: Scheduler,
brokerTopicStats: BrokerTopicStats,
val time: Time,
val maxProducerIdExpirationMs: Int,
val producerIdExpirationCheckIntervalMs: Int,
val topicPartition: TopicPartition,
val producerStateManager: ProducerStateManager,
logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup {
……
}
```
看着好像有很多属性,但其实,你只需要记住两个属性的作用就够了:**dir和logStartOffset**。dir就是这个日志所在的文件夹路径也就是**主题分区的路径**。而logStartOffset表示**日志的当前最早位移**。dir和logStartOffset都是volatile var类型表示它们的值是变动的而且可能被多个线程更新。
你可能听过日志的当前末端位移也就是Log End OffsetLEO它是表示日志下一条待插入消息的位移值而这个Log Start Offset是跟它相反的它表示日志当前对外可见的最早一条消息的位移值。我用一张图来标识它们的区别
<img src="https://static001.geekbang.org/resource/image/38/b4/388672f6dab8571f272ed47c9679c2b4.jpg" alt="">
图中绿色的位移值3是日志的Log Start Offset而位移值15表示LEO。另外位移值8是高水位值它是区分已提交消息和未提交消息的分水岭。
有意思的是Log End Offset可以简称为LEO但Log Start Offset却不能简称为LSO。因为在Kafka中LSO特指Log Stable Offset属于Kafka事务的概念。这个课程中不会涉及LSO你只需要知道Log Start Offset不等于LSO即可。
Log类的其他属性你暂时不用理会因为它们要么是很明显的工具类属性比如timer和scheduler要么是高阶用法才会用到的高级属性比如producerStateManager和logDirFailureChannel。工具类的代码大多是做辅助用的跳过它们也不妨碍我们理解Kafka的核心功能而高阶功能代码设计复杂学习成本高性价比不高。
其实除了Log类签名定义的这些属性之外Log类还定义了一些很重要的属性比如下面这段代码
```
@volatile private var nextOffsetMetadata: LogOffsetMetadata = _
@volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
@volatile var leaderEpochCache: Option[LeaderEpochFileCache] = None
```
第一个属性nextOffsetMetadata它封装了下一条待插入消息的位移值你基本上可以把这个属性和LEO等同起来。
第二个属性highWatermarkMetadata是分区日志高水位值。关于高水位的概念我们在[《Kafka核心技术与实战》](https://time.geekbang.org/column/intro/100029201)这个课程中做过详细解释,你可以看一下[这篇文章](https://time.geekbang.org/column/article/112118)(下节课我还会再具体给你介绍下)。
第三个属性segments我认为这是Log类中最重要的属性。它保存了分区日志下所有的日志段信息只不过是用Map的数据结构来保存的。Map的Key值是日志段的起始位移值Value则是日志段对象本身。Kafka源码使用ConcurrentNavigableMap数据结构来保存日志段对象就可以很轻松地利用该类提供的线程安全和各种支持排序的方法来管理所有日志段对象。
第四个属性是Leader Epoch Cache对象。Leader Epoch是社区于0.11.0.0版本引入源码中的主要是用来判断出现Failure时是否执行日志截断操作Truncation。之前靠高水位来判断的机制可能会造成副本间数据不一致的情形。这里的Leader Epoch Cache是一个缓存类数据里面保存了分区Leader的Epoch值与对应位移值的映射关系我建议你查看下LeaderEpochFileCache类深入地了解下它的实现原理。
掌握了这些基本属性之后我们看下Log类的初始化逻辑
```
locally {
val startMs = time.milliseconds
// create the log directory if it doesn't exist
Files.createDirectories(dir.toPath)
initializeLeaderEpochCache()
val nextOffset = loadSegments()
/* Calculate the offset of the next message */
nextOffsetMetadata = LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size)
leaderEpochCache.foreach(_.truncateFromEnd(nextOffsetMetadata.messageOffset))
logStartOffset = math.max(logStartOffset, segments.firstEntry.getValue.baseOffset)
// The earliest leader epoch may not be flushed during a hard failure. Recover it here.
leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
// Any segment loading or recovery code must not use producerStateManager, so that we can build the full state here
// from scratch.
if (!producerStateManager.isEmpty)
throw new IllegalStateException(&quot;Producer state must be empty during log initialization&quot;)
loadProducerState(logEndOffset, reloadFromCleanShutdown = hasCleanShutdownFile)
info(s&quot;Completed load of log with ${segments.size} segments, log start offset $logStartOffset and &quot; +
s&quot;log end offset $logEndOffset in ${time.milliseconds() - startMs}
```
在详细解释这段初始化代码之前,我使用一张图来说明它到底做了什么:
<img src="https://static001.geekbang.org/resource/image/a1/a8/a10b81680a449e5b1d8882939061f7a8.jpg" alt="">
这里我们重点说说第三步即加载日志段的实现逻辑以下是loadSegments的实现代码
```
private def loadSegments(): Long = {
// first do a pass through the files in the log directory and remove any temporary files
// and find any interrupted swap operations
val swapFiles = removeTempFilesAndCollectSwapFiles()
// Now do a second pass and load all the log and index files.
// We might encounter legacy log segments with offset overflow (KAFKA-6264). We need to split such segments. When
// this happens, restart loading segment files from scratch.
retryOnOffsetOverflow {
// In case we encounter a segment with offset overflow, the retry logic will split it after which we need to retry
// loading of segments. In that case, we also need to close all segments that could have been left open in previous
// call to loadSegmentFiles().
logSegments.foreach(_.close())
segments.clear()
loadSegmentFiles()
}
// Finally, complete any interrupted swap operations. To be crash-safe,
// log files that are replaced by the swap segment should be renamed to .deleted
// before the swap file is restored as the new segment file.
completeSwapOperations(swapFiles)
if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) {
val nextOffset = retryOnOffsetOverflow {
recoverLog()
}
// reset the index size of the currently active log segment to allow more entries
activeSegment.resizeIndexes(config.maxIndexSize)
nextOffset
} else {
if (logSegments.isEmpty) {
addSegment(LogSegment.open(dir = dir,
baseOffset = 0,
config,
time = time,
fileAlreadyExists = false,
initFileSize = this.initFileSize,
preallocate = false))
}
0
}
```
这段代码会对分区日志路径遍历两次。
首先它会移除上次Failure遗留下来的各种临时文件包括.cleaned、.swap、.deleted文件等removeTempFilesAndCollectSwapFiles方法实现了这个逻辑。
之后它会清空所有日志段对象并且再次遍历分区路径重建日志段segments Map并删除无对应日志段文件的孤立索引文件。
待执行完这两次遍历之后它会完成未完成的swap操作即调用completeSwapOperations方法。等这些都做完之后再调用recoverLog方法恢复日志段对象然后返回恢复之后的分区日志LEO值。
如果你现在觉得有点蒙,也没关系,我把这段代码再进一步拆解下,以更小的粒度跟你讲下它们做了什么。理解了这段代码之后,你大致就能搞清楚大部分的分区日志操作了。所以,这部分代码绝对值得我们多花一点时间去学习。
我们首先来看第一步removeTempFilesAndCollectSwapFiles方法的实现。我用注释的方式详细解释了每行代码的作用
```
private def removeTempFilesAndCollectSwapFiles(): Set[File] = {
// 在方法内部定义一个名为deleteIndicesIfExist的方法用于删除日志文件对应的索引文件
def deleteIndicesIfExist(baseFile: File, suffix: String = &quot;&quot;): Unit = {
info(s&quot;Deleting index files with suffix $suffix for baseFile $baseFile&quot;)
val offset = offsetFromFile(baseFile)
Files.deleteIfExists(Log.offsetIndexFile(dir, offset, suffix).toPath)
Files.deleteIfExists(Log.timeIndexFile(dir, offset, suffix).toPath)
Files.deleteIfExists(Log.transactionIndexFile(dir, offset, suffix).toPath)
}
var swapFiles = Set[File]()
var cleanFiles = Set[File]()
var minCleanedFileOffset = Long.MaxValue
// 遍历分区日志路径下的所有文件
for (file &lt;- dir.listFiles if file.isFile) {
if (!file.canRead) // 如果不可读直接抛出IOException
throw new IOException(s&quot;Could not read file $file&quot;)
val filename = file.getName
if (filename.endsWith(DeletedFileSuffix)) { // 如果以.deleted结尾
debug(s&quot;Deleting stray temporary file ${file.getAbsolutePath}&quot;)
Files.deleteIfExists(file.toPath) // 说明是上次Failure遗留下来的文件直接删除
} else if (filename.endsWith(CleanedFileSuffix)) { // 如果以.cleaned结尾
minCleanedFileOffset = Math.min(offsetFromFileName(filename), minCleanedFileOffset) // 选取文件名中位移值最小的.cleaned文件获取其位移值并将该文件加入待删除文件集合中
cleanFiles += file
} else if (filename.endsWith(SwapFileSuffix)) { // 如果以.swap结尾
val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, &quot;&quot;))
info(s&quot;Found file ${file.getAbsolutePath} from interrupted swap operation.&quot;)
if (isIndexFile(baseFile)) { // 如果该.swap文件原来是索引文件
deleteIndicesIfExist(baseFile) // 删除原来的索引文件
} else if (isLogFile(baseFile)) { // 如果该.swap文件原来是日志文件
deleteIndicesIfExist(baseFile) // 删除掉原来的索引文件
swapFiles += file // 加入待恢复的.swap文件集合中
}
}
}
// 从待恢复swap集合中找出那些起始位移值大于minCleanedFileOffset值的文件直接删掉这些无效的.swap文件
val (invalidSwapFiles, validSwapFiles) = swapFiles.partition(file =&gt; offsetFromFile(file) &gt;= minCleanedFileOffset)
invalidSwapFiles.foreach { file =&gt;
debug(s&quot;Deleting invalid swap file ${file.getAbsoluteFile} minCleanedFileOffset: $minCleanedFileOffset&quot;)
val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, &quot;&quot;))
deleteIndicesIfExist(baseFile, SwapFileSuffix)
Files.deleteIfExists(file.toPath)
}
// Now that we have deleted all .swap files that constitute an incomplete split operation, let's delete all .clean files
// 清除所有待删除文件集合中的文件
cleanFiles.foreach { file =&gt;
debug(s&quot;Deleting stray .clean file ${file.getAbsolutePath}&quot;)
Files.deleteIfExists(file.toPath)
}
// 最后返回当前有效的.swap文件集合
validSwapFiles
}
```
执行完了removeTempFilesAndCollectSwapFiles逻辑之后源码开始清空已有日志段集合并重新加载日志段文件。这就是第二步。这里调用的主要方法是loadSegmentFiles。
```
private def loadSegmentFiles(): Unit = {
// 按照日志段文件名中的位移值正序排列,然后遍历每个文件
for (file &lt;- dir.listFiles.sortBy(_.getName) if file.isFile) {
if (isIndexFile(file)) { // 如果是索引文件
val offset = offsetFromFile(file)
val logFile = Log.logFile(dir, offset)
if (!logFile.exists) { // 确保存在对应的日志文件,否则记录一个警告,并删除该索引文件
warn(s&quot;Found an orphaned index file ${file.getAbsolutePath}, with no corresponding log file.&quot;)
Files.deleteIfExists(file.toPath)
}
} else if (isLogFile(file)) { // 如果是日志文件
val baseOffset = offsetFromFile(file)
val timeIndexFileNewlyCreated = !Log.timeIndexFile(dir, baseOffset).exists()
// 创建对应的LogSegment对象实例并加入segments中
val segment = LogSegment.open(dir = dir,
baseOffset = baseOffset,
config,
time = time,
fileAlreadyExists = true)
try segment.sanityCheck(timeIndexFileNewlyCreated)
catch {
case _: NoSuchFileException =&gt;
error(s&quot;Could not find offset index file corresponding to log file ${segment.log.file.getAbsolutePath}, &quot; +
&quot;recovering segment and rebuilding index files...&quot;)
recoverSegment(segment)
case e: CorruptIndexException =&gt;
warn(s&quot;Found a corrupted index file corresponding to log file ${segment.log.file.getAbsolutePath} due &quot; +
s&quot;to ${e.getMessage}}, recovering segment and rebuilding index files...&quot;)
recoverSegment(segment)
}
addSegment(segment)
}
}
}
```
第三步是处理第一步返回的有效.swap文件集合。completeSwapOperations方法就是做这件事的
```
private def completeSwapOperations(swapFiles: Set[File]): Unit = {
// 遍历所有有效.swap文件
for (swapFile &lt;- swapFiles) {
val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, SwapFileSuffix, &quot;&quot;)) // 获取对应的日志文件
val baseOffset = offsetFromFile(logFile) // 拿到日志文件的起始位移值
// 创建对应的LogSegment实例
val swapSegment = LogSegment.open(swapFile.getParentFile,
baseOffset = baseOffset,
config,
time = time,
fileSuffix = SwapFileSuffix)
info(s&quot;Found log file ${swapFile.getPath} from interrupted swap operation, repairing.&quot;)
// 执行日志段恢复操作
recoverSegment(swapSegment)
// We create swap files for two cases:
// (1) Log cleaning where multiple segments are merged into one, and
// (2) Log splitting where one segment is split into multiple.
//
// Both of these mean that the resultant swap segments be composed of the original set, i.e. the swap segment
// must fall within the range of existing segment(s). If we cannot find such a segment, it means the deletion
// of that segment was successful. In such an event, we should simply rename the .swap to .log without having to
// do a replace with an existing segment.
// 确认之前删除日志段是否成功,是否还存在老的日志段文件
val oldSegments = logSegments(swapSegment.baseOffset, swapSegment.readNextOffset).filter { segment =&gt;
segment.readNextOffset &gt; swapSegment.baseOffset
}
// 将生成的.swap文件加入到日志中删除掉swap之前的日志段
replaceSegments(Seq(swapSegment), oldSegments.toSeq, isRecoveredSwapFile = true)
}
}
```
最后一步是recoverLog操作
```
private def recoverLog(): Long = {
// if we have the clean shutdown marker, skip recovery
// 如果不存在以.kafka_cleanshutdown结尾的文件。通常都不存在
if (!hasCleanShutdownFile) {
// 获取到上次恢复点以外的所有unflushed日志段对象
val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).toIterator
var truncated = false
// 遍历这些unflushed日志段
while (unflushed.hasNext &amp;&amp; !truncated) {
val segment = unflushed.next
info(s&quot;Recovering unflushed segment ${segment.baseOffset}&quot;)
val truncatedBytes =
try {
// 执行恢复日志段操作
recoverSegment(segment, leaderEpochCache)
} catch {
case _: InvalidOffsetException =&gt;
val startOffset = segment.baseOffset
warn(&quot;Found invalid offset during recovery. Deleting the corrupt segment and &quot; +
s&quot;creating an empty one with starting offset $startOffset&quot;)
segment.truncateTo(startOffset)
}
if (truncatedBytes &gt; 0) { // 如果有无效的消息导致被截断的字节数不为0直接删除剩余的日志段对象
warn(s&quot;Corruption found in segment ${segment.baseOffset}, truncating to offset ${segment.readNextOffset}&quot;)
removeAndDeleteSegments(unflushed.toList, asyncDelete = true)
truncated = true
}
}
}
// 这些都做完之后,如果日志段集合不为空
if (logSegments.nonEmpty) {
val logEndOffset = activeSegment.readNextOffset
if (logEndOffset &lt; logStartOffset) { // 验证分区日志的LEO值不能小于Log Start Offset值否则删除这些日志段对象
warn(s&quot;Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). &quot; +
&quot;This could happen if segment files were deleted from the file system.&quot;)
removeAndDeleteSegments(logSegments, asyncDelete = true)
}
}
// 这些都做完之后,如果日志段集合为空了
if (logSegments.isEmpty) {
// 至少创建一个新的日志段以logStartOffset为日志段的起始位移并加入日志段集合中
addSegment(LogSegment.open(dir = dir,
baseOffset = logStartOffset,
config,
time = time,
fileAlreadyExists = false,
initFileSize = this.initFileSize,
preallocate = config.preallocate))
}
// 更新上次恢复点属性,并返回
recoveryPoint = activeSegment.readNextOffset
recoveryPoint
```
## 总结
今天我重点向你介绍了Kafka的Log源码主要包括
1. **Log文件的源码结构**你可以看下下面的导图它展示了Log类文件的架构组成你要重点掌握Log类及其相关方法。
1. **加载日志段机制**:我结合源码重点分析了日志在初始化时是如何加载日志段的。前面说过了,日志是日志段的容器,弄明白如何加载日志段是后续学习日志段管理的前提条件。
<img src="https://static001.geekbang.org/resource/image/dd/fc/dd2bf4882021d969accb14c0017d9dfc.jpg" alt="">
总的来说虽然洋洋洒洒几千字但我也只讲了最重要的部分。我建议你多看几遍Log.scala中加载日志段的代码这对后面我们理解Kafka Broker端日志段管理原理大有裨益。在下节课我会继续讨论日志部分的源码带你学习常见的Kafka日志操作。
## 课后讨论
Log源码中有个maybeIncrementHighWatermark方法你能说说它的实现原理吗
欢迎你在留言区畅所欲言,跟我交流讨论,也欢迎你把今天的内容分享给你的朋友。

View File

@@ -0,0 +1,803 @@
<audio id="audio" title="03 | 日志彻底搞懂Log对象的常见操作" controls="" preload="none"><source id="mp3" src="https://static001.geekbang.org/resource/audio/4c/d1/4ce9089c910559893b291c9d35ca5ed1.mp3"></audio>
你好我是胡夕。上节课我们一起了解了日志加载日志段的过程。今天我会继续带你学习Log源码给你介绍Log对象的常见操作。
我一般习惯把Log的常见操作分为4大部分。
1. **高水位管理操作**高水位的概念在Kafka中举足轻重对它的管理是Log最重要的功能之一。
1. **日志段管理**Log是日志段的容器。高效组织与管理其下辖的所有日志段对象是源码要解决的核心问题。
1. **关键位移值管理**日志定义了很多重要的位移值比如Log Start Offset和LEO等。确保这些位移值的正确性是构建消息引擎一致性的基础。
1. **读写操作**:所谓的操作日志,大体上就是指读写日志。读写操作的作用之大,不言而喻。
接下来我会按照这个顺序和你介绍Log对象的常见操作并希望你特别关注下高水位管理部分。
事实上社区关于日志代码的很多改进都是基于高水位机制的有的甚至是为了替代高水位机制而做的更新。比如Kafka的KIP-101提案正式引入的Leader Epoch机制就是用来替代日志截断操作中的高水位的。显然要深入学习Leader Epoch你至少要先了解高水位并清楚它的弊病在哪儿才行。
既然高水位管理这么重要,那我们就从它开始说起吧。
## 高水位管理操作
在介绍高水位管理操作之前,我们先来了解一下高水位的定义。
### 定义
源码中日志对象定义高水位的语句只有一行:
```
@volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)
```
这行语句传达了两个重要的事实:
1. 高水位值是volatile易变型的。因为多个线程可能同时读取它因此需要设置成volatile保证内存可见性。另外由于高水位值可能被多个线程同时修改因此源码使用Java Monitor锁来确保并发修改的线程安全。
1. 高水位值的初始值是Log Start Offset值。上节课我们提到每个Log对象都会维护一个Log Start Offset值。当首次构建高水位时它会被赋值成Log Start Offset值。
你可能会关心LogOffsetMetadata是什么对象。因为它比较重要我们一起来看下这个类的定义
```
case class LogOffsetMetadata(messageOffset: Long,
segmentBaseOffset: Long = Log.UnknownOffset, relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition)
```
显然它就是一个POJO类里面保存了三个重要的变量。
1. messageOffset**消息位移值**,这是最重要的信息。我们总说高水位值,其实指的就是这个变量的值。
1. segmentBaseOffset**保存该位移值所在日志段的起始位移**。日志段起始位移值辅助计算两条消息在物理磁盘文件中位置的差值,即两条消息彼此隔了多少字节。这个计算有个前提条件,即两条消息必须处在同一个日志段对象上,不能跨日志段对象。否则它们就位于不同的物理文件上,计算这个值就没有意义了。**这里的segmentBaseOffset就是用来判断两条消息是否处于同一个日志段的**。
1. relativePositionSegment**保存该位移值所在日志段的物理磁盘位置**。这个字段在计算两个位移值之间的物理磁盘位置差值时非常有用。你可以想一想Kafka什么时候需要计算位置之间的字节数呢答案就是在读取日志的时候。假设每次读取时只能读1MB的数据那么源码肯定需要关心两个位移之间所有消息的总字节数是否超过了1MB。
LogOffsetMetadata类的所有方法都是围绕这3个变量展开的工具辅助类方法非常容易理解。我会给出一个方法的详细解释剩下的你可以举一反三。
```
def onSameSegment(that: LogOffsetMetadata): Boolean = {
if (messageOffsetOnly)
throw new KafkaException(s&quot;$this cannot compare its segment info with $that since it only has message offset info&quot;)
this.segmentBaseOffset == that.segmentBaseOffset
}
```
看名字我们就知道了这个方法就是用来判断给定的两个LogOffsetMetadata对象是否处于同一个日志段的。判断方法很简单就是比较两个LogOffsetMetadata对象的segmentBaseOffset值是否相等。
好了,我们接着说回高水位,你要重点关注下获取和设置高水位值、更新高水位值,以及读取高水位值的方法。
### 获取和设置高水位值
关于获取高水位值的方法其实很好理解我就不多说了。设置高水位值的方法也就是Setter方法更复杂一些为了方便你理解我用注释的方式来解析它的作用。
```
// getter method读取高水位的位移值
def highWatermark: Long = highWatermarkMetadata.messageOffset
// setter method设置高水位值
private def updateHighWatermarkMetadata(newHighWatermark: LogOffsetMetadata): Unit = {
if (newHighWatermark.messageOffset &lt; 0) // 高水位值不能是负数
throw new IllegalArgumentException(&quot;High watermark offset should be non-negative&quot;)
lock synchronized { // 保护Log对象修改的Monitor锁
highWatermarkMetadata = newHighWatermark // 赋值新的高水位值
producerStateManager.onHighWatermarkUpdated(newHighWatermark.messageOffset) // 处理事务状态管理器的高水位值更新逻辑,忽略它……
maybeIncrementFirstUnstableOffset() // First Unstable Offset是Kafka事务机制的一部分忽略它……
}
trace(s&quot;Setting high watermark $newHighWatermark&quot;)
}
```
### 更新高水位值
除此之外,源码还定义了两个更新高水位值的方法:**updateHighWatermark**和**maybeIncrementHighWatermark**。从名字上来看,前者是一定要更新高水位值的,而后者是可能会更新也可能不会。
我们分别看下它们的实现原理。
```
// updateHighWatermark method
def updateHighWatermark(hw: Long): Long = {
// 新高水位值一定介于[Log Start OffsetLog End Offset]之间
val newHighWatermark = if (hw &lt; logStartOffset)
logStartOffset
else if (hw &gt; logEndOffset)
logEndOffset
else
hw
// 调用Setter方法来更新高水位值
updateHighWatermarkMetadata(LogOffsetMetadata(newHighWatermark))
newHighWatermark // 最后返回新高水位值
}
// maybeIncrementHighWatermark method
def maybeIncrementHighWatermark(newHighWatermark: LogOffsetMetadata): Option[LogOffsetMetadata] = {
// 新高水位值不能越过Log End Offset
if (newHighWatermark.messageOffset &gt; logEndOffset)
throw new IllegalArgumentException(s&quot;High watermark $newHighWatermark update exceeds current &quot; +
s&quot;log end offset $logEndOffsetMetadata&quot;)
lock.synchronized {
val oldHighWatermark = fetchHighWatermarkMetadata // 获取老的高水位值
// 新高水位值要比老高水位值大以维持单调增加特性,否则就不做更新!
// 另外,如果新高水位值在新日志段上,也可执行更新高水位操作
if (oldHighWatermark.messageOffset &lt; newHighWatermark.messageOffset ||
(oldHighWatermark.messageOffset == newHighWatermark.messageOffset &amp;&amp; oldHighWatermark.onOlderSegment(newHighWatermark))) {
updateHighWatermarkMetadata(newHighWatermark)
Some(oldHighWatermark) // 返回老的高水位值
} else {
None
}
}
}
```
你可能觉得奇怪,为什么要定义两个更新高水位的方法呢?
其实这两个方法有着不同的用途。updateHighWatermark方法主要用在Follower副本从Leader副本获取到消息后更新高水位值。一旦拿到新的消息就必须要更新高水位值而maybeIncrementHighWatermark方法主要是用来更新Leader副本的高水位值。需要注意的是Leader副本高水位值的更新是有条件的——某些情况下会更新高水位值某些情况下可能不会。
就像我刚才说的Follower副本成功拉取Leader副本的消息后必须更新高水位值但Producer端向Leader副本写入消息时分区的高水位值就可能不需要更新——因为它可能需要等待其他Follower副本同步的进度。因此源码中定义了两个更新的方法它们分别应用于不同的场景。
### 读取高水位值
关于高水位值管理的最后一个操作是**fetchHighWatermarkMetadata方法**。它不仅仅是获取高水位值,还要获取高水位的其他元数据信息,即日志段起始位移和物理位置信息。下面是它的实现逻辑:
```
private def fetchHighWatermarkMetadata: LogOffsetMetadata = {
checkIfMemoryMappedBufferClosed() // 读取时确保日志不能被关闭
val offsetMetadata = highWatermarkMetadata // 保存当前高水位值到本地变量,避免多线程访问干扰
if (offsetMetadata.messageOffsetOnly) { //没有获得到完整的高水位元数据
lock.synchronized {
val fullOffset = convertToOffsetMetadataOrThrow(highWatermark) // 通过读日志文件的方式把完整的高水位元数据信息拉出来
updateHighWatermarkMetadata(fullOffset) // 然后再更新一下高水位对象
fullOffset
}
} else { // 否则,直接返回即可
offsetMetadata
}
}
```
## 日志段管理
前面我反复说过,日志是日志段的容器,那它究竟是如何承担起容器一职的呢?
```
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
```
可以看到源码使用Java的ConcurrentSkipListMap类来保存所有日志段对象。ConcurrentSkipListMap有2个明显的优势。
- **它是线程安全的**这样Kafka源码不需要自行确保日志段操作过程中的线程安全
- **它是键值Key可排序的Map**。Kafka将每个日志段的起始位移值作为Key这样一来我们就能够很方便地根据所有日志段的起始位移值对它们进行排序和比较同时还能快速地找到与给定位移值相近的前后两个日志段。
所谓的日志段管理无非是增删改查。接下来我们就从这4个方面一一来看下。
**1.增加**
Log对象中定义了添加日志段对象的方法**addSegment**。
```
def addSegment(segment: LogSegment): LogSegment = this.segments.put(segment.baseOffset, segment)
```
很简单吧就是调用Map的put方法将给定的日志段对象添加到segments中。
**2.删除**
删除操作相对来说复杂一点。我们知道Kafka有很多留存策略包括基于时间维度的、基于空间维度的和基于Log Start Offset维度的。那啥是留存策略呢其实它本质上就是**根据一定的规则决定哪些日志段可以删除**。
从源码角度来看Log中控制删除操作的总入口是**deleteOldSegments无参方法**
```
def deleteOldSegments(): Int = {
if (config.delete) {
deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments()
} else {
deleteLogStartOffsetBreachedSegments()
}
}
```
代码中的deleteRetentionMsBreachedSegments、deleteRetentionSizeBreachedSegments和deleteLogStartOffsetBreachedSegments分别对应于上面的那3个策略。
下面这张图展示了Kafka当前的三种日志留存策略以及底层涉及到日志段删除的所有方法
<img src="https://static001.geekbang.org/resource/image/f3/ad/f321f8f8572356248465f00bd5b702ad.jpg" alt="">
从图中我们可以知道上面3个留存策略方法底层都会调用带参数版本的deleteOldSegments方法而这个方法又相继调用了deletableSegments和deleteSegments方法。下面我们来深入学习下这3个方法的代码。
首先是带参数版的deleteOldSegments方法
```
private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) =&gt; Boolean, reason: String): Int = {
lock synchronized {
val deletable = deletableSegments(predicate)
if (deletable.nonEmpty)
info(s&quot;Found deletable segments with base offsets [${deletable.map(_.baseOffset).mkString(&quot;,&quot;)}] due to $reason&quot;)
deleteSegments(deletable)
}
}
```
该方法只有两个步骤:
1. 使用传入的函数计算哪些日志段对象能够被删除;
1. 调用deleteSegments方法删除这些日志段。
接下来是deletableSegments方法我用注释的方式来解释下主体代码含义
```
private def deletableSegments(predicate: (LogSegment, Option[LogSegment]) =&gt; Boolean): Iterable[LogSegment] = {
if (segments.isEmpty) { // 如果当前压根就没有任何日志段对象,直接返回
Seq.empty
} else {
val deletable = ArrayBuffer.empty[LogSegment]
var segmentEntry = segments.firstEntry
// 从具有最小起始位移值的日志段对象开始遍历,直到满足以下条件之一便停止遍历:
// 1. 测定条件函数predicate = false
// 2. 扫描到包含Log对象高水位值所在的日志段对象
// 3. 最新的日志段对象不包含任何消息
// 最新日志段对象是segments中Key值最大对应的那个日志段也就是我们常说的Active Segment。完全为空的Active Segment如果被允许删除后面还要重建它故代码这里不允许删除大小为空的Active Segment。
// 在遍历过程中同时不满足以上3个条件的所有日志段都是可以被删除的
while (segmentEntry != null) {
val segment = segmentEntry.getValue
val nextSegmentEntry = segments.higherEntry(segmentEntry.getKey)
val (nextSegment, upperBoundOffset, isLastSegmentAndEmpty) =
if (nextSegmentEntry != null)
(nextSegmentEntry.getValue, nextSegmentEntry.getValue.baseOffset, false)
else
(null, logEndOffset, segment.size == 0)
if (highWatermark &gt;= upperBoundOffset &amp;&amp; predicate(segment, Option(nextSegment)) &amp;&amp; !isLastSegmentAndEmpty) {
deletable += segment
segmentEntry = nextSegmentEntry
} else {
segmentEntry = null
}
}
deletable
}
}
```
最后是deleteSegments方法这个方法执行真正的日志段删除操作。
```
private def deleteSegments(deletable: Iterable[LogSegment]): Int = {
maybeHandleIOException(s&quot;Error while deleting segments for $topicPartition in dir ${dir.getParent}&quot;) {
val numToDelete = deletable.size
if (numToDelete &gt; 0) {
// 不允许删除所有日志段对象。如果一定要做先创建出一个新的来然后再把前面N个删掉
if (segments.size == numToDelete)
roll()
lock synchronized {
checkIfMemoryMappedBufferClosed() // 确保Log对象没有被关闭
// 删除给定的日志段对象以及底层的物理文件
removeAndDeleteSegments(deletable, asyncDelete = true)
// 尝试更新日志的Log Start Offset值
maybeIncrementLogStartOffset(
segments.firstEntry.getValue.baseOffset)
}
}
numToDelete
}
}
```
这里我稍微解释一下为什么要在删除日志段对象之后尝试更新Log Start Offset值。Log Start Offset值是整个Log对象对外可见消息的最小位移值。如果我们删除了日志段对象很有可能对外可见消息的范围发生了变化自然要看一下是否需要更新Log Start Offset值。这就是deleteSegments方法最后要更新Log Start Offset值的原因。
**3.修改**
说完了日志段删除,接下来我们来看如何修改日志段对象。
其实源码里面不涉及修改日志段对象所谓的修改或更新也就是替换而已用新的日志段对象替换老的日志段对象。举个简单的例子。segments.put(1L, newSegment)语句在没有Key=1时是添加日志段否则就是替换已有日志段。
**4.查询**
最后再说下查询日志段对象。源码中需要查询日志段对象的地方太多了但主要都是利用了ConcurrentSkipListMap的现成方法。
- segments.firstEntry获取第一个日志段对象
- segments.lastEntry获取最后一个日志段对象即Active Segment
- segments.higherEntry获取第一个起始位移值≥给定Key值的日志段对象
- segments.floorEntry获取最后一个起始位移值≤给定Key值的日志段对象。
## 关键位移值管理
Log对象维护了一些关键位移值数据比如Log Start Offset、LEO等。其实高水位值也算是关键位移值只不过它太重要了所以我单独把它拎出来作为独立的一部分来讲了。
还记得我上节课给你说的那张标识LEO和Log Start Offset的图吗我再来借助这张图说明一下这些关键位移值的区别
<img src="https://static001.geekbang.org/resource/image/38/b4/388672f6dab8571f272ed47c9679c2b4.jpg" alt="">
请注意这张图中位移值15的虚线方框。这揭示了一个重要的事实**Log对象中的LEO永远指向下一条待插入消息********也就是说LEO值上面是没有消息的**源码中定义LEO的语句很简单
```
@volatile private var nextOffsetMetadata: LogOffsetMetadata = _
```
这里的nextOffsetMetadata就是我们所说的LEO它也是LogOffsetMetadata类型的对象。Log对象初始化的时候源码会加载所有日志段对象并由此计算出当前Log的下一条消息位移值。之后Log对象将此位移值赋值给LEO代码片段如下
```
locally {
val startMs = time.milliseconds
// 创建日志路径保存Log对象磁盘文件
Files.createDirectories(dir.toPath)
// 初始化Leader Epoch缓存
initializeLeaderEpochCache()
// 加载所有日志段对象并返回该Log对象下一条消息的位移值
val nextOffset = loadSegments()
// 初始化LEO元数据对象LEO值为上一步获取的位移值起始位移值是Active Segment的起始位移值日志段大小是Active Segment的大小
nextOffsetMetadata = LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size)
// 更新Leader Epoch缓存去除LEO值之上的所有无效缓存项
leaderEpochCache.foreach(
_.truncateFromEnd(nextOffsetMetadata.messageOffset))
......
}
```
当然代码中单独定义了更新LEO的updateLogEndOffset方法
```
private def updateLogEndOffset(offset: Long): Unit = {
nextOffsetMetadata = LogOffsetMetadata(offset, activeSegment.baseOffset, activeSegment.size)
if (highWatermark &gt;= offset) {
updateHighWatermarkMetadata(nextOffsetMetadata)
}
if (this.recoveryPoint &gt; offset) {
this.recoveryPoint = offset
}
}
```
根据上面的源码你应该能看到更新过程很简单我就不再展开说了。不过你需要注意的是如果在更新过程中发现新LEO值小于高水位值那么Kafka还要更新高水位值因为对于同一个Log对象而言高水位值是不能越过LEO值的。这一点你一定要切记再切记
讲到这儿我就要提问了Log对象什么时候需要更新LEO呢
实际上LEO对象被更新的时机有4个。
1. **Log对象初始化时**当Log对象初始化时我们必须要创建一个LEO对象并对其进行初始化。
1. **写入新消息时**这个最容易理解。以上面的图为例当不断向Log对象插入新消息时LEO值就像一个指针一样需要不停地向右移动也就是不断地增加。
1. **Log对象发生日志切分Log Roll时**日志切分是啥呢其实就是创建一个全新的日志段对象并且关闭当前写入的日志段对象。这通常发生在当前日志段对象已满的时候。一旦发生日志切分说明Log对象切换了Active Segment那么LEO中的起始位移值和段大小数据都要被更新因此在进行这一步操作时我们必须要更新LEO对象。
1. **日志截断Log Truncation时**这个也是显而易见的。日志中的部分消息被删除了自然可能导致LEO值发生变化从而要更新LEO对象。
你可以在代码中查看一下updateLogEndOffset方法的调用时机验证下是不是和我所说的一致。这里我也想给你一个小小的提示**阅读源码的时候,最好加入一些思考,而不是简单地全盘接受源码的内容,也许你会有不一样的收获**。
说完了LEO我再跟你说说Log Start Offset。其实就操作的流程和原理而言源码管理Log Start Offset的方式要比LEO简单因为Log Start Offset不是一个对象它就是一个长整型的值而已。代码定义了专门的updateLogStartOffset方法来更新它。该方法很简单我就不详细说了你可以自己去学习下它的实现。
现在我们再来思考一下Kafka什么时候需要更新Log Start Offset呢我们一一来看下。
1. **Log对象初始化时**和LEO类似Log对象初始化时要给Log Start Offset赋值一般是将第一个日志段的起始位移值赋值给它。
1. **日志截断时**同理一旦日志中的部分消息被删除可能会导致Log Start Offset发生变化因此有必要更新该值。
1. **Follower副本同步时**一旦Leader副本的Log对象的Log Start Offset值发生变化。为了维持和Leader副本的一致性Follower副本也需要尝试去更新该值。
1. **删除日志段时**这个和日志截断是类似的。凡是涉及消息删除的操作都有可能导致Log Start Offset值的变化。
1. **删除消息时**严格来说这个更新时机有点本末倒置了。在Kafka中删除消息就是通过抬高Log Start Offset值来实现的因此删除消息时必须要更新该值。
## 读写操作
最后我重点说说针对Log对象的读写操作。
**1.写操作**
在Log中涉及写操作的方法有3个appendAsLeader、appendAsFollower和append。它们的调用关系如下图所示
<img src="https://static001.geekbang.org/resource/image/ef/24/efd914ef24911704fa5d23d38447a024.jpg" alt="">
appendAsLeader是用于写Leader副本的appendAsFollower是用于Follower副本同步的。它们的底层都调用了append方法。
我们重点学习下append方法。下图是append方法的执行流程
<img src="https://static001.geekbang.org/resource/image/e4/f1/e4b47776198b7def72332f93930f65f1.jpg" alt="">
看到这张图你可能会感叹“天呐执行步骤居然有12步这么多”别急现在我用代码注释的方式给你分别解释下每步的实现原理。
```
private def append(records: MemoryRecords,
origin: AppendOrigin,
interBrokerProtocolVersion: ApiVersion,
assignOffsets: Boolean,
leaderEpoch: Int): LogAppendInfo = {
maybeHandleIOException(s&quot;Error while appending records to $topicPartition in dir ${dir.getParent}&quot;) {
// 第1步分析和验证待写入消息集合并返回校验结果
val appendInfo = analyzeAndValidateRecords(records, origin)
// 如果压根就不需要写入任何消息,直接返回即可
if (appendInfo.shallowCount == 0)
return appendInfo
// 第2步消息格式规整即删除无效格式消息或无效字节
var validRecords = trimInvalidBytes(records, appendInfo)
lock synchronized {
checkIfMemoryMappedBufferClosed() // 确保Log对象未关闭
if (assignOffsets) { // 需要分配位移
// 第3步使用当前LEO值作为待写入消息集合中第一条消息的位移值
val offset = new LongRef(nextOffsetMetadata.messageOffset)
appendInfo.firstOffset = Some(offset.value)
val now = time.milliseconds
val validateAndOffsetAssignResult = try {
LogValidator.validateMessagesAndAssignOffsets(validRecords,
topicPartition,
offset,
time,
now,
appendInfo.sourceCodec,
appendInfo.targetCodec,
config.compact,
config.messageFormatVersion.recordVersion.value,
config.messageTimestampType,
config.messageTimestampDifferenceMaxMs,
leaderEpoch,
origin,
interBrokerProtocolVersion,
brokerTopicStats)
} catch {
case e: IOException =&gt;
throw new KafkaException(s&quot;Error validating messages while appending to log $name&quot;, e)
}
// 更新校验结果对象类LogAppendInfo
validRecords = validateAndOffsetAssignResult.validatedRecords
appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
appendInfo.lastOffset = offset.value - 1
appendInfo.recordConversionStats = validateAndOffsetAssignResult.recordConversionStats
if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
appendInfo.logAppendTime = now
// 第4步验证消息确保消息大小不超限
if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
for (batch &lt;- validRecords.batches.asScala) {
if (batch.sizeInBytes &gt; config.maxMessageSize) {
// we record the original message set size instead of the trimmed size
// to be consistent with pre-compression bytesRejectedRate recording
brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
throw new RecordTooLargeException(s&quot;Message batch size is ${batch.sizeInBytes} bytes in append to&quot; +
s&quot;partition $topicPartition which exceeds the maximum configured size of ${config.maxMessageSize}.&quot;)
}
}
}
} else { // 直接使用给定的位移值,无需自己分配位移值
if (!appendInfo.offsetsMonotonic) // 确保消息位移值的单调递增性
throw new OffsetsOutOfOrderException(s&quot;Out of order offsets found in append to $topicPartition: &quot; +
records.records.asScala.map(_.offset))
if (appendInfo.firstOrLastOffsetOfFirstBatch &lt; nextOffsetMetadata.messageOffset) {
val firstOffset = appendInfo.firstOffset match {
case Some(offset) =&gt; offset
case None =&gt; records.batches.asScala.head.baseOffset()
}
val firstOrLast = if (appendInfo.firstOffset.isDefined) &quot;First offset&quot; else &quot;Last offset of the first batch&quot;
throw new UnexpectedAppendOffsetException(
s&quot;Unexpected offset in append to $topicPartition. $firstOrLast &quot; +
s&quot;${appendInfo.firstOrLastOffsetOfFirstBatch} is less than the next offset ${nextOffsetMetadata.messageOffset}. &quot; +
s&quot;First 10 offsets in append: ${records.records.asScala.take(10).map(_.offset)}, last offset in&quot; +
s&quot; append: ${appendInfo.lastOffset}. Log start offset = $logStartOffset&quot;,
firstOffset, appendInfo.lastOffset)
}
}
// 第5步更新Leader Epoch缓存
validRecords.batches.asScala.foreach { batch =&gt;
if (batch.magic &gt;= RecordBatch.MAGIC_VALUE_V2) {
maybeAssignEpochStartOffset(batch.partitionLeaderEpoch, batch.baseOffset)
} else {
leaderEpochCache.filter(_.nonEmpty).foreach { cache =&gt;
warn(s&quot;Clearing leader epoch cache after unexpected append with message format v${batch.magic}&quot;)
cache.clearAndFlush()
}
}
}
// 第6步确保消息大小不超限
if (validRecords.sizeInBytes &gt; config.segmentSize) {
throw new RecordBatchTooLargeException(s&quot;Message batch size is ${validRecords.sizeInBytes} bytes in append &quot; +
s&quot;to partition $topicPartition, which exceeds the maximum configured segment size of ${config.segmentSize}.&quot;)
}
// 第7步执行日志切分。当前日志段剩余容量可能无法容纳新消息集合因此有必要创建一个新的日志段来保存待写入的所有消息
val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)
val logOffsetMetadata = LogOffsetMetadata(
messageOffset = appendInfo.firstOrLastOffsetOfFirstBatch,
segmentBaseOffset = segment.baseOffset,
relativePositionInSegment = segment.size)
// 第8步验证事务状态
val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(
logOffsetMetadata, validRecords, origin)
maybeDuplicate.foreach { duplicate =&gt;
appendInfo.firstOffset = Some(duplicate.firstOffset)
appendInfo.lastOffset = duplicate.lastOffset
appendInfo.logAppendTime = duplicate.timestamp
appendInfo.logStartOffset = logStartOffset
return appendInfo
}
// 第9步执行真正的消息写入操作主要调用日志段对象的append方法实现
segment.append(largestOffset = appendInfo.lastOffset,
largestTimestamp = appendInfo.maxTimestamp,
shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
records = validRecords)
// 第10步更新LEO对象其中LEO值是消息集合中最后一条消息位移值+1
// 前面说过LEO值永远指向下一条不存在的消息
updateLogEndOffset(appendInfo.lastOffset + 1)
// 第11步更新事务状态
for (producerAppendInfo &lt;- updatedProducers.values) {
producerStateManager.update(producerAppendInfo)
}
for (completedTxn &lt;- completedTxns) {
val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)
segment.updateTxnIndex(completedTxn, lastStableOffset)
producerStateManager.completeTxn(completedTxn)
}
producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)
maybeIncrementFirstUnstableOffset()
trace(s&quot;Appended message set with last offset: ${appendInfo.lastOffset}, &quot; +
s&quot;first offset: ${appendInfo.firstOffset}, &quot; +
s&quot;next offset: ${nextOffsetMetadata.messageOffset}, &quot; +
s&quot;and messages: $validRecords&quot;)
// 是否需要手动落盘。一般情况下我们不需要设置Broker端参数log.flush.interval.messages
// 落盘操作交由操作系统来完成。但某些情况下,可以设置该参数来确保高可靠性
if (unflushedMessages &gt;= config.flushInterval)
flush()
// 第12步返回写入结果
appendInfo
}
}
}
```
这些步骤里有没有需要你格外注意的呢我希望你重点关注下第1步即Kafka如何校验消息重点是看**针对不同的消息格式版本Kafka是如何做校验的**。
说起消息校验你还记得上一讲我们提到的LogAppendInfo类吗它就是一个普通的POJO类里面几乎保存了待写入消息集合的所有信息。我们来详细了解一下。
```
case class LogAppendInfo(var firstOffset: Option[Long],
var lastOffset: Long, // 消息集合最后一条消息的位移值
var maxTimestamp: Long, // 消息集合最大消息时间戳
var offsetOfMaxTimestamp: Long, // 消息集合最大消息时间戳所属消息的位移值
var logAppendTime: Long, // 写入消息时间戳
var logStartOffset: Long, // 消息集合首条消息的位移值
// 消息转换统计类,里面记录了执行了格式转换的消息数等数据
var recordConversionStats: RecordConversionStats,
sourceCodec: CompressionCodec, // 消息集合中消息使用的压缩器Compressor类型比如是Snappy还是LZ4
targetCodec: CompressionCodec, // 写入消息时需要使用的压缩器类型
shallowCount: Int, // 消息批次数,每个消息批次下可能包含多条消息
validBytes: Int, // 写入消息总字节数
offsetsMonotonic: Boolean, // 消息位移值是否是顺序增加的
lastOffsetOfFirstBatch: Long, // 首个消息批次中最后一条消息的位移
recordErrors: Seq[RecordError] = List(), // 写入消息时出现的异常列表
errorMessage: String = null) { // 错误码
......
}
```
大部分字段的含义很明确,这里我稍微提一下**lastOffset**和**lastOffsetOfFirstBatch**。
Kafka消息格式经历了两次大的变迁目前是0.11.0.0版本引入的Version 2消息格式。我们没有必要详细了解这些格式的变迁你只需要知道在0.11.0.0版本之后,**lastOffset和lastOffsetOfFirstBatch都是指向消息集合的最后一条消息即可**。它们的区别主要体现在0.11.0.0之前的版本。
append方法调用analyzeAndValidateRecords方法对消息集合进行校验并生成对应的LogAppendInfo对象其流程如下
```
private def analyzeAndValidateRecords(records: MemoryRecords, origin: AppendOrigin): LogAppendInfo = {
var shallowMessageCount = 0
var validBytesCount = 0
var firstOffset: Option[Long] = None
var lastOffset = -1L
var sourceCodec: CompressionCodec = NoCompressionCodec
var monotonic = true
var maxTimestamp = RecordBatch.NO_TIMESTAMP
var offsetOfMaxTimestamp = -1L
var readFirstMessage = false
var lastOffsetOfFirstBatch = -1L
for (batch &lt;- records.batches.asScala) {
// 消息格式Version 2的消息批次起始位移值必须从0开始
if (batch.magic &gt;= RecordBatch.MAGIC_VALUE_V2 &amp;&amp; origin == AppendOrigin.Client &amp;&amp; batch.baseOffset != 0)
throw new InvalidRecordException(s&quot;The baseOffset of the record batch in the append to $topicPartition should &quot; +
s&quot;be 0, but it is ${batch.baseOffset}&quot;)
if (!readFirstMessage) {
if (batch.magic &gt;= RecordBatch.MAGIC_VALUE_V2)
firstOffset = Some(batch.baseOffset) // 更新firstOffset字段
lastOffsetOfFirstBatch = batch.lastOffset // 更新lastOffsetOfFirstBatch字段
readFirstMessage = true
}
// 一旦出现当前lastOffset不小于下一个batch的lastOffset说明上一个batch中有消息的位移值大于后面batch的消息
// 这违反了位移值单调递增性
if (lastOffset &gt;= batch.lastOffset)
monotonic = false
// 使用当前batch最后一条消息的位移值去更新lastOffset
lastOffset = batch.lastOffset
// 检查消息批次总字节数大小是否超限即是否大于Broker端参数max.message.bytes值
val batchSize = batch.sizeInBytes
if (batchSize &gt; config.maxMessageSize) {
brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
throw new RecordTooLargeException(s&quot;The record batch size in the append to $topicPartition is $batchSize bytes &quot; +
s&quot;which exceeds the maximum configured value of ${config.maxMessageSize}.&quot;)
}
// 执行消息批次校验包括格式是否正确以及CRC校验
if (!batch.isValid) {
brokerTopicStats.allTopicsStats.invalidMessageCrcRecordsPerSec.mark()
throw new CorruptRecordException(s&quot;Record is corrupt (stored crc = ${batch.checksum()}) in topic partition $topicPartition.&quot;)
}
// 更新maxTimestamp字段和offsetOfMaxTimestamp
if (batch.maxTimestamp &gt; maxTimestamp) {
maxTimestamp = batch.maxTimestamp
offsetOfMaxTimestamp = lastOffset
}
// 累加消息批次计数器以及有效字节数更新shallowMessageCount字段
shallowMessageCount += 1
validBytesCount += batchSize
// 从消息批次中获取压缩器类型
val messageCodec = CompressionCodec.getCompressionCodec(batch.compressionType.id)
if (messageCodec != NoCompressionCodec)
sourceCodec = messageCodec
}
// 获取Broker端设置的压缩器类型即Broker端参数compression.type值。
// 该参数默认值是producer表示sourceCodec用的什么压缩器targetCodec就用什么
val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
// 最后生成LogAppendInfo对象并返回
LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, logStartOffset,
RecordConversionStats.EMPTY, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic, lastOffsetOfFirstBatch)
}
```
**2.读取操作**
说完了append方法下面我们聊聊read方法。
read方法的流程相对要简单一些首先来看它的方法签名
```
def read(startOffset: Long,
maxLength: Int,
isolation: FetchIsolation,
minOneMessage: Boolean): FetchDataInfo = {
......
}
```
它接收4个参数含义如下
- startOffset即从Log对象的哪个位移值开始读消息。
- maxLength即最多能读取多少字节。
- isolation设置读取隔离级别主要控制能够读取的最大位移值多用于Kafka事务。
- minOneMessage即是否允许至少读一条消息。设想如果消息很大超过了maxLength正常情况下read方法永远不会返回任何消息。但如果设置了该参数为trueread方法就保证至少能够返回一条消息。
read方法的返回值是FetchDataInfo类也是一个POJO类里面最重要的数据就是读取的消息集合其他数据还包括位移等元数据信息。
下面我们来看下read方法的流程。
```
def read(startOffset: Long,
maxLength: Int,
isolation: FetchIsolation,
minOneMessage: Boolean): FetchDataInfo = {
maybeHandleIOException(s&quot;Exception while reading from $topicPartition in dir ${dir.getParent}&quot;) {
trace(s&quot;Reading $maxLength bytes from offset $startOffset of length $size bytes&quot;)
val includeAbortedTxns = isolation == FetchTxnCommitted
// 读取消息时没有使用Monitor锁同步机制因此这里取巧了用本地变量的方式把LEO对象保存起来避免争用race condition
val endOffsetMetadata = nextOffsetMetadata
val endOffset = nextOffsetMetadata.messageOffset
if (startOffset == endOffset) // 如果从LEO处开始读取那么自然不会返回任何数据直接返回空消息集合即可
return emptyFetchDataInfo(endOffsetMetadata, includeAbortedTxns)
// 找到startOffset值所在的日志段对象。注意要使用floorEntry方法
var segmentEntry = segments.floorEntry(startOffset)
// return error on attempt to read beyond the log end offset or read below log start offset
// 满足以下条件之一将被视为消息越界即你要读取的消息不在该Log对象中
// 1. 要读取的消息位移超过了LEO值
// 2. 没找到对应的日志段对象
// 3. 要读取的消息在Log Start Offset之下同样是对外不可见的消息
if (startOffset &gt; endOffset || segmentEntry == null || startOffset &lt; logStartOffset)
throw new OffsetOutOfRangeException(s&quot;Received request for offset $startOffset for partition $topicPartition, &quot; +
s&quot;but we only have log segments in the range $logStartOffset to $endOffset.&quot;)
// 查看一下读取隔离级别设置。
// 普通消费者能够看到[Log Start Offset, 高水位值)之间的消息
// 事务型消费者只能看到[Log Start Offset, Log Stable Offset]之间的消息。Log Stable Offset(LSO)是比LEO值小的位移值为Kafka事务使用
// Follower副本消费者能够看到[Log Start OffsetLEO)之间的消息
val maxOffsetMetadata = isolation match {
case FetchLogEnd =&gt; nextOffsetMetadata
case FetchHighWatermark =&gt; fetchHighWatermarkMetadata
case FetchTxnCommitted =&gt; fetchLastStableOffsetMetadata
}
// 如果要读取的起始位置超过了能读取的最大位置,返回空的消息集合,因为没法读取任何消息
if (startOffset &gt; maxOffsetMetadata.messageOffset) {
val startOffsetMetadata = convertToOffsetMetadataOrThrow(startOffset)
return emptyFetchDataInfo(startOffsetMetadata, includeAbortedTxns)
}
// 开始遍历日志段对象,直到读出东西来或者读到日志末尾
while (segmentEntry != null) {
val segment = segmentEntry.getValue
val maxPosition = {
if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) {
maxOffsetMetadata.relativePositionInSegment
} else {
segment.size
}
}
// 调用日志段对象的read方法执行真正的读取消息操作
val fetchInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
if (fetchInfo == null) { // 如果没有返回任何消息,去下一个日志段对象试试
segmentEntry = segments.higherEntry(segmentEntry.getKey)
} else { // 否则返回
return if (includeAbortedTxns)
addAbortedTransactions(startOffset, segmentEntry, fetchInfo)
else
fetchInfo
}
}
// 已经读到日志末尾还是没有数据返回,只能返回空消息集合
FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
}
}
```
## 总结
今天我重点讲解了Kafka的Log对象以及常见的操作。我们复习一下。
1. **高水位管理**Log对象定义了高水位对象以及管理它的各种操作主要包括更新和读取。
1. **日志段管理**作为日志段的容器Log对象保存了很多日志段对象。你需要重点掌握这些日志段对象被组织在一起的方式以及Kafka Log对象是如何对它们进行管理的。
1. **关键位移值管理**主要涉及对Log Start Offset和LEO的管理。这两个位移值是Log对象非常关键的字段。比如副本管理、状态机管理等高阶功能都要依赖于它们。
1. **读写操作**日志读写是实现Kafka消息引擎基本功能的基石。虽然你不需要掌握每行语句的含义但你至少要明白大体的操作流程。
讲到这里Kafka Log部分的源码我就介绍完了。我建议你特别关注下高水位管理和读写操作部分的代码特别是后者并且结合我今天讲的内容重点分析下这两部分的实现原理。最后我用一张思维导图来帮助你理解和记忆Log源码中的这些常见操作
<img src="https://static001.geekbang.org/resource/image/d0/99/d0cb945d7284f09ab2b6ffa764190399.jpg" alt="">
## 课后讨论
你能为Log对象添加一个方法统计介于高水位值和LEO值之间的消息总数吗
欢迎你在留言区畅所欲言,跟我交流讨论,也欢迎你把今天的内容分享给你的朋友。

View File

@@ -0,0 +1,366 @@
<audio id="audio" title="04 | 索引改进的二分查找算法在Kafka索引的应用" controls="" preload="none"><source id="mp3" src="https://static001.geekbang.org/resource/audio/eb/54/eb42991816f9ddad5d462d8155407354.mp3"></audio>
你好我是胡夕。今天我来带你学习一下Kafka源码中的索引对象以及改进版二分查找算法Binary Search Algorithm在索引中的应用。
## 为什么要阅读索引源码?
坦率地说你在Kafka中直接接触索引或索引文件的场景可能不是很多。索引是一个很神秘的组件Kafka官方文档也没有怎么提过它。你可能会说既然这样我还有必要读索引对象的源码吗其实是非常有必要的我给你分享一个真实的例子。
有一次我用Kafka的DumpLogSegments类去查看底层日志文件和索引文件的内容时发现了一个奇怪的现象——查看日志文件的内容不需要sudo权限而查看索引文件的内容必须要有sudo权限如下所示
```
$ sudo ./kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.index
Dumping 00000000000000000000.index
offset: 0 position: 0
$ ./kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.index
Dumping 00000000000000000000.index
Exception in thread &quot;main&quot; java.io.FileNotFoundException: 00000000000000000000.index (Permission denied)
......
```
看了索引源码之后我才知道原来Kafka读取索引文件时使用的打开方式是rw。实际上读取文件不需要w权限只要r权限就行了。这显然是Kafka的一个Bug。你看通过阅读源码我找到了问题的根本原因还顺便修复了Kafka的一个问题[KAFKA-5104](https://issues.apache.org/jira/browse/KAFKA-5104))。
除了能帮我们解决实际问题之外,索引这个组件的源码还有一个亮点,那就是**它应用了耳熟能详的二分查找算法来快速定位索引项**。关于算法,我一直觉得很遗憾的是,**我们平时太注重算法本身,却忽略了它们在实际场景中的应用**。
比如说我们学习了太多的排序算法但是对于普通的应用开发人员来说亲自使用这些算法完成编程任务的机会实在太少了。说起数组排序你可能只记得调用Collections.sort方法了但它底层应用了什么排序算法其实并不清楚。
难得的是Kafka的索引组件中应用了二分查找算法而且社区还针对Kafka自身的特点对其进行了改良。这难道不值得我们好好学上一学吗话不多说现在我们就开始学习吧。
## 索引类图及源文件组织架构
在Kafka源码中跟索引相关的源码文件有5个它们都位于core包的/src/main/scala/kafka/log路径下。我们一一来看下。
- AbstractIndex.scala它定义了最顶层的抽象类这个类封装了所有索引类型的公共操作。
- LazyIndex.scala它定义了AbstractIndex上的一个包装类实现索引项延迟加载。这个类主要是为了提高性能。
- OffsetIndex.scala定义位移索引保存“&lt;位移值,文件磁盘物理位置&gt;”对。
- TimeIndex.scala定义时间戳索引保存“&lt;时间戳,位移值&gt;”对。
- TransactionIndex.scala定义事务索引为已中止事务Aborted Transcation保存重要的元数据信息。只有启用Kafka事务后这个索引才有可能出现。
这些类的关系如下图所示:
<img src="https://static001.geekbang.org/resource/image/34/e8/347480a2d1ae659d0ecf590b01d091e8.jpg" alt="">
其中OffsetIndex、TimeIndex和TransactionIndex都继承了AbstractIndex类而上层的LazyIndex仅仅是包装了一个AbstractIndex的实现类用于延迟加载。就像我之前说的LazyIndex的作用是为了提升性能并没有什么功能上的改进。
所以今天我先和你讲一讲AbstractIndex这个抽象父类的代码。下节课我再重点和你分享具体的索引实现类。
## AbstractIndex代码结构
我们先来看下AbstractIndex的类定义
```
abstract class AbstractIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1, val writable: Boolean) extends Closeable {
......
}
```
AbstractIndex定义了4个属性字段。由于是一个抽象基类它的所有子类自动地继承了这4个字段。也就是说Kafka所有类型的索引对象都定义了这些属性。我先给你解释下这些属性的含义。
1. **索引文件**file。每个索引对象在磁盘上都对应了一个索引文件。你可能注意到了这个字段是var型说明它是可以被修改的。难道索引对象还能动态更换底层的索引文件吗是的。自1.1.0版本之后Kafka允许迁移底层的日志路径所以索引文件自然要是可以更换的。
1. **起始位移值**baseOffset。索引对象对应日志段对象的起始位移值。举个例子如果你查看Kafka日志路径的话就会发现日志文件和索引文件都是成组出现的。比如说如果日志文件是00000000000000000123.log正常情况下一定还有一组索引文件00000000000000000123.index、00000000000000000123.timeindex等。这里的“123”就是这组文件的起始位移值也就是baseOffset值。
1. **索引文件最大字节数**maxIndexSize。它控制索引文件的最大长度。Kafka源码传入该参数的值是Broker端参数segment.index.bytes的值即10MB。这就是在默认情况下所有Kafka索引文件大小都是10MB的原因。
1. **索引文件打开方式**writable。“True”表示以“读写”方式打开“False”表示以“只读”方式打开。如果我没记错的话这个参数应该是我加上去的就是为了修复我刚刚提到的那个Bug。
AbstractIndex是抽象的索引对象类。可以说它是承载索引项的容器而每个继承它的子类负责定义具体的索引项结构。比如OffsetIndex的索引项是&lt;位移值,物理磁盘位置&gt;TimeIndex的索引项是&lt;时间戳,位移值&gt;对。基于这样的设计理念AbstractIndex类中定义了一个抽象方法entrySize来表示不同索引项的大小如下所示
```
protected def entrySize: Int
```
子类实现该方法时需要给定自己索引项的大小对于OffsetIndex而言该值就是8对于TimeIndex而言该值是12如下所示
```
// OffsetIndex
override def entrySize = 8
// TimeIndex
override def entrySize = 12
```
说到这儿你肯定会问为什么是8和12呢我来解释一下。
在OffsetIndex中位移值用4个字节来表示物理磁盘位置也用4个字节来表示所以总共是8个字节。你可能会说位移值不是长整型吗应该是8个字节才对啊。
还记得AbstractIndex已经保存了baseOffset了吗这里的位移值实际上是相对于baseOffset的相对位移值即真实位移值减去baseOffset的值。下节课我会给你重点讲一下它这里你只需要知道**使用相对位移值能够有效地节省磁盘空间**就行了。而Broker端参数log.segment.bytes是整型这说明Kafka中每个日志段文件的大小不会超过2^32即4GB这就说明同一个日志段文件上的位移值减去baseOffset的差值一定在整数范围内。因此源码只需要4个字节保存就行了。
同理TimeIndex中的时间戳类型是长整型占用8个字节位移依然使用相对位移值占用4个字节因此总共需要12个字节。
如果有人问你Kafka中的索引底层的实现原理是什么你可以大声地告诉他**内存映射文件即Java中的MappedByteBuffer**。
使用内存映射文件的主要优势在于它有很高的I/O性能特别是对于索引这样的小文件来说由于文件内存被直接映射到一段虚拟内存上访问内存映射文件的速度要快于普通的读写文件速度。
另外在很多操作系统中比如Linux这段映射的内存区域实际上就是内核的页缓存Page Cache。这就意味着里面的数据不需要重复拷贝到用户态空间避免了很多不必要的时间、空间消耗。
在AbstractIndex中这个MappedByteBuffer就是名为mmap的变量。接下来我用注释的方式带你深入了解下这个mmap的主要流程。
```
@volatile
protected var mmap: MappedByteBuffer = {
// 第1步创建索引文件
val newlyCreated = file.createNewFile()
// 第2步以writable指定的方式读写方式或只读方式打开索引文件
val raf = if (writable) new RandomAccessFile(file, &quot;rw&quot;) else new RandomAccessFile(file, &quot;r&quot;)
try {
if(newlyCreated) {
if(maxIndexSize &lt; entrySize) // 预设的索引文件大小不能太小,如果连一个索引项都保存不了,直接抛出异常
throw new IllegalArgumentException(&quot;Invalid max index size: &quot; + maxIndexSize)
// 第3步设置索引文件长度roundDownToExactMultiple计算的是不超过maxIndexSize的最大整数倍entrySize
// 比如maxIndexSize=1234567entrySize=8那么调整后的文件长度为1234560
raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize))
}
// 第4步更新索引长度字段_length
_length = raf.length()
// 第5步创建MappedByteBuffer对象
val idx = {
if (writable)
raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, _length)
else
raf.getChannel.map(FileChannel.MapMode.READ_ONLY, 0, _length)
}
/* set the position in the index for the next entry */
// 第6步如果是新创建的索引文件将MappedByteBuffer对象的当前位置置成0
// 如果索引文件已存在将MappedByteBuffer对象的当前位置设置成最后一个索引项所在的位置
if(newlyCreated)
idx.position(0)
else
idx.position(roundDownToExactMultiple(idx.limit(), entrySize))
// 第7步返回创建的MappedByteBuffer对象
idx
} finally {
CoreUtils.swallow(raf.close(), AbstractIndex) // 关闭打开索引文件句柄
}
}
```
这些代码最主要的作用就是创建mmap对象。要知道AbstractIndex其他大部分的操作都是和mmap相关。
我举两个简单的小例子。
例1如果我们要计算索引对象中当前有多少个索引项那么只需要执行下列计算即可
```
protected var _entries: Int = mmap.position() / entrySize
```
例2如果我们要计算索引文件最多能容纳多少个索引项只要定义下面的变量就行了
```
private[this] var _maxEntries: Int = mmap.limit() / entrySize
```
再进一步,有了这两个变量,我们就能够很容易地编写一个方法,来判断当前索引文件是否已经写满:
```
def isFull: Boolean = _entries &gt;= _maxEntries
```
总之,**AbstractIndex中最重要的就是这个mmap变量了**。事实上AbstractIndex继承类实现添加索引项的主要逻辑也就是**向mmap中添加对应的字段**。
## 写入索引项
下面这段代码是OffsetIndex的append方法用于向索引文件中写入新索引项。
```
def append(offset: Long, position: Int): Unit = {
inLock(lock) {
// 第1步判断索引文件未写满
require(!isFull, &quot;Attempt to append to a full index (size = &quot; + _entries + &quot;).&quot;)
// 第2步必须满足以下条件之一才允许写入索引项
// 条件1当前索引文件为空
// 条件2要写入的位移大于当前所有已写入的索引项的位移——Kafka规定索引项中的位移值必须是单调增加的
if (_entries == 0 || offset &gt; _lastOffset) {
trace(s&quot;Adding index entry $offset =&gt; $position to ${file.getAbsolutePath}&quot;)
mmap.putInt(relativeOffset(offset)) // 第3步A向mmap中写入相对位移值
mmap.putInt(position) // 第3步B向mmap中写入物理位置信息
// 第4步更新其他元数据统计信息如当前索引项计数器_entries和当前索引项最新位移值_lastOffset
_entries += 1
_lastOffset = offset
// 第5步执行校验。写入的索引项格式必须符合要求即索引项个数*单个索引项占用字节数匹配当前文件物理大小,否则说明文件已损坏
require(_entries * entrySize == mmap.position(), entries + &quot; entries but file position in index is &quot; + mmap.position() + &quot;.&quot;)
} else {
// 如果第2步中两个条件都不满足不能执行写入索引项操作抛出异常
throw new InvalidOffsetException(s&quot;Attempt to append an offset ($offset) to position $entries no larger than&quot; +
s&quot; the last offset appended (${_lastOffset}) to ${file.getAbsolutePath}.&quot;)
}
}
}
```
我使用一张图来总结下append方法的执行流程希望可以帮你更快速地掌握它的实现
<img src="https://static001.geekbang.org/resource/image/23/ea/236f0731ff2799f0902ff7293cf6ddea.jpg" alt="">
## 查找索引项
索引项的写入逻辑并不复杂难点在于如何查找索引项。AbstractIndex定义了抽象方法**parseEntry**用于查找给定的索引项,如下所示:
```
protected def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry
```
这里的“n”表示要查找给定ByteBuffer中保存的第n个索引项在Kafka中也称第n个槽。IndexEntry是源码定义的一个接口里面有两个方法indexKey和indexValue分别返回不同类型索引的&lt;KeyValue&gt;对。
OffsetIndex实现parseEntry的逻辑如下
```
override protected def parseEntry(buffer: ByteBuffer, n: Int): OffsetPosition = {
OffsetPosition(baseOffset + relativeOffset(buffer, n), physical(buffer, n))
}
```
OffsetPosition是实现IndexEntry的实现类Key就是之前说的位移值而Value就是物理磁盘位置值。所以这里你能看到代码调用了relativeOffset(buffer, n) + baseOffset计算出绝对位移值之后调用physical(buffer, n)计算物理磁盘位置,最后将它们封装到一起作为一个独立的索引项返回。
**我建议你去看下relativeOffset和physical方法的实现看看它们是如何计算相对位移值和物理磁盘位置信息的**
有了parseEntry方法我们就能够根据给定的n来查找索引项了。但是这里还有个问题需要解决那就是我们如何确定要找的索引项在第n个槽中呢其实本质上这是一个算法问题也就是如何从一组已排序的数中快速定位符合条件的那个数。
## 二分查找算法
到目前为止从已排序数组中寻找某个数字最快速的算法就是二分查找了它能做到O(lgN)的时间复杂度。Kafka的索引组件就应用了二分查找算法。
我先给出原版的实现算法代码。
```
private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): (Int, Int) = {
// 第1步如果当前索引为空直接返回&lt;-1,-1&gt;对
if(_entries == 0)
return (-1, -1)
// 第2步要查找的位移值不能小于当前最小位移值
if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) &gt; 0)
return (-1, 0)
// binary search for the entry
// 第3步执行二分查找算法
var lo = 0
var hi = _entries - 1
while(lo &lt; hi) {
val mid = ceil(hi/2.0 + lo/2.0).toInt
val found = parseEntry(idx, mid)
val compareResult = compareIndexEntry(found, target, searchEntity)
if(compareResult &gt; 0)
hi = mid - 1
else if(compareResult &lt; 0)
lo = mid
else
return (mid, mid)
}
(lo, if (lo == _entries - 1) -1 else lo + 1)
```
这段代码的核心是第3步的二分查找算法。熟悉Binary Search的话你对这段代码一定不会感到陌生。
讲到这里似乎一切很完美Kafka索引应用二分查找算法快速定位待查找索引项位置之后调用parseEntry来读取索引项。不过这真的就是无懈可击的解决方案了吗
## 改进版二分查找算法
显然不是我前面说过了大多数操作系统使用页缓存来实现内存映射而目前几乎所有的操作系统都使用LRULeast Recently Used或类似于LRU的机制来管理页缓存。
Kafka写入索引文件的方式是在文件末尾追加写入而几乎所有的索引查询都集中在索引的尾部。这么来看的话LRU机制是非常适合Kafka的索引访问场景的。
这里有个问题是当Kafka在查询索引的时候原版的二分查找算法并没有考虑到缓存的问题因此很可能会导致一些不必要的缺页中断Page Fault。此时Kafka线程会被阻塞等待对应的索引项从物理磁盘中读出并放入到页缓存中。
下面我举个例子来说明一下这个情况。假设Kafka的某个索引占用了操作系统页缓存13个页Page如果待查找的位移值位于最后一个页上也就是Page 12那么标准的二分查找算法会依次读取页号0、6、9、11和12具体的推演流程如下所示
<img src="https://static001.geekbang.org/resource/image/e4/85/e4fc56a301b13afae4dda303e5366085.jpg" alt="">
通常来说一个页上保存了成百上千的索引项数据。随着索引文件不断被写入Page #12不断地被填充新的索引项。如果此时索引查询方都来自ISR副本或Lag很小的消费者那么这些查询大多集中在对Page #12的查询因此Page #0、6、9、11、12一定经常性地被源码访问。也就是说这些页一定保存在页缓存上。后面当新的索引项填满了Page #12页缓存就会申请一个新的Page来保存索引项即Page #13
现在最新索引项保存在Page #13中。如果要查找最新索引项原版二分查找算法将会依次访问Page #0、7、10、12和13。此时问题来了Page 7和10已经很久没有被访问过了它们大概率不在页缓存中因此一旦索引开始征用Page #13就会发生Page Fault等待那些冷页数据从磁盘中加载到页缓存。根据国外用户的测试这种加载过程可能长达1秒。
显然这是一个普遍的问题即每当索引文件占用Page数发生变化时就会强行变更二分查找的搜索路径从而出现不在页缓存的冷数据必须要加载到页缓存的情形而这种加载过程是非常耗时的。
基于这个问题社区提出了改进版的二分查找策略也就是缓存友好的搜索算法。总体的思路是代码将所有索引项分成两个部分热区Warm Area和冷区Cold Area然后分别在这两个区域内执行二分查找算法如下图所示
<img src="https://static001.geekbang.org/resource/image/e1/8e/e135f0a6b3fb43ead51db4ddbad1638e.jpg" alt="">
乍一看,该算法并没有什么高大上的改进,仅仅是把搜寻区域分成了冷、热两个区域,然后有条件地在不同区域执行普通的二分查找算法罢了。实际上,这个改进版算法提供了一个重要的保证:**它能保证那些经常需要被访问的Page组合是固定的**。
想想刚才的例子同样是查询最热的那部分数据一旦索引占用了更多的Page要遍历的Page组合就会发生变化。这是导致性能下降的主要原因。
这个改进版算法的最大好处在于,**查询最热那部分数据所遍历的Page永远是固定的因此大概率在页缓存中从而避免无意义的Page Fault**。
下面我们来看实际的代码。我用注释的方式解释了改进版算法的实现逻辑。一旦你了解了冷区热区的分割原理,剩下的就不难了。
```
private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): (Int, Int) = {
// 第1步如果索引为空直接返回&lt;-1,-1&gt;对
if(_entries == 0)
return (-1, -1)
// 封装原版的二分查找算法
def binarySearch(begin: Int, end: Int) : (Int, Int) = {
// binary search for the entry
var lo = begin
var hi = end
while(lo &lt; hi) {
val mid = (lo + hi + 1) &gt;&gt;&gt; 1
val found = parseEntry(idx, mid)
val compareResult = compareIndexEntry(found, target, searchEntity)
if(compareResult &gt; 0)
hi = mid - 1
else if(compareResult &lt; 0)
lo = mid
else
return (mid, mid)
}
(lo, if (lo == _entries - 1) -1 else lo + 1)
}
// 第3步确认热区首个索引项位于哪个槽。_warmEntries就是所谓的分割线目前固定为8192字节处
// 如果是OffsetIndex_warmEntries = 8192 / 8 = 1024即第1024个槽
// 如果是TimeIndex_warmEntries = 8192 / 12 = 682即第682个槽
val firstHotEntry = Math.max(0, _entries - 1 - _warmEntries)
// 第4步判断target位移值在热区还是冷区
if(compareIndexEntry(parseEntry(idx, firstHotEntry), target, searchEntity) &lt; 0) {
return binarySearch(firstHotEntry, _entries - 1) // 如果在热区,搜索热区
}
// 第5步确保target位移值不能小于当前最小位移值
if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) &gt; 0)
return (-1, 0)
// 第6步如果在冷区搜索冷区
binarySearch(0, firstHotEntry)
```
## 总结
今天我带你详细学习了Kafka中的索引机制以及社区如何应用二分查找算法实现快速定位索引项。有两个重点你一定要记住。
1. AbstractIndex这是Kafka所有类型索引的抽象父类里面的mmap变量是实现索引机制的核心你一定要掌握它。
1. 改进版二分查找算法社区在标准原版的基础上对二分查找算法根据实际访问场景做了定制化的改进。你需要特别关注改进版在提升缓存性能方面做了哪些努力。改进版能够有效地提升页缓存的使用率从而在整体上降低物理I/O缓解系统负载瓶颈。你最好能够从索引这个维度去思考社区在这方面所做的工作。
<img src="https://static001.geekbang.org/resource/image/31/22/31e35198818b0bbc05ef333b5897b022.jpg" alt="">
实际上无论是AbstractIndex还是它使用的二分查找算法它们都属于Kafka索引共性的东西即所有Kafka索引都具备这些特点或特性。那么你是否想了解不同类型索引之间的区别呢比如位移索引和时间戳索引有哪些异同之处。这些问题的答案我会在下节课揭晓你千万不要错过。
## 课后讨论
目前冷区和热区的分割线设定在8192字节处请结合源码注释以及你自己的理解讲一讲为什么要设置成8192
欢迎你在留言区畅所欲言,跟我交流讨论,也欢迎你把今天的内容分享给你的朋友。

View File

@@ -0,0 +1,229 @@
<audio id="audio" title="05 | 索引(下):位移索引和时间戳索引的区别是什么?" controls="" preload="none"><source id="mp3" src="https://static001.geekbang.org/resource/audio/9a/97/9ab31ac33d92b925f577e8fc4aa48797.mp3"></audio>
你好,我是胡夕。今天,我们继续说索引那些事儿。
在上节课我带你重点学习了Kafka源码中索引的抽象父类AbstractIndex。我分析了AbstractIndex类的大体对象结构还介绍了社区改进版的二分查找算法在Kafka索引上的应用。
前面说过Kafka索引类型有三大类位移索引、时间戳索引和已中止事务索引。相比于最后一类索引前两类索引的出镜率更高一些。在Kafka的数据路径下你肯定看到过很多.index和.timeindex后缀的文件。不知你是否有过这样的疑问“这些文件是用来做什么的呢” 现在我可以明确告诉你:.index文件就是Kafka中的位移索引文件而.timeindex文件则是时间戳索引文件。
那么,位移索引和时间戳索引到底是做什么用的呢?它们之间的区别是什么?今天,我就为你揭晓这些问题的答案。
## 位移索引
在学习Kafka的任何一类索引的时候我们都要关注两个问题
1. 索引中的索引项是如何定义的?
1. 如何向索引写入新的索引项?
看到这里,你可能会很疑惑:“等等,难道我们不需要关心如何查询索引吗?” 当然需要啦!上节课我们不是讲过二分查找算法在索引中的应用了吗?如果你觉得有点生疏了,那就赶快先去复习一下吧。
现在,我们先来看下索引项的定义。
### 索引项的定义
位移索引也就是所谓的OffsetIndex它可是一个老资历的组件了。如果我没记错的话国内大面积使用Kafka应该是在0.8时代。从那个时候开始OffsetIndex就已经存在了。每当Consumer需要从主题分区的某个位置开始读取消息时Kafka就会用到OffsetIndex直接定位物理文件位置从而避免了因为从头读取消息而引入的昂贵的I/O操作。
在上节课,我提到过,不同索引类型保存不同的&lt;Key, Value&gt;对。就OffsetIndex而言Key就是消息的相对位移Value是保存该消息的日志段文件中该消息第一个字节的物理文件位置。
这里我来具体解释一下相对位移的含义。还记得AbstractIndex类中的抽象方法entrySize吗它定义了单个&lt;Key, Value&gt;对所用的字节数。对于OffsetIndex来说entrySize就是8如OffsetIndex.scala中定义的那样
```
override def entrySize = 8
```
为什么是8呢相对位移是一个整型Integer占用4个字节物理文件位置也是一个整型同样占用4个字节因此总共是8个字节。
那相对位移是什么值呢我们知道Kafka中的消息位移值是一个长整型Long应该占用8个字节才对。在保存OffsetIndex的&lt;Key, Value&gt;对时Kafka做了一些优化。每个OffsetIndex对象在创建时都已经保存了对应日志段对象的起始位移因此OffsetIndex索引项没必要保存完整的8字节位移值。相反地它只需要保存与起始位移的差值Delta就够了而这个差值是可以被整型容纳的。这种设计可以让OffsetIndex每个索引项都节省4个字节。
举个简单的例子。假设一个索引文件保存了1000个索引项使用相对位移值就能节省大约4MB的空间这是不是一件很划算的事情呢
OffsetIndex定义了专门的方法用于将一个Long型的位移值转换成相对位移如下所示
```
def relativeOffset(offset: Long): Int = {
val relativeOffset = toRelative(offset)
if (relativeOffset.isEmpty)
// 如果无法转换成功(比如差值超过了整型表示范围),则抛出异常
throw new IndexOffsetOverflowException(s&quot;Integer overflow for offset: $offset (${file.getAbsoluteFile})&quot;)
relativeOffset.get
}
```
relativeOffset方法调用了父类的toRelative方法执行真正的转换。我们来看一下toRelative方法的实现。
```
private def toRelative(offset: Long): Option[Int] = {
val relativeOffset = offset - baseOffset
if (relativeOffset &lt; 0 || relativeOffset &gt; Int.MaxValue)
None
else
Some(relativeOffset.toInt)
}
```
逻辑很简单第一步是计算给定的offset值与baseOffset的差值第二步是校验该差值不能是负数或不能超过整型表示范围。如果校验通过就直接返回该差值作为相对位移值否则就返回None表示转换失败。
<img src="https://static001.geekbang.org/resource/image/c2/00/c259aff07a71aa4fe16165f423b3d600.jpg" alt="">
现在你知道OffsetIndex中的索引项为什么是8个字节以及位移值是如何被转换成相对位移了吧
当读取OffsetIndex时源码还需要将相对位移值还原成之前的完整位移。这个是在parseEntry方法中实现的。
```
override protected def parseEntry(buffer: ByteBuffer, n: Int): OffsetPosition = {
OffsetPosition(baseOffset + relativeOffset(buffer, n), physical(buffer, n))
}
```
我来给你解释下具体的实现方法。
这个方法返回一个OffsetPosition类型。该类有两个方法分别返回索引项的Key和Value。
**这里的parseEntry方法就是要构造OffsetPosition所需的Key和Value**。Key是索引项中的完整位移值**代码使用baseOffset + relativeOffset(buffer, n)的方式将相对位移值还原成完整位移值**Value是这个位移值上消息在日志段文件中的物理位置代码调用physical方法计算这个物理位置并把它作为Value。
最后parseEntry方法把Key和Value封装到一个OffsetPosition实例中然后将这个实例返回。
由于索引文件的总字节数就是索引项字节数乘以索引项数因此代码结合entrySize和buffer.getInt方法能够轻松地计算出第n个索引项所处的物理文件位置。这就是physical方法做的事情。
### 写入索引项
好了有了这些基础下面的内容就很容易理解了。我们来看下OffsetIndex中最重要的操作——**写入索引项append方法的实现**。
```
def append(offset: Long, position: Int): Unit = {
inLock(lock) {
// 索引文件如果已经写满,直接抛出异常
require(!isFull, &quot;Attempt to append to a full index (size = &quot; + _entries + &quot;).&quot;)
// 要保证待写入的位移值offset比当前索引文件中所有现存的位移值都要大
// 这主要是为了维护索引的单调增加性
if (_entries == 0 || offset &gt; _lastOffset) {
trace(s&quot;Adding index entry $offset =&gt; $position to ${file.getAbsolutePath}&quot;)
mmap.putInt(relativeOffset(offset)) // 向mmap写入相对位移值
mmap.putInt(position) // 向mmap写入物理文件位置
_entries += 1 // 更新索引项个数
_lastOffset = offset // 更新当前索引文件最大位移值
// 确保写入索引项格式符合要求
require(_entries * entrySize == mmap.position(), s&quot;$entries entries but file position in index is ${mmap.position()}.&quot;)
} else {
throw new InvalidOffsetException(s&quot;Attempt to append an offset ($offset) to position $entries no larger than&quot; +
s&quot; the last offset appended (${_lastOffset}) to ${file.getAbsolutePath}.&quot;)
}
}
}
```
append方法接收两个参数**Long型的位移值**和**Integer型的物理文件位置**。**该方法最重要的两步就是分别向mmap写入相对位移值和物理文件位置**。我使用一张图来总结下append方法的执行流程
<img src="https://static001.geekbang.org/resource/image/17/e6/176bec3d45790509c0587614be7f61e6.jpg" alt="">
除了append方法索引还有一个常见的操作截断操作Truncation。**截断操作是指,将索引文件内容直接裁剪掉一部分**。比如OffsetIndex索引文件中当前保存了100个索引项我想只保留最开始的40个索引项。源码定义了truncateToEntries方法来实现这个需求
```
private def truncateToEntries(entries: Int): Unit = {
inLock(lock) {
_entries = entries
mmap.position(_entries * entrySize)
_lastOffset = lastEntry.offset
debug(s&quot;Truncated index ${file.getAbsolutePath} to $entries entries;&quot; +
s&quot; position is now ${mmap.position()} and last offset is now ${_lastOffset}&quot;)
}
}
```
这个方法接收entries参数表示**要截取到哪个槽**主要的逻辑实现是调用mmap的position方法。源码中的_entries * entrySize就是mmap要截取到的字节处。
下面我来说说OffsetIndex的使用方式。
既然OffsetIndex被用来快速定位消息所在的物理文件位置那么必然需要定义一个方法执行对应的查询逻辑。这个方法就是lookup。
```
def lookup(targetOffset: Long): OffsetPosition = {
maybeLock(lock) {
val idx = mmap.duplicate // 使用私有变量复制出整个索引映射区
// largestLowerBoundSlotFor方法底层使用了改进版的二分查找算法寻找对应的槽
val slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY)
// 如果没找到返回一个空的位置即物理文件位置从0开始表示从头读日志文件
// 否则返回slot槽对应的索引项
if(slot == -1)
OffsetPosition(baseOffset, 0)
else
parseEntry(idx, slot)
}
}
```
我把主要的逻辑以注释的方式加到了代码中。该方法返回的是不大于给定位移值targetOffset的最大位移值以及对应的物理文件位置。你大致可以把这个方法理解为位移值的FLOOR函数。
## 时间戳索引
说完了OffsetIndex我们来看另一大类索引时间戳索引即TimeIndex。与OffsetIndex类似我们重点关注TimeIndex中索引项的定义以及如何写入TimeIndex索引项。
### 索引项的定义
与OffsetIndex不同的是TimeIndex保存的是&lt;时间戳,相对位移值&gt;对。时间戳需要一个长整型来保存相对位移值使用Integer来保存。因此TimeIndex单个索引项需要占用12个字节。这也揭示了一个重要的事实**在保存同等数量索引项的基础上TimeIndex会比OffsetIndex占用更多的磁盘空间**。
### 写入索引项
TimeIndex也有append方法只不过它叫作maybeAppend。我们来看下它的实现逻辑。
```
def maybeAppend(timestamp: Long, offset: Long, skipFullCheck: Boolean = false): Unit = {
inLock(lock) {
if (!skipFullCheck)
// 如果索引文件已写满,抛出异常
require(!isFull, &quot;Attempt to append to a full time index (size = &quot; + _entries + &quot;).&quot;)
// 确保索引单调增加性
if (_entries != 0 &amp;&amp; offset &lt; lastEntry.offset)
throw new InvalidOffsetException(s&quot;Attempt to append an offset ($offset) to slot ${_entries} no larger than&quot; +
s&quot; the last offset appended (${lastEntry.offset}) to ${file.getAbsolutePath}.&quot;)
// 确保时间戳的单调增加性
if (_entries != 0 &amp;&amp; timestamp &lt; lastEntry.timestamp)
throw new IllegalStateException(s&quot;Attempt to append a timestamp ($timestamp) to slot ${_entries} no larger&quot; +
s&quot; than the last timestamp appended (${lastEntry.timestamp}) to ${file.getAbsolutePath}.&quot;)
if (timestamp &gt; lastEntry.timestamp) {
trace(s&quot;Adding index entry $timestamp =&gt; $offset to ${file.getAbsolutePath}.&quot;)
mmap.putLong(timestamp) // 向mmap写入时间戳
mmap.putInt(relativeOffset(offset)) // 向mmap写入相对位移值
_entries += 1 // 更新索引项个数
_lastEntry = TimestampOffset(timestamp, offset) // 更新当前最新的索引项
require(_entries * entrySize == mmap.position(), s&quot;${_entries} entries but file position in index is ${mmap.position()}.&quot;)
}
}
}
```
和OffsetIndex类似向TimeIndex写入索引项的主体逻辑是向mmap分别写入时间戳和相对位移值。只不过**除了校验位移值的单调增加性之外TimeIndex还会确保顺序写入的时间戳也是单调增加的**。
说到这里我想到我当年读到这段代码时候的一个想法。那个时候这段代码还没有加上时间戳单调增加的校验逻辑我灵机一动萌发了向TimeIndex写入一个过期时间戳的想法。一番动手操作之后我真的向TimeIndex索引文件中写入了一个过期时间戳和位移。
你猜结果怎样?结果是引发了消费者端程序的彻底混乱。这是因为,当消费者端程序根据时间戳信息去过滤待读取的消息时,它读到了这个过期的时间戳并拿到了错误的位移值,因此返回了错误的数据。
为此我还给社区提交了一个Jira当时被驳回了——理由是不允许向TimeIndex写入过期时间戳。跟你说这个趣事儿只是想说明有的时候读源码会诱发很多灵感或奇思妙想而这些东西是你在平时使用过程中不会想到的。这也算是阅读源码的一大收获吧。
## 区别
讲到这里这节课就接近尾声了。最后我用一张表格汇总下OffsetIndex和TimeIndex的特点和区别希望能够帮助你更好地理解和消化今天的重点内容。
<img src="https://static001.geekbang.org/resource/image/a3/78/a359ce4e81eb073a9ebed2979082b578.jpg" alt="">
## 总结
今天我带你详细分析了OffsetIndex和TimeIndex以及它们的不同之处。虽然OffsetIndex和TimeIndex是不同类型的索引但Kafka内部是把二者结合使用的。通常的流程是先使用TimeIndex寻找满足时间戳要求的消息位移值然后再利用OffsetIndex去定位该位移值所在的物理文件位置。因此它们其实是合作的关系。
最后,我还想提醒你一点:**不要对索引文件做任何修改!**我碰到过因用户擅自重命名索引文件从而导致Broker崩溃无法启动的场景。另外虽然Kafka能够重建索引但是随意地删除索引文件依然是一个很危险的操作。在生产环境中我建议你尽量不要执行这样的操作。
## 课后讨论
OffsetIndex中的lookup方法实现了类似于FLOOR函数的位移查找逻辑。你能否对应写一个类似于CEILING函数的位移查找逻辑即返回不小于给定位移值targetOffset的最小位移值和对应的物理文件位置
欢迎你在留言区畅所欲言,跟我交流讨论,也欢迎你把今天的内容分享给你的朋友。