第45天学习打卡(Set 不安全 Map不安全 Callable 常用的辅助类 读写锁 阻塞队列 线程池)
阅读原文时间:2022年01月13日阅读:1

Set不安全

 package com.kuang.unsafe;
 ​
 import java.util.*;
 import java.util.concurrent.CopyOnWriteArraySet;
 ​
 /**
  * 同理可证:ConcurrentModificationException并发修改异常
  * 解决方法:
  * //1.Set set = Collections.synchronizedSet(new HashSet<>());工具类的写法
  * //2.Set set = new CopyOnWriteArraySet<>();
  */
 public class SetTest {
     public static void main(String[] args) {
      // Set set = new HashSet<>();
       // Set set = Collections.synchronizedSet(new HashSet<>());
         Set set = new CopyOnWriteArraySet<>();
 ​
         for (int i = 1; i <= 30; i++) {              new Thread(()->{
                 set.add(UUID.randomUUID().toString().substring(0,5));
                 System.out.println(set);
            },String.valueOf(i)).start();
        }
    }
 }
 ​

hashSet底层是什么??

   public HashSet() {
         map = new HashMap<>();
    }
 //add Set本质是map key是无法重复的
  public boolean add(E e) {
         return map.put(e, PRESENT)==null;
    }
 //PRESENT是常量固定的值 不会变
  private static final Object PRESENT = new Object();

Map不安全

回顾map的基本操作

 package com.kuang.unsafe;
 ​
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 ​
 //ConcurrentModificationException 并发修改异常
 public class MapTest {
     public static void main(String[] args) {
         //map是这样用的吗?   不是 工作中不用HashMap
         //默认等价于什么?? new HashMap<>(16,0.75);
         //HashMap map = new HashMap<>();
         Map map = new ConcurrentHashMap<>();
         for (int i = 1; i < 30; i++) {              new Thread(()->{
                 map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0,5));
                 System.out.println(map);
 ​
            },String.valueOf(i)).start();
             
        }
 ​
         //加载因子 初始化容量
    }
 }
 ​

7.Callable

1.可以有返回值

2.可以抛出异常

3.方法不同 run()/ call()

代码测试

 package com.kuang.callable;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.locks.ReentrantLock;
 public class CallableTest {
     public static void main(String[] args) throws ExecutionException, InterruptedException {
        //new Thread(new Runnable()).start();//Runnable()) 和FutureTask()两者等价
        //new Thread(new FutureTask(Callable)).start();//FutureTask可以调用(Callable)
         //new Thread().start()是可以启动Callable
         new Thread().start();//怎么启动Callable
         MyThread thread = new MyThread();
         FutureTask futureTask = new FutureTask<>(thread);//适配类
 ​
         new Thread(futureTask,"A").start();
         Integer o = (Integer) futureTask.get();//获取Callable返回结果
         System.out.println(o);
 ​
 ​
    }
 }
 class MyThread implements Callable {
     @Override
 ​
     public Integer call(){
         System.out.println("call()");
         return 1024;
 ​
    }
 }

 package com.kuang.callable;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.locks.ReentrantLock;
 public class CallableTest {
     public static void main(String[] args) throws ExecutionException, InterruptedException {
        //new Thread(new Runnable()).start();//Runnable()) 和FutureTask()两者等价
        //new Thread(new FutureTask(Callable)).start();//FutureTask可以调用(Callable)
         //new Thread().start()是可以启动Callable
         new Thread().start();//怎么启动Callable
         MyThread thread = new MyThread();
         FutureTask futureTask = new FutureTask<>(thread);//适配类
 ​
         new Thread(futureTask,"A").start();//A为线程
         new Thread(futureTask,"B").start();//B为线程   打印的结果只有一个call 因为结果会被缓存,效率高
         Integer o = (Integer) futureTask.get();//获取Callable返回结果
         //这个get方法可能会产生阻塞,把它放在最后
         //或者使用异步通信来处理
         System.out.println(o);
 ​
 ​
    }
 }
 class MyThread implements Callable {
     @Override
 ​
     public Integer call(){
         System.out.println("call()");
         return 1024;
 ​
    }
 }

细节:

1.缓存

2.结果可能需要等待,会阻塞!

