jvm源码解读--17 Java的wait()、notify()学习
阅读原文时间:2022年03月03日阅读:1

write and debug by 张艳涛

wait()和notify()的通常用法

  1. A线程取得锁,执行wait(),释放锁;
  2. B线程取得锁,完成业务后执行notify(),再释放锁;
  3. B线程释放锁之后,A线程取得锁,继续执行wait()之后的代码;

关于synchronize修饰的代码块
通常,对于synchronize(lock){…}这样的代码块,编译后会生成monitorenter和monitorexit指令,线程执行到monitorenter指令时会尝试取得lock对应的monitor的所有权(CAS设置对象头),取得后即获取到锁,执行monitorexit指令时会释放monitor的所有权即释放锁;

一个完整的demo
为了深入学习wait()和notify(),先用完整的demo程序来模拟场景吧,以下是源码:

package com.zyt.wait_notify;

public class NotifyDemo {

private static void sleep(long sleepVal){  
    try{  
        Thread.sleep(sleepVal);  
    }catch(Exception e){  
        e.printStackTrace();  
    }  
}

private static void log(String desc){  
    System.out.println(Thread.currentThread().getName() + " : " + desc);  
}

Object lock = new Object();

public void startThreadA(){  
    new Thread(() -> {  
        synchronized (lock){  
            log("get lock");  
            startThreadB();  
            log("start wait");  
            try {

            }catch(InterruptedException e){  
                e.printStackTrace();  
            }

            log("get lock after wait");  
            log("release lock");  
        }  
    }, "thread-A").start();  
}

public void startThreadB(){  
    new Thread(()->{  
        synchronized (lock){  
            log("get lock");  
            startThreadC();  
            sleep(100);  
            log("start notify");  
            lock.notify();  
            log("release lock");

        }  
    },"thread-B").start();  
}

public void startThreadC(){  
    new Thread(() -> {  
        synchronized (lock){  
            log("get lock");  
            log("release lock");  
        }  
    }, "thread-C").start();  
}

public static void main(String\[\] args){  
    new NotifyDemo().startThreadA();  
}  

}

以上就是本次实战用到的demo,代码功能简述如下:

启动线程A,取得锁之后先启动线程B再执行wait()方法,释放锁并等待;
线程B启动之后会等待锁,A线程执行wait()之后,线程B取得锁,然后启动线程C,再执行notify唤醒线程A,最后退出synchronize代码块,释放锁;
线程C启动之后就一直在等待锁,这时候线程B还没有退出synchronize代码块,锁还在线程B手里;
线程A在线程B执行notify()之后就一直在等待锁,这时候线程B还没有退出synchronize代码块,锁还在线程B手里;
线程B退出synchronize代码块,释放锁之后,线程A和线程C竞争锁;
把上面的代码在Openjdk8下面执行,反复执行多次,都得到以下结果:

thread-A : get lock
thread-A : start wait
thread-B : get lock
thread-C : c thread is start
thread-B : start notify
thread-B : release lock
thread-A : after wait, acquire lock again
thread-A : release lock
thread-C : get lock
thread-C : release lock

针对以上结果,问题来了:
第一个问题:
将以上代码反复执行多次,结果都是B释放锁之后A会先得到锁,这又是为什么呢?C为何不能先拿到锁呢?

第二个问题:
线程C自开始就执行了monitorenter指令,它能得到锁是容易理解的,但是线程A呢?在wait()之后并没有没有monitorenter指令,那么它又是如何取得锁的呢?

wait()、notify()这些方法都是native方法,所以只有从JVM源码寻找答案了,本次阅读的是openjdk8的源码;

带上问题去看JVM源码
按照demo代码执行顺序,我整理了如下问题,带着这些问题去看JVM源码可以聚焦主线,不要被一些支线的次要的代码卡住(例如一些异常处理,监控和上报等):

线程A在wait()的时候做了什么?
线程C启动后,由于此时线程B持有锁,那么线程C此时在干啥?
线程B在notify()的时候做了什么?
线程B释放锁的时候做了什么?

好了,接下来看源码分析问题吧:

线程A在wait()的时候做了什么

java代码中的

lock.wait();

这个lock是一个object对象

如果你看源码的时候能看到这个是一个native方法,jvm对于native方法的处理有俩种方法,

  1. 一种是自定义,使用javah 生成对应的.hpp的头,在用c++写对于的.cpp方法实现,其中会调用jin.h的env方法,在使用gcc编译成.so文件,

ObjTest.java

package jni;
class A {}
public class ObjTest extends A{
static {
System.loadLibrary("ObjTest");
}
public ObjTest(){
System.out.println("default");
}
public ObjTest(int age){
System.out.println("param Construtor,age->"+age);
}
public native static void test(Object a);

public static void main(String\[\] args){  
    test(new ObjTest());  
}  

}

  2.另外的一种就是系统自带的native,

如果你要看代码的话

#include
#include
#include

#include "jni.h"
#include "jni_util.h"
#include "jvm.h"

#include "java_lang_Object.h"

