[libcamera-devel,v4,5/7] android: post_processor: Make post processing async
diff mbox series

Message ID 20211011073505.243864-6-umang.jain@ideasonboard.com
State Changes Requested
Delegated to: Umang Jain
Headers show
Series
  • Async Post Processor
Related show

Commit Message

Umang Jain Oct. 11, 2021, 7:35 a.m. UTC
Introduce a dedicated worker class derived from libcamera::Thread.
The worker class maintains a queue for post-processing requests
and waits for a post-processing request to become available.
It will process them as per FIFO before de-queuing it from the
queue.

To get access to the source and destination buffers in the worker
thread, we also need to save a pointer to them in the
Camera3RequestDescriptor.

This patch also implements a flush() for the PostProcessorWorker
class which is responsible to purge post-processing requests
queued up while a camera is stopping/flushing.

The libcamera request completion handler CameraDevice::requestComplete()
assumes that the request that has just completed is at the front of the
queue. Now that the post-processor runs asynchronously, this isn't true
anymore, a request being post-processed will stay in the queue and a new
libcamera request may complete. Remove that assumption, and use the
request cookie to obtain the Camera3RequestDescriptor.

Signed-off-by: Umang Jain <umang.jain@ideasonboard.com>
Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
---
 src/android/camera_device.cpp |  25 +-------
 src/android/camera_device.h   |   3 +
 src/android/camera_stream.cpp | 108 +++++++++++++++++++++++++++++++---
 src/android/camera_stream.h   |  39 +++++++++++-
 4 files changed, 144 insertions(+), 31 deletions(-)

Comments

Umang Jain Oct. 11, 2021, 6:17 p.m. UTC | #1
Hi me,

On 10/11/21 1:05 PM, Umang Jain wrote:
> Introduce a dedicated worker class derived from libcamera::Thread.
> The worker class maintains a queue for post-processing requests
> and waits for a post-processing request to become available.
> It will process them as per FIFO before de-queuing it from the
> queue.
>
> To get access to the source and destination buffers in the worker
> thread, we also need to save a pointer to them in the
> Camera3RequestDescriptor.
>
> This patch also implements a flush() for the PostProcessorWorker
> class which is responsible to purge post-processing requests
> queued up while a camera is stopping/flushing.
>
> The libcamera request completion handler CameraDevice::requestComplete()
> assumes that the request that has just completed is at the front of the
> queue. Now that the post-processor runs asynchronously, this isn't true
> anymore, a request being post-processed will stay in the queue and a new
> libcamera request may complete. Remove that assumption, and use the
> request cookie to obtain the Camera3RequestDescriptor.
>
> Signed-off-by: Umang Jain <umang.jain@ideasonboard.com>
> Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
> ---
>   src/android/camera_device.cpp |  25 +-------
>   src/android/camera_device.h   |   3 +
>   src/android/camera_stream.cpp | 108 +++++++++++++++++++++++++++++++---
>   src/android/camera_stream.h   |  39 +++++++++++-
>   4 files changed, 144 insertions(+), 31 deletions(-)
>
> diff --git a/src/android/camera_device.cpp b/src/android/camera_device.cpp
> index eba370ea..61b902ad 100644
> --- a/src/android/camera_device.cpp
> +++ b/src/android/camera_device.cpp
> @@ -239,6 +239,7 @@ Camera3RequestDescriptor::Camera3RequestDescriptor(
>   	/* Clone the controls associated with the camera3 request. */
>   	settings_ = CameraMetadata(camera3Request->settings);
>   
> +	dest_.reset();
>   	/*
>   	 * Create the CaptureRequest, stored as a unique_ptr<> to tie its
>   	 * lifetime to the descriptor.
> @@ -1094,28 +1095,8 @@ int CameraDevice::processCaptureRequest(camera3_capture_request_t *camera3Reques
>   
>   void CameraDevice::requestComplete(Request *request)
>   {
> -	Camera3RequestDescriptor *descriptor;
> -	{
> -		MutexLocker descriptorsLock(descriptorsMutex_);
> -		ASSERT(!descriptors_.empty());
> -		descriptor = descriptors_.front().get();
> -	}
> -
> -	if (descriptor->request_->cookie() != request->cookie()) {
> -		/*
> -		 * \todo Clarify if the Camera has to be closed on
> -		 * ERROR_DEVICE and possibly demote the Fatal to simple
> -		 * Error.
> -		 */
> -		notifyError(0, nullptr, CAMERA3_MSG_ERROR_DEVICE);
> -		LOG(HAL, Fatal)
> -			<< "Out-of-order completion for request "
> -			<< utils::hex(request->cookie());
> -
> -		MutexLocker descriptorsLock(descriptorsMutex_);
> -		descriptors_.pop();
> -		return;
> -	}
> +	Camera3RequestDescriptor *descriptor =
> +		reinterpret_cast<Camera3RequestDescriptor *>(request->cookie());
>   
>   	/*
>   	 * Prepare the capture result for the Android camera stack.
> diff --git a/src/android/camera_device.h b/src/android/camera_device.h
> index eee97516..725a0618 100644
> --- a/src/android/camera_device.h
> +++ b/src/android/camera_device.h
> @@ -59,6 +59,9 @@ struct Camera3RequestDescriptor {
>   	std::unique_ptr<CameraMetadata> resultMetadata_;
>   	libcamera::FrameBuffer *internalBuffer_;
>   
> +	std::unique_ptr<CameraBuffer> dest_;
> +	const libcamera::FrameBuffer *src_;
> +
>   	camera3_capture_result_t captureResult_ = {};
>   	Status status_ = Status::Pending;
>   };
> diff --git a/src/android/camera_stream.cpp b/src/android/camera_stream.cpp
> index cec07269..818ef948 100644
> --- a/src/android/camera_stream.cpp
> +++ b/src/android/camera_stream.cpp
> @@ -94,10 +94,12 @@ int CameraStream::configure()
>   		if (ret)
>   			return ret;
>   
> +		worker_ = std::make_unique<PostProcessorWorker>(postProcessor_.get());
>   		postProcessor_->processComplete.connect(
>   			this, [&](Camera3RequestDescriptor *request, PostProcessor::Status status) {
>   				cameraDevice_->streamProcessingComplete(this, request, status);
>   			});
> +		worker_->start();
>   	}
>   
>   	if (type_ == Type::Internal) {
> @@ -167,19 +169,26 @@ void CameraStream::process(const FrameBuffer &source,
>   	if (!postProcessor_)
>   		return;
>   
> -	/*
> -	 * \todo Buffer mapping and processing should be moved to a
> -	 * separate thread.
> -	 */
>   	const StreamConfiguration &output = configuration();
> -	CameraBuffer dest(*camera3Dest.buffer, output.pixelFormat, output.size,
> -			  PROT_READ | PROT_WRITE);
> -	if (!dest.isValid()) {
> +	request->dest_ = std::make_unique<CameraBuffer>(
> +		*camera3Dest.buffer, output.pixelFormat, output.size, PROT_READ | PROT_WRITE);
> +	if (!request->dest_->isValid()) {
>   		LOG(HAL, Error) << "Failed to create destination buffer";
>   		return;
>   	}
>   
> -	postProcessor_->process(source, &dest, request);
> +	request->src_ = &source;
> +
> +	/* Push the postProcessor request to the worker queue. */
> +	worker_->queueRequest(request);
> +}
> +
> +void CameraStream::flush()
> +{
> +	if (!postProcessor_)
> +		return;
> +
> +	worker_->flush();
>   }
>   
>   FrameBuffer *CameraStream::getBuffer()
> @@ -209,3 +218,86 @@ void CameraStream::putBuffer(FrameBuffer *buffer)
>   
>   	buffers_.push_back(buffer);
>   }
> +
> +CameraStream::PostProcessorWorker::PostProcessorWorker(PostProcessor *postProcessor)
> +	: postProcessor_(postProcessor)
> +{
> +}
> +
> +CameraStream::PostProcessorWorker::~PostProcessorWorker()
> +{
> +	{
> +		libcamera::MutexLocker lock(mutex_);
> +		state_ = State::Stopped;
> +	}
> +
> +	cv_.notify_one();
> +	wait();
> +}
> +
> +void CameraStream::PostProcessorWorker::start()
> +{
> +	{
> +		libcamera::MutexLocker lock(mutex_);
> +		state_ = State::Running;
> +	}
> +
> +	Thread::start();
> +}
> +
> +void CameraStream::PostProcessorWorker::queueRequest(Camera3RequestDescriptor *request)
> +{
> +	{
> +		MutexLocker lock(mutex_);
> +		ASSERT(state_ == State::Running);
> +		requests_.push(request);
> +	}
> +	cv_.notify_one();
> +}
> +
> +void CameraStream::PostProcessorWorker::run()
> +{
> +	MutexLocker locker(mutex_);
> +
> +	while (1) {
> +		cv_.wait(locker, [&] {
> +			return state_ != State::Running || !requests_.empty();
> +		});
> +
> +		if (state_ != State::Running)
> +			break;
> +
> +		Camera3RequestDescriptor *descriptor = requests_.front();
> +		requests_.pop();
> +		locker.unlock();
> +
> +		postProcessor_->process(*descriptor->src_, descriptor->dest_.get(),
> +					descriptor);
> +
> +		locker.lock();
> +	}
> +
> +	if (state_ == State::Flushing) {
> +		while (!requests_.empty()) {
> +			postProcessor_->processComplete.emit(requests_.front(),
> +							     PostProcessor::Status::Error);
> +			requests_.pop();
> +		}
> +		state_ = State::Stopped;
> +		locker.unlock();
> +		cv_.notify_one();
> +	}
> +}
> +
> +void CameraStream::PostProcessorWorker::flush()
> +{
> +	libcamera::MutexLocker lock(mutex_);
> +	state_ = State::Flushing;
> +	lock.unlock();
> +	cv_.notify_one();
> +
> +	lock.lock();
> +	cv_.wait(lock, [&] {


Mental note:

This can add a lot of latency to the flush op since it's happening on
the main thread. I think the two queues (the post-processing queue and
the descriptors queue) can be flushed out independently, without
waiting for one another.

One still needs to set the error state on descriptors/buffers, so that
will require some thinking (since you can't iterate over a std::queue;
you can only set the error on the front element and pop() it, even for
error setting). This might introduce another process_capture_results()
callback outside sendCaptureResults (which is a divergence from the
ideal, but we may need to bite that bullet). The goal here is to be as
fast as possible on the flush op, to clear the queues. As an experiment,
one can measure the actual latency of flush() on master, to get a common
baseline figure to compare with the latency of this series.


> +		return state_ == State::Stopped;
> +	});
> +}
> diff --git a/src/android/camera_stream.h b/src/android/camera_stream.h
> index a0c5f166..e410f35d 100644
> --- a/src/android/camera_stream.h
> +++ b/src/android/camera_stream.h
> @@ -7,21 +7,26 @@
>   #ifndef __ANDROID_CAMERA_STREAM_H__
>   #define __ANDROID_CAMERA_STREAM_H__
>   
> +#include <condition_variable>
>   #include <memory>
>   #include <mutex>
> +#include <queue>
>   #include <vector>
>   
>   #include <hardware/camera3.h>
>   
> +#include <libcamera/base/thread.h>
> +
>   #include <libcamera/camera.h>
>   #include <libcamera/framebuffer.h>
>   #include <libcamera/framebuffer_allocator.h>
>   #include <libcamera/geometry.h>
>   #include <libcamera/pixel_format.h>
>   
> +#include "post_processor.h"
> +
>   class CameraDevice;
>   class CameraMetadata;
> -class PostProcessor;
>   
>   struct Camera3RequestDescriptor;
>   
> @@ -125,8 +130,38 @@ public:
>   		     Camera3RequestDescriptor *request);
>   	libcamera::FrameBuffer *getBuffer();
>   	void putBuffer(libcamera::FrameBuffer *buffer);
> +	void flush();
>   
>   private:
> +	class PostProcessorWorker : public libcamera::Thread
> +	{
> +	public:
> +		enum class State {
> +			Stopped,
> +			Running,
> +			Flushing,
> +		};
> +
> +		PostProcessorWorker(PostProcessor *postProcessor);
> +		~PostProcessorWorker();
> +
> +		void start();
> +		void queueRequest(Camera3RequestDescriptor *request);
> +		void flush();
> +
> +	protected:
> +		void run() override;
> +
> +	private:
> +		PostProcessor *postProcessor_;
> +
> +		libcamera::Mutex mutex_;
> +		std::condition_variable cv_;
> +
> +		std::queue<Camera3RequestDescriptor *> requests_;
> +		State state_;
> +	};
> +
>   	int waitFence(int fence);
>   
>   	CameraDevice *const cameraDevice_;
> @@ -143,6 +178,8 @@ private:
>   	 */
>   	std::unique_ptr<std::mutex> mutex_;
>   	std::unique_ptr<PostProcessor> postProcessor_;
> +
> +	std::unique_ptr<PostProcessorWorker> worker_;
>   };
>   
>   #endif /* __ANDROID_CAMERA_STREAM__ */
Laurent Pinchart Oct. 12, 2021, 11:57 p.m. UTC | #2
Hi Umang,

Thank you for the patch.

s/async/asynchronous/ in the subject line.

