Advanced Java Services | ArrayBlockingQueue und LinkedBlockingQueue |
Eine nach unten und oben begrenzte BlockingQueue, die im Hintergrund mit einem Array arbeitet. Die obere Grenze muß über den Konstruktor angegeben. Aus der API:
This queue orders elements FIFO (first-in-first-out). The head of the queue is that element that has been on the queue the longest time. The tail of the queue is that element that has been on the queue the shortest time. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue.
This is a classic "bounded buffer", in which a fixed-sized array holds elements inserted by producers and extracted by consumers. Once created, the capacity cannot be changed. Attempts to put an element into a full queue will result in the operation blocking; attempts to take an element from an empty queue will similarly block.
Eine nach unten begrenzte BlockingQueue. Ohne Angabe einer oberen Grenze wird Integer.MAX_VALUE als maximaler Wert gesetzt. Bei Producer-Consumer Modellen, wo der Producer sehr aktiv ist kann man über den Konstruktor eine obere Grenze angeben.
This queue orders elements FIFO (first-in-first-out). The head of the queue is that element that has been on the queue the longest time. The tail of the queue is that element that has been on the queue the shortest time. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue.
Eine LinkedBlockingQueue wird z. Bsp. vom SingleThreadExecutor und vom FixedThreadPool verwendet, zwei Executorservices, den die Factoryklasse Executors liefert.
Das Lager entfällt. Die BlockingQueue ist nun unser Lager. Den Typ zu speichernden Objekte geben wir generisch an. Jeder nicht primitive Datentyp ist möglich. Consumer und Producer erhalten nun über den Konstruktor die BlockingQueue. Sie sind im Prinzip wie vorher aufgebaut. Es werden nur Konsolmeldungen eingebaut, die den Ablauf dokumentieren, da das in der BackingQueue nicht mehr geschehen kann.
import java.util.concurrent.TimeUnit; import java.util.concurrent.BlockingQueue; public class Consumer extends Thread { private BlockingQueue<Integer> blockingQueue; private int sleepTime; public Consumer(BlockingQueue<Integer> blockingQueue, String name, int sleepTime) { this.blockingQueue = blockingQueue; this.setName(name); this.sleepTime = sleepTime; } public void run() { int value=0; for(;;) { try { value = blockingQueue.take(); // abholen System.out.println("removed 1, size = " + blockingQueue.size()); TimeUnit.MILLISECONDS.sleep((int)(Math.random()*sleepTime)); } catch(InterruptedException ex) { System.out.println(ex); interrupt(); System.out.println(Thread.currentThread().getName() + " end"); break; } } } }
import java.util.concurrent.TimeUnit; import java.util.concurrent.BlockingQueue; public class Producer extends Thread { private BlockingQueue<Integer> blockingQueue; private int sleepTime; public Producer(BlockingQueue<Integer> blockingQueue, String name, int sleepTime) { this.blockingQueue = blockingQueue; this.setName(name); this.sleepTime = sleepTime; } public void run() { for(;;) { try { blockingQueue.put(1); // auffüllen um 1 System.out.println("filled 1, size = " + blockingQueue.size()); TimeUnit.MILLISECONDS.sleep((int)(Math.random()*sleepTime)); } catch(InterruptedException ex) { System.out.println(ex); interrupt(); System.out.println(this.getName() + " end"); break; } } } }
Wir legen in main beide Queues an und produzieren verschiedene Abläufe. Durch verändern der sleep()-Zeiten kann man verschiedene Aktivitäten nachstellen. Kleine Wartezeiten erhöhen die Aktivität von Consumer bzw. Producer.
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; public class Main { public static void main(String[] args) { BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(7); // 7 ist die obere Grenze //LinkedBlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(); blockingQueue.add(1); blockingQueue.add(1); blockingQueue.add(1); Consumer c1 = new Consumer(blockingQueue, "consumer", 100); Producer p1 = new Producer(blockingQueue, "producer", 20); Consumer c2 = new Consumer(blockingQueue, "consumer2", 100); // Producer p2 = new Producer(lager, "producer2", 50); p1.start(); c1.start(); // p2.start(); c2.start(); try { Thread.sleep(1000); c1.interrupt(); p1.interrupt(); c2.interrupt(); // p2.interrupt(); } catch(InterruptedException e) { e.printStackTrace(); } } }
ArrayBlockingQueue<Integer>(7)
Producer ist aktiver als beide Consumer
Consumer c1 = new Consumer(blockingQueue, "consumer", 100);
Producer p1 = new Producer(blockingQueue, "producer", 20);
Consumer c2 = new Consumer(blockingQueue, "consumer2", 100);
filled 1, size = 4 removed 1, size = 2 removed 1, size = 3 filled 1, size = 3 filled 1, size = 4 removed 1, size = 3 removed 1, size = 2 filled 1, size = 3 filled 1, size = 4 filled 1, size = 5 filled 1, size = 6 filled 1, size = 7 removed 1, size = 6 filled 1, size = 7 removed 1, size = 6 filled 1, size = 7 removed 1, size = 6 filled 1, size = 7 removed 1, size = 6 filled 1, size = 7 removed 1, size = 6 filled 1, size = 7 removed 1, size = 6 filled 1, size = 7 removed 1, size = 6 filled 1, size = 7 removed 1, size = 6 removed 1, size = 5 removed 1, size = 4 filled 1, size = 5 filled 1, size = 6 filled 1, size = 7 removed 1, size = 6 filled 1, size = 7 removed 1, size = 6 filled 1, size = 7 removed 1, size = 6 filled 1, size = 7 removed 1, size = 6 filled 1, size = 7 filled 1, size = 7 removed 1, size = 7 java.lang.InterruptedException java.lang.InterruptedException: sleep interrupted consumer end java.lang.InterruptedException: sleep interrupted producer end consumer2 end
LinkedBlockingQueue<Integer>
Producer ist aktiver als beide Consumer
Consumer c1 = new Consumer(blockingQueue, "consumer", 100);
Producer p1 = new Producer(blockingQueue, "producer", 20);
Consumer c2 = new Consumer(blockingQueue, "consumer2", 100);
filled 1, size = 3 removed 1, size = 2 removed 1, size = 3 filled 1, size = 3 filled 1, size = 4 filled 1, size = 5 removed 1, size = 4 filled 1, size = 5 removed 1, size = 4 filled 1, size = 5 filled 1, size = 6 filled 1, size = 7 removed 1, size = 6 filled 1, size = 7 filled 1, size = 8 filled 1, size = 9 filled 1, size = 10 filled 1, size = 11 filled 1, size = 12 removed 1, size = 11 filled 1, size = 12 removed 1, size = 11 filled 1, size = 12 removed 1, size = 11 filled 1, size = 12 filled 1, size = 13 filled 1, size = 14 filled 1, size = 15 removed 1, size = 14 filled 1, size = 15 filled 1, size = 16 filled 1, size = 17 filled 1, size = 18 filled 1, size = 19 filled 1, size = 20 filled 1, size = 21 removed 1, size = 20 filled 1, size = 21 filled 1, size = 22 filled 1, size = 23 removed 1, size = 22 filled 1, size = 23 filled 1, size = 24 filled 1, size = 25 removed 1, size = 24 filled 1, size = 25 filled 1, size = 26 removed 1, size = 25 filled 1, size = 26 filled 1, size = 27 filled 1, size = 28 filled 1, size = 29 filled 1, size = 30 removed 1, size = 29 filled 1, size = 30 filled 1, size = 31 filled 1, size = 32 filled 1, size = 33 filled 1, size = 34 removed 1, size = 33 filled 1, size = 34 filled 1, size = 35 java.lang.InterruptedException: sleep interrupted java.lang.InterruptedException: sleep interrupted consumer2 end producer end java.lang.InterruptedException: sleep interrupted consumer end
LinkedBlockingQueue<Integer>
Beide Consumer zusammen sind doppelt so aktiv wie der Producer
Consumer c1 = new Consumer(blockingQueue, "consumer", 50);
Producer p1 = new Producer(blockingQueue, "producer", 50);
Consumer c2 = new Consumer(blockingQueue, "consumer2", 50);
filled 1, size = 4 removed 1, size = 2 removed 1, size = 3 filled 1, size = 3 filled 1, size = 4 filled 1, size = 5 filled 1, size = 6 removed 1, size = 5 removed 1, size = 4 removed 1, size = 3 filled 1, size = 4 removed 1, size = 3 removed 1, size = 2 removed 1, size = 1 filled 1, size = 2 removed 1, size = 1 filled 1, size = 2 removed 1, size = 1 removed 1, size = 0 filled 1, size = 1 removed 1, size = 0 filled 1, size = 1 removed 1, size = 0 filled 1, size = 1 removed 1, size = 0 filled 1, size = 1 removed 1, size = 0 filled 1, size = 1 removed 1, size = 0 filled 1, size = 1 removed 1, size = 0 filled 1, size = 1 removed 1, size = 0 filled 1, size = 1 removed 1, size = 0 java.lang.InterruptedException java.lang.InterruptedException: sleep interrupted consumer2 end consumer end java.lang.InterruptedException: sleep interrupted producer end