[v2,5/5] libcamera: software_isp: Modify dispatching messages on stop
diff mbox series

Message ID 20250224185235.43381-6-mzamazal@redhat.com
State Changes Requested
Headers show
Series
  • Fix occasional software ISP assertion error on stop
Related show

Commit Message

Milan Zamazal Feb. 24, 2025, 6:52 p.m. UTC
There may be pending messages in SoftwareIsp message queue when
SoftwareIsp stops.  The call to IPAProxySoft::stop() will dispatch them
before SoftwareIsp::stop() finishes.  But this is dependent on
IPAProxySoft::stop() implementation and we can do better to ensure they
are dispatched before SoftwareIsp::stop() finishes.

Let's introduce new `receiver' argument to Thread::dispatchMessages(),
limiting dispatching the messages to the given receiver.  Now we can
flush the messages destined for the SoftwareIsp instance explicitly.
And the IPA proxy can flush just the messages destined for itself.
Other messages of the given thread remain queued and will be handled
elsewhere as appropriate.

Signed-off-by: Milan Zamazal <mzamazal@redhat.com>
---
 include/libcamera/base/thread.h                   |  3 ++-
 src/libcamera/base/thread.cpp                     | 15 ++++++++++-----
 src/libcamera/software_isp/software_isp.cpp       |  3 +++
 .../libcamera_templates/proxy_functions.tmpl      |  2 +-
 4 files changed, 16 insertions(+), 7 deletions(-)

Comments

Laurent Pinchart Feb. 24, 2025, 9:01 p.m. UTC | #1
Hi Milan,

Thank you for the patch.

On Mon, Feb 24, 2025 at 07:52:35PM +0100, Milan Zamazal wrote:
> There may be pending messages in SoftwareIsp message queue when
> SoftwareIsp stops.  The call to IPAProxySoft::stop() will dispatch them
> before SoftwareIsp::stop() finishes.  But this is dependent on
> IPAProxySoft::stop() implementation and we can do better to ensure they
> are dispatched before SoftwareIsp::stop() finishes.
> 
> Let's introduce new `receiver' argument to Thread::dispatchMessages(),
> limiting dispatching the messages to the given receiver.  Now we can
> flush the messages destined for the SoftwareIsp instance explicitly.
> And the IPA proxy can flush just the messages destined for itself.
> Other messages of the given thread remain queued and will be handled
> elsewhere as appropriate.
> 
> Signed-off-by: Milan Zamazal <mzamazal@redhat.com>
> ---
>  include/libcamera/base/thread.h                   |  3 ++-
>  src/libcamera/base/thread.cpp                     | 15 ++++++++++-----

Could you split this to a separate patch ? Something like

libcamera: base: thread: Support dispatching messages for specific receiver

The Thread::dispatchMessage() function supports filtering messages based
on their type. It can be useful to also dispatch only messages posted
for a specific receiver. Add an optional receiver argument to the
dispatchMessage() function to do so. When set to null (the default
value), the behaviour of the function is not changed.

>  src/libcamera/software_isp/software_isp.cpp       |  3 +++
>  .../libcamera_templates/proxy_functions.tmpl      |  2 +-

prxoy_function.tmpl can then also be split to a separate patch.

utils: ipc: Only dispatch messages for proxy when stopping thread

When stopping the proxy thread, all messages of InvokeMessage type
posted to the pipeline handler thread are dispatched, to ensure that all
signals emitted from the proxy thread and queued for delivery to the
proxy are delivered synchronously. This unnecessarily delivers queued
signals for other objects in the pipeline handler thread, possibly
delaying processing of higher priority events.

Improve the implementation by limiting synchronous delivery to messages
posted for the proxy.



In case a problem is found later with the IPA proxy, it will be easier
to revert this specific change.

