???关注**微信公众号:【芋艿的后端小屋】**有福利:
- RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
- RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
- 您对于源码的疑问每条留言都将得到认真回复。甚至不知道如何读源码也可以请教噢。
- 新的源码解析文章实时收到通知。每周更新一篇左右。
1. 概述
内容形态以 顺序图 + 核心代码 为主。
如果有地方表述不错误或者不清晰,欢迎留言。 对于内容形态,非常纠结,如果有建议,特别特别特别欢迎您提出。 微信号:wangwenbin-server。
本文讲解 【单库单表】插入 所涉及到的代码。交互如下图:
整个过程,MyCAT Server 流程如下:
- 接收 MySQL Client 请求,解析 SQL。
- 获得路由结果,进行路由。
- 获得 MySQL 连接,执行 SQL。
- 响应执行结果,发送结果给 MySQL Client。
我们逐个步骤分析,一起来看看源码。
2. 接收请求,解析 SQL
【 1 - 2 】
接收一条 MySQL 命令。在【1】之前,还有请求数据读取、拆成单条 MySQL SQL。
【 3 】
不同 MySQL 命令,分发到不同的方法执行。核心代码如下:
1: // ⬇️⬇️⬇️【FrontendCommandHandler.java】 2: public class FrontendCommandHandler implements NIOHandler { 3: 4: @Override 5: public void handle(byte[] data) { 6: 7: // .... 省略部分代码 8: switch (data[4]) // 9: { 10: case MySQLPacket.COM_INIT_DB: 11: commands.doInitDB(); 12: source.initDB(data); 13: break; 14: case MySQLPacket.COM_QUERY: // 查询命令 15: // 计数查询命令 16: commands.doQuery(); 17: // 执行查询命令 18: source.query(data); 19: break; 20: case MySQLPacket.COM_PING: 21: commands.doPing(); 22: source.ping(); 23: break; 24: // .... 省略部分case 25: } 26: } 27: 28: }
INSERT
/SELECT
/UPDATE
/DELETE
等 SQL 归属于 MySQLPacket.COM_QUERY
,详细可见:。
##【 4 】
将 二进制数组 解析成 SQL。核心代码如下:
1: // ⬇️⬇️⬇️【FrontendConnection.java】 2: public void query(byte[] data) { 3: // 取得语句 4: String sql = null; 5: try { 6: MySQLMessage mm = new MySQLMessage(data); 7: mm.position(5); 8: sql = mm.readString(charset); 9: } catch (UnsupportedEncodingException e) { 10: writeErrMessage(ErrorCode.ER_UNKNOWN_CHARACTER_SET, "Unknown charset '" + charset + "'"); 11: return; 12: } 13: // 执行语句 14: this.query( sql ); 15: }
##【 5 】
解析 SQL 类型。核心代码如下:
1: // ⬇️⬇️⬇️【ServerQueryHandler.java】 2: @Override 3: public void query(String sql) { 4: // 解析 SQL 类型 5: int rs = ServerParse.parse(sql); 6: int sqlType = rs & 0xff; 7: 8: switch (sqlType) { 9: //explain sql 10: case ServerParse.EXPLAIN: 11: ExplainHandler.handle(sql, c, rs >>> 8); 12: break; 13: // .... 省略部分case 14: break; 15: case ServerParse.SELECT: 16: SelectHandler.handle(sql, c, rs >>> 8); 17: break; 18: // .... 省略部分case 19: default: 20: if(readOnly){ 21: LOGGER.warn(new StringBuilder().append("User readonly:").append(sql).toString()); 22: c.writeErrMessage(ErrorCode.ER_USER_READ_ONLY, "User readonly"); 23: break; 24: } 25: c.execute(sql, rs & 0xff); 26: } 27: } 28: 29: 30: // ⬇️⬇️⬇️【ServerParse.java】 31: public static int parse(String stmt) { 32: int length = stmt.length(); 33: //FIX BUG FOR SQL SUCH AS /XXXX/SQL 34: int rt = -1; 35: for (int i = 0; i < length; ++i) { 36: switch (stmt.charAt(i)) { 37: // .... 省略部分case case 'I': 38: case 'i': 39: rt = insertCheck(stmt, i); 40: if (rt != OTHER) { 41: return rt; 42: } 43: continue; 44: // .... 省略部分case 45: case 'S': 46: case 's': 47: rt = sCheck(stmt, i); 48: if (rt != OTHER) { 49: return rt; 50: } 51: continue; 52: // .... 省略部分case 53: default: 54: continue; 55: } 56: } 57: return OTHER; 58: }
##【 6 】
执行 SQL,详细解析见下文,核心代码如下:
1: // ⬇️⬇️⬇️【ServerConnection.java】 2: public class ServerConnection extends FrontendConnection { 3: public void execute(String sql, int type) { 4: // .... 省略代码 5: SchemaConfig schema = MycatServer.getInstance().getConfig().getSchemas().get(db); 6: if (schema == null) { 7: writeErrMessage(ErrorCode.ERR_BAD_LOGICDB, 8: "Unknown MyCAT Database '" + db + "'"); 9: return; 10: } 11: 12: // .... 省略代码 13: 14: // 路由到后端数据库,执行 SQL 15: routeEndExecuteSQL(sql, type, schema); 16: } 17: 18: public void routeEndExecuteSQL(String sql, final int type, final SchemaConfig schema) { 19: // 路由计算 20: RouteResultset rrs = null; 21: try { 22: rrs = MycatServer 23: .getInstance() 24: .getRouterservice() 25: .route(MycatServer.getInstance().getConfig().getSystem(), 26: schema, type, sql, this.charset, this); 27: 28: } catch (Exception e) { 29: StringBuilder s = new StringBuilder(); 30: LOGGER.warn(s.append(this).append(sql).toString() + " err:" + e.toString(),e); 31: String msg = e.getMessage(); 32: writeErrMessage(ErrorCode.ER_PARSE_ERROR, msg == null ? e.getClass().getSimpleName() : msg); 33: return; 34: } 35: 36: // 执行 SQL 37: if (rrs != null) { 38: // session执行 39: session.execute(rrs, rrs.isSelectForUpdate() ? ServerParse.UPDATE : type); 40: } 41: 42: } 43: 44: }
3. 获得路由结果
【 1 - 2 】【 12 】
获得路由主流程。核心代码如下:
1: // ⬇️⬇️⬇️【RouteService.java】 2: public RouteResultset route(SystemConfig sysconf, SchemaConfig schema, 3: int sqlType, String stmt, String charset, ServerConnection sc) 4: throws SQLNonTransientException { 5: RouteResultset rrs = null; 6: // .... 省略代码 7: int hintLength = RouteService.isHintSql(stmt); 8: if(hintLength != -1){ // TODO 待读:hint 9: // .... 省略代码 10: } 11: } else { 12: stmt = stmt.trim(); 13: rrs = RouteStrategyFactory.getRouteStrategy().route(sysconf, schema, sqlType, stmt, 14: charset, sc, tableId2DataNodeCache); 15: } 16: 17: // .... 省略代码 return rrs; 18: } 19: // ⬇️⬇️⬇️【AbstractRouteStrategy.java】 20: @Override 21: public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema, int sqlType, String origSQL, 22: String charset, ServerConnection sc, LayerCachePool cachePool) throws SQLNonTransientException { 23: 24: // .... 省略代码 25: 26: // 处理一些路由之前的逻辑;全局序列号,父子表插入 27: if (beforeRouteProcess(schema, sqlType, origSQL, sc) ) { 28: return null; 29: } 30: 31: // .... 省略代码 32: 33: // 检查是否有分片 34: if (schema.isNoSharding() && ServerParse.SHOW != sqlType) { 35: rrs = RouterUtil.routeToSingleNode(rrs, schema.getDataNode(), stmt); 36: } else { 37: RouteResultset returnedSet = routeSystemInfo(schema, sqlType, stmt, rrs); 38: if (returnedSet == null) { 39: rrs = routeNormalSqlWithAST(schema, stmt, rrs, charset, cachePool,sqlType,sc); 40: } 41: } 42: 43: return rrs; 44: }
路由 详细解析,我们另开文章,避免内容过多,影响大家对【插入】流程和逻辑的理解。
【 3 - 6 】
路由前置处理。当符合如下三种情况下,进行处理:
{ 1 } 使用全局序列号:
insert into table (id, name) values (NEXT VALUE FOR MYCATSEQ_ID, 'name')
{ 2 } ER 子表插入
{ 3 } 主键使用自增 ID 插入:insert into table (name) values ('name')===>insert into table (id, name) values (NEXT VALUE FOR MYCATSEQ_ID, 'name')
情况 { 1 } { 3 } 情况类似,使用全局序列号。
核心代码如下:
1: // ⬇️⬇️⬇️【AbstractRouteStrategy.java】 2: private boolean beforeRouteProcess(SchemaConfig schema, int sqlType, String origSQL, ServerConnection sc) 3: throws SQLNonTransientException { 4: return // 处理 id 使用 全局序列号 5: RouterUtil.processWithMycatSeq(schema, sqlType, origSQL, sc) 6: // 处理 ER 子表 7: || (sqlType == ServerParse.INSERT && RouterUtil.processERChildTable(schema, origSQL, sc)) 8: // 处理 id 自增长 9: || (sqlType == ServerParse.INSERT && RouterUtil.processInsert(schema, sqlType, origSQL, sc)); 10: }
RouterUtil.java
处理 SQL 考虑性能,实现会比较 C-style,代码咱就不贴了,传送门: (?该仓库从官方 Fork,逐步完善中文注释,欢迎 Star)
【 7 - 11 】
当前置路由处理全局序列号时,添加到全局序列处理器(MyCATSequnceProcessor
)。该处理器会异步生成 ID,替换 SQL 内的 NEXT VALUE FOR MYCATSEQ_
正则。例如:
insert into table (id, name) values (NEXT VALUE FOR MYCATSEQ_ID, 'name')===>insert into table (id, name) values (868348974560579584, 'name')
异步处理完后,调用 ServerConnection#routeEndExecuteSQL(sql, type, schema)
方法重新执行 SQL。
核心代码如下:
1: // ⬇️⬇️⬇️【RouterUtil.java】 2: public static void processSQL(ServerConnection sc,SchemaConfig schema,String sql,int sqlType){ 3: SessionSQLPair sessionSQLPair = new SessionSQLPair(sc.getSession2(), schema, sql, sqlType); 4: MycatServer.getInstance().getSequnceProcessor().addNewSql(sessionSQLPair); 5: } 6: // ⬇️⬇️⬇️【MyCATSequnceProcessor.java】 7: public class MyCATSequnceProcessor { 8: private LinkedBlockingQueueseqSQLQueue = new LinkedBlockingQueue (); 9: private volatile boolean running=true; 10: 11: public void addNewSql(SessionSQLPair pair) { 12: seqSQLQueue.add(pair); 13: } 14: 15: private void executeSeq(SessionSQLPair pair) { 16: try { 17: 18: // 使用Druid解析器实现sequence处理 @兵临城下 19: DruidSequenceHandler sequenceHandler = new DruidSequenceHandler(MycatServer 20: .getInstance().getConfig().getSystem().getSequnceHandlerType()); 21: 22: // 生成可执行 SQL :目前主要是生成 id 23: String charset = pair.session.getSource().getCharset(); 24: String executeSql = sequenceHandler.getExecuteSql(pair.sql,charset == null ? "utf-8":charset); 25: 26: // 执行 SQL 27: pair.session.getSource().routeEndExecuteSQL(executeSql, pair.type,pair.schema); 28: } catch (Exception e) { 29: LOGGER.error("MyCATSequenceProcessor.executeSeq(SesionSQLPair)",e); 30: pair.session.getSource().writeErrMessage(ErrorCode.ER_YES,"mycat sequnce err." + e); 31: return; 32: } 33: } 34: 35: class ExecuteThread extends Thread { 36: 37: public ExecuteThread() { 38: setDaemon(true); // 设置为后台线程,防止throw RuntimeExecption进程仍然存在的问题 39: } 40: 41: public void run() { 42: while (running) { 43: try { 44: SessionSQLPair pair=seqSQLQueue.poll(100,TimeUnit.MILLISECONDS); 45: if(pair!=null){ 46: executeSeq(pair); 47: } 48: } catch (Exception e) { 49: LOGGER.warn("MyCATSequenceProcessor$ExecutorThread",e); 50: } 51: } 52: } 53: } 54: }
❓此处有个疑问:MyCATSequnceProcessor
是单线程,会不会插入性能有一定的影响?后续咱做下性能测试。
4. 获得 MySQL 连接,执行 SQL
【 1 - 8 】
获得 MySQL 连接。
- PhysicalDBNode :物理数据库节点。
- PhysicalDatasource :物理数据库数据源。
【 9 - 13 】
发送 SQL 到 MySQL Server,执行 SQL。
5. 响应执行 SQL 结果
【 1 - 4 】
处理 MySQL Server 响应数据包。
【 5 - 8 】
发送插入成功结果给 MySQL Client。