Посоветуйте имплементацию ThreadPoolExecutor
Решил спросить прежде чем городить свой велосипед
В web-based сервисе MyService при обработки одного реквеста Req1 нужно сотни раз вызывать другой сервис TheirService для получения промежуточных данных.
Вызовы TheirService достаточно долгие, пара секунд каждый и поэтому логично вызывать его в нескольких threads.
Нужно ограничить ThreadPoolExecutor для вызовов TheirService, скажем, 50 тредами.
Собственно, всё вышеописанное делает Executors.newFixedThreadPool(50);
Но нужно наложить ещё одно ограничение — каждый реквест к MyService не должен использовать больше 5 threads
Есть ли готовое решение?
Здравствуйте, Antei, Вы писали:
A>Добрый день, форум!
A>Посоветуйте имплементацию ThreadPoolExecutor A>Решил спросить прежде чем городить свой велосипед
A>В web-based сервисе MyService при обработки одного реквеста Req1 нужно сотни раз вызывать другой сервис TheirService для получения промежуточных данных. A>Вызовы TheirService достаточно долгие, пара секунд каждый и поэтому логично вызывать его в нескольких threads.
A>Нужно ограничить ThreadPoolExecutor для вызовов TheirService, скажем, 50 тредами. A>Собственно, всё вышеописанное делает Executors.newFixedThreadPool(50);
A>Но нужно наложить ещё одно ограничение — каждый реквест к MyService не должен использовать больше 5 threads A>Есть ли готовое решение?
A>Спасибо
A>Решил спросить прежде чем городить свой велосипед
Ты больше времени потратил на написание поста, чем на написание велосипеда. При этом велосипед будет ровно по мерке, а сторонняя библиотека будет тут жать, там свисать
public static class RequestQueue {
private final ExecutorService executorService;
private final int limit;
private final Object lock = new Object();
private final LinkedList<Runnable> pending = new LinkedList<>();
private int inProgress = 0;
public RequestQueue(final ExecutorService executorService, final int limit) {
this.executorService = executorService;
this.limit = limit;
}
public void submit(Runnable runnable) {
final Runnable wrapped = () -> {
try {
runnable.run();
} finally {
onComplete();
}
};
synchronized (lock) {
if (inProgress < limit) {
inProgress++;
executorService.submit(wrapped);
} else {
pending.add(wrapped);
}
}
}
private void onComplete() {
synchronized (lock) {
final Runnable pending = this.pending.poll();
if (pending == null) {
inProgress--;
} else {
executorService.submit(pending);
}
}
}
}
Здравствуйте, bzig, Вы писали:
A>>Решил спросить прежде чем городить свой велосипед B>Ты больше времени потратил на написание поста, чем на написание велосипеда. B>При этом велосипед будет ровно по мерке, а сторонняя библиотека будет тут жать, там свисать
Ну, смотря что за либа )
В баблиотеку скорее всего часов на порядки больше вложено что критически для concurrency и покрытие тестами.
В гугловой гуаве нашёл немало разных executors, правда не сильно подходили для моей цели.
в onComplete() ошибка
Я, в принципе, похожим путём и пошёл с дополнительным уровнем иерархии, т.к. ограничивать нужно по ключу реквеста, возвратил из submit'а CompletableFuture чтобы обеспечить отложенные таски и сохранить семантику ExecutorService о чём и шла речь в вопросе.
Но всё равно спасибо!
Здравствуйте, Antei, Вы писали:
A>В баблиотеку скорее всего часов на порядки больше вложено что критически для concurrency и покрытие тестами. A>В гугловой гуаве нашёл немало разных executors, правда не сильно подходили для моей цели.
Я тоже не нашел ничего готового.
A>в onComplete() ошибка
А подробнее? Я не сильно вникал но на вскидку норм вроде. Локальные переменную только не стоит называть так-же как поле.
Здравствуйте, GarryIV, Вы писали:
A>>в onComplete() ошибка GIV>А подробнее? Я не сильно вникал но на вскидку норм вроде. Локальные переменную только не стоит называть так-же как поле.
Пересмотрел ещё раз, показалось. Я просто по-другому делал, а на уровне примера здесь вроде всё нормально. Беру замечание назад