【Redis】笔记(尚硅谷、黑马整合)
阅读原文时间:2021年04月20日阅读:1

`笔记内容包括两个视频的笔记:

Redis—尚硅谷java研究院

(推荐)Redis入门到精通【黑马程序员】https://www.bilibili.com/video/BV1CJ411m7Gc

第1章 NoSQL 简介

REmote Dictionary Server:是一种用C语言开发的开源的高性能键值对数据库。

1.1 技术的分类

  1. 解决功能性的问题

java、Servlet、Jsp、Tomcat、RDBMS、JDBC、Linux、Svn 等

  1. 解决扩展性的问题

Spring、 SpringMVC、SpringBoot、Hibernate、MyBatis等

  1. 解决性能的问题

NoSQL、java多线程、Nginx、MQ、ElasticSearch、Hadoop等

1.2 WEB1.0 及WEB2.0

  1. Web1.0的时代,数据访问量很有限,用一夫当关的高性能的单节点服务器可以解决大部分问题.

  1. Web2.0时代的到来,用户访问量大幅度提升,同时产生了大量的用户数据,加上后来的智能移动设备的普及,所有的互联网平台都面临了巨大的性能挑战.

1.3 解决服务器CPU内存压力

思考: Session共享问题如何解决?

  • 方案一、存在Cookie中

此种方案需要将Session数据以Cookie的形式存在客户端,不安全,网络负担效率低

  • 方案二、存在文件服务器或者是数据库里

此种方案会导致大量的IO操作,效率低.

  • 方案三、Session复制

此种方案会导致每个服务器之间必须将Session广播到集群内的每个节点,Session数据会冗余,节点越多浪费越大,存在广播风暴问题.

  • 方案四、存在Redis中

目前来看,此种方案是最好的。将Session数据存在内存中,每台服务器都从内存中读取数据,速度快,结构还相对简单.

1.4 解决IO压力

将活跃的数据缓存到Redis中,客户端的请求先打到缓存中来获取对应的数据,如果能获取到,直接返回,不需要从MySQL中读取。如果缓存中没有,再从MySQL数据库中读取数据,将读取的数据返回并存一份到Redis中,方便下次读取.

扩展: 对于持久化的数据库来说,单个库单个表存在性能瓶颈,因此会通过水平切分、垂直切分、读取分离等技术提升性能,此种解决方案会破坏一定的业务逻辑,但是可以换取更高的性能.

1.5 NoSQL数据库概述

  1. NoSQL(NoSQL = Not Only SQL ),意即==“不仅仅是SQL”,泛指非关系型的数据库。==
  • NoSQL 不依赖业务逻辑方式存储,而以简单的key-value模式存储。因此大大的增加了数据库的扩展能力。
  1. NoSQL的特点
  • 不遵循SQL标准
  • 不支持ACID(原子性,一致性,持久性,隔离性)
  • 远超于SQL的性能。
  1. NoSQL的适用场景
  • 对数据高并发的读写

  • 海量数据的读写

  • 对数据高可扩展性的

    目前 NoSQL 不能完全替代关系型数据库.使用关系型数据库结合 NoSQl 数据库进行完成项目
    2.3.1 当数据比较复杂时不适用于 NoSQL 数据库
    2.3.2 关系型数据库依然做为数据存储的主要软件.
    2.3.3 NoSQL 数据库当作缓存工具来使用.
    2.3.3.1 把某些使用频率较高的内容不仅仅存储到关系型数据库中还存储到 NoSQL 数据中
    2.3.3.2 考虑到: NoSQL 和关系型数据库数据同步的问题

    Redis 持久化策略
    3.1 rdb持久化策略
    3.1.1 默认的持久化策略.
    3.1.2 每隔一定时间后把内存中数据持久化到 dump.rdb 文件中

    3.1.3 缺点:

    3.1.3.1 数据过于集中.(都存储到了一个文件中)
    3.1.3.2 可能导致最后的数据没有持久化到 dump.rdb 中(因为是每隔一段时间)
    3.1.3.2.1 解决办法:使用命令:SAVE 或BGSAVE 手动持久
    化.
    3.2 aof持久化策略
    3.2.1 监听 Redis 的日志文件,监听如果发现执行了修改,删除,
    新增命令.立即根据这条命令把数据持久化.
    3.2.2 缺点:
    3.2.2.1 效率降低

  1. NoSQL的不适用场景
  • 需要事务支持
  • 基于sql的结构化查询存储,处理复杂的关系,需要即席查询。
  1. 建议: 用不着sql的和用了sql也不行的情况,请考虑用NoSql

1.6 常用的缓存数据库

  1. Memcached

  1. Redis

  1. mongoDB

  1. 列式数据库
  • 先看行式数据库

思考: 如下两条SQL的快慢

​ select * from users where id =3(快)

​ select avg(age) from users(慢)需要先查行年龄,再平均

  • 再看列式数据库

列式数据库对查平均快

  1. HBase

  1. Cassandra

  1. Neo4j

1.7 数据库排名

http://db-engines.com/en/ranking

第2章 Redis简介 及 安装

2.1 Redis是什么

简单来说 redis 就是一个数据库,不过与传统数据库不同的是 redis 的数据是存在内存中的,所以读写速度非常快,因此 redis 被广泛应用于缓存方向。另外,redis 也经常用来做分布式锁。redis 提供了多种数据类型来支持不同的业务场景。除此之外,redis 支持事务 、持久化、LUA脚本、LRU驱动事件、多种集群方案。

Redis是一个开源的key-value存储系统。

是完全开源免费的,用c语言编写的,是一个单线程,高性能的(key/value)内存数据库,基于内存运行并支持持久化的nosql数据库

和Memcached类似,它支持存储的value类型相对更多,包括string(字符串)、list(链表)、set(集合)、zset(sorted set --有序集合)和hash(哈希类型)。这些数据类型都支持push/pop、add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的。在此基础上,Redis支持各种不同方式的排序。

与memcached一样,为了保证效率,数据都是缓存在内存中。区别的是Redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave(主从)同步。

2.2 Redis的应用场景

  1. 配合关系型数据库做高速缓存
  • 高频次,热门访问的数据,降低数据库IO
  1. 由于其拥有持久化能力,利用其多样的数据结构存储特定的数据
  • 最新N个数据→通过List实现按自然事件排序的数据
  • 排行榜,TopN→利用zset(有序集合)
  • 时效性的数据,比如手机验证码→Expire过期
  • 计数器,秒杀→原子性,自增方法INCR、DECR
  • 去除大量数据中的重复数据→利用set集合
  • 构建队列→利用list集合
  • 发布订阅消息系统→pub/sub模式

2.3 Redis官网

  1. Redis官方网站 http://Redis.io

  2. Redis中文官方网站 http://www.Redis.net.cn

手册网址: http://doc.redisfans.com/

2.4 关于Redis版本

  1. 3.2.5 for Linux

  2. 不用考虑在Windows环境下对Redis的支持

Redis官方没有提供对Windows环境的支持,是微软的开源小组开发了对Redis对Windows的支持.

2.5 安装步骤

1)    下载获得redis-3.2.5.tar.gz后将它放入我们的Linux目录/opt
wget http://......tar.gz
2)    解压命令:`tar -zxvf redis-3.2.5.tar.gz`

3)    解压完成后进入目录:`cd redis-3.2.5`

4)    在redis-3.2.5目录下执行make命令
编译:make
cd src
安装:make install 
或make PREFIX=/usr/local/redis install
# PREFIX= 这个关键字的作用是编译的时候用于指定程序存放的路径。比如我们现在就是指定了redis必须存放在/usr/local/redis目录。假设不添加该关键字Linux会将可执行文件存放在/usr/local/bin目录,

5)执行
默认生成的可执行文件在/usr/local/bin中
/usr/local/redis/bin/redis-server & ./redis.conf

创建软链接
ln -s 原始目录名 快速访问目录名
  1. 安装gcc与g++

运行Make命令时出现错误,提示 gcc:命令未找到 ,原因是因为当前Linux环境中并没有安装gcc 与 g++ 的环境

  • 能上网的情况:

yum install gcc

yum install gcc-c++

  1. 重新进入到Redis的目录中执行 make distclean后再执行make 命令.

    Hint: It's a good idea to run 'make test' ;)

  2. 执行完make后,可跳过Redis test步骤,直接执行 make install

    INSTALL install
    INSTALL install
    INSTALL install
    INSTALL install
    INSTALL install
    make[1]: Leaving directory '/home/ftp/redis/redis-3.0.4/src'

2.6 查看默认安装目录 /usr/local/bin

放到这个目录下之后,可以在任何目录下访问。

  1. Redis-benchmark:性能测试工具,可以在自己本子运行,看看自己本子性能如何(服务启动起来后执行)

  2. Redis-check-aof:修复有问题的AOF文件,rdb和aof后面讲

  3. Redis-check-dump:修复有问题的dump.rdb文件

  4. Redis-sentinel:Redis集群使用

  5. Redis-server:前台启动Redis

                _._                                                  
           _.-``__ ''-._                                             
      _.-``    `.  `_.  ''-._           Redis 3.0.4 (00000000/0) 64 bit

    .-.-```. ```\/ _.,_ ''-._ ( ' , .-` | `, ) Running in standalone mode |`-._`-...-` __...-.-.|'_.-'| Port: 6379 |-. ._ / _.-' | PID: 26926 -._ -._-./ .-' .-'
    |-._-._ -.__.-' _.-'_.-'| |-.-._ _.-'_.-' | http://redis.io -. -._-..-'.-' .-' |-._-._ -.__.-' _.-'_.-'| |-.-._ _.-'_.-' | -. -._-..-'.-' .-'
    -._-..-' _.-' -._ _.-' -..-'

这是前台启动,我们要把他设置为后台启动。

  1. redis-cli:客户端,操作入口

2.7 Redis的启动

  1. 默认前台方式启动
  • 直接执行redis-server 即可启动后不能操作当前命令窗口
  1. 推荐后台方式启动
  • 拷贝一份redis.conf配置文件到其他目录,例如根目录下的myredis目录 /myredis

  • 修改redis.conf文件中的一项配置 daemonize 将no 改为yes,代表后台启动

  • 设置自己的端口号

  • 执行配置文件进行启动 执行 redis-server /myredis/redis.conf

  • ps -ef | grep 进程名

  • kill -s 9 进程号

    ouc-13 26926 22709 0 15:30 pts/4 00:00:00 redis-server *:6379
    ouc-13 29372 28429 0 15:49 pts/3 00:00:00 grep --color=auto redis
    端口号是6379

    daemonize yes
    #以守护进程方式启动,使用本启动方式, redis将以服务的形式存在,日志将不再打印到命令窗口中
    port 6379
    #设定当前服务启动端口号
    dir "/自定义目录/redis/data"
    #设定当前服务文件保存位置,包含日志文件、持久化文件(后面详细讲解)等
    logfile "6***.log“
    #设定日志文件名,便于查阅

启动多个redis

复制conf文件

改port

改pidfile

改logfile

改dir,每个不一样

集群配置:

cluster-enabled yes打开注释

cluster-config-file nodes-6379.conf打开注释并修改(可不改)

当你安装完成之后,你可以先执行 redis-server 让 Redis 启动起来,然后运行命令 redis-benchmark -n 100000 -q 来检测本地同时执行 10 万个请求时的性能:

redis-cli -h localhost -p 6379

2.9 关闭Redis服务

  1. 单实例关闭
  • 如果还未通过客户端访问,可直接 redis-cli shutdown
  • 如果已经进入客户端,直接 shutdown即可.
  1. 多实例关闭

l 指定端口关闭 redis-cli -p 端口号 shutdown

如何启动多个redis:

#默认配置启动
redis-server
redis-server –-port 6379
redis-server –-port 6380 ……
# 指定配置文件启动
redis-server redis.conf
redis-server redis-6379.conf
redis-server redis-6380.conf ……
redis-server conf/redis-6379.conf
redis-server config/redis-6380.conf ……
#默认连接
redis-cli
#连接指定服务器
redis-cli -h 127.0.0.1
redis-cli –port 6379
redis-cli -h 127.0.0.1 –port 6379

2.10 Redis 默认16个库

  1. Redis默认创建16个库,每个库对应一个下标,从0开始。通过客户端连接后默认进入到0 号库,推荐只使用0号库.

  2. 使用命令 select 库的下标 来切换数据库,例如 select 8

统一的密码管理:所有库都是同样密码,要么都OK,要么一个都连不上

2.11 Redis的单线程+多路IO复用技术

1.1流的概念

一个流可以文件、socket、pipe等可以进行IO操作的内核对象。不管是文件,还是套接字,还是管道,我们都可以把他们看作流。

中读取数据或者写入数据到流中,可能存在这样的情况:①读取数据时,流中还没有数据;②写入数据时,流中数据已经满了,没有空间写入了。典型的例子为客户端要从socket流中读入数据,但是服务器还没有把数据准备好。此时有两种处理办法:

  • 阻塞,等待数据准备好了,再读取出来返回;
  • 非阻塞,通过轮询的方式,查询是否有数据可以读取,直到把数据读取返回。

接下来再来了解以下I/O同步、异步、阻塞、非阻塞的概念。

  • 1.redis是基于内存的,内存的读写速度非常快(纯内存)。
  • 2.redis是单线程的,省去了很多上下文切换线程的时间(避免线程切换和竞态消耗)。
  • 3.redis使用多路复用技术,可以处理并发的连接(非阻塞IO即NIO)。

非阻塞IO 内部实现采用epoll,采用了epoll+自己实现的简单的事件框架。epoll中的读、写、关闭、连接都转化成了事件,然后利用epoll的多路复用特性,绝不在io上浪费一点时间。

  1. 多路复用是指使用一个线程来检查多个文件描述符(Socket)的就绪状态,比如调用select和poll函数,传入多个文件描述符,如果有一个文件描述符就绪,则返回,否则阻塞直到超时。得到就绪状态后进行真正的操作可以在同一个线程里执行,也可以启动线程执行(比如使用线程池)。

  2. Memcached 是 多线程 + 锁 ;Redis 是 单线程 + 多路IO复用.

1.阻塞IO, 给女神发一条短信, 说我来找你了, 然后就默默的一直等着女神下楼, 这个期间除了等待你不会做其他事情, 属于备胎做法.

2 非阻塞IO, 给女神发短信, 如果不回, 接着再发, 一直发到女神下楼, 这个期间你除了发短信等待不会做其他事情, 属于专一做法.

3 IO多路复用, 是找一个宿管大妈来帮你监视下楼的女生, 这个期间你可以些其他的事情. 例如可以顺便看看其他妹子,玩玩王者荣耀, 上个厕所等等. IO复用又包括 select, poll, epoll 模式. 那么它们的区别是什么?
3.1 select大妈 每一个女生下楼, select大妈都不知道这个是不是你的女神, 她需要一个一个询问, 并且select大妈能力还有限, 最多一次帮你监视1024个妹子。对应的编程模型就是:一个连接来了,就必须遍历所有已经注册的文件描述符,来找到那个需要处理信息的文件描述符,如果已经注册了几万个文件描述符,那会因为遍历这些已经注册的文件描述符,导致cpu爆炸。

3.2 poll大妈不限制盯着女生的数量, 只要是经过宿舍楼门口的女生, 都会帮你去问是不是你女神
3.3 epoll大妈不限制盯着女生的数量, 并且也不需要一个一个去问. 那么如何做呢? epoll大妈会为每个==进宿舍楼的女生脸上贴上一个大字条,==上面写上女生自己的名字, 只要女生下楼了, epoll大妈就知道这个是不是你女神了, 然后大妈再通知你.

https://www.zhihu.com/question/28594409

上面这些同步IO有一个共同点就是, 当女神走出宿舍门口的时候, 你已经站在宿舍门口等着女神的, 此时你属于阻塞状态

一个epoll场景:一个酒吧服务员(一个线程),前面趴了一群醉汉,突然一个吼一声“倒酒”(事件),你小跑过去给他倒一杯,然后随他去吧,突然又一个要倒酒,你又过去倒上,就这样一个服务员服务好多人,有时没人喝酒,服务员处于空闲状态,可以干点别的玩玩手机。至于epoll与select,poll的区别在于后两者的场景中醉汉不说话,你要挨个问要不要酒,没时间玩手机了。io多路复用大概就是指这几个醉汉共用一个服务员。

https://blog.csdn.net/happy_wu/article/details/80052617

接下来是异步IO的情况
你告诉女神我来了, 然后你就去王者荣耀了, 一直到女神下楼了, 发现找不见你了, 女神再给你打电话通知你, 说我下楼了, 你在哪呢? 这时候你才来到宿舍门口. 此时属于逆袭做法

为什么 Redis 中要使用 I/O 多路复用这种技术呢?

首先,Redis 是跑在单线程中的,所有的操作都是按照顺序线性执行的,但是由于读写操作等待用户输入或输出都是阻塞的,所以 I/O 操作在一般情况下往往不能直接返回,这会导致某一文件的 I/O 阻塞导致整个进程无法对其它客户提供服务,而 I/O 多路复用就是为了解决这个问题而出现的。

要弄清问题先要知道问题的出现原因

由于进程的执行过程是线性的(也就是顺序执行),当我们调用低速系统I/O(read,write,accept等等),进程可能阻塞,此时进程就阻塞在这个调用上,不能执行其他操作。阻塞很正常, 接下来考虑这么一个问题:一个服务器进程和一个客户端进程通信,服务器端read(sockfd1,bud,bufsize),此时客户端进程没有发送数据,那么read(阻塞调用)将阻塞直到客户端write(sockfd,but,size)发来数据。在一个客户和服务器通信时这没什么问题,当多个客户与服务器通信时,若服务器阻塞于其中一个客户sockfd1,当另一个客户的数据到达套接字sockfd2时,服务器仍不能处理,仍然阻塞在read(sockfd1,…)上。此时问题就出现了,不能及时处理另一个客户的服务,肿么办?I/O多路复用来解决!

继续上面的问题,有多个客户连接,sockfd1、sockfd2、sockfd3…sockfdn同时监听这n个客户,当其中有一个发来消息时就从select的阻塞中返回,然后就调用read读取收到消息的sockfd,然后又循环回select阻塞;这样就不会因为阻塞在其中一个上而不能处理另一个客户的消息。

Q:
那这样子,在读取socket1的数据时,如果其它socket有数据来,那么也要等到socket1读取完了才能继续读取其它socket的数据吧。那不是也阻塞住了吗?而且读取到的数据也要开启线程处理吧,那这和多线程I/O有什么区别呢?

A:
1.CPU本来就是线性的,不论什么都需要顺序处理,并行只能是多核CPU。

2.I/O多路复用本来就是用来解决对多个I/O监听时,一个I/O阻塞影响其他I/O的问题,跟多线程没关系。

3.跟多线程相比较,线程切换需要切换到内核进行线程切换,需要消耗时间和资源。而I/O多路复用不需要切换线/进程,效率相对较高,特别是对高并发的应用nginx就是用I/O多路复用,故而性能极佳。但多线程编程逻辑和处理上比I/O多路复用简单,而I/O多路复用处理起来较为复杂。

理解IO多路复用

什么是I/O 多路复用

关于I/O多路复用(又被称为“事件驱动”),首先要理解的是,操作系统为你提供了一个功能,当你的某个socket可读或者可写的时候,它可以给你一个通知。这样当配合非阻塞的socket使用时,只有当系统通知我哪个描述符可读了,我才去执行read操作,可以保证每次read都能读到有效数据而不做纯返回-1和EAGAIN的无用功。写操作类似。操作系统的这个功能通过select/poll/epoll/kqueue之类的系统调用函数来使用,这些函数都可以同时监视多个描述符的读写就绪状况,这样,多个描述符的I/O操作都能在一个线程内并发交替地顺序完成,这就叫I/O多路复用,这里的“复用”指的是复用同一个线程。

I/O 多路复用其实是在单个线程中通过记录跟踪每一个sock(I/O流) 的状态来管理多个I/O流。结合下图可以清晰地理解I/O多路复用。

select, poll, epoll 都是I/O多路复用的具体的实现。epoll性能比其他几者要好。redis中的I/O多路复用的所有功能通过包装常见的select、epoll、evport和kqueue这些I/O多路复用函数库来实现的。

Reactor(反应器模式)

IO多路复用模型是建立在内核提供的多路分离函数select基础之上的,使用select函数可以避免同步非阻塞IO模型中轮询等待的问题。

  • 用户线程监视一个socket,socket把监视信息注册到内核缓存recvBuf,等待数据到达,(用户线程不阻塞)
  • 数据到达之后,内核发送socket刻度信息,用户线程发送read请求去从recvBuf中读数据
  • read完成

多路分离函数select

如上图所示,用户线程发起请求的时候,首先会将需要进行IO操作的socket添加到select中,这时阻塞等待select函数返回。当数据到达时,select被激活,select函数返回,此时用户线程才正式发起read请求,读取数据并继续执行。

从流程上来看,使用select函数进行I/O请求和同步阻塞模型没有太大的区别,甚至还多了添加监视socket,以及调用select函数的额外操作,效率更差。但是,使用select以后最大的优势是用户可以在一个线程内同时处理多个socket的I/O请求。用户可以注册多个socket,然后不断地调用select读取被激活的socket,即可达到在同一个线程内同时处理多个I/O请求的目的。而在同步阻塞模型中,必须通过多线程的方式才能达到这个目的。

用户线程使用select函数的伪代码描述为:

{
   select(socket);//将需要进行IO操作的socket加入到select中

   while(1) {
       sockets = select();//获取被激活的socket
       for(socket in sockets) {
           if(can_read(socket)) {//拿被激活的socket去读
               read(socket, buffer);//从socket中把数据库读到buffer中
               process(buffer);//处理缓存中的数据
           }
       }
   }
}

其中while循环前将socket添加到select监视中,然后在while内一直调用select获取被激活的socket,一旦socket可读,便调用read函数将socket中的数据读取出来。

然而,使用select函数的优点并不仅限于此。虽然上述方式允许单线程内处理多个IO请求,但是每个IO请求的过程还是阻塞的(在select函数上阻塞),平均时间甚至比同步阻塞IO模型还要长。如果用户线程只注册自己感兴趣的socket或者IO请求,然后去做自己的事情,等到数据到来时再进行处理,则可以提高CPU的利用率。

IO多路复用模型使用了Reactor设计模式实现了这一机制。

EventHandler抽象类表示IO事件处理器,它拥有IO文件句柄Handle(通过get_handle获取),以及对Handle的操作handle_event(读/写等)。继承于EventHandler的子类可以对事件处理器的行为进行定制。Reactor类用于管理EventHandler(注册、删除等),并使用handle_events实现事件循环,不断调用同步事件多路分离器(一般是内核)的多路分离函数select,只要某个文件句柄被激活(可读/写等),select就返回(阻塞),handle_events就会调用与文件句柄关联的事件处理器的handle_event进行相关操作。

如上图,I/O多路复用模型使用了Reactor设计模式实现了这一机制。通过Reactor的方式,可以将用户线程轮询I/O操作状态的工作统一交给handle_events事件循环进行处理。用户线程注册事件处理器之后可以继续执行做其他的工作(异步),而Reactor线程负责调用内核的select函数检查socket状态。当有socket被激活时,则通知相应的用户线程(或执行用户线程的回调函数),执行handle_event进行数据读取、处理的工作。由于select函数是阻塞的,因此多路I/O复用模型也被称为异步阻塞I/O模型。注意,这里的所说的阻塞是指select函数执行时线程被阻塞,而不是指socket。一般在使用I/O多路复用模型时,socket都是设置为NONBLOCK的,不过这并不会产生影响,因为用户发起I/O请求时,数据已经到达了,用户线程一定不会被阻塞。

void UserEventHandler::handle_event() {

    if(can_read(socket)) {
        read(socket, buffer);
        process(buffer);
    }
}



{
    Reactor.register(new UserEventHandler(socket));
}

//用户需要重写EventHandler的handle_event函数进行读取数据、处理数据的工作,用户线程只需要将自己的EventHandler注册到Reactor即可。Reactor中handle_events事件循环的伪代码大致如下。

Reactor::handle_events() {
    while(1) {
        sockets = select();
        for(socket in sockets) {
            get_event_handler(socket).handle_event();
        }
    }
}

//事件循环不断地调用select获取被激活的socket,然后根据获取socket对应的EventHandler,执行器handle_event函数即可。

IO多路复用是最常使用的IO模型,但是其异步程度还不够“彻底”,因为它使用了会阻塞线程的select系统调用。因此IO多路复用只能称为异步阻塞IO,而非真正的异步IO。

https://blog.csdn.net/happy_wu/article/details/80052617

总结

I/O 多路复用模型是利用select、poll、epoll可以同时监察多个流的 I/O 事件的能力,在空闲的时候,会把当前线程阻塞掉,当有一个或多个流有I/O事件时,就从阻塞态中唤醒,于是程序就会轮询一遍所有的流(epoll是只轮询那些真正发出了事件的流),依次顺序的处理就绪的流,这种做法就避免了大量的无用操作。这里“多路”指的是多个网络连接,“复用”指的是复用同一个线程。采用多路 I/O 复用技术可以让单个线程高效的处理多个连接请求(尽量减少网络IO的时间消耗),且Redis在内存中操作数据的速度非常快(内存内的操作不会成为这里的性能瓶颈),主要以上两点造就了Redis具有很高的吞吐量。

IO多路复用总结:

redis 采用网络IO多路复用技术,来保证在多连接的时候系统的高吞吐量。

多路-指的是多个socket网络连接,复用-指的是复用一个线程。

多路复用主要有三种技术:select,poll,epoll。epoll是最新的、也是目前最好的多路复用技术。

采用多路I/O复用技术:其一,可以让单个线程高效处理多个连接请求(尽量减少网络IO的时间消耗)。其二,Redis在内存中操作数据的速度非常快(内存里的操作不会成为这里的性能瓶颈)。主要以上两点造就了Redis具有很高的吞吐量。

二、为什么Redis是单线程的

2.1.官方答案

因为Redis是基于内存的操作,CPU不是Redis的瓶颈,Redis的瓶颈最有可能是机器内存的大小或者网络带宽。既然单线程容易实现,而且CPU不会成为瓶颈,那就顺理成章地采用单线程的方案了。

2.2.性能指标

关于redis的性能,官方网站也有,普通笔记本轻松处理每秒几十万的请求。

2.3.详细原因

1)不需要各种锁的性能消耗

Redis的数据结构并不全是简单的Key-Value,还有list,hash等复杂的结构,这些结构有可能会进行很细粒度的操作,比如在很长的列表后面添加一个元素,在hash当中添加或者删除一个对象。这些操作可能就需要加非常多的锁,导致的结果是同步开销大大增加。

总之,在单线程的情况下,就不用去考虑各种锁的问题,不存在加锁、释放锁操作,没有因为可能出现死锁而导致的性能消耗。

2)单线程多进程集群方案

单线程的威力实际上非常强大,单核cpu效率也非常高,多线程自然是可以比单线程有更高的性能上限,但是在今天的计算环境中,即使是单机多线程的上限也往往不能满足需要了,需要进一步摸索的是多服务器集群化的方案,这些方案中多线程的技术照样是用不上的。

**所以“单线程、多进程的集群”不失为一个时髦的解决方案。**我们需要的是机器内存

3)CPU消耗

采用单线程,避免了不必要的上下文切换和竞争条件,也不存在多进程或者多线程导致的切换而消耗 CPU。

但是如果CPU成为Redis瓶颈,或者不想让服务器其他CUP核闲置,那怎么办?

可以考虑多起几个Redis进程,Redis是key-value数据库,不是关系数据库,数据之间没有约束。只要客户端分清哪些key放在哪个Redis

第3章 Redis的五大数据类型

string String

hash HashMap

list LinkedList

set HashSet

sorted set TreeSet

3.0 key 基本命令

http://www.redis.cn/commands.html

命令

说明

keys *

查看当前库的所有键,支持正则

exists <key>

判断某个键是否存在

type <key>

查看键的类型

del <key>

删除某个键

expire <key> <seconds>

为键值设置过期时间,单位秒

ttl <key>

查看还有多久过期,返回-1表示永不过期,-2表示已过期

dbsize

查看当前数据库中key的数量

flushdb

清空当前库

Flushall

通杀全部库

move key db

移动key,从当前库移动到db库

# 设置key有效期
expire key seconds
pexpire key milliseconds
expireat key timestamp
pexpireat key milliseconds-timestamp

# 查询key有效时间 [time to live]
ttl key # 如果key不存在或者已过期,返回 -2。如果key存在并且没有设置过期时间(永久有效),返回 -1 。整数代表有效秒数
pttl key # 毫秒数

# 切换key从时效性转换为永久性
persist key

查询匹配的key值
keys pattern正则

其他操作
rename key newkey
renamenx key newkey

排序key
sort
sort list1

key的重复问题:

key是由程序员定义的

  • redis在使用过程中,伴随着操作数据量的增加,会出现大量的数据以及对应的key
  • 数据不区分种类、类别混杂在一起,极易出现重复或冲突

解决方案:

redis为每个服务提供有16个数据库,编号从0到15

  • 每个数据库之间的数据相互独立

  • 这些数据库共用一块空间,没有空间大小之分

    select index #切换数据库
    quit
    ping #返回PONG代表连通
    echo

    move key db #移动到另外一个库
    dbsize
    flushdb # 删除掉当前库数据
    flushall # 删除掉所有库

3.1 String

  1. String是Redis最基本的类型,你可以理解成与Memcached一模一样的类型,一个key对应一个value

  2. String类型是二进制安全的。意味着Redis的string可以包含任何数据。比如jpg图片或者序列化的对象 。

  3. String类型是Redis最基本的数据类型,一个Redis中字符串value最多可以是512M

  4. 常用操作

m:multiple

为什么有m的操作:发送命令还需要时长

命令

说明

get <key>

查询对应键值

set <key> <value>

