今天来分享下个人搭建IM的心得
GatewayWorker基于Workerman开发的一个项目框架,用于快速开发TCP长连接应用,例如app推送服务端、即时IM服务端、游戏服务端、物联网、智能家居等等
GatewayWorker使用经典的Gateway和Worker进程模型。Gateway进程负责维持客户端连接,并转发客户端的数据给BusinessWorker进程处理,BusinessWorker进程负责处理实际的业务逻辑(默认调用Events.php处理业务),并将结果推送给对应的客户端。Gateway服务和BusinessWorker服务可以分开部署在不同的服务器上,实现分布式集群。
GatewayWorker提供非常方便的API,可以全局广播数据、可以向某个群体广播数据、也可以向某个特定客户端推送数据。配合Workerman的定时器,也可以定时推送数据。
第一步
composer安装:
composer require workerman/gateway-worker
(可选,用于服务端主动推送消息)
composer require workerman/gatewayclient
(可选,用于连接数据库)
composer require workerman/mysql
(可选,用于连接redis)
composer require workerman/redis
第二步
删掉start.php中的require_once DIR . ‘/vendor/autoload.php’;
原因是tp有自己的自动加载机制,貌似在集成composer自动加载机制时有BUG
第三步
在start_gateway.php中创建wss服务,因为小程序要用websocket必须是wss:
假设我的证书目录是统一放在tp根目录,start_gateway.php都放在application里的workerman里
记得把原来的tcp链接要改掉!不然bug
// 证书最好是申请的证书
$context = array(
// 更多ssl选项请参考手册 http://php.net/manual/zh/context.ssl.php
'ssl' => array(
// 请使用绝对路径
'local_cert' => __DIR__.'/../../certificate/websocket/wss.pem', // 也可以是crt文件
'local_pk' => __DIR__.'/../../certificate/websocket/wss.key',
'verify_peer' => false,
// 'allow_self_signed' => true, //如果是自签名证书需要开启此选项
)
);
// websocket协议(端口任意,只要没有被其它程序占用就行)
$gateway = new Gateway("websocket://0.0.0.0:443", $context);
// 开启SSL,websocket+SSL 即wss
$gateway->transport = 'ssl';
基本环境已经配置好了,现在我们可以在前端先测试一下
// 证书是会检查域名的,请使用域名连接
ws = new WebSocket("wss://test.com:443");
ws.onopen = function() {
console.log("连接成功");
ws.send('tom');
console.log("给服务端发送一个字符串:tom");
};
ws.onmessage = function(e) {
console.log("收到服务端的消息:" + e.data);
};
ws.onerror =function(e){
console.log(e);
}
第四步
接下来开始在events.php中写我们的主要通信逻辑,本项目主要涉及离线消息推送、在线用户状态显示、群聊、私聊等功能,根据自身情况去调整
<?php
/**
* This file is part of workerman.
*
* Licensed under The MIT License
* For full copyright and license information, please see the MIT-LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @author walkor<walkor@workerman.net>
* @copyright walkor<walkor@workerman.net>
* @link http://www.workerman.net/
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/
/**
* 用于检测业务代码死循环或者长时间阻塞等问题
* 如果发现业务卡死,可以将下面declare打开(去掉//注释),并执行php start.php reload
* 然后观察一段时间workerman.log看是否有process_timeout异常
*/
//declare(ticks=1);
use GatewayWorker\Lib\Gateway;
use Workerman\MySQL\Connection;
use Workerman\Redis\Client;
/**
* 主逻辑
* 主要是处理 onConnect onMessage onClose 三个方法
* onConnect 和 onClose 如果不需要可以不用实现并删除
*/
class Events
{
/**
* 新建静态成员,用来保存数据库实例
*/
public static $db = null;
public static $redis = null;
/**
* 进程启动后初始化数据库连接
*/
public static function onWorkerStart()
{
self::$db = new Connection('IP', '端口号', '账户', '密码', '数据库');
self::$redis = new Client('redis://127.0.0.1:6379');
}
/**
* 当客户端连接时触发
* 如果业务不需此回调可以删除onConnect
*
* @param int $client_id 连接id
*/
public static function onConnect($client_id)
{
}
/**
* 当客户端发来消息时触发
* @param int $client_id 连接id
* @param mixed $message 具体消息
*/
public static function onMessage($client_id, $message)
{
//fromUid,toUid,isGroup,type为必须参数
$msg=json_decode($message,true);//来源消息
Gateway::bindUid($client_id, $msg['fromUid']);//将client_id与uid绑定
$guid=self::$db->select('friendguid')->from('oh_ai_groupfriend')->where("uid={$msg['fromUid']}")->query();
foreach ($guid as $k=>$v){
Gateway::joinGroup($client_id,$v['friendguid']);//将client_id与群uid绑定
}
//常见事件(只处理,不给客户端发送)
switch ($msg['type']){
//离线消息(可选)
case 'pull':
self::$redis->lrange($msg['fromUid'],0,-1,function ($list) use ($msg) {
if(!empty($list)){
GateWay::sendToUid($msg['fromUid'], json_encode(['message'=>$list,'type'=>'pull']));
self::$redis->del($msg['fromUid']);//清除redis缓存
}
});
return false;
//上线
case 'online':
Gateway::sendToAll(json_encode(['list'=>array_keys(Gateway::getAllUidList()),'type'=>'online']));
return false;
//心跳
case 'heart':
GateWay::sendToUid($msg['fromUid'], json_encode(['type'=>'heart','group'=>Gateway::getClientSessionsByGroup(10000),'client'=>$client_id]));
return false;
//创建群聊
case 'create_group':
Gateway::joinGroup($client_id,$msg['toUid']);
return false;
//加入群聊
case 'join_group':
$client_id_array=Gateway::getClientIdByUid($msg['fromUid']);
Gateway::joinGroup($client_id_array[0],$msg['toUid']);
return false;
//解散群聊
case 'dismiss_group':
GateWay::ungroup($msg['toUid']);
return false;
//退出群聊
case 'leave_group':
Gateway::leaveGroup($client_id, $msg['toUid']);
return false;
//踢出群聊
case 'kick_group':
$client_id_array=Gateway::getClientIdByUid($msg['fromUid']);
Gateway::leaveGroup($client_id_array[0], $msg['toUid']);
return false;
}
//自己和自己发消息,不执行任何操作
if($msg['fromUid']==$msg['toUid']){
return false;
}
//处理要发送的数据
//私聊
if($msg['isGroup']==0){
self::sendMsg($msg['toUid'],$msg);
}
//群聊
else{
self::sendMsgGroup($msg['fromUid'],$msg['toUid'],$msg);
}
return false;
}
/**
* 当用户断开连接时触发
* @param int $client_id 连接id
*/
public static function onClose($client_id)
{
//向所有客户端通知下线
$list=array_keys(Gateway::getAllUidList());
GateWay::sendToAll(json_encode(['list'=>$list,'type'=>'online']));
}
/**
* 发送私聊消息
* @param int $uid 目标uid
* @param array $message 消息内容
*/
public static function sendMsg($uid,$message){
//判断是否为好友
if($message['type']!=='read' && $message['type']!=='friend_request' && $message['type']!=='friend_response' && $message['type']!=='group_request' && $message['type']!=='group_response'){
$db=self::$db->select('frienduid')->from('oh_ai_friend')->where("uid={$message['toUid']} and frienduid={$message['fromUid']}")->single();
if(empty($db)){
GateWay::sendToUid($message['fromUid'], json_encode(['id'=>$message['id'],'type'=>'stranger']));
return false;
}
}
//如果对方不在线
if(!Gateway::isUidOnline($uid)){
if($message['type']!=='heart' && $message['type']!=='online'){
self::$redis->set('ai_msg_time',date('Y-m-d',time()));
self::$redis->rpush($uid,json_encode($message));
self::$redis->get('ai_msg_time', function ($date) use ($uid) {
if($date!==date('Y-m-d',time())){
self::$redis->expire($uid,43200);
}
});
}
}
//如果对方在线
else{
GateWay::sendToUid($uid, json_encode($message));
}
return false;
}
/**
* 发送群聊消息
* @param int $uid 发送者uid
* @param int $guid 目标群uid
* @param array $message 消息内容
*/
public static function sendMsgGroup($uid, $guid, $message)
{
//判断用户是否在此群
$db=self::$db->select('friendguid')->from('oh_ai_groupfriend')->where("uid=$uid and friendguid=$guid")->single();
if(empty($db)){
GateWay::sendToUid($message['fromUid'], json_encode(['id'=>$message['id'],'type'=>'stranger']));
return false;
}
$group_info=self::$db->select('is_at,ai_id')->from('oh_ai_group')->where("guid=$guid")->row();
$client_id_array=Gateway::getClientIdByUid($uid);
Gateway::sendToGroup($guid,json_encode($message),$client_id_array[0]);//一定要排除自己
return false;
}
最后是TP框架主动推送消息的一种方法:
需要安装GatewayClient
总体思路图如下
<?php
namespace app\workerman\controller;
use app\BaseController;
use GatewayClient\Gateway;
use think\facade\Db;
class RestfulApi extends BaseController
{
public function __construct()
{
Gateway::$registerAddress = '127.0.0.1:1238';
}
public function sendMsg(){
$message=input('post.message');
// 向任意uid发送数据
Gateway::sendToUid($message['toUid'], json_encode($message));
return success('发送成功',$message);
}
public function sendGroupMsg(){
$message=input('post.message');
$client_id_array=Gateway::getClientIdByUid($message['fromUid']);
Gateway::sendToGroup($message['toUid'],json_encode($message),$client_id_array[0]);//一定要排除自己
return success('发送成功',$message);
}
}