文章目录
- 一、Redisson简单介绍
- 二、Redisson简单使用
-
- 1. maven引用
- 2. RedisConfig配置
- 3. StockRedissonService
- 4. 测试
- 三、Redisson源码
-
- 1. 加锁
- 2. 解锁
- 3. 自动续期
- 4. 总结
- 四、Redisson公平锁
- 五、Redisson读写锁
- 五、Redisson的RSemaphore信号量
-
- 1. Semaphore:
- 2. RSemaphore:实现分布式限流
- 六、Redisson的RCountDownLatch
-
- 1. CountDownLatch:允许一个或者多个线程去等待其他线程完成操作。
- 2. RCountDownLatch
- 七、 总结
一、Redisson简单介绍
Redisson是一个在Redis的基础上实现的Java驻内存数据网格,可参考Redisson官方文档使用,它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。
二、Redisson简单使用
1. maven引用
dependency>
groupId>org.redisson/groupId>
artifactId>redisson/artifactId>
version>3.19.1/version>
/dependency>
2. RedisConfig配置
@Configuration
public class RedissonConfig {
@Value("${spring.redis.host}")
private String redisAddress;
@Bean
public RedissonClient redissonClient(){
// 默认连接地址 127.0.0.1:6379
RedissonClient redisson = Redisson.create();
// Config config = new Config();
// config.useSingleServer().setAddress("redis://" + redisAddress); //redis服务器地址
// .setDatabase(0) //制定redis数据库编号
// .setUsername("").setPassword() // redis用户名密码
// .setConnectionMinimumIdleSize(10) //连接池最小空闲连接数
// .setConnectionPoolSize(50) //连接池最大线程数
// .setIdleConnectionTimeout(60000) //线程的超时时间
// .setConnectTimeout(6000) //客户端程序获取redis链接的超时时间
// .setTimeout(60000) //响应超时时间
// RedissonClient redisson = Redisson.create(config);
return redisson;
}
}
3. StockRedissonService
@Service
public class StockRedissonService {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private RedissonClient redissonClient;
public void deduct() {
RLock rlock = redissonClient.getLock("lock");
rlock.lock();
try{
// 1。 查询库存
String stockStr = redisTemplate.opsForValue().get("stock");
if (!StringUtil.isNullOrEmpty(stockStr)) {
int stock = Integer.parseInt(stockStr);
// 2。 判断条件是否满足
if (stock > 0) {
// 3 更新redis
redisTemplate.opsForValue().set("stock", String.valueOf(stock - 1));
}
}
}finally {
rlock.unlock();
}
}
}
简单来说,就是直接
RLock rlock = redissonClient.getLock(“lock”);
获取到锁,然后lock()和unlock()即可。
4. 测试
并发达到了660,比之前自己写的redis LUA脚本还要高。
三、Redisson源码
1. 加锁
T> RFutureT> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommandT> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if ((redis.call('exists', KEYS[1]) == 0) " +
"or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
这就是加锁的核心代码,其实本质上也就是LUA脚本+hash数据类型
// 如果这个key不存在,或者这个key是我自己的,那么state+1,并且重置失效时间
"if ((redis.call('exists', KEYS[1]) == 0) " +
"or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 如果不满足,则以毫秒为单位,返回key的过期时间
"return redis.call('pttl', KEYS[1]);",
pttl: 以毫秒为单位返回 key 的剩余过期时间。
当 key 不存在时,返回 -2 。
当 key 存在但没有设置剩余生存时间时,返回 -1 。
否则,以毫秒为单位,返回 key 的剩余生存时间。
这其实和我们之前手写的lua脚本并没有什么太大差别,只是它这里return nil表示的是加锁成功
2. 解锁
protected RFutureBoolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
解锁的核心代码,和加锁一样,本质上也是LUA脚本+hash数据类型
// 如果key不是我的,直接返回nil,解锁失败
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
// 如果key是我的,那么state-1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
// state-1后的值,如果>0,说明重入过了,那更新下失效时间
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
// state-1后的值,如果=0,说明全部解锁成功了,删除锁,并且publish
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
3. 自动续期
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
CompletionStageBoolean> future = renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock {} expiration", getRawName(), e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
if (res) {
// reschedule itself
renewExpiration();
} else {
cancelExpirationRenewal(null);
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
它也是使用的new TimerTask() ,同样也是一次性续期,若res=1,再次续期。
官方文档:如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。
不同的是,即使redisson节点宕机,它还是可以自动续期,保证你的业务逻辑正常结束。
4. 总结
其实和我们之前手写实现的redis分布式锁原理差不多,但是它是配合着CompletableFuture去异步实现,更加高效
本质上还是LUA脚本+hash数据类型
四、Redisson公平锁
- 公平锁
非公平锁:来个线程就先试试能不能插队,不能插队才去后面排队
公平锁:线程都乖乖去后面排队去,不准插队
- 代码
StockController新增一个接口,用于调用公平锁
@GetMapping("fair/lock/{id}")
public String fairLock(@PathVariable("id")Long id){
stockService.fairLock(id);
return "hello fair lock";
}
StockRedissonService
public void fairLock(Long id) {
RLock fairLock = redissonClient.getFairLock("fairLock");
fairLock.lock();
try {
TimeUnit.SECONDS.sleep(3);
System.out.println("-----测试公平锁-----");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
fairLock.unlock();
}
}
- 测试
访问http://localhost:10010/fair/lock/1 从1一直访问到5
redis中有个等待队列
访问是顺序执行。
- 那如果把公平锁修改为非公平锁呢
RLock fairLock = redissonClient.getLock("fairLock");
- 重新测试
redis中并无等待队列
锁访问顺序也并不是先进先出。
ps:若使用nginx转发会超时重发,修改conf即可
添加如下:
proxy_connect_timeout 12000;
proxy_send_timeout 12000;
proxy_read_timeout 12000;
五、Redisson读写锁
读写 不能并发 加锁
写写 不能并发 加锁
读读 能并发 不加锁
StockController
@GetMapping("read/lock")
public String readLock(){
stockService.readLock();
return "read read lock";
}
@GetMapping("write/lock")
public String writeLock(){
stockService.writeLock();
return "write write lock";
}
StockRedissonService
public void readLock() {
RReadWriteLock lock = redissonClient.getReadWriteLock("rwLock");
lock.readLock().lock(10, TimeUnit.SECONDS);
// TODO 疯狂的读
// lock.readLock().unlock();
}
public void writeLock() {
RReadWriteLock lock = redissonClient.getReadWriteLock("rwLock");
lock.writeLock().lock(10, TimeUnit.SECONDS);
// TODO 疯狂的写
// lock.writeLock().unlock();
}
简单测试下,两个页面分别访问
http://localhost:10010/write/lock 写操作
http://localhost:10010/read/lock 读操作
写写: 多次写操作明显等待
写读:明显等待
读写:明显等待
读锁有多个,但写锁只有一个
读读:无等待现象
五、Redisson的RSemaphore信号量
1. Semaphore:
-
可以用来控制同时访问特定资源的线程数量,常用于限流场景。
-
Semaphore接收一个int整型值,表示 许可证数量。
-
线程通过调用acquire()获取许可证,执行完成之后通过调用release()归还许可证。
-
只有获取到许可证的线程才能运行,获取不到许可证的线程将会阻塞。
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i 6; i++) {
new Thread(()->{
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + ":抢到了停车位");
TimeUnit.SECONDS.sleep(new Random().nextInt(10));
System.out.println(Thread.currentThread().getName() + "停了会儿就开走了");
semaphore.release();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, i + "号车").start();
}
}
每次只会停三辆车,走一辆后才可以停下一辆,完美实现限流。
2. RSemaphore:实现分布式限流
StockRedissonService
public void semaphoreLock() {
RSemaphore semaphore = redissonClient.getSemaphore("semaphore");
semaphore.trySetPermits(3); //设置资源量,限流的线程数
try {
semaphore.acquire(); // 获取资源,获取资源成功的线程可以继续处理业务操作,否则会被阻塞住
redisTemplate.opsForList().rightPush("log",port + "端口获取了资源,开始处理业务逻辑" + Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(new Random().nextInt(10));
redisTemplate.opsForList().rightPush("log",port + "端口处理完,释放了资源" + Thread.currentThread().getName());
semaphore.release(); // 手动释放资源,后续请求线程可以获取该资源
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
用redis的list数据类型来记录日志,方便集群部署的日志查看。
- 注意:
semaphore.trySetPermits(3);//设置资源量,限流的线程数
这会在redis中成semaphore的key,如果要修改资源量,必须手动把redis中该key删除,否则只在代码中修改,重启后无法生效。
简单测试下,多次访问后
集群部署下,共用了信号量的限流,两个端口启动,同一时间限流了3个线程。
六、Redisson的RCountDownLatch
1. CountDownLatch:允许一个或者多个线程去等待其他线程完成操作。
- CountDownLatch接收一个int型参数,表示要等待的工作线程的个数。
- await(): 使当前线程进入同步队列进行等待,直到latch的值被减到0或者当前线程被中断,当前线程就会被唤醒。
- countDown(): 使latch的值减1,如果减到了0,则会唤醒所有等待在这个latch上的线程。
public static void main(String[] args) throws InterruptedException { // 班长线程
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 0; i 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "准备出门了");
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + "睡了一会,终于出门了");
countDownLatch.countDown();
}, i + "号同学" ).start();
}
countDownLatch.await();
System.out.println("班长锁门了");
}
班长等六个同学全部出门后才可以锁门
2. RCountDownLatch
- StockController
@GetMapping("test/countDown")
public String countDown(){
stockService.countDown();
return "出来了一位同学";
}
@GetMapping("test/latch")
public String testLatch(){
stockService.testLatch();
return "班长锁门了";
}
- countDown 同学出门
public void countDown() {
RCountDownLatch cdl = redissonClient.getCountDownLatch("cdl");
// TODO 出门准备完成
cdl.countDown();
}
- latch 等六位同学出门后,班长锁门
public void testLatch() {
RCountDownLatch cdl = redissonClient.getCountDownLatch("cdl");
cdl.trySetCount(6);
try {
cdl.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// TODO 准备锁门
}
RCountDownLatch cdl = redissonClient.getCountDownLatch("cdl");
其中的“cdl”即是存放在redis的key,两个方法必须命名一致。
分别访问接口
http://localhost/test/countDown 出门
http://localhost/test/latch 班长锁门
只有访问六次出门后,才可以成功锁门
redis中存放着cdl的key,也就是目前的等待线程数。
七、 总结
redisson: redis的java客户端,分布式锁
玩法:
- 引入依赖
- java配置类:RedissonConfig
- 代码使用
- 可重入锁RLock对象: CompletableFuture + LUA脚本 + hash
RLock rlock = redissonClient.getLock("xxx");
rlock.lock()/unlock()
- 公平锁:
RLock fairLock = redissonClient.getLock("xxx");
fairLock.lock()/unlock()
- 联锁 和 红锁
- 读写锁:
RReadWriteLock lock = redissonClient.getReadWriteLock("xxx");
lock.readLock().lock()/unlock();
lock.writeLock().lock()/unlock();
- 信号量:
RSemaphore semaphore = redissonClient.getSemaphore("xxx");
semaphore.trySetPermits(3); //设置资源量,限流的线程数
semaphore.acquire()/release()
- 闭锁:
RCountDownLatch cdl = redissonClient.getCountDownLatch("xxx");
cdl.trySetCount(6);
cdl.await()/countDown()