Dubbo 服务暴露和服务引用都是通过的 com.alibaba.dubbo.rpc.Protocol
来实现的。它是一个 SPI 扩展。
@SPI("dubbo")public interface Protocol { int getDefaultPort(); @AdaptiveExporter export(Invoker invoker) throws RpcException; @Adaptive Invoker refer(Class type, URL url) throws RpcException; void destroy();}
filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapperlistener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrappermock=com.alibaba.dubbo.rpc.support.MockProtocoldubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocolinjvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocolrmi=com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocolhessian=com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocolcom.alibaba.dubbo.rpc.protocol.http.HttpProtocolcom.alibaba.dubbo.rpc.protocol.webservice.WebServiceProtocolthrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocolmemcached=com.alibaba.dubbo.rpc.protocol.memcached.MemcachedProtocolredis=com.alibaba.dubbo.rpc.protocol.redis.RedisProtocolrest=com.alibaba.dubbo.rpc.protocol.rest.RestProtocolregistry=com.alibaba.dubbo.registry.integration.RegistryProtocolqos=com.alibaba.dubbo.qos.protocol.QosProtocolWrapper
根据前面 的分析,我们可以知道 Dubbo 会默认使用名称为 "dubbo" 的 Protocol 协议(可以通过配置去 override)。 如果我们通过 Protocol extension = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getDefaultExtension();
去获取 Protocol 的话,则 SPI 扩展文件中的 wrapper 类会包装在 DubboProtocol 实例上。所以,我们获取到的 extension 实际上是一个 Wrappered Extention。
ProtocolFilterWrapper
在 Protocol 的 Extension 中, ProtocolFilterWrapper 为 Protocol 的 export() 和 refer() 方法添加了一层 Filter 扩展,它会在服务引用和服务消费时,为 Invoker 实体域上包装一层层的 Filter 来做代码增强(AOP)。也就是在 Invoker 被执行之前会经过一堆的 Filter 的处理。
如果说 SPI 是 Dubbo 的静态扩展能力的话,那么 Filter 就是 Dubbo 的动态扩展能力。它能通过 URL 中的参数,来动态拼装 filter。
ProtocolFilterWrapper 会获取所有被激活的 Filter(@Activate),然后为 Invoker 构建一条 filter 链。阅读源码,我们会发现,export() 和 refer() 过程中都会构建 filter 链,也就是说,在服务被调用时 consumer 端会先经过一层 filter 的调用,然后通过 netty 调用到 provider 端;然后 provider 端再经过一层filter 之后,再去调用真正的服务接口实现。
public class ProtocolFilterWrapper implements Protocol { private final Protocol protocol; public ProtocolFilterWrapper(Protocol protocol) { if (protocol == null) { throw new IllegalArgumentException("protocol == null"); } this.protocol = protocol; } // 构建 Invoker Chain private staticInvoker buildInvokerChain(final Invoker invoker, String key, String group) { // 获取被激活的 Filter 扩展 Invoker last = invoker; List filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); if (!filters.isEmpty()) { for (int i = filters.size() - 1; i >= 0; i--) { final Filter filter = filters.get(i); final Invoker next = last; last = new Invoker () { @Override public Class getInterface() { return invoker.getInterface(); } @Override public URL getUrl() { return invoker.getUrl(); } @Override public boolean isAvailable() { return invoker.isAvailable(); } @Override public Result invoke(Invocation invocation) throws RpcException { return filter.invoke(next, invocation); } @Override public void destroy() { invoker.destroy(); } @Override public String toString() { return invoker.toString(); } }; } } return last; } @Override public int getDefaultPort() { return protocol.getDefaultPort(); } @Override public Exporter export(Invoker invoker) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER)); } @Override public Invoker refer(Class type, URL url) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { return protocol.refer(type, url); } return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER); } @Override public void destroy() { protocol.destroy(); }}
SPI 扩展文件 META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Filter 内容:
cache=com.alibaba.dubbo.cache.filter.CacheFiltervalidation=com.alibaba.dubbo.validation.filter.ValidationFilterecho=com.alibaba.dubbo.rpc.filter.EchoFiltergeneric=com.alibaba.dubbo.rpc.filter.GenericFiltergenericimpl=com.alibaba.dubbo.rpc.filter.GenericImplFiltertoken=com.alibaba.dubbo.rpc.filter.TokenFilteraccesslog=com.alibaba.dubbo.rpc.filter.AccessLogFilteractivelimit=com.alibaba.dubbo.rpc.filter.ActiveLimitFilterclassloader=com.alibaba.dubbo.rpc.filter.ClassLoaderFiltercontext=com.alibaba.dubbo.rpc.filter.ContextFilterconsumercontext=com.alibaba.dubbo.rpc.filter.ConsumerContextFilterexception=com.alibaba.dubbo.rpc.filter.ExceptionFilterexecutelimit=com.alibaba.dubbo.rpc.filter.ExecuteLimitFilterdeprecated=com.alibaba.dubbo.rpc.filter.DeprecatedFiltercompatible=com.alibaba.dubbo.rpc.filter.CompatibleFiltertimeout=com.alibaba.dubbo.rpc.filter.TimeoutFiltertrace=com.alibaba.dubbo.rpc.protocol.dubbo.filter.TraceFilterfuture=com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFiltermonitor=com.alibaba.dubbo.monitor.support.MonitorFilter
我们可以添加自定义的 filter 扩展。
DubboProtocol#export(Invoker<T> invoker)
Dubbo 服务暴露时,首先会创建一个 DubboExporter,然后再通过 netty 开启服务端口监听。
DubboExporter 的作用是缓存 Invoker,方便后续操作获取 Invoker。
publicExporter export(Invoker invoker) throws RpcException { URL url = invoker.getUrl(); // export service. String key = serviceKey(url); DubboExporter exporter = new DubboExporter (invoker, key, exporterMap); exporterMap.put(key, exporter); ...... // 开启服务监听 openServer(url); optimizeSerialization(url); return exporter;}
DubboProtocol#refer(Class<T> type,URL url)
Dubbo 服务引用时,首先创建一条与 provider 的 tcp 连接,然后再创建一个 DubboInvoker。
publicInvoker refer(Class serviceType, URL url) throws RpcException { optimizeSerialization(url); // create rpc invoker. 同时,创建一条与 provider 的 tcp 连接 DubboInvoker invoker = new DubboInvoker (serviceType, url, getClients(url), invokers); invokers.add(invoker); return invoker;}