static JNINativeMethod methods[] = {
{"hashCode", "()I", (void *)&JVM_IHashCode},
{"wait", "(J)V", (void *)&JVM_MonitorWait},
{"notify", "()V", (void *)&JVM_MonitorNotify},
{"notifyAll", "()V", (void *)&JVM_MonitorNotifyAll},
{"clone", "()Ljava/lang/Object;", (void *)&JVM_Clone},
};

JNIEXPORT void JNICALL
Java_java_lang_Object_registerNatives(JNIEnv *env, jclass cls)
{
(*env)->RegisterNatives(env, cls,
methods, sizeof(methods)/sizeof(methods[0]));
}

JNIEXPORT jclass JNICALL
Java_java_lang_Object_getClass(JNIEnv *env, jobject this)
{
if (this == NULL) {
JNU_ThrowNullPointerException(env, NULL);
return 0;
} else {
return (*env)->GetObjectClass(env, this);
}
}

会发现它使用了 RegisterNatives这种方法,wait对于的方法,为   (void *)&JVM_MonitorWait

在share/vm/prims/jvm.cpp文件中定义的

JVM_ENTRY(void, JVM_MonitorWait(JNIEnv* env, jobject handle, jlong ms))
JVMWrapper("JVM_MonitorWait");
Handle obj(THREAD, JNIHandles::resolve_non_null(handle));
JavaThreadInObjectWaitState jtiows(thread, ms != 0);
if (JvmtiExport::should_post_monitor_wait()) {
JvmtiExport::post_monitor_wait((JavaThread *)THREAD, (oop)obj(), ms);
}
ObjectSynchronizer::wait(obj, ms, CHECK);
JVM_END

重要的是,他要进入ObjectSynchronizer.wait方法

void ObjectSynchronizer::wait(Handle obj, jlong millis, TRAPS) {
//UseBiasedLocking默认为true
if (UseBiasedLocking) {
//撤销对象头中包含的偏向锁
BiasedLocking::revoke_and_rebias(obj, false, THREAD);
assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
}
if (millis < 0) { TEVENT (wait - throw IAX) ; THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "timeout value is negative"); } //分配一个关联的ObjectMonitor实例 ObjectMonitor* monitor = ObjectSynchronizer::inflate(THREAD, obj()); DTRACE_MONITOR_WAIT_PROBE(monitor, obj(), THREAD, millis); //调用其wait方法 monitor->wait(millis, true, THREAD);

/* This dummy call is in place to get around dtrace bug 6254741. Once
that's fixed we can uncomment the following line and remove the call */
// DTRACE_MONITOR_PROBE(waited, monitor, obj(), THREAD);
dtrace_waited_probe(monitor, obj, THREAD);
}

这里就能看到了

//调用其wait方法
monitor->wait(millis, true, THREAD);

线程B在这个时候做了什么

在线程A调用wait方法的同时,线程B也已经启动了,就是jvm源码调试中的Thread-10

能看到这个时候线程B,已经进入了synchronized代码块,其中对应的指令就是monitorenter

对于monitorenter指令进行简单分析

先打印一下bt信息

#0 ObjectMonitor::enter (this=0x7fb1280036e0, __the_thread__=0x7fb11c001800) at /home/atzhang/atzhang/openjdksource/openjdk8/openjdk/hotspot/src/share/vm/runtime/objectMonitor.cpp:323
#1 0x00007fb14a8de3e6 in ObjectSynchronizer::slow_enter (obj=…, lock=0x7fb1345fc6a8, __the_thread__=0x7fb11c001800) at /home/atzhang/atzhang/openjdksource/openjdk8/openjdk/hotspot/src/share/vm/runtime/synchronizer.cpp:258
#2 0x00007fb14a8ddf70 in ObjectSynchronizer::fast_enter (obj=…, lock=0x7fb1345fc6a8, attempt_rebias=true, __the_thread__=0x7fb11c001800) at /home/atzhang/atzhang/openjdksource/openjdk8/openjdk/hotspot/src/share/vm/runtime/synchronizer.cpp:180
#3 0x00007fb14a538e2f in InterpreterRuntime::monitorenter (thread=0x7fb11c001800, elem=0x7fb1345fc6a8) at /home/atzhang/atzhang/openjdksource/openjdk8/openjdk/hotspot/src/share/vm/interpreter/interpreterRuntime.cpp:573

如果看网上的一般文章那么分析的起点就是InterpreterRuntime::monitorenter ,其实是不对的,也就是99.8%的文章都是差了那么一乃乃,

在上下求索的过程中,发现连mashibing的公开课讲的课,黄俊将的synchronized底层实现都是错的,他错在了哪里了? 他用了openjdk1.8早期的

源码版本来讲偏向锁,其实这个源码还没有实现偏向锁,我也遇到了这个问题,发现怎么看也看不懂,那么真正的入口是