On Mon, Oct 11, 2021 at 01:05:03PM +0530, Umang Jain wrote:
> Introduce a dedicated worker class derived from libcamera::Thread.
> The worker class maintains a queue for post-processing requests
> and waits for a post-processing request to become available.
> It will process them as per FIFO before de-queuing it from the
> queue.
> 
> To get access to the source and destination buffers in the worker
> thread, we also need to save a pointer to them in the
> Camera3RequestDescriptor.
> 
> This patch also implements a flush() for the PostProcessorWorker
> class which is responsible to purge post-processing requests
> queued up while a camera is stopping/flushing.
> 
> The libcamera request completion handler CameraDevice::requestComplete()
> assumes that the request that has just completed is at the front of the
> queue. Now that the post-processor runs asynchronously, this isn't true
> anymore, a request being post-processed will stay in the queue and a new
> libcamera request may complete. Remove that assumption, and use the
> request cookie to obtain the Camera3RequestDescriptor.
> 
> Signed-off-by: Umang Jain <umang.jain@ideasonboard.com>
> Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
> ---
>  src/android/camera_device.cpp |  25 +-------
>  src/android/camera_device.h   |   3 +
>  src/android/camera_stream.cpp | 108 +++++++++++++++++++++++++++++++---
>  src/android/camera_stream.h   |  39 +++++++++++-
>  4 files changed, 144 insertions(+), 31 deletions(-)
> 
> diff --git a/src/android/camera_device.cpp b/src/android/camera_device.cpp
> index eba370ea..61b902ad 100644
> --- a/src/android/camera_device.cpp
> +++ b/src/android/camera_device.cpp
> @@ -239,6 +239,7 @@ Camera3RequestDescriptor::Camera3RequestDescriptor(
>  	/* Clone the controls associated with the camera3 request. */
>  	settings_ = CameraMetadata(camera3Request->settings);
>  
> +	dest_.reset();

dest_ is a std::unique_ptr<>, its constructor will do the right thing,
you don't need to initialize it here.

>  	/*
>  	 * Create the CaptureRequest, stored as a unique_ptr<> to tie its
>  	 * lifetime to the descriptor.
> @@ -1094,28 +1095,8 @@ int CameraDevice::processCaptureRequest(camera3_capture_request_t *camera3Reques
>  
>  void CameraDevice::requestComplete(Request *request)
>  {
> -	Camera3RequestDescriptor *descriptor;
> -	{
> -		MutexLocker descriptorsLock(descriptorsMutex_);
> -		ASSERT(!descriptors_.empty());
> -		descriptor = descriptors_.front().get();
> -	}
> -
> -	if (descriptor->request_->cookie() != request->cookie()) {
> -		/*
> -		 * \todo Clarify if the Camera has to be closed on
> -		 * ERROR_DEVICE and possibly demote the Fatal to simple
> -		 * Error.
> -		 */
> -		notifyError(0, nullptr, CAMERA3_MSG_ERROR_DEVICE);
> -		LOG(HAL, Fatal)
> -			<< "Out-of-order completion for request "
> -			<< utils::hex(request->cookie());
> -
> -		MutexLocker descriptorsLock(descriptorsMutex_);
> -		descriptors_.pop();
> -		return;
> -	}
> +	Camera3RequestDescriptor *descriptor =
> +		reinterpret_cast<Camera3RequestDescriptor *>(request->cookie());
>  
>  	/*
>  	 * Prepare the capture result for the Android camera stack.
> diff --git a/src/android/camera_device.h b/src/android/camera_device.h
> index eee97516..725a0618 100644
> --- a/src/android/camera_device.h
> +++ b/src/android/camera_device.h
> @@ -59,6 +59,9 @@ struct Camera3RequestDescriptor {
>  	std::unique_ptr<CameraMetadata> resultMetadata_;
>  	libcamera::FrameBuffer *internalBuffer_;
>  
> +	std::unique_ptr<CameraBuffer> dest_;
> +	const libcamera::FrameBuffer *src_;

As mentioned in the review of the previous patch, you can have more than
one post-processed stream per request, so this won't be enough.

I'd recommend first refactoring the Camera3RequestDescriptor class to
add an internal

	struct Stream {
		camera3_stream_buffer_t buffer;
		std::unique_ptr<libcamera::FrameBuffer> frameBuffer;
	};

with the buffers_ and frameBuffers members replaced with

	std::vector<Stream> streams_;

Then you can extend the Stream structure in this patch to add the
necessary fields.

Thinking some more about it, src_ is likely not needed, as it's a
pointer to the FrameBuffer already stored in struct Stream. What you'll
need will be the ability to find the Stream instance corresponding to a
given libcamera stream, so maybe a map would be better than a vector.

It also seems like the PostProcessorWorker should move from processing
requests to processing streams, as there's one PostProcessorWorker for
each CameraStream. Maybe the struct Stream should contain a pointer to
its Camera3RequestDescriptor, that way you could pass the
Camera3RequestDescriptor::Stream pointer to CameraStream::process() and
to the post-processors, and then find the corresponding
Camera3RequestDescriptor in the completion handler.

> +
>  	camera3_capture_result_t captureResult_ = {};
>  	Status status_ = Status::Pending;
>  };
> diff --git a/src/android/camera_stream.cpp b/src/android/camera_stream.cpp
> index cec07269..818ef948 100644
> --- a/src/android/camera_stream.cpp
> +++ b/src/android/camera_stream.cpp
> @@ -94,10 +94,12 @@ int CameraStream::configure()
>  		if (ret)
>  			return ret;
>  
> +		worker_ = std::make_unique<PostProcessorWorker>(postProcessor_.get());
>  		postProcessor_->processComplete.connect(
>  			this, [&](Camera3RequestDescriptor *request, PostProcessor::Status status) {
>  				cameraDevice_->streamProcessingComplete(this, request, status);
>  			});
> +		worker_->start();
>  	}
>  
>  	if (type_ == Type::Internal) {
> @@ -167,19 +169,26 @@ void CameraStream::process(const FrameBuffer &source,
>  	if (!postProcessor_)
>  		return;
>  
> -	/*
> -	 * \todo Buffer mapping and processing should be moved to a
> -	 * separate thread.
> -	 */
>  	const StreamConfiguration &output = configuration();
> -	CameraBuffer dest(*camera3Dest.buffer, output.pixelFormat, output.size,
> -			  PROT_READ | PROT_WRITE);
> -	if (!dest.isValid()) {
> +	request->dest_ = std::make_unique<CameraBuffer>(
> +		*camera3Dest.buffer, output.pixelFormat, output.size, PROT_READ | PROT_WRITE);
> +	if (!request->dest_->isValid()) {
>  		LOG(HAL, Error) << "Failed to create destination buffer";
>  		return;
>  	}
>  
> -	postProcessor_->process(source, &dest, request);
> +	request->src_ = &source;
> +
> +	/* Push the postProcessor request to the worker queue. */
> +	worker_->queueRequest(request);
> +}
> +
> +void CameraStream::flush()
> +{
> +	if (!postProcessor_)
> +		return;
> +
> +	worker_->flush();
>  }
>  
>  FrameBuffer *CameraStream::getBuffer()
> @@ -209,3 +218,86 @@ void CameraStream::putBuffer(FrameBuffer *buffer)
>  
>  	buffers_.push_back(buffer);
>  }
> +
> +CameraStream::PostProcessorWorker::PostProcessorWorker(PostProcessor *postProcessor)
> +	: postProcessor_(postProcessor)
> +{
> +}
> +
> +CameraStream::PostProcessorWorker::~PostProcessorWorker()
> +{
> +	{
> +		libcamera::MutexLocker lock(mutex_);
> +		state_ = State::Stopped;
> +	}
> +
> +	cv_.notify_one();
> +	wait();
> +}
> +
> +void CameraStream::PostProcessorWorker::start()
> +{
> +	{
> +		libcamera::MutexLocker lock(mutex_);
> +		state_ = State::Running;
> +	}
> +
> +	Thread::start();
> +}
> +
> +void CameraStream::PostProcessorWorker::queueRequest(Camera3RequestDescriptor *request)
> +{
> +	{
> +		MutexLocker lock(mutex_);
> +		ASSERT(state_ == State::Running);
> +		requests_.push(request);
> +	}
> +	cv_.notify_one();
> +}
> +
> +void CameraStream::PostProcessorWorker::run()
> +{
> +	MutexLocker locker(mutex_);
> +
> +	while (1) {
> +		cv_.wait(locker, [&] {
> +			return state_ != State::Running || !requests_.empty();
> +		});
> +
> +		if (state_ != State::Running)
> +			break;
> +
> +		Camera3RequestDescriptor *descriptor = requests_.front();
> +		requests_.pop();
> +		locker.unlock();
> +
> +		postProcessor_->process(*descriptor->src_, descriptor->dest_.get(),
> +					descriptor);
> +
> +		locker.lock();
> +	}
> +
> +	if (state_ == State::Flushing) {
> +		while (!requests_.empty()) {
> +			postProcessor_->processComplete.emit(requests_.front(),
> +							     PostProcessor::Status::Error);
> +			requests_.pop();
> +		}
> +		state_ = State::Stopped;
> +		locker.unlock();
> +		cv_.notify_one();
> +	}
> +}
> +
> +void CameraStream::PostProcessorWorker::flush()
> +{
> +	libcamera::MutexLocker lock(mutex_);
> +	state_ = State::Flushing;
> +	lock.unlock();
> +	cv_.notify_one();
> +
> +	lock.lock();
> +	cv_.wait(lock, [&] {
> +		return state_ == State::Stopped;
> +	});
> +}
> diff --git a/src/android/camera_stream.h b/src/android/camera_stream.h
> index a0c5f166..e410f35d 100644
> --- a/src/android/camera_stream.h
> +++ b/src/android/camera_stream.h
> @@ -7,21 +7,26 @@
>  #ifndef __ANDROID_CAMERA_STREAM_H__
>  #define __ANDROID_CAMERA_STREAM_H__
>  
> +#include <condition_variable>
>  #include <memory>
>  #include <mutex>
> +#include <queue>
>  #include <vector>
>  
>  #include <hardware/camera3.h>
>  
> +#include <libcamera/base/thread.h>
> +
>  #include <libcamera/camera.h>
>  #include <libcamera/framebuffer.h>
>  #include <libcamera/framebuffer_allocator.h>
>  #include <libcamera/geometry.h>
>  #include <libcamera/pixel_format.h>
>  
> +#include "post_processor.h"
> +
>  class CameraDevice;
>  class CameraMetadata;
> -class PostProcessor;
>  
>  struct Camera3RequestDescriptor;
>  
> @@ -125,8 +130,38 @@ public:
>  		     Camera3RequestDescriptor *request);
>  	libcamera::FrameBuffer *getBuffer();
>  	void putBuffer(libcamera::FrameBuffer *buffer);
> +	void flush();
>  
>  private:
> +	class PostProcessorWorker : public libcamera::Thread
> +	{
> +	public:
> +		enum class State {
> +			Stopped,
> +			Running,
> +			Flushing,
> +		};
> +
> +		PostProcessorWorker(PostProcessor *postProcessor);
> +		~PostProcessorWorker();
> +
> +		void start();
> +		void queueRequest(Camera3RequestDescriptor *request);
> +		void flush();
> +
> +	protected:
> +		void run() override;
> +
> +	private:
> +		PostProcessor *postProcessor_;
> +
> +		libcamera::Mutex mutex_;
> +		std::condition_variable cv_;
> +
> +		std::queue<Camera3RequestDescriptor *> requests_;
> +		State state_;
> +	};
> +
>  	int waitFence(int fence);
>  
>  	CameraDevice *const cameraDevice_;
> @@ -143,6 +178,8 @@ private:
>  	 */
>  	std::unique_ptr<std::mutex> mutex_;
>  	std::unique_ptr<PostProcessor> postProcessor_;
> +
> +	std::unique_ptr<PostProcessorWorker> worker_;
>  };
>  
>  #endif /* __ANDROID_CAMERA_STREAM__ */
Laurent Pinchart Oct. 13, 2021, 12:02 a.m. UTC | #3
On Wed, Oct 13, 2021 at 02:57:16AM +0300, Laurent Pinchart wrote:
> Hi Umang,
> 
> Thank you for the patch.
> 
> s/async/asynchronous/ in the subject line.
> 
> On Mon, Oct 11, 2021 at 01:05:03PM +0530, Umang Jain wrote:
> > Introduce a dedicated worker class derived from libcamera::Thread.
> > The worker class maintains a queue for post-processing requests
> > and waits for a post-processing request to become available.
> > It will process them as per FIFO before de-queuing it from the
> > queue.
> > 
> > To get access to the source and destination buffers in the worker
> > thread, we also need to save a pointer to them in the
> > Camera3RequestDescriptor.
> > 
> > This patch also implements a flush() for the PostProcessorWorker
> > class which is responsible to purge post-processing requests
> > queued up while a camera is stopping/flushing.
> > 
> > The libcamera request completion handler CameraDevice::requestComplete()
> > assumes that the request that has just completed is at the front of the
> > queue. Now that the post-processor runs asynchronously, this isn't true
> > anymore, a request being post-processed will stay in the queue and a new
> > libcamera request may complete. Remove that assumption, and use the
> > request cookie to obtain the Camera3RequestDescriptor.
> > 
> > Signed-off-by: Umang Jain <umang.jain@ideasonboard.com>
> > Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
> > ---
> >  src/android/camera_device.cpp |  25 +-------
> >  src/android/camera_device.h   |   3 +
> >  src/android/camera_stream.cpp | 108 +++++++++++++++++++++++++++++++---
> >  src/android/camera_stream.h   |  39 +++++++++++-
> >  4 files changed, 144 insertions(+), 31 deletions(-)
> > 
> > diff --git a/src/android/camera_device.cpp b/src/android/camera_device.cpp
> > index eba370ea..61b902ad 100644
> > --- a/src/android/camera_device.cpp
> > +++ b/src/android/camera_device.cpp
> > @@ -239,6 +239,7 @@ Camera3RequestDescriptor::Camera3RequestDescriptor(
> >  	/* Clone the controls associated with the camera3 request. */
> >  	settings_ = CameraMetadata(camera3Request->settings);
> >  
> > +	dest_.reset();
> 
> dest_ is a std::unique_ptr<>, its constructor will do the right thing,
> you don't need to initialize it here.
> 
> >  	/*
> >  	 * Create the CaptureRequest, stored as a unique_ptr<> to tie its
> >  	 * lifetime to the descriptor.
> > @@ -1094,28 +1095,8 @@ int CameraDevice::processCaptureRequest(camera3_capture_request_t *camera3Reques
> >  
> >  void CameraDevice::requestComplete(Request *request)
> >  {
> > -	Camera3RequestDescriptor *descriptor;
> > -	{
> > -		MutexLocker descriptorsLock(descriptorsMutex_);
> > -		ASSERT(!descriptors_.empty());
> > -		descriptor = descriptors_.front().get();
> > -	}
> > -
> > -	if (descriptor->request_->cookie() != request->cookie()) {
> > -		/*
> > -		 * \todo Clarify if the Camera has to be closed on
> > -		 * ERROR_DEVICE and possibly demote the Fatal to simple
> > -		 * Error.
> > -		 */
> > -		notifyError(0, nullptr, CAMERA3_MSG_ERROR_DEVICE);
> > -		LOG(HAL, Fatal)
> > -			<< "Out-of-order completion for request "
> > -			<< utils::hex(request->cookie());
> > -
> > -		MutexLocker descriptorsLock(descriptorsMutex_);
> > -		descriptors_.pop();
> > -		return;
> > -	}
> > +	Camera3RequestDescriptor *descriptor =
> > +		reinterpret_cast<Camera3RequestDescriptor *>(request->cookie());
> >  
> >  	/*
> >  	 * Prepare the capture result for the Android camera stack.
> > diff --git a/src/android/camera_device.h b/src/android/camera_device.h
> > index eee97516..725a0618 100644
> > --- a/src/android/camera_device.h
> > +++ b/src/android/camera_device.h
> > @@ -59,6 +59,9 @@ struct Camera3RequestDescriptor {
> >  	std::unique_ptr<CameraMetadata> resultMetadata_;
> >  	libcamera::FrameBuffer *internalBuffer_;
> >  
> > +	std::unique_ptr<CameraBuffer> dest_;
> > +	const libcamera::FrameBuffer *src_;
> 
> As mentioned in the review of the previous patch, you can have more than
> one post-processed stream per request, so this won't be enough.
> 
> > I'd recommend first refactoring the Camera3RequestDescriptor class to
> > add an internal
> 
> 	struct Stream {
> 		camera3_stream_buffer_t buffer;
> 		std::unique_ptr<libcamera::FrameBuffer> frameBuffer;
> 	};
> 
> with the buffers_ and frameBuffers members replaced with
> 
> 	std::vector<Stream> streams_;
> 
> Then you can extend the Stream structure in this patch to add the
> necessary fields.
> 
> Thinking some more about it, src_ is likely not needed, as it's a
> pointer to the FrameBuffer already stored in struct Stream. What you'll
> need will be the ability to find the Stream instance corresponding to a
> given libcamera stream, so maybe a map would be better than a vector.
> 
> It also seems like the PostProcessorWorker should move from processing
> requests to processing streams, as there's one PostProcessorWorker for
> each CameraStream. Maybe the struct Stream should contain a pointer to
> its Camera3RequestDescriptor, that way you could pass the
> Camera3RequestDescriptor::Stream pointer to CameraStream::process() and
> to the post-processors, and then find the corresponding
> Camera3RequestDescriptor in the completion handler.

By the way, another option may be to move the PostProcessorWorker to
CameraDevice and still give it a Camera3RequestDescriptor, and
internally in the thread run the post-processors sequentially for each
post-processed stream. If we had a lot of post-processed streams I would
say that would be a better design, as we could then create a thread
pool (with N threads for M streams, and N < M) and dispatch the jobs to
those threads, but that's overkill I think. Still, maybe a single-thread
design would be easier and look cleaner; I'm not sure.

> > +
> >  	camera3_capture_result_t captureResult_ = {};
> >  	Status status_ = Status::Pending;
> >  };
> > diff --git a/src/android/camera_stream.cpp b/src/android/camera_stream.cpp
> > index cec07269..818ef948 100644
> > --- a/src/android/camera_stream.cpp
> > +++ b/src/android/camera_stream.cpp
> > @@ -94,10 +94,12 @@ int CameraStream::configure()
> >  		if (ret)
> >  			return ret;
> >  
> > +		worker_ = std::make_unique<PostProcessorWorker>(postProcessor_.get());
> >  		postProcessor_->processComplete.connect(
> >  			this, [&](Camera3RequestDescriptor *request, PostProcessor::Status status) {
> >  				cameraDevice_->streamProcessingComplete(this, request, status);
> >  			});
> > +		worker_->start();
> >  	}
> >  
> >  	if (type_ == Type::Internal) {
> > @@ -167,19 +169,26 @@ void CameraStream::process(const FrameBuffer &source,
> >  	if (!postProcessor_)
> >  		return;
> >  
> > -	/*
> > -	 * \todo Buffer mapping and processing should be moved to a
> > -	 * separate thread.
> > -	 */
> >  	const StreamConfiguration &output = configuration();
> > -	CameraBuffer dest(*camera3Dest.buffer, output.pixelFormat, output.size,
> > -			  PROT_READ | PROT_WRITE);
> > -	if (!dest.isValid()) {
> > +	request->dest_ = std::make_unique<CameraBuffer>(
> > +		*camera3Dest.buffer, output.pixelFormat, output.size, PROT_READ | PROT_WRITE);
> > +	if (!request->dest_->isValid()) {
> >  		LOG(HAL, Error) << "Failed to create destination buffer";
> >  		return;
> >  	}
> >  
> > -	postProcessor_->process(source, &dest, request);
> > +	request->src_ = &source;
> > +
> > +	/* Push the postProcessor request to the worker queue. */
> > +	worker_->queueRequest(request);
> > +}
> > +
> > +void CameraStream::flush()
> > +{
> > +	if (!postProcessor_)
> > +		return;
> > +
> > +	worker_->flush();
> >  }
> >  
> >  FrameBuffer *CameraStream::getBuffer()
> > @@ -209,3 +218,86 @@ void CameraStream::putBuffer(FrameBuffer *buffer)
> >  
> >  	buffers_.push_back(buffer);
> >  }
> > +
> > +CameraStream::PostProcessorWorker::PostProcessorWorker(PostProcessor *postProcessor)
> > +	: postProcessor_(postProcessor)
> > +{
> > +}
> > +
> > +CameraStream::PostProcessorWorker::~PostProcessorWorker()
> > +{
> > +	{
> > +		libcamera::MutexLocker lock(mutex_);
> > +		state_ = State::Stopped;
> > +	}
> > +
> > +	cv_.notify_one();
> > +	wait();
> > +}
> > +
> > +void CameraStream::PostProcessorWorker::start()
> > +{
> > +	{
> > +		libcamera::MutexLocker lock(mutex_);
> > +		state_ = State::Running;
> > +	}
> > +
> > +	Thread::start();
> > +}
> > +
> > +void CameraStream::PostProcessorWorker::queueRequest(Camera3RequestDescriptor *request)
> > +{
> > +	{
> > +		MutexLocker lock(mutex_);
> > +		ASSERT(state_ == State::Running);
> > +		requests_.push(request);
> > +	}
> > +	cv_.notify_one();
> > +}
> > +
> > +void CameraStream::PostProcessorWorker::run()
> > +{
> > +	MutexLocker locker(mutex_);
> > +
> > +	while (1) {
> > +		cv_.wait(locker, [&] {
> > +			return state_ != State::Running || !requests_.empty();
> > +		});
> > +
> > +		if (state_ != State::Running)
> > +			break;
> > +
> > +		Camera3RequestDescriptor *descriptor = requests_.front();
> > +		requests_.pop();
> > +		locker.unlock();
> > +
> > +		postProcessor_->process(*descriptor->src_, descriptor->dest_.get(),
> > +					descriptor);
> > +
> > +		locker.lock();
> > +	}
> > +
> > +	if (state_ == State::Flushing) {
> > +		while (!requests_.empty()) {
> > +			postProcessor_->processComplete.emit(requests_.front(),
> > +							     PostProcessor::Status::Error);
> > +			requests_.pop();
> > +		}
> > +		state_ = State::Stopped;
> > +		locker.unlock();
> > +		cv_.notify_one();
> > +	}
> > +}
> > +
> > +void CameraStream::PostProcessorWorker::flush()
> > +{
> > +	libcamera::MutexLocker lock(mutex_);
> > +	state_ = State::Flushing;
> > +	lock.unlock();
> > +	cv_.notify_one();
> > +
> > +	lock.lock();
> > +	cv_.wait(lock, [&] {
> > +		return state_ == State::Stopped;
> > +	});
> > +}
> > diff --git a/src/android/camera_stream.h b/src/android/camera_stream.h
> > index a0c5f166..e410f35d 100644
> > --- a/src/android/camera_stream.h
> > +++ b/src/android/camera_stream.h
> > @@ -7,21 +7,26 @@
> >  #ifndef __ANDROID_CAMERA_STREAM_H__
> >  #define __ANDROID_CAMERA_STREAM_H__
> >  
> > +#include <condition_variable>
> >  #include <memory>
> >  #include <mutex>
> > +#include <queue>
> >  #include <vector>
> >  
> >  #include <hardware/camera3.h>
> >  
> > +#include <libcamera/base/thread.h>
> > +
> >  #include <libcamera/camera.h>
> >  #include <libcamera/framebuffer.h>
> >  #include <libcamera/framebuffer_allocator.h>
> >  #include <libcamera/geometry.h>
> >  #include <libcamera/pixel_format.h>
> >  
> > +#include "post_processor.h"
> > +
> >  class CameraDevice;
> >  class CameraMetadata;
> > -class PostProcessor;
> >  
> >  struct Camera3RequestDescriptor;
> >  
> > @@ -125,8 +130,38 @@ public:
> >  		     Camera3RequestDescriptor *request);
> >  	libcamera::FrameBuffer *getBuffer();
> >  	void putBuffer(libcamera::FrameBuffer *buffer);
> > +	void flush();
> >  
> >  private:
> > +	class PostProcessorWorker : public libcamera::Thread
> > +	{
> > +	public:
> > +		enum class State {
> > +			Stopped,
> > +			Running,
> > +			Flushing,
> > +		};
> > +
> > +		PostProcessorWorker(PostProcessor *postProcessor);
> > +		~PostProcessorWorker();
> > +
> > +		void start();
> > +		void queueRequest(Camera3RequestDescriptor *request);
> > +		void flush();
> > +
> > +	protected:
> > +		void run() override;
> > +
> > +	private:
> > +		PostProcessor *postProcessor_;
> > +
> > +		libcamera::Mutex mutex_;
> > +		std::condition_variable cv_;
> > +
> > +		std::queue<Camera3RequestDescriptor *> requests_;
> > +		State state_;
> > +	};
> > +
> >  	int waitFence(int fence);
> >  
> >  	CameraDevice *const cameraDevice_;
> > @@ -143,6 +178,8 @@ private:
> >  	 */
> >  	std::unique_ptr<std::mutex> mutex_;
> >  	std::unique_ptr<PostProcessor> postProcessor_;
> > +
> > +	std::unique_ptr<PostProcessorWorker> worker_;
> >  };
> >  
> >  #endif /* __ANDROID_CAMERA_STREAM__ */
Umang Jain Oct. 13, 2021, 9:44 a.m. UTC | #4
Hi Laurent,

