博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
分析dubbo心跳检测机制
阅读量:6153 次
发布时间:2019-06-21

本文共 5617 字,大约阅读时间需要 18 分钟。

  • 目的: 
  • 维持provider和consumer之间的长连接
  • 实现: 
  • dubbo心跳时间heartbeat默认是60s,超过heartbeat时间没有收到消息,就发送心跳消息(provider,consumer一样),如果连着3次(heartbeatTimeout为heartbeat*3)没有收到心跳响应,provider会关闭channel,而consumer会进行重连;不论是provider还是consumer的心跳检测都是通过启动定时任务的方式实现;
  • provider绑定和consumer连接的入口:
public class HeaderExchanger implements Exchanger {    public static final String NAME = "header";    @Override    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);    }    @Override    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));    }}
  • provider启动心跳检测:
public HeaderExchangeServer(Server server) {        if (server == null) {            throw new IllegalArgumentException("server == null");        }        this.server = server;        this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);        //心跳超时时间默认为心跳时间的3倍        this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);        //如果心跳超时时间小于心跳时间的两倍则抛异常        if (heartbeatTimeout < heartbeat * 2) {            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");        }        startHeatbeatTimer();    }
  • startHeatbeatTimer的实现 
  • 先停止已有的定时任务,启动新的定时任务:
private void startHeatbeatTimer() {        // 停止原有定时任务        stopHeartbeatTimer();        // 发起新的定时任务        if (heartbeat > 0) {            heatbeatTimer = scheduled.scheduleWithFixedDelay(                    new HeartBeatTask(new HeartBeatTask.ChannelProvider() {                        public Collection
getChannels() { return Collections.unmodifiableCollection(HeaderExchangeServer.this.getChannels()); } }, heartbeat, heartbeatTimeout), heartbeat, heartbeat, TimeUnit.MILLISECONDS); } }
  • HeartBeatTask的实现 
  • 遍历所有的channel,检测心跳间隔,如果超过心跳间隔没有读或写,则发送需要回复的心跳消息,最有判断是否心跳超时(heartbeatTimeout),如果超时,provider关闭channel,consumer进行重连
public void run() {        try {            long now = System.currentTimeMillis();            for (Channel channel : channelProvider.getChannels()) {                if (channel.isClosed()) {                    continue;                }                try {                    Long lastRead = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_READ_TIMESTAMP);                    Long lastWrite = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);                    // 读写的时间,任一超过心跳间隔,发送心跳                    if ((lastRead != null && now - lastRead > heartbeat)                            || (lastWrite != null && now - lastWrite > heartbeat)) {                        Request req = new Request();                        req.setVersion("2.0.0");                        req.setTwoWay(true); // 需要响应的心跳事件                        req.setEvent(Request.HEARTBEAT_EVENT);                        channel.send(req);                        if (logger.isDebugEnabled()) {                            logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()                                    + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms");                        }                    }                    // 最后读的时间,超过心跳超时时间                    if (lastRead != null && now - lastRead > heartbeatTimeout) {                        logger.warn("Close channel " + channel                                + ", because heartbeat read idle time out: " + heartbeatTimeout + "ms");                        // 客户端侧,重新连接服务端                        if (channel instanceof Client) {                            try {                                ((Client) channel).reconnect();                            } catch (Exception e) {                                //do nothing                            }                        // 服务端侧,关闭客户端连接                        } else {                            channel.close();                        }                    }                } catch (Throwable t) {                    logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);                }            }        } catch (Throwable t) {            logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t);        }    }
  • consumer端的实现 
  • 默认需要心跳检测
public HeaderExchangeClient(Client client, boolean needHeartbeat) {        if (client == null) {            throw new IllegalArgumentException("client == null");        }        this.client = client;        // 创建 HeaderExchangeChannel 对象        this.channel = new HeaderExchangeChannel(client);        // 读取心跳相关配置        String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);        this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);        this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);        if (heartbeatTimeout < heartbeat * 2) { // 避免间隔太短            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");        }        // 发起心跳定时器        if (needHeartbeat) {            startHeatbeatTimer();        }    }

我们可以看到dubbo的心跳检测,服务端会发送发送心跳包,客户端也会发送心跳包,与一般只有客户端发送心跳包,服务端接受心跳是有所不同的。

转载于:https://www.cnblogs.com/cat520/p/9412062.html

你可能感兴趣的文章
CSS引入的方式有哪些? link和@import的区别?
查看>>
Redis 介绍2——常见基本类型
查看>>
asp.net开发mysql注意事项
查看>>
(转)Cortex-M3 (NXP LPC1788)之EEPROM存储器
查看>>
ubuntu set defult jdk
查看>>
[译]ECMAScript.next:TC39 2012年9月会议总结
查看>>
【Xcode】编辑与调试
查看>>
用tar和split将文件分包压缩
查看>>
[BTS] Could not find stored procedure 'mp_sap_check_tid'
查看>>
PLSQL DBMS_DDL.ALTER_COMPILE
查看>>
Activity生命周期
查看>>
高仿UC浏览器弹出菜单效果
查看>>
Ubuntu忘记密码,进不了系统的解决方法
查看>>
[原创]白盒测试技术思维导图
查看>>
<<Information Store and Management>> 读书笔记 之八
查看>>
Windows 8 开发之设置合约
查看>>
闲说HeartBeat心跳包和TCP协议的KeepAlive机制
查看>>
MoSQL
查看>>
Hibernate多对一外键单向关联(Annotation配置)
查看>>
《CLR via C#》读书笔记 之 方法
查看>>