bytecodeInterpreter.cpp

 /\* monitorenter and monitorexit for locking/unlocking an object \*/

  CASE(\_monitorenter): {  
    oop lockee = STACK\_OBJECT(-1);  
    // derefing's lockee ought to provoke implicit null check  
    CHECK\_NULL(lockee);  
    // find a free monitor or one already allocated for this object  
    // if we find a matching object then we need a new monitor  
    // since this is recursive enter  
    BasicObjectLock\* limit = istate->monitor\_base();  
    BasicObjectLock\* most\_recent = (BasicObjectLock\*) istate->stack\_base();  
    BasicObjectLock\* entry = NULL;  
    while (most\_recent != limit ) {  
      if (most\_recent->obj() == NULL) entry = most\_recent;  
      else if (most\_recent->obj() == lockee) break;  
      most\_recent++;  
    }  
    if (entry != NULL) {  
      entry->set\_obj(lockee);  
      int success = false;  
      uintptr\_t epoch\_mask\_in\_place = (uintptr\_t)markOopDesc::epoch\_mask\_in\_place;

      markOop mark = lockee->mark();  
      intptr\_t hash = (intptr\_t) markOopDesc::no\_hash;  
      // implies UseBiasedLocking  
      // code 3:如果锁对象的mark word的状态是偏向模式  
      if (mark->has\_bias\_pattern()) {//has\_bias\_pattern las 3bit is 101 ?  
        uintptr\_t thread\_ident;  
        uintptr\_t anticipated\_bias\_locking\_value;  
        thread\_ident = (uintptr\_t)istate->thread();  
        anticipated\_bias\_locking\_value =  
          (((uintptr\_t)lockee->klass()->prototype\_header() | thread\_ident) ^ (uintptr\_t)mark) &  
          ~((uintptr\_t) markOopDesc::age\_mask\_in\_place);  
         // code 5:如果偏向的线程是自己且epoch等于class的epoch  
        if  (anticipated\_bias\_locking\_value == 0) {  
          // already biased towards this thread, nothing to do  
          if (PrintBiasedLockingStatistics) {  
            (\* BiasedLocking::biased\_lock\_entry\_count\_addr())++;  
          }  
          success = true;  
        }  
           /\*\*  
           \*  初始化:偏向锁默认是延时初始化的,延迟的时间通过参数BiasedLockingStartupDelay控制,默认是4000ms默认是4000ms。  
           \*  初始化是在安全点下通过VMThread完成的,初始化时会把由SystemDictionary维护的所有已加载类的  
           \*  Klass的prototype\_header修改成匿名偏向锁对象头,并把\_biased\_locking\_enabled静态属性置为true,  
           \*  后续加载新的Klass时发现该属性为true,会将Klass的prototype\_header修改成匿名偏向锁对象头。  
           \*  当创建某个Klass的oop时,会利用Klass的prototype\_header来初始化该oop的对象头,即偏向锁初始化完成后,  
           \*  后续所有创建的oop的初始对象头都是匿名偏向锁的,在此之前创建的oop初始对象头都是无锁状态的。  
           \*/  
        else if ((anticipated\_bias\_locking\_value & markOopDesc::biased\_lock\_mask\_in\_place) != 0) {  
          // try revoke bias

          markOop header = lockee->klass()->prototype\_header();  
          if (hash != markOopDesc::no\_hash) {  
            header = header->copy\_set\_hash(hash);  
          }  
          //\_biased\_locking\_enabled静态属性为false,默认是4000ms,尚未开启偏向锁,所以撤销偏向锁  
          if (Atomic::cmpxchg\_ptr(header, lockee->mark\_addr(), mark) == mark) {  
            if (PrintBiasedLockingStatistics)  
              (\*BiasedLocking::revoked\_lock\_entry\_count\_addr())++;  
          }  
        }  
         // code 7:如果epoch不等于class中的epoch,则尝试重偏向  
        else if ((anticipated\_bias\_locking\_value & epoch\_mask\_in\_place) !=0) {  
          // try rebias  
          markOop new\_header = (markOop) ( (intptr\_t) lockee->klass()->prototype\_header() | thread\_ident);  
          if (hash != markOopDesc::no\_hash) {  
            new\_header = new\_header->copy\_set\_hash(hash);  
          }  
          if (Atomic::cmpxchg\_ptr((void\*)new\_header, lockee->mark\_addr(), mark) == mark) {  
            if (PrintBiasedLockingStatistics)  
              (\* BiasedLocking::rebiased\_lock\_entry\_count\_addr())++;  
          }  
          else {  
            // 重偏向失败,代表存在多线程竞争,则调用monitorenter方法进行锁升级  
              CALL\_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle\_exception);  
          }  
          success = true;  
        }  
        else {  
            // 走到这里说明当前要么偏向别的线程,要么是匿名偏向(即没有偏向任何线程)  
            // code 8:下面构建一个匿名偏向的mark word,尝试用CAS指令替换掉锁对象的mark word  
            // try to bias towards thread in case object is anonymously biased  
          markOop header = (markOop) ((uintptr\_t) mark & ((uintptr\_t)markOopDesc::biased\_lock\_mask\_in\_place |  
                                                          (uintptr\_t)markOopDesc::age\_mask\_in\_place |  
                                                          epoch\_mask\_in\_place));  
          if (hash != markOopDesc::no\_hash) {  
            header = header->copy\_set\_hash(hash);  
          }  
          markOop new\_header = (markOop) ((uintptr\_t) header | thread\_ident);  
          // debugging hint  
          DEBUG\_ONLY(entry->lock()->set\_displaced\_header((markOop) (uintptr\_t) 0xdeaddead);)  
          if (Atomic::cmpxchg\_ptr((void\*)new\_header, lockee->mark\_addr(), header) == header) {  
            // CAS修改成功  
              if (PrintBiasedLockingStatistics)  
              (\* BiasedLocking::anonymously\_biased\_lock\_entry\_count\_addr())++;  
          }  
          else {  
            // 如果修改失败说明存在多线程竞争,所以进入monitorenter方法  
            CALL\_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle\_exception);  
          }  
          success = true;  
        }  
      }  
      //if (mark->has\_bias\_pattern()) {//has\_bias\_pattern las 3bit is 101 ? } end of code  
      //如果没符合的mark->has\_bias\_pattern就到这里了,  
      // traditional lightweight locking  
      // 如果偏向线程不是当前线程或没有开启偏向模式等原因都会导致success==false??存疑

      if (!success) {  
          // 轻量级锁的逻辑  
            //code 9: 构造一个无锁状态的Displaced Mark Word,并将Lock Record的lock指向它  
        markOop displaced = lockee->mark()->set\_unlocked();  
        entry->lock()->set\_displaced\_header(displaced);  
        //如果指定了-XX:+UseHeavyMonitors,则call\_vm=true,代表禁用偏向锁和轻量级锁  
        bool call\_vm = UseHeavyMonitors;  
         // 利用CAS将对象头的mark word替换为指向Lock Record的指针,(如果是无锁状态,直接升级轻量级锁)  
        if (call\_vm || Atomic::cmpxchg\_ptr(entry, lockee->mark\_addr(), displaced) != displaced) {  
          // Is it simple recursive case?  
          // 判断是不是锁重入,进入这里说明obj是不是无锁状态,  
          if (!call\_vm && THREAD->is\_lock\_owned((address) displaced->clear\_lock\_bits())) {  
            //code 10: 如果是锁重入,则直接将Displaced Mark Word设置为null  
              entry->lock()->set\_displaced\_header(NULL);  
          } else {  
            CALL\_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle\_exception);  
          }  
        }  
      }  
      UPDATE\_PC\_AND\_TOS\_AND\_CONTINUE(1, -1);  
    } else {  
       // lock record不够,重新执行  
      istate->set\_msg(more\_monitors);  
      UPDATE\_PC\_AND\_RETURN(0); // Re-execute  
    }  
  }  
  /\*\*  
   \* JVM中的每个类也有一个类似mark word的prototype\_header,用来标记该class的epoch和偏向开关等信息。  
   \* 上面的代码中lockee->klass()->prototype\_header()即获取class的prototype\_header。

