Exploring how to insert 200k rows into a table that already has 10M+ rows
Problem: there is a stock-out order table with 10M+ rows. The table has two columns, a primary key id and an order number. A Java List holds 200k records waiting to be inserted, and their order numbers may collide with rows already in the database. The goal is to insert these 200k records as fast as possible.
1. Setup: start a MySQL container with Docker, limited to 1 CPU and 1024 MB of memory.
version: '3'
services:
  db:
    image: mysql:8.0
    restart: always
    # The official image ignores charset environment variables; pass the server options on the command line instead
    command: --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci
    environment:
      MYSQL_ROOT_PASSWORD: ${MYSQL_ROOT_PASSWORD}
    ports:
      - "3306:3306"
    volumes:
      - db_data:/var/lib/mysql
    deploy:
      resources:
        limits:
          cpus: '1'      # limit CPU usage to 1 core
          memory: 1024M  # limit memory usage to 1024MB
volumes:
  db_data:
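The container can then be started with Docker Compose. Note that depending on your Compose version, the deploy.resources.limits block may only take effect with a recent docker compose (v2) or with the --compatibility flag:

docker compose up -d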
2. Create the database and table: a stock-out order table for the demo.
Order numbers must not repeat, so a unique index is created on the order_no column.
CREATE TABLE `stock_out_order` (
`id` bigint NOT NULL COMMENT 'id',
`order_no` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT 'order number',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_order_no` (`order_no`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
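For reference, a minimal sketch of the matching entity class, assuming MyBatis-Plus and Lombok are in use (class and field names are inferred from the mapper XML further down). Since the id column is NOT NULL without AUTO_INCREMENT, ASSIGN_ID is assumed here so MyBatis-Plus fills the primary key before the insert runs; otherwise the id must be set manually before calling the batch insert.

@Data
@TableName("stock_out_order")
public class StockOutOrder {
    // ASSIGN_ID: MyBatis-Plus generates a snowflake-style id, since the column has no AUTO_INCREMENT
    @TableId(type = IdType.ASSIGN_ID)
    private Long id;
    // order number, unique
    private String orderNo;
}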
3. Pseudocode walkthrough
With 200k records we neither insert everything in one statement nor one row at a time; here I insert in batches of 1000, and to improve throughput the batches are written by concurrent threads.
MyBatis-Plus's built-in saveBatch just loops over single-row inserts in code, so we write our own multi-row INSERT:
<insert id="saveBatch2" parameterType="java.util.List">
    insert into stock_out_order (id, order_no) values
    <foreach collection="list" item="item" separator=",">
        (#{item.id}, #{item.orderNo})
    </foreach>
</insert>
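The mapper interface assumed by the code in this post might look like the sketch below (selectOrderNosByRange is the range query used later when warming the cache):

public interface StockOutOrderMapper extends BaseMapper<StockOutOrder> {
    // Multi-row insert mapped by the XML above
    int saveBatch2(@Param("list") List<StockOutOrder> list);

    // Pages through existing order numbers; used below when building the bitmap cache
    List<String> selectOrderNosByRange(@Param("offset") int offset, @Param("limit") int limit);
}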
To make verification easy, the pseudocode generates the order numbers itself (real order numbers are usually letters plus digits, something like Y000000000000001; presumably nobody uses Chinese characters). The demo format below is one letter plus six digits.
// Number of rows per batch
int batchSize = 1000;
// Total number of orders
int totalOrders = 20 * 10000;
// Thread pool size
int threadPoolSize = 10;
ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize);
CountDownLatch latch = new CountDownLatch(totalOrders / batchSize + (totalOrders % batchSize == 0 ? 0 : 1));
// Counter used to generate order numbers
// NOTE: this first version does not yet check against order numbers already in the database
AtomicInteger orderNoCounter = new AtomicInteger(0);
// Generate and insert the orders in batches
for (int i = 0; i < totalOrders; i += batchSize) {
    int start = i;
    int end = Math.min(start + batchSize, totalOrders);
    executorService.submit(() -> {
        List<StockOutOrder> orders = new ArrayList<>(batchSize);
        for (int j = start; j < end; j++) {
            int numericOrderNo = orderNoCounter.getAndIncrement();
            String orderNo = convertIntegerToOrderNo(numericOrderNo);
            StockOutOrder stockOutOrder = new StockOutOrder();
            stockOutOrder.setOrderNo(orderNo);
            orders.add(stockOutOrder);
        }
        super.baseMapper.saveBatch2(orders);
        latch.countDown();
    });
}
try {
    latch.await(); // wait for all tasks to finish
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
} finally {
    executorService.shutdown();
}
To filter out duplicates we also need the order numbers that already exist in the database; to keep the memory footprint small, I store them in a bitmap instead of a set of strings.
// Convert an order number to an integer so it can be stored in a bitmap.
// This is only a simulation; real order numbers may span several prefixes (A, B, C, D, ...).
private int convertOrderNoToInteger(String orderNo) {
    char letterPart = orderNo.charAt(0);
    int numericPart = Integer.parseInt(orderNo.substring(1));
    return (letterPart - 'A') * 1000000 + numericPart;
}

private String convertIntegerToOrderNo(int numericOrderNo) {
    char letterPart = (char) ('A' + (numericOrderNo / 1000000));
    int numericPart = numericOrderNo % 1000000;
    return letterPart + String.format("%06d", numericPart);
}
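As a quick sanity check, the two helpers round-trip like this (illustrative values):

// ('B' - 'A') * 1000000 + 123 = 1000123
int n = convertOrderNoToInteger("B000123");
// 1000123 -> letter 'B', remainder 123 -> "B000123"
String s = convertIntegerToOrderNo(1000123);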
For the database reads, let's not use any concurrency at first: pull the existing order numbers 10k rows per query in a plain for loop, and look at the numbers.
I stopped measuring once the table reached 1.6M rows; clearly, as the amount of data in the database grows, the query time gets out of hand.
So I made the database queries concurrent with a parallelism of 10 (roughly 2N+1 relative to the CPU core count), and looked at the numbers again.
The numbers show that the concurrency clearly helps, but it cannot keep up with the data volume: fetching the full set of existing orders on every write is simply too time-consuming.
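The range query relied on here and in the cached version below, selectOrderNosByRange, can be mapped as a plain offset/limit page. A minimal sketch (keyset paging on id would avoid the deep-offset cost, but this matches the offset/limit signature used in the code):

<select id="selectOrderNosByRange" resultType="java.lang.String">
    select order_no
    from stock_out_order
    order by id
    limit #{offset}, #{limit}
</select>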
If we routinely need to write around 200k rows, it is worth caching the order numbers in memory so we do not have to pull them from the database every time. No middleware is introduced here, so the cache is lost whenever the service restarts; this is just a demo.
The final pseudocode looks like this:
@Slf4j
@Service
public class StockOutOrderServiceImpl extends ServiceImpl<StockOutOrderMapper, StockOutOrder>
        implements StockOutOrderService {

    private final AtomicInteger orderNoCounter = new AtomicInteger(0);
    private final Lock lock = new ReentrantLock();
    private BitSet existingOrderNos;
    private int cachedTotalExistingOrders = -1;

    private void initializeCache() {
        int totalExistingOrders = (int) count();
        cachedTotalExistingOrders = totalExistingOrders;
        existingOrderNos = new BitSet(totalExistingOrders);
        // Query the existing orderNo values concurrently
        int queryBatchSize = 10000;
        int numBatches = (totalExistingOrders + queryBatchSize - 1) / queryBatchSize;
        CountDownLatch queryLatch = new CountDownLatch(numBatches);
        // Shared pool instead of a new single-thread executor per batch
        ExecutorService queryExecutor = Executors.newFixedThreadPool(10);
        for (int i = 0; i < totalExistingOrders; i += queryBatchSize) {
            int offset = i;
            int limit = Math.min(queryBatchSize, totalExistingOrders - offset);
            queryExecutor.submit(() -> {
                List<String> batchOrderNos = super.baseMapper.selectOrderNosByRange(offset, limit);
                for (String orderNo : batchOrderNos) {
                    int numericOrderNo = convertOrderNoToInteger(orderNo);
                    lock.lock();
                    try {
                        // BitSet is not thread-safe, so guard writes with the lock
                        existingOrderNos.set(numericOrderNo);
                    } finally {
                        lock.unlock();
                    }
                }
                queryLatch.countDown();
            });
        }
        try {
            queryLatch.await(); // wait for all query tasks to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            queryExecutor.shutdown();
        }
    }

    @Override
    public void saveBatch2() {
        long startTime = System.currentTimeMillis();
        // Number of rows per batch
        int batchSize = 1000;
        // Total number of orders
        int totalOrders = 20 * 10000;
        // Thread pool size
        int threadPoolSize = 10;
        ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize);
        CountDownLatch latch = new CountDownLatch(totalOrders / batchSize + (totalOrders % batchSize == 0 ? 0 : 1));
        // Check whether the cache is still valid
        int currentTotalExistingOrders = (int) count();
        if (currentTotalExistingOrders != cachedTotalExistingOrders) {
            // Cache is stale, rebuild it
            initializeCache();
        }
        log.debug("Existing orders found: {}", currentTotalExistingOrders);
        long time2 = System.currentTimeMillis();
        log.debug("Querying existing orders took: {}ms", (time2 - startTime));
        for (int i = 0; i < totalOrders; i += batchSize) {
            int start = i;
            int end = Math.min(start + batchSize, totalOrders);
            executorService.submit(() -> {
                List<StockOutOrder> orders = new ArrayList<>(batchSize);
                for (int j = start; j < end; j++) {
                    int numericOrderNo;
                    do {
                        numericOrderNo = orderNoCounter.getAndIncrement();
                    } while (existingOrderNos.get(numericOrderNo));
                    lock.lock();
                    try {
                        // Mark the number as used (BitSet writes are not thread-safe)
                        existingOrderNos.set(numericOrderNo);
                    } finally {
                        lock.unlock();
                    }
                    String orderNo = convertIntegerToOrderNo(numericOrderNo);
                    StockOutOrder stockOutOrder = new StockOutOrder();
                    stockOutOrder.setOrderNo(orderNo);
                    orders.add(stockOutOrder);
                }
                super.baseMapper.saveBatch2(orders);
                // Bump the count of orders that now exist
                lock.lock();
                try {
                    cachedTotalExistingOrders += orders.size();
                } finally {
                    lock.unlock();
                }
                latch.countDown();
            });
        }
        try {
            latch.await(); // wait for all tasks to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executorService.shutdown();
            long time3 = System.currentTimeMillis();
            log.debug("Saving orders took: {}ms", (time3 - time2));
        }
    }

    // Convert an order number to an integer for bitmap storage; this only simulates a single-letter prefix
    private int convertOrderNoToInteger(String orderNo) {
        char letterPart = orderNo.charAt(0);
        int numericPart = Integer.parseInt(orderNo.substring(1));
        return (letterPart - 'A') * 1000000 + numericPart;
    }

    private String convertIntegerToOrderNo(int numericOrderNo) {
        char letterPart = (char) ('A' + (numericOrderNo / 1000000));
        int numericPart = numericOrderNo % 1000000;
        return letterPart + String.format("%06d", numericPart);
    }
}
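To exercise it, a hypothetical trigger such as a simple REST endpoint works (the controller class and path below are made up for illustration):

@RestController
@RequiredArgsConstructor
public class StockOutOrderController {
    private final StockOutOrderService stockOutOrderService;

    // Hypothetical endpoint that kicks off the 200k-row batch insert
    @PostMapping("/stock-out-orders/batch")
    public String saveBatch() {
        stockOutOrderService.saveBatch2();
        return "ok";
    }
}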
Let's look at the numbers again.
The data above shows that using a local cache to cut down on database access is quite effective: it dramatically reduces the time spent querying existing orders.
Finally:
The experimental numbers are for reference only; in practice there are more details to handle, such as the rollback strategy on failure and keeping the cached data consistent.
Using a BitSet as the storage structure may be a good choice: in theory 100 million orders take only about 12 MB (100,000,000 bits / 8 ≈ 12.5 MB).
Why choose 1000 rows per batch? At that size the cost of retrying a failed batch stays manageable.
# Typical performance trend (example)
Batch size | Time (s)
100        | 1.2
1000       | 0.8   # sweet spot
5000       | 1.5
10000      | 3.2