Thank you for your thoughts

On 10/13/21 5:27 AM, Laurent Pinchart wrote:
> Hi Umang,
>
> Thank you for the patch.
>
> s/async/asynchronous/ in the subject line.
>
> On Mon, Oct 11, 2021 at 01:05:03PM +0530, Umang Jain wrote:
>> Introduce a dedicated worker class derived from libcamera::Thread.
>> The worker class maintains a queue for post-processing requests
>> and waits for a post-processing request to become available.
>> It will process them as per FIFO before de-queuing it from the
>> queue.
>>
>> To get access to the source and destination buffers in the worker
>> thread, we also need to save a pointer to them in the
>> Camera3RequestDescriptor.
>>
>> This patch also implements a flush() for the PostProcessorWorker
>> class which is responsible to purge post-processing requests
>> queued up while a camera is stopping/flushing.
>>
>> The libcamera request completion handler CameraDevice::requestComplete()
>> assumes that the request that has just completed is at the front of the
>> queue. Now that the post-processor runs asynchronously, this isn't true
>> anymore, a request being post-processed will stay in the queue and a new
>> libcamera request may complete. Remove that assumption, and use the
>> request cookie to obtain the Camera3RequestDescriptor.
>>
>> Signed-off-by: Umang Jain <umang.jain@ideasonboard.com>
>> Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
>> ---
>>   src/android/camera_device.cpp |  25 +-------
>>   src/android/camera_device.h   |   3 +
>>   src/android/camera_stream.cpp | 108 +++++++++++++++++++++++++++++++---
>>   src/android/camera_stream.h   |  39 +++++++++++-
>>   4 files changed, 144 insertions(+), 31 deletions(-)
>>
>> diff --git a/src/android/camera_device.cpp b/src/android/camera_device.cpp
>> index eba370ea..61b902ad 100644
>> --- a/src/android/camera_device.cpp
>> +++ b/src/android/camera_device.cpp
>> @@ -239,6 +239,7 @@ Camera3RequestDescriptor::Camera3RequestDescriptor(
>>   	/* Clone the controls associated with the camera3 request. */
>>   	settings_ = CameraMetadata(camera3Request->settings);
>>   
>> +	dest_.reset();
> dest_ is a std::unique_ptr<>, its constructor will do the right thing,
> you don't need to initialize it here.
>
>>   	/*
>>   	 * Create the CaptureRequest, stored as a unique_ptr<> to tie its
>>   	 * lifetime to the descriptor.
>> @@ -1094,28 +1095,8 @@ int CameraDevice::processCaptureRequest(camera3_capture_request_t *camera3Reques
>>   
>>   void CameraDevice::requestComplete(Request *request)
>>   {
>> -	Camera3RequestDescriptor *descriptor;
>> -	{
>> -		MutexLocker descriptorsLock(descriptorsMutex_);
>> -		ASSERT(!descriptors_.empty());
>> -		descriptor = descriptors_.front().get();
>> -	}
>> -
>> -	if (descriptor->request_->cookie() != request->cookie()) {
>> -		/*
>> -		 * \todo Clarify if the Camera has to be closed on
>> -		 * ERROR_DEVICE and possibly demote the Fatal to simple
>> -		 * Error.
>> -		 */
>> -		notifyError(0, nullptr, CAMERA3_MSG_ERROR_DEVICE);
>> -		LOG(HAL, Fatal)
>> -			<< "Out-of-order completion for request "
>> -			<< utils::hex(request->cookie());
>> -
>> -		MutexLocker descriptorsLock(descriptorsMutex_);
>> -		descriptors_.pop();
>> -		return;
>> -	}
>> +	Camera3RequestDescriptor *descriptor =
>> +		reinterpret_cast<Camera3RequestDescriptor *>(request->cookie());
>>   
>>   	/*
>>   	 * Prepare the capture result for the Android camera stack.
>> diff --git a/src/android/camera_device.h b/src/android/camera_device.h
>> index eee97516..725a0618 100644
>> --- a/src/android/camera_device.h
>> +++ b/src/android/camera_device.h
>> @@ -59,6 +59,9 @@ struct Camera3RequestDescriptor {
>>   	std::unique_ptr<CameraMetadata> resultMetadata_;
>>   	libcamera::FrameBuffer *internalBuffer_;
>>   
>> +	std::unique_ptr<CameraBuffer> dest_;
>> +	const libcamera::FrameBuffer *src_;
> As mentioned in the review of the previous patch, you can have more than
> one post-processed stream per request, so this won't be enough.


Do we actually know of any requests that require multiple post-processed
streams?

Or is the answer, "We might, from the CTS framework"?

>
> I'd recommend first refactoring the Camera3RequestDescriptor class and
> add an internal
>
> 	struct Stream {
> 		camera3_stream_buffer_t buffer;
> 		std::unique_ptr<libcamera::FrameBuffer> frameBuffer;
> 	};
>
> with the buffers_ and frameBuffers members replaced with
>
> 	std::vector<Stream> streams_;
>
> Then you can extend the Stream structure in this patch to add the
> necessary fields.


Makes sense.

>
> Thinking some more about it, src_ is likely not needed, as it's a
> pointer to the FrameBuffer already stored in struct Stream. What you'll
> need will be the ability to find the Stream instance corresponding to a
> given libcamera stream, so maybe a map would be better than a vector.

Yes, we should be able to associate it from the start; otherwise we will
need to introduce needless iterations to find it just to queue it to the
post-processor.


>
> It also seems like the PostProcessorWorker should move from processing
> requests to processing streams, as there's one PostProcessorWorker for
> each CameraStream. Maybe the struct Stream should contain a pointer to
> its Camera3RequestDescriptor, that way you could pass the
> Camera3RequestDescriptor::Stream pointer to CameraStream::process() and
> to the post-processors, and then find the corresponding
> Camera3RequestDescriptor in the completion handler.


One potential issue here: post-processing a stream might require knowing
the statuses of the other (post-processed) streams belonging to the
same request, because we need to complete the request only after all
streams have finished post-processing.

Currently, if we only maintain one queue, it can end up holding
post-processing requests for streams from multiple successive requests.
I need to think a bit, but we might need to containerize this
further (a container for the streams of one request + a container for
post-processing requests). We have the latter, but I think we also need
the former, with the ability to check container.empty(). Error
handling paths might also get tricky in such a scenario. Let's see.

>
>> +
>>   	camera3_capture_result_t captureResult_ = {};
>>   	Status status_ = Status::Pending;
>>   };
>> diff --git a/src/android/camera_stream.cpp b/src/android/camera_stream.cpp
>> index cec07269..818ef948 100644
>> --- a/src/android/camera_stream.cpp
>> +++ b/src/android/camera_stream.cpp
>> @@ -94,10 +94,12 @@ int CameraStream::configure()
>>   		if (ret)
>>   			return ret;
>>   
>> +		worker_ = std::make_unique<PostProcessorWorker>(postProcessor_.get());
>>   		postProcessor_->processComplete.connect(
>>   			this, [&](Camera3RequestDescriptor *request, PostProcessor::Status status) {
>>   				cameraDevice_->streamProcessingComplete(this, request, status);
>>   			});
>> +		worker_->start();
>>   	}
>>   
>>   	if (type_ == Type::Internal) {
>> @@ -167,19 +169,26 @@ void CameraStream::process(const FrameBuffer &source,
>>   	if (!postProcessor_)
>>   		return;
>>   
>> -	/*
>> -	 * \todo Buffer mapping and processing should be moved to a
>> -	 * separate thread.
>> -	 */
>>   	const StreamConfiguration &output = configuration();
>> -	CameraBuffer dest(*camera3Dest.buffer, output.pixelFormat, output.size,
>> -			  PROT_READ | PROT_WRITE);
>> -	if (!dest.isValid()) {
>> +	request->dest_ = std::make_unique<CameraBuffer>(
>> +		*camera3Dest.buffer, output.pixelFormat, output.size, PROT_READ | PROT_WRITE);
>> +	if (!request->dest_->isValid()) {
>>   		LOG(HAL, Error) << "Failed to create destination buffer";
>>   		return;
>>   	}
>>   
>> -	postProcessor_->process(source, &dest, request);
>> +	request->src_ = &source;
>> +
>> +	/* Push the postProcessor request to the worker queue. */
>> +	worker_->queueRequest(request);
>> +}
>> +
>> +void CameraStream::flush()
>> +{
>> +	if (!postProcessor_)
>> +		return;
>> +
>> +	worker_->flush();
>>   }
>>   
>>   FrameBuffer *CameraStream::getBuffer()
>> @@ -209,3 +218,86 @@ void CameraStream::putBuffer(FrameBuffer *buffer)
>>   
>>   	buffers_.push_back(buffer);
>>   }
>> +
>> +CameraStream::PostProcessorWorker::PostProcessorWorker(PostProcessor *postProcessor)
>> +	: postProcessor_(postProcessor)
>> +{
>> +}
>> +
>> +CameraStream::PostProcessorWorker::~PostProcessorWorker()
>> +{
>> +	{
>> +		libcamera::MutexLocker lock(mutex_);
>> +		state_ = State::Stopped;
>> +	}
>> +
>> +	cv_.notify_one();
>> +	wait();
>> +}
>> +
>> +void CameraStream::PostProcessorWorker::start()
>> +{
>> +	{
>> +		libcamera::MutexLocker lock(mutex_);
>> +		state_ = State::Running;
>> +	}
>> +
>> +	Thread::start();
>> +}
>> +
>> +void CameraStream::PostProcessorWorker::queueRequest(Camera3RequestDescriptor *request)
>> +{
>> +	{
>> +		MutexLocker lock(mutex_);
>> +		ASSERT(state_ == State::Running);
>> +		requests_.push(request);
>> +	}
>> +	cv_.notify_one();
>> +}
>> +
>> +void CameraStream::PostProcessorWorker::run()
>> +{
>> +	MutexLocker locker(mutex_);
>> +
>> +	while (1) {
>> +		cv_.wait(locker, [&] {
>> +			return state_ != State::Running || !requests_.empty();
>> +		});
>> +
>> +		if (state_ != State::Running)
>> +			break;
>> +
>> +		Camera3RequestDescriptor *descriptor = requests_.front();
>> +		requests_.pop();
>> +		locker.unlock();
>> +
>> +		postProcessor_->process(*descriptor->src_, descriptor->dest_.get(),
>> +					descriptor);
>> +
>> +		locker.lock();
>> +	}
>> +
>> +	if (state_ == State::Flushing) {
>> +		while (!requests_.empty()) {
>> +			postProcessor_->processComplete.emit(requests_.front(),
>> +							     PostProcessor::Status::Error);
>> +			requests_.pop();
>> +		}
>> +		state_ = State::Stopped;
>> +		locker.unlock();
>> +		cv_.notify_one();
>> +	}
>> +}
>> +
>> +void CameraStream::PostProcessorWorker::flush()
>> +{
>> +	libcamera::MutexLocker lock(mutex_);
>> +	state_ = State::Flushing;
>> +	lock.unlock();
>> +	cv_.notify_one();
>> +
>> +	lock.lock();
>> +	cv_.wait(lock, [&] {
>> +		return state_ == State::Stopped;
>> +	});
>> +}
>> diff --git a/src/android/camera_stream.h b/src/android/camera_stream.h
>> index a0c5f166..e410f35d 100644
>> --- a/src/android/camera_stream.h
>> +++ b/src/android/camera_stream.h
>> @@ -7,21 +7,26 @@
>>   #ifndef __ANDROID_CAMERA_STREAM_H__
>>   #define __ANDROID_CAMERA_STREAM_H__
>>   
>> +#include <condition_variable>
>>   #include <memory>
>>   #include <mutex>
>> +#include <queue>
>>   #include <vector>
>>   
>>   #include <hardware/camera3.h>
>>   
>> +#include <libcamera/base/thread.h>
>> +
>>   #include <libcamera/camera.h>
>>   #include <libcamera/framebuffer.h>
>>   #include <libcamera/framebuffer_allocator.h>
>>   #include <libcamera/geometry.h>
>>   #include <libcamera/pixel_format.h>
>>   
>> +#include "post_processor.h"
>> +
>>   class CameraDevice;
>>   class CameraMetadata;
>> -class PostProcessor;
>>   
>>   struct Camera3RequestDescriptor;
>>   
>> @@ -125,8 +130,38 @@ public:
>>   		     Camera3RequestDescriptor *request);
>>   	libcamera::FrameBuffer *getBuffer();
>>   	void putBuffer(libcamera::FrameBuffer *buffer);
>> +	void flush();
>>   
>>   private:
>> +	class PostProcessorWorker : public libcamera::Thread
>> +	{
>> +	public:
>> +		enum class State {
>> +			Stopped,
>> +			Running,
>> +			Flushing,
>> +		};
>> +
>> +		PostProcessorWorker(PostProcessor *postProcessor);
>> +		~PostProcessorWorker();
>> +
>> +		void start();
>> +		void queueRequest(Camera3RequestDescriptor *request);
>> +		void flush();
>> +
>> +	protected:
>> +		void run() override;
>> +
>> +	private:
>> +		PostProcessor *postProcessor_;
>> +
>> +		libcamera::Mutex mutex_;
>> +		std::condition_variable cv_;
>> +
>> +		std::queue<Camera3RequestDescriptor *> requests_;
>> +		State state_;
>> +	};
>> +
>>   	int waitFence(int fence);
>>   
>>   	CameraDevice *const cameraDevice_;
>> @@ -143,6 +178,8 @@ private:
>>   	 */
>>   	std::unique_ptr<std::mutex> mutex_;
>>   	std::unique_ptr<PostProcessor> postProcessor_;
>> +
>> +	std::unique_ptr<PostProcessorWorker> worker_;
>>   };
>>   
>>   #endif /* __ANDROID_CAMERA_STREAM__ */
Umang Jain Oct. 13, 2021, 9:51 a.m. UTC | #5
Hi Laurent,

