[v1] gstreamer: pool: Replace GstAtomicQueue with deque and mutex
diff mbox series

Message ID 20240605194120.152960-1-nicolas@ndufresne.ca
State Accepted
Commit 04f1f2033724f038ab1152e8135292770a33f97a
Headers show
Series
  • [v1] gstreamer: pool: Replace GstAtomicQueue with deque and mutex
Related show

Commit Message

Nicolas Dufresne June 5, 2024, 7:41 p.m. UTC
From: Nicolas Dufresne <nicolas.dufresne@collabora.com>

The GstAtomicQueue only supports 2 threads, one pushing, and one popping. We
pop and push on error cases and we may have multiple threads downstream
returning buffer (using tee), which breaks this assumption.

On top of which, the release function, that notify when the queue goes from
empty to not-empty rely on a racy empty check. The downstream thread that
does this check if effectively concurrent with our thread calling acquire().

Fix this by replacing the GstAtomicQueue with a std::deque, and protect access
to that using the object lock.

Bug: https://bugs.libcamera.org/show_bug.cgi?id=201
Signed-off-by: Nicolas Dufresne <nicolas.dufresne@collabora.com>
---
 src/gstreamer/gstlibcamerapool.cpp | 40 +++++++++++++++++++++++-------
 1 file changed, 31 insertions(+), 9 deletions(-)

Comments

Kieran Bingham July 23, 2024, 3:59 p.m. UTC | #1
Quoting Nicolas Dufresne (2024-06-05 20:41:20)
> From: Nicolas Dufresne <nicolas.dufresne@collabora.com>
> 
> The GstAtomicQueue only supports 2 threads, one pushing, and one popping. We
> pop and push on error cases and we may have multiple threads downstream
> returning buffer (using tee), which breaks this assumption.
> 
> On top of which, the release function, that notify when the queue goes from

s/notify/notifies/

> empty to not-empty rely on a racy empty check. The downstream thread that

s/rely on a/relies on an/

> does this check if effectively concurrent with our thread calling acquire().

s/if/is/


> Fix this by replacing the GstAtomicQueue with a std::deque, and protect access
> to that using the object lock.
> 
> Bug: https://bugs.libcamera.org/show_bug.cgi?id=201
> Signed-off-by: Nicolas Dufresne <nicolas.dufresne@collabora.com>

This all looks reasonable to me - and following the bugzilla I see the
comment : https://bugs.libcamera.org/show_bug.cgi?id=201#c10 which says
it improved things, but that the issue might still persist (which could
be a separate race/issue still?)


Nicolas, what are your thoughts - do you think this patch is ready for
merge already?

For the code itself:


Reviewed-by: Kieran Bingham <kieran.bingham@ideasonboard.com>

