Redis

准备

简介

Redis 是完全开源的,遵守 BSD 协议,是一个高性能的 key-value 数据库。

Redis 与其他 key - value 缓存产品有以下三个特点:

  • Redis支持数据的持久化,可以将内存中的数据保存在磁盘中,重启的时候可以再次加载进行使用。
  • Redis不仅仅支持简单的key-value类型的数据,同时还提供list,set,zset,hash等数据结构的存储。
  • Redis支持数据的备份,即master-slave模式的数据备份。

Windows安装

下载地址

在安装目录中 , 在对应安装文件夹打开cmd

redis-server.exe redis.windows.conf

如果将文件夹添加到环境变量中,cmd输入

redis-server.exe

打开在另一个cmd窗口

redis-cli.exe -h 127.0.0.1 -p 6379
#设置键值对
set myKey abc
#取出键值对
get myKey

Linux 源码安装

下载地址

# wget http://download.redis.io/releases/redis-6.0.8.tar.gz
# tar xzf redis-6.0.8.tar.gz
# cd redis-6.0.8
# make

如果缺少gcc依赖

yum -y install gcc automake autoconf libtool make
yum install gcc-c++
yum install centos-release-scl scl-utils-build
yum install -y devtoolset-8-toolchain
scl enable devtoolset-8 bash

make distclean
make && make install

执行完 make 命令后,redis-6.0.8 的 src 目录下会出现编译后的 redis 服务程序 redis-server

# cd src
# ./redis-server

这种方式启动 redis 使用的是默认配置。也可以通过启动参数告诉 redis 使用指定配置文件使用下面命令启动。

# cd src
# ./redis-server ../redis.conf

redis.conf 是一个默认的配置文件。我们可以根据需要使用自己的配置文件。启动 redis 服务进程后,就可以使用测试客户端程序 redis-cli 和 redis 服务交互了。 比如:

# cd src
# ./redis-cli
redis> set foo bar
OK
redis> get foo
"bar"

Ubuntu apt 命令安装

# sudo apt update
# sudo apt install redis-server

启动 Redis

# redis-server

查看 redis 是否启动?

# redis-cli

以上命令将打开以下终端:

redis 127.0.0.1:6379>

127.0.0.1 是本机 IP ,6379 是 redis 服务端口。现在我们输入 PING 命令。

redis 127.0.0.1:6379> ping
PONG

配置

Redis 的配置文件位于 Redis 安装目录下,文件名为 redis.conf(Windows 名为 redis.windows.conf)

查看配置

Redis CONFIG 命令格式如下:

redis 127.0.0.1:6379> CONFIG GET CONFIG_SETTING_NAME
#示例
redis 127.0.0.1:6379> CONFIG GET loglevel

1) "loglevel"
2) "notice"

使用 ***** 号获取所有配置项:

编辑配置

可以通过修改 redis.conf 文件或使用 CONFIG set 命令来修改配置

CONFIG SET 命令基本语法:

redis 127.0.0.1:6379> CONFIG SET CONFIG_SETTING_NAME NEW_CONFIG_VALUE
redis 127.0.0.1:6379> CONFIG SET loglevel "notice"
OK
redis 127.0.0.1:6379> CONFIG GET loglevel

1) "loglevel"
2) "notice"

redis.conf 配置项说明如下:

序号 配置项 说明
1 daemonize no Redis 默认不是以守护进程的方式运行,可以通过该配置项修改,使用 yes 启用守护进程(Windows 不支持守护线程的配置为 no )
2 pidfile /var/run/redis.pid 当 Redis 以守护进程方式运行时,Redis 默认会把 pid 写入 /var/run/redis.pid 文件,可以通过 pidfile 指定
3 port 6379 指定 Redis 监听端口,默认端口为 6379,作者在自己的一篇博文中解释了为什么选用 6379 作为默认端口,因为 6379 在手机按键上 MERZ 对应的号码,而 MERZ 取自意大利歌女 Alessia Merz 的名字
4 bind 127.0.0.1 绑定的主机地址
5 timeout 300 当客户端闲置多长秒后关闭连接,如果指定为 0 ,表示关闭该功能
6 loglevel notice 指定日志记录级别,Redis 总共支持四个级别:debug、verbose、notice、warning,默认为 notice
7 logfile stdout 日志记录方式,默认为标准输出,如果配置 Redis 为守护进程方式运行,而这里又配置为日志记录方式为标准输出,则日志将会发送给 /dev/null
8 databases 16 设置数据库的数量,默认数据库为0,可以使用SELECT 命令在连接上指定数据库id
9 save <seconds> <changes>Redis 默认配置文件中提供了三个条件:save 900 1 save 300 10 save 60 10000分别表示 900 秒(15 分钟)内有 1 个更改,300 秒(5 分钟)内有 10 个更改以及 60 秒内有 10000 个更改。 指定在多长时间内,有多少次更新操作,就将数据同步到数据文件,可以多个条件配合
10 rdbcompression yes 指定存储至本地数据库时是否压缩数据,默认为 yes,Redis 采用 LZF 压缩,如果为了节省 CPU 时间,可以关闭该选项,但会导致数据库文件变的巨大
11 dbfilename dump.rdb 指定本地数据库文件名,默认值为 dump.rdb
12 dir ./ 指定本地数据库存放目录
13 slaveof <masterip> <masterport> 设置当本机为 slave 服务时,设置 master 服务的 IP 地址及端口,在 Redis 启动时,它会自动从 master 进行数据同步
14 masterauth <master-password> 当 master 服务设置了密码保护时,slave 服务连接 master 的密码
15 requirepass foobared 设置 Redis 连接密码,如果配置了连接密码,客户端在连接 Redis 时需要通过 AUTH <password> 命令提供密码,默认关闭
16 maxclients 128 设置同一时间最大客户端连接数,默认无限制,Redis 可以同时打开的客户端连接数为 Redis 进程可以打开的最大文件描述符数,如果设置 maxclients 0,表示不作限制。当客户端连接数到达限制时,Redis 会关闭新的连接并向客户端返回 max number of clients reached 错误信息
17 maxmemory <bytes> 指定 Redis 最大内存限制,Redis 在启动时会把数据加载到内存中,达到最大内存后,Redis 会先尝试清除已到期或即将到期的 Key,当此方法处理 后,仍然到达最大内存设置,将无法再进行写入操作,但仍然可以进行读取操作。Redis 新的 vm 机制,会把 Key 存放内存,Value 会存放在 swap 区
18 appendonly no 指定是否在每次更新操作后进行日志记录,Redis 在默认情况下是异步的把数据写入磁盘,如果不开启,可能会在断电时导致一段时间内的数据丢失。因为 redis 本身同步数据文件是按上面 save 条件来同步的,所以有的数据会在一段时间内只存在于内存中。默认为 no
19 appendfilename appendonly.aof 指定更新日志文件名,默认为 appendonly.aof
20 appendfsync everysec 指定更新日志条件,共有 3 个可选值:no:表示等操作系统进行数据缓存同步到磁盘(快)always:表示每次更新操作后手动调用 fsync() 将数据写到磁盘(慢,安全)everysec:表示每秒同步一次(折中,默认值)
21 vm-enabled no 指定是否启用虚拟内存机制,默认值为 no,简单的介绍一下,VM 机制将数据分页存放,由 Redis 将访问量较少的页即冷数据 swap 到磁盘上,访问多的页面由磁盘自动换出到内存中(在后面的文章我会仔细分析 Redis 的 VM 机制)
22 vm-swap-file /tmp/redis.swap 虚拟内存文件路径,默认值为 /tmp/redis.swap,不可多个 Redis 实例共享
23 vm-max-memory 0 将所有大于 vm-max-memory 的数据存入虚拟内存,无论 vm-max-memory 设置多小,所有索引数据都是内存存储的(Redis 的索引数据 就是 keys),也就是说,当 vm-max-memory 设置为 0 的时候,其实是所有 value 都存在于磁盘。默认值为 0
24 vm-page-size 32 Redis swap 文件分成了很多的 page,一个对象可以保存在多个 page 上面,但一个 page 上不能被多个对象共享,vm-page-size 是要根据存储的 数据大小来设定的,作者建议如果存储很多小对象,page 大小最好设置为 32 或者 64bytes;如果存储很大大对象,则可以使用更大的 page,如果不确定,就使用默认值
25 vm-pages 134217728 设置 swap 文件中的 page 数量,由于页表(一种表示页面空闲或使用的 bitmap)是在放在内存中的,,在磁盘上每 8 个 pages 将消耗 1byte 的内存。
26 vm-max-threads 4 设置访问swap文件的线程数,最好不要超过机器的核数,如果设置为0,那么所有对swap文件的操作都是串行的,可能会造成比较长时间的延迟。默认值为4
27 glueoutputbuf yes 设置在向客户端应答时,是否把较小的包合并为一个包发送,默认为开启
28 hash-max-zipmap-entries 64 hash-max-zipmap-value 512 指定在超过一定的数量或者最大的元素超过某一临界值时,采用一种特殊的哈希算法
29 activerehashing yes 指定是否激活重置哈希,默认为开启(后面在介绍 Redis 的哈希算法时具体介绍)
30 include /path/to/local.conf 指定包含其它的配置文件,可以在同一主机上多个Redis实例之间使用同一份配置文件,而同时各个实例又拥有自己的特定配置文件

设置

目录内容

redis-benchmark:性能测试工具,可以在自己本子运行,看看自己本子性能如何

redis-check-aof:修复有问题的AOF文件,rdb和aof后面讲

redis-check-dump:修复有问题的dump.rdb文件

redis-sentinel:Redis集群使用

redis-server:Redis服务器启动命令

redis-cli:客户端,操作入口

后台启动

#拷贝一份redis.conf到其他目录
cp  /opt/redis-3.2.5/redis.conf  /myredis
#修改redis.conf文件将里面的daemonize no 改成 yes,让服务在后台启动
#根据配置文件启动
redis-server /root/redis/redis-6.0.8/redis.conf
#设置密码
CONFIG set requirepass "密码"

关闭

单实例关闭:redis-cli shutdown

也可以进入终端后再关闭: >shutdown

多实例关闭,指定端口关闭:redis-cli -p 6379 shutdown

相关知识

  • 默认16个数据库,类似数组下标从0开始,初始默认使用0号库
  • 使用命令 select <dbid>来切换数据库。如: select 8
  • 统一密码管理,所有库同样密码。
  • dbsize查看当前数据库的key的数量
  • flushdb清空当前库
  • flushall通杀全部库

单线程+多路IO复用技术

多路复用是指使用一个线程来检查多个文件描述符(Socket)的就绪状态,比如调用select和poll函数,传入多个文件描述符,如果有一个文件描述符就绪,则返回,否则阻塞直到超时。得到就绪状态后进行真正的操作可以在同一个线程里执行,也可以启动线程执行(比如使用线程池)

数据类型

String(字符串)

  • string 是 redis 最基本的类型,一个 key 对应一个 value。
  • string 类型是二进制安全的。意思是 redis 的 string 可以包含任何数据。比如jpg图片或者序列化的对象。
  • string 类型是 Redis 最基本的数据类型,string 类型的值最大能存储 512MB
  • 当字符串长度小于1M时,扩容都是加倍现有的空间,如果超过1M,扩容时一次只会多扩1M的空间。需要注意的是字符串最大长度为512M。
redis> set foo bar
OK
redis> get foo
"bar"

Hash(哈希)

  • Redis hash 是一个键值(key=>value)对集合。

  • Redis hash 是一个 string 类型的 field 和 value 的映射表,hash 特别适合用于存储对象

DEL runoob 用于删除前面测试用过的 key,不然会报错:(error) WRONGTYPE Operation against a key holding the wrong kind of value

redis 127.0.0.1:6379> DEL runoob
redis 127.0.0.1:6379> HMSET runoob field1 "Hello" field2 "World"
"OK"
redis 127.0.0.1:6379> HGET runoob field1
"Hello"
redis 127.0.0.1:6379> HGET runoob field2
"World"
  • 实例中我们使用了 Redis HMSET, HGET 命令,HMSET 设置了两个 field=>value 对, HGET 获取对应 field 对应的 value

  • 每个 hash 可以存储 232 -1 键值对(40多亿)。

List(列表)

  • Redis 列表是简单的字符串列表,按照插入顺序排序。你可以添加一个元素到列表的头部(左边)或者尾部(右边)。
  • 它的底层实际是个双向链表,对两端的操作性能很高,通过索引下标的操作中间的节点性能会较差。
redis 127.0.0.1:6379> DEL runoob
redis 127.0.0.1:6379> lpush runoob redis
(integer) 1
redis 127.0.0.1:6379> lpush runoob mongodb
(integer) 2
redis 127.0.0.1:6379> lpush runoob rabbitmq
(integer) 3
redis 127.0.0.1:6379> lrange runoob 0 10
1) "rabbitmq"
2) "mongodb"
3) "redis"
redis 127.0.0.1:6379>