On 10/13/21 5:32 AM, Laurent Pinchart wrote:
> On Wed, Oct 13, 2021 at 02:57:16AM +0300, Laurent Pinchart wrote:
>> Hi Umang,
>>
>> Thank you for the patch.
>>
>> s/async/asynchronous/ in the subject line.
>>
>> On Mon, Oct 11, 2021 at 01:05:03PM +0530, Umang Jain wrote:
>>> Introduce a dedicated worker class derived from libcamera::Thread.
>>> The worker class maintains a queue for post-processing requests
>>> and waits for a post-processing request to become available.
>>> It will process them as per FIFO before de-queuing it from the
>>> queue.
>>>
>>> To get access to the source and destination buffers in the worker
>>> thread, we also need to save a pointer to them in the
>>> Camera3RequestDescriptor.
>>>
>>> This patch also implements a flush() for the PostProcessorWorker
>>> class which is responsible to purge post-processing requests
>>> queued up while a camera is stopping/flushing.
>>>
>>> The libcamera request completion handler CameraDevice::requestComplete()
>>> assumes that the request that has just completed is at the front of the
>>> queue. Now that the post-processor runs asynchronously, this isn't true
>>> anymore, a request being post-processed will stay in the queue and a new
>>> libcamera request may complete. Remove that assumption, and use the
>>> request cookie to obtain the Camera3RequestDescriptor.
>>>
>>> Signed-off-by: Umang Jain <umang.jain@ideasonboard.com>
>>> Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
>>> ---
>>>   src/android/camera_device.cpp |  25 +-------
>>>   src/android/camera_device.h   |   3 +
>>>   src/android/camera_stream.cpp | 108 +++++++++++++++++++++++++++++++---
>>>   src/android/camera_stream.h   |  39 +++++++++++-
>>>   4 files changed, 144 insertions(+), 31 deletions(-)
>>>
>>> diff --git a/src/android/camera_device.cpp b/src/android/camera_device.cpp
>>> index eba370ea..61b902ad 100644
>>> --- a/src/android/camera_device.cpp
>>> +++ b/src/android/camera_device.cpp
>>> @@ -239,6 +239,7 @@ Camera3RequestDescriptor::Camera3RequestDescriptor(
>>>   	/* Clone the controls associated with the camera3 request. */
>>>   	settings_ = CameraMetadata(camera3Request->settings);
>>>   
>>> +	dest_.reset();
>> dest_ is a std::unique_ptr<>, its constructor will do the right thing,
>> you don't need to initialize it here.
>>
>>>   	/*
>>>   	 * Create the CaptureRequest, stored as a unique_ptr<> to tie its
>>>   	 * lifetime to the descriptor.
>>> @@ -1094,28 +1095,8 @@ int CameraDevice::processCaptureRequest(camera3_capture_request_t *camera3Reques
>>>   
>>>   void CameraDevice::requestComplete(Request *request)
>>>   {
>>> -	Camera3RequestDescriptor *descriptor;
>>> -	{
>>> -		MutexLocker descriptorsLock(descriptorsMutex_);
>>> -		ASSERT(!descriptors_.empty());
>>> -		descriptor = descriptors_.front().get();
>>> -	}
>>> -
>>> -	if (descriptor->request_->cookie() != request->cookie()) {
>>> -		/*
>>> -		 * \todo Clarify if the Camera has to be closed on
>>> -		 * ERROR_DEVICE and possibly demote the Fatal to simple
>>> -		 * Error.
>>> -		 */
>>> -		notifyError(0, nullptr, CAMERA3_MSG_ERROR_DEVICE);
>>> -		LOG(HAL, Fatal)
>>> -			<< "Out-of-order completion for request "
>>> -			<< utils::hex(request->cookie());
>>> -
>>> -		MutexLocker descriptorsLock(descriptorsMutex_);
>>> -		descriptors_.pop();
>>> -		return;
>>> -	}
>>> +	Camera3RequestDescriptor *descriptor =
>>> +		reinterpret_cast<Camera3RequestDescriptor *>(request->cookie());
>>>   
>>>   	/*
>>>   	 * Prepare the capture result for the Android camera stack.
>>> diff --git a/src/android/camera_device.h b/src/android/camera_device.h
>>> index eee97516..725a0618 100644
>>> --- a/src/android/camera_device.h
>>> +++ b/src/android/camera_device.h
>>> @@ -59,6 +59,9 @@ struct Camera3RequestDescriptor {
>>>   	std::unique_ptr<CameraMetadata> resultMetadata_;
>>>   	libcamera::FrameBuffer *internalBuffer_;
>>>   
>>> +	std::unique_ptr<CameraBuffer> dest_;
>>> +	const libcamera::FrameBuffer *src_;
>> As mentioned in the review of the previous patch, you can have more than
>> one post-processed stream per request, so this won't be enough.
>>
>> I'd recommend first refactoring the Camera3RequestDescriptor class and
>> add an internal
>>
>> 	struct Stream {
>> 		camera3_stream_buffer_t buffer;
>> 		std::unique_ptr<libcamera::FrameBuffer> frameBuffer;
>> 	};
>>
>> with the buffers_ and frameBuffers members replaced with
>>
>> 	std::vector<Stream> streams_;
>>
>> Then you can extend the Stream structure in this patch to add the
>> necessary fields.
>>
>> Thinking some more about it, src_ is likely not needed, as it's a
>> pointer to the FrameBuffer already stored in struct Stream. What you'll
>> need will be the ability to find the Stream instance corresponding to a
>> given libcamera stream, so maybe a map would be better than a vector.
>>
>> It also seems like the PostProcessorWorker should move from processing
>> requests to processing streams, as there's one PostProcessorWorker for
>> each CameraStream. Maybe the struct Stream should contain a pointer to
>> its Camera3RequestDescriptor, that way you could pass the
>> Camera3RequestDescriptor::Stream pointer to CameraStream::process() and
>> to the post-processors, and then find the corresponding
>> Camera3RequestDescriptor in the completion handler.
> By the way, another option may be to move the PostProcessorWorker to
> CameraDevice and still give it a Camera3RequestDescriptor, and
> internally in the thread run the post-processors sequentially for each
> post-processed stream. If we had a lot of post-processed streams I would
> say that would be a better design, as we could then create a threads
> pool (with N threads for M streams, and N < M) and dispatch the jobs to
> those threads, but that's overkill I think. Still, maybe a single thread
> design would be easier and look cleaner, I'm not sure.


PostProcessorWorker can be self-sustaining, but looking at things as they
are right now, I would leave it in camera-stream itself.

If we end up with thread pools in the future, I will be happy to rework it.
Is that okay?

>
>>> +
>>>   	camera3_capture_result_t captureResult_ = {};
>>>   	Status status_ = Status::Pending;
>>>   };
>>> diff --git a/src/android/camera_stream.cpp b/src/android/camera_stream.cpp
>>> index cec07269..818ef948 100644
>>> --- a/src/android/camera_stream.cpp
>>> +++ b/src/android/camera_stream.cpp
>>> @@ -94,10 +94,12 @@ int CameraStream::configure()
>>>   		if (ret)
>>>   			return ret;
>>>   
>>> +		worker_ = std::make_unique<PostProcessorWorker>(postProcessor_.get());
>>>   		postProcessor_->processComplete.connect(
>>>   			this, [&](Camera3RequestDescriptor *request, PostProcessor::Status status) {
>>>   				cameraDevice_->streamProcessingComplete(this, request, status);
>>>   			});
>>> +		worker_->start();
>>>   	}
>>>   
>>>   	if (type_ == Type::Internal) {
>>> @@ -167,19 +169,26 @@ void CameraStream::process(const FrameBuffer &source,
>>>   	if (!postProcessor_)
>>>   		return;
>>>   
>>> -	/*
>>> -	 * \todo Buffer mapping and processing should be moved to a
>>> -	 * separate thread.
>>> -	 */
>>>   	const StreamConfiguration &output = configuration();
>>> -	CameraBuffer dest(*camera3Dest.buffer, output.pixelFormat, output.size,
>>> -			  PROT_READ | PROT_WRITE);
>>> -	if (!dest.isValid()) {
>>> +	request->dest_ = std::make_unique<CameraBuffer>(
>>> +		*camera3Dest.buffer, output.pixelFormat, output.size, PROT_READ | PROT_WRITE);
>>> +	if (!request->dest_->isValid()) {
>>>   		LOG(HAL, Error) << "Failed to create destination buffer";
>>>   		return;
>>>   	}
>>>   
>>> -	postProcessor_->process(source, &dest, request);
>>> +	request->src_ = &source;
>>> +
>>> +	/* Push the postProcessor request to the worker queue. */
>>> +	worker_->queueRequest(request);
>>> +}
>>> +
>>> +void CameraStream::flush()
>>> +{
>>> +	if (!postProcessor_)
>>> +		return;
>>> +
>>> +	worker_->flush();
>>>   }
>>>   
>>>   FrameBuffer *CameraStream::getBuffer()
>>> @@ -209,3 +218,86 @@ void CameraStream::putBuffer(FrameBuffer *buffer)
>>>   
>>>   	buffers_.push_back(buffer);
>>>   }
>>> +
>>> +CameraStream::PostProcessorWorker::PostProcessorWorker(PostProcessor *postProcessor)
>>> +	: postProcessor_(postProcessor)
>>> +{
>>> +}
>>> +
>>> +CameraStream::PostProcessorWorker::~PostProcessorWorker()
>>> +{
>>> +	{
>>> +		libcamera::MutexLocker lock(mutex_);
>>> +		state_ = State::Stopped;
>>> +	}
>>> +
>>> +	cv_.notify_one();
>>> +	wait();
>>> +}
>>> +
>>> +void CameraStream::PostProcessorWorker::start()
>>> +{
>>> +	{
>>> +		libcamera::MutexLocker lock(mutex_);
>>> +		state_ = State::Running;
>>> +	}
>>> +
>>> +	Thread::start();
>>> +}
>>> +
>>> +void CameraStream::PostProcessorWorker::queueRequest(Camera3RequestDescriptor *request)
>>> +{
>>> +	{
>>> +		MutexLocker lock(mutex_);
>>> +		ASSERT(state_ == State::Running);
>>> +		requests_.push(request);
>>> +	}
>>> +	cv_.notify_one();
>>> +}
>>> +
>>> +void CameraStream::PostProcessorWorker::run()
>>> +{
>>> +	MutexLocker locker(mutex_);
>>> +
>>> +	while (1) {
>>> +		cv_.wait(locker, [&] {
>>> +			return state_ != State::Running || !requests_.empty();
>>> +		});
>>> +
>>> +		if (state_ != State::Running)
>>> +			break;
>>> +
>>> +		Camera3RequestDescriptor *descriptor = requests_.front();
>>> +		requests_.pop();
>>> +		locker.unlock();
>>> +
>>> +		postProcessor_->process(*descriptor->src_, descriptor->dest_.get(),
>>> +					descriptor);
>>> +
>>> +		locker.lock();
>>> +	}
>>> +
>>> +	if (state_ == State::Flushing) {
>>> +		while (!requests_.empty()) {
>>> +			postProcessor_->processComplete.emit(requests_.front(),
>>> +							     PostProcessor::Status::Error);
>>> +			requests_.pop();
>>> +		}
>>> +		state_ = State::Stopped;
>>> +		locker.unlock();
>>> +		cv_.notify_one();
>>> +	}
>>> +}
>>> +
>>> +void CameraStream::PostProcessorWorker::flush()
>>> +{
>>> +	libcamera::MutexLocker lock(mutex_);
>>> +	state_ = State::Flushing;
>>> +	lock.unlock();
>>> +	cv_.notify_one();
>>> +
>>> +	lock.lock();
>>> +	cv_.wait(lock, [&] {
>>> +		return state_ == State::Stopped;
>>> +	});
>>> +}
>>> diff --git a/src/android/camera_stream.h b/src/android/camera_stream.h
>>> index a0c5f166..e410f35d 100644
>>> --- a/src/android/camera_stream.h
>>> +++ b/src/android/camera_stream.h
>>> @@ -7,21 +7,26 @@
>>>   #ifndef __ANDROID_CAMERA_STREAM_H__
>>>   #define __ANDROID_CAMERA_STREAM_H__
>>>   
>>> +#include <condition_variable>
>>>   #include <memory>
>>>   #include <mutex>
>>> +#include <queue>
>>>   #include <vector>
>>>   
>>>   #include <hardware/camera3.h>
>>>   
>>> +#include <libcamera/base/thread.h>
>>> +
>>>   #include <libcamera/camera.h>
>>>   #include <libcamera/framebuffer.h>
>>>   #include <libcamera/framebuffer_allocator.h>
>>>   #include <libcamera/geometry.h>
>>>   #include <libcamera/pixel_format.h>
>>>   
>>> +#include "post_processor.h"
>>> +
>>>   class CameraDevice;
>>>   class CameraMetadata;
>>> -class PostProcessor;
>>>   
>>>   struct Camera3RequestDescriptor;
>>>   
>>> @@ -125,8 +130,38 @@ public:
>>>   		     Camera3RequestDescriptor *request);
>>>   	libcamera::FrameBuffer *getBuffer();
>>>   	void putBuffer(libcamera::FrameBuffer *buffer);
>>> +	void flush();
>>>   
>>>   private:
>>> +	class PostProcessorWorker : public libcamera::Thread
>>> +	{
>>> +	public:
>>> +		enum class State {
>>> +			Stopped,
>>> +			Running,
>>> +			Flushing,
>>> +		};
>>> +
>>> +		PostProcessorWorker(PostProcessor *postProcessor);
>>> +		~PostProcessorWorker();
>>> +
>>> +		void start();
>>> +		void queueRequest(Camera3RequestDescriptor *request);
>>> +		void flush();
>>> +
>>> +	protected:
>>> +		void run() override;
>>> +
>>> +	private:
>>> +		PostProcessor *postProcessor_;
>>> +
>>> +		libcamera::Mutex mutex_;
>>> +		std::condition_variable cv_;
>>> +
>>> +		std::queue<Camera3RequestDescriptor *> requests_;
>>> +		State state_;
>>> +	};
>>> +
>>>   	int waitFence(int fence);
>>>   
>>>   	CameraDevice *const cameraDevice_;
>>> @@ -143,6 +178,8 @@ private:
>>>   	 */
>>>   	std::unique_ptr<std::mutex> mutex_;
>>>   	std::unique_ptr<PostProcessor> postProcessor_;
>>> +
>>> +	std::unique_ptr<PostProcessorWorker> worker_;
>>>   };
>>>   
>>>   #endif /* __ANDROID_CAMERA_STREAM__ */
Laurent Pinchart Oct. 13, 2021, 10:18 a.m. UTC | #6
Hi Umang,

