安装环境

mac上可以先安装一个cli端。然后下载docker并运行。

如果没有配置文件、可以先启动容器并且进入容器cp出来

docker pull redis:5.0.4-alpine
#redis本地新建一个redis的配置目录/Users/apple/conf/

cp /usr/local/etc/redis.conf /Users/apple/conf/

docker run -v /Users/apple/conf/:/usr/local/etc/redis/ -d --rm -p 6379:6379 --name myredis redis:5.0.4-alpine redis-server

或者使用

docker run -v /Users/apple/conf/:/usr/local/etc/redis/ -d --rm -p 6379:6379 --name myredis redis:5.0.4-alpine sh -c"redis-server /usr/local/etc/redis/redis.conf"

查看版本

redis-cli -h 127.0.0.1 info | grep 'redis_version'
#显示里面的版本
redis_version:5.0.4

商品服务举例图

图

先进入的先出。

Redis 列表(List)

lrange key 0 -1 获取列表左右元素

命令:LPUSH 、将一个或多个值插入到列表头部

语法格式:LPUSH key value1 [value2]

➜ vagrant_openresty_swoole redis-cli -h 127.0.0.1
127.0.0.1:6379> lpush order id001 id002
(integer) 2
127.0.0.1:6379> lrange order 0 -1
1) “id002”
2) “id001”
127.0.0.1:6379>

命令: RPOP 移出并获取列表的第一个元素

语法格式:RPOP key

127.0.0.1:6379> rpop order
“id001”
127.0.0.1:6379> lrange order 0 -1
1) “id002”
127.0.0.1:6379>

命令:BRPOP

移出并获取列表的最后一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。

语法格式:BRPOP key1 [key2 ] timeout

图

启动N个服务、死循环同事取列队的订单信息

function start(redis $redis_client)
{
    while(true) {
        //如果没有数据则阻塞10后运行
        $res = $redis_client->brPop(["orders"],10);
        if($res && $res[0])
        {
            if($res[1]==="pn002") sleep(5);
            echo "order_no=".$res[1]." done".PHP_EOL;
            usleep(500*1000); //休眠500毫秒
            echo "restart ".PHP_EOL;
        }
        else
            continue;


    }

}

如果出现断电等是事情、那么会存在丢失数据的行为。可以在设置个备份list来解决、

例子:N1死循环服务、取出一个数据、放到N1备份list里面

图

用到的命令:Brpoplpush

Brpoplpush 命令从列表中取出最后一个元素,并插入到另外一个列表的头部; 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。

代码例子:

function start(redis $redis_client)
{
    global $myid;
    //先处理 上次没有处理的 key
    echo "find baklist".PHP_EOL;
    while(true) // 应该使用swoole 等 开一个异步任务或者进程,专门对自己的 备份list做长期监听
    {
        $bak_res = $redis_client->brPop(["orders".$myid],1); //从自己的备份队列中获取 ,过期时间设置短一些
        if($bak_res && $bak_res[0])
        {
            doJob($bak_res[1],$redis_client,true); //是自定义处理函数
            usleep(500*1000); //休眠500毫秒
        }
        else //如果没取到值 说明 备份队列中 木有 了,则跳出循环
            break;
    }
    echo "baklist done".PHP_EOL;
    echo "begin orders ".PHP_EOL;
    //接下来开始继续
    while(true) {
        $res = $redis_client->brpoplpush("orders","orders".$myid,10); //注意,这个函数直接返回的是值
        if($res)
        {
            doJob($res,$redis_client); //是处理函数
            usleep(500*1000); //休眠500毫秒

        }
        else
            continue;


    }

}

function doJob($orderNo,redis $redis_client,$isBak=false) //isBak决定了 处理过程是否是 备份队列处理 ,如果是 有些步骤不需要执行
{
    global $myid;
    try{
        if($orderNo=="pn023") throw new MyException("error");
        sleep(3); //假装 干的很耗时,很辛苦
        if($isBak) //如果是 备份队列处理,显示不一样的字符,仅此而已
            echo "backjob order_no=";
        else
        echo "order_no=";

        echo $orderNo." done".PHP_EOL;
        if(!$isBak)
            $redis_client->lPop("orders".$myid);   //注意这里,处理完后,要删掉 备份列表左边的第一个元素
    }
    catch (MyException $myException)
    {
        if(!$isBak){
            //这里要判断 是什么类型的exception ,譬如超时等才能塞回去,这个机制是自己定的
            $redis_client->rPush("orders",$orderNo);//塞回原队列
            $redis_client->lPop("orders".$myid);
            echo "push back ".PHP_EOL;
            sleep(3);//休眠3秒,让其他 死循环程序来获取
        }

    }
    catch (Exception $ex)
    {

        echo "err".$ex->getMessage().PHP_EOL;



    }

}