添加键值对

append <key> <value>

将给定的<value>追加到原值的末尾 ,返回的长度

strlen <key>

获取值的长度,不包括\0

setnx <key> <value>

只有在key 不存在时设置key的值

incr <key>

将key中存储的数字值增1 只能对数字值操作,如果为空,新增值为1

decr <key>

将key中存储的数字值减1 只能对数字之操作,如果为空,新增值为-1

incrby/decrby <key> 步长
incrbyfloat key 值

将key中存储的数字值增减,自定义步长。值可正可负,但小数得用float。操作具有原子性,命令都是一个一个执行的,所以安全。超过上限或原来不是数会报错。

mset <key1> <value1> <key2> <value2>

同时设置一个或多个key-value对

mget <key1> <key2> <key3>

同时获取一个或多个value

msetnx <key1> <value1> <key2> <value2>

同时设置一个或多个key-value对,当且仅当所有给定的key都不存在

getrange <key> <起始位置> <结束位置>

获得值的范围,双闭区间,类似java中的substring

setrange <key> <起始位置> <value>

<value>覆盖<key>所存储的字符串值,从<起始位置>开始

setex <key> <过期时间> <value>

设置键值的同时,设置过去时间,单位秒。ex是expire。对投票系统等很有效。但重新设置set的值后,时间就失效了

psetex key milliseconds value

getset <key> <value>

以新换旧,设置了新值的同时获取旧值

数据操作不成功的反馈与数据正常操作之间的差异

值可以是任何种类的字符串(包括二进制数据),例如你可以在一个键下保存一张 .jpeg 图片,只需要注意不要超过 512 MB 的最大限度就好了。

当 key 存在时,SET 命令会覆盖掉你上一次设置的值:

string 类型数据操作的注意事项

① 表示运行结果是否成功
 (integer) 0 → false 失败
 (integer) 1 → true 成功
② 表示运行结果值
 (integer) 3 → 3 3个
 (integer) 1 → 1 1个

 数据未获取到
( nil)等同于null

 数据最大存储量
512MB
 数值计算最大范围( java中的long的最大值)
9223372036854775807

key的命名规范:

表名:主键名:主键值:字段名

stu:id:1:class

  1. 详说 incr key 操作的原子性
  • 所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (切换到另一个线程)。
  • 在单线程中, 能够在单条指令中完成的操作都可以认为是" 原子操作",因为中断只能发生于指令之间。
  • 在多线程中,不能被其它进程(线程)打断的操作就叫原子操作。
  • Redis单命令的原子性主要得益于Redis的单线程

Redis 中的字符串是一种 动态字符串sds,这意味着使用者可以修改,它的底层实现有点类似于 Java 中的 ArrayList,有一个字符数组,从源码的 sds.h/sdshdr 文件 中可以看到 Redis 底层对于字符串的定义 SDS,即 Simple Dynamic String 结构:

/* Note: sdshdr5 is never used, we just access the flags byte directly.
 * However is here to document the layout of type 5 SDS strings. */
struct __attribute__ ((__packed__)) sdshdr5 {
    unsigned char flags; /* 3 lsb of type, and 5 msb of string length */
    char buf[];
};
struct __attribute__ ((__packed__)) sdshdr8 {
    uint8_t len; /* used */
    uint8_t alloc; /* excluding the header and null terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits */
    char buf[];
};
struct __attribute__ ((__packed__)) sdshdr16 {
    uint16_t len; /* used */
    uint16_t alloc; /* excluding the header and null terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits */
    char buf[];
};
struct __attribute__ ((__packed__)) sdshdr32 {
    uint32_t len; /* used */
    uint32_t alloc; /* excluding the header and null terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits */
    char buf[];
};
struct __attribute__ ((__packed__)) sdshdr64 {
    uint64_t len; /* used */
    uint64_t alloc; /* excluding the header and null terminator */
    unsigned char flags; /* 3 lsb of type, 5 unused bits */
    char buf[];
};

你会发现同样一组结构 Redis 使用泛型定义了好多次,为什么不直接使用 int 类型呢?

因为当字符串比较短的时候,len 和 alloc 可以使用 byte 和 short 来表示,Redis 为了对内存做极致的优化,不同长度的字符串使用不同的结构体来表示。

SDS 与 C 字符串的区别

为什么不考虑直接使用 C 语言的字符串呢?因为 C 语言这种简单的字符串表示方式 不符合 Redis 对字符串在安全性、效率以及功能方面的要求。我们知道,C 语言使用了一个长度为 N+1 的字符数组来表示长度为 N 的字符串,并且字符数组最后一个元素总是 '\0'(下图就展示了 C 语言中值为 “Redis” 的一个字符数组)

C语言这样简单的数据结构可能会造成以下一些问题:

  • 获取字符串长度为 O(N) 级别的操作 → 因为 C 不保存数组的长度,每次都需要遍历一遍整个数组;
  • 不能很好的杜绝 缓冲区溢出/内存泄漏 的问题 → 跟上述问题原因一样,如果执行拼接 or 缩短字符串的操作,如果操作不当就很容易造成上述问题;
  • C 字符串 只能保存文本数据 → 因为 C 语言中的字符串必须符合某种编码(比如 ASCII),例如中间出现的 '\0' 可能会被判定为提前结束的字符串而识别不了;

我们以追加字符串的操作举例,Redis 源码如下:

/* Append the specified binary-safe string pointed by 't' of 'len' bytes to the
 * end of the specified sds string 's'.
 *
 * After the call, the passed sds string is no longer valid and all the
 * references must be substituted with the new pointer returned by the call. */
sds sdscatlen(sds s, const void *t, size_t len) {
    // 获取原字符串的长度
    size_t curlen = sdslen(s);

    // 按需调整空间,如果容量不够容纳追加的内容,就会重新分配字节数组并复制原字符串的内容到新数组中
    s = sdsMakeRoomFor(s,len); 
    if (s == NULL) return NULL;   // 内存不足
    memcpy(s+curlen, t, len);     // 追加目标字符串到字节数组中
    sdssetlen(s, curlen+len);     // 设置追加后的长度
    s[curlen+len] = '\0';         // 让字符串以 \0 结尾,便于调试打印
    return s;
}

3.2 hash

hash相当于一个hashMap,他是一整个hashMap,而不只是一个键值对。

场景:学生有姓名,学号,班级等信息,想让学生为key,剩下的为值。那么此时值想让称为一个hashmap,这样更方便更改。

新的存储需求:对一系列存储的数据进行编组,方便管理,典型应用存储对象信息

需要的存储结构:一个存储空间保存多个键值对数据

hash类型:底层使用哈希表结构实现数据存储

hash存储结构优化
 如果field数量较少,存储结构优化为类数组结构
 如果field数量较多,存储结构使用HashMap结构

hdel key field1 [field2]

hmset key field1 value1 field2 value2...#设置多个
hmget key field1 field2...#获取多个
hlen key #字段数量
hexists key field #查是否存在指定字段

hkeys key
hvals key

hincrby key field increment
hincrbyfloat key field increment

命令

说明

hset <key> <field> <value>

给集合中的 键赋值

hget <key1> <field>

从集合 取出 value

hmset <key1> <field1> <value1> <field2> <value2>...

批量设置hash的值

hexists key <field>

查看哈希表 key 中,给定域 field 是否存在。

hkeys

列出该hash集合的所有field

hvals

列出该hash集合的所有value

hincrby

为哈希表 key 中的域 field 的值加上增量 increment

hsetnx

将哈希表 key 中的域 field 的值设置为 value ,当且仅当域 field 不存在

例子:
127.0.0.1:6379> hset user id 01
(integer) 0
127.0.0.1:6379> hset user name lisi
(integer) 0
127.0.0.1:6379> hset user class 1
(integer) 1
# 即hset可以多句分开设置,不会覆盖、

127.0.0.1:6379> hgetall user
1) "id"
2) "01"
3) "name"
4) "lisi"
5) "class"
6) "1"

127.0.0.1:6379> hget user name
"lisi"
127.0.0.1:6379> hdel user class
(integer) 1
127.0.0.1:6379> hgetall user
1) "id"
2) "01"
3) "name"
4) "lisi"

127.0.0.1:6379> hmget user id name
1) "01"
2) "lisi"
127.0.0.1:6379> hexists user id
(integer) 1
127.0.0.1:6379> hexists user age
(integer) 0
127.0.0.1:6379> hkeys user
1) "id"
2) "name"

hash 类型数据操作的注意事项

  • hash类型下的value只能存储字符串,不允许存储其他数据类型,不存在嵌套现象。如果数据未获取到,对应的值为( nil)
  • 每个 hash 可以存储 232 - 1 个键值对
  • hash类型十分贴近对象的数据存储形式,并且可以灵活添加删除对象属性。但hash设计初衷不是为了存储大量对象而设计的,切记不可滥用,更不可以将hash作为对象列表使用
  • hgetall 操作可以获取全部属性,如果内部field过多,遍历整体数据效率就很会低,有可能成为数据访问瓶颈

hash类型应用场景:

淘宝购物车涉及与实现

业务分析:

  • 仅分析购物车的redis存储模型。添加、浏览、更改数量、删除、清空
  • 购物车于数据库间持久化同步(不讨论)
  • 购物车于订单间关系(不讨论)
    提交购物车:读取数据生成订单
    商家临时价格调整:隶属于订单级别
  • 未登录用户购物车信息存储(不讨论)
    cookie存储

解决方案

  • 以客户id作为key,每位客户创建一个hash存储结构存储对应的购物车信息
  • 将商品编号作为field,购买数量作为value进行存储
  • 添加商品:追加全新的field与value
  • 浏览:遍历hash
  • 更改数量:自增/自减,设置value值
  • 删除商品:删除field
  • 清空:删除key
  • 此处仅讨论购物车中的模型设计
  • 购物车与数据库间持久化同步、购物车与订单间关系、未登录用户购物车信息存储不进行讨论

场景2:双11活动日,销售手机充值卡的商家对移动、联通、电信的30元、 50元、 100元商品推出抢购活动,每种商品抢购上限1000张

解决方案

  • 以商家id作为key,3个key
  • 将参与抢购的商品id作为field
  • 将参与抢购的商品数量作为对应的value
  • 抢购时使用降值的方式控制产品数量
  • 实际业务中还有超卖等实际问题,这里不做讨论

业务场景
string存储对象( json)与hash存储对象

// 源码定义如 `dict.h/dictht` 定义:
typedef struct dictht {
    // 哈希表数组
    dictEntry **table;
    // 哈希表大小
    unsigned long size;
    // 哈希表大小掩码,用于计算索引值,总是等于 size - 1
    unsigned long sizemask;
    // 该哈希表已有节点的数量
    unsigned long used;
} dictht;

typedef struct dict {
    dictType *type;
    void *privdata;
    // 内部有两个 dictht 结构
    dictht ht[2];
    long rehashidx; /* rehashing not in progress if rehashidx == -1 */
    unsigned long iterators; /* number of iterators currently running */
} dict;

table 属性是一个数组,数组中的每个元素都是一个指向 dict.h/dictEntry 结构的指针,而每个 dictEntry 结构保存着一个键值对:

typedef struct dictEntry {
    // 键
    void *key;
    // 值
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v;
    // 指向下个哈希表节点,形成链表
    struct dictEntry *next;
} dictEntry;

可以从上面的源码中看到,实际上字典结构的内部包含两个 hashtable,通常情况下只有一个 hashtable 是有值的,但是在字典扩容缩容时,需要分配新的 hashtable,然后进行 渐进式搬迁 (下面说原因)

渐进式 rehash

大字典的扩容是比较耗时间的,需要重新申请新的数组,然后将旧字典所有链表中的元素重新挂接到新的数组下面,这是一个 O(n) 级别的操作,作为单线程的 Redis 很难承受这样耗时的过程,所以 Redis 使用 渐进式 rehash 小步搬迁:

渐进式 rehash 会在 rehash 的同时,保留新旧两个 hash 结构,如上图所示,查询时会同时查询两个 hash 结构,然后在后续的定时任务以及 hash 操作指令中,循序渐进的把旧字典的内容迁移到新字典中。当搬迁完成了,就会使用新的 hash 结构取而代之。

扩缩容的条件

正常情况下,当 hash 表中 元素的个数等于第一维数组的长度时,就会开始扩容,扩容的新数组是 原数组大小的 2 倍。不过如果 Redis 正在做 bgsave(持久化命令),为了减少内存也得过多分离,Redis 尽量不去扩容,但是如果 hash 表非常满了,达到了第一维数组长度的 5 倍了,这个时候就会 强制扩容

当 hash 表因为元素逐渐被删除变得越来越稀疏时,Redis 会对 hash 表进行缩容来减少 hash 表的第一维数组空间占用。所用的条件是 元素个数低于数组长度的 10%,缩容不会考虑 Redis 是否在做 bgsave

3.3 List

LinkedList

  1. 单键多值

  2. Redis 列表是简单的字符串列表,按照插入顺序排序。你可以添加一个元素导列表的头部(左边)或者尾部(右边)。

  3. 它的底层实际是个**双向链表,对两端的操作性能很高,通过索引下标的操作中间的节点性能会较差**

不可以操作中间的

  1. 常用操作

命令

说明

lpush/rpush <key> <value1> <value2>

从左边/右边插入一个或多个值。

lpop/rpop <key>

从左边/右边吐出一个值。 值在键在,值光键亡。

rpoplpush <key1> <key2>

<key1>列表右边吐出一个值,插到<key2>列表左边

lrange <key> <start> <stop>

按照索引下标获得元素(从左到右),可以用-1表示最后一个

lindex <key> <index>

按照索引下标获得元素(从左到右)

llen <key>

获得列表长度

linsert <key> before <value> <newvalue>

<value>的后面插入<newvalue> 插入值

lrem <key> <n> <value>

从左边删除n个value(从左到右)

# 规定时间内获取并移除数据,现在没有没关系,可以等一会
blpop key1 [key2] timeout
brpop key1 [key2] timeout
brpoplpush source destination timeout

业务场景:

微信朋友圈点赞,要求按照点赞顺序显示点赞好友信息
如果取消点赞,移除对应好友信息

# 移除指定数据,中间拿数据,但其实是从左面拿指定value,拿完count个value后结束
lrem key count value

list操作注意事项:

  • list中保存的数据都是string类型的,数据总容量是有限的,最多2^32 - 1 个元素 (4294967295)。
  • list具有索引的概念,但是操作数据时通常以队列的形式进行入队出队操作,或以栈的形式进行入栈出栈操作
  • 获取全部数据操作结束索引设置为-1
  • list可以对数据进行分页操作,通常第一页的信息来自于list,第2页及更多的信息通过数据库的形式加载

场景2:

twitter、新浪微博、腾讯微博中个人用户的关注列表需要按照用户的关注顺序进行展示,粉丝列表需要将最近关注的粉丝列在前面

新闻、资讯类网站如何将最新的新闻或资讯按照发生的时间顺序展示?
企业运营过程中,系统将产生出大量的运营数据,如何保障多台服务器操作日志的统一顺序输出?

解决方案

  • 依赖list的数据具有顺序的特征对信息进行管理
  • 使用队列模型解决多路信息汇总合并的问题
  • 使用栈模型解决最新消息的问题

tips:redis 应用于最新消息展示

// `adlist.h/listNode` 源码

/* Node, List, and Iterator are the only data structures used currently. */

typedef struct listNode {
    struct listNode *prev;
    struct listNode *next;
    void *value;
} listNode;

typedef struct listIter {
    listNode *next;
    int direction;
} listIter;

typedef struct list {
    listNode *head;
    listNode *tail;
    void *(*dup)(void *ptr);
    void (*free)(void *ptr);
    int (*match)(void *ptr, void *key);
    unsigned long len;
} list;

虽然仅仅使用多个 listNode 结构就可以组成链表,但是使用 adlist.h/list 结构来持有链表的话,操作起来会更加方便:

3.4 Set

  1. Redis set对外提供的功能与list类似是一个列表的功能,特殊之处在于set是可以自动排重的,当你需要存储一个列表数据,又不希望出现重复数据时,set是一个很好的选择,并且set提供了判断某个成员是否在一个set集合内的重要接口,这个也是list所不能提供的

  2. Redis的Set是string类型的无序集合。它底层其实是一个value为null的hash表,所以添加,删除,查找的复杂度都是O(1)。

当你需要存储一个列表数据,又不希望出现重复数据时,set是一个很好的选择,并且set提供了判断某个成员是否在一个set集合内的重要接口,这个也是list所不能提供的。可以基于 set 轻易实现交集、并集、差集的操作。

  1. 常用操作

list内部是链表结构,读取比较慢,所以想起set,高效查询。

把前面的hash表的field保留,value去除,即得到了set。

新的存储需求:存储大量的数据,在查询方面提供更高的效率
 需要的存储结构:能够保存大量的数据,高效的内部存储机制,便于查询
 set类型:与hash存储结构完全相同,仅存储键,不存储值( nil),并且值是不允许重复的

命令

说明

sadd <key> <value1> <value2> ....

将一个或多个 member 元素加入到集合 key 当中,已经存在于集合的 value元素将被忽略。

smembers <key>

取出该集合的所有值。

sismember <key> <value>

判断集合是否为含有该值,有返回1,没有返回0

scard <key>

返回该集合的元素个数。

srem <key> <value1> <value2> ....

删除集合中的某个元素。

spop <key>

随机从该集合中吐出一个值。

srandmember <key> <n>

随机从该集合中取出n个值。 不会从集合中删除

sinter <key1> <key2>

返回两个集合的交集元素。

sunion <key1> <key2>

返回两个集合的并集元素。

sdiff <key1> <key2>

返回两个集合的差集元素。在

sinterstore key1 key2 key3

将交集存在key1内

缺点: 用户ID数据冗余

  • 第三种方案: 通过 key(用户ID) + field(属性标签) 就可以操作对应属性数据了,既不需要重复存储数据,也不会带来序列化和并发修改控制的问题

扩展:

srandmember key [count] #随机获取集合中指定数量的数据
spop key [count] #随机获取集合中的某个数据并将该数据移除集合


# 求两个集合的交、并、差集
sinter key1 [key2]
sunion key1 [key2]
sdiff key1 [key2]

# 求两个集合的交、并、差集并存储到指定集合中
sinterstore destination key1 [key2]
sunionstore destination key1 [key2]
sdiffstore destination key1 [key2]

# 将指定数据从原始集合中移动到目标集合中
smove source destination member

场景:

每位用户首次使用今日头条时会设置3项爱好的内容,但是后期为了增加用户的活跃度、兴趣点,必须让用户对其他信息类别逐渐产生兴趣,增加客户留存度,如何实现?

业务分析

  • 系统分析出各个分类的最新或最热点信息条目并组织成set集合
  • 随机挑选其中部分信息
  • 配合用户关注信息分类中的热点信息组织成展示的全信息集合

redis 应用于随机推荐类信息检索,例如热点歌单推荐,热点新闻推荐,热卖旅游线路,应用APP推荐,大V推荐等

场景2:

脉脉为了促进用户间的交流,保障业务成单率的提升,需要让每位用户拥有大量的好友,事实上职场新人不具有更多的职场好友,如何快速为用户积累更多的好友?
新浪微博为了增加用户热度,提高用户留存性,需要微博用户在关注更多的人,以此获得更多的信息或热门话题,如何提高用户关注他人的总量?
QQ新用户入网年龄越来越低,这些用户的朋友圈交际圈非常小,往往集中在一所学校甚至一个班级中,如何帮助用户快速积累好友用户带来更多的活跃度?
微信公众号是微信信息流通的渠道之一,增加用户关注的公众号成为提高用户活跃度的一种方式,如何帮助用户积累更多关注的公众号?
美团外卖为了提升成单量,必须帮助用户挖掘美食需求,如何推荐给用户最适合自己的美食?

Tips 9:
 redis 应用于同类信息的关联搜索,二度关联搜索,深度关联搜索
 显示共同关注(一度)
 显示共同好友(一度)
 由用户A出发,获取到好友用户B的好友信息列表(一度)
 由用户A出发,获取到好友用户B的购物清单列表(二度)
 由用户A出发,获取到好友用户B的游戏充值列表(二度)

set 类型数据操作的注意事项

  • set 类型不允许数据重复,如果添加的数据在 set 中已经存在,将只保留一份
  • set 虽然与hash的存储结构相同,但是无法启用hash中存储值的空间

场景3:

集团公司共具有12000名员工,内部OA系统中具有700多个角色, 3000多个业务操作, 23000多种数据,每位员工具有一个或多个角色,如何快速进行业务操作的权限校验?

场景4:

公司对旗下新的网站做推广,统计网站的PV(访问量) ,UV(独立访客) ,IP(独立IP)。
PV:网站被访问次数,可通过刷新页面提高访问量
UV:网站被不同用户访问的次数,可通过cookie统计访问量,相同用户切换IP地址, UV不变
IP:网站被不同IP地址访问的总次数,可通过IP地址统计访问量,相同IP不同用户访问, IP不变

解决方案
 利用set集合的数据去重特征,记录各种访问数据
 建立string类型数据,利用incr统计日访问量( PV)
 建立set模型,记录不同cookie数量( UV)
 建立set模型,记录不同IP数量( IP)

tips:redis 应用于同类型数据的快速去重

场景5:

黑名单
资讯类信息类网站追求高访问量,但是由于其信息的价值,往往容易被不法分子利用,通过爬虫技术,快速获取信息,个别特种行业网站信息通过爬虫获取分析后,可以转换成商业机密进行出售。例如第三方火车票、机票、酒店刷票代购软件,电商刷评论、刷好评。
同时爬虫带来的伪流量也会给经营者带来错觉,产生错误的决策,有效避免网站被爬虫反复爬取成为每个网站都要考虑的基本问题。在基于技术层面区分出爬虫用户后,需要将此类用户进行有效的屏蔽,这就是黑名单的典型应用。
ps:不是说爬虫一定做摧毁性的工作,有些小型网站需要爬虫为其带来一些流量。

白名单
对于安全性更高的应用访问,仅仅靠黑名单是不能解决安全问题的,此时需要设定可访问的用户群体,
依赖白名单做更为苛刻的访问验证。

解决方案
 基于经营战略设定问题用户发现、鉴别规则
 周期性更新满足规则的用户黑名单,加入set集合
 用户行为信息达到后与黑名单进行比对,确认行为去向
 黑名单过滤IP地址:应用于开放游客访问权限的信息源
 黑名单过滤设备信息:应用于限定访问设备的信息源
 黑名单过滤用户:应用于基于访问权限的信息源

tips:redis 应用于基于黑名单与白名单设定的服务控制

3.5 zset (sorted set)

zset和set相比,sorted set增加了一个权重参数score,使得集合中的元素能够按score进行有序排列。

它的内部实现用的是一种叫做 「跳跃表」 的数据结构,由于比较复杂,所以在这里简单提一下原理就好了:

score不是数据。

  1. Redis有序集合zset与普通集合set非常相似,是一个没有重复元素的字符串集合。不同之处是有序集合的每个成员都关联了一个评分(score) ,这个评分(score)被用来按照从最低分到最高分的方式排序集合中的成员。集合的成员是唯一的,但是评分可以是重复了 。

  2. 因为元素是有序的, 所以你也可以很快的根据评分(score)或者次序(position)来获取一个范围的元素。访问有序集合的中间元素也是非常快的,因此你能够使用有序集合作为一个没有重复成员的智能列表。

  3. 常用操作

命令

说明

zadd <key> <score1> <value1> <score2> <value2>...

将一个或多个 member 元素及其 score 值加入到有序集 key 当中。注意分在前,值在后。

zrange <key> <start> <stop> [WITHSCORES]

返回有序集 key 中,下标在 之间的元素 带WITHSCORES,可以让分数一起和值返回到结果集。

zrangebyscore key min max [withscores] [limit offset count]

返回有序集 key 中,所有 score 值介于 min 和 max 之间(包括等于 min 或 max )的成员。有序集成员按 score 值递增(从小到大)次序排列。

zrevrangebyscore key max min [withscores] [limit offset count]

同上,改为从大到小排列。

zincrby <key> <increment> <value>

为元素的score加上增量

zrem <key> <value>

删除该集合下,指定值的元素

zremrangebyrank/zremrangebyscore key start stop【或min max】

条件删除

zcount <key> <min> <max>

统计该集合,分数区间内的元素个数

zrank <key> <value>

返回该值在集合中的排名,从0开始。

 获取集合数据总量
zcard key
zcount key min max

 集合交、并操作
zinterstore destination numkeys key [key ...]
zunionstore destination numkeys key [key ...]

主要应用:排行榜

  1. 思考: 如何利用zset实现一个文章访问量的排行榜?

    语法:zadd
    127.0.0.1:6379> zadd test 94 zs
    (integer) 1
    127.0.0.1:6379> zadd test 100 ls
    (integer) 1
    127.0.0.1:6379> zadd test 60 ww
    (integer) 1
    127.0.0.1:6379> zadd test 47 zl
    (integer) 1
    127.0.0.1:6379> zrange test 0 -1
    1) "zl"
    2) "ww"
    3) "zs"
    4) "ls"
    127.0.0.1:6379> zrange test 0 -1 withscores
    1) "zl"
    2) "47"
    3) "ww"
    4) "60"
    5) "zs"
    6) "94"
    7) "ls"
    8) "100"

  • min与max用于限定搜索查询的条件
  • start与stop用于限定查询范围,作用于索引,表示开始和结束索引
  • offset与count用于限定查询范围,作用于查询结果,表示开始位置和数据总量

场景1:

票选广东十大杰出青年,各类综艺选秀海选投票
各类资源网站TOP10(电影,歌曲,文档,电商,游戏等)
聊天室活跃度统计
游戏好友亲密度
业务分析
 为所有参与排名的资源建立排序依据

 获取数据对应的索引(排名)
zrank key member
zrevrank key member
 score值获取与修改
zscore key member
zincrby key increment member

 redis 应用于计数器组合排序功能对应的排名

注意事项:

score保存的数据存储空间是64位,如果是整数范围是-9007199254740992~9007199254740992
 score保存的数据也可以是一个双精度的double值,基于双精度浮点数的特征,可能会丢失精度,使用时候要慎重
 sorted_set 底层存储还是基于set结构的,因此数据不能重复,如果重复添加相同的数据, score值将被反复覆盖,保留最后一次修改的结果

场景2:

基础服务+增值服务类网站会设定各位会员的试用,让用户充分体验会员优势。例如观影试用VIP、游戏VIP体验、云盘下载体验VIP、数据查看体验VIP。当VIP体验到期后,如果有效管理此类信息。即便对于正式VIP用户也存在对应的管理方式。
网站会定期开启投票、讨论,限时进行,逾期作废。如何有效管理此类过期信息。

解决方案:

对于基于时间线限定的任务处理,将处理时间记录为score值,利用排序功能区分处理的先后顺序
 记录下一个要处理的时间,当到期后处理对应任务,移除redis中的记录,并记录下一个要处理的时间
 当新任务加入时,判定并更新当前下一个要处理的任务时间
 为提升sorted_set的性能,通常将任务根据特征存储成若干个sorted_set。例如1小时内, 1天内,周内,月内,季内,年度等,操作时逐级提升,将即将操作的若干个任务纳入到1小时内处理的队列中

time # 获取当前系统时间

tips: redis 应用于定时任务执行顺序管理或任务过期管理

场景3:

任务/消息权重设定应用
当任务或者消息待处理,形成了任务队列或消息队列时,对于高优先级的任务要保障对其优先处理,如何实现任务权重管理。

解决方案:

对于带有权重的任务,优先处理权重高的任务,采用score记录权重即可多条件任务权重设定
如果权重条件过多时,需要对排序score值进行处理,保障score值能够兼容2条件或者多条件,例如外贸订单优先于国内订单,总裁订单优先于员工订单,经理订单优先于员工订单
 因score长度受限,需要对数据进行截断处理,尤其是时间设置为小时或分钟级即可(折算后)
 先设定订单类别,后设定订单发起角色类别,整体score长度必须是统一的,不足位补0。第一排序规则首位不得是0
 例如外贸101,国内102,经理004,员工008。
 员工下的外贸单score值为101008(优先)
 经理下的国内单score值为102004

第4章 Redis的相关配置

  1. 计量单位说明,大小写不敏感

  2. include

类似jsp中的include,多实例的情况可以把公用的配置文件提取出来

  1. ip地址的绑定 bind
  • 默认情况bind=127.0.0.1,只能接受本机的访问请求
  • 不写的情况下,无限制接受任何ip地址的访问
  • 生产环境肯定要写你应用服务器的地址
  • 如果开启了protected-mode,那么在没有设定bind ip且没有设密码的情况下,Redis只允许接受本机的相应
  1. tcp-backlog
  • 可以理解是一个请求到达后至到接受进程处理前的队列.
  • backlog队列总和=未完成三次握手队列 + 已经完成三次握手队列
  • 高并发环境tcp-backlog 设置值跟超时时限内的Redis吞吐量决定
  1. timeout

一个空闲的客户端维持多少秒会关闭,0为永不关闭。

  1. tcp keepalive

对访问客户端的一种心跳检测,每个n秒检测一次,官方推荐设置为60秒

  1. daemonize

是否为后台进程

  1. pidfile

存放pid文件的位置,每个实例会产生一个不同的pid文件

  1. log level

四个级别根据使用阶段来选择,生产环境选择notice 或者warning

  1. log level

日志文件名称

  1. syslog

是否将Redis日志输送到linux系统日志服务中

  1. syslog-ident

日志的标志

  1. syslog-facility

输出日志的设备

  1. database

设定库的数量 默认16

  1. security

在命令行中设置密码

config get requirepass
config set requirepass "123456"
config get requirepass
auth 123456
get kl
  1. maxclient

最大客户端连接数

  1. maxmemory

