Druid核心源码分析DruidDataSource

其他教程   发布日期:2024年10月31日   浏览次数:270

这篇“Druid核心源码分析DruidDataSource”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“Druid核心源码分析DruidDataSource”文章吧。

配置读取

druid连接池支持的所有连接参数可在类

  1. com.alibaba.druid.pool.DruidDataSourceFactory
中查看。

配置读取代码:

  1. public void configFromPropety(Properties properties) {
  2. //这方法太长,自己看源码去吧,就是读读属性。。。。
  3. }

整体代码比较简单,就是把配置内容,读取到dataSource。

连接池初始化

首先是简单的判断,加锁:

  1. if (inited) {
  2. //已经被初始化好了,直接return
  3. return;
  4. }
  5. // bug fixed for dead lock, for issue #2980
  6. DruidDriver.getInstance();
  7. /**控制创建移除连接的锁,并且通过条件去控制一个连接的生成消费**/
  8. // public DruidAbstractDataSource(boolean lockFair){
  9. // lock = new ReentrantLock(lockFair);
  10. //
  11. // notEmpty = lock.newCondition();
  12. // empty = lock.newCondition();
  13. // }
  14. final ReentrantLock lock = this.lock;
  15. try {
  16. lock.lockInterruptibly();
  17. } catch (InterruptedException e) {
  18. throw new SQLException("interrupt", e);
  19. }

之后会更新一些JMX的监控指标:

  1. //一些jmx监控指标
  2. this.connectionIdSeedUpdater.addAndGet(this, delta);
  3. this.statementIdSeedUpdater.addAndGet(this, delta);
  4. this.resultSetIdSeedUpdater.addAndGet(this, delta);
  5. this.transactionIdSeedUpdater.addAndGet(this, delta);

druid的监控指标都是通过jmx实现的。

解析连接串:

  1. if (this.jdbcUrl != null) {
  2. //解析连接串
  3. this.jdbcUrl = this.jdbcUrl.trim();
  4. initFromWrapDriverUrl();
  5. }

  1. initFromWrapDriverUrl
方法,除了从jdbc url中解析出连接和驱动信息,后面还把filters的名字,解析成了对应的filter类。
  1. private void initFromWrapDriverUrl() throws SQLException {
  2. if (!jdbcUrl.startsWith(DruidDriver.DEFAULT_PREFIX)) {
  3. return;
  4. }
  5. DataSourceProxyConfig config = DruidDriver.parseConfig(jdbcUrl, null);
  6. this.driverClass = config.getRawDriverClassName();
  7. LOG.error("error url : '" + jdbcUrl + "', it should be : '" + config.getRawUrl() + "'");
  8. this.jdbcUrl = config.getRawUrl();
  9. if (this.name == null) {
  10. this.name = config.getName();
  11. }
  12. for (Filter filter : config.getFilters()) {
  13. addFilter(filter);
  14. }
  15. }

之后在init方法里面,会进行filters的初始化:

  1. //初始化filter 属性
  2. for (Filter filter : filters) {
  3. filter.init(this);
  4. }

之后解析数据库类型:

  1. if (this.dbTypeName == null || this.dbTypeName.length() == 0) {
  2. this.dbTypeName = JdbcUtils.getDbType(jdbcUrl, null);
  3. }

注意枚举值:

  1. com.alibaba.druid.DbType
,这个里面包含了目前durid连接池支持的所有数据源 类型,另外,druid还额外提供了一些驱动类,例如:
  1. elastic_search (1 << 25), // com.alibaba.xdriver.elastic.jdbc.ElasticDriver

clickhouse还提供了负载均衡的驱动类:

  1. com.alibaba.druid.support.clickhouse.BalancedClickhouseDriver

