[libcamera-devel,v1,22/23] gst: libcamerasrc: Implement initial streaming

Message ID 20200129033210.278800-23-nicolas@ndufresne.ca
State Superseded
Headers show
Series
  • GStreamer Element for libcamera
Related show

Commit Message

Nicolas Dufresne Jan. 29, 2020, 3:32 a.m. UTC
From: Nicolas Dufresne <nicolas.dufresne@collabora.com>

With this patch, the element is not able to push buffers to the next
element in the graph. The buffers are currently missing any metadata
like timestamp, sequence number. The handling of the GstFlowReturn
for multiple pads isn't using a GstFlowCombiner as it should.

Signed-off-by: Nicolas Dufresne <nicolas.dufresne@collabora.com>
---
 src/gstreamer/gstlibcamerapad.cpp |  18 ++-
 src/gstreamer/gstlibcamerapad.h   |   2 +
 src/gstreamer/gstlibcamerasrc.cpp | 191 +++++++++++++++++++++++++++++-
 3 files changed, 208 insertions(+), 3 deletions(-)

Comments

Kieran Bingham Jan. 29, 2020, 12:17 p.m. UTC | #1
Hi Nicolas,

On 29/01/2020 03:32, Nicolas Dufresne wrote:
> From: Nicolas Dufresne <nicolas.dufresne@collabora.com>
> 
> With this patch, the element is not able to push buffers to the next
> element in the graph. The buffers are currently missing any metadata
> like timestamp, sequence number. The handling of the GstFlowReturn
> for multiple pads isn't using a GstFlowCombiner as it should.

Aha, so perhaps this patch is not intended to be built/used yet and is
just for example purposes?


