Message ID | 20240605194120.152960-1-nicolas@ndufresne.ca |
---|---|
State | Accepted |
Commit | 04f1f2033724f038ab1152e8135292770a33f97a |
Headers | show |
Series |
|
Related | show |
Quoting Nicolas Dufresne (2024-06-05 20:41:20) > From: Nicolas Dufresne <nicolas.dufresne@collabora.com> > > The GstAtomicQueue only supports 2 threads, one pushing, and one popping. We > pop and push on error cases and we may have multiple threads downstream > returning buffer (using tee), which breaks this assumption. > > On top of which, the release function, that notify when the queue goes from s/notify/notifies/ > empty to not-empty rely on a racy empty check. The downstream thread that s/rely on a/relies on an/ > does this check if effectively concurrent with our thread calling acquire(). s/if/is/ > Fix this by replacing the GstAtomicQueue with a std::deque, and protect access > to that using the object lock. > > Bug: https://bugs.libcamera.org/show_bug.cgi?id=201 > Signed-off-by: Nicolas Dufresne <nicolas.dufresne@collabora.com> This all looks reasonable to me - and following the bugzilla I see the comment : https://bugs.libcamera.org/show_bug.cgi?id=201#c10 which says it improved things, but that the issue might still persist (which could be a separate race/issue still?) Nicolas, what are your thoughts - do you think this patch is ready for merge already? For the code itself: Reviewed-by: Kieran Bingham <kieran.bingham@ideasonboard.com> > --- > src/gstreamer/gstlibcamerapool.cpp | 40 +++++++++++++++++++++++------- > 1 file changed, 31 insertions(+), 9 deletions(-) > > diff --git a/src/gstreamer/gstlibcamerapool.cpp b/src/gstreamer/gstlibcamerapool.cpp > index 9661c67a..0b1a5689 100644 > --- a/src/gstreamer/gstlibcamerapool.cpp > +++ b/src/gstreamer/gstlibcamerapool.cpp > @@ -8,6 +8,7 @@ > > #include "gstlibcamerapool.h" > > +#include <deque> > #include <libcamera/stream.h> > > #include "gstlibcamera-utils.h" > @@ -24,24 +25,41 @@ static guint signals[N_SIGNALS]; > struct _GstLibcameraPool { > GstBufferPool parent; > > - GstAtomicQueue *queue; > + std::deque<GstBuffer *> *queue; > GstLibcameraAllocator *allocator; > Stream *stream; > }; > > G_DEFINE_TYPE(GstLibcameraPool, gst_libcamera_pool, GST_TYPE_BUFFER_POOL) > > +static GstBuffer * > +gst_libcamera_pool_pop_buffer(GstLibcameraPool *self) > +{ > + GLibLocker lock(GST_OBJECT(self)); > + GstBuffer *buf; > + > + if (self->queue->empty()) > + return nullptr; > + > + buf = self->queue->front(); > + self->queue->pop_front(); > + > + return buf; > +} > + > static GstFlowReturn > gst_libcamera_pool_acquire_buffer(GstBufferPool *pool, GstBuffer **buffer, > [[maybe_unused]] GstBufferPoolAcquireParams *params) > { > GstLibcameraPool *self = GST_LIBCAMERA_POOL(pool); > - GstBuffer *buf = GST_BUFFER(gst_atomic_queue_pop(self->queue)); > + GstBuffer *buf = gst_libcamera_pool_pop_buffer(self); > + > if (!buf) > return GST_FLOW_ERROR; > > if (!gst_libcamera_allocator_prepare_buffer(self->allocator, self->stream, buf)) { > - gst_atomic_queue_push(self->queue, buf); > + GLibLocker lock(GST_OBJECT(self)); > + self->queue->push_back(buf); > return GST_FLOW_ERROR; > } > > @@ -64,9 +82,13 @@ static void > gst_libcamera_pool_release_buffer(GstBufferPool *pool, GstBuffer *buffer) > { > GstLibcameraPool *self = GST_LIBCAMERA_POOL(pool); > - bool do_notify = gst_atomic_queue_length(self->queue) == 0; > + bool do_notify; > > - gst_atomic_queue_push(self->queue, buffer); > + { > + GLibLocker lock(GST_OBJECT(self)); > + do_notify = self->queue->empty(); > + self->queue->push_back(buffer); > + } > > if (do_notify) > g_signal_emit(self, signals[SIGNAL_BUFFER_NOTIFY], 0); > @@ -75,7 +97,7 @@ gst_libcamera_pool_release_buffer(GstBufferPool *pool, GstBuffer *buffer) > static void > gst_libcamera_pool_init(GstLibcameraPool *self) > { > - self->queue = gst_atomic_queue_new(4); > + self->queue = new std::deque<GstBuffer *>(); > } > > static void > @@ -84,10 +106,10 @@ gst_libcamera_pool_finalize(GObject *object) > GstLibcameraPool *self = GST_LIBCAMERA_POOL(object); > GstBuffer *buf; > > - while ((buf = GST_BUFFER(gst_atomic_queue_pop(self->queue)))) > + while ((buf = gst_libcamera_pool_pop_buffer(self))) > gst_buffer_unref(buf); > > - gst_atomic_queue_unref(self->queue); > + delete self->queue; > g_object_unref(self->allocator); > > G_OBJECT_CLASS(gst_libcamera_pool_parent_class)->finalize(object); > @@ -122,7 +144,7 @@ gst_libcamera_pool_new(GstLibcameraAllocator *allocator, Stream *stream) > gsize pool_size = gst_libcamera_allocator_get_pool_size(allocator, stream); > for (gsize i = 0; i < pool_size; i++) { > GstBuffer *buffer = gst_buffer_new(); > - gst_atomic_queue_push(pool->queue, buffer); > + pool->queue->push_back(buffer); > } > > return pool; > -- > 2.45.1 >
Hi, Le mardi 23 juillet 2024 à 16:59 +0100, Kieran Bingham a écrit : > Quoting Nicolas Dufresne (2024-06-05 20:41:20) > > From: Nicolas Dufresne <nicolas.dufresne@collabora.com> > > > > The GstAtomicQueue only supports 2 threads, one pushing, and one popping. We > > pop and push on error cases and we may have multiple threads downstream > > returning buffer (using tee), which breaks this assumption. > > > > On top of which, the release function, that notify when the queue goes from > > s/notify/notifies/ > > > empty to not-empty rely on a racy empty check. The downstream thread that > > s/rely on a/relies on an/ > > > does this check if effectively concurrent with our thread calling acquire(). > > s/if/is/ > > > > Fix this by replacing the GstAtomicQueue with a std::deque, and protect access > > to that using the object lock. > > > > Bug: https://bugs.libcamera.org/show_bug.cgi?id=201 > > Signed-off-by: Nicolas Dufresne <nicolas.dufresne@collabora.com> > > This all looks reasonable to me - and following the bugzilla I see the > comment : https://bugs.libcamera.org/show_bug.cgi?id=201#c10 which says > it improved things, but that the issue might still persist (which could > be a separate race/issue still?) > > > Nicolas, what are your thoughts - do you think this patch is ready for > merge already? > > For the code itself: > > > Reviewed-by: Kieran Bingham <kieran.bingham@ideasonboard.com> With the suggested typo, yes, I'd picked it up as it aligns the code with similar fixes that happen upstream. I still have to find time to look into the remaining, but shouldn't block this one, GstAtomicQueue the way we use it now must go away anyway. Nicolas > > > --- > > src/gstreamer/gstlibcamerapool.cpp | 40 +++++++++++++++++++++++------- > > 1 file changed, 31 insertions(+), 9 deletions(-) > > > > diff --git a/src/gstreamer/gstlibcamerapool.cpp b/src/gstreamer/gstlibcamerapool.cpp > > index 9661c67a..0b1a5689 100644 > > --- a/src/gstreamer/gstlibcamerapool.cpp > > +++ b/src/gstreamer/gstlibcamerapool.cpp > > @@ -8,6 +8,7 @@ > > > > #include "gstlibcamerapool.h" > > > > +#include <deque> > > #include <libcamera/stream.h> > > > > #include "gstlibcamera-utils.h" > > @@ -24,24 +25,41 @@ static guint signals[N_SIGNALS]; > > struct _GstLibcameraPool { > > GstBufferPool parent; > > > > - GstAtomicQueue *queue; > > + std::deque<GstBuffer *> *queue; > > GstLibcameraAllocator *allocator; > > Stream *stream; > > }; > > > > G_DEFINE_TYPE(GstLibcameraPool, gst_libcamera_pool, GST_TYPE_BUFFER_POOL) > > > > +static GstBuffer * > > +gst_libcamera_pool_pop_buffer(GstLibcameraPool *self) > > +{ > > + GLibLocker lock(GST_OBJECT(self)); > > + GstBuffer *buf; > > + > > + if (self->queue->empty()) > > + return nullptr; > > + > > + buf = self->queue->front(); > > + self->queue->pop_front(); > > + > > + return buf; > > +} > > + > > static GstFlowReturn > > gst_libcamera_pool_acquire_buffer(GstBufferPool *pool, GstBuffer **buffer, > > [[maybe_unused]] GstBufferPoolAcquireParams *params) > > { > > GstLibcameraPool *self = GST_LIBCAMERA_POOL(pool); > > - GstBuffer *buf = GST_BUFFER(gst_atomic_queue_pop(self->queue)); > > + GstBuffer *buf = gst_libcamera_pool_pop_buffer(self); > > + > > if (!buf) > > return GST_FLOW_ERROR; > > > > if (!gst_libcamera_allocator_prepare_buffer(self->allocator, self->stream, buf)) { > > - gst_atomic_queue_push(self->queue, buf); > > + GLibLocker lock(GST_OBJECT(self)); > > + self->queue->push_back(buf); > > return GST_FLOW_ERROR; > > } > > > > @@ -64,9 +82,13 @@ static void > > gst_libcamera_pool_release_buffer(GstBufferPool *pool, GstBuffer *buffer) > > { > > GstLibcameraPool *self = GST_LIBCAMERA_POOL(pool); > > - bool do_notify = gst_atomic_queue_length(self->queue) == 0; > > + bool do_notify; > > > > - gst_atomic_queue_push(self->queue, buffer); > > + { > > + GLibLocker lock(GST_OBJECT(self)); > > + do_notify = self->queue->empty(); > > + self->queue->push_back(buffer); > > + } > > > > if (do_notify) > > g_signal_emit(self, signals[SIGNAL_BUFFER_NOTIFY], 0); > > @@ -75,7 +97,7 @@ gst_libcamera_pool_release_buffer(GstBufferPool *pool, GstBuffer *buffer) > > static void > > gst_libcamera_pool_init(GstLibcameraPool *self) > > { > > - self->queue = gst_atomic_queue_new(4); > > + self->queue = new std::deque<GstBuffer *>(); > > } > > > > static void > > @@ -84,10 +106,10 @@ gst_libcamera_pool_finalize(GObject *object) > > GstLibcameraPool *self = GST_LIBCAMERA_POOL(object); > > GstBuffer *buf; > > > > - while ((buf = GST_BUFFER(gst_atomic_queue_pop(self->queue)))) > > + while ((buf = gst_libcamera_pool_pop_buffer(self))) > > gst_buffer_unref(buf); > > > > - gst_atomic_queue_unref(self->queue); > > + delete self->queue; > > g_object_unref(self->allocator); > > > > G_OBJECT_CLASS(gst_libcamera_pool_parent_class)->finalize(object); > > @@ -122,7 +144,7 @@ gst_libcamera_pool_new(GstLibcameraAllocator *allocator, Stream *stream) > > gsize pool_size = gst_libcamera_allocator_get_pool_size(allocator, stream); > > for (gsize i = 0; i < pool_size; i++) { > > GstBuffer *buffer = gst_buffer_new(); > > - gst_atomic_queue_push(pool->queue, buffer); > > + pool->queue->push_back(buffer); > > } > > > > return pool; > > -- > > 2.45.1 > >
Hi Nicolas On Wed, Jun 05, 2024 at 03:41:20PM GMT, Nicolas Dufresne wrote: > From: Nicolas Dufresne <nicolas.dufresne@collabora.com> > > The GstAtomicQueue only supports 2 threads, one pushing, and one popping. We > pop and push on error cases and we may have multiple threads downstream > returning buffer (using tee), which breaks this assumption. > > On top of which, the release function, that notify when the queue goes from > empty to not-empty rely on a racy empty check. The downstream thread that > does this check if effectively concurrent with our thread calling acquire(). > > Fix this by replacing the GstAtomicQueue with a std::deque, and protect access > to that using the object lock. > > Bug: https://bugs.libcamera.org/show_bug.cgi?id=201 > Signed-off-by: Nicolas Dufresne <nicolas.dufresne@collabora.com> Looks sane to me even if the gst code base is not familiar to me Acked-by: Jacopo Mondi <jacopo.mondi@ideasonboard.com> Thanks j > --- > src/gstreamer/gstlibcamerapool.cpp | 40 +++++++++++++++++++++++------- > 1 file changed, 31 insertions(+), 9 deletions(-) > > diff --git a/src/gstreamer/gstlibcamerapool.cpp b/src/gstreamer/gstlibcamerapool.cpp > index 9661c67a..0b1a5689 100644 > --- a/src/gstreamer/gstlibcamerapool.cpp > +++ b/src/gstreamer/gstlibcamerapool.cpp > @@ -8,6 +8,7 @@ > > #include "gstlibcamerapool.h" > > +#include <deque> > #include <libcamera/stream.h> > > #include "gstlibcamera-utils.h" > @@ -24,24 +25,41 @@ static guint signals[N_SIGNALS]; > struct _GstLibcameraPool { > GstBufferPool parent; > > - GstAtomicQueue *queue; > + std::deque<GstBuffer *> *queue; > GstLibcameraAllocator *allocator; > Stream *stream; > }; > > G_DEFINE_TYPE(GstLibcameraPool, gst_libcamera_pool, GST_TYPE_BUFFER_POOL) > > +static GstBuffer * > +gst_libcamera_pool_pop_buffer(GstLibcameraPool *self) > +{ > + GLibLocker lock(GST_OBJECT(self)); > + GstBuffer *buf; > + > + if (self->queue->empty()) > + return nullptr; > + > + buf = self->queue->front(); > + self->queue->pop_front(); > + > + return buf; > +} > + > static GstFlowReturn > gst_libcamera_pool_acquire_buffer(GstBufferPool *pool, GstBuffer **buffer, > [[maybe_unused]] GstBufferPoolAcquireParams *params) > { > GstLibcameraPool *self = GST_LIBCAMERA_POOL(pool); > - GstBuffer *buf = GST_BUFFER(gst_atomic_queue_pop(self->queue)); > + GstBuffer *buf = gst_libcamera_pool_pop_buffer(self); > + > if (!buf) > return GST_FLOW_ERROR; > > if (!gst_libcamera_allocator_prepare_buffer(self->allocator, self->stream, buf)) { > - gst_atomic_queue_push(self->queue, buf); > + GLibLocker lock(GST_OBJECT(self)); > + self->queue->push_back(buf); > return GST_FLOW_ERROR; > } > > @@ -64,9 +82,13 @@ static void > gst_libcamera_pool_release_buffer(GstBufferPool *pool, GstBuffer *buffer) > { > GstLibcameraPool *self = GST_LIBCAMERA_POOL(pool); > - bool do_notify = gst_atomic_queue_length(self->queue) == 0; > + bool do_notify; > > - gst_atomic_queue_push(self->queue, buffer); > + { > + GLibLocker lock(GST_OBJECT(self)); > + do_notify = self->queue->empty(); > + self->queue->push_back(buffer); > + } > > if (do_notify) > g_signal_emit(self, signals[SIGNAL_BUFFER_NOTIFY], 0); > @@ -75,7 +97,7 @@ gst_libcamera_pool_release_buffer(GstBufferPool *pool, GstBuffer *buffer) > static void > gst_libcamera_pool_init(GstLibcameraPool *self) > { > - self->queue = gst_atomic_queue_new(4); > + self->queue = new std::deque<GstBuffer *>(); > } > > static void > @@ -84,10 +106,10 @@ gst_libcamera_pool_finalize(GObject *object) > GstLibcameraPool *self = GST_LIBCAMERA_POOL(object); > GstBuffer *buf; > > - while ((buf = GST_BUFFER(gst_atomic_queue_pop(self->queue)))) > + while ((buf = gst_libcamera_pool_pop_buffer(self))) > gst_buffer_unref(buf); > > - gst_atomic_queue_unref(self->queue); > + delete self->queue; > g_object_unref(self->allocator); > > G_OBJECT_CLASS(gst_libcamera_pool_parent_class)->finalize(object); > @@ -122,7 +144,7 @@ gst_libcamera_pool_new(GstLibcameraAllocator *allocator, Stream *stream) > gsize pool_size = gst_libcamera_allocator_get_pool_size(allocator, stream); > for (gsize i = 0; i < pool_size; i++) { > GstBuffer *buffer = gst_buffer_new(); > - gst_atomic_queue_push(pool->queue, buffer); > + pool->queue->push_back(buffer); > } > > return pool; > -- > 2.45.1 >
diff --git a/src/gstreamer/gstlibcamerapool.cpp b/src/gstreamer/gstlibcamerapool.cpp index 9661c67a..0b1a5689 100644 --- a/src/gstreamer/gstlibcamerapool.cpp +++ b/src/gstreamer/gstlibcamerapool.cpp @@ -8,6 +8,7 @@ #include "gstlibcamerapool.h" +#include <deque> #include <libcamera/stream.h> #include "gstlibcamera-utils.h" @@ -24,24 +25,41 @@ static guint signals[N_SIGNALS]; struct _GstLibcameraPool { GstBufferPool parent; - GstAtomicQueue *queue; + std::deque<GstBuffer *> *queue; GstLibcameraAllocator *allocator; Stream *stream; }; G_DEFINE_TYPE(GstLibcameraPool, gst_libcamera_pool, GST_TYPE_BUFFER_POOL) +static GstBuffer * +gst_libcamera_pool_pop_buffer(GstLibcameraPool *self) +{ + GLibLocker lock(GST_OBJECT(self)); + GstBuffer *buf; + + if (self->queue->empty()) + return nullptr; + + buf = self->queue->front(); + self->queue->pop_front(); + + return buf; +} + static GstFlowReturn gst_libcamera_pool_acquire_buffer(GstBufferPool *pool, GstBuffer **buffer, [[maybe_unused]] GstBufferPoolAcquireParams *params) { GstLibcameraPool *self = GST_LIBCAMERA_POOL(pool); - GstBuffer *buf = GST_BUFFER(gst_atomic_queue_pop(self->queue)); + GstBuffer *buf = gst_libcamera_pool_pop_buffer(self); + if (!buf) return GST_FLOW_ERROR; if (!gst_libcamera_allocator_prepare_buffer(self->allocator, self->stream, buf)) { - gst_atomic_queue_push(self->queue, buf); + GLibLocker lock(GST_OBJECT(self)); + self->queue->push_back(buf); return GST_FLOW_ERROR; } @@ -64,9 +82,13 @@ static void gst_libcamera_pool_release_buffer(GstBufferPool *pool, GstBuffer *buffer) { GstLibcameraPool *self = GST_LIBCAMERA_POOL(pool); - bool do_notify = gst_atomic_queue_length(self->queue) == 0; + bool do_notify; - gst_atomic_queue_push(self->queue, buffer); + { + GLibLocker lock(GST_OBJECT(self)); + do_notify = self->queue->empty(); + self->queue->push_back(buffer); + } if (do_notify) g_signal_emit(self, signals[SIGNAL_BUFFER_NOTIFY], 0); @@ -75,7 +97,7 @@ gst_libcamera_pool_release_buffer(GstBufferPool *pool, GstBuffer *buffer) static void gst_libcamera_pool_init(GstLibcameraPool *self) { - self->queue = gst_atomic_queue_new(4); + self->queue = new std::deque<GstBuffer *>(); } static void @@ -84,10 +106,10 @@ gst_libcamera_pool_finalize(GObject *object) GstLibcameraPool *self = GST_LIBCAMERA_POOL(object); GstBuffer *buf; - while ((buf = GST_BUFFER(gst_atomic_queue_pop(self->queue)))) + while ((buf = gst_libcamera_pool_pop_buffer(self))) gst_buffer_unref(buf); - gst_atomic_queue_unref(self->queue); + delete self->queue; g_object_unref(self->allocator); G_OBJECT_CLASS(gst_libcamera_pool_parent_class)->finalize(object); @@ -122,7 +144,7 @@ gst_libcamera_pool_new(GstLibcameraAllocator *allocator, Stream *stream) gsize pool_size = gst_libcamera_allocator_get_pool_size(allocator, stream); for (gsize i = 0; i < pool_size; i++) { GstBuffer *buffer = gst_buffer_new(); - gst_atomic_queue_push(pool->queue, buffer); + pool->queue->push_back(buffer); } return pool;