深入源码:解密MyBatis数据源设计的精妙机制

数据源的顶级接口 DataSource

我们知道数据源是一个非常重要的基础组件,它的性能直接关系到数据持久层的性能,尽管市面上有很多第三方数据源组件,比如阿里的 druid,Apache 的 DBCP、c3p0,不管是哪种数据源,最终都需要实现 javax.sql.DataSource 接口。

public interface DataSource extends CommonDataSource, Wrapper {
    
    // 尝试通过数据源建立一个连接
    Connection getConnection() throws SQLException;
    
    // 重载的方法,传入用户名以及密码
    Connection getConnection(String username, String password) throws SQLException;
}

MyBatis 框架自身也提供了数据源的实现,分别是 PooledDataSourceUnpooledDataSource

通过 DataSource 接口我们可以知道,数据源本身就是就算是一个连接工厂,当你需要连接时,就问工厂要(调用 getConnection 方法)一个就行了。

一般 DataSource 接口是由数据库驱动商实现,且基本上会有三种实现形式:

  1. 基础实现:每次需要连接对象都是单纯的生产一个标准的新连接对象返回
  2. 连接池实现:生产的连接对象自动加入到连接池中,以便复用连接对象,该实现方式需要与一个中间层连接管理器合作
  3. 分布式事务方式实现:此种实现较为复杂,本文不会涉及

Mybaits 中的数据源实现就是针对以上前两点实现的,UnpooledDataSource 对应基础实现方式,而 PooledDataSource 针对 连接池方式的实现,下面我们直接看源码。

非池化数据源 UnpooledDataSource

非池化数据源,即每次都是创建一个新的数据库连接对象返回。

public class UnpooledDataSource implements DataSource {
  
  private ClassLoader driverClassLoader;
  // 驱动相关属性配置,在下面的工厂方法模式的 setProperties 方法中会将该属性赋值上(如果有驱动相关属性配置的话)
  private Properties driverProperties;
  // 已注册到驱动管理器的驱动集合,不同的数据库对应不同的驱动,比如 mysql 驱动,oracle 驱动...
  private static Map<String, Driver> registeredDrivers = new ConcurrentHashMap<String, Driver>();
  // 数据库驱动名称
  private String driver;
  // 数据库 url
  private String url;
  // 用户名
  private String username;
  // 密码
  private String password;
  // 自动提交
  private Boolean autoCommit;
  // 默认隔离级别
  private Integer defaultTransactionIsolationLevel;

  static {
    // 在 UnpooledDataSource 类初始化阶段,将已经注册到驱动管理器中的驱动考一份存入 registeredDrivers 集合中,以驱动的 className 为 key 保存
    Enumeration<Driver> drivers = DriverManager.getDrivers();
    while (drivers.hasMoreElements()) {
      Driver driver = drivers.nextElement();
      registeredDrivers.put(driver.getClass().getName(), driver);
    }
  }
  
  // 省略各种重载的构造函数...
  
  // 省略各种属性的 getter/setter 方法...
  
  @Override
  public Connection getConnection() throws SQLException {
    return doGetConnection(username, password);
  }

  @Override
  public Connection getConnection(String username, String password) throws SQLException {
    return doGetConnection(username, password);
  }
  
  // getConnection 方法都会转到 doGetConnection 方法上
  private Connection doGetConnection(String username, String password) throws SQLException {
    Properties props = new Properties();
    if (driverProperties != null) {
      props.putAll(driverProperties);
    }
    if (username != null) {
      props.setProperty("user", username);
    }
    if (password != null) {
      props.setProperty("password", password);
    }
    // 最终调用的是下面的重载方法
    return doGetConnection(props);
  }

