1
0
Fork 0
mirror of https://repo.or.cz/socat.git synced 2025-07-18 00:53:25 +00:00

Added option posixmq-flush

This commit is contained in:
Gerhard Rieger 2024-09-06 13:09:44 +02:00
parent 9bf5fc625c
commit 663a6bb012
7 changed files with 138 additions and 69 deletions

View file

@ -15,9 +15,8 @@
#if WITH_POSIXMQ
static int _posixmq_unlink(
const char *name,
int level); /* message level on error */
static int _posixmq_flush(struct single *sfd);
static int _posixmq_unlink(const char *name, int level);
static int xioopen_posixmq(int argc, const char *argv[], struct opt *opts, int xioflags, xiofile_t *xfd, const struct addrdesc *addrdesc);
@ -27,6 +26,7 @@ const struct addrdesc xioaddr_posixmq_receive = { "POSIXMQ-RECEIVE", 1+XIO
const struct addrdesc xioaddr_posixmq_send = { "POSIXMQ-SEND", 1+XIO_WRONLY, xioopen_posixmq, GROUP_FD|GROUP_NAMED|GROUP_POSIXMQ|GROUP_RETRY|GROUP_CHILD, XIO_WRONLY, 0, 0 HELP(":<mqname>") };
const struct optdesc opt_posixmq_priority = { "posixmq-priority", "mq-pri", OPT_POSIXMQ_PRIORITY, GROUP_POSIXMQ, PH_INIT, TYPE_BOOL, OFUNC_OFFSET, XIO_OFFSETOF(para.posixmq.prio), XIO_SIZEOF(para.posixmq.prio), 0 };
const struct optdesc opt_posixmq_flush = { "posixmq-flush", "mq-flush", OPT_POSIXMQ_FLUSH, GROUP_POSIXMQ, PH_EARLY, TYPE_BOOL, OFUNC_SPEC, 0, 0, 0 };
/* _read(): open immediately, stay in transfer loop
_recv(): wait until data (how we know there is??), oneshot, opt.fork
@ -45,11 +45,13 @@ static int xioopen_posixmq(
int dirs = addrdesc->arg1;
int oneshot = addrdesc->arg2;
bool opt_unlink_early = false;
bool nonblock;
bool nonblock = 0;
bool flush = false;
int oflag;
bool opt_o_excl = false;
mode_t opt_mode = 0666;
mqd_t mqd;
struct mq_attr attr;
int _errno;
bool dofork = false;
int maxchildren = 0;
@ -97,6 +99,7 @@ static int xioopen_posixmq(
retropt_bool(opts, OPT_O_EXCL, &opt_o_excl);
retropt_mode(opts, OPT_PERM, &opt_mode);
retropt_bool(opts, OPT_POSIXMQ_FLUSH, &flush);
retropt_bool(opts, OPT_UNLINK_EARLY, &opt_unlink_early);
if (opt_unlink_early) {
@ -117,6 +120,11 @@ static int xioopen_posixmq(
if (retropt_bool(opts, OPT_O_NONBLOCK, &nonblock) >= 0 && nonblock)
oflag |= O_NONBLOCK;
if (flush) {
if (_posixmq_flush(sfd) != STAT_OK)
return STAT_NORETRY;
}
/* Now open the message queue */
Debug3("mq_open(\"%s\", %d, "F_mode", NULL)", name, oflag, opt_mode);
mqd = mq_open(name, oflag, opt_mode, NULL);
@ -129,6 +137,16 @@ static int xioopen_posixmq(
}
sfd->fd = mqd;
Debug1("mq_getattr(%d, ...)", mqd);
if (mq_getattr(mqd, &attr) < 0) {
Warn4("mq_getattr(%d[\"%s\"], %p): %s",
mqd, sfd->para.posixmq.name, &attr, strerror(errno));
mq_close(mqd);
return STAT_NORETRY;
}
Info5("POSIXMQ queue \"%s\": flags=%ld, maxmsg=%ld, msgsize=%ld, curmsgs=%ld",
name, attr.mq_flags, attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs);
if (!dofork && !oneshot) {
return STAT_OK;
}
@ -236,6 +254,81 @@ static int xioopen_posixmq(
}
/* With option flush try to open the queue and "consume" its current contents */
static int _posixmq_flush(
struct single *sfd)
{
mqd_t mqd;
struct mq_attr attr;
void *buff;
size_t bufsiz;
int _errno;
int p = 0; /* number of messages flushed */
size_t b = 0; /* number of bytes flushed */
Info1("flushing POSIXMQ queue \"%s\"", sfd->para.posixmq.name);
Debug1("mq_open(\"%s\", O_RDONLY|O_NONBLOCK, 0, NULL)",
sfd->para.posixmq.name);
mqd = mq_open(sfd->para.posixmq.name, O_RDONLY|O_NONBLOCK, 0, NULL);
_errno = errno;
Debug1("mq_open() -> %d", mqd);
if (mqd < 0 && _errno == ENOENT) {
Info("this queue does not exist, no need to flush it");
return STAT_OK;
}
if (mqd < 0) {
Warn2("mq_open(\"%s\", ...): %s", sfd->para.posixmq.name,
strerror(_errno));
return STAT_NORETRY;
}
Debug1("mq_getattr(%d, ...)", mqd);
if (mq_getattr(mqd, &attr) < 0) {
Warn4("mq_getattr(%d[\"%s\"], %p): %s",
mqd, sfd->para.posixmq.name, &attr, strerror(errno));
mq_close(mqd);
return STAT_NORETRY;
}
if (attr.mq_curmsgs == 0) {
Info1("POSIXMQ \"%s\" is empty", sfd->para.posixmq.name);
mq_close(mqd);
return STAT_OK;
}
bufsiz = attr.mq_msgsize;
if ((buff = Malloc(bufsiz)) == NULL) {
mq_close(mqd);
return STAT_RETRYLATER;
}
/* Now read all messages to null */
while (true) {
ssize_t bytes;
Debug3("mq_receive(mqd=%d, %p, "F_Zu", {} )", mqd, buff, bufsiz);
bytes = mq_receive(mqd, buff, bufsiz, &sfd->para.posixmq.prio);
_errno = errno;
Debug1("mq_receive() -> "F_Zd, bytes);
errno = _errno;
if (bytes == 0 || (bytes < 0 && _errno == EAGAIN)) {
break;
}
if (bytes < 0) {
Warn2("flushing POSIXMQ \"%s\" failed: %s",
sfd->para.posixmq.name, strerror(_errno));
free(buff);
mq_close(mqd);
return STAT_NORETRY;
}
++p;
b += bytes;
}
Info3("flushed "F_Zu" bytes in %u packets from queue \"%s\"", b, p,
sfd->para.posixmq.name);
free(buff);
mq_close(mqd);
return STAT_OK;
}
ssize_t xiowrite_posixmq(
struct single *sfd,
const void *buff,
@ -288,6 +381,9 @@ ssize_t xioclose_posixmq(
struct single *sfd)
{
int res;
if (sfd->fd < 0)
return 0;
Debug1("xioclose_posixmq(): mq_close(%d)", sfd->fd);
res = mq_close(sfd->fd);
if (res < 0) {