在回到init方法,之后是一堆参数解析,不再说,跳过了。 之后是通过SPI加载自定义的filter:

  1. private void initFromSPIServiceLoader() {
  2. if (loadSpifilterSkip) {
  3. return;
  4. }
  5. if (autoFilters == null) {
  6. List<Filter> filters = new ArrayList<Filter>();
  7. ServiceLoader<Filter> autoFilterLoader = ServiceLoader.load(Filter.class);
  8. for (Filter filter : autoFilterLoader) {
  9. AutoLoad autoLoad = filter.getClass().getAnnotation(AutoLoad.class);
  10. if (autoLoad != null && autoLoad.value()) {
  11. filters.add(filter);
  12. }
  13. }
  14. autoFilters = filters;
  15. }
  16. for (Filter filter : autoFilters) {
  17. if (LOG.isInfoEnabled()) {
  18. LOG.info("load filter from spi :" + filter.getClass().getName());
  19. }
  20. addFilter(filter);
  21. }
  22. }

注意自定义的filter,要使用

  1. com.alibaba.druid.filter.AutoLoad

解析驱动:

  1. protected void resolveDriver() throws SQLException {
  2. if (this.driver == null) {
  3. if (this.driverClass == null || this.driverClass.isEmpty()) {
  4. this.driverClass = JdbcUtils.getDriverClassName(this.jdbcUrl);
  5. }
  6. if (MockDriver.class.getName().equals(driverClass)) {
  7. driver = MockDriver.instance;
  8. } else if ("com.alibaba.druid.support.clickhouse.BalancedClickhouseDriver".equals(driverClass)) {
  9. Properties info = new Properties();
  10. info.put("user", username);
  11. info.put("password", password);
  12. info.putAll(connectProperties);
  13. driver = new BalancedClickhouseDriver(jdbcUrl, info);
  14. } else {
  15. if (jdbcUrl == null && (driverClass == null || driverClass.length() == 0)) {
  16. throw new SQLException("url not set");
  17. }
  18. driver = JdbcUtils.createDriver(driverClassLoader, driverClass);
  19. }
  20. } else {
  21. if (this.driverClass == null) {
  22. this.driverClass = driver.getClass().getName();
  23. }
  24. }
  25. }

其中durid自己的mock驱动和clickhouse的负载均衡的驱动,特殊判断了下,其他走的都是class forname.

之后是exception sorter和checker的一些东西,跟主线剧情关系不大,skip.

之后是一些初始化

  1. JdbcDataSourceStat
,没啥东西。

之后是核心:

  1. connections = new DruidConnectionHolder[maxActive]; //连接数组
  2. evictConnections = new DruidConnectionHolder[maxActive]; //销毁的连接数组
  3. keepAliveConnections = new DruidConnectionHolder[maxActive]; //保持活跃可用的数组

dataSource的连接,都被包装在类

  1. DruidConnectionHolder
中,之后是一个同步去初始化连接还是异步去初始化的连接,总之,是去初始化 连接的过程:
  1. if (createScheduler != null && asyncInit) {
  2. for (int i = 0; i < initialSize; ++i) {
  3. submitCreateTask(true);
  4. }
  5. } else if (!asyncInit) {
  6. // init connections
  7. while (poolingCount < initialSize) {
  8. try {
  9. PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();
  10. DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);
  11. connections[poolingCount++] = holder;
  12. } catch (SQLException ex) {
  13. LOG.error("init datasource error, url: " + this.getUrl(), ex);
  14. if (initExceptionThrow) {
  15. connectError = ex;
  16. break;
  17. } else {
  18. Thread.sleep(3000);
  19. }
  20. }
  21. }
  22. if (poolingCount > 0) {
  23. poolingPeak = poolingCount;
  24. poolingPeakTime = System.currentTimeMillis();
  25. }
  26. }

初始化的连接个数为连接串里面配置的

  1. initialSize
.

核心初始化方法

  1. com.alibaba.druid.pool.DruidAbstractDataSource#createPhysicalConnection()
,在这方法里面,会拿用户名密码,之后执行真正的获取connection:
  1. public Connection createPhysicalConnection(String url, Properties info) throws SQLException {
  2. Connection conn;
  3. if (getProxyFilters().size() == 0) {
  4. conn = getDriver().connect(url, info);
  5. } else {
  6. conn = new FilterChainImpl(this).connection_connect(info);
  7. }
  8. createCountUpdater.incrementAndGet(this);
  9. return conn;
  10. }

