[libcamera-devel,10/13] gstreamer: Fix pads locking
diff mbox series

Message ID 20220623232210.18742-11-laurent.pinchart@ideasonboard.com
State Accepted
Headers show
Series
  • gstreamer: Queue multiple requests
Related show

Commit Message

Laurent Pinchart June 23, 2022, 11:22 p.m. UTC
The srcpads_ vector is protected by two different locks, the GstObject
lock of the libcamerasrc element, and the stream_lock that covers the
run function of the thread. This isn't correct. Use the stream_lock
consistently to protect the pads.

Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
---
 src/gstreamer/gstlibcamerasrc.cpp | 68 ++++++++++++++++---------------
 1 file changed, 35 insertions(+), 33 deletions(-)

Comments

Nicolas Dufresne June 28, 2022, 1:21 p.m. UTC | #1
Le vendredi 24 juin 2022 à 02:22 +0300, Laurent Pinchart a écrit :
> The srcpads_ vector is protected by two different locks, the GstObject
> lock of the libcamerasrc element, and the stream_lock that covers the
> run function of the thread. This isn't correct. Use the stream_lock
> consistently to protect the pads.
> 
> Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
> ---
>  src/gstreamer/gstlibcamerasrc.cpp | 68 ++++++++++++++++---------------
>  1 file changed, 35 insertions(+), 33 deletions(-)
> 
> diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
> index b85ba39fb808..58a322b251c7 100644
> --- a/src/gstreamer/gstlibcamerasrc.cpp
> +++ b/src/gstreamer/gstlibcamerasrc.cpp
> @@ -112,7 +112,8 @@ struct GstLibcameraSrcState {
>  	std::shared_ptr<CameraManager> cm_;
>  	std::shared_ptr<Camera> cam_;
>  	std::unique_ptr<CameraConfiguration> config_;
> -	std::vector<GstPad *> srcpads_;
> +
> +	std::vector<GstPad *> srcpads_; /* Protected by stream_lock */
>  
>  	Mutex lock_;
>  	std::queue<std::unique_ptr<RequestWrap>> queuedRequests_
> @@ -349,36 +350,34 @@ gst_libcamera_src_task_run(gpointer user_data)
>  							srcpad, ret);
>  	}
>  
> +	if (ret != GST_FLOW_OK) {
> +		if (ret == GST_FLOW_EOS) {
> +			g_autoptr(GstEvent) eos = gst_event_new_eos();
> +			guint32 seqnum = gst_util_seqnum_next();
> +			gst_event_set_seqnum(eos, seqnum);
> +			for (GstPad *srcpad : state->srcpads_)
> +				gst_pad_push_event(srcpad, gst_event_ref(eos));
> +		} else if (ret != GST_FLOW_FLUSHING) {
> +			GST_ELEMENT_FLOW_ERROR(self, ret);
> +		}
> +		gst_task_stop(self->task);
> +		return;
> +	}
> +
> +	/*
> +	 * Here we need to decide if we want to pause. This needs to
> +	 * happen in lock step with the callback thread which may want
> +	 * to resume the task and might push pending buffers.
> +	 */
> +	bool do_pause;
> +
>  	{
> -		if (ret != GST_FLOW_OK) {
> -			if (ret == GST_FLOW_EOS) {
> -				g_autoptr(GstEvent) eos = gst_event_new_eos();
> -				guint32 seqnum = gst_util_seqnum_next();
> -				gst_event_set_seqnum(eos, seqnum);
> -				for (GstPad *srcpad : state->srcpads_)
> -					gst_pad_push_event(srcpad, gst_event_ref(eos));
> -			} else if (ret != GST_FLOW_FLUSHING) {
> -				GST_ELEMENT_FLOW_ERROR(self, ret);
> -			}
> -			gst_task_stop(self->task);
> -			return;
> -		}
> -
> -		/*
> -		 * Here we need to decide if we want to pause. This needs to
> -		 * happen in lock step with the callback thread which may want
> -		 * to resume the task and might push pending buffers.
> -		 */
> -		bool do_pause;
> -
> -		{
> -			MutexLocker locker(state->lock_);
> -			do_pause = state->completedRequests_.empty();
> -		}
> -
> -		if (do_pause)
> -			gst_task_pause(self->task);
> +		MutexLocker locker(state->lock_);
> +		do_pause = state->completedRequests_.empty();

As you introduce locking order restrictions here, you should also add a comment
next to the state->lock_ documentation that reminds users what order must be
respected. If my reading is correct, the recursive streaming lock must be taken
before this lock (if both are to be taken).

We need to be careful with the pad streaming lock, since serialized queries and
events can be re-entrant. So we could be called back with the pad stream lock
held, which needs to be considered in our lock ordering scenarios. So to be
avoided:

  StreamLock
  Lock
  gst_pad_query(...)
  -> May be re-entrant, non-recursive lock held + locking order complexity
  ...

I would simply disallow the request lock from being held while calling into
other elements in the graph.

>  	}
> +
> +	if (do_pause)
> +		gst_task_pause(self->task);

