Re[6]: Тестовое задание ...
От: Evgeny.Panasyuk Россия  
Дата: 15.06.15 20:23
Оценка:
Здравствуйте, GreenTea, Вы писали:

GT>Ну и что?


Premature pessimization. O(N*N) вместо O(N), или хотя бы O(N * ln(N)).

GT>Даже при миллионе тасок этот цикл будет выполняться за миллисекунды.


На добавление миллиона заданий потребуется 500 миллиардов итераций — это как минимум минуты под мьютексом. Для десяти миллионов — уже 50 триллионов, а это уже часы.

GT>А ситуация что там будет миллион тасок врят-ли реальна.


Почему? Это же не потоки.
Re[7]: Тестовое задание ...
От: GreenTea  
Дата: 15.06.15 20:36
Оценка:
Здравствуйте, Evgeny.Panasyuk, Вы писали:

EP>Здравствуйте, GreenTea, Вы писали:


GT>>Ну и что?


EP>Premature pessimization. O(N*N) вместо O(N), или хотя бы O(N * ln(N)).


GT>>Даже при миллионе тасок этот цикл будет выполняться за миллисекунды.


EP>На добавление миллиона заданий потребуется 500 миллиардов итераций — это как минимум минуты под мьютексом. Для десяти миллионов — уже 50 триллионов, а это уже часы.


GT>>А ситуация что там будет миллион тасок врят-ли реальна.


EP>Почему? Это же не потоки.


Ну так таски еще будут выполняться параллельно. Поэтому очередь будет очищаться. А если взять и допустить что за долисекунды придут триллионы тасок, тогда по памяти все умрет. Но зачем вдаваться в такие крайности.
Re[3]: Тестовое задание ...
От: VladFein США  
Дата: 15.06.15 20:45
Оценка:
Здравствуйте, antropolog, Вы писали:

A>А надо было всего-лишь реализовать простой паттерн — привязку продюсера к конкретному потоку. В данном случае это делается отдельной очередью на каждый поток, и помещением таски в очередь с индексом = taskID % threadNum


Такого требования нет в задании.
Таск от любого клиента может выполняться любым потоком, если только другой поток не выполняет в это время таск этого клиента.
Мне видится простой алгоритм: все таски (с ID клиента) ставятся в одну очередь, в порядке поступления.
CThreadPool имеет set "клиентов в процессе".
Потокам при создании передаётся callback на:
CTask* CThreadPool::NextTask(int& _clientId);
Где [in, out] _clientId — при вызове равен ID предыдущего клиента (например, -1 для начала), а по возвращении — текущего.
Потоки работают в бесконечном цикле NextTask() / Execute(). Ведь конечного условия не дано?

CThreadPool::NextTask() удаляет ID предыдущего клиента из set'а, перебирает лист в поисках первого ID клиента, не найденного в том set'е, добавляет его в set, удаляет из очереди и возвращает потоку.

В задании не говорится об ответственности за удаление CTask; очень может быть, что его Execute() делает delete this; перед возвращением.

Итого: 3-4 строки на функцию потока, 2-3 — на AddTask() и 15-20 на NextTask().

@ТС: к сожалению, ссылка у меня на работе не открывается (по соображениям безопасности).
Re[8]: Тестовое задание ...
От: Evgeny.Panasyuk Россия  
Дата: 15.06.15 20:50
Оценка:
Здравствуйте, GreenTea, Вы писали:

EP>>Почему? Это же не потоки.

GT>Ну так таски еще будут выполняться параллельно. Поэтому очередь будет очищаться. А если взять и допустить что за долисекунды придут триллионы тасок, тогда по памяти все умрет. Но зачем вдаваться в такие крайности.

Даже если закрыть глаза на квадратичность — там всё равно условие не выполняется. Возможны вот такие последовательные состояния списка заданий (индексы клиентов):
1 2 3 4 5
add(7)
1 7 2 3 4 5
process(1)
7 2 3 4 5
add(1)
7 1 2 3 4 5
process(7)
1 2 3 4 5
Получаем цикл.
Re[4]: Тестовое задание ...
От: Evgeny.Panasyuk Россия  
Дата: 15.06.15 21:03
Оценка:
Здравствуйте, VladFein, Вы писали:

VF>Такого требования нет в задании.