设置Redis可以使用的内存量。一旦到达内存使用上限,Redis将会试图移除内部数据,移除规则可以通过maxmemory-policy来指定。如果Redis无法根据移除规则来移除内存中的数据,或者设置了“不允许移除”,

那么Redis则会针对那些需要申请内存的指令返回错误信息,比如SET、LPUSH等。

  1. Maxmemory-policy
  • volatile-lru:使用LRU算法移除key,只对设置了过期时间的键
  • allkeys-lru:使用LRU算法移除key
  • volatile-random:在过期集合中移除随机的key,只对设置了过期时间的键
  • allkeys-random:移除随机的key
  • volatile-ttl:移除那些TTL值最小的key,即那些最近要过期的key
  • noeviction:不进行移除。针对写操作,只是返回错误信息
  1. Maxmemory-samples

设置样本数量,LRU算法和最小TTL算法都并非是精确的算法,而是估算值,所以你可以设置样本的大小。

一般设置3到7的数字,数值越小样本越不准确,但是性能消耗也越小。

第5章 Jedis

可视化客户端:Redis Desktop Manager

Jedis jedis = new Jedis("localhost",6379);
jedis.set("name","lisi");//方法名和原来的命令一致
jedis.get("name");
jedis.close();
//http://xetorthio.github.io/jedis/
  1. Jedis所需要的jar包 ,可通过Maven的依赖引入

    Commons-pool-1.6.jar
    Jedis-2.1.0.jar

    maven为jedis,可选junit

  2. 使用Windows环境下Eclipse连接虚拟机中的Redis注意事项

  • 禁用Linux的防火墙:Linux(CentOS7)里执行命令 : systemctl stop firewalld.service

  • redis.conf中注释掉bind 127.0.0.1,然后 protect-mode no。

    package com.itheima;

    import org.junit.Test;
    import redis.clients.jedis.Jedis;

    import java.util.List;
    import java.util.Map;

    public class JedisTest {
    @Test
    public void testJedis(){
    //1.连接redis
    Jedis jedis = new Jedis("127.0.0.1", 6379);
    //2.操作redis
    // jedis.set("name","itheima");
    String name = jedis.get("name");
    System.out.println(name);
    //3.关闭连接
    jedis.close();
    }

    @Test
    public void testList(){
        //1.连接redis
        Jedis jedis = new Jedis("127.0.0.1", 6379);
        //2.操作redis
        jedis.lpush("list1","a","b","c");
        jedis.rpush("list1","x");
    List&lt;String&gt; list1 = jedis.lrange("list1", 0, -1);
    for(String s : list1){
        System.out.println(s);
    }
    
    System.out.println(jedis.llen("list1"));
    
    System.out.println();
    //3.关闭连接
    jedis.close();
    } @Test public void testHash(){ //1.连接redis Jedis jedis = new Jedis("127.0.0.1", 6379); //2.操作redis
    jedis.hset("hash1","a1","b1");
    jedis.hset("hash1","a2","a2");
    jedis.hset("hash1","a3","b3");
    
    Map&lt;String, String&gt; hash1 = jedis.hgetAll("hash1");
    
    System.out.println(hash1);
    
    System.out.println(jedis.hlen("hash1"));
    
    System.out.println();
    //3.关闭连接
    jedis.close();
    }

    }

① 设定一个服务方法,用于模拟实际业务调用的服务,内部采用打印模拟调用
② 在业务调用前服务调用控制单元,内部使用redis进行控制,参照之前的方案
③ 对调用超限使用异常进行控制,异常处理设定为打印提示信息
④ 主程序启动3个线程,分别表示3种不同用户的调用

package com.itheima;

import com.itheima.util.JedisUtils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisDataException;

public class Service {
    private String id;
    private int num;

    public Service(String id,int num){
        this.id = id;
        this.num = num;
    }
    //控制单元
    public void service(){
//        Jedis jedis = new Jedis("127.0.0.1",6379);
        Jedis jedis = JedisUtils.getJedis();
        String value = jedis.get("compid:"+id);
        //判断该值是否存在
        try{
            if(value == null){
                //不存在,创建该值
                jedis.setex("compid:"+id,5,Long.MAX_VALUE-num+"");
            }else{
                //存在,自增,调用业务
                Long val = jedis.incr("compid:"+id);
                business(id,num-(Long.MAX_VALUE-val));
            }
        }catch (JedisDataException e){
            System.out.println("使用已经到达次数上限,请升级会员级别");
            return;
        }finally{
            jedis.close();
        }
    }
    //业务操作
    public void business(String id,Long val){
        System.out.println("用户:"+id+" 业务操作执行第"+val+"次");
    }
}