code 1,从当前线程的栈中找到一个空闲的Lock Record(即代码中的BasicObjectLock,下文都用Lock Record代指),
判断Lock Record是否空闲的依据是其obj字段 是否为null。
注意这里是按内存地址从低往高找到最后一个可用的Lock Record,换而言之,就是找到内存地址最高的可用Lock Record。

code 2,获取到Lock Record后,首先要做的就是为其obj字段赋值。

code 3,判断锁对象的mark word是否是偏向模式,即低3位是否为101。

code 4,这里有几步位运算的操作 anticipated_bias_locking_value =
(((uintptr_t)lockee->klass()->prototype_header() | thread_ident) ^ (uintptr_t)mark) & ​
~((uintptr_t) markOopDesc::age_mask_in_place); 这个位运算可以分为3个部分。

第一部分((uintptr_t)lockee->klass()->prototype_header() | thread_ident)
将当前线程id和类的prototype_header相或,这样得到的值为
(当前线程id + prototype_header中的(epoch + 分代年龄 + 偏向锁标志 + 锁标志位))
,注意prototype_header的分代年龄那4个字节为0

第二部分 ^ (uintptr_t)mark 将上面计算得到的结果与锁对象的markOop进行异或,
相等的位全部被置为0,只剩下不相等的位。

第三部分 & ~((uintptr_t) markOopDesc::age_mask_in_place) markOopDesc::age_mask_in_place为…0001111000,
取反后,变成了…1110000111,除了分代年龄那4位,其他位全为1;
将取反后的结果再与上面的结果相与,将上面异或得到的结果中分代年龄给忽略掉。

code 5,anticipated_bias_locking_value==0代表偏向的线程是当前线程且mark word的epoch等于class的epoch,这种情况下什么都不用做。

