про lock free stack
От: ioni Россия  
Дата: 22.01.08 10:02
Оценка:
В качестве тестовых экспериментов попробовал набросать реализацию
получилось вот что, но вот беда, при прогоне падает

в связи с этим вопрос это всегда так для lock free или я чего то упустил


#include <stdio.h>
#include <tchar.h>
#include <windows.h>
#include <process.h>

//////////////////////////////////////////////////////////////////////////
bool is_work = true;

//////////////////////////////////////////////////////////////////////////
//compare and swap
bool cas(long* v, long cv, long nv)
{
    return (*v != InterlockedCompareExchange(v, nv, cv) );
}
//////////////////////////////////////////////////////////////////////////
template <class T>
class stack_lf
{
    struct node
    {
        node* next_;
        T value_;
    };
    volatile node* head_;
    //
    static node* make_empty_node()
    {
        node* n = new node();
        n->next_ = 0;
        n->value_ = 0;
        return n;
    }
    static void destroy_node(node volatile * n)
    {
        n->next_ = 0; n->value_ = 0;
        delete n, n = 0;
    }
    //
public:
    stack_lf() : head_( make_empty_node() )
    {
    }
    ~stack_lf()
    {
        volatile node* h = head_;
        while(h)
        {
            volatile node* n = h->next_;
            destroy_node(h);
            h = n;
        }
    }
    //
    void push(T v)
    {
        volatile node* next = make_empty_node();
        next->value_ = v;
        //
        do { next->next_ = head_->next_; } 
        while( !cas( (long*)&head_->next_, (long)next->next_, (long)next ) );
    }
    bool pop(T& val) 
    {
        volatile node* next = 0;
        do 
        { 
            next = head_->next_; 
            if(next == NULL)  { return false;  }
        } 
        while( !cas(((long*)(&(head_->next_))), ((long)next), ((long)(next->next_))) );
        //
        val = next->value_;
        destroy_node(next);
        return true;
    }
};
//////////////////////////////////////////////////////////////////////////
unsigned __stdcall pusher_thread(void* pv)
{
    stack_lf<long>* s = ( stack_lf<long>* )pv;
    long v = 1000;
    while( is_work )
    {
        printf("push value %d\n", v);
        s->push(v);
        ++v;        
    }
    return  0;
}

unsigned __stdcall poper_thread(void* pv)
{
    stack_lf<long>* s = ( stack_lf<long>* )pv;

    while( is_work )
    {
        long v;
        if( s->pop(v) )
        {
            printf("[0x%x] pop value %d\n", GetCurrentThreadId(), v);
        }
    }
    //////////////////////////////////////////////////////////////////////////
    // pop all elements
    long v;
    while( s->pop(v) )
    {
        printf("[0x%x] pop value %d\n", GetCurrentThreadId(), v);
    }

    //////////////////////////////////////////////////////////////////////////
    return  0;
}

int main()
{
    stack_lf<long> s;

    HANDLE hs0 = (HANDLE)_beginthreadex(NULL, 0, &pusher_thread, (void*)(&s), 0, NULL);
    HANDLE hs1 = (HANDLE)_beginthreadex(NULL, 0, &pusher_thread, (void*)(&s), 0, NULL);
    HANDLE hs2 = (HANDLE)_beginthreadex(NULL, 0, &poper_thread, (void*)(&s), 0, NULL);
    HANDLE hs3 = (HANDLE)_beginthreadex(NULL, 0, &poper_thread, (void*)(&s), 0, NULL);

    Sleep(100000);

    is_work = false;

    WaitForSingleObject(hs0, INFINITE);
    WaitForSingleObject(hs1, INFINITE);
    WaitForSingleObject(hs2, INFINITE);
    WaitForSingleObject(hs3, INFINITE);

    CloseHandle(hs0);
    CloseHandle(hs1);
    CloseHandle(hs2);
    CloseHandle(hs3);

    return 0;
}
Re: про lock free stack
От: Glоbus Украина  
Дата: 22.01.08 10:06
Оценка:
Здравствуйте, ioni, Вы писали:

I>В качестве тестовых экспериментов попробовал набросать реализацию

I>получилось вот что, но вот беда, при прогоне падает

I>в связи с этим вопрос это всегда так для lock free или я чего то упустил


Мне кажется, так и должно было быть. Тут банальный конкурентный доступ к ресурсу — никакой синхронизации, ничего нету . Конечно он будет падать.
Удачи тебе, браток!
Re[2]: про lock free stack
От: ioni Россия  
Дата: 22.01.08 10:15
Оценка: -1
Здравствуйте, Glоbus, Вы писали:

