bobuhiro11's diary

Javaでスレッドの同期処理

25 Oct 2013

同期処理の方法やデッドロックを防ぐ注意点など.

以下のような仕組みを持つものを,同期プリミティブと呼びます. (セマフォやモニタなど)

モニタ

javaでは同期プリミティブにモニタという仕組みを使っています. モニタとは,スレッド間で共有したいデータをメソッドなどで包んで そのメソッド内部でしか読み書きが出来ない状態にして, さらに同時に1つのスレッドからしか侵入出来ないようにしたもの. これで,モニタ内部のデータは排他処理ができるようになります. javaでは,共有データにprivate修飾子をつけ,メソッドにsynchronized修飾子 をつけることで,メソッド単位でモニタを実現できます. また,モニタを使って,条件の同期(condition Synchronization)を行います.

javaにはスレッドのwait,wakeupに関して以下の3つが準備されているので, これを使います.

例えば,0からNまでの整数間を動くカウンタは以下のように実装します. Counter::inc()やCounter::dec()内では,if文の代わりにwhile文を使います. (if文だとwait()から起きたときに,再評価されないので眠っている間に変更されている可能性がある) いくつのスレッドが待っているか分からないときは,notifyAll()で起こします. (必要以上に待つのを防ぐため)

public class Counter {
	private int count;
	private int N;
	
	public Counter(int max){
		count = 0;
		N = max;
	}
	
	public int get(){
		return count;
	}
	
	synchronized void inc() throws InterruptedException{
		while(count==N)
			wait();
		count++;
		notifyAll();
	}
	
	synchronized void dec() throws InterruptedException{
		while(count==0)
			wait();
		count--;
		notifyAll();
	}
}

public class IncRunnable implements Runnable{
	
	private Counter counter;
	
	public IncRunnable(Counter c){
		counter = c;
	}
	
	@Override
	public void run() {
		while(true){
			try {
				counter.inc();
			} catch (InterruptedException e) {}
		}
	}
}

public class DecRunnable implements Runnable{
    /* 省略 */
}

public class Main {
	public static void main(String[] args) {

		Counter counter = new Counter(10);
		new Thread(new IncRunnable(counter)).start();
		new Thread(new DecRunnable(counter)).start();
		
		for(int i=0;i<10000;i++)
			System.out.println(counter.get());
	}
}

セマフォ

ちなみに,セマフォは以下のようになります. Semaphore::wait()では,スレッドを起こす必要がないので,notify()は呼びません. また,Semaphore::signal()は,1つのスレッドを起こせば十分なので,notify()を使います.

public class Semaphore {
  private int n;

  public Semaphore(int n){
    this.n = n;
  }

  synchronized public void signal(){
    n++;
    notify();
  }

  synchronized public void  wait() throws InterruptedException{
    while(n==0)
      wait();
    n--;
  }
}

モニタのネスト

しかし,セマフォを使って以下のようなバッファを作ると困ったことになります.

public void Buffer(){
  Semaphore s1,s2;
  ...

  public Buffer(int n){
    s1 = new Semaphore(0); // 使用中
    s2 = new Semaphore(n); // 空き
  }
  
  synchronized public void put(Object o) throws InterruptedException{
    s2.wait(); 
    /* 書き込み処理 */
    s1.signal();
  }

  synchronized public Object get() throws InterruptedException{
    s1.wait(); 
    /* 読み込み処理 */
    s2.signal();
  }
}

実は,バッファ内に何もない状態(初期状態など)に get()を呼び出してしまうと,デッドロックになります.

wait()の他のスレッドに起こされるのを待つ.モニタに関連するSynchronizationロックはすべて解除という性質に原因があるようです.

スレッド1がBuffer::get()を呼び出し, その後スレッド2がBuffer::put()を 呼び出したとします.

  1. まず,s1.wait()でスレッド1がスリープします.
    • ここで,s1.wait()に関連するSynchronizationロックはすべて解除されますが,Buffer::get()に関連するSynchronizationロックは解除されません.
  2. 次に,スレッド2がBuffer::put()を呼び出すと,Buffer::get()のロックが解除されていないためデッドロックになります.

Synchronizationロックが二重でかかるのが原因なので,以下のようにネストを 解消すれば解決されます.良かったよかった.

public Object get() throws InterruptedException{
  s1.wait(); 
  synchronized(this){
    /* 読み込み処理 */
  }
  s2.signal();
}

comments powered by Disqus < Rubyの実装一覧のメモ debianパッケージメモ >