跳转至

多线程

自定义线程池

直接使用 new Thread() 创建线程,线程数量过多会占用大量内存,导致JVM内存溢出。通过自定义线程池,可以解决这个问题。

private static final int corePoolSize = 2;
private static final int maxPoolSize = corePoolSize * 2;

/**
 * 创建一个固定大小的线程池
 */
private static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
        // 核心线程数
        corePoolSize,
        // 最大线程数
        maxPoolSize,
        // 空闲线程存活时间(单位:秒)
        60L, 
        TimeUnit.SECONDS,
        // 工作队列,这里使用有界队列
        new ArrayBlockingQueue<>(200),
        // 线程工厂,用于创建新线程
        Executors.defaultThreadFactory(),
        // 拒绝策略,在队列满时如何处理新的任务,默认AbortPolicy会抛出RejectedExecutionException异常,这里使用CallerRunsPolicy让提交的线程自己执行任务
        new ThreadPoolExecutor.CallerRunsPolicy() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                log.keyword("CustomThreadPool", "线程池被打满").info("异步任务被拒绝,由请求线程自己执行, Runnable: " + r.toString() + ", 当前线程池状态: " + e.toString());
                super.rejectedExecution(r, e);
            }
        }
);

优雅停机

线程池的 shutdown() 方法会等待所有任务执行完毕,然后关闭线程池。但是,如果任务执行时间过长,或者任务数量过多,可能会导致停机时间过长。

/**
 * 关闭线程池
 */
public static void shutdown() {
    if (!threadPool.isShutdown()) {
        log.keyword("CustomThreadPool").info("尝试关闭线程池");
        // 有序关闭线程池,继续执行已提交的任务,不再接受新任务
        threadPool.shutdown();
        try {
            // 等待所有任务完成或超时,再关闭线程池
            if (!threadPool.awaitTermination(180, TimeUnit.SECONDS)) {
                log.keyword("CustomThreadPool").info("线程池未在规定时间180秒内关闭");
            }
        } catch (InterruptedException ie) {
            log.keyword("CustomThreadPool").error("尝试关闭线程池异常 e:{}", ie);
            log.keyword("CustomThreadPool").info("尝试强制关闭线程池");
            // 尝试停止所有正在执行的任务,停止对等待任务的处理
            threadPool.shutdownNow();
            // 标记中断线程
            Thread.currentThread().interrupt();
        } finally {
            if (!threadPool.isTerminated()) {
                log.keyword("CustomThreadPool").info("线程池未在规定时间内关闭,尝试强制关闭线程池");
                threadPool.shutdownNow();
            }
        }
    }
}

异步任务

CompletableFuture 结合自定义线程池实现异步任务处理。

/**
 * 执行异步任务
 */
public static CompletableFuture<Void> executeAsync(Runnable task) {
    log.keyword("CustomThreadPool").info("提交异步任务: " + task.toString());
    String crid = Logtube.getProcessor().getCrid();
    return CompletableFuture.runAsync(() -> {
        Logtube.getProcessor().setCrid(crid);
        task.run();
    }, threadPool);
}

/**
 * 执行异步任务带返回值
 */
public static<T> CompletableFuture<T> executeSupplyAsync(Supplier<T> task) {
    log.keyword("CustomThreadPool").info("提交带返回值异步任务: " + task.toString());
    String crid = Logtube.getProcessor().getCrid();
    return CompletableFuture.supplyAsync(() -> {
        Logtube.getProcessor().setCrid(crid);
        return task.get();
    }, threadPool);
}

评论