Сообщение Re[2]: 10K problem for keep-alive utility от 11.11.2023 17:02
Изменено 11.11.2023 17:10 avovana
Re[2]: 10K problem for keep-alive utility
Здравствуйте, Ip Man, Вы писали:
IM>Два потока.
IM>- Первый вызывает epoll в бесконечном цикле и записывает события в shared memory queue
IM>- Второй в бесконечном цикле читает события из shared memory queue и пишет в консоль или файл.
IM>Всё. Мьютекс не нужен, shared memory queue реализуется с помощью атомиков.
Здорово! Думал о memory mapped file. Но его нужно расширять. Как подметили выше, что если много потоков, то каждый немного в свой кусок будет писать.
Для читателя сложно будет вычитывать. Непоследовательно будет. Думал просто сделать фикс. размера.
А есть реализация очереди в shared_memory?
Ранее нашёл циклический буфер с атомиком. Вы про это?
https://www.codeproject.com/Articles/43510/Lock-Free-Single-Producer-Single-Consumer-Circular#heading_platform_comparison
То есть, мне сделать shm_open с данным узнаваемым для каждого потока id с размером = size of такой очереди с таким-то capacity исходя из такого-то типа элементов?
Насколько понял, так это делается. Потом получая этот кусок кастить указатель к такому типу очереди.
IM>Два потока.
IM>- Первый вызывает epoll в бесконечном цикле и записывает события в shared memory queue
IM>- Второй в бесконечном цикле читает события из shared memory queue и пишет в консоль или файл.
IM>Всё. Мьютекс не нужен, shared memory queue реализуется с помощью атомиков.
Здорово! Думал о memory mapped file. Но его нужно расширять. Как подметили выше, что если много потоков, то каждый немного в свой кусок будет писать.
Для читателя сложно будет вычитывать. Непоследовательно будет. Думал просто сделать фикс. размера.
А есть реализация очереди в shared_memory?
Ранее нашёл циклический буфер с атомиком. Вы про это?
https://www.codeproject.com/Articles/43510/Lock-Free-Single-Producer-Single-Consumer-Circular#heading_platform_comparison
#include <atomic>
#include <cstddef>
namespace memory_relaxed_aquire_release {
template<typename Element, size_t Size>
class CircularFifo{
public:
enum { Capacity = Size+1 };
CircularFifo() : _tail(0), _head(0){}
virtual ~CircularFifo() {}
bool push(const Element& item); // pushByMOve?
bool pop(Element& item);
bool wasEmpty() const;
bool wasFull() const;
bool isLockFree() const;
private:
size_t increment(size_t idx) const;
std::atomic <size_t> _tail; // tail(input) index
Element _array[Capacity];
std::atomic<size_t> _head; // head(output) index
};
template<typename Element, size_t Size>
bool CircularFifo<Element, Size>::push(const Element& item)
{
const auto current_tail = _tail.load(std::memory_order_relaxed);
const auto next_tail = increment(current_tail);
if(next_tail != _head.load(std::memory_order_acquire))
{
_array[current_tail] = item;
_tail.store(next_tail, std::memory_order_release);
return true;
}
return false; // full queue
}
// Pop by Consumer can only update the head (load with relaxed, store with release)
// the tail must be accessed with at least aquire
template<typename Element, size_t Size>
bool CircularFifo<Element, Size>::pop(Element& item)
{
const auto current_head = _head.load(std::memory_order_relaxed);
if(current_head == _tail.load(std::memory_order_acquire))
return false; // empty queue
item = _array[current_head];
_head.store(increment(current_head), std::memory_order_release);
return true;
}
template<typename Element, size_t Size>
bool CircularFifo<Element, Size>::wasEmpty() const
{
// snapshot with acceptance of that this comparison operation is not atomic
return (_head.load() == _tail.load());
}
// snapshot with acceptance that this comparison is not atomic
template<typename Element, size_t Size>
bool CircularFifo<Element, Size>::wasFull() const
{
const auto next_tail = increment(_tail.load()); // aquire, we dont know who call
return (next_tail == _head.load());
}
template<typename Element, size_t Size>
bool CircularFifo<Element, Size>::isLockFree() const
{
return (_tail.is_lock_free() && _head.is_lock_free());
}
template<typename Element, size_t Size>
size_t CircularFifo<Element, Size>::increment(size_t idx) const
{
return (idx + 1) % Capacity;
}
То есть, мне сделать shm_open с данным узнаваемым для каждого потока id с размером = size of такой очереди с таким-то capacity исходя из такого-то типа элементов?
Насколько понял, так это делается. Потом получая этот кусок кастить указатель к такому типу очереди.
Re[2]: 10K problem for keep-alive utility
Здравствуйте, Ip Man, Вы писали:
IM>Два потока.
IM>- Первый вызывает epoll в бесконечном цикле и записывает события в shared memory queue
IM>- Второй в бесконечном цикле читает события из shared memory queue и пишет в консоль или файл.
IM>Всё. Мьютекс не нужен, shared memory queue реализуется с помощью атомиков.
Здорово! Думал о memory mapped file. Но его нужно расширять. Как подметили выше, что если много потоков, то каждый немного в свой кусок будет писать.
Для читателя сложно будет вычитывать. Непоследовательно будет. Думал просто сделать фикс. размера.
А есть реализация очереди в shared_memory?
Ранее нашёл циклический буфер с атомиком. Вы про это?
https://www.codeproject.com/Articles/43510/Lock-Free-Single-Producer-Single-Consumer-Circular#heading_platform_comparison
То есть, мне сделать shm_open с данным узнаваемым для каждого потока id с размером = size of такой очереди с таким-то capacity исходя из такого-то типа элементов?
Насколько понял, так это делается. Потом получая этот кусок кастить указатель к такому типу очереди получая экзэмляр.
IM>Два потока.
IM>- Первый вызывает epoll в бесконечном цикле и записывает события в shared memory queue
IM>- Второй в бесконечном цикле читает события из shared memory queue и пишет в консоль или файл.
IM>Всё. Мьютекс не нужен, shared memory queue реализуется с помощью атомиков.
Здорово! Думал о memory mapped file. Но его нужно расширять. Как подметили выше, что если много потоков, то каждый немного в свой кусок будет писать.
Для читателя сложно будет вычитывать. Непоследовательно будет. Думал просто сделать фикс. размера.
А есть реализация очереди в shared_memory?
Ранее нашёл циклический буфер с атомиком. Вы про это?
https://www.codeproject.com/Articles/43510/Lock-Free-Single-Producer-Single-Consumer-Circular#heading_platform_comparison
#include <atomic>
#include <cstddef>
namespace memory_relaxed_aquire_release {
template<typename Element, size_t Size>
class CircularFifo{
public:
enum { Capacity = Size+1 };
CircularFifo() : _tail(0), _head(0){}
virtual ~CircularFifo() {}
bool push(const Element& item); // pushByMOve?
bool pop(Element& item);
bool wasEmpty() const;
bool wasFull() const;
bool isLockFree() const;
private:
size_t increment(size_t idx) const;
std::atomic <size_t> _tail; // tail(input) index
Element _array[Capacity];
std::atomic<size_t> _head; // head(output) index
};
template<typename Element, size_t Size>
bool CircularFifo<Element, Size>::push(const Element& item)
{
const auto current_tail = _tail.load(std::memory_order_relaxed);
const auto next_tail = increment(current_tail);
if(next_tail != _head.load(std::memory_order_acquire))
{
_array[current_tail] = item;
_tail.store(next_tail, std::memory_order_release);
return true;
}
return false; // full queue
}
// Pop by Consumer can only update the head (load with relaxed, store with release)
// the tail must be accessed with at least aquire
template<typename Element, size_t Size>
bool CircularFifo<Element, Size>::pop(Element& item)
{
const auto current_head = _head.load(std::memory_order_relaxed);
if(current_head == _tail.load(std::memory_order_acquire))
return false; // empty queue
item = _array[current_head];
_head.store(increment(current_head), std::memory_order_release);
return true;
}
template<typename Element, size_t Size>
bool CircularFifo<Element, Size>::wasEmpty() const
{
// snapshot with acceptance of that this comparison operation is not atomic
return (_head.load() == _tail.load());
}
// snapshot with acceptance that this comparison is not atomic
template<typename Element, size_t Size>
bool CircularFifo<Element, Size>::wasFull() const
{
const auto next_tail = increment(_tail.load()); // aquire, we dont know who call
return (next_tail == _head.load());
}
template<typename Element, size_t Size>
bool CircularFifo<Element, Size>::isLockFree() const
{
return (_tail.is_lock_free() && _head.is_lock_free());
}
template<typename Element, size_t Size>
size_t CircularFifo<Element, Size>::increment(size_t idx) const
{
return (idx + 1) % Capacity;
}
То есть, мне сделать shm_open с данным узнаваемым для каждого потока id с размером = size of такой очереди с таким-то capacity исходя из такого-то типа элементов?
Насколько понял, так это делается. Потом получая этот кусок кастить указатель к такому типу очереди получая экзэмляр.