  private Connection doGetConnection(Properties properties) throws SQLException {
    // 1. 初始化驱动
    initializeDriver();
    // 2. 通过驱动管理器获取连接,如果1.中没有合适的驱动注册到驱动管理器中,这里根据指定的 url 前缀(如:jdbc:mysql:xxx)就找不到合适的 JDBC 驱动,也就获取不到连接对象
    Connection connection = DriverManager.getConnection(url, properties);
    // 3. 配置连接对象
    configureConnection(connection);
    // 4. 返回连接对象
    return connection;
  }
  
  // 只有不符合 JDBC Driver SPI 的驱动才可能会进入 if 内
  private synchronized void initializeDriver() throws SQLException {
    // 假设这里传入的 driver 为某种不符号 SPI 的驱动商驱动,第一次会进入 if 内
    if (!registeredDrivers.containsKey(driver)) {
      Class<?> driverType;
      try {
        if (driverClassLoader != null) {
          // 如果 driverClassLoader 在配置文件中配置了,就进入这里
          driverType = Class.forName(driver, true, driverClassLoader);
        } else {
          // 没有配置单独的 driverClassLoader,则执行这里,加载 driver 驱动类
          driverType = Resources.classForName(driver);
        }
        // 驱动类加载完成,开始实例化驱动
        Driver driverInstance = (Driver)driverType.newInstance();
        // 注册驱动类,注意这里是一个静态代理类
        DriverManager.registerDriver(new DriverProxy(driverInstance));
        // 存入已注册驱动集合中
        registeredDrivers.put(driver, driverInstance);
      } catch (Exception e) {
        throw new SQLException("Error setting driver on UnpooledDataSource. Cause: " + e);
      }
    }
  }
  
  private void configureConnection(Connection conn) throws SQLException {
    // 配置连接对象,就是在已经获取到的连接上配置自动提交和默认事务的隔离级别
    if (autoCommit != null && autoCommit != conn.getAutoCommit()) {
      conn.setAutoCommit(autoCommit);
    }
    if (defaultTransactionIsolationLevel != null) {
      conn.setTransactionIsolation(defaultTransactionIsolationLevel);
    }
  }
  
  // 静态内部类,Driver 的静态代理类
  private static class DriverProxy implements Driver {
    private Driver driver;

    DriverProxy(Driver d) {
      this.driver = d;
    }

    @Override
    public boolean acceptsURL(String u) throws SQLException {
      return this.driver.acceptsURL(u);
    }

    @Override
    public Connection connect(String u, Properties p) throws SQLException {
      return this.driver.connect(u, p);
    }

    @Override
    public int getMajorVersion() {
      return this.driver.getMajorVersion();
    }

    @Override
    public int getMinorVersion() {
      return this.driver.getMinorVersion();
    }

    @Override
    public DriverPropertyInfo[] getPropertyInfo(String u, Properties p) throws SQLException {
      return this.driver.getPropertyInfo(u, p);
    }

    @Override
    public boolean jdbcCompliant() {
      return this.driver.jdbcCompliant();
    }

    // @Override only valid jdk7+
    public Logger getParentLogger() {
      return Logger.getLogger(Logger.GLOBAL_LOGGER_NAME);
    }
  }

以上就是非池化数据源的全部源码,代码比较简单,对于熟悉 JDBC 编程的同学几乎没有难度,此外我额外提一下关于该非池化数据源的创建过程,以上源码省略了其构造函数,实际上 MyBatis 采用了工厂方法模式来创建非池化以及池化数据源。

非池化数据源工厂 UnpooledDataSourceFactory

MyBatis 定义了一个 DataSourceFactory 接口来作为工厂方法模式中的工厂接口。

public interface DataSourceFactory {

  // 为 DataSource 设置相关属性
  void setProperties(Properties props);
  // 获取数据源对象
  DataSource getDataSource();
}

那既然采用了工厂方法模式来创建不同的数据源实例,那么自然针对不同的产品(数据源)就会存在对应的工厂实现类,针对 UnpooledDataSource 产品的工厂类实现就是 UnpooledDataSourceFactory 类。

利用工厂方法模式,MyBatis 就可以直接面向工厂接口以及产品接口编程,而不用去管具体的工厂类和具体的产品类,与之带来的优点就是开闭原则:对扩展开放,对修改关闭,

如果我们需要增加一种数据源(产品,比如增加一种第三方数据源),MyBatis 只要再额外增加一种对应的工厂类就可以了。

public class UnpooledDataSourceFactory implements DataSourceFactory {
    
