Java百万级别数据异步导出功能实现

前言

谈到以excel格式导出数据,很容易想到的实现思路就是:前端发送导出请求,后端使用org.apache.poi进行数据处理,然后使用reponse把流文件响应给前端,供客户端下载。但是这种实现方式会带来以下问题:
1.数据量比较大的情况下,会导致OOM
2.数据庞大,后端响应前端超时
3.采用同步方式导出数据,响应时间长,用户体验感差
基于此,本文提供一种异步导出数据的方案,实现百万级别数据的导出。

程序流程图

程序流程图.png

技术采用

Spring Boot、线程池、Hikari连接池数据库连接池、七牛云、MyBatis-Plus、定时器Scheduled、POI,MySQL

技术细节

1.线程池

Q1:采用线程池有什么好处?
A1:
①降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗;
②提高响应速度:任务到达时,无需等待线程创建即可立即执行;
③提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。

Q2:ThreadPoolExecutor参数含义?
A2:
①corePoolSize:核心线程数
* 核心线程会一直存活,即使没有任务需要执行
* 当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理
* 设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭

②queueCapacity:任务队列容量(阻塞队列)
* 当核心线程数达到最大时,新任务会放在队列中排队等待执行

③maxPoolSize:最大线程数
* 当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务
* 当线程数=maxPoolSize,且任务队列已满时,线程池采取拒绝策略

④keepAliveTime:线程空闲时间
* 当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize
* 如果allowCoreThreadTimeout=true,则会直到线程数量=0

⑤allowCoreThreadTimeout:允许核心线程超时

⑥rejectedExecutionHandler:任务拒绝处理器
* 两种情况会拒绝处理任务:
- 当线程数已经达到maxPoolSize,切队列已满,会拒绝新任务
- 当线程池被调用shutdown()后,会等待线程池里的任务执行完毕,再shutdown。如果在调用shutdown()和线程池真正shutdown之间提交任务,会拒绝新任务
* 线程池会调用rejectedExecutionHandler来处理这个任务。如果没有设置默认是AbortPolicy,会抛出异常
* ThreadPoolExecutor类有几个内部实现类来处理这类情况:
- AbortPolicy 丢弃任务,抛运行时异常
- CallerRunsPolicy 交由提交线程任务的线程执行,会影响程序的整体性能
- DiscardPolicy 不处理新任务,直接丢弃
- DiscardOldestPolicy 从队列中踢出最先进入队列(最后一个执行)的任务
* 实现RejectedExecutionHandler接口,可自定义处理器

Q3:线程池任务调度流程?
A3:

任务调度流程.png

线程池的配置:
@Configuration
public class TaskExecutorConfig {
    /**
     * 文件导出线程池
     */
    @Bean
    public ThreadPoolTaskExecutor fileExportExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(3);
        executor.setMaxPoolSize(6);
        executor.setQueueCapacity(2);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("fileExportExecutor-");
        // 设置拒绝策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 等待所有任务结束后再关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(3);
        return executor;
    }
}
任务执行
//将导出任务交给线程池执行
Future<Boolean> future = fileExportExecutor.submit(exportDataTask);

执行线程任务的方式有两种:
①excute(),用于提交不需要返回值的任务,无法判断是否任务已经执行完成
②submit(),用于提交需要返回值的任务,会返回一个Future对象,通过调用Future类中的get()可以获取任务的执行结果

线程任务伪代码
@Component
@Data
/**
 * spring容器管理的对象默认是单例,需贴上@Scope("prototype")设置成多例
 * 将对象注入到ObjectFactory类中,通过getObject()方法获取
 *  @Autowired
 *   private ObjectFactory<ExportDataTask> exportDataTasks;
 */
