Java基础
List
集合中线程安全的方案:
- Collections.synchronizedList(arrayList);
- Vector
- CopyOnWriteArrayList
Collections.synchronizedList
Collections.synchronizedList 使用了装饰者模式,将原本的List包装了一层,在调用List原本的方法时,通过多态进行加锁操作,这里我们注意:读写操作加锁,获取迭代器不加锁
public boolean add(E e) {
synchronized (mutex) {return c.add(e);}
}
public E get(int index) {
synchronized (mutex) {return list.get(index);}
}
public Iterator<E> iterator() {
return list.iterator(); // Must be manually synched by user!
}
public ListIterator<E> listIterator() {
return list.listIterator(); // Must be manually synched by user
}
public ListIterator<E> listIterator(int index) {
return list.listIterator(index); // Must be manually synched by user
}
为什么在获取迭代器时不加锁,并且注释了
Must be manually synched by user
,也就是由使用者来保证线程安全
这里我们用代码示例一下:
public void test() {
List<String> list = Collections.synchronizedList(new ArrayList<>());
list.add("A");
list.add("B");
list.add("C");
// 创建两个线程,一个遍历列表,一个修改列表
Thread iteratorThread = new Thread(() -> {
Iterator<String> iter = list.iterator();
while (iter.hasNext()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(iter.next()); // 迭代过程中可能会抛出 ConcurrentModificationException
}
});
Thread updaterThread = new Thread(() -> {
try {
System.out.println(list.size());
Thread.sleep(50); // 确保迭代线程先启动
} catch (InterruptedException e) {
e.printStackTrace();
}
list.remove("B"); // 在迭代进行时删除元素
System.out.println(list.size());
});
iteratorThread.start();
updaterThread.start();
try {
iteratorThread.join();
updaterThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
两个线程对同一个集合进行访问时,如果一个线程中对该集合进行删除或添加操作,另一个线程在通过迭代器访问时,就会抛出异常。所以就需要使用者自己保证迭代器的获取与集合的操作在一个线程中进行。
Vector
代码示例:
public synchronized boolean add(E e) {
modCount++;
ensureCapacityHelper(elementCount + 1);
elementData[elementCount++] = e;
return true;
}
public synchronized Iterator<E> iterator() {
return new Itr();
}
可以看到Vector底层也是类似的通过加锁操作,来保证线程安全问题。但如果我们用上面的测试代码替换为Vector,发现也会报错。
在 Java 集合框架中,迭代器的并发修改检测通常依赖于集合内部的一个计数器(modCount)。当集合结构发生变化时,这个计数器就会增加。如果迭代器发现 modCount 与创建迭代器时的值不同,则会抛出 ConcurrentModificationException
。
CopyOnWriteArrayList
也是一个线程安全版的ArrayList,适用于读多写少的场景,其中所有可变操作(add、set等等)都是通过对底层数组进行一次新的复制来实现的。所以频繁的写是消耗性能的。
代码示例:
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
可以看到这里使用了ReentrantLock
来进行加锁操作,并且每次赋值时,都赋值了一个新的数组。
当我们再通过CopyOnWriteArray来测试时,发现并没有报错,并且一个线程对集合的操作,并不影响另一个线程迭代器遍历数组。但问题就是消耗性能,因为每次都进行拷贝操作,所以也就需要注意内存问题。
Map
HashMap
HashMap存储的是Key,Value.也就是一个个小节点,所以内部封装了Node类来存储值,再通过Node数组来存放一个个Node,这里有意思的就是Node它也是一个链表结构,当对key求Hash值时,如果数组上这个位置已经有值了,则直接绑定到之前加入节点的后面,很方便。既能做到查询快,插入也快。
HashMap中Put到底是怎么走的?
public V put(K key, V value) {
//先对key求一个Hash值
return putVal(hash(key), key, value, false, true);
}
final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
boolean evict) {
//这里初始化一个Node数组,一个Node节点
Node<K,V>[] tab; Node<K,V> p; int n, i;
//判定Node数字有没有初始化过
if ((tab = table) == null || (n = tab.length) == 0)
//如果没有,说明现在的Node数组是一个空数组,需要进行扩容后存放数据
n = (tab = resize()).length;
//通过数组中Hash位置判定,是否之前插入过数据
if ((p = tab[i = (n - 1) & hash]) == null)
//如果没有直接在该节点上插入一个新的Node节点
tab[i] = newNode(hash, key, value, null);
else {
//这里就是如果之前插入过一个节点,那现在就需要将新来的节点和之前的连接
Node<K,V> e; K k;
if (p.hash == hash &&
((k = p.key) == key || (key != null && key.equals(k))))
e = p;
else if (p instanceof TreeNode)
e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
else {
for (int binCount = 0; ; ++binCount) {
if ((e = p.next) == null) {
p.next = newNode(hash, key, value, null);
if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
treeifyBin(tab, hash);
break;
}
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
break;
p = e;
}
}
if (e != null) { // existing mapping for key
V oldValue = e.value;
if (!onlyIfAbsent || oldValue == null)
e.value = value;
afterNodeAccess(e);
return oldValue;
}
}
//统计有多少数据
++modCount;
//判定是否需要扩容
if (++size > threshold)
resize();
afterNodeInsertion(evict);
return null;
}
final Node<K,V>[] resize() {
Node<K,V>[] oldTab = table;
//原来数组的大小
int oldCap = (oldTab == null) ? 0 : oldTab.length;
//老的临界点值
int oldThr = threshold;
int newCap, newThr = 0;
if (oldCap > 0) {
if (oldCap >= MAXIMUM_CAPACITY) {
threshold = Integer.MAX_VALUE;
return oldTab;
}
else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
oldCap >= DEFAULT_INITIAL_CAPACITY)
newThr = oldThr << 1; // double threshold
}
else if (oldThr > 0) // initial capacity was placed in threshold
newCap = oldThr;
//这里一般就是第一次使用时,进行默认初始化 容量:16 边界值:12 加载因子:0.75
else { // zero initial threshold signifies using defaults
newCap = DEFAULT_INITIAL_CAPACITY;
newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
}
if (newThr == 0) {
float ft = (float)newCap * loadFactor;
newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
(int)ft : Integer.MAX_VALUE);
}
threshold = newThr;
@SuppressWarnings({"rawtypes","unchecked"})
// 一般是按照一倍扩容的
Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
table = newTab;
if (oldTab != null) {
for (int j = 0; j < oldCap; ++j) {
Node<K,V> e;
if ((e = oldTab[j]) != null) {
oldTab[j] = null;
if (e.next == null)
newTab[e.hash & (newCap - 1)] = e;
else if (e instanceof TreeNode)
((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
else { // preserve order
Node<K,V> loHead = null, loTail = null;
Node<K,V> hiHead = null, hiTail = null;
Node<K,V> next;
do {
next = e.next;
if ((e.hash & oldCap) == 0) {
if (loTail == null)
loHead = e;
else
loTail.next = e;
loTail = e;
}
else {
if (hiTail == null)
hiHead = e;
else
hiTail.next = e;
hiTail = e;
}
} while ((e = next) != null);
if (loTail != null) {
loTail.next = null;
newTab[j] = loHead;
}
if (hiTail != null) {
hiTail.next = null;
newTab[j + oldCap] = hiHead;
}
}
}
}
}
return newTab;
}
put() ⽅法原理
- 先将Key,Value封装为一个Node。
- 通过HashCode()得出Hash值
- 通过哈希函数将Hash值装换为数组下标
- 如果这个位置没有任何元素,则直接插入Node
- 如果已经有了Node链表,将当前Node的key与链表上每一个Key进行equals比较
- 如果都返回false,则插入链表末尾
- 如果有true,则直接新的Node的value替代原来的。(保证不可重复)
注意:
HashMap
中允许key,value为null,但只能有一个
HashTable
中key和value都不能为空
get() 方法原理
- 通过key的HashCode()方法求出Hash值
- 通过哈希函数将hash值转为数组的下标
- 通过下标定位到数组的某个位置
- 如果没有链表,直接返回Null
- 如果有链表,以此进行匹配每个节点上的key通过equals进行比较
- 如果都为false,则返回null
- 如果为true,则返回值
放在HashMap中的key元素需要同时重写hashCode()和equals()方法
重写HashCode()和equals()方法
一定要分布均匀
如果hashCode()返回的一个值,就会变成单链表
如果hashCode()返回的都不一样值,HashMap就变成数组了
这也称为分布不均匀
equals ,hashCode 需要同时重写,保证一个对象一旦相等,Hash值也应该相同
注意事项:
如果可以预测到容量最好,多设置一些,一方面避免频繁的扩容,一方面也需要考虑到加载因子的边界值
为什么Map需要有加载因子,0.75
- 如果不设置边界值,Map的Hash碰撞概率到后面会越来越高,影响效率
- 0.75是一个普遍值,需要用数学证明
Collection工具类
Collections.sort(List list)
Collections.sort(List list, Compataor cmp)
String存储原理
public static void main(String[] args) {
//这里是直接创建在方法区的字符串常量池
String s = "abc";
//这个是在堆中创建了一个String对象,但值是指向方法区的
String s1 = new String("abc");
//判断这个字符串在常量池是否存在,如果存在直接返回,如果不存在创建返回
String s2 = s1.intern();
System.out.println(s == s1); //false
System.out.println(s1 == s2); //false
System.out.println(s == s2); //true
}
String,StringBuilder,StringBuffer
- String被final修饰,当你修改时是直接创建一个新的在方法区,所以当我们使用时,如果该值经常就修改,最好使用后面两个
- StringBuilder没有final修饰,可变,线程不安全,效率高
- StringBuffer没有final修饰,可变,线程安全,效率低
锁问题
Synchronize(重量级锁)
加在方法上,对方法进行同步,也就是每次只能一个线程进入。默认为非公平锁
- 优点:简单,直接,自动完成加锁,解锁操作
- 缺点:在用户态和内核态进行切换,效率低。
ReenLock(手动锁)
通过创建锁对象,手动的进行加锁,和解锁
- 优点:可以自己设置为公平或非公平锁
非公平锁:假如有三个线程同时来,那最终哪个线程能够进入就看谁快,(不用排队)
公平锁:三个线程进入,依次放入一个队列,(先到先得)
- 缺点:需要自己注意解锁的过程,锁是可重入的,如果少释放一层就会出现死锁现象
CAS(Compare And Set:轻量级锁,乐观锁)
通过
AtomicReference
进行创建自旋锁,每次线程操作前会检查自旋锁的值是否是预想值,如果不是需要进行自旋操作,知道当前值与预期值一致时,才可以操作。
- 优点:不会进入内核态,减少切换的性能消耗
- 缺点:自旋其实就是循环,如果一个线程一直无法进行操作,就会导致CPU空转,也会带来性能消耗
CAS的经典问题:ABA问题 -> 就是说我预期值为1,但是别的线程先改成2了,再一个线程又改成1了,那当我判断时并不知道这个值之前已经被人操作了,我会继续执行我自己的逻辑。那么该如何解决呢?
使用AtomicStampedReference
加上时间戳,通俗的说就是加上一个版本号,不仅要比较当前值,还要比较版本号。只有两者都相等,才执行更新操作。
如果优化CAS?
可在线程空转一定时间后,放入阻塞队列中等待。
LockSupport
- 是什么
- 对阻塞,唤醒的优化机制
- 其实底层使用的是一个permit许可证,取值范围0-1
- 能干嘛
- 线程同步操作
- 去哪下
- JUC包
- 怎么玩
- LockSupport.port相当于需要消耗一个许可证,如果许可证=1,则直接-1返回,不会阻塞,
- LockSupport.unport相当于给出一个许可证,但是许可证上限是1,重复使用unport许可证还是等于1,所以执行两次LockSupport.unport 和 LockSupport.port,会导致最后一个port阻塞
- AB法则
- Object.wait() 和 Object.notify() 有哪些不足
- wait 和 notify 必须在同步代码块中出现.
- 无法先notify,再wait,会导致死锁问题
- Lock.Condition.await() 和 signal() 有哪些不足
- 与wait,notify的问题一致
- 所以我们推出了LockSupport,看是如果解决上述方法了。
- Object.wait() 和 Object.notify() 有哪些不足
AQS
我们通过对ReentrantLock的源码解读来学习
创建锁
// 创建一个Reentrant锁
ReentrantLock lock = new ReentrantLock();
// 通过构造器我们可以看到实际上创建的是一个Sync对象
public ReentrantLock() {
Sync sync = new NonfairSync();
}
// Sync类是继承了AQS(抽象队列同步器)
abstract static class Sync extends AbstractQueuedSynchronizer
获取锁
// 底层还是sync对象调用lock方法
lock.lock(); -> sync.lock();
// lock方法是Sync的抽象模板方法,具体由NonfairSync进行实现,这里我们使用默认的非公平锁
final void lock() {
// 尝试获取锁
if (compareAndSetState(0, 1))
// 如果获取成功则,将当前线程设为所有者线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 如果没获取到,则再判断
acquire(1);
}
public final void acquire(int arg) {
// 调用nonfairTryAcquire(int acquires) 再次尝试获取锁,判断是否为可重入
if (!tryAcquire(arg) &&
// 先将当前线程创建成一个节点,加入等待队列中,再进行最后的尝试
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean nonfairTryAcquire(int acquires) {
// 当前线程
final Thread current = Thread.currentThread();
// 这里使用AQS方法,获取锁值,
int c = getState();
if (c == 0) {
// 如果为0说明可以抢占锁了
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 判断之前抢到锁的线程是否就是自己,保证锁的可重入性
else if (current == getExclusiveOwnerThread()) {
// 如果是,则计数器+1
int nextc = c + acquires;
// 这里怕计数器溢出了
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
// 判断抢锁成功
return true;
}
// 抢锁失败
return false;
}
private Node addWaiter(Node mode) {
// 添加一个等待节点,并标记为渎职EXCLUSIVE独占模式
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 获取队列中的尾结点
Node pred = tail;
if (pred != null) {
// 尾结点不为空,说明前面还有等待的节点,则进行串联
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果队列为空,则插入当前节点到队列中
enq(node);
// 最后返回生成的节点
return node;
}
private Node enq(final Node node) {
// 这是一个自旋结构
for (;;) {
// 再次判断队列是否为空
Node t = tail;
if (t == null) { // Must initialize
// 将该节点加入队列,这里注意头结点和尾结点是一个系统自建的一个节点,也是一个面试可能考的点,第一个进入等待队列的节点,其头指针指向的是哨兵节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 新来的节点连接尾结点或哨兵节点
node.prev = t;
// 将尾指针设置为新加入的节点
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
开始准备将节点进行阻塞
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋结构
for (;;) {
// 获取当前封装节点的前一个节点
final Node p = node.predecessor();
// 如果p就是头结点,也就是哨兵节点,则再尝试去获取锁
if (p == head && tryAcquire(arg)) {
// 如果获取成功了,将当前线程节点设置为头结点
setHead(node);
// 将之前的头结点进行垃圾回收,这里其实就是头结点的替换
p.next = null; // help GC
failed = false;
// 返回线程是否被打断过
return interrupted;
}
// Park相当于wait进行阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
// 取消排队
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前一个节点可能是哨兵节点的状态值
int ws = pred.waitStatus;
// 如果status值=-1
if (ws == Node.SIGNAL)
// 说明这个节点已经尝试过一次(因为调用结构为自旋),但是失败了
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 这里如果是第一次来,则将前节点状态值设置为-1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
// 当前节点直接被挂起,相当于阻塞,也就是不再去抢锁了
LockSupport.park(this);
return Thread.interrupted();
}
解锁
lock.unlock(); -> sync.release(1);
public final boolean release(int arg) {
// 尝试解锁
if (tryRelease(arg)) {
// 当前节点解锁成功后,再看AQS头结点
Node h = head;
// 如果头结点不为空,且只要status!=0,说明队列中肯定有阻塞的节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
// 将当前线程节点状态-1,
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// c=0表示已经锁解除干净了,没有重入了
if (c == 0) {
free = true;
// 将占锁标记指向null,其他线程看到了。就可以直接去抢占了
setExclusiveOwnerThread(null);
}
// 设置这个线程的状态值
setState(c);
// 返回解锁成功
return free;
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
// 注意这个的node是AQS的头结点,判断status值
int ws = node.waitStatus;
if (ws < 0)
// 将头结点的值设置为0
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 获取到哨兵节点后第一个阻塞节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 这里我们可以看到被阻塞的线程被唤醒了
LockSupport.unpark(s.thread);
}