8 常用的辅助类(必会)

8.1 CountDownLatch

 package com.kuang.add;
 ​
 import java.util.concurrent.CountDownLatch;
 ​
 //计数器
 public class CountDownLatchDemo {
     public static void main(String[] args) throws InterruptedException {
         //总数是6的倒计时, 必须要执行任务的时候,再使用
         CountDownLatch countDownLatch = new CountDownLatch(6);
         for (int i = 1; i <= 6; i++) {              new Thread(()->{
                 System.out.println(Thread.currentThread().getName()+"Go out");
                 countDownLatch.countDown();//数量减一
 ​
            },String.valueOf(i)).start();
             
        }
         countDownLatch.await();//等待计数器归零,然后再向下执行
 ​
 ​
         System.out.println("Close Door");
 ​
 ​
    }
 }
 ​

减法计数器

原理:

  1. countDownLatch.countDown();//数量减一

  2. countDownLatch.await();//等待计数器归零,然后再向下执行

    每次有线程调用countDown()数量减一。假设计数器变为0,countDownLatch.await();就会被唤醒,继续执行!

8.2 CyclicBarrier

加法计数器

 package com.kuang.add;
 ​
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;
 ​
 public class CyclicBarrierDemo {
     public static void main(String[] args) {
         /**
          * 集齐7颗龙珠召唤神龙
          */
         //召唤龙珠的线程
         CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
             System.out.println("召唤神龙成功!");
 ​
        });
 ​
 ​
         for (int i = 1; i <=7; i++) {              final int temp = i;  ​              //lambda能操作到i吗?              new Thread(()->{
                 System.out.println(Thread.currentThread().getName()+"搜集"+temp+"个龙珠");
                 try {
                     cyclicBarrier.await();//等待7个线程结束
                } catch (InterruptedException e) {
                     e.printStackTrace();
                } catch (BrokenBarrierException e) {
                     e.printStackTrace();
                }
 ​
 ​
            }).start();
 ​
        }
    }
 }
 ​

8.3 Semaphore

 package com.kuang.add;
 ​
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 ​
 public class SemaphoreDemo {
     public static void main(String[] args) {
         //线程数量: 类比于停车位 3个停车位 限流的时候可以用
         Semaphore semaphore = new Semaphore(3);
         for (int i = 1; i <=6 ; i++) {              new Thread(()->{
                 //acquire() 得到
                 try {
                     semaphore.acquire();
                     System.out.println(Thread.currentThread().getName()+"抢到车位");
                     TimeUnit.SECONDS.sleep(2);
                     System.out.println(Thread.currentThread().getName()+"离开车位");
 ​
 ​
 ​
                } catch (InterruptedException e) {
                     e.printStackTrace();
                }finally {
                     //release() 释放
                   semaphore.release();
                }
 ​
 ​
            },String.valueOf(i)).start();
             
        }
    }
 }
 ​

原理:

semaphore.acquire();//获取,假设已经满了,等待,等待释放为止!

semaphore.release();//释放,会将当前的信号量释放+1,然后唤醒等待的线程!

作用:多个共享资源互斥使用!并发限流,控制最大的线程数。

9读写锁

