Redis入门到实战
阅读原文时间:2023年07月10日阅读:3

一、Redis基础

Redis所有的命令都可以去官方网站查看

keys *

查找所有符合给定模式pattern(正则表达式)的 key 。可以进行模糊匹配

del key1,key2,...

删除指定的一批keys,如果删除中的某些key不存在,则直接忽略。被删除的keys的数量

exists key

返回key是否存在。0:key不存在 1:key存在

EXPIRE key seconds

设置key的过期时间,超过时间后,将会自动删除该key。

TTL key

返回key剩余的过期时间。

-1:永不过期

-2:过期或不存在

type key

查看键的类型

Redis中的数据都是以"字符串"形式存储的

Redis的key允许有多个单词组成层级结构,多个单词使用 ‘:’隔开

例如:项目名:业务名:类型:id

SET key

将键key设定为指定的“字符串”值。

如果 key 已经保存了一个值,那么这个操作会直接覆盖原来的值,并且忽略原始类型。

当set命令执行成功之后,之前设置的过期时间都将失效

Get key

根据key获取值value

append key value

将给定的value 追加到原值的末尾

strlen key

获得值的长度

mset k1 v1 k2 v2 k3 v3....

设置多个键值对

mget k1 k2 k3 ......

根据key获取多个值

incr key

给整数型value加一

incrby key step

给整型的key子这自增并且指定步长,

decr key

给整数型value减一

setnx k1 v1

添加一个键值对,不存在则执行,否则不执行

msetnx k1 v1 k2 v2 k3 v3...

同时设置一个或多个 key-value 对,当且仅当所有给定 key 都不存在

setex key sceneds

设置键值的同时,设置过期时间,单位秒

getrange key startIndex endIndex

获得值的范围,类似java中的substring

setrange key startIndex value

用value 覆写key所储存的字符串值,从startIndex开始

getset key value

以新换旧,设置了新值同时获得旧值

lpush/rpush k1 v1 k2 v2 ...

从左边/右边插入一个或多个值

lpop/rpop key

从左边/右边吐出一个值

rpoplpush k1 k2

从k1列表右边吐出一个值,插到k2列表左边

lrange key startIndex endIndex

按照索引下标获得元素(从左到右)

查询所有 0 -1

lindex key index

按照索引下标获得元素(从左到右)

llen key

获得列表长度

linsert key before|after value newvalue

在 value 的后面插入 newvalue 插入值

lrem key n value

从左边删除n个value(从左到右)

Set是string类型的无序集合。它底层其实是一个value为null的hash表,所以添加,删除,查找的复杂度都是O(1)

sadd k1 v1 v2 v3....

将一个或多个 member 元素加入到集合 key 当中,已经存在于集合的 member 元素将被忽略

smembers key

取出该集合的所有值

sismember key value

判断集合 key 是否为含有该 value 值,有返回1,没有返回0

scard key

返回该集合的元素个数

srem key value1 value2....

删除集合中的某个元素

spop key

随机从该集合中吐出一个值

srandmember key n

随机从该集合中取出n个值。

不会从集合中删除

SINTER k1 k2

返回两个集合的交集元素

SUNION k1 k2

返回两个集合的并集元素

SDIFF k2 k1

返回两个集合的差集元素

hash是一个string类型的field和value的映射表,hash特别适合用于存储对象,类似Java里面的Map

hset key filed value

按照hash形式存储内容,添加或者修改hash类型的key的filed的值

HSET qbb:user:1 name qiuqiu

HSET qbb:user:1 age 18

hget key filed

获取一个hash类型的key的filed的值

HGET qbb:user:1 name

HGET qbb:user:1 age

hmset key filed value filed value ...

批量添加多个hash类型的key的filed值

HMSET qbb:user:2 name qq age 18 sex woman

hmget key filed filed filed ...

批量获取多个hash类型的key1的filed值

HMGET qbb:user:2 name age sex

hgetall key

获取一个hashkey中的所有filed和value

HGETALL qbb:user:1

hkeys key

获取hash类型的key中的所有field

hkeys qbb:user:1

hvals key

获取hash类型的key的所有值value

HVALS qbb:user:1

hexists key field

查看哈希表 key 中,给定域 field 是否存在

hincrby key filed

让hash类型的key的字段值自动增长指定步长(负数则自减)

HINCRBY qbb:user:1 age 1

hsetnx key filed value

添加一个hash类型的key的filed值,前提是这个filed不存在,否则不执行

HSETNX qbb:user:1 address wuhan

Redis有序集合zset与普通集合set非常相似,是一个没有重复元素的字符串集合。不同之处是有序集合的每个成员都关联了一个评分(score) ,这个评分(score)被用来按照从最低分到最高分的方式排序集合中的成员。集合的成员是唯一的,但是评分可以是重复了 。

