
Project Modules JDK1.8
java Compiler --> Module typecodeversion 8
线程基础 线程核心概念程序:QQ.exe(硬盘上)
进程:运行程序,放入内存(资源分配的基本单位)
线程:程序执行的基本单元
package org.example;
import org.apache.commons.io.FileUtils;
import java.io.File;
import java.io.IOException;
import java.net.URL;
//练习Thread,实现多线程同步下载网络图片
public class ThreadPractice extends Thread{
private String url;//网络图片地址
private String name;//保存的文件名
public ThreadPractice(String url,String name){
this.name = name;
this.url = url;
}
//下载图片线程的执行体
public void run() {
WebDownloader webDownloader = new WebDownloader();
webDownloader.downloader(url,name);
System.out.println("下载了文件名为:"+name);
}
public static void main(String[] args) {
ThreadPractice t1 = new ThreadPractice("https://tse2-mm.cn.bing.net/th/id/OIP-C.xsA-3qUw6cqmd8nRfxk6TQHaEK?w=298&h=180&c=7&r=0&o=5&dpr=1.25&pid=1.7","1.jpg");
ThreadPractice t2 = new ThreadPractice("https://tse2-mm.cn.bing.net/th/id/OIP-C.xsA-3qUw6cqmd8nRfxk6TQHaEK?w=298&h=180&c=7&r=0&o=5&dpr=1.25&pid=1.7","2.jpg");
ThreadPractice t3 = new ThreadPractice("https://tse2-mm.cn.bing.net/th/id/OIP-C.xsA-3qUw6cqmd8nRfxk6TQHaEK?w=298&h=180&c=7&r=0&o=5&dpr=1.25&pid=1.7","3.jpg");
t1.start();
t2.start();
t3.start();
}
}
//下载器
class WebDownloader{
//下载方法
public void downloader(String url,String name){
try {
FileUtils.copyURLToFile(new URL(url),new File(name));
} catch (IOException e) {
e.printStackTrace();
}
}
}
实现Runnable接口
package org.example;
//创建线程方式2:实现runnable接口,重写run方法,执行线程需要丢入runnable接口实现类,调用start方法
public class Thread3 implements Runnable {
@Override
public void run() {
//run方法线程体
for(int i=0;i<200;i++){
System.out.println("我在看代码--");
}
}
public static void main(String[] args){
//main线程,主线程
Thread3 thread3 = new Thread3();
Thread t = new Thread(thread3);
t.start();
for(int i=0;i<100;i++){
System.out.println("我在学习多线程--");
}
}
}
实现Callable接口
package org.example; import org.apache.commons.io.FileUtils; import java.io.File; import java.io.IOException; import java.net.URL; import java.util.concurrent.*; public class TestCallable implements CallableLambda表达式{ private String url;//网络图片地址 private String name;//保存的文件名 public TestCallable(String url,String name){ this.name = name; this.url = url; } //下载图片线程的执行体 public Boolean call() throws Exception { WebDownloader webDownloader = new WebDownloader(); webDownloader.downloader(url,name); System.out.println("下载了文件名为:"+name); return true; } public static void main(String[] args) { TestCallable t1 = new TestCallable("https://tse2-mm.cn.bing.net/th/id/OIP-C.xsA-3qUw6cqmd8nRfxk6TQHaEK?w=298&h=180&c=7&r=0&o=5&dpr=1.25&pid=1.7","1.jpg"); TestCallable t2 = new TestCallable("https://tse2-mm.cn.bing.net/th/id/OIP-C.xsA-3qUw6cqmd8nRfxk6TQHaEK?w=298&h=180&c=7&r=0&o=5&dpr=1.25&pid=1.7","2.jpg"); TestCallable t3 = new TestCallable("https://tse2-mm.cn.bing.net/th/id/OIP-C.xsA-3qUw6cqmd8nRfxk6TQHaEK?w=298&h=180&c=7&r=0&o=5&dpr=1.25&pid=1.7","3.jpg"); //Callable线程实现步骤 //创建执行服务 ExecutorService ser = Executors.newFixedThreadPool(3); //提交执行 Future r1 = ser.submit(t1); Future r2 = ser.submit(t2); Future r3 = ser.submit(t3); //获取结果 try { boolean rs1 = r1.get(); boolean rs2 = r2.get(); boolean rs3 = r3.get(); } catch (Exception e) { e.printStackTrace(); } } } //下载器 class WebDownloader{ //下载方法 public void downloader(String url,String name){ try { FileUtils.copyURLToFile(new URL(url),new File(name)); } catch (IOException e) { e.printStackTrace(); } } }
**函数式接口的定义:**任何接口,如果只包含唯一一个抽象方法,那么就是一个函数式接口
对于函数式接口,可以通过lamda表达式来创建接口对象
package org.example;
public class TestLambda1 {
//3.静态内部类
static class Like2 implements ILike{
@Override
public void lambda() {
System.out.println("I like Lambda2");
}
}
public static void main( String[] args ) {
ILike like = new Like();
like.lambda();
like = new Like2();
like.lambda();
//4.局部内部类
class Like3 implements ILike{
@Override
public void lambda() {
System.out.println("I like Lambda3");
}
}
like = new Like3();
like.lambda();
//5.匿名内部类,没有类的名称,必须借助接口或者父类
like = new ILike() {
@Override
public void lambda() {
System.out.println("I like Lambda4");
}
};
like.lambda();
//6.用lambda简化
like = ()->{
System.out.println("I like lambda5");
};
like.lambda();
}
}
//1.定义一个函数式接口
interface ILike{
void lambda();
}
//2.实现类
class Like implements ILike{
@Override
public void lambda() {
System.out.println("I like lambda");
}
}
lambda简化
package org.example;
public class TestLambda2 {
public static void main(String[] args) {
//1.lambda表达式简化
ILove love = (int a)->{
System.out.println("I love you"+a);
};
//2.简化:去掉参数类型
love = (a) -> {
System.out.println("I love you "+a);
};
//3.简化:去掉小括号
love = a -> {
System.out.println("I love you "+a);
};
//4.简化:去掉花括号
love = a -> System.out.println("I love you "+a);
love.love(521);
}
}
interface ILove{
void love(int a);
}
总结:
lambda表达式只能有一行代码的情况下才能简化成为一行,如果有多行,那么就用代码块包裹
前提是接口为函数接口,即只有一个抽象方法
多个参数也可以去掉参数类型,但是要去掉就都去掉,还需要小括号
wait/sleep同步块:synchronized**(Obj){}**
Obj称之为同步监视器
同步监视器的执行过程
setPriority(int newPriority) 更改线程的优先级
static void sleep(long millis) 在指定的毫秒数内让当前正在执行的线程休眠
void join() 等待该线程终止
static void yield() 暂停当前正在执行的线程对象,并执行其他线程
void interrupt() 中断线程,尽量不要用
boolean isAlive() 测试线程是否处于活动状态
线程的停止不推荐JDK提供的stop()和destroy()方法,已废弃
建议线程自己停止下来(使用一个标志位进行终止变量,当flag=false,终止线程运行)
package org.example;
public class TestStop implements Runnable{
//设置一个标志位
private boolean flag = true;
@Override
public void run() {
int i = 0;
while(flag){
System.out.println("线程运行中"+i);
}
}
//设置一个公开的方法停止线程,转换标志位
public void stop(){
this.flag = false;
}
public static void main(String[] args) {
TestStop testStop = new TestStop();
new Thread(testStop).start();
for (int i = 0; i < 1000; i++) {
System.out.println("main"+i);
if(i==900){
//调用stop方法切换标志位,让线程停止
testStop.stop();
System.out.println("线程停止了");
}
}
}
}
线程休眠sleep
sleep指当前线程阻塞的毫秒数
sleep存在异常InterruptedException
sleep时间达到后线程进入就绪状态
sleep可以模拟网络延时,倒计时
每一个对象都有一个锁,sleep不会释放锁
企业中用JUC休眠
TimeUnit.DAYS.sleep() TimeUnit.SECONDS.sleep();
模拟延时:
package org.example;
public class TestSleep implements Runnable{
//票数
private int ticketNums = 10;
@Override
public void run() {
while(true){
if(ticketNums<=1){
break;
}
try {
//模拟延时
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"拿到了第"+ticketNums--+"张票");
}
}
public static void main(String[] args) {
TestSleep ticket = new TestSleep();
new Thread(ticket,"小明").start();
new Thread(ticket,"小桑").start();
new Thread(ticket,"园园").start();
}
}
模拟倒计时:
package org.example;
public class TestSleep2{
//模拟倒计时
public static void tenDown() throws InterruptedException{
int num = 10;
while(true){
Thread.sleep(1000);
System.out.println(num--);
if(num<=0){
break;
}
}
}
public static void main(String[] args) {
try {
tenDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
线程礼让yield
礼让线程,让当前正在执行的线程暂停,但不阻塞
将线程从运行状态转为就绪状态
让CPU重新调度,礼让不一定成功,看CPU心情
package org.example;
public class TestYield {
public static void main(String[] args) {
MyYield myYield = new MyYield();
new Thread(myYield,"a").start();
new Thread(myYield,"b").start();
}
}
class MyYield implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"线程执行开始");
Thread.yield();//礼让
System.out.println(Thread.currentThread().getName()+"线程执行结束");
}
}
线程合并join
package org.example;
//测试Join,想象成插队
public class TestJoin implements Runnable {
@Override
public void run() {
for (int i = 0; i < 100; i++) {
System.out.println("线程VIP来了"+i);
}
}
public static void main(String[] args) throws InterruptedException {
//我们的线程启动
TestJoin testJoin = new TestJoin();
Thread thread = new Thread(testJoin);
//主线程
for (int i = 0; i < 1000; i++) {
if(i==200){
thread.start();
thread.join();//插队
}
System.out.println("main"+i);
}
}
}
观测线程状态
package org.example;
public class TestState {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("结束了");
});
//观察状态
Thread.State state = thread.getState();
System.out.println(state);//NEW
//观察启动后
thread.start();//启动线程
state = thread.getState();
System.out.println(state);//Run
while(state!=Thread.State.TERMINATED){//只要线程不终止,就一直输出状态
Thread.sleep(100);
state = thread.getState();//更新线程状态
System.out.println(state);
}
}
}
线程优先级
java提供一个线程调度器来监控程序中启动后进入就绪状态的所有线程,
线程调度器按照优先级决定应该调度那个线程来执行
线程的优先级用数字表示,范围从1-10
Thread.MIN_PRIORITY = 1;
Thread.MAX_PRIORITY = 10;
Thread.NORM_PRIORITY = 5;
可以使用getPriority()获取优先级,setPriority(int xxx)设置优先级,默认优先级都是5
package org.example;
//测试线程优先级
public class TestPriority extends Thread{
public static void main(String[] args) {
//主线程默认优先级
System.out.println(Thread.currentThread().getName()+"优先级:"+Thread.currentThread().getPriority());
MyPriority myPriority = new MyPriority();
Thread t1 = new Thread(myPriority,"a");
Thread t2 = new Thread(myPriority,"b");
Thread t3 = new Thread(myPriority,"c");
Thread t4 = new Thread(myPriority,"d");
Thread t5 = new Thread(myPriority,"e");
//先设置优先级,再启动
t1.start();
t2.setPriority(1);
t2.start();
t3.setPriority(4);
t3.start();
t4.setPriority(10);
t4.start();
}
}
class MyPriority implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"优先级:"+Thread.currentThread().getPriority());
}
}
守护线程daemon
线程分为用户线程和守护线程
虚拟机必须确保用户线程执行完毕
虚拟机不用等待守护线程执行完毕
package org.example;
public class TestDaemon {
public static void main(String[] args) {
God god = new God();
You you = new You();
Thread t1 = new Thread(god);
t1.setDaemon(true);//默认false是用户线程,改成true变成守护线程
t1.start();
new Thread(you).start();//you 用户线程启动
}
}
class God implements Runnable{
@Override
public void run() {
while(true){
System.out.println("上帝保佑着你");
}
}
}
class You implements Runnable{
@Override
public void run() {
for (int i = 0; i < 36500; i++) {
System.out.println("我还活着");
}
System.out.println("==我死了,拜拜世界==");
}
}
线程同步
并发:同一个对象被多个线程同时操作
线程同步:队列+锁,就是一种等待机制
由于同一进程的多个线程共享同一块存储空间,在带来方便的同时,也带来了访问冲突问题
为了保证数据在方法中被访问时的正确性,在访问时加入锁机制synchronized,
当一个线程获得对象的排它锁,独占资源,其他线程必须等待,使用后释放锁即可,
线程同步存在的问题:
package org.example;
//线程不安全
public class UnsafeBuyTicket {
public static void main(String[] args) {
BuyTicket buyTicket = new BuyTicket();
new Thread(buyTicket,"可爱的小桑").start();;
new Thread(buyTicket,"帅气的园园").start();
new Thread(buyTicket,"掉线的月老").start();
}
}
class BuyTicket implements Runnable{
//票
private int ticketNums = 10;
boolean flag = true;//线程停止标志位
@Override
public void run() {
//买票
while(flag){
try {
buy();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//买票方法
private void buy() throws InterruptedException {
//判断是否有票
if(ticketNums<=0){
flag = false;
return;
}
Thread.sleep(100);
System.out.println(Thread.currentThread().getName()+"买到第"+ticketNums--+"张票了");
}
}
不安全取钱
package org.example;
//不安全的取钱
public class UnsafeBank {
public static void main(String[] args) {
//账户
Account account = new Account(10000,"共同存的钱");
Drawing lpy = new Drawing(account,5000,"李朋园");
Drawing slh = new Drawing(account,10000,"桑兰桦");
lpy.start();
slh.start();
}
}
//账户
class Account{
int balance;//卡内余额
String name;//卡号
public Account(int balance, String name) {
this.balance = balance;
this.name = name;
}
}
//银行:模拟取款
class Drawing extends Thread{
Account account;//账户
//取钱的金额
int drawingMoney;
//现金余额
int nowMoney;
public Drawing(Account account,int drawingMoney,String name){
super(name);
this.account = account;
this.drawingMoney = drawingMoney;
}
//取钱
public void run() {
if(account.balance-drawingMoney<0){
System.out.println(Thread.currentThread().getName()+"的卡内余额不足");
return;
}
//sleep放大问题的发生性
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//卡内余额=卡内余额-取的钱
account.balance = account.balance-drawingMoney;
//现金余额=现金余额+取的钱
nowMoney = nowMoney + drawingMoney;
System.out.println(account.name+"卡内余额为:"+account.balance);
System.out.println(this.getName()+"现金余额为:"+nowMoney);
}
}
不安全集合
package org.example;
import java.util.ArrayList;
import java.util.List;
public class UnsafeList {
public static void main(String[] args) {
List list = new ArrayList<>();
for (int i = 0; i < 10000; i++) {
new Thread(()->{
list.add(Thread.currentThread().getName());
}).start();
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(list.size());
}
}
安全集合
手动安全
package org.example;
import java.util.ArrayList;
import java.util.List;
public class UnsafeList {
public static void main(String[] args) {
List list = new ArrayList<>();
for (int i = 0; i < 10000; i++) {
new Thread(()->{
synchronized (list){
list.add(Thread.currentThread().getName());
}
}).start();
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(list.size());
}
}
JUC自带安全集合:
CopyOnWriteArrayList
package org.example;
import java.util.concurrent.CopyOnWriteArrayList;
public class TestJUC {
public static void main(String[] args) {
CopyOnWriteArrayList list = new CopyOnWriteArrayList<>();
for (int i = 0; i < 10000; i++) {
new Thread(()->{
list.add(Thread.currentThread().getName());
}).start();
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(list.size());
}
}
锁
死锁
死锁形成的四个必要条件:
破坏以上任意一个或多个条件,就可避免死锁
package org.example;
//死锁:多个线程互相抱着对方需要的资源,形成僵持
public class DeadLock {
public static void main(String[] args) {
Makeup g1 = new Makeup(0,"桑兰桦");
Makeup g2 = new Makeup(1,"梁子怡");
g1.start();
g2.start();
}
}
//口红
class Lipstick{
}
//镜子
class Mirror{
}
//化妆
class Makeup extends Thread{
//化妆需要的口红镜子资源,用static保证只有一份
static Lipstick lipstick = new Lipstick();
static Mirror mirror = new Mirror();
int choice;//选择
String girlName;//化妆的人
public Makeup(int choice, String girlName) {
this.choice = choice;
this.girlName = girlName;
}
public void run() {
//化妆
try {
makeup();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//化妆的方法
private void makeup() throws InterruptedException {
if(choice==0){
synchronized (lipstick){
System.out.println(this.girlName+"获得口红的锁");
Thread.sleep(1000);
synchronized (mirror){
System.out.println(this.girlName+"获得镜子的锁");
}
}
}else {
synchronized (mirror){
System.out.println(this.girlName+"获得镜子的锁");
Thread.sleep(2000);
synchronized (lipstick){
System.out.println(this.girlName+"获得口红的锁");
}
}
}
}
}
Lock锁
◆ 从JDK 5.0开始,Java提供了线程同步机制,通过显式定义同步锁对象来实现同步,同步锁使用Lock对象充当
◆ java.util.concurrent.locks.Lock接口是控制多个线程对共享资源进行访问的工具。
锁提供了对共享资源的独占访问,每次只能有一个线程对Lock对象加锁,
线程开始访问共享资源之前应先获得Lock对象
◆ 可重入锁,ReentrantLock类实现了Lock ,它拥有与synchronized相同的并发性和内存语义,
在实现线程安全的控制中,比较常用的是ReentrantLock,可以显式加锁、释放锁
package org.example;
import java.util.concurrent.locks.ReentrantLock;
public class TestLock {
public static void main(String[] args) {
TestLock2 testLock2 = new TestLock2();
new Thread(testLock2).start();
new Thread(testLock2).start();
new Thread(testLock2).start();
}
}
class TestLock2 implements Runnable {
int ticketNums = 10;
//定义Lock锁
private final ReentrantLock lock = new ReentrantLock();
public void run() {
while(true){
try{
//加锁
lock.lock();
if(ticketNums>0){
Thread.sleep(1000);
System.out.println(ticketNums--);
}else {
break;
}
}catch (InterruptedException e) {
e.printStackTrace();
}finally {
//解锁
lock.unlock();
}
}
}
}
线程池初识
◆ 背景:经常创建和销毁、使用量特别大的资源,比如并发情况下的线程,对性能影响很大。
◆ 思路:提前创建好多个线程,放入线程池中,使用时直接获取,使用完放回池中。
可以避免频繁创建销毁、实现重复利用。类似生活中的公共交通工具。
◆好处:
◆ JDK 5.0起提供了线程池相关API,ExecutorService 和Executors
◆ExecutorService: 真正的线程池接口。常见子类ThreadPoolExecutor
◆void execute(Runnable command) :执行任务/命令,没有返回值,一般用来执行Runnable
◆ Future submit(Callable task):执行任务,有返回值,一般用来执行Callable
◆void shutdown() :关闭连接池
◆Executors:工具类、线程池的工厂类,用于创建并返回不同类型的线程池
package org.example;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
//测试线程池
public class TestPool {
public static void main(String[] args) {
//1.创建线程池,参数为线程池的大小
ExecutorService service = Executors.newFixedThreadPool(10);
//2.执行
service.execute(new MyThread());
service.execute(new MyThread());
service.execute(new MyThread());
service.execute(new MyThread());
//3.关闭连接
service.shutdown();
}
}
class MyThread implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
}
JUC基础
JUC实际就是java.util.concurrent工具包、类
java.util.concurrent.atomic
java.util.concurrent.locks
并发和并行并发(多线程操作一个资源):一个CPU交替执行多个线程(单核,速度快)
并行:多个CUP同时执行多个线程(多核)
public class Test1{
public static void main(String[] args) {
//获取CPU的核数
System.out.println(Runtime.getRuntime().availableProcessors());
}
}
并发编程的本质:充分利用CPU的资源
线程的状态//创建 NEW, //运行 RUNNABLE, //阻塞 blocked BLOCKED, //死死地等 WAITING, //超时等待(等到时间就不等了) TIMED_WAITING, //terminated,终止 TERMINATEDwait和sleep的区别
package org.example;
public class SaleTicket {
public static void main(String[] args) {
Ticket ticket = new Ticket();
new Thread(()->{
for(int i=0;i<60;i++){
ticket.sale();
}
},"A").start();
new Thread(()->{
for(int i=0;i<60;i++){
ticket.sale();
}
},"B").start();
new Thread(()->{
for(int i=0;i<60;i++){
ticket.sale();
}
},"C").start();
}
}
//OOP,降低耦合性
class Ticket{
private int number = 50;
//买票方法
public synchronized void sale(){
if(number>0){
System.out.println(Thread.currentThread().getName()+"卖出了第"+(number--)+"张票");
}
}
}
LOCK接口
//加锁
Lock l = ...;
l.lock();
//解锁
l.unlock();
//可重入锁
ReentrantLock:原码有两个构造方法
public ReentrantLock() {
sync = new NonfairSync();//非公平锁
}
public ReentrantLock(boolean fair) {
//fair为true是公平锁,fair为false是非公平锁
sync = fair ? new FairSync() : new NonfairSync();
}
公平锁:十分公平(先来后到)
非公平锁:十分不公平(可以插队,默认是非公平锁)
package org.example;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class SaleTicket {
public static void main(String[] args) {
Ticket ticket = new Ticket();
new Thread(()->{for(int i=0;i<60;i++) ticket.sale();},"A").start();
new Thread(()->{for(int i=0;i<60;i++) ticket.sale();},"B").start();
new Thread(()->{for(int i=0;i<60;i++) ticket.sale();},"C").start();
}
}
//OOP,降低耦合性
class Ticket{
//Lock三部曲
//1.new ReentrantLock();
//2.加锁,lock.lock();
//3.解锁,lock.unlock();
private int number = 50;
Lock lock = new ReentrantLock();
//买票方法
public void sale(){
lock.lock();//加锁
lock.tryLock();//尝试锁
try{
if(number>0){
System.out.println(Thread.currentThread().getName()+"卖出了第"+(number--)+"张票");
}
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
synchronzied与Lock区别
synchronized是内置的java关键字,Lock是一个java类
synchronized无法判断获取锁的状态,Lock可以判断是否获取到了锁
synchronized会自动释放锁,Lock必须要手动释放锁,否则会死锁
synchronized的线程会傻傻的等待,Lock不一定会等待下去
synchronized的可重入锁,不可以中断的,非公平的。
Lock的可重入锁,可以判断锁,非公平(可以自己设置)
synchronized适合少量的代码同步问题,Lock适合锁大量的同步代码
| sleep() | wait() | |
|---|---|---|
| 类 | Thread类 | Object类 |
| 调用 | 类名. | 对象. |
| 理解 | 哪个位置调用,哪个线程等待 | 对象调用方法,访问对象的其他线程等待 |
| 唤醒 | 不需要别人唤醒 | 需要其他对象调用notify唤醒 |
| 锁 | 不会释放锁 | 等待后会释放锁 |
package org.example;
public class PC {
public static void main(String[] args) {
Data data = new Data();
new Thread(()->{
try {
for (int i = 0; i < 10; i++) {
data.increment();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
},"A").start();
new Thread(()->{
try {
for (int i = 0; i < 10; i++) {
data.decrement();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
},"B").start();
}
}
//数字 资源类
//判断等待 业务 通知
class Data{
private int number = 0 ;
//+1
public synchronized void increment() throws InterruptedException {
if (number!=0){
//访问 调用increment()方法的data 的那个线程进入等待
this.wait();
}
number++;
System.out.println(Thread.currentThread().getName()+"==>"+number);
//唤醒所有线程
this.notifyAll();
}
//-1
public synchronized void decrement() throws InterruptedException {
if (number==0){
//等待
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName()+"==>"+number);
this.notifyAll();
}
}
如果有多个生产者和消费者线程,会怎样? 虚假唤醒
虚假唤醒,多个生产者消费者只需把if换成while
package org.example;
public class PC {
public static void main(String[] args) {
Data data = new Data();
new Thread(()->{
try {
for (int i = 0; i < 10; i++) {
data.increment();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
},"A").start();
new Thread(()->{
try {
for (int i = 0; i < 10; i++) {
data.increment();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
},"C").start();
new Thread(()->{
try {
for (int i = 0; i < 10; i++) {
data.decrement();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
},"B").start();
new Thread(()->{
try {
for (int i = 0; i < 10; i++) {
data.decrement();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
},"D").start();
}
}
//数字 资源类
//判断等待 业务 通知
class Data{
private int number = 0 ;
//+1
public synchronized void increment() throws InterruptedException {
while (number!=0){
//访问 调用increment()方法的data 的那个线程进入等待
this.wait();
}
number++;
System.out.println(Thread.currentThread().getName()+"==>"+number);
//唤醒所有线程
this.notifyAll();
}
//-1
public synchronized void decrement() throws InterruptedException {
while (number==0){
//等待
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName()+"==>"+number);
this.notifyAll();
}
}
JUC生产者消费者
Java.util.concurrent.locks的三个接口:Condition、Lock、ReadWriteLock
使用Lock替代synchronized
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
condition.await()替代wait();
condition.signalAll()替代notifyAll();
package org.example;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
//数字 资源类
//判断等待 业务 通知
class Data{
private int number = 0 ;
//使用Lock替换synchronized
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
//+1
public void increment() throws InterruptedException {
lock.lock();//加锁
try{
//业务代码
while (number!=0){
//访问 调用increment()方法的data 的那个线程进入等待
condition.await();
}
number++;
System.out.println(Thread.currentThread().getName()+"==>"+number);
//唤醒所有线程
condition.signalAll();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();//解锁
}
}
//-1
public void decrement() throws InterruptedException {
lock.lock();
try {
while (number==0){
//等待
condition.await();
}
number--;
System.out.println(Thread.currentThread().getName()+"==>"+number);
//唤醒
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
以上代码,线程的执行顺序是随机的
为了做到先执行A线程,然后通知B线程,在通知C线程,最后通知D线程执行
需要:
Condition精准通知与唤醒同步监视器
package org.example;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class TestCondition {
public static void main(String[] args) {
Data1 data1 = new Data1();
new Thread(()->{for(int i=0;i<10;i++)data1.printA();},"A").start();
new Thread(()->{for(int i=0;i<10;i++)data1.printB();},"B").start();
new Thread(()->{for(int i=0;i<10;i++)data1.printC();},"C").start();
}
}
class Data1{//资源类 使用Lock
private int number = 1;
private Lock lock = new ReentrantLock();
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
public void printA(){
lock.lock();//加锁
try {
//判断等待 业务 唤醒
while (number!=1){
condition1.await();
}
System.out.println(Thread.currentThread().getName()+"==>我是A线程");
number = 2;
//唤醒指定的线程
condition2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printB(){
lock.lock();
try {
while(number!=2){
condition2.await();
}
System.out.println(Thread.currentThread().getName()+"==>我是B线程");
number = 3;
condition3.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printC(){
lock.lock();
try {
while(number!=3){
condition3.await();
}
System.out.println(Thread.currentThread().getName()+"==>我是B线程");
number = 1;
condition1.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
8锁现象-重点
关于锁的八个问题:
public class Test1 {
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(()->{phone.sendMessage();},"A").start();
try {
TimeUnit.SECONDS.sleep(1);//延迟1秒
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(()->{phone.call();},"B").start();
}
}
class Phone{
public synchronized void sendMessage(){
//try {
// TimeUnit.SECONDS.sleep(4);//延迟4秒
//} catch (InterruptedException e) {
// e.printStackTrace();
//}
System.out.println("发短信");
}
public synchronized void call(){
System.out.println("打电话");
}
}
以上代码运行,是先输出发短信,还是先输出打电话?
结果:先输出发短信,再输出打电话
原因:有锁的存在,phone对象是同一个,谁先拿到phone,谁锁住它
以上代码发短信的方法加入4秒延迟,输出顺序又如何?
结果:依旧是先输出发短信,后输出打电话
原因:由于锁的缘故(并不是由于先调用先执行),synchronized被锁的对象是调用者,即phone对象, 两个方法用的是同一个锁–phone对象,发短信先拿到的锁发短信线程先执行
将以上代码打电话方法的synchronized删掉,即call()变成一个普通方法,输出顺序又如何?
结果:先输出打电话,后输出发短信
原因:call()不再受锁的影响了
将以上代码增加一个phone对象,让phone1发短信,phone2打电话,输出结果又如何?
结果:先输出打电话,后输出发短信
原因:两个phone对象,按照代码情况各执行各的,1秒后输出打电话,4秒后输出发短信
两个phone对象,将sendMessage()方法和call()方法都加上static修饰,输出结果如何?
public class Test2 {
public static void main(String[] args) {
Phone2 phone1 = new Phone2();
Phone2 phone2 = new Phone2();
new Thread(()->{phone1.sendMessage();},"A").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(()->{phone2.call();},"B").start();
}
}
class Phone2{
public static synchronized void sendMessage(){
try {
TimeUnit.SECONDS.sleep(4);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("发短信");
}
public static synchronized void call(){
System.out.println("打电话");
}
}
结果:先输出发短信,后输出打电话
原因:加入static修饰,类一加载就有了,synchronized锁定的是Phone2的类模板Class,只有一份
谁先得到锁,谁先执行
两个phone对象,sendMessage()用static修饰,call()不用static修饰,输出结果又如何?
结果:先输出打电话,后输出发短信
原因:锁的不再是一个对象,一个锁的Phone的Class,一个锁的是phone2,各执行各的
如果是一个phone对象,sendMessage()被static synchronized修饰,call()只被synchronized修饰,
输出结果又会怎样?
结果:先输出打电话,后输出发短信
原因:锁的不再是一个对象,一个锁的Phone的Class,一个锁的是phone,各执行各的
锁只会锁 对象 和 Class
new --> this具体的一个手机
static --> Class唯一的一个模板
集合类不安全 List不安全CopyOnWrite:写入时复制,COW思想
多个线程调用的时候,读取固定,写入可能存在覆盖操作
COW思想,就是在写入的时候复制一份,避免覆盖造成数据问题
List list = new CopyOnWriteArrayList<>();
使用CopyOnWriteArrayList比用Vector的优势:没有使用synchronized修饰,使用的Lock锁,效率更高
在并发下,ArrayList不安全:(以下代码使用ArrayList会报异常)
会报异常:java.util.ConcurrentModificationException,并发修改异常
解决方案:
//ListSet不安全list = new ArrayList<>(); //List list = new Vector<>(); //List list = Collections.synchronizedList(new ArrayList<>()); List list = new CopyOnWriteArrayList<>(); for (int i = 1; i <= 10; i++) { new Thread(()->{ list.add(UUID.randomUUID().toString().substring(0,5)); System.out.println(list); },String.valueOf(i)).start(); }
Set set = new CopyOnWriteArraySet();
//会报异常:java.util.ConcurrentModificationException,并发修改异常 //Setset = new HashSet<>(); //Set set = Collections.synchronizedSet(new HashSet<>()); Set set = new CopyOnWriteArraySet(); for (int i = 1; i <= 10 ; i++) { new Thread(()->{ set.add(UUID.randomUUID().toString().substring(0,5)); System.out.println(set); },String.valueOf(i)).start(); }
HashSet底层就是HashMap:
Set本质就是Map的key,因为key是无重复的
public HashSet() {
map = new HashMap<>();
}
//add方法
public boolean add(E e) {
return map.put(e, PRESENT)==null;
}
//PRESENT,一个不变的值
private static final Object PRESENT = new Object();
Map不安全
回顾Map的基本操作:
Mapmap = new HashMap<>();
Map是这样用的吗?
不是,工作中不用HashMap
默认等价于什么?
初始容量默认16(位运算),加载因子默认0.75,new HashMap(16,0.75)
Map
//报异常:ConcurrentModificationException //MapCallablemap = new HashMap<>(); //Map map = Collections.synchronizedMap(new HashMap<>()); Map map = new ConcurrentHashMap<>(); for (int i = 1; i <= 30; i++) { new Thread(()->{ map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0,5)); System.out.println(map); },String.valueOf(i)).start(); }
Callable接口类似于Runnable接口,都是为其实例可能由另一个线程执行的类设计的。
Runnable不返回结果,也不抛出异常,重写run()
Callable可以返回结果,也可以抛出异常,重写call()
public interface Callable{ V call() throws Exception; }
接口泛型什么类型,call方法返回值就返回什么类型
之前启动线程:
new Thread().start();//Thread(),可以传入Runnable类型的参数,但是不能传入Callable类型参数
但是Runnable接口有一个实现类FutureTask,而FutureTask可以传入参数Callable
那么Callable启动线程方式:
public class CallableTest {
public static void main(String[] args) {
MyThread thread = new MyThread();
FutureTask futureTask = new FutureTask(thread);
new Thread(futureTask,"A").start();
new Thread(futureTask,"B").start();
try {
//获取Callable的返回值结果
String res = (String) futureTask.get();//get方法可能会产生阻塞,一般放到最后
System.out.println(res);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
class MyThread implements Callable {
@Override
public String call() throws Exception {
System.out.println("call");
//return这里的操作可能会耗时,这里可能有等待,上面获取返回值得时候可能产生阻塞
return "我好帅";
}
}
注意:
允许一个或多个线程等待直到其他线程中执行的一组操作完成 的同步辅助,减法计数器
//计数器
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
//倒计时,总数是6(必须要执行任务的时候 使用)
CountDownLatch countDownLatch = new CountDownLatch(6);
for(int i=1;i<=6;i++){
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"Go out");
countDownLatch.countDown();//数量减1
},String.valueOf(i)).start();
}
//等待计数器归零,然后再向下执行
countDownLatch.await();
System.out.println("Close Door");
}
}
countDownLatch.countDown();//数量减1
countDownLatch.await();//等待计数器归零,然后再向下执行
每次有线程调用countDown()数量减1,如果计数器变为0,countDownLatch.await()会被唤醒,继续执行
CyclicBarrier线程计数器
//构造器
public CyclicBarrier(int parties) {//计数
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public class CyclicBarrierTest {
public static void main(String[] args) {
//累加计数器
//召唤龙珠线程
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
System.out.println("召唤神龙成功");
});
for (int i = 1; i <= 7; i++) {
final int temp = i;
new Thread(()->{
//Lambda代码块中,不能直接操作for循环的变量i
System.out.println(Thread.currentThread().getName()+"收集了"+temp+"个龙珠");
try {
cyclicBarrier.await();//等待
System.out.println("7颗龙珠收集完毕");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
Lambda代码块中,不能直接操作for循环的变量i(因为i是属于外面的类,而lambda表达式内是内部类,内部类访问局部变量需要加final)
龙珠个数到达7后,await()后的代码才会执行
SemaphoreThread-0收集了1个龙珠
Thread-1收集了2个龙珠
Thread-2收集了3个龙珠
Thread-3收集了4个龙珠
Thread-4收集了5个龙珠
Thread-5收集了6个龙珠
Thread-6收集了7个龙珠
召唤神龙成功
7颗龙珠收集完毕
7颗龙珠收集完毕
7颗龙珠收集完毕
7颗龙珠收集完毕
7颗龙珠收集完毕
7颗龙珠收集完毕
7颗龙珠收集完毕
一个计数信号量,用来限制线程数量的,限流的时候用
semaphore.acquire();//获得,如果已经满了,等待,等到被释放为止
semaphore.release();//释放,会将当前的信号量+1,然后唤醒等待的线程
public class SemaphoreTest {
public static void main(String[] args) {
//参数:线程数量,比作停车位有3个
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <= 6; i++) {//6个车
new Thread(()->{
//acquire() 获得
//release() 释放
try {
semaphore.acquire();//抢车位
System.out.println(Thread.currentThread().getName()+"抢到车位");
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"离开车位");
} catch (Exception e) {
e.printStackTrace();
}finally {
semaphore.release();//释放车位
}
},String.valueOf(i)).start();
}
}
}
读写锁
ReadWriteLock:读可以被多个线程同时读,写一个时刻只能一个线程写
不加读写锁:(线程写入会出现混乱)
public class ReadWriteLockTest {
public static void main(String[] args) {
MyCache myCache = new MyCache();
//写入
for (int i = 1; i <= 6; i++) {
final int temp = i;
new Thread(()->{myCache.put(temp+"",temp+"");},String.valueOf(i)).start();
}
//读取
for (int i = 1; i <= 6; i++) {
final int temp = i;
new Thread(()->{myCache.get(temp+"");},String.valueOf(i)).start();
}
}
}
//自定义缓存
class MyCache{
private volatile Map map = new HashMap<>();
private Lock lock = new ReentrantLock();
//存 写的过程
public void put(String key,Object value){
System.out.println(Thread.currentThread().getName()+"写入"+key);
map.put(key,value);
System.out.println(Thread.currentThread().getName()+"写入完毕");
}
//取 读的过程
public void get(String key){
System.out.println(Thread.currentThread().getName()+"读取"+key);
Object obj = map.get(key);
System.out.println(Thread.currentThread().getName()+"读取完毕");
}
}
1写入1
2写入2
2写入完毕
1写入完毕
4写入4
3写入3
4写入完毕
3写入完毕
5写入5
6写入6
5写入完毕
6写入完毕
加入读写锁后:(也可以用synchronized、Lock锁,但是没有ReadWriteLock控制读写线程效果好)
一个时刻只有一个线程写入,直到写入完毕,下一个线程(读或写线程)才会执行
public class ReadWriteLockTest {
public static void main(String[] args) {
MyCache myCache = new MyCache();
//写入
for (int i = 1; i <= 20; i++) {
final int temp = i;
new Thread(()->{myCache.put(temp+"",temp+"");},String.valueOf(i)).start();
}
//读取
for (int i = 1; i <= 20; i++) {
final int temp = i;
new Thread(()->{myCache.get(temp+"");},String.valueOf(i)).start();
}
}
}
//自定义缓存
class MyCache{
private volatile Map map = new HashMap<>();
private ReadWriteLock rwLock = new ReentrantReadWriteLock();
//存 写的过程
public void put(String key,Object value){
rwLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName()+"写入"+key);
map.put(key,value);
System.out.println(Thread.currentThread().getName()+"写入完毕");
} catch (Exception e) {
e.printStackTrace();
} finally {
rwLock.writeLock().unlock();
}
}
//取 读的过程
public void get(String key){
rwLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName()+"读取"+key);
Object obj = map.get(key);
System.out.println(Thread.currentThread().getName()+"读取完毕");
} catch (Exception e) {
e.printStackTrace();
} finally {
rwLock.readLock().unlock();
}
}
}
1写入1 1写入完毕 2写入2 2写入完毕
3写入3 3写入完毕 4写入4 4写入完毕
5写入5 5写入完毕 6写入6 6写入完毕
7写入7 7写入完毕 8写入8 8写入完毕
9写入9 9写入完毕 11写入11 11写入完毕
12写入12 12写入完毕 13写入13 13写入完毕
1读取1 1读取完毕 2读取2 2读取完毕
3读取3 3读取完毕 5读取5 5读取完毕
7读取7 7读取完毕 8读取8 8读取完毕
9读取9 9读取完毕 10读取10 10读取完毕
11读取11 13读取13 13读取完毕 11读取完毕
10写入10 10写入完毕 6读取6 6读取完毕
4读取4 4读取完毕 12读取12 12读取完毕
独占锁(写锁):一次只能被一个线程占有
共享锁(读锁):多个线程可以同时占有
阻塞队列BlockingQueue一般用在多线程并发和线程池里
写入:如果队列满了,就必须阻塞等待
读出:如果队列是空的,必须阻塞等待生产
BlockingQueue已实现的类:常用的有,ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue
阻塞队列四组API| 方式 | 抛出异常,有返回值 | 不抛异常,有返回值 | 阻塞等待,无返回值 | 超时等待 |
|---|---|---|---|---|
| 添加 | add() | offer() | put(),无返回值 | offer(,) |
| 移除 | remove() | poll() | take(),有返回值 | poll(,) |
| 检测队首元素 | element() | peek() | - | - |
抛出异常
public static void test1(){
//参数:队列的容量3
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(3);
System.out.println(blockingQueue.add("a"));//true
System.out.println(blockingQueue.add("b"));//true
System.out.println(blockingQueue.add("c"));//true
//队列已满,出现异常:IllegalStateException: Queue full
System.out.println(blockingQueue.add("d"));
System.out.println("==============");
//队列:FIFO
System.out.println(blockingQueue.remove());//a
System.out.println(blockingQueue.remove());//b
System.out.println(blockingQueue.remove());//c
//队列已空,出现异常:NoSuchElementException
System.out.println(blockingQueue.remove());
}
不抛异常,有返回值
//2.不抛异常,有返回值false和null
public static void test2(){
//参数:队列的容量
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(3);
System.out.println(blockingQueue.offer("a"));//true
System.out.println(blockingQueue.offer("b"));//true
System.out.println(blockingQueue.offer("c"));//true
System.out.println(blockingQueue.offer("d"));//false
System.out.println("==============");
//队列:FIFO
System.out.println(blockingQueue.poll());//a
System.out.println(blockingQueue.poll());//b
System.out.println(blockingQueue.poll());//c
System.out.println(blockingQueue.poll());//null
}
等待,一直阻塞
//3.等待阻塞,一直阻塞
public static void test3() throws InterruptedException{
//队列大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(3);
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
blockingQueue.put("d");//一直等待,直到能够进入队列
System.out.println(blockingQueue.take());//a
System.out.println(blockingQueue.take());//b
System.out.println(blockingQueue.take());//c
System.out.println(blockingQueue.take());//一直等待,直到能从队列中取出元素
}
超时等待
//4.超时等待
public static void test4() throws InterruptedException {
//队列大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(3);
System.out.println(blockingQueue.offer("a"));//true
System.out.println(blockingQueue.offer("b"));//true
System.out.println(blockingQueue.offer("c"));//true
System.out.println(blockingQueue.offer("d",2, TimeUnit.SECONDS));//等待超过2秒退出
System.out.println("==============");
//队列:FIFO
System.out.println(blockingQueue.poll());//a
System.out.println(blockingQueue.poll());//b
System.out.println(blockingQueue.poll());//c
System.out.println(blockingQueue.poll(2,TimeUnit.SECONDS));//2秒钟过后不再等待
}
没有容量,进去一个元素,必须等待取出来之后,才能往里面放一个元素(队列不能存储元素)
代码//同步队列
public class SynchronousQueueTest {
public static void main(String[] args) {
SynchronousQueue synchronousQueue = new SynchronousQueue();
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+"put 1");
synchronousQueue.put("1");
System.out.println(Thread.currentThread().getName()+"put 2");
synchronousQueue.put("2");
System.out.println(Thread.currentThread().getName()+"put 3");
synchronousQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T1").start();
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"take 1");
synchronousQueue.take();
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"take 2");
synchronousQueue.take();
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"take 3");
synchronousQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T2").start();
}
}
线程池(重点)
池化技术:事先准备好一些资源,有人要用,就来我这里拿,用完之后还回来
常见池化技术的使用:线程池、连接池、内存池、对象池、常量池…
线程池的优点:
线程复用、可以控制最大并发数、管理线程
线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式创建
线程池:三大方法、七大参数、四种拒绝策略
三大方法 Executors类public static void main(String[] args) {
ExecutorService threadPool = Executors.newSingleThreadExecutor();//单个线程
// ExecutorService threadPool = Executors.newFixedThreadPool(5);//固定线程
// ExecutorService threadPool = Executors.newCachedThreadPool();//可伸缩线程
try{
for(int i=1;i<=5;i++){
//使用线程池方法创建线程
//线程池用完必须关闭,加入finally结构
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+"——ok");
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
threadPool.shutdown();//关闭线程池
}
}
方法原码:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue());
}
七大参数
本质上三大方法都是调用ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,//核心线程大小
int maximumPoolSize,//最大核心线程池大小
long keepAliveTime,//空闲线程存活时间
TimeUnit unit,//存活时间单位
BlockingQueue workQueue,//阻塞队列
ThreadFactory threadFactory,//线程工厂,创建线程的
RejectedExecutionHandler handler) {//拒绝策略
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
参数
已知:
银行窗口总共有五个,两个窗口一直都开,剩下的三个是空闲窗口(等候区人满,再来人之后开)
等候区可以坐下三个人(线程)
分析:
以上一直都开的两个窗口,被称为核心线程大小(第一个参数)
银行所有的窗口,被称为最大线程池大小(第二个参数)
剩下三个窗口已经打开前提下,当人在一段时间内没有访问窗口时,窗口会再次关闭(第三个参数,释放)
一段时间的单位(第四个参数)
等候区的座位(第五个参数,阻塞队列)
一般不变(第六个参数,线程工厂)
所有窗口和等候区都有人时,再进来的人可以有四种处理方式(第七个参数,拒绝策略[4种])
四大策略拒绝策略默认的是:AbortPolicy(终止策略)
拒绝策略有四种,因为ThreadExecutorPool的第七个参数:RejectedExecutionHandler接口有四个实现类
package mypool;
import java.util.concurrent.*;
public class MyThreadPool {
public static void main(String[] args) {
ExecutorService threadPool = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
try{
for (int i = 1; i <= 8; i++) {
//最大承载:阻塞队列+最大线程池
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+"--OK");
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
threadPool.shutdown();
}
}
}
小结
最大线程池大小的设置依据(调优):
函数式接口:只有一个方法的接口
优点:简化编程模型
包:java.util.function
四大函数式接口
函数型接口:有一个输入参数,一个输出参数
原码:
@FunctionalInterface public interface Function{ R apply(T t); }
Function
public class FunctionTest {
public static void main(String[] args) {
Function function = new Function(){
@Override
public Integer apply(String s) {
return 2;
}
};
//简化一
Function function1 = (hello)->{return 1;};
//简化二
Function function2 = hello -> 66;
System.out.println(function.apply("lll"));
}
}
Predicate
断定型接口:有一个输入参数,返回值只能是boolean类型
public interface Predicate{ boolean test(T t); }
public class PredicateTest {
public static void main(String[] args) {
Predicate predicate = new Predicate() {
//判断字符串是否为空
public boolean test(String s) {
return s.isEmpty();
}
};
//简化
Predicate predicate1 = (str)->{
return str.isEmpty();
};
}
}
Consumer
**消费型接口:**只有输入,没有返回值
public interface Consumer{ void accept(T t); }
public class ConsumerTest {
public static void main(String[] args) {
Consumer consumer = new Consumer() {
public void accept(String s) {
System.out.println(s);
}
};
//简化
Consumer consumer1 = (str)->{
System.out.println(str);
};
}
}
Supplier
**供给型接口:**没有参数,只有返回值
@FunctionalInterface public interface Supplier{ T get(); }
public class SupplierTest {
public static void main(String[] args) {
Supplier supplier = new Supplier() {
@Override
public Object get() {
return "我好笨";
}
};
System.out.println(supplier.get());
}
}
Stream流式计算
必须掌握:lambda表达式,链式编程,函数式接口,Stream流式计算
概念Stream流式计算:就是用来计算的
java.util.stream
例题:
public static void main(String[] args) {
User u1 = new User(1,"a",21);
User u2 = new User(2,"b",22);
User u3 = new User(3,"c",23);
User u4 = new User(4,"d",24);
User u5 = new User(6,"e",25);
//集合就是用来存储的
List list = Arrays.asList(u1,u2,u3,u4,u5);
//计算交给流 链式编程
list.stream()//把集合转化成流
.filter(user -> {return user.getId()%2==0;})//id为偶数
.filter(user -> {return user.getAge()>23;})//年龄大于23
.map(user -> {return user.getName().toUpperCase();})//名字转成大写
.sorted((name1,name2)->{return name2.compareTo(name1);})
.limit(1)
.forEach(System.out::println);
}
ForkJoin
可以并行执行任务,提高效率。大数据量
窃取任务当线程B执行完任务后,会窃取线程A的任务执行
案例分析求和计算:
ForkJoinTest类
public class ForkJoinTest extends RecursiveTask{ private Long start; private Long end; //临界值 private Long temp = 10000L; public ForkJoinTest(Long start, Long end) { this.start = start; this.end = end; } @Override protected Long compute() { if((end-start) Long sum = 0L; for(Long i = start; i<=end; i++){ sum += i; } return sum; }else { Long middle = (start+end)/2; ForkJoinTest task1 = new ForkJoinTest(start,middle); task1.fork();//把任务压入线程队列 ForkJoinTest task2 = new ForkJoinTest(middle,end); task2.fork(); return task1.join() + task2.join();//任务结果 } } }
Test类:
package forkjoin;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
test1();//9040
test2();//8731
test3();//557
}
//普通方法计算
public static void test1(){
long time1 = System.currentTimeMillis();
Long sum=0L;
for(Long i = 1L;i<=10_0000_0000;i++){
sum += i;
}
long time2 = System.currentTimeMillis();
System.out.println("执行时间:"+(time2-time1));
System.out.println("sum="+sum);
}
//ForkJoin方法计算
public static void test2() throws ExecutionException, InterruptedException {
long time1 = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask task = new ForkJoinTest(0L,10_0000_0000L);
ForkJoinTask submit = forkJoinPool.submit(task);
Long sum = submit.get();
long time2 = System.currentTimeMillis();
System.out.println("执行时间:"+(time2-time1));
System.out.println("sum="+sum);
}
//Stream流方式计算
public static void test3(){
long time1 = System.currentTimeMillis();
//range左开右闭 rangeClosed左开右开
Long sum = LongStream
.range(0L,10_0000_0000)
.parallel()//并行操作
.reduce(0,Long::sum);//Long里面的sum方法
long time2 = System.currentTimeMillis();
System.out.println("执行时间:"+(time2-time1));
System.out.println("sum="+sum);
}
}
异步回调
异步测试
package future;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class Demo1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//发起一个请求
//Future的一个实现类CompletableFuture
CompletableFuture completableFuture = CompletableFuture.runAsync(()->{
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"runAsync=>Void");
});
System.out.println("111");
completableFuture.get();
CompletableFuture completableFuture1 = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName()+"supplyAsync=>Integer");
int i = 10/0;
return 1024;
});
System.out.println(completableFuture1.whenComplete((t,u)->{
System.out.println("t=>"+t);
System.out.println("u=>"+u);
}).exceptionally((e)->{
System.out.println(e.getMessage());
return 233;
}).get());
}
}
JMM
JMM(Java Memory Model):java内存模型,不存在的东西,仅仅是个概念或约定
关于JMM的一些同步的约定:
JMM模型
线程间通信必须要经过主内存。
如下,如果线程A与线程B之间要通信的话,必须要经历下面2个步骤:
1)线程A把本地内存A中更新过的共享变量刷新到主内存中去。
2)线程B到主内存中去读取线程A之前已更新过的共享变量
关于主内存与工作内存之间的具体交互协议,即一个变量如何从主内存拷贝到工作内存、如何从工作内存同步到主内存之间的实现细节,Java内存模型定义了以下八种操作来完成:
Java内存模型还规定了在执行上述八种基本操作时,必须满足如下规则:
Volatile是java虚拟机提供的轻量级的同步机制
被volatile修饰的共享变量,三大特性:
通过使用Lock前缀的指令禁止变量在线程工作内存中缓存来保证volatile变量的内存可见性、通过插入内存屏障禁止会影响变量内存可见性的指令重排序
对任意单个volatile变量的读/写具有原子性,但类似于volatile++这种复合操作不具有原子性
保证可见性
public class VolatileDemo1 {
//变量num不加volatile,程序会出现死循环(对主存的改变是不可见的)
//加入volatile,就会对主存中共享变量一直可见
private volatile static int num = 0;
public static void main(String[] args) throws InterruptedException {
new Thread(()->{
while (num==0){
}
}).start();
TimeUnit.SECONDS.sleep(1);
num = 1;
System.out.println(num);
}
}
不保证原子性
原子性:不可分割(要么都成功,要么都失败)
package jmm;
public class VolatileDemo2 {
private volatile static int num = 0;
public synchronized static void add(){
num++;
}
public static void main(String[] args) {
for(int i=1;i<=20;i++){
new Thread(()->{
for (int j = 0; j < 1000; j++) {
add();
}
}).start();
}
while (Thread.activeCount()>2){
Thread.yield();
}
//理论上num为20000,但是num++,不是一个原子性操作
//方法一:可以加synchronize或lock锁
//方法二:使用原子性类
System.out.println(num);
}
}
num++底层有四个步骤:(不是原子性操作,是不安全的)
如果不允许使用synchronized锁和lock锁,怎样保证原子性?
使用原子类:
package jmm;
import java.util.concurrent.atomic.AtomicInteger;
public class VolatileDemo2 {
private volatile static AtomicInteger num = new AtomicInteger();
public static void add(){
num.getAndIncrement();//i++
}
public static void main(String[] args) {
for(int i=1;i<=20;i++){
new Thread(()->{
for (int j = 0; j < 1000; j++) {
add();
}
}).start();
}
while (Thread.activeCount()>2){
Thread.yield();
}
//理论上num为20000,但是线程操作是不保证原子性的,一般达不到20000
//方法一:可以加synchronize或lock锁
//方法二:使用原子性类
System.out.println(num);//20000
}
}
禁止指令重排
指令重排:计算机并不是按照我们写的程序进行执行的
为了使得处理器内部的运算单元能尽量被充分利用,处理器可能会对输入代码进行乱序执行(Out-Of-Order Execution)优化,处理器会在计算之后将乱序执行的结果重组,保证该结果与顺序执行的结果是一致的,但并不保证程序中各个语句计算的先后顺序与输入代码中的顺序一致。因此,如果存在一个计算任务依赖另一个计算任务的中间结果,那么其顺序性并不能靠代码的先后顺序来保证。与处理器的乱序执行优化类似,Java虚拟机的即时编译器中也有类似的指令重排序(Instruction Reorder)优化
源代码---->编译器优化的重排---->指令并行也可能重排---->内存系统也会重排---->执行
指令重排会考虑数据之间的依赖性
int x = 1;//1 int y = 2;//2 x = x + 5;//3 y = x * x;//4 //我们所期望的执行顺序是:1 2 3 4 //但是可能会出现 2 1 3 4 或 1 3 2 4 //不会出现:4 3 2 1,因为3和4的执行需要依赖于1的数据
例如:
有两个线程A和B,a b x y 默认值为0
| 线程A | 线程B |
|---|---|
| x = a | y = b |
| b = 1 | a = 2 |
正常结果:x = 0,y = 0
但是有可能会发生先执行b=1,再执行y=b,则y=1,如下
| 线程A | 线程B |
|---|---|
| b=1 | a=2 |
| x=a | y=b |
异常结果:x = 2, y = 1
单例模式而Volatile可以避免指令重排
反编译:javap -p xxx.class
饿汉式单例package singleton;
public class Hungry {
//可能会造成空间浪费
private byte[] data1 = new byte[1024];
private byte[] data2 = new byte[1024];
private byte[] data3 = new byte[1024];
private byte[] data4 = new byte[1024];
private Hungry(){}
private final static Hungry hungry = new Hungry();
public static Hungry getInstance(){
return hungry;
}
}
懒汉式单例
package singleton;
public class LazyMan {
private LazyMan(){
System.out.println(Thread.currentThread().getName()+"被得到");
}
private volatile static LazyMan lazyMan;
//双重检测锁模式(DCL:Double Check Lock 懒汉式)
public static LazyMan getInstance(){
if (lazyMan==null){
synchronized (LazyMan.class){
if(lazyMan==null){
lazyMan = new LazyMan();//不是原子性操作
}
}
}
return lazyMan;
}
}
静态内部类
//静态内部类
public class Holder {
private Holder(){}
public static Holder getInstance(){
return InnerClass.HOLDER;
}
public static class InnerClass{
private static final Holder HOLDER = new Holder();
}
}
枚举类单例
以上三种方式,在反射面前都不是安全的
只有枚举是最安全的,且原码中没有无参构造方法(通过反编译得到原码)
package singleton;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
public enum EnumSingle {
INSTANCE;
public EnumSingle getInstance(){
return INSTANCE;
}
}
class Test{
public static void main(String[] args) throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
Constructor declaredConstructor = EnumSingle.class.getDeclaredConstructor(null);
declaredConstructor.setAccessible(true);
EnumSingle instance1 = declaredConstructor.newInstance();//操作失败
EnumSingle instance2 = declaredConstructor.newInstance();//操作失败
System.out.println(instance1);
System.out.println(instance2);
}
}
CAS
CAS:Compare and Swap,即比较再交换
原子类的底层用了CAS,CAS是CPU的并发原语
概念CAS的全称是: Compare And Swap(比较并交换),CAS是实现并发算法时常用到的技术, Java并发包中的很多类都使用了CAS技术,如ConcurrentHashMap, AtomicInteger原子操作类等等。
CAS操作涉及到3个操作值:
当且仅当预估值等于内存中的值的时候,才将新的值保存到概述日内存中, 否则什么都不做。
作用
CAS可以将比较和交换转换为原子操作,这个原子操作直接由CPU保证, CAS可以保证共享变量赋值时的原子操作
特点
CAS是一种非阻塞算法的实现,它能在不使用锁的情况下实现多线程安全, 所以CAS也是- -种无锁算法。
非阻塞算法:一个线程失败或挂起并不会导致其他线程也失败或挂起
案例演示 CAS优缺点**优点:**由于CAS是非阻塞的,可以避免优先级倒置和死锁等问题;
性能好,使用无锁的方式,没有锁竞争带来的系统开销,也没有线程间频繁调度带来的开销
缺点:
原码分析public class CASDemo {
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(200);
//compareAndSet:
// public final boolean compareAndSet(int expect, int update) {
// return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
// }
//期望 更新
//如果我们期望的值达到了,就更新,否则就不更新
System.out.println(atomicInteger.compareAndSet(200, 8));//true
System.out.println(atomicInteger.get());//8
atomicInteger.getAndIncrement()//相当于num++
}
}
getAndIncrement底层实现:
//原码 使用了Unsafe类
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
//自旋锁
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
unsafe类可以操作内存:
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;
ABA问题
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(200);
//compareAndSet:
// public final boolean compareAndSet(int expect, int update) {
// return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
// }
//期望 更新
//如果我们期望的值达到了,就更新,否则就不更新
System.out.println(atomicInteger.compareAndSet(200, 280));//true
System.out.println(atomicInteger.get());//280
System.out.println(atomicInteger.compareAndSet(280, 200));//true
System.out.println(atomicInteger.get());//280
System.out.println(atomicInteger.compareAndSet(200, 6));//true
System.out.println(atomicInteger.get());//6
}
我们不知道此200非彼200了,已经被换过了
解决方法加版本号(时间戳),使用AtomicStampedReference操作
public static void main(String[] args) {
AtomicStampedReference atomicInteger = new AtomicStampedReference<>(20,1);
new Thread(()->{
int stamp = atomicInteger.getStamp();//获取版本号
System.out.println("A1=>"+stamp);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(atomicInteger.compareAndSet(20, 30,
atomicInteger.getStamp(), atomicInteger.getStamp() + 1));
System.out.println("A2=>"+stamp);
System.out.println(atomicInteger.compareAndSet(30, 20,
atomicInteger.getStamp(), atomicInteger.getStamp() + 1));
System.out.println("A3=>"+atomicInteger.getStamp());
},"A").start();
new Thread(()->{
int stamp = atomicInteger.getStamp();//获取版本号
System.out.println("B1=>"+stamp);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(atomicInteger.compareAndSet(20, 66,
stamp, stamp + 1));
System.out.println("B2=>"+stamp);
},"B").start();
}
可重入锁
synchronized
package lock;
//输出结果
// A发短信
// A打电话
// B发短信
// B打电话
public class Demo1 {
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(()->{
phone.send();
},"A").start();
new Thread(()->{
phone.send();
},"B").start();
}
}
class Phone{
public synchronized void send(){
System.out.println(Thread.currentThread().getName()+"发短信");
call();
}
public synchronized void call(){
System.out.println(Thread.currentThread().getName()+"打电话");
}
}
reentrantLock
package lock;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
//输出结果
// A发短信
// A打电话
// B发短信
// B打电话
public class Demo2 {
public static void main(String[] args) {
Phone2 phone2 = new Phone2();
new Thread(()->{
phone2.send();
},"A").start();
new Thread(()->{
phone2.send();
},"B").start();
}
}
class Phone2{
Lock lock = new ReentrantLock();
public void send(){
lock.lock();
try {
System.out.println(Thread.currentThread().getName()+"发短信");
call();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void call(){
lock.lock();
try {
System.out.println(Thread.currentThread().getName()+"打电话");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
自旋锁
概念
自旋锁(spinlock):是指当一个线程在获取锁的时候,如果锁已经被其它线程获取,那么该线程将循环等待,然后不断的判断锁是否能够被成功获取,直到获取到锁才会退出循环。
获取锁的线程一直处于活跃状态,但是并没有执行任何有效的任务,使用这种锁会造成busy-waiting。
class spinlock {
private AtomicReference cas;
spinlock(AtomicReference cas){
this.cas = cas;
}
public void lock() {
Thread current = Thread.currentThread();
// 利用CAS
while (!cas.compareAndSet(null, current)) { //为什么预期是null??
// DO nothing
System.out.println("I am spinning");
}
}
public void unlock() {
Thread current = Thread.currentThread();
cas.compareAndSet(current, null);
}
}
lock() 方法利用的CAS,当第一个线程A获取锁的时候,能够成功获取到,不会进入while循环,
如果此时线程A没有释放锁,另一个线程B又来获取锁,此时由于不满足CAS,所以就会进入while循环,不断判断是否满足CAS,直到A线程调用unlock方法释放了该锁。
代码分析自定义Spinlock:
package lock;
import java.util.concurrent.atomic.AtomicReference;
//自旋锁
public class SpinLock {
//int 默认为0
//Thread 默认为null
AtomicReference atomicReference = new AtomicReference<>();
//加锁
public void myLock(){
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName()+"==> myLock");
//自旋锁
// (第一个线程进来,不会进入循环)
//(当第一个线程进来没有释放锁,第二个线程也进来了,那么会进入循环自旋)
// 因为预期的不等于内存,不为空就进入循环,当第一个线程进来后,预期肯定不为空
while(!atomicReference.compareAndSet(null,thread)){
}
}
//解锁
public void myUnlock(){
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName()+"==> myUnlock");
atomicReference.compareAndSet(thread,null);
}
}
测试:
package lock;
import java.util.concurrent.TimeUnit;
public class TestSpinLock {
public static void main(String[] args) throws InterruptedException {
SpinLock lock = new SpinLock();
new Thread(()->{
lock.myLock();
try {
TimeUnit.SECONDS.sleep(10);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.myUnlock();
}
},"A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(()->{
lock.myLock();
try {
TimeUnit.SECONDS.sleep(5);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.myUnlock();
}
},"B").start();
}
}
代码结果:
A==> myLock B==> myLock A==> myUnlock B==> myUnlock
代码分析:
A线程获得对象(休眠10秒),一秒后,B线程也想获得这个锁(发现和预期null不一致,进入while自旋)
A线程休眠10秒后释放锁,B线程跳出自旋获得锁,休眠5秒,释放锁
死锁排查死锁:
使用jstack 进程号找到死锁问题
jstack 16068