Java根据某个key加锁怎么实现

其他教程   发布日期:2023年08月31日   浏览次数:763

本篇内容主要讲解“Java根据某个key加锁怎么实现”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Java根据某个key加锁怎么实现”吧!

    一、背景

    日常开发中,有时候需要根据某个 key 加锁,确保多线程情况下,对该 key 的加锁和解锁之间的代码串行执行。
    大家可以借助每个 key 对应一个 ReentrantLock ,让同一个 key 的线程使用该 lock 加锁;每个 key 对应一个 Semaphore ,让同一个 key 的线程使用 Semaphore 控制同时执行的线程数。

    二、参考代码

    接口定义

    1. public interface LockByKey<T> {
    2. /**
    3. * 加锁
    4. */
    5. void lock(T key);
    6. /**
    7. * 解锁
    8. */
    9. void unlock(T key);
    10. }

    2.1 同一个 key 只能一个线程执行

    2.1.1 代码实现

    每个 key 对应一个 ReentrantLock ,让同一个 key 的线程使用该 lock 加锁。

    1. import java.util.Map;
    2. import java.util.concurrent.ConcurrentHashMap;
    3. import java.util.concurrent.locks.ReentrantLock;
    4. public class DefaultLockByKeyImpl<T> implements LockByKey<T> {
    5. private final Map<T, ReentrantLock> lockMap = new ConcurrentHashMap<>();
    6. /**
    7. * 加锁
    8. */
    9. @Override
    10. public void lock(T key) {
    11. // 如果key为空,直接返回
    12. if (key == null) {
    13. throw new IllegalArgumentException("key 不能为空");
    14. }
    15. // 获取或创建一个ReentrantLock对象
    16. ReentrantLock lock = lockMap.computeIfAbsent(key, k -> new ReentrantLock());
    17. // 获取锁
    18. lock.lock();
    19. }
    20. /**
    21. * 解锁
    22. */
    23. @Override
    24. public void unlock(T key) {
    25. // 如果key为空,直接返回
    26. if (key == null) {
    27. throw new IllegalArgumentException("key 不能为空");
    28. }
    29. // 从Map中获取锁对象
    30. ReentrantLock lock = lockMap.get(key);
    31. // 获取不到报错
    32. if (lock == null) {
    33. throw new IllegalArgumentException("key " + key + "尚未加锁");
    34. }
    35. // 其他线程非法持有不允许释放
    36. if (!lock.isHeldByCurrentThread()) {
    37. throw new IllegalStateException("当前线程尚未持有,key:" + key + "的锁,不允许释放");
    38. }
    39. lock.unlock();
    40. }
    41. }

    注意事项:
    (1)参数合法性校验
    (2)解锁时需要判断该锁是否为当前线程持有

    2.1.2 编写单测

    1. import com.google.common.collect.Lists;
    2. import org.junit.Test;
    3. import java.util.HashSet;
    4. import java.util.List;
    5. import java.util.Set;
    6. import java.util.concurrent.CountDownLatch;
    7. import java.util.concurrent.ExecutorService;
    8. import java.util.concurrent.Executors;
    9. import java.util.concurrent.TimeUnit;
    10. public class DefaultLockByKeyImplTest {
    11. private final LockByKey<String> lockByKey = new DefaultLockByKeyImpl<>();
    12. private final CountDownLatch countDownLatch = new CountDownLatch(7);
    13. private final ExecutorService executorService = Executors.newFixedThreadPool(10);
    14. @Test
    15. public void test() throws InterruptedException {
    16. List<String> keys = Lists.newArrayList("a", "a", "a", "b", "c", "b", "d");
    17. Set<String> executingKeySet = new HashSet<>();
    18. for (int i = 0; i < keys.size(); i++) {
    19. String key = keys.get(i);
    20. int finalI = i;
    21. executorService.submit(() -> {
    22. lockByKey.lock(key);
    23. if (executingKeySet.contains(key)) {
    24. throw new RuntimeException("存在正在执行的 key:" + key);
    25. }
    26. executingKeySet.add(key);
    27. try {
    28. System.out.println("index:" + finalI + "对 [" + key + "] 加锁 ->" + Thread.currentThread().getName());
    29. TimeUnit.SECONDS.sleep(1);
    30. } catch (InterruptedException e) {
    31. throw new RuntimeException(e);
    32. } finally {
    33. System.out.println("index:" + finalI + "释放 [" + key + "] ->" + Thread.currentThread().getName());
    34. lockByKey.unlock(key);
    35. executingKeySet.remove(key);
    36. countDownLatch.countDown();
    37. }
    38. });
    39. }
    40. countDownLatch.await();
    41. }
    42. }

    如果同一个 key 没释放能够再次进入,会抛出异常。
    也可以通过日志来观察执行情况:

    1. index:0 [a] 加锁 ->pool-1-thread-1
    2. index:6 [d] 加锁 ->pool-1-thread-7
    3. index:4 [c] 加锁 ->pool-1-thread-5
    4. index:3 [b] 加锁 ->pool-1-thread-4
    5. index:6释放 [d] ->pool-1-thread-7
    6. index:4释放 [c] ->pool-1-thread-5
    7. index:0释放 [a] ->pool-1-thread-1
    8. index:3释放 [b] ->pool-1-thread-4
    9. index:1 [a] 加锁 ->pool-1-thread-2
    10. index:5 [b] 加锁 ->pool-1-thread-6
    11. index:1释放 [a] ->pool-1-thread-2
    12. index:5释放 [b] ->pool-1-thread-6
    13. index:2 [a] 加锁 ->pool-1-thread-3
    14. index:2释放 [a] ->pool-1-thread-3

    2.2、同一个 key 可以有 n个线程执行

    2.2.1 代码实现

    每个 key 对应一个 Semaphore ,让同一个 key 的线程使用 Semaphore 控制同时执行的线程数。

    1. import lombok.SneakyThrows;
    2. import java.util.Map;
    3. import java.util.concurrent.ConcurrentHashMap;
    4. import java.util.concurrent.Semaphore;
    5. public class SimultaneousEntriesLockByKey<T> implements LockByKey<T> {
    6. private final Map<T, Semaphore> semaphores = new ConcurrentHashMap<>();
    7. /**
    8. * 最大线程
    9. */
    10. private int allowed_threads;
    11. public SimultaneousEntriesLockByKey(int allowed_threads) {
    12. this.allowed_threads = allowed_threads;
    13. }
    14. /**
    15. * 加锁
    16. */
    17. @Override
    18. public void lock(T key) {
    19. Semaphore semaphore = semaphores.compute(key, (k, v) -> v == null ? new Semaphore(allowed_threads) : v);
    20. semaphore.acquireUninterruptibly();
    21. }
    22. /**
    23. * 解锁
    24. */
    25. @Override
    26. public void unlock(T key) {
    27. // 如果key为空,直接返回
    28. if (key == null) {
    29. throw new IllegalArgumentException("key 不能为空");
    30. }
    31. // 从Map中获取锁对象
    32. Semaphore semaphore = semaphores.get(key);
    33. if (semaphore == null) {
    34. throw new IllegalArgumentException("key " + key + "尚未加锁");
    35. }
    36. semaphore.release();
    37. if (semaphore.availablePermits() >= allowed_threads) {
    38. semaphores.remove(key, semaphore);
    39. }
    40. }

    2.2.2 测试代码

    1. import com.google.common.collect.Lists;
    2. import org.junit.Test;
    3. import java.time.LocalDateTime;
    4. import java.util.Collections;
    5. import java.util.HashMap;
    6. import java.util.List;
    7. import java.util.Map;
    8. import java.util.concurrent.CountDownLatch;
    9. import java.util.concurrent.ExecutorService;
    10. import java.util.concurrent.Executors;
    11. import java.util.concurrent.TimeUnit;
    12. public class SimultaneousEntriesLockByKeyTest {
    13. private final int maxThreadEachKey = 2;
    14. private final LockByKey<String> lockByKey = new SimultaneousEntriesLockByKey<>(maxThreadEachKey);
    15. private final CountDownLatch countDownLatch = new CountDownLatch(7);
    16. private final ExecutorService executorService = Executors.newFixedThreadPool(10);
    17. @Test
    18. public void test() throws InterruptedException {
    19. List<String> keys = Lists.newArrayList("a", "a", "a", "b", "c", "b", "d");
    20. Map<String, Integer> executingKeyCount = Collections.synchronizedMap(new HashMap<>());
    21. for (int i = 0; i < keys.size(); i++) {
    22. String key = keys.get(i);
    23. int finalI = i;
    24. executorService.submit(() -> {
    25. lockByKey.lock(key);
    26. executingKeyCount.compute(key, (k, v) -> {
    27. if (v != null && v + 1 > maxThreadEachKey) {
    28. throw new RuntimeException("超过限制了");
    29. }
    30. return v == null ? 1 : v + 1;
    31. });
    32. try {
    33. System.out.println("time:" + LocalDateTime.now().toString() + " ,index:" + finalI + "对 [" + key + "] 加锁 ->" + Thread.currentThread().getName() + "count:" + executingKeyCount.get(key));
    34. TimeUnit.SECONDS.sleep(1);
    35. } catch (InterruptedException e) {
    36. throw new RuntimeException(e);
    37. } finally {
    38. System.out.println("time:" + LocalDateTime.now().toString() + " ,index:" + finalI + "释放 [" + key + "] ->" + Thread.currentThread().getName() + "count:" + (executingKeyCount.get(key) - 1));
    39. lockByKey.unlock(key);
    40. executingKeyCount.compute(key, (k, v) -> v - 1);
    41. countDownLatch.countDown();
    42. }
    43. });
    44. }
    45. countDownLatch.await();
    46. }
    47. }

    输出:

    time:2023-03-15T20:49:57.044195 ,index:6对 [d] 加锁 ->pool-1-thread-7count:1
    time:2023-03-15T20:49:57.058942 ,index:5对 [b] 加锁 ->pool-1-thread-6count:2
    time:2023-03-15T20:49:57.069789 ,index:1对 [a] 加锁 ->pool-1-thread-2count:2
    time:2023-03-15T20:49:57.042402 ,index:4对 [c] 加锁 ->pool-1-thread-5count:1
    time:2023-03-15T20:49:57.046866 ,index:0对 [a] 加锁 ->pool-1-thread-1count:2
    time:2023-03-15T20:49:57.042991 ,index:3对 [b] 加锁 ->pool-1-thread-4count:2
    time:2023-03-15T20:49:58.089557 ,index:0释放 [a] ->pool-1-thread-1count:1
    time:2023-03-15T20:49:58.082679 ,index:6释放 [d] ->pool-1-thread-7count:0
    time:2023-03-15T20:49:58.084579 ,index:4释放 [c] ->pool-1-thread-5count:0
    time:2023-03-15T20:49:58.083462 ,index:5释放 [b] ->pool-1-thread-6count:1
    time:2023-03-15T20:49:58.089576 ,index:3释放 [b] ->pool-1-thread-4count:1
    time:2023-03-15T20:49:58.085359 ,index:1释放 [a] ->pool-1-thread-2count:1
    time:2023-03-15T20:49:58.096912 ,index:2对 [a] 加锁 ->pool-1-thread-3count:1
    time:2023-03-15T20:49:59.099935 ,index:2释放 [a] ->pool-1-thread-3count:0

    以上就是Java根据某个key加锁怎么实现的详细内容,更多关于Java根据某个key加锁怎么实现的资料请关注九品源码其它相关文章!