> Signed-off-by: Nicolas Dufresne <nicolas.dufresne@collabora.com>
> ---
>  src/gstreamer/gstlibcamerapad.cpp |  18 ++-
>  src/gstreamer/gstlibcamerapad.h   |   2 +
>  src/gstreamer/gstlibcamerasrc.cpp | 191 +++++++++++++++++++++++++++++-
>  3 files changed, 208 insertions(+), 3 deletions(-)
> 
> diff --git a/src/gstreamer/gstlibcamerapad.cpp b/src/gstreamer/gstlibcamerapad.cpp
> index 4a775e4..7bc44c1 100644
> --- a/src/gstreamer/gstlibcamerapad.cpp
> +++ b/src/gstreamer/gstlibcamerapad.cpp
> @@ -152,7 +152,7 @@ gst_libcamera_pad_push_pending(GstPad *pad)
>  {
>  	auto *self = GST_LIBCAMERA_PAD(pad);
>  	GstBuffer *buffer;
> -	GstFlowReturn ret = GST_FLOW_CUSTOM_SUCCESS;
> +	GstFlowReturn ret = GST_FLOW_OK;
>  
>  	{
>  		GST_OBJECT_LOCKER(self);
> @@ -164,3 +164,19 @@ gst_libcamera_pad_push_pending(GstPad *pad)
>  
>  	return ret;
>  }
> +
> +bool
> +gst_libcamera_pad_has_pending(GstPad *pad)
> +{
> +	auto *self = GST_LIBCAMERA_PAD(pad);
> +	GST_OBJECT_LOCKER(self);
> +	return (self->pending_buffers.length > 0);
> +}
> +
> +void
> +gst_libcamera_pad_set_latency(GstPad *pad, GstClockTime latency)
> +{
> +	auto *self = GST_LIBCAMERA_PAD(pad);
> +	GST_OBJECT_LOCKER(self);
> +	self->latency = latency;

self->latency isn't yet defined, and I don't think this function is used
- so probably needs to be moved to the timestamp patch you are working on.

> +}
> diff --git a/src/gstreamer/gstlibcamerapad.h b/src/gstreamer/gstlibcamerapad.h
> index d928570..eb24000 100644
> --- a/src/gstreamer/gstlibcamerapad.h
> +++ b/src/gstreamer/gstlibcamerapad.h
> @@ -30,4 +30,6 @@ void gst_libcamera_pad_queue_buffer(GstPad *pad, GstBuffer *buffer);
>  
>  GstFlowReturn gst_libcamera_pad_push_pending(GstPad *pad);
>  
> +bool gst_libcamera_pad_has_pending(GstPad *pad);
> +
>  #endif /* __GST_LIBCAMERA_PAD_H__ */
> diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
> index 5fc4393..947a8bf 100644
> --- a/src/gstreamer/gstlibcamerasrc.cpp
> +++ b/src/gstreamer/gstlibcamerasrc.cpp
> @@ -12,8 +12,10 @@
>  #include "gstlibcamerapool.h"
>  #include "gstlibcamera-utils.h"
>  
> +#include <queue>
>  #include <libcamera/camera.h>
>  #include <libcamera/camera_manager.h>
> +#include <gst/base/base.h>
>  
>  using namespace libcamera;
>  
> @@ -22,12 +24,73 @@ GST_DEBUG_CATEGORY_STATIC(source_debug);
>  
>  #define STREAM_LOCKER(obj) g_autoptr(GRecMutexLocker) stream_locker = g_rec_mutex_locker_new(&GST_LIBCAMERA_SRC(obj)->stream_lock)
>  
> -/* Used for C++ object with destructors */
> +struct RequestWrap {
> +	RequestWrap(Request *request);
> +	~RequestWrap();
> +
> +	void AttachBuffer(GstBuffer *buffer);
> +	GstBuffer *DetachBuffer(Stream *stream);
> +
> +	/* For ptr comparision only */
> +	Request *request_;
> +	std::map<Stream *, GstBuffer *> buffers_;
> +};
> +
> +RequestWrap::RequestWrap(Request *request)
> +	: request_(request)
> +{
> +}
> +
> +RequestWrap::~RequestWrap()
> +{
> +	for (std::pair<Stream * const, GstBuffer *> &item : buffers_) {
> +		if (item.second)
> +			gst_buffer_unref(item.second);
> +	}
> +}
> +
> +void
> +RequestWrap::AttachBuffer(GstBuffer *buffer)
> +{
> +	FrameBuffer *fb = gst_libcamera_buffer_get_frame_buffer(buffer);
> +	Stream *stream = gst_libcamera_buffer_get_stream(buffer);
> +
> +	request_->addBuffer(stream, fb);
> +
> +	auto item = buffers_.find(stream);
> +	if (item != buffers_.end()) {
> +		gst_buffer_unref(item->second);
> +		item->second = buffer;
> +	} else {
> +		buffers_[stream] = buffer;
> +	}
> +}
> +
> +GstBuffer *
> +RequestWrap::DetachBuffer(Stream *stream)
> +{
> +	GstBuffer *buffer = nullptr;
> +
> +	auto item = buffers_.find(stream);
> +	if (item != buffers_.end()) {
> +		buffer = item->second;
> +		item->second = nullptr;
> +	}
> +
> +	return buffer;
> +}
> +
> +/* Used for C++ object with destructors and callbacks */
>  struct GstLibcameraSrcState {
> +	GstLibcameraSrc *src;
> +
>  	std::shared_ptr<CameraManager> cm;
>  	std::shared_ptr<Camera> cam;
>  	std::unique_ptr<CameraConfiguration> config;
>  	std::vector<GstPad *> srcpads;
> +	std::queue<std::unique_ptr<RequestWrap>> requests;
> +
> +	void requestCompleted(Request *request);
>  };
>  
>  struct _GstLibcameraSrc {
> @@ -40,6 +103,7 @@ struct _GstLibcameraSrc {
>  
>  	GstLibcameraSrcState *state;
>  	GstLibcameraAllocator *allocator;
> +	GstFlowCombiner *flow_combiner;
>  };
>  
>  enum {
> @@ -63,6 +127,41 @@ GstStaticPadTemplate request_src_template = {
>  	"src_%s", GST_PAD_SRC, GST_PAD_REQUEST, TEMPLATE_CAPS
>  };
>  
> +void
> +GstLibcameraSrcState::requestCompleted(Request *request)
> +{
> +	GST_OBJECT_LOCKER(this->src);
> +
> +	GST_DEBUG_OBJECT(this->src, "buffers are ready");
> +
> +	std::unique_ptr<RequestWrap> wrap = std::move(this->requests.front());
> +	this->requests.pop();
> +
> +	g_return_if_fail(wrap->request_ == request);
> +
> +	if ((request->status() == Request::RequestCancelled)) {
> +		GST_DEBUG_OBJECT(this->src, "Request was cancelled");
> +		return;
> +	}
> +
> +	GstBuffer *buffer;
> +	for (GstPad *srcpad : this->srcpads) {
> +		Stream *stream = gst_libcamera_pad_get_stream(srcpad);
> +		buffer = wrap->DetachBuffer(stream);
> +		gst_libcamera_pad_queue_buffer(srcpad, buffer);
> +	}
> +
> +	{
> +		/* We only want to resume the task if it's paused */
> +		GstTask *task = this->src->task;
> +		GST_OBJECT_LOCKER(task);
> +		if (GST_TASK_STATE(task) == GST_TASK_PAUSED) {
> +			GST_TASK_STATE(task) = GST_TASK_STARTED;
> +			GST_TASK_SIGNAL(task);
> +		}
> +	}
> +}
> +
>  static bool
>  gst_libcamera_src_open(GstLibcameraSrc *self)
>  {
> @@ -115,6 +214,8 @@ gst_libcamera_src_open(GstLibcameraSrc *self)
>  		return false;
>  	}
>  
> +	cam->requestCompleted.connect(self->state, &GstLibcameraSrcState::requestCompleted);
> +
>  	/* no need to lock here we didn't start our threads */
>  	self->state->cm = cm;
>  	self->state->cam = cam;
> @@ -126,8 +227,74 @@ static void
>  gst_libcamera_src_task_run(gpointer user_data)
>  {
>  	GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
> +	GstLibcameraSrcState *state = self->state;
> +
> +	Request *request = new Request(state->cam.get());
> +	auto wrap = std::make_unique<RequestWrap>(request);
> +	for (GstPad *srcpad : state->srcpads) {
> +		GstLibcameraPool *pool = gst_libcamera_pad_get_pool(srcpad);
> +		GstBuffer *buffer;
> +		GstFlowReturn ret;
> +
> +		ret = gst_buffer_pool_acquire_buffer(GST_BUFFER_POOL(pool),
> +						     &buffer, nullptr);
> +		if (ret != GST_FLOW_OK) {
> +			/* RequestWrap does not take ownership, and we won't be
> +			 * queueing this one due to lack of buffers */
> +			delete request;
> +			request = NULL;
> +			break;
> +		}
> +
> +		wrap->AttachBuffer(buffer);
> +	}
> +
> +	if (request) {
> +		GST_OBJECT_LOCKER(self);
> +		GST_TRACE_OBJECT(self, "Requesting buffers");
> +		state->cam->queueRequest(request);
> +		state->requests.push(std::move(wrap));
> +	}
> +
> +	GstFlowReturn ret = GST_FLOW_OK;
> +	gst_flow_combiner_reset(self->flow_combiner);
> +	for (GstPad *srcpad : state->srcpads) {
> +		ret = gst_libcamera_pad_push_pending(srcpad);
> +		ret = gst_flow_combiner_update_pad_flow(self->flow_combiner,
> +							srcpad, ret);
> +	}
> +
> +	{
> +		/* Here we need to decide if we want to pause or stop the task. This
> +		 * needs to happend in lock step with the callback thread which may want
> +		 * to resume the task.
> +		 */
> +		GST_OBJECT_LOCKER(self);
> +		if (ret != GST_FLOW_OK) {
> +			if (ret == GST_FLOW_EOS) {
> +				g_autoptr(GstEvent) eos = gst_event_new_eos();
> +				guint32 seqnum = gst_util_seqnum_next();
> +				gst_event_set_seqnum(eos, seqnum);
> +				for (GstPad *srcpad : state->srcpads)
> +					gst_pad_push_event(srcpad, gst_event_ref(eos));
> +			} else if (ret != GST_FLOW_FLUSHING) {
> +				GST_ELEMENT_FLOW_ERROR(self, ret);
> +			}
> +			gst_task_stop(self->task);
> +			return;
> +		}
>  
> -	GST_DEBUG_OBJECT(self, "Streaming thread it now capturing");
> +		gboolean do_pause = true;
> +		for (GstPad *srcpad : state->srcpads) {
> +			if (gst_libcamera_pad_has_pending(srcpad)) {
> +				do_pause = false;
> +				break;
> +			}
> +		}
> +
> +		if (do_pause)
> +			gst_task_pause(self->task);
> +	}
>  }
>  
>  static void
> @@ -137,6 +304,7 @@ gst_libcamera_src_task_enter(GstTask *task, GThread *thread, gpointer user_data)
>  	GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
>  	GstLibcameraSrcState *state = self->state;
>  	GstFlowReturn flow_ret = GST_FLOW_OK;
> +	gint ret = 0;
>  
>  	GST_DEBUG_OBJECT(self, "Streaming thread has started");
>  
> @@ -219,12 +387,23 @@ gst_libcamera_src_task_enter(GstTask *task, GThread *thread, gpointer user_data)
>  		return;
>  	}
>  
> +	self->flow_combiner = gst_flow_combiner_new();
>  	for (gsize i = 0; i < state->srcpads.size(); i++) {
>  		GstPad *srcpad = state->srcpads[i];
>  		const StreamConfiguration &stream_cfg = state->config->at(i);
>  		GstLibcameraPool *pool = gst_libcamera_pool_new(self->allocator,
>  								stream_cfg.stream());
>  		gst_libcamera_pad_set_pool(srcpad, pool);
> +		gst_flow_combiner_add_pad(self->flow_combiner, srcpad);
> +	}
> +
> +	ret = state->cam->start();
> +	if (ret) {
> +		GST_ELEMENT_ERROR(self, RESOURCE, SETTINGS,
> +				  ("Failed to start the camera: %s", g_strerror(-ret)),
> +				  ("Camera.start() failed with error code %i", ret));
> +		gst_task_stop(task);
> +		return;
>  	}
>  
>  done:
> @@ -254,6 +433,8 @@ gst_libcamera_src_task_leave(GstTask *task, GThread *thread, gpointer user_data)
>  		gst_libcamera_pad_set_pool(srcpad, NULL);
>  
>  	g_clear_object(&self->allocator);
> +	g_clear_pointer(&self->flow_combiner,
> +			(GDestroyNotify)gst_flow_combiner_free);
>  }
>  
>  static void
> @@ -333,6 +514,9 @@ gst_libcamera_src_change_state(GstElement *element, GstStateChange transition)
>  			return GST_STATE_CHANGE_FAILURE;
>  		ret = GST_STATE_CHANGE_NO_PREROLL;
>  		break;
> +	case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
> +		gst_task_start(self->task);
> +		break;
>  	case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
>  		ret = GST_STATE_CHANGE_NO_PREROLL;
>  		break;
> @@ -378,6 +562,9 @@ gst_libcamera_src_init(GstLibcameraSrc *self)
>  
>  	state->srcpads.push_back(gst_pad_new_from_template(templ, "src"));
>  	gst_element_add_pad(GST_ELEMENT(self), state->srcpads[0]);
> +
> +	/* C-style friend */
> +	state->src = self;
>  	self->state = state;
>  }
>  
>
Nicolas Dufresne Jan. 29, 2020, 6:16 p.m. UTC | #2
Le mercredi 29 janvier 2020 à 12:17 +0000, Kieran Bingham a écrit :
> Hi Nicolas,
> 
> On 29/01/2020 03:32, Nicolas Dufresne wrote:
> > From: Nicolas Dufresne <nicolas.dufresne@collabora.com>
> > 
> > With this patch, the element is not able to push buffers to the next
> > element in the graph. The buffers are currently missing any metadata
> > like timestamp, sequence number. The handling of the GstFlowReturn
> > for multiple pads isn't using a GstFlowCombiner as it should.
> 
> Aha, so perhaps this patch is not intended to be built/used yet and is
> just for example purposes?