G>Здравствуйте, ioni, Вы писали:


I>>В качестве тестовых экспериментов попробовал набросать реализацию

I>>получилось вот что, но вот беда, при прогоне падает

I>>в связи с этим вопрос это всегда так для lock free или я чего то упустил


G>Мне кажется, так и должно было быть. Тут банальный конкурентный доступ к ресурсу — никакой синхронизации, ничего нету . Конечно он будет падать.


вообще то так и задумывалось, так работают lock free алгоритмы
а синхронизация есть
Re: про lock free stack
От: Аноним  
Дата: 22.01.08 10:32
Оценка:
Здравствуйте, ioni, Вы писали:

I>В качестве тестовых экспериментов попробовал набросать реализацию

I>получилось вот что, но вот беда, при прогоне падает

I>в связи с этим вопрос это всегда так для lock free или я чего то упустил



может нужна синхронизация в destroy_node?
Re[2]: про lock free stack
От: ioni Россия  
Дата: 22.01.08 10:41
Оценка:
Здравствуйте, Аноним, Вы писали:

А>Здравствуйте, ioni, Вы писали:


I>>В качестве тестовых экспериментов попробовал набросать реализацию

I>>получилось вот что, но вот беда, при прогоне падает

I>>в связи с этим вопрос это всегда так для lock free или я чего то упустил



А>может нужна синхронизация в destroy_node?


если ее добавить то тогда весь смысл теряется
меня интересует именно как народ, который пишет и пользуется
такими алгоритмами, справляется с этой проблемой
Re: про lock free stack
От: Quasi  
Дата: 22.01.08 11:47
Оценка: 26 (2)
Здравствуйте, ioni, Вы писали:

I>В качестве тестовых экспериментов попробовал набросать реализацию

I>получилось вот что, но вот беда, при прогоне падает

I>в связи с этим вопрос это всегда так для lock free или я чего то упустил


    bool pop(T& val) 
    {
        volatile node* next = 0;
        do 
        { 
            next = head_->next_; 
            if(next == NULL)  { return false;  }
        } 
        while( !cas(((long*)(&(head_->next_))), ((long)next), ((long)(next->next_))) ); //здесь может разыменовываться указатель next (next->next_),
                                                                                        // на уже удаленный в другом потоке объект
        val = next->value_;
        destroy_node(next);
        return true;
    }


Это не единственная проблема в этом коде, здесь присутствует также проблема "ABA", заменяемый c помощью CAS, указатель может претерпеть ряд изменений (например, со значения A на B и затем снова на A), но при этом CAS не "провалиться". Для решения подобного рода проблем, существует ряд технологий: Safe Memory Reclamation (SMR), пару ссылок по RCU и vZOOM здесь
Автор: remark
Дата: 17.06.07
. Советую посмотреть форум: comp.programming.threads, там много интересных реализаций lock-free алгоритмов и идей.
Re[2]: про lock free stack
От: ioni Россия  
Дата: 22.01.08 18:08
Оценка:
Здравствуйте, Quasi, Вы писали:

спасибо за ссылки

в группе честно говоря пока для себя путнего ничего не нашел

а вот этот документ посмотрел: Safe Memory Reclamation (SMR)

попробовал реализовать что называется "один-в-один" и то же пока безрезультатно
получилось примерно вот что


#include <stdio.h>
#include <tchar.h>
#include <windows.h>
#include <process.h>

#include <map>
#include <algorithm>
#include <boost/array.hpp>

//////////////////////////////////////////////////////////////////////////
bool is_work = true;
//////////////////////////////////////////////////////////////////////////
//compare and swap
// if(*v == cv) { *v = nv;  return true;  }
// else { return false; }
bool cas(long* v, long cv, long nv)
{
    return (*v != InterlockedCompareExchange(v, nv, cv) );
}

