Java动态线程池插件dynamic-tp集成zookeeper怎么配置

其他教程   发布日期:2023年07月23日   浏览次数:507

这篇文章主要介绍了Java动态线程池插件dynamic-tp集成zookeeper怎么配置的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇Java动态线程池插件dynamic-tp集成zookeeper怎么配置文章都会有所收获,下面我们一起来看看吧。

前言

  1. dynamic-tp
是一个轻量级的动态线程池插件,它是一个基于配置中心的动态线程池,线程池的参数可以通过配置中心配置进行动态的修改,在配置中心的支持上最开始的时候支持
  1. Nacos
  1. Apollo

配置刷新

  1. dynamic-tp
提供了一个刷新配置的接口
  1. Refresher
,抽象类
  1. AbstractRefresher
实现刷新配置接口的刷新配置方法
  1. refresh
,该方法能根据配置类型内容和配置解析配置并刷新动态线程池的相关配置,由
  1. DtpRegistry
负责刷新线程池配置,事件发布订阅模式操作Web容器参数,代码如下:
  1. public interface Refresher {
  2. /**
  3. * Refresh with specify content.
  4. * @param content content
  5. * @param fileType file type
  6. */
  7. void refresh(String content, ConfigFileTypeEnum fileType);
  8. }
  1. @Slf4j
  2. public abstract class AbstractRefresher implements Refresher {
  3. @Resource
  4. private DtpProperties dtpProperties;
  5. @Resource
  6. private ApplicationEventMulticaster applicationEventMulticaster;
  7. @Override
  8. public void refresh(String content, ConfigFileTypeEnum fileTypeEnum) {
  9. if (StringUtils.isBlank(content) || Objects.isNull(fileTypeEnum)) {
  10. return;
  11. }
  12. try {
  13. // 根据配置内容和配置类型将配置内容转成Map
  14. val prop = ConfigHandler.getInstance().parseConfig(content, fileTypeEnum);
  15. doRefresh(prop);
  16. } catch (IOException e) {
  17. log.error("DynamicTp refresh error, content: {}, fileType: {}",
  18. content, fileTypeEnum, e);
  19. }
  20. }
  21. private void doRefresh(Map<Object, Object> properties) {
  22. // 将Map中的配置转换成DtpProperties
  23. ConfigurationPropertySource sources = new MapConfigurationPropertySource(properties);
  24. Binder binder = new Binder(sources);
  25. ResolvableType type = ResolvableType.forClass(DtpProperties.class);
  26. Bindable<?> target = Bindable.of(type).withExistingValue(dtpProperties);
  27. binder.bind(MAIN_PROPERTIES_PREFIX, target);
  28. // 刷新动态线程池配置
  29. DtpRegistry.refresh(dtpProperties);
  30. // 发布刷新实现,该事件用于控制Web容器线程池参数控制
  31. publishEvent();
  32. }
  33. private void publishEvent() {
  34. RefreshEvent event = new RefreshEvent(this, dtpProperties);
  35. applicationEventMulticaster.multicastEvent(event);
  36. }
  37. }

Zookeeper配置中心接入扩展实现

基于

  1. AbstractRefresher
就可以实现
  1. Zookeeper
配置中心的扩展了,
  1. Zookeeper
的扩展实现继承
  1. AbstractRefresher
  1. Zookeeper
的扩展实现只需要监听配置中心的配置变更即可拿到配置内容,然后通过
  1. refresh
刷新配置即可。代码如下:

  1. ZookeeperRefresher
继承
  1. AbstractRefresher
,实现
  1. InitializingBean
  1. afterPropertiesSet
方法逻辑从配置
  1. DtpProperties
获取
  1. Zookeeper
的配置信息,
  1. CuratorFrameworkFactory
创建客户端,设置监听器,这里有两种监听器,一个是连接监听
  1. ConnectionStateListener
,一个是节点变动监听
  1. CuratorListener
,出发监听后
  1. loadNode
负责从
  1. Zookeeper
获取配置文件配置并组装配置内容,然后通过
  1. refresh
刷新配置,注意,
  1. Zookeeper
配置目前配置类型仅支持
  1. properties
  1. @Slf4j
  2. public class ZookeeperRefresher extends AbstractRefresher implements InitializingBean {
  3. @Resource
  4. private DtpProperties dtpProperties;
  5. private CuratorFramework curatorFramework;
  6. @Override
  7. public void afterPropertiesSet() throws Exception {
  8. DtpProperties.Zookeeper zookeeper = dtpProperties.getZookeeper();
  9. curatorFramework = CuratorFrameworkFactory.newClient(zookeeper.getZkConnectStr(),
  10. new ExponentialBackoffRetry(1000, 3));
  11. String nodePath = ZKPaths.makePath(ZKPaths.makePath(zookeeper.getRootNode(),
  12. zookeeper.getConfigVersion()), zookeeper.getNode());
  13. final ConnectionStateListener connectionStateListener = (client, newState) -> {
  14. if (newState == ConnectionState.CONNECTED || newState == ConnectionState.RECONNECTED) {
  15. loadNode(nodePath);
  16. }};
  17. final CuratorListener curatorListener = (client, curatorEvent) -> {
  18. final WatchedEvent watchedEvent = curatorEvent.getWatchedEvent();
  19. if (null != watchedEvent) {
  20. switch (watchedEvent.getType()) {
  21. case NodeChildrenChanged:
  22. case NodeDataChanged:
  23. loadNode(nodePath);
  24. break;
  25. default:
  26. break;
  27. }
  28. }};
  29. curatorFramework.getConnectionStateListenable().addListener(connectionStateListener);
  30. curatorFramework.getCuratorListenable().addListener(curatorListener);
  31. curatorFramework.start();
  32. log.info("DynamicTp refresher, add listener success, nodePath: {}", nodePath);
  33. }
  34. /**
  35. * load config and refresh
  36. * @param nodePath config path
  37. */
  38. public void loadNode(String nodePath) {
  39. try {
  40. final GetChildrenBuilder childrenBuilder = curatorFramework.getChildren();
  41. final List<String> children = childrenBuilder.watched().forPath(nodePath);
  42. StringBuilder content = new StringBuilder();
  43. children.forEach(c -> {
  44. String n = ZKPaths.makePath(nodePath, c);
  45. final String nodeName = ZKPaths.getNodeFromPath(n);
  46. final GetDataBuilder data = curatorFramework.getData();
  47. String value = "";
  48. try {
  49. value = new String(data.watched().forPath(n), StandardCharsets.UTF_8);
  50. } catch (Exception e) {
  51. log.error("zk config value watched exception.", e);
  52. }
  53. content.append(nodeName).append("=").append(value).append("
  54. ");
  55. });
  56. refresh(content.toString(), ConfigFileTypeEnum.PROPERTIES);
  57. } catch (Exception e) {
  58. log.error("load zk node error, nodePath is {}", nodePath, e);
  59. }
  60. }
  61. }

以上就是Java动态线程池插件dynamic-tp集成zookeeper怎么配置的详细内容,更多关于Java动态线程池插件dynamic-tp集成zookeeper怎么配置的资料请关注九品源码其它相关文章!