From 7d6295114bd1ce13e989a4a3d6ff3309e6d12b4d Mon Sep 17 00:00:00 2001 From: Gerhard Rieger Date: Sat, 30 Sep 2023 19:32:14 +0200 Subject: [PATCH] New feature POSIX message queues (MQ) --- CHANGES | 6 + Makefile.in | 4 +- config.h.in | 1 + configure.ac | 17 +++ doc/socat.yo | 102 +++++++++++++- socat.c | 5 + sysincludes.h | 3 + test.sh | 344 +++++++++++++++++++++++++++++++++++++++++++++++- xio-posixmq.c | 316 ++++++++++++++++++++++++++++++++++++++++++++ xio-posixmq.h | 20 +++ xio-socks5.c | 2 +- xio.h | 19 ++- xioclose.c | 6 + xioinitialize.c | 1 + xiomodes.h | 1 + xioopen.c | 7 + xioopts.c | 6 + xioopts.h | 5 + xioread.c | 12 ++ xiosigchld.c | 4 + xiowrite.c | 9 ++ 21 files changed, 876 insertions(+), 14 deletions(-) create mode 100644 xio-posixmq.c create mode 100644 xio-posixmq.h diff --git a/CHANGES b/CHANGES index df39858..df2cfc3 100644 --- a/CHANGES +++ b/CHANGES @@ -127,6 +127,12 @@ Features: SOCKS5-LISTEN:::: Thanks to Charlie Svensson and others for contributions. + New address types POSIXMQ-RECEIVE, POSIXMQ-READ, POSIXMQ-SEND, and + POSIXMQ-BIDIRECTIONAL (Linux only, experimental), and option + posixmq-priority + Tests: LINUX_POSIXMQ_READ_PRIO LINUX_POSIXMQ_RECV_FORK + LINUX_POSIXMQ_RECV_MAXCHILDREN LINUX_POSIXMQ_SEND_MAXCHILDREN + Corrections: When a sub process (EXEC, SYSTEM) terminated with exit code other than 0, its last sent data might have been lost depending on timing of read/ diff --git a/Makefile.in b/Makefile.in index 367783a..dd46d7a 100644 --- a/Makefile.in +++ b/Makefile.in @@ -48,7 +48,7 @@ XIOSRCS = xioinitialize.c xiohelp.c xioparam.c xiodiag.c xioopen.c xioopts.c \ xio-socketpair.c xio-gopen.c xio-creat.c xio-file.c xio-named.c \ xio-socket.c xio-interface.c xio-listen.c xio-unix.c xio-vsock.c \ xio-ip.c xio-ip4.c xio-ip6.c xio-ipapp.c xio-tcp.c \ - xio-sctp.c xio-rawip.c \ + xio-sctp.c xio-rawip.c xio-posixmq.c \ xio-socks.c xio-socks5.c xio-proxy.c xio-udp.c \ xio-progcall.c xio-exec.c xio-system.c xio-termios.c xio-readline.c \ xio-pty.c xio-openssl.c xio-streams.c xio-namespaces.c \ @@ -65,7 +65,7 @@ HFILES = sycls.h sslcls.h error.h dalan.h procan.h filan.h hostan.h sysincludes. xiomodes.h xiolayer.h xio-process.h xio-fd.h xio-fdnum.h xio-stdio.h \ xio-named.h xio-file.h xio-creat.h xio-gopen.h xio-pipe.h \ xio-socketpair.h xio-socket.h xio-interface.h xio-listen.h xio-unix.h xio-vsock.h \ - xio-ip.h xio-ip4.h xio-ip6.h xio-rawip.h \ + xio-ip.h xio-ip4.h xio-ip6.h xio-rawip.h xio-posixmq.h \ xio-ipapp.h xio-tcp.h xio-udp.h xio-sctp.h \ xio-socks.h xio-socks5.h xio-proxy.h xio-progcall.h xio-exec.h \ xio-system.h xio-termios.h xio-readline.h \ diff --git a/config.h.in b/config.h.in index 2d5266a..f24152d 100644 --- a/config.h.in +++ b/config.h.in @@ -704,6 +704,7 @@ #undef WITH_UDP #undef WITH_SCTP #undef WITH_LISTEN +#undef WITH_POSIXMQ #undef WITH_SOCKS4 #undef WITH_SOCKS4A #undef WITH_SOCKS5 diff --git a/configure.ac b/configure.ac index 0d54292..adff4c5 100644 --- a/configure.ac +++ b/configure.ac @@ -444,6 +444,23 @@ AC_ARG_ENABLE(listen, [ --disable-listen disable listen support], esac], [AC_DEFINE(WITH_LISTEN) AC_MSG_RESULT(yes)]) +AC_MSG_CHECKING(whether to include POSIX MQ support) +AC_ARG_ENABLE(posixmq, [ --disable-posixmq disable POSIX MQ support], + [case "$enableval" in + no) AC_MSG_RESULT(no) + WITH_POSIXMQ= ;; + *) WITH_POSIXMQ=1; AC_MSG_RESULT(yes);; + esac], + [WITH_POSIXMQ=1; AC_MSG_RESULT(yes)]) +if test "$WITH_POSIXMQ"; then + case "`uname`" in + Linux) AC_DEFINE(WITH_POSIXMQ) + LIBS="$LIBS -lrt" ;; + *) AC_MSG_WARN([POSIX MQ currently implemented for Linux only]) + WITH_POSIXMQ= ;; + esac +fi + AC_MSG_CHECKING(whether to include socks4 support) AC_ARG_ENABLE(socks4, [ --disable-socks4 disable socks4 support], [case "$enableval" in diff --git a/doc/socat.yo b/doc/socat.yo index a8e6a6b..8ba7fa1 100644 --- a/doc/socat.yo +++ b/doc/socat.yo @@ -121,7 +121,7 @@ dit(bf(tt(-d -d -d -d | -dddd | -d4))) Prints fatal, error, warning, notice, inf messages. dit(bf(tt(-D))) Logs information about file descriptors before starting the transfer phase. -dit(bf(tt(--experimental))) +label(option_experimental)dit(bf(tt(--experimental))) New features that are not well tested or are subject to change in the future must me explicitely enabled using this option. dit(bf(tt(-ly[]))) @@ -700,6 +700,53 @@ label(ADDRESS_SOCKETPAIR)dit(bf(tt(SOCKETPAIR))) Useful options: link(socktype)(OPTION_SO_TYPE)nl() See also: link(unnamed pipe)(ADDRESS_UNNAMED_PIPE) + +label(ADDRESS_POSIXMQ_READ)dit(bf(tt(POSIXMQ-READ:/))) + Opens the specified POSIX message queue and reads messages (packets). It + keeps the boundaries.nl() + This is a read-only address, see options link(-u)(option_u) and + link(-U)(option_U) and link(dual addresses)(ADDRESS_DUAL).nl() + Socat() provides this address type only on Linux because POSIX MQ is based + on UNIX filedescriptors there.nl() + This feature is new in version 1.7.5.0 and might change in the future, + therefore it is link(experimental)(option_experimental).nl() + Useful options: + link(posixmq-priority)(OPTION_POSIXMQ_PRIORITY), + link(unlink-early)(OPTION_UNLINK_EARLY), + link(unlink-close)(OPTION_UNLINK_CLOSE) + +label(ADDRESS_POSIXMQ_RECEIVE)dit(bf(tt(POSIXMQ-RECEIVE:/))) +dit(bf(tt(POSIXMQ-RECV:/))) + Opens the specified POSIX message queue and reads one message (packet).nl() + This is a read-only address. See link(POSIXMQ-READ)(ADDRESS_POSIXMQ_READ) + for more info.nl() + Example: link(POSIX MQ recv with fork)(EXAMPLE_POSIXMQ_RECV_FORK)nl() + This feature is link(experimental)(option_experimental).nl() + Useful options: + link(posixmq-priority)(OPTION_POSIXMQ_PRIORITY), + link(fork)(OPTION_FORK), + link(max-children)(OPTION_MAX_CHILDREN), + link(unlink-early)(OPTION_UNLINK_EARLY), + link(unlink-close)(OPTION_UNLINK_CLOSE) + +label(ADDRESS_POSIXMQ_SEND)dit(bf(tt(POSIXMQ-SEND:/))) + Opens the specified POSIX message queue and writes messages (packets).nl() + This is a write-only address. See link(POSIXMQ-READ)(ADDRESS_POSIXMQ_READ) + for more info.nl() + (link(Example)(EXAMPLE_POSIXMQ_SEND))nl() + This feature is link(experimental)(option_experimental).nl() + Useful options: + link(posixmq-priority)(OPTION_POSIXMQ_PRIORITY), + link(fork)(OPTION_FORK), + link(max-children)(OPTION_MAX_CHILDREN), + link(unlink-early)(OPTION_UNLINK_EARLY), + link(unlink-close)(OPTION_UNLINK_CLOSE) + +label(ADDRESS_POSIXMQ_BIDIRECTIONAL)dit(bf(tt(POSIXMQ-BIDIRECTIONAL:/mqueue))) + Opens the specified POSIX message queue and writes and reads messages + (packet). This is probably rarely useful but has been implemented for + functional completeness. + label(ADDRESS_PROXY_CONNECT)dit(bf(tt(PROXY:::))) Connects to an HTTP proxy server on port 8080 using TCP/IP version 4 or 6 depending on address specification, name resolution, or option @@ -1913,8 +1960,8 @@ label(GROUP_PROCESS)em(bf(PROCESS option group)) Options of this group change the process properties instead of just affecting one data channel. For EXEC and SYSTEM addresses and for LISTEN and CONNECT type addresses with -option FORK, -these options apply to the child processes instead of the main socat process. +option link(fork)(OPTION_FORK), +these options apply to the child processes instead of the main socat() process. startdit() label(OPTION_CHROOT)dit(bf(tt(chroot=))) Performs a code(chroot()) operation to link()(TYPE_DIRECTORY) @@ -3187,6 +3234,16 @@ enddit() startdit()enddit()nl() +label(GROUP_POSIXMQ)em(bf(POSIX-MQ option group)) + +Options that may be applied to POSIX-MQ addresses. + +startdit() +label(OPTION_POSIXMQ_PRIORITY)dit(bf(tt(posixmq-priority (mq-prio)))) + Sets the priority of messages (packets) written to the queue, or the minimal + priority of packet read from the queue. +enddit() + label(VALUES) manpagesection(DATA VALUES) @@ -4137,6 +4194,45 @@ link(interface)(ADDRESS_INTERFACE) tt(hdlc0), and can transfer data between both devices. Use pppd on device tt(/var/run/ppp) then. +label(EXAMPLE_POSIXMQ_SEND) +mancommand(\.LP) +mancommand(\.nf) +mancommand(\fBsocat --experimental -u \\) +mancommand(\.RS) +mancommand(\fBSTDIO \\ + POSIXMQ-SEND:/queue1,unlink-early,mq-prio=10\fP) +mancommand(\.RE) +mancommand(\.fi) + +htmlcommand(
socat --experimental -u \ + STDIO \ + POSIXMQ-SEND:/queue1,unlink-early,mq-prio=10
) + +Writes packets read from stdio (i.e., lines of input when run interactively) +into POSIX message queue, with priority 10. + + +label(EXAMPLE_POSIXMQ_RECV_FORK) + +mancommand(\.LP) +mancommand(\.nf) +mancommand(\fBsocat --experimental -u \\) +mancommand(\.RS) +mancommand(\fBPOSIXMQ-RECV:/queue1,fork,max-children=3 \\ + SYSTEM:"robot.sh"\fP) + +mancommand(\.RE) +mancommand(\.fi) + +htmlcommand(
socat --experimental -u \ + POSIXMQ-RECV:/queue1,fork,max-children=3 \ + SYSTEM:"robot.sh"
) + +Receives messages (packets) from POSIX message queue and, for each messages, +forks a sub process that reads and processes the message. At most 3 sub +processes are allowed at the same time. + + label(EXAMPLE_HTTPECHO) COMMENT( dit(bf(tt(socat -T 1 -d -d TCP-L:10081,reuseaddr,fork,crlf SYSTEM:"echo -e \"\\\"HTTP/1.0 200 OK\\\nDocumentType: text/plain\\\n\\\ndate: \$\(date\)\\\nserver:\$SOCAT_SOCKADDR:\$SOCAT_SOCKPORT\\\nclient: \$SOCAT_PEERADDR:\$SOCAT_PEERPORT\\\n\\\"\"; cat; echo -e \"\\\"\\\n\\\"\""))) ) mancommand(\.LP) diff --git a/socat.c b/socat.c index 0c7a464..f8ea83d 100644 --- a/socat.c +++ b/socat.c @@ -600,6 +600,11 @@ void socat_version(FILE *fd) { #else fputs(" #undef WITH_LISTEN\n", fd); #endif +#ifdef WITH_POSIXMQ + fprintf(fd, " #define WITH_POSIXMQ %d\n", WITH_POSIXMQ); +#else + fputs(" #undef WITH_POSIXMQ\n", fd); +#endif #ifdef WITH_SOCKS4 fprintf(fd, " #define WITH_SOCKS4 %d\n", WITH_SOCKS4); #else diff --git a/sysincludes.h b/sysincludes.h index 8445ccb..b960dc4 100644 --- a/sysincludes.h +++ b/sysincludes.h @@ -180,6 +180,9 @@ #if WITH_NAMESPACES && HAVE_SCHED_H #include #endif +#if WITH_POSIXMQ +#include /* POSIX MQ */ +#endif #if WITH_READLINE # if HAVE_READLINE_READLINE_H #include diff --git a/test.sh b/test.sh index bf2be0e..2225f3c 100755 --- a/test.sh +++ b/test.sh @@ -16348,7 +16348,7 @@ N=$((N+1)) NAME=NETNS case "$TESTS" in *%$N%*|*%functions%*|*%root%*|*%namespace%*|*%netns%*|*%socket%*|*%$NAME%*) -ns=socat-$$-$N +ns=socat-$$-test$N TEST="$NAME: option netns (net namespace $ns)" # Start a simple echo server with option netns on localhost of a net namespace; # use a client process with option netns to send data to the net namespace @@ -16718,6 +16718,344 @@ esac N=$((N+1)) + +# Test the POSIX MQ feature with continuous READ and priorization on Linux +NAME=LINUX_POSIXMQ_READ_PRIO +case "$TESTS" in +*%$N%*|*%functions%*|*%socket%*|*%posixmq%*|*%$NAME%*) +TEST="$NAME: POSIX-MQ (Linux) with prio" +# Run a client/sender that creates a POSIX-MQ and sends a normal message and +# then a client/sender with a higher priority message. +# Run a passive/listening/receiving/reading process and check if it receives +# both messages and in the prioritized order +if ! eval $NUMCOND; then :; +elif [ "$UNAME" != Linux ]; then + $PRINTF "test $F_n $TEST... ${YELLOW}Only on Linux${NORMAL}\n" $N + numCANT=$((numCANT+1)) + listCANT="$listCANT $N" +elif ! F=$(testfeats POSIXMQ STDIO); then + $PRINTF "test $F_n $TEST... ${YELLOW}Feature $F not available in $SOCAT${NORMAL}\n" $N + numCANT=$((numCANT+1)) + listCANT="$listCANT $N" +elif ! A=$(testaddrs POSIXMQ-SEND POSIXMQ-READ STDIO); then + $PRINTF "test $F_n $TEST... ${YELLOW}Address $A not available in $SOCAT${NORMAL}\n" $N + numCANT=$((numCANT+1)) + listCANT="$listCANT $N" +elif ! o=$(testoptions mq-prio unlink-early unlink-close) >/dev/null; then + $PRINTF "test $F_n $TEST... ${YELLOW}Option $o not available in $SOCAT${NORMAL}\n" $N + numCANT=$((numCANT+1)) + listCANT="$listCANT $N" +else +tf="$td/test$N.stdout" +te="$td/test$N.stderr" +tdiff="$td/test$N.diff" +da="test$N $(date) $RANDOM" +tq=/test$N +CMD0a="$TRACE $SOCAT --experimental $opts -u STDIO POSIXMQ-SEND:$tq,mq-prio=0,unlink-early" +CMD0b="$TRACE $SOCAT --experimental $opts -u STDIO POSIXMQ-SEND:$tq,mq-prio=1" +CMD1="$TRACE $SOCAT --experimental $opts -u POSIXMQ-READ:$tq,unlink-close STDIO" +printf "test $F_n $TEST... " $N +echo "$da 0" |$CMD0a 2>"${te}0a" +rc0a=$? +echo "$da 1" |$CMD0b 2>"${te}0b" +rc0b=$? +$CMD1 >"${tf}1" 2>"${te}1" & +pid1=$! +relsleep 1 +kill $pid1; wait +if [ $rc0a -ne 0 -o $rc0b -ne 0 ]; then + $PRINTF "$FAILED\n" + echo "$CMD0a" + cat "${te}0a" >&2 + echo "$CMD0b" + cat "${te}0b" >&2 + echo "$CMD1" + cat "${te}1" >&2 + numFAIL=$((numFAIL+1)) + listFAIL="$listFAIL $N" + namesFAIL="$namesFAIL $NAME" +elif $ECHO "$da 1\n$da 0" |diff - ${tf}1 >${tdiff}1; then + $PRINTF "$OK\n" + if [ "$VERBOSE" ]; then echo "$CMD0a"; fi + if [ "$DEBUG" ]; then cat "${te}0a" >&2; fi + if [ "$VERBOSE" ]; then echo "$CMD0b"; fi + if [ "$DEBUG" ]; then cat "${te}0b" >&2; fi + if [ "$VERBOSE" ]; then echo "$CMD1"; fi + if [ "$DEBUG" ]; then cat "${te}1" >&2; fi + numOK=$((numOK+1)) +else + $PRINTF "$FAILED\n" + echo "$CMD0a" + cat "${te}0a" >&2 + echo "$CMD0b" + cat "${te}0b" >&2 + echo "$CMD1" + cat "${te}1" >&2 + echo "difference:" >&2 + cat ${tdiff}1 >&2 + numFAIL=$((numFAIL+1)) + listFAIL="$listFAIL $N" + namesFAIL="$namesFAIL $NAME" +fi +fi # NUMCOND + ;; +esac +N=$((N+1)) + +# Test the POSIX MQ feature with RECV,fork on Linux +NAME=LINUX_POSIXMQ_RECV_FORK +case "$TESTS" in +*%$N%*|*%functions%*|*%fork%*|*%socket%*|*%posixmq%*|*%$NAME%*) +TEST="$NAME: POSIX-MQ (Linux) RECV with fork" +# Start a POSIX-MQ receiver with fork that creates a POSIX-MQ and stores its +# output. +# Run two clients/senders each with a message. +# Check if both messages are stored. +if ! eval $NUMCOND; then :; +elif [ "$UNAME" != Linux ]; then + $PRINTF "test $F_n $TEST... ${YELLOW}Only on Linux${NORMAL}\n" $N + numCANT=$((numCANT+1)) + listCANT="$listCANT $N" +elif ! F=$(testfeats POSIXMQ STDIO); then + $PRINTF "test $F_n $TEST... ${YELLOW}Feature $F not available in $SOCAT${NORMAL}\n" $N + numCANT=$((numCANT+1)) + listCANT="$listCANT $N" +elif ! A=$(testaddrs POSIXMQ-SEND POSIXMQ-RECEIVE STDIO); then + $PRINTF "test $F_n $TEST... ${YELLOW}Address $A not available in $SOCAT${NORMAL}\n" $N + numCANT=$((numCANT+1)) + listCANT="$listCANT $N" +elif ! o=$(testoptions fork unlink-early unlink-close) >/dev/null; then + $PRINTF "test $F_n $TEST... ${YELLOW}Option $o not available in $SOCAT${NORMAL}\n" $N + numCANT=$((numCANT+1)) + listCANT="$listCANT $N" +else +tf="$td/test$N.stdout" +te="$td/test$N.stderr" +tdiff="$td/test$N.diff" +da="test$N $(date) $RANDOM" +tq=/test$N +CMD0="$TRACE $SOCAT --experimental $opts -u POSIXMQ-RECV:$tq,unlink-early,fork STDIO" +CMD1a="$TRACE $SOCAT --experimental $opts -u STDIO POSIXMQ-SEND:$tq" +CMD1b="$TRACE $SOCAT --experimental $opts -u STDIO POSIXMQ-SEND:$tq,unlink-close" +printf "test $F_n $TEST... " $N +$CMD0 2>"${te}0" >"${tf}0" & +pid0=$! +relsleep 1 +echo "$da 0" |$CMD1a >/dev/null 2>"${te}1a" +rc1a=$? +echo "$da 1" |$CMD1b >/dev/null 2>"${te}1b" +rc1b=$? +relsleep 1 +kill $pid0; wait +if [ $rc1a -ne 0 -o $rc1b -ne 0 ]; then + $PRINTF "$FAILED\n" + echo "$CMD0" + cat "${te}0" >&2 + echo "$CMD1a" + cat "${te}1a" >&2 + echo "$CMD1b" + cat "${te}1b" >&2 + numFAIL=$((numFAIL+1)) + listFAIL="$listFAIL $N" + namesFAIL="$namesFAIL $NAME" +elif $ECHO "$da 0\n$da 1" |diff - ${tf}0 >${tdiff}0; then + $PRINTF "$OK\n" + if [ "$VERBOSE" ]; then echo "$CMD0"; fi + if [ "$DEBUG" ]; then cat "${te}0" >&2; fi + if [ "$VERBOSE" ]; then echo "$CMD1a"; fi + if [ "$DEBUG" ]; then cat "${te}1a" >&2; fi + if [ "$VERBOSE" ]; then echo "$CMD1b"; fi + if [ "$DEBUG" ]; then cat "${te}1b" >&2; fi + numOK=$((numOK+1)) +else + $PRINTF "$FAILED\n" + echo "$CMD0" + cat "${te}0" >&2 + echo "$CMD1a" + cat "${te}1a" >&2 + echo "$CMD1b" + cat "${te}1b" >&2 + echo "difference:" >&2 + cat ${tdiff}0 >&2 + numFAIL=$((numFAIL+1)) + listFAIL="$listFAIL $N" + namesFAIL="$namesFAIL $NAME" +fi +fi # NUMCOND + ;; +esac +N=$((N+1)) + +# Test the POSIX MQ feature with RECV,fork,max-children on Linux +NAME=LINUX_POSIXMQ_RECV_MAXCHILDREN +case "$TESTS" in +*%$N%*|*%functions%*|*%fork%*|*%maxchildren%*|*%socket%*|*%posixmq%*|*%$NAME%*) +TEST="$NAME: POSIX-MQ (Linux) RECV with fork,max-children" +# Start a POSIX-MQ receiver with fork that creates a POSIX-MQ and stores its +# output via sub processes that sleeps after writing. +# Run a client/sender that sends message 1; +# run a client/sender that sends message 3, has to wait; +# write message 2 directly into output file; +# Check if the messages are stored in order of their numbers +if ! eval $NUMCOND; then :; +elif [ "$UNAME" != Linux ]; then + $PRINTF "test $F_n $TEST... ${YELLOW}Only on Linux${NORMAL}\n" $N + numCANT=$((numCANT+1)) + listCANT="$listCANT $N" +elif ! F=$(testfeats POSIXMQ STDIO SYSTEM); then + $PRINTF "test $F_n $TEST... ${YELLOW}Feature $F not available in $SOCAT${NORMAL}\n" $N + numCANT=$((numCANT+1)) + listCANT="$listCANT $N" +elif ! A=$(testaddrs POSIXMQ-SEND POSIXMQ-RECEIVE STDIO SYSTEM); then + $PRINTF "test $F_n $TEST... ${YELLOW}Address $A not available in $SOCAT${NORMAL}\n" $N + numCANT=$((numCANT+1)) + listCANT="$listCANT $N" +elif ! o=$(testoptions fork max-children unlink-early unlink-close) >/dev/null; then + $PRINTF "test $F_n $TEST... ${YELLOW}Option $o not available in $SOCAT${NORMAL}\n" $N + numCANT=$((numCANT+1)) + listCANT="$listCANT $N" +else +tf="$td/test$N.stdout" +te="$td/test$N.stderr" +tdiff="$td/test$N.diff" +da="test$N $(date) $RANDOM" +tq=/test$N +CMD0="$TRACE $SOCAT --experimental $opts -u POSIXMQ-RECV:$tq,unlink-early,fork,max-children=1 SYSTEM:\"cat\ >>${tf}0;\ sleep\ 1\"" +CMD1a="$TRACE $SOCAT --experimental $opts -u STDIO POSIXMQ-SEND:$tq" +CMD1b="$TRACE $SOCAT --experimental $opts -u STDIO POSIXMQ-SEND:$tq,unlink-close" +printf "test $F_n $TEST... " $N +eval $CMD0 2>"${te}0" >"${tf}0" & +pid0=$! +relsleep 1 +echo "$da 1" |$CMD1a >/dev/null 2>"${te}1a" +rc1a=$? +echo "$da 3" |$CMD1b >/dev/null 2>"${te}1b" +rc1b=$? +psleep 0.5 +echo "$da 2" >>"${tf}0" +sleep 1 # as in SYSTEM +kill $(childpids $pid0) $pid0; wait +if [ $rc1a -ne 0 -o $rc1b -ne 0 ]; then + $PRINTF "$FAILED\n" + echo "$CMD0" + cat "${te}0" >&2 + echo "$CMD1a" + cat "${te}1a" >&2 + echo "$CMD1b" + cat "${te}1b" >&2 + numFAIL=$((numFAIL+1)) + listFAIL="$listFAIL $N" + namesFAIL="$namesFAIL $NAME" +elif $ECHO "$da 1\n$da 2\n$da 3" |diff - ${tf}0 >${tdiff}0; then + $PRINTF "$OK\n" + if [ "$VERBOSE" ]; then echo "$CMD0"; fi + if [ "$DEBUG" ]; then cat "${te}0" >&2; fi + if [ "$VERBOSE" ]; then echo "$CMD1a"; fi + if [ "$DEBUG" ]; then cat "${te}1a" >&2; fi + if [ "$VERBOSE" ]; then echo "$CMD1b"; fi + if [ "$DEBUG" ]; then cat "${te}1b" >&2; fi + numOK=$((numOK+1)) +else + $PRINTF "$FAILED\n" + echo "$CMD0" + cat "${te}0" >&2 + echo "$CMD1a" + cat "${te}1a" >&2 + echo "$CMD1b" + cat "${te}1b" >&2 + echo "difference:" >&2 + cat ${tdiff}0 >&2 + numFAIL=$((numFAIL+1)) + listFAIL="$listFAIL $N" + namesFAIL="$namesFAIL $NAME" +fi +fi # NUMCOND + ;; +esac +N=$((N+1)) + +# Test the POSIX MQ feature with SEND,fork,max-children on Linux +NAME=LINUX_POSIXMQ_SEND_MAXCHILDREN +case "$TESTS" in +*%$N%*|*%functions%*|*%fork%*|*%maxchildren%*|*%socket%*|*%posixmq%*|*%$NAME%*) +TEST="$NAME: POSIX-MQ (Linux) SEND with fork,max-children" +# Start a POSIX-MQ receiver that creates a POSIX-MQ and transfers data from +# there to an output file +# Run a POSIX-MQ sender that two times forks and invokes a data generator +# for messages 1 and 3 in a shell process with some trailing sleep. +# Afterwards write message 2 directly into output file; message 3 should be +# delayed due to max-children option +# Check if the messages are stored in order of their numbers. +# The data generator is implemented as a receiver from an MQ with "1", "3" +if ! eval $NUMCOND; then :; +elif [ "$UNAME" != Linux ]; then + $PRINTF "test $F_n $TEST... ${YELLOW}Only on Linux${NORMAL}\n" $N + numCANT=$((numCANT+1)) + listCANT="$listCANT $N" +elif ! F=$(testfeats POSIXMQ STDIO SYSTEM); then + $PRINTF "test $F_n $TEST... ${YELLOW}Feature $F not available in $SOCAT${NORMAL}\n" $N + numCANT=$((numCANT+1)) + listCANT="$listCANT $N" +elif ! A=$(testaddrs POSIXMQ-SEND POSIXMQ-READ STDIO SYSTEM); then + $PRINTF "test $F_n $TEST... ${YELLOW}Address $A not available in $SOCAT${NORMAL}\n" $N + numCANT=$((numCANT+1)) + listCANT="$listCANT $N" +elif ! o=$(testoptions fork max-children mq-prio unlink-early unlink-close) >/dev/null; then + $PRINTF "test $F_n $TEST... ${YELLOW}Option $o not available in $SOCAT${NORMAL}\n" $N + numCANT=$((numCANT+1)) + listCANT="$listCANT $N" +#elif ! runsposixmq >/dev/null; then +# $PRINTF "test $F_n $TEST... ${YELLOW}IPv4 not available${NORMAL}\n" $N +# numCANT=$((numCANT+1)) +# listCANT="$listCANT $N" +else +tf="$td/test$N.stdout" +te="$td/test$N.stderr" +tdiff="$td/test$N.diff" +da="test$N $(date) $RANDOM" +tq=/test$N +CMD0="$TRACE $SOCAT --experimental $opts -u POSIXMQ-READ:$tq,unlink-early STDIO" +CMD1="$TRACE $SOCAT --experimental $opts -U POSIXMQ-SEND:$tq,fork,max-children=1,interval=0.1 SYSTEM:\"./socat\ --experimental\ -u\ POSIXMQ-RECV\:$tq-data\ -;\ sleep\ 1\"" +printf "test $F_n $TEST... " $N +# create data for the generator +echo "$da 1" |$SOCAT -u --experimental - POSIXMQ-SEND:$tq-data,unlink-early +echo "$da 3" |$SOCAT -u --experimental - POSIXMQ-SEND:$tq-data +eval $CMD0 2>"${te}0" >>"${tf}0" & +pid0=$! +relsleep 1 +eval $CMD1 2>"${te}1" & +pid1=$! +psleep 0.5 +echo "$da 2" >>"${tf}0" +sleep 1 # as in SYSTEM +kill $pid0 $(childpids $pid0) $pid1 $(childpids $pid1) +wait +$SOCAT -u --experimental /dev/null POSIXMQ-SEND:$tq-data,unlink-close +if $ECHO "$da 1\n$da 2\n$da 3" |diff - ${tf}0 >${tdiff}0; then + $PRINTF "$OK\n" + if [ "$VERBOSE" ]; then echo "$CMD0"; fi + if [ "$DEBUG" ]; then cat "${te}0" >&2; fi + if [ "$VERBOSE" ]; then echo "$CMD1"; fi + if [ "$DEBUG" ]; then cat "${te}1" >&2; fi + numOK=$((numOK+1)) +else + $PRINTF "$FAILED\n" + echo "$CMD0" + cat "${te}0" >&2 + echo "$CMD1" + cat "${te}1" >&2 + echo "difference:" >&2 + cat ${tdiff}0 >&2 + numFAIL=$((numFAIL+1)) + listFAIL="$listFAIL $N" + namesFAIL="$namesFAIL $NAME" +fi +fi # NUMCOND + ;; +esac +N=$((N+1)) + + # end of common tests ################################################################################## @@ -16906,14 +17244,14 @@ waitport $PORT 1 echo "$da" |$CMD1 >"${tf}1" 2>"${te}1" rc1=$? kill $pid0 2>/dev/null; wait -if echo "$da" |diff "${tf}1" - >$tdiff !!!; then +if echo "$da" |diff "${tf}1" - >$tdiff; then $PRINTF "$OK\n" if [ "$VERBOSE" ]; then echo "$CMD0 &"; fi if [ "$DEBUG" ]; then cat "${te}0" >&2; fi if [ "$VERBOSE" ]; then echo "$CMD1"; fi if [ "$DEBUG" ]; then cat "${te}1" >&2; fi numOK=$((numOK+1)) -elif [ !!! ]; then +elif [ !?!?! ]; then # The test could not run meaningfully $PRINTF "$CANT\n" if [ "$VERBOSE" ]; then echo "$CMD0 &"; fi diff --git a/xio-posixmq.c b/xio-posixmq.c new file mode 100644 index 0000000..78d26fe --- /dev/null +++ b/xio-posixmq.c @@ -0,0 +1,316 @@ +/* Source: xio-posixmq.c */ +/* Copyright Gerhard Rieger and contributors (see file CHANGES) */ +/* Published under the GNU General Public License V.2, see file COPYING */ + +/* This file contains the source for opening addresses of POSIX MQ type */ + +#include "xiosysincludes.h" +#include "xioopen.h" + +#include "xio-socket.h" +#include "xio-listen.h" +#include "xio-posixmq.h" +#include "xio-named.h" + + +#if WITH_POSIXMQ + +static int _posixmq_unlink( + const char *name, + int level); /* message level on error */ + +static int xioopen_posixmq(int argc, const char *argv[], struct opt *opts, int xioflags, xiofile_t *xfd, groups_t groups, int dirs, int dummy, int dummy3); + +const struct addrdesc xioaddr_posixmq_bidir = { "POSIXMQ-BIDIRECTIONAL", 1+XIO_RDWR, xioopen_posixmq, GROUP_FD|GROUP_NAMED|GROUP_POSIXMQ|GROUP_RETRY, XIO_RDWR, 0, 0 HELP(":") }; +const struct addrdesc xioaddr_posixmq_read = { "POSIXMQ-READ", 1+XIO_RDONLY, xioopen_posixmq, GROUP_FD|GROUP_NAMED|GROUP_POSIXMQ|GROUP_RETRY, XIO_RDONLY, 0, 0 HELP(":") }; +const struct addrdesc xioaddr_posixmq_receive = { "POSIXMQ-RECEIVE", 1+XIO_RDONLY, xioopen_posixmq, GROUP_FD|GROUP_NAMED|GROUP_POSIXMQ|GROUP_RETRY|GROUP_CHILD, XIO_RDONLY, XIOREAD_RECV_ONESHOT, 0 HELP(":") }; +const struct addrdesc xioaddr_posixmq_send = { "POSIXMQ-SEND", 1+XIO_WRONLY, xioopen_posixmq, GROUP_FD|GROUP_NAMED|GROUP_POSIXMQ|GROUP_RETRY|GROUP_CHILD, XIO_WRONLY, 0, 0 HELP(":") }; + +const struct optdesc opt_posixmq_priority = { "posixmq-priority", "mq-pri", OPT_POSIXMQ_PRIORITY, GROUP_POSIXMQ, PH_INIT, TYPE_BOOL, OFUNC_OFFSET, XIO_OFFSETOF(para.posixmq.prio), XIO_SIZEOF(para.posixmq.prio), 0 }; + +/* _read(): open immediately, stay in transfer loop + _recv(): wait until data (how we know there is??), oneshot, opt.fork +*/ +static int xioopen_posixmq( + int argc, + const char *argv[], + struct opt *opts, + int xioflags, + xiofile_t *xfd, + groups_t groups, + int dirs, + int oneshot, + int dummy3) +{ + /* We expect the form: /mqname */ + xiosingle_t *sfd = &xfd->stream; + const char *name; + bool opt_unlink_early = false; + int oflag; + bool opt_o_excl = false; + mode_t opt_mode = 0666; + mqd_t mqd; + int _errno; + bool dofork = false; + int maxchildren = 0; + bool with_intv = false; + int result = 0; + + if (!xioparms.experimental) { + Error1("%s: use option --experimental to acknowledge unmature state", argv[0]); + } + if (argc != 2) { + Error2("%s: wrong number of parameters (%d instead of 1)", + argv[0], argc-1); + return STAT_NORETRY; + } + + name = argv[1]; + + retropt_bool(opts, OPT_FORK, &dofork); + if (dofork) { + if (!(xioflags & XIO_MAYFORK)) { + Error1("%s: option fork not allowed in this context", argv[0]); + return STAT_NORETRY; + } + sfd->flags |= XIO_DOESFORK; + if (dirs == XIO_WRONLY) { + with_intv = true; + } + } + + retropt_int(opts, OPT_MAX_CHILDREN, &maxchildren); + if (! dofork && maxchildren) { + Error("option max-children not allowed without option fork"); + return STAT_NORETRY; + } + if (maxchildren) { + xiosetchilddied(); /* set SIGCHLD handler */ + } + applyopts_offset(sfd, opts); + if (applyopts_single(sfd, opts, PH_INIT) < 0) return STAT_NORETRY; + applyopts(-1, opts, PH_INIT); + + if ((sfd->para.posixmq.name = strdup(name)) == NULL) { + Error1("strdup(\"%s\"): out of memory", name); + } + + retropt_bool(opts, OPT_O_EXCL, &opt_o_excl); + retropt_mode(opts, OPT_PERM, &opt_mode); + + retropt_bool(opts, OPT_UNLINK_EARLY, &opt_unlink_early); + if (opt_unlink_early) { + _posixmq_unlink(sfd->para.posixmq.name, E_INFO); + } + retropt_bool(opts, OPT_UNLINK_CLOSE, &sfd->opt_unlink_close); + sfd->howtoend = END_CLOSE; + sfd->dtype = XIODATA_POSIXMQ | oneshot; + + oflag = O_CREAT; + if (opt_o_excl) oflag |= O_EXCL; + switch (dirs) { + case XIO_RDWR: oflag |= O_RDWR; break; + case XIO_RDONLY: oflag |= O_RDONLY; break; + case XIO_WRONLY: oflag |= O_WRONLY; break; + } + + /* Now open the message queue */ + Debug3("mq_open(\"%s\", %d, "F_mode", NULL)", name, oflag, opt_mode); + mqd = mq_open(name, oflag, opt_mode, NULL); + _errno = errno; + Debug1("mq_open() -> %d", mqd); + if (mqd < 0) { + Error3("%s: mq_open(\"%s\"): %s", argv[0], name, strerror(errno)); + errno = _errno; + return STAT_RETRYLATER; + } + sfd->fd = mqd; + + if (!dofork && !oneshot) { + return STAT_OK; + } + /* Continue with modes that open only when data available */ + + if (!oneshot) { + if (xioparms.logopt == 'm') { + Info("starting POSIX-MQ fork loop, switching to syslog"); + diag_set('y', xioparms.syslogfac); xioparms.logopt = 'y'; + } else { + Info("starting POSIX-MQ fork loop"); + } + } + + /* Wait until a message is available (or until interval has expired), + then fork a sub process that handles this single message. Here we + continue waiting for more. + The trigger mechanism is described with function + _xioopen_dgram_recvfrom() + */ + while (true) { + int trigger[2]; + pid_t pid; /* mostly int; only used with fork */ + sigset_t mask_sigchld; + + Info1("%s: waiting for data or interval", argv[0]); + do { + struct pollfd pollfd; + + pollfd.fd = sfd->fd; + pollfd.events = (dirs==XIO_RDONLY?POLLIN:POLLOUT); + if (xiopoll(&pollfd, 1, NULL) > 0) { + break; + } + if (errno == EINTR) { + continue; + } + Warn2("poll({%d,,},,-1): %s", sfd->fd, strerror(errno)); + Sleep(1); + } while (true); + if (!dofork) return STAT_OK; + + Info("generating pipe that triggers parent when packet has been consumed"); + if (dirs == XIO_RDONLY) { + if (Pipe(trigger) < 0) { + Error1("pipe(): %s", strerror(errno)); + } + } + + /* Block SIGCHLD until parent is ready to react */ + sigemptyset(&mask_sigchld); + sigaddset(&mask_sigchld, SIGCHLD); + Sigprocmask(SIG_BLOCK, &mask_sigchld, NULL); + + if ((pid = xio_fork(false, E_ERROR, xfd->stream.shutup)) < 0) { + Sigprocmask(SIG_UNBLOCK, &mask_sigchld, NULL); + if (dirs==XIO_RDONLY) { + Close(trigger[0]); + Close(trigger[1]); + } + xioclose_posixmq(sfd); + return STAT_RETRYLATER; + } + if (pid == 0) { /* child */ + pid_t cpid = Getpid(); + Sigprocmask(SIG_UNBLOCK, &mask_sigchld, NULL); + xiosetenvulong("PID", cpid, 1); + + if (dirs == XIO_RDONLY) { + Close(trigger[0]); + Fcntl_l(trigger[1], F_SETFD, FD_CLOEXEC); + sfd->triggerfd = trigger[1]; + } + break; + } + + /* Parent */ + if (dirs == XIO_RDONLY) { + char buf[1]; + Close(trigger[1]); + while (Read(trigger[0], buf, 1) < 0 && errno == EINTR) + ; + } + + if (with_intv) { + Nanosleep(&sfd->intervall, NULL); + } + + /* now we are ready to handle signals */ + Sigprocmask(SIG_UNBLOCK, &mask_sigchld, NULL); + while (maxchildren) { + if (num_child < maxchildren) break; + Notice1("max of %d children is active, waiting", num_child); + while (!Sleep(UINT_MAX)) ; /* any signal lets us continue */ + } + Info("continue listening"); + } + + _xio_openlate(sfd, opts); + return result; +} + + +ssize_t xiowrite_posixmq( + struct single *sfd, + const void *buff, + size_t bufsiz) +{ + int res; + int _errno; + + Debug4("mq_send(mqd=%d, %p, "F_Zu", %u)", sfd->fd, buff, bufsiz, sfd->para.posixmq.prio); + res = mq_send(sfd->fd, buff, bufsiz, sfd->para.posixmq.prio); + _errno = errno; + Debug1("mq_send() -> %d", res); + errno = _errno; + if (res < 0) { + Error2("mq_send(mqd=%d): %s", sfd->fd, strerror(errno)); + return -1; + } + return bufsiz; /* success */ +} + +ssize_t xioread_posixmq( + struct single *sfd, + void *buff, + size_t bufsiz) +{ + ssize_t res; + int _errno; + + Debug3("mq_receive(mqd=%d, %p, "F_Zu", {} )", sfd->fd, buff, bufsiz); + res = mq_receive(sfd->fd, buff, bufsiz, &sfd->para.posixmq.prio); + _errno = errno; + Debug1("mq_receive() -> "F_Zd, res); + errno = _errno; + if (res < 0) { + Error2("mq_receive(mqd=%d): %s", sfd->fd, strerror(errno)); + return -1; + } + if (sfd->triggerfd > 0) { + Close(sfd->triggerfd); + sfd->triggerfd = -1; + } + Info1("mq_receive() -> {prio=%u}", sfd->para.posixmq.prio); + xiosetenvulong("POSIXMQ_PRIO", (unsigned long)sfd->para.posixmq.prio, 1); + return res; +} + +ssize_t xiopending_posixmq(struct single *sfd); + +ssize_t xioclose_posixmq( + struct single *sfd) +{ + int res; + Debug1("xioclose_posixmq(): mq_close(%d)", sfd->fd); + res = mq_close(sfd->fd); + if (res < 0) { + Warn2("xioclose_posixmq(): mq_close(%d) -> -1: %s", sfd->fd, strerror(errno)); + } else { + Debug("xioclose_posixmq(): mq_close() -> 0"); + } + if (sfd->opt_unlink_close) { + _posixmq_unlink(sfd->para.posixmq.name, E_WARN); + } + free((void *)sfd->para.posixmq.name); + return 0; +} + +static int _posixmq_unlink( + const char *name, + int level) /* message level on error */ +{ + int _errno; + int res; + + Debug1("mq_unlink(\"%s\")", name); + res = mq_unlink(name); + _errno = errno; + Debug1("mq_unlink() -> %d", res); + errno = _errno; + if (res < 0) { + Msg2(level, "mq_unlink(\"%s\"): %s",name, strerror(errno)); + } + return res; +} + +#endif /* WITH_POSIXMQ */ diff --git a/xio-posixmq.h b/xio-posixmq.h new file mode 100644 index 0000000..8359696 --- /dev/null +++ b/xio-posixmq.h @@ -0,0 +1,20 @@ +/* Source: xio-posixmq.h */ +/* Copyright Gerhard Rieger and contributors (see file CHANGES) */ +/* Published under the GNU General Public License V.2, see file COPYING */ + +#ifndef __xio_posixmq_h_included +#define __xio_posixmq_h_included 1 + +extern const struct addrdesc xioaddr_posixmq_bidir; +extern const struct addrdesc xioaddr_posixmq_read; +extern const struct addrdesc xioaddr_posixmq_receive; +extern const struct addrdesc xioaddr_posixmq_send; + +extern const struct optdesc opt_posixmq_priority; + +extern ssize_t xioread_posixmq(struct single *file, void *buff, size_t bufsiz); +extern ssize_t xiopending_posixmq(struct single *pipe); +extern ssize_t xiowrite_posixmq(struct single *file, const void *buff, size_t bufsiz); +extern ssize_t xioclose_posixmq(struct single *sfd); + +#endif /* !defined(__xio_posixmq_h_included) */ diff --git a/xio-socks5.c b/xio-socks5.c index 18b7c38..fb85202 100644 --- a/xio-socks5.c +++ b/xio-socks5.c @@ -55,7 +55,7 @@ static int xioopen_socks5(int argc, const char *argv[], struct opt *opts, const struct addrdesc xioaddr_socks5_connect = { "SOCKS5-CONNECT", 1+XIO_RDWR, xioopen_socks5, GROUP_FD|GROUP_SOCKET|GROUP_SOCK_IP4|GROUP_SOCK_IP6|GROUP_IP_TCP|GROUP_CHILD|GROUP_RETRY, SOCKS5_COMMAND_CONNECT, 0, 0 HELP("::::") }; -const struct addrdesc xioaddr_socks5_listen = { "SOCKS5-LISTEN", 1+XIO_RDWR, xioopen_socks5, GROUP_FD|GROUP_SOCKET|GROUP_SOCK_IP4|GROUP_SOCK_IP6|GROUP_IP_TCP|GROUP_CHILD|GROUP_RETRY, SOCKS5_COMMAND_BIND, 0, 0 HELP("::::") }; +const struct addrdesc xioaddr_socks5_listen = { "SOCKS5-LISTEN", 1+XIO_RDWR, xioopen_socks5, GROUP_FD|GROUP_SOCKET|GROUP_SOCK_IP4|GROUP_SOCK_IP6|GROUP_IP_TCP|GROUP_CHILD|GROUP_RETRY, SOCKS5_COMMAND_BIND, 0, 0 HELP("::::") }; static const char * _xioopen_socks5_strerror(uint8_t r) { diff --git a/xio.h b/xio.h index 6c3d057..3c47d64 100644 --- a/xio.h +++ b/xio.h @@ -45,15 +45,17 @@ struct opt; #define XIOREAD_STREAM 0x1000 /* read() (default) */ #define XIOREAD_RECV 0x2000 /* recvfrom() */ #define XIOREAD_PTY 0x4000 /* handle EIO */ -#define XIOREAD_READLINE 0x5000 /* ... */ -#define XIOREAD_OPENSSL 0x6000 /* SSL_read() */ +#define XIOREAD_POSIXMQ 0x5000 /* POSIX MQ */ +#define XIOREAD_READLINE 0x6000 /* ... */ +#define XIOREAD_OPENSSL 0x7000 /* SSL_read() */ #define XIODATA_WRITEMASK 0x0f00 /* mask for basic r/w method */ #define XIOWRITE_STREAM 0x0100 /* write() (default) */ #define XIOWRITE_SENDTO 0x0200 /* sendto() */ #define XIOWRITE_PIPE 0x0300 /* write() to alternate (pipe) Fd */ #define XIOWRITE_2PIPE 0x0400 /* write() to alternate (2pipe) Fd */ -#define XIOWRITE_READLINE 0x0500 /* check for prompt */ -#define XIOWRITE_OPENSSL 0x0600 /* SSL_write() */ +#define XIOWRITE_POSIXMQ 0x0500 /* POSIX MQ */ +#define XIOWRITE_READLINE 0x0600 /* check for prompt */ +#define XIOWRITE_OPENSSL 0x0700 /* SSL_write() */ /* modifiers to XIODATA_READ_RECV */ #define XIOREAD_RECV_CHECKPORT 0x0001 /* recv, check peer port */ #define XIOREAD_RECV_CHECKADDR 0x0002 /* recv, check peer address */ @@ -74,6 +76,7 @@ struct opt; #define XIODATA_RECV_SKIPIP (XIODATA_RECV|XIOREAD_RECV_SKIPIP) #define XIODATA_PIPE (XIOREAD_STREAM|XIOWRITE_PIPE) #define XIODATA_2PIPE (XIOREAD_STREAM|XIOWRITE_2PIPE) +#define XIODATA_POSIXMQ (XIOREAD_POSIXMQ|XIOWRITE_POSIXMQ) #define XIODATA_PTY (XIOREAD_PTY|XIOWRITE_STREAM) #define XIODATA_READLINE (XIOREAD_READLINE|XIOWRITE_STREAM) #define XIODATA_OPENSSL (XIOREAD_OPENSSL|XIOWRITE_OPENSSL) @@ -169,7 +172,7 @@ typedef struct single { size_t actbytes; /* so many bytes still to be read (when readbytes!=0)*/ xiolock_t lock; /* parameters of lockfile */ bool havelock; /* we are happy owner of the above lock */ - int triggerfd; /* close this FD in child process to signal parent */ + int triggerfd; /* close this FD in child process to notify parent */ bool cool_write; /* downlevel EPIPE, ECONNRESET to notice */ /* until here, keep consistent with bipipe.dual ! */ int argc; /* number of fields in argv */ @@ -243,6 +246,12 @@ typedef struct single { #endif /* WITH_UNIX */ } socket; #endif /* _WITH_SOCKET */ +#if WITH_POSIXMQ + struct { + const char *name; + unsigned int prio; /* POSIX message queue */ + } posixmq; +#endif /* WITH_POSIXMQ */ struct { int fdout; /* use fd for output if two pipes */ pid_t pid; /* child PID, with EXEC: */ diff --git a/xioclose.c b/xioclose.c index 2a236d1..8fefe16 100644 --- a/xioclose.c +++ b/xioclose.c @@ -11,6 +11,7 @@ #include "xio-termios.h" #include "xio-interface.h" +#include "xio-posixmq.h" /* close the xio fd; must be valid and "simple" (not dual) */ @@ -51,6 +52,11 @@ int xioclose1(struct single *pipe) { } } #endif /* WITH_TERMIOS */ +#if WITH_POSIXMQ + if ((pipe->dtype & XIODATA_MASK) == XIODATA_POSIXMQ) { + xioclose_posixmq(pipe); + } +#endif /* WITH_POSIXMQ */ if (pipe->fd >= 0) { switch (pipe->howtoend) { case END_KILL: case END_SHUTDOWN_KILL: case END_CLOSE_KILL: diff --git a/xioinitialize.c b/xioinitialize.c index 813a4d9..c5f6abe 100644 --- a/xioinitialize.c +++ b/xioinitialize.c @@ -256,6 +256,7 @@ pid_t xio_fork(bool subchild, } num_child++; + Info1("number of children increased to %d", num_child); /* parent process */ Notice1("forked off child process "F_pid, pid); /* gdb recommends to have env controlled sleep after fork */ diff --git a/xiomodes.h b/xiomodes.h index 2faa2ae..971a9a7 100644 --- a/xiomodes.h +++ b/xiomodes.h @@ -37,6 +37,7 @@ #include "xio-vsock.h" #endif /* _WITH_SOCKET */ #include "xio-namespaces.h" +#include "xio-posixmq.h" #include "xio-progcall.h" #include "xio-exec.h" #include "xio-system.h" diff --git a/xioopen.c b/xioopen.c index 7a49811..e466630 100644 --- a/xioopen.c +++ b/xioopen.c @@ -150,6 +150,13 @@ const struct addrname addressnames[] = { #if WITH_PIPE { "PIPE", &xioaddr_pipe }, #endif +#if WITH_POSIXMQ + { "POSIXMQ-BIDIRECTIONAL", &xioaddr_posixmq_bidir }, + { "POSIXMQ-READ", &xioaddr_posixmq_read }, + { "POSIXMQ-RECEIVE", &xioaddr_posixmq_receive }, + { "POSIXMQ-RECV", &xioaddr_posixmq_receive }, + { "POSIXMQ-SEND", &xioaddr_posixmq_send }, +#endif #if WITH_PROXY { "PROXY", &xioaddr_proxy_connect }, { "PROXY-CONNECT", &xioaddr_proxy_connect }, diff --git a/xioopts.c b/xioopts.c index f31bbaa..324147a 100644 --- a/xioopts.c +++ b/xioopts.c @@ -991,6 +991,9 @@ const struct optname optionnames[] = { IF_OPENSSL("min-version", &opt_openssl_min_proto_version) #endif IF_ANY ("mode", &opt_perm) +#if WITH_POSIXMQ + IF_ANY ("mq-prio", &opt_posixmq_priority) +#endif #ifdef TCP_MAXSEG IF_TCP ("mss", &opt_tcp_maxseg) IF_TCP ("mss-late", &opt_tcp_maxseg_late) @@ -1295,6 +1298,9 @@ const struct optname optionnames[] = { #ifdef IFF_PORTSEL IF_INTERFACE("portsel", &opt_iff_portsel) #endif +#if WITH_POSIXMQ + IF_ANY ("posixmq-priority", &opt_posixmq_priority) +#endif #if HAVE_RESOLV_H && WITH_RES_PRIMARY IF_IP ("primary", &opt_res_primary) #endif diff --git a/xioopts.h b/xioopts.h index 509809e..f03f204 100644 --- a/xioopts.h +++ b/xioopts.h @@ -152,6 +152,8 @@ enum e_func { #define GROUP_FILE GROUP_REG #define GROUP_SOCKET 0x00000020 #define GROUP_READLINE 0x00000040 +#define GROUP_POSIXMQ 0x00000080 + #define GROUP_NAMED 0x00000100 /* file system entry */ #define GROUP_OPEN 0x00000200 /* flags for open() */ #define GROUP_EXEC 0x00000400 /* program or script execution */ @@ -465,6 +467,9 @@ enum e_optcode { OPT_LOCKFILE, OPT_LOWPORT, OPT_MAX_CHILDREN, +#if WITH_POSIXMQ + OPT_POSIXMQ_PRIORITY, +#endif #ifdef NLDLY # ifdef NL0 OPT_NL0, /* termios.c_oflag */ diff --git a/xioread.c b/xioread.c index 1399012..b67e2aa 100644 --- a/xioread.c +++ b/xioread.c @@ -9,6 +9,7 @@ #include "xio-termios.h" #include "xio-socket.h" +#include "xio-posixmq.h" #include "xio-readline.h" #include "xio-openssl.h" @@ -115,6 +116,17 @@ ssize_t xioread(xiofile_t *file, void *buff, size_t bufsiz) { return bytes; break; +#if WITH_POSIXMQ + case XIOREAD_POSIXMQ: + if ((bytes = xioread_posixmq(pipe, buff, bufsiz)) < 0) { + return -1; + } + if (pipe->dtype & XIOREAD_RECV_ONESHOT) { + pipe->eof = 2; + } + break; +#endif /* WITH_POSIXMQ */ + #if WITH_READLINE case XIOREAD_READLINE: if ((bytes = xioread_readline(pipe, buff, bufsiz)) < 0) { diff --git a/xiosigchld.c b/xiosigchld.c index 81f5dbc..10f7f49 100644 --- a/xiosigchld.c +++ b/xiosigchld.c @@ -112,6 +112,10 @@ void childdied(int signum) { return; } /*! indent */ + if (num_child) { + num_child--; + Info1("number of children decreased to %d", num_child); + } /* check if it was a registered child process */ i = 0; while (i < XIO_MAXSOCK) { diff --git a/xiowrite.c b/xiowrite.c index 769a3fd..1619bc4 100644 --- a/xiowrite.c +++ b/xiowrite.c @@ -8,6 +8,7 @@ #include "xiosysincludes.h" #include "xioopen.h" +#include "xio-posixmq.h" #include "xio-readline.h" #include "xio-openssl.h" @@ -139,6 +140,14 @@ ssize_t xiowrite(xiofile_t *file, const void *buff, size_t bytes) { } break; +#if WITH_POSIXMQ + case XIOWRITE_POSIXMQ: + if ((writt = xiowrite_posixmq(pipe, buff, bytes)) < 0) { + return -1; + } + break; +#endif /* WITH_POSIXMQ */ + #if WITH_OPENSSL case XIOWRITE_OPENSSL: /* this function prints its own error messages */