linux, pthreads: отсутствие события чтения на дескрипторе
От: Sorc1  
Дата: 10.11.11 11:09
Оценка:
Здравствуйте!

Тестируя своё многопоточное приложение, наткнулся на проблему отсутствия события чтения на дескрипторе, хотя на самом деле данные в нём есть. Я смог написать тестовую программу, воспроизводяющую проблему. Программа ниже (на 370 строк, сильно уменьшить не получается).

Программа работает так:

Создаём 2 сокетпары.

В первую пару через отдельный поток загоняем разом 4000 чисел (они разом умещаются в буфер). Первая пара предназначена для потоков-воркеров. Потоки-воркеры будут слушать через poll читающий конец пары одновременно, считывая числа, которые данные потоки просто печатают в логе. При этом каждый воркер "капризный": он считывает ровно одно число, и напечатав его, завершается, не желая больше читать. Капризность — важное свойство для данного теста: без неё всё работает.

Вторая пара предназначена для контролирующего число воркеров потока. Когда воркер завершается, он отправляет во вторую сокетпару один байт. Контролирующий поток, получив этот байт, считает, что воркер завершился и анализирует сколько еще непрочитанных чисел осталось. Сравнив это число с текущим числом воркеров, при необходимости увеличивает число воркеров. При этом максимальный лимит воркеров — 50. Если все числа прочитаны, то поток дожмидается, когда все воркеры завершатся и затем тоже завершается.

Проблема в том, что часть чисел не считывается. При этом воркеры висят в poll(). Чтобы показать, что данные на самом деле есть, контролирующий поток через некоторое время сам успешно считывает всё, что осталось и закрывает отправляющий конец пайпа. В этот момент зависшие в poll() воркеры пробуждаются и обнаруживают, что пайп закрыт.

Воспроизводится не очень стабильно, но часто. Проверялось на убунте 11.10 (64бит), 11.04 (64бит), на достаточно старом rhel'е с ядром 2.6.18 (как на 64бит, так и на 32бит).
На FreeBSD 8.2 не воспроизводится (проверял в виртуалбоксе).

Не могу понять, в чём может быть ошибка. Исходная программа использует epoll вместо poll (через библиотеку libevent), поведение от этого не меняется. Я проверял даже на select: без изменений. Читающие концы сокетпар — неблокирующиеся, а записывающие можно делать как неблокирующимися, так и блокирующимися: это не влияет на результат.

#include <sysexits.h>
#include <stdint.h>
#include <inttypes.h>
#include <stdlib.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <stdarg.h>
#include <unistd.h>
#include <fcntl.h>
#include <poll.h>

struct root_ctx {
    int thr_read_fd;
    int thr_write_fd;

    int ctl_read_fd;
    int ctl_write_fd;

    /* Constants */
    unsigned nvars_total;
    unsigned nthreads_max;
    pthread_attr_t thr_attr;
    int timeout;

    /* Controlled by parent thread */
    unsigned nthreads;
    int ex;

    /* Controlled by multiple threads */
    pthread_mutex_t lck;
    unsigned nvars_recved;
};


static void mtlog(const char *func, int line, int err, const char *fmt, ...) __attribute__((__format__(printf, 4, 5)));

#define LOG(err, fmt, ...)                      \
    mtlog(__PRETTY_FUNCTION__, __LINE__, err, fmt, ##__VA_ARGS__)
#define DBG(fmt, ...) LOG(0, fmt, ##__VA_ARGS__)
#define ERR(fmt, ...) LOG(1, fmt, ##__VA_ARGS__)

static void
mtlog(const char *func, int line, int err, const char *fmt, ...)
{
    struct timeval tv;
    char buf[256];
    char out[512];
    va_list     va;

    gettimeofday(&tv, NULL);

    va_start(va, fmt);
    vsnprintf(buf, sizeof(buf), fmt, va);
    va_end(va);

    snprintf(out, sizeof(out), "%d.%06u:[%d.%"PRIuPTR"]:%s:%d:(%s):%s\n",
             (int)tv.tv_sec, (unsigned)tv.tv_usec,
             (int)getpid(), (uintptr_t)pthread_self(), func, line,
             err ? "err" : "dbg", buf);
    write(STDERR_FILENO, out, strlen(out));
}

static int
make_socket_nonblocking(int fd)
{
    int flags;

    if ((flags = fcntl(fd, F_GETFL, NULL)) == -1) {
        ERR("fcntl(%d, F_GETFL): %s", fd, strerror(errno));
        return (-1);
    }
    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
        ERR("fcntl(%d, F_SETFL): %s", fd, strerror(errno));
        return (-1);
    }
    return (0);
}