ReadWriteLock

 package com.kuang.rw;
 ​
 ​
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 ​
 /**
  * 独占锁(写锁)一次只能被一个线程占有
  * 共享锁(读锁) 多个线程可以同时占有
  * ReadWriteLockDemo
  * 读-读 可以共存
  * 读-写 不能共存
  * 写-写 不能共存
  */
 public class ReadWriteLockDemo {
     public static void main(String[] args) {
         MyCacheLock myCache = new MyCacheLock();
 ​
         //写入 lambda表达式无法访问外部变量,只有通过fianl变量去进行中间转换
         for (int i = 1; i <= 5; i++) {              final int temp = i;              new Thread(()->{
                 myCache.put(temp+"",temp+"");
            },String.valueOf(i)).start();
             
        }
         //读取
         for (int i = 1; i <= 5; i++) {              final int temp = i;              new Thread(()->{
                 myCache.get(temp+"");
            },String.valueOf(i)).start();
 ​
        }
 ​
       
    }
 }
 class MyCacheLock{
     private volatile Map map = new HashMap<>();
     //读写锁,更加细粒度的控制
     private ReadWriteLock readWriteLock =  new ReentrantReadWriteLock();
     //存,写的过程 只希望同时只有一个线程写
     public void put(String key,Object value){
         readWriteLock.writeLock().lock();
 ​
         try {
             System.out.println(Thread.currentThread().getName()+"写入"+key);
             map.put(key, value);
             System.out.println(Thread.currentThread().getName()+"写入OK");
 ​
        } catch (Exception e) {
             e.printStackTrace();
        } finally {
             readWriteLock.writeLock().unlock();
        }
 ​
    }
     //取,读的过程 所有人都可以去读
     public void get(String key){
         readWriteLock.readLock().lock();
 ​
         try {
             System.out.println(Thread.currentThread().getName()+"读取"+key);
             Object o = map.get(key);
             System.out.println(Thread.currentThread().getName()+"读取OK");
        } catch (Exception e) {
             e.printStackTrace();
        } finally {
             readWriteLock.readLock().unlock();
        }
 ​
 ​
 ​
    }
 ​
 ​
 }
 /**
  * 自定义缓存
  */
  class MyCache{
      private volatile Map map = new HashMap<>();
      //存,写的过程
 ​
      public void put(String key,Object value){
          System.out.println(Thread.currentThread().getName()+"写入"+key);
          map.put(key, value);
          System.out.println(Thread.currentThread().getName()+"写入OK");
 ​
      }
      //取,读的过程
     public void get(String key){
         System.out.println(Thread.currentThread().getName()+"读取"+key);
         Object o = map.get(key);
         System.out.println(Thread.currentThread().getName()+"读取OK");
 ​
 ​
    }
 ​
 ​
 }

10 阻塞队列

队列 :FIFO

写入:如果队列满了,就必须阻塞等待

取:如果队列是空的,必须阻塞等待生产

不得不阻塞

阻塞队列:

BlockingQueue: 不是新的东西

什么情况下我们会使用阻塞队列:多线程并发处理,线程池!

学会使用队列

添加 移除

四组API

方式

抛出异常

有返回值,不抛出异常

阻塞等待

超时等待

添加

add

offer()空参

put()

offer(,,)

移除

remove

pool()

take()

poll(,)

检测队首元素

element

peek()

-

-

 package com.kuang.bq;
 ​
 import java.util.concurrent.ArrayBlockingQueue;
 ​
 public class Test {
     public static void main(String[] args) {
         test1();
 ​
    }
     /**
      * 抛出异常
      */
     public static void test1(){
         //<>()放队列的大小
         ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
 ​
         System.out.println(blockingQueue.add("a"));
         System.out.println(blockingQueue.add("b"));
         System.out.println(blockingQueue.add("c"));
         System.out.println("++++++++++++++++++++++++++++");
 ​
 ​
         //IllegalStateException: Queue full抛出异常
        // System.out.println(blockingQueue.add("d"));
         System.out.println(blockingQueue.remove());
         System.out.println(blockingQueue.remove());
         System.out.println(blockingQueue.remove());
 ​
         //java.util.NoSuchElementException没有元素错误 抛出异常
         System.out.println(blockingQueue.remove());
 ​
    }
 }
 ​

 package com.kuang.bq;
 ​
 import java.util.concurrent.ArrayBlockingQueue;
 ​
 public class Test {
     public static void main(String[] args) {
         test2();
 ​
    }
     /**
      * 抛出异常
      */
     public static void test1(){
         //<>()放队列的大小
         ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
 ​
         System.out.println(blockingQueue.add("a"));
         System.out.println(blockingQueue.add("b"));
         System.out.println(blockingQueue.add("c"));
         System.out.println("++++++++++++++++++++++++++++");
 ​
 ​
         //IllegalStateException: Queue full抛出异常
        // System.out.println(blockingQueue.add("d"));
         System.out.println(blockingQueue.remove());
         System.out.println(blockingQueue.remove());
         System.out.println(blockingQueue.remove());
 ​
         //java.util.NoSuchElementException没有元素错误 抛出异常
         System.out.println(blockingQueue.remove());
 ​
    }
 ​
     /**
      * 有返回值,没有异常
      */
     public static void test2() {
         //队列的大小
     ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
      //存
     System.out.println(blockingQueue.offer("a"));
     System.out.println(blockingQueue.offer("b"));
     System.out.println(blockingQueue.offer("c"));
 ​
     System.out.println(blockingQueue.offer("d"));//返回false 不抛出异常
     //取
     System.out.println("===============================");
     System.out.println(blockingQueue.poll());
     System.out.println(blockingQueue.poll());
     System.out.println(blockingQueue.poll());
 ​
     System.out.println(blockingQueue.poll());//返回值为null,也不抛出异常
 ​
 ​
 ​
 }
 }