延迟列队? 比方说新闻的延迟发布

利用redis的有序集合(sorted set) 用分值来存时间戳、进行排序

用zadd添加 、可以是一张新闻表的id也和是表和id组合取出来进行切割

127.0.0.1:6379> ZREVRANGEBYSCORE newsList +inf -inf WITHSCORES
1) "question_news@1"
2) "4"
3) "article_news@2"
4) "2"
5) "product_news@1"
6) "1"
127.0.0.1:6379> zadd newsList  20  question_news@3
127.0.0.1:6379> ZREVRANGEBYSCORE newsList +inf -inf
1) "question_news@3"
2) "question_news@1"
3) "article_news@2"
4) "product_news@1"

ZREVRANGEBYSCORE newsList +inf -inf 倒序排列

PHP代码可以这样写

$redis->client()->zRangeByScore("newsList","-inf",time(),['limit'=>[0,10]]);

也可以做订单指定时间未支付关闭订单

结合列队实现熔断器

条件、适用于php-fpm下

图

熔断器是对电器起到了保护作用。

图

熔断器的功能、常用语rpc服务。

那么问题来了!假设有个不断运行的主程序?在某个点调用函数A假设函数A抛出了异常。 —那么明知道A函数有异常?那么是否反复去调用能呢?

答:会、因为程序不是人脑,没有那么灵活,除非修改源码。

熔断器上场

  • 主程序不直接调 A,而是让熔断器调用A
  • A如果有异常,熔断器记录
  • 超过一定次数,则熔断器打开,下次则调用备用函数

图

代码例子

class CircuitBreaker{
    private $zSetKey="circuit";

    public function invoke(object $class,string $method,array $params,callable $fallback){
        global $redis;
        try{
           return $class->$method(...$params);
        }
        catch (Throwable $ex){
            $member=get_class($class)."_".$method;
            $redis->client()->zIncrBy($this->zSetKey,1,$member);
            return $fallback();//函数降级
        }
    }
}
function cbHandler(Exception $ex){
  throw new Exception($ex->getMessage());
}
set_error_handler("cbHandler",E_ALL);


class myclass{
    public function test($str){
        return file_get_contents("aaaa");
    }
}
$c=new myclass();
//echo $c->test("abc");

echo (new CircuitBreaker())->invoke($c,"test",['bcd'],function(){ return "fallback";});

整体的原理、是代理模式。外面包裹一个try_catch运行。错误进行捕获。redis进行计数。

更新到达一定的次数、直接调用降级函数。

熔断器的三种状态

半开状态、简单代码例子

define("BreakerStateOpen",1); //开
define("BreakerStateClose",2); //关,这是默认值
define("BreakerStateHalfOpen",3); //半开
class CircuitBreaker{
    private $zSetKey="circuit"; //记录错误次数的key
    private $zSetKey_open="circuit_open"; //熔断器从开到半开的状态key
    public $failCount=3;//表示失败次数>=该值 则不再访问原函数

    public $openTime=20; //20秒后自动进入半开状态
    public function invoke(object $class,string $method,array $params,callable $fallback){
        global $redis;
        $member=get_class($class)."_".$method;
        try{
            if($this->getState($member)==BreakerStateOpen)
                return $fallback()."开状态";
            if($this->getState($member)==BreakerStateHalfOpen)
                return $fallback()."半开状态";

            return $class->$method(...$params);
        }
        catch (Throwable $ex){
         $score=$redis->client()->zIncrBy($this->zSetKey,1,$member);
         if($score>=$this->failCount){//进入了开状态,设置一个定时器,进入半开状态
             $redis->client()->zAdd($this->zSetKey_open,time()+$this->openTime,$member);
         }
         return $fallback();//函数降级
        }
    }
    private function getState($member) //判断是否失败,从此不再访问原函数
    {
        global $redis;
        $getScore=$redis->client()->zScore($this->zSetKey,$member);
        if($getScore>=$this->failCount) return BreakerStateOpen;//开状态
        if($getScore==-1) return BreakerStateHalfOpen;//如果值是-1 则代表是半开状态
        return BreakerStateClose;
    }



}
function cbHandler(Exception $ex){
  throw new Exception($ex->getMessage());
}
class myclass{
    public function test($str){
        return file_get_contents("aaaa");
    }
}
$c=new myclass();
//echo $c->test("abc");

echo (new CircuitBreaker())->invoke($c,"test",['bcd'],function(){ return "fallback";});
set_error_handler("cbHandler",E_ALL);

state.php

while(true)
{
    $getMembers=$redis->client()->zRangeByScore("circuit_open","-inf",time(),['limit' => [0, 10]]);

    if(count($getMembers)>0)
    {
        foreach ($getMembers as $member){
            $redis->client()->zAdd("circuit",-1,$member);
        }

        // 别忘了删掉, 否则 会循环获取
        $redis->client()->zRem("circuit_open",...$getMembers); //使用了参数解包
        echo "set ".count($getMembers)." circuit halfopen".PHP_EOL;
    }

    usleep(500*1000);//休眠500毫秒
}

