learn.lianglianglee.com/专栏/Kafka核心技术与实战/42 Kafka Streams在金融领域的应用.md.html
2022-05-11 18:57:05 +08:00

1219 lines
33 KiB
HTML
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<!DOCTYPE html>
<!-- saved from url=(0046)https://kaiiiz.github.io/hexo-theme-book-demo/ -->
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1, maximum-scale=1.0, user-scalable=no">
<link rel="icon" href="/static/favicon.png">
<title>42 Kafka Streams在金融领域的应用.md.html</title>
<!-- Spectre.css framework -->
<link rel="stylesheet" href="/static/index.css">
<!-- theme css & js -->
<meta name="generator" content="Hexo 4.2.0">
</head>
<body>
<div class="book-container">
<div class="book-sidebar">
<div class="book-brand">
<a href="/">
<img src="/static/favicon.png">
<span>技术文章摘抄</span>
</a>
</div>
<div class="book-menu uncollapsible">
<ul class="uncollapsible">
<li><a href="/" class="current-tab">首页</a></li>
</ul>
<ul class="uncollapsible">
<li><a href="../">上一级</a></li>
</ul>
<ul class="uncollapsible">
<li>
<a href="/专栏/Kafka核心技术与实战/00 开篇词 为什么要学习Kafka.md.html">00 开篇词 为什么要学习Kafka.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/01 消息引擎系统ABC.md.html">01 消息引擎系统ABC.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/02 一篇文章带你快速搞定Kafka术语.md.html">02 一篇文章带你快速搞定Kafka术语.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/03 Kafka只是消息引擎系统吗.md.html">03 Kafka只是消息引擎系统吗.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/04 我应该选择哪种Kafka.md.html">04 我应该选择哪种Kafka.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/05 聊聊Kafka的版本号.md.html">05 聊聊Kafka的版本号.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/06 Kafka线上集群部署方案怎么做.md.html">06 Kafka线上集群部署方案怎么做.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/07 最最最重要的集群参数配置(上).md.html">07 最最最重要的集群参数配置(上).md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/08 最最最重要的集群参数配置(下).md.html">08 最最最重要的集群参数配置(下).md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/09 生产者消息分区机制原理剖析.md.html">09 生产者消息分区机制原理剖析.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/10 生产者压缩算法面面观.md.html">10 生产者压缩算法面面观.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/11 无消息丢失配置怎么实现?.md.html">11 无消息丢失配置怎么实现?.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/12 客户端都有哪些不常见但是很高级的功能?.md.html">12 客户端都有哪些不常见但是很高级的功能?.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/13 Java生产者是如何管理TCP连接的.md.html">13 Java生产者是如何管理TCP连接的.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/14 幂等生产者和事务生产者是一回事吗?.md.html">14 幂等生产者和事务生产者是一回事吗?.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/15 消费者组到底是什么?.md.html">15 消费者组到底是什么?.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/16 揭开神秘的“位移主题”面纱.md.html">16 揭开神秘的“位移主题”面纱.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/17 消费者组重平衡能避免吗?.md.html">17 消费者组重平衡能避免吗?.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/18 Kafka中位移提交那些事儿.md.html">18 Kafka中位移提交那些事儿.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/19 CommitFailedException异常怎么处理.md.html">19 CommitFailedException异常怎么处理.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/20 多线程开发消费者实例.md.html">20 多线程开发消费者实例.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/21 Java 消费者是如何管理TCP连接的.md.html">21 Java 消费者是如何管理TCP连接的.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/22 消费者组消费进度监控都怎么实现?.md.html">22 消费者组消费进度监控都怎么实现?.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/23 Kafka副本机制详解.md.html">23 Kafka副本机制详解.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/24 请求是怎么被处理的?.md.html">24 请求是怎么被处理的?.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/25 消费者组重平衡全流程解析.md.html">25 消费者组重平衡全流程解析.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/26 你一定不能错过的Kafka控制器.md.html">26 你一定不能错过的Kafka控制器.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/27 关于高水位和Leader Epoch的讨论.md.html">27 关于高水位和Leader Epoch的讨论.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/28 主题管理知多少.md.html">28 主题管理知多少.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/29 Kafka动态配置了解下.md.html">29 Kafka动态配置了解下.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/30 怎么重设消费者组位移?.md.html">30 怎么重设消费者组位移?.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/31 常见工具脚本大汇总.md.html">31 常见工具脚本大汇总.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/32 KafkaAdminClientKafka的运维利器.md.html">32 KafkaAdminClientKafka的运维利器.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/33 Kafka认证机制用哪家.md.html">33 Kafka认证机制用哪家.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/34 云环境下的授权该怎么做?.md.html">34 云环境下的授权该怎么做?.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/35 跨集群备份解决方案MirrorMaker.md.html">35 跨集群备份解决方案MirrorMaker.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/36 你应该怎么监控Kafka.md.html">36 你应该怎么监控Kafka.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/37 主流的Kafka监控框架.md.html">37 主流的Kafka监控框架.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/38 调优Kafka你做到了吗.md.html">38 调优Kafka你做到了吗.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/39 从0搭建基于Kafka的企业级实时日志流处理平台.md.html">39 从0搭建基于Kafka的企业级实时日志流处理平台.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/40 Kafka Streams与其他流处理平台的差异在哪里.md.html">40 Kafka Streams与其他流处理平台的差异在哪里.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/41 Kafka Streams DSL开发实例.md.html">41 Kafka Streams DSL开发实例.md.html</a>
</li>
<li>
<a class="current-tab" href="/专栏/Kafka核心技术与实战/42 Kafka Streams在金融领域的应用.md.html">42 Kafka Streams在金融领域的应用.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/加餐 搭建开发环境、阅读源码方法、经典学习资料大揭秘.md.html">加餐 搭建开发环境、阅读源码方法、经典学习资料大揭秘.md.html</a>
</li>
<li>
<a href="/专栏/Kafka核心技术与实战/结束语 以梦为马,莫负韶华!.md.html">结束语 以梦为马,莫负韶华!.md.html</a>
</li>
</ul>
</div>
</div>
<div class="sidebar-toggle" onclick="sidebar_toggle()" onmouseover="add_inner()" onmouseleave="remove_inner()">
<div class="sidebar-toggle-inner"></div>
</div>
<script>
function add_inner() {
let inner = document.querySelector('.sidebar-toggle-inner')
inner.classList.add('show')
}
function remove_inner() {
let inner = document.querySelector('.sidebar-toggle-inner')
inner.classList.remove('show')
}
function sidebar_toggle() {
let sidebar_toggle = document.querySelector('.sidebar-toggle')
let sidebar = document.querySelector('.book-sidebar')
let content = document.querySelector('.off-canvas-content')
if (sidebar_toggle.classList.contains('extend')) { // show
sidebar_toggle.classList.remove('extend')
sidebar.classList.remove('hide')
content.classList.remove('extend')
} else { // hide
sidebar_toggle.classList.add('extend')
sidebar.classList.add('hide')
content.classList.add('extend')
}
}
function open_sidebar() {
let sidebar = document.querySelector('.book-sidebar')
let overlay = document.querySelector('.off-canvas-overlay')
sidebar.classList.add('show')
overlay.classList.add('show')
}
function hide_canvas() {
let sidebar = document.querySelector('.book-sidebar')
let overlay = document.querySelector('.off-canvas-overlay')
sidebar.classList.remove('show')
overlay.classList.remove('show')
}
</script>
<div class="off-canvas-content">
<div class="columns">
<div class="column col-12 col-lg-12">
<div class="book-navbar">
<!-- For Responsive Layout -->
<header class="navbar">
<section class="navbar-section">
<a onclick="open_sidebar()">
<i class="icon icon-menu"></i>
</a>
</section>
</header>
</div>
<div class="book-content" style="max-width: 960px; margin: 0 auto;
overflow-x: auto;
overflow-y: hidden;">
<div class="book-post">
<p id="tip" align="center"></p>
<div><h1>42 Kafka Streams在金融领域的应用</h1>
<p>你好我是胡夕。今天我要和你分享的主题是Kafka Streams 在金融领域的应用。</p>
<h2>背景</h2>
<p>金融领域囊括的内容有很多,我今天分享的主要是,如何利用大数据技术,特别是 Kafka Streams 实时计算框架,来帮助我们更好地做企业用户洞察。</p>
<p>众所周知金融领域内的获客成本是相当高的一线城市高净值白领的获客成本通常可达上千元。面对如此巨大的成本压力金融企业一方面要降低广告投放的获客成本另一方面要做好精细化运营实现客户生命周期内价值Custom Lifecycle Value, CLV的最大化。</p>
<p><strong>实现价值最大化的一个重要途径就是做好用户洞察,而用户洞察要求你要更深度地了解你的客户</strong>,即所谓的 Know Your CustomerKYC真正做到以客户为中心不断地满足客户需求。</p>
<p>为了实现 KYC传统的做法是花费大量的时间与客户见面做面对面的沟通以了解客户的情况。但是用这种方式得到的数据往往是不真实的毕竟客户内心是有潜在的自我保护意识的短时间内的面对面交流很难真正洞察到客户的真实诉求。</p>
<p>相反地渗透到每个人日常生活方方面面的大数据信息则代表了客户的实际需求。比如客户经常浏览哪些网站、都买过什么东西、最喜欢的视频类型是什么。这些数据看似很随意但都表征了客户最真实的想法。将这些数据汇总在一起我们就能完整地构造出客户的画像这就是所谓的用户画像User Profile技术。</p>
<h2>用户画像</h2>
<p>用户画像听起来很玄妙但实际上你应该是很熟悉的。你的很多基本信息比如性别、年龄、所属行业、工资收入和爱好等都是用户画像的一部分。举个例子我们可以这样描述一个人某某某男性28 岁,未婚,工资水平大致在 15000 到 20000 元之间,是一名大数据开发工程师,居住在北京天通苑小区,平时加班很多,喜欢动漫或游戏。</p>
<p>其实这一连串的描述就是典型的用户画像。通俗点来说构建用户画像的核心工作就是给客户或用户打标签Tagging。刚刚那一连串的描述就是用户系统中的典型标签。用户画像系统通过打标签的形式把客户提供给业务人员从而实现精准营销。</p>
<h2>ID 映射ID Mapping</h2>
<p>用户画像的好处不言而喻,而且标签打得越多越丰富,就越能精确地表征一个人的方方面面。不过,在打一个个具体的标签之前,弄清楚“你是谁”是所有用户画像系统首要考虑的问题,这个问题也被称为 ID 识别问题。</p>
<p>所谓的 ID 即 Identification表示用户身份。在网络上能够标识用户身份信息的常见 ID 有 5 种。</p>
<ul>
<li>身份证号:这是最能表征身份的 ID 信息,每个身份证号只会对应一个人。</li>
<li>手机号:手机号通常能较好地表征身份。虽然会出现同一个人有多个手机号或一个手机号在不同时期被多个人使用的情形,但大部分互联网应用使用手机号表征用户身份的做法是很流行的。</li>
<li>设备 ID在移动互联网时代这主要是指手机的设备 ID 或 Mac、iPad 等移动终端设备的设备 ID。特别是手机的设备 ID在很多场景下具备定位和识别用户的功能。常见的设备 ID 有 iOS 端的 IDFA 和 Android 端的 IMEI。</li>
<li>应用注册账号:这属于比较弱的一类 ID。每个人在不同的应用上可能会注册不同的账号但依然有很多人使用通用的注册账号名称因此具有一定的关联性和识别性。</li>
<li>Cookie在 PC 时代,浏览器端的 Cookie 信息是很重要的数据它是网络上表征用户信息的重要手段之一。只不过随着移动互联网时代的来临Cookie 早已江河日下,如今作为 ID 数据的价值也越来越小了。我个人甚至认为在构建基于移动互联网的新一代用户画像时Cookie 可能要被抛弃了。</li>
</ul>
<p>在构建用户画像系统时,我们会从多个数据源上源源不断地收集各种个人用户数据。通常情况下,这些数据不会全部携带以上这些 ID 信息。比如在读取浏览器的浏览历史时,你获取的是 Cookie 数据,而读取用户在某个 App 上的访问行为数据时,你拿到的是用户的设备 ID 和注册账号信息。</p>
<p>倘若这些数据表征的都是一个用户的信息,我们的用户画像系统如何识别出来呢?换句话说,你需要一种手段或技术帮你做各个 ID 的打通或映射。这就是用户画像领域的 ID 映射问题。</p>
<h2>实时 ID Mapping</h2>
<p>我举个简单的例子。假设有一个金融理财用户张三他首先在苹果手机上访问了某理财产品然后在安卓手机上注册了该理财产品的账号最后在电脑上登录该账号并购买了该理财产品。ID Mapping 就是要将这些不同端或设备上的用户信息聚合起来,然后找出并打通用户所关联的所有 ID 信息。</p>
<p>实时 ID Mapping 的要求就更高了,它要求我们能够实时地分析从各个设备收集来的数据,并在很短的时间内完成 ID Mapping。打通用户 ID 身份的时间越短,我们就能越快地为其打上更多的标签,从而让用户画像发挥更大的价值。</p>
<p>从实时计算或流处理的角度来看,实时 ID Mapping 能够转换成一个<strong>流 - 表连接问题</strong>Stream-Table Join即我们实时地将一个流和一个表进行连接。</p>
<p>消息流中的每个事件或每条消息包含的是一个未知用户的某种信息,它可以是用户在页面的访问记录数据,也可以是用户的购买行为数据。这些消息中可能会包含我们刚才提到的若干种 ID 信息,比如页面访问信息中可能包含设备 ID也可能包含注册账号而购买行为信息中可能包含身份证信息和手机号等。</p>
<p>连接的另一方表保存的是<strong>用户所有的 ID 信息</strong>,随着连接的不断深入,表中保存的 ID 品类会越来越丰富,也就是说,流中的数据会被不断地补充进表中,最终实现对用户所有 ID 的打通。</p>
<h2>Kafka Streams 实现</h2>
<p>好了,现在我们就来看看如何使用 Kafka Streams 来实现一个特定场景下的实时 ID Mapping。为了方便理解我们假设 ID Mapping 只关心身份证号、手机号以及设备 ID。下面是用 Avro 写成的 Schema 格式:</p>
<pre><code>{
&quot;namespace&quot;: &quot;kafkalearn.userprofile.idmapping&quot;,
&quot;type&quot;: &quot;record&quot;,
&quot;name&quot;: &quot;IDMapping&quot;,
&quot;fields&quot;: [
{&quot;name&quot;: &quot;deviceId&quot;, &quot;type&quot;: &quot;string&quot;},
{&quot;name&quot;: &quot;idCard&quot;, &quot;type&quot;: &quot;string&quot;},
{&quot;name&quot;: &quot;phone&quot;, &quot;type&quot;: &quot;string&quot;}
]
}
</code></pre>
<p>顺便说一下,<strong>Avro 是 Java 或大数据生态圈常用的序列化编码机制</strong>,比如直接使用 JSON 或 XML 保存对象。Avro 能极大地节省磁盘占用空间或网络 I/O 传输量,因此普遍应用于大数据量下的数据传输。</p>
<p>在这个场景下,我们需要两个 Kafka 主题,一个用于构造表,另一个用于构建流。这两个主题的消息格式都是上面的 IDMapping 对象。</p>
<p>新用户在填写手机号注册 App 时,会向第一个主题发送一条消息,该用户后续在 App 上的所有访问记录,也都会以消息的形式发送到第二个主题。值得注意的是,发送到第二个主题上的消息有可能携带其他的 ID 信息,比如手机号或设备 ID 等。就像我刚刚所说的,这是一个典型的流 - 表实时连接场景,连接之后,我们就能够将用户的所有数据补齐,实现 ID Mapping 的打通。</p>
<p>基于这个设计思路,我先给出完整的 Kafka Streams 代码,稍后我会对重点部分进行详细解释:</p>
<pre><code>package kafkalearn.userprofile.idmapping;
// omit imports……
public class IDMappingStreams {
public static void main(String[] args) throws Exception {
if (args.length &lt; 1) {
throw new IllegalArgumentException(&quot;Must specify the path for a configuration file.&quot;);
}
IDMappingStreams instance = new IDMappingStreams();
Properties envProps = instance.loadProperties(args[0]);
Properties streamProps = instance.buildStreamsProperties(envProps);
Topology topology = instance.buildTopology(envProps);
instance.createTopics(envProps);
final KafkaStreams streams = new KafkaStreams(topology, streamProps);
final CountDownLatch latch = new CountDownLatch(1);
// Attach shutdown handler to catch Control-C.
Runtime.getRuntime().addShutdownHook(new Thread(&quot;streams-shutdown-hook&quot;) {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
private Properties loadProperties(String propertyFilePath) throws IOException {
Properties envProps = new Properties();
try (FileInputStream input = new FileInputStream(propertyFilePath)) {
envProps.load(input);
return envProps;
}
}
private Properties buildStreamsProperties(Properties envProps) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, envProps.getProperty(&quot;application.id&quot;));
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, envProps.getProperty(&quot;bootstrap.servers&quot;));
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
return props;
}
private void createTopics(Properties envProps) {
Map&lt;String, Object&gt; config = new HashMap&lt;&gt;();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, envProps.getProperty(&quot;bootstrap.servers&quot;));
try (AdminClient client = AdminClient.create(config)) {
List&lt;NewTopic&gt; topics = new ArrayList&lt;&gt;();
topics.add(new NewTopic(
envProps.getProperty(&quot;stream.topic.name&quot;),
Integer.parseInt(envProps.getProperty(&quot;stream.topic.partitions&quot;)),
Short.parseShort(envProps.getProperty(&quot;stream.topic.replication.factor&quot;))));
topics.add(new NewTopic(
envProps.getProperty(&quot;table.topic.name&quot;),
Integer.parseInt(envProps.getProperty(&quot;table.topic.partitions&quot;)),
Short.parseShort(envProps.getProperty(&quot;table.topic.replication.factor&quot;))));
client.createTopics(topics);
}
}
private Topology buildTopology(Properties envProps) {
final StreamsBuilder builder = new StreamsBuilder();
final String streamTopic = envProps.getProperty(&quot;stream.topic.name&quot;);
final String rekeyedTopic = envProps.getProperty(&quot;rekeyed.topic.name&quot;);
final String tableTopic = envProps.getProperty(&quot;table.topic.name&quot;);
final String outputTopic = envProps.getProperty(&quot;output.topic.name&quot;);
final Gson gson = new Gson();
// 1. 构造表
KStream&lt;String, IDMapping&gt; rekeyed = builder.&lt;String, String&gt;stream(tableTopic)
.mapValues(json -&gt; gson.fromJson(json, IDMapping.class))
.filter((noKey, idMapping) -&gt; !Objects.isNull(idMapping.getPhone()))
.map((noKey, idMapping) -&gt; new KeyValue&lt;&gt;(idMapping.getPhone(), idMapping));
rekeyed.to(rekeyedTopic);
KTable&lt;String, IDMapping&gt; table = builder.table(rekeyedTopic);
// 2. 流 - 表连接
KStream&lt;String, String&gt; joinedStream = builder.&lt;String, String&gt;stream(streamTopic)
.mapValues(json -&gt; gson.fromJson(json, IDMapping.class))
.map((noKey, idMapping) -&gt; new KeyValue&lt;&gt;(idMapping.getPhone(), idMapping))
.leftJoin(table, (value1, value2) -&gt; IDMapping.newBuilder()
.setPhone(value2.getPhone() == null ? value1.getPhone() : value2.getPhone())
.setDeviceId(value2.getDeviceId() == null ? value1.getDeviceId() : value2.getDeviceId())
.setIdCard(value2.getIdCard() == null ? value1.getIdCard() : value2.getIdCard())
.build())
.mapValues(v -&gt; gson.toJson(v));
joinedStream.to(outputTopic);
return builder.build();
}
}
</code></pre>
<p>这个 Java 类代码中最重要的方法是<strong>buildTopology 函数</strong>,它构造了我们打通 ID Mapping 的所有逻辑。</p>
<p>在该方法中,我们首先构造了 StreamsBuilder 对象实例,这是构造任何 Kafka Streams 应用的第一步。之后我们读取配置文件,获取了要读写的所有 Kafka 主题名。在这个例子中,我们需要用到 4 个主题,它们的作用如下:</p>
<ul>
<li>streamTopic保存用户登录 App 后发生的各种行为数据,格式是 IDMapping 对象的 JSON 串。你可能会问,前面不是都创建 Avro Schema 文件了吗,怎么这里又用回 JSON 了呢?原因是这样的:社区版的 Kafka 没有提供 Avro 的序列化 / 反序列化类支持,如果我要使用 Avro必须改用 Confluent 公司提供的 Kafka但这会偏离我们专栏想要介绍 Apache Kafka 的初衷。所以,我还是使用 JSON 进行说明。这里我只是用了 Avro Code Generator 帮我们提供 IDMapping 对象各个字段的 set/get 方法,你使用 Lombok 也是可以的。</li>
<li>rekeyedTopic这个主题是一个中间主题它将 streamTopic 中的手机号提取出来作为消息的 Key同时维持消息体不变。</li>
<li>tableTopic保存用户注册 App 时填写的手机号。我们要使用这个主题构造连接时要用到的表数据。</li>
<li>outputTopic保存连接后的输出信息即打通了用户所有 ID 数据的 IDMapping 对象,将其转换成 JSON 后输出。</li>
</ul>
<p>buildTopology 的第一步是构造表,即 KTable 对象。我们修改初始的消息流,以用户注册的手机号作为 Key构造了一个中间流之后将这个流写入到 rekeyedTopic最后直接使用 builder.table 方法构造出 KTable。这样每当有新用户注册时该 KTable 都会新增一条数据。</p>
<p>有了表之后,我们继续构造消息流来封装用户登录 App 之后的行为数据,我们同样提取出手机号作为要连接的 Key之后使用 KStream 的<strong>leftJoin 方法</strong>将其与上一步的 KTable 对象进行关联。</p>
<p>在关联的过程中,我们同时提取两边的信息,尽可能地补充到最后生成的 IDMapping 对象中,然后将这个生成的 IDMapping 实例返回到新生成的流中。最后,我们将它写入到 outputTopic 中保存。</p>
<p>至此,我们使用了不到 200 行的 Java 代码,就简单实现了一个真实场景下的实时 ID Mapping 任务。理论上,你可以将这个例子继续扩充,扩展到任意多个 ID Mapping甚至是含有其他标签的数据连接原理是相通的。在我自己的项目中我借助于 Kafka Streams 帮助我实现了用户画像系统的部分功能,而 ID Mapping 就是其中的一个。</p>
<h2>小结</h2>
<p>好了,我们小结一下。今天,我展示了 Kafka Streams 在金融领域的一个应用案例重点演示了如何利用连接函数来实时关联流和表。其实Kafka Streams 提供的功能远不止这些,我推荐你阅读一下<a href="https://kafka.apache.org/23/documentation/streams/developer-guide/">官网</a>的教程,然后把自己的一些轻量级的实时计算线上任务改为使用 Kafka Streams 来实现。</p>
<p><img src="assets/75df06c2b75c3886ca3496a774730de7.jpg" alt="img" /></p>
</div>
</div>
<div>
<div style="float: left">
<a href="/专栏/Kafka核心技术与实战/41 Kafka Streams DSL开发实例.md.html">上一页</a>
</div>
<div style="float: right">
<a href="/专栏/Kafka核心技术与实战/加餐 搭建开发环境、阅读源码方法、经典学习资料大揭秘.md.html">下一页</a>
</div>
</div>
</div>
</div>
</div>
</div>
<a class="off-canvas-overlay" onclick="hide_canvas()"></a>
</div>
<script defer src="https://static.cloudflareinsights.com/beacon.min.js/v652eace1692a40cfa3763df669d7439c1639079717194" integrity="sha512-Gi7xpJR8tSkrpF7aordPZQlW2DLtzUlZcumS8dMQjwDHEnw9I7ZLyiOj/6tZStRBGtGgN6ceN6cMH8z7etPGlw==" data-cf-beacon='{"rayId":"709972272b103d60","version":"2021.12.0","r":1,"token":"1f5d475227ce4f0089a7cff1ab17c0f5","si":100}' crossorigin="anonymous"></script>
</body>
<!-- Global site tag (gtag.js) - Google Analytics -->
<script async src="https://www.googletagmanager.com/gtag/js?id=G-NPSEEVD756"></script>
<script>
window.dataLayer = window.dataLayer || [];
function gtag() {
dataLayer.push(arguments);
}
gtag('js', new Date());
gtag('config', 'G-NPSEEVD756');
var path = window.location.pathname
var cookie = getCookie("lastPath");
console.log(path)
if (path.replace("/", "") === "") {
if (cookie.replace("/", "") !== "") {
console.log(cookie)
document.getElementById("tip").innerHTML = "<a href='" + cookie + "'>跳转到上次进度</a>"
}
} else {
setCookie("lastPath", path)
}
function setCookie(cname, cvalue) {
var d = new Date();
d.setTime(d.getTime() + (180 * 24 * 60 * 60 * 1000));
var expires = "expires=" + d.toGMTString();
document.cookie = cname + "=" + cvalue + "; " + expires + ";path = /";
}
function getCookie(cname) {
var name = cname + "=";
var ca = document.cookie.split(';');
for (var i = 0; i < ca.length; i++) {
var c = ca[i].trim();
if (c.indexOf(name) === 0) return c.substring(name.length, c.length);
}
return "";
}
</script>
</html>