输出结果:

true true true

false

a b c null

Process finished with exit code 0

查看队首元素 System.out.println(blockingQueue.element())放在哪个remove 前面,哪个就是队首 其余的元素就被移除了

 package com.kuang.bq;
 ​
 import java.util.concurrent.ArrayBlockingQueue;
 ​
 public class Test {
     public static void main(String[] args) {
         test1();
 ​
    }
     /**
      * 抛出异常
      */
     public static void test1(){
         //<>()放队列的大小
         ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
 ​
         System.out.println(blockingQueue.add("a"));
         System.out.println(blockingQueue.add("b"));
         System.out.println(blockingQueue.add("c"));
 ​
 ​
         System.out.println("++++++++++++++++++++++++++++");
 ​
 ​
         //IllegalStateException: Queue full抛出异常
        // System.out.println(blockingQueue.add("d"));
        // System.out.println(blockingQueue.remove());
         System.out.println(blockingQueue.remove());
 ​
     
         System.out.println(blockingQueue.remove());
         System.out.println(blockingQueue.element());//查看队首元素
         System.out.println(blockingQueue.remove());
 ​
         //java.util.NoSuchElementException没有元素错误 抛出异常
         //System.out.println(blockingQueue.remove());
 ​
    }
 ​
     /**
      * 有返回值,没有异常
      */
     public static void test2() {
         //队列的大小
     ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
      //存
     System.out.println(blockingQueue.offer("a"));
     System.out.println(blockingQueue.offer("b"));
     System.out.println(blockingQueue.offer("c"));
 ​
     System.out.println(blockingQueue.offer("d"));//返回false 不抛出异常
     //取
     System.out.println("===============================");
     System.out.println(blockingQueue.poll());
     System.out.println(blockingQueue.poll());
     System.out.println(blockingQueue.poll());
 ​
     System.out.println(blockingQueue.poll());//返回值为null,也不抛出异常
 ​
 ​
 ​
 }
 }

 package com.kuang.bq;
 ​
 import java.util.concurrent.ArrayBlockingQueue;
 ​
 public class Test {
     public static void main(String[] args) {
         test2();
 ​
    }
     /**
      * 抛出异常
      */
     public static void test1(){
         //<>()放队列的大小
         ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
 ​
         System.out.println(blockingQueue.add("a"));
         System.out.println(blockingQueue.add("b"));
         System.out.println(blockingQueue.add("c"));
 ​
 ​
         System.out.println("++++++++++++++++++++++++++++");
 ​
 ​
         //IllegalStateException: Queue full抛出异常
        // System.out.println(blockingQueue.add("d"));
        // System.out.println(blockingQueue.remove());
         System.out.println(blockingQueue.remove());
 ​
 ​
         System.out.println(blockingQueue.remove());
         System.out.println(blockingQueue.element());//查看队首元素
         System.out.println(blockingQueue.remove());
 ​
         //java.util.NoSuchElementException没有元素错误 抛出异常
         //System.out.println(blockingQueue.remove());
 ​
    }
 ​
     /**
      * 有返回值,没有异常
      */
     public static void test2() {
         //队列的大小
     ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
      //存
     System.out.println(blockingQueue.offer("a"));
     System.out.println(blockingQueue.offer("b"));
     System.out.println(blockingQueue.offer("c"));
 ​
         System.out.println(blockingQueue.peek());
 ​
        // System.out.println(blockingQueue.offer("d"));//返回false 不抛出异常
     //取
     System.out.println("===============================");
     System.out.println(blockingQueue.poll());
     System.out.println(blockingQueue.poll());
     System.out.println(blockingQueue.poll());
 ​
   // System.out.println(blockingQueue.poll());//返回值为null,也不抛出异常
 ​
 ​
 ​
 }
 }

 package com.kuang.bq;
 ​
 import java.util.concurrent.ArrayBlockingQueue;
 ​
 public class Test {
     public static void main(String[] args) throws InterruptedException {
         test3();
 ​
    }
     /**
      * 抛出异常
      */
     public static void test1(){
         //<>()放队列的大小
         ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
 ​
         System.out.println(blockingQueue.add("a"));
         System.out.println(blockingQueue.add("b"));
         System.out.println(blockingQueue.add("c"));
 ​
 ​
         System.out.println("++++++++++++++++++++++++++++");
 ​
 ​
         //IllegalStateException: Queue full抛出异常
        // System.out.println(blockingQueue.add("d"));
        // System.out.println(blockingQueue.remove());
         System.out.println(blockingQueue.remove());
 ​
 ​
         System.out.println(blockingQueue.remove());
         System.out.println(blockingQueue.element());//查看队首元素
         System.out.println(blockingQueue.remove());
 ​
         //java.util.NoSuchElementException没有元素错误 抛出异常
         //System.out.println(blockingQueue.remove());
 ​
    }
 ​
     /**
      * 有返回值,没有异常
      */
     public static void test2() {
         //队列的大小
     ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
      //存
     System.out.println(blockingQueue.offer("a"));
     System.out.println(blockingQueue.offer("b"));
     System.out.println(blockingQueue.offer("c"));
 ​
         System.out.println(blockingQueue.peek());
 ​
        // System.out.println(blockingQueue.offer("d"));//返回false 不抛出异常
     //取
     System.out.println("===============================");
     System.out.println(blockingQueue.poll());
     System.out.println(blockingQueue.poll());
     System.out.println(blockingQueue.poll());
 ​
   // System.out.println(blockingQueue.poll());//返回值为null,也不抛出异常
 ​
 ​
 ​
 }
 /**
  * 等待,阻塞(一直阻塞)
  */
 public static void test3() throws InterruptedException {
     //队列的大小
     ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
 ​
     //一直阻塞
     //存
     blockingQueue.put("a");
     blockingQueue.put("b");
     blockingQueue.put("c");
    // blockingQueue.put("d");//队列没有位置了会一直等待
 ​
     System.out.println(blockingQueue.take());
     System.out.println(blockingQueue.take());
     System.out.println(blockingQueue.take());
     System.out.println(blockingQueue.take());//没有这个元素,会一直阻塞
 ​
 ​
 }
 ​
 /**
  * 等待,阻塞(等待超时)
  */
 }

 package com.kuang.bq;
 ​
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.TimeUnit;
 ​
 public class Test {
     public static void main(String[] args) throws InterruptedException {
         test4();
 ​
    }
     /**
      * 抛出异常
      */
     public static void test1(){
         //<>()放队列的大小
         ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
 ​
         System.out.println(blockingQueue.add("a"));
         System.out.println(blockingQueue.add("b"));
         System.out.println(blockingQueue.add("c"));
 ​
 ​
         System.out.println("++++++++++++++++++++++++++++");
 ​
 ​
         //IllegalStateException: Queue full抛出异常
        // System.out.println(blockingQueue.add("d"));
        // System.out.println(blockingQueue.remove());
         System.out.println(blockingQueue.remove());
 ​
 ​
         System.out.println(blockingQueue.remove());
         System.out.println(blockingQueue.element());//查看队首元素
         System.out.println(blockingQueue.remove());
 ​
         //java.util.NoSuchElementException没有元素错误 抛出异常
         //System.out.println(blockingQueue.remove());
 ​
    }
 ​
     /**
      * 有返回值,没有异常
      */
     public static void test2() {
         //队列的大小
     ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
      //存
     System.out.println(blockingQueue.offer("a"));
     System.out.println(blockingQueue.offer("b"));
     System.out.println(blockingQueue.offer("c"));
 ​
         System.out.println(blockingQueue.peek());
 ​
        // System.out.println(blockingQueue.offer("d"));//返回false 不抛出异常
     //取
     System.out.println("===============================");
     System.out.println(blockingQueue.poll());
     System.out.println(blockingQueue.poll());
     System.out.println(blockingQueue.poll());
 ​
   // System.out.println(blockingQueue.poll());//返回值为null,也不抛出异常
 ​
 ​
 ​
 }
 /**
  * 等待,阻塞(一直阻塞)
  */
 public static void test3() throws InterruptedException {
     //队列的大小
     ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
 ​
     //一直阻塞
     //存
     blockingQueue.put("a");
     blockingQueue.put("b");
     blockingQueue.put("c");
    // blockingQueue.put("d");//队列没有位置了会一直等待
 ​
     System.out.println(blockingQueue.take());
     System.out.println(blockingQueue.take());
     System.out.println(blockingQueue.take());
     System.out.println(blockingQueue.take());//没有这个元素,会一直阻塞
 ​
 ​
 }
 ​
 /**
  * 等待,阻塞(等待超时)
  */
 public static void test4() throws InterruptedException {
     //队列的大小
     ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
     //存
 ​
     blockingQueue.offer("a");
     blockingQueue.offer("b");
     blockingQueue.offer("c");
     //blockingQueue.offer("d",2, TimeUnit.SECONDS);//延迟两秒,超时退出
 ​
     System.out.println("===========================");
     System.out.println(blockingQueue.poll());
     System.out.println(blockingQueue.poll());
     System.out.println(blockingQueue.poll());
     blockingQueue.poll(2, TimeUnit.SECONDS);//等待超过2秒就退出
 ​
 ​
 ​
 }
 }

