在研究mybatis的连接池数据源源码时,我们看到了wait()、notifyAll()方法的使用,工作中因为很少使用到这类方法的调用,所以,其中概念也有些模糊了,写一遍博客记录一下.
在读取mybatis提供的PooledDataSource源码中,获取PooledConnection对象时如果最大活动连接数达到上限后,则调用wait()方法等待,当调用close关闭连接时,再调用notify方法唤起等待的线程.
先来看jdk中的注释
/**
* Causes the current thread to wait until another thread invokes the
* {@link java.lang.Object#notify()} method or the
* {@link java.lang.Object#notifyAll()} method for this object.
* In other words, this method behaves exactly as if it simply
* performs the call {@code wait(0)}.
* <p>
* The current thread must own this object's monitor. The thread
* releases ownership of this monitor and waits until another thread
* notifies threads waiting on this object's monitor to wake up
* either through a call to the {@code notify} method or the
* {@code notifyAll} method. The thread then waits until it can
* re-obtain ownership of the monitor and resumes execution.
* <p>
* As in the one argument version, interrupts and spurious wakeups are
* possible, and this method should always be used in a loop:
* <pre>
* synchronized (obj) {
* while (<condition does not hold>)
* obj.wait();
* ... // Perform action appropriate to condition
* }
* </pre>
* This method should only be called by a thread that is the owner
* of this object's monitor. See the {@code notify} method for a
* description of the ways in which a thread can become the owner of
* a monitor.
*
* @throws IllegalMonitorStateException if the current thread is not
* the owner of the object's monitor.
* @throws InterruptedException if any thread interrupted the
* current thread before or while the current thread
* was waiting for a notification. The <i>interrupted
* status</i> of the current thread is cleared when
* this exception is thrown.
* @see java.lang.Object#notify()
* @see java.lang.Object#notifyAll()
*/
public final void wait() throws InterruptedException {
wait(0);
}
/**
* Wakes up a single thread that is waiting on this object's
* monitor. If any threads are waiting on this object, one of them
* is chosen to be awakened. The choice is arbitrary and occurs at
* the discretion of the implementation. A thread waits on an object's
* monitor by calling one of the {@code wait} methods.
* <p>
* The awakened thread will not be able to proceed until the current
* thread relinquishes the lock on this object. The awakened thread will
* compete in the usual manner with any other threads that might be
* actively competing to synchronize on this object; for example, the
* awakened thread enjoys no reliable privilege or disadvantage in being
* the next thread to lock this object.
* <p>
* This method should only be called by a thread that is the owner
* of this object's monitor. A thread becomes the owner of the
* object's monitor in one of three ways:
* <ul>
* <li>By executing a synchronized instance method of that object.
* <li>By executing the body of a {@code synchronized} statement
* that synchronizes on the object.
* <li>For objects of type {@code Class,} by executing a
* synchronized static method of that class.
* </ul>
* <p>
* Only one thread at a time can own an object's monitor.
*
* @throws IllegalMonitorStateException if the current thread is not
* the owner of this object's monitor.
* @see java.lang.Object#notifyAll()
* @see java.lang.Object#wait()
*/
public final native void notify();
通过查看JDK的注释方法,我们应该知道:
- 不管是wait方法或者notify、notifyAll,调用方法必须在synchronized同步代码块中,也就是说当前线程必须拥有对象的监视器,这样做的原因是防止线程信号丢失,否则会抛出IllegalMonitorStateException异常。
- 调用wait()方法前的检查操作必须使用while循环,有时在没有调用notify()方法的情况下,线程会被重新激活等待,使用if会造成虚假唤醒的问题,而使用while循环可以确保条件被检查到
- 如果是一个线程的话,notify和notifyAll效果一样,如果是多个线程的情况下,notify只会随机唤醒其中一个线程,而notifyAll会唤醒所有线程,唤醒后的所有线程会竞争获取对象锁.
- 另外,尽量避免在同步块中获取锁,或者调用外部方法(因为你不知道这个方法会发生什么),尽量避免造成死锁
- 如果你不知道自己在做什么的话,或者对notify不是很清楚,请使用notifyAll代替
为了达到上面的效果,可以写一个简单的阻塞队列实现BlockingQueue
首先我们应该抽取相应的条件:
- 队列有大小,也就是说我们的队里中如果已满,则线程必须等待
- 如果队里已经消费,则需要唤醒等待的线程,可以重新加入数据
BlockingQueue.java
/***
*
* @since:mybatis-advance 1.0
* @author <a href="mailto:xiaoymin@foxmail.com">xiaoymin@foxmail.com</a>
* 2019/05/20 21:41
*/
public class BlockingQueue<T> {
/**
* 容量
*/
private int capacity;
private Queue<T> queue=new LinkedList<>();
public BlockingQueue(int capacity) {
this.capacity = capacity;
}
/***
* 添加元素
* @param t
*/
public void add(T t){
synchronized (this){
//如果队列已满,wait等待
while (queue.size()==capacity){
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.add(t);
//已经添加元素,唤醒消费线程
notifyAll();
}
}
/***
* 获取元素
* @return
*/
public T get(){
T obj=null;
//获取对象锁
synchronized (this){
//如果当前队列中没有数据,则等待
while (queue.isEmpty()){
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
obj=queue.remove();
//获取得到obj,唤醒其他线程
notifyAll();
return obj;
}
}
}
此时,如果我们用两个线程来测试一下:
public static void main(String[] args) {
BlockingQueue<String> blockingQueue=new BlockingQueue<>(10);
Thread t=new Thread(new Runnable() {
@Override
public void run() {
//provider
while (true){
blockingQueue.add("test");
}
}
});
t.start();
Thread tc=new Thread(new Runnable() {
@Override
public void run() {
while (true){
System.out.println("获取的数据:"+blockingQueue.get());
}
}
});
tc.start();
}
tc线程会一直输出,这是因为我们的add数据和get数据都是一对一的,而我们的容量是10,这不会造成阻塞,如果我们有多个生成者,而只有一个消费者呢,此时我们改一下代码
public static void main(String[] args) {
BlockingQueue<String> blockingQueue=new BlockingQueue<>(10);
for (int i=0;i<10;i++){
Provider p=new Provider(blockingQueue);
p.setName("线程"+i);
p.start();
}
//创建2个消费线程
for (int i=0;i<2;i++){
Consumer c=new Consumer(blockingQueue);
c.setName("消费线程"+i);
c.start();
}
}
private static class Provider extends Thread{
private final BlockingQueue<String> blockingQueue;
public Provider(BlockingQueue<String> blockingQueue) {
this.blockingQueue=blockingQueue;
}
@Override
public void run() {
while (true){
blockingQueue.add(Thread.currentThread().getName()+"--产生的数据");
}
}
}
private static class Consumer extends Thread{
private final BlockingQueue<String> blockingQueue;
public Consumer(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
}
public void run(){
while (true){
System.out.println(Thread.currentThread().getName()+"--------->"+blockingQueue.get());
}
}
}