libcamera: ipc_unixsocket: Fix sendSync() timeout and hang
diff mbox series

Message ID 20241220145556.3011657-1-julien.vuillaumier@nxp.com
State New
Headers show
Series
  • libcamera: ipc_unixsocket: Fix sendSync() timeout and hang
Related show

Commit Message

Julien Vuillaumier Dec. 20, 2024, 2:55 p.m. UTC
Unix-socket based IPC sometimes times out or hangs, typically
when multiple camera are stopped simulaneously. That specific case
triggers the concurrent sending by each pipeline handler instance
of a synchronous stop() message to its peer IPA process.

There is a dedicated IPC socket per camera. Sockets payload receipt
signals all run in the camera manager thread.
To send a synchronous message to IPA, pipeline invokes from camera
thread IPCPipeUnixSocket::sendSync(). This sends the message then blocks
busy waiting for the peer acknowledgment. Such busy wait is done by
blocking on event loop calling dispatchEvent(), until the ack condition
is detected.

One issue is that the socket receive slot readyRead() wakes up the
blocked thread via libcamera::Message receipt. Even though such message
resumes processEvents(), it may reblock immediately because readyRead()
does not interrupt() explictly the dispatcher.
Most of the time, an other pending event for the thread unblocks the
event dispatcher and the ack condition is detected - in worst case the 2
sec timeout kicks in. Once unblocked, the dispatcher let the message
acknowledgment to be detected and the sendSync() completes.

The other issue is that in case of concurrent synchronous IPC messages
sent by multiple pipeline handlers, there is a possible recursion
of sendSync() / processEvents() nested in the camera thread stack. As
commented in the source, that is a dangerous construct that can lead to
a hang.
The reason is that the last synchronous message sent is the deepest in
the stack. It is also the one whose acknowledgment is being busy waited.
However other pending synchronous messages may have been initiated before
and are upper in the stack. If they timeout, the condition is not
detected because of the stack recursion, as the thread is busy waiting
for the last message to be acknowledged.

This change implements a safer mechanism to handle the synchronous
message sending, similar to the one used for non isolated IPA. The
IPCUnixSocketWrapper class is introduced to handle the IPCUnixSocket
receive signal in a dedicated thread.
Doing so, the sending thread, when emiting a synchronous message, can be
blocked without event dispatcher's processEvents() usage, which avoids
the risky stack recursion.

Fixes: 21f1b555b ("libcamera: Add IPCPipe implementation based on unix socket")

Signed-off-by: Julien Vuillaumier <julien.vuillaumier@nxp.com>
---
 .../libcamera/internal/ipc_pipe_unixsocket.h  |  13 +-
 src/libcamera/ipc_pipe_unixsocket.cpp         | 242 +++++++++++++-----
 2 files changed, 178 insertions(+), 77 deletions(-)

Patch
diff mbox series

diff --git a/include/libcamera/internal/ipc_pipe_unixsocket.h b/include/libcamera/internal/ipc_pipe_unixsocket.h
index 8c972613..280639d5 100644
--- a/include/libcamera/internal/ipc_pipe_unixsocket.h
+++ b/include/libcamera/internal/ipc_pipe_unixsocket.h
@@ -16,6 +16,7 @@ 
 namespace libcamera {
 
 class Process;
+class IPCUnixSocketWrapper;
 
 class IPCPipeUnixSocket : public IPCPipe
 {
@@ -29,18 +30,8 @@  public:
 	int sendAsync(const IPCMessage &data) override;
 
 private:
-	struct CallData {
-		IPCUnixSocket::Payload *response;
-		bool done;
-	};
-
-	void readyRead();
-	int call(const IPCUnixSocket::Payload &message,
-		 IPCUnixSocket::Payload *response, uint32_t seq);
-
 	std::unique_ptr<Process> proc_;
-	std::unique_ptr<IPCUnixSocket> socket_;
-	std::map<uint32_t, CallData> callData_;
+	std::unique_ptr<IPCUnixSocketWrapper> socketWrap_;
 };
 
 } /* namespace libcamera */
diff --git a/src/libcamera/ipc_pipe_unixsocket.cpp b/src/libcamera/ipc_pipe_unixsocket.cpp
index 668ec73b..eb5408d4 100644
--- a/src/libcamera/ipc_pipe_unixsocket.cpp
+++ b/src/libcamera/ipc_pipe_unixsocket.cpp
@@ -9,10 +9,9 @@ 
 
 #include <vector>
 
-#include <libcamera/base/event_dispatcher.h>
 #include <libcamera/base/log.h>
+#include <libcamera/base/mutex.h>
 #include <libcamera/base/thread.h>
-#include <libcamera/base/timer.h>
 
 #include "libcamera/internal/ipc_pipe.h"
 #include "libcamera/internal/ipc_unixsocket.h"
