[PATCH 09/21] Cygwin: FIFO: make opening a writer more robust

Ken Brown kbrown@cornell.edu
Thu May 7 20:21:12 GMT 2020


- Make read_ready a manual-reset event.

- Signal read_ready in open instead of in the listen_client thread.

- Don't reset read_ready when the listen_client thread terminates;
  instead do it in close().

- Rearrange open and change its error handling.

- Add a wait_open_pipe method that waits for a pipe instance to be
  available and then calls open_pipe.  Use it when opening a writer if
  we can't connect immediately, which can happen if the system is
  heavily loaded and/or if many writers are trying to open
  simultaneously.
---
 winsup/cygwin/fhandler.h       |   1 +
 winsup/cygwin/fhandler_fifo.cc | 267 +++++++++++++++++++++------------
 2 files changed, 168 insertions(+), 100 deletions(-)

diff --git a/winsup/cygwin/fhandler.h b/winsup/cygwin/fhandler.h
index 3bc04cf13..2516c93b4 100644
--- a/winsup/cygwin/fhandler.h
+++ b/winsup/cygwin/fhandler.h
@@ -1323,6 +1323,7 @@ class fhandler_fifo: public fhandler_base
   static NTSTATUS npfs_handle (HANDLE &);
   HANDLE create_pipe_instance (bool);
   NTSTATUS open_pipe (HANDLE&);
+  NTSTATUS wait_open_pipe (HANDLE&);
   int add_client_handler ();
   void delete_client_handler (int);
   bool listen_client ();
diff --git a/winsup/cygwin/fhandler_fifo.cc b/winsup/cygwin/fhandler_fifo.cc
index 21faf4ec2..5c3df5497 100644
--- a/winsup/cygwin/fhandler_fifo.cc
+++ b/winsup/cygwin/fhandler_fifo.cc
@@ -222,7 +222,64 @@ fhandler_fifo::open_pipe (HANDLE& ph)
 			      openflags & O_CLOEXEC ? 0 : OBJ_INHERIT,
 			      npfsh, NULL);
   sharing = FILE_SHARE_READ | FILE_SHARE_WRITE;
-  status = NtOpenFile (&ph, access, &attr, &io, sharing, 0);
+  return NtOpenFile (&ph, access, &attr, &io, sharing, 0);
+}
+
+/* Wait up to 100ms for a pipe instance to be available, then connect. */
+NTSTATUS
+fhandler_fifo::wait_open_pipe (HANDLE& ph)
+{
+  HANDLE npfsh;
+  HANDLE evt;
+  NTSTATUS status;
+  IO_STATUS_BLOCK io;
+  ULONG pwbuf_size;
+  PFILE_PIPE_WAIT_FOR_BUFFER pwbuf;
+  LONGLONG stamp;
+  LONGLONG orig_timeout = -100 * NS100PERSEC / MSPERSEC;   /* 100ms */
+
+  status = npfs_handle (npfsh);
+  if (!NT_SUCCESS (status))
+    return status;
+  if (!(evt = create_event ()))
+    api_fatal ("Can't create event, %E");
+  pwbuf_size
+    = offsetof (FILE_PIPE_WAIT_FOR_BUFFER, Name) + get_pipe_name ()->Length;
+  pwbuf = (PFILE_PIPE_WAIT_FOR_BUFFER) alloca (pwbuf_size);
+  pwbuf->Timeout.QuadPart = orig_timeout;
+  pwbuf->NameLength = get_pipe_name ()->Length;
+  pwbuf->TimeoutSpecified = TRUE;
+  memcpy (pwbuf->Name, get_pipe_name ()->Buffer, get_pipe_name ()->Length);
+  stamp = get_clock (CLOCK_MONOTONIC)->n100secs ();
+  bool retry;
+  do
+    {
+      retry = false;
+      status = NtFsControlFile (npfsh, evt, NULL, NULL, &io, FSCTL_PIPE_WAIT,
+				pwbuf, pwbuf_size, NULL, 0);
+      if (status == STATUS_PENDING)
+	{
+	  if (WaitForSingleObject (evt, INFINITE) == WAIT_OBJECT_0)
+	    status = io.Status;
+	  else
+	    api_fatal ("WFSO failed, %E");
+	}
+      if (NT_SUCCESS (status))
+	status = open_pipe (ph);
+      if (STATUS_PIPE_NO_INSTANCE_AVAILABLE (status))
+	{
+	  /* Another writer has grabbed the pipe instance.  Adjust
+	     the timeout and keep waiting if there's time left. */
+	  pwbuf->Timeout.QuadPart = orig_timeout
+	    + get_clock (CLOCK_MONOTONIC)->n100secs () - stamp;
+	  if (pwbuf->Timeout.QuadPart < 0)
+	    retry = true;
+	  else
+	    status = STATUS_IO_TIMEOUT;
+	}
+    }
+  while (retry);
+  NtClose (evt);
   return status;
 }
 
