mirror of
https://github.com/hs-web/hsweb-framework.git
synced 2026-06-09 01:14:16 +08:00
修改redisReadWriteLock,redis操作原子性
未测试 读锁自动过期未完成
This commit is contained in:
@@ -1,10 +1,16 @@
|
||||
package org.hsweb.concurrent.lock.support.redis;
|
||||
|
||||
import org.springframework.core.io.ClassPathResource;
|
||||
import org.springframework.data.redis.connection.StringRedisConnection;
|
||||
import org.springframework.data.redis.core.RedisCallback;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.data.redis.core.script.DefaultRedisScript;
|
||||
import org.springframework.scripting.support.ResourceScriptSource;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
import java.util.UUID;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.Condition;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
@@ -20,20 +26,34 @@ public class RedisReadWriteLock implements ReadWriteLock {
|
||||
private WriteLock writeLock;
|
||||
private long lockKeyExpireTime = DEFAULT_EXPIRE;
|
||||
private long waitTime = 30;
|
||||
protected byte[] lockValue;
|
||||
private byte[] readLockKey, writeLockKey;
|
||||
protected String lockValue;
|
||||
private String readLockKey, writeLockKey;
|
||||
|
||||
private RedisTemplate redisTemplate;
|
||||
private static DefaultRedisScript<Boolean> redisScriptRead;
|
||||
private static DefaultRedisScript<Boolean> redisScriptWrite;
|
||||
|
||||
static {
|
||||
//初始化脚本
|
||||
redisScriptRead = new DefaultRedisScript<>();
|
||||
redisScriptWrite = new DefaultRedisScript<>();
|
||||
|
||||
redisScriptRead.setScriptSource(new ResourceScriptSource(new ClassPathResource("MEAT_INF/scripts/vcheckAndSadd.lua")));
|
||||
redisScriptRead.setResultType(Boolean.class);
|
||||
redisScriptWrite.setScriptSource(new ResourceScriptSource(new ClassPathResource("MEAT_INF/scripts/scheckAndVset.lua")));
|
||||
redisScriptWrite.setResultType(Boolean.class);
|
||||
}
|
||||
|
||||
private StringRedisTemplate redisTemplate;
|
||||
|
||||
public RedisReadWriteLock(String key, RedisTemplate redisTemplate) {
|
||||
Assert.notNull(key);
|
||||
Assert.notNull(redisTemplate);
|
||||
this.redisTemplate = redisTemplate;
|
||||
this.redisTemplate = new StringRedisTemplate(redisTemplate.getConnectionFactory());
|
||||
readLock = new ReadLock();
|
||||
writeLock = new WriteLock();
|
||||
readLockKey = (PREFIX + key + ".read.lock").getBytes();
|
||||
writeLockKey = (PREFIX + key + ".write.lock").getBytes();
|
||||
lockValue = (UUID.randomUUID().toString()).getBytes();
|
||||
readLockKey = PREFIX + key + ".read.lock";
|
||||
writeLockKey = PREFIX + key + ".write.lock";
|
||||
lockValue = UUID.randomUUID().toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -46,11 +66,11 @@ public class RedisReadWriteLock implements ReadWriteLock {
|
||||
return writeLock;
|
||||
}
|
||||
|
||||
private byte[] getReadKey() {
|
||||
private String getReadKey() {
|
||||
return readLockKey;
|
||||
}
|
||||
|
||||
private byte[] getWriteKey() {
|
||||
private String getWriteKey() {
|
||||
return writeLockKey;
|
||||
}
|
||||
|
||||
@@ -69,89 +89,101 @@ public class RedisReadWriteLock implements ReadWriteLock {
|
||||
this.lockKeyExpireTime = lockKeyExpireTime;
|
||||
}
|
||||
|
||||
|
||||
|
||||
class ReadLock implements Lock {
|
||||
public byte[] lockValue() {
|
||||
return new String(lockValue).concat(Thread.currentThread().getId() + "").getBytes();
|
||||
|
||||
private List<String> keys = new ArrayList<>();
|
||||
|
||||
public ReadLock() {
|
||||
super();
|
||||
keys.add(getWriteKey().toString());
|
||||
keys.add(getReadKey().toString());
|
||||
}
|
||||
|
||||
public String lockValue() {
|
||||
return new String(lockValue).concat(Thread.currentThread().getId() + "");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void lock() {
|
||||
redisTemplate.execute((RedisCallback<String>) connection -> {
|
||||
boolean locked = false;
|
||||
do {
|
||||
if (!connection.exists(getWriteKey())) {
|
||||
connection.setNX(getReadKey(), lockValue());
|
||||
connection.expire(getReadKey(), lockKeyExpireTime);
|
||||
locked = true;
|
||||
} else
|
||||
sleep();
|
||||
} while (!locked);
|
||||
return null;
|
||||
});
|
||||
|
||||
while (true) {
|
||||
Boolean locked = redisTemplate.execute(redisScriptRead, keys, lockValue());
|
||||
if (!locked) {
|
||||
sleep();
|
||||
} else {
|
||||
/*
|
||||
* 此处增加对所有读锁的过期
|
||||
* 1、防止项目停止,导致读锁一直存在
|
||||
*
|
||||
* @TODO 后期可以抽出到 redisScriptRead脚本中
|
||||
* */
|
||||
expire();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void lockInterruptibly() throws InterruptedException {
|
||||
boolean locked = (Boolean) redisTemplate.execute((RedisCallback<Boolean>) connection ->
|
||||
{
|
||||
boolean writeLocked = connection.exists(getWriteKey());
|
||||
if (!writeLocked) {
|
||||
if (connection.setNX(getReadKey(), lockValue)) {
|
||||
connection.expire(getReadKey(), lockKeyExpireTime);
|
||||
}
|
||||
writeLocked = true;
|
||||
}
|
||||
return writeLocked;
|
||||
});
|
||||
if (!locked) throw new InterruptedException("");
|
||||
|
||||
boolean locked = redisTemplate.execute(redisScriptRead, keys, lockValue());
|
||||
if (locked) {
|
||||
expire();
|
||||
} else {
|
||||
throw new InterruptedException("could not get the read lock!");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean tryLock() {
|
||||
return (Boolean) redisTemplate.execute((RedisCallback<Boolean>) connection ->
|
||||
{
|
||||
if (connection.setNX(getReadKey(), lockValue)) {
|
||||
connection.expire(getReadKey(), lockKeyExpireTime);
|
||||
}
|
||||
return false;
|
||||
});
|
||||
boolean locked = redisTemplate.execute(redisScriptRead, keys, lockValue());
|
||||
if (locked){
|
||||
expire();
|
||||
}
|
||||
return locked;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
|
||||
byte[] error = new byte[1];
|
||||
boolean success = (Boolean) redisTemplate.execute((RedisCallback<Boolean>) connection -> {
|
||||
boolean locked = false;
|
||||
long startWith = System.nanoTime();
|
||||
do {
|
||||
if (!connection.exists(getWriteKey())) {
|
||||
connection.setNX(getReadKey(), lockValue);
|
||||
connection.expire(getReadKey(), lockKeyExpireTime);
|
||||
return true;
|
||||
}
|
||||
long now = System.nanoTime();
|
||||
if (now - startWith > unit.toNanos(time)) {
|
||||
error[0] = 1;
|
||||
return false;
|
||||
}
|
||||
sleep();
|
||||
} while (!locked);
|
||||
return false;
|
||||
});
|
||||
|
||||
boolean locked;
|
||||
long startWith = System.nanoTime();
|
||||
do {
|
||||
|
||||
locked = redisTemplate.execute(redisScriptRead, keys, lockValue());
|
||||
if (locked) {
|
||||
expire();
|
||||
break;
|
||||
}
|
||||
|
||||
long now = System.nanoTime();
|
||||
if (now - startWith > unit.toNanos(time)) {
|
||||
error[0] = 1;
|
||||
break;
|
||||
}
|
||||
sleep();
|
||||
} while (!locked);
|
||||
|
||||
if (error[0] == 1) {
|
||||
throw new InterruptedException("try lock time out!");
|
||||
}
|
||||
return success;
|
||||
return locked;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unlock() {
|
||||
redisTemplate.execute((RedisCallback) conn -> {
|
||||
byte[] lock = conn.get(getReadKey());
|
||||
if (lock == null) return null;
|
||||
StringRedisConnection strConn = (StringRedisConnection) conn;
|
||||
Set<String> locks = strConn.sMembers(getReadKey());
|
||||
|
||||
if (locks == null || locks.size() == 0)
|
||||
return null;
|
||||
//当前读锁为自己持有 才解锁
|
||||
if (new String(lock).equals(new String(lockValue()))) {
|
||||
conn.del(getReadKey());
|
||||
if (locks.contains(lockValue())) {
|
||||
strConn.sRem(getReadKey(), lockValue());
|
||||
}
|
||||
return null;
|
||||
});
|
||||
@@ -161,88 +193,99 @@ public class RedisReadWriteLock implements ReadWriteLock {
|
||||
public Condition newCondition() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
private void expire() {
|
||||
redisTemplate.expire(getReadKey(), lockKeyExpireTime, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
class WriteLock implements Lock {
|
||||
|
||||
private List<String> keys = new ArrayList<>();
|
||||
|
||||
public WriteLock() {
|
||||
super();
|
||||
keys.add(getReadKey());
|
||||
keys.add(getWriteKey());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void lock() {
|
||||
redisTemplate.execute((RedisCallback<String>) connection -> {
|
||||
boolean locked = false, readLocked = false;
|
||||
do {
|
||||
readLocked = connection.exists(getReadKey());
|
||||
if (!readLocked) {
|
||||
locked = connection.setNX(getWriteKey(), lockValue);
|
||||
connection.expire(getWriteKey(), lockKeyExpireTime);
|
||||
} else
|
||||
sleep();
|
||||
} while (!locked);
|
||||
return null;
|
||||
});
|
||||
|
||||
boolean locked;
|
||||
|
||||
do {
|
||||
locked = redisTemplate.execute(redisScriptWrite, keys, lockValue);
|
||||
if (locked) {
|
||||
expire();
|
||||
} else {
|
||||
sleep();
|
||||
}
|
||||
} while (!locked);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void lockInterruptibly() throws InterruptedException {
|
||||
boolean locked = (Boolean) redisTemplate.execute((RedisCallback<Boolean>) connection ->
|
||||
{
|
||||
boolean readLocked = connection.exists(getReadKey());
|
||||
if (!readLocked) {
|
||||
boolean _locked = connection.setNX(getWriteKey(), lockValue);
|
||||
if (_locked) connection.expire(getWriteKey(), lockKeyExpireTime);
|
||||
return _locked;
|
||||
}
|
||||
return false;
|
||||
});
|
||||
if (!locked) throw new InterruptedException("");
|
||||
boolean locked = redisTemplate.execute(redisScriptWrite, keys, lockValue);
|
||||
if (locked) {
|
||||
expire();
|
||||
} else {
|
||||
throw new InterruptedException("");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean tryLock() {
|
||||
return (Boolean) redisTemplate.execute((RedisCallback<Boolean>) connection -> {
|
||||
if (connection.exists(getReadKey())) return false;
|
||||
boolean locked = connection.setNX(getWriteKey(), lockValue);
|
||||
if (locked)
|
||||
connection.expire(getWriteKey(), lockKeyExpireTime);
|
||||
return locked;
|
||||
});
|
||||
boolean locked = redisTemplate.execute(redisScriptWrite, keys, lockValue);
|
||||
if (locked) {
|
||||
expire();
|
||||
}
|
||||
return locked;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
|
||||
byte[] error = new byte[1];
|
||||
boolean success = (Boolean) redisTemplate.execute((RedisCallback<Boolean>) connection -> {
|
||||
boolean locked = false;
|
||||
long startWith = System.nanoTime();
|
||||
do {
|
||||
if (!connection.exists(getReadKey())) {
|
||||
locked = connection.setNX(getWriteKey(), lockValue);
|
||||
if (locked) {
|
||||
connection.expire(getWriteKey(), lockKeyExpireTime);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
long now = System.nanoTime();
|
||||
if (now - startWith > unit.toNanos(time)) {
|
||||
error[0] = 1;
|
||||
return false;
|
||||
}
|
||||
sleep();
|
||||
} while (!locked);
|
||||
return null;
|
||||
});
|
||||
|
||||
boolean locked;
|
||||
long startWith = System.nanoTime();
|
||||
do {
|
||||
locked = redisTemplate.execute(redisScriptWrite, keys, lockValue);
|
||||
long now = System.nanoTime();
|
||||
if (now - startWith > unit.toNanos(time)) {
|
||||
error[0] = 1;
|
||||
break;
|
||||
}
|
||||
sleep();
|
||||
} while (!locked);
|
||||
|
||||
if (locked) {
|
||||
expire();
|
||||
}
|
||||
|
||||
if (error[0] == 1) {
|
||||
throw new InterruptedException("lock time out!");
|
||||
}
|
||||
return success;
|
||||
return locked;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unlock() {
|
||||
redisTemplate.execute((RedisCallback) conn -> conn.del(getWriteKey()));
|
||||
redisTemplate.execute((RedisCallback) conn -> {
|
||||
StringRedisConnection strConn = (StringRedisConnection) conn;
|
||||
|
||||
strConn.del(getWriteKey());
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Condition newCondition() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
private void expire() {
|
||||
redisTemplate.expire(getWriteKey(), lockKeyExpireTime, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,21 @@
|
||||
--
|
||||
-- Created by IntelliJ IDEA.
|
||||
-- User: aaa
|
||||
-- Date: 2016/9/19
|
||||
-- Time: 17:31
|
||||
-- To change this template use File | Settings | File Templates.
|
||||
--
|
||||
|
||||
local size = redis.call('SCARD',KEYS[1])
|
||||
if(size == 0)
|
||||
then
|
||||
local flag = redis.call('SETNX',KEYS[2],ARGV[1])
|
||||
if(flag)
|
||||
then
|
||||
return true
|
||||
else
|
||||
return false
|
||||
end
|
||||
else
|
||||
return false
|
||||
end
|
||||
@@ -0,0 +1,16 @@
|
||||
--
|
||||
-- Created by IntelliJ IDEA.
|
||||
-- User: aaa
|
||||
-- Date: 2016/9/19
|
||||
-- Time: 17:31
|
||||
-- To change this template use File | Settings | File Templates.
|
||||
--
|
||||
|
||||
local v = redis.call('GET',KEYS[1])
|
||||
if(v)
|
||||
then
|
||||
return false
|
||||
else
|
||||
redis.call('SADD',KEYS[2],ARGV[1])
|
||||
return true
|
||||
end
|
||||
Reference in New Issue
Block a user