[libcamera-devel,2/2] gstreamer: Implement renegotiation
diff mbox series

Message ID 20231122135406.14921-3-jaslo@ziska.de
State Superseded
Headers show
Series
  • gstreamer: Implement renegotiation
Related show

Commit Message

Jaslo Ziska Nov. 22, 2023, 1:43 p.m. UTC
This commit implements renegotiation of the camera configuration and
source pad caps. A renegotiation can happen when a downstream element
decides to change caps or the pipeline is dynamically changed.

To handle the renegotiation the GST_FLOW_NOT_NEGOTIATED return value has
to be handled in GstLibcameraSrcState::processRequest. Otherwise the
default would be to print an error and stop streaming.
To archive this in a clean way the if-statement is altered into a switch
statement which now also has a case for GST_FLOW_NOT_NEGOTIATED. Just
like the case for GST_FLOW_OK, the function will return without an error
because the renegotiation will happen later in the running task.

In gst_libcamera_src_task_run all the source pads are checked for the
reconfigure flag by calling gst_pad_check_reconfigure. It is important
to iterate all source pads and not break after one pad requested a
reconfiguration because gst_pad_check_reconfigure clears the reconfigure
flag and if two pads request a reconfiguration at the same time the
renegotiation would happen twice.
If a source pad requested a reconfiguration it is first checked whether
the old caps are still sufficient. If they are not the renegotiation
will happen.
If any pad requested a reconfiguration the following will happen:
1. The camera is stopped because changing the configuration may not
   happen while running.
2. The completedRequests queue will be cleared because the completed
   buffers have the wrong configuration (see below).
3. The new caps are negotiated by calling gst_libcamera_src_negotiate.
   When the negotiation fails streaming will stop.
4. The camera is started again.

Clearing the completedRequests queue is archived with a new method in
GstLibcameraSrcState called clearRequests. This function is now also
used in gst_libcamera_src_task_leave as the code there did the same
thing.

In gst_libcamera_src_task_enter, after the initial negotiation, a call
to gst_pad_check_reconfigure was added to clear the reconfigure flag to
avoid triggering a renegotiation when running the task for the first
time.

Signed-off-by: Jaslo Ziska <jaslo@ziska.de>
---
 src/gstreamer/gstlibcamerasrc.cpp | 72 +++++++++++++++++++++++--------
 1 file changed, 55 insertions(+), 17 deletions(-)

Comments

Nicolas Dufresne Nov. 22, 2023, 3:25 p.m. UTC | #1
Le mercredi 22 novembre 2023 à 14:43 +0100, Jaslo Ziska via libcamera-devel a
écrit :
> This commit implements renegotiation of the camera configuration and
> source pad caps. A renegotiation can happen when a downstream element
> decides to change caps or the pipeline is dynamically changed.
> 
> To handle the renegotiation the GST_FLOW_NOT_NEGOTIATED return value has
> to be handled in GstLibcameraSrcState::processRequest. Otherwise the
> default would be to print an error and stop streaming.
> To archive this in a clean way the if-statement is altered into a switch
> statement which now also has a case for GST_FLOW_NOT_NEGOTIATED. Just
> like the case for GST_FLOW_OK, the function will return without an error
> because the renegotiation will happen later in the running task.

That's an interesting comment, I don't think we expect GST_FLOW_NOT_NEGOTIATED
in the renegotiation process. Downstream should keep handling the current
format, emit an upstream reconfigure event and wait (may drop) but returning
that flow is not really expected. How did you get that ?

> 
> In gst_libcamera_src_task_run all the source pads are checked for the
> reconfigure flag by calling gst_pad_check_reconfigure. It is important
> to iterate all source pads and not break after one pad requested a
> reconfiguration because gst_pad_check_reconfigure clears the reconfigure
> flag and if two pads request a reconfiguration at the same time the
> renegotiation would happen twice.

nit: We may want to break on first, and then just systematically clear all the
reconfigure flags in the negotiation() function. This would then become handy if
we need to renegotiation due to some "signal" behind sent by libcamera itself.