@@ -294,7 +351,6 @@ void
 fhandler_fifo::record_connection (fifo_client_handler& fc,
 				  fifo_client_connect_state s)
 {
-  SetEvent (write_ready);
   fc.state = s;
   maybe_eof (false);
   ResetEvent (writer_opening);
@@ -327,9 +383,6 @@ fhandler_fifo::listen_client_thread ()
       if (add_client_handler () < 0)
 	api_fatal ("Can't add a client handler, %E");
 
-      /* Allow a writer to open. */
-      SetEvent (read_ready);
-
       /* Listen for a writer to connect to the new client handler. */
       fifo_client_handler& fc = fc_handler[nhandlers - 1];
       NTSTATUS status;
@@ -405,19 +458,13 @@ fhandler_fifo::listen_client_thread ()
 out:
   if (conn_evt)
     NtClose (conn_evt);
-  ResetEvent (read_ready);
   return 0;
 }
 
 int
 fhandler_fifo::open (int flags, mode_t)
 {
-  enum
-  {
-   success,
-   error_errno_set,
-   error_set_errno
-  } res;
+  int saved_errno = 0;
 
   if (flags & O_PATH)
     return open_fs (flags);
@@ -437,8 +484,7 @@ fhandler_fifo::open (int flags, mode_t)
       break;
     default:
       set_errno (EINVAL);
-      res = error_errno_set;
-      goto out;
+      goto err;
     }
 
   debug_only_printf ("reader %d, writer %d, duplexer %d", reader, writer, duplexer);
@@ -454,135 +500,151 @@ fhandler_fifo::open (int flags, mode_t)
 
   char npbuf[MAX_PATH];
   __small_sprintf (npbuf, "r-event.%08x.%016X", get_dev (), get_ino ());
-  if (!(read_ready = CreateEvent (sa_buf, false, false, npbuf)))
+  if (!(read_ready = CreateEvent (sa_buf, true, false, npbuf)))
     {
       debug_printf ("CreateEvent for %s failed, %E", npbuf);
-      res = error_set_errno;
-      goto out;
+      __seterrno ();
+      goto err;
     }
   npbuf[0] = 'w';
   if (!(write_ready = CreateEvent (sa_buf, true, false, npbuf)))
     {
       debug_printf ("CreateEvent for %s failed, %E", npbuf);
-      res = error_set_errno;
-      goto out;
+      __seterrno ();
+      goto err_close_read_ready;
     }
   npbuf[0] = 'o';
   if (!(writer_opening = CreateEvent (sa_buf, true, false, npbuf)))
     {
       debug_printf ("CreateEvent for %s failed, %E", npbuf);
-      res = error_set_errno;
-      goto out;
-    }
-
-  /* If we're a duplexer, create the pipe and the first client handler. */
-  if (duplexer)
-    {
-      HANDLE ph = NULL;
-
-      if (add_client_handler () < 0)
-	{
-	  res = error_errno_set;
-	  goto out;
-	}
-      NTSTATUS status = open_pipe (ph);
-      if (NT_SUCCESS (status))
-	{
-	  record_connection (fc_handler[0]);
-	  set_handle (ph);
-	  set_pipe_non_blocking (ph, flags & O_NONBLOCK);
-	}
-      else
-	{
-	  __seterrno_from_nt_status (status);
-	  res = error_errno_set;
-	  goto out;
-	}
+      __seterrno ();
+      goto err_close_write_ready;
     }
 
-  /* If we're reading, start the listen_client thread (which should
-     signal read_ready), and wait for a writer. */
+  /* If we're reading, signal read_ready and start the listen_client
+     thread. */
   if (reader)
     {
       if (!listen_client ())
 	{
 	  debug_printf ("create of listen_client thread failed");
-	  res = error_errno_set;
-	  goto out;
+	  goto err_close_writer_opening;
 	}
-      else if (!duplexer && !wait (write_ready))
-	{
-	  res = error_errno_set;
-	  goto out;
-	}
-      else
+      SetEvent (read_ready);
+
+      /* If we're a duplexer, we need a handle for writing. */
+      if (duplexer)
 	{
-	  init_fixup_before ();
-	  res = success;
+	  HANDLE ph = NULL;
+	  NTSTATUS status;
+
+	  while (1)
+	    {
+	      status = open_pipe (ph);
+	      if (NT_SUCCESS (status))
+		{
+		  set_handle (ph);
+		  set_pipe_non_blocking (ph, flags & O_NONBLOCK);
+		  break;
+		}
+	      else if (status == STATUS_OBJECT_NAME_NOT_FOUND)
+		{
+		  /* The pipe hasn't been created yet. */
+		  yield ();
+		  continue;
+		}
+	      else
+		{
+		  __seterrno_from_nt_status (status);
+		  goto err_close_reader;
+		}
+	    }
 	}
+      /* Not a duplexer; wait for a writer to connect. */
+      else if (!wait (write_ready))
+	goto err_close_reader;
+      init_fixup_before ();
+      goto success;
     }
 
