learn.lianglianglee.com/专栏/Java 并发编程 78 讲-完/05 有哪几种实现生产者消费者模式的方法?.md.html
2022-05-11 18:57:05 +08:00

1797 lines
38 KiB
HTML
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<!DOCTYPE html>
<!-- saved from url=(0046)https://kaiiiz.github.io/hexo-theme-book-demo/ -->
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1, maximum-scale=1.0, user-scalable=no">
<link rel="icon" href="/static/favicon.png">
<title>05 有哪几种实现生产者消费者模式的方法?.md.html</title>
<!-- Spectre.css framework -->
<link rel="stylesheet" href="/static/index.css">
<!-- theme css & js -->
<meta name="generator" content="Hexo 4.2.0">
</head>
<body>
<div class="book-container">
<div class="book-sidebar">
<div class="book-brand">
<a href="/">
<img src="/static/favicon.png">
<span>技术文章摘抄</span>
</a>
</div>
<div class="book-menu uncollapsible">
<ul class="uncollapsible">
<li><a href="/" class="current-tab">首页</a></li>
</ul>
<ul class="uncollapsible">
<li><a href="../">上一级</a></li>
</ul>
<ul class="uncollapsible">
<li>
<a href="/专栏/Java 并发编程 78 讲-完/00 由点及面,搭建你的 Java 并发知识网.md.html">00 由点及面,搭建你的 Java 并发知识网.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/01 为何说只有 1 种实现线程的方法?.md.html">01 为何说只有 1 种实现线程的方法?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/02 如何正确停止线程?为什么 volatile 标记位的停止方法是错误的?.md.html">02 如何正确停止线程?为什么 volatile 标记位的停止方法是错误的?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/03 线程是如何在 6 种状态之间转换的?.md.html">03 线程是如何在 6 种状态之间转换的?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/04 waitnotifynotifyAll 方法的使用注意事项?.md.html">04 waitnotifynotifyAll 方法的使用注意事项?.md.html</a>
</li>
<li>
<a class="current-tab" href="/专栏/Java 并发编程 78 讲-完/05 有哪几种实现生产者消费者模式的方法?.md.html">05 有哪几种实现生产者消费者模式的方法?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/06 一共有哪 3 类线程安全问题?.md.html">06 一共有哪 3 类线程安全问题?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/07 哪些场景需要额外注意线程安全问题?.md.html">07 哪些场景需要额外注意线程安全问题?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/08 为什么多线程会带来性能问题?.md.html">08 为什么多线程会带来性能问题?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/09 使用线程池比手动创建线程好在哪里?.md.html">09 使用线程池比手动创建线程好在哪里?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/10 线程池的各个参数的含义?.md.html">10 线程池的各个参数的含义?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/11 线程池有哪 4 种拒绝策略?.md.html">11 线程池有哪 4 种拒绝策略?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/12 有哪 6 种常见的线程池?什么是 Java8 的 ForkJoinPool.md.html">12 有哪 6 种常见的线程池?什么是 Java8 的 ForkJoinPool.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/13 线程池常用的阻塞队列有哪些?.md.html">13 线程池常用的阻塞队列有哪些?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/14 为什么不应该自动创建线程池?.md.html">14 为什么不应该自动创建线程池?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/15 合适的线程数量是多少CPU 核心数和线程数的关系?.md.html">15 合适的线程数量是多少CPU 核心数和线程数的关系?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/16 如何根据实际需要,定制自己的线程池?.md.html">16 如何根据实际需要,定制自己的线程池?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/17 如何正确关闭线程池shutdown 和 shutdownNow 的区别?.md.html">17 如何正确关闭线程池shutdown 和 shutdownNow 的区别?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/18 线程池实现“线程复用”的原理?.md.html">18 线程池实现“线程复用”的原理?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/19 你知道哪几种锁?分别有什么特点?.md.html">19 你知道哪几种锁?分别有什么特点?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/20 悲观锁和乐观锁的本质是什么?.md.html">20 悲观锁和乐观锁的本质是什么?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/21 如何看到 synchronized 背后的“monitor 锁”?.md.html">21 如何看到 synchronized 背后的“monitor 锁”?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/22 synchronized 和 Lock 孰优孰劣,如何选择?.md.html">22 synchronized 和 Lock 孰优孰劣,如何选择?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/23 Lock 有哪几个常用方法?分别有什么用?.md.html">23 Lock 有哪几个常用方法?分别有什么用?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/24 讲一讲公平锁和非公平锁,为什么要“非公平”?.md.html">24 讲一讲公平锁和非公平锁,为什么要“非公平”?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/25 读写锁 ReadWriteLock 获取锁有哪些规则?.md.html">25 读写锁 ReadWriteLock 获取锁有哪些规则?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/26 读锁应该插队吗?什么是读写锁的升降级?.md.html">26 读锁应该插队吗?什么是读写锁的升降级?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/27 什么是自旋锁?自旋的好处和后果是什么呢?.md.html">27 什么是自旋锁?自旋的好处和后果是什么呢?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/28 JVM 对锁进行了哪些优化?.md.html">28 JVM 对锁进行了哪些优化?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/29 HashMap 为什么是线程不安全的?.md.html">29 HashMap 为什么是线程不安全的?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/30 ConcurrentHashMap 在 Java7 和 8 有何不同?.md.html">30 ConcurrentHashMap 在 Java7 和 8 有何不同?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/31 为什么 Map 桶中超过 8 个才转为红黑树?.md.html">31 为什么 Map 桶中超过 8 个才转为红黑树?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/32 同样是线程安全ConcurrentHashMap 和 Hashtable 的区别.md.html">32 同样是线程安全ConcurrentHashMap 和 Hashtable 的区别.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/33 CopyOnWriteArrayList 有什么特点?.md.html">33 CopyOnWriteArrayList 有什么特点?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/34 什么是阻塞队列?.md.html">34 什么是阻塞队列?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/35 阻塞队列包含哪些常用的方法add、offer、put 等方法的区别?.md.html">35 阻塞队列包含哪些常用的方法add、offer、put 等方法的区别?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/36 有哪几种常见的阻塞队列?.md.html">36 有哪几种常见的阻塞队列?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/37 阻塞和非阻塞队列的并发安全原理是什么?.md.html">37 阻塞和非阻塞队列的并发安全原理是什么?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/38 如何选择适合自己的阻塞队列?.md.html">38 如何选择适合自己的阻塞队列?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/39 原子类是如何利用 CAS 保证线程安全的?.md.html">39 原子类是如何利用 CAS 保证线程安全的?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/40 AtomicInteger 在高并发下性能不好,如何解决?为什么?.md.html">40 AtomicInteger 在高并发下性能不好,如何解决?为什么?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/41 原子类和 volatile 有什么异同?.md.html">41 原子类和 volatile 有什么异同?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/42 AtomicInteger 和 synchronized 的异同点?.md.html">42 AtomicInteger 和 synchronized 的异同点?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/43 Java 8 中 Adder 和 Accumulator 有什么区别?.md.html">43 Java 8 中 Adder 和 Accumulator 有什么区别?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/44 ThreadLocal 适合用在哪些实际生产的场景中?.md.html">44 ThreadLocal 适合用在哪些实际生产的场景中?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/45 ThreadLocal 是用来解决共享资源的多线程访问的问题吗?.md.html">45 ThreadLocal 是用来解决共享资源的多线程访问的问题吗?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/46 多个 ThreadLocal 在 Thread 中的 threadlocals 里是怎么存储的?.md.html">46 多个 ThreadLocal 在 Thread 中的 threadlocals 里是怎么存储的?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/47 内存泄漏——为何每次用完 ThreadLocal 都要调用 remove().md.html">47 内存泄漏——为何每次用完 ThreadLocal 都要调用 remove().md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/48 Callable 和 Runnable 的不同?.md.html">48 Callable 和 Runnable 的不同?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/49 Future 的主要功能是什么?.md.html">49 Future 的主要功能是什么?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/50 使用 Future 有哪些注意点Future 产生新的线程了吗?.md.html">50 使用 Future 有哪些注意点Future 产生新的线程了吗?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/51 如何利用 CompletableFuture 实现“旅游平台”问题?.md.html">51 如何利用 CompletableFuture 实现“旅游平台”问题?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/52 信号量能被 FixedThreadPool 替代吗?.md.html">52 信号量能被 FixedThreadPool 替代吗?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/53 CountDownLatch 是如何安排线程执行顺序的?.md.html">53 CountDownLatch 是如何安排线程执行顺序的?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/54 CyclicBarrier 和 CountdownLatch 有什么异同?.md.html">54 CyclicBarrier 和 CountdownLatch 有什么异同?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/55 Condition、object.wait() 和 notify() 的关系?.md.html">55 Condition、object.wait() 和 notify() 的关系?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/56 讲一讲什么是 Java 内存模型?.md.html">56 讲一讲什么是 Java 内存模型?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/57 什么是指令重排序?为什么要重排序?.md.html">57 什么是指令重排序?为什么要重排序?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/58 Java 中的原子操作有哪些注意事项?.md.html">58 Java 中的原子操作有哪些注意事项?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/59 什么是“内存可见性”问题?.md.html">59 什么是“内存可见性”问题?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/60 主内存和工作内存的关系?.md.html">60 主内存和工作内存的关系?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/61 什么是 happens-before 规则?.md.html">61 什么是 happens-before 规则?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/62 volatile 的作用是什么?与 synchronized 有什么异同?.md.html">62 volatile 的作用是什么?与 synchronized 有什么异同?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/63 单例模式的双重检查锁模式为什么必须加 volatile.md.html">63 单例模式的双重检查锁模式为什么必须加 volatile.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/64 你知道什么是 CAS 吗?.md.html">64 你知道什么是 CAS 吗?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/65 CAS 和乐观锁的关系,什么时候会用到 CAS.md.html">65 CAS 和乐观锁的关系,什么时候会用到 CAS.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/66 CAS 有什么缺点?.md.html">66 CAS 有什么缺点?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/67 如何写一个必然死锁的例子?.md.html">67 如何写一个必然死锁的例子?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/68 发生死锁必须满足哪 4 个条件?.md.html">68 发生死锁必须满足哪 4 个条件?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/69 如何用命令行和代码定位死锁?.md.html">69 如何用命令行和代码定位死锁?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/70 有哪些解决死锁问题的策略?.md.html">70 有哪些解决死锁问题的策略?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/71 讲一讲经典的哲学家就餐问题.md.html">71 讲一讲经典的哲学家就餐问题.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/72 final 的三种用法是什么?.md.html">72 final 的三种用法是什么?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/73 为什么加了 final 却依然无法拥有“不变性”?.md.html">73 为什么加了 final 却依然无法拥有“不变性”?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/74 为什么 String 被设计为是不可变的?.md.html">74 为什么 String 被设计为是不可变的?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/75 为什么需要 AQSAQS 的作用和重要性是什么?.md.html">75 为什么需要 AQSAQS 的作用和重要性是什么?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/76 AQS 的内部原理是什么样的?.md.html">76 AQS 的内部原理是什么样的?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/77 AQS 在 CountDownLatch 等类中的应用原理是什么?.md.html">77 AQS 在 CountDownLatch 等类中的应用原理是什么?.md.html</a>
</li>
<li>
<a href="/专栏/Java 并发编程 78 讲-完/78 一份独家的 Java 并发工具图谱.md.html">78 一份独家的 Java 并发工具图谱.md.html</a>
</li>
</ul>
</div>
</div>
<div class="sidebar-toggle" onclick="sidebar_toggle()" onmouseover="add_inner()" onmouseleave="remove_inner()">
<div class="sidebar-toggle-inner"></div>
</div>
<script>
function add_inner() {
let inner = document.querySelector('.sidebar-toggle-inner')
inner.classList.add('show')
}
function remove_inner() {
let inner = document.querySelector('.sidebar-toggle-inner')
inner.classList.remove('show')
}
function sidebar_toggle() {
let sidebar_toggle = document.querySelector('.sidebar-toggle')
let sidebar = document.querySelector('.book-sidebar')
let content = document.querySelector('.off-canvas-content')
if (sidebar_toggle.classList.contains('extend')) { // show
sidebar_toggle.classList.remove('extend')
sidebar.classList.remove('hide')
content.classList.remove('extend')
} else { // hide
sidebar_toggle.classList.add('extend')
sidebar.classList.add('hide')
content.classList.add('extend')
}
}
function open_sidebar() {
let sidebar = document.querySelector('.book-sidebar')
let overlay = document.querySelector('.off-canvas-overlay')
sidebar.classList.add('show')
overlay.classList.add('show')
}
function hide_canvas() {
let sidebar = document.querySelector('.book-sidebar')
let overlay = document.querySelector('.off-canvas-overlay')
sidebar.classList.remove('show')
overlay.classList.remove('show')
}
</script>
<div class="off-canvas-content">
<div class="columns">
<div class="column col-12 col-lg-12">
<div class="book-navbar">
<!-- For Responsive Layout -->
<header class="navbar">
<section class="navbar-section">
<a onclick="open_sidebar()">
<i class="icon icon-menu"></i>
</a>
</section>
</header>
</div>
<div class="book-content" style="max-width: 960px; margin: 0 auto;
overflow-x: auto;
overflow-y: hidden;">
<div class="book-post">
<p id="tip" align="center"></p>
<div><h1>05 有哪几种实现生产者消费者模式的方法?</h1>
<p>本课时我们主要学习如何用 wait/notify/Condition/BlockingQueue 实现生产者消费者模式。</p>
<h3>生产者消费者模式</h3>
<p>我们先来看看什么是生产者消费者模式,生产者消费者模式是程序设计中非常常见的一种设计模式,被广泛运用在解耦、消息队列等场景。在现实世界中,我们把生产商品的一方称为生产者,把消费商品的一方称为消费者,有时生产者的生产速度特别快,但消费者的消费速度跟不上,俗称“产能过剩”,又或是多个生产者对应多个消费者时,大家可能会手忙脚乱。如何才能让大家更好地配合呢?这时在生产者和消费者之间就需要一个中介来进行调度,于是便诞生了生产者消费者模式。</p>
<p><img src="assets/CgotOV3OJ3iAGcaiAAFrcv5xk9U160.png" alt="img" /></p>
<p>使用生产者消费者模式通常需要在两者之间增加一个阻塞队列作为媒介,有了媒介之后就相当于有了一个缓冲,平衡了两者的能力,整体的设计如图所示,最上面是阻塞队列,右侧的 1 是生产者线程,生产者在生产数据后将数据存放在阻塞队列中,左侧的 2 是消费者线程,消费者获取阻塞队列中的数据。而中间的 3 和 4 分别代表生产者消费者之间互相通信的过程,因为无论阻塞队列是满还是空都可能会产生阻塞,阻塞之后就需要在合适的时机去唤醒被阻塞的线程。</p>
<p>那么什么时候阻塞线程需要被唤醒呢?有两种情况。第一种情况是当消费者看到阻塞队列为空时,开始进入等待,这时生产者一旦往队列中放入数据,就会通知所有的消费者,唤醒阻塞的消费者线程。另一种情况是如果生产者发现队列已经满了,也会被阻塞,而一旦消费者获取数据之后就相当于队列空了一个位置,这时消费者就会通知所有正在阻塞的生产者进行生产,这便是对生产者消费者模式的简单介绍。</p>
<h3>如何用 BlockingQueue 实现生产者消费者模式</h3>
<p>我们接下来看如何用 wait/notify/Condition/BlockingQueue 实现生产者消费者模式,先从最简单的 BlockingQueue 开始讲起:</p>
<pre><code class="language-java">public static void main(String[] args) {
BlockingQueue&lt;Object&gt; queue = new ArrayBlockingQueue&lt;&gt;(10);
Runnable producer = () -&gt; {
while (true) {
queue.put(new Object());
}
};
new Thread(producer).start();
new Thread(producer).start();
Runnable consumer = () -&gt; {
while (true) {
queue.take();
}
};
new Thread(consumer).start();
new Thread(consumer).start();
}
</code></pre>
<p>如代码所示,首先,创建了一个 ArrayBlockingQueue 类型的 BlockingQueue命名为 queue 并将它的容量设置为 10其次创建一个简单的生产者while(true) 循环体中的queue.put() 负责往队列添加数据然后创建两个生产者线程并启动同样消费者也非常简单while(true) 循环体中的 queue.take() 负责消费数据,同时创建两个消费者线程并启动。为了代码简洁并突出设计思想,代码里省略了 try/catch 检测,我们不纠结一些语法细节。以上便是利用 BlockingQueue 实现生产者消费者模式的代码。虽然代码非常简单,但实际上 ArrayBlockingQueue 已经在背后完成了很多工作,比如队列满了就去阻塞生产者线程,队列有空就去唤醒生产者线程等。</p>
<h3>如何用 Condition 实现生产者消费者模式</h3>
<p>BlockingQueue 实现生产者消费者模式看似简单,背后却暗藏玄机,我们在掌握这种方法的基础上仍需要掌握更复杂的实现方法。我们接下来看如何在掌握了 BlockingQueue 的基础上利用 Condition 实现生产者消费者模式,它们背后的实现原理非常相似,相当于我们自己实现一个简易版的 BlockingQueue</p>
<pre><code class="language-java">public class MyBlockingQueueForCondition {
private Queue queue;
private int max = 16;
private ReentrantLock lock = new ReentrantLock();
private Condition notEmpty = lock.newCondition();
private Condition notFull = lock.newCondition();
public MyBlockingQueueForCondition(int size) {
this.max = size;
queue = new LinkedList();
}
public void put(Object o) throws InterruptedException {
lock.lock();
try {
while (queue.size() == max) {
notFull.await();
}
queue.add(o);
notEmpty.signalAll();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (queue.size() == 0) {
notEmpty.await();
}
Object item = queue.remove();
notFull.signalAll();
return item;
} finally {
lock.unlock();
}
}
}
</code></pre>
<p>如代码所示,首先,定义了一个队列变量 queue 并设置最大容量为 16其次定义了一个 ReentrantLock 类型的 Lock 锁,并在 Lock 锁的基础上创建两个 Condition一个是 notEmpty另一个是 notFull分别代表队列没有空和没有满的条件最后声明了 put 和 take 这两个核心方法。</p>
<p>因为生产者消费者模式通常是面对多线程的场景,需要一定的同步措施保障线程安全,所以在 put 方法中先将 Lock 锁上,然后,在 while 的条件里检测 queue 是不是已经满了,如果已经满了,则调用 notFull 的 await() 阻塞生产者线程并释放 Lock如果没有满则往队列放入数据并利用 notEmpty.signalAll() 通知正在等待的所有消费者并唤醒它们。最后在 finally 中利用 lock.unlock() 方法解锁,把 unlock 方法放在 finally 中是一个基本原则,否则可能会产生无法释放锁的情况。</p>
<p>下面再来看 take 方法take 方法实际上是与 put 方法相互对应的,同样是通过 while 检查队列是否为空,如果为空,消费者开始等待,如果不为空则从队列中获取数据并通知生产者队列有空余位置,最后在 finally 中解锁。</p>
<p>这里需要注意,我们在 take() 方法中使用 while( queue.size() == 0 ) 检查队列状态,而不能用 if( queue.size() == 0 )。为什么呢?大家思考这样一种情况,因为生产者消费者往往是多线程的,我们假设有两个消费者,第一个消费者线程获取数据时,发现队列为空,便进入等待状态;因为第一个线程在等待时会释放 Lock 锁,所以第二个消费者可以进入并执行 if( queue.size() == 0 ),也发现队列为空,于是第二个线程也进入等待;而此时,如果生产者生产了一个数据,便会唤醒两个消费者线程,而两个线程中只有一个线程可以拿到锁,并执行 queue.remove 操作,另外一个线程因为没有拿到锁而卡在被唤醒的地方,而第一个线程执行完操作后会在 finally 中通过 unlock 解锁,而此时第二个线程便可以拿到被第一个线程释放的锁,继续执行操作,也会去调用 queue.remove 操作,然而这个时候队列已经为空了,所以会抛出 NoSuchElementException 异常,这不符合我们的逻辑。而如果用 while 做检查,当第一个消费者被唤醒得到锁并移除数据之后,第二个线程在执行 remove 前仍会进行 while 检查,发现此时依然满足 queue.size() == 0 的条件,就会继续执行 await 方法,避免了获取的数据为 null 或抛出异常的情况。</p>
<h3>如何用 wait/notify 实现生产者消费者模式</h3>
<p>最后我们再来看看使用 wait/notify 实现生产者消费者模式的方法实际上实现原理和Condition 是非常类似的,它们是兄弟关系:</p>
<pre><code class="language-java">class MyBlockingQueue {
private int maxSize;
private LinkedList&lt;Object&gt; storage;
public MyBlockingQueue(int size) {
this.maxSize = size;
storage = new LinkedList&lt;&gt;();
}
public synchronized void put() throws InterruptedException {
while (storage.size() == maxSize) {
wait();
}
storage.add(new Object());
notifyAll();
}
public synchronized void take() throws InterruptedException {
while (storage.size() == 0) {
wait();
}
System.out.println(storage.remove());
notifyAll();
}
}
</code></pre>
<p>如代码所示,最主要的部分仍是 take 与 put 方法,我们先来看 put 方法put 方法被 synchronized 保护while 检查队列是否为满,如果不满就往里放入数据并通过 notifyAll() 唤醒其他线程。同样take 方法也被 synchronized 修饰while 检查队列是否为空,如果不为空就获取数据并唤醒其他线程。使用这个 MyBlockingQueue 实现的生产者消费者代码如下:</p>
<pre><code class="language-java">/**
* 描述: wait形式实现生产者消费者模式
*/
public class WaitStyle {
public static void main(String[] args) {
MyBlockingQueue myBlockingQueue = new MyBlockingQueue(10);
Producer producer = new Producer(myBlockingQueue);
Consumer consumer = new Consumer(myBlockingQueue);
new Thread(producer).start();
new Thread(consumer).start();
}
}
class Producer implements Runnable {
private MyBlockingQueue storage;
public Producer(MyBlockingQueue storage) {
this.storage = storage;
}
@Override
public void run() {
for (int i = 0; i &lt; 100; i++) {
try {
storage.put();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable {
private MyBlockingQueue storage;
public Consumer(MyBlockingQueue storage) {
this.storage = storage;
}
@Override
public void run() {
for (int i = 0; i &lt; 100; i++) {
try {
storage.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
</code></pre>
<p>以上就是三种实现生产者消费者模式的讲解,其中,第一种 BlockingQueue 模式实现比较简单,但其背后的实现原理在第二种、第三种实现方法中得以体现,第二种、第三种实现方法本质上是我们自己实现了 BlockingQueue 的一些核心逻辑,供生产者与消费者使用。</p>
</div>
</div>
<div>
<div style="float: left">
<a href="/专栏/Java 并发编程 78 讲-完/04 waitnotifynotifyAll 方法的使用注意事项?.md.html">上一页</a>
</div>
<div style="float: right">
<a href="/专栏/Java 并发编程 78 讲-完/06 一共有哪 3 类线程安全问题?.md.html">下一页</a>
</div>
</div>
</div>
</div>
</div>
</div>
<a class="off-canvas-overlay" onclick="hide_canvas()"></a>
</div>
<script defer src="https://static.cloudflareinsights.com/beacon.min.js/v652eace1692a40cfa3763df669d7439c1639079717194" integrity="sha512-Gi7xpJR8tSkrpF7aordPZQlW2DLtzUlZcumS8dMQjwDHEnw9I7ZLyiOj/6tZStRBGtGgN6ceN6cMH8z7etPGlw==" data-cf-beacon='{"rayId":"70997084e9043d60","version":"2021.12.0","r":1,"token":"1f5d475227ce4f0089a7cff1ab17c0f5","si":100}' crossorigin="anonymous"></script>
</body>
<!-- Global site tag (gtag.js) - Google Analytics -->
<script async src="https://www.googletagmanager.com/gtag/js?id=G-NPSEEVD756"></script>
<script>
window.dataLayer = window.dataLayer || [];
function gtag() {
dataLayer.push(arguments);
}
gtag('js', new Date());
gtag('config', 'G-NPSEEVD756');
var path = window.location.pathname
var cookie = getCookie("lastPath");
console.log(path)
if (path.replace("/", "") === "") {
if (cookie.replace("/", "") !== "") {
console.log(cookie)
document.getElementById("tip").innerHTML = "<a href='" + cookie + "'>跳转到上次进度</a>"
}
} else {
setCookie("lastPath", path)
}
function setCookie(cname, cvalue) {
var d = new Date();
d.setTime(d.getTime() + (180 * 24 * 60 * 60 * 1000));
var expires = "expires=" + d.toGMTString();
document.cookie = cname + "=" + cvalue + "; " + expires + ";path = /";
}
function getCookie(cname) {
var name = cname + "=";
var ca = document.cookie.split(';');
for (var i = 0; i < ca.length; i++) {
var c = ca[i].trim();
if (c.indexOf(name) === 0) return c.substring(name.length, c.length);
}
return "";
}
</script>
</html>