背景

最近入职了一家新公司,发现新公司很喜欢用MyBatis-Plus(后面简称MP)工具,并且多数据源也是基于MP提供的多数据源能力,只需要在配置文件中定义好多数据源,然后在Mapper上使用@DS注解标注数据源的名称即可。

配置文件示例:

# DataSource Config
spring:
  datasource:
    dynamic:
      primary: db0
      # 是否启用严格模式,默认不启动. 严格模式下未匹配到数据源直接报错, 非严格模式下则使用默认数据源primary所设置的数据源
      strict: true
      datasource:
        db0:
          driver-class-name: org.h2.Driver
          username: root
          password: test
          url: jdbc:h2:mem:testdb0;MODE=MYSQL;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false
          schema: classpath:db0/schema-user.sql
          data: classpath:db0/data-user.sql
        db1:
          driver-class-name: org.h2.Driver
          username: root
          password: test
          url: jdbc:h2:mem:testdb1;MODE=MYSQL;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false
          schema: classpath:db1/schema-address.sql
          data: classpath:db1/data-address.sql

Mapper文件示例

@DS("db1")
public interface AddressMapper extends BaseMapper<Address> {

}

调用时即可基于@DS注解上的名称自动切换对应的数据源。

然而在自测一个加了事务的更新接口时却报错说找不到对应的表,我感觉非常奇怪,刚刚自测的另一个查询接口好好的,怎么在这个接口就报错表不存在了?

结论先行

先说结论,这个问题的原因是由于更新的接口在service类的入口处添加了Spring提供的@Transactional注解。

MP提供的多数据源能力的大致原理是,通过一个切面拦截标注了@DS的方法,在切面中将@DS注解的数据源名称放入一个ThreadLocal类(DynamicDataSourceContextHolder)。

然后实现了一个自定义的数据源对象DynamicRoutingDataSource,当获取数据库连接时,会从DynamicDataSourceContextHolder中拿到数据源名称,并且返回对应数据源名称的连接。

没有使用事务的情况下,每次操作数据库都会去获取一次连接,每次获取连接都可以正常拿到Mapper上@DS的数据源名称,而使用了@Transactional之后,在开始执行真正的方法之前先开启一个事务,并且事务所使用的数据源是默认数据源。

排查过程

首先通过debug看了下Mybatis-plus动态数据源实现原理:

  1. DynamicDataSourceAnnotationInterceptor拦截@DS注解,并将动态数据源名称放入ThreadLocal(DynamicDataSourceContextHolder.push(dsKey))

  2. 系统中的数据源由mybatis-plus提供的DynamicRoutingDataSource数据源接管,其中存储了多个数据源名称和对应的数据源配置,当调用DataSource.getConnection()时,实际上调用了DynamicRoutingDataSource.getDataSource(String dsKey).getConnection()方法,

  3. 当Mybatis获取SqlSession时,会调用到DynamicRoutingDataSource.getDatasource()方法获取对应的数据源,在getDatasource()中会读取DynamicDataSourceContextHolder中的动态数据源名称,并获取对应的数据源,如果找不到则获取标记为primiry的数据源。

那为什么使用了@Transaction之后会导致动态数据源失效呢?

如果配置了@Transaction注解,那么就会执行到TransactionInterceptor拦截器。

在TransactionInterceptor拦截器中,在启动事务时,会创建Transaction对象,并且在这之后会创建一个连接对象Connection。

问题在于,由于动态数据源时配置在Mapper接口上,而事务注解一般是放在Service类上,所以在在创建这个连接对象时,并没有经过DynamicDataSourceAnnotationInterceptor拦截器并将数据源信息放入DynamicDataSourceContextHolder中,所以此时获取的连接对象是标记为了primary的数据源。

当TransactionInterceptor拦截器执行完后,事务和连接对象都创建完毕,在这个事务期间调用非primary的数据源的Mapper查询时,会触发获取DefaultSqlSessionFactory#openSessionFromDataSource获取SqlSession,一路debug后最后走到DataSourceUtils#doGetConnection,在这个方法中有个判断,如果当前已经在一个事务中,则会先判断是否已经有现成的connection对象,如果有就直接复用。也就是说,即使后面的查询配置了@DS动态数据源的配置,其查询使用的仍然是在事务创建时就生成的connection对象。

