线程池(二)

线程池MyThreadPoolExecutor的执行流程和工作线程的执行逻辑如下:

线程池执行流程:

  1. 初始化:在创建MyThreadPoolExecutor实例时,会设置核心线程数、最大线程数、空闲存活时间以及工作队列。

  2. 提交任务:通过execute(Runnable command)方法提交一个任务到线程池。

  3. 任务判断:线程池会检查当前状态和任务的有效性。

  4. 核心线程判断:如果工作线程数量小于核心线程数,尝试创建一个新线程来执行任务。

  5. 工作队列判断:如果工作线程数量已达到核心线程数,尝试将任务放入工作队列。

  6. 最大线程判断:如果工作队列已满或无法添加任务,且工作线程数量小于最大线程数,创建一个新线程来执行任务。

  7. 拒绝策略:如果任务既不能放入工作队列,又不能创建新线程,则线程池会采取拒绝策略,通常会抛出一个异常。

工作线程执行逻辑:

  1. 工作线程创建:通过addWorker(Runnable command, boolean core)方法创建一个新的工作线程。

  2. 启动线程:新创建的工作线程启动,开始执行Worker类的run方法。

  3. 获取任务:工作线程通过runWorkers(Worker worker)方法尝试获取任务。如果是核心线程,它会无限期地等待任务;如果是非核心线程,则会有一个超时时间。

  4. 执行任务:一旦获取任务,工作线程执行任务的run方法。

  5. 任务完成:任务执行完成后,工作线程记录已完成的任务数。

  6. 继续工作:工作线程尝试获取下一个任务,继续执行,直到没有任务可执行。

  7. 线程移除:当工作线程没有任务可执行时,它会尝试从工作线程集合中移除自己。

  8. 线程终止:如果线程池处于停止状态,工作线程将终止执行。

package com.threadpool;

import com.sun.media.sound.SF2InstrumentRegion;
import javafx.concurrent.Worker;

