/* Source: xio-posixmq.c */
/* Copyright Gerhard Rieger and contributors (see file CHANGES) */
/* Published under the GNU General Public License V.2, see file COPYING */

/* This file contains the source for opening addresses of POSIX MQ type */

#include "xiosysincludes.h"
#include "xioopen.h"

#include "xio-socket.h"
#include "xio-listen.h"
#include "xio-posixmq.h"
#include "xio-named.h"


#if WITH_POSIXMQ

static int _posixmq_unlink(
	const char *name,
	int level);	/* message level on error */

static int xioopen_posixmq(int argc, const char *argv[], struct opt *opts, int xioflags, xiofile_t *xfd, groups_t groups, int dirs, int dummy, int dummy3);

/* Address descriptors: all four variants share xioopen_posixmq() and differ
   in the direction constant and in the value that follows it (only
   POSIXMQ-RECEIVE passes XIOREAD_RECV_ONESHOT).
   NOTE(review): presumably the last three values before HELP() arrive in
   xioopen_posixmq() as dirs, oneshot and dummy3 - confirm against the
   definition of struct addrdesc. */
const struct addrdesc xioaddr_posixmq_bidir = {
	"POSIXMQ-BIDIRECTIONAL", 1+XIO_RDWR, xioopen_posixmq,
	GROUP_FD|GROUP_NAMED|GROUP_POSIXMQ|GROUP_RETRY,
	XIO_RDWR, 0, 0 HELP(":") };
const struct addrdesc xioaddr_posixmq_read = {
	"POSIXMQ-READ", 1+XIO_RDONLY, xioopen_posixmq,
	GROUP_FD|GROUP_NAMED|GROUP_POSIXMQ|GROUP_RETRY,
	XIO_RDONLY, 0, 0 HELP(":") };
const struct addrdesc xioaddr_posixmq_receive = {
	"POSIXMQ-RECEIVE", 1+XIO_RDONLY, xioopen_posixmq,
	GROUP_FD|GROUP_NAMED|GROUP_POSIXMQ|GROUP_RETRY|GROUP_CHILD,
	XIO_RDONLY, XIOREAD_RECV_ONESHOT, 0 HELP(":") };
const struct addrdesc xioaddr_posixmq_send = {
	"POSIXMQ-SEND", 1+XIO_WRONLY, xioopen_posixmq,
	GROUP_FD|GROUP_NAMED|GROUP_POSIXMQ|GROUP_RETRY|GROUP_CHILD,
	XIO_WRONLY, 0, 0 HELP(":") };

/* Option that stores its value directly in para.posixmq.prio, which is the
   priority handed to mq_send() and filled in by mq_receive() below.
   NOTE(review): the option is declared TYPE_BOOL although the target is used
   as a numeric priority - confirm that this type is intended. */
const struct optdesc opt_posixmq_priority = {
	"posixmq-priority", "mq-pri", OPT_POSIXMQ_PRIORITY, GROUP_POSIXMQ,
	PH_INIT, TYPE_BOOL, OFUNC_OFFSET,
	XIO_OFFSETOF(para.posixmq.prio), XIO_SIZEOF(para.posixmq.prio), 0 };

/* _read():  open immediately, stay in transfer loop
   _recv():  wait until data (how we know there is??), oneshot, opt.fork
 */
/* Opens (creating it if necessary) the POSIX message queue named by argv[1]
   and stores the descriptor in xfd->stream.fd.

   argc, argv: argv[0] is the address keyword (used in messages only),
               argv[1] the queue name; argc must be 2
   opts:       address options; fork, max-children, o-excl, perm,
               unlink-early and unlink-close are consumed here
   xioflags:   must contain XIO_MAYFORK when option fork is given
   xfd:        descriptor to fill in; only its stream member is used
   groups:     unused
   dirs:       XIO_RDWR, XIO_RDONLY or XIO_WRONLY; selects the open mode and
               the poll event that is waited for
   oneshot:    ORed into the data type; when nonzero the function does not
               return before the queue is ready
   dummy3:     unused

   Without fork and without oneshot the function returns right after
   mq_open(). Otherwise it waits for the queue to become ready; with fork it
   then forks a sub process per event, the child leaves the loop and returns
   while the parent keeps waiting.

   Returns STAT_OK (or 0 in a forked child) on success, STAT_NORETRY on
   usage errors, STAT_RETRYLATER when a resource could not be obtained. */