解决办法

那如果就是要使用MP的多数据源时使用事务,有没有什么解决办法呢?

方法一:将@DS注解放在@Transaction相同的地方

在前面debug的结果可以知道,多数据源失效的原因在于只获取了一次数据库连接,并且在@Transaction注解的入口处没有正确获取到动态数据源名称,针对这两个问题可以将默认的事务传播行为更改为REQUIRES_NEW,从而再次触发获取数据库连接,并且在有@Transaction注解的方法上也添加上动态数据源的注解。如下图:

再次调用debug发现,当要进入handleSetAward方法时,会先由DynamicDataSourceAnnotationInterceptor拦截将@DS的动态数据源名称保存到DynamicDataSourceContextHolder中。

同时由于有@Transaction注解,会再次进入到事务拦截器中,并且会判断是否已经存在事务,如果已经存在事务,并且事务传播行为为REQUIRES_NEW,则会调用startTransaction创建一个新的事务。

此时再创建事务,并且创建connection对象时,可以拿到动态数据库名称,也就能拿到正确的对应的动态数据源了。

不过这个方案非常不优雅,也不推荐。

方法二:使用MP提供的@DSTransactional注解

使用@DSTransactional很简单,只需要将最外层使用@Transactional的地方替换成@DSTransactional即可。

使用虽简单,但是本着格物致知的精神,我们来看一下它的实现原理。

首先使用IDEA看看哪里有引用到这个注解,发现并没有哪里有引用,使用全局搜索发现在DynamicDataSourceAutoConfiguration类中有使用。

可以看到在这个配置类中定义了一个切面,拦截所有有DSTransactional注解的方法,并且在DynamicLocalTransactionAdvisor中执行拦截的逻辑。

再次进入DynamicLocalTransactionAdvisor类中,代码如下:

@Slf4j
public class DynamicLocalTransactionAdvisor implements MethodInterceptor {

    @Override
    public Object invoke(MethodInvocation methodInvocation) throws Throwable {
        // 如果有xid则直接目标方法
        if (!StringUtils.isEmpty(TransactionContext.getXID())) {
            return methodInvocation.proceed();
        }
        boolean state = true;
        Object o;
        String xid = UUID.randomUUID().toString();
        // 如果没有xid,则生成一个,并且将xid放入ThreadLocal(bind方法中就是调用ThreadLocal的set方法)
        TransactionContext.bind(xid);
        try {
            o = methodInvocation.proceed();
        } catch (Exception e) {
            // 目标方法报错,将state置为false
            state = false;
            throw e;
        } finally {
            // 执行ConnectionFactory的notify方法
            ConnectionFactory.notify(state);
            // 移除xid
            TransactionContext.remove();
        }
        return o;
    }
}

在这个切面中,首先判断TransactionContext中是否有xid,如果有xid则直接执行目标方法,如果没有xid,则生成一个新的xid,并调用TransactionContext.bind(xid)存入上下文。

如果目标方法抛出了异常,则将state更改false,最终执行ConnectionFactory.notify(state)并移除上下文中的xid。

这个方法中没有对事务的操作,重点应该在ConnectionFactory.notify(state)方法中,继续进入ConnectionFactory.notify方法。

public class ConnectionFactory {

    private static final ThreadLocal<Map<String, ConnectionProxy>> CONNECTION_HOLDER =
            new ThreadLocal<Map<String, ConnectionProxy>>() {
                @Override
                protected Map<String, ConnectionProxy> initialValue() {
                    return new ConcurrentHashMap<>();
                }
            };

    /**
    * 将连接对象的autoCommit设置为false,并存入上下文。key是动态数据源名称
    */
    public static void putConnection(String ds, ConnectionProxy connection) {
        Map<String, ConnectionProxy> concurrentHashMap = CONNECTION_HOLDER.get();
        if (!concurrentHashMap.containsKey(ds)) {
            try {
                connection.setAutoCommit(false);
            } catch (SQLException e) {
                e.printStackTrace();
            }
            concurrentHashMap.put(ds, connection);
        }
    }

    /**
    * 获取数据库连接,ds是动态数据源名称
    */
    public static ConnectionProxy getConnection(String ds) {
        return CONNECTION_HOLDER.get().get(ds);
    }

    /**
    * DSTransactional事务执行完成后执行的通知方法
    */
    public static void notify(Boolean state) {
        try {
            Map<String, ConnectionProxy> concurrentHashMap = CONNECTION_HOLDER.get();
            // 遍历当前上下文中的所有连接代理对象
            for (ConnectionProxy connectionProxy : concurrentHashMap.values()) {
                // 调用连接代理对象的notify方法
                connectionProxy.notify(state);
            }
        } finally {
            CONNECTION_HOLDER.remove();
        }
    }

}

继续进入connectionProxy.notify方法

@Slf4j
public class ConnectionProxy implements Connection {

    private Connection connection;

    private String ds;

    public ConnectionProxy(Connection connection, String ds) {
        this.connection = connection;
        this.ds = ds;
    }

    public void notify(Boolean commit) {
        try {
            // 前面传入的state,如果是true则提交事务,如果为false则会滚事务
            if (commit) {
                connection.commit();
            } else {
                connection.rollback();
            }
            connection.close();
        } catch (Exception e) {
            log.error(e.getLocalizedMessage(), e);
        }
    }

}

到这里,已经大致清楚多数据源的提交和回滚是如何实现的了,那事务的开启是什么时候呢?另外这个ConnectionProxy是MP提供的类,这个类是什么时候创建的呢。

在前面追溯源码的过程中注意到的ConnectionFactory类中有个putConnection方法,在putConnection中将autoCommit设置为了false,那直接打个断点debug一下。

通过debug堆栈信息很快找到了调用的是AbstractRoutingDataSource类的getConnectionProxy方法,在这个方法中创建了一个ConnectionProxy类对原connection进行了一层包装。

通过堆栈继续往上追溯,发现了熟悉的getConnection方法,这个方法正是在排查过程中介绍MP多数据源实现原理的DynamicRoutingDataSource类的父类的方法。画个类图看一下:

具体来看下AbstractRoutingDataSource的getConnection方法:

public abstract class AbstractRoutingDataSource extends AbstractDataSource {
    protected abstract DataSource determineDataSource();

    @Override
    public Connection getConnection() throws SQLException {
        String xid = TransactionContext.getXID();
        // 如果当前上下文没有xid,则调用子类的determineDataSource()获取数据源,然后获取数据库连接
        if (StringUtils.isEmpty(xid)) {
            return determineDataSource().getConnection();
        }
        // 如果上下文有xid 
        else {
            String ds = DynamicDataSourceContextHolder.peek();
            // 获取上下文中的动态数据源名称
            ds = StringUtils.isEmpty(ds) ? "default" : ds;
            // 从先看看这个数据源有没有创建过
            ConnectionProxy connection = ConnectionFactory.getConnection(ds);
            // 如果没有创建过则创建一个新的数据库连接,并使用ConnectionProxy进行包装
            return connection == null ? getConnectionProxy(ds, determineDataSource().getConnection()) : connection;
        }
    }
}

到这里,整个DSTransactional的流程大概了解了,总结一下:

  1. 在DynamicLocalTransactionAdvisor中拦截DSTransactional注解,并生成一个xid保存在上下文中

  2. 当进行数据库操作需要获取连接对象时,实际调用的时DynamicRoutingDataSource的父类AbstractRoutingDataSource的getConnection方法

  3. 如果上下文存在xid,则获取上下文保存的动态数据源名称,并根据这个名称先看看该数据源有没有现成的连接对象,如果没有,则创建一个新的连接对象,并用ConnectionProxy进行包装,随后将ConnectionProxy放入上下文中,并将连接对象的autoCommit设置为false。

  4. 当事务方法执行完后,执行DynamicLocalTransactionAdvisor中的finall代码,将上下文中的所有连接对象进行提交或者回滚。

注意点:

看到这里,不知道你有没有注意到不管是多数据源还是DSTransactional实现的事务都是在getConnection中实现的,也就是说必须要调用getConnection方法才能使多数据源或者DSTransactional实现的事务生效,如果在DSTransactional外层的方法中有Spring的@Transactional注解会发生什么呢?

没错,多数据源以及DSTransactional事务都会失效,所以日常使用的时候一定要注意。