阅读即时通讯应用中的网络心跳包机制:功能、原理、实现思路等。

时间:2020-10-20

局域网即时通讯


 1.介绍

 一般来说,如果没有网络通信应用程序的真正动手开发人员,很难想象心跳机制在即时通讯应用程序中的作用。然而,不可否认的是,作为即时通讯,心跳机制是其网络通信技术底层的一个非常重要的环节。该算法是在心跳机制中实现还是在心跳机制中实现,将直接影响应用层即时通讯应用的性能,如实时性、断开网络的自愈能力、弱网络体验等。

 总之,如果你真的想了解即时通讯应用程序的开发,心跳机制必须掌握它,这也是本文的目的,希望能给你一些启发。

 应该注意的是,本文涉及的示例代码是用C/C++语言编写的,但本文介绍的心跳包机制的设计思想和注意事项都是通用的原则,也适用于其他编程语言。虽然语言可以不同,但逻辑上没有区别!

 2.相关文章

 “为什么基于TCP的移动即时消息仍然需要心跳保持机制?”(推荐)

 “微信团队原创分享:安卓版微信背景保活战斗分享(网络保活文章)”(推荐)

 “移动即时通讯实践:安卓版微信智能心跳机制的实现”

 “移动即时通讯实践:WhatsApp、Line和微信心跳策略分析”

 教你用Netty实现网络通讯程序的心跳机制和断线重连机制

 "安卓端消息推送概述:实现原理、心跳保持、遇到的问题等."

 3.为什么你需要心跳机制?

 考虑以下两种典型的即时通讯网络层问题场景:

 1)场景1:客户端连接到服务器后,如果长时间没有与服务器进行数据通信,连接可能会被防火墙程序关闭,有时我们不想被关闭。例如,对于一个即时通讯软件,如果服务器上没有消息,我们确实不会与服务器交换任何数据,但是如果连接被关闭,当新消息到来时,我们就不能再接收它们,这违反了“即时通讯”的设计要求。

 2)场景2:通常,服务器和客户端不在同一个网络中,它们之间可能有多个路由器和交换机。如果其中一台必要的路由器或交换机出现故障,并且在一段时间内无法恢复,它们之间的链路将不再畅通。此时,服务器和客户端之间没有数据交换。因为TCP连接是一个状态机,在这种情况下,客户端和服务器都无法检测到与另一方的连接是否正常。

 对于上述问题,即时通讯应用通常的解决方案:

 1)对于场景1:这个应用场景要求客户机和服务器之间的连接必须保持正常,这就是我们通常所说的“保持活动”。如上所述,当服务器和客户端之间在一段时间内没有有效的业务数据时,我们只需要向对方发送一个心跳包就可以实现保活。

 2)在情况2中:为了解决死链问题,我们可以通过此时从任一端向另一端发送数据包来检测链路是否正常。这种数据包也称为“心跳包”,这种操作称为“心跳检测”。顾名思义,如果一个人没有心跳,他可能已经死了;如果连接长时间没有正常的数据和心跳数据包,则可以认为该连接不再存在。为了节省服务器连接资源,我们可以通过关闭套接字来回收连接资源。

 总之,心跳检测机制一般有两个功能:

 1)保持活力;

 2)检测死链。

 针对上述问题,在即时通讯网络中的另一篇文章:“为什么基于TCP的移动终端即时消息仍然需要心跳保持机制?”,也很值得一读。

 4.TCP的保活选项

 附言:如果您不知道什么是tcp保持活动,建议您阅读:TCP/IP详细说明-第23章TCP保持活动计时器。

 操作系统的TCP/IP协议栈实际上提供了这个功能,即保持活动选项。在Linux操作系统中,我们可以通过代码启用套接字的心跳检测(也就是说,定期向对方发送心跳检测数据包)。

 代码如下:

 123//on为1表示打开keepalive选项,0表示关闭,0表示int on = 1的默认值;setsockopt(fd、SOL_SOCKET、SO_KEEPALIVE、on、sizeof(on));

 但是,即使打开此选项,发送心跳检测数据包的默认时间间隔也是7200秒(2小时),这对于使用来说太长了。

 我们可以通过继续设置与保持活动相关的三个选项来改变这个时间间隔,这三个选项是:保持空闲、保持连接和保持连接。

 示例代码如下:

 01020304050607080910//发送保持活动消息的时间间隔int val = 7200setsockopt(fd,IPPROTO_TCP,TCP_KEEPIDLE,&val,size of(val));//两次重试之间的时间间隔是整数间隔= 75;setsockopt(fd,IPPROTO_TCP,TCP_KEEPINTVL,&interval,size of(interval));int CNT = 9;setsockopt(fd,IPPROTO_TCP,TCP_KEEPCNT,&cnt,sizeof(CNT));

 如果对方在发送消息时回复确认,则设置发送保活消息的时间间隔。则本地终端的TCP协议栈认为该连接仍然有效,并继续等待7200秒后发送保活消息;如果对等方回复“重置”,这意味着对等方进程已经重新启动,本地应用程序应该关闭连接。

 如果没有来自对端的应答,本地端将重试,并且如果在9次重试之后仍然无法到达(TCP_KEEPCNT值) (重试间隔前后75秒(TCP_KEEPINTVL值)),它将向应用程序返回ETIMEOUT(无应答)或EHOST错误消息。

 我们可以使用以下命令来检查上述三个值在Linux系统上的设置:

 1234[root @ Iz 238 VnojLyz ~]# sysctl-a | grep keepalivenet . IP v4 . TCP _ keepalive _ intvl = 75 net . IP v4 . TCP _ keepalive _ probes = 9net . IP v4 . TCP _ keepalive _ time = 7200

 在Windows系统中设置keepalive和相应选项的代码略有不同:

 01020304050607080910111213//open keepalive option const char on = 1;setsockopt(SOLL,SOL_SOCKET,SO _ KEEPALIVE,(char *)&on,size of(on);//设置超时详细信息DWORD cbBytesReturnedtcp _ keepalive klive//启用keepalive。onoff = 1;klive . keepalivetime = 7200;//重试间隔为10秒钟。keepalive间隔= 1000 * 10;WSAIoctl(套接字,SIO_KEEPALIVE_VALS,&klive,sizeof(tcp_keepalive),空,0,&cbBytesReturned,空,空);

 5.应用层心跳包机制的设计

 因为需要为连接中的每个套接字打开keepalive选项,所以没有必要,这可能会导致大量无意义的带宽浪费,并且keepalive选项无法与应用层很好地交互。因此,在一般的实际服务开发中,建议读者在应用层设计自己的心跳包机制。

 那么如何设计它呢?

 从技术上讲,心跳包实际上是一个预定义的数据包,可以通过启动程序中的计时器定期发送,这是实现它的最简单的方法。

 然而,如果在通信的两端之间有频繁的数据交换,那么是时候在下一次发送心跳分组了。事实上,这是对交通的浪费。由于通信双方不断有正常的业务数据包进出,这些数据包本身可以保持活跃。为什么要浪费流量来发送这些心跳数据包?

 因此,对于用于保持活动的心跳包,我们的最佳做法是设置最后一个包时间,并在每次接收和发送数据时更新该包时间,并在每次心跳检测计时器检测到时将该包时间与当前系统时间进行比较,如果时间间隔大于最大允许时间间隔(根据实际开发的要求设置为15 ~ 45秒),则发送心跳包。总而言之,当在某个时间间隔内没有数据往返于另一端时,心跳数据包只发送一次。

 发送心跳数据包的伪代码示例:

 0010020030040050060070080090100110120130140150160170180190200210220230230240250260270270280280290303103203330340350360370370380303303302023,00202020202020202020202020202020202020int NRet = 0;while(true){ NRet =::send(m _ HSocket,m_strSendBuf.c_str(),m_strSendBuf.length(),0);if(NRet = = SOCKET _ ERROR){ if(::WSageTLasterror()= = WSAEWALDBLOCK)中断;否则{ LOG_ERROR("发送数据错误,断开服务器:%s,端口:%d . ",_ str服务器. c_str(),m _ nPORT);关闭();返回false}}否则,如果(nRet < 1) {//立即关闭套接字log _ error(“发送数据错误,断开服务器:% s,端口:% d,”,m _ strserver。c _ str(),m _ nport);关闭();返回false} m_strSendBuf.erase(0,NRet);if (m_strSendBuf.empty())分隔符;*睡眠(1);} {//记录最新的合同签发时间STD::lock _ guard guard(m _ mutexlastdataime);m_nLastDataTime =(长)Time(空);}返回真;} bool CiuSocket::Recv(){ int NRet = 0;充电缓冲器[10 * 1024];而(真){ nRet = ::recv(m_hSocket,buff,10 * 1024,0);If (nRet == SOCKET_ERROR) //一旦发生错误就关闭套接字{ if(::wsagelasterror()= = wsaewoullock)中断;否则{ LOG_ERROR("Recv数据错误,错误号= % d " ,::WSageTLasterror());//关闭();返回false} }否则如果(nRet < 1) { LOG_ERROR("Recv数据错误,错误号= % d " ,::WSageTLasterror());//关闭();返回false} m_strRecvBuf.append(buff,NRet);*睡眠(1);} { STD::lock _ guard guard(m _ MutexlastDataTime);//记录最新的数据包接收时间m _ nlastdatatim e =(长)tim e(空);}返回真;} void CiuSocket::RecvthreadProc(){ LOG _ INFO(" Recv数据线程开始...");int nRet//互联网访问方法。BOOL bAlive同时(!M_bStop) {//检测到数据时接收数据NRet = CheckReceivedData();//错误if(nret = =-1){ m _ prec vmsgthread-> notifyneterror();}//如果(nret = = 0){ long nlastdataime = 0,则没有其他数据;{ STD::lock _ guard guard(m _ MutexlastDataTime);nLastDataTime = m _ nLastDataTime;}如果(m_nHeartbeatInterval > 0) {//当前系统时间与最后一次发送和接收数据包之间的时间间隔超过m_nHeartbeatInterval //发送心跳数据包,如果(time(null)-nlastdatime > = m _ nHeartInterval)Send heart package();} }//如果(NRet = = 1){如果(!recv()){ m _ PrecVmsgThread-> NotifyNetError();继续;}装饰包装();}//end if }//end while-loop LOG _ INFO(" Recv数据线程完成...");}

 同样,检测心跳包的一端应该只在某个时间间隔内没有与另一端进行数据通信时才进行心跳检测。

 心跳检测端的伪代码示例如下:

 0102030405060708091011121314151617181920212223242526272829303132333334353536373839404142434445464748495051525555565755557557585555555555555555555555555555555555555555555555555555//发送数据更新后,发送合同的时间为UpdateHeartBeattime k1/]e();} void BuSINESS Session::HandlePackge(char * PmsG,int msgLength,bool& closeSession,Std::vector& vectorResponse){ //检查数据的有效性,如果(Pmsg = = null | | Pmsg[0]= = 0 | | Msglength max _ data _ length){//非法窥探请求,没有任何响应,直接关闭连接closeSession = true返回;}//更新包接收时间updateheartime();//省略包处理代码...}无效业务会话::updateheartime(){ STD::lock _ guard scoped _ guard(m _ mutexforlast包ime);m _ LastPackageTime =(int 64 _ t)time(null ptr);}布尔业务会话::doHeartbeatCheck(){常量配置& cfg =单例::实例();int 64 _ t now =(int 64 _ t)time(null ptr);STD::lock _ guard lock _ guard(m _ MutexforLastPackageTime);if(now-m _ last packetime > = CFG . m _ nmaxclientdata interval){//心跳数据包检测,超时,关闭连接日志(“心跳过期,关闭会话”);关机();返回真;}返回false} void TCPserver::CheckSessionHeartBet(){ int 64 _ t now =(int 64 _ t)time(null ptr);if(now-m _ NlastCheckHeartBeattime > = m _ nHeartBeatchInterval){ m _ SpsessionManager-> CheckSessionHeartBet();m _ NlastCheckHeartBeattime =(int 64 _ t)time(null ptr);} } void会话管理器::CheckSessionHeartBet(){ STD::lock _ guard作用域_ lock(m _ MutexForSession);适用于(const auto & ITER:m _ mapsessions){//此处请致电business session::doheartbeatchcheck()ITER。第二-> doheartbeatchcheck();} }

 应该注意的是,通常情况下,客户端主动向服务器发送心跳数据包,服务器进行心跳检测以决定是否断开连接,反之亦然。从客户端的角度来看,为了从服务器获得正常的服务,客户端必须主动保持与服务器的连接状态,而服务器不限于特定的客户端。如果客户端不能主动保持与它的连接,它将主动恢复与客户端的连接。当然,当服务器从客户端收到心跳包时,应该给客户端一个心跳响应。

 6.包含业务数据的心跳数据包

 上面描述的心跳包纯粹是从技术角度来看的。在实际应用中,有时我们需要定期或不定期地更新服务器上的一些数据。我们可以将这类数据放入心跳数据包中,并定期或不定期地进行更新。

 这种带有业务数据的心跳包不再是纯粹的技术(这里技术的作用是指上面介绍的心跳包在保持活力和检测死链中的作用)。

 这种心跳包也易于实现,即在心跳包数据结构中添加所需的服务字段信息,然后在定时器中定期发送,客户端发送给服务器,服务器在响应心跳包中填写约定的服务数据信息。

 7.心跳数据包和流量

 通常,在大多数应用程序场景中,在保持与服务器连接的多个客户端中,在同一时间段内没有太多的活动用户(在这种情况下,是与服务器频繁交换数据的客户端)。当有大量连接时,进入和离开服务器程序的包通常是心跳包(为了保持活动)。因此,为了减轻网络编码压力和节省流量,特别是对于一些3/4 G手机应用,在设计心跳包数据格式时,应该尽量减小心跳包的数据量。

 8.心跳包和调试

 如上所述,对于心跳分组,当在特定时间间隔内没有接收到客户端心跳分组时,服务器端的逻辑通常自动断开连接。在开发调试程序的过程中,我们可能需要通过断点来中断程序,这可能需要几秒到几十秒的时间。当程序恢复执行时,由于心跳检测逻辑,连接可能已断开。

 在调试过程中,我们更注重业务数据处理的逻辑是否正确,而不希望被一堆无意义的心跳数据包所干扰。

 鉴于以上两个原因,我们通常在调试模式下关闭或禁用心跳数据包检测机制。

 代码示例大致如下:

 0102030405060708091011121314 ChatSession::ChatSession(const STD::shared _ ptr & conn,int session id):TCPseSession(conn),m_id(sessionid),m_seq(0),m _ isLogin(false){ m _ user info . userid = 0;m _ LastPackageTime = time(空);//这里,心跳包检测功能# ifndef _ debugenablehairbeatcheck()设置为非调试模式;#endif}

 当然,您也可以将打开心跳检测的开关转换为配置信息,并将其放入程序配置文件中。

 9.心跳数据包和日志

 在实际生产环境中,我们通常会将程序接收和发送的数据包写入日志,但是没有业务信息的心跳数据包信息是一个例外,一般不会写入日志。这是因为有大量心跳数据包数据。如果日志被写入,日志文件将变得非常大,并且充满了大量无意义的心跳数据包日志,因此在写入日志时心跳数据包信息将被阻止。

 我在这里的建议是:心跳包信息是否写入日志可以做成一个配置开关,它通常是关闭的,并在必要时再次打开。

 例如,对于WebSocket服务,ping和pong是心跳数据包数据,下面的示例代码按需输出心跳日志信息:

 01020304050607080910111213141516 void Business Session::send(STD::string _ view strResponse){ bool success = WebSocketSession::send(strResponse);if(成功){ bool EnablePingPongLog = Singleton::实例()。m _ bPingPongLogEnabled//其他消息正常打印,心跳消息按需打印,如果(响应!= " pong " | | EnablePingPongLog){ LOGI("消息发送到客户端[%s],会话Id: %s,会话:0x%0x,客户端Id: %s,帐户Id: %s,frontId: %s,消息:%s ",getClientInfo(),m_strSessionId.c_str(),(int64_t)this,m _ StrCreditId . c _ str(),m _ strAccountID.c _ str(),业务会话::m_strFrontId.c_str(),StrResponse . data());} }}