thinkphp6+redis stream消息队列

2023-05-12541

1、首先在\vendor\topthink\framework\src\think\cache\driver\Redis.php的class Redis里面增加如下代码。


/**

     * stream 操作相关

     *Parameters [5] {

        Parameter #0 [ <required> $str_key ]

        Parameter #1 [ <required> $str_id ]

        Parameter #2 [ <required> array $arr_fields ]

        Parameter #3 [ <optional> $i_maxlen ]

        Parameter #4 [ <optional> $boo_approximate ]

     */

    public function xadd($key,$arr_fields,$str_id="*",$i_maxlen=0,$boo_approximate = null){

        $stime = microtime(true);

        for ($retry=0; $retry<2; $retry++){ //重试两次

            try {

                $res = $this->connect('xadd', $key) && $this->oRedis->xadd($key, $str_id, $arr_fields,$i_maxlen,$boo_approximate) ? true : false; 

                break;

            }catch (RedisException $e){

                $this->close(); //显式关闭,强制重连

                $retry && $this->errorlog('xadd', $e->getCode(), $e->getMessage(), false, 'redis_error.txt');

            }

        }

        $etime = microtime(true);

        //大于1秒

        if($etime - $stime >= 1){

            $runTime = $etime - $stime;

            $dateTime = date('m-d H:i:s');

            $uri = (isset( $_SERVER['REQUEST_URI']) ? $_SERVER['REQUEST_URI'] : ''). '--';

            $error = "[{$dateTime}] > xadd time:{$runTime}; key:{$key}; uri:{$uri}";

        }

        return $res;

    }

    

    /**

     * 此命令返回流中满足给定ID范围的条目 FIFO

     * @param string $key stream key

     * @param string $start 最小ID

     * @param string $end 最大ID

     * @param int $count 返回指定条数

     */

    public function xrange($key, $start="-", $end="+", $i_count = 0){

        $stime = microtime(true);

        for ($retry=0; $retry<2; $retry++){ //重试两次

            try {

                if($i_count > 0){

                    $res = $this->connect('xrange', $key) && ($result = $this->oRedis->xrange($key, $start, $end, $i_count)) ? $result : array();

                } else {

                    $res = $this->connect('xrange', $key) && ($result = $this->oRedis->xrange($key, $start, $end)) ? $result : array();    

                }                

                break;

            }catch (RedisException $e){

                $this->close(); //显式关闭,强制重连

                $retry && $this->errorlog('xrange', $e->getCode(), $e->getMessage(), false, 'redis_error.txt');

            }

        }

        $etime = microtime(true);

        //大于1秒

        if($etime - $stime >= 1){

            $runTime = $etime - $stime;

            $dateTime = date('m-d H:i:s');

            $uri = (isset( $_SERVER['REQUEST_URI']) ? $_SERVER['REQUEST_URI'] : ''). '--';

            $error = $this->aServer[0] . ':' .$this->aServer[1] . "[{$dateTime}] > xrange time:{$runTime}; key:{$key}; uri:{$uri}";

        }

        return $res;

    }

    

    /**

     * 此命令返回流中满足给定ID范围的条目,按条目降序返回FILO

     * @param string $key stream key

     * @param string $start 最大ID

     * @param string $end 最小ID

     * @param int $count 返回指定条数

     */

    public function xrevrange($key, $start="+", $end="-", $i_count = 0){

        $stime = microtime(true);

        for ($retry=0; $retry<2; $retry++){ //重试两次

            try {

                if($i_count > 0){

                    $res = $this->connect('xrevrange', $key) && ($result = $this->oRedis->xrevrange($key, $start, $end, $i_count)) ? $result : array();

                } else {

                    $res = $this->connect('xrevrange', $key) && ($result = $this->oRedis->xrevrange($key, $start, $end)) ? $result : array();    

                }                

                break;

            }catch (RedisException $e){

                $this->close(); //显式关闭,强制重连

                $retry && $this->errorlog('xrevrange', $e->getCode(), $e->getMessage(), false, 'redis_error.txt');

            }

        }

        $etime = microtime(true);

        //大于1秒

        if($etime - $stime >= 1){

            $runTime = $etime - $stime;

            $dateTime = date('m-d H:i:s');

            $uri = (isset( $_SERVER['REQUEST_URI']) ? $_SERVER['REQUEST_URI'] : ''). '--';

            $error = $this->aServer[0] . ':' .$this->aServer[1] . "[{$dateTime}] > xrevrange time:{$runTime}; key:{$key}; uri:{$uri}";

        }

        return $res;

    }

    

    /**

     * 从一个或者多个流中读取数据,仅返回ID大于调用者报告的最后接收ID的条目。此命令有一个阻塞选项,用于等待可用的项目,类似于BRPOP或者BZPOPMIN

     *    Parameters [3] {

     *  Parameter #0 [ <required> array $arr_streams ] ['stream1'=>0,'stream2'=>0]

     *  Parameter #1 [ <optional> $i_count ]

     *  Parameter #2 [ <optional> $i_block ]

     */

    public function xread(array $arr_streams, $i_count = 1, $i_block = null){

        $stime = microtime(true);

        for ($retry=0; $retry<2; $retry++){ //重试两次

            try {

                $res = $this->connect() && ($result = $this->oRedis->xread($arr_streams,$i_count)) ? $result : array();    

                break;

            }catch (RedisException $e){

                $this->close(); //显式关闭,强制重连

                $retry && $this->errorlog('xread', $e->getCode(), $e->getMessage(), false, 'redis_error.txt');

            }

        }

        $etime = microtime(true);

        //大于1秒

        if($etime - $stime >= 1){

            $runTime = $etime - $stime;

            $dateTime = date('m-d H:i:s');

            $uri = (isset( $_SERVER['REQUEST_URI']) ? $_SERVER['REQUEST_URI'] : ''). '--';

            $error = $this->aServer[0] . ':' .$this->aServer[1] . "[{$dateTime}] > xread time:{$runTime}; key:{$key}; uri:{$uri}";

        }

        return $res;

    }

    

    /**

     * 返回流中的条目数

     * @param string $key

     */

    public function xlen($key){

        return $this->connect('xlen', $key) ? $this->oRedis->xlen( $key) : 0;

    }

    

    /**

     * 返回stream group consumers中的信息流统计

     * Parameters [3] {

        Parameter #0 [ <required> $str_cmd ] GROUPS key|STREAM key|CONSUMERS key groupname

        Parameter #1 [ <optional> $str_key ]

        Parameter #2 [ <optional> $str_group ]

     */

    public function xinfo($str_cmd, $str_key, $str_group = null){        

        if(!empty($str_group)){

            for ($retry=0; $retry<2; $retry++){ //重试两次

                try {

                    $res = $this->connect() && ($result = $this->oRedis->xinfo($str_cmd,$str_key,$str_group)) ? $result : array();

                    break;

                }catch (RedisException $e){

                    $this->close(); //显式关闭,强制重连

                    $retry && $this->errorlog('xinfo', $e->getCode(), $e->getMessage(), false, 'redis_error.txt');

                }

            }

        }else{

            for ($retry=0; $retry<2; $retry++){ //重试两次

                try {

                    $res = $this->connect() && ($result = $this->oRedis->xinfo($str_cmd,$str_key)) ? $result : array();    

                    break;

                }catch (RedisException $e){

                    $this->close(); //显式关闭,强制重连

                    $retry && $this->errorlog('xinfo', $e->getCode(), $e->getMessage(), false, 'redis_error.txt');

                }

            }

        }

        

        return $res;

    }

    

    /**

     * 该命令用于管理流数据结构关联的消费者组。使用XGROUP你可以:


        创建与流关联的新消费者组。

        销毁一个消费者组。

        从消费者组中移除指定的消费者。

        将消费者组的最后交付ID设置为其他内容

        Parameters [5] {

        Parameter #0 [ <required> $str_operation ] CREATE|DESTROY|SETID|DELCONSUMER

        Parameter #1 [ <optional> $str_key ]

        Parameter #2 [ <optional> $str_arg1 ]

        Parameter #3 [ <optional> $str_arg2 ]

        Parameter #4 [ <optional> $str_arg3 ]

     */

    public function xgroup($str_operation, $str_key, $str_arg1 = null, $str_arg2 = null, $str_arg3 = null){

        switch($str_operation){

            case "CREATE":    //创建消费组 命令,streamkey,组名称,从哪开始消费

                $flag = $this->connect() ? $this->oRedis->xgroup( $str_operation,$str_key,$str_arg1,$str_arg2) : false;

                break;

            case "DESTROY":    //删除消费组 命令,streamkey,组名称

                $flag = $this->connect() ? $this->oRedis->xgroup( $str_operation,$str_key,$str_arg1) : false;

                break;

            case "SETID":    //设置传递下一个消息的ID 命令,streamkey,条目ID

                $flag =  $this->connect() ? $this->oRedis->xgroup( $str_operation,$str_key,$str_arg1) : false;

                break;

            case "DELCONSUMER":    //删除消费者 命令,streamkey,groupname,consumername

                $flag = $this->connect() ? $this->oRedis->xgroup( $str_operation,$str_key,$str_arg1,$str_arg2) : false;

                break;

            default:

                $flag = false;

                break;

        }

        return $flag;

    }

    

    /**

     * 删除流数据中的条目

     * @param string $key

     * @param array $arr_ids

     */

    public function xdel($key, array $arr_ids){

        return $this->connect('xdel', $key) ? $this->oRedis->xdel( $key, $arr_ids) : 0;

    }

    

    /**

     * XACK命令用于从流的消费者组的待处理条目列表(简称PEL)中删除一条或多条消息

     * Parameters [3] {

        Parameter #0 [ <required> $str_key ]

        Parameter #1 [ <required> $str_group ]

        Parameter #2 [ <required> array $arr_ids ]

     * 成功返回确认的消息数

     */

    public function xack($key, $str_group, array $arr_ids){

        return $this->connect('xack', $key) ? $this->oRedis->xack( $key, $str_group, $arr_ids) : 0;

    }

    

    /**

     * XREADGROUP命令是XREAD命令的特殊版本,支持消费者组

     * Parameters [5] {

        Parameter #0 [ <required> $str_group ]            groupname

        Parameter #1 [ <required> $str_consumer ]        consumername

        Parameter #2 [ <required> array $arr_streams ]    arr_streams ['stream'=>'>']

        Parameter #3 [ <optional> $i_count ]            读多少条

        Parameter #4 [ <optional> $i_block ]            是否阻塞读取

     */

    public function xreadgroup($str_group, $str_consumer, array $arr_streams, $i_count = 1, $i_block = null){

        $stime = microtime(true);

        for ($retry=0; $retry<2; $retry++){ //重试两次

            try {

                $res = $this->connect() && ($result = $this->oRedis->xreadgroup($str_group, $str_consumer, $arr_streams, $i_count)) ? $result : array();    

                break;

            }catch (RedisException $e){

                $this->close(); //显式关闭,强制重连

                $retry && $this->errorlog('xreadgroup', $e->getCode(), $e->getMessage(), false, 'redis_error.txt');

            }

        }

        $etime = microtime(true);

        //大于1秒

        if($etime - $stime >= 1){

            $runTime = $etime - $stime;

            $dateTime = date('m-d H:i:s');

            $uri = (isset( $_SERVER['REQUEST_URI']) ? $_SERVER['REQUEST_URI'] : ''). '--';

            $error = $this->aServer[0] . ':' .$this->aServer[1] . "[{$dateTime}] > xreadgroup time:{$runTime}; key:{$key}; uri:{$uri}";

        }

        return $res;

    }

    

    /**

     * 通过消费者组从流中获取数据,而不是确认这些数据,具有创建待处理条目的效果

     * Parameters [6] {

        Parameter #0 [ <required> $str_key ]

        Parameter #1 [ <required> $str_group ]

        Parameter #2 [ <optional> $str_start ]

        Parameter #3 [ <optional> $str_end ]

        Parameter #4 [ <optional> $i_count ]

        Parameter #5 [ <optional> $str_consumer ]

        $redis->xPending('mystream', 'mygroup', '-', '+', 1, 'consumer-1')

     */

    public function xpending($key, $str_group, $str_start = '-', $str_end = '+', $i_count = 1, $str_consumer=null){

        if($str_consumer){

            return $this->connect() ? $this->oRedis->xpending( $key, $str_group,$str_start,$str_end,$i_count,$str_consumer) : [];    

        }else{

            return $this->connect() ? $this->oRedis->xpending( $key, $str_group, $str_start, $str_end, $i_count) : [];

        }        

    }

    

    /**

     * 把某些条目给认领给某个消费者

     * Parameters [6] {

        Parameter #0 [ <required> $str_key ]         streamkey

        Parameter #1 [ <required> $str_group ]        groupname

        Parameter #2 [ <required> $str_consumer ]    consumername

        Parameter #3 [ <required> $i_min_idle ]        空闲时间

        Parameter #4 [ <required> array $arr_ids ]    条目IDS

        Parameter #5 [ <optional> array $arr_opts ] [IDLE|TIME|RETRYCOUNT|FORCE|JUSTID]

     */

    public function xclaim($key, $str_group, $str_consumer, $i_min_idle, array $arr_ids, array $arr_opts = null){

        return $this->connect() ? $this->oRedis->xtrim( $key, $str_group,$str_consumer,$i_min_idle,$arr_ids) : 0;

    }

    

    /**

     * XTRIM将流裁剪为指定数量的项目

     *Parameters [3] {

        Parameter #0 [ <required> $str_key ]

        Parameter #1 [ <required> $i_maxlen ]

        Parameter #2 [ <optional> $boo_approximate ]

     */

    public function xtrim($key, $i_maxlen, $boo_approximate = null){

        return $this->connect('xtrim', $key) ? $this->oRedis->xtrim( $key, $i_maxlen) : 0;

    }


