防止数据的并发操作出现数据不一致,保证操作是幂等的,我们可以对对象加锁、方法加锁,例如synchronized、ReentrantLock等,但是上述Jdk提供的锁时基于内存的,在分布式环境下是不适用的,那么在分布式环境下该如何实现锁操作呢?
分布式环境下需要保证集群中对同一数据、方法等操作状态,所以需要把状态单独存储,分布式锁的实现常用方式是基于缓存或者ZK,常用分布式缓存包括Redis和memcached,通过在缓存中设置同一个key和过期时间实现集群中拿到的状态是一致的,ZK是分布式锁实现是基于ZK的临时顺序节点,通过判断同一name下节点是否是最小节点确定是否获取到锁。
本文不具体介绍如何实现缓存的写入和删除或者ZK的临时顺序节点创建等实现,主要针对怎么利用Spring AOP消除冗余的if…else,try{}catch(){}等代码,至于为什么,举个正常写一下锁调用的和利用AOP实现的简单对比。
- 正常写法
public String test() {
final String lockKey = "lock_key";
if (!DistributedLock.getInstance().lock(lockKey, 10, TimeUnit.SECONDS)) {
// 没有获取到锁, 返回
}
try {
// 获取到锁, 进行业务逻辑
// ...
} finally {
// 释放锁
DistributedLock.getInstance().unlock(lockKey);
}
return "ok";
}
- AOP实现
"lock_key")
public String test() {
return "ok";
}
(prefix=
看上面两个对比,是不是发现下面的写法很简洁,不用重复写相同的代码了,下面们一下具体实现,其实主要是@Lock
注解的定义,和利用AOP对注解的拦截
@Lock
package com.github.lock.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* @author wangdongbo
* @since 2019/7/22.
*/
(ElementType.METHOD)
(RetentionPolicy.RUNTIME)
public Lock {
/**
* 锁前缀, 为空时取'classname_method_'
*/
String prefix() default "";
/**
* 方法参数名,支持方法中的入参,支持方法参数是对象时对象属性的字段
* 不支持集合/数组参数
*/
String[] parameter() default {};
/**
* 锁定时间
*/
int lockSeconds() default 10;
/**
* 是否重试,默认重试
*/
boolean retry() default true;
/**
* 重试次数,默认为1,retry为true时可用
*/
int retryCount() default 1;
/**
* 没拿到锁时等待多久后重试,retry为true时可用
*/
int tryLockSeconds() default 10;
/**
* 提示
*/
String desc() default "数据正在操作中,请稍后再试";
}
LockResolver.java
package com.github.lock.resolver;
import com.github.lock.annotation.Lock;
import com.github.lock.exception.LockException;
import com.github.lock.util.UUIDUtil;
import com.github.lock.way.DistributedLock;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.ArrayUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.LocalVariableTableParameterNameDiscoverer;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.List;
import java.util.stream.Stream;
/**
* @author wangdongbo
* @since 2019/7/22.
*/
4j
(1)
public class LockResolver {
private static final String DOT = ".";
private static final String DOT_SPLIT = "\\.";
private DistributedLock distributedLock;
private static LocalVariableTableParameterNameDiscoverer parameterNameDiscoverer = new LocalVariableTableParameterNameDiscoverer();
("@annotation(com.github.lock.annotation.Lock)")
public void lockAspectMethod() { }
("lockAspectMethod()")
public Object doAround(ProceedingJoinPoint joinPoint) throws Throwable {
MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
Method method = methodSignature.getMethod();
if (method == null) {
throw new LockException("method is empty");
}
Lock lock = method.getAnnotation(Lock.class);
String prefix;
if (!StringUtils.isEmpty(lock.prefix())) {
prefix = lock.prefix();
} else {
prefix = joinPoint.getTarget().getClass().getSimpleName() + "_" + method.getName() + "_";
}
String lockKey = prefix + getArgsConcat(method, lock.parameter(), joinPoint.getArgs());
String requestId = UUIDUtil.getID();
try {
if (lock.retry()) {
int count = lock.retryCount();
for (int i = 0; i < count; i++) {
if (distributedLock.tryLock(lockKey, requestId, lock.lockSeconds(), lock.tryLockSeconds())) {
return joinPoint.proceed();
}
}
} else if (distributedLock.lock(lockKey, requestId, lock.lockSeconds())) {
return joinPoint.proceed();
}
throw new LockException(lock.desc());
} catch (Throwable e) {
log.warn("add lock error:{}", e.getMessage(), e);
throw new LockException(e.getMessage(), e);
} finally {
distributedLock.unlock(lockKey, requestId);
}
}
private String getArgsConcat(Method method, String[] parameter, Object[] args) {
String[] params = parameterNameDiscoverer.getParameterNames(method);
if (ArrayUtils.isEmpty(parameter)) {
return "";
}
List<Object> list = Lists.newArrayListWithCapacity(parameter.length);
Stream.of(parameter).forEach(s -> {
if (s.contains(DOT)) {
list.add(parseObj(s, params, args));
} else {
int idx = ArrayUtils.indexOf(params, s);
list.add(args[idx]);
}
});
return Joiner.on("_").join(list);
}
private Object parseObj(String parameter, String[] params, Object[] args) {
String firstField = parameter.split(DOT_SPLIT)[0];
int idx = ArrayUtils.indexOf(params, firstField);
Object value = args[idx];
Class valueClass = value.getClass();
Object[] fields = ArrayUtils.remove(parameter.split(DOT_SPLIT), 0);
for (Object field : fields) {
try {
Field nextField = valueClass.getDeclaredField(String.valueOf(field));
nextField.setAccessible(true);
value = nextField.get(value);
valueClass = value.getClass();
} catch (NoSuchFieldException | IllegalAccessException e) {
log.error(e.getMessage(), e);
}
}
return value;
}
}
提供一个锁接口,具体可以根据自己的代码实现具体的功能。
DistributedLock.java
package com.github.lock.way;
/**
* @author wangdongbo
* @since 2019/7/22.
*/
public interface DistributedLock {
/**
* 加锁
*
* @param lockKey key
* @param requestId 唯一请求ID
* @param seconds 锁定时间[秒]
* @return 是否加锁成功
*/
boolean lock(String lockKey, String requestId, int seconds);
/**
* 尝试加锁
*
* @param lockKey key
* @param requestId 唯一请求ID
* @param seconds 锁定时间[秒]
* @param trySeconds 拿不到锁,等待多久之后再次尝试加锁[秒]
* @return 是否加锁成功
*/
boolean tryLock(String lockKey, String requestId, int seconds, int trySeconds);
/**
* 释放锁
*
* @param lockKey key
* @param requestId 唯一请求ID
* @return 是否成功释放锁
*/
boolean unlock(String lockKey, String requestId);
}
以上代码就是主要逻辑代码,具体代码实现可以参考工程: 传送门