ReentrantLock lock = new ReentrantLock(true); // 公平锁 ReentrantLock lock = new ReentrantLock(false); // 非公平锁 ReentrantLock lock = new ReentrantLock(); // 非公平锁
1 2 3
publicReentrantLock(boolean fair){ sync = fair ? new FairSync() : new NonfairSync(); }
根据初始化的参数fair,就决定了内部类Sync的实例
compareAndSetState
NonfairSync的lock方法进入一个逻辑判断compareAndSetState
1 2 3 4 5 6 7
// Performs lock. Try immediate barge, backing up to normal acquire on failure. finalvoidlock(){ if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
protectedfinalbooleancompareAndSetState(int expect, int update){ // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
// The synchronization state. privatevolatileint state;
1 2 3 4 5 6 7
// Performs lock. Try immediate barge, backing up to normal acquire on failure. finalvoidlock(){ if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
/** * Sets the thread that currently owns exclusive access. * A {@code null} argument indicates that no thread owns access. * This method does not otherwise impose any synchronization or * {@code volatile} field accesses. * @param thread the owner thread */ protectedfinalvoidsetExclusiveOwnerThread(Thread thread){ exclusiveOwnerThread = thread; }
设置互斥锁的拥有线程(ExclusiveOwnerThread)为当前线程
acquire(arg)
1 2 3 4 5
publicfinalvoidacquire(int arg){ if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
/* Creates and enqueues node for current thread and given mode. * Params: * mode – Node.EXCLUSIVE for exclusive, Node.SHARED for shared * Returns: * the new node */ private Node addWaiter(Node mode){ Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
staticfinalclassNode{ /** Marker to indicate a node is waiting in shared mode */ staticfinal Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ staticfinal Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */ staticfinalint CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ staticfinalint SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ staticfinalint CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ staticfinalint PROPAGATE = -3;
/** * Status field, taking on only the values: * SIGNAL: The successor of this node is (or will soon be) * blocked (via park), so the current node must * unpark its successor when it releases or * cancels. To avoid races, acquire methods must * first indicate they need a signal, * then retry the atomic acquire, and then, * on failure, block. * CANCELLED: This node is cancelled due to timeout or interrupt. * Nodes never leave this state. In particular, * a thread with cancelled node never again blocks. * CONDITION: This node is currently on a condition queue. * It will not be used as a sync queue node * until transferred, at which time the status * will be set to 0. (Use of this value here has * nothing to do with the other uses of the * field, but simplifies mechanics.) * PROPAGATE: A releaseShared should be propagated to other * nodes. This is set (for head node only) in * doReleaseShared to ensure propagation * continues, even if other operations have * since intervened. * 0: None of the above * * The values are arranged numerically to simplify use. * Non-negative values mean that a node doesn't need to * signal. So, most code doesn't need to check for particular * values, just for sign. * * The field is initialized to 0 for normal sync nodes, and * CONDITION for condition nodes. It is modified using CAS * (or when possible, unconditional volatile writes). */ volatileint waitStatus;
/** * Link to predecessor node that current node/thread relies on * for checking waitStatus. Assigned during enqueuing, and nulled * out (for sake of GC) only upon dequeuing. Also, upon * cancellation of a predecessor, we short-circuit while * finding a non-cancelled one, which will always exist * because the head node is never cancelled: A node becomes * head only as a result of successful acquire. A * cancelled thread never succeeds in acquiring, and a thread only * cancels itself, not any other node. */ volatile Node prev;
/** * Link to the successor node that the current node/thread * unparks upon release. Assigned during enqueuing, adjusted * when bypassing cancelled predecessors, and nulled out (for * sake of GC) when dequeued. The enq operation does not * assign next field of a predecessor until after attachment, * so seeing a null next field does not necessarily mean that * node is at end of queue. However, if a next field appears * to be null, we can scan prev's from the tail to * double-check. The next field of cancelled nodes is set to * point to the node itself instead of null, to make life * easier for isOnSyncQueue. */ volatile Node next;
/** * The thread that enqueued this node. Initialized on * construction and nulled out after use. */ volatile Thread thread;
/** * Link to next node waiting on condition, or the special * value SHARED. Because condition queues are accessed only * when holding in exclusive mode, we just need a simple * linked queue to hold nodes while they are waiting on * conditions. They are then transferred to the queue to * re-acquire. And because conditions can only be exclusive, * we save a field by using special value to indicate shared * mode. */ Node nextWaiter;
/** * Returns true if node is waiting in shared mode. */ finalbooleanisShared(){ return nextWaiter == SHARED; }
/** * Returns previous node, or throws NullPointerException if null. * Use when predecessor cannot be null. The null check could * be elided, but is present to help the VM. * * @return the predecessor of this node */ final Node predecessor()throws NullPointerException { Node p = prev; if (p == null) thrownew NullPointerException(); else return p; }
Node() { // Used to establish initial head or SHARED marker }
Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; }
Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
排除一些常数的定义
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
staticfinalclassNode{ volatileint waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
// pred 是 node 的前置节点 privatestaticbooleanshouldParkAfterFailedAcquire(Node pred, Node node){ // 获取前置节点pred的waitStatus int ws = pred.waitStatus; // waitStatus值,指示后续线程需要uppark的话,返回true if (ws == Node.SIGNAL) // ws = -1 // 前驱节点已经设置了SIGNAL,闹钟已经设好,现在我可以安心睡觉(阻塞)了。 // 如果前驱变成了head,并且head的代表线程exclusiveOwnerThread释放了锁, // 就会来根据这个SIGNAL来唤醒自己 // 如果已经告诉前驱拿完号后通知自己一下,那就可以安心休息了 /* * This node has already set status asking a release * to signal it, so it can safely park. */ returntrue; // waitStatus值 > 0 实际上ws = 1 表示线程将被取消 if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ // 发现传入的前驱的状态大于0,即CANCELLED。说明前驱节点已经因为超时或响应了中断,而取消了自己。所以需要跨越掉这些CANCELLED节点,直到找到一个<=0的节点 // 如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边 // 注意:那些放弃的结点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,稍后就会被GC回收 do { // pred的前置指针设置为node的前置指针,也就是prev线程被取消了 // 并且将pred设置为pred的prev // 总的来说,就是想删除pred节点 head<=>pred<=>node // 因此将head设置为node的前置节点 // 再将pred指针指向head node.prev = pred = pred.prev; } while (pred.waitStatus > 0); // pred再指向node pred.next = node; } else { // ws = 0 or -3 or -2 /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ // 进入这个分支,ws只能是0或PROPAGATE。CAS设置ws为SIGNAL // 如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下。有可能失败,人家说不定刚刚释放完呢 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } returnfalse; }
1 2 3 4 5 6 7 8 9 10 11 12
/** waitStatus value to indicate thread has cancelled */ staticfinalint CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ // // waitStatus值,指示后续线程需要uppark staticfinalint SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ staticfinalint CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ staticfinalint PROPAGATE = -3;
/** * Convenience method to park and then check if interrupted * * @return {@code true} if interrupted */ privatefinalbooleanparkAndCheckInterrupt(){ LockSupport.park(this); // 调用park()使线程进入waiting状态 return Thread.interrupted(); // 如果被唤醒,查看自己是不是被中断的 }
privatevoidcancelAcquire(Node node){ // Ignore if node doesn't exist if (node == null) return;
node.thread = null;
// Skip cancelled predecessors Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. Node predNext = pred.next;
// Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves. if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); }