Появилась задача реализовать наколенную ESB по-быстрому, а частью ESB является реализация очередей (queues). В качестве хранилища имеется либо файловая система, либо Oracle. Файловую систему я сразу отбросил, начал с реализации на оракле в виде таблиц.
Создал таблицу, загоняю в неё входящие сообщения (в виде поля NCLOB), отдельные потоки читают свои очереди и рассылают сообщения получателям.
Разумеется возник вопрос — как читать нескольким рассылателям из одной очереди, при этом чтобы не было возможности прочитать одно и то же сообщение дважды?
Начал делать через дополнительную таблицу, т.е. если сама таблица с очередями у меня такая:
create table msg_queue (
id number not null,
queue_name nvarchar2(100) not null,
priority number not null,
date_created date not null,
msg_content nclob
);
к ней создаю вторую таблицу для блокирования:
create table msg_queue_locked (
id number not null
);
и на ней PK.
Соот-но предполагал реализовать такой алгоритм:
1. выбираем следующее сообщение для рассылки.
2. вставляем его ID в msg_queue_locked. коммитим транзакцию.
3. раз коммит удался, значит никто другой уже это сообщение не возьмёт, значит высылаем сообщение получателю и удаляем записи из обеих таблиц.
всё бы ничего но не нравится коммит транзакции в середине операции. Выходит так, что если после этого коммита процесс или БД упадёт, то сообщение останется зависшим. Разумеется можно вводить всякие дополнительные процессы, контролирующие это дело и возвращающие зависшие сообщения в очередь, но это уже начинаются костыли.
В ходе гугления обнаружил выражение select for update skip locked, а через него вышел на то, что у оракла оказывается уже есть очереди (Advanced Queueing) и можно попросту ими воспользоваться. Начал копать.
Первое, что смутило, что при дёргании Dequeue на пустой очереди вываливается Exception, который приходится отлавливать и глушить:
} catch (OracleException ex) {
if (ex.Number == 25228) {
no_messages_in_the_queue = true;
} else {
throw;
}
}
другого способа не нашёл — знает ли кто-нибудь правильный способ?
Далее начал реализовывать такой алгоритм для рассылки:
1. начинаем транзакцию.
2. берём сообщение из очереди в режиме удаления (т.е. взять и удалить).
3. высылаем сообщение в систему-назначения, анализируем ответ.
4. если всё ок, то коммитим транзакцию.
в ходе экспериментов обнаружил неприятную особенность. если транзакцию не закоммитить, то сообщение через пять таких попыток перестанет существовать в очереди. Оно мигрирует в очередь исключений со статусом EXPIRED. Причём у параметра max_retries нет значения INFINITY, можно поставить только условно бесконечное число 2**31 -1. И тут начинается головоломка — стоит ли теперь в приложении предусматривать сценарии на этот счёт или нет.
Вопрос №2 — есть ли какой-нибудь адекватный workaround для проблемы с max_retries?
Вопрос №3 — кто-нибудь реализовывал queue в своих приложениях средствами БД? Каким образом это делали?
P.S. Вероятно есть ещё специализированные хранилища, но они не особо интересны, поскольку надо заморачиваться с лицензиями — не то.