列表最多可存储 232 - 1 元素 (4294967295, 每个列表可存储40多亿)。

Set(集合)

  • Redis 的 Set 是 string 类型的无序集合。
  • 集合是通过哈希表实现的,所以添加,删除,查找的复杂度都是 O(1)。

sadd 命令

添加一个 string 元素到 key 对应的 set 集合中,成功返回 1,如果元素已经在集合中返回 0。

sadd key member
redis 127.0.0.1:6379> DEL runoob
redis 127.0.0.1:6379> sadd runoob redis
(integer) 1
redis 127.0.0.1:6379> sadd runoob mongodb
(integer) 1
redis 127.0.0.1:6379> sadd runoob rabbitmq
(integer) 1
redis 127.0.0.1:6379> sadd runoob rabbitmq
(integer) 0
redis 127.0.0.1:6379> smembers runoob

1) "redis"
2) "rabbitmq"
3) "mongodb"

**注意:**以上实例中 rabbitmq 添加了两次,但根据集合内元素的唯一性,第二次插入的元素将被忽略。

集合中最大的成员数为 232 - 1(4294967295, 每个集合可存储40多亿个成员)。

zset(sorted set:有序集合)

  • Redis zset 和 set 一样也是string类型元素的集合,且不允许重复的成员。

  • 不同的是每个元素都会关联一个double类型的分数。redis正是通过分数来为集合中的成员进行从小到大的排序。

  • zset的成员是唯一的,但分数(score)却可以重复。

zadd 命令

添加元素到集合,元素在集合中存在则更新对应score

zadd key score member 
redis 127.0.0.1:6379> DEL runoob
redis 127.0.0.1:6379> zadd runoob 0 redis
(integer) 1
redis 127.0.0.1:6379> zadd runoob 0 mongodb
(integer) 1
redis 127.0.0.1:6379> zadd runoob 0 rabbitmq
(integer) 1
redis 127.0.0.1:6379> zadd runoob 0 rabbitmq
(integer) 0
redis 127.0.0.1:6379> ZRANGEBYSCORE runoob 0 1000
1) "mongodb"
2) "rabbitmq"
3) "redis"

总结

类型 简介 特性 场景
String(字符串) 二进制安全 可以包含任何数据,比如jpg图片或者序列化的对象,一个键最大能存储512M
Hash(字典) 键值对集合,即编程语言中的Map类型 适合存储对象,并且可以像数据库中update一个属性一样只修改某一项属性值(Memcached中需要取出整个字符串反序列化成对象修改完再序列化存回去) 存储、读取、修改用户属性
List(列表) 链表(双向链表) 增删快,提供了操作某一段元素的API 1,最新消息排行等功能(比如朋友圈的时间线) 2,消息队列
Set(集合) 哈希表实现,元素不重复 1、添加、删除,查找的复杂度都是O(1) 2、为集合提供了求交集、并集、差集等操作 1、共同好友 2、利用唯一性,统计访问网站的所有独立ip 3、好友推荐时,根据tag求交集,大于某个阈值就可以推荐
Sorted Set(有序集合) 将Set中的元素增加一个权重参数score,元素按score有序排列 数据插入集合时,已经进行天然排序 1、排行榜 2、带权重的消息队列

Bitmaps

setbit

setbit<key><offset><value>#设置Bitmaps中某个偏移量的值(0或1)

每个独立用户是否访问过网站存放在Bitmaps中, 将访问的用户记做1, 没有访问的用户记做0, 用偏移量作为用户的id。设置键的第offset个位的值(从0算起) , 假设现在有20个用户,userid=1, 6, 11, 15, 19的用户对网站进行了访问,进行标记

  • 在第一次初始化Bitmaps时, 假如偏移量非常大, 那么整个初始化过程执行会比较慢, 可能会造成Redis的阻塞。
  • 很多应用的用户id以一个指定数字(例如10000) 开头, 直接将用户id和Bitmaps的偏移量对应势必会造成一定的浪费, 通常的做法是每次做setbit操作时将用户id减去这个指定数字。

getbit

getbit<key><offset>#获取Bitmaps中某个偏移量的值

bitcount

bitcount<key>[start end] #统计字符串从start字节到end字节比特值为1的数量

bitop

bitop是一个复合操作, 它可以做多个Bitmaps的and(交集) 、 or(并集) 、 not(非) 、 xor(异或) 操作并将结果保存在destkey中

bitop  and(or/not/xor) <destkey> [key…]

Bitmaps与set对比

假设网站有1亿用户, 每天独立访问的用户有5千万, 如果每天用集合类型和Bitmaps分别存储活跃用户可以得到表

set和Bitmaps存储一天活跃用户对比
数据 类型 每个用户id占用空间 需要存储的用户量 全部内存量
集合 类型 64位 50000000 64位50000000 = 400MB
Bitmaps 1位 100000000 1位100000000 = 12.5MB

但Bitmaps并不是万金油, 假如该网站每天的独立访问用户很少, 例如只有10万(大量的僵尸用户) , 那么两者的对比如下表所示, 很显然, 这时候使用Bitmaps就不太合适了, 因为基本上大部分位都是0。

set和Bitmaps存储一天活跃用户对比(独立用户比较少)
数据类型 每个userid占用空间 需要存储的用户量 全部内存量
集合类型 64位 100000 64位*100000 = 800KB
Bitmaps 1位 100000000 1位*100000000 = 12.5MB

命令

Redis 客户端的基本语法为:

$ redis-cli

连接到本地的 redis 服务并执行 PING 命令,该命令用于检测 redis 服务是否启动。

$ redis-cli
redis 127.0.0.1:6379>
redis 127.0.0.1:6379> PING

PONG

如果需要在远程 redis 服务上执行命令,同样我们使用的也是 redis-cli 命令。

$ redis-cli -h host -p port -a password

以下实例演示了如何连接到主机为 127.0.0.1,端口为 6379 ,密码为 mypass 的 redis 服务上。

$redis-cli -h 127.0.0.1 -p 6379 -a "mypass"
redis 127.0.0.1:6379>
redis 127.0.0.1:6379> PING

PONG

中文乱码

要在 redis-cli 后面加上 --raw

redis-cli --raw

就可以避免中文乱码了

键(key)

Redis 键命令用于管理 redis 的键

redis 127.0.0.1:6379> SET runoobkey redis
OK
redis 127.0.0.1:6379> DEL runoobkey
(integer) 1

如果键被删除成功,命令执行后输出 (integer) 1,否则将输出 (integer) 0

keys *:查看当前库所有 key

exists key:判断某个 key 是否存在

type key:查看你的 key 是什么类型

del key :删除指定的 key 数据

unlink key:根据 value 选择非阻塞删除,仅将 keyskeyspace 元数据中删除,真正的删除会在后续异步操作

expire key 10 :为给定的 key 设置过期时间

ttl key:查看还有多少秒过期,-1表示永不过期,-2表示已过期

select:命令切换数据库

dbsize:查看当前数据库的 key 的数量

flushdb:清空当前库

flushall:通杀全部库

序号 命令及描述
1 DEL key 该命令用于在 key 存在时删除 key。
2 DUMP key 序列化给定 key ,并返回被序列化的值。
3 EXISTS key 检查给定 key 是否存在。
4 EXPIRE key seconds 为给定 key 设置过期时间,以秒计。
5 EXPIREAT key timestamp EXPIREAT 的作用和 EXPIRE 类似,都用于为 key 设置过期时间。 不同在于 EXPIREAT 命令接受的时间参数是 UNIX 时间戳(unix timestamp)。
6 PEXPIRE key milliseconds 设置 key 的过期时间以毫秒计。
7 PEXPIREAT key milliseconds-timestamp 设置 key 过期时间的时间戳(unix timestamp) 以毫秒计
8 KEYS pattern 查找所有符合给定模式( pattern)的 key 。
9 MOVE key db 将当前数据库的 key 移动到给定的数据库 db 当中。
10 PERSIST key 移除 key 的过期时间,key 将持久保持。
11 PTTL key 以毫秒为单位返回 key 的剩余的过期时间。
12 TTL key 以秒为单位,返回给定 key 的剩余生存时间(TTL, time to live)。
13 RANDOMKEY 从当前数据库中随机返回一个 key 。
14 RENAME key newkey 修改 key 的名称
15 RENAMENX key newkey 仅当 newkey 不存在时,将 key 改名为 newkey 。
16 [SCAN cursor MATCH pattern] [COUNT count] 迭代数据库中的数据库键。
17 TYPE key 返回 key 所储存的值的类型。

字符串(String)

redis 127.0.0.1:6379> SET runoobkey redis
OK
redis 127.0.0.1:6379> GET runoobkey
"redis"

set <key><value>:添加键值对

get <key>:查询对应键值

append <key><value>:将给定的 <value> 追加到原值的末尾

strlen <key>:获得值的长度

setnx <key><value>:只有在 key 不存在时,设置 key 的值

incr <key>:将 key 中储存的数字值增 1,只能对数字值操作,如果为空,新增值为 1(具有原子性

decr <key>:将 key 中储存的数字值减 1,只能对数字值操作,如果为空,新增值为 -1

incrby/decrby <key><步长>:将 key 中储存的数字值增减。自定义步长

mset <key1><value1><key2><value2> :同时设置一个或多个 key-value

mget <key1><key2><key3>...:同时获取一个或多个 value

msetnx <key1><value1><key2><value2>...:同时设置一个或多个 key-value 对,当且仅当所有给定 key 都不存在

getrange <key><起始位置><结束位置>:获得值的范围

setrange <key><起始位置><value>:用 <value> 覆写 <key> 所储存的字符串值

setex <key><过期时间><value>:设置键值的同时,设置过期时间,单位秒。

getset <key><value>:以新换旧,设置了新值同时获得旧值。

序号 命令及描述
1 SET key value 设置指定 key 的值。
2 GET key 获取指定 key 的值。
3 GETRANGE key start end 返回 key 中字符串值的子字符
4 GETSET key value 将给定 key 的值设为 value ,并返回 key 的旧值(old value)。
5 GETBIT key offset 对 key 所储存的字符串值,获取指定偏移量上的位(bit)。
6 [MGET key1 key2…] 获取所有(一个或多个)给定 key 的值。
7 SETBIT key offset value 对 key 所储存的字符串值,设置或清除指定偏移量上的位(bit)。
8 SETEX key seconds value 将值 value 关联到 key ,并将 key 的过期时间设为 seconds (以秒为单位)。
9 SETNX key value 只有在 key 不存在时设置 key 的值。
10 SETRANGE key offset value 用 value 参数覆写给定 key 所储存的字符串值,从偏移量 offset 开始。
11 STRLEN key 返回 key 所储存的字符串值的长度。
12 [MSET key value key value …] 同时设置一个或多个 key-value 对。
13 [MSETNX key value key value …] 同时设置一个或多个 key-value 对,当且仅当所有给定 key 都不存在。
14 PSETEX key milliseconds value 这个命令和 SETEX 命令相似,但它以毫秒为单位设置 key 的生存时间,而不是像 SETEX 命令那样,以秒为单位。
15 INCR key 将 key 中储存的数字值增一。
16 INCRBY key increment 将 key 所储存的值加上给定的增量值(increment) 。
17 INCRBYFLOAT key increment 将 key 所储存的值加上给定的浮点增量值(increment) 。
18 DECR key 将 key 中储存的数字值减一。
19 DECRBY key decrement key 所储存的值减去给定的减量值(decrement) 。
20 APPEND key value 如果 key 已经存在并且是一个字符串, APPEND 命令将指定的 value 追加到该 key 原来值(value)的末尾。

哈希(Hash)

127.0.0.1:6379>  HMSET runoobkey name "redis tutorial" description "redis basic commands for caching" likes 20 visitors 23000
OK
127.0.0.1:6379>  HGETALL runoobkey
1) "name"
2) "redis tutorial"
3) "description"
4) "redis basic commands for caching"
5) "likes"
6) "20"
7) "visitors"
8) "23000"

hset <key><field><value>:给 <key> 集合中的 <field> 键赋值 <value>

hget <key1><field>:从 <key1> 集合 <field> 取出 value

hmset <key1><field1><value1><field2><value2>...: 批量设置 hash 的值

hexists <key1><field>:查看哈希表 key 中,给定域 field 是否存在

hkeys <key>:列出该 hash 集合的所有 field

hvals <key>:列出该 hash 集合的所有 value

hincrby <key><field><increment>:为哈希表 key 中的域 field 的值加上增量 1 -1

hsetnx <key><field><value>:将哈希表 key 中的域 field 的值设置为 value ,当且仅当域 field 不存在

