Skip to content

05 Server性能提升:设计多个Processor

你好,我是郭屹。今天我们继续手写MiniTomcat。

学完前几节课的内容之后,现在我们已经做到接口满足Servlet规范,而且功能模块拆分成了Connector和Processor两部分,它们各司其职,一个负责网络连接,一个负责Servlet调用处理。

但现在这个Server的运行模式是,一个Connector服务于一个Processor,而且每次创建Processor的时候都是重新实例化一个新的对象,Processor还不支持多线程处理,所以我们在HttpServer性能方面还有提升的空间。

这节课,我们计划引入池的概念,增强Processor的复用性,同时将Processor异步化,支持一个Connector服务于多个Processor。

项目结构

这节课,我们只针对原有的HttpConnector和HttpProcessor类进行改造,所以项目结构和maven引入依赖保持不变,还是延续下面的结构和配置。

MiniTomcat
├─ src
│  ├─ main
│  │  ├─ java
│  │  │  ├─ server
│  │  │  │  ├─ HttpConnector.java
│  │  │  │  ├─ HttpProcessor.java
│  │  │  │  ├─ HttpServer.java
│  │  │  │  ├─ Request.java
│  │  │  │  ├─ Response.java
│  │  │  │  ├─ ServletProcessor.java
│  │  │  │  ├─ StatisResourceProcessor.java
│  │  ├─ resources
│  ├─ test
│  │  ├─ java
│  │  │  ├─ test
│  │  │  │  ├─ HelloServlet.java
│  │  ├─ resources
├─ webroot
│  ├─ test
│  │  ├─ HelloServlet.class
│  ├─ hello.txt
├─ pom.xml

pom.xml参考如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>day3</groupId>
    <artifactId>day3</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>javax.servlet-api</artifactId>
            <version>4.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.4</version>
        </dependency>
    </dependencies>
</project>

池化技术引入

现在我们在HttpConnector里获取到Socket后,每次都是创建一个全新的HttpProcessor对象,如下所示:

socket = serverSocket.accept();
HttpProcessor processor = new HttpProcessor();
processor.process(socket);

在并发请求逐步增加后,构造新对象会对服务器的性能造成负担,所以我们打算引入池,把对象初始化好之后,需要用的时候再拿出来使用,不需要使用的时候就再放回池里,不用再构造新的对象。

因此,接下来我们先改造HttpConnector类,使用ArrayDeque存放构造完毕的HttpProcessor对象。改造如下:

public class HttpConnector implements Runnable {
    int minProcessors = 3;
    int maxProcessors = 10;
    int curProcessors = 0;
    //存放多个processor的池子
    Deque<HttpProcessor> processors = new ArrayDeque<>();
    public void run() {
        ServerSocket serverSocket = null;
        int port = 8080;
        try {
            serverSocket = new ServerSocket(port, 1, InetAddress.getByName("127.0.0.1"));
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
        // initialize processors pool
        for (int i = 0; i < minProcessors; i++) {
            HttpProcessor processor = new HttpProcessor();
            processors.push(processor);
        }
        curProcessors = minProcessors;
        while (true) {
            Socket socket = null;
            try {
                socket = serverSocket.accept();
                //得到一个新的processor,这个processor从池中获取(池中有可能新建)
                HttpProcessor processor = createProcessor();
                if (processor == null) {
                    socket.close();
                    continue;
                }
                processor.process(socket); //处理
                processors.push(processor); //处理完毕后放回池子
                // Close the socket
                socket.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    public void start() {
        Thread thread = new Thread(this);
        thread.start();
    }
    //从池子中获取一个processor,如果池子为空且小于最大限制,则新建一个
    private HttpProcessor createProcessor() {
        synchronized (processors) {
            if (processors.size() > 0) {
                //获取一个
                return ((HttpProcessor) processors.pop());
            }
            if (curProcessors < maxProcessors) {
                //新建一个
                return (newProcessor());
            }
            else {
                return (null);
            }
        }
    }
    //新建一个processor
    private HttpProcessor newProcessor() {
        HttpProcessor initprocessor = new HttpProcessor();
        processors.push(initprocessor);
        curProcessors++;
        return ((HttpProcessor) processors.pop());
    }
}

从上述代码中,我们可以看到,run()方法中先进行了processors池的初始化,就是循环创建新的HttpProcessor对象后,push到processors这个ArrayDeque里,processors作为HttpProcessor对象的存储池使用。

之后,每次接收到一个socket,就用createProcessor()方法获取到一个processor。看一下createProcessor()方法的代码,可以发现当processors内元素不为空的时候,直接从ArrayDeque内获取HttpProcessor对象,每次从池里获取一个processor。如果池子里没有processor,就新创建一个。

无论池子里有没有,最后的结果都是拿到了一个processor,然后执行它,任务执行完之后再把它放回池里。完成了一次请求响应。

但是我们要注意,到现在为止,HttpProcessor并没有做到多线程,也没有实现NIO,只是在池中放置了多个对象,做到了多路复用。目前整体架构还是阻塞式运行的,Socket每次用之后也关闭丢弃了。

所以我们要继续改造,优化HttpProcessor的性能。

多线程HttpProcessor

接下来,我们计划设置多线程Processor,通过这个手段继续优化它的性能。这可以使一个Connector同时服务于多个Processor。

基本的思路是使用多个线程,让Connector和Processor分别由不同的线程来运行。工作的基本流程是由Connector接收某个Socket连接后交给某个Processor线程处理,而Processor线程等待某个Socket连接到来后进行处理,处理完毕之后要交回给Connector。因此, 这里的核心是要设计一个线程之间的同步机制。

首先我们让HttpProcessor实现Runnable接口,这样每一个HttpProcessor都可以在独立的线程中运行。改造如下:

public class HttpProcessor implements Runnable{
    @Override
    public void run() {
        while (true) {
            // 等待socket分配过来
            Socket socket = await();
            if (socket == null) continue;
            // 处理请求
            process(socket);
            // 回收processor
            connector.recycle(this);
        }
    }
}

上述Processor的run()方法执行过程是,等待某个Socket,收到Connector交过来的Socket后,Process方法处理这个Socket,处理完毕之后交回给Connector回收,并重新把这个processor放入池里。

这是Processor接收者这一边,而另一边就是作为分配者的Connector。类似地,我们提供一个assign方法让Connector分配Socket给Processor。

接下来我们重点解决分配者和接收者如何同步的问题。因为这是两个线程,一定需要同步才能协同工作。基本的思路就是 await() 方法里等着Socket,而assign()方法里分配Socket,中间通过一个标志来表示分配和接收状态,以此进行同步。

这个同步的机制内部其实就是用的Java自身提供的notify和wait。

看程序代码,如下所示:

synchronized void assign(Socket socket) {
    // 等待connector提供一个新的socket
    while (available) {
        try {
            wait();
        } catch (InterruptedException e) {
        }
    }
    // 获取到这个新的Socket
    this.socket = socket;
    // 把标志设置回去
    available = true;
    //通知另外的线程
    notifyAll();
}
private synchronized Socket await() {
    // 等待connector提供一个新的socket
    while (!available) {
        try {
            wait();
        }catch (InterruptedException e) {
        }
    }
    // 获得这个新的Socket
    Socket socket = this.socket;
    //设置标志为false
    available = false;
    //通知另外的线程
    notifyAll();
    return (socket);
}

首先看 assign(socket) 方法,在这里,我们用一个标志available来标记,如果标志为true,Connetor线程就继续死等。到了某个时候,Processor线程把这个标志设置为false,Connector线程就跳出死等的循环,然后把接收到的Socket交给Processor。然后要立刻重新把available标志设置为true,再调用 notifyAll() 通知其他线程。

再看 await(),这是作为接收者Processor的线程使用的方法。反过来,如果avaliable标志为false,那么Processor线程继续死等。到了某个时候,Connector线把这个标志设置为true,那么Processor线程就跳出死等的循环,拿到Socket。然后要立刻重新把avaiable标志设置为false,再调用 notifyAll() 通知其他线程。

这个线程互锁机制保证了两个线程之间的同步协调。图示如下:

下面我列出HttpProcessor的完整代码,因为HttpProcessor构造函数调整,增加了HttpConnector的参数,所以我把HttpConnector类调整后的代码一并列出了,随后我们统一说明调整步骤还有调整的理由。

HttpProcessor代码:

public class HttpProcessor implements Runnable{
    Socket socket;
    boolean available = false;
    HttpConnector connector;
    public HttpProcessor(HttpConnector connector) {
        this.connector = connector;
    }
    @Override
    public void run() {
        while (true) {
            // 等待分配下一个 socket
            Socket socket = await();
            if (socket == null) continue;
            // 处理来自这个socket的请求
            process(socket);
            // 完成此请求
            connector.recycle(this);
        }
    }
    public void start() {
        Thread thread = new Thread(this);
        thread.start();
    }
    public void process(Socket socket) {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
        InputStream input = null;
        OutputStream output = null;
        try {
            input = socket.getInputStream();
            output = socket.getOutputStream();
            // 创建请求对象并解析
            Request request = new Request(input);
            request.parse();
            // 创建响应对象
            Response response = new Response(output);
            response.setRequest(request);
//               response.sendStaticResource();
            // 检查这是对servlet还是静态资源的请求
            // a request for a servlet begins with "/servlet/"
            if (request.getUri().startsWith("/servlet/")) {
                ServletProcessor processor = new ServletProcessor();
                processor.process(request, response);
            }
            else {
                StaticResourceProcessor processor = new StaticResourceProcessor();
                processor.process(request, response);
            }
            // 关闭 socket
            socket.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    synchronized void assign(Socket socket) {
        // 等待 connector 提供新的 Socket
        while (available) {
            try {
                wait();
            } catch (InterruptedException e) {
            }
        }
        // 存储新可用的 Socket 并通知我们的线程
        this.socket = socket;
        available = true;
        notifyAll();
    }
    private synchronized Socket await() {
        // 等待 connector 提供一个新的 Socket
        while (!available) {
            try {
                wait();
            }catch (InterruptedException e) {
            }
        }
        // 通知Connector我们已经收到这个Socket了
        Socket socket = this.socket;
        available = false;
        notifyAll();
        return (socket);
    }
}

要注意available标志和 assign() 方法都是写在Processor类里的。

processor.assign(socket);

但是这并不表示 assign() 是Processor线程来执行,因为这个方法的调用者是Connector。

HttpConnector代码:

public class HttpConnector implements Runnable {
    int minProcessors = 3;
    int maxProcessors = 10;
    int curProcessors = 0;
    //存放Processor的池子
    Deque<HttpProcessor> processors = new ArrayDeque<>();
    public void run() {
        ServerSocket serverSocket = null;
        int port = 8080;
        try {
            serverSocket = new ServerSocket(port, 1, InetAddress.getByName("127.0.0.1"));
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
        // 初始化池子initialize processors pool
        for (int i = 0; i < minProcessors; i++) {
            HttpProcessor initprocessor = new HttpProcessor(this);
            initprocessor.start();
            processors.push(initprocessor);
        }
        curProcessors = minProcessors;
        while (true) {
            Socket socket = null;
            try {
                socket = serverSocket.accept();
                //对每一个socket,从池子中拿到一个processor
                HttpProcessor processor = createProcessor();
                if (processor == null) {
                    socket.close();
                    continue;
                }
                //分配给这个processor
                processor.assign(socket);
                // Close the socket
//                socket.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    public void start() {
        Thread thread = new Thread(this);
        thread.start();
    }
    //从池子中获取一个processor,池子为空且数量小于最大限制则会新建一个processor
    private HttpProcessor createProcessor() {
        synchronized (processors) {
            if (processors.size() > 0) {
                return ((HttpProcessor) processors.pop());
            }
            if (curProcessors < maxProcessors) {
                return (newProcessor());
            }
            else {
                return (null);
            }
        }
    }
    private HttpProcessor newProcessor() {
        HttpProcessor initprocessor = new HttpProcessor(this);
        initprocessor.start();
        processors.push(initprocessor);
        curProcessors++;
        return ((HttpProcessor) processors.pop());
    }
    void recycle(HttpProcessor processor) {
        processors.push(processor);
    }
}

我们再回顾一下HttpProcessor类中的assign方法与await方法。在HttpProcessor的线程启动之后,available的标识一直是false,这个时候这个线程会一直等待。在HttpConnector类里构造Processor,并且调用 processor.assign(socket) 给HttpProcessor分配Socket之后,标识符available改成true,并且调用notifyAll这个本地方法通知唤醒所有等待的线程。

而在await方法里,HttpProcessor拿到HttpConnector传来的Socket之后,首先会接收Socket,并且立即把available由true改为false,最后以拿到的这个Socket为基准继续进行Processor中的处理工作。

这也意味着,一旦Connector分配了一个Socket给到Processor,后者就能立即结束等待,拿到Socket后调用Process方法继续后面的工作。这时available的状态立刻修改,进而用notifyAll方法唤醒Connector的等待线程,Connector就可以全身而退,去处理下一个HttpProcessor了。

Tomcat中两个线程互锁的这种机制很经典,在后续版本的NIO和Servlet协调的设计中都用到了。

这样也就做到了HttpProcessor的异步化,也正因为做到了异步化,我们就不能再利用Connector去关闭Socket了,因为Connector是不知道Processor何时处理完毕的,Socket的关闭任务就交给Processor自己处理了。

因此在Connector类里,socket.close()这一行被注释掉了,而在Processor类里新增了那一行代码。

测试

在src/test/java/test目录下,修改HelloServlet。

package test;
import java.io.IOException;
import javax.servlet.Servlet;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
public class HelloServlet implements Servlet{
    @Override
    public void service(ServletRequest req, ServletResponse res) throws ServletException, IOException {
        res.setCharacterEncoding("UTF-8");
        String doc = "<!DOCTYPE html> \n" +
                "<html>\n" +
                "<head><meta charset=\"utf-8\"><title>Test</title></head>\n"+
                "<body bgcolor=\"#f0f0f0\">\n" +
                "<h1 align=\"center\">" + "Hello World 你好" + "</h1>\n";
        res.getWriter().println(doc);
    }
    @Override
    public void destroy() {
    }
    @Override
    public ServletConfig getServletConfig() {
        return null;
    }
    @Override
    public String getServletInfo() {
        return null;
    }
    @Override
    public void init(ServletConfig arg0) throws ServletException {
    }
}

这一次我们的测试类还是和之前差不多,但在Processor的process方法中,我们新增了Thread.sleep方法。

在准备工作进行完毕之后,我们运行HttpServer服务器,在浏览器中可以连续打开多个页面,键入 http://localhost:8080/hello.txt 后,等待一小段时间(这个时间由Thread.sleep传入参数timeout决定,timeout以毫秒为单位),随后我们可以发现hello.txt里的所有文本内容,都作为返回体展示在浏览器页面上了。

再以同样的方法测试HelloServlet,输入 http://localhost:8080/servlet/test.HelloServlet 后等待一小会儿还是可以看到浏览器显示:Hello World 你好。这也是我们在HelloServlet里定义的返回资源内容。

这些结果可以证明,我们整体的改造已经做到Processor异步化的改造,在客户端连续输入多次请求后,能做到并发执行,互不影响。

小结

这节课我们主要做了两件事:引入池化技术以及Processor多线程。前者我们从优化对象构造,持续复用的角度,做到了性能的优化提升,压缩了构造对象的性能开销;而后者则从并发的角度,使Connector能同时服务于多个Processor,减少了原来因等待Processor处理而产生的时间消耗,但是也需要仔细编写线程同步代码。

本节课完整代码参见: https://gitee.com/yaleguo1/minit-learning-demo/tree/geek_chapter0 5

思考题

学完了这节课的内容,我们来思考一个问题:Tomcat为什么用一个简单的queue来实现多线程而不是用JDK自带的线程池?

欢迎你把你思考后的结果分享到评论区,也欢迎你把这节课的内容分享给其他朋友,我们下节课再见!