потоки. стоп старт потока в Runnable ожидающего инпут
От: objet  
Дата: 12.06.09 11:38
Оценка:
Здравствуйте.

имеется работающая реализация dataQueue с Producer и Receiver

Поток продьюсера считаывает горизонтально дюжину файлов, сортирует содержимое,
приводит в один List<DataItem> и обновляет цепочку данных

Поток ресивера считывает без промедления цепочку данных.

Потоки продьюсера и ресивера синхронизированы на сигнальный обьект, следящий за размером цепочки и концами файлов.

В ресивере каждый полученный DataItem должен будить потоки группы Наблюдателей
и без модификции считываться в них параллельно.

Пробная реализация Runnable Обсервера, с попыткой пробуждения, усыпления потока
через volatile поля не работает, входные данные (Integer) не обрабатывается.

Мне будут интересны указания на мои ошибки с реализацией

void run(){
  synchronized(this){
  }
}


с уважением,
обьект


////////////////////////////////////////////////////////
public class ObserverTest {

    static class Observer implements Runnable{
       private volatile boolean threadSuspended;
       private volatile boolean threadFinished;
       private Integer item;

        Observer(){
            threadSuspended = true;
            threadFinished = false;
        }

        public boolean isThreadSuspended() {
            return threadSuspended;
        }

        public Integer getItem() {
            return item;
        }
        public void setItem(Integer item) {
            this.item = item;
            setThreadSuspended(false); //
        }

        public void setThreadSuspended(boolean threadSuspended) {
            this.threadSuspended = threadSuspended;
        }

        public void setThreadFinished() {
            this.threadFinished = true;
        }