>  4 files changed, 16 insertions(+), 7 deletions(-)
> 
> diff --git a/include/libcamera/base/thread.h b/include/libcamera/base/thread.h
> index 3cbf6398..b9284c2c 100644
> --- a/include/libcamera/base/thread.h
> +++ b/include/libcamera/base/thread.h
> @@ -48,7 +48,8 @@ public:
>  
>  	EventDispatcher *eventDispatcher();
>  
> -	void dispatchMessages(Message::Type type = Message::Type::None);
> +	void dispatchMessages(Message::Type type = Message::Type::None,
> +			      Object *receiver = nullptr);
>  
>  protected:
>  	int exec();
> diff --git a/src/libcamera/base/thread.cpp b/src/libcamera/base/thread.cpp
> index 02128f23..6f22f4ed 100644
> --- a/src/libcamera/base/thread.cpp
> +++ b/src/libcamera/base/thread.cpp
> @@ -603,6 +603,8 @@ void Thread::removeMessages(Object *receiver)
>  /**
>   * \brief Dispatch posted messages for this thread
>   * \param[in] type The message type
> + * \param[in] receiver If not null, dispatch only messages for the given
> + *    receiver
>   *
>   * This function immediately dispatches all the messages previously posted for
>   * this thread with postMessage() that match the message \a type. If the \a type

Let's expand this paragraph to explain the receiver parameter:

 * This function immediately dispatches all the messages of the given \a type
 * previously posted to this thread for the \a receiver with postMessage(). If
 * the \a type is Message::Type::None, all messages types are dispatched. If the
 * \a receiver is null, messages to all receivers are dispatched.

I think you can then shorten the documentation of the parameter:

 * \param[in] receiver The receiver whose messages to dispatch

> @@ -616,7 +618,7 @@ void Thread::removeMessages(Object *receiver)
>   * same thread from an object's message handler. It guarantees delivery of
>   * messages in the order they have been posted in all cases.
>   */
> -void Thread::dispatchMessages(Message::Type type)
> +void Thread::dispatchMessages(Message::Type type, Object *receiver)
>  {
>  	ASSERT(data_ == ThreadData::current());
>  
> @@ -633,6 +635,9 @@ void Thread::dispatchMessages(Message::Type type)
>  		if (type != Message::Type::None && msg->type() != type)
>  			continue;
>  
> +		if (receiver && receiver != msg->receiver_)
> +			continue;
> +
>  		/*
>  		 * Move the message, setting the entry in the list to null. It
>  		 * will cause recursive calls to ignore the entry, and the erase
> @@ -640,12 +645,12 @@ void Thread::dispatchMessages(Message::Type type)
>  		 */
>  		std::unique_ptr<Message> message = std::move(msg);
>  
> -		Object *receiver = message->receiver_;
> -		ASSERT(data_ == receiver->thread()->data_);
> -		receiver->pendingMessages_--;
> +		Object *messageReceiver = message->receiver_;
> +		ASSERT(data_ == messageReceiver->thread()->data_);
> +		messageReceiver->pendingMessages_--;
>  
>  		locker.unlock();
> -		receiver->message(message.get());
> +		messageReceiver->message(message.get());
>  		message.reset();
>  		locker.lock();
>  	}
> diff --git a/src/libcamera/software_isp/software_isp.cpp b/src/libcamera/software_isp/software_isp.cpp
> index 3a605ab2..8f5ee774 100644
> --- a/src/libcamera/software_isp/software_isp.cpp
> +++ b/src/libcamera/software_isp/software_isp.cpp
> @@ -13,6 +13,8 @@
>  #include <sys/types.h>
>  #include <unistd.h>
>  
> +#include <libcamera/base/thread.h>
> +
>  #include <libcamera/controls.h>
>  #include <libcamera/formats.h>
>  #include <libcamera/stream.h>
> @@ -339,6 +341,7 @@ void SoftwareIsp::stop()
>  	ispWorkerThread_.wait();
>  
>  	running_ = false;
> +	Thread::current()->dispatchMessages(Message::Type::InvokeMessage, this);

I'd add a blank line here.

I think you can also swap the two lines. Dispatching the messages first
will ensure that any queued signal that indicates completion of a buffer
will be delivered to the pipeline handler. That may push one extra frame
to the application. On the other hand, as we're stopping the ISP, maybe
the application doesn't care.

I wonder if we could actually drop the running_ variable now. By
dispatching all pending messages, we ensure that no new calls to the IPA
will be made from signal handlers invoked from the dispatchMessages()
call from within ipa_->stop().