注意,如果配置了filters,则所有操作,都会在操作前执行filter处理链。

  1. public ConnectionProxy connection_connect(Properties info) throws SQLException {
  2. if (this.pos &lt; filterSize) {
  3. return nextFilter()
  4. .connection_connect(this, info);
  5. }
  6. Driver driver = dataSource.getRawDriver();
  7. String url = dataSource.getRawJdbcUrl();
  8. Connection nativeConnection = driver.connect(url, info);
  9. if (nativeConnection == null) {
  10. return null;
  11. }
  12. return new ConnectionProxyImpl(dataSource, nativeConnection, info, dataSource.createConnectionId());
  13. }

再回到主流程init方法,

  1. connections
数组初始化完成之后, 开启额外线程:
  1. createAndLogThread(); //打印连接信息
  2. createAndStartCreatorThread(); //创建连接线程
  3. createAndStartDestroyThread(); //销毁连接线程

先看注释,具体里面的内容后面单独拉出来讲。

之后:

  1. initedLatch.await(); //初始化 latch -1
  2. init = true; //标记已经初始化完成
  3. initedTime = new Date(); //时间
  4. registerMbean(); //为datasource 注册jmx监控指标

最后的最后,如果配置了keepAlive:

  1. if (keepAlive) {
  2. // async fill to minIdle
  3. if (createScheduler != null) {
  4. for (int i = 0; i &lt; minIdle; ++i) {
  5. submitCreateTask(true);
  6. }
  7. } else {
  8. this.emptySignal();
  9. }
  10. }

这时候,会根据配置的活跃连接数

  1. minIdle
,去给datasource的连接,做个保持活跃连接个数,具体后面再说。

连接池使用的核心逻辑

首先,使用数组作为连接的容器,对于真实连接的加入和移除,使用lock就行同步,另外,在加入和移除连接时候,对比生产消费模型,通过lock上的条件,来通知是否可以获取或者加入连接。

  1. public DruidAbstractDataSource(boolean lockFair){
  2. lock = new ReentrantLock(lockFair);
  3. notEmpty = lock.newCondition(); //非空,有连接
  4. empty = lock.newCondition(); //空的
  5. }

另外,默认的fairlock为false

  1. public DruidDataSource(){
  2. this(false);
  3. }
  4. public DruidDataSource(boolean fairLock){
  5. super(fairLock);
  6. configFromPropety(System.getProperties());
  7. }

创建连接

在线程

  1. com.alibaba.druid.pool.DruidDataSource.CreateConnectionThread
中:
  1. if (emptyWait) {
  2. // 必须存在线程等待,才创建连接
  3. if (poolingCount >= notEmptyWaitThreadCount //
  4. && (!(keepAlive && activeCount + poolingCount < minIdle))
  5. && !isFailContinuous()
  6. ) {
  7. empty.await();
  8. }
  9. // 防止创建超过maxActive数量的连接
  10. if (activeCount + poolingCount >= maxActive) {
  11. empty.await();
  12. continue;
  13. }
  14. }

必须存在线程等待获取连接时候,才能创建连接,并且要保持总的连接数,不能超过配置的最大连接。

创建完连接之后,执行

  1. notEmpty.signalAll();
通知消费者。

获取连接

外层代码:

  1. @Override
  2. public DruidPooledConnection getConnection() throws SQLException {
  3. return getConnection(maxWait);
  4. }
  5. public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
  6. init();
  7. if (filters.size() > 0) {
  8. FilterChainImpl filterChain = new FilterChainImpl(this);
  9. return filterChain.dataSource_connect(this, maxWaitMillis);
  10. } else {
  11. return getConnectionDirect(maxWaitMillis);
  12. }
  13. }

忽略掉filter chain,其实最后执行的还是

  1. com.alibaba.druid.pool.DruidDataSource#getConnectionDirect

方法内部:

  1. poolableConnection = getConnectionInternal(maxWaitMillis);
  • 1 , 连接不足,需要直接去创建新的,跟我们初始化一样

  • 2,从connections里面拿

  1. if (maxWait &gt; 0) {
  2. holder = pollLast(nanos);
  3. } else {
  4. holder = takeLast();
  5. }

