learn.lianglianglee.com/文章/MySQL 亿级别数据迁移实战代码分享.md.html
2022-05-11 19:04:14 +08:00

1051 lines
60 KiB
HTML
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<!DOCTYPE html>
<!-- saved from url=(0046)https://kaiiiz.github.io/hexo-theme-book-demo/ -->
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1, maximum-scale=1.0, user-scalable=no">
<link rel="icon" href="/static/favicon.png">
<title>MySQL 亿级别数据迁移实战代码分享.md.html</title>
<!-- Spectre.css framework -->
<link rel="stylesheet" href="/static/index.css">
<!-- theme css & js -->
<meta name="generator" content="Hexo 4.2.0">
</head>
<body>
<div class="book-container">
<div class="book-sidebar">
<div class="book-brand">
<a href="/">
<img src="/static/favicon.png">
<span>技术文章摘抄</span>
</a>
</div>
<div class="book-menu uncollapsible">
<ul class="uncollapsible">
<li><a href="/" class="current-tab">首页</a></li>
</ul>
<ul class="uncollapsible">
<li><a href="../">上一级</a></li>
</ul>
<ul class="uncollapsible">
<li>
<a href="/文章/AQS 万字图文全面解析.md.html">AQS 万字图文全面解析.md.html</a>
</li>
<li>
<a href="/文章/Docker 镜像构建原理及源码分析.md.html">Docker 镜像构建原理及源码分析.md.html</a>
</li>
<li>
<a href="/文章/ElasticSearch 小白从入门到精通.md.html">ElasticSearch 小白从入门到精通.md.html</a>
</li>
<li>
<a href="/文章/JVM CPU Profiler技术原理及源码深度解析.md.html">JVM CPU Profiler技术原理及源码深度解析.md.html</a>
</li>
<li>
<a href="/文章/JVM 垃圾收集器.md.html">JVM 垃圾收集器.md.html</a>
</li>
<li>
<a href="/文章/JVM 面试的 30 个知识点.md.html">JVM 面试的 30 个知识点.md.html</a>
</li>
<li>
<a href="/文章/Java IO 体系、线程模型大总结.md.html">Java IO 体系、线程模型大总结.md.html</a>
</li>
<li>
<a href="/文章/Java NIO浅析.md.html">Java NIO浅析.md.html</a>
</li>
<li>
<a href="/文章/Java 面试题集锦(网络篇).md.html">Java 面试题集锦(网络篇).md.html</a>
</li>
<li>
<a href="/文章/Java-直接内存 DirectMemory 详解.md.html">Java-直接内存 DirectMemory 详解.md.html</a>
</li>
<li>
<a href="/文章/Java中9种常见的CMS GC问题分析与解决.md.html">Java中9种常见的CMS GC问题分析与解决.md.html</a>
</li>
<li>
<a href="/文章/Java中9种常见的CMS GC问题分析与解决.md.html">Java中9种常见的CMS GC问题分析与解决.md.html</a>
</li>
<li>
<a href="/文章/Java中的SPI.md.html">Java中的SPI.md.html</a>
</li>
<li>
<a href="/文章/Java中的ThreadLocal.md.html">Java中的ThreadLocal.md.html</a>
</li>
<li>
<a href="/文章/Java线程池实现原理及其在美团业务中的实践.md.html">Java线程池实现原理及其在美团业务中的实践.md.html</a>
</li>
<li>
<a href="/文章/Java魔法类Unsafe应用解析.md.html">Java魔法类Unsafe应用解析.md.html</a>
</li>
<li>
<a href="/文章/Kafka 源码阅读笔记.md.html">Kafka 源码阅读笔记.md.html</a>
</li>
<li>
<a href="/文章/Kafka、ActiveMQ、RabbitMQ、RocketMQ 区别以及高可用原理.md.html">Kafka、ActiveMQ、RabbitMQ、RocketMQ 区别以及高可用原理.md.html</a>
</li>
<li>
<a href="/文章/MySQL · 引擎特性 · InnoDB Buffer Pool.md.html">MySQL · 引擎特性 · InnoDB Buffer Pool.md.html</a>
</li>
<li>
<a href="/文章/MySQL · 引擎特性 · InnoDB IO子系统.md.html">MySQL · 引擎特性 · InnoDB IO子系统.md.html</a>
</li>
<li>
<a href="/文章/MySQL · 引擎特性 · InnoDB 事务系统.md.html">MySQL · 引擎特性 · InnoDB 事务系统.md.html</a>
</li>
<li>
<a href="/文章/MySQL · 引擎特性 · InnoDB 同步机制.md.html">MySQL · 引擎特性 · InnoDB 同步机制.md.html</a>
</li>
<li>
<a href="/文章/MySQL · 引擎特性 · InnoDB 数据页解析.md.html">MySQL · 引擎特性 · InnoDB 数据页解析.md.html</a>
</li>
<li>
<a href="/文章/MySQL · 引擎特性 · InnoDB崩溃恢复.md.html">MySQL · 引擎特性 · InnoDB崩溃恢复.md.html</a>
</li>
<li>
<a href="/文章/MySQL · 引擎特性 · 临时表那些事儿.md.html">MySQL · 引擎特性 · 临时表那些事儿.md.html</a>
</li>
<li>
<a href="/文章/MySQL 主从复制 半同步复制.md.html">MySQL 主从复制 半同步复制.md.html</a>
</li>
<li>
<a href="/文章/MySQL 主从复制 基于GTID复制.md.html">MySQL 主从复制 基于GTID复制.md.html</a>
</li>
<li>
<a href="/文章/MySQL 主从复制.md.html">MySQL 主从复制.md.html</a>
</li>
<li>
<a href="/文章/MySQL 事务日志(redo log和undo log).md.html">MySQL 事务日志(redo log和undo log).md.html</a>
</li>
<li>
<a class="current-tab" href="/文章/MySQL 亿级别数据迁移实战代码分享.md.html">MySQL 亿级别数据迁移实战代码分享.md.html</a>
</li>
<li>
<a href="/文章/MySQL 从一条数据说起-InnoDB行存储数据结构.md.html">MySQL 从一条数据说起-InnoDB行存储数据结构.md.html</a>
</li>
<li>
<a href="/文章/MySQL 地基基础:事务和锁的面纱.md.html">MySQL 地基基础:事务和锁的面纱.md.html</a>
</li>
<li>
<a href="/文章/MySQL 地基基础:数据字典.md.html">MySQL 地基基础:数据字典.md.html</a>
</li>
<li>
<a href="/文章/MySQL 地基基础:数据库字符集.md.html">MySQL 地基基础:数据库字符集.md.html</a>
</li>
<li>
<a href="/文章/MySQL 性能优化:碎片整理.md.html">MySQL 性能优化:碎片整理.md.html</a>
</li>
<li>
<a href="/文章/MySQL 故障诊断:一个 ALTER TALBE 执行了很久,你慌不慌?.md.html">MySQL 故障诊断:一个 ALTER TALBE 执行了很久,你慌不慌?.md.html</a>
</li>
<li>
<a href="/文章/MySQL 故障诊断:如何在日志中轻松定位大事务.md.html">MySQL 故障诊断:如何在日志中轻松定位大事务.md.html</a>
</li>
<li>
<a href="/文章/MySQL 故障诊断:教你快速定位加锁的 SQL.md.html">MySQL 故障诊断:教你快速定位加锁的 SQL.md.html</a>
</li>
<li>
<a href="/文章/MySQL 日志详解.md.html">MySQL 日志详解.md.html</a>
</li>
<li>
<a href="/文章/MySQL 的半同步是什么?.md.html">MySQL 的半同步是什么?.md.html</a>
</li>
<li>
<a href="/文章/MySQL中的事务和MVCC.md.html">MySQL中的事务和MVCC.md.html</a>
</li>
<li>
<a href="/文章/MySQL事务_事务隔离级别详解.md.html">MySQL事务_事务隔离级别详解.md.html</a>
</li>
<li>
<a href="/文章/MySQL优化优化 select count().md.html">MySQL优化优化 select count().md.html</a>
</li>
<li>
<a href="/文章/MySQL共享锁、排他锁、悲观锁、乐观锁.md.html">MySQL共享锁、排他锁、悲观锁、乐观锁.md.html</a>
</li>
<li>
<a href="/文章/MySQL的MVCC多版本并发控制.md.html">MySQL的MVCC多版本并发控制.md.html</a>
</li>
<li>
<a href="/文章/QingStor 对象存储架构设计及最佳实践.md.html">QingStor 对象存储架构设计及最佳实践.md.html</a>
</li>
<li>
<a href="/文章/RocketMQ 面试题集锦.md.html">RocketMQ 面试题集锦.md.html</a>
</li>
<li>
<a href="/文章/SnowFlake 雪花算法生成分布式 ID.md.html">SnowFlake 雪花算法生成分布式 ID.md.html</a>
</li>
<li>
<a href="/文章/Spring Boot 2.x 结合 k8s 实现分布式微服务架构.md.html">Spring Boot 2.x 结合 k8s 实现分布式微服务架构.md.html</a>
</li>
<li>
<a href="/文章/Spring Boot 教程:如何开发一个 starter.md.html">Spring Boot 教程:如何开发一个 starter.md.html</a>
</li>
<li>
<a href="/文章/Spring MVC 原理.md.html">Spring MVC 原理.md.html</a>
</li>
<li>
<a href="/文章/Spring MyBatis和Spring整合的奥秘.md.html">Spring MyBatis和Spring整合的奥秘.md.html</a>
</li>
<li>
<a href="/文章/Spring 帮助你更好的理解Spring循环依赖.md.html">Spring 帮助你更好的理解Spring循环依赖.md.html</a>
</li>
<li>
<a href="/文章/Spring 循环依赖及解决方式.md.html">Spring 循环依赖及解决方式.md.html</a>
</li>
<li>
<a href="/文章/Spring中眼花缭乱的BeanDefinition.md.html">Spring中眼花缭乱的BeanDefinition.md.html</a>
</li>
<li>
<a href="/文章/Vert.x 基础入门.md.html">Vert.x 基础入门.md.html</a>
</li>
<li>
<a href="/文章/eBay 的 Elasticsearch 性能调优实践.md.html">eBay 的 Elasticsearch 性能调优实践.md.html</a>
</li>
<li>
<a href="/文章/不可不说的Java“锁”事.md.html">不可不说的Java“锁”事.md.html</a>
</li>
<li>
<a href="/文章/互联网并发限流实战.md.html">互联网并发限流实战.md.html</a>
</li>
<li>
<a href="/文章/从ReentrantLock的实现看AQS的原理及应用.md.html">从ReentrantLock的实现看AQS的原理及应用.md.html</a>
</li>
<li>
<a href="/文章/从SpringCloud开始聊微服务架构.md.html">从SpringCloud开始聊微服务架构.md.html</a>
</li>
<li>
<a href="/文章/全面了解 JDK 线程池实现原理.md.html">全面了解 JDK 线程池实现原理.md.html</a>
</li>
<li>
<a href="/文章/分布式一致性理论与算法.md.html">分布式一致性理论与算法.md.html</a>
</li>
<li>
<a href="/文章/分布式一致性算法 Raft.md.html">分布式一致性算法 Raft.md.html</a>
</li>
<li>
<a href="/文章/分布式唯一 ID 解析.md.html">分布式唯一 ID 解析.md.html</a>
</li>
<li>
<a href="/文章/分布式链路追踪:集群管理设计.md.html">分布式链路追踪:集群管理设计.md.html</a>
</li>
<li>
<a href="/文章/动态代理种类及原理,你知道多少?.md.html">动态代理种类及原理,你知道多少?.md.html</a>
</li>
<li>
<a href="/文章/响应式架构与 RxJava 在有赞零售的实践.md.html">响应式架构与 RxJava 在有赞零售的实践.md.html</a>
</li>
<li>
<a href="/文章/大数据算法——布隆过滤器.md.html">大数据算法——布隆过滤器.md.html</a>
</li>
<li>
<a href="/文章/如何优雅地记录操作日志?.md.html">如何优雅地记录操作日志?.md.html</a>
</li>
<li>
<a href="/文章/如何设计一个亿级消息量的 IM 系统.md.html">如何设计一个亿级消息量的 IM 系统.md.html</a>
</li>
<li>
<a href="/文章/异步网络模型.md.html">异步网络模型.md.html</a>
</li>
<li>
<a href="/文章/当我们在讨论CQRS时我们在讨论些神马.md.html">当我们在讨论CQRS时我们在讨论些神马.md.html</a>
</li>
<li>
<a href="/文章/彻底理解 MySQL 的索引机制.md.html">彻底理解 MySQL 的索引机制.md.html</a>
</li>
<li>
<a href="/文章/最全的 116 道 Redis 面试题解答.md.html">最全的 116 道 Redis 面试题解答.md.html</a>
</li>
<li>
<a href="/文章/有赞权限系统(SAM).md.html">有赞权限系统(SAM).md.html</a>
</li>
<li>
<a href="/文章/有赞零售中台建设方法的探索与实践.md.html">有赞零售中台建设方法的探索与实践.md.html</a>
</li>
<li>
<a href="/文章/服务注册与发现原理剖析Eureka、Zookeeper、Nacos.md.html">服务注册与发现原理剖析Eureka、Zookeeper、Nacos.md.html</a>
</li>
<li>
<a href="/文章/深入浅出Cache.md.html">深入浅出Cache.md.html</a>
</li>
<li>
<a href="/文章/深入理解 MySQL 底层实现.md.html">深入理解 MySQL 底层实现.md.html</a>
</li>
<li>
<a href="/文章/漫画讲解 git rebase VS git merge.md.html">漫画讲解 git rebase VS git merge.md.html</a>
</li>
<li>
<a href="/文章/生成浏览器唯一稳定 ID 的探索.md.html">生成浏览器唯一稳定 ID 的探索.md.html</a>
</li>
<li>
<a href="/文章/缓存 如何保证缓存与数据库的双写一致性?.md.html">缓存 如何保证缓存与数据库的双写一致性?.md.html</a>
</li>
<li>
<a href="/文章/网易严选怎么做全链路监控的?.md.html">网易严选怎么做全链路监控的?.md.html</a>
</li>
<li>
<a href="/文章/美团万亿级 KV 存储架构与实践.md.html">美团万亿级 KV 存储架构与实践.md.html</a>
</li>
<li>
<a href="/文章/美团点评Kubernetes集群管理实践.md.html">美团点评Kubernetes集群管理实践.md.html</a>
</li>
<li>
<a href="/文章/美团百亿规模API网关服务Shepherd的设计与实现.md.html">美团百亿规模API网关服务Shepherd的设计与实现.md.html</a>
</li>
<li>
<a href="/文章/解读《阿里巴巴 Java 开发手册》背后的思考.md.html">解读《阿里巴巴 Java 开发手册》背后的思考.md.html</a>
</li>
<li>
<a href="/文章/认识 MySQL 和 Redis 的数据一致性问题.md.html">认识 MySQL 和 Redis 的数据一致性问题.md.html</a>
</li>
<li>
<a href="/文章/进阶Dockerfile 高阶使用指南及镜像优化.md.html">进阶Dockerfile 高阶使用指南及镜像优化.md.html</a>
</li>
<li>
<a href="/文章/铁总在用的高性能分布式缓存计算框架 Geode.md.html">铁总在用的高性能分布式缓存计算框架 Geode.md.html</a>
</li>
<li>
<a href="/文章/阿里云PolarDB及其共享存储PolarFS技术实现分析.md.html">阿里云PolarDB及其共享存储PolarFS技术实现分析.md.html</a>
</li>
<li>
<a href="/文章/阿里云PolarDB及其共享存储PolarFS技术实现分析.md.html">阿里云PolarDB及其共享存储PolarFS技术实现分析.md.html</a>
</li>
<li>
<a href="/文章/面试最常被问的 Java 后端题.md.html">面试最常被问的 Java 后端题.md.html</a>
</li>
<li>
<a href="/文章/领域驱动设计在互联网业务开发中的实践.md.html">领域驱动设计在互联网业务开发中的实践.md.html</a>
</li>
<li>
<a href="/文章/领域驱动设计的菱形对称架构.md.html">领域驱动设计的菱形对称架构.md.html</a>
</li>
<li>
<a href="/文章/高效构建 Docker 镜像的最佳实践.md.html">高效构建 Docker 镜像的最佳实践.md.html</a>
</li>
</ul>
</div>
</div>
<div class="sidebar-toggle" onclick="sidebar_toggle()" onmouseover="add_inner()" onmouseleave="remove_inner()">
<div class="sidebar-toggle-inner"></div>
</div>
<script>
function add_inner() {
let inner = document.querySelector('.sidebar-toggle-inner')
inner.classList.add('show')
}
function remove_inner() {
let inner = document.querySelector('.sidebar-toggle-inner')
inner.classList.remove('show')
}
function sidebar_toggle() {
let sidebar_toggle = document.querySelector('.sidebar-toggle')
let sidebar = document.querySelector('.book-sidebar')
let content = document.querySelector('.off-canvas-content')
if (sidebar_toggle.classList.contains('extend')) { // show
sidebar_toggle.classList.remove('extend')
sidebar.classList.remove('hide')
content.classList.remove('extend')
} else { // hide
sidebar_toggle.classList.add('extend')
sidebar.classList.add('hide')
content.classList.add('extend')
}
}
function open_sidebar() {
let sidebar = document.querySelector('.book-sidebar')
let overlay = document.querySelector('.off-canvas-overlay')
sidebar.classList.add('show')
overlay.classList.add('show')
}
function hide_canvas() {
let sidebar = document.querySelector('.book-sidebar')
let overlay = document.querySelector('.off-canvas-overlay')
sidebar.classList.remove('show')
overlay.classList.remove('show')
}
</script>
<div class="off-canvas-content">
<div class="columns">
<div class="column col-12 col-lg-12">
<div class="book-navbar">
<!-- For Responsive Layout -->
<header class="navbar">
<section class="navbar-section">
<a onclick="open_sidebar()">
<i class="icon icon-menu"></i>
</a>
</section>
</header>
</div>
<div class="book-content" style="max-width: 960px; margin: 0 auto;
overflow-x: auto;
overflow-y: hidden;">
<div class="book-post">
<p id="tip" align="center"></p>
<div><h1>MySQL 亿级别数据迁移实战代码分享</h1>
<h3>x业务背景</h3>
<p>某一项技术的出现是为了解决问题的,我们之所以要学习某个技术,是因为这个技术解决了我们工作中遇到的痛点。上亿条数据在 MySQL 中有索引的条件下,实际上只是查询是没有太大问题的,在没有特别大的并发的情况下,只是插入特别慢。</p>
<p>在迁移大数据量的情况下面临的问题主要有如下几点:</p>
<ol>
<li>怎么保证查询大数据量下没有性能的瓶颈。</li>
<li>不能影响正常的业务查询,不要有太多的性能损耗,可以灵活控制迁移的启动与停止。</li>
<li>实际的迁移参数可以动态的扩展,比如一次批量提交多少条数据,一次迁移多少,迁移过程中怎么转化数据。</li>
<li>对于一些错误可以容忍的话怎么跳过异常,比如可以容忍几条数据的失败。</li>
<li>测试环境和生产环境的测试实际一般很难达到百分百一致,如何在线上对迁移的性能进行可配置的控制。</li>
<li>事务的提交问题,因为数据量过大,不能将所有数据查询到内存之中,数据如何分而治之。</li>
<li>迁移的过程需要记录,比如每次的任务执行的多长时间,事务提交了多少次,这些都可以查询,根据这些指标再次动态调整。</li>
</ol>
<p>针对上面提出的问题,我们自己去写这样一个功能完整且稳定的功能,实际中还是比较困难的。按照通常的 Service 和 Dao 的写法,满足不了我们的需求。</p>
<h3>Spring Batch 核心功能的介绍</h3>
<p>Spring Batch 是 Spring 官方的一个专门用于处理大数据体量下数据迁移的开源框架,主要解决的问题就是我们上面所说的那些问题,通过分批读取和写数据,保证内存不会发生溢出。并且通过灵活的组件组合,可以满足数据迁移中的各种需求。</p>
<h4>核心的功能和特点</h4>
<ol>
<li>事务管理</li>
<li>基于分块的处理</li>
<li>声明式 IO</li>
<li>可灵活控制开始、停止、重启</li>
<li>重试和跳过</li>
<li>基于网页的后台管理 Spring Cloud Data Flow</li>
</ol>
<p>Spring Batch 的文档很全面,但对于初学者来说,整个文档读完需要耗费比较长的时间,这里只看两张 Spring Batch 的架构图,后面我会通过实际工作中的迁移案例来讲解怎么使用它。</p>
<h4>架构图</h4>
<p><img src="assets/8bd2ac90-2f8f-11ea-a6f3-07a41d160500.png" alt="在这里插入图片描述" /></p>
<p><img src="assets/96a647d0-2f8f-11ea-a6f3-07a41d160500.png" alt="在这里插入图片描述" /></p>
<p>第一张图是架构图,第二张分区处理图。</p>
<p>从这两张图可以看到这正是我们迁移大数据量下的核心诉求。输入端是从数据源读取数据,数据源可以来源于文件或者数据库,中间的处理流程是对数据进行加工,因为有些数据我们需要进行转换和处理,输出端是写出数据,可以写入到文件或者数据库。</p>
<h4>核心组件</h4>
<ol>
<li>Reader负责数据的读取</li>
<li>Processor负责数据的处理</li>
<li>Writer负责数据的写出</li>
<li>Step流程编排</li>
<li>Job任务配置</li>
</ol>
<p>这些业务组件用我们生活中的例子来理解的话,可以想象一下如下的场景:</p>
<blockquote>
<p>数据迁移就好比从一个油田里运输原油到炼油厂,提炼完成后最终把成品油送到加油站的一个过程。原油在到成品油的过程中要经历很多的流程和运输步骤,就像我们的数据要经过转换一样。</p>
<p>Reader 就好比从油田里将原油装上运输车Processor 就好比将原油提炼出成品油Writer 就好比将油运到加油站的油箱里Step 就是这个装油 -&gt; 提炼油 -&gt; 运输到加油站这个流水线一样的处理步骤Job 相当与任务,这个任务就是 Step 定义好的这个流程,任务可以执行多次,但是每一次都是按照提前定义好的 Step 来执行。</p>
</blockquote>
<h3>实际中的业务需求怎么实现</h3>
<p>可以先下载源码:</p>
<p><a href="https://gitee.com/smartGim/batchsrv">https://gitee.com/smartGim/batchsrv</a></p>
<h4>先从单表数据备份快速了解迁移流程</h4>
<p>这种场景多见于项目创建初期为了快速上线,在项目初期没有做分库分表(在项目初期一般不会做分库分表),比如订单表,还有银行流水表、历史对账数据、支付记录数据表等。这种数据的特点为时间在半年或者一年以上将变为冷数据,数据查询的情况比较少,并且数据不会发生变化。通常情况下我们需要将时间大于一定时期的数据定时备份。还有一种情况就是接手一些历史遗留的项目,有些数据的单表已经达到了一个极限。</p>
<p>我们以一个数据库的数据 db_source 迁移到另外一个数据库 db_target 为例,迁移过程中同时删除已经迁移走的记录。并将迁移的过程和记录进行记录下来到 db_source 中。</p>
<p>实现步骤:</p>
<p><strong>1. 新建一个 Spring Boot 工程引入</strong></p>
<pre><code class="language-java">&lt;dependency&gt;
&lt;groupId&gt;org.springframework.boot&lt;/groupId&gt;
&lt;artifactId&gt;spring-boot-starter-batch&lt;/artifactId&gt;
&lt;/dependency&gt;
@SpringBootApplication
@EnableBatchProcessing
public class BatchsrvApplication {
public static void main(String[] args) {
SpringApplication.run(BatchsrvApplication.class, args);
}
}
</code></pre>
<p>在启动类上添加注解 @EnableBatchProcessing。</p>
<p><strong>注意</strong>:添加上了此注解,那么项目启动后 Spring Batch 就会自动化初始相关的数据库文件,并且直接启动相关的迁移任务。在实际的线上迁移中是不会直接上线后就自动执行任务的,通常会先通过手动执行来看下执行情况,再动态配置一些指标,所以我们的项目会禁用自动执行,并且手动执行 Spring batch 的数据库文件,数据库初始文件在 spring-batch-core 中,根据数据库进行选择即可。</p>
<p>禁用自动执行任务选项,<code>batch.job.enabled = false</code></p>
<pre><code class="language-java">server:
port: 8096
spring:
profiles:
active: dev
batch:
job:
enabled: false
</code></pre>
<p><strong>2. 多数据源配置</strong></p>
<pre><code class="language-java">package cn.gitchat.share.config;
import javax.sql.DataSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DriverManagerDataSource;
/**
* @Author: Gim
* @Date: 2019-12-04 17:05
* @Description:
*/
@Configuration
public class DataSourceConfig {
@Autowired
private Environment env;
@Bean
@Primary
public DataSource primaryDatasource() {
DriverManagerDataSource dataSource = new DriverManagerDataSource();
dataSource.setDriverClassName(env.getProperty(&quot;source.driver-class-name&quot;));
dataSource.setUrl(env.getProperty(&quot;source.jdbc-url&quot;));
dataSource.setUsername(env.getProperty(&quot;source.username&quot;));
dataSource.setPassword(env.getProperty(&quot;source.password&quot;));
return dataSource;
}
@Bean
public DataSource targetDatasource() {
DriverManagerDataSource dataSource = new DriverManagerDataSource();
dataSource.setDriverClassName(env.getProperty(&quot;target.driver-class-name&quot;));
dataSource.setUrl(env.getProperty(&quot;target.jdbc-url&quot;));
dataSource.setUsername(env.getProperty(&quot;target.username&quot;));
dataSource.setPassword(env.getProperty(&quot;target.password&quot;));
return dataSource;
}
@Bean
public JdbcTemplate primaryTemplate(){
return new JdbcTemplate(primaryDatasource());
}
@Bean
public JdbcTemplate targetJdbcTemplate(){
return new JdbcTemplate(targetDatasource());
}
}
</code></pre>
<p>此处我们使用多数据源的配置,并且使用 JdbcTemplate 进行数据的读取。</p>
<p>主数据源中配置 Spring Batch 的数据库文件(如果不在意迁移的情况记录可以使用内存数据库,存储 Spring Batch 的迁移记录情况,这样能提高迁移性能,但是实际中不建议做,迁移性能的大幅提升会影响线上的业务正常运行,除非迁移有及时性这种需求再开启内存数据源)。</p>
<p><strong>如何定制 Spring Batch 的框架使用的数据源?</strong></p>
<p>从源码中找到:</p>
<pre><code class="language-java">public class BatchConfig extends DefaultBatchConfigurer
</code></pre>
<p>这个类继承于 DefaultBatchConfigurer 然后我们对 JobRepository 进行重写:</p>
<pre><code class="language-java">//将spring batch 的记录存取在主数据源中
@Override
protected JobRepository createJobRepository() throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(primaryDatasource);
factory.setTransactionManager(this.getTransactionManager());
factory.afterPropertiesSet();
return factory.getObject();
}
</code></pre>
<p><strong>3. 读取的配置</strong></p>
<pre><code>/**
* 数据读取 根据id 查询保证性能 分页读取
*/
@Bean
@StepScope
public JdbcPagingItemReader&lt;PayRecord&gt; payRecordReader(@Value(&quot;#{jobParameters[minId]}&quot;) Long minId, @Value(&quot;#{jobParameters[maxId]}&quot;) Long maxId) throws Exception {
JdbcPagingItemReader&lt;PayRecord&gt; reader = new JdbcPagingItemReader();
final SqlPagingQueryProviderFactoryBean sqlPagingQueryProviderFactoryBean = new SqlPagingQueryProviderFactoryBean();
sqlPagingQueryProviderFactoryBean.setDataSource(primaryDatasource);
sqlPagingQueryProviderFactoryBean.setSelectClause(&quot;select * &quot;);
sqlPagingQueryProviderFactoryBean.setFromClause(&quot;from pay_record&quot;);
sqlPagingQueryProviderFactoryBean.setWhereClause(&quot;id &gt; &quot; + minId + &quot; and id &lt;= &quot; + maxId);
sqlPagingQueryProviderFactoryBean.setSortKey(&quot;id&quot;);
reader.setQueryProvider(sqlPagingQueryProviderFactoryBean.getObject());
reader.setDataSource(primaryDatasource);
reader.setPageSize(config.getPageSize());
reader.setRowMapper(new PayRecordRowMapper());
reader.afterPropertiesSet();
reader.setSaveState(true);
return reader;
}
</code></pre>
<p><strong>在读取的配置中有几个比较重要的点:</strong></p>
<ul>
<li>第一点可以看到在 @Bean 下方有个注解 @StepScope 这个注解的作用是我们可以在 Job 执行时给 Job 添加变量,用于我们可以动态的控制每次任务要迁移的记录,通过 <code>@Value (&quot;#{jobParameters [minId]}&quot;</code> 这种方式进行接收参数,传递参数可以在任务执行时进行 Job 传参,使用方式如下:</li>
</ul>
<pre><code class="language-java">@Scheduled(cron = &quot;0 0 2 * * ?&quot;)
public void migratePayRecord() throws Exception {
Map&lt;String, Object&gt; result = primaryJdbcTemplate
.queryForMap(&quot;select * from job_config where config_key = ?&quot;, MigrateConstants.PAY_RECORD_JOB_CONFIG_KEY);
String value = (String) result.get(&quot;config_value&quot;);
JobExecuteConfig payRecordJobConfig = JSON.parseObject(value, JobExecuteConfig.class);
JobParameters params = new JobParametersBuilder()
.addLong(&quot;maxId&quot;, payRecordJobConfig.getExecuteMaxId())
.addLong(&quot;minId&quot;, payRecordJobConfig.getMinId())
.toJobParameters();
jobLauncher.run(migratePayRecordJob, params);
}
</code></pre>
<ul>
<li>第二点可以看到我们读取数据使用的是 JdbcPagingItemReader 这个类,这个类的特点是可以通过分页的方式对数据记录进行读取,保证不会内存溢出,这里面 SortKey 是个很重要的点,一定要保证数据的唯一性,这个用于任务重新启动判断从哪再次开始。</li>
<li>PageSize 我们可以控制每一次读取多少条记录这个用于调优用根据实际的情况进行控制。SaveState 用于将迁移的记录情况进行记录。</li>
<li>这个 Reader 的配置非常重要是保证亿级别数据不会发生内存溢出的关键配置。在这里的配置相当于我们对大数量级别的数据进行了一个分段处理。是分而治之的关键点。</li>
</ul>
<p><strong>4. 写入的配置</strong></p>
<pre><code class="language-java">/**
* 写入到新库
*/
@Bean
public ItemWriter&lt;? super PayRecord&gt; targetPayRecordWriter() {
return new JdbcBatchItemWriterBuilder&lt;PayRecord&gt;()
.dataSource(targetDatasource)
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider&lt;&gt;())
.sql(
&quot;INSERT INTO `pay_record` (\n&quot;
+ &quot;`id`,\n&quot;
+ &quot;`user_id`,\n&quot;
+ &quot;`pay_detail`,\n&quot;
+ &quot;`pay_status`,\n&quot;
+ &quot;`create_time`,\n&quot;
+ &quot;`update_time`\n&quot;
+ &quot;)\n&quot;
+ &quot;VALUES\n&quot;
+ &quot;\t(\n&quot;
+ &quot;:id,&quot;
+ &quot;:userId,&quot;
+ &quot;:payDetail,&quot;
+ &quot;:payStatus,&quot;
+ &quot;:createTime,&quot;
+ &quot;:updateTime&quot;
+ &quot;)&quot;)
.build();
}
/**
* 删除器
*
* @return
*/
@Bean
public ItemWriter&lt;PayRecord&gt; deletePayRecordWriter() {
return new JdbcBatchItemWriterBuilder&lt;PayRecord&gt;()
.dataSource(primaryDatasource)
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider&lt;&gt;())
.sql(&quot;delete from pay_record where id = :id&quot;)
.build();
}
@Bean
public CompositeItemWriter&lt;PayRecord&gt; compositePayRecordItemWriter(@Qualifier(&quot;deletePayRecordWriter&quot;) ItemWriter deleteWriter, @Qualifier(&quot;targetPayRecordWriter&quot;) ItemWriter targetPayRecordWriter) {
CompositeItemWriter&lt;PayRecord&gt; compositeItemWriter = new CompositeItemWriter&lt;&gt;();
List&lt;ItemWriter&lt;? super PayRecord&gt;&gt; list = new ArrayList&lt;&gt;();
list.add(deleteWriter);
list.add(targetPayRecordWriter);
compositeItemWriter.setDelegates(list);
return compositeItemWriter;
}
</code></pre>
<p>写入的配置这里面使用了两个配置,一个是写入新的数据源,一个是删除原来的迁移数据连接的是原来的主数据源。写入器使用的是框架提供的 JdbcBatchItemWriter 看名字就知道这个类的作用了它支持批量的写入性能非常高。Spring Batch 支持各种 Writer 的组合,通过 CompositeItemWriter 来实现。在这里只是为了演示组合 Writer 的用法,实际中最好分成两步来完成这种任务。</p>
<p><strong>5. Step 的配置</strong></p>
<pre><code class="language-java">@Bean
public Step migratePayRecordStep(@Qualifier(&quot;payRecordReader&quot;) JdbcPagingItemReader&lt;PayRecord&gt; payRecordReader, @Qualifier(value = &quot;compositePayRecordItemWriter&quot;) CompositeItemWriter compositeItemWriter) {
return this.stepBuilderFactory.get(&quot;migratePayRecordStep&quot;)
.&lt;PayRecord, PayRecord&gt;chunk(config.getChunkSize())
.reader(payRecordReader)
.processor(new PassThroughItemProcessor())
.writer(compositeItemWriter)
.taskExecutor(new SimpleAsyncTaskExecutor(&quot;migrate_thread&quot;))
.throttleLimit(config.getThreadSize())
.build();
}
</code></pre>
<p>像上面讲到的那样 Step 的作用就是配置整个流程的,这里面 chunk 是对数据进行分块,可以动态的控制每次事务提交多少条记录。</p>
<p>Processor 是处理器。 这里面直接传递了一个 PassThroughItemProcessor ,这个类的作用是对传递过来的对象不做任何的处理。如果我们想对原有的记录进行扩展,那么可以自定义一个处理器,比如:</p>
<pre><code class="language-java">@Component
public class PayRecordExtProcessor implements ItemProcessor&lt;PayRecord,PayRecordExt&gt; {
@Override
public PayRecordExt process(PayRecord payRecord) throws Exception {
//other service
PayRecordExt ext = new PayRecordExt();
return ext;
}
}
</code></pre>
<p>在这里进行数据处理。</p>
<p>taskExecutor 这个是线程池的配置。因为每条记录的读取、处理、写入都是独立的互相之间没有任何的影响所以使用线程池可以实现流程的并行处理提高处理速度。throttleLimit 用来控制最大的线程数。线程太多了会造成资源的浪费。</p>
<p><strong>6. Job 的配置</strong></p>
<pre><code class="language-java">@Bean
public Job migratePayRecordJob(@Qualifier(&quot;migratePayRecordStep&quot;) Step step) {
return this.jobBuilderFactory.get(&quot;migratePayRecordJob&quot;)
.start(step)
.incrementer(new RunIdIncrementer())
.build();
}
</code></pre>
<p>Job 用于将 step 配置好的流程向外以任务的形式暴露。</p>
<p><strong>7. 如何启动 Job</strong></p>
<p>默认情况下 Spring Batch 会自动启动任务,上面提到了,在线上时一定不要自动启动任务。</p>
<pre><code class="language-java">@Component
@Slf4j
public class PayRecordTask {
@Autowired
private Job migratePayRecordJob;
@Autowired
private JobLauncher jobLauncher;
@Autowired
@Qualifier(&quot;primaryTemplate&quot;)
private JdbcTemplate primaryJdbcTemplate;
/**
*迁移任务
* @throws Exception
*/
@Scheduled(cron = &quot;0 0 2 * * ?&quot;)
public void migratePayRecord() throws Exception {
Map&lt;String, Object&gt; result = primaryJdbcTemplate
.queryForMap(&quot;select * from job_config where config_key = ?&quot;, MigrateConstants.PAY_RECORD_JOB_CONFIG_KEY);
String value = (String) result.get(&quot;config_value&quot;);
JobExecuteConfig payRecordJobConfig = JSON.parseObject(value, JobExecuteConfig.class);
JobParameters params = new JobParametersBuilder()
.addLong(&quot;maxId&quot;, payRecordJobConfig.getExecuteMaxId())
.addLong(&quot;minId&quot;, payRecordJobConfig.getMinId())
.toJobParameters();
jobLauncher.run(migratePayRecordJob, params);
}
/**
*
* 执行迁移任务之前数据迁移配置初始化
* 凌晨一点
*/
@Scheduled(cron = &quot;0 0 1 * * ?&quot;)
public void initBillMigrateConfig() {
//TODO 根据实际需要的业务逻辑动态计算出,每次要迁移的数据是哪些 最好根据时间算出MaxId, 然后每天执行固定的步长 比如每天迁移100万
}
}
</code></pre>
<p>当在启动类上添加注解 @EnableBatchProcessing 之后Spring Batch 会在容器中自动注入 Spring Batch 的组件,这里面我们可以使用 JobLauncher 进行任务的启动,在启动时可以配置任务启动的参数。</p>
<pre><code class="language-java">JobParameters params = new JobParametersBuilder()
.addLong(&quot;maxId&quot;, payRecordJobConfig.getExecuteMaxId())
.addLong(&quot;minId&quot;, payRecordJobConfig.getMinId())
.toJobParameters();
</code></pre>
<p>这传递的参数,通过 @StepScope 注解的 Bean 就可以动态获取这个参数。</p>
<p><strong>8. Job 如何启动、停止、重启</strong></p>
<p>Job 的另一种启动和停止是通过类 JobOperator 这个类来实现的,在启动和重启之前要将 Job 注册到 JobRegistry 中,在 JobController 中可以看到具体的实现。</p>
<pre><code class="language-java">@Autowired
private JobRegistry registry;
@PostConstruct
public void registry() throws DuplicateJobException {
ReferenceJobFactory factory = new ReferenceJobFactory(migratePayRecordJob);
registry.register(factory);
}
@PostMapping(value = &quot;startJob&quot;)
@SneakyThrows
public ResponseEntity&lt;String&gt; startJob() {
Properties properties = new Properties();
properties.put(&quot;minId&quot;, &quot;20&quot;);
properties.put(&quot;maxId&quot;, &quot;50&quot;);
executePayRecordId = jobOperator.start(&quot;migratePayRecordJob&quot;, PropertiesConverter.propertiesToString(properties));
return ResponseEntity.ok(&quot;操作成功&quot;);
}
@PostMapping(value = &quot;stopJob&quot;)
@SneakyThrows
public ResponseEntity&lt;String&gt; stopJob() {
Assert.notNull(executePayRecordId,&quot;执行id不能为空&quot;);
jobOperator.stop(executePayRecordId);
return ResponseEntity.ok(&quot;操作成功&quot;);
}
@PostMapping(value = &quot;restartJob&quot;)
@SneakyThrows
public ResponseEntity&lt;String&gt; restartJob(){
Assert.notNull(executePayRecordId,&quot;执行id不能为空&quot;);
jobOperator.restart(executePayRecordId);
return ResponseEntity.ok(&quot;操作成功&quot;);
}
</code></pre>
<p>这个类的方法非常的丰富,除了能启动执行外还有大量的 API</p>
<pre><code class="language-java">List&lt;Long&gt; getExecutions(long var1) throws NoSuchJobInstanceException;
List&lt;Long&gt; getJobInstances(String var1, int var2, int var3) throws NoSuchJobException;
Set&lt;Long&gt; getRunningExecutions(String var1) throws NoSuchJobException;
String getParameters(long var1) throws NoSuchJobExecutionException;
Long start(String var1, String var2) throws NoSuchJobException, JobInstanceAlreadyExistsException, JobParametersInvalidException;
Long restart(long var1) throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException, NoSuchJobException, JobRestartException, JobParametersInvalidException;
Long startNextInstance(String var1) throws NoSuchJobException, JobParametersNotFoundException, JobRestartException, JobExecutionAlreadyRunningException, JobInstanceAlreadyCompleteException, UnexpectedJobExecutionException, JobParametersInvalidException;
boolean stop(long var1) throws NoSuchJobExecutionException, JobExecutionNotRunningException;
String getSummary(long var1) throws NoSuchJobExecutionException;
Map&lt;Long, String&gt; getStepExecutionSummaries(long var1) throws NoSuchJobExecutionException;
Set&lt;String&gt; getJobNames();
JobExecution abandon(long var1) throws NoSuchJobExecutionException, JobExecutionAlreadyRunningException;
</code></pre>
<p>在 Spring Batch 的 Admin 管理系统中主要就是通过这个类来获取任务的执行情况。</p>
<p><strong>9. 总结</strong></p>
<p>通过上面的演示可以清楚的看到 Spring Batch 的执行逻辑Spring Batch 定义了模板,我们在使用过程中只需要按照接口提供相应的数据来源和输出的接口即可。其他的事务处理,多线程池,批量处理,内存控制都由框架来完成。那么 Spring Batch 是怎么样保证在大数据量级的情况下,内存不溢出然后又保证性能的呢,下面用一张图进行展示 Spring Batch 是通过怎么的处理,保证在数据量巨大的情况下,高性能的进行数据的读写的。</p>
<p><img src="assets/d9d2ef90-2f8f-11ea-b7a2-bd62e8fb625b.png" alt="在这里插入图片描述" /></p>
<h4>其他业务场景的扩展</h4>
<p>在实际中项目处理中,我们面对的系统情况可能并不是一个单表这样简单的业务情况,比如多表合一、一表分表为多表,在迁移过程中数据变更的情形。那么这些场景该如何处理呢?</p>
<p><strong>1. 多表合一的场景</strong></p>
<p>在多表合一的场景下,实际上如何两张表有关联,那么附表上一定要建立索引。我们还是会让查询逻辑在主表上,千万不要使用 Join 联查,亿级别的数据如何联查,那么基础系统就要崩溃了。此时我们需要将需要拼装的业务逻辑放在 Processor 里面,比如在我们的案例场景下,需要将 User 的信息冗余到新的一个表中那么只需要将 UserService 注入到 PayRecordExtProcessor 中,然后再通过 <code>userService.getId(payRecord.getUserId ())</code> 这种方式去 load 用户信息,用户的 Id 肯定有索引,这样查询性能不会有太多的损耗。这也是现在大厂为啥要求在业务系统开发的过程中尽快不要使用 Join 联查的原因。</p>
<pre><code class="language-java">@Component
public class PayRecordExtProcessor implements ItemProcessor&lt;PayRecord,PayRecordExt&gt; {
@Override
public PayRecordExt process(PayRecord payRecord) throws Exception {
//此处注入 User service 通过userService .getId(payRecord.getUserId());
PayRecordExt ext = new PayRecordExt();
return ext;
}
}
</code></pre>
<p><strong>2. 拆分为多表的场景</strong></p>
<p>系统有些情况下需要将单表拆分为多表,比如根据用户 id 分片,这种情况下 Spring Batch 提供了一个类 ClassifierCompositeItemWriter 可以根据条件动态选择 Writer在这里我们模拟将 pay_record 这个表拆分为两个表的场景 pay_record_1、pay_record2。</p>
<pre><code>create TABLE pay_record_1 like pay_record;
create TABLE pay_record_2 like pay_record;
//根据条件拆分为多表
@Bean
public ClassifierCompositeItemWriter&lt;? super PayRecord&gt; classifierItemWriter(@Qualifier(&quot;payRecordOneWriter&quot;) ItemWriter payRecordOneWriter,@Qualifier(&quot;payRecordTwoWriter&quot;) ItemWriter payRecordTwoWriter){
ClassifierCompositeItemWriter&lt;PayRecord&gt; classifierCompositeItemWriter = new ClassifierCompositeItemWriter&lt;&gt;();
classifierCompositeItemWriter.setClassifier(
(Classifier&lt;PayRecord, ItemWriter&lt;? super PayRecord&gt;&gt;) record -&gt; {
ItemWriter&lt;? super PayRecord&gt; itemWriter;
if(record.getId() %2 ==0 ){
itemWriter = payRecordOneWriter;
}else {
itemWriter = payRecordTwoWriter;
}
return itemWriter;
});
return classifierCompositeItemWriter;
}
@Bean
public Step splitPayRecordStep(@Qualifier(&quot;payRecordReader&quot;) JdbcPagingItemReader&lt;PayRecord&gt; payRecordReader, @Qualifier(value = &quot;classifierItemWriter&quot;) ClassifierCompositeItemWriter itemWriter) {
return this.stepBuilderFactory.get(&quot;splitPayRecordStep&quot;)
.&lt;PayRecord, PayRecord&gt;chunk(config.getChunkSize())
.reader(payRecordReader)
.processor(new PassThroughItemProcessor())
.writer(itemWriter)
.taskExecutor(new SimpleAsyncTaskExecutor(&quot;migrate_thread&quot;))
.throttleLimit(config.getThreadSize())
.build();
}
@Bean
public Job splitPayRecordJob(@Qualifier(&quot;splitPayRecordStep&quot;) Step step) {
return this.jobBuilderFactory.get(&quot;splitPayRecordJob&quot;)
.start(step)
.incrementer(new RunIdIncrementer())
.build();
}
</code></pre>
<p>在这里面我们定义了一个 ClassifierCompositeItemWriter 根据 Id % 表个数进行分片。具体的实现可以看代码。</p>
<p><strong>3. 迁移过程中数据有变更和增量的场景</strong></p>
<p>一般来说有单表性能的表数据,都有一个特点,随着时间的迁移会有大量的冷数据,上千万频繁变更的场景还是比较少的。如果想保证系统不间断运行,同时又能进行系统的迁移,迁移完成后再进行用户无感知的切换。需要考虑到迁移过程中有增量变更的情况。这种情况下我们一般引入 CDC (change data capture) 组件。比较常用的如阿里的 Canal、Debezium这种中间件监听数据的变更。关于它们的使用可以去看官方的文档非常的简单添加好配置项就可以了。</p>
<p>配置好后将变更的数据再导入的新的数据源。切换后完成再通过 Spring Batch 进行比对迁移的数据和变化数据就可以了。如果变化的量级特别大,最好选用后半夜,数据变更较小的情况下进行数据迁移。因为亿级别的数据实时同步这个是不可能的,找一个折中的方案即可。</p>
<h4>如何控制部署的实例不同时运行</h4>
<p>为了保证服务的可靠性,通常我们部署的迁移服务实例是多个的,在这种情况下,我们需要保证数据不能迁移重复。在 Spring Batch 里面 Job 的概念相当于任务,还有一个概念叫 JobInstance。</p>
<p>什么意思呢,上面提到 Step 相当于配置的流程Job 是任务,那么任务是可能有多个的,就像一个类,我们可以 new 出来多个对象一样。一个 JonInstance 在相同的 JobParameter 下只会执行一次。这样在多实例的情况下,实际上部署多台实例也能控制任务不会重复执行,只不过会报一个任务已经存在的异常。但是在实际的项目迁移中我们希望 Job 的参数可以每天动态计算,比如在任务之前我们需要动态的计算出,今天的任务要迁移哪些数据。</p>
<pre><code class="language-java">/**
*
* 执行迁移任务之前数据迁移配置初始化
* 凌晨一点
*/
@Scheduled(cron = &quot;0 0 1 * * ?&quot;)
public void initMigrateConfig() {
//TODO 根据实际需要的业务逻辑动态计算出,每次要迁移的数据是哪些 最好根据时间算出MaxId, 然后每天执行固定的步长 比如每天迁移100万
}
</code></pre>
<p>像这种任务如果是多台实例,如果不加锁实际上会有问题的。这里面我们再引入一个新的组件:</p>
<pre><code class="language-java">&lt;dependency&gt;
&lt;groupId&gt;net.javacrumbs.shedlock&lt;/groupId&gt;
&lt;artifactId&gt;shedlock-spring&lt;/artifactId&gt;
&lt;version&gt;4.1.0&lt;/version&gt;
&lt;/dependency&gt;
&lt;dependency&gt;
&lt;groupId&gt;net.javacrumbs.shedlock&lt;/groupId&gt;
&lt;artifactId&gt;shedlock-provider-jdbc-template&lt;/artifactId&gt;
&lt;version&gt;4.0.4&lt;/version&gt;
&lt;/dependency&gt;
@Bean
public LockProvider lockProvider(@Qualifier(value = &quot;primaryDatasource&quot;) DataSource dataSource) {
return new JdbcTemplateLockProvider(dataSource);
}
</code></pre>
<p>此处我们使用 JdbcTemplateLock 当做我们的分布式锁的实现。这样配置之后我们的任务在同一时间就只会在一个实例上运行了。</p>
<h4>哪些点可以优化</h4>
<p>\1. 每次可以控制读取的条数,每次事务提交的个数,每个 Step 启用的线程数都可以进行配置,在本项目中提供了一个配置类,可以对这些配置项进行动态的配置。</p>
<pre><code class="language-java">@ConfigurationProperties(prefix = &quot;migrate.config&quot;)
@Data
public class MigrateConfig {
//每次读取数据条数
private Integer pageSize;
//每次事务提交记录数
private Integer chunkSize;
private Integer threadSize;
}
</code></pre>
<p>\2. 在迁移之前 MySQL 要查看连接情况,设置缓冲池大小等指标,这个也是在迁移之前要考虑的点。</p>
<pre><code class="language-sql">show processlist
set global innodb_buffer_pool_size = 缓冲池大小 ;
show variables like 'max_connections';
set global max_connections= 最大连接数;
</code></pre>
<p>\3. 像上面提到的 Spring Batch 在迁移过程中会将任务的执行情况都初始化到数据库中,如果我们不想要这些数据持久化,那么我们可以选择内存数据库,这个会大大提升迁移的速度,更改数据源即可。</p>
<h3>Spring Cloud Data Flow 入门简介</h3>
<p>Spring Cloud Data Flow 这个是现在 Spring 在数据流处理方面一个功能非常强大的框架。目前被 Spring 放在了首页上,跟 Spring Cloud 并列。在单机条件下对 Spring Batch 的支持有限,不支持任务的动态调度,想使用调度需要 K8s 部署Spring Cloud Data Flow 在本机搭建有两种方式,一个是用 Docker一个是直接启动 jar 包。具体的 jar 包我放在了项目的 resources 下面。</p>
<p>如果使用 Dockercd 到 dataflow 目录下运行。</p>
<pre><code class="language-java">export DATAFLOW_VERSION=2.2.1.RELEASE
export SKIPPER_VERSION=2.1.2.RELEASE
docker-compose up
</code></pre>
<p>如果不使用 Docker 而选择直接运行 jar 包:</p>
<pre><code>java -jar spring-cloud-skipper-server-2.1.2.RELEASE.jar
java -jar spring-cloud-dataflow-server-2.2.1.RELEASE.jar
</code></pre>
<p>打开浏览器 输入 http://localhost:9393/dashboard/ 即可访问控制台。</p>
<p>注意一点:如果需要使用 Spring Batch Job 需要集成 Spring Cloud Task实际的项目中使用 JobOperator 即可,这个 UI 也是使用这个类进行 UI 展示的。如果对这个感兴趣的可以去探索。</p>
<p>在实际的迁移过程中可以查询在项目启动时初始化那几张表Spring Batch 会把每次执行的参数和执行情况全部初始化到数据库表中以 BATCH_开头的表中。</p>
<h3>项目代码地址</h3>
<blockquote>
<p><a href="https://gitee.com/smartGim/batchsrv">https://gitee.com/smartGim/batchsrv</a></p>
</blockquote>
<p>该项目是 Spring Boot 项目,打开 application-dev.yml 中按照要求创建自己的数据库db 文件在 resources/db 文件下source_db 开头的初始化到 source 数据库中target_db 开头的初始化到 target 数据库中即可。</p>
<p>实战代码并非理论的讲解,所以请务必查看源码进行练习。</p>
<p>如何进行测试,在 jobconfig 表中更改 minId 和 maxId这个是控制要迁移哪些数据的你也可以写个自动化的任务去每天动态更新这个配置项。</p>
<p>在 web 包下有 JobController 是前端的入口,用于本地的测试,可以通过 Postman 等工具直接发送请求即可。</p>
<p>如果在测试过程中有任何问题可以提 Issue。</p>
<h3>其他学习的资源</h3>
<p>一个不错的示例项目:</p>
<blockquote>
<p><a href="https://github.com/pkainulainen/spring-batch-examples.git">https://github.com/pkainulainen/spring-batch-examples.git</a></p>
</blockquote>
<p>如果想写入到 ExcelSpring Batch 也支持 ExcelES 的扩展:</p>
<blockquote>
<p><a href="https://github.com/spring-projects/spring-batch-extensions.git">https://github.com/spring-projects/spring-batch-extensions.git</a></p>
</blockquote>
<p>官方的 Samples</p>
<blockquote>
<p><a href="https://github.com/spring-projects/spring-batch.git">https://github.com/spring-projects/spring-batch.git</a></p>
</blockquote>
</div>
</div>
<div>
<div style="float: left">
<a href="/文章/MySQL 事务日志(redo log和undo log).md.html">上一页</a>
</div>
<div style="float: right">
<a href="/文章/MySQL 从一条数据说起-InnoDB行存储数据结构.md.html">下一页</a>
</div>
</div>
</div>
</div>
</div>
</div>
<a class="off-canvas-overlay" onclick="hide_canvas()"></a>
</div>
<script defer src="https://static.cloudflareinsights.com/beacon.min.js/v652eace1692a40cfa3763df669d7439c1639079717194" integrity="sha512-Gi7xpJR8tSkrpF7aordPZQlW2DLtzUlZcumS8dMQjwDHEnw9I7ZLyiOj/6tZStRBGtGgN6ceN6cMH8z7etPGlw==" data-cf-beacon='{"rayId":"70997fec7f918b66","version":"2021.12.0","r":1,"token":"1f5d475227ce4f0089a7cff1ab17c0f5","si":100}' crossorigin="anonymous"></script>
</body>
<!-- Global site tag (gtag.js) - Google Analytics -->
<script async src="https://www.googletagmanager.com/gtag/js?id=G-NPSEEVD756"></script>
<script>
window.dataLayer = window.dataLayer || [];
function gtag() {
dataLayer.push(arguments);
}
gtag('js', new Date());
gtag('config', 'G-NPSEEVD756');
var path = window.location.pathname
var cookie = getCookie("lastPath");
console.log(path)
if (path.replace("/", "") === "") {
if (cookie.replace("/", "") !== "") {
console.log(cookie)
document.getElementById("tip").innerHTML = "<a href='" + cookie + "'>跳转到上次进度</a>"
}
} else {
setCookie("lastPath", path)
}
function setCookie(cname, cvalue) {
var d = new Date();
d.setTime(d.getTime() + (180 * 24 * 60 * 60 * 1000));
var expires = "expires=" + d.toGMTString();
document.cookie = cname + "=" + cvalue + "; " + expires + ";path = /";
}
function getCookie(cname) {
var name = cname + "=";
var ca = document.cookie.split(';');
for (var i = 0; i < ca.length; i++) {
var c = ca[i].trim();
if (c.indexOf(name) === 0) return c.substring(name.length, c.length);
}
return "";
}
</script>
</html>