[libcamera-devel,v2,09/12] gstreamer: Fix pads locking
diff mbox series

Message ID 20220630000251.31295-10-laurent.pinchart@ideasonboard.com
State Accepted
Headers show
Series
  • gstreamer: Queue multiple requests
Related show

Commit Message

Laurent Pinchart June 30, 2022, 12:02 a.m. UTC
The srcpads_ vector is protected by two different locks, the GstObject
lock of the libcamerasrc element, and the stream_lock that covers the
run function of the thread. This isn't correct. Use the stream_lock
consistently to protect the pads.

Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
Reviewed-by: Nicolas Dufresne <nicolas.dufresne@collabora.com>
---
Changes since v1:

- Add a comment about lock ordering
---
 src/gstreamer/gstlibcamerasrc.cpp | 73 +++++++++++++++++--------------
 1 file changed, 40 insertions(+), 33 deletions(-)

Comments

Umang Jain June 30, 2022, 9:08 a.m. UTC | #1
Hi Laurent,

Thank you for the patch.

On 6/30/22 05:32, Laurent Pinchart via libcamera-devel wrote:
> The srcpads_ vector is protected by two different locks, the GstObject
> lock of the libcamerasrc element, and the stream_lock that covers the
> run function of the thread. This isn't correct. Use the stream_lock
> consistently to protect the pads.
>
> Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
> Reviewed-by: Nicolas Dufresne <nicolas.dufresne@collabora.com>


Reviewed-by: Umang Jain <umang.jain@ideasonboard.com>