>  	ipa_->stop();
>  
>  	for (auto buffer : queuedOutputBuffers_) {
> diff --git a/utils/codegen/ipc/generators/libcamera_templates/proxy_functions.tmpl b/utils/codegen/ipc/generators/libcamera_templates/proxy_functions.tmpl
> index b5797b14..25476990 100644
> --- a/utils/codegen/ipc/generators/libcamera_templates/proxy_functions.tmpl
> +++ b/utils/codegen/ipc/generators/libcamera_templates/proxy_functions.tmpl
> @@ -34,7 +34,7 @@
>  	thread_.exit();
>  	thread_.wait();
>  
> -	Thread::current()->dispatchMessages(Message::Type::InvokeMessage);
> +	Thread::current()->dispatchMessages(Message::Type::InvokeMessage, this);
>  
>  	state_ = ProxyStopped;
>  {%- endmacro -%}
Milan Zamazal Feb. 25, 2025, 2:44 p.m. UTC | #2
Hi Laurent,

thank you for review.

Laurent Pinchart <laurent.pinchart@ideasonboard.com> writes:

> Hi Milan,
>
> Thank you for the patch.
>
> On Mon, Feb 24, 2025 at 07:52:35PM +0100, Milan Zamazal wrote:
>> There may be pending messages in SoftwareIsp message queue when
>> SoftwareIsp stops.  The call to IPAProxySoft::stop() will dispatch them
>> before SoftwareIsp::stop() finishes.  But this is dependent on
>> IPAProxySoft::stop() implementation and we can do better to ensure they
>> are dispatched before SoftwareIsp::stop() finishes.
>> 
>> Let's introduce new `receiver' argument to Thread::dispatchMessages(),
>> limiting dispatching the messages to the given receiver.  Now we can
>> flush the messages destined for the SoftwareIsp instance explicitly.
>> And the IPA proxy can flush just the messages destined for itself.
>> Other messages of the given thread remain queued and will be handled
>> elsewhere as appropriate.
>> 
>> Signed-off-by: Milan Zamazal <mzamazal@redhat.com>
>> ---
>>  include/libcamera/base/thread.h                   |  3 ++-
>>  src/libcamera/base/thread.cpp                     | 15 ++++++++++-----
>
> Could you split this to a separate patch ? Something like
>
> libcamera: base: thread: Support dispatching messages for specific receiver
>
> The Thread::dispatchMessage() function supports filtering messages based
> on their type. It can be useful to also dispatch only messages posted
> for a specific receiver. Add an optional receiver argument to the
> dispatchMessage() function to do so. When set to null (the default
> value), the behaviour of the function is not changed.
>
>>  src/libcamera/software_isp/software_isp.cpp       |  3 +++
>>  .../libcamera_templates/proxy_functions.tmpl      |  2 +-
>
> prxoy_function.tmpl can then also be split to a separate patch.
>
> utils: ipc: Only dispatch messages for proxy when stopping thread
>
> When stopping the proxy thread, all messages of InvokeMessage type
> posted to the pipeline handler thread are dispatched, to ensure that all
> signals emitted from the proxy thread and queued for delivery to the
> proxy are delivered synchronously. This unnecessarily delivers queued
> signals for other objects in the pipeline handler thread, possibly
> delaying processing of higher priority events.
>
> Improve the implementation by limiting synchronous delivery to messages
> posted for the proxy.
>
>
>
> In case a problem is found later with the IPA proxy, it will be easier
> to revert this specific change.
>
>>  4 files changed, 16 insertions(+), 7 deletions(-)
>> 
>> diff --git a/include/libcamera/base/thread.h b/include/libcamera/base/thread.h
>> index 3cbf6398..b9284c2c 100644
>> --- a/include/libcamera/base/thread.h
>> +++ b/include/libcamera/base/thread.h
>> @@ -48,7 +48,8 @@ public:
>>  
>>  	EventDispatcher *eventDispatcher();
>>  
>> -	void dispatchMessages(Message::Type type = Message::Type::None);
>> +	void dispatchMessages(Message::Type type = Message::Type::None,
>> +			      Object *receiver = nullptr);
>>  
>>  protected:
>>  	int exec();
>> diff --git a/src/libcamera/base/thread.cpp b/src/libcamera/base/thread.cpp
>> index 02128f23..6f22f4ed 100644
>> --- a/src/libcamera/base/thread.cpp
>> +++ b/src/libcamera/base/thread.cpp
>> @@ -603,6 +603,8 @@ void Thread::removeMessages(Object *receiver)
>>  /**
>>   * \brief Dispatch posted messages for this thread
>>   * \param[in] type The message type
>> + * \param[in] receiver If not null, dispatch only messages for the given
>> + *    receiver
>>   *
>>   * This function immediately dispatches all the messages previously posted for
>>   * this thread with postMessage() that match the message \a type. If the \a type
>
> Let's expand this paragraph to explain the receiver parameter:
>
>  * This function immediately dispatches all the messages of the given \a type
>  * previously posted to this thread for the \a receiver with postMessage(). If
>  * the \a type is Message::Type::None, all messages types are dispatched. If the
>  * \a receiver is null, messages to all receivers are dispatched.
>
> I think you can then shorten the documentation of the parameter:
>
>  * \param[in] receiver The receiver whose messages to dispatch
>
>> @@ -616,7 +618,7 @@ void Thread::removeMessages(Object *receiver)
>>   * same thread from an object's message handler. It guarantees delivery of
>>   * messages in the order they have been posted in all cases.
>>   */
>> -void Thread::dispatchMessages(Message::Type type)
>> +void Thread::dispatchMessages(Message::Type type, Object *receiver)
>>  {
>>  	ASSERT(data_ == ThreadData::current());
>>  
>> @@ -633,6 +635,9 @@ void Thread::dispatchMessages(Message::Type type)
>>  		if (type != Message::Type::None && msg->type() != type)
>>  			continue;
>>  
>> +		if (receiver && receiver != msg->receiver_)
>> +			continue;
>> +
>>  		/*
>>  		 * Move the message, setting the entry in the list to null. It
>>  		 * will cause recursive calls to ignore the entry, and the erase
>> @@ -640,12 +645,12 @@ void Thread::dispatchMessages(Message::Type type)
>>  		 */
>>  		std::unique_ptr<Message> message = std::move(msg);
>>  
>> -		Object *receiver = message->receiver_;
>> -		ASSERT(data_ == receiver->thread()->data_);
>> -		receiver->pendingMessages_--;
>> +		Object *messageReceiver = message->receiver_;
>> +		ASSERT(data_ == messageReceiver->thread()->data_);
>> +		messageReceiver->pendingMessages_--;
>>  
>>  		locker.unlock();
>> -		receiver->message(message.get());
>> +		messageReceiver->message(message.get());
>>  		message.reset();
>>  		locker.lock();
>>  	}
>> diff --git a/src/libcamera/software_isp/software_isp.cpp b/src/libcamera/software_isp/software_isp.cpp
>> index 3a605ab2..8f5ee774 100644
>> --- a/src/libcamera/software_isp/software_isp.cpp
>> +++ b/src/libcamera/software_isp/software_isp.cpp
>> @@ -13,6 +13,8 @@
>>  #include <sys/types.h>
>>  #include <unistd.h>
>>  
>> +#include <libcamera/base/thread.h>
>> +
>>  #include <libcamera/controls.h>
>>  #include <libcamera/formats.h>
>>  #include <libcamera/stream.h>
>> @@ -339,6 +341,7 @@ void SoftwareIsp::stop()
>>  	ispWorkerThread_.wait();
>>  
>>  	running_ = false;
>> +	Thread::current()->dispatchMessages(Message::Type::InvokeMessage, this);
>
> I'd add a blank line here.
>
> I think you can also swap the two lines. Dispatching the messages first
> will ensure that any queued signal that indicates completion of a buffer
> will be delivered to the pipeline handler. That may push one extra frame
> to the application. On the other hand, as we're stopping the ISP, maybe
> the application doesn't care.
>
> I wonder if we could actually drop the running_ variable now. 

I think we can and it works for me.  I'll do it in v3.

> By dispatching all pending messages, we ensure that no new calls to
> the IPA will be made from signal handlers invoked from the
> dispatchMessages() call from within ipa_->stop().
>
>>  	ipa_->stop();
>>  
>>  	for (auto buffer : queuedOutputBuffers_) {
>> diff --git a/utils/codegen/ipc/generators/libcamera_templates/proxy_functions.tmpl b/utils/codegen/ipc/generators/libcamera_templates/proxy_functions.tmpl
>> index b5797b14..25476990 100644
>> --- a/utils/codegen/ipc/generators/libcamera_templates/proxy_functions.tmpl
>> +++ b/utils/codegen/ipc/generators/libcamera_templates/proxy_functions.tmpl
>> @@ -34,7 +34,7 @@
>>  	thread_.exit();
>>  	thread_.wait();
>>  
>> -	Thread::current()->dispatchMessages(Message::Type::InvokeMessage);
>> +	Thread::current()->dispatchMessages(Message::Type::InvokeMessage, this);
>>  
>>  	state_ = ProxyStopped;
>>  {%- endmacro -%}

Patch
diff mbox series

diff --git a/include/libcamera/base/thread.h b/include/libcamera/base/thread.h
index 3cbf6398..b9284c2c 100644
--- a/include/libcamera/base/thread.h
+++ b/include/libcamera/base/thread.h
@@ -48,7 +48,8 @@  public:
 
 	EventDispatcher *eventDispatcher();
 
-	void dispatchMessages(Message::Type type = Message::Type::None);
+	void dispatchMessages(Message::Type type = Message::Type::None,
+			      Object *receiver = nullptr);
 
 protected:
 	int exec();
diff --git a/src/libcamera/base/thread.cpp b/src/libcamera/base/thread.cpp
index 02128f23..6f22f4ed 100644
--- a/src/libcamera/base/thread.cpp
+++ b/src/libcamera/base/thread.cpp
@@ -603,6 +603,8 @@  void Thread::removeMessages(Object *receiver)
 /**
  * \brief Dispatch posted messages for this thread
  * \param[in] type The message type
+ * \param[in] receiver If not null, dispatch only messages for the given
+ *    receiver
  *
  * This function immediately dispatches all the messages previously posted for
  * this thread with postMessage() that match the message \a type. If the \a type
@@ -616,7 +618,7 @@  void Thread::removeMessages(Object *receiver)
  * same thread from an object's message handler. It guarantees delivery of
  * messages in the order they have been posted in all cases.
  */
-void Thread::dispatchMessages(Message::Type type)
+void Thread::dispatchMessages(Message::Type type, Object *receiver)
 {
 	ASSERT(data_ == ThreadData::current());
 
@@ -633,6 +635,9 @@  void Thread::dispatchMessages(Message::Type type)
 		if (type != Message::Type::None && msg->type() != type)
 			continue;
 
+		if (receiver && receiver != msg->receiver_)
+			continue;
+
 		/*
 		 * Move the message, setting the entry in the list to null. It
 		 * will cause recursive calls to ignore the entry, and the erase
@@ -640,12 +645,12 @@  void Thread::dispatchMessages(Message::Type type)
 		 */
 		std::unique_ptr<Message> message = std::move(msg);
 
-		Object *receiver = message->receiver_;
-		ASSERT(data_ == receiver->thread()->data_);
-		receiver->pendingMessages_--;
+		Object *messageReceiver = message->receiver_;
+		ASSERT(data_ == messageReceiver->thread()->data_);
+		messageReceiver->pendingMessages_--;
 
 		locker.unlock();
-		receiver->message(message.get());
+		messageReceiver->message(message.get());
 		message.reset();
 		locker.lock();
 	}
diff --git a/src/libcamera/software_isp/software_isp.cpp b/src/libcamera/software_isp/software_isp.cpp
index 3a605ab2..8f5ee774 100644
--- a/src/libcamera/software_isp/software_isp.cpp
+++ b/src/libcamera/software_isp/software_isp.cpp
@@ -13,6 +13,8 @@ 
 #include <sys/types.h>
 #include <unistd.h>
 
+#include <libcamera/base/thread.h>
+
 #include <libcamera/controls.h>
 #include <libcamera/formats.h>
 #include <libcamera/stream.h>
@@ -339,6 +341,7 @@  void SoftwareIsp::stop()
 	ispWorkerThread_.wait();
 
 	running_ = false;
+	Thread::current()->dispatchMessages(Message::Type::InvokeMessage, this);
 	ipa_->stop();
 
 	for (auto buffer : queuedOutputBuffers_) {
diff --git a/utils/codegen/ipc/generators/libcamera_templates/proxy_functions.tmpl b/utils/codegen/ipc/generators/libcamera_templates/proxy_functions.tmpl
index b5797b14..25476990 100644
--- a/utils/codegen/ipc/generators/libcamera_templates/proxy_functions.tmpl
+++ b/utils/codegen/ipc/generators/libcamera_templates/proxy_functions.tmpl
@@ -34,7 +34,7 @@ 
 	thread_.exit();
 	thread_.wait();
 
-	Thread::current()->dispatchMessages(Message::Type::InvokeMessage);
+	Thread::current()->dispatchMessages(Message::Type::InvokeMessage, this);
 
 	state_ = ProxyStopped;
 {%- endmacro -%}