[v1] gstreamer: pool: Replace GstAtomicQueue with deque and mutex
diff mbox series

Message ID 20240605194120.152960-1-nicolas@ndufresne.ca
State New
Headers show
Series
  • [v1] gstreamer: pool: Replace GstAtomicQueue with deque and mutex
Related show

Commit Message

Nicolas Dufresne June 5, 2024, 7:41 p.m. UTC
From: Nicolas Dufresne <nicolas.dufresne@collabora.com>

The GstAtomicQueue only supports 2 threads, one pushing, and one popping. We
pop and push on error cases and we may have multiple threads downstream
returning buffer (using tee), which breaks this assumption.

On top of which, the release function, that notify when the queue goes from
empty to not-empty rely on a racy empty check. The downstream thread that
does this check if effectively concurrent with our thread calling acquire().

Fix this by replacing the GstAtomicQueue with a std::deque, and protect access
to that using the object lock.

Bug: https://bugs.libcamera.org/show_bug.cgi?id=201
Signed-off-by: Nicolas Dufresne <nicolas.dufresne@collabora.com>
---
 src/gstreamer/gstlibcamerapool.cpp | 40 +++++++++++++++++++++++-------
 1 file changed, 31 insertions(+), 9 deletions(-)

Patch
diff mbox series

diff --git a/src/gstreamer/gstlibcamerapool.cpp b/src/gstreamer/gstlibcamerapool.cpp
index 9661c67a..0b1a5689 100644
--- a/src/gstreamer/gstlibcamerapool.cpp
+++ b/src/gstreamer/gstlibcamerapool.cpp
@@ -8,6 +8,7 @@ 
 
 #include "gstlibcamerapool.h"
 
+#include <deque>
 #include <libcamera/stream.h>
 
 #include "gstlibcamera-utils.h"
@@ -24,24 +25,41 @@  static guint signals[N_SIGNALS];
 struct _GstLibcameraPool {
 	GstBufferPool parent;
 
-	GstAtomicQueue *queue;
+	std::deque<GstBuffer *> *queue;
 	GstLibcameraAllocator *allocator;
 	Stream *stream;
 };
 
 G_DEFINE_TYPE(GstLibcameraPool, gst_libcamera_pool, GST_TYPE_BUFFER_POOL)
 
+static GstBuffer *
+gst_libcamera_pool_pop_buffer(GstLibcameraPool *self)
+{
+	GLibLocker lock(GST_OBJECT(self));
+	GstBuffer *buf;
+
+	if (self->queue->empty())
+		return nullptr;
+
+	buf = self->queue->front();
+	self->queue->pop_front();
+
+	return buf;
+}
+
 static GstFlowReturn
 gst_libcamera_pool_acquire_buffer(GstBufferPool *pool, GstBuffer **buffer,
 				  [[maybe_unused]] GstBufferPoolAcquireParams *params)
 {
 	GstLibcameraPool *self = GST_LIBCAMERA_POOL(pool);
-	GstBuffer *buf = GST_BUFFER(gst_atomic_queue_pop(self->queue));
+	GstBuffer *buf = gst_libcamera_pool_pop_buffer(self);
+
 	if (!buf)
 		return GST_FLOW_ERROR;
 
 	if (!gst_libcamera_allocator_prepare_buffer(self->allocator, self->stream, buf)) {
-		gst_atomic_queue_push(self->queue, buf);
+		GLibLocker lock(GST_OBJECT(self));
+		self->queue->push_back(buf);
 		return GST_FLOW_ERROR;
 	}
 
@@ -64,9 +82,13 @@  static void
 gst_libcamera_pool_release_buffer(GstBufferPool *pool, GstBuffer *buffer)
 {
 	GstLibcameraPool *self = GST_LIBCAMERA_POOL(pool);
-	bool do_notify = gst_atomic_queue_length(self->queue) == 0;
+	bool do_notify;
 
-	gst_atomic_queue_push(self->queue, buffer);
+	{
+		GLibLocker lock(GST_OBJECT(self));
+		do_notify = self->queue->empty();
+		self->queue->push_back(buffer);
+	}
 
 	if (do_notify)
 		g_signal_emit(self, signals[SIGNAL_BUFFER_NOTIFY], 0);
@@ -75,7 +97,7 @@  gst_libcamera_pool_release_buffer(GstBufferPool *pool, GstBuffer *buffer)
 static void
 gst_libcamera_pool_init(GstLibcameraPool *self)
 {
-	self->queue = gst_atomic_queue_new(4);
+	self->queue = new std::deque<GstBuffer *>();
 }
 
 static void
@@ -84,10 +106,10 @@  gst_libcamera_pool_finalize(GObject *object)
 	GstLibcameraPool *self = GST_LIBCAMERA_POOL(object);
 	GstBuffer *buf;
 
-	while ((buf = GST_BUFFER(gst_atomic_queue_pop(self->queue))))
+	while ((buf = gst_libcamera_pool_pop_buffer(self)))
 		gst_buffer_unref(buf);
 
-	gst_atomic_queue_unref(self->queue);
+	delete self->queue;
 	g_object_unref(self->allocator);
 
 	G_OBJECT_CLASS(gst_libcamera_pool_parent_class)->finalize(object);
@@ -122,7 +144,7 @@  gst_libcamera_pool_new(GstLibcameraAllocator *allocator, Stream *stream)
 	gsize pool_size = gst_libcamera_allocator_get_pool_size(allocator, stream);
 	for (gsize i = 0; i < pool_size; i++) {
 		GstBuffer *buffer = gst_buffer_new();
-		gst_atomic_queue_push(pool->queue, buffer);
+		pool->queue->push_back(buffer);
 	}
 
 	return pool;