VF>Таск от любого клиента может выполняться любым потоком, если только другой поток не выполняет в это время таск этого клиента.
VF>Мне видится простой алгоритм: все таски (с ID клиента) ставятся в одну очередь, в порядке поступления.
VF>CThreadPool имеет set "клиентов в процессе".
VF>Потокам при создании передаётся callback на:
VF>CTask* CThreadPool::NextTask(int& _clientId);
VF>Где [in, out] _clientId — при вызове равен ID предыдущего клиента (например, -1 для начала), а по возвращении — текущего.
VF>Потоки работают в бесконечном цикле NextTask() / Execute(). Ведь конечного условия не дано?
VF>CThreadPool::NextTask() удаляет ID предыдущего клиента из set'а, перебирает лист в поисках первого ID клиента, не найденного в том set'е, добавляет его в set, удаляет из очереди и возвращает потоку.

В этой схеме условие тоже не выполняется. Пример списка заданий (индексы клиентов):
1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 3 4 5 6 7
Отредактировано 15.06.2015 21:05 Evgeny.Panasyuk . Предыдущая версия .
Re[9]: Тестовое задание ...
От: GreenTea  
Дата: 15.06.15 21:04
Оценка:
Здравствуйте, Evgeny.Panasyuk, Вы писали:

EP>Здравствуйте, GreenTea, Вы писали:


EP>>>Почему? Это же не потоки.

GT>>Ну так таски еще будут выполняться параллельно. Поэтому очередь будет очищаться. А если взять и допустить что за долисекунды придут триллионы тасок, тогда по памяти все умрет. Но зачем вдаваться в такие крайности.

EP>Даже если закрыть глаза на квадратичность — там всё равно условие не выполняется. Возможны вот такие последовательные состояния списка заданий (индексы клиентов):

EP>
EP>1 2 3 4 5
EP>add(7)
EP>1 7 2 3 4 5
EP>process(1)
EP>7 2 3 4 5
EP>add(1)
EP>7 1 2 3 4 5
EP>process(7)
EP>1 2 3 4 5
EP>
Получаем цикл.


Но это частный случай в котором случайно совпали тайминги добавления и обработки тасок. В общем же случае вероятность такой ситуации очень мала.
Отредактировано 15.06.2015 21:04 GreenTea . Предыдущая версия .
Re[10]: Тестовое задание ...
От: Evgeny.Panasyuk Россия  
Дата: 15.06.15 21:14
Оценка:
Здравствуйте, GreenTea, Вы писали:

GT>Но это частный случай в котором случайно совпали тайминги добавления и обработки тасок. В общем же случае вероятность такой ситуации очень мала.


Даже если и так — условие всё равно не выполняется. Я бы ещё понял если бы это условие было трудно достичь — так ведь нет, есть же простой вариант циклического перебора клиентов.
Re[5]: Тестовое задание ...
От: VladFein США  
Дата: 15.06.15 21:15
Оценка:
Здравствуйте, Evgeny.Panasyuk, Вы писали:

  Скрытый текст
VF>>Такого требования нет в задании.
VF>>Таск от любого клиента может выполняться любым потоком, если только другой поток не выполняет в это время таск этого клиента.
VF>>Мне видится простой алгоритм: все таски (с ID клиента) ставятся в одну очередь, в порядке поступления.
VF>>CThreadPool имеет set "клиентов в процессе".
VF>>Потокам при создании передаётся callback на:
VF>>CTask* CThreadPool::NextTask(int& _clientId);
VF>>Где [in, out] _clientId — при вызове равен ID предыдущего клиента (например, -1 для начала), а по возвращении — текущего.
VF>>Потоки работают в бесконечном цикле NextTask() / Execute(). Ведь конечного условия не дано?
VF>>CThreadPool::NextTask() удаляет ID предыдущего клиента из set'а, перебирает лист в поисках первого ID клиента, не найденного в том set'е, добавляет его в set, удаляет из очереди и возвращает потоку.

EP>В этой схеме условие тоже не выполняется. Пример списка заданий (индексы клиентов):
EP>
EP>1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 3 4 5 6 7
EP>

Вы об этом?

Не должно быть "повисших" клиентов, даже если клиентов больше, чем потоков.

Мы, похоже, по разному читаем это условие.
Я понял, что нельзя "без очереди" обрабатывать клиента #N только потому, что какой-то поток уже имел с ним дело.
Иначе для чего же подсказка "даже если клиентов больше, чем потоков"?
Другими словами — никакой thread affinity.
Re[6]: Тестовое задание ...
От: Evgeny.Panasyuk Россия  
Дата: 15.06.15 21:20
Оценка:
Здравствуйте, VladFein, Вы писали:

EP>>В этой схеме условие тоже не выполняется. Пример списка заданий (индексы клиентов):

EP>>
EP>>1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 3 4 5 6 7
EP>>

VF>Вы об этом?

Не должно быть "повисших" клиентов, даже если клиентов больше, чем потоков.

VF>Мы, похоже, по разному читаем это условие.
VF>Я понял, что нельзя "без очереди" обрабатывать клиента #N только потому, что какой-то поток уже имел с ним дело.
VF>Иначе для чего же подсказка "даже если клиентов больше, чем потоков"?
VF>Другими словами — никакой thread affinity.

Там сказано:

Т.е. поток должен обрабатывать разных клиентов по-очереди, а не одного до завершения всех его задач.

В этой же схеме обрабатываются задачи клиентов 1 и 2 до завершения всех их задач, а клиенты 3, 4, 5, 6, 7 — всё это время ждут.
Re[7]: Тестовое задание ...
От: VladFein США  
Дата: 15.06.15 21:24
Оценка:
Здравствуйте, Evgeny.Panasyuk, Вы писали:

EP>Там сказано:

EP>

EP>Т.е. поток должен обрабатывать разных клиентов по-очереди, а не одного до завершения всех его задач.

EP>В этой же схеме обрабатываются задачи клиентов 1 и 2 до завершения всех их задач, а клиенты 3, 4, 5, 6, 7 — всё это время ждут.

Опять зе — "по очереди", т.е. не давая преимущества клиентам по принципу affinity (или другим). Очередь-то я соблюдаю?
Re[8]: Тестовое задание ...
От: Evgeny.Panasyuk Россия  
Дата: 15.06.15 21:30
Оценка:
Здравствуйте, VladFein, Вы писали:

EP>>Там сказано:

EP>>

EP>>Т.е. поток должен обрабатывать разных клиентов по-очереди, а не одного до завершения всех его задач.

EP>>В этой же схеме обрабатываются задачи клиентов 1 и 2 до завершения всех их задач, а клиенты 3, 4, 5, 6, 7 — всё это время ждут.
VF>Опять зе — "по очереди", т.е. не давая преимущества клиентам по принципу affinity (или другим).

Тут получается у клиентов 1 и 2 преимущество над остальными, хотя добавление заданий могло произойти практически в одно и то же время.

VF>Очередь-то я соблюдаю?


Какую?
Re[9]: Тестовое задание ...
От: VladFein США  
Дата: 15.06.15 21:48
Оценка:
Здравствуйте, Evgeny.Panasyuk, Вы писали:

VF>>Очередь-то я соблюдаю?


EP>Какую?


Живую? В порядке поступления, так сказать.
Я понял так, что поток А не должен вытягивать клиента Б "одного до завершения всех его задач", а должен брать из очереди, следя что бы "Задачи одного клиента обрабатываются в том порядке, в котором они были добавлены" (а НЕ параллельно).
Клиент, не подавший задания, не может считаться "повисшим".
Я допускаю, что Ваше прочтения задания верно; это я бы уточнил. Но тогда какой смысл в уточнении "даже если клиентов больше, чем потоков"?
Re[10]: Тестовое задание ...
От: Selavi  
Дата: 15.06.15 21:59
Оценка:
Здравствуйте, VladFein, Вы писали:

VF>Здравствуйте, Evgeny.Panasyuk, Вы писали:


VF>>>Очередь-то я соблюдаю?


EP>>Какую?


VF>Живую? В порядке поступления, так сказать.

VF>Я понял так, что поток А не должен вытягивать клиента Б "одного до завершения всех его задач", а должен брать из очереди, следя что бы "Задачи одного клиента обрабатываются в том порядке, в котором они были добавлены" (а НЕ параллельно).
VF>Клиент, не подавший задания, не может считаться "повисшим".
VF>Я допускаю, что Ваше прочтения задания верно; это я бы уточнил. Но тогда какой смысл в уточнении "даже если клиентов больше, чем потоков"?

Заметьте, что моей реализации, которую все обос*али вышеописанных проблем нет)
Re[10]: Тестовое задание ...
От: Evgeny.Panasyuk Россия  
Дата: 15.06.15 22:06
Оценка:
Здравствуйте, VladFein, Вы писали:

VF>Я допускаю, что Ваше прочтения задания верно; это я бы уточнил. Но тогда какой смысл в уточнении "даже если клиентов больше, чем потоков"?


