libcamera: ipc_unixsocket: Fix sendSync() timeout and hang
diff mbox series

Message ID 20241220145556.3011657-1-julien.vuillaumier@nxp.com
State New
Headers show
Series
  • libcamera: ipc_unixsocket: Fix sendSync() timeout and hang
Related show

Commit Message

Julien Vuillaumier Dec. 20, 2024, 2:55 p.m. UTC
Unix-socket based IPC sometimes times out or hangs, typically
when multiple camera are stopped simulaneously. That specific case
triggers the concurrent sending by each pipeline handler instance
of a synchronous stop() message to its peer IPA process.

There is a dedicated IPC socket per camera. Sockets payload receipt
signals all run in the camera manager thread.
To send a synchronous message to IPA, pipeline invokes from camera
thread IPCPipeUnixSocket::sendSync(). This sends the message then blocks
busy waiting for the peer acknowledgment. Such busy wait is done by
blocking on event loop calling dispatchEvent(), until the ack condition
is detected.

One issue is that the socket receive slot readyRead() wakes up the
blocked thread via libcamera::Message receipt. Even though such message
resumes processEvents(), it may reblock immediately because readyRead()
does not interrupt() explictly the dispatcher.
Most of the time, an other pending event for the thread unblocks the
event dispatcher and the ack condition is detected - in worst case the 2
sec timeout kicks in. Once unblocked, the dispatcher let the message
acknowledgment to be detected and the sendSync() completes.

The other issue is that in case of concurrent synchronous IPC messages
sent by multiple pipeline handlers, there is a possible recursion
of sendSync() / processEvents() nested in the camera thread stack. As
commented in the source, that is a dangerous construct that can lead to
a hang.
The reason is that the last synchronous message sent is the deepest in
the stack. It is also the one whose acknowledgment is being busy waited.
However other pending synchronous messages may have been initiated before
and are upper in the stack. If they timeout, the condition is not
detected because of the stack recursion, as the thread is busy waiting
for the last message to be acknowledged.

This change implements a safer mechanism to handle the synchronous
message sending, similar to the one used for non isolated IPA. The
IPCUnixSocketWrapper class is introduced to handle the IPCUnixSocket
receive signal in a dedicated thread.
Doing so, the sending thread, when emiting a synchronous message, can be
blocked without event dispatcher's processEvents() usage, which avoids
the risky stack recursion.

Fixes: 21f1b555b ("libcamera: Add IPCPipe implementation based on unix socket")

Signed-off-by: Julien Vuillaumier <julien.vuillaumier@nxp.com>
---
 .../libcamera/internal/ipc_pipe_unixsocket.h  |  13 +-
 src/libcamera/ipc_pipe_unixsocket.cpp         | 242 +++++++++++++-----
 2 files changed, 178 insertions(+), 77 deletions(-)

Comments

Kieran Bingham July 21, 2025, 6:08 p.m. UTC | #1
Hi Julien,

I've just seen this in my backlog.

It's ... a substantial change, so it's hard to digest. Is there anything
we can break down here?

Are you running this change at NXP ? 

Quoting Julien Vuillaumier (2024-12-20 14:55:56)
> Unix-socket based IPC sometimes times out or hangs, typically
> when multiple camera are stopped simulaneously. That specific case
> triggers the concurrent sending by each pipeline handler instance
> of a synchronous stop() message to its peer IPA process.
> 
> There is a dedicated IPC socket per camera. Sockets payload receipt
> signals all run in the camera manager thread.
> To send a synchronous message to IPA, pipeline invokes from camera
> thread IPCPipeUnixSocket::sendSync(). This sends the message then blocks
> busy waiting for the peer acknowledgment. Such busy wait is done by
> blocking on event loop calling dispatchEvent(), until the ack condition
> is detected.
> 
> One issue is that the socket receive slot readyRead() wakes up the
> blocked thread via libcamera::Message receipt. Even though such message
> resumes processEvents(), it may reblock immediately because readyRead()
> does not interrupt() explictly the dispatcher.
> Most of the time, an other pending event for the thread unblocks the
> event dispatcher and the ack condition is detected - in worst case the 2
> sec timeout kicks in. Once unblocked, the dispatcher let the message
> acknowledgment to be detected and the sendSync() completes.
> 
> The other issue is that in case of concurrent synchronous IPC messages
> sent by multiple pipeline handlers, there is a possible recursion
> of sendSync() / processEvents() nested in the camera thread stack. As
> commented in the source, that is a dangerous construct that can lead to
> a hang.

