Datasource 与 Transaction 模块
本文将介绍 MyBatis 数据源与事务模块。包括工厂方法设计模式,以及 UnpooledDataSource、UnpooledDataSourceFactory、PooledConnection、PoolState、事务 TransactionFactory、JdbcTransaction 原理
数据源是持久层框架中最核心的组件之一,在实际工作中比较常见的数据源有 C3P0、Apache Common DBCP、Proxool 等。作为一款成熟的持久化框架,MyBatis 不仅自己提供了一套数据源实现,而且还能够方便地集成第三方数据源。
javax.sql.DataSource
是 Java 语言中用来抽象数据源的接口,其中定义了所有数据源实现的公共行为,MyBatis 自身提供的数据源实现也要实现该接口。
MyBatis 提供了两种类型的数据源实现,分别是 PooledDataSource
和 UnpooledDataSource
。针对不同类型的 DataSource
实现,MyBatis 提供了不同的工厂实现来进行创建,如下图所示,这是工厂方法模式的一个典型应用场景:
![DataSource 类图](images/datasource.webp)
工厂方法模式
工厂方法模式中定义了 Factory
这个工厂接口,如下图所示,其中定义了 createProduct()
方法创建右侧继承树中的对象,不同的工厂接口实现类会创建右侧继承树中不同 Product
实现类(例如 ProductImpl1
和 ProductImpl2
)。
![工厂方法模式](images/factory.webp)
从上图中,我们可以看到工厂方法模式由四个核心角色构成:
- Factory 接口:工厂方法模式的核心接口之一。使用方会依赖
Factory
接口创建Product
对象实例。 - Factory 实现类(图中的 FactoryImpl1 和 FactoryImpl2):用于创建
Product
对象。不同的Factory
实现会根据需求创建不同的Product
实现类。 - Product 接口:用于定义业务类的核心功能。
Factor
y 接口创建出来的所有对象都需要实现Product
接口。使用方依赖Product
接口编写其他业务实现,所以使用方关心的是Product
接口这个抽象,而不是其中的具体实现逻辑。 - Product 实现类(图中的 ProductImpl1 和 ProductImpl2):实现了
Product
接口中定义的方法,完成了具体的业务逻辑。
这里假设一个场景:目前我们要做一个注册中心模块,已经有了 ZookeeperImpl
和 EtcdImpl
两个业务实现类,分别支持了与 ZooKeeper 交互和与 etcd 交互,此时来了个新需求,需要支持与 Consul 交互,我们只需要添加新的 ConsulFactory
实现类和 ConsulImpl
实现类即可完成扩展。
工厂方法模式最终也是符合“开放 —— 封闭”原则的,可以通过添加新的 Factory
接口实现和 Product
接口实现来扩展整个体系的功能。另外,工厂方法模式对使用方暴露的是 Factory
和 Product
这两个抽象的接口,而不是具体的实现,也就帮助使用方面向接口编程。
datasource.DataSourceFactory
了解了工厂方法模式的基础知识之后,我们再回到 MyBatis 的数据源实现上来。MyBatis 的数据源模块也是用到了工厂方法模式,如果需要扩展新的数据源实现时,只需要添加对应的 Factory 实现类,新的数据源就可以被 MyBatis 使用。
DataSourceFactory
接口就扮演了 MyBatis 数据源实现中的 Factory
接口角色。UnpooledDataSourceFactory
和 PooledDataSourceFactory
实现了 DataSourceFactory
接口,也就是 Factory
接口实现类的角色。三者的继承关系如下图所示:
![DataSourceFactory](images/datasource-factory.webp)
DataSourceFactory
接口中最核心的方法是 getDataSource()
方法,该方法用来生成一个 DataSource
对象。
public interface DataSourceFactory {
void setProperties(Properties props);
DataSource getDataSource();
}
UnpooledDataSourceFactory
在 UnpooledDataSourceFactory
这个实现类的初始化过程中,会直接创建 UnpooledDataSource
对象,其中的 dataSource
字段会指向该 UnpooledDataSource
对象。
接下来调用的 setProperties()
方法会根据传入的配置信息,完成对该 UnpooledDataSource
对象相关属性的设置:
UnpooledDataSourceFactory
对于 getDataSource()
方法的实现就相对简单了,其中直接返回了上面创建的 UnpooledDataSource
对象。
PooledDataSourceFactory
从前面的 DataSourceFactory
继承关系图中可以看到,PooledDataSourceFactory
是通过继承 UnpooledDataSourceFactory
间接实现了 DataSourceFactory
接口。
public class PooledDataSourceFactory extends UnpooledDataSourceFactory {
public PooledDataSourceFactory() {
this.dataSource = new PooledDataSource();
}
}
在 PooledDataSourceFactory
中并没有覆盖 UnpooledDataSourceFactory
中的任何方法,唯一的变化就是将 dataSource
字段指向的 DataSource
对象类型改为 PooledDataSource
类型。
javax.sql.DataSource
JDK 提供的 javax.sql.DataSource
接口在 MyBatis 数据源中扮演了 Product
接口的角色。 MyBatis 提供的数据源实现有两个,一个 UnpooledDataSource
实现,另一个 PooledDataSource
实现,它们都是 “Product
” 具体实现类的角色。
unpooled.UnpooledDataSource
我们先来看 UnpooledDataSource
的实现,其中的核心字段有如下:
public class UnpooledDataSource implements DataSource {
// 加载 Driver 类的类加载器
private ClassLoader driverClassLoader;
// 数据库连接驱动的相关配置
private Properties driverProperties;
// 缓存所有已注册的数据库连接驱动
private static Map<String, Driver> registeredDrivers = new ConcurrentHashMap<>();
// 事务隔离级别
private Integer defaultTransactionIsolationLevel;
}
在 Java 中,几乎所有数据源实现的底层都是依赖 JDBC 操作数据库的,而使用 JDBC 的第一步就是向 DriverManager
注册 JDBC 驱动类,之后才能创建数据库连接。DriverManager
中定义了 registeredDrivers
字段用于记录注册的 JDBC 驱动,这是一个 CopyOnWriteArrayList
类型的集合,是线程安全的。
MyBatis 的 UnpooledDataSource
实现中定义了如下静态代码块,从而在 UnpooledDataSource
加载时,将已在 DriverManager
中注册的 JDBC 驱动器实例复制一份到 UnpooledDataSource.registeredDrivers
集合中。
static {
Enumeration<Driver> drivers = DriverManager.getDrivers();
while (drivers.hasMoreElements()) {
Driver driver = drivers.nextElement();
registeredDrivers.put(driver.getClass().getName(), driver);
}
}
在 getConnection()
方法中,UnpooledDataSource
会调用 doGetConnection()
方法获取数据库连接,具体实现如下:
private Connection doGetConnection(Properties properties) throws SQLException {
// 初始化数据库驱动
initializeDriver();
// 创建数据库连接
Connection connection = DriverManager.getConnection(url, properties);
// 配置数据库连接
configureConnection(connection);
return connection;
}
- 在调用的
initializeDriver()
方法中,完成了 JDBC 驱动的初始化,其中会创建配置中指定的Driver
对象,并将其注册到DriverManager
以及上面介绍的UnpooledDataSource.registeredDrivers
集合中保存; configureConnection()
方法会对数据库连接进行一系列配置,例如,数据库连接超时时长、事务是否自动提交以及使用的事务隔离级别。
pooled.PooledDataSource
在 PooledDataSource
中并没有直接维护数据库连接的集合,而是维护了一个 PooledState
类型的字段(state
字段),这个 PooledState
才是管理连接的地方。不过 PooledState
中维护的数据库连接并不是真正的数据库连接(不是 java.sql.Connection
对象),而是 PooledConnection
对象。
PooledConnection
PooledConnection
是 MyBatis 中定义的一个 InvocationHandler
接口实现类,其中封装了真正的 java.sql.Connection
对象以及相关的代理对象,这里的代理对象是通过 JDK 动态代理产生的。
下面来看 PooledConnection
中的核心字段:
class PooledConnection implements InvocationHandler {
/*
* 记录当前 PooledConnection 对象归属的 PooledDataSource 对象。
* 也就是说,当前的 PooledConnection 是由该 PooledDataSource 对象创建的;
* 在通过 close() 方法关闭当前 PooledConnection 的时候,当前 PooledConnection 会被返还给该 PooledDataSource 对象。
*/
private final PooledDataSource dataSource;
// 当前 PooledConnection 底层的真正数据库连接对象
private final Connection realConnection;
// 指向了 realConnection 数据库连接的代理对象
private final Connection proxyConnection;
// 使用方从连接池中获取连接的时间戳
private long checkoutTimestamp;
// 连接创建的时间戳
private long createdTimestamp;
// 连接最后一次被使用的时间戳
private long lastUsedTimestamp;
// 数据库连接的标识。该标识是由数据库 URL、username 和 password 三部分组合计算出来的 hash 值,主要用于连接对象确认归属的连接池
private int connectionTypeCode;
/*
* 用于标识 PooledConnection 对象是否有效。该字段的主要目的是防止使用方将连接归还给连接池之后,
* 依然保留该 PooledConnection 对象的引用并继续通过该 PooledConnection 对象操作数据库。
*/
private boolean valid;
}
下面来看 PooledConnection
的构造方法,其中会初始化上述字段:
public PooledConnection(Connection connection, PooledDataSource dataSource) {
this.hashCode = connection.hashCode();
this.realConnection = connection;
this.dataSource = dataSource;
this.createdTimestamp = System.currentTimeMillis();
this.lastUsedTimestamp = System.currentTimeMillis();
this.valid = true;
this.proxyConnection = (Connection) Proxy
.newProxyInstance(Connection.class.getClassLoader(), IFACES, this);
}
构造方法中我们重点关注 proxyConnection
,它是通过 JDK 动态代理的方式生成的,其中传入的 InvocationHandler
实现正是 PooledConnection
自身。PooledConnection.invoke()
方法中只对 close()
方法进行了拦截,具体实现如下:
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
if (CLOSE.equals(methodName)) {
// 如果调用 close() 方法,并没有直接关闭底层连接,而是将其归还给关联的连接池
dataSource.pushConnection(this);
return null;
}
try {
if (!Object.class.equals(method.getDeclaringClass())) {
// 只要不是 Object 的方法,都需要检测当前 PooledConnection 是否可用
// 如果连接不可用,就抛出异常
checkConnection();
}
// 如果连接可用,调用 realConnection 的对应方法
return method.invoke(realConnection, args);
} // catch ...
}
PoolState
接下来看 PoolState
这个类,它负责管理连接池中所有 PooledConnection
对象的状态,维护了两个 ArrayList <PooledConnection>
集合,按照 PooledConnection
对象的状态分类存储,其中:
idleConnections
集合用来存储空闲状态的PooledConnection
对象;activeConnections
集合用来存储活跃状态的PooledConnection
对象。
另外,PoolState
中还定义了多个 long
类型的统计字段:
public class PoolState {
// 请求数据库连接的次数
protected long requestCount = 0;
// 获取连接的累积耗时
protected long accumulatedRequestTime = 0;
// 所有连接的 checkoutTime 累加。PooledConnection 中有一个 checkoutTime 属性,
// 表示的是使用方从连接池中取出连接到归还连接的总时长,也就是连接被使用的时长
protected long accumulatedCheckoutTime = 0;
// 当连接长时间未归还给连接池时,会被认为该连接超时,该字段记录了超时的连接个数
protected long claimedOverdueConnectionCount = 0;
// 记录了累积超时时间
protected long accumulatedCheckoutTimeOfOverdueConnections = 0;
// 当连接池全部连接已经被占用之后,新的请求会阻塞等待,该字段就记录了累积的阻塞等待总时间
protected long accumulatedWaitTime = 0;
// 记录了阻塞等待总次数
protected long hadToWaitCount = 0;
// 无效的连接数
protected long badConnectionCount = 0;
}
获取连接
在了解了 PooledConnection
和 PooledState
的核心实现之后,我们再来看 PooledDataSource
实现,这里按照使用方的逻辑依次分析 PooledDataSource
的核心方法。
首先是 getConnection()
方法,其中先是依赖 popConnection()
方法获取 PooledConnection
对象,然后从 PooledConnection
中获取数据库连接的代理对象(即前面介绍的 proxyConnection
字段)。这里调用的 popConnection()
方法是从连接池中获取数据库连接的核心。
其核心实现总结为如下步骤:
- 检测当前连接池中是否有空闲的有效连接,如果有,则直接返回连接;如果没有,则继续执行下一步。
- 检查连接池当前的活跃连接数是否已经达到上限值,如果未达到,则尝试创建一个新的数据库连接,并在创建成功之后,返回新建的连接;如果已达到最大上限,则往下执行。
- 检查活跃连接中是否有连接超时,如果有,则将超时的连接从活跃连接集合中移除,并重复步骤 2;如果没有,则执行下一步。
- 当前请求数据库连接的线程阻塞等待,并定期执行前面三步检测相应的分支是否可能获取连接。
释放连接
前面介绍 PooledConnection
的时候,我们提到当调用 proxyConnection
对象的 close()
方法时,连接并没有真正关闭,而是通过 PooledDataSource.pushConnection()
方法将 PooledConnection
归还给了关联的 PooledDataSource
。
其核心实现总结为如下步骤:
- 从活跃连接集合(即前面提到的
activeConnections
集合)中删除传入的PooledConnection
对象。 - 检测该
PooledConnection
对象是否可用。如果连接已不可用,则递增badConnectionCount
字段进行统计,之后,直接丢弃PooledConnection
对象即可。如果连接依旧可用,则执行下一步。 - 检测当前
PooledDataSource
连接池中的空闲连接是否已经达到上限值。如果达到上限值,则PooledConnection
无法放回到池中,正常关闭其底层的数据库连接即可。如果未达到上限值,则继续执行下一步。 - 将底层连接重新封装成
PooledConnection
对象,并添加到空闲连接集合(也就是前面提到的idleConnections
集合),然后唤醒所有阻塞等待空闲连接的线程。
检测连接可用性
通过对上述 pushConnection()
方法和 popConnection()
方法的分析,我们大致了解了 PooledDataSource
的核心实现。正如我们看到的那样,这两个方法都需要检测一个数据库连接是否可用,这是通过 PooledConnection.isValid()
方法实现的,在该方法中会检测三个方面:
valid
字段值为true
;realConnection
字段值不为空;- 执行
PooledDataSource.pingConnection()
方法,返回值为true
。
只有这三个条件都成立,才认为这个 PooledConnection
对象可用。其中,PooledDataSource.pingConnection()
方法会尝试请求数据库,并执行一条测试 SQL 语句,检测是否真的能够访问到数据库,该方法的核心代码如下:
public boolean isValid() {
return valid && realConnection != null && dataSource.pingConnection(this);
}
protected boolean pingConnection(PooledConnection conn) {
boolean result = true; // 记录此次 ping 操作是否成功完成
try {
// 检测底层数据库连接是否已经关闭
result = !conn.getRealConnection().isClosed();
} catch (SQLException e) {
result = false;
}
// 如果底层与数据库的网络连接没断开,则需要检测 poolPingEnabled 字段的配置,决定
// 是否能执行 ping 操作。另外,ping 操作不能频繁执行,只有超过一定时长
// (超过 poolPingConnectionsNotUsedFor 指定的时长)未使用的连接,才需要 ping
// 操作来检测数据库连接是否正常
if (result && poolPingEnabled && poolPingConnectionsNotUsedFor >= 0
&& conn.getTimeElapsedSinceLastUse() > poolPingConnectionsNotUsedFor) {
try {
// 执行 poolPingQuery 字段中记录的测试 SQL 语句
Connection realConn = conn.getRealConnection();
try (Statement statement = realConn.createStatement()) {
statement.executeQuery(poolPingQuery).close();
}
if (!realConn.getAutoCommit()) {
realConn.rollback();
}
result = true; // 不抛异常,即为成功
} catch (Exception e) {
try {
// 关闭真实连接
conn.getRealConnection().close();
} catch (Exception e2) {
// ignore
}
result = false; // 抛异常,即为失败
}
}
return result;
}
transaction.Transaction
介绍完 MyBatis 对数据源的实现之后,我们接下来看与数据源紧密关联的另一个概念——事务。
当我们从数据源中得到一个可用的数据库连接之后,就可以开启一个数据库事务了,事务成功开启之后,我们才能修改数据库中的数据。在修改完成之后,我们需要提交事务,完成整个事务内的全部修改操作,如果修改过程中出现异常,我们也可以回滚事务,放弃整个事务中的全部修改操作。
可见,控制事务在一个以数据库为基础的服务中,是一件非常重要的工作。为此,MyBatis 专门抽象出来一个 Transaction
接口,好在相较于我们上面讲述的数据源,这部分内容还是比较简单、比较好理解的。
org.apache.ibatis.transaction.Transaction
接口是 MyBatis 中对数据库事务的抽象,其中定义了提交事务、回滚事务,以及获取事务底层数据库连接的方法。同时, MyBatis 自带的两个 Transaction
接口实现 JdbcTransaction
和 ManagedTransaction
,这里也使用到了工厂方法模式,如下图所示:
![Transaction](images/transaction.webp)
transaction.TransactionFactory
TransactionFactory
是用于创建 Transaction
的工厂接口,其中最核心的方法是 newTransaction()
方法,它会根据数据库连接或数据源创建 Transaction
对象。
JdbcTransactionFactory
和 ManagedTransactionFactory
是 TransactionFactory
的两个实现类,分别用来创建 JdbcTransaction
对象和 ManagedTransaction
对象,本质也是工厂方法模式的实现。
这两个工厂的实现特别简单,以 JdbcTransactionFactory
为例看下他的源码:
public class JdbcTransactionFactory implements TransactionFactory {
private boolean skipSetAutoCommitOnClose;
@Override
public void setProperties(Properties props) {
if (props == null) {
return;
}
String value = props.getProperty("skipSetAutoCommitOnClose");
if (value != null) {
skipSetAutoCommitOnClose = Boolean.parseBoolean(value);
}
}
@Override
public Transaction newTransaction(Connection conn) {
return new JdbcTransaction(conn);
}
@Override
public Transaction newTransaction(DataSource ds, TransactionIsolationLevel level, boolean autoCommit) {
return new JdbcTransaction(ds, level, autoCommit, skipSetAutoCommitOnClose);
}
}
jdbc.JdbcTransaction
接下来,我们看一下 JdbcTransaction
的实现,其中维护了事务关联的数据库连接以及数据源对象,同时还记录了事务自身的属性,例如“事务隔离级别”和“是否自动提交”等。
public class JdbcTransaction implements Transaction {
// 数据库连接
protected Connection connection;
// 数据源
protected DataSource dataSource;
// 隔离级别
protected TransactionIsolationLevel level;
// 是否自动提交
protected boolean autoCommit;
}
在日常使用数据库事务的时候,我们最常用的操作就是提交和回滚事务,Transaction
接口将这两个操作抽象为 commit()
方法和 rollback()
方法。在 commit()
方法和 rollback()
方法中,JdbcTransaction
都是通过 java.sql.Connection
的同名方法实现事务的提交和回滚的。
@Override
public void commit() throws SQLException {
if (connection != null && !connection.getAutoCommit()) {
connection.commit();
}
}
@Override
public void rollback() throws SQLException {
if (connection != null && !connection.getAutoCommit()) {
connection.rollback();
}
}
managed.ManagedTransaction
ManagedTransaction
的实现相较于 JdbcTransaction
来说,有些许类似,也是依赖关联的 DataSource
获取数据库连接,但其 commit()
、rollback()
方法都是空实现,事务的提交和回滚都是依靠容器管理的,这也是它被称为 ManagedTransaction
的原因。
@Override
public void commit() throws SQLException {
// Does nothing
}
@Override
public void rollback() throws SQLException {
// Does nothing
}
另外,与 JdbcTransaction
不同的是,ManagedTransaction
会根据初始化时传入的 closeConnection
值确定是否在事务关闭时,同时关闭关联的数据库连接(即调用其 close()
方法)。