static int xioopen_posixmq(
	int argc,
	const char *argv[],
	struct opt *opts,
	int xioflags,
	xiofile_t *xfd,
	groups_t groups,
	int dirs,
	int oneshot,
	int dummy3)
{
	/* We expect the form: /mqname */
	xiosingle_t *sfd = &xfd->stream;
	const char *name;
	bool opt_unlink_early = false;
	int oflag;
	bool opt_o_excl = false;
	mode_t opt_mode = 0666;
	mqd_t mqd;
	int _errno;
	bool dofork = false;
	int maxchildren = 0;
	bool with_intv = false;
	int result = 0;

	if (!xioparms.experimental) {
		Error1("%s: use option --experimental to acknowledge unmature state", argv[0]);
	}
	if (argc != 2) {
		Error2("%s: wrong number of parameters (%d instead of 1)",
		       argv[0], argc-1);
		return STAT_NORETRY;
	}

	name = argv[1];

	retropt_bool(opts, OPT_FORK, &dofork);
	if (dofork) {
		if (!(xioflags & XIO_MAYFORK)) {
			Error1("%s: option fork not allowed in this context", argv[0]);
			return STAT_NORETRY;
		}
		sfd->flags |= XIO_DOESFORK;
		if (dirs == XIO_WRONLY) {
			/* a sender has no trigger pipe; the parent paces
			   itself with the interval instead */
			with_intv = true;
		}
	}

	retropt_int(opts, OPT_MAX_CHILDREN, &maxchildren);
	if (! dofork && maxchildren) {
		Error("option max-children not allowed without option fork");
		return STAT_NORETRY;
	}
	if (maxchildren) {
		xiosetchilddied();	/* set SIGCHLD handler */
	}

	applyopts_offset(sfd, opts);
	if (applyopts_single(sfd, opts, PH_INIT) < 0)
		return STAT_NORETRY;
	applyopts(-1, opts, PH_INIT);

	if ((sfd->para.posixmq.name = strdup(name)) == NULL) {
		Error1("strdup(\"%s\"): out of memory", name);
		/* do not continue with a NULL name, it would reach
		   mq_unlink() later */
		return STAT_RETRYLATER;
	}

	retropt_bool(opts, OPT_O_EXCL, &opt_o_excl);
	retropt_mode(opts, OPT_PERM, &opt_mode);
	retropt_bool(opts, OPT_UNLINK_EARLY, &opt_unlink_early);
	if (opt_unlink_early) {
		_posixmq_unlink(sfd->para.posixmq.name, E_INFO);
	}
	retropt_bool(opts, OPT_UNLINK_CLOSE, &sfd->opt_unlink_close);
	sfd->howtoend = END_CLOSE;
	sfd->dtype = XIODATA_POSIXMQ | oneshot;

	oflag = O_CREAT;
	if (opt_o_excl)
		oflag |= O_EXCL;
	switch (dirs) {
	case XIO_RDWR:   oflag |= O_RDWR;   break;
	case XIO_RDONLY: oflag |= O_RDONLY; break;
	case XIO_WRONLY: oflag |= O_WRONLY; break;
	default: break;
	}

	/* Now open the message queue */
	Debug3("mq_open(\"%s\", %d, "F_mode", NULL)", name, oflag, opt_mode);
	mqd = mq_open(name, oflag, opt_mode, NULL);
	_errno = errno;
	Debug1("mq_open() -> %d", mqd);
	if (mqd < 0) {
		/* report the saved value: Debug1() may have changed errno */
		Error3("%s: mq_open(\"%s\"): %s", argv[0], name, strerror(_errno));
		errno = _errno;
		return STAT_RETRYLATER;
	}
	sfd->fd = mqd;

	if (!dofork && !oneshot) {
		return STAT_OK;
	}
	/* Continue with modes that open only when data available */

	if (!oneshot) {
		if (xioparms.logopt == 'm') {
			Info("starting POSIX-MQ fork loop, switching to syslog");
			diag_set('y', xioparms.syslogfac);
			xioparms.logopt = 'y';
		} else {
			Info("starting POSIX-MQ fork loop");
		}
	}

	/* Wait until a message is available (or until interval has expired),
	   then fork a sub process that handles this single message. Here we
	   continue waiting for more.
	   The trigger mechanism is described with function
	   _xioopen_dgram_recvfrom()
	 */
	while (true) {
		int trigger[2];
		pid_t pid;	/* mostly int; only used with fork */
		sigset_t mask_sigchld;

		Info1("%s: waiting for data or interval", argv[0]);
		do {
			struct pollfd pollfd;

			pollfd.fd = sfd->fd;
			pollfd.events = (dirs==XIO_RDONLY?POLLIN:POLLOUT);
			if (xiopoll(&pollfd, 1, NULL) > 0) {
				break;
			}
			if (errno == EINTR) {
				continue;
			}
			Warn2("poll({%d,,},,-1): %s", sfd->fd, strerror(errno));
			Sleep(1);
		} while (true);
		if (!dofork)
			return STAT_OK;

		Info("generating pipe that triggers parent when packet has been consumed");
		if (dirs == XIO_RDONLY) {
			if (Pipe(trigger) < 0) {
				Error1("pipe(): %s", strerror(errno));
				/* trigger[] is not valid; give up like on a
				   failed fork */
				xioclose_posixmq(sfd);
				return STAT_RETRYLATER;
			}
		}

		/* Block SIGCHLD until parent is ready to react */
		sigemptyset(&mask_sigchld);
		sigaddset(&mask_sigchld, SIGCHLD);
		Sigprocmask(SIG_BLOCK, &mask_sigchld, NULL);

		if ((pid = xio_fork(false, E_ERROR, xfd->stream.shutup)) < 0) {
			Sigprocmask(SIG_UNBLOCK, &mask_sigchld, NULL);
			if (dirs==XIO_RDONLY) {
				Close(trigger[0]);
				Close(trigger[1]);
			}
			xioclose_posixmq(sfd);
			return STAT_RETRYLATER;
		}
		if (pid == 0) {	/* child */
			pid_t cpid = Getpid();
			Sigprocmask(SIG_UNBLOCK, &mask_sigchld, NULL);
			xiosetenvulong("PID", cpid, 1);

			if (dirs == XIO_RDONLY) {
				Close(trigger[0]);
				Fcntl_l(trigger[1], F_SETFD, FD_CLOEXEC);
				/* xioread_posixmq() closes this end after
				   mq_receive(), which ends the parent's
				   Read() below */
				sfd->triggerfd = trigger[1];
			}
			break;
		}

		/* Parent */
		if (dirs == XIO_RDONLY) {
			char buf[1];
			Close(trigger[1]);
			while (Read(trigger[0], buf, 1) < 0 && errno == EINTR)
				;
			/* every iteration creates a new pipe, so release the
			   read end instead of leaking one fd per child */
			Close(trigger[0]);
		}

		if (with_intv) {
			Nanosleep(&sfd->intervall, NULL);
		}

		/* now we are ready to handle signals */
		Sigprocmask(SIG_UNBLOCK, &mask_sigchld, NULL);
		while (maxchildren) {
			if (num_child < maxchildren)
				break;
			Notice1("max of %d children is active, waiting", num_child);
			while (!Sleep(UINT_MAX))
				;	/* any signal lets us continue */
		}
		Info("continue listening");
	}

	_xio_openlate(sfd, opts);
	return result;
}


