diff --git a/CHANGES b/CHANGES index aeeb398..225a8cd 100644 --- a/CHANGES +++ b/CHANGES @@ -45,6 +45,10 @@ Features: POSIXMQ-RECV now takes option o-nonblock; this, in combination with -T, makes it possible to terminate Socat in case the queue is empty. + New option posixmq-flush (mq-flush) for POSIXMQ addresses empty the + queue before starting to transfer data. + Test: LINUX_POSIXMQ_FLUSH + Building: Disabling certain features during configure could break build process. diff --git a/doc/socat.yo b/doc/socat.yo index 30c2bb0..a478fbb 100644 --- a/doc/socat.yo +++ b/doc/socat.yo @@ -3492,6 +3492,9 @@ startdit() label(OPTION_POSIXMQ_PRIORITY)dit(bf(tt(posixmq-priority (mq-prio)))) Sets the priority of messages (packets) written to the queue, or the minimal priority of packet read from the queue. +label(OPTION_POSIXMQ_FLUSH)dit(bf(tt(posixmq-flush (mq-flush)))) + "Consumes" (drops) all messages currently in the queue before starting + transfers. enddit() diff --git a/test.sh b/test.sh index c16a839..7dcb0da 100755 --- a/test.sh +++ b/test.sh @@ -16683,28 +16683,18 @@ esac N=$((N+1)) - # Test the POSIX MQ feature with continuous READ and priorization on Linux -NAME=LINUX_POSIXMQ_READ_PRIO +NAME=POSIXMQ_READ_PRIO case "$TESTS" in *%$N%*|*%functions%*|*%socket%*|*%posixmq%*|*%$NAME%*) -TEST="$NAME: POSIX-MQ (Linux) with prio" +TEST="$NAME: POSIX-MQ with prio" # Run a client/sender that creates a POSIX-MQ and sends a normal message and # then a client/sender with a higher priority message. # Run a passive/listening/receiving/reading process and check if it receives # both messages and in the prioritized order if ! eval $NUMCOND; then :; -elif [ "$UNAME" != Linux ]; then - $PRINTF "test $F_n $TEST... ${YELLOW}Only on Linux${NORMAL}\n" $N - cant -elif ! F=$(testfeats POSIXMQ STDIO); then - $PRINTF "test $F_n $TEST... ${YELLOW}Feature $F not available in $SOCAT${NORMAL}\n" $N - cant -elif ! A=$(testaddrs POSIXMQ-SEND POSIXMQ-READ STDIO); then - $PRINTF "test $F_n $TEST... ${YELLOW}Address $A not available in $SOCAT${NORMAL}\n" $N - cant -elif ! o=$(testoptions mq-prio unlink-early unlink-close) >/dev/null; then - $PRINTF "test $F_n $TEST... ${YELLOW}Option $o not available in $SOCAT${NORMAL}\n" $N +elif ! cond=$(checkconds "" "" "" "POSIXMQ STDIO" "POSIXMQ-SEND POSIXMQ-READ STDIO" "mq-prio unlink-early unlink-close"); then + $PRINTF "test $F_n $TEST... ${YELLOW}$cond${NORMAL}\n" $N cant else tf="$td/test$N.stdout" @@ -16725,7 +16715,7 @@ pid1=$! relsleep 1 kill $pid1; wait if [ $rc0a -ne 0 -o $rc0b -ne 0 ]; then - $PRINTF "$FAILED\n" + $PRINTF "$FAILED (rc0a=$rc0a, rc0b=$rc0b)\n" echo "$CMD0a" cat "${te}0a" >&2 echo "$CMD0b" @@ -16743,7 +16733,7 @@ elif $ECHO "$da 1\n$da 0" |diff - ${tf}1 >${tdiff}1; then if [ "$DEBUG" ]; then cat "${te}1" >&2; fi ok else - $PRINTF "$FAILED\n" + $PRINTF "$FAILED (diff)\n" echo "$CMD0a" cat "${te}0a" >&2 echo "$CMD0b" @@ -16759,27 +16749,18 @@ fi # NUMCOND esac N=$((N+1)) -# Test the POSIX MQ feature with RECV,fork on Linux -NAME=LINUX_POSIXMQ_RECV_FORK +# Test the POSIX MQ feature with RECV,fork +NAME=POSIXMQ_RECV_FORK case "$TESTS" in *%$N%*|*%functions%*|*%fork%*|*%socket%*|*%posixmq%*|*%$NAME%*) -TEST="$NAME: POSIX-MQ (Linux) RECV with fork" +TEST="$NAME: POSIX-MQ RECV with fork" # Start a POSIX-MQ receiver with fork that creates a POSIX-MQ and stores its # output. # Run two clients/senders each with a message. # Check if both messages are stored. if ! eval $NUMCOND; then :; -elif [ "$UNAME" != Linux ]; then - $PRINTF "test $F_n $TEST... ${YELLOW}Only on Linux${NORMAL}\n" $N - cant -elif ! F=$(testfeats POSIXMQ STDIO); then - $PRINTF "test $F_n $TEST... ${YELLOW}Feature $F not available in $SOCAT${NORMAL}\n" $N - cant -elif ! A=$(testaddrs POSIXMQ-SEND POSIXMQ-RECEIVE STDIO); then - $PRINTF "test $F_n $TEST... ${YELLOW}Address $A not available in $SOCAT${NORMAL}\n" $N - cant -elif ! o=$(testoptions fork unlink-early unlink-close) >/dev/null; then - $PRINTF "test $F_n $TEST... ${YELLOW}Option $o not available in $SOCAT${NORMAL}\n" $N +elif ! cond=$(checkconds "" "" "" "POSIXMQ STDIO" "POSIXMQ-SEND POSIXMQ-READ STDIO" "fork unlink-early unlink-close"); then + $PRINTF "test $F_n $TEST... ${YELLOW}$cond${NORMAL}\n" $N cant else tf="$td/test$N.stdout" @@ -16801,7 +16782,7 @@ rc1b=$? relsleep 1 kill $pid0; wait if [ $rc1a -ne 0 -o $rc1b -ne 0 ]; then - $PRINTF "$FAILED\n" + $PRINTF "$FAILED (rc1a=$rc1a, rc1b=$rc1b)\n" echo "$CMD0" cat "${te}0" >&2 echo "$CMD1a" @@ -16819,7 +16800,7 @@ elif $ECHO "$da 0\n$da 1" |diff - ${tf}0 >${tdiff}0; then if [ "$DEBUG" ]; then cat "${te}1b" >&2; fi ok else - $PRINTF "$FAILED\n" + $PRINTF "$FAILED (diff)\n" echo "$CMD0" cat "${te}0" >&2 echo "$CMD1a" @@ -16835,11 +16816,11 @@ fi # NUMCOND esac N=$((N+1)) -# Test the POSIX MQ feature with RECV,fork,max-children on Linux -NAME=LINUX_POSIXMQ_RECV_MAXCHILDREN +# Test the POSIX MQ feature with RECV,fork,max-children +NAME=POSIXMQ_RECV_MAXCHILDREN case "$TESTS" in *%$N%*|*%functions%*|*%fork%*|*%maxchildren%*|*%socket%*|*%posixmq%*|*%$NAME%*) -TEST="$NAME: POSIX-MQ (Linux) RECV with fork,max-children" +TEST="$NAME: POSIX-MQ RECV with fork,max-children" # Start a POSIX-MQ receiver with fork that creates a POSIX-MQ and stores its # output via sub processes that sleeps after writing. # Run a client/sender that sends message 1; @@ -16847,17 +16828,8 @@ TEST="$NAME: POSIX-MQ (Linux) RECV with fork,max-children" # write message 2 directly into output file; # Check if the messages are stored in order of their numbers if ! eval $NUMCOND; then :; -elif [ "$UNAME" != Linux ]; then - $PRINTF "test $F_n $TEST... ${YELLOW}Only on Linux${NORMAL}\n" $N - cant -elif ! F=$(testfeats POSIXMQ STDIO SYSTEM); then - $PRINTF "test $F_n $TEST... ${YELLOW}Feature $F not available in $SOCAT${NORMAL}\n" $N - cant -elif ! A=$(testaddrs POSIXMQ-SEND POSIXMQ-RECEIVE STDIO SYSTEM); then - $PRINTF "test $F_n $TEST... ${YELLOW}Address $A not available in $SOCAT${NORMAL}\n" $N - cant -elif ! o=$(testoptions fork max-children unlink-early unlink-close) >/dev/null; then - $PRINTF "test $F_n $TEST... ${YELLOW}Option $o not available in $SOCAT${NORMAL}\n" $N +elif ! cond=$(checkconds "" "" "" "POSIXMQ STDIO SYSTEM" "POSIXMQ-SEND POSIXMQ-RECEIVE STDIO SYSTEM" "fork max-children unlink-early unlink-close"); then + $PRINTF "test $F_n $TEST... ${YELLOW}$cond${NORMAL}\n" $N cant else tf="$td/test$N.stdout" @@ -16882,7 +16854,7 @@ sleep 1 # as in SYSTEM kill $(childpids $pid0) $pid0 2>/dev/null wait 2>/dev/null if [ $rc1a -ne 0 -o $rc1b -ne 0 ]; then - $PRINTF "$FAILED\n" + $PRINTF "$FAILED (rc1a=$rc1a, rc1b=$rc1b)\n" echo "$CMD0" cat "${te}0" >&2 echo "$CMD1a" @@ -16900,7 +16872,7 @@ elif $ECHO "$da 1\n$da 2\n$da 3" |diff - ${tf}0 >${tdiff}0; then if [ "$DEBUG" ]; then cat "${te}1b" >&2; fi ok else - $PRINTF "$FAILED\n" + $PRINTF "$FAILED (diff)\n" echo "$CMD0" cat "${te}0" >&2 echo "$CMD1a" @@ -16916,11 +16888,11 @@ fi # NUMCOND esac N=$((N+1)) -# Test the POSIX MQ feature with SEND,fork,max-children on Linux -NAME=LINUX_POSIXMQ_SEND_MAXCHILDREN +# Test the POSIX MQ feature with SEND,fork,max-children +NAME=POSIXMQ_SEND_MAXCHILDREN case "$TESTS" in *%$N%*|*%functions%*|*%fork%*|*%maxchildren%*|*%socket%*|*%posixmq%*|*%$NAME%*) -TEST="$NAME: POSIX-MQ (Linux) SEND with fork,max-children" +TEST="$NAME: POSIX-MQ SEND with fork,max-children" # Start a POSIX-MQ receiver that creates a POSIX-MQ and transfers data from # there to an output file # Run a POSIX-MQ sender that two times forks and invokes a data generator @@ -16930,21 +16902,9 @@ TEST="$NAME: POSIX-MQ (Linux) SEND with fork,max-children" # Check if the messages are stored in order of their numbers. # The data generator is implemented as a receiver from an MQ with "1", "3" if ! eval $NUMCOND; then :; -elif [ "$UNAME" != Linux ]; then - $PRINTF "test $F_n $TEST... ${YELLOW}Only on Linux${NORMAL}\n" $N +elif ! cond=$(checkconds "" "" "" "POSIXMQ STDIO SYSTEM" "POSIXMQ-SEND POSIXMQ-READ STDIO SYSTEM" "fork max-children mq-prio unlink-early unlink-close"); then + $PRINTF "test $F_n $TEST... ${YELLOW}$cond${NORMAL}\n" $N cant -elif ! F=$(testfeats POSIXMQ STDIO SYSTEM); then - $PRINTF "test $F_n $TEST... ${YELLOW}Feature $F not available in $SOCAT${NORMAL}\n" $N - cant -elif ! A=$(testaddrs POSIXMQ-SEND POSIXMQ-READ STDIO SYSTEM); then - $PRINTF "test $F_n $TEST... ${YELLOW}Address $A not available in $SOCAT${NORMAL}\n" $N - cant -elif ! o=$(testoptions fork max-children mq-prio unlink-early unlink-close) >/dev/null; then - $PRINTF "test $F_n $TEST... ${YELLOW}Option $o not available in $SOCAT${NORMAL}\n" $N - cant -#elif ! runsposixmq >/dev/null; then -# $PRINTF "test $F_n $TEST... ${YELLOW}IPv4 not available${NORMAL}\n" $N -# cant else tf="$td/test$N.stdout" te="$td/test$N.stderr" @@ -19252,6 +19212,8 @@ DCCP-CONNECT dccp4 PORT #ROXY::127.0.0.1 tcp4 PORT " +# Above tests introduced before or with 1.8.0.1 +# Below test introduced with 1.8.0.2 # Test the readline.sh file overwrite vulnerability NAME=READLINE_SH_OVERWRITE diff --git a/xio-posixmq.c b/xio-posixmq.c index f80b303..5168781 100644 --- a/xio-posixmq.c +++ b/xio-posixmq.c @@ -15,9 +15,8 @@ #if WITH_POSIXMQ -static int _posixmq_unlink( - const char *name, - int level); /* message level on error */ +static int _posixmq_flush(struct single *sfd); +static int _posixmq_unlink(const char *name, int level); static int xioopen_posixmq(int argc, const char *argv[], struct opt *opts, int xioflags, xiofile_t *xfd, const struct addrdesc *addrdesc); @@ -27,6 +26,7 @@ const struct addrdesc xioaddr_posixmq_receive = { "POSIXMQ-RECEIVE", 1+XIO const struct addrdesc xioaddr_posixmq_send = { "POSIXMQ-SEND", 1+XIO_WRONLY, xioopen_posixmq, GROUP_FD|GROUP_NAMED|GROUP_POSIXMQ|GROUP_RETRY|GROUP_CHILD, XIO_WRONLY, 0, 0 HELP(":<mqname>") }; const struct optdesc opt_posixmq_priority = { "posixmq-priority", "mq-pri", OPT_POSIXMQ_PRIORITY, GROUP_POSIXMQ, PH_INIT, TYPE_BOOL, OFUNC_OFFSET, XIO_OFFSETOF(para.posixmq.prio), XIO_SIZEOF(para.posixmq.prio), 0 }; +const struct optdesc opt_posixmq_flush = { "posixmq-flush", "mq-flush", OPT_POSIXMQ_FLUSH, GROUP_POSIXMQ, PH_EARLY, TYPE_BOOL, OFUNC_SPEC, 0, 0, 0 }; /* _read(): open immediately, stay in transfer loop _recv(): wait until data (how we know there is??), oneshot, opt.fork @@ -45,11 +45,13 @@ static int xioopen_posixmq( int dirs = addrdesc->arg1; int oneshot = addrdesc->arg2; bool opt_unlink_early = false; - bool nonblock; + bool nonblock = 0; + bool flush = false; int oflag; bool opt_o_excl = false; mode_t opt_mode = 0666; mqd_t mqd; + struct mq_attr attr; int _errno; bool dofork = false; int maxchildren = 0; @@ -97,6 +99,7 @@ static int xioopen_posixmq( retropt_bool(opts, OPT_O_EXCL, &opt_o_excl); retropt_mode(opts, OPT_PERM, &opt_mode); + retropt_bool(opts, OPT_POSIXMQ_FLUSH, &flush); retropt_bool(opts, OPT_UNLINK_EARLY, &opt_unlink_early); if (opt_unlink_early) { @@ -117,6 +120,11 @@ static int xioopen_posixmq( if (retropt_bool(opts, OPT_O_NONBLOCK, &nonblock) >= 0 && nonblock) oflag |= O_NONBLOCK; + if (flush) { + if (_posixmq_flush(sfd) != STAT_OK) + return STAT_NORETRY; + } + /* Now open the message queue */ Debug3("mq_open(\"%s\", %d, "F_mode", NULL)", name, oflag, opt_mode); mqd = mq_open(name, oflag, opt_mode, NULL); @@ -129,6 +137,16 @@ static int xioopen_posixmq( } sfd->fd = mqd; + Debug1("mq_getattr(%d, ...)", mqd); + if (mq_getattr(mqd, &attr) < 0) { + Warn4("mq_getattr(%d[\"%s\"], %p): %s", + mqd, sfd->para.posixmq.name, &attr, strerror(errno)); + mq_close(mqd); + return STAT_NORETRY; + } + Info5("POSIXMQ queue \"%s\": flags=%ld, maxmsg=%ld, msgsize=%ld, curmsgs=%ld", + name, attr.mq_flags, attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs); + if (!dofork && !oneshot) { return STAT_OK; } @@ -236,6 +254,81 @@ static int xioopen_posixmq( } +/* With option flush try to open the queue and "consume" its current contents */ +static int _posixmq_flush( + struct single *sfd) +{ + mqd_t mqd; + struct mq_attr attr; + void *buff; + size_t bufsiz; + int _errno; + int p = 0; /* number of messages flushed */ + size_t b = 0; /* number of bytes flushed */ + + Info1("flushing POSIXMQ queue \"%s\"", sfd->para.posixmq.name); + Debug1("mq_open(\"%s\", O_RDONLY|O_NONBLOCK, 0, NULL)", + sfd->para.posixmq.name); + mqd = mq_open(sfd->para.posixmq.name, O_RDONLY|O_NONBLOCK, 0, NULL); + _errno = errno; + Debug1("mq_open() -> %d", mqd); + if (mqd < 0 && _errno == ENOENT) { + Info("this queue does not exist, no need to flush it"); + return STAT_OK; + } + if (mqd < 0) { + Warn2("mq_open(\"%s\", ...): %s", sfd->para.posixmq.name, + strerror(_errno)); + return STAT_NORETRY; + } + + Debug1("mq_getattr(%d, ...)", mqd); + if (mq_getattr(mqd, &attr) < 0) { + Warn4("mq_getattr(%d[\"%s\"], %p): %s", + mqd, sfd->para.posixmq.name, &attr, strerror(errno)); + mq_close(mqd); + return STAT_NORETRY; + } + if (attr.mq_curmsgs == 0) { + Info1("POSIXMQ \"%s\" is empty", sfd->para.posixmq.name); + mq_close(mqd); + return STAT_OK; + } + bufsiz = attr.mq_msgsize; + if ((buff = Malloc(bufsiz)) == NULL) { + mq_close(mqd); + return STAT_RETRYLATER; + } + + /* Now read all messages to null */ + while (true) { + ssize_t bytes; + + Debug3("mq_receive(mqd=%d, %p, "F_Zu", {} )", mqd, buff, bufsiz); + bytes = mq_receive(mqd, buff, bufsiz, &sfd->para.posixmq.prio); + _errno = errno; + Debug1("mq_receive() -> "F_Zd, bytes); + errno = _errno; + if (bytes == 0 || (bytes < 0 && _errno == EAGAIN)) { + break; + } + if (bytes < 0) { + Warn2("flushing POSIXMQ \"%s\" failed: %s", + sfd->para.posixmq.name, strerror(_errno)); + free(buff); + mq_close(mqd); + return STAT_NORETRY; + } + ++p; + b += bytes; + } + Info3("flushed "F_Zu" bytes in %u packets from queue \"%s\"", b, p, + sfd->para.posixmq.name); + free(buff); + mq_close(mqd); + return STAT_OK; +} + ssize_t xiowrite_posixmq( struct single *sfd, const void *buff, @@ -288,6 +381,9 @@ ssize_t xioclose_posixmq( struct single *sfd) { int res; + + if (sfd->fd < 0) + return 0; Debug1("xioclose_posixmq(): mq_close(%d)", sfd->fd); res = mq_close(sfd->fd); if (res < 0) { diff --git a/xio-posixmq.h b/xio-posixmq.h index 8359696..b7242e0 100644 --- a/xio-posixmq.h +++ b/xio-posixmq.h @@ -11,6 +11,7 @@ extern const struct addrdesc xioaddr_posixmq_receive; extern const struct addrdesc xioaddr_posixmq_send; extern const struct optdesc opt_posixmq_priority; +extern const struct optdesc opt_posixmq_flush; extern ssize_t xioread_posixmq(struct single *file, void *buff, size_t bufsiz); extern ssize_t xiopending_posixmq(struct single *pipe); diff --git a/xioopts.c b/xioopts.c index a93a1b6..3824014 100644 --- a/xioopts.c +++ b/xioopts.c @@ -1032,6 +1032,7 @@ const struct optname optionnames[] = { #endif IF_ANY ("mode", &opt_perm) #if WITH_POSIXMQ + IF_ANY ("mq-flush", &opt_posixmq_flush) IF_ANY ("mq-prio", &opt_posixmq_priority) #endif #ifdef TCP_MAXSEG @@ -1348,6 +1349,7 @@ const struct optname optionnames[] = { IF_INTERFACE("portsel", &opt_iff_portsel) #endif #if WITH_POSIXMQ + IF_ANY ("posixmq-flush", &opt_posixmq_flush) IF_ANY ("posixmq-priority", &opt_posixmq_priority) #endif #if (WITH_IP4 || WITH_IP6) && HAVE_RESOLV_H && WITH_RES_PRIMARY diff --git a/xioopts.h b/xioopts.h index 37545e1..4796fb2 100644 --- a/xioopts.h +++ b/xioopts.h @@ -482,6 +482,7 @@ enum e_optcode { OPT_LOWPORT, OPT_MAX_CHILDREN, #if WITH_POSIXMQ + OPT_POSIXMQ_FLUSH, OPT_POSIXMQ_PRIORITY, #endif #ifdef NLDLY