今天来分享下个人搭建IM的心得

GatewayWorker基于Workerman开发的一个项目框架,用于快速开发TCP长连接应用,例如app推送服务端、即时IM服务端、游戏服务端、物联网、智能家居等等

GatewayWorker使用经典的Gateway和Worker进程模型。Gateway进程负责维持客户端连接,并转发客户端的数据给BusinessWorker进程处理,BusinessWorker进程负责处理实际的业务逻辑(默认调用Events.php处理业务),并将结果推送给对应的客户端。Gateway服务和BusinessWorker服务可以分开部署在不同的服务器上,实现分布式集群。

GatewayWorker提供非常方便的API,可以全局广播数据、可以向某个群体广播数据、也可以向某个特定客户端推送数据。配合Workerman的定时器,也可以定时推送数据。

第一步

composer安装:

composer require workerman/gateway-worker

(可选,用于服务端主动推送消息)

composer require workerman/gatewayclient

(可选,用于连接数据库)

composer require workerman/mysql

(可选,用于连接redis)

composer require workerman/redis

第二步

删掉start.php中的require_once DIR . ‘/vendor/autoload.php’;

原因是tp有自己的自动加载机制,貌似在集成composer自动加载机制时有BUG

第三步

在start_gateway.php中创建wss服务,因为小程序要用websocket必须是wss:
假设我的证书目录是统一放在tp根目录,start_gateway.php都放在application里的workerman里
记得把原来的tcp链接要改掉!不然bug

// 证书最好是申请的证书
$context = array(
    // 更多ssl选项请参考手册 http://php.net/manual/zh/context.ssl.php
    'ssl' => array(
        // 请使用绝对路径
        'local_cert'                 => __DIR__.'/../../certificate/websocket/wss.pem', // 也可以是crt文件
        'local_pk'                   => __DIR__.'/../../certificate/websocket/wss.key',
        'verify_peer'               => false,
        // 'allow_self_signed' => true, //如果是自签名证书需要开启此选项
    )
);
// websocket协议(端口任意,只要没有被其它程序占用就行)
$gateway = new Gateway("websocket://0.0.0.0:443", $context);
// 开启SSL,websocket+SSL 即wss
$gateway->transport = 'ssl';

基本环境已经配置好了,现在我们可以在前端先测试一下

// 证书是会检查域名的,请使用域名连接
ws = new WebSocket("wss://test.com:443");
ws.onopen = function() {
    console.log("连接成功");
    ws.send('tom');
    console.log("给服务端发送一个字符串:tom");
};
ws.onmessage = function(e) {
    console.log("收到服务端的消息:" + e.data);
};
ws.onerror =function(e){
    console.log(e);
}

第四步

接下来开始在events.php中写我们的主要通信逻辑,本项目主要涉及离线消息推送、在线用户状态显示、群聊、私聊等功能,根据自身情况去调整

<?php
/**
 * This file is part of workerman.
 *
 * Licensed under The MIT License
 * For full copyright and license information, please see the MIT-LICENSE.txt
 * Redistributions of files must retain the above copyright notice.
 *
 * @author walkor<walkor@workerman.net>
 * @copyright walkor<walkor@workerman.net>
 * @link http://www.workerman.net/
 * @license http://www.opensource.org/licenses/mit-license.php MIT License
 */

/**
 * 用于检测业务代码死循环或者长时间阻塞等问题
 * 如果发现业务卡死,可以将下面declare打开(去掉//注释),并执行php start.php reload
 * 然后观察一段时间workerman.log看是否有process_timeout异常
 */
//declare(ticks=1);
use GatewayWorker\Lib\Gateway;
use Workerman\MySQL\Connection;
use Workerman\Redis\Client;

/**
 * 主逻辑
 * 主要是处理 onConnect onMessage onClose 三个方法
 * onConnect 和 onClose 如果不需要可以不用实现并删除
 */
class Events
{
    /**
     * 新建静态成员,用来保存数据库实例
     */
    public static $db = null;
    public static $redis = null;
    /**
     * 进程启动后初始化数据库连接
     */
    public static function onWorkerStart()
    {
        self::$db = new Connection('IP', '端口号', '账户', '密码', '数据库');
        self::$redis = new Client('redis://127.0.0.1:6379');
    }
    /**
     * 当客户端连接时触发
     * 如果业务不需此回调可以删除onConnect
     * 
     * @param int $client_id 连接id
     */
    public static function onConnect($client_id)
    {

    }
    