On Wed, Oct 13, 2021 at 03:14:21PM +0530, Umang Jain wrote:
> On 10/13/21 5:27 AM, Laurent Pinchart wrote:
> > Hi Umang,
> >
> > Thank you for the patch.
> >
> > s/async/asynchronous/ in the subject line.
> >
> > On Mon, Oct 11, 2021 at 01:05:03PM +0530, Umang Jain wrote:
> >> Introduce a dedicated worker class derived from libcamera::Thread.
> >> The worker class maintains a queue for post-processing requests
> >> and waits for a post-processing request to become available.
> >> It will process them as per FIFO before de-queuing it from the
> >> queue.
> >>
> >> To get access to the source and destination buffers in the worker
> >> thread, we also need to save a pointer to them in the
> >> Camera3RequestDescriptor.
> >>
> >> This patch also implements a flush() for the PostProcessorWorker
> >> class which is responsible to purge post-processing requests
> >> queued up while a camera is stopping/flushing.
> >>
> >> The libcamera request completion handler CameraDevice::requestComplete()
> >> assumes that the request that has just completed is at the front of the
> >> queue. Now that the post-processor runs asynchronously, this isn't true
> >> anymore, a request being post-processed will stay in the queue and a new
> >> libcamera request may complete. Remove that assumption, and use the
> >> request cookie to obtain the Camera3RequestDescriptor.
> >>
> >> Signed-off-by: Umang Jain <umang.jain@ideasonboard.com>
> >> Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
> >> ---
> >>   src/android/camera_device.cpp |  25 +-------
> >>   src/android/camera_device.h   |   3 +
> >>   src/android/camera_stream.cpp | 108 +++++++++++++++++++++++++++++++---
> >>   src/android/camera_stream.h   |  39 +++++++++++-
> >>   4 files changed, 144 insertions(+), 31 deletions(-)
> >>
> >> diff --git a/src/android/camera_device.cpp b/src/android/camera_device.cpp
> >> index eba370ea..61b902ad 100644
> >> --- a/src/android/camera_device.cpp
> >> +++ b/src/android/camera_device.cpp
> >> @@ -239,6 +239,7 @@ Camera3RequestDescriptor::Camera3RequestDescriptor(
> >>   	/* Clone the controls associated with the camera3 request. */
> >>   	settings_ = CameraMetadata(camera3Request->settings);
> >>   
> >> +	dest_.reset();
> >
> > dest_ is a std::unique_ptr<>, its constructor will do the right thing,
> > you don't need to initialize it here.
> >
> >>   	/*
> >>   	 * Create the CaptureRequest, stored as a unique_ptr<> to tie its
> >>   	 * lifetime to the descriptor.
> >> @@ -1094,28 +1095,8 @@ int CameraDevice::processCaptureRequest(camera3_capture_request_t *camera3Reques
> >>   
> >>   void CameraDevice::requestComplete(Request *request)
> >>   {
> >> -	Camera3RequestDescriptor *descriptor;
> >> -	{
> >> -		MutexLocker descriptorsLock(descriptorsMutex_);
> >> -		ASSERT(!descriptors_.empty());
> >> -		descriptor = descriptors_.front().get();
> >> -	}
> >> -
> >> -	if (descriptor->request_->cookie() != request->cookie()) {
> >> -		/*
> >> -		 * \todo Clarify if the Camera has to be closed on
> >> -		 * ERROR_DEVICE and possibly demote the Fatal to simple
> >> -		 * Error.
> >> -		 */
> >> -		notifyError(0, nullptr, CAMERA3_MSG_ERROR_DEVICE);
> >> -		LOG(HAL, Fatal)
> >> -			<< "Out-of-order completion for request "
> >> -			<< utils::hex(request->cookie());
> >> -
> >> -		MutexLocker descriptorsLock(descriptorsMutex_);
> >> -		descriptors_.pop();
> >> -		return;
> >> -	}
> >> +	Camera3RequestDescriptor *descriptor =
> >> +		reinterpret_cast<Camera3RequestDescriptor *>(request->cookie());
> >>   
> >>   	/*
> >>   	 * Prepare the capture result for the Android camera stack.
> >> diff --git a/src/android/camera_device.h b/src/android/camera_device.h
> >> index eee97516..725a0618 100644
> >> --- a/src/android/camera_device.h
> >> +++ b/src/android/camera_device.h
> >> @@ -59,6 +59,9 @@ struct Camera3RequestDescriptor {
> >>   	std::unique_ptr<CameraMetadata> resultMetadata_;
> >>   	libcamera::FrameBuffer *internalBuffer_;
> >>   
> >> +	std::unique_ptr<CameraBuffer> dest_;
> >> +	const libcamera::FrameBuffer *src_;
> > As mentioned in the review of the previous patch, you can have more than
> > one post-processed stream per request, so this won't be enough.
> 
> Do we actually know if we have such requests coming from, that require 
> multiple post-processed streams?
> 
> Or is the answer, "We might, from the CTS framework" ?

It can happen, for instance with both one YUV downscaled stream produced
in software and a JPEG stream in the same request.

> > I'd recommend first refactoring the Camera3RequestDescriptor class and
> > add an internal
> >
> > 	struct Stream {
> > 		camera3_stream_buffer_t buffer;
> > 		std::unique_ptr<libcamera::FrameBuffer> frameBuffer;
> > 	};
> >
> > with the buffers_ and frameBuffers members replaced with
> >
> > 	std::vector<Stream> streams_;
> >
> > Then you can extend the Stream structure in this patch to add the
> > necessary fields.
> 
> Makes sense.
> 
> > Thinking some more about it, src_ is likely not needed, as it's a
> > pointer to the FrameBuffer already stored in struct Stream. What you'll
> > need will be the ability to find the Stream instance corresponding to a
> > given libcamera stream, so maybe a map would be better than a vector.
> 
> Yes, we should be able to associate it from the start; otherwise we will
> need to introduce needless iterations to find it just to queue it to the
> post-processor.
> 
> > It also seems like the PostProcessorWorker should move from processing
> > requests to processing streams, as there's one PostProcessorWorker for
> > each CameraStream. Maybe the struct Stream should contain a pointer to
> > its Camera3RequestDescriptor, that way you could pass the
> > Camera3RequestDescriptor::Stream pointer to CameraStream::process() and
> > to the post-processors, and then find the corresponding
> > Camera3RequestDescriptor in the completion handler.
> 
> One potential issue here is that post-processing streams might need to know
> the statuses of other (post-processing) streams belonging to the same
> request, because we need to complete the request only after all streams
> have finished post-processing.

You need to keep track of the pending post-processing calls in the
descriptor, with a list of streams being post-processed, or possibly
just a counter. Make sure to pay attention to race conditions and
locking.

> Currently, if we only maintain one queue, it can end up holding
> post-processing requests of streams from multiple successive requests.
> I need to think a bit, but I think we might need to containerize this
> further (a container for the streams of one request + a container for
> post-processing requests). We have the latter, but I think we also need
> the former, with the ability to check container.empty(). Error-handling
> paths might also get tricky in such a scenario. Let's see.
> 
> >> +
> >>   	camera3_capture_result_t captureResult_ = {};
> >>   	Status status_ = Status::Pending;
> >>   };
> >> diff --git a/src/android/camera_stream.cpp b/src/android/camera_stream.cpp
> >> index cec07269..818ef948 100644
> >> --- a/src/android/camera_stream.cpp
> >> +++ b/src/android/camera_stream.cpp
> >> @@ -94,10 +94,12 @@ int CameraStream::configure()
> >>   		if (ret)
> >>   			return ret;
> >>   
> >> +		worker_ = std::make_unique<PostProcessorWorker>(postProcessor_.get());
> >>   		postProcessor_->processComplete.connect(
> >>   			this, [&](Camera3RequestDescriptor *request, PostProcessor::Status status) {
> >>   				cameraDevice_->streamProcessingComplete(this, request, status);
> >>   			});
> >> +		worker_->start();
> >>   	}
> >>   
> >>   	if (type_ == Type::Internal) {
> >> @@ -167,19 +169,26 @@ void CameraStream::process(const FrameBuffer &source,
> >>   	if (!postProcessor_)
> >>   		return;
> >>   
> >> -	/*
> >> -	 * \todo Buffer mapping and processing should be moved to a
> >> -	 * separate thread.
> >> -	 */
> >>   	const StreamConfiguration &output = configuration();
> >> -	CameraBuffer dest(*camera3Dest.buffer, output.pixelFormat, output.size,
> >> -			  PROT_READ | PROT_WRITE);
> >> -	if (!dest.isValid()) {
> >> +	request->dest_ = std::make_unique<CameraBuffer>(
> >> +		*camera3Dest.buffer, output.pixelFormat, output.size, PROT_READ | PROT_WRITE);
> >> +	if (!request->dest_->isValid()) {
> >>   		LOG(HAL, Error) << "Failed to create destination buffer";
> >>   		return;
> >>   	}
> >>   
> >> -	postProcessor_->process(source, &dest, request);
> >> +	request->src_ = &source;
> >> +
> >> +	/* Push the postProcessor request to the worker queue. */
> >> +	worker_->queueRequest(request);
> >> +}
> >> +
> >> +void CameraStream::flush()
> >> +{
> >> +	if (!postProcessor_)
> >> +		return;
> >> +
> >> +	worker_->flush();
> >>   }
> >>   
> >>   FrameBuffer *CameraStream::getBuffer()
> >> @@ -209,3 +218,86 @@ void CameraStream::putBuffer(FrameBuffer *buffer)
> >>   
> >>   	buffers_.push_back(buffer);
> >>   }
> >> +
> >> +CameraStream::PostProcessorWorker::PostProcessorWorker(PostProcessor *postProcessor)
> >> +	: postProcessor_(postProcessor)
> >> +{
> >> +}
> >> +
> >> +CameraStream::PostProcessorWorker::~PostProcessorWorker()
> >> +{
> >> +	{
> >> +		libcamera::MutexLocker lock(mutex_);
> >> +		state_ = State::Stopped;
> >> +	}
> >> +
> >> +	cv_.notify_one();
> >> +	wait();
> >> +}
> >> +
> >> +void CameraStream::PostProcessorWorker::start()
> >> +{
> >> +	{
> >> +		libcamera::MutexLocker lock(mutex_);
> >> +		state_ = State::Running;
> >> +	}
> >> +
> >> +	Thread::start();
> >> +}
> >> +
> >> +void CameraStream::PostProcessorWorker::queueRequest(Camera3RequestDescriptor *request)
> >> +{
> >> +	{
> >> +		MutexLocker lock(mutex_);
> >> +		ASSERT(state_ == State::Running);
> >> +		requests_.push(request);
> >> +	}
> >> +	cv_.notify_one();
> >> +}
> >> +
> >> +void CameraStream::PostProcessorWorker::run()
> >> +{
> >> +	MutexLocker locker(mutex_);
> >> +
> >> +	while (1) {
> >> +		cv_.wait(locker, [&] {
> >> +			return state_ != State::Running || !requests_.empty();
> >> +		});
> >> +
> >> +		if (state_ != State::Running)
> >> +			break;
> >> +
> >> +		Camera3RequestDescriptor *descriptor = requests_.front();
> >> +		requests_.pop();
> >> +		locker.unlock();
> >> +
> >> +		postProcessor_->process(*descriptor->src_, descriptor->dest_.get(),
> >> +					descriptor);
> >> +
> >> +		locker.lock();
> >> +	}
> >> +
> >> +	if (state_ == State::Flushing) {
> >> +		while (!requests_.empty()) {
> >> +			postProcessor_->processComplete.emit(requests_.front(),
> >> +							     PostProcessor::Status::Error);
> >> +			requests_.pop();
> >> +		}
> >> +		state_ = State::Stopped;
> >> +		locker.unlock();
> >> +		cv_.notify_one();
> >> +	}
> >> +}
> >> +
> >> +void CameraStream::PostProcessorWorker::flush()
> >> +{
> >> +	libcamera::MutexLocker lock(mutex_);
> >> +	state_ = State::Flushing;
> >> +	lock.unlock();
> >> +	cv_.notify_one();
> >> +
> >> +	lock.lock();
> >> +	cv_.wait(lock, [&] {
> >> +		return state_ == State::Stopped;
> >> +	});
> >> +}
> >> diff --git a/src/android/camera_stream.h b/src/android/camera_stream.h
> >> index a0c5f166..e410f35d 100644
> >> --- a/src/android/camera_stream.h
> >> +++ b/src/android/camera_stream.h
> >> @@ -7,21 +7,26 @@
> >>   #ifndef __ANDROID_CAMERA_STREAM_H__
> >>   #define __ANDROID_CAMERA_STREAM_H__
> >>   
> >> +#include <condition_variable>
> >>   #include <memory>
> >>   #include <mutex>
> >> +#include <queue>
> >>   #include <vector>
> >>   
> >>   #include <hardware/camera3.h>
> >>   
> >> +#include <libcamera/base/thread.h>
> >> +
> >>   #include <libcamera/camera.h>
> >>   #include <libcamera/framebuffer.h>
> >>   #include <libcamera/framebuffer_allocator.h>
> >>   #include <libcamera/geometry.h>
> >>   #include <libcamera/pixel_format.h>
> >>   
> >> +#include "post_processor.h"
> >> +
> >>   class CameraDevice;
> >>   class CameraMetadata;
> >> -class PostProcessor;
> >>   
> >>   struct Camera3RequestDescriptor;
> >>   
> >> @@ -125,8 +130,38 @@ public:
> >>   		     Camera3RequestDescriptor *request);
> >>   	libcamera::FrameBuffer *getBuffer();
> >>   	void putBuffer(libcamera::FrameBuffer *buffer);
> >> +	void flush();
> >>   
> >>   private:
> >> +	class PostProcessorWorker : public libcamera::Thread
> >> +	{
> >> +	public:
> >> +		enum class State {
> >> +			Stopped,
> >> +			Running,
> >> +			Flushing,
> >> +		};
> >> +
> >> +		PostProcessorWorker(PostProcessor *postProcessor);
> >> +		~PostProcessorWorker();
> >> +
> >> +		void start();
> >> +		void queueRequest(Camera3RequestDescriptor *request);
> >> +		void flush();
> >> +
> >> +	protected:
> >> +		void run() override;
> >> +
> >> +	private:
> >> +		PostProcessor *postProcessor_;
> >> +
> >> +		libcamera::Mutex mutex_;
> >> +		std::condition_variable cv_;
> >> +
> >> +		std::queue<Camera3RequestDescriptor *> requests_;
> >> +		State state_;
> >> +	};
> >> +
> >>   	int waitFence(int fence);
> >>   
> >>   	CameraDevice *const cameraDevice_;
> >> @@ -143,6 +178,8 @@ private:
> >>   	 */
> >>   	std::unique_ptr<std::mutex> mutex_;
> >>   	std::unique_ptr<PostProcessor> postProcessor_;
> >> +
> >> +	std::unique_ptr<PostProcessorWorker> worker_;
> >>   };
> >>   
> >>   #endif /* __ANDROID_CAMERA_STREAM__ */
Laurent Pinchart Oct. 13, 2021, 10:21 a.m. UTC | #7
Hi Umang,