code 6,(anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0
代表class的prototype_header或对象的mark word中偏向模式是关闭的,
又因为能走到这已经通过了mark->has_bias_pattern()判断,
即对象的mark word中偏向模式是开启的,那也就是说class的prototype_header不是偏向模式。

然后利用CAS指令Atomic::cmpxchg_ptr(header, lockee->mark_addr(), mark) == mark撤销偏向锁,
我们知道CAS会有几个参数,1是预期的原值,2是预期修改后的值 ,3是要修改的对象,
与之对应,cmpxchg_ptr方法第一个参数是预期修改后的值,第2个参数是修改的对象,第3个参数是预期原值,
方法返回实际原值,如果等于预期原值则说明修改成功。

code 7,如果epoch已过期,则需要重偏向,利用CAS指令将锁对象的mark word替换为一个偏向当前线程且epoch为类的epoch的新的mark word。

code 8,CAS将偏向线程改为当前线程,如果当前是匿名偏向则能修改成功,否则进入锁升级的逻辑。

code 9,这一步已经是轻量级锁的逻辑了。从上图的mark word的格式可以看到,
轻量级锁中mark word存的是指向Lock Record的指针。这里构造一个无锁状态的mark word,
然后存储到Lock Record(Lock Record的格式可以看第一篇文章)。设置mark word是无锁状态的原因是:
轻量级锁解锁时是将对象头的mark word设置为Lock Record中的Displaced Mark Word,
所以创建时设置为无锁状态,解锁时直接用CAS替换就好了。

code 10, 如果是锁重入,则将Lock Record的Displaced Mark Word设置为null,起到一个锁重入计数的作用。

以上是偏向锁加锁的流程(包括部分轻量级锁的加锁流程),如果当前锁已偏向其他线程||epoch值过期||偏向模式关闭||获取偏向锁的过程中存在并发冲突,
都会进入到InterpreterRuntime::monitorenter方法, 在该方法中会对偏向锁撤销和升级。
*/

这个比较复杂,如果你下载的版本是hotspot-87ee5ee27509那么你会看到源码为

 /\* monitorenter and monitorexit for locking/unlocking an object \*/

  CASE(\_monitorenter): {  
    oop lockee = STACK\_OBJECT(-1);  
    // derefing's lockee ought to provoke implicit null check  
    CHECK\_NULL(lockee);  
    // find a free monitor or one already allocated for this object  
    // if we find a matching object then we need a new monitor  
    // since this is recursive enter  
    BasicObjectLock\* limit = istate->monitor\_base();  
    BasicObjectLock\* most\_recent = (BasicObjectLock\*) istate->stack\_base();  
    BasicObjectLock\* entry = NULL;  
    while (most\_recent != limit ) {  
      if (most\_recent->obj() == NULL) entry = most\_recent;  
      else if (most\_recent->obj() == lockee) break;  
      most\_recent++;  
    }  
    if (entry != NULL) {  
      entry->set\_obj(lockee);  
      markOop displaced = lockee->mark()->set\_unlocked();  
      entry->lock()->set\_displaced\_header(displaced);  
      if (Atomic::cmpxchg\_ptr(entry, lockee->mark\_addr(), displaced) != displaced) {  
        // Is it simple recursive case?  
        if (THREAD->is\_lock\_owned((address) displaced->clear\_lock\_bits())) {  
          entry->lock()->set\_displaced\_header(NULL);  
        } else {  
          CALL\_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle\_exception);  
        }  
      }  
      UPDATE\_PC\_AND\_TOS\_AND\_CONTINUE(1, -1);  
    } else {  
      istate->set\_msg(more\_monitors);  
      UPDATE\_PC\_AND\_RETURN(0); // Re-execute  
    }  
  }

这个真是个坑,那么分析一下这个过程

接着进入fast_enter

直接进入,

如果进入inflate方法,

会有其中情况:

  • // The mark can be in one of the following states:
  • // * Inflated - just return
  • // * Stack-locked - coerce it to inflated
  • // * INFLATING - busy wait for conversion to complete
  • // * Neutral - aggressively inflate the object.
  • // * BIASED - Illegal. We should never see this
  • ===以上是源码注解
  • 如果膨胀过了,锁已经是 ObjectMonitor,那么直接返回
  • 如果是Stack-locked,就是轻量级锁,那么锁膨胀
  • 如果膨胀中
  • 如果无锁Neutral,直接膨胀为重量级锁
  • 如果biased偏向锁,非法,因为上一个方法有判断,如果是偏向锁,直接返回不会到到达这里的

这里只贴了,锁此刻为重量级锁的时候,

那么返回的 ObjectMonitor * m 进入了enter方法