static void *
wrk_thr_proc(void *arg)
{
    struct root_ctx *root = arg;
    struct pollfd pfd;
    uintptr_t var;
    uint8_t byte = 0;
    int rc;

    DBG("Worker started");

    memset(&pfd, 0, sizeof(pfd));
    pfd.fd = root->thr_read_fd;
    pfd.events = POLLIN;
    if ((rc = poll(&pfd, 1, -1)) < 0)
        ERR("poll(): %s", strerror(rc));
    else if (!rc)
        ERR("No events!");
    else {
        if ((rc = recv(root->thr_read_fd, &var, sizeof(var), 0)) < 0)
            ERR("recv(%d): %s", root->thr_read_fd, strerror(errno));
        else if (!rc)
            DBG("Nothing more to read");
        else if (rc != sizeof(var))
            ERR("Incorrect data size received: [%d] vs [%d]",
                rc, (int)sizeof(var));
        else {
            DBG("Got var[%"PRIuPTR"]", var);
            pthread_mutex_lock(&root->lck);
            root->nvars_recved++;
            pthread_mutex_unlock(&root->lck);
        }
    }

    if (send(root->ctl_write_fd, &byte, sizeof(byte), 0) != sizeof(byte))
        ERR("send() error");

    return (NULL);
}

static int
ctl_thr_check_threads(struct root_ctx *root)
{
    pthread_t thr;
    unsigned nvars_recved;
    unsigned nthreads_tocreate;
    unsigned i;
    int rc;

    pthread_mutex_lock(&root->lck);
    nvars_recved = root->nvars_recved;
    pthread_mutex_unlock(&root->lck);

    if (nvars_recved == root->nvars_total) {
        /* Done, wait for all the threads */
        if (root->thr_write_fd >= 0) {
            close(root->thr_write_fd);
            root->thr_write_fd = -1;
        }
        if (root->nthreads)
            return (1);
        else {
            DBG("All the threads done");
            return (0);
        }
    }
    nthreads_tocreate = root->nvars_total - nvars_recved;
    if (nthreads_tocreate > root->nthreads)
        nthreads_tocreate -= root->nthreads;
    else
        nthreads_tocreate = 0;
    if (nthreads_tocreate > root->nthreads_max - root->nthreads)
        nthreads_tocreate = root->nthreads_max - root->nthreads;
    DBG("Will create[%u] threads, nthreads[%u] max[%u] recved[%u] nvars[%u]",
        nthreads_tocreate, root->nthreads, root->nthreads_max,
        nvars_recved, root->nvars_total);

    for (i = 0; i < nthreads_tocreate; i++) {
        root->nthreads++;
        if ((rc = pthread_create(&thr, &root->thr_attr,
                                 wrk_thr_proc, root))) {
            ERR("pthread_create: %s", strerror(rc));
            root->ex = EX_SOFTWARE;
            return (-1);
        }
    }

    return (1);
}

static void *
snd_thr_proc(void *arg)
{
    struct root_ctx *root = arg;
    struct pollfd pfd;
    uintptr_t *vars;
    size_t siz;
    unsigned i, old;
    ssize_t rc;

    DBG("Sender started");
    memset(&pfd, 0, sizeof(pfd));
    pfd.fd = root->thr_write_fd;
    pfd.events = POLLOUT;

    vars = malloc(root->nvars_total * sizeof(*vars));

    for (i = 0; i < root->nvars_total; i++)
        vars[i] = i;

    i = 0;
#if 0
    for (i = 0; i < root->nvars_total; i++) {
        if ((rc = send(root->thr_write_fd, vars + i,
                       sizeof(*vars), 0)) < 0) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                DBG("Cannot send[%d] due to blocking", i);
                break;
            }
            ERR("send error: %s", strerror(errno));
            exit(EX_SOFTWARE);
        }
        if (rc != sizeof(*vars)) {
            ERR("Incorrect sent data size: [%d] vs [%d]",
                (int)rc, (int)sizeof(*vars));
            exit(EX_SOFTWARE);
        }
        DBG("Sent[%u]", i);
    }
#endif
    while (i < root->nvars_total) {
        rc = poll(&pfd, 1, -1);
        if (rc < 0) {
            ERR("poll() error: %s", strerror(errno));
            exit(EX_SOFTWARE);
        }
        if (!rc) {
            ERR("Timeout?");
            exit(EX_SOFTWARE);
        }
        siz = (root->nvars_total - i) * sizeof(*vars);
        if ((rc = send(root->thr_write_fd, vars + i, siz, 0)) < 0) {
            if (errno == EAGAIN || errno == EWOULDBLOCK)
                continue;
            ERR("send error: %s", strerror(errno));
            exit(EX_SOFTWARE);
        }
        if ((rc % sizeof(*vars)) != 0) {
            ERR("Wrong data size[%d]", (int)rc);
            exit(EX_SOFTWARE);
        }
        old = i;
        i += rc / sizeof(*vars);
        DBG("Sent[%u]-[%u]", old, i - 1);
    }
    DBG("All the data sent!");
    free(vars);
    return (NULL);
}

