生产者消费者,用java线程实现生产者消费者同步问题


引言
生产者和消费者问题是线程模型中的经典问题:生产者和消费者在同一时间内共用同一个存储空间,如下图所示,生产者向空间里存放数据,而消费者取用数据,如果不加以协调可能会出现以下情况:
用java线程实现生产者消费者同步问题生产者消费者
生产者消费者图
存储空间已满,而生产者占用着它,消费者等着生产者让出空间从而去除产品,生产者等着消费者消费产品,从而向空间中添加产品。互相等待,从而发生死锁。
JAVA解决线程模型的三种方式
1、wait()和notify()
1 package com.sinayun.main; 2 3 import java.util.LinkedList; 4 5 public class ProducerConsumer { 6 private LinkedList storeHouse = new LinkedList(); 7 private int MAX = 10; 8 9 public ProducerConsumer() { 10 } 11 12 public void start() { 13 new Producer().start(); 14 new Comsumer().start(); 15 } 16 17 class Producer extends Thread { 18 public void run() { 19 while (true) { 20 synchronized (storeHouse) { 21 try { 22 while (storeHouse.size() == MAX) { 23 System.out.println("storeHouse is full , please wait"); 24 storeHouse.wait(); 25 } 26 Object newOb = new Object(); 27 if (storeHouse.add(newOb)) { 28 System.out.println("Producer put a Object to storeHouse"); 29 Thread.sleep((long)(Math.random()* 3000)); 30 storeHouse.notify(); 31 } 32 } catch (InterruptedException ie) { 33 System.out.println("producer is interrupted!"); 34 } 35 36 } 37 } 38 } 39 } 40 41 class Comsumer extends Thread { 42 public void run() { 43 while(true) 44 { 45 synchronized (storeHouse) { 46 try { 47 while (storeHouse.size() == 0) { 48 System.out.println("storeHouse is empty , please wait"); 49 storeHouse.wait(); 50 } 51 storeHouse.removeLast(); 52 System.out.println("Comsumer get a Object from storeHouse"); 53 Thread.sleep((long)(Math.random()* 3000)); 54 storeHouse.notify(); 55 } catch (InterruptedException ie) { 56 System.out.println("Consumer is interrupted"); 57 } 58 59 } 60 } 61 62 } 63 } 64 65 public static void main(String[] args) throws Exception { 66 ProducerConsumer pc = new ProducerConsumer(); 67 pc.start(); 68 } 69 }
2、await()和signal(),即线程锁的方式
1 package com.sinayun.main; 2 3 import java.util.LinkedList; 4 import java.util.concurrent.locks.Condition; 5 import java.util.concurrent.locks.Lock; 6 import java.util.concurrent.locks.ReentrantLock; 7 8 public class PC2{ 9 private LinkedList myList = new LinkedList(); 10 private int MAX = 10; 11 private final Lock lock = new ReentrantLock(); 12 private final Condition full = lock.newCondition(); 13 private final Condition empty = lock.newCondition(); 14 15 public PC2(){ 16 } 17 18 public void start(){ 19 new Producer().start(); 20 new Consumer().start(); 21 } 22 23 public static void main(String[] args) throws Exception{ 24 PC2 s2 = new PC2(); 25 s2.start(); 26 } 27 28 class Producer extends Thread{ 29 public void run(){ 30 while(true){ 31 lock.lock(); 32 try{ 33 while(myList.size() == MAX){ 34 System.out.println("warning: it's full!"); 35 full.await(); 36 } 37 Object o = new Object(); 38 if(myList.add(o)){ 39 System.out.println("Producer: " + o); 40 empty.signal(); 41 } 42 }catch(InterruptedException ie){ 43 System.out.println("producer is interrupted!"); 44 }finally{ 45 lock.unlock(); 46 } 47 } 48 } 49 } 50 51 class Consumer extends Thread{ 52 public void run(){ 53 while(true){ 54 lock.lock(); 55 try{ 56 while(myList.size() == 0){ 57 System.out.println("warning: it's empty!"); 58 empty.await(); 59 } 60 Object o = myList.removeLast(); 61 System.out.println("Consumer: " + o); 62 full.signal(); 63 }catch(InterruptedException ie){ 64 System.out.println("consumer is interrupted!"); 65 }finally{ 66 lock.unlock(); 67 } 68 } 69 } 70 } 71 72 }
3、阻塞队列的方式
用java线程实现生产者消费者同步问题生产者消费者用java线程实现生产者消费者同步问题生产者消费者阻塞队列 1 import java.util.concurrent.*; 2 3 public class PC3{ 4 //建立一个阻塞队列 5 private LinkedBlockingQueue queue = new LinkedBlockingQueue(10); 6 private int MAX = 10; 7 8 public PC3(){ 9 } 10 11 public void start(){ 12 new Producer().start(); 13 new Consumer().start(); 14 } 15 16 public static void main(String[] args) throws Exception{ 17 PC3 s3 = new PC3(); 18 s3.start(); 19 } 20 21 class Producer extends Thread{ 22 public void run(){ 23 while(true){ 24 try{ 25 Object o = new Object(); 26 // 取出一个对象 27 queue.put(o); 28 System.out.println("Producer: " + o); 29 }catch(InterruptedException e){ 30 System.out.println("producer is interrupted!"); 31 } 32 //} 33 } 34 } 35 } 36 37 class Consumer extends Thread{ 38 public void run(){ 39 while(true){ 40 try{ 41 // 取出一个对象 42 Object o = queue.take(); 43 System.out.println("Consumer: " + o); 44 }catch(InterruptedException e){ 45 System.out.println("producer is interrupted!"); 46 } 47 //} 48 } 49 } 50 } 51 52 }
结论
三种方式原理一致,都是对独占空间加锁,阻塞和唤醒线程,第一种方式比较传统,第三种方式最简单,只需存储和取用,线程同步的操作交由LinkedBlockingQueue全权处理。
Tags:  生产者和消费者 生产者消费者代码 生产者消费者程序 生产者消费者问题 生产者消费者

延伸阅读

最新评论

发表评论