[libcamera-devel,13/13] gstreamer: Fix race conditions in task pause/resume
diff mbox series

Message ID 20220623232210.18742-14-laurent.pinchart@ideasonboard.com
State Accepted
Headers show
Series
  • gstreamer: Queue multiple requests
Related show

Commit Message

Laurent Pinchart June 23, 2022, 11:22 p.m. UTC
The task run function races with two other threads that want to resume
the task: the requestCompleted() handler and the buffer-notify signal
handler. If the former queues completed requests or the latter queues
back buffers to the pool, and then resume the task, after the task run
handler checks the queues but before it attemps to pause the task, then
the task may be paused without noticing that more work is available.

The most immediate way to fix this is to take the stream_lock in the
requestCompleted() and buffer-notify signal handlers, or cover the whole
task run handler with the GstLibcameraSrcState lock. This could cause
long delays in the requestCompleted() handler, so that's not a good
option.

Instead, add a wakeup flag, preotected by the GstLibcameraSrcState lock,
that allows detection of a lost race, and retry the task run.

Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
---
 src/gstreamer/gstlibcamerasrc.cpp | 82 +++++++++++++++++++++++--------
 1 file changed, 62 insertions(+), 20 deletions(-)

Comments

Nicolas Dufresne June 28, 2022, 1:50 p.m. UTC | #1
Hi Laurent,

this one is difficult to follow, see some idea below.