class MyThread extends Thread{
    Service sc ;
    public MyThread(String id,int num){
        sc = new Service(id,num);
    }
    public void run(){
        while(true){
            sc.service();
            try {
                Thread.sleep(300L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class Main{
    public static void main(String[] args) {
        MyThread mt1 = new MyThread("初级用户",10);
        MyThread mt2 = new MyThread("高级用户",30);
        mt1.start();
        mt2.start();
    }
}


package com.itheima.util;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import java.util.ResourceBundle;

public class JedisUtils {
    private static JedisPool jp = null;
    private static String host = null;
    private static int port;
    private static int maxTotal;
    private static int maxIdle;

    static {//定义放到外面
        ResourceBundle rb = ResourceBundle.getBundle("redis");//redis.properties
        host = rb.getString("redis.host");
        port = Integer.parseInt(rb.getString("redis.port"));
        maxTotal = Integer.parseInt(rb.getString("redis.maxTotal"));
        maxIdle = Integer.parseInt(rb.getString("redis.maxIdle"));
        JedisPoolConfig jpc = new JedisPoolConfig();
        jpc.setMaxTotal(maxTotal);
        jpc.setMaxIdle(maxIdle);
        jp = new JedisPool(jpc,host,port);
    }

    public static Jedis getJedis(){
        return jp.getResource();
    }
    public static void main(String[] args){
        JedisUtils.getJedis();
    }
}
// 原方法//这种每次都要拿一个连接池,没有效率,所以放到static代码块中
public static Jedis getJedis() {
    JedisPoolConfig jpc=new JedisPoolConfig();
    jpc.setMaxTotal(30);
    jpc.setMaxIdle(10);
    String host="127.0.0.1";
    int port=6379;
    JedisPool jp=new JedisPool(jpc,host,port);
    return jp.getResource();
}

redis.properties

redis.host=127.0.0.1
redis.port=6379
redis.maxTotal=30
redis.maxIdle=10

第6章 Redis 事务

6.1 Redis中事务的定义

Redis事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断

Redis事务的主要作用就是串联多个命令防止别的命令插队

基本操作:

multi # 开启事务
作用:设定事务的开启位置,此指令执行后,后续的所有指令均加入到事务中

exec # 执行事务
作用:设定事务的结束位置,同时执行事务。与multi成对出现,成对使用
注意:加入事务的命令暂时进入到任务队列中,并没有立即执行,只有执行exec命令才开始执行

discard # 取消事务
作用:终止当前事务的定义,发生在multi之后, exec之前
  1. 从输入Multi命令开始,输入的命令都会依次进入命令队列中,但不会执行,至到输入Exec后,Redis会将之前的命令队列中的命令依次执行。

  2. 组队的过程中可以通过discard来放弃组队。

事务的注意事项

  • 事务过程中,输入的语法错误:会造成取消事务,且整体事务的所有命令均不会执行。包括正确的指令也不执行
  • 事务过程中,输入语法无误,但不符合逻辑:例如对list进行incr操作。能够正确运行的命令会执行,运行错误的命令不会被执行。注意:已经执行完毕的命令对应的数据不会自动回滚,需要程序员自己在代码中实现回滚。

redis事务三特性

  • 单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。

  • 没有隔离级别的概念:队列中的命令没有提交之前都不会实际的被执行,因为事务提交前任何指令都不会被实际执行,也就不存在“事务内的查询要看到事务里的更新,在事务外查询不能看到”这个让人万分头痛的问题

  • 不保证原子性:Redis同一个事务中如果有一条命令执行失败,其后的命令仍然会被执行,没有回滚

    手动进行事务回滚
     记录操作过程中被影响的数据之前的状态
    单数据: string
    多数据: hash、 list、 set、 zset

     设置指令恢复所有的被修改的项
    单数据:直接set(注意周边属性,例如时效)
    多数据:修改对应值或整体克隆复制

业务场景

天猫双11热卖过程中,对已经售罄的货物追加补货, 4个业务员都有权限进行补货。补货的操作可能是一系列的操作,牵扯到多个连续操作,如何保障不会重复操作?

业务分析:

 多个客户端有可能同时操作同一组数据,并且该数据一旦被操作修改后,将不适用于继续操作
 在操作之前锁定要操作的数据,一旦发生变化,终止当前操作

事务之—监视锁(超卖问题、分布式锁)

watch的变量发生变化了,那么事务将不会执行。例如多终端修改。

# 对 key 添加监视锁,在执行exec前如果key发生了变化,终止事务执行  # 得写在multi外
watch key1 [key2……]

# 取消对【所有】 key 的监视
unwatch

EXEC和DISCARD也有取消监视的效果:如果在执行 WATCH 命令之后, EXEC 命令或 DISCARD 命令先被执行了的话,那么就不需要再执行 UNWATCH 了。

Tips 18:redis 应用基于状态控制的批量任务执行

业务场景

天猫双11热卖过程中,对已经售罄的货物追加补货,且补货完成。客户购买热情高涨, 3秒内将所有商品购买完毕。本次补货已经将库存全部清空,如何避免最后一件商品不被多人同时购买? 【超卖问题】

业务分析

 使用watch监控一个key有没有改变已经不能解决问题,此处要监控的是具体数据
 虽然redis是单线程的,但是多个客户端对同一数据同时进行操作时,如何避免不被同时修改?

watch的值是不停在改变的,1件的时候一个人买到其他人就消掉?他是监控一个值能不能变的,而不是监控其他人能不能改这个值的,

思想:

  • 利用setnx设置一个锁对象,拿到该锁对象才能操作业务,操作完是否该锁

    解决方案:

    setnx lock-keyname value # 设置一个公共锁
    #value值不重要,比如我们要锁name这个变量,那么就是setnx lock-name true
    利用setnx命令的返回值特征,有值则返回设置失败,无值则返回设置成功
     对于返回设置成功的,拥有控制权,进行下一步的具体业务操作
     对于返回设置失败的,不具有控制权,排队或等待

    set num 10
    setnx lock-num 1 # setnx只是简单形式,实际比这复杂,还得加过期时间+更新过期时间
    incrby num -1 #库存-1
    del lock-num # 操作完毕通过del操作释放锁
    如果别的终端在这个过程中页想加锁,别的终端是加不上的。到时他得重新输入命令他再锁

上面方案的问题:某个用户拿到锁之后还没delete呢,却宕机了,那该key岂不是永远不delete了?解决方案是给key加个过期时间

setnx lock-name 1
假如此时停电宕机了,却还没执行delete释放锁。

解决方案:给锁加失效时间
setnx lock-name 1
expire lock-name 20 #20s


# 此外,对象时间长的业务,可以检查所的失效时间,一旦小于1个值,重新设置过期时间,给自己的业务续命

由于操作通常都是微秒或毫秒级,因此该锁定时间不宜设置过大。具体时间需要业务测试后确认。

  • 例如:持有锁的操作最长执行时间127ms,最短执行时间7ms。
  • 测试百万次最长执行时间对应命令的最大耗时,测试百万次网络延迟平均耗时
  • 锁时间设定推荐:最大耗时×120%+平均网络延迟×110%
  • 如果业务最大耗时<<网络平均延迟,通常为2个数量级,取其中单个耗时较长即可

6.3 事务中的错误处理

  • 组队阶段: 语法错误,命令错了
  • 执行阶段:对象错了,对象类型不对。(语法没错)
  1. 组队阶段,如果某个命令出现了报告错误,执行时整个的所有队列会都会被取消。

  1. 执行阶段,如果某个命令报出了错误,则只有报错的命令不会被执行,而其他的命令都会执行,不会回滚。

6.4 为什么要做成事务?

悲观锁(Pessimistic Lock), 别人想拿这个数据就会block直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁

乐观锁适用于多读的应用类型,这样可以提高吞吐量。Redis就是利用这种check-and-set机制实现事务的。

6.6 Redis事务 秒杀案例

  1. 解决计数器和人员记录的事务操作

  1. 秒杀并发模拟 ab工具

CentOS6 默认安装 ,CentOS7需要手动安装

  • 联网: yum install httpd-tools
  • 无网络: 进入cd /run/media/root/CentOS 7 x86_64/Packages ,顺序安装

apr-1.4.8-3.el7.x86_64.rpm、 apr-util-1.5.2-6.el7.x86_64.rpm、 httpd-tools-2.4.6-67.el7.centos.x86_64.rpm

  • ab –n 请求数 -c 并发数 -p 指定请求数据文件 -T “application/x-www-form-urlencoded” 测试的请求
  1. 超卖问题

  2. 请求超时问题

节省每次连接redis服务带来的消耗,把连接好的实例反复利用

连接池参数:

MaxTotal:控制一个pool可分配多少个jedis实例,通过pool.getResource()来获取;如果赋值为-1,则表示不限制;如果pool已经分配了MaxTotal个jedis实例,则此时pool的状态为exhausted。

maxIdle:控制一个pool最多有多少个状态为idle(空闲)的jedis实例;

MaxWaitMillis:表示当borrow一个jedis实例时,最大的等待毫秒数,如果超过等待时间,则直接抛JedisConnectionException;

testOnBorrow:获得一个jedis实例的时候是否检查连接可用性(ping());如果为true,则得到的jedis实例均是可用的;

  1. 遗留问题
  • LUA脚本

Lua 是一个小巧的脚本语言,Lua脚本可以很容易的被C/C++ 代码调用,也可以反过来调用C/C++的函数,Lua并没有提供强大的库,一个完整的Lua解释器不过200k,所以Lua不适合作为开发独立应用程序的语言,而是作为嵌入式脚本语言。

很多应用程序、游戏使用LUA作为自己的嵌入式脚本语言,以此来实现可配置性、可扩展性。这其中包括魔兽争霸地图、魔兽世界、博德之门、愤怒的小鸟等众多游戏插件或外挂

  • LUA脚本在Redis中的优势

将复杂的或者多步的redis操作,写为一个脚本,一次提交给redis执行,减少反复连接redis的次数。提升性能。

LUA脚本是类似redis事务,有一定的原子性,不会被其他命令插队,可以完成一些redis事务性的操作

但是注意redis的lua脚本功能,只有在2.6以上的版本才可以使用。

l 利用lua脚本淘汰用户,解决超卖问题。

第7章 Redis 持久化

是否正在加载RDB文件内容
最后一次保存之后改变的键的个数
是否正在后台执行RDB保存任务
最后一次执行RDB保存任务的时间
最后一次执行RDB保存任务的状态
最后一次执行RDB保存任务消耗的时间
如果正在执行RDB保存任务,则为当前RDB任务已经消耗的时间,否则为一1
最后一次执行RDB保存任务消耗的内存
是否开启了AOF功能
是否正在后台执行AOF重写任务(重写在后续的章节介绍)
是否等待调度一次OF重写任务。如果触发了一次AOF重写,但是后台正在执行RDB保存任务时会将该状态置为1
最后一次执行AOF重写任务消耗的时间
如果正在执行AOF重写任务,则为当前该任务巳经消耗的时问,否则为一1
最后一次执行AOF重写任务的状态
最后一次执行AOF缓冲区写入的状态(服务端执行命令时会开辟一段内存空f司将命令放入其中,然后从该缓冲区中同步到文

怎么保证 redis 挂掉之后再重启数据可以进行恢复

Redis提供了2个不同形式的持久化方式 RDB 和 AOF

  • RDB:将当前数据状态进行保存,快照形式,存储数据结果,存储格式简单,关注点在数据
  • AOF:将数据的操作过程进行保存,日志形式,存储操作过程,存储格式复杂,关注点在数据的操作过程

7.1 RDB

原理是redis会单独创建(fork)一个与当前进程一模一样的子进程来进行持久化,这个子线程的所有数据(变量。环境变量,程序程序计数器等)都和原进程一模一样,会先将数据写入到一个临时文件中,待持久化结束了,再用这个临时文件替换上次持久化好的文件,整个过程中,主进程不进行任何的io操作,这就确保了极高的性能

Redis可以通过创建快照来获得存储在内存里面的数据在某个时间点上的副本。Redis创建快照之后,可以对快照进行备份,可以将快照复制到其他服务器从而创建具有相同数据的服务器副本(Redis主从结构,主要用来提高Redis性能),还可以将快照留在原地以便重启服务器的时候使用。

如果系统真的发生崩溃,用户将丢失最近一次生成快照之后更改的所有数据。因此,快照持久化只适用于即使丢失一部分数据也不会造成一些大问题的应用程序。不能接受这个缺点的话,可以考虑AOF持久化。

创建快照的办法有如下几种:

  • BGSAVE命令: 客户端向Redis发送 BGSAVE命令 来创建一个快照。对于支持BGSAVE命令的平台来说(基本上所有平台支持,除了Windows平台),Redis会调用fork来创建一个子进程,然后子进程负责将快照写入硬盘,而父进程则继续处理命令请求。
    • 这个子进程和原进程数据一模一样,会先将数据写入到一个临时文件中,待持久化结束了,再用这个临时文件替换上次持久化好的文件,
    • 整个过程中,主进程不进程任何的IO操作,这就确保了极高的性能
  • SAVE命令: 客户端还可以向Redis发送 SAVE命令 来创建一个快照,接到SAVE命令的Redis服务器在快照创建完毕之前不会再响应任何其他命令。SAVE命令不常用,我们通常只会在没有足够内存去执行BGSAVE命令的情况下,又或者即使等待持久化操作执行完毕也无所谓的情况下,才会使用这个命令。
  • save选项: 如果用户设置了save选项(一般会默认设置),比如 save 60 10000,那么从Redis最近一次创建快照之后开始算起,当“60秒之内有10000次写入”这个条件被满足时,Redis就会自动触发BGSAVE命令。
  • SHUTDOWN命令: 当Redis通过SHUTDOWN命令接收到关闭服务器的请求时,或者接收到标准TERM信号时,会执行一个SAVE命令,阻塞所有客户端,不再执行客户端发送的任何命令,并在SAVE命令执行完毕之后关闭服务器。
  • 一个Redis服务器连接到另一个Redis服务器: 当一个Redis服务器连接到另一个Redis服务器,并向对方发送SYNC命令来开始一次复制操作的时候,如果主服务器目前没有执行BGSAVE操作,或者主服务器并非刚刚执行完BGSAVE操作,那么主服务器就会执行BGSAVE命令

7.1.1 save

save #手动执行一次保存操作,会阻塞
bgsave # redis会在后台异步进行快照操作,同时可以响应客户端的请求
shutdown # 如果没有开启aof,会触发持久化
执行flushall命令,但是里面是空的,无意义

# 在redis.conf中配置
save 900 1           #在900秒(15分钟)之后,如果至少有1个key发生变化,Redis就会自动触发BGSAVE命令创建快照。
save 300 10          #在300秒(5分钟)之后,如果至少有10个key发生变化,Redis就会自动触发BGSAVE命令创建快照。
save 60 10000        #在60秒(1分钟)之后,如果至少有10000个key发生变化,Redis就会自动触发BGSAVE命令创建快照。

客户端执行save命令,该命令强制redis执行快照,这时候redis处于阻塞状态,不会响应任何其他客户端发来的请求,直到RDB快照文件执行完毕,所以请慎用。

save指令操作配置:需要在conf文件中配置

  • dbfilename dump.rdb
    • 说明:设置持久化后文件名(本地数据库文件名),默认值为 dump.rdb
    • 经验:通常设置为dump-端口号.rdb
  • dir
    • 说明:设置存储.rdb、log等文件的路径。加载位置也是这个位置
    • 经验:通常设置成存储空间较大的目录中,目录名称data
    • 保存时间分3种
  • rdbcompression yes
    • 说明:设置存储至本地数据库时是否压缩数据,默认为 yes,采用 LZF 压缩
    • 经验:通常默认为开启状态,如果设置为no,可以节省 CPU 运行时间,但会使存储的文件变大(巨大)
  • rdbchecksum yes
    • 说明:设置是否进行RDB文件格式校验,该校验过程在写文件和读文件过程均进行
    • 经验:通常默认为开启状态,如果设置为no,可以节约读写性过程约10%时间消耗,但是存储一定的数据损坏风险

要点:

  • save一次之后,删除rdb文件,重新save一次,rdb文件又出现(恢复)了
  • 退出后再登录,还有数据

RDB工作原理:

如图,4个客户端分别输入指令,redis是单线程的,所以需要排序。

有个问题是如果save是第三个,save执行时间很长,所以后面的get需要等待很长时间。就会阻塞。所以线上不建议使用save指令,会很拖慢性能。

注意: save指令的执行会阻塞当前Redis服务器, 直到当前RDB过程完成为止, 有可能会造成长时间阻塞, 线上环境不建议使用。

其他

# 查看最近一次持久化时间:
info Persistence

# 查看存储文件位置
CONFIG GET dir

7.1.2 bgsave

bgsave # 后台save#手动启动后台保存操作,但不是立即执行  

bgsave原理:

  1. 客户端执行bgsave命令,redis主进程收到指令并判断此时是否在执行bgrewriteaof(AOF文件重写过程,后续会讲解),如果此时正好在执行则bgsave直接返回,不fork子进程,如果没有执行bgrewriteaof重写AOF文件,则进入下一个阶段;
  2. 主进程调用fork方法创建子进程,在创建子进程过程中redis主进程阻塞,所以不能响应客户端请求;
  3. 子进程创建完成以后,bgsave命令返回“Background saving started”,此时标志着redis可以响应客户端请求了;
  4. 子进程根据主进程的内存副本创建临时快照文件,当快照文件完成以后对原快照文件进行替换;
  5. 子进程发送信号给redis主进程完成快照操作,主进程更新统计信息(info Persistence可查看),子进程退出;

注意: bgsave命令是针对save阻塞问题做的优化。 Redis内部所有涉及到RDB操作都采用bgsave的方式, save命令可以放弃使用

bgsave命令可以理解为background save即:“后台保存”。当执行bgsave命令时,redis会fork出一个子进程来执行快照生成操作,需要注意的redis是在fork子进程这个简短的时间redis是阻塞的(此段时间不会响应客户端请求,),当子进程创建完成以后redis响应客户端请求。其实redis自动快照也是使用bgsave来完成的。

bgsave相关配置:

dbfilename dump.rdb
dir
rdbcompression yes
rdbchecksum yes
stop-writes-on-bgsave-error yes
说明:后台存储过程中如果出现错误现象,是否停止保存操作
经验:通常默认为开启状态

但是save和bgsave都需要手动保存,难免疏忽,使用需要自动执行。

conf文件配置持久化条件:

# 在配置文件中设置
格式:save 时间段 key修改次数
作用:满足限定时间范围内key的变化数量达到指定数量即进行持久化。即时间片内变化大才自动保存,是快照的思想。

 位置:在conf文件中进行配置,把3个都关掉就把RDB持久化关掉了。或者save "" # 但是集群环境下RDB是关不掉的
save 900 1 # 900s内变化一个就保存
save 300 10
save 60 10000

配置文件中save原理

发送3条指令,每条都会返回个结果,通过结果判断这条指令算不算影响数量。不进行数据比对指的是两个set就都算

RDB三种保存方式对比:

方式

save指令

bgsave指令

save配置

读写

同步

异步

阻塞客户端指令

额外内存消耗

启动新进程

RDB特殊启动形式:

  • 全量复制:在主从复制中详细讲解
  • 服务器运行过程中重启:debug reload
  • 关闭服务器时指定保存数据:shutdown、 save

RDB优点:

RDB是一个紧凑压缩的二进制文件, 存储效率较高
 RDB内部存储的是redis在某个时间点的数据快照, 非常适合用于数据备份,全量复制等场景
 RDB恢复数据的速度要比AOF快很多
 应用:服务器中每X小时执行bgsave备份,并将RDB文件拷贝到远程机器中,用于灾难恢复。

RGB缺点:

RDB方式无论是执行指令还是利用配置,无法做到实时持久化,具有较大的可能性丢失数据
 bgsave指令每次运行要执行fork操作创建子进程, 要牺牲掉一些性能
 Redis的众多版本中未进行RDB文件格式的版本统一,有可能出现各版本服务之间数据格式无法兼容现象

  1. 在指定的时间间隔内将内存中的数据集快照写入磁盘,也就是行话讲的Snapshot快照,它恢复时是将快照文件直接读到内存里。

  2. 备份是如何执行的

Redis会单独创建(fork)一个子进程来进行持久化,会先将数据写入到一个临时文件中,待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件。整个过程中,主进程是不进行任何IO操作的,这就确保了极高的性能如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那RDB方式要比AOF方式更加的高效。RDB的缺点是最后一次持久化后的数据可能丢失。

  1. 关于fork

在Linux程序中,fork()会产生一个和父进程完全相同的子进程,但子进程在此后多会exec系统调用,出于效率考虑,Linux中引入了“写时复制技术”,一般情况父进程和子进程会共用同一段物理内存,只有进程空间的各段的内容要发生变化时,才会将父进程的内容复制一份给子进程。

  1. RDB保存的文件名

在redis.conf中配置文件名称,默认为dump.rdb

  1. RDB文件的保存路径

默认为Redis启动时命令行所在的目录下,也可以修改dir

  1. RDB的保存策略

  2. 手动保存快照

save: 只管保存,其它不管,全部阻塞

bgsave:按照保存策略自动保存

  1. RDB的相关配置
  • stop-writes-on-bgsave-error yes

当Redis无法写入磁盘的话,直接关掉Redis的写操作

  • rdbcompression yes

进行rdb保存时,将文件压缩

  • rdbchecksum yes

在存储快照后,还可以让Redis使用CRC64算法来进行数据校验,但是这样做会增加大约10%的性能消耗,如果希望获取到最大的性能提升,可以关闭此功能

  1. RDB的备份 与恢复

备份:先通过config get dir 查询rdb文件的目录 , 将*.rdb的文件拷贝到别的地方

恢复: 关闭Redis,把备份的文件拷贝到工作目录下,启动redis,备份数据会直接加载。

  1. RDB的优缺点

优点: 节省磁盘空间,恢复速度快.

缺点: 虽然Redis在fork时使用了写时拷贝技术,但是如果数据庞大时还是比较消耗性能。 在备份周期在一定间隔时间做一次备份,所以如果Redis意外down掉的话,就会丢失最后一次快照后的所有修改

7.2 AOF

RDB弊端:

存储数据量较大,效率较低
基于快照思想,每次读写都是全部数据,当数据量巨大时,效率非常低
 大数据量下的IO性能较低
 基于fork创建子进程,内存产生额外消耗
 宕机带来的数据丢失风险

解决思路:

不写全数据,仅记录部分数据
 降低区分数据是否改变的难度,改记录数据为记录操作过程
 对所有操作均进行记录,排除丢失数据的风险

什么是AOF:

  • AOF(append only file)持久化:以独立日志的方式记录每次(写)命令,重启时再重新执行AOF文件中命令达到恢复数据的目的。与RDB相比可以简单描述为改记录数据为记录数据产生的过程
  • AOF的主要作用是解决了数据持久化的实时性,目前已经是Redis持久化的主流方式
  • 比如10条incre命令,就自动替换为了set lock 11

当redis存储非临时数据时,为了降低redis故障而引起的数据丢失,redis提供了AOF(Append Only File)持久化,从单词意思讲,将命令追加到文件。AOF可以将Redis执行的每一条写命令追加到磁盘文件(appendonly.aof)中,在redis启动时候优先选择从AOF文件恢复数据。由于每一次的写操作,redis都会记录到文件中,所以开启AOF持久化会对性能有一定的影响,但是大部分情况下这个影响是可以接受的,我们可以使用读写速率高的硬盘提高AOF性能。与RDB持久化相比,AOF持久化数据丢失更少,其消耗内存更少(RDB方式执行bgsve会有内存拷贝)。

默认情况下,redis是关闭了AOF持久化,开启AOF通过配置appendonly为yes开启,我们修改配置文件或者在命令行直接使用config set修改,在用config rewrite同步到配置文件。通过客户端修改好处是不用重启redis,AOF持久化直接生效。

AOF刷新缓存区

AOF写数据过程:

当客户端发出一条指令给服务器时,服务器收到并没有马上记录,而是放到临时区域:刷新缓存区,缓存区是最终存成文件时用的。

AOF持久化策略(写数据)

与快照持久化相比,AOF持久化 的实时性更好,因此已成为主流的持久化方案。默认情况下Redis没有开启AOF(append only file)方式的持久化,可以通过appendonly参数开启:

开启AOF功能:appendonly yes|no(默认)

从缓冲区同步到文件调用的是fsync方法
1.追加写入

redis将每一条写命令以redis通讯协议添加至缓冲区aof_buf,这样的好处在于在大量写请求情况下,采用缓冲区暂存一部分命令随后根据策略一次性写入磁盘,这样可以减少磁盘的I/O次数,提高性能。

2.同步命令到硬盘

当写命令写入aof_buf缓冲区后,redis会将缓冲区的命令写入到文件。

什么情况下缓冲区同步到文件中是个问题。redis提供了三种同步策略,由配置参数appendfsync决定,下面是每个策略所对应的含义:
可以在conf文件中配置appendfsync

  • no:不使用fsync方法同步,而是交给操作系统write函数去执行同步操作,在linux操作系统中大约每30秒刷一次缓冲。这种情况下,缓冲区数据同步不可控,并且在大量的写操作下,aof_buf缓冲区会堆积会越来越严重,一旦redis出现故障,数据丢失严重。
  • always:表示每次有写操作都调用fsync方法强制内核将数据写入到aof文件。这种情况下由于每次写命令都写到了文件中, 虽然数据比较安全,但是因为每次写操作都会同步到AOF文件中,所以在性能上会有影响,同时由于频繁的IO操作,硬盘的使用寿命会降低。
  • everysec:数据将使用调用操作系统write写入文件,并使用fsync每秒一次从内核刷新到磁盘。 这是折中的方案,兼顾性能和数据安全,所以redis默认推荐使用该配置。

dir:与RDB同,默认appendonly.aof

appendfilename:AOF持久化文件名,默认文件名未appendonly.aof,建议配置为appendonly-端口号.aof

开启AOF持久化后每执行一条会更改Redis中的数据的命令,Redis就会将该命令写入硬盘中的AOF文件。

AOF写数据遇到的问题:

虽然AOF持久化非常灵活地提供了多种不同的选项来满足不同应用程序对数据安全的不同要求,但AOF持久化也有缺陷——AOF文件的体积太大。

AOF重写

随着命令不断写入AOF,文件会越来越大,为了解决这个问题, Redis引入了AOF重写机制压缩文件体积,redis能够调用bgrewriteaof对日志文件进行重写。 AOF文件重写是将Redis进程内的数据转化为写命令同步到新AOF文件的过程。 简单说就是将对同一个数据的若干个条命令执行结果转化成最终结果数据对应的指令进行记录。

AOF重写可以产生一个新的AOF文件,这个新的AOF文件和原有的AOF文件所保存的数据库状态一样,但体积更小。

触发AOF文件重写条件(后续会说明)时候,redis将使用bgrewriteaof对AOF文件进行重写。这样的好处在于减少AOF文件大小,同时有利于数据的恢复。

为了解决AOF体积过大的问题,用户可以向Redis发送 BGREWRITEAOF命令 ,这个命令会通过移除AOF文件中的冗余命令来重写(rewrite)AOF文件来减小AOF文件的体积。BGREWRITEAOF命令和BGSAVE创建快照原理十分相似,所以AOF文件重写也需要用到子进程,这样会导致性能问题和内存占用问题,和快照持久化一样。更糟糕的是,如果不加以控制的话,AOF文件的体积可能会比快照文件大好几倍。

AOF重写过程(AOF 重写缓冲区)

在执行 BGREWRITEAOF命令时,Redis 服务器会维护一个 AOF 重写缓冲区,该缓冲区会在子进程创建新AOF文件期间,记录服务器执行的所有写命令当子进程完成创建新AOF文件的工作之后,服务器会将重写缓冲区中的所有内容追加到新AOF文件的末尾,使得新旧两个AOF文件所保存的数据库状态一致。最后,服务器用新的AOF文件替换旧的AOF文件,以此来完成AOF文件重写操作

AOF文件重写过程与RDB快照bgsave工作过程有点相似,都是通过fork子进程,由子进程完成相应的操作,同样的在fork子进程简短的时间内,redis是阻塞的:

重写过程说明:

aof_rewrite_buf 代表重写缓冲区 。aof_buf代表写命令存放的缓冲区

  • 1.开始bgrewriteaof,判断当前有没有bgsave命令(RDB持久化)/bgrewriteaof在执行,倘若有,则这些命令执行完成以后在执行。
  • 2.主进程fork出子进程,在这一个短暂的时间内,redis是阻塞的。
  • 3.主进程fork完子进程继续接受客户端请求,所有写命令依然写入AOF文件缓冲区并根据appendfsync策略同步到磁盘,保证原有AOF文件完整和正确。由于fork的子进程仅仅只共享主进程fork时的内存,因此Redis使用采用重写缓冲区(aof_rewrite_buf)机制保存fork之后的客户端的写请求,防止新AOF文件生成期间丢失这部分数据。此时,客户端的写请求不仅仅写入原来aof_buf缓冲,还写入重写缓冲区(aof_rewrite_buf)。
  • 4.子进程通过内存快照,按照命令重写策略写入到新的AOF文件。
    • 4.1子进程写完新的AOF文件后,向主进程发信号,父进程更新统计信息。
    • 4.2主进程把AOFaof_rewrite_buf中的数据写入到新的AOF文件(避免写文件是数据丢失)。
  • 5.使用新的AOF文件覆盖旧的AOF文件,标志AOF重写完成。

重写时机:

redis开启在AOF功能开启的情况下,会维持以下三个变量

  • 记录当前AOF文件大小的变量aof_current_size。

  • 记录最后一次AOF重写之后,AOF文件大小的变量aof_rewrite_base_size。

  • 增长百分比变量aof_rewrite_perc。

    #手动重写
    命令行输入bgrewriteaof

    #自动重写
    #配置文件里写
    auto-aof-rewrite-min-size 64Mb #达到这个大小才重写 #改为5GB,因为会fork子进程,我们尽量让他少触发
    auto-aof-rewrite-percentage 100 # 达到百分比

    每次重写完后重新设置baseSize大小,下一次重写的标准是相当于这个baseSize计算的
    系统载入时或者上次重写完毕时,Redis会记录此时AOF大小,设为base_size,如果Redis的AOF当前大小>= base_size +base_size*100% (默认)且当前大小>=64mb(默认)的情况下,Redis会对AOF进行重写。

     自动重写触发比对参数( 运行指令info Persistence获取具体信息 )
    aof_current_size
    aof_base_size

     自动重写触发条件

每次当serverCron(服务器周期性操作函数)函数执行时,它会检查以下条件是否全部满足,如果全部满足的话,就触发自动的AOF重写操作:

  • 没有BGSAVE命令(RDB持久化)/AOF持久化在执行;
  • 没有BGREWRITEAOF在进行;
  • 当前AOF文件大小要大于server.aof_rewrite_min_size的值;
  • 当前AOF文件大小和最后一次重写后的大小之间的比率等于或者大于指定的增长百分比(auto-aof-rewrite-percentage参数)

AOP重写会fork子进程

AOF重写作用

  • 降低磁盘占用量,提高磁盘利用率
  • 提高持久化效率,降低持久化写时间,提高IO性能
  • 提高数据恢复效率

AOF重写规则

  • 进程内已超时的数据不再写入文件
  • 忽略无效指令,重写时使用进程内数据直接生成,这样新的AOF文件只保留最终数据的写入命令
    如del key1、 hdel key2、 srem key3、 set key4 111、 set key4 222等
  • 对同一数据的多条写命令合并为一条命令
    如lpush list1 a、 lpush list1 b、 lpush list1 c 可以转化为: lpush list1 a b c。
  • 为防止数据量过大造成客户端缓冲区溢出,对list、 set、 hash、 zset等类型, 每条指令最多写入64个元素

7.3 RDB+AOF混合持久化

redis4.0相对与3.X版本其中一个比较大的变化是4.0添加了新的混合持久化方式。前面已经详细介绍了AOF持久化以及RDB持久化,这里介绍的混合持久化就是同时结合RDB持久化以及AOF持久化混合写入AOF文件。这样做的好处是可以结合 rdb 和 aof 的优点, 快速加载同时避免丢失过多的数据,缺点是 aof 里面的 rdb 部分就是压缩格式不再是 aof 格式,可读性差。

开启混合持久化

  • 4.0版本的混合持久化默认关闭的,通过aof-use-rdb-preamble配置参数控制,yes则表示开启,no表示禁用,
  • 5.0之后默认开启。

如果把混合持久化打开,AOF 重写的时候就直接把 RDB 的内容写到 AOF 文件开头。这样做的好处是可以结合 RDB 和 AOF 的优点, 快速加载同时避免丢失过多的数据。当然缺点也是有的, AOF 里面的 RDB 部分是压缩格式不再是 AOF 格式,可读性较差。

redis4.0之后的混合持久化是对重写的进一步优化,他以RDB格式保存更节约内存

  • RBD:会fork子进程,有可能会丢失最后一次写入的数据,启动redis的适合,从磁盘加载持久化数据快
  • AOF:不会fork子进程,最多丢失不会超过2s的数据(此时是因为选用是EverySec),启动redis的适合,从磁盘加载持久化数据不如RDB
  • AOF重写:会fork子进程

混合持久化同样也是通过bgrewriteaof完成的,不同的是当开启混合持久化时,fork出的子进程先将共享的内存副本全量的以RDB方式写入aof文件,然后在将重写缓冲区的增量命令以AOF方式写入到文件,写入完成后通知主进程更新统计信息,并将新的含有RDB格式和AOF格式的AOF文件替换旧的的AOF文件。简单的说:新的AOF文件前半段是RDB格式的全量数据后半段是AOF格式的增量数据,如下图:

  • 优点:混合持久化结合了RDB持久化 和 AOF 持久化的优点, 由于绝大部分都是RDB格式,加载速度快,同时结合AOF,增量的数据以AOF方式保存了,数据更少的丢失。
  • 缺点:兼容性差,一旦开启了混合持久化,在4.0之前版本都不识别该aof文件,同时由于前部分是RDB格式,阅读性较差

数库恢复:

  • aof文件开头是rdb的格式, 先加载 rdb内容再加载剩余的 aof。
  • aof文件开头不是rdb的格式,直接以aof格式加载整个文件

AOF工作流程:

  • Always时,Set正常执行,另开一个子进程进行重写。
  • Sec时会先放到缓存区。

AOF重写流程:

AOF缓冲区同步文件策略, 由参数appendfsync控制
系统调用write和fsync说明:

 write操作会触发延迟写( delayed write) 机制, Linux在内核提供页缓冲区用来提高硬盘IO性能。 write操作在写入系统缓冲区后直接返回。 同步硬盘操作依赖于系统调度机制, 列如:缓冲区页空间写满或达到特定时间周期。 同步文件之前, 如果此时系统故障宕机, 缓冲区内数据将丢失。

 fsync针对单个文件操作( 比如AOF文件) , 做强制硬盘同步, fsync将阻塞知道写入硬盘完成后返回, 保证了数据持久化。

除了write、 fsync、 Linx还提供了sync、 fdatasync操作, 具体API说明参见:

  1. AOF文件故障备份

AOF的备份机制和性能虽然和RDB不同, 但是备份和恢复的操作同RDB一样,都是拷贝备份文件,需要恢复时再拷贝到Redis工作目录下,启动系统即加载

  1. AOF文件故障恢复

如遇到AOF文件损坏,可通过redis-check-aof --fix appendonly.aof 进行恢复

  1. Rewrite

l AOF采用文件追加方式,文件会越来越大为避免出现此种情况,新增了重写机制,当AOF文件的大小超过所设定的阈值时,Redis就会启动AOF文件的内容压缩,只保留可以恢复数据的最小指令集.可以使用命令bgrewriteaof。

l Redis如何实现重写

AOF文件持续增长而过大时,会fork出一条新进程来将文件重写(也是先写临时文件最后再rename),遍历新进程的内存中数据,每条记录有一条的Set语句。重写aof文件的操作,并没有读取旧的aof文件,而是将整个内存中的数据库内容用命令的方式重写了一个新的aof文件,这点和快照有点类似。

7.3 RDB和AOF比较

持久化方式

RDB

AOF

占用存储空间

小(数据级:压缩)

大(指令级:重写)

存储速度

恢复速度

数据安全性

同步时间间隔大

同步时间间隔小

资源消耗

高/重量级

低/轻量级

启动优先级

AOF和RDB同时开启,redis听谁的?:官方建议 两种持久化机制同时开启,如果两个同时开启 优先使用aof

  • 官方推荐两个都启用。
  • 如果对数据不敏感,可以选单独用RDB
  • 不建议单独用 AOF,因为可能会出现Bug。
  • 如果只是做纯内存缓存,可以都不用
  • 双保险策略, 同时开启 RDB 和 AOF, 重启后, Redis优先使用 AOF 来恢复数据,降低丢失数据的量

对数据非常敏感, 建议使用默认的AOF持久化方案
 AOF持久化策略使用everysecond,每秒钟fsync一次。该策略redis仍可以保持很好的处理性能, 当出
现问题时,最多丢失0-1秒内的数据。
 注意:由于AOF文件存储体积较大,且恢复速度较慢
 数据呈现阶段有效性,建议使用RDB持久化方案
 数据可以良好的做到阶段内无丢失(该阶段是开发者或运维人员手工维护的),且恢复速度较快,阶段
点数据恢复通常采用RDB方案
 注意:利用RDB实现紧凑的数据持久化会使Redis降的很低,慎重总结:
 综合比对
 RDB与AOF的选择实际上是在做一种权衡,每种都有利有弊
 如不能承受数分钟以内的数据丢失,对业务数据非常敏感, 选用AOF
 如能承受数分钟以内的数据丢失, 且追求大数据集的恢复速度, 选用RDB
 灾难恢复选用RDB

开机加载持久化步骤

优先加载aof文件

⑧ 过期数据删除策略

Redis中有个设置时间过期的功能,即对存储在 redis 数据库中的值可以设置一个过期时间。作为一个缓存数据库,这是非常实用的。如我们一般项目中的 token 或者一些登录信息,尤其是短信验证码都是有时间限制的,按照传统的数据库处理方式,一般都是自己判断过期,这样无疑会严重影响项目性能。

如果假设你设置了一批 key 只能存活1个小时,那么接下来1小时后,redis是怎么对这批key进行删除的?

过期的数据真的被删除了吗?扔垃圾我们往往都是过会再扔。等CPU空闲时候再处理扔。这就是删除策略

  • 1 定时删除
  • 2 惰性删除
  • 3 定期删除

0 时效性数据的存储结构

这四种操作会给key设置一个过期时间,这个过期时间存放在expires区域里。右面的1359…是一个系统时间点

数据删除策略的目标:在内存占用与CPU占用之间寻找一种平衡,顾此失彼都会造成整体redis性能的下降,甚至引发服务器宕机或内存泄露

内存释放的策略:Redis中有专门释放内存的函数:freeMmoryIfNeeded。每当执行一个命令的时候,就会调用该函数来检测内存是否够用。如果已用内存大于最大内存限制,它就会进行内存释放。

代码:https://blog.csdn.net/libafei/article/details/80311372

1 定时删除

 创建一个定时器,当key设置有过期时间,且过期时间到达时,由定时器任务立即执行对键的删除操作。此时存储空间的东西也删除了expires空间的内容也删除了。(拿时间换空间)

 优点:节约内存,到时就删除,快速释放掉不必要的内存占用

 缺点: CPU压力很大,无论CPU此时负载量多高,均占用CPU去进行删除,会影响redis服务器响应时间和指令吞吐量

2 惰性删除

数据到达过期时间,不做处理(此时还在expires区里存在)。等下次访问该数据时

  • 如果未过期,返回数据
  • 发现已过期,删除,返回不存在

惰性删除由db.c/expireIfNeeded()函数实现,所有读写数据库的命令在执行之前都会调用expireIfNeeded()函数对要操作的key进行检查。如果key已经过期,那么将会将key从数据库中删除

 优点:节约CPU性能,发现必须删除的时候才删除(拿时间换空间)
 缺点:内存压力很大,出现长期占用内存的数据

3 定期删除

https://www.jianshu.com/p/d0be3c255fc6
两种方案都走极端,有没有折中方案?

redis默认是每隔 100ms 就随机抽取一些设置了过期时间的key,检查其是否过期,如果过期就删除。注意这里是随机抽取的。为什么要随机呢?你想一想假如 redis 存了几十万个 key ,每隔100ms就遍历所有的设置过期时间的 key 的话,就会给 CPU 带来很大的负载!

定期删除由函数redis.c/activeExpireCycle()函数实现,每当server在调用beforeSleep()和serverCron()时,都会被调用。

每个库都有一个expire区,expire[0]…expire[15],即16个db

  • Redis启动服务器初始化时,读取配置server.hz的值,默认为10。(通过info server查询),该值代表CPU每秒对16个库整体进行的查询次数。每次过期key清理的时间不超过CPU时间的25%,即若hz=1,则一次清理时间最大为250ms,若hz=10,则一次清理时间最大为25ms;清理时依次遍历所有的db;从db中随机取20个key,判断是否过期,若过期,则逐出;若有5个以上key过期,则重复步骤4,否则遍历下一个db;在清理过程中,若达到了25%CPU时间,退出清理过程;

  • 每秒钟执行server.hz次(10) 函数==serverCron()==对服务器进行定时轮询。其中会调用:==databaseCron()==继续对每个库进行轮询,该函数会调用:==activeExporeCycle()==对变量进行检查

  • activeExpireCycle()对每个expires[0]里的变量逐一进行检测,每次执行250ms/server.hz

  • 对某个expires[0]检测时,随机挑选W个key检测

    • W=ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP属性值(配置文件中)
    • 对这W个key进行以下操作:
    • 如果key超时,删除key
    • 如果一轮中删除的key的数量>W*25%,说明删除的量比较大,很可能还有很多没删除的,循环该过程
    • 如果一轮中删除的key的数量≤W*25%,检查下一个expires[1],如此轮询16个库, 0-15循环
  • 那么下次轮询从几号库开始查询呢?:参数current_db用于记录activeExpireCycle() 进入哪个expires[] 执行。如果activeExpireCycle()执行时间到期,下次从current_db继续向下执行。取值0-15?

定期删除总结:

 特点1: CPU性能占用设置有峰值,检测频度可自定义设置
 特点2:内存压力不是很大,长期占用内存的冷数据会被持续清理

周期性抽查,抽查不合格的停着监督让整改后再检测他

但是仅仅通过设置过期时间还是有问题的。我们想一下:如果定期删除漏掉了很多过期 key,然后你也没及时去查,也就没走惰性删除,此时会怎么样?如果大量过期key堆积在内存里,导致redis内存块耗尽了。怎么解决这个问题呢? redis 内存淘汰机制。

⑨ 内存不足逐出算法:

https://www.jianshu.com/p/b1b4eeccc140
redis 内存淘汰机制(MySQL里有2000w数据,Redis中只存20w的数据,如何保证Redis中的数据都是热点数据?)

新数据进入检测:当新数据进入redis时,如果redis内存不足怎么办?expire控制的是过期数据,如果都不会过期怎么办?

 Redis使用内存存储数据,在执行每一个命令前,会调用==freeMemoryIfNeeded()==检测内存是否充足。如果内存不满足新加入数据的最低存储要求, redis要临时删除一些数据为当前指令清理存储空间。清理数据的策略称为逐出算法(临时淘汰)。

 注意:逐出算法不一定肯定成功:逐出数据的过程不是100%能够清理出足够的可使用的内存空间,如果不成功则反复执行。当对所有数据尝试完毕后,如果不能达到内存清理的要求,将出现错误信息:

(error) OOM command not allowed when used memory >‘maxmemory’

设置方法:

# 最大可使用内存:占用物理内存的比例,默认值为0,表示不限制,全用掉内存。生产环境中根据需求设定,通常设置在50%以上。
maxmemory
# 每次选取待删除数据的个数:选取数据时并不会全库扫描,导致严重的性能消耗,降低读写性能。因此采用随机获取数据的方式作为待检测删除数据
maxmemory-samples

# 删除策略,即删除哪个:达到最大内存后的,对被挑选出来的数据进行删除的策略  
maxmemory-policy  下面删除策略之一  # maxmemory-policy  volatile-lru
 检测易失数据(可能会过期的数据集server.db[i].expires )
① volatile-lru:挑选最近最少使用的数据淘汰。Least Recently Used。从已设置过期时间的数据集(server.db[i].expires)中挑选最近最少使用的数据淘汰
最久没使用
② volatile-lfu:挑选最近使用次数最少的数据淘汰。Least Frequently Used。从已设置过期时间的数据集(server.db[i].expires)中挑选最不经常使用的数据淘汰。当内存不足以容纳新写入数据时,在键空间中,移除最不经常使用的key
时间段内使用次数最少
③ volatile-ttl:挑选将要过期的数据淘汰。从已设置过期时间的数据集(server.db[i].expires)中挑选将要过期的数据淘汰
④ volatile-random:任意选择数据淘汰。从已设置过期时间的数据集(server.db[i].expires)中任意选择数据淘汰

 检测当前库全库数据(所有数据集server.db[i].dict )
⑤ allkeys-lru:挑选最近最少使用的数据淘汰。当内存不足以容纳新写入数据时,在键空间中,移除最近最少使用的key(这个是最常用的)
⑥ allkeys-lfu:挑选最近使用次数最少的数据淘汰。当内存不足以容纳新写入数据时,在键空间中,移除最不经常使用的key
⑦ allkeys-random:任意选择数据淘汰。从数据集(server.db[i].dict)中任意选择数据淘汰

 放弃数据驱逐
⑧ no-enviction(驱逐):禁止驱逐数据( redis4.0中默认策略),会引发错误OOM( Out Of Memory) 。禁止驱逐数据,也就是说当内存不足以容纳新写入数据时,新写入操作会报错。这个应该没人使用吧!

数据逐出策略配置依据: 使用INFO命令输出监控信息,查询缓存 hit 和 miss 的次数,根据业务需求调优Redis配置

服务器配置

# -----服务器端设置-----
daemonize yes|no # 设置服务器以守护进程的方式运行

bind 127.0.0.1 # 绑定主机地址,如果不绑别人也能访问

port 6379 # 设置服务器端口号
databases 16  # 设置数据库数量


# -----日志配置-----
loglevel debug|verbose|notice|warning #  设置服务器以指定日志记录级别
logfile 端口号.log  #  日志记录文件名

#  -----客户端配置-----
maxclients 0  # 设置同一时间最大客户端连接数,默认无限制。当客户端连接到达上限, Redis会关闭新的连接
# 注意:日志级别开发期设置为verbose即可,生产环境中配置为notice,简化日志输出量,降低写日志IO的频度  
timeout 300 #  客户端闲置等待最大时长,达到最大值后关闭连接。如需关闭该功能,设置为 0  

多服务器快捷配置:

 导入并加载指定配置文件信息,用于快速创建redis公共配置较多的redis实例配置文件,便于维护

include /path/server-端口号.conf

相当于继承

高级数据模型

Pipline

Jedis jedis = new Jedis("192.168.1.2",6379);
// 原来 :n个命令=n个连接时间+n处理时间
for(int i=0;i<100;i++){//原来每条命令都会请求一遍,管道会多条指令请求一次
    // 现在:n个命令=1次连接事件+n次处理时间
    Pipeline pipeline = jedis.pipilined();
    for(int j=i*100;i<(i+1)*100;j++){
        pipeline.hset("bbbb"+j,"bbbb"+j,"bbbb"+j);
    }
    pipeline=syncAndReturnAll(); 
}

Bitmaps

https://blog.csdn.net/qq_16399991/article/details/83512937

场景:

  • 统计日活

点赞,取消点赞,查看是否点赞,统一一共有多少点赞

电影网站是有资源常年没有人浏览,可以删除。年度浏览最低起,月浏览量最低。我们用计算机上最小单位bit存储状态。用string

Bitmaps类型的基础操作:

  • getbit key offset:获取指定key对应偏移量上的bit值 :
  • setbit key offset value:设置指定key对应偏移量上的bit值, value只能是1或0。 如setbit 20200606 0 1,就代表这个字符串的第0位为1
  • bitcount key [start][end]:计数1
  • bitop op destkey key[key....] :op可以为and / or / not / nor

比如我们统计电影网站上很多电影每天/每周/每月有没有人观看,一个人都没人看的电影就可以删了节约数据库了。

解决思路是我们可以【每日】给所以电影设置一个字符串,即这个字符串的每一位代表一部电影,如果有人浏览,这部电影对应的那一位就置为1,这样就拿一个字符串就统计了全部电影的每日是否被点击。

如果我们想统计每周各部电影是否被观看,只需要把那七天的7个字符串进行按位或操作即可,即得到了一个字符串,各个位代表这周这部电影是否被观看

下图的意思是第一天有没有被观看,第二天有没有被关键,进行或操作后就是这两天有没有被观看。

谷歌提供的位图计算器

首先导入依赖:guava,其中不只有位图

HyperLogLog:

用处:统计独立UV

 原始方案: set
 存储每个用户的id(字符串)
 改进方案: Bitmaps
 存储每个用户状态( bit)
 全新的方案: Hyperloglog

基数:基数是数据集去重后元素个数,即set
 HyperLogLog 是用来做基数统计的,运用了LogLog的算法

HyperLogLog类型的基本操作

//添加数据
pfadd key element [element ...]

//统计数据
pfcount key [key ...]

//合并数据
pfmerge des tkey sourcekey [sourcekey...]  

Tips 22:
 redis 应用于独立信息统计

相关说明
 用于进行基数统计,不是集合,不保存数据,只记录数量而不是具体数据
 核心是基数估算算法,最终数值存在一定误差
 误差范围:基数估计的结果是一个带有 0.81% 标准错误的近似值
 耗空间极小,每个hyperloglog key占用了12K的内存用于标记基数
 pfadd命令不是一次性分配12K内存使用,会随着基数的增加内存逐渐增大
 Pfmerge命令合并后占用的存储空间为12K,无论合并之前数据量多少

GEO:

GEO类型的基本操作
# 添加坐标点
geoadd key longitude latitude member [longitude latitude member ...]
georadius key longitude latitude radius m|km|ft|mi [withcoord] [withdist] [withhash] [count count]
# 获取坐标点
geopos key member [member ...]
georadiusbymember key member radius m|km|ft|mi [withcoord] [withdist] [withhash] [count count]

# 计算坐标点距离
geodist key member1 member2 [unit]  

# 计算经纬度
geohash key member [member ...]

# 举例
GEOADD locations 116.419217 39.921133 beijing
GEOPOS locations beijing
GEODIST locations beijing tianjin km //算计举例
GEORADIUSBYMEMBER locations beijing 150km //通过举例计算城市
注意:没有删除命令,他的本质是zset(type locations)所以可以使用zrem key member

第8章 redis高可用

先说下演变过程:

  • 单机版
  • 主从复制:复制是高可用redis的基础,哨兵和集群都是在复制基础上实现高可用的。复制主要实现了数据的多机备份,以及对读操作的负载均衡和简单的故障恢复。缺陷是故障恢复无法自动化;写操作无法负载均衡;存储能力受到单机的限制。
  • 哨兵:在复制的基础上,哨兵实现了自动化的故障恢复,实现主从切换。缺陷是写操作无法负载均衡;存储能力受到单机的限制
  • 集群:解决了写操作无法负载均衡,以及存储能力受到单击限制的问题,实现了较为完善的高可用方案

8.1 主从复制

主从复制:即将master中的数据即时、有效的复制到slave中

特征:一个master可以拥有多个slave,一个slave只对应一个master。Master以写为主,Slave以读为主。

master和slave是个相对的概念,下面的slave还可以当做下一级的master

职责:

  • master:
     写数据
     执行写操作时,将出现变化的数据自动同步到slave
     读数据(可忽略)
  • slave:
     读数据
     写数据(禁止)

Redis 中,用户可以通过执行 SLAVEOF 命令或者设置 slaveof 选项,让一个服务器去复制另一个服务器,成为从服务器,以下三种方式是 完全等效 的:

  • 配置文件:在从服务器的配置文件中加入:slaveof <masterip> <masterport>
  • 启动命令:redis-server 启动命令后加入 --slaveof <masterip> <masterport>
  • 客户端命令:Redis 服务器启动后,直接通过客户端执行命令:slaveof <masterip> <masterport>,让该 Redis 实例成为从节点。

需要注意的是:主从复制的开启,完全是在从节点发起的,不需要我们在主节点做任何事情。

在master端配置:

# -----master授权slave访问-------
 master客户端发送命令设置密码`requirepass <password>`
 master配置文件设置密码
`config set requirepass <password>`
`config get requirepass`

 slave客户端发送命令设置密码 `auth <password>`
 slave配置文件设置密码 `masterauth <password>`
 slave启动服务器设置密  `redis-server –a <password>`

# ----------主从断开连接-----------
 客户端发送命令:`slaveof no one `
## 说明:slave断开连接后,不会删除已有数据,只是不再接受master发送的数据  

8.2 主从复制的目的

主从复制的作用:

  • 读写分离: master写、 slave读,提高服务器的读写负载能力
  • 负载均衡: 基于主从结构,配合读写分离,由slave分担master负载,并根据需求的变化,改变slave的数量,通过多个从节点分担数据读取负载,大大提高Redis服务器并发量与数据吞吐量
  • 故障恢复:当master出现问题时,由slave提供服务,实现快速的故障恢复
  • 数据冗余:实现数据热备份,是持久化之外的一种数据冗余方式
  • 高可用基石: 基于主从复制,构建哨兵模式与集群,实现Redis的高可用方案

8.3 主从配置原理

主从复制过程大体可以分为3个阶段

  • 1 建立连接阶段(设置通道)
  • 2 数据同步阶段(RDB全量复制)
  • 3 命令传播阶段()

阶段一:建立连接阶段

 建立slave到master的连接,使master能够识别slave, 并保存slave端口号

  • 步骤1:在slave端设置master的IP和端口,
  • 步骤2:建立socket连接,以后通过这个通道传数据,以后slave会自动周期性发送ping命令(定时器任务)到master,验证master还在不在。master返回pong说明通道还在
  • 步骤3:master查看配置中是否允许slave连接,进行身份验证。同时master也记录了slave的IP和端口
  • 步骤4:master发送授权通过信息给slave,建立了通道

阶段二:同步阶段

思想:在slave初次连接master后,复制master中的所有数据到slave,将slave的数据库状态更新成master当前的数据库状态 。复制完以后再同步数据只复制更改的部分就行了。

数据同步阶段工作流程:
步骤1: slave发送psync2命令请求同步数据
步骤2: 创建RDB同步数据
步骤3: slave拿到master发来的RDB文件后,删除原来slave里的RDB文件,执行文件恢复过程。恢复好后slave就保存了master刚才时刻的RDB。然后通知master恢复好了。

这时就完成了开机后的工作,接下来的命令增量的同步工作叫命令传播阶段。

注意:上面的缓冲区只在部分复制时才起作用,他里面是AOF指令,而不是数据快照。

master说明:

  • \1. 如果master数据量巨大,数据同步阶段应避开流量高峰期,避免造成master阻塞,影响业务正常执行
  • \2. 缓存区设置:复制缓冲区大小设定不合理,会导致数据溢出,后进的指令会冲掉先进的指令。如进行全量复制周期太长,进行部分复制时又出现数据已经存在丢失的情况,必须进行第二次全量复制,致使slave陷入死循环状态。在master端更改:repl-backlog-size 1mb
  • \3. master单机内存占用主机内存的比例不应过大,建议使用50%-70%的内存,留下30%-50%的内存用于执
    行bgsave命令和创建复制缓冲区

slave说明:

  • \1. 为避免slave进行全量复制、部分复制时服务器响应阻塞或数据不同步,建议关闭此期间的对外服务slave-serve-stale-data yes|no
  • \2. 数据同步阶段, master发送给slave信息可以理解master是slave的一个客户端,主动向slave发送命令
  • \3. 多个slave同时对master请求数据同步, master发送的RDB文件增多, 会对带宽造成巨大冲击, 如果master带宽不足, 因此数据同步需要根据业务需求, 适量错峰
  • \4. slave过多时, 建议调整拓扑结构,由一主多从结构变为树状结构, 中间的节点既是master,也是slave。注意使用树状结构时,由于层级深度,导致深度越高的slave与最顶层master间数据同步延迟较大, 数据一致性变差, 应谨慎选择

阶段三:命令传播阶段

 当master数据库状态被修改后,导致主从服务器数据库状态不一致,此时需要让主从数据同步到一致的状态,同步的动作称为命令传播。

数据库的状态信息是通过复制缓冲区实现的。

命令传播阶段出现了断网现象
 网络闪断闪连 忽略
 短时间网络中断 部分复制
 长时间网络中断 全量复制

部分复制的三个核心要素

  • 服务器的运行 id( run id)
  • 主服务器的复制积压缓冲区
  • 主从服务器的复制偏移量 offset

服务器运行ID( runid):

  •  概念:服务器运行ID是每一台服务器每次运行的身份识别码,一台服务器多次运行可以生成多个运行id。每个计算机也有自己的运行ID,slave也有。运行id由40位字符组成,是一个随机的十六进制字符,如fdc9ff13b9bbaab28db42b3d50f852bb5e3fcdce
  • 作用:运行id被用于在服务器间进行传输,识别身份
    如果想两次操作均对同一台服务器进行,必须每次操作携带对应的运行id,用于对方识别。master发送ID,slave比对ID,
  • 实现方式: 运行id在每台服务器启动时自动生成的, master在首次连接slave时,会将自己的运行ID发
    送给slave, slave保存此ID,通过info Server命令,可以查看节点的runid

复制缓冲区

复制缓冲区是一个先进先出FIFO队列,master先把命令信息放到复制缓冲区,不是放整个命令,而是放入AOF日志文件里的$3 \r \n,set等。此时将缓冲区分格,每个格放一个字符。有个类似数组索引的"偏移量"识别当前slave执行到哪里了。所以不同slave的偏移量是不一样的。

 缓冲区大小建议设置如下:
\1. 测算从master到slave的重连平均时长second
\2. 获取master平均每秒产生写命令数据总量write_size_per_second
\3. 最优复制缓冲区空间 = 2 * second * write_size_per_second

复制缓冲区:

  • 复制缓冲区默认数据存储空间大小是1M,由于存储空间大小是固定的,当入队元素的数量大于队列长度时,最先入队的元素会被弹出,而新元素会被放入队列
  • master有一个缓冲区,但有多个offset。
  • 当master接收到主客户端的指令时,除了将指令执行,会将该指令存储到缓冲区中
  • slave请求部分同步时,会把slave的offset(slave)发送过来,master是看slave发送过来的offset(slave)还在不在我们的缓冲区中,如果已经被挤出去,那么就触发全局复制。如果offset(slave)在master还没挤出去,就把还记录的部分再发给slave,同时发送给slave新的offset

offset偏移量:

  • 通过offset区分不同的slave当前数据传播的差异,offset是复制偏移量,描述复制缓冲区中的指令字节位置
    • master复制偏移量:master记录已发送的信息对应的offset(多个),发送一次记录一次
    • slave复制偏移量:slave记录已接收的信息对应的offset(一个),接收一次记录一次

下面的序号是执行的顺序

数据同步总结

master发过来的为红的offset,slave记下来的为蓝的offset。

如果ID不匹配。那么就触发全部复制。或者slave传过来的offset没了(在master里找不到这个偏移量了,被队列挤出去了),那么就触发全量复制。

现象:网络环境不佳,出现网络中断, slave不提供服务

原因:复制缓冲区过小,断网后slave的offset越界,频繁触发全量复制

解决:修改复制缓冲区大小repl-backlog-size

心跳机制

心跳机制:进入命令传播阶段候, master与slave间需要进行信息交换,使用心跳机制进行维护,实现双方连接保持在线

master心跳:

  • 指令: PING
  • 周期:由repl-ping-slave-period决定,默认10秒
  • 作用:判断slave是否在线
  • 查询: INFO replication 获取slave最后一次连接时间间隔, lag项维持在0或1视为正常

slave心跳任务

  • 指令: REPLCONF ACK {offset}
  • 周期: 1秒
  • 作用1:汇报slave自己的复制偏移量,获取最新的数据变更指令
  • 作用2:判断master是否在线

心跳设置:

问题现象:slave与master连接断开
问题原因
 master发送ping指令频度较低:提高ping指令发送的频度:repl-ping-slave-period
 master设定超时时间较短:超时时间repl-time的时间至少是ping指令频度的5到10倍,否则slave很容易判定超时 
 ping指令在网络中存在丢包

心跳阶段注意事项

 当slave多数掉线,或延迟过高时, master为保障数据稳定性,将拒绝所有信息同步操作。都掉了就没有同步的意义了。或者同步完成时间太久了

min-slaves-to-write 2 # 少于2时就不再写了
min-slaves-max-lag 8  # 链接延迟,在info里可以看到

slave数量少于2个,或者所有slave的延迟都大于等于10秒时,强制关闭master写功能,停止数据同步
 slave数量由slave发送REPLCONF ACK命令做确认
 slave延迟由slave发送REPLCONF ACK命令做确认

PING的设置会影响网络,可能会引起网络中断。

主从复制常见问题

这个地方还没看视频

频繁的全量复制( 1)

伴随着系统的运行, master的数据量会越来越大,一旦master重启, runid将发生变化,会导致全部slave的全量复制操作

内部优化调整方案:
\1. master内部创建master_replid变量,使用runid相同的策略生成,长度41位,并发送给所有slave
\2. 在master关闭时执行命令 shutdown save,进行RDB持久化,将runid与offset保存到RDB文件中
 repl-id repl-offset
 通过redis-check-rdb命令可以查看该信息
\3. master重启后加载RDB文件, 恢复数据
重启后,将RDB文件中保存的repl-id与repl-offset加载到内存中
 master_repl_id = repl master_repl_offset = repl-offset
 通过info命令可以查看该信息
作用:
本机保存上次runid,重启后恢复该值,使所有slave认为还是之前的master

频繁的网络中断( 1)

 问题现象
 master的CPU占用过高 或 slave频繁断开连接
 问题原因
 slave每1秒发送REPLCONF ACK命令到master
 当slave接到了慢查询时( keys * , hgetall等),会大量占用CPU性能
 master每1秒调用复制定时函数replicationCron(),比对slave发现长时间没有进行响应
 最终结果
 master各种资源(输出缓冲区、带宽、连接等) 被严重占用
 解决方案
 通过设置合理的超时时间,确认是否释放slave:repl-timeout

该参数定义了超时时间的阈值(默认60秒),超过该值,释放slave

数据不一致

 问题现象
 多个slave获取相同数据不同步
 问题原因
 网络信息不同步,数据发送有延迟
 解决方案
 优化主从间的网络环境,通常放置在同一个机房部署,如使用阿里云等云服务器时要注意此现象
 监控主从节点延迟(通过offset)判断,如果slave延迟过大,暂时屏蔽程序对该slave的数据访问:slave-serve-stale-data yes|no
开启后仅响应info、 slaveof等少数命令(慎用,除非对数据一致性要求很高)

可以在redis里输入 info application查看当前redis的主从状态

8.2 哨兵与选举

哨兵(sentinel) :是一个分布式系统,用于对主从结构中的每台服务器进行监控,当master出现故障时通过投票机制从slave中选择新的master并将所有slave连接到新的master。

哨兵的作用

  • 监控:
    • 不断的检查master和slave是否正常运行。
    • master存活检测、 master与slave运行情况检测
  • 通知(提醒)
    • 当被监控的服务器出现问题时, 向其他(哨兵间,客户端) 发送通知。
  • 自动故障转移
    • 断开master与slave连接,选举一个slave作为master,将其他slave连接到新的master,并告知客户端新的服务器地址

注意:哨兵也是一台redis服务器,只是不提供数据服务。通常哨兵配置数量为单数

哨兵会改变conf文件内容,删除与新增slaveof。应用程序知道哨兵地址就可以了

哨兵2挂了后,剩下两个选id大的

配置哨兵

 配置一拖二的主从结构

 配置三个哨兵(配置相同,端口不同)

步骤:

  • 写好哨兵的配置文件:指定监听的redis(只监听主节点)

  • 启动哨兵:redis-sentinel /myredis/sentinel.conf

    在每个哨兵配置文件中修改

    redis-sentinel-1.conf------------------

    port 26379
    daemonize yes
    logfile "26379.log"
    sentinel monitor mymaster 127.0.0.1 6379 2

    监控127.0.0.1 6379这个主节点, # 只监听主节点

    该主节点名称是mymaster。

    2个哨兵同意才能判定主节点故障并进行故障转移。 #哨兵数量/2+1

    redis-sentinel-2.conf------------------

    port 26380
    daemonize yes
    logfile "26380.log"
    sentinel monitor mymaster 127.0.0.1 6379 2

    redis-sentinel-3.conf------------------

    port 26381
    daemonize yes
    logfile "26381.log"
    sentinel monitor mymaster 127.0.0.1 6379 2

    entinel down-after-milliseconds master01 30000 #30000ms表示认为主节点挂了的超时时间
    sentinel parallel-syncs master01 1 #表示切换主从节点时同时有多少个从节点进行复制
    sentinel failover-timeout master01 180000 #表示切换主从节点超时时间

 启动哨兵

redis-server /usr/local/redis-5.0.3/redis-sentinel-1.conf --sentinel
redis-server /usr/local/redis-5.0.3/redis-sentinel-2.conf --sentinel
redis-server /usr/local/redis-5.0.3/redis-sentinel-3.conf --sentinel

启动顺序:先启动主机,再启动从机,再启动哨兵

配置项

范例

说明

sentinel auth-pass <服务器名称> <password>

sentinel auth-pass mymaster itcast

连接服务器口令

sentinel down-after-milliseconds <自定义服 务名称><主机地址><端口><主从服务器总量>

sentinel monitor mymaster 192.168.194.131 6381 1

设置哨兵监听的主服务器信息,最后的参数决定了最终参与选举的服务器 数量( -1)

sentinel down-after-milliseconds <服务名称><毫秒数(整数) >

sentinel down-after milliseconds mymaster 3000

指定哨兵在监控Redis服务时,判定服务器挂掉的时间周期,默认30秒 ( 30000),也是主从切换的启动条件之一

sentinel parallel-syncs <服务名称><服务器数(整数) >

sentinel parallel-syncs mymaster 1

指定同时进行主从的slave数量,数值越大,要求网络资源越高,要求约 小,同步时间约长

sentinel failover-timeout <服务名称><毫秒数(整数) >

sentinel failover-timeout mymaster 9000

指定出现故障后,故障切换的最大超时时间,超过该值,认定切换失败, 默认3分钟

sentinel notification-script <服务名称><脚本路径>

服务器无法正常联通时,设定的执行脚本,通常调试使用。

查看状态

# 连接端口为 26379 的 Redis 节点
➜  ~ redis-cli -p 26379
127.0.0.1:26379> info Sentinel
# Sentinel
sentinel_masters:1
sentinel_tilt:0
sentinel_running_scripts:0
sentinel_scripts_queue_length:0
sentinel_simulate_failure_flags:0
master0:name=mymaster,status=ok,address=127.0.0.1:6379,slaves=2,sentinels=3

哨兵也是一台redis服务器,只是不提供数据服务。通常哨兵配置数量为单数

@Test
public void sentinel(){
    Set<String> sentinels=new HashSet<String>();
    sentinels.add("192.168.58.145:26379");
    sentinels.add("192.168.58.145:26380");
    sentinels.add("192.168.58.145:26381");
    GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
    JedisSentinelPool jedisSentinelPool = new JedisSentinelPool("master01",sentinels,poolConfig);
    Jedis jedis=null;
    try{
        jedis=jedisSentinelPool.getResource();
        jedis.set("hello", "world");
        System.out.println(jedis.get("hello"));
    }catch (Exception e) {
        e.printStackTrace();
    }finally{
        jedis.close();
    }
}

哨兵工作原理:

https://www.cnblogs.com/bingshu/p/9776610.html

主从切换

哨兵在进行主从切换过程中经历三个阶段
 监控
 通知
 故障转移

阶段一:监控阶段

哨兵里只配置了主节点的信息,哨兵要通过主节点拿到别的哨兵的信息

获取master的信息,从而获取到slave的信息

主节点

  • runid
  • role

从节点

  • runid
  • role
  • master-host,master-port
  • offset

流程:

  • 哨兵1向master发送info,然后建立了一个cmd连接(创建 cmdpub/sub 两个 连接Sentinel 通过 cmd 连接给 Redis 发送命令,通过 pub/sub 连接到 Redis 实例上的其他 Sentinel 实例。),
  • 哨兵1拿到了master、slaves、sentiels,主节点拿到了master、slaves、sentiels
  • 哨兵1根据拿到的slaves信息去连每个slaves
  • -----------
  • 哨兵2连接master,看到有master里有哨兵1,哨兵2就拿到了哨兵1的信息,去连接哨兵1(哨兵1和哨兵2互相发布订阅),哨兵1和哨兵2同步了
  • 哨兵3连接master,看到有哨兵1和2,于是各个哨兵互联
  • 总结:哨兵会向master、slave和其他哨兵获取状态。哨兵间会组件网络同步信息。

阶段二:通知阶段
  • 主哨兵1去问master和slave的状态,然后哨兵1把状态告诉哨兵2和3
  • 过了一会可能是哨兵2做了上面工作然后告诉其他哨兵

阶段三:故障转移阶段

哨兵1给master发hello发现没人理,就判断为master宕机,告诉其他哨兵,其他哨兵也验证一下挂没挂(半数以上就可以,前面配置的),于是选举新master

  • 主观下线代表只有一台哨兵认为挂了Sdown;客观下线代表半数哨兵都认为master挂了Odown
    • 主观下线 适用于所有 主节点从节点。如果在 down-after-milliseconds 毫秒内,Sentinel 没有收到 目标节点 的有效回复,则会判定 该节点主观下线
    • 客观下线 只适用于 主节点。如果 主节点 出现故障,Sentinel 节点会通过 sentinel is-master-down-by-addr 命令,向其它 Sentinel 节点询问对该节点的 状态判断。如果超过 <quorum> 个数的节点判定 主节点 不可达,则该 Sentinel 节点会判断 主节点客观下线
  • 投票机制先选举哪个哨兵负责去处理选举新master的任务。3个哨兵同时投票自己(还会带自己的竞选次数),哨兵先收到谁的投票就把票投给谁,从而得到哨兵领导
  • 哨兵领导去看哪个slave适合当master
    • 找在线的
    • pass响应慢的
    • pass与master断开时间久的
    • 优先级:offset、runid
  • 通知被选上的slave当master,通知其他slave新master

应用程序连哨兵,哨兵知道当前谁是主节点。

Redis Sentinel的工作原理

每个 Sentinel 节点都需要 定期执行 以下任务:

  • 每个 Sentinel每秒钟 一次的频率,向它所知的 主服务器从服务器 以及其他 Sentinel 实例 发送一个 PING 命令。

如果一个 实例instance)距离 最后一次 有效回复 PING 命令的时间超过 down-after-milliseconds 所指定的值,那么这个实例会被 Sentinel 标记为 主观下线

如果一个 主服务器 被标记为 主观下线,那么正在 监视 这个 主服务器所有Sentinel 节点,要以 每秒一次 的频率确认 主服务器 的确进入了 主观下线 状态。

如果一个 主服务器 被标记为 主观下线,并且有 足够数量Sentinel(至少要达到 配置文件 指定的数量)在指定的 时间范围 内同意这一判断,那么这个 主服务器 被标记为 客观下线

在一般情况下, 每个 Sentinel 会以每 10 秒一次的频率,向它已知的所有 主服务器从服务器 发送 INFO 命令。当一个 主服务器Sentinel 标记为 客观下线 时,Sentinel下线主服务器 的所有 从服务器 发送 INFO 命令的频率,会从 10 秒一次改为 每秒一次

Sentinel 和其他 Sentinel 协商 主节点 的状态,如果 主节点 处于 SDOWN 状态,则投票自动选出新的 主节点。将剩余的 从节点 指向 新的主节点 进行 数据复制

当没有足够数量的 Sentinel 同意 主服务器 下线时, 主服务器客观下线状态 就会被移除。当 主服务器 重新向 SentinelPING 命令返回 有效回复 时,主服务器主观下线状态 就会被移除。

注意:一个有效的 PING 回复可以是:+PONG-LOADING 或者 -MASTERDOWN。如果 服务器 返回除以上三种回复之外的其他回复,又或者在 指定时间 内没有回复 PING 命令, 那么 Sentinel 认为服务器返回的回复 无效non-valid)。

8.3 Redis集群

要解决的问题:

  • 并发写
  • 扩容

集群好处:

  • Redis 集群实现了对Redis的水平扩容,即启动N个redis节点,将整个数据库分布存储在这N个节点中,每个节点存储总数据的1/N。
  • Redis 集群通过分区(partition)来提供一定程度的可用性(availability): 即使集群中有一部分节点失效或者无法进行通讯, 集群也可以继续处理命令请求
  • 数据分区: 数据分区 (或称数据分片) 是集群最核心的功能。集群将数据分散到多个节点,一方面 突破了 Redis 单机内存大小的限制,存储容量大大增加另一方面 每个主节点都可以对外提供读服务和写服务,大大提高了集群的响应能力。Redis 单机内存大小受限问题,在介绍持久化和主从复制时都有提及,例如,如果单机内存太大,bgsavebgrewriteaoffork 操作可能导致主进程阻塞,主从环境下主机切换时可能导致从节点长时间无法提供服务,全量复制阶段主节点的复制缓冲区可能溢出……
  • 高可用: 集群支持主从复制和主节点的 自动故障转移 (与哨兵类似),当任一节点发生故障时,集群仍然可以对外提供服务。

0-16383共16384个槽位

上图 展示了 Redis Cluster 典型的架构图,集群中的每一个 Redis 节点都 互相两两相连,客户端任意 直连 到集群中的 任意一台,就可以对其他 Redis 节点进行 读写 的操作。

基本原理

Redis 集群中内置了 16384 个哈希槽。当客户端连接到 Redis 集群之后,会同时得到一份关于这个 集群的配置信息,当客户端具体对某一个 key 值进行操作时,会计算出它的一个 Hash 值,然后把结果对 16384 求余数,这样每个 key 都会对应一个编号在 0-16383 之间的哈希槽,Redis 会根据节点数量 大致均等 的将哈希槽映射到不同的节点。

再结合集群的配置信息就能够知道这个 key 值应该存储在哪一个具体的 Redis 节点中,如果不属于自己管,那么就会使用一个特殊的 MOVED 命令来进行一个跳转,告诉客户端去连接这个节点以获取数据:

GET x
-MOVED 3999 127.0.0.1:6381

MOVED 指令第一个参数 3999key 对应的槽位编号,后面是目标节点地址,MOVED 命令前面有一个减号,表示这是一个错误的消息。客户端在收到 MOVED 指令后,就立即纠正本地的 槽位映射表,那么下一次再访问 key 时就能够到正确的地方去获取了。

数据存储设计

这个key存到哪个redis存储空间:

 通过CRC16算法设计,计算出key应该保存的位置
 将所有的存储空间计划切割成16384份,每台主机保存一部分
每份代表的是一个存储空间,不是一个key的保存空间
 将key按照计算出的结果,然后(%16384),比如得到37,放到对应的37存储空间

那如何加入一台新的计算机,怎么重新划分?每个原来的redis拿出一部分存到新的redis上。即改变槽的位置。

怎么知道槽新地址?

集群内部通讯设计:

  • 各个数据库相互通信,保存各个库中槽的编号数据。槽可以不连续
  • 一次命中,直接返回
  • 一次未命中,告知具体位置。不是A去B里找,而是链接请求客户端去B里找。

搭建方式

 原生安装(单条命令)
 配置服务器( 3主3从)
 建立通信( Meet)
 分槽( Slot)
 搭建主从( master-slave)
 工具安装(批处理)

Cluster配置

 添加节点:cluster-enabled yes|no
 cluster配置文件名,该文件属于自动生成,仅用于快速查找文件并查询文件内容cluster-config-file <filename>
 节点服务响应超时时间,用于判定该节点是否下线或切换为从节点 cluster-node-timeout <milliseconds>
 master连接的slave最小数量

cluster-migration-barrier<count>

查看集群节点信息 cluster nodes
 进入一个从节点 redis,切换其主节点cluster replicate <master-id>
 发现一个新节点,新增主节点cluster meet ip:port
 忽略一个没有solt的节点cluster forget <id>
 手动故障转移cluster failover

redis-trib命令
 添加节点redis-trib.rb add-node
 删除节点redis-trib.rb del-node
 重新分片redis-trib.rb reshard

8.3.4 集群搭建

第一步:创建集群节点配置文件

# 创建一个集群配置文件的目录
mkdir -p ~/Desktop/redis-cluster

创建六个配置文件,分别命名为:redis_7000.confredis_7001.confredis_7005.conf,然后根据不同的端口号修改对应的端口值就好了:

# 后台执行
daemonize yes
# 端口号 
port 7000 # 每个redis不一样
# 为每一个集群节点指定一个 pid_file
pidfile ~/Desktop/redis-cluster/redis_7000.pid
# 启动集群模式
cluster-enabled yes
# 每一个集群节点都有一个配置文件,这个文件是不能手动编辑的。确保每一个集群节点的配置文件不通
cluster-config-file nodes-7000.conf
# 集群节点的超时时间,单位:ms,超时后集群会认为该节点失败
cluster-node-timeout 5000
# 最后将 appendonly 改成 yes(AOF 持久化)
appendonly yes
#dir 


sed 's/7000/7001/g' redis7000.conf > redis7001.conf
# 把redis7000.conf文件里所有7000改成7001后输入到redis7001.conf

记得把对应上述配置文件中根端口对应的配置都修改掉 (port/ pidfile/ cluster-config-file)

第二步:分别启动 6 个 Redis 实例

# 用的是redis-server,但是值得注意的是下面相当于是6个集群,我们后面还需要操作
redis-server ~/Desktop/redis-cluster/redis_7000.conf
redis-server ~/Desktop/redis-cluster/redis_7001.conf
redis-server ~/Desktop/redis-cluster/redis_7002.conf
redis-server ~/Desktop/redis-cluster/redis_7003.conf
redis-server ~/Desktop/redis-cluster/redis_7004.conf
redis-server ~/Desktop/redis-cluster/redis_7005.conf
#此时是写不了数据的,因为还没有分配槽位 #只有主节点有槽,从结点没有槽位的概念

然后执行 ps -ef | grep redis 查看是否启动成功:

可以看到 6 个 Redis 节点都以集群的方式成功启动了,但是现在每个节点还处于独立的状态,也就是说它们每一个都各自成了一个集群,还没有互相联系起来,我们需要手动地把他们之间建立起联系。

第三步:建立集群

执行下列命令:

# 5.0后才能这么用,之前版本要用lua
# redis-cli --cluster 
redis-cli --cluster create 127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005 --cluster-replicas 1 
# 考前的是主节点
  • --replicas 1:集群中的每个主节点创建一个从节点,即备份一份。如上会把6个结点分别3个主从模型,每个主从2个结点

观察控制台输出:

看到 [OK] 的信息之后,就表示集群已经搭建成功了,可以看到,这里我们正确地创建了三主三从的集群。

第四步:验证集群

我们先使用 redic-cli 任意连接一个节点:

redis-cli -c -h 127.0.0.1 -p 7000
  • -c表示集群模式;-h 指定 ip 地址;-p 指定端口。

然后随便 set 一些值观察控制台输入:

127.0.0.1:7000> SET name wmyskxz
-> Redirected to slot [5798] located at 127.0.0.1:7001 # 重定向到7001机器上的5798槽位
OK


> cluster nodes查看结点信息

可以看到这里 Redis 自动帮我们进行了 Redirected 操作跳转到了 7001 这个实例上。

我们再使用 cluster info (查看集群信息)cluster nodes (查看节点列表) 来分别看看:(任意节点输入均可)

127.0.0.1:7001> CLUSTER INFO
cluster_state:ok
cluster_slots_assigned:16384
cluster_slots_ok:16384
cluster_slots_pfail:0
cluster_slots_fail:0
cluster_known_nodes:6
cluster_size:3
cluster_current_epoch:6
cluster_my_epoch:2
cluster_stats_messages_ping_sent:1365
cluster_stats_messages_pong_sent:1358
cluster_stats_messages_meet_sent:4
cluster_stats_messages_sent:2727
cluster_stats_messages_ping_received:1357
cluster_stats_messages_pong_received:1369
cluster_stats_messages_meet_received:1
cluster_stats_messages_received:2727
#----------------------------------------
127.0.0.1:7001> CLUSTER NODES
56a04742f36c6e84968cae871cd438935081e86f 127.0.0.1:7003@17003 slave 4ec8c022e9d546c9b51deb9d85f6cf867bf73db6 0 1584428884000 4 connected
4ec8c022e9d546c9b51deb9d85f6cf867bf73db6 127.0.0.1:7000@17000 master - 0 1584428884000 1 connected 0-5460
e2539c4398b8258d3f9ffa714bd778da107cb2cd 127.0.0.1:7005@17005 slave a3406db9ae7144d17eb7df5bffe8b70bb5dd06b8 0 1584428885222 6 connected
d31cd1f423ab1e1849cac01ae927e4b6950f55d9 127.0.0.1:7004@17004 slave 236cefaa9cdc295bc60a5bd1aed6a7152d4f384d 0 1584428884209 5 connected
236cefaa9cdc295bc60a5bd1aed6a7152d4f384d 127.0.0.1:7001@17001 myself,master - 0 1584428882000 2 connected 5461-10922
a3406db9ae7144d17eb7df5bffe8b70bb5dd06b8 127.0.0.1:7002@17002 master - 0 1584428884000 3 connected 10923-16383

# jedis不提供这种重定向功能

扩容

redis-server /redis-7007.conf
cluster nodes
redis.cli --cluster help
redis.cli --cluster add-node 182.168.294.188:7007 192.168.204.177:7000这个参数只要指定集群中任意机器就行 #以主节点加入
cluster nodes
redis.cli --cluster add-node 182.168.294.188:7008 192.168.204.177:7000 --cluster-slave --cluster-master-id 9saudsiodno上面查到的id
# 此时新加入的主节点还没有槽位
redis-cli --cluster reshard 已存在节点id:端口

一、原生搭建

  • 配置开启cluster结点
    • cluster-enabled yes
    • cluster-config-file nodes-8001.conf
  • meet
    • cluster meet ip port
  • 指派槽
    • 查看crc16算法算出key的槽位命令 cluster keyslot key
    • 默认平均分配
      • 3个主从时 16384/3 0-5461 5462-10922 10923-16383
      • 4额主从时 36384/4 4096
    • cluster addslots slot (槽位下标)
  • 分配主从
    • cluster replcate node-id

二、使用redis提供的rb脚本

redis cluster集群需要至少三个master结点,我们这里搭建三个master结点,并且给每个master再搭建一个slave结点,总共6个redis结点,由于节点数较多,这里采用在一台机器上创建6个redis实例,并将这6个redis实例配置成集群模式,所以这里搭建的是为集群模式,当然真正的分布式集群的配置方法几乎一样,搭建伪集群的步骤如下

  • 在/usr/local下创建文件夹redis-cluster,然后在其下面分别创建6个文件夹如下
    • mkdir -p /usr/local/redis-cluster

数据分区方案简析

方案一:哈希值 % 节点数

哈希取余分区思路非常简单:计算 key 的 hash 值,然后对节点数量进行取余,从而决定数据映射到哪个节点上。

不过该方案最大的问题是,当新增或删减节点时,节点数量发生变化,系统中所有的数据都需要 重新计算映射关系,引发大规模数据迁移。

方案二:一致性哈希分区

一致性哈希算法将 整个哈希值空间 组织成一个虚拟的圆环,范围是 [0 , 232-1],对于每一个数据,根据 key 计算 hash 值,确数据在环上的位置,然后从此位置沿顺时针行走,找到的第一台服务器就是其应该映射到的服务器:

与哈希取余分区相比,一致性哈希分区将 增减节点的影响限制在相邻节点。以上图为例,如果在 node1node2 之间增加 node5,则只有 node2 中的一部分数据会迁移到 node5;如果去掉 node2,则原 node2 中的数据只会迁移到 node4 中,只有 node4 会受影响。

一致性哈希分区的主要问题在于,当 节点数量较少 时,增加或删减节点,对单个节点的影响可能很大,造成数据的严重不平衡。还是以上图为例,如果去掉 node2node4 中的数据由总数据的 1/4 左右变为 1/2 左右,与其他节点相比负载过高。

方案三:带有虚拟节点的一致性哈希分区

该方案在 一致性哈希分区的基础上,引入了 虚拟节点 的概念。Redis 集群使用的便是该方案,其中的虚拟节点称为 槽(slot)。槽是介于数据和实际节点之间的虚拟概念,每个实际节点包含一定数量的槽,每个槽包含哈希值在一定范围内的数据。

在使用了槽的一致性哈希分区中,槽是数据管理和迁移的基本单位。槽 解耦数据和实际节点 之间的关系,增加或删除节点对系统的影响很小。仍以上图为例,系统中有 4 个实际节点,假设为其分配 16 个槽(0-15);

  • 槽 0-3 位于 node1;4-7 位于 node2;以此类推…

如果此时删除 node2,只需要将槽 4-7 重新分配即可,例如槽 4-5 分配给 node1,槽 6 分配给 node3,槽 7 分配给 node4;可以看出删除 node2 后,数据在其他节点的分布仍然较为均衡。

节点通信机制简析

集群的建立离不开节点之间的通信,例如我们上访在 快速体验 中刚启动六个集群节点之后通过 redis-cli 命令帮助我们搭建起来了集群,实际上背后每个集群之间的两两连接是通过了 CLUSTER MEET <ip> <port> 命令发送 MEET 消息完成的,下面我们展开详细说说。

两个端口

哨兵系统 中,节点分为 数据节点哨兵节点:前者存储数据,后者实现额外的控制功能。在 集群 中,没有数据节点与非数据节点之分:所有的节点都存储数据,也都参与集群状态的维护。为此,集群中的每个节点,都提供了两个 TCP 端口:

  • 普通端口: 即我们在前面指定的端口 (7000等)。普通端口主要用于为客户端提供服务 (与单机节点类似);但在节点间数据迁移时也会使用。
  • 集群端口: 端口号是普通端口 + 10000 (10000是固定值,无法改变),如 7000 节点的集群端口为 17000集群端口只用于节点之间的通信,如搭建集群、增减节点、故障转移等操作时节点间的通信;不要使用客户端连接集群接口。为了保证集群可以正常工作,在配置防火墙时,要同时开启普通端口和集群端口。

Gossip 协议

节点间通信,按照通信协议可以分为几种类型:单对单、广播、Gossip 协议等。重点是广播和 Gossip 的对比。

  • 广播是指向集群内所有节点发送消息。优点 是集群的收敛速度快(集群收敛是指集群内所有节点获得的集群信息是一致的),缺点 是每条消息都要发送给所有节点,CPU、带宽等消耗较大。
  • Gossip 协议的特点是:在节点数量有限的网络中,每个节点都 “随机” 的与部分节点通信 (并不是真正的随机,而是根据特定的规则选择通信的节点),经过一番杂乱无章的通信,每个节点的状态很快会达到一致。Gossip 协议的 优点 有负载 (比广播) 低、去中心化、容错性高 (因为通信有冗余) 等;缺点 主要是集群的收敛速度慢。

消息类型

集群中的节点采用 固定频率(每秒10次)定时任务 进行通信相关的工作:判断是否需要发送消息及消息类型、确定接收节点、发送消息等。如果集群状态发生了变化,如增减节点、槽状态变更,通过节点间的通信,所有节点会很快得知整个集群的状态,使集群收敛。

节点间发送的消息主要分为 5 种:meet 消息ping 消息pong 消息fail 消息publish 消息。不同的消息类型,通信协议、发送的频率和时机、接收节点的选择等是不同的:

  • MEET 消息: 在节点握手阶段,当节点收到客户端的 CLUSTER MEET 命令时,会向新加入的节点发送 MEET 消息,请求新节点加入到当前集群;新节点收到 MEET 消息后会回复一个 PONG 消息。
  • PING 消息: 集群里每个节点每秒钟会选择部分节点发送 PING 消息,接收者收到消息后会回复一个 PONG 消息。PING 消息的内容是自身节点和部分其他节点的状态信息,作用是彼此交换信息,以及检测节点是否在线。PING 消息使用 Gossip 协议发送,接收节点的选择兼顾了收敛速度和带宽成本,具体规则如下:(1)随机找 5 个节点,在其中选择最久没有通信的 1 个节点;(2)扫描节点列表,选择最近一次收到 PONG 消息时间大于 cluster_node_timeout / 2 的所有节点,防止这些节点长时间未更新。
  • PONG消息: PONG 消息封装了自身状态数据。可以分为两种:第一种 是在接到 MEET/PING 消息后回复的 PONG 消息;第二种 是指节点向集群广播 PONG 消息,这样其他节点可以获知该节点的最新信息,例如故障恢复后新的主节点会广播 PONG 消息。
  • FAIL 消息: 当一个主节点判断另一个主节点进入 FAIL 状态时,会向集群广播这一 FAIL 消息;接收节点会将这一 FAIL 消息保存起来,便于后续的判断。
  • PUBLISH 消息: 节点收到 PUBLISH 命令后,会先执行该命令,然后向集群广播这一消息,接收节点也会执行该 PUBLISH 命令。

数据结构简析

节点需要专门的数据结构来存储集群的状态。所谓集群的状态,是一个比较大的概念,包括:集群是否处于上线状态、集群中有哪些节点、节点是否可达、节点的主从状态、槽的分布……

节点为了存储集群状态而提供的数据结构中,最关键的是 clusterNodeclusterState 结构:前者记录了一个节点的状态,后者记录了集群作为一个整体的状态。

clusterNode 结构

clusterNode 结构保存了 一个节点的当前状态,包括创建时间、节点 id、ip 和端口号等。每个节点都会用一个 clusterNode 结构记录自己的状态,并为集群内所有其他节点都创建一个 clusterNode 结构来记录节点状态。

下面列举了 clusterNode 的部分字段,并说明了字段的含义和作用:

typedef struct clusterNode {
    //节点创建时间
    mstime_t ctime;
    //节点id
    char name[REDIS_CLUSTER_NAMELEN];
    //节点的ip和端口号
    char ip[REDIS_IP_STR_LEN];
    int port;
    //节点标识:整型,每个bit都代表了不同状态,如节点的主从状态、是否在线、是否在握手等
    int flags;
    //配置纪元:故障转移时起作用,类似于哨兵的配置纪元
    uint64_t configEpoch;
    //槽在该节点中的分布:占用16384/8个字节,16384个比特;每个比特对应一个槽:比特值为1,则该比特对应的槽在节点中;比特值为0,则该比特对应的槽不在节点中
    unsigned char slots[16384/8];
    //节点中槽的数量
    int numslots;
    …………
} clusterNode;

除了上述字段,clusterNode 还包含节点连接、主从复制、故障发现和转移需要的信息等。

clusterState 结构

clusterState 结构保存了在当前节点视角下,集群所处的状态。主要字段包括:

typedef struct clusterState {
    //自身节点
    clusterNode *myself;
    //配置纪元
    uint64_t currentEpoch;
    //集群状态:在线还是下线
    int state;
    //集群中至少包含一个槽的节点数量
    int size;
    //哈希表,节点名称->clusterNode节点指针
    dict *nodes;
    //槽分布信息:数组的每个元素都是一个指向clusterNode结构的指针;如果槽还没有分配给任何节点,则为NULL
    clusterNode *slots[16384];
    …………
} clusterState;

除此之外,clusterState 还包括故障转移、槽迁移等需要的信息。

redis相关阅读

  1. Redis(1)——5种基本数据结构 - https://www.wmyskxz.com/2020/02/28/redis-1-5-chong-ji-ben-shu-ju-jie-gou/
  2. Redis(2)——跳跃表 - https://www.wmyskxz.com/2020/02/29/redis-2-tiao-yue-biao/
  3. Redis(3)——分布式锁深入探究 - https://www.wmyskxz.com/2020/03/01/redis-3/
  4. Reids(4)——神奇的HyperLoglog解决统计问题 - https://www.wmyskxz.com/2020/03/02/reids-4-shen-qi-de-hyperloglog-jie-jue-tong-ji-wen-ti/
  5. Redis(5)——亿级数据过滤和布隆过滤器 - https://www.wmyskxz.com/2020/03/11/redis-5-yi-ji-shu-ju-guo-lu-he-bu-long-guo-lu-qi/
  6. Redis(6)——GeoHash查找附近的人https://www.wmyskxz.com/2020/03/12/redis-6-geohash-cha-zhao-fu-jin-de-ren/
  7. Redis(7)——持久化【一文了解】 - https://www.wmyskxz.com/2020/03/13/redis-7-chi-jiu-hua-yi-wen-liao-jie/
  8. Redis(8)——发布/订阅与Stream - [https://www.wmyskxz.com/2020/03/15/redis-8-fa-bu-ding-yue-yu-stream/](

9.6 集群操作

  1. 以集群的方式进入客户端

redis-cli -c -p 端口号

  1. 通过cluster nodes 命令查看集群信息

  2. redis cluster 如何分配这六个节点

一个集群至少要有三个主节点。

选项 --replicas 1 表示我们希望为集群中的每个主节点创建一个从节点。

分配原则尽量保证每个主数据库运行在不同的IP地址,每个从库和主库不在一个IP地址上。

  1. 什么是slots

l 一个 Redis 集群包含 16384 个插槽(hash slot), 数据库中的每个键都属于这 16384 个插槽的其中一个, 集群使用公式 CRC16(key) % 16384 来计算键 key 属于哪个槽, 其中 CRC16(key) 语句用于计算键 key 的 CRC16 校验和 。

l 集群中的每个节点负责处理一部分插槽。 举个例子, 如果一个集群可以有主节点, 其中:

​ 节点 A 负责处理 0 号至 5500 号插槽。

​ 节点 B 负责处理 5501 号至 11000 号插槽。

​ 节点 C 负责处理 11001 号至 16383 号插槽

  1. 在集群中录入值

l 在redis-cli每次录入、查询键值,redis都会计算出该key应该送往的插槽,如果不是该客户端对应服务器的插槽,redis会报错,并告知应前往的redis实例地址和端口.

l redis-cli客户端提供了 –c 参数实现自动重定向。

如 redis-cli -c –p 6379 登入后,再录入、查询键值对可以自动重定向。

l 不在一个slot下的键值,是不能使用mget,mset等多键操作。

l 可以通过{}来定义组的概念,从而使key中{}内相同内容的键值对放到一个slot中去

  1. 查询集群中的值
  • CLUSTER KEYSLOT <key> 计算键 key 应该被放置在哪个槽上。
  • CLUSTER COUNTKEYSINSLOT <slot> 返回槽 slot 目前包含的键值对数量
  • CLUSTER GETKEYSINSLOT <slot> <count> 返回 count 个 slot 槽中的键
  1. 故障恢复

l 如果主节点下线?从节点能否自动升为主节点?

l 主节点恢复后,主从关系会如何?

l 如果所有某一段插槽的主从节点都当掉,redis服务是否还能继续?

redis.conf中的参数 cluster-require-full-coverage

redis客户端

Jedis api 在线网址:http://tool.oschina.net/uploads/apidocs/redis/clients/jedis/Jedis.html

redisson 官网地址:https://redisson.org/

redisson git项目地址:https://github.com/redisson/redisson

lettuce 官网地址:https://lettuce.io/

lettuce git项目地址:https://github.com/lettuce-io/lettuce-core

首先,在spring boot2之后,对redis连接的支持,默认就采用了lettuce。这就一定程度说明了lettuce 和Jedis的优劣。

概念:

Jedis:是老牌的Redis的Java实现客户端,提供了比较全面的Redis命令的支持,

Redisson:实现了分布式和可扩展的Java数据结构。

Lettuce:高级Redis客户端,用于线程安全同步,异步和响应使用,支持集群,Sentinel,管道和编码器。

优点:
  Jedis:比较全面的提供了Redis的操作特性

Redisson:促使使用者对Redis的关注分离,提供很多分布式相关操作服务,例如,分布式锁,分布式集合,可通过Redis支持延迟队列

Lettuce:基于Netty框架的事件驱动的通信层,其方法调用是异步的。Lettuce的API是线程安全的,所以可以操作单个Lettuce连接来完成各种操作

可伸缩:

Jedis:使用阻塞的I/O,且其方法调用都是同步的,程序流需要等到sockets处理完I/O才能执行,不支持异步。Jedis客户端实例不是线程安全的,所以需要通过连接池来使用Jedis。

Redisson:基于Netty框架的事件驱动的通信层,其方法调用是异步的。Redisson的API是线程安全的,所以可以操作单个Redisson连接来完成各种操作

Lettuce:基于Netty框架的事件驱动的通信层,其方法调用是异步的。Lettuce的API是线程安全的,所以可以操作单个Lettuce连接来完成各种操作

lettuce能够支持redis4,需要java8及以上。
lettuce是基于netty实现的与redis进行同步和异步的通信。

lettuce和jedis比较:
jedis使直接连接redis server,如果在多线程环境下是非线程安全的,这个时候只有使用连接池,为每个jedis实例增加物理连接 ;

lettuce的连接是基于Netty的,连接实例(StatefulRedisConnection)可以在多个线程间并发访问,StatefulRedisConnection是线程安全的,所以一个连接实例可以满足多线程环境下的并发访问,当然这也是可伸缩的设计,一个连接实例不够的情况也可以按需增加连接实例。

Redisson实现了分布式和可扩展的Java数据结构,和Jedis相比,功能较为简单,不支持字符串操作,不支持排序、事务、管道、分区等Redis特性。Redisson的宗旨是促进使用者对Redis的关注分离,从而让使用者能够将精力更集中地放在处理业务逻辑上。

总结:
优先使用Lettuce,如果需要分布式锁,分布式集合等分布式的高级特性,添加Redisson结合使用,因为Redisson本身对字符串的操作支持很差。

9.7 集群的Jedis开发

public class JedisClusterTest {
    public static void main(String[] args) {

        Set<HostAndPort> set =new HashSet<HostAndPort>();
        set.add(new HostAndPort("192.168.31.211",6379));
        JedisCluster jedisCluster=new JedisCluster(set);
        jedisCluster.set("k1", "v1");
        System.out.println(jedisCluster.get("k1"));
    }
}

RedisTemplate

直接传对象的话就报 未序列化的错误

Spring-data-redis是spring大家族的一部分,提供了在srping应用中通过简单的配置访问redis服务,对redis底层开发包(Jedis, JRedis, and RJC)进行了高度封装,RedisTemplate提供了redis各种操作、异常处理及序列化,支持发布订阅,并对spring 3.1 cache进行了实现。
**官网:http://projects.spring.io/spring-data-redis/**
**项目地址:https://github.com/spring-projects/spring-data-redis**

一、spring-data-redis功能介绍

jedis客户端在编程实施方面存在如下不足:

1)connection管理缺乏自动化,connection-pool的设计缺少必要的容器支持。
2)数据操作需要关注“序列化”/“反序列化”,因为jedis的客户端API接受的数据类型为string和byte,对结构化数据(json,xml,pojo等)操作需要额外的支持。https://www.cnblogs.com/duanxz/p/3511695.html
3)事务操作纯粹为硬编码。
4)pub/sub功能,缺乏必要的设计模式支持,对于开发者而言需要关注的太多。

spring-data-redis针对jedis提供了如下功能:

  • 1.连接池自动管理,提供了一个高度封装的“RedisTemplate”类

  • 2.针对jedis客户端中大量api进行了归类封装,将同一类型操作封装为operation接口

    • ValueOperations:简单K-V操作
    • SetOperations:set类型数据操作
    • ZSetOperations:zset类型数据操作
    • HashOperations:针对map类型的数据操作
    • ListOperations:针对list类型的数据操作
  • 3.提供了对key的“bound”(绑定)便捷化操作API,可以通过bound封装指定的key,然后进行一系列的操作而无须“显式”的再次指定Key,即BoundKeyOperations:

    • BoundValueOperations
    • BoundSetOperations
    • BoundListOperations
    • BoundSetOperations
    • BoundHashOperations
  • 4.将事务操作封装,有容器控制。

  • 5.针对数据的“序列化/反序列化”,提供了多种可选择策略(RedisSerializer)

    • JdkSerializationRedisSerializer:POJO对象的存取场景,使用JDK本身序列化机制,将pojo类通过ObjectInputStream/ObjectOutputStream进行序列化操作,最终redis-server中将存储字节序列。是目前最常用的序列化策略。
    • StringRedisSerializer:Key或者value为字符串的场景,根据指定的charset对数据的字节序列编码成string,是“new String(bytes, charset)”和“string.getBytes(charset)”的直接封装。是最轻量级和高效的策略。
    • JacksonJsonRedisSerializer:jackson-json工具提供了javabean与json之间的转换能力,可以将pojo实例序列化成json格式存储在redis中,也可以将json格式的数据转换成pojo实例。因为jackson工具在序列化和反序列化时,需要明确指定Class类型,因此此策略封装起来稍微复杂。【需要jackson-mapper-asl工具支持】
    • OxmSerializer:提供了将javabean与xml之间的转换能力,目前可用的三方支持包括jaxb,apache-xmlbeans;redis存储的数据将是xml工具。不过使用此策略,编程将会有些难度,而且效率最低;不建议使用。【需要spring-oxm模块的支持】
    • 针对“序列化和发序列化”中JdkSerializationRedisSerializer和StringRedisSerializer是最基础的策略,原则上,我们可以将数据存储为任何格式以便应用程序存取和解析(其中应用包括app,hadoop等其他工具),不过在设计时仍然不推荐直接使用“JacksonJsonRedisSerializer”和“OxmSerializer”,因为无论是json还是xml,他们本身仍然是String。如果你的数据需要被第三方工具解析,那么数据应该使用StringRedisSerializer而不是JdkSerializationRedisSerializer。如果你的数据格式必须为json或者xml,那么在编程级别,在redisTemplate配置中仍然使用StringRedisSerializer,在存储之前或者读取之后,使用“SerializationUtils”工具转换转换成json或者xml
  • 6.基于设计模式,和JMS开发思路,将pub/sub的API设计进行了封装,使开发更加便捷。

