全部
常见问题
产品动态
精选推荐

一分钟了解长连接 、短连接、心跳机制与断线重连

管理 管理 编辑 删除

短连接

概念

client与server通过三次握手建立连接,client发送请求消息,server返回响应,一次连接就完成了。

这时候双方任意都可以发起close操作,不过一般都是client先发起close操作。由于短连接一般只会在 client/server 间传递一次请求操作,因此短连接的特点是连接生命周期短暂,连接建立和断开的开销较大,适用于单次请求响应的场景。

短连接的优缺点

管理起来比较简单,存在的连接都是有用的连接,不需要额外的控制手段。

使用场景

通常情况下,当浏览器访问服务器时,采用的是短连接的方式。

对于服务端而言,长连接会消耗大量的资源,而且用户使用浏览器对服务端的访问频率相对较低。如果同时存在几十万甚至上百万的连接,则服务端的压力将非常巨大,甚至可能导致崩溃。

因此,针对并发量高但请求频率低的情况,建议使用短连接。

为了优化这种情况,可以考虑以下方法:

1. 进行连接池管理:使用连接池来管理与服务端的连接,避免每次请求都建立和关闭连接,减少资源的消耗。

2. 使用缓存机制:将一些不经常变动且占用资源较多的数据进行缓存,减少对服务端的请求,提高性能。

3. 引入负载均衡:通过负载均衡技术将请求分发到多个服务器上,均衡服务器的压力,提高整体的处理能力。

4. 优化服务端架构:对服务端进行优化,如增加服务器的处理能力、调整服务器配置等,以提高服务端的并发处理能力。

长连接

什么是长连接

客户端向服务器发起连接,服务器接受客户端连接并建立双方连接。

客户端和服务器完成一次读写后,它们之间的连接不会主动关闭,并可以继续使用该连接进行后续的读写操作。

长连接的生命周期

在正常情况下,一条TCP长连接建立后,只要双方不提出关闭请求并且不出现异常情况,这条连接会一直存在。操作系统不会主动关闭它,即使在经过物理网络拓扑的改变之后仍然可以使用。因此,一条连接可以保持几天、几个月、几年甚至更长时间,只要没有异常情况或用户(应用层)主动关闭。

客户端和服务端可以一直使用该连接进行数据通信。

长连接的优点

使用长连接可以减少TCP建立和关闭操作,从而减少网络阻塞。即使发生错误,也不需要关闭连接就能进行提示,这样可以减少CPU和内存的使用,因为不需要频繁地建立和关闭连接。

长连接的缺点

连接数过多时,影响服务端的性能和并发数量。

使用场景

数据库的连接就是采用TCP长连接.

RPC,远程服务调用,在服务器,一个服务进程频繁调用另一个服务进程,可使用长连接,减少连接花费的时间。

总结

1.对于长连接和短连接的使用是需要根据应用场景来判断的

2.长连接并不是万能的,也是需要维护的,

长连接的实现

心跳机制

应用层协议通常会使用心跳机制来保持客户端与服务器的连接,并确保客户端仍然在线。典型的心跳协议如IM协议(例如QQ、MSN、飞信)会定期发送数据包给服务器,同时传输一些可能必要的数据。

在TCP协议中,也有一个心跳机制,即TCP选项中的SO_KEEPALIVE。系统默认设置为2小时发送一次心跳包。但是这个机制无法检测机器断电、网线拔出或防火墙等导致的断线情况。此外,逻辑层处理断线情况也可能不够完善。通常情况下,如果只是用于保活目的,SO_KEEPALIVE机制仍然是可以接受的。

请注意以下优化建议:

1. 调整心跳频率:根据实际情况,可以根据应用需求调整心跳频率。太频繁的心跳包可能造成额外的网络负担,而太不频繁则可能延迟检测到断线情况。

2. 使用应用层心跳机制:考虑使用应用层心跳机制,而不仅仅依赖于TCP的SO_KEEPALIVE。应用层心跳机制能够更灵活地处理不同情况下的断线问题,并能够传递更多的必要数据。

3. 完善断线处理逻辑:在应用层实现断线处理逻辑,包括重新连接、重发未成功的数据等。确保断线后客户端能够尽快恢复连接,并保持数据的完整性和一致性。

4. 测试和监控:定期测试心跳机制的有效性,并监控断线情况以及处理效果。及时发现并解决可能存在的问题。

为什么需要心跳机制?

由于网络的不可靠性,TCP长连接可能会在某些突发情况下断开,例如网线被拔出或突然掉电。在这种情况下,如果服务器和客户端之间没有交互,它们不能立即发现对方已掉线。为解决这个问题,可以引入心跳机制。