Задачи одного клиента обрабатываются последовательно. Если клиентов не больше чем потоков — то тут трудно что-то напутать, параллельно будут обрабатываться задачи всех клиентов.
Если же клиентов больше, то при неправильной схеме возможно неравномерное распределение клиентских задач по потокам. Например клиенты 3, 4, 5, 6, 7 ждут пока не выполнятся ВСЕ задачи клиентов 1 и 2.
Re[11]: Тестовое задание ...
От: GreenTea  
Дата: 15.06.15 22:10
Оценка:
Здравствуйте, Evgeny.Panasyuk, Вы писали:

EP>Здравствуйте, GreenTea, Вы писали:


GT>>Но это частный случай в котором случайно совпали тайминги добавления и обработки тасок. В общем же случае вероятность такой ситуации очень мала.


EP>Даже если и так — условие всё равно не выполняется. Я бы ещё понял если бы это условие было трудно достичь — так ведь нет, есть же простой вариант циклического перебора клиентов.


Тогда вот так.. Тут трекается история clientId тасок которые выполнялись, в последовательности выполнения, и из свободных берется самая ранняя таская клиента, который обрабатывался раньше всех по истории.
Величина истории ограничена 100, если надо можно вынести это в параметр пула.

package net.sf.brunneng;


import java.util.*;

public class ThreadPool {

   private class ClientTask {
      final int clientId;
      final Runnable runnable;

      public ClientTask(int clientId, Runnable runnable) {
         this.clientId = clientId;
         this.runnable = runnable;
      }
   }

   private static final int MAX_EXECUTED_CLIENTS_HISTORY = 100;

   private boolean closed;
   private final List<ClientTask> tasks = Collections.synchronizedList(new ArrayList<ClientTask>());

   private List<Thread> threads = new ArrayList<Thread>();
   private Set<Integer> executingClients = Collections.synchronizedSet(new HashSet<Integer>());

   private LinkedList<Integer> executedClientsHistory = new LinkedList<Integer>();

   public List<ClientTask> getTasks() {
      return tasks;
   }

   public ThreadPool(int threadsCount) {
      for (int i = 0; i < threadsCount; ++i) {
         Thread thread = createThread();
         threads.add(thread);
         thread.start();
      }
   }

   private ClientTask getNextClientTask() {
      ClientTask res = null;

      synchronized (tasks) {

         Map<Integer, Integer> clientIdToEarliestTaskInstanceMap = new HashMap<Integer, Integer>();
         for (int i = 0; i < tasks.size(); i++) {
            ClientTask task = tasks.get(i);
            if (!executingClients.contains(task.clientId) &&
                    !clientIdToEarliestTaskInstanceMap.containsKey(task.clientId)) {
               clientIdToEarliestTaskInstanceMap.put(task.clientId, i);
            }
         }

         final Map<Integer, Integer> clientIdMinStepsBack = new HashMap<Integer, Integer>();
         Iterator<Integer> iterator = executedClientsHistory.descendingIterator();
         int i = 1;
         while (iterator.hasNext()) {
            Integer clientId = iterator.next();
            if (!clientIdMinStepsBack.containsKey(clientId)) {
               clientIdMinStepsBack.put(clientId, i);
            }
            i++;
         }

         int maxStepBack = -1;
         Integer bestEarliestTaskIndex = null;
         for (Integer clientId : clientIdToEarliestTaskInstanceMap.keySet()) {
            Integer stepsBack = clientIdMinStepsBack.get(clientId);
            if (stepsBack == null) { // no occurence in history
               bestEarliestTaskIndex = clientIdToEarliestTaskInstanceMap.get(clientId);
               break;
            }
            else if (stepsBack > maxStepBack) {
               maxStepBack = stepsBack;
               bestEarliestTaskIndex = clientIdToEarliestTaskInstanceMap.get(clientId);
            }
         }

         if (bestEarliestTaskIndex != null) {
            res = tasks.remove(bestEarliestTaskIndex.intValue());
            executingClients.add(res.clientId);

            executedClientsHistory.add(res.clientId);
            if (executedClientsHistory.size() > MAX_EXECUTED_CLIENTS_HISTORY) {
               executedClientsHistory.removeFirst();
            }
         }
      }

      return res;
   }

   private Thread createThread() {
      return new Thread(new Runnable() {
         @Override
         public void run() {
            boolean finish = false;

            while (!finish && !closed) {
               try {
                  ClientTask task = getNextClientTask();

                  if (task == null) {
                     synchronized (tasks) {
                        tasks.wait();
                     }
                     continue;
                  }

                  task.runnable.run();
                  executingClients.remove(task.clientId);

               } catch (InterruptedException e) {
                  e.printStackTrace();
                  finish = true;
               }
            }
         }
      });
   }