在半开状态下、随机执行 原函数和降级函数。当执行一定次数成功原函数熔断器恢复关状态。

if(rand(1,100)%2==0){
    echo "调用降级函数";
}else{
    echo  "调用原函数";
}

图

0为关3为开-3为半开

如下代码例子

CiruitBreaker.php

define("BreakerStateOpen",1); //开
define("BreakerStateClose",2); //关,这是默认值
define("BreakerStateHalfOpen",3); //半开
class CircuitBreaker{
    private $zSetKey="circuit"; //记录错误次数的key
    private $zSetKey_open="circuit_open"; //熔断器从开到半开的状态key
    public $failCount=3;//表示失败次数>=该值 则不再访问原函数

    public $openTime=15; //20秒后自动进入半开状态
    public function invoke(object $class,string $method,array $params,callable $fallback){
        global $redis;
        $member=get_class($class)."_".$method;
        $currentState=$this->getState($member);//获取当前状态,保存到变量中
        try{
            if($currentState==BreakerStateOpen)  //开状态直接调用降级函数
                return $fallback()."开状态";
            if($currentState==BreakerStateHalfOpen) //半开状态下 随机调用
            {
                if(rand(0,100)%2==0)
                    return $fallback()."半开状态";  //这里依然调用降级函数
                else
                {//下面调用了 真实函数
                    $result=$class->$method(...$params);
                    $redis->client()->zIncrBy($this->zSetKey,1,$member); //半开状态下依然要 计数器+1,目的是让它归零
                    return $result;
                }
            }

            $ret= $class->$method(...$params);
            return $ret;
        }
        catch (Throwable $ex){
            if($currentState==BreakerStateClose) //如果是关 状态
            {
                $score=$redis->client()->zIncrBy($this->zSetKey,1,$member);
                if($score>=$this->failCount){//进入了开状态,设置一个定时器,一段时间后进入半开状态
                    echo "<div>切换为开</div>";
                    $redis->client()->zAdd($this->zSetKey_open,time()+$this->openTime,$member);
                }
            }
            if($currentState==BreakerStateHalfOpen) //如果是半开 状态 下出现异常,则依然要设置定时器
            {
                $redis->client()->zAdd($this->zSetKey,$this->failCount,$member);//把计数器 设置为 failCount
                $redis->client()->zAdd($this->zSetKey_open,time()+$this->openTime,$member);
                echo "<div>从半开切换为开</div>";
            }
            return $fallback();
        }

    }
    private function getState($member) //判断是否失败,从此不再访问原函数
    {
        global $redis;
        $getScore=$redis->client()->zScore($this->zSetKey,$member);
        if($getScore>=$this->failCount) return BreakerStateOpen;//开状态
        if($getScore<0) return BreakerStateHalfOpen;//如果值小于0 则代表是半开状态

        //其他状态 或==0 则 是关闭状态
        return BreakerStateClose;
    }
}
function cbHandler(Exception $ex){
  throw new Exception($ex->getMessage());
}

set_error_handler("cbHandler",E_ALL);

swith.php循环监听

$failCount=3; //关闭状态 切换到 开状态 允许错误的次数
while(true)
{
    $getMembers=$redis->client()->zRangeByScore("circuit_open","-inf",time(),['limit' => [0, 10]]);

    if(count($getMembers)>0)
    {
        foreach ($getMembers as $member){
            //给负数-3
            $redis->client()->zAdd("circuit",-$failCount,$member);
        }

        // 别忘了删掉, 否则 会循环获取
        $redis->client()->zRem("circuit_open",...$getMembers); //使用了参数解包
        echo "set ".count($getMembers)." circuit halfopen".PHP_EOL;
    }

    usleep(500*1000);//休眠500毫秒
}

调用test.php

require "CircuitBreaker.php";

class myclass{
    public function test($str){
        //错误
      return file_get_contents("aaaa");
      //正确
        //    return "aaa";

    }
}
$c=new myclass();

echo (new CircuitBreaker())->invoke($c,"test",['bcd'],function(){ return "fallback";});

Redis中的Stream类型

文档地址:http://www.redis.cn/commands/xadd.html

XADD key ID field string [field string …]

id 如果指定的ID参数是字符*(星号ASCII字符),XADD命令会自动为您生成一个唯一的ID。 但是,也可以指定

xadd newUser * name qidong

XRANGE key start end [COUNT count]

-号为最小值 +号为最大值

127.0.0.1:6379> XRANGE newUsers - + count 1
1) 1) "1566528036661-0"
   2) 1) "userid"
      2) "101"
127.0.0.1:6379>

XREAD