On Wed, Oct 13, 2021 at 03:21:28PM +0530, Umang Jain wrote:
> On 10/13/21 5:32 AM, Laurent Pinchart wrote:
> > On Wed, Oct 13, 2021 at 02:57:16AM +0300, Laurent Pinchart wrote:
> >> Hi Umang,
> >>
> >> Thank you for the patch.
> >>
> >> s/async/asynchronous/ in the subject line.
> >>
> >> On Mon, Oct 11, 2021 at 01:05:03PM +0530, Umang Jain wrote:
> >>> Introduce a dedicated worker class derived from libcamera::Thread.
> >>> The worker class maintains a queue for post-processing requests
> >>> and waits for a post-processing request to become available.
> >>> It will process them as per FIFO before de-queuing it from the
> >>> queue.
> >>>
> >>> To get access to the source and destination buffers in the worker
> >>> thread, we also need to save a pointer to them in the
> >>> Camera3RequestDescriptor.
> >>>
> >>> This patch also implements a flush() for the PostProcessorWorker
> >>> class which is responsible to purge post-processing requests
> >>> queued up while a camera is stopping/flushing.
> >>>
> >>> The libcamera request completion handler CameraDevice::requestComplete()
> >>> assumes that the request that has just completed is at the front of the
> >>> queue. Now that the post-processor runs asynchronously, this isn't true
> >>> anymore, a request being post-processed will stay in the queue and a new
> >>> libcamera request may complete. Remove that assumption, and use the
> >>> request cookie to obtain the Camera3RequestDescriptor.
> >>>
> >>> Signed-off-by: Umang Jain <umang.jain@ideasonboard.com>
> >>> Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
> >>> ---
> >>>   src/android/camera_device.cpp |  25 +-------
> >>>   src/android/camera_device.h   |   3 +
> >>>   src/android/camera_stream.cpp | 108 +++++++++++++++++++++++++++++++---
> >>>   src/android/camera_stream.h   |  39 +++++++++++-
> >>>   4 files changed, 144 insertions(+), 31 deletions(-)
> >>>
> >>> diff --git a/src/android/camera_device.cpp b/src/android/camera_device.cpp
> >>> index eba370ea..61b902ad 100644
> >>> --- a/src/android/camera_device.cpp
> >>> +++ b/src/android/camera_device.cpp
> >>> @@ -239,6 +239,7 @@ Camera3RequestDescriptor::Camera3RequestDescriptor(
> >>>   	/* Clone the controls associated with the camera3 request. */
> >>>   	settings_ = CameraMetadata(camera3Request->settings);
> >>>   
> >>> +	dest_.reset();
> >> dest_ is a std::unique_ptr<>, its constructor will do the right thing,
> >> you don't need to initialize it here.
> >>
> >>>   	/*
> >>>   	 * Create the CaptureRequest, stored as a unique_ptr<> to tie its
> >>>   	 * lifetime to the descriptor.
> >>> @@ -1094,28 +1095,8 @@ int CameraDevice::processCaptureRequest(camera3_capture_request_t *camera3Reques
> >>>   
> >>>   void CameraDevice::requestComplete(Request *request)
> >>>   {
> >>> -	Camera3RequestDescriptor *descriptor;
> >>> -	{
> >>> -		MutexLocker descriptorsLock(descriptorsMutex_);
> >>> -		ASSERT(!descriptors_.empty());
> >>> -		descriptor = descriptors_.front().get();
> >>> -	}
> >>> -
> >>> -	if (descriptor->request_->cookie() != request->cookie()) {
> >>> -		/*
> >>> -		 * \todo Clarify if the Camera has to be closed on
> >>> -		 * ERROR_DEVICE and possibly demote the Fatal to simple
> >>> -		 * Error.
> >>> -		 */
> >>> -		notifyError(0, nullptr, CAMERA3_MSG_ERROR_DEVICE);
> >>> -		LOG(HAL, Fatal)
> >>> -			<< "Out-of-order completion for request "
> >>> -			<< utils::hex(request->cookie());
> >>> -
> >>> -		MutexLocker descriptorsLock(descriptorsMutex_);
> >>> -		descriptors_.pop();
> >>> -		return;
> >>> -	}
> >>> +	Camera3RequestDescriptor *descriptor =
> >>> +		reinterpret_cast<Camera3RequestDescriptor *>(request->cookie());
> >>>   
> >>>   	/*
> >>>   	 * Prepare the capture result for the Android camera stack.
> >>> diff --git a/src/android/camera_device.h b/src/android/camera_device.h
> >>> index eee97516..725a0618 100644
> >>> --- a/src/android/camera_device.h
> >>> +++ b/src/android/camera_device.h
> >>> @@ -59,6 +59,9 @@ struct Camera3RequestDescriptor {
> >>>   	std::unique_ptr<CameraMetadata> resultMetadata_;
> >>>   	libcamera::FrameBuffer *internalBuffer_;
> >>>   
> >>> +	std::unique_ptr<CameraBuffer> dest_;
> >>> +	const libcamera::FrameBuffer *src_;
> >> As mentioned in the review of the previous patch, you can have more than
> >> one post-processed stream per request, so this won't be enough.
> >>
> >> I'd recommend first refactoring the Camera3RequestDescriptor class and
> >> adding an internal
> >>
> >> 	struct Stream {
> >> 		camera3_stream_buffer_t buffer;
> >> 		std::unique_ptr<libcamera::FrameBuffer> frameBuffer;
> >> 	};
> >>
> >> with the buffers_ and frameBuffers members replaced with
> >>
> >> 	std::vector<Stream> streams_;
> >>
> >> Then you can extend the Stream structure in this patch to add the
> >> necessary fields.
> >>
> >> Thinking some more about it, src_ is likely not needed, as it's a
> >> pointer to the FrameBuffer already stored in struct Stream. What you'll
> >> need will be the ability to find the Stream instance corresponding to a
> >> given libcamera stream, so maybe a map would be better than a vector.
> >>
> >> It also seems like the PostProcessorWorker should move from processing
> >> requests to processing streams, as there's one PostProcessorWorker for
> >> each CameraStream. Maybe the struct Stream should contain a pointer to
> >> its Camera3RequestDescriptor, that way you could pass the
> >> Camera3RequestDescriptor::Stream pointer to CameraStream::process() and
> >> to the post-processors, and then find the corresponding
> >> Camera3RequestDescriptor in the completion handler.
> >
> > By the way, another option may be to move the PostProcessorWorker to
> > CameraDevice and still give it a Camera3RequestDescriptor, and
> > internally in the thread run the post-processors sequentially for each
> > post-processed stream. If we had a lot of post-processed streams I would
> > say that would be a better design, as we could then create a thread
> > pool (with N threads for M streams, and N < M) and dispatch the jobs to
> > those threads, but that's overkill I think. Still, maybe a single thread
> > design would be easier and look cleaner, I'm not sure.
> 
> PostProcessorWorker can be self-sustaining, but looking at things as they
> are right now, I would leave it in camera-stream itself.
> 
> If we end up with thread pools in the future, I will be happy to rework it.
> Is that okay?

I don't think we'll end up with thread pools, it will likely be one
thread per stream, or a single thread. Either way works for me, with
bonus points if the implementation can compartmentalize the bits and pieces
nicely to make a rework not too difficult later (and to keep the code
and data structures readable in any case).

> >>> +
> >>>   	camera3_capture_result_t captureResult_ = {};
> >>>   	Status status_ = Status::Pending;
> >>>   };
> >>> diff --git a/src/android/camera_stream.cpp b/src/android/camera_stream.cpp
> >>> index cec07269..818ef948 100644
> >>> --- a/src/android/camera_stream.cpp
> >>> +++ b/src/android/camera_stream.cpp
> >>> @@ -94,10 +94,12 @@ int CameraStream::configure()
> >>>   		if (ret)
> >>>   			return ret;
> >>>   
> >>> +		worker_ = std::make_unique<PostProcessorWorker>(postProcessor_.get());
> >>>   		postProcessor_->processComplete.connect(
> >>>   			this, [&](Camera3RequestDescriptor *request, PostProcessor::Status status) {
> >>>   				cameraDevice_->streamProcessingComplete(this, request, status);
> >>>   			});
> >>> +		worker_->start();
> >>>   	}
> >>>   
> >>>   	if (type_ == Type::Internal) {
> >>> @@ -167,19 +169,26 @@ void CameraStream::process(const FrameBuffer &source,
> >>>   	if (!postProcessor_)
> >>>   		return;
> >>>   
> >>> -	/*
> >>> -	 * \todo Buffer mapping and processing should be moved to a
> >>> -	 * separate thread.
> >>> -	 */
> >>>   	const StreamConfiguration &output = configuration();
> >>> -	CameraBuffer dest(*camera3Dest.buffer, output.pixelFormat, output.size,
> >>> -			  PROT_READ | PROT_WRITE);
> >>> -	if (!dest.isValid()) {
> >>> +	request->dest_ = std::make_unique<CameraBuffer>(
> >>> +		*camera3Dest.buffer, output.pixelFormat, output.size, PROT_READ | PROT_WRITE);
> >>> +	if (!request->dest_->isValid()) {
> >>>   		LOG(HAL, Error) << "Failed to create destination buffer";
> >>>   		return;
> >>>   	}
> >>>   
> >>> -	postProcessor_->process(source, &dest, request);
> >>> +	request->src_ = &source;
> >>> +
> >>> +	/* Push the postProcessor request to the worker queue. */
> >>> +	worker_->queueRequest(request);
> >>> +}
> >>> +
> >>> +void CameraStream::flush()
> >>> +{
> >>> +	if (!postProcessor_)
> >>> +		return;
> >>> +
> >>> +	worker_->flush();
> >>>   }
> >>>   
> >>>   FrameBuffer *CameraStream::getBuffer()
> >>> @@ -209,3 +218,86 @@ void CameraStream::putBuffer(FrameBuffer *buffer)
> >>>   
> >>>   	buffers_.push_back(buffer);
> >>>   }
> >>> +
> >>> +CameraStream::PostProcessorWorker::PostProcessorWorker(PostProcessor *postProcessor)
> >>> +	: postProcessor_(postProcessor)
> >>> +{
> >>> +}
> >>> +
> >>> +CameraStream::PostProcessorWorker::~PostProcessorWorker()
> >>> +{
> >>> +	{
> >>> +		libcamera::MutexLocker lock(mutex_);
> >>> +		state_ = State::Stopped;
> >>> +	}
> >>> +
> >>> +	cv_.notify_one();
> >>> +	wait();
> >>> +}
> >>> +
> >>> +void CameraStream::PostProcessorWorker::start()
> >>> +{
> >>> +	{
> >>> +		libcamera::MutexLocker lock(mutex_);
> >>> +		state_ = State::Running;
> >>> +	}
> >>> +
> >>> +	Thread::start();
> >>> +}
> >>> +
> >>> +void CameraStream::PostProcessorWorker::queueRequest(Camera3RequestDescriptor *request)
> >>> +{
> >>> +	{
> >>> +		MutexLocker lock(mutex_);
> >>> +		ASSERT(state_ == State::Running);
> >>> +		requests_.push(request);
> >>> +	}
> >>> +	cv_.notify_one();
> >>> +}
> >>> +
> >>> +void CameraStream::PostProcessorWorker::run()
> >>> +{
> >>> +	MutexLocker locker(mutex_);
> >>> +
> >>> +	while (1) {
> >>> +		cv_.wait(locker, [&] {
> >>> +			return state_ != State::Running || !requests_.empty();
> >>> +		});
> >>> +
> >>> +		if (state_ != State::Running)
> >>> +			break;
> >>> +
> >>> +		Camera3RequestDescriptor *descriptor = requests_.front();
> >>> +		requests_.pop();
> >>> +		locker.unlock();
> >>> +
> >>> +		postProcessor_->process(*descriptor->src_, descriptor->dest_.get(),
> >>> +					descriptor);
> >>> +
> >>> +		locker.lock();
> >>> +	}
> >>> +
> >>> +	if (state_ == State::Flushing) {
> >>> +		while (!requests_.empty()) {
> >>> +			postProcessor_->processComplete.emit(requests_.front(),
> >>> +							     PostProcessor::Status::Error);
> >>> +			requests_.pop();
> >>> +		}
> >>> +		state_ = State::Stopped;
> >>> +		locker.unlock();
> >>> +		cv_.notify_one();
> >>> +	}
> >>> +}
> >>> +
> >>> +void CameraStream::PostProcessorWorker::flush()
> >>> +{
> >>> +	libcamera::MutexLocker lock(mutex_);
> >>> +	state_ = State::Flushing;
> >>> +	lock.unlock();
> >>> +	cv_.notify_one();
> >>> +
> >>> +	lock.lock();
> >>> +	cv_.wait(lock, [&] {
> >>> +		return state_ == State::Stopped;
> >>> +	});
> >>> +}
> >>> diff --git a/src/android/camera_stream.h b/src/android/camera_stream.h
> >>> index a0c5f166..e410f35d 100644
> >>> --- a/src/android/camera_stream.h
> >>> +++ b/src/android/camera_stream.h
> >>> @@ -7,21 +7,26 @@
> >>>   #ifndef __ANDROID_CAMERA_STREAM_H__
> >>>   #define __ANDROID_CAMERA_STREAM_H__
> >>>   
> >>> +#include <condition_variable>
> >>>   #include <memory>
> >>>   #include <mutex>
> >>> +#include <queue>
> >>>   #include <vector>
> >>>   
> >>>   #include <hardware/camera3.h>
> >>>   
> >>> +#include <libcamera/base/thread.h>
> >>> +
> >>>   #include <libcamera/camera.h>
> >>>   #include <libcamera/framebuffer.h>
> >>>   #include <libcamera/framebuffer_allocator.h>
> >>>   #include <libcamera/geometry.h>
> >>>   #include <libcamera/pixel_format.h>
> >>>   
> >>> +#include "post_processor.h"
> >>> +
> >>>   class CameraDevice;
> >>>   class CameraMetadata;
> >>> -class PostProcessor;
> >>>   
> >>>   struct Camera3RequestDescriptor;
> >>>   
> >>> @@ -125,8 +130,38 @@ public:
> >>>   		     Camera3RequestDescriptor *request);
> >>>   	libcamera::FrameBuffer *getBuffer();
> >>>   	void putBuffer(libcamera::FrameBuffer *buffer);
> >>> +	void flush();
> >>>   
> >>>   private:
> >>> +	class PostProcessorWorker : public libcamera::Thread
> >>> +	{
> >>> +	public:
> >>> +		enum class State {
> >>> +			Stopped,
> >>> +			Running,
> >>> +			Flushing,
> >>> +		};
> >>> +
> >>> +		PostProcessorWorker(PostProcessor *postProcessor);
> >>> +		~PostProcessorWorker();
> >>> +
> >>> +		void start();
> >>> +		void queueRequest(Camera3RequestDescriptor *request);
> >>> +		void flush();
> >>> +
> >>> +	protected:
> >>> +		void run() override;
> >>> +
> >>> +	private:
> >>> +		PostProcessor *postProcessor_;
> >>> +
> >>> +		libcamera::Mutex mutex_;
> >>> +		std::condition_variable cv_;
> >>> +
> >>> +		std::queue<Camera3RequestDescriptor *> requests_;
> >>> +		State state_;
> >>> +	};
> >>> +
> >>>   	int waitFence(int fence);
> >>>   
> >>>   	CameraDevice *const cameraDevice_;
> >>> @@ -143,6 +178,8 @@ private:
> >>>   	 */
> >>>   	std::unique_ptr<std::mutex> mutex_;
> >>>   	std::unique_ptr<PostProcessor> postProcessor_;
> >>> +
> >>> +	std::unique_ptr<PostProcessorWorker> worker_;
> >>>   };
> >>>   
> >>>   #endif /* __ANDROID_CAMERA_STREAM__ */

