安装环境
mac上可以先安装一个cli端。然后下载docker并运行。
如果没有配置文件、可以先启动容器并且进入容器cp出来
docker pull redis:5.0.4-alpine
#redis本地新建一个redis的配置目录/Users/apple/conf/
cp /usr/local/etc/redis.conf /Users/apple/conf/
docker run -v /Users/apple/conf/:/usr/local/etc/redis/ -d --rm -p 6379:6379 --name myredis redis:5.0.4-alpine redis-server或者使用
docker run -v /Users/apple/conf/:/usr/local/etc/redis/ -d --rm -p 6379:6379 --name myredis redis:5.0.4-alpine sh -c"redis-server /usr/local/etc/redis/redis.conf"查看版本
redis-cli -h 127.0.0.1 info | grep 'redis_version'
#显示里面的版本
redis_version:5.0.4商品服务举例图

先进入的先出。
Redis 列表(List)
lrange key 0 -1 获取列表左右元素
命令:LPUSH 、将一个或多个值插入到列表头部
语法格式:LPUSH key value1 [value2]
➜ vagrant_openresty_swoole redis-cli -h 127.0.0.1
127.0.0.1:6379> lpush order id001 id002
(integer) 2
127.0.0.1:6379> lrange order 0 -1
1) “id002”
2) “id001”
127.0.0.1:6379>命令: RPOP 移出并获取列表的第一个元素
语法格式:RPOP key
127.0.0.1:6379> rpop order
“id001”
127.0.0.1:6379> lrange order 0 -1
1) “id002”
127.0.0.1:6379>
命令:BRPOP
移出并获取列表的最后一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。
语法格式:BRPOP key1 [key2 ] timeout

启动N个服务、死循环同事取列队的订单信息
function start(redis $redis_client)
{
while(true) {
//如果没有数据则阻塞10后运行
$res = $redis_client->brPop(["orders"],10);
if($res && $res[0])
{
if($res[1]==="pn002") sleep(5);
echo "order_no=".$res[1]." done".PHP_EOL;
usleep(500*1000); //休眠500毫秒
echo "restart ".PHP_EOL;
}
else
continue;
}
}如果出现断电等是事情、那么会存在丢失数据的行为。可以在设置个备份list来解决、
例子:N1死循环服务、取出一个数据、放到N1备份list里面

用到的命令:Brpoplpush
Brpoplpush 命令从列表中取出最后一个元素,并插入到另外一个列表的头部; 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。
代码例子:
function start(redis $redis_client)
{
global $myid;
//先处理 上次没有处理的 key
echo "find baklist".PHP_EOL;
while(true) // 应该使用swoole 等 开一个异步任务或者进程,专门对自己的 备份list做长期监听
{
$bak_res = $redis_client->brPop(["orders".$myid],1); //从自己的备份队列中获取 ,过期时间设置短一些
if($bak_res && $bak_res[0])
{
doJob($bak_res[1],$redis_client,true); //是自定义处理函数
usleep(500*1000); //休眠500毫秒
}
else //如果没取到值 说明 备份队列中 木有 了,则跳出循环
break;
}
echo "baklist done".PHP_EOL;
echo "begin orders ".PHP_EOL;
//接下来开始继续
while(true) {
$res = $redis_client->brpoplpush("orders","orders".$myid,10); //注意,这个函数直接返回的是值
if($res)
{
doJob($res,$redis_client); //是处理函数
usleep(500*1000); //休眠500毫秒
}
else
continue;
}
}
function doJob($orderNo,redis $redis_client,$isBak=false) //isBak决定了 处理过程是否是 备份队列处理 ,如果是 有些步骤不需要执行
{
global $myid;
try{
if($orderNo=="pn023") throw new MyException("error");
sleep(3); //假装 干的很耗时,很辛苦
if($isBak) //如果是 备份队列处理,显示不一样的字符,仅此而已
echo "backjob order_no=";
else
echo "order_no=";
echo $orderNo." done".PHP_EOL;
if(!$isBak)
$redis_client->lPop("orders".$myid); //注意这里,处理完后,要删掉 备份列表左边的第一个元素
}
catch (MyException $myException)
{
if(!$isBak){
//这里要判断 是什么类型的exception ,譬如超时等才能塞回去,这个机制是自己定的
$redis_client->rPush("orders",$orderNo);//塞回原队列
$redis_client->lPop("orders".$myid);
echo "push back ".PHP_EOL;
sleep(3);//休眠3秒,让其他 死循环程序来获取
}
}
catch (Exception $ex)
{
echo "err".$ex->getMessage().PHP_EOL;
}
}延迟列队? 比方说新闻的延迟发布
利用redis的有序集合(sorted set) 用分值来存时间戳、进行排序
用zadd添加 、可以是一张新闻表的id也和是表和id组合取出来进行切割
127.0.0.1:6379> ZREVRANGEBYSCORE newsList +inf -inf WITHSCORES
1) "question_news@1"
2) "4"
3) "article_news@2"
4) "2"
5) "product_news@1"
6) "1"
127.0.0.1:6379> zadd newsList 20 question_news@3
127.0.0.1:6379> ZREVRANGEBYSCORE newsList +inf -inf
1) "question_news@3"
2) "question_news@1"
3) "article_news@2"
4) "product_news@1"ZREVRANGEBYSCORE newsList +inf -inf 倒序排列
PHP代码可以这样写
$redis->client()->zRangeByScore("newsList","-inf",time(),['limit'=>[0,10]]);
也可以做订单指定时间未支付关闭订单
结合列队实现熔断器
条件、适用于php-fpm下

