[libcamera-devel,v8,7/7] android: post_processor: Make post processing async
diff mbox series

Message ID 20211026072148.164831-8-umang.jain@ideasonboard.com
State Accepted
Commit b1cefe38f360915ad597ab2934c009bd1e46d10d
Headers show
Series
  • Async Post Processor
Related show

Commit Message

Umang Jain Oct. 26, 2021, 7:21 a.m. UTC
Introduce a dedicated worker class derived from libcamera::Thread.
The worker class maintains a queue for post-processing requests
and waits for a post-processing request to become available.
It will process them as per FIFO before de-queuing it from the
queue.

The entire post-processing handling iteration is locked under
streamsProcessMutex_ which helps us to queue all the post-processing
request at once, before any of the post-processing completion slot
(streamProcessingComplete()) is allowed to run for post-processing
requests completing in parallel. This helps us to manage both
synchronous and asynchronous errors encountered during the entire
post processing operation. Since a post-processing operation can
even complete after CameraDevice::requestComplete() has returned,
we need to check and complete the descriptor from
streamProcessingComplete() running in the PostProcessorWorker's
thread.

This patch also implements a flush() for the PostProcessorWorker
class which is responsible to purge post-processing requests
queued up while a camera is stopping/flushing. It is hooked with
CameraStream::flush(), which isn't used currently but will be
used when we handle flush/stop scenarios in greater detail
subsequently (in a different patchset).

The libcamera request completion handler CameraDevice::requestComplete()
assumes that the request that has just completed is at the front of the
queue. Now that the post-processor runs asynchronously, this isn't true
anymore, a request being post-processed will stay in the queue and a new
libcamera request may complete. Remove that assumption, and use the
request cookie to obtain the Camera3RequestDescriptor.

Signed-off-by: Umang Jain <umang.jain@ideasonboard.com>
Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
Reviewed-by: Hirokazu Honda <hiroh@chromium.org>
---
 src/android/camera_device.cpp |  48 +++++++---------
 src/android/camera_stream.cpp | 101 ++++++++++++++++++++++++++++++++--
 src/android/camera_stream.h   |  38 ++++++++++++-
 3 files changed, 153 insertions(+), 34 deletions(-)

Comments

Laurent Pinchart Oct. 26, 2021, 10:05 a.m. UTC | #1
Hi Umang,

Thank you for the patch.

On Tue, Oct 26, 2021 at 12:51:48PM +0530, Umang Jain wrote:
> Introduce a dedicated worker class derived from libcamera::Thread.
> The worker class maintains a queue for post-processing requests
> and waits for a post-processing request to become available.
> It will process them as per FIFO before de-queuing it from the
> queue.
> 
> The entire post-processing handling iteration is locked under
> streamsProcessMutex_ which helps us to queue all the post-processing
> request at once, before any of the post-processing completion slot
> (streamProcessingComplete()) is allowed to run for post-processing
> requests completing in parallel. This helps us to manage both
> synchronous and asynchronous errors encountered during the entire
> post processing operation. Since a post-processing operation can
> even complete after CameraDevice::requestComplete() has returned,
> we need to check and complete the descriptor from
> streamProcessingComplete() running in the PostProcessorWorker's
> thread.
> 
> This patch also implements a flush() for the PostProcessorWorker
> class which is responsible to purge post-processing requests
> queued up while a camera is stopping/flushing. It is hooked with
> CameraStream::flush(), which isn't used currently but will be
> used when we handle flush/stop scenarios in greater detail
> subsequently (in a different patchset).
> 
> The libcamera request completion handler CameraDevice::requestComplete()
> assumes that the request that has just completed is at the front of the
> queue. Now that the post-processor runs asynchronously, this isn't true
> anymore, a request being post-processed will stay in the queue and a new
> libcamera request may complete. Remove that assumption, and use the
> request cookie to obtain the Camera3RequestDescriptor.
> 
> Signed-off-by: Umang Jain <umang.jain@ideasonboard.com>
> Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
> Reviewed-by: Hirokazu Honda <hiroh@chromium.org>

Reviewed-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>

