learn.lianglianglee.com/专栏/Kafka核心技术与实战/30 怎么重设消费者组位移?.md.html
2022-05-11 18:52:13 +08:00

1171 lines
31 KiB
HTML
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<!DOCTYPE html>
<!-- saved from url=(0046)https://kaiiiz.github.io/hexo-theme-book-demo/ -->
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1, maximum-scale=1.0, user-scalable=no">
<link rel="icon" href="/static/favicon.png">
<title>30 怎么重设消费者组位移?.md.html</title>
<!-- Spectre.css framework -->
<link rel="stylesheet" href="/static/index.css">
<!-- theme css & js -->
<meta name="generator" content="Hexo 4.2.0">
</head>
<body>
<div class="book-container">
<div class="book-sidebar">
<div class="book-brand">
<a href="/">
<img src="/static/favicon.png">
<span>技术文章摘抄</span>
</a>
</div>
<div class="book-menu uncollapsible">
<ul class="uncollapsible">
<li><a href="/" class="current-tab">首页</a></li>
</ul>
<ul class="uncollapsible">
<li><a href="../">上一级</a></li>
</ul>
<ul class="uncollapsible">
<li>
<a href="/专栏/Kafka核心技术与实战/00 开篇词 为什么要学习Kafka.md">00 开篇词 为什么要学习Kafka.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/01 消息引擎系统ABC.md">01 消息引擎系统ABC.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/02 一篇文章带你快速搞定Kafka术语.md">02 一篇文章带你快速搞定Kafka术语.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/03 Kafka只是消息引擎系统吗.md">03 Kafka只是消息引擎系统吗.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/04 我应该选择哪种Kafka.md">04 我应该选择哪种Kafka.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/05 聊聊Kafka的版本号.md">05 聊聊Kafka的版本号.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/06 Kafka线上集群部署方案怎么做.md">06 Kafka线上集群部署方案怎么做.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/07 最最最重要的集群参数配置(上).md">07 最最最重要的集群参数配置(上).md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/08 最最最重要的集群参数配置(下).md">08 最最最重要的集群参数配置(下).md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/09 生产者消息分区机制原理剖析.md">09 生产者消息分区机制原理剖析.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/10 生产者压缩算法面面观.md">10 生产者压缩算法面面观.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/11 无消息丢失配置怎么实现?.md">11 无消息丢失配置怎么实现?.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/12 客户端都有哪些不常见但是很高级的功能?.md">12 客户端都有哪些不常见但是很高级的功能?.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/13 Java生产者是如何管理TCP连接的.md">13 Java生产者是如何管理TCP连接的.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/14 幂等生产者和事务生产者是一回事吗?.md">14 幂等生产者和事务生产者是一回事吗?.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/15 消费者组到底是什么?.md">15 消费者组到底是什么?.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/16 揭开神秘的“位移主题”面纱.md">16 揭开神秘的“位移主题”面纱.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/17 消费者组重平衡能避免吗?.md">17 消费者组重平衡能避免吗?.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/18 Kafka中位移提交那些事儿.md">18 Kafka中位移提交那些事儿.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/19 CommitFailedException异常怎么处理.md">19 CommitFailedException异常怎么处理.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/20 多线程开发消费者实例.md">20 多线程开发消费者实例.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/21 Java 消费者是如何管理TCP连接的.md">21 Java 消费者是如何管理TCP连接的.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/22 消费者组消费进度监控都怎么实现?.md">22 消费者组消费进度监控都怎么实现?.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/23 Kafka副本机制详解.md">23 Kafka副本机制详解.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/24 请求是怎么被处理的?.md">24 请求是怎么被处理的?.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/25 消费者组重平衡全流程解析.md">25 消费者组重平衡全流程解析.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/26 你一定不能错过的Kafka控制器.md">26 你一定不能错过的Kafka控制器.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/27 关于高水位和Leader Epoch的讨论.md">27 关于高水位和Leader Epoch的讨论.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/28 主题管理知多少.md">28 主题管理知多少.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/29 Kafka动态配置了解下.md">29 Kafka动态配置了解下.md.html</a>
</li>
<li>
<a class="current-tab" href="/专栏/Kafka核心技术与实战/30 怎么重设消费者组位移?.md">30 怎么重设消费者组位移?.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/31 常见工具脚本大汇总.md">31 常见工具脚本大汇总.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/32 KafkaAdminClientKafka的运维利器.md">32 KafkaAdminClientKafka的运维利器.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/33 Kafka认证机制用哪家.md">33 Kafka认证机制用哪家.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/34 云环境下的授权该怎么做?.md">34 云环境下的授权该怎么做?.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/35 跨集群备份解决方案MirrorMaker.md">35 跨集群备份解决方案MirrorMaker.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/36 你应该怎么监控Kafka.md">36 你应该怎么监控Kafka.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/37 主流的Kafka监控框架.md">37 主流的Kafka监控框架.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/38 调优Kafka你做到了吗.md">38 调优Kafka你做到了吗.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/39 从0搭建基于Kafka的企业级实时日志流处理平台.md">39 从0搭建基于Kafka的企业级实时日志流处理平台.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/40 Kafka Streams与其他流处理平台的差异在哪里.md">40 Kafka Streams与其他流处理平台的差异在哪里.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/41 Kafka Streams DSL开发实例.md">41 Kafka Streams DSL开发实例.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/42 Kafka Streams在金融领域的应用.md">42 Kafka Streams在金融领域的应用.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/加餐 搭建开发环境、阅读源码方法、经典学习资料大揭秘.md">加餐 搭建开发环境、阅读源码方法、经典学习资料大揭秘.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/结束语 以梦为马,莫负韶华!.md">结束语 以梦为马,莫负韶华!.md.html</a>
</li>
</ul>
</div>
</div>
<div class="sidebar-toggle" onclick="sidebar_toggle()" onmouseover="add_inner()" onmouseleave="remove_inner()">
<div class="sidebar-toggle-inner"></div>
</div>
<script>
function add_inner() {
let inner = document.querySelector('.sidebar-toggle-inner')
inner.classList.add('show')
}
function remove_inner() {
let inner = document.querySelector('.sidebar-toggle-inner')
inner.classList.remove('show')
}
function sidebar_toggle() {
let sidebar_toggle = document.querySelector('.sidebar-toggle')
let sidebar = document.querySelector('.book-sidebar')
let content = document.querySelector('.off-canvas-content')
if (sidebar_toggle.classList.contains('extend')) { // show
sidebar_toggle.classList.remove('extend')
sidebar.classList.remove('hide')
content.classList.remove('extend')
} else { // hide
sidebar_toggle.classList.add('extend')
sidebar.classList.add('hide')
content.classList.add('extend')
}
}
function open_sidebar() {
let sidebar = document.querySelector('.book-sidebar')
let overlay = document.querySelector('.off-canvas-overlay')
sidebar.classList.add('show')
overlay.classList.add('show')
}
function hide_canvas() {
let sidebar = document.querySelector('.book-sidebar')
let overlay = document.querySelector('.off-canvas-overlay')
sidebar.classList.remove('show')
overlay.classList.remove('show')
}
</script>
<div class="off-canvas-content">
<div class="columns">
<div class="column col-12 col-lg-12">
<div class="book-navbar">
<!-- For Responsive Layout -->
<header class="navbar">
<section class="navbar-section">
<a onclick="open_sidebar()">
<i class="icon icon-menu"></i>
</a>
</section>
</header>
</div>
<div class="book-content" style="max-width: 960px; margin: 0 auto;
overflow-x: auto;
overflow-y: hidden;">
<div class="book-post">
<p id="tip" align="center"></p>
<div><h1>30 怎么重设消费者组位移?</h1>
<p>你好,我是胡夕。今天我要跟你分享的主题是:如何重设消费者组位移。</p>
<h2>为什么要重设消费者组位移?</h2>
<p>我们知道Kafka 和传统的消息引擎在设计上是有很大区别的其中一个比较显著的区别就是Kafka 的消费者读取消息是可以重演的replayable</p>
<p>像 RabbitMQ 或 ActiveMQ 这样的传统消息中间件它们处理和响应消息的方式是破坏性的destructive即一旦消息被成功处理就会被从 Broker 上删除。</p>
<p>反观 Kafka由于它是基于日志结构log-based的消息引擎消费者在消费消息时仅仅是从磁盘文件上读取数据而已是只读的操作因此消费者不会删除消息数据。同时由于位移数据是由消费者控制的因此它能够很容易地修改位移的值实现重复消费历史数据的功能。</p>
<p>对了,之前有很多同学在专栏的留言区提问:在实际使用场景中,我该如何确定是使用传统的消息中间件,还是使用 Kafka 呢我在这里统一回答一下。如果在你的场景中消息处理逻辑非常复杂处理代价很高同时你又不关心消息之间的顺序那么传统的消息中间件是比较合适的反之如果你的场景需要较高的吞吐量但每条消息的处理时间很短同时你又很在意消息的顺序此时Kafka 就是你的首选。</p>
<h2>重设位移策略</h2>
<p>不论是哪种设置方式,重设位移大致可以从两个维度来进行。</p>
<ol>
<li>位移维度。这是指根据位移值来重设。也就是说,直接把消费者的位移值重设成我们给定的位移值。</li>
<li>时间维度。我们可以给定一个时间,让消费者把位移调整成大于该时间的最小位移;也可以给出一段时间间隔,比如 30 分钟前,然后让消费者直接将位移调回 30 分钟之前的位移值。</li>
</ol>
<p>下面的这张表格罗列了 7 种重设策略。接下来,我来详细解释下这些策略。</p>
<p><img src="assets/eb469122e5af2c9f6baebb173b56bed5.jpeg" alt="img" /></p>
<p>Earliest 策略表示将位移调整到主题当前最早位移处。这个最早位移不一定就是 0因为在生产环境中很久远的消息会被 Kafka 自动删除,所以当前最早位移很可能是一个大于 0 的值。<strong>如果你想要重新消费主题的所有消息,那么可以使用 Earliest 策略</strong></p>
<p>Latest 策略表示把位移重设成最新末端位移。如果你总共向某个主题发送了 15 条消息,那么最新末端位移就是 15。<strong>如果你想跳过所有历史消息,打算从最新的消息处开始消费的话,可以使用 Latest 策略。</strong></p>
<p>Current 策略表示将位移调整成消费者当前提交的最新位移。有时候你可能会碰到这样的场景你修改了消费者程序代码并重启了消费者结果发现代码有问题你需要回滚之前的代码变更同时也要把位移重设到消费者重启时的位置那么Current 策略就可以帮你实现这个功能。</p>
<p>表中第 4 行的 Specified-Offset 策略则是比较通用的策略,表示消费者把位移值调整到你指定的位移处。<strong>这个策略的典型使用场景是,消费者程序在处理某条错误消息时,你可以手动地“跳过”此消息的处理</strong>。在实际使用过程中,可能会出现 corrupted 消息无法被消费的情形,此时消费者程序会抛出异常,无法继续工作。一旦碰到这个问题,你就可以尝试使用 Specified-Offset 策略来规避。</p>
<p>如果说 Specified-Offset 策略要求你指定位移的<strong>绝对数值</strong>的话,那么 Shift-By-N 策略指定的就是位移的<strong>相对数值</strong>,即你给出要跳过的一段消息的距离即可。这里的“跳”是双向的,你既可以向前“跳”,也可以向后“跳”。比如,你想把位移重设成当前位移的前 100 条位移处,此时你需要指定 N 为 -100。</p>
<p>刚刚讲到的这几种策略都是位移维度的,下面我们来聊聊从时间维度重设位移的 DateTime 和 Duration 策略。</p>
<p>DateTime 允许你指定一个时间,然后将位移重置到该时间之后的最早位移处。常见的使用场景是,你想重新消费昨天的数据,那么你可以使用该策略重设位移到昨天 0 点。</p>
<p>Duration 策略则是指给定相对的时间间隔,然后将位移调整到距离当前给定时间间隔的位移处,具体格式是 PnDTnHnMnS。如果你熟悉 Java 8 引入的 Duration 类的话,你应该不会对这个格式感到陌生。它就是一个符合 ISO-8601 规范的 Duration 格式,以字母 P 开头,后面由 4 部分组成,即 D、H、M 和 S分别表示天、小时、分钟和秒。举个例子如果你想将位移调回到 15 分钟前,那么你就可以指定 PT0H15M0S。</p>
<p>我会在后面分别给出这 7 种重设策略的实现方式。不过在此之前,我先来说一下重设位移的方法。目前,重设消费者组位移的方式有两种。</p>
<ul>
<li>通过消费者 API 来实现。</li>
<li>通过 kafka-consumer-groups 命令行脚本来实现。</li>
</ul>
<h2>消费者 API 方式设置</h2>
<p>首先,我们来看看如何通过 API 的方式来重设位移。我主要以 Java API 为例进行演示。如果你使用的是其他语言,方法应该是类似的,不过你要参考具体的 API 文档。</p>
<p>通过 Java API 的方式来重设位移,你需要调用 KafkaConsumer 的 seek 方法,或者是它的变种方法 seekToBeginning 和 seekToEnd。我们来看下它们的方法签名。</p>
<pre><code>void seek(TopicPartition partition, long offset);
void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);
void seekToBeginning(Collection&lt;TopicPartition&gt; partitions);
void seekToEnd(Collection&lt;TopicPartition&gt; partitions);
</code></pre>
<p>根据方法的定义,我们可以知道,每次调用 seek 方法只能重设一个分区的位移。OffsetAndMetadata 类是一个封装了 Long 型的位移和自定义元数据的复合类只是一般情况下自定义元数据为空因此你基本上可以认为这个类表征的主要是消息的位移值。seek 的变种方法 seekToBeginning 和 seekToEnd 则拥有一次重设多个分区的能力。我们在调用它们时,可以一次性传入多个主题分区。</p>
<p>好了,有了这些方法,我们就可以逐一地实现上面提到的 7 种策略了。我们先来看 Earliest 策略的实现方式,代码如下:</p>
<pre><code>Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, &quot;earliest&quot;);
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
String topic = &quot;test&quot;; // 要重设位移的 Kafka 主题
try (final KafkaConsumer&lt;String, String&gt; consumer =
new KafkaConsumer&lt;&gt;(consumerProperties)) {
consumer.subscribe(Collections.singleton(topic));
consumer.poll(0);
consumer.seekToBeginning(
consumer.partitionsFor(topic).stream().map(partitionInfo -&gt;
new TopicPartition(topic, partitionInfo.partition()))
.collect(Collectors.toList()));
}
</code></pre>
<p>这段代码中有几个比较关键的部分,你需要注意一下。</p>
<ol>
<li>你要创建的消费者程序,要禁止自动提交位移。</li>
<li>组 ID 要设置成你要重设的消费者组的组 ID。</li>
<li>调用 seekToBeginning 方法时,需要一次性构造主题的所有分区对象。</li>
<li>最重要的是,一定要调用带长整型的 poll 方法,而不要调用 consumer.poll(Duration.ofSecond(0))。</li>
</ol>
<p>虽然社区已经不推荐使用 poll(long) 了,但短期内应该不会移除它,所以你可以放心使用。另外,为了避免重复,在后面的实例中,我只给出最关键的代码。</p>
<p>Latest 策略和 Earliest 是类似的,我们只需要使用 seekToEnd 方法即可,如下面的代码所示:</p>
<pre><code>consumer.seekToEnd(
consumer.partitionsFor(topic).stream().map(partitionInfo -&gt;
new TopicPartition(topic, partitionInfo.partition()))
.collect(Collectors.toList()));
</code></pre>
<p>实现 Current 策略的方法很简单,我们需要借助 KafkaConsumer 的 committed 方法来获取当前提交的最新位移,代码如下:</p>
<pre><code>consumer.partitionsFor(topic).stream().map(info -&gt;
new TopicPartition(topic, info.partition()))
.forEach(tp -&gt; {
long committedOffset = consumer.committed(tp).offset();
consumer.seek(tp, committedOffset);
});
</code></pre>
<p>这段代码首先调用 partitionsFor 方法获取给定主题的所有分区,然后依次获取对应分区上的已提交位移,最后通过 seek 方法重设位移到已提交位移处。</p>
<p>如果要实现 Specified-Offset 策略,直接调用 seek 方法即可,如下所示:</p>
<pre><code>long targetOffset = 1234L;
for (PartitionInfo info : consumer.partitionsFor(topic)) {
TopicPartition tp = new TopicPartition(topic, info.partition());
consumer.seek(tp, targetOffset);
}
</code></pre>
<p>这次我没有使用 Java 8 Streams 的写法,如果你不熟悉 Lambda 表达式以及 Java 8 的 Streams这种写法可能更加符合你的习惯。</p>
<p>接下来我们来实现 Shift-By-N 策略,主体代码逻辑如下:</p>
<pre><code>for (PartitionInfo info : consumer.partitionsFor(topic)) {
TopicPartition tp = new TopicPartition(topic, info.partition());
// 假设向前跳 123 条消息
long targetOffset = consumer.committed(tp).offset() + 123L;
consumer.seek(tp, targetOffset);
}
</code></pre>
<p>如果要实现 DateTime 策略,我们需要借助另一个方法:<strong>KafkaConsumer.</strong> <strong>offsetsForTimes 方法</strong>。假设我们要重设位移到 2019 年 6 月 20 日晚上 8 点,那么具体代码如下:</p>
<pre><code>long ts = LocalDateTime.of(
2019, 6, 20, 20, 0).toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
Map&lt;TopicPartition, Long&gt; timeToSearch =
consumer.partitionsFor(topic).stream().map(info -&gt;
new TopicPartition(topic, info.partition()))
.collect(Collectors.toMap(Function.identity(), tp -&gt; ts));
for (Map.Entry&lt;TopicPartition, OffsetAndTimestamp&gt; entry :
consumer.offsetsForTimes(timeToSearch).entrySet()) {
consumer.seek(entry.getKey(), entry.getValue().offset());
}
</code></pre>
<p>这段代码构造了 LocalDateTime 实例,然后利用它去查找对应的位移值,最后调用 seek实现了重设位移。</p>
<p>最后,我来给出实现 Duration 策略的代码。假设我们要将位移调回 30 分钟前,那么代码如下:</p>
<pre><code>Map&lt;TopicPartition, Long&gt; timeToSearch = consumer.partitionsFor(topic).stream()
.map(info -&gt; new TopicPartition(topic, info.partition()))
.collect(Collectors.toMap(Function.identity(), tp -&gt; System.currentTimeMillis() - 30 * 1000 * 60));
for (Map.Entry&lt;TopicPartition, OffsetAndTimestamp&gt; entry :
consumer.offsetsForTimes(timeToSearch).entrySet()) {
consumer.seek(entry.getKey(), entry.getValue().offset());
}
</code></pre>
<p><strong>总之,使用 Java API 的方式来实现重设策略的主要入口方法,就是 seek 方法</strong></p>
<h2>命令行方式设置</h2>
<p>位移重设还有另一个重要的途径:<strong>通过 kafka-consumer-groups 脚本</strong>。需要注意的是,这个功能是在 Kafka 0.11 版本中新引入的。这就是说,如果你使用的 Kafka 是 0.11 版本之前的,那么你只能使用 API 的方式来重设位移。</p>
<p>比起 API 的方式,用命令行重设位移要简单得多。针对我们刚刚讲过的 7 种策略,有 7 个对应的参数。下面我来一一给出实例。</p>
<p>Earliest 策略直接指定**to-earliest**。</p>
<pre><code>bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-earliest execute
</code></pre>
<p>Latest 策略直接指定**to-latest**。</p>
<pre><code>bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-latest --execute
</code></pre>
<p>Current 策略直接指定**to-current**。</p>
<pre><code>bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-current --execute
</code></pre>
<p>Specified-Offset 策略直接指定**to-offset**。</p>
<pre><code>bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-offset &lt;offset&gt; --execute
</code></pre>
<p>Shift-By-N 策略直接指定**shift-by N**。</p>
<pre><code>bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --shift-by &lt;offset_N&gt; --execute
</code></pre>
<p>DateTime 策略直接指定**to-datetime**。</p>
<pre><code>bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --to-datetime 2019-06-20T20:00:00.000 --execute
</code></pre>
<p>最后是实现 Duration 策略,我们直接指定**by-duration**。</p>
<pre><code>bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --by-duration PT0H30M0S --execute
</code></pre>
<h2>小结</h2>
<p>至此,重设消费者组位移的 2 种方式我都讲完了。我们来小结一下。今天,我们主要讨论了在 Kafka 中,为什么要重设位移以及如何重设消费者组位移。重设位移主要是为了实现消息的重演。目前 Kafka 支持 7 种重设策略和 2 种重设方法。在实际使用过程中,我推荐你使用第 2 种方法即用命令行的方式来重设位移。毕竟执行命令要比写程序容易得多。但是需要注意的是0.11 及 0.11 版本之后的 Kafka 才提供了用命令行调整位移的方法。如果你使用的是之前的版本,那么就只能依靠 API 的方式了。</p>
<p><img src="assets/dfc41feeb1e5a3977cb720388891b11e.png" alt="img" /></p>
</div>
</div>
<div>
<div style="float: left">
<a href="/专栏/Kafka核心技术与实战/29 Kafka动态配置了解下.md">上一页</a>
</div>
<div style="float: right">
<a href="/专栏/Kafka核心技术与实战/31 常见工具脚本大汇总.md">下一页</a>
</div>
</div>
</div>
</div>
</div>
</div>
<a class="off-canvas-overlay" onclick="hide_canvas()"></a>
</div>
<script defer src="https://static.cloudflareinsights.com/beacon.min.js/v652eace1692a40cfa3763df669d7439c1639079717194" integrity="sha512-Gi7xpJR8tSkrpF7aordPZQlW2DLtzUlZcumS8dMQjwDHEnw9I7ZLyiOj/6tZStRBGtGgN6ceN6cMH8z7etPGlw==" data-cf-beacon='{"rayId":"7099720f4ccd3d60","version":"2021.12.0","r":1,"token":"1f5d475227ce4f0089a7cff1ab17c0f5","si":100}' crossorigin="anonymous"></script>
</body>
<!-- Global site tag (gtag.js) - Google Analytics -->
<script async src="https://www.googletagmanager.com/gtag/js?id=G-NPSEEVD756"></script>
<script>
window.dataLayer = window.dataLayer || [];
function gtag() {
dataLayer.push(arguments);
}
gtag('js', new Date());
gtag('config', 'G-NPSEEVD756');
var path = window.location.pathname
var cookie = getCookie("lastPath");
console.log(path)
if (path.replace("/", "") === "") {
if (cookie.replace("/", "") !== "") {
console.log(cookie)
document.getElementById("tip").innerHTML = "<a href='" + cookie + "'>跳转到上次进度</a>"
}
} else {
setCookie("lastPath", path)
}
function setCookie(cname, cvalue) {
var d = new Date();
d.setTime(d.getTime() + (180 * 24 * 60 * 60 * 1000));
var expires = "expires=" + d.toGMTString();
document.cookie = cname + "=" + cvalue + "; " + expires + ";path = /";
}
function getCookie(cname) {
var name = cname + "=";
var ca = document.cookie.split(';');
for (var i = 0; i < ca.length; i++) {
var c = ca[i].trim();
if (c.indexOf(name) === 0) return c.substring(name.length, c.length);
}
return "";
}
</script>
</html>