void ATTR ObjectMonitor::enter(TRAPS) {
// The following code is ordered to check the most common cases first
// and to reduce RTS->RTO cache line upgrades on SPARC and IA32 processors.
Thread * const Self = THREAD ;
void * cur ;
//原子的设置owner属性,如果_owner属性是NULL就将其设置为Self,否则返回当前的_owner属性
cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ;
if (cur == NULL) {
//设置成功,说明该Monitor没有被人占用
// Either ASSERT _recursions == 0 or explicitly set _recursions = 0.
assert (_recursions == 0 , "invariant") ;
assert (_owner == Self, "invariant") ;
// CONSIDER: set or assert OwnerIsThread == 1
return ;
}

if (cur == Self) {
//设置失败,说明该Monitor就是当前线程占用的,此处进入enter是嵌套加锁情形
// TODO-FIXME: check for integer overflow! BUGID 6557169.
_recursions ++ ;
return ;
}

//轻量级锁膨胀成重量级锁时,将owner设置为lock属性  

if (Self->is_lock_owned ((address)cur)) {
assert (_recursions == 0, "internal state error");
_recursions = 1 ;
// Commute owner from a thread-specific on-stack BasicLockObject address to
// a full-fledged "Thread *".
_owner = Self ;
OwnerIsThread = 1 ;
return ;
}

//该Monitor被其他某个线程占用了,需要抢占  

// We've encountered genuine contention.
assert (Self->_Stalled == 0, "invariant") ;

//记录需要抢占的Monitor指针  

Self->_Stalled = intptr_t(this) ;
//Knob_SpinEarly默认为1,即为true
//TrySpin让当前线程自旋,自旋的次数默认可以自适应调整,如果进入安全点同步则退出自旋,返回1表示抢占成功
// Try one round of spinning *before* enqueueing Self
// and before going through the awkward and expensive state
// transitions. The following spin is strictly optional …
// Note that if we acquire the monitor from an initial spin
// we forgo posting JVMTI events and firing DTRACE probes.
if (Knob_SpinEarly && TrySpin (Self) > 0) {
assert (_owner == Self , "invariant") ;
assert (_recursions == 0 , "invariant") ;
assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;
Self->_Stalled = 0 ;
return ;
}
//自旋若干次数后依然抢占失败
assert (_owner != Self , "invariant") ;
assert (_succ != Self , "invariant") ;
assert (Self->is_Java_thread() , "invariant") ;
JavaThread * jt = (JavaThread *) Self ;
assert (!SafepointSynchronize::is_at_safepoint(), "invariant") ;
assert (jt->thread_state() != _thread_blocked , "invariant") ;
assert (this->object() != NULL , "invariant") ;
assert (_count >= 0, "invariant") ;

//原子的将_count属性加1,表示增加了一个抢占该锁的线程
// Prevent deflation at STW-time. See deflate_idle_monitors() and is_busy().
// Ensure the object-monitor relationship remains stable while there's contention.
Atomic::inc_ptr(&_count);

EventJavaMonitorEnter event;

{ // Change java thread status to indicate blocked on monitor enter.
//修改Java线程状态为BLOCKED_ON_MONITOR_ENTER,此代码块退出后还原成原来的
JavaThreadBlockedOnMonitorEnterState jtbmes(jt, this);

DTRACE\_MONITOR\_PROBE(contended\_\_enter, this, object(), jt);  
if (JvmtiExport::should\_post\_monitor\_contended\_enter()) {  
  JvmtiExport::post\_monitor\_contended\_enter(jt, this);  
}

OSThreadContendState osts(Self->osthread());  
ThreadBlockInVM tbivm(jt);

Self->set\_current\_pending\_monitor(this);

// TODO-FIXME: change the following for(;;) loop to straight-line code.  
for (;;) {  
  jt->set\_suspend\_equivalent();  
  // cleared by handle\_special\_suspend\_equivalent\_condition()  
  // or java\_suspend\_self()  

//会通过自旋,park等方式不断循环尝试获取锁,直到成功获取锁为止
EnterI (THREAD) ;

  if (!ExitSuspendEquivalent(jt)) break ;

  //  
  // We have acquired the contended monitor, but while we were  
  // waiting another thread suspended us. We don't want to enter  
  // the monitor while suspended because that would surprise the  
  // thread that suspended us.  
  //  
      \_recursions = 0 ;  
  \_succ = NULL ;  
  exit (false, Self) ;

  jt->java\_suspend\_self();  
}  
Self->set\_current\_pending\_monitor(NULL);  

}

Atomic::dec_ptr(&_count);
assert (_count >= 0, "invariant") ;
Self->_Stalled = 0 ;

// Must either set _recursions = 0 or ASSERT _recursions == 0.
assert (_recursions == 0 , "invariant") ;
assert (_owner == Self , "invariant") ;
assert (_succ != Self , "invariant") ;
assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;

// The thread -- now the owner -- is back in vm mode.
// Report the glorious news via TI,DTrace and jvmstat.
// The probe effect is non-trivial. All the reportage occurs
// while we hold the monitor, increasing the length of the critical
// section. Amdahl's parallel speedup law comes vividly into play.
//
// Another option might be to aggregate the events (thread local or
// per-monitor aggregation) and defer reporting until a more opportune
// time -- such as next time some thread encounters contention but has
// yet to acquire the lock. While spinning that thread could
// spinning we could increment JVMStat counters, etc.

DTRACE_MONITOR_PROBE(contended__entered, this, object(), jt);
if (JvmtiExport::should_post_monitor_contended_entered()) {
JvmtiExport::post_monitor_contended_entered(jt, this);
}

if (event.should_commit()) {
event.set_klass(((oop)this->object())->klass());
event.set_previousOwner((TYPE_JAVALANGTHREAD)_previous_owner_tid);
event.set_address((TYPE_ADDRESS)(uintptr_t)(this->object_addr()));
event.commit();
}

if (ObjectMonitor::_sync_ContendedLockAttempts != NULL) {
ObjectMonitor::_sync_ContendedLockAttempts->inc() ;
}
}

// Caveat: TryLock() is not necessarily serializing if it returns failure.
// Callers must compensate as needed.