I'm not sure I fully understand, - do you have multiple cameras using a
Single IPA - or multiple cameras using multiple IPAs using a single
pipeline handler perhaps?



> The reason is that the last synchronous message sent is the deepest in
> the stack. It is also the one whose acknowledgment is being busy waited.
> However other pending synchronous messages may have been initiated before
> and are upper in the stack. If they timeout, the condition is not
> detected because of the stack recursion, as the thread is busy waiting
> for the last message to be acknowledged.
> 
> This change implements a safer mechanism to handle the synchronous
> message sending, similar to the one used for non isolated IPA. The
> IPCUnixSocketWrapper class is introduced to handle the IPCUnixSocket
> receive signal in a dedicated thread.
> Doing so, the sending thread, when emiting a synchronous message, can be
> blocked without event dispatcher's processEvents() usage, which avoids
> the risky stack recursion.
> 
> Fixes: 21f1b555b ("libcamera: Add IPCPipe implementation based on unix socket")
> 

Paul - you were the author of the commit referenced above - could you
check through this patch please?

> Signed-off-by: Julien Vuillaumier <julien.vuillaumier@nxp.com>
> ---
>  .../libcamera/internal/ipc_pipe_unixsocket.h  |  13 +-
>  src/libcamera/ipc_pipe_unixsocket.cpp         | 242 +++++++++++++-----
>  2 files changed, 178 insertions(+), 77 deletions(-)
> 
> diff --git a/include/libcamera/internal/ipc_pipe_unixsocket.h b/include/libcamera/internal/ipc_pipe_unixsocket.h
> index 8c972613..280639d5 100644
> --- a/include/libcamera/internal/ipc_pipe_unixsocket.h
> +++ b/include/libcamera/internal/ipc_pipe_unixsocket.h
> @@ -16,6 +16,7 @@
>  namespace libcamera {
>  
>  class Process;
> +class IPCUnixSocketWrapper;
>  
>  class IPCPipeUnixSocket : public IPCPipe
>  {
> @@ -29,18 +30,8 @@ public:
>         int sendAsync(const IPCMessage &data) override;
>  
>  private:
> -       struct CallData {
> -               IPCUnixSocket::Payload *response;
> -               bool done;
> -       };
> -
> -       void readyRead();
> -       int call(const IPCUnixSocket::Payload &message,
> -                IPCUnixSocket::Payload *response, uint32_t seq);
> -
>         std::unique_ptr<Process> proc_;
> -       std::unique_ptr<IPCUnixSocket> socket_;
> -       std::map<uint32_t, CallData> callData_;
> +       std::unique_ptr<IPCUnixSocketWrapper> socketWrap_;
>  };
>  
>  } /* namespace libcamera */
> diff --git a/src/libcamera/ipc_pipe_unixsocket.cpp b/src/libcamera/ipc_pipe_unixsocket.cpp
> index 668ec73b..eb5408d4 100644
> --- a/src/libcamera/ipc_pipe_unixsocket.cpp
> +++ b/src/libcamera/ipc_pipe_unixsocket.cpp
> @@ -9,10 +9,9 @@
>  
>  #include <vector>
>  
> -#include <libcamera/base/event_dispatcher.h>
>  #include <libcamera/base/log.h>
> +#include <libcamera/base/mutex.h>
>  #include <libcamera/base/thread.h>
> -#include <libcamera/base/timer.h>
>  
>  #include "libcamera/internal/ipc_pipe.h"
>  #include "libcamera/internal/ipc_unixsocket.h"
> @@ -24,67 +23,161 @@ namespace libcamera {
>  
>  LOG_DECLARE_CATEGORY(IPCPipe)
>  
> -IPCPipeUnixSocket::IPCPipeUnixSocket(const char *ipaModulePath,
> -                                    const char *ipaProxyWorkerPath)
> -       : IPCPipe()
> +class IPCUnixSocketWrapper : Thread

I'm a bit weary of the concept of 'wrapping' an IPCUnixSocket. Doesn't
that simply mean IPCUnixSocket should be re-implemented ? Why do we
wrap it ?

(Sorry - big complicated change - trying to understand as I go through
...)

>  {
> -       std::vector<int> fds;
> -       std::vector<std::string> args;
> -       args.push_back(ipaModulePath);
> +public:
> +       IPCUnixSocketWrapper(Signal<const IPCMessage &> *recv)
> +               : recv_(recv), ready_(false), sendSyncPending_(false),
> +                 sendSyncCookie_(0)
> +       {
> +               start();
> +       }
>  
> -       socket_ = std::make_unique<IPCUnixSocket>();
> -       UniqueFD fd = socket_->create();
> -       if (!fd.isValid()) {
> -               LOG(IPCPipe, Error) << "Failed to create socket";
> -               return;
> +       ~IPCUnixSocketWrapper()
> +       {
> +               exit();
> +               wait();
>         }
> -       socket_->readyRead.connect(this, &IPCPipeUnixSocket::readyRead);
> -       args.push_back(std::to_string(fd.get()));
> -       fds.push_back(fd.get());
>  
> -       proc_ = std::make_unique<Process>();
> -       int ret = proc_->start(ipaProxyWorkerPath, args, fds);
> -       if (ret) {
> -               LOG(IPCPipe, Error)
> -                       << "Failed to start proxy worker process";
> -               return;
> +       void run() override
> +       {
> +               /*
> +                * IPC socket construction and connection to its readyRead
> +                * signal has to be done from the IPC thread so that the
> +                * relevant Object instances (EventNotifier, slot) are bound to
> +                * its context.
> +                */
> +               init();
> +               exec();
> +               deinit();
>         }
>  
> -       connected_ = true;
> -}
> +       int fd() { return fd_.get(); }
> +       int sendSync(const IPCMessage &in, IPCMessage *out);
> +       int sendAsync(const IPCMessage &data);
> +       bool waitReady();
>  
> -IPCPipeUnixSocket::~IPCPipeUnixSocket()
> -{
> -}
> +private:
> +       void init();
> +       void deinit();
> +       void readyRead();
>  
> -int IPCPipeUnixSocket::sendSync(const IPCMessage &in, IPCMessage *out)
> +       UniqueFD fd_;
> +       Signal<const IPCMessage &> *recv_;
> +       ConditionVariable cv_;
> +       Mutex mutex_;
> +       bool ready_;
> +       bool sendSyncPending_;
> +       uint32_t sendSyncCookie_;
> +       IPCUnixSocket::Payload *sendSyncResponse_;
> +
> +       /* Socket shall be constructed and destructed from IPC thread context */
> +       std::unique_ptr<IPCUnixSocket> socket_;
> +};
> +
> +int IPCUnixSocketWrapper::sendSync(const IPCMessage &in, IPCMessage *out)
>  {
> +       int ret;
>         IPCUnixSocket::Payload response;
>  
> -       int ret = call(in.payload(), &response, in.header().cookie);
> +       mutex_.lock();
> +       ASSERT(!sendSyncPending_);
> +       sendSyncPending_ = true;
> +       sendSyncCookie_ = in.header().cookie;
> +       sendSyncResponse_ = &response;
> +       mutex_.unlock();
> +
> +       ret = socket_->send(in.payload());
>         if (ret) {
> -               LOG(IPCPipe, Error) << "Failed to call sync";
> -               return ret;
> +               LOG(IPCPipe, Error) << "Failed to send sync message";
> +               goto cleanup;
> +       }
> +
> +       bool complete;
> +       {
> +               MutexLocker locker(mutex_);
> +               auto syncComplete = ([&]() {
> +                       return sendSyncPending_ == false;
> +               });
> +               complete = cv_.wait_for(locker, 1000ms, syncComplete);
> +       }
> +
> +       if (!complete) {
> +               LOG(IPCPipe, Error) << "Timeout sending sync message";
> +               ret = -ETIMEDOUT;
> +               goto cleanup;
>         }
>  
>         if (out)
>                 *out = IPCMessage(response);
>  
>         return 0;
> +
> +cleanup:
> +       mutex_.lock();
> +       sendSyncPending_ = false;
> +       mutex_.unlock();
> +
> +       return ret;
>  }
>  
> -int IPCPipeUnixSocket::sendAsync(const IPCMessage &data)
> +int IPCUnixSocketWrapper::sendAsync(const IPCMessage &data)
>  {
> -       int ret = socket_->send(data.payload());
> -       if (ret) {
> -               LOG(IPCPipe, Error) << "Failed to call async";
> -               return ret;
> +       int ret;
> +       ret = socket_->send(data.payload());
> +       if (ret)
> +               LOG(IPCPipe, Error) << "Failed to send sync message";
> +       return ret;
> +}
> +
> +bool IPCUnixSocketWrapper::waitReady()
> +{
> +       bool ready;
> +       {
> +               MutexLocker locker(mutex_);
> +               auto isReady = ([&]() {
> +                       return ready_;
> +               });
> +               ready = cv_.wait_for(locker, 1000ms, isReady);
>         }
>  
> -       return 0;
> +       return ready;
> +}
> +
> +void IPCUnixSocketWrapper::init()
> +{
> +       /* Init is to be done from the IPC thread context */
> +       ASSERT(Thread::current() == this);
> +
> +       socket_ = std::make_unique<IPCUnixSocket>();
> +       fd_ = socket_->create();
> +       if (!fd_.isValid()) {
> +               LOG(IPCPipe, Error) << "Failed to create socket";
> +               return;
> +       }
> +
> +       socket_->readyRead.connect(this, &IPCUnixSocketWrapper::readyRead);
> +
> +       mutex_.lock();
> +       ready_ = true;
> +       mutex_.unlock();
> +       cv_.notify_one();
>  }
>  
> -void IPCPipeUnixSocket::readyRead()
> +void IPCUnixSocketWrapper::deinit()
> +{
> +       /* Deinit is to be done from the IPC thread context */
> +       ASSERT(Thread::current() == this);
> +
> +       socket_->readyRead.disconnect(this);
> +       socket_.reset();
> +
> +       mutex_.lock();
> +       ready_ = false;
> +       mutex_.unlock();
> +}
> +
> +void IPCUnixSocketWrapper::readyRead()
>  {
>         IPCUnixSocket::Payload payload;
>         int ret = socket_->receive(&payload);
> @@ -93,55 +186,72 @@ void IPCPipeUnixSocket::readyRead()
>                 return;
>         }
>  
> -       /* \todo Use span to avoid the double copy when callData is found. */

Is this todo handled by the patch or made redundant by the patch ?

>         if (payload.data.size() < sizeof(IPCMessage::Header)) {
>                 LOG(IPCPipe, Error) << "Not enough data received";
>                 return;
>         }
>  
> -       IPCMessage ipcMessage(payload);
> +       const IPCMessage::Header *header =
> +               reinterpret_cast<IPCMessage::Header *>(payload.data.data());
> +       bool syncComplete = false;
> +       mutex_.lock();
> +       if (sendSyncPending_ && sendSyncCookie_ == header->cookie) {
> +               syncComplete = true;
> +               sendSyncPending_ = false;
> +               *sendSyncResponse_ = std::move(payload);
> +       }
> +       mutex_.unlock();
>  
> -       auto callData = callData_.find(ipcMessage.header().cookie);
> -       if (callData != callData_.end()) {
> -               *callData->second.response = std::move(payload);
> -               callData->second.done = true;
> +       if (syncComplete) {
> +               cv_.notify_one();
>                 return;
>         }
>  
>         /* Received unexpected data, this means it's a call from the IPA. */
> -       recv.emit(ipcMessage);
> +       IPCMessage ipcMessage(payload);
> +       recv_->emit(ipcMessage);
>  }
>  
> -int IPCPipeUnixSocket::call(const IPCUnixSocket::Payload &message,
> -                           IPCUnixSocket::Payload *response, uint32_t cookie)
> +IPCPipeUnixSocket::IPCPipeUnixSocket(const char *ipaModulePath,
> +                                    const char *ipaProxyWorkerPath)
> +       : IPCPipe()
>  {
> -       Timer timeout;
> -       int ret;
> +       socketWrap_ = std::make_unique<IPCUnixSocketWrapper>(&recv);
> +       if (!socketWrap_->waitReady()) {
> +               LOG(IPCPipe, Error) << "Failed to create socket";
> +               return;
> +       }
> +       int fd = socketWrap_->fd();
>  
> -       const auto result = callData_.insert({ cookie, { response, false } });
> -       const auto &iter = result.first;
> +       std::vector<int> fds;
> +       std::vector<std::string> args;
> +       args.push_back(ipaModulePath);
> +       args.push_back(std::to_string(fd));
> +       fds.push_back(fd);
>  
> -       ret = socket_->send(message);
> +       proc_ = std::make_unique<Process>();
> +       int ret = proc_->start(ipaProxyWorkerPath, args, fds);
>         if (ret) {
> -               callData_.erase(iter);
> -               return ret;
> +               LOG(IPCPipe, Error)
> +                       << "Failed to start proxy worker process";
> +               return;
>         }
>  
> -       /* \todo Make this less dangerous, see IPCPipe::sendSync() */
> -       timeout.start(2000ms);
> -       while (!iter->second.done) {
> -               if (!timeout.isRunning()) {
> -                       LOG(IPCPipe, Error) << "Call timeout!";
> -                       callData_.erase(iter);
> -                       return -ETIMEDOUT;
> -               }
> +       connected_ = true;
> +}
>  
> -               Thread::current()->eventDispatcher()->processEvents();
> -       }
> +IPCPipeUnixSocket::~IPCPipeUnixSocket()
> +{
> +}
>  
> -       callData_.erase(iter);
> +int IPCPipeUnixSocket::sendSync(const IPCMessage &in, IPCMessage *out)
> +{
> +       return socketWrap_->sendSync(in, out);
> +}
>  
> -       return 0;
> +int IPCPipeUnixSocket::sendAsync(const IPCMessage &data)
> +{
> +       return socketWrap_->sendAsync(data);
>  }
>  
>  } /* namespace libcamera */
> -- 
> 2.34.1
>
Julien Vuillaumier July 22, 2025, 2:12 p.m. UTC | #2
Hi Kieran,

On 21/07/2025 20:08, Kieran Bingham wrote:
> Hi Julien,
> 
> I've just seen this in my backlog.
> 
> It's ... a substantial change, so it's hard to digest. Is there anything
> we can break down here?

This change introduces the IPCUnixSocketWrapper class to move a part of 
the existing IPCPipeUnixSocket class implementation, the IPC socket 
receive path, in a dedicated thread instead of running it in the camera 
manager thread. Purpose is to be able to properly block the camera 
manager thread execution when sending a synchronous message over IPC. 
The camera manager thread is later resumed by this new thread that 
detects the completion of the sync message sending.

Existing implementation had to be redistributed between the two classes.
The change may look substantial but I don't see how to break it down :/

> 
> Are you running this change at NXP ?

This change is present in the NXP BSP.
It was introduced because of IPC hangs experienced during multi-cameras 
operation, with a third-party IPA running in isolated mode.

Root cause for that issue is a bit intricate. Background is given in the 
commit message - message length was limited to keep it readable though.

> 
> Quoting Julien Vuillaumier (2024-12-20 14:55:56)
>> Unix-socket based IPC sometimes times out or hangs, typically
>> when multiple camera are stopped simulaneously. That specific case
>> triggers the concurrent sending by each pipeline handler instance
>> of a synchronous stop() message to its peer IPA process.
>>
>> There is a dedicated IPC socket per camera. Sockets payload receipt
>> signals all run in the camera manager thread.
>> To send a synchronous message to IPA, pipeline invokes from camera
>> thread IPCPipeUnixSocket::sendSync(). This sends the message then blocks
>> busy waiting for the peer acknowledgment. Such busy wait is done by
>> blocking on event loop calling dispatchEvent(), until the ack condition
>> is detected.
>>
>> One issue is that the socket receive slot readyRead() wakes up the
>> blocked thread via libcamera::Message receipt. Even though such message
>> resumes processEvents(), it may reblock immediately because readyRead()
>> does not interrupt() explictly the dispatcher.
>> Most of the time, an other pending event for the thread unblocks the
>> event dispatcher and the ack condition is detected - in worst case the 2
>> sec timeout kicks in. Once unblocked, the dispatcher let the message
>> acknowledgment to be detected and the sendSync() completes.
>>
>> The other issue is that in case of concurrent synchronous IPC messages
>> sent by multiple pipeline handlers, there is a possible recursion
>> of sendSync() / processEvents() nested in the camera thread stack. As
>> commented in the source, that is a dangerous construct that can lead to
>> a hang.
> 
> I'm not sure I fully understand, - do you have multiple cameras using a
> Single IPA - or multiple cameras using multiple IPAs using a single
> pipeline handler perhaps?

Wording is not very clear indeed, my apologies for this.

Conditions where this issue can be seen is with a pipeline handler 
instance in charge of multiple cameras (same libcamera process/thread). 
Each camera is interfacing with its own instance of the same IPA. Each 
IPA instance is running in isolated mode (third-party control algorithms).
Typical occurrence of the hang is when application is stopping all the 
cameras simultaneously: pipeline handler, from the camera thread 
context, has to send synchronous IPC stop messages to each IPA instance 
and handle concurrently the completion of those messages.

> 
>> The reason is that the last synchronous message sent is the deepest in
>> the stack. It is also the one whose acknowledgment is being busy waited.
>> However other pending synchronous messages may have been initiated before
>> and are upper in the stack. If they timeout, the condition is not
>> detected because of the stack recursion, as the thread is busy waiting
>> for the last message to be acknowledged.
>>
>> This change implements a safer mechanism to handle the synchronous
>> message sending, similar to the one used for non isolated IPA. The
>> IPCUnixSocketWrapper class is introduced to handle the IPCUnixSocket
>> receive signal in a dedicated thread.
>> Doing so, the sending thread, when emiting a synchronous message, can be
>> blocked without event dispatcher's processEvents() usage, which avoids
>> the risky stack recursion.
>>
>> Fixes: 21f1b555b ("libcamera: Add IPCPipe implementation based on unix socket")
>>
> 
> Paul - you were the author of the commit referenced above - could you
> check through this patch please?
> 

Let me know if there's something I can do to help with the review of 
this change.

Thanks,
Julien

Patch
diff mbox series

diff --git a/include/libcamera/internal/ipc_pipe_unixsocket.h b/include/libcamera/internal/ipc_pipe_unixsocket.h
index 8c972613..280639d5 100644
--- a/include/libcamera/internal/ipc_pipe_unixsocket.h
+++ b/include/libcamera/internal/ipc_pipe_unixsocket.h
@@ -16,6 +16,7 @@ 
 namespace libcamera {
 
 class Process;
+class IPCUnixSocketWrapper;
 
 class IPCPipeUnixSocket : public IPCPipe
 {
@@ -29,18 +30,8 @@  public:
 	int sendAsync(const IPCMessage &data) override;
 
 private:
-	struct CallData {
-		IPCUnixSocket::Payload *response;
-		bool done;
-	};
-
-	void readyRead();
-	int call(const IPCUnixSocket::Payload &message,
-		 IPCUnixSocket::Payload *response, uint32_t seq);
-
 	std::unique_ptr<Process> proc_;
-	std::unique_ptr<IPCUnixSocket> socket_;
-	std::map<uint32_t, CallData> callData_;
+	std::unique_ptr<IPCUnixSocketWrapper> socketWrap_;
 };
 
 } /* namespace libcamera */
diff --git a/src/libcamera/ipc_pipe_unixsocket.cpp b/src/libcamera/ipc_pipe_unixsocket.cpp
index 668ec73b..eb5408d4 100644
--- a/src/libcamera/ipc_pipe_unixsocket.cpp
+++ b/src/libcamera/ipc_pipe_unixsocket.cpp
@@ -9,10 +9,9 @@ 
 
 #include <vector>
 
-#include <libcamera/base/event_dispatcher.h>
 #include <libcamera/base/log.h>
+#include <libcamera/base/mutex.h>
 #include <libcamera/base/thread.h>
-#include <libcamera/base/timer.h>
 
 #include "libcamera/internal/ipc_pipe.h"
 #include "libcamera/internal/ipc_unixsocket.h"
@@ -24,67 +23,161 @@  namespace libcamera {
 
 LOG_DECLARE_CATEGORY(IPCPipe)
 
-IPCPipeUnixSocket::IPCPipeUnixSocket(const char *ipaModulePath,
-				     const char *ipaProxyWorkerPath)
-	: IPCPipe()
+class IPCUnixSocketWrapper : Thread
 {
-	std::vector<int> fds;
-	std::vector<std::string> args;
-	args.push_back(ipaModulePath);
+public:
+	IPCUnixSocketWrapper(Signal<const IPCMessage &> *recv)
+		: recv_(recv), ready_(false), sendSyncPending_(false),
+		  sendSyncCookie_(0)
+	{
+		start();
+	}
 
-	socket_ = std::make_unique<IPCUnixSocket>();
-	UniqueFD fd = socket_->create();
-	if (!fd.isValid()) {
-		LOG(IPCPipe, Error) << "Failed to create socket";
-		return;
+	~IPCUnixSocketWrapper()
+	{
+		exit();
+		wait();
 	}
-	socket_->readyRead.connect(this, &IPCPipeUnixSocket::readyRead);
-	args.push_back(std::to_string(fd.get()));
-	fds.push_back(fd.get());
 
-	proc_ = std::make_unique<Process>();
-	int ret = proc_->start(ipaProxyWorkerPath, args, fds);
-	if (ret) {
-		LOG(IPCPipe, Error)
-			<< "Failed to start proxy worker process";
-		return;
+	void run() override
+	{
+		/*
+		 * IPC socket construction and connection to its readyRead
+		 * signal has to be done from the IPC thread so that the
+		 * relevant Object instances (EventNotifier, slot) are bound to
+		 * its context.
+		 */
+		init();
+		exec();
+		deinit();
 	}
 
-	connected_ = true;
-}
+	int fd() { return fd_.get(); }
+	int sendSync(const IPCMessage &in, IPCMessage *out);
+	int sendAsync(const IPCMessage &data);
+	bool waitReady();
 
-IPCPipeUnixSocket::~IPCPipeUnixSocket()
-{
-}
+private:
+	void init();
+	void deinit();
+	void readyRead();
 
-int IPCPipeUnixSocket::sendSync(const IPCMessage &in, IPCMessage *out)
+	UniqueFD fd_;
+	Signal<const IPCMessage &> *recv_;
+	ConditionVariable cv_;
+	Mutex mutex_;
+	bool ready_;
+	bool sendSyncPending_;
+	uint32_t sendSyncCookie_;
+	IPCUnixSocket::Payload *sendSyncResponse_;
+
+	/* Socket shall be constructed and destructed from IPC thread context */
+	std::unique_ptr<IPCUnixSocket> socket_;
+};
+
+int IPCUnixSocketWrapper::sendSync(const IPCMessage &in, IPCMessage *out)
 {
+	int ret;
 	IPCUnixSocket::Payload response;
 
-	int ret = call(in.payload(), &response, in.header().cookie);
+	mutex_.lock();
+	ASSERT(!sendSyncPending_);
+	sendSyncPending_ = true;
+	sendSyncCookie_ = in.header().cookie;
+	sendSyncResponse_ = &response;
+	mutex_.unlock();
+
+	ret = socket_->send(in.payload());
 	if (ret) {
-		LOG(IPCPipe, Error) << "Failed to call sync";
-		return ret;
+		LOG(IPCPipe, Error) << "Failed to send sync message";
+		goto cleanup;
+	}
+
+	bool complete;
+	{
+		MutexLocker locker(mutex_);
+		auto syncComplete = ([&]() {
+			return sendSyncPending_ == false;
+		});
+		complete = cv_.wait_for(locker, 1000ms, syncComplete);
+	}
+
+	if (!complete) {
+		LOG(IPCPipe, Error) << "Timeout sending sync message";
+		ret = -ETIMEDOUT;
+		goto cleanup;
 	}
 
 	if (out)
 		*out = IPCMessage(response);
 
 	return 0;
+
+cleanup:
+	mutex_.lock();
+	sendSyncPending_ = false;
+	mutex_.unlock();
+
+	return ret;
 }
 
-int IPCPipeUnixSocket::sendAsync(const IPCMessage &data)
+int IPCUnixSocketWrapper::sendAsync(const IPCMessage &data)
 {
-	int ret = socket_->send(data.payload());
-	if (ret) {
-		LOG(IPCPipe, Error) << "Failed to call async";
-		return ret;
+	int ret;
+	ret = socket_->send(data.payload());
+	if (ret)
+		LOG(IPCPipe, Error) << "Failed to send sync message";
+	return ret;
+}
+
+bool IPCUnixSocketWrapper::waitReady()
+{
+	bool ready;
+	{
+		MutexLocker locker(mutex_);
+		auto isReady = ([&]() {
+			return ready_;
+		});
+		ready = cv_.wait_for(locker, 1000ms, isReady);
 	}
 
-	return 0;
+	return ready;
+}
+
+void IPCUnixSocketWrapper::init()
+{
+	/* Init is to be done from the IPC thread context */
+	ASSERT(Thread::current() == this);
+
+	socket_ = std::make_unique<IPCUnixSocket>();
+	fd_ = socket_->create();
+	if (!fd_.isValid()) {
+		LOG(IPCPipe, Error) << "Failed to create socket";
+		return;
+	}
+
+	socket_->readyRead.connect(this, &IPCUnixSocketWrapper::readyRead);
+
+	mutex_.lock();
+	ready_ = true;
+	mutex_.unlock();
+	cv_.notify_one();
 }
 
-void IPCPipeUnixSocket::readyRead()
+void IPCUnixSocketWrapper::deinit()
+{
+	/* Deinit is to be done from the IPC thread context */
+	ASSERT(Thread::current() == this);
+
+	socket_->readyRead.disconnect(this);
+	socket_.reset();
+
+	mutex_.lock();
+	ready_ = false;
+	mutex_.unlock();
+}
+
+void IPCUnixSocketWrapper::readyRead()
 {
 	IPCUnixSocket::Payload payload;
 	int ret = socket_->receive(&payload);
@@ -93,55 +186,72 @@  void IPCPipeUnixSocket::readyRead()
 		return;
 	}
 
-	/* \todo Use span to avoid the double copy when callData is found. */
 	if (payload.data.size() < sizeof(IPCMessage::Header)) {
 		LOG(IPCPipe, Error) << "Not enough data received";
 		return;
 	}
 
-	IPCMessage ipcMessage(payload);
+	const IPCMessage::Header *header =
+		reinterpret_cast<IPCMessage::Header *>(payload.data.data());
+	bool syncComplete = false;
+	mutex_.lock();
+	if (sendSyncPending_ && sendSyncCookie_ == header->cookie) {
+		syncComplete = true;
+		sendSyncPending_ = false;
+		*sendSyncResponse_ = std::move(payload);
+	}
+	mutex_.unlock();
 
-	auto callData = callData_.find(ipcMessage.header().cookie);
-	if (callData != callData_.end()) {
-		*callData->second.response = std::move(payload);
-		callData->second.done = true;
+	if (syncComplete) {
+		cv_.notify_one();
 		return;
 	}
 
 	/* Received unexpected data, this means it's a call from the IPA. */
-	recv.emit(ipcMessage);
+	IPCMessage ipcMessage(payload);
+	recv_->emit(ipcMessage);
 }
 
-int IPCPipeUnixSocket::call(const IPCUnixSocket::Payload &message,
-			    IPCUnixSocket::Payload *response, uint32_t cookie)
+IPCPipeUnixSocket::IPCPipeUnixSocket(const char *ipaModulePath,
+				     const char *ipaProxyWorkerPath)
+	: IPCPipe()
 {
-	Timer timeout;
-	int ret;
+	socketWrap_ = std::make_unique<IPCUnixSocketWrapper>(&recv);
+	if (!socketWrap_->waitReady()) {
+		LOG(IPCPipe, Error) << "Failed to create socket";
+		return;
+	}
+	int fd = socketWrap_->fd();
 
-	const auto result = callData_.insert({ cookie, { response, false } });
-	const auto &iter = result.first;
+	std::vector<int> fds;
+	std::vector<std::string> args;
+	args.push_back(ipaModulePath);
+	args.push_back(std::to_string(fd));
+	fds.push_back(fd);
 
-	ret = socket_->send(message);
+	proc_ = std::make_unique<Process>();
+	int ret = proc_->start(ipaProxyWorkerPath, args, fds);
 	if (ret) {
-		callData_.erase(iter);
-		return ret;
+		LOG(IPCPipe, Error)
+			<< "Failed to start proxy worker process";
+		return;
 	}
 
-	/* \todo Make this less dangerous, see IPCPipe::sendSync() */
-	timeout.start(2000ms);
-	while (!iter->second.done) {
-		if (!timeout.isRunning()) {
-			LOG(IPCPipe, Error) << "Call timeout!";
-			callData_.erase(iter);
-			return -ETIMEDOUT;
-		}
+	connected_ = true;
+}
 
-		Thread::current()->eventDispatcher()->processEvents();
-	}
+IPCPipeUnixSocket::~IPCPipeUnixSocket()
+{
+}
 
-	callData_.erase(iter);
+int IPCPipeUnixSocket::sendSync(const IPCMessage &in, IPCMessage *out)
+{
+	return socketWrap_->sendSync(in, out);
+}
 
-	return 0;
+int IPCPipeUnixSocket::sendAsync(const IPCMessage &data)
+{
+	return socketWrap_->sendAsync(data);
 }
 
 } /* namespace libcamera */