@@ -16,6 +16,7 @@
namespace libcamera {
class Process;
+class IPCUnixSocketWrapper;
class IPCPipeUnixSocket : public IPCPipe
{
@@ -29,18 +30,8 @@ public:
int sendAsync(const IPCMessage &data) override;
private:
- struct CallData {
- IPCUnixSocket::Payload *response;
- bool done;
- };
-
- void readyRead();
- int call(const IPCUnixSocket::Payload &message,
- IPCUnixSocket::Payload *response, uint32_t seq);
-
std::unique_ptr<Process> proc_;
- std::unique_ptr<IPCUnixSocket> socket_;
- std::map<uint32_t, CallData> callData_;
+ std::unique_ptr<IPCUnixSocketWrapper> socketWrap_;
};
} /* namespace libcamera */
@@ -9,10 +9,9 @@
#include <vector>
-#include <libcamera/base/event_dispatcher.h>
#include <libcamera/base/log.h>
+#include <libcamera/base/mutex.h>
#include <libcamera/base/thread.h>
-#include <libcamera/base/timer.h>
#include "libcamera/internal/ipc_pipe.h"
#include "libcamera/internal/ipc_unixsocket.h"
@@ -24,67 +23,161 @@ namespace libcamera {
LOG_DECLARE_CATEGORY(IPCPipe)
-IPCPipeUnixSocket::IPCPipeUnixSocket(const char *ipaModulePath,
- const char *ipaProxyWorkerPath)
- : IPCPipe()
+class IPCUnixSocketWrapper : Thread
{
- std::vector<int> fds;
- std::vector<std::string> args;
- args.push_back(ipaModulePath);
+public:
+ IPCUnixSocketWrapper(Signal<const IPCMessage &> *recv)
+ : recv_(recv), ready_(false), sendSyncPending_(false),
+ sendSyncCookie_(0)
+ {
+ start();
+ }
- socket_ = std::make_unique<IPCUnixSocket>();
- UniqueFD fd = socket_->create();
- if (!fd.isValid()) {
- LOG(IPCPipe, Error) << "Failed to create socket";
- return;
+ ~IPCUnixSocketWrapper()
+ {
+ exit();
+ wait();
}
- socket_->readyRead.connect(this, &IPCPipeUnixSocket::readyRead);
- args.push_back(std::to_string(fd.get()));
- fds.push_back(fd.get());
- proc_ = std::make_unique<Process>();
- int ret = proc_->start(ipaProxyWorkerPath, args, fds);
- if (ret) {
- LOG(IPCPipe, Error)
- << "Failed to start proxy worker process";
- return;
+ void run() override
+ {
+ /*
+ * IPC socket construction and connection to its readyRead
+ * signal has to be done from the IPC thread so that the
+ * relevant Object instances (EventNotifier, slot) are bound to
+ * its context.
+ */
+ init();
+ exec();
+ deinit();
}
- connected_ = true;
-}
+ int fd() { return fd_.get(); }
+ int sendSync(const IPCMessage &in, IPCMessage *out);
+ int sendAsync(const IPCMessage &data);
+ bool waitReady();
-IPCPipeUnixSocket::~IPCPipeUnixSocket()
-{
-}
+private:
+ void init();
+ void deinit();
+ void readyRead();
-int IPCPipeUnixSocket::sendSync(const IPCMessage &in, IPCMessage *out)
+ UniqueFD fd_;
+ Signal<const IPCMessage &> *recv_;
+ ConditionVariable cv_;
+ Mutex mutex_;
+ bool ready_;
+ bool sendSyncPending_;
+ uint32_t sendSyncCookie_;
+ IPCUnixSocket::Payload *sendSyncResponse_;
+
+ /* Socket shall be constructed and destructed from IPC thread context */
+ std::unique_ptr<IPCUnixSocket> socket_;
+};
+
+int IPCUnixSocketWrapper::sendSync(const IPCMessage &in, IPCMessage *out)
{
+ int ret;
IPCUnixSocket::Payload response;
- int ret = call(in.payload(), &response, in.header().cookie);
+ mutex_.lock();
+ ASSERT(!sendSyncPending_);
+ sendSyncPending_ = true;
+ sendSyncCookie_ = in.header().cookie;
+ sendSyncResponse_ = &response;
+ mutex_.unlock();
+
+ ret = socket_->send(in.payload());
if (ret) {
- LOG(IPCPipe, Error) << "Failed to call sync";
- return ret;
+ LOG(IPCPipe, Error) << "Failed to send sync message";
+ goto cleanup;
+ }
+
+ bool complete;
+ {
+ MutexLocker locker(mutex_);
+ auto syncComplete = ([&]() {
+ return sendSyncPending_ == false;
+ });
+ complete = cv_.wait_for(locker, 1000ms, syncComplete);
+ }
+
+ if (!complete) {
+ LOG(IPCPipe, Error) << "Timeout sending sync message";
+ ret = -ETIMEDOUT;
+ goto cleanup;
}
if (out)
*out = IPCMessage(response);
return 0;
+
+cleanup:
+ mutex_.lock();
+ sendSyncPending_ = false;
+ mutex_.unlock();
+
+ return ret;
}
-int IPCPipeUnixSocket::sendAsync(const IPCMessage &data)
+int IPCUnixSocketWrapper::sendAsync(const IPCMessage &data)
{
- int ret = socket_->send(data.payload());
- if (ret) {
- LOG(IPCPipe, Error) << "Failed to call async";
- return ret;
+ int ret;
+ ret = socket_->send(data.payload());
+ if (ret)
+ LOG(IPCPipe, Error) << "Failed to send sync message";
+ return ret;
+}
+
+bool IPCUnixSocketWrapper::waitReady()
+{
+ bool ready;
+ {
+ MutexLocker locker(mutex_);
+ auto isReady = ([&]() {
+ return ready_;
+ });
+ ready = cv_.wait_for(locker, 1000ms, isReady);
}
- return 0;
+ return ready;
+}
+
+void IPCUnixSocketWrapper::init()
+{
+ /* Init is to be done from the IPC thread context */
+ ASSERT(Thread::current() == this);
+
+ socket_ = std::make_unique<IPCUnixSocket>();
+ fd_ = socket_->create();
+ if (!fd_.isValid()) {
+ LOG(IPCPipe, Error) << "Failed to create socket";
+ return;
+ }
+
+ socket_->readyRead.connect(this, &IPCUnixSocketWrapper::readyRead);
+
+ mutex_.lock();
+ ready_ = true;
+ mutex_.unlock();
+ cv_.notify_one();
}
-void IPCPipeUnixSocket::readyRead()
+void IPCUnixSocketWrapper::deinit()
+{
+ /* Deinit is to be done from the IPC thread context */
+ ASSERT(Thread::current() == this);
+
+ socket_->readyRead.disconnect(this);
+ socket_.reset();
+
+ mutex_.lock();
+ ready_ = false;
+ mutex_.unlock();
+}
+
+void IPCUnixSocketWrapper::readyRead()
{
IPCUnixSocket::Payload payload;
int ret = socket_->receive(&payload);
@@ -93,55 +186,72 @@ void IPCPipeUnixSocket::readyRead()
return;
}
- /* \todo Use span to avoid the double copy when callData is found. */
if (payload.data.size() < sizeof(IPCMessage::Header)) {
LOG(IPCPipe, Error) << "Not enough data received";
return;
}
- IPCMessage ipcMessage(payload);
+ const IPCMessage::Header *header =
+ reinterpret_cast<IPCMessage::Header *>(payload.data.data());
+ bool syncComplete = false;
+ mutex_.lock();
+ if (sendSyncPending_ && sendSyncCookie_ == header->cookie) {
+ syncComplete = true;
+ sendSyncPending_ = false;
+ *sendSyncResponse_ = std::move(payload);
+ }
+ mutex_.unlock();
- auto callData = callData_.find(ipcMessage.header().cookie);
- if (callData != callData_.end()) {
- *callData->second.response = std::move(payload);
- callData->second.done = true;
+ if (syncComplete) {
+ cv_.notify_one();
return;
}
/* Received unexpected data, this means it's a call from the IPA. */
- recv.emit(ipcMessage);
+ IPCMessage ipcMessage(payload);
+ recv_->emit(ipcMessage);
}
-int IPCPipeUnixSocket::call(const IPCUnixSocket::Payload &message,
- IPCUnixSocket::Payload *response, uint32_t cookie)
+IPCPipeUnixSocket::IPCPipeUnixSocket(const char *ipaModulePath,
+ const char *ipaProxyWorkerPath)
+ : IPCPipe()
{
- Timer timeout;
- int ret;
+ socketWrap_ = std::make_unique<IPCUnixSocketWrapper>(&recv);
+ if (!socketWrap_->waitReady()) {
+ LOG(IPCPipe, Error) << "Failed to create socket";
+ return;
+ }
+ int fd = socketWrap_->fd();
- const auto result = callData_.insert({ cookie, { response, false } });
- const auto &iter = result.first;
+ std::vector<int> fds;
+ std::vector<std::string> args;
+ args.push_back(ipaModulePath);
+ args.push_back(std::to_string(fd));
+ fds.push_back(fd);
- ret = socket_->send(message);
+ proc_ = std::make_unique<Process>();
+ int ret = proc_->start(ipaProxyWorkerPath, args, fds);
if (ret) {
- callData_.erase(iter);
- return ret;
+ LOG(IPCPipe, Error)
+ << "Failed to start proxy worker process";
+ return;
}
- /* \todo Make this less dangerous, see IPCPipe::sendSync() */
- timeout.start(2000ms);
- while (!iter->second.done) {
- if (!timeout.isRunning()) {
- LOG(IPCPipe, Error) << "Call timeout!";
- callData_.erase(iter);
- return -ETIMEDOUT;
- }
+ connected_ = true;
+}
- Thread::current()->eventDispatcher()->processEvents();
- }
+IPCPipeUnixSocket::~IPCPipeUnixSocket()
+{
+}
- callData_.erase(iter);
+int IPCPipeUnixSocket::sendSync(const IPCMessage &in, IPCMessage *out)
+{
+ return socketWrap_->sendSync(in, out);
+}
- return 0;
+int IPCPipeUnixSocket::sendAsync(const IPCMessage &data)
+{
+ return socketWrap_->sendAsync(data);
}
} /* namespace libcamera */
Unix-socket based IPC sometimes times out or hangs, typically when multiple camera are stopped simulaneously. That specific case triggers the concurrent sending by each pipeline handler instance of a synchronous stop() message to its peer IPA process. There is a dedicated IPC socket per camera. Sockets payload receipt signals all run in the camera manager thread. To send a synchronous message to IPA, pipeline invokes from camera thread IPCPipeUnixSocket::sendSync(). This sends the message then blocks busy waiting for the peer acknowledgment. Such busy wait is done by blocking on event loop calling dispatchEvent(), until the ack condition is detected. One issue is that the socket receive slot readyRead() wakes up the blocked thread via libcamera::Message receipt. Even though such message resumes processEvents(), it may reblock immediately because readyRead() does not interrupt() explictly the dispatcher. Most of the time, an other pending event for the thread unblocks the event dispatcher and the ack condition is detected - in worst case the 2 sec timeout kicks in. Once unblocked, the dispatcher let the message acknowledgment to be detected and the sendSync() completes. The other issue is that in case of concurrent synchronous IPC messages sent by multiple pipeline handlers, there is a possible recursion of sendSync() / processEvents() nested in the camera thread stack. As commented in the source, that is a dangerous construct that can lead to a hang. The reason is that the last synchronous message sent is the deepest in the stack. It is also the one whose acknowledgment is being busy waited. However other pending synchronous messages may have been initiated before and are upper in the stack. If they timeout, the condition is not detected because of the stack recursion, as the thread is busy waiting for the last message to be acknowledged. This change implements a safer mechanism to handle the synchronous message sending, similar to the one used for non isolated IPA. The IPCUnixSocketWrapper class is introduced to handle the IPCUnixSocket receive signal in a dedicated thread. Doing so, the sending thread, when emiting a synchronous message, can be blocked without event dispatcher's processEvents() usage, which avoids the risky stack recursion. Fixes: 21f1b555b ("libcamera: Add IPCPipe implementation based on unix socket") Signed-off-by: Julien Vuillaumier <julien.vuillaumier@nxp.com> --- .../libcamera/internal/ipc_pipe_unixsocket.h | 13 +- src/libcamera/ipc_pipe_unixsocket.cpp | 242 +++++++++++++----- 2 files changed, 178 insertions(+), 77 deletions(-)