mybatis 源码系列(八) Java基础之wait()、notify()、notifyAll()方法

2019/05/19 mybatis 浏览

在研究mybatis的连接池数据源源码时,我们看到了wait()、notifyAll()方法的使用,工作中因为很少使用到这类方法的调用,所以,其中概念也有些模糊了,写一遍博客记录一下.

在读取mybatis提供的PooledDataSource源码中,获取PooledConnection对象时如果最大活动连接数达到上限后,则调用wait()方法等待,当调用close关闭连接时,再调用notify方法唤起等待的线程.

先来看jdk中的注释

/**
     * Causes the current thread to wait until another thread invokes the
     * {@link java.lang.Object#notify()} method or the
     * {@link java.lang.Object#notifyAll()} method for this object.
     * In other words, this method behaves exactly as if it simply
     * performs the call {@code wait(0)}.
     * <p>
     * The current thread must own this object's monitor. The thread
     * releases ownership of this monitor and waits until another thread
     * notifies threads waiting on this object's monitor to wake up
     * either through a call to the {@code notify} method or the
     * {@code notifyAll} method. The thread then waits until it can
     * re-obtain ownership of the monitor and resumes execution.
     * <p>
     * As in the one argument version, interrupts and spurious wakeups are
     * possible, and this method should always be used in a loop:
     * <pre>
     *     synchronized (obj) {
     *         while (&lt;condition does not hold&gt;)
     *             obj.wait();
     *         ... // Perform action appropriate to condition
     *     }
     * </pre>
     * This method should only be called by a thread that is the owner
     * of this object's monitor. See the {@code notify} method for a
     * description of the ways in which a thread can become the owner of
     * a monitor.
     *
     * @throws  IllegalMonitorStateException  if the current thread is not
     *               the owner of the object's monitor.
     * @throws  InterruptedException if any thread interrupted the
     *             current thread before or while the current thread
     *             was waiting for a notification.  The <i>interrupted
     *             status</i> of the current thread is cleared when
     *             this exception is thrown.
     * @see        java.lang.Object#notify()
     * @see        java.lang.Object#notifyAll()
     */
public final void wait() throws InterruptedException {
    wait(0);
}
/**
     * Wakes up a single thread that is waiting on this object's
     * monitor. If any threads are waiting on this object, one of them
     * is chosen to be awakened. The choice is arbitrary and occurs at
     * the discretion of the implementation. A thread waits on an object's
     * monitor by calling one of the {@code wait} methods.
     * <p>
     * The awakened thread will not be able to proceed until the current
     * thread relinquishes the lock on this object. The awakened thread will
     * compete in the usual manner with any other threads that might be
     * actively competing to synchronize on this object; for example, the
     * awakened thread enjoys no reliable privilege or disadvantage in being
     * the next thread to lock this object.
     * <p>
     * This method should only be called by a thread that is the owner
     * of this object's monitor. A thread becomes the owner of the
     * object's monitor in one of three ways:
     * <ul>
     * <li>By executing a synchronized instance method of that object.
     * <li>By executing the body of a {@code synchronized} statement
     *     that synchronizes on the object.
     * <li>For objects of type {@code Class,} by executing a
     *     synchronized static method of that class.
     * </ul>
     * <p>
     * Only one thread at a time can own an object's monitor.
     *
     * @throws  IllegalMonitorStateException  if the current thread is not
     *               the owner of this object's monitor.
     * @see        java.lang.Object#notifyAll()
     * @see        java.lang.Object#wait()
     */
public final native void notify();

通过查看JDK的注释方法,我们应该知道

  • 不管是wait方法或者notify、notifyAll,调用方法必须在synchronized同步代码块中,也就是说当前线程必须拥有对象的监视器,这样做的原因是防止线程信号丢失,否则会抛出IllegalMonitorStateException异常。
  • 调用wait()方法前的检查操作必须使用while循环,有时在没有调用notify()方法的情况下,线程会被重新激活等待,使用if会造成虚假唤醒的问题,而使用while循环可以确保条件被检查到
  • 如果是一个线程的话,notify和notifyAll效果一样,如果是多个线程的情况下,notify只会随机唤醒其中一个线程,而notifyAll会唤醒所有线程,唤醒后的所有线程会竞争获取对象锁.
  • 另外,尽量避免在同步块中获取锁,或者调用外部方法(因为你不知道这个方法会发生什么),尽量避免造成死锁
  • 如果你不知道自己在做什么的话,或者对notify不是很清楚,请使用notifyAll代替

为了达到上面的效果,可以写一个简单的阻塞队列实现BlockingQueue

首先我们应该抽取相应的条件:

  • 队列有大小,也就是说我们的队里中如果已满,则线程必须等待
  • 如果队里已经消费,则需要唤醒等待的线程,可以重新加入数据

BlockingQueue.java

/***
 *
 * @since:mybatis-advance 1.0
 * @author <a href="mailto:xiaoymin@foxmail.com">xiaoymin@foxmail.com</a> 
 * 2019/05/20 21:41
 */
public class BlockingQueue<T> {

    /**
     * 容量
     */
    private int capacity;


    private Queue<T> queue=new LinkedList<>();

    public BlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    /***
     * 添加元素
     * @param t
     */
    public void add(T t){
        synchronized (this){
            //如果队列已满,wait等待
            while (queue.size()==capacity){
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.add(t);
            //已经添加元素,唤醒消费线程
            notifyAll();
        }
    }

    /***
     * 获取元素
     * @return
     */
    public T get(){
        T obj=null;
        //获取对象锁
        synchronized (this){
            //如果当前队列中没有数据,则等待
            while (queue.isEmpty()){
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            obj=queue.remove();
            //获取得到obj,唤醒其他线程
            notifyAll();
            return obj;
        }
    }
}

此时,如果我们用两个线程来测试一下:

public static void main(String[] args) {
        BlockingQueue<String> blockingQueue=new BlockingQueue<>(10);

        Thread t=new Thread(new Runnable() {
            @Override
            public void run() {
                //provider
                while (true){
                    blockingQueue.add("test");
                }
            }
        });

        t.start();

        Thread tc=new Thread(new Runnable() {
            @Override
            public void run() {
                while (true){
                    System.out.println("获取的数据:"+blockingQueue.get());
                }
            }
        });
        tc.start();
    }

tc线程会一直输出,这是因为我们的add数据和get数据都是一对一的,而我们的容量是10,这不会造成阻塞,如果我们有多个生成者,而只有一个消费者呢,此时我们改一下代码

public static void main(String[] args) {
    BlockingQueue<String> blockingQueue=new BlockingQueue<>(10);
    for (int i=0;i<10;i++){
        Provider p=new Provider(blockingQueue);
        p.setName("线程"+i);
        p.start();
    }

    //创建2个消费线程

    for (int i=0;i<2;i++){
        Consumer c=new Consumer(blockingQueue);
        c.setName("消费线程"+i);
        c.start();
    }
}


private static class Provider extends Thread{


    private final BlockingQueue<String> blockingQueue;

    public Provider(BlockingQueue<String> blockingQueue) {
        this.blockingQueue=blockingQueue;
    }
    @Override
    public void run() {
        while (true){
            blockingQueue.add(Thread.currentThread().getName()+"--产生的数据");
        }
    }
}


private static class Consumer extends Thread{

    private final BlockingQueue<String> blockingQueue;

    public Consumer(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    public void run(){
        while (true){
            System.out.println(Thread.currentThread().getName()+"--------->"+blockingQueue.get());
        }
    }
}

站内搜索

    Table of Contents