  // 以 driver 开头的属性
  private static final String DRIVER_PROPERTY_PREFIX = "driver.";
  private static final int DRIVER_PROPERTY_PREFIX_LENGTH = DRIVER_PROPERTY_PREFIX.length();

  protected DataSource dataSource;

  // 利用工厂的构造函数直接 new 一个数据源对象出来
  public UnpooledDataSourceFactory() {
    this.dataSource = new UnpooledDataSource();
  }

  @Override
  public void setProperties(Properties properties) {
    // 抽出属性配置中的驱动相关配置,并保存到 driverProperties 中
    Properties driverProperties = new Properties();
    MetaObject metaDataSource = SystemMetaObject.forObject(dataSource);
    for (Object key : properties.keySet()) {
      String propertyName = (String) key;
      if (propertyName.startsWith(DRIVER_PROPERTY_PREFIX)) {
        // 获取驱动相关属性名(去除前缀 driver. 的)保存
        String value = properties.getProperty(propertyName);
        driverProperties.setProperty(propertyName.substring(DRIVER_PROPERTY_PREFIX_LENGTH), value);
      } else if (metaDataSource.hasSetter(propertyName)) {
        String value = (String) properties.get(propertyName);
        Object convertedValue = convertValue(metaDataSource, propertyName, value);
        // dataSource 相关的配置属性,直接 set 进去
        metaDataSource.setValue(propertyName, convertedValue);
      } else {
        throw new DataSourceException("Unknown DataSource property: " + propertyName);
      }
    }
    if (driverProperties.size() > 0) {
      // 最后,如果有驱动相关的属性配置的话,将其设置到 dataSource 对象的 driverProperties 属性上去
      metaDataSource.setValue("driverProperties", driverProperties);
    }
  }

  @Override
  public DataSource getDataSource() {
    return dataSource;
  }
}

至此我们解析了整个 MyBatis 提供的非池化数据源的创建过程,可见底层还是调用了 Java JDBC 相关的代码。

大致总结一下:

  1. 调用非池化数据源工厂类 UnpooledDataSourceFactory 的构造函数,创建 工厂对象
  2. 工厂对象的 构造函数中 调用非池化数据源类 UnpooledDataSource 的构造函数,创建 非池化数据源对象
  3. 调用非池化数据源 工厂对象setProperties(...) 方法,根据入参 Properties,设置非池化数据源对象的基本属性(如 urlusernamepassword 等)以及赋值 driverProperties 属性
  4. 最后就可以调用非池化数据源工厂对象的 getDataSource 方法获取 数据源对象 实例

池化数据源 PooledDataSource

为什么需要一个池化的数据源呢?我们知道在 JDBC 编程中,数据库连接的创建过程是十分耗时的,且数据库能够建立的连接数也是有限的。因此,数据库连接是一种极其珍贵的系统资源,也就有了池化的必要,从而保证数据库连接的重用,提高系统响应速度,防止数据库连接泄露等。

MyBatis 使用 PooledDataSource 来实现数据源连接池的功能,其内部依赖了上文分析的 UnplooledDataSource 组件,因为池化的数据源也是需要创建连接的,其创建连接的部分功能就由内部持有的 UnpooledDataSource 来完成。

接下来我们直接看源码:

// 一个简单,同步,线程安全的数据库连接池
public class PooledDataSource implements DataSource {

    // 省略其他属性...
    
    // 内部持有一个非池化的数据源,用来创建连接对象
    private final UnpooledDataSource dataSource;

