Message ID | 20220623232210.18742-11-laurent.pinchart@ideasonboard.com |
---|---|
State | Accepted |
On Friday, 24 June 2022 at 02:22 +0300, Laurent Pinchart wrote:
> The srcpads_ vector is protected by two different locks, the GstObject
> lock of the libcamerasrc element, and the stream_lock that covers the
> run function of the thread. This isn't correct. Use the stream_lock
> consistently to protect the pads.
>
> Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
> ---
>  src/gstreamer/gstlibcamerasrc.cpp | 68 ++++++++++++++++---------------
>  1 file changed, 35 insertions(+), 33 deletions(-)
>
> diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
> index b85ba39fb808..58a322b251c7 100644
> --- a/src/gstreamer/gstlibcamerasrc.cpp
> +++ b/src/gstreamer/gstlibcamerasrc.cpp
> @@ -112,7 +112,8 @@ struct GstLibcameraSrcState {
>  	std::shared_ptr<CameraManager> cm_;
>  	std::shared_ptr<Camera> cam_;
>  	std::unique_ptr<CameraConfiguration> config_;
> -	std::vector<GstPad *> srcpads_;
> +
> +	std::vector<GstPad *> srcpads_; /* Protected by stream_lock */
>
>  	Mutex lock_;
>  	std::queue<std::unique_ptr<RequestWrap>> queuedRequests_
> @@ -349,36 +350,34 @@ gst_libcamera_src_task_run(gpointer user_data)
>  							srcpad, ret);
>  	}
>
> +	if (ret != GST_FLOW_OK) {
> +		if (ret == GST_FLOW_EOS) {
> +			g_autoptr(GstEvent) eos = gst_event_new_eos();
> +			guint32 seqnum = gst_util_seqnum_next();
> +			gst_event_set_seqnum(eos, seqnum);
> +			for (GstPad *srcpad : state->srcpads_)
> +				gst_pad_push_event(srcpad, gst_event_ref(eos));
> +		} else if (ret != GST_FLOW_FLUSHING) {
> +			GST_ELEMENT_FLOW_ERROR(self, ret);
> +		}
> +		gst_task_stop(self->task);
> +		return;
> +	}
> +
> +	/*
> +	 * Here we need to decide if we want to pause. This needs to
> +	 * happen in lock step with the callback thread which may want
> +	 * to resume the task and might push pending buffers.
> +	 */
> +	bool do_pause;
> +
>  	{
> -		if (ret != GST_FLOW_OK) {
> -			if (ret == GST_FLOW_EOS) {
> -				g_autoptr(GstEvent) eos = gst_event_new_eos();
> -				guint32 seqnum = gst_util_seqnum_next();
> -				gst_event_set_seqnum(eos, seqnum);
> -				for (GstPad *srcpad : state->srcpads_)
> -					gst_pad_push_event(srcpad, gst_event_ref(eos));
> -			} else if (ret != GST_FLOW_FLUSHING) {
> -				GST_ELEMENT_FLOW_ERROR(self, ret);
> -			}
> -			gst_task_stop(self->task);
> -			return;
> -		}
> -
> -		/*
> -		 * Here we need to decide if we want to pause. This needs to
> -		 * happen in lock step with the callback thread which may want
> -		 * to resume the task and might push pending buffers.
> -		 */
> -		bool do_pause;
> -
> -		{
> -			MutexLocker locker(state->lock_);
> -			do_pause = state->completedRequests_.empty();
> -		}
> -
> -		if (do_pause)
> -			gst_task_pause(self->task);
> +		MutexLocker locker(state->lock_);
> +		do_pause = state->completedRequests_.empty();

As you introduce locking order restrictions here, you should also add a comment
next to the state->lock_ documentation that reminds users what order must be
respected. If my reading is correct, the recursive streaming lock must be taken
before this lock (if both are to be taken).

We need to be careful with the pads' streaming lock, since serialized queries
and events can be re-entrant. So we could be called back with the pad stream
lock held, which needs to be considered in our lock ordering scenarios. So to be
avoided:

  StreamLock
    Lock
      gst_pad_query(...)
        -> May be re-entrant, non-recursive lock held + locking order complexity
  ...

I would simply disallow the request lock from being held while calling into
other elements in the graph.

>  	}
> +
> +	if (do_pause)
> +		gst_task_pause(self->task);

This is still racy, I believe you fix that in patch 13. What I would normally do
is prevent concurrent requests throughout the patchset, and only enable them in
the last patch. This is just to stay "bisect" friendly, but not strictly
required.

>  }
>
>  static void
> @@ -532,8 +531,11 @@ gst_libcamera_src_task_leave([[maybe_unused]] GstTask *task,
>  		state->completedRequests_ = {};
>  	}
>
> -	for (GstPad *srcpad : state->srcpads_)
> -		gst_libcamera_pad_set_pool(srcpad, nullptr);
> +	{
> +		GLibRecLocker locker(&self->stream_lock);
> +		for (GstPad *srcpad : state->srcpads_)
> +			gst_libcamera_pad_set_pool(srcpad, nullptr);
> +	}
>
>  	g_clear_object(&self->allocator);
>  	g_clear_pointer(&self->flow_combiner,
> @@ -692,7 +694,7 @@ gst_libcamera_src_request_new_pad(GstElement *element, GstPadTemplate *templ,
>  	g_object_ref_sink(pad);
>
>  	if (gst_element_add_pad(element, pad)) {
> -		GLibLocker lock(GST_OBJECT(self));
> +		GLibRecLocker lock(&self->stream_lock);
>  		self->state->srcpads_.push_back(reinterpret_cast<GstPad *>(g_object_ref(pad)));
>  	} else {
>  		GST_ELEMENT_ERROR(element, STREAM, FAILED,
> @@ -712,7 +714,7 @@ gst_libcamera_src_release_pad(GstElement *element, GstPad *pad)
>  	GST_DEBUG_OBJECT(self, "Pad %" GST_PTR_FORMAT " being released", pad);
>
>  	{
> -		GLibLocker lock(GST_OBJECT(self));
> +		GLibRecLocker lock(&self->stream_lock);
>  		std::vector<GstPad *> &pads = self->state->srcpads_;
>  		auto begin_iterator = pads.begin();
>  		auto end_iterator = pads.end();

With lock order documented:

Reviewed-by: Nicolas Dufresne <nicolas.dufresne@collabora.com>
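The ordering rule from this review can be sketched as follows. This is only an illustration under stated assumptions, not the libcamerasrc code: std::recursive_mutex stands in for the GstTask stream lock, std::mutex stands in for the request lock, pads and requests are plain integers, and SrcState, pushDownstream() and runOnce() are hypothetical names. The stream lock is taken first, the request lock is held just long enough to dequeue one request, and it is dropped before anything that could re-enter the element is called.

```cpp
#include <mutex>
#include <queue>
#include <utility>
#include <vector>

/* Hypothetical stand-in for the element state; not the libcamerasrc types. */
struct SrcState {
	std::recursive_mutex stream_lock;	/* outer lock, always taken first */
	std::mutex lock;			/* inner request lock, taken second */
	std::vector<int> srcpads;		/* protected by stream_lock */
	std::queue<int> completedRequests;	/* protected by lock */
	std::vector<std::pair<int, int>> pushed; /* (pad, request), protected by stream_lock */
};

/*
 * Stand-in for a call into another element. It re-takes the stream lock to
 * mimic a re-entrant callback, which is safe only because that lock is
 * recursive and the caller does not hold the request lock.
 */
void pushDownstream(SrcState &state, int pad, int request)
{
	std::lock_guard<std::recursive_mutex> reentrant(state.stream_lock);
	state.pushed.emplace_back(pad, request);
}

/* One iteration of the task loop. Returns true if the task should pause. */
bool runOnce(SrcState &state)
{
	std::lock_guard<std::recursive_mutex> stream(state.stream_lock);

	int request = -1;
	bool doPause;

	{
		/* Request lock: held over as little code as possible. */
		std::lock_guard<std::mutex> locker(state.lock);
		if (!state.completedRequests.empty()) {
			request = state.completedRequests.front();
			state.completedRequests.pop();
		}
		doPause = state.completedRequests.empty();
	}

	/* The request lock is released before calling out of the element. */
	if (request >= 0) {
		for (int pad : state.srcpads)
			pushDownstream(state, pad, request);
	}

	return doPause;
}
```

With two pads and two queued requests, the first runOnce() call pushes one request to both pads and reports that the task should keep running; the second drains the queue and asks for a pause. Had pushDownstream() been called with the non-recursive request lock still held, a re-entrant path that needed that lock would deadlock.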
Hi Nicolas,

On Tue, Jun 28, 2022 at 09:21:07AM -0400, Nicolas Dufresne wrote:
> On Friday, 24 June 2022 at 02:22 +0300, Laurent Pinchart wrote:
> > The srcpads_ vector is protected by two different locks, the GstObject
> > lock of the libcamerasrc element, and the stream_lock that covers the
> > run function of the thread. This isn't correct. Use the stream_lock
> > consistently to protect the pads.
> >
> > Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
> > ---
> >  src/gstreamer/gstlibcamerasrc.cpp | 68 ++++++++++++++++---------------
> >  1 file changed, 35 insertions(+), 33 deletions(-)
> >
> > diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
> > index b85ba39fb808..58a322b251c7 100644
> > --- a/src/gstreamer/gstlibcamerasrc.cpp
> > +++ b/src/gstreamer/gstlibcamerasrc.cpp
> > @@ -112,7 +112,8 @@ struct GstLibcameraSrcState {
> >  	std::shared_ptr<CameraManager> cm_;
> >  	std::shared_ptr<Camera> cam_;
> >  	std::unique_ptr<CameraConfiguration> config_;
> > -	std::vector<GstPad *> srcpads_;
> > +
> > +	std::vector<GstPad *> srcpads_; /* Protected by stream_lock */
> >
> >  	Mutex lock_;
> >  	std::queue<std::unique_ptr<RequestWrap>> queuedRequests_
> > @@ -349,36 +350,34 @@ gst_libcamera_src_task_run(gpointer user_data)
> >  							srcpad, ret);
> >  	}
> >
> > +	if (ret != GST_FLOW_OK) {
> > +		if (ret == GST_FLOW_EOS) {
> > +			g_autoptr(GstEvent) eos = gst_event_new_eos();
> > +			guint32 seqnum = gst_util_seqnum_next();
> > +			gst_event_set_seqnum(eos, seqnum);
> > +			for (GstPad *srcpad : state->srcpads_)
> > +				gst_pad_push_event(srcpad, gst_event_ref(eos));
> > +		} else if (ret != GST_FLOW_FLUSHING) {
> > +			GST_ELEMENT_FLOW_ERROR(self, ret);
> > +		}
> > +		gst_task_stop(self->task);
> > +		return;
> > +	}
> > +
> > +	/*
> > +	 * Here we need to decide if we want to pause. This needs to
> > +	 * happen in lock step with the callback thread which may want
> > +	 * to resume the task and might push pending buffers.
> > +	 */
> > +	bool do_pause;
> > +
> >  	{
> > -		if (ret != GST_FLOW_OK) {
> > -			if (ret == GST_FLOW_EOS) {
> > -				g_autoptr(GstEvent) eos = gst_event_new_eos();
> > -				guint32 seqnum = gst_util_seqnum_next();
> > -				gst_event_set_seqnum(eos, seqnum);
> > -				for (GstPad *srcpad : state->srcpads_)
> > -					gst_pad_push_event(srcpad, gst_event_ref(eos));
> > -			} else if (ret != GST_FLOW_FLUSHING) {
> > -				GST_ELEMENT_FLOW_ERROR(self, ret);
> > -			}
> > -			gst_task_stop(self->task);
> > -			return;
> > -		}
> > -
> > -		/*
> > -		 * Here we need to decide if we want to pause. This needs to
> > -		 * happen in lock step with the callback thread which may want
> > -		 * to resume the task and might push pending buffers.
> > -		 */
> > -		bool do_pause;
> > -
> > -		{
> > -			MutexLocker locker(state->lock_);
> > -			do_pause = state->completedRequests_.empty();
> > -		}
> > -
> > -		if (do_pause)
> > -			gst_task_pause(self->task);
> > +		MutexLocker locker(state->lock_);
> > +		do_pause = state->completedRequests_.empty();
>
> As you introduce locking order restrictions here, you should also add a comment
> next to the state->lock_ documentation that reminds users what order must be
> respected. If my reading is correct, the recursive streaming lock must be taken
> before this lock (if both are to be taken).
>
> We need to be careful with the pads' streaming lock, since serialized queries
> and events can be re-entrant. So we could be called back with the pad stream
> lock held, which needs to be considered in our lock ordering scenarios. So to be
> avoided:
>
>   StreamLock
>     Lock
>       gst_pad_query(...)
>         -> May be re-entrant, non-recursive lock held + locking order complexity
>   ...
>
> I would simply disallow the request lock from being held while calling into
> other elements in the graph.

Sounds good. The request lock is meant to be held over as little code as
possible, while the stream lock can cover larger sections (especially
given that it covers the whole run function, by design of GstTask). I'll
document that.

> >  	}
> > +
> > +	if (do_pause)
> > +		gst_task_pause(self->task);
>
> This is still racy, I believe you fix that in patch 13. What I would normally do
> is prevent concurrent requests throughout the patchset, and only enable them in
> the last patch. This is just to stay "bisect" friendly, but not strictly
> required.

I tried to find a good way to do so, but in the end decided that it may
not be worth it, given that the current locking scheme is already racy.
Remember I have little experience with GStreamer, so I'm navigating
difficult waters here :-) I couldn't immediately find a way to achieve
the result I wanted while bringing a clear improvement in each patch
without any potential extension of existing race conditions, and without
making patches too large for my own taste (which is also conditioned by
my lack of experience here).

> >  }
> >
> >  static void
> > @@ -532,8 +531,11 @@ gst_libcamera_src_task_leave([[maybe_unused]] GstTask *task,
> >  		state->completedRequests_ = {};
> >  	}
> >
> > -	for (GstPad *srcpad : state->srcpads_)
> > -		gst_libcamera_pad_set_pool(srcpad, nullptr);
> > +	{
> > +		GLibRecLocker locker(&self->stream_lock);
> > +		for (GstPad *srcpad : state->srcpads_)
> > +			gst_libcamera_pad_set_pool(srcpad, nullptr);
> > +	}
> >
> >  	g_clear_object(&self->allocator);
> >  	g_clear_pointer(&self->flow_combiner,
> > @@ -692,7 +694,7 @@ gst_libcamera_src_request_new_pad(GstElement *element, GstPadTemplate *templ,
> >  	g_object_ref_sink(pad);
> >
> >  	if (gst_element_add_pad(element, pad)) {
> > -		GLibLocker lock(GST_OBJECT(self));
> > +		GLibRecLocker lock(&self->stream_lock);
> >  		self->state->srcpads_.push_back(reinterpret_cast<GstPad *>(g_object_ref(pad)));
> >  	} else {
> >  		GST_ELEMENT_ERROR(element, STREAM, FAILED,
> > @@ -712,7 +714,7 @@ gst_libcamera_src_release_pad(GstElement *element, GstPad *pad)
> >  	GST_DEBUG_OBJECT(self, "Pad %" GST_PTR_FORMAT " being released", pad);
> >
> >  	{
> > -		GLibLocker lock(GST_OBJECT(self));
> > +		GLibRecLocker lock(&self->stream_lock);
> >  		std::vector<GstPad *> &pads = self->state->srcpads_;
> >  		auto begin_iterator = pads.begin();
> >  		auto end_iterator = pads.end();
>
> With lock order documented:
>
> Reviewed-by: Nicolas Dufresne <nicolas.dufresne@collabora.com>
diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
index b85ba39fb808..58a322b251c7 100644
--- a/src/gstreamer/gstlibcamerasrc.cpp
+++ b/src/gstreamer/gstlibcamerasrc.cpp
@@ -112,7 +112,8 @@ struct GstLibcameraSrcState {
 	std::shared_ptr<CameraManager> cm_;
 	std::shared_ptr<Camera> cam_;
 	std::unique_ptr<CameraConfiguration> config_;
-	std::vector<GstPad *> srcpads_;
+
+	std::vector<GstPad *> srcpads_; /* Protected by stream_lock */
 
 	Mutex lock_;
 	std::queue<std::unique_ptr<RequestWrap>> queuedRequests_
@@ -349,36 +350,34 @@ gst_libcamera_src_task_run(gpointer user_data)
 							srcpad, ret);
 	}
 
+	if (ret != GST_FLOW_OK) {
+		if (ret == GST_FLOW_EOS) {
+			g_autoptr(GstEvent) eos = gst_event_new_eos();
+			guint32 seqnum = gst_util_seqnum_next();
+			gst_event_set_seqnum(eos, seqnum);
+			for (GstPad *srcpad : state->srcpads_)
+				gst_pad_push_event(srcpad, gst_event_ref(eos));
+		} else if (ret != GST_FLOW_FLUSHING) {
+			GST_ELEMENT_FLOW_ERROR(self, ret);
+		}
+		gst_task_stop(self->task);
+		return;
+	}
+
+	/*
+	 * Here we need to decide if we want to pause. This needs to
+	 * happen in lock step with the callback thread which may want
+	 * to resume the task and might push pending buffers.
+	 */
+	bool do_pause;
+
 	{
-		if (ret != GST_FLOW_OK) {
-			if (ret == GST_FLOW_EOS) {
-				g_autoptr(GstEvent) eos = gst_event_new_eos();
-				guint32 seqnum = gst_util_seqnum_next();
-				gst_event_set_seqnum(eos, seqnum);
-				for (GstPad *srcpad : state->srcpads_)
-					gst_pad_push_event(srcpad, gst_event_ref(eos));
-			} else if (ret != GST_FLOW_FLUSHING) {
-				GST_ELEMENT_FLOW_ERROR(self, ret);
-			}
-			gst_task_stop(self->task);
-			return;
-		}
-
-		/*
-		 * Here we need to decide if we want to pause. This needs to
-		 * happen in lock step with the callback thread which may want
-		 * to resume the task and might push pending buffers.
-		 */
-		bool do_pause;
-
-		{
-			MutexLocker locker(state->lock_);
-			do_pause = state->completedRequests_.empty();
-		}
-
-		if (do_pause)
-			gst_task_pause(self->task);
+		MutexLocker locker(state->lock_);
+		do_pause = state->completedRequests_.empty();
 	}
+
+	if (do_pause)
+		gst_task_pause(self->task);
 }
 
 static void
@@ -532,8 +531,11 @@ gst_libcamera_src_task_leave([[maybe_unused]] GstTask *task,
 		state->completedRequests_ = {};
 	}
 
-	for (GstPad *srcpad : state->srcpads_)
-		gst_libcamera_pad_set_pool(srcpad, nullptr);
+	{
+		GLibRecLocker locker(&self->stream_lock);
+		for (GstPad *srcpad : state->srcpads_)
+			gst_libcamera_pad_set_pool(srcpad, nullptr);
+	}
 
 	g_clear_object(&self->allocator);
 	g_clear_pointer(&self->flow_combiner,
@@ -692,7 +694,7 @@ gst_libcamera_src_request_new_pad(GstElement *element, GstPadTemplate *templ,
 	g_object_ref_sink(pad);
 
 	if (gst_element_add_pad(element, pad)) {
-		GLibLocker lock(GST_OBJECT(self));
+		GLibRecLocker lock(&self->stream_lock);
 		self->state->srcpads_.push_back(reinterpret_cast<GstPad *>(g_object_ref(pad)));
 	} else {
 		GST_ELEMENT_ERROR(element, STREAM, FAILED,
@@ -712,7 +714,7 @@ gst_libcamera_src_release_pad(GstElement *element, GstPad *pad)
 	GST_DEBUG_OBJECT(self, "Pad %" GST_PTR_FORMAT " being released", pad);
 
 	{
-		GLibLocker lock(GST_OBJECT(self));
+		GLibRecLocker lock(&self->stream_lock);
 		std::vector<GstPad *> &pads = self->state->srcpads_;
 		auto begin_iterator = pads.begin();
 		auto end_iterator = pads.end();
The srcpads_ vector is protected by two different locks, the GstObject
lock of the libcamerasrc element, and the stream_lock that covers the
run function of the thread. This isn't correct. Use the stream_lock
consistently to protect the pads.

Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
---
 src/gstreamer/gstlibcamerasrc.cpp | 68 ++++++++++++++++---------------
 1 file changed, 35 insertions(+), 33 deletions(-)
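
The point of the commit message, one container guarded by one lock on every access path, can be illustrated with a small self-contained sketch. It is not the patch itself: PadList and its methods are hypothetical names, pads are plain integers, and std::recursive_mutex is assumed as a stand-in for the element's recursive stream_lock. Adding, releasing and iterating all take the same lock, so no path can observe the vector while another one modifies it.

```cpp
#include <algorithm>
#include <cstddef>
#include <mutex>
#include <vector>

/* Hypothetical stand-in for the pad bookkeeping; pads are plain integers. */
class PadList {
public:
	/* Mirrors the request_new_pad path: append under the stream lock. */
	void add(int pad)
	{
		std::lock_guard<std::recursive_mutex> locker(stream_lock_);
		srcpads_.push_back(pad);
	}

	/* Mirrors the release_pad path: erase under the same lock. */
	bool release(int pad)
	{
		std::lock_guard<std::recursive_mutex> locker(stream_lock_);
		auto it = std::find(srcpads_.begin(), srcpads_.end(), pad);
		if (it == srcpads_.end())
			return false;
		srcpads_.erase(it);
		return true;
	}

	/*
	 * Mirrors the task paths: iterate under the same lock and return the
	 * number of pads visited. func must not add or release pads, since
	 * that would invalidate the iteration.
	 */
	template<typename Func>
	std::size_t forEach(Func func)
	{
		std::lock_guard<std::recursive_mutex> locker(stream_lock_);
		for (int pad : srcpads_)
			func(pad);
		return srcpads_.size();
	}

private:
	std::recursive_mutex stream_lock_;
	std::vector<int> srcpads_;	/* Protected by stream_lock_ */
};
```

If add() and release() took a different lock from forEach(), as the element did before this patch with the GstObject lock on one side and stream_lock on the other, the two locks would not exclude each other and the iteration could race with a pad being released.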