前言
谈到以excel格式导出数据,很容易想到的实现思路就是:前端发送导出请求,后端使用org.apache.poi进行数据处理,然后使用reponse把流文件响应给前端,供客户端下载。但是这种实现方式会带来以下问题:
1.数据量比较大的情况下,会导致OOM
2.数据庞大,后端响应前端超时
3.采用同步方式导出数据,响应时间长,用户体验感差
基于此,本文提供一种异步导出数据的方案,实现百万级别数据的导出。
程序流程图
技术采用
Spring Boot、线程池、Hikari连接池数据库连接池、七牛云、MyBatis-Plus、定时器Scheduled、POI,MySQL
技术细节
1.线程池
Q1:采用线程池有什么好处?
A1:
①降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗;
②提高响应速度:任务到达时,无需等待线程创建即可立即执行;
③提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
Q2:ThreadPoolExecutor参数含义?
A2:
①corePoolSize:核心线程数
* 核心线程会一直存活,即使没有任务需要执行
* 当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理
* 设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭
②queueCapacity:任务队列容量(阻塞队列)
* 当核心线程数达到最大时,新任务会放在队列中排队等待执行
③maxPoolSize:最大线程数
* 当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务
* 当线程数=maxPoolSize,且任务队列已满时,线程池采取拒绝策略
④keepAliveTime:线程空闲时间
* 当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize
* 如果allowCoreThreadTimeout=true,则会直到线程数量=0
⑤allowCoreThreadTimeout:允许核心线程超时
⑥rejectedExecutionHandler:任务拒绝处理器
* 两种情况会拒绝处理任务:
- 当线程数已经达到maxPoolSize,切队列已满,会拒绝新任务
- 当线程池被调用shutdown()后,会等待线程池里的任务执行完毕,再shutdown。如果在调用shutdown()和线程池真正shutdown之间提交任务,会拒绝新任务
* 线程池会调用rejectedExecutionHandler来处理这个任务。如果没有设置默认是AbortPolicy,会抛出异常
* ThreadPoolExecutor类有几个内部实现类来处理这类情况:
- AbortPolicy 丢弃任务,抛运行时异常
- CallerRunsPolicy 交由提交线程任务的线程执行,会影响程序的整体性能
- DiscardPolicy 不处理新任务,直接丢弃
- DiscardOldestPolicy 从队列中踢出最先进入队列(最后一个执行)的任务
* 实现RejectedExecutionHandler接口,可自定义处理器
Q3:线程池任务调度流程?
A3:
线程池的配置:
@Configuration
public class TaskExecutorConfig {
/**
* 文件导出线程池
*/
@Bean
public ThreadPoolTaskExecutor fileExportExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(3);
executor.setMaxPoolSize(6);
executor.setQueueCapacity(2);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("fileExportExecutor-");
// 设置拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 等待所有任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(3);
return executor;
}
}
任务执行
//将导出任务交给线程池执行
Future<Boolean> future = fileExportExecutor.submit(exportDataTask);
执行线程任务的方式有两种:
①excute(),用于提交不需要返回值的任务,无法判断是否任务已经执行完成
②submit(),用于提交需要返回值的任务,会返回一个Future对象,通过调用Future类中的get()可以获取任务的执行结果
线程任务伪代码
@Component
@Data
/**
* spring容器管理的对象默认是单例,需贴上@Scope("prototype")设置成多例
* 将对象注入到ObjectFactory类中,通过getObject()方法获取
* @Autowired
* private ObjectFactory<ExportDataTask> exportDataTasks;
*/
@Scope("prototype")
public class ExportDataTask implements Callable<Boolean> {
@Autowired
private FileService fileService;
@Override
public Boolean call() throws Exception {
try {
//TODO:将状态改成进行中
//导出数据到excel,并且将文件上传到七牛云,返回文件在七牛云的下载路径以及导出的记录数
map = fileService.exportExcel(template.getSqlStr(), params, headers, containBean, recordId);
if (map == null || Thread.currentThread().isInterrupted()) {
return false;
}
//TODO:将文件在七牛云下载路径写入数据库,并将导出状态改成已完成
return true;
}catch (Exception e) {
//TODO:把导出状态改成导出异常
log.error(e.getMessage(),e);
return false;
}
}
创建线程任务的方式有三种:①继承Thread类;②实现Runnable接口;③实现Callable接口
引用链接:美团技术团队->https://tech.meituan.com/archives
2.数据库连接池
Q1:数据库连接池的主要参数?
A1:
①driver-class-name:驱动类名
②maxLifetime:一个连接的生命时长(毫秒),超时而且没被使用则被释放
③maximumPoolSize:连接池中允许的最大连接数
④url:数据库url
⑤username:数据库用户名
⑥password:数据库密码
数据库连接池执行sql语句
//从数据库连接池中获取连接对象
Connection conn = dataSource.getConnection();
//获取sql预编译语句,使用预编译语句可以防止sql注入
stmt = conn.prepareStatement(sqlStr);
//设置sql参数
if (StringUtils.hasLength(params)) {
String[] paramGroup = params.split(",");
for (int i = 0; i < paramGroup.length; i++) {
stmt.setObject(i + 1, paramGroup[i]);
}
}
//执行sql语句
rs = stmt.executeQuery();
3.使用poi将数据转换成工作簿
//创建一个excel工作簿
SXSSFWorkbook workbook = new SXSSFWorkbook();
SXSSFSheet sheet = null;
//设置每一行的数据
while (rs.next()) {
//一个表只能存1048575行,多了就换表
if (count % 1048575 == 0){
sheet = workbook.createSheet();
SXSSFRow row0 = sheet.createRow(0);
String[] titles = headers.split(",");
//初始化行号
line = 0;
//设置表头
for (int i = 0; i < titles.length; i++) {
SXSSFCell cell = row0.createCell(i);
cell.setCellValue(titles[i]);
}
}
count++;
line++;
//新建一行
SXSSFRow row = sheet.createRow(line);
//设置每一项的数据
for (int j = 0; j < beans.length; j++) {
if (Thread.currentThread().isInterrupted()) {
return null;
}
SXSSFCell cell = row.createCell(j);
cell.setCellValue(rs.getString(beans[j]));
}
}
workbook的类型有三种:
①HSSFWorkbook:针对EXCEL 2003版本,扩展名为.xls
②XSSFWorkbook:其对应的是EXCEL2007+ ,扩展名为.xlsx ,最多可以导出104万行,会出现OOM
③SXSSFWorkbook:可以其对应的是EXCEL2007+,根据行数将内存中的数据持久化写到文件中,避免OOM
4.七牛云
Q1:七牛云的主要参数?
A1:
①accessKey:-
②secretKey:-
③bucket:-
④domain:域名
ByteArrayOutputStream bos = new ByteArrayOutputStream();
//将数据写到数据流中,并存放在字节数组中
workbook.write(bos);
byte[] bytes = bos.toByteArray();
rs.close();
bos.close();
Calendar calendar = Calendar.getInstance();
int year = calendar.get(Calendar.YEAR);
int month = calendar.get(Calendar.MONTH) + 1;
//文件名
String key = year + "/" + month + "/" +
UUID.randomUUID().toString().replaceAll("-", "").substring(0, 6) + ".xlsx";
//创建七牛云token
String token = Auth.create(accessKey, secretKey).uploadToken(bucket);
UploadManager uploadManager = new UploadManager(new Configuration(Region.region2()));
//上传至七牛云
Response response = uploadManager.put(bytes, key, token);
//处理结果
DefaultPutRet putRet = new Gson().fromJson(response.bodyString(), DefaultPutRet.class);
//文件在七牛云上面的下载链接
String qiniuUrl = domain + "/" + putRet.key;
5.任务中断
实现思路:任务提交给线程池执行后,返回Future对象,按照:导出记录id->future对象的格式存入ExpiringMap中,中断时使用future对象调用cancel(true)方法,并且捕捉future对象调用get()方法抛出的的CancellationException
private final static ExpiringMap<Integer, Future> threadMap = ExpiringMap.builder()
.expiration(2, TimeUnit.HOURS)
.expirationPolicy(ExpirationPolicy.CREATED)
.build();
future.cancel(true);
try {
future.get();
} catch (CancellationException e) {
exportRecordDAO.update(null,updateWrapper);
log.info("记录id为{}的任务已经被取消",recordId);
6.定时器
作用:定时将因宕机或者长时间导出未响应的导出记录切换成异常状态
在启动类上贴上@EnableScheduling注解:
@SpringBootApplication
@EnableScheduling
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
在定时任务上贴上@Scheduled注解:
@Scheduled(cron = " 0 0 0/2 * * ?")
public void executeScheduledTask(){
//TODO:将超过两个小时还处于未开始,进行中状态中的导出记录切换成导出异常
}
7.系统限流
考虑到并发情况下导出大批量数据,会造成服务器CPU负载过高,所以针对单用户,同时导出的任务不能超多2个。实现方式有两种:①到数据库中统计当前用户导出记录处于进行中的数量;②将用户的导出记录处于进行中的数量存入redis中,从redis中获取数据作判断
实现功能
实测可以导出860万的数据,加上sql运行时间,总耗时2分钟