I forgot to drop the comment about flow combiner desopite me implementing it
yesterday.

For the timestamp, I'll cleanup and include my patch that assumes the timestamp
are from CLOCK_MONOTONIC. Currently we only have V4L2 Capture nodes, which are
required to use that clock, so it's fine tempory assumption. Note that it will
just render as fast as possible without timestamp. What it breaks is RTP 
streaming, fpsdisplaysink, QoS and muxing, which I agree is a lot.

> 
> 
> > Signed-off-by: Nicolas Dufresne <nicolas.dufresne@collabora.com>
> > ---
> >  src/gstreamer/gstlibcamerapad.cpp |  18 ++-
> >  src/gstreamer/gstlibcamerapad.h   |   2 +
> >  src/gstreamer/gstlibcamerasrc.cpp | 191 +++++++++++++++++++++++++++++-
> >  3 files changed, 208 insertions(+), 3 deletions(-)
> > 
> > diff --git a/src/gstreamer/gstlibcamerapad.cpp
> > b/src/gstreamer/gstlibcamerapad.cpp
> > index 4a775e4..7bc44c1 100644
> > --- a/src/gstreamer/gstlibcamerapad.cpp
> > +++ b/src/gstreamer/gstlibcamerapad.cpp
> > @@ -152,7 +152,7 @@ gst_libcamera_pad_push_pending(GstPad *pad)
> >  {
> >  	auto *self = GST_LIBCAMERA_PAD(pad);
> >  	GstBuffer *buffer;
> > -	GstFlowReturn ret = GST_FLOW_CUSTOM_SUCCESS;
> > +	GstFlowReturn ret = GST_FLOW_OK;
> >  
> >  	{
> >  		GST_OBJECT_LOCKER(self);
> > @@ -164,3 +164,19 @@ gst_libcamera_pad_push_pending(GstPad *pad)
> >  
> >  	return ret;
> >  }
> > +
> > +bool
> > +gst_libcamera_pad_has_pending(GstPad *pad)
> > +{
> > +	auto *self = GST_LIBCAMERA_PAD(pad);
> > +	GST_OBJECT_LOCKER(self);
> > +	return (self->pending_buffers.length > 0);
> > +}
> > +
> > +void
> > +gst_libcamera_pad_set_latency(GstPad *pad, GstClockTime latency)
> > +{
> > +	auto *self = GST_LIBCAMERA_PAD(pad);
> > +	GST_OBJECT_LOCKER(self);
> > +	self->latency = latency;
> 
> self->latency isn't yet defined, and I don't think this function is used
> - so probably needs to be moved to the timestamp patch you are working on.
> 
> > +}
> > diff --git a/src/gstreamer/gstlibcamerapad.h
> > b/src/gstreamer/gstlibcamerapad.h
> > index d928570..eb24000 100644
> > --- a/src/gstreamer/gstlibcamerapad.h
> > +++ b/src/gstreamer/gstlibcamerapad.h
> > @@ -30,4 +30,6 @@ void gst_libcamera_pad_queue_buffer(GstPad *pad, GstBuffer
> > *buffer);
> >  
> >  GstFlowReturn gst_libcamera_pad_push_pending(GstPad *pad);
> >  
> > +bool gst_libcamera_pad_has_pending(GstPad *pad);
> > +
> >  #endif /* __GST_LIBCAMERA_PAD_H__ */
> > diff --git a/src/gstreamer/gstlibcamerasrc.cpp
> > b/src/gstreamer/gstlibcamerasrc.cpp
> > index 5fc4393..947a8bf 100644
> > --- a/src/gstreamer/gstlibcamerasrc.cpp
> > +++ b/src/gstreamer/gstlibcamerasrc.cpp
> > @@ -12,8 +12,10 @@
> >  #include "gstlibcamerapool.h"
> >  #include "gstlibcamera-utils.h"
> >  
> > +#include <queue>
> >  #include <libcamera/camera.h>
> >  #include <libcamera/camera_manager.h>
> > +#include <gst/base/base.h>
> >  
> >  using namespace libcamera;
> >  
> > @@ -22,12 +24,73 @@ GST_DEBUG_CATEGORY_STATIC(source_debug);
> >  
> >  #define STREAM_LOCKER(obj) g_autoptr(GRecMutexLocker) stream_locker =
> > g_rec_mutex_locker_new(&GST_LIBCAMERA_SRC(obj)->stream_lock)
> >  
> > -/* Used for C++ object with destructors */
> > +struct RequestWrap {
> > +	RequestWrap(Request *request);
> > +	~RequestWrap();
> > +
> > +	void AttachBuffer(GstBuffer *buffer);
> > +	GstBuffer *DetachBuffer(Stream *stream);
> > +
> > +	/* For ptr comparision only */
> > +	Request *request_;
> > +	std::map<Stream *, GstBuffer *> buffers_;
> > +};
> > +
> > +RequestWrap::RequestWrap(Request *request)
> > +	: request_(request)
> > +{
> > +}
> > +
> > +RequestWrap::~RequestWrap()
> > +{
> > +	for (std::pair<Stream * const, GstBuffer *> &item : buffers_) {
> > +		if (item.second)
> > +			gst_buffer_unref(item.second);
> > +	}
> > +}
> > +
> > +void
> > +RequestWrap::AttachBuffer(GstBuffer *buffer)
> > +{
> > +	FrameBuffer *fb = gst_libcamera_buffer_get_frame_buffer(buffer);
> > +	Stream *stream = gst_libcamera_buffer_get_stream(buffer);
> > +
> > +	request_->addBuffer(stream, fb);
> > +
> > +	auto item = buffers_.find(stream);
> > +	if (item != buffers_.end()) {
> > +		gst_buffer_unref(item->second);
> > +		item->second = buffer;
> > +	} else {
> > +		buffers_[stream] = buffer;
> > +	}
> > +}
> > +
> > +GstBuffer *
> > +RequestWrap::DetachBuffer(Stream *stream)
> > +{
> > +	GstBuffer *buffer = nullptr;
> > +
> > +	auto item = buffers_.find(stream);
> > +	if (item != buffers_.end()) {
> > +		buffer = item->second;
> > +		item->second = nullptr;
> > +	}
> > +
> > +	return buffer;
> > +}
> > +
> > +/* Used for C++ object with destructors and callbacks */
> >  struct GstLibcameraSrcState {
> > +	GstLibcameraSrc *src;
> > +
> >  	std::shared_ptr<CameraManager> cm;
> >  	std::shared_ptr<Camera> cam;
> >  	std::unique_ptr<CameraConfiguration> config;
> >  	std::vector<GstPad *> srcpads;
> > +	std::queue<std::unique_ptr<RequestWrap>> requests;
> > +
> > +	void requestCompleted(Request *request);
> >  };
> >  
> >  struct _GstLibcameraSrc {
> > @@ -40,6 +103,7 @@ struct _GstLibcameraSrc {
> >  
> >  	GstLibcameraSrcState *state;
> >  	GstLibcameraAllocator *allocator;
> > +	GstFlowCombiner *flow_combiner;
> >  };
> >  
> >  enum {
> > @@ -63,6 +127,41 @@ GstStaticPadTemplate request_src_template = {
> >  	"src_%s", GST_PAD_SRC, GST_PAD_REQUEST, TEMPLATE_CAPS
> >  };
> >  
> > +void
> > +GstLibcameraSrcState::requestCompleted(Request *request)
> > +{
> > +	GST_OBJECT_LOCKER(this->src);
> > +
> > +	GST_DEBUG_OBJECT(this->src, "buffers are ready");
> > +
> > +	std::unique_ptr<RequestWrap> wrap = std::move(this->requests.front());
> > +	this->requests.pop();
> > +
> > +	g_return_if_fail(wrap->request_ == request);
> > +
> > +	if ((request->status() == Request::RequestCancelled)) {
> > +		GST_DEBUG_OBJECT(this->src, "Request was cancelled");
> > +		return;
> > +	}
> > +
> > +	GstBuffer *buffer;
> > +	for (GstPad *srcpad : this->srcpads) {
> > +		Stream *stream = gst_libcamera_pad_get_stream(srcpad);
> > +		buffer = wrap->DetachBuffer(stream);
> > +		gst_libcamera_pad_queue_buffer(srcpad, buffer);
> > +	}
> > +
> > +	{
> > +		/* We only want to resume the task if it's paused */
> > +		GstTask *task = this->src->task;
> > +		GST_OBJECT_LOCKER(task);
> > +		if (GST_TASK_STATE(task) == GST_TASK_PAUSED) {
> > +			GST_TASK_STATE(task) = GST_TASK_STARTED;
> > +			GST_TASK_SIGNAL(task);
> > +		}
> > +	}
> > +}
> > +
> >  static bool
> >  gst_libcamera_src_open(GstLibcameraSrc *self)
> >  {
> > @@ -115,6 +214,8 @@ gst_libcamera_src_open(GstLibcameraSrc *self)
> >  		return false;
> >  	}
> >  
> > +	cam->requestCompleted.connect(self->state,
> > &GstLibcameraSrcState::requestCompleted);
> > +
> >  	/* no need to lock here we didn't start our threads */
> >  	self->state->cm = cm;
> >  	self->state->cam = cam;
> > @@ -126,8 +227,74 @@ static void
> >  gst_libcamera_src_task_run(gpointer user_data)
> >  {
> >  	GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
> > +	GstLibcameraSrcState *state = self->state;
> > +
> > +	Request *request = new Request(state->cam.get());
> > +	auto wrap = std::make_unique<RequestWrap>(request);
> > +	for (GstPad *srcpad : state->srcpads) {
> > +		GstLibcameraPool *pool = gst_libcamera_pad_get_pool(srcpad);
> > +		GstBuffer *buffer;
> > +		GstFlowReturn ret;
> > +
> > +		ret = gst_buffer_pool_acquire_buffer(GST_BUFFER_POOL(pool),
> > +						     &buffer, nullptr);
> > +		if (ret != GST_FLOW_OK) {
> > +			/* RequestWrap does not take ownership, and we won't be
> > +			 * queueing this one due to lack of buffers */
> > +			delete request;
> > +			request = NULL;
> > +			break;
> > +		}
> > +
> > +		wrap->AttachBuffer(buffer);
> > +	}
> > +
> > +	if (request) {
> > +		GST_OBJECT_LOCKER(self);
> > +		GST_TRACE_OBJECT(self, "Requesting buffers");
> > +		state->cam->queueRequest(request);
> > +		state->requests.push(std::move(wrap));
> > +	}
> > +
> > +	GstFlowReturn ret = GST_FLOW_OK;
> > +	gst_flow_combiner_reset(self->flow_combiner);
> > +	for (GstPad *srcpad : state->srcpads) {
> > +		ret = gst_libcamera_pad_push_pending(srcpad);
> > +		ret = gst_flow_combiner_update_pad_flow(self->flow_combiner,
> > +							srcpad, ret);
> > +	}
> > +
> > +	{
> > +		/* Here we need to decide if we want to pause or stop the task.
> > This
> > +		 * needs to happend in lock step with the callback thread which
> > may want
> > +		 * to resume the task.
> > +		 */
> > +		GST_OBJECT_LOCKER(self);
> > +		if (ret != GST_FLOW_OK) {
> > +			if (ret == GST_FLOW_EOS) {
> > +				g_autoptr(GstEvent) eos = gst_event_new_eos();
> > +				guint32 seqnum = gst_util_seqnum_next();
> > +				gst_event_set_seqnum(eos, seqnum);
> > +				for (GstPad *srcpad : state->srcpads)
> > +					gst_pad_push_event(srcpad,
> > gst_event_ref(eos));
> > +			} else if (ret != GST_FLOW_FLUSHING) {
> > +				GST_ELEMENT_FLOW_ERROR(self, ret);
> > +			}
> > +			gst_task_stop(self->task);
> > +			return;
> > +		}
> >  
> > -	GST_DEBUG_OBJECT(self, "Streaming thread it now capturing");
> > +		gboolean do_pause = true;
> > +		for (GstPad *srcpad : state->srcpads) {
> > +			if (gst_libcamera_pad_has_pending(srcpad)) {
> > +				do_pause = false;
> > +				break;
> > +			}
> > +		}
> > +
> > +		if (do_pause)
> > +			gst_task_pause(self->task);
> > +	}
> >  }
> >  
> >  static void
> > @@ -137,6 +304,7 @@ gst_libcamera_src_task_enter(GstTask *task, GThread
> > *thread, gpointer user_data)
> >  	GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
> >  	GstLibcameraSrcState *state = self->state;
> >  	GstFlowReturn flow_ret = GST_FLOW_OK;
> > +	gint ret = 0;
> >  
> >  	GST_DEBUG_OBJECT(self, "Streaming thread has started");
> >  
> > @@ -219,12 +387,23 @@ gst_libcamera_src_task_enter(GstTask *task, GThread
> > *thread, gpointer user_data)
> >  		return;
> >  	}
> >  
> > +	self->flow_combiner = gst_flow_combiner_new();
> >  	for (gsize i = 0; i < state->srcpads.size(); i++) {
> >  		GstPad *srcpad = state->srcpads[i];
> >  		const StreamConfiguration &stream_cfg = state->config->at(i);
> >  		GstLibcameraPool *pool = gst_libcamera_pool_new(self->allocator,
> >  								stream_cfg.strea
> > m());
> >  		gst_libcamera_pad_set_pool(srcpad, pool);
> > +		gst_flow_combiner_add_pad(self->flow_combiner, srcpad);
> > +	}
> > +
> > +	ret = state->cam->start();
> > +	if (ret) {
> > +		GST_ELEMENT_ERROR(self, RESOURCE, SETTINGS,
> > +				  ("Failed to start the camera: %s",
> > g_strerror(-ret)),
> > +				  ("Camera.start() failed with error code %i",
> > ret));
> > +		gst_task_stop(task);
> > +		return;
> >  	}
> >  
> >  done:
> > @@ -254,6 +433,8 @@ gst_libcamera_src_task_leave(GstTask *task, GThread
> > *thread, gpointer user_data)
> >  		gst_libcamera_pad_set_pool(srcpad, NULL);
> >  
> >  	g_clear_object(&self->allocator);
> > +	g_clear_pointer(&self->flow_combiner,
> > +			(GDestroyNotify)gst_flow_combiner_free);
> >  }
> >  
> >  static void
> > @@ -333,6 +514,9 @@ gst_libcamera_src_change_state(GstElement *element,
> > GstStateChange transition)
> >  			return GST_STATE_CHANGE_FAILURE;
> >  		ret = GST_STATE_CHANGE_NO_PREROLL;
> >  		break;
> > +	case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
> > +		gst_task_start(self->task);
> > +		break;
> >  	case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
> >  		ret = GST_STATE_CHANGE_NO_PREROLL;
> >  		break;
> > @@ -378,6 +562,9 @@ gst_libcamera_src_init(GstLibcameraSrc *self)
> >  
> >  	state->srcpads.push_back(gst_pad_new_from_template(templ, "src"));
> >  	gst_element_add_pad(GST_ELEMENT(self), state->srcpads[0]);
> > +
> > +	/* C-style friend */
> > +	state->src = self;
> >  	self->state = state;
> >  }
> >  
> >