因为元素是有序的, 所以你也可以很快的根据评分(score)或者次序(position)来获取一个范围的元素。访问有序集合的中间元素也是非常快的,因此你能够使用有序集合作为一个没有重复成员的智能列表

zadd key score1 value1 score2 value2

将一个或多个 member 元素及其 score 值加入到有序集 key 当中

zrange key start stop [WITHSCORES]

返回有序集 key 中,下标在 start stop 之间的元素

带WITHSCORES,可以让分数一起和值返回到结果集

zrangebyscore key min max [withscores]

返回有序集 key 中,所有 score 值介于 min 和 max 之间(包括等于 min 或 max )的成员。有序集成员按 score 值递增(从小到大)次序排列

zrevrangebyscore key max min [withscores]

同上,改为从大到小排列

zrem key value

删除该集合下,指定值的元素

zcount key min max

统计该集合,分数区间内的元素个数

zrank key value

返回该值在集合中的排名,从0开始

二、Redis实战

@Resource
    private StringRedisTemplate stringRedisTemplate;

    @Override
    public Result sendCode(String phone, HttpSession session) {
        // 1.校验手机号
        if (RegexUtils.isPhoneInvalid(phone)) {
            // 2.如果不符合,返回错误信息
            return Result.fail("手机号格式错误!");
        }
        // 3.符合,生成验证码
        String code = RandomUtil.randomNumbers(6);

        // 4.保存验证码到 session
        stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY + phone, code, LOGIN_CODE_TTL, TimeUnit.MINUTES);

        // 5.发送验证码
        log.debug("发送短信验证码成功,验证码:{}", code);
        // 返回ok
        return Result.ok();
    }

    @Override
    public Result login(LoginFormDTO loginForm, HttpSession session) {
        // 1.校验手机号
        String phone = loginForm.getPhone();
        if (RegexUtils.isPhoneInvalid(phone)) {
            // 2.如果不符合,返回错误信息
            return Result.fail("手机号格式错误!");
        }
        // 3.从redis获取验证码并校验
        String cacheCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone);
        String code = loginForm.getCode();
        if (cacheCode == null || !cacheCode.equals(code)) {
            // 不一致,报错
            return Result.fail("验证码错误");
        }

        // 4.一致,根据手机号查询用户 select * from tb_user where phone = ?
        User user = query().eq("phone", phone).one();

        // 5.判断用户是否存在
        if (user == null) {
            // 6.不存在,创建新用户并保存
            user = createUserWithPhone(phone);
        }

        // 7.保存用户信息到 redis中
        // 7.1.随机生成token,作为登录令牌
        String token = UUID.randomUUID().toString(true);
        // 7.2.将User对象转为HashMap存储
        UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
        Map<String, Object> userMap = BeanUtil.beanToMap(userDTO, new HashMap<>(),
                CopyOptions.create()
                        .setIgnoreNullValue(true)
                        .setFieldValueEditor((fieldName, fieldValue) -> fieldValue.toString()));
        // 7.3.存储
        String tokenKey = LOGIN_USER_KEY + token;
        stringRedisTemplate.opsForHash().putAll(tokenKey, userMap);
        // 7.4.设置token有效期
        stringRedisTemplate.expire(tokenKey, LOGIN_USER_TTL, TimeUnit.MINUTES);

        // 8.返回token
        return Result.ok(token);
    }


public class RefreshTokenInterceptor implements HandlerInterceptor {

    private StringRedisTemplate stringRedisTemplate;