> If a source pad requested a reconfiguration it is first checked whether
> the old caps are still sufficient. If they are not the renegotiation
> will happen.
> If any pad requested a reconfiguration the following will happen:
> 1. The camera is stopped because changing the configuration may not
>    happen while running.
> 2. The completedRequests queue will be cleared because the completed
>    buffers have the wrong configuration (see below).
> 3. The new caps are negotiated by calling gst_libcamera_src_negotiate.
>    When the negotiation fails streaming will stop.
> 4. The camera is started again.
> 
> Clearing the completedRequests queue is archived with a new method in
> GstLibcameraSrcState called clearRequests. This function is now also
> used in gst_libcamera_src_task_leave as the code there did the same
> thing.

Might want to slim down the patch by doing this refactoring in its own patch ?

> 
> In gst_libcamera_src_task_enter, after the initial negotiation, a call
> to gst_pad_check_reconfigure was added to clear the reconfigure flag to
> avoid triggering a renegotiation when running the task for the first
> time.
> 
> Signed-off-by: Jaslo Ziska <jaslo@ziska.de>
> ---
>  src/gstreamer/gstlibcamerasrc.cpp | 72 +++++++++++++++++++++++--------
>  1 file changed, 55 insertions(+), 17 deletions(-)
> 
> diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
> index e7a49fef..868fa20a 100644
> --- a/src/gstreamer/gstlibcamerasrc.cpp
> +++ b/src/gstreamer/gstlibcamerasrc.cpp
> @@ -11,7 +11,6 @@
>   *  - Implement GstElement::send_event
>   *    + Allowing application to use FLUSH/FLUSH_STOP
>   *    + Prevent the main thread from accessing streaming thread
> - *  - Implement renegotiation (even if slow)
>   *  - Implement GstElement::request-new-pad (multi stream)
>   *    + Evaluate if a single streaming thread is fine
>   *  - Add application driven request (snapshot)
> @@ -133,6 +132,7 @@ struct GstLibcameraSrcState {
>  	int queueRequest();
>  	void requestCompleted(Request *request);
>  	int processRequest();
> +	void clearRequests();
>  };
>  
>  struct _GstLibcameraSrc {
> @@ -301,23 +301,39 @@ int GstLibcameraSrcState::processRequest()
>  							srcpad, ret);
>  	}
>  
> -	if (ret != GST_FLOW_OK) {
> -		if (ret == GST_FLOW_EOS) {
> -			g_autoptr(GstEvent) eos = gst_event_new_eos();
> -			guint32 seqnum = gst_util_seqnum_next();
> -			gst_event_set_seqnum(eos, seqnum);
> -			for (GstPad *srcpad : srcpads_)
> -				gst_pad_push_event(srcpad, gst_event_ref(eos));
> -		} else if (ret != GST_FLOW_FLUSHING) {
> -			GST_ELEMENT_FLOW_ERROR(src_, ret);
> -		}
> +	switch (ret) {
> +	case GST_FLOW_OK:
> +	case GST_FLOW_NOT_NEGOTIATED:
> +		break;
> +	case GST_FLOW_EOS: {
> +		g_autoptr(GstEvent) eos = gst_event_new_eos();
> +		guint32 seqnum = gst_util_seqnum_next();
> +		gst_event_set_seqnum(eos, seqnum);
> +		for (GstPad *srcpad : srcpads_)
> +			gst_pad_push_event(srcpad, gst_event_ref(eos));
> +
> +		err = -EPIPE;
> +		break;
> +	}
> +	case GST_FLOW_FLUSHING:
> +		err = -EPIPE;
> +		break;
> +	default:
> +		GST_ELEMENT_FLOW_ERROR(src_, ret);
>  
> -		return -EPIPE;
> +		err = -EPIPE;
> +		break;
>  	}
>  
>  	return err;
>  }
>  
> +void GstLibcameraSrcState::clearRequests()
> +{
> +	GLibLocker locker(&lock_);
> +	completedRequests_ = {};
> +}
> +
>  static bool
>  gst_libcamera_src_open(GstLibcameraSrc *self)
>  {
> @@ -488,6 +504,31 @@ gst_libcamera_src_task_run(gpointer user_data)
>  		return;
>  	}
>  
> +	// check if a srcpad requested a renegotiation
> +	gboolean reconfigure = FALSE;
> +	for (GstPad *srcpad : state->srcpads_) {
> +		if (gst_pad_check_reconfigure(srcpad)) {
> +			// check whether the caps even need changing
> +			g_autoptr(GstCaps) caps = gst_pad_get_current_caps(srcpad);
> +			g_autoptr(GstCaps) intersection = gst_pad_peer_query_caps(srcpad, caps);

Just wondering, but maybe gst_pad_peer_query_accept_caps() would do the job,
with a slightly improved performance ? This is only possible since
gst_pad_get_current_caps() is fixed. At this point, it should cannot be null
either.

> +
> +			if (gst_caps_is_empty(intersection))
> +				reconfigure = TRUE;
> +		}
> +	}
> +
> +	if (reconfigure) {
> +		state->cam_->stop();
> +		state->clearRequests();
> +
> +		if (!gst_libcamera_src_negotiate(self)) {
> +			GST_ELEMENT_FLOW_ERROR(self, GST_FLOW_NOT_NEGOTIATED);
> +			gst_task_stop(self->task);
> +		}
> +
> +		state->cam_->start(&state->initControls_);
> +	}
> +
>  	/*
>  	 * Create and queue one request. If no buffers are available the
>  	 * function returns -ENOBUFS, which we ignore here as that's not a
> @@ -594,6 +635,7 @@ gst_libcamera_src_task_enter(GstTask *task, [[maybe_unused]] GThread *thread,
>  		gst_segment_init(&segment, GST_FORMAT_TIME);
>  		gst_pad_push_event(srcpad, gst_event_new_segment(&segment));
>  
> +		gst_pad_check_reconfigure(srcpad); // clear reconfigure flag
>  	}
>  
>  	if (self->auto_focus_mode != controls::AfModeManual) {
> @@ -629,11 +671,7 @@ gst_libcamera_src_task_leave([[maybe_unused]] GstTask *task,
>  	GST_DEBUG_OBJECT(self, "Streaming thread is about to stop");
>  
>  	state->cam_->stop();
> -
> -	{
> -		GLibLocker locker(&state->lock_);
> -		state->completedRequests_ = {};
> -	}
> +	state->clearRequests();
>  
>  	{
>  		GLibRecLocker locker(&self->stream_lock);
Jaslo Ziska Nov. 23, 2023, 11 a.m. UTC | #2
Hi Nicolas,

thanks for the reviews!

Nicolas Dufresne <nicolas@ndufresne.ca> writes:
> Le mercredi 22 novembre 2023 à 14:43 +0100, Jaslo Ziska via 
> libcamera-devel a
> écrit :
>> This commit implements renegotiation of the camera 
>> configuration and
>> source pad caps. A renegotiation can happen when a downstream 
>> element
>> decides to change caps or the pipeline is dynamically changed.
>>
>> To handle the renegotiation the GST_FLOW_NOT_NEGOTIATED return 
>> value has
>> to be handled in GstLibcameraSrcState::processRequest. 
>> Otherwise the
>> default would be to print an error and stop streaming.
>> To archive this in a clean way the if-statement is altered into 
>> a switch
>> statement which now also has a case for 
>> GST_FLOW_NOT_NEGOTIATED. Just
>> like the case for GST_FLOW_OK, the function will return without 
>> an error
>> because the renegotiation will happen later in the running 
>> task.
>
> That's an interesting comment, I don't think we expect 
> GST_FLOW_NOT_NEGOTIATED
> in the renegotiation process. Downstream should keep handling 
> the current
> format, emit an upstream reconfigure event and wait (may drop) 
> but returning
> that flow is not really expected. How did you get that ?

You are right, the downstream element does send the 
GST_EVENT_RECONFIGURE event. And I think we could handle this just 
like with the EOS event by setting an atomic and handling it in 
the running task. The way that I solved it now is similar to how 
GstBaseSrc does it, they also check for reconfigure with 
gst_pad_check_reconfigure.

As to why we receive GST_FLOW_NOT_NEGOTIATED: I'm not really sure 
what is supposed to happen in this situation as the documentation 
is a little scarce around this. I test this by dynamically 
switching capsfilter elements with different caps so I think it 
makes sense that the new capsfilter does not accept the buffers 
and instead returns GST_FLOW_NOT_NEGOTIATED. Maybe other elements 
can keep processing the buffers with the old caps and only send 
the event?
But GstBaseSrc also has a case where they check for 
GST_FLOW_NOT_NEGOTIATED (although they then also check if it 
really is a reconfiguration with gst_pad_needs_reconfigure, maybe 
I should add that) so it might be that this is supposed to happen.

>>
>> In gst_libcamera_src_task_run all the source pads are checked 
>> for the
>> reconfigure flag by calling gst_pad_check_reconfigure. It is 
>> important
>> to iterate all source pads and not break after one pad 
>> requested a
>> reconfiguration because gst_pad_check_reconfigure clears the 
>> reconfigure
>> flag and if two pads request a reconfiguration at the same time 
>> the
>> renegotiation would happen twice.
>
> nit: We may want to break on first, and then just systematically 
> clear all the
> reconfigure flags in the negotiation() function. This would then 
> become handy if
> we need to renegotiation due to some "signal" behind sent by 
> libcamera itself.

That's a good idea, I will add that.

>> If a source pad requested a reconfiguration it is first checked 
>> whether
>> the old caps are still sufficient. If they are not the 
>> renegotiation
>> will happen.
>> If any pad requested a reconfiguration the following will 
>> happen:
>> 1. The camera is stopped because changing the configuration may 
>> not
>>    happen while running.
>> 2. The completedRequests queue will be cleared because the 
>> completed
>>    buffers have the wrong configuration (see below).
>> 3. The new caps are negotiated by calling 
>> gst_libcamera_src_negotiate.
>>    When the negotiation fails streaming will stop.
>> 4. The camera is started again.
>>
>> Clearing the completedRequests queue is archived with a new 
>> method in
>> GstLibcameraSrcState called clearRequests. This function is now 
>> also
>> used in gst_libcamera_src_task_leave as the code there did the 
>> same
>> thing.
>
> Might want to slim down the patch by doing this refactoring in 
> its own patch ?

Ok, I will do that.

>>
>> In gst_libcamera_src_task_enter, after the initial negotiation, 
>> a call
>> to gst_pad_check_reconfigure was added to clear the reconfigure 
>> flag to
>> avoid triggering a renegotiation when running the task for the 
>> first
>> time.
>>
>> Signed-off-by: Jaslo Ziska <jaslo@ziska.de>
>> ---
>>  src/gstreamer/gstlibcamerasrc.cpp | 72 
>>  +++++++++++++++++++++++--------
>>  1 file changed, 55 insertions(+), 17 deletions(-)
>>
>> diff --git a/src/gstreamer/gstlibcamerasrc.cpp 
>> b/src/gstreamer/gstlibcamerasrc.cpp
>> index e7a49fef..868fa20a 100644
>> --- a/src/gstreamer/gstlibcamerasrc.cpp
>> +++ b/src/gstreamer/gstlibcamerasrc.cpp
>> @@ -11,7 +11,6 @@
>>   *  - Implement GstElement::send_event
>>   *    + Allowing application to use FLUSH/FLUSH_STOP
>>   *    + Prevent the main thread from accessing streaming 
>>   thread
>> - *  - Implement renegotiation (even if slow)
>>   *  - Implement GstElement::request-new-pad (multi stream)
>>   *    + Evaluate if a single streaming thread is fine
>>   *  - Add application driven request (snapshot)
>> @@ -133,6 +132,7 @@ struct GstLibcameraSrcState {
>>  	int queueRequest();
>>  	void requestCompleted(Request *request);
>>  	int processRequest();
>> +	void clearRequests();
>>  };
>>
>>  struct _GstLibcameraSrc {
>> @@ -301,23 +301,39 @@ int 
>> GstLibcameraSrcState::processRequest()
>>  							srcpad, ret);
>>  	}
>>
>> -	if (ret != GST_FLOW_OK) {
>> -		if (ret == GST_FLOW_EOS) {
>> -			g_autoptr(GstEvent) eos = gst_event_new_eos();
>> -			guint32 seqnum = gst_util_seqnum_next();
>> -			gst_event_set_seqnum(eos, seqnum);
>> -			for (GstPad *srcpad : srcpads_)
>> -				gst_pad_push_event(srcpad, 
>> gst_event_ref(eos));
>> -		} else if (ret != GST_FLOW_FLUSHING) {
>> -			GST_ELEMENT_FLOW_ERROR(src_, ret);
>> -		}
>> +	switch (ret) {
>> +	case GST_FLOW_OK:
>> +	case GST_FLOW_NOT_NEGOTIATED:
>> +		break;
>> +	case GST_FLOW_EOS: {
>> +		g_autoptr(GstEvent) eos = gst_event_new_eos();
>> +		guint32 seqnum = gst_util_seqnum_next();
>> +		gst_event_set_seqnum(eos, seqnum);
>> +		for (GstPad *srcpad : srcpads_)
>> +			gst_pad_push_event(srcpad, gst_event_ref(eos));
>> +
>> +		err = -EPIPE;
>> +		break;
>> +	}
>> +	case GST_FLOW_FLUSHING:
>> +		err = -EPIPE;
>> +		break;
>> +	default:
>> +		GST_ELEMENT_FLOW_ERROR(src_, ret);
>>
>> -		return -EPIPE;
>> +		err = -EPIPE;
>> +		break;
>>  	}
>>
>>  	return err;
>>  }
>>
>> +void GstLibcameraSrcState::clearRequests()
>> +{
>> +	GLibLocker locker(&lock_);
>> +	completedRequests_ = {};
>> +}
>> +
>>  static bool
>>  gst_libcamera_src_open(GstLibcameraSrc *self)
>>  {
>> @@ -488,6 +504,31 @@ gst_libcamera_src_task_run(gpointer 
>> user_data)
>>  		return;
>>  	}
>>
>> +	// check if a srcpad requested a renegotiation
>> +	gboolean reconfigure = FALSE;
>> +	for (GstPad *srcpad : state->srcpads_) {
>> +		if (gst_pad_check_reconfigure(srcpad)) {
>> +			// check whether the caps even need changing
>> +			g_autoptr(GstCaps) caps = 
>> gst_pad_get_current_caps(srcpad);
>> +			g_autoptr(GstCaps) intersection = 
>> gst_pad_peer_query_caps(srcpad, caps);
>
> Just wondering, but maybe gst_pad_peer_query_accept_caps() would 
> do the job,
> with a slightly improved performance ? This is only possible 
> since
> gst_pad_get_current_caps() is fixed. At this point, it should 
> cannot be null
> either.

I will test if this works and if it does add it to v2.

Regards,

Jaslo

>> +
>> +			if (gst_caps_is_empty(intersection))
>> +				reconfigure = TRUE;
>> +		}
>> +	}
>> +
>> +	if (reconfigure) {
>> +		state->cam_->stop();
>> +		state->clearRequests();
>> +
>> +		if (!gst_libcamera_src_negotiate(self)) {
>> +			GST_ELEMENT_FLOW_ERROR(self, 
>> GST_FLOW_NOT_NEGOTIATED);
>> +			gst_task_stop(self->task);
>> +		}
>> +
>> +		state->cam_->start(&state->initControls_);
>> +	}
>> +
>>  	/*
>>  	 * Create and queue one request. If no buffers are 
>>  available the
>>  	 * function returns -ENOBUFS, which we ignore here as 
>>  that's not a
>> @@ -594,6 +635,7 @@ gst_libcamera_src_task_enter(GstTask *task, 
>> [[maybe_unused]] GThread *thread,
>>  		gst_segment_init(&segment, GST_FORMAT_TIME);
>>  		gst_pad_push_event(srcpad, 
>>  gst_event_new_segment(&segment));
>>
>> +		gst_pad_check_reconfigure(srcpad); // clear 
>> reconfigure flag
>>  	}
>>
>>  	if (self->auto_focus_mode != controls::AfModeManual) {
>> @@ -629,11 +671,7 @@ 
>> gst_libcamera_src_task_leave([[maybe_unused]] GstTask *task,
>>  	GST_DEBUG_OBJECT(self, "Streaming thread is about to 
>>  stop");
>>
>>  	state->cam_->stop();
>> -
>> -	{
>> -		GLibLocker locker(&state->lock_);
>> -		state->completedRequests_ = {};
>> -	}
>> +	state->clearRequests();
>>
>>  	{
>>  		GLibRecLocker locker(&self->stream_lock);

Patch
diff mbox series

diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
index e7a49fef..868fa20a 100644
--- a/src/gstreamer/gstlibcamerasrc.cpp
+++ b/src/gstreamer/gstlibcamerasrc.cpp
@@ -11,7 +11,6 @@ 
  *  - Implement GstElement::send_event
  *    + Allowing application to use FLUSH/FLUSH_STOP
  *    + Prevent the main thread from accessing streaming thread
- *  - Implement renegotiation (even if slow)
  *  - Implement GstElement::request-new-pad (multi stream)
  *    + Evaluate if a single streaming thread is fine
  *  - Add application driven request (snapshot)
@@ -133,6 +132,7 @@  struct GstLibcameraSrcState {
 	int queueRequest();
 	void requestCompleted(Request *request);
 	int processRequest();
+	void clearRequests();
 };
 
 struct _GstLibcameraSrc {
@@ -301,23 +301,39 @@  int GstLibcameraSrcState::processRequest()
 							srcpad, ret);
 	}
 
-	if (ret != GST_FLOW_OK) {
-		if (ret == GST_FLOW_EOS) {
-			g_autoptr(GstEvent) eos = gst_event_new_eos();
-			guint32 seqnum = gst_util_seqnum_next();
-			gst_event_set_seqnum(eos, seqnum);
-			for (GstPad *srcpad : srcpads_)
-				gst_pad_push_event(srcpad, gst_event_ref(eos));
-		} else if (ret != GST_FLOW_FLUSHING) {
-			GST_ELEMENT_FLOW_ERROR(src_, ret);
-		}
+	switch (ret) {
+	case GST_FLOW_OK:
+	case GST_FLOW_NOT_NEGOTIATED:
+		break;
+	case GST_FLOW_EOS: {
+		g_autoptr(GstEvent) eos = gst_event_new_eos();
+		guint32 seqnum = gst_util_seqnum_next();
+		gst_event_set_seqnum(eos, seqnum);
+		for (GstPad *srcpad : srcpads_)
+			gst_pad_push_event(srcpad, gst_event_ref(eos));
+
+		err = -EPIPE;
+		break;
+	}
+	case GST_FLOW_FLUSHING:
+		err = -EPIPE;
+		break;
+	default:
+		GST_ELEMENT_FLOW_ERROR(src_, ret);
 
-		return -EPIPE;
+		err = -EPIPE;
+		break;
 	}
 
 	return err;
 }
 
+void GstLibcameraSrcState::clearRequests()
+{
+	GLibLocker locker(&lock_);
+	completedRequests_ = {};
+}
+
 static bool
 gst_libcamera_src_open(GstLibcameraSrc *self)
 {
@@ -488,6 +504,31 @@  gst_libcamera_src_task_run(gpointer user_data)
 		return;
 	}
 
+	// check if a srcpad requested a renegotiation
+	gboolean reconfigure = FALSE;
+	for (GstPad *srcpad : state->srcpads_) {
+		if (gst_pad_check_reconfigure(srcpad)) {
+			// check whether the caps even need changing
+			g_autoptr(GstCaps) caps = gst_pad_get_current_caps(srcpad);
+			g_autoptr(GstCaps) intersection = gst_pad_peer_query_caps(srcpad, caps);
+
+			if (gst_caps_is_empty(intersection))
+				reconfigure = TRUE;
+		}
+	}
+
+	if (reconfigure) {
+		state->cam_->stop();
+		state->clearRequests();
+
+		if (!gst_libcamera_src_negotiate(self)) {
+			GST_ELEMENT_FLOW_ERROR(self, GST_FLOW_NOT_NEGOTIATED);
+			gst_task_stop(self->task);
+		}
+
+		state->cam_->start(&state->initControls_);
+	}
+
 	/*
 	 * Create and queue one request. If no buffers are available the
 	 * function returns -ENOBUFS, which we ignore here as that's not a
@@ -594,6 +635,7 @@  gst_libcamera_src_task_enter(GstTask *task, [[maybe_unused]] GThread *thread,
 		gst_segment_init(&segment, GST_FORMAT_TIME);
 		gst_pad_push_event(srcpad, gst_event_new_segment(&segment));
 
+		gst_pad_check_reconfigure(srcpad); // clear reconfigure flag
 	}
 
 	if (self->auto_focus_mode != controls::AfModeManual) {
@@ -629,11 +671,7 @@  gst_libcamera_src_task_leave([[maybe_unused]] GstTask *task,
 	GST_DEBUG_OBJECT(self, "Streaming thread is about to stop");
 
 	state->cam_->stop();
-
-	{
-		GLibLocker locker(&state->lock_);
-		state->completedRequests_ = {};
-	}
+	state->clearRequests();
 
 	{
 		GLibRecLocker locker(&self->stream_lock);