博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
数据库中间件 MyCAT 源码分析:【单库单表】插入
阅读量:5924 次
发布时间:2019-06-19

本文共 9898 字,大约阅读时间需要 32 分钟。

hot3.png

???关注**微信公众号:【芋艿的后端小屋】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右

1. 概述

内容形态以 顺序图 + 核心代码 为主。

如果有地方表述不错误或者不清晰,欢迎留言。
对于内容形态,非常纠结,如果有建议,特别特别特别欢迎您提出。
微信号:wangwenbin-server。

本文讲解 【单库单表】插入 所涉及到的代码。交互如下图:

单库单表插入简图

整个过程,MyCAT Server 流程如下:

  1. 接收 MySQL Client 请求,解析 SQL。
  2. 获得路由结果,进行路由。
  3. 获得 MySQL 连接,执行 SQL。
  4. 响应执行结果,发送结果给 MySQL Client。

我们逐个步骤分析,一起来看看源码。

2. 接收请求,解析 SQL

【单库单表】插入(01主流程)

【 1 - 2 】

接收一条 MySQL 命令。在【1】之前,还有请求数据读取、拆成单条 MySQL SQL。

【 3 】

不同 MySQL 命令,分发到不同的方法执行。核心代码如下:

1: // ⬇️⬇️⬇️【FrontendCommandHandler.java】  2: public class FrontendCommandHandler implements NIOHandler {  3:   4:     @Override  5:     public void handle(byte[] data) {  6:       7:         // .... 省略部分代码  8:         switch (data[4]) //   9:         { 10:             case MySQLPacket.COM_INIT_DB: 11:                 commands.doInitDB(); 12:                 source.initDB(data); 13:                 break; 14:             case MySQLPacket.COM_QUERY: // 查询命令 15:                 // 计数查询命令 16:                 commands.doQuery(); 17:                 // 执行查询命令 18:                 source.query(data); 19:                 break; 20:             case MySQLPacket.COM_PING: 21:                 commands.doPing(); 22:                 source.ping(); 23:                 break; 24:             // .... 省略部分case 25:         } 26:     } 27:  28: }

INSERT/SELECT/UPDATE/DELETE 等 SQL 归属于 MySQLPacket.COM_QUERY,详细可见:。

##【 4 】

将 二进制数组 解析成 SQL。核心代码如下:

1: // ⬇️⬇️⬇️【FrontendConnection.java】  2: public void query(byte[] data) {  3: 	// 取得语句  4: 	String sql = null;		  5: 	try {  6: 		MySQLMessage mm = new MySQLMessage(data);  7: 		mm.position(5);  8: 		sql = mm.readString(charset);  9: 	} catch (UnsupportedEncodingException e) { 10: 		writeErrMessage(ErrorCode.ER_UNKNOWN_CHARACTER_SET, "Unknown charset '" + charset + "'"); 11: 		return; 12: 	}		 13: 	// 执行语句 14: 	this.query( sql ); 15: }

##【 5 】

解析 SQL 类型。核心代码如下:

1: // ⬇️⬇️⬇️【ServerQueryHandler.java】  2: @Override  3: public void query(String sql) {  4: 	// 解析 SQL 类型  5: 	int rs = ServerParse.parse(sql);  6: 	int sqlType = rs & 0xff;  7: 	  8: 	switch (sqlType) {  9: 	//explain sql 10: 	case ServerParse.EXPLAIN: 11: 		ExplainHandler.handle(sql, c, rs >>> 8); 12: 		break; 13: 	// .... 省略部分case 14: 		break; 15: 	case ServerParse.SELECT: 16: 		SelectHandler.handle(sql, c, rs >>> 8); 17: 		break; 18: 	// .... 省略部分case 19: 	default: 20: 		if(readOnly){ 21: 			LOGGER.warn(new StringBuilder().append("User readonly:").append(sql).toString()); 22: 			c.writeErrMessage(ErrorCode.ER_USER_READ_ONLY, "User readonly"); 23: 			break; 24: 		} 25: 		c.execute(sql, rs & 0xff); 26: 	} 27: } 28:  29: 30: // ⬇️⬇️⬇️【ServerParse.java】 31: public static int parse(String stmt) { 32: 	int length = stmt.length(); 33: 	//FIX BUG FOR SQL SUCH AS /XXXX/SQL 34: 	int rt = -1; 35: 	for (int i = 0; i < length; ++i) { 36: 		switch (stmt.charAt(i)) { 37: 		// .... 省略部分case			case 'I': 38: 		case 'i': 39: 			rt = insertCheck(stmt, i); 40: 			if (rt != OTHER) { 41: 				return rt; 42: 			} 43: 			continue; 44: 			// .... 省略部分case 45: 		case 'S': 46: 		case 's': 47: 			rt = sCheck(stmt, i); 48: 			if (rt != OTHER) { 49: 				return rt; 50: 			} 51: 			continue; 52: 			// .... 省略部分case 53: 		default: 54: 			continue; 55: 		} 56: 	} 57: 	return OTHER; 58: }

##【 6 】

执行 SQL,详细解析见下文,核心代码如下:

1: // ⬇️⬇️⬇️【ServerConnection.java】  2: public class ServerConnection extends FrontendConnection {  3: 	public void execute(String sql, int type) {  4: 		// .... 省略代码  5: 		SchemaConfig schema = MycatServer.getInstance().getConfig().getSchemas().get(db);  6: 		if (schema == null) {  7: 			writeErrMessage(ErrorCode.ERR_BAD_LOGICDB,  8: 					"Unknown MyCAT Database '" + db + "'");  9: 			return; 10: 		} 11:  12: 		// .... 省略代码 13:  14: 		// 路由到后端数据库,执行 SQL 15: 		routeEndExecuteSQL(sql, type, schema); 16: 	} 17: 	 18:     public void routeEndExecuteSQL(String sql, final int type, final SchemaConfig schema) { 19: 		// 路由计算 20: 		RouteResultset rrs = null; 21: 		try { 22: 			rrs = MycatServer 23: 					.getInstance() 24: 					.getRouterservice() 25: 					.route(MycatServer.getInstance().getConfig().getSystem(), 26: 							schema, type, sql, this.charset, this); 27:  28: 		} catch (Exception e) { 29: 			StringBuilder s = new StringBuilder(); 30: 			LOGGER.warn(s.append(this).append(sql).toString() + " err:" + e.toString(),e); 31: 			String msg = e.getMessage(); 32: 			writeErrMessage(ErrorCode.ER_PARSE_ERROR, msg == null ? e.getClass().getSimpleName() : msg); 33: 			return; 34: 		} 35:  36: 		// 执行 SQL 37: 		if (rrs != null) { 38: 			// session执行 39: 			session.execute(rrs, rrs.isSelectForUpdate() ? ServerParse.UPDATE : type); 40: 		} 41: 		 42:  	} 43:  44: }

3. 获得路由结果

【单库单表】插入(02获取路由)

【 1 - 2 】【 12 】

获得路由主流程。核心代码如下:

1: // ⬇️⬇️⬇️【RouteService.java】  2: public RouteResultset route(SystemConfig sysconf, SchemaConfig schema,  3: 		int sqlType, String stmt, String charset, ServerConnection sc)  4: 		throws SQLNonTransientException {  5: 	RouteResultset rrs = null;  6: 	// .... 省略代码  7: 	int hintLength = RouteService.isHintSql(stmt);  8: 	if(hintLength != -1){ // TODO 待读:hint  9: 		// .... 省略代码 10: 		} 11: 	} else { 12: 		stmt = stmt.trim(); 13: 		rrs = RouteStrategyFactory.getRouteStrategy().route(sysconf, schema, sqlType, stmt, 14: 				charset, sc, tableId2DataNodeCache); 15: 	} 16:  17: 	// .... 省略代码		return rrs; 18: } 19: // ⬇️⬇️⬇️【AbstractRouteStrategy.java】 20: @Override 21: public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema, int sqlType, String origSQL, 22: 		String charset, ServerConnection sc, LayerCachePool cachePool) throws SQLNonTransientException { 23:  24: 	// .... 省略代码 25:  26: 	// 处理一些路由之前的逻辑;全局序列号,父子表插入 27: 	if (beforeRouteProcess(schema, sqlType, origSQL, sc) ) { 28: 		return null; 29: 	} 30:  31: 	// .... 省略代码 32:  33: 	// 检查是否有分片 34: 	if (schema.isNoSharding() && ServerParse.SHOW != sqlType) { 35: 		rrs = RouterUtil.routeToSingleNode(rrs, schema.getDataNode(), stmt); 36: 	} else { 37: 		RouteResultset returnedSet = routeSystemInfo(schema, sqlType, stmt, rrs); 38: 		if (returnedSet == null) { 39: 			rrs = routeNormalSqlWithAST(schema, stmt, rrs, charset, cachePool,sqlType,sc); 40: 		} 41: 	} 42:  43: 	return rrs; 44: }

路由 详细解析,我们另开文章,避免内容过多,影响大家对【插入】流程和逻辑的理解。

【 3 - 6 】

路由前置处理。当符合如下三种情况下,进行处理:

{ 1 } 使用全局序列号

insert into table (id, name) values (NEXT VALUE FOR MYCATSEQ_ID, 'name')

{ 2 } ER 子表插入

{ 3 } 主键使用自增 ID 插入:

insert into table (name) values ('name')===>insert into table (id, name) values (NEXT VALUE FOR MYCATSEQ_ID, 'name')

情况 { 1 } { 3 } 情况类似,使用全局序列号。

核心代码如下:

1: // ⬇️⬇️⬇️【AbstractRouteStrategy.java】  2: private boolean beforeRouteProcess(SchemaConfig schema, int sqlType, String origSQL, ServerConnection sc)  3: 		throws SQLNonTransientException {  4: 	return  // 处理 id 使用 全局序列号  5:             RouterUtil.processWithMycatSeq(schema, sqlType, origSQL, sc)  6:             // 处理 ER 子表  7: 			|| (sqlType == ServerParse.INSERT && RouterUtil.processERChildTable(schema, origSQL, sc))  8:             // 处理 id 自增长  9: 			|| (sqlType == ServerParse.INSERT && RouterUtil.processInsert(schema, sqlType, origSQL, sc)); 10: }

