赞
踩
在 Redis 中,Pipeline(管道)是一种客户端与服务器间通信的优化机制,旨在减少网络往返时间和提高命令执行效率。以下是 Redis Pipeline 的具体定义和特点:
1.批量发送与接收:
2.异步执行
3.命令隔离
4.使用场景
5.注意事项
总结来说,Redis Pipeline 是一种客户端与服务器间高效通信的技术,通过批量发送和接收命令,减少网络往返次数,提高命令执行效率,尤其适用于大量命令操作的场景。在使用时需注意命令打包大小的控制以及错误处理。
Redis Pipeline允许一次性发送多个命令到Redis服务器,而无需等待每个命令的响应,显著减少了网络往返时间和潜在的延迟。在Spring Boot应用中,可以使用RedisTemplate的executePipelined()方法实现:
@Autowired
private StringRedisTemplate redisTemplate
public void batchInsertUsersWithPipeline(List<User> users, String keyPrefix, long ttlSeconds) {
redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
for (User user : users) {
String key = generateKey(keyPrefix, user.getId());
String value = objectMapper.writeValueAsString(user);
connection.setEx(key.getBytes(), (int) ttlSeconds, value.getBytes());
}
return null;
});
}
分批处理
尽管Pipeline提高了效率,但对于千万级数据,一次性发送所有命令可能导致内存溢出或网络阻塞。因此,建议将数据分批处理,每批包含适量的记录(如1000条),逐批发送至Redis:
public void insertUsersInBatches(List<User> users, String keyPrefix, long ttlSeconds, int batchSize) {
int start = 0;
while (start < users.size()) {
int end = Math.min(start + batchSize, users.size());
List<User> batch = users.subList(start, end);
batchInsertUsersWithPipeline(batch, keyPrefix, ttlSeconds);
start = end;
}
}
batchInsertUsersWithPipeline方法利用Redis Pipeline机制发送批量命令,可以在一定程度上提高插入操作的并发性,减少网络往返时间和整体耗时。然而,Pipeline本身并不能严格保证所有命令同时成功或失败,其主要特性如下:
1.原子性:
2.响应顺序
3.故障处理
综上所述,batchInsertUsersWithPipeline方法不能严格保证所有命令同时成功或失败。在实际使用中,如果需要确保一批数据要么全部成功插入,要么全部失败回滚,可以采取以下策略:
事务( MULTI/EXEC/DISCARD ):
Lua脚本:
batchInsertUsersWithPipeline方法中的connection中各个方法的区别是什么?
1.connection.setEx(key.getBytes(), (int) ttlSeconds, value.getBytes());
这一行调用了RedisConnection的setEx方法,用于设置一个带有过期时间(Time To Live,TTL)的键值对。参数说明如下:
- key.getBytes(): 将给定的键(字符串)转换为字节数组,这是Redis底层通信协议所要求的格式。
- (int) ttlSeconds: 将过期时间(以秒为单位)转换为整数类型,表示键值对在指定秒数后自动过期并被删除。
- value.getBytes(): 同样将给定的值(用户对象序列化后的JSON字符串)转换为字节数组
setEx方法确保在设置键值对的同时为其设定一个过期时间。如果键已经存在,该方法会更新键的值和过期时间。这个操作在Pipeline模式下是原子的,即在同一时刻只有一个setEx命令被执行。
2.connection.multi(); 和 connection.exec();
这两个方法涉及Redis的事务(Transaction)功能。在Pipeline模式下,由于我们希望保持较高的性能,一般不会使用这两个方法。但如果确实需要保证一批命令的原子性,可以使用如下方式:
- connection.multi(): 开启一个事务块,后续的所有命令都会被放入这个事务中,直到调用exec方法。在Pipeline模式下,调用multi方法可能会破坏原有的性能优化效果。
- connection.exec(): 提交并执行事务中的所有命令。如果事务中有任何一个命令执行失败,其他命令也会被取消执行,整个事务被视为失败。
在您的batchInsertUsersWithPipeline方法中并没有使用multi和exec,因为Pipeline已经提供了高效的批量执行机制,而且这里的目的是提高插入性能,而不是实现严格的事务行为。
综上所述,batchInsertUsersWithPipeline方法中直接使用了setEx方法,利用Pipeline来高效地批量插入带有过期时间的键值对。如果需要实现更严格的事务控制,应考虑使用Redis的事务(MULTI/EXEC)或Lua脚本,但这通常会牺牲一定的性能,并且与Pipeline机制不完全兼容。在实际应用场景中,应根据业务需求权衡选择合适的操作方式。
3.connection.set()和connection.setNx有什么区别
connection.set() 和 connection.setNx() 都是Redis的键值对设置方法,它们的主要区别在于是否存在条件以及对已有键的处理方式:
1.connection.set(key, value)
这是最基础的设置键值对的方法,无论键是否存在,都会直接覆盖(或创建)对应的键值对。参数说明如下:
行为特点:
2.connection.setNx(key, value)
这是带有条件的设置键值对方法,仅当键不存在时才会设置键值对。参数与set()相同:
行为特点
总结来说,connection.set()无条件地设置或更新键值对,而connection.setNx()则是在键不存在时才设置键值对,如果键已存在,则不会执行任何操作。前者适用于常规的键值更新或插入,后者常用于实现锁机制、唯一性检查等场景,确保某个键的值只在首次设置时有效。在您的batchInsertUsersWithPipeline方法中,由于目标是批量插入新数据,所以使用了setEx方法(带有过期时间的set),确保每个用户数据作为一个新的键值对被添加到Redis中。如果您需要在插入前检查键的唯一性,可以考虑使用setNx方法。不过,对于批量插入场景,通常假设数据是新的且键不存在,因此直接使用setEx更为常见。
public void batchDeleteKeysWithPipeline(List<String> keys) {
redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
for (String key : keys) {
connection.del(key.getBytes());
}
return null;
});
}
1.批量删除策略
2.并行处理
3.Redis 客户端优化:
4.监控与故障恢复:
import redis.clients.jedis.Jedis;
import redis.clients.jedis.ScanParams;
import redis.clients.jedis.ScanResult;
public class RedisDataDeleter {
private static final int SCAN_BATCH_SIZE = 1000; // 可根据实际情况调整
private static final String MATCH_PATTERN = "*"; // 匹配所有键
public void deleteAllKeys(Jedis jedis) {
ScanParams scanParams = new ScanParams().count(SCAN_BATCH_SIZE).match(MATCH_PATTERN);
String cursor = "0";
while (true) {
ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
cursor = scanResult.getCursor();
List<String> keysToDelete = scanResult.getResult();
if (!keysToDelete.isEmpty()) {
// 使用 Pipeline 批量删除键
Pipeline pipeline = jedis.pipelined();
for (String key : keysToDelete) {
pipeline.del(key);
}
pipeline.sync(); // 执行批量命令
}
if ("0".equals(cursor)) {
break; // 扫描完成
}
}
}
}
注意
如果条件允许,建议升级到 Redis 6.x 版本,并启用 activedefrag 配置项,有助于在删除大量数据后及时进行碎片整理,保持 Redis 内存的高效利用。同时,监控 Redis 的内存使用情况和碎片率,必要时手动触发 BGREWRITEAOF 或 BGSAVE 操作。
maven
<dependencies>
<!-- ... 其他依赖 ... -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.7.0</version> <!-- 根据实际版本号调整 -->
</dependency>
</dependencies>
jedis连接池配置
spring.redis.host=192.168.1.100
spring.redis.port=6379
spring.redis.password=mysecretpassword # 如果有密码,请填写
# Jedis 连接池配置
spring.redis.jedis.pool.max-active=10
spring.redis.jedis.pool.max-idle=6
spring.redis.jedis.pool.min-idle=2
spring.redis.jedis.pool.max-wait=2000ms
jedisConfig
@Configuration
public class JedisConfig {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private int port;
@Value("${spring.redis.password}")
private String password;
@Bean
public JedisPool jedisPool() {
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(Integer.parseInt(env.getProperty("spring.redis.jedis.pool.max-active")));
poolConfig.setMaxIdle(Integer.parseInt(env.getProperty("spring.redis.jedis.pool.max-idle")));
poolConfig.setMinIdle(Integer.parseInt(env.getProperty("spring.redis.jedis.pool.min-idle")));
poolConfig.setMaxWaitMillis(Long.parseLong(env.getProperty("spring.redis.jedis.pool.max-wait")));
return new JedisPool(poolConfig, host, port, Protocol.DEFAULT_TIMEOUT, password);
}
}
实现 Redis 数据删除服务
@Service
public class RedisDataDeleterService {
@Autowired
private JedisPool jedisPool;
public void deleteAllKeys() {
try (Jedis jedis = jedisPool.getResource()) {
ScanParams scanParams = new ScanParams().match("*").count(1000);
String cursor = "0";
while (true) {
ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
cursor = scanResult.getCursor();
List<String> keysToDelete = scanResult.getResult();
if (!keysToDelete.isEmpty()) {
Pipeline pipeline = jedis.pipelined();
for (String key : keysToDelete) {
pipeline.del(key);
}
pipeline.sync();
}
if ("0".equals(cursor)) {
break;
}
}
}
}
}
调用删除服务
@RestController
@RequestMapping("/redis")
public class RedisController {
@Autowired
private RedisDataDeleterService redisDataDeleterService;
@GetMapping("/delete-all-keys")
public ResponseEntity<?> deleteAllKeys() {
redisDataDeleterService.deleteAllKeys();
return ResponseEntity.ok().build();
}
}
maven
<dependencies>
<!-- ... 其他依赖 ... -->
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>6.2.¼</version> <!-- 根据实际版本号调整 -->
</dependency>
</dependencies>
配置Lettuce
Spring Boot 自动配置会为 Lettuce 提供连接池支持。在 application.properties 或 application.yml 中配置 Redis 连接信息:
spring.redis.host=192.168.1.100
spring.redis.port=6379
spring.redis.password=mysecretpassword # 如果有密码,请填写
使用 Lettuce 客户端执行批量删除操作:
@Service
public class RedisDataDeleterService {
@Autowired
private RedisConnectionFactory connectionFactory;
public void deleteAllKeys() {
RedisAsyncCommands<String, String> asyncCommands = connectionFactory.getConnection().async();
ScanArgs scanArgs = ScanArgs.Builder.matches("*").count(1000);
RedisFuture<ScanResult<String>> scanFuture = asyncCommands.scan(ScanCursor.INITIAL, scanArgs);
AtomicBoolean isRunning = new AtomicBoolean(true);
AtomicReference<ScanCursor> lastCursor = new AtomicReference<>(ScanCursor.INITIAL);
// 异步处理扫描结果
scanFuture.thenAccept(scanResult -> {
lastCursor.set(scanResult.getCursor());
List<String> keysToDelete = scanResult.getKeys();
if (!keysToDelete.isEmpty()) {
RedisFuture<Long> delFuture = asyncCommands.del(keysToDelete.toArray(new String[0]));
delFuture.thenAccept(count -> {
if (isRunning.get()) {
// 如果仍在运行,继续扫描
deleteAllKeysRecursive(asyncCommands, scanArgs, lastCursor, isRunning);
}
});
} else {
isRunning.set(false);
}
});
// 设置超时时间(可根据实际情况调整)
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(120000); // 2分钟超时
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
isRunning.set(false);
});
}
private void deleteAllKeysRecursive(RedisAsyncCommands<String, String> asyncCommands,
ScanArgs scanArgs,
AtomicReference<ScanCursor> lastCursor,
AtomicBoolean isRunning) {
if (isRunning.get()) {
asyncCommands.scan(lastCursor.get(), scanArgs).thenAccept(scanResult -> {
lastCursor.set(scanResult.getCursor());
List<String> keysToDelete = scanResult.getKeys();
if (!keysToDelete.isEmpty()) {
asyncCommands.del(keysToDelete.toArray(new String[0])).thenAccept(count -> {
if (isRunning.get()) {
deleteAllKeysRecursive(asyncCommands, scanArgs, lastCursor, isRunning);
}
});
} else {
isRunning.set(false);
}
});
}
}
}
调用
@RestController
@RequestMapping("/redis")
public class RedisController {
@Autowired
private RedisDataDeleterService redisDataDeleterService;
@GetMapping("/delete-all-keys")
public ResponseEntity<?> deleteAllKeys() {
redisDataDeleterService.deleteAllKeys();
return ResponseEntity.ok().build();
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。