Premature pessimization. O(N*N) вместо O(N), или хотя бы O(N * ln(N)).
GT>Даже при миллионе тасок этот цикл будет выполняться за миллисекунды.
На добавление миллиона заданий потребуется 500 миллиардов итераций — это как минимум минуты под мьютексом. Для десяти миллионов — уже 50 триллионов, а это уже часы.
GT>А ситуация что там будет миллион тасок врят-ли реальна.
Здравствуйте, Evgeny.Panasyuk, Вы писали:
EP>Здравствуйте, GreenTea, Вы писали:
GT>>Ну и что?
EP>Premature pessimization. O(N*N) вместо O(N), или хотя бы O(N * ln(N)).
GT>>Даже при миллионе тасок этот цикл будет выполняться за миллисекунды.
EP>На добавление миллиона заданий потребуется 500 миллиардов итераций — это как минимум минуты под мьютексом. Для десяти миллионов — уже 50 триллионов, а это уже часы.
GT>>А ситуация что там будет миллион тасок врят-ли реальна.
EP>Почему? Это же не потоки.
Ну так таски еще будут выполняться параллельно. Поэтому очередь будет очищаться. А если взять и допустить что за долисекунды придут триллионы тасок, тогда по памяти все умрет. Но зачем вдаваться в такие крайности.
Здравствуйте, antropolog, Вы писали:
A>А надо было всего-лишь реализовать простой паттерн — привязку продюсера к конкретному потоку. В данном случае это делается отдельной очередью на каждый поток, и помещением таски в очередь с индексом = taskID % threadNum
Такого требования нет в задании.
Таск от любого клиента может выполняться любым потоком, если только другой поток не выполняет в это время таск этого клиента.
Мне видится простой алгоритм: все таски (с ID клиента) ставятся в одну очередь, в порядке поступления.
CThreadPool имеет set "клиентов в процессе".
Потокам при создании передаётся callback на:
CTask* CThreadPool::NextTask(int& _clientId);
Где [in, out] _clientId — при вызове равен ID предыдущего клиента (например, -1 для начала), а по возвращении — текущего.
Потоки работают в бесконечном цикле NextTask() / Execute(). Ведь конечного условия не дано?
CThreadPool::NextTask() удаляет ID предыдущего клиента из set'а, перебирает лист в поисках первого ID клиента, не найденного в том set'е, добавляет его в set, удаляет из очереди и возвращает потоку.
В задании не говорится об ответственности за удаление CTask; очень может быть, что его Execute() делает delete this; перед возвращением.
Итого: 3-4 строки на функцию потока, 2-3 — на AddTask() и 15-20 на NextTask().
@ТС: к сожалению, ссылка у меня на работе не открывается (по соображениям безопасности).
Здравствуйте, GreenTea, Вы писали:
EP>>Почему? Это же не потоки. GT>Ну так таски еще будут выполняться параллельно. Поэтому очередь будет очищаться. А если взять и допустить что за долисекунды придут триллионы тасок, тогда по памяти все умрет. Но зачем вдаваться в такие крайности.
Даже если закрыть глаза на квадратичность — там всё равно условие не выполняется. Возможны вот такие последовательные состояния списка заданий (индексы клиентов):
Здравствуйте, VladFein, Вы писали:
VF>Такого требования нет в задании. VF>Таск от любого клиента может выполняться любым потоком, если только другой поток не выполняет в это время таск этого клиента. VF>Мне видится простой алгоритм: все таски (с ID клиента) ставятся в одну очередь, в порядке поступления. VF>CThreadPool имеет set "клиентов в процессе". VF>Потокам при создании передаётся callback на: VF>CTask* CThreadPool::NextTask(int& _clientId); VF>Где [in, out] _clientId — при вызове равен ID предыдущего клиента (например, -1 для начала), а по возвращении — текущего. VF>Потоки работают в бесконечном цикле NextTask() / Execute(). Ведь конечного условия не дано? VF>CThreadPool::NextTask() удаляет ID предыдущего клиента из set'а, перебирает лист в поисках первого ID клиента, не найденного в том set'е, добавляет его в set, удаляет из очереди и возвращает потоку.
В этой схеме условие тоже не выполняется. Пример списка заданий (индексы клиентов):
Здравствуйте, Evgeny.Panasyuk, Вы писали:
EP>Здравствуйте, GreenTea, Вы писали:
EP>>>Почему? Это же не потоки. GT>>Ну так таски еще будут выполняться параллельно. Поэтому очередь будет очищаться. А если взять и допустить что за долисекунды придут триллионы тасок, тогда по памяти все умрет. Но зачем вдаваться в такие крайности.
EP>Даже если закрыть глаза на квадратичность — там всё равно условие не выполняется. Возможны вот такие последовательные состояния списка заданий (индексы клиентов): EP>
Здравствуйте, GreenTea, Вы писали:
GT>Но это частный случай в котором случайно совпали тайминги добавления и обработки тасок. В общем же случае вероятность такой ситуации очень мала.
Даже если и так — условие всё равно не выполняется. Я бы ещё понял если бы это условие было трудно достичь — так ведь нет, есть же простой вариант циклического перебора клиентов.
VF>>Такого требования нет в задании. VF>>Таск от любого клиента может выполняться любым потоком, если только другой поток не выполняет в это время таск этого клиента. VF>>Мне видится простой алгоритм: все таски (с ID клиента) ставятся в одну очередь, в порядке поступления. VF>>CThreadPool имеет set "клиентов в процессе". VF>>Потокам при создании передаётся callback на: VF>>CTask* CThreadPool::NextTask(int& _clientId); VF>>Где [in, out] _clientId — при вызове равен ID предыдущего клиента (например, -1 для начала), а по возвращении — текущего. VF>>Потоки работают в бесконечном цикле NextTask() / Execute(). Ведь конечного условия не дано? VF>>CThreadPool::NextTask() удаляет ID предыдущего клиента из set'а, перебирает лист в поисках первого ID клиента, не найденного в том set'е, добавляет его в set, удаляет из очереди и возвращает потоку.
EP>В этой схеме условие тоже не выполняется. Пример списка заданий (индексы клиентов): EP>
Не должно быть "повисших" клиентов, даже если клиентов больше, чем потоков.
Мы, похоже, по разному читаем это условие.
Я понял, что нельзя "без очереди" обрабатывать клиента #N только потому, что какой-то поток уже имел с ним дело.
Иначе для чего же подсказка "даже если клиентов больше, чем потоков"?
Другими словами — никакой thread affinity.
Не должно быть "повисших" клиентов, даже если клиентов больше, чем потоков.
VF>Мы, похоже, по разному читаем это условие. VF>Я понял, что нельзя "без очереди" обрабатывать клиента #N только потому, что какой-то поток уже имел с ним дело. VF>Иначе для чего же подсказка "даже если клиентов больше, чем потоков"? VF>Другими словами — никакой thread affinity.
Там сказано:
Т.е. поток должен обрабатывать разных клиентов по-очереди, а не одного до завершения всех его задач.
В этой же схеме обрабатываются задачи клиентов 1 и 2 до завершения всех их задач, а клиенты 3, 4, 5, 6, 7 — всё это время ждут.
Здравствуйте, VladFein, Вы писали:
EP>>Там сказано: EP>>
EP>>Т.е. поток должен обрабатывать разных клиентов по-очереди, а не одного до завершения всех его задач.
EP>>В этой же схеме обрабатываются задачи клиентов 1 и 2 до завершения всех их задач, а клиенты 3, 4, 5, 6, 7 — всё это время ждут. VF>Опять зе — "по очереди", т.е. не давая преимущества клиентам по принципу affinity (или другим).
Тут получается у клиентов 1 и 2 преимущество над остальными, хотя добавление заданий могло произойти практически в одно и то же время.
VF>Очередь-то я соблюдаю?
Здравствуйте, Evgeny.Panasyuk, Вы писали:
VF>>Очередь-то я соблюдаю?
EP>Какую?
Живую? В порядке поступления, так сказать.
Я понял так, что поток А не должен вытягивать клиента Б "одного до завершения всех его задач", а должен брать из очереди, следя что бы "Задачи одного клиента обрабатываются в том порядке, в котором они были добавлены" (а НЕ параллельно).
Клиент, не подавший задания, не может считаться "повисшим".
Я допускаю, что Ваше прочтения задания верно; это я бы уточнил. Но тогда какой смысл в уточнении "даже если клиентов больше, чем потоков"?
Здравствуйте, VladFein, Вы писали:
VF>Здравствуйте, Evgeny.Panasyuk, Вы писали:
VF>>>Очередь-то я соблюдаю?
EP>>Какую?
VF>Живую? В порядке поступления, так сказать. VF>Я понял так, что поток А не должен вытягивать клиента Б "одного до завершения всех его задач", а должен брать из очереди, следя что бы "Задачи одного клиента обрабатываются в том порядке, в котором они были добавлены" (а НЕ параллельно). VF>Клиент, не подавший задания, не может считаться "повисшим". VF>Я допускаю, что Ваше прочтения задания верно; это я бы уточнил. Но тогда какой смысл в уточнении "даже если клиентов больше, чем потоков"?
Заметьте, что моей реализации, которую все обос*али вышеописанных проблем нет)
Здравствуйте, VladFein, Вы писали:
VF>Я допускаю, что Ваше прочтения задания верно; это я бы уточнил. Но тогда какой смысл в уточнении "даже если клиентов больше, чем потоков"?
Задачи одного клиента обрабатываются последовательно. Если клиентов не больше чем потоков — то тут трудно что-то напутать, параллельно будут обрабатываться задачи всех клиентов.
Если же клиентов больше, то при неправильной схеме возможно неравномерное распределение клиентских задач по потокам. Например клиенты 3, 4, 5, 6, 7 ждут пока не выполнятся ВСЕ задачи клиентов 1 и 2.
Здравствуйте, Evgeny.Panasyuk, Вы писали:
EP>Здравствуйте, GreenTea, Вы писали:
GT>>Но это частный случай в котором случайно совпали тайминги добавления и обработки тасок. В общем же случае вероятность такой ситуации очень мала.
EP>Даже если и так — условие всё равно не выполняется. Я бы ещё понял если бы это условие было трудно достичь — так ведь нет, есть же простой вариант циклического перебора клиентов.
Тогда вот так.. Тут трекается история clientId тасок которые выполнялись, в последовательности выполнения, и из свободных берется самая ранняя таская клиента, который обрабатывался раньше всех по истории.
Величина истории ограничена 100, если надо можно вынести это в параметр пула.
package net.sf.brunneng;
import java.util.*;
public class ThreadPool {
private class ClientTask {
final int clientId;
final Runnable runnable;
public ClientTask(int clientId, Runnable runnable) {
this.clientId = clientId;
this.runnable = runnable;
}
}
private static final int MAX_EXECUTED_CLIENTS_HISTORY = 100;
private boolean closed;
private final List<ClientTask> tasks = Collections.synchronizedList(new ArrayList<ClientTask>());
private List<Thread> threads = new ArrayList<Thread>();
private Set<Integer> executingClients = Collections.synchronizedSet(new HashSet<Integer>());
private LinkedList<Integer> executedClientsHistory = new LinkedList<Integer>();
public List<ClientTask> getTasks() {
return tasks;
}
public ThreadPool(int threadsCount) {
for (int i = 0; i < threadsCount; ++i) {
Thread thread = createThread();
threads.add(thread);
thread.start();
}
}
private ClientTask getNextClientTask() {
ClientTask res = null;
synchronized (tasks) {
Map<Integer, Integer> clientIdToEarliestTaskInstanceMap = new HashMap<Integer, Integer>();
for (int i = 0; i < tasks.size(); i++) {
ClientTask task = tasks.get(i);
if (!executingClients.contains(task.clientId) &&
!clientIdToEarliestTaskInstanceMap.containsKey(task.clientId)) {
clientIdToEarliestTaskInstanceMap.put(task.clientId, i);
}
}
final Map<Integer, Integer> clientIdMinStepsBack = new HashMap<Integer, Integer>();
Iterator<Integer> iterator = executedClientsHistory.descendingIterator();
int i = 1;
while (iterator.hasNext()) {
Integer clientId = iterator.next();
if (!clientIdMinStepsBack.containsKey(clientId)) {
clientIdMinStepsBack.put(clientId, i);
}
i++;
}
int maxStepBack = -1;
Integer bestEarliestTaskIndex = null;
for (Integer clientId : clientIdToEarliestTaskInstanceMap.keySet()) {
Integer stepsBack = clientIdMinStepsBack.get(clientId);
if (stepsBack == null) { // no occurence in history
bestEarliestTaskIndex = clientIdToEarliestTaskInstanceMap.get(clientId);
break;
}
else if (stepsBack > maxStepBack) {
maxStepBack = stepsBack;
bestEarliestTaskIndex = clientIdToEarliestTaskInstanceMap.get(clientId);
}
}
if (bestEarliestTaskIndex != null) {
res = tasks.remove(bestEarliestTaskIndex.intValue());
executingClients.add(res.clientId);
executedClientsHistory.add(res.clientId);
if (executedClientsHistory.size() > MAX_EXECUTED_CLIENTS_HISTORY) {
executedClientsHistory.removeFirst();
}
}
}
return res;
}
private Thread createThread() {
return new Thread(new Runnable() {
@Override
public void run() {
boolean finish = false;
while (!finish && !closed) {
try {
ClientTask task = getNextClientTask();
if (task == null) {
synchronized (tasks) {
tasks.wait();
}
continue;
}
task.runnable.run();
executingClients.remove(task.clientId);
} catch (InterruptedException e) {
e.printStackTrace();
finish = true;
}
}
}
});
}
public void addTask(int clientId, Runnable task) {
synchronized (tasks) {
tasks.add(new ClientTask(clientId, task));
if (!executingClients.contains(clientId)) {
tasks.notify();
}
}
}
public void close() {
closed = true;
synchronized (tasks) {
tasks.notifyAll();
}
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException ignored) {
}
}
}
}
Здравствуйте, Selavi, Вы писали:
S>Заметьте, что моей реализации, которую все обос*али вышеописанных проблем нет)
Там есть проблемы покруче, например:
* одно и то же задание может выполнится несколько раз, так как не учитываются spurious wakeup.
* выполнение не заданного задания (segfault).
* отсутствие join для CThreadPoolX, в результате std::terminate.
Здравствуйте, GreenTea, Вы писали:
EP>>Даже если и так — условие всё равно не выполняется. Я бы ещё понял если бы это условие было трудно достичь — так ведь нет, есть же простой вариант циклического перебора клиентов. GT>Тогда вот так.. Тут трекается история clientId тасок которые выполнялись, в последовательности выполнения, и из свободных берется самая ранняя таская клиента, который обрабатывался раньше всех по истории. GT>Величина истории ограничена 100, если надо можно вынести это в параметр пула.
Требуется обойти всех клиентов по порядку + зацикливание на начало, всё
Зачем так сложно-то? В getNextClientTask создаётся два отображения, крутятся три цикла, всё это под мьютексом, да ещё и введён "параметр аппроксимации решения"
Здравствуйте, Evgeny.Panasyuk, Вы писали:
EP>Здравствуйте, Selavi, Вы писали:
S>>Заметьте, что моей реализации, которую все обос*али вышеописанных проблем нет)
EP>Там есть проблемы покруче, например: EP>* одно и то же задание может выполнится несколько раз, так как не учитываются spurious wakeup. EP>* выполнение не заданного задания (segfault). EP>* отсутствие join для CThreadPoolX, в результате std::terminate.
Да, Вы правы...вот черт( Наконец то хороший анализ!
Здравствуйте, Evgeny.Panasyuk, Вы писали:
EP>Здравствуйте, GreenTea, Вы писали:
EP>>>Даже если и так — условие всё равно не выполняется. Я бы ещё понял если бы это условие было трудно достичь — так ведь нет, есть же простой вариант циклического перебора клиентов. GT>>Тогда вот так.. Тут трекается история clientId тасок которые выполнялись, в последовательности выполнения, и из свободных берется самая ранняя таская клиента, который обрабатывался раньше всех по истории. GT>>Величина истории ограничена 100, если надо можно вынести это в параметр пула.
EP>Требуется обойти всех клиентов по порядку + зацикливание на начало, всё EP>Зачем так сложно-то? В getNextClientTask создаётся два отображения, крутятся три цикла, всё это под мьютексом, да ещё и введён "параметр аппроксимации решения"
Не понял. Какое еще зацикливание? Возможно сложно, но это первое что пришло в голову. Хотелось бы посмотреть на ваше решение.