    // 有关池相关的可选配置属性
    // 最大活跃连接数, 默认值 10
    protected int poolMaximumActiveConnections = 10;
    
    // 最大空闲连接数,默认值 5,超过该值配置的空闲连接会被直接关闭,而不是再将其放入池中维护
    protected int poolMaximumIdleConnections = 5;
    
    // 强制收回时间,获取连接时如果没有空闲连接可用且不满足创建新连接的条件,就会从活跃连接集合中获取第一个连接,并判断其上次检查时间到当前时间的差,如果大于该配置值就满足强制收回条件,配置默认值为 20 秒
    protected int poolMaximumCheckoutTime = 20000;
    
    // 获取连接等待时间,当获取连接时没有空闲连接,且不能创建新连接,且没有活跃连接到达强制收回条件,则当前获取连接的线程就会阻塞等待该配置的时间,默认20秒
    protected int poolTimeToWait = 20000;

    // 如果获取到的是一个坏的连接,那么重新获取一个新的连接,次数最多为 poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance
    protected int poolMaximumLocalBadConnectionTolerance = 3;

    // 心跳相关配置属性,心跳查询 sql
    protected String poolPingQuery = "NO PING QUERY SET";
    
    // 是否允许心跳查询
    protected boolean poolPingEnabled;
    
    // 执行心跳查询频率
    protected int poolPingConnectionsNotUsedFor;

    // 另一个重要属性是 PoolState,该类用来存放数据库状态,包括我们的池相关功能(活动连接,空闲连接的保存..)都是通过该类实现的,当我们使用 MyBatis 的连接池出现问题或者需要优化时,可以利用该属性获取到连接池的各种状态来做参考
    private final PoolState state = new PoolState(this);

}

以上就是 PooledDataSource 类的重要属性分析,既然是属性,那就需要有相应的设置配置属性的方法提供。

// 设置池最大活动连接数
public void setPoolMaximumActiveConnections(int poolMaximumActiveConnections) {
    this.poolMaximumActiveConnections = poolMaximumActiveConnections;
    forceCloseAll();
}

// 设置池最大空闲连接数
public void setPoolMaximumIdleConnections(int poolMaximumIdleConnections) {
    this.poolMaximumIdleConnections = poolMaximumIdleConnections;
    forceCloseAll();
}

// 省略其他设置方法...

// 以上是数据源连接池提供的设置自身池相关属性的方法,由于其内部持有了一个非池化数据源,因此 MyBatis 也提供了一些方法用来设置非池化数据源的配置属性,如 driver、url、username、password 等

// 设置驱动
public void setDriver(String driver) {
    dataSource.setDriver(driver);
    forceCloseAll();
}

// 设置 url
public void setUrl(String url) {
    dataSource.setUrl(url);
    forceCloseAll();
}

// 设置 username
public void setUsername(String username) {
    dataSource.setUsername(username);
    forceCloseAll();
}

// 设置 password
public void setPassword(String password) {
    dataSource.setPassword(password);
    forceCloseAll();
}

// 省略其他非池化属性设置方法...

通过以上设置相关配置属性的方法我们可以看到,基本上所有的属性配置方法都会执行一个 forceCloseAll() 方法,下面我们看下这个方法做了什么事。

// 代码逻辑不复杂,就是利用 for 循环,关闭池中所有活跃和空闲的连接
public void forceCloseAll() {
    synchronized (state) {
      expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
      for (int i = state.activeConnections.size(); i > 0; i--) {
        try {
          // 从 PoolState 对象持有的活跃连接集合中移除活跃连接
          PooledConnection conn = state.activeConnections.remove(i - 1);
          // 失效该活跃连接对象
          conn.invalidate();
          // 获取 PooledConnection 对象持有的真实连接对象 Connection
          Connection realConn = conn.getRealConnection();
          // 如果真实的连接对象是手动提交事务的,这里需要回滚事务
          if (!realConn.getAutoCommit()) {
            realConn.rollback();
          }
          // 关闭连接对象
          realConn.close();
        } catch (Exception e) {
          // ignore
        }
      }
      for (int i = state.idleConnections.size(); i > 0; i--) {
        try {
          PooledConnection conn = state.idleConnections.remove(i - 1);
          conn.invalidate();
          Connection realConn = conn.getRealConnection();
          if (!realConn.getAutoCommit()) {
            realConn.rollback();
          }
          realConn.close();
        } catch (Exception e) {
          // ignore
        }
      }
    }
    if (log.isDebugEnabled()) {
      log.debug("PooledDataSource forcefully closed/removed all connections.");
    }
  }

下面我们来分析数据源连接池的重要的基本功能:连接获取缓存释放等功能,首先看连接获取。

PooledDataSource 类实现了 DataSource 接口,自然就需要重写其两个 getConnection 方法。