> ---
>  src/android/camera_device.cpp |  48 +++++++---------
>  src/android/camera_stream.cpp | 101 ++++++++++++++++++++++++++++++++--
>  src/android/camera_stream.h   |  38 ++++++++++++-
>  3 files changed, 153 insertions(+), 34 deletions(-)
> 
> diff --git a/src/android/camera_device.cpp b/src/android/camera_device.cpp
> index fed539f3..f2e0bdbd 100644
> --- a/src/android/camera_device.cpp
> +++ b/src/android/camera_device.cpp
> @@ -1027,29 +1027,8 @@ int CameraDevice::processCaptureRequest(camera3_capture_request_t *camera3Reques
>  
>  void CameraDevice::requestComplete(Request *request)
>  {
> -	Camera3RequestDescriptor *descriptor;
> -	{
> -		MutexLocker descriptorsLock(descriptorsMutex_);
> -		ASSERT(!descriptors_.empty());
> -		descriptor = descriptors_.front().get();
> -	}
> -
> -	if (descriptor->request_->cookie() != request->cookie()) {
> -		/*
> -		 * \todo Clarify if the Camera has to be closed on
> -		 * ERROR_DEVICE.
> -		 */
> -		LOG(HAL, Error)
> -			<< "Out-of-order completion for request "
> -			<< utils::hex(request->cookie());
> -
> -		MutexLocker descriptorsLock(descriptorsMutex_);
> -		descriptors_.pop();
> -
> -		notifyError(0, nullptr, CAMERA3_MSG_ERROR_DEVICE);
> -
> -		return;
> -	}
> +	Camera3RequestDescriptor *descriptor =
> +		reinterpret_cast<Camera3RequestDescriptor *>(request->cookie());
>  
>  	/*
>  	 * Prepare the capture result for the Android camera stack.
> @@ -1124,9 +1103,13 @@ void CameraDevice::requestComplete(Request *request)
>  	}
>  
>  	/* Handle post-processing. */
> +	MutexLocker locker(descriptor->streamsProcessMutex_);
> +
>  	/*
> -	 * \todo Protect the loop below with streamsProcessMutex_ when post
> -	 * processor runs asynchronously.
> +	 * Queue all the post-processing streams request at once. The completion
> +	 * slot streamProcessingComplete() can only execute when we are out
> +	 * this critical section. This helps to handle synchronous errors here
> +	 * itself.
>  	 */
>  	auto iter = descriptor->pendingStreamsToProcess_.begin();
>  	while (iter != descriptor->pendingStreamsToProcess_.end()) {
> @@ -1158,8 +1141,10 @@ void CameraDevice::requestComplete(Request *request)
>  		}
>  	}
>  
> -	if (descriptor->pendingStreamsToProcess_.empty())
> +	if (descriptor->pendingStreamsToProcess_.empty()) {
> +		locker.unlock();
>  		completeDescriptor(descriptor);
> +	}
>  }
>  
>  void CameraDevice::completeDescriptor(Camera3RequestDescriptor *descriptor)
> @@ -1242,9 +1227,16 @@ void CameraDevice::streamProcessingComplete(Camera3RequestDescriptor::StreamBuff
>  		streamBuffer->stream->putBuffer(streamBuffer->internalBuffer);
>  
>  	Camera3RequestDescriptor *request = streamBuffer->request;
> -	MutexLocker locker(request->streamsProcessMutex_);
>  
> -	request->pendingStreamsToProcess_.erase(streamBuffer->stream);
> +	{
> +		MutexLocker locker(request->streamsProcessMutex_);
> +
> +		request->pendingStreamsToProcess_.erase(streamBuffer->stream);
> +		if (!request->pendingStreamsToProcess_.empty())
> +			return;
> +	}
> +
> +	completeDescriptor(streamBuffer->request);
>  }
>  
>  std::string CameraDevice::logPrefix() const
> diff --git a/src/android/camera_stream.cpp b/src/android/camera_stream.cpp
> index fed99022..9023c13c 100644
> --- a/src/android/camera_stream.cpp
> +++ b/src/android/camera_stream.cpp
> @@ -99,6 +99,7 @@ int CameraStream::configure()
>  		if (ret)
>  			return ret;
>  
> +		worker_ = std::make_unique<PostProcessorWorker>(postProcessor_.get());
>  		postProcessor_->processComplete.connect(
>  			this, [&](Camera3RequestDescriptor::StreamBuffer *streamBuffer,
>  				  PostProcessor::Status status) {
> @@ -112,6 +113,8 @@ int CameraStream::configure()
>  				cameraDevice_->streamProcessingComplete(streamBuffer,
>  									bufferStatus);
>  			});
> +
> +		worker_->start();
>  	}
>  
>  	if (type_ == Type::Internal) {
> @@ -178,10 +181,6 @@ int CameraStream::process(Camera3RequestDescriptor::StreamBuffer *streamBuffer)
>  		streamBuffer->fence = -1;
>  	}
>  
> -	/*
> -	 * \todo Buffer mapping and processing should be moved to a
> -	 * separate thread.
> -	 */
>  	const StreamConfiguration &output = configuration();
>  	streamBuffer->dstBuffer = std::make_unique<CameraBuffer>(
>  		*streamBuffer->camera3Buffer, output.pixelFormat, output.size,
> @@ -191,11 +190,19 @@ int CameraStream::process(Camera3RequestDescriptor::StreamBuffer *streamBuffer)
>  		return -EINVAL;
>  	}
>  
> -	postProcessor_->process(streamBuffer);
> +	worker_->queueRequest(streamBuffer);
>  
>  	return 0;
>  }
>  
> +void CameraStream::flush()
> +{
> +	if (!postProcessor_)
> +		return;
> +
> +	worker_->flush();
> +}
> +
>  FrameBuffer *CameraStream::getBuffer()
>  {
>  	if (!allocator_)
> @@ -223,3 +230,87 @@ void CameraStream::putBuffer(FrameBuffer *buffer)
>  
>  	buffers_.push_back(buffer);
>  }
> +
> +CameraStream::PostProcessorWorker::PostProcessorWorker(PostProcessor *postProcessor)
> +	: postProcessor_(postProcessor)
> +{
> +}
> +
> +CameraStream::PostProcessorWorker::~PostProcessorWorker()
> +{
> +	{
> +		libcamera::MutexLocker lock(mutex_);
> +		state_ = State::Stopped;
> +	}
> +
> +	cv_.notify_one();
> +	wait();
> +}
> +
> +void CameraStream::PostProcessorWorker::start()
> +{
> +	{
> +		libcamera::MutexLocker lock(mutex_);
> +		ASSERT(state_ != State::Running);
> +		state_ = State::Running;
> +	}
> +
> +	Thread::start();
> +}
> +
> +void CameraStream::PostProcessorWorker::queueRequest(Camera3RequestDescriptor::StreamBuffer *dest)
> +{
> +	{
> +		MutexLocker lock(mutex_);
> +		ASSERT(state_ == State::Running);
> +		requests_.push(dest);
> +	}
> +
> +	cv_.notify_one();
> +}
> +
> +void CameraStream::PostProcessorWorker::run()
> +{
> +	MutexLocker locker(mutex_);
> +
> +	while (1) {
> +		cv_.wait(locker, [&] {
> +			return state_ != State::Running || !requests_.empty();
> +		});
> +
> +		if (state_ != State::Running)
> +			break;
> +
> +		Camera3RequestDescriptor::StreamBuffer *streamBuffer = requests_.front();
> +		requests_.pop();
> +		locker.unlock();
> +
> +		postProcessor_->process(streamBuffer);
> +
> +		locker.lock();
> +	}
> +
> +	if (state_ == State::Flushing) {
> +		std::queue<Camera3RequestDescriptor::StreamBuffer *> requests =
> +			std::move(requests_);
> +		locker.unlock();
> +
> +		while (!requests.empty()) {
> +			postProcessor_->processComplete.emit(
> +				requests.front(), PostProcessor::Status::Error);
> +			requests.pop();
> +		}
> +
> +		locker.lock();
> +		state_ = State::Stopped;
> +	}
> +}
> +
> +void CameraStream::PostProcessorWorker::flush()
> +{
> +	libcamera::MutexLocker lock(mutex_);
> +	state_ = State::Flushing;
> +	lock.unlock();
> +
> +	cv_.notify_one();
> +}
> diff --git a/src/android/camera_stream.h b/src/android/camera_stream.h
> index e74a9a3b..0c402deb 100644
> --- a/src/android/camera_stream.h
> +++ b/src/android/camera_stream.h
> @@ -7,12 +7,16 @@
>  #ifndef __ANDROID_CAMERA_STREAM_H__
>  #define __ANDROID_CAMERA_STREAM_H__
>  
> +#include <condition_variable>
>  #include <memory>
>  #include <mutex>
> +#include <queue>
>  #include <vector>
>  
>  #include <hardware/camera3.h>
>  
> +#include <libcamera/base/thread.h>
> +
>  #include <libcamera/camera.h>
>  #include <libcamera/framebuffer.h>
>  #include <libcamera/framebuffer_allocator.h>
> @@ -20,9 +24,9 @@
>  #include <libcamera/pixel_format.h>
>  
>  #include "camera_request.h"
> +#include "post_processor.h"
>  
>  class CameraDevice;
> -class PostProcessor;
>  
>  class CameraStream
>  {
> @@ -124,8 +128,38 @@ public:
>  	int process(Camera3RequestDescriptor::StreamBuffer *streamBuffer);
>  	libcamera::FrameBuffer *getBuffer();
>  	void putBuffer(libcamera::FrameBuffer *buffer);
> +	void flush();
>  
>  private:
> +	class PostProcessorWorker : public libcamera::Thread
> +	{
> +	public:
> +		enum class State {
> +			Stopped,
> +			Running,
> +			Flushing,
> +		};
> +
> +		PostProcessorWorker(PostProcessor *postProcessor);
> +		~PostProcessorWorker();
> +
> +		void start();
> +		void queueRequest(Camera3RequestDescriptor::StreamBuffer *request);
> +		void flush();
> +
> +	protected:
> +		void run() override;
> +
> +	private:
> +		PostProcessor *postProcessor_;
> +
> +		libcamera::Mutex mutex_;
> +		std::condition_variable cv_;
> +
> +		std::queue<Camera3RequestDescriptor::StreamBuffer *> requests_;
> +		State state_ = State::Stopped;
> +	};
> +
>  	int waitFence(int fence);
>  
>  	CameraDevice *const cameraDevice_;
> @@ -142,6 +176,8 @@ private:
>  	 */
>  	std::unique_ptr<std::mutex> mutex_;
>  	std::unique_ptr<PostProcessor> postProcessor_;
> +
> +	std::unique_ptr<PostProcessorWorker> worker_;
>  };
>  
>  #endif /* __ANDROID_CAMERA_STREAM__ */

