[libcamera-devel,09/13] gstreamer: Use dedicated lock for request queues
diff mbox series

Message ID 20220623232210.18742-10-laurent.pinchart@ideasonboard.com
State Accepted
Headers show
Series
  • gstreamer: Queue multiple requests
Related show

Commit Message

Laurent Pinchart June 23, 2022, 11:22 p.m. UTC
Add a new lock to the GstLibcameraSrcState class to protect the queued
and completed requests queues. This replaces the GstObject lock, and
minimizes the lock contention between the request completion handler and
the task run handler as the former must run as fast as possible.

Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
---
 src/gstreamer/gstlibcamerasrc.cpp | 39 ++++++++++++++++++++++---------
 1 file changed, 28 insertions(+), 11 deletions(-)

Comments

Nicolas Dufresne June 28, 2022, 1:05 p.m. UTC | #1
Hi Laurent,

Le vendredi 24 juin 2022 à 02:22 +0300, Laurent Pinchart a écrit :
> Add a new lock to the GstLibcameraSrcState class to protect the queued
> and completed requests queues. This replaces the GstObject lock, and
> minimizes the lock contention between the request completion handler and
> the task run handler as the former must run as fast as possible.
> 
> Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
> ---
>  src/gstreamer/gstlibcamerasrc.cpp | 39 ++++++++++++++++++++++---------
>  1 file changed, 28 insertions(+), 11 deletions(-)
> 
> diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
> index e30d45fa2223..b85ba39fb808 100644
> --- a/src/gstreamer/gstlibcamerasrc.cpp
> +++ b/src/gstreamer/gstlibcamerasrc.cpp
> @@ -32,6 +32,8 @@
>  #include <queue>
>  #include <vector>
>  
> +#include <libcamera/base/mutex.h>
> +
>  #include <libcamera/camera.h>
>  #include <libcamera/camera_manager.h>
>  #include <libcamera/control_ids.h>
> @@ -111,8 +113,13 @@ struct GstLibcameraSrcState {
>  	std::shared_ptr<Camera> cam_;
>  	std::unique_ptr<CameraConfiguration> config_;
>  	std::vector<GstPad *> srcpads_;
> -	std::queue<std::unique_ptr<RequestWrap>> queuedRequests_;
> -	std::queue<std::unique_ptr<RequestWrap>> completedRequests_;
> +
> +	Mutex lock_;

I would love to see a comment explaining what this lock is for so that future
contributors knows when to take it, and when not.

Did you know that GLib Mutex are a lot faster then pthread_mutex ? Might sounds
surprising, but they don't obey all of the POSIX requirement, they remains a
perfect fit for streaming. I personally would have used these over a wrapper on
top of pthread, the change is fine though, I just thought of mentioning as you
already made pretty micro optimization removing few function call earlier in the
patchset. IIRC this is in the 20x level of improvement, at least on Linux.

> +	std::queue<std::unique_ptr<RequestWrap>> queuedRequests_
> +		LIBCAMERA_TSA_GUARDED_BY(lock_);
> +	std::queue<std::unique_ptr<RequestWrap>> completedRequests_
> +		LIBCAMERA_TSA_GUARDED_BY(lock_);

Of course, that would require extra effort to implement this "guarded by" thing.
I'm just mentioning, not action needed. There is also GstAtomicQueue if (and
only if) contention isn't expected.

