30 Filter Interface: A Guide to a Common Way of Extending the Dubbo Framework #

In Lesson 27, we introduced the implementation of ProtocolFilterWrapper. To recap briefly: in its buildInvokerChain() method, ProtocolFilterWrapper loads the Filter implementations provided by Dubbo and by the application and assembles them into a Filter chain. It then uses the decorator pattern to layer the Filter chain's logic on top of the original Invoker object.
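
As a rough illustration of this decorator-style assembly, the sketch below builds a chain by wrapping the target from the last filter backwards, so that the first filter in the list runs first. The Invoker and Filter interfaces here are simplified stand-ins invented for this example; they are not Dubbo's own types.

```java
import java.util.Arrays;
import java.util.List;

final class FilterChainSketch {
    /** Simplified stand-in for an invoker: takes a request, returns a response. */
    interface Invoker {
        String invoke(String request);
    }

    /** Simplified stand-in for a filter: may act before and after calling next. */
    interface Filter {
        String invoke(Invoker next, String request);
    }

    /** Wraps target with the filters so that filters.get(0) is the outermost layer. */
    static Invoker buildChain(Invoker target, List<Filter> filters) {
        Invoker last = target;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker next = last;
            last = request -> filter.invoke(next, request);
        }
        return last;
    }

    /** A demo filter that wraps the downstream response in its own tag. */
    static Filter tagging(String tag) {
        return (next, request) -> tag + "[" + next.invoke(request) + "]";
    }

    public static void main(String[] args) {
        Invoker target = request -> "echo:" + request;
        Invoker chain = buildChain(target, Arrays.asList(tagging("A"), tagging("B")));
        System.out.println(chain.invoke("x")); // prints A[B[echo:x]]
    }
}
```

Building from the tail means each wrapper only needs a reference to the next Invoker, which is the same reason the decorator approach needs no central dispatcher.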

The assembly logic of the Filter chain is designed to be very flexible. In the filter configuration, a name prefixed with “-” excludes a Filter that Dubbo would otherwise load by default, and the keyword “default” stands in for the Filters that Dubbo provides. Together these let you control which Filters are loaded and the order in which they execute.
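
To make these rules concrete, here is a simplified sketch of how such a configuration string could be resolved into an ordered list of names. It is an assumption-laden model, not Dubbo's actual loading code: the class and method names are hypothetical, and sorting by order, group matching, and URL-based activation are all left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FilterOrderSketch {
    /**
     * defaults: names of the built-in filters, already in their default order.
     * config:   comma-separated names, e.g. "custom1,default,-token,custom2".
     */
    static List<String> resolve(List<String> defaults, String config) {
        List<String> names = new ArrayList<>();
        for (String raw : config.split(",")) {
            String name = raw.trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }

        // Built-ins survive unless "-default" or "-<name>" removes them.
        List<String> builtIns = new ArrayList<>();
        if (!names.contains("-default")) {
            for (String d : defaults) {
                if (!names.contains("-" + d) && !names.contains(d)) {
                    builtIns.add(d);
                }
            }
        }

        List<String> result = new ArrayList<>();
        boolean defaultsPlaced = false;
        for (String name : names) {
            if (name.startsWith("-")) {
                continue; // exclusion markers are not filters themselves
            }
            if (name.equals("default")) {
                if (!defaultsPlaced) {
                    result.addAll(builtIns); // built-ins go where "default" appears
                    defaultsPlaced = true;
                }
            } else {
                result.add(name);
            }
        }
        if (!defaultsPlaced) {
            result.addAll(0, builtIns); // without "default", built-ins run first
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> defaults = Arrays.asList("echo", "token");
        System.out.println(resolve(defaults, "custom1,default,-token,custom2"));
    }
}
```

In this model, placing a custom name before “default” runs it ahead of the built-in Filters, and placing it after runs it behind them.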

Filter is the preferred solution for extending Dubbo’s functionality, and Dubbo itself provides a large number of Filter implementations to extend its functionality. After reviewing the general logic of loading Filters in ProtocolFilterWrapper, in this lesson we will delve into the various built-in Filter implementations in Dubbo, as well as the method of extending Dubbo with custom Filters.

Before we introduce the implementations of the Filter interface, we need to understand the position of the Filter in the Dubbo architecture. This will clarify where the Filter chain handles requests/responses, as shown in the red box in the following diagram:

Lark20201106-191028.png

Position of the Filter in the Dubbo architecture

ConsumerContextFilter #

ConsumerContextFilter is a very simple implementation of the Filter on the consumer side. It records some state information of the local invocation in the current RpcContext (which will be recorded in the RpcContext corresponding to LOCAL), such as the relevant Invoker, Invocation, and the local and remote addresses of the invocation. Here is the specific implementation:

public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {

    RpcContext context = RpcContext.getContext();

    context.setInvoker(invoker) // records Invoker
        .setInvocation(invocation) // records Invocation
        // records local and remote addresses
        .setLocalAddress(NetUtils.getLocalHost(), 0)
        .setRemoteAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort())
        // records remote application name and other information
        .setRemoteApplicationName(invoker.getUrl().getParameter(REMOTE_APPLICATION_KEY))
        .setAttachment(REMOTE_APPLICATION_KEY, invoker.getUrl().getParameter(APPLICATION_KEY));

    if (invocation instanceof RpcInvocation) {
        ((RpcInvocation) invocation).setInvoker(invoker);
    }

    // check if it is timed out
    Object countDown = context.get(TIME_COUNTDOWN_KEY);
    if (countDown != null) {
        TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countDown;
        if (timeoutCountDown.isExpired()) {
            return AsyncRpcResult.newDefaultAsyncResult(new RpcException("...."), invocation);
        }
    }

    return invoker.invoke(invocation);
}

The TimeoutCountDown object checks whether the current invocation has timed out. It has three fields:

  • timeoutInMillis (of type long): the timeout time in milliseconds.
  • deadlineInNanos (of type long): the timestamp for the timeout in nanoseconds.
  • expired (of type boolean): indicates whether the call associated with this TimeoutCountDown has expired.

In the TimeoutCountDown.isExpired() method, the current time is compared with the timestamp of the timeout recorded in the deadlineInNanos field. As seen in the above logic, if the request times out, no remote call will be made and the AsyncRpcResult will end in an exception.
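
The deadline check described above can be sketched as follows. This is a minimal illustration that mirrors the three fields listed earlier; the class name DeadlineSketch and its methods are hypothetical and are not copied from Dubbo's TimeoutCountDown.

```java
import java.util.concurrent.TimeUnit;

final class DeadlineSketch {
    private final long timeoutInMillis;   // configured timeout, in milliseconds
    private final long deadlineInNanos;   // System.nanoTime() value at which the call expires
    private volatile boolean expired;     // latched once the deadline has passed

    DeadlineSketch(long timeout, TimeUnit unit) {
        this.timeoutInMillis = unit.toMillis(timeout);
        this.deadlineInNanos = System.nanoTime() + unit.toNanos(timeout);
    }

    long getTimeoutInMillis() {
        return timeoutInMillis;
    }

    /** Returns true once the current time has reached the deadline. */
    boolean isExpired() {
        // Subtracting before comparing stays correct even if nanoTime() wraps around.
        if (!expired && deadlineInNanos - System.nanoTime() <= 0) {
            expired = true;
        }
        return expired;
    }

    /** Remaining time until the deadline, converted to the given unit (negative if overdue). */
    long timeRemaining(TimeUnit unit) {
        return unit.convert(deadlineInNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }
}
```

Storing an absolute deadline rather than a start time plus a duration means every later check is a single subtraction, which suits a value that is consulted at several points along the call path.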

ActiveLimitFilter #

ActiveLimitFilter is used on the consumer side to limit the concurrent invocations of a consumer to a server-side method. It can also be called “client-side flow control”. Let’s take a look at the specific implementation of ActiveLimitFilter:

public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {

    URL url = invoker.getUrl(); // get the URL object
    String methodName = invocation.getMethodName(); // get the method name

    // get the maximum number of concurrent invocations
    int max = invoker.getUrl().getMethodParameter(methodName, ACTIVES_KEY, 0);

    // get the status information of the method
    final RpcStatus rpcStatus = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());

    if (!RpcStatus.beginCount(url, methodName, max)) { // attempt to increment the concurrency
        long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), TIMEOUT_KEY, 0);
        long start = System.currentTimeMillis();
        long remain = timeout;

        synchronized (rpcStatus) { // lock
            while (!RpcStatus.beginCount(url, methodName, max)) { // attempt to increment the concurrency again
                try {
                    rpcStatus.wait(remain); // block the current thread until the concurrency decreases
                } catch (InterruptedException e) {
                    // ignore
                }

                // check if it timed out
                long elapsed = System.currentTimeMillis() - start;
                remain = timeout - elapsed;
                if (remain <= 0) {
                    throw new RpcException(...);
                }
            }
        }
    }

    // record the start time as an attribute of the invocation
    invocation.put(ACTIVELIMIT_FILTER_START_TIME, System.currentTimeMillis());

    return invoker.invoke(invocation);
}

From the code of the ActiveLimitFilter.invoke() method, it can be seen that its core implementation is closely related to the RpcStatus object. RpcStatus maintains two collections:

  • SERVICE_STATISTICS collection (ConcurrentMap<String, RpcStatus> type), which records the status information of each service called by the current consumer. The key is the URL and the value is the corresponding RpcStatus object;
  • METHOD_STATISTICS collection (ConcurrentMap<String, ConcurrentMap<String, RpcStatus>> type), which records the status information of each method of the services called by the current consumer. The first-level key is the URL, the second-level key is the method name, and the value is the corresponding RpcStatus object.

RpcStatus collects a lot of call-related information, and its core fields include the following:

  • active (AtomicInteger type): the current concurrency level. This is also the concurrency level of interest in ActiveLimitFilter.
  • total (AtomicLong type): the total number of calls.
  • failed (AtomicInteger type): the number of failed calls.
  • totalElapsed (AtomicLong type): the total time consumed by all calls.
  • failedElapsed (AtomicLong type): the total time consumed by all failed calls.
  • maxElapsed (AtomicLong type): the longest time consumed by all calls.
  • failedMaxElapsed (AtomicLong type): the longest time consumed by all failed calls.
  • succeededMaxElapsed (AtomicLong type): the longest time consumed by all successful calls.

In addition, RpcStatus provides getter/setter methods for these fields, which are used for reading and writing the values of these fields. I will not analyze them in detail here.

The beginCount() method in RpcStatus is executed before the remote call starts. It obtains the RpcStatus objects corresponding to the service and service method from the SERVICE_STATISTICS and METHOD_STATISTICS collections, respectively, and increments their active fields. The implementation is as follows:

public static boolean beginCount(URL url, String methodName, int max) {
    max = (max <= 0) ? Integer.MAX_VALUE : max;

    // Get the RpcStatus object corresponding to the service
    RpcStatus appStatus = getStatus(url);

    // Get the RpcStatus object corresponding to the service method
    RpcStatus methodStatus = getStatus(url, methodName);

    if (methodStatus.active.get() == Integer.MAX_VALUE) { // Concurrency overflow
        return false;
    }

    for (int i; ; ) {
        i = methodStatus.active.get();
        if (i + 1 > max) { // Concurrency exceeds the max limit, directly return false
            return false;
        }
        if (methodStatus.active.compareAndSet(i, i + 1)) { // CAS operation
            break; // Exit the current loop after successful update
        }
    }

    appStatus.active.incrementAndGet(); // Increment the concurrency level of a single service
    return true;
}
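
The compare-and-set loop at the heart of beginCount() can be isolated into a small, self-contained sketch. The class ConcurrencyGateSketch and its method names are hypothetical; only the technique (read, check against the limit, CAS, retry on contention) follows the code above.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class ConcurrencyGateSketch {
    private final AtomicInteger active = new AtomicInteger();

    /** Tries to take one slot; max <= 0 means "no limit", as in beginCount(). */
    boolean tryBegin(int max) {
        int limit = (max <= 0) ? Integer.MAX_VALUE : max;
        for (;;) {
            int current = active.get();
            if (current >= limit) {
                return false; // limit reached, give up without blocking
            }
            if (active.compareAndSet(current, current + 1)) {
                return true; // this thread won the race for the slot
            }
            // Another thread changed the counter in between: re-read and retry.
        }
    }

    /** Releases a slot taken by a successful tryBegin(). */
    void end() {
        active.decrementAndGet();
    }

    int active() {
        return active.get();
    }
}
```

A plain incrementAndGet() followed by a check would briefly overshoot the limit under contention; checking before the CAS keeps the counter at or below the limit at all times.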

In addition to implementing the Filter interface, ActiveLimitFilter also implements the nested Filter.Listener interface. Its onResponse() method calls RpcStatus.endCount() to complete the call statistics, and then calls notifyFinish() to wake up the threads blocked on the corresponding RpcStatus object. The implementation is as follows:

public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
    String methodName = invocation.getMethodName(); // Get the name of the called method
    URL url = invoker.getUrl();
    int max = invoker.getUrl().getMethodParameter(methodName, ACTIVES_KEY, 0);

    // Call the RpcStatus.endCount() method to complete the statistics of call monitoring
    RpcStatus.endCount(url, methodName, getElapsed(invocation), true);

    // Call the notifyFinish() method to awaken the thread blocked on the corresponding RpcStatus object
    notifyFinish(RpcStatus.getStatus(url, methodName), max);
}
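
The interplay between the blocking wait in invoke() and the wake-up in onResponse() is a classic monitor pattern. The sketch below shows it in isolation under simplifying assumptions: a single counter guarded by one lock, and a hypothetical class BlockingLimiterSketch whose names do not come from Dubbo.

```java
import java.util.concurrent.TimeUnit;

final class BlockingLimiterSketch {
    private final Object lock = new Object();
    private final int max;
    private int active;

    BlockingLimiterSketch(int max) {
        this.max = max;
    }

    /** Waits up to timeoutMillis for a free slot; returns false on timeout or interrupt. */
    boolean acquire(long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        synchronized (lock) {
            while (active >= max) {
                long remainingNanos = deadline - System.nanoTime();
                if (remainingNanos <= 0) {
                    return false; // waited long enough, give up
                }
                try {
                    // Releases the lock while waiting; re-checks the condition after waking.
                    TimeUnit.NANOSECONDS.timedWait(lock, remainingNanos);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt status
                    return false;
                }
            }
            active++;
            return true;
        }
    }

    /** Frees a slot and wakes every waiter so they can re-check the condition. */
    void release() {
        synchronized (lock) {
            active--;
            lock.notifyAll();
        }
    }
}
```

The wait sits inside a while loop because a woken thread is not guaranteed a slot: another waiter may have taken it first, or the wake-up may be spurious.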

In the RpcStatus.endCount() method, all the fields in the RpcStatus objects of the service and service method are updated to complete the statistics:

private static void endCount(RpcStatus status, long elapsed, boolean succeeded) {
    status.active.decrementAndGet(); // Request completed, decrease concurrency level
    status.total.incrementAndGet(); // Increase the total number of calls
    status.totalElapsed.addAndGet(elapsed); // Increase the total time consumed by calls

    if (status.maxElapsed.get() < elapsed) { // Update the maximum time consumed
        status.maxElapsed.set(elapsed);
    }

    if (succeeded) { // If this call is successful, update the maximum time consumed by successful calls
        if (status.succeededMaxElapsed.get() < elapsed) {
            status.succeededMaxElapsed.set(elapsed);
        }
    } else { // If this call fails, update the statistics for failed calls
        status.failed.incrementAndGet();
        status.failedElapsed.addAndGet(elapsed);
        if (status.failedMaxElapsed.get() < elapsed) {
            status.failedMaxElapsed.set(elapsed);
        }
    }
}

AccessLogFilter #

AccessLogFilter buffers access-log entries in LOG_ENTRIES, and a scheduled task periodically writes them to file:

LOG_SCHEDULED.scheduleAtFixedRate(() -> {
    for (Map.Entry<String, Set<AccessLogData>> entry : LOG_ENTRIES.entrySet()) {
        String key = entry.getKey(); // get the key
        Set<AccessLogData> logSet = entry.getValue(); // get the value
        if (!logSet.isEmpty()) { // write the logs in logSet to file
            writeLogSetToFile(key, logSet);
        }
    }
}, LOG_FLUSH_DELAY, LOG_FLUSH_PEROID, TimeUnit.MILLISECONDS);

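The Listener interface of AccessLogFilter provides two methods, onResponse() and onError(). In onResponse(), depending on the configured ACCESS_LOG_KEY, AccessLogFilter adds the attachments held in the SERVER_REMOTE RpcContext to the attachments field of AppResponse. The Consumer can then read information from the Provider-side RpcContext through AppResponse.getAttachments().

In onError(), if ACCESS_LOG_KEY is configured, the details of the failed invocation are written to the log file to help developers troubleshoot.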

TraceFilter #

TraceFilter is a Filter for distributed tracing. A tracing tool commonly used with Dubbo is Apache SkyWalking, which is an independent Apache project rather than a part of Dubbo. The main job of TraceFilter is to record and propagate tracing information during a service invocation.

The core logic of TraceFilter is in its invoke() method. It first reads the TraceId from RpcContext (if one is present, it was passed along by the Consumer); if none is present, it creates a new TraceId object. It then stores the TraceId in the Invocation and calls the next Filter's invoke() method. When the call returns, it writes the TraceId into the response. The logic is as follows:

public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
    // obtain the TraceId
    TraceId traceId = initTraceId(inv, RpcContext.getContext().getAttachment(TraceConstants.CLIENT_ATTACHMENT_KEY));
    // store the TraceId in the Invocation
    inv.setAttachment(TraceConstants.CLIENT_ATTACHMENT_KEY, traceId);

    // call the next Filter
    Result result = invoker.invoke(inv);

    // write the TraceId into the Result
    if (result != null && result.getAttachments() != null) {
        result.getAttachments().put(TraceConstants.CLIENT_ATTACHMENT_KEY, traceId);
    }

    return result;
}

EchoFilter #

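EchoFilter supports echo testing. Any service reference can be cast to EchoService and its $echo method invoked to check whether the service is available. EchoFilter detects whether a request is such an echo request and, if so, returns the argument directly without invoking the service.

The invoke() method of EchoFilter is simple: it checks whether the method named in the Invocation is $echo with a single argument, and returns the corresponding result. The logic is as follows:

public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
    // check whether this is an echo request
    if (inv.getMethodName().equals(Constants.$ECHO) && inv.getArguments() != null && inv.getArguments().length == 1) {
        // echo request: return the argument directly without invoking the service
        return new RpcResult(inv.getArguments()[0]);
    } else {
        // otherwise call the next Filter
        return invoker.invoke(inv);
    }
}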

TokenFilter #

TokenFilter is a Filter for authentication. If a service exposed by a provider requires authentication, the consumer must first authenticate and obtain a valid token before calling the service.

The core logic of TokenFilter is in its invoke() method. It reads the token for the request from RpcContext and checks whether it is empty. If the token is empty, it throws an RpcException to signal that authentication failed. Otherwise, it calls the next Filter's invoke() method. In simplified form, the logic is as follows:

public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
    // obtain the token
    String token = RpcContext.getContext().getAttachment(Constants.TOKEN_KEY);
    // check whether the token is empty
    if (StringUtils.isEmpty(token)) {
        throw new RpcException(RpcException.NO_INVOKER_AVAILABLE_EXCEPTION,
            "Failed to invoke service " + invoker.getInterface().getName() +
            "." + inv.getMethodName() + " because token is empty.");
    } else {
        // call the next Filter
        return invoker.invoke(inv);
    }
}

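Filter summary #

From the walkthrough above, we can see that Filters in Dubbo mainly implement cross-cutting logic such as initializing RpcContext, recording access logs, tracing, and echo testing. Developers can extend these Filters to add further functionality as needed.

Beyond these, Dubbo provides other built-in Filters. For example, MonitorFilter tracks the number and duration of service invocations, ExceptionFilter handles exceptions thrown by Dubbo services, and ClassLoaderFilter switches the thread context class loader during an invocation. These Filters are widely used and can be applied to Dubbo services as needed.

In AccessLogFilter, the task that writes buffered logs to file is scheduled as follows:

LOG_SCHEDULED.scheduleWithFixedDelay(this::writeLogToFile, LOG_OUTPUT_INTERVAL, LOG_OUTPUT_INTERVAL, TimeUnit.MILLISECONDS);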

To help you better understand this part of the content, let’s take a look at Dubbo’s support for various logging frameworks. In the processWithServiceLogger() method, we can see that Dubbo supports various third-party logging frameworks through LoggerFactory:

private void processWithServiceLogger(Set<AccessLogData> logSet) {

    for (Iterator<AccessLogData> iterator = logSet.iterator();
        iterator.hasNext();
        iterator.remove()) { // Iterate over the logSet collection

        AccessLogData logData = iterator.next();

        // Get logger object from LoggerFactory and write log
        LoggerFactory.getLogger(LOG_KEY + "." + logData.getServiceName()).info(logData.getLogMessage());
    }
}

In LoggerFactory, a LOGGERS collection (of type Map) is maintained, which contains all currently used FailsafeLogger objects. The FailsafeLogger object encapsulates a Logger object, which is a custom interface defined by Dubbo. Dubbo provides an implementation of the Logger interface for each third-party framework. Here’s an example of Log4j2Logger:

public class Log4j2Logger implements Logger {

    // Maintains a Logger object from the log4j framework, implementing the adapter functionality
    private final org.apache.logging.log4j.Logger logger;

    public Log4j2Logger(org.apache.logging.log4j.Logger logger) {
        this.logger = logger;
    }

    @Override
    public void info(String msg, Throwable e) {
        logger.info(msg, e); // Directly invoke the log4j framework's Logger to write logs
    }

    // Other overloads of the info() method are omitted, as well as error, trace, warn, debug, and other methods
}

In the LoggerFactory.getLogger() method, the LOGGER_ADAPTER field (of type LoggerAdapter) is used to get the Logger implementation object:

public static Logger getLogger(String key) {
    return LOGGERS.computeIfAbsent(key, k ->
        new FailsafeLogger(LOGGER_ADAPTER.getLogger(k)));
}

The LOGGER_ADAPTER field is initialized through the SPI mechanism in the LoggerFactory.setLoggerAdapter() method:

public static void setLoggerAdapter(String loggerAdapter) {
    if (loggerAdapter != null && loggerAdapter.length() > 0) {
        setLoggerAdapter(ExtensionLoader.getExtensionLoader(LoggerAdapter.class).getExtension(loggerAdapter));
    }
}

LoggerAdapter is marked with the @SPI annotation and is an extension interface. Each third-party framework has a corresponding implementation of LoggerAdapter. It is used to create the respective Dubbo Logger implementation objects. Here’s an example of Log4j2LoggerAdapter:

public class Log4j2LoggerAdapter implements LoggerAdapter {

    @Override
    public Logger getLogger(String key) { // Create Log4j2Logger adapter
        return new Log4j2Logger(LogManager.getLogger(key));
    }
}
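
To see the adapter and factory pieces working together without any third-party logging framework, consider the following sketch. All type names in it (SimpleLogger, SimpleLoggerAdapter, InMemoryAdapter, Factory) are hypothetical stand-ins for Dubbo's Logger, LoggerAdapter, a concrete adapter, and LoggerFactory. The in-memory backend is an assumption made so that the example is self-contained.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class LoggerAdapterSketch {
    /** Framework-neutral logging interface. */
    interface SimpleLogger {
        void info(String msg);
    }

    /** Creates SimpleLogger objects for one concrete logging backend. */
    interface SimpleLoggerAdapter {
        SimpleLogger getLogger(String key);
    }

    /** Demo backend: collects lines in memory instead of calling a real framework. */
    static final class InMemoryAdapter implements SimpleLoggerAdapter {
        final List<String> lines = Collections.synchronizedList(new ArrayList<>());

        @Override
        public SimpleLogger getLogger(String key) {
            return msg -> lines.add(key + ": " + msg);
        }
    }

    /** Caches one logger per key and delegates creation to the configured adapter. */
    static final class Factory {
        private final Map<String, SimpleLogger> loggers = new ConcurrentHashMap<>();
        private final SimpleLoggerAdapter adapter;

        Factory(SimpleLoggerAdapter adapter) {
            this.adapter = adapter;
        }

        SimpleLogger getLogger(String key) {
            return loggers.computeIfAbsent(key, adapter::getLogger);
        }
    }
}
```

Because callers only ever see SimpleLogger, swapping the backend means supplying a different adapter to the factory and nothing else.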

ClassLoaderFilter #

ClassLoaderFilter is a Filter implementation on the Provider side, which is responsible for switching class loaders.

In the ClassLoaderFilter.invoke() method, it first gets the contextClassLoader associated with the current thread and then sets its ContextClassLoader to invoker.getInterface().getClassLoader(), which is the class loader used to load the service interface class. It then executes the invoker.invoke() method, which executes the subsequent Filter logic and business logic. Finally, it resets the contextClassLoader associated with the current thread to the original contextClassLoader. Here’s the core logic of ClassLoaderFilter:

public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    ClassLoader ocl = Thread.currentThread().getContextClassLoader();

    // Update the current thread's binding ClassLoader
    Thread.currentThread().setContextClassLoader(invoker.getInterface().getClassLoader());

    try {
        // Execute the subsequent Filter logic and business logic
        return invoker.invoke(invocation);
    } finally {
        // Reset the current thread's contextClassLoader
        Thread.currentThread().setContextClassLoader(ocl);
    }
}
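
The same save, switch, and restore pattern can be tried outside Dubbo. In the sketch below, the helper callWith() is a hypothetical name; it runs a piece of code under a given context class loader and always restores the previous one.

```java
import java.util.function.Supplier;

final class ContextClassLoaderSketch {
    /** Runs body with loader as the thread's context class loader, then restores the old one. */
    static <T> T callWith(ClassLoader loader, Supplier<T> body) {
        Thread current = Thread.currentThread();
        ClassLoader original = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return body.get();
        } finally {
            // Restore even if body throws, so pooled threads are not left in a modified state.
            current.setContextClassLoader(original);
        }
    }
}
```

The finally block matters most on pooled threads, where a loader that is not restored would leak into whatever request the thread handles next.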


ExecuteLimitFilter #

ExecuteLimitFilter implements concurrency limiting on the Dubbo provider side and is the counterpart of ActiveLimitFilter on the consumer side. Its core implementation is similar to that of ActiveLimitFilter: it relies on RpcStatus.beginCount() and endCount() to increment and decrement the RpcStatus.active field. The specific implementation is as follows:
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    URL url = invoker.getUrl();
    String methodName = invocation.getMethodName();
    int max = url.getMethodParameter(methodName, EXECUTES_KEY, 0);

    // Increment "active"; if the concurrency has reached the threshold set by the "executes" configuration, throw an exception directly
    if (!RpcStatus.beginCount(url, methodName, max)) {
        throw new RpcException("...");
    }

    invocation.put(EXECUTE_LIMIT_FILTER_START_TIME, System.currentTimeMillis());

    return invoker.invoke(invocation); // Execute subsequent filters and business logic
}

ExecuteLimitFilter also implements the internal Listener interface of Filter. The onResponse() and onError() methods will call the RpcStatus.endCount() method to decrease the value of “active” and complete the statistics of a call. The specific implementation is relatively simple and will not be shown here.

TimeoutFilter #

In the earlier discussion of ConsumerContextFilter, we saw that if a TimeoutCountDown is stored in RpcContext under TIME_COUNTDOWN_KEY, it is checked to determine whether the request has timed out. In DubboInvoker's doInvoke() method, calculateTimeout() is then called before the request is sent to determine how long the request remains valid:

private int calculateTimeout(Invocation invocation, String methodName) {
    Object countdown = RpcContext.getContext().get(TIME_COUNTDOWN_KEY);
    int timeout = DEFAULT_TIMEOUT;

    if (countdown == null) { // If TIME_COUNTDOWN_KEY is not specified in RpcContext, use the timeout configuration
        // Get the timeout duration specified by the timeout configuration, with a default value of 1 second
        timeout = (int) RpcUtils.getTimeout(getUrl(), methodName, RpcContext.getContext(), DEFAULT_TIMEOUT);

        if (getUrl().getParameter(ENABLE_TIMEOUT_COUNTDOWN_KEY, false)) {
            // If ENABLE_TIMEOUT_COUNTDOWN_KEY is enabled, pass the timeout value to the provider through TIMEOUT_ATTACHENT_KEY
            invocation.setObjectAttachment(TIMEOUT_ATTACHENT_KEY, timeout);
        }
    } else {
        // If the timeout is specified by TIME_COUNTDOWN_KEY in the current RpcContext, use the specified value as the timeout
        TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countdown;
        timeout = (int) timeoutCountDown.timeRemaining(TimeUnit.MILLISECONDS);

        // Put the remaining timeout in the attachment and pass it to the provider
        invocation.setObjectAttachment(TIMEOUT_ATTACHENT_KEY, timeout);
    }

    return timeout;
}

When the request reaches the provider, ContextFilter restores the attachments of RpcContext from the attachments in the Invocation, including TIMEOUT_ATTACHENT_KEY (whose value is restored into a TimeoutCountDown object).

TimeoutFilter is another provider-side Filter that deals with the timeout. Its invoke() method is simple and forwards the request directly to the subsequent Filters. Its onResponse() method obtains the TimeoutCountDown object from RpcContext and checks whether the request has timed out. If it has, the result in AppResponse is cleared and a warning is logged. The specific implementation is as follows:

public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
    Object obj = RpcContext.getContext().get(TIME_COUNTDOWN_KEY);

    if (obj != null) {
        TimeoutCountDown countDown = (TimeoutCountDown) obj;

        if (countDown.isExpired()) { // Check if the result has timed out
            ((AppResponse) appResponse).clear(); // Clear the result information

            if (logger.isWarnEnabled()) {
                logger.warn("...");
            }
        }
    }
}

TpsLimitFilter #

TpsLimitFilter is the implementation of TPS rate limiting in the Dubbo provider side. TpsLimitFilter maintains an object of type TPSLimiter, and its default implementation is DefaultTPSLimiter, which controls the upper limit of TPS in the provider side. The specific implementation of TpsLimitFilter.invoke() method is as follows:

public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    // If it exceeds the rate limit, throw an exception directly
    if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) {
        throw new RpcException("...");
    }

    // Otherwise continue with the subsequent filters and business logic
    return invoker.invoke(invocation);
}
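
One simple way to implement an isAllowable()-style check is a fixed-window counter: allow at most a given number of calls per interval and refill the budget when a new interval starts. The sketch below illustrates that general technique only. The class FixedWindowLimiterSketch is hypothetical, the clock is passed in as a parameter to keep the example deterministic, and it is not presented as the code of DefaultTPSLimiter.

```java
final class FixedWindowLimiterSketch {
    private final int rate;            // calls allowed per window
    private final long intervalMillis; // window length
    private long windowStart;          // start time of the current window
    private int remaining;             // calls still allowed in the current window

    FixedWindowLimiterSketch(int rate, long intervalMillis, long nowMillis) {
        this.rate = rate;
        this.intervalMillis = intervalMillis;
        this.windowStart = nowMillis;
        this.remaining = rate;
    }

    /** Returns true if a call at nowMillis fits into the current window's budget. */
    synchronized boolean isAllowable(long nowMillis) {
        if (nowMillis > windowStart + intervalMillis) {
            // A new window has begun: refill the budget.
            windowStart = nowMillis;
            remaining = rate;
        }
        if (remaining <= 0) {
            return false;
        }
        remaining--;
        return true;
    }
}
```

A fixed window is cheap and easy to reason about, at the cost of allowing short bursts of up to twice the rate around a window boundary.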

The JarVersionConsumerFilter below caches the jar version of each service interface and sends it to the Provider as an attachment:

        .weakKeys()
        .build(new CacheLoader<Class<?>, String>() {
            @Override
            public String load(Class<?> key) throws Exception {
                return getVersion(key);
            }
        });

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        // get the version of the jar that contains the interface
        Class<?> apiClass = invoker.getInterface();
        String version = versionCache.getUnchecked(apiClass);
        // send the version to the Provider as an attachment
        invocation.setAttachment(JAR_VERSION_NAME_KEY, version);
        return invoker.invoke(invocation);
    }

    private String getVersion(Class<?> apiClass) {
        // locate the jar that contains the interface
        String jarPath = apiClass.getProtectionDomain().getCodeSource().getLocation().getPath();
        // read the version from the jar
        String version = getVersionFromJar(jarPath);
        return version;
    }

    private String getVersionFromJar(String jarPath) {
        // logic for deriving the version from the jar path
        // ...
    }
}


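Next, let's look at the implementation of JarVersionProviderFilter. This implementation relies on a utility class, DubboVersionUtils, which contains many methods for working with the Dubbo jar.

The implementation of JarVersionProviderFilter is as follows: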
    
    
public class JarVersionProviderFilter implements Filter {
    private static final Logger logger = LoggerFactory.getLogger(JarVersionProviderFilter.class);
    private static final long PRINT_INTERVAL = 60 * 1000L; // print interval in milliseconds
    private static final String JAR_VERSION_NAME_KEY = "dubbo.jar.version";

    private Map<String, AtomicInteger> versionCountMap = new ConcurrentHashMap<>();
    private ExecutorService printExecutorService = Executors.newSingleThreadExecutor();
    private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

    public JarVersionProviderFilter() {
        // print the counters once per interval rather than only once
        scheduledExecutorService.scheduleAtFixedRate(this::printVersionCountMap,
                PRINT_INTERVAL, PRINT_INTERVAL, TimeUnit.MILLISECONDS);
    }

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        // count the request under the jar version sent by the Consumer
        String version = invocation.getAttachment(JAR_VERSION_NAME_KEY);
        if (version != null) {
            versionCountMap.computeIfAbsent(version, k -> new AtomicInteger()).incrementAndGet();
        }
        return invoker.invoke(invocation);
    }

    private void printVersionCountMap() {
        logger.info("JarVersionProviderFilter print version count map start");
        versionCountMap.forEach((version, count) -> {
            logger.info("Jar Version: " + version + ", Request Count: " + count);
        });
        logger.info("JarVersionProviderFilter print version count map end");
    }

    @PreDestroy
    private void destroy() {
        printExecutorService.shutdown();
        scheduledExecutorService.shutdown();
    }
}
    

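This completes a full custom Filter implementation. You can register JarVersionConsumerFilter and JarVersionProviderFilter in the Filter SPI file so that they are loaded automatically and take effect when Dubbo starts.

We have now covered how the core Filters provided by Dubbo work, as well as how to write a custom Filter. After this chapter, you should have a working understanding of Filters in Dubbo and be able to implement a custom Filter for your own needs. Next, we will move on to Interceptor, a derivative of Filter.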

JarVersionConsumerFilter caches the jar version of each service interface and attaches it to every outgoing request:

    .maximumSize(1024).build(new CacheLoader<Class<?>, String>() {
        @Override
        public String load(Class<?> key) throws Exception {
            return getJarVersion(key);
        }
    });

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    Map<String, String> attachments = invocation.getAttachments();
    String version = versionCache.getUnchecked(invoker.getInterface());
    if (!StringUtils.isBlank(version)) {
        attachments.put(JAR_VERSION_NAME_KEY, version);
    }
    return invoker.invoke(invocation);
}

private String getJarVersion(Class clazz) {
    try (BufferedReader reader = new BufferedReader(new InputStreamReader(clazz.getResourceAsStream("/META-INF/MANIFEST.MF")))) {
        String key = "Implementation-Version:";
        String s = null;
        while ((s = reader.readLine()) != null) {
            int i = s.indexOf(key);
            if (i >= 0) { // indexOf returns 0 when the line starts with the key
                return s.substring(i + key.length()).trim(); // keep only the version value
            }
        }
    } catch (IOException e) {
        // exception handling omitted
    }
    return "";
}
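
As an alternative to scanning the manifest line by line, the JDK's java.util.jar.Manifest class can parse it and look up the attribute by name. The sketch below shows this approach on manifest text passed in as a string. The class and method names are hypothetical, and reading from a string instead of a jar resource is an assumption made to keep the example self-contained.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.jar.Manifest;

final class ManifestVersionSketch {
    /** Returns the Implementation-Version main attribute, or "" if absent or unreadable. */
    static String versionOf(String manifestText) {
        byte[] bytes = manifestText.getBytes(StandardCharsets.UTF_8);
        try {
            Manifest manifest = new Manifest(new ByteArrayInputStream(bytes));
            String version = manifest.getMainAttributes().getValue("Implementation-Version");
            return version == null ? "" : version.trim();
        } catch (IOException e) {
            return ""; // malformed manifest: treat as "version unknown"
        }
    }

    public static void main(String[] args) {
        String text = "Manifest-Version: 1.0\nImplementation-Version: 2.7.7\n";
        System.out.println(versionOf(text)); // prints 2.7.7
    }
}
```

Letting the parser handle the format also covers details such as continuation lines, which a simple indexOf() scan would miss.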

The implementation of JarVersionProviderFilter is very simple. It reads the version information from the request and increments the associated counter by one. Additionally, the constructor of JarVersionProviderFilter starts a scheduled task that runs every minute and logs the statistics (in a production environment, these statistics are usually used to generate reports).

Since JarVersionProviderFilter runs on the provider side, the group field of its @Activate annotation is set to the constant CommonConstants.PROVIDER. The specific implementation of JarVersionProviderFilter is as follows:

@Activate(group = {CommonConstants.PROVIDER}, order = -1)
public class JarVersionProviderFilter implements Filter {
    private static final String JAR_VERSION_NAME_KEY = "dubbo.jar.version";
    private static final Map<String, AtomicLong> versionState = new ConcurrentHashMap<>();
    private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newScheduledThreadPool(1);

    public JarVersionProviderFilter() {
        // scheduleAtFixedRate repeats every minute; schedule() would run the task only once
        SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate(() -> {
            for (Map.Entry<String, AtomicLong> entry : versionState.entrySet()) {
                System.out.println(entry.getKey() + ":" + entry.getValue().getAndSet(0));
            }
        }, 1, 1, TimeUnit.MINUTES);
    }

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        String versionAttachment = invocation.getAttachment(JAR_VERSION_NAME_KEY);
        if (!StringUtils.isBlank(versionAttachment)) {
            AtomicLong count = versionState.computeIfAbsent(versionAttachment, v -> new AtomicLong(0L));
            count.getAndIncrement();
        }
        return invoker.invoke(invocation);
    }
}
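
The counting and periodic reporting logic can be exercised on its own, independent of Dubbo. The following sketch separates it into a small class with hypothetical names (VersionCounterSketch, record(), snapshotAndReset()); a scheduled task like the one above would call snapshotAndReset() once per reporting interval.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

final class VersionCounterSketch {
    private final Map<String, AtomicLong> counts = new ConcurrentHashMap<>();

    /** Counts one request for the given version; blank versions are ignored. */
    void record(String version) {
        if (version == null || version.trim().isEmpty()) {
            return;
        }
        counts.computeIfAbsent(version, v -> new AtomicLong()).incrementAndGet();
    }

    /** Returns the counts since the last call, sorted by version, and resets them to zero. */
    Map<String, Long> snapshotAndReset() {
        Map<String, Long> snapshot = new TreeMap<>();
        // getAndSet(0) reads and clears each counter in one atomic step.
        counts.forEach((version, count) -> snapshot.put(version, count.getAndSet(0)));
        return snapshot;
    }
}
```

Using getAndSet(0) instead of a separate read and reset means increments that arrive during reporting are carried into the next interval rather than lost.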

Finally, we need to add an SPI configuration file in the /resources/META-INF/dubbo directory of the provider project. The file name is org.apache.dubbo.rpc.Filter, and its content is as follows:

version-provider=org.apache.dubbo.demo.provider.JarVersionProviderFilter

Similarly, we also need to add the same SPI configuration file in the consumer project, with the same name as the provider project. The content of the file should be as follows:

version-consumer=org.apache.dubbo.demo.consumer.JarVersionConsumerFilter

Summary: This lesson focused on the implementations of the Filter interface in Dubbo. First, we reviewed how Filter chains are loaded. Then, we analyzed in detail the built-in Filter implementations, which underpin much of Dubbo's core functionality. Finally, we walked through extending Dubbo with custom Filters, using as an example a pair of Filters that count requests by jar version.