Patch
diff mbox series

diff --git a/src/android/camera_device.cpp b/src/android/camera_device.cpp
index fed539f3..f2e0bdbd 100644
--- a/src/android/camera_device.cpp
+++ b/src/android/camera_device.cpp
@@ -1027,29 +1027,8 @@  int CameraDevice::processCaptureRequest(camera3_capture_request_t *camera3Reques
 
 void CameraDevice::requestComplete(Request *request)
 {
-	Camera3RequestDescriptor *descriptor;
-	{
-		MutexLocker descriptorsLock(descriptorsMutex_);
-		ASSERT(!descriptors_.empty());
-		descriptor = descriptors_.front().get();
-	}
-
-	if (descriptor->request_->cookie() != request->cookie()) {
-		/*
-		 * \todo Clarify if the Camera has to be closed on
-		 * ERROR_DEVICE.
-		 */
-		LOG(HAL, Error)
-			<< "Out-of-order completion for request "
-			<< utils::hex(request->cookie());
-
-		MutexLocker descriptorsLock(descriptorsMutex_);
-		descriptors_.pop();
-
-		notifyError(0, nullptr, CAMERA3_MSG_ERROR_DEVICE);
-
-		return;
-	}
+	Camera3RequestDescriptor *descriptor =
+		reinterpret_cast<Camera3RequestDescriptor *>(request->cookie());
 
 	/*
 	 * Prepare the capture result for the Android camera stack.
@@ -1124,9 +1103,13 @@  void CameraDevice::requestComplete(Request *request)
 	}
 
 	/* Handle post-processing. */
+	MutexLocker locker(descriptor->streamsProcessMutex_);
+
 	/*
-	 * \todo Protect the loop below with streamsProcessMutex_ when post
-	 * processor runs asynchronously.
+	 * Queue all the post-processing streams request at once. The completion
+	 * slot streamProcessingComplete() can only execute when we are out
+	 * this critical section. This helps to handle synchronous errors here
+	 * itself.
 	 */
 	auto iter = descriptor->pendingStreamsToProcess_.begin();
 	while (iter != descriptor->pendingStreamsToProcess_.end()) {
@@ -1158,8 +1141,10 @@  void CameraDevice::requestComplete(Request *request)
 		}
 	}
 
-	if (descriptor->pendingStreamsToProcess_.empty())
+	if (descriptor->pendingStreamsToProcess_.empty()) {
+		locker.unlock();
 		completeDescriptor(descriptor);
+	}
 }
 
 void CameraDevice::completeDescriptor(Camera3RequestDescriptor *descriptor)
