赞
踩
需要使用多线程,批量生成账号,并插入数据库
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.stereotype.Service;
import java.util.concurrent.*;
import java.util.ArrayList;
import java.util.List;
@Service
public class AccountService extends ServiceImpl<AccountMapper, Account> {
// 线程个数
private static final int THREAD_COUNT = 10;
// 账号总数
private static final int ACCOUNT_TOTAL_NUMBER = 100000;
// 每批次插入数量
private static final int BATCH_INSERT_SIZE = 1000;
/**
* 使用多线程生成账号并批量插入数据库
*/
public void generateAndInsertAccounts() throws ExecutionException, InterruptedException {
// 创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
THREAD_COUNT, THREAD_COUNT, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
List<Future<List<Account>>> futures = new ArrayList<>();
// 每个线程需要生成的账号数量
int accountsPerThread = ACCOUNT_TOTAL_NUMBER / THREAD_COUNT;
for (int i = 0; i < THREAD_COUNT; i++) {
// 计算每个线程应处理的账号序号范围
final int start = i * accountsPerThread;
Future<List<Account>> future = executor.submit(() -> {
List<Account> accounts = new ArrayList<>();
for (int j = start; j < start + accountsPerThread; j++) {
// 生成账号
accounts.add(new Account(null, generateAccount(j)));
// 达到批次插入数量则插入数据库
if (accounts.size() == BATCH_INSERT_SIZE) {
this.saveBatch(new ArrayList<>(accounts));
accounts.clear();
}
}
// 将剩余的账号插入数据库
if (!accounts.isEmpty()) {
this.saveBatch(accounts);
}
return accounts;
});
futures.add(future);
}
// 确保所有任务都已完成
for (Future<List<Account>> future : futures) {
future.get();
}
// 关闭线程池
executor.shutdown();
}
/**
* 生成账号的方法
* 账号由前缀8000,7位序号(不足7位前面补0),后缀x组成
* @param seqNumber 序号
* @return 生成的账号
*/
private String generateAccount(int seqNumber) {
return "8000" + String.format("%07d", seqNumber) + "x";
}
}
// 在最后一个线程中需要生成的额外的账号数量
int extraAccounts = ACCOUNT_TOTAL_NUMBER % THREAD_COUNT;
for (int i = 0; i < THREAD_COUNT; i++) {
// 计算每个线程应处理的账号序号范围
final int start = i * accountsPerThread;
final int end = (i == THREAD_COUNT - 1) ? start + accountsPerThread + extraAccounts : start + accountsPerThread;
Future<List<Account>> future = executor.submit(() -> {
logger.info("Thread " + Thread.currentThread().getName() + " started.");
List<Account> accounts = new ArrayList<>();
for (int j = start; j < end; j++) {
// 生成账号
accounts.add(new Account(null, generateAccount(j)));
// 达到批次插入数量则插入数据库
if (accounts.size() == BATCH_INSERT_SIZE) {
this.saveBatch(new ArrayList<>(accounts));
accounts.clear();
}
}
// 将剩余的账号插入数据库
if (!accounts.isEmpty()) {
this.saveBatch(accounts);
}
logger.info("Thread " + Thread.currentThread().getName() + " finished.");
return accounts;
});
futures.add(future);
}
发现是线程池连接数不够。
重新配置一下yml
hikari:
maximum-pool-size: 40 # 最大连接池大小
connection-timeout: 30000 # 连接超时时间(毫秒)
idle-timeout: 600000 # 连接空闲超时时间(毫秒)
validation-timeout: 5000 # 连接校验超时时间(毫秒)
max-lifetime: 1800000 # 连接最大存活时间(毫秒)
// 创建线程池
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(THREAD_COUNT);
List<Future<List<Account>>> futures = new ArrayList<>();
try {
// 计算每个线程需要生成的账号数量
int accountsPerThread = ACCOUNT_TOTAL_NUMBER / THREAD_COUNT;
// 在最后一个线程中需要生成的额外的账号数量
int extraAccounts = ACCOUNT_TOTAL_NUMBER % THREAD_COUNT;
for (int i = 0; i < THREAD_COUNT; i++) {
// 计算每个线程应处理的账号序号范围
final int start = i * accountsPerThread;
final int end = (i == THREAD_COUNT - 1) ? start + accountsPerThread + extraAccounts : start + accountsPerThread;
Future<List<Account>> future = executor.submit(() -> {
logger.info("Thread " + Thread.currentThread().getName() + " started.");
List<Account> accounts = new ArrayList<>();
for (int j = start; j < end; j++) {
// 生成账号
accounts.add(new Account(null, generateAccount(j)));
// 达到批次插入数量则插入数据库
if (accounts.size() == BATCH_INSERT_SIZE) {
this.saveBatch(new ArrayList<>(accounts));
accounts.clear();
}
}
// 将剩余的账号插入数据库
if (!accounts.isEmpty()) {
this.saveBatch(accounts);
}
logger.info("Thread " + Thread.currentThread().getName() + " finished.");
return accounts;
});
futures.add(future);
}
// 等待所有任务执行完毕
for (Future<List<Account>> future : futures) {
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
logger.error("Error occurred while executing task.", e);
}
}
} finally {
// 关闭线程池
if (executor != null) {
executor.shutdown();
logger.info("Thread pool shut down.");
}
}
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。