public class DaemonThreadFactory implements ThreadFactory {
/** Фабрика для создания потоков */private final ThreadFactory fac = Executors.defaultThreadFactory();
/** Создает новый поток */public Thread newThread(Runnable r) {
Thread result = fac.newThread(r);
result.setDaemon(true);
return result;
}
}
Вот основная программка.
public class FutureTaskTest {
public static void main(String[] args) throws Exception {
new FutureTaskTest().test();
}
private final ExecutorService executor =
Executors.newSingleThreadExecutor(new DaemonThreadFactory());
public void test() throws Exception {
Task task = new Task();
FutureTask<Void> future = new FutureTask<Void>(task);
task.setFuture(future);
executor.execute(future);
Thread.sleep(100);
future.cancel(true);
System.out.println("wait");
try {
future.get();
} catch(CancellationException ex) {
System.out.println("cancellation exception");
}
System.out.println("thinking task done");
Thread.sleep(1000);
}
}
class Task implements Callable<Void> {
private Future<Void> future;
public void setFuture(Future<Void> future) {
this.future = future;
}
public Void call() throws Exception {
while( !future.isCancelled() ) {
System.out.println("go");
try {
Thread.sleep(500);
} catch(InterruptedException ex) {
System.out.println("interrupted");
Thread.sleep(1000);
System.out.println("rrrrrrrrrrrr");
}
}
System.out.println("cancelled");
return null;
}
}
В методе test() запускаем задачу на выполнение, затем ее отменяем и пытаемся подождать завершения с помошью метода get(). Это не получается, так как метод get() не ждет завершения метода call() класса Task, а сразу генерирует исключение. В результате получаем на консоле следующее.
go
wait
cancellation exception
thinking task done
interrupted
rrrrrrrrrrrr
cancelled
То есть после thunking task done задача еще продолжает работать.
Я хочу чтобы при вызове get() или другого подобного метода, выполнение основного потока блокировалось пока не завершится задача работающая в фоне. Может быть какой-нибудь другой высокоуровневый примитив синхронизации можно применить (чтобы можно было cancel() использовать).
Мне это нужно что-бы последовательно выполнять задачи, но не начинать выполнение следующей задачи без отмены предыдущей.
Re: Отмена выполнения задачи в отдельном потоке и FutureTask
Здравствуйте, runtime2, Вы писали:
R>То есть после thunking task done задача еще продолжает работать. R>Я хочу чтобы при вызове get() или другого подобного метода, выполнение основного потока блокировалось пока не завершится задача работающая в фоне. Может быть какой-нибудь другой высокоуровневый примитив синхронизации можно применить (чтобы можно было cancel() использовать).
R>Мне это нужно что-бы последовательно выполнять задачи, но не начинать выполнение следующей задачи без отмены предыдущей.
Единственное что приходит в голову — monitor! или флаг с монитором — задача выполнилась (выполнилась = full iteration|canceled|...), а монитор что бы на нем повисеть можно было в случае cancel-а, ну и монитор будет ограничивать доступ к полю, да и вообще можете обернуть свои задачи во что нибудь которое будет само уметь...
Re: Отмена выполнения задачи в отдельном потоке и FutureTask
R>class Task implements Callable<Void> {
R> private Future<Void> future;
R> public void setFuture(Future<Void> future) { R> this.future = future; R> }
R> public Void call() throws Exception { R> while( !future.isCancelled() ) { R> System.out.println("go"); R> try { R> Thread.sleep(500); R> } catch(InterruptedException ex) { R> System.out.println("interrupted"); R> Thread.sleep(1000); R> System.out.println("rrrrrrrrrrrr"); R> } R> } R> System.out.println("cancelled"); R> return null; R> }
R>} R>[/java]
R>В методе test() запускаем задачу на выполнение, затем ее отменяем и пытаемся подождать завершения с помошью метода get(). Это не получается, так как метод get() не ждет завершения метода call() класса Task, а сразу генерирует исключение. В результате получаем на консоле следующее. R>
R>То есть после thunking task done задача еще продолжает работать. R>Я хочу чтобы при вызове get() или другого подобного метода, выполнение основного потока блокировалось пока не завершится задача работающая в фоне. Может быть какой-нибудь другой высокоуровневый примитив синхронизации можно применить (чтобы можно было cancel() использовать).
R>Мне это нужно что-бы последовательно выполнять задачи, но не начинать выполнение следующей задачи без отмены предыдущей.
Если Task корректно обрабатывают InterruptedException, то после его возникновения он ничего "существенного" делать не будет, а сразу вернётся из метода call. Это значит, что даже если выполнение таска задержится немного, то ничего страшного не случится. Чтоб ему помочь быстрей завершитсья, нужно в основном потоке позвать Thread.yield() после Future.cancel. Чтоб уменьшить вероятность ошибки и облегчить написание новых тасков, я бы создал абстрактный базовый таск типа такого
abstract class BaseTask implements Callable<Void> {
private Future<Void> future;
public void setFuture(Future<Void> future) {
this.future = future;
}
public Void call() throws Exception {
while( !future.isCancelled() ) {
try {
doCall();
} catch(InterruptedException ex) {
...
}
}
return null;
}
//впомогательный метод для использования в потомках внутри длинных операцийprotected void checkInterrupted() {
if (Thread.interrupted())
throw new InterruptedException();
}
protected abstract void toCall();
}
Если же таск некорректно обрабатывает InterruptedException, то тут вообще ничего не поможет, поток может вечно висеть в каком-нибудь цикле.
Re: Отмена выполнения задачи в отдельном потоке и FutureTask
Здравствуйте, runtime2, Вы писали:
R>В методе test() запускаем задачу на выполнение, затем ее отменяем и пытаемся подождать завершения с помошью метода get(). Это не получается, так как метод get() не ждет завершения метода call() класса Task, а сразу генерирует исключение.
Надо использовать внутри get() метод join() той нити, завершения которой вы ожидаете при вызове метода get(). При этом вызов get() будет ожидать завершения нити и только после обработки этого вернёт управление.
Re[2]: Отмена выполнения задачи в отдельном потоке и FutureT
Здравствуйте, iZEN, Вы писали:
ZEN>Здравствуйте, runtime2, Вы писали:
R>>В методе test() запускаем задачу на выполнение, затем ее отменяем и пытаемся подождать завершения с помошью метода get(). Это не получается, так как метод get() не ждет завершения метода call() класса Task, а сразу генерирует исключение.
ZEN>Надо использовать внутри get() метод join() той нити, завершения которой вы ожидаете при вызове метода get(). При этом вызов get() будет ожидать завершения нити и только после обработки этого вернёт управление.
Вызывать join внутри get проблематично, так как не факт, что поток завершится. Суть Executor-а как раз в том, чтоб повторно использовать потоки. То что call завершился не гарантирует, что поток, в котором вызов call происходил, завершится.
Re[2]: Отмена выполнения задачи в отдельном потоке и FutureT
Здравствуйте, zubr, Вы писали:
Z>Единственное что приходит в голову — monitor! или флаг с монитором — задача выполнилась (выполнилась = full iteration|canceled|...), а монитор что бы на нем повисеть можно было в случае cancel-а, ну и монитор будет ограничивать доступ к полю, да и вообще можете обернуть свои задачи во что нибудь которое будет само уметь...
Хотелось бы без флага с помощью какой-нибудь высокоуровневой конструкции. Например, просто одним флагом не обойтись из-за того, что cancel() может вызываться у еще не запущенной задачи.
Task task = new Task();
FutureTask<Void> future = new FutureTask<Void>(task);
task.setFuture(future);
future.cancel(true);
executor.execute(future);
Насчет блокировки на cancel() согласен. Но это если не нужен результат задачи,который возвращает метод get().
Re[2]: Отмена выполнения задачи в отдельном потоке и FutureT
Здравствуйте, KRA, Вы писали:
KRA>Если Task корректно обрабатывают InterruptedException, то после его возникновения он ничего "существенного" делать не будет, а сразу вернётся из метода call. Это значит, что даже если выполнение таска задержится немного, то ничего страшного не случится. Чтоб ему помочь быстрей завершитсья, нужно в основном потоке позвать Thread.yield() после Future.cancel.
Я в таске работаю с изображениями достаточно большого размера, обработка изображения не может быть прервана с помощью interruption. В то же время, таск должен отменяться как можно быстрее, так как все завязано на GUI и интерфейс пользователя. Поэтому, так как изображение обрабатывается в несколько шагов я после каждого шага проверяю отменену таски.
Здравствуйте, jqnr, Вы писали:
J>Здравствуйте, runtime2, Вы писали:
J>Почему сделали именно так (см. выделенный текст) ?
J>Обычно наоборот, Feature является "оберткой" над Callable и запускается примерно так:
J>class OurCallable extends Callable<String> { J>}
J>// вызов J>Callable<String> ourCallable = new OurCallable (...); J>ExecutorService executor = ...; J>Feature<String> ourFeature = executor.submit(ourCallable);
Re: Отмена выполнения задачи в отдельном потоке и FutureTask
От:
Аноним
Дата:
18.03.09 11:30
Оценка:
Написал класс
public class CallableControl<V> implements Callable<V> {
private final CountDownLatch latch = new CountDownLatch(2);
private final Callable<V> callable;
public CallableControl(Callable<V> callable) {
this.callable = callable;
}
public V call() throws Exception {
latch.countDown();
V result = callable.call();
latch.countDown();
return result;
}
public void waitDone() throws InterruptedException {
//задача еще не начала выполнятьсяif( latch.getCount() == 2 ) {
return;
}
latch.await();
}
}
Использую его так
public class FutureTaskTest {
public static void main(String[] args) throws Exception {
new FutureTaskTest().test();
}
private final ExecutorService executor =
Executors.newSingleThreadExecutor(new DaemonThreadFactory());
public void test() throws Exception {
Task task = new Task();
CallableControl<Void> control = new CallableControl<Void>(task);
FutureTask<Void> future = new FutureTask<Void>(control);
task.setFuture(future);
executor.execute(future);
Thread.sleep(100);
future.cancel(true);
System.out.println("wait");
control.waitDone();
try {
future.get();
} catch(CancellationException ex) {
System.out.println("cancellation exception");
}
System.out.println("thinking task done");
Thread.sleep(1000);
}
}
class Task implements Callable<Void> {
private Future<Void> future;
public void setFuture(Future<Void> future) {
this.future = future;
}
public Void call() throws Exception {
while( !future.isCancelled() ) {
System.out.println("go");
try {
Thread.sleep(500);
} catch(InterruptedException ex) {
System.out.println("interrupted");
Thread.sleep(1000);
System.out.println("rrrrrrrrrrrr");
}
}
System.out.println("cancelled");
return null;
}
}
Вывод на консоль
go
wait
interrupted
rrrrrrrrrrrr
cancelled
cancellation exception
thinking task done
То есть, вроде бы, все правильно. Только я не знаю насколько CallableControl корректен с точки зрения многопоточности.
Re[2]: Отмена выполнения задачи в отдельном потоке и FutureT
Здравствуйте, runtime2, Вы писали:
R>Здравствуйте, KRA, Вы писали:
KRA>>Если Task корректно обрабатывают InterruptedException, то после его возникновения он ничего "существенного" делать не будет, а сразу вернётся из метода call. Это значит, что даже если выполнение таска задержится немного, то ничего страшного не случится. Чтоб ему помочь быстрей завершитсья, нужно в основном потоке позвать Thread.yield() после Future.cancel.
R>Я в таске работаю с изображениями достаточно большого размера, обработка изображения не может быть прервана с помощью interruption. В то же время, таск должен отменяться как можно быстрее, так как все завязано на GUI и интерфейс пользователя. Поэтому, так как изображение обрабатывается в несколько шагов я после каждого шага проверяю отменену таски. R>
Все мои рассуждения применими и к Future.isCancelled. Я лично не знаю надёжного и корректного способа остановить поток ни из него самого. Если нужно быстро отменять работу задачи, то проверять нужно не после каждого шага, а и внутри шагов (т.е. в loadImage наверняка есть цикл, который читает данные, вот внутри него и нужно проверять). Иначе нужно быть готовым к тому, что задача не будет реагировать на отмену время, которое занимает самая длинная операция внутри метода call.
KRA>>Если же таск некорректно обрабатывает InterruptedException, то тут вообще ничего не поможет, поток может вечно висеть в каком-нибудь цикле.
R>Можно подождать заврешения с помощью wait(timeout) определенное время, а потом завершить поток, или по крайней мере выдать сообщение об ошибке.
Как вы будете завершать поток, которым управляет Executor? Если при использовании чистых Thread-ов, есть грязные методы, то с стандартными Executor-ами даже их нет. Только ждать пока задача сама вернёт управление. Обратите внимание, что решение с CountDownLatch-ем (приведённое в соседней ветке) вешает поток GUI, пока рабочий не вернёт управление!
Т.е. возвращаемся к тому, с чего я начал. Рабочие потоки должны часто проверять, а не остановили их (в Вашем случае с помощью Future.isCancelled) и по обнаружению такого собития возвращать управление из call. И тогда нет никакого смысла специально ждать пока метод call действительно завершит выполнение, так как Вы используете singleThreadExecutor, то новая задача не начнёт выполнение пока текущая не закончится (это ж я так понял главнай задача?)
Re[2]: Отмена выполнения задачи в отдельном потоке и FutureT
А>public class CallableControl<V> implements Callable<V> {
А> private final CountDownLatch latch = new CountDownLatch(2);
А> private final Callable<V> callable;
А> public CallableControl(Callable<V> callable) {
А> this.callable = callable;
А> }
А> public V call() throws Exception {
А> latch.countDown();
А> V result = callable.call();
А> latch.countDown();
А> return result;
А> }
А> public void waitDone() throws InterruptedException {
А> //задача еще не начала выполняться
А> if( latch.getCount() == 2 ) {
А> return;
А> }
А> latch.await();
А> }
А>}
А>
Здесь я его не очень хорошо написал. Метод call() неверно работает при возникновении исключения.
Вот исправленная версия
public class CallableControl<V> implements Callable<V> {
private final CountDownLatch latch = new CountDownLatch(2);
private final Callable<V> callable;
public CallableControl(Callable<V> callable) {
this.callable = callable;
}
public V call() throws Exception {
latch.countDown();
try {
V result = callable.call();
return result;
} finally {
latch.countDown();
}
}
public void waitDone() throws InterruptedException {
//задача еще не начала выполнятьсяif( latch.getCount() == 2 ) {
return;
}
latch.await();
}
}
Re[4]: Отмена выполнения задачи в отдельном потоке и FutureT
KRA>Все мои рассуждения применими и к Future.isCancelled. Я лично не знаю надёжного и корректного способа остановить поток ни из него самого. Если нужно быстро отменять работу задачи, то проверять нужно не после каждого шага, а и внутри шагов (т.е. в loadImage наверняка есть цикл, который читает данные, вот внутри него и нужно проверять). Иначе нужно быть готовым к тому, что задача не будет реагировать на отмену время, которое занимает самая длинная операция внутри метода call.
Да, надежного способа остановить поток не существует. Если бы в loadImage() внутри был цикл, то загрузка изображения заняла бы много времени, там просто идет нативный вызов (загрузка изображений осуществляется через swt). Дело в том, что в методе call() достаточно много операций после которых следует проверка отмены, некоторые занимают больше времени чем остальные. Вполне допустимо иногда подождать дольше.
KRA>>>Если же таск некорректно обрабатывает InterruptedException, то тут вообще ничего не поможет, поток может вечно висеть в каком-нибудь цикле.
R>>Можно подождать заврешения с помощью wait(timeout) определенное время, а потом завершить поток, или по крайней мере выдать сообщение об ошибке.
KRA>Как вы будете завершать поток, которым управляет Executor? Если при использовании чистых Thread-ов, есть грязные методы, то с стандартными Executor-ами даже их нет. Только ждать пока задача сама вернёт управление.
Ага, насчет с завершением я ерунду сказал. Значит надо просто выдать сообщение и завершить приложение.
KRA>Обратите внимание, что решение с CountDownLatch-ем (приведённое в соседней ветке) вешает поток GUI, пока рабочий не вернёт управление!
Это меня устраивает(и сделано специально), пользователю в этот момент часики показываются.
KRA>Т.е. возвращаемся к тому, с чего я начал. Рабочие потоки должны часто проверять, а не остановили их (в Вашем случае с помощью Future.isCancelled) и по обнаружению такого собития возвращать управление из call. И тогда нет никакого смысла специально ждать пока метод call действительно завершит выполнение, так как Вы используете singleThreadExecutor, то новая задача не начнёт выполнение пока текущая не закончится (это ж я так понял главнай задача?)
Впринципе согласен. Действительно следующая задача не начнется. Но, к сожалению, такой подход несколько усложняет архитектуру и отслеживание ошибок. Так как, перед началом следующей задачи надо выполнить большое количество разрозненных деинициализирующих действий. К тому же в деинициализацию входит отмена других задач выполняющихся в фоне. Их как-то пришлось бы в начало новой задачи запихнуть. А это сложнее чем просто подождать завершения текущих задач, выполненить деинициализацию и запустить новые.
Re: Отмена выполнения задачи в отдельном потоке и FutureTask
Здравствуйте, runtime2, Вы писали:
R>Я хочу чтобы при вызове get() или другого подобного метода, выполнение основного потока блокировалось пока не завершится задача работающая в фоне. Может быть какой-нибудь другой высокоуровневый примитив синхронизации можно применить (чтобы можно было cancel() использовать).
R>Мне это нужно что-бы последовательно выполнять задачи, но не начинать выполнение следующей задачи без отмены предыдущей.
Вы "неправильно" хотите. То есть сама задача поставлена неверно. Если вам последовательно выполнять задачи — зачем вы заморачиваетесь с экзекьюторами, тасками и пр.?
Если вы хотите выполнять всего одну задачу, и не выполнять последующие, пока она не завершится — вам не нужен пул тредов. Создайте 1 (адын) тред, в котором выполняйте эту задачу. У вас не будет проблем с тем, чтоб выяснить, работает он до сих пор или нет — заведите в нём будет флажок "занято" пока он работает. Можно глобальную статическую переменную, можно (если нужно) создавать тред своего класса (class MyThread extends Thread) и в этом объекте держать флажок.
Здравствуйте, runtime2, Вы писали:
R>Мне это нужно что-бы последовательно выполнять задачи, но не начинать выполнение следующей задачи без отмены предыдущей.
Кстати, для этого тебе вообще ничего не нужно делать, никаких дополнительных функциональностей на get не нужно.
Ты создай executor с одним тредом, и сабмить в него задачи. Поскольку у него только один тред — он физически не сможет выполять больше, чем одну задачу.
Сделаешь ты ей cancel или она нормально завершится — это дело десятое. Но следующий таск не будет выполнятся, пока предыдущий не освободит занятый тред.
Re[2]: Отмена выполнения задачи в отдельном потоке и FutureT
От:
Аноним
Дата:
18.03.09 14:38
Оценка:
Здравствуйте, mkizub, Вы писали:
M>Здравствуйте, runtime2, Вы писали:
M>Кстати, для этого тебе вообще ничего не нужно делать, никаких дополнительных функциональностей на get не нужно. M>Ты создай executor с одним тредом, и сабмить в него задачи. Поскольку у него только один тред — он физически не сможет выполять больше, чем одну задачу. M>Сделаешь ты ей cancel или она нормально завершится — это дело десятое. Но следующий таск не будет выполнятся, пока предыдущий не освободит занятый тред.
Так у меня так и сделано в примере
private final ExecutorService executor =
Executors.newSingleThreadExecutor(new DaemonThreadFactory());
Как я уже писал выше. Между завершением предыдущей задачи и началом новой мне надо выполнять ряд действий по деинициализации. Для этого мне обязательно надо знать, что задача остановлена.
Re[2]: Отмена выполнения задачи в отдельном потоке и FutureT
Здравствуйте, mkizub, Вы писали:
M>Вы "неправильно" хотите. То есть сама задача поставлена неверно. Если вам последовательно выполнять задачи — зачем вы заморачиваетесь с экзекьюторами, тасками и пр.?
M>Если вы хотите выполнять всего одну задачу, и не выполнять последующие, пока она не завершится — вам не нужен пул тредов. Создайте 1 (адын) тред, в котором выполняйте эту задачу. У вас не будет проблем с тем, чтоб выяснить, работает он до сих пор или нет — заведите в нём будет флажок "занято" пока он работает. Можно глобальную статическую переменную, можно (если нужно) создавать тред своего класса (class MyThread extends Thread) и в этом объекте держать флажок.
При использовании Thread мне надо будет пересоздавать каждый раз поток. Экзекутор позволяет использовать один и тот же поток для запуска задач. Таск я использую для отмены и для работы с исключениями. Хотя может он и не к месту здесь.
Re[3]: Отмена выполнения задачи в отдельном потоке и FutureT
Здравствуйте, Аноним, Вы писали:
А>Как я уже писал выше. Между завершением предыдущей задачи и началом новой мне надо выполнять ряд действий по деинициализации. Для этого мне обязательно надо знать, что задача остановлена.
Ну тогда код задачи должен быть вроде
public void run() {
GlobalDaemon.BUSY = true;
try {
...
} finally {
GlobalDaemon.BUSY = false;
// optionally notify waiters with Object.notifyAll or firering an event
}
}
Здравствуйте, runtime2, Вы писали:
R>При использовании Thread мне надо будет пересоздавать каждый раз поток. Экзекутор позволяет использовать один и тот же поток для запуска задач. Таск я использую для отмены и для работы с исключениями. Хотя может он и не к месту здесь.
Зачем пересоздавать? Создайте один раз и пусть он себе крутится. Вроде того, как это описано в javadoc-е для java.util.concurrent.BlockingQueue