> ---
>  src/gstreamer/gstlibcamerapool.cpp | 40 +++++++++++++++++++++++-------
>  1 file changed, 31 insertions(+), 9 deletions(-)
> 
> diff --git a/src/gstreamer/gstlibcamerapool.cpp b/src/gstreamer/gstlibcamerapool.cpp
> index 9661c67a..0b1a5689 100644
> --- a/src/gstreamer/gstlibcamerapool.cpp
> +++ b/src/gstreamer/gstlibcamerapool.cpp
> @@ -8,6 +8,7 @@
>  
>  #include "gstlibcamerapool.h"
>  
> +#include <deque>
>  #include <libcamera/stream.h>
>  
>  #include "gstlibcamera-utils.h"
> @@ -24,24 +25,41 @@ static guint signals[N_SIGNALS];
>  struct _GstLibcameraPool {
>         GstBufferPool parent;
>  
> -       GstAtomicQueue *queue;
> +       std::deque<GstBuffer *> *queue;
>         GstLibcameraAllocator *allocator;
>         Stream *stream;
>  };
>  
>  G_DEFINE_TYPE(GstLibcameraPool, gst_libcamera_pool, GST_TYPE_BUFFER_POOL)
>  
> +static GstBuffer *
> +gst_libcamera_pool_pop_buffer(GstLibcameraPool *self)
> +{
> +       GLibLocker lock(GST_OBJECT(self));
> +       GstBuffer *buf;
> +
> +       if (self->queue->empty())
> +               return nullptr;
> +
> +       buf = self->queue->front();
> +       self->queue->pop_front();
> +
> +       return buf;
> +}
> +
>  static GstFlowReturn
>  gst_libcamera_pool_acquire_buffer(GstBufferPool *pool, GstBuffer **buffer,
>                                   [[maybe_unused]] GstBufferPoolAcquireParams *params)
>  {
>         GstLibcameraPool *self = GST_LIBCAMERA_POOL(pool);
> -       GstBuffer *buf = GST_BUFFER(gst_atomic_queue_pop(self->queue));
> +       GstBuffer *buf = gst_libcamera_pool_pop_buffer(self);
> +
>         if (!buf)
>                 return GST_FLOW_ERROR;
>  
>         if (!gst_libcamera_allocator_prepare_buffer(self->allocator, self->stream, buf)) {
> -               gst_atomic_queue_push(self->queue, buf);
> +               GLibLocker lock(GST_OBJECT(self));
> +               self->queue->push_back(buf);
>                 return GST_FLOW_ERROR;
>         }
>  
> @@ -64,9 +82,13 @@ static void
>  gst_libcamera_pool_release_buffer(GstBufferPool *pool, GstBuffer *buffer)
>  {
>         GstLibcameraPool *self = GST_LIBCAMERA_POOL(pool);
> -       bool do_notify = gst_atomic_queue_length(self->queue) == 0;
> +       bool do_notify;
>  
> -       gst_atomic_queue_push(self->queue, buffer);
> +       {
> +               GLibLocker lock(GST_OBJECT(self));
> +               do_notify = self->queue->empty();
> +               self->queue->push_back(buffer);
> +       }
>  
>         if (do_notify)
>                 g_signal_emit(self, signals[SIGNAL_BUFFER_NOTIFY], 0);
> @@ -75,7 +97,7 @@ gst_libcamera_pool_release_buffer(GstBufferPool *pool, GstBuffer *buffer)
>  static void
>  gst_libcamera_pool_init(GstLibcameraPool *self)
>  {
> -       self->queue = gst_atomic_queue_new(4);
> +       self->queue = new std::deque<GstBuffer *>();
>  }
>  
>  static void
> @@ -84,10 +106,10 @@ gst_libcamera_pool_finalize(GObject *object)
>         GstLibcameraPool *self = GST_LIBCAMERA_POOL(object);
>         GstBuffer *buf;
>  
> -       while ((buf = GST_BUFFER(gst_atomic_queue_pop(self->queue))))
> +       while ((buf = gst_libcamera_pool_pop_buffer(self)))
>                 gst_buffer_unref(buf);
>  
> -       gst_atomic_queue_unref(self->queue);
> +       delete self->queue;
>         g_object_unref(self->allocator);
>  
>         G_OBJECT_CLASS(gst_libcamera_pool_parent_class)->finalize(object);
> @@ -122,7 +144,7 @@ gst_libcamera_pool_new(GstLibcameraAllocator *allocator, Stream *stream)
>         gsize pool_size = gst_libcamera_allocator_get_pool_size(allocator, stream);
>         for (gsize i = 0; i < pool_size; i++) {
>                 GstBuffer *buffer = gst_buffer_new();
> -               gst_atomic_queue_push(pool->queue, buffer);
> +               pool->queue->push_back(buffer);
>         }
>  
>         return pool;
> -- 
> 2.45.1
>
Nicolas Dufresne July 23, 2024, 4:08 p.m. UTC | #2
Hi,

Le mardi 23 juillet 2024 à 16:59 +0100, Kieran Bingham a écrit :
> Quoting Nicolas Dufresne (2024-06-05 20:41:20)
> > From: Nicolas Dufresne <nicolas.dufresne@collabora.com>
> > 
> > The GstAtomicQueue only supports 2 threads, one pushing, and one popping. We
> > pop and push on error cases and we may have multiple threads downstream
> > returning buffer (using tee), which breaks this assumption.
> > 
> > On top of which, the release function, that notify when the queue goes from
> 
> s/notify/notifies/
> 
> > empty to not-empty rely on a racy empty check. The downstream thread that
> 
> s/rely on a/relies on an/
> 
> > does this check if effectively concurrent with our thread calling acquire().
> 
> s/if/is/
> 
> 
> > Fix this by replacing the GstAtomicQueue with a std::deque, and protect access
> > to that using the object lock.
> > 
> > Bug: https://bugs.libcamera.org/show_bug.cgi?id=201
> > Signed-off-by: Nicolas Dufresne <nicolas.dufresne@collabora.com>
> 
> This all looks reasonable to me - and following the bugzilla I see the
> comment : https://bugs.libcamera.org/show_bug.cgi?id=201#c10 which says
> it improved things, but that the issue might still persist (which could
> be a separate race/issue still?)
> 
> 
> Nicolas, what are your thoughts - do you think this patch is ready for
> merge already?
> 
> For the code itself:
> 
> 
> Reviewed-by: Kieran Bingham <kieran.bingham@ideasonboard.com>

With the suggested typo, yes, I'd picked it up as it aligns the code with
similar fixes that happen upstream. I still have to find time to look into the
remaining, but shouldn't block this one, GstAtomicQueue the way we use it now
must go away anyway.

Nicolas

> 
> > ---
> >  src/gstreamer/gstlibcamerapool.cpp | 40 +++++++++++++++++++++++-------
> >  1 file changed, 31 insertions(+), 9 deletions(-)
> > 
> > diff --git a/src/gstreamer/gstlibcamerapool.cpp b/src/gstreamer/gstlibcamerapool.cpp
> > index 9661c67a..0b1a5689 100644
> > --- a/src/gstreamer/gstlibcamerapool.cpp
> > +++ b/src/gstreamer/gstlibcamerapool.cpp
> > @@ -8,6 +8,7 @@
> >  
> >  #include "gstlibcamerapool.h"
> >  
> > +#include <deque>
> >  #include <libcamera/stream.h>
> >  
> >  #include "gstlibcamera-utils.h"
> > @@ -24,24 +25,41 @@ static guint signals[N_SIGNALS];
> >  struct _GstLibcameraPool {
> >         GstBufferPool parent;
> >  
> > -       GstAtomicQueue *queue;
> > +       std::deque<GstBuffer *> *queue;
> >         GstLibcameraAllocator *allocator;
> >         Stream *stream;
> >  };
> >  
> >  G_DEFINE_TYPE(GstLibcameraPool, gst_libcamera_pool, GST_TYPE_BUFFER_POOL)
> >  
> > +static GstBuffer *
> > +gst_libcamera_pool_pop_buffer(GstLibcameraPool *self)
> > +{
> > +       GLibLocker lock(GST_OBJECT(self));
> > +       GstBuffer *buf;
> > +
> > +       if (self->queue->empty())
> > +               return nullptr;
> > +
> > +       buf = self->queue->front();
> > +       self->queue->pop_front();
> > +
> > +       return buf;
> > +}
> > +
> >  static GstFlowReturn
> >  gst_libcamera_pool_acquire_buffer(GstBufferPool *pool, GstBuffer **buffer,
> >                                   [[maybe_unused]] GstBufferPoolAcquireParams *params)
> >  {
> >         GstLibcameraPool *self = GST_LIBCAMERA_POOL(pool);
> > -       GstBuffer *buf = GST_BUFFER(gst_atomic_queue_pop(self->queue));
> > +       GstBuffer *buf = gst_libcamera_pool_pop_buffer(self);
> > +
> >         if (!buf)
> >                 return GST_FLOW_ERROR;
> >  
> >         if (!gst_libcamera_allocator_prepare_buffer(self->allocator, self->stream, buf)) {
> > -               gst_atomic_queue_push(self->queue, buf);
> > +               GLibLocker lock(GST_OBJECT(self));
> > +               self->queue->push_back(buf);
> >                 return GST_FLOW_ERROR;
> >         }
> >  
> > @@ -64,9 +82,13 @@ static void
> >  gst_libcamera_pool_release_buffer(GstBufferPool *pool, GstBuffer *buffer)
> >  {
> >         GstLibcameraPool *self = GST_LIBCAMERA_POOL(pool);
> > -       bool do_notify = gst_atomic_queue_length(self->queue) == 0;
> > +       bool do_notify;
> >  
> > -       gst_atomic_queue_push(self->queue, buffer);
> > +       {
> > +               GLibLocker lock(GST_OBJECT(self));
> > +               do_notify = self->queue->empty();
> > +               self->queue->push_back(buffer);
> > +       }
> >  
> >         if (do_notify)
> >                 g_signal_emit(self, signals[SIGNAL_BUFFER_NOTIFY], 0);
> > @@ -75,7 +97,7 @@ gst_libcamera_pool_release_buffer(GstBufferPool *pool, GstBuffer *buffer)
> >  static void
> >  gst_libcamera_pool_init(GstLibcameraPool *self)
> >  {
> > -       self->queue = gst_atomic_queue_new(4);
> > +       self->queue = new std::deque<GstBuffer *>();
> >  }
> >  
> >  static void
> > @@ -84,10 +106,10 @@ gst_libcamera_pool_finalize(GObject *object)
> >         GstLibcameraPool *self = GST_LIBCAMERA_POOL(object);
> >         GstBuffer *buf;
> >  
> > -       while ((buf = GST_BUFFER(gst_atomic_queue_pop(self->queue))))
> > +       while ((buf = gst_libcamera_pool_pop_buffer(self)))
> >                 gst_buffer_unref(buf);
> >  
> > -       gst_atomic_queue_unref(self->queue);
> > +       delete self->queue;
> >         g_object_unref(self->allocator);
> >  
> >         G_OBJECT_CLASS(gst_libcamera_pool_parent_class)->finalize(object);
> > @@ -122,7 +144,7 @@ gst_libcamera_pool_new(GstLibcameraAllocator *allocator, Stream *stream)
> >         gsize pool_size = gst_libcamera_allocator_get_pool_size(allocator, stream);
> >         for (gsize i = 0; i < pool_size; i++) {
> >                 GstBuffer *buffer = gst_buffer_new();
> > -               gst_atomic_queue_push(pool->queue, buffer);
> > +               pool->queue->push_back(buffer);
> >         }
> >  
> >         return pool;
> > -- 
> > 2.45.1
> >
Jacopo Mondi July 25, 2024, 8:55 a.m. UTC | #3
Hi Nicolas

On Wed, Jun 05, 2024 at 03:41:20PM GMT, Nicolas Dufresne wrote:
> From: Nicolas Dufresne <nicolas.dufresne@collabora.com>
>
> The GstAtomicQueue only supports 2 threads, one pushing, and one popping. We
> pop and push on error cases and we may have multiple threads downstream
> returning buffer (using tee), which breaks this assumption.
>
> On top of which, the release function, that notify when the queue goes from
> empty to not-empty rely on a racy empty check. The downstream thread that
> does this check if effectively concurrent with our thread calling acquire().
>
> Fix this by replacing the GstAtomicQueue with a std::deque, and protect access
> to that using the object lock.
>
> Bug: https://bugs.libcamera.org/show_bug.cgi?id=201
> Signed-off-by: Nicolas Dufresne <nicolas.dufresne@collabora.com>

Looks sane to me even if the gst code base is not familiar to me
Acked-by: Jacopo Mondi <jacopo.mondi@ideasonboard.com>

Thanks
  j
> ---
>  src/gstreamer/gstlibcamerapool.cpp | 40 +++++++++++++++++++++++-------
>  1 file changed, 31 insertions(+), 9 deletions(-)
>
> diff --git a/src/gstreamer/gstlibcamerapool.cpp b/src/gstreamer/gstlibcamerapool.cpp
> index 9661c67a..0b1a5689 100644
> --- a/src/gstreamer/gstlibcamerapool.cpp
> +++ b/src/gstreamer/gstlibcamerapool.cpp
> @@ -8,6 +8,7 @@
>
>  #include "gstlibcamerapool.h"
>
> +#include <deque>
>  #include <libcamera/stream.h>
>
>  #include "gstlibcamera-utils.h"
> @@ -24,24 +25,41 @@ static guint signals[N_SIGNALS];
>  struct _GstLibcameraPool {
>  	GstBufferPool parent;
>
> -	GstAtomicQueue *queue;
> +	std::deque<GstBuffer *> *queue;
>  	GstLibcameraAllocator *allocator;
>  	Stream *stream;
>  };
>
>  G_DEFINE_TYPE(GstLibcameraPool, gst_libcamera_pool, GST_TYPE_BUFFER_POOL)
>
> +static GstBuffer *
> +gst_libcamera_pool_pop_buffer(GstLibcameraPool *self)
> +{
> +	GLibLocker lock(GST_OBJECT(self));
> +	GstBuffer *buf;
> +
> +	if (self->queue->empty())
> +		return nullptr;
> +
> +	buf = self->queue->front();
> +	self->queue->pop_front();
> +
> +	return buf;
> +}
> +
>  static GstFlowReturn
>  gst_libcamera_pool_acquire_buffer(GstBufferPool *pool, GstBuffer **buffer,
>  				  [[maybe_unused]] GstBufferPoolAcquireParams *params)
>  {
>  	GstLibcameraPool *self = GST_LIBCAMERA_POOL(pool);
> -	GstBuffer *buf = GST_BUFFER(gst_atomic_queue_pop(self->queue));
> +	GstBuffer *buf = gst_libcamera_pool_pop_buffer(self);
> +
>  	if (!buf)
>  		return GST_FLOW_ERROR;
>
>  	if (!gst_libcamera_allocator_prepare_buffer(self->allocator, self->stream, buf)) {
> -		gst_atomic_queue_push(self->queue, buf);
> +		GLibLocker lock(GST_OBJECT(self));
> +		self->queue->push_back(buf);
>  		return GST_FLOW_ERROR;
>  	}
>
> @@ -64,9 +82,13 @@ static void
>  gst_libcamera_pool_release_buffer(GstBufferPool *pool, GstBuffer *buffer)
>  {
>  	GstLibcameraPool *self = GST_LIBCAMERA_POOL(pool);
> -	bool do_notify = gst_atomic_queue_length(self->queue) == 0;
> +	bool do_notify;
>
> -	gst_atomic_queue_push(self->queue, buffer);
> +	{
> +		GLibLocker lock(GST_OBJECT(self));
> +		do_notify = self->queue->empty();
> +		self->queue->push_back(buffer);
> +	}
>
>  	if (do_notify)
>  		g_signal_emit(self, signals[SIGNAL_BUFFER_NOTIFY], 0);
> @@ -75,7 +97,7 @@ gst_libcamera_pool_release_buffer(GstBufferPool *pool, GstBuffer *buffer)
>  static void
>  gst_libcamera_pool_init(GstLibcameraPool *self)
>  {
> -	self->queue = gst_atomic_queue_new(4);
> +	self->queue = new std::deque<GstBuffer *>();
>  }
>
>  static void
> @@ -84,10 +106,10 @@ gst_libcamera_pool_finalize(GObject *object)
>  	GstLibcameraPool *self = GST_LIBCAMERA_POOL(object);
>  	GstBuffer *buf;
>
> -	while ((buf = GST_BUFFER(gst_atomic_queue_pop(self->queue))))
> +	while ((buf = gst_libcamera_pool_pop_buffer(self)))
>  		gst_buffer_unref(buf);
>
> -	gst_atomic_queue_unref(self->queue);
> +	delete self->queue;
>  	g_object_unref(self->allocator);
>
>  	G_OBJECT_CLASS(gst_libcamera_pool_parent_class)->finalize(object);
> @@ -122,7 +144,7 @@ gst_libcamera_pool_new(GstLibcameraAllocator *allocator, Stream *stream)
>  	gsize pool_size = gst_libcamera_allocator_get_pool_size(allocator, stream);
>  	for (gsize i = 0; i < pool_size; i++) {
>  		GstBuffer *buffer = gst_buffer_new();
> -		gst_atomic_queue_push(pool->queue, buffer);
> +		pool->queue->push_back(buffer);
>  	}
>
>  	return pool;
> --
> 2.45.1
>

Patch
diff mbox series

diff --git a/src/gstreamer/gstlibcamerapool.cpp b/src/gstreamer/gstlibcamerapool.cpp
index 9661c67a..0b1a5689 100644
--- a/src/gstreamer/gstlibcamerapool.cpp
+++ b/src/gstreamer/gstlibcamerapool.cpp
@@ -8,6 +8,7 @@ 
 
 #include "gstlibcamerapool.h"
 
+#include <deque>
 #include <libcamera/stream.h>
 
 #include "gstlibcamera-utils.h"
@@ -24,24 +25,41 @@  static guint signals[N_SIGNALS];
 struct _GstLibcameraPool {
 	GstBufferPool parent;
 
-	GstAtomicQueue *queue;
+	std::deque<GstBuffer *> *queue;
 	GstLibcameraAllocator *allocator;
 	Stream *stream;
 };
 
 G_DEFINE_TYPE(GstLibcameraPool, gst_libcamera_pool, GST_TYPE_BUFFER_POOL)
 
+static GstBuffer *
+gst_libcamera_pool_pop_buffer(GstLibcameraPool *self)
+{
+	GLibLocker lock(GST_OBJECT(self));
+	GstBuffer *buf;
+
+	if (self->queue->empty())
+		return nullptr;
+
+	buf = self->queue->front();
+	self->queue->pop_front();
+
+	return buf;
+}
+
 static GstFlowReturn
 gst_libcamera_pool_acquire_buffer(GstBufferPool *pool, GstBuffer **buffer,
 				  [[maybe_unused]] GstBufferPoolAcquireParams *params)
 {
 	GstLibcameraPool *self = GST_LIBCAMERA_POOL(pool);
-	GstBuffer *buf = GST_BUFFER(gst_atomic_queue_pop(self->queue));
+	GstBuffer *buf = gst_libcamera_pool_pop_buffer(self);
+
 	if (!buf)
 		return GST_FLOW_ERROR;
 
 	if (!gst_libcamera_allocator_prepare_buffer(self->allocator, self->stream, buf)) {
-		gst_atomic_queue_push(self->queue, buf);
+		GLibLocker lock(GST_OBJECT(self));
+		self->queue->push_back(buf);
 		return GST_FLOW_ERROR;
 	}
 
@@ -64,9 +82,13 @@  static void
 gst_libcamera_pool_release_buffer(GstBufferPool *pool, GstBuffer *buffer)
 {
 	GstLibcameraPool *self = GST_LIBCAMERA_POOL(pool);
-	bool do_notify = gst_atomic_queue_length(self->queue) == 0;
+	bool do_notify;
 
-	gst_atomic_queue_push(self->queue, buffer);
+	{
+		GLibLocker lock(GST_OBJECT(self));
+		do_notify = self->queue->empty();
+		self->queue->push_back(buffer);
+	}
 
 	if (do_notify)
 		g_signal_emit(self, signals[SIGNAL_BUFFER_NOTIFY], 0);
@@ -75,7 +97,7 @@  gst_libcamera_pool_release_buffer(GstBufferPool *pool, GstBuffer *buffer)
 static void
 gst_libcamera_pool_init(GstLibcameraPool *self)
 {
-	self->queue = gst_atomic_queue_new(4);
+	self->queue = new std::deque<GstBuffer *>();
 }
 
 static void
@@ -84,10 +106,10 @@  gst_libcamera_pool_finalize(GObject *object)
 	GstLibcameraPool *self = GST_LIBCAMERA_POOL(object);
 	GstBuffer *buf;
 
-	while ((buf = GST_BUFFER(gst_atomic_queue_pop(self->queue))))
+	while ((buf = gst_libcamera_pool_pop_buffer(self)))
 		gst_buffer_unref(buf);
 
-	gst_atomic_queue_unref(self->queue);
+	delete self->queue;
 	g_object_unref(self->allocator);
 
 	G_OBJECT_CLASS(gst_libcamera_pool_parent_class)->finalize(object);
@@ -122,7 +144,7 @@  gst_libcamera_pool_new(GstLibcameraAllocator *allocator, Stream *stream)
 	gsize pool_size = gst_libcamera_allocator_get_pool_size(allocator, stream);
 	for (gsize i = 0; i < pool_size; i++) {
 		GstBuffer *buffer = gst_buffer_new();
-		gst_atomic_queue_push(pool->queue, buffer);
+		pool->queue->push_back(buffer);
 	}
 
 	return pool;