@@ -1242,9 +1227,16 @@  void CameraDevice::streamProcessingComplete(Camera3RequestDescriptor::StreamBuff
 		streamBuffer->stream->putBuffer(streamBuffer->internalBuffer);
 
 	Camera3RequestDescriptor *request = streamBuffer->request;
-	MutexLocker locker(request->streamsProcessMutex_);
 
-	request->pendingStreamsToProcess_.erase(streamBuffer->stream);
+	{
+		MutexLocker locker(request->streamsProcessMutex_);
+
+		request->pendingStreamsToProcess_.erase(streamBuffer->stream);
+		if (!request->pendingStreamsToProcess_.empty())
+			return;
+	}
+
+	completeDescriptor(streamBuffer->request);
 }
 
 std::string CameraDevice::logPrefix() const
diff --git a/src/android/camera_stream.cpp b/src/android/camera_stream.cpp
index fed99022..9023c13c 100644
--- a/src/android/camera_stream.cpp
+++ b/src/android/camera_stream.cpp
@@ -99,6 +99,7 @@  int CameraStream::configure()
 		if (ret)
 			return ret;
 
+		worker_ = std::make_unique<PostProcessorWorker>(postProcessor_.get());
 		postProcessor_->processComplete.connect(
 			this, [&](Camera3RequestDescriptor::StreamBuffer *streamBuffer,
 				  PostProcessor::Status status) {
@@ -112,6 +113,8 @@  int CameraStream::configure()
 				cameraDevice_->streamProcessingComplete(streamBuffer,
 									bufferStatus);
 			});