        public void run(){

            long interval = 10;  //in ms
            Thread thisThread = Thread.currentThread();
            synchronized(this){
                while (true) {
                    try {
                        thisThread.sleep(interval);


                        while (threadSuspended){
                            wait();
                        }

                        if (threadFinished)
                            break;

                    //processing
                        System.out.println(thisThread.getId()+" id  processing item: "+item);

                    //putting thread back to wait  state
                        this.setThreadSuspended(true);

                    } catch (InterruptedException e){
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    public static void main(String[] args) {
        Observer  obs = new Observer();
        Thread observerT = new Thread(obs);
        
        observerT.start();

        obs.setItem(new Integer(1));
        obs.setItem(new Integer(2));
        obs.setItem(new Integer(3));

        obs.setThreadFinished();
    }
}

Не забываем использовать теги подсветки кода. Blazkowicz
Re: потоки. стоп старт потока в Runnable ожидающего инпут
От: KRA Украина  
Дата: 12.06.09 12:01
Оценка:
Здравствуйте, objet, Вы писали:

явные проблемы
1. вызывается wait и на нём поток будет висеть вечно. Чтоб исправить нужно в setItem вызывать notify. Поле threadSuspended — вообще не нужно. Похоже Вы не правильно понимаете, что делает wait.
2. некоторые item-ы могут теряться. Если записывать их быстрей чем обрабатывать, то последующий просто затрёт предыдущий. Нужно хранить очередь item-ов ожидающих обработки.
Re: потоки. стоп старт потока в Runnable ожидающего инпут
От: Blazkowicz Россия  
Дата: 12.06.09 12:06
Оценка:
Здравствуйте, objet, Вы писали:

Зачем там sleep? Флаг finished выставляется сразу после добавления. Соответсвенно поток спит и просыпаясь выходит из цикла ничего не обрабатывая. Почему вообще finished выставляется по факту добавления всех элементов а не по факту их обработки? Задача поставлена сумбурно, а почему код реализован именно так вообще не понятно.
Re[2]: потоки. стоп старт потока в Runnable ожидающего ин
От: KRA Украина  
Дата: 12.06.09 12:07
Оценка:
Здравствуйте, KRA, Вы писали:

KRA>Здравствуйте, objet, Вы писали:


KRA>явные проблемы

KRA>1. вызывается wait и на нём поток будет висеть вечно. Чтоб исправить нужно в setItem вызывать notify. Поле threadSuspended — вообще не нужно. Похоже Вы не правильно понимаете, что делает wait.
KRA>2. некоторые item-ы могут теряться. Если записывать их быстрей чем обрабатывать, то последующий просто затрёт предыдущий. Нужно хранить очередь item-ов ожидающих обработки.

И вдогонку, тут вообще можно обойтись без блокировок. Вот пример

public abstract class BackgroundEventProcessor<Event> implements Runnable {
    protected final Log log = LogFactory.getLog(getClass());
    private final Queue<Event> events = new ConcurrentLinkedQueue<Event>();
    private Thread worker;

    private Long processingTimeout;

    public void stop() {
        log.info("interrupting worker...");
        worker.interrupt();
    }

    @Monitor(startEvent = EventType.STARTED, endEvent = EventType.STOPPED)
    public void run() {
        log.info("starting worker...");
        while (true) {
            try {
                processEvents();
            } catch (InterruptedException e) {
                log.info("worker interrupted");
                break;
            }
        }
        log.info("worker stopped");
    }

    @Monitor(exceptions = { @ExceptionEvent(event = EventType.STOPPING, exception = InterruptedException.class) } )
    private void processEvents() throws InterruptedException {
        Event event;
        while((event = events.poll()) != null) {
            onEvent(event);
            if (Thread.interrupted())
                throw new InterruptedException();
        }

        Thread.sleep(processingTimeout);
    }

    protected abstract void onEvent(Event event);

    public void start() {
        worker = new Thread(this);
        worker.start();
    }


    public void setProcessingTimeout(Long processingTimeout) {
        this.processingTimeout = processingTimeout;
    }

    protected void addEvent(Event event) {
        events.add(event);
    }
}
Re[2]: потоки. стоп старт потока в Runnable ожидающего ин
От: objet  
Дата: 12.06.09 12:47
Оценка:
Здравствуйте, Blazkowicz, Вы писали:

B>Здравствуйте, objet, Вы писали:


B>Зачем там sleep? Флаг finished выставляется сразу после добавления. Соответсвенно поток спит и просыпаясь выходит из цикла ничего не обрабатывая. Почему вообще finished выставляется по факту добавления всех элементов а не по факту их обработки? Задача поставлена сумбурно, а почему код реализован именно так вообще не понятно.



синтаксис подчерпнут отсюда.

jdk7/docs/technotes/guides/concurrency/threadPrimitiveDeprecation.html
Re[3]: потоки. стоп старт потока в Runnable ожидающего ин
От: Blazkowicz Россия  
Дата: 12.06.09 12:52
Оценка:
Здравствуйте, objet, Вы писали:

O>синтаксис подчерпнут отсюда.

Знания надо черпать а не синтаксис.

O>jdk7/docs/technotes/guides/concurrency/threadPrimitiveDeprecation.html

Чего это все с JDK7 связываются. Его уже зарелизили?

Постарайтесь подчерпнуть что-нибудь отсюда:
http://java.sun.com/docs/books/tutorial/essential/concurrency/index.html
Re[4]: потоки. стоп старт потока в Runnable ожидающего ин
От: objet  
Дата: 12.06.09 13:43
Оценка:
спасибо за наводку.

никак не мог вспомнить где лежит этот урок.


Здравствуйте, Blazkowicz, Вы писали:

B>Здравствуйте, objet, Вы писали:


O>>синтаксис подчерпнут отсюда.

B>Знания надо черпать а не синтаксис.

O>>jdk7/docs/technotes/guides/concurrency/threadPrimitiveDeprecation.html

B>Чего это все с JDK7 связываются. Его уже зарелизили?

B>Постарайтесь подчерпнуть что-нибудь отсюда:

B>http://java.sun.com/docs/books/tutorial/essential/concurrency/index.html
Re[5]: потоки. стоп старт потока в Runnable ожидающего ин
От: Blazkowicz Россия  
Дата: 12.06.09 14:45
Оценка: 2 (1)
Здравствуйте, objet, Вы писали:

O>спасибо за наводку.

O>никак не мог вспомнить где лежит этот урок.
После освоения базовых моментов рекомендуется так же к прочтению:
http://java.sun.com/developer/technicalArticles/J2SE/concurrency/
Дабы не плодить велосипедов.
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.