codetc - 网站开发技术 首页 后端 PHP 查看内容

使用Redis实现分布式锁

2019-7-31 17:18| 发布者: CODETC| 查看: 2835| 评论: 0

锁在我们的日常开发通常用来解决资源并发的问题。特别是在集群情况下,资源争抢的问题。但是在锁的处理上稍微不注意便会犯一些问题。今天我们来了解下锁在PHP开发中的应用。

一、Redis 锁错误使用之一

我曾经见过有的项目把查询结果存储到 Redis 当中的伪代码如下:

$redis    = new Redis('127.0.0.1', 6379);
$cacheKey = 'query_cache';
$result   = $redis->get($cacheKey);
if ($result) { // 缓存有效则直接返回。
    return $result;
} else { // 缓存失效则重新获取并存储到 Redis。
    $mysqlResult = []; // 这一步我省了。
    $redis->set($cacheKey, json_encode($mysqlResult), 3600);
    return $mysqlResult;
}

初看这个代码好像并没有什么问题。通常情况下,当服务器资源压力非常小的时候,这段代码不会有任何问题。并且,真的可以提升服务器吞吐性能。

但假如,这个位置的代码出现了单点压力呢?比如,这个功能是统计结果,查询数据库需要花 5s。而且,由于该功能比较常用,单位时间内达到了 1000 次/秒。

这时就会出现缓存穿透问题。

1000 个请求同时到达这个程序位置,都去读取缓存是否存在。假如此时缓存不存在。这 1000 个请求都会得不到缓存结果。并且都会执行到去数据库取缓存结果的步骤。同时也会把结果重写到 Redis。

那就导致了这一瞬间单点压力导致穿透到数据库,造成数据库压力瞬间到达峰值。如果我们的数据库的性能处理不了这么大的压力,就会导致数据库服务器 CPU 直接爆满。响应给前端的数据就会陷入停顿状态。

所以,这段代码是不正确的锁使用。

二、Redis 锁错误使用之二

在第一点中,我们发现了问题。于是,就有人想着去优化它。于是就有了下面的代码:

$redis    = new Redis('127.0.0.1', 6379);
$lockKey  = 'query_cache_lock'; // 锁专用的 KEY。
$cacheKey = 'query_cache'; // 存储查询结果的 KEY。
$result   = $redis->get($cacheKey);
if ($result) { // 缓存有效则直接返回。
    return $result;
} else { // 缓存失效则重新获取并存储到 Redis。
    if ($redis->setNx($lockKey) === false) {
        throw new Exception("服务器火爆,请稍候重试");
    } else {
        $mysqlResult = []; // 这一步我省了。
        $redis->set($cacheKey, json_encode($mysqlResult), 3600);
        $redis->delete($lockKey); // 锁用完了要解锁。删掉就是解锁。
        return $mysqlResult;
    }
}

这段代码就完全避免了第一点中的缓存穿透的问题。

即使如此,这段代码依然存在三个问题:
1)并发越大,第一个取到锁的请求能正常响应,后续的请求就会得到一个“服务器火爆,请稍候重试”的异常提示。
2)没办法对后续请求取锁失效加一个等待时间。
3)如果代码执行到 $redis->delete($lockKey) 之前程序异常了。那么锁就不能正常释放,造成死锁。后续的访问也无法正常取到锁了。

针对第 1) 点,这个是用户体验极差的。
针对第 2) 点,它是解决第一点的方案。
针对第 3) 点,它是我们必须解决的问题。否则,我们的分布式锁将无法正常使用。

三、正确的分布式锁
正常的分布式锁要满足以下几点要求:
1)能解决并发时资源争抢。这是最核心的需求。
2)锁能正常获取与释放。不能出现死锁。
3)锁能实现等待,否则不能最大保证用户的体验。

针对以上三点,我们可以使用redis实现一个分布式锁的工具类。

class RedisMutexLock
{

    private $_redis;

    public __construct($config) {
        $this->_redis = new Redis();
        $this->_redis->connect($config['host'], $config['port'], $config['timeout']);
    }


    /**
     * 获得锁,如果锁被占用,阻塞,直到获得锁或者超时。
     * -- 1、如果 $timeout 参数为 0,则立即返回锁。
     * -- 2、建议 timeout 设置为 0,避免 redis 因为阻塞导致性能下降。请根据实际需求进行设置。
     *
     * @param  string  $key         缓存KEY。
     * @param  int     $timeout     取锁超时时间。单位(秒)。等于0,如果当前锁被占用,则立即返回失败。如果大于0,则反复尝试获取锁直到达到该超时时间。
     * @param  int     $lockSecond  锁定时间。单位(秒)。
     * @param  int     $sleep       取锁间隔时间。单位(微秒)。当锁为占用状态时。每隔多久尝试去取锁。默认 0.1 秒一次取锁。
     * @return bool 成功:true、失败:false
     */
    public function lock($key, $timeout = 0, $lockSecond = 20, $sleep = 100000)
    {
        if (empty($key)) {
            throw new Exception("Lock Key Is Empty.");
        }

        $start = $this->getMicroTime();
        do {
            $acquired = $this->_redis->set("Lock:{$key}", 1, ['NX', 'EX' => $lockSecond]);
            if ($acquired) {
                break;
            }
            if ($timeout === 0) {
                break;
            }
            usleep($sleep);
        } while (!is_numeric($timeout) || ($this->getMicroTime()) < ($start + ($timeout * 1000000)));
        return $acquired ? true : false;
    }

    /**
     * 释放锁
     *
     * @param  mixed  $key  被加锁的KEY。
     * @return void
     */
    public function release($key)
    {
        if (empty($key)) {
            throw new Exception("Lock Key Is Empty.");
        }

        $this->_redis->del("Lock:{$key}");
    }

    /**
     * 获取当前微秒。
     *
     * @return bigint
     */
    protected function getMicroTime()
    {
        return bcmul(microtime(true), 1000000);
    }

    /**
     * 释放redis连接
     */
    public function __destruct() {
        if ($this->_redis) {
            $this->_redis->close();
        }
    }
}

获取锁之后一定要记得手动释放锁。虽然,锁能自己释放。
文章来源 CODETC,欢迎分享,转载请注明地址: http://www.codetc.com/article-361-1.html

最新评论

 作为游客发表评论,请输入您的昵称

返回顶部