Message ID: 20241220145556.3011657-1-julien.vuillaumier@nxp.com
State: New
Hi Julien, I've just seen this in my backlog. It's ... a substantial change, so it's hard to digest. Is there anything we can break down here? Are you running this change at NXP ? Quoting Julien Vuillaumier (2024-12-20 14:55:56) > Unix-socket based IPC sometimes times out or hangs, typically > when multiple camera are stopped simulaneously. That specific case > triggers the concurrent sending by each pipeline handler instance > of a synchronous stop() message to its peer IPA process. > > There is a dedicated IPC socket per camera. Sockets payload receipt > signals all run in the camera manager thread. > To send a synchronous message to IPA, pipeline invokes from camera > thread IPCPipeUnixSocket::sendSync(). This sends the message then blocks > busy waiting for the peer acknowledgment. Such busy wait is done by > blocking on event loop calling dispatchEvent(), until the ack condition > is detected. > > One issue is that the socket receive slot readyRead() wakes up the > blocked thread via libcamera::Message receipt. Even though such message > resumes processEvents(), it may reblock immediately because readyRead() > does not interrupt() explictly the dispatcher. > Most of the time, an other pending event for the thread unblocks the > event dispatcher and the ack condition is detected - in worst case the 2 > sec timeout kicks in. Once unblocked, the dispatcher let the message > acknowledgment to be detected and the sendSync() completes. > > The other issue is that in case of concurrent synchronous IPC messages > sent by multiple pipeline handlers, there is a possible recursion > of sendSync() / processEvents() nested in the camera thread stack. As > commented in the source, that is a dangerous construct that can lead to > a hang. I'm not sure I fully understand, - do you have multiple cameras using a Single IPA - or multiple cameras using multiple IPAs using a single pipeline handler perhaps? > The reason is that the last synchronous message sent is the deepest in > the stack. It is also the one whose acknowledgment is being busy waited. > However other pending synchronous messages may have been initiated before > and are upper in the stack. If they timeout, the condition is not > detected because of the stack recursion, as the thread is busy waiting > for the last message to be acknowledged. > > This change implements a safer mechanism to handle the synchronous > message sending, similar to the one used for non isolated IPA. The > IPCUnixSocketWrapper class is introduced to handle the IPCUnixSocket > receive signal in a dedicated thread. > Doing so, the sending thread, when emiting a synchronous message, can be > blocked without event dispatcher's processEvents() usage, which avoids > the risky stack recursion. > > Fixes: 21f1b555b ("libcamera: Add IPCPipe implementation based on unix socket") > Paul - you were the author of the commit referenced above - could you check through this patch please? 
> Signed-off-by: Julien Vuillaumier <julien.vuillaumier@nxp.com> > --- > .../libcamera/internal/ipc_pipe_unixsocket.h | 13 +- > src/libcamera/ipc_pipe_unixsocket.cpp | 242 +++++++++++++----- > 2 files changed, 178 insertions(+), 77 deletions(-) > > diff --git a/include/libcamera/internal/ipc_pipe_unixsocket.h b/include/libcamera/internal/ipc_pipe_unixsocket.h > index 8c972613..280639d5 100644 > --- a/include/libcamera/internal/ipc_pipe_unixsocket.h > +++ b/include/libcamera/internal/ipc_pipe_unixsocket.h > @@ -16,6 +16,7 @@ > namespace libcamera { > > class Process; > +class IPCUnixSocketWrapper; > > class IPCPipeUnixSocket : public IPCPipe > { > @@ -29,18 +30,8 @@ public: > int sendAsync(const IPCMessage &data) override; > > private: > - struct CallData { > - IPCUnixSocket::Payload *response; > - bool done; > - }; > - > - void readyRead(); > - int call(const IPCUnixSocket::Payload &message, > - IPCUnixSocket::Payload *response, uint32_t seq); > - > std::unique_ptr<Process> proc_; > - std::unique_ptr<IPCUnixSocket> socket_; > - std::map<uint32_t, CallData> callData_; > + std::unique_ptr<IPCUnixSocketWrapper> socketWrap_; > }; > > } /* namespace libcamera */ > diff --git a/src/libcamera/ipc_pipe_unixsocket.cpp b/src/libcamera/ipc_pipe_unixsocket.cpp > index 668ec73b..eb5408d4 100644 > --- a/src/libcamera/ipc_pipe_unixsocket.cpp > +++ b/src/libcamera/ipc_pipe_unixsocket.cpp > @@ -9,10 +9,9 @@ > > #include <vector> > > -#include <libcamera/base/event_dispatcher.h> > #include <libcamera/base/log.h> > +#include <libcamera/base/mutex.h> > #include <libcamera/base/thread.h> > -#include <libcamera/base/timer.h> > > #include "libcamera/internal/ipc_pipe.h" > #include "libcamera/internal/ipc_unixsocket.h" > @@ -24,67 +23,161 @@ namespace libcamera { > > LOG_DECLARE_CATEGORY(IPCPipe) > > -IPCPipeUnixSocket::IPCPipeUnixSocket(const char *ipaModulePath, > - const char *ipaProxyWorkerPath) > - : IPCPipe() > +class IPCUnixSocketWrapper : Thread I'm a bit weary of the concept of 'wrapping' an IPCUnixSocket. Doesn't that simply mean IPCUnixSocket should be re-implemented ? Why do we wrap it ? (Sorry - big complicated change - trying to understand as I go through ...) > { > - std::vector<int> fds; > - std::vector<std::string> args; > - args.push_back(ipaModulePath); > +public: > + IPCUnixSocketWrapper(Signal<const IPCMessage &> *recv) > + : recv_(recv), ready_(false), sendSyncPending_(false), > + sendSyncCookie_(0) > + { > + start(); > + } > > - socket_ = std::make_unique<IPCUnixSocket>(); > - UniqueFD fd = socket_->create(); > - if (!fd.isValid()) { > - LOG(IPCPipe, Error) << "Failed to create socket"; > - return; > + ~IPCUnixSocketWrapper() > + { > + exit(); > + wait(); > } > - socket_->readyRead.connect(this, &IPCPipeUnixSocket::readyRead); > - args.push_back(std::to_string(fd.get())); > - fds.push_back(fd.get()); > > - proc_ = std::make_unique<Process>(); > - int ret = proc_->start(ipaProxyWorkerPath, args, fds); > - if (ret) { > - LOG(IPCPipe, Error) > - << "Failed to start proxy worker process"; > - return; > + void run() override > + { > + /* > + * IPC socket construction and connection to its readyRead > + * signal has to be done from the IPC thread so that the > + * relevant Object instances (EventNotifier, slot) are bound to > + * its context. 
> + */ > + init(); > + exec(); > + deinit(); > } > > - connected_ = true; > -} > + int fd() { return fd_.get(); } > + int sendSync(const IPCMessage &in, IPCMessage *out); > + int sendAsync(const IPCMessage &data); > + bool waitReady(); > > -IPCPipeUnixSocket::~IPCPipeUnixSocket() > -{ > -} > +private: > + void init(); > + void deinit(); > + void readyRead(); > > -int IPCPipeUnixSocket::sendSync(const IPCMessage &in, IPCMessage *out) > + UniqueFD fd_; > + Signal<const IPCMessage &> *recv_; > + ConditionVariable cv_; > + Mutex mutex_; > + bool ready_; > + bool sendSyncPending_; > + uint32_t sendSyncCookie_; > + IPCUnixSocket::Payload *sendSyncResponse_; > + > + /* Socket shall be constructed and destructed from IPC thread context */ > + std::unique_ptr<IPCUnixSocket> socket_; > +}; > + > +int IPCUnixSocketWrapper::sendSync(const IPCMessage &in, IPCMessage *out) > { > + int ret; > IPCUnixSocket::Payload response; > > - int ret = call(in.payload(), &response, in.header().cookie); > + mutex_.lock(); > + ASSERT(!sendSyncPending_); > + sendSyncPending_ = true; > + sendSyncCookie_ = in.header().cookie; > + sendSyncResponse_ = &response; > + mutex_.unlock(); > + > + ret = socket_->send(in.payload()); > if (ret) { > - LOG(IPCPipe, Error) << "Failed to call sync"; > - return ret; > + LOG(IPCPipe, Error) << "Failed to send sync message"; > + goto cleanup; > + } > + > + bool complete; > + { > + MutexLocker locker(mutex_); > + auto syncComplete = ([&]() { > + return sendSyncPending_ == false; > + }); > + complete = cv_.wait_for(locker, 1000ms, syncComplete); > + } > + > + if (!complete) { > + LOG(IPCPipe, Error) << "Timeout sending sync message"; > + ret = -ETIMEDOUT; > + goto cleanup; > } > > if (out) > *out = IPCMessage(response); > > return 0; > + > +cleanup: > + mutex_.lock(); > + sendSyncPending_ = false; > + mutex_.unlock(); > + > + return ret; > } > > -int IPCPipeUnixSocket::sendAsync(const IPCMessage &data) > +int IPCUnixSocketWrapper::sendAsync(const IPCMessage &data) > { > - int ret = socket_->send(data.payload()); > - if (ret) { > - LOG(IPCPipe, Error) << "Failed to call async"; > - return ret; > + int ret; > + ret = socket_->send(data.payload()); > + if (ret) > + LOG(IPCPipe, Error) << "Failed to send sync message"; > + return ret; > +} > + > +bool IPCUnixSocketWrapper::waitReady() > +{ > + bool ready; > + { > + MutexLocker locker(mutex_); > + auto isReady = ([&]() { > + return ready_; > + }); > + ready = cv_.wait_for(locker, 1000ms, isReady); > } > > - return 0; > + return ready; > +} > + > +void IPCUnixSocketWrapper::init() > +{ > + /* Init is to be done from the IPC thread context */ > + ASSERT(Thread::current() == this); > + > + socket_ = std::make_unique<IPCUnixSocket>(); > + fd_ = socket_->create(); > + if (!fd_.isValid()) { > + LOG(IPCPipe, Error) << "Failed to create socket"; > + return; > + } > + > + socket_->readyRead.connect(this, &IPCUnixSocketWrapper::readyRead); > + > + mutex_.lock(); > + ready_ = true; > + mutex_.unlock(); > + cv_.notify_one(); > } > > -void IPCPipeUnixSocket::readyRead() > +void IPCUnixSocketWrapper::deinit() > +{ > + /* Deinit is to be done from the IPC thread context */ > + ASSERT(Thread::current() == this); > + > + socket_->readyRead.disconnect(this); > + socket_.reset(); > + > + mutex_.lock(); > + ready_ = false; > + mutex_.unlock(); > +} > + > +void IPCUnixSocketWrapper::readyRead() > { > IPCUnixSocket::Payload payload; > int ret = socket_->receive(&payload); > @@ -93,55 +186,72 @@ void IPCPipeUnixSocket::readyRead() > return; > } > > - /* \todo Use span 
to avoid the double copy when callData is found. */ Is this todo handled by the patch or made redundant by the patch ? > if (payload.data.size() < sizeof(IPCMessage::Header)) { > LOG(IPCPipe, Error) << "Not enough data received"; > return; > } > > - IPCMessage ipcMessage(payload); > + const IPCMessage::Header *header = > + reinterpret_cast<IPCMessage::Header *>(payload.data.data()); > + bool syncComplete = false; > + mutex_.lock(); > + if (sendSyncPending_ && sendSyncCookie_ == header->cookie) { > + syncComplete = true; > + sendSyncPending_ = false; > + *sendSyncResponse_ = std::move(payload); > + } > + mutex_.unlock(); > > - auto callData = callData_.find(ipcMessage.header().cookie); > - if (callData != callData_.end()) { > - *callData->second.response = std::move(payload); > - callData->second.done = true; > + if (syncComplete) { > + cv_.notify_one(); > return; > } > > /* Received unexpected data, this means it's a call from the IPA. */ > - recv.emit(ipcMessage); > + IPCMessage ipcMessage(payload); > + recv_->emit(ipcMessage); > } > > -int IPCPipeUnixSocket::call(const IPCUnixSocket::Payload &message, > - IPCUnixSocket::Payload *response, uint32_t cookie) > +IPCPipeUnixSocket::IPCPipeUnixSocket(const char *ipaModulePath, > + const char *ipaProxyWorkerPath) > + : IPCPipe() > { > - Timer timeout; > - int ret; > + socketWrap_ = std::make_unique<IPCUnixSocketWrapper>(&recv); > + if (!socketWrap_->waitReady()) { > + LOG(IPCPipe, Error) << "Failed to create socket"; > + return; > + } > + int fd = socketWrap_->fd(); > > - const auto result = callData_.insert({ cookie, { response, false } }); > - const auto &iter = result.first; > + std::vector<int> fds; > + std::vector<std::string> args; > + args.push_back(ipaModulePath); > + args.push_back(std::to_string(fd)); > + fds.push_back(fd); > > - ret = socket_->send(message); > + proc_ = std::make_unique<Process>(); > + int ret = proc_->start(ipaProxyWorkerPath, args, fds); > if (ret) { > - callData_.erase(iter); > - return ret; > + LOG(IPCPipe, Error) > + << "Failed to start proxy worker process"; > + return; > } > > - /* \todo Make this less dangerous, see IPCPipe::sendSync() */ > - timeout.start(2000ms); > - while (!iter->second.done) { > - if (!timeout.isRunning()) { > - LOG(IPCPipe, Error) << "Call timeout!"; > - callData_.erase(iter); > - return -ETIMEDOUT; > - } > + connected_ = true; > +} > > - Thread::current()->eventDispatcher()->processEvents(); > - } > +IPCPipeUnixSocket::~IPCPipeUnixSocket() > +{ > +} > > - callData_.erase(iter); > +int IPCPipeUnixSocket::sendSync(const IPCMessage &in, IPCMessage *out) > +{ > + return socketWrap_->sendSync(in, out); > +} > > - return 0; > +int IPCPipeUnixSocket::sendAsync(const IPCMessage &data) > +{ > + return socketWrap_->sendAsync(data); > } > > } /* namespace libcamera */ > -- > 2.34.1 >
Hi Kieran,

On 21/07/2025 20:08, Kieran Bingham wrote:
> Hi Julien,
>
> I've just seen this in my backlog.
>
> It's ... a substantial change, so it's hard to digest. Is there anything
> we can break down here?

This change introduces the IPCUnixSocketWrapper class to move a part of the
existing IPCPipeUnixSocket class implementation, the IPC socket receive path,
to a dedicated thread instead of running it in the camera manager thread. The
purpose is to be able to properly block the camera manager thread execution
when sending a synchronous message over IPC. The camera manager thread is
later resumed by this new thread, which detects that the synchronous message
has completed.

The existing implementation had to be redistributed between the two classes.
The change may look substantial but I don't see how to break it down :/

>
> Are you running this change at NXP ?

This change is present in the NXP BSP. It was introduced because of IPC hangs
experienced during multi-camera operation, with a third-party IPA running in
isolated mode. The root cause of that issue is a bit intricate. Background is
given in the commit message, though its length was limited to keep it
readable.

>
> Quoting Julien Vuillaumier (2024-12-20 14:55:56)
>> Unix-socket based IPC sometimes times out or hangs, typically
>> when multiple camera are stopped simulaneously. That specific case
>> triggers the concurrent sending by each pipeline handler instance
>> of a synchronous stop() message to its peer IPA process.
>>
>> There is a dedicated IPC socket per camera. Sockets payload receipt
>> signals all run in the camera manager thread.
>> To send a synchronous message to IPA, pipeline invokes from camera
>> thread IPCPipeUnixSocket::sendSync(). This sends the message then blocks
>> busy waiting for the peer acknowledgment. Such busy wait is done by
>> blocking on event loop calling dispatchEvent(), until the ack condition
>> is detected.
>>
>> One issue is that the socket receive slot readyRead() wakes up the
>> blocked thread via libcamera::Message receipt. Even though such message
>> resumes processEvents(), it may reblock immediately because readyRead()
>> does not interrupt() explictly the dispatcher.
>> Most of the time, an other pending event for the thread unblocks the
>> event dispatcher and the ack condition is detected - in worst case the 2
>> sec timeout kicks in. Once unblocked, the dispatcher let the message
>> acknowledgment to be detected and the sendSync() completes.
>>
>> The other issue is that in case of concurrent synchronous IPC messages
>> sent by multiple pipeline handlers, there is a possible recursion
>> of sendSync() / processEvents() nested in the camera thread stack. As
>> commented in the source, that is a dangerous construct that can lead to
>> a hang.
>
> I'm not sure I fully understand, - do you have multiple cameras using a
> Single IPA - or multiple cameras using multiple IPAs using a single
> pipeline handler perhaps?

The wording is indeed not very clear, my apologies for that. This issue can
be seen with a pipeline handler instance in charge of multiple cameras (same
libcamera process/thread). Each camera interfaces with its own instance of
the same IPA. Each IPA instance runs in isolated mode (third-party control
algorithms).

A typical occurrence of the hang is when the application stops all the
cameras simultaneously: the pipeline handler, from the camera thread context,
has to send a synchronous IPC stop message to each IPA instance and handle
the completion of those messages concurrently.

>
>> The reason is that the last synchronous message sent is the deepest in
>> the stack. It is also the one whose acknowledgment is being busy waited.
>> However other pending synchronous messages may have been initiated before
>> and are upper in the stack. If they timeout, the condition is not
>> detected because of the stack recursion, as the thread is busy waiting
>> for the last message to be acknowledged.
>>
>> This change implements a safer mechanism to handle the synchronous
>> message sending, similar to the one used for non isolated IPA. The
>> IPCUnixSocketWrapper class is introduced to handle the IPCUnixSocket
>> receive signal in a dedicated thread.
>> Doing so, the sending thread, when emiting a synchronous message, can be
>> blocked without event dispatcher's processEvents() usage, which avoids
>> the risky stack recursion.
>>
>> Fixes: 21f1b555b ("libcamera: Add IPCPipe implementation based on unix socket")
>>
>
> Paul - you were the author of the commit referenced above - could you
> check through this patch please?
>

Let me know if there's something I can do to help with the review of this
change.

Thanks,
Julien
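To make the failure mode concrete, the following minimal, standalone C++
sketch (not libcamera code; the names and timings are purely illustrative)
shows two synchronous calls that busy-wait by pumping the same event queue on
one thread. The inner call sits deepest on the stack, so the outer call's
shorter timeout is not observed until the inner wait unwinds:

#include <chrono>
#include <functional>
#include <iostream>
#include <queue>

using Clock = std::chrono::steady_clock;

/* Stand-in for the thread's event dispatcher. */
static std::queue<std::function<void()>> eventQueue;

static void processEvents()
{
	if (eventQueue.empty())
		return;

	auto event = std::move(eventQueue.front());
	eventQueue.pop();
	event(); /* May start another nested synchronous call. */
}

/* Busy-wait for 'acked' by pumping events on the calling thread. */
static bool sendSyncBlocking(const char *name, const bool &acked,
			     Clock::duration timeout)
{
	const auto deadline = Clock::now() + timeout;

	while (!acked) {
		if (Clock::now() >= deadline) {
			std::cout << name << ": timeout" << std::endl;
			return false;
		}

		processEvents(); /* Nested calls pile up on this stack. */
	}

	std::cout << name << ": acknowledged" << std::endl;
	return true;
}

int main()
{
	using namespace std::chrono_literals;

	bool ackA = false;
	bool ackB = false;

	/* While "A" waits, an event triggers a second synchronous call. */
	eventQueue.push([&]() {
		sendSyncBlocking("B", ackB, 500ms);
	});

	/*
	 * "A" should time out after 100 ms, but its deadline check only
	 * runs again once "B"'s 500 ms wait has returned.
	 */
	sendSyncBlocking("A", ackA, 100ms);

	return 0;
}

With the receive path moved to its own thread, as the patch does, nothing
like processEvents() is called from the waiting thread, so each wait can
observe its own timeout independently.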
diff --git a/include/libcamera/internal/ipc_pipe_unixsocket.h b/include/libcamera/internal/ipc_pipe_unixsocket.h index 8c972613..280639d5 100644 --- a/include/libcamera/internal/ipc_pipe_unixsocket.h +++ b/include/libcamera/internal/ipc_pipe_unixsocket.h @@ -16,6 +16,7 @@ namespace libcamera { class Process; +class IPCUnixSocketWrapper; class IPCPipeUnixSocket : public IPCPipe { @@ -29,18 +30,8 @@ public: int sendAsync(const IPCMessage &data) override; private: - struct CallData { - IPCUnixSocket::Payload *response; - bool done; - }; - - void readyRead(); - int call(const IPCUnixSocket::Payload &message, - IPCUnixSocket::Payload *response, uint32_t seq); - std::unique_ptr<Process> proc_; - std::unique_ptr<IPCUnixSocket> socket_; - std::map<uint32_t, CallData> callData_; + std::unique_ptr<IPCUnixSocketWrapper> socketWrap_; }; } /* namespace libcamera */ diff --git a/src/libcamera/ipc_pipe_unixsocket.cpp b/src/libcamera/ipc_pipe_unixsocket.cpp index 668ec73b..eb5408d4 100644 --- a/src/libcamera/ipc_pipe_unixsocket.cpp +++ b/src/libcamera/ipc_pipe_unixsocket.cpp @@ -9,10 +9,9 @@ #include <vector> -#include <libcamera/base/event_dispatcher.h> #include <libcamera/base/log.h> +#include <libcamera/base/mutex.h> #include <libcamera/base/thread.h> -#include <libcamera/base/timer.h> #include "libcamera/internal/ipc_pipe.h" #include "libcamera/internal/ipc_unixsocket.h" @@ -24,67 +23,161 @@ namespace libcamera { LOG_DECLARE_CATEGORY(IPCPipe) -IPCPipeUnixSocket::IPCPipeUnixSocket(const char *ipaModulePath, - const char *ipaProxyWorkerPath) - : IPCPipe() +class IPCUnixSocketWrapper : Thread { - std::vector<int> fds; - std::vector<std::string> args; - args.push_back(ipaModulePath); +public: + IPCUnixSocketWrapper(Signal<const IPCMessage &> *recv) + : recv_(recv), ready_(false), sendSyncPending_(false), + sendSyncCookie_(0) + { + start(); + } - socket_ = std::make_unique<IPCUnixSocket>(); - UniqueFD fd = socket_->create(); - if (!fd.isValid()) { - LOG(IPCPipe, Error) << "Failed to create socket"; - return; + ~IPCUnixSocketWrapper() + { + exit(); + wait(); } - socket_->readyRead.connect(this, &IPCPipeUnixSocket::readyRead); - args.push_back(std::to_string(fd.get())); - fds.push_back(fd.get()); - proc_ = std::make_unique<Process>(); - int ret = proc_->start(ipaProxyWorkerPath, args, fds); - if (ret) { - LOG(IPCPipe, Error) - << "Failed to start proxy worker process"; - return; + void run() override + { + /* + * IPC socket construction and connection to its readyRead + * signal has to be done from the IPC thread so that the + * relevant Object instances (EventNotifier, slot) are bound to + * its context. 
+ */ + init(); + exec(); + deinit(); } - connected_ = true; -} + int fd() { return fd_.get(); } + int sendSync(const IPCMessage &in, IPCMessage *out); + int sendAsync(const IPCMessage &data); + bool waitReady(); -IPCPipeUnixSocket::~IPCPipeUnixSocket() -{ -} +private: + void init(); + void deinit(); + void readyRead(); -int IPCPipeUnixSocket::sendSync(const IPCMessage &in, IPCMessage *out) + UniqueFD fd_; + Signal<const IPCMessage &> *recv_; + ConditionVariable cv_; + Mutex mutex_; + bool ready_; + bool sendSyncPending_; + uint32_t sendSyncCookie_; + IPCUnixSocket::Payload *sendSyncResponse_; + + /* Socket shall be constructed and destructed from IPC thread context */ + std::unique_ptr<IPCUnixSocket> socket_; +}; + +int IPCUnixSocketWrapper::sendSync(const IPCMessage &in, IPCMessage *out) { + int ret; IPCUnixSocket::Payload response; - int ret = call(in.payload(), &response, in.header().cookie); + mutex_.lock(); + ASSERT(!sendSyncPending_); + sendSyncPending_ = true; + sendSyncCookie_ = in.header().cookie; + sendSyncResponse_ = &response; + mutex_.unlock(); + + ret = socket_->send(in.payload()); if (ret) { - LOG(IPCPipe, Error) << "Failed to call sync"; - return ret; + LOG(IPCPipe, Error) << "Failed to send sync message"; + goto cleanup; + } + + bool complete; + { + MutexLocker locker(mutex_); + auto syncComplete = ([&]() { + return sendSyncPending_ == false; + }); + complete = cv_.wait_for(locker, 1000ms, syncComplete); + } + + if (!complete) { + LOG(IPCPipe, Error) << "Timeout sending sync message"; + ret = -ETIMEDOUT; + goto cleanup; } if (out) *out = IPCMessage(response); return 0; + +cleanup: + mutex_.lock(); + sendSyncPending_ = false; + mutex_.unlock(); + + return ret; } -int IPCPipeUnixSocket::sendAsync(const IPCMessage &data) +int IPCUnixSocketWrapper::sendAsync(const IPCMessage &data) { - int ret = socket_->send(data.payload()); - if (ret) { - LOG(IPCPipe, Error) << "Failed to call async"; - return ret; + int ret; + ret = socket_->send(data.payload()); + if (ret) + LOG(IPCPipe, Error) << "Failed to send sync message"; + return ret; +} + +bool IPCUnixSocketWrapper::waitReady() +{ + bool ready; + { + MutexLocker locker(mutex_); + auto isReady = ([&]() { + return ready_; + }); + ready = cv_.wait_for(locker, 1000ms, isReady); } - return 0; + return ready; +} + +void IPCUnixSocketWrapper::init() +{ + /* Init is to be done from the IPC thread context */ + ASSERT(Thread::current() == this); + + socket_ = std::make_unique<IPCUnixSocket>(); + fd_ = socket_->create(); + if (!fd_.isValid()) { + LOG(IPCPipe, Error) << "Failed to create socket"; + return; + } + + socket_->readyRead.connect(this, &IPCUnixSocketWrapper::readyRead); + + mutex_.lock(); + ready_ = true; + mutex_.unlock(); + cv_.notify_one(); } -void IPCPipeUnixSocket::readyRead() +void IPCUnixSocketWrapper::deinit() +{ + /* Deinit is to be done from the IPC thread context */ + ASSERT(Thread::current() == this); + + socket_->readyRead.disconnect(this); + socket_.reset(); + + mutex_.lock(); + ready_ = false; + mutex_.unlock(); +} + +void IPCUnixSocketWrapper::readyRead() { IPCUnixSocket::Payload payload; int ret = socket_->receive(&payload); @@ -93,55 +186,72 @@ void IPCPipeUnixSocket::readyRead() return; } - /* \todo Use span to avoid the double copy when callData is found. 
*/ if (payload.data.size() < sizeof(IPCMessage::Header)) { LOG(IPCPipe, Error) << "Not enough data received"; return; } - IPCMessage ipcMessage(payload); + const IPCMessage::Header *header = + reinterpret_cast<IPCMessage::Header *>(payload.data.data()); + bool syncComplete = false; + mutex_.lock(); + if (sendSyncPending_ && sendSyncCookie_ == header->cookie) { + syncComplete = true; + sendSyncPending_ = false; + *sendSyncResponse_ = std::move(payload); + } + mutex_.unlock(); - auto callData = callData_.find(ipcMessage.header().cookie); - if (callData != callData_.end()) { - *callData->second.response = std::move(payload); - callData->second.done = true; + if (syncComplete) { + cv_.notify_one(); return; } /* Received unexpected data, this means it's a call from the IPA. */ - recv.emit(ipcMessage); + IPCMessage ipcMessage(payload); + recv_->emit(ipcMessage); } -int IPCPipeUnixSocket::call(const IPCUnixSocket::Payload &message, - IPCUnixSocket::Payload *response, uint32_t cookie) +IPCPipeUnixSocket::IPCPipeUnixSocket(const char *ipaModulePath, + const char *ipaProxyWorkerPath) + : IPCPipe() { - Timer timeout; - int ret; + socketWrap_ = std::make_unique<IPCUnixSocketWrapper>(&recv); + if (!socketWrap_->waitReady()) { + LOG(IPCPipe, Error) << "Failed to create socket"; + return; + } + int fd = socketWrap_->fd(); - const auto result = callData_.insert({ cookie, { response, false } }); - const auto &iter = result.first; + std::vector<int> fds; + std::vector<std::string> args; + args.push_back(ipaModulePath); + args.push_back(std::to_string(fd)); + fds.push_back(fd); - ret = socket_->send(message); + proc_ = std::make_unique<Process>(); + int ret = proc_->start(ipaProxyWorkerPath, args, fds); if (ret) { - callData_.erase(iter); - return ret; + LOG(IPCPipe, Error) + << "Failed to start proxy worker process"; + return; } - /* \todo Make this less dangerous, see IPCPipe::sendSync() */ - timeout.start(2000ms); - while (!iter->second.done) { - if (!timeout.isRunning()) { - LOG(IPCPipe, Error) << "Call timeout!"; - callData_.erase(iter); - return -ETIMEDOUT; - } + connected_ = true; +} - Thread::current()->eventDispatcher()->processEvents(); - } +IPCPipeUnixSocket::~IPCPipeUnixSocket() +{ +} - callData_.erase(iter); +int IPCPipeUnixSocket::sendSync(const IPCMessage &in, IPCMessage *out) +{ + return socketWrap_->sendSync(in, out); +} - return 0; +int IPCPipeUnixSocket::sendAsync(const IPCMessage &data) +{ + return socketWrap_->sendAsync(data); } } /* namespace libcamera */
Unix-socket based IPC sometimes times out or hangs, typically when multiple
cameras are stopped simultaneously. That specific case triggers the
concurrent sending, by each pipeline handler instance, of a synchronous
stop() message to its peer IPA process.

There is a dedicated IPC socket per camera. The socket payload-receipt
signals all run in the camera manager thread. To send a synchronous message
to the IPA, the pipeline handler invokes IPCPipeUnixSocket::sendSync() from
the camera thread. This sends the message, then blocks, busy-waiting for the
peer's acknowledgment. The busy wait blocks on the event loop, calling
processEvents(), until the ack condition is detected.

One issue is that the socket receive slot readyRead() wakes up the blocked
thread via libcamera::Message receipt. Even though such a message resumes
processEvents(), it may re-block immediately because readyRead() does not
explicitly interrupt() the dispatcher. Most of the time, another pending
event for the thread unblocks the event dispatcher and the ack condition is
detected - in the worst case the 2-second timeout kicks in. Once unblocked,
the dispatcher lets the message acknowledgment be detected and sendSync()
completes.

The other issue is that, in the case of concurrent synchronous IPC messages
sent by multiple pipeline handlers, there is a possible recursion of
sendSync() / processEvents() nested in the camera thread stack. As commented
in the source, that is a dangerous construct that can lead to a hang.

The reason is that the last synchronous message sent is the deepest in the
stack. It is also the one whose acknowledgment is being busy-waited. However,
other pending synchronous messages may have been initiated before and are
higher in the stack. If they time out, the condition is not detected because
of the stack recursion, as the thread is busy waiting for the last message to
be acknowledged.

This change implements a safer mechanism to handle synchronous message
sending, similar to the one used for non-isolated IPAs. The
IPCUnixSocketWrapper class is introduced to handle the IPCUnixSocket receive
signal in a dedicated thread. Doing so, the sending thread, when emitting a
synchronous message, can be blocked without using the event dispatcher's
processEvents(), which avoids the risky stack recursion.

Fixes: 21f1b555b ("libcamera: Add IPCPipe implementation based on unix socket")
Signed-off-by: Julien Vuillaumier <julien.vuillaumier@nxp.com>
---
 .../libcamera/internal/ipc_pipe_unixsocket.h |  13 +-
 src/libcamera/ipc_pipe_unixsocket.cpp        | 242 +++++++++++++-----
 2 files changed, 178 insertions(+), 77 deletions(-)
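The replacement mechanism boils down to a small pattern: the sending thread
records the cookie it expects, sends the payload, and then blocks on a
condition variable with a timeout, while the wrapper's receive thread
completes the wait when a response with a matching cookie arrives. Below is a
minimal sketch of that pattern; the class and member names are illustrative
only (not the actual libcamera classes), and a std::string stands in for the
IPCUnixSocket::Payload used by the real code.

#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <optional>
#include <string>
#include <utility>

/* Simplified stand-in for the wrapper's synchronous-call bookkeeping. */
class SyncCall
{
public:
	/* Sending thread: record the expected cookie, then wait. */
	std::optional<std::string> wait(uint32_t cookie,
					std::chrono::milliseconds timeout)
	{
		std::unique_lock<std::mutex> lock(mutex_);
		pending_ = true;
		cookie_ = cookie;

		bool done = cv_.wait_for(lock, timeout,
					 [this]() { return !pending_; });
		if (!done) {
			pending_ = false;
			return std::nullopt; /* Timed out. */
		}

		return response_;
	}

	/* Receiver thread: complete the wait if the cookie matches. */
	bool complete(uint32_t cookie, std::string payload)
	{
		{
			std::lock_guard<std::mutex> lock(mutex_);
			if (!pending_ || cookie_ != cookie)
				return false; /* Unsolicited message. */

			response_ = std::move(payload);
			pending_ = false;
		}

		cv_.notify_one();
		return true;
	}

private:
	std::mutex mutex_;
	std::condition_variable cv_;
	bool pending_ = false;
	uint32_t cookie_ = 0;
	std::string response_;
};

In the patch itself the cookie is recorded under the lock before the payload
is sent on the socket, so a response that arrives quickly on the receive
thread cannot be missed; the sketch above omits the socket send.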