@Scope("prototype")
public class ExportDataTask implements Callable<Boolean> {
    @Autowired
    private FileService fileService;
    @Override
    public Boolean call() throws Exception {
        try {
            //TODO:将状态改成进行中
            //导出数据到excel,并且将文件上传到七牛云,返回文件在七牛云的下载路径以及导出的记录数
            map = fileService.exportExcel(template.getSqlStr(), params, headers, containBean, recordId);
            if (map == null || Thread.currentThread().isInterrupted()) {
                return false;
            }
            //TODO:将文件在七牛云下载路径写入数据库,并将导出状态改成已完成
            return true;
        }catch (Exception e) {
            //TODO:把导出状态改成导出异常
            log.error(e.getMessage(),e);
            return false;
        }
    }

创建线程任务的方式有三种:①继承Thread类;②实现Runnable接口;③实现Callable接口
引用链接:美团技术团队->https://tech.meituan.com/archives

2.数据库连接池

Q1:数据库连接池的主要参数?
A1:
①driver-class-name:驱动类名
②maxLifetime:一个连接的生命时长(毫秒),超时而且没被使用则被释放
③maximumPoolSize:连接池中允许的最大连接数
④url:数据库url
⑤username:数据库用户名
⑥password:数据库密码

数据库连接池执行sql语句
        //从数据库连接池中获取连接对象
        Connection conn = dataSource.getConnection();
        //获取sql预编译语句,使用预编译语句可以防止sql注入
        stmt = conn.prepareStatement(sqlStr);
        //设置sql参数
        if (StringUtils.hasLength(params)) {
            String[] paramGroup = params.split(",");
            for (int i = 0; i < paramGroup.length; i++) {
                stmt.setObject(i + 1, paramGroup[i]);
            }
        }
        //执行sql语句
        rs = stmt.executeQuery();
3.使用poi将数据转换成工作簿
        //创建一个excel工作簿
        SXSSFWorkbook workbook = new SXSSFWorkbook();
        SXSSFSheet sheet = null;
        //设置每一行的数据
        while (rs.next()) {
            //一个表只能存1048575行,多了就换表
            if (count % 1048575 == 0){
                sheet = workbook.createSheet();
                SXSSFRow row0 = sheet.createRow(0);
                String[] titles = headers.split(",");
                //初始化行号
                line = 0;
                //设置表头
                for (int i = 0; i < titles.length; i++) {
                    SXSSFCell cell = row0.createCell(i);
                    cell.setCellValue(titles[i]);
                }
            }
            count++;
            line++;
            //新建一行
            SXSSFRow row = sheet.createRow(line);
            //设置每一项的数据
            for (int j = 0; j < beans.length; j++) {
                if (Thread.currentThread().isInterrupted()) {
                    return null;
                }
                SXSSFCell cell = row.createCell(j);
                cell.setCellValue(rs.getString(beans[j]));
            }
        }

workbook的类型有三种:
①HSSFWorkbook:针对EXCEL 2003版本,扩展名为.xls
②XSSFWorkbook:其对应的是EXCEL2007+ ,扩展名为.xlsx ,最多可以导出104万行,会出现OOM
③SXSSFWorkbook:可以其对应的是EXCEL2007+,根据行数将内存中的数据持久化写到文件中,避免OOM

4.七牛云

Q1:七牛云的主要参数?
A1:
①accessKey:-
②secretKey:-
③bucket:-
④domain:域名

       ByteArrayOutputStream bos = new ByteArrayOutputStream();
        //将数据写到数据流中,并存放在字节数组中
        workbook.write(bos);
        byte[] bytes = bos.toByteArray();
        rs.close();
        bos.close();
        Calendar calendar = Calendar.getInstance();
        int year = calendar.get(Calendar.YEAR);
        int month = calendar.get(Calendar.MONTH) + 1;
        //文件名
        String key = year + "/" + month + "/" +
                UUID.randomUUID().toString().replaceAll("-", "").substring(0, 6) + ".xlsx";
        //创建七牛云token
        String token = Auth.create(accessKey, secretKey).uploadToken(bucket);
        UploadManager uploadManager = new UploadManager(new Configuration(Region.region2()));
        //上传至七牛云
        Response response = uploadManager.put(bytes, key, token);
        //处理结果
        DefaultPutRet putRet = new Gson().fromJson(response.bodyString(), DefaultPutRet.class);
        //文件在七牛云上面的下载链接
        String qiniuUrl = domain + "/" + putRet.key;
5.任务中断

实现思路:任务提交给线程池执行后,返回Future对象,按照:导出记录id->future对象的格式存入ExpiringMap中,中断时使用future对象调用cancel(true)方法,并且捕捉future对象调用get()方法抛出的的CancellationException

private final static ExpiringMap<Integer, Future> threadMap = ExpiringMap.builder()
                                                                              .expiration(2, TimeUnit.HOURS)
                                                                              .expirationPolicy(ExpirationPolicy.CREATED)
                                                                              .build();
        future.cancel(true);
        try {
            future.get();
        } catch (CancellationException e) {
            exportRecordDAO.update(null,updateWrapper);
            log.info("记录id为{}的任务已经被取消",recordId);
6.定时器

作用:定时将因宕机或者长时间导出未响应的导出记录切换成异常状态
在启动类上贴上@EnableScheduling注解:

@SpringBootApplication
@EnableScheduling
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

在定时任务上贴上@Scheduled注解:

    @Scheduled(cron = " 0 0 0/2 * * ?")
    public void executeScheduledTask(){
        //TODO:将超过两个小时还处于未开始,进行中状态中的导出记录切换成导出异常
    }
7.系统限流

考虑到并发情况下导出大批量数据,会造成服务器CPU负载过高,所以针对单用户,同时导出的任务不能超多2个。实现方式有两种:①到数据库中统计当前用户导出记录处于进行中的数量;②将用户的导出记录处于进行中的数量存入redis中,从redis中获取数据作判断

实现功能

实测可以导出860万的数据,加上sql运行时间,总耗时2分钟

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容