Message ID | 20190701232339.5191-4-laurent.pinchart@ideasonboard.com |
---|---|
State | Accepted |
Commit | f137451817f47c0bfe59586afe5af7b51f8ccad4 |
Headers | show |
Series |
|
Related | show |
Hi Laurent, Thanks for your work. On 2019-07-02 02:23:39 +0300, Laurent Pinchart wrote: > Blocking socket operation when receiving messages may lead to long > delays, and possibly a complete deadlock, if the remote side delays > sending of the payload after the header, or doesn't send the payload at > all. To avoid this, make the socket non-blocking and implement a simple > state machine to receive the header synchronously with the socket read > notification. The payload read is still synchronous with the receive() > method to avoid data copies. > > Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com> Reviewed-by: Niklas Söderlund <niklas.soderlund@ragnatech.se> > --- > src/libcamera/include/ipc_unixsocket.h | 2 + > src/libcamera/ipc_unixsocket.cpp | 88 ++++++++++++++++++-------- > 2 files changed, 63 insertions(+), 27 deletions(-) > > diff --git a/src/libcamera/include/ipc_unixsocket.h b/src/libcamera/include/ipc_unixsocket.h > index ef166d742554..03e9fe492bde 100644 > --- a/src/libcamera/include/ipc_unixsocket.h > +++ b/src/libcamera/include/ipc_unixsocket.h > @@ -49,6 +49,8 @@ private: > void dataNotifier(EventNotifier *notifier); > > int fd_; > + bool headerReceived_; > + struct Header header_; > EventNotifier *notifier_; > }; > > diff --git a/src/libcamera/ipc_unixsocket.cpp b/src/libcamera/ipc_unixsocket.cpp > index c11f116093c5..def08eef00f8 100644 > --- a/src/libcamera/ipc_unixsocket.cpp > +++ b/src/libcamera/ipc_unixsocket.cpp > @@ -7,6 +7,7 @@ > > #include "ipc_unixsocket.h" > > +#include <poll.h> > #include <string.h> > #include <sys/socket.h> > #include <unistd.h> > @@ -49,10 +50,10 @@ LOG_DEFINE_CATEGORY(IPCUnixSocket) > * transporting entire payloads with guaranteed ordering. > * > * The IPC design is asynchronous, a message is queued to a receiver which gets > - * notified that a message is ready to be consumed by a signal. The queuer of > - * the message gets no notification when a message is delivered nor processed. > - * If such interactions are needed a protocol specific to the users use-case > - * should be implemented on top of the IPC objects. > + * notified that a message is ready to be consumed by the \ref readyRead > + * signal. The sender of the message gets no notification when a message is > + * delivered nor processed. If such interactions are needed a protocol specific > + * to the users use-case should be implemented on top of the IPC objects. > * > * Establishment of an IPC channel is asymmetrical. The side that initiates > * communication first instantiates a local side socket and creates the channel > @@ -64,7 +65,7 @@ LOG_DEFINE_CATEGORY(IPCUnixSocket) > */ > > IPCUnixSocket::IPCUnixSocket() > - : fd_(-1), notifier_(nullptr) > + : fd_(-1), headerReceived_(false), notifier_(nullptr) > { > } > > @@ -89,7 +90,7 @@ int IPCUnixSocket::create() > int sockets[2]; > int ret; > > - ret = socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets); > + ret = socketpair(AF_UNIX, SOCK_DGRAM | SOCK_NONBLOCK, 0, sockets); > if (ret) { > ret = -errno; > LOG(IPCUnixSocket, Error) > @@ -142,6 +143,7 @@ void IPCUnixSocket::close() > ::close(fd_); > > fd_ = -1; > + headerReceived_ = false; > } > > /** > @@ -193,38 +195,38 @@ int IPCUnixSocket::send(const Payload &payload) > * \param[out] payload Payload where to write the received message > * > * This method receives the message payload from the IPC channel and writes it > - * to the \a payload. It blocks until one message is received, if an > - * asynchronous behavior is desired this method should be called when the > - * readyRead signal is emitted. > + * to the \a payload. If no message payload is available, it returns > + * immediately with -EAGAIN. The \ref readyRead signal shall be used to receive > + * notification of message availability. > * > * \todo Add state machine to make sure we don't block forever and that > * a header is always followed by a payload. > * > * \return 0 on success or a negative error code otherwise > + * \retval -EAGAIN No message payload is available > + * \retval -ENOTCONN The socket is not connected (neither create() nor bind() > + * has been called) > */ > int IPCUnixSocket::receive(Payload *payload) > { > - Header hdr; > - int ret; > - > if (!isBound()) > return -ENOTCONN; > > - if (!payload) > - return -EINVAL; > + if (!headerReceived_) > + return -EAGAIN; > > - ret = ::recv(fd_, &hdr, sizeof(hdr), 0); > - if (ret < 0) { > - ret = -errno; > - LOG(IPCUnixSocket, Error) > - << "Failed to recv header: " << strerror(-ret); > + payload->data.resize(header_.data); > + payload->fds.resize(header_.fds); > + > + int ret = recvData(payload->data.data(), header_.data, > + payload->fds.data(), header_.fds); > + if (ret < 0) > return ret; > - } > > - payload->data.resize(hdr.data); > - payload->fds.resize(hdr.fds); > + headerReceived_ = false; > + notifier_->setEnabled(true); > > - return recvData(payload->data.data(), hdr.data, payload->fds.data(), hdr.fds); > + return 0; > } > > /** > @@ -232,7 +234,8 @@ int IPCUnixSocket::receive(Payload *payload) > * \brief A Signal emitted when a message is ready to be read > */ > > -int IPCUnixSocket::sendData(const void *buffer, size_t length, const int32_t *fds, unsigned int num) > +int IPCUnixSocket::sendData(const void *buffer, size_t length, > + const int32_t *fds, unsigned int num) > { > struct iovec iov[1]; > iov[0].iov_base = const_cast<void *>(buffer); > @@ -266,7 +269,8 @@ int IPCUnixSocket::sendData(const void *buffer, size_t length, const int32_t *fd > return 0; > } > > -int IPCUnixSocket::recvData(void *buffer, size_t length, int32_t *fds, unsigned int num) > +int IPCUnixSocket::recvData(void *buffer, size_t length, > + int32_t *fds, unsigned int num) > { > struct iovec iov[1]; > iov[0].iov_base = buffer; > @@ -291,8 +295,9 @@ int IPCUnixSocket::recvData(void *buffer, size_t length, int32_t *fds, unsigned > > if (recvmsg(fd_, &msg, 0) < 0) { > int ret = -errno; > - LOG(IPCUnixSocket, Error) > - << "Failed to recvmsg: " << strerror(-ret); > + if (ret != -EAGAIN) > + LOG(IPCUnixSocket, Error) > + << "Failed to recvmsg: " << strerror(-ret); > return ret; > } > > @@ -303,6 +308,35 @@ int IPCUnixSocket::recvData(void *buffer, size_t length, int32_t *fds, unsigned > > void IPCUnixSocket::dataNotifier(EventNotifier *notifier) > { > + int ret; > + > + if (!headerReceived_) { > + /* Receive the header. */ > + ret = ::recv(fd_, &header_, sizeof(header_), 0); > + if (ret < 0) { > + ret = -errno; > + LOG(IPCUnixSocket, Error) > + << "Failed to receive header: " << strerror(-ret); > + return; > + } > + > + headerReceived_ = true; > + } > + > + /* > + * If the payload has arrived, disable the notifier and emit the > + * readyRead signal. The notifier will be reenabled by the receive() > + * method. > + */ > + struct pollfd fds = { fd_, POLLIN, 0 }; > + ret = poll(&fds, 1, 0); > + if (ret < 0) > + return; > + > + if (!(fds.revents & POLLIN)) > + return; > + > + notifier_->setEnabled(false); > readyRead.emit(this); > } > > -- > Regards, > > Laurent Pinchart > > _______________________________________________ > libcamera-devel mailing list > libcamera-devel@lists.libcamera.org > https://lists.libcamera.org/listinfo/libcamera-devel
diff --git a/src/libcamera/include/ipc_unixsocket.h b/src/libcamera/include/ipc_unixsocket.h index ef166d742554..03e9fe492bde 100644 --- a/src/libcamera/include/ipc_unixsocket.h +++ b/src/libcamera/include/ipc_unixsocket.h @@ -49,6 +49,8 @@ private: void dataNotifier(EventNotifier *notifier); int fd_; + bool headerReceived_; + struct Header header_; EventNotifier *notifier_; }; diff --git a/src/libcamera/ipc_unixsocket.cpp b/src/libcamera/ipc_unixsocket.cpp index c11f116093c5..def08eef00f8 100644 --- a/src/libcamera/ipc_unixsocket.cpp +++ b/src/libcamera/ipc_unixsocket.cpp @@ -7,6 +7,7 @@ #include "ipc_unixsocket.h" +#include <poll.h> #include <string.h> #include <sys/socket.h> #include <unistd.h> @@ -49,10 +50,10 @@ LOG_DEFINE_CATEGORY(IPCUnixSocket) * transporting entire payloads with guaranteed ordering. * * The IPC design is asynchronous, a message is queued to a receiver which gets - * notified that a message is ready to be consumed by a signal. The queuer of - * the message gets no notification when a message is delivered nor processed. - * If such interactions are needed a protocol specific to the users use-case - * should be implemented on top of the IPC objects. + * notified that a message is ready to be consumed by the \ref readyRead + * signal. The sender of the message gets no notification when a message is + * delivered nor processed. If such interactions are needed a protocol specific + * to the users use-case should be implemented on top of the IPC objects. * * Establishment of an IPC channel is asymmetrical. The side that initiates * communication first instantiates a local side socket and creates the channel @@ -64,7 +65,7 @@ LOG_DEFINE_CATEGORY(IPCUnixSocket) */ IPCUnixSocket::IPCUnixSocket() - : fd_(-1), notifier_(nullptr) + : fd_(-1), headerReceived_(false), notifier_(nullptr) { } @@ -89,7 +90,7 @@ int IPCUnixSocket::create() int sockets[2]; int ret; - ret = socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets); + ret = socketpair(AF_UNIX, SOCK_DGRAM | SOCK_NONBLOCK, 0, sockets); if (ret) { ret = -errno; LOG(IPCUnixSocket, Error) @@ -142,6 +143,7 @@ void IPCUnixSocket::close() ::close(fd_); fd_ = -1; + headerReceived_ = false; } /** @@ -193,38 +195,38 @@ int IPCUnixSocket::send(const Payload &payload) * \param[out] payload Payload where to write the received message * * This method receives the message payload from the IPC channel and writes it - * to the \a payload. It blocks until one message is received, if an - * asynchronous behavior is desired this method should be called when the - * readyRead signal is emitted. + * to the \a payload. If no message payload is available, it returns + * immediately with -EAGAIN. The \ref readyRead signal shall be used to receive + * notification of message availability. * * \todo Add state machine to make sure we don't block forever and that * a header is always followed by a payload. * * \return 0 on success or a negative error code otherwise + * \retval -EAGAIN No message payload is available + * \retval -ENOTCONN The socket is not connected (neither create() nor bind() + * has been called) */ int IPCUnixSocket::receive(Payload *payload) { - Header hdr; - int ret; - if (!isBound()) return -ENOTCONN; - if (!payload) - return -EINVAL; + if (!headerReceived_) + return -EAGAIN; - ret = ::recv(fd_, &hdr, sizeof(hdr), 0); - if (ret < 0) { - ret = -errno; - LOG(IPCUnixSocket, Error) - << "Failed to recv header: " << strerror(-ret); + payload->data.resize(header_.data); + payload->fds.resize(header_.fds); + + int ret = recvData(payload->data.data(), header_.data, + payload->fds.data(), header_.fds); + if (ret < 0) return ret; - } - payload->data.resize(hdr.data); - payload->fds.resize(hdr.fds); + headerReceived_ = false; + notifier_->setEnabled(true); - return recvData(payload->data.data(), hdr.data, payload->fds.data(), hdr.fds); + return 0; } /** @@ -232,7 +234,8 @@ int IPCUnixSocket::receive(Payload *payload) * \brief A Signal emitted when a message is ready to be read */ -int IPCUnixSocket::sendData(const void *buffer, size_t length, const int32_t *fds, unsigned int num) +int IPCUnixSocket::sendData(const void *buffer, size_t length, + const int32_t *fds, unsigned int num) { struct iovec iov[1]; iov[0].iov_base = const_cast<void *>(buffer); @@ -266,7 +269,8 @@ int IPCUnixSocket::sendData(const void *buffer, size_t length, const int32_t *fd return 0; } -int IPCUnixSocket::recvData(void *buffer, size_t length, int32_t *fds, unsigned int num) +int IPCUnixSocket::recvData(void *buffer, size_t length, + int32_t *fds, unsigned int num) { struct iovec iov[1]; iov[0].iov_base = buffer; @@ -291,8 +295,9 @@ int IPCUnixSocket::recvData(void *buffer, size_t length, int32_t *fds, unsigned if (recvmsg(fd_, &msg, 0) < 0) { int ret = -errno; - LOG(IPCUnixSocket, Error) - << "Failed to recvmsg: " << strerror(-ret); + if (ret != -EAGAIN) + LOG(IPCUnixSocket, Error) + << "Failed to recvmsg: " << strerror(-ret); return ret; } @@ -303,6 +308,35 @@ int IPCUnixSocket::recvData(void *buffer, size_t length, int32_t *fds, unsigned void IPCUnixSocket::dataNotifier(EventNotifier *notifier) { + int ret; + + if (!headerReceived_) { + /* Receive the header. */ + ret = ::recv(fd_, &header_, sizeof(header_), 0); + if (ret < 0) { + ret = -errno; + LOG(IPCUnixSocket, Error) + << "Failed to receive header: " << strerror(-ret); + return; + } + + headerReceived_ = true; + } + + /* + * If the payload has arrived, disable the notifier and emit the + * readyRead signal. The notifier will be reenabled by the receive() + * method. + */ + struct pollfd fds = { fd_, POLLIN, 0 }; + ret = poll(&fds, 1, 0); + if (ret < 0) + return; + + if (!(fds.revents & POLLIN)) + return; + + notifier_->setEnabled(false); readyRead.emit(this); }
Blocking socket operation when receiving messages may lead to long delays, and possibly a complete deadlock, if the remote side delays sending of the payload after the header, or doesn't send the payload at all. To avoid this, make the socket non-blocking and implement a simple state machine to receive the header synchronously with the socket read notification. The payload read is still synchronous with the receive() method to avoid data copies. Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com> --- src/libcamera/include/ipc_unixsocket.h | 2 + src/libcamera/ipc_unixsocket.cpp | 88 ++++++++++++++++++-------- 2 files changed, 63 insertions(+), 27 deletions(-)