Patch

diff --git a/src/gstreamer/gstlibcamerapad.cpp b/src/gstreamer/gstlibcamerapad.cpp
index 4a775e4..7bc44c1 100644
--- a/src/gstreamer/gstlibcamerapad.cpp
+++ b/src/gstreamer/gstlibcamerapad.cpp
@@ -152,7 +152,7 @@  gst_libcamera_pad_push_pending(GstPad *pad)
 {
 	auto *self = GST_LIBCAMERA_PAD(pad);
 	GstBuffer *buffer;
-	GstFlowReturn ret = GST_FLOW_CUSTOM_SUCCESS;
+	GstFlowReturn ret = GST_FLOW_OK;
 
 	{
 		GST_OBJECT_LOCKER(self);
@@ -164,3 +164,19 @@  gst_libcamera_pad_push_pending(GstPad *pad)
 
 	return ret;
 }
+
+bool
+gst_libcamera_pad_has_pending(GstPad *pad)
+{
+	auto *self = GST_LIBCAMERA_PAD(pad);
+	GST_OBJECT_LOCKER(self);
+	return (self->pending_buffers.length > 0);
+}
+
+void
+gst_libcamera_pad_set_latency(GstPad *pad, GstClockTime latency)
+{
+	auto *self = GST_LIBCAMERA_PAD(pad);
+	GST_OBJECT_LOCKER(self);
+	self->latency = latency;
+}
diff --git a/src/gstreamer/gstlibcamerapad.h b/src/gstreamer/gstlibcamerapad.h
index d928570..eb24000 100644
--- a/src/gstreamer/gstlibcamerapad.h
+++ b/src/gstreamer/gstlibcamerapad.h
@@ -30,4 +30,6 @@  void gst_libcamera_pad_queue_buffer(GstPad *pad, GstBuffer *buffer);
 
 GstFlowReturn gst_libcamera_pad_push_pending(GstPad *pad);
 
+bool gst_libcamera_pad_has_pending(GstPad *pad);
+
 #endif /* __GST_LIBCAMERA_PAD_H__ */
diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
index 5fc4393..947a8bf 100644
--- a/src/gstreamer/gstlibcamerasrc.cpp
+++ b/src/gstreamer/gstlibcamerasrc.cpp
@@ -12,8 +12,10 @@ 
 #include "gstlibcamerapool.h"
 #include "gstlibcamera-utils.h"
 
+#include <queue>
 #include <libcamera/camera.h>
 #include <libcamera/camera_manager.h>
+#include <gst/base/base.h>
 
 using namespace libcamera;
 
@@ -22,12 +24,73 @@  GST_DEBUG_CATEGORY_STATIC(source_debug);
 
 #define STREAM_LOCKER(obj) g_autoptr(GRecMutexLocker) stream_locker = g_rec_mutex_locker_new(&GST_LIBCAMERA_SRC(obj)->stream_lock)
 
-/* Used for C++ object with destructors */
+struct RequestWrap {
+	RequestWrap(Request *request);
+	~RequestWrap();
+
+	void AttachBuffer(GstBuffer *buffer);
+	GstBuffer *DetachBuffer(Stream *stream);
+
+	/* For ptr comparision only */
+	Request *request_;
+	std::map<Stream *, GstBuffer *> buffers_;
+};
+
+RequestWrap::RequestWrap(Request *request)
+	: request_(request)
+{
+}
+
+RequestWrap::~RequestWrap()
+{
+	for (std::pair<Stream * const, GstBuffer *> &item : buffers_) {
+		if (item.second)
+			gst_buffer_unref(item.second);
+	}
+}
+
+void
+RequestWrap::AttachBuffer(GstBuffer *buffer)
+{
+	FrameBuffer *fb = gst_libcamera_buffer_get_frame_buffer(buffer);
+	Stream *stream = gst_libcamera_buffer_get_stream(buffer);
+
+	request_->addBuffer(stream, fb);
+
+	auto item = buffers_.find(stream);
+	if (item != buffers_.end()) {
+		gst_buffer_unref(item->second);
+		item->second = buffer;
+	} else {
+		buffers_[stream] = buffer;
+	}
+}
+
+GstBuffer *
+RequestWrap::DetachBuffer(Stream *stream)
+{
+	GstBuffer *buffer = nullptr;
+
+	auto item = buffers_.find(stream);
+	if (item != buffers_.end()) {
+		buffer = item->second;
+		item->second = nullptr;
+	}
+
+	return buffer;
+}
+
+/* Used for C++ object with destructors and callbacks */
 struct GstLibcameraSrcState {
+	GstLibcameraSrc *src;
+
 	std::shared_ptr<CameraManager> cm;
 	std::shared_ptr<Camera> cam;
 	std::unique_ptr<CameraConfiguration> config;
 	std::vector<GstPad *> srcpads;
+	std::queue<std::unique_ptr<RequestWrap>> requests;
+
+	void requestCompleted(Request *request);
 };
 
 struct _GstLibcameraSrc {
@@ -40,6 +103,7 @@  struct _GstLibcameraSrc {
 
 	GstLibcameraSrcState *state;
 	GstLibcameraAllocator *allocator;
+	GstFlowCombiner *flow_combiner;
 };
 
 enum {
@@ -63,6 +127,41 @@  GstStaticPadTemplate request_src_template = {
 	"src_%s", GST_PAD_SRC, GST_PAD_REQUEST, TEMPLATE_CAPS
 };
 
+void
+GstLibcameraSrcState::requestCompleted(Request *request)
+{
+	GST_OBJECT_LOCKER(this->src);
+
+	GST_DEBUG_OBJECT(this->src, "buffers are ready");
+
+	std::unique_ptr<RequestWrap> wrap = std::move(this->requests.front());
+	this->requests.pop();
+
+	g_return_if_fail(wrap->request_ == request);
+
+	if ((request->status() == Request::RequestCancelled)) {
+		GST_DEBUG_OBJECT(this->src, "Request was cancelled");
+		return;
+	}
+
+	GstBuffer *buffer;
+	for (GstPad *srcpad : this->srcpads) {
+		Stream *stream = gst_libcamera_pad_get_stream(srcpad);
+		buffer = wrap->DetachBuffer(stream);
+		gst_libcamera_pad_queue_buffer(srcpad, buffer);
+	}
+
+	{
+		/* We only want to resume the task if it's paused */
+		GstTask *task = this->src->task;
+		GST_OBJECT_LOCKER(task);
+		if (GST_TASK_STATE(task) == GST_TASK_PAUSED) {
+			GST_TASK_STATE(task) = GST_TASK_STARTED;
+			GST_TASK_SIGNAL(task);
+		}
+	}
+}
+
 static bool
 gst_libcamera_src_open(GstLibcameraSrc *self)
 {
@@ -115,6 +214,8 @@  gst_libcamera_src_open(GstLibcameraSrc *self)
 		return false;
 	}
 
+	cam->requestCompleted.connect(self->state, &GstLibcameraSrcState::requestCompleted);
+
 	/* no need to lock here we didn't start our threads */
 	self->state->cm = cm;
 	self->state->cam = cam;
@@ -126,8 +227,74 @@  static void
 gst_libcamera_src_task_run(gpointer user_data)
 {
 	GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
+	GstLibcameraSrcState *state = self->state;
+
+	Request *request = new Request(state->cam.get());
+	auto wrap = std::make_unique<RequestWrap>(request);
+	for (GstPad *srcpad : state->srcpads) {
+		GstLibcameraPool *pool = gst_libcamera_pad_get_pool(srcpad);
+		GstBuffer *buffer;
+		GstFlowReturn ret;
+
+		ret = gst_buffer_pool_acquire_buffer(GST_BUFFER_POOL(pool),
+						     &buffer, nullptr);
+		if (ret != GST_FLOW_OK) {
+			/* RequestWrap does not take ownership, and we won't be
+			 * queueing this one due to lack of buffers */
+			delete request;
+			request = NULL;
+			break;
+		}
+
+		wrap->AttachBuffer(buffer);
+	}
+
+	if (request) {
+		GST_OBJECT_LOCKER(self);
+		GST_TRACE_OBJECT(self, "Requesting buffers");
+		state->cam->queueRequest(request);
+		state->requests.push(std::move(wrap));
+	}
+
+	GstFlowReturn ret = GST_FLOW_OK;
+	gst_flow_combiner_reset(self->flow_combiner);
+	for (GstPad *srcpad : state->srcpads) {
+		ret = gst_libcamera_pad_push_pending(srcpad);
+		ret = gst_flow_combiner_update_pad_flow(self->flow_combiner,
+							srcpad, ret);
+	}
+
+	{
+		/* Here we need to decide if we want to pause or stop the task. This
+		 * needs to happend in lock step with the callback thread which may want
+		 * to resume the task.
+		 */
+		GST_OBJECT_LOCKER(self);
+		if (ret != GST_FLOW_OK) {
+			if (ret == GST_FLOW_EOS) {
+				g_autoptr(GstEvent) eos = gst_event_new_eos();
+				guint32 seqnum = gst_util_seqnum_next();
+				gst_event_set_seqnum(eos, seqnum);
+				for (GstPad *srcpad : state->srcpads)
+					gst_pad_push_event(srcpad, gst_event_ref(eos));
+			} else if (ret != GST_FLOW_FLUSHING) {
+				GST_ELEMENT_FLOW_ERROR(self, ret);
+			}
+			gst_task_stop(self->task);
+			return;
+		}
 
-	GST_DEBUG_OBJECT(self, "Streaming thread it now capturing");
+		gboolean do_pause = true;
+		for (GstPad *srcpad : state->srcpads) {
+			if (gst_libcamera_pad_has_pending(srcpad)) {
+				do_pause = false;
+				break;
+			}
+		}
+
+		if (do_pause)
+			gst_task_pause(self->task);
+	}
 }
 
 static void
@@ -137,6 +304,7 @@  gst_libcamera_src_task_enter(GstTask *task, GThread *thread, gpointer user_data)
 	GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
 	GstLibcameraSrcState *state = self->state;
 	GstFlowReturn flow_ret = GST_FLOW_OK;
+	gint ret = 0;
 
 	GST_DEBUG_OBJECT(self, "Streaming thread has started");
 
@@ -219,12 +387,23 @@  gst_libcamera_src_task_enter(GstTask *task, GThread *thread, gpointer user_data)
 		return;
 	}
 
+	self->flow_combiner = gst_flow_combiner_new();
 	for (gsize i = 0; i < state->srcpads.size(); i++) {
 		GstPad *srcpad = state->srcpads[i];
 		const StreamConfiguration &stream_cfg = state->config->at(i);
 		GstLibcameraPool *pool = gst_libcamera_pool_new(self->allocator,
 								stream_cfg.stream());
 		gst_libcamera_pad_set_pool(srcpad, pool);
+		gst_flow_combiner_add_pad(self->flow_combiner, srcpad);
+	}
+
+	ret = state->cam->start();
+	if (ret) {
+		GST_ELEMENT_ERROR(self, RESOURCE, SETTINGS,
+				  ("Failed to start the camera: %s", g_strerror(-ret)),
+				  ("Camera.start() failed with error code %i", ret));
+		gst_task_stop(task);
+		return;
 	}
 
 done:
@@ -254,6 +433,8 @@  gst_libcamera_src_task_leave(GstTask *task, GThread *thread, gpointer user_data)
 		gst_libcamera_pad_set_pool(srcpad, NULL);
 
 	g_clear_object(&self->allocator);
+	g_clear_pointer(&self->flow_combiner,
+			(GDestroyNotify)gst_flow_combiner_free);
 }
 
 static void
@@ -333,6 +514,9 @@  gst_libcamera_src_change_state(GstElement *element, GstStateChange transition)
 			return GST_STATE_CHANGE_FAILURE;
 		ret = GST_STATE_CHANGE_NO_PREROLL;
 		break;
+	case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
+		gst_task_start(self->task);
+		break;
 	case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
 		ret = GST_STATE_CHANGE_NO_PREROLL;
 		break;
@@ -378,6 +562,9 @@  gst_libcamera_src_init(GstLibcameraSrc *self)
 
 	state->srcpads.push_back(gst_pad_new_from_template(templ, "src"));
 	gst_element_add_pad(GST_ELEMENT(self), state->srcpads[0]);
+
+	/* C-style friend */
+	state->src = self;
 	self->state = state;
 }