[libcamera-devel,v1,15/23] gst: libcamerasrc: Implement minimal caps negotiation

Message ID 20200129033210.278800-16-nicolas@ndufresne.ca
State Superseded
Headers show
Series
  • GStreamer Element for libcamera
Related show

Commit Message

Nicolas Dufresne Jan. 29, 2020, 3:32 a.m. UTC
From: Nicolas Dufresne <nicolas.dufresne@collabora.com>

This is not expected to work in every possible cases, but should be sufficient as
an initial implementation. What is does it that it turns the StreamFormats into
caps and query downstream caps with that as a filter.

The result is the subset of caps that can be used. We then keep the first
structure in that result and fixate using the default values found in
StreamConfiguration as a default in case a range is available.

We then validate this configuration and turn the potentially modified
configuration into caps that we push downstream. Note that we strust the order
in StreamFormats as being sorted best first, but this is not currently in
libcamera.

Signed-off-by: Nicolas Dufresne <nicolas.dufresne@collabora.com>
---
 src/gstreamer/gstlibcamerasrc.cpp | 71 +++++++++++++++++++++++++++++++
 1 file changed, 71 insertions(+)

Comments

Laurent Pinchart Feb. 11, 2020, 11:47 p.m. UTC | #1
Hi Nicolas,

Thank you for the patch.

On Tue, Jan 28, 2020 at 10:32:02PM -0500, Nicolas Dufresne wrote:
> From: Nicolas Dufresne <nicolas.dufresne@collabora.com>
> 
> This is not expected to work in every possible cases, but should be sufficient as
> an initial implementation. What is does it that it turns the StreamFormats into

s/is does it/it does is/

> caps and query downstream caps with that as a filter.

s/query/queries/

> The result is the subset of caps that can be used. We then keep the first
> structure in that result and fixate using the default values found in
> StreamConfiguration as a default in case a range is available.
> 
> We then validate this configuration and turn the potentially modified
> configuration into caps that we push downstream. Note that we strust the order

s/strust/trust/

> in StreamFormats as being sorted best first, but this is not currently in
> libcamera.

Could you add a \todo comment in the code to address this ?

> Signed-off-by: Nicolas Dufresne <nicolas.dufresne@collabora.com>
> ---
>  src/gstreamer/gstlibcamerasrc.cpp | 71 +++++++++++++++++++++++++++++++
>  1 file changed, 71 insertions(+)
> 
> diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
> index b1a21dc..7b478fa 100644
> --- a/src/gstreamer/gstlibcamerasrc.cpp
> +++ b/src/gstreamer/gstlibcamerasrc.cpp
> @@ -24,6 +24,7 @@ GST_DEBUG_CATEGORY_STATIC(source_debug);
>  struct GstLibcameraSrcState {
>  	std::shared_ptr<CameraManager> cm;
>  	std::shared_ptr<Camera> cam;
> +	std::unique_ptr<CameraConfiguration> config;
>  	std::vector<GstPad *> srcpads;
>  };
>  
> @@ -132,16 +133,86 @@ gst_libcamera_src_task_enter(GstTask *task, GThread *thread, gpointer user_data)
>  	STREAM_LOCKER(user_data);
>  	GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
>  	GstLibcameraSrcState *state = self->state;
> +	GstFlowReturn flow_ret = GST_FLOW_OK;
>  
>  	GST_DEBUG_OBJECT(self, "Streaming thread has started");
>  
>  	guint group_id = gst_util_group_id_next();
> +	StreamRoles roles;
>  	for (GstPad *srcpad : state->srcpads) {
>  		/* Create stream-id and push stream-start */
>  		g_autofree gchar *stream_id = gst_pad_create_stream_id(srcpad, GST_ELEMENT(self), nullptr);
>  		GstEvent *event = gst_event_new_stream_start(stream_id);
>  		gst_event_set_group_id(event, group_id);
>  		gst_pad_push_event(srcpad, event);
> +
> +		/* Collect the streams roles for the next iteration */
> +		roles.push_back(gst_libcamera_pad_get_role(srcpad));
> +	}
> +
> +	/* Generate the stream configurations, there should be one per pad */
> +	state->config = state->cam->generateConfiguration(roles);
> +	g_assert(state->config->size() == state->srcpads.size());

What if more pads have been created than the camera can handle ? Is
there a way to fail more gracefully than with a g_assert() ? It doesn't
have to be implement now, but would be nice in the future. Maybe a \todo
comment with a brief description of how to fix it ?