Patch
diff mbox series

diff --git a/src/android/camera_device.cpp b/src/android/camera_device.cpp
index eba370ea..61b902ad 100644
--- a/src/android/camera_device.cpp
+++ b/src/android/camera_device.cpp
@@ -239,6 +239,7 @@  Camera3RequestDescriptor::Camera3RequestDescriptor(
 	/* Clone the controls associated with the camera3 request. */
 	settings_ = CameraMetadata(camera3Request->settings);
 
+	dest_.reset();
 	/*
 	 * Create the CaptureRequest, stored as a unique_ptr<> to tie its
 	 * lifetime to the descriptor.
@@ -1094,28 +1095,8 @@  int CameraDevice::processCaptureRequest(camera3_capture_request_t *camera3Reques
 
 void CameraDevice::requestComplete(Request *request)
 {
-	Camera3RequestDescriptor *descriptor;
-	{
-		MutexLocker descriptorsLock(descriptorsMutex_);
-		ASSERT(!descriptors_.empty());
-		descriptor = descriptors_.front().get();
-	}
-
-	if (descriptor->request_->cookie() != request->cookie()) {
-		/*
-		 * \todo Clarify if the Camera has to be closed on
-		 * ERROR_DEVICE and possibly demote the Fatal to simple
-		 * Error.
-		 */
-		notifyError(0, nullptr, CAMERA3_MSG_ERROR_DEVICE);
-		LOG(HAL, Fatal)
-			<< "Out-of-order completion for request "
-			<< utils::hex(request->cookie());
-
-		MutexLocker descriptorsLock(descriptorsMutex_);
-		descriptors_.pop();
-		return;
-	}
+	Camera3RequestDescriptor *descriptor =
+		reinterpret_cast<Camera3RequestDescriptor *>(request->cookie());
 
 	/*
 	 * Prepare the capture result for the Android camera stack.
diff --git a/src/android/camera_device.h b/src/android/camera_device.h
index eee97516..725a0618 100644
--- a/src/android/camera_device.h
+++ b/src/android/camera_device.h
@@ -59,6 +59,9 @@  struct Camera3RequestDescriptor {
 	std::unique_ptr<CameraMetadata> resultMetadata_;
 	libcamera::FrameBuffer *internalBuffer_;
 
+	std::unique_ptr<CameraBuffer> dest_;
+	const libcamera::FrameBuffer *src_;
+
 	camera3_capture_result_t captureResult_ = {};
 	Status status_ = Status::Pending;
 };
diff --git a/src/android/camera_stream.cpp b/src/android/camera_stream.cpp
index cec07269..818ef948 100644
--- a/src/android/camera_stream.cpp
+++ b/src/android/camera_stream.cpp
@@ -94,10 +94,12 @@  int CameraStream::configure()
 		if (ret)
 			return ret;
 
+		worker_ = std::make_unique<PostProcessorWorker>(postProcessor_.get());
 		postProcessor_->processComplete.connect(
 			this, [&](Camera3RequestDescriptor *request, PostProcessor::Status status) {
 				cameraDevice_->streamProcessingComplete(this, request, status);
 			});
+		worker_->start();
 	}
 
 	if (type_ == Type::Internal) {
@@ -167,19 +169,26 @@  void CameraStream::process(const FrameBuffer &source,
 	if (!postProcessor_)
 		return;
 
-	/*
-	 * \todo Buffer mapping and processing should be moved to a
-	 * separate thread.
-	 */
 	const StreamConfiguration &output = configuration();