//////////////////////////////////////////////////////////////////////////
template<class T>
struct node
{
    node* next_;
    T value_;
};
//////////////////////////////////////////////////////////////////////////
template<typename T>
class hazards_storage
{
    static const long max_threads = 8;
    //
    struct smr
    {
        long dcount_;
        boost::array< node< T >* , max_threads> dlist_;
    };
    //////////////////////////////////////////////////////////////////////////
    boost::array< node< T >* , max_threads> hp_;
    std::map< long,  smr* > hazards_;
    long threads_p_;
    long hazards_k_;
    long total_n_;
    long batch_r_;
    //
public:
    hazards_storage() : threads_p_(0), hazards_k_(1), total_n_(0), batch_r_(0)
    {
    }
    //
    void add_id(long id)
    {
        ++threads_p_;
        total_n_ = threads_p_ * hazards_k_;
        batch_r_ = 2 * total_n_;
        //
        smr* s = new smr();
        s->dcount_ = 0;
        hazards_.insert( std::make_pair(id, s) );
    }
    //
    void delete_node(long id, node<T>* n)
    {
        smr* s = hazards_[id];
        if(s == NULL) return;
        s->dlist_[ s->dcount_++ ] = n;
        if(s->dcount_ == batch_r_) scan( id );
    }
    node<T>* get_hazard( long )
    {
        return hp_[0];
    }
    //
private:
    void scan(long id)
    {
        smr* s = hazards_[id];
        boost::array< node< T >* , max_threads>& dlist = s->dlist_;
        long& dcount = s->dcount_;
        //
        node<T>* hptr;
        boost::array< node< T >* , max_threads> plist;
        boost::array< node< T >* , max_threads> new_dlist;
        long p = 0, new_dcount = 0;
        //////////////////////////////////////////////////////////////////////////
        //1.
        for(long i = 0; i < total_n_; ++i)
        {
            hptr = hp_[i];
            if( hptr ) plist[ p++ ] = hptr;
        }
        //////////////////////////////////////////////////////////////////////////
        //2.
        std::sort(plist.begin(), plist.end());
        //////////////////////////////////////////////////////////////////////////
        //3.
        for(long i = 0; i < batch_r_; ++i)
        {
            if(std::binary_search(plist.begin(), plist.end(), dlist[i]) )
            {
                new_dlist[ new_dcount++ ] = dlist[i];
            }
            else
            {
                delete ( dlist[i] );
                dlist[i] = 0;
            }
        }
        //////////////////////////////////////////////////////////////////////////
        //4.
        for(long i = 0; i < new_dcount; ++i) { dlist[i] = new_dlist[i];    }
        dcount = new_dcount;
        //////////////////////////////////////////////////////////////////////////
    }
};
hazards_storage< long > hazards_storage_;
//////////////////////////////////////////////////////////////////////////
template <class T>
void destroy_node( node<T>* n)
{
    hazards_storage_.delete_node( GetCurrentThreadId(), n);
}
//////////////////////////////////////////////////////////////////////////
//
template <class T>
class stack_lf
{
    typedef typename node< T > node;
    node* head_;
    //
    static node* make_new_node(T v = 0)
    {
        node* n = new node();
        n->value_ = v;
        n->next_ = 0;
        return n;
    }
    //////////////////////////////////////////////////////////////////////////
public:
    stack_lf() : head_( 0 )
    {
    }
    ~stack_lf()
    {
        node* h = head_;
        while(h) {  node* n = h->next_; delete h; h = n;    }
    }
    //
    void push(T v)
    {
        node* next = make_new_node( v );
        //
        node* head;
        for(;;)
        {
            head = head_;
            next->next_ = head;
            if( cas( (long*)&head_, (long)head, (long)next) ) break;
        }
    }
    bool pop(node* hazard, T& val) 
    {
        //////////////////////////////////////////////////////////////////////////
         node* head, *next;
         for(;;)
         {
             head = head_;
             if(head == NULL) return false;
             //
             hazard = head;
             if(head != head_) continue;
             //
             next = head->next_;
             if(next == NULL) return false;
             //
             if ( cas( (long*)&head_, (long)head, (long)next)) break;
         }
         val = next->value_;
         hazard = 0;
         destroy_node(head);
         return true;
        //////////////////////////////////////////////////////////////////////////
    }
};
//////////////////////////////////////////////////////////////////////////
unsigned __stdcall pusher_thread(void* pv)
{
    hazards_storage_.add_id( GetCurrentThreadId() );
    //
    stack_lf<long>* s = ( stack_lf<long>* )pv;
    long v = 1000;
    while( is_work )
    {
        printf("push value %d\n", v);
        s->push(v);
        ++v;        
    }
    return  0;
}

unsigned __stdcall poper_thread(void* pv)
{
    hazards_storage_.add_id( GetCurrentThreadId() );
    //
    stack_lf<long>* s = ( stack_lf<long>* )pv;

    while( is_work )
    {
        long v;
        if( s->pop( hazards_storage_.get_hazard( GetCurrentThreadId() ), v) )
        {
            printf("[0x%x] pop value %d\n", GetCurrentThreadId(), v);
        }
    }
    //////////////////////////////////////////////////////////////////////////
    long v;
    while( s->pop(hazards_storage_.get_hazard( GetCurrentThreadId() ),v) )
    {
        printf("[0x%x] pop value %d\n", GetCurrentThreadId(), v);
    }

    //////////////////////////////////////////////////////////////////////////
    return  0;
}