import java.util.HashSet;
import java.util.Scanner;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MyThreadPoolExecutor {
    private int corePoolCount;
    private int maxPoolCount;
    private int keepLiveTime;

    BlockingDeque<Runnable> workQueue;

    public MyThreadPoolExecutor(int corePoolCount, int maxPoolCount, int keepLiveTime, BlockingDeque<Runnable> workQueue) {
        this.corePoolCount = corePoolCount;
        this.maxPoolCount = maxPoolCount;
        this.keepLiveTime = keepLiveTime;
        this.workQueue = workQueue;
    }

    //当前线程的状态
    private AtomicInteger status = new AtomicInteger();
    private AtomicInteger workCount = new AtomicInteger();//工作线程的数量 都是线程安全的
    private HashSet<Worker> workers = new HashSet<>();//工作线程
    private final Lock lock = new ReentrantLock();//hashset不安全辅助锁
    private final static Integer RUNNING = 0;
    private final static Integer STOP = 1;
    //当前完成的总任务数
    private int completedTaskCount = 0;


    public void execute(Runnable command) {
        if (command == null) throw new NullPointerException();
        if (status.get() == STOP) throw new RuntimeException();
        if (status.get() == RUNNING) {
            if (workCount.get() < corePoolCount) {
                if (addWorker(command, true)) {//增加核心线程数
                    return;
                }
            }
            if (workQueue.offer(command)) {//只是放到任务队列
                return;
            }
            if (workCount.get() < maxPoolCount) {
                if (addWorker(command, false)) {//创建新的线程
                    return;
                }
            }
            throw new RuntimeException("拒绝策略");
        }
    }

    private boolean addWorker(Runnable command, boolean core) {//创建新的线程加入工作者队列
        if (status.get() == STOP) return false;
        retry:
        while (true) {
            if (status.get() == STOP) return false;
            while (true) {
                if (workCount.get() >= (core ? corePoolCount : maxPoolCount)) {
                    return false;//可能其他线程也创建了
                }
                //创建工作线程
                if (!casAddWorkerCount()) {//自增失败
                    continue retry;
                }
                break retry;
            }
        }
        Worker worker = null;
        try {
            lock.lock();
            worker = new Worker(command);
            final Thread thread = worker.thread;
            if (thread != null) {
                if (thread.isAlive()) throw new IllegalArgumentException();
            }
            thread.start();
            workers.add(worker);//向hashset中添加
        } finally {
            lock.unlock();
        }
        return true;
    }

    public void shutdown(){
        lock.lock();
        try {
            setState(STOP);
            interruptIdleWorkers();
        }finally {
            lock.unlock();
        }
    }
    private void setState(Integer stop){
        if (status.get()==stop) return;
        while (true){
            if (status.get() == stop){
                break;
            }
            if (status.compareAndSet(status.get(),stop)){
                break;
            }
        }
    }
    private void interruptIdleWorkers(){
        lock.lock();
        try {
            for (Worker worker : workers) {
                if (!worker.thread.isInterrupted()){
                    worker.thread.interrupt();
                }
                this.completedTaskCount += worker.completedTasks;
            }
        }finally {
            lock.unlock();
        }
    }

    private void runWorkers(Worker worker) {
        if (worker == null) throw new NullPointerException();
        try {
            Runnable task = worker.firstTask;
            Thread wt = worker.thread;
            worker.firstTask = null;
            while (task != null || (task = getTask()) != null) {//不断的取任务
                if (wt.isInterrupted()) {
                    System.out.println("this thread is interrupted");
                }
                if (status.get() == STOP) {
                    System.out.println("this threadPool has already stopped");
                }
                task.run();//调用方法
                task = null;
                worker.completedTasks++;
            }
        }catch (Exception e){
            e.printStackTrace();
        } finally {
            //没任务了
            while (true) {
                if (casDelWorkerCount()) {
                    completedTaskCount += worker.completedTasks;
                    break;
                } else {
                    continue;
                }
            }
            lock.lock();
            try {
                workQueue.remove(worker);
            } finally {
                lock.unlock();
            }
        }
    }
    private Runnable getTask() {
        boolean timeout = false;
        Runnable task = null;
        try {
            while (true) {
                if (timeout){
                    return null;
                }
                if (status.get() == STOP) throw new NullPointerException();
                //常驻工作队列
                if (workCount.get() <= corePoolCount){
                    task = workQueue.take();//如果workQueue是空 那么会阻塞在这里 直到不为空 “保活”线程
                }else{
                    task = workQueue.poll(keepLiveTime, TimeUnit.SECONDS);//如果在一定时间内拿不到直接返回null
                }
                if (task != null){
                    return task;
                }
                timeout = true;
            }
        }catch (InterruptedException exception){
            exception.printStackTrace();
            return null;
        }
    }
    private boolean casAddWorkerCount() {
        workCount.compareAndSet(workCount.get(), workCount.get() + 1);
        return true;
    }

    private boolean casDelWorkerCount() {
        workCount.compareAndSet(workCount.get(), workCount.get() - 1);
        return true;
    }

    private final class Worker implements Runnable {//工作者线程
        final Thread thread;
        Runnable firstTask;
        volatile int completedTasks;

        private Worker(Runnable firstTask) {
            this.firstTask = firstTask;
            this.thread = new Thread(this);
            this.completedTasks = 0;
        }

        @Override
        public void run() {
            runWorkers(this);//工作
        }
    }
}

关键逻辑点:

  • 线程安全:使用AtomicIntegerReentrantLock来保证线程池中对工作线程数量和状态的修改是线程安全的。
  • 任务队列:使用BlockingDeque作为任务队列,可以阻塞或超时地获取任务。
  • 工作线程的管理:通过HashSetLock来管理工作线程,保证在添加和移除工作线程时的线程安全。
  • 状态管理:使用AtomicInteger status来跟踪线程池的状态,决定是否接受新任务。

注意:

  • 代码中casAddWorkerCount()casDelWorkerCount()方法的实现似乎有误,因为它们没有使用compareAndSet()方法,这可能导致工作线程数量的不一致。
  • shutdown()方法中,应该在中断空闲线程后,也要尝试中断正在执行任务的工作线程,以确保线程池能够顺利关闭。
  • 代码中的setState(Integer stop)方法中,状态设置的逻辑可能存在竞态条件,因为它没有考虑状态在设置过程中可能被其他线程改变的情况。