SynchronousQueue同步队列

没有容量

进去一个元素,必须等待取出来之后,才能往里面放一个元素

put take

 package com.kuang.bq;
 ​
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
 ​
 /**
  * 同步队列
  * 和其他的BlockingQueue不一样,SynchronousQueue不存储元素
  * put了一个元素,必须从里面先take取出来,否则不能在put进去值
  */
 public class SynchronousQueueDemo {
     public static void main(String[] args) {
         BlockingQueue blockingQueue  = new SynchronousQueue<>();//同步队列
 ​
         new Thread(()->{
 ​
             try {
                 System.out.println(Thread.currentThread().getName()+"put 1");
                 blockingQueue.put("1");
                 System.out.println(Thread.currentThread().getName()+"put 2");
                 blockingQueue.put("2");
                 System.out.println(Thread.currentThread().getName()+"put 3");
                 blockingQueue.put("3");
            } catch (InterruptedException e) {
                 e.printStackTrace();
            }
 ​
        },"T1").start();
         new Thread(()->{
             try {
                 TimeUnit.SECONDS.sleep(3);
                 System.out.println(Thread.currentThread().getName()+"=>"+blockingQueue.take());
                 TimeUnit.SECONDS.sleep(3);
                 System.out.println(Thread.currentThread().getName()+"=>"+blockingQueue.take());
                 TimeUnit.SECONDS.sleep(3);
                 System.out.println(Thread.currentThread().getName()+"=>"+blockingQueue.take());
            } catch (InterruptedException e) {
                 e.printStackTrace();
            }
        },"T2").start();
    }
 }
 ​