2、在\config\cache.php里配置好redis的端口和密码


比如


// 更多的缓存连接

        // 配置Reids

        'redis'    =>    [

            'type'     => 'redis',

            'host'     => '127.0.0.1',

            'port'     => '1025',

            'password' => 'XXXXXX',

            'select'   => '0',

            // 全局缓存有效期(0为永久有效)

            'expire'   => 0,

            // 缓存前缀

            'prefix'   => '',

            'timeout'  => 0,

        ],


3、在业务的Controller里增加use think\facade\Config; use think\cache\driver\Redis;


4、在具体业务里


$redis = new Redis(Config::get('cache.stores.redis'));

        //xadd:追加消息

        //xdel:删除消息,删除标志位,不影响消息总长度

        //xrange:消息列表,过滤已删除的消息

        //xlen:消息长度

        //del: 删除所有消息

    

        $redis->rawCommand('del','xiaoxi');


        // 星号表示自动生成id,后面参数key,value

        $redis->rawCommand('xadd','xiaoxi','*','name','sail','age','168');


//消息长度        


$res = $redis->rawCommand('xlen','xiaoxi');


//消息列表


$res = $redis->rawCommand('xrange','xiaoxi','-','+');


//从stream中头部读取1条消息

  $res = $redis->rawCommand('xread','count','1','streams','xiaoxi',0);


//创建消费组xxGroup  为消息队列 xiaoxi 从第一条开始消费

 $redis->rawCommand('xgroup','create','xiaoxi','xxGroup','0');


//A,消费第1条

        $redis->rawCommand('xreadgroup','group', 'xxGroup', 'A', 'count', '1' ,'streams', 'xiaoxi', '>');


//获取strarm信息

        $res = $redis->rawCommand('xinfo','stream','xiaoxi');


//获取strarm消费组信息

        $res = $redis->rawCommand('xinfo','groups','xiaoxi');


//同一个消费组有多个消费者,观察每个消费者的状态

        $res = $redis->rawCommand('xinfo','consumers','xiaoxi','xxGroup');


//xxGroup的Pending等待列表情况  + 0 10

        //使用 -:start +:end 10:count 选项可以获取详细信息

        //$res = $redis->rawCommand('xpending','xiaoxi','xxGroup');

        //$res = $redis->rawCommand('xpending','xiaoxi','xxGroup','-','+','10');

        $res = $redis->rawCommand('xpending','xiaoxi','xxGroup','-','+','10','A');


//通知消息处理结束,用消息ID标识

        $msg_id = $res[0][0];

        $res = $redis->rawCommand('xack','xiaoxi','xxGroup',$msg_id);