> +
>  	guint group_id_;
>  
>  	void requestCompleted(Request *request);
> @@ -155,12 +162,15 @@ GstStaticPadTemplate request_src_template = {
>  void
>  GstLibcameraSrcState::requestCompleted(Request *request)
>  {
> -	GLibLocker lock(GST_OBJECT(src_));
> -
>  	GST_DEBUG_OBJECT(src_, "buffers are ready");
>  
> -	std::unique_ptr<RequestWrap> wrap = std::move(queuedRequests_.front());
> -	queuedRequests_.pop();
> +	std::unique_ptr<RequestWrap> wrap;
> +
> +	{
> +		MutexLocker locker(lock_);
> +		wrap = std::move(queuedRequests_.front());
> +		queuedRequests_.pop();
> +	}
>  
>  	g_return_if_fail(wrap->request_.get() == request);
>  
> @@ -183,7 +193,10 @@ GstLibcameraSrcState::requestCompleted(Request *request)
>  		wrap->latency_ = sys_now - timestamp;
>  	}
>  
> -	completedRequests_.push(std::move(wrap));
> +	{
> +		MutexLocker locker(lock_);
> +		completedRequests_.push(std::move(wrap));
> +	}
>  
>  	gst_task_resume(src_->task);
>  }
> @@ -289,16 +302,17 @@ gst_libcamera_src_task_run(gpointer user_data)
>  	}
>  
>  	if (wrap) {
> -		GLibLocker lock(GST_OBJECT(self));
>  		GST_TRACE_OBJECT(self, "Requesting buffers");
>  		state->cam_->queueRequest(wrap->request_.get());
> +
> +		MutexLocker locker(state->lock_);
>  		state->queuedRequests_.push(std::move(wrap));
>  
>  		/* The RequestWrap will be deleted in the completion handler. */
>  	}
>  
>  	{
> -		GLibLocker lock(GST_OBJECT(self));
> +		MutexLocker locker(state->lock_);
>  
>  		if (!state->completedRequests_.empty()) {
>  			wrap = std::move(state->completedRequests_.front());
> @@ -358,7 +372,7 @@ gst_libcamera_src_task_run(gpointer user_data)
>  		bool do_pause;
>  
>  		{
> -			GLibLocker lock(GST_OBJECT(self));
> +			MutexLocker locker(state->lock_);
>  			do_pause = state->completedRequests_.empty();
>  		}
>  
> @@ -513,7 +527,10 @@ gst_libcamera_src_task_leave([[maybe_unused]] GstTask *task,
>  
>  	state->cam_->stop();
>  
> -	state->completedRequests_ = {};
> +	{
> +		MutexLocker locker(state->lock_);
> +		state->completedRequests_ = {};
> +	}
>  
>  	for (GstPad *srcpad : state->srcpads_)
>  		gst_libcamera_pad_set_pool(srcpad, nullptr);

Does not solve concurrency between pads iteration and request/release pads, but
this wasn't the goal and was already broken.

Reviewed-by: Nicolas Dufresne <nicolas.dufresne@collabora.com>
Laurent Pinchart June 28, 2022, 1:45 p.m. UTC | #2
Hi Nicolas,

On Tue, Jun 28, 2022 at 09:05:52AM -0400, Nicolas Dufresne wrote:
> Le vendredi 24 juin 2022 à 02:22 +0300, Laurent Pinchart a écrit :
> > Add a new lock to the GstLibcameraSrcState class to protect the queued
> > and completed requests queues. This replaces the GstObject lock, and
> > minimizes the lock contention between the request completion handler and
> > the task run handler as the former must run as fast as possible.
> > 
> > Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
> > ---
> >  src/gstreamer/gstlibcamerasrc.cpp | 39 ++++++++++++++++++++++---------
> >  1 file changed, 28 insertions(+), 11 deletions(-)
> > 
> > diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
> > index e30d45fa2223..b85ba39fb808 100644
> > --- a/src/gstreamer/gstlibcamerasrc.cpp
> > +++ b/src/gstreamer/gstlibcamerasrc.cpp
> > @@ -32,6 +32,8 @@
> >  #include <queue>
> >  #include <vector>
> >  
> > +#include <libcamera/base/mutex.h>
> > +
> >  #include <libcamera/camera.h>
> >  #include <libcamera/camera_manager.h>
> >  #include <libcamera/control_ids.h>
> > @@ -111,8 +113,13 @@ struct GstLibcameraSrcState {
> >  	std::shared_ptr<Camera> cam_;
> >  	std::unique_ptr<CameraConfiguration> config_;
> >  	std::vector<GstPad *> srcpads_;
> > -	std::queue<std::unique_ptr<RequestWrap>> queuedRequests_;
> > -	std::queue<std::unique_ptr<RequestWrap>> completedRequests_;
> > +
> > +	Mutex lock_;
> 
> I would love to see a comment explaining what this lock is for so that future
> contributors knows when to take it, and when not.
> 
> Did you know that GLib Mutex are a lot faster then pthread_mutex ? Might sounds
> surprising, but they don't obey all of the POSIX requirement, they remains a
> perfect fit for streaming. I personally would have used these over a wrapper on
> top of pthread, the change is fine though, I just thought of mentioning as you
> already made pretty micro optimization removing few function call earlier in the
> patchset. IIRC this is in the 20x level of improvement, at least on Linux.
> 
> > +	std::queue<std::unique_ptr<RequestWrap>> queuedRequests_
> > +		LIBCAMERA_TSA_GUARDED_BY(lock_);
> > +	std::queue<std::unique_ptr<RequestWrap>> completedRequests_
> > +		LIBCAMERA_TSA_GUARDED_BY(lock_);
> 
> Of course, that would require extra effort to implement this "guarded by" thing.
> I'm just mentioning, not action needed. There is also GstAtomicQueue if (and
> only if) contention isn't expected.

The thread-safefy annotation is the reason why I picked the Mutex class,
as it addresses your comment about documenting the data protected by the
lock. It's also nice to get the compiler to validate the locking
patterns. It's not strictly mandatory though.

I wasn't aware of the differences in performance between the pthread
mutex implementation and the GLib implementation, and I don't know how
much difference it would make in practice here. Would you rather replace
Mutex + TSA with GLibMutex + a comment ? Or should this be done on top
when/if needed ?

> > +
> >  	guint group_id_;
> >  
> >  	void requestCompleted(Request *request);
> > @@ -155,12 +162,15 @@ GstStaticPadTemplate request_src_template = {
> >  void
> >  GstLibcameraSrcState::requestCompleted(Request *request)
> >  {
> > -	GLibLocker lock(GST_OBJECT(src_));
> > -
> >  	GST_DEBUG_OBJECT(src_, "buffers are ready");
> >  
> > -	std::unique_ptr<RequestWrap> wrap = std::move(queuedRequests_.front());
> > -	queuedRequests_.pop();
> > +	std::unique_ptr<RequestWrap> wrap;
> > +
> > +	{
> > +		MutexLocker locker(lock_);
> > +		wrap = std::move(queuedRequests_.front());
> > +		queuedRequests_.pop();
> > +	}
> >  
> >  	g_return_if_fail(wrap->request_.get() == request);
> >  
> > @@ -183,7 +193,10 @@ GstLibcameraSrcState::requestCompleted(Request *request)
> >  		wrap->latency_ = sys_now - timestamp;
> >  	}
> >  
> > -	completedRequests_.push(std::move(wrap));
> > +	{
> > +		MutexLocker locker(lock_);
> > +		completedRequests_.push(std::move(wrap));
> > +	}
> >  
> >  	gst_task_resume(src_->task);
> >  }
> > @@ -289,16 +302,17 @@ gst_libcamera_src_task_run(gpointer user_data)
> >  	}
> >  
> >  	if (wrap) {
> > -		GLibLocker lock(GST_OBJECT(self));
> >  		GST_TRACE_OBJECT(self, "Requesting buffers");
> >  		state->cam_->queueRequest(wrap->request_.get());
> > +
> > +		MutexLocker locker(state->lock_);
> >  		state->queuedRequests_.push(std::move(wrap));
> >  
> >  		/* The RequestWrap will be deleted in the completion handler. */
> >  	}
> >  
> >  	{
> > -		GLibLocker lock(GST_OBJECT(self));
> > +		MutexLocker locker(state->lock_);
> >  
> >  		if (!state->completedRequests_.empty()) {
> >  			wrap = std::move(state->completedRequests_.front());
> > @@ -358,7 +372,7 @@ gst_libcamera_src_task_run(gpointer user_data)
> >  		bool do_pause;
> >  
> >  		{
> > -			GLibLocker lock(GST_OBJECT(self));
> > +			MutexLocker locker(state->lock_);
> >  			do_pause = state->completedRequests_.empty();
> >  		}
> >  
> > @@ -513,7 +527,10 @@ gst_libcamera_src_task_leave([[maybe_unused]] GstTask *task,
> >  
> >  	state->cam_->stop();
> >  
> > -	state->completedRequests_ = {};
> > +	{
> > +		MutexLocker locker(state->lock_);
> > +		state->completedRequests_ = {};
> > +	}
> >  
> >  	for (GstPad *srcpad : state->srcpads_)
> >  		gst_libcamera_pad_set_pool(srcpad, nullptr);
> 
> Does not solve concurrency between pads iteration and request/release pads, but
> this wasn't the goal and was already broken.

Hopefully the next patches do :-)

> Reviewed-by: Nicolas Dufresne <nicolas.dufresne@collabora.com>
Nicolas Dufresne June 28, 2022, 1:53 p.m. UTC | #3
Le mardi 28 juin 2022 à 16:45 +0300, Laurent Pinchart a écrit :
> Hi Nicolas,
> 
> On Tue, Jun 28, 2022 at 09:05:52AM -0400, Nicolas Dufresne wrote:
> > Le vendredi 24 juin 2022 à 02:22 +0300, Laurent Pinchart a écrit :
> > > Add a new lock to the GstLibcameraSrcState class to protect the queued
> > > and completed requests queues. This replaces the GstObject lock, and
> > > minimizes the lock contention between the request completion handler and
> > > the task run handler as the former must run as fast as possible.
> > > 
> > > Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
> > > ---
> > >  src/gstreamer/gstlibcamerasrc.cpp | 39 ++++++++++++++++++++++---------
> > >  1 file changed, 28 insertions(+), 11 deletions(-)
> > > 
> > > diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
> > > index e30d45fa2223..b85ba39fb808 100644
> > > --- a/src/gstreamer/gstlibcamerasrc.cpp
> > > +++ b/src/gstreamer/gstlibcamerasrc.cpp
> > > @@ -32,6 +32,8 @@
> > >  #include <queue>
> > >  #include <vector>
> > >  
> > > +#include <libcamera/base/mutex.h>
> > > +
> > >  #include <libcamera/camera.h>
> > >  #include <libcamera/camera_manager.h>
> > >  #include <libcamera/control_ids.h>
> > > @@ -111,8 +113,13 @@ struct GstLibcameraSrcState {
> > >  	std::shared_ptr<Camera> cam_;
> > >  	std::unique_ptr<CameraConfiguration> config_;
> > >  	std::vector<GstPad *> srcpads_;
> > > -	std::queue<std::unique_ptr<RequestWrap>> queuedRequests_;
> > > -	std::queue<std::unique_ptr<RequestWrap>> completedRequests_;
> > > +
> > > +	Mutex lock_;
> > 
> > I would love to see a comment explaining what this lock is for so that future
> > contributors knows when to take it, and when not.
> > 
> > Did you know that GLib Mutex are a lot faster then pthread_mutex ? Might sounds
> > surprising, but they don't obey all of the POSIX requirement, they remains a
> > perfect fit for streaming. I personally would have used these over a wrapper on
> > top of pthread, the change is fine though, I just thought of mentioning as you
> > already made pretty micro optimization removing few function call earlier in the
> > patchset. IIRC this is in the 20x level of improvement, at least on Linux.
> > 
> > > +	std::queue<std::unique_ptr<RequestWrap>> queuedRequests_
> > > +		LIBCAMERA_TSA_GUARDED_BY(lock_);
> > > +	std::queue<std::unique_ptr<RequestWrap>> completedRequests_
> > > +		LIBCAMERA_TSA_GUARDED_BY(lock_);
> > 
> > Of course, that would require extra effort to implement this "guarded by" thing.
> > I'm just mentioning, not action needed. There is also GstAtomicQueue if (and
> > only if) contention isn't expected.
> 
> The thread-safefy annotation is the reason why I picked the Mutex class,
> as it addresses your comment about documenting the data protected by the
> lock. It's also nice to get the compiler to validate the locking
> patterns. It's not strictly mandatory though.
> 
> I wasn't aware of the differences in performance between the pthread
> mutex implementation and the GLib implementation, and I don't know how
> much difference it would make in practice here. Would you rather replace
> Mutex + TSA with GLibMutex + a comment ? Or should this be done on top
> when/if needed ?

Only if needed. It does matter for lower latency audio, but for video which is
pretty low rate it will rarely make a big difference. This comment was just to
keep you inform that GMutex is not a pthread_mutex_t (and not POSIX compliant,
which is the reason pthread under perform). It might matter when we start
handling frame burst and highly concurrent streaming (e.g. 100 or more streams).

> 
> > > +
> > >  	guint group_id_;
> > >  
> > >  	void requestCompleted(Request *request);
> > > @@ -155,12 +162,15 @@ GstStaticPadTemplate request_src_template = {
> > >  void
> > >  GstLibcameraSrcState::requestCompleted(Request *request)
> > >  {
> > > -	GLibLocker lock(GST_OBJECT(src_));
> > > -
> > >  	GST_DEBUG_OBJECT(src_, "buffers are ready");
> > >  
> > > -	std::unique_ptr<RequestWrap> wrap = std::move(queuedRequests_.front());
> > > -	queuedRequests_.pop();
> > > +	std::unique_ptr<RequestWrap> wrap;
> > > +
> > > +	{
> > > +		MutexLocker locker(lock_);
> > > +		wrap = std::move(queuedRequests_.front());
> > > +		queuedRequests_.pop();
> > > +	}
> > >  
> > >  	g_return_if_fail(wrap->request_.get() == request);
> > >  
> > > @@ -183,7 +193,10 @@ GstLibcameraSrcState::requestCompleted(Request *request)
> > >  		wrap->latency_ = sys_now - timestamp;
> > >  	}
> > >  
> > > -	completedRequests_.push(std::move(wrap));
> > > +	{
> > > +		MutexLocker locker(lock_);
> > > +		completedRequests_.push(std::move(wrap));
> > > +	}
> > >  
> > >  	gst_task_resume(src_->task);
> > >  }
> > > @@ -289,16 +302,17 @@ gst_libcamera_src_task_run(gpointer user_data)
> > >  	}
> > >  
> > >  	if (wrap) {
> > > -		GLibLocker lock(GST_OBJECT(self));
> > >  		GST_TRACE_OBJECT(self, "Requesting buffers");
> > >  		state->cam_->queueRequest(wrap->request_.get());
> > > +
> > > +		MutexLocker locker(state->lock_);
> > >  		state->queuedRequests_.push(std::move(wrap));
> > >  
> > >  		/* The RequestWrap will be deleted in the completion handler. */
> > >  	}
> > >  
> > >  	{
> > > -		GLibLocker lock(GST_OBJECT(self));
> > > +		MutexLocker locker(state->lock_);
> > >  
> > >  		if (!state->completedRequests_.empty()) {
> > >  			wrap = std::move(state->completedRequests_.front());
> > > @@ -358,7 +372,7 @@ gst_libcamera_src_task_run(gpointer user_data)
> > >  		bool do_pause;
> > >  
> > >  		{
> > > -			GLibLocker lock(GST_OBJECT(self));
> > > +			MutexLocker locker(state->lock_);
> > >  			do_pause = state->completedRequests_.empty();
> > >  		}
> > >  
> > > @@ -513,7 +527,10 @@ gst_libcamera_src_task_leave([[maybe_unused]] GstTask *task,
> > >  
> > >  	state->cam_->stop();
> > >  
> > > -	state->completedRequests_ = {};
> > > +	{
> > > +		MutexLocker locker(state->lock_);
> > > +		state->completedRequests_ = {};
> > > +	}
> > >  
> > >  	for (GstPad *srcpad : state->srcpads_)
> > >  		gst_libcamera_pad_set_pool(srcpad, nullptr);
> > 
> > Does not solve concurrency between pads iteration and request/release pads, but
> > this wasn't the goal and was already broken.
> 
> Hopefully the next patches do :-)
> 
> > Reviewed-by: Nicolas Dufresne <nicolas.dufresne@collabora.com>
>

Patch
diff mbox series

diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
index e30d45fa2223..b85ba39fb808 100644
--- a/src/gstreamer/gstlibcamerasrc.cpp
+++ b/src/gstreamer/gstlibcamerasrc.cpp
@@ -32,6 +32,8 @@ 
 #include <queue>
 #include <vector>
 
+#include <libcamera/base/mutex.h>
+
 #include <libcamera/camera.h>
 #include <libcamera/camera_manager.h>
 #include <libcamera/control_ids.h>
@@ -111,8 +113,13 @@  struct GstLibcameraSrcState {
 	std::shared_ptr<Camera> cam_;
 	std::unique_ptr<CameraConfiguration> config_;
 	std::vector<GstPad *> srcpads_;
-	std::queue<std::unique_ptr<RequestWrap>> queuedRequests_;
-	std::queue<std::unique_ptr<RequestWrap>> completedRequests_;
+
+	Mutex lock_;
+	std::queue<std::unique_ptr<RequestWrap>> queuedRequests_
+		LIBCAMERA_TSA_GUARDED_BY(lock_);
+	std::queue<std::unique_ptr<RequestWrap>> completedRequests_
+		LIBCAMERA_TSA_GUARDED_BY(lock_);
+
 	guint group_id_;
 
 	void requestCompleted(Request *request);
@@ -155,12 +162,15 @@  GstStaticPadTemplate request_src_template = {
 void
 GstLibcameraSrcState::requestCompleted(Request *request)
 {
-	GLibLocker lock(GST_OBJECT(src_));
-
 	GST_DEBUG_OBJECT(src_, "buffers are ready");
 
-	std::unique_ptr<RequestWrap> wrap = std::move(queuedRequests_.front());
-	queuedRequests_.pop();
+	std::unique_ptr<RequestWrap> wrap;
+
+	{
+		MutexLocker locker(lock_);
+		wrap = std::move(queuedRequests_.front());
+		queuedRequests_.pop();
+	}
 
 	g_return_if_fail(wrap->request_.get() == request);
 
@@ -183,7 +193,10 @@  GstLibcameraSrcState::requestCompleted(Request *request)
 		wrap->latency_ = sys_now - timestamp;
 	}
 
-	completedRequests_.push(std::move(wrap));
+	{
+		MutexLocker locker(lock_);
+		completedRequests_.push(std::move(wrap));
+	}
 
 	gst_task_resume(src_->task);
 }
@@ -289,16 +302,17 @@  gst_libcamera_src_task_run(gpointer user_data)
 	}
 
 	if (wrap) {
-		GLibLocker lock(GST_OBJECT(self));
 		GST_TRACE_OBJECT(self, "Requesting buffers");
 		state->cam_->queueRequest(wrap->request_.get());
+
+		MutexLocker locker(state->lock_);
 		state->queuedRequests_.push(std::move(wrap));
 
 		/* The RequestWrap will be deleted in the completion handler. */
 	}
 
 	{
-		GLibLocker lock(GST_OBJECT(self));
+		MutexLocker locker(state->lock_);
 
 		if (!state->completedRequests_.empty()) {
 			wrap = std::move(state->completedRequests_.front());
@@ -358,7 +372,7 @@  gst_libcamera_src_task_run(gpointer user_data)
 		bool do_pause;
 
 		{
-			GLibLocker lock(GST_OBJECT(self));
+			MutexLocker locker(state->lock_);
 			do_pause = state->completedRequests_.empty();
 		}
 
@@ -513,7 +527,10 @@  gst_libcamera_src_task_leave([[maybe_unused]] GstTask *task,
 
 	state->cam_->stop();
 
-	state->completedRequests_ = {};
+	{
+		MutexLocker locker(state->lock_);
+		state->completedRequests_ = {};
+	}
 
 	for (GstPad *srcpad : state->srcpads_)
 		gst_libcamera_pad_set_pool(srcpad, nullptr);