//////////////////////////////////////////////////////////////////////////
int main( )
{
    stack_lf<long> s;

    HANDLE hs0 = (HANDLE)_beginthreadex(NULL, 0, &pusher_thread, (void*)(&s), 0, NULL);
    HANDLE hs1 = (HANDLE)_beginthreadex(NULL, 0, &pusher_thread, (void*)(&s), 0, NULL);
    HANDLE hs2 = (HANDLE)_beginthreadex(NULL, 0, &poper_thread, (void*)(&s), 0, NULL);
    HANDLE hs3 = (HANDLE)_beginthreadex(NULL, 0, &poper_thread, (void*)(&s), 0, NULL);
    //
    Sleep(50000);
    is_work = false;
    WaitForSingleObject(hs0, INFINITE);
    WaitForSingleObject(hs1, INFINITE);
    WaitForSingleObject(hs2, INFINITE);
    WaitForSingleObject(hs3, INFINITE);
    //
    CloseHandle(hs0);
    CloseHandle(hs1);
    CloseHandle(hs2);
    CloseHandle(hs3);

    return 0;
}
Re: про lock free stack
От: remark Россия http://www.1024cores.net/
Дата: 22.01.08 20:16
Оценка:
Здравствуйте, ioni, Вы писали:

I>В качестве тестовых экспериментов попробовал набросать реализацию

I>получилось вот что, но вот беда, при прогоне падает


Полностью согласен с Quasi.

По мелочам ещё.

Фейковый нод, который ты выделяешь в конструкторе не нужен. Если ты посмотришь на код внимательно, то заметишь, что ты всегда используешь только head_->next_, но не head_. Это, конечно, не принципиально, но затрудняет восприятие — если ты 100 раз видел алгоритм в одной записи, а потом немного в другой, то это сбивает с толку

volatile'ов лучше переставить, чем недоставить. В pop() ты объявляешь volatile node* next, хотя обращаешься и к next, а не только к next->next_. Я бы объявил как volatile node* volatile next. Сложно сказать, может ли это повлять на корректность в данном случае. Но поверь, это не те проблемы, с которыми ты хочешь иметь дело



1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[2]: про lock free stack
От: remark Россия http://www.1024cores.net/
Дата: 22.01.08 20:22
Оценка:
Здравствуйте, remark, Вы писали:

R>Фейковый нод, который ты выделяешь в конструкторе не нужен. Если ты посмотришь на код внимательно, то заметишь, что ты всегда используешь только head_->next_, но не head_. Это, конечно, не принципиально, но затрудняет восприятие — если ты 100 раз видел алгоритм в одной записи, а потом немного в другой, то это сбивает с толку



Вижу, уже исправил в версии с SMR


R>


1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[3]: SMR
От: remark Россия http://www.1024cores.net/
Дата: 22.01.08 20:42
Оценка:
Здравствуйте, ioni, Вы писали:


I>
I>template<typename T>
I>class hazards_storage
I>{
I>    static const long max_threads = 8;
I>    //
I>    struct smr
I>    {
I>        long dcount_;
I>        boost::array< node< T >* , max_threads> dlist_;
I>    };
I>    //////////////////////////////////////////////////////////////////////////
I>    boost::array< node< T >* , max_threads> hp_;
I>    std::map< long,  smr* > hazards_;
I>    long threads_p_;
I>    long hazards_k_;
I>    long total_n_;
I>    long batch_r_;
I>    //
I>public:
I>    hazards_storage() : threads_p_(0), hazards_k_(1), total_n_(0), batch_r_(0)
I>    {
I>    }
I>    //
I>    void add_id(long id)
I>    {
I>



add_id() надо защитить критической секцией! Блин, ты же её из разных потоков вызываешь!

I>
I>        ++threads_p_;
I>        total_n_ = threads_p_ * hazards_k_;
I>        batch_r_ = 2 * total_n_;
I>        //
I>        smr* s = new smr();
        s->>dcount_ = 0;
I>        hazards_.insert( std::make_pair(id, s) );
I>    }
I>    //
I>    void delete_node(long id, node<T>* n)
I>    {
I>        smr* s = hazards_[id];
I>        if(s == NULL) return;
I>



Тут, видимо, должен быть assert. Иначе будет утечка памяти, если тут произойдёт return.


I>
        s->>dlist_[ s->dcount_++ ] = n;
I>        if(s->dcount_ == batch_r_) scan( id );
I>    }
I>    node<T>* get_hazard( long )
I>    {
I>        return hp_[0];
I>



С этом сложно надеяться на работоспособность кода. Ты же в один и тот же hp пишешь из всех потоков!
Плюс get_hazard() должна возвращать node<T>**, а не node<T>*. Ты же так ничего вообще не запишешь в hp! Ты пишешь всегда во временный объект.


I>
I>    }
I>    //
I>private:
I>    void scan(long id)
I>    {
I>        smr* s = hazards_[id];
I>        boost::array< node< T >* , max_threads>& dlist = s->dlist_;
I>        long& dcount = s->dcount_;
I>        //
I>        node<T>* hptr;
I>        boost::array< node< T >* , max_threads> plist;
I>        boost::array< node< T >* , max_threads> new_dlist;
I>        long p = 0, new_dcount = 0;
I>        //////////////////////////////////////////////////////////////////////////
I>        //1.
I>        for(long i = 0; i < total_n_; ++i)
I>        {
I>            hptr = hp_[i];
I>            if( hptr ) plist[ p++ ] = hptr;
I>        }
I>        //////////////////////////////////////////////////////////////////////////
I>        //2.
I>        std::sort(plist.begin(), plist.end());
I>        //////////////////////////////////////////////////////////////////////////
I>        //3.
I>        for(long i = 0; i < batch_r_; ++i)
I>        {
I>            if(std::binary_search(plist.begin(), plist.end(), dlist[i]) )
I>            {
I>                new_dlist[ new_dcount++ ] = dlist[i];
I>            }
I>            else
I>            {
I>                delete ( dlist[i] );
I>                dlist[i] = 0;
I>            }
I>        }
I>        //////////////////////////////////////////////////////////////////////////
I>        //4.
I>        for(long i = 0; i < new_dcount; ++i) { dlist[i] = new_dlist[i];    }
I>        dcount = new_dcount;
I>        //////////////////////////////////////////////////////////////////////////
I>    }
I>};
I>hazards_storage< long > hazards_storage_;
I>//////////////////////////////////////////////////////////////////////////
I>template <class T>
I>void destroy_node( node<T>* n)
I>{
I>    hazards_storage_.delete_node( GetCurrentThreadId(), n);
I>}
I>//////////////////////////////////////////////////////////////////////////
I>//
I>template <class T>
I>class stack_lf
I>{
I>    typedef typename node< T > node;
I>    node* head_;
I>



volatile!!!


I>
I>    //
I>    static node* make_new_node(T v = 0)
I>    {
I>        node* n = new node();
        n->>value_ = v;
        n->>next_ = 0;
I>        return n;
I>    }
I>    //////////////////////////////////////////////////////////////////////////
I>public:
I>    stack_lf() : head_( 0 )
I>    {
I>    }
I>    ~stack_lf()
I>    {
I>        node* h = head_;
I>        while(h) {  node* n = h->next_; delete h; h = n;    }
I>    }
I>    //
I>    void push(T v)
I>    {
I>        node* next = make_new_node( v );
I>        //
I>        node* head;
I>



volatile!!!


I>
I>        for(;;)
I>        {
I>            head = head_;
            next->>next_ = head;
I>            if( cas( (long*)&head_, (long)head, (long)next) ) break;
I>        }
I>    }
I>    bool pop(node* hazard, T& val) 
I>



Опять же. hazard надо принимать как node** иначе ты пишешь во временную переменную.
Ещё лучше как node* volatile*


I>
I>    {
I>        //////////////////////////////////////////////////////////////////////////
I>         node* head, *next;
I>



volatile!!!


I>
I>         for(;;)
I>         {
I>             head = head_;
I>             if(head == NULL) return false;
I>             //
I>             hazard = head;
I>



#StoreLoad барьер памяти! Если ты делаешь критическую последовательность из load следующего за store, то скорее всего всегда нужен #StoreLoad барьер памяти. В противном случае тебе просто на разрешить конкуренцию между двумя конкурирующими потоками.


I>
I>             if(head != head_) continue;
I>             //
I>             next = head->next_;
I>             if(next == NULL) return false;
I>             //
I>             if ( cas( (long*)&head_, (long)head, (long)next)) break;
I>         }
I>         val = next->value_;
I>



Как минимум барьер для компилятора. Если hazard сбросится до считывания value, то всё накроется.


I>
I>         hazard = 0;
I>         destroy_node(head);
I>         return true;
I>        //////////////////////////////////////////////////////////////////////////
I>    }
I>};
I>




1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re: про lock free stack
От: remark Россия http://www.1024cores.net/
Дата: 22.01.08 20:58
Оценка: 12 (2)
Здравствуйте, ioni, Вы писали:

I>В качестве тестовых экспериментов попробовал набросать реализацию