/* Sends the bufsiz bytes at buff as one message to the queue sfd->fd, using
   sfd->para.posixmq.prio as message priority.
   Returns bufsiz on success; on failure logs an error and returns -1 with
   errno as set by mq_send(). */
ssize_t xiowrite_posixmq(
	struct single *sfd,
	const void *buff,
	size_t bufsiz)
{
	int res;
	int _errno;

	Debug4("mq_send(mqd=%d, %p, "F_Zu", %u)",
	       sfd->fd, buff, bufsiz, sfd->para.posixmq.prio);
	res = mq_send(sfd->fd, buff, bufsiz, sfd->para.posixmq.prio);
	_errno = errno;
	Debug1("mq_send() -> %d", res);
	errno = _errno;	/* the debug output must not hide the cause */
	if (res < 0) {
		Error2("mq_send(mqd=%d): %s", sfd->fd, strerror(errno));
		return -1;
	}
	return bufsiz;	/* success */
}

/* Receives one message from the queue sfd->fd into buff (at most bufsiz
   bytes) and stores its priority in sfd->para.posixmq.prio, which is also
   exported with xiosetenvulong() as "POSIXMQ_PRIO".
   Closes the trigger descriptor, if set, once the message has been taken.
   Returns the message length; on failure logs an error and returns -1. */