序号 命令及描述
1 [HDEL key field1 field2] 删除一个或多个哈希表字段
2 HEXISTS key field 查看哈希表 key 中,指定的字段是否存在。
3 HGET key field 获取存储在哈希表中指定字段的值。
4 HGETALL key 获取在哈希表中指定 key 的所有字段和值
5 HINCRBY key field increment 为哈希表 key 中的指定字段的整数值加上增量 increment 。
6 HINCRBYFLOAT key field increment 为哈希表 key 中的指定字段的浮点数值加上增量 increment 。
7 HKEYS key 获取所有哈希表中的字段
8 HLEN key 获取哈希表中字段的数量
9 [HMGET key field1 field2] 获取所有给定字段的值
10 [HMSET key field1 value1 field2 value2 ] 同时将多个 field-value (域-值)对设置到哈希表 key 中。
11 HSET key field value 将哈希表 key 中的字段 field 的值设为 value 。
12 HSETNX key field value 只有在字段 field 不存在时,设置哈希表字段的值。
13 HVALS key 获取哈希表中所有值。
14 [HSCAN key cursor MATCH pattern] [COUNT count] 迭代哈希表中的键值对。

列表(List)

redis 127.0.0.1:6379> LPUSH runoobkey redis
(integer) 1
redis 127.0.0.1:6379> LPUSH runoobkey mongodb
(integer) 2
redis 127.0.0.1:6379> LPUSH runoobkey mysql
(integer) 3
redis 127.0.0.1:6379> LRANGE runoobkey 0 10

1) "mysql"
2) "mongodb"
3) "redis"

lpush/rpush <key><value1><value2><value3> ....: 从左边/右边插入一个或多个值。

lpush k1 v1 v2 v3
lrange k1 0 -1
输出:v3 v2 v1

rpush k1 v1 v2 v3
rrange k1 0 -1
输出:v1 v2 v3

lpop/rpop <key>:从左边/右边吐出一个值。值在键在,值光键亡。

rpoplpush <key1><key2>:从 <key1> 列表右边吐出一个值,插到 <key2> 列表左边。

lrange <key><start><stop>:按照索引下标获得元素(从左到右)

lrange mylist 0 -1 0:左边第一个,-1右边第一个,(0 -1表示获取所有)

lindex <key><index>:按照索引下标获得元素(从左到右)

llen <key>:获得列表长度

linsert <key> before/after <value><newvalue>:在 <value> 的前面/后面插入 <newvalue> 插入值

lrem <key><n><value>:从左边删除 nvalue(从左到右)

lset<key><index><value>:将列表 key 下标为 index 的值替换成 value

序号 命令及描述
1 [BLPOP key1 key2 ] timeout 移出并获取列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。
2 [BRPOP key1 key2 ] timeout 移出并获取列表的最后一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。
3 BRPOPLPUSH source destination timeout 从列表中弹出一个值,将弹出的元素插入到另外一个列表中并返回它; 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。
4 LINDEX key index 通过索引获取列表中的元素
5 LINSERT key BEFORE|AFTER pivot value 在列表的元素前或者后插入元素
6 LLEN key 获取列表长度
7 LPOP key 移出并获取列表的第一个元素
8 [LPUSH key value1 value2] 将一个或多个值插入到列表头部
9 LPUSHX key value 将一个值插入到已存在的列表头部
10 LRANGE key start stop 获取列表指定范围内的元素
11 LREM key count value 移除列表元素
12 LSET key index value 通过索引设置列表元素的值
13 LTRIM key start stop 对一个列表进行修剪(trim),就是说,让列表只保留指定区间内的元素,不在指定区间之内的元素都将被删除。
14 RPOP key 移除列表的最后一个元素,返回值为移除的元素。
15 RPOPLPUSH source destination 移除列表的最后一个元素,并将该元素添加到另一个列表并返回
16 [RPUSH key value1 value2] 在列表中添加一个或多个值到列表尾部
17 RPUSHX key value 为已存在的列表添加值

集合(Set)

redis 127.0.0.1:6379> SADD runoobkey redis
(integer) 1
redis 127.0.0.1:6379> SADD runoobkey mongodb
(integer) 1
redis 127.0.0.1:6379> SADD runoobkey mysql
(integer) 1
redis 127.0.0.1:6379> SADD runoobkey mysql
(integer) 0
redis 127.0.0.1:6379> SMEMBERS runoobkey

1) "mysql"
2) "mongodb"
3) "redis"

sadd <key><value1><value2> .....:将一个或多个 member 元素加入到集合 key 中,已经存在的 member 元素将被忽略

smembers <key>:取出该集合的所有值。

sismember <key><value>:判断集合 <key> 是否为含有该 <value> 值,有返回 1,没有返回 0

scard<key>:返回该集合的元素个数。

srem <key><value1><value2> ....:删除集合中的某个元素

spop <key>:随机从该集合中吐出一个值

srandmember <key><n>:随机从该集合中取出 n 个值,不会从集合中删除

smove <source><destination>value:把集合中一个值从一个集合移动到另一个集合

sinter <key1><key2>:返回两个集合的交集元素

sunion <key1><key2>:返回两个集合的并集元素

sdiff <key1><key2>:返回两个集合的差集元素(key1 中的,不包含 key2 中的)

序号 命令及描述
1 [SADD key member1 member2] 向集合添加一个或多个成员
2 SCARD key 获取集合的成员数
3 [SDIFF key1 key2] 返回第一个集合与其他集合之间的差异。
4 [SDIFFSTORE destination key1 key2] 返回给定所有集合的差集并存储在 destination 中
5 [SINTER key1 key2] 返回给定所有集合的交集
6 [SINTERSTORE destination key1 key2] 返回给定所有集合的交集并存储在 destination 中
7 SISMEMBER key member 判断 member 元素是否是集合 key 的成员
8 SMEMBERS key 返回集合中的所有成员
9 SMOVE source destination member 将 member 元素从 source 集合移动到 destination 集合
10 SPOP key 移除并返回集合中的一个随机元素
11 [SRANDMEMBER key count] 返回集合中一个或多个随机数
12 [SREM key member1 member2] 移除集合中一个或多个成员
13 [SUNION key1 key2] 返回所有给定集合的并集
14 [SUNIONSTORE destination key1 key2] 所有给定集合的并集存储在 destination 集合中
15 [SSCAN key cursor MATCH pattern] [COUNT count] 迭代集合中的元素

有序集合(sorted set)

redis 127.0.0.1:6379> ZADD runoobkey 1 redis
(integer) 1
redis 127.0.0.1:6379> ZADD runoobkey 2 mongodb
(integer) 1
redis 127.0.0.1:6379> ZADD runoobkey 3 mysql
(integer) 1
redis 127.0.0.1:6379> ZADD runoobkey 3 mysql
(integer) 0
redis 127.0.0.1:6379> ZADD runoobkey 4 mysql
(integer) 0
redis 127.0.0.1:6379> ZRANGE runoobkey 0 10 WITHSCORES

1) "redis"
2) "1"
3) "mongodb"
4) "2"
5) "mysql"
6) "4"

zadd <key><score1><value1><score2><value2>…:将一个或多个 member 元素及其 score 值加入到有序集 key 当中

zrange <key><start><stop> [WITHSCORES]:返回有序集 key 中,下标在 <start><stop> 之间的元素

当带 WITHSCORES,可以让分数一起和值返回到结果集

zrangebyscore key min max [withscores] [limit offset count]:返回有序集 key 中,所有 score 值介于 minmax 之间(包括等于 minmax )的成员。有序集成员按 score 值递增(从小到大)次序排列。

zrevrangebyscore key max min [withscores] [limit offset count]:同上,改为从大到小排列

zincrby <key><increment><value>:为元素的 score 加上增量

zrem <key><value>:删除该集合下,指定值的元素

zcount <key><min><max>:统计该集合,分数区间内的元素个数

zrank <key><value>:返回该值在集合中的排名,从 0 开始。

序号 命令及描述
1 [ZADD key score1 member1 score2 member2] 向有序集合添加一个或多个成员,或者更新已存在成员的分数
2 ZCARD key 获取有序集合的成员数
3 ZCOUNT key min max 计算在有序集合中指定区间分数的成员数
4 ZINCRBY key increment member 有序集合中对指定成员的分数加上增量 increment
5 [ZINTERSTORE destination numkeys key key …] 计算给定的一个或多个有序集的交集并将结果集存储在新的有序集合 destination 中
6 ZLEXCOUNT key min max 在有序集合中计算指定字典区间内成员数量
7 [ZRANGE key start stop WITHSCORES] 通过索引区间返回有序集合指定区间内的成员
8 [ZRANGEBYLEX key min max LIMIT offset count] 通过字典区间返回有序集合的成员
9 [ZRANGEBYSCORE key min max WITHSCORES] [LIMIT] 通过分数返回有序集合指定区间内的成员
10 ZRANK key member 返回有序集合中指定成员的索引
11 [ZREM key member member …] 移除有序集合中的一个或多个成员
12 ZREMRANGEBYLEX key min max 移除有序集合中给定的字典区间的所有成员
13 ZREMRANGEBYRANK key start stop 移除有序集合中给定的排名区间的所有成员
14 ZREMRANGEBYSCORE key min max 移除有序集合中给定的分数区间的所有成员
15 [ZREVRANGE key start stop WITHSCORES] 返回有序集中指定区间内的成员,通过索引,分数从高到低
16 [ZREVRANGEBYSCORE key max min WITHSCORES] 返回有序集中指定分数区间内的成员,分数从高到低排序
17 ZREVRANK key member 返回有序集合中指定成员的排名,有序集成员按分数值递减(从大到小)排序
18 ZSCORE key member 返回有序集中,成员的分数值
19 [ZUNIONSTORE destination numkeys key key …] 计算给定的一个或多个有序集的并集,并存储在新的 key 中
20 [ZSCAN key cursor MATCH pattern] [COUNT count] 迭代有序集合中的元素(包括元素成员和元素分值)

HyperLogLog

Redis 在 2.8.9 版本添加了 HyperLogLog 结构。

Redis HyperLogLog 是用来做基数统计的算法,HyperLogLog 的优点是,在输入元素的数量或者体积非常非常大时,计算基数所需的空间总是固定的、并且是很小的。

在 Redis 里面,每个 HyperLogLog 键只需要花费 12 KB 内存,就可以计算接近 2^64 个不同元素的基数。这和计算基数时,元素越多耗费内存就越多的集合形成鲜明对比。

什么是基数?

比如数据集 {1, 3, 5, 7, 5, 7, 8}, 那么这个数据集的基数集为 {1, 3, 5 ,7, 8}, 基数(不重复元素)为5。 基数估计就是在误差可接受的范围内,快速计算基数。

redis 127.0.0.1:6379> PFADD runoobkey "redis"

1) (integer) 1

redis 127.0.0.1:6379> PFADD runoobkey "mongodb"

1) (integer) 1

redis 127.0.0.1:6379> PFADD runoobkey "mysql"

1) (integer) 1

redis 127.0.0.1:6379> PFCOUNT runoobkey

(integer) 3
序号 命令及描述
1 [PFADD key element element …] 添加指定元素到 HyperLogLog 中。
2 [PFCOUNT key key …] 返回给定 HyperLogLog 的基数估算值。
3 [PFMERGE destkey sourcekey sourcekey …] 将多个 HyperLogLog 合并为一个 HyperLogLog
pfadd <key>< element> [element ...]   #添加指定元素到 HyperLogLog 中
pfcount<key> [key ...] #计算HLL的近似基数
pfmerge<destkey><sourcekey> [sourcekey ...]  #将一个或多个HLL合并后的结果存储在另一个HLL中

发布订阅

  • Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息。

  • Redis 客户端可以订阅任意数量的频道。

image-image-20230119141020731

创建了订阅频道名为 runoobChat:

redis 127.0.0.1:6379> SUBSCRIBE runoobChat

Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "runoobChat"
3) (integer) 1

重新开启个 redis 客户端,然后在同一个频道 runoobChat 发布两次消息,订阅者就能接收到消息。

redis 127.0.0.1:6379> PUBLISH runoobChat "Redis PUBLISH test"

(integer) 1

redis 127.0.0.1:6379> PUBLISH runoobChat "Learn redis by runoob.com"

(integer) 1

# 订阅者的客户端会显示如下消息
 1) "message"
2) "runoobChat"
3) "Redis PUBLISH test"
 1) "message"
2) "runoobChat"
3) "Learn redis by runoob.com"
序号 命令及描述
1 [PSUBSCRIBE pattern pattern …] 订阅一个或多个符合给定模式的频道。
2 [PUBSUB subcommand argument [argument …]] 查看订阅与发布系统状态。
3 PUBLISH channel message 将信息发送到指定的频道。
4 [PUNSUBSCRIBE pattern [pattern …]] 退订所有给定模式的频道。
5 [SUBSCRIBE channel channel …] 订阅给定的一个或多个频道的信息。
6 [UNSUBSCRIBE channel [channel …]] 指退订给定的频道。

事务

语法

Redis 事务可以一次执行多个命令, 并且带有以下三个重要的保证:

  • 批量操作在发送 EXEC 命令前被放入队列缓存。
  • 收到 EXEC 命令后进入事务执行,事务中任意命令执行失败,其余的命令依然被执行。
  • 在事务执行过程,其他客户端提交的命令请求不会插入到事务执行命令序列中。