> +
> +	for (gsize i = 0; i < state->srcpads.size(); i++) {
> +		GstPad *srcpad = state->srcpads[i];
> +		StreamConfiguration &stream_cfg = state->config->at(i);
> +
> +		/* Retreive the supported caps */

s/Retreive/Retrieve/

> +		g_autoptr(GstCaps) filter = gst_libcamera_stream_formats_to_caps(stream_cfg.formats());
> +		g_autoptr(GstCaps) caps = gst_pad_peer_query_caps(srcpad, filter);
> +		if (gst_caps_is_empty(caps)) {
> +			flow_ret = GST_FLOW_NOT_NEGOTIATED;
> +			break;
> +		}
> +
> +		/* Fixate caps and configure the stream */
> +		caps = gst_caps_make_writable(caps);
> +		gst_libcamera_configure_stream_from_caps(stream_cfg, caps);
> +	}
> +
> +	if (flow_ret != GST_FLOW_OK)
> +		goto done;
> +
> +	/* Validate the configuration */
> +	if (state->config->validate() == CameraConfiguration::Invalid) {
> +		flow_ret = GST_FLOW_NOT_NEGOTIATED;
> +		goto done;
> +	}
> +
> +	/* Regardless if it has been modified, create clean caps and push the
> +	 * caps event, downstream will decide if hte caps are acceptable */

s/hte/the/

Comment style:

	/*
	 * ...
	 */

> +	for (gsize i = 0; i < state->srcpads.size(); i++) {
> +		GstPad *srcpad = state->srcpads[i];
> +		const StreamConfiguration &stream_cfg = state->config->at(i);
> +
> +		g_autoptr(GstCaps) caps = gst_libcamera_stream_configuration_to_caps(stream_cfg);
> +		if (!gst_pad_push_event(srcpad, gst_event_new_caps(caps))) {
> +			flow_ret = GST_FLOW_NOT_NEGOTIATED;
> +			break;
> +		}
> +	}
> +
> +	ret = state->cam->configure(state->config.get());
> +	if (ret) {
> +		GST_ELEMENT_ERROR(self, RESOURCE, SETTINGS,
> +				  ("Failed to configure camera: %s", g_strerror(-ret)),
> +				  ("Camera.configure() failed with error code %i", ret));

s/.configure/::configure/

> +		gst_task_stop(task);
> +		return;

Just for my information, why is there no need to push an EOS events on
pads here ? And what if the previous loop fails and sets flow_ret to
GST_FLOW_NOT_NEGOTIATED and this then fails, is it expected that the
done: label won't be reached in that case ?

> +	}
> +
> +done:
> +	switch (flow_ret) {
> +	case GST_FLOW_NOT_NEGOTIATED:
> +		for (GstPad *srcpad : state->srcpads) {
> +			gst_pad_push_event(srcpad, gst_event_new_eos());
> +		}

No need for braces.

> +		GST_ELEMENT_FLOW_ERROR(self, flow_ret);
> +		gst_task_stop(task);
> +		return;
> +	default:
> +		break;
>  	}
>  }
>
Nicolas Dufresne Feb. 12, 2020, 5:30 p.m. UTC | #2
Le mercredi 12 février 2020 à 01:47 +0200, Laurent Pinchart a écrit :
> Hi Nicolas,
> 
> Thank you for the patch.
> 
> On Tue, Jan 28, 2020 at 10:32:02PM -0500, Nicolas Dufresne wrote:
> > From: Nicolas Dufresne <nicolas.dufresne@collabora.com>
> > 
> > This is not expected to work in every possible cases, but should be sufficient as
> > an initial implementation. What is does it that it turns the StreamFormats into
> 
> s/is does it/it does is/
> 
> > caps and query downstream caps with that as a filter.
> 
> s/query/queries/
> 
> > The result is the subset of caps that can be used. We then keep the first
> > structure in that result and fixate using the default values found in
> > StreamConfiguration as a default in case a range is available.
> > 
> > We then validate this configuration and turn the potentially modified
> > configuration into caps that we push downstream. Note that we strust the order
> 
> s/strust/trust/
> 
> > in StreamFormats as being sorted best first, but this is not currently in
> > libcamera.
> 
> Could you add a \todo comment in the code to address this ?
> 
> > Signed-off-by: Nicolas Dufresne <nicolas.dufresne@collabora.com>
> > ---
> >  src/gstreamer/gstlibcamerasrc.cpp | 71 +++++++++++++++++++++++++++++++
> >  1 file changed, 71 insertions(+)
> > 
> > diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
> > index b1a21dc..7b478fa 100644
> > --- a/src/gstreamer/gstlibcamerasrc.cpp
> > +++ b/src/gstreamer/gstlibcamerasrc.cpp
> > @@ -24,6 +24,7 @@ GST_DEBUG_CATEGORY_STATIC(source_debug);
> >  struct GstLibcameraSrcState {
> >  	std::shared_ptr<CameraManager> cm;
> >  	std::shared_ptr<Camera> cam;
> > +	std::unique_ptr<CameraConfiguration> config;
> >  	std::vector<GstPad *> srcpads;
> >  };
> >  
> > @@ -132,16 +133,86 @@ gst_libcamera_src_task_enter(GstTask *task, GThread *thread, gpointer user_data)
> >  	STREAM_LOCKER(user_data);
> >  	GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
> >  	GstLibcameraSrcState *state = self->state;
> > +	GstFlowReturn flow_ret = GST_FLOW_OK;
> >  
> >  	GST_DEBUG_OBJECT(self, "Streaming thread has started");
> >  
> >  	guint group_id = gst_util_group_id_next();
> > +	StreamRoles roles;
> >  	for (GstPad *srcpad : state->srcpads) {
> >  		/* Create stream-id and push stream-start */
> >  		g_autofree gchar *stream_id = gst_pad_create_stream_id(srcpad, GST_ELEMENT(self), nullptr);
> >  		GstEvent *event = gst_event_new_stream_start(stream_id);
> >  		gst_event_set_group_id(event, group_id);
> >  		gst_pad_push_event(srcpad, event);
> > +
> > +		/* Collect the streams roles for the next iteration */
> > +		roles.push_back(gst_libcamera_pad_get_role(srcpad));
> > +	}
> > +
> > +	/* Generate the stream configurations, there should be one per pad */
> > +	state->config = state->cam->generateConfiguration(roles);
> > +	g_assert(state->config->size() == state->srcpads.size());
> 
> What if more pads have been created than the camera can handle ? Is
> there a way to fail more gracefully than with a g_assert() ? It doesn't
> have to be implement now, but would be nice in the future. Maybe a \todo
> comment with a brief description of how to fix it ?