熔断器是对电器起到了保护作用。

熔断器的功能、常用语rpc服务。
答:会、因为程序不是人脑,没有那么灵活,除非修改源码。
熔断器上场
- 主程序不直接调 A,而是让熔断器调用A
- A如果有异常,熔断器记录
- 超过一定次数,则熔断器打开,下次则调用备用函数

代码例子
class CircuitBreaker{
private $zSetKey="circuit";
public function invoke(object $class,string $method,array $params,callable $fallback){
global $redis;
try{
return $class->$method(...$params);
}
catch (Throwable $ex){
$member=get_class($class)."_".$method;
$redis->client()->zIncrBy($this->zSetKey,1,$member);
return $fallback();//函数降级
}
}
}
function cbHandler(Exception $ex){
throw new Exception($ex->getMessage());
}
set_error_handler("cbHandler",E_ALL);
class myclass{
public function test($str){
return file_get_contents("aaaa");
}
}
$c=new myclass();
//echo $c->test("abc");
echo (new CircuitBreaker())->invoke($c,"test",['bcd'],function(){ return "fallback";});整体的原理、是代理模式。外面包裹一个try_catch运行。错误进行捕获。redis进行计数。
更新到达一定的次数、直接调用降级函数。
熔断器的三种状态
1、关。调用原函数,失败后计数器+1,并返回降级函数,到达一定阀值后,进入”开”状态
2、开。也就是直接返回降级函数,不调用原函数,但是我们也要给失败函数一些机会,于是出现搬开状态,不过需要设定一个定时器,一定时间后才进入半开状态
3、搬开状态。处理搬开状态是熔断器的核心、1、开状态下、一定时间后,自动进入半开状态。2、调用函数时 有一定数量的调用是进入的降级函数,有一定数量进入的是真实函数调用
半开状态、简单代码例子
define("BreakerStateOpen",1); //开
define("BreakerStateClose",2); //关,这是默认值
define("BreakerStateHalfOpen",3); //半开
class CircuitBreaker{
private $zSetKey="circuit"; //记录错误次数的key
private $zSetKey_open="circuit_open"; //熔断器从开到半开的状态key
public $failCount=3;//表示失败次数>=该值 则不再访问原函数
public $openTime=20; //20秒后自动进入半开状态
public function invoke(object $class,string $method,array $params,callable $fallback){
global $redis;
$member=get_class($class)."_".$method;
try{
if($this->getState($member)==BreakerStateOpen)
return $fallback()."开状态";
if($this->getState($member)==BreakerStateHalfOpen)
return $fallback()."半开状态";
return $class->$method(...$params);
}
catch (Throwable $ex){
$score=$redis->client()->zIncrBy($this->zSetKey,1,$member);
if($score>=$this->failCount){//进入了开状态,设置一个定时器,进入半开状态
$redis->client()->zAdd($this->zSetKey_open,time()+$this->openTime,$member);
}
return $fallback();//函数降级
}
}
private function getState($member) //判断是否失败,从此不再访问原函数
{
global $redis;
$getScore=$redis->client()->zScore($this->zSetKey,$member);
if($getScore>=$this->failCount) return BreakerStateOpen;//开状态
if($getScore==-1) return BreakerStateHalfOpen;//如果值是-1 则代表是半开状态
return BreakerStateClose;
}
}
function cbHandler(Exception $ex){
throw new Exception($ex->getMessage());
}
class myclass{
public function test($str){
return file_get_contents("aaaa");
}
}
$c=new myclass();
//echo $c->test("abc");
echo (new CircuitBreaker())->invoke($c,"test",['bcd'],function(){ return "fallback";});
set_error_handler("cbHandler",E_ALL);
state.php
while(true)
{
$getMembers=$redis->client()->zRangeByScore("circuit_open","-inf",time(),['limit' => [0, 10]]);
if(count($getMembers)>0)
{
foreach ($getMembers as $member){
$redis->client()->zAdd("circuit",-1,$member);
}
// 别忘了删掉, 否则 会循环获取
$redis->client()->zRem("circuit_open",...$getMembers); //使用了参数解包
echo "set ".count($getMembers)." circuit halfopen".PHP_EOL;
}
usleep(500*1000);//休眠500毫秒
}在半开状态下、随机执行 原函数和降级函数。当执行一定次数成功原函数熔断器恢复关状态。
if(rand(1,100)%2==0){
echo "调用降级函数";
}else{
echo "调用原函数";
}
0为关3为开-3为半开
如下代码例子
CiruitBreaker.php
define("BreakerStateOpen",1); //开
define("BreakerStateClose",2); //关,这是默认值
define("BreakerStateHalfOpen",3); //半开
class CircuitBreaker{
private $zSetKey="circuit"; //记录错误次数的key
private $zSetKey_open="circuit_open"; //熔断器从开到半开的状态key
public $failCount=3;//表示失败次数>=该值 则不再访问原函数
public $openTime=15; //20秒后自动进入半开状态
public function invoke(object $class,string $method,array $params,callable $fallback){
global $redis;
$member=get_class($class)."_".$method;
$currentState=$this->getState($member);//获取当前状态,保存到变量中
try{
if($currentState==BreakerStateOpen) //开状态直接调用降级函数
return $fallback()."开状态";
if($currentState==BreakerStateHalfOpen) //半开状态下 随机调用
{
if(rand(0,100)%2==0)
return $fallback()."半开状态"; //这里依然调用降级函数
else
{//下面调用了 真实函数
$result=$class->$method(...$params);
$redis->client()->zIncrBy($this->zSetKey,1,$member); //半开状态下依然要 计数器+1,目的是让它归零
return $result;
}
}
$ret= $class->$method(...$params);
return $ret;
}
catch (Throwable $ex){
if($currentState==BreakerStateClose) //如果是关 状态
{
$score=$redis->client()->zIncrBy($this->zSetKey,1,$member);
if($score>=$this->failCount){//进入了开状态,设置一个定时器,一段时间后进入半开状态
echo "<div>切换为开</div>";
$redis->client()->zAdd($this->zSetKey_open,time()+$this->openTime,$member);
}
}
if($currentState==BreakerStateHalfOpen) //如果是半开 状态 下出现异常,则依然要设置定时器
{
$redis->client()->zAdd($this->zSetKey,$this->failCount,$member);//把计数器 设置为 failCount
$redis->client()->zAdd($this->zSetKey_open,time()+$this->openTime,$member);
echo "<div>从半开切换为开</div>";
}
return $fallback();
}
}
private function getState($member) //判断是否失败,从此不再访问原函数
{
global $redis;
$getScore=$redis->client()->zScore($this->zSetKey,$member);
if($getScore>=$this->failCount) return BreakerStateOpen;//开状态
if($getScore<0) return BreakerStateHalfOpen;//如果值小于0 则代表是半开状态
//其他状态 或==0 则 是关闭状态
return BreakerStateClose;
}
}
function cbHandler(Exception $ex){
throw new Exception($ex->getMessage());
}
set_error_handler("cbHandler",E_ALL);swith.php循环监听
$failCount=3; //关闭状态 切换到 开状态 允许错误的次数
while(true)
{
$getMembers=$redis->client()->zRangeByScore("circuit_open","-inf",time(),['limit' => [0, 10]]);
if(count($getMembers)>0)
{
foreach ($getMembers as $member){
//给负数-3
$redis->client()->zAdd("circuit",-$failCount,$member);
}
// 别忘了删掉, 否则 会循环获取
$redis->client()->zRem("circuit_open",...$getMembers); //使用了参数解包
echo "set ".count($getMembers)." circuit halfopen".PHP_EOL;
}
usleep(500*1000);//休眠500毫秒
}调用test.php
require "CircuitBreaker.php";
class myclass{
public function test($str){
//错误
return file_get_contents("aaaa");
//正确
// return "aaa";
}
}
$c=new myclass();
echo (new CircuitBreaker())->invoke($c,"test",['bcd'],function(){ return "fallback";});Redis中的Stream类型
文档地址:http://www.redis.cn/commands/xadd.html
XADD key ID field string [field string …]
id 如果指定的ID参数是字符*(星号ASCII字符),XADD命令会自动为您生成一个唯一的ID。 但是,也可以指定
xadd newUser * name qidong
XRANGE key start end [COUNT count]
-号为最小值 +号为最大值
127.0.0.1:6379> XRANGE newUsers - + count 1
1) 1) "1566528036661-0"
2) 1) "userid"
2) "101"
127.0.0.1:6379>XREAD
语法:[COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]
读取一条
127.0.0.1:6379> xread count 1 streams newUsers 0
1) 1) "newUsers"
2) 1) 1) "1566528036661-0"
2) 1) "userid"
2) "101"还有阻塞读取 block后面跟毫秒级时间戳
127.0.0.1:6379> xread count 1 block 0 streams newUsers $
1) 1) "newUsers"
2) 1) 1) "1566540305311-0"
2) 1) "userid"
2) "109"
(12.57s)消费组
消息列队,不仅仅处理一个服务,所以有多个“组”,消费者进入组里进行消费。组是和stream 进行关联的。
xgroup 命令 来创建组
0从起始点消费 $从尾部开始消费
XGROUP CREATE mystream consumer-group-name $
XGROUP CREATE mystream consumer-group-name 0添加
127.0.0.1:6379> xgroup create newUsers users 0
OK查看消息列队信息

