尚硅谷JUC详细面试笔记
JMM
JMM是指Java内存模型 ,不是Java内存布局 ,不是所谓的栈、堆、方法区。 每个Java线程都有自己的工作内存 。操作数据,首先从主内存中读,得到一份拷贝,操作完毕后再写回到主内存。
JMM可能带来可见性 、原子性 和有序性 问题。所谓可见性,就是某个线程对主内存内容的更改,应该立刻通知到其它线程。原子性是指一个操作是不可分割的,不能执行到一半,就不执行了。所谓有序性,就是指令是有序的,不会被重排。
volatile关键字
码云源码 volatile
关键字是Java提供的一种轻量级 同步机制。它能够保证可见性 和有序性 ,但是不能保证原子性 。
可见性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 class MyData { int number=0 ; AtomicInteger atomicInteger=new AtomicInteger(); public void setTo60 () { this .number=60 ; } public void addPlusPlus () { number++; } public void addAtomic () { atomicInteger.getAndIncrement(); } } private static void volatileVisibilityDemo () { System.out.println("可见性测试" ); MyData myData=new MyData(); new Thread(()->{ System.out.println(Thread.currentThread().getName()+"\t come in" ); try {TimeUnit.SECONDS.sleep(3 );myData.setTo60(); System.out.println(Thread.currentThread().getName()+"\t update number value: " +myData.number);}catch (InterruptedException e){e.printStackTrace();} },"AAA" ).start(); while (myData.number==0 ){ } System.out.println(Thread.currentThread().getName()+"\t mission is over. main get number value: " +myData.number); }
MyData
类是资源类,一开始number变量没有用volatile修饰,所以程序运行的结果是:
1 2 3 可见性测试 AAA come in AAA update number value: 60
虽然一个线程把number修改成了60,但是main线程持有的仍然是最开始的0,所以一直循环,程序不会结束。
如果对number添加了volatile修饰,运行结果是:
1 2 3 AAA come in AAA update number value: 60 main mission is over. main get number value: 60
可见某个线程对number的修改,会立刻反映到主内存上。
原子性
volatile并不能保证操作的原子性 。这是因为,比如一条number++的操作,会形成3条指令。
1 2 3 4 getfield //读 iconst_1 //++常量1 iadd //加操作 putfield //写操作
假设有3个线程,分别执行number++,都先从主内存中拿到最开始的值,number=0,然后三个线程分别进行操作。假设线程0执行完毕,number=1,也立刻通知到了其它线程,但是此时线程1、2已经拿到了number=0,所以结果就是写覆盖,线程1、2将number变成1。
解决的方式就是:
对addPlusPlus()
方法加锁。
使用java.util.concurrent.AtomicInteger
类。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private static void atomicDemo () { System.out.println("原子性测试" ); MyData myData=new MyData(); for (int i = 1 ; i <= 20 ; i++) { new Thread(()->{ for (int j = 0 ; j <1000 ; j++) { myData.addPlusPlus(); myData.addAtomic(); } },String.valueOf(i)).start(); } while (Thread.activeCount()>2 ){ Thread.yield(); } System.out.println(Thread.currentThread().getName()+"\t int type finally number value: " +myData.number); System.out.println(Thread.currentThread().getName()+"\t AtomicInteger type finally number value: " +myData.atomicInteger); }
结果:可见,由于volatile
不能保证原子性,出现了线程重复写的问题,最终结果比20000小。而AtomicInteger
可以保证原子性。
1 2 3 原子性测试 main int type finally number value: 17542 main AtomicInteger type finally number value: 20000
有序性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class ResortSeqDemo { int a=0 ; boolean flag=false ; public void method01 () { a=1 ; flag=true ; } public void method02 () { if (flag){ a+=5 ; System.out.println("*****retValue: " +a); } } }
volatile可以保证有序性 ,也就是防止指令重排序 。所谓指令重排序,就是出于优化考虑,CPU执行指令的顺序跟程序员自己编写的顺序不一致。就好比一份试卷,题号是老师规定的,是程序员规定的,但是考生(CPU)可以先做选择,也可以先做填空。
1 2 3 4 int x = 11 ; int y = 12 ; x = x + 5 ; y = x * x;
以上例子,可能出现的执行顺序有1234、2134、1342,这三个都没有问题,最终结果都是x = 16,y=256。但是如果是4开头,就有问题了,y=0。这个时候就不需要 指令重排序。
volatile底层是用CPU的内存屏障 (Memory Barrier)指令来实现的,有两个作用,一个是保证特定操作的顺序性,二是保证变量的可见性。在指令之间插入一条Memory Barrier指令,告诉编译器和CPU,在Memory Barrier指令之间的指令不能被重排序。
哪些地方用到过volatile?
单例模式的安全问题
常见的DCL(Double Check Lock)模式虽然加了同步,但是在多线程下依然会有线程安全问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public class SingletonDemo { private static SingletonDemo singletonDemo=null ; private SingletonDemo () { System.out.println(Thread.currentThread().getName()+"\t 我是构造方法" ); } public static SingletonDemo getInstance () { if (singletonDemo==null ){ synchronized (SingletonDemo.class ) { if (singletonDemo==null ){ singletonDemo=new SingletonDemo(); } } } return singletonDemo; } public static void main (String[] args) { for (int i = 0 ; i < 10 ; i++) { new Thread(()->{ SingletonDemo.getInstance(); },String.valueOf(i+1 )).start(); } } }
这个漏洞比较tricky,很难捕捉,但是是存在的。instance=new SingletonDemo();
可以大致分为三步
1 2 3 memory = allocate(); instance(memory); instance = memory;
其中2、3没有数据依赖关系,可能发生重排 。如果发生,此时内存已经分配,那么instance=memory
不为null。如果此时线程挂起,instance(memory)
还未执行,对象还未初始化。由于instance!=null
,所以两次判断都跳过,最后返回的instance
没有任何内容,还没初始化。
解决的方法就是对singletondemo
对象添加上volatile
关键字,禁止指令重排。
CAS
码云源码 CAS是指Compare And Swap ,比较并交换 ,是一种很重要的同步思想。如果主内存的值跟期望值一样,那么就进行修改,否则一直重试,直到一致为止。
1 2 3 4 5 6 7 8 public class CASDemo { public static void main (String[] args) { AtomicInteger atomicInteger=new AtomicInteger(5 ); System.out.println(atomicInteger.compareAndSet(5 , 2019 )+"\t current data : " + atomicInteger.get()); System.out.println(atomicInteger.compareAndSet(5 , 1024 )+"\t current data : " + atomicInteger.get()); } }
第一次修改,期望值为5,主内存也为5,修改成功,为2019。第二次修改,期望值为5,主内存为2019,修改失败。
查看AtomicInteger.getAndIncrement()
方法,发现其没有加synchronized
也实现了同步 。这是为什么?
CAS底层原理
AtomicInteger
内部维护了volatile int value
和private static final Unsafe unsafe
两个比较重要的参数。
1 2 3 public final int getAndIncrement () { return unsafe.getAndAddInt(this ,valueOffset,1 ); }
AtomicInteger.getAndIncrement()
调用了Unsafe.getAndAddInt()
方法。Unsafe
类的大部分方法都是native
的,用来像C语言一样从底层操作内存。
1 2 3 4 5 6 7 public final int getAnddAddInt (Object var1,long var2,int var4) { int var5; do { var5 = this .getIntVolatile(var1, var2); } while (!this .compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; }
这个方法的var1和var2,就是根据对象 和偏移量 得到在主内存的快照值 var5。然后compareAndSwapInt
方法通过var1和var2得到当前主内存的实际值 。如果这个实际值 跟快照值 相等,那么就更新主内存的值为var5+var4。如果不等,那么就一直循环,一直获取快照,一直对比,直到实际值和快照值相等为止。
比如有A、B两个线程,一开始都从主内存中拷贝了原值为3,A线程执行到var5=this.getIntVolatile
,即var5=3。此时A线程挂起,B修改原值为4,B线程执行完毕,由于加了volatile,所以这个修改是立即可见的。A线程被唤醒,执行this.compareAndSwapInt()
方法,发现这个时候主内存的值不等于快照值3,所以继续循环,重新 从主内存获取。
CAS缺点
CAS实际上是一种自旋锁,
一直循环,开销比较大。
只能保证一个变量的原子操作,多个变量依然要加锁。
引出了ABA问题 。
ABA问题
码云源码 所谓ABA问题,就是比较并交换的循环,存在一个时间差 ,而这个时间差可能带来意想不到的问题。比如线程T1将值从A改为B,然后又从B改为A。线程T2看到的就是A,但是却不知道这个A发生了更改 。尽管线程T2 CAS操作成功,但不代表就没有问题。 有的需求,比如CAS,只注重头和尾 ,只要首尾一致就接受。但是有的需求,还看重过程,中间不能发生任何修改,这就引出了AtomicReference
原子引用。
AtomicReference
AtomicInteger
对整数进行原子操作,如果是一个POJO呢?可以用AtomicReference
来包装这个POJO,使其操作原子化。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 class User { String name; int age; User(String name, int age) { this .name = name; this .age = age; } } public class AtomReferenceDemo { public static void main (String[] args) { User user1 = new User("Jack" , 25 ); User user2 = new User("Lucy" , 21 ); AtomicReference<User> atomicReference = new AtomicReference<>(); atomicReference.set(user1); System.out.println(atomicReference.compareAndSet(user1, user2)); System.out.println(atomicReference.compareAndSet(user1, user2)); } }
AtomicStampedReference和ABA问题的解决
使用AtomicStampedReference
类可以解决ABA问题。这个类维护了一个“版本号 ”Stamp,在进行CAS操作的时候,不仅要比较当前值,还要比较版本号 。只有两者都相等,才执行更新操作。
1 AtomicStampedReference.compareAndSet(expectedReference,newReference,oldStamp,newStamp);
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 public class ABAProblem { static AtomicReference<Integer> atomicReference = new AtomicReference<>(100 ); static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(100 , 1 ); public static void main (String[] args) { System.out.println("======ABA问题的产生======" ); new Thread(() -> { atomicReference.compareAndSet(100 , 101 ); atomicReference.compareAndSet(101 , 100 ); }, "t1" ).start(); new Thread(() -> { try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(atomicReference.compareAndSet(100 , 2019 ) + "\t" + atomicReference.get().toString()); }, "t2" ).start(); try { TimeUnit.SECONDS.sleep(2 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("======ABA问题的解决======" ); new Thread(() -> { int stamp = atomicStampedReference.getStamp(); System.out.println(Thread.currentThread().getName() + "\t第一次版本号: " + stamp); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } atomicStampedReference.compareAndSet(100 , 101 , atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1 ); System.out.println(Thread.currentThread().getName() + "\t第二次版本号: " + atomicStampedReference.getStamp()); atomicStampedReference.compareAndSet(101 , 100 , atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1 ); System.out.println(Thread.currentThread().getName() + "\t第三次版本号: " + atomicStampedReference.getStamp()); }, "t3" ).start(); new Thread(() -> { int stamp = atomicStampedReference.getStamp(); System.out.println(Thread.currentThread().getName() + "\t第一次版本号: " + stamp); try { TimeUnit.SECONDS.sleep(3 ); } catch (InterruptedException e) { e.printStackTrace(); } boolean result = atomicStampedReference.compareAndSet(100 , 2019 , stamp, stamp + 1 ); System.out.println(Thread.currentThread().getName() + "\t修改成功与否:" + result + " 当前最新版本号" + atomicStampedReference.getStamp()); System.out.println(Thread.currentThread().getName() + "\t当前实际值:" + atomicStampedReference.getReference()); }, "t4" ).start(); } }
集合类不安全问题
码云源码
List
ArrayList
不是线程安全类,在多线程同时写的情况下,会抛出java.util.ConcurrentModificationException
异常。
1 2 3 4 5 6 7 8 9 private static void listNotSafe () { List<String> list=new ArrayList<>(); for (int i = 1 ; i <= 30 ; i++) { new Thread(() -> { list.add(UUID.randomUUID().toString().substring(0 , 8 )); System.out.println(Thread.currentThread().getName() + "\t" + list); }, String.valueOf(i)).start(); } }
解决方法 :
使用Vector
(ArrayList
所有方法加synchronized
,太重)。
使用Collections.synchronizedList()
转换成线程安全类。
使用java.concurrent.CopyOnWriteArrayList
(推荐)。
CopyOnWriteArrayList
这是JUC的类,通过写时复制 来实现读写分离 。比如其add()
方法,就是先复制 一个新数组,长度为原数组长度+1,然后将新数组最后一个元素设为添加的元素。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public boolean add (E e) { final ReentrantLock lock = this .lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len + 1 ); newElements[len] = e; setArray(newElements); return true ; } finally { lock.unlock(); } }
Set
跟List类似,HashSet
和TreeSet
都不是线程安全的,与之对应的有CopyOnWriteSet
这个线程安全类。这个类底层维护了一个CopyOnWriteArrayList
数组。
1 2 3 4 private final CopyOnWriteArrayList<E> al;public CopyOnWriteArraySet () { al = new CopyOnWriteArrayList<E>(); }
HashSet和HashMap
HashSet
底层是用HashMap
实现的。既然是用HashMap
实现的,那HashMap.put()
需要传两个参数 ,而HashSet.add()
只传一个参数 ,这是为什么?实际上HashSet.add()
就是调用的HashMap.put()
,只不过Value 被写死了,是一个private static final Object
对象。
Map
HashMap
不是线程安全的,Hashtable
是线程安全的,但是跟Vector
类似,太重量级。所以也有类似CopyOnWriteMap,只不过叫ConcurrentHashMap
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 package concurrent.notSafeContainer;import java.util.ArrayList;import java.util.List;import java.util.Map;import java.util.UUID;import java.util.concurrent.ConcurrentHashMap;public class ContainerThreadConcurrentException { public static void main (String[] args) throws InterruptedException { listNotSafe(); mapNotSafe(); } private static void mapNotSafe () { Map<String, String> map = new ConcurrentHashMap<>(); for (int i = 0 ; i < 100 ; i++) { new Thread(() -> { map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0 , 8 )); System.out.println(map); }, String.valueOf(i)).start(); } } private static void listNotSafe () { List<String> list = new ArrayList<>(); for (int i = 0 ; i < 300 ; i++) { new Thread(() -> { list.add(UUID.randomUUID().toString().substring(0 , 8 )); for (String x : list) { System.out.print(x + " " ); } System.out.println(); }, String.valueOf(i)).start(); } } }
Java锁
码云源码
公平锁/非公平锁
概念 :所谓公平锁 ,就是多个线程按照申请锁的顺序 来获取锁,类似排队,先到先得。而非公平锁 ,则是多个线程抢夺锁,会导致优先级反转 或饥饿现象 。
区别 :公平锁在获取锁时先查看此锁维护的等待队列 ,为空 或者当前线程是等待队列的队首 ,则直接占有锁,否则插入到等待队列,FIFO原则。非公平锁比较粗鲁,上来直接先尝试占有锁 ,失败则采用公平锁方式。非公平锁的优点是吞吐量 比公平锁更大。
synchronized
和juc.ReentrantLock
默认都是非公平锁 。ReentrantLock
在构造的时候传入true
则是公平锁 。
可重入锁/递归锁
可重入锁又叫递归锁,指的同一个线程在外层方法 获得锁时,进入内层方法 会自动获取锁。也就是说,线程可以进入任何一个它已经拥有锁的代码块。比如get
方法里面有set
方法,两个方法都有同一把锁,得到了get
的锁,就自动得到了set
的锁。
就像有了家门的锁,厕所、书房、厨房就为你敞开了一样。可重入锁可以避免死锁 的问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 package concurrent.reentrantlock;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;public class ReentrantLockDemo { public static void main (String[] args) { Phone phone=new Phone(); syncTest(phone); System.out.println(); Thread t3=new Thread(phone); Thread t4=new Thread(phone); t3.start(); t4.start(); } private static void syncTest (Phone phone) { new Thread(()->{ try { phone.sendSMS(); }catch (Exception e){ e.printStackTrace(); } },"t1" ).start(); new Thread(()->{ try { phone.sendSMS(); }catch (Exception e){ e.printStackTrace(); } },"t2" ).start(); } } class Phone implements Runnable { public synchronized void sendSMS () { System.out.println(Thread.currentThread().getId()+"\t" +"sendSMS()" ); sendEmail(); } public synchronized void sendEmail () { System.out.println(Thread.currentThread().getId()+"\t" +"sendEmail()" ); } Lock lock=new ReentrantLock(); @Override public void run () { get(); } public void get () { lock.lock(); try { System.out.println(Thread.currentThread().getId()+"\t" +"get()" ); set(); }finally { lock.unlock(); } } public void set () { lock.lock(); try { System.out.println(Thread.currentThread().getId()+"\t" +"set()" ); }finally { lock.unlock(); } } }
锁的配对
锁之间要配对,加了几把锁,最后就得解开几把锁,下面的代码编译和运行都没有任何问题。但锁的数量不匹配会导致死循环。
1 2 3 4 5 6 7 lock.lock(); lock.lock(); try { someAction(); }finally { lock.unlock(); }
自旋锁
所谓自旋锁,就是尝试获取锁的线程不会立即阻塞 ,而是采用循环的方式去尝试获取 。自己在那儿一直循环获取,就像“自旋 ”一样。这样的好处是减少线程切换的上下文开销 ,缺点是会消耗CPU 。CAS底层的getAndAddInt
就是自旋锁 思想。
1 2 while (!atomicReference.compareAndSet(null , thread)) { }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 package concurrent.reentrantlock;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicReference;public class SpinLockDemo { AtomicReference<Thread> atomicReference = new AtomicReference<>(); public static void main (String[] args) { SpinLockDemo spinLockDemo = new SpinLockDemo(); new Thread(() -> { spinLockDemo.myLock(); try { TimeUnit.SECONDS.sleep(5 ); } catch (Exception e) { e.printStackTrace(); } spinLockDemo.myUnlock(); }, "AA" ).start(); try { TimeUnit.SECONDS.sleep(1 ); } catch (Exception e) { e.printStackTrace(); } new Thread(() -> { spinLockDemo.myLock(); try { TimeUnit.SECONDS.sleep(1 ); } catch (Exception e) { e.printStackTrace(); } spinLockDemo.myUnlock(); }, "BB" ).start(); } public void myLock () { Thread thread = Thread.currentThread(); System.out.println(Thread.currentThread().getName() + "\t" + " come in ..." ); while (!atomicReference.compareAndSet(null , thread)) { } } public void myUnlock () { Thread thread = Thread.currentThread(); atomicReference.compareAndSet(thread, null ); System.out.println(Thread.currentThread().getName() + "\t" + " unlock ..." ); } }
读写锁/独占/共享锁
读锁 是共享的 ,写锁 是独占的 。juc.ReentrantLock
和synchronized
都是独占锁 ,独占锁就是一个锁 只能被一个线程 所持有。有的时候,需要读写分离 ,那么就要引入读写锁,即juc.ReentrantReadWriteLock
。
比如缓存,就需要读写锁来控制。缓存就是一个键值对,以下Demo模拟了缓存的读写操作,读的get
方法使用了ReentrantReadWriteLock.ReadLock()
,写的put
方法使用了ReentrantReadWriteLock.WriteLock()
。这样避免了写被打断,实现了多个线程同时读。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 package concurrent.sortsOfLock;import java.util.HashMap;import java.util.Map;import java.util.concurrent.locks.ReentrantReadWriteLock;class MyCache { Map<String,Object> map = new HashMap<>(); ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); public void put (String key,Object value) { rwLock.writeLock().lock(); System.out.println(Thread.currentThread().getName()+"\t进行写入操作\t" +key); map.put(key,value); System.out.println(Thread.currentThread().getName()+"\t写入完成\t" +key); rwLock.writeLock().unlock(); } public void get (String key) { rwLock.readLock().lock(); System.out.println(Thread.currentThread().getName()+"\t进行读取操作\t" +key); map.get(key); System.out.println(Thread.currentThread().getName()+"\t读取完成\t" +key+"\t" ); rwLock.readLock().unlock(); } } public class ReadWriteLockDemo { public static void main (String[] args) throws InterruptedException { MyCache myCache = new MyCache(); for (int i=0 ;i<5 ;i++){ final int tmp = i; new Thread(()->{ myCache.put(tmp+"" , tmp+"" ); },String.valueOf(i)).start(); } Thread.sleep(3000 ); for (int i=0 ;i<5 ;i++){ final int tmp = i; new Thread(()->{ myCache.get(tmp+"" ); },String.valueOf(i)).start(); } } }
Synchronized和Lock的区别
synchronized
关键字和java.util.concurrent.locks.Lock
都能加锁,两者有什么区别呢?
原始构成 :sync
是JVM层面的,底层通过monitorenter
和monitorexit
来实现的。Lock
是JDK API层面的。(sync
一个enter会有两个exit,一个是正常退出,一个是异常退出)
使用方法 :sync
不需要手动释放锁,而Lock
需要手动释放。
是否可中断 :sync
不可中断,除非抛出异常或者正常运行完成。Lock
是可中断的,通过调用interrupt()
方法。
是否为公平锁 :sync
只能是非公平锁,而Lock
既能是公平锁,又能是非公平锁。
绑定多个条件 :sync
不能,只能随机唤醒。而Lock
可以通过Condition
来绑定多个条件,精确唤醒。
CountDownLatch/CyclicBarrier/Semaphore
码云源码
CountDownLatch
CountDownLatch
内部维护了一个计数器 ,只有当计数器==0 时,某些线程才会停止阻塞,开始执行。
CountDownLatch
主要有两个方法,countDown()
来让计数器-1,await()
来让线程阻塞。当count==0
时,阻塞线程自动唤醒。
案例一班长关门 :main线程是班长,6个线程是学生。只有6个线程运行完毕,都离开教室后,main线程班长才会关教室门。
案例二秦灭六国 :只有6国都被灭亡后(执行完毕),main线程才会显示“秦国一统天下”。
枚举类的使用
在案例二 中会使用到枚举类,因为灭六国,循环6次,想根据i
的值来确定输出什么国,比如1代表楚国,2代表赵国。如果用判断则十分繁杂,而枚举类可以简化操作。
枚举类就像一个简化的数据库 ,枚举类名就像数据库名,枚举的项目就像数据表,枚举的属性就像表的字段。
关于CountDownLatch
和枚举类的使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 package concurrent.countDownLatch;import java.util.concurrent.CountDownLatch;public class CountDownLatchDemo { public static void main (String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(6 ); for (int i=1 ;i<=6 ;i++){ new Thread(()->{ System.out.println(Thread.currentThread().getName()+"\t国被灭" ); countDownLatch.countDown(); },CountryEnum.foreach_CountryEnum(i).getRetValue()).start(); } countDownLatch.await(); System.out.println(Thread.currentThread().getName()+"\t秦国统一" ); } }
枚举类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 package concurrent.countDownLatch;public enum CountryEnum { ONE(1 , "齐" ), TWO(2 , "楚" ), THREE(3 , "燕" ), FOUR(4 , "赵" ), FIVE(5 , "魏" ), SIX(6 , "韩" ); Integer retCode; String retValue; public Integer getRetCode () { return retCode; } public void setRetCode (Integer retCode) { this .retCode = retCode; } public String getRetValue () { return retValue; } public void setRetValue (String retValue) { this .retValue = retValue; } CountryEnum(Integer retCode, String retValue) { this .retCode = retCode; this .retValue = retValue; } public static CountryEnum foreach_CountryEnum (int index) { CountryEnum[] countryEnums = CountryEnum.values(); for (CountryEnum element : countryEnums) { if (element.retCode == index) { return element; } } return null ; } }
CyclicBarrier
CountDownLatch
是减,而CyclicBarrier
是加,理解了CountDownLatch
,CyclicBarrier
就很容易。比如召集7颗龙珠才能召唤神龙,详见
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 package concurrent.cyclicBarrier;import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;public class CyclicBarrierDemo { public static void main (String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(7 , () -> { System.out.println("----龙珠集齐,召唤神龙" ); }); for (int i = 0 ; i < 7 ; i++) { final int tmp = i; new Thread(() -> { System.out.println(Thread.currentThread().getName() + "\t收集到第" + tmp + "颗龙珠" ); try { cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } },String.valueOf(i)).start(); } } }
Semaphore
CountDownLatch
的问题是不能复用 。比如count=3
,那么加到3,就不能继续操作了。而Semaphore
可以解决这个问题,比如6辆车3个停车位,对于CountDownLatch
只能停3辆车 ,而Semaphore
可以停6辆车,车位空出来后,其它车可以占有,这就涉及到了Semaphore.accquire()
和Semaphore.release()
方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 package concurrent.semaphore;import java.util.concurrent.Semaphore;import java.util.concurrent.TimeUnit;public class SemaphoreDemo { public static void main (String[] args) { Semaphore semaphore = new Semaphore(3 ); for (int i = 0 ; i < 6 ; i++) { new Thread(() -> { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + "\t抢到车位" ); TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"\t1秒后离开车位" ); semaphore.release(); },String.valueOf(i)).start(); } } }
阻塞队列
码云源码 概念 :当阻塞队列为空时,获取(take)操作是阻塞的;当阻塞队列为满时,添加(put)操作是阻塞的。
好处 :阻塞队列不用手动控制什么时候该被阻塞,什么时候该被唤醒,简化了操作。
体系 :Collection
→Queue
→BlockingQueue
→七个阻塞队列实现类。
类名
作用
ArrayBlockingQueue
由数组 构成的有界 阻塞队列
LinkedBlockingQueue
由链表 构成的有界 阻塞队列
PriorityBlockingQueue
支持优先级排序的无界阻塞队列
DelayQueue
支持优先级的延迟无界阻塞队列
SynchronousQueue
单个元素的阻塞队列
LinkedTransferQueue
由链表构成的无界阻塞队列
LinkedBlockingDeque
由链表构成的双向阻塞队列
粗体标记的三个用得比较多,许多消息中间件底层就是用它们实现的。
需要注意的是LinkedBlockingQueue
虽然是有界的,但有个巨坑,其默认大小是Integer.MAX_VALUE
,高达21亿,一般情况下内存早爆了(在线程池的ThreadPoolExecutor
有体现)。
API :抛出异常是指当队列满时,再次插入会抛出异常;返回布尔是指当队列满时,再次插入会返回false;阻塞是指当队列满时,再次插入会被阻塞,直到队列取出一个元素,才能插入。超时是指当一个时限过后,才会插入或者取出。API使用见
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 package concurrent.blockingqueue;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;import java.util.concurrent.TimeUnit;public class BlockingQueueApi { public static void main (String[] args) throws InterruptedException { BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(3 ); addAndRemove(blockingQueue); offerAndPoll(blockingQueue); putAndTake(blockingQueue); outOfTime(blockingQueue); } private static void outOfTime (BlockingQueue<String> blockingQueue) throws InterruptedException { System.out.println(blockingQueue.offer("a" ,2L , TimeUnit.SECONDS)); System.out.println(blockingQueue.offer("a" ,2L , TimeUnit.SECONDS)); System.out.println(blockingQueue.offer("a" ,2L , TimeUnit.SECONDS)); System.out.println(blockingQueue.offer("a" ,2L , TimeUnit.SECONDS)); } private static void putAndTake (BlockingQueue<String> blockingQueue) throws InterruptedException { blockingQueue.put("a" ); blockingQueue.put("b" ); blockingQueue.put("c" ); blockingQueue.put("d" ); System.out.println(blockingQueue.take()); System.out.println(blockingQueue.take()); System.out.println(blockingQueue.take()); System.out.println(blockingQueue.take()); } private static void offerAndPoll (BlockingQueue<String> blockingQueue) { System.out.println(blockingQueue.offer("a" )); System.out.println(blockingQueue.offer("b" )); System.out.println(blockingQueue.offer("c" )); System.out.println(blockingQueue.offer("e" )); System.out.println(blockingQueue.peek()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); } private static void addAndRemove (BlockingQueue<String> blockingQueue) { System.out.println(blockingQueue.add("a" )); System.out.println(blockingQueue.add("b" )); System.out.println(blockingQueue.add("c" )); System.out.println(blockingQueue.add("e" )); System.out.println(blockingQueue.element()); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); } }
方法类型
抛出异常
返回布尔
阻塞
超时
插入
add(E e)
offer(E e)
put(E e)
offer(E e,Time,TimeUnit)
取出
remove()
poll()
take()
poll(Time,TimeUnit)
队首
element()
peek()
无
无
SynchronousQueue
队列只有一个元素,如果想插入多个,必须等队列元素取出后,才能插入,只能有一个“坑位”,用一个插一个,详见SynchronousQueueDemo
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 package concurrent.blockingqueue;import java.util.concurrent.BlockingQueue;import java.util.concurrent.SynchronousQueue;import java.util.concurrent.TimeUnit;public class SynchronousQueueDemo { public static void main (String[] args) { BlockingQueue<String> blockingQueue = new SynchronousQueue<String>(); new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + "\t put 1" ); blockingQueue.put("1" ); System.out.println(Thread.currentThread().getName() + "\t put 2" ); blockingQueue.put("2" ); System.out.println(Thread.currentThread().getName() + "\t put 3" ); blockingQueue.put("3" ); } catch (InterruptedException e) { e.printStackTrace(); } }, "AAA" ).start(); new Thread(() -> { try { try { TimeUnit.SECONDS.sleep(5 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "\t take " + blockingQueue.take()); try { TimeUnit.SECONDS.sleep(5 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "\t take " + blockingQueue.take()); try { TimeUnit.SECONDS.sleep(5 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "\t take" + blockingQueue.take()); } catch (Exception e) { e.printStackTrace(); } }, "BBB" ).start(); } }
Callable接口
与Runnable的区别 :
Callable带返回值。
会抛出异常。
覆写call()
方法,而不是run()
方法。
Callable接口的使用 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class CallableDemo { class MyThread implements Callable <Integer > { @Override public Integer call () throws Exception { System.out.println("callable come in ..." ); return 1024 ; } } public static void main (String[] args) throws ExecutionException, InterruptedException { FutureTask<Integer> futureTask = new FutureTask<>(new MyThread()); new Thread(futureTask, "AA" ).start(); int result01 = 100 ; int result02 = futureTask.get(); System.out.println("result=" + (result01 + result02)); } }
阻塞队列的应用——生产者消费者
码云源码
传统模式
传统模式使用Lock
来进行操作,需要手动加锁、解锁。详见ProdConsTradiDemo
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 package concurrent.blockingqueue;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;public class ProdConsTradiDemo { public static void main (String[] args) { ShareData shareData = new ShareData(); new Thread(() -> { for (int i = 0 ; i < 5 ; i++) { try { shareData.increment(); } catch (Exception e) { e.printStackTrace(); } } }, "Producer" ).start(); new Thread(() -> { for (int i = 0 ; i < 5 ; i++) { try { shareData.decrement(); } catch (Exception e) { e.printStackTrace(); } } }, "Consumer" ).start(); } } class ShareData { private int number = 0 ; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public void increment () throws InterruptedException { lock.lock(); try { while (number != 0 ) { condition.await(); } number++; System.out.println(Thread.currentThread().getName() + "\t" + number); condition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void decrement () throws InterruptedException { lock.lock(); try { while (number == 0 ) { condition.await(); } number--; System.out.println(Thread.currentThread().getName() + "\t" + number); condition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }
阻塞队列模式
使用阻塞队列就不需要手动加锁了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 package concurrent.blockingqueue;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger;class MyResource { private volatile boolean FLAG = true ; private AtomicInteger atomicInteger = new AtomicInteger(); BlockingQueue<String> blockingQueue = null ; public MyResource (BlockingQueue<String> blockingQueue) { this .blockingQueue = blockingQueue; System.out.println(blockingQueue.getClass().getName()); } public void myProd () throws Exception { String data = null ; boolean retValue; while (FLAG) { data = atomicInteger.incrementAndGet() + "" ; retValue = blockingQueue.offer(data, 2L , TimeUnit.SECONDS); if (retValue) { System.out.println(Thread.currentThread().getName() + "\t 插入队列:" + data + "成功" ); } else { System.out.println(Thread.currentThread().getName() + "\t 插入队列:" + data + "失败" ); } try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + "\t 停止生产,表示FLAG=false,生产介绍" ); } public void myConsumer () throws Exception { String retValue; while (FLAG) { retValue = blockingQueue.poll(2L , TimeUnit.SECONDS); if (retValue != null && retValue != "" ) { System.out.println(Thread.currentThread().getName() + "\t 消费队列:" + retValue + "成功" ); } else { FLAG = false ; System.out.println(Thread.currentThread().getName() + "\t 消费失败,队列中已为空,退出" ); return ; } } } public void stop () { this .FLAG = false ; } } public class ProdConsumerBlockingQueueDemo { public static void main (String[] args) { MyResource myResource = new MyResource(new ArrayBlockingQueue<String>(10 )); new Thread(() -> { System.out.println(Thread.currentThread().getName() + "\t 生产线程启动" ); System.out.println("" ); System.out.println("" ); try { myResource.myProd(); System.out.println("" ); System.out.println("" ); } catch (Exception e) { e.printStackTrace(); } }, "prod" ).start(); new Thread(() -> { System.out.println(Thread.currentThread().getName() + "\t 消费线程启动" ); try { myResource.myConsumer(); } catch (Exception e) { e.printStackTrace(); } }, "consumer" ).start(); try { TimeUnit.SECONDS.sleep(5 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("5秒中后,生产和消费线程停止,线程结束" ); myResource.stop(); } }
阻塞队列的应用——线程池
码云源码
线程池基本概念
概念 :线程池主要是控制运行线程的数量,将待处理任务放到等待队列,然后创建线程执行这些任务。如果超过了最大线程数,则等待。
优点 :
线程复用:不用一直new新线程,重复利用已经创建的线程来降低线程的创建和销毁开销,节省系统资源。
提高响应速度:当任务达到时,不用创建新的线程,直接利用线程池的线程。
管理线程:可以控制最大并发数,控制线程的创建等。
体系 :Executor
→ExecutorService
→AbstractExecutorService
→ThreadPoolExecutor
。ThreadPoolExecutor
是线程池创建的核心类。类似Arrays
、Collections
工具类,Executor
也有自己的工具类Executors
。
线程池三种常用创建方式
newFixedThreadPool :使用LinkedBlockingQueue
实现,定长线程池。
1 2 3 4 5 public static ExecutorService newFixedThreadPool (int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
newSingleThreadExecutor :使用LinkedBlockingQueue
实现,一池只有一个线程。
1 2 3 4 5 public static ExecutorService newSingleThreadExecutor () { return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
newCachedThreadPool :使用SynchronousQueue
实现,变长线程池。
1 2 3 4 5 public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor(0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
线程池创建的七个参数
参数
意义
corePoolSize
线程池常驻核心线程数
maximumPoolSize
能够容纳的最大线程数
keepAliveTime
空闲线程存活时间
unit
存活时间单位
workQueue
存放提交但未执行任务的队列
threadFactory
创建线程的工厂类
handler
等待队列满后的拒绝策略
理解 :线程池的创建参数,就像一个银行 。
corePoolSize
就像银行的“当值窗口 “,比如今天有2位柜员 在受理客户请求 (任务)。如果超过2个客户,那么新的客户就会在等候区 (等待队列workQueue
)等待。当等候区 也满了,这个时候就要开启“加班窗口 ”,让其它3位柜员来加班,此时达到最大窗口 maximumPoolSize
,为5个。如果开启了所有窗口,等候区依然满员,此时就应该启动”拒绝策略 “handler
,告诉不断涌入的客户,叫他们不要进入,已经爆满了。由于不再涌入新客户,办完事的客户增多,窗口开始空闲,这个时候就通过keepAlivetTime
将多余的3个”加班窗口“取消,恢复到2个”当值窗口“。
线程池底层原理
原理图 :上面银行的例子,实际上就是线程池的工作原理。
流程图 :
新任务到达→
如果正在运行的线程数小于corePoolSize
,创建核心线程;大于等于corePoolSize
,放入等待队列。
如果等待队列已满,但正在运行的线程数小于maximumPoolSize
,创建非核心线程;大于等于maximumPoolSize
,启动拒绝策略。
当一个线程无事可做一段时间keepAliveTime
后,如果正在运行的线程数大于corePoolSize
,则关闭非核心线程。
线程池的拒绝策略
当等待队列满时,且达到最大线程数,再有新任务到来,就需要启动拒绝策略。JDK提供了四种拒绝策略,分别是。
AbortPolicy :默认的策略,直接抛出RejectedExecutionException
异常,阻止系统正常运行。
CallerRunsPolicy :既不会抛出异常,也不会终止任务,而是将任务返回给调用者。
DiscardOldestPolicy :抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交任务。
DiscardPolicy :直接丢弃任务,不做任何处理。
实际生产使用哪一个线程池?
单一、可变、定长都不用 !原因就是FixedThreadPool
和SingleThreadExecutor
底层都是用LinkedBlockingQueue
实现的,这个队列最大长度为Integer.MAX_VALUE
,显然会导致OOM。所以实际生产一般自己通过ThreadPoolExecutor
的7个参数,自定义线程池。
1 2 3 4 5 ExecutorService threadPool=new ThreadPoolExecutor(2 ,5 , 1L ,TimeUnit.SECONDS, new LinkedBlockingQueue<>(3 ), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
自定义线程池参数选择
对于CPU密集型任务,最大线程数是CPU线程数+1。对于IO密集型任务,尽量多配点,可以是CPU线程数*2,或者CPU线程数/(1-阻塞系数)。
死锁编码和定位
主要是两个命令配合起来使用,定位死锁。
jps 指令:jps -l
可以查看运行的Java进程。
1 2 9688 thread.DeadLockDemo12177 sun.tools.jps.Jps
jstack 指令:jstack pid
可以查看某个Java进程的堆栈信息,同时分析出死锁。
1 2 3 4 5 6 7 8 9 10 11 12 ===================== "Thread AAA" : at xxxxx - waiting to lock <0x000111 > - locked <0x000222 > at java.lang.Thread.run "Thread BBB" : at xxxxx - waiting to lock <0x000222 > - locked <0x000111 > at java.lang.Thread.run Found 1 deadlock.
码云源码
2020年11月7日更