1
0
Fork 0
mirror of https://repo.or.cz/socat.git synced 2025-07-10 14:12:58 +00:00

Replaced SIGUSR1 with socketpair for synchronization between parent and child processes on RECVFROM type addresses

This commit is contained in:
Gerhard Rieger 2023-06-23 22:48:26 +02:00
parent a0e17a813a
commit 02483ff39e
12 changed files with 269 additions and 257 deletions

View file

@ -1084,101 +1084,13 @@ int _xioopen_dgram_sendto(/* them is already in xfd->peersa */
packet in the IP stacks input queue and forks a sub process. The sub process
then reads this packet for processing its data.
There is a problem because the parent process would find the same packet
again if it calls select()/poll() before the child process reads the
again if it calls select()/poll() before the child process has read the
packet.
To solve this problem we implement the following mechanism:
The sub process sends a SIGUSR1 when it has read the packet (or a SIGCHLD if
it dies before). The parent process waits until it receives that signal and
only then continues to listen.
To prevent a signal from another process to trigger our loop, we pass the
pid of the sub process to the signal handler in xio_waitingfor. The signal
handler sets xio_hashappened if the pid matched.
Before forking an unnamed pipe (fifo) is created. The sub process closes the
write side when it has read the packet. The parent process waits until the
read side of the pipe gives EOF and only then continues to listen.
*/
static pid_t xio_waitingfor; /* info from recv loop to signal handler:
indicates the pid of the child process
that should send us the USR1 signal */
static bool xio_hashappened; /* info from signal handler to loop: child
process has read ("consumed") the packet */
static int xio_childstatus;
/* this is the signal handler for USR1 and CHLD */
void xiosigaction_hasread(int signum
#if HAVE_STRUCT_SIGACTION_SA_SIGACTION && defined(SA_SIGINFO)
, siginfo_t *siginfo, void *ucontext
#endif
) {
pid_t pid;
int _errno;
int status = 0;
bool wassig = false;
_errno = errno;
diag_in_handler = 1;
#if HAVE_STRUCT_SIGACTION_SA_SIGACTION && defined(SA_SIGINFO)
Debug5("xiosigaction_hasread(%d, {%d,%d,%d,"F_pid"}, )",
signum, siginfo->si_signo, siginfo->si_errno, siginfo->si_code,
siginfo->si_pid);
#else
Debug1("xiosigaction_hasread(%d)", signum);
#endif
if (signum == SIGCHLD) {
do {
pid = Waitpid(-1, &status, WNOHANG);
if (pid == 0) {
Msg(wassig?E_INFO:E_WARN,
"waitpid(-1, {}, WNOHANG): no child has exited");
Info("xiosigaction_hasread() finished");
Debug("xiosigaction_hasread() ->");
diag_in_handler = 0;
errno = _errno;
return;
} else if (pid < 0 && errno == EINTR) {
Info1("xiosigaction_hasread(): %s", strerror(errno));
} else if (pid < 0 && errno == ECHILD) {
Msg(wassig?E_INFO:E_NOTICE,
"waitpid(-1, {}, WNOHANG): "F_strerror);
Info("xiosigaction_hasread() finished");
Debug("xiosigaction_hasread() ->");
diag_in_handler = 0;
errno = _errno;
return;
}
wassig = true;
if (pid < 0) {
Warn1("waitpid(-1, {%d}, WNOHANG): "F_strerror, status);
Info("xiosigaction_hasread() finished");
Debug("xiosigaction_hasread() ->");
diag_in_handler = 0;
errno = _errno;
return;
}
if (pid == xio_waitingfor) {
xio_waitingfor = 0; /* so this child will not set hashappened again */
xio_hashappened = true;
xio_childstatus = WEXITSTATUS(status);
Debug("xiosigaction_hasread() ->");
diag_in_handler = 0;
errno = _errno;
return;
}
} while (1);
}
#if HAVE_STRUCT_SIGACTION_SA_SIGACTION && defined(SA_SIGINFO)
if (xio_waitingfor == siginfo->si_pid) {
xio_hashappened = true;
}
#else
xio_hashappened = true;
#endif
#if !HAVE_SIGACTION
Signal(sig, xiosigaction_hasread);
#endif /* !HAVE_SIGACTION */
Debug("xiosigaction_hasread() ->");
diag_in_handler = 0;
errno = _errno;
return;
}
/* waits for incoming packet, checks its source address and port. Depending
on fork option, it may fork a subprocess.
@ -1267,42 +1179,7 @@ int _xioopen_dgram_recvfrom(struct single *xfd, int xioflags,
}
if (dofork) {
#if HAVE_SIGACTION
{
struct sigaction act;
memset(&act, 0, sizeof(struct sigaction));
act.sa_flags = SA_NOCLDSTOP/*|SA_RESTART*/
#ifdef SA_SIGINFO /* not on Linux 2.0(.33) */
|SA_SIGINFO
#endif
#ifdef SA_NOMASK
|SA_NOMASK
#endif
;
#if HAVE_STRUCT_SIGACTION_SA_SIGACTION && defined(SA_SIGINFO)
act.sa_sigaction = xiosigaction_hasread;
#else /* Linux 2.0(.33) does not have sigaction.sa_sigaction */
act.sa_handler = xiosigaction_hasread;
#endif
sigfillset(&act.sa_mask);
if (Sigaction(SIGUSR1, &act, NULL) < 0) {
/*! Linux man does not explicitely say that errno is defined */
Warn1("sigaction(SIGUSR1, {&xiosigaction_subaddr_ok}, NULL): %s", strerror(errno));
}
if (Sigaction(SIGCHLD, &act, NULL) < 0) {
/*! Linux man does not explicitely say that errno is defined */
Warn1("sigaction(SIGCHLD, {&xiosigaction_subaddr_ok}, NULL): %s", strerror(errno));
}
}
#else /* !HAVE_SIGACTION */
/*!!!*/
if (Signal(SIGUSR1, xiosigaction_hasread) == SIG_ERR) {
Warn1("signal(SIGUSR1, xiosigaction_hasread): %s", strerror(errno));
}
if (Signal(SIGCHLD, xiosigaction_hasread) == SIG_ERR) {
Warn1("signal(SIGCHLD, xiosigaction_hasread): %s", strerror(errno));
}
#endif /* !HAVE_SIGACTION */
xiosetchilddied();
}
while (true) { /* but we only loop if fork option is set */
@ -1314,6 +1191,7 @@ int _xioopen_dgram_recvfrom(struct single *xfd, int xioflags,
socklen_t palen = sizeof(_peername); /* peer address size */
char ctrlbuff[1024]; /* ancillary messages */
struct msghdr msgh = {0};
int trigger[2]; /* for socketpair that indicates consumption of packet */
int rc;
socket_init(pf, pa);
@ -1397,29 +1275,22 @@ int _xioopen_dgram_recvfrom(struct single *xfd, int xioflags,
xfd->salen = palen;
if (dofork) {
sigset_t oldset, mask_sigchldusr1;
/* we must prevent that the current packet triggers another fork;
therefore we wait for a signal from the recent child: USR1
indicates that is has consumed the last packet; CHLD means it has
terminated */
/* block SIGCHLD and SIGUSR1 until parent is ready to react */
Sigprocmask(SIG_BLOCK, NULL, &mask_sigchldusr1);
sigaddset(&mask_sigchldusr1, SIGCHLD);
sigaddset(&mask_sigchldusr1, SIGUSR1);
Sigprocmask(SIG_SETMASK, &mask_sigchldusr1, &oldset);
Info("Generating socketpair that triggers parent when packet has been consumed");
if (Socketpair(PF_UNIX, SOCK_STREAM, 0, trigger) < 0) {
Error1("socketpair(PF_UNIX, SOCK_STREAM, 0, ...): %s", strerror(errno));
}
if ((pid = xio_fork(false, level)) < 0) {
Close(trigger[0]);
Close(trigger[1]);
Close(xfd->fd);
Sigprocmask(SIG_SETMASK, &oldset, NULL);
return STAT_RETRYLATER;
}
if (pid == 0) { /* child */
/* no reason to block SIGCHLD in child process */
Sigprocmask(SIG_SETMASK, &oldset, NULL);
xfd->ppid = Getppid(); /* send parent a signal when packet has
been consumed */
Close(trigger[0]);
xfd->triggerfd = trigger[1];
Fcntl_l(xfd->triggerfd, F_SETFD, FD_CLOEXEC);
#if WITH_RETRY
/* !? */
@ -1437,29 +1308,14 @@ int _xioopen_dgram_recvfrom(struct single *xfd, int xioflags,
break;
}
xio_waitingfor = pid;
/* Parent */
Close(trigger[1]);
do {
#if HAVE_PSELECT
{
struct timespec timeout = { LONG_MAX, 0 };
Pselect(0, NULL, NULL, NULL, &timeout, &oldset);
Sigprocmask(SIG_SETMASK, &oldset, NULL);
}
#else /* ! HAVE_PSELECT */
/* now we are ready to handle signals */
Sigprocmask(SIG_SETMASK, &oldset, NULL);
Sleep(1); /* any signal speeds up return */
#endif /* ! HAVE_PSELECT */
} while (!xio_hashappened) ;
xio_hashappened = false;
{
char buf[1];
while (Read(trigger[0], buf, 1) < 0 && errno == EINTR) ;
}
if (xio_childstatus != 0) {
char buff[512];
Recv(xfd->fd, buff, sizeof(buff), 0);
xio_childstatus = 0;
Info("drop data because of child exit failed");
}
Info("continue listening");
} else {
break;