> ---
> Changes since v1:
>
> - Add a comment about lock ordering
> ---
>   src/gstreamer/gstlibcamerasrc.cpp | 73 +++++++++++++++++--------------
>   1 file changed, 40 insertions(+), 33 deletions(-)
>
> diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
> index 6f9a03c515d2..c92ca7d29fe6 100644
> --- a/src/gstreamer/gstlibcamerasrc.cpp
> +++ b/src/gstreamer/gstlibcamerasrc.cpp
> @@ -112,12 +112,18 @@ struct GstLibcameraSrcState {
>   	std::shared_ptr<CameraManager> cm_;
>   	std::shared_ptr<Camera> cam_;
>   	std::unique_ptr<CameraConfiguration> config_;
> -	std::vector<GstPad *> srcpads_;
> +
> +	std::vector<GstPad *> srcpads_; /* Protected by stream_lock */
>   
>   	/*
>   	 * Contention on this lock_ must be minimized, as it has to be taken in
>   	 * the realtime-sensitive requestCompleted() handler to protect
>   	 * queuedRequests_ and completedRequests_.
> +	 *
> +	 * stream_lock must be taken before lock_ in contexts where both locks
> +	 * need to be taken. In particular, this means that the lock_ must not
> +	 * be held while calling into other graph elements (e.g. when calling
> +	 * gst_pad_query()).
>   	 */
>   	Mutex lock_;
>   	std::queue<std::unique_ptr<RequestWrap>> queuedRequests_
> @@ -354,36 +360,34 @@ gst_libcamera_src_task_run(gpointer user_data)
>   							srcpad, ret);
>   	}
>   
> +	if (ret != GST_FLOW_OK) {
> +		if (ret == GST_FLOW_EOS) {
> +			g_autoptr(GstEvent) eos = gst_event_new_eos();
> +			guint32 seqnum = gst_util_seqnum_next();
> +			gst_event_set_seqnum(eos, seqnum);
> +			for (GstPad *srcpad : state->srcpads_)
> +				gst_pad_push_event(srcpad, gst_event_ref(eos));
> +		} else if (ret != GST_FLOW_FLUSHING) {
> +			GST_ELEMENT_FLOW_ERROR(self, ret);
> +		}
> +		gst_task_stop(self->task);
> +		return;
> +	}
> +
> +	/*
> +	 * Here we need to decide if we want to pause. This needs to
> +	 * happen in lock step with the callback thread which may want
> +	 * to resume the task and might push pending buffers.
> +	 */
> +	bool do_pause;
> +
>   	{
> -		if (ret != GST_FLOW_OK) {
> -			if (ret == GST_FLOW_EOS) {
> -				g_autoptr(GstEvent) eos = gst_event_new_eos();
> -				guint32 seqnum = gst_util_seqnum_next();
> -				gst_event_set_seqnum(eos, seqnum);
> -				for (GstPad *srcpad : state->srcpads_)
> -					gst_pad_push_event(srcpad, gst_event_ref(eos));
> -			} else if (ret != GST_FLOW_FLUSHING) {
> -				GST_ELEMENT_FLOW_ERROR(self, ret);
> -			}
> -			gst_task_stop(self->task);
> -			return;
> -		}
> -
> -		/*
> -		 * Here we need to decide if we want to pause. This needs to
> -		 * happen in lock step with the callback thread which may want
> -		 * to resume the task and might push pending buffers.
> -		 */
> -		bool do_pause;
> -
> -		{
> -			MutexLocker locker(state->lock_);
> -			do_pause = state->completedRequests_.empty();
> -		}
> -
> -		if (do_pause)
> -			gst_task_pause(self->task);
> +		MutexLocker locker(state->lock_);
> +		do_pause = state->completedRequests_.empty();
>   	}
> +
> +	if (do_pause)
> +		gst_task_pause(self->task);
>   }
>   
>   static void
> @@ -537,8 +541,11 @@ gst_libcamera_src_task_leave([[maybe_unused]] GstTask *task,
>   		state->completedRequests_ = {};
>   	}
>   
> -	for (GstPad *srcpad : state->srcpads_)
> -		gst_libcamera_pad_set_pool(srcpad, nullptr);
> +	{
> +		GLibRecLocker locker(&self->stream_lock);
> +		for (GstPad *srcpad : state->srcpads_)
> +			gst_libcamera_pad_set_pool(srcpad, nullptr);
> +	}
>   
>   	g_clear_object(&self->allocator);
>   	g_clear_pointer(&self->flow_combiner,
> @@ -697,7 +704,7 @@ gst_libcamera_src_request_new_pad(GstElement *element, GstPadTemplate *templ,
>   	g_object_ref_sink(pad);
>   
>   	if (gst_element_add_pad(element, pad)) {
> -		GLibLocker lock(GST_OBJECT(self));
> +		GLibRecLocker lock(&self->stream_lock);
>   		self->state->srcpads_.push_back(reinterpret_cast<GstPad *>(g_object_ref(pad)));
>   	} else {
>   		GST_ELEMENT_ERROR(element, STREAM, FAILED,
> @@ -717,7 +724,7 @@ gst_libcamera_src_release_pad(GstElement *element, GstPad *pad)
>   	GST_DEBUG_OBJECT(self, "Pad %" GST_PTR_FORMAT " being released", pad);
>   
>   	{
> -		GLibLocker lock(GST_OBJECT(self));
> +		GLibRecLocker lock(&self->stream_lock);
>   		std::vector<GstPad *> &pads = self->state->srcpads_;
>   		auto begin_iterator = pads.begin();
>   		auto end_iterator = pads.end();

Patch
diff mbox series

diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
index 6f9a03c515d2..c92ca7d29fe6 100644
--- a/src/gstreamer/gstlibcamerasrc.cpp
+++ b/src/gstreamer/gstlibcamerasrc.cpp
@@ -112,12 +112,18 @@  struct GstLibcameraSrcState {
 	std::shared_ptr<CameraManager> cm_;
 	std::shared_ptr<Camera> cam_;
 	std::unique_ptr<CameraConfiguration> config_;
-	std::vector<GstPad *> srcpads_;
+
+	std::vector<GstPad *> srcpads_; /* Protected by stream_lock */
 
 	/*
 	 * Contention on this lock_ must be minimized, as it has to be taken in
 	 * the realtime-sensitive requestCompleted() handler to protect
 	 * queuedRequests_ and completedRequests_.
+	 *
+	 * stream_lock must be taken before lock_ in contexts where both locks
+	 * need to be taken. In particular, this means that the lock_ must not
+	 * be held while calling into other graph elements (e.g. when calling
+	 * gst_pad_query()).
 	 */
 	Mutex lock_;
 	std::queue<std::unique_ptr<RequestWrap>> queuedRequests_
@@ -354,36 +360,34 @@  gst_libcamera_src_task_run(gpointer user_data)
 							srcpad, ret);
 	}
 
+	if (ret != GST_FLOW_OK) {
+		if (ret == GST_FLOW_EOS) {
+			g_autoptr(GstEvent) eos = gst_event_new_eos();
+			guint32 seqnum = gst_util_seqnum_next();
+			gst_event_set_seqnum(eos, seqnum);
+			for (GstPad *srcpad : state->srcpads_)
+				gst_pad_push_event(srcpad, gst_event_ref(eos));
+		} else if (ret != GST_FLOW_FLUSHING) {
+			GST_ELEMENT_FLOW_ERROR(self, ret);
+		}
+		gst_task_stop(self->task);
+		return;
+	}
+
+	/*
+	 * Here we need to decide if we want to pause. This needs to
+	 * happen in lock step with the callback thread which may want
+	 * to resume the task and might push pending buffers.
+	 */
+	bool do_pause;
+
 	{
-		if (ret != GST_FLOW_OK) {
-			if (ret == GST_FLOW_EOS) {
-				g_autoptr(GstEvent) eos = gst_event_new_eos();
-				guint32 seqnum = gst_util_seqnum_next();
-				gst_event_set_seqnum(eos, seqnum);
-				for (GstPad *srcpad : state->srcpads_)
-					gst_pad_push_event(srcpad, gst_event_ref(eos));
-			} else if (ret != GST_FLOW_FLUSHING) {
-				GST_ELEMENT_FLOW_ERROR(self, ret);
-			}
-			gst_task_stop(self->task);
-			return;
-		}
-
-		/*
-		 * Here we need to decide if we want to pause. This needs to
-		 * happen in lock step with the callback thread which may want
-		 * to resume the task and might push pending buffers.
-		 */
-		bool do_pause;
-
-		{
-			MutexLocker locker(state->lock_);
-			do_pause = state->completedRequests_.empty();
-		}
-
-		if (do_pause)
-			gst_task_pause(self->task);
+		MutexLocker locker(state->lock_);
+		do_pause = state->completedRequests_.empty();
 	}
+
+	if (do_pause)
+		gst_task_pause(self->task);
 }
 
 static void
@@ -537,8 +541,11 @@  gst_libcamera_src_task_leave([[maybe_unused]] GstTask *task,
 		state->completedRequests_ = {};
 	}
 
-	for (GstPad *srcpad : state->srcpads_)
-		gst_libcamera_pad_set_pool(srcpad, nullptr);
+	{
+		GLibRecLocker locker(&self->stream_lock);
+		for (GstPad *srcpad : state->srcpads_)
+			gst_libcamera_pad_set_pool(srcpad, nullptr);
+	}
 
 	g_clear_object(&self->allocator);
 	g_clear_pointer(&self->flow_combiner,
@@ -697,7 +704,7 @@  gst_libcamera_src_request_new_pad(GstElement *element, GstPadTemplate *templ,
 	g_object_ref_sink(pad);
 
 	if (gst_element_add_pad(element, pad)) {
-		GLibLocker lock(GST_OBJECT(self));
+		GLibRecLocker lock(&self->stream_lock);
 		self->state->srcpads_.push_back(reinterpret_cast<GstPad *>(g_object_ref(pad)));
 	} else {
 		GST_ELEMENT_ERROR(element, STREAM, FAILED,
@@ -717,7 +724,7 @@  gst_libcamera_src_release_pad(GstElement *element, GstPad *pad)
 	GST_DEBUG_OBJECT(self, "Pad %" GST_PTR_FORMAT " being released", pad);
 
 	{
-		GLibLocker lock(GST_OBJECT(self));
+		GLibRecLocker lock(&self->stream_lock);
 		std::vector<GstPad *> &pads = self->state->srcpads_;
 		auto begin_iterator = pads.begin();
 		auto end_iterator = pads.end();