TCP协议的KeepAlive机制

默认KeepAlive状态是不打开的。

需要将setsockopt将SOL_SOCKET.SO_KEEPALIVE设置为1才是打开KeepAlive状态,

并且可以设置三个参数:

tcp_keepalive_time ,tcp_keepalive_probes , tcp_keepalive_intvl

分别表示:连接闲置多久开始发keepalive的ack包、发几个ack包不回复才当对方已断线、两个ack包之间的间隔。

很多网络设备,尤其是NAT路由器,由于其硬件的限制(例如内存、CPU处理能力),无法保持其上的所有连接,因此在必要的时候,会在连接池中选择一些不活跃的连接踢掉。

典型做法是LRU,把最久没有数据的连接给T掉。

通过使用TCP的KeepAlive机制(修改那个time参数),可以让连接每隔一小段时间就产生一些ack包,以降低被踢掉的风险,当然,这样的代价是额外的网络和CPU负担。

如何实现心跳机制?

两种方式实现心跳机制:

  • 使用 TCP 协议层面的 keepalive 机制.
  • 在应用层上实现自定义的心跳机制.

虽然在 TCP 协议层面上, 提供了 keepalive 保活机制, 但是使用它有几个缺点:

  1. 它不是 TCP 的标准协议, 并且是默认关闭的.
  2. TCP keepalive 机制依赖于操作系统的实现, 默认的 keepalive 心跳时间是 两个小时, 并且对 keepalive 的修改需要系统调用(或者修改系统配置), 灵活性不够.
  3. TCP keepalive 与 TCP 协议绑定, 因此如果需要更换为 UDP 协议时, keepalive 机制就失效了.

使用 TCP 层面的 keepalive 机制比自定义的应用层心跳机制节省流量,

本文的主要介绍应用层方面实现心跳机制,使用netty实现心跳和断线重连。

netty实现心跳机制

netty对心跳机制提供了机制,实现的关键是IdleStateHandler先来看一下他的构造函数

public IdleStateHandler(
            long readerIdleTime, long writerIdleTime, long allIdleTime,
            TimeUnit unit) {
        this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
    }

实例化一个 IdleStateHandler 需要提供三个参数:

  • readerIdleTimeSeconds, 读超时. 即当在指定的时间间隔内没有从 Channel 读取到数据时, 会触发一个 READER_IDLE 的 IdleStateEvent 事件.
  • writerIdleTimeSeconds, 写超时. 即当在指定的时间间隔内没有数据写入到 Channel 时, 会触发一个 WRITER_IDLE 的 IdleStateEvent 事件.
  • allIdleTimeSeconds, 读和写都超时. 即当在指定的时间间隔内没有读并且写操作时, 会触发一个 ALL_IDLE 的 IdleStateEvent 事件.

netty心跳流程

3f0d2202310071633484377.png

1. 客户端成功连接服务端。

2.在客户端中的ChannelPipeline中加入IdleStateHandler,设置写事件触发事件为5s.

3.客户端超过5s未写数据,触发写事件,向服务端发送心跳包,

4.同样,服务端要对心跳包做出响应,其实给客户端最好的回复就是“不回复”,减轻服务端的压力

5.超过三次,1过0s服务端都会收到来自客户端的心跳信息,服务端可以认为客户端挂了,可以close链路。

6.客户端恢复正常,发现链路已断,重新连接服务端。

代码实现

服务端handler:

package com.heartbreak.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

import java.util.Random;

/\*\*
 \* @author janti
 \* @date 2018/6/10 12:21
 \*/
public class HeartbeatServerHandler extends SimpleChannelInboundHandler<String> {
    // 失败计数器:未收到client端发送的ping请求
    private int unRecPingTimes = 0;

    // 定义服务端没有收到心跳消息的最大次数
    private static final int MAX\_UN\_REC\_PING\_TIMES = 3;

    private Random random = new Random(System.currentTimeMillis());

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        if (msg!=null && msg.equals("Heartbeat")){
            System.out.println("客户端"+ctx.channel().remoteAddress()+"--心跳信息--");
        }else {
            System.out.println("客户端----请求消息----:"+msg);
            String resp \= "商品的价格是:"+random.nextInt(1000);
            ctx.writeAndFlush(resp);
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event \= (IdleStateEvent) evt;
            if (event.state()==IdleState.READER\_IDLE){
                System.out.println("===服务端===(READER\_IDLE 读超时)");
                // 失败计数器次数大于等于3次的时候,关闭链接,等待client重连
                if (unRecPingTimes >= MAX\_UN\_REC\_PING\_TIMES) {
                    System.out.println("===服务端===(读超时,关闭chanel)");
                    // 连续超过N次未收到client的ping消息,那么关闭该通道,等待client重连
                    ctx.close();
                } else {
                    // 失败计数器加1
                    unRecPingTimes++;
                }
            }else {
                super.userEventTriggered(ctx,evt);
            }
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        System.out.println("一个客户端已连接");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        System.out.println("一个客户端已断开连接");
    }
}