一个事务从开始到执行会经历以下三个阶段:

  • 开始事务。
  • 命令入队。
  • 执行事务

先以 MULTI 开始一个事务, 然后将多个命令入队到事务中, 最后由 EXEC 命令触发事务, 一并执行事务中的所有命令:

redis 127.0.0.1:6379> MULTI
OK

redis 127.0.0.1:6379> SET book-name "Mastering C++ in 21 days"
QUEUED

redis 127.0.0.1:6379> GET book-name
QUEUED

redis 127.0.0.1:6379> SADD tag "C++" "Programming" "Mastering Series"
QUEUED

redis 127.0.0.1:6379> SMEMBERS tag
QUEUED

redis 127.0.0.1:6379> EXEC
1) OK
2) "Mastering C++ in 21 days"
3) (integer) 3
4) 1) "Mastering Series"
   2) "C++"
   3) "Programming"

单个 Redis 命令的执行是原子性的,但 Redis 没有在事务上增加任何维持原子性的机制,所以 Redis 事务的执行并不是原子性的。

事务可以理解为一个打包的批量执行脚本,但批量指令并非原子化的操作,中间某条指令的失败不会导致前面已做指令的回滚,也不会造成后续的指令不做。

序号 命令及描述
1 DISCARD 取消事务,放弃执行事务块内的所有命令。
2 EXEC 执行所有事务块内的命令。
3 MULTI 标记一个事务块的开始。
4 UNWATCH 取消 WATCH 命令对所有 key 的监视。
5 [WATCH key key …] 监视一个(或多个) key ,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断。

错误处理

组队中某个命令出现了报告错误,执行时整个的所有队列都会被取消

如果执行阶段某个命令报出了错误,则只有报错的命令不会被执行,而其他的命令都会执行,不会回滚。

悲观锁(Pessimistic Lock), 顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁表锁等,读锁写锁等,都是在做操作之前先上锁。

乐观锁(Optimistic Lock), 顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量。Redis就是利用这种check-and-set机制实现事务的。

Redis事务三特性

  • 单独的隔离操作

    • 事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。
  • 没有隔离级别的概念

    • 队列中的命令没有提交之前都不会实际被执行,因为事务提交前任何指令都不会被实际执行
  • 不保证原子性

    • 事务中如果有一条命令执行失败,其后的命令仍然会被执行,没有回滚

脚本

Redis 脚本使用 Lua 解释器来执行脚本。 Redis 2.6 版本通过内嵌支持 Lua 环境。执行脚本的常用命令为 EVAL

redis 127.0.0.1:6379> EVAL script numkeys key [key ...] arg [arg ...]
redis 127.0.0.1:6379> EVAL "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 key1 key2 first second

1) "key1"
2) "key2"
3) "first"
4) "second"
序号 命令及描述
1 [EVAL script numkeys key key …] arg [arg …] 执行 Lua 脚本。
2 [EVALSHA sha1 numkeys key key …] arg [arg …] 执行 Lua 脚本。
3 [SCRIPT EXISTS script script …] 查看指定的脚本是否已经被保存在缓存当中。
4 SCRIPT FLUSH 从脚本缓存中移除所有脚本。
5 SCRIPT KILL 杀死当前正在运行的 Lua 脚本。
6 SCRIPT LOAD script 将脚本 script 添加到脚本缓存中,但并不立即执行这个脚本。

连接

Redis 连接命令主要是用于连接 redis 服务

redis 127.0.0.1:6379> AUTH "password"
OK
redis 127.0.0.1:6379> PING
PONG
序号 命令及描述
1 AUTH password 验证密码是否正确
2 ECHO message 打印字符串
3 PING 查看服务是否运行
4 QUIT 关闭当前连接
5 SELECT index 切换到指定的数据库

查看服务器统计信息

redis 127.0.0.1:6379> INFO
序号 命令及描述
1 BGREWRITEAOF 异步执行一个 AOF(AppendOnly File) 文件重写操作
2 BGSAVE 在后台异步保存当前数据库的数据到磁盘
3 [CLIENT KILL ip:port] [ID client-id] 关闭客户端连接
4 CLIENT LIST 获取连接到服务器的客户端连接列表
5 CLIENT GETNAME 获取连接的名称
6 CLIENT PAUSE timeout 在指定时间内终止运行来自客户端的命令
7 CLIENT SETNAME connection-name 设置当前连接的名称
8 CLUSTER SLOTS 获取集群节点的映射数组
9 COMMAND 获取 Redis 命令详情数组
10 COMMAND COUNT 获取 Redis 命令总数
11 COMMAND GETKEYS 获取给定命令的所有键
12 TIME 返回当前服务器时间
13 [COMMAND INFO command-name command-name …] 获取指定 Redis 命令描述的数组
14 CONFIG GET parameter 获取指定配置参数的值
15 CONFIG REWRITE 对启动 Redis 服务器时所指定的 redis.conf 配置文件进行改写
16 CONFIG SET parameter value 修改 redis 配置参数,无需重启
17 CONFIG RESETSTAT 重置 INFO 命令中的某些统计数据
18 DBSIZE 返回当前数据库的 key 的数量
19 DEBUG OBJECT key 获取 key 的调试信息
20 DEBUG SEGFAULT 让 Redis 服务崩溃
21 FLUSHALL 删除所有数据库的所有key
22 FLUSHDB 删除当前数据库的所有key
23 [INFO section] 获取 Redis 服务器的各种信息和统计数值
24 LASTSAVE 返回最近一次 Redis 成功将数据保存到磁盘上的时间,以 UNIX 时间戳格式表示
25 MONITOR 实时打印出 Redis 服务器接收到的命令,调试用
26 ROLE 返回主从实例所属的角色
27 SAVE 同步保存数据到硬盘
28 [SHUTDOWN NOSAVE] [SAVE] 异步保存数据到硬盘,并关闭服务器
29 SLAVEOF host port 将当前服务器转变为指定服务器的从属服务器(slave server)
30 [SLOWLOG subcommand argument] 管理 redis 的慢日志
31 SYNC 用于复制功能(replication)的内部命令

Stream

Redis Stream 是 Redis 5.0 版本新增加的数据结构。

Redis Stream 主要用于消息队列(MQ,Message Queue),Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。

简单来说发布订阅 (pub/sub) 可以分发消息,但无法记录历史消息。

而 Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。

Redis Stream 的结构如下所示,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容:

image-en-us_image_0167982791

每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。

上图解析:

  • Consumer Group :消费组,使用 XGROUP CREATE 命令创建,一个消费组有多个消费者(Consumer)。
  • last_delivered_id :游标,每个消费组会有个游标 last_delivered_id,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。
  • pending_ids :消费者(Consumer)的状态变量,作用是维护消费者的未确认的 id。 pending_ids 记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符)。

消息队列相关命令:

  • XADD - 添加消息到末尾
  • XTRIM - 对流进行修剪,限制长度
  • XDEL - 删除消息
  • XLEN - 获取流包含的元素数量,即消息长度
  • XRANGE - 获取消息列表,会自动过滤已经删除的消息
  • XREVRANGE - 反向获取消息列表,ID 从大到小
  • XREAD - 以阻塞或非阻塞方式获取消息列表

消费者组相关命令:

  • XGROUP CREATE - 创建消费者组
  • XREADGROUP GROUP - 读取消费者组中的消息
  • XACK - 将消息标记为"已处理"
  • XGROUP SETID - 为消费者组设置新的最后递送消息ID
  • XGROUP DELCONSUMER - 删除消费者
  • XGROUP DESTROY - 删除消费者组
  • XPENDING - 显示待处理消息的相关信息
  • XCLAIM - 转移消息的归属权
  • XINFO - 查看流和消费者组的相关信息;
  • XINFO GROUPS - 打印消费者组的信息;
  • XINFO STREAM - 打印流信息

XADD

使用 XADD 向队列添加消息,如果指定的队列不存在,则创建一个队列,XADD 语法格式:

XADD key ID field value [field value ...]
  • key :队列名称,如果不存在就创建
  • ID :消息 id,我们使用 * 表示由 redis 生成,可以自定义,但是要自己保证递增性。
  • field value : 记录。
redis> XADD mystream * name Sara surname OConnor
"1601372323627-0"
redis> XADD mystream * field1 value1 field2 value2 field3 value3
"1601372323627-1"
redis> XLEN mystream
(integer) 2
redis> XRANGE mystream - +
1) 1) "1601372323627-0"
   2) 1) "name"
      2) "Sara"
      3) "surname"
      4) "OConnor"
2) 1) "1601372323627-1"
   2) 1) "field1"
      2) "value1"
      3) "field2"
      4) "value2"
      5) "field3"
      6) "value3"

XTRIM

使用 XTRIM 对流进行修剪,限制长度, 语法格式:

XTRIM key MAXLEN [~] count
  • key :队列名称
  • MAXLEN :长度
  • count :数量
127.0.0.1:6379> XADD mystream * field1 A field2 B field3 C field4 D
"1601372434568-0"
127.0.0.1:6379> XTRIM mystream MAXLEN 2
(integer) 0
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1601372434568-0"
   2) 1) "field1"
      2) "A"
      3) "field2"
      4) "B"
      5) "field3"
      6) "C"
      7) "field4"
      8) "D"
127.0.0.1:6379>

XDEL

使用 XDEL 删除消息,语法格式:

XDEL key ID [ID ...]
  • key:队列名称
  • ID :消息 ID
> XADD mystream * a 1
1538561698944-0
> XADD mystream * b 2
1538561700640-0
> XADD mystream * c 3
1538561701744-0
> XDEL mystream 1538561700640-0
(integer) 1
127.0.0.1:6379> XRANGE mystream - +
1) 1) 1538561698944-0
   2) 1) "a"
      2) "1"
2) 1) 1538561701744-0
   2) 1) "c"
      2) "3"

XLEN

使用 XLEN 获取流包含的元素数量,即消息长度,语法格式:

XLEN key
  • key:队列名称
redis> XADD mystream * item 1
"1601372563177-0"
redis> XADD mystream * item 2
"1601372563178-0"
redis> XADD mystream * item 3
"1601372563178-1"
redis> XLEN mystream
(integer) 3
redis>

XRANGE

使用 XRANGE 获取消息列表,会自动过滤已经删除的消息 ,语法格式:

XRANGE key start end [COUNT count]
  • key :队列名
  • start :开始值, - 表示最小值
  • end :结束值, + 表示最大值
  • count :数量
redis> XADD writers * name Virginia surname Woolf
"1601372577811-0"
redis> XADD writers * name Jane surname Austen
"1601372577811-1"
redis> XADD writers * name Toni surname Morrison
"1601372577811-2"
redis> XADD writers * name Agatha surname Christie
"1601372577812-0"
redis> XADD writers * name Ngozi surname Adichie
"1601372577812-1"
redis> XLEN writers
(integer) 5
redis> XRANGE writers - + COUNT 2
1) 1) "1601372577811-0"
   2) 1) "name"
      2) "Virginia"
      3) "surname"
      4) "Woolf"
2) 1) "1601372577811-1"
   2) 1) "name"
      2) "Jane"
      3) "surname"
      4) "Austen"
redis>

XREVRANGE

使用 XREVRANGE 获取消息列表,会自动过滤已经删除的消息 ,语法格式:

XREVRANGE key end start [COUNT count]
  • key :队列名
  • end :结束值, + 表示最大值
  • start :开始值, - 表示最小值
  • count :数量
redis> XADD writers * name Virginia surname Woolf
"1601372731458-0"
redis> XADD writers * name Jane surname Austen
"1601372731459-0"
redis> XADD writers * name Toni surname Morrison
"1601372731459-1"
redis> XADD writers * name Agatha surname Christie
"1601372731459-2"
redis> XADD writers * name Ngozi surname Adichie
"1601372731459-3"
redis> XLEN writers
(integer) 5
redis> XREVRANGE writers + - COUNT 1
1) 1) "1601372731459-3"
   2) 1) "name"
      2) "Ngozi"
      3) "surname"
      4) "Adichie"
redis>

XREAD

使用 XREAD 以阻塞或非阻塞方式获取消息列表 ,语法格式:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
  • count :数量
  • milliseconds :可选,阻塞毫秒数,没有设置就是非阻塞模式
  • key :队列名
  • id :消息 ID
# 从 Stream 头部读取两条消息
> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
1) 1) "mystream"
   2) 1) 1) 1526984818136-0
         2) 1) "duration"
            2) "1532"
            3) "event-id"
            4) "5"
            5) "user-id"
            6) "7782813"
      2) 1) 1526999352406-0
         2) 1) "duration"
            2) "812"
            3) "event-id"
            4) "9"
            5) "user-id"
            6) "388234"
