[libcamera-devel,v4,3/3] libcamera: ipc: unix: Make socket operation asynchronous

Message ID 20190701232339.5191-4-laurent.pinchart@ideasonboard.com
State Accepted
Commit f137451817f47c0bfe59586afe5af7b51f8ccad4
Headers show
Series
  • libcamera: ipc: unix: Add a IPC mechanism based on Unix sockets
Related show

Commit Message

Laurent Pinchart July 1, 2019, 11:23 p.m. UTC
Blocking socket operation when receiving messages may lead to long
delays, and possibly a complete deadlock, if the remote side delays
sending of the payload after the header, or doesn't send the payload at
all. To avoid this, make the socket non-blocking and implement a simple
state machine to receive the header synchronously with the socket read
notification. The payload read is still synchronous with the receive()
method to avoid data copies.

Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
---
 src/libcamera/include/ipc_unixsocket.h |  2 +
 src/libcamera/ipc_unixsocket.cpp       | 88 ++++++++++++++++++--------
 2 files changed, 63 insertions(+), 27 deletions(-)

Comments

Niklas Söderlund July 1, 2019, 11:36 p.m. UTC | #1
Hi Laurent,

Thanks for your work.

On 2019-07-02 02:23:39 +0300, Laurent Pinchart wrote:
> Blocking socket operation when receiving messages may lead to long
> delays, and possibly a complete deadlock, if the remote side delays
> sending of the payload after the header, or doesn't send the payload at
> all. To avoid this, make the socket non-blocking and implement a simple
> state machine to receive the header synchronously with the socket read
> notification. The payload read is still synchronous with the receive()
> method to avoid data copies.
> 
> Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>

Reviewed-by: Niklas Söderlund <niklas.soderlund@ragnatech.se>