I>получилось вот что, но вот беда, при прогоне падает
I>в связи с этим вопрос это всегда так для lock free или я чего то упустил


Рекомендую посмотреть на реализацию стека от M$, которая есть в WinAPI. Она полностью рабочая.

ABA проблему они решают с помощью т.н. ABA counter. Т.е. якорь вершины стека выглядит как:
struct anchor
{
  node* head;
  unsigned counter;
};


counter инкрементируется при каждом добавлении (или удалении) элемента. Это защается от того, что ты увидишь тот же head, но при этом head->next уже будет другой. И соотв. для этого они применяют уже InterlockedCompareExchange64().


Возможность обращения к удалённой памяти они решают гораздо более интересно.
pop() выглядит примерно так:
bool pop(T& val) 
{
  volatile node volatile* next;
  for (;;)
  {
   bool fail = false;
   __try
   {
    do 
    { 
      next = head_->next_; 
      if(next == 0)
        return false;
    }
    while ( !cas(((long*)(&(head_->next_))), ((long)next), ((long)(next->next_))) );
   }
   __catch (GetExceptionCode() == EXCEPTION_ACCESS_VIOLATION ? EXCEPTION_EXECUTE_HANDLER : EXCEPTION_CONTINUE_SEARCH)
   {
     fail = true;
   }
   if (!fail)
     break;
  }
  val = next->value_;
  destroy_node(next);
  return true;
}



Очень занятно. Ещё более занятно, как они на 64-битной платформе запихивают в 64 бита: 1. указатель на следующий нод, 2. 16-ти битное кол-во элементов в стеке, 3. ABA счётчик (на 9 бит).



1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[4]: SMR
От: ioni Россия  
Дата: 23.01.08 08:23
Оценка:
Здравствуйте, remark, Вы писали:

Спасибо за конструктивную критику
сейчас перепишу посмотрю что выйдет
как же это не просто написать простейший контейнер с lock free

R>add_id() надо защитить критической секцией! Блин, ты же её из разных потоков вызываешь!

ну тут я решил что раз lock free то везде

R>Тут, видимо, должен быть assert. Иначе будет утечка памяти, если тут произойдёт return.

согласен но это пока все таки тестовый пример и на такие детали не обращаешь внимания

R>С этом сложно надеяться на работоспособность кода. Ты же в один и тот же hp пишешь из всех потоков!

да тут промашка вышла исправлю

R>Плюс get_hazard() должна возвращать node<T>**, а не node<T>*. Ты же так ничего вообще не запишешь в hp!

R>Ты пишешь всегда во временный объект.
поправлю

R>volatile!!!

добавлю везде

R>#StoreLoad барьер памяти! Если ты делаешь критическую последовательность из load следующего за store, то скорее всего всегда нужен R>#StoreLoad барьер памяти. В противном случае тебе просто на разрешить конкуренцию между двумя конкурирующими потоками.


#StoreLoad барьер памяти — это видмо что специфичное для windows искать или уже есть какой нибудь готовый объект ?

R>Как минимум барьер для компилятора. Если hazard сбросится до считывания value, то всё накроется.

а это каким образом обеспечить?

ну если получиться дописать что бы не падало
Re[2]: про lock free stack
От: ioni Россия  
Дата: 23.01.08 08:24
Оценка:
Здравствуйте, remark, Вы писали:

R>Здравствуйте, ioni, Вы писали:


I>>В качестве тестовых экспериментов попробовал набросать реализацию

I>>получилось вот что, но вот беда, при прогоне падает
I>>в связи с этим вопрос это всегда так для lock free или я чего то упустил

R>Рекомендую посмотреть на реализацию стека от M$, которая есть в WinAPI. Она полностью рабочая.


а где искать я посмотрел sdk и не нашел
Re[3]: про lock free stack
От: ioni Россия  
Дата: 23.01.08 09:54
Оценка:
вот что получилось после исправлений
но результат пока по прежнему не удовлетворительный