其中,maxWait默认为-1,配置在init里面:

  1. String property = properties.getProperty("druid.maxWait");
  2. if (property != null && property.length() > 0) {
  3. try {
  4. int value = Integer.parseInt(property);
  5. this.setMaxWait(value);
  6. } catch (NumberFormatException e) {
  7. LOG.error("illegal property 'druid.maxWait'", e);
  8. }
  9. }

这个用于配置拿连接时候,是否在这个时间上进行等待,默认是否,即一直等到拿到连接为止。

直接看下阻塞拿的过程:

  1. DruidConnectionHolder takeLast() throws InterruptedException, SQLException {
  2. try {
  3. //没连接了
  4. while (poolingCount == 0) {
  5. //暗示下创建线程没连接了
  6. emptySignal(); // send signal to CreateThread create connection
  7. if (failFast &amp;&amp; isFailContinuous()) {
  8. throw new DataSourceNotAvailableException(createError);
  9. }
  10. notEmptyWaitThreadCount++;
  11. if (notEmptyWaitThreadCount &gt; notEmptyWaitThreadPeak) {
  12. notEmptyWaitThreadPeak = notEmptyWaitThreadCount;
  13. }
  14. try {
  15. //傻等着创建或者回收,能给整出来点儿连接
  16. notEmpty.await(); // signal by recycle or creator
  17. } finally {
  18. notEmptyWaitThreadCount--;
  19. }
  20. notEmptyWaitCount++;
  21. if (!enable) {
  22. connectErrorCountUpdater.incrementAndGet(this);
  23. if (disableException != null) {
  24. throw disableException;
  25. }
  26. throw new DataSourceDisableException();
  27. }
  28. }
  29. } catch (InterruptedException ie) {
  30. notEmpty.signal(); // propagate to non-interrupted thread
  31. notEmptySignalCount++;
  32. throw ie;
  33. }
  34. //拿数组的最后一个连接
  35. decrementPoolingCount();
  36. DruidConnectionHolder last = connections[poolingCount];
  37. connections[poolingCount] = null;
  38. return last;
  39. }

连接回收

  1. protected void createAndStartDestroyThread() {
  2. destroyTask = new DestroyTask();
  3. //自定义配置销毁 ,适用于连接数非常多的 情况
  4. if (destroyScheduler != null) {
  5. long period = timeBetweenEvictionRunsMillis;
  6. if (period &lt;= 0) {
  7. period = 1000;
  8. }
  9. destroySchedulerFuture = destroyScheduler.scheduleAtFixedRate(destroyTask, period, period,
  10. TimeUnit.MILLISECONDS);
  11. initedLatch.countDown();
  12. return;
  13. }
  14. String threadName = "Druid-ConnectionPool-Destroy-" + System.identityHashCode(this);
  15. //单线程销毁
  16. destroyConnectionThread = new DestroyConnectionThread(threadName);
  17. destroyConnectionThread.start();
  18. }

实际的销毁:

  1. public class DestroyTask implements Runnable {
  2. public DestroyTask() {
  3. }
  4. @Override
  5. public void run() {
  6. shrink(true, keepAlive);
  7. if (isRemoveAbandoned()) {
  8. removeAbandoned();
  9. }
  10. }
  11. }

最终 执行的还是

  1. shrink
方法。
  1. public void shrink(boolean checkTime, boolean keepAlive) {
  2. try {
  3. lock.lockInterruptibly();
  4. } catch (InterruptedException e) {
  5. return;
  6. }
  7. boolean needFill = false;
  8. int evictCount = 0;
  9. int keepAliveCount = 0;
  10. int fatalErrorIncrement = fatalErrorCount - fatalErrorCountLastShrink;
  11. fatalErrorCountLastShrink = fatalErrorCount;
  12. try {
  13. if (!inited) {
  14. return;
  15. }
  16. final int checkCount = poolingCount - minIdle; //需要检测连接的数量
  17. final long currentTimeMillis = System.currentTimeMillis();
  18. for (int i = 0; i < poolingCount; ++i) { //检测目前connections数组中的连接
  19. DruidConnectionHolder connection&amp

以上就是Druid核心源码分析DruidDataSource的详细内容,更多关于Druid核心源码分析DruidDataSource的资料请关注九品源码其它相关文章!