服务端server:

package com.heartbreak.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.\*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

/\*\*
 \* @author tangj
 \* @date 2018/6/10 10:46
 \*/
public class HeartBeatServer {
    private static int port = 9817;

    public HeartBeatServer(int port) {
        this.port = port;
    }

    ServerBootstrap bootstrap \= null;
    ChannelFuture f;

    // 检测chanel是否接受过心跳数据时间间隔(单位秒)
    private static final int READ\_WAIT\_SECONDS = 10;

    public static void main(String args\[\]) {
        HeartBeatServer heartBeatServer \= new HeartBeatServer(port);
        heartBeatServer.startServer();
    }

    public void startServer() {
        EventLoopGroup bossgroup \= new NioEventLoopGroup();
        EventLoopGroup workergroup \= new NioEventLoopGroup();
        try {
            bootstrap \= new ServerBootstrap();
            bootstrap.group(bossgroup, workergroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new HeartBeatServerInitializer());
            // 服务器绑定端口监听
            f = bootstrap.bind(port).sync();
            System.out.println("server start ,port: "+port);
            // 监听服务器关闭监听,此方法会阻塞
            f.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossgroup.shutdownGracefully();
            workergroup.shutdownGracefully();
        }
    }


    private class HeartBeatServerInitializer extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline \= ch.pipeline();
            // 监听读操作,读超时时间为5秒,超过5秒关闭channel;
            pipeline.addLast("ping", new IdleStateHandler(READ\_WAIT\_SECONDS, 0, 0, TimeUnit.SECONDS));
            pipeline.addLast("decoder", new StringDecoder());
            pipeline.addLast("encoder", new StringEncoder());

            pipeline.addLast("handler", new HeartbeatServerHandler());
        }
    }

}

客户端handler

package com.heartbreak.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;

/\*\*
 \* @author tangj
 \* @date 2018/6/11 22:55
 \*/
public class HeartBeatClientHandler extends SimpleChannelInboundHandler<String>{
    private HeartBeatClient client;

    private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:dd");

    private static final ByteBuf HEARTBEAT\_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
            CharsetUtil.UTF\_8));

    public HeartBeatClientHandler(HeartBeatClient client) {
        this.client = client;
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("收到服务端回复:"+msg);
        if (msg.equals("Heartbeat")) {
            ctx.write("has read message from server");
            ctx.flush();
        }
        ReferenceCountUtil.release(msg);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleState state \= ((IdleStateEvent) evt).state();
            if (state == IdleState.WRITER\_IDLE) {
                ctx.writeAndFlush(HEARTBEAT\_SEQUENCE.duplicate());
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        System.err.println("客户端与服务端断开连接,断开的时间为:"+format.format(new Date()));
        // 定时线程 断线重连
        final EventLoop eventLoop = ctx.channel().eventLoop();
        eventLoop.schedule(new Runnable() {
            @Override
            public void run() {
                client.doConncet();
            }
        }, 10, TimeUnit.SECONDS);
    }


}

客户端启动:

package com.heartbreak.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.\*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/\*\*
 \* @author tangj
 \* @date 2018/6/10 16:18
 \*/
public class HeartBeatClient {

    private Random random = new Random();
    public Channel channel;
    public Bootstrap bootstrap;

    protected String host = "127.0.0.1";
    protected int port = 9817;

    public static void main(String args\[\]) throws Exception {
        HeartBeatClient client \= new HeartBeatClient();
        client.run();
        client.sendData();

    }