2) 1) "writers"
   2) 1) 1) 1526985676425-0
         2) 1) "name"
            2) "Virginia"
            3) "surname"
            4) "Woolf"
      2) 1) 1526985685298-0
         2) 1) "name"
            2) "Jane"
            3) "surname"
            4) "Austen"

XGROUP CREATE

使用 XGROUP CREATE 创建消费者组,语法格式:

XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
  • key :队列名称,如果不存在就创建
  • groupname :组名。
  • $ : 表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略。

从头开始消费:

XGROUP CREATE mystream consumer-group-name 0-0  

从尾部开始消费:

XGROUP CREATE mystream consumer-group-name $

XREADGROUP GROUP

使用 XREADGROUP GROUP 读取消费组中的消息,语法格式:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • group :消费组名
  • consumer :消费者名。
  • count : 读取数量。
  • milliseconds : 阻塞毫秒数。
  • key : 队列名。
  • ID : 消息 ID。
XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >

数据备份与恢复

备份

Redis SAVE 命令用于创建当前数据库的备份

redis 127.0.0.1:6379> SAVE 

该命令将在 redis 安装目录中创建dump.rdb文件

恢复

如果需要恢复数据,只需将备份文件 (dump.rdb) 移动到 redis 安装目录并启动服务即可。获取 redis 目录可以使用 CONFIG 命令,如下所示:

redis 127.0.0.1:6379> CONFIG GET dir
1) "dir"
2) "/usr/local/redis/bin"

以上命令 CONFIG GET dir 输出的 redis 安装目录为 /usr/local/redis/bin。

Bgsave

创建 redis 备份文件也可以使用命令 BGSAVE,该命令在后台执行。

127.0.0.1:6379> BGSAVE

Background saving started

安全

可以通过 redis 的配置文件设置密码参数,这样客户端连接到 redis 服务就需要密码验证

默认情况下 requirepass 参数是空的,这就意味着你无需通过密码验证就可以连接到 redis 服务。

你可以通过以下命令来修改该参数:

127.0.0.1:6379> CONFIG set requirepass "runoob"
OK
127.0.0.1:6379> CONFIG get requirepass
1) "requirepass"
2) "runoob"

AUTH 命令基本语法格式如下:

127.0.0.1:6379> AUTH password
127.0.0.1:6379> AUTH "runoob"
OK
127.0.0.1:6379> SET mykey "Test value"
OK
127.0.0.1:6379> GET mykey
"Test value"

性能测试

redis 性能测试的基本命令如下:

redis-benchmark [option] [option value]

注意:该命令是在 redis 的目录下执行的,而不是 redis 客户端的内部指令。

序号 选项 描述 默认值
1 -h 指定服务器主机名 127.0.0.1
2 -p 指定服务器端口 6379
3 -s 指定服务器 socket
4 -c 指定并发连接数 50
5 -n 指定请求数 10000
6 -d 以字节的形式指定 SET/GET 值的数据大小 2
7 -k 1=keep alive 0=reconnect 1
8 -r SET/GET/INCR 使用随机 key, SADD 使用随机值
9 -P 通过管道传输 <numreq> 请求 1
10 -q 强制退出 redis。仅显示 query/sec 值
11 -csv 以 CSV 格式输出
12 -l\*(L 的小写字母) 生成循环,永久执行测试
13 -t 仅运行以逗号分隔的测试命令列表。
14 *-I*(i 的大写字母) Idle 模式。仅打开 N 个 idle 连接并等待。

以下实例我们使用了多个参数来测试 redis 性能:

$ redis-benchmark -h 127.0.0.1 -p 6379 -t set,lpush -n 10000 -q

SET: 146198.83 requests per second
LPUSH: 145560.41 requests per second

以上实例中主机为 127.0.0.1,端口号为 6379,执行的命令为 set,lpush,请求数为 10000,通过 -q 参数让结果只显示每秒执行的请求数。

客户端连接

Redis 通过监听一个 TCP 端口或者 Unix socket 的方式来接收来自客户端的连接,当一个连接建立后,Redis 内部会进行以下一些操作:

  • 首先,客户端 socket 会被设置为非阻塞模式,因为 Redis 在网络事件处理上采用的是非阻塞多路复用模型。
  • 然后为这个 socket 设置 TCP_NODELAY 属性,禁用 Nagle 算法
  • 然后创建一个可读的文件事件用于监听这个客户端 socket 的数据发送

最大连接数

maxclients 的默认值是 10000,可以在 redis.conf 中对这个值进行修改。

config get maxclients

1) "maxclients"
2) "10000"

在服务启动时设置最大连接数为 100000:

redis-server --maxclients 100000
S.N. 命令 描述
1 CLIENT LIST 返回连接到 redis 服务的客户端列表
2 CLIENT SETNAME 设置当前连接的名称
3 CLIENT GETNAME 获取通过 CLIENT SETNAME 命令设置的服务名称
4 CLIENT PAUSE 挂起客户端连接,指定挂起的时间以毫秒计
5 CLIENT KILL 关闭客户端连接

管道

Redis是一种基于客户端-服务端模型以及请求/响应协议的TCP服务。这意味着通常情况下一个请求会遵循以下步骤:

  • 客户端向服务端发送一个查询请求,并监听Socket返回,通常是以阻塞模式,等待服务端响应。
  • 服务端处理命令,并将结果返回给客户端。

查看 redis 管道,只需要启动 redis 实例并输入以下命令:

$(echo -en "PING\r\n SET runoobkey redis\r\nGET runoobkey\r\nINCR visitor\r\nINCR visitor\r\nINCR visitor\r\n"; sleep 10) | nc localhost 6379

+PONG
+OK
redis
:1
:2
:3

以上实例中我们通过使用 PING 命令查看redis服务是否可用, 之后我们设置了 runoobkey 的值为 redis,然后我们获取 runoobkey 的值并使得 visitor 自增 3 次。

在返回的结果中我们可以看到这些命令一次性向 redis 服务提交,并最终一次性读取所有服务端的响应

管道技术的优势

管道技术最显著的优势是提高了 redis 服务的性能。

分区

分区是分割数据到多个Redis实例的处理过程,因此每个实例只保存key的一个子集。

分区的优势

  • 通过利用多台计算机内存的和值,允许我们构造更大的数据库。
  • 通过多核和多台计算机,允许我们扩展计算能力;通过多台计算机和网络适配器,允许我们扩展网络带宽。

分区的不足

redis的一些特性在分区方面表现的不是很好:

  • 涉及多个key的操作通常是不被支持的。举例来说,当两个set映射到不同的redis实例上时,你就不能对这两个set执行交集操作。
  • 涉及多个key的redis事务不能使用。
  • 当使用分区时,数据处理较为复杂,比如你需要处理多个rdb/aof文件,并且从多个实例和主机备份持久化文件。
  • 增加或删除容量也比较复杂。redis集群大多数支持在运行时增加、删除节点的透明数据平衡的能力,但是类似于客户端分区、代理等其他系统则不支持这项特性。然而,一种叫做presharding的技术对此是有帮助的。

分区类型

Redis 有两种类型分区。 假设有4个Redis实例 R0,R1,R2,R3,和类似user:1,user:2这样的表示用户的多个key,对既定的key有多种不同方式来选择这个key存放在哪个实例中。也就是说,有不同的系统来映射某个key到某个Redis服务。

范围分区

最简单的分区方式是按范围分区,就是映射一定范围的对象到特定的Redis实例。

比如,ID从0到10000的用户会保存到实例R0,ID从10001到 20000的用户会保存到R1,以此类推。

这种方式是可行的,并且在实际中使用,不足就是要有一个区间范围到实例的映射表。这个表要被管理,同时还需要各 种对象的映射表,通常对Redis来说并非是好的方法。

哈希分区

另外一种分区方法是hash分区。这对任何key都适用,也无需是object_name:这种形式,像下面描述的一样简单:

  • 用一个hash函数将key转换为一个数字,比如使用crc32 hash函数。对key foobar执行crc32(foobar)会输出类似93024922的整数。
  • 对这个整数取模,将其转化为0-3之间的数字,就可以将这个整数映射到4个Redis实例中的一个了。93024922 % 4 = 2,就是说key foobar应该被存到R2实例中。注意:取模操作是取除的余数,通常在多种编程语言中用%操作符实现。

Java 使用 Redis

操作数据

  • 导入jedis.jar或导入依赖

禁用Linux的防火墙:Linux(CentOS7)里执行命令

systemctl stop/disable firewalld.service

redis.conf中注释掉bind 127.0.0.1 ,然后 protected-mode no

<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>4.4.0-m1</version>
</dependency>

连接测试

public class RedisJava {
    public static void main(String[] args) {
        //连接本地的 Redis 服务
        Jedis jedis = new Jedis("localhost");
        // 如果 Redis 服务设置了密码,需要下面这行,没有就不需要
        jedis.auth("da");
        System.out.println("连接成功");
        //查看服务是否运行
        System.out.println("服务正在运行: "+jedis.ping());
        jedis.set("test","test");
        System.out.println(jedis.get("test"));
        jedis.close();
    }
}

List

public class RedisListJava {
    public static void main(String[] args) {
        //连接本地的 Redis 服务
        Jedis jedis = new Jedis("localhost");
        System.out.println("连接成功");
        //存储数据到列表中
        jedis.lpush("site-list", "Runoob");
        jedis.lpush("site-list", "Google");
        jedis.lpush("site-list", "Taobao");
        // 获取存储的数据并输出
        List<String> list = jedis.lrange("site-list", 0 ,2);
        for(int i=0; i<list.size(); i++) {
            System.out.println("列表项为: "+list.get(i));
        }
    }
}

Key

public class RedisKeyJava {
    public static void main(String[] args) {
        //连接本地的 Redis 服务
        Jedis jedis = new Jedis("localhost");
        System.out.println("连接成功");
 
        // 获取数据并输出
        Set<String> keys = jedis.keys("*"); 
        Iterator<String> it=keys.iterator() ;   
        while(it.hasNext()){   
            String key = it.next();   
            System.out.println(key);  
            System.out.println(jedis.exists("k1"));
            System.out.println(jedis.ttl("k1"));           
            System.out.println(jedis.get("k1"));
        }
    }
}

string

jedis.mset("str1","v1","str2","v2","str3","v3");
System.out.println(jedis.mget("str1","str2","str3"));

set

jedis.sadd("orders", "order01");
jedis.sadd("orders", "order02");
jedis.sadd("orders", "order03");
jedis.sadd("orders", "order04");
Set<String> smembers = jedis.smembers("orders");
for (String order : smembers) {
System.out.println(order);
}
jedis.srem("orders", "order02");

hash

jedis.hset("hash1","userName","lisi");
System.out.println(jedis.hget("hash1","userName"));
Map<String,String> map = new HashMap<String,String>();
map.put("telphone","13810169999");
map.put("address","atguigu");
map.put("email","abc@163.com");
jedis.hmset("hash2",map);
List<String> result = jedis.hmget("hash2", "telphone","email");
for (String element : result) {
System.out.println(element);
}

zset

jedis.zadd("zset01", 100d, "z3");
jedis.zadd("zset01", 90d, "l4");
jedis.zadd("zset01", 80d, "w5");
jedis.zadd("zset01", 70d, "z6");
 
Set<String> zrange = jedis.zrange("zset01", 0, -1);
for (String e : zrange) {
System.out.println(e);
}

模拟验证码

public class PhoneCode {

    public static void main(String[] args) {
        //模拟验证码发送
        verifyCode("13678765435");

        //模拟验证码校验
        //getRedisCode("13678765435","4444");
    }

    //3 验证码校验
    public static void getRedisCode(String phone,String code) {
        //从redis获取验证码
        Jedis jedis = new Jedis("192.168.44.168",6379);
        //验证码key
        String codeKey = "VerifyCode"+phone+":code";
        String redisCode = jedis.get(codeKey);
        //判断
        if(redisCode.equals(code)) {
            System.out.println("成功");
        }else {
            System.out.println("失败");
        }
        jedis.close();
    }

    //2 每个手机每天只能发送三次,验证码放到redis中,设置过期时间120
    public static void verifyCode(String phone) {
        //连接redis
        Jedis jedis = new Jedis("192.168.44.168",6379);

        //拼接key
        //手机发送次数key
        String countKey = "VerifyCode"+phone+":count";
        //验证码key
        String codeKey = "VerifyCode"+phone+":code";

        //每个手机每天只能发送三次
        String count = jedis.get(countKey);
        if(count == null) {
            //没有发送次数,第一次发送
            //设置发送次数是1
            jedis.setex(countKey,24*60*60,"1");
        } else if(Integer.parseInt(count)<=2) {
            //发送次数+1
            jedis.incr(countKey);
        } else if(Integer.parseInt(count)>2) {
            //发送三次,不能再发送
            System.out.println("今天发送次数已经超过三次");
            jedis.close();
        }

        //发送验证码放到redis里面
        String vcode = getCode();
        jedis.setex(codeKey,120,vcode);
        jedis.close();
    }