-  /* If we're writing, wait for read_ready and then connect to the
-     pipe.  This should always succeed quickly if the reader's
-     listen_client thread is running.  Then signal write_ready.  */
+  /* If we're writing, wait for read_ready, connect to the pipe, and
+     signal write_ready.  */
   if (writer)
     {
+      NTSTATUS status;
+
       SetEvent (writer_opening);
+      if (!wait (read_ready))
+	{
+	  ResetEvent (writer_opening);
+	  goto err_close_writer_opening;
+	}
       while (1)
 	{
-	  if (!wait (read_ready))
-	    {
-	      ResetEvent (writer_opening);
-	      res = error_errno_set;
-	      goto out;
-	    }
-	  NTSTATUS status = open_pipe (get_handle ());
+	  status = open_pipe (get_handle ());
 	  if (NT_SUCCESS (status))
+	    goto writer_success;
+	  else if (status == STATUS_OBJECT_NAME_NOT_FOUND)
 	    {
-	      set_pipe_non_blocking (get_handle (), flags & O_NONBLOCK);
-	      SetEvent (write_ready);
-	      res = success;
-	      goto out;
+	      /* The pipe hasn't been created yet. */
+	      yield ();
+	      continue;
 	    }
 	  else if (STATUS_PIPE_NO_INSTANCE_AVAILABLE (status))
-	    Sleep (1);
+	    break;
 	  else
 	    {
 	      debug_printf ("create of writer failed");
 	      __seterrno_from_nt_status (status);
-	      res = error_errno_set;
 	      ResetEvent (writer_opening);
-	      goto out;
+	      goto err_close_writer_opening;
 	    }
 	}
-    }
-out:
-  if (res == error_set_errno)
-    __seterrno ();
-  if (res != success)
-    {
-      if (read_ready)
-	{
-	  NtClose (read_ready);
-	  read_ready = NULL;
-	}
-      if (write_ready)
-	{
-	  NtClose (write_ready);
-	  write_ready = NULL;
-	}
-      if (writer_opening)
+
+      /* We should get here only if the system is heavily loaded
+	 and/or many writers are trying to connect simultaneously */
+      while (1)
 	{
-	  NtClose (writer_opening);
-	  writer_opening = NULL;
+	  SetEvent (writer_opening);
+	  if (!wait (read_ready))
+	    {
+	      ResetEvent (writer_opening);
+	      goto err_close_writer_opening;
+	    }
+	  status = wait_open_pipe (get_handle ());
+	  if (NT_SUCCESS (status))
+	    goto writer_success;
+	  else if (status == STATUS_IO_TIMEOUT)
+	    continue;
+	  else
+	    {
+	      debug_printf ("create of writer failed");
+	      __seterrno_from_nt_status (status);
+	      ResetEvent (writer_opening);
+	      goto err_close_writer_opening;
+	    }
 	}
-      if (get_handle ())
-	NtClose (get_handle ());
-      if (listen_client_thr)
-	stop_listen_client ();
     }
-  debug_printf ("res %d", res);
-  return res == success;
+writer_success:
+  set_pipe_non_blocking (get_handle (), flags & O_NONBLOCK);
+  SetEvent (write_ready);
+success:
+  return 1;
+err_close_reader:
+  saved_errno = get_errno ();
+  close ();
+  set_errno (saved_errno);
+  return 0;
+err_close_writer_opening:
+  NtClose (writer_opening);
+err_close_write_ready:
+  NtClose (write_ready);
+err_close_read_ready:
+  NtClose (read_ready);
+err:
+  if (get_handle ())
+    NtClose (get_handle ());
+  return 0;
 }
 
 off_t
@@ -938,6 +1000,11 @@ fhandler_fifo::close ()
      handler or another thread. */
   fifo_client_unlock ();
   stop_listen_client ();
+  if (reader)
+    /* FIXME: There could be several readers open because of
+       dup/fork/exec; we should only reset read_ready when the last
+       one closes. */
+    ResetEvent (read_ready);
   if (read_ready)
     NtClose (read_ready);
   if (write_ready)
-- 
2.21.0



More information about the Cygwin-patches mailing list