  @Override
  public Connection getConnection() throws SQLException {
    return popConnection(dataSource.getUsername(), dataSource.getPassword()).getProxyConnection();
  }

  @Override
  public Connection getConnection(String username, String password) throws SQLException {
    return popConnection(username, password).getProxyConnection();
  }

以上两个重载的方法都转到了 popConnection 方法上,我们直接看 popConnection方法 的源码,该方法代码量比较多,毕竟是数据源连接池的核心逻辑所在,重点分析这个方法。

private PooledConnection popConnection(String username, String password) throws SQLException {...}

首先看下方法声明,该方法并不是直接返回一个 Connection 对象,而是一个 PooledConnection 对象,下面我们先看下这个类的定义。

PooledConnection 不仅是一个 InvocationHandler,还持有数据源 dataSource,真实的连接对象 realConnection,以及 JDK 动态代理连接对象 proxyConnection,以及连接对象的创建时间,上次使用时间,连接是否有效等属性…

class PooledConnection implements InvocationHandler {
    
    private static final String CLOSE = "close";
    
    // 创建代理类时传入,作为代理的接口类型
    private static final Class<?>[] IFACES = new Class<?>[] { Connection.class };
    
    private final PooledDataSource dataSource;
    private final Connection realConnection;
    private final Connection proxyConnection;
    
    private long checkoutTimestamp;
    private long createdTimestamp;
    private long lastUsedTimestamp;
    private int connectionTypeCode;
    private boolean valid;
    
    // 构造函数
    public PooledConnection(Connection connection, PooledDataSource dataSource) {
        this.hashCode = connection.hashCode();
        this.realConnection = connection;
        this.dataSource = dataSource;
        this.createdTimestamp = System.currentTimeMillis();
        this.lastUsedTimestamp = System.currentTimeMillis();
        this.valid = true;
        // 利用 JDK 动态代理,创建出代理对象 proxyConnection,并传入当前实例 this 作为 InvocationHandler
        this.proxyConnection = (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), IFACES, this);
  }
}

那既然是一个 InvocationHandler,那重要的方法就是 invoke 啦。

  @Override
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    String methodName = method.getName();
    if (CLOSE.hashCode() == methodName.hashCode() && CLOSE.equals(methodName)) {
      dataSource.pushConnection(this);
      return null;
    } else {
      try {
        if (!Object.class.equals(method.getDeclaringClass())) {
          // issue #579 toString() should never fail
          // throw an SQLException instead of a Runtime
          checkConnection();
        }
        return method.invoke(realConnection, args);
      } catch (Throwable t) {
        throw ExceptionUtil.unwrapThrowable(t);
      }
    }
  }

可以看到实际上就是代理了 Connection 对象的 close 方法,如果不是池化的数据源,那这里也就没必要作什么代理,连接对象的 close 方法执行就是真实的连接对象的关闭操作,而如果是一个池化的数据源,那一个真实连接对象的 close 方法就不能简单的直接关闭就完事了,到底关不关,需要交给池化的 dataSource 决定,内部处理逻辑是:如果空闲的连接已经超过了配置设定的最大空闲连接数,那就关闭真实连接对象,否则就加到空闲连接集合中缓存起来 …