11、线程池(重点)

线程池:三大方法,七大参数,四种拒绝策略

池化技术

程序的运行,本质:占用系统的资源!优化资源的使用!=>池化技术

线程池、连接池、内存池、对象池///…. 创建、销毁,十分浪费资源

池化技术:事先准备好一些资源,有人要用,就来我这里拿,用完之后还给我。

线程池的好处:

1.降低资源的消耗

2.提高响应的速度

3.方便管理。

线程可以复用 ,可以控制最大并发数,管理线程

线程池:三大方法

 package com.kuang.pool;
 ​
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 // Executors工具类 三大方法
 //使用了线程池之后,使用线程池来创建线程
 public class Demo01 {
     public static void main(String[] args) {
        ExecutorService threadPool = Executors.newSingleThreadExecutor();//单个线程
        //ExecutorService threadPool = Executors.newFixedThreadPool(5);//创建一个固定的线程池的大小
         //ExecutorService threadPool = Executors.newCachedThreadPool();//缓存的 可以伸缩,遇强则强,遇弱则弱
 ​
 ​
         try {
             for (int i = 0; i < 100 ; i++) {                  //使用了线程池之后,使用线程池来创建线程                  threadPool.execute(()->{
                     System.out.println(Thread.currentThread().getName()+" ok");
 ​
                });
 ​
            }
 ​
        } catch (Exception e) {
             e.printStackTrace();
        } finally {
             //线程池用完,程序结束,关闭线程池
             threadPool.shutdown();
        }
    }
 }
 ​

