目录
一、Nacos 和 Eureka 的区别
1.1、以 Nacos 注册流程来解析区别
一、Nacos 和 Eureka 的区别
1.1、以 Nacos 注册流程来解析区别
a)首先,我们的服务启动时。都会把自己的信息提交给注册中心,然后注册中心就会把信息保存下来.
注册的信息实际上就是一个嵌套 Map,结构为 Map<String, Map<String, Service>>,第一层 key 就是 namespace_id,起到环境隔离的作用. value 由是一个嵌套 Map<String, Service>.
第二层的 key 表示 group 分组,key 就是分组名,value 就是分组下的某一个服务,实际上就是一个类,内部又维护了一个 Map<String,Cluster> .
第三层的 key 就是集群的名称,value 就是 Cluster ,也是一个类,包含了集群的具体信息.
因为一个集群中可能包含多个实例,也就是具体的节点信息(例如实例的IP、Port、健康状态),那么 Cluster 这个类中又维护了 两个 Set<Instance>,分别是临时实例和非临时实例(此处,Eureka 就没有做区分,只有临时实例).
b)那么当服务消费者要去消费时,就可以从注册中心拉取服务信息. 这个过程也被称为“服务发现”. 但是他这个拉去动作不是每次都要做的(压力太大),而是将拉取到的服务信息缓存到一个列表中,这样接下来的一段时间里,就不用去拉去了,而是直接从缓存列表中拿.
当然这个缓存一直不更新也不行,因此会每隔 30 秒取重新拉取一次(多长时间不用记,都是可以配置的),进行更新.
c)消费者拿到服务列表后,就可以通过 负载均衡(LoadBalancer)从列表中挑选一个发起远程调用就可以了.
d)截至目前为止,Nacos 和 Eureka 还没什么太大的差别,那差别在哪呢?差别就在于服务提供者的健康检测机制.
e)在 nacos 中,将服务分成了临时实例和非临时实例:
- 临时实例:当临时实例进行心跳检测的时候,如果心跳不跳了,nacos 就会把它从服务中直接剔除. (这里的心跳检测机制和 Eureka 是一样的,但非临时实例就不一样了)
- 非临时实例:nacos 就不会要求你给我发心跳了,而是通过 nacos 主动发请求询问,定时向实例发送请求:“你还活着吗?”,即使真的挂了,nacos 也仅仅只是把它标记为 不健康,不会剔除,而是等待它恢复健康.
而 Eureka 只提供了心跳模式的健康监测,而没有主动检测功能。
主动询问进行健康检测,效率岂不是很低?
对于非临时实例,所有的健康检测任务都不是立即执行的,都会被放入一个阻塞队列中,如下源码
@Override public void process(HealthCheckTask task) { // 获取所有 非临时实例的 集合 List<Instance> ips = task.getCluster().allIPs(false); if (CollectionUtils.isEmpty(ips)) { return; } for (Instance ip : ips) { // 封装健康检测信息到 Beat Beat beat = new Beat(ip, task); // 放入一个阻塞队列中 taskQueue.add(beat); MetricsMonitor.getTcpHealthCheckMonitor().incrementAndGet(); } }
可以看出,检测任务不是立即执行,这里也采用了异步指定的策略,会把任务放到线程池中取执行,如下:
public void run() { while (true) { try { // 处理任务 processTask(); // ... } catch (Throwable e) { SRV_LOG.error("[HEALTH-CHECK] error while processing NIO task", e); } } }
通过 processTask 来处理健康检测的任务:
private void processTask() throws Exception { // 将任务封装为一个 TaskProcessor,并放入集合 Collection<Callable<Void>> tasks = new LinkedList<>(); do { Beat beat = taskQueue.poll(CONNECT_TIMEOUT_MS / 2, TimeUnit.MILLISECONDS); if (beat == null) { return; } tasks.add(new TaskProcessor(beat)); } while (taskQueue.size() > 0 && tasks.size() < NIO_THREAD_COUNT * 64); // 批量处理集合中的任务 for (Future<?> f : GlobalExecutor.invokeAllTcpSuperSenseTask(tasks)) { f.get(); } }
任务被封装到了TaskProcessor中去执行了,TaskProcessor是一个Callable,其中的call方法:
@Override public Void call() { // 获取检测任务已经等待的时长 long waited = System.currentTimeMillis() - beat.getStartTime(); if (waited > MAX_WAIT_TIME_MILLISECONDS) { Loggers.SRV_LOG.warn("beat task waited too long: " + waited + "ms"); } SocketChannel channel = null; try { // 获取实例信息 Instance instance = beat.getIp(); // 通过NIO建立TCP连接 channel = SocketChannel.open(); channel.configureBlocking(false); // only by setting this can we make the socket close event asynchronous channel.socket().setSoLinger(false, -1); channel.socket().setReuseAddress(true); channel.socket().setKeepAlive(true); channel.socket().setTcpNoDelay(true); Cluster cluster = beat.getTask().getCluster(); int port = cluster.isUseIPPort4Check() ? instance.getPort() : cluster.getDefCkport(); channel.connect(new InetSocketAddress(instance.getIp(), port)); // 注册连接、读取事件 SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ); key.attach(beat); keyMap.put(beat.toString(), new BeatKey(key)); beat.setStartTime(System.currentTimeMillis()); GlobalExecutor .scheduleTcpSuperSenseTask(new TimeOutTask(key), CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS); } catch (Exception e) { beat.finishCheck(false, false, switchDomain.getTcpHealthParams().getMax(), "tcp:error:" + e.getMessage()); if (channel != null) { try { channel.close(); } catch (Exception ignore) { } } } return null; }
这差别就像是亲生儿子和非亲生儿子,亲生儿子我还会去主动关怀一下,诶,你还活着么?而非临时实例,就是你不心跳了,就把你扔了~
f)还有一个差别在于服务消费者,Eureka 采用的是定时拉取(每 30 秒一次),那如果在 30 秒内服务提供者挂了,消费肯定是不知道的,因此 Eureka 这里更新的时效性也比较差.
我们的 微服务 定时拉取的基本逻辑就是先从本地缓存读:
- 如果本地缓存没有,就通过 Nacos 客户端构造请求去 nacos 服务器中读取.
- 如果本地缓存有,就开启定时更新功能(就是创建一个定时任务,每隔一段时间去拉取一次),并返回缓存结果.
核心源码如下:
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) { NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch()); // 由 服务名@@集群名拼接 key String key = ServiceInfo.getKey(serviceName, clusters); if (failoverReactor.isFailoverSwitch()) { return failoverReactor.getService(key); } // 读取本地服务列表的缓存,缓存是一个Map,格式:Map<String, ServiceInfo> ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters); // 判断缓存是否存在 if (null == serviceObj) { // 不存在,创建空ServiceInfo serviceObj = new ServiceInfo(serviceName, clusters); // 放入缓存 serviceInfoMap.put(serviceObj.getKey(), serviceObj); // 放入待更新的服务列表(updatingMap)中 updatingMap.put(serviceName, new Object()); // 立即更新服务列表 updateServiceNow(serviceName, clusters); // 从待更新列表中移除 updatingMap.remove(serviceName); } else if (updatingMap.containsKey(serviceName)) { // 缓存中有,但是需要更新 if (UPDATE_HOLD_INTERVAL > 0) { // hold a moment waiting for update finish 等待5秒中,待更新完成 synchronized (serviceObj) { try { serviceObj.wait(UPDATE_HOLD_INTERVAL); } catch (InterruptedException e) { NAMING_LOGGER .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e); } } } } // 开启定时更新服务列表的功能 scheduleUpdateIfAbsent(serviceName, clusters); // 返回缓存中的服务信息 return serviceInfoMap.get(serviceObj.getKey()); }
定时更新方法如下:
public void updateService(String serviceName, String clusters) throws NacosException { ServiceInfo oldService = getServiceInfo0(serviceName, clusters); try { // 基于ServerProxy发起远程调用,查询服务列表 String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false); if (StringUtils.isNotEmpty(result)) { // 处理查询结果 processServiceJson(result); } } finally { if (oldService != null) { synchronized (oldService) { oldService.notifyAll(); } } } } public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly) throws NacosException { // 准备请求参数 final Map<String, String> params = new HashMap<String, String>(8); params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, serviceName); params.put("clusters", clusters); params.put("udpPort", String.valueOf(udpPort)); params.put("clientIP", NetUtils.localIP()); params.put("healthyOnly", String.valueOf(healthyOnly)); // 发起请求,地址与API接口一致 return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET); }
而 nacos 这里的消费者不光进行服务的定时拉取,nacos 还会主动进行消息的订阅推送,一旦发现有服务挂了,就立刻推送一条消息给服务消费者,告诉你服务要更新了.
Nacos 具体是通过什么实现消息订阅推送机制呢?
a)首先 PushPeceiver 这个类(我们自己的微服务配置的 Nacos 客户端),会以 UDP 的方式与 Nacos 服务端建立连接,监听 Nacos 服务端推送的服务变更数据.
b)一旦 Nacos 服务列表发生变更,就会发送 UDP 广播给所有的微服务订阅者.
c)当订阅者接收到通知以后,就可以将接收到的服务信息缓存到本地缓存列表.
d)那么之后再拉取服务的时候,会优先从缓存里读取,缓存里有就直接返回缓存,如果没有,再去拉取或者订阅.
PushPeceiver 构造函数如下:
public PushReceiver(HostReactor hostReactor) { try { this.hostReactor = hostReactor; // 创建 UDP客户端 String udpPort = getPushReceiverUdpPort(); if (StringUtils.isEmpty(udpPort)) { this.udpSocket = new DatagramSocket(); } else { this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort))); } // 准备线程池 this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setDaemon(true); thread.setName("com.alibaba.nacos.naming.push.receiver"); return thread; } }); // 开启线程任务,准备接收变更数据 this.executorService.execute(this); } catch (Exception e) { NAMING_LOGGER.error("[NA] init udp socket failed", e); } }
PushReceiver 构造函数中基于线程池来运行任务。这是因为 PushReceiver 本身也是一个Runnable,其中的run方法业务逻辑就是:
@Override public void run() { while (!closed) { try { // byte[] is initialized with 0 full filled by default byte[] buffer = new byte[UDP_MSS]; DatagramPacket packet = new DatagramPacket(buffer, buffer.length); // 接收推送数据 udpSocket.receive(packet); // 解析为json字符串 String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim(); NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString()); // 反序列化为对象 PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class); String ack; if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) { // 交给 HostReactor去处理 hostReactor.processServiceJson(pushPacket.data); // send ack to server 发送ACK回执,略。。 } catch (Exception e) { if (closed) { return; } NAMING_LOGGER.error("[NA] error while receiving push data", e); } } }
通知数据的处理由交给了
public ServiceInfo processServiceJson(String json) { // 解析出ServiceInfo信息 ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class); String serviceKey = serviceInfo.getKey(); if (serviceKey == null) { return null; } // 查询缓存中的 ServiceInfo ServiceInfo oldService = serviceInfoMap.get(serviceKey); // 如果缓存存在,则需要校验哪些数据要更新 boolean changed = false; if (oldService != null) { // 拉取的数据是否已经过期 if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) { NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: " + serviceInfo.getLastRefTime()); } // 放入缓存 serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); // 中间是缓存与新数据的对比,得到newHosts:新增的实例;remvHosts:待移除的实例; // modHosts:需要修改的实例 if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) { // 发布实例变更的事件 NotifyCenter.publishEvent(new InstancesChangeEvent( serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts())); DiskCache.write(serviceInfo, cacheDir); } } else { // 本地缓存不存在 changed = true; // 放入缓存 serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); // 直接发布实例变更的事件 NotifyCenter.publishEvent(new InstancesChangeEvent( serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts())); serviceInfo.setJsonFromServer(json); DiskCache.write(serviceInfo, cacheDir); } // 。。。 return serviceInfo; }