Le vendredi 24 juin 2022 à 02:22 +0300, Laurent Pinchart a écrit :
> The task run function races with two other threads that want to resume
> the task: the requestCompleted() handler and the buffer-notify signal
> handler. If the former queues completed requests or the latter queues
> back buffers to the pool, and then resume the task, after the task run
> handler checks the queues but before it attemps to pause the task, then
> the task may be paused without noticing that more work is available.
> 
> The most immediate way to fix this is to take the stream_lock in the
> requestCompleted() and buffer-notify signal handlers, or cover the whole
> task run handler with the GstLibcameraSrcState lock. This could cause
> long delays in the requestCompleted() handler, so that's not a good
> option.
> 
> Instead, add a wakeup flag, preotected by the GstLibcameraSrcState lock,
> that allows detection of a lost race, and retry the task run.
> 
> Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
> ---
>  src/gstreamer/gstlibcamerasrc.cpp | 82 +++++++++++++++++++++++--------
>  1 file changed, 62 insertions(+), 20 deletions(-)
> 
> diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
> index 3feb87254916..59400f17ae85 100644
> --- a/src/gstreamer/gstlibcamerasrc.cpp
> +++ b/src/gstreamer/gstlibcamerasrc.cpp
> @@ -120,6 +120,7 @@ struct GstLibcameraSrcState {
>  		LIBCAMERA_TSA_GUARDED_BY(lock_);
>  	std::queue<std::unique_ptr<RequestWrap>> completedRequests_
>  		LIBCAMERA_TSA_GUARDED_BY(lock_);
> +	bool wakeup_ LIBCAMERA_TSA_GUARDED_BY(lock_);
>  
>  	guint group_id_;
>  
> @@ -237,14 +238,16 @@ GstLibcameraSrcState::requestCompleted(Request *request)
>  	{
>  		MutexLocker locker(lock_);
>  		completedRequests_.push(std::move(wrap));
> -	}
> +		wakeup_ = true;
>  
> -	gst_task_resume(src_->task);
> +		gst_task_resume(src_->task);
> +	}
>  }
>  
>  int GstLibcameraSrcState::processRequest()
>  {
>  	std::unique_ptr<RequestWrap> wrap;
> +	int err = 0;
>  
>  	{
>  		MutexLocker locker(lock_);
> @@ -253,10 +256,13 @@ int GstLibcameraSrcState::processRequest()
>  			wrap = std::move(completedRequests_.front());
>  			completedRequests_.pop();
>  		}
> +
> +		if (completedRequests_.empty())
> +			err = -ENOBUFS;
>  	}
>  
>  	if (!wrap)
> -		return -ENODATA;
> +		return -ENOBUFS;
>  
>  	GstFlowReturn ret = GST_FLOW_OK;
>  	gst_flow_combiner_reset(src_->flow_combiner);
> @@ -296,7 +302,7 @@ int GstLibcameraSrcState::processRequest()
>  		return -EPIPE;
>  	}
>  
> -	return 0;
> +	return err;
>  }
>  
>  static bool
> @@ -360,53 +366,88 @@ gst_libcamera_src_open(GstLibcameraSrc *self)
>  	return true;
>  }
>  
> +static void
> +gst_libcamera_src_task_resume(gpointer user_data)
> +{
> +	GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
> +	GstLibcameraSrcState *state = self->state;
> +
> +	MutexLocker locker(state->lock_);
> +	state->wakeup_ = true;
> +	gst_task_resume(self->task);
> +}
> +
>  static void
>  gst_libcamera_src_task_run(gpointer user_data)
>  {
>  	GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
>  	GstLibcameraSrcState *state = self->state;
>  
> +	{
> +		MutexLocker locker(state->lock_);
> +		state->wakeup_ = true;
> +	}
> +
> +	bool doPause = true;
> +
>  	/*
>  	 * Create and queue one request. If no buffers are available the
>  	 * function returns -ENOBUFS, which we ignore here as that's not a
>  	 * fatal error.
>  	 */
>  	int ret = state->queueRequest();
> -	if (ret == -ENOMEM) {
> +	switch (ret) {
> +	case 0:
> +		/*
> +		 * The request was successfully queued, there may be enough
> +		 * buffers to create a new one. Don't pause the task to give it
> +		 * another try.
> +		 */
> +		doPause = false;
> +		break;
> +
> +	case -ENOMEM:
>  		GST_ELEMENT_ERROR(self, RESOURCE, NO_SPACE_LEFT,
>  				  ("Failed to allocate request for camera '%s'.",
>  				   state->cam_->id().c_str()),
>  				  ("libcamera::Camera::createRequest() failed"));
>  		gst_task_stop(self->task);
>  		return;
> +
> +	case -ENOBUFS:
> +	default:
> +		break;
>  	}
>  
> -	/* Process one completed request, if available. */
> +	/*
> +	 * Process one completed request, if available, and record if further
> +	 * requests are ready for processing.
> +	 */
>  	ret = state->processRequest();
>  	switch (ret) {
> +	case -ENOBUFS:
> +		doPause = false;
> +		break;
> +
>  	case -EPIPE:
>  		gst_task_stop(self->task);
>  		return;
>  
> -	case -ENODATA:
> -		gst_task_pause(self->task);
> -		return;
> +	case 0:
> +	default:
> +		break;
>  	}
>  
>  	/*
> -	 * Here we need to decide if we want to pause. This needs to
> -	 * happen in lock step with the callback thread which may want
> -	 * to resume the task and might push pending buffers.
> +	 * Here we need to decide if we want to pause. This needs to happen in
> +	 * lock step with the requestCompleted callback and the buffer-notify
> +	 * signal handler that resume the task.
>  	 */
> -	bool do_pause;
> -
> -	{
> +	if (doPause) {
>  		MutexLocker locker(state->lock_);
> -		do_pause = state->completedRequests_.empty();
> +		if (!state->wakeup_)
> +			gst_task_pause(self->task);

This dance is difficult to follow. Perhaps we could start the run function by
"pausing" the task? Then we will resume the task if state->processRequest() ==
0, or concurrently if a request completed. Using a doResume bool instead of
doPause.

>  	}
> -
> -	if (do_pause)
> -		gst_task_pause(self->task);
>  }
>  
>  static void
> @@ -517,7 +558,8 @@ gst_libcamera_src_task_enter(GstTask *task, [[maybe_unused]] GThread *thread,
>  		GstLibcameraPool *pool = gst_libcamera_pool_new(self->allocator,
>  								stream_cfg.stream());
>  		g_signal_connect_swapped(pool, "buffer-notify",
> -					 G_CALLBACK(gst_task_resume), task);
> +					 G_CALLBACK(gst_libcamera_src_task_resume),
> +					 self);
>  
>  		gst_libcamera_pad_set_pool(srcpad, pool);
>  		gst_flow_combiner_add_pad(self->flow_combiner, srcpad);
Laurent Pinchart June 29, 2022, 11:50 p.m. UTC | #2
Hi Nicolas,

On Tue, Jun 28, 2022 at 09:50:41AM -0400, Nicolas Dufresne wrote:
> Hi Laurent,
> 
> this one is difficult to follow, see some idea below.

That matches the experience I had writing it :-)

> Le vendredi 24 juin 2022 à 02:22 +0300, Laurent Pinchart a écrit :
> > The task run function races with two other threads that want to resume
> > the task: the requestCompleted() handler and the buffer-notify signal
> > handler. If the former queues completed requests or the latter queues
> > back buffers to the pool, and then resume the task, after the task run
> > handler checks the queues but before it attemps to pause the task, then
> > the task may be paused without noticing that more work is available.
> > 
> > The most immediate way to fix this is to take the stream_lock in the
> > requestCompleted() and buffer-notify signal handlers, or cover the whole
> > task run handler with the GstLibcameraSrcState lock. This could cause
> > long delays in the requestCompleted() handler, so that's not a good
> > option.
> > 
> > Instead, add a wakeup flag, preotected by the GstLibcameraSrcState lock,
> > that allows detection of a lost race, and retry the task run.
> > 
> > Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
> > ---
> >  src/gstreamer/gstlibcamerasrc.cpp | 82 +++++++++++++++++++++++--------
> >  1 file changed, 62 insertions(+), 20 deletions(-)
> > 
> > diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
> > index 3feb87254916..59400f17ae85 100644
> > --- a/src/gstreamer/gstlibcamerasrc.cpp
> > +++ b/src/gstreamer/gstlibcamerasrc.cpp
> > @@ -120,6 +120,7 @@ struct GstLibcameraSrcState {
> >  		LIBCAMERA_TSA_GUARDED_BY(lock_);
> >  	std::queue<std::unique_ptr<RequestWrap>> completedRequests_
> >  		LIBCAMERA_TSA_GUARDED_BY(lock_);
> > +	bool wakeup_ LIBCAMERA_TSA_GUARDED_BY(lock_);
> >  
> >  	guint group_id_;
> >  
> > @@ -237,14 +238,16 @@ GstLibcameraSrcState::requestCompleted(Request *request)
> >  	{
> >  		MutexLocker locker(lock_);
> >  		completedRequests_.push(std::move(wrap));
> > -	}
> > +		wakeup_ = true;
> >  
> > -	gst_task_resume(src_->task);
> > +		gst_task_resume(src_->task);
> > +	}
> >  }
> >  
> >  int GstLibcameraSrcState::processRequest()
> >  {
> >  	std::unique_ptr<RequestWrap> wrap;
> > +	int err = 0;
> >  
> >  	{
> >  		MutexLocker locker(lock_);
> > @@ -253,10 +256,13 @@ int GstLibcameraSrcState::processRequest()
> >  			wrap = std::move(completedRequests_.front());
> >  			completedRequests_.pop();
> >  		}
> > +
> > +		if (completedRequests_.empty())
> > +			err = -ENOBUFS;
> >  	}
> >  
> >  	if (!wrap)
> > -		return -ENODATA;
> > +		return -ENOBUFS;
> >  
> >  	GstFlowReturn ret = GST_FLOW_OK;
> >  	gst_flow_combiner_reset(src_->flow_combiner);
> > @@ -296,7 +302,7 @@ int GstLibcameraSrcState::processRequest()
> >  		return -EPIPE;
> >  	}
> >  
> > -	return 0;
> > +	return err;
> >  }
> >  
> >  static bool
> > @@ -360,53 +366,88 @@ gst_libcamera_src_open(GstLibcameraSrc *self)
> >  	return true;
> >  }
> >  
> > +static void
> > +gst_libcamera_src_task_resume(gpointer user_data)
> > +{
> > +	GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
> > +	GstLibcameraSrcState *state = self->state;
> > +
> > +	MutexLocker locker(state->lock_);
> > +	state->wakeup_ = true;
> > +	gst_task_resume(self->task);
> > +}
> > +
> >  static void
> >  gst_libcamera_src_task_run(gpointer user_data)
> >  {
> >  	GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
> >  	GstLibcameraSrcState *state = self->state;
> >  
> > +	{
> > +		MutexLocker locker(state->lock_);
> > +		state->wakeup_ = true;

Looks like there's a bug here, wakeup_ needs to be set to false. It's
always true otherwise.

> > +	}
> > +
> > +	bool doPause = true;
> > +
> >  	/*
> >  	 * Create and queue one request. If no buffers are available the
> >  	 * function returns -ENOBUFS, which we ignore here as that's not a
> >  	 * fatal error.
> >  	 */
> >  	int ret = state->queueRequest();
> > -	if (ret == -ENOMEM) {
> > +	switch (ret) {
> > +	case 0:
> > +		/*
> > +		 * The request was successfully queued, there may be enough
> > +		 * buffers to create a new one. Don't pause the task to give it
> > +		 * another try.
> > +		 */
> > +		doPause = false;
> > +		break;
> > +
> > +	case -ENOMEM:
> >  		GST_ELEMENT_ERROR(self, RESOURCE, NO_SPACE_LEFT,
> >  				  ("Failed to allocate request for camera '%s'.",
> >  				   state->cam_->id().c_str()),
> >  				  ("libcamera::Camera::createRequest() failed"));
> >  		gst_task_stop(self->task);
> >  		return;
> > +
> > +	case -ENOBUFS:
> > +	default:
> > +		break;
> >  	}
> >  
> > -	/* Process one completed request, if available. */
> > +	/*
> > +	 * Process one completed request, if available, and record if further
> > +	 * requests are ready for processing.
> > +	 */
> >  	ret = state->processRequest();
> >  	switch (ret) {
> > +	case -ENOBUFS:
> > +		doPause = false;

And here too, this needs to go to case 0.

> > +		break;
> > +
> >  	case -EPIPE:
> >  		gst_task_stop(self->task);
> >  		return;
> >  
> > -	case -ENODATA:
> > -		gst_task_pause(self->task);
> > -		return;
> > +	case 0:
> > +	default:
> > +		break;
> >  	}
> >  
> >  	/*
> > -	 * Here we need to decide if we want to pause. This needs to
> > -	 * happen in lock step with the callback thread which may want
> > -	 * to resume the task and might push pending buffers.
> > +	 * Here we need to decide if we want to pause. This needs to happen in
> > +	 * lock step with the requestCompleted callback and the buffer-notify
> > +	 * signal handler that resume the task.
> >  	 */
> > -	bool do_pause;
> > -
> > -	{
> > +	if (doPause) {
> >  		MutexLocker locker(state->lock_);
> > -		do_pause = state->completedRequests_.empty();
> > +		if (!state->wakeup_)
> > +			gst_task_pause(self->task);
> 
> This dance is difficult to follow. Perhaps we could start the run function by
> "pausing" the task? Then we will resume the task if state->processRequest() ==
> 0, or concurrently if a request completed. Using a doResume bool instead of
> doPause.

I've given it a try, and the result is as follows:

diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
index 6ee315cf5efe..324ea457eb4f 100644
--- a/src/gstreamer/gstlibcamerasrc.cpp
+++ b/src/gstreamer/gstlibcamerasrc.cpp
@@ -411,7 +411,9 @@ gst_libcamera_src_task_run(gpointer user_data)
 		state->wakeup_ = false;
 	}
 
-	bool doPause = true;
+	bool doResume = false;
+
+	gst_task_pause(self->task);
 
 	/*
 	 * Create and queue one request. If no buffers are available the
@@ -426,7 +428,7 @@ gst_libcamera_src_task_run(gpointer user_data)
 		 * buffers to create a new one. Don't pause the task to give it
 		 * another try.
 		 */
-		doPause = false;
+		doResume = true;
 		break;
 
 	case -ENOMEM:
@@ -449,11 +451,8 @@ gst_libcamera_src_task_run(gpointer user_data)
 	ret = state->processRequest();
 	switch (ret) {
 	case 0:
-		/*
-		 * Another completed request is available, don't pause the
-		 * task.
-		 */
-		doPause = false;
+		/* Another completed request is available, resume the task. */
+		doResume = true;
 		break;
 
 	case -EPIPE:
@@ -466,13 +466,13 @@ gst_libcamera_src_task_run(gpointer user_data)
 	}
 
 	/*
-	 * Here we need to decide if we want to pause. This needs to happen in
+	 * Here we need to decide if we want to resume. This needs to happen in
 	 * lock step with the requestCompleted callback and the buffer-notify
 	 * signal handler that resume the task.
 	 */
-	if (doPause) {
+	{
 		MutexLocker locker(state->lock_);
-		if (!state->wakeup_)
-			gst_task_pause(self->task);
+		if (doResume || state->wakeup_)
+			gst_task_resume(self->task);
 	}
 }

I'm not sure that's what you meant, as it doesn't seem much easier to
follow.

With the mechanism from v1 (and the two fixes mentioned above), the task
runs ~80 times / second, is resumed ~55 times / second, and paused ~26
times / second. With the opposite logic, we get ~80 pause/second (that's
expected, as it's paused at every run) and ~107 resume/second. That
seems less efficient to me.

I'll use the original mechanism with the bug fixes in v2, if you meant
something else than the above, you can explain it in a reply to the new
patch.

> >  	}
> > -
> > -	if (do_pause)
> > -		gst_task_pause(self->task);
> >  }
> >  
> >  static void
> > @@ -517,7 +558,8 @@ gst_libcamera_src_task_enter(GstTask *task, [[maybe_unused]] GThread *thread,
> >  		GstLibcameraPool *pool = gst_libcamera_pool_new(self->allocator,
> >  								stream_cfg.stream());
> >  		g_signal_connect_swapped(pool, "buffer-notify",
> > -					 G_CALLBACK(gst_task_resume), task);
> > +					 G_CALLBACK(gst_libcamera_src_task_resume),
> > +					 self);
> >  
> >  		gst_libcamera_pad_set_pool(srcpad, pool);
> >  		gst_flow_combiner_add_pad(self->flow_combiner, srcpad);
Nicolas Dufresne June 30, 2022, 7:32 p.m. UTC | #3
Le jeudi 30 juin 2022 à 02:50 +0300, Laurent Pinchart a écrit :
> > This dance is difficult to follow. Perhaps we could start the run function
> > by
> > "pausing" the task? Then we will resume the task if state->processRequest()
> > ==
> > 0, or concurrently if a request completed. Using a doResume bool instead of
> > doPause.
> 
> I've given it a try, and the result is as follows:
> 
> diff --git a/src/gstreamer/gstlibcamerasrc.cpp
> b/src/gstreamer/gstlibcamerasrc.cpp
> index 6ee315cf5efe..324ea457eb4f 100644
> --- a/src/gstreamer/gstlibcamerasrc.cpp
> +++ b/src/gstreamer/gstlibcamerasrc.cpp
> @@ -411,7 +411,9 @@ gst_libcamera_src_task_run(gpointer user_data)
>  		state->wakeup_ = false;
>  	}
>  
> -	bool doPause = true;
> +	bool doResume = false;
> +
> +	gst_task_pause(self->task);
>  
>  	/*
>  	 * Create and queue one request. If no buffers are available the
> @@ -426,7 +428,7 @@ gst_libcamera_src_task_run(gpointer user_data)
>  		 * buffers to create a new one. Don't pause the task to give
> it
>  		 * another try.
>  		 */
> -		doPause = false;
> +		doResume = true;
>  		break;
>  
>  	case -ENOMEM:
> @@ -449,11 +451,8 @@ gst_libcamera_src_task_run(gpointer user_data)
>  	ret = state->processRequest();
>  	switch (ret) {
>  	case 0:
> -		/*
> -		 * Another completed request is available, don't pause the
> -		 * task.
> -		 */
> -		doPause = false;
> +		/* Another completed request is available, resume the task.
> */
> +		doResume = true;
>  		break;
>  
>  	case -EPIPE:
> @@ -466,13 +466,13 @@ gst_libcamera_src_task_run(gpointer user_data)
>  	}
>  
>  	/*
> -	 * Here we need to decide if we want to pause. This needs to happen
> in
> +	 * Here we need to decide if we want to resume. This needs to happen
> in
>  	 * lock step with the requestCompleted callback and the buffer-notify
>  	 * signal handler that resume the task.
>  	 */
> -	if (doPause) {
> +	{
>  		MutexLocker locker(state->lock_);
> -		if (!state->wakeup_)
> -			gst_task_pause(self->task);
> +		if (doResume || state->wakeup_)
> +			gst_task_resume(self->task);
>  	}
>  }
> 
> I'm not sure that's what you meant, as it doesn't seem much easier to
> follow.
> 
> With the mechanism from v1 (and the two fixes mentioned above), the task
> runs ~80 times / second, is resumed ~55 times / second, and paused ~26
> times / second. With the opposite logic, we get ~80 pause/second (that's
> expected, as it's paused at every run) and ~107 resume/second. That
> seems less efficient to me.
> 
> I'll use the original mechanism with the bug fixes in v2, if you meant
> something else than the above, you can explain it in a reply to the new
> patch.

You don't need state->wakeup_ if you use the suggested resume technique, making
the fix less invasive. The reason is run() function is the only function that
may pause the task. Getting an external caller to resume will just lead to run()
to pause the task again. So you don't have to track what is happening in the
outside world. Other threads just resume the task, and the run() function
assumes it will be paused, and then resume() itself as needed.

Nicolas

Patch
diff mbox series

diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
index 3feb87254916..59400f17ae85 100644
--- a/src/gstreamer/gstlibcamerasrc.cpp
+++ b/src/gstreamer/gstlibcamerasrc.cpp
@@ -120,6 +120,7 @@  struct GstLibcameraSrcState {
 		LIBCAMERA_TSA_GUARDED_BY(lock_);
 	std::queue<std::unique_ptr<RequestWrap>> completedRequests_
 		LIBCAMERA_TSA_GUARDED_BY(lock_);
+	bool wakeup_ LIBCAMERA_TSA_GUARDED_BY(lock_);
 
 	guint group_id_;
 
@@ -237,14 +238,16 @@  GstLibcameraSrcState::requestCompleted(Request *request)
 	{
 		MutexLocker locker(lock_);
 		completedRequests_.push(std::move(wrap));
-	}
+		wakeup_ = true;
 
-	gst_task_resume(src_->task);
+		gst_task_resume(src_->task);
+	}
 }
 
 int GstLibcameraSrcState::processRequest()
 {
 	std::unique_ptr<RequestWrap> wrap;
+	int err = 0;
 
 	{
 		MutexLocker locker(lock_);
@@ -253,10 +256,13 @@  int GstLibcameraSrcState::processRequest()
 			wrap = std::move(completedRequests_.front());
 			completedRequests_.pop();
 		}
+
+		if (completedRequests_.empty())
+			err = -ENOBUFS;
 	}
 
 	if (!wrap)
-		return -ENODATA;
+		return -ENOBUFS;
 
 	GstFlowReturn ret = GST_FLOW_OK;
 	gst_flow_combiner_reset(src_->flow_combiner);
@@ -296,7 +302,7 @@  int GstLibcameraSrcState::processRequest()
 		return -EPIPE;
 	}
 