template<class T>
struct node
{
    volatile node* next_;
    T value_;
};
//////////////////////////////////////////////////////////////////////////
template<typename T>
class hazards_storage
{
    static const long max_threads = 8;
    //
    struct smr
    {
        long dcount_;
        boost::array< volatile node< T >* , max_threads> dlist_;
    };
    //////////////////////////////////////////////////////////////////////////
    std::map< long, volatile node< T >* > hp_;
    std::map< long,  smr* > hazards_;
    long threads_p_;
    long hazards_k_;
    long total_n_;
    long batch_r_;
    //
    boost::mutex guard_;
    //
public:
    hazards_storage() : threads_p_(0), hazards_k_(1), total_n_(0), batch_r_(0)
    {        
    }
    ~hazards_storage()
    {
        std::map< long,  smr* >::iterator i = hazards_.begin(), e = hazards_.end();
        for(; i != e; ++i)
        {
            smr* s = (*i).second;
            delete s, s = 0;
        }
        hazards_.clear();
    }
    //
    void add_id(long id)
    {
        boost::mutex::scoped_lock l(guard_);
        ++threads_p_;
        total_n_ = threads_p_ * hazards_k_;
        batch_r_ = 2 * total_n_;
        //
        smr* s = new smr();
        s->dcount_ = 0;
        hazards_.insert( std::make_pair(id, s) );
        //
        hp_.insert( std::make_pair(id, (volatile node< T >*)0));
    }
    //
    void remove_id(long id)
    {
        boost::mutex::scoped_lock l(guard_);
        //... 
    }
    //
    void delete_node(long id, volatile node<T>* n)
    {
        smr* s = hazards_[id];
        if(s == NULL) { assert(false);   return; }
        s->dlist_[ s->dcount_++ ] = n;
        if(s->dcount_ == batch_r_) scan( id );
    }
    volatile node<T>** get_hazard( long id )
    {
        return &hp_[ id ];
    }
    //
private:
    void scan(long id)
    {
        smr* s = hazards_[id];
        boost::array< volatile node< T >* , max_threads>& dlist = s->dlist_;
        long& dcount = s->dcount_;
        //
        volatile node<T>* hptr;
        boost::array< volatile node< T >* , max_threads> plist;
        boost::array< volatile node< T >* , max_threads> new_dlist;
        long p = 0, new_dcount = 0;
        //////////////////////////////////////////////////////////////////////////
        //1.
        std::map< long, volatile node< T >* >::iterator i = hp_.begin(), e = hp_.end();
        for(; i != e; ++i)
        {
            hptr = (*i).second;
            if( hptr ) plist[ p++ ] = hptr;
        }
        //////////////////////////////////////////////////////////////////////////
        //2.
        std::sort(plist.begin(), plist.end());
        //////////////////////////////////////////////////////////////////////////
        //3.
        for(long i = 0; i < batch_r_; ++i)
        {
            if(std::binary_search(plist.begin(), plist.end(), dlist[i]) )
            {
                new_dlist[ new_dcount++ ] = dlist[i];
            }
            else
            {
                delete ( dlist[i] );
                dlist[i] = 0;
            }
        }
        //////////////////////////////////////////////////////////////////////////
        //4.
        for(long i = 0; i < new_dcount; ++i) { dlist[i] = new_dlist[i];    }
        dcount = new_dcount;
        //////////////////////////////////////////////////////////////////////////
    }
};
hazards_storage< long > hazards_storage_;
//////////////////////////////////////////////////////////////////////////
template <class T>
void destroy_node( volatile node<T>* n)
{
    hazards_storage_.delete_node( GetCurrentThreadId(), n);
}
//////////////////////////////////////////////////////////////////////////
//
template <class T>
class stack_lf
{
    typedef typename node< T > node;
    volatile node* head_;
    //
    static node* make_new_node(T v = 0)
    {
        node* n = new node();
        n->value_ = v;
        n->next_ = 0;
        return n;
    }
    //////////////////////////////////////////////////////////////////////////
public:
    stack_lf() : head_( 0 )
    {
    }
    ~stack_lf()
    {
        volatile node* h = head_;
        while(h) {  volatile node* n = h->next_; delete h; h = n;    }
    }
    //
    void push(T v)
    {
        node volatile* next = make_new_node( v );
        //
        node volatile* volatile head;
        for(;;)
        {
            head = head_;
            next->next_ = head;
            if( cas( (long*)&head_, (long)head, (long)next) ) return;
        }
    }
    bool pop( node volatile* volatile* hazard, T& val) 
    {
        //////////////////////////////////////////////////////////////////////////
         node volatile* head, volatile* next;
         for(;;)
         {
             head = head_;
             if(head == NULL) return false;
             //
             //*hazard = head;
             InterlockedExchangePointer(hazard, head);
             if(head != head_) continue;
             //
             next = head->next_;
             if(next == NULL) return false;
             //
             if ( cas( (long*)&head_, (long)head, (long)next)) break;
         }
         val = next->value_;
         //*hazard = 0;
         InterlockedExchangePointer(hazard, 0);
         destroy_node(head);
         return true;
        //////////////////////////////////////////////////////////////////////////
    }
};
Re: про lock free stack
От: nen777w  
Дата: 23.01.08 11:14
Оценка:
Сорьки а что есть сабж?
Для чего и в каких ситуациях надо, если можно в двух словах.
Приведенный код изучать не охота пока.
Re[5]: SMR
От: remark Россия http://www.1024cores.net/
Дата: 23.01.08 12:03
Оценка:
Здравствуйте, ioni, Вы писали:

I>Здравствуйте, remark, Вы писали:


I>Спасибо за конструктивную критику

I>сейчас перепишу посмотрю что выйдет
I>как же это не просто написать простейший контейнер с lock free

R>>add_id() надо защитить критической секцией! Блин, ты же её из разных потоков вызываешь!

I>ну тут я решил что раз lock free то везде

Ну тогда и делай аккуратно через атомарные изменения. lock free — это не просто убирание критических секций
Вообще, если ты делаешь lock free, то какие-либо стандартные готовые контейнеры — это скорее всего не то, что ты хочешь использовать. Все контейнеры скорее всего в конечном итоге тебе придётся переписать самому (если ты не согласен просто защащать их критическими секциями).




R>>#StoreLoad барьер памяти! Если ты делаешь критическую последовательность из load следующего за store, то скорее всего всегда нужен R>#StoreLoad барьер памяти. В противном случае тебе просто на разрешить конкуренцию между двумя конкурирующими потоками.


I>#StoreLoad барьер памяти — это видмо что специфичное для windows искать или уже есть какой нибудь готовый объект ?


_mm_mfence()


R>>Как минимум барьер для компилятора. Если hazard сбросится до считывания value, то всё накроется.

I>а это каким образом обеспечить?

_ReadWriteBarrier()


I>ну если получиться дописать что бы не падало


1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[3]: про lock free stack
От: remark Россия http://www.1024cores.net/
Дата: 23.01.08 12:07
Оценка:
Здравствуйте, ioni, Вы писали:

I>
I>    void delete_node(long id, node<T>* n)
I>    {
I>        smr* s = hazards_[id];
I>        if(s == NULL) return;
I>        s->>dlist_[ s->dcount_++ ] = n;
I>        if(s->dcount_ == batch_r_) scan( id );
I>    }
I>



Да ещё забыл, тут должно быть что-то типа такого:

    void delete_node(long id, node<T>* n)
    {
        smr* s = hazards_[id];
        if(s == NULL) return;
        if(s->dcount_ == batch_r_) scan( id );
        if(s->dcount_ == batch_r_) delete n;
        s->dlist_[ s->dcount_++ ] = n;
    }


Иначе у тебя может быть переполнение массива dlist_



1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[4]: про lock free stack
От: remark Россия http://www.1024cores.net/
Дата: 23.01.08 12:15
Оценка:
Здравствуйте, ioni, Вы писали:

I>вот что получилось после исправлений

I>но результат пока по прежнему не удовлетворительный


А что не удовлетворяет? Падает? Просто досконально изучить код достаточно сложно и нужно много времени, я пока просто написал то, что лежало на поверхности.


Вообще смотри USENET группу comp.programming.threads:
http://groups.google.com/group/comp.programming.threads/topics?hl=en

Вот буквально за последние дни вопрос по этому стеку поднимался несколько раз:
http://groups.google.com/group/comp.programming.threads/browse_frm/thread/d052b4de8810c0f8
http://groups.google.com/group/comp.programming.threads/browse_frm/thread/1e166dc04c3df118

А в архивах должны быть десятки обсуждений по этому вопросу.

Вот ещё можешь посмотреть алгоритм proxy collector:
http://groups.google.com/group/comp.programming.threads/browse_frm/thread/3ded0e3c5496eaa0?hl=en#
Это тоже вариант алгоритма для управления памятью в lock-free алгоритмах — замена SMR



1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[2]: про lock free stack
От: remark Россия http://www.1024cores.net/
Дата: 23.01.08 12:20
Оценка:
Здравствуйте, nen777w, Вы писали:

N>Сорьки а что есть сабж?

N>Для чего и в каких ситуациях надо, если можно в двух словах.
N>Приведенный код изучать не охота пока.

http://en.wikipedia.org/wiki/Lock-free


1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[5]: про lock free stack
От: ioni Россия  
Дата: 23.01.08 13:17
Оценка:
Здравствуйте, remark, Вы писали:

спаисбо подправлю
ссылки посмотрю

ага падает зараза

неееее все контейнеры, я не собираюсь переписывать
меня сейчас интересует подход и во что в принципе
выливается реализация lock freeи с точки зрения трудоемкости
и с точки зрения стабильности работы, поэтому был выбран самый простой контейнер
и пока результат меня не удовлетворяет (в теории выходит все гладко)
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.