   /**
    * 当客户端发来消息时触发
    * @param int $client_id 连接id
    * @param mixed $message 具体消息
    */
   public static function onMessage($client_id, $message)
   {
       //fromUid,toUid,isGroup,type为必须参数
       $msg=json_decode($message,true);//来源消息
       Gateway::bindUid($client_id, $msg['fromUid']);//将client_id与uid绑定
       $guid=self::$db->select('friendguid')->from('oh_ai_groupfriend')->where("uid={$msg['fromUid']}")->query();
       foreach ($guid as $k=>$v){
           Gateway::joinGroup($client_id,$v['friendguid']);//将client_id与群uid绑定
       }
       //常见事件(只处理,不给客户端发送)
       switch ($msg['type']){
           //离线消息(可选)
           case 'pull':
               self::$redis->lrange($msg['fromUid'],0,-1,function ($list) use ($msg) {
               if(!empty($list)){
                   GateWay::sendToUid($msg['fromUid'], json_encode(['message'=>$list,'type'=>'pull']));
                   self::$redis->del($msg['fromUid']);//清除redis缓存
               }
           });
               return false;
           //上线
           case 'online':
               Gateway::sendToAll(json_encode(['list'=>array_keys(Gateway::getAllUidList()),'type'=>'online']));
               return false;
           //心跳
           case 'heart':
               GateWay::sendToUid($msg['fromUid'], json_encode(['type'=>'heart','group'=>Gateway::getClientSessionsByGroup(10000),'client'=>$client_id]));
               return false;
           //创建群聊
           case 'create_group':
               Gateway::joinGroup($client_id,$msg['toUid']);
               return false;
           //加入群聊
           case 'join_group':
               $client_id_array=Gateway::getClientIdByUid($msg['fromUid']);
               Gateway::joinGroup($client_id_array[0],$msg['toUid']);
               return false;
           //解散群聊
           case 'dismiss_group':
               GateWay::ungroup($msg['toUid']);
               return false;
           //退出群聊
           case 'leave_group':
               Gateway::leaveGroup($client_id, $msg['toUid']);
               return false;
           //踢出群聊
           case 'kick_group':
               $client_id_array=Gateway::getClientIdByUid($msg['fromUid']);
               Gateway::leaveGroup($client_id_array[0], $msg['toUid']);
               return false;
       }
       //自己和自己发消息,不执行任何操作
       if($msg['fromUid']==$msg['toUid']){
           return false;
       }
       //处理要发送的数据
       //私聊
       if($msg['isGroup']==0){
           self::sendMsg($msg['toUid'],$msg);
       }
       //群聊
       else{
           self::sendMsgGroup($msg['fromUid'],$msg['toUid'],$msg);
       }
       return false;
   }
   
   /**
    * 当用户断开连接时触发
    * @param int $client_id 连接id
    */
   public static function onClose($client_id)
   {
       //向所有客户端通知下线
       $list=array_keys(Gateway::getAllUidList());
       GateWay::sendToAll(json_encode(['list'=>$list,'type'=>'online']));
   }
   /**
    * 发送私聊消息
    * @param int $uid 目标uid
    * @param array $message 消息内容
    */
   public static function sendMsg($uid,$message){
       //判断是否为好友
       if($message['type']!=='read' && $message['type']!=='friend_request' && $message['type']!=='friend_response' && $message['type']!=='group_request' && $message['type']!=='group_response'){
           $db=self::$db->select('frienduid')->from('oh_ai_friend')->where("uid={$message['toUid']} and frienduid={$message['fromUid']}")->single();
           if(empty($db)){
               GateWay::sendToUid($message['fromUid'], json_encode(['id'=>$message['id'],'type'=>'stranger']));
               return false;
           }
       }
       //如果对方不在线
       if(!Gateway::isUidOnline($uid)){
           if($message['type']!=='heart' && $message['type']!=='online'){
               self::$redis->set('ai_msg_time',date('Y-m-d',time()));
               self::$redis->rpush($uid,json_encode($message));
               self::$redis->get('ai_msg_time', function ($date) use ($uid) {
                   if($date!==date('Y-m-d',time())){
                       self::$redis->expire($uid,43200);
                   }
               });
           }
       }
       //如果对方在线
       else{
           GateWay::sendToUid($uid, json_encode($message));
       }
       return false;
   }
    /**
     * 发送群聊消息
     * @param int $uid 发送者uid
     * @param int $guid 目标群uid
     * @param array $message 消息内容
     */
    public static function sendMsgGroup($uid, $guid, $message)
    {
        //判断用户是否在此群
        $db=self::$db->select('friendguid')->from('oh_ai_groupfriend')->where("uid=$uid and friendguid=$guid")->single();
        if(empty($db)){
            GateWay::sendToUid($message['fromUid'], json_encode(['id'=>$message['id'],'type'=>'stranger']));
            return false;
        }
        $group_info=self::$db->select('is_at,ai_id')->from('oh_ai_group')->where("guid=$guid")->row();
        $client_id_array=Gateway::getClientIdByUid($uid);
        Gateway::sendToGroup($guid,json_encode($message),$client_id_array[0]);//一定要排除自己
        return false;
    }

最后是TP框架主动推送消息的一种方法:

需要安装GatewayClient

总体思路图如下

img

<?php


namespace app\workerman\controller;

use app\BaseController;
use GatewayClient\Gateway;
use think\facade\Db;

class RestfulApi extends BaseController
{
    public function __construct()
    {
        Gateway::$registerAddress = '127.0.0.1:1238';
    }
    public function sendMsg(){
        $message=input('post.message');
        // 向任意uid发送数据
        Gateway::sendToUid($message['toUid'], json_encode($message));
        return success('发送成功',$message);
    }
    public function sendGroupMsg(){
        $message=input('post.message');
        $client_id_array=Gateway::getClientIdByUid($message['fromUid']);
        Gateway::sendToGroup($message['toUid'],json_encode($message),$client_id_array[0]);//一定要排除自己
        return success('发送成功',$message);
    }
}