Should add a \todo, I didn't remember this. In fact, I should check if
the size is the same and fail otherwise, there is nothing I can do
about this. User wanted N stream, and this is not possible.

I don't think there is any clean way to do it right now. Normally I
should reject the pad request, but at that moment, I have no
information about frame size, or anything.

Note that there is no such thing as multiple pads at the moment, that's
not implemented yet. I just wanted to have relatively generic code to
avoid the follow up looking like a rewrite.

> 
> > +
> > +	for (gsize i = 0; i < state->srcpads.size(); i++) {
> > +		GstPad *srcpad = state->srcpads[i];
> > +		StreamConfiguration &stream_cfg = state->config->at(i);
> > +
> > +		/* Retreive the supported caps */
> 
> s/Retreive/Retrieve/
> 
> > +		g_autoptr(GstCaps) filter = gst_libcamera_stream_formats_to_caps(stream_cfg.formats());
> > +		g_autoptr(GstCaps) caps = gst_pad_peer_query_caps(srcpad, filter);
> > +		if (gst_caps_is_empty(caps)) {
> > +			flow_ret = GST_FLOW_NOT_NEGOTIATED;
> > +			break;
> > +		}
> > +
> > +		/* Fixate caps and configure the stream */
> > +		caps = gst_caps_make_writable(caps);
> > +		gst_libcamera_configure_stream_from_caps(stream_cfg, caps);
> > +	}
> > +
> > +	if (flow_ret != GST_FLOW_OK)
> > +		goto done;
> > +
> > +	/* Validate the configuration */
> > +	if (state->config->validate() == CameraConfiguration::Invalid) {
> > +		flow_ret = GST_FLOW_NOT_NEGOTIATED;
> > +		goto done;
> > +	}
> > +
> > +	/* Regardless if it has been modified, create clean caps and push the
> > +	 * caps event, downstream will decide if hte caps are acceptable */
> 
> s/hte/the/
> 
> Comment style:
> 
> 	/*
> 	 * ...
> 	 */
> 
> > +	for (gsize i = 0; i < state->srcpads.size(); i++) {
> > +		GstPad *srcpad = state->srcpads[i];
> > +		const StreamConfiguration &stream_cfg = state->config->at(i);
> > +
> > +		g_autoptr(GstCaps) caps = gst_libcamera_stream_configuration_to_caps(stream_cfg);
> > +		if (!gst_pad_push_event(srcpad, gst_event_new_caps(caps))) {
> > +			flow_ret = GST_FLOW_NOT_NEGOTIATED;
> > +			break;
> > +		}
> > +	}
> > +
> > +	ret = state->cam->configure(state->config.get());
> > +	if (ret) {
> > +		GST_ELEMENT_ERROR(self, RESOURCE, SETTINGS,
> > +				  ("Failed to configure camera: %s", g_strerror(-ret)),
> > +				  ("Camera.configure() failed with error code %i", ret));
> 
> s/.configure/::configure/
> 
> > +		gst_task_stop(task);
> > +		return;
> 
> Just for my information, why is there no need to push an EOS events on
> pads here ? And what if the previous loop fails and sets flow_ret to
> GST_FLOW_NOT_NEGOTIATED and this then fails, is it expected that the
> done: label won't be reached in that case ?
> 
> > +	}
> > +
> > +done:
> > +	switch (flow_ret) {
> > +	case GST_FLOW_NOT_NEGOTIATED:
> > +		for (GstPad *srcpad : state->srcpads) {
> > +			gst_pad_push_event(srcpad, gst_event_new_eos());
> > +		}
> 
> No need for braces.
> 
> > +		GST_ELEMENT_FLOW_ERROR(self, flow_ret);
> > +		gst_task_stop(task);
> > +		return;
> > +	default:
> > +		break;
> >  	}
> >  }
> >
Nicolas Dufresne Feb. 12, 2020, 5:34 p.m. UTC | #3
Le mercredi 12 février 2020 à 01:47 +0200, Laurent Pinchart a écrit :
> Hi Nicolas,
> 
> Thank you for the patch.
> 
> On Tue, Jan 28, 2020 at 10:32:02PM -0500, Nicolas Dufresne wrote:
> > From: Nicolas Dufresne <nicolas.dufresne@collabora.com>
> > 
> > This is not expected to work in every possible cases, but should be sufficient as
> > an initial implementation. What is does it that it turns the StreamFormats into
> 
> s/is does it/it does is/
> 
> > caps and query downstream caps with that as a filter.
> 
> s/query/queries/
> 
> > The result is the subset of caps that can be used. We then keep the first
> > structure in that result and fixate using the default values found in
> > StreamConfiguration as a default in case a range is available.
> > 
> > We then validate this configuration and turn the potentially modified
> > configuration into caps that we push downstream. Note that we strust the order
> 
> s/strust/trust/
> 
> > in StreamFormats as being sorted best first, but this is not currently in
> > libcamera.
> 
> Could you add a \todo comment in the code to address this ?
> 
> > Signed-off-by: Nicolas Dufresne <nicolas.dufresne@collabora.com>
> > ---
> >  src/gstreamer/gstlibcamerasrc.cpp | 71 +++++++++++++++++++++++++++++++
> >  1 file changed, 71 insertions(+)
> > 
> > diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
> > index b1a21dc..7b478fa 100644
> > --- a/src/gstreamer/gstlibcamerasrc.cpp
> > +++ b/src/gstreamer/gstlibcamerasrc.cpp
> > @@ -24,6 +24,7 @@ GST_DEBUG_CATEGORY_STATIC(source_debug);
> >  struct GstLibcameraSrcState {
> >  	std::shared_ptr<CameraManager> cm;
> >  	std::shared_ptr<Camera> cam;
> > +	std::unique_ptr<CameraConfiguration> config;
> >  	std::vector<GstPad *> srcpads;
> >  };
> >  
> > @@ -132,16 +133,86 @@ gst_libcamera_src_task_enter(GstTask *task, GThread *thread, gpointer user_data)
> >  	STREAM_LOCKER(user_data);
> >  	GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
> >  	GstLibcameraSrcState *state = self->state;
> > +	GstFlowReturn flow_ret = GST_FLOW_OK;
> >  
> >  	GST_DEBUG_OBJECT(self, "Streaming thread has started");
> >  
> >  	guint group_id = gst_util_group_id_next();
> > +	StreamRoles roles;
> >  	for (GstPad *srcpad : state->srcpads) {
> >  		/* Create stream-id and push stream-start */
> >  		g_autofree gchar *stream_id = gst_pad_create_stream_id(srcpad, GST_ELEMENT(self), nullptr);
> >  		GstEvent *event = gst_event_new_stream_start(stream_id);
> >  		gst_event_set_group_id(event, group_id);
> >  		gst_pad_push_event(srcpad, event);
> > +
> > +		/* Collect the streams roles for the next iteration */
> > +		roles.push_back(gst_libcamera_pad_get_role(srcpad));
> > +	}
> > +
> > +	/* Generate the stream configurations, there should be one per pad */
> > +	state->config = state->cam->generateConfiguration(roles);
> > +	g_assert(state->config->size() == state->srcpads.size());
> 
> What if more pads have been created than the camera can handle ? Is
> there a way to fail more gracefully than with a g_assert() ? It doesn't
> have to be implement now, but would be nice in the future. Maybe a \todo
> comment with a brief description of how to fix it ?
> 
> > +
> > +	for (gsize i = 0; i < state->srcpads.size(); i++) {
> > +		GstPad *srcpad = state->srcpads[i];
> > +		StreamConfiguration &stream_cfg = state->config->at(i);
> > +
> > +		/* Retreive the supported caps */
> 
> s/Retreive/Retrieve/
> 
> > +		g_autoptr(GstCaps) filter = gst_libcamera_stream_formats_to_caps(stream_cfg.formats());
> > +		g_autoptr(GstCaps) caps = gst_pad_peer_query_caps(srcpad, filter);
> > +		if (gst_caps_is_empty(caps)) {
> > +			flow_ret = GST_FLOW_NOT_NEGOTIATED;
> > +			break;
> > +		}
> > +
> > +		/* Fixate caps and configure the stream */
> > +		caps = gst_caps_make_writable(caps);
> > +		gst_libcamera_configure_stream_from_caps(stream_cfg, caps);
> > +	}
> > +
> > +	if (flow_ret != GST_FLOW_OK)
> > +		goto done;
> > +
> > +	/* Validate the configuration */
> > +	if (state->config->validate() == CameraConfiguration::Invalid) {
> > +		flow_ret = GST_FLOW_NOT_NEGOTIATED;
> > +		goto done;
> > +	}
> > +
> > +	/* Regardless if it has been modified, create clean caps and push the
> > +	 * caps event, downstream will decide if hte caps are acceptable */
> 
> s/hte/the/
> 
> Comment style:
> 
> 	/*
> 	 * ...
> 	 */
> 
> > +	for (gsize i = 0; i < state->srcpads.size(); i++) {
> > +		GstPad *srcpad = state->srcpads[i];
> > +		const StreamConfiguration &stream_cfg = state->config->at(i);
> > +
> > +		g_autoptr(GstCaps) caps = gst_libcamera_stream_configuration_to_caps(stream_cfg);
> > +		if (!gst_pad_push_event(srcpad, gst_event_new_caps(caps))) {
> > +			flow_ret = GST_FLOW_NOT_NEGOTIATED;
> > +			break;
> > +		}
> > +	}
> > +
> > +	ret = state->cam->configure(state->config.get());
> > +	if (ret) {
> > +		GST_ELEMENT_ERROR(self, RESOURCE, SETTINGS,
> > +				  ("Failed to configure camera: %s", g_strerror(-ret)),
> > +				  ("Camera.configure() failed with error code %i", ret));
> 
> s/.configure/::configure/
> 
> > +		gst_task_stop(task);
> > +		return;
> 
> Just for my information, why is there no need to push an EOS events on
> pads here ? And what if the previous loop fails and sets flow_ret to
> GST_FLOW_NOT_NEGOTIATED and this then fails, is it expected that the
> done: label won't be reached in that case ?

Sorry, clicked send and forgot about this one. I have no idea why there
is a EOS being sent in the first place, the flow return handling was
copied from GstBaseSrc. At this stage, we haven't sent anything on any
pads yet, so it's not clear. Maybe they wanted to terminate the pre-
roll, maybe not. It pre-dates Git, so no history to figure-out.

> 
> > +	}
> > +
> > +done:
> > +	switch (flow_ret) {
> > +	case GST_FLOW_NOT_NEGOTIATED:
> > +		for (GstPad *srcpad : state->srcpads) {
> > +			gst_pad_push_event(srcpad, gst_event_new_eos());
> > +		}
> 
> No need for braces.

I'll stop sending EOS for now, as I don't know why we do this.

> 
> > +		GST_ELEMENT_FLOW_ERROR(self, flow_ret);
> > +		gst_task_stop(task);
> > +		return;
> > +	default:
> > +		break;
> >  	}
> >  }
> >

