博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spring4.0.3+Hiberbate4.3.5实现Mysql主从集群读写分离数据源组件
阅读量:6246 次
发布时间:2019-06-22

本文共 16206 字,大约阅读时间需要 54 分钟。

目的

      1,读写分离,提高系统响应能力,大部分高并发访问的web都是读大与写,我希望,以大量的读请求分布大多台Mysql从库上。

  2,Mysql集群系统架构对业务开发的透明性,也就是做业务开发不需要关系底层存储的分布式架构。

以上是最主要的两点考虑,基于以上考虑,写了一个数据源组件,当然数据源组件还包括,redis集群分片,mongo集群分片的处理,此文是说明Mysql集群的读写分离。

实现

        实现过程只是说明思路,并提供关键代码片段。

Mysql集群配置信息

       

  • 上面部分是c3p连接池的基本配置,当然我做了基本分类:低配,中配,高配。
  • 下面部分是集群内的主和从的配置信息。
    mysql_cluster_defaut:集群的名字,随意定义,但是要以mysql_cluster为前缀,看解析工厂就知道了。
    mysql_cluster_defaut_master:集群的master前缀。
    mysql_cluster_defaut_slave:集群的slave前缀。
    mysql_cluster_defaut_slave_0:集群的第一个slave,之后以此类推。
### c3p pool configuration ###config_small.driverClass=com.mysql.jdbc.Driverconfig_small.minPoolSize=5config_small.maxPoolSize=30config_small.initialPoolSize=10config_small.maxIdleTime=60config_small.acquireIncrement=5config_small.maxStatements=0config_small.idleConnectionTestPeriod=60config_small.acquireRetryAttempts=30config_small.breakAfterAcquireFailure=trueconfig_small.testConnectionOnCheckout=falseconfig_medium.driverClass=com.mysql.jdbc.Driverconfig_medium.minPoolSize=30config_medium.maxPoolSize=100config_medium.initialPoolSize=50config_medium.maxIdleTime=60config_medium.acquireIncrement=5config_medium.maxStatements=0config_medium.idleConnectionTestPeriod=60config_medium.acquireRetryAttempts=30config_medium.breakAfterAcquireFailure=trueconfig_medium.testConnectionOnCheckout=falseconfig_large.driverClass=com.mysql.jdbc.Driverconfig_large.minPoolSize=100config_large.maxPoolSize=1000config_large.initialPoolSize=200config_large.maxIdleTime=60config_large.acquireIncrement=5config_large.maxStatements=0config_large.idleConnectionTestPeriod=60config_large.acquireRetryAttempts=30config_large.breakAfterAcquireFailure=trueconfig_large.testConnectionOnCheckout=false### mysql master-slaves configuration ###mysql_cluster_defaut_master.jdbcUrl=jdbc:mysql://host:port/db?useUnicode=true&characterEncoding=utf8mysql_cluster_defaut_master.user=rootmysql_cluster_defaut_master.password=pwdmysql_cluster_defaut_slave_0.jdbcUrl=jdbc:mysql://host:port/db?useUnicode=true&characterEncoding=utf8mysql_cluster_defaut_slave_0.user=rootmysql_cluster_defaut_slave_0.password=pwdmysql_cluster_defaut_slave_1.jdbcUrl=jdbc:mysql://host:port/db?useUnicode=true&characterEncoding=utf8mysql_cluster_defaut_slave_1.user=rootmysql_cluster_defaut_slave_1.password=pwd