查看组信息
127.0.0.1:6379> xinfo groups newUsers
1) 1) "name"
2) "users" //组名字
3) "consumers" //消费者数量、目前没有
4) (integer) 0
5) "pending"
6) (integer) 0
7) "last-delivered-id" 起始id
8) "0-0"消费者读取的话需要用xreadgroup命令
xreadgroup
xreadgroup group users qidong count 1 streams newUsers >
这代表
1、读取group=users 、stream=newUsers 的数据,count是1
2、消费者 = qidong
3、> 也是个特殊符号,好比就是取最新消息(没有被其他消费者消费过)
这时我们使用xinfo groups newUsers 可以看到消费者有1个现在 在使用xinfo groups newUsers
127.0.0.1:6379> xinfo groups newUsers
1) 1) "name"
2) "users"
3) "consumers"
4) (integer) 1
5) "pending" 表示读取后待处理的数据个数
6) (integer) 4
7) "last-delivered-id"
8) "1566540305311-0"
127.0.0.1:6379>xreadgroup group users qidong count 10 streams newUsers
把>替换成0 就是读取pending里面去取

通过xack命令来通知redis服务器词条消息消费完成
127.0.0.1:6379> xack newUsers users 1566540305311-0
(integer) 1
127.0.0.1:6379>结果剩下3条
127.0.0.1:6379> xreadgroup group users qidong count 10 streams newUsers 0
1) 1) "newUsers"
2) 1) 1) "1566528036661-0"
2) 1) "userid"
2) "101"
2) 1) "1566528138448-0"
2) 1) "userid"
2) "102"
3) 1) "1566528143051-0"
2) 1) "userid"
2) "103"XPENDING 命令 去读取未处理的消息
127.0.0.1:6379> xreadgroup group users c1 count 1 streams newUsers >
xpending newUsers users - + 10
-最小值
+最大值
10为取出的数量
127.0.0.1:6379> xpending newUsers users - + 10
1) 1) "1566615221522-0"
2) "qidong" //消费者
3) (integer) 1874657 //传递间隔时间 毫秒
4) (integer) 1 //传递次数
2) 1) "1566615229619-0"
2) "qidong"
3) (integer) 1871599
4) (integer) 1
3) 1) "1566615237473-0"
2) "c1" //消费者c1
3) (integer) 8645
4) (integer) 1
127.0.0.1:6379>用xrange命令查看id的内容
xrange newUsers 1566615221522-0 1566615221522-0
1) 1) "1566615221522-0"
2) 1) "name"
2) "qidong"xclaim 消息转发、多个消费者其中之1有出现故障的、把消息转给其他
最基本的作用是可以把消息重新 分发给 消费者(可以指定)
XCLAIM newUser users c1 50000 1566615221522-0
这代表把 :
1、消息闲置至少50000(毫秒)的消息,(没有被消费者xreadgroup)
2、id是1566615221522-0
3、把消息的所有者 设置为c1(也可以为别的消费者)
然后使用命令后在查看未消费信息的时候、发现时间间隔啥的变短了
127.0.0.1:6379> XCLAIM newUsers users c1 50000 1566615221522-0
1) 1) "1566615221522-0"
2) 1) "name"
2) "qidong"
127.0.0.1:6379> xpending newUsers users - + 10
1) 1) "1566615221522-0"
2) "c1"
3) (integer) 27806 //传递时间变少了
4) (integer) 2 //传递次数递增了一次
2) 1) "1566615229619-0"
2) "qidong"
3) (integer) 2073985
4) (integer) 1
3) 1) "1566615237473-0"
2) "c1"
3) (integer) 211031
4) (integer) 1