int ObjectMonitor::TryLock (Thread * Self) {
for (;;) {
void * own = _owner ;
if (own != NULL) return 0 ;
if (Atomic::cmpxchg_ptr (Self, &_owner, NULL) == NULL) {
// Either guarantee _recursions == 0 or set _recursions = 0.
assert (_recursions == 0, "invariant") ;
assert (_owner == Self, "invariant") ;
// CONSIDER: set or assert that OwnerIsThread == 1
return 1 ;
}
// The lock had been free momentarily, but we lost the race to the lock.
// Interference -- the CAS failed.
// We can either return -1 or retry.
// Retry doesn't make as much sense because the lock was just acquired.
if (true) return -1 ;
}
}

标记了比较重要的方法,主要流程就是

  • cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ; cas,期望无人使用锁,(即 MonitorObject._owner为0x0),

  • 比如这个时候线程A 会在wati方法在释放锁之前这个_owner的值为ThreadA线程的一个包装对象,那么此时需要等待锁

  • 在ThreadA释放锁之后,这个时候如果进入执行这个方法那么,直接回返回,代表已经获取了锁

  • 锁进行等待的代码主要是EnterI ,主要实现如下

  • for (;;) {

        if (TryLock (Self) > 0) break ;  
        assert (\_owner != Self, "invariant") ;
    if ((SyncFlags &amp; 2) &amp;&amp; \_Responsible == NULL) {  
       Atomic::cmpxchg\_ptr (Self, &amp;\_Responsible, NULL) ;  
    }
    
    // park self  
    if (\_Responsible == Self || (SyncFlags &amp; 1)) {  
        TEVENT (Inflated enter - park TIMED) ;  
        Self-&gt;\_ParkEvent-&gt;park ((jlong) RecheckInterval) ;  
        // Increase the RecheckInterval, but clamp the value.  
        RecheckInterval \*= 8 ;  
        if (RecheckInterval &gt; 1000) RecheckInterval = 1000 ;  
    } else {  
        TEVENT (Inflated enter - park UNTIMED) ;  
        Self-&gt;\_ParkEvent-&gt;park() ;  
    }
    
    if (TryLock(Self) &gt; 0) break ;
    
    // The lock is still contested.  
    // Keep a tally of the # of futile wakeups.  
    // Note that the counter is not protected by a lock or updated by atomics.  
    // That is by design - we trade "lossy" counters which are exposed to  
    // races during updates for a lower probe effect.  
    TEVENT (Inflated enter - Futile wakeup) ;  
    if (ObjectMonitor::\_sync\_FutileWakeups != NULL) {  
       ObjectMonitor::\_sync\_FutileWakeups-&gt;inc() ;  
    }  
    ++ nWakeups ;
    
    // Assuming this is not a spurious wakeup we'll normally find \_succ == Self.  
    // We can defer clearing \_succ until after the spin completes  
    // TrySpin() must tolerate being called with \_succ == Self.  
    // Try yet another round of adaptive spinning.  
    if ((Knob\_SpinAfterFutile &amp; 1) &amp;&amp; TrySpin (Self) &gt; 0) break ;
    
    // We can find that we were unpark()ed and redesignated \_succ while  
    // we were spinning.  That's harmless.  If we iterate and call park(),  
    // park() will consume the event and return immediately and we'll  
    // just spin again.  This pattern can repeat, leaving \_succ to simply  
    // spin on a CPU.  Enable Knob\_ResetEvent to clear pending unparks().  
    // Alternately, we can sample fired() here, and if set, forgo spinning  
    // in the next iteration.
    
    if ((Knob\_ResetEvent &amp; 1) &amp;&amp; Self-&gt;\_ParkEvent-&gt;fired()) {  
       Self-&gt;\_ParkEvent-&gt;reset() ;  
       OrderAccess::fence() ;  
    }  
    if (\_succ == Self) \_succ = NULL ;
    
    // Invariant: after clearing \_succ a thread \*must\* retry \_owner before parking.  
    OrderAccess::fence() ;  
    }

    上边的方法里面可以主要看出

    • TryLock(Self),试着cas一把,如果没人占用锁就,占用锁

    • Self->_ParkEvent->park ((jlong) RecheckInterval) ; 线程挂起一段时间

    • 醒了之后,接着TryLock(Self),若不成功则一直循环,

接着看线程A在wait()的时候做了什么

此时线程B在一直循环等待获取锁,线程A开始执行wait方法,2者是并行执行的

可以看到其实Thread9 就是线程A在jvm中对应的线程

那么看到在threadA执行完wait方法之后,先释放了锁.在把自己所对应的linux线程挂起

那么还查threadB.notify分析

流程:

lock.notify()   //调用lock对象的方法

-->                 //这个obj就是这个lock,将lock作为参数

ObjectSynchronizer::notify(obj, CHECK);

-->                 //找到obj对象上的重量锁ObjectMonitor

ObjectSynchronizer::inflate(THREAD, obj())->notify(THREAD);
-->                 // 调用锁上的notify方法

void ObjectMonitor::notify(TRAPS)

那么现在进入关键步骤

进入的去DequeueWaiter()

接着

打印

(gdb) p _WaitSet
$1 = (ObjectWaiter * volatile) 0x7f88bbbfa110