-	CameraBuffer dest(*camera3Dest.buffer, output.pixelFormat, output.size,
-			  PROT_READ | PROT_WRITE);
-	if (!dest.isValid()) {
+	request->dest_ = std::make_unique<CameraBuffer>(
+		*camera3Dest.buffer, output.pixelFormat, output.size, PROT_READ | PROT_WRITE);
+	if (!request->dest_->isValid()) {
 		LOG(HAL, Error) << "Failed to create destination buffer";
 		return;
 	}
 
-	postProcessor_->process(source, &dest, request);
+	request->src_ = &source;
+
+	/* Push the postProcessor request to the worker queue. */
+	worker_->queueRequest(request);
+}
+
+void CameraStream::flush()
+{
+	if (!postProcessor_)
+		return;
+
+	worker_->flush();
 }
 
 FrameBuffer *CameraStream::getBuffer()
@@ -209,3 +218,86 @@  void CameraStream::putBuffer(FrameBuffer *buffer)
 
 	buffers_.push_back(buffer);
 }
+
+CameraStream::PostProcessorWorker::PostProcessorWorker(PostProcessor *postProcessor)
+	: postProcessor_(postProcessor)
+{
+}
+
+CameraStream::PostProcessorWorker::~PostProcessorWorker()
+{
+	{
+		libcamera::MutexLocker lock(mutex_);
+		state_ = State::Stopped;
+	}
+
+	cv_.notify_one();
+	wait();
+}
+
+void CameraStream::PostProcessorWorker::start()
+{
+	{
+		libcamera::MutexLocker lock(mutex_);
+		state_ = State::Running;
+	}
+
+	Thread::start();
+}
+
+void CameraStream::PostProcessorWorker::queueRequest(Camera3RequestDescriptor *request)
+{
+	{
+		MutexLocker lock(mutex_);
+		ASSERT(state_ == State::Running);
+		requests_.push(request);
+	}
+	cv_.notify_one();
+}
+
+void CameraStream::PostProcessorWorker::run()
+{
+	MutexLocker locker(mutex_);
+
+	while (1) {
+		cv_.wait(locker, [&] {
+			return state_ != State::Running || !requests_.empty();
+		});
+
+		if (state_ != State::Running)
+			break;
+
+		Camera3RequestDescriptor *descriptor = requests_.front();
+		requests_.pop();
+		locker.unlock();
+
+		postProcessor_->process(*descriptor->src_, descriptor->dest_.get(),
+					descriptor);
+
+		locker.lock();
+	}
+
+	if (state_ == State::Flushing) {
+		while (!requests_.empty()) {
+			postProcessor_->processComplete.emit(requests_.front(),
+							     PostProcessor::Status::Error);
+			requests_.pop();
+		}
+		state_ = State::Stopped;
+		locker.unlock();
+		cv_.notify_one();
+	}
+}
+
+void CameraStream::PostProcessorWorker::flush()
+{
+	libcamera::MutexLocker lock(mutex_);
+	state_ = State::Flushing;
+	lock.unlock();
+	cv_.notify_one();
+
+	lock.lock();
+	cv_.wait(lock, [&] {
+		return state_ == State::Stopped;
+	});
+}
diff --git a/src/android/camera_stream.h b/src/android/camera_stream.h
index a0c5f166..e410f35d 100644
--- a/src/android/camera_stream.h
+++ b/src/android/camera_stream.h
@@ -7,21 +7,26 @@ 
 #ifndef __ANDROID_CAMERA_STREAM_H__
 #define __ANDROID_CAMERA_STREAM_H__
 
+#include <condition_variable>
 #include <memory>
 #include <mutex>
+#include <queue>
 #include <vector>
 
 #include <hardware/camera3.h>
 
+#include <libcamera/base/thread.h>
+
 #include <libcamera/camera.h>
 #include <libcamera/framebuffer.h>
 #include <libcamera/framebuffer_allocator.h>
 #include <libcamera/geometry.h>
 #include <libcamera/pixel_format.h>
 
+#include "post_processor.h"
+
 class CameraDevice;
 class CameraMetadata;
-class PostProcessor;
 
 struct Camera3RequestDescriptor;
 
@@ -125,8 +130,38 @@  public:
 		     Camera3RequestDescriptor *request);
 	libcamera::FrameBuffer *getBuffer();
 	void putBuffer(libcamera::FrameBuffer *buffer);
+	void flush();
 
 private:
+	class PostProcessorWorker : public libcamera::Thread
+	{
+	public:
+		enum class State {
+			Stopped,
+			Running,
+			Flushing,
+		};
+
+		PostProcessorWorker(PostProcessor *postProcessor);
+		~PostProcessorWorker();
+
+		void start();
+		void queueRequest(Camera3RequestDescriptor *request);
+		void flush();
+
+	protected:
+		void run() override;
+
+	private:
+		PostProcessor *postProcessor_;
+
+		libcamera::Mutex mutex_;
+		std::condition_variable cv_;
+
+		std::queue<Camera3RequestDescriptor *> requests_;
+		State state_;
+	};
+
 	int waitFence(int fence);
 
 	CameraDevice *const cameraDevice_;
@@ -143,6 +178,8 @@  private:
 	 */
 	std::unique_ptr<std::mutex> mutex_;
 	std::unique_ptr<PostProcessor> postProcessor_;
+
+	std::unique_ptr<PostProcessorWorker> worker_;
 };
 
 #endif /* __ANDROID_CAMERA_STREAM__ */