From 0d8e783e5752a12396829ef7550ba1d2f39e6caa Mon Sep 17 00:00:00 2001 From: stone Date: Fri, 23 Sep 2016 18:44:40 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9redisReadWriteLock=EF=BC=8Cre?= =?UTF-8?q?dis=E6=93=8D=E4=BD=9C=E5=8E=9F=E5=AD=90=E6=80=A7=20=E6=9C=AA?= =?UTF-8?q?=E6=B5=8B=E8=AF=95=20=E8=AF=BB=E9=94=81=E8=87=AA=E5=8A=A8?= =?UTF-8?q?=E8=BF=87=E6=9C=9F=E6=9C=AA=E5=AE=8C=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../support/redis/RedisReadWriteLock.java | 279 ++++++++++-------- .../main/resources/META-INF/scheckAndVset.lua | 21 ++ .../main/resources/META-INF/vcheckAndsAdd.lua | 16 + 3 files changed, 198 insertions(+), 118 deletions(-) create mode 100644 hsweb-web-concurrent/hsweb-web-concurrent-lock/src/main/resources/META-INF/scheckAndVset.lua create mode 100644 hsweb-web-concurrent/hsweb-web-concurrent-lock/src/main/resources/META-INF/vcheckAndsAdd.lua diff --git a/hsweb-web-concurrent/hsweb-web-concurrent-lock/src/main/java/org/hsweb/concurrent/lock/support/redis/RedisReadWriteLock.java b/hsweb-web-concurrent/hsweb-web-concurrent-lock/src/main/java/org/hsweb/concurrent/lock/support/redis/RedisReadWriteLock.java index 7a35b15cf..73731bfe7 100644 --- a/hsweb-web-concurrent/hsweb-web-concurrent-lock/src/main/java/org/hsweb/concurrent/lock/support/redis/RedisReadWriteLock.java +++ b/hsweb-web-concurrent/hsweb-web-concurrent-lock/src/main/java/org/hsweb/concurrent/lock/support/redis/RedisReadWriteLock.java @@ -1,10 +1,16 @@ package org.hsweb.concurrent.lock.support.redis; +import org.springframework.core.io.ClassPathResource; +import org.springframework.data.redis.connection.StringRedisConnection; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.data.redis.core.script.DefaultRedisScript; +import org.springframework.scripting.support.ResourceScriptSource; import org.springframework.util.Assert; -import java.util.UUID; +import java.util.*; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @@ -20,20 +26,34 @@ public class RedisReadWriteLock implements ReadWriteLock { private WriteLock writeLock; private long lockKeyExpireTime = DEFAULT_EXPIRE; private long waitTime = 30; - protected byte[] lockValue; - private byte[] readLockKey, writeLockKey; + protected String lockValue; + private String readLockKey, writeLockKey; - private RedisTemplate redisTemplate; + private static DefaultRedisScript redisScriptRead; + private static DefaultRedisScript redisScriptWrite; + + static { + //初始化脚本 + redisScriptRead = new DefaultRedisScript<>(); + redisScriptWrite = new DefaultRedisScript<>(); + + redisScriptRead.setScriptSource(new ResourceScriptSource(new ClassPathResource("MEAT_INF/scripts/vcheckAndSadd.lua"))); + redisScriptRead.setResultType(Boolean.class); + redisScriptWrite.setScriptSource(new ResourceScriptSource(new ClassPathResource("MEAT_INF/scripts/scheckAndVset.lua"))); + redisScriptWrite.setResultType(Boolean.class); + } + + private StringRedisTemplate redisTemplate; public RedisReadWriteLock(String key, RedisTemplate redisTemplate) { Assert.notNull(key); Assert.notNull(redisTemplate); - this.redisTemplate = redisTemplate; + this.redisTemplate = new StringRedisTemplate(redisTemplate.getConnectionFactory()); readLock = new ReadLock(); writeLock = new WriteLock(); - readLockKey = (PREFIX + key + ".read.lock").getBytes(); - writeLockKey = (PREFIX + key + ".write.lock").getBytes(); - lockValue = (UUID.randomUUID().toString()).getBytes(); + readLockKey = PREFIX + key + ".read.lock"; + writeLockKey = PREFIX + key + ".write.lock"; + lockValue = UUID.randomUUID().toString(); } @Override @@ -46,11 +66,11 @@ public class RedisReadWriteLock implements ReadWriteLock { return writeLock; } - private byte[] getReadKey() { + private String getReadKey() { return readLockKey; } - private byte[] getWriteKey() { + private String getWriteKey() { return writeLockKey; } @@ -69,89 +89,101 @@ public class RedisReadWriteLock implements ReadWriteLock { this.lockKeyExpireTime = lockKeyExpireTime; } + + class ReadLock implements Lock { - public byte[] lockValue() { - return new String(lockValue).concat(Thread.currentThread().getId() + "").getBytes(); + + private List keys = new ArrayList<>(); + + public ReadLock() { + super(); + keys.add(getWriteKey().toString()); + keys.add(getReadKey().toString()); + } + + public String lockValue() { + return new String(lockValue).concat(Thread.currentThread().getId() + ""); } @Override public void lock() { - redisTemplate.execute((RedisCallback) connection -> { - boolean locked = false; - do { - if (!connection.exists(getWriteKey())) { - connection.setNX(getReadKey(), lockValue()); - connection.expire(getReadKey(), lockKeyExpireTime); - locked = true; - } else - sleep(); - } while (!locked); - return null; - }); + + while (true) { + Boolean locked = redisTemplate.execute(redisScriptRead, keys, lockValue()); + if (!locked) { + sleep(); + } else { + /* + * 此处增加对所有读锁的过期 + * 1、防止项目停止,导致读锁一直存在 + * + * @TODO 后期可以抽出到 redisScriptRead脚本中 + * */ + expire(); + break; + } + } } @Override public void lockInterruptibly() throws InterruptedException { - boolean locked = (Boolean) redisTemplate.execute((RedisCallback) connection -> - { - boolean writeLocked = connection.exists(getWriteKey()); - if (!writeLocked) { - if (connection.setNX(getReadKey(), lockValue)) { - connection.expire(getReadKey(), lockKeyExpireTime); - } - writeLocked = true; - } - return writeLocked; - }); - if (!locked) throw new InterruptedException(""); + + boolean locked = redisTemplate.execute(redisScriptRead, keys, lockValue()); + if (locked) { + expire(); + } else { + throw new InterruptedException("could not get the read lock!"); + } } @Override public boolean tryLock() { - return (Boolean) redisTemplate.execute((RedisCallback) connection -> - { - if (connection.setNX(getReadKey(), lockValue)) { - connection.expire(getReadKey(), lockKeyExpireTime); - } - return false; - }); + boolean locked = redisTemplate.execute(redisScriptRead, keys, lockValue()); + if (locked){ + expire(); + } + return locked; } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { byte[] error = new byte[1]; - boolean success = (Boolean) redisTemplate.execute((RedisCallback) connection -> { - boolean locked = false; - long startWith = System.nanoTime(); - do { - if (!connection.exists(getWriteKey())) { - connection.setNX(getReadKey(), lockValue); - connection.expire(getReadKey(), lockKeyExpireTime); - return true; - } - long now = System.nanoTime(); - if (now - startWith > unit.toNanos(time)) { - error[0] = 1; - return false; - } - sleep(); - } while (!locked); - return false; - }); + + boolean locked; + long startWith = System.nanoTime(); + do { + + locked = redisTemplate.execute(redisScriptRead, keys, lockValue()); + if (locked) { + expire(); + break; + } + + long now = System.nanoTime(); + if (now - startWith > unit.toNanos(time)) { + error[0] = 1; + break; + } + sleep(); + } while (!locked); + if (error[0] == 1) { throw new InterruptedException("try lock time out!"); } - return success; + return locked; } @Override public void unlock() { redisTemplate.execute((RedisCallback) conn -> { - byte[] lock = conn.get(getReadKey()); - if (lock == null) return null; + StringRedisConnection strConn = (StringRedisConnection) conn; + Set locks = strConn.sMembers(getReadKey()); + + if (locks == null || locks.size() == 0) + return null; //当前读锁为自己持有 才解锁 - if (new String(lock).equals(new String(lockValue()))) { - conn.del(getReadKey()); + if (locks.contains(lockValue())) { + strConn.sRem(getReadKey(), lockValue()); } return null; }); @@ -161,88 +193,99 @@ public class RedisReadWriteLock implements ReadWriteLock { public Condition newCondition() { throw new UnsupportedOperationException(); } + + private void expire() { + redisTemplate.expire(getReadKey(), lockKeyExpireTime, TimeUnit.SECONDS); + } } class WriteLock implements Lock { + + private List keys = new ArrayList<>(); + + public WriteLock() { + super(); + keys.add(getReadKey()); + keys.add(getWriteKey()); + } + @Override public void lock() { - redisTemplate.execute((RedisCallback) connection -> { - boolean locked = false, readLocked = false; - do { - readLocked = connection.exists(getReadKey()); - if (!readLocked) { - locked = connection.setNX(getWriteKey(), lockValue); - connection.expire(getWriteKey(), lockKeyExpireTime); - } else - sleep(); - } while (!locked); - return null; - }); + + boolean locked; + + do { + locked = redisTemplate.execute(redisScriptWrite, keys, lockValue); + if (locked) { + expire(); + } else { + sleep(); + } + } while (!locked); } @Override public void lockInterruptibly() throws InterruptedException { - boolean locked = (Boolean) redisTemplate.execute((RedisCallback) connection -> - { - boolean readLocked = connection.exists(getReadKey()); - if (!readLocked) { - boolean _locked = connection.setNX(getWriteKey(), lockValue); - if (_locked) connection.expire(getWriteKey(), lockKeyExpireTime); - return _locked; - } - return false; - }); - if (!locked) throw new InterruptedException(""); + boolean locked = redisTemplate.execute(redisScriptWrite, keys, lockValue); + if (locked) { + expire(); + } else { + throw new InterruptedException(""); + } } @Override public boolean tryLock() { - return (Boolean) redisTemplate.execute((RedisCallback) connection -> { - if (connection.exists(getReadKey())) return false; - boolean locked = connection.setNX(getWriteKey(), lockValue); - if (locked) - connection.expire(getWriteKey(), lockKeyExpireTime); - return locked; - }); + boolean locked = redisTemplate.execute(redisScriptWrite, keys, lockValue); + if (locked) { + expire(); + } + return locked; } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { byte[] error = new byte[1]; - boolean success = (Boolean) redisTemplate.execute((RedisCallback) connection -> { - boolean locked = false; - long startWith = System.nanoTime(); - do { - if (!connection.exists(getReadKey())) { - locked = connection.setNX(getWriteKey(), lockValue); - if (locked) { - connection.expire(getWriteKey(), lockKeyExpireTime); - return true; - } - } - long now = System.nanoTime(); - if (now - startWith > unit.toNanos(time)) { - error[0] = 1; - return false; - } - sleep(); - } while (!locked); - return null; - }); + + boolean locked; + long startWith = System.nanoTime(); + do { + locked = redisTemplate.execute(redisScriptWrite, keys, lockValue); + long now = System.nanoTime(); + if (now - startWith > unit.toNanos(time)) { + error[0] = 1; + break; + } + sleep(); + } while (!locked); + + if (locked) { + expire(); + } + if (error[0] == 1) { throw new InterruptedException("lock time out!"); } - return success; + return locked; } @Override public void unlock() { - redisTemplate.execute((RedisCallback) conn -> conn.del(getWriteKey())); + redisTemplate.execute((RedisCallback) conn -> { + StringRedisConnection strConn = (StringRedisConnection) conn; + + strConn.del(getWriteKey()); + return null; + }); } @Override public Condition newCondition() { throw new UnsupportedOperationException(); } + + private void expire() { + redisTemplate.expire(getWriteKey(), lockKeyExpireTime, TimeUnit.SECONDS); + } } } diff --git a/hsweb-web-concurrent/hsweb-web-concurrent-lock/src/main/resources/META-INF/scheckAndVset.lua b/hsweb-web-concurrent/hsweb-web-concurrent-lock/src/main/resources/META-INF/scheckAndVset.lua new file mode 100644 index 000000000..5ee36cc07 --- /dev/null +++ b/hsweb-web-concurrent/hsweb-web-concurrent-lock/src/main/resources/META-INF/scheckAndVset.lua @@ -0,0 +1,21 @@ +-- +-- Created by IntelliJ IDEA. +-- User: aaa +-- Date: 2016/9/19 +-- Time: 17:31 +-- To change this template use File | Settings | File Templates. +-- + +local size = redis.call('SCARD',KEYS[1]) +if(size == 0) +then + local flag = redis.call('SETNX',KEYS[2],ARGV[1]) + if(flag) + then + return true + else + return false + end +else + return false +end diff --git a/hsweb-web-concurrent/hsweb-web-concurrent-lock/src/main/resources/META-INF/vcheckAndsAdd.lua b/hsweb-web-concurrent/hsweb-web-concurrent-lock/src/main/resources/META-INF/vcheckAndsAdd.lua new file mode 100644 index 000000000..f9cc41439 --- /dev/null +++ b/hsweb-web-concurrent/hsweb-web-concurrent-lock/src/main/resources/META-INF/vcheckAndsAdd.lua @@ -0,0 +1,16 @@ +-- +-- Created by IntelliJ IDEA. +-- User: aaa +-- Date: 2016/9/19 +-- Time: 17:31 +-- To change this template use File | Settings | File Templates. +-- + +local v = redis.call('GET',KEYS[1]) +if(v) +then + return false +else + redis.call('SADD',KEYS[2],ARGV[1]) + return true +end