-	return 0;
+	return err;
 }
 
 static bool
@@ -360,53 +366,88 @@  gst_libcamera_src_open(GstLibcameraSrc *self)
 	return true;
 }
 
+static void
+gst_libcamera_src_task_resume(gpointer user_data)
+{
+	GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
+	GstLibcameraSrcState *state = self->state;
+
+	MutexLocker locker(state->lock_);
+	state->wakeup_ = true;
+	gst_task_resume(self->task);
+}
+
 static void
 gst_libcamera_src_task_run(gpointer user_data)
 {
 	GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
 	GstLibcameraSrcState *state = self->state;
 
+	{
+		MutexLocker locker(state->lock_);
+		state->wakeup_ = true;
+	}
+
+	bool doPause = true;
+
 	/*
 	 * Create and queue one request. If no buffers are available the
 	 * function returns -ENOBUFS, which we ignore here as that's not a
 	 * fatal error.
 	 */
 	int ret = state->queueRequest();
-	if (ret == -ENOMEM) {
+	switch (ret) {
+	case 0:
+		/*
+		 * The request was successfully queued, there may be enough
+		 * buffers to create a new one. Don't pause the task to give it
+		 * another try.
+		 */
+		doPause = false;
+		break;
+
+	case -ENOMEM:
 		GST_ELEMENT_ERROR(self, RESOURCE, NO_SPACE_LEFT,
 				  ("Failed to allocate request for camera '%s'.",
 				   state->cam_->id().c_str()),
 				  ("libcamera::Camera::createRequest() failed"));
 		gst_task_stop(self->task);
 		return;
+
+	case -ENOBUFS:
+	default:
+		break;
 	}
 
-	/* Process one completed request, if available. */
+	/*
+	 * Process one completed request, if available, and record if further
+	 * requests are ready for processing.
+	 */
 	ret = state->processRequest();
 	switch (ret) {
+	case -ENOBUFS:
+		doPause = false;
+		break;
+
 	case -EPIPE:
 		gst_task_stop(self->task);
 		return;
 
-	case -ENODATA:
-		gst_task_pause(self->task);
-		return;
+	case 0:
+	default:
+		break;
 	}
 
 	/*
-	 * Here we need to decide if we want to pause. This needs to
-	 * happen in lock step with the callback thread which may want
-	 * to resume the task and might push pending buffers.
+	 * Here we need to decide if we want to pause. This needs to happen in
+	 * lock step with the requestCompleted callback and the buffer-notify
+	 * signal handler that resume the task.
 	 */
-	bool do_pause;
-
-	{
+	if (doPause) {
 		MutexLocker locker(state->lock_);
-		do_pause = state->completedRequests_.empty();
+		if (!state->wakeup_)
+			gst_task_pause(self->task);
 	}
-
-	if (do_pause)
-		gst_task_pause(self->task);
 }
 
 static void
@@ -517,7 +558,8 @@  gst_libcamera_src_task_enter(GstTask *task, [[maybe_unused]] GThread *thread,
 		GstLibcameraPool *pool = gst_libcamera_pool_new(self->allocator,
 								stream_cfg.stream());
 		g_signal_connect_swapped(pool, "buffer-notify",
-					 G_CALLBACK(gst_task_resume), task);
+					 G_CALLBACK(gst_libcamera_src_task_resume),
+					 self);
 
 		gst_libcamera_pad_set_pool(srcpad, pool);
 		gst_flow_combiner_add_pad(self->flow_combiner, srcpad);