这就是 PooledConnection 存在的作用,分析到了这里,是不是优点迫不及待的想看看到底 PooledDataSource 是怎么获取连接以及释放连接的呢?

继续上面的获取连接的方法 popConnection 分析,核心地方到了。

private PooledConnection popConnection(String username, String password) throws SQLException {
    boolean countedWait = false;
    PooledConnection conn = null;
    long t = System.currentTimeMillis();
    int localBadConnectionCount = 0;

    while (conn == null) {
      // 利用一个 while 循环获取连接,文章开头说的同步的数据源连接池,原因就在这里,你可以把 PoolState 理解为一个大的池,里面有存放活跃/空闲连接的集合,每次获取连接也好,释放也好,都需要加锁同步处理,否则会出现并发安全问题
      synchronized (state) {
        if (!state.idleConnections.isEmpty()) {
          // 1. 如果池的空闲连接集合不为空,则代表有可用的连接,则取出一个空闲连接
          conn = state.idleConnections.remove(0);
          if (log.isDebugEnabled()) {
            // 打印取出连接日志
            log.debug("Checked out connection " + conn.getRealHashCode() + " from pool.");
          }
        } else {
          // 2. 池中没有空闲连接,则说明没有连接可用,则判断是否满足可以新建连接,即判断池当前活跃连接数是否小于配置的最大活跃连接数
          if (state.activeConnections.size() < poolMaximumActiveConnections) {
            // 2.1. 允许创建新连接,调用了内部持有的 UnpooledDataSource 实例的 getConnection 方法创建新的对象,并封装成 PooledConnection 对象返回
            conn = new PooledConnection(dataSource.getConnection(), this);
            if (log.isDebugEnabled()) {
              // 打印新建连接日志
              log.debug("Created connection " + conn.getRealHashCode() + ".");
            }
          } else {
            // 不允许创建新连接,则尝试获取最新创建的活跃连接
            PooledConnection oldestActiveConnection = state.activeConnections.get(0);
            long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
            // 检测该连接是否超时
            if (longestCheckoutTime > poolMaximumCheckoutTime) {
              // 统计超时连接
              state.claimedOverdueConnectionCount++;// 超时连接对象数量+1
              state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
              state.accumulatedCheckoutTime += longestCheckoutTime;
              // 将超时连接移出活跃连接集合
              state.activeConnections.remove(oldestActiveConnection);
              // 如果超时连接是手动提交的方式
              if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
                try {
                  // 则尝试回滚该超时连接
                  oldestActiveConnection.getRealConnection().rollback();
                } catch (SQLException e) {
                  log.debug("Bad connection. Could not roll back");
                }  
              }
              // 利用超时连接的真实 Connection 连接对象创建一个新的 PooledConnection 对象,并设置创建时间为之前超时连接的创建时间
              conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
              conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp());
              // 设置上次使用时间为超时连接的上次使用时间
              conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp());
              // 超时连接对象置为无效状态
              oldestActiveConnection.invalidate();
              if (log.isDebugEnabled()) {
                log.debug("Claimed overdue connection " + conn.getRealHashCode() + ".");
              }
            } else {
              // 执行到这里,等于没有空闲连接,且无法创建新连接,且无超时活跃连接,则只能等待,等待时间是 poolTimeToWait 属性配置项
              try {
                if (!countedWait) {
                  // 统计池状态,增加等待数量
                  state.hadToWaitCount++;
                  countedWait = true;
                }
                if (log.isDebugEnabled()) {
                  log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection.");
                }
                long wt = System.currentTimeMillis();
                state.wait(poolTimeToWait);
                // 统计累计等待时间
                state.accumulatedWaitTime += System.currentTimeMillis() - wt;
              } catch (InterruptedException e) {
                break;
              }
            }
          }
        }
        // 如果是等待后执行这里,则不会进入if,直接下一次循环获取连接对象,如果不是等待后执行的这里,则会进入if内
        if (conn != null) {
          // 检测获取到的连接对象是否有效
          if (conn.isValid()) {
            // 如果这里的连接真实对象是手动提交事务的,就回滚事务
            if (!conn.getRealConnection().getAutoCommit()) {
              conn.getRealConnection().rollback();
            }
            conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
            // 重置连接对象的上次使用时间和检查时间为当前时间戳
            conn.setCheckoutTimestamp(System.currentTimeMillis());
            conn.setLastUsedTimestamp(System.currentTimeMillis());
            // 将其添加到活跃连接集合中
            state.activeConnections.add(conn);
            // 增加请求连接成功数量
            state.requestCount++;
            // 累计请求时间
            state.accumulatedRequestTime += System.currentTimeMillis() - t;
          } else {
            // 无效连接对象,则打印日志
            if (log.isDebugEnabled()) {
              log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection.");
            }
            // 统计无效连接数量
            state.badConnectionCount++;
            localBadConnectionCount++;
            conn = null;
            if (localBadConnectionCount > (poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance)) {
              // 如果获取到的无效连接数量超过 poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance 之和,则会执行这里,抛出 SQLException 异常...
              if (log.isDebugEnabled()) {
                log.debug("PooledDataSource: Could not get a good connection to the database.");
              }
              throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
            }
          }
        }
      }

    }

    if (conn == null) {
      if (log.isDebugEnabled()) {
        log.debug("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
      }
      throw new SQLException("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
    }

    // while 循环结束,返回 pooledConnection 对象
    return conn;
  }

以上就是整个数据源连接池中关于获取连接的代码逻辑分析,下面用一个流程图总结一下。

当我们获取到连接对象 PooledConnection 之后,还需要调用其 getProxyConnection() 方法获取。
PooledConnection 对象中封装的 JDK代理对象 返回,MyBatis 拿到这个 Connection 代理对象后就可以使用了,比如获取 预处理语句执行 等方法…

介绍完了整个获取连接的流程,我们下面分析一下释放的逻辑,比如 MyBatis 拿到代理对象 Connection 之后执行完一整套 SQL 相关方法之后,需要调用代理对象的 close 方法,其 close 方法调用之后就会进入到 PooledConnection(InvocationHandler)的 invoke 方法中,invoke 中我们上面看到,当执行方法为 close 时会直接调用 dataSource.pushConnection(this);,因此我们看 PooledDataSource 的 pushConnection 方法源码。

protected void pushConnection(PooledConnection conn) throws SQLException {
    // 同样需要加锁同步
    synchronized (state) {
      // 将连接对象 PooledConnection 对象从池的活跃集合中移除
      state.activeConnections.remove(conn);
      if (conn.isValid()) {
        // 移除的活跃连接有效,就判断当前池的空闲连接数以达到上限,以及 PooledConnection 是否为该连接池的连接
        if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) {
          // 统计累计校验时间
          state.accumulatedCheckoutTime += conn.getCheckoutTime();
          if (!conn.getRealConnection().getAutoCommit()) {
            conn.getRealConnection().rollback();
          }
          // 为返还连接生成一个新的 PooledConnection 对象,只是使用了老的真实连接对象
          PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
          // 增加池空闲连接对象
          state.idleConnections.add(newConn);
          // 新连接对象使用老连接对象的创建时间和上次使用时间
          newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
          newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
          // 老的连接对象失效处理
          conn.invalidate();
          if (log.isDebugEnabled()) {
            log.debug("Returned connection " + newConn.getRealHashCode() + " to pool.");
          }
          // 唤醒阻塞在等待空闲连接的线程
          state.notifyAll();
        } else {
          // 空闲连接数已达上限,则关闭真实的数据库连接,并失效连接对象 PooledConnection
          // 统计累计校验时间
          state.accumulatedCheckoutTime += conn.getCheckoutTime();
          if (!conn.getRealConnection().getAutoCommit()) {
            conn.getRealConnection().rollback();
          }
          conn.getRealConnection().close();
          if (log.isDebugEnabled()) {
            log.debug("Closed connection " + conn.getRealHashCode() + ".");
          }
          conn.invalidate();
        }
      } else {
        // 移除的连接对象无效,统计池状态的 badConnectionCount 数量
        if (log.isDebugEnabled()) {
          log.debug("A bad connection (" + conn.getRealHashCode() + ") attempted to return to the pool, discarding connection.");
        }
        state.badConnectionCount++;
      }
    }
  }