在实际使用中,需要对这些潜在的问题进行修正和优化。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/592296.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

网络基础(全)

协议 ”协议“就是一种约定。那么协议需要需要管理吗&#xff1f;答案是当然需要管理呀。 操作系统要进行协议管理——先描述&#xff0c;在组织协议本质就是软件&#xff0c;软件是可以进分层的协议在设计的时候&#xff0c;就是被层状的划分的为什么要划分为层状结呢&#…

综合能源系统:Modbus转IEC104网关解决方案

Modbus转IEC104网关BE102 方案概述 Modbus和IEC104是两种通信协议&#xff0c;各自适用于不同行业和场景&#xff0c;其中Modbus常见于工业自动化&#xff0c;而IEC104则主导电力行业。在某些项目中&#xff0c;需要将Modbus设备的数据传至IEC104电力平台&#xff0c;但两者协…

[嵌入式系统-65]:RT-Thread-组件:FinSH控制台, 用户与RT Thread OS实时命令行交互工具

目录 FinSH 控制台 1. FinSH 简介 2. FinSH 内置命令 - 内核代码自身提供的命令 显示线程状态 显示信号量状态 显示事件状态 显示互斥量状态 显示邮箱状态 显示消息队列状态 显示内存池状态 显示定时器状态 显示设备状态 显示动态内存状态 3. 自定义 FinSH 命令 …

【5/01-5/03】 Arxiv安全类文章速览

知识星球 首先推荐一下我们的知识星球&#xff0c;以AI与安全结合作为主题&#xff0c;包括AI在安全上的应用和AI本身的安全&#xff1b; 加入星球你将获得&#xff1a; 【Ai4sec】&#xff1a;以数据驱动增强安全水位&#xff0c;涵盖内容包括&#xff1a;恶意软件分析&…

MATLAB中功率谱密度计算pwelch函数使用详解

MATLAB中功率谱密度计算pwelch函数使用详解 目录 前言 一、pwelch函数简介 二、pwelch函数参数说明 三、pxx pwelch(x)示例 四、[pxx,f]pwelch(x,window,noverlap,nfft,fs)示例 四、[pxx,f] pwelch(x,window,noverlap,nfft,fs,freqrange,spectrumtype)示例 五、多通道功…

# cmd 报错 “npm 不是内部或外部命令,也不是可运行的程序 或批处理文件”

cmd 报错 “npm 不是内部或外部命令,也不是可运行的程序 或批处理文件” 1、报错原因分析&#xff1a; Node.js 没有安装或安装不正确。 npm 的路径没有添加到系统环境变量中。 安装 Node.js 时选择了不包含 npm 的安装选项。 2、解决方法&#xff1a; 1&#xff09;在 cm…

【房屋】租房攻略,萌新第一次租房需要考虑的要素(通勤、地段、房源)

【房屋】租房攻略&#xff0c;萌新第一次租房需要考虑的要素&#xff08;通勤、地段、房源&#xff09; 文章目录 1、位置要好&#xff08;通勤近 vs 地段好&#xff09;2、户型要好&#xff08;朝向/楼层&#xff0c;独卫/家具&#xff0c;水电费&#xff09;3、价格要便宜4、…

Github 2024-05-03 Java开源项目日报 Top9

根据Github Trendings的统计,今日(2024-05-03统计)共有9个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量Java项目9Kotlin项目1C++项目1libGDX: 跨平台Java游戏开发框架 创建周期:4284 天开发语言:Java, C++协议类型:Apache License 2.0Star数量:2…

DDD:根据maven的脚手架archetype生成ddd多模块项目目录结构

随着领域驱动的兴起&#xff0c;很多人都想学习如何进行ddd的项目开发&#xff0c;那ddd的项目结构是怎么样的&#xff1f;又是如何结合SpringBoot呢&#xff1f;那么针对这个问题&#xff0c;笔者使用maven的archetype封装一个相对通用的ddd的项目目录&#xff0c;方便一键生成…

函数模板 template

