[libcamera-devel,v2.1,12/12] gstreamer: Fix race conditions in task pause/resume

Message ID 20220630203221.12440-1-laurent.pinchart@ideasonboard.com
State Accepted
Series
  • Untitled series #3240

Commit Message

Laurent Pinchart June 30, 2022, 8:32 p.m. UTC
The task run function races with two other threads that want to resume
the task: the requestCompleted() handler and the buffer-notify signal
handler. If the former queues completed requests or the latter queues
back buffers to the pool, and then resumes the task, after the task run
handler checks the queues but before it attempts to pause the task, then
the task may be paused without noticing that more work is available.
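
To make the interleaving concrete, here is a minimal single-threaded
replay of the sequence described above. It is only a sketch: FakeTask and
lostWakeup() are hypothetical names standing in for the GstTask and the
task run function, and the real code runs the two sides on separate
threads.

```cpp
#include <cassert>
#include <queue>

/* Hypothetical stand-in for a GstTask: it only tracks the paused state. */
struct FakeTask {
	bool paused = false;
	void pause() { paused = true; }
	void resume() { paused = false; }
};

/*
 * Replay the problematic ordering step by step. Returns true if the task
 * ends up paused while a completed request is still waiting in the queue.
 */
bool lostWakeup()
{
	FakeTask task;
	std::queue<int> completedRequests;

	/* Task thread: check the queue, find it empty, decide to pause. */
	bool doPause = completedRequests.empty();

	/* Handler thread runs in between: queue work, then resume the task. */
	completedRequests.push(42);
	task.resume();

	/* Task thread: act on the now stale decision. */
	if (doPause)
		task.pause();

	return task.paused && !completedRequests.empty();
}
```

Calling lostWakeup() returns true in this replay: the resume issued by
the handler is overwritten by the later pause, which is the lost wakeup.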

The most immediate way to fix this is to take the stream_lock in the
requestCompleted() and buffer-notify signal handlers, or cover the whole
task run handler with the GstLibcameraSrcState lock. This could cause
long delays in the requestCompleted() handler, so that's not a good
option.

Instead, add a wakeup flag, protected by the GstLibcameraSrcState lock,
that allows detection of a lost race, and retry the task run.

Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
---
Changes since v2:

- Invert the pause/resume logic

Changes since v1:

- Fix incorrect wakeup and pause logic
---
 src/gstreamer/gstlibcamerasrc.cpp | 71 ++++++++++++++++++++++---------
 1 file changed, 50 insertions(+), 21 deletions(-)

Comments

Laurent Pinchart June 30, 2022, 8:35 p.m. UTC | #1
On Thu, Jun 30, 2022 at 11:32:21PM +0300, Laurent Pinchart via libcamera-devel wrote:
> The task run function races with two other threads that want to resume
> the task: the requestCompleted() handler and the buffer-notify signal
> handler. If the former queues completed requests or the latter queues
> back buffers to the pool, and then resumes the task, after the task run
> handler checks the queues but before it attempts to pause the task, then
> the task may be paused without noticing that more work is available.
> 
> The most immediate way to fix this is to take the stream_lock in the
> requestCompleted() and buffer-notify signal handlers, or cover the whole
> task run handler with the GstLibcameraSrcState lock. This could cause
> long delays in the requestCompleted() handler, so that's not a good
> option.
> 
> Instead, add a wakeup flag, protected by the GstLibcameraSrcState lock,
> that allows detection of a lost race, and retry the task run.

Of course I forgot to update this. I'll replace this paragraph with

Instead, pause the task unconditionally at the beginning of its run
function, and track while processing buffers and requests if the task
needs to be resumed. It may also get resumed externally by the
buffer-notify signal handler or the request completion handler, which
are guaranteed not to race due to the lock taken by the gst_task_pause()
and gst_task_resume() functions.
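
A rough single-threaded sketch of why the pause-first ordering does not
lose a wakeup is shown below. It is an illustration only, not the code
from the patch: FakeTask, stuckAfterRun() and the handlerPoint parameter
are hypothetical, and the handler is assumed to always queue its work
before resuming the task, as described above.

```cpp
#include <cassert>
#include <queue>

/* Hypothetical stand-in for a GstTask: it only tracks the paused state. */
struct FakeTask {
	bool paused = false;
	void pause() { paused = true; }
	void resume() { paused = false; }
};

/*
 * Run one iteration with the pause-first ordering, injecting the handler
 * (queue work, then resume) at one of three points:
 *   0: before the pause, 1: between pause and queue check, 2: after the check.
 * Returns true if the task ends up paused with work still queued.
 */
bool stuckAfterRun(int handlerPoint)
{
	FakeTask task;
	std::queue<int> completedRequests;

	auto handler = [&]() {
		completedRequests.push(42);
		task.resume();
	};

	if (handlerPoint == 0)
		handler();

	/* Pause unconditionally at the start of the run function. */
	task.pause();

	if (handlerPoint == 1)
		handler();

	/* Process one item and record whether more work remains. */
	bool doResume = false;
	if (!completedRequests.empty()) {
		completedRequests.pop();
		doResume = !completedRequests.empty();
	}

	if (handlerPoint == 2)
		handler();

	if (doResume)
		task.resume();

	return task.paused && !completedRequests.empty();
}
```

In all three cases the function returns false: either the run function
sees the queued item after pausing, or the handler's resume comes after
the pause and wins. The worst outcome is one extra iteration that finds
nothing to do.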

> Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
> ---
> Changes since v2:
> 
> - Invert the pause/resume logic
> 
> Changes since v1:
> 
> - Fix incorrect wakeup and pause logic
> ---
>  src/gstreamer/gstlibcamerasrc.cpp | 71 ++++++++++++++++++++++---------
>  1 file changed, 50 insertions(+), 21 deletions(-)
> 
> diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
> index 9ea59631a9f2..a7a0c4403e28 100644
> --- a/src/gstreamer/gstlibcamerasrc.cpp
> +++ b/src/gstreamer/gstlibcamerasrc.cpp
> @@ -259,6 +259,7 @@ GstLibcameraSrcState::requestCompleted(Request *request)
>  int GstLibcameraSrcState::processRequest()
>  {
>  	std::unique_ptr<RequestWrap> wrap;
> +	int err = 0;
>  
>  	{
>  		MutexLocker locker(lock_);
> @@ -267,10 +268,13 @@ int GstLibcameraSrcState::processRequest()
>  			wrap = std::move(completedRequests_.front());
>  			completedRequests_.pop();
>  		}
> +
> +		if (completedRequests_.empty())
> +			err = -ENOBUFS;
>  	}
>  
>  	if (!wrap)
> -		return -ENODATA;
> +		return -ENOBUFS;
>  
>  	GstFlowReturn ret = GST_FLOW_OK;
>  	gst_flow_combiner_reset(src_->flow_combiner);
> @@ -310,7 +314,7 @@ int GstLibcameraSrcState::processRequest()
>  		return -EPIPE;
>  	}
>  
> -	return 0;
> +	return err;
>  }
>  
>  static bool
> @@ -380,47 +384,72 @@ gst_libcamera_src_task_run(gpointer user_data)
>  	GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
>  	GstLibcameraSrcState *state = self->state;
>  
> +	/*
> +	 * Start by pausing the task. The task may also get resumed by the
> +	 * buffer-notify signal when new buffers are queued back to the pool,
> +	 * or by the request completion handler when a new request has
> +	 * completed.  Both will resume the task after adding the buffers or
> +	 * request to their respective lists, which are checked below to decide
> +	 * if the task needs to be resumed for another iteration. This is thus
> +	 * guaranteed to be race-free, the lock taken by gst_task_pause() and
> +	 * gst_task_resume() serves as a memory barrier.
> +	 */
> +	gst_task_pause(self->task);
> +
> +	bool doResume = false;
> +
>  	/*
>  	 * Create and queue one request. If no buffers are available the
>  	 * function returns -ENOBUFS, which we ignore here as that's not a
>  	 * fatal error.
>  	 */
>  	int ret = state->queueRequest();
> -	if (ret == -ENOMEM) {
> +	switch (ret) {
> +	case 0:
> +		/*
> +		 * The request was successfully queued, there may be enough
> +		 * buffers to create a new one. Don't pause the task to give it
> +		 * another try.
> +		 */
> +		doResume = true;
> +		break;
> +
> +	case -ENOMEM:
>  		GST_ELEMENT_ERROR(self, RESOURCE, NO_SPACE_LEFT,
>  				  ("Failed to allocate request for camera '%s'.",
>  				   state->cam_->id().c_str()),
>  				  ("libcamera::Camera::createRequest() failed"));
>  		gst_task_stop(self->task);
>  		return;
> +
> +	case -ENOBUFS:
> +	default:
> +		break;
>  	}
>  
> -	/* Process one completed request, if available. */
> +	/*
> +	 * Process one completed request, if available, and record if further
> +	 * requests are ready for processing.
> +	 */
>  	ret = state->processRequest();
>  	switch (ret) {
> +	case 0:
> +		/* Another completed request is available, resume the task. */
> +		doResume = true;
> +		break;
> +
>  	case -EPIPE:
>  		gst_task_stop(self->task);
>  		return;
>  
> -	case -ENODATA:
> -		gst_task_pause(self->task);
> -		return;
> +	case -ENOBUFS:
> +	default:
> +		break;
>  	}
>  
> -	/*
> -	 * Here we need to decide if we want to pause. This needs to
> -	 * happen in lock step with the callback thread which may want
> -	 * to resume the task and might push pending buffers.
> -	 */
> -	bool do_pause;
> -
> -	{
> -		MutexLocker locker(state->lock_);
> -		do_pause = state->completedRequests_.empty();
> -	}
> -
> -	if (do_pause)
> -		gst_task_pause(self->task);
> +	/* Resume the task for another iteration if needed. */
> +	if (doResume)
> +		gst_task_resume(self->task);
>  }
>  
>  static void
Nicolas Dufresne July 4, 2022, 7:29 p.m. UTC | #2
On Thursday, 30 June 2022 at 23:35 +0300, Laurent Pinchart via libcamera-devel
wrote:
> On Thu, Jun 30, 2022 at 11:32:21PM +0300, Laurent Pinchart via libcamera-devel wrote:
> > The task run function races with two other threads that want to resume
> > the task: the requestCompleted() handler and the buffer-notify signal
> > handler. If the former queues completed requests or the latter queues
> > back buffers to the pool, and then resumes the task, after the task run
> > handler checks the queues but before it attempts to pause the task, then
> > the task may be paused without noticing that more work is available.
> > 
> > The most immediate way to fix this is to take the stream_lock in the
> > requestCompleted() and buffer-notify signal handlers, or cover the whole
> > task run handler with the GstLibcameraSrcState lock. This could cause
> > long delays in the requestCompleted() handler, so that's not a good
> > option.
> > 
> > Instead, add a wakeup flag, protected by the GstLibcameraSrcState lock,
> > that allows detection of a lost race, and retry the task run.
> 
> Of course I forgot to update this. I'll replace this paragraph with
> 
> Instead, pause the task unconditionally at the beginning of its run
> function, and track while processing buffers and requests if the task
> needs to be resumed. It may also get resumed externally by the
> buffer-notify signal handler or the request completion handler, which
> are guaranteed not to race due to the lock taken by the gst_task_pause()
> and gst_task_resume() functions.
> 
> > Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>

Thanks Laurent for your hard work on this.

Reviewed-by: Nicolas Dufresne <nicolas.dufresne@collabora.com>

Patch

diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
index 9ea59631a9f2..a7a0c4403e28 100644
--- a/src/gstreamer/gstlibcamerasrc.cpp
+++ b/src/gstreamer/gstlibcamerasrc.cpp
@@ -259,6 +259,7 @@  GstLibcameraSrcState::requestCompleted(Request *request)
 int GstLibcameraSrcState::processRequest()
 {
 	std::unique_ptr<RequestWrap> wrap;
+	int err = 0;
 
 	{
 		MutexLocker locker(lock_);
@@ -267,10 +268,13 @@  int GstLibcameraSrcState::processRequest()
 			wrap = std::move(completedRequests_.front());
 			completedRequests_.pop();
 		}
+
+		if (completedRequests_.empty())
+			err = -ENOBUFS;
 	}
 
 	if (!wrap)
-		return -ENODATA;
+		return -ENOBUFS;
 
 	GstFlowReturn ret = GST_FLOW_OK;
 	gst_flow_combiner_reset(src_->flow_combiner);
@@ -310,7 +314,7 @@  int GstLibcameraSrcState::processRequest()
 		return -EPIPE;
 	}
 
-	return 0;
+	return err;
 }
 
 static bool
@@ -380,47 +384,72 @@  gst_libcamera_src_task_run(gpointer user_data)
 	GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
 	GstLibcameraSrcState *state = self->state;
 
+	/*
+	 * Start by pausing the task. The task may also get resumed by the
+	 * buffer-notify signal when new buffers are queued back to the pool,
+	 * or by the request completion handler when a new request has
+	 * completed.  Both will resume the task after adding the buffers or
+	 * request to their respective lists, which are checked below to decide
+	 * if the task needs to be resumed for another iteration. This is thus
+	 * guaranteed to be race-free, the lock taken by gst_task_pause() and
+	 * gst_task_resume() serves as a memory barrier.
+	 */
+	gst_task_pause(self->task);
+
+	bool doResume = false;
+
 	/*
 	 * Create and queue one request. If no buffers are available the
 	 * function returns -ENOBUFS, which we ignore here as that's not a
 	 * fatal error.
 	 */
 	int ret = state->queueRequest();
-	if (ret == -ENOMEM) {
+	switch (ret) {
+	case 0:
+		/*
+		 * The request was successfully queued, so there may be enough
+		 * buffers to create a new one. Resume the task to give it
+		 * another try.
+		 */
+		doResume = true;
+		break;
+
+	case -ENOMEM:
 		GST_ELEMENT_ERROR(self, RESOURCE, NO_SPACE_LEFT,
 				  ("Failed to allocate request for camera '%s'.",
 				   state->cam_->id().c_str()),
 				  ("libcamera::Camera::createRequest() failed"));
 		gst_task_stop(self->task);
 		return;
+
+	case -ENOBUFS:
+	default:
+		break;
 	}
 
-	/* Process one completed request, if available. */
+	/*
+	 * Process one completed request, if available, and record if further
+	 * requests are ready for processing.
+	 */
 	ret = state->processRequest();
 	switch (ret) {
+	case 0:
+		/* Another completed request is available, resume the task. */
+		doResume = true;
+		break;
+
 	case -EPIPE:
 		gst_task_stop(self->task);
 		return;
 
-	case -ENODATA:
-		gst_task_pause(self->task);
-		return;
+	case -ENOBUFS:
+	default:
+		break;
 	}
 
-	/*
-	 * Here we need to decide if we want to pause. This needs to
-	 * happen in lock step with the callback thread which may want
-	 * to resume the task and might push pending buffers.
-	 */
-	bool do_pause;
-
-	{
-		MutexLocker locker(state->lock_);
-		do_pause = state->completedRequests_.empty();
-	}
-
-	if (do_pause)
-		gst_task_pause(self->task);
+	/* Resume the task for another iteration if needed. */
+	if (doResume)
+		gst_task_resume(self->task);
 }
 
 static void
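
As an illustration of the return-value convention the patch gives
processRequest() (0 when further completed requests remain, -ENOBUFS
when the queue is empty), a stripped-down sketch follows. FakeState,
complete() and processOne() are hypothetical names, std::mutex stands in
for the libcamera Mutex, and the downstream buffer push with its -EPIPE
error path is omitted.

```cpp
#include <cassert>
#include <cerrno>
#include <mutex>
#include <queue>

/* Hypothetical, heavily simplified stand-in for GstLibcameraSrcState. */
class FakeState
{
public:
	/* Called from the completion handler: queue one completed request. */
	void complete(int id)
	{
		std::lock_guard<std::mutex> locker(lock_);
		completedRequests_.push(id);
	}

	/*
	 * Process at most one completed request. Returns 0 if a request was
	 * processed and more are pending, -ENOBUFS if the queue is empty
	 * once this call is done.
	 */
	int processOne()
	{
		bool haveRequest = false;
		int err = 0;

		{
			std::lock_guard<std::mutex> locker(lock_);

			if (!completedRequests_.empty()) {
				completedRequests_.pop();
				haveRequest = true;
			}

			/* Record, under the lock, whether more work is left. */
			if (completedRequests_.empty())
				err = -ENOBUFS;
		}

		if (!haveRequest)
			return -ENOBUFS;

		/* The real code pushes buffers downstream here, unlocked. */
		return err;
	}

private:
	std::mutex lock_;
	std::queue<int> completedRequests_;
};
```

With two queued requests, the first processOne() call returns 0 and the
second returns -ENOBUFS, so the caller only needs to resume the task when
it sees 0.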