+
+		worker_->start();
 	}
 
 	if (type_ == Type::Internal) {
@@ -178,10 +181,6 @@  int CameraStream::process(Camera3RequestDescriptor::StreamBuffer *streamBuffer)
 		streamBuffer->fence = -1;
 	}
 
-	/*
-	 * \todo Buffer mapping and processing should be moved to a
-	 * separate thread.
-	 */
 	const StreamConfiguration &output = configuration();
 	streamBuffer->dstBuffer = std::make_unique<CameraBuffer>(
 		*streamBuffer->camera3Buffer, output.pixelFormat, output.size,
@@ -191,11 +190,19 @@  int CameraStream::process(Camera3RequestDescriptor::StreamBuffer *streamBuffer)
 		return -EINVAL;
 	}
 
-	postProcessor_->process(streamBuffer);
+	worker_->queueRequest(streamBuffer);
 
 	return 0;
 }
 
+void CameraStream::flush()
+{
+	if (!postProcessor_)
+		return;
+
+	worker_->flush();
+}
+
 FrameBuffer *CameraStream::getBuffer()
 {
 	if (!allocator_)
@@ -223,3 +230,87 @@  void CameraStream::putBuffer(FrameBuffer *buffer)
 
 	buffers_.push_back(buffer);
 }
+
+CameraStream::PostProcessorWorker::PostProcessorWorker(PostProcessor *postProcessor)
+	: postProcessor_(postProcessor)
+{
+}
+
+CameraStream::PostProcessorWorker::~PostProcessorWorker()
+{
+	{
+		libcamera::MutexLocker lock(mutex_);
+		state_ = State::Stopped;
+	}
+
+	cv_.notify_one();
+	wait();
+}
+
+void CameraStream::PostProcessorWorker::start()
+{
+	{
+		libcamera::MutexLocker lock(mutex_);
+		ASSERT(state_ != State::Running);
+		state_ = State::Running;
+	}
+
+	Thread::start();
+}
+
+void CameraStream::PostProcessorWorker::queueRequest(Camera3RequestDescriptor::StreamBuffer *dest)
+{
+	{
+		MutexLocker lock(mutex_);
+		ASSERT(state_ == State::Running);
+		requests_.push(dest);
+	}
+
+	cv_.notify_one();
+}
+
+void CameraStream::PostProcessorWorker::run()
+{
+	MutexLocker locker(mutex_);
+
+	while (1) {
+		cv_.wait(locker, [&] {
+			return state_ != State::Running || !requests_.empty();
+		});
+
+		if (state_ != State::Running)
+			break;
+
+		Camera3RequestDescriptor::StreamBuffer *streamBuffer = requests_.front();
+		requests_.pop();
+		locker.unlock();
+
+		postProcessor_->process(streamBuffer);
+
+		locker.lock();
+	}
+
+	if (state_ == State::Flushing) {
+		std::queue<Camera3RequestDescriptor::StreamBuffer *> requests =
+			std::move(requests_);
+		locker.unlock();
+
+		while (!requests.empty()) {
+			postProcessor_->processComplete.emit(
+				requests.front(), PostProcessor::Status::Error);
+			requests.pop();
+		}
+
+		locker.lock();
+		state_ = State::Stopped;
+	}
+}
+
+void CameraStream::PostProcessorWorker::flush()
+{
+	libcamera::MutexLocker lock(mutex_);
+	state_ = State::Flushing;
+	lock.unlock();
+
+	cv_.notify_one();
+}
diff --git a/src/android/camera_stream.h b/src/android/camera_stream.h
index e74a9a3b..0c402deb 100644
--- a/src/android/camera_stream.h
+++ b/src/android/camera_stream.h
@@ -7,12 +7,16 @@ 
 #ifndef __ANDROID_CAMERA_STREAM_H__
 #define __ANDROID_CAMERA_STREAM_H__
 
