libcamera: ipc_unixsocket: Fix sendSync() timeout and hang

Message ID 20241220145556.3011657-1-julien.vuillaumier@nxp.com
State New
Series
  • libcamera: ipc_unixsocket: Fix sendSync() timeout and hang

Commit Message

Julien Vuillaumier Dec. 20, 2024, 2:55 p.m. UTC
Unix-socket based IPC sometimes times out or hangs, typically
when multiple cameras are stopped simultaneously. That specific case
triggers the concurrent sending, by each pipeline handler instance,
of a synchronous stop() message to its peer IPA process.

There is a dedicated IPC socket per camera. The sockets' payload
receipt signals all run in the camera manager thread.
To send a synchronous message to the IPA, the pipeline handler invokes
IPCPipeUnixSocket::sendSync() from the camera manager thread. This
sends the message, then busy-waits for the peer acknowledgment. The
busy wait is done by blocking in the event loop, calling
processEvents(), until the ack condition is detected.

One issue is that the socket receive slot readyRead() wakes up the
blocked thread via a libcamera::Message receipt. Even though such a
message resumes processEvents(), the thread may reblock immediately
because readyRead() does not explicitly interrupt() the dispatcher.
Most of the time, another pending event for the thread unblocks the
event dispatcher and the ack condition is detected - in the worst case
the 2-second timeout kicks in. Once unblocked, the dispatcher lets the
message acknowledgment be detected and sendSync() completes.

The other issue is that, when multiple pipeline handlers send
synchronous IPC messages concurrently, there is a possible recursion
of sendSync() / processEvents() calls nested in the camera manager
thread's stack. As commented in the source, that is a dangerous
construct that can lead to a hang.
The reason is that the last synchronous message sent is the deepest in
the stack. It is also the one whose acknowledgment is being busy
waited. However, other pending synchronous messages may have been
initiated before and sit higher in the stack. If they time out, the
condition is not detected because of the stack recursion, as the
thread is busy waiting for the last message to be acknowledged.

This change implements a safer mechanism to handle synchronous
message sending, similar to the one used for non-isolated IPAs. The
IPCUnixSocketWrapper class is introduced to handle the IPCUnixSocket
receive signal in a dedicated thread.
This way, the sending thread, when emitting a synchronous message, can
be blocked without using the event dispatcher's processEvents(), which
avoids the risky stack recursion.

Fixes: 21f1b555b ("libcamera: Add IPCPipe implementation based on unix socket")

Signed-off-by: Julien Vuillaumier <julien.vuillaumier@nxp.com>
---
 .../libcamera/internal/ipc_pipe_unixsocket.h  |  13 +-
 src/libcamera/ipc_pipe_unixsocket.cpp         | 242 +++++++++++++-----
 2 files changed, 178 insertions(+), 77 deletions(-)

Comments

Kieran Bingham July 21, 2025, 6:08 p.m. UTC | #1
Hi Julien,

I've just seen this in my backlog.

It's ... a substantial change, so it's hard to digest. Is there anything
we can break down here?

Are you running this change at NXP?

Quoting Julien Vuillaumier (2024-12-20 14:55:56)
> Unix-socket based IPC sometimes times out or hangs, typically
> when multiple cameras are stopped simultaneously. That specific case
> triggers the concurrent sending, by each pipeline handler instance,
> of a synchronous stop() message to its peer IPA process.
>
> There is a dedicated IPC socket per camera. The sockets' payload
> receipt signals all run in the camera manager thread.
> To send a synchronous message to the IPA, the pipeline handler invokes
> IPCPipeUnixSocket::sendSync() from the camera manager thread. This
> sends the message, then busy-waits for the peer acknowledgment. The
> busy wait is done by blocking in the event loop, calling
> processEvents(), until the ack condition is detected.
>
> One issue is that the socket receive slot readyRead() wakes up the
> blocked thread via a libcamera::Message receipt. Even though such a
> message resumes processEvents(), the thread may reblock immediately
> because readyRead() does not explicitly interrupt() the dispatcher.
> Most of the time, another pending event for the thread unblocks the
> event dispatcher and the ack condition is detected - in the worst case
> the 2-second timeout kicks in. Once unblocked, the dispatcher lets the
> message acknowledgment be detected and sendSync() completes.
>
> The other issue is that, when multiple pipeline handlers send
> synchronous IPC messages concurrently, there is a possible recursion
> of sendSync() / processEvents() calls nested in the camera manager
> thread's stack. As commented in the source, that is a dangerous
> construct that can lead to a hang.

I'm not sure I fully understand - do you have multiple cameras using a
single IPA, or multiple cameras using multiple IPAs with a single
pipeline handler, perhaps?



> The reason is that the last synchronous message sent is the deepest in
> the stack. It is also the one whose acknowledgment is being busy
> waited. However, other pending synchronous messages may have been
> initiated before and sit higher in the stack. If they time out, the
> condition is not detected because of the stack recursion, as the
> thread is busy waiting for the last message to be acknowledged.
>
> This change implements a safer mechanism to handle synchronous
> message sending, similar to the one used for non-isolated IPAs. The
> IPCUnixSocketWrapper class is introduced to handle the IPCUnixSocket
> receive signal in a dedicated thread.
> This way, the sending thread, when emitting a synchronous message, can
> be blocked without using the event dispatcher's processEvents(), which
> avoids the risky stack recursion.
> 
> Fixes: 21f1b555b ("libcamera: Add IPCPipe implementation based on unix socket")
> 

Paul - you were the author of the commit referenced above - could you
check through this patch please?

> Signed-off-by: Julien Vuillaumier <julien.vuillaumier@nxp.com>
> ---
>  .../libcamera/internal/ipc_pipe_unixsocket.h  |  13 +-
>  src/libcamera/ipc_pipe_unixsocket.cpp         | 242 +++++++++++++-----
>  2 files changed, 178 insertions(+), 77 deletions(-)
> 
> diff --git a/include/libcamera/internal/ipc_pipe_unixsocket.h b/include/libcamera/internal/ipc_pipe_unixsocket.h
> index 8c972613..280639d5 100644
> --- a/include/libcamera/internal/ipc_pipe_unixsocket.h
> +++ b/include/libcamera/internal/ipc_pipe_unixsocket.h
> @@ -16,6 +16,7 @@
>  namespace libcamera {
>  
>  class Process;
> +class IPCUnixSocketWrapper;
>  
>  class IPCPipeUnixSocket : public IPCPipe
>  {
> @@ -29,18 +30,8 @@ public:
>         int sendAsync(const IPCMessage &data) override;
>  
>  private:
> -       struct CallData {
> -               IPCUnixSocket::Payload *response;
> -               bool done;
> -       };
> -
> -       void readyRead();
> -       int call(const IPCUnixSocket::Payload &message,
> -                IPCUnixSocket::Payload *response, uint32_t seq);
> -
>         std::unique_ptr<Process> proc_;
> -       std::unique_ptr<IPCUnixSocket> socket_;
> -       std::map<uint32_t, CallData> callData_;
> +       std::unique_ptr<IPCUnixSocketWrapper> socketWrap_;
>  };
>  
>  } /* namespace libcamera */
> diff --git a/src/libcamera/ipc_pipe_unixsocket.cpp b/src/libcamera/ipc_pipe_unixsocket.cpp
> index 668ec73b..eb5408d4 100644
> --- a/src/libcamera/ipc_pipe_unixsocket.cpp
> +++ b/src/libcamera/ipc_pipe_unixsocket.cpp
> @@ -9,10 +9,9 @@
>  
>  #include <vector>
>  
> -#include <libcamera/base/event_dispatcher.h>
>  #include <libcamera/base/log.h>
> +#include <libcamera/base/mutex.h>
>  #include <libcamera/base/thread.h>
> -#include <libcamera/base/timer.h>
>  
>  #include "libcamera/internal/ipc_pipe.h"
>  #include "libcamera/internal/ipc_unixsocket.h"
> @@ -24,67 +23,161 @@ namespace libcamera {
>  
>  LOG_DECLARE_CATEGORY(IPCPipe)
>  
> -IPCPipeUnixSocket::IPCPipeUnixSocket(const char *ipaModulePath,
> -                                    const char *ipaProxyWorkerPath)
> -       : IPCPipe()
> +class IPCUnixSocketWrapper : Thread

I'm a bit wary of the concept of 'wrapping' an IPCUnixSocket. Doesn't
that simply mean IPCUnixSocket should be re-implemented? Why do we
wrap it?

(Sorry - big complicated change - trying to understand as I go through
...)

>  {
> -       std::vector<int> fds;
> -       std::vector<std::string> args;
> -       args.push_back(ipaModulePath);
> +public:
> +       IPCUnixSocketWrapper(Signal<const IPCMessage &> *recv)
> +               : recv_(recv), ready_(false), sendSyncPending_(false),
> +                 sendSyncCookie_(0)
> +       {
> +               start();
> +       }
>  
> -       socket_ = std::make_unique<IPCUnixSocket>();
> -       UniqueFD fd = socket_->create();
> -       if (!fd.isValid()) {
> -               LOG(IPCPipe, Error) << "Failed to create socket";
> -               return;
> +       ~IPCUnixSocketWrapper()
> +       {
> +               exit();
> +               wait();
>         }
> -       socket_->readyRead.connect(this, &IPCPipeUnixSocket::readyRead);
> -       args.push_back(std::to_string(fd.get()));
> -       fds.push_back(fd.get());
>  
> -       proc_ = std::make_unique<Process>();
> -       int ret = proc_->start(ipaProxyWorkerPath, args, fds);
> -       if (ret) {
> -               LOG(IPCPipe, Error)
> -                       << "Failed to start proxy worker process";
> -               return;
> +       void run() override
> +       {
> +               /*
> +                * IPC socket construction and connection to its readyRead
> +                * signal has to be done from the IPC thread so that the
> +                * relevant Object instances (EventNotifier, slot) are bound to
> +                * its context.
> +                */
> +               init();
> +               exec();
> +               deinit();
>         }
>  
> -       connected_ = true;
> -}
> +       int fd() { return fd_.get(); }
> +       int sendSync(const IPCMessage &in, IPCMessage *out);
> +       int sendAsync(const IPCMessage &data);
> +       bool waitReady();
>  
> -IPCPipeUnixSocket::~IPCPipeUnixSocket()
> -{
> -}
> +private:
> +       void init();
> +       void deinit();
> +       void readyRead();
>  
> -int IPCPipeUnixSocket::sendSync(const IPCMessage &in, IPCMessage *out)
> +       UniqueFD fd_;
> +       Signal<const IPCMessage &> *recv_;
> +       ConditionVariable cv_;
> +       Mutex mutex_;
> +       bool ready_;
> +       bool sendSyncPending_;
> +       uint32_t sendSyncCookie_;
> +       IPCUnixSocket::Payload *sendSyncResponse_;
> +
> +       /* Socket shall be constructed and destructed from IPC thread context */
> +       std::unique_ptr<IPCUnixSocket> socket_;
> +};
> +
> +int IPCUnixSocketWrapper::sendSync(const IPCMessage &in, IPCMessage *out)
>  {
> +       int ret;
>         IPCUnixSocket::Payload response;
>  
> -       int ret = call(in.payload(), &response, in.header().cookie);
> +       mutex_.lock();
> +       ASSERT(!sendSyncPending_);
> +       sendSyncPending_ = true;
> +       sendSyncCookie_ = in.header().cookie;
> +       sendSyncResponse_ = &response;
> +       mutex_.unlock();
> +
> +       ret = socket_->send(in.payload());
>         if (ret) {
> -               LOG(IPCPipe, Error) << "Failed to call sync";
> -               return ret;
> +               LOG(IPCPipe, Error) << "Failed to send sync message";
> +               goto cleanup;
> +       }
> +
> +       bool complete;
> +       {
> +               MutexLocker locker(mutex_);
> +               auto syncComplete = ([&]() {
> +                       return sendSyncPending_ == false;
> +               });
> +               complete = cv_.wait_for(locker, 1000ms, syncComplete);
> +       }
> +
> +       if (!complete) {
> +               LOG(IPCPipe, Error) << "Timeout sending sync message";
> +               ret = -ETIMEDOUT;
> +               goto cleanup;
>         }
>  
>         if (out)
>                 *out = IPCMessage(response);
>  
>         return 0;
> +
> +cleanup:
> +       mutex_.lock();
> +       sendSyncPending_ = false;
> +       mutex_.unlock();
> +
> +       return ret;
>  }
>  
> -int IPCPipeUnixSocket::sendAsync(const IPCMessage &data)
> +int IPCUnixSocketWrapper::sendAsync(const IPCMessage &data)
>  {
> -       int ret = socket_->send(data.payload());
> -       if (ret) {
> -               LOG(IPCPipe, Error) << "Failed to call async";
> -               return ret;
> +       int ret;
> +       ret = socket_->send(data.payload());
> +       if (ret)
> +               LOG(IPCPipe, Error) << "Failed to send async message";
> +       return ret;
> +}
> +
> +bool IPCUnixSocketWrapper::waitReady()
> +{
> +       bool ready;
> +       {
> +               MutexLocker locker(mutex_);
> +               auto isReady = ([&]() {
> +                       return ready_;
> +               });
> +               ready = cv_.wait_for(locker, 1000ms, isReady);
>         }
>  
> -       return 0;
> +       return ready;
> +}
> +
> +void IPCUnixSocketWrapper::init()
> +{
> +       /* Init is to be done from the IPC thread context */
> +       ASSERT(Thread::current() == this);
> +
> +       socket_ = std::make_unique<IPCUnixSocket>();
> +       fd_ = socket_->create();
> +       if (!fd_.isValid()) {
> +               LOG(IPCPipe, Error) << "Failed to create socket";
> +               return;
> +       }
> +
> +       socket_->readyRead.connect(this, &IPCUnixSocketWrapper::readyRead);
> +
> +       mutex_.lock();
> +       ready_ = true;
> +       mutex_.unlock();
> +       cv_.notify_one();
>  }
>  
> -void IPCPipeUnixSocket::readyRead()
> +void IPCUnixSocketWrapper::deinit()
> +{
> +       /* Deinit is to be done from the IPC thread context */
> +       ASSERT(Thread::current() == this);
> +
> +       socket_->readyRead.disconnect(this);
> +       socket_.reset();
> +
> +       mutex_.lock();
> +       ready_ = false;
> +       mutex_.unlock();
> +}
> +
> +void IPCUnixSocketWrapper::readyRead()
>  {
>         IPCUnixSocket::Payload payload;
>         int ret = socket_->receive(&payload);
> @@ -93,55 +186,72 @@ void IPCPipeUnixSocket::readyRead()
>                 return;
>         }
>  
> -       /* \todo Use span to avoid the double copy when callData is found. */

Is this todo handled by the patch, or made redundant by it?

>         if (payload.data.size() < sizeof(IPCMessage::Header)) {
>                 LOG(IPCPipe, Error) << "Not enough data received";
>                 return;
>         }
>  
> -       IPCMessage ipcMessage(payload);
> +       const IPCMessage::Header *header =
> +               reinterpret_cast<IPCMessage::Header *>(payload.data.data());
> +       bool syncComplete = false;
> +       mutex_.lock();
> +       if (sendSyncPending_ && sendSyncCookie_ == header->cookie) {
> +               syncComplete = true;
> +               sendSyncPending_ = false;
> +               *sendSyncResponse_ = std::move(payload);
> +       }
> +       mutex_.unlock();
>  
> -       auto callData = callData_.find(ipcMessage.header().cookie);
> -       if (callData != callData_.end()) {
> -               *callData->second.response = std::move(payload);
> -               callData->second.done = true;
> +       if (syncComplete) {
> +               cv_.notify_one();
>                 return;
>         }
>  
>         /* Received unexpected data, this means it's a call from the IPA. */
> -       recv.emit(ipcMessage);
> +       IPCMessage ipcMessage(payload);
> +       recv_->emit(ipcMessage);
>  }
>  
> -int IPCPipeUnixSocket::call(const IPCUnixSocket::Payload &message,
> -                           IPCUnixSocket::Payload *response, uint32_t cookie)
> +IPCPipeUnixSocket::IPCPipeUnixSocket(const char *ipaModulePath,
> +                                    const char *ipaProxyWorkerPath)
> +       : IPCPipe()
>  {
> -       Timer timeout;
> -       int ret;
> +       socketWrap_ = std::make_unique<IPCUnixSocketWrapper>(&recv);
> +       if (!socketWrap_->waitReady()) {
> +               LOG(IPCPipe, Error) << "Failed to create socket";
> +               return;
> +       }
> +       int fd = socketWrap_->fd();
>  
> -       const auto result = callData_.insert({ cookie, { response, false } });
> -       const auto &iter = result.first;
> +       std::vector<int> fds;
> +       std::vector<std::string> args;
> +       args.push_back(ipaModulePath);
> +       args.push_back(std::to_string(fd));
> +       fds.push_back(fd);
>  
> -       ret = socket_->send(message);
> +       proc_ = std::make_unique<Process>();
> +       int ret = proc_->start(ipaProxyWorkerPath, args, fds);
>         if (ret) {
> -               callData_.erase(iter);
> -               return ret;
> +               LOG(IPCPipe, Error)
> +                       << "Failed to start proxy worker process";
> +               return;
>         }
>  
> -       /* \todo Make this less dangerous, see IPCPipe::sendSync() */
> -       timeout.start(2000ms);
> -       while (!iter->second.done) {
> -               if (!timeout.isRunning()) {
> -                       LOG(IPCPipe, Error) << "Call timeout!";
> -                       callData_.erase(iter);
> -                       return -ETIMEDOUT;
> -               }
> +       connected_ = true;
> +}
>  
> -               Thread::current()->eventDispatcher()->processEvents();
> -       }
> +IPCPipeUnixSocket::~IPCPipeUnixSocket()
> +{
> +}
>  
> -       callData_.erase(iter);
> +int IPCPipeUnixSocket::sendSync(const IPCMessage &in, IPCMessage *out)
> +{
> +       return socketWrap_->sendSync(in, out);
> +}
>  
> -       return 0;
> +int IPCPipeUnixSocket::sendAsync(const IPCMessage &data)
> +{
> +       return socketWrap_->sendAsync(data);
>  }
>  
>  } /* namespace libcamera */
> -- 
> 2.34.1
>
Julien Vuillaumier July 22, 2025, 2:12 p.m. UTC | #2
Hi Kieran,

On 21/07/2025 20:08, Kieran Bingham wrote:
> Hi Julien,
> 
> I've just seen this in my backlog.
> 
> It's ... a substantial change, so it's hard to digest. Is there anything
> we can break down here?

This change introduces the IPCUnixSocketWrapper class to move part of
the existing IPCPipeUnixSocket class implementation, the IPC socket
receive path, into a dedicated thread instead of running it in the
camera manager thread. The purpose is to be able to properly block the
camera manager thread's execution when sending a synchronous message
over IPC. The camera manager thread is later resumed by this new
thread, which detects the completion of the sync message sending.
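
For illustration, the core of that blocking/resume handoff can be
sketched with plain C++ primitives (a minimal sketch with hypothetical
names, not the actual patch code): the sender records the cookie it
waits for and blocks on a condition variable, while the receive thread
matches the cookie, stores the response and wakes the sender.

        #include <chrono>
        #include <condition_variable>
        #include <cstdint>
        #include <mutex>
        #include <utility>
        #include <vector>

        using Payload = std::vector<uint8_t>; /* stand-in for IPCUnixSocket::Payload */

        struct SyncState {
                std::mutex mutex;
                std::condition_variable cv;
                bool pending = false;
                uint32_t cookie = 0;
                Payload *response = nullptr;
        };

        /* Sender side (camera manager thread): true if the ack arrived in time. */
        bool waitForAck(SyncState &s, std::chrono::milliseconds timeout)
        {
                std::unique_lock<std::mutex> lock(s.mutex);
                return s.cv.wait_for(lock, timeout, [&] { return !s.pending; });
        }

        /* Receive side (dedicated IPC thread), called per incoming message. */
        void onMessage(SyncState &s, uint32_t cookie, Payload &&payload)
        {
                bool matched = false;
                {
                        std::lock_guard<std::mutex> lock(s.mutex);
                        if (s.pending && s.cookie == cookie) {
                                *s.response = std::move(payload);
                                s.pending = false;
                                matched = true;
                        }
                }
                if (matched)
                        s.cv.notify_one();
                /* Otherwise it is an unsolicited message from the IPA: forward it. */
        }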

The existing implementation had to be redistributed between the two
classes. The change may look substantial, but I don't see how to break
it down :/

> 
> Are you running this change at NXP ?

This change is present in the NXP BSP.
It was introduced because of IPC hangs experienced during multi-camera
operation, with a third-party IPA running in isolated mode.

The root cause of that issue is a bit intricate. Background is given in
the commit message - its length was limited to keep it readable, though.

> 
> Quoting Julien Vuillaumier (2024-12-20 14:55:56)
>> Unix-socket based IPC sometimes times out or hangs, typically
>> when multiple cameras are stopped simultaneously. That specific case
>> triggers the concurrent sending, by each pipeline handler instance,
>> of a synchronous stop() message to its peer IPA process.
>>
>> There is a dedicated IPC socket per camera. The sockets' payload
>> receipt signals all run in the camera manager thread.
>> To send a synchronous message to the IPA, the pipeline handler invokes
>> IPCPipeUnixSocket::sendSync() from the camera manager thread. This
>> sends the message, then busy-waits for the peer acknowledgment. The
>> busy wait is done by blocking in the event loop, calling
>> processEvents(), until the ack condition is detected.
>>
>> One issue is that the socket receive slot readyRead() wakes up the
>> blocked thread via a libcamera::Message receipt. Even though such a
>> message resumes processEvents(), the thread may reblock immediately
>> because readyRead() does not explicitly interrupt() the dispatcher.
>> Most of the time, another pending event for the thread unblocks the
>> event dispatcher and the ack condition is detected - in the worst case
>> the 2-second timeout kicks in. Once unblocked, the dispatcher lets the
>> message acknowledgment be detected and sendSync() completes.
>>
>> The other issue is that, when multiple pipeline handlers send
>> synchronous IPC messages concurrently, there is a possible recursion
>> of sendSync() / processEvents() calls nested in the camera manager
>> thread's stack. As commented in the source, that is a dangerous
>> construct that can lead to a hang.
> 
> I'm not sure I fully understand - do you have multiple cameras using a
> single IPA, or multiple cameras using multiple IPAs with a single
> pipeline handler, perhaps?

The wording is indeed not very clear, my apologies for this.

The conditions where this issue can be seen are: a pipeline handler
instance in charge of multiple cameras (same libcamera process/thread).
Each camera interfaces with its own instance of the same IPA, and each
IPA instance runs in isolated mode (third-party control algorithms).
A typical occurrence of the hang is when the application stops all the
cameras simultaneously: the pipeline handler, from the camera manager
thread context, has to send a synchronous IPC stop message to each IPA
instance and handle the completion of those messages concurrently.
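
To picture it, here is a hypothetical call stack on the camera manager
thread in that scenario (an illustration, not an actual trace), with
the deepest frame last:

        sendSync(stop, camera A)        // arms its own timeout, waits for ack A
          processEvents()               // dispatches a queued event...
            <stop event for camera B>   // ...which triggers a second stop
              sendSync(stop, camera B)  // nested call, busy-waits for ack B
                processEvents()         // blocks here; an expired timeout of A
                                        // cannot be acted upon until B completes

Only the deepest sendSync() is actively serviced, so if camera B's ack
is late or lost, camera A's timeout goes undetected.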

> 
>> The reason is that the last synchronous message sent is the deepest in
>> the stack. It is also the one whose acknowledgment is being busy
>> waited. However, other pending synchronous messages may have been
>> initiated before and sit higher in the stack. If they time out, the
>> condition is not detected because of the stack recursion, as the
>> thread is busy waiting for the last message to be acknowledged.
>>
>> This change implements a safer mechanism to handle synchronous
>> message sending, similar to the one used for non-isolated IPAs. The
>> IPCUnixSocketWrapper class is introduced to handle the IPCUnixSocket
>> receive signal in a dedicated thread.
>> This way, the sending thread, when emitting a synchronous message, can
>> be blocked without using the event dispatcher's processEvents(), which
>> avoids the risky stack recursion.
>>
>> Fixes: 21f1b555b ("libcamera: Add IPCPipe implementation based on unix socket")
>>
> 
> Paul - you were the author of the commit referenced above - could you
> check through this patch please?
> 

Let me know if there's something I can do to help with the review of 
this change.

Thanks,
Julien
Paul Elder Aug. 26, 2025, 10:28 a.m. UTC | #3
Hi Julien,

Thanks for the patch.

Quoting Julien Vuillaumier (2024-12-20 23:55:56)
> Unix-socket based IPC sometimes times out or hangs, typically
> when multiple cameras are stopped simultaneously. That specific case
> triggers the concurrent sending, by each pipeline handler instance,
> of a synchronous stop() message to its peer IPA process.
>
> There is a dedicated IPC socket per camera. The sockets' payload
> receipt signals all run in the camera manager thread.
> To send a synchronous message to the IPA, the pipeline handler invokes
> IPCPipeUnixSocket::sendSync() from the camera manager thread. This
> sends the message, then busy-waits for the peer acknowledgment. The
> busy wait is done by blocking in the event loop, calling
> processEvents(), until the ack condition is detected.
>
> One issue is that the socket receive slot readyRead() wakes up the
> blocked thread via a libcamera::Message receipt. Even though such a
> message resumes processEvents(), the thread may reblock immediately
> because readyRead() does not explicitly interrupt() the dispatcher.
> Most of the time, another pending event for the thread unblocks the
> event dispatcher and the ack condition is detected - in the worst case
> the 2-second timeout kicks in. Once unblocked, the dispatcher lets the
> message acknowledgment be detected and sendSync() completes.
>
> The other issue is that, when multiple pipeline handlers send
> synchronous IPC messages concurrently, there is a possible recursion
> of sendSync() / processEvents() calls nested in the camera manager
> thread's stack. As commented in the source, that is a dangerous
> construct that can lead to a hang.
> The reason is that the last synchronous message sent is the deepest in
> the stack. It is also the one whose acknowledgment is being busy
> waited. However, other pending synchronous messages may have been
> initiated before and sit higher in the stack. If they time out, the
> condition is not detected because of the stack recursion, as the
> thread is busy waiting for the last message to be acknowledged.

Ok, I think I understand the issue.

> 
> This change implements a safer mechanism to handle synchronous
> message sending, similar to the one used for non-isolated IPAs. The
> IPCUnixSocketWrapper class is introduced to handle the IPCUnixSocket
> receive signal in a dedicated thread.
> This way, the sending thread, when emitting a synchronous message, can
> be blocked without using the event dispatcher's processEvents(), which
> avoids the risky stack recursion.
> 
> Fixes: 21f1b555b ("libcamera: Add IPCPipe implementation based on unix socket")
> 
> Signed-off-by: Julien Vuillaumier <julien.vuillaumier@nxp.com>
> ---
>  .../libcamera/internal/ipc_pipe_unixsocket.h  |  13 +-
>  src/libcamera/ipc_pipe_unixsocket.cpp         | 242 +++++++++++++-----
>  2 files changed, 178 insertions(+), 77 deletions(-)
> 
> diff --git a/include/libcamera/internal/ipc_pipe_unixsocket.h b/include/libcamera/internal/ipc_pipe_unixsocket.h
> index 8c972613..280639d5 100644
> --- a/include/libcamera/internal/ipc_pipe_unixsocket.h
> +++ b/include/libcamera/internal/ipc_pipe_unixsocket.h
> @@ -16,6 +16,7 @@
>  namespace libcamera {
>  
>  class Process;
> +class IPCUnixSocketWrapper;
>  
>  class IPCPipeUnixSocket : public IPCPipe
>  {
> @@ -29,18 +30,8 @@ public:
>         int sendAsync(const IPCMessage &data) override;
>  
>  private:
> -       struct CallData {
> -               IPCUnixSocket::Payload *response;
> -               bool done;
> -       };
> -
> -       void readyRead();
> -       int call(const IPCUnixSocket::Payload &message,
> -                IPCUnixSocket::Payload *response, uint32_t seq);
> -
>         std::unique_ptr<Process> proc_;
> -       std::unique_ptr<IPCUnixSocket> socket_;
> -       std::map<uint32_t, CallData> callData_;
> +       std::unique_ptr<IPCUnixSocketWrapper> socketWrap_;

afaict you're essentially deleting the whole IPCPipe class and reimplementing
it. What's wrong with changing it directly instead of adding a wrapper?

>  };
>  
>  } /* namespace libcamera */
> diff --git a/src/libcamera/ipc_pipe_unixsocket.cpp b/src/libcamera/ipc_pipe_unixsocket.cpp
> index 668ec73b..eb5408d4 100644
> --- a/src/libcamera/ipc_pipe_unixsocket.cpp
> +++ b/src/libcamera/ipc_pipe_unixsocket.cpp
> @@ -9,10 +9,9 @@
>  
>  #include <vector>
>  
> -#include <libcamera/base/event_dispatcher.h>
>  #include <libcamera/base/log.h>
> +#include <libcamera/base/mutex.h>
>  #include <libcamera/base/thread.h>
> -#include <libcamera/base/timer.h>
>  
>  #include "libcamera/internal/ipc_pipe.h"
>  #include "libcamera/internal/ipc_unixsocket.h"
> @@ -24,67 +23,161 @@ namespace libcamera {
>  
>  LOG_DECLARE_CATEGORY(IPCPipe)
>  
> -IPCPipeUnixSocket::IPCPipeUnixSocket(const char *ipaModulePath,
> -                                    const char *ipaProxyWorkerPath)
> -       : IPCPipe()
> +class IPCUnixSocketWrapper : Thread
>  {
> -       std::vector<int> fds;
> -       std::vector<std::string> args;
> -       args.push_back(ipaModulePath);
> +public:
> +       IPCUnixSocketWrapper(Signal<const IPCMessage &> *recv)
> +               : recv_(recv), ready_(false), sendSyncPending_(false),
> +                 sendSyncCookie_(0)
> +       {
> +               start();
> +       }
>  
> -       socket_ = std::make_unique<IPCUnixSocket>();
> -       UniqueFD fd = socket_->create();
> -       if (!fd.isValid()) {
> -               LOG(IPCPipe, Error) << "Failed to create socket";
> -               return;
> +       ~IPCUnixSocketWrapper()
> +       {
> +               exit();
> +               wait();
>         }
> -       socket_->readyRead.connect(this, &IPCPipeUnixSocket::readyRead);
> -       args.push_back(std::to_string(fd.get()));
> -       fds.push_back(fd.get());
>  
> -       proc_ = std::make_unique<Process>();
> -       int ret = proc_->start(ipaProxyWorkerPath, args, fds);
> -       if (ret) {
> -               LOG(IPCPipe, Error)
> -                       << "Failed to start proxy worker process";
> -               return;
> +       void run() override
> +       {
> +               /*
> +                * IPC socket construction and connection to its readyRead
> +                * signal has to be done from the IPC thread so that the
> +                * relevant Object instances (EventNotifier, slot) are bound to
> +                * its context.
> +                */
> +               init();
> +               exec();
> +               deinit();
>         }
>  
> -       connected_ = true;
> -}
> +       int fd() { return fd_.get(); }
> +       int sendSync(const IPCMessage &in, IPCMessage *out);
> +       int sendAsync(const IPCMessage &data);
> +       bool waitReady();
>  
> -IPCPipeUnixSocket::~IPCPipeUnixSocket()
> -{
> -}
> +private:
> +       void init();
> +       void deinit();
> +       void readyRead();
>  
> -int IPCPipeUnixSocket::sendSync(const IPCMessage &in, IPCMessage *out)
> +       UniqueFD fd_;
> +       Signal<const IPCMessage &> *recv_;
> +       ConditionVariable cv_;
> +       Mutex mutex_;

So the mutex protects everything from here...

> +       bool ready_;
> +       bool sendSyncPending_;
> +       uint32_t sendSyncCookie_;
> +       IPCUnixSocket::Payload *sendSyncResponse_;

...to here

> +
> +       /* Socket shall be constructed and destructed from IPC thread context */
> +       std::unique_ptr<IPCUnixSocket> socket_;
> +};
> +
> +int IPCUnixSocketWrapper::sendSync(const IPCMessage &in, IPCMessage *out)
>  {
> +       int ret;
>         IPCUnixSocket::Payload response;
>  
> -       int ret = call(in.payload(), &response, in.header().cookie);
> +       mutex_.lock();
> +       ASSERT(!sendSyncPending_);
> +       sendSyncPending_ = true;
> +       sendSyncCookie_ = in.header().cookie;
> +       sendSyncResponse_ = &response;
> +       mutex_.unlock();
> +
> +       ret = socket_->send(in.payload());
>         if (ret) {
> -               LOG(IPCPipe, Error) << "Failed to call sync";
> -               return ret;
> +               LOG(IPCPipe, Error) << "Failed to send sync message";
> +               goto cleanup;
> +       }
> +
> +       bool complete;
> +       {
> +               MutexLocker locker(mutex_);
> +               auto syncComplete = ([&]() {
> +                       return sendSyncPending_ == false;
> +               });
> +               complete = cv_.wait_for(locker, 1000ms, syncComplete);
> +       }

Ok, I think I like this idea.

I still don't see why it needs to be in a wrapper class, though.

> +
> +       if (!complete) {
> +               LOG(IPCPipe, Error) << "Timeout sending sync message";
> +               ret = -ETIMEDOUT;
> +               goto cleanup;
>         }
>  
>         if (out)
>                 *out = IPCMessage(response);
>  
>         return 0;
> +
> +cleanup:
> +       mutex_.lock();
> +       sendSyncPending_ = false;
> +       mutex_.unlock();
> +
> +       return ret;
>  }
>  
> -int IPCPipeUnixSocket::sendAsync(const IPCMessage &data)
> +int IPCUnixSocketWrapper::sendAsync(const IPCMessage &data)
>  {
> -       int ret = socket_->send(data.payload());
> -       if (ret) {
> -               LOG(IPCPipe, Error) << "Failed to call async";
> -               return ret;
> +       int ret;
> +       ret = socket_->send(data.payload());
> +       if (ret)
> +               LOG(IPCPipe, Error) << "Failed to send async message";
> +       return ret;
> +}
> +
> +bool IPCUnixSocketWrapper::waitReady()
> +{
> +       bool ready;
> +       {
> +               MutexLocker locker(mutex_);
> +               auto isReady = ([&]() {
> +                       return ready_;
> +               });
> +               ready = cv_.wait_for(locker, 1000ms, isReady);
>         }
>  
> -       return 0;
> +       return ready;
> +}

I don't understand why this function is needed. It only waits to make sure that
init() is done, but afaict nothing in init() is threaded. Especially if there
is no wrapper. Why can't IPCUnixSocket live directly in another thread?

> +
> +void IPCUnixSocketWrapper::init()
> +{
> +       /* Init is to be done from the IPC thread context */
> +       ASSERT(Thread::current() == this);
> +
> +       socket_ = std::make_unique<IPCUnixSocket>();
> +       fd_ = socket_->create();
> +       if (!fd_.isValid()) {
> +               LOG(IPCPipe, Error) << "Failed to create socket";
> +               return;
> +       }
> +
> +       socket_->readyRead.connect(this, &IPCUnixSocketWrapper::readyRead);
> +
> +       mutex_.lock();
> +       ready_ = true;
> +       mutex_.unlock();
> +       cv_.notify_one();
>  }
>  
> -void IPCPipeUnixSocket::readyRead()
> +void IPCUnixSocketWrapper::deinit()
> +{
> +       /* Deinit is to be done from the IPC thread context */
> +       ASSERT(Thread::current() == this);
> +
> +       socket_->readyRead.disconnect(this);
> +       socket_.reset();
> +
> +       mutex_.lock();
> +       ready_ = false;
> +       mutex_.unlock();
> +}
> +
> +void IPCUnixSocketWrapper::readyRead()
>  {
>         IPCUnixSocket::Payload payload;
>         int ret = socket_->receive(&payload);
> @@ -93,55 +186,72 @@ void IPCPipeUnixSocket::readyRead()
>                 return;
>         }
>  
> -       /* \todo Use span to avoid the double copy when callData is found. */

Oh nice, this is taken care of.

>         if (payload.data.size() < sizeof(IPCMessage::Header)) {
>                 LOG(IPCPipe, Error) << "Not enough data received";
>                 return;
>         }
>  
> -       IPCMessage ipcMessage(payload);
> +       const IPCMessage::Header *header =
> +               reinterpret_cast<IPCMessage::Header *>(payload.data.data());
> +       bool syncComplete = false;
> +       mutex_.lock();
> +       if (sendSyncPending_ && sendSyncCookie_ == header->cookie) {
> +               syncComplete = true;
> +               sendSyncPending_ = false;
> +               *sendSyncResponse_ = std::move(payload);
> +       }
> +       mutex_.unlock();

That's a cool idea.

>  
> -       auto callData = callData_.find(ipcMessage.header().cookie);
> -       if (callData != callData_.end()) {
> -               *callData->second.response = std::move(payload);
> -               callData->second.done = true;
> +       if (syncComplete) {
> +               cv_.notify_one();
>                 return;
>         }
>  
>         /* Received unexpected data, this means it's a call from the IPA. */
> -       recv.emit(ipcMessage);
> +       IPCMessage ipcMessage(payload);
> +       recv_->emit(ipcMessage);
>  }
>  
> -int IPCPipeUnixSocket::call(const IPCUnixSocket::Payload &message,
> -                           IPCUnixSocket::Payload *response, uint32_t cookie)
> +IPCPipeUnixSocket::IPCPipeUnixSocket(const char *ipaModulePath,
> +                                    const char *ipaProxyWorkerPath)
> +       : IPCPipe()
>  {
> -       Timer timeout;
> -       int ret;
> +       socketWrap_ = std::make_unique<IPCUnixSocketWrapper>(&recv);
> +       if (!socketWrap_->waitReady()) {
> +               LOG(IPCPipe, Error) << "Failed to create socket";
> +               return;
> +       }
> +       int fd = socketWrap_->fd();
>  
> -       const auto result = callData_.insert({ cookie, { response, false } });
> -       const auto &iter = result.first;
> +       std::vector<int> fds;
> +       std::vector<std::string> args;
> +       args.push_back(ipaModulePath);
> +       args.push_back(std::to_string(fd));
> +       fds.push_back(fd);

I'd replace this with:

	std::array args{ std::string(ipaModulePath), std::to_string(fd.get()) };
	std::array fds{ fd.get() };

(Since this is how the base code was changed since you wrote this (sorry I
didn't notice this patch earlier (I only noticed because Kieran cc'ed me)))


Thanks,

Paul

>  
> -       ret = socket_->send(message);
> +       proc_ = std::make_unique<Process>();
> +       int ret = proc_->start(ipaProxyWorkerPath, args, fds);
>         if (ret) {
> -               callData_.erase(iter);
> -               return ret;
> +               LOG(IPCPipe, Error)
> +                       << "Failed to start proxy worker process";
> +               return;
>         }
>  
> -       /* \todo Make this less dangerous, see IPCPipe::sendSync() */
> -       timeout.start(2000ms);
> -       while (!iter->second.done) {
> -               if (!timeout.isRunning()) {
> -                       LOG(IPCPipe, Error) << "Call timeout!";
> -                       callData_.erase(iter);
> -                       return -ETIMEDOUT;
> -               }
> +       connected_ = true;
> +}
>  
> -               Thread::current()->eventDispatcher()->processEvents();
> -       }
> +IPCPipeUnixSocket::~IPCPipeUnixSocket()
> +{
> +}
>  
> -       callData_.erase(iter);
> +int IPCPipeUnixSocket::sendSync(const IPCMessage &in, IPCMessage *out)
> +{
> +       return socketWrap_->sendSync(in, out);
> +}
>  
> -       return 0;
> +int IPCPipeUnixSocket::sendAsync(const IPCMessage &data)
> +{
> +       return socketWrap_->sendAsync(data);
>  }
>  
>  } /* namespace libcamera */
> -- 
> 2.34.1
>
Julien Vuillaumier Aug. 29, 2025, 9:42 a.m. UTC | #4
Hi Paul,


On 26/08/2025 12:28, Paul Elder wrote:
> 
> Hi Julien,
> 
> Thanks for the patch.
> 
> Quoting Julien Vuillaumier (2024-12-20 23:55:56)
>> Unix-socket based IPC sometimes times out or hangs, typically
>> when multiple cameras are stopped simultaneously. That specific case
>> triggers the concurrent sending, by each pipeline handler instance,
>> of a synchronous stop() message to its peer IPA process.
>>
>> There is a dedicated IPC socket per camera. The sockets' payload
>> receipt signals all run in the camera manager thread.
>> To send a synchronous message to the IPA, the pipeline handler invokes
>> IPCPipeUnixSocket::sendSync() from the camera manager thread. This
>> sends the message, then busy-waits for the peer acknowledgment. The
>> busy wait is done by blocking in the event loop, calling
>> processEvents(), until the ack condition is detected.
>>
>> One issue is that the socket receive slot readyRead() wakes up the
>> blocked thread via a libcamera::Message receipt. Even though such a
>> message resumes processEvents(), the thread may reblock immediately
>> because readyRead() does not explicitly interrupt() the dispatcher.
>> Most of the time, another pending event for the thread unblocks the
>> event dispatcher and the ack condition is detected - in the worst case
>> the 2-second timeout kicks in. Once unblocked, the dispatcher lets the
>> message acknowledgment be detected and sendSync() completes.
>>
>> The other issue is that, when multiple pipeline handlers send
>> synchronous IPC messages concurrently, there is a possible recursion
>> of sendSync() / processEvents() calls nested in the camera manager
>> thread's stack. As commented in the source, that is a dangerous
>> construct that can lead to a hang.
>> The reason is that the last synchronous message sent is the deepest in
>> the stack. It is also the one whose acknowledgment is being busy
>> waited. However, other pending synchronous messages may have been
>> initiated before and sit higher in the stack. If they time out, the
>> condition is not detected because of the stack recursion, as the
>> thread is busy waiting for the last message to be acknowledged.
> 
> Ok, I think I understand the issue.
>
>>
>> This change implements a safer mechanism to handle synchronous
>> message sending, similar to the one used for non-isolated IPAs. The
>> IPCUnixSocketWrapper class is introduced to handle the IPCUnixSocket
>> receive signal in a dedicated thread.
>> This way, the sending thread, when emitting a synchronous message, can
>> be blocked without using the event dispatcher's processEvents(), which
>> avoids the risky stack recursion.
>>
>> Fixes: 21f1b555b ("libcamera: Add IPCPipe implementation based on unix socket")
>>
>> Signed-off-by: Julien Vuillaumier <julien.vuillaumier@nxp.com>
>> ---
>>   .../libcamera/internal/ipc_pipe_unixsocket.h  |  13 +-
>>   src/libcamera/ipc_pipe_unixsocket.cpp         | 242 +++++++++++++-----
>>   2 files changed, 178 insertions(+), 77 deletions(-)
>>
>> diff --git a/include/libcamera/internal/ipc_pipe_unixsocket.h b/include/libcamera/internal/ipc_pipe_unixsocket.h
>> index 8c972613..280639d5 100644
>> --- a/include/libcamera/internal/ipc_pipe_unixsocket.h
>> +++ b/include/libcamera/internal/ipc_pipe_unixsocket.h
>> @@ -16,6 +16,7 @@
>>   namespace libcamera {
>>
>>   class Process;
>> +class IPCUnixSocketWrapper;
>>
>>   class IPCPipeUnixSocket : public IPCPipe
>>   {
>> @@ -29,18 +30,8 @@ public:
>>          int sendAsync(const IPCMessage &data) override;
>>
>>   private:
>> -       struct CallData {
>> -               IPCUnixSocket::Payload *response;
>> -               bool done;
>> -       };
>> -
>> -       void readyRead();
>> -       int call(const IPCUnixSocket::Payload &message,
>> -                IPCUnixSocket::Payload *response, uint32_t seq);
>> -
>>          std::unique_ptr<Process> proc_;
>> -       std::unique_ptr<IPCUnixSocket> socket_;
>> -       std::map<uint32_t, CallData> callData_;
>> +       std::unique_ptr<IPCUnixSocketWrapper> socketWrap_;
> 
> afaict you're essentially deleting the whole IPCPipe class and reimplementing
> it. What's wrong with changing it directly instead of adding a wrapper?
> 

The IPCPipeUnixSocket class is used by libcamera to create the proxy
worker process and the IPC channel to communicate with the proxy. The
IPC part relies on the IPCUnixSocket class (asynchronous write, no
threading) - and the same IPCUnixSocket class is also used by the proxy
worker in the auto-generated code.

The intent of the IPCUnixSocketWrapper was to provide a higher-level
interface for the IPCUnixSocket class, more specific to the
IPCPipeUnixSocket class usage: threading for the receive path,
sync/async transmit, IPCMessage to IPCUnixSocket::Payload conversions...

As you mentioned, IPCPipeUnixSocket becomes fairly slim.
I can look at the option to move the IPCUnixSocketWrapper class content
into the IPCPipeUnixSocket class, if that is the preferred approach.


>>   };
>>
>>   } /* namespace libcamera */
>> diff --git a/src/libcamera/ipc_pipe_unixsocket.cpp b/src/libcamera/ipc_pipe_unixsocket.cpp
>> index 668ec73b..eb5408d4 100644
>> --- a/src/libcamera/ipc_pipe_unixsocket.cpp
>> +++ b/src/libcamera/ipc_pipe_unixsocket.cpp
>> @@ -9,10 +9,9 @@
>>
>>   #include <vector>
>>
>> -#include <libcamera/base/event_dispatcher.h>
>>   #include <libcamera/base/log.h>
>> +#include <libcamera/base/mutex.h>
>>   #include <libcamera/base/thread.h>
>> -#include <libcamera/base/timer.h>
>>
>>   #include "libcamera/internal/ipc_pipe.h"
>>   #include "libcamera/internal/ipc_unixsocket.h"
>> @@ -24,67 +23,161 @@ namespace libcamera {
>>
>>   LOG_DECLARE_CATEGORY(IPCPipe)
>>
>> -IPCPipeUnixSocket::IPCPipeUnixSocket(const char *ipaModulePath,
>> -                                    const char *ipaProxyWorkerPath)
>> -       : IPCPipe()
>> +class IPCUnixSocketWrapper : Thread
>>   {
>> -       std::vector<int> fds;
>> -       std::vector<std::string> args;
>> -       args.push_back(ipaModulePath);
>> +public:
>> +       IPCUnixSocketWrapper(Signal<const IPCMessage &> *recv)
>> +               : recv_(recv), ready_(false), sendSyncPending_(false),
>> +                 sendSyncCookie_(0)
>> +       {
>> +               start();
>> +       }
>>
>> -       socket_ = std::make_unique<IPCUnixSocket>();
>> -       UniqueFD fd = socket_->create();
>> -       if (!fd.isValid()) {
>> -               LOG(IPCPipe, Error) << "Failed to create socket";
>> -               return;
>> +       ~IPCUnixSocketWrapper()
>> +       {
>> +               exit();
>> +               wait();
>>          }
>> -       socket_->readyRead.connect(this, &IPCPipeUnixSocket::readyRead);
>> -       args.push_back(std::to_string(fd.get()));
>> -       fds.push_back(fd.get());
>>
>> -       proc_ = std::make_unique<Process>();
>> -       int ret = proc_->start(ipaProxyWorkerPath, args, fds);
>> -       if (ret) {
>> -               LOG(IPCPipe, Error)
>> -                       << "Failed to start proxy worker process";
>> -               return;
>> +       void run() override
>> +       {
>> +               /*
>> +                * IPC socket construction and connection to its readyRead
>> +                * signal has to be done from the IPC thread so that the
>> +                * relevant Object instances (EventNotifier, slot) are bound to
>> +                * its context.
>> +                */
>> +               init();
>> +               exec();
>> +               deinit();
>>          }
>>
>> -       connected_ = true;
>> -}
>> +       int fd() { return fd_.get(); }
>> +       int sendSync(const IPCMessage &in, IPCMessage *out);
>> +       int sendAsync(const IPCMessage &data);
>> +       bool waitReady();
>>
>> -IPCPipeUnixSocket::~IPCPipeUnixSocket()
>> -{
>> -}
>> +private:
>> +       void init();
>> +       void deinit();
>> +       void readyRead();
>>
>> -int IPCPipeUnixSocket::sendSync(const IPCMessage &in, IPCMessage *out)
>> +       UniqueFD fd_;
>> +       Signal<const IPCMessage &> *recv_;
>> +       ConditionVariable cv_;
>> +       Mutex mutex_;
> 
> So the mutex protects everything from here...
> 
>> +       bool ready_;
>> +       bool sendSyncPending_;
>> +       uint32_t sendSyncCookie_;
>> +       IPCUnixSocket::Payload *sendSyncResponse_;
> 
> ...to here

I am not sure about this comment, as the git patch output may be a bit
misleading here: that line is only the `Mutex mutex_` variable
declaration, as a member of the IPCUnixSocketWrapper class.
The variables that `mutex_` actually protects depend on the
implementation of the functions where `mutex_` is used, through
lock()/unlock() sections and MutexLocker-protected scopes.
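
If it helps the review, the guarded members could be made explicit with
libcamera's thread-safety annotations (a sketch, assuming the
LIBCAMERA_TSA_GUARDED_BY macro from base/thread_annotations.h; this is
not part of the patch):

        Mutex mutex_;
        bool ready_ LIBCAMERA_TSA_GUARDED_BY(mutex_);
        bool sendSyncPending_ LIBCAMERA_TSA_GUARDED_BY(mutex_);
        uint32_t sendSyncCookie_ LIBCAMERA_TSA_GUARDED_BY(mutex_);
        IPCUnixSocket::Payload *sendSyncResponse_ LIBCAMERA_TSA_GUARDED_BY(mutex_);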

> 
>> +
>> +       /* Socket shall be constructed and destructed from IPC thread context */
>> +       std::unique_ptr<IPCUnixSocket> socket_;
>> +};
>> +
>> +int IPCUnixSocketWrapper::sendSync(const IPCMessage &in, IPCMessage *out)
>>   {
>> +       int ret;
>>          IPCUnixSocket::Payload response;
>>
>> -       int ret = call(in.payload(), &response, in.header().cookie);
>> +       mutex_.lock();
>> +       ASSERT(!sendSyncPending_);
>> +       sendSyncPending_ = true;
>> +       sendSyncCookie_ = in.header().cookie;
>> +       sendSyncResponse_ = &response;
>> +       mutex_.unlock();
>> +
>> +       ret = socket_->send(in.payload());
>>          if (ret) {
>> -               LOG(IPCPipe, Error) << "Failed to call sync";
>> -               return ret;
>> +               LOG(IPCPipe, Error) << "Failed to send sync message";
>> +               goto cleanup;
>> +       }
>> +
>> +       bool complete;
>> +       {
>> +               MutexLocker locker(mutex_);
>> +               auto syncComplete = ([&]() {
>> +                       return sendSyncPending_ == false;
>> +               });
>> +               complete = cv_.wait_for(locker, 1000ms, syncComplete);
>> +       }
> 
> Ok, I think I like this idea.

Thanks :)

> 
> I still don't see why it needs to be in a wrapper class, though.

As mentioned above, I can look at the option to remove the wrapper class
and keep only the IPCPipeUnixSocket class, if desired.

> 
>> +
>> +       if (!complete) {
>> +               LOG(IPCPipe, Error) << "Timeout sending sync message";
>> +               ret = -ETIMEDOUT;
>> +               goto cleanup;
>>          }
>>
>>          if (out)
>>                  *out = IPCMessage(response);
>>
>>          return 0;
>> +
>> +cleanup:
>> +       mutex_.lock();
>> +       sendSyncPending_ = false;
>> +       mutex_.unlock();
>> +
>> +       return ret;
>>   }
>>
>> -int IPCPipeUnixSocket::sendAsync(const IPCMessage &data)
>> +int IPCUnixSocketWrapper::sendAsync(const IPCMessage &data)
>>   {
>> -       int ret = socket_->send(data.payload());
>> -       if (ret) {
>> -               LOG(IPCPipe, Error) << "Failed to call async";
>> -               return ret;
>> +       int ret;
>> +       ret = socket_->send(data.payload());
>> +       if (ret)
>> +               LOG(IPCPipe, Error) << "Failed to send async message";
>> +       return ret;
>> +}
>> +
>> +bool IPCUnixSocketWrapper::waitReady()
>> +{
>> +       bool ready;
>> +       {
>> +               MutexLocker locker(mutex_);
>> +               auto isReady = ([&]() {
>> +                       return ready_;
>> +               });
>> +               ready = cv_.wait_for(locker, 1000ms, isReady);
>>          }
>>
>> -       return 0;
>> +       return ready;
>> +}
> 
> I don't understand why this function is needed. It only waits to make sure that
> init() is done, but afaict nothing in init() is threaded. Especially if there
> is no wrapper. Why can't IPCUnixSocket live directly in another thread?

The init() function is executed from the wrapper thread - see the
IPCUnixSocketWrapper::run() function that calls, in sequence: init(),
exec(), then deinit().
The Objects bound to the wrapper thread (EventNotifier, slots...) have
to be instantiated and later destroyed from the wrapper thread itself.
Thus, they cannot be created by the wrapper constructor, which executes
in the camera manager thread; their creation has to be deferred to the
wrapper thread's execution. waitReady() checks that this thread is up
and running, which indicates that all objects needed for the class
operation have been created.

If the wrapper goes away because the threading implementation is moved
to the IPCPipeUnixSocket class, the Object instances (EventNotifier,
slots...) associated with the new thread will still have to be created
from there as well.
A similar synchronization mechanism will be necessary in the
IPCPipeUnixSocket constructor to wait for that other thread to be
active and its associated Objects created.
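
The startup synchronization is roughly the following pattern (a
self-contained sketch using std::thread primitives rather than
libcamera's Thread class; names are hypothetical): the worker thread
creates its thread-affine resources, then sets a ready flag and
notifies, while the constructor blocks until the flag is set.

        #include <condition_variable>
        #include <mutex>
        #include <thread>

        class Worker
        {
        public:
                Worker()
                        : ready_(false), thread_([this] { run(); })
                {
                        /* Block until the worker thread has built its resources. */
                        std::unique_lock<std::mutex> lock(mutex_);
                        cv_.wait(lock, [this] { return ready_; });
                }

                ~Worker() { thread_.join(); }

        private:
                void run()
                {
                        /* Create thread-affine objects here (socket, notifiers...). */
                        {
                                std::lock_guard<std::mutex> lock(mutex_);
                                ready_ = true;
                        }
                        cv_.notify_one();
                        /* Then run the thread's event loop until asked to exit. */
                }

                std::mutex mutex_;
                std::condition_variable cv_;
                bool ready_;
                std::thread thread_; /* declared last: starts after the members above */
        };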

> 
>> +
>> +void IPCUnixSocketWrapper::init()
>> +{
>> +       /* Init is to be done from the IPC thread context */
>> +       ASSERT(Thread::current() == this);
>> +
>> +       socket_ = std::make_unique<IPCUnixSocket>();
>> +       fd_ = socket_->create();
>> +       if (!fd_.isValid()) {
>> +               LOG(IPCPipe, Error) << "Failed to create socket";
>> +               return;
>> +       }
>> +
>> +       socket_->readyRead.connect(this, &IPCUnixSocketWrapper::readyRead);
>> +
>> +       mutex_.lock();
>> +       ready_ = true;
>> +       mutex_.unlock();
>> +       cv_.notify_one();
>>   }
>>
>> -void IPCPipeUnixSocket::readyRead()
>> +void IPCUnixSocketWrapper::deinit()
>> +{
>> +       /* Deinit is to be done from the IPC thread context */
>> +       ASSERT(Thread::current() == this);
>> +
>> +       socket_->readyRead.disconnect(this);
>> +       socket_.reset();
>> +
>> +       mutex_.lock();
>> +       ready_ = false;
>> +       mutex_.unlock();
>> +}
>> +
>> +void IPCUnixSocketWrapper::readyRead()
>>   {
>>          IPCUnixSocket::Payload payload;
>>          int ret = socket_->receive(&payload);
>> @@ -93,55 +186,72 @@ void IPCPipeUnixSocket::readyRead()
>>                  return;
>>          }
>>
>> -       /* \todo Use span to avoid the double copy when callData is found. */
> 
> Oh nice, this is taken care of.
> 
>>          if (payload.data.size() < sizeof(IPCMessage::Header)) {
>>                  LOG(IPCPipe, Error) << "Not enough data received";
>>                  return;
>>          }
>>
>> -       IPCMessage ipcMessage(payload);
>> +       const IPCMessage::Header *header =
>> +               reinterpret_cast<IPCMessage::Header *>(payload.data.data());
>> +       bool syncComplete = false;
>> +       mutex_.lock();
>> +       if (sendSyncPending_ && sendSyncCookie_ == header->cookie) {
>> +               syncComplete = true;
>> +               sendSyncPending_ = false;
>> +               *sendSyncResponse_ = std::move(payload);
>> +       }
>> +       mutex_.unlock();
> 
> That's a cool idea.
> 
>>
>> -       auto callData = callData_.find(ipcMessage.header().cookie);
>> -       if (callData != callData_.end()) {
>> -               *callData->second.response = std::move(payload);
>> -               callData->second.done = true;
>> +       if (syncComplete) {
>> +               cv_.notify_one();
>>                  return;
>>          }
>>
>>          /* Received unexpected data, this means it's a call from the IPA. */
>> -       recv.emit(ipcMessage);
>> +       IPCMessage ipcMessage(payload);
>> +       recv_->emit(ipcMessage);
>>   }
>>
>> -int IPCPipeUnixSocket::call(const IPCUnixSocket::Payload &message,
>> -                           IPCUnixSocket::Payload *response, uint32_t cookie)
>> +IPCPipeUnixSocket::IPCPipeUnixSocket(const char *ipaModulePath,
>> +                                    const char *ipaProxyWorkerPath)
>> +       : IPCPipe()
>>   {
>> -       Timer timeout;
>> -       int ret;
>> +       socketWrap_ = std::make_unique<IPCUnixSocketWrapper>(&recv);
>> +       if (!socketWrap_->waitReady()) {
>> +               LOG(IPCPipe, Error) << "Failed to create socket";
>> +               return;
>> +       }
>> +       int fd = socketWrap_->fd();
>>
>> -       const auto result = callData_.insert({ cookie, { response, false } });
>> -       const auto &iter = result.first;
>> +       std::vector<int> fds;
>> +       std::vector<std::string> args;
>> +       args.push_back(ipaModulePath);
>> +       args.push_back(std::to_string(fd));
>> +       fds.push_back(fd);
> 
> I'd replace this with:
> 
>          std::array args{ std::string(ipaModulePath), std::to_string(fd.get()) };
>          std::array fds{ fd.get() };
> 
> (Since this is how the base code was changed since you wrote this (sorry I
> didn't notice this patch earlier (I only noticed because Kieran cc'ed me)))

Sure, I noticed that part had changed in libcamera v0.5.2.
I will update it in v2.

Thanks,
Julien


> 
> Thanks,
> 
> Paul
> 
>>
>> -       ret = socket_->send(message);
>> +       proc_ = std::make_unique<Process>();
>> +       int ret = proc_->start(ipaProxyWorkerPath, args, fds);
>>          if (ret) {
>> -               callData_.erase(iter);
>> -               return ret;
>> +               LOG(IPCPipe, Error)
>> +                       << "Failed to start proxy worker process";
>> +               return;
>>          }
>>
>> -       /* \todo Make this less dangerous, see IPCPipe::sendSync() */
>> -       timeout.start(2000ms);
>> -       while (!iter->second.done) {
>> -               if (!timeout.isRunning()) {
>> -                       LOG(IPCPipe, Error) << "Call timeout!";
>> -                       callData_.erase(iter);
>> -                       return -ETIMEDOUT;
>> -               }
>> +       connected_ = true;
>> +}
>>
>> -               Thread::current()->eventDispatcher()->processEvents();
>> -       }
>> +IPCPipeUnixSocket::~IPCPipeUnixSocket()
>> +{
>> +}
>>
>> -       callData_.erase(iter);
>> +int IPCPipeUnixSocket::sendSync(const IPCMessage &in, IPCMessage *out)
>> +{
>> +       return socketWrap_->sendSync(in, out);
>> +}
>>
>> -       return 0;
>> +int IPCPipeUnixSocket::sendAsync(const IPCMessage &data)
>> +{
>> +       return socketWrap_->sendAsync(data);
>>   }
>>
>>   } /* namespace libcamera */
>> --
>> 2.34.1
>>
Barnabás Pőcze Aug. 29, 2025, 4:02 p.m. UTC | #5
Hi


On 20/12/2024 15:55, Julien Vuillaumier wrote:
> Unix-socket based IPC sometimes times out or hangs, typically
> when multiple cameras are stopped simultaneously. That specific case
> triggers the concurrent sending, by each pipeline handler instance,
> of a synchronous stop() message to its peer IPA process.
>
> There is a dedicated IPC socket per camera. The sockets' payload
> receipt signals all run in the camera manager thread.
> To send a synchronous message to the IPA, the pipeline handler invokes
> IPCPipeUnixSocket::sendSync() from the camera manager thread. This
> sends the message, then busy-waits for the peer acknowledgment. The
> busy wait is done by blocking in the event loop, calling
> processEvents(), until the ack condition is detected.
>
> One issue is that the socket receive slot readyRead() wakes up the
> blocked thread via a libcamera::Message receipt. Even though such a
> message resumes processEvents(), the thread may reblock immediately
> because readyRead() does not explicitly interrupt() the dispatcher.
> Most of the time, another pending event for the thread unblocks the
> event dispatcher and the ack condition is detected - in the worst case
> the 2-second timeout kicks in. Once unblocked, the dispatcher lets the
> message acknowledgment be detected and sendSync() completes.

As far as I can see

   * EventDispatcherPoll::processEvents() is blocked in ppoll()
   * socket receives data, signals POLLIN, ppoll() returns
   * EventDispatcherPoll::processNotifiers()
       -> EventNotifier::activated emitted
       -> IPCUnixSocket::dataNotifier() runs
       -> IPCUnixSocket::readyRead emitted
       -> IPCPipeUnixSocket::readyRead() runs
   * IPCPipeUnixSocket::readyRead() reads the message and modifies `callData_` accordingly
   * EventDispatcherPoll::processEvents() returns
   * IPCPipeUnixSocket::call() sees the update in `callData_` and stops

And as far as I can tell every signal emission happens synchronously.
So what am I missing? Where does it get blocked?


> 
> The other issue is that in case of concurrent synchronous IPC messages
> sent by multiple pipeline handlers, there is a possible recursion
> of sendSync() / processEvents() nested in the camera thread stack. As
> commented in the source, that is a dangerous construct that can lead to
> a hang.
> The reason is that the last synchronous message sent is the deepest in
> the stack. It is also the one whose acknowledgment is being busy waited.
> However other pending synchronous messages may have been initiated before
> and are upper in the stack. If they timeout, the condition is not
> detected because of the stack recursion, as the thread is busy waiting
> for the last message to be acknowledged.

Without a large-scale refactor, I see two choices:

   (1) dispatch the event loop recursively;
   (2) block the event loop;

the proposed solution here appears to be (2). But then what is not clear
to me is why you're not just running a poll() in a loop on that single
socket? Why do we need an extra thread?


Regards,
Barnabás Pőcze

> 
> This change implements a safer mechanism to handle the synchronous
> message sending, similar to the one used for non isolated IPA. The
> IPCUnixSocketWrapper class is introduced to handle the IPCUnixSocket
> receive signal in a dedicated thread.
> Doing so, the sending thread, when emiting a synchronous message, can be
> blocked without event dispatcher's processEvents() usage, which avoids
> the risky stack recursion.
> 
> Fixes: 21f1b555b ("libcamera: Add IPCPipe implementation based on unix socket")
> 
> Signed-off-by: Julien Vuillaumier <julien.vuillaumier@nxp.com>
> ---
>   .../libcamera/internal/ipc_pipe_unixsocket.h  |  13 +-
>   src/libcamera/ipc_pipe_unixsocket.cpp         | 242 +++++++++++++-----
>   2 files changed, 178 insertions(+), 77 deletions(-)
> 
> diff --git a/include/libcamera/internal/ipc_pipe_unixsocket.h b/include/libcamera/internal/ipc_pipe_unixsocket.h
> index 8c972613..280639d5 100644
> --- a/include/libcamera/internal/ipc_pipe_unixsocket.h
> +++ b/include/libcamera/internal/ipc_pipe_unixsocket.h
> @@ -16,6 +16,7 @@
>   namespace libcamera {
>   
>   class Process;
> +class IPCUnixSocketWrapper;
>   
>   class IPCPipeUnixSocket : public IPCPipe
>   {
> @@ -29,18 +30,8 @@ public:
>   	int sendAsync(const IPCMessage &data) override;
>   
>   private:
> -	struct CallData {
> -		IPCUnixSocket::Payload *response;
> -		bool done;
> -	};
> -
> -	void readyRead();
> -	int call(const IPCUnixSocket::Payload &message,
> -		 IPCUnixSocket::Payload *response, uint32_t seq);
> -
>   	std::unique_ptr<Process> proc_;
> -	std::unique_ptr<IPCUnixSocket> socket_;
> -	std::map<uint32_t, CallData> callData_;
> +	std::unique_ptr<IPCUnixSocketWrapper> socketWrap_;
>   };
>   
>   } /* namespace libcamera */
> diff --git a/src/libcamera/ipc_pipe_unixsocket.cpp b/src/libcamera/ipc_pipe_unixsocket.cpp
> index 668ec73b..eb5408d4 100644
> --- a/src/libcamera/ipc_pipe_unixsocket.cpp
> +++ b/src/libcamera/ipc_pipe_unixsocket.cpp
> @@ -9,10 +9,9 @@
>   
>   #include <vector>
>   
> -#include <libcamera/base/event_dispatcher.h>
>   #include <libcamera/base/log.h>
> +#include <libcamera/base/mutex.h>
>   #include <libcamera/base/thread.h>
> -#include <libcamera/base/timer.h>
>   
>   #include "libcamera/internal/ipc_pipe.h"
>   #include "libcamera/internal/ipc_unixsocket.h"
> @@ -24,67 +23,161 @@ namespace libcamera {
>   
>   LOG_DECLARE_CATEGORY(IPCPipe)
>   
> -IPCPipeUnixSocket::IPCPipeUnixSocket(const char *ipaModulePath,
> -				     const char *ipaProxyWorkerPath)
> -	: IPCPipe()
> +class IPCUnixSocketWrapper : Thread
>   {
> -	std::vector<int> fds;
> -	std::vector<std::string> args;
> -	args.push_back(ipaModulePath);
> +public:
> +	IPCUnixSocketWrapper(Signal<const IPCMessage &> *recv)
> +		: recv_(recv), ready_(false), sendSyncPending_(false),
> +		  sendSyncCookie_(0)
> +	{
> +		start();
> +	}
>   
> -	socket_ = std::make_unique<IPCUnixSocket>();
> -	UniqueFD fd = socket_->create();
> -	if (!fd.isValid()) {
> -		LOG(IPCPipe, Error) << "Failed to create socket";
> -		return;
> +	~IPCUnixSocketWrapper()
> +	{
> +		exit();
> +		wait();
>   	}
> -	socket_->readyRead.connect(this, &IPCPipeUnixSocket::readyRead);
> -	args.push_back(std::to_string(fd.get()));
> -	fds.push_back(fd.get());
>   
> -	proc_ = std::make_unique<Process>();
> -	int ret = proc_->start(ipaProxyWorkerPath, args, fds);
> -	if (ret) {
> -		LOG(IPCPipe, Error)
> -			<< "Failed to start proxy worker process";
> -		return;
> +	void run() override
> +	{
> +		/*
> +		 * IPC socket construction and connection to its readyRead
> +		 * signal has to be done from the IPC thread so that the
> +		 * relevant Object instances (EventNotifier, slot) are bound to
> +		 * its context.
> +		 */
> +		init();
> +		exec();
> +		deinit();
>   	}
>   
> -	connected_ = true;
> -}
> +	int fd() { return fd_.get(); }
> +	int sendSync(const IPCMessage &in, IPCMessage *out);
> +	int sendAsync(const IPCMessage &data);
> +	bool waitReady();
>   
> -IPCPipeUnixSocket::~IPCPipeUnixSocket()
> -{
> -}
> +private:
> +	void init();
> +	void deinit();
> +	void readyRead();
>   
> -int IPCPipeUnixSocket::sendSync(const IPCMessage &in, IPCMessage *out)
> +	UniqueFD fd_;
> +	Signal<const IPCMessage &> *recv_;
> +	ConditionVariable cv_;
> +	Mutex mutex_;
> +	bool ready_;
> +	bool sendSyncPending_;
> +	uint32_t sendSyncCookie_;
> +	IPCUnixSocket::Payload *sendSyncResponse_;
> +
> +	/* Socket shall be constructed and destructed from IPC thread context */
> +	std::unique_ptr<IPCUnixSocket> socket_;
> +};
> +
> +int IPCUnixSocketWrapper::sendSync(const IPCMessage &in, IPCMessage *out)
>   {
> +	int ret;
>   	IPCUnixSocket::Payload response;
>   
> -	int ret = call(in.payload(), &response, in.header().cookie);
> +	mutex_.lock();
> +	ASSERT(!sendSyncPending_);
> +	sendSyncPending_ = true;
> +	sendSyncCookie_ = in.header().cookie;
> +	sendSyncResponse_ = &response;
> +	mutex_.unlock();
> +
> +	ret = socket_->send(in.payload());
>   	if (ret) {
> -		LOG(IPCPipe, Error) << "Failed to call sync";
> -		return ret;
> +		LOG(IPCPipe, Error) << "Failed to send sync message";
> +		goto cleanup;
> +	}
> +
> +	bool complete;
> +	{
> +		MutexLocker locker(mutex_);
> +		auto syncComplete = ([&]() {
> +			return sendSyncPending_ == false;
> +		});
> +		complete = cv_.wait_for(locker, 1000ms, syncComplete);
> +	}
> +
> +	if (!complete) {
> +		LOG(IPCPipe, Error) << "Timeout sending sync message";
> +		ret = -ETIMEDOUT;
> +		goto cleanup;
>   	}
>   
>   	if (out)
>   		*out = IPCMessage(response);
>   
>   	return 0;
> +
> +cleanup:
> +	mutex_.lock();
> +	sendSyncPending_ = false;
> +	mutex_.unlock();
> +
> +	return ret;
>   }
>   
> -int IPCPipeUnixSocket::sendAsync(const IPCMessage &data)
> +int IPCUnixSocketWrapper::sendAsync(const IPCMessage &data)
>   {
> -	int ret = socket_->send(data.payload());
> -	if (ret) {
> -		LOG(IPCPipe, Error) << "Failed to call async";
> -		return ret;
> +	int ret;
> +	ret = socket_->send(data.payload());
> +	if (ret)
> +		LOG(IPCPipe, Error) << "Failed to send sync message";
> +	return ret;
> +}
> +
> +bool IPCUnixSocketWrapper::waitReady()
> +{
> +	bool ready;
> +	{
> +		MutexLocker locker(mutex_);
> +		auto isReady = ([&]() {
> +			return ready_;
> +		});
> +		ready = cv_.wait_for(locker, 1000ms, isReady);
>   	}
>   
> -	return 0;
> +	return ready;
> +}
> +
> +void IPCUnixSocketWrapper::init()
> +{
> +	/* Init is to be done from the IPC thread context */
> +	ASSERT(Thread::current() == this);
> +
> +	socket_ = std::make_unique<IPCUnixSocket>();
> +	fd_ = socket_->create();
> +	if (!fd_.isValid()) {
> +		LOG(IPCPipe, Error) << "Failed to create socket";
> +		return;
> +	}
> +
> +	socket_->readyRead.connect(this, &IPCUnixSocketWrapper::readyRead);
> +
> +	mutex_.lock();
> +	ready_ = true;
> +	mutex_.unlock();
> +	cv_.notify_one();
>   }
>   
> -void IPCPipeUnixSocket::readyRead()
> +void IPCUnixSocketWrapper::deinit()
> +{
> +	/* Deinit is to be done from the IPC thread context */
> +	ASSERT(Thread::current() == this);
> +
> +	socket_->readyRead.disconnect(this);
> +	socket_.reset();
> +
> +	mutex_.lock();
> +	ready_ = false;
> +	mutex_.unlock();
> +}
> +
> +void IPCUnixSocketWrapper::readyRead()
>   {
>   	IPCUnixSocket::Payload payload;
>   	int ret = socket_->receive(&payload);
> @@ -93,55 +186,72 @@ void IPCPipeUnixSocket::readyRead()
>   		return;
>   	}
>   
> -	/* \todo Use span to avoid the double copy when callData is found. */
>   	if (payload.data.size() < sizeof(IPCMessage::Header)) {
>   		LOG(IPCPipe, Error) << "Not enough data received";
>   		return;
>   	}
>   
> -	IPCMessage ipcMessage(payload);
> +	const IPCMessage::Header *header =
> +		reinterpret_cast<IPCMessage::Header *>(payload.data.data());
> +	bool syncComplete = false;
> +	mutex_.lock();
> +	if (sendSyncPending_ && sendSyncCookie_ == header->cookie) {
> +		syncComplete = true;
> +		sendSyncPending_ = false;
> +		*sendSyncResponse_ = std::move(payload);
> +	}
> +	mutex_.unlock();
>   
> -	auto callData = callData_.find(ipcMessage.header().cookie);
> -	if (callData != callData_.end()) {
> -		*callData->second.response = std::move(payload);
> -		callData->second.done = true;
> +	if (syncComplete) {
> +		cv_.notify_one();
>   		return;
>   	}
>   
>   	/* Received unexpected data, this means it's a call from the IPA. */
> -	recv.emit(ipcMessage);
> +	IPCMessage ipcMessage(payload);
> +	recv_->emit(ipcMessage);
>   }
>   
> -int IPCPipeUnixSocket::call(const IPCUnixSocket::Payload &message,
> -			    IPCUnixSocket::Payload *response, uint32_t cookie)
> +IPCPipeUnixSocket::IPCPipeUnixSocket(const char *ipaModulePath,
> +				     const char *ipaProxyWorkerPath)
> +	: IPCPipe()
>   {
> -	Timer timeout;
> -	int ret;
> +	socketWrap_ = std::make_unique<IPCUnixSocketWrapper>(&recv);
> +	if (!socketWrap_->waitReady()) {
> +		LOG(IPCPipe, Error) << "Failed to create socket";
> +		return;
> +	}
> +	int fd = socketWrap_->fd();
>   
> -	const auto result = callData_.insert({ cookie, { response, false } });
> -	const auto &iter = result.first;
> +	std::vector<int> fds;
> +	std::vector<std::string> args;
> +	args.push_back(ipaModulePath);
> +	args.push_back(std::to_string(fd));
> +	fds.push_back(fd);
>   
> -	ret = socket_->send(message);
> +	proc_ = std::make_unique<Process>();
> +	int ret = proc_->start(ipaProxyWorkerPath, args, fds);
>   	if (ret) {
> -		callData_.erase(iter);
> -		return ret;
> +		LOG(IPCPipe, Error)
> +			<< "Failed to start proxy worker process";
> +		return;
>   	}
>   
> -	/* \todo Make this less dangerous, see IPCPipe::sendSync() */
> -	timeout.start(2000ms);
> -	while (!iter->second.done) {
> -		if (!timeout.isRunning()) {
> -			LOG(IPCPipe, Error) << "Call timeout!";
> -			callData_.erase(iter);
> -			return -ETIMEDOUT;
> -		}
> +	connected_ = true;
> +}
>   
> -		Thread::current()->eventDispatcher()->processEvents();
> -	}
> +IPCPipeUnixSocket::~IPCPipeUnixSocket()
> +{
> +}
>   
> -	callData_.erase(iter);
> +int IPCPipeUnixSocket::sendSync(const IPCMessage &in, IPCMessage *out)
> +{
> +	return socketWrap_->sendSync(in, out);
> +}
>   
> -	return 0;
> +int IPCPipeUnixSocket::sendAsync(const IPCMessage &data)
> +{
> +	return socketWrap_->sendAsync(data);
>   }
>   
>   } /* namespace libcamera */
Julien Vuillaumier Sept. 1, 2025, 5:12 p.m. UTC | #6
Hi Barnabás,

On 29/08/2025 18:02, Barnabás Pőcze wrote:
> 
> Hi
> 
> 
> 2024. 12. 20. 15:55 keltezéssel, Julien Vuillaumier írta:
>> Unix-socket based IPC sometimes times out or hangs, typically
>> when multiple camera are stopped simulaneously. That specific case
>> triggers the concurrent sending by each pipeline handler instance
>> of a synchronous stop() message to its peer IPA process.
>>
>> There is a dedicated IPC socket per camera. Sockets payload receipt
>> signals all run in the camera manager thread.
>> To send a synchronous message to IPA, pipeline invokes from camera
>> thread IPCPipeUnixSocket::sendSync(). This sends the message then blocks
>> busy waiting for the peer acknowledgment. Such busy wait is done by
>> blocking on event loop calling dispatchEvent(), until the ack condition
>> is detected.
>>
>> One issue is that the socket receive slot readyRead() wakes up the
>> blocked thread via libcamera::Message receipt. Even though such message
>> resumes processEvents(), it may reblock immediately because readyRead()
>> does not interrupt() explictly the dispatcher.
>> Most of the time, an other pending event for the thread unblocks the
>> event dispatcher and the ack condition is detected - in worst case the 2
>> sec timeout kicks in. Once unblocked, the dispatcher let the message
>> acknowledgment to be detected and the sendSync() completes.
> 
> As far as I can see
> 
>    * EventDispatcherPoll::processEvents() is blocked in ppoll()
>    * socket receives data, signals POLLIN, ppoll() returns
>    * EventDispatcherPoll::processNotifiers()
>        -> EventNotifier::activated emitted
>        -> IPCUnixSocket::dataNotifier() runs
>        -> IPCUnixSocket::readyRead emitted
>        -> IPCPipeUnixSocket::readyRead() runs
>    * IPCPipeUnixSocket::readyRead() reads the message and modifies 
> `callData_` accordingly
>    * EventDispatcherPoll::processEvents() returns
>    * IPCPipeUnixSocket::call() sees the update in `callData_` and stops
> 
> And as far as I can tell every signal emission happens synchronously.
> So what am I missing? Where does it get blocked?

That is the nominal behavior - no problem has been seen with single 
camera operation. Hangs are experienced when a single libcamera instance 
(process) controls multiple cameras. In that case all the cameras are 
operated from the same camera manager thread. If the user wants, for 
instance, to stop all cameras at the same time, a synchronous stop IPC 
message is sent for each camera to its associated IPA. The first IPC 
message busy-waits for its sync message acknowledgment to be received on 
the socket. While looping on the camera0 ack receipt, the message for 
the second camera is sent from the same thread and also busy-waits for 
its ack.

We end up with this kind of thread backtrace - the pipeline handler 
(PipelineHandlerNxpNeo) mentioned here is not upstream yet... but it 
gives an idea of the general case.

poll()
libcamera::EventDispatcherPoll::processEvents()
libcamera::IPCPipeUnixSocket::call()
libcamera::IPCPipeUnixSocket::sendSync()
libcamera::ipa::nxpneo::IPAProxyNxpNeo::stopIPC
libcamera::ipa::nxpneo::IPAProxyNxpNeo::stop()
libcamera::NxpNeoCameraData::stop()
libcamera::PipelineHandlerNxpNeo::stop()
libcamera::InvokeMessage::invoke()
libcamera::Object::message()
libcamera::Thread::dispatchMessages
### stop request for camera 1, while blocked on camera 0 IPC stop
libcamera::EventDispatcherPoll::processEvents()
libcamera::IPCPipeUnixSocket::call()
libcamera::IPCPipeUnixSocket::sendSync()
libcamera::ipa::nxpneo::IPAProxyNxpNeo::stopIPC()
libcamera::ipa::nxpneo::IPAProxyNxpNeo:stop()
libcamera::NxpNeoCameraData::stop()
libcamera::PipelineHandlerNxpNeo::stop()
libcamera::InvokeMessage::invoke()
libcamera::Object::message()
libcamera::Thread::dispatchMessages
### stop request for camera 0
libcamera::EventDispatcherPoll::processEvents()
libcamera::Thread::exec()
libcamera::CameraManager::Private::run()
libcamera::Thread::startThread()

The issue is seen with 4 cameras - in that case up to 4 levels of IPC 
sync sends may be nested in the camera thread stack.

If any of the IPC sockets receives its sync message ack, the camera 
thread will be scheduled on top of the above stack and execute the 
sequence:

... callData is updated
IPCPipeUnixSocket::readyRead()
IPCUnixSocket::dataNotifier()
EventDispatcherPoll::processNotifiers()

IIRC a couple of things are fragile:
- In IPCPipeUnixSocket::readyRead(), callData is updated and, next time 
the thread resumes, the IPC sync ack may be detected from the call() 
busy-wait loop. But that still requires the thread to be resumed by some 
kind of event, which may not occur when the pipeline is stopping - maybe 
an EventDispatcher::interrupt() would help here
- If the ack comes for camera 0 on its socket, it will not be able to 
complete before the camera 1 sync message has been acked, as the latter 
is the one on top of the camera thread stack. The camera 1 stack frames 
need to unwind before the camera 0 IPC message can complete. More recent 
messages delay the completion of older messages, which are therefore 
more likely to time out
- The timeout construct is not robust in multi-camera: camera 0 may time 
out while camera 1 is being busy-waited. In that case, the camera 0 
timeout message will resume the thread in the camera 1 call() function, 
be ignored, and the wake-up event for camera 0 gets lost

This is a fairly complex construct, and the behavior is coupled to the 
EventDispatcher implementation.
The intent of this change is to propose a simpler construct without 
camera contexts nesting in the stack: for each sync message, the camera 
thread is blocked on a condition variable waiting for the message ack. A 
supplementary thread becomes necessary to handle the socket receive 
path, and eventually unblock the camera thread.

>>
>> The other issue is that in case of concurrent synchronous IPC messages
>> sent by multiple pipeline handlers, there is a possible recursion
>> of sendSync() / processEvents() nested in the camera thread stack. As
>> commented in the source, that is a dangerous construct that can lead to
>> a hang.
>> The reason is that the last synchronous message sent is the deepest in
>> the stack. It is also the one whose acknowledgment is being busy waited.
>> However other pending synchronous messages may have been initiated before
>> and are upper in the stack. If they timeout, the condition is not
>> detected because of the stack recursion, as the thread is busy waiting
>> for the last message to be acknowledged.
> 
> Without a large-scale refactor, I see two choices:
> 
>    (1) dispatch the event loop recursively;
>    (2) block the event loop;
> 
> the proposed solution here appears to be (2). But then what is not clear
> to me is why you're not just running a poll() in a loop on that single
> socket? Why do we need an extra thread?

That is correct, the proposed solution is (2).

I am not sure what you mean by running a poll() in a loop on that 
single socket.

From libcamera's viewpoint, the socket receive path is used for 2 cases:
1) Pipeline-handler-originated sync message acknowledgment issued by the IPA
2) Asynchronous message issued by the IPA

IPCUnixSocket already handles a poll() on the socket via a combination 
of an EventNotifier + a manual poll in IPCUnixSocket::dataNotifier() to 
check payload availability.

Instead of using an additional thread, I presume we could change the 
existing busy-wait loop in IPCPipeUnixSocket::call() by:
- removing the processEvents() call
- adding the equivalent of IPCUnixSocket::dataNotifier() without the 
readyRead signal emission, followed by IPCPipeUnixSocket::readyRead()
Then some sort of timeout management would still be needed.
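
For illustration, a rough sketch of that alternative loop (plain POSIX 
and standard C++; names are hypothetical, not the real IPCUnixSocket 
API - tryReceive() stands for a helper that drains one message from the 
socket and returns true once the awaited ack has been consumed):

#include <poll.h>

#include <cerrno>
#include <chrono>
#include <functional>

/*
 * Poll the socket fd directly instead of running the event dispatcher.
 * Any unrelated payload read by tryReceive() meanwhile (an asynchronous
 * IPA message) would have to be queued for later delivery.
 */
int callWithoutDispatcher(int fd, const std::function<bool()> &tryReceive,
			  std::chrono::milliseconds timeout)
{
	const auto deadline = std::chrono::steady_clock::now() + timeout;

	while (!tryReceive()) {
		auto remaining = std::chrono::duration_cast<std::chrono::milliseconds>(
			deadline - std::chrono::steady_clock::now());
		if (remaining.count() <= 0)
			return -ETIMEDOUT;

		struct pollfd pfd = { fd, POLLIN, 0 };
		int ret = poll(&pfd, 1, static_cast<int>(remaining.count()));
		if (ret < 0 && errno != EINTR)
			return -errno;
	}

	return 0;
}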

I am struggling to figure out if that would play smoothly with the IPA 
asynchronous messages.
IMO blocking the camera thread and relying on a dedicated thread to 
handle the IPCUnixSocket::dataNotifier() / IPCPipeUnixSocket::readyRead() 
path is simpler and safer. The other advantage would be that it keeps 
the IPCUnixSocket implementation unchanged.

Thanks,
Julien

> 
> 
> Regards,
> Barnabás Pőcze
> 
>>
>> This change implements a safer mechanism to handle the synchronous
>> message sending, similar to the one used for non isolated IPA. The
>> IPCUnixSocketWrapper class is introduced to handle the IPCUnixSocket
>> receive signal in a dedicated thread.
>> Doing so, the sending thread, when emiting a synchronous message, can be
>> blocked without event dispatcher's processEvents() usage, which avoids
>> the risky stack recursion.
>>
>> Fixes: 21f1b555b ("libcamera: Add IPCPipe implementation based on unix 
>> socket")
>>
>> Signed-off-by: Julien Vuillaumier <julien.vuillaumier@nxp.com>
>> ---
>>   .../libcamera/internal/ipc_pipe_unixsocket.h  |  13 +-
>>   src/libcamera/ipc_pipe_unixsocket.cpp         | 242 +++++++++++++-----
>>   2 files changed, 178 insertions(+), 77 deletions(-)
>>
>> diff --git a/include/libcamera/internal/ipc_pipe_unixsocket.h b/ 
>> include/libcamera/internal/ipc_pipe_unixsocket.h
>> index 8c972613..280639d5 100644
>> --- a/include/libcamera/internal/ipc_pipe_unixsocket.h
>> +++ b/include/libcamera/internal/ipc_pipe_unixsocket.h
>> @@ -16,6 +16,7 @@
>>   namespace libcamera {
>>
>>   class Process;
>> +class IPCUnixSocketWrapper;
>>
>>   class IPCPipeUnixSocket : public IPCPipe
>>   {
>> @@ -29,18 +30,8 @@ public:
>>       int sendAsync(const IPCMessage &data) override;
>>
>>   private:
>> -     struct CallData {
>> -             IPCUnixSocket::Payload *response;
>> -             bool done;
>> -     };
>> -
>> -     void readyRead();
>> -     int call(const IPCUnixSocket::Payload &message,
>> -              IPCUnixSocket::Payload *response, uint32_t seq);
>> -
>>       std::unique_ptr<Process> proc_;
>> -     std::unique_ptr<IPCUnixSocket> socket_;
>> -     std::map<uint32_t, CallData> callData_;
>> +     std::unique_ptr<IPCUnixSocketWrapper> socketWrap_;
>>   };
>>
>>   } /* namespace libcamera */
>> diff --git a/src/libcamera/ipc_pipe_unixsocket.cpp b/src/libcamera/ 
>> ipc_pipe_unixsocket.cpp
>> index 668ec73b..eb5408d4 100644
>> --- a/src/libcamera/ipc_pipe_unixsocket.cpp
>> +++ b/src/libcamera/ipc_pipe_unixsocket.cpp
>> @@ -9,10 +9,9 @@
>>
>>   #include <vector>
>>
>> -#include <libcamera/base/event_dispatcher.h>
>>   #include <libcamera/base/log.h>
>> +#include <libcamera/base/mutex.h>
>>   #include <libcamera/base/thread.h>
>> -#include <libcamera/base/timer.h>
>>
>>   #include "libcamera/internal/ipc_pipe.h"
>>   #include "libcamera/internal/ipc_unixsocket.h"
>> @@ -24,67 +23,161 @@ namespace libcamera {
>>
>>   LOG_DECLARE_CATEGORY(IPCPipe)
>>
>> -IPCPipeUnixSocket::IPCPipeUnixSocket(const char *ipaModulePath,
>> -                                  const char *ipaProxyWorkerPath)
>> -     : IPCPipe()
>> +class IPCUnixSocketWrapper : Thread
>>   {
>> -     std::vector<int> fds;
>> -     std::vector<std::string> args;
>> -     args.push_back(ipaModulePath);
>> +public:
>> +     IPCUnixSocketWrapper(Signal<const IPCMessage &> *recv)
>> +             : recv_(recv), ready_(false), sendSyncPending_(false),
>> +               sendSyncCookie_(0)
>> +     {
>> +             start();
>> +     }
>>
>> -     socket_ = std::make_unique<IPCUnixSocket>();
>> -     UniqueFD fd = socket_->create();
>> -     if (!fd.isValid()) {
>> -             LOG(IPCPipe, Error) << "Failed to create socket";
>> -             return;
>> +     ~IPCUnixSocketWrapper()
>> +     {
>> +             exit();
>> +             wait();
>>       }
>> -     socket_->readyRead.connect(this, &IPCPipeUnixSocket::readyRead);
>> -     args.push_back(std::to_string(fd.get()));
>> -     fds.push_back(fd.get());
>>
>> -     proc_ = std::make_unique<Process>();
>> -     int ret = proc_->start(ipaProxyWorkerPath, args, fds);
>> -     if (ret) {
>> -             LOG(IPCPipe, Error)
>> -                     << "Failed to start proxy worker process";
>> -             return;
>> +     void run() override
>> +     {
>> +             /*
>> +              * IPC socket construction and connection to its readyRead
>> +              * signal has to be done from the IPC thread so that the
>> +              * relevant Object instances (EventNotifier, slot) are 
>> bound to
>> +              * its context.
>> +              */
>> +             init();
>> +             exec();
>> +             deinit();
>>       }
>>
>> -     connected_ = true;
>> -}
>> +     int fd() { return fd_.get(); }
>> +     int sendSync(const IPCMessage &in, IPCMessage *out);
>> +     int sendAsync(const IPCMessage &data);
>> +     bool waitReady();
>>
>> -IPCPipeUnixSocket::~IPCPipeUnixSocket()
>> -{
>> -}
>> +private:
>> +     void init();
>> +     void deinit();
>> +     void readyRead();
>>
>> -int IPCPipeUnixSocket::sendSync(const IPCMessage &in, IPCMessage *out)
>> +     UniqueFD fd_;
>> +     Signal<const IPCMessage &> *recv_;
>> +     ConditionVariable cv_;
>> +     Mutex mutex_;
>> +     bool ready_;
>> +     bool sendSyncPending_;
>> +     uint32_t sendSyncCookie_;
>> +     IPCUnixSocket::Payload *sendSyncResponse_;
>> +
>> +     /* Socket shall be constructed and destructed from IPC thread 
>> context */
>> +     std::unique_ptr<IPCUnixSocket> socket_;
>> +};
>> +
>> +int IPCUnixSocketWrapper::sendSync(const IPCMessage &in, IPCMessage 
>> *out)
>>   {
>> +     int ret;
>>       IPCUnixSocket::Payload response;
>>
>> -     int ret = call(in.payload(), &response, in.header().cookie);
>> +     mutex_.lock();
>> +     ASSERT(!sendSyncPending_);
>> +     sendSyncPending_ = true;
>> +     sendSyncCookie_ = in.header().cookie;
>> +     sendSyncResponse_ = &response;
>> +     mutex_.unlock();
>> +
>> +     ret = socket_->send(in.payload());
>>       if (ret) {
>> -             LOG(IPCPipe, Error) << "Failed to call sync";
>> -             return ret;
>> +             LOG(IPCPipe, Error) << "Failed to send sync message";
>> +             goto cleanup;
>> +     }
>> +
>> +     bool complete;
>> +     {
>> +             MutexLocker locker(mutex_);
>> +             auto syncComplete = ([&]() {
>> +                     return sendSyncPending_ == false;
>> +             });
>> +             complete = cv_.wait_for(locker, 1000ms, syncComplete);
>> +     }
>> +
>> +     if (!complete) {
>> +             LOG(IPCPipe, Error) << "Timeout sending sync message";
>> +             ret = -ETIMEDOUT;
>> +             goto cleanup;
>>       }
>>
>>       if (out)
>>               *out = IPCMessage(response);
>>
>>       return 0;
>> +
>> +cleanup:
>> +     mutex_.lock();
>> +     sendSyncPending_ = false;
>> +     mutex_.unlock();
>> +
>> +     return ret;
>>   }
>>
>> -int IPCPipeUnixSocket::sendAsync(const IPCMessage &data)
>> +int IPCUnixSocketWrapper::sendAsync(const IPCMessage &data)
>>   {
>> -     int ret = socket_->send(data.payload());
>> -     if (ret) {
>> -             LOG(IPCPipe, Error) << "Failed to call async";
>> -             return ret;
>> +     int ret;
>> +     ret = socket_->send(data.payload());
>> +     if (ret)
>> +             LOG(IPCPipe, Error) << "Failed to send sync message";
>> +     return ret;
>> +}
>> +
>> +bool IPCUnixSocketWrapper::waitReady()
>> +{
>> +     bool ready;
>> +     {
>> +             MutexLocker locker(mutex_);
>> +             auto isReady = ([&]() {
>> +                     return ready_;
>> +             });
>> +             ready = cv_.wait_for(locker, 1000ms, isReady);
>>       }
>>
>> -     return 0;
>> +     return ready;
>> +}
>> +
>> +void IPCUnixSocketWrapper::init()
>> +{
>> +     /* Init is to be done from the IPC thread context */
>> +     ASSERT(Thread::current() == this);
>> +
>> +     socket_ = std::make_unique<IPCUnixSocket>();
>> +     fd_ = socket_->create();
>> +     if (!fd_.isValid()) {
>> +             LOG(IPCPipe, Error) << "Failed to create socket";
>> +             return;
>> +     }
>> +
>> +     socket_->readyRead.connect(this, &IPCUnixSocketWrapper::readyRead);
>> +
>> +     mutex_.lock();
>> +     ready_ = true;
>> +     mutex_.unlock();
>> +     cv_.notify_one();
>>   }
>>
>> -void IPCPipeUnixSocket::readyRead()
>> +void IPCUnixSocketWrapper::deinit()
>> +{
>> +     /* Deinit is to be done from the IPC thread context */
>> +     ASSERT(Thread::current() == this);
>> +
>> +     socket_->readyRead.disconnect(this);
>> +     socket_.reset();
>> +
>> +     mutex_.lock();
>> +     ready_ = false;
>> +     mutex_.unlock();
>> +}
>> +
>> +void IPCUnixSocketWrapper::readyRead()
>>   {
>>       IPCUnixSocket::Payload payload;
>>       int ret = socket_->receive(&payload);
>> @@ -93,55 +186,72 @@ void IPCPipeUnixSocket::readyRead()
>>               return;
>>       }
>>
>> -     /* \todo Use span to avoid the double copy when callData is 
>> found. */
>>       if (payload.data.size() < sizeof(IPCMessage::Header)) {
>>               LOG(IPCPipe, Error) << "Not enough data received";
>>               return;
>>       }
>>
>> -     IPCMessage ipcMessage(payload);
>> +     const IPCMessage::Header *header =
>> +             reinterpret_cast<IPCMessage::Header 
>> *>(payload.data.data());
>> +     bool syncComplete = false;
>> +     mutex_.lock();
>> +     if (sendSyncPending_ && sendSyncCookie_ == header->cookie) {
>> +             syncComplete = true;
>> +             sendSyncPending_ = false;
>> +             *sendSyncResponse_ = std::move(payload);
>> +     }
>> +     mutex_.unlock();
>>
>> -     auto callData = callData_.find(ipcMessage.header().cookie);
>> -     if (callData != callData_.end()) {
>> -             *callData->second.response = std::move(payload);
>> -             callData->second.done = true;
>> +     if (syncComplete) {
>> +             cv_.notify_one();
>>               return;
>>       }
>>
>>       /* Received unexpected data, this means it's a call from the 
>> IPA. */
>> -     recv.emit(ipcMessage);
>> +     IPCMessage ipcMessage(payload);
>> +     recv_->emit(ipcMessage);
>>   }
>>
>> -int IPCPipeUnixSocket::call(const IPCUnixSocket::Payload &message,
>> -                         IPCUnixSocket::Payload *response, uint32_t 
>> cookie)
>> +IPCPipeUnixSocket::IPCPipeUnixSocket(const char *ipaModulePath,
>> +                                  const char *ipaProxyWorkerPath)
>> +     : IPCPipe()
>>   {
>> -     Timer timeout;
>> -     int ret;
>> +     socketWrap_ = std::make_unique<IPCUnixSocketWrapper>(&recv);
>> +     if (!socketWrap_->waitReady()) {
>> +             LOG(IPCPipe, Error) << "Failed to create socket";
>> +             return;
>> +     }
>> +     int fd = socketWrap_->fd();
>>
>> -     const auto result = callData_.insert({ cookie, { response, 
>> false } });
>> -     const auto &iter = result.first;
>> +     std::vector<int> fds;
>> +     std::vector<std::string> args;
>> +     args.push_back(ipaModulePath);
>> +     args.push_back(std::to_string(fd));
>> +     fds.push_back(fd);
>>
>> -     ret = socket_->send(message);
>> +     proc_ = std::make_unique<Process>();
>> +     int ret = proc_->start(ipaProxyWorkerPath, args, fds);
>>       if (ret) {
>> -             callData_.erase(iter);
>> -             return ret;
>> +             LOG(IPCPipe, Error)
>> +                     << "Failed to start proxy worker process";
>> +             return;
>>       }
>>
>> -     /* \todo Make this less dangerous, see IPCPipe::sendSync() */
>> -     timeout.start(2000ms);
>> -     while (!iter->second.done) {
>> -             if (!timeout.isRunning()) {
>> -                     LOG(IPCPipe, Error) << "Call timeout!";
>> -                     callData_.erase(iter);
>> -                     return -ETIMEDOUT;
>> -             }
>> +     connected_ = true;
>> +}
>>
>> -             Thread::current()->eventDispatcher()->processEvents();
>> -     }
>> +IPCPipeUnixSocket::~IPCPipeUnixSocket()
>> +{
>> +}
>>
>> -     callData_.erase(iter);
>> +int IPCPipeUnixSocket::sendSync(const IPCMessage &in, IPCMessage *out)
>> +{
>> +     return socketWrap_->sendSync(in, out);
>> +}
>>
>> -     return 0;
>> +int IPCPipeUnixSocket::sendAsync(const IPCMessage &data)
>> +{
>> +     return socketWrap_->sendAsync(data);
>>   }
>>
>>   } /* namespace libcamera */
>
Paul Elder Sept. 3, 2025, 1:36 p.m. UTC | #7
Hi Julien,

Quoting Julien Vuillaumier (2025-08-29 18:42:03)
> Hi Paul,
> 
> 
> On 26/08/2025 12:28, Paul Elder wrote:
> > 
> > Hi Julien,
> > 
> > Thanks for the patch.
> > 
> > Quoting Julien Vuillaumier (2024-12-20 23:55:56)
> >> Unix-socket based IPC sometimes times out or hangs, typically
> >> when multiple camera are stopped simulaneously. That specific case
> >> triggers the concurrent sending by each pipeline handler instance
> >> of a synchronous stop() message to its peer IPA process.
> >>
> >> There is a dedicated IPC socket per camera. Sockets payload receipt
> >> signals all run in the camera manager thread.
> >> To send a synchronous message to IPA, pipeline invokes from camera
> >> thread IPCPipeUnixSocket::sendSync(). This sends the message then blocks
> >> busy waiting for the peer acknowledgment. Such busy wait is done by
> >> blocking on event loop calling dispatchEvent(), until the ack condition
> >> is detected.
> >>
> >> One issue is that the socket receive slot readyRead() wakes up the
> >> blocked thread via libcamera::Message receipt. Even though such message
> >> resumes processEvents(), it may reblock immediately because readyRead()
> >> does not interrupt() explictly the dispatcher.
> >> Most of the time, an other pending event for the thread unblocks the
> >> event dispatcher and the ack condition is detected - in worst case the 2
> >> sec timeout kicks in. Once unblocked, the dispatcher let the message
> >> acknowledgment to be detected and the sendSync() completes.
> >>
> >> The other issue is that in case of concurrent synchronous IPC messages
> >> sent by multiple pipeline handlers, there is a possible recursion
> >> of sendSync() / processEvents() nested in the camera thread stack. As
> >> commented in the source, that is a dangerous construct that can lead to
> >> a hang.
> >> The reason is that the last synchronous message sent is the deepest in
> >> the stack. It is also the one whose acknowledgment is being busy waited.
> >> However other pending synchronous messages may have been initiated before
> >> and are upper in the stack. If they timeout, the condition is not
> >> detected because of the stack recursion, as the thread is busy waiting
> >> for the last message to be acknowledged.
> > 
> > Ok, I think I understand the issue.
>  > >>
> >> This change implements a safer mechanism to handle the synchronous
> >> message sending, similar to the one used for non isolated IPA. The
> >> IPCUnixSocketWrapper class is introduced to handle the IPCUnixSocket
> >> receive signal in a dedicated thread.
> >> Doing so, the sending thread, when emiting a synchronous message, can be
> >> blocked without event dispatcher's processEvents() usage, which avoids
> >> the risky stack recursion.

While I showed an initially positive response, after some internal
discussion we've uncovered some issues...

The main design goal is that we would like to not block threads.

However, by definition, synchronous calls block the calling thread. At the
moment we have a single thread for all cameras, so blocking on one camera
blocks all cameras, thus leading to the problem that you have discovered.

So how can we design this better?

One option would be to create one thread per camera, but that will be very
difficult as all pipeline handlers would have to deal with thread
synchronization. One thread per pipeline handler *might* be feasible.

Another option is to run nested event loops. We would need to generalize the
mechanism that exists today in IPCPipeUnixSocket::call(), and validate all side
effects and reentrant calls that the pipeline handler might not expect.

Another option is to remove sync calls and have only async calls. This
seems the easiest, as the only sync calls in any IPA interface are:
init, start, stop, configure, mapBuffers, and unmapBuffers. Since none
of these are in a hot path, I don't think it would cause that much of a
problem to convert them all to async calls. The pipeline handler would
become slightly more complex.

The main challenge, I think, is that stop() is called from the
application, and from the application's point of view this call must be
synchronous and blocking. Thus we would need some sort of asyncio-type
mechanism. Perhaps we could use std::future or std::promise, or some
other asyncio library for C++?
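
For instance, something like this minimal sketch (hypothetical names,
not a concrete API proposal): the application-facing stop() stays
blocking while the call underneath is asynchronous.

#include <functional>
#include <future>
#include <thread>

class CameraFacade
{
public:
	int stop()
	{
		std::promise<int> done;
		std::future<int> result = done.get_future();

		/* The completion callback runs once the IPA has
		 * acknowledged the stop. */
		stopAsync([&done](int status) { done.set_value(status); });

		/* Only the application thread blocks here; the camera
		 * manager thread stays free to service other cameras. */
		return result.get();
	}

private:
	/* Stand-in for an async-only IPA call: completes from another
	 * thread, as the camera manager thread would. */
	void stopAsync(std::function<void(int)> onComplete)
	{
		std::thread([cb = std::move(onComplete)]() { cb(0); }).detach();
	}
};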


Thanks,

Paul

> >>
> >> Fixes: 21f1b555b ("libcamera: Add IPCPipe implementation based on unix socket")
> >>
> >> Signed-off-by: Julien Vuillaumier <julien.vuillaumier@nxp.com>
> >> ---
> >>   .../libcamera/internal/ipc_pipe_unixsocket.h  |  13 +-
> >>   src/libcamera/ipc_pipe_unixsocket.cpp         | 242 +++++++++++++-----
> >>   2 files changed, 178 insertions(+), 77 deletions(-)
> >>
> >> diff --git a/include/libcamera/internal/ipc_pipe_unixsocket.h b/include/libcamera/internal/ipc_pipe_unixsocket.h
> >> index 8c972613..280639d5 100644
> >> --- a/include/libcamera/internal/ipc_pipe_unixsocket.h
> >> +++ b/include/libcamera/internal/ipc_pipe_unixsocket.h
> >> @@ -16,6 +16,7 @@
> >>   namespace libcamera {
> >>
> >>   class Process;
> >> +class IPCUnixSocketWrapper;
> >>
> >>   class IPCPipeUnixSocket : public IPCPipe
> >>   {
> >> @@ -29,18 +30,8 @@ public:
> >>          int sendAsync(const IPCMessage &data) override;
> >>
> >>   private:
> >> -       struct CallData {
> >> -               IPCUnixSocket::Payload *response;
> >> -               bool done;
> >> -       };
> >> -
> >> -       void readyRead();
> >> -       int call(const IPCUnixSocket::Payload &message,
> >> -                IPCUnixSocket::Payload *response, uint32_t seq);
> >> -
> >>          std::unique_ptr<Process> proc_;
> >> -       std::unique_ptr<IPCUnixSocket> socket_;
> >> -       std::map<uint32_t, CallData> callData_;
> >> +       std::unique_ptr<IPCUnixSocketWrapper> socketWrap_;
> > 
> > afaict you're essentially deleting the whole IPCPipe class and reimplementing
> > it. What's wrong with changing it directly instead of adding a wrapper?
> > 
> 
> The IPCPipeUnixSocket class is used by libcamera to create the proxy 
> worker process and the IPC to communicate with the proxy. The IPC part 
> relies on the IPCUnixSocket class (asynchronous write, no threading) - 
> and the same IPCUnixSocket class is also used by the proxy worker in the 
> auto generated code.
> 
> The intent of the IPCUnixSocketWrapper was to provide a higher level 
> interface for the IPCUnixSocket class, more specific to the 
> IPCPipeUnixSocket class usage: threading for the receipt path, 
> sync/async transmit, IPCMessage to IPCUnixSocket::Payload conversions...
> 
> As you mentioned, IPCPipeUnixSocket becomes fairly slim.
> Now I can look at the option to move the IPCUnixSocketWrapper class 
> content into the IPCPipeUnixSocket class, if that is a preferred approach.
> 
> 
> >>   };
> >>
> >>   } /* namespace libcamera */
> >> diff --git a/src/libcamera/ipc_pipe_unixsocket.cpp b/src/libcamera/ipc_pipe_unixsocket.cpp
> >> index 668ec73b..eb5408d4 100644
> >> --- a/src/libcamera/ipc_pipe_unixsocket.cpp
> >> +++ b/src/libcamera/ipc_pipe_unixsocket.cpp
> >> @@ -9,10 +9,9 @@
> >>
> >>   #include <vector>
> >>
> >> -#include <libcamera/base/event_dispatcher.h>
> >>   #include <libcamera/base/log.h>
> >> +#include <libcamera/base/mutex.h>
> >>   #include <libcamera/base/thread.h>
> >> -#include <libcamera/base/timer.h>
> >>
> >>   #include "libcamera/internal/ipc_pipe.h"
> >>   #include "libcamera/internal/ipc_unixsocket.h"
> >> @@ -24,67 +23,161 @@ namespace libcamera {
> >>
> >>   LOG_DECLARE_CATEGORY(IPCPipe)
> >>
> >> -IPCPipeUnixSocket::IPCPipeUnixSocket(const char *ipaModulePath,
> >> -                                    const char *ipaProxyWorkerPath)
> >> -       : IPCPipe()
> >> +class IPCUnixSocketWrapper : Thread
> >>   {
> >> -       std::vector<int> fds;
> >> -       std::vector<std::string> args;
> >> -       args.push_back(ipaModulePath);
> >> +public:
> >> +       IPCUnixSocketWrapper(Signal<const IPCMessage &> *recv)
> >> +               : recv_(recv), ready_(false), sendSyncPending_(false),
> >> +                 sendSyncCookie_(0)
> >> +       {
> >> +               start();
> >> +       }
> >>
> >> -       socket_ = std::make_unique<IPCUnixSocket>();
> >> -       UniqueFD fd = socket_->create();
> >> -       if (!fd.isValid()) {
> >> -               LOG(IPCPipe, Error) << "Failed to create socket";
> >> -               return;
> >> +       ~IPCUnixSocketWrapper()
> >> +       {
> >> +               exit();
> >> +               wait();
> >>          }
> >> -       socket_->readyRead.connect(this, &IPCPipeUnixSocket::readyRead);
> >> -       args.push_back(std::to_string(fd.get()));
> >> -       fds.push_back(fd.get());
> >>
> >> -       proc_ = std::make_unique<Process>();
> >> -       int ret = proc_->start(ipaProxyWorkerPath, args, fds);
> >> -       if (ret) {
> >> -               LOG(IPCPipe, Error)
> >> -                       << "Failed to start proxy worker process";
> >> -               return;
> >> +       void run() override
> >> +       {
> >> +               /*
> >> +                * IPC socket construction and connection to its readyRead
> >> +                * signal has to be done from the IPC thread so that the
> >> +                * relevant Object instances (EventNotifier, slot) are bound to
> >> +                * its context.
> >> +                */
> >> +               init();
> >> +               exec();
> >> +               deinit();
> >>          }
> >>
> >> -       connected_ = true;
> >> -}
> >> +       int fd() { return fd_.get(); }
> >> +       int sendSync(const IPCMessage &in, IPCMessage *out);
> >> +       int sendAsync(const IPCMessage &data);
> >> +       bool waitReady();
> >>
> >> -IPCPipeUnixSocket::~IPCPipeUnixSocket()
> >> -{
> >> -}
> >> +private:
> >> +       void init();
> >> +       void deinit();
> >> +       void readyRead();
> >>
> >> -int IPCPipeUnixSocket::sendSync(const IPCMessage &in, IPCMessage *out)
> >> +       UniqueFD fd_;
> >> +       Signal<const IPCMessage &> *recv_;
> >> +       ConditionVariable cv_;
> >> +       Mutex mutex_;
> > 
> > So the mutex protects everything from here...
> > 
> >> +       bool ready_;
> >> +       bool sendSyncPending_;
> >> +       uint32_t sendSyncCookie_;
> >> +       IPCUnixSocket::Payload *sendSyncResponse_;
> > 
> > ...to here
> 
> I am not sure about this comment. But the git patch output may be a bit 
> misleading here: that line is only the `Mutex mutex_` variable 
> declaration, as a member of the IPCUnixSocketWrapper class.
> The variables that `mutex_` actually protects depend on the function 
> implementations, where `mutex_` is used through lock()/unlock() sections 
> and MutexLocker-protected scopes.
> 
> > 
> >> +
> >> +       /* Socket shall be constructed and destructed from IPC thread context */
> >> +       std::unique_ptr<IPCUnixSocket> socket_;
> >> +};
> >> +
> >> +int IPCUnixSocketWrapper::sendSync(const IPCMessage &in, IPCMessage *out)
> >>   {
> >> +       int ret;
> >>          IPCUnixSocket::Payload response;
> >>
> >> -       int ret = call(in.payload(), &response, in.header().cookie);
> >> +       mutex_.lock();
> >> +       ASSERT(!sendSyncPending_);
> >> +       sendSyncPending_ = true;
> >> +       sendSyncCookie_ = in.header().cookie;
> >> +       sendSyncResponse_ = &response;
> >> +       mutex_.unlock();
> >> +
> >> +       ret = socket_->send(in.payload());
> >>          if (ret) {
> >> -               LOG(IPCPipe, Error) << "Failed to call sync";
> >> -               return ret;
> >> +               LOG(IPCPipe, Error) << "Failed to send sync message";
> >> +               goto cleanup;
> >> +       }
> >> +
> >> +       bool complete;
> >> +       {
> >> +               MutexLocker locker(mutex_);
> >> +               auto syncComplete = ([&]() {
> >> +                       return sendSyncPending_ == false;
> >> +               });
> >> +               complete = cv_.wait_for(locker, 1000ms, syncComplete);
> >> +       }
> > 
> > Ok, I think I like this idea.
> 
> Thanks :)
> 
> > 
> > I still don't see why it needs to be in a wrapper class, though.
> 
> As mentioned above, I can look at the option to remove the wrapper class 
> to keep only the IPCPipeUnixSocket if desired.
> 
> > 
> >> +
> >> +       if (!complete) {
> >> +               LOG(IPCPipe, Error) << "Timeout sending sync message";
> >> +               ret = -ETIMEDOUT;
> >> +               goto cleanup;
> >>          }
> >>
> >>          if (out)
> >>                  *out = IPCMessage(response);
> >>
> >>          return 0;
> >> +
> >> +cleanup:
> >> +       mutex_.lock();
> >> +       sendSyncPending_ = false;
> >> +       mutex_.unlock();
> >> +
> >> +       return ret;
> >>   }
> >>
> >> -int IPCPipeUnixSocket::sendAsync(const IPCMessage &data)
> >> +int IPCUnixSocketWrapper::sendAsync(const IPCMessage &data)
> >>   {
> >> -       int ret = socket_->send(data.payload());
> >> -       if (ret) {
> >> -               LOG(IPCPipe, Error) << "Failed to call async";
> >> -               return ret;
> >> +       int ret;
> >> +       ret = socket_->send(data.payload());
> >> +       if (ret)
> >> +               LOG(IPCPipe, Error) << "Failed to send sync message";
> >> +       return ret;
> >> +}
> >> +
> >> +bool IPCUnixSocketWrapper::waitReady()
> >> +{
> >> +       bool ready;
> >> +       {
> >> +               MutexLocker locker(mutex_);
> >> +               auto isReady = ([&]() {
> >> +                       return ready_;
> >> +               });
> >> +               ready = cv_.wait_for(locker, 1000ms, isReady);
> >>          }
> >>
> >> -       return 0;
> >> +       return ready;
> >> +}
> > 
> > I don't understand why this function is needed. It only waits to make sure that
> > init() is done, but afaict nothing in init() is threaded. Especially if there
> > is no wrapper. Why can't IPCUnixSocket live directly in another thread?
> 
> The init() function is executed from the wrapper thread - see the 
> IPCUnixSocketWrapper::run() function that calls in sequence: init(), 
> exec() then deinit().
> The Objects bound to the wrapper thread (EventNotifier, slots...) have 
> to be instantiated and later destroyed from the wrapper thread itself. 
> Thus, they cannot be created by the wrapper constructor, which is 
> executed in the camera manager thread; their creation has to be 
> deferred to the wrapper thread execution. The waitReady() call checks 
> that this thread is up and running, which indicates that all objects 
> needed for the class operation have been created.
> 
> If there is no more wrapper because the threading implementation is 
> moved to the IPCPipeUnixSocket class, the Object instances 
> (EventNotifier, slots...) associated with the new thread will still 
> have to be created from there as well.
> A similar synchronization mechanism will be necessary in the 
> IPCPipeUnixSocket constructor to wait for that other thread to be 
> active and its associated Objects created.
> 
> > 
> >> +
> >> +void IPCUnixSocketWrapper::init()
> >> +{
> >> +       /* Init is to be done from the IPC thread context */
> >> +       ASSERT(Thread::current() == this);
> >> +
> >> +       socket_ = std::make_unique<IPCUnixSocket>();
> >> +       fd_ = socket_->create();
> >> +       if (!fd_.isValid()) {
> >> +               LOG(IPCPipe, Error) << "Failed to create socket";
> >> +               return;
> >> +       }
> >> +
> >> +       socket_->readyRead.connect(this, &IPCUnixSocketWrapper::readyRead);
> >> +
> >> +       mutex_.lock();
> >> +       ready_ = true;
> >> +       mutex_.unlock();
> >> +       cv_.notify_one();
> >>   }
> >>
> >> -void IPCPipeUnixSocket::readyRead()
> >> +void IPCUnixSocketWrapper::deinit()
> >> +{
> >> +       /* Deinit is to be done from the IPC thread context */
> >> +       ASSERT(Thread::current() == this);
> >> +
> >> +       socket_->readyRead.disconnect(this);
> >> +       socket_.reset();
> >> +
> >> +       mutex_.lock();
> >> +       ready_ = false;
> >> +       mutex_.unlock();
> >> +}
> >> +
> >> +void IPCUnixSocketWrapper::readyRead()
> >>   {
> >>          IPCUnixSocket::Payload payload;
> >>          int ret = socket_->receive(&payload);
> >> @@ -93,55 +186,72 @@ void IPCPipeUnixSocket::readyRead()
> >>                  return;
> >>          }
> >>
> >> -       /* \todo Use span to avoid the double copy when callData is found. */
> > 
> > Oh nice, this is taken care of.
> > 
> >>          if (payload.data.size() < sizeof(IPCMessage::Header)) {
> >>                  LOG(IPCPipe, Error) << "Not enough data received";
> >>                  return;
> >>          }
> >>
> >> -       IPCMessage ipcMessage(payload);
> >> +       const IPCMessage::Header *header =
> >> +               reinterpret_cast<IPCMessage::Header *>(payload.data.data());
> >> +       bool syncComplete = false;
> >> +       mutex_.lock();
> >> +       if (sendSyncPending_ && sendSyncCookie_ == header->cookie) {
> >> +               syncComplete = true;
> >> +               sendSyncPending_ = false;
> >> +               *sendSyncResponse_ = std::move(payload);
> >> +       }
> >> +       mutex_.unlock();
> > 
> > That's a cool idea.
> > 
> >>
> >> -       auto callData = callData_.find(ipcMessage.header().cookie);
> >> -       if (callData != callData_.end()) {
> >> -               *callData->second.response = std::move(payload);
> >> -               callData->second.done = true;
> >> +       if (syncComplete) {
> >> +               cv_.notify_one();
> >>                  return;
> >>          }
> >>
> >>          /* Received unexpected data, this means it's a call from the IPA. */
> >> -       recv.emit(ipcMessage);
> >> +       IPCMessage ipcMessage(payload);
> >> +       recv_->emit(ipcMessage);
> >>   }
> >>
> >> -int IPCPipeUnixSocket::call(const IPCUnixSocket::Payload &message,
> >> -                           IPCUnixSocket::Payload *response, uint32_t cookie)
> >> +IPCPipeUnixSocket::IPCPipeUnixSocket(const char *ipaModulePath,
> >> +                                    const char *ipaProxyWorkerPath)
> >> +       : IPCPipe()
> >>   {
> >> -       Timer timeout;
> >> -       int ret;
> >> +       socketWrap_ = std::make_unique<IPCUnixSocketWrapper>(&recv);
> >> +       if (!socketWrap_->waitReady()) {
> >> +               LOG(IPCPipe, Error) << "Failed to create socket";
> >> +               return;
> >> +       }
> >> +       int fd = socketWrap_->fd();
> >>
> >> -       const auto result = callData_.insert({ cookie, { response, false } });
> >> -       const auto &iter = result.first;
> >> +       std::vector<int> fds;
> >> +       std::vector<std::string> args;
> >> +       args.push_back(ipaModulePath);
> >> +       args.push_back(std::to_string(fd));
> >> +       fds.push_back(fd);
> > 
> > I'd replace this with:
> > 
> >          std::array args{ std::string(ipaModulePath), std::to_string(fd.get()) };
> >          std::array fds{ fd.get() };
> > 
> > (Since this is how the base code was changed since you wrote this (sorry I
> > didn't notice this patch earlier (I only noticed because Kieran cc'ed me)))
> 
> Sure, I noticed that part had changed in libcamera v0.5.2.
> I will update in the V2.
> 
> Thanks,
> Julien
> 
> 
> > 
> > Thanks,
> > 
> > Paul
> > 
> >>
> >> -       ret = socket_->send(message);
> >> +       proc_ = std::make_unique<Process>();
> >> +       int ret = proc_->start(ipaProxyWorkerPath, args, fds);
> >>          if (ret) {
> >> -               callData_.erase(iter);
> >> -               return ret;
> >> +               LOG(IPCPipe, Error)
> >> +                       << "Failed to start proxy worker process";
> >> +               return;
> >>          }
> >>
> >> -       /* \todo Make this less dangerous, see IPCPipe::sendSync() */
> >> -       timeout.start(2000ms);
> >> -       while (!iter->second.done) {
> >> -               if (!timeout.isRunning()) {
> >> -                       LOG(IPCPipe, Error) << "Call timeout!";
> >> -                       callData_.erase(iter);
> >> -                       return -ETIMEDOUT;
> >> -               }
> >> +       connected_ = true;
> >> +}
> >>
> >> -               Thread::current()->eventDispatcher()->processEvents();
> >> -       }
> >> +IPCPipeUnixSocket::~IPCPipeUnixSocket()
> >> +{
> >> +}
> >>
> >> -       callData_.erase(iter);
> >> +int IPCPipeUnixSocket::sendSync(const IPCMessage &in, IPCMessage *out)
> >> +{
> >> +       return socketWrap_->sendSync(in, out);
> >> +}
> >>
> >> -       return 0;
> >> +int IPCPipeUnixSocket::sendAsync(const IPCMessage &data)
> >> +{
> >> +       return socketWrap_->sendAsync(data);
> >>   }
> >>
> >>   } /* namespace libcamera */
> >> --
> >> 2.34.1
> >>
>
Julien Vuillaumier Sept. 3, 2025, 5:42 p.m. UTC | #8
Hi Paul,

On 03/09/2025 15:36, Paul Elder wrote:
> Hi Julien,
> 
> Quoting Julien Vuillaumier (2025-08-29 18:42:03)
>> Hi Paul,
>>
>>
>> On 26/08/2025 12:28, Paul Elder wrote:
>>>
>>> Hi Julien,
>>>
>>> Thanks for the patch.
>>>
>>> Quoting Julien Vuillaumier (2024-12-20 23:55:56)
>>>> Unix-socket based IPC sometimes times out or hangs, typically
>>>> when multiple camera are stopped simulaneously. That specific case
>>>> triggers the concurrent sending by each pipeline handler instance
>>>> of a synchronous stop() message to its peer IPA process.
>>>>
>>>> There is a dedicated IPC socket per camera. Sockets payload receipt
>>>> signals all run in the camera manager thread.
>>>> To send a synchronous message to IPA, pipeline invokes from camera
>>>> thread IPCPipeUnixSocket::sendSync(). This sends the message then blocks
>>>> busy waiting for the peer acknowledgment. Such busy wait is done by
>>>> blocking on event loop calling dispatchEvent(), until the ack condition
>>>> is detected.
>>>>
>>>> One issue is that the socket receive slot readyRead() wakes up the
>>>> blocked thread via libcamera::Message receipt. Even though such message
>>>> resumes processEvents(), it may reblock immediately because readyRead()
>>>> does not interrupt() explictly the dispatcher.
>>>> Most of the time, an other pending event for the thread unblocks the
>>>> event dispatcher and the ack condition is detected - in worst case the 2
>>>> sec timeout kicks in. Once unblocked, the dispatcher let the message
>>>> acknowledgment to be detected and the sendSync() completes.
>>>>
>>>> The other issue is that in case of concurrent synchronous IPC messages
>>>> sent by multiple pipeline handlers, there is a possible recursion
>>>> of sendSync() / processEvents() nested in the camera thread stack. As
>>>> commented in the source, that is a dangerous construct that can lead to
>>>> a hang.
>>>> The reason is that the last synchronous message sent is the deepest in
>>>> the stack. It is also the one whose acknowledgment is being busy waited.
>>>> However other pending synchronous messages may have been initiated before
>>>> and are upper in the stack. If they timeout, the condition is not
>>>> detected because of the stack recursion, as the thread is busy waiting
>>>> for the last message to be acknowledged.
>>>
>>> Ok, I think I understand the issue.
>>   > >>
>>>> This change implements a safer mechanism to handle the synchronous
>>>> message sending, similar to the one used for non isolated IPA. The
>>>> IPCUnixSocketWrapper class is introduced to handle the IPCUnixSocket
>>>> receive signal in a dedicated thread.
>>>> Doing so, the sending thread, when emiting a synchronous message, can be
>>>> blocked without event dispatcher's processEvents() usage, which avoids
>>>> the risky stack recursion.
> 
> While I showed an initially positive response, after some internal discussion we've
> uncovered some issues...
> 
> The main design goal is that we would like to not block threads.
> 
> However, by definition, synchronous calls block the calling thread. At the
> moment we have a single thread for all cameras, so blocking on one camera
> blocks all cameras, thus leading to the problem that you have discovered.

IPC synchronous calls target short functions - the ones involving IPA 
computations like process(), prepare() etc. are asynchronous. IMO 
briefly blocking the camera manager thread (I did not measure the 
duration but would expect tens of microseconds?) is not the main issue. 
The main issue is rather the nesting of EventDispatcher::processEvents() 
calls, which is a fragile construct in its current form.

When the IPA is running in a thread (no isolation so no IPC), the camera 
manager thread is also blocked on a condition variable during the 
synchronous call from the pipeline handler to the IPA thread.

When a stop() command is issued by the app thread, the camera manager 
thread's call stack is something like this:

libcamera::ConditionVariable::wait<>()
libcamera::Semaphore::acquire()
libcamera::BoundMethodBase::activatePack()
libcamera::BoundMethodMember<>::activate
libcamera::Object::invokeMethod<>(func=IPAProxyNxpNeo::ThreadProxy::stop(), 
type=ConnectionTypeBlocking)
libcamera::ipa::nxpneo::IPAProxyNxpNeo::stopThread()
libcamera::ipa::nxpneo::IPAProxyNxpNeo::stop()
libcamera::NxpNeoCameraData::stopDevice()
libcamera::PipelineHandlerNxpNeo::stopDevice()
libcamera::PipelineHandler::stop()
libcamera::BoundMethodMember<>::invoke()
libcamera::BoundMethodArgs()
libcamera::InvokeMessage::invoke()
libcamera::Object::message()
libcamera::Thread::dispatchMessages()
libcamera::EventDispatcherPoll::processEvents()
libcamera::Thread::exec()
libcamera::CameraManager::Private::run()
libcamera::Thread::startThread()

The semaphore is acquired by the camera thread in the 
BoundMethodBase::activatePack() implementation, and released in 
Object::message() from the IPA thread after the remote function 
execution. During that time the camera thread is blocked, there is no 
EventDispatcher::processEvents() loop, and that does not cause problems 
even in multi-camera setups.

For that reason, implementing the same blocking scheme for IPC seemed to 
be a reasonable option.
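
For illustration, a minimal standalone sketch of that handshake, using
plain C++20 primitives (std::binary_semaphore, std::thread) as
stand-ins for libcamera's Semaphore and InvokeMessage machinery:

#include <functional>
#include <semaphore>
#include <thread>

int main()
{
	std::binary_semaphore done{ 0 };
	std::function<void()> stopOp = [] { /* ThreadProxy::stop() work */ };

	/* Stand-in for the IPA thread dispatching an InvokeMessage. */
	std::thread ipaThread([&] {
		stopOp();       /* Object::message() runs the bound method... */
		done.release(); /* ...then releases the caller's semaphore. */
	});

	/*
	 * Stand-in for the camera manager thread: blocked in acquire()
	 * until the IPA thread has executed stop(). No processEvents()
	 * loop runs while waiting.
	 */
	done.acquire();
	ipaThread.join();

	return 0;
}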

> 
> So how can we design this better?
> 
> One option would be to create one thread per camera, but that will be very
> difficult as all pipeline handlers would have to deal with thread
> synchronization. One thread per pipeline handler *might* be feasible.

Using one thread per pipeline handler - if doable - would not help: a 
single pipeline handler instance may handle multiple cameras when they 
share media device resources.
So we could still have multiple cameras operated from the same thread.


> Another option is to run nested event loops. We would need to generalize the
> mechanism that exists today in IPCPipeUnixSocket::call(), and validate all side
> effects and reentrant calls that the pipeline handler might not expect.

It is an option but probably a complex task: it is an intricate 
construct and requires a very fine understanding of the framework's 
implementation of threading, event loop, messaging, timers management 
and so on.

> 
> Another option is to remove sync calls and have only async calls. This seems to
> easiest, as the only sync calls in any IPA interface are: init, start, stop,
> configure, mapBuffers, and unmapBuffers. Since none of these are in a hot
> path, I don't think it would cause that much of a problem to convert them all
> to async calls. The pipeline handler would become slightly more complex.
> 
> The main challenge I think is that stop() is called from the application, and
> from the application point of view this call must be synchronous and blocking.
> Thus we would need some sort of asyncio type of mechanism. Perhaps we could use
> std::future or std::promise, or some other asyncio library for C++?

Moving all synchronous messages to asynchronous would be an option 
indeed. A simple implementation could be for all pipeline handlers to 
split their init(), configure(), start() and similar ops into multiple 
chunks that wait for an IPA async message confirmation before 
proceeding, where currently the synchronization is done via a sync 
message.
That is doable but the cost may be high as it impacts all pipeline 
handlers. Also it may negatively impact their readability.
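
As a hypothetical sketch of the std::promise/std::future idea (AsyncIpc,
its sendAsync() signature and the completion callback are illustrative
assumptions, not existing libcamera API):

#include <cerrno>
#include <chrono>
#include <functional>
#include <future>
#include <memory>
#include <thread>

struct AsyncIpc {
	/* Assumed async primitive: here it acks from a detached thread. */
	void sendAsync(int /* op */, std::function<void(int)> onComplete)
	{
		std::thread([onComplete] { onComplete(0); }).detach();
	}
};

/* Application-facing synchronous stop() built on top of the async call. */
int stopSync(AsyncIpc &ipc)
{
	auto promise = std::make_shared<std::promise<int>>();
	std::future<int> result = promise->get_future();

	/* The IPA's async ack fulfils the promise from the IPC thread. */
	ipc.sendAsync(/* kOpStop */ 0,
		      [promise](int status) { promise->set_value(status); });

	/* Block the caller with a bounded wait, as sendSync() does today. */
	if (result.wait_for(std::chrono::seconds(2)) !=
	    std::future_status::ready)
		return -ETIMEDOUT;

	return result.get();
}

int main()
{
	AsyncIpc ipc;
	return stopSync(ipc);
}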

IMO synchronizing the IPCPipeUnixSocket sync message from an 
IPCUnixSocket thread is a simpler option that limits the scope of the 
changes. The drawback is that it blocks the camera thread for some 
(short) time, but that is no more constraining for the camera thread 
than what we already do for the threaded IPA case, so I believe (hope?) 
that is acceptable.

Thanks,
Julien

> 
> Thanks,
> 
> Paul
> 
>>>>
>>>> Fixes: 21f1b555b ("libcamera: Add IPCPipe implementation based on unix socket")
>>>>
>>>> Signed-off-by: Julien Vuillaumier <julien.vuillaumier@nxp.com>
>>>> ---
>>>>    .../libcamera/internal/ipc_pipe_unixsocket.h  |  13 +-
>>>>    src/libcamera/ipc_pipe_unixsocket.cpp         | 242 +++++++++++++-----
>>>>    2 files changed, 178 insertions(+), 77 deletions(-)
>>>>
>>>> diff --git a/include/libcamera/internal/ipc_pipe_unixsocket.h b/include/libcamera/internal/ipc_pipe_unixsocket.h
>>>> index 8c972613..280639d5 100644
>>>> --- a/include/libcamera/internal/ipc_pipe_unixsocket.h
>>>> +++ b/include/libcamera/internal/ipc_pipe_unixsocket.h
>>>> @@ -16,6 +16,7 @@
>>>>    namespace libcamera {
>>>>
>>>>    class Process;
>>>> +class IPCUnixSocketWrapper;
>>>>
>>>>    class IPCPipeUnixSocket : public IPCPipe
>>>>    {
>>>> @@ -29,18 +30,8 @@ public:
>>>>           int sendAsync(const IPCMessage &data) override;
>>>>
>>>>    private:
>>>> -       struct CallData {
>>>> -               IPCUnixSocket::Payload *response;
>>>> -               bool done;
>>>> -       };
>>>> -
>>>> -       void readyRead();
>>>> -       int call(const IPCUnixSocket::Payload &message,
>>>> -                IPCUnixSocket::Payload *response, uint32_t seq);
>>>> -
>>>>           std::unique_ptr<Process> proc_;
>>>> -       std::unique_ptr<IPCUnixSocket> socket_;
>>>> -       std::map<uint32_t, CallData> callData_;
>>>> +       std::unique_ptr<IPCUnixSocketWrapper> socketWrap_;
>>>
>>> afaict you're essentially deleting the whole IPCPipe class and reimplementing
>>> it. What's wrong with changing it directly instead of adding a wrapper?
>>>
>>
>> The IPCPipeUnixSocket class is used by libcamera to create the proxy
>> worker process and the IPC to communicate with the proxy. The IPC part
>> relies on the IPCUnixSocket class (asynchronous write, no threading) -
>> and the same IPCUnixSocket class is also used by the proxy worker in the
>> auto generated code.
>>
>> The intent of the IPCUnixSocketWrapper was to provide a higher-level
>> interface for the IPCUnixSocket class, more specific to the
>> IPCPipeUnixSocket class usage: threading for the receipt path,
>> sync/async transmit, IPCMessage to IPCUnixSocket::Payload conversions...
>>
>> As you mentioned, IPCPipeUnixSocket becomes fairly slim.
>> Now I can look at the option to move the IPCUnixSocketWrapper class
>> content into the IPCPipeUnixSocket class, if that is the preferred approach.
>>
>>
>>>>    };
>>>>
>>>>    } /* namespace libcamera */
>>>> diff --git a/src/libcamera/ipc_pipe_unixsocket.cpp b/src/libcamera/ipc_pipe_unixsocket.cpp
>>>> index 668ec73b..eb5408d4 100644
>>>> --- a/src/libcamera/ipc_pipe_unixsocket.cpp
>>>> +++ b/src/libcamera/ipc_pipe_unixsocket.cpp
>>>> @@ -9,10 +9,9 @@
>>>>
>>>>    #include <vector>
>>>>
>>>> -#include <libcamera/base/event_dispatcher.h>
>>>>    #include <libcamera/base/log.h>
>>>> +#include <libcamera/base/mutex.h>
>>>>    #include <libcamera/base/thread.h>
>>>> -#include <libcamera/base/timer.h>
>>>>
>>>>    #include "libcamera/internal/ipc_pipe.h"
>>>>    #include "libcamera/internal/ipc_unixsocket.h"
>>>> @@ -24,67 +23,161 @@ namespace libcamera {
>>>>
>>>>    LOG_DECLARE_CATEGORY(IPCPipe)
>>>>
>>>> -IPCPipeUnixSocket::IPCPipeUnixSocket(const char *ipaModulePath,
>>>> -                                    const char *ipaProxyWorkerPath)
>>>> -       : IPCPipe()
>>>> +class IPCUnixSocketWrapper : Thread
>>>>    {
>>>> -       std::vector<int> fds;
>>>> -       std::vector<std::string> args;
>>>> -       args.push_back(ipaModulePath);
>>>> +public:
>>>> +       IPCUnixSocketWrapper(Signal<const IPCMessage &> *recv)
>>>> +               : recv_(recv), ready_(false), sendSyncPending_(false),
>>>> +                 sendSyncCookie_(0)
>>>> +       {
>>>> +               start();
>>>> +       }
>>>>
>>>> -       socket_ = std::make_unique<IPCUnixSocket>();
>>>> -       UniqueFD fd = socket_->create();
>>>> -       if (!fd.isValid()) {
>>>> -               LOG(IPCPipe, Error) << "Failed to create socket";
>>>> -               return;
>>>> +       ~IPCUnixSocketWrapper()
>>>> +       {
>>>> +               exit();
>>>> +               wait();
>>>>           }
>>>> -       socket_->readyRead.connect(this, &IPCPipeUnixSocket::readyRead);
>>>> -       args.push_back(std::to_string(fd.get()));
>>>> -       fds.push_back(fd.get());
>>>>
>>>> -       proc_ = std::make_unique<Process>();
>>>> -       int ret = proc_->start(ipaProxyWorkerPath, args, fds);
>>>> -       if (ret) {
>>>> -               LOG(IPCPipe, Error)
>>>> -                       << "Failed to start proxy worker process";
>>>> -               return;
>>>> +       void run() override
>>>> +       {
>>>> +               /*
>>>> +                * IPC socket construction and connection to its readyRead
>>>> +                * signal has to be done from the IPC thread so that the
>>>> +                * relevant Object instances (EventNotifier, slot) are bound to
>>>> +                * its context.
>>>> +                */
>>>> +               init();
>>>> +               exec();
>>>> +               deinit();
>>>>           }
>>>>
>>>> -       connected_ = true;
>>>> -}
>>>> +       int fd() { return fd_.get(); }
>>>> +       int sendSync(const IPCMessage &in, IPCMessage *out);
>>>> +       int sendAsync(const IPCMessage &data);
>>>> +       bool waitReady();
>>>>
>>>> -IPCPipeUnixSocket::~IPCPipeUnixSocket()
>>>> -{
>>>> -}
>>>> +private:
>>>> +       void init();
>>>> +       void deinit();
>>>> +       void readyRead();
>>>>
>>>> -int IPCPipeUnixSocket::sendSync(const IPCMessage &in, IPCMessage *out)
>>>> +       UniqueFD fd_;
>>>> +       Signal<const IPCMessage &> *recv_;
>>>> +       ConditionVariable cv_;
>>>> +       Mutex mutex_;
>>>
>>> So the mutex protects everything from here...
>>>
>>>> +       bool ready_;
>>>> +       bool sendSyncPending_;
>>>> +       uint32_t sendSyncCookie_;
>>>> +       IPCUnixSocket::Payload *sendSyncResponse_;
>>>
>>> ...to here
>>
>> I am not sure about this comment. But the git patch output may be a bit
>> misleading here: that line is only the `Mutex mutex_` variable
>> declaration, as a member of the IPCUnixSocketWrapper class.
>> The variables that `mutex_` actually protects depend on the
>> implementation of the functions where `mutex_` is used, through
>> lock()/unlock() sections and MutexLocker-protected scopes.
>>
>>>
>>>> +
>>>> +       /* Socket shall be constructed and destructed from IPC thread context */
>>>> +       std::unique_ptr<IPCUnixSocket> socket_;
>>>> +};
>>>> +
>>>> +int IPCUnixSocketWrapper::sendSync(const IPCMessage &in, IPCMessage *out)
>>>>    {
>>>> +       int ret;
>>>>           IPCUnixSocket::Payload response;
>>>>
>>>> -       int ret = call(in.payload(), &response, in.header().cookie);
>>>> +       mutex_.lock();
>>>> +       ASSERT(!sendSyncPending_);
>>>> +       sendSyncPending_ = true;
>>>> +       sendSyncCookie_ = in.header().cookie;
>>>> +       sendSyncResponse_ = &response;
>>>> +       mutex_.unlock();
>>>> +
>>>> +       ret = socket_->send(in.payload());
>>>>           if (ret) {
>>>> -               LOG(IPCPipe, Error) << "Failed to call sync";
>>>> -               return ret;
>>>> +               LOG(IPCPipe, Error) << "Failed to send sync message";
>>>> +               goto cleanup;
>>>> +       }
>>>> +
>>>> +       bool complete;
>>>> +       {
>>>> +               MutexLocker locker(mutex_);
>>>> +               auto syncComplete = ([&]() {
>>>> +                       return sendSyncPending_ == false;
>>>> +               });
>>>> +               complete = cv_.wait_for(locker, 1000ms, syncComplete);
>>>> +       }
>>>
>>> Ok, I think I like this idea.
>>
>> Thanks :)
>>
>>>
>>> I still don't see why it needs to be in a wrapper class, though.
>>
>> As mentioned above, I can look at the option to remove the wrapper class
>> and keep only the IPCPipeUnixSocket class, if desired.
>>
>>>
>>>> +
>>>> +       if (!complete) {
>>>> +               LOG(IPCPipe, Error) << "Timeout sending sync message";
>>>> +               ret = -ETIMEDOUT;
>>>> +               goto cleanup;
>>>>           }
>>>>
>>>>           if (out)
>>>>                   *out = IPCMessage(response);
>>>>
>>>>           return 0;
>>>> +
>>>> +cleanup:
>>>> +       mutex_.lock();
>>>> +       sendSyncPending_ = false;
>>>> +       mutex_.unlock();
>>>> +
>>>> +       return ret;
>>>>    }
>>>>
>>>> -int IPCPipeUnixSocket::sendAsync(const IPCMessage &data)
>>>> +int IPCUnixSocketWrapper::sendAsync(const IPCMessage &data)
>>>>    {
>>>> -       int ret = socket_->send(data.payload());
>>>> -       if (ret) {
>>>> -               LOG(IPCPipe, Error) << "Failed to call async";
>>>> -               return ret;
>>>> +       int ret;
>>>> +       ret = socket_->send(data.payload());
>>>> +       if (ret)
>>>> +               LOG(IPCPipe, Error) << "Failed to send async message";
>>>> +       return ret;
>>>> +}
>>>> +
>>>> +bool IPCUnixSocketWrapper::waitReady()
>>>> +{
>>>> +       bool ready;
>>>> +       {
>>>> +               MutexLocker locker(mutex_);
>>>> +               auto isReady = ([&]() {
>>>> +                       return ready_;
>>>> +               });
>>>> +               ready = cv_.wait_for(locker, 1000ms, isReady);
>>>>           }
>>>>
>>>> -       return 0;
>>>> +       return ready;
>>>> +}
>>>
>>> I don't understand why this function is needed. It only waits to make sure that
>>> init() is done, but afaict nothing in init() is threaded. Especially if there
>>> is no wrapper. Why can't IPCUnixSocket live directly in another thread?
>>
>> The init() function is executed from the wrapper thread - see the
>> IPCUnixSocketWrapper::run() function that calls in sequence: init(),
>> exec() then deinit().
>> The Objects bound to the wrapper thread (EventNotifier, slots...) have
>> to be instantiated and later destroyed from the wrapper thread itself.
>> Thus, they cannot be created by the wrapper constructor, which is
>> executed in the camera manager thread; their creation has to be
>> deferred to the wrapper thread execution. The waitReady() call checks
>> that this thread is up and running, which indicates that all objects
>> needed for the class operation have been created.
>>
>> If there is no more wrapper because the threading implementation is
>> moved to the IPCPipeUnixSocket class, the Object instances
>> (EventNotifier, slots...) associated with the new thread will still
>> have to be created from there as well.
>> A similar synchronization mechanism will be necessary in the
>> IPCPipeUnixSocket constructor to wait for that other thread to be
>> active and its associated Objects created.
>>
>>>
>>>> +
>>>> +void IPCUnixSocketWrapper::init()
>>>> +{
>>>> +       /* Init is to be done from the IPC thread context */
>>>> +       ASSERT(Thread::current() == this);
>>>> +
>>>> +       socket_ = std::make_unique<IPCUnixSocket>();
>>>> +       fd_ = socket_->create();
>>>> +       if (!fd_.isValid()) {
>>>> +               LOG(IPCPipe, Error) << "Failed to create socket";
>>>> +               return;
>>>> +       }
>>>> +
>>>> +       socket_->readyRead.connect(this, &IPCUnixSocketWrapper::readyRead);
>>>> +
>>>> +       mutex_.lock();
>>>> +       ready_ = true;
>>>> +       mutex_.unlock();
>>>> +       cv_.notify_one();
>>>>    }
>>>>
>>>> -void IPCPipeUnixSocket::readyRead()
>>>> +void IPCUnixSocketWrapper::deinit()
>>>> +{
>>>> +       /* Deinit is to be done from the IPC thread context */
>>>> +       ASSERT(Thread::current() == this);
>>>> +
>>>> +       socket_->readyRead.disconnect(this);
>>>> +       socket_.reset();
>>>> +
>>>> +       mutex_.lock();
>>>> +       ready_ = false;
>>>> +       mutex_.unlock();
>>>> +}
>>>> +
>>>> +void IPCUnixSocketWrapper::readyRead()
>>>>    {
>>>>           IPCUnixSocket::Payload payload;
>>>>           int ret = socket_->receive(&payload);
>>>> @@ -93,55 +186,72 @@ void IPCPipeUnixSocket::readyRead()
>>>>                   return;
>>>>           }
>>>>
>>>> -       /* \todo Use span to avoid the double copy when callData is found. */
>>>
>>> Oh nice, this is taken care of.
>>>
>>>>           if (payload.data.size() < sizeof(IPCMessage::Header)) {
>>>>                   LOG(IPCPipe, Error) << "Not enough data received";
>>>>                   return;
>>>>           }
>>>>
>>>> -       IPCMessage ipcMessage(payload);
>>>> +       const IPCMessage::Header *header =
>>>> +               reinterpret_cast<IPCMessage::Header *>(payload.data.data());
>>>> +       bool syncComplete = false;
>>>> +       mutex_.lock();
>>>> +       if (sendSyncPending_ && sendSyncCookie_ == header->cookie) {
>>>> +               syncComplete = true;
>>>> +               sendSyncPending_ = false;
>>>> +               *sendSyncResponse_ = std::move(payload);
>>>> +       }
>>>> +       mutex_.unlock();
>>>
>>> That's a cool idea.
>>>
>>>>
>>>> -       auto callData = callData_.find(ipcMessage.header().cookie);
>>>> -       if (callData != callData_.end()) {
>>>> -               *callData->second.response = std::move(payload);
>>>> -               callData->second.done = true;
>>>> +       if (syncComplete) {
>>>> +               cv_.notify_one();
>>>>                   return;
>>>>           }
>>>>
>>>>           /* Received unexpected data, this means it's a call from the IPA. */
>>>> -       recv.emit(ipcMessage);
>>>> +       IPCMessage ipcMessage(payload);
>>>> +       recv_->emit(ipcMessage);
>>>>    }
>>>>
>>>> -int IPCPipeUnixSocket::call(const IPCUnixSocket::Payload &message,
>>>> -                           IPCUnixSocket::Payload *response, uint32_t cookie)
>>>> +IPCPipeUnixSocket::IPCPipeUnixSocket(const char *ipaModulePath,
>>>> +                                    const char *ipaProxyWorkerPath)
>>>> +       : IPCPipe()
>>>>    {
>>>> -       Timer timeout;
>>>> -       int ret;
>>>> +       socketWrap_ = std::make_unique<IPCUnixSocketWrapper>(&recv);
>>>> +       if (!socketWrap_->waitReady()) {
>>>> +               LOG(IPCPipe, Error) << "Failed to create socket";
>>>> +               return;
>>>> +       }
>>>> +       int fd = socketWrap_->fd();
>>>>
>>>> -       const auto result = callData_.insert({ cookie, { response, false } });
>>>> -       const auto &iter = result.first;
>>>> +       std::vector<int> fds;
>>>> +       std::vector<std::string> args;
>>>> +       args.push_back(ipaModulePath);
>>>> +       args.push_back(std::to_string(fd));
>>>> +       fds.push_back(fd);
>>>
>>> I'd replace this with:
>>>
>>>           std::array args{ std::string(ipaModulePath), std::to_string(fd.get()) };
>>>           std::array fds{ fd.get() };
>>>
>>> (Since this is how the base code was changed since when you wrote this (sorry I
>>> didn't notice this patch earlier (I only noticed because Kieran cc'ed me)))
>>
>> Sure, I noticed that part had changed in libcamera v0.5.2.
>> I will update in the V2.
>>
>> Thanks,
>> Julien
>>
>>
>>>
>>> Thanks,
>>>
>>> Paul
>>>
>>>>
>>>> -       ret = socket_->send(message);
>>>> +       proc_ = std::make_unique<Process>();
>>>> +       int ret = proc_->start(ipaProxyWorkerPath, args, fds);
>>>>           if (ret) {
>>>> -               callData_.erase(iter);
>>>> -               return ret;
>>>> +               LOG(IPCPipe, Error)
>>>> +                       << "Failed to start proxy worker process";
>>>> +               return;
>>>>           }
>>>>
>>>> -       /* \todo Make this less dangerous, see IPCPipe::sendSync() */
>>>> -       timeout.start(2000ms);
>>>> -       while (!iter->second.done) {
>>>> -               if (!timeout.isRunning()) {
>>>> -                       LOG(IPCPipe, Error) << "Call timeout!";
>>>> -                       callData_.erase(iter);
>>>> -                       return -ETIMEDOUT;
>>>> -               }
>>>> +       connected_ = true;
>>>> +}
>>>>
>>>> -               Thread::current()->eventDispatcher()->processEvents();
>>>> -       }
>>>> +IPCPipeUnixSocket::~IPCPipeUnixSocket()
>>>> +{
>>>> +}
>>>>
>>>> -       callData_.erase(iter);
>>>> +int IPCPipeUnixSocket::sendSync(const IPCMessage &in, IPCMessage *out)
>>>> +{
>>>> +       return socketWrap_->sendSync(in, out);
>>>> +}
>>>>
>>>> -       return 0;
>>>> +int IPCPipeUnixSocket::sendAsync(const IPCMessage &data)
>>>> +{
>>>> +       return socketWrap_->sendAsync(data);
>>>>    }
>>>>
>>>>    } /* namespace libcamera */
>>>> --
>>>> 2.34.1
>>>>
>>

Patch
diff mbox series

diff --git a/include/libcamera/internal/ipc_pipe_unixsocket.h b/include/libcamera/internal/ipc_pipe_unixsocket.h
index 8c972613..280639d5 100644
--- a/include/libcamera/internal/ipc_pipe_unixsocket.h
+++ b/include/libcamera/internal/ipc_pipe_unixsocket.h
@@ -16,6 +16,7 @@ 
 namespace libcamera {
 
 class Process;
+class IPCUnixSocketWrapper;
 
 class IPCPipeUnixSocket : public IPCPipe
 {
@@ -29,18 +30,8 @@  public:
 	int sendAsync(const IPCMessage &data) override;
 
 private:
-	struct CallData {
-		IPCUnixSocket::Payload *response;
-		bool done;
-	};
-
-	void readyRead();
-	int call(const IPCUnixSocket::Payload &message,
-		 IPCUnixSocket::Payload *response, uint32_t seq);
-
 	std::unique_ptr<Process> proc_;
-	std::unique_ptr<IPCUnixSocket> socket_;
-	std::map<uint32_t, CallData> callData_;
+	std::unique_ptr<IPCUnixSocketWrapper> socketWrap_;
 };
 
 } /* namespace libcamera */
diff --git a/src/libcamera/ipc_pipe_unixsocket.cpp b/src/libcamera/ipc_pipe_unixsocket.cpp
index 668ec73b..eb5408d4 100644
--- a/src/libcamera/ipc_pipe_unixsocket.cpp
+++ b/src/libcamera/ipc_pipe_unixsocket.cpp
@@ -9,10 +9,9 @@ 
 
 #include <vector>
 
-#include <libcamera/base/event_dispatcher.h>
 #include <libcamera/base/log.h>
+#include <libcamera/base/mutex.h>
 #include <libcamera/base/thread.h>
-#include <libcamera/base/timer.h>
 
 #include "libcamera/internal/ipc_pipe.h"
 #include "libcamera/internal/ipc_unixsocket.h"
@@ -24,67 +23,161 @@  namespace libcamera {
 
 LOG_DECLARE_CATEGORY(IPCPipe)
 
-IPCPipeUnixSocket::IPCPipeUnixSocket(const char *ipaModulePath,
-				     const char *ipaProxyWorkerPath)
-	: IPCPipe()
+class IPCUnixSocketWrapper : Thread
 {
-	std::vector<int> fds;
-	std::vector<std::string> args;
-	args.push_back(ipaModulePath);
+public:
+	IPCUnixSocketWrapper(Signal<const IPCMessage &> *recv)
+		: recv_(recv), ready_(false), sendSyncPending_(false),
+		  sendSyncCookie_(0)
+	{
+		start();
+	}
 
-	socket_ = std::make_unique<IPCUnixSocket>();
-	UniqueFD fd = socket_->create();
-	if (!fd.isValid()) {
-		LOG(IPCPipe, Error) << "Failed to create socket";
-		return;
+	~IPCUnixSocketWrapper()
+	{
+		exit();
+		wait();
 	}
-	socket_->readyRead.connect(this, &IPCPipeUnixSocket::readyRead);
-	args.push_back(std::to_string(fd.get()));
-	fds.push_back(fd.get());
 
-	proc_ = std::make_unique<Process>();
-	int ret = proc_->start(ipaProxyWorkerPath, args, fds);
-	if (ret) {
-		LOG(IPCPipe, Error)
-			<< "Failed to start proxy worker process";
-		return;
+	void run() override
+	{
+		/*
+		 * IPC socket construction and connection to its readyRead
+		 * signal has to be done from the IPC thread so that the
+		 * relevant Object instances (EventNotifier, slot) are bound to
+		 * its context.
+		 */
+		init();
+		exec();
+		deinit();
 	}
 
-	connected_ = true;
-}
+	int fd() { return fd_.get(); }
+	int sendSync(const IPCMessage &in, IPCMessage *out);
+	int sendAsync(const IPCMessage &data);
+	bool waitReady();
 
-IPCPipeUnixSocket::~IPCPipeUnixSocket()
-{
-}
+private:
+	void init();
+	void deinit();
+	void readyRead();
 
-int IPCPipeUnixSocket::sendSync(const IPCMessage &in, IPCMessage *out)
+	UniqueFD fd_;
+	Signal<const IPCMessage &> *recv_;
+	ConditionVariable cv_;
+	Mutex mutex_;
+	bool ready_;
+	bool sendSyncPending_;
+	uint32_t sendSyncCookie_;
+	IPCUnixSocket::Payload *sendSyncResponse_;
+
+	/* Socket shall be constructed and destructed from IPC thread context */
+	std::unique_ptr<IPCUnixSocket> socket_;
+};
+
+int IPCUnixSocketWrapper::sendSync(const IPCMessage &in, IPCMessage *out)
 {
+	int ret;
 	IPCUnixSocket::Payload response;
 
-	int ret = call(in.payload(), &response, in.header().cookie);
+	mutex_.lock();
+	ASSERT(!sendSyncPending_);
+	sendSyncPending_ = true;
+	sendSyncCookie_ = in.header().cookie;
+	sendSyncResponse_ = &response;
+	mutex_.unlock();
+
+	ret = socket_->send(in.payload());
 	if (ret) {
-		LOG(IPCPipe, Error) << "Failed to call sync";
-		return ret;
+		LOG(IPCPipe, Error) << "Failed to send sync message";
+		goto cleanup;
+	}
+
+	bool complete;
+	{
+		MutexLocker locker(mutex_);
+		auto syncComplete = ([&]() {
+			return sendSyncPending_ == false;
+		});
+		complete = cv_.wait_for(locker, 1000ms, syncComplete);
+	}
+
+	if (!complete) {
+		LOG(IPCPipe, Error) << "Timeout sending sync message";
+		ret = -ETIMEDOUT;
+		goto cleanup;
 	}
 
 	if (out)
 		*out = IPCMessage(response);
 
 	return 0;
+
+cleanup:
+	mutex_.lock();
+	sendSyncPending_ = false;
+	mutex_.unlock();
+
+	return ret;
 }
 
-int IPCPipeUnixSocket::sendAsync(const IPCMessage &data)
+int IPCUnixSocketWrapper::sendAsync(const IPCMessage &data)
 {
-	int ret = socket_->send(data.payload());
-	if (ret) {
-		LOG(IPCPipe, Error) << "Failed to call async";
-		return ret;
+	int ret;
+	ret = socket_->send(data.payload());
+	if (ret)
+	LOG(IPCPipe, Error) << "Failed to send async message";
+	return ret;
+}
+
+bool IPCUnixSocketWrapper::waitReady()
+{
+	bool ready;
+	{
+		MutexLocker locker(mutex_);
+		auto isReady = ([&]() {
+			return ready_;
+		});
+		ready = cv_.wait_for(locker, 1000ms, isReady);
 	}
 
-	return 0;
+	return ready;
+}
+
+void IPCUnixSocketWrapper::init()
+{
+	/* Init is to be done from the IPC thread context */
+	ASSERT(Thread::current() == this);
+
+	socket_ = std::make_unique<IPCUnixSocket>();
+	fd_ = socket_->create();
+	if (!fd_.isValid()) {
+		LOG(IPCPipe, Error) << "Failed to create socket";
+		return;
+	}
+
+	socket_->readyRead.connect(this, &IPCUnixSocketWrapper::readyRead);
+
+	mutex_.lock();
+	ready_ = true;
+	mutex_.unlock();
+	cv_.notify_one();
 }
 
-void IPCPipeUnixSocket::readyRead()
+void IPCUnixSocketWrapper::deinit()
+{
+	/* Deinit is to be done from the IPC thread context */
+	ASSERT(Thread::current() == this);
+
+	socket_->readyRead.disconnect(this);
+	socket_.reset();
+
+	mutex_.lock();
+	ready_ = false;
+	mutex_.unlock();
+}
+
+void IPCUnixSocketWrapper::readyRead()
 {
 	IPCUnixSocket::Payload payload;
 	int ret = socket_->receive(&payload);
@@ -93,55 +186,72 @@  void IPCPipeUnixSocket::readyRead()
 		return;
 	}
 
-	/* \todo Use span to avoid the double copy when callData is found. */
 	if (payload.data.size() < sizeof(IPCMessage::Header)) {
 		LOG(IPCPipe, Error) << "Not enough data received";
 		return;
 	}
 
-	IPCMessage ipcMessage(payload);
+	const IPCMessage::Header *header =
+		reinterpret_cast<IPCMessage::Header *>(payload.data.data());
+	bool syncComplete = false;
+	mutex_.lock();
+	if (sendSyncPending_ && sendSyncCookie_ == header->cookie) {
+		syncComplete = true;
+		sendSyncPending_ = false;
+		*sendSyncResponse_ = std::move(payload);
+	}
+	mutex_.unlock();
 
-	auto callData = callData_.find(ipcMessage.header().cookie);
-	if (callData != callData_.end()) {
-		*callData->second.response = std::move(payload);
-		callData->second.done = true;
+	if (syncComplete) {
+		cv_.notify_one();
 		return;
 	}
 
 	/* Received unexpected data, this means it's a call from the IPA. */
-	recv.emit(ipcMessage);
+	IPCMessage ipcMessage(payload);
+	recv_->emit(ipcMessage);
 }
 
-int IPCPipeUnixSocket::call(const IPCUnixSocket::Payload &message,
-			    IPCUnixSocket::Payload *response, uint32_t cookie)
+IPCPipeUnixSocket::IPCPipeUnixSocket(const char *ipaModulePath,
+				     const char *ipaProxyWorkerPath)
+	: IPCPipe()
 {
-	Timer timeout;
-	int ret;
+	socketWrap_ = std::make_unique<IPCUnixSocketWrapper>(&recv);
+	if (!socketWrap_->waitReady()) {
+		LOG(IPCPipe, Error) << "Failed to create socket";
+		return;
+	}
+	int fd = socketWrap_->fd();
 
-	const auto result = callData_.insert({ cookie, { response, false } });
-	const auto &iter = result.first;
+	std::vector<int> fds;
+	std::vector<std::string> args;
+	args.push_back(ipaModulePath);
+	args.push_back(std::to_string(fd));
+	fds.push_back(fd);
 
-	ret = socket_->send(message);
+	proc_ = std::make_unique<Process>();
+	int ret = proc_->start(ipaProxyWorkerPath, args, fds);
 	if (ret) {
-		callData_.erase(iter);
-		return ret;
+		LOG(IPCPipe, Error)
+			<< "Failed to start proxy worker process";
+		return;
 	}
 
-	/* \todo Make this less dangerous, see IPCPipe::sendSync() */
-	timeout.start(2000ms);
-	while (!iter->second.done) {
-		if (!timeout.isRunning()) {
-			LOG(IPCPipe, Error) << "Call timeout!";
-			callData_.erase(iter);
-			return -ETIMEDOUT;
-		}
+	connected_ = true;
+}
 
-		Thread::current()->eventDispatcher()->processEvents();
-	}
+IPCPipeUnixSocket::~IPCPipeUnixSocket()
+{
+}
 
-	callData_.erase(iter);
+int IPCPipeUnixSocket::sendSync(const IPCMessage &in, IPCMessage *out)
+{
+	return socketWrap_->sendSync(in, out);
+}
 
-	return 0;
+int IPCPipeUnixSocket::sendAsync(const IPCMessage &data)
+{
+	return socketWrap_->sendAsync(data);
 }
 
 } /* namespace libcamera */