Информация об изменениях

Сообщение Re[2]: 10K problem for keep-alive utility от 11.11.2023 17:02

Изменено 11.11.2023 17:10 avovana

Re[2]: 10K problem for keep-alive utility
Здравствуйте, Ip Man, Вы писали:

IM>Два потока.


IM>- Первый вызывает epoll в бесконечном цикле и записывает события в shared memory queue

IM>- Второй в бесконечном цикле читает события из shared memory queue и пишет в консоль или файл.

IM>Всё. Мьютекс не нужен, shared memory queue реализуется с помощью атомиков.


Здорово! Думал о memory mapped file. Но его нужно расширять. Как подметили выше, что если много потоков, то каждый немного в свой кусок будет писать.
Для читателя сложно будет вычитывать. Непоследовательно будет. Думал просто сделать фикс. размера.

А есть реализация очереди в shared_memory?
Ранее нашёл циклический буфер с атомиком. Вы про это?
https://www.codeproject.com/Articles/43510/Lock-Free-Single-Producer-Single-Consumer-Circular#heading_platform_comparison

#include <atomic>
#include <cstddef>
namespace memory_relaxed_aquire_release {
template<typename Element, size_t Size> 
class CircularFifo{
public:
  enum { Capacity = Size+1 };

  CircularFifo() : _tail(0), _head(0){}   
  virtual ~CircularFifo() {}

  bool push(const Element& item); // pushByMOve?
  bool pop(Element& item);

  bool wasEmpty() const;
  bool wasFull() const;
  bool isLockFree() const;

private:
  size_t increment(size_t idx) const; 

  std::atomic <size_t>  _tail;  // tail(input) index
  Element    _array[Capacity];
  std::atomic<size_t>   _head; // head(output) index
};

template<typename Element, size_t Size>
bool CircularFifo<Element, Size>::push(const Element& item)
{    
  const auto current_tail = _tail.load(std::memory_order_relaxed); 
  const auto next_tail = increment(current_tail); 
  if(next_tail != _head.load(std::memory_order_acquire))                           
  {    
    _array[current_tail] = item;
    _tail.store(next_tail, std::memory_order_release); 
    return true;
  }
  
  return false; // full queue

}


// Pop by Consumer can only update the head (load with relaxed, store with release)
//     the tail must be accessed with at least aquire
template<typename Element, size_t Size>
bool CircularFifo<Element, Size>::pop(Element& item)
{
  const auto current_head = _head.load(std::memory_order_relaxed);  
  if(current_head == _tail.load(std::memory_order_acquire)) 
    return false; // empty queue

  item = _array[current_head]; 
  _head.store(increment(current_head), std::memory_order_release); 
  return true;
}

template<typename Element, size_t Size>
bool CircularFifo<Element, Size>::wasEmpty() const
{
  // snapshot with acceptance of that this comparison operation is not atomic
  return (_head.load() == _tail.load()); 
}


// snapshot with acceptance that this comparison is not atomic
template<typename Element, size_t Size>
bool CircularFifo<Element, Size>::wasFull() const
{
  const auto next_tail = increment(_tail.load()); // aquire, we dont know who call
  return (next_tail == _head.load());
}


template<typename Element, size_t Size>
bool CircularFifo<Element, Size>::isLockFree() const
{
  return (_tail.is_lock_free() && _head.is_lock_free());
}

template<typename Element, size_t Size>
size_t CircularFifo<Element, Size>::increment(size_t idx) const
{
  return (idx + 1) % Capacity;
}

То есть, мне сделать shm_open с данным узнаваемым для каждого потока id с размером = size of такой очереди с таким-то capacity исходя из такого-то типа элементов?
Насколько понял, так это делается. Потом получая этот кусок кастить указатель к такому типу очереди.
Re[2]: 10K problem for keep-alive utility
Здравствуйте, Ip Man, Вы писали:

IM>Два потока.


IM>- Первый вызывает epoll в бесконечном цикле и записывает события в shared memory queue

IM>- Второй в бесконечном цикле читает события из shared memory queue и пишет в консоль или файл.

IM>Всё. Мьютекс не нужен, shared memory queue реализуется с помощью атомиков.


Здорово! Думал о memory mapped file. Но его нужно расширять. Как подметили выше, что если много потоков, то каждый немного в свой кусок будет писать.
Для читателя сложно будет вычитывать. Непоследовательно будет. Думал просто сделать фикс. размера.

А есть реализация очереди в shared_memory?
Ранее нашёл циклический буфер с атомиком. Вы про это?
https://www.codeproject.com/Articles/43510/Lock-Free-Single-Producer-Single-Consumer-Circular#heading_platform_comparison

#include <atomic>
#include <cstddef>
namespace memory_relaxed_aquire_release {
template<typename Element, size_t Size> 
class CircularFifo{
public:
  enum { Capacity = Size+1 };

  CircularFifo() : _tail(0), _head(0){}   
  virtual ~CircularFifo() {}

  bool push(const Element& item); // pushByMOve?
  bool pop(Element& item);

  bool wasEmpty() const;
  bool wasFull() const;
  bool isLockFree() const;

private:
  size_t increment(size_t idx) const; 

  std::atomic <size_t>  _tail;  // tail(input) index
  Element    _array[Capacity];
  std::atomic<size_t>   _head; // head(output) index
};

template<typename Element, size_t Size>
bool CircularFifo<Element, Size>::push(const Element& item)
{    
  const auto current_tail = _tail.load(std::memory_order_relaxed); 
  const auto next_tail = increment(current_tail); 
  if(next_tail != _head.load(std::memory_order_acquire))                           
  {    
    _array[current_tail] = item;
    _tail.store(next_tail, std::memory_order_release); 
    return true;
  }
  
  return false; // full queue

}


// Pop by Consumer can only update the head (load with relaxed, store with release)
//     the tail must be accessed with at least aquire
template<typename Element, size_t Size>
bool CircularFifo<Element, Size>::pop(Element& item)
{
  const auto current_head = _head.load(std::memory_order_relaxed);  
  if(current_head == _tail.load(std::memory_order_acquire)) 
    return false; // empty queue

  item = _array[current_head]; 
  _head.store(increment(current_head), std::memory_order_release); 
  return true;
}

template<typename Element, size_t Size>
bool CircularFifo<Element, Size>::wasEmpty() const
{
  // snapshot with acceptance of that this comparison operation is not atomic
  return (_head.load() == _tail.load()); 
}


// snapshot with acceptance that this comparison is not atomic
template<typename Element, size_t Size>
bool CircularFifo<Element, Size>::wasFull() const
{
  const auto next_tail = increment(_tail.load()); // aquire, we dont know who call
  return (next_tail == _head.load());
}


template<typename Element, size_t Size>
bool CircularFifo<Element, Size>::isLockFree() const
{
  return (_tail.is_lock_free() && _head.is_lock_free());
}

template<typename Element, size_t Size>
size_t CircularFifo<Element, Size>::increment(size_t idx) const
{
  return (idx + 1) % Capacity;
}

То есть, мне сделать shm_open с данным узнаваемым для каждого потока id с размером = size of такой очереди с таким-то capacity исходя из такого-то типа элементов?
Насколько понял, так это делается. Потом получая этот кусок кастить указатель к такому типу очереди получая экзэмляр.