Java同步器是一种用于控制多线程访问共享资源的机制,它可以保证在同一时间只有一个线程可以执行特定的代码块。Java中提供了多种同步器,包括锁、信号量、条件对象、CountDownLatch、CyclicBarrier和Exchanger。
锁是最常用的同步器,它可以保证在同一时间只有一个线程能够执行特定的代码块。Java中提供了两种锁:显式锁和隐式锁。显式锁是通过java.util.concurrent.locks包中的Lock接口来实现的;而隐式锁则是通过synchronized关键字来实现的。
信号量是用来控制多个进程对共享资源的访问数量,它通过将共享资源的数量封装在Semaphore对象中来实现。当一个进程想要使用共享资源时,必须先获得信号量;当使用完成后,必须释放信号量。
条件对象也是一个常用的同步器,它使得一个或多个线程能够在特定条件满足时才能执行特定代码块。Java中使用Condition对象来表示条件对象;Condition对象必须依赖于Lock对象才能工作。
CountDownLatch是一个启动/倒数/释放机制;当CountDownLatch内部的count减少到0时(即所有子任务都已完成时),会释放所有正在await()方法上的子任务并激活之前await()方法上的子任务。
CountDownLatch latch = new CountDownLatch(10); // 初始化count=10
for (int i = 0; i < 10; i++) { // 创建并启动10个子任务
new Thread(() -> { // 每个子任务执行前会首先调用latch.countDown()方法将count减1
System.out.println("子任务" + Thread.currentThread().getId() + "开始执行"); // 打印当前子任务id
try { // 执行子任务耗时操作
Thread.sleep(1000); // 此处省略1000字... } catch (InterruptedException e) { e.printStackTrace(); } latch.countDown(); System.out.println("子任务" + Thread.currentThread().getId() + "执行完成"); }).start(); } try { latch.await(); // 等待所有子任务都执行完成 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("10个子任务都已执行完成");
CyclicBarrier也是一个常用的合步器;当CyclicBarrier内郩count减少到0时会释放之前await()方法上的子任劢并激活之前await()方法上的子任劢。CyclicBarrier不但能够使得多个进度圆员相互之间相互协作已达到公共目标;考还能够圆员之间相互协作已遂度公共目标之前还能够回历性地回历性地回历性地回历性地回历性地回历性地回历性地回历怩巩斤斤斤斤斤斤斤斤斤斤斤斤斤斤斩巩巩巩巩巩巩巩巩巩.
CyclicBarrier barrier = new CyclicBarrier(10); // 初始化count=10 for (int i = 0; i < 10; i++) { new Thread(() -> { System.out.println("子任劢" + ThreadJava 同步器
Java线程教程 - Java同步器
同步器对象与一组线程一起使用。
它维护一个状态,根据它的状态,它让一个线程通过或强迫它等待。
本节将讨论四种类型的同步器:
- Semaphores
- Barriers
- Latches
- Exchangers
信号量
信号量用于控制可以访问资源的线程数。
java.util.concurrent包中的Semaphore类表示信号量同步器。
您可以使用其构造函数创建信号量,如下所示:
final int MAX_PERMITS = 3;
Semaphore s = new Semaphores(MAX_PERMITS);
Semaphore类的另一个构造函数使用公平作为第二个参数
final int MAX_PERMITS = 3;
Semaphore s = new Semaphores(MAX_PERMITS, true); // A fair semaphore
如果你创建一个公平的信号量,在多线程请求许可的情况下,信号量将保证先进先出(FIFO)。也就是说,首先请求许可的线程将首先获得许可。
要获取许可证,请使用acquire()方法。
如果许可证可用,它立即返回。
它阻止如果许可证不可用。线程在等待许可证可用时可能中断。
Semaphore类的其他方法允许您一次性获取一个或多个许可证。要释放许可证,请使用release()方法。
以下代码显示了一个Restaurant类,它使用信号量来控制对表的访问。
import java.util.Random;
import java.util.concurrent.Semaphore;
class Restaurant {
private Semaphore tables;
public Restaurant(int tablesCount) {
this.tables = new Semaphore(tablesCount);
}
public void getTable(int customerID) {
try {
System.out.println("Customer #" + customerID
+ " is trying to get a table.");
tables.acquire();
System.out.println("Customer #" + customerID + " got a table.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void returnTable(int customerID) {
System.out.println("Customer #" + customerID + " returned a table.");
tables.release();
}
}
class RestaurantCustomer extends Thread {
private Restaurant r;
private int customerID;
private static final Random random = new Random();
public RestaurantCustomer(Restaurant r, int customerID) {
this.r = r;
this.customerID = customerID;
}
public void run() {
r.getTable(this.customerID); // Get a table
try {
int eatingTime = random.nextInt(30) + 1;
System.out.println("Customer #" + this.customerID
+ " will eat for " + eatingTime + " seconds.");
Thread.sleep(eatingTime * 1000);
System.out.println("Customer #" + this.customerID
+ " is done eating.");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
r.returnTable(this.customerID);
}
}
}
public class Main{
public static void main(String[] args) {
Restaurant restaurant = new Restaurant(2);
for (int i = 1; i <= 5; i++) {
RestaurantCustomer c = new RestaurantCustomer(restaurant, i);
c.start();
}
}
}
上面的代码生成以下结果。
障碍器
屏障使一组线在屏障点汇合。
来自到达屏障的组的线程等待,直到该组中的所有线程到达。
一旦组中的最后一个线程到达屏障,组中的所有线程都将被释放。
当你有一个可以分成子任务的任务时,你可以使用一个屏障;每个子任务可以在单独的线程中执行,并且每个线程必须在共同点处相遇以组合它们的结果。
java.util.concurrent包中的CyclicBarrier类提供了屏障同步器的实现。
CyclicBarrier类可以通过调用其reset()方法来重用。
以下代码显示了如何在程序中使用循环障碍。
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
class Worker extends Thread {
private CyclicBarrier barrier;
private int ID;
private static Random random = new Random();
public Worker(int ID, CyclicBarrier barrier) {
this.ID = ID;
this.barrier = barrier;
}
public void run() {
try {
int workTime = random.nextInt(30) + 1;
System.out.println("Thread #" + ID + " is going to work for " + workTime + " seconds");
Thread.sleep(workTime * 1000);
System.out.println("Thread #" + ID + " is waiting at the barrier.");
this.barrier.await();
System.out.println("Thread #" + ID + " passed the barrier.");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
System.out.println("Barrier is broken.");
}
}
}
public class Main {
public static void main(String[] args) {
Runnable barrierAction = () -> System.out.println("We are ready.");
CyclicBarrier barrier = new CyclicBarrier(3, barrierAction);
for (int i = 1; i <= 3; i++) {
Worker t = new Worker(i, barrier);
t.start();
}
}
}
上面的代码生成以下结果。
Phasers
Phaser提供类似于CyclicBarrier和CountDownLatch同步器的功能。它提供以下功能:
Phaser是可重复使用的。
在Phaser上同步的参与方数量可以动态更改。在循环障碍中,当创建障碍时,方的数量是固定的。
移相器具有相关的相位编号,从零开始。当所有注册方都到达移相器时,移相器进入下一个阶段,阶段编号加1。相位编号的最大值为Integer.MAX_VALUE。在其最大值之后,相位编号重新从零开始。
Phaser有终止状态。在终止状态的Phaser上调用的所有同步方法立即返回,而不等待提前。
移相器有三种类型的参与者计数:注册参与者计数,到达参与者计数和未参与方计数。
注册方数量是注册同步的方的数量。到达的当事方数目是已经到达移相器的当前阶段的各方的数目。
未携带者数量是尚未到达移动器的当前阶段的各方的数量。
当最后一方到达时,移相器前进到下一阶段。
或者,当所有注册方都到达移动器时,Phaser可以执行移相器操作。
CyclicBarrier允许您执行屏障操作,这是一个Runnable任务。
我们通过在Phaser类的onAdvance()方法中编写代码来指定移相器操作。
我们需要继承Phaser类,并覆盖onAdvance()方法以提供Phaser动作。
以下代码显示了如何表示通过在Phaser上同步启动的任务
import java.util.Random;
import java.util.concurrent.Phaser;
class StartTogetherTask extends Thread {
private Phaser phaser;
private String taskName;
private static Random rand = new Random();
public StartTogetherTask(String taskName, Phaser phaser) {
this.taskName = taskName;
this.phaser = phaser;
}
@Override
public void run() {
System.out.println(taskName + ":Initializing...");
int sleepTime = rand.nextInt(5) + 1;
try {
Thread.sleep(sleepTime * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(taskName + ":Initialized...");
phaser.arriveAndAwaitAdvance();
System.out.println(taskName + ":Started...");
}
}
public class Main {
public static void main(String[] args) {
Phaser phaser = new Phaser(1);
for (int i = 1; i <= 3; i++) {
phaser.register();
String taskName = "Task #" + i;
StartTogetherTask task = new StartTogetherTask(taskName, phaser);
task.start();
}
phaser.arriveAndDeregister();
}
}
上面的代码生成以下结果。
例子
以下代码显示了如何向Phaser添加Phaser Action。
import java.util.concurrent.Phaser;
public class Main {
public static void main(String[] args) {
Phaser phaser = new Phaser() {
protected boolean onAdvance(int phase, int parties) {
System.out.println("Inside onAdvance(): phase = " + phase
+ ", Registered Parties = " + parties);
// Do not terminate the phaser by returning false
return false;
}
};
// Register the self (the "main" thread) as a party
phaser.register();
System.out.println("#1: isTerminated():" + phaser.isTerminated());
phaser.arriveAndDeregister();
// Trigger another phase advance
phaser.register();
phaser.arriveAndDeregister();
System.out.println("#2: isTerminated():" + phaser.isTerminated());
phaser.forceTermination();
System.out.println("#3: isTerminated():" + phaser.isTerminated());
}
}
上面的代码生成以下结果。
例2
以下代码显示如何使用移相器生成一些整数。
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Phaser;
class AdderTask extends Thread {
private Phaser phaser;
private String taskName;
private List<Integer> list;
public AdderTask(String taskName, Phaser phaser, List<Integer> list) {
this.taskName = taskName;
this.phaser = phaser;
this.list = list;
}
@Override
public void run() {
do {
System.out.println(taskName + " added " + 3);
list.add(3);
phaser.arriveAndAwaitAdvance();
} while (!phaser.isTerminated());
}
}
public class Main {
public static void main(String[] args) {
final int PHASE_COUNT = 2;
Phaser phaser = new Phaser() {
public boolean onAdvance(int phase, int parties) {
System.out.println("Phase:" + phase + ", Parties:" + parties
+ ", Arrived:" + this.getArrivedParties());
boolean terminatePhaser = false;
if (phase >= PHASE_COUNT - 1 || parties == 0) {
terminatePhaser = true;
}
return terminatePhaser;
}
};
List<Integer> list = Collections.synchronizedList(new ArrayList<Integer>());
int ADDER_COUNT = 3;
phaser.bulkRegister(ADDER_COUNT + 1);
for (int i = 1; i <= ADDER_COUNT; i++) {
String taskName = "Task #" + i;
AdderTask task = new AdderTask(taskName, phaser, list);
task.start();
}
while (!phaser.isTerminated()) {
phaser.arriveAndAwaitAdvance();
}
int sum = 0;
for (Integer num : list) {
sum = sum + num;
}
System.out.println("Sum = " + sum);
}
}
上面的代码生成以下结果。
锁存器
锁存器使一组线程等待,直到它到达其终端状态。
一旦锁存器达到其终端状态,它允许所有线程通过。
与障碍不同,它是一个一次性的对象。它不能被重置和重用。
使用锁存器,其中在一定数量的一次性活动完成之前,多个活动不能进行。
例如,一个服务不应该启动,直到它依赖的所有服务都已启动。
java.util.concurrent包中的CountDownLatch类提供了一个锁存器的实现。
import java.util.concurrent.CountDownLatch;
class LatchHelperService extends Thread {
private int ID;
private CountDownLatch latch;
public LatchHelperService(int ID, CountDownLatch latch) {
this.ID = ID;
this.latch = latch;
}
public void run() {
try {
Thread.sleep(1000);
System.out.println("Service #" + ID + " has started...");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
this.latch.countDown();
}
}
}
class LatchMainService extends Thread {
private CountDownLatch latch;
public LatchMainService(CountDownLatch latch) {
this.latch = latch;
}
public void run() {
try {
System.out.println("waiting for services to start.");
latch.await();
System.out.println("started.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Main {
public static void main(String[] args) {
// Create a countdown latch with 2 as its counter
CountDownLatch latch = new CountDownLatch(2);
LatchMainService ms = new LatchMainService(latch);
ms.start();
for (int i = 1; i <= 2; i++) {
LatchHelperService lhs = new LatchHelperService(i, latch);
lhs.start();
}
}
}
上面的代码生成以下结果。
交换器
交换器允许两个线程在同步点处等待彼此。
当两个线程到达时,它们交换一个对象并继续他们的活动。
Exchanger类提供了交换器同步器的实现。
以下代码显示将使用交换器与客户交换数据的生产者线程。
import java.util.ArrayList;
import java.util.concurrent.Exchanger;
class ExchangerProducer extends Thread {
private Exchanger<ArrayList<Integer>> exchanger;
private ArrayList<Integer> buffer = new ArrayList<Integer>();
public ExchangerProducer(Exchanger<ArrayList<Integer>> exchanger) {
this.exchanger = exchanger;
}
public void run() {
while (true) {
try {
System.out.println("Producer.");
Thread.sleep(1000);
fillBuffer();
System.out.println("Producer has produced and waiting:" + buffer);
buffer = exchanger.exchange(buffer);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void fillBuffer() {
for (int i = 0; i <= 3; i++) {
buffer.add(i);
}
}
}
class ExchangerConsumer extends Thread {
private Exchanger<ArrayList<Integer>> exchanger;
private ArrayList<Integer> buffer = new ArrayList<Integer>();
public ExchangerConsumer(Exchanger<ArrayList<Integer>> exchanger) {
this.exchanger = exchanger;
}
public void run() {
while (true) {
try {
System.out.println("Consumer.");
buffer = exchanger.exchange(buffer);
System.out.println("Consumer has received:" + buffer);
Thread.sleep(1000);
System.out.println("eating:"+buffer);
buffer.clear();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class Main {
public static void main(String[] args) {
Exchanger<ArrayList<Integer>> exchanger = new Exchanger<>();
ExchangerProducer producer = new ExchangerProducer(exchanger);
ExchangerConsumer consumer = new ExchangerConsumer(exchanger);
producer.start();
consumer.start();
}
}
上面的代码生成以下结果。
Java网络教程 -Java URL绝对URI具有以下通用格式:scheme:scheme-specific-part scheme-specific-part 取决于 scheme 。例如,ht...
Java网络教程 - Java网络UDP服务器以下代码显示了如何编写UDP回显服务器:DatagramSocket socket= new DatagramSocket(12345);Da...
JavaFX教程 -JavaFX文本另一个基本的JavaFX节点是Text节点,它允许我们在场景图上显示测试。要创建 Text 节点,请使用 javafx.sc...
JavaFX教程 -JavaFX矩形椭圆avaFX Shape类定义了常见的形状,如线,矩形,圆,Arc,CubicCurve,Ellipse和QuadCurve。在场景图上...