static int
ctl_thr_find_data(struct root_ctx *root)
{
    size_t siz;
    uintptr_t *vars;
    ssize_t rc;

    siz = (root->nvars_total - root->nvars_recved) * sizeof(*vars);
    vars = malloc(siz);
    if ((rc = recv(root->thr_read_fd, vars, siz, 0)) < 0) {
        ERR("recv(): %s", strerror(errno));
        return (0);
    }
    if (rc != siz) {
        ERR("Cannot find all the data: [%d] vs [%u]",
            (int)rc, (unsigned)siz);
        return (0);
    }
    DBG("Found all the data[%u]", (unsigned)(siz / sizeof(*vars)));
    root->nvars_recved += (unsigned)(siz / sizeof(*vars));
    shutdown(root->thr_write_fd, SHUT_WR);
    return (1);
}

static void *
ctl_thr_proc(void *arg)
{
    struct root_ctx *root = arg;
    struct pollfd pfd;
    uint8_t byte;
    int rc;
    int tried = 0;

    memset(&pfd, 0, sizeof(pfd));
    pfd.fd = root->ctl_read_fd;
    pfd.events = POLLIN;

    DBG("Controlling thread started");
    while (ctl_thr_check_threads(root) > 0) {
        rc = poll(&pfd, 1, root->timeout);
        if (rc < 0) {
            ERR("poll() error: %s", strerror(errno));
            root->ex = EX_SOFTWARE;
            return (NULL);
        }
        if (!rc) {
            if (!tried) {
                ERR("Timeout! Will try to read data by ourselves");
                root->ex = EX_SOFTWARE;
                if (!ctl_thr_find_data(root))
                    return (NULL);
                tried = 1;
                continue;
            } else {
                ERR("Timeout anyway");
                return (NULL);
            }
        }
        if (recv(root->ctl_read_fd, &byte, sizeof(byte), 0) != sizeof(byte)) {
            ERR("recv error");
            root->ex = EX_SOFTWARE;
            return (NULL);
        }
        root->nthreads--;
        DBG("nthreads[%u]", root->nthreads);
    }
    return (NULL);
}

int
main(void)
{
    int sockets[2];
    struct root_ctx root;
    int rc;
    pthread_t ctl_thr;
    pthread_t snd_thr;

    memset(&root, 0, sizeof(root));

    root.nvars_total = 4000;
    root.nthreads_max = 50;
    root.timeout = 3 * 1000;
    root.ex = EX_OK;

    if (socketpair(PF_UNIX, SOCK_STREAM, 0, sockets) < 0) {
        ERR("Socketpair error: %s", strerror(errno));
        return (EX_SOFTWARE);
    }
    root.thr_write_fd = sockets[1];
    root.thr_read_fd = sockets[0];

    if (socketpair(PF_UNIX, SOCK_STREAM, 0, sockets) < 0) {
        ERR("Socketpair error: %s", strerror(errno));
        return (EX_SOFTWARE);
    }
    root.ctl_write_fd = sockets[1];
    root.ctl_read_fd = sockets[0];

    if (make_socket_nonblocking(root.thr_read_fd) ||
        make_socket_nonblocking(root.ctl_read_fd))
        return (EX_SOFTWARE);

    if (make_socket_nonblocking(root.thr_write_fd))
        return (EX_SOFTWARE);
    pthread_attr_init(&root.thr_attr);
    pthread_attr_setdetachstate(&root.thr_attr, PTHREAD_CREATE_DETACHED);

    if ((rc = pthread_mutex_init(&root.lck, NULL))) {
        ERR("pthread_mutex_init: %s", strerror(rc));
        return (EX_SOFTWARE);
    }

    if ((rc = pthread_create(&ctl_thr, NULL, ctl_thr_proc, &root))) {
        ERR("pthread_create: %s", strerror(rc));
        return (EX_SOFTWARE);
    }
    if ((rc = pthread_create(&snd_thr, NULL, snd_thr_proc, &root))) {
        ERR("pthread_create: %s", strerror(rc));
        return (EX_SOFTWARE);
    }

    DBG("Waiting for all the threads thread");
    pthread_join(snd_thr, NULL);
    pthread_join(ctl_thr, NULL);

    return (root.ex);
}
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.