@@ -24,67 +23,161 @@  namespace libcamera {
 
 LOG_DECLARE_CATEGORY(IPCPipe)
 
-IPCPipeUnixSocket::IPCPipeUnixSocket(const char *ipaModulePath,
-				     const char *ipaProxyWorkerPath)
-	: IPCPipe()
+class IPCUnixSocketWrapper : Thread
 {
-	std::vector<int> fds;
-	std::vector<std::string> args;
-	args.push_back(ipaModulePath);
+public:
+	IPCUnixSocketWrapper(Signal<const IPCMessage &> *recv)
+		: recv_(recv), ready_(false), sendSyncPending_(false),
+		  sendSyncCookie_(0)
+	{
+		start();
+	}
 
-	socket_ = std::make_unique<IPCUnixSocket>();
-	UniqueFD fd = socket_->create();
-	if (!fd.isValid()) {
-		LOG(IPCPipe, Error) << "Failed to create socket";
-		return;
+	~IPCUnixSocketWrapper()
+	{
+		exit();
+		wait();
 	}
-	socket_->readyRead.connect(this, &IPCPipeUnixSocket::readyRead);
-	args.push_back(std::to_string(fd.get()));
-	fds.push_back(fd.get());
 
-	proc_ = std::make_unique<Process>();
-	int ret = proc_->start(ipaProxyWorkerPath, args, fds);
-	if (ret) {
-		LOG(IPCPipe, Error)
-			<< "Failed to start proxy worker process";
-		return;
+	void run() override
+	{
+		/*
+		 * IPC socket construction and connection to its readyRead
+		 * signal has to be done from the IPC thread so that the
+		 * relevant Object instances (EventNotifier, slot) are bound to
+		 * its context.
+		 */
+		init();
+		exec();
+		deinit();
 	}
 
-	connected_ = true;
-}
+	int fd() { return fd_.get(); }
+	int sendSync(const IPCMessage &in, IPCMessage *out);
+	int sendAsync(const IPCMessage &data);
+	bool waitReady();
 
-IPCPipeUnixSocket::~IPCPipeUnixSocket()
-{
-}
+private:
+	void init();
+	void deinit();
+	void readyRead();
 
-int IPCPipeUnixSocket::sendSync(const IPCMessage &in, IPCMessage *out)
+	UniqueFD fd_;
+	Signal<const IPCMessage &> *recv_;
+	ConditionVariable cv_;
+	Mutex mutex_;
+	bool ready_;
+	bool sendSyncPending_;
+	uint32_t sendSyncCookie_;
+	IPCUnixSocket::Payload *sendSyncResponse_;
+
+	/* Socket shall be constructed and destructed from IPC thread context */
+	std::unique_ptr<IPCUnixSocket> socket_;
+};
+
+int IPCUnixSocketWrapper::sendSync(const IPCMessage &in, IPCMessage *out)
 {
+	int ret;
 	IPCUnixSocket::Payload response;
 
-	int ret = call(in.payload(), &response, in.header().cookie);
+	mutex_.lock();
+	ASSERT(!sendSyncPending_);
+	sendSyncPending_ = true;
+	sendSyncCookie_ = in.header().cookie;
+	sendSyncResponse_ = &response;
+	mutex_.unlock();
+
+	ret = socket_->send(in.payload());
 	if (ret) {
-		LOG(IPCPipe, Error) << "Failed to call sync";
-		return ret;
+		LOG(IPCPipe, Error) << "Failed to send sync message";
+		goto cleanup;
+	}
+
+	bool complete;
+	{
+		MutexLocker locker(mutex_);
+		auto syncComplete = ([&]() {
+			return sendSyncPending_ == false;
+		});
+		complete = cv_.wait_for(locker, 1000ms, syncComplete);
+	}
+
+	if (!complete) {
+		LOG(IPCPipe, Error) << "Timeout sending sync message";
+		ret = -ETIMEDOUT;
+		goto cleanup;
 	}
 
 	if (out)
 		*out = IPCMessage(response);
 
 	return 0;
+
+cleanup:
+	mutex_.lock();
+	sendSyncPending_ = false;
+	mutex_.unlock();
+
+	return ret;
 }
 
-int IPCPipeUnixSocket::sendAsync(const IPCMessage &data)
+int IPCUnixSocketWrapper::sendAsync(const IPCMessage &data)
 {
-	int ret = socket_->send(data.payload());
-	if (ret) {
-		LOG(IPCPipe, Error) << "Failed to call async";
-		return ret;
+	int ret;
+	ret = socket_->send(data.payload());
+	if (ret)
+		LOG(IPCPipe, Error) << "Failed to send sync message";
+	return ret;
+}
+
+bool IPCUnixSocketWrapper::waitReady()
+{
+	bool ready;
+	{
+		MutexLocker locker(mutex_);
+		auto isReady = ([&]() {
+			return ready_;
+		});
+		ready = cv_.wait_for(locker, 1000ms, isReady);
 	}
 
-	return 0;
+	return ready;
+}
+
+void IPCUnixSocketWrapper::init()
+{
+	/* Init is to be done from the IPC thread context */
+	ASSERT(Thread::current() == this);
+
+	socket_ = std::make_unique<IPCUnixSocket>();
+	fd_ = socket_->create();
+	if (!fd_.isValid()) {
+		LOG(IPCPipe, Error) << "Failed to create socket";
+		return;
+	}
+
+	socket_->readyRead.connect(this, &IPCUnixSocketWrapper::readyRead);
+
+	mutex_.lock();
+	ready_ = true;
+	mutex_.unlock();
+	cv_.notify_one();
 }
 
-void IPCPipeUnixSocket::readyRead()
+void IPCUnixSocketWrapper::deinit()
+{
+	/* Deinit is to be done from the IPC thread context */
+	ASSERT(Thread::current() == this);
+
+	socket_->readyRead.disconnect(this);
+	socket_.reset();
+
+	mutex_.lock();
+	ready_ = false;
+	mutex_.unlock();
+}
+
+void IPCUnixSocketWrapper::readyRead()
 {
 	IPCUnixSocket::Payload payload;
 	int ret = socket_->receive(&payload);
@@ -93,55 +186,72 @@  void IPCPipeUnixSocket::readyRead()
 		return;
 	}
 
-	/* \todo Use span to avoid the double copy when callData is found. */
 	if (payload.data.size() < sizeof(IPCMessage::Header)) {
 		LOG(IPCPipe, Error) << "Not enough data received";
 		return;
 	}
 
-	IPCMessage ipcMessage(payload);
+	const IPCMessage::Header *header =
+		reinterpret_cast<IPCMessage::Header *>(payload.data.data());
+	bool syncComplete = false;
+	mutex_.lock();
+	if (sendSyncPending_ && sendSyncCookie_ == header->cookie) {
+		syncComplete = true;
+		sendSyncPending_ = false;
+		*sendSyncResponse_ = std::move(payload);
+	}
+	mutex_.unlock();
 
-	auto callData = callData_.find(ipcMessage.header().cookie);
-	if (callData != callData_.end()) {
-		*callData->second.response = std::move(payload);
-		callData->second.done = true;
+	if (syncComplete) {
+		cv_.notify_one();
 		return;
 	}
 
 	/* Received unexpected data, this means it's a call from the IPA. */
-	recv.emit(ipcMessage);
+	IPCMessage ipcMessage(payload);
+	recv_->emit(ipcMessage);
 }
 
-int IPCPipeUnixSocket::call(const IPCUnixSocket::Payload &message,
-			    IPCUnixSocket::Payload *response, uint32_t cookie)
+IPCPipeUnixSocket::IPCPipeUnixSocket(const char *ipaModulePath,
+				     const char *ipaProxyWorkerPath)
+	: IPCPipe()
 {
-	Timer timeout;
-	int ret;
+	socketWrap_ = std::make_unique<IPCUnixSocketWrapper>(&recv);
+	if (!socketWrap_->waitReady()) {
+		LOG(IPCPipe, Error) << "Failed to create socket";
+		return;
+	}
+	int fd = socketWrap_->fd();
 
-	const auto result = callData_.insert({ cookie, { response, false } });
-	const auto &iter = result.first;
+	std::vector<int> fds;
+	std::vector<std::string> args;
+	args.push_back(ipaModulePath);
+	args.push_back(std::to_string(fd));
+	fds.push_back(fd);
 
-	ret = socket_->send(message);
+	proc_ = std::make_unique<Process>();
+	int ret = proc_->start(ipaProxyWorkerPath, args, fds);
 	if (ret) {
-		callData_.erase(iter);
-		return ret;
+		LOG(IPCPipe, Error)
+			<< "Failed to start proxy worker process";
+		return;
 	}
 
-	/* \todo Make this less dangerous, see IPCPipe::sendSync() */
-	timeout.start(2000ms);
-	while (!iter->second.done) {
-		if (!timeout.isRunning()) {
-			LOG(IPCPipe, Error) << "Call timeout!";
-			callData_.erase(iter);
-			return -ETIMEDOUT;
-		}
+	connected_ = true;
+}
 
-		Thread::current()->eventDispatcher()->processEvents();
-	}
+IPCPipeUnixSocket::~IPCPipeUnixSocket()
+{
+}
 
-	callData_.erase(iter);
+int IPCPipeUnixSocket::sendSync(const IPCMessage &in, IPCMessage *out)
+{
+	return socketWrap_->sendSync(in, out);
+}
 
-	return 0;
+int IPCPipeUnixSocket::sendAsync(const IPCMessage &data)
+{
+	return socketWrap_->sendAsync(data);
 }
 
 } /* namespace libcamera */