learn.lianglianglee.com/专栏/Kafka核心技术与实战/18 Kafka中位移提交那些事儿.md.html
2022-08-14 03:40:33 +08:00

389 lines
30 KiB
HTML
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<!DOCTYPE html>
<!-- saved from url=(0046)https://kaiiiz.github.io/hexo-theme-book-demo/ -->
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1, maximum-scale=1.0, user-scalable=no">
<link rel="icon" href="/static/favicon.png">
<title>18 Kafka中位移提交那些事儿.md.html</title>
<!-- Spectre.css framework -->
<link rel="stylesheet" href="/static/index.css">
<!-- theme css & js -->
<meta name="generator" content="Hexo 4.2.0">
</head>
<body>
<div class="book-container">
<div class="book-sidebar">
<div class="book-brand">
<a href="/">
<img src="/static/favicon.png">
<span>技术文章摘抄</span>
</a>
</div>
<div class="book-menu uncollapsible">
<ul class="uncollapsible">
<li><a href="/" class="current-tab">首页</a></li>
</ul>
<ul class="uncollapsible">
<li><a href="../">上一级</a></li>
</ul>
<ul class="uncollapsible">
<li>
<a href="/专栏/Kafka核心技术与实战/00 开篇词 为什么要学习Kafka.md.html">00 开篇词 为什么要学习Kafka</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/01 消息引擎系统ABC.md.html">01 消息引擎系统ABC</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/02 一篇文章带你快速搞定Kafka术语.md.html">02 一篇文章带你快速搞定Kafka术语</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/03 Kafka只是消息引擎系统吗.md.html">03 Kafka只是消息引擎系统吗</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/04 我应该选择哪种Kafka.md.html">04 我应该选择哪种Kafka</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/05 聊聊Kafka的版本号.md.html">05 聊聊Kafka的版本号</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/06 Kafka线上集群部署方案怎么做.md.html">06 Kafka线上集群部署方案怎么做</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/07 最最最重要的集群参数配置(上).md.html">07 最最最重要的集群参数配置(上)</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/08 最最最重要的集群参数配置(下).md.html">08 最最最重要的集群参数配置(下)</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/09 生产者消息分区机制原理剖析.md.html">09 生产者消息分区机制原理剖析</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/10 生产者压缩算法面面观.md.html">10 生产者压缩算法面面观</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/11 无消息丢失配置怎么实现?.md.html">11 无消息丢失配置怎么实现?</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/12 客户端都有哪些不常见但是很高级的功能?.md.html">12 客户端都有哪些不常见但是很高级的功能?</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/13 Java生产者是如何管理TCP连接的.md.html">13 Java生产者是如何管理TCP连接的</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/14 幂等生产者和事务生产者是一回事吗?.md.html">14 幂等生产者和事务生产者是一回事吗?</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/15 消费者组到底是什么?.md.html">15 消费者组到底是什么?</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/16 揭开神秘的“位移主题”面纱.md.html">16 揭开神秘的“位移主题”面纱</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/17 消费者组重平衡能避免吗?.md.html">17 消费者组重平衡能避免吗?</a>
</li>
<li>
<a class="current-tab" href="/专栏/Kafka核心技术与实战/18 Kafka中位移提交那些事儿.md.html">18 Kafka中位移提交那些事儿</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/19 CommitFailedException异常怎么处理.md.html">19 CommitFailedException异常怎么处理</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/20 多线程开发消费者实例.md.html">20 多线程开发消费者实例</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/21 Java 消费者是如何管理TCP连接的.md.html">21 Java 消费者是如何管理TCP连接的</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/22 消费者组消费进度监控都怎么实现?.md.html">22 消费者组消费进度监控都怎么实现?</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/23 Kafka副本机制详解.md.html">23 Kafka副本机制详解</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/24 请求是怎么被处理的?.md.html">24 请求是怎么被处理的?</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/25 消费者组重平衡全流程解析.md.html">25 消费者组重平衡全流程解析</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/26 你一定不能错过的Kafka控制器.md.html">26 你一定不能错过的Kafka控制器</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/27 关于高水位和Leader Epoch的讨论.md.html">27 关于高水位和Leader Epoch的讨论</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/28 主题管理知多少.md.html">28 主题管理知多少</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/29 Kafka动态配置了解下.md.html">29 Kafka动态配置了解下</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/30 怎么重设消费者组位移?.md.html">30 怎么重设消费者组位移?</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/31 常见工具脚本大汇总.md.html">31 常见工具脚本大汇总</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/32 KafkaAdminClientKafka的运维利器.md.html">32 KafkaAdminClientKafka的运维利器</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/33 Kafka认证机制用哪家.md.html">33 Kafka认证机制用哪家</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/34 云环境下的授权该怎么做?.md.html">34 云环境下的授权该怎么做?</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/35 跨集群备份解决方案MirrorMaker.md.html">35 跨集群备份解决方案MirrorMaker</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/36 你应该怎么监控Kafka.md.html">36 你应该怎么监控Kafka</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/37 主流的Kafka监控框架.md.html">37 主流的Kafka监控框架</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/38 调优Kafka你做到了吗.md.html">38 调优Kafka你做到了吗</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/39 从0搭建基于Kafka的企业级实时日志流处理平台.md.html">39 从0搭建基于Kafka的企业级实时日志流处理平台</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/40 Kafka Streams与其他流处理平台的差异在哪里.md.html">40 Kafka Streams与其他流处理平台的差异在哪里</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/41 Kafka Streams DSL开发实例.md.html">41 Kafka Streams DSL开发实例</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/42 Kafka Streams在金融领域的应用.md.html">42 Kafka Streams在金融领域的应用</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/加餐 搭建开发环境、阅读源码方法、经典学习资料大揭秘.md.html">加餐 搭建开发环境、阅读源码方法、经典学习资料大揭秘</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/结束语 以梦为马,莫负韶华!.md.html">结束语 以梦为马,莫负韶华!</a>
</li>
</ul>
</div>
</div>
<div class="sidebar-toggle" onclick="sidebar_toggle()" onmouseover="add_inner()" onmouseleave="remove_inner()">
<div class="sidebar-toggle-inner"></div>
</div>
<script>
function add_inner() {
let inner = document.querySelector('.sidebar-toggle-inner')
inner.classList.add('show')
}
function remove_inner() {
let inner = document.querySelector('.sidebar-toggle-inner')
inner.classList.remove('show')
}
function sidebar_toggle() {
let sidebar_toggle = document.querySelector('.sidebar-toggle')
let sidebar = document.querySelector('.book-sidebar')
let content = document.querySelector('.off-canvas-content')
if (sidebar_toggle.classList.contains('extend')) { // show
sidebar_toggle.classList.remove('extend')
sidebar.classList.remove('hide')
content.classList.remove('extend')
} else { // hide
sidebar_toggle.classList.add('extend')
sidebar.classList.add('hide')
content.classList.add('extend')
}
}
function open_sidebar() {
let sidebar = document.querySelector('.book-sidebar')
let overlay = document.querySelector('.off-canvas-overlay')
sidebar.classList.add('show')
overlay.classList.add('show')
}
function hide_canvas() {
let sidebar = document.querySelector('.book-sidebar')
let overlay = document.querySelector('.off-canvas-overlay')
sidebar.classList.remove('show')
overlay.classList.remove('show')
}
</script>
<div class="off-canvas-content">
<div class="columns">
<div class="column col-12 col-lg-12">
<div class="book-navbar">
<!-- For Responsive Layout -->
<header class="navbar">
<section class="navbar-section">
<a onclick="open_sidebar()">
<i class="icon icon-menu"></i>
</a>
</section>
</header>
</div>
<div class="book-content" style="max-width: 960px; margin: 0 auto;
overflow-x: auto;
overflow-y: hidden;">
<div class="book-post">
<p id="tip" align="center"></p>
<div><h1>18 Kafka中位移提交那些事儿</h1>
<p>你好,我是胡夕。今天我们来聊聊 Kafka 中位移提交的那些事儿。</p>
<p>之前我们说过Consumer 端有个位移的概念,它和消息在分区中的位移不是一回事儿,虽然它们的英文都是 Offset。今天我们要聊的位移是 Consumer 的消费位移,它记录了 Consumer 要消费的下一条消息的位移。这可能和你以前了解的有些出入,不过切记是下一条消息的位移,而不是目前最新消费消息的位移。</p>
<p>我来举个例子说明一下。假设一个分区中有 10 条消息,位移分别是 0 到 9。某个 Consumer 应用已消费了 5 条消息,这就说明该 Consumer 消费了位移为 0 到 4 的 5 条消息,此时 Consumer 的位移是 5指向了下一条消息的位移。</p>
<p><strong>Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移</strong>Committing Offsets。因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即<strong>Consumer 需要为分配给它的每个分区提交各自的位移数据</strong></p>
<p>提交位移主要是为了表征 Consumer 的消费进度,这样当 Consumer 发生故障重启之后,就能够从 Kafka 中读取之前提交的位移值,然后从相应的位移处继续消费,从而避免整个消费过程重来一遍。换句话说,位移提交是 Kafka 提供给你的一个工具或语义保障,你负责维持这个语义保障,即如果你提交了位移 X那么 Kafka 会认为所有位移值小于 X 的消息你都已经成功消费了。</p>
<p>这一点特别关键。因为位移提交非常灵活,你完全可以提交任何位移值,但由此产生的后果你也要一并承担。假设你的 Consumer 消费了 10 条消息,你提交的位移值却是 20那么从理论上讲位移介于 1119 之间的消息是有可能丢失的;相反地,如果你提交的位移值是 5那么位移介于 59 之间的消息就有可能被重复消费。所以,我想再强调一下,<strong>位移提交的语义保障是由你来负责的Kafka 只会“无脑”地接受你提交的位移</strong>。你对位移提交的管理直接影响了你的 Consumer 所能提供的消息语义保障。</p>
<p>鉴于位移提交甚至是位移管理对 Consumer 端的巨大影响Kafka特别是 KafkaConsumer API提供了多种提交位移的方法。<strong>从用户的角度来说,位移提交分为自动提交和手动提交;从 Consumer 端的角度来说,位移提交分为同步提交和异步提交</strong></p>
<p>我们先来说说自动提交和手动提交。所谓自动提交,就是指 Kafka Consumer 在后台默默地为你提交位移作为用户的你完全不必操心这些事而手动提交则是指你要自己提交位移Kafka Consumer 压根不管。</p>
<p>开启自动提交位移的方法很简单。Consumer 端有个参数 enable.auto.commit把它设置为 true 或者压根不设置它就可以了。因为它的默认值就是 true即 Java Consumer 默认就是自动提交位移的。如果启用了自动提交Consumer 端还有个参数就派上用场了auto.commit.interval.ms。它的默认值是 5 秒,表明 Kafka 每 5 秒会为你自动提交一次位移。</p>
<p>为了把这个问题说清楚,我给出了完整的 Java 代码。这段代码展示了设置自动提交位移的方法。有了这段代码做基础,今天后面的讲解我就不再展示完整的代码了。</p>
<pre><code>Properties props = new Properties();
props.put(&quot;bootstrap.servers&quot;, &quot;localhost:9092&quot;);
props.put(&quot;group.id&quot;, &quot;test&quot;);
props.put(&quot;enable.auto.commit&quot;, &quot;true&quot;);
props.put(&quot;auto.commit.interval.ms&quot;, &quot;2000&quot;);
props.put(&quot;key.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
props.put(&quot;value.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
KafkaConsumer&lt;String, String&gt; consumer = new KafkaConsumer&lt;&gt;(props);
consumer.subscribe(Arrays.asList(&quot;foo&quot;, &quot;bar&quot;));
while (true) {
ConsumerRecords&lt;String, String&gt; records = consumer.poll(100);
for (ConsumerRecord&lt;String, String&gt; record : records)
System.out.printf(&quot;offset = %d, key = %s, value = %s%n&quot;, record.offset(), record.key(), record.value());
}
</code></pre>
<p>上面的橙色粗体部分,就是开启自动提交位移的方法。总体来说,还是很简单的吧。</p>
<p>和自动提交相反的,就是手动提交了。开启手动提交位移的方法就是设置 enable.auto.commit 为 false。但是仅仅设置它为 false 还不够,因为你只是告诉 Kafka Consumer 不要自动提交位移而已,你还需要调用相应的 API 手动提交位移。</p>
<p>最简单的 API 就是<strong>KafkaConsumer#commitSync()</strong>。该方法会提交 KafkaConsumer#poll() 返回的最新位移。从名字上来看,它是一个同步操作,即该方法会一直等待,直到位移被成功提交才会返回。如果提交过程中出现异常,该方法会将异常信息抛出。下面这段代码展示了 commitSync() 的使用方法:</p>
<pre><code>while (true) {
ConsumerRecords&lt;String, String&gt; records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
try {
consumer.commitSync();
} catch (CommitFailedException e) {
handle(e); // 处理提交失败异常
}
}
</code></pre>
<p>可见,调用 consumer.commitSync() 方法的时机,是在你处理完了 poll() 方法返回的所有消息之后。如果你莽撞地过早提交了位移,就可能会出现消费数据丢失的情况。那么你可能会问,自动提交位移就不会出现消费数据丢失的情况了吗?它能恰到好处地把握时机进行位移提交吗?为了搞清楚这个问题,我们必须要深入地了解一下自动提交位移的顺序。</p>
<p>一旦设置了 enable.auto.commit 为 trueKafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。从顺序上来说poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。但自动提交位移的一个问题在于,<strong>它可能会出现重复消费</strong></p>
<p>在默认情况下Consumer 每 5 秒自动提交一次位移。现在,我们假设提交位移之后的 3 秒发生了 Rebalance 操作。在 Rebalance 之后,所有 Consumer 从上一次提交的位移处继续消费,但该位移已经是 3 秒前的位移数据了,故在 Rebalance 发生前 3 秒消费的所有数据都要重新再消费一次。虽然你能够通过减少 auto.commit.interval.ms 的值来提高提交频率,但这么做只能缩小重复消费的时间窗口,不可能完全消除它。这是自动提交机制的一个缺陷。</p>
<p>反观手动提交位移,它的好处就在于更加灵活,你完全能够把控位移提交的时机和频率。但是,它也有一个缺陷,就是在调用 commitSync() 时Consumer 程序会处于阻塞状态,直到远端的 Broker 返回提交结果,这个状态才会结束。在任何系统中,因为程序而非资源限制而导致的阻塞都可能是系统的瓶颈,会影响整个应用程序的 TPS。当然你可以选择拉长提交间隔但这样做的后果是 Consumer 的提交频率下降,在下次 Consumer 重启回来后,会有更多的消息被重新消费。</p>
<p>鉴于这个问题Kafka 社区为手动提交位移提供了另一个 API 方法:<strong>KafkaConsumer#commitAsync()</strong>。从名字上来看它就不是同步的,而是一个异步操作。调用 commitAsync() 之后,它会立即返回,不会阻塞,因此不会影响 Consumer 应用的 TPS。由于它是异步的Kafka 提供了回调函数callback供你实现提交之后的逻辑比如记录日志或处理异常等。下面这段代码展示了调用 commitAsync() 的方法:</p>
<pre><code>while (true) {
ConsumerRecords&lt;String, String&gt; records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
consumer.commitAsync((offsets, exception) -&gt; {
if (exception != null)
handle(exception);
});
}
</code></pre>
<p>commitAsync 是否能够替代 commitSync 呢答案是不能。commitAsync 的问题在于,出现问题时它不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过期”或不是最新值了。因此,异步提交的重试其实没有意义,所以 commitAsync 是不会重试的。</p>
<p>显然,如果是手动提交,我们需要将 commitSync 和 commitAsync 组合使用才能到达最理想的效果,原因有两个:</p>
<ol>
<li>我们可以利用 commitSync 的自动重试来规避那些瞬时错误比如网络的瞬时抖动Broker 端 GC 等。因为这些问题都是短暂的,自动重试通常都会成功,因此,我们不想自己重试,而是希望 Kafka Consumer 帮我们做这件事。</li>
<li>我们不希望程序总处于阻塞状态,影响 TPS。</li>
</ol>
<p>我们来看一下下面这段代码,它展示的是如何将两个 API 方法结合使用进行手动提交。</p>
<pre><code> try {
while (true) {
ConsumerRecords&lt;String, String&gt; records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
commitAysnc(); // 使用异步提交规避阻塞
}
} catch (Exception e) {
handle(e); // 处理异常
} finally {
try {
consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
} finally {
consumer.close();
}
}
</code></pre>
<p>这段代码同时使用了 commitSync() 和 commitAsync()。对于常规性、阶段性的手动提交,我们调用 commitAsync() 避免程序阻塞,而在 Consumer 要关闭前,我们调用 commitSync() 方法执行同步阻塞式的位移提交,以确保 Consumer 关闭前能够保存正确的位移数据。将两者结合后,我们既实现了异步无阻塞式的位移管理,也确保了 Consumer 位移的正确性,所以,如果你需要自行编写代码开发一套 Kafka Consumer 应用,那么我推荐你使用上面的代码范例来实现手动的位移提交。</p>
<p>我们说了自动提交和手动提交,也说了同步提交和异步提交,这些就是 Kafka 位移提交的全部了吗?其实,我们还差一部分。</p>
<p>实际上Kafka Consumer API 还提供了一组更为方便的方法,可以帮助你实现更精细化的位移管理功能。刚刚我们聊到的所有位移提交,都是提交 poll 方法返回的所有消息的位移,比如 poll 方法一次返回了 500 条消息,当你处理完这 500 条消息之后,前面我们提到的各种方法会一次性地将这 500 条消息的位移一并处理。简单来说,就是<strong>直接提交最新一条消息的位移</strong>。但如果我想更加细粒度化地提交位移,该怎么办呢?</p>
<p>设想这样一个场景:你的 poll 方法返回的不是 500 条消息,而是 5000 条。那么,你肯定不想把这 5000 条消息都处理完之后再提交位移,因为一旦中间出现差错,之前处理的全部都要重来一遍。这类似于我们数据库中的事务处理。很多时候,我们希望将一个大事务分割成若干个小事务分别提交,这能够有效减少错误恢复的时间。</p>
<p>在 Kafka 中也是相同的道理。对于一次要处理很多消息的 Consumer 而言,它会关心社区有没有方法允许它在消费的中间进行位移提交。比如前面这个 5000 条消息的例子,你可能希望每处理完 100 条消息就提交一次位移,这样能够避免大批量的消息重新消费。</p>
<p>庆幸的是Kafka Consumer API 为手动提交提供了这样的方法commitSync(Map&lt;TopicPartition, OffsetAndMetadata&gt;) 和 commitAsync(Map&lt;TopicPartition, OffsetAndMetadata&gt;)。它们的参数是一个 Map 对象,键就是 TopicPartition即消费的分区而值是一个 OffsetAndMetadata 对象,保存的主要是位移数据。</p>
<p>就拿刚刚提过的那个例子来说,如何每处理 100 条消息就提交一次位移呢?在这里,我以 commitAsync 为例展示一段代码实际上commitSync 的调用方法和它是一模一样的。</p>
<pre><code class="language-java">private Map&lt;TopicPartition, OffsetAndMetadata&gt; offsets = new HashMap&lt;&gt;();
int count = 0;
……
while (true) {
ConsumerRecords&lt;String, String&gt; records =
consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord&lt;String, String&gt; record: records) {
process(record); // 处理消息
offsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)
ifcount % 100 == 0
consumer.commitAsync(offsets, null); // 回调处理逻辑是 null
count++;
}
}
</code></pre>
<p>简单解释一下这段代码。程序先是创建了一个 Map 对象,用于保存 Consumer 消费处理过程中要提交的分区位移,之后开始逐条处理消息,并构造要提交的位移值。还记得之前我说过要提交下一条消息的位移吗?这就是这里构造 OffsetAndMetadata 对象时,使用当前消息位移加 1 的原因。代码的最后部分是做位移的提交。我在这里设置了一个计数器,每累计 100 条消息就统一提交一次位移。与调用无参的 commitAsync 不同,这里调用了带 Map 对象参数的 commitAsync 进行细粒度的位移提交。这样,这段代码就能够实现每处理 100 条消息就提交一次位移,不用再受 poll 方法返回的消息总数的限制了。</p>
<h2>小结</h2>
<p>好了我们来总结一下今天的内容。Kafka Consumer 的位移提交,是实现 Consumer 端语义保障的重要手段。位移提交分为自动提交和手动提交,而手动提交又分为同步提交和异步提交。在实际使用过程中,推荐你使用手动提交机制,因为它更加可控,也更加灵活。另外,建议你同时采用同步提交和异步提交两种方式,这样既不影响 TPS又支持自动重试改善 Consumer 应用的高可用性。总之Kafka Consumer API 提供了多种灵活的提交方法,方便你根据自己的业务场景定制你的提交策略。</p>
<p><img src="assets/f2dc07889e489e1a6f5268f7139a17cc.jpeg" alt="img" /></p>
</div>
</div>
<div>
<div style="float: left">
<a href="/专栏/Kafka核心技术与实战/17 消费者组重平衡能避免吗?.md.html">上一页</a>
</div>
<div style="float: right">
<a href="/专栏/Kafka核心技术与实战/19 CommitFailedException异常怎么处理.md.html">下一页</a>
</div>
</div>
</div>
</div>
</div>
</div>
<a class="off-canvas-overlay" onclick="hide_canvas()"></a>
</div>
<script defer src="https://static.cloudflareinsights.com/beacon.min.js/v652eace1692a40cfa3763df669d7439c1639079717194" integrity="sha512-Gi7xpJR8tSkrpF7aordPZQlW2DLtzUlZcumS8dMQjwDHEnw9I7ZLyiOj/6tZStRBGtGgN6ceN6cMH8z7etPGlw==" data-cf-beacon='{"rayId":"709971f52b263d60","version":"2021.12.0","r":1,"token":"1f5d475227ce4f0089a7cff1ab17c0f5","si":100}' crossorigin="anonymous"></script>
</body>
<!-- Global site tag (gtag.js) - Google Analytics -->
<script async src="https://www.googletagmanager.com/gtag/js?id=G-NPSEEVD756"></script>
<script>
window.dataLayer = window.dataLayer || [];
function gtag() {
dataLayer.push(arguments);
}
gtag('js', new Date());
gtag('config', 'G-NPSEEVD756');
var path = window.location.pathname
var cookie = getCookie("lastPath");
console.log(path)
if (path.replace("/", "") === "") {
if (cookie.replace("/", "") !== "") {
console.log(cookie)
document.getElementById("tip").innerHTML = "<a href='" + cookie + "'>跳转到上次进度</a>"
}
} else {
setCookie("lastPath", path)
}
function setCookie(cname, cvalue) {
var d = new Date();
d.setTime(d.getTime() + (180 * 24 * 60 * 60 * 1000));
var expires = "expires=" + d.toGMTString();
document.cookie = cname + "=" + cvalue + "; " + expires + ";path = /";
}
function getCookie(cname) {
var name = cname + "=";
var ca = document.cookie.split(';');
for (var i = 0; i < ca.length; i++) {
var c = ca[i].trim();
if (c.indexOf(name) === 0) return c.substring(name.length, c.length);
}
return "";
}
</script>
</html>