    //1 生成6位数字验证码
    public static String getCode() {
        Random random = new Random();
        String code = "";
        for(int i=0;i<6;i++) {
            int rand = random.nextInt(10);
            code += rand;
        }
        return code;
    }
}

Spring Boot整合

在pom.xml文件中引入redis相关依赖

<!-- redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<!-- spring2.X集成redis所需common-pool2-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.6.0</version>
</dependency>

application.properties配置redis配置

#Redis服务器地址
spring.redis.host=192.168.140.136
#Redis服务器连接端口
spring.redis.port=6379
#Redis数据库索引(默认为0)
spring.redis.database= 0
#连接超时时间(毫秒)
spring.redis.timeout=1800000
#连接池最大连接数(使用负值表示没有限制)
spring.redis.lettuce.pool.max-active=20
#最大阻塞等待时间(负数表示没限制)
spring.redis.lettuce.pool.max-wait=-1
#连接池中的最大空闲连接
spring.redis.lettuce.pool.max-idle=5
#连接池中的最小空闲连接
spring.redis.lettuce.pool.min-idle=0
#设置密码
spring.redis.password=

添加redis配置类

@EnableCaching
@Configuration
public class RedisConfig extends CachingConfigurerSupport {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        RedisSerializer<String> redisSerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        template.setConnectionFactory(factory);
        //key序列化方式
        template.setKeySerializer(redisSerializer);
        //value序列化
        template.setValueSerializer(jackson2JsonRedisSerializer);
        //value hashmap序列化
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        return template;
    }

    @Bean
    public CacheManager cacheManager(RedisConnectionFactory factory) {
        RedisSerializer<String> redisSerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        //解决查询缓存转换异常的问题
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        // 配置序列化(解决乱码的问题),过期时间600秒
        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
            .entryTtl(Duration.ofSeconds(600))
            .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer))
            .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer))
            .disableCachingNullValues();
        RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
            .cacheDefaults(config)
            .build();
        return cacheManager;
    }
}

RedisTestController中添加测试方法

@RestController
@RequestMapping("/redisTest")
public class RedisTestController {
    @Autowired
    private RedisTemplate redisTemplate;

    @GetMapping
    public String testRedis() {
        //设置值到redis
        redisTemplate.opsForValue().set("name","lucy");
        //从redis获取值
        String name = (String)redisTemplate.opsForValue().get("name");
        return name;
    }
}

并发秒杀模拟

连接池设置

节省每次连接redis服务带来的消耗,把连接好的实例反复利用。

public class JedisPoolUtil {
	private static volatile JedisPool jedisPool = null;

	private JedisPoolUtil() {
	}

	public static JedisPool getJedisPoolInstance() {
		if (null == jedisPool) {
			synchronized (JedisPoolUtil.class) {
				if (null == jedisPool) {
					JedisPoolConfig poolConfig = new JedisPoolConfig();
					poolConfig.setMaxTotal(200);
					poolConfig.setMaxIdle(32);
					poolConfig.setMaxWaitMillis(100*1000);
					poolConfig.setBlockWhenExhausted(true);
					poolConfig.setTestOnBorrow(true);  // ping  PONG
				 
					jedisPool = new JedisPool(poolConfig, "192.168.44.168", 6379, 60000 );
				}
			}
		}
		return jedisPool;
	}

	public static void release(JedisPool jedisPool, Jedis jedis) {
		if (null != jedis) {
			jedisPool.returnResource(jedis);
		}
	}

}
  • MaxTotal:控制一个pool可分配多少个jedis实例,通过pool.getResource()来获取;如果赋值为-1,则表示不限制;如果pool已经分配了MaxTotal个jedis实例,则此时pool的状态为exhausted。

  • maxIdle:控制一个pool最多有多少个状态为idle(空闲)的jedis实例;

  • MaxWaitMillis:表示当borrow一个jedis实例时,最大的等待毫秒数,如果超过等待时间,则直接抛JedisConnectionException;

  • testOnBorrow:获得一个jedis实例的时候是否检查连接可用性(ping());如果为true,则得到的jedis实例均是可用的;

解决问题实质:

通过lua脚本解决争抢问题,实际上是redis 利用其单线程的特性,用任务队列的方式解决多任务并发问题

**乐观锁 ** (解决超卖),但出现遗留库存和连接超时

public class SecKill_redis {

	public static void main(String[] args) {
		Jedis jedis =new Jedis("127.0.0.1",6379);
		jedis.auth("da");
		System.out.println(jedis.ping());
		jedis.close();
	}

	//秒杀过程
	public static boolean doSecKill(String uid,String prodid) throws IOException {
		//1 uid和prodid非空判断
		if(uid == null || prodid == null) {
			return false;
		}

		//2 连接redis
		//Jedis jedis = new Jedis("192.168.44.168",6379);
		//通过连接池得到jedis对象
		JedisPool jedisPoolInstance = JedisPoolUtil.getJedisPoolInstance();
		Jedis jedis = jedisPoolInstance.getResource();

		//3 拼接key
		// 3.1 库存key
		String kcKey = "sk:"+prodid+":qt";
		// 3.2 秒杀成功用户key
		String userKey = "sk:"+prodid+":user";

		//监视库存
		jedis.watch(kcKey);

		//4 获取库存,如果库存null,秒杀还没有开始
		String kc = jedis.get(kcKey);
		if(kc == null) {
			System.out.println("秒杀还没有开始,请等待");
			jedis.close();
			return false;
		}

		// 5 判断用户是否重复秒杀操作
		if(jedis.sismember(userKey, uid)) {
			System.out.println("已经秒杀成功了,不能重复秒杀");
			jedis.close();
			return false;
		}

		//6 判断如果商品数量,库存数量小于1,秒杀结束
		if(Integer.parseInt(kc)<=0) {
			System.out.println("秒杀已经结束了");
			jedis.close();
			return false;
		}

		//7 秒杀过程
		//使用事务
		Transaction multi = jedis.multi();

		//组队操作
		multi.decr(kcKey);
		multi.sadd(userKey,uid);

		//执行
		List<Object> results = multi.exec();

		if(results == null || results.size()==0) {
			System.out.println("秒杀失败了....");
			jedis.close();
			return false;
		}

		//7.1 库存-1
		//jedis.decr(kcKey);
		//7.2 把秒杀成功用户添加清单里面
		//jedis.sadd(userKey,uid);

		System.out.println("秒杀成功了..");
		jedis.close();
		return true;
	}
}

LUA脚本

public class SecKill_redisByScript {
	
	private static final  org.slf4j.Logger logger =LoggerFactory.getLogger(SecKill_redisByScript.class) ;

	public static void main(String[] args) {
		JedisPool jedispool =  JedisPoolUtil.getJedisPoolInstance();
 
		Jedis jedis=jedispool.getResource();
		System.out.println(jedis.ping());
		
		Set<HostAndPort> set=new HashSet<HostAndPort>();

	//	doSecKill("201","sk:0101");
	}
	
	static String secKillScript ="local userid=KEYS[1];\r\n" + 
			"local prodid=KEYS[2];\r\n" + 
			"local qtkey='sk:'..prodid..\":qt\";\r\n" + 
			"local usersKey='sk:'..prodid..\":usr\";\r\n" + 
			"local userExists=redis.call(\"sismember\",usersKey,userid);\r\n" + 
			"if tonumber(userExists)==1 then \r\n" + 
			"   return 2;\r\n" + 
			"end\r\n" + 
			"local num= redis.call(\"get\" ,qtkey);\r\n" + 
			"if tonumber(num)<=0 then \r\n" + 
			"   return 0;\r\n" + 
			"else \r\n" + 
			"   redis.call(\"decr\",qtkey);\r\n" + 
			"   redis.call(\"sadd\",usersKey,userid);\r\n" + 
			"end\r\n" + 
			"return 1" ;
			 
	static String secKillScript2 = 
			"local userExists=redis.call(\"sismember\",\"{sk}:0101:usr\",userid);\r\n" +
			" return 1";

	public static boolean doSecKill(String uid,String prodid) throws IOException {

		JedisPool jedispool =  JedisPoolUtil.getJedisPoolInstance();
		Jedis jedis=jedispool.getResource();

		 //String sha1=  .secKillScript;
		String sha1=  jedis.scriptLoad(secKillScript);
		Object result= jedis.evalsha(sha1, 2, uid,prodid);

		  String reString=String.valueOf(result);
		if ("0".equals( reString )  ) {
			System.err.println("已抢空!!");
		}else if("1".equals( reString )  )  {
			System.out.println("抢购成功!!!!");
		}else if("2".equals( reString )  )  {
			System.err.println("该用户已抢过!!");
		}else{
			System.err.println("抢购异常!!");
		}
		jedis.close();
		return true;
	}
}

数据持久化

RDB

简介

在指定的时间间隔内将内存中的数据集快照写入磁盘

Redis会单独创建(fork)一个子进程来进行持久化,会先将数据写入到 一个临时文件中,待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件。 整个过程中,主进程是不进行任何IO操作的,这就确保了极高的性能 如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那RDB方式要比AOF方式更加的高效。RDB的缺点是最后一次持久化后的数据可能丢失。

image-image-20230119175810938

  • 在redis.conf中配置文件名称,默认为dump.rdb
  • rdb文件的保存路径,也可以修改。默认为Redis启动时命令行所在的目录下dir "/myredis/"

快照

image-image-20230119180227814

  • save :save时只管保存,其它不管,全部阻塞。手动保存。不建议。

  • bgsave:Redis会在后台异步进行快照操作, 快照同时还可以响应客户端请求。

  • 可以通过lastsave 命令获取最后一次成功执行快照的时间

  • 执行flushall命令,也会产生dump.rdb文件,但里面是空的

save命令

格式: save 秒钟 写操作次数

  • RDB是整个内存的压缩过的Snapshot,RDB的数据结构,可以配置复合的快照触发条件,默认是1分钟内改了1万次,或5分钟内改了10次,或15分钟内改了1次。

  • 禁用 不设置save指令,或者给save传入空字符串

stop-writes-on-bgsave-error

  • 当Redis无法写入磁盘的话,直接关掉Redis的写操作。推荐yes.

rdbcompression 压缩文件

  • 对于存储到磁盘中的快照,可以设置是否进行压缩存储。如果是的话,redis会采用LZF算法进行压缩。

  • 如果你不想消耗CPU来进行压缩的话,可以设置为关闭此功能。推荐yes

rdbchecksum 检查完整性

  • 在存储快照后,还可以让redis使用CRC64算法来进行数据校验,

  • 但是这样做会增加大约10%的性能消耗,如果希望获取到最大的性能提升,可以关闭此功能推荐yes.

优劣

优势:

  • 适合大规模的数据恢复

  • 对数据完整性和一致性要求不高更适合使用

  • 节省磁盘空间

  • 恢复速度快

劣势:

  • Fork的时候,内存中的数据被克隆了一份,大致2倍的膨胀性需要考虑

  • 虽然Redis在fork时使用了写时拷贝技术,但是如果数据庞大时还是比较消耗性能。

  • 在备份周期在一定间隔时间做一次备份,所以如果Redis意外down掉的话,就会丢失最后一次快照后的所有修改。

停止

动态停止RDB:

redis-cli config set save ""#save后给空值,表示禁用保存策略

AOF

简介

Append Only File ,以日志的形式来记录每个写操作(增量保存),将Redis执行过的所有写指令记录下来(读操作不记录), 只许追加文件但不可以改写文件,redis启动之初会读取该文件重新构建数据,换言之,redis 重启的话就根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作

持久化流程

(1)客户端的请求写命令会被append追加到AOF缓冲区内;

(2)AOF缓冲区根据AOF持久化策略[always,everysec,no]将操作sync同步到磁盘的AOF文件中;

(3)AOF文件大小超过重写策略或手动重写时,会对AOF文件rewrite重写,压缩AOF文件容量;

(4)Redis服务重启时,会重新load加载AOF文件中的写操作达到数据恢复的目的;

AOF默认不开启

  • 可以在redis.conf中配置文件名称,默认为 appendonly.aof

  • AOF文件的保存路径,同RDB的路径一致。

AOF和RDB同时开启,redis听谁的?

AOF和RDB同时开启,系统默认取AOF的数据(数据不会存在丢失)

启动/修复/恢复

AOF的备份机制和性能虽然和RDB不同, 但是备份和恢复的操作同RDB一样,都是拷贝备份文件,需要恢复时再拷贝到Redis工作目录下,启动系统即加载。

  • 正常恢复

    • 修改默认的appendonly no,改为yes
    • 将有数据的aof文件复制一份保存到对应目录(查看目录:config get dir)
    • 恢复:重启redis然后重新加载
  • 异常恢复

    • 修改默认的appendonly no,改为yes
    • 如遇到AOF文件损坏,通过/usr/local/bin/redis-check-aof–fix appendonly.aof进行恢复
    • 备份被写坏的AOF文件
    • 恢复:重启redis,然后重新加载

同步频率

  • appendfsync always

