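Goals
1. Read/write splitting, to improve system responsiveness. Most high-concurrency web applications read far more than they write, so I want to spread the large volume of read requests across multiple MySQL slaves.
2. Keep the MySQL cluster architecture transparent to business development, i.e. business code should not need to care about the distributed architecture of the underlying storage.
These are the two main considerations. Based on them I wrote a data source component. The component also handles Redis cluster sharding and MongoDB cluster sharding, but this article covers only read/write splitting for a MySQL cluster.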
Implementation
This section only explains the approach and provides the key code snippets.
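MySQL cluster configuration
- The first part of the file is the basic configuration of the c3p0 connection pool, which I have split into three profiles: small, medium, and large.
- The second part is the configuration of the master and the slaves in the cluster.
mysql_cluster_defaut: the cluster name. It can be anything, as long as it starts with the mysql_cluster prefix (the parsing factory shows why).
mysql_cluster_defaut_master: the key prefix for the cluster's master.
mysql_cluster_defaut_slave: the key prefix for the cluster's slaves.
mysql_cluster_defaut_slave_0: the first slave in the cluster; further slaves follow the same pattern.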
    ### c3p pool configuration ###
    config_small.driverClass=com.mysql.jdbc.Driver
    config_small.minPoolSize=5
    config_small.maxPoolSize=30
    config_small.initialPoolSize=10
    config_small.maxIdleTime=60
    config_small.acquireIncrement=5
    config_small.maxStatements=0
    config_small.idleConnectionTestPeriod=60
    config_small.acquireRetryAttempts=30
    config_small.breakAfterAcquireFailure=true
    config_small.testConnectionOnCheckout=false

    config_medium.driverClass=com.mysql.jdbc.Driver
    config_medium.minPoolSize=30
    config_medium.maxPoolSize=100
    config_medium.initialPoolSize=50
    config_medium.maxIdleTime=60
    config_medium.acquireIncrement=5
    config_medium.maxStatements=0
    config_medium.idleConnectionTestPeriod=60
    config_medium.acquireRetryAttempts=30
    config_medium.breakAfterAcquireFailure=true
    config_medium.testConnectionOnCheckout=false

    config_large.driverClass=com.mysql.jdbc.Driver
    config_large.minPoolSize=100
    config_large.maxPoolSize=1000
    config_large.initialPoolSize=200
    config_large.maxIdleTime=60
    config_large.acquireIncrement=5
    config_large.maxStatements=0
    config_large.idleConnectionTestPeriod=60
    config_large.acquireRetryAttempts=30
    config_large.breakAfterAcquireFailure=true
    config_large.testConnectionOnCheckout=false

    ### mysql master-slaves configuration ###
    mysql_cluster_defaut_master.jdbcUrl=jdbc:mysql://host:port/db?useUnicode=true&characterEncoding=utf8
    mysql_cluster_defaut_master.user=root
    mysql_cluster_defaut_master.password=pwd

    mysql_cluster_defaut_slave_0.jdbcUrl=jdbc:mysql://host:port/db?useUnicode=true&characterEncoding=utf8
    mysql_cluster_defaut_slave_0.user=root
    mysql_cluster_defaut_slave_0.password=pwd

    mysql_cluster_defaut_slave_1.jdbcUrl=jdbc:mysql://host:port/db?useUnicode=true&characterEncoding=utf8
    mysql_cluster_defaut_slave_1.user=root
    mysql_cluster_defaut_slave_1.password=pwd
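To make the key naming convention concrete, here is a minimal sketch of how entries such as mysql_cluster_defaut_slave_0.jdbcUrl could be grouped into one map per node. The class NodeConfigGrouper and its methods are hypothetical and use only java.util.Properties; the component's real parsing goes through the author's own PropertiesManager helper, which is not reproduced in this article. Treating a key that ends in "_master" as the master is also an assumption made for this sketch.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper for illustration; not part of the original component.
class NodeConfigGrouper {

    static final String CLUSTER_PREFIX = "mysql_cluster";

    /** Groups "nodeKey.property=value" entries by node key, keeping only cluster nodes. */
    static Map<String, Map<String, String>> groupByNode(Properties props) {
        Map<String, Map<String, String>> nodes = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            int dot = key.indexOf('.');
            if (dot < 0 || !key.startsWith(CLUSTER_PREFIX)) {
                continue; // pool profiles (config_*) and keys without a property part are skipped
            }
            String nodeKey = key.substring(0, dot);
            String property = key.substring(dot + 1);
            nodes.computeIfAbsent(nodeKey, k -> new TreeMap<>())
                 .put(property, props.getProperty(key));
        }
        return nodes;
    }

    /** Assumption for this sketch: the master is the node whose key ends with "_master". */
    static boolean isMaster(String nodeKey) {
        return nodeKey.endsWith("_master");
    }

    /** Parses properties from a string, which keeps the example independent of the file system. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props;
    }
}
```

Calling groupByNode on the file above would yield three node entries (the master and two slaves), each with its jdbcUrl, user, and password.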
Parsing the configuration
With the configuration file above in place, the next step is to parse it.
- C3P0PoolConfig: a bean that holds the c3p0 pool properties.
    public class C3P0PoolConfig {
        private String driverClass;
        private int minPoolSize;
        private int maxPoolSize;
        private int initialPoolSize;
        private int maxIdleTime;
        private int acquireIncrement;
        private int maxStatements;
        private int idleConnectionTestPeriod;
        private int acquireRetryAttempts;
        private boolean breakAfterAcquireFailure;
        private boolean testConnectionOnCheckout;
        // getters and setters omitted to save space
    }
- Constants: basic constant definitions, such as the various key prefixes.
    public class Constants {
        public static final String KEY_SEPARATOR = "_";
        public static final String MASTER = "master";
        public static final String JDBC_URL = "jdbcUrl";
        public static final String USER = "user";
        public static final String PASSWORD = "password";
        public static final String LARGE = "large";
        public static final String SMALL = "small";
        public static final String MEDIUM = "medium";
        public static final String KEY_PREFIX_CONFIG = "config_";
        public static final String DRIVER_CLASS = "driverClass";
        public static final String MIN_POOL_SIZE = "minPoolSize";
        public static final String MAX_POOL_SIZE = "maxPoolSize";
        public static final String INITIAL_POOL_SIZE = "initialPoolSize";
        public static final String MAX_IDLE_TIME = "maxIdleTime";
        public static final String ACQUIRE_INCREMENT = "acquireIncrement";
        public static final String MAX_STATEMENTS = "maxStatements";
        public static final String IDLE_CONNECTION_TEST_PERIOD = "idleConnectionTestPeriod";
        public static final String ACQUIRE_RETRY_ATTEMPTS = "acquireRetryAttempts";
        public static final String BREAK_AFTER_ACQUIRE_FAILURE = "breakAfterAcquireFailure";
        public static final String TEST_CONNECTION_ON_CHECKOUT = "testConnectionOnCheckout";
        public static final String KEY_PREFIX_MYSQL_CLUSTER = "mysql_cluster";
    }
- MysqlNode: a MySQL node object, which
1. records whether the node is the master;
2. creates the node's ComboPooledDataSource.
    import com.mchange.v2.c3p0.ComboPooledDataSource;

    public class MysqlNode {
        private C3P0PoolConfig c3P0PoolConfig;
        private String nodeKey;
        private boolean isMaster;
        private String jdbcUrl;
        private String user;
        private String password;
        private ComboPooledDataSource dataSource;

        // Lazily creates the pooled data source once and reuses it afterwards.
        public synchronized ComboPooledDataSource generateDataSource() throws Exception {
            if (dataSource != null) {
                return dataSource;
            }
            dataSource = new ComboPooledDataSource();
            dataSource.setDriverClass(c3P0PoolConfig.getDriverClass());
            dataSource.setMinPoolSize(c3P0PoolConfig.getMinPoolSize());
            dataSource.setMaxPoolSize(c3P0PoolConfig.getMaxPoolSize());
            dataSource.setInitialPoolSize(c3P0PoolConfig.getInitialPoolSize());
            dataSource.setMaxIdleTime(c3P0PoolConfig.getMaxIdleTime());
            dataSource.setAcquireIncrement(c3P0PoolConfig.getAcquireIncrement());
            dataSource.setMaxStatements(c3P0PoolConfig.getMaxStatements());
            dataSource.setIdleConnectionTestPeriod(c3P0PoolConfig.getIdleConnectionTestPeriod());
            dataSource.setAcquireRetryAttempts(c3P0PoolConfig.getAcquireRetryAttempts());
            dataSource.setBreakAfterAcquireFailure(c3P0PoolConfig.isBreakAfterAcquireFailure());
            dataSource.setTestConnectionOnCheckout(c3P0PoolConfig.isTestConnectionOnCheckout());
            dataSource.setJdbcUrl(this.jdbcUrl);
            dataSource.setPassword(this.password);
            dataSource.setUser(this.user);
            return dataSource;
        }

        public String getNodeKey() { return nodeKey; }
        public void setNodeKey(String nodeKey) { this.nodeKey = nodeKey; }

        public C3P0PoolConfig getC3P0PoolConfig() { return c3P0PoolConfig; }
        public void setC3P0PoolConfig(C3P0PoolConfig c3P0PoolConfig) { this.c3P0PoolConfig = c3P0PoolConfig; }

        public boolean isMaster() { return isMaster; }
        public void setMaster(boolean isMaster) { this.isMaster = isMaster; }

        public String getJdbcUrl() { return jdbcUrl; }
        public void setJdbcUrl(String jdbcUrl) { this.jdbcUrl = jdbcUrl; }

        public String getUser() { return user; }
        public void setUser(String user) { this.user = user; }

        public String getPassword() { return password; }
        public void setPassword(String password) { this.password = password; }
    }
- MysqlDataSourceFactory: the factory class that parses the configuration.
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    import com.mchange.v2.c3p0.ComboPooledDataSource;

    // Generic type parameters were lost in the source listing and are restored here as a best guess.
    // MapUtils appears to be the Apache Commons Collections class; PropertiesManager is the author's own helper (not shown).
    public class MysqlDataSourceFactory {

        // pool profiles, keyed by their config_* name
        private static Map<String, C3P0PoolConfig> configMap = new ConcurrentHashMap<String, C3P0PoolConfig>();
        // master data source
        private static MysqlNode masterNode;
        // slave data sources
        private static Map<String, MysqlNode> slavesNodes = new ConcurrentHashMap<String, MysqlNode>();

        public static void loadProperties(String propertyFile, String poolConfigShort) throws Exception {
            final PropertiesManager propertiesManager = new PropertiesManager(propertyFile);

            // 1. Parse every pool profile (config_small, config_medium, config_large).
            Map<String, Object> mysqlConfigs = propertiesManager.getPropsByPrefix(Constants.KEY_PREFIX_CONFIG);
            for (String configKey : mysqlConfigs.keySet()) {
                Map mysqlConfig = (Map) mysqlConfigs.get(configKey);
                C3P0PoolConfig poolConfig = new C3P0PoolConfig();
                poolConfig.setDriverClass(MapUtils.getString(mysqlConfig, Constants.DRIVER_CLASS));
                poolConfig.setMinPoolSize(MapUtils.getIntValue(mysqlConfig, Constants.MIN_POOL_SIZE));
                poolConfig.setMaxPoolSize(MapUtils.getIntValue(mysqlConfig, Constants.MAX_POOL_SIZE));
                poolConfig.setInitialPoolSize(MapUtils.getIntValue(mysqlConfig, Constants.INITIAL_POOL_SIZE));
                poolConfig.setMaxIdleTime(MapUtils.getIntValue(mysqlConfig, Constants.MAX_IDLE_TIME));
                poolConfig.setAcquireIncrement(MapUtils.getIntValue(mysqlConfig, Constants.ACQUIRE_INCREMENT));
                poolConfig.setMaxStatements(MapUtils.getIntValue(mysqlConfig, Constants.MAX_STATEMENTS));
                poolConfig.setIdleConnectionTestPeriod(MapUtils.getIntValue(mysqlConfig, Constants.IDLE_CONNECTION_TEST_PERIOD));
                poolConfig.setAcquireRetryAttempts(MapUtils.getIntValue(mysqlConfig, Constants.ACQUIRE_RETRY_ATTEMPTS));
                poolConfig.setBreakAfterAcquireFailure(MapUtils.getBooleanValue(mysqlConfig, Constants.BREAK_AFTER_ACQUIRE_FAILURE));
                poolConfig.setTestConnectionOnCheckout(MapUtils.getBooleanValue(mysqlConfig, Constants.TEST_CONNECTION_ON_CHECKOUT));
                configMap.put(configKey, poolConfig);
            }

            // 2. Pick the requested profile and build one MysqlNode per configured node.
            C3P0PoolConfig config = getC3P0PoolConfig(poolConfigShort);
            Set<String> mysqlNodes = propertiesManager.getGroups(Constants.KEY_PREFIX_MYSQL_CLUSTER);
            for (String node : mysqlNodes) {
                Map<String, Object> nodeConfigs = propertiesManager.getPropsByPrefix(node);
                for (String nodeKey : nodeConfigs.keySet()) {
                    Map nodeConfig = (Map) nodeConfigs.get(nodeKey);
                    MysqlNode mysqlNode = new MysqlNode();
                    mysqlNode.setJdbcUrl(MapUtils.getString(nodeConfig, Constants.JDBC_URL));
                    mysqlNode.setUser(MapUtils.getString(nodeConfig, Constants.USER));
                    mysqlNode.setPassword(MapUtils.getString(nodeConfig, Constants.PASSWORD));
                    mysqlNode.setNodeKey(nodeKey);
                    mysqlNode.setC3P0PoolConfig(config);
                    if (node.contains(Constants.MASTER)) {
                        mysqlNode.setMaster(true);
                        masterNode = mysqlNode;
                        continue;
                    }
                    slavesNodes.put(nodeKey, mysqlNode);
                }
            }
        }

        // Returns the first profile whose key contains the short name ("small", "medium" or "large").
        public static C3P0PoolConfig getC3P0PoolConfig(String poolConfig) {
            for (String key : configMap.keySet()) {
                if (key.contains(poolConfig)) {
                    return configMap.get(key);
                }
            }
            return null;
        }

        /**
         * Returns the master data source.
         */
        public static ComboPooledDataSource getMasterMasterDataSource() throws Exception {
            return masterNode.generateDataSource();
        }

        /**
         * Returns the slave data sources.
         */
        public static Set<ComboPooledDataSource> getSlavesDataSourceSet() throws Exception {
            Set<ComboPooledDataSource> slavesDataSource = new HashSet<ComboPooledDataSource>();
            for (MysqlNode node : slavesNodes.values()) {
                slavesDataSource.add(node.generateDataSource());
            }
            return slavesDataSource;
        }

        // The source listing is cut off here, at the start of another method ("public static Map ...").
        // getSlavesDataSourceMap() and getSlavesNodes(), which later listings call, are not shown.
    }
This part is fairly simple: it just parses the jdbc.properties file shown earlier.
Dynamic data source and switching
Everything so far was preparation; now the configuration gets used.
- DynamicDataSource: the dynamic data source
    import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

    public class DynamicDataSource extends AbstractRoutingDataSource {
        // pool profile: small, medium or large
        private String poolConfig;

        public DynamicDataSource(String poolConfig) throws Exception {
            this.poolConfig = poolConfig;
            MysqlDataSourceFactory.loadProperties("jdbc.properties", poolConfig);
            if (Constants.LARGE.equalsIgnoreCase(poolConfig)
                    || Constants.MEDIUM.equalsIgnoreCase(poolConfig)
                    || Constants.SMALL.equalsIgnoreCase(poolConfig)) {
                // All three profiles are wired the same way:
                // the master is the default target, the slaves are the keyed targets.
                this.setDefaultTargetDataSource(MysqlDataSourceFactory.getMasterMasterDataSource());
                this.setTargetDataSources(MysqlDataSourceFactory.getSlavesDataSourceMap());
            } else {
                throw new DatasourceException("datasource: illegal pool config:" + poolConfig
                        + ".should be one of \"small\",\"large\" or \"medium\".");
            }
        }

        public void setPoolConfig(String poolConfig) {
            this.poolConfig = poolConfig;
        }

        public void init() throws Exception {
            System.out.println("init DynamicDataSource...");
        }

        // Spring calls this on every connection request to decide which target to use.
        @Override
        protected Object determineCurrentLookupKey() {
            return DataSourceSwitcher.getDataSource();
        }
    }
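The routing behaviour that DynamicDataSource relies on can be illustrated without Spring. The sketch below is a simplified stand-in, not Spring's implementation: the class RoutingSketch is hypothetical and uses plain strings in place of data sources. It mirrors the behaviour of AbstractRoutingDataSource in its default lenient mode, where the current lookup key selects an entry from the target map, and a null or unmatched key falls back to the default target (here, the master).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Simplified, hypothetical stand-in for a routing data source; targets are plain strings.
class RoutingSketch {
    private final String defaultTarget;          // plays the role of the master
    private final Map<String, String> targets;   // plays the role of the keyed slaves
    private final Supplier<String> lookupKey;    // plays the role of determineCurrentLookupKey()

    RoutingSketch(String defaultTarget, Map<String, String> targets, Supplier<String> lookupKey) {
        this.defaultTarget = defaultTarget;
        this.targets = new HashMap<>(targets);
        this.lookupKey = lookupKey;
    }

    /** Resolves the target for the current key, falling back to the default target. */
    String determineTarget() {
        String key = lookupKey.get();
        String target = (key == null) ? null : targets.get(key);
        return (target != null) ? target : defaultTarget;
    }
}
```

This is why clearing the thread-bound key is enough to send a request to the master: with no key set, the lookup falls through to the default target.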
- DataSourceSwitcher: the data source switcher
    import java.util.Random;

    public class DataSourceSwitcher {
        // Holds the lookup key of the data source chosen for the current thread.
        private static final ThreadLocal<String> contextHolder = new ThreadLocal<String>();

        public static void setDataSource(String lookupKey) {
            contextHolder.set(lookupKey);
        }

        // No key means the routing data source falls back to its default target, the master.
        public static void setMaster() {
            clearDataSource();
        }

        // Picks one of the configured slaves at random.
        public static void setSlave() {
            Object slaveKey = MysqlDataSourceFactory.getSlavesNodes().keySet().toArray()[
                    new Random().nextInt(MysqlDataSourceFactory.getSlavesNodes().size())];
            setDataSource(slaveKey.toString());
        }

        public static String getDataSource() {
            return contextHolder.get();
        }

        public static void clearDataSource() {
            contextHolder.remove();
        }
    }
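One thing to watch in setSlave: Random.nextInt throws IllegalArgumentException when its bound is 0, which would happen if no slaves are configured. A minimal sketch of a variant that falls back to the master in that case is shown below. SlaveKeyHolder and its slaveKeys parameter are hypothetical; the original obtains the keys from MysqlDataSourceFactory.getSlavesNodes().

```java
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical variant of the switcher, for illustration only.
class SlaveKeyHolder {
    private static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    /** Clears the key so that routing falls back to the default (master) target. */
    static void useMaster() {
        CONTEXT.remove();
    }

    /** Picks a random slave key, or falls back to the master when there are no slaves. */
    static void useRandomSlave(List<String> slaveKeys) {
        if (slaveKeys.isEmpty()) {
            CONTEXT.remove();
            return;
        }
        int index = ThreadLocalRandom.current().nextInt(slaveKeys.size());
        CONTEXT.set(slaveKeys.get(index));
    }

    /** Returns the key bound to the current thread, or null for the master. */
    static String current() {
        return CONTEXT.get();
    }
}
```

ThreadLocalRandom is used here only to avoid creating a new Random on every call; the selection is still uniformly random, as in the original.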
The core of these two parts is that the dynamic data source implements determineCurrentLookupKey, which returns the key held in the ThreadLocal contextHolder. The Javadoc of the abstract method determineCurrentLookupKey in AbstractRoutingDataSource reads as follows:
    /**
     * Determine the current lookup key. This will typically be
     * implemented to check a thread-bound transaction context.
     * <p>Allows for arbitrary keys. The returned key needs
     * to match the stored lookup key type, as resolved by the
     * {@link #resolveSpecifiedLookupKey} method.
     */
    protected abstract Object determineCurrentLookupKey();
- DataSourceAdvice: the aspect that switches between master and slave
    import java.lang.reflect.Method;

    import org.springframework.aop.AfterReturningAdvice;
    import org.springframework.aop.MethodBeforeAdvice;
    import org.springframework.aop.ThrowsAdvice;

    public class DataSourceAdvice implements MethodBeforeAdvice, AfterReturningAdvice, ThrowsAdvice {

        // Before each advised method: write-style method names go to the master, everything else to a slave.
        public void before(Method method, Object[] args, Object target) throws Throwable {
            String name = method.getName();
            if (name.startsWith("add")
                    || name.startsWith("create")
                    || name.startsWith("save")
                    || name.startsWith("edit")
                    || name.startsWith("update")
                    || name.startsWith("delete")
                    || name.startsWith("remove")) {
                DataSourceSwitcher.setMaster();
            } else {
                DataSourceSwitcher.setSlave();
            }
        }

        public void afterReturning(Object arg0, Method method, Object[] args, Object target) throws Throwable {
        }

        // After an exception the thread is switched back to a slave.
        public void afterThrowing(Method method, Object[] args, Object target, Exception ex) throws Throwable {
            DataSourceSwitcher.setSlave();
        }
    }
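The name-based rule in before can be isolated into a small predicate, which makes it easy to test on its own. The sketch below is illustrative: MethodNameRouter, WRITE_PREFIXES, and route are hypothetical names, and the strings "master" and "slave" stand in for the calls to DataSourceSwitcher. The prefix list is the one used by the advice above.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that isolates the naming rule used by the advice.
class MethodNameRouter {
    static final List<String> WRITE_PREFIXES =
            Arrays.asList("add", "create", "save", "edit", "update", "delete", "remove");

    /** Returns true when the method name starts with one of the write prefixes. */
    static boolean isWrite(String methodName) {
        for (String prefix : WRITE_PREFIXES) {
            if (methodName.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    /** Maps a method name to the side of the cluster it would be routed to. */
    static String route(String methodName) {
        return isWrite(methodName) ? "master" : "slave";
    }
}
```

A rule like this depends entirely on naming discipline. A writing method called insertOrder would be routed to a slave, while a read-only method called addressOf would be routed to the master because its name starts with "add".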
Spring configuration
That's all. Done~