> ---
>  src/libcamera/include/ipc_unixsocket.h |  2 +
>  src/libcamera/ipc_unixsocket.cpp       | 88 ++++++++++++++++++--------
>  2 files changed, 63 insertions(+), 27 deletions(-)
> 
> diff --git a/src/libcamera/include/ipc_unixsocket.h b/src/libcamera/include/ipc_unixsocket.h
> index ef166d742554..03e9fe492bde 100644
> --- a/src/libcamera/include/ipc_unixsocket.h
> +++ b/src/libcamera/include/ipc_unixsocket.h
> @@ -49,6 +49,8 @@ private:
>  	void dataNotifier(EventNotifier *notifier);
>  
>  	int fd_;
> +	bool headerReceived_;
> +	struct Header header_;
>  	EventNotifier *notifier_;
>  };
>  
> diff --git a/src/libcamera/ipc_unixsocket.cpp b/src/libcamera/ipc_unixsocket.cpp
> index c11f116093c5..def08eef00f8 100644
> --- a/src/libcamera/ipc_unixsocket.cpp
> +++ b/src/libcamera/ipc_unixsocket.cpp
> @@ -7,6 +7,7 @@
>  
>  #include "ipc_unixsocket.h"
>  
> +#include <poll.h>
>  #include <string.h>
>  #include <sys/socket.h>
>  #include <unistd.h>
> @@ -49,10 +50,10 @@ LOG_DEFINE_CATEGORY(IPCUnixSocket)
>   * transporting entire payloads with guaranteed ordering.
>   *
>   * The IPC design is asynchronous, a message is queued to a receiver which gets
> - * notified that a message is ready to be consumed by a signal. The queuer of
> - * the message gets no notification when a message is delivered nor processed.
> - * If such interactions are needed a protocol specific to the users use-case
> - * should be implemented on top of the IPC objects.
> + * notified that a message is ready to be consumed by the \ref readyRead
> + * signal. The sender of the message gets no notification when a message is
> + * delivered nor processed. If such interactions are needed a protocol specific
> + * to the users use-case should be implemented on top of the IPC objects.
>   *
>   * Establishment of an IPC channel is asymmetrical. The side that initiates
>   * communication first instantiates a local side socket and creates the channel
> @@ -64,7 +65,7 @@ LOG_DEFINE_CATEGORY(IPCUnixSocket)
>   */
>  
>  IPCUnixSocket::IPCUnixSocket()
> -	: fd_(-1), notifier_(nullptr)
> +	: fd_(-1), headerReceived_(false), notifier_(nullptr)
>  {
>  }
>  
> @@ -89,7 +90,7 @@ int IPCUnixSocket::create()
>  	int sockets[2];
>  	int ret;
>  
> -	ret = socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets);
> +	ret = socketpair(AF_UNIX, SOCK_DGRAM | SOCK_NONBLOCK, 0, sockets);
>  	if (ret) {
>  		ret = -errno;
>  		LOG(IPCUnixSocket, Error)
> @@ -142,6 +143,7 @@ void IPCUnixSocket::close()
>  	::close(fd_);
>  
>  	fd_ = -1;
> +	headerReceived_ = false;
>  }
>  
>  /**
> @@ -193,38 +195,38 @@ int IPCUnixSocket::send(const Payload &payload)
>   * \param[out] payload Payload where to write the received message
>   *
>   * This method receives the message payload from the IPC channel and writes it
> - * to the \a payload. It blocks until one message is received, if an
> - * asynchronous behavior is desired this method should be called when the
> - * readyRead signal is emitted.
> + * to the \a payload. If no message payload is available, it returns
> + * immediately with -EAGAIN. The \ref readyRead signal shall be used to receive
> + * notification of message availability.
>   *
>   * \todo Add state machine to make sure we don't block forever and that
>   * a header is always followed by a payload.
>   *
>   * \return 0 on success or a negative error code otherwise
> + * \retval -EAGAIN No message payload is available
> + * \retval -ENOTCONN The socket is not connected (neither create() nor bind()
> + * has been called)
>   */
>  int IPCUnixSocket::receive(Payload *payload)
>  {
> -	Header hdr;
> -	int ret;
> -
>  	if (!isBound())
>  		return -ENOTCONN;
>  
> -	if (!payload)
> -		return -EINVAL;
> +	if (!headerReceived_)
> +		return -EAGAIN;
>  
> -	ret = ::recv(fd_, &hdr, sizeof(hdr), 0);
> -	if (ret < 0) {
> -		ret = -errno;
> -		LOG(IPCUnixSocket, Error)
> -			<< "Failed to recv header: " << strerror(-ret);
> +	payload->data.resize(header_.data);
> +	payload->fds.resize(header_.fds);
> +
> +	int ret = recvData(payload->data.data(), header_.data,
> +			   payload->fds.data(), header_.fds);
> +	if (ret < 0)
>  		return ret;
> -	}
>  
> -	payload->data.resize(hdr.data);
> -	payload->fds.resize(hdr.fds);
> +	headerReceived_ = false;
> +	notifier_->setEnabled(true);
>  
> -	return recvData(payload->data.data(), hdr.data, payload->fds.data(), hdr.fds);
> +	return 0;
>  }
>  
>  /**
> @@ -232,7 +234,8 @@ int IPCUnixSocket::receive(Payload *payload)
>   * \brief A Signal emitted when a message is ready to be read
>   */
>  
> -int IPCUnixSocket::sendData(const void *buffer, size_t length, const int32_t *fds, unsigned int num)
> +int IPCUnixSocket::sendData(const void *buffer, size_t length,
> +			    const int32_t *fds, unsigned int num)
>  {
>  	struct iovec iov[1];
>  	iov[0].iov_base = const_cast<void *>(buffer);
> @@ -266,7 +269,8 @@ int IPCUnixSocket::sendData(const void *buffer, size_t length, const int32_t *fd
>  	return 0;
>  }
>  
> -int IPCUnixSocket::recvData(void *buffer, size_t length, int32_t *fds, unsigned int num)
> +int IPCUnixSocket::recvData(void *buffer, size_t length,
> +			    int32_t *fds, unsigned int num)
>  {
>  	struct iovec iov[1];
>  	iov[0].iov_base = buffer;
> @@ -291,8 +295,9 @@ int IPCUnixSocket::recvData(void *buffer, size_t length, int32_t *fds, unsigned
>  
>  	if (recvmsg(fd_, &msg, 0) < 0) {
>  		int ret = -errno;
> -		LOG(IPCUnixSocket, Error)
> -			<< "Failed to recvmsg: " << strerror(-ret);
> +		if (ret != -EAGAIN)
> +			LOG(IPCUnixSocket, Error)
> +				<< "Failed to recvmsg: " << strerror(-ret);
>  		return ret;
>  	}
>  
> @@ -303,6 +308,35 @@ int IPCUnixSocket::recvData(void *buffer, size_t length, int32_t *fds, unsigned
>  
>  void IPCUnixSocket::dataNotifier(EventNotifier *notifier)
>  {
> +	int ret;
> +
> +	if (!headerReceived_) {
> +		/* Receive the header. */
> +		ret = ::recv(fd_, &header_, sizeof(header_), 0);
> +		if (ret < 0) {
> +			ret = -errno;
> +			LOG(IPCUnixSocket, Error)
> +				<< "Failed to receive header: " << strerror(-ret);
> +			return;
> +		}
> +
> +		headerReceived_ = true;
> +	}
> +
> +	/*
> +	 * If the payload has arrived, disable the notifier and emit the
> +	 * readyRead signal. The notifier will be reenabled by the receive()
> +	 * method.
> +	 */
> +	struct pollfd fds = { fd_, POLLIN, 0 };
> +	ret = poll(&fds, 1, 0);
> +	if (ret < 0)
> +		return;
> +
> +	if (!(fds.revents & POLLIN))
> +		return;
> +
> +	notifier_->setEnabled(false);
>  	readyRead.emit(this);
>  }
>  
> -- 
> Regards,
> 
> Laurent Pinchart
> 
> _______________________________________________
> libcamera-devel mailing list
> libcamera-devel@lists.libcamera.org
> https://lists.libcamera.org/listinfo/libcamera-devel

Patch

diff --git a/src/libcamera/include/ipc_unixsocket.h b/src/libcamera/include/ipc_unixsocket.h
index ef166d742554..03e9fe492bde 100644
--- a/src/libcamera/include/ipc_unixsocket.h
+++ b/src/libcamera/include/ipc_unixsocket.h
@@ -49,6 +49,8 @@  private:
 	void dataNotifier(EventNotifier *notifier);
 
 	int fd_;
+	bool headerReceived_;
+	struct Header header_;
 	EventNotifier *notifier_;
 };
 
diff --git a/src/libcamera/ipc_unixsocket.cpp b/src/libcamera/ipc_unixsocket.cpp
index c11f116093c5..def08eef00f8 100644
--- a/src/libcamera/ipc_unixsocket.cpp
+++ b/src/libcamera/ipc_unixsocket.cpp
@@ -7,6 +7,7 @@ 
 
 #include "ipc_unixsocket.h"
 
+#include <poll.h>
 #include <string.h>
 #include <sys/socket.h>
 #include <unistd.h>
@@ -49,10 +50,10 @@  LOG_DEFINE_CATEGORY(IPCUnixSocket)
  * transporting entire payloads with guaranteed ordering.
  *
  * The IPC design is asynchronous, a message is queued to a receiver which gets
- * notified that a message is ready to be consumed by a signal. The queuer of
- * the message gets no notification when a message is delivered nor processed.
- * If such interactions are needed a protocol specific to the users use-case
- * should be implemented on top of the IPC objects.
+ * notified that a message is ready to be consumed by the \ref readyRead
+ * signal. The sender of the message gets no notification when a message is
+ * delivered nor processed. If such interactions are needed a protocol specific
+ * to the users use-case should be implemented on top of the IPC objects.
  *
  * Establishment of an IPC channel is asymmetrical. The side that initiates
  * communication first instantiates a local side socket and creates the channel
@@ -64,7 +65,7 @@  LOG_DEFINE_CATEGORY(IPCUnixSocket)
  */
 
 IPCUnixSocket::IPCUnixSocket()
-	: fd_(-1), notifier_(nullptr)
+	: fd_(-1), headerReceived_(false), notifier_(nullptr)
 {
 }
 
@@ -89,7 +90,7 @@  int IPCUnixSocket::create()
 	int sockets[2];
 	int ret;
 
-	ret = socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets);
+	ret = socketpair(AF_UNIX, SOCK_DGRAM | SOCK_NONBLOCK, 0, sockets);
 	if (ret) {
 		ret = -errno;
 		LOG(IPCUnixSocket, Error)
@@ -142,6 +143,7 @@  void IPCUnixSocket::close()
 	::close(fd_);
 
 	fd_ = -1;
+	headerReceived_ = false;
 }
 
 /**
@@ -193,38 +195,38 @@  int IPCUnixSocket::send(const Payload &payload)
  * \param[out] payload Payload where to write the received message
  *
  * This method receives the message payload from the IPC channel and writes it
- * to the \a payload. It blocks until one message is received, if an
- * asynchronous behavior is desired this method should be called when the
- * readyRead signal is emitted.
+ * to the \a payload. If no message payload is available, it returns
+ * immediately with -EAGAIN. The \ref readyRead signal shall be used to receive
+ * notification of message availability.
  *
  * \todo Add state machine to make sure we don't block forever and that
  * a header is always followed by a payload.
  *
  * \return 0 on success or a negative error code otherwise
+ * \retval -EAGAIN No message payload is available
+ * \retval -ENOTCONN The socket is not connected (neither create() nor bind()
+ * has been called)
  */
 int IPCUnixSocket::receive(Payload *payload)
 {
-	Header hdr;
-	int ret;
-
 	if (!isBound())
 		return -ENOTCONN;
 
-	if (!payload)
-		return -EINVAL;
+	if (!headerReceived_)
+		return -EAGAIN;
 
-	ret = ::recv(fd_, &hdr, sizeof(hdr), 0);
-	if (ret < 0) {
-		ret = -errno;
-		LOG(IPCUnixSocket, Error)
-			<< "Failed to recv header: " << strerror(-ret);
+	payload->data.resize(header_.data);
+	payload->fds.resize(header_.fds);
+
+	int ret = recvData(payload->data.data(), header_.data,
+			   payload->fds.data(), header_.fds);
+	if (ret < 0)
 		return ret;
-	}
 
-	payload->data.resize(hdr.data);
-	payload->fds.resize(hdr.fds);
+	headerReceived_ = false;
+	notifier_->setEnabled(true);
 
-	return recvData(payload->data.data(), hdr.data, payload->fds.data(), hdr.fds);
+	return 0;
 }
 
 /**
@@ -232,7 +234,8 @@  int IPCUnixSocket::receive(Payload *payload)
  * \brief A Signal emitted when a message is ready to be read
  */
 
-int IPCUnixSocket::sendData(const void *buffer, size_t length, const int32_t *fds, unsigned int num)
+int IPCUnixSocket::sendData(const void *buffer, size_t length,
+			    const int32_t *fds, unsigned int num)
 {
 	struct iovec iov[1];
 	iov[0].iov_base = const_cast<void *>(buffer);
@@ -266,7 +269,8 @@  int IPCUnixSocket::sendData(const void *buffer, size_t length, const int32_t *fd
 	return 0;
 }
 
-int IPCUnixSocket::recvData(void *buffer, size_t length, int32_t *fds, unsigned int num)
+int IPCUnixSocket::recvData(void *buffer, size_t length,
+			    int32_t *fds, unsigned int num)
 {
 	struct iovec iov[1];
 	iov[0].iov_base = buffer;
@@ -291,8 +295,9 @@  int IPCUnixSocket::recvData(void *buffer, size_t length, int32_t *fds, unsigned
 
 	if (recvmsg(fd_, &msg, 0) < 0) {
 		int ret = -errno;
-		LOG(IPCUnixSocket, Error)
-			<< "Failed to recvmsg: " << strerror(-ret);
+		if (ret != -EAGAIN)
+			LOG(IPCUnixSocket, Error)
+				<< "Failed to recvmsg: " << strerror(-ret);
 		return ret;
 	}
 
@@ -303,6 +308,35 @@  int IPCUnixSocket::recvData(void *buffer, size_t length, int32_t *fds, unsigned
 
 void IPCUnixSocket::dataNotifier(EventNotifier *notifier)
 {
+	int ret;
+
+	if (!headerReceived_) {
+		/* Receive the header. */
+		ret = ::recv(fd_, &header_, sizeof(header_), 0);
+		if (ret < 0) {
+			ret = -errno;
+			LOG(IPCUnixSocket, Error)
+				<< "Failed to receive header: " << strerror(-ret);
+			return;
+		}
+
+		headerReceived_ = true;
+	}
+
+	/*
+	 * If the payload has arrived, disable the notifier and emit the
+	 * readyRead signal. The notifier will be reenabled by the receive()
+	 * method.
+	 */
+	struct pollfd fds = { fd_, POLLIN, 0 };
+	ret = poll(&fds, 1, 0);
+	if (ret < 0)
+		return;
+
+	if (!(fds.revents & POLLIN))
+		return;
+
+	notifier_->setEnabled(false);
 	readyRead.emit(this);
 }