learn.lianglianglee.com/专栏/ShardingSphere 核心原理精讲-完/31 配置中心:如何基于配置中心实现配置信息的动态化管理?.md.html
2022-05-11 18:57:05 +08:00

1717 lines
37 KiB
HTML
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<!DOCTYPE html>
<!-- saved from url=(0046)https://kaiiiz.github.io/hexo-theme-book-demo/ -->
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1, maximum-scale=1.0, user-scalable=no">
<link rel="icon" href="/static/favicon.png">
<title>31 配置中心:如何基于配置中心实现配置信息的动态化管理?.md.html</title>
<!-- Spectre.css framework -->
<link rel="stylesheet" href="/static/index.css">
<!-- theme css & js -->
<meta name="generator" content="Hexo 4.2.0">
</head>
<body>
<div class="book-container">
<div class="book-sidebar">
<div class="book-brand">
<a href="/">
<img src="/static/favicon.png">
<span>技术文章摘抄</span>
</a>
</div>
<div class="book-menu uncollapsible">
<ul class="uncollapsible">
<li><a href="/" class="current-tab">首页</a></li>
</ul>
<ul class="uncollapsible">
<li><a href="../">上一级</a></li>
</ul>
<ul class="uncollapsible">
<li>
<a href="/专栏/ShardingSphere 核心原理精讲-完/00 如何正确学习一款分库分表开源框架?.md.html">00 如何正确学习一款分库分表开源框架?.md.html</a>
</li>
<li>
<a href="/专栏/ShardingSphere 核心原理精讲-完/01 从理论到实践:如何让分库分表真正落地?.md.html">01 从理论到实践:如何让分库分表真正落地?.md.html</a>
</li>
<li>
<a href="/专栏/ShardingSphere 核心原理精讲-完/02 顶级项目ShardingSphere 是一款什么样的 Apache 开源软件?.md.html">02 顶级项目ShardingSphere 是一款什么样的 Apache 开源软件?.md.html</a>
</li>
<li>
<a href="/专栏/ShardingSphere 核心原理精讲-完/03 规范兼容JDBC 规范与 ShardingSphere 是什么关系?.md.html">03 规范兼容JDBC 规范与 ShardingSphere 是什么关系?.md.html</a>
</li>
<li>
<a href="/专栏/ShardingSphere 核心原理精讲-完/04 应用集成:在业务系统中使用 ShardingSphere 的方式有哪些?.md.html">04 应用集成:在业务系统中使用 ShardingSphere 的方式有哪些?.md.html</a>
</li>
<li>
<a href="/专栏/ShardingSphere 核心原理精讲-完/05 配置驱动ShardingSphere 中的配置体系是如何设计的?.md.html">05 配置驱动ShardingSphere 中的配置体系是如何设计的?.md.html</a>
</li>
<li>
<a href="/专栏/ShardingSphere 核心原理精讲-完/06 数据分片:如何实现分库、分表、分库+分表以及强制路由?(上).md.html">06 数据分片:如何实现分库、分表、分库+分表以及强制路由?(上).md.html</a>
</li>
<li>
<a href="/专栏/ShardingSphere 核心原理精讲-完/07 数据分片:如何实现分库、分表、分库+分表以及强制路由?(下).md.html">07 数据分片:如何实现分库、分表、分库+分表以及强制路由?(下).md.html</a>
</li>
<li>
<a href="/专栏/ShardingSphere 核心原理精讲-完/08 读写分离:如何集成分库分表+数据库主从架构?.md.html">08 读写分离:如何集成分库分表+数据库主从架构?.md.html</a>
</li>
<li>
<a href="/专栏/ShardingSphere 核心原理精讲-完/09 分布式事务:如何使用强一致性事务与柔性事务?.md.html">09 分布式事务:如何使用强一致性事务与柔性事务?.md.html</a>
</li>
<li>
<a href="/专栏/ShardingSphere 核心原理精讲-完/10 数据脱敏:如何确保敏感数据的安全访问?.md.html">10 数据脱敏:如何确保敏感数据的安全访问?.md.html</a>
</li>
<li>
<a href="/专栏/ShardingSphere 核心原理精讲-完/11 编排治理:如何实现分布式环境下的动态配置管理?.md.html">11 编排治理:如何实现分布式环境下的动态配置管理?.md.html</a>
</li>
<li>
<a href="/专栏/ShardingSphere 核心原理精讲-完/12 从应用到原理:如何高效阅读 ShardingSphere 源码?.md.html">12 从应用到原理:如何高效阅读 ShardingSphere 源码?.md.html</a>
</li>
<li>
<a href="/专栏/ShardingSphere 核心原理精讲-完/13 微内核架构ShardingSphere 如何实现系统的扩展性?.md.html">13 微内核架构ShardingSphere 如何实现系统的扩展性?.md.html</a>
</li>
<li>
<a href="/专栏/ShardingSphere 核心原理精讲-完/14 分布式主键ShardingSphere 中有哪些分布式主键实现方式?.md.html">14 分布式主键ShardingSphere 中有哪些分布式主键实现方式?.md.html</a>
</li>
<li>
<a href="/专栏/ShardingSphere 核心原理精讲-完/15 解析引擎SQL 解析流程应该包括哪些核心阶段?(上).md.html">15 解析引擎SQL 解析流程应该包括哪些核心阶段?(上).md.html</a>
</li>
<li>
<a href="/专栏/ShardingSphere 核心原理精讲-完/16 解析引擎SQL 解析流程应该包括哪些核心阶段?(下).md.html">16 解析引擎SQL 解析流程应该包括哪些核心阶段?(下).md.html</a>
</li>
<li>
<a href="/专栏/ShardingSphere 核心原理精讲-完/17 路由引擎:如何理解分片路由核心类 ShardingRouter 的运作机制?.md.html">17 路由引擎:如何理解分片路由核心类 ShardingRouter 的运作机制?.md.html</a>
</li>
<li>
<a href="/专栏/ShardingSphere 核心原理精讲-完/18 路由引擎:如何实现数据访问的分片路由和广播路由?.md.html">18 路由引擎:如何实现数据访问的分片路由和广播路由?.md.html</a>
</li>
<li>
<a href="/专栏/ShardingSphere 核心原理精讲-完/19 路由引擎:如何在路由过程中集成多种路由策略和路由算法?.md.html">19 路由引擎:如何在路由过程中集成多种路由策略和路由算法?.md.html</a>
</li>
<li>
<a href="/专栏/ShardingSphere 核心原理精讲-完/20 改写引擎:如何理解装饰器模式下的 SQL 改写实现机制?.md.html">20 改写引擎:如何理解装饰器模式下的 SQL 改写实现机制?.md.html</a>
</li>
<li>
<a href="/专栏/ShardingSphere 核心原理精讲-完/21 执行引擎:分片环境下 SQL 执行的整体流程应该如何进行抽象?.md.html">21 执行引擎:分片环境下 SQL 执行的整体流程应该如何进行抽象?.md.html</a>
</li>
<li>
<a href="/专栏/ShardingSphere 核心原理精讲-完/22 执行引擎:如何把握 ShardingSphere 中的 Executor 执行模型?(上).md.html">22 执行引擎:如何把握 ShardingSphere 中的 Executor 执行模型?(上).md.html</a>
</li>
<li>
<a href="/专栏/ShardingSphere 核心原理精讲-完/23 执行引擎:如何把握 ShardingSphere 中的 Executor 执行模型?(下).md.html">23 执行引擎:如何把握 ShardingSphere 中的 Executor 执行模型?(下).md.html</a>
</li>
<li>
<a href="/专栏/ShardingSphere 核心原理精讲-完/24 归并引擎:如何理解数据归并的类型以及简单归并策略的实现过程?.md.html">24 归并引擎:如何理解数据归并的类型以及简单归并策略的实现过程?.md.html</a>
</li>
<li>
<a href="/专栏/ShardingSphere 核心原理精讲-完/25 归并引擎:如何理解流式归并和内存归并在复杂归并场景下的应用方式?.md.html">25 归并引擎:如何理解流式归并和内存归并在复杂归并场景下的应用方式?.md.html</a>
</li>
<li>
<a href="/专栏/ShardingSphere 核心原理精讲-完/26 读写分离:普通主从架构和分片主从架构分别是如何实现的?.md.html">26 读写分离:普通主从架构和分片主从架构分别是如何实现的?.md.html</a>
</li>
<li>
<a href="/专栏/ShardingSphere 核心原理精讲-完/27 分布式事务:如何理解 ShardingSphere 中对分布式事务的抽象过程?.md.html">27 分布式事务:如何理解 ShardingSphere 中对分布式事务的抽象过程?.md.html</a>
</li>
<li>
<a href="/专栏/ShardingSphere 核心原理精讲-完/28 分布式事务ShardingSphere 中如何集成强一致性事务和柔性事务支持?(上).md.html">28 分布式事务ShardingSphere 中如何集成强一致性事务和柔性事务支持?(上).md.html</a>
</li>
<li>
<a href="/专栏/ShardingSphere 核心原理精讲-完/29 分布式事务ShardingSphere 中如何集成强一致性事务和柔性事务支持?(下).md.html">29 分布式事务ShardingSphere 中如何集成强一致性事务和柔性事务支持?(下).md.html</a>
</li>
<li>
<a href="/专栏/ShardingSphere 核心原理精讲-完/30 数据脱敏:如何基于改写引擎实现低侵入性数据脱敏方案?.md.html">30 数据脱敏:如何基于改写引擎实现低侵入性数据脱敏方案?.md.html</a>
</li>
<li>
<a class="current-tab" href="/专栏/ShardingSphere 核心原理精讲-完/31 配置中心:如何基于配置中心实现配置信息的动态化管理?.md.html">31 配置中心:如何基于配置中心实现配置信息的动态化管理?.md.html</a>
</li>
<li>
<a href="/专栏/ShardingSphere 核心原理精讲-完/32 注册中心:如何基于注册中心实现数据库访问熔断机制?.md.html">32 注册中心:如何基于注册中心实现数据库访问熔断机制?.md.html</a>
</li>
<li>
<a href="/专栏/ShardingSphere 核心原理精讲-完/33 链路跟踪:如何基于 Hook 机制以及 OpenTracing 协议实现数据访问链路跟踪?.md.html">33 链路跟踪:如何基于 Hook 机制以及 OpenTracing 协议实现数据访问链路跟踪?.md.html</a>
</li>
<li>
<a href="/专栏/ShardingSphere 核心原理精讲-完/34 系统集成:如何完成 ShardingSphere 内核与 Spring+SpringBoot 的无缝整合?.md.html">34 系统集成:如何完成 ShardingSphere 内核与 Spring+SpringBoot 的无缝整合?.md.html</a>
</li>
<li>
<a href="/专栏/ShardingSphere 核心原理精讲-完/35 结语ShardingSphere 总结及展望.md.html">35 结语ShardingSphere 总结及展望.md.html</a>
</li>
</ul>
</div>
</div>
<div class="sidebar-toggle" onclick="sidebar_toggle()" onmouseover="add_inner()" onmouseleave="remove_inner()">
<div class="sidebar-toggle-inner"></div>
</div>
<script>
function add_inner() {
let inner = document.querySelector('.sidebar-toggle-inner')
inner.classList.add('show')
}
function remove_inner() {
let inner = document.querySelector('.sidebar-toggle-inner')
inner.classList.remove('show')
}
function sidebar_toggle() {
let sidebar_toggle = document.querySelector('.sidebar-toggle')
let sidebar = document.querySelector('.book-sidebar')
let content = document.querySelector('.off-canvas-content')
if (sidebar_toggle.classList.contains('extend')) { // show
sidebar_toggle.classList.remove('extend')
sidebar.classList.remove('hide')
content.classList.remove('extend')
} else { // hide
sidebar_toggle.classList.add('extend')
sidebar.classList.add('hide')
content.classList.add('extend')
}
}
function open_sidebar() {
let sidebar = document.querySelector('.book-sidebar')
let overlay = document.querySelector('.off-canvas-overlay')
sidebar.classList.add('show')
overlay.classList.add('show')
}
function hide_canvas() {
let sidebar = document.querySelector('.book-sidebar')
let overlay = document.querySelector('.off-canvas-overlay')
sidebar.classList.remove('show')
overlay.classList.remove('show')
}
</script>
<div class="off-canvas-content">
<div class="columns">
<div class="column col-12 col-lg-12">
<div class="book-navbar">
<!-- For Responsive Layout -->
<header class="navbar">
<section class="navbar-section">
<a onclick="open_sidebar()">
<i class="icon icon-menu"></i>
</a>
</section>
</header>
</div>
<div class="book-content" style="max-width: 960px; margin: 0 auto;
overflow-x: auto;
overflow-y: hidden;">
<div class="book-post">
<p id="tip" align="center"></p>
<div><h1>31 配置中心:如何基于配置中心实现配置信息的动态化管理?</h1>
<p>ShardingSphere 在编排治理方面包括配置动态化、注册中心、数据库熔断禁用、调用链路等治理能力。</p>
<p>今天我们先来介绍最简单的配置中心,即如何基于配置中心从而实现配置信息的动态化管理。</p>
<h3>ShardingSphere 中对配置中心的抽象过程</h3>
<p>配置中心的核心接口 ConfigCenter 位于 sharding-orchestration-config-api 工程中,定义如下:</p>
<pre><code>public interface ConfigCenter extends TypeBasedSPI {
//初始化配置中心
void init(ConfigCenterConfiguration config);
//获取配置项数据
String get(String key);
//直接获取配置项数据
String getDirectly(String key);
//是否存在配置项
boolean isExisted(String key);
//获取子配置项列表
List&lt;String&gt; getChildrenKeys(String key);
//持久化配置项
void persist(String key, String value);
//更新配置项
void update(String key, String value);
//持久化临时数据
void persistEphemeral(String key, String value);
//对配置项或路径进行监听
void watch(String key, DataChangedEventListener dataChangedEventListener);
//关闭配置中心
void close();
}
</code></pre>
<p>上述方法中,唯一值得展开的就是 watch 方法,该方法传入了一个代表事件监听器的 DataChangedEventListener 接口,如下所示:</p>
<pre><code>public interface DataChangedEventListener {
//当数据变动时进行触发
void onChange(DataChangedEvent dataChangedEvent);
}
</code></pre>
<p>这里用到的 DataChangedEvent 类定义如下,可以看到事件的类型有三种,分别是 UPDATED、DELETED 和 IGNORED</p>
<pre><code>public final class DataChangedEvent {
private final String key;
private final String value;
private final ChangedType changedType;
public enum ChangedType {
UPDATED, DELETED, IGNORED
}
}
</code></pre>
<p>我们同样注意到 ConfigCenter 接口继承了 TypeBasedSPI 接口,所以集成了 SPI 机制。在 ShardingSphere 中ConfigCenter 接口有两个实现类,分别基于 Apollo 的 ApolloConfigCenter 和基于 Zookeeper 的 CuratorZookeeperConfigCenter。</p>
<p>我们分别展开讲解一下。</p>
<h3>ApolloConfigCenter</h3>
<h4>1.ApolloConfigCenter 的实现过程</h4>
<p>我们先来看基于 Apollo 的 ApolloConfigCenter它的 init 方法如下所示:</p>
<pre><code>@Override
public void init(final ConfigCenterConfiguration config) {
//从配置对象中获取配置信息并设置系统属性
System.getProperties().setProperty(&quot;app.id&quot;, properties.getProperty(&quot;appId&quot;, &quot;APOLLO_SHARDING_SPHERE&quot;));
System.getProperties().setProperty(&quot;env&quot;, properties.getProperty(&quot;env&quot;, &quot;DEV&quot;));
System.getProperties().setProperty(ConfigConsts.APOLLO_CLUSTER_KEY, properties.getProperty(&quot;clusterName&quot;, ConfigConsts.CLUSTER_NAME_DEFAULT));
System.getProperties().setProperty(ConfigConsts.APOLLO_META_KEY, config.getServerLists());
//通过配置对象构建 ApolloConfig
apolloConfig = ConfigService.getConfig(config.getNamespace());
}
</code></pre>
<p>上述 init 方法的作用是在设置系统属性的同时,构建一个 Config 对象。在 Apollo 中,基于这个 Config 对象就可以实现对配置项的操作,例如:</p>
<pre><code>@Override
public String get(final String key) {
return apolloConfig.getProperty(key.replace(&quot;/&quot;, &quot;.&quot;), &quot;&quot;);
}
@Override
public String getDirectly(final String key) {
return get(key);
}
@Override
public boolean isExisted(final String key) {
return !Strings.isNullOrEmpty(get(key));
}
</code></pre>
<p>注意这里的 getDirectly 方法和 get 方法的处理方式实际上是一致的。而对于 Apollo 而言getChildrenKeys、persist、update 和 persistEphemeral 等方法都是无效的因为不支持这样的操作。但是对于常见的监听机制Apollo 也提供了它的实现方案,可以通过对 Config 对象添加 ChangeListener 来实现监听效果,如下所示:</p>
<pre><code>@Override
public void watch(final String key, final DataChangedEventListener dataChangedEventListener) {
//添加 Apollo 中的监听器
apolloConfig.addChangeListener(new ConfigChangeListener() {
@Override
public void onChange(final ConfigChangeEvent changeEvent) {
for (String key : changeEvent.changedKeys()) {
//获取 Apollo 监听事件
ConfigChange change = changeEvent.getChange(key);
DataChangedEvent.ChangedType changedType = getChangedType(change.getChangeType());
if (DataChangedEvent.ChangedType.IGNORED != changedType) {
//将 Apollo 中的监听事件转化为 ShardingSphere 中的监听事件
//通过 EventListener 触发事件
dataChangedEventListener.onChange(new DataChangedEvent(key, change.getNewValue(), changedType));
}
}
}
}, Sets.newHashSet(key));
}
</code></pre>
<p>上述代码的逻辑在于当事件被 Apollo 监听,并触发上述 watch 方法时,我们会将 Apollo 中的事件类型转化为 ShardingSphere 中自身的事件类型,并通过 DataChangedEventListener 进行传播和处理。</p>
<h4>2.ShardingSphere 中的事件驱动架构</h4>
<p>讲到 DataChangedEventListener我们不得不对 ShardingSphere 中的事件驱动框架做一些展开。</p>
<p>显然从命名上看DataChangedEventListener 是一种事件监听器,用于监听各种 DataChangedEvent。</p>
<p>注意到 ShardingSphere 并没有提供 DataChangedEventListener 接口的任何实现类,而是大量采用了匿名方法进行事件的监听,一种典型的实现方式如下所示:</p>
<pre><code>new DataChangedEventListener() {
@Override
public void onChange(final DataChangedEvent dataChangedEvent) {
//通过 EventBus 发布事件
eventBus.post(createXXXEvent(dataChangedEvent));
}
}
});
</code></pre>
<p>在通过 DataChangedEventListener 监听到某一个 DataChangedEvent 并进行传播时ShardingSphere 的处理过程就是通过 EventBus 类的 post 方法将事件进行进一步转发。这里使用的 EventBus 同样来自 Google 的 Guava 框架,代表了一种事件总线的实现方式。</p>
<p>现在,事件已经可以通过 EventBus 进行发送了,那么这些被发送的事件是怎么被消费的呢?在 ShardingSphere 中,存在一个 ShardingOrchestrationEventBus 包装类,包装了对 EventBus 的使用过程。</p>
<p>这个包装过程非常简单,只是使用单例模式构建了一个 EventBus 对象而已,如下所示:</p>
<pre><code>public final class ShardingOrchestrationEventBus {
private static final EventBus INSTANCE = new EventBus();
//使用单例模式构建单例对象
public static EventBus getInstance() {
return INSTANCE;
}
}
</code></pre>
<p>如果我们想要订阅通过 EventBus 发送的事件,只要把自身注册到 EventBus 上即可,可以直接通过 EventBus 提供的 register 方法实现这一目标,如下所示:</p>
<pre><code>ShardingOrchestrationEventBus.getInstance().register(this);
</code></pre>
<p>另一方面,在 Guava 的 EventBus 机制中,提供了 @Subscribe 注解用来标识对具体某一种事件的处理方法。一旦在某个方法上添加了 @Subscribe 注解,这个方法就可以自动用来处理所传入的事件。</p>
<p>所以,我们进一步总结事件订阅者的代码结构,可以得到如下所示的伪代码:</p>
<pre><code>public class Subscriber {
public Subscriber(…) {
//将自己注册到 EventBus 中
ShardingOrchestrationEventBus.getInstance().register(this);
}
@Subscribe
public void renew(DataSourceChangedEvent dataSourceChangedEvent){
//消费事件
}
}
</code></pre>
<p>可以想象ShardingSphere 中势必存在一批符合上述代码结构的实现类,用于监听配置中心所产生的配置信息变更事件。以如下所示的 LogicSchema 类为例,我们可以看到它的实现过程就是很典型的一种事件订阅者:</p>
<pre><code>@Getter
public abstract class LogicSchema {
public LogicSchema(final String name, final Map&lt;String, YamlDataSourceParameter&gt; dataSources) {
ShardingOrchestrationEventBus.getInstance().register(this);
}
@Subscribe
public final synchronized void renew(final DataSourceChangedEvent dataSourceChangedEvent) throws Exception {
if (!name.equals(dataSourceChangedEvent.getShardingSchemaName())) {
return;
}
//根据 DataSourceChangedEvent 更新 DataSource 的配置
backendDataSource.renew(DataSourceConverter.getDataSourceParameterMap(dataSourceChangedEvent.getDataSourceConfigurations()));
}
}
</code></pre>
<p>上述 LogicSchema 类会根据 DataSourceChangedEvent 中携带的配置信息来更新DataSource的配置从而实现配置信息的动态化管理。</p>
<p>在介绍完 ApolloConfigCenter 之后,我们再来看一下 ShardingSphere 中另一种配置中心的实现方式,即 CuratorZookeeperConfigCenter。</p>
<h3>CuratorZookeeperConfigCenter</h3>
<h4>1.Zookeeper 和 Curator 简介</h4>
<p>CuratorZookeeperConfigCenter 使用 Zookeeper 作为配置中心的服务组件。针对如何使用 Zookeeper业界也存在一些开源的客户端而在ShardingSphere 采用的是 Curator。</p>
<p>在介绍 CuratorZookeeperConfigCenter 之前,我们先来对 Zookeeper 和 Curator 做简要介绍。</p>
<ul>
<li><strong>Zookeeper</strong></li>
</ul>
<p>对于 Zookeeper 而言我们知道它有两个特性与分布式协调直接相关一个是会话机制一个是Watcher机制。</p>
<p>会话是客户端和服务器端的 TCP 连接,能够发送请求并接收监听器 Watcher 事件而Watcher 机制本质上就是分布式的回调。就类型而言,会话又可以分为<strong>短暂性会话</strong><strong>持久性会话</strong>两种,前者在会话断开的同时会自动删除会话对应的 ZNode而后者则不会。ZNode 的客户端关注 ZNode 发生的变化,一旦发生变化则回传消息到客户端,然后客户端的消息处理函数得到调用。在 Zookeeper 中,任何读操作都能够设置 Watcher。</p>
<ul>
<li><strong>Curator</strong></li>
</ul>
<p>在我们使用 Zookeeper 时,一般不使用它原生的 API而是倾向于采用客户端集成框架这其中最具代表性的就是 Curator。Curator 解决了很多 Zookeeper 客户端非常底层的细节开发工作,包括连接重试、反复注册 Watcher 和 NodeExistsException 异常等。</p>
<p>Curator 包含了几个包:其中 curator-framework 包提供了对 Zookeeper 底层 API 的一层封装curator-client 包则提供一些客户端的操作,例如重试策略等;而 curator-recipes 包封装了一些高级特性,如选举、分布式锁、分布式计数器等。</p>
<p>在使用 Curator 时,首先需要创建一个 CuratorFramework 客户端对象,这一过程可以使用 CuratorFrameworkFactory 工厂类进行完成。对于 CuratorFrameworkFactory 而言,我们一方面需要指定与 Zookeeper 的链接 URL connectString、会话超时时间 sessionTimeoutMs、连接创建超时时间 connectionTimeoutMs以及重试策略 retryPolicy另一方面也可以根据需要设置安全认证信息。</p>
<p>一旦我们获取了 CuratorFramework 对象,就可以调用它的 start 方法启动客户端,然后通过 create/delete 来创建和删除节点,通过 getData/setData 方法获取,以及设置对应节点中的数据。当然,最为重要的是我们可以在节点上添加监听器。</p>
<p>接下来就让我们一起看一下 ShardingSphere 中如何使用 Curator 完成与 Zookeeper 的集成方法。</p>
<h4>2.CuratorZookeeperConfigCenter 的实现过程</h4>
<p>在 ShardingSphere 中,使用 CuratorFrameworkFactory 创建 CuratorFramework 客户端对象的过程如下所示:</p>
<pre><code>private CuratorFramework buildCuratorClient(final ConfigCenterConfiguration config) {
//构建 CuratorFrameworkFactory 并设置连接属性
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(config.getServerLists())
.retryPolicy(new ExponentialBackoffRetry(config.getRetryIntervalMilliseconds(), config.getMaxRetries(), config.getRetryIntervalMilliseconds() * config.getMaxRetries()))
.namespace(config.getNamespace());
if (0 != config.getTimeToLiveSeconds()) {
builder.sessionTimeoutMs(config.getTimeToLiveSeconds() * 1000);
}
if (0 != config.getOperationTimeoutMilliseconds()) {
builder.connectionTimeoutMs(config.getOperationTimeoutMilliseconds());
}
//设置安全摘要信息
if (!Strings.isNullOrEmpty(config.getDigest())) {
builder.authorization(&quot;digest&quot;, config.getDigest().getBytes(Charsets.UTF_8))
.aclProvider(new ACLProvider() {
@Override
public List&lt;ACL&gt; getDefaultAcl() {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}
@Override
public List&lt;ACL&gt; getAclForPath(final String path) {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}
});
}
return builder.build();
}
</code></pre>
<p>上述代码相对比较固化,我们可以直接在自己的应用程序中进行借鉴和参考。</p>
<p>然后我们来看它的 persist 方法,如下所示:</p>
<pre><code>@Override
public void persist(final String key, final String value) {
try {
if (!isExisted(key)) {
//创建持久化节点
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(Charsets.UTF_8));
} else {
update(key, value);
}
} catch (final Exception ex) {
CuratorZookeeperExceptionHandler.handleException(ex);
}
}
</code></pre>
<p>这里使用了 CreateMode.PERSISTENT 模式来创建接口,也就是说创建的是一种持久化节点。而另一个 persistEphemeral 方法中,则通过设置 CreateMode.EPHEMERAL 来创建临时节点。</p>
<p>如下所示的 update 方法也值得一看,我们看到了如何基于 Curator 实现在事务中更新数据的具体实现方法:</p>
<pre><code>@Override
public void update(final String key, final String value) {
try {
//在事务中更新数据
client.inTransaction().check().forPath(key).and().setData().forPath(key, value.getBytes(Charsets.UTF_8)).and().commit();
} catch (final Exception ex) {
CuratorZookeeperExceptionHandler.handleException(ex);
}
}
</code></pre>
<p>然后,我们来到获取数据的 get 方法,如下所示:</p>
<pre><code>@Override
public String get(final String key) {
//先通过缓存获取数据,如果没有则通过 getDirectly 直接获取数据
TreeCache cache = findTreeCache(key);
if (null == cache) {
return getDirectly(key);
}
ChildData resultInCache = cache.getCurrentData(key);
if (null != resultInCache) {
return null == resultInCache.getData() ? null : new String(resultInCache.getData(), Charsets.UTF_8);
}
return getDirectly(key);
}
</code></pre>
<p>注意到在这个 get 方法中ShardingSphere 使用了缓存机制来提升数据获取的效率。如果缓存没有命中,才会调用 getDirectly 方法来直接从 Zookeeper 中获取数据。</p>
<p>最后,让我们来到最为关键的 watch 方法,该方法如下所示:</p>
<pre><code>@Override
public void watch(final String key, final DataChangedEventListener dataChangedEventListener) {
final String path = key + &quot;/&quot;;
if (!caches.containsKey(path)) {
addCacheData(key);
}
TreeCache cache = caches.get(path);
//添加 Zookeeper 监听器
cache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws UnsupportedEncodingException {
//获取 Zookeeper 监听事件
ChildData data = event.getData();
if (null == data || null == data.getPath()) {
return;
}
//将 Zookeeper 中的监听事件转化为 ShardingSphere 中的监听事件
//通过 EventListener 触发事件
DataChangedEvent.ChangedType changedType = getChangedType(event);
if (DataChangedEvent.ChangedType.IGNORED != changedType) {
dataChangedEventListener.onChange(new DataChangedEvent(data.getPath(), null == data.getData() ? null : new String(data.getData(), &quot;UTF-8&quot;), changedType));
}
}
});
}
</code></pre>
<p>可以看到watch 方法的最终处理结果也是将 Zookeeper 中的监听事件转化为 ShardingSphere 中的监听事件,并通过 EventListener 触发事件。这个过程我们已经在介绍 ApolloConfigCenter 时做了展开。</p>
<h3>从源码解析到日常开发</h3>
<p>今天我们介绍的很多内容实际上也可以应用到日常开发过程中,包括如何基于 Apollo 以及 Zookeeper 这两款典型的配置中心实现工具,来进行配置信息的存储和监听。我们完全可以根据自身的需求,将应用场景和范围从配置中心扩大到各种需要进行动态化管理的业务数据,而基于这两款工具实现这一目标的实现细节,我们都可以直接进行参考和借鉴。</p>
<h3>小结与预告</h3>
<p>本课时关注于 ShardingSphere 中对配置中心的抽象和实现过程。配置中心的核心机制是需要实现配置信息的动态化加载,而 Apollo 和 Zookeeper 都提供了监听机制来实现这一目标。ShardingSphere 通过集成这两款主流的开源工具,以及 Guava 框架中的 EventBus 工具类实现了从事件监听到订阅消费的整个事件驱动架构。</p>
<p>这里给你留一道思考题ShardingSphere 是如何将 Apollo 以及 Zookeeper 中的事件生成和监听机制抽象成一套统一的事件驱动架构的?欢迎你在留言区与大家讨论,我将逐一点评解答。</p>
<p>配置中心和注册中心在实现上存在一定的相似性,但又面向不同的应用场景。下一课时,我们将介绍 ShardingSphere 中的注册中心的实现机制和应用场景。</p>
</div>
</div>
<div>
<div style="float: left">
<a href="/专栏/ShardingSphere 核心原理精讲-完/30 数据脱敏:如何基于改写引擎实现低侵入性数据脱敏方案?.md.html">上一页</a>
</div>
<div style="float: right">
<a href="/专栏/ShardingSphere 核心原理精讲-完/32 注册中心:如何基于注册中心实现数据库访问熔断机制?.md.html">下一页</a>
</div>
</div>
</div>
</div>
</div>
</div>
<a class="off-canvas-overlay" onclick="hide_canvas()"></a>
</div>
<script defer src="https://static.cloudflareinsights.com/beacon.min.js/v652eace1692a40cfa3763df669d7439c1639079717194" integrity="sha512-Gi7xpJR8tSkrpF7aordPZQlW2DLtzUlZcumS8dMQjwDHEnw9I7ZLyiOj/6tZStRBGtGgN6ceN6cMH8z7etPGlw==" data-cf-beacon='{"rayId":"709974e5faf83d60","version":"2021.12.0","r":1,"token":"1f5d475227ce4f0089a7cff1ab17c0f5","si":100}' crossorigin="anonymous"></script>
</body>
<!-- Global site tag (gtag.js) - Google Analytics -->
<script async src="https://www.googletagmanager.com/gtag/js?id=G-NPSEEVD756"></script>
<script>
window.dataLayer = window.dataLayer || [];
function gtag() {
dataLayer.push(arguments);
}
gtag('js', new Date());
gtag('config', 'G-NPSEEVD756');
var path = window.location.pathname
var cookie = getCookie("lastPath");
console.log(path)
if (path.replace("/", "") === "") {
if (cookie.replace("/", "") !== "") {
console.log(cookie)
document.getElementById("tip").innerHTML = "<a href='" + cookie + "'>跳转到上次进度</a>"
}
} else {
setCookie("lastPath", path)
}
function setCookie(cname, cvalue) {
var d = new Date();
d.setTime(d.getTime() + (180 * 24 * 60 * 60 * 1000));
var expires = "expires=" + d.toGMTString();
document.cookie = cname + "=" + cvalue + "; " + expires + ";path = /";
}
function getCookie(cname) {
var name = cname + "=";
var ca = document.cookie.split(';');
for (var i = 0; i < ca.length; i++) {
var c = ca[i].trim();
if (c.indexOf(name) === 0) return c.substring(name.length, c.length);
}
return "";
}
</script>
</html>