ssize_t xioread_posixmq(
	struct single *sfd,
	void *buff,
	size_t bufsiz)
{
	ssize_t res;
	int _errno;

	Debug3("mq_receive(mqd=%d, %p, "F_Zu", {} )", sfd->fd, buff, bufsiz);
	res = mq_receive(sfd->fd, buff, bufsiz, &sfd->para.posixmq.prio);
	_errno = errno;
	Debug1("mq_receive() -> "F_Zd, res);
	errno = _errno;	/* the debug output must not hide the cause */
	if (res < 0) {
		Error2("mq_receive(mqd=%d): %s", sfd->fd, strerror(errno));
		return -1;
	}
	/* NOTE(review): a trigger descriptor with value 0 is not closed by
	   this test - confirm what the unset value of triggerfd is before
	   changing it */
	if (sfd->triggerfd > 0) {
		Close(sfd->triggerfd);
		sfd->triggerfd = -1;
	}
	Info1("mq_receive() -> {prio=%u}", sfd->para.posixmq.prio);
	xiosetenvulong("POSIXMQ_PRIO", (unsigned long)sfd->para.posixmq.prio, 1);
	return res;
}

ssize_t xiopending_posixmq(struct single *sfd);

/* Closes the queue descriptor sfd->fd, removes the queue name when
   opt_unlink_close is set, and releases the stored name.
   A failing mq_close() only produces a warning. Always returns 0. */
ssize_t xioclose_posixmq(
	struct single *sfd)
{
	int res;

	Debug1("xioclose_posixmq(): mq_close(%d)", sfd->fd);
	res = mq_close(sfd->fd);
	if (res < 0) {
		Warn2("xioclose_posixmq(): mq_close(%d) -> -1: %s",
		      sfd->fd, strerror(errno));
	} else {
		Debug("xioclose_posixmq(): mq_close() -> 0");
	}
	if (sfd->opt_unlink_close) {
		_posixmq_unlink(sfd->para.posixmq.name, E_WARN);
	}
	free((void *)sfd->para.posixmq.name);
	sfd->para.posixmq.name = NULL;	/* guard against reuse / double free */
	return 0;
}

/* Removes the message queue name with mq_unlink().
   On failure a message of the given level is issued.
   Returns the result of mq_unlink(); errno is preserved across the debug
   output. */
static int _posixmq_unlink(
	const char *name,
	int level)	/* message level on error */
{
	int _errno;
	int res;

	Debug1("mq_unlink(\"%s\")", name);
	res = mq_unlink(name);
	_errno = errno;
	Debug1("mq_unlink() -> %d", res);
	errno = _errno;
	if (res < 0) {
		Msg2(level, "mq_unlink(\"%s\"): %s",name, strerror(errno));
	}
	return res;
}

#endif /* WITH_POSIXMQ */