小结与扩展
池的最大的大小如何去设置!
了解:IO密集型,CPU密集型:(调优)
//1.CPU密集型 几核就是几个线程 可以保持效率最高
//2.IO密集型判断你的程序中十分耗IO的线程,只要大于这个线程数就行 一般设置为这个耗IO线程数的两倍
package com.kuang.pool;
import java.util.concurrent.*;
// Executors工具类 三大方法
//使用了线程池之后,使用线程池来创建线程
public class Demo01 {
public static void main(String[] args) {
//1.CPU密集型 几核就是几个线程 可以保持效率最高
//2.IO密集型判断你的程序中十分耗IO的线程,只要大于这个线程数就行 一般设置为这个耗IO线程数的两倍
System.out.println(Runtime.getRuntime().availableProcessors());//获取CPU的核数
ExecutorService threadPool = new ThreadPoolExecutor(2,
Runtime.getRuntime().availableProcessors(),
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());//队列满了,尝试去和最早的竞争,也不会抛出异常
try {
//最大承载:Deque + max
//RejectedExecutionException超出最大承载抛出的异常
for (int i = 1; i <= 9 ; i++) {
//使用了线程池之后,使用线程池来创建线程
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+" ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//线程池用完,程序结束,关闭线程池
threadPool.shutdown();
}
}
}
新时代的程序员:lambda表达式,链式编程,函数式接口,Stream流式计算
函数式接口:只有一个方法接口。
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
//超级多FunctionalInterface
//简化编程模型,在新版本的框架中大量应用
//foreach(消费者类的函数式接口)
Function函数式接口
package com.kuang.function;
import java.util.function.Function;
/**
* Function 函数型接口,有一个输入参数,有一个输出
* 只要是函数型接口,可以用Lambda表达式简化
*/
public class Demo01 {
public static void main(String[] args) {
//工具类:输出输入的值
///Function
// @Override
// public String apply(String str) {
// return str;
// }
// };
//可以用Lambda表达式简化
Function
System.out.println(function.apply("asd"));
}
}
简化的lambda表达式
package com.kuang.function;
import java.util.function.Function;
/**
* Function 函数型接口,有一个输入参数,有一个输出
* 只要是函数型接口,可以用Lambda表达式简化
*/
public class Demo01 {
public static void main(String[] args) {
//工具类:输出输入的值
///Function
// @Override
// public String apply(String str) {
// return str;
// }
// };
//可以用Lambda表达式简化
Function
System.out.println(function.apply("asd"));
}
}
断定型接口:有一个输入参数,返回值只能是布尔值!
package com.kuang.function;
import java.util.function.Predicate;
/**
* 断定型接口:有一个输入参数,返回值只能是布尔值
*/
public class Demo02 {
public static void main(String[] args) {
//判断字符串是否为空
// Predicate
// @Override
// public boolean test(String str) {
// return str.isEmpty();
// }
// };
Predicate
System.out.println(predicate.test(""));
}
}
Consumer消费接口
package com.kuang.function;
import java.util.function.Consumer;
/**
*Consumer 消费接口:只有输入,没有返回值
/
public class Demo03 {
public static void main(String[] args) {
// Consumer
// @Override
// public void accept(String str) {
// System.out.println(str);
// }
// };
Consumer
System.out.println(str);
};
consumer.accept("sad");
}
}
Supplier供给型接口
package com.kuang.function;
import java.util.function.Supplier;
/**
*Supplier 供给型接口 没有参数,只有返回值
*/
public class Demo04 {
public static void main(String[] args) {
//Supplier supplier = new Supplier
// @Override
// public Integer get() {
// System.out.println("get()");
// return 1024;
// }
// };
Supplier supplier = ()->{return 1024; };
System.out.println(supplier.get());
}
}
什么是Stream流式计算
大数据:存储 + 计算
集合、MySQL本质就是存储东西的;
计算都应该交给流来操作!
运行时出现的问题:
java: 无法将类 com.kuang.stream.User中的构造器 User应用到给定类型;
解决方法:
package com.kuang.stream;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {
private int id;
private String name;
private int age;
}
package com.kuang.stream;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
/**
* 题目要求:一分钟内完成此题,只能用一行代码实现!
* 1.ID必须是偶数
* 2.年龄必须大于23岁
* 3.用户名转为大写字母
* 4.用户名字母倒着排序
* 5.只输出一个用户!
*/
public class Test {
public static void main(String[] args) {
User u1 = new User(1,"a",21);
User u2 = new User(2,"b",22);
User u3 = new User(3,"c",23);
User u4 = new User(4,"d",24);
User u5 = new User(6,"e",25);
//集合就是存储
List
//计算交给Stream流
//链式编程 lambda表达式 函数式接口 Stream流
list.stream().filter(u->{return u.getId()%2==0;})
.filter(u->{return u.getAge()>23;} )
.map(u->{return u.getName().toUpperCase();})
.sorted((uu1,uu2)->{return uu2.compareTo(uu1);})
.limit(1)
.forEach(System.out::println);
}
}
分支合并
ForkJoin 在JDL1.7 并行执行任务!提高效率,大数据量!
大数据:Map Reduce(把大任务拆分为小任务)
ForkJoin特点:工作窃取(B做完后把A没有做完的拿过来做)
这个里面 维护的都是双端队列
ForkJoin的操作
package com.kuang.forkjoin;
import java.util.concurrent.RecursiveTask;
/**
* 求和计算的任务
* 如何使用forkjoin
* 1.forkjoinPool通过它来执行
* 2.计算任务forkjoinPool.execute(ForkJoinTask task)
*3.计算类要继承ForkJoinTask
*/
public class ForkJoinDemo extends RecursiveTask
private Long start;
private Long end;
//临界值
private Long temp = 10000L;
public ForkJoinDemo(Long start, Long end) {
this.start = start;
this.end = end;
}
//计算方法
@Override
protected Long compute() {
//如果小于临界值用if里面的方法 如果大于则用forkjoin
if((end-start)<temp){
Long sum = 0L;
for (Long i = start; i <= end; i++) {
sum += i;
}
return sum;
}
else{//forkjoin 递归
long middle = (start + end) / 2;//中间值
ForkJoinDemo task1 = new ForkJoinDemo(start, middle);
task1.fork();//拆分任务,把任务压入线程队列
ForkJoinDemo task2 = new ForkJoinDemo(middle+1, end);
task2.fork();//拆分任务,把任务压入线程队列
return task1.join() + task2.join();
}
}
}
package com.kuang.forkjoin;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// test1();//执行的时间5867
//test2();//执行时间4332
test3();//sum=时间:152
}
public static void test1(){
Long sum = 0L;
long start = System.currentTimeMillis();
for (Long i = 1L; i <= 10_0000_0000; i++) {
sum += i;
}
long end = System.currentTimeMillis();
System.out.println("sum="+sum+" 时间:"+(end-start));
}
//会使用ForkJoin
public static void test2() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask
// forkJoinPool.execute(task);//执行任务 但是没有结果
ForkJoinTask
Long sum = submit.get();
long end = System.currentTimeMillis();
System.out.println("sum="+sum+"时间:"+(end-start));
}
public static void test3(){
long start = System.currentTimeMillis();
//Stream并行流 range():范围都是开括号 rangeClosed(]范围是开闭
long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum);
long end = System.currentTimeMillis();
System.out.println("sum="+"时间:"+(end-start));
}
}
Future 设计的初衷:
package com.kuang.future;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
* 异步调用: CompletableFuture
* //异步执行
* //成功回调
* //失败回调
*/
public class Demo01 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//发起一个请求
//没有返回值的runAsync 异步回调
/* CompletableFuture
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"runAsync=>Void");
});
System.out.println("11111");
completableFuture.get();//获取阻塞执行结果*/
//supplyAsync供给型参数没有参数 只有具体结果。这个结果会拥有一个返回值
//有返回值的异步回调supplyAsync
//ajax ,成功和失败都有回调 失败时返回的时错误信息
//与同步处理相对,异步处理不会阻塞当前线程来等待处理完成
CompletableFuture
System.out.println(Thread.currentThread().getName()+"supplyAsync=>Integer");
int i = 10/0;
return 1024;
});
System.out.println( completableFuture.whenComplete((t,u)->{
System.out.println("t=>"+t);//正常的返回结果
System.out.println("u=>"+u);//有错误会打印错误信息u=>java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zer
}).exceptionally((e)->{
System.out.println(e.getMessage());//打印异常信息
return 233;//可以获取到错误的返回结果
}).get());
}
}
请你谈谈你对Volatile的理解
Volatile是Java虚拟机提供轻量级的同步机制
1.保证可见性
2.不保证原子性
3.禁止指令重排
什么是JMM
JMM:java内存模型,不存在的东西,是概念!可以理解为约定!
关于JMM的一些同步的约定:
1.线程解锁前:必须把共享变量立刻刷回主存。
2.线程加锁前,必须读取主存中的最新值到工作内存中!
3.加锁和解锁必须是同一把锁
线程工作内存、主内存
内存交互操作有8种,虚拟机实现必须保证每一个操作都是原子的,不可在分的(对于double和long类型的变量来说,load、store、read和write操作在某些平台上允许例外)
* lock (锁定):作用于主内存的变量,把一个变量标识为线程独占状态
unlock (解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定
read (读取):作用于主内存变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的load动作使用
load (载入):作用于工作内存的变量,它把read操作从主存中变量放入工作内存中
use (使用):作用于工作内存中的变量,它把工作内存中的变量传输给执行引擎,每当虚拟机遇到一个需要使用到变量的值,就会使用到这个指令
assign (赋值):作用于工作内存中的变量,它把一个从执行引擎中接受到的值放入工作内存的变量副本中
store (存储):作用于主内存中的变量,它把一个从工作内存中一个变量的值传送到主内存中,以便后续的write使用
write (写入):作用于主内存中的变量,它把store操作从工作内存中得到的变量的值放入主内存的变量中
JMM对这八种指令的使用,制定了如下规则:
* 不允许read和load、store和write操作之一单独出现。即使用了read必须load,使用了store必须write
不允许线程丢弃他最近的assign操作,即工作变量的数据改变了之后,必须告知主存
不允许一个线程将没有assign的数据从工作内存同步回主内存
一个新的变量必须在主内存中诞生,不允许工作内存直接使用一个未被初始化的变量。就是怼变量实施use、store操作之前,必须经过assign和load操作
一个变量同一时间只有一个线程能对其进行lock。多次lock后,必须执行相同次数的unlock才能解锁
如果对一个变量进行lock操作,会清空所有工作内存中此变量的值,在执行引擎使用这个变量前,必须重新load或assign操作初始化变量的值
如果一个变量没有被lock,就不能对其进行unlock操作。也不能unlock一个被其他线程锁住的变量
对一个变量进行unlock操作之前,必须把此变量同步回主内存
JMM对这八种操作规则和对volatile的一些特殊规则就能确定哪里操作是线程安全,哪些操作是线程不安全的了。但是这些规则实在复杂,很难在实践中直接分析。所以一般我们也不会通过上述规则进行分析。更多的时候,使用java的happen-before规则来进行分析。
问题:程序不知道主内存的值已经被修改过了
1、保证可见性
package com.kuang.tvolatile;
import java.util.concurrent.TimeUnit;
public class JMMDemo {
//不加volatile程序1感知不到主程序的变化,程序就会死循环
//加volatile可以保证可见性
private volatile static int num = 0;
public static void main(String[] args) throws InterruptedException {//main主线程
new Thread(()->{//线程1 对主内存的变化是不知道的
while(num==0){
}
}).start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
num = 1;
System.out.println(num);
}
}
2.不保证原子性
原子性:不可分割
线程A在执行任务的时候,不能被打扰的,也不能被分割。要么同时成功,要么同时失败。
package com.kuang.tvolatile;
//不保证原子性
public class VDemo02 {
//volatile不保证原子性
private volatile static int num = 0;
public static void add(){
num++;
}
public static void main(String[] args) {
//理论上num结果应该为2万
for (int i = 1; i <= 20; i++) {
new Thread(()->{
for (int j = 0; j < 1000; j++) {
add();
}
}).start();
}
while(Thread.activeCount()>2){//现存的线程大于2就代表没有执行完
//activeCount表示现存的线程 main gc 线程是Java默认在执行的
Thread.yield();//礼让
}
System.out.println(Thread.currentThread().getName()+ " " + num);
}
}
如果不加lock和synchronized,该怎么保证原子性
使用原子类,解决原子性问题
package com.kuang.tvolatile;
import java.util.concurrent.atomic.AtomicInteger;
//不保证原子性
public class VDemo02 {
//volatile不保证原子性
private volatile static AtomicInteger num = new AtomicInteger();
public static void add(){
// num++;//不是原子性操作
num.getAndIncrement();//AtomicInteger的加一方法 CAS
}
public static void main(String[] args) {
//理论上num结果应该为2万
for (int i = 1; i <= 20; i++) {
new Thread(()->{
for (int j = 0; j < 1000; j++) {
add();
}
}).start();
}
while(Thread.activeCount()>2){//现存的线程大于2就代表没有执行完
//activeCount表示现存的线程 main gc 线程是Java默认在执行的
Thread.yield();//礼让
}
System.out.println(Thread.currentThread().getName()+ " " + num);
}
}
这些类的底层都直接和操作系统挂钩!在内存中修改值!Unsafe类是一个很特殊的存在!
指令重排
什么是指令重排:你写的程序,计算机并不是按照你写的那样去执行的。
源代码---->编译器优化 重排---->指令并行也可能重排------>内存系统也会重排---->执行
处理器在进行指令重排的时候,考虑:数据之间的依赖性!
int x = 1;//步骤1
int y = 2;//步骤2
x = x + 5;//步骤3
y = x * x;//步骤4
我们所希望的是:1234 但是可能执行的时候会变成2134 1324
可不可能是4123!
不可能 因为数据之间有依赖性
可能造成影响的结果: a b x y这四个值默认都是0
线程A
线程B
x =a
y=b
b=1
a=2
正常的结果:x = 0 ;y = 0
线程A
线程B
b=1
a=2
x=a
y=b
指令重排导致的诡异结果:x= 2; y= 1;
volatile可以避免指令重排
内存屏障。CPU指令。作用:
1.保证特定的操作的执行顺序!
2.可以保证某些变量的内存可见性(利用这些特性volatile实现了可见性)
普通读----->普通写---->内存屏障:禁止上指令和下指令顺序交换---->volatile写--->内存屏障:禁止上指令和下指令顺序交换
volatile是可以保证可见性,不能保证原子性,由于内存屏障,可以保证避免指令重排的现象产生。
手机扫一扫
移动阅读更方便
你可能感兴趣的文章