[libcamera-devel,v2,11/12] gstreamer: Split completed request processing to a separate function
diff mbox series

Message ID 20220630000251.31295-12-laurent.pinchart@ideasonboard.com
State Accepted
Headers show
Series
  • gstreamer: Queue multiple requests
Related show

Commit Message

Laurent Pinchart June 30, 2022, 12:02 a.m. UTC
Simplify the task run function further by moving the processing of
completed requests to a separate function. No functional change
intended, only increased readability.

Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
Reviewed-by: Nicolas Dufresne <nicolas.dufresne@collabora.com>
---
Changes since v1:

- Add comment about locking requirement
---
 src/gstreamer/gstlibcamerasrc.cpp | 126 ++++++++++++++++++------------
 1 file changed, 74 insertions(+), 52 deletions(-)

Comments

Umang Jain June 30, 2022, 9:14 a.m. UTC | #1
Hi Laurent,

On 6/30/22 05:32, Laurent Pinchart via libcamera-devel wrote:
> Simplify the task run function further by moving the processing of
> completed requests to a separate function. No functional change
> intended, only increased readability.
>
> Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
> Reviewed-by: Nicolas Dufresne <nicolas.dufresne@collabora.com>


Reviewed-by: Umang Jain <umang.jain@ideasonboard.com>

