Size: a a a

2020 June 11

AT

Antonina Tatchuk in pro.jvm
Коллега написал статью про то, как построить надежное приложение на базе Event Sourcing. Посмотрите, может быть будет интересно — тут про архитектуру ПО (и Apache Flink внутри)
источник

АШ

Александр Шинкевич... in pro.jvm
Привет, можете, плиз, подсказать по spring
я использую асинхронный метод.
настроил executor
public Executor getAsyncExecutor() {
       log.debug("Creating Async Task Executor");
       ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
       executor.setCorePoolSize(2);
       executor.setMaxPoolSize(50);
       executor.setQueueCapacity(10_000);
       executor.setThreadNamePrefix(appName + "-Executor-");
       return new ExceptionHandlingAsyncTaskExecutor(executor);
   }

когда оправляю много задач, то падает ошибка
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask[Not completed, task = org.springframework.aop.interceptor.AsyncExecutionInterceptor] rejected from java.util.concurrent.ThreadPoolExecutor
[Running, pool size = 50, active threads = 50, queued tasks = 100, completed tasks = 4]

можно ли как-то решить проблему не увеличивая queue capacity?
к примеру, проверить, есть ли свободные execotor'ы, если нет, то подождать.
источник

Ю

Юрий in pro.jvm
Александр Шинкевич
Привет, можете, плиз, подсказать по spring
я использую асинхронный метод.
настроил executor
public Executor getAsyncExecutor() {
       log.debug("Creating Async Task Executor");
       ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
       executor.setCorePoolSize(2);
       executor.setMaxPoolSize(50);
       executor.setQueueCapacity(10_000);
       executor.setThreadNamePrefix(appName + "-Executor-");
       return new ExceptionHandlingAsyncTaskExecutor(executor);
   }

когда оправляю много задач, то падает ошибка
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask[Not completed, task = org.springframework.aop.interceptor.AsyncExecutionInterceptor] rejected from java.util.concurrent.ThreadPoolExecutor
[Running, pool size = 50, active threads = 50, queued tasks = 100, completed tasks = 4]

можно ли как-то решить проблему не увеличивая queue capacity?
к примеру, проверить, есть ли свободные execotor'ы, если нет, то подождать.
видимо тасков больше чем размер очереди
источник

Ю

Юрий in pro.jvm
увеличь размер очереди
источник

АШ

Александр Шинкевич... in pro.jvm
Юрий
видимо тасков больше чем размер очереди
да, в этом-то и беда, а как сделать без увеличения очереди?
источник

Ю

Юрий in pro.jvm
хранить в бд
источник

Ю

Юрий in pro.jvm
таску
источник

АШ

Александр Шинкевич... in pro.jvm
Юрий
хранить в бд
начал использовать RateLimiter, чтобы ограничить кол-во задач в секунду, пока могло, ошибки не возникает и очередь рассасывается
источник

AM

Aleksander Melnichni... in pro.jvm
Александр Шинкевич
Привет, можете, плиз, подсказать по spring
я использую асинхронный метод.
настроил executor
public Executor getAsyncExecutor() {
       log.debug("Creating Async Task Executor");
       ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
       executor.setCorePoolSize(2);
       executor.setMaxPoolSize(50);
       executor.setQueueCapacity(10_000);
       executor.setThreadNamePrefix(appName + "-Executor-");
       return new ExceptionHandlingAsyncTaskExecutor(executor);
   }

когда оправляю много задач, то падает ошибка
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask[Not completed, task = org.springframework.aop.interceptor.AsyncExecutionInterceptor] rejected from java.util.concurrent.ThreadPoolExecutor
[Running, pool size = 50, active threads = 50, queued tasks = 100, completed tasks = 4]

можно ли как-то решить проблему не увеличивая queue capacity?
к примеру, проверить, есть ли свободные execotor'ы, если нет, то подождать.
У тредпула такой конструкто
источник

AM

Aleksander Melnichni... in pro.jvm
public ThreadPoolExecutor(int corePoolSize,
                             int maximumPoolSize,
                             long keepAliveTime,
                             TimeUnit unit,
                             BlockingQueue<Runnable> workQueue,
                             ThreadFactory threadFactory,
                             RejectedExecutionHandler handler) {
источник

AM

Aleksander Melnichni... in pro.jvm
Тебя интересует последний аргумент - это политика что делать когда закончится очередь
источник

АШ

Александр Шинкевич... in pro.jvm
Aleksander Melnichnikov
Тебя интересует последний аргумент - это политика что делать когда закончится очередь
хорошо, а есть хорошие примеры, где идет ожидание освобождения очереди?
источник

AM

Aleksander Melnichni... in pro.jvm
Александр Шинкевич
хорошо, а есть хорошие примеры, где идет ожидание освобождения очереди?
Ну у тебя есть стандартные политики в джаве - callerRuns запускает таску в текущем треде,  Discard - просто не добавляет таску которую ты вставляешь,  Discard-Oldest - сбрасывает самую старую таску.
источник

AM

Aleksander Melnichni... in pro.jvm
Плюс в спринге вроде такая была CallerBlocksPolicy
источник

AM

Aleksander Melnichni... in pro.jvm
Которая делает тебе то, что ты как раз вроде хочешь
источник

AM

Aleksander Melnichni... in pro.jvm
код там простой как три копейки
источник

AM

Aleksander Melnichni... in pro.jvm
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
   if (!executor.isShutdown()) {
     try {
       BlockingQueue<Runnable> queue = executor.getQueue();
       if (logger.isDebugEnabled()) {
         logger.debug("Attempting to queue task execution for " + this.maxWait + " milliseconds");
       }
       if (!queue.offer(r, this.maxWait, TimeUnit.MILLISECONDS)) {
         throw new RejectedExecutionException("Max wait time expired to queue task");
       }
       if (logger.isDebugEnabled()) {
         logger.debug("Task execution queued");
       }
     }
     catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new RejectedExecutionException("Interrupted", e);
     }
   }
   else {
     throw new RejectedExecutionException("Executor has been shut down");
   }
 }
источник

AM

Aleksander Melnichni... in pro.jvm
Ну и плюс ты всегда можешь сделать переменную с блокирующей очередью которую использует тредпул и туда пихать свои раннбл - но это неудобно
источник

АШ

Александр Шинкевич... in pro.jvm
Aleksander Melnichnikov
Плюс в спринге вроде такая была CallerBlocksPolicy
хорошо, спасибо, что мысли направил
источник

AM

Aleksander Melnichni... in pro.jvm
Александр Шинкевич
хорошо, спасибо, что мысли направил
обращайся
источник