  • 7.spring-data-redis中,并没有对sharding提供良好的封装,如果你的架构是基于sharding,那么你需要自己去实现,这也是sdr和jedis相比,唯一缺少的特性。

    RedisTemplate template = new RedisTemplate<>();
    template.setConnection(factory);//LettuceConnectionFactory
    Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
    ObjectMapper om = new ObjectMapper();
    om.set

redis-template用法

序列化:https://www.cnblogs.com/duanxz/p/3511695.html

  • transient修饰的属性,不会被序列化

  • 静态static的属性,不会序列化

  • 实现Serializable接口的时候,一定要给这个serialVersionID赋值。序列化的时候没有某个属性,反序列话的时候增加了某个属性,重新计算的UID就不一样了。https://blog.csdn.net/u014750606/article/details/80040130

  • 当属性是对象的时候,镀锡也有实现序列化的接口

    redisTemplate.opsForValue();//操作字符串
    redisTemplate.opsForHash();//操作hash
    redisTemplate.opsForList();//操作list
    redisTemplate.opsForSet();//操作set
    redisTemplate.opsForZSet();//操作有序set

String
//set void set(K key, V value);
redisTemplate.opsForValue().set("num","123");
redisTemplate.opsForValue().get("num")  输出结果为123

//set void set(K key, V value, long timeout, TimeUnit unit); 
redisTemplate.opsForValue().set("num","123",10, TimeUnit.SECONDS);
redisTemplate.opsForValue().get("num")设置的是10秒失效,十秒之内查询有结果,十秒之后返回为null
        TimeUnit.DAYS          //天
        TimeUnit.HOURS         //小时
        TimeUnit.MINUTES       //分钟
        TimeUnit.SECONDS       //秒
        TimeUnit.MILLISECONDS  //毫秒 

//set void set(K key, V value, long offset);
//覆写(overwrite)给定 key 所储存的字符串值,从偏移量 offset 开始
template.opsForValue().set("key","hello world");
template.opsForValue().set("key","redis", 6);
System.out.println("***************"+template.opsForValue().get("key"));
结果:***************hello redis

//get V get(Object key);
template.opsForValue().set("key","hello world");
System.out.println("***************"+template.opsForValue().get("key"));
结果:***************hello world

//getAndSet V getAndSet(K key, V value); 
//设置键的字符串值并返回其旧值
template.opsForValue().set("getSetTest","test");
System.out.println(template.opsForValue().getAndSet("getSetTest","test2"));
结果:test

//append Integer append(K key, String value);
//如果key已经存在并且是一个字符串,则该命令将该值追加到字符串的末尾。如果键不存在,则它被创建并设置为空字符串,因此APPEND在这种特殊情况下将类似于SET。
template.opsForValue().append("test","Hello");
System.out.println(template.opsForValue().get("test"));
template.opsForValue().append("test","world");
System.out.println(template.opsForValue().get("test"));
Hello
Helloworld

//size Long size(K key);返回key所对应的value值得长度
template.opsForValue().set("key","hello world");
System.out.println("***************"+template.opsForValue().size("key"));
***************11
List
//Long size(K key);返回存储在键中的列表的长度。如果键不存在,则将其解释为空列表,并返回0。当key存储的值不是列表时返回错误。

System.out.println(template.opsForList().size("list"));


//Long leftPush(K key, V value);
将所有指定的值插入存储在键的列表的头部。如果键不存在,则在执行推送操作之前将其创建为空列表。(从左边插入)

template.opsForList().leftPush("list","java");
template.opsForList().leftPush("list","python");
template.opsForList().leftPush("list","c++");
返回的结果为推送操作后的列表的长度


//Long leftPushAll(K key, V... values);
批量把一个数组插入到列表中

String[] strs = new String[]{"1","2","3"};
template.opsForList().leftPushAll("list",strs);
System.out.println(template.opsForList().range("list",0,-1));
[3, 2, 1]

//Long rightPush(K key, V value);
将所有指定的值插入存储在键的列表的头部。如果键不存在,则在执行推送操作之前将其创建为空列表。(从右边插入)

template.opsForList().rightPush("listRight","java");
template.opsForList().rightPush("listRight","python");
template.opsForList().rightPush("listRight","c++");

//Long rightPushAll(K key, V... values);

String[] strs = new String[]{"1","2","3"};
template.opsForList().rightPushAll("list",strs);
System.out.println(template.opsForList().range("list",0,-1));
[1, 2, 3]

//void set(K key, long index, V value);
在列表中index的位置设置value值

System.out.println(template.opsForList().range("listRight",0,-1));
template.opsForList().set("listRight",1,"setValue");
System.out.println(template.opsForList().range("listRight",0,-1));
[java, python, oc, c++]
[java, setValue, oc, c++]

//Long remove(K key, long count, Object value);
从存储在键中的列表中删除等于值的元素的第一个计数事件。
计数参数以下列方式影响操作:
count> 0:删除等于从头到尾移动的值的元素。
count <0:删除等于从尾到头移动的值的元素。
count = 0:删除等于value的所有元素。

System.out.println(template.opsForList().range("listRight",0,-1));
template.opsForList().remove("listRight",1,"setValue");//将删除列表中存储的列表中第一次次出现的“setValue”。
System.out.println(template.opsForList().range("listRight",0,-1));
[java, setValue, oc, c++]
[java, oc, c++]

//V index(K key, long index);
根据下表获取列表中的值,下标是从0开始的

System.out.println(template.opsForList().range("listRight",0,-1));
System.out.println(template.opsForList().index("listRight",2));
[java, oc, c++]
c++

//V leftPop(K key);
弹出最左边的元素,弹出之后该值在列表中将不复存在

System.out.println(template.opsForList().range("list",0,-1));
System.out.println(template.opsForList().leftPop("list"));
System.out.println(template.opsForList().range("list",0,-1));
[c++, python, oc, java, c#, c#]
c++
[python, oc, java, c#, c#]

//V rightPop(K key);
弹出最右边的元素,弹出之后该值在列表中将不复存在

System.out.println(template.opsForList().range("list",0,-1));
System.out.println(template.opsForList().rightPop("list"));
System.out.println(template.opsForList().range("list",0,-1));
[python, oc, java, c#, c#]
c#
[python, oc, java, c#]
Hash
//Long delete(H key, Object... hashKeys);
删除给定的哈希hashKeys

System.out.println(template.opsForHash().delete("redisHash","name"));
System.out.println(template.opsForHash().entries("redisHash"));
1
{class=6, age=28.1}

//Boolean hasKey(H key, Object hashKey);
确定哈希hashKey是否存在

System.out.println(template.opsForHash().hasKey("redisHash","666"));
System.out.println(template.opsForHash().hasKey("redisHash","777"));
true
false

//HV get(H key, Object hashKey);
从键中的哈希获取给定hashKey的值

System.out.println(template.opsForHash().get("redisHash","age"));
26

//Set<HK> keys(H key);
获取key所对应的散列表的key

System.out.println(template.opsForHash().keys("redisHash"));
//redisHash所对应的散列表为{class=1, name=666, age=27}
[name, class, age]

//Long size(H key);
获取key所对应的散列表的大小个数

System.out.println(template.opsForHash().size("redisHash"));
//redisHash所对应的散列表为{class=1, name=666, age=27}
3

//void putAll(H key, Map<? extends HK, ? extends HV> m);
使用m中提供的多个散列字段设置到key对应的散列表中

Map<String,Object> testMap = new HashMap();
testMap.put("name","666");
testMap.put("age",27);
testMap.put("class","1");
template.opsForHash().putAll("redisHash1",testMap);
System.out.println(template.opsForHash().entries("redisHash1"));
{class=1, name=jack, age=27}

//void put(H key, HK hashKey, HV value);
设置散列hashKey的值

template.opsForHash().put("redisHash","name","666");
template.opsForHash().put("redisHash","age",26);
template.opsForHash().put("redisHash","class","6");
System.out.println(template.opsForHash().entries("redisHash"));
{age=26, class=6, name=666}

//List<HV> values(H key);
获取整个哈希存储的值根据密钥

System.out.println(template.opsForHash().values("redisHash"));
[tom, 26, 6]

//Map<HK, HV> entries(H key);
获取整个哈希存储根据密钥

System.out.println(template.opsForHash().entries("redisHash"));
{age=26, class=6, name=tom}

//Cursor<Map.Entry<HK, HV>> scan(H key, ScanOptions options);
使用Cursor在key的hash中迭代,相当于迭代器。

Cursor<Map.Entry<Object, Object>> curosr = template.opsForHash().scan("redisHash", 
  ScanOptions.ScanOptions.NONE);
    while(curosr.hasNext()){
        Map.Entry<Object, Object> entry = curosr.next();
        System.out.println(entry.getKey()+":"+entry.getValue());
    }
age:27
class:6
name:666
Set
//Long add(K key, V... values);
无序集合中添加元素,返回添加个数
也可以直接在add里面添加多个值 如:template.opsForSet().add("setTest","aaa","bbb")

String[] strs= new String[]{"str1","str2"};
System.out.println(template.opsForSet().add("setTest", strs));
2

// Long remove(K key, Object... values);
移除集合中一个或多个成员

String[] strs = new String[]{"str1","str2"};
System.out.println(template.opsForSet().remove("setTest",strs));
2

// V pop(K key);
移除并返回集合中的一个随机元素

System.out.println(template.opsForSet().pop("setTest"));
System.out.println(template.opsForSet().members("setTest"));
bbb
[aaa, ccc]

// Boolean move(K key, V value, K destKey);
将 member 元素从 source 集合移动到 destination 集合

template.opsForSet().move("setTest","aaa","setTest2");
System.out.println(template.opsForSet().members("setTest"));
System.out.println(template.opsForSet().members("setTest2"));
[ccc]
[aaa]

// Long size(K key);
无序集合的大小长度

System.out.println(template.opsForSet().size("setTest"));
1

//Set<V> members(K key);
返回集合中的所有成员

System.out.println(template.opsForSet().members("setTest"));
[ddd, bbb, aaa, ccc]

// Cursor<V> scan(K key, ScanOptions options);
遍历set

Cursor<Object> curosr = template.opsForSet().scan("setTest", ScanOptions.NONE);
  while(curosr.hasNext()){
     System.out.println(curosr.next());
  }
ddd
bbb
aaa
ccc
ZSet
//Boolean add(K key, V value, double score);
新增一个有序集合,存在的话为false,不存在的话为true

System.out.println(template.opsForZSet().add("zset1","zset-1",1.0));
true

// Long add(K key, Set<TypedTuple<V>> tuples);
新增一个有序集合

ZSetOperations.TypedTuple<Object> objectTypedTuple1 = new DefaultTypedTuple<>("zset-5",9.6);
ZSetOperations.TypedTuple<Object> objectTypedTuple2 = new DefaultTypedTuple<>("zset-6",9.9);
Set<ZSetOperations.TypedTuple<Object>> tuples = new HashSet<ZSetOperations.TypedTuple<Object>>();
tuples.add(objectTypedTuple1);
tuples.add(objectTypedTuple2);
System.out.println(template.opsForZSet().add("zset1",tuples));
System.out.println(template.opsForZSet().range("zset1",0,-1));
[zset-1, zset-2, zset-3, zset-4, zset-5, zset-6]

//Long remove(K key, Object... values);
从有序集合中移除一个或者多个元素

System.out.println(template.opsForZSet().range("zset1",0,-1));
System.out.println(template.opsForZSet().remove("zset1","zset-6"));
System.out.println(template.opsForZSet().range("zset1",0,-1));
[zset-1, zset-2, zset-3, zset-4, zset-5, zset-6]
1
[zset-1, zset-2, zset-3, zset-4, zset-5]

// Long rank(K key, Object o);
返回有序集中指定成员的排名,其中有序集成员按分数值递增(从小到大)顺序排列

System.out.println(template.opsForZSet().range("zset1",0,-1));
System.out.println(template.opsForZSet().rank("zset1","zset-2"));
[zset-2, zset-1, zset-3, zset-4, zset-5]
0   //表明排名第一

//Set<V> range(K key, long start, long end);
通过索引区间返回有序集合成指定区间内的成员,其中有序集成员按分数值递增(从小到大)顺序排列

System.out.println(template.opsForZSet().range("zset1",0,-1));
[zset-2, zset-1, zset-3, zset-4, zset-5]

//Long count(K key, double min, double max);
通过分数返回有序集合指定区间内的成员个数

System.out.println(template.opsForZSet().rangeByScore("zset1",0,5));
System.out.println(template.opsForZSet().count("zset1",0,5));
[zset-2, zset-1, zset-3]
3

//Long size(K key);
获取有序集合的成员数,内部调用的就是zCard方法

System.out.println(template.opsForZSet().size("zset1"));
6

// Double score(K key, Object o);
获取指定成员的score值

System.out.println(template.opsForZSet().score("zset1","zset-1"));
2.2

//Long removeRange(K key, long start, long end);
移除指定索引位置的成员,其中有序集成员按分数值递增(从小到大)顺序排列

System.out.println(template.opsForZSet().range("zset2",0,-1));
System.out.println(template.opsForZSet().removeRange("zset2",1,2));
System.out.println(template.opsForZSet().range("zset2",0,-1));
[zset-1, zset-2, zset-3, zset-4]
2
[zset-1, zset-4]

//Cursor<TypedTuple<V>> scan(K key, ScanOptions options);
遍历zset

Cursor<ZSetOperations.TypedTuple<Object>> cursor = template.opsForZSet().scan("zzset1", ScanOptions.NONE);
    while (cursor.hasNext()){
       ZSetOperations.TypedTuple<Object> item = cursor.next();
       System.out.println(item.getValue() + ":" + item.getScore());
    }
zset-1:1.0
zset-2:2.0
zset-3:3.0
zset-4:6.0

所有的对象需要序列化。在企业中,所有的pojo都会序列化。如果不想用java的序列化,就要编写自己的redisTemplate

真实的开发一般都使用json来传递对象

User user = new User("123",12);
redisTemplate.opsForValue().set("user",user);
sout(redisTemplate.opsForValue().get("user"))

企业案例

高并发秒杀超卖
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));//jedis.get("stock");
if(stock>0){
    int realStock = stock-1;
    strngRedisTemplate.opsForValue().set("stock",realStock+"");
    //成功
}else{
    //失败
}

解决方案:

setnx若存在不操作if not exists

思路:只是练习,不能实用

String lockKey = "lockKey";
try{
    //Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey,"hh")
      //  stringRedisTemplate.expire(lockKey,10,TimeUnit.SECONDS);
    //上两句应该合并才不会出错
    Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey,"hh",10,TimeUnit.SECONDS);//重载


        if(!result) return "error";

    int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));//jedis.get("stock");
    if(stock>0){
        int realStock = stock-1;
        strngRedisTemplate.opsForValue().set("stock",realStock+"");
        //成功
    }else{
        //失败
    }
}finally{
    stringRedisTemplate.delete(lockKey);
}