This is still racy; I believe you fix that in patch 13. What I would normally do
is prevent concurrent requests throughout the patchset, and only enable them in
the last patch. This is just to stay "bisect" friendly, but not strictly required.

>  }
>  
>  static void
> @@ -532,8 +531,11 @@ gst_libcamera_src_task_leave([[maybe_unused]] GstTask *task,
>  		state->completedRequests_ = {};
>  	}
>  
> -	for (GstPad *srcpad : state->srcpads_)
> -		gst_libcamera_pad_set_pool(srcpad, nullptr);
> +	{
> +		GLibRecLocker locker(&self->stream_lock);
> +		for (GstPad *srcpad : state->srcpads_)
> +			gst_libcamera_pad_set_pool(srcpad, nullptr);
> +	}
>  
>  	g_clear_object(&self->allocator);
>  	g_clear_pointer(&self->flow_combiner,
> @@ -692,7 +694,7 @@ gst_libcamera_src_request_new_pad(GstElement *element, GstPadTemplate *templ,
>  	g_object_ref_sink(pad);
>  
>  	if (gst_element_add_pad(element, pad)) {
> -		GLibLocker lock(GST_OBJECT(self));
> +		GLibRecLocker lock(&self->stream_lock);
>  		self->state->srcpads_.push_back(reinterpret_cast<GstPad *>(g_object_ref(pad)));
>  	} else {
>  		GST_ELEMENT_ERROR(element, STREAM, FAILED,
> @@ -712,7 +714,7 @@ gst_libcamera_src_release_pad(GstElement *element, GstPad *pad)
>  	GST_DEBUG_OBJECT(self, "Pad %" GST_PTR_FORMAT " being released", pad);
>  
>  	{
> -		GLibLocker lock(GST_OBJECT(self));
> +		GLibRecLocker lock(&self->stream_lock);
>  		std::vector<GstPad *> &pads = self->state->srcpads_;
>  		auto begin_iterator = pads.begin();
>  		auto end_iterator = pads.end();

With lock order documented:

Reviewed-by: Nicolas Dufresne <nicolas.dufresne@collabora.com>
Laurent Pinchart June 28, 2022, 6:01 p.m. UTC | #2
Hi Nicolas,

On Tue, Jun 28, 2022 at 09:21:07AM -0400, Nicolas Dufresne wrote:
> Le vendredi 24 juin 2022 à 02:22 +0300, Laurent Pinchart a écrit :
> > The srcpads_ vector is protected by two different locks, the GstObject
> > lock of the libcamerasrc element, and the stream_lock that covers the
> > run function of the thread. This isn't correct. Use the stream_lock
> > consistently to protect the pads.
> > 
> > Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
> > ---
> >  src/gstreamer/gstlibcamerasrc.cpp | 68 ++++++++++++++++---------------
> >  1 file changed, 35 insertions(+), 33 deletions(-)
> > 
> > diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
> > index b85ba39fb808..58a322b251c7 100644
> > --- a/src/gstreamer/gstlibcamerasrc.cpp
> > +++ b/src/gstreamer/gstlibcamerasrc.cpp
> > @@ -112,7 +112,8 @@ struct GstLibcameraSrcState {
> >  	std::shared_ptr<CameraManager> cm_;
> >  	std::shared_ptr<Camera> cam_;
> >  	std::unique_ptr<CameraConfiguration> config_;
> > -	std::vector<GstPad *> srcpads_;
> > +
> > +	std::vector<GstPad *> srcpads_; /* Protected by stream_lock */
> >  
> >  	Mutex lock_;
> >  	std::queue<std::unique_ptr<RequestWrap>> queuedRequests_
> > @@ -349,36 +350,34 @@ gst_libcamera_src_task_run(gpointer user_data)
> >  							srcpad, ret);
> >  	}
> >  
> > +	if (ret != GST_FLOW_OK) {
> > +		if (ret == GST_FLOW_EOS) {
> > +			g_autoptr(GstEvent) eos = gst_event_new_eos();
> > +			guint32 seqnum = gst_util_seqnum_next();
> > +			gst_event_set_seqnum(eos, seqnum);
> > +			for (GstPad *srcpad : state->srcpads_)
> > +				gst_pad_push_event(srcpad, gst_event_ref(eos));
> > +		} else if (ret != GST_FLOW_FLUSHING) {
> > +			GST_ELEMENT_FLOW_ERROR(self, ret);
> > +		}
> > +		gst_task_stop(self->task);
> > +		return;
> > +	}
> > +
> > +	/*
> > +	 * Here we need to decide if we want to pause. This needs to
> > +	 * happen in lock step with the callback thread which may want
> > +	 * to resume the task and might push pending buffers.
> > +	 */
> > +	bool do_pause;
> > +
> >  	{
> > -		if (ret != GST_FLOW_OK) {
> > -			if (ret == GST_FLOW_EOS) {
> > -				g_autoptr(GstEvent) eos = gst_event_new_eos();
> > -				guint32 seqnum = gst_util_seqnum_next();
> > -				gst_event_set_seqnum(eos, seqnum);
> > -				for (GstPad *srcpad : state->srcpads_)
> > -					gst_pad_push_event(srcpad, gst_event_ref(eos));
> > -			} else if (ret != GST_FLOW_FLUSHING) {
> > -				GST_ELEMENT_FLOW_ERROR(self, ret);
> > -			}
> > -			gst_task_stop(self->task);
> > -			return;
> > -		}
> > -
> > -		/*
> > -		 * Here we need to decide if we want to pause. This needs to
> > -		 * happen in lock step with the callback thread which may want
> > -		 * to resume the task and might push pending buffers.
> > -		 */
> > -		bool do_pause;
> > -
> > -		{
> > -			MutexLocker locker(state->lock_);
> > -			do_pause = state->completedRequests_.empty();
> > -		}
> > -
> > -		if (do_pause)
> > -			gst_task_pause(self->task);
> > +		MutexLocker locker(state->lock_);
> > +		do_pause = state->completedRequests_.empty();
> 
> As you introduce locking order restrictions here, you should also add a comment
> next to the state->lock_ documentation that reminds users what order must be
> respected. If my reading is correct, the recursive streaming lock must be taken
> before this lock (if both are to be taken).
> 
> We need to be careful with the pad streaming lock, since serialized queries and
> events can be re-entrant. So we could be called back with the pad stream lock
> held, which needs to be considered in our lock ordering scenarios. So to be
> avoided:
> 
>   StreamLock
>   Lock
>   gst_pad_query(...)
>   -> May be re-entrant, non-recursive lock held + locking order complexity
>   ...
> 
> I would simply disallow the request lock from being held while calling into
> other elements in the graph.

Sounds good. The request lock is meant to be held over as little code as
possible, while the stream lock can cover larger sections (especially
given that it covers the whole run function, by design of GstTask). I'll
document that.

> >  	}
> > +
> > +	if (do_pause)
> > +		gst_task_pause(self->task);
> 
> This is still racy; I believe you fix that in patch 13. What I would normally do
> is prevent concurrent requests throughout the patchset, and only enable them in
> the last patch. This is just to stay "bisect" friendly, but not strictly required.

I tried to find a good way to do so, but in the end decided that it may
not be worth it, given that the current locking scheme is already racy.
Remember I have little experience with GStreamer, so I'm navigating in
difficult waters for me :-) I couldn't immediately find a way to achieve
the result I wanted while bringing a clear improvement in each patch
without any potential extension of existing race conditions, and without
making patches too large for my own taste (which is also conditioned by
my lack of experience here).

> >  }
> >  
> >  static void
> > @@ -532,8 +531,11 @@ gst_libcamera_src_task_leave([[maybe_unused]] GstTask *task,
> >  		state->completedRequests_ = {};
> >  	}
> >  
> > -	for (GstPad *srcpad : state->srcpads_)
> > -		gst_libcamera_pad_set_pool(srcpad, nullptr);
> > +	{
> > +		GLibRecLocker locker(&self->stream_lock);
> > +		for (GstPad *srcpad : state->srcpads_)
> > +			gst_libcamera_pad_set_pool(srcpad, nullptr);
> > +	}
> >  
> >  	g_clear_object(&self->allocator);
> >  	g_clear_pointer(&self->flow_combiner,
> > @@ -692,7 +694,7 @@ gst_libcamera_src_request_new_pad(GstElement *element, GstPadTemplate *templ,
> >  	g_object_ref_sink(pad);
> >  
> >  	if (gst_element_add_pad(element, pad)) {
> > -		GLibLocker lock(GST_OBJECT(self));
> > +		GLibRecLocker lock(&self->stream_lock);
> >  		self->state->srcpads_.push_back(reinterpret_cast<GstPad *>(g_object_ref(pad)));
> >  	} else {
> >  		GST_ELEMENT_ERROR(element, STREAM, FAILED,
> > @@ -712,7 +714,7 @@ gst_libcamera_src_release_pad(GstElement *element, GstPad *pad)
> >  	GST_DEBUG_OBJECT(self, "Pad %" GST_PTR_FORMAT " being released", pad);
> >  
> >  	{
> > -		GLibLocker lock(GST_OBJECT(self));
> > +		GLibRecLocker lock(&self->stream_lock);
> >  		std::vector<GstPad *> &pads = self->state->srcpads_;
> >  		auto begin_iterator = pads.begin();
> >  		auto end_iterator = pads.end();
> 
> With lock order documented:
> 
> Reviewed-by: Nicolas Dufresne <nicolas.dufresne@collabora.com>

Patch
diff mbox series

diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
index b85ba39fb808..58a322b251c7 100644
--- a/src/gstreamer/gstlibcamerasrc.cpp
+++ b/src/gstreamer/gstlibcamerasrc.cpp
@@ -112,7 +112,8 @@  struct GstLibcameraSrcState {
 	std::shared_ptr<CameraManager> cm_;
 	std::shared_ptr<Camera> cam_;
 	std::unique_ptr<CameraConfiguration> config_;
-	std::vector<GstPad *> srcpads_;
+
+	std::vector<GstPad *> srcpads_; /* Protected by stream_lock */
 
 	Mutex lock_;
 	std::queue<std::unique_ptr<RequestWrap>> queuedRequests_
@@ -349,36 +350,34 @@  gst_libcamera_src_task_run(gpointer user_data)
 							srcpad, ret);
 	}
 
+	if (ret != GST_FLOW_OK) {
+		if (ret == GST_FLOW_EOS) {
+			g_autoptr(GstEvent) eos = gst_event_new_eos();
+			guint32 seqnum = gst_util_seqnum_next();
+			gst_event_set_seqnum(eos, seqnum);
+			for (GstPad *srcpad : state->srcpads_)
+				gst_pad_push_event(srcpad, gst_event_ref(eos));
+		} else if (ret != GST_FLOW_FLUSHING) {
+			GST_ELEMENT_FLOW_ERROR(self, ret);
+		}
+		gst_task_stop(self->task);
+		return;
+	}
+
+	/*
+	 * Here we need to decide if we want to pause. This needs to
+	 * happen in lock step with the callback thread which may want
+	 * to resume the task and might push pending buffers.
+	 */
+	bool do_pause;
+
 	{
-		if (ret != GST_FLOW_OK) {
-			if (ret == GST_FLOW_EOS) {
-				g_autoptr(GstEvent) eos = gst_event_new_eos();
-				guint32 seqnum = gst_util_seqnum_next();
-				gst_event_set_seqnum(eos, seqnum);
-				for (GstPad *srcpad : state->srcpads_)
-					gst_pad_push_event(srcpad, gst_event_ref(eos));
-			} else if (ret != GST_FLOW_FLUSHING) {
-				GST_ELEMENT_FLOW_ERROR(self, ret);
-			}
-			gst_task_stop(self->task);
-			return;
-		}
-
-		/*
-		 * Here we need to decide if we want to pause. This needs to
-		 * happen in lock step with the callback thread which may want
-		 * to resume the task and might push pending buffers.
-		 */
-		bool do_pause;
-
-		{
-			MutexLocker locker(state->lock_);
-			do_pause = state->completedRequests_.empty();
-		}
-
-		if (do_pause)
-			gst_task_pause(self->task);
+		MutexLocker locker(state->lock_);
+		do_pause = state->completedRequests_.empty();
 	}
+
+	if (do_pause)
+		gst_task_pause(self->task);
 }
 
 static void
@@ -532,8 +531,11 @@  gst_libcamera_src_task_leave([[maybe_unused]] GstTask *task,
 		state->completedRequests_ = {};
 	}
 
-	for (GstPad *srcpad : state->srcpads_)
-		gst_libcamera_pad_set_pool(srcpad, nullptr);
+	{
+		GLibRecLocker locker(&self->stream_lock);
+		for (GstPad *srcpad : state->srcpads_)
+			gst_libcamera_pad_set_pool(srcpad, nullptr);
+	}
 
 	g_clear_object(&self->allocator);
 	g_clear_pointer(&self->flow_combiner,
@@ -692,7 +694,7 @@  gst_libcamera_src_request_new_pad(GstElement *element, GstPadTemplate *templ,
 	g_object_ref_sink(pad);
 
 	if (gst_element_add_pad(element, pad)) {
-		GLibLocker lock(GST_OBJECT(self));
+		GLibRecLocker lock(&self->stream_lock);
 		self->state->srcpads_.push_back(reinterpret_cast<GstPad *>(g_object_ref(pad)));
 	} else {
 		GST_ELEMENT_ERROR(element, STREAM, FAILED,
@@ -712,7 +714,7 @@  gst_libcamera_src_release_pad(GstElement *element, GstPad *pad)
 	GST_DEBUG_OBJECT(self, "Pad %" GST_PTR_FORMAT " being released", pad);
 
 	{
-		GLibLocker lock(GST_OBJECT(self));
+		GLibRecLocker lock(&self->stream_lock);
 		std::vector<GstPad *> &pads = self->state->srcpads_;
 		auto begin_iterator = pads.begin();
 		auto end_iterator = pads.end();