RouterUtil.java 处理 SQL 考虑性能,实现会比较 C-style,代码咱就不贴了,传送门: (?该仓库从官方 Fork,逐步完善中文注释,欢迎 Star)

【 7 - 11 】

前置路由处理全局序列号时,添加到全局序列处理器(MyCATSequnceProcessor)。该处理器会异步生成 ID,替换 SQL 内的 NEXT VALUE FOR MYCATSEQ_ 正则。例如:

insert into table (id, name) values (NEXT VALUE FOR MYCATSEQ_ID, 'name')===>insert into table (id, name) values (868348974560579584, 'name')

异步处理完后,调用 ServerConnection#routeEndExecuteSQL(sql, type, schema) 方法重新执行 SQL。

核心代码如下:

1: // ⬇️⬇️⬇️【RouterUtil.java】  2: public static void processSQL(ServerConnection sc,SchemaConfig schema,String sql,int sqlType){  3: 	SessionSQLPair sessionSQLPair = new SessionSQLPair(sc.getSession2(), schema, sql, sqlType);  4: 	MycatServer.getInstance().getSequnceProcessor().addNewSql(sessionSQLPair);  5: }  6: // ⬇️⬇️⬇️【MyCATSequnceProcessor.java】  7: public class MyCATSequnceProcessor {  8: 	private LinkedBlockingQueue
seqSQLQueue = new LinkedBlockingQueue
(); 9: private volatile boolean running=true; 10: 11: public void addNewSql(SessionSQLPair pair) { 12: seqSQLQueue.add(pair); 13: } 14: 15: private void executeSeq(SessionSQLPair pair) { 16: try { 17: 18: // 使用Druid解析器实现sequence处理 @兵临城下 19: DruidSequenceHandler sequenceHandler = new DruidSequenceHandler(MycatServer 20: .getInstance().getConfig().getSystem().getSequnceHandlerType()); 21: 22: // 生成可执行 SQL :目前主要是生成 id 23: String charset = pair.session.getSource().getCharset(); 24: String executeSql = sequenceHandler.getExecuteSql(pair.sql,charset == null ? "utf-8":charset); 25: 26: // 执行 SQL 27: pair.session.getSource().routeEndExecuteSQL(executeSql, pair.type,pair.schema); 28: } catch (Exception e) { 29: LOGGER.error("MyCATSequenceProcessor.executeSeq(SesionSQLPair)",e); 30: pair.session.getSource().writeErrMessage(ErrorCode.ER_YES,"mycat sequnce err." + e); 31: return; 32: } 33: } 34: 35: class ExecuteThread extends Thread { 36: 37: public ExecuteThread() { 38: setDaemon(true); // 设置为后台线程,防止throw RuntimeExecption进程仍然存在的问题 39: } 40: 41: public void run() { 42: while (running) { 43: try { 44: SessionSQLPair pair=seqSQLQueue.poll(100,TimeUnit.MILLISECONDS); 45: if(pair!=null){ 46: executeSeq(pair); 47: } 48: } catch (Exception e) { 49: LOGGER.warn("MyCATSequenceProcessor$ExecutorThread",e); 50: } 51: } 52: } 53: } 54: }

❓此处有个疑问:MyCATSequnceProcessor 是单线程,会不会插入性能有一定的影响?后续咱做下性能测试。

4. 获得 MySQL 连接,执行 SQL

【单库单表】插入(03执行 SQL)

【 1 - 8 】

获得 MySQL 连接。

  • PhysicalDBNode :物理数据库节点。
  • PhysicalDatasource :物理数据库数据源。

【 9 - 13 】

发送 SQL 到 MySQL Server,执行 SQL。

5. 响应执行 SQL 结果

【单库单表】插入(04执行响应)

【 1 - 4 】

处理 MySQL Server 响应数据包。

【 5 - 8 】

发送插入成功结果给 MySQL Client。

转载于:https://my.oschina.net/sword4j/blog/911174

你可能感兴趣的文章
ueditor编辑器使用总结
查看>>
ssh整合之七注解结合xml形式
查看>>
[C puzzle book] Programming styles
查看>>
Javascript&Jquery获取浏览器和屏幕各种高度宽度方法总结及运用
查看>>
js懒加载
查看>>
小波变换 完美通俗解读【转载】
查看>>
python练习---校园管理系统
查看>>
小程序简介
查看>>
表格行的偶数与奇数
查看>>
基础数据类型之列表,元组
查看>>
1124 Raffle for Weibo Followers
查看>>
如何对文献进行阅读与整理
查看>>
GsonFormat插件
查看>>
SharePoint Portal Server 2003深入指南(部分章节--转)1
查看>>
织梦dedecms标签大全总结
查看>>
中文词频统计及词云制作
查看>>
基础知识(14)- 多线程
查看>>
javascript中break和continue
查看>>
使用maven创建web项目【转】
查看>>
ASP.NET状态管理之十一(总结)
查看>>