(gdb) p *node
$6 = (ObjectWaiter) {
= {
= {
_vptr.AllocatedObj = 0x7f88d66797f0
}, },
members of ObjectWaiter:
_next = 0x7f88bbbfa110,
_prev = 0x7f88bbbfa110,
_thread = 0x7f88d0160800,
_notifier_tid = 140225242177872,
_event = 0x7f88d0161b00,
_notified = 0,
TState = ObjectWaiter::TS_WAIT,
_Sorted = (unknown: 3019902016),
_active = false
}

看这个链表节点的next和priev都是自己

ObjectWaiter* next = node->_next;   找到下一个节点node3
   if (next == node) {  //如果next指向自己说明,只有一个node节点,那么久就将_WaitSet置空
  assert(node->_prev == node, "invariant check");
  _WaitSet = NULL;
 } else {    //不进入这里,这里是中间节点的情况
  ObjectWaiter* prev = node->_prev;  找到上一个节点 ,命名node1

  next->_prev = prev;    令下一个节点的上一个指向 node1
  prev->_next = next;     令node1的_next指针指向node3
  if (_WaitSet == node) {    
  _WaitSet = next;
    }
  }
  node->_next = NULL; 接着道这里,将node节点的next ,priev置空
  node->_prev = NULL;

这就

那么这样就取出来了objectWaiter 对象

接下来对ObjectWaiter对象的处理方式,根据Policy的不同而不同:
Policy == 0:放入_EntryList队列的排头位置;
Policy == 1:放入_EntryList队列的末尾位置;
Policy == 2:_EntryList队列为空就放入_EntryList,否则放入_cxq队列的排头位置;

那么接着进入ThreadB的monitorexit指令

小结一下,线程B释放了锁之后,执行的操作如下:

偏向锁逻辑,此处未命中;
根据QMode的不同,将ObjectWaiter从_cxq或者_EntryList中取出后唤醒;
唤醒的元素会继续执行挂起前的代码,按照我们之前的分析,线程唤醒后,就会通过CAS去竞争锁,此时由于线程B已经释放了锁,那么此时应该能竞争成功;

唤醒目标线程,就是A之前wait方法里面挂起了,那么接着看wait方法的实现

还有

//ReenterI和EnterI的逻辑基本相同,用于获取对象锁
void ATTR ObjectMonitor::ReenterI (Thread * Self, ObjectWaiter * SelfNode) {
assert (Self != NULL , "invariant") ;
assert (SelfNode != NULL , "invariant") ;
assert (SelfNode->_thread == Self , "invariant") ;
assert (_waiters > 0 , "invariant") ;
//校验目标对象的对象头就是当前ObjectMonitor的指针
assert (((oop)(object()))->mark() == markOopDesc::encode(this) , "invariant") ;
assert (((JavaThread *)Self)->thread_state() != _thread_blocked, "invariant") ;
JavaThread * jt = (JavaThread *) Self ;

int nWakeups = 0 ;  
for (;;) {  
    ObjectWaiter::TStates v = SelfNode->TState ;  
    //校验状态  
    guarantee (v == ObjectWaiter::TS\_ENTER || v == ObjectWaiter::TS\_CXQ, "invariant") ;  
    assert    (\_owner != Self, "invariant") ;

    //尝试获取锁  
    if (TryLock (Self) > 0) break ;  
    //尝试自旋获取锁  
    if (TrySpin (Self) > 0) break ;

    TEVENT (Wait Reentry - parking) ;

    {  
       //修改线程状态  
       OSThreadContendState osts(Self->osthread());  
       ThreadBlockInVM tbivm(jt);

       jt->set\_suspend\_equivalent();  
       //SyncFlags默认是0  
       if (SyncFlags & 1) {  
          Self->\_ParkEvent->park ((jlong)1000) ;  
       } else {  
          Self->\_ParkEvent->park () ;  
       }

       // were we externally suspended while we were waiting?  
       for (;;) {  
          //ExitSuspendEquivalent默认返回false  
          if (!ExitSuspendEquivalent (jt)) break ;  
          if (\_succ == Self) { \_succ = NULL; OrderAccess::fence(); }  
          jt->java\_suspend\_self();  
          jt->set\_suspend\_equivalent();  
       }  
    }

    //尝试获取锁  
    if (TryLock(Self) > 0) break ;

    TEVENT (Wait Reentry - futile wakeup) ;  
    ++ nWakeups ;

    // Assuming this is not a spurious wakeup we'll normally  
    // find that \_succ == Self.  
    if (\_succ == Self) \_succ = NULL ;

    // Invariant: after clearing \_succ a contending thread  
    // \*must\* retry  \_owner before parking.  
    OrderAccess::fence() ;

    if (ObjectMonitor::\_sync\_FutileWakeups != NULL) {  
      ObjectMonitor::\_sync\_FutileWakeups->inc() ;  
    }  
}//for循环结束

//for循环结束,已经获取了锁  
assert (\_owner == Self, "invariant") ;  
assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;  
//从链表中移除  
UnlinkAfterAcquire (Self, SelfNode) ;  
if (\_succ == Self) \_succ = NULL ;  
assert (\_succ != Self, "invariant") ;  
//修改状态为TS\_RUN  
SelfNode->TState = ObjectWaiter::TS\_RUN ;  
OrderAccess::fence() ;      // see comments at the end of EnterI()  

}

本文结束

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器