> ---
> Changes since v1:
>
> - Add comment about locking requirement
> ---
>   src/gstreamer/gstlibcamerasrc.cpp | 126 ++++++++++++++++++------------
>   1 file changed, 74 insertions(+), 52 deletions(-)
>
> diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
> index d63083d0cd8f..9ea59631a9f2 100644
> --- a/src/gstreamer/gstlibcamerasrc.cpp
> +++ b/src/gstreamer/gstlibcamerasrc.cpp
> @@ -135,6 +135,7 @@ struct GstLibcameraSrcState {
>   
>   	int queueRequest();
>   	void requestCompleted(Request *request);
> +	int processRequest();
>   };
>   
>   struct _GstLibcameraSrc {
> @@ -254,6 +255,64 @@ GstLibcameraSrcState::requestCompleted(Request *request)
>   	gst_task_resume(src_->task);
>   }
>   
> +/* Must be called with stream_lock held. */
> +int GstLibcameraSrcState::processRequest()
> +{
> +	std::unique_ptr<RequestWrap> wrap;
> +
> +	{
> +		MutexLocker locker(lock_);
> +
> +		if (!completedRequests_.empty()) {
> +			wrap = std::move(completedRequests_.front());
> +			completedRequests_.pop();
> +		}
> +	}
> +
> +	if (!wrap)
> +		return -ENODATA;
> +
> +	GstFlowReturn ret = GST_FLOW_OK;
> +	gst_flow_combiner_reset(src_->flow_combiner);
> +
> +	for (GstPad *srcpad : srcpads_) {
> +		Stream *stream = gst_libcamera_pad_get_stream(srcpad);
> +		GstBuffer *buffer = wrap->detachBuffer(stream);
> +
> +		FrameBuffer *fb = gst_libcamera_buffer_get_frame_buffer(buffer);
> +
> +		if (GST_CLOCK_TIME_IS_VALID(wrap->pts_)) {
> +			GST_BUFFER_PTS(buffer) = wrap->pts_;
> +			gst_libcamera_pad_set_latency(srcpad, wrap->latency_);
> +		} else {
> +			GST_BUFFER_PTS(buffer) = 0;
> +		}
> +
> +		GST_BUFFER_OFFSET(buffer) = fb->metadata().sequence;
> +		GST_BUFFER_OFFSET_END(buffer) = fb->metadata().sequence;
> +
> +		ret = gst_pad_push(srcpad, buffer);
> +		ret = gst_flow_combiner_update_pad_flow(src_->flow_combiner,
> +							srcpad, ret);
> +	}
> +
> +	if (ret != GST_FLOW_OK) {
> +		if (ret == GST_FLOW_EOS) {
> +			g_autoptr(GstEvent) eos = gst_event_new_eos();
> +			guint32 seqnum = gst_util_seqnum_next();
> +			gst_event_set_seqnum(eos, seqnum);
> +			for (GstPad *srcpad : srcpads_)
> +				gst_pad_push_event(srcpad, gst_event_ref(eos));
> +		} else if (ret != GST_FLOW_FLUSHING) {
> +			GST_ELEMENT_FLOW_ERROR(src_, ret);
> +		}
> +
> +		return -EPIPE;
> +	}
> +
> +	return 0;
> +}
> +
>   static bool
>   gst_libcamera_src_open(GstLibcameraSrc *self)
>   {
> @@ -321,8 +380,13 @@ gst_libcamera_src_task_run(gpointer user_data)
>   	GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
>   	GstLibcameraSrcState *state = self->state;
>   
> -	int err = state->queueRequest();
> -	if (err == -ENOMEM) {
> +	/*
> +	 * Create and queue one request. If no buffers are available the
> +	 * function returns -ENOBUFS, which we ignore here as that's not a
> +	 * fatal error.
> +	 */
> +	int ret = state->queueRequest();
> +	if (ret == -ENOMEM) {
>   		GST_ELEMENT_ERROR(self, RESOURCE, NO_SPACE_LEFT,
>   				  ("Failed to allocate request for camera '%s'.",
>   				   state->cam_->id().c_str()),
> @@ -331,58 +395,16 @@ gst_libcamera_src_task_run(gpointer user_data)
>   		return;
>   	}
>   
> -	std::unique_ptr<RequestWrap> wrap;
> -
> -	{
> -		MutexLocker locker(state->lock_);
> -
> -		if (!state->completedRequests_.empty()) {
> -			wrap = std::move(state->completedRequests_.front());
> -			state->completedRequests_.pop();
> -		}
> -	}
> -
> -	if (!wrap) {
> -		gst_task_pause(self->task);
> -		return;
> -	}
> -
> -	GstFlowReturn ret = GST_FLOW_OK;
> -	gst_flow_combiner_reset(self->flow_combiner);
> -
> -	for (GstPad *srcpad : state->srcpads_) {
> -		Stream *stream = gst_libcamera_pad_get_stream(srcpad);
> -		GstBuffer *buffer = wrap->detachBuffer(stream);
> -
> -		FrameBuffer *fb = gst_libcamera_buffer_get_frame_buffer(buffer);
> -
> -		if (GST_CLOCK_TIME_IS_VALID(wrap->pts_)) {
> -			GST_BUFFER_PTS(buffer) = wrap->pts_;
> -			gst_libcamera_pad_set_latency(srcpad, wrap->latency_);
> -		} else {
> -			GST_BUFFER_PTS(buffer) = 0;
> -		}
> -
> -		GST_BUFFER_OFFSET(buffer) = fb->metadata().sequence;
> -		GST_BUFFER_OFFSET_END(buffer) = fb->metadata().sequence;
> -
> -		ret = gst_pad_push(srcpad, buffer);
> -		ret = gst_flow_combiner_update_pad_flow(self->flow_combiner,
> -							srcpad, ret);
> -	}
> -
> -	if (ret != GST_FLOW_OK) {
> -		if (ret == GST_FLOW_EOS) {
> -			g_autoptr(GstEvent) eos = gst_event_new_eos();
> -			guint32 seqnum = gst_util_seqnum_next();
> -			gst_event_set_seqnum(eos, seqnum);
> -			for (GstPad *srcpad : state->srcpads_)
> -				gst_pad_push_event(srcpad, gst_event_ref(eos));
> -		} else if (ret != GST_FLOW_FLUSHING) {
> -			GST_ELEMENT_FLOW_ERROR(self, ret);
> -		}
> +	/* Process one completed request, if available. */
> +	ret = state->processRequest();
> +	switch (ret) {
> +	case -EPIPE:
>   		gst_task_stop(self->task);
>   		return;
> +
> +	case -ENODATA:
> +		gst_task_pause(self->task);
> +		return;
>   	}
>   
>   	/*

Patch
diff mbox series

diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
index d63083d0cd8f..9ea59631a9f2 100644
--- a/src/gstreamer/gstlibcamerasrc.cpp
+++ b/src/gstreamer/gstlibcamerasrc.cpp
@@ -135,6 +135,7 @@  struct GstLibcameraSrcState {
 
 	int queueRequest();
 	void requestCompleted(Request *request);
+	int processRequest();
 };
 
 struct _GstLibcameraSrc {
@@ -254,6 +255,64 @@  GstLibcameraSrcState::requestCompleted(Request *request)
 	gst_task_resume(src_->task);
 }
 
+/* Must be called with stream_lock held. */
+int GstLibcameraSrcState::processRequest()
+{
+	std::unique_ptr<RequestWrap> wrap;
+
+	{
+		MutexLocker locker(lock_);
+
+		if (!completedRequests_.empty()) {
+			wrap = std::move(completedRequests_.front());
+			completedRequests_.pop();
+		}
+	}
+
+	if (!wrap)
+		return -ENODATA;
+
+	GstFlowReturn ret = GST_FLOW_OK;
+	gst_flow_combiner_reset(src_->flow_combiner);
+
+	for (GstPad *srcpad : srcpads_) {
+		Stream *stream = gst_libcamera_pad_get_stream(srcpad);
+		GstBuffer *buffer = wrap->detachBuffer(stream);
+
+		FrameBuffer *fb = gst_libcamera_buffer_get_frame_buffer(buffer);
+
+		if (GST_CLOCK_TIME_IS_VALID(wrap->pts_)) {
+			GST_BUFFER_PTS(buffer) = wrap->pts_;
+			gst_libcamera_pad_set_latency(srcpad, wrap->latency_);
+		} else {
+			GST_BUFFER_PTS(buffer) = 0;
+		}
+
+		GST_BUFFER_OFFSET(buffer) = fb->metadata().sequence;
+		GST_BUFFER_OFFSET_END(buffer) = fb->metadata().sequence;
+
+		ret = gst_pad_push(srcpad, buffer);
+		ret = gst_flow_combiner_update_pad_flow(src_->flow_combiner,
+							srcpad, ret);
+	}
+
+	if (ret != GST_FLOW_OK) {
+		if (ret == GST_FLOW_EOS) {
+			g_autoptr(GstEvent) eos = gst_event_new_eos();
+			guint32 seqnum = gst_util_seqnum_next();
+			gst_event_set_seqnum(eos, seqnum);
+			for (GstPad *srcpad : srcpads_)
+				gst_pad_push_event(srcpad, gst_event_ref(eos));
+		} else if (ret != GST_FLOW_FLUSHING) {
+			GST_ELEMENT_FLOW_ERROR(src_, ret);
+		}
+
+		return -EPIPE;
+	}
+
+	return 0;
+}
+
 static bool
 gst_libcamera_src_open(GstLibcameraSrc *self)
 {
@@ -321,8 +380,13 @@  gst_libcamera_src_task_run(gpointer user_data)
 	GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
 	GstLibcameraSrcState *state = self->state;
 
-	int err = state->queueRequest();
-	if (err == -ENOMEM) {
+	/*
+	 * Create and queue one request. If no buffers are available the
+	 * function returns -ENOBUFS, which we ignore here as that's not a
+	 * fatal error.
+	 */
+	int ret = state->queueRequest();
+	if (ret == -ENOMEM) {
 		GST_ELEMENT_ERROR(self, RESOURCE, NO_SPACE_LEFT,
 				  ("Failed to allocate request for camera '%s'.",
 				   state->cam_->id().c_str()),
@@ -331,58 +395,16 @@  gst_libcamera_src_task_run(gpointer user_data)
 		return;
 	}
 
-	std::unique_ptr<RequestWrap> wrap;
-
-	{
-		MutexLocker locker(state->lock_);
-
-		if (!state->completedRequests_.empty()) {
-			wrap = std::move(state->completedRequests_.front());
-			state->completedRequests_.pop();
-		}
-	}
-
-	if (!wrap) {
-		gst_task_pause(self->task);
-		return;
-	}
-
-	GstFlowReturn ret = GST_FLOW_OK;
-	gst_flow_combiner_reset(self->flow_combiner);
-
-	for (GstPad *srcpad : state->srcpads_) {
-		Stream *stream = gst_libcamera_pad_get_stream(srcpad);
-		GstBuffer *buffer = wrap->detachBuffer(stream);
-
-		FrameBuffer *fb = gst_libcamera_buffer_get_frame_buffer(buffer);
-
-		if (GST_CLOCK_TIME_IS_VALID(wrap->pts_)) {
-			GST_BUFFER_PTS(buffer) = wrap->pts_;
-			gst_libcamera_pad_set_latency(srcpad, wrap->latency_);
-		} else {
-			GST_BUFFER_PTS(buffer) = 0;
-		}
-
-		GST_BUFFER_OFFSET(buffer) = fb->metadata().sequence;
-		GST_BUFFER_OFFSET_END(buffer) = fb->metadata().sequence;
-
-		ret = gst_pad_push(srcpad, buffer);
-		ret = gst_flow_combiner_update_pad_flow(self->flow_combiner,
-							srcpad, ret);
-	}
-
-	if (ret != GST_FLOW_OK) {
-		if (ret == GST_FLOW_EOS) {
-			g_autoptr(GstEvent) eos = gst_event_new_eos();
-			guint32 seqnum = gst_util_seqnum_next();
-			gst_event_set_seqnum(eos, seqnum);
-			for (GstPad *srcpad : state->srcpads_)
-				gst_pad_push_event(srcpad, gst_event_ref(eos));
-		} else if (ret != GST_FLOW_FLUSHING) {
-			GST_ELEMENT_FLOW_ERROR(self, ret);
-		}
+	/* Process one completed request, if available. */
+	ret = state->processRequest();
+	switch (ret) {
+	case -EPIPE:
 		gst_task_stop(self->task);
 		return;
+
+	case -ENODATA:
+		gst_task_pause(self->task);
+		return;
 	}
 
 	/*