始终同步,每次Redis的写入都会立刻记入日志;性能较差但数据完整性比较好

  • appendfsync everysec

每秒同步,每秒记入日志一次,如果宕机,本秒的数据可能丢失。

  • appendfsync no

redis不主动进行同步,把同步时机交给操作系统。

压缩

​ AOF采用文件追加方式,文件会越来越大为避免出现此种情况,新增了重写机制, 当AOF文件的大小超过所设定的阈值时,Redis就会启动AOF文件的内容压缩, 只保留可以恢复数据的最小指令集.可以使用命令bgrewriteaof

重写原理,如何实现重写

AOF文件持续增长而过大时,会fork出一条新进程来将文件重写(也是先写临时文件最后再rename),redis4.0版本后的重写,是指上就是把rdb 的快照,以二级制的形式附在新的aof头部,作为已有的历史数据,替换掉原来的流水账操作。

no-appendfsync-on-rewrite:

如果 no-appendfsync-on-rewrite=yes ,不写入aof文件只写入缓存,用户请求不会阻塞,但是在这段时间如果宕机会丢失这段时间的缓存数据。(降低数据安全性,提高性能)

如果 no-appendfsync-on-rewrite=no, 还是会把数据往磁盘里刷,但是遇到重写操作,可能会发生阻塞。(数据安全,但是性能降低)

触发机制,何时重写

Redis会记录上次重写时的AOF大小,默认配置是当AOF文件大小是上次rewrite后大小的一倍且文件大于64M时触发

重写虽然可以节约大量磁盘空间,减少恢复时间。但是每次重写还是有一定的负担的,因此设定Redis要满足一定条件才会进行重写。

  • auto-aof-rewrite-percentage:设置重写的基准值,文件达到100%时开始重写(文件是原来重写后文件的2倍时触发)

  • auto-aof-rewrite-min-size:设置重写的基准值,最小文件64MB。达到这个值开始重写。

image-image-20230119181510041

优劣

优势:

  • 备份机制更稳健,丢失数据概率更低。

  • 可读的日志文本,通过操作AOF稳健,可以处理误操作。

劣势:

  • 比起RDB占用更多的磁盘空间。

  • 恢复备份速度要慢。

  • 每次读写都同步的话,有一定的性能压力。

  • 存在个别Bug,造成恢复不能。

总结

如果对数据不敏感,可以选单独用RDB。

不建议单独用 AOF,因为可能会出现Bug。

如果只是做纯内存缓存,可以都不用。

主从复制

简介

  • 主机数据更新后根据配置和策略, 自动同步到备机的master/slaver机制,Master以写为主,Slave以读为主

  • 读写分离,性能扩展

  • 容灾快速恢复

image-image-20230119181816082

使用

  • 拷贝多个redis.conf文件include(写绝对路径)

  • 开启daemonize yes

  • Pid文件名字pidfile

  • 指定端口port

  • Log文件名字

  • dump.rdb名字dbfilename

  • Appendonly 关掉或者换名字

实例

  • 自定义文件夹
cd ~
mkdir redistest
cd redistest
  • 新建redis6379.conf,填写以下内容
include /root/redis/redis-6.0.8/redis.conf
pidfile /var/run/redis_6379.pid 
port 6379
dbfilename dump6379.rdb
  • 新建redis6380.conf,填写以下内容
include /root/redis/redis-6.0.8/redis.conf
pidfile /var/run/redis_6380.pid
port 6380
dbfilename dump6380.rdb
  • 新建redis6381.conf,填写以下内容
include /root/redis/redis-6.0.8/redis.conf
pidfile /var/run/redis_6381.pid
port 6381
dbfilename dump6381.rdb

slave-priority 10

设置从机的优先级,值越小,优先级越高,用于选举主机时使用。默认100

  • 启动三台服务器
redis-server redis6379.conf
redis-server redis6380.conf
redis-server redis6381.conf
  • 查看系统进程
ps-ef|grep redis
redis-cli -p 6379
redis-cli -p 6380
redis-cli -p 6381
  • info replication

打印主从复制的相关信息

  • 配置从库
slaveof  <ip><port>
#在6380和6381上执行: 
slaveof 127.0.0.1 6379

成为某个实例的从服务器

127.0.0.1:6379> info replication
# Replication
role:master

127.0.0.1:6380> info replication
# Replication
role:slave

127.0.0.1:6381> info replication
# Replication
role:slave
  • 在主机上写,在从机上可以读取数据 , 在从机上写数据报错
127.0.0.1:6379> set keytest "test"
OK
127.0.0.1:6379> get keytest
"test"

127.0.0.1:6380> set keytest "test"
(error) READONLY You can't write against a read only replica.
127.0.0.1:6380> get keytest
"test"

127.0.0.1:6381> set keytest "test"
(error) READONLY You can't write against a read only replica.
127.0.0.1:6381> get keytest
"test"
  • 主机挂掉,重启就行,一切如初
  • 从机重启需重设:slaveof 127.0.0.1 6379
  • 可以将配置增加到文件中。永久生效。

相关机制

薪火相传

​ 上一个Slave可以是下一个slave的Master,Slave同样可以接收其他 slaves的连接和同步请求,那么该slave作为了链条中下一个的master, 可以有效减轻master的写压力,去中心化降低风险。

slaveof <ip><port>

中途变更转向:会清除之前的数据,重新建立拷贝最新的

风险是一旦某个slave宕机,后面的slave都没法备份

主机挂了,从机还是从机,无法写数据了

反客为主

  • 当一个master宕机后,后面的slave可以立刻升为master,其后面的slave不用做任何修改。

  • 用 slaveof no one 将从机变为主机

复制原理

RDB

  • Slave启动成功连接到master后会发送一个sync命令

  • Master接到命令启动后台的存盘进程,同时收集所有接收到的用于修改数据集命令, 在后台进程执行完毕之后,master将传送整个数据文件到slave,以完成一次完全同步

  • 全量复制:而slave服务在接收到数据库文件数据后,将其存盘并加载到内存中。

  • 增量复制:Master继续将新的所有收集到的修改命令依次传给slave,完成同步

  • 但是只要是重新连接master,一次完全同步(全量复制)将被自动执行

哨兵机制

实验

反客为主的自动版,能够后台监控主机是否故障,如果故障了根据投票数自动将从库转换为主库

  • 自定义的目录新建sentinel.conf
root@ubuntu:~/redistest# nano sentinel.conf

sentinel monitor mymaster 127.0.0.1 6379 1

其中mymaster为监控对象起的服务器名称, 1 为至少有多少个哨兵同意迁移的数量

  • 执行
root@ubuntu:~/redistest# redis-sentinel  /root/redistest/sentinel.conf

28760:X 19 Jan 2023 11:12:28.879 # Sentinel ID is 91f464c1f8c39d8078b50cf4fdb12f3447dca1d4
28760:X 19 Jan 2023 11:12:28.879 # +monitor master mymaster 127.0.0.1 6379 quorum 1
28760:X 19 Jan 2023 11:12:28.880 * +slave slave 127.0.0.1:6380 127.0.0.1 6380 @ mymaster 127.0.0.1 6379

测试

127.0.0.1:6379> shutdown
not connected>

127.0.0.1:6380> info replication
# Replication
role:master

127.0.0.1:6381> info replication
# Replication
role:slave

哨兵窗口现象

可观察到6380成为新的主机,根据优先级 slave-priority

28760:X 19 Jan 2023 11:12:28.884 * +slave slave 127.0.0.1:6381 127.0.0.1 6381 @ mymaster 127.0.0.1 6379
28760:X 19 Jan 2023 11:14:12.986 # +sdown master mymaster 127.0.0.1 6379
28760:X 19 Jan 2023 11:14:12.986 # +odown master mymaster 127.0.0.1 6379 #quorum 1/1
28760:X 19 Jan 2023 11:14:12.986 # +new-epoch 1
28760:X 19 Jan 2023 11:14:12.986 # +try-failover master mymaster 127.0.0.1 6379
28760:X 19 Jan 2023 11:14:12.991 # +vote-for-leader 91f464c1f8c39d8078b50cf4fdb12f3447dca1d4 1
28760:X 19 Jan 2023 11:14:12.991 # +elected-leader master mymaster 127.0.0.1 6379
28760:X 19 Jan 2023 11:14:12.991 # +failover-state-select-slave master mymaster 127.0.0.1 6379
28760:X 19 Jan 2023 11:14:13.049 # +selected-slave slave 127.0.0.1:6380 127.0.0.1 6380 @ mymaster 127.0.0.1 6379
28760:X 19 Jan 2023 11:14:13.049 * +failover-state-send-slaveof-noone slave 127.0.0.1:6380 127.0.0.1 6380 @ mymaster 127.0.0.1 6379
28760:X 19 Jan 2023 11:14:13.140 * +failover-state-wait-promotion slave 127.0.0.1:6380 127.0.0.1 6380 @ mymaster 127.0.0.1 6379
28760:X 19 Jan 2023 11:14:14.118 # +promoted-slave slave 127.0.0.1:6380 127.0.0.1 6380 @ mymaster 127.0.0.1 6379
28760:X 19 Jan 2023 11:14:14.118 # +failover-state-reconf-slaves master mymaster 127.0.0.1 6379
28760:X 19 Jan 2023 11:14:14.202 * +slave-reconf-sent slave 127.0.0.1:6381 127.0.0.1 6381 @ mymaster 127.0.0.1 6379
28760:X 19 Jan 2023 11:14:15.141 * +slave-reconf-inprog slave 127.0.0.1:6381 127.0.0.1 6381 @ mymaster 127.0.0.1 6379
28760:X 19 Jan 2023 11:14:15.141 * +slave-reconf-done slave 127.0.0.1:6381 127.0.0.1 6381 @ mymaster 127.0.0.1 6379
28760:X 19 Jan 2023 11:14:15.203 # +failover-end master mymaster 127.0.0.1 6379
28760:X 19 Jan 2023 11:14:15.204 # +switch-master mymaster 127.0.0.1 6379 127.0.0.1 6380
28760:X 19 Jan 2023 11:14:15.204 * +slave slave 127.0.0.1:6381 127.0.0.1 6381 @ mymaster 127.0.0.1 6380
28760:X 19 Jan 2023 11:14:15.204 * +slave slave 127.0.0.1:6379 127.0.0.1 6379 @ mymaster 127.0.0.1 6380
28760:X 19 Jan 2023 11:14:45.242 # +sdown slave 127.0.0.1:6379 127.0.0.1 6379 @ mymaster 127.0.0.1 6380

重启6379主机,发现成为从机

root@ubuntu:~/redistest# redis-server redis6379.conf
root@ubuntu:~/redistest# redis-cli -p 6379
127.0.0.1:6379> info replication
# Replication
role:slave

image-image-20230119192055463

  • 优先级在redis.conf中默认:slave-priority 100,值越小优先级越高

  • 偏移量是指获得原主机数据最全的

  • 每个redis实例启动后都会随机生成一个40位的runid

复制延时

​ 由于所有的写操作都是先在Master上操作,然后同步更新到Slave上,所以从Master同步到Slave机器有一定的延迟,当系统很繁忙的时候,延迟问题会更加严重,Slave机器数量的增加也会使这个问题更加严重。

Jedis开发

private static JedisSentinelPool jedisSentinelPool=null;
public static  Jedis getJedisFromSentinel(){
    if(jedisSentinelPool==null){
        Set<String> sentinelSet=new HashSet<>();
        sentinelSet.add("192.168.11.103:26379");
        JedisPoolConfig jedisPoolConfig =new JedisPoolConfig();
        jedisPoolConfig.setMaxTotal(10); //最大可用连接数
        jedisPoolConfig.setMaxIdle(5); //最大闲置连接数
        jedisPoolConfig.setMinIdle(5); //最小闲置连接数
        jedisPoolConfig.setBlockWhenExhausted(true); //连接耗尽是否等待
        jedisPoolConfig.setMaxWaitMillis(2000); //等待时间
        jedisPoolConfig.setTestOnBorrow(true); //取连接的时候进行一下测试 ping pong
    	jedisSentinelPool=new JedisSentinelPool("mymaster",sentinelSet,jedisPoolConfig);
    	return jedisSentinelPool.getResource();
    }else{
        return jedisSentinelPool.getResource();
    }
}

集群

简介

redis3.0中提供了无中心化集群配置

Redis 集群实现了对Redis的水平扩容,即启动N个redis节点,将整个数据库分布存储在这N个节点中,每个节点存储总数据的1/N。

Redis 集群通过分区(partition)来提供一定程度的可用性(availability): 即使集群中有一部分节点失效或者无法进行通讯, 集群也可以继续处理命令请求。

配置

  • 删除持久化数据

将rdb,aof文件都删除掉

  • 制作6个实例,6379,6380,6381,6389,6390,6391
  • 配置节点信息
开启daemonize yes
Pid文件名字
指定端口
Log文件名字
Dump.rdb名字
Appendonly 关掉或者换名字
  • redis cluster配置修改,配置六个实例

