Semaphore当前在多线程环境下被扩放使用,操作系统的信号量是个很重要的概念,在进程控制方面都有应用。Java 并发库 的Semaphore 可以很轻松完成信号量控制,Semaphore可以控制某个资源可被同时访问的个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。比如在Windows下可以设置共享文件的最大客户端访问个数。
Semaphore实现的功能就类似厕所有5个坑,假如有10个人要上厕所,那么同时只能有多少个人去上厕所呢?同时只能有5个人能够占用,当5个人中 的任何一个人让开后,其中等待的另外5个人中又有一个人可以占用了。另外等待的5个人中可以是随机获得优先机会,也可以是按照先来后到的顺序获得机会,这取决于构造Semaphore对象时传入的参数选项。单个信号量的Semaphore对象可以实现互斥锁的功能,并且可以是由一个线程获得了“锁”,再由另一个线程释放“锁”,这可应用于死锁恢复的一些场合。
Semaphore维护了当前访问的个数,提供同步机制,控制同时访问的个数。在数据结构中链表可以保存“无限”的节点,用Semaphore可以实现有限大小的链表。另外重入锁 ReentrantLock 也可以实现该功能,但实现上要复杂些。
下面的Demo中申明了一个只有5个许可的Semaphore,而有20个线程要访问这个资源,通过acquire()和release()获取和释放访问许可。
package concurrent;
/**
* TestSemaphore
*
* @author weiwei(Duan.Yu)
* @version 1.0.0 createTime: 2017/9/25 上午10:17
*/
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class TestSemaphore {
public static void main(String[] args) {
// 线程池
ExecutorService exec = Executors.newCachedThreadPool();
// 只能5个线程同时访问
final Semaphore semp = new Semaphore(5);
// 模拟20个客户端访问
for (int index = 0; index < 20; index++) {
final int NO = index;
Runnable run = new Runnable() {
public void run() {
try {
// 获取许可
semp.acquire();
System.out.println("Accessing: " + NO);
Thread.sleep((long) (Math.random() * 10000));
// 访问完后,释放
semp.release();
System.out.println("-----------------" + semp.availablePermits());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
exec.execute(run);
}
// 退出线程池
exec.shutdown();
}
}
追加一个例子:1秒钟内得不到许可,则丢弃访问。
public class FlowControl {
/**
* 最大访问量
*/
private static final int MAX_ACCESS_COUNT = 20;
/**
* 只能有MAX_ACCESS_COUNT个线程数同时访问
*/
private static final Semaphore semaphore = new Semaphore(MAX_ACCESS_COUNT);
public static void main(String[] args) {
// 线程池
ExecutorService exec = Executors.newCachedThreadPool();
// 模拟30个客户端
for (int i = 0; i < 30; i++) {
Runnable run = new Runnable() {
@Override
public void run() {
try {
// 1秒钟内得不到许可,则丢弃访问。
if (semaphore.tryAcquire(1, TimeUnit.SECONDS)) {
System.out.println("正在执行...");
//做一些事情...
Thread.sleep(2 * 1000);
System.out.println("执行完毕!");
} else {
System.out.println("访问被拒绝!!!");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 执行完成,释放许可。
semaphore.release();
}
}
};
exec.execute(run);
}
// 关闭线程池
exec.shutdown();
}
}
分线程池例子
package concurrent;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreDemo {
final static int MAX_QPS = 10;
final static Semaphore semaphore = new Semaphore(MAX_QPS);
public static void main(String... args) throws Exception {
Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
semaphore.release(MAX_QPS / 2);
}
}, 1000, 500, TimeUnit.MILLISECONDS);
//lots of concurrent calls:100 * 1000
ExecutorService pool = Executors.newFixedThreadPool(100);
for (int i = 100; i > 0; i--) {
final int x = i;
pool.submit(new Runnable() {
@Override
public void run() {
for (int j = 1000; j > 0; j--) {
semaphore.acquireUninterruptibly(1);
remoteCall(x, j);
}
}
});
}
pool.shutdown();
pool.awaitTermination(1, TimeUnit.HOURS);
System.out.println("DONE");
}
private static void remoteCall(int i, int j) {
System.out.println(String.format("%s - %s: %d %d", new Date(),
Thread.currentThread(), i, j));
}
}