源码相对 popConnection 来说比较少,逻辑比较简单。

需要注意的是 PooledConnection 的 isValid 方法,该方法不仅会检测 boolean 属性 valid,而且会尝试调用 PooledDataSource 的 pingConnection(this) 方法让数据库执行 poolPingQuery 属性中保存的测试 SQL 语句,从而确保该真实连接对象的真实可用。

public boolean isValid() {
    return valid && realConnection != null && dataSource.pingConnection(this);
}
protected boolean pingConnection(PooledConnection conn) {
    // 记录 ping 操作是否成功
    boolean result = true;

    try {
      result = !conn.getRealConnection().isClosed();
    } catch (SQLException e) {
      if (log.isDebugEnabled()) {
        log.debug("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage());
      }
      result = false;
    }
    
    // 当真实连接对象确实没有关闭
    if (result) {
      // 校验是否启动 Ping
      if (poolPingEnabled) {
        // 只有长时间(超过 poolPingConnectionsNotUsedFor 指定的时长)未使用的连接才需要 Ping 操作
        if (poolPingConnectionsNotUsedFor >= 0 && conn.getTimeElapsedSinceLastUse() > poolPingConnectionsNotUsedFor) {
          // conn.getTimeElapsedSinceLastUse() 的时间就是采用当前时间 - lastUsedTimestamp 时间得来的
          try {
            if (log.isDebugEnabled()) {
              log.debug("Testing connection " + conn.getRealHashCode() + " ...");
            }
            // 1. 获取真实连接对象
            Connection realConn = conn.getRealConnection();
            // 2. 获取预处理语句
            Statement statement = realConn.createStatement();
            // 3. 执行 poolPingQuery SQL
            ResultSet rs = statement.executeQuery(poolPingQuery);
            // 关闭 ResultSet
            rs.close();
            // 关闭预处理语句
            statement.close();
            if (!realConn.getAutoCommit()) {
              // 回滚事务
              realConn.rollback();
            }
            // 执行完毕返回 true
            result = true;
            if (log.isDebugEnabled()) {
              log.debug("Connection " + conn.getRealHashCode() + " is GOOD!");
            }
          } catch (Exception e) {
            log.warn("Execution of ping query '" + poolPingQuery + "' failed: " + e.getMessage());
            try {
              // 执行 SQL 发生异常,则关闭真实连接对象
              conn.getRealConnection().close();
            } catch (Exception e2) {
              // ignore
            }
            // 返回 false
            result = false;
            if (log.isDebugEnabled()) {
              log.debug("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage());
            }
          }
        }
      }
    }
    return result;
  }

池化数据源工厂 PooledDataSourceFactory

PooledDataSource 和上文中非池化数据源一样,也是采用了工厂方法模式创建数据源实例,因此,MyBatis 额外提供一个工厂实现类 PooledDataSourceFactory。

public class PooledDataSourceFactory extends UnpooledDataSourceFactory {

  public PooledDataSourceFactory() {
    this.dataSource = new PooledDataSource();
  }
}

可以看出这里继承了 UnpooledDataSourceFactory,只是提供了自己的构造函数用来初始化 dataSource 属性为 PooledDataSource,而 PooledDataSource 的构造函数实际上就是 new 了一个 UnPooledDataSource。