函数模板的定义和调用 注意&#xff1a; 在调用函数模板时&#xff0c;编译器会根据调用的函数的参数类型自动推导出T的类型。 优先选择普通函数 强制调用函数模板 函数模板不能对函数的参数自动强制类型转换 myPrintAll(10,b)//普通函数&#xff0c;因为普通函数将b强制转换成…

安装vscode基础配置,es6基础语法,

https://code.visualstudio.com/ es6 定义变量 const声明常量&#xff08;只读变量&#xff09; // 1、声明之后不允许改变 const PI “3.1415926” PI 3 // TypeError: Assignment to constant variable. // 2、一但声明必须初始化&#xff0c;否则会报错 const MY_AGE /…

极简单行阅读器:上班族的摸鱼神器

在忙碌的工作日中&#xff0c;我们经常需要寻找一些方式来放松自己&#xff0c;而阅读无疑是一种既能够放松心情&#xff0c;又能增长知识的方式。今天&#xff0c;我要向大家介绍一个名为“极简单行阅读器”的神器&#xff0c;它不仅能够满足你的阅读需求&#xff0c;还能让你…

时也命也!反派失败于错估了主角的实力——早读(逆天打工人爬取热门微信文章解读)

此子断不可留 引言Python 代码第一篇 洞见 人到中年最大的清醒&#xff1a;时也&#xff0c;运也&#xff0c;命也第二篇 人民日报要闻社会政策 结尾 自知之明是最难得的知识 真正的智慧来自于对自己能力和局限的深刻理解 引言 最近在看仙葫 然后昨天晚上刷了一下这个诛仙 发现…

Qt之信号与槽

槽的本质&#xff1a;对信号响应的函数。 信号函数和槽函数通常位于某个类中&#xff0c;和普通的成员函数相⽐&#xff0c;它们的特别之处在于&#xff1a; 信号函数⽤ signals 关键字修饰&#xff0c;槽函数⽤ public slots、protected slots 或者 private slots 修饰。sign…

前端基础学习html-->表单标签

目录 表单标签&#xff1a; 表单域&#xff1a; 表单控件(表单元素)&#xff1a; 提示信息: 表单标签&#xff1a; 表单标签顾名思义就是一种表格&#xff0c;用于收集用户信息 在html&#xff0c;一个完整的表单域是由表单域&#xff0c;表单控件(表单元素)和提示信息组…

揭秘Fabric交易流程:一文带你深入了解

随着区块链技术的日益普及&#xff0c;Hyperledger Fabric作为一种联盟链解决方案&#xff0c;受到了广泛关注。那么&#xff0c;Fabric的交易流程究竟是怎样的呢&#xff1f;本文将为您一一揭晓。 1. Fabric交易的参与方 客户端&#xff1a;交易流程的发起方&#xff0c;发起…

Java web第五次作业

1.在idea中配置好数据源 2、视频案例中只给出了查询所有结果的示例&#xff0c;请自己完成添加、删除、修改操作的代码。以下供参 考。 Delete("delete from emp where id#{id}") public void delete(Integer id); 测试代码 Test public void testDelete(){ empMa…

springboot 整合 knife4j-openapi3

适用于&#xff1a;项目已使用shiro安全认证框架&#xff0c;整合knife4j-openapi3 1.引入依赖 <!-- knife4j-openapi3 --> <dependency><groupId>com.github.xiaoymin</groupId><artifactId>knife4j-openapi3-spring-boot-starter</artifa…

SpringBoot+Vue项目在线视频教育平台

一、前言介绍 本系统采用的数据库是Mysql&#xff0c;使用SpringBoot框架开发&#xff0c;运行环境使用Tomcat服务器&#xff0c;idea是本系统的开发平台。在设计过程中&#xff0c;充分保证了系统代码的良好可读性、实用性、易扩展性、通用性、便于后期维护、操作方便以及页面…

ThreeJS:常见几何体与基础材质入门

在前文《ThreeJS:Geometry与顶点|索引|面》中&#xff0c;我们了解了与Geometry几何体相关的基础概念&#xff0c;也尝试了如何通过BufferGeometry自定义几何体。 常见Geometry几何体 ThreeJS内部也提供了诸多封装好的几何体&#xff0c;常见的Geometry几何体如下图所示&#…
最新文章