return "end";

分布式锁:

https://blog.csdn.net/wuzhiwei549/article/details/80692278

setnx不支持设置时间,所以需要后面跟上expire,但这样就非原子了,容易执行一般就挂了,死锁了。而set方法支持设置超时时间set(lock_sale_商品ID,1,30,NX)。还有个问题是如何给锁续命的问题,得用守护线程,当被守护的线程挂了,守护线程也就失去作用了。

其他解决方法:zk的有序临时结点

Redission

https://www.bilibili.com/video/BV16K411J754?p=3

热点数据

例如使用 Zset 数据结构,存储 Key 的访问次数/最后访问时间作为 Score,最后做排序,来淘汰那些最少访问的 Key。

如果企业级应用,可以参考:[阿里云的 Redis 混合存储版][1]

会话维持 Session

会话维持 Session 场景,即使用 Redis 作为分布式场景下的登录中心存储应用。每次不同的服务在登录的时候,都会去统一的 Redis 去验证 Session 是否正确。但是在微服务场景,一般会考虑 Redis + JWT 做 Oauth2 模块。

其中 Redis 存储 JWT 的相关信息主要是留出口子,方便以后做统一的防刷接口,或者做登录设备限制等。

表缓存

Redis 缓存表的场景有黑名单、禁言表等。访问频率较高,即读高。根据业务需求,可以使用后台定时任务定时刷新 Redis 的缓存表数据。

消息队列 list

主要使用了 List 数据结构。
List 支持在头部和尾部操作,因此可以实现简单的消息队列。

  1. 发消息:在 List 尾部塞入数据。
  2. 消费消息:在 List 头部拿出数据。

同时可以使用多个 List,来实现多个队列,根据不同的业务消息,塞入不同的 List,来增加吞吐量。

可视化页面:Anoter Redis DeskTop Manager

企业级解决方案

更新一致性

  • 读请求:先读缓存,缓存没有的话,就读数据库,然后取出数据后放入缓存,同时返回响应。
  • 写请求:先删除缓存,然后再更新数据库(避免大量地写、却又不经常读的数据导致缓存频繁更新)。

缓存预热

缓存预热就是系统启动前, 提前将相关的缓存数据直接加载到缓存系统。避免在用户请求的时候,先查询数据库,然后再将数据缓存的问题!用户直接查询事先被预热的缓存数据

场景 :宕机:服务器启动后迅速宕机

问题排查
\1. 请求数量较高
\2. 主从之间数据吞吐量较大,数据同步操作频度较高

解决方案
前置准备工作:

\1. 日常例行统计数据访问记录,启动之前统计访问频度较高的热点数据
\2. 利用LRU数据删除策略,构建数据留存队列
例如: storm与kafka配合

准备工作:
\1. 将统计结果中的数据分类,根据级别, redis优先加载级别较高的热点数据
\2. 利用分布式多服务器同时进行数据读取, 提速数据加载过程
\3. 热点数据主从同时预热

实施:
\1. 使用脚本程序固定触发数据预热过程
\2. 如果条件允许, 使用了CDN(内容分发网络),效果会更好

缓存穿透

缓存穿透是指缓存和数据库中都没有的数据,而用户不断发起请求,如发起为id为“-1”的数据或id为特别大不存在的数据。这时的用户很可能是攻击者,攻击会导致数据库压力过大。

查询数据库和缓存都没有的数据,每次都要查完缓存再查数据库,还是每次访问数据库,浪费了性能

  • 缓存空对象:代码简单,效果不好。查一条数据的时候,不管能不能查到,都加入到缓存。解决的是一个id,多次访问的问题。还会导致redis中会有大量的空数据,占用我们的内存。
  • 布隆过滤器bloom Filter:代码复杂,效果很高。涉及到了位数据扩容。使用一个足够大的bitmap,用于存储可能访问的key,不存在的key直接被过滤;在缓存层前拦截非法请求、自动为空值添加黑名单(同时可能要为误判的记录添加白名单).但需要考虑布隆过滤器的维护(离线生成/ 实时生成)。
  • 在接口增增加校验,如用户鉴权校验,id做基础校验,id<=0的直接拦截;

一般MySQL 默认的最大连接数在 150 左右,这个可以通过 show variables like '%max_connections%';命令来查看。最大连接数一个还只是一个指标,cpu,内存,磁盘,网络等无力条件都是其运行指标,这些指标都会限制其并发能力!所以,一般 3000 个并发请求就能打死大部分数据库了。

另外,这里多说一嘴,一般情况下我们是这样设计 key 的: 表名:列名:主键名:主键值

布隆过滤器

https://github.com/Snailclimb/JavaGuide/blob/master/docs/dataStructures-algorithms/data-structure/bloom-filter.md

隆过滤器的主要是由一个很长的二进制向量和若干个(k个)散列映射函数组成。因为每个元数据的存储信息值固定,而且总的二进制向量固定。所以在内存占用和查询时间上都远远超过一般的算法。当然存在一定的不准确率(可以控制)和不容易删除样本数据。

  • 不支持删除

  • 存在漏判率

  • 构造器BloomFilter.create()

    • 只有两个方法:put()mightContain()
  • 先查布隆,再查缓存,再查数据库

  • size预计插入多少数据

  • fpp 容错率 出现误判的概率是多少。不能为0

  • bloomFilter 位数组

  • list 创建的是object数组

  • bit位数组:redis底层就是用二进制存储的。一个字符是8位,还可以用setbit key值 第几位 0/1修改指定位,就可以把a改成b。同样我们还可以用setbit来进行扩容,这样就能设置为我们想要的1000位,在后面补0

  • 位数组:JVM内存,没有持久化

  • redis:redis内存,redis持久化

二进制的向量初始状态(JAVA中由BitSet实现)

2:添加一个样本数据

为了表达存储N个元素的集合,使用K个独立的函数来进行哈希运算。

  • {x1,x2……xk}为k个哈希算法。
  • 如果集合元素有N1,N2……NN,

N1经过x1运算后得到的结果映射的位置标1,经过x2运算后结果映射也标1,已经为1的保持1不变。经过k次散列后,对N1的散列完成。

依次对N2,NN等所有数据进行散列,最终得到一个部分为1,部分位为0的字节数组。当然了,这个字节数组会比较长,不然散列效果不好。

​ 样本数据经过函数组后获得位置数组,对应改变二进制向量的值为1。继续添加样本数据,重复上述过程。

那么怎么判断一个外来的元素是否已经在集合里呢,譬如已经散列了10亿个垃圾邮箱,现在来了一个邮箱,怎么判断它是否在这10亿里面呢?

很简单,就拿这个新来的也依次经历x1,x2……xk个哈希算法即可。

在任何一个哈希算法譬如到x2时,得到的映射值有0,那就说明这个邮箱肯定不在这10亿内。

如果是一个黑名单对象,那么可以肯定的是所有映射都为1,肯定跑不了它。也就是说是坏人,一定会被抓。

那么误伤是为什么呢,就是指一些非黑名单对象的值经过k次哈希后,也全部为1,但它确实不是黑名单里的值,这种概率是存在的,但是是可控的。

​ 3:得到最终二进制向量

​ 4:新数据比对

获取到位置数组,判断二进制向量上对应位置是否为1,只要有一个不为1(为0),那么就能肯定不存在。如果都为1,那么就很可能存在。

package com.java.base;

import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

public class TestBloomFilter {

    private static final int DEFAULT_SIZE = (1 << 31) - 1; // m的值 ,最大有符号int//1后面31个0,;-1的补码为32个1,相加为0后面31个1
    private static final int[] seeds = new int[] { 9, 11, 13, 31, 37, 57 }; // 6个函数
    private BitSet bits = new BitSet(DEFAULT_SIZE);
    private HashFunc[] func = new HashFunc[seeds.length];
    private static String words = "abcdefghijklmnopqrstuvwxyz1234567890_"; //37

    /**
     * 创建过滤器
     */
    public TestBloomFilter () { // 构造器
        for (int i = 0; i < seeds.length; i++) {
            func[i] = new HashFunc(DEFAULT_SIZE, seeds[i]);
        }
    }

    public static void main(String[] args) {
        runFilter();
    }

    public static void runFilter() {
        TestBloomFilter  filter = new TestBloomFilter ();
        List<String> existList = new ArrayList<String>();
        List<String> noExistList = new ArrayList<String>();
        int countExist = 0;
        System.out.println("开始添加数据");
        int SampleCount = 100000000;
        for (int i = 0; i < SampleCount; i++) {
            String value = getStr();
            if (!filter.contains(value)) {
                if (existList.size() < 1000) {
                    existList.add(value);
                }
                filter.add(value);
            } else { // 重复值
                countExist++;
            }
            if (i % 1000000 == 0) {
                System.out.println("已经添加:" + i);
            }
        }//数据添加完
        System.out.println("随机保存值重复了" + countExist);
        System.out.println(SampleCount + "比对样本值保存完毕");
        boolean flag = true;
        while (flag) {
            if (noExistList.size() > 999) {
                flag = false;
            } else {
                String str = getStr();
                if (!filter.contains(str)) {
                    noExistList.add(str);
                }
            }
        }
        System.out.println("1千的存在和不存在的待比对数据准备完毕");
        long start = System.currentTimeMillis();
        System.out.println("开始比对存在字符串");
        int existCount = 0;
        for (int i = 0; i < existList.size(); i++) {
            if (filter.contains(existList.get(i))) {
                existCount++;
            }
        }
        System.out.println("比对正确率:" + existCount + "/1000");
        System.out.println("开始比对不存在字符串");
        int noExistCount = 0;
        for (int i = 0; i < noExistList.size(); i++) {
            if (!filter.contains(noExistList.get(i))) {
                noExistCount++;
            }
        }
        System.out.println("比对正确率:" + noExistCount + "/1000");
        System.out.println("比对2千数据耗时:" + (System.currentTimeMillis() - start) + "毫秒");
        System.out.println("over");
    }

    /**
     * 获取随机比对字符串
     */
    public static String getStr() {// 得到一个长度为30多的字符串
        StringBuilder sb = new StringBuilder();
        // 添加30个字母
        for (int i = 0; i < 30; i++) {
            sb.append(words.charAt((int) (Math.random() * 37)));// "abcdefghijklmnopqrstuvwxyz1234567890_";
        }
        // 添加0~10W的一个整数
        sb.append(Math.random() * 100000);
        // sb.append(System.nanoTime());
        return sb.toString();
    }



    /**
     * 添加样本数据
     * @param value
     */
    public void add(String value) {
        for (HashFunc f : func) {
            bits.set(f.hash(value), true);
        }
    }

    /**
     * 判断是否存在
     * @param value
     * @return
     */
    public boolean contains(String value) {
        if (value == null) {
            return false;
        }
        boolean ret = true;
        for (HashFunc f : func) {
            ret = ret && bits.get(f.hash(value));//获取对象位置上的bit。0/1
        }
        return ret;
    }

    /**
     * 哈希函数
     */
    public static class HashFunc {
        private int maxCount;// DEFAULT_SIZE
        private int seed;// seeds[i]

        public HashFunc(int maxCount, int seed) {
            this.maxCount = maxCount;
            this.seed = seed;
        }

        public int hash(String value) {//计算得到要放的位置
            int result = 0;
            int len = value.length();
            for (int i = 0; i < len; i++) {
                result = seed * result + value.charAt(i);
            }
            return (maxCount - 1) & result;
        }
    }

}// https://blog.csdn.net/daobaliangbanana2/article/details/81388045
哈希函数个数和布隆过滤器长度

很显然,过小的布隆过滤器很快所有的 bit 位均为 1,那么查询任何值都会返回“可能存在”,起不到过滤的目的了。布隆过滤器的长度会直接影响误报率,布隆过滤器越长其误报率越小。

另外,哈希函数的个数也需要权衡,个数越多则布隆过滤器 bit 位置位 1 的速度越快,且布隆过滤器的效率越低;但是如果太少的话,那我们的误报率会变高。

  • k 为哈希函数个数
  • m 为布隆过滤器长度
  • n 为插入的元素个数
  • p 为误报率

Guava

https://blog.csdn.net/u014106644/article/details/91491807

//https://blog.csdn.net/u014653197/article/details/76397037
public class BloomFilter implements Serializable{

    private final int[] seeds;
    private final int size;
    private final BitSet notebook;//
    private final MisjudgmentRate rate;
    private final AtomicInteger useCount = new AtomicInteger();//Atomic类型
    private final Double autoClearRate;

    //dataCount逾预期处理的数据规模
    public BloomFilter(int dataCount){
        this(MisjudgmentRate.MIDDLE, dataCount, null);//重载构造
    }

    public BloomFilter(MisjudgmentRate rate, // 第一个参数是一个Enum,有个属性为int[] seeds1331
                       int dataCount, 
                       Double autoClearRate){//自动清空过滤器内部信息的使用比率,传null则表示不会自动清理;//当过滤器使用率达到100%时,则无论传入什么数据,都会认为在数据已经存在了;//当希望过滤器使用率达到80%时自动清空重新使用,则传入0.8

        //每个字符串需要的bit位数*总数据量
        long bitSize = rate.seeds.length * dataCount;//getSeed()
        if(bitSize<0 || bitSize>Integer.MAX_VALUE){
            throw new RuntimeException("位数太大溢出了,请降低误判率或者降低数据大小");
        }
        this.rate = rate;
        seeds = rate.seeds;
        size = (int)bitSize;
        //创建一个BitSet位集合
        notebook = new BitSet(size);
        this.autoClearRate = autoClearRate;
    }

    //如果存在返回true,不存在返回false
    public boolean addIfNotExist(String data){
        //是否需要清理
        checkNeedClear();
        //seeds.length决定每一个string对应多少个bit位,每一位都有一个索引值
        //给定data,求出data字符串的第一个索引值index,如果第一个index值对应的bit=false说明,该data值不存在,则直接将所有对应bit位置为true即可;
        //如果第一个index值对应bit=true,则将index值保存,但此4时并不能说明data已经存在,
        //则继续求解第二个index值,若所有index值都不存在则说明该data值不存在,将之前保存的index数组对应的bit位置为true
        int[] indexs = new int[seeds.length];
        //假定data已经存在
        boolean exist = true;
        int index;
        for(int i=0; i<seeds.length; i++){
            //计算位hash值
            indexs[i] = index = hash(data, seeds[i]);
            if(exist){
                //如果某一位bit不存在,则说明该data不存在
                if(!notebook.get(index)){
                    exist = false;
                    //将之前的bit位置为true
                    for(int j=0; j<=i; j++){
                        setTrue(indexs[j]);
                    }
                }
            }else{
                //如果不存在则直接置为true
                setTrue(index);
            }
        }

        return exist;
    }

    private int hash(String data, int seeds) {
        char[] value = data.toCharArray();
        int hash = 0;
        if(value.length>0){
            for(int i=0; i<value.length; i++){
                hash = i * hash + value[i];
            }
        }
        hash = hash * seeds % size;
        return Math.abs(hash);
    }

    private void setTrue(int index) {
        useCount.incrementAndGet();
        notebook.set(index, true);
    }

    //如果BitSet使用比率超过阈值,则将BitSet清零
    private void checkNeedClear() {
        if(autoClearRate != null){
            if(getUseRate() >= autoClearRate){
                synchronized (this) {
                    if(getUseRate() >= autoClearRate){
                        notebook.clear();
                        useCount.set(0);
                    }
                }
            }
        }
    }

    private Double getUseRate() {
        return (double)useCount.intValue()/(double)size;
    }

    public void saveFilterToFile(String path) {
        try (ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(path))) {
            oos.writeObject(this);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

    }

    public static BloomFilter readFilterFromFile(String path) {
        try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream(path))) {
            return (BloomFilter) ois.readObject();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 清空过滤器中的记录信息
     */
    public void clear() {
        useCount.set(0);
        notebook.clear();
    }

    public MisjudgmentRate getRate() {
        return rate;
    }

    /**
     * 分配的位数越多,误判率越低但是越占内存
     * 
     * 4个位误判率大概是0.14689159766308
     * 
     * 8个位误判率大概是0.02157714146322
     * 
     * 16个位误判率大概是0.00046557303372
     * 
     * 32个位误判率大概是0.00000021167340
     *
     */
    public enum MisjudgmentRate {
        // 这里要选取质数,能很好的降低错误率
        /**
         * 每个字符串分配4个位
         */
        VERY_SMALL(new int[] { 2, 3, 5, 7 }), // 后面跟的参数传入构造器了
        /**
         * 每个字符串分配8个位
         */
        SMALL(new int[] { 2, 3, 5, 7, 11, 13, 17, 19 }), //
        /**
         * 每个字符串分配16个位
         */
        MIDDLE(new int[] { 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53 }), //
        /**
         * 每个字符串分配32个位
         */
        HIGH(new int[] { 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 
                        43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97,
                        101, 103, 107, 109, 113, 127, 131 });

        private int[] seeds;

        //枚举类型MIDDLE构造函数将seeds数组初始化 // 私有构造,防止被外部调用
        private MisjudgmentRate(int[] seeds) { // 前面new[]就会自动传入
            this.seeds = seeds;
        }

        public int[] getSeeds() {
            return seeds;
        }

        public void setSeeds(int[] seeds) {
            this.seeds = seeds;
        }
    }

    public static void main(String[] args) {
        BloomFilter fileter = new BloomFilter(7);
        System.out.println(fileter.addIfNotExist("1111111111111"));
        System.out.println(fileter.addIfNotExist("2222222222222222"));
        System.out.println(fileter.addIfNotExist("3333333333333333"));
        System.out.println(fileter.addIfNotExist("444444444444444"));
        System.out.println(fileter.addIfNotExist("5555555555555"));
        System.out.println(fileter.addIfNotExist("6666666666666"));
        System.out.println(fileter.addIfNotExist("1111111111111"));
        //fileter.saveFilterToFile("C:\\Users\\john\\Desktop\\1111\\11.obj");
        //fileter = readFilterFromFile("C:\\Users\\john\\Desktop\\111\\11.obj");
        System.out.println(fileter.getUseRate());
        System.out.println(fileter.addIfNotExist("1111111111111"));
    }

}


<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>25.1-jre</version>
</dependency>


public class Test1 {

    private static int size = 1000000;

    private static BloomFilter<Integer> bloomFilter = BloomFilter.create(Funnels.integerFunnel(), size);
    // 自定义错误率
    // private static BloomFilter<Integer> bloomFilter = BloomFilter.create(Funnels.integerFunnel(), size, 0.01);

    public static void main(String[] args) {
        for (int i = 0; i < size; i++) {
            bloomFilter.put(i);
        }

        long startTime = System.nanoTime(); // 获取开始时间
        //判断这一百万个数中是否包含29999这个数
        if (bloomFilter.mightContain(29999)) {
            System.out.println("命中了");
        }
        long endTime = System.nanoTime();   // 获取结束时间
        System.out.println("程序运行时间: " + (endTime - startTime) + "纳秒");
    }

}
分布式自定义
redis实现类
@Component
public class RedisUtil {
    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * 设置bitMap
     *
     * @param key    bitMap key
     * @param offset 设置在多少位
     * @param flag   设置的值
     */
    public void setBit(String key, Long offset, Boolean flag) {
        redisTemplate.opsForValue().setBit(key, offset, flag);
    }

    /**
     * 获取bitMap指定位置的值
     *
     * @param key    bitMap key
     * @param offset 位置
     */
    public Boolean getBit(String key, Long offset) {
        return redisTemplate.opsForValue().getBit(key, offset);
    }
}
建立hash函数所需要的seed
public enum CorrectRatio {
    /**
     * 32位代表一个数
     */
    HIGH(new int[]{2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 
                   41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 
                   89, 97,101, 103, 107, 109, 113, 127, 131});

    private int[] seed;

    CorrectRatio(int[] size) {
        this.seed = size;
    }

    public int[] getSeed() {
        return seed;
    }

    public void setSeed(int[] seed) {
        this.seed = seed;
    }
}

布隆过滤器

public class BoolmfilterForRedis {

    @Autowired
    private RedisUtil redisUtil;

    /**
     * BKRD Hash算法seed
     */
    private int[] seed;

    /**
     * hash方法 数组
     */
    private HashFunction[] hashFunction;

    /**
     * 所需bitMap 位数 为2的幂次方
     */
    private Long cap;

    private static final String bitMapPrefix = "bitMapForBoolmfilter";


    /**
     * 初始化容量 redis中位图的容量
     *
     * @param n 输入量
     */
    private void initCap(CorrectRatio correctRatio, Long n) {
        /*
         * 如果correctRatio.getSeed().length 为32 即有32个hash函数对一个数做映射
         * 那么 bitSet中 32位代表一个数 所需要的最大容量为32*n
         */
        this.cap = correctRatio.getSeed().length * n;
    }


    /**
     * 构造 hash函数
     *
     * @param correctRatio
     */
    private void createHashFunctions(CorrectRatio correctRatio) {
        this.seed = correctRatio.getSeed();
        this.hashFunction = new HashFunction[this.seed.length];
        for (int i = 0; i < seed.length; i++) {
            this.hashFunction[i] = new HashFunction(seed[i], this.cap);
        }
    }

    /**
     * 初始化
     *
     * @param correctRatio
     * @param n
     */
    public BoolmfilterForRedis(CorrectRatio correctRatio, Long n) {
        initCap(correctRatio, n);
        createHashFunctions(correctRatio);
    }

    /**
     * 添加
     *
     * @param value
     */
    public void add(String value) {
        for (int i = 0; i < seed.length; i++) {
            redisUtil.setBit(bitMapPrefix, this.hashFunction[i].hash(value), true);
        }
    }

    /**
     * 包含
     *
     * @param value
     * @return
     */
    public Boolean contain(String value) {
        boolean flag = true;
        for (int i = 0; i < seed.length; i++) {
            flag = flag && redisUtil.getBit(bitMapPrefix, this.hashFunction[i].hash(value));
            if (!flag) {
                return false;
            }
        }
        return flag;
    }


    /**
     * hash算法
     */
    private class HashFunction {
        private Integer seed;

        private Long cap;

        public HashFunction(Integer seed, Long cap) {
            this.seed = seed;
            this.cap = cap;
        }

        public Long hash(String value) {
            Integer result = 0;
            for (int i = 0; i < value.length(); i++) {
                result = result * this.seed + value.charAt(i);
            }
            //求余数 防止redis中bitMap无限膨胀  类似循环队列 可以防止系统OOM
            return result & (cap - 1);
        }
    }
}

由于初始化不能通过无参构造函数 所以还要配置

@Configuration
public class BoolmfilterConfig {
    @Value("${boolmfilter.inputnum.pow}")
    private Integer pow;

    @Bean
    public BoolmfilterForRedis getBoolmfilter(){
        //输入数据量cap=输入数据量 * hash个数  
        // hash个数就是seed数 所以要求seed个数为2的幂次方 输入量也要是2的幂次方 详情参考hashMap容量为什么为2的幂次方
        BoolmfilterForRedis boolmfilter=new BoolmfilterForRedis(CorrectRatio.HIGH,(long)1<<pow);
        return boolmfilter;
    }
}
测试

测试前准备1000uuid存入

@RunWith(SpringRunner.class)
@SpringBootTest
public class GatewayApplicationTests {
    @Autowired
    private BoolmfilterForRedis boolmfilter;
    @Autowired
    private RedisTemplate redisTemplate;

    @Test
    public void contextLoads() {
        //1.先存入1000个uuid
        for (int i=0;i<1000;i++) {
            redisTemplate.opsForList().rightPush("xx", UUID.randomUUID().toString());
        }
    }

}

// 2.将数据填入boolmfilter
List<String> xx = redisTemplate.opsForList().range("bitMapForBoolmfilter", 0L, 100L);
xx.forEach(s -> {
    boolmfilter.add(s);
});

//3.测试通过率 由于uuid基本不重复 所以这里生成的uuid通过率在不误判的情况下应为0
int count = 0;
for (int i = 0; i < 10000; i++) {
    boolean contains = boolmfilter.contain(UUID.randomUUID().toString());
    if (contains == true) {
        count++;
    }
}
System.out.println(count);

数据库服务器崩溃( 3)
\1. 系统平稳运行过程中
\2. 应用服务器流量随时间增量较大
\3. Redis服务器命中率随时间逐步降低
\4. Redis内存平稳,内存无压力
\5. Redis服务器CPU占用激增
\6. 数据库服务器压力激增
\7. 数据库崩溃

问题排查
\1. Redis中大面积出现未命中
\2. 出现非正常URL访问

问题分析
 获取的数据在数据库中也不存在,数据库查询未得到对应数据
 Redis获取到null数据未进行持久化,直接返回
 下次此类数据到达重复上述过程
 出现黑客攻击服务器

解决方案(术)
\1. 缓存null
对查询结果为null的数据进行缓存(长期使用,定期清理), 设定短时限,例如30-60秒, 最高5分钟

\3. 实施监控
实时监控redis命中率( 业务正常范围时,通常会有一个波动值)与null数据的占比
 非活动时段波动:通常检测3-5倍,超过5倍纳入重点排查对象
 活动时段波动:通常检测10-50倍, 超过50倍纳入重点排查对象
根据倍数不同,启动不同的排查流程。然后使用黑名单进行防控(运营)
\4. key加密
问题出现后,临时启动防灾业务key,对key进行业务层传输加密服务,设定校验程序,过来的key校验
例如每天随机分配60个加密串,挑选2到3个,混淆到页面数据id中,发现访问key不满足规则,驳回数据访问

总结
缓存击穿访问了不存在的数据,跳过了合法数据的redis数据缓存阶段,每次访问数据库,导致对数据库服务器造成压力。通常此类数据的出现量是一个较低的值,当出现此类情况以毒攻毒,并及时报警。应对策略应该在临时预案防范方面多做文章。
无论是黑名单还是白名单,都是对整体系统的压力,警报解除后尽快移除。

缓存击穿

缓存雪崩和缓存击穿不同的是:

  • 缓存击穿指并发查同一条数据。缓存击穿是指缓存中没有但数据库中有的数据(一般是缓存时间到期),这时由于并发用户特别多,同时读缓存没读到数据,又同时去数据库去取数据,引起数据库压力瞬间增大,造成过大压力
  • 缓存雪崩是不同数据都过期了,很多数据都查不到从而查数据库。

解决方案:

  • 设置热点数据永远不过期。

  • 加互斥锁:业界比较常用的做法,是使用mutex。简单地来说,就是在缓存失效的时候(判断拿出来的值为空),不是立即去load db去数据库加载,而是先使用缓存工具的某些带成功操作返回值的操作(比如Redis的SETNX或者Memcache的ADD)去set一个mutex key,当操作返回成功时,再进行load db的操作并回设缓存;否则,就重试整个get缓存的方法。
    互斥锁参考代码如下:

    public static String getData(String key)throws InterruptedException{
    //从缓存中读取数据
    String result = getDataFromRedis(key);
    //如果缓存中不存在数据
    if(result==null){
    //去获取锁,获取成功,去数据库取数据
    if(reenLock.tryLock()){//redisLock.lock(String.valueOf(id))
    //从数据库获取数据
    result = getDataFromMysql(key);
    //更新缓存数据
    if(result!=null){
    setDataToCache(key,result);
    }
    //释放锁
    reenLock.unlock();
    }else{//获取锁失败
    //暂停100ms再去缓存再去缓存获取数据
    Thread.sleep(100);
    result = getData(key);
    }
    }
    return result;
    }