    public void run() throws Exception {
        EventLoopGroup group \= new NioEventLoopGroup();
        try {
            bootstrap \= new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new SimpleClientInitializer(HeartBeatClient.this));
            doConncet();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /\*\*
     \* 发送数据
     \* @throws Exception
     \*/
    public void sendData() throws Exception {
        BufferedReader in \= new BufferedReader(new InputStreamReader(System.in));
        while (true){
            String cmd \= in.readLine();
            switch (cmd){
                case "close" :
                    channel.close();
                    break;
                default:
                channel.writeAndFlush(in.readLine());
                    break;
            }
        }
    }

    /\*\*
     \* 连接服务端
     \*/
    public void doConncet() {
        if (channel != null && channel.isActive()) {
            return;
        }
        ChannelFuture channelFuture \= bootstrap.connect(host, port);
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture futureListener) throws Exception {
                if (channelFuture.isSuccess()) {
                    channel \= futureListener.channel();
                    System.out.println("connect server successfully");
                } else {
                    System.out.println("Failed to connect to server, try connect after 10s");
                    futureListener.channel().eventLoop().schedule(new Runnable() {
                        @Override
                        public void run() {
                            doConncet();
                        }
                    }, 10, TimeUnit.SECONDS);
                }
            }
        });

    }


    private class SimpleClientInitializer extends ChannelInitializer<SocketChannel> {

        private HeartBeatClient client;

        public SimpleClientInitializer(HeartBeatClient client) {
            this.client = client;
        }

        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            ChannelPipeline pipeline \= socketChannel.pipeline();
            pipeline.addLast(new IdleStateHandler(0, 5, 0));
            pipeline.addLast("encoder", new StringEncoder());
            pipeline.addLast("decoder", new StringDecoder());
            pipeline.addLast("handler", new HeartBeatClientHandler(client));
        }
    }


}

运行结果:

1.客户端长时间未发送心跳包,服务端关闭连接

server start ,port: 9817
一个客户端已连接
\===服务端===(READER\_IDLE 读超时)
\===服务端===(READER\_IDLE 读超时)
\===服务端===(READER\_IDLE 读超时)
\===服务端===(READER\_IDLE 读超时)
\===服务端===(读超时,关闭chanel)
一个客户端已断开连接

2.客户端发送心跳包,服务端和客户端保持心跳信息

一个客户端已连接
客户端/127.0.0.1:55436--心跳信息--
客户端/127.0.0.1:55436--心跳信息--
客户端/127.0.0.1:55436--心跳信息--
客户端/127.0.0.1:55436--心跳信息--

3.服务单宕机,断开连接,客户端进行重连

客户端与服务端断开连接,断开的时间为:2018-06-12 23:47:12
Failed to connect to server, try connect after 10s
Failed to connect to server, try connect after 10s
Failed to connect to server, try connect after 10s
connect server successfully

代码地址:

https://gitee.com/ZhongBangKeJi/CRMEB

https://gitee.com/ZhongBangKeJi/crmeb_java

请登录后查看

CRMEB-慕白寒窗雪 最后编辑于2023-10-07 16:44:50

快捷回复
回复({{post_count}}) {{!is_user ? '我的回复' :'全部回复'}}
回复从新到旧

{{item.user_info.nickname ? item.user_info.nickname : item.user_name}}

作者 管理员 企业

{{item.floor}}# 同步到gitee 已同步到gitee {{item.is_suggest==1? '取消推荐': '推荐'}}
{{item.floor}}#
{{item.user_info.title}}
附件

{{itemf.name}}

{{item.created_at}}  {{item.ip_address}}
{{item.like_count}}
{{item.showReply ? '取消回复' : '回复'}}
删除
回复
回复

{{itemc.user_info.nickname}}

{{itemc.user_name}}

作者 管理员 企业

回复 {{itemc.comment_user_info.nickname}}

附件

{{itemf.name}}

{{itemc.created_at}}   {{itemc.ip_address}}
{{itemc.like_count}}
{{itemc.showReply ? '取消回复' : '回复'}}
删除
回复
回复
查看更多
回复
回复
1353
{{like_count}}
{{collect_count}}
添加回复 ({{post_count}})

相关推荐

CRMEB-慕白寒窗雪 作者
社区运营专员---高冷のBoy | 呆萌のGirl

回答

7377

发布

1830

经验

66902

快速安全登录

使用微信扫码登录
{{item.label}} {{item.label}} {{item.label}} 板块推荐 常见问题 产品动态 精选推荐 首页头条 首页动态 首页推荐
加精
取 消 确 定
回复
回复
问题:
问题自动获取的帖子内容,不准确时需要手动修改. [获取答案]
答案:
提交
bug 需求 取 消 确 定

微信登录/注册

切换手机号登录

{{ bind_phone ? '绑定手机' : '手机登录'}}

{{codeText}}
切换微信登录/注册
暂不绑定
CRMEB客服

CRMEB咨询热线 咨询热线

400-8888-794

微信扫码咨询

CRMEB开源商城下载 开源下载 CRMEB官方论坛 帮助文档
返回顶部 返回顶部
CRMEB客服