
锁:就是对代码块背后的资源进行锁定,在同一时间段只允许有一个线程访问修改
常用的线程安全机制:
1、sychronized jvm 自带的锁, 可以重入,没有超时时间,不能被外部释放接下来咱们在一个例子中体会分布式锁
2.例子秒杀例子
秒杀商品:
1.判断库存是否足够
2.库存充裕 则
生成订单,扣减库存
3.返回结构,生成订单
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.xml.ws.soap.Addressing;
@RestController
public class GoodsController {
@Autowired
private StringRedisTemplate redisTemplate;
@RequestMapping("/init")
public String init(String goods){
// 初始化库存
redisTemplate.opsForValue().set("stack_"+goods, 1000+"");
// 初始化订单
redisTemplate.opsForValue().set("order_"+goods, 0+"");
return "当前商品:"+goods +"--库存:"+ redisTemplate.opsForValue().get("stack_"+goods)+
"--订单:"+ redisTemplate.opsForValue().get("order_"+goods);
}
@RequestMapping("/killGoods")
public synchronized String killGoods(String goods){
String stackStr = redisTemplate.opsForValue().get("stack_"+goods);
int stack = Integer.valueOf(stackStr);
// 1.判断库存
if (stack<1){// 说明没有库存
return "很遗憾商品已售罄"+goods;
}
// 生成订单 订单加1 自增1
redisTemplate.opsForValue().increment("order_"+goods);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
redisTemplate.opsForValue().set("stack_"+goods,(stack-1)+"");
return "当前抢购商品成功"+goods;
}
@GetMapping("/getState")
public String getState(String goods){
return "当前商品:"+goods +"--库存:"+ redisTemplate.opsForValue().get("stack_"+goods)+
"--订单:"+ redisTemplate.opsForValue().get("order_"+goods);
}
}
获取订单状态
秒杀
使用工具进行秒杀
下载ab压力测试
ab -n 请求数 -c 并发数 访问的路径 ab -n 5000 -c 20 http://localhost:8080/killGoods?goods=p50
可以使用 synchronized 解决线程安全
多台机器同时访问
synchronized无法解决多应用线程问题
3.使用分布式锁解决线程安全 使用redis 解决分布式线程安全问题 1.引入redis 锁
导入依赖
application.properties的创建org.springframework.boot spring-boot-starter-parent2.2.4.RELEASE org.springframework.boot spring-boot-starter-weborg.springframework.boot spring-boot-starter-actuatororg.springframework.boot spring-boot-starter-testtest org.springframework.boot spring-boot-starter-data-redis
spring.redis.host=8.130.166.101 spring.redis.port=6379
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Component
public class RedisLock {
@Autowired
public StringRedisTemplate redisTemplate;
public boolean lock(String key,String value,int time){
// 通过 setnx 来判断key 的标记位是否存在
// 如果存在说明别人已经 持有锁
// 不存在 创建,并获得锁
return redisTemplate.opsForValue().setIfAbsent(key,value,time, TimeUnit.SECONDS);
}
public boolean unlock(String key){
return redisTemplate.delete(key);
}
}
2.使用
@Autowired
private RedisLock redisLock;
@RequestMapping("/redisKillGoods")
public synchronized String redisKillGoods(String goods){
if (redisLock.lock("lock_"+goods,"0",5)){// 拿到锁
String stackStr = redisTemplate.opsForValue().get("stack_"+goods);
int stack = Integer.valueOf(stackStr);
// 1.判断库存
if (stack<1){// 说明没有库存
// 释放锁
redisLock.unlock("lock_"+goods);
return "很遗憾商品已售罄"+goods;
}
// 生成订单 订单加1 自增1
redisTemplate.opsForValue().increment("order_"+goods);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
redisTemplate.opsForValue().set("stack_"+goods,(stack-1)+"");
// 释放锁
redisLock.unlock("lock_"+goods);
return "当前抢购商品成功"+goods;
}else {
return "很遗憾没有抢到宝贝"+goods;
}
}
使用redis setnx 完成分布式锁缺陷
1.setnx 本身是两条命令 a.set值 b.配置其过期时间
2.如果redis 是主从模式,有可能造成 主从数据延迟问题
3.setnx 有时间限制,如果在使用锁期间,key时间过期了,也会造成安全问题?
解决方案:
1.使用 redis 调用lua 脚本操作 setnx 保证原子性 操作复杂
2.我们使用 redlock (红锁) 我们通过多个key 保证一个分布式锁,通过投票决定是否获取锁,配置的key还拥有看门狗机制(可以查看key是否即将过期,如果快要过期,可以续命)
解决 遇到到 1,2,3 问题
1.引入zk 依赖
2.容器加入zk org.apache.zookeeper zookeeper3.6.0 slf4j-api org.slf4j slf4j-log4j12 org.slf4j log4j log4j org.apache.curator curator-recipes4.0.1 slf4j-api org.slf4j
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.CuratorframeworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ZkConfig {
@Bean
public Curatorframework getCuratorframework(){
// 当客户端 和 服务端 连接异常时,会发起重试 每隔2s 重试1次,总共试 3次
RetryPolicy retryPolicy = new ExponentialBackoffRetry(2000,3);
Curatorframework curatorframework = CuratorframeworkFactory.builder()
.retryPolicy(retryPolicy).connectString("192.168.12.145:2181;192.168.12.145:2182;192.168.12.145:2183").build();
curatorframework.start();
return curatorframework;
}
}
3.使用zk 分布式锁
@Autowired
private Curatorframework curatorframework;
@RequestMapping("/zkKillGoods")
public synchronized String zkKillGoods(String goods) throws Exception {
// 封装zk 的分布式锁 相当于 jvm synchronized 中的监视器
InterProcessMutex interProcessMutex = new InterProcessMutex(curatorframework,"/"+goods);
// interProcessMutex.acquire(5, TimeUnit.SECONDS) 去获取锁,若果没有获取到,就等待5s,还没有则放弃
if (interProcessMutex.acquire(5, TimeUnit.SECONDS)){// 拿到锁
String stackStr = redisTemplate.opsForValue().get("stack_"+goods);
int stack = Integer.valueOf(stackStr);
// 1.判断库存
if (stack<1){// 说明没有库存
// 释放锁
interProcessMutex.release();
return "很遗憾商品已售罄"+goods;
}
// 生成订单 订单加1 自增1
redisTemplate.opsForValue().increment("order_"+goods);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
redisTemplate.opsForValue().set("stack_"+goods,(stack-1)+"");
// 释放锁
interProcessMutex.release();
return "当前抢购商品成功"+goods;
}else {
return "很遗憾没有抢到宝贝"+goods;
}
}
总结:
使用zk 作为分布锁,比较可靠,但是效率太低,一般适用于并发量不大的场合 如果并发量太高,zk 就是最大的性能瓶颈