说明:

1)缓存中有数据,直接走上述代码13行后就返回结果了

2)缓存中没有数据,第1个进入的线程,获取锁并从数据库去取数据,没释放锁之前,其他并行进入的线程会等待100ms,再重新去缓存取数据。这样就防止都去数据库重复取数据,重复往缓存中更新数据情况出现。

3)当然这是简化处理,理论上如果能根据key值加锁就更好了,就是线程A从数据库取key1的数据并不妨碍线程B取key2的数据,上面代码明显做不到这点。

1.使用互斥锁(mutex key)

SETNX,是「SET if Not eXists」的缩写,也就是只有不存在的时候才设置,可以利用它来实现锁的效果。在redis2.6.1之前版本未实现setnx的过期时间,所以这里给出两种版本代码参考:

//2.6.1前单机版本锁
String get(String key) {  
    String value = redis.get(key);  
    if (value  == null) {  
        if (redis.setnx(key_mutex, "1")) {  
            // 3 min timeout to avoid mutex holder crash  
            redis.expire(key_mutex, 3 * 60) ;
            value = db.get(key);  
            redis.set(key, value);  
            redis.delete(key_mutex);  
        } else {  
            //其他线程休息50毫秒后重试  
            Thread.sleep(50);     
            get(key);  
        }  
    }  
}

最新版本代码:

public String get(key) {
    String value = redis.get(key);
    if (value == null) { //代表缓存值过期
        //设置3min的超时,防止del操作失败的时候,下次缓存过期一直不能load db
        if (redis.setnx(key_mutex, 1, 3 * 60) == 1) {  //代表设置成功
            value = db.get(key);
            redis.set(key, value, expire_secs);
            redis.del(key_mutex);
        } else {  //这个时候代表同时候的其他线程已经load db并回设到缓存了,这时候重试获取缓存值即可
            sleep(50);
            get(key);  //重试
        }
    } else {
        return value;      
    }
}
memcache代码:
    if (memcache.get(key) == null) {  
        // 3 min timeout to avoid mutex holder crash  
        if (memcache.add(key_mutex, 3 * 60 * 1000) == true) {  
            value = db.get(key);  
            memcache.set(key, value);  
            memcache.delete(key_mutex);  
        } else {  
            sleep(50);  
            retry();  
        }  
    } 

2 "提前"使用互斥锁(mutex key):
在value内部设置1个超时值(timeout1), timeout1比实际的memcache timeout(timeout2)小。当从cache读取到timeout1发现它已经过期时候,马上延长timeout1并重新设置到cache。然后再从数据库加载数据并设置到cache中。伪代码如下:

v = memcache.get(key);  
if (v == null) {  
    if (memcache.add(key_mutex, 3 * 60 * 1000) == true) {  
        value = db.get(key);  
        memcache.set(key, value);  
        memcache.delete(key_mutex);  
    } else {  
        sleep(50);  
        retry();  
    }  
} else {  
    if (v.timeout <= now()) {  
        if (memcache.add(key_mutex, 3 * 60 * 1000) == true) {  
            // extend the timeout for other threads  
            v.timeout += 3 * 60 * 1000;  
            memcache.set(key, v, KEY_TIMEOUT * 2);  
            // load the latest value from db  
            v = db.get(key);  
            v.timeout = KEY_TIMEOUT;  
            memcache.set(key, value, KEY_TIMEOUT * 2);  
            memcache.delete(key_mutex);  
        } else {  
            sleep(50);  
            retry();  
        }  
    }  
} 
  1. “永远不过期”:
    这里的“永远不过期”包含两层意思:

(1) 从redis上看,确实没有设置过期时间,这就保证了,不会出现热点key过期问题,也就是“物理”不过期。

(2) 从功能上看,如果不过期,那不就成静态的了吗?所以我们把过期时间存在key对应的value里,如果发现要过期了,通过一个后台的异步线程进行缓存的构建,也就是“逻辑”过期

从实战看,这种方法对于性能非常友好,唯一不足的就是构建缓存时候,其余线程(非构建缓存的线程)可能访问的是老数据,但是对于一般的互联网功能来说这个还是可以忍受。

String get(final String key) {  
    V v = redis.get(key);  
    String value = v.getValue();     
    long timeout = v.getTimeout();  
    if (v.timeout <= System.currentTimeMillis()) {  
        // 异步更新后台异常执行  
        threadPool.execute(new Runnable() {  
            public void run() {  
                String keyMutex = "mutex:" + key;  
                if (redis.setnx(keyMutex, "1")) {  
                    // 3 min timeout to avoid mutex holder crash  
                    redis.expire(keyMutex, 3 * 60);  
                    String dbValue = db.get(key);  
                    redis.set(key, dbValue);  
                    redis.delete(keyMutex);  
                }  
            }  
        });  
    }  
    return value;  
}
  1. 资源保护:
    采用netflix的hystrix,可以做资源的隔离保护主线程池,如果把这个应用到缓存的构建也未尝不可。
  2. 四种解决方案:没有最佳只有最合适

解决方案

优点

缺点

简单分布式互斥锁(mutex key)

1. 思路简单2. 保证一致性

1. 代码复杂度增大2. 存在死锁的风险3. 存在线程池阻塞的风险

“提前”使用互斥锁

1. 保证一致性

同上

不过期(本文)

1. 异步构建缓存,不会阻塞线程池

1. 不保证一致性。2. 代码复杂度增大(每个value都要维护一个timekey)。3. 占用一定的内存空间(每个value都要维护一个timekey)。

资源隔离组件hystrix(本文)

1. hystrix技术成熟,有效保证后端。2. hystrix监控强大。

1. 部分访问存在降级策略。

数据库服务器崩溃( 2)
\1. 系统平稳运行过程中
\2. 数据库连接量瞬间激增
\3. Redis服务器无大量key过期
\4. Redis内存平稳,无波动
\5. Redis服务器CPU正常
\6. 数据库崩溃

问题排查
\1. Redis中某个key过期,该key访问量巨大
\2. 多个数据请求从服务器直接压到Redis后,均未命中
\3. Redis在短时间内发起了大量对数据库中同一数据的访问

问题分析
 单个key高热数据
 key过期

解决方案(术)
\1. 预先设定
以电商为例,每个商家根据店铺等级, 指定若干款主打商品,在购物节期间, 加大此类信息key的过期时长
注意:购物节不仅仅指当天,以及后续若干天,访问峰值呈现逐渐降低的趋势
\2. 现场调整
监控访问量,对自然流量激增的数据延长过期时间或设置为永久性key
\3. 后台刷新数据
启动定时任务,高峰期来临之前, 刷新数据有效期, 确保不丢失
\4. 二级缓存
设置不同的失效时间,保障不会被同时淘汰就行
\5. 加锁
分布式锁,防止被击穿,但是要注意也是性能瓶颈,慎重!

总结
缓存击穿就是单个高热数据过期的瞬间,数据访问量较大,未命中redis后,发起了大量对同一数据的数据库访问,导致对数据库服
务器造成压力。应对策略应该在业务数据分析与预防方面进行,配合运行监控测试与即时调整策略,毕竟单个key的过期监控难度
较高,配合雪崩处理策略即可

缓存雪崩

大量的key设置了相同的过期时间,导致在缓存在同一时刻全部失效,造成瞬时DB请求量大、压力骤增,引起雪崩。

缓存雪崩是指缓存中数据大批量到过期时间,而查询数据量巨大,引起数据库压力过大甚至down机。

缓存雪崩和缓存击穿不同的是, 缓存击穿指并发查同一条数据,缓存雪崩是不同数据都过期了,很多数据都查不到从而查数据库。

缓存崩溃时请求会直接落到数据库上,很可能由于无法承受大量的并发请求而崩溃,此时如果只重启数据库,或因为缓存重启后没有数据,新的流量进来很快又会把数据库击倒。

解决方案:

  • 规避雪崩:缓存数据的过期时间设置随机,防止同一时间大量数据过期现象发生。
  • 如果缓存数据库是分布式部署,将热点数据均匀分布在不同缓存数据库中。
  • 设置热点数据永远不过期。
  • 出现雪崩:降级 熔断
  • 事前:尽量保证整个 redis 集群的高可用性,发现机器宕机尽快补上。选择合适的内存淘汰策略。
  • 事中:本地ehcache缓存 + hystrix限流&降级,避免MySQL崩掉
  • 事后:利用 redis 持久化机制保存的数据尽快恢复缓存

数据库服务器崩溃( 1)
\1. 系统平稳运行过程中,忽然数据库连接量激增
\2. 应用服务器无法及时处理请求
\3. 大量408, 500错误页面出现
\4. 客户反复刷新页面获取数据
\5. 数据库崩溃
\6. 应用服务器崩溃
\7. 重启应用服务器无效
\8. Redis服务器崩溃
\9. Redis集群崩溃
\10. 重启数据库后再次被瞬间流量放倒

问题排查
\1. 在一个较短的时间内,缓存中较多的key集中过期
\2. 此周期内请求访问过期的数据, redis未命中, redis向数据库获取数据
\3. 数据库同时接收到大量的请求无法及时处理
\4. Redis大量请求被积压,开始出现超时现象
\5. 数据库流量激增,数据库崩溃
\6. 重启后仍然面对缓存中无数据可用
\7. Redis服务器资源被严重占用, Redis服务器崩溃
\8. Redis集群呈现崩塌,集群瓦解
\9. 应用服务器无法及时得到数据响应请求,来自客户端的请求数量越来越多,应用服务器崩溃
\10. 应用服务器, redis,数据库全部重启,效果不理想

问题分析
 短时间范围内
 大量key集中过期

解决方案(道)
\1. 更多的页面静态化处理
\2. 构建多级缓存架构
Nginx缓存+redis缓存+ehcache缓存
\3. 检测Mysql严重耗时业务进行优化
对数据库的瓶颈排查:例如超时查询、耗时较高事务等
\4. 灾难预警机制
监控redis服务器性能指标
 CPU占用、 CPU使用率
 内存容量
 查询平均响应时间
 线程数
\5. 限流、降级
短时间范围内牺牲一些客户体验,限制一部分请求访问,降低应用服务器压力,待业务低速运转后再逐步放开访问

解决方案(术)
\1. LRU与LFU切换
\2. 数据有效期策略调整
 根据业务数据有效期进行分类错峰, A类90分钟, B类80分钟, C类70分钟
 过期时间使用固定时间+随机值的形式,稀释集中到期的key的数量
\3. 超热数据使用永久key
\4. 定期维护(自动+人工)
对即将过期数据做访问量分析,确认是否延时,配合访问量统计,做热点数据的延时
\5. 加锁
慎用!

总结
缓存雪崩就是瞬间过期数据量太大,导致对数据库服务器造成压力。如能够有效避免过期时间集中,可以有效解决雪崩现象的出现(约40%),配合其他策略一起使用,并监控服务器的运行数据,根据运行记录做快速调整。

数据一致性问题

缓存与数据库双写时的数据一致性?

一般情况下我们都是这样使用缓存的:先读缓存,缓存没有的话,就读数据库,然后取出数据后放入缓存,同时返回响应。这种方式很明显会存在缓存和数据库的数据不一致的情况。

你只要用缓存,就可能会涉及到缓存与数据库双存储双写,你只要是双写,就一定会有数据一致性的问题,那么你如何解决一致性问题?

一般来说,就是如果你的系统不是严格要求缓存+数据库必须一致性的话,缓存可以稍微的跟数据库偶尔有不一致的情况,最好不要做这个方案,读请求和写请求串行化,串到一个内存队列里去,这样就可以保证一定不会出现不一致的情况

串行化之后,就会导致系统的吞吐量会大幅度的降低,用比正常情况下多几倍的机器去支撑线上的一个请求。

更多内容可以查看:https://github.com/doocs/advanced-java/blob/master/docs/high-concurrency/redis-consistence.md

当更新数据时,如更新某商品的库存,当前商品的库存是100,现在要更新为99,先更新数据库更改成99,然后删除缓存,发现删除缓存失败了,这意味着数据库存的是99,而缓存是100,这导致数据库和缓存不一致。

解决方法:
这种情况应该是先删除缓存,然后再更新数据库,

  • 如果删除缓存失败,那就不要更新数据库,
  • 如果说删除缓存成功,而更新数据库失败,那查询的时候只是从数据库里查了旧的数据而已,这样就能保持数据库与缓存的一致性。

场景:(删完缓存改数据库时被别人又增内存了)在高并发的情况下,如果当删除完缓存的时候,这时A去更新数据库,但A还没有更新完,另外一个请求B来查询数据,B发现缓存里没有,B就去数据库里查,还是以上面商品库存为例,如果数据库中产品的库存是100,那么查询到的库存是100,B然后插入缓存,插入完缓存后,原来那个更新数据库的线程把数据库更新为了99,导致数据库与缓存不一致的情况

解决方法:创建队列让更新操作排队。
遇到这种情况,可以用队列的去解决这个问,创建几个队列,如20个,根据商品的ID去做hash值,然后对队列个数取摸,当有数据更新请求时,先把它丢到队列里去,当更新完后在从队列里去除,如果在更新的过程中,遇到以上场景,先去缓存里看下有没有数据,如果没有,可以先去队列里看是否有相同商品ID在做更新,如果有也把查询的请求发送到队列里去,然后同步等待缓存更新完成

这里有一个优化点,如果发现队列里有一个查询请求了,那么就不要放新的查询操作进去了,用一个while(true)循环去查询缓存,循环个200MS左右,如果缓存里还没有则直接取数据库的旧数据,一般情况下是可以取到的。

在高并发下解决场景二要注意的问题:

(1)读请求时长阻塞

由于读请求进行了非常轻度的异步化,所以一定要注意读超时的问题,每个读请求必须在超时间内返回,该解决方案最大的风险在于可能数据更新很频繁,导致队列中挤压了大量的更新操作在里面,然后读请求会发生大量的超时,最后导致大量的请求直接走数据库,像遇到这种情况,一般要做好足够的压力测试,如果压力过大,需要根据实际情况添加机器。

(2)请求并发量过高

这里还是要做好压力测试,多模拟真实场景,并发量在最高的时候QPS多少,扛不住就要多加机器,还有就是做好读写比例是多少

(3)多服务实例部署的请求路由

可能这个服务部署了多个实例,那么必须保证说,执行数据更新操作,以及执行缓存更新操作的请求,都通过nginx服务器路由到相同的服务实例上

(4)热点商品的路由问题,导致请求的倾斜

某些商品的读请求特别高,全部打到了相同的机器的相同丢列里了,可能造成某台服务器压力过大,因为只有在商品数据更新的时候才会清空缓存,然后才会导致读写并发,所以更新频率不是太高的话,这个问题的影响并不是很大,但是确实有可能某些服务器的负载会高一些。

并发竞争 Key

所谓 Redis 的并发竞争 Key 的问题也就是多个系统同时对一个 key 进行操作,但是最后执行的顺序和我们期望的顺序不同,这样也就导致了结果的不同!

推荐一种方案:分布式锁(zookeeper 和 redis 都可以实现分布式锁)。(如果不存在 Redis 的并发竞争 Key 问题,不要使用分布式锁,这样会影响性能)

基于zookeeper临时有序节点可以实现的分布式锁。大致思想为:每个客户端对某个方法加锁时,在zookeeper上的与该方法对应的指定节点的目录下,生成一个唯一的瞬时有序节点。 判断是否获取锁的方式很简单,只需要判断有序节点中序号最小的一个。 当释放锁的时候,只需将这个瞬时节点删除即可。同时,其可以避免服务宕机导致的锁无法释放,而产生的死锁问题。完成业务流程后,删除对应的子节点释放锁。

在实践中,当然是从以可靠性为主。所以首推Zookeeper。

参考:

性能指标监控

监控指标
 性能指标: Performance
 内存指标: Memory
 基本活动指标: Basic activity
 持久性指标: Persistence
 错误指标: Error

监控指标
 性能指标: Performance

Name

描述

latency

Redis响应一个请求的时间

instantaneous_ops_per_sec

平均每秒处理请求次数

hit rate

缓存命中率(计算出来的)

监控指标
 内存指标: Memory

name

描述

used_memory

已使用内存

mem_fragmentation_ratio

内存碎片率

evited_keys

由于最大内存限制被移除的key值

blocked_clients

由于BLPOP,BRPOP,BRPOPLPUSH而被阻塞的客户端

监控指标
 基本活动指标: Basic activity

Name

描述

connected_clients

客户端连接数

connected_slaves

slave数量

master_last_io_seconds_ago

最近一次主从交互之后的秒数

keyspace

数据库中的key值总数

监控指标
 持久性指标: Persistence

name

描述

rdb_last_save_time

最后一次持久化保存到磁盘的时间戳

rdb_changes_since_last_save

自最后一次持久化以来数据库的更改次数

监控指标
 错误指标: Error

name

rejected_connections

由于达到maxclient限制而被拒绝的连接数

keyspace_misses

key值查找失败(没有命中)次数

master_link_down_since_seconds

主从断开的持续时间(秒)

监控方式
 工具
 Cloud Insight Redis
 Prometheus
 Redis-stat
 Redis-faina
 RedisLive
 zabbix
 命令
 benchmark
 redis cli
 monitor
 showlog

benchmark

 命令

redis-benchmark [-h ] [-p ] [-c ] [-n [-k ]

 范例1

redis-benchmark

说明: 50个连接, 10000次请求对应的性能
 范例2

redis-benchmark -c 100 -n 5000

说明: 100个连接, 5000次请求对应的性能

monitor

 命令

monitor

打印服务器调试信息

showlong

 命令

showlong [operator]

 get :获取慢查询日志
 len :获取慢查询日志条目数
 reset :重置慢查询日志
 相关配置

slowlog-log-slower-than 1000 #设置慢查询的时间下线,单位:微妙
slowlog-max-len 100 #设置慢查询命令对应的日志显示长度,单位:命令数  

常见面试题

为啥Redis那么快?

Redis采用的是基于内存的采用的是单进程单线程模型的 KV 数据库,由C语言编写,官方提供的数据是可以达到100000+的QPS(每秒内查询次数)

  • * 完全基于内存,绝大部分请求是纯粹的内存操作,非常快速。它的,数据存在内存中,类似于HashMapHashMap的优势就是查找和操作的时间复杂度都是O(1);
    • 数据结构简单,对数据操作也简单,Redis中的数据结构是专门进行设计的;
    • 采用单线程,避免了不必要的上下文切换和竞争条件,也不存在多进程或者多线程导致的切换而消耗 CPU,不用去考虑各种锁的问题,不存在加锁释放锁操作,没有因为可能出现死锁而导致的性能消耗;
    • 使用多路I/O复用模型,非阻塞NIO;
    • 使用底层模型不同,它们之间底层实现方式以及与客户端之间通信的应用协议不一样,Redis直接自己构建了VM 机制 ,因为一般的系统调用系统函数的话,会浪费一定的时间去移动和请求;

我可以问一下啥是上下文切换么?

线程切换时线程当前的状态,包括栈等

那他是单线程的,我们现在服务器都是多核的,那不是很浪费?
是的他是单线程的,但是,我们可以通过在单机开多个Redis实例嘛。
既然提到了单机会有瓶颈,那你们是怎么解决这个瓶颈的?
我们用到了集群的部署方式也就是Redis cluster,并且是主从同步读写分离,类似Mysql的主从同步,Redis cluster 支撑 N 个 Redis master node,每个master node都可以挂载多个 slave node
这样整个 Redis 就可以横向扩容了。如果你要支撑更大数据量的缓存,那就横向扩容更多的 master 节点,每个 master 节点就能存放更多的数据了。
哦?那问题就来了,他们之间是怎么进行数据交互的?以及Redis是怎么进行持久化的?Redis数据都在内存中,一断电或者重启不就木有了嘛?
是的,持久化的话是Redis高可用中比较重要的一个环节,因为Redis数据在内存的特性,持久化必须得有,我了解到的持久化是有两种方式的。

  • * RDB:RDB 持久化机制,是对 Redis 中的数据执行周期性的持久化。
    • AOF:AOF 机制对每条写入命令作为日志,以 append-only 的模式写入一个日志文件中,因为这个模式是只追加的方式,所以没有任何磁盘寻址的开销,所以很快,有点像Mysql中的binlog

两种方式都可以把Redis内存中的数据持久化到磁盘上,然后再将这些数据备份到别的地方去,RDB更适合做冷备AOF更适合做热备,比如我杭州的某电商公司有这两个数据,我备份一份到我杭州的节点,再备份一个到上海的,就算发生无法避免的自然灾害,也不会两个地方都一起挂吧,这灾备也就是异地容灾,地球毁灭他没办法。
tip:两种机制全部开启的时候,Redis在重启的时候会默认使用AOF去重新构建数据,因为AOF的数据是比RDB更完整的。
那这两种机制各自优缺点是啥?
我先说RDB
优点:
他会生成多个数据文件,每个数据文件分别都代表了某一时刻Redis里面的数据,这种方式,有没有觉得很适合做冷备,完整的数据运维设置定时任务,定时同步到远端的服务器,比如阿里的云服务,这样一旦线上挂了,你想恢复多少分钟之前的数据,就去远端拷贝一份之前的数据就好了。
RDBRedis的性能影响非常小,是因为在同步数据的时候他只是fork了一个子进程去做持久化的,而且他在数据恢复的时候速度比AOF来的快。
缺点:
RDB都是快照文件,都是默认五分钟甚至更久的时间才会生成一次,这意味着你这次同步到下次同步这中间五分钟的数据都很可能全部丢失掉。AOF则最多丢一秒的数据,数据完整性上高下立判。
还有就是RDB在生成数据快照的时候,如果文件很大,客户端可能会暂停几毫秒甚至几秒,你公司在做秒杀的时候他刚好在这个时候fork了一个子进程去生成一个大快照,哦豁,出大问题。
我们再来说说AOF
优点:
上面提到了,RDB五分钟一次生成快照,但是AOF是一秒一次去通过一个后台的线程fsync操作,那最多丢这一秒的数据。
AOF在对日志文件进行操作的时候是以append-only的方式去写的,他只是追加的方式写数据,自然就少了很多磁盘寻址的开销了,写入性能惊人,文件也不容易破损。
AOF的日志是通过一个叫非常可读的方式记录的,这样的特性就适合做灾难性数据误删除的紧急恢复了,比如公司的实习生通过flushall清空了所有的数据,只要这个时候后台重写还没发生,你马上拷贝一份AOF日志文件,把最后一条flushall命令删了就完事了。

缺点:
一样的数据,AOF文件比RDB还要大。
AOF开启后,Redis支持写的QPS会比RDB支持写的要低,他不是每秒都要去异步刷新一次日志嘛fsync,当然即使这样性能还是很高,我记得ElasticSearch也是这样的,异步刷新缓存区的数据去持久化,为啥这么做呢,不直接来一条怼一条呢,那我会告诉你这样性能可能低到没办法用的,大家可以思考下为啥哟。
那两者怎么选择?

小孩子才做选择,我全都要,你单独用RDB你会丢失很多数据,你单独用AOF,你数据恢复没RDB来的快,真出什么时候第一时间用RDB恢复,然后AOF做数据补全,真香!冷备热备一起上,才是互联网时代一个高健壮性系统的王道。
看不出来年纪轻轻有点东西的呀,对了我听你提到了高可用,Redis还有其他保证集群高可用的方式么?
!!!晕 自己给自己埋个坑(其实是明早就准备好了,故意抛出这个词等他问,就怕他不问)。
假装思考一会(不要太久,免得以为你真的不会),哦我想起来了,还有哨兵集群sentinel
哨兵必须用三个实例去保证自己的健壮性的,哨兵+主从并不能保证数据不丢失,但是可以保证集群的高可用
为啥必须要三个实例呢?我们先看看两个哨兵会咋样。

master宕机了 s1和s2两个哨兵只要有一个认为你宕机了就切换了,并且会选举出一个哨兵去执行故障,但是这个时候也需要大多数哨兵都是运行的。
那这样有啥问题呢?M1宕机了,S1没挂那其实是OK的,但是整个机器都挂了呢?哨兵就只剩下S2个裸屌了,没有哨兵去允许故障转移了,虽然另外一个机器上还有R1,但是故障转移就是不执行。
经典的哨兵集群是这样的:

M1所在的机器挂了,哨兵还有两个,两个人一看他不是挂了嘛,那我们就选举一个出来执行故障转移不就好了。
暖男我,小的总结下哨兵组件的主要功能:

  • * 集群监控:负责监控 Redis master 和 slave 进程是否正常工作。
    • 消息通知:如果某个 Redis 实例有故障,那么哨兵负责发送消息作为报警通知给管理员。
    • 故障转移:如果 master node 挂掉了,会自动转移到 slave node 上。
    • 配置中心:如果故障转移发生了,通知 client 客户端新的 master 地址。

我记得你还提到了主从同步,能说一下主从之间的数据怎么同步的么?
面试官您的记性可真是一级棒呢,我都要忘了你还记得,我特么谢谢你,提到这个,就跟我前面提到的数据持久化的RDBAOF有着比密切的关系了。
我先说下为啥要用主从这样的架构模式,前面提到了单机QPS是有上限的,而且Redis的特性就是必须支撑读高并发的,那你一台机器又读又写,这谁顶得住啊,不当人啊!但是你让这个master机器去写,数据同步给别的slave机器,他们都拿去读,分发掉大量的请求那是不是好很多,而且扩容的时候还可以轻松实现水平扩容。

回归正题,他们数据怎么同步的呢?
你启动一台slave 的时候,他会发送一个psync命令给master ,如果是这个slave第一次连接到master,他会触发一个全量复制。master就会启动一个线程,生成RDB快照,还会把新的写请求都缓存在内存中,RDB文件生成后,master会将这个RDB发送给slave的,slave拿到之后做的第一件事情就是写进本地的磁盘,然后加载进内存,然后master会把内存里面缓存的那些新命名都发给slave。
数据传输的时候断网了或者服务器挂了怎么办啊?
传输过程中有什么网络问题啥的,会自动重连的,并且连接之后会把缺少的数据补上的。
大家需要记得的就是,RDB快照的数据生成的时候,缓存区也必须同时开始接受新请求,不然你旧的数据过去了,你在同步期间的增量数据咋办?是吧?
那说了这么多你能说一下他的内存淘汰机制么,来手写一下LRU代码?

手写LRU?你是不是想直接跳起来说一句:Are U F*_k Kidding me?
这个问题是我在蚂蚁金服三面的时候亲身被问过的问题,不知道大家有没有被怼到过这个问题。
Redis的过期策略,是有_定期删除+惰性删除*两种。
定期好理解,默认100s就随机抽一些设置了过期时间的key,去检查是否过期,过期了就删了。
为啥不扫描全部设置了过期时间的key呢?
假如Redis里面所有的key都有过期时间,都扫描一遍?那太恐怖了,而且我们线上基本上也都是会设置一定的过期时间的。全扫描跟你去查数据库不带where条件不走索引全表扫描一样,100s一次,Redis累都累死了。
如果一直没随机到很多key,里面不就存在大量的无效key了?
好问题,惰性删除,见名知意,惰性嘛,我不主动删,我懒,我等你来查询了我看看你过期没,过期就删了还不给你返回,没过期该怎么样就怎么样。
最后就是如果的如果,定期没删,我也没查询,那可咋整?
内存淘汰机制
官网上给到的内存淘汰机制是以下几个:

  • * noeviction:返回错误当内存限制达到并且客户端尝试执行会让更多内存被使用的命令(大部分的写入指令,但DEL和几个例外)
    • allkeys-lru: 尝试回收最少使用的键(LRU),使得新添加的数据有空间存放。
    • volatile-lru: 尝试回收最少使用的键(LRU),但仅限于在过期集合的键,使得新添加的数据有空间存放。
    • allkeys-random: 回收随机的键使得新添加的数据有空间存放。
    • volatile-random: 回收随机的键使得新添加的数据有空间存放,但仅限于在过期集合的键。
    • volatile-ttl: 回收在过期集合的键,并且优先回收存活时间(TTL)较短的键,使得新添加的数据有空间存放。
      如果没有键满足回收的前提条件的话,策略volatile-lru, volatile-random以及volatile-ttl就和noeviction 差不多了。

至于LRU我也简单提一下,手写实在是太长了,大家可以去Redis官网看看,我把近视LUR效果给大家看看
tip:Redis为什么不使用真实的LRU实现是因为这需要太多的内存。不过近似的LRU算法对于应用而言应该是等价的。使用真实的LRU算法与近似的算法可以通过下面的图像对比。

你可以看到三种点在图片中, 形成了三种带.

  • * 浅灰色带是已经被回收的对象。
    • 灰色带是没有被回收的对象。
    • 绿色带是被添加的对象。
    • LRU实现的理论中,我们希望的是,在旧键中的第一半将会过期。RedisLRU算法则是概率的过期旧的键。

你可以看到,在都是五个采样的时候Redis 3.0比Redis 2.8要好,Redis2.8中在最后一次访问之间的大多数的对象依然保留着。使用10个采样大小的Redis 3.0的近似值已经非常接近理论的性能。
注意LRU只是个预测键将如何被访问的模型。另外,如果你的数据访问模式非常接近幂定律,大部分的访问将集中在一个键的集合中,LRU的近似算法将处理得很好。
其实在大家熟悉的LinkedHashMap中也实现了Lru算法的,实现如下:

当容量超过100时,开始执行LRU策略:将最近最少未使用的 TimeoutInfoHolder 对象 evict 掉。
真实面试中会让你写LUR算法,你可别搞原始的那个,那真TM多,写不完的,你要么怼上面这个,要么怼下面这个,找一个数据结构实现下Java版本的LRU还是比较容易的,知道啥原理就好了。