    public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        // 1.获取请求头中的token
        String token = request.getHeader("authorization");
        if (StrUtil.isBlank(token)) {
            return true;
        }
        // 2.基于TOKEN获取redis中的用户
        String key  = LOGIN_USER_KEY + token;
        Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key);
        // 3.判断用户是否存在
        if (userMap.isEmpty()) {
            return true;
        }
        // 5.将查询到的hash数据转为UserDTO
        UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
        // 6.存在,保存用户信息到 ThreadLocal
        UserHolder.saveUser(userDTO);
        // 7.刷新token有效期
        stringRedisTemplate.expire(key, LOGIN_USER_TTL, TimeUnit.MINUTES);
        // 8.放行
        return true;
    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        // 移除用户
        UserHolder.removeUser();
    }
}
  • 使用双写模式,先写数据库,在删除缓存

  • 缓存null值

  • 布隆过滤器,注意布隆过滤器的误判,适当时间重建布隆

    /**
    * 根据skuId查询商品详情
    *


    * 使用Redis实现分布式锁:
    * 解决大并发下,缓存击穿|穿透问题
    *
    * @param skuId
    * @return
    */
    public SkuItemTo findSkuItemWithRedisDistLock(Long skuId) {
    // 缓存key
    String cacheKey = RedisConstants.SKU_CACHE_KEY_PREFIX + skuId;
    // 查询缓存
    SkuItemTo data = cacheService.getData(RedisConstants.SKU_CACHE_KEY_PREFIX + skuId, new TypeReference() {
    });
    // 判断是否命中缓存
    if (data == null) {
    // 缓存没有,回源查询数据库.但是这个操作之前先问一下bloom是否需要回源
    if (skuIdBloom.contains(skuId)) {
    // bloom返回true说明数据库中有
    log.info("缓存没有,bloom说有,回源");
    SkuItemTo skuItemTo = null;
    // 使用UUID作为锁的值,防止修改别人的锁
    String value = UUID.randomUUID().toString();
    // 摒弃setnx ,加锁个设置过期时间不是原子的
    // 原子加锁,防止被击穿 分布式锁 设置过期时间
    Boolean ifAbsent = stringRedisTemplate.opsForValue()
    .setIfAbsent(RedisConstants.LOCK + skuId, value, RedisConstants.LOCK_TIMEOUT, TimeUnit.SECONDS);
    if (ifAbsent) {
    try {
    // 设置自动过期时间,非原子的,加锁和设置过期时间不是原子的操作,所以会出现问题
    // stringRedisTemplate.expire(RedisConstants.LOCK, RedisConstants.LOCK_TIMEOUT, TimeUnit.SECONDS);

                        // 大量请求,只有一个抢到锁
                        log.info(Thread.currentThread().getName() + "抢到锁,查询数据库");
                        skuItemTo = findSkuItemDb(skuId); // 执行回源查询数据库
                        // 把数据库中查询的数据缓存里存一份
                        cacheService.saveData(cacheKey, skuItemTo);
                    } finally { // 解锁前有可能出现各种问题导致解锁失败,从而出现死锁
                        // 释放锁,非原子,不推荐使用
                        // String myLock = stringRedisTemplate.opsForValue().get(RedisConstants.LOCK);
                    //删锁: 【对比锁值+删除(合起来保证原子性)】
                    String deleteScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
                    Long executeResult = stringRedisTemplate.execute(new DefaultRedisScript&lt;&gt;(deleteScript, Long.class),
                            Arrays.asList(RedisConstants.LOCK + skuId), value);
    
                    // 判断是否解锁成功
                    if (executeResult.longValue() == 1) {
                        log.info("自己的锁:{},解锁成功", value);
                        stringRedisTemplate.delete(RedisConstants.LOCK);
                    } else {
                        log.info("别人的锁,解不了");
                    }
                }
            } else {
                // 抢锁失败,自旋抢锁. 但是实际业务为我们只需要让让程序缓一秒再去查缓存就好了
                try {
                    log.info("抢锁失败,1秒后去查询缓存");
                    Thread.sleep(1000);
                    data = cacheService.getData(RedisConstants.SKU_CACHE_KEY_PREFIX + skuId, new TypeReference&lt;SkuItemTo&gt;() {
                    });
                    return data;
                } catch (InterruptedException e) {
                }
            }
            return skuItemTo;
        } else {
            log.info("缓存没有,bloom也说没有,直接打回");
            return data;
        }
    }
    log.info("缓存中有数据,直接返回,不回源");
    // 价格不缓存,有些需要变的数据,可以"现用现拿"
    Result&lt;BigDecimal&gt; decimalResult = productFeignClient.findPriceBySkuId(skuId);
    if (decimalResult.isOk()) {
        BigDecimal price = decimalResult.getData();
        data.setPrice(price);
    }
    return data;
    }
  • 利用分布式锁,解决缓存击穿问题

    /**
    * 根据skuId查询商品详情
    * 使用Redisson框架
    *
    * @param skuId
    * @return
    */
    public SkuItemTo findSkuItemWithRedissonLock(Long skuId) {
    // 1.先查询缓存
    String cacheKey = RedisConstants.SKU_CACHE_KEY_PREFIX + skuId;
    SkuItemTo data = cacheService.getData(cacheKey, new TypeReference() {
    });
    // 2.判断是否为null
    if (data == null) {
    // 2.1为null缓存没有,需要回源
    // 2.2回源之前问一下bloom过滤器,是否有必要回源
    boolean contains = skuIdBloom.contains(skuId);
    if (contains) {
    log.info("bloom说有…准备回源");
    // 2.2.1创建一把锁
    RLock lock = redissonClient.getLock(RedisConstants.SKU_LOCK + skuId);
    // 2.2.2数据库中存在对应的ID数据,回源
    boolean tryLock = false;
    try {
    // 2.2.3回源之前先上锁
    tryLock = lock.tryLock();
    if (tryLock) {
    log.info(Thread.currentThread().getName() + ":获取到锁了");
    // 加锁成功
    // 回源,查询数据库是数据
    SkuItemTo skuItemTo = findSkuItemDb(skuId);
    // 缓存中存一份
    cacheService.saveData(cacheKey, skuItemTo);
    // 返回数据
    return skuItemTo;
    }
    } finally {
    // 解锁
    try {
    if (tryLock) lock.unlock();
    } catch (Exception e) {
    log.info("解到别人的锁了");
    }
    }
    // 加锁失败,睡一秒查缓存
    try {
    Thread.sleep(1000);
    data = cacheService.getData(cacheKey, new TypeReference() {
    });
    return data;
    } catch (InterruptedException e) {
    }
    } else {
    log.info("bloom打回");
    // 不存在对应的ID数据,不回源
    return null;
    }
    }
    // 缓存不为空,直接返回数据
    return data;
    }

  • 缓存过期时间加上随机值

    /**
    * 添加数据到缓存
    *
    * @param key
    * @param data
    */
    @Override
    public void saveData(String key, Object data) {
    if (data == null) {
    // 缓存null值,防止缓存穿透.设置缓存过期时间
    stringRedisTemplate.opsForValue().set(key, cacheConfig.getNullValueKey(),
    cacheConfig.getNullValueTimeout(), cacheConfig.getNullTimeUnit());
    } else {
    // 为了防止缓存同时过期,发生缓存雪崩.给每个缓存过期时间加上随机值
    Double value = Math.random() * 10000000L;
    long mill = 1000 * 60 * 24 * 3 + value.intValue();
    stringRedisTemplate.opsForValue().set(key, JsonUtils.toStr(data),
    mill, cacheConfig.getDataTimeUnit());
    }
    }

    @Component
    public class RedisIdWorker {
    /**
    * 开始时间戳
    / private static final long BEGIN_TIMESTAMP = 1640995200L; /*
    * 序列号的位数
    */
    private static final int COUNT_BITS = 32;

    private StringRedisTemplate stringRedisTemplate;
    
    public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }
    
    public long nextId(String keyPrefix) {
        // 1.生成时间戳
        LocalDateTime now = LocalDateTime.now();
        long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
        long timestamp = nowSecond - BEGIN_TIMESTAMP;
    // 2.生成序列号
    // 2.1.获取当前日期,精确到天
    String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
    // 2.2.自增长
    long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
    
    // 3.拼接并返回
    return timestamp &lt;&lt; COUNT_BITS | count;
    }

    }

  • 使用乐观锁解决 stock > 0

    @Transactional
    public void createVoucherOrder(VoucherOrder voucherOrder) {
    // 5.一人一单
    Long userId = voucherOrder.getUserId();

        // 5.1.查询订单
        int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
        // 5.2.判断是否存在
        if (count > 0) {
            // 用户已经购买过了
            log.error("用户已经购买过一次!");
            return;
        }
    // 6.扣减库存
    boolean success = seckillVoucherService.update()
            .setSql("stock = stock - 1") // set stock = stock - 1
            .eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0) // where id = ? and stock &gt; 0
            .update();
    if (!success) {
        // 扣减失败
        log.error("库存不足!");
        return;
    }
    
    // 7.创建订单
    save(voucherOrder);
    }

  • 把下单的功能放入阻塞队列中,实现异步的下单。这样可以更好的提高吞吐量

    /**
    * 创建一个阻塞队列
    */
    private BlockingQueue orderTasks = new ArrayBlockingQueue<>(1024 * 1024);

    /**
     * 创建一个线程池
     */
    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
    
    /**
     * 类初始化完成就执行任务,从队列中消费消息,也就是创建订单
     */
    @PostConstruct
    private void init() {
        SECKILL_ORDER_EXECUTOR.execute(new VoucherOrderHandler());
    }
    
    /**
     * 创建一个任务处理器
     */
    private class VoucherOrderHandler implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    // 1.获取队列中的订单信息
                    VoucherOrder voucherOrder = orderTasks.take();
                    // 2.创建订单
                    handlerVoucherOrder(voucherOrder);
                } catch (InterruptedException e) {
                    log.error("订单异常:{}", e);
                }
            }
        }
    }
    
    private void handlerVoucherOrder(VoucherOrder voucherOrder) {
        // 获取用户ID
        Long userId = voucherOrder.getUserId();
        // 创建锁对象
        RLock redisLock = redissonClient.getLock("order:" + userId);
        // 获取锁
        boolean isLock = redisLock.tryLock();
        // 判断获取锁是否成功
        if (!isLock) {
            // 获取锁失败
            log.error("不允许重复下单!!!");
            return;
        }
        try {
            proxy.createVoucherOrder(voucherOrder);
        } finally {
            // 释放锁
            redisLock.unlock();
        }
    }
  • 基于JVM阻塞队列实现异步秒杀会有两个大的问题

    • 内存限制
    • 数据安全问题,由于是内存操作,所以宕机订单就丢失了