+#include <condition_variable>
 #include <memory>
 #include <mutex>
+#include <queue>
 #include <vector>
 
 #include <hardware/camera3.h>
 
+#include <libcamera/base/thread.h>
+
 #include <libcamera/camera.h>
 #include <libcamera/framebuffer.h>
 #include <libcamera/framebuffer_allocator.h>
@@ -20,9 +24,9 @@ 
 #include <libcamera/pixel_format.h>
 
 #include "camera_request.h"
+#include "post_processor.h"
 
 class CameraDevice;
-class PostProcessor;
 
 class CameraStream
 {
@@ -124,8 +128,38 @@  public:
 	int process(Camera3RequestDescriptor::StreamBuffer *streamBuffer);
 	libcamera::FrameBuffer *getBuffer();
 	void putBuffer(libcamera::FrameBuffer *buffer);
+	void flush();
 
 private:
+	class PostProcessorWorker : public libcamera::Thread
+	{
+	public:
+		enum class State {
+			Stopped,
+			Running,
+			Flushing,
+		};
+
+		PostProcessorWorker(PostProcessor *postProcessor);
+		~PostProcessorWorker();
+
+		void start();
+		void queueRequest(Camera3RequestDescriptor::StreamBuffer *request);
+		void flush();
+
+	protected:
+		void run() override;
+
+	private:
+		PostProcessor *postProcessor_;
+
+		libcamera::Mutex mutex_;
+		std::condition_variable cv_;
+
+		std::queue<Camera3RequestDescriptor::StreamBuffer *> requests_;
+		State state_ = State::Stopped;
+	};
+
 	int waitFence(int fence);
 
 	CameraDevice *const cameraDevice_;
@@ -142,6 +176,8 @@  private:
 	 */
 	std::unique_ptr<std::mutex> mutex_;
 	std::unique_ptr<PostProcessor> postProcessor_;
+
+	std::unique_ptr<PostProcessorWorker> worker_;
 };
 
 #endif /* __ANDROID_CAMERA_STREAM__ */