7大参数

源码分析:

 public static ExecutorService newSingleThreadExecutor() {
         return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                     0L, TimeUnit.MILLISECONDS,
                                     new LinkedBlockingQueue()));
    }

   public static ExecutorService newFixedThreadPool(int nThreads) {
         return new ThreadPoolExecutor(nThreads, nThreads,
                                       0L, TimeUnit.MILLISECONDS,
                                       new LinkedBlockingQueue());
    }
 ​

 public static ExecutorService newCachedThreadPool() {
         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,//约为21亿
                                       60L, TimeUnit.SECONDS,
                                       new SynchronousQueue());
    }
 ​
 ​
 本质: ThreadPoolExecutor

 public ThreadPoolExecutor(int corePoolSize,//核心线程池大小
                               int maximumPoolSize,//最大的线程池大小
                               long keepAliveTime,//超时了,没有人调用就会释放
                               TimeUnit unit,//超时单位
                               BlockingQueue workQueue,//阻塞队列
                               ThreadFactory threadFactory,//线程工厂,创建线程的,一般不用动
                               RejectedExecutionHandler handler//拒绝策略) {
         if (corePoolSize < 0 ||
             maximumPoolSize <= 0 ||
             maximumPoolSize < corePoolSize ||
             keepAliveTime < 0)
             throw new IllegalArgumentException();
         if (workQueue == null || threadFactory == null || handler == null)
             throw new NullPointerException();
         this.acc = System.getSecurityManager() == null ?
                 null :
                 AccessController.getContext();
         this.corePoolSize = corePoolSize;
         this.maximumPoolSize = maximumPoolSize;
         this.workQueue = workQueue;
         this.keepAliveTime = unit.toNanos(keepAliveTime);
         this.threadFactory = threadFactory;
         this.handler = handler;
    }

手动创建一个线程池

四种拒绝策略

 package com.kuang.pool;
 ​
 import java.util.concurrent.*;
 ​
 // Executors工具类 三大方法
 ​
 /**
  * new ThreadPoolExecutor.AbortPolicy());//银行满了,还有人进来,不处理这个人的,抛出异常
  * new ThreadPoolExecutor.CallerRunsPolicy());//哪来的去哪里
  * new ThreadPoolExecutor.DiscardPolicy());//队列满了,丢掉任务,不会抛出异常
  * new ThreadPoolExecutor.DiscardOldestPolicy());//队列满了,尝试去和最早的竞争,也不会抛出异常
  */
 //使用了线程池之后,使用线程池来创建线程
 public class Demo01 {
     public static void main(String[] args) {
         //自定义线程池!工作中使用ThreadPoolExecutor
         //假设核心线程池有2个 就是2个银行业务员,最大同时有5个人去办理 超过3秒就不等了 LinkedBlockingDeque<>()侯客区只能放3人
 ​
       ExecutorService threadPool = new ThreadPoolExecutor(2,
               5,
               3,
               TimeUnit.SECONDS,
               new LinkedBlockingDeque<>(3),
       Executors.defaultThreadFactory(),
               new ThreadPoolExecutor.DiscardOldestPolicy());//队列满了,尝试去和最早的竞争,也不会抛出异常
 ​
         try {
             //最大承载:Deque + max
             //RejectedExecutionException超出最大承载抛出的异常
             for (int i = 1; i <= 9 ; i++) {                  //使用了线程池之后,使用线程池来创建线程                  threadPool.execute(()->{
                     System.out.println(Thread.currentThread().getName()+" ok");
 ​
                });
 ​
            }
 ​
        } catch (Exception e) {
             e.printStackTrace();
        } finally {
             //线程池用完,程序结束,关闭线程池
             threadPool.shutdown();
        }
    }
 }
 ​

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器