基于PubSub的消息队列有哪些优缺点?

  • 优点:

    • 采用发布订阅模型,支持多生产、多消费
  • 缺点:

    • 不支持数据持久化
    • 无法避免消息丢失
    • 消息堆积有上限,超出时数据丢失

  • 上面的Stream方式会出现漏读消息的情况,所以下面使用Stream的ConsumerGroup(消费者组的概念)实现

  • 代码实现流程

    /**
    * 类初始化完成就执行任务
    */
    @PostConstruct
    private void init() {
    SECKILL_ORDER_EXECUTOR.execute(new VoucherOrderHandler());
    }

    /**
     * 创建一个任务处理器,获取消息队列中的消息
     */
    private class VoucherOrderHandler implements Runnable {
        // 队列名
        String queueName = "stream.orders";
    @Override
    public void run() {
        while (true) {
            try {
                // 1.获取消息队列中的订单信息
                List&lt;MapRecord&lt;String, Object, Object&gt;&gt; list = redisTemplate.opsForStream()
                        .read(Consumer.from("g1", "c1"),
                                StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                                StreamOffset.create(queueName, ReadOffset.lastConsumed())
                        );
                // 2.判断一下消息是否获取成功
                if (list == null || list.isEmpty()) {
                    continue;
                }
                // 获取订单信息
                MapRecord&lt;String, Object, Object&gt; entries = list.get(0);
                Map&lt;Object, Object&gt; map = entries.getValue();
                VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(map, new VoucherOrder(), true);
                // 3.创建订单
                handlerVoucherOrder(voucherOrder);
                // 4.ACK确认
                redisTemplate.opsForStream().acknowledge(queueName, "g1", entries.getId());
            } catch (Exception e) {
                log.error("订单异常:{}", e);
                // 从pending-list中获取消息
                handlePendingList();
            }
        }
    }
    
    /**
     * 处理消费失败的消息
     */
    private void handlePendingList() {
        while (true) {
            try {
                // 1.获取pending-list中的订单信息
                List&lt;MapRecord&lt;String, Object, Object&gt;&gt; list = redisTemplate.opsForStream().read(
                        Consumer.from("g1", "c1"),
                        StreamReadOptions.empty().count(1),
                        StreamOffset.create(queueName, ReadOffset.from(")"))
                );
                // 2.判断一下消息是否获取成功
                if (list == null || list.isEmpty()) {
                    // pending-list中没有消息
                    break;
                }
                // 获取订单信息
                MapRecord&lt;String, Object, Object&gt; entries = list.get(0);
                Map&lt;Object, Object&gt; map = entries.getValue();
                VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(map, new VoucherOrder(), true);
                // 3.创建订单
                handlerVoucherOrder(voucherOrder);
                // 4.ACK确认
                redisTemplate.opsForStream().acknowledge(queueName, "g1", entries.getId());
            } catch (Exception e) {
                log.error("处理pending-list的订单异常:{}", e);
                try {
                    Thread.sleep(200);
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
    }
  • 一个用户只能点赞一次

    /**
    * 修改点赞数量
    *
    * @param id
    * @return
    */
    @Override
    public Result likeBlog(Long id) {
    // 获取登录用户
    UserDTO user = UserHolder.getUser();
    String userId = user.getId().toString();
    // 判断当前登录用户是否点赞过
    String key = "blog:liked:" + id;
    Boolean isLike = redisTemplate.opsForSet().isMember(key, userId);
    // 判断是否点赞过
    if (BooleanUtil.isFalse(isLike)) {
    // 未点赞
    // 数据库+1
    boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();
    // 将用户id保存到redis
    if (isSuccess) {
    redisTemplate.opsForSet().add(key, userId);
    }
    } else {
    // 已点赞
    // 数据库-1
    boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();
    // 将用户id从redis中移除
    if (isSuccess) {
    redisTemplate.opsForSet().remove(key, userId);
    }
    }
    return Result.ok();
    }

  • 参考朋友圈点赞

    /**
    * 修改点赞数量 sorted set集合
    *
    * @param id
    * @return
    */
    @Override
    public Result likeBlog(Long id) {
    // 获取登录用户
    UserDTO user = UserHolder.getUser();
    String userId = user.getId().toString();
    // 判断当前登录用户是否点赞过
    String key = RedisConstants.BLOG_LIKED_KEY + id;
    Double score = redisTemplate.opsForZSet().score(key, userId);
    // 判断是否点赞过
    if (score == null) {
    // 未点赞
    // 数据库+1
    boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();
    // 将用户id保存到redis
    if (isSuccess) {
    redisTemplate.opsForZSet().add(key, userId, System.currentTimeMillis());
    }
    } else {
    // 已点赞
    // 数据库-1
    boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();
    // 将用户id从redis中移除
    if (isSuccess) {
    redisTemplate.opsForZSet().remove(key, userId);
    }
    }
    return Result.ok();
    }

    /**
     * 查询blog点赞的人
     *
     * @param id
     * @return
     */
    @Override
    public Result queryBlogLikes(Long id) {
        // 获取缓存key
        String key = RedisConstants.BLOG_LIKED_KEY + id;
        // 从zset中查询点赞前5名
        Set<String> top5 = redisTemplate.opsForZSet().range(key, 0, 4);
        if (top5 == null || top5.isEmpty()) {
            return Result.fail("没有点赞用户!!!");
        }
        // 解析用户id
        List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());
        // 查询用户
        List<User> users = userService.listByIds(ids);
        List<UserDTO> userDTOList = users.stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class))
                .collect(Collectors.toList());
        return Result.ok(userDTOList);
    }
  • 查看两个人的共同好友

    /**
    * 关注或取关
    *
    * @param followUserId
    * @param isFollow
    * @return
    */
    @Override
    public Result follow(Long followUserId, Boolean isFollow) {
    // 获取登录用户
    Long userId = UserHolder.getUser().getId();
    // 判断是关注还是取关
    if (isFollow) {
    // 关注
    Follow follow = new Follow();
    follow.setUserId(userId);
    follow.setFollowUserId(followUserId);
    // 保存关注信息到数据库
    boolean isSuccess = save(follow);
    if (isSuccess) {
    // 把关注的用户存入Redis
    redisTemplate.opsForSet().add(RedisConstants.FOLLOW_USER_PREFIX + userId, followUserId.toString());
    }
    } else {
    // 取关
    boolean isSuccess = remove(Wrappers.lambdaQuery().eq(Follow::getUserId, userId).eq(Follow::getFollowUserId, followUserId));
    if (isSuccess) {
    // 从Redis中删除关注的用户
    redisTemplate.opsForSet().remove(RedisConstants.FOLLOW_USER_PREFIX + userId, followUserId.toString());
    }
    }
    return Result.ok();
    }

    /**
     * 查询是否关注
     *
     * @param followUserId
     * @return
     */
    @Override
    public Result isFollow(Long followUserId) {
        // 获取登录用户
        Long userId = UserHolder.getUser().getId();
        // 查询是否关注
        LambdaQueryWrapper<Follow> wrapper = Wrappers.<Follow>lambdaQuery().eq(Follow::getUserId, userId).eq(Follow::getFollowUserId, followUserId);
        int count = count(wrapper);
        return Result.ok(count > 0);
    }
    
    /**
     * 查询共同关注
     *
     * @param id
     * @return
     */
    @Override
    public Result followCommons(Long id) {
        // 获取登录用户
        Long userId = UserHolder.getUser().getId();
        // 当前用户的好友集合
        String key1 = RedisConstants.FOLLOW_USER_PREFIX + userId;
        // 点击查看感兴趣用户的好友集合
        String key2 = RedisConstants.FOLLOW_USER_PREFIX + id;
        // 获取共同好友
        Set<String> set = redisTemplate.opsForSet().intersect(key1, key2);
        if (set == null || set.isEmpty()) {
            return Result.ok(Collections.emptyList());
        }
        List<Long> ids = set.stream().map(Long::valueOf).collect(Collectors.toList());
        List<UserDTO> userDTOList = listByIds(ids)
                .stream()
                .map(user -> BeanUtil.copyProperties(user, UserDTO.class))
                .collect(Collectors.toList());
        return Result.ok(userDTOList);
    }
  • 在用户发送文章是,推送给关注了此用户的好友

  • Feed流有三种实现方式

    • 拉模式 (不推荐)
    • 推模式
    • 推拉结合模式
  • 基于推模式实现关注推送功能

    /**
    * 保存blog,并推送给粉丝
    *
    * @param blog
    * @return
    */
    @Override
    public Result saveBlog(Blog blog) {
    // 获取登录用户
    UserDTO user = UserHolder.getUser();
    blog.setUserId(user.getId());
    // 保存探店博文
    boolean isSuccess = save(blog);
    if (isSuccess) {
    return Result.fail("新增Blog失败!!!");
    }
    // 查询此用户的粉丝
    List followList = followService.list(Wrappers.lambdaQuery().eq(Follow::getFollowUserId, user.getId()));
    // 把此用户发布的blog发送给粉丝
    followList.stream().peek(follow -> {
    // 获取粉丝的ID
    Long userId = follow.getUserId();
    // 推送
    String key = RedisConstants.FOLLOW_FEEF_PREFIX + userId;
    redisTemplate.opsForZSet().add(key, blog.getId().toString(), System.currentTimeMillis());
    });
    return Result.ok(blog.getId());
    }

  • 使用SortedSet的Score实现Feed流的滚动分页

    /**
    * 好友关注,推送消息
    *
    * @param max
    * @param offset
    * @return
    */
    @Override
    public Result queryBlogOfFollow(Long max, Integer offset) {
    // 获取当前用户
    Long userId = UserHolder.getUser().getId();
    // 封装key
    String key = RedisConstants.FOLLOW_FEEF_PREFIX + userId;
    Set> typedTuples = redisTemplate.opsForZSet()
    .reverseRangeByScoreWithScores(key, 0, max, offset, 3);
    // 非空判断一下
    if (typedTuples == null || typedTuples.isEmpty()) {
    return Result.ok();
    }
    // 解析数据
    List ids = new ArrayList<>(typedTuples.size());
    long minTime = 0;
    int os = 1;
    for (ZSetOperations.TypedTuple typedTuple : typedTuples) {
    String idStr = typedTuple.getValue();
    ids.add(Long.valueOf(idStr));
    long time = typedTuple.getScore().longValue();
    if (time == minTime) {
    os++;
    } else {
    minTime = time;
    os = 1;
    }
    }
    // 根据id查询blog
    String idStr = StrUtil.join(",", ids);
    List blogs = query()
    .in("id", ids)
    .last("ORDER BY FIELD(id," + idStr + ")")
    .list();

        blogs.forEach(blog -> {
            // 查询用户
            queryBlogUser(blog);
            // 查询是否点赞
            isBlogLiked(blog);
        });
    // 封装返回的数据
    ScrollResult scrollResult = new ScrollResult();
    scrollResult.setList(blogs);
    scrollResult.setMinTime(minTime);
    scrollResult.setOffset(offset);
    return Result.ok(scrollResult);
    }
  • 导入坐标数据到Redis中

    /**
    * 导入坐标数据
    / @Test public void geoTest() { // 查询店铺信息 List shopList = shopServiceImpl.list(); // 分组 Map> map = shopList.stream().collect(Collectors.groupingBy(Shop::getTypeId)); // 写入redis for (Map.Entry> entry : map.entrySet()) { Long typeId = entry.getKey(); List value = entry.getValue(); // 组装key String key = RedisConstants.SHOP_GEO_PREFIX + typeId; // 写redis / for (Shop shop : value) {
    redisTemplate.opsForGeo().add(key, new Point(shop.getX(), shop.getY()), shop.getId().toString());
    }*/

            // 改进写法,销量高一点
            List<RedisGeoCommands.GeoLocation<String>> locations = new ArrayList<>(value.size());
            for (Shop shop : value) {
                locations.add(new RedisGeoCommands.GeoLocation<>(shop.getId().toString(), new Point(shop.getX(), shop.getY())));
            }
            redisTemplate.opsForGeo().add(key, locations);
        }
    }
  • 实现附近商家功能,注意一点Redis的版本≥6.2

    org.springframework.boot spring-boot-starter-data-redis lettuce-core io.lettuce spring-data-redis org.springframework.data

    org.springframework.data spring-data-redis 2.7.2

    io.lettuce lettuce-core 6.2.0.RELEASE

  • 核心代码

    /**
    * 根据商铺类型分页查询商铺信息
    *
    * @param typeId 商铺类型
    * @param current 页码
    * @param x 经度
    * @param y 纬度
    * @return 商铺列表
    */
    @Override
    public Result queryShopByType(Integer typeId, Integer current, Double x, Double y) {
    // 判断是否需要根据坐标查询
    if (x == null || y == null) {
    Page page = query()
    .eq("type_id", typeId)
    .page(new Page<>(current, SystemConstants.DEFAULT_PAGE_SIZE));
    // 返回数据
    return Result.ok(page.getRecords());
    }
    // 分页参数
    int from = (current - 1) * SystemConstants.DEFAULT_PAGE_SIZE;
    int end = current * SystemConstants.DEFAULT_PAGE_SIZE;
    // 查询Redis,按照距离排序、分页
    String key = RedisConstants.SHOP_GEO_PREFIX + typeId;
    GeoResults> results = stringRedisTemplate.opsForGeo()
    // 搜索范围new Distance(5000),5000m
    .search(key,
    GeoReference.fromCoordinate(x, y),
    new Distance(5000),
    RedisGeoCommands
    .GeoRadiusCommandArgs
    .newGeoRadiusArgs()
    .includeDistance()
    .limit(end));
    if (results == null) {
    return Result.ok(Collections.emptyList());
    }
    // 解析ID
    List>> content = results.getContent();
    // 判断是否还有下一页
    if (content.size() <= from) { return Result.ok(Collections.emptyList()); } // 截取from ~ end的部分 List ids = new ArrayList<>(content.size());
    Map distanceMap = new HashMap<>(content.size());
    content.stream().skip(from).forEach(item -> {
    // 商品ID
    String shopId = item.getContent().getName();
    ids.add(Long.valueOf(shopId));
    // 距离
    Distance distance = item.getDistance();
    distanceMap.put(shopId, distance);
    });
    // 根据ID批量查询shop
    String idStr = StrUtil.join(",", ids);
    List shopList = query()
    .in("id", ids)
    .last("ORDER BY FIELD(id," + idStr + ")")
    .list();
    for (Shop shop : shopList) {
    shop.setDistance(distanceMap.get(shop.getId().toString()).getValue());
    }
    return Result.ok(shopList);
    }

  • BitMap基本命令

    /**
    * 用户签到
    *
    * @return
    */
    @Override
    public Result sign() {
    // 获取当前用户
    Long userId = UserHolder.getUser().getId();
    // 获取日期
    LocalDateTime now = LocalDateTime.now();
    // 拼接key
    String keyPrefix = now.format(DateTimeFormatter.ofPattern("yyyyMM"));
    String key = RedisConstants.USER_SIGN_PREFIX + userId + keyPrefix;
    // 获取当前日期是本月的第几天
    int dayOfMonth = now.getDayOfMonth();
    // 写入redis,签到
    stringRedisTemplate.opsForValue().setBit(key, dayOfMonth - 1, true);
    return Result.ok();
    }

  • 统计签到次数,连续签到次数

    /**
    * 统计连续签到次数
    *
    * @return
    */
    @Override
    public Result signCount() {
    // 获取当前用户
    Long userId = UserHolder.getUser().getId();
    // 获取日期
    LocalDateTime now = LocalDateTime.now();
    // 拼接key
    String keyPrefix = now.format(DateTimeFormatter.ofPattern("yyyyMM"));
    String key = RedisConstants.USER_SIGN_PREFIX + userId + keyPrefix;
    // 获取当前日期是本月的第几天
    int dayOfMonth = now.getDayOfMonth();
    // 获取本月截止今天为止所有的签到记录
    List result = stringRedisTemplate.opsForValue()
    .bitField(key, BitFieldSubCommands.create()
    .get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0));
    if (result == null || result.isEmpty()) {
    return Result.ok(0);
    }
    Long num = result.get(0);
    if (num == null || num == 0) {
    return Result.ok(0);
    }
    // 定义一个计数器
    int count = 0;
    // 循环遍历
    while (true) {
    // 让这个数字和1做与运算,未签到,结束
    if ((num & 1) == 0) {
    // 如果为0,说明未签到,结束
    break;
    } else {
    // 不为0,说明签到了,计数器加一
    count++;
    }
    // 把数字右移一位,抛弃最后以为bit位,继续下一位bit位
    num >>>= 1; // 无符号右移一位
    }
    // 返回,签到计数器
    return Result.ok(count);
    }

  • UV:全称Unique Visitor 页脚独立访客量,是指通过互联网访问,浏览这个网页的自然人。一天内同一个用户多次访问该网站,只记录一次

  • PV:全称Page View ,页脚页面访问量或点击量,用户每访问网站的一个页面,记录一次PV,用户多次打开页面,则记录多次PV。往往用来衡量网站的流量。

HyperLogLog基本用法

  • 天生唯一性,内存占用永远小于16kb

    @Test
    public void testHyperLogLog() {
    String[] values = new String[1000];
    int j = 0;
    for (int i = 0; i < 1000000; i++) {
    j = i % 1000;
    values[j] = "qiu_" + i;
    if (j == 999) {
    redisTemplate.opsForHyperLogLog().add("hl", values);
    }
    }
    // 统计数量
    Long count = redisTemplate.opsForHyperLogLog().size("hl");
    System.out.println("count = " + count);
    }

到这,入门到实战篇整理完毕,推荐大家参考黑马程序员虎哥讲的Redis,讲的很好。后面的高级篇和原理篇后面再整理,每天只能靠下班时间多学习学习了,各位小伙伴加油呀~~~我喜欢的人在很远的地方,我必须更加努力!!!