// Demonstrates several child processes writing fixed-size records into one
// named pipe (FIFO) while the parent drains it through a non-blocking reader.

#include <cerrno>
#include <cstdio>
#include <cstring>

#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/// Prints the textual description of `error` (an errno value) on stdout.
/// @return `error` unchanged, so callers can write `return print_error(errno);`.
int print_error(const int error) {
  std::printf("%s\n", std::strerror(error));
  return error;
}

/// Fixed-size record sent through the FIFO: the writer's pid and the
/// sequence number of the message within that writer.
struct Message {
  int pid;
  int index;
};

namespace {

constexpr int kWriters = 5;         // number of child processes forked
constexpr int kMessages = 5;        // messages written by each child
constexpr int kMaxAttempts = 10000; // retries before a transfer is given up

/// Outcome of one bounded-retry transfer on the FIFO.
enum class IoStatus {
  kDone,   // the whole record was transferred
  kBusy,   // every attempt hit EAGAIN/EINTR
  kFailed  // hard error; errno describes it
};

/// Writes one record, retrying while the pipe reports EAGAIN (or the call is
/// interrupted). The attempt budget is checked before each call, so a
/// successful write can never be misreported as "busy".
IoStatus write_message(const int fd, const Message& message) {
  for (int attempt = 0; attempt < kMaxAttempts; ++attempt) {
    const ssize_t count = ::write(fd, &message, sizeof(message));
    if (count == static_cast<ssize_t>(sizeof(message))) return IoStatus::kDone;
    if (count >= 0) {
      errno = EIO;  // short write of a record: report as an I/O error
      return IoStatus::kFailed;
    }
    if (errno != EAGAIN && errno != EINTR) return IoStatus::kFailed;
  }
  return IoStatus::kBusy;
}

/// Reads one record into `message`, retrying while the pipe is empty (EAGAIN)
/// or the call is interrupted. A record that arrives on the final attempt is
/// returned rather than discarded.
IoStatus read_message(const int fd, Message& message) {
  for (int attempt = 0; attempt < kMaxAttempts; ++attempt) {
    const ssize_t count = ::read(fd, &message, sizeof(message));
    if (count == static_cast<ssize_t>(sizeof(message))) return IoStatus::kDone;
    if (count >= 0) {
      errno = EIO;  // end-of-file or truncated record
      return IoStatus::kFailed;
    }
    if (errno != EAGAIN && errno != EINTR) return IoStatus::kFailed;
  }
  return IoStatus::kBusy;
}

/// Body of a child process: opens the FIFO once per message, alternating
/// between a blocking and a non-blocking descriptor, and writes one record.
/// @return 0 on completion, or the errno value of a failed open().
int run_writer(const char* name) {
  const int self = static_cast<int>(::getpid());
  std::printf("child %d\n", self);

  const int block_alternation[] = {0, O_NONBLOCK};
  for (int index = 0; index < kMessages; ++index) {
    const int wfd = ::open(name, O_WRONLY | block_alternation[index % 2]);
    if (wfd < 0) return print_error(errno);

    const Message message{self, index};
    switch (write_message(wfd, message)) {
      case IoStatus::kBusy:
        std::printf("child %d write - pipe busy\n", self);
        break;
      case IoStatus::kFailed:
        print_error(errno);  // report instead of silently dropping the record
        break;
      case IoStatus::kDone:
        break;
    }
    ::close(wfd);
  }
  return 0;
}

/// Parent side: tries to read `expected` records and prints each one.
/// NOTE: the retry budget is a busy loop, not a timed wait, so a slow child
/// can still cause a "pipe empty" report for a record that arrives later.
void read_messages(const int rfd, const int expected) {
  std::printf("parent read\n");
  for (int received = 0; received < expected; ++received) {
    Message message{};
    switch (read_message(rfd, message)) {
      case IoStatus::kDone:
        std::printf("message %d %d\n", message.index, message.pid);
        break;
      case IoStatus::kBusy:
        std::printf("parent %d read - pipe empty\n",
                    static_cast<int>(::getpid()));
        break;
      case IoStatus::kFailed:
        print_error(errno);
        break;
    }
  }
}

/// Reaps the first `count` children and reports the ones that exited normally.
void wait_for_children(const pid_t* pids, const int count) {
  for (int idx = 0; idx < count; ++idx) {
    int status = 0;
    if (::waitpid(pids[idx], &status, 0) != pids[idx]) {
      print_error(errno);
      continue;
    }
    if (WIFEXITED(status))
      std::printf("child exited %d\n", static_cast<int>(pids[idx]));
  }
}

/// Closes whichever descriptors are open (>= 0) and removes the FIFO.
/// @return the result of unlink(): 0 on success, -1 with errno set otherwise.
int release_fifo(const char* name, const int rfd, const int wfd) {
  if (wfd >= 0) ::close(wfd);
  if (rfd >= 0) ::close(rfd);
  return ::unlink(name);
}

}  // namespace

/// Creates the FIFO, forks the writers, drains their messages, reaps the
/// children and removes the FIFO again.
/// @return 0 on success, otherwise the errno value of the first failure.
int main() {
  const char* name = "/tmp/my_pipe";
  if (::mkfifo(name, 0666) != 0) return print_error(errno);

  std::printf("open parent pipe\n");

  // The non-blocking reader must exist before any writer is opened: a
  // blocking O_WRONLY open would otherwise wait forever for a reader.
  const int rfd = ::open(name, O_RDONLY | O_NONBLOCK);
  if (rfd < 0) {
    const int error = print_error(errno);
    release_fifo(name, -1, -1);
    return error;
  }

  // The parent keeps its own write end open so the reader never sees
  // end-of-file while children open and close their descriptors.
  const int wfd = ::open(name, O_WRONLY);
  if (wfd < 0) {
    const int error = print_error(errno);
    release_fifo(name, rfd, -1);
    return error;
  }

  // Flush before forking so children do not inherit (and later re-emit)
  // the parent's buffered output when stdout is not a terminal.
  std::fflush(stdout);

  pid_t pids[kWriters];
  int started = 0;
  int exit_code = 0;
  for (; started < kWriters; ++started) {
    const pid_t pid = ::fork();
    if (pid < 0) {
      exit_code = print_error(errno);
      break;
    }
    if (pid == 0) return run_writer(name);  // child: never touches cleanup
    pids[started] = pid;
  }

  if (exit_code == 0) read_messages(rfd, started * kMessages);

  // wait for children (only those that were actually started).
  wait_for_children(pids, started);

  if (release_fifo(name, rfd, wfd) < 0 && exit_code == 0)
    return print_error(errno);
  return exit_code;
}