Patch

diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
index b1a21dc..7b478fa 100644
--- a/src/gstreamer/gstlibcamerasrc.cpp
+++ b/src/gstreamer/gstlibcamerasrc.cpp
@@ -24,6 +24,7 @@  GST_DEBUG_CATEGORY_STATIC(source_debug);
 struct GstLibcameraSrcState {
 	std::shared_ptr<CameraManager> cm;
 	std::shared_ptr<Camera> cam;
+	std::unique_ptr<CameraConfiguration> config;
 	std::vector<GstPad *> srcpads;
 };
 
@@ -132,16 +133,86 @@  gst_libcamera_src_task_enter(GstTask *task, GThread *thread, gpointer user_data)
 	STREAM_LOCKER(user_data);
 	GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
 	GstLibcameraSrcState *state = self->state;
+	GstFlowReturn flow_ret = GST_FLOW_OK;
 
 	GST_DEBUG_OBJECT(self, "Streaming thread has started");
 
 	guint group_id = gst_util_group_id_next();
+	StreamRoles roles;
 	for (GstPad *srcpad : state->srcpads) {
 		/* Create stream-id and push stream-start */
 		g_autofree gchar *stream_id = gst_pad_create_stream_id(srcpad, GST_ELEMENT(self), nullptr);
 		GstEvent *event = gst_event_new_stream_start(stream_id);
 		gst_event_set_group_id(event, group_id);
 		gst_pad_push_event(srcpad, event);
+
+		/* Collect the streams roles for the next iteration */
+		roles.push_back(gst_libcamera_pad_get_role(srcpad));
+	}
+
+	/* Generate the stream configurations, there should be one per pad */
+	state->config = state->cam->generateConfiguration(roles);
+	g_assert(state->config->size() == state->srcpads.size());
+
+	for (gsize i = 0; i < state->srcpads.size(); i++) {
+		GstPad *srcpad = state->srcpads[i];
+		StreamConfiguration &stream_cfg = state->config->at(i);
+
+		/* Retreive the supported caps */
+		g_autoptr(GstCaps) filter = gst_libcamera_stream_formats_to_caps(stream_cfg.formats());
+		g_autoptr(GstCaps) caps = gst_pad_peer_query_caps(srcpad, filter);
+		if (gst_caps_is_empty(caps)) {
+			flow_ret = GST_FLOW_NOT_NEGOTIATED;
+			break;
+		}
+
+		/* Fixate caps and configure the stream */
+		caps = gst_caps_make_writable(caps);
+		gst_libcamera_configure_stream_from_caps(stream_cfg, caps);
+	}
+
+	if (flow_ret != GST_FLOW_OK)
+		goto done;
+
+	/* Validate the configuration */
+	if (state->config->validate() == CameraConfiguration::Invalid) {
+		flow_ret = GST_FLOW_NOT_NEGOTIATED;
+		goto done;
+	}
+
+	/* Regardless if it has been modified, create clean caps and push the
+	 * caps event, downstream will decide if hte caps are acceptable */
+	for (gsize i = 0; i < state->srcpads.size(); i++) {
+		GstPad *srcpad = state->srcpads[i];
+		const StreamConfiguration &stream_cfg = state->config->at(i);
+
+		g_autoptr(GstCaps) caps = gst_libcamera_stream_configuration_to_caps(stream_cfg);
+		if (!gst_pad_push_event(srcpad, gst_event_new_caps(caps))) {
+			flow_ret = GST_FLOW_NOT_NEGOTIATED;
+			break;
+		}
+	}
+
+	ret = state->cam->configure(state->config.get());
+	if (ret) {
+		GST_ELEMENT_ERROR(self, RESOURCE, SETTINGS,
+				  ("Failed to configure camera: %s", g_strerror(-ret)),
+				  ("Camera.configure() failed with error code %i", ret));
+		gst_task_stop(task);
+		return;
+	}
+
+done:
+	switch (flow_ret) {
+	case GST_FLOW_NOT_NEGOTIATED:
+		for (GstPad *srcpad : state->srcpads) {
+			gst_pad_push_event(srcpad, gst_event_new_eos());
+		}
+		GST_ELEMENT_FLOW_ERROR(self, flow_ret);
+		gst_task_stop(task);
+		return;
+	default:
+		break;
 	}
 }