Message ID | 20250224185235.43381-6-mzamazal@redhat.com |
---|---|
State | Changes Requested |
Headers | show |
Series |
|
Related | show |
Hi Milan, Thank you for the patch. On Mon, Feb 24, 2025 at 07:52:35PM +0100, Milan Zamazal wrote: > There may be pending messages in SoftwareIsp message queue when > SoftwareIsp stops. The call to IPAProxySoft::stop() will dispatch them > before SoftwareIsp::stop() finishes. But this is dependent on > IPAProxySoft::stop() implementation and we can do better to ensure they > are dispatched before SoftwareIsp::stop() finishes. > > Let's introduce new `receiver' argument to Thread::dispatchMessages(), > limiting dispatching the messages to the given receiver. Now we can > flush the messages destined for the SoftwareIsp instance explicitly. > And the IPA proxy can flush just the messages destined for itself. > Other messages of the given thread remain queued and will be handled > elsewhere as appropriate. > > Signed-off-by: Milan Zamazal <mzamazal@redhat.com> > --- > include/libcamera/base/thread.h | 3 ++- > src/libcamera/base/thread.cpp | 15 ++++++++++----- Could you split this to a separate patch ? Something like libcamera: base: thread: Support dispatching messages for specific receiver The Thread::dispatchMessage() function supports filtering messages based on their type. It can be useful to also dispatch only messages posted for a specific receiver. Add an optional receiver argument to the dispatchMessage() function to do so. When set to null (the default value), the behaviour of the function is not changed. > src/libcamera/software_isp/software_isp.cpp | 3 +++ > .../libcamera_templates/proxy_functions.tmpl | 2 +- prxoy_function.tmpl can then also be split to a separate patch. utils: ipc: Only dispatch messages for proxy when stopping thread When stopping the proxy thread, all messages of InvokeMessage type posted to the pipeline handler thread are dispatched, to ensure that all signals emitted from the proxy thread and queued for delivery to the proxy are delivered synchronously. This unnecessarily delivers queued signals for other objects in the pipeline handler thread, possibly delaying processing of higher priority events. Improve the implementation by limiting synchronous delivery to messages posted for the proxy. In case a problem is found later with the IPA proxy, it will be easier to revert this specific change. > 4 files changed, 16 insertions(+), 7 deletions(-) > > diff --git a/include/libcamera/base/thread.h b/include/libcamera/base/thread.h > index 3cbf6398..b9284c2c 100644 > --- a/include/libcamera/base/thread.h > +++ b/include/libcamera/base/thread.h > @@ -48,7 +48,8 @@ public: > > EventDispatcher *eventDispatcher(); > > - void dispatchMessages(Message::Type type = Message::Type::None); > + void dispatchMessages(Message::Type type = Message::Type::None, > + Object *receiver = nullptr); > > protected: > int exec(); > diff --git a/src/libcamera/base/thread.cpp b/src/libcamera/base/thread.cpp > index 02128f23..6f22f4ed 100644 > --- a/src/libcamera/base/thread.cpp > +++ b/src/libcamera/base/thread.cpp > @@ -603,6 +603,8 @@ void Thread::removeMessages(Object *receiver) > /** > * \brief Dispatch posted messages for this thread > * \param[in] type The message type > + * \param[in] receiver If not null, dispatch only messages for the given > + * receiver > * > * This function immediately dispatches all the messages previously posted for > * this thread with postMessage() that match the message \a type. If the \a type Let's expand this paragraph to explain the receiver parameter: * This function immediately dispatches all the messages of the given \a type * previously posted to this thread for the \a receiver with postMessage(). If * the \a type is Message::Type::None, all messages types are dispatched. If the * \a receiver is null, messages to all receivers are dispatched. I think you can then shorten the documentation of the parameter: * \param[in] receiver The receiver whose messages to dispatch > @@ -616,7 +618,7 @@ void Thread::removeMessages(Object *receiver) > * same thread from an object's message handler. It guarantees delivery of > * messages in the order they have been posted in all cases. > */ > -void Thread::dispatchMessages(Message::Type type) > +void Thread::dispatchMessages(Message::Type type, Object *receiver) > { > ASSERT(data_ == ThreadData::current()); > > @@ -633,6 +635,9 @@ void Thread::dispatchMessages(Message::Type type) > if (type != Message::Type::None && msg->type() != type) > continue; > > + if (receiver && receiver != msg->receiver_) > + continue; > + > /* > * Move the message, setting the entry in the list to null. It > * will cause recursive calls to ignore the entry, and the erase > @@ -640,12 +645,12 @@ void Thread::dispatchMessages(Message::Type type) > */ > std::unique_ptr<Message> message = std::move(msg); > > - Object *receiver = message->receiver_; > - ASSERT(data_ == receiver->thread()->data_); > - receiver->pendingMessages_--; > + Object *messageReceiver = message->receiver_; > + ASSERT(data_ == messageReceiver->thread()->data_); > + messageReceiver->pendingMessages_--; > > locker.unlock(); > - receiver->message(message.get()); > + messageReceiver->message(message.get()); > message.reset(); > locker.lock(); > } > diff --git a/src/libcamera/software_isp/software_isp.cpp b/src/libcamera/software_isp/software_isp.cpp > index 3a605ab2..8f5ee774 100644 > --- a/src/libcamera/software_isp/software_isp.cpp > +++ b/src/libcamera/software_isp/software_isp.cpp > @@ -13,6 +13,8 @@ > #include <sys/types.h> > #include <unistd.h> > > +#include <libcamera/base/thread.h> > + > #include <libcamera/controls.h> > #include <libcamera/formats.h> > #include <libcamera/stream.h> > @@ -339,6 +341,7 @@ void SoftwareIsp::stop() > ispWorkerThread_.wait(); > > running_ = false; > + Thread::current()->dispatchMessages(Message::Type::InvokeMessage, this); I'd add a blank line here. I think you can also swap the two lines. Dispatching the messages first will ensure that any queued signal that indicates completion of a buffer will be delivered to the pipeline handler. That may push one extra frame to the application. On the other hand, as we're stopping the ISP, maybe the application doesn't care. I wonder if we could actually drop the running_ variable now. By dispatching all pending messages, we ensure that no new calls to the IPA will be made from signal handlers invoked from the dispatchMessages() call from within ipa_->stop(). > ipa_->stop(); > > for (auto buffer : queuedOutputBuffers_) { > diff --git a/utils/codegen/ipc/generators/libcamera_templates/proxy_functions.tmpl b/utils/codegen/ipc/generators/libcamera_templates/proxy_functions.tmpl > index b5797b14..25476990 100644 > --- a/utils/codegen/ipc/generators/libcamera_templates/proxy_functions.tmpl > +++ b/utils/codegen/ipc/generators/libcamera_templates/proxy_functions.tmpl > @@ -34,7 +34,7 @@ > thread_.exit(); > thread_.wait(); > > - Thread::current()->dispatchMessages(Message::Type::InvokeMessage); > + Thread::current()->dispatchMessages(Message::Type::InvokeMessage, this); > > state_ = ProxyStopped; > {%- endmacro -%}
Hi Laurent, thank you for review. Laurent Pinchart <laurent.pinchart@ideasonboard.com> writes: > Hi Milan, > > Thank you for the patch. > > On Mon, Feb 24, 2025 at 07:52:35PM +0100, Milan Zamazal wrote: >> There may be pending messages in SoftwareIsp message queue when >> SoftwareIsp stops. The call to IPAProxySoft::stop() will dispatch them >> before SoftwareIsp::stop() finishes. But this is dependent on >> IPAProxySoft::stop() implementation and we can do better to ensure they >> are dispatched before SoftwareIsp::stop() finishes. >> >> Let's introduce new `receiver' argument to Thread::dispatchMessages(), >> limiting dispatching the messages to the given receiver. Now we can >> flush the messages destined for the SoftwareIsp instance explicitly. >> And the IPA proxy can flush just the messages destined for itself. >> Other messages of the given thread remain queued and will be handled >> elsewhere as appropriate. >> >> Signed-off-by: Milan Zamazal <mzamazal@redhat.com> >> --- >> include/libcamera/base/thread.h | 3 ++- >> src/libcamera/base/thread.cpp | 15 ++++++++++----- > > Could you split this to a separate patch ? Something like > > libcamera: base: thread: Support dispatching messages for specific receiver > > The Thread::dispatchMessage() function supports filtering messages based > on their type. It can be useful to also dispatch only messages posted > for a specific receiver. Add an optional receiver argument to the > dispatchMessage() function to do so. When set to null (the default > value), the behaviour of the function is not changed. > >> src/libcamera/software_isp/software_isp.cpp | 3 +++ >> .../libcamera_templates/proxy_functions.tmpl | 2 +- > > prxoy_function.tmpl can then also be split to a separate patch. > > utils: ipc: Only dispatch messages for proxy when stopping thread > > When stopping the proxy thread, all messages of InvokeMessage type > posted to the pipeline handler thread are dispatched, to ensure that all > signals emitted from the proxy thread and queued for delivery to the > proxy are delivered synchronously. This unnecessarily delivers queued > signals for other objects in the pipeline handler thread, possibly > delaying processing of higher priority events. > > Improve the implementation by limiting synchronous delivery to messages > posted for the proxy. > > > > In case a problem is found later with the IPA proxy, it will be easier > to revert this specific change. > >> 4 files changed, 16 insertions(+), 7 deletions(-) >> >> diff --git a/include/libcamera/base/thread.h b/include/libcamera/base/thread.h >> index 3cbf6398..b9284c2c 100644 >> --- a/include/libcamera/base/thread.h >> +++ b/include/libcamera/base/thread.h >> @@ -48,7 +48,8 @@ public: >> >> EventDispatcher *eventDispatcher(); >> >> - void dispatchMessages(Message::Type type = Message::Type::None); >> + void dispatchMessages(Message::Type type = Message::Type::None, >> + Object *receiver = nullptr); >> >> protected: >> int exec(); >> diff --git a/src/libcamera/base/thread.cpp b/src/libcamera/base/thread.cpp >> index 02128f23..6f22f4ed 100644 >> --- a/src/libcamera/base/thread.cpp >> +++ b/src/libcamera/base/thread.cpp >> @@ -603,6 +603,8 @@ void Thread::removeMessages(Object *receiver) >> /** >> * \brief Dispatch posted messages for this thread >> * \param[in] type The message type >> + * \param[in] receiver If not null, dispatch only messages for the given >> + * receiver >> * >> * This function immediately dispatches all the messages previously posted for >> * this thread with postMessage() that match the message \a type. If the \a type > > Let's expand this paragraph to explain the receiver parameter: > > * This function immediately dispatches all the messages of the given \a type > * previously posted to this thread for the \a receiver with postMessage(). If > * the \a type is Message::Type::None, all messages types are dispatched. If the > * \a receiver is null, messages to all receivers are dispatched. > > I think you can then shorten the documentation of the parameter: > > * \param[in] receiver The receiver whose messages to dispatch > >> @@ -616,7 +618,7 @@ void Thread::removeMessages(Object *receiver) >> * same thread from an object's message handler. It guarantees delivery of >> * messages in the order they have been posted in all cases. >> */ >> -void Thread::dispatchMessages(Message::Type type) >> +void Thread::dispatchMessages(Message::Type type, Object *receiver) >> { >> ASSERT(data_ == ThreadData::current()); >> >> @@ -633,6 +635,9 @@ void Thread::dispatchMessages(Message::Type type) >> if (type != Message::Type::None && msg->type() != type) >> continue; >> >> + if (receiver && receiver != msg->receiver_) >> + continue; >> + >> /* >> * Move the message, setting the entry in the list to null. It >> * will cause recursive calls to ignore the entry, and the erase >> @@ -640,12 +645,12 @@ void Thread::dispatchMessages(Message::Type type) >> */ >> std::unique_ptr<Message> message = std::move(msg); >> >> - Object *receiver = message->receiver_; >> - ASSERT(data_ == receiver->thread()->data_); >> - receiver->pendingMessages_--; >> + Object *messageReceiver = message->receiver_; >> + ASSERT(data_ == messageReceiver->thread()->data_); >> + messageReceiver->pendingMessages_--; >> >> locker.unlock(); >> - receiver->message(message.get()); >> + messageReceiver->message(message.get()); >> message.reset(); >> locker.lock(); >> } >> diff --git a/src/libcamera/software_isp/software_isp.cpp b/src/libcamera/software_isp/software_isp.cpp >> index 3a605ab2..8f5ee774 100644 >> --- a/src/libcamera/software_isp/software_isp.cpp >> +++ b/src/libcamera/software_isp/software_isp.cpp >> @@ -13,6 +13,8 @@ >> #include <sys/types.h> >> #include <unistd.h> >> >> +#include <libcamera/base/thread.h> >> + >> #include <libcamera/controls.h> >> #include <libcamera/formats.h> >> #include <libcamera/stream.h> >> @@ -339,6 +341,7 @@ void SoftwareIsp::stop() >> ispWorkerThread_.wait(); >> >> running_ = false; >> + Thread::current()->dispatchMessages(Message::Type::InvokeMessage, this); > > I'd add a blank line here. > > I think you can also swap the two lines. Dispatching the messages first > will ensure that any queued signal that indicates completion of a buffer > will be delivered to the pipeline handler. That may push one extra frame > to the application. On the other hand, as we're stopping the ISP, maybe > the application doesn't care. > > I wonder if we could actually drop the running_ variable now. I think we can and it works for me. I'll do it in v3. > By dispatching all pending messages, we ensure that no new calls to > the IPA will be made from signal handlers invoked from the > dispatchMessages() call from within ipa_->stop(). > >> ipa_->stop(); >> >> for (auto buffer : queuedOutputBuffers_) { >> diff --git a/utils/codegen/ipc/generators/libcamera_templates/proxy_functions.tmpl b/utils/codegen/ipc/generators/libcamera_templates/proxy_functions.tmpl >> index b5797b14..25476990 100644 >> --- a/utils/codegen/ipc/generators/libcamera_templates/proxy_functions.tmpl >> +++ b/utils/codegen/ipc/generators/libcamera_templates/proxy_functions.tmpl >> @@ -34,7 +34,7 @@ >> thread_.exit(); >> thread_.wait(); >> >> - Thread::current()->dispatchMessages(Message::Type::InvokeMessage); >> + Thread::current()->dispatchMessages(Message::Type::InvokeMessage, this); >> >> state_ = ProxyStopped; >> {%- endmacro -%}
diff --git a/include/libcamera/base/thread.h b/include/libcamera/base/thread.h index 3cbf6398..b9284c2c 100644 --- a/include/libcamera/base/thread.h +++ b/include/libcamera/base/thread.h @@ -48,7 +48,8 @@ public: EventDispatcher *eventDispatcher(); - void dispatchMessages(Message::Type type = Message::Type::None); + void dispatchMessages(Message::Type type = Message::Type::None, + Object *receiver = nullptr); protected: int exec(); diff --git a/src/libcamera/base/thread.cpp b/src/libcamera/base/thread.cpp index 02128f23..6f22f4ed 100644 --- a/src/libcamera/base/thread.cpp +++ b/src/libcamera/base/thread.cpp @@ -603,6 +603,8 @@ void Thread::removeMessages(Object *receiver) /** * \brief Dispatch posted messages for this thread * \param[in] type The message type + * \param[in] receiver If not null, dispatch only messages for the given + * receiver * * This function immediately dispatches all the messages previously posted for * this thread with postMessage() that match the message \a type. If the \a type @@ -616,7 +618,7 @@ void Thread::removeMessages(Object *receiver) * same thread from an object's message handler. It guarantees delivery of * messages in the order they have been posted in all cases. */ -void Thread::dispatchMessages(Message::Type type) +void Thread::dispatchMessages(Message::Type type, Object *receiver) { ASSERT(data_ == ThreadData::current()); @@ -633,6 +635,9 @@ void Thread::dispatchMessages(Message::Type type) if (type != Message::Type::None && msg->type() != type) continue; + if (receiver && receiver != msg->receiver_) + continue; + /* * Move the message, setting the entry in the list to null. It * will cause recursive calls to ignore the entry, and the erase @@ -640,12 +645,12 @@ void Thread::dispatchMessages(Message::Type type) */ std::unique_ptr<Message> message = std::move(msg); - Object *receiver = message->receiver_; - ASSERT(data_ == receiver->thread()->data_); - receiver->pendingMessages_--; + Object *messageReceiver = message->receiver_; + ASSERT(data_ == messageReceiver->thread()->data_); + messageReceiver->pendingMessages_--; locker.unlock(); - receiver->message(message.get()); + messageReceiver->message(message.get()); message.reset(); locker.lock(); } diff --git a/src/libcamera/software_isp/software_isp.cpp b/src/libcamera/software_isp/software_isp.cpp index 3a605ab2..8f5ee774 100644 --- a/src/libcamera/software_isp/software_isp.cpp +++ b/src/libcamera/software_isp/software_isp.cpp @@ -13,6 +13,8 @@ #include <sys/types.h> #include <unistd.h> +#include <libcamera/base/thread.h> + #include <libcamera/controls.h> #include <libcamera/formats.h> #include <libcamera/stream.h> @@ -339,6 +341,7 @@ void SoftwareIsp::stop() ispWorkerThread_.wait(); running_ = false; + Thread::current()->dispatchMessages(Message::Type::InvokeMessage, this); ipa_->stop(); for (auto buffer : queuedOutputBuffers_) { diff --git a/utils/codegen/ipc/generators/libcamera_templates/proxy_functions.tmpl b/utils/codegen/ipc/generators/libcamera_templates/proxy_functions.tmpl index b5797b14..25476990 100644 --- a/utils/codegen/ipc/generators/libcamera_templates/proxy_functions.tmpl +++ b/utils/codegen/ipc/generators/libcamera_templates/proxy_functions.tmpl @@ -34,7 +34,7 @@ thread_.exit(); thread_.wait(); - Thread::current()->dispatchMessages(Message::Type::InvokeMessage); + Thread::current()->dispatchMessages(Message::Type::InvokeMessage, this); state_ = ProxyStopped; {%- endmacro -%}
There may be pending messages in SoftwareIsp message queue when SoftwareIsp stops. The call to IPAProxySoft::stop() will dispatch them before SoftwareIsp::stop() finishes. But this is dependent on IPAProxySoft::stop() implementation and we can do better to ensure they are dispatched before SoftwareIsp::stop() finishes. Let's introduce new `receiver' argument to Thread::dispatchMessages(), limiting dispatching the messages to the given receiver. Now we can flush the messages destined for the SoftwareIsp instance explicitly. And the IPA proxy can flush just the messages destined for itself. Other messages of the given thread remain queued and will be handled elsewhere as appropriate. Signed-off-by: Milan Zamazal <mzamazal@redhat.com> --- include/libcamera/base/thread.h | 3 ++- src/libcamera/base/thread.cpp | 15 ++++++++++----- src/libcamera/software_isp/software_isp.cpp | 3 +++ .../libcamera_templates/proxy_functions.tmpl | 2 +- 4 files changed, 16 insertions(+), 7 deletions(-)