forked from echoTheLiar/JavaCodeAcc
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathProducerAndConsumer.java
121 lines (98 loc) · 2.92 KB
/
ProducerAndConsumer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package concurrency;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
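/**
 * Implements the producer-consumer pattern with a BlockingQueue, where:
 * 1. producers place the products they create into a shared "product queue";
 * 2. consumers take products out of that "product queue";
 * 3. several producers and consumers run at the same time;
 * 4. although there are several producers, every product must be unique,
 *    i.e. the "product" + id name must be unique.
 *
 * If you see problems with the design, feedback is welcome.
 *
 * @author liu yuning
 *
 */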
// Immutable item passed from producers to consumers; identified only by its numeric id.
class Product {

    // Identifier supplied by the creator; never changes after construction.
    private final int id;

    /**
     * Creates a product carrying the given id.
     *
     * @param id the identifier that appears in this product's textual form
     */
    public Product(int id) {
        this.id = id;
    }

    /**
     * Renders the product as the word {@code product} immediately followed by its id.
     *
     * @return for example {@code "product7"} for id 7
     */
    @Override
    public String toString() {
        StringBuilder label = new StringBuilder("product");
        label.append(this.id);
        return label.toString();
    }
}
/**
 * Task that keeps creating {@link Product}s and putting them on a shared queue until
 * the thread running it is interrupted.
 */
class Producer implements Runnable {

    /**
     * Counter shared by all producers. It supplies the ids for new products so that
     * ids stay unique across producers.
     */
    static class Counter {
        private static int count = 0;

        /**
         * Advances the shared counter and returns the new value.
         *
         * <p>Declared {@code synchronized} so the increment is atomic even for a caller
         * that does not already hold the {@code Counter.class} monitor. Java monitors are
         * reentrant, so calling this from inside {@code synchronized (Counter.class)}
         * still works.
         *
         * @return the incremented count; the first call returns 1
         */
        public static synchronized int increase() {
            return ++count;
        }
    }

    // Number used only to label this producer in console output.
    private final int id;

    // Queue that receives every product this producer creates.
    BlockingQueue<Product> producerQueue;

    /**
     * Creates a producer.
     *
     * @param producerQueue queue that produced items are put on
     * @param id label for this producer, used by {@link #toString()}
     */
    public Producer(BlockingQueue<Product> producerQueue, int id) {
        super();
        this.id = id;
        this.producerQueue = producerQueue;
    }

    /**
     * @return the word {@code producer} immediately followed by this producer's id
     */
    @Override
    public String toString() {
        return "producer" + this.id;
    }

    /**
     * Production loop: sleeps 500 ms, then creates one product, announces it on standard
     * output and puts it on the queue. The loop ends when the thread is interrupted.
     *
     * <p>The thread's interrupt status is left set when this method returns, so code
     * further up the call stack can still observe the interruption.
     */
    @Override
    public void run() {
        try {
            // isInterrupted() only reads the flag; Thread.interrupted() would clear it
            // and hide the interruption from whoever called run().
            while (!Thread.currentThread().isInterrupted()) {
                TimeUnit.MILLISECONDS.sleep(500);
                // Hold the counter lock while producing, so id allocation, the console
                // message and the enqueue happen as one step per product.
                synchronized (Counter.class) {
                    Product product = new Product(Counter.increase());
                    System.out.println("生产者" + this + "生产出:" + product);
                    producerQueue.put(product);
                }
            }
        } catch (InterruptedException e) {
            System.out.println(this + "interrupted");
            // Throwing InterruptedException cleared the flag; set it again.
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Task that keeps taking {@link Product}s from a shared queue and reporting them on
 * standard output until the thread running it is interrupted.
 */
class Consumer implements Runnable {

    // Number used only to label this consumer in console output.
    private final int id;

    // Queue this consumer takes products from.
    BlockingQueue<Product> consumerQueue;

    /**
     * Creates a consumer.
     *
     * @param consumerQueue queue that products are taken from
     * @param id label for this consumer, used by {@link #toString()}
     */
    public Consumer(BlockingQueue<Product> consumerQueue, int id) {
        super();
        this.id = id;
        this.consumerQueue = consumerQueue;
    }

    /**
     * @return the word {@code consumer} immediately followed by this consumer's id
     */
    @Override
    public String toString() {
        return "consumer" + this.id;
    }

    /**
     * Consumption loop: blocks in {@code take()} until a product is available, then
     * prints which consumer received it. The loop ends when the thread is interrupted.
     *
     * <p>The thread's interrupt status is left set when this method returns, so code
     * further up the call stack can still observe the interruption.
     */
    @Override
    public void run() {
        try {
            // isInterrupted() only reads the flag; Thread.interrupted() would clear it
            // and hide the interruption from whoever called run().
            while (!Thread.currentThread().isInterrupted()) {
                Product consumerProduct = consumerQueue.take();
                System.out.println("消费者" + this + "消费了:" + consumerProduct);
            }
        } catch (InterruptedException e) {
            System.out.println(this + "interrupted");
            // Throwing InterruptedException cleared the flag; set it again.
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Demo entry point: starts several producers and consumers on one shared queue, lets
 * them run for a fixed time, then shuts the thread pool down.
 */
public class ProducerAndConsumer {

    // Number of producer/consumer pairs started by main.
    private static final int WORKER_PAIRS = 5;

    // How long the demo runs before the pool is shut down, in seconds.
    private static final long RUN_SECONDS = 10;

    /**
     * Runs the demo.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while waiting for
     *         the demo period to elapse
     */
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Product> productQueue = new LinkedBlockingDeque<Product>();
        ExecutorService executorService = Executors.newCachedThreadPool();
        try {
            for (int i = 0; i < WORKER_PAIRS; i++) {
                executorService.execute(new Producer(productQueue, i));
                executorService.execute(new Consumer(productQueue, i));
            }
            TimeUnit.SECONDS.sleep(RUN_SECONDS);
        } finally {
            // Always stop the workers, even if the sleep is interrupted or submitting a
            // task fails. The producers and consumers loop until interrupted, so without
            // this the pool threads would keep running after main has exited.
            executorService.shutdownNow();
        }
    }
}