   public void addTask(int clientId, Runnable task) {
      synchronized (tasks) {
         tasks.add(new ClientTask(clientId, task));

         if (!executingClients.contains(clientId)) {
            tasks.notify();
         }
      }
   }

   public void close() {
      closed = true;

      synchronized (tasks) {
         tasks.notifyAll();
      }

      for (Thread thread : threads) {
         try {
            thread.join();
         } catch (InterruptedException ignored) {

         }
      }
   }
}
Отредактировано 15.06.2015 22:22 GreenTea . Предыдущая версия .
Re[11]: Тестовое задание ...
От: Evgeny.Panasyuk Россия  
Дата: 15.06.15 22:23
Оценка: 2 (1)
Здравствуйте, Selavi, Вы писали:

S>Заметьте, что моей реализации, которую все обос*али вышеописанных проблем нет)


Там есть проблемы покруче, например:
* одно и то же задание может выполнится несколько раз, так как не учитываются spurious wakeup.
* выполнение не заданного задания (segfault).
* отсутствие join для CThreadPoolX, в результате std::terminate.
Отредактировано 15.06.2015 22:59 Evgeny.Panasyuk . Предыдущая версия .
Re[12]: Тестовое задание ...
От: Evgeny.Panasyuk Россия  
Дата: 15.06.15 23:36
Оценка:
Здравствуйте, GreenTea, Вы писали:

EP>>Даже если и так — условие всё равно не выполняется. Я бы ещё понял если бы это условие было трудно достичь — так ведь нет, есть же простой вариант циклического перебора клиентов.

GT>Тогда вот так.. Тут трекается история clientId тасок которые выполнялись, в последовательности выполнения, и из свободных берется самая ранняя таская клиента, который обрабатывался раньше всех по истории.
GT>Величина истории ограничена 100, если надо можно вынести это в параметр пула.

Требуется обойти всех клиентов по порядку + зацикливание на начало, всё
Зачем так сложно-то? В getNextClientTask создаётся два отображения, крутятся три цикла, всё это под мьютексом, да ещё и введён "параметр аппроксимации решения"
Re[12]: Тестовое задание ...
От: Selavi  
Дата: 16.06.15 07:04
Оценка:
Здравствуйте, Evgeny.Panasyuk, Вы писали:

EP>Здравствуйте, Selavi, Вы писали:


S>>Заметьте, что моей реализации, которую все обос*али вышеописанных проблем нет)


EP>Там есть проблемы покруче, например:

EP>* одно и то же задание может выполнится несколько раз, так как не учитываются spurious wakeup.
EP>* выполнение не заданного задания (segfault).
EP>* отсутствие join для CThreadPoolX, в результате std::terminate.

Да, Вы правы...вот черт( Наконец то хороший анализ!

А не могли бы подробней про 2 и 3?
Re[13]: Тестовое задание ...
От: GreenTea  
Дата: 16.06.15 07:54
Оценка:
Здравствуйте, Evgeny.Panasyuk, Вы писали:

EP>Здравствуйте, GreenTea, Вы писали:


EP>>>Даже если и так — условие всё равно не выполняется. Я бы ещё понял если бы это условие было трудно достичь — так ведь нет, есть же простой вариант циклического перебора клиентов.

GT>>Тогда вот так.. Тут трекается история clientId тасок которые выполнялись, в последовательности выполнения, и из свободных берется самая ранняя таская клиента, который обрабатывался раньше всех по истории.
GT>>Величина истории ограничена 100, если надо можно вынести это в параметр пула.

EP>Требуется обойти всех клиентов по порядку + зацикливание на начало, всё

EP>Зачем так сложно-то? В getNextClientTask создаётся два отображения, крутятся три цикла, всё это под мьютексом, да ещё и введён "параметр аппроксимации решения"

Не понял. Какое еще зацикливание? Возможно сложно, но это первое что пришло в голову. Хотелось бы посмотреть на ваше решение.
Re[14]: Тестовое задание ...
От: Слава  
Дата: 16.06.15 11:24
Оценка:
Здравствуйте, GreenTea, Вы писали:

GT>...


В этой ветке мы видим подтверждение фразы "многопоточность — это СЛОЖНО".

И тем более неясными становятся придирки по оформлению при выполненной задаче, которые ставились в вину создателю темы.
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.