1
0
Fork 0
mirror of https://repo.or.cz/socat.git synced 2025-07-13 15:13:23 +00:00

Added options posixmq-maxmsg and posixmq-msgsize

This commit is contained in:
Gerhard Rieger 2024-09-10 20:16:36 +02:00
parent 663a6bb012
commit 25d2f746d9
7 changed files with 79 additions and 10 deletions

View file

@ -25,8 +25,10 @@ const struct addrdesc xioaddr_posixmq_read = { "POSIXMQ-READ", 1+XIO
const struct addrdesc xioaddr_posixmq_receive = { "POSIXMQ-RECEIVE", 1+XIO_RDONLY, xioopen_posixmq, GROUP_FD|GROUP_NAMED|GROUP_POSIXMQ|GROUP_RETRY|GROUP_CHILD, XIO_RDONLY, XIOREAD_RECV_ONESHOT, 0 HELP(":<mqname>") };
const struct addrdesc xioaddr_posixmq_send = { "POSIXMQ-SEND", 1+XIO_WRONLY, xioopen_posixmq, GROUP_FD|GROUP_NAMED|GROUP_POSIXMQ|GROUP_RETRY|GROUP_CHILD, XIO_WRONLY, 0, 0 HELP(":<mqname>") };
const struct optdesc opt_posixmq_priority = { "posixmq-priority", "mq-pri", OPT_POSIXMQ_PRIORITY, GROUP_POSIXMQ, PH_INIT, TYPE_BOOL, OFUNC_OFFSET, XIO_OFFSETOF(para.posixmq.prio), XIO_SIZEOF(para.posixmq.prio), 0 };
const struct optdesc opt_posixmq_priority = { "posixmq-priority", "mq-prio", OPT_POSIXMQ_PRIORITY, GROUP_POSIXMQ, PH_INIT, TYPE_BOOL, OFUNC_OFFSET, XIO_OFFSETOF(para.posixmq.prio), XIO_SIZEOF(para.posixmq.prio), 0 };
const struct optdesc opt_posixmq_flush = { "posixmq-flush", "mq-flush", OPT_POSIXMQ_FLUSH, GROUP_POSIXMQ, PH_EARLY, TYPE_BOOL, OFUNC_SPEC, 0, 0, 0 };
const struct optdesc opt_posixmq_maxmsg = { "posixmq-maxmsg", "mq-maxmsg", OPT_POSIXMQ_MAXMSG, GROUP_POSIXMQ, PH_OPEN, TYPE_LONG, OFUNC_SPEC, 0, 0, 0 };
const struct optdesc opt_posixmq_msgsize = { "posixmq-msgsize", "mq-msgsize", OPT_POSIXMQ_MSGSIZE, GROUP_POSIXMQ, PH_OPEN, TYPE_LONG, OFUNC_SPEC, 0, 0, 0 };
/* _read(): open immediately, stay in transfer loop
_recv(): wait until data (how we know there is??), oneshot, opt.fork
@ -47,11 +49,12 @@ static int xioopen_posixmq(
bool opt_unlink_early = false;
bool nonblock = 0;
bool flush = false;
struct mq_attr attr = { 0 };
bool setopts = false;
int oflag;
bool opt_o_excl = false;
mode_t opt_mode = 0666;
mqd_t mqd;
struct mq_attr attr;
int _errno;
bool dofork = false;
int maxchildren = 0;
@ -100,6 +103,47 @@ static int xioopen_posixmq(
retropt_bool(opts, OPT_O_EXCL, &opt_o_excl);
retropt_mode(opts, OPT_PERM, &opt_mode);
retropt_bool(opts, OPT_POSIXMQ_FLUSH, &flush);
retropt_long(opts, OPT_POSIXMQ_MAXMSG, &attr.mq_maxmsg) ||
(setopts = true);
retropt_long(opts, OPT_POSIXMQ_MSGSIZE, &attr.mq_msgsize) ||
(setopts = true);
/* When only one of mq-maxmsg and mq-msgsize options has been provided,
we must nevertheless set the other option value in strucht mq_attr.
For this we have to find the default, read it from /proc fs */
if (setopts) {
int pfd;
const static char *PROC_MAXMSG = "/proc/sys/fs/mqueue/msg_default";
const static char *PROC_MSGSIZE = "/proc/sys/fs/mqueue/msgsize_default";
char buff[21]; /* fit a 64bit num in decimal */
ssize_t bytes;
if (attr.mq_maxmsg == 0) {
if ((pfd = Open(PROC_MAXMSG, O_RDONLY, 0)) < 0) {
Warn2("open(\"%s\", O_RDONLY, 0): %s", PROC_MAXMSG, strerror(errno));
} else if ((bytes = Read(pfd, buff, sizeof(buff)-1)) < 0) {
Warn4("read(%d /* \"%s\" */, buff, "F_Zd"): %s",
pfd, PROC_MAXMSG, sizeof(buff)-1, strerror (errno));
Close(pfd);
} else {
sscanf(buff, "%ld", &attr.mq_maxmsg);
Close(pfd);
}
}
if (attr.mq_msgsize == 0) {
if ((pfd = Open(PROC_MSGSIZE, O_RDONLY, 0)) < 0) {
Warn2("open(\"%s\", O_RDONLY, 0): %s", PROC_MSGSIZE, strerror(errno));
} else if ((bytes = Read(pfd, buff, sizeof(buff)-1)) < 0) {
Warn4("read(%d /* \"%s\" */, buff, "F_Zd"): %s",
pfd, PROC_MSGSIZE, sizeof(buff)-1, strerror (errno));
Close(pfd);
} else {
sscanf(buff, "%ld", &attr.mq_msgsize);
Close(pfd);
}
}
}
retropt_bool(opts, OPT_UNLINK_EARLY, &opt_unlink_early);
if (opt_unlink_early) {
@ -126,12 +170,18 @@ static int xioopen_posixmq(
}
/* Now open the message queue */
Debug3("mq_open(\"%s\", %d, "F_mode", NULL)", name, oflag, opt_mode);
mqd = mq_open(name, oflag, opt_mode, NULL);
if (setopts)
Debug8("%s: mq_open(\"%s\", "F_mode", "F_mode", {flags=%ld, maxmsg=%ld, msgsize=%ld, curmsgs=%ld} )", argv[0], name, oflag, opt_mode, attr.mq_flags, attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs);
else
Debug4("%s: mq_open(\"%s\", "F_mode", "F_mode", NULL)", argv[0], name, oflag, opt_mode);
mqd = mq_open(name, oflag, opt_mode, setopts ? &attr : NULL);
_errno = errno;
Debug1("mq_open() -> %d", mqd);
if (mqd < 0) {
Error3("%s: mq_open(\"%s\"): %s", argv[0], name, strerror(errno));
if (setopts)
Error9("%s: mq_open(\"%s\", "F_mode", "F_mode", {flags=%ld, maxmsg=%ld, msgsize=%ld, curmsgs=%ld} ): %s", argv[0], name, oflag, opt_mode, attr.mq_flags, attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs, strerror(errno));
else
Error5("%s: mq_open(\"%s\", "F_mode", "F_mode", NULL): %s", argv[0], name, oflag, opt_mode, strerror(errno));
errno = _errno;
return STAT_RETRYLATER;
}