имеется работающая реализация dataQueue с Producer и Receiver
Поток продьюсера считаывает горизонтально дюжину файлов, сортирует содержимое,
приводит в один List<DataItem> и обновляет цепочку данных
Поток ресивера считывает без промедления цепочку данных.
Потоки продьюсера и ресивера синхронизированы на сигнальный обьект, следящий за размером цепочки и концами файлов.
В ресивере каждый полученный DataItem должен будить потоки группы Наблюдателей
и без модификции считываться в них параллельно.
Пробная реализация Runnable Обсервера, с попыткой пробуждения, усыпления потока
через volatile поля не работает, входные данные (Integer) не обрабатывается.
Мне будут интересны указания на мои ошибки с реализацией
void run(){
synchronized(this){
}
}
с уважением,
обьект
////////////////////////////////////////////////////////public class ObserverTest {
static class Observer implements Runnable{
private volatile boolean threadSuspended;
private volatile boolean threadFinished;
private Integer item;
Observer(){
threadSuspended = true;
threadFinished = false;
}
public boolean isThreadSuspended() {
return threadSuspended;
}
public Integer getItem() {
return item;
}
public void setItem(Integer item) {
this.item = item;
setThreadSuspended(false); //
}
public void setThreadSuspended(boolean threadSuspended) {
this.threadSuspended = threadSuspended;
}
public void setThreadFinished() {
this.threadFinished = true;
}
public void run(){
long interval = 10; //in ms
Thread thisThread = Thread.currentThread();
synchronized(this){
while (true) {
try {
thisThread.sleep(interval);
while (threadSuspended){
wait();
}
if (threadFinished)
break;
//processing
System.out.println(thisThread.getId()+" id processing item: "+item);
//putting thread back to wait statethis.setThreadSuspended(true);
} catch (InterruptedException e){
e.printStackTrace();
}
}
}
}
}
public static void main(String[] args) {
Observer obs = new Observer();
Thread observerT = new Thread(obs);
observerT.start();
obs.setItem(new Integer(1));
obs.setItem(new Integer(2));
obs.setItem(new Integer(3));
obs.setThreadFinished();
}
}
Не забываем использовать теги подсветки кода. Blazkowicz
Re: потоки. стоп старт потока в Runnable ожидающего инпут
явные проблемы
1. вызывается wait и на нём поток будет висеть вечно. Чтоб исправить нужно в setItem вызывать notify. Поле threadSuspended — вообще не нужно. Похоже Вы не правильно понимаете, что делает wait.
2. некоторые item-ы могут теряться. Если записывать их быстрей чем обрабатывать, то последующий просто затрёт предыдущий. Нужно хранить очередь item-ов ожидающих обработки.
Re: потоки. стоп старт потока в Runnable ожидающего инпут
Зачем там sleep? Флаг finished выставляется сразу после добавления. Соответсвенно поток спит и просыпаясь выходит из цикла ничего не обрабатывая. Почему вообще finished выставляется по факту добавления всех элементов а не по факту их обработки? Задача поставлена сумбурно, а почему код реализован именно так вообще не понятно.
Re[2]: потоки. стоп старт потока в Runnable ожидающего ин
Здравствуйте, KRA, Вы писали:
KRA>Здравствуйте, objet, Вы писали:
KRA>явные проблемы KRA>1. вызывается wait и на нём поток будет висеть вечно. Чтоб исправить нужно в setItem вызывать notify. Поле threadSuspended — вообще не нужно. Похоже Вы не правильно понимаете, что делает wait. KRA>2. некоторые item-ы могут теряться. Если записывать их быстрей чем обрабатывать, то последующий просто затрёт предыдущий. Нужно хранить очередь item-ов ожидающих обработки.
И вдогонку, тут вообще можно обойтись без блокировок. Вот пример
public abstract class BackgroundEventProcessor<Event> implements Runnable {
protected final Log log = LogFactory.getLog(getClass());
private final Queue<Event> events = new ConcurrentLinkedQueue<Event>();
private Thread worker;
private Long processingTimeout;
public void stop() {
log.info("interrupting worker...");
worker.interrupt();
}
@Monitor(startEvent = EventType.STARTED, endEvent = EventType.STOPPED)
public void run() {
log.info("starting worker...");
while (true) {
try {
processEvents();
} catch (InterruptedException e) {
log.info("worker interrupted");
break;
}
}
log.info("worker stopped");
}
@Monitor(exceptions = { @ExceptionEvent(event = EventType.STOPPING, exception = InterruptedException.class) } )
private void processEvents() throws InterruptedException {
Event event;
while((event = events.poll()) != null) {
onEvent(event);
if (Thread.interrupted())
throw new InterruptedException();
}
Thread.sleep(processingTimeout);
}
protected abstract void onEvent(Event event);
public void start() {
worker = new Thread(this);
worker.start();
}
public void setProcessingTimeout(Long processingTimeout) {
this.processingTimeout = processingTimeout;
}
protected void addEvent(Event event) {
events.add(event);
}
}
Re[2]: потоки. стоп старт потока в Runnable ожидающего ин
Здравствуйте, Blazkowicz, Вы писали:
B>Здравствуйте, objet, Вы писали:
B>Зачем там sleep? Флаг finished выставляется сразу после добавления. Соответсвенно поток спит и просыпаясь выходит из цикла ничего не обрабатывая. Почему вообще finished выставляется по факту добавления всех элементов а не по факту их обработки? Задача поставлена сумбурно, а почему код реализован именно так вообще не понятно.
Здравствуйте, objet, Вы писали:
O>синтаксис подчерпнут отсюда.
Знания надо черпать а не синтаксис.
O>jdk7/docs/technotes/guides/concurrency/threadPrimitiveDeprecation.html
Чего это все с JDK7 связываются. Его уже зарелизили?
Здравствуйте, Blazkowicz, Вы писали:
B>Здравствуйте, objet, Вы писали:
O>>синтаксис подчерпнут отсюда. B>Знания надо черпать а не синтаксис.
O>>jdk7/docs/technotes/guides/concurrency/threadPrimitiveDeprecation.html B>Чего это все с JDK7 связываются. Его уже зарелизили?
B>Постарайтесь подчерпнуть что-нибудь отсюда: B>http://java.sun.com/docs/books/tutorial/essential/concurrency/index.html
Re[5]: потоки. стоп старт потока в Runnable ожидающего ин
Здравствуйте, objet, Вы писали:
O>спасибо за наводку. O>никак не мог вспомнить где лежит этот урок.
После освоения базовых моментов рекомендуется так же к прочтению: http://java.sun.com/developer/technicalArticles/J2SE/concurrency/
Дабы не плодить велосипедов.