[libcamera-devel,v7,7/7] android: post_processor: Make post processing async

Message ID: 20211025203833.122460-8-umang.jain@ideasonboard.com
State: Superseded
Delegated to: Umang Jain
Series: Async Post Processor

Commit Message

Umang Jain Oct. 25, 2021, 8:38 p.m. UTC
Introduce a dedicated worker class derived from libcamera::Thread.
The worker class maintains a queue of post-processing requests
and waits for a request to become available. Requests are
dequeued and processed one at a time, in FIFO order.
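The FIFO worker described above can be sketched with the standard library alone. This is a minimal sketch, not the patch's code: it assumes std::thread and std::mutex in place of libcamera::Thread and libcamera::Mutex, and uses a hypothetical Worker class that queues plain int requests and hands each one to a callback.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical stand-in for PostProcessorWorker, built on std::thread
// instead of libcamera::Thread. Requests are plain ints here.
class Worker
{
public:
	enum class State { Stopped, Running };

	explicit Worker(std::function<void(int)> process)
		: process_(std::move(process))
	{
	}

	~Worker()
	{
		{
			std::lock_guard<std::mutex> lock(mutex_);
			state_ = State::Stopped;
		}
		cv_.notify_one();
		if (thread_.joinable())
			thread_.join();
	}

	void start()
	{
		{
			std::lock_guard<std::mutex> lock(mutex_);
			assert(state_ != State::Running);
			state_ = State::Running;
		}
		thread_ = std::thread(&Worker::run, this);
	}

	void queueRequest(int request)
	{
		{
			std::lock_guard<std::mutex> lock(mutex_);
			assert(state_ == State::Running);
			requests_.push(request);
		}
		cv_.notify_one();
	}

private:
	void run()
	{
		std::unique_lock<std::mutex> lock(mutex_);
		while (true) {
			// Sleep until there is work or we are asked to stop.
			cv_.wait(lock, [&] {
				return state_ != State::Running || !requests_.empty();
			});
			if (state_ != State::Running)
				break;

			// Dequeue the oldest request, then process it unlocked so
			// queueRequest() is never blocked by a long-running job.
			int request = requests_.front();
			requests_.pop();
			lock.unlock();
			process_(request);
			lock.lock();
		}
	}

	std::function<void(int)> process_;
	std::mutex mutex_;
	std::condition_variable cv_;
	std::queue<int> requests_;
	State state_ = State::Stopped;
	std::thread thread_;
};
```

As in the patch, requests still queued when the worker is destroyed are dropped rather than processed; failing them explicitly is the job of flush().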

The entire post-processing loop runs with streamsProcessMutex_ held,
so that all post-processing requests are queued at once before the
post-processing completion slot (streamProcessingComplete()) is
allowed to run for requests completing in parallel. This makes it
possible to handle both synchronous and asynchronous errors
encountered during the whole post-processing operation. Since a
post-processing operation can complete after
CameraDevice::requestComplete() has returned, we also need to check
for completion and complete the descriptor from
streamProcessingComplete(), running in the PostProcessorWorker's
thread.
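The locking protocol can be modelled in a few lines. In this sketch all names are hypothetical simplifications: handlePostProcessing() plays the role of the loop in requestComplete() and streamDone() that of streamProcessingComplete(). Whichever side sees an empty pending set while holding the mutex calls completeDescriptor(), so the descriptor is completed exactly once regardless of which side finishes last.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <set>
#include <thread>
#include <vector>

// Hypothetical, heavily simplified model of Camera3RequestDescriptor.
struct Descriptor {
	std::mutex streamsProcessMutex;
	std::set<int> pendingStreams; // streams still awaiting post-processing
	int completions = 0;          // times completeDescriptor() has run
};

void completeDescriptor(Descriptor &d)
{
	d.completions++;
}

// Counterpart of streamProcessingComplete(): runs on a worker thread.
void streamDone(Descriptor &d, int stream)
{
	std::unique_lock<std::mutex> lock(d.streamsProcessMutex);
	d.pendingStreams.erase(stream);
	if (!d.pendingStreams.empty())
		return;
	lock.unlock();
	completeDescriptor(d); // the last stream to finish completes the request
}

// Counterpart of the post-processing loop in requestComplete(). The lock is
// held while every stream is queued, so streamDone() cannot run until the
// loop is over and the set already reflects all synchronous failures.
void handlePostProcessing(Descriptor &d,
			  const std::function<bool(int)> &queueStream)
{
	std::unique_lock<std::mutex> lock(d.streamsProcessMutex);
	auto iter = d.pendingStreams.begin();
	while (iter != d.pendingStreams.end()) {
		if (queueStream(*iter))
			++iter;                              // finishes later in streamDone()
		else
			iter = d.pendingStreams.erase(iter); // synchronous error
	}
	if (d.pendingStreams.empty()) {
		lock.unlock();
		completeDescriptor(d); // every stream failed synchronously
	}
}

// Streams 1 and 3 complete on their own threads; stream 2 fails synchronously.
int demo()
{
	Descriptor d;
	d.pendingStreams = { 1, 2, 3 };
	std::vector<std::thread> workers;
	handlePostProcessing(d, [&](int stream) {
		if (stream == 2)
			return false;
		workers.emplace_back([&d, stream] { streamDone(d, stream); });
		return true;
	});
	for (std::thread &t : workers)
		t.join();
	assert(d.pendingStreams.empty());
	return d.completions;
}
```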

This patch also implements a flush() for the PostProcessorWorker
class, which is responsible for purging post-processing requests
queued up while a camera is stopping or flushing. It is hooked to
CameraStream::flush(), which isn't used yet but will be once
flush/stop scenarios are handled in greater detail in a subsequent
patchset.
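The flush path boils down to failing every request that is still queued. A minimal sketch, assuming a hypothetical PendingQueue class holding int requests and a Status enum standing in for PostProcessor::Status; in the patch this draining happens at the end of PostProcessorWorker::run() once the state is Flushing.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>

enum class Status { Success, Error };

// Hypothetical holder for queued post-processing requests (ints here).
class PendingQueue
{
public:
	explicit PendingQueue(std::function<void(int, Status)> complete)
		: complete_(std::move(complete))
	{
	}

	void push(int request)
	{
		std::lock_guard<std::mutex> lock(mutex_);
		requests_.push(request);
	}

	// Complete every queued request with an error. The queue is swapped out
	// under the lock and the callbacks then run unlocked, so they may call
	// push() or take other locks without deadlocking on mutex_.
	void flush()
	{
		std::queue<int> requests;
		{
			std::lock_guard<std::mutex> lock(mutex_);
			requests.swap(requests_);
			assert(requests_.empty());
		}
		while (!requests.empty()) {
			complete_(requests.front(), Status::Error);
			requests.pop();
		}
	}

	std::size_t size()
	{
		std::lock_guard<std::mutex> lock(mutex_);
		return requests_.size();
	}

private:
	std::function<void(int, Status)> complete_;
	std::mutex mutex_;
	std::queue<int> requests_;
};
```

The patch moves the queue out with std::move(); swap() is used here because it guarantees that requests_ is left empty, whereas a moved-from std::queue is only guaranteed to be in a valid but unspecified state (mainstream implementations leave it empty).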

The libcamera request completion handler CameraDevice::requestComplete()
assumes that the request that has just completed is at the front of the
queue. Now that the post-processor runs asynchronously, this is no longer
true: a request being post-processed stays in the queue while a new
libcamera request may complete. Remove that assumption, and use the
request cookie to obtain the Camera3RequestDescriptor.
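Looking up the descriptor through the cookie can be illustrated in isolation. libcamera::Request carries a 64-bit cookie; the sketch below uses hypothetical FakeRequest and Descriptor types instead of the real classes, and round-trips the pointer through std::uintptr_t. That detour is a portability choice of this sketch, not what the patch does (the patch casts the cookie directly).

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical types: Descriptor stands in for Camera3RequestDescriptor and
// FakeRequest for libcamera::Request, which carries a 64-bit cookie.
struct Descriptor {
	unsigned int frameNumber;
};

struct FakeRequest {
	std::uint64_t cookie;
};

// At submission time, stash the descriptor's address in the cookie.
FakeRequest makeRequest(Descriptor *descriptor)
{
	return FakeRequest{ static_cast<std::uint64_t>(
		reinterpret_cast<std::uintptr_t>(descriptor)) };
}

// At completion time, recover the descriptor from the cookie alone, with no
// assumption about which request completes first.
Descriptor *descriptorFor(const FakeRequest &request)
{
	assert(request.cookie != 0);
	return reinterpret_cast<Descriptor *>(
		static_cast<std::uintptr_t>(request.cookie));
}
```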

Signed-off-by: Umang Jain <umang.jain@ideasonboard.com>
Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
---
 src/android/camera_device.cpp |  44 ++++++---------
 src/android/camera_stream.cpp | 101 ++++++++++++++++++++++++++++++++--
 src/android/camera_stream.h   |  37 +++++++++++++
 3 files changed, 151 insertions(+), 31 deletions(-)

Comments

Laurent Pinchart Oct. 25, 2021, 10:05 p.m. UTC | #1
Hi Umang,

Thank you for the patch.

On Tue, Oct 26, 2021 at 02:08:33AM +0530, Umang Jain wrote:
> Introduce a dedicated worker class derived from libcamera::Thread.
> The worker class maintains a queue for post-processing requests
> and waits for a post-processing request to become available.
> It will process them as per FIFO before de-queuing it from the
> queue.
> 
> The entire post-processing handling iteration is locked under
> streamProcessMutex_ which helps us to queue all the post-processing

s/streamProcessMutex_/streamsProcessMutex_/

> request at once, before any of the post-processing completion slot
> (streamProcessingComplete()) is allowed to run for post-processing
> requests completing in parallel. This helps us to manage both
> synchronous and asynchronous errors encountered during the entire
> post processing operation. Since a post-processing operation can
> even complete after CameraDevice::requestComplete() has returned,
> we need to check and complete the descriptor from
> streamProcessingComplete() running in the PostProcessorWorker's
> thread.
> 
> This patch also implements a flush() for the PostProcessorWorker
> class which is responsible to purge post-processing requests
> queued up while a camera is stopping/flushing. It is hooked with
> CameraStream::flush(), which isn't used currently but will be
> used when we handle flush/stop scenarios in greater detail
> subsequently (in a different patchset).
> 
> The libcamera request completion handler CameraDevice::requestComplete()
> assumes that the request that has just completed is at the front of the
> queue. Now that the post-processor runs asynchronously, this isn't true
> anymore, a request being post-processed will stay in the queue and a new
> libcamera request may complete. Remove that assumption, and use the
> request cookie to obtain the Camera3RequestDescriptor.
> 
> Signed-off-by: Umang Jain <umang.jain@ideasonboard.com>
> Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
> ---
>  src/android/camera_device.cpp |  44 ++++++---------
>  src/android/camera_stream.cpp | 101 ++++++++++++++++++++++++++++++++--
>  src/android/camera_stream.h   |  37 +++++++++++++
>  3 files changed, 151 insertions(+), 31 deletions(-)
> 
> diff --git a/src/android/camera_device.cpp b/src/android/camera_device.cpp
> index 3ded0f7e..53ebe0ea 100644
> --- a/src/android/camera_device.cpp
> +++ b/src/android/camera_device.cpp
> @@ -1027,29 +1027,8 @@ int CameraDevice::processCaptureRequest(camera3_capture_request_t *camera3Reques
>  
>  void CameraDevice::requestComplete(Request *request)
>  {
> -	Camera3RequestDescriptor *descriptor;
> -	{
> -		MutexLocker descriptorsLock(descriptorsMutex_);
> -		ASSERT(!descriptors_.empty());
> -		descriptor = descriptors_.front().get();
> -	}
> -
> -	if (descriptor->request_->cookie() != request->cookie()) {
> -		/*
> -		 * \todo Clarify if the Camera has to be closed on
> -		 * ERROR_DEVICE.
> -		 */
> -		LOG(HAL, Error)
> -			<< "Out-of-order completion for request "
> -			<< utils::hex(request->cookie());
> -
> -		MutexLocker descriptorsLock(descriptorsMutex_);
> -		descriptors_.pop();
> -
> -		notifyError(0, nullptr, CAMERA3_MSG_ERROR_DEVICE);
> -
> -		return;
> -	}
> +	Camera3RequestDescriptor *descriptor =
> +		reinterpret_cast<Camera3RequestDescriptor *>(request->cookie());
>  
>  	/*
>  	 * Prepare the capture result for the Android camera stack.
> @@ -1124,9 +1103,13 @@ void CameraDevice::requestComplete(Request *request)
>  	}
>  
>  	/* Handle post-processing. */
> +	MutexLocker locker(descriptor->streamsProcessMutex_);
> +
>  	/*
> -	 * \todo Protect the loop below with streamProcessMutex_ when post

In the patch that introduced this,

s/streamProcessMutex_/streamsProcessMutex_/

> -	 * processor runs asynchronously.
> +	 * Queue all the post-processing streams request at once. The completion
> +	 * slot streamProcessingComplete() can only execute when we are out
> +	 * this critical section. This helps to handle synchronous errors here
> +	 * itself.
>  	 */
>  	auto iter = descriptor->pendingStreamsToProcess_.begin();
>  	while (iter != descriptor->pendingStreamsToProcess_.end()) {
> @@ -1158,8 +1141,10 @@ void CameraDevice::requestComplete(Request *request)
>  		}
>  	}
>  
> -	if (descriptor->pendingStreamsToProcess_.empty())
> +	if (descriptor->pendingStreamsToProcess_.empty()) {
> +		locker.unlock();
>  		completeDescriptor(descriptor);
> +	}
>  }
>  
>  void CameraDevice::completeDescriptor(Camera3RequestDescriptor *descriptor)
> @@ -1245,6 +1230,13 @@ void CameraDevice::streamProcessingComplete(Camera3RequestDescriptor::StreamBuff
>  	MutexLocker locker(request->streamsProcessMutex_);
>  
>  	request->pendingStreamsToProcess_.erase(streamBuffer->stream);
> +
> +	if (!request->pendingStreamsToProcess_.empty())
> +		return;
> +
> +	locker.unlock();

Maybe

	{
		MutexLocker locker(request->streamsProcessMutex_);

		request->pendingStreamsToProcess_.erase(streamBuffer->stream);
		if (!request->pendingStreamsToProcess_.empty())
			return;
	}

up to you.

> +
> +	completeDescriptor(streamBuffer->request);
>  }
>  
>  std::string CameraDevice::logPrefix() const
> diff --git a/src/android/camera_stream.cpp b/src/android/camera_stream.cpp
> index fed99022..9023c13c 100644
> --- a/src/android/camera_stream.cpp
> +++ b/src/android/camera_stream.cpp
> @@ -99,6 +99,7 @@ int CameraStream::configure()
>  		if (ret)
>  			return ret;
>  
> +		worker_ = std::make_unique<PostProcessorWorker>(postProcessor_.get());
>  		postProcessor_->processComplete.connect(
>  			this, [&](Camera3RequestDescriptor::StreamBuffer *streamBuffer,
>  				  PostProcessor::Status status) {
> @@ -112,6 +113,8 @@ int CameraStream::configure()
>  				cameraDevice_->streamProcessingComplete(streamBuffer,
>  									bufferStatus);
>  			});
> +
> +		worker_->start();
>  	}
>  
>  	if (type_ == Type::Internal) {
> @@ -178,10 +181,6 @@ int CameraStream::process(Camera3RequestDescriptor::StreamBuffer *streamBuffer)
>  		streamBuffer->fence = -1;
>  	}
>  
> -	/*
> -	 * \todo Buffer mapping and processing should be moved to a
> -	 * separate thread.
> -	 */
>  	const StreamConfiguration &output = configuration();
>  	streamBuffer->dstBuffer = std::make_unique<CameraBuffer>(
>  		*streamBuffer->camera3Buffer, output.pixelFormat, output.size,
> @@ -191,11 +190,19 @@ int CameraStream::process(Camera3RequestDescriptor::StreamBuffer *streamBuffer)
>  		return -EINVAL;
>  	}
>  
> -	postProcessor_->process(streamBuffer);
> +	worker_->queueRequest(streamBuffer);
>  
>  	return 0;
>  }
>  
> +void CameraStream::flush()
> +{
> +	if (!postProcessor_)
> +		return;
> +
> +	worker_->flush();
> +}
> +
>  FrameBuffer *CameraStream::getBuffer()
>  {
>  	if (!allocator_)
> @@ -223,3 +230,87 @@ void CameraStream::putBuffer(FrameBuffer *buffer)
>  
>  	buffers_.push_back(buffer);
>  }
> +
> +CameraStream::PostProcessorWorker::PostProcessorWorker(PostProcessor *postProcessor)
> +	: postProcessor_(postProcessor)
> +{
> +}
> +
> +CameraStream::PostProcessorWorker::~PostProcessorWorker()
> +{
> +	{
> +		libcamera::MutexLocker lock(mutex_);
> +		state_ = State::Stopped;
> +	}
> +
> +	cv_.notify_one();
> +	wait();
> +}
> +
> +void CameraStream::PostProcessorWorker::start()
> +{
> +	{
> +		libcamera::MutexLocker lock(mutex_);
> +		ASSERT(state_ != State::Running);
> +		state_ = State::Running;
> +	}
> +
> +	Thread::start();
> +}
> +
> +void CameraStream::PostProcessorWorker::queueRequest(Camera3RequestDescriptor::StreamBuffer *dest)
> +{
> +	{
> +		MutexLocker lock(mutex_);
> +		ASSERT(state_ == State::Running);
> +		requests_.push(dest);
> +	}
> +
> +	cv_.notify_one();
> +}
> +
> +void CameraStream::PostProcessorWorker::run()
> +{
> +	MutexLocker locker(mutex_);
> +
> +	while (1) {
> +		cv_.wait(locker, [&] {
> +			return state_ != State::Running || !requests_.empty();
> +		});
> +
> +		if (state_ != State::Running)
> +			break;
> +
> +		Camera3RequestDescriptor::StreamBuffer *streamBuffer = requests_.front();
> +		requests_.pop();
> +		locker.unlock();
> +
> +		postProcessor_->process(streamBuffer);
> +
> +		locker.lock();
> +	}
> +
> +	if (state_ == State::Flushing) {
> +		std::queue<Camera3RequestDescriptor::StreamBuffer *> requests =
> +			std::move(requests_);
> +		locker.unlock();
> +
> +		while (!requests.empty()) {
> +			postProcessor_->processComplete.emit(
> +				requests.front(), PostProcessor::Status::Error);
> +			requests.pop();
> +		}
> +
> +		locker.lock();
> +		state_ = State::Stopped;
> +	}
> +}
> +
> +void CameraStream::PostProcessorWorker::flush()
> +{
> +	libcamera::MutexLocker lock(mutex_);
> +	state_ = State::Flushing;
> +	lock.unlock();
> +
> +	cv_.notify_one();
> +}
> diff --git a/src/android/camera_stream.h b/src/android/camera_stream.h
> index e74a9a3b..1588938a 100644
> --- a/src/android/camera_stream.h
> +++ b/src/android/camera_stream.h
> @@ -7,12 +7,16 @@
>  #ifndef __ANDROID_CAMERA_STREAM_H__
>  #define __ANDROID_CAMERA_STREAM_H__
>  
> +#include <condition_variable>
>  #include <memory>
>  #include <mutex>
> +#include <queue>
>  #include <vector>
>  
>  #include <hardware/camera3.h>
>  
> +#include <libcamera/base/thread.h>
> +
>  #include <libcamera/camera.h>
>  #include <libcamera/framebuffer.h>
>  #include <libcamera/framebuffer_allocator.h>
> @@ -20,6 +24,7 @@
>  #include <libcamera/pixel_format.h>
>  
>  #include "camera_request.h"
> +#include "post_processor.h"
>  
>  class CameraDevice;
>  class PostProcessor;

You can drop the forward declaration.

> @@ -124,8 +129,38 @@ public:
>  	int process(Camera3RequestDescriptor::StreamBuffer *streamBuffer);
>  	libcamera::FrameBuffer *getBuffer();
>  	void putBuffer(libcamera::FrameBuffer *buffer);
> +	void flush();
>  
>  private:
> +	class PostProcessorWorker : public libcamera::Thread
> +	{
> +	public:
> +		enum class State {
> +			Stopped,
> +			Running,
> +			Flushing,
> +		};
> +
> +		PostProcessorWorker(PostProcessor *postProcessor);
> +		~PostProcessorWorker();
> +
> +		void start();
> +		void queueRequest(Camera3RequestDescriptor::StreamBuffer *request);
> +		void flush();
> +
> +	protected:
> +		void run() override;
> +
> +	private:
> +		PostProcessor *postProcessor_;
> +
> +		libcamera::Mutex mutex_;
> +		std::condition_variable cv_;
> +
> +		std::queue<Camera3RequestDescriptor::StreamBuffer *> requests_;
> +		State state_ = State::Stopped;
> +	};
> +
>  	int waitFence(int fence);
>  
>  	CameraDevice *const cameraDevice_;
> @@ -142,6 +177,8 @@ private:
>  	 */
>  	std::unique_ptr<std::mutex> mutex_;
>  	std::unique_ptr<PostProcessor> postProcessor_;
> +
> +	std::unique_ptr<PostProcessorWorker> worker_;
>  };
>  
>  #endif /* __ANDROID_CAMERA_STREAM__ */
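Laurent's scoped-lock suggestion for streamProcessingComplete() can be checked with a standalone sketch. It uses std::mutex and std::lock_guard in place of libcamera::Mutex and MutexLocker, and hypothetical simplified types; the completeDescriptor() stand-in deliberately takes the same mutex to show why it must run outside the critical section.

```cpp
#include <cassert>
#include <mutex>
#include <set>

// Hypothetical, simplified descriptor.
struct Descriptor {
	std::mutex streamsProcessMutex;
	std::set<int> pendingStreams;
	int completions = 0;
};

// Hypothetical stand-in for completeDescriptor(). It takes the same mutex,
// which would be undefined behaviour (typically a self-deadlock) if the
// caller were still holding it.
void completeDescriptor(Descriptor &d)
{
	std::lock_guard<std::mutex> lock(d.streamsProcessMutex);
	d.completions++;
}

// Scoped-lock variant of streamProcessingComplete(): the lock_guard is
// released when the inner block ends, on both the early return and the
// fall-through path, so no explicit unlock() is needed.
void streamProcessingComplete(Descriptor &d, int stream)
{
	{
		std::lock_guard<std::mutex> lock(d.streamsProcessMutex);
		d.pendingStreams.erase(stream);
		if (!d.pendingStreams.empty())
			return; // other streams are still being processed
	}

	completeDescriptor(d);
}
```

Since nothing is unlocked by hand, a plain lock_guard is sufficient in this form; the version in the patch needs an unlockable locker such as MutexLocker for its explicit unlock() call.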
Hirokazu Honda Oct. 26, 2021, 1:12 a.m. UTC | #2
Hi Umang, thank you for the patch.

On Tue, Oct 26, 2021 at 7:05 AM Laurent Pinchart
<laurent.pinchart@ideasonboard.com> wrote:
>
> Hi Umang,
>
> Thank you for the patch.
>
> On Tue, Oct 26, 2021 at 02:08:33AM +0530, Umang Jain wrote:
> > Introduce a dedicated worker class derived from libcamera::Thread.
> > The worker class maintains a queue for post-processing requests
> > and waits for a post-processing request to become available.
> > It will process them as per FIFO before de-queuing it from the
> > queue.
> >
> > The entire post-processing handling iteration is locked under
> > streamProcessMutex_ which helps us to queue all the post-processing
>
> s/streamProcessMutex_/streamsProcessMutex_/
>
> > request at once, before any of the post-processing completion slot
> > (streamProcessingComplete()) is allowed to run for post-processing
> > requests completing in parallel. This helps us to manage both
> > synchronous and asynchronous errors encountered during the entire
> > post processing operation. Since a post-processing operation can
> > even complete after CameraDevice::requestComplete() has returned,
> > we need to check and complete the descriptor from
> > streamProcessingComplete() running in the PostProcessorWorker's
> > thread.
> >
> > This patch also implements a flush() for the PostProcessorWorker
> > class which is responsible to purge post-processing requests
> > queued up while a camera is stopping/flushing. It is hooked with
> > CameraStream::flush(), which isn't used currently but will be
> > used when we handle flush/stop scenarios in greater detail
> > subsequently (in a different patchset).
> >
> > The libcamera request completion handler CameraDevice::requestComplete()
> > assumes that the request that has just completed is at the front of the
> > queue. Now that the post-processor runs asynchronously, this isn't true
> > anymore, a request being post-processed will stay in the queue and a new
> > libcamera request may complete. Remove that assumption, and use the
> > request cookie to obtain the Camera3RequestDescriptor.
> >
> > Signed-off-by: Umang Jain <umang.jain@ideasonboard.com>
> > Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>

Reviewed-by: Hirokazu Honda <hiroh@chromium.org>
Umang Jain Oct. 26, 2021, 4:39 a.m. UTC | #3
Hi Laurent,

On 10/26/21 3:35 AM, Laurent Pinchart wrote:
> Hi Umang,
>
> Thank you for the patch.
>
> On Tue, Oct 26, 2021 at 02:08:33AM +0530, Umang Jain wrote:
>> Introduce a dedicated worker class derived from libcamera::Thread.
>> The worker class maintains a queue for post-processing requests
>> and waits for a post-processing request to become available.
>> It will process them as per FIFO before de-queuing it from the
>> queue.
>>
>> The entire post-processing handling iteration is locked under
>> streamProcessMutex_ which helps us to queue all the post-processing
> s/streamProcessMutex_/streamsProcessMutex_/
>
>> request at once, before any of the post-processing completion slot
>> (streamProcessingComplete()) is allowed to run for post-processing
>> requests completing in parallel. This helps us to manage both
>> synchronous and asynchronous errors encountered during the entire
>> post processing operation. Since a post-processing operation can
>> even complete after CameraDevice::requestComplete() has returned,
>> we need to check and complete the descriptor from
>> streamProcessingComplete() running in the PostProcessorWorker's
>> thread.
>>
>> This patch also implements a flush() for the PostProcessorWorker
>> class which is responsible to purge post-processing requests
>> queued up while a camera is stopping/flushing. It is hooked with
>> CameraStream::flush(), which isn't used currently but will be
>> used when we handle flush/stop scenarios in greater detail
>> subsequently (in a different patchset).
>>
>> The libcamera request completion handler CameraDevice::requestComplete()
>> assumes that the request that has just completed is at the front of the
>> queue. Now that the post-processor runs asynchronously, this isn't true
>> anymore, a request being post-processed will stay in the queue and a new
>> libcamera request may complete. Remove that assumption, and use the
>> request cookie to obtain the Camera3RequestDescriptor.
>>
>> Signed-off-by: Umang Jain <umang.jain@ideasonboard.com>
>> Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
>> ---
>>   src/android/camera_device.cpp |  44 ++++++---------
>>   src/android/camera_stream.cpp | 101 ++++++++++++++++++++++++++++++++--
>>   src/android/camera_stream.h   |  37 +++++++++++++
>>   3 files changed, 151 insertions(+), 31 deletions(-)
>>
>> diff --git a/src/android/camera_device.cpp b/src/android/camera_device.cpp
>> index 3ded0f7e..53ebe0ea 100644
>> --- a/src/android/camera_device.cpp
>> +++ b/src/android/camera_device.cpp
>> @@ -1027,29 +1027,8 @@ int CameraDevice::processCaptureRequest(camera3_capture_request_t *camera3Reques
>>   
>>   void CameraDevice::requestComplete(Request *request)
>>   {
>> -	Camera3RequestDescriptor *descriptor;
>> -	{
>> -		MutexLocker descriptorsLock(descriptorsMutex_);
>> -		ASSERT(!descriptors_.empty());
>> -		descriptor = descriptors_.front().get();
>> -	}
>> -
>> -	if (descriptor->request_->cookie() != request->cookie()) {
>> -		/*
>> -		 * \todo Clarify if the Camera has to be closed on
>> -		 * ERROR_DEVICE.
>> -		 */
>> -		LOG(HAL, Error)
>> -			<< "Out-of-order completion for request "
>> -			<< utils::hex(request->cookie());
>> -
>> -		MutexLocker descriptorsLock(descriptorsMutex_);
>> -		descriptors_.pop();
>> -
>> -		notifyError(0, nullptr, CAMERA3_MSG_ERROR_DEVICE);
>> -
>> -		return;
>> -	}
>> +	Camera3RequestDescriptor *descriptor =
>> +		reinterpret_cast<Camera3RequestDescriptor *>(request->cookie());
>>   
>>   	/*
>>   	 * Prepare the capture result for the Android camera stack.
>> @@ -1124,9 +1103,13 @@ void CameraDevice::requestComplete(Request *request)
>>   	}
>>   
>>   	/* Handle post-processing. */
>> +	MutexLocker locker(descriptor->streamsProcessMutex_);
>> +
>>   	/*
>> -	 * \todo Protect the loop below with streamProcessMutex_ when post
> In the patch that introduced this,
>
> s/streamProcessMutex_/streamsProcessMutex_/
>
>> -	 * processor runs asynchronously.
>> +	 * Queue all the post-processing streams request at once. The completion
>> +	 * slot streamProcessingComplete() can only execute when we are out
>> +	 * this critical section. This helps to handle synchronous errors here
>> +	 * itself.
>>   	 */
>>   	auto iter = descriptor->pendingStreamsToProcess_.begin();
>>   	while (iter != descriptor->pendingStreamsToProcess_.end()) {
>> @@ -1158,8 +1141,10 @@ void CameraDevice::requestComplete(Request *request)
>>   		}
>>   	}
>>   
>> -	if (descriptor->pendingStreamsToProcess_.empty())
>> +	if (descriptor->pendingStreamsToProcess_.empty()) {
>> +		locker.unlock();
>>   		completeDescriptor(descriptor);
>> +	}
>>   }
>>   
>>   void CameraDevice::completeDescriptor(Camera3RequestDescriptor *descriptor)
>> @@ -1245,6 +1230,13 @@ void CameraDevice::streamProcessingComplete(Camera3RequestDescriptor::StreamBuff
>>   	MutexLocker locker(request->streamsProcessMutex_);
>>   
>>   	request->pendingStreamsToProcess_.erase(streamBuffer->stream);
>> +
>> +	if (!request->pendingStreamsToProcess_.empty())
>> +		return;
>> +
>> +	locker.unlock();
> Maybe
>
> 	{
> 		MutexLocker locker(request->streamsProcessMutex_);
>
> 		request->pendingStreamsToProcess_.erase(streamBuffer->stream);
> 		if (!request->pendingStreamsToProcess_.empty())
> 			return;
> 	}
>
> up to you.


Ok, yes, this looks cleaner

>
>> +
>> +	completeDescriptor(streamBuffer->request);
>>   }
>>   
>>   std::string CameraDevice::logPrefix() const
>> diff --git a/src/android/camera_stream.cpp b/src/android/camera_stream.cpp
>> index fed99022..9023c13c 100644
>> --- a/src/android/camera_stream.cpp
>> +++ b/src/android/camera_stream.cpp
>> @@ -99,6 +99,7 @@ int CameraStream::configure()
>>   		if (ret)
>>   			return ret;
>>   
>> +		worker_ = std::make_unique<PostProcessorWorker>(postProcessor_.get());
>>   		postProcessor_->processComplete.connect(
>>   			this, [&](Camera3RequestDescriptor::StreamBuffer *streamBuffer,
>>   				  PostProcessor::Status status) {
>> @@ -112,6 +113,8 @@ int CameraStream::configure()
>>   				cameraDevice_->streamProcessingComplete(streamBuffer,
>>   									bufferStatus);
>>   			});
>> +
>> +		worker_->start();
>>   	}
>>   
>>   	if (type_ == Type::Internal) {
>> @@ -178,10 +181,6 @@ int CameraStream::process(Camera3RequestDescriptor::StreamBuffer *streamBuffer)
>>   		streamBuffer->fence = -1;
>>   	}
>>   
>> -	/*
>> -	 * \todo Buffer mapping and processing should be moved to a
>> -	 * separate thread.
>> -	 */
>>   	const StreamConfiguration &output = configuration();
>>   	streamBuffer->dstBuffer = std::make_unique<CameraBuffer>(
>>   		*streamBuffer->camera3Buffer, output.pixelFormat, output.size,
>> @@ -191,11 +190,19 @@ int CameraStream::process(Camera3RequestDescriptor::StreamBuffer *streamBuffer)
>>   		return -EINVAL;
>>   	}
>>   
>> -	postProcessor_->process(streamBuffer);
>> +	worker_->queueRequest(streamBuffer);
>>   
>>   	return 0;
>>   }
>>   
>> +void CameraStream::flush()
>> +{
>> +	if (!postProcessor_)
>> +		return;
>> +
>> +	worker_->flush();
>> +}
>> +
>>   FrameBuffer *CameraStream::getBuffer()
>>   {
>>   	if (!allocator_)
>> @@ -223,3 +230,87 @@ void CameraStream::putBuffer(FrameBuffer *buffer)
>>   
>>   	buffers_.push_back(buffer);
>>   }
>> +
>> +CameraStream::PostProcessorWorker::PostProcessorWorker(PostProcessor *postProcessor)
>> +	: postProcessor_(postProcessor)
>> +{
>> +}
>> +
>> +CameraStream::PostProcessorWorker::~PostProcessorWorker()
>> +{
>> +	{
>> +		libcamera::MutexLocker lock(mutex_);
>> +		state_ = State::Stopped;
>> +	}
>> +
>> +	cv_.notify_one();
>> +	wait();
>> +}
>> +
>> +void CameraStream::PostProcessorWorker::start()
>> +{
>> +	{
>> +		libcamera::MutexLocker lock(mutex_);
>> +		ASSERT(state_ != State::Running);
>> +		state_ = State::Running;
>> +	}
>> +
>> +	Thread::start();
>> +}
>> +
>> +void CameraStream::PostProcessorWorker::queueRequest(Camera3RequestDescriptor::StreamBuffer *dest)
>> +{
>> +	{
>> +		MutexLocker lock(mutex_);
>> +		ASSERT(state_ == State::Running);
>> +		requests_.push(dest);
>> +	}
>> +
>> +	cv_.notify_one();
>> +}
>> +
>> +void CameraStream::PostProcessorWorker::run()
>> +{
>> +	MutexLocker locker(mutex_);
>> +
>> +	while (1) {
>> +		cv_.wait(locker, [&] {
>> +			return state_ != State::Running || !requests_.empty();
>> +		});
>> +
>> +		if (state_ != State::Running)
>> +			break;
>> +
>> +		Camera3RequestDescriptor::StreamBuffer *streamBuffer = requests_.front();
>> +		requests_.pop();
>> +		locker.unlock();
>> +
>> +		postProcessor_->process(streamBuffer);
>> +
>> +		locker.lock();
>> +	}
>> +
>> +	if (state_ == State::Flushing) {
>> +		std::queue<Camera3RequestDescriptor::StreamBuffer *> requests =
>> +			std::move(requests_);
>> +		locker.unlock();
>> +
>> +		while (!requests.empty()) {
>> +			postProcessor_->processComplete.emit(
>> +				requests.front(), PostProcessor::Status::Error);
>> +			requests.pop();
>> +		}
>> +
>> +		locker.lock();
>> +		state_ = State::Stopped;
>> +	}
>> +}
>> +
>> +void CameraStream::PostProcessorWorker::flush()
>> +{
>> +	libcamera::MutexLocker lock(mutex_);
>> +	state_ = State::Flushing;
>> +	lock.unlock();
>> +
>> +	cv_.notify_one();
>> +}
>> diff --git a/src/android/camera_stream.h b/src/android/camera_stream.h
>> index e74a9a3b..1588938a 100644
>> --- a/src/android/camera_stream.h
>> +++ b/src/android/camera_stream.h
>> @@ -7,12 +7,16 @@
>>   #ifndef __ANDROID_CAMERA_STREAM_H__
>>   #define __ANDROID_CAMERA_STREAM_H__
>>   
>> +#include <condition_variable>
>>   #include <memory>
>>   #include <mutex>
>> +#include <queue>
>>   #include <vector>
>>   
>>   #include <hardware/camera3.h>
>>   
>> +#include <libcamera/base/thread.h>
>> +
>>   #include <libcamera/camera.h>
>>   #include <libcamera/framebuffer.h>
>>   #include <libcamera/framebuffer_allocator.h>
>> @@ -20,6 +24,7 @@
>>   #include <libcamera/pixel_format.h>
>>   
>>   #include "camera_request.h"
>> +#include "post_processor.h"
>>   
>>   class CameraDevice;
>>   class PostProcessor;
> You can drop the forward declaration.


Ouch, that's what happens when you manually cherry-pick patches because
the conflicts were too hard to deal with :-/

I will address these changes locally. Should I re-post the series as v8
with those changes?

>
>> @@ -124,8 +129,38 @@ public:
>>   	int process(Camera3RequestDescriptor::StreamBuffer *streamBuffer);
>>   	libcamera::FrameBuffer *getBuffer();
>>   	void putBuffer(libcamera::FrameBuffer *buffer);
>> +	void flush();
>>   
>>   private:
>> +	class PostProcessorWorker : public libcamera::Thread
>> +	{
>> +	public:
>> +		enum class State {
>> +			Stopped,
>> +			Running,
>> +			Flushing,
>> +		};
>> +
>> +		PostProcessorWorker(PostProcessor *postProcessor);
>> +		~PostProcessorWorker();
>> +
>> +		void start();
>> +		void queueRequest(Camera3RequestDescriptor::StreamBuffer *request);
>> +		void flush();
>> +
>> +	protected:
>> +		void run() override;
>> +
>> +	private:
>> +		PostProcessor *postProcessor_;
>> +
>> +		libcamera::Mutex mutex_;
>> +		std::condition_variable cv_;
>> +
>> +		std::queue<Camera3RequestDescriptor::StreamBuffer *> requests_;
>> +		State state_ = State::Stopped;
>> +	};
>> +
>>   	int waitFence(int fence);
>>   
>>   	CameraDevice *const cameraDevice_;
>> @@ -142,6 +177,8 @@ private:
>>   	 */
>>   	std::unique_ptr<std::mutex> mutex_;
>>   	std::unique_ptr<PostProcessor> postProcessor_;
>> +
>> +	std::unique_ptr<PostProcessorWorker> worker_;
>>   };
>>   
>>   #endif /* __ANDROID_CAMERA_STREAM__ */
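The PostProcessorWorker added to camera_stream.cpp follows a common producer/consumer pattern. It combines a mutex-protected FIFO queue, a condition variable the thread sleeps on, and a state variable that tells the thread to stop or to flush. The sketch below is a rough, self-contained illustration of that pattern, not the patch itself. It substitutes std::thread for libcamera::Thread. It also substitutes two std::function callbacks for PostProcessor::process() and for emitting processComplete with Status::Error. Worker, Item, process and cancel are hypothetical names, and start() is meant to be called only once here.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

/* FIFO worker thread; Item stands in for StreamBuffer *. */
class Worker
{
public:
	enum class State { Stopped, Running, Flushing };
	using Item = int;
	using Callback = std::function<void(Item)>;
	Worker(Callback process, Callback cancel)
		: process_(std::move(process)), cancel_(std::move(cancel)) {}
	~Worker()
	{
		{
			std::lock_guard<std::mutex> lock(mutex_);
			/* Keep a pending flush so queued items still get cancelled. */
			if (state_ == State::Running)
				state_ = State::Stopped;
		}
		cv_.notify_one();
		if (thread_.joinable())
			thread_.join();
	}

	void start()
	{
		{
			std::lock_guard<std::mutex> lock(mutex_);
			assert(state_ != State::Running);
			state_ = State::Running;
		}
		thread_ = std::thread(&Worker::run, this);
	}

	void queueRequest(Item item)
	{
		{
			std::lock_guard<std::mutex> lock(mutex_);
			assert(state_ == State::Running);
			requests_.push(item);
		}
		cv_.notify_one();
	}

	void flush()
	{
		{
			std::lock_guard<std::mutex> lock(mutex_);
			state_ = State::Flushing;
		}
		cv_.notify_one();
	}

private:
	void run()
	{
		std::unique_lock<std::mutex> locker(mutex_);
		while (true) {
			cv_.wait(locker, [&] {
				return state_ != State::Running || !requests_.empty();
			});
			if (state_ != State::Running)
				break;
			Item item = requests_.front();
			requests_.pop();
			locker.unlock();	/* Process without holding the lock. */
			process_(item);
			locker.lock();
		}
		if (state_ == State::Flushing) {
			std::queue<Item> pending;
			std::swap(pending, requests_);	/* requests_ is now empty. */
			locker.unlock();
			while (!pending.empty()) {	/* Report unprocessed items. */
				cancel_(pending.front());
				pending.pop();
			}
			locker.lock();
			state_ = State::Stopped;
		}
	}

	Callback process_, cancel_;
	std::mutex mutex_;
	std::condition_variable cv_;
	std::queue<Item> requests_;
	State state_ = State::Stopped;
	std::thread thread_;
};
```

Two details differ from the patch and are only illustrative choices. First, pending items are taken with std::swap, which leaves requests_ empty by definition rather than in a moved-from state. Second, the destructor only overwrites Running, so a flush() issued just before destruction still cancels the queued items.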
Laurent Pinchart Oct. 26, 2021, 7:20 a.m. UTC | #4
Hi Umang,

On Tue, Oct 26, 2021 at 10:09:22AM +0530, Umang Jain wrote:
> On 10/26/21 3:35 AM, Laurent Pinchart wrote:
> > On Tue, Oct 26, 2021 at 02:08:33AM +0530, Umang Jain wrote:
> >> Introduce a dedicated worker class derived from libcamera::Thread.
> >> The worker class maintains a queue for post-processing requests
> >> and waits for a post-processing request to become available.
> >> It will process them as per FIFO before de-queuing it from the
> >> queue.
> >>
> >> The entire post-processing handling iteration is locked under
> >> streamProcessMutex_ which helps us to queue all the post-processing
> > s/streamProcessMutex_/streamsProcessMutex_/
> >
> >> request at once, before any of the post-processing completion slot
> >> (streamProcessingComplete()) is allowed to run for post-processing
> >> requests completing in parallel. This helps us to manage both
> >> synchronous and asynchronous errors encountered during the entire
> >> post processing operation. Since a post-processing operation can
> >> even complete after CameraDevice::requestComplete() has returned,
> >> we need to check and complete the descriptor from
> >> streamProcessingComplete() running in the PostProcessorWorker's
> >> thread.
> >>
> >> This patch also implements a flush() for the PostProcessorWorker
> >> class which is responsible to purge post-processing requests
> >> queued up while a camera is stopping/flushing. It is hooked with
> >> CameraStream::flush(), which isn't used currently but will be
> >> used when we handle flush/stop scenarios in greater detail
> >> subsequently (in a different patchset).
> >>
> >> The libcamera request completion handler CameraDevice::requestComplete()
> >> assumes that the request that has just completed is at the front of the
> >> queue. Now that the post-processor runs asynchronously, this isn't true
> >> anymore, a request being post-processed will stay in the queue and a new
> >> libcamera request may complete. Remove that assumption, and use the
> >> request cookie to obtain the Camera3RequestDescriptor.
> >>
> >> Signed-off-by: Umang Jain <umang.jain@ideasonboard.com>
> >> Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
> >> ---
> >>   src/android/camera_device.cpp |  44 ++++++---------
> >>   src/android/camera_stream.cpp | 101 ++++++++++++++++++++++++++++++++--
> >>   src/android/camera_stream.h   |  37 +++++++++++++
> >>   3 files changed, 151 insertions(+), 31 deletions(-)
> >>
> >> diff --git a/src/android/camera_device.cpp b/src/android/camera_device.cpp
> >> index 3ded0f7e..53ebe0ea 100644
> >> --- a/src/android/camera_device.cpp
> >> +++ b/src/android/camera_device.cpp
> >> @@ -1027,29 +1027,8 @@ int CameraDevice::processCaptureRequest(camera3_capture_request_t *camera3Reques
> >>   
> >>   void CameraDevice::requestComplete(Request *request)
> >>   {
> >> -	Camera3RequestDescriptor *descriptor;
> >> -	{
> >> -		MutexLocker descriptorsLock(descriptorsMutex_);
> >> -		ASSERT(!descriptors_.empty());
> >> -		descriptor = descriptors_.front().get();
> >> -	}
> >> -
> >> -	if (descriptor->request_->cookie() != request->cookie()) {
> >> -		/*
> >> -		 * \todo Clarify if the Camera has to be closed on
> >> -		 * ERROR_DEVICE.
> >> -		 */
> >> -		LOG(HAL, Error)
> >> -			<< "Out-of-order completion for request "
> >> -			<< utils::hex(request->cookie());
> >> -
> >> -		MutexLocker descriptorsLock(descriptorsMutex_);
> >> -		descriptors_.pop();
> >> -
> >> -		notifyError(0, nullptr, CAMERA3_MSG_ERROR_DEVICE);
> >> -
> >> -		return;
> >> -	}
> >> +	Camera3RequestDescriptor *descriptor =
> >> +		reinterpret_cast<Camera3RequestDescriptor *>(request->cookie());
> >>   
> >>   	/*
> >>   	 * Prepare the capture result for the Android camera stack.
> >> @@ -1124,9 +1103,13 @@ void CameraDevice::requestComplete(Request *request)
> >>   	}
> >>   
> >>   	/* Handle post-processing. */
> >> +	MutexLocker locker(descriptor->streamsProcessMutex_);
> >> +
> >>   	/*
> >> -	 * \todo Protect the loop below with streamProcessMutex_ when post
> > In the patch that introduced this,
> >
> > s/streamProcessMutex_/streamsProcessMutex_/
> >
> >> -	 * processor runs asynchronously.
> >> +	 * Queue all the post-processing stream requests at once. The completion
> >> +	 * slot streamProcessingComplete() can only execute once we are out of
> >> +	 * this critical section. This allows synchronous errors to be handled
> >> +	 * here.
> >>   	 */
> >>   	auto iter = descriptor->pendingStreamsToProcess_.begin();
> >>   	while (iter != descriptor->pendingStreamsToProcess_.end()) {
> >> @@ -1158,8 +1141,10 @@ void CameraDevice::requestComplete(Request *request)
> >>   		}
> >>   	}
> >>   
> >> -	if (descriptor->pendingStreamsToProcess_.empty())
> >> +	if (descriptor->pendingStreamsToProcess_.empty()) {
> >> +		locker.unlock();
> >>   		completeDescriptor(descriptor);
> >> +	}
> >>   }
> >>   
> >>   void CameraDevice::completeDescriptor(Camera3RequestDescriptor *descriptor)
> >> @@ -1245,6 +1230,13 @@ void CameraDevice::streamProcessingComplete(Camera3RequestDescriptor::StreamBuff
> >>   	MutexLocker locker(request->streamsProcessMutex_);
> >>   
> >>   	request->pendingStreamsToProcess_.erase(streamBuffer->stream);
> >> +
> >> +	if (!request->pendingStreamsToProcess_.empty())
> >> +		return;
> >> +
> >> +	locker.unlock();
> > Maybe
> >
> > 	{
> > 		MutexLocker locker(request->streamsProcessMutex_);
> >
> > 		request->pendingStreamsToProcess_.erase(streamBuffer->stream);
> > 		if (!request->pendingStreamsToProcess_.empty())
> > 			return;
> > 	}
> >
> > up to you.
> 
> 
> Ok, yes, this looks cleaner
> 
> >
> >> +
> >> +	completeDescriptor(streamBuffer->request);
> >>   }
> >>   
> >>   std::string CameraDevice::logPrefix() const
> >> diff --git a/src/android/camera_stream.cpp b/src/android/camera_stream.cpp
> >> index fed99022..9023c13c 100644
> >> --- a/src/android/camera_stream.cpp
> >> +++ b/src/android/camera_stream.cpp
> >> @@ -99,6 +99,7 @@ int CameraStream::configure()
> >>   		if (ret)
> >>   			return ret;
> >>   
> >> +		worker_ = std::make_unique<PostProcessorWorker>(postProcessor_.get());
> >>   		postProcessor_->processComplete.connect(
> >>   			this, [&](Camera3RequestDescriptor::StreamBuffer *streamBuffer,
> >>   				  PostProcessor::Status status) {
> >> @@ -112,6 +113,8 @@ int CameraStream::configure()
> >>   				cameraDevice_->streamProcessingComplete(streamBuffer,
> >>   									bufferStatus);
> >>   			});
> >> +
> >> +		worker_->start();
> >>   	}
> >>   
> >>   	if (type_ == Type::Internal) {
> >> @@ -178,10 +181,6 @@ int CameraStream::process(Camera3RequestDescriptor::StreamBuffer *streamBuffer)
> >>   		streamBuffer->fence = -1;
> >>   	}
> >>   
> >> -	/*
> >> -	 * \todo Buffer mapping and processing should be moved to a
> >> -	 * separate thread.
> >> -	 */
> >>   	const StreamConfiguration &output = configuration();
> >>   	streamBuffer->dstBuffer = std::make_unique<CameraBuffer>(
> >>   		*streamBuffer->camera3Buffer, output.pixelFormat, output.size,
> >> @@ -191,11 +190,19 @@ int CameraStream::process(Camera3RequestDescriptor::StreamBuffer *streamBuffer)
> >>   		return -EINVAL;
> >>   	}
> >>   
> >> -	postProcessor_->process(streamBuffer);
> >> +	worker_->queueRequest(streamBuffer);
> >>   
> >>   	return 0;
> >>   }
> >>   
> >> +void CameraStream::flush()
> >> +{
> >> +	if (!postProcessor_)
> >> +		return;
> >> +
> >> +	worker_->flush();
> >> +}
> >> +
> >>   FrameBuffer *CameraStream::getBuffer()
> >>   {
> >>   	if (!allocator_)
> >> @@ -223,3 +230,87 @@ void CameraStream::putBuffer(FrameBuffer *buffer)
> >>   
> >>   	buffers_.push_back(buffer);
> >>   }
> >> +
> >> +CameraStream::PostProcessorWorker::PostProcessorWorker(PostProcessor *postProcessor)
> >> +	: postProcessor_(postProcessor)
> >> +{
> >> +}
> >> +
> >> +CameraStream::PostProcessorWorker::~PostProcessorWorker()
> >> +{
> >> +	{
> >> +		libcamera::MutexLocker lock(mutex_);
> >> +		state_ = State::Stopped;
> >> +	}
> >> +
> >> +	cv_.notify_one();
> >> +	wait();
> >> +}
> >> +
> >> +void CameraStream::PostProcessorWorker::start()
> >> +{
> >> +	{
> >> +		libcamera::MutexLocker lock(mutex_);
> >> +		ASSERT(state_ != State::Running);
> >> +		state_ = State::Running;
> >> +	}
> >> +
> >> +	Thread::start();
> >> +}
> >> +
> >> +void CameraStream::PostProcessorWorker::queueRequest(Camera3RequestDescriptor::StreamBuffer *dest)
> >> +{
> >> +	{
> >> +		MutexLocker lock(mutex_);
> >> +		ASSERT(state_ == State::Running);
> >> +		requests_.push(dest);
> >> +	}
> >> +
> >> +	cv_.notify_one();
> >> +}
> >> +
> >> +void CameraStream::PostProcessorWorker::run()
> >> +{
> >> +	MutexLocker locker(mutex_);
> >> +
> >> +	while (1) {
> >> +		cv_.wait(locker, [&] {
> >> +			return state_ != State::Running || !requests_.empty();
> >> +		});
> >> +
> >> +		if (state_ != State::Running)
> >> +			break;
> >> +
> >> +		Camera3RequestDescriptor::StreamBuffer *streamBuffer = requests_.front();
> >> +		requests_.pop();
> >> +		locker.unlock();
> >> +
> >> +		postProcessor_->process(streamBuffer);
> >> +
> >> +		locker.lock();
> >> +	}
> >> +
> >> +	if (state_ == State::Flushing) {
> >> +		std::queue<Camera3RequestDescriptor::StreamBuffer *> requests =
> >> +			std::move(requests_);
> >> +		locker.unlock();
> >> +
> >> +		while (!requests.empty()) {
> >> +			postProcessor_->processComplete.emit(
> >> +				requests.front(), PostProcessor::Status::Error);
> >> +			requests.pop();
> >> +		}
> >> +
> >> +		locker.lock();
> >> +		state_ = State::Stopped;
> >> +	}
> >> +}
> >> +
> >> +void CameraStream::PostProcessorWorker::flush()
> >> +{
> >> +	libcamera::MutexLocker lock(mutex_);
> >> +	state_ = State::Flushing;
> >> +	lock.unlock();
> >> +
> >> +	cv_.notify_one();
> >> +}
> >> diff --git a/src/android/camera_stream.h b/src/android/camera_stream.h
> >> index e74a9a3b..1588938a 100644
> >> --- a/src/android/camera_stream.h
> >> +++ b/src/android/camera_stream.h
> >> @@ -7,12 +7,16 @@
> >>   #ifndef __ANDROID_CAMERA_STREAM_H__
> >>   #define __ANDROID_CAMERA_STREAM_H__
> >>   
> >> +#include <condition_variable>
> >>   #include <memory>
> >>   #include <mutex>
> >> +#include <queue>
> >>   #include <vector>
> >>   
> >>   #include <hardware/camera3.h>
> >>   
> >> +#include <libcamera/base/thread.h>
> >> +
> >>   #include <libcamera/camera.h>
> >>   #include <libcamera/framebuffer.h>
> >>   #include <libcamera/framebuffer_allocator.h>
> >> @@ -20,6 +24,7 @@
> >>   #include <libcamera/pixel_format.h>
> >>   
> >>   #include "camera_request.h"
> >> +#include "post_processor.h"
> >>   
> >>   class CameraDevice;
> >>   class PostProcessor;
> >
> > You can drop the forward declaration.
> 
> Ouch, that's what happens when you manually cherry-pick patches because
> the conflicts were too hard to deal with :-/
>
> I will address these changes locally. Should I re-post the series as v8
> with those changes?

They're small enough. If you're confident you got them right, I don't
need a v8 on the list.

> >> @@ -124,8 +129,38 @@ public:
> >>   	int process(Camera3RequestDescriptor::StreamBuffer *streamBuffer);
> >>   	libcamera::FrameBuffer *getBuffer();
> >>   	void putBuffer(libcamera::FrameBuffer *buffer);
> >> +	void flush();
> >>   
> >>   private:
> >> +	class PostProcessorWorker : public libcamera::Thread
> >> +	{
> >> +	public:
> >> +		enum class State {
> >> +			Stopped,
> >> +			Running,
> >> +			Flushing,
> >> +		};
> >> +
> >> +		PostProcessorWorker(PostProcessor *postProcessor);
> >> +		~PostProcessorWorker();
> >> +
> >> +		void start();
> >> +		void queueRequest(Camera3RequestDescriptor::StreamBuffer *request);
> >> +		void flush();
> >> +
> >> +	protected:
> >> +		void run() override;
> >> +
> >> +	private:
> >> +		PostProcessor *postProcessor_;
> >> +
> >> +		libcamera::Mutex mutex_;
> >> +		std::condition_variable cv_;
> >> +
> >> +		std::queue<Camera3RequestDescriptor::StreamBuffer *> requests_;
> >> +		State state_ = State::Stopped;
> >> +	};
> >> +
> >>   	int waitFence(int fence);
> >>   
> >>   	CameraDevice *const cameraDevice_;
> >> @@ -142,6 +177,8 @@ private:
> >>   	 */
> >>   	std::unique_ptr<std::mutex> mutex_;
> >>   	std::unique_ptr<PostProcessor> postProcessor_;
> >> +
> >> +	std::unique_ptr<PostProcessorWorker> worker_;
> >>   };
> >>   
> >>   #endif /* __ANDROID_CAMERA_STREAM__ */
Umang Jain Oct. 26, 2021, 7:42 a.m. UTC | #5
Hi Laurent,

On 10/26/21 12:50 PM, Laurent Pinchart wrote:
> Hi Umang,
>
> On Tue, Oct 26, 2021 at 10:09:22AM +0530, Umang Jain wrote:
>> On 10/26/21 3:35 AM, Laurent Pinchart wrote:
>>> On Tue, Oct 26, 2021 at 02:08:33AM +0530, Umang Jain wrote:
>>>> Introduce a dedicated worker class derived from libcamera::Thread.
>>>> The worker class maintains a queue for post-processing requests
>>>> and waits for a post-processing request to become available.
>>>> It will process them as per FIFO before de-queuing it from the
>>>> queue.
>>>>
>>>> The entire post-processing handling iteration is locked under
>>>> streamProcessMutex_ which helps us to queue all the post-processing
>>> s/streamProcessMutex_/streamsProcessMutex_/
>>>
>>>> request at once, before any of the post-processing completion slot
>>>> (streamProcessingComplete()) is allowed to run for post-processing
>>>> requests completing in parallel. This helps us to manage both
>>>> synchronous and asynchronous errors encountered during the entire
>>>> post processing operation. Since a post-processing operation can
>>>> even complete after CameraDevice::requestComplete() has returned,
>>>> we need to check and complete the descriptor from
>>>> streamProcessingComplete() running in the PostProcessorWorker's
>>>> thread.
>>>>
>>>> This patch also implements a flush() for the PostProcessorWorker
>>>> class which is responsible to purge post-processing requests
>>>> queued up while a camera is stopping/flushing. It is hooked with
>>>> CameraStream::flush(), which isn't used currently but will be
>>>> used when we handle flush/stop scenarios in greater detail
>>>> subsequently (in a different patchset).
>>>>
>>>> The libcamera request completion handler CameraDevice::requestComplete()
>>>> assumes that the request that has just completed is at the front of the
>>>> queue. Now that the post-processor runs asynchronously, this isn't true
>>>> anymore, a request being post-processed will stay in the queue and a new
>>>> libcamera request may complete. Remove that assumption, and use the
>>>> request cookie to obtain the Camera3RequestDescriptor.
>>>>
>>>> Signed-off-by: Umang Jain <umang.jain@ideasonboard.com>
>>>> Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
>>>> ---
>>>>    src/android/camera_device.cpp |  44 ++++++---------
>>>>    src/android/camera_stream.cpp | 101 ++++++++++++++++++++++++++++++++--
>>>>    src/android/camera_stream.h   |  37 +++++++++++++
>>>>    3 files changed, 151 insertions(+), 31 deletions(-)
>>>>
>>>> diff --git a/src/android/camera_device.cpp b/src/android/camera_device.cpp
>>>> index 3ded0f7e..53ebe0ea 100644
>>>> --- a/src/android/camera_device.cpp
>>>> +++ b/src/android/camera_device.cpp
>>>> @@ -1027,29 +1027,8 @@ int CameraDevice::processCaptureRequest(camera3_capture_request_t *camera3Reques
>>>>    
>>>>    void CameraDevice::requestComplete(Request *request)
>>>>    {
>>>> -	Camera3RequestDescriptor *descriptor;
>>>> -	{
>>>> -		MutexLocker descriptorsLock(descriptorsMutex_);
>>>> -		ASSERT(!descriptors_.empty());
>>>> -		descriptor = descriptors_.front().get();
>>>> -	}
>>>> -
>>>> -	if (descriptor->request_->cookie() != request->cookie()) {
>>>> -		/*
>>>> -		 * \todo Clarify if the Camera has to be closed on
>>>> -		 * ERROR_DEVICE.
>>>> -		 */
>>>> -		LOG(HAL, Error)
>>>> -			<< "Out-of-order completion for request "
>>>> -			<< utils::hex(request->cookie());
>>>> -
>>>> -		MutexLocker descriptorsLock(descriptorsMutex_);
>>>> -		descriptors_.pop();
>>>> -
>>>> -		notifyError(0, nullptr, CAMERA3_MSG_ERROR_DEVICE);
>>>> -
>>>> -		return;
>>>> -	}
>>>> +	Camera3RequestDescriptor *descriptor =
>>>> +		reinterpret_cast<Camera3RequestDescriptor *>(request->cookie());
>>>>    
>>>>    	/*
>>>>    	 * Prepare the capture result for the Android camera stack.
>>>> @@ -1124,9 +1103,13 @@ void CameraDevice::requestComplete(Request *request)
>>>>    	}
>>>>    
>>>>    	/* Handle post-processing. */
>>>> +	MutexLocker locker(descriptor->streamsProcessMutex_);
>>>> +
>>>>    	/*
>>>> -	 * \todo Protect the loop below with streamProcessMutex_ when post
>>> In the patch that introduced this,
>>>
>>> s/streamProcessMutex_/streamsProcessMutex_/
>>>
>>>> -	 * processor runs asynchronously.
>>>> +	 * Queue all the post-processing stream requests at once. The completion
>>>> +	 * slot streamProcessingComplete() can only execute once we are out of
>>>> +	 * this critical section. This allows synchronous errors to be handled
>>>> +	 * here.
>>>>    	 */
>>>>    	auto iter = descriptor->pendingStreamsToProcess_.begin();
>>>>    	while (iter != descriptor->pendingStreamsToProcess_.end()) {
>>>> @@ -1158,8 +1141,10 @@ void CameraDevice::requestComplete(Request *request)
>>>>    		}
>>>>    	}
>>>>    
>>>> -	if (descriptor->pendingStreamsToProcess_.empty())
>>>> +	if (descriptor->pendingStreamsToProcess_.empty()) {
>>>> +		locker.unlock();
>>>>    		completeDescriptor(descriptor);
>>>> +	}
>>>>    }
>>>>    
>>>>    void CameraDevice::completeDescriptor(Camera3RequestDescriptor *descriptor)
>>>> @@ -1245,6 +1230,13 @@ void CameraDevice::streamProcessingComplete(Camera3RequestDescriptor::StreamBuff
>>>>    	MutexLocker locker(request->streamsProcessMutex_);
>>>>    
>>>>    	request->pendingStreamsToProcess_.erase(streamBuffer->stream);
>>>> +
>>>> +	if (!request->pendingStreamsToProcess_.empty())
>>>> +		return;
>>>> +
>>>> +	locker.unlock();
>>> Maybe
>>>
>>> 	{
>>> 		MutexLocker locker(request->streamsProcessMutex_);
>>>
>>> 		request->pendingStreamsToProcess_.erase(streamBuffer->stream);
>>> 		if (!request->pendingStreamsToProcess_.empty())
>>> 			return;
>>> 	}
>>>
>>> up to you.
>>
>> Ok, yes, this looks cleaner
>>
>>>> +
>>>> +	completeDescriptor(streamBuffer->request);
>>>>    }
>>>>    
>>>>    std::string CameraDevice::logPrefix() const
>>>> diff --git a/src/android/camera_stream.cpp b/src/android/camera_stream.cpp
>>>> index fed99022..9023c13c 100644
>>>> --- a/src/android/camera_stream.cpp
>>>> +++ b/src/android/camera_stream.cpp
>>>> @@ -99,6 +99,7 @@ int CameraStream::configure()
>>>>    		if (ret)
>>>>    			return ret;
>>>>    
>>>> +		worker_ = std::make_unique<PostProcessorWorker>(postProcessor_.get());
>>>>    		postProcessor_->processComplete.connect(
>>>>    			this, [&](Camera3RequestDescriptor::StreamBuffer *streamBuffer,
>>>>    				  PostProcessor::Status status) {
>>>> @@ -112,6 +113,8 @@ int CameraStream::configure()
>>>>    				cameraDevice_->streamProcessingComplete(streamBuffer,
>>>>    									bufferStatus);
>>>>    			});
>>>> +
>>>> +		worker_->start();
>>>>    	}
>>>>    
>>>>    	if (type_ == Type::Internal) {
>>>> @@ -178,10 +181,6 @@ int CameraStream::process(Camera3RequestDescriptor::StreamBuffer *streamBuffer)
>>>>    		streamBuffer->fence = -1;
>>>>    	}
>>>>    
>>>> -	/*
>>>> -	 * \todo Buffer mapping and processing should be moved to a
>>>> -	 * separate thread.
>>>> -	 */
>>>>    	const StreamConfiguration &output = configuration();
>>>>    	streamBuffer->dstBuffer = std::make_unique<CameraBuffer>(
>>>>    		*streamBuffer->camera3Buffer, output.pixelFormat, output.size,
>>>> @@ -191,11 +190,19 @@ int CameraStream::process(Camera3RequestDescriptor::StreamBuffer *streamBuffer)
>>>>    		return -EINVAL;
>>>>    	}
>>>>    
>>>> -	postProcessor_->process(streamBuffer);
>>>> +	worker_->queueRequest(streamBuffer);
>>>>    
>>>>    	return 0;
>>>>    }
>>>>    
>>>> +void CameraStream::flush()
>>>> +{
>>>> +	if (!postProcessor_)
>>>> +		return;
>>>> +
>>>> +	worker_->flush();
>>>> +}
>>>> +
>>>>    FrameBuffer *CameraStream::getBuffer()
>>>>    {
>>>>    	if (!allocator_)
>>>> @@ -223,3 +230,87 @@ void CameraStream::putBuffer(FrameBuffer *buffer)
>>>>    
>>>>    	buffers_.push_back(buffer);
>>>>    }
>>>> +
>>>> +CameraStream::PostProcessorWorker::PostProcessorWorker(PostProcessor *postProcessor)
>>>> +	: postProcessor_(postProcessor)
>>>> +{
>>>> +}
>>>> +
>>>> +CameraStream::PostProcessorWorker::~PostProcessorWorker()
>>>> +{
>>>> +	{
>>>> +		libcamera::MutexLocker lock(mutex_);
>>>> +		state_ = State::Stopped;
>>>> +	}
>>>> +
>>>> +	cv_.notify_one();
>>>> +	wait();
>>>> +}
>>>> +
>>>> +void CameraStream::PostProcessorWorker::start()
>>>> +{
>>>> +	{
>>>> +		libcamera::MutexLocker lock(mutex_);
>>>> +		ASSERT(state_ != State::Running);
>>>> +		state_ = State::Running;
>>>> +	}
>>>> +
>>>> +	Thread::start();
>>>> +}
>>>> +
>>>> +void CameraStream::PostProcessorWorker::queueRequest(Camera3RequestDescriptor::StreamBuffer *dest)
>>>> +{
>>>> +	{
>>>> +		MutexLocker lock(mutex_);
>>>> +		ASSERT(state_ == State::Running);
>>>> +		requests_.push(dest);
>>>> +	}
>>>> +
>>>> +	cv_.notify_one();
>>>> +}
>>>> +
>>>> +void CameraStream::PostProcessorWorker::run()
>>>> +{
>>>> +	MutexLocker locker(mutex_);
>>>> +
>>>> +	while (1) {
>>>> +		cv_.wait(locker, [&] {
>>>> +			return state_ != State::Running || !requests_.empty();
>>>> +		});
>>>> +
>>>> +		if (state_ != State::Running)
>>>> +			break;
>>>> +
>>>> +		Camera3RequestDescriptor::StreamBuffer *streamBuffer = requests_.front();
>>>> +		requests_.pop();
>>>> +		locker.unlock();
>>>> +
>>>> +		postProcessor_->process(streamBuffer);
>>>> +
>>>> +		locker.lock();
>>>> +	}
>>>> +
>>>> +	if (state_ == State::Flushing) {
>>>> +		std::queue<Camera3RequestDescriptor::StreamBuffer *> requests =
>>>> +			std::move(requests_);
>>>> +		locker.unlock();
>>>> +
>>>> +		while (!requests.empty()) {
>>>> +			postProcessor_->processComplete.emit(
>>>> +				requests.front(), PostProcessor::Status::Error);
>>>> +			requests.pop();
>>>> +		}
>>>> +
>>>> +		locker.lock();
>>>> +		state_ = State::Stopped;
>>>> +	}
>>>> +}
>>>> +
>>>> +void CameraStream::PostProcessorWorker::flush()
>>>> +{
>>>> +	libcamera::MutexLocker lock(mutex_);
>>>> +	state_ = State::Flushing;
>>>> +	lock.unlock();
>>>> +
>>>> +	cv_.notify_one();
>>>> +}
>>>> diff --git a/src/android/camera_stream.h b/src/android/camera_stream.h
>>>> index e74a9a3b..1588938a 100644
>>>> --- a/src/android/camera_stream.h
>>>> +++ b/src/android/camera_stream.h
>>>> @@ -7,12 +7,16 @@
>>>>    #ifndef __ANDROID_CAMERA_STREAM_H__
>>>>    #define __ANDROID_CAMERA_STREAM_H__
>>>>    
>>>> +#include <condition_variable>
>>>>    #include <memory>
>>>>    #include <mutex>
>>>> +#include <queue>
>>>>    #include <vector>
>>>>    
>>>>    #include <hardware/camera3.h>
>>>>    
>>>> +#include <libcamera/base/thread.h>
>>>> +
>>>>    #include <libcamera/camera.h>
>>>>    #include <libcamera/framebuffer.h>
>>>>    #include <libcamera/framebuffer_allocator.h>
>>>> @@ -20,6 +24,7 @@
>>>>    #include <libcamera/pixel_format.h>
>>>>    
>>>>    #include "camera_request.h"
>>>> +#include "post_processor.h"
>>>>    
>>>>    class CameraDevice;
>>>>    class PostProcessor;
>>> You can drop the forward declaration.
>> Ouch, that's what happens when you manually cherry-pick patches because
>> the conflicts were too hard to deal with :-/
>>
>> I will address these changes locally. Should I re-post the series as v8
>> with those changes?
> They're small enough. If you're confident you got them right, I don't
> need a v8 on the list.


Ah, I only saw this email just now; I have already posted v8 on the list.

Anyway, the last patch is still missing a Reviewed-by tag, so it would be
good for someone to look at the cleaned-up patches.

>
>>>> @@ -124,8 +129,38 @@ public:
>>>>    	int process(Camera3RequestDescriptor::StreamBuffer *streamBuffer);
>>>>    	libcamera::FrameBuffer *getBuffer();
>>>>    	void putBuffer(libcamera::FrameBuffer *buffer);
>>>> +	void flush();
>>>>    
>>>>    private:
>>>> +	class PostProcessorWorker : public libcamera::Thread
>>>> +	{
>>>> +	public:
>>>> +		enum class State {
>>>> +			Stopped,
>>>> +			Running,
>>>> +			Flushing,
>>>> +		};
>>>> +
>>>> +		PostProcessorWorker(PostProcessor *postProcessor);
>>>> +		~PostProcessorWorker();
>>>> +
>>>> +		void start();
>>>> +		void queueRequest(Camera3RequestDescriptor::StreamBuffer *request);
>>>> +		void flush();
>>>> +
>>>> +	protected:
>>>> +		void run() override;
>>>> +
>>>> +	private:
>>>> +		PostProcessor *postProcessor_;
>>>> +
>>>> +		libcamera::Mutex mutex_;
>>>> +		std::condition_variable cv_;
>>>> +
>>>> +		std::queue<Camera3RequestDescriptor::StreamBuffer *> requests_;
>>>> +		State state_ = State::Stopped;
>>>> +	};
>>>> +
>>>>    	int waitFence(int fence);
>>>>    
>>>>    	CameraDevice *const cameraDevice_;
>>>> @@ -142,6 +177,8 @@ private:
>>>>    	 */
>>>>    	std::unique_ptr<std::mutex> mutex_;
>>>>    	std::unique_ptr<PostProcessor> postProcessor_;
>>>> +
>>>> +	std::unique_ptr<PostProcessorWorker> worker_;
>>>>    };
>>>>    
>>>>    #endif /* __ANDROID_CAMERA_STREAM__ */
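With asynchronous post-processing, requestComplete() can no longer assume that the completed request is at the front of descriptors_. The patch instead casts request->cookie() back to the Camera3RequestDescriptor. The sketch below shows that round trip under stated assumptions. Request and Descriptor are hypothetical stand-ins; in libcamera, the cookie is passed to Camera::createRequest() and read back with Request::cookie(), and both use uint64_t. The cast is only safe while the descriptor outlives the request that carries its address.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>

/* Hypothetical stand-in for Camera3RequestDescriptor. */
struct Descriptor {
	unsigned int frameNumber;
};

/* Hypothetical stand-in for libcamera::Request and its opaque cookie. */
class Request
{
public:
	explicit Request(uint64_t cookie) : cookie_(cookie) {}
	uint64_t cookie() const { return cookie_; }

private:
	uint64_t cookie_;
};

/* Store the descriptor address in the cookie when the request is created... */
std::unique_ptr<Request> createRequest(Descriptor *descriptor)
{
	return std::make_unique<Request>(reinterpret_cast<uint64_t>(descriptor));
}

/* ...and recover it on completion, whatever order requests complete in. */
Descriptor *descriptorFromRequest(const Request *request)
{
	return reinterpret_cast<Descriptor *>(request->cookie());
}
```

The lookup costs a single cast and needs no lock on descriptorsMutex_. The out-of-order error path that the patch removes therefore has nothing left to detect.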

Patch

diff --git a/src/android/camera_device.cpp b/src/android/camera_device.cpp
index 3ded0f7e..53ebe0ea 100644
--- a/src/android/camera_device.cpp
+++ b/src/android/camera_device.cpp
@@ -1027,29 +1027,8 @@  int CameraDevice::processCaptureRequest(camera3_capture_request_t *camera3Reques
 
 void CameraDevice::requestComplete(Request *request)
 {
-	Camera3RequestDescriptor *descriptor;
-	{
-		MutexLocker descriptorsLock(descriptorsMutex_);
-		ASSERT(!descriptors_.empty());
-		descriptor = descriptors_.front().get();
-	}
-
-	if (descriptor->request_->cookie() != request->cookie()) {
-		/*
-		 * \todo Clarify if the Camera has to be closed on
-		 * ERROR_DEVICE.
-		 */
-		LOG(HAL, Error)
-			<< "Out-of-order completion for request "
-			<< utils::hex(request->cookie());
-
-		MutexLocker descriptorsLock(descriptorsMutex_);
-		descriptors_.pop();
-
-		notifyError(0, nullptr, CAMERA3_MSG_ERROR_DEVICE);
-
-		return;
-	}
+	Camera3RequestDescriptor *descriptor =
+		reinterpret_cast<Camera3RequestDescriptor *>(request->cookie());
 
 	/*
 	 * Prepare the capture result for the Android camera stack.
@@ -1124,9 +1103,13 @@  void CameraDevice::requestComplete(Request *request)
 	}
 
 	/* Handle post-processing. */
+	MutexLocker locker(descriptor->streamsProcessMutex_);
+
 	/*
-	 * \todo Protect the loop below with streamProcessMutex_ when post
-	 * processor runs asynchronously.
+	 * Queue all the post-processing stream requests at once. The completion
+	 * slot streamProcessingComplete() can only run once we are out of
+	 * this critical section, which lets us handle synchronous errors
+	 * here directly.
 	 */
 	auto iter = descriptor->pendingStreamsToProcess_.begin();
 	while (iter != descriptor->pendingStreamsToProcess_.end()) {
@@ -1158,8 +1141,10 @@  void CameraDevice::requestComplete(Request *request)
 		}
 	}
 
-	if (descriptor->pendingStreamsToProcess_.empty())
+	if (descriptor->pendingStreamsToProcess_.empty()) {
+		locker.unlock();
 		completeDescriptor(descriptor);
+	}
 }
 
 void CameraDevice::completeDescriptor(Camera3RequestDescriptor *descriptor)
@@ -1245,6 +1230,13 @@  void CameraDevice::streamProcessingComplete(Camera3RequestDescriptor::StreamBuff
 	MutexLocker locker(request->streamsProcessMutex_);
 
 	request->pendingStreamsToProcess_.erase(streamBuffer->stream);
+
+	if (!request->pendingStreamsToProcess_.empty())
+		return;
+
+	locker.unlock();
+
+	completeDescriptor(streamBuffer->request);
 }
 
 std::string CameraDevice::logPrefix() const
diff --git a/src/android/camera_stream.cpp b/src/android/camera_stream.cpp
index fed99022..9023c13c 100644
--- a/src/android/camera_stream.cpp
+++ b/src/android/camera_stream.cpp
@@ -99,6 +99,7 @@  int CameraStream::configure()
 		if (ret)
 			return ret;
 
+		worker_ = std::make_unique<PostProcessorWorker>(postProcessor_.get());
 		postProcessor_->processComplete.connect(
 			this, [&](Camera3RequestDescriptor::StreamBuffer *streamBuffer,
 				  PostProcessor::Status status) {
@@ -112,6 +113,8 @@  int CameraStream::configure()
 				cameraDevice_->streamProcessingComplete(streamBuffer,
 									bufferStatus);
 			});
+
+		worker_->start();
 	}
 
 	if (type_ == Type::Internal) {
@@ -178,10 +181,6 @@  int CameraStream::process(Camera3RequestDescriptor::StreamBuffer *streamBuffer)
 		streamBuffer->fence = -1;
 	}
 
-	/*
-	 * \todo Buffer mapping and processing should be moved to a
-	 * separate thread.
-	 */
 	const StreamConfiguration &output = configuration();
 	streamBuffer->dstBuffer = std::make_unique<CameraBuffer>(
 		*streamBuffer->camera3Buffer, output.pixelFormat, output.size,
@@ -191,11 +190,19 @@  int CameraStream::process(Camera3RequestDescriptor::StreamBuffer *streamBuffer)
 		return -EINVAL;
 	}
 
-	postProcessor_->process(streamBuffer);
+	worker_->queueRequest(streamBuffer);
 
 	return 0;
 }
 
+void CameraStream::flush()
+{
+	if (!postProcessor_)
+		return;
+
+	worker_->flush();
+}
+
 FrameBuffer *CameraStream::getBuffer()
 {
 	if (!allocator_)
@@ -223,3 +230,87 @@  void CameraStream::putBuffer(FrameBuffer *buffer)
 
 	buffers_.push_back(buffer);
 }
+
+CameraStream::PostProcessorWorker::PostProcessorWorker(PostProcessor *postProcessor)
+	: postProcessor_(postProcessor)
+{
+}
+
+CameraStream::PostProcessorWorker::~PostProcessorWorker()
+{
+	{
+		libcamera::MutexLocker lock(mutex_);
+		state_ = State::Stopped;
+	}
+
+	cv_.notify_one();
+	wait();
+}
+
+void CameraStream::PostProcessorWorker::start()
+{
+	{
+		libcamera::MutexLocker lock(mutex_);
+		ASSERT(state_ != State::Running);
+		state_ = State::Running;
+	}
+
+	Thread::start();
+}
+
+void CameraStream::PostProcessorWorker::queueRequest(Camera3RequestDescriptor::StreamBuffer *dest)
+{
+	{
+		MutexLocker lock(mutex_);
+		ASSERT(state_ == State::Running);
+		requests_.push(dest);
+	}
+
+	cv_.notify_one();
+}
+
+void CameraStream::PostProcessorWorker::run()
+{
+	MutexLocker locker(mutex_);
+
+	while (1) {
+		cv_.wait(locker, [&] {
+			return state_ != State::Running || !requests_.empty();
+		});
+
+		if (state_ != State::Running)
+			break;
+
+		Camera3RequestDescriptor::StreamBuffer *streamBuffer = requests_.front();
+		requests_.pop();
+		locker.unlock();
+
+		postProcessor_->process(streamBuffer);
+
+		locker.lock();
+	}
+
+	if (state_ == State::Flushing) {
+		std::queue<Camera3RequestDescriptor::StreamBuffer *> requests =
+			std::move(requests_);
+		locker.unlock();
+
+		while (!requests.empty()) {
+			postProcessor_->processComplete.emit(
+				requests.front(), PostProcessor::Status::Error);
+			requests.pop();
+		}
+
+		locker.lock();
+		state_ = State::Stopped;
+	}
+}
+
+void CameraStream::PostProcessorWorker::flush()
+{
+	libcamera::MutexLocker lock(mutex_);
+	state_ = State::Flushing;
+	lock.unlock();
+
+	cv_.notify_one();
+}
diff --git a/src/android/camera_stream.h b/src/android/camera_stream.h
index e74a9a3b..1588938a 100644
--- a/src/android/camera_stream.h
+++ b/src/android/camera_stream.h
@@ -7,12 +7,16 @@ 
 #ifndef __ANDROID_CAMERA_STREAM_H__
 #define __ANDROID_CAMERA_STREAM_H__
 
+#include <condition_variable>
 #include <memory>
 #include <mutex>
+#include <queue>
 #include <vector>
 
 #include <hardware/camera3.h>
 
+#include <libcamera/base/thread.h>
+
 #include <libcamera/camera.h>
 #include <libcamera/framebuffer.h>
 #include <libcamera/framebuffer_allocator.h>
@@ -20,6 +24,7 @@ 
 #include <libcamera/pixel_format.h>
 
 #include "camera_request.h"
+#include "post_processor.h"
 
 class CameraDevice;
 class PostProcessor;
@@ -124,8 +129,38 @@  public:
 	int process(Camera3RequestDescriptor::StreamBuffer *streamBuffer);
 	libcamera::FrameBuffer *getBuffer();
 	void putBuffer(libcamera::FrameBuffer *buffer);
+	void flush();
 
 private:
+	class PostProcessorWorker : public libcamera::Thread
+	{
+	public:
+		enum class State {
+			Stopped,
+			Running,
+			Flushing,
+		};
+
+		PostProcessorWorker(PostProcessor *postProcessor);
+		~PostProcessorWorker();
+
+		void start();
+		void queueRequest(Camera3RequestDescriptor::StreamBuffer *request);
+		void flush();
+
+	protected:
+		void run() override;
+
+	private:
+		PostProcessor *postProcessor_;
+
+		libcamera::Mutex mutex_;
+		std::condition_variable cv_;
+
+		std::queue<Camera3RequestDescriptor::StreamBuffer *> requests_;
+		State state_ = State::Stopped;
+	};
+
 	int waitFence(int fence);
 
 	CameraDevice *const cameraDevice_;
@@ -142,6 +177,8 @@  private:
 	 */
 	std::unique_ptr<std::mutex> mutex_;
 	std::unique_ptr<PostProcessor> postProcessor_;
+
+	std::unique_ptr<PostProcessorWorker> worker_;
 };
 
 #endif /* __ANDROID_CAMERA_STREAM__ */
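The locking contract between requestComplete() and streamProcessingComplete() is what ensures a descriptor is completed exactly once. This holds whether its last stream is dropped synchronously (an error before queuing) or finishes asynchronously on the worker thread. A minimal sketch of that contract follows. It assumes simplified free-function stand-ins for the CameraDevice methods of the same names, and a hypothetical `Descriptor` struct whose std::set<int> of stream IDs replaces pendingStreamsToProcess_. It also uses one std::thread per queued stream in place of PostProcessorWorker, an atomic counter in place of the real completeDescriptor(), and a hypothetical `syncFailures` set to simulate process() failing synchronously. The descriptor is recovered from a 64-bit cookie with reinterpret_cast, as the patch now does with request->cookie().

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <mutex>
#include <set>
#include <thread>
#include <vector>

// Hypothetical stand-in for Camera3RequestDescriptor.
struct Descriptor {
	std::mutex streamsProcessMutex;
	std::set<int> pendingStreams; // stream IDs still being post-processed
	std::atomic<int> completions{0}; // times completeDescriptor() ran
};

void completeDescriptor(Descriptor *d)
{
	d->completions++;
}

// Runs on a worker thread when one stream finishes post-processing.
void streamProcessingComplete(Descriptor *d, int stream)
{
	std::unique_lock<std::mutex> locker(d->streamsProcessMutex);
	d->pendingStreams.erase(stream);
	if (!d->pendingStreams.empty())
		return;
	locker.unlock(); // complete without holding the per-descriptor lock
	completeDescriptor(d);
}

// Looks the descriptor up from the cookie instead of a queue front, then
// queues every stream while holding the lock, so no completion can run
// until the whole loop (including synchronous errors) is done.
void requestComplete(uint64_t cookie, const std::set<int> &syncFailures,
		     std::vector<std::thread> &workers)
{
	auto *d = reinterpret_cast<Descriptor *>(cookie);
	std::unique_lock<std::mutex> locker(d->streamsProcessMutex);

	auto iter = d->pendingStreams.begin();
	while (iter != d->pendingStreams.end()) {
		if (syncFailures.count(*iter)) {
			// Synchronous error: nothing was queued, drop it here.
			iter = d->pendingStreams.erase(iter);
			continue;
		}
		// Asynchronous path: the thread blocks on the mutex until we leave.
		workers.emplace_back(streamProcessingComplete, d, *iter);
		++iter;
	}

	if (d->pendingStreams.empty()) {
		locker.unlock();
		completeDescriptor(d);
	}
}
```

Every erase and every emptiness check happens under streamsProcessMutex, so only the caller that removes the last pending stream observes an empty set, and exactly one path calls completeDescriptor(). Releasing the lock before that call mirrors the patch's `locker.unlock()`, which keeps completeDescriptor() from running with the per-descriptor mutex held.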