配置解析

       有了上面的配置文件,咱得把他解析出来呀。

  • C3P0PoolConfig:把C3P0Pool的属性抽象个Bean
    public class C3P0PoolConfig {    private String driverClass;    private int minPoolSize;    private int maxPoolSize;    private int initialPoolSize;    private int maxIdleTime;    private int acquireIncrement;    private int maxStatements;    private int idleConnectionTestPeriod;    private int acquireRetryAttempts;    private boolean breakAfterAcquireFailure;    private boolean testConnectionOnCheckout;    //getter和setter略,占地方}
  • Constants:一些基本的常量定义,比如:各种前缀。
    public class Constants {    public static final String KEY_SEPARATOR = "_";    public static final String MASTER = "master";    public static final String JDBC_URL = "jdbcUrl";    public static final String USER = "user";    public static final String PASSWORD = "password";    public static final String LARGE = "large";    public static final String SMALL = "small";    public static final String MEDIUM = "medium";    public static final String KEY_PREFIX_CONFIG = "config_";    public static final String DRIVER_CLASS = "driverClass";    public static final String MIN_POOL_SIZE = "minPoolSize";    public static final String MAX_POOL_SIZE = "maxPoolSize";    public static final String INITIAL_POOL_SIZE = "initialPoolSize";    public static final String MAX_IDLE_TIME = "maxIdleTime";    public static final String ACQUIRE_INCREMENT = "acquireIncrement";    public static final String MAX_STATEMENTS = "maxStatements";    public static final String IDLE_CONNECTION_TEST_PERIOD = "idleConnectionTestPeriod";    public static final String ACQUIRE_RETRY_ATTEMPTS = "acquireRetryAttempts";    public static final String BREAK_AFTER_ACQUIRE_FAILURE = "breakAfterAcquireFailure";    public static final String TEST_CONNECTION_ON_CHECKOUT = "testConnectionOnCheckout";    public static final String KEY_PREFIX_MYSQL_CLUSTER = "mysql_cluster";}
  • MysqlNode:mysql节点对象

          1,是否是master

          2,生成数据源ComboPooledDataSource对象

          

public class MysqlNode {    private C3P0PoolConfig c3P0PoolConfig;    private String nodeKey;    private boolean isMaster;    private String jdbcUrl;    private String user;    private String password;    private ComboPooledDataSource dataSource;    public synchronized ComboPooledDataSource generateDataSource() throws Exception{        if (dataSource!=null)            return dataSource;        dataSource=new ComboPooledDataSource();        dataSource.setDriverClass(c3P0PoolConfig.getDriverClass());        dataSource.setMinPoolSize(c3P0PoolConfig.getMinPoolSize());        dataSource.setMaxPoolSize(c3P0PoolConfig.getMaxPoolSize());        dataSource.setInitialPoolSize(c3P0PoolConfig.getInitialPoolSize());        dataSource.setMaxIdleTime(c3P0PoolConfig.getMaxIdleTime());        dataSource.setAcquireIncrement(c3P0PoolConfig.getAcquireIncrement());        dataSource.setMaxStatements(c3P0PoolConfig.getMaxStatements());        dataSource.setIdleConnectionTestPeriod(c3P0PoolConfig.getIdleConnectionTestPeriod());        dataSource.setAcquireRetryAttempts(c3P0PoolConfig.getAcquireRetryAttempts());        dataSource.setBreakAfterAcquireFailure(c3P0PoolConfig.isBreakAfterAcquireFailure());        dataSource.setTestConnectionOnCheckout(c3P0PoolConfig.isTestConnectionOnCheckout());        dataSource.setJdbcUrl(this.jdbcUrl);        dataSource.setPassword(this.password);        dataSource.setUser(this.user);        return dataSource;    }    public String getNodeKey() {        return nodeKey;    }    public void setNodeKey(String nodeKey) {        this.nodeKey = nodeKey;    }    public C3P0PoolConfig getC3P0PoolConfig() {        return c3P0PoolConfig;    }    public void setC3P0PoolConfig(C3P0PoolConfig c3P0PoolConfig) {        this.c3P0PoolConfig = c3P0PoolConfig;    }    public boolean isMaster() {        return isMaster;    }    public void setMaster(boolean isMaster) {        this.isMaster = isMaster;    }    public String getJdbcUrl() {        return jdbcUrl;    }    public void setJdbcUrl(String jdbcUrl) {        this.jdbcUrl = jdbcUrl;    }    public String getUser() {        return user;    }    public void setUser(String user) {        this.user = user;    }    public String getPassword() {        return password;    }    public void setPassword(String password) {        this.password = password;    }}
  • MysqlDataSourceFactory:工厂类,解析配置。
    private static Map
    configMap = new ConcurrentHashMap
    (); //master数据源 private static MysqlNode masterNode; //slaves数据源 private static Map
    slavesNodes = new ConcurrentHashMap
    (); public static void loadProperties(String propertyFile,String poolConfigShort) throws Exception { final PropertiesManager propertiesManager = new PropertiesManager(propertyFile); Map
    mysqlConfigs = propertiesManager.getPropsByPrefix(Constants.KEY_PREFIX_CONFIG); for (String configKey : mysqlConfigs.keySet()) { Map mysqlConfig = (Map) mysqlConfigs.get(configKey); C3P0PoolConfig poolConfig = new C3P0PoolConfig(); poolConfig.setDriverClass(MapUtils.getString(mysqlConfig, Constants.DRIVER_CLASS)); poolConfig.setMinPoolSize(MapUtils.getIntValue(mysqlConfig, Constants.MIN_POOL_SIZE)); poolConfig.setMaxPoolSize(MapUtils.getIntValue(mysqlConfig, Constants.MAX_POOL_SIZE)); poolConfig.setInitialPoolSize(MapUtils.getIntValue(mysqlConfig, Constants.INITIAL_POOL_SIZE)); poolConfig.setMaxIdleTime(MapUtils.getIntValue(mysqlConfig, Constants.MAX_IDLE_TIME)); poolConfig.setAcquireIncrement(MapUtils.getIntValue(mysqlConfig, Constants.ACQUIRE_INCREMENT)); poolConfig.setMaxStatements(MapUtils.getIntValue(mysqlConfig, Constants.MAX_STATEMENTS)); poolConfig.setIdleConnectionTestPeriod(MapUtils.getIntValue(mysqlConfig, Constants.IDLE_CONNECTION_TEST_PERIOD)); poolConfig.setAcquireRetryAttempts(MapUtils.getIntValue(mysqlConfig, Constants.ACQUIRE_RETRY_ATTEMPTS)); poolConfig.setBreakAfterAcquireFailure(MapUtils.getBooleanValue(mysqlConfig, Constants.BREAK_AFTER_ACQUIRE_FAILURE)); poolConfig.setTestConnectionOnCheckout(MapUtils.getBooleanValue(mysqlConfig, Constants.TEST_CONNECTION_ON_CHECKOUT)); configMap.put(configKey, poolConfig); } C3P0PoolConfig config=getC3P0PoolConfig(poolConfigShort); Set
    mysqlNodes = propertiesManager.getGroups(Constants.KEY_PREFIX_MYSQL_CLUSTER); for (String node : mysqlNodes) { Map
    nodeConfigs = propertiesManager.getPropsByPrefix(node); for(String nodeKey:nodeConfigs.keySet()){ Map nodeConfig = (Map) nodeConfigs.get(nodeKey); MysqlNode mysqlNode=new MysqlNode(); mysqlNode.setJdbcUrl(MapUtils.getString(nodeConfig,Constants.JDBC_URL)); mysqlNode.setUser(MapUtils.getString(nodeConfig,Constants.USER)); mysqlNode.setPassword(MapUtils.getString(nodeConfig,Constants.PASSWORD)); mysqlNode.setNodeKey(nodeKey); mysqlNode.setC3P0PoolConfig(config); if(node.contains(Constants.MASTER)){ mysqlNode.setMaster(true); masterNode=mysqlNode; continue; } slavesNodes.put(nodeKey,mysqlNode); } } } public static C3P0PoolConfig getC3P0PoolConfig(String poolConfig){ for(String key:configMap.keySet()){ if(key.contains(poolConfig)){ return configMap.get(key); } } return null; } /** * 获取master数据源 * @return */ public static ComboPooledDataSource getMasterMasterDataSource() throws Exception{ return masterNode.generateDataSource(); } /** * 获取slaves数据源 * @return */ public static Set
    getSlavesDataSourceSet() throws Exception{ Set
    slavesDataSource=new HashSet
    (); for (MysqlNode node:slavesNodes.values()){ slavesDataSource.add(node.generateDataSource()); } return slavesDataSource; } /** * 获取slaves数据源 * @return */ public static Map
    getSlavesDataSourceMap() throws Exception{ Map
    slavesDataSource=new HashMap
    (); for (MysqlNode node:slavesNodes.values()){ slavesDataSource.put(node.getNodeKey(), node.generateDataSource()); } return slavesDataSource; } /** * 获取指定节点key的数据源 * @param slaveKey * @return */ public static ComboPooledDataSource getSlaveDataSource(String slaveKey) throws Exception{ return slavesNodes.get(slaveKey).generateDataSource(); } public static Map
    getConfigMap() { return configMap; } public static void setConfigMap(Map
    configMap) { MysqlDataSourceFactory.configMap = configMap; } public static MysqlNode getMasterNode() { return masterNode; } public static void setMasterNode(MysqlNode masterNode) { MysqlDataSourceFactory.masterNode = masterNode; } public static Map
    getSlavesNodes() { return slavesNodes; } public static void setSlavesNodes(Map
    slavesNodes) { MysqlDataSourceFactory.slavesNodes = slavesNodes; } public static void main(String[] args) throws Exception { MysqlDataSourceFactory.loadProperties("jdbc.properties","small"); //System.out.print(new Random().nextInt(4)); }

    比较简单就是解析之前的jdbc.properties文件。

  • 动态数据源与切换

         之前都是准备工作,现在开始使用之前的配置信息了。

  • DynamicDataSource:动态数据源
    public class DynamicDataSource  extends AbstractRoutingDataSource {    //低配,中配,高配    private String poolConfig;    public DynamicDataSource(String poolConfig) throws Exception{        this.poolConfig=poolConfig;        MysqlDataSourceFactory.loadProperties("jdbc.properties",poolConfig);        if(Constants.LARGE.equalsIgnoreCase(poolConfig)){            this.setDefaultTargetDataSource(MysqlDataSourceFactory.getMasterMasterDataSource());            this.setTargetDataSources(MysqlDataSourceFactory.getSlavesDataSourceMap());        }else if(Constants.MEDIUM.equalsIgnoreCase(poolConfig)){            this.setDefaultTargetDataSource(MysqlDataSourceFactory.getMasterMasterDataSource());            this.setTargetDataSources(MysqlDataSourceFactory.getSlavesDataSourceMap());        }else if(Constants.SMALL.equalsIgnoreCase(poolConfig)){            this.setDefaultTargetDataSource(MysqlDataSourceFactory.getMasterMasterDataSource());            this.setTargetDataSources(MysqlDataSourceFactory.getSlavesDataSourceMap());        }else{            throw new DatasourceException("datasource: illegal pool config:"+poolConfig+".should be one of \"small\",\"large\" or \"medium\".");        }    }    public void setPoolConfig(String poolConfig) {        this.poolConfig = poolConfig;    }    public void init() throws Exception{        System.out.println("init DynamicDataSource...");    }    @Override    protected Object determineCurrentLookupKey() {        return DataSourceSwitcher.getDataSource();    }}
  • 数据源切换器
    public class DataSourceSwitcher {    @SuppressWarnings("rawtypes")    private static final ThreadLocal contextHolder = new ThreadLocal();    @SuppressWarnings("unchecked")    public static void setDataSource(String lookupKey) {        contextHolder.set(lookupKey);    }    public static void setMaster() {        clearDataSource();    }    public static void setSlave() {        Object slaveKey=MysqlDataSourceFactory.getSlavesNodes().keySet().toArray()[new Random().nextInt(MysqlDataSourceFactory.getSlavesNodes().size())];        setDataSource(slaveKey.toString());    }    public static String getDataSource() {        return (String) contextHolder.get();    }    public static void clearDataSource() {        contextHolder.remove();    }}

    以上两部分核心就是

    动态数据源实现了determineCurrentLookupKey方法和ThreadLocal contextHolder,可以看下AbstractRoutingDataSource类中的抽象方法determineCurrentLookupKey的说明,如下:
  • /**     * Determine the current lookup key. This will typically be     * implemented to check a thread-bound transaction context.     * 

    Allows for arbitrary keys. The returned key needs * to match the stored lookup key type, as resolved by the * {

    @link #resolveSpecifiedLookupKey} method. */ protected abstract Object determineCurrentLookupKey();

  • 切面DataSourceAdvice:master和slave的切换
  • public class DataSourceAdvice implements MethodBeforeAdvice, AfterReturningAdvice, ThrowsAdvice {    public void before(Method method, Object[] args, Object target) throws Throwable {        if(method.getName().startsWith("add")                || method.getName().startsWith("create")                || method.getName().startsWith("save")                || method.getName().startsWith("edit")                || method.getName().startsWith("update")                || method.getName().startsWith("delete")                || method.getName().startsWith("remove")){            DataSourceSwitcher.setMaster();        }        else  {            DataSourceSwitcher.setSlave();        }    }    public void afterReturning(Object arg0, Method method, Object[] args, Object target) throws Throwable {    }    public void afterThrowing(Method method, Object[] args, Object target, Exception ex) throws Throwable {        DataSourceSwitcher.setSlave();    }}

    Spring的配置

  • small
    org.hibernate.dialect.MySQLDialect
    true
    true
    create

    全文完,done~

转载于:https://www.cnblogs.com/requelqi/p/3740824.html

你可能感兴趣的文章
Spring整合shiro,使用jdbcTmplate操作SessionDAO
查看>>
Hibernate所鼓励的7大措施
查看>>
Python对进程Multiprocessing基础
查看>>
Shell脚本语法
查看>>
scrapy与xpath的坑
查看>>
windows 下安装tidylib
查看>>
MapReduce的那些事
查看>>
CentOS6.5环境下OpenSSL实战:自己搭建CA中心,申请,签发,吊销,导入证书,SSL 握手详解...
查看>>
关于:url伪静态
查看>>
Android开发之制作圆形头像自定义View,直接引用工具类,加快开发速度。带有源代码学习...
查看>>
申请微信公众号
查看>>
python中 __name__的使用
查看>>
(译)iPhone: 用公开API创建带小数点的数字键盘 (OS 3.0, OS 4.0)
查看>>
WSUS客户端升级使用命令行快速自动更新系统补丁包
查看>>
如何不让上网影响工作?看看作家怎么做
查看>>
MySQL 获得当前日期时间(以及时间的转换)
查看>>
solrcloud分布式集群部署及索引操作实例
查看>>
PHP URL 重定向 的三种方法(转)
查看>>
ubuntu14.04安装docker
查看>>
Android ADT 离线下载技巧(告别在线安装的麻烦)
查看>>