概述
技术摘要:springboot线程池、mybatis
〇、场景
目标:短时间内实现百万级数据落库
txt文件格式,每行为一条数据,共200w条,如下:
|identity|name|age|address|desc|
|123123|张三|20|xxxxxxxxxxx|xxxxxx|
|321321|李四|22|xxxxxxxx||
|112233|王五|21|xxxxxxxxxxxx|xxxxxxxx|
......
一、流程
1. 读取文件
目标:将txt文件读取到系统中
通过IO流的方式读取文件,设置路径和编码格式
数据列数(column):字段数,分隔符号(delimiter),字符截取的时候会使用
//数据列数
int column = 5;
//分割符号
String delimiter = "|";
//文件绝对路径
String path = "C:\Users\Admin\Desktop\test.txt";
//编码格式
String encoding = "UTF-8";
//IO流读取文件
File file = new File(path);
InputStreamReader in = new InputStreamReader(new FileInputStream(file),encoding);
BufferedReader read = new BufferedReader(in);
2. 字符截取将数据存入集合
目标:将数据流存储到到实体列表中
每次while循环,截取文本中一行数据“|identity|name|age|address|desc|”
①得到分隔符(delimiter)的索引数组:将单条数据分割成单个字符存到数组(charArr),遍历charArr将delimiter的索引存到数组(indexArr)
②根据indexArr就可以截取分隔符之间的数据,得到数据数组(strArr)
③遍历strArr将数据存在实体(person)中,然后再add到集合(personList)中
注:为什么不直接用lineTxt.split("|"),因为数据存在空值“||”分隔符间没有数据会导致不正确的截取
String lineTxt = null;
List<Person> personList = new ArrayList<>();
while((lineTxt = read.readLine()) != null){
//分割为字符数组
String[] charArr = lineTxt.split("");
//存储“|”在数组中的的index
int[] indexArr = new int[column+1];
int j = 0;
for (int i = 0; i < charArr.length; i++) {
if(delimiter.equals(charArr[i])){
indexArr[j] = i;
j++;
}
}
//存储“|”与“|”之间的数据,如果没有则为null
String[] strArr = new String[column];
int k = 0;
for (int i = 0; i < indexArr.length; i++) {
if(i < indexArr.length-1){
//截取两个相邻index之间的字符并去掉两端的空格,如果为”||“
strArr[k] = lineTxt.substring(indexArr[i]+1,indexArr[i+1]).trim();
k++;
}
}
//存入实体类
Person person = new Person();
for (int i = 0; i < strArr.length; i++) {
person.setId(UUID.randomUUID().toString());
person.setIdentity(strArr[0]);
person.setName(strArr[1]);
person.setAge(strArr[2]);
person.setAddress(strArr[3]);
person.setDesc(strArr[4]);
}
//添加到集合
personList.add(person);
}
3. 数组截取并落库
目标:实现分批落库
得到长度为200w的集合后,不能直接批量插入,会导致IO异常,单次批量插入上限2100条左右;需要对数组逐步截取,这里选择的是1000条为一批,每1000条数据落库一次
//单次分批落库条数
int subList = 1000;
//计算执行次数
int count = personList.size() % subList > 0 ? (personList.size()/subList)+1 : personList.size()/subList;
//临时集合
List<Person> tempList;
for (int i = 0; i < count; i++) {
//截取集合
tempList = personList.stream().skip(subList*i).limit(subList).collect(Collectors.toList());
//数据落库
personDao.insertBatch(personList);
}
mybatis中,使用批量插入的方式落库,可以提高插入的效率
<insert id="insertBatch" parameterType="com.example.pojo.Person">
insert into person(id,identity,name,age,address,desc) values
<foreach collection="personList" item="item" index="index" separator="," >
(#{item.id,jdbcType=VARCHAR},#{item.identity,jdbcType=VARCHAR},#{item.name,jdbcType=VARCHAR},#{item.age,jdbcType=INTEGER},#{item.address,jdbcType=VARCHAR},#{item.desc,jdbcType=VARCHAR})
</foreach>
</insert>
4.多线程方式落库
目标:实现异步落库
使用springboot线程池,多线程可以提高落库效率,for循环走的很快,需要使用CountDownLatch中await方法,等待全部线程执行完毕再结束
//单次分批落库条数
int subList = 1000;
//计算执行次数
int count = personList.size() % subList > 0 ? (personList.size()/subList)+1 : personList.size()/subList;
//临时集合
List<Person> tempList;
CountDownLatch countDownLatch = new CountDownLatch(personList.size()/subList);
for (int i = 0; i < count; i++) {
//截取集合
tempList = personList.stream().skip(subList*i).limit(subList).collect(Collectors.toList());
//多线程执行落库方法
asyncService.executeAsync(tempList, countDownLatch);
}
//阻塞线程,等待全部线程执行完毕
countDownLatch.await();
springboot线程池代码
@Service
@EnableAsync
public class AsyncService {
@Autowired
private PersonDao personDao;
@Async("taskExecutor")
public void executeAsync(List<Person> personList, CountDownLatch countDownLatch) {
try{
personDao.insertBatch(personList);
} finally {
countDownLatch.countDown();
}
}
}
@Configuration
@EnableAsync
public class ExecutorConfig {
ThreadPoolProperties properties = new ThreadPoolProperties();
@Bean(name = "taskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(properties.getCorePoolSize());
executor.setMaxPoolSize(properties.getMaxPoolSize());
executor.setQueueCapacity(properties.getQueueCapacity());
executor.setThreadNamePrefix(properties.getThreadNamePrefix());
executor.setKeepAliveSeconds(properties.getKeepAliveTime());
executor.setWaitForTasksToCompleteOnShutdown(properties.isWaitForTasksToCompleteOnShutdown());
executor.setAwaitTerminationSeconds(properties.getAwaitTerminationSeconds());
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
@Data
class ThreadPoolProperties {
private int corePoolSize = 30;
private int maxPoolSize = 100;
private int keepAliveTime;
private int queueCapacity = 1000;
private String threadNamePrefix = "test";
private boolean allowCoreThreadTimeout = false;
private boolean waitForTasksToCompleteOnShutdown = false;
private int awaitTerminationSeconds;
}
}
二、总结
通过打时间戳记录时间,读取文件到存储实体集合用时10秒,时间主要花费在落库;单线程总用时18分钟左右,多线程4分钟;还可以将表的所有字段取消主键约束和非空约束提高落库效率,最终优化后总用时仅32秒!
原创不易,转载请注明来源
最后
以上就是开心发带为你收集整理的SpringBoot线程池实现200w数据快速落库二、总结的全部内容,希望文章能够帮你解决SpringBoot线程池实现200w数据快速落库二、总结所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复