语法:[COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]

读取一条

127.0.0.1:6379> xread count 1 streams newUsers 0
1) 1) "newUsers"
   2) 1) 1) "1566528036661-0"
         2) 1) "userid"
            2) "101"

还有阻塞读取 block后面跟毫秒级时间戳

127.0.0.1:6379> xread count 1  block  0 streams  newUsers  $
1) 1) "newUsers"
   2) 1) 1) "1566540305311-0"
         2) 1) "userid"
            2) "109"
(12.57s)

消费组

消息列队,不仅仅处理一个服务,所以有多个“组”,消费者进入组里进行消费。组是和stream 进行关联的。

xgroup 命令 来创建组

0从起始点消费 $从尾部开始消费

XGROUP CREATE mystream consumer-group-name $
XGROUP CREATE mystream consumer-group-name 0

添加

127.0.0.1:6379> xgroup create newUsers  users 0
OK

查看消息列队信息

图

查看组信息

127.0.0.1:6379> xinfo groups newUsers
1) 1) "name"
   2) "users"   //组名字
   3) "consumers"  //消费者数量、目前没有
   4) (integer) 0
   5) "pending"
   6) (integer) 0
   7) "last-delivered-id"  起始id
   8) "0-0"

消费者读取的话需要用xreadgroup命令

xreadgroup


xreadgroup group users qidong count 1 streams newUsers >
这代表 
1、读取group=users 、stream=newUsers 的数据,count是1
2、消费者 = qidong 
3、> 也是个特殊符号,好比就是取最新消息(没有被其他消费者消费过)

这时我们使用xinfo groups newUsers 可以看到消费者有1个

现在 在使用xinfo groups newUsers

127.0.0.1:6379> xinfo groups newUsers
1) 1) "name"
   2) "users"
   3) "consumers"
   4) (integer) 1
   5) "pending"   表示读取后待处理的数据个数
   6) (integer) 4
   7) "last-delivered-id"
   8) "1566540305311-0"
127.0.0.1:6379>

xreadgroup group users qidong count 10 streams newUsers

把>替换成0 就是读取pending里面去取

图

通过xack命令来通知redis服务器词条消息消费完成

127.0.0.1:6379> xack newUsers  users 1566540305311-0
(integer) 1
127.0.0.1:6379>

结果剩下3条

127.0.0.1:6379> xreadgroup group users qidong count 10 streams newUsers 0
1) 1) "newUsers"
   2) 1) 1) "1566528036661-0"
         2) 1) "userid"
            2) "101"
      2) 1) "1566528138448-0"
         2) 1) "userid"
            2) "102"
      3) 1) "1566528143051-0"
         2) 1) "userid"
            2) "103"

XPENDING 命令 去读取未处理的消息

127.0.0.1:6379> xreadgroup group users c1 count 1 streams newUsers >

xpending newUsers users - + 10
-最小值
+最大值
10为取出的数量
127.0.0.1:6379> xpending newUsers users - + 10
1) 1) "1566615221522-0"
   2) "qidong"   //消费者
   3) (integer) 1874657  //传递间隔时间  毫秒
   4) (integer) 1      //传递次数
2) 1) "1566615229619-0"
   2) "qidong"
   3) (integer) 1871599
   4) (integer) 1
3) 1) "1566615237473-0"
   2) "c1"    //消费者c1
   3) (integer) 8645
   4) (integer) 1
127.0.0.1:6379>

用xrange命令查看id的内容

 xrange newUsers 1566615221522-0 1566615221522-0
1) 1) "1566615221522-0"
   2) 1) "name"
      2) "qidong"

xclaim 消息转发、多个消费者其中之1有出现故障的、把消息转给其他

最基本的作用是可以把消息重新 分发给 消费者(可以指定)

XCLAIM newUser users c1 50000  1566615221522-0 

这代表把 :
1、消息闲置至少50000(毫秒)的消息,(没有被消费者xreadgroup)

2、id是1566615221522-0

3、把消息的所有者 设置为c1(也可以为别的消费者)

然后使用命令后在查看未消费信息的时候、发现时间间隔啥的变短了

127.0.0.1:6379> XCLAIM newUsers users c1 50000 1566615221522-0
1) 1) "1566615221522-0"
   2) 1) "name"
      2) "qidong"
127.0.0.1:6379> xpending newUsers users - + 10
1) 1) "1566615221522-0"
   2) "c1"
   3) (integer) 27806 //传递时间变少了
   4) (integer) 2  //传递次数递增了一次
2) 1) "1566615229619-0"
   2) "qidong"
   3) (integer) 2073985
   4) (integer) 1
3) 1) "1566615237473-0"
   2) "c1"
   3) (integer) 211031
   4) (integer) 1
最后修改:2020 年 02 月 10 日
如果觉得我的文章对你有用,请随意赞赏