cluster-enabled yes 打开集群模式

cluster-config-file nodes-6379.conf 设定节点配置文件名

cluster-node-timeout 15000 设定节点失联时间,超过该时间(毫秒),集群自动进行主从切换。

include /home/bigdata/redis.conf
port 6379
pidfile "/var/run/redis_6379.pid"
dbfilename "dump6379.rdb"
dir "/home/bigdata/redis_cluster"
logfile "/home/bigdata/redis_cluster/redis_err_6379.log"
cluster-enabled yes
cluster-config-file nodes-6379.conf
cluster-node-timeout 15000
  • 启动6个redis服务

image-image-20230119193333951

ll命令 组合之前,请确保所有redis实例启动后,nodes-xxxx.conf文件都生成正常

  • 进入redis src目录
redis-cli --cluster create --cluster-replicas 1 192.168.11.101:6379 192.168.11.101:6380 192.168.11.101:6381 192.168.11.101:6389 192.168.11.101:6390 192.168.11.101:6391

此处不要用127.0.0.1, 请用真实IP地址

–replicas 1 采用最简单的方式配置集群,一台主机,一台从机,正好三组。

登录

#普通方式登录
redis-cli -p 6379

​ 在redis-cli每次录入、查询键值,redis都会计算出该key应该送往的插槽,如果不是该客户端对应服务器的插槽,redis会报错,并告知应前往的redis实例地址和端口所以,应该以集群方式登录 , 再录入、查询键值对可以自动重定向

#-c 采用集群策略连接,设置数据会自动切换到相应的写主机
redis-cli -c -p 6379

cluster nodes

通过 cluster nodes 命令查看集群信息

  • 一个集群至少要有三个主节点。

  • 选项 --cluster-replicas 1 表示我们希望为集群中的每个主节点创建一个从节点。

  • 分配原则尽量保证每个主数据库运行在不同的IP地址,每个从库和主库不在一个IP地址上。

slots

  • 一个 Redis 集群包含 16384 个插槽(hash slot), 数据库中的每个键都属于这 16384 个插槽的其中一个,

  • 集群使用公式 CRC16(key) % 16384 来计算键 key 属于哪个槽, 其中 CRC16(key) 语句用于计算键 key 的 CRC16 校验和 。

  • 集群中的每个节点负责处理一部分插槽。

举个例子, 如果一个集群可以有主节点, 其中:

节点 A 负责处理 0 号至 5460 号插槽。

节点 B 负责处理 5461 号至 10922 号插槽。

节点 C 负责处理 10923 号至 16383 号插槽。

录入值

  • 不在一个slot下的键值,是不能使用mget,mset等多键操作。

  • 可以通过{}来定义组的概念,从而使key中{}内相同内容的键值对放到一个slot中去。

image-image-20230119194146461

查询值

CLUSTER GETKEYSINSLOT <slot><count> #返回 count 个 slot 槽中的键

image-image-20230119194332569

故障恢复

  • 如果主节点下线, 从节点能否自动升为主节点?

15秒超时

  • 主节点恢复后,主从关系会如何?

主节点回来变成从机

  • 如果所有某一段插槽的主从节点都宕掉,redis服务是否还能继续?

如果某一段插槽的主从都挂掉,而cluster-require-full-coverage 为yes ,那么 ,整个集群都挂掉

如果某一段插槽的主从都挂掉,而cluster-require-full-coverage 为no ,那么,该插槽数据全都不能使用,也无法存储。

redis.conf中的参数 cluster-require-full-coverage

Jedis开发

  • 即使连接的不是主机,集群会自动切换主机存储。主机写,从机读。

  • 无中心化主从集群。无论从哪台主机写的数据,其他主机上都能读到数据。

public class JedisClusterTest {
  public static void main(String[] args) { 
     Set<HostAndPort>set =new HashSet<HostAndPort>();
     set.add(new HostAndPort("192.168.31.211",6379));
     JedisCluster jedisCluster=new JedisCluster(set);
     jedisCluster.set("k1", "v1");
     System.out.println(jedisCluster.get("k1"));
  }
}

优劣

好处:

实现扩容

分摊压力

无中心配置相对简单

坏处:

多键操作是不被支持的

多键的Redis事务是不被支持的。lua脚本不被支持

由于集群方案出现较晚,很多公司已经采用了其他的集群方案,而代理或者客户端分片的方案想要迁移至redis cluster,需要整体迁移而不是逐步过渡,复杂度较大。

应用问题

缓存穿透

问题描述

​ key对应的数据在数据源并不存在,每次针对此key的请求从缓存获取不到,请求都会压到数据源,从而可能压垮数据源。比如用一个不存在的用户id获取用户信息,不论缓存还是数据库都没有,若黑客利用此漏洞进行攻击可能压垮数据库。

image-image-20230119195010575

解决方案

​ 一个一定不存在缓存及查询不到的数据,由于缓存是不命中时被动写的,并且出于容错考虑,如果从存储层查不到数据则不写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。

(1) **对空值缓存:**如果一个查询返回的数据为空(不管是数据是否不存在),我们仍然把这个空结果(null)进行缓存,设置空结果的过期时间会很短,最长不超过五分钟

(2) 设置可访问的名单(白名单):

使用bitmaps类型定义一个可以访问的名单,名单id作为bitmaps的偏移量,每次访问和bitmap里面的id进行比较,如果访问id不在bitmaps里面,进行拦截,不允许访问。

(3) 采用布隆过滤器:(布隆过滤器(Bloom Filter)是1970年由布隆提出的。它实际上是一个很长的二进制向量(位图)和一系列随机映射函数(哈希函数)。

布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。将所有可能存在的数据哈希到一个足够大的bitmaps中,一个一定不存在的数据会被 这个bitmaps拦截掉,从而避免了对底层存储系统的查询压力。

(4) **进行实时监控:**当发现Redis的命中率开始急速降低,需要排查访问对象和访问的数据,和运维人员配合,可以设置黑名单限制服务

缓存击穿

问题描述

​ key对应的数据存在,但在redis中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端DB加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端DB压垮。

image-image-20230119195408011

解决方案

key可能会在某些时间点被超高并发地访问,是一种非常“热点”的数据。这个时候,需要考虑一个问题:缓存被“击穿”的问题。

(1) 预先设置热门数据:在redis高峰访问之前,把一些热门数据提前存入到redis里面,加大这些热门数据key的时长

(2) 实时调整:现场监控哪些数据热门,实时调整key的过期时长

(3) 使用锁:

  • 就是在缓存失效的时候(判断拿出来的值为空),不是立即去load db。

  • 先使用缓存工具的某些带成功操作返回值的操作(比如Redis的SETNX)去set一个mutex key

  • 当操作返回成功时,再进行load db的操作,并回设缓存,最后删除mutex key;

  • 当操作返回失败,证明有线程在load db,当前线程睡眠一段时间再重试整个get缓存的方法。

image-image-20230119195704614

缓存雪崩

问题描述

​ key对应的数据存在,但在redis中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端DB加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端DB压垮。

缓存雪崩与缓存击穿的区别在于这里针对很多key缓存,前者则是某一个key

image-image-20230119195810264

缓存失效瞬间

image-image-20230119195845743

解决方案

(1) **构建多级缓存架构:**nginx缓存 + redis缓存 +其他缓存(ehcache等)

(2) 使用锁或队列:

用加锁或者队列的方式保证来保证不会有大量的线程对数据库一次性进行读写,从而避免失效时大量的并发请求落到底层存储系统上。不适用高并发情况

(3) 设置过期标志更新缓存:

记录缓存数据是否过期(设置提前量),如果过期会触发通知另外的线程在后台去更新实际key的缓存。

(4) 将缓存失效时间分散开:

比如我们可以在原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。

分布式锁

概述

​ 随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!

分布式锁主流的实现方案:

  • 基于数据库实现分布式锁

  • 基于缓存(Redis等)

  • 基于Zookeeper

每一种分布式锁解决方案都有各自的优缺点:

  • 性能:redis最高

  • 可靠性:zookeeper最高

实现

redis:命令

set sku:1:info “OK” NX PX 10000

EX second :设置键的过期时间为 second 秒。 SET key value EX second 效果等同于 SETEX key second value 。

PX millisecond :设置键的过期时间为 millisecond 毫秒。 SET key value PX millisecond 效果等同于 PSETEX key millisecond value 。

NX :只在键不存在时,才对键进行设置操作。 SET key value NX 效果等同于 SETNX key value 。

XX :只在键已经存在时,才对键进行设置操作。

image-image-20230119200435706

  • 多个客户端同时获取锁(setnx)

  • 获取成功,执行业务逻辑{从db获取数据,放入缓存},执行完成释放锁(del)

  • 其他客户端等待重试

#Redis: 
set num 0
@GetMapping("testLock")
public void testLock(){
    //1获取锁,setne
    Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111");
    //2获取锁成功、查询num的值
    if(lock){
        Object value = redisTemplate.opsForValue().get("num");
        //2.1判断num为空return
        if(StringUtils.isEmpty(value)){
            return;
        }
        //2.2有值就转成成int
        int num = Integer.parseInt(value+"");
        //2.3把redis的num加1
        redisTemplate.opsForValue().set("num", ++num);
        //2.4释放锁,del
        redisTemplate.delete("lock");

    }else{
        //3获取锁失败、每隔0.1秒再获取
        try {
            Thread.sleep(100);
            testLock();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

问题:setnx刚好获取到锁,业务逻辑出现异常,导致锁无法释放

解决:设置过期时间,自动释放锁。

设置过期时间

设置过期时间有两种方式:

  • 首先想到通过expire设置过期时间(缺乏原子性:如果在setnx和expire之间出现异常,锁也无法释放)

  • 在set时指定过期时间(推荐)

image-image-20230119200640212

image-image-20230119200648145

问题:可能会释放其他服务器的锁。

解决:setnx获取锁时,设置一个指定的唯一值(例如:uuid);释放前获取这个值,判断是否自己的锁

image-image-20230119200748378

image-image-20230119200743696

问题:删除操作缺乏原子性。

脚本保证删除的原子性

@GetMapping("testLockLua")
public void testLockLua() {
    //1 声明一个uuid ,将做为一个value 放入我们的key所对应的值中
    String uuid = UUID.randomUUID().toString();
    //2 定义一个锁:lua 脚本可以使用同一把锁,来实现删除!
    String skuId = "25"; // 访问skuId 为25号的商品 100008348542
    String locKey = "lock:" + skuId; // 锁住的是每个商品的数据
// 3 获取锁
    Boolean lock = redisTemplate.opsForValue().setIfAbsent(locKey, uuid, 3, TimeUnit.SECONDS);

    // 第一种: lock 与过期时间中间不写任何的代码。
    // redisTemplate.expire("lock",10, TimeUnit.SECONDS);//设置过期时间
    // 如果true
    if (lock) {
        // 执行的业务逻辑开始
        // 获取缓存中的num 数据
        Object value = redisTemplate.opsForValue().get("num");
        // 如果是空直接返回
        if (StringUtils.isEmpty(value)) {
            return;
        }
        // 不是空 如果说在这出现了异常! 那么delete 就删除失败! 也就是说锁永远存在!
        int num = Integer.parseInt(value + "");
        // 使num 每次+1 放入缓存
        redisTemplate.opsForValue().set("num", String.valueOf(++num));
        /*使用lua脚本来锁*/
        // 定义lua 脚本
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        // 使用redis执行lua执行
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
        redisScript.setScriptText(script);
        // 设置一下返回值类型 为Long
        // 因为删除判断的时候,返回的0,给其封装为数据类型。如果不封装那么默认返回String 类型,
        // 那么返回字符串与0 会有发生错误。
        redisScript.setResultType(Long.class);
        // 第一个要是script 脚本 ,第二个需要判断的key,第三个就是key所对应的值。
        redisTemplate.execute(redisScript, Arrays.asList(locKey), uuid);
    } else {
        // 其他线程等待
        try {
            // 睡眠
            Thread.sleep(1000);
            // 睡醒了之后,调用方法。
            testLockLua();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Redis 6.0

ACL

Redis ACL是Access Control List(访问控制列表)的缩写,该功能允许根据可以执行的命令和可以访问的键来限制某些连接。

在Redis 5版本之前,Redis 安全规则只有密码控制 还有通过rename 来调整高危命令比如 flushdb , KEYS* , shutdown 等。Redis 6 则提供ACL的功能对用户进行更细粒度的权限控制 :

(1)接入权限:用户名和密码

(2)可以执行的命令

(3)可以操作的 KEY

参考官网

IO多线程

Redis6终于支撑多线程了,告别单线程了吗?

IO多线程其实指客户端交互部分网络IO交互处理模块多线程,而非执行命令多线程。Redis6执行命令依然是单线程。

另外,多线程IO默认也是不开启的,需要再配置文件中配置

io-threads-do-reads yes 

io-threads 4