概述
通过@Intercepts注解信息和@Signature注解信息可以了解到,JadeSQLInterceptor会拦截Executor.query(MappedStatement, Object, RowBounds, ResultHandler)和Executor.update(MappedStatement, Object)两个方法
@Intercepts({
@Signature(type = Executor.class, method = "query", args = {MappedStatement.class, Object.class,
RowBounds.class, ResultHandler.class}),
@Signature(type = Executor.class, method = "update", args = {MappedStatement.class, Object.class})})
public class JadeSQLInterceptor implements Interceptor {
// ......
}
介绍完注解信息后,再来看看plugin()方法,具体实现如下:
public Object plugin(Object target) {
return Plugin.wrap(target, this);
}
其中会解析JadeSQLInterceptor中的@Intercepts和@Signature注解的信息,从而确定需要拦截的方法,然后使用JDK动态代理的方式,为JadeSQLInterceptor创建代理对象。在该代理对象中,会拦截Executor.query(MappedStatement, Object, RowBounds, ResultHandler)和Executor.update(MappedStatement, Object),拦截的具体逻辑在JadeSQLInterceptor.intercept()方法中实现的,具体
1 从调用者信息invocation中获取method信息,然后判断method是否有ShardBy注解,获取shard的param,
2 根据这个param分表策略重写sql,再将重写后的sql设置到method的arg[1]中,
3 调用target object的method方法执行后续操作
public Object intercept(Invocation invocation) throws Throwable {
//获取被拦截方法的参数列表
Object[] args = invocation.getArgs();
MappedStatement statement = (MappedStatement) args[INDEX_MAPPED_STATEMENT];
//获取dao类
Class dao = mapperClazz(statement);
//获取dao中执行的method
Method method = shardMapperMethod(statement, dao);
// 在这里判断dao的method中是否有ShardBy注解,如果有,表明需要分表操作。返回ShardBy注解的Param,后面会用到
Object shardByObject = getExecuteParamByAnnotationClass(args, method, ShardBy.class);
Modifier modifier = new Modifier(new Definition(dao), method);
BoundSql boundSql = statement.getBoundSql(args[INDEX_PARAMETER]);
// 如果需要分表则在这里进行分表分析
if (shardByObject != null) {
Configuration configuration = statement.getConfiguration();
// 获取dataSource,这个dataSource是spring的代理dataSource
DataSource dataSource = configuration.getEnvironment() != null ? configuration.getEnvironment().getDataSource() : null;
//获取method中标有Param注解的信息
Map<String, Object> params = getJadeParameters(args, method);
RouterInterpreter interpreter = BeanFactory.getBean("jade.routerInterpreter", RouterInterpreter.class);
//设置到线程变量中,后面解析sql时会用到
SQLThreadLocal.set(getSQLType(statement), boundSql.getSql(), modifier, params);
// 路由了数据源,且重写了sql语句
SQLInterpreterResult result = interpreter.interpret(dataSource, boundSql.getSql(), modifier, params, convertSQLArrayParams(params));
// 将重写后的sql设置到invocation的method的param里
args[INDEX_MAPPED_STATEMENT] = buildMappedStatement(statement, boundSql, result.getSQL());
SQLThreadLocal.set(getSQLType(statement), result.getSQL(), modifier, params);
} else {
SQLThreadLocal.set(getSQLType(statement), boundSql.getSql(), modifier, new HashMap<String, Object>());
}
try {
// 调用method方法
return invocation.proceed();
} finally {
SQLThreadLocal.remove();
}
}
重写sql的逻辑在intercepter.interpret(DataSource datasource, String sql, Modify modify, Map paramAsMap, Object[] paramAsArray)这个方法中。如下:
1 获取配置文件配置的数据源
2 解析sql,得到路由信息
3 依据路由后的结果重写了sql的表名
public SQLInterpreterResult interpret(DataSource dataSource, String sql, Modifier modifier, Map<String, Object> parametersAsMap, Object[] parametersAsArray) {
if (dataSource instanceof DelegatingDataSource) {
//配置文件中配置的数据源
dataSource = ((DelegatingDataSource)dataSource).getTargetDataSource();
}
if (!(dataSource instanceof XnDataSource)) {
return null;
} else {
Assert.notNull(parametersAsArray, "need parametersAsArray prepared before invoking this interpreter!");
String bizName = ((XnDataSource)dataSource).getBizName();
if (logger.isDebugEnabled()) {
logger.debug("Invoking analyzing: " + sql);
}
SQLParseInfo parseInfo = SQLParseInfo.getParseInfo(sql);
//获取sql中所有的table
Table[] tables = parseInfo.getTables();
RouterInterpreter.RoutingInfo routingInfo = null;
if (tables != null) {
int beginIndex = 0;
if (parseInfo.isInsert() && tables.length > 1) {
beginIndex = 1;
}
for(int i = beginIndex; i < tables.length; ++i) {
RoutingDescriptor descriptor = this.routingConfigurator.getDescriptor(bizName, tables[i].getName());
if (descriptor != null) {
routingInfo = new RouterInterpreter.RoutingInfo(tables[i], descriptor);
break;
}
}
}
if (routingInfo == null) {
return null;
} else {
if (logger.isDebugEnabled()) {
logger.debug("Find routing info: " + routingInfo.byTable + ", " + routingInfo.getDbRouterColumn());
}
String forwardTableName = null;
String forwardDbPattern = null;
Object columnValue;
Column column;
if (routingInfo.getTableRouter() != null) {
column = routingInfo.getTableRouterColumn();
columnValue = null;
if (column != null) {
// 从线程变量中获取ShardBy的param
columnValue = findShardParamValue(parseInfo, column, parametersAsMap, parametersAsArray);
if (columnValue == null) {
throw new BadSqlGrammarException("sharding", parseInfo.getSQL(), (SQLException)null);
}
}
// 根据路由策略对shard的param进行路由,获取路由后的表名
forwardTableName = routingInfo.getTableRouter().doRoute(columnValue);
} else if (logger.isDebugEnabled()) {
logger.debug("table router is null for sql "" + sql + """);
}
if (routingInfo.getDbRouter() != null) {
column = routingInfo.getDbRouterColumn();
columnValue = null;
if (column != null) {
columnValue = findShardParamValue(parseInfo, column, parametersAsMap, parametersAsArray);
if (columnValue == null) {
throw new BadSqlGrammarException("sharding", parseInfo.getSQL(), (SQLException)null);
}
}
forwardDbPattern = routingInfo.getDbRouter().doRoute(columnValue);
if (forwardDbPattern != null) {
if (logger.isDebugEnabled()) {
logger.debug("db pattern is '" + forwardDbPattern + "'");
}
parametersAsMap.put(XnDataSource.DB_PATTERN, forwardDbPattern);
} else {
if (logger.isDebugEnabled()) {
logger.debug("db pattern is empty");
}
parametersAsMap.put(XnDataSource.DB_PATTERN, "");
}
} else if (logger.isDebugEnabled()) {
logger.debug("db router is null for sql "" + sql + """);
}
String byTableName = routingInfo.byTable.getName();
String sqlRewrited;
if (forwardTableName != null && !forwardTableName.equals(byTableName)) {
// 将sql中的原table名换成新的路由后的table名
sqlRewrited = SqlRewriter.rewriteSqlTable(sql, byTableName, forwardTableName);
if (logger.isDebugEnabled()) {
logger.debug("Rewriting SQL: n From: " + sql + "n To: " + sqlRewrited);
}
} else {
sqlRewrited = sql;
}
// 返回重写sql的结果
return new RouterSQLInterpreterResult(forwardDbPattern, sqlRewrited, parametersAsArray);
}
}
}
重写了sql的table和路由后的db_pattern,将db_pattern设置到SQLThreadLocal线程变量中,在后续dataSource.getConnection()会用到db_pattern。下面介绍一下dataSource是如何根据db_pattern路由到物理库的。
1 首先配置数据源
<bean id="adminJadeDataSource" class="cn.techwolf.boss.admin.base.mybatis.datasource.JadeDataSourceFactoryBean">
<constructor-arg index="0" ref="jade.dataSourceFactory" />
<constructor-arg index="1" value="boss_admin" />
</bean>
2 配置SessionFactory,将此数据源与SessionFactory绑定
<bean id="bossAdminSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
<property name="dataSource" ref="adminJadeDataSource" />
<property name="configLocation" value="classpath:mybatis/mybatis-config.xml" />
<property name="failFast" value="true"/>
<property name="mapperLocations" value="classpath:mybatis/admin/*DAO.xml" />
</bean>
<bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
<property name="sqlSessionFactoryBeanName" value="bossAdminSessionFactory" />
<property name="basePackage" ref="adminPackageValue"/>
<!--<property name="nameGenerator" ref="defaultDAOBeanNameGenerator" />-->
</bean>
<!--统一声明各个datasource的dao-->
<bean id="adminPackageValue" class="java.lang.String">
<constructor-arg>
<value>
cn.admin.*.dao.admin
</value>
</constructor-arg>
</bean>
在cn.admin.*.dao.admin包下了mapper文件中的sql执行都会走此逻辑数据源,执行sql时在此逻辑数据源中getConnection()会根据db_pattern选择物理库,下面分析一下dataSource是如何路由到物理库的。
String pattern = (String)local.getParameters().get(DB_PATTERN);
if (pattern == null) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("not found DB_PATTERN, using default patter '' for SQL '" + local.getSql() + "'");
}
pattern = "";
}
Connection conn;
if (!write && !TransactionSynchronizationManager.isSynchronizationActive()) {
conn = this.connectionManager.getReadConnection(this.getBizName(), pattern);
} else {
conn = this.connectionManager.getWriteConnection(this.getBizName(), pattern);
}
在JadeSQLIntecepter中将DB_PATTERN放入到了SOLThreadLocal,例如message加上pattern后缀变成message_70,然后根据配置文件中的bizName也就是逻辑库名和db_pattern来获取connection,connection的获取过程如下:
1 DbAgent根据db也就是bizName如message来获取DataSourcePool,逻辑库与实体库的映射关系配置在zookeeper上,如下的配置
<instance name="message_box" timestamp="2017-11-01 16:00:00" type="router">
<route expression="message_box_[0-9]|boss_message_box_[1-9][0-9]|message_box_[1-4][0-9][0-9]" instance="message_box_0"/>
<route expression="message_box_[5-9][0-9][0-9]" instance="message_box_1"/>
</instance>
type="router"表示message_box这个逻辑库走分库路由,其中message_box_0至message_box_499走message_0库,message_box_500至message_box_999走message_1库。配置信息交给zookeeper管理,并对其进行监听。
public StormDataSourcePool getDsPool(String db) {
StormDataSourcePool ds = (StormDataSourcePool)this.dsPool.get(db);
if (ds == null) {
synchronized(this) {
if (ds == null) {
this.registerClient(db);
this.watchDbConfig(db);
log.debug("not found ds for db " + db + " , retirve");
this.compareAndReload(this.retriveDbConfig(db));
}
}
}
return (StormDataSourcePool)this.dsPool.get(db);
}
根据配置信息的type为router,StormDataSourcePool指向了其子类RouterDsPool,router的路由解析就是在这个RouterDsPool实例的getReadableDs(String pattern)或者getWriteableDs(String pattern)中完成的,调用了findDataSource(String pattern),如下:
protected StormDataSourcePool findDataSource(String pattern) {
Iterator i$ = this.config.getRoutes().iterator();
RouteConfig route;
do {
if (!i$.hasNext()) {
throw new NoRouteMatchExecption(pattern, this.config.getName());
}
route = (RouteConfig)i$.next();
log.debug("Comparing " + pattern + " aginst " + route.getExpression());
} while(!pattern.matches(route.getExpression()));
return this.agent.getDsPool(route.getInstance());
}
匹配了pattern和配置信息中的expression,得到instance,如message_box_75匹配后得到message_box_0,再从zk中获取message_box_0的配置信息,
<instance name="message_box_0" timestamp="2017-11-01 15:00:00" type="singler">
<server charset="utf8" database="message_box" host="192.168.1.1" password="123456" port="3306" priority="" type="mysql" user="test" wrflag="wr"/>
</instance>
配置信息中配置了database的信息,host,port,user, password, 读写库标识等,type为singer,表示不分库,至此得到此连接信息。返回此连接
最后
以上就是美丽云朵为你收集整理的mybatis的一个分库分表插件--jade的全部内容,希望文章能够帮你解决mybatis的一个分库分表插件--jade所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复