[libcamera-devel,v5,3/4] android: post_processor: Make post processing async

Message ID 20211020104212.121743-4-umang.jain@ideasonboard.com
State Superseded
Series
  • Async Post Processor

Commit Message

Umang Jain Oct. 20, 2021, 10:42 a.m. UTC
Introduce a dedicated worker class derived from libcamera::Thread.
The worker class maintains a queue for post-processing requests
and waits for a post-processing request to become available.
It will process them as per FIFO before de-queuing it from the
queue.
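
The queue-and-wait behaviour described above can be sketched in
standalone C++ roughly as follows. This is only an illustration under
stated assumptions, not the code from this patch: Worker, Job and
queueJob() are hypothetical names, and std::thread stands in for
libcamera::Thread.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

/* Hypothetical stand-in for one post-processing request. */
using Job = std::function<void()>;

class Worker
{
public:
	/* thread_ is declared last, so all other members exist before run() starts. */
	Worker() : thread_([this] { run(); }) {}

	~Worker()
	{
		{
			std::lock_guard<std::mutex> lock(mutex_);
			running_ = false;
		}
		cv_.notify_one();
		thread_.join();
	}

	void queueJob(Job job)
	{
		{
			std::lock_guard<std::mutex> lock(mutex_);
			assert(running_);
			jobs_.push(std::move(job));
		}
		cv_.notify_one();
	}

private:
	void run()
	{
		std::unique_lock<std::mutex> lock(mutex_);

		while (true) {
			/* Sleep until there is work to do or a stop was requested. */
			cv_.wait(lock, [this] { return !running_ || !jobs_.empty(); });
			if (!running_)
				break;

			/* Take the oldest job (FIFO) and run it without the lock held. */
			Job job = std::move(jobs_.front());
			jobs_.pop();
			lock.unlock();
			job();
			lock.lock();
		}
	}

	std::mutex mutex_;
	std::condition_variable cv_;
	std::queue<Job> jobs_;
	bool running_ = true;
	std::thread thread_;
};
```

In this sketch, jobs still queued when the worker is destroyed are
dropped silently; the patch instead has a Flushing state that reports
them as errors.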

To get access to the source and destination buffers in the worker
thread, we also need to save a pointer to them in the
Camera3RequestDescriptor.

This patch also implements a flush() for the PostProcessorWorker
class which is responsible to purge post-processing requests
queued up while a camera is stopping/flushing. It is hooked with
CameraStream::flush(), which isn't used currently but will be
used when we handle flush/stop scenarios in greater detail
subsequently (in a different patchset).

The libcamera request completion handler CameraDevice::requestComplete()
assumes that the request that has just completed is at the front of the
queue. Now that the post-processor runs asynchronously, this isn't true
anymore, a request being post-processed will stay in the queue and a new
libcamera request may complete. Remove that assumption, and use the
request cookie to obtain the Camera3RequestDescriptor.
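
The cookie-based lookup can be illustrated with a small standalone
sketch. Descriptor, FakeRequest and descriptorFor() are hypothetical
stand-ins (not libcamera types), and the sketch assumes the cookie is an
integer wide enough to hold a pointer:

```cpp
#include <cassert>
#include <cstdint>
#include <string>

/* Hypothetical stand-in for Camera3RequestDescriptor. */
struct Descriptor {
	std::string name;
};

/* Hypothetical stand-in for a request carrying an opaque integer cookie. */
class FakeRequest
{
public:
	explicit FakeRequest(Descriptor *descriptor)
		: cookie_(reinterpret_cast<std::uintptr_t>(descriptor))
	{
		assert(descriptor != nullptr);
	}

	std::uint64_t cookie() const { return cookie_; }

private:
	std::uint64_t cookie_;
};

/*
 * Recover the descriptor from the cookie alone, without assuming
 * anything about the order in which requests complete.
 */
Descriptor *descriptorFor(const FakeRequest &request)
{
	return reinterpret_cast<Descriptor *>(
		static_cast<std::uintptr_t>(request.cookie()));
}
```

The lookup is only safe while the descriptor outlives the request that
carries its address.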

Signed-off-by: Umang Jain <umang.jain@ideasonboard.com>
Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
---
 src/android/camera_device.cpp |  46 +++++----------
 src/android/camera_request.h  |   4 ++
 src/android/camera_stream.cpp | 107 +++++++++++++++++++++++++++++++---
 src/android/camera_stream.h   |  38 +++++++++++-
 4 files changed, 156 insertions(+), 39 deletions(-)

Comments

Laurent Pinchart Oct. 21, 2021, 1:33 a.m. UTC | #1
Hi Umang,

Thank you for the patch.

On Wed, Oct 20, 2021 at 04:12:11PM +0530, Umang Jain wrote:
> Introduce a dedicated worker class derived from libcamera::Thread.
> The worker class maintains a queue for post-processing requests
> and waits for a post-processing request to become available.
> It will process them as per FIFO before de-queuing it from the
> queue.
> 
> To get access to the source and destination buffers in the worker
> thread, we also need to save a pointer to them in the

s/a pointer/pointers/

> Camera3RequestDescriptor.
> 
> This patch also implements a flush() for the PostProcessorWorker
> class which is responsible to purge post-processing requests
> queued up while a camera is stopping/flushing. It is hooked with
> CameraStream::flush(), which isn't used currently but will be
> used when we handle flush/stop scenarios in greater detail
> subsequently (in a different patchset).
> 
> The libcamera request completion handler CameraDevice::requestComplete()
> assumes that the request that has just completed is at the front of the
> queue. Now that the post-processor runs asynchronously, this isn't true
> anymore, a request being post-processed will stay in the queue and a new
> libcamera request may complete. Remove that assumption, and use the
> request cookie to obtain the Camera3RequestDescriptor.
> 
> Signed-off-by: Umang Jain <umang.jain@ideasonboard.com>
> Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
> ---
>  src/android/camera_device.cpp |  46 +++++----------
>  src/android/camera_request.h  |   4 ++
>  src/android/camera_stream.cpp | 107 +++++++++++++++++++++++++++++++---
>  src/android/camera_stream.h   |  38 +++++++++++-
>  4 files changed, 156 insertions(+), 39 deletions(-)
> 
> diff --git a/src/android/camera_device.cpp b/src/android/camera_device.cpp
> index 541c2c81..b14416ce 100644
> --- a/src/android/camera_device.cpp
> +++ b/src/android/camera_device.cpp
> @@ -1020,29 +1020,8 @@ int CameraDevice::processCaptureRequest(camera3_capture_request_t *camera3Reques
>  
>  void CameraDevice::requestComplete(Request *request)
>  {
> -	Camera3RequestDescriptor *descriptor;
> -	{
> -		MutexLocker descriptorsLock(descriptorsMutex_);
> -		ASSERT(!descriptors_.empty());
> -		descriptor = descriptors_.front().get();
> -	}
> -
> -	if (descriptor->request_->cookie() != request->cookie()) {
> -		/*
> -		 * \todo Clarify if the Camera has to be closed on
> -		 * ERROR_DEVICE.
> -		 */
> -		LOG(HAL, Error)
> -			<< "Out-of-order completion for request "
> -			<< utils::hex(request->cookie());
> -
> -		MutexLocker descriptorsLock(descriptorsMutex_);
> -		descriptors_.pop();
> -
> -		notifyError(0, nullptr, CAMERA3_MSG_ERROR_DEVICE);
> -
> -		return;
> -	}
> +	Camera3RequestDescriptor *descriptor =
> +		reinterpret_cast<Camera3RequestDescriptor *>(request->cookie());
>  
>  	/*
>  	 * Prepare the capture result for the Android camera stack.
> @@ -1120,12 +1099,7 @@ void CameraDevice::requestComplete(Request *request)
>  	bool needsPostProcessing = false;
>  	Camera3RequestDescriptor::Status processingStatus =
>  		Camera3RequestDescriptor::Status::Pending;
> -	/*
> -	 * \todo Apply streamsProcessMutex_ when post-processing is adapted to run
> -	 * asynchronously. If we introduce the streamsProcessMutex_ right away, the
> -	 * lock will be held forever since it is synchronous at this point
> -	 * (see streamProcessingComplete).
> -	 */
> +
>  	for (auto &buffer : descriptor->buffers_) {
>  		CameraStream *stream = buffer.stream;
>  
> @@ -1145,7 +1119,13 @@ void CameraDevice::requestComplete(Request *request)
>  			buffer.internalBuffer = src;
>  
>  		needsPostProcessing = true;
> -		int ret = stream->process(*src, buffer, descriptor);
> +
> +		int ret;
> +		{
> +			MutexLocker locker(descriptor->streamsProcessMutex_);

It may be possible (see my review of 2/4) to only lock access to the
data structures and not cover the process() call. Let's see how it turns
out.
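
As a rough illustration of locking only the data structures and not the
processing call, here is a standalone sketch. Tracker, submit() and
complete() are hypothetical names, and the std::function argument stands
in for a call such as process():

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <mutex>

enum class Status { Pending, Success, Error };

class Tracker
{
public:
	/* process is a hypothetical stand-in for a potentially slow call. */
	int submit(int id, const std::function<int(int)> &process)
	{
		assert(process);

		{
			/* Lock only while touching the shared map. */
			std::lock_guard<std::mutex> lock(mutex_);
			statuses_[id] = Status::Pending;
		}

		/*
		 * The mutex is not held here, so process() may run for a
		 * long time or call back into complete() without deadlocking.
		 */
		int ret = process(id);
		if (ret)
			complete(id, Status::Error);

		return ret;
	}

	void complete(int id, Status status)
	{
		std::lock_guard<std::mutex> lock(mutex_);
		statuses_[id] = status;
	}

	Status status(int id)
	{
		std::lock_guard<std::mutex> lock(mutex_);
		return statuses_.at(id);
	}

private:
	std::mutex mutex_;
	std::map<int, Status> statuses_;
};
```

If submit() held the mutex across process(), a completion callback
invoked synchronously from process() would block on the same
non-recursive mutex.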

> +			ret = stream->process(*src, buffer, descriptor);
> +		}
> +
>  		if (ret) {
>  			setBufferStatus(stream, buffer, descriptor,
>  					Camera3RequestDescriptor::Status::Error);
> @@ -1156,6 +1136,12 @@ void CameraDevice::requestComplete(Request *request)
>  	if (needsPostProcessing) {
>  		if (processingStatus == Camera3RequestDescriptor::Status::Error) {
>  			descriptor->status_ = processingStatus;
> +			/*
> +			 * \todo This is problematic when some streams are
> +			 * queued successfully, but some fail to get queued.
> +			 * We might end up with use-after-free situation for
> +			 * descriptor in streamProcessingComplete().
> +			 */

Hopefully this will be fixed in v6 of 2/4 :-)

>  			sendCaptureResults();
>  		}
>  
> diff --git a/src/android/camera_request.h b/src/android/camera_request.h
> index 3a2774e0..85274414 100644
> --- a/src/android/camera_request.h
> +++ b/src/android/camera_request.h
> @@ -18,6 +18,7 @@
>  
>  #include <hardware/camera3.h>
>  
> +#include "camera_buffer.h"
>  #include "camera_metadata.h"
>  #include "camera_worker.h"
>  
> @@ -39,6 +40,9 @@ public:
>  		int fence;
>  		Status status;
>  		libcamera::FrameBuffer *internalBuffer = nullptr;
> +		std::unique_ptr<CameraBuffer> destBuffer;
> +		const libcamera::FrameBuffer *srcBuffer;
> +		Camera3RequestDescriptor *request = nullptr;
>  	};
>  
>  	Camera3RequestDescriptor(libcamera::Camera *camera,
> diff --git a/src/android/camera_stream.cpp b/src/android/camera_stream.cpp
> index b3cb06a2..a29ce33b 100644
> --- a/src/android/camera_stream.cpp
> +++ b/src/android/camera_stream.cpp
> @@ -99,6 +99,7 @@ int CameraStream::configure()
>  		if (ret)
>  			return ret;
>  
> +		worker_ = std::make_unique<PostProcessorWorker>(postProcessor_.get());
>  		postProcessor_->processComplete.connect(
>  			this, [&](Camera3RequestDescriptor *request, PostProcessor::Status status) {
>  				Camera3RequestDescriptor::Status bufferStatus =
> @@ -110,6 +111,7 @@ int CameraStream::configure()
>  				cameraDevice_->streamProcessingComplete(this, request,
>  									bufferStatus);
>  			});
> +		worker_->start();
>  	}
>  
>  	if (type_ == Type::Internal) {
> @@ -179,23 +181,31 @@ int CameraStream::process(const FrameBuffer &source,
>  	if (!postProcessor_)
>  		return 0;
>  
> -	/*
> -	 * \todo Buffer mapping and processing should be moved to a
> -	 * separate thread.
> -	 */
>  	const StreamConfiguration &output = configuration();
> -	CameraBuffer destBuffer(*dest.camera3Buffer, output.pixelFormat,
> -				output.size, PROT_READ | PROT_WRITE);
> -	if (!destBuffer.isValid()) {
> +	dest.destBuffer = std::make_unique<CameraBuffer>(
> +		*dest.camera3Buffer, output.pixelFormat, output.size, PROT_READ | PROT_WRITE);

	dest.destBuffer = std::make_unique<CameraBuffer>(
		*dest.camera3Buffer, output.pixelFormat, output.size,
		PROT_READ | PROT_WRITE);

> +	if (!dest.destBuffer->isValid()) {
>  		LOG(HAL, Error) << "Failed to create destination buffer";
>  		return -EINVAL;
>  	}
>  
> -	postProcessor_->process(source, &destBuffer, request);
> +	dest.srcBuffer = &source;
> +	dest.request = request;
> +
> +	/* Push the postProcessor request to the worker queue. */
> +	worker_->queueRequest(&dest);
>  
>  	return 0;
>  }
>  
> +void CameraStream::flush()
> +{
> +	if (!postProcessor_)
> +		return;
> +
> +	worker_->flush();
> +}
> +
>  FrameBuffer *CameraStream::getBuffer()
>  {
>  	if (!allocator_)
> @@ -223,3 +233,84 @@ void CameraStream::putBuffer(FrameBuffer *buffer)
>  
>  	buffers_.push_back(buffer);
>  }
> +
> +CameraStream::PostProcessorWorker::PostProcessorWorker(PostProcessor *postProcessor)
> +	: postProcessor_(postProcessor)
> +{
> +}
> +
> +CameraStream::PostProcessorWorker::~PostProcessorWorker()
> +{
> +	{
> +		libcamera::MutexLocker lock(mutex_);
> +		state_ = State::Stopped;
> +	}
> +
> +	cv_.notify_one();
> +	wait();
> +}
> +
> +void CameraStream::PostProcessorWorker::start()
> +{
> +	{
> +		libcamera::MutexLocker lock(mutex_);

Could you add a

		ASSERT(state_ != State::Running);

here ? I'm worried that a future refactoring will call
CameraStream::configure() multiple times, and that would result in the
worker being started multiple times too.

> +		state_ = State::Running;
> +	}
> +
> +	Thread::start();
> +}
> +
> +void CameraStream::PostProcessorWorker::queueRequest(Camera3RequestDescriptor::StreamBuffer *dest)
> +{
> +	{
> +		MutexLocker lock(mutex_);
> +		ASSERT(state_ == State::Running);
> +		requests_.push(dest);
> +	}
> +
> +	cv_.notify_one();
> +}
> +
> +void CameraStream::PostProcessorWorker::run()
> +{
> +	MutexLocker locker(mutex_);
> +
> +	while (1) {
> +		cv_.wait(locker, [&] {
> +			return state_ != State::Running || !requests_.empty();
> +		});
> +
> +		if (state_ != State::Running)
> +			break;
> +
> +		Camera3RequestDescriptor::StreamBuffer *stream = requests_.front();
> +		requests_.pop();
> +		locker.unlock();
> +
> +		postProcessor_->process(*stream->srcBuffer, stream->destBuffer.get(),
> +					stream->request);
> +
> +		locker.lock();
> +	}
> +
> +	if (state_ == State::Flushing) {
> +		while (!requests_.empty()) {
> +			postProcessor_->processComplete.emit(
> +				requests_.front()->request,
> +				PostProcessor::Status::Error);

I'm also a tiny bit concerned here that we're emitting the signal with
the lock held, while for requests that complete normally we don't. The
behaviour isn't consistent and could cause issues in the CameraStream or
CameraDevice classes. I think it's harmless for now, but maybe we could
drop the lock around the emit() call ? One option to avoid dropping and
taking the lock in every iteration would be

		std::queue<Camera3RequestDescriptor::StreamBuffer *> requests = std::move(requests_);

		locker.unlock();

		while (!requests.empty()) {
			postProcessor_->processComplete.emit(
				requests.front()->request,
				PostProcessor::Status::Error);
			requests.pop();
		}

		locker.lock();

And now that I think about it, given that we now have a StreamBuffer
object, wouldn't it be simpler to pass the StreamBuffer instead of the
Camera3RequestDescriptor to the signal ?
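
The drain-outside-the-lock idea can also be tried in isolation. The
sketch below is hypothetical (PendingQueue and drain() are made-up
names, and int stands in for a queued StreamBuffer pointer). It uses
swap() rather than std::move(), on the assumption that the shared queue
should be guaranteed empty afterwards; a moved-from std::queue is only
guaranteed to be in a valid but unspecified state.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <queue>

/* Hypothetical container; int stands in for a queued StreamBuffer pointer. */
class PendingQueue
{
public:
	void push(int item)
	{
		std::lock_guard<std::mutex> lock(mutex_);
		items_.push(item);
	}

	bool empty()
	{
		std::lock_guard<std::mutex> lock(mutex_);
		return items_.empty();
	}

	/* Hand every pending item to notify() without holding the mutex. */
	void drain(const std::function<void(int)> &notify)
	{
		assert(notify);

		std::queue<int> local;
		{
			/* Take the lock once, just long enough to steal the items. */
			std::lock_guard<std::mutex> lock(mutex_);
			items_.swap(local);
		}

		/* The callbacks run unlocked and in FIFO order. */
		while (!local.empty()) {
			notify(local.front());
			local.pop();
		}
	}

private:
	std::mutex mutex_;
	std::queue<int> items_;
};
```

Because the callbacks run unlocked, a handler may call back into the
queue (for example empty()) without deadlocking.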

> +			requests_.pop();
> +		}
> +
> +		state_ = State::Stopped;
> +		locker.unlock();

You can drop the unlock() call here.

> +	}
> +}
> +
> +void CameraStream::PostProcessorWorker::flush()
> +{
> +	libcamera::MutexLocker lock(mutex_);
> +	state_ = State::Flushing;
> +	lock.unlock();
> +
> +	cv_.notify_one();
> +}
> diff --git a/src/android/camera_stream.h b/src/android/camera_stream.h
> index f242336e..64e32c77 100644
> --- a/src/android/camera_stream.h
> +++ b/src/android/camera_stream.h
> @@ -7,12 +7,16 @@
>  #ifndef __ANDROID_CAMERA_STREAM_H__
>  #define __ANDROID_CAMERA_STREAM_H__
>  
> +#include <condition_variable>
>  #include <memory>
>  #include <mutex>
> +#include <queue>
>  #include <vector>
>  
>  #include <hardware/camera3.h>
>  
> +#include <libcamera/base/thread.h>
> +
>  #include <libcamera/camera.h>
>  #include <libcamera/framebuffer.h>
>  #include <libcamera/framebuffer_allocator.h>
> @@ -20,9 +24,9 @@
>  #include <libcamera/pixel_format.h>
>  
>  #include "camera_request.h"
> +#include "post_processor.h"
>  
>  class CameraDevice;
> -class PostProcessor;
>  
>  class CameraStream
>  {
> @@ -126,8 +130,38 @@ public:
>  		    Camera3RequestDescriptor *request);
>  	libcamera::FrameBuffer *getBuffer();
>  	void putBuffer(libcamera::FrameBuffer *buffer);
> +	void flush();
>  
>  private:
> +	class PostProcessorWorker : public libcamera::Thread
> +	{
> +	public:
> +		enum class State {
> +			Stopped,
> +			Running,
> +			Flushing,
> +		};
> +
> +		PostProcessorWorker(PostProcessor *postProcessor);
> +		~PostProcessorWorker();
> +
> +		void start();
> +		void queueRequest(Camera3RequestDescriptor::StreamBuffer *request);
> +		void flush();
> +
> +	protected:
> +		void run() override;
> +
> +	private:
> +		PostProcessor *postProcessor_;
> +
> +		libcamera::Mutex mutex_;
> +		std::condition_variable cv_;
> +
> +		std::queue<Camera3RequestDescriptor::StreamBuffer *> requests_;
> +		State state_;
> +	};
> +
>  	int waitFence(int fence);
>  
>  	CameraDevice *const cameraDevice_;
> @@ -144,6 +178,8 @@ private:
>  	 */
>  	std::unique_ptr<std::mutex> mutex_;
>  	std::unique_ptr<PostProcessor> postProcessor_;
> +
> +	std::unique_ptr<PostProcessorWorker> worker_;
>  };
>  
>  #endif /* __ANDROID_CAMERA_STREAM__ */
Umang Jain Oct. 21, 2021, 5:53 a.m. UTC | #2
Hi Laurent,

On 10/21/21 7:03 AM, Laurent Pinchart wrote:
> Hi Umang,
>
> Thank you for the patch.
>
> On Wed, Oct 20, 2021 at 04:12:11PM +0530, Umang Jain wrote:
>> Introduce a dedicated worker class derived from libcamera::Thread.
>> The worker class maintains a queue for post-processing requests
>> and waits for a post-processing request to become available.
>> It will process them as per FIFO before de-queuing it from the
>> queue.
>>
>> To get access to the source and destination buffers in the worker
>> thread, we also need to save a pointer to them in the
> s/a pointer/pointers/
>
>> Camera3RequestDescriptor.
>>
>> This patch also implements a flush() for the PostProcessorWorker
>> class which is responsible to purge post-processing requests
>> queued up while a camera is stopping/flushing. It is hooked with
>> CameraStream::flush(), which isn't used currently but will be
>> used when we handle flush/stop scenarios in greater detail
>> subsequently (in a different patchset).
>>
>> The libcamera request completion handler CameraDevice::requestComplete()
>> assumes that the request that has just completed is at the front of the
>> queue. Now that the post-processor runs asynchronously, this isn't true
>> anymore, a request being post-processed will stay in the queue and a new
>> libcamera request may complete. Remove that assumption, and use the
>> request cookie to obtain the Camera3RequestDescriptor.
>>
>> Signed-off-by: Umang Jain <umang.jain@ideasonboard.com>
>> Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
>> ---
>>   src/android/camera_device.cpp |  46 +++++----------
>>   src/android/camera_request.h  |   4 ++
>>   src/android/camera_stream.cpp | 107 +++++++++++++++++++++++++++++++---
>>   src/android/camera_stream.h   |  38 +++++++++++-
>>   4 files changed, 156 insertions(+), 39 deletions(-)
>>
>> diff --git a/src/android/camera_device.cpp b/src/android/camera_device.cpp
>> index 541c2c81..b14416ce 100644
>> --- a/src/android/camera_device.cpp
>> +++ b/src/android/camera_device.cpp
>> @@ -1020,29 +1020,8 @@ int CameraDevice::processCaptureRequest(camera3_capture_request_t *camera3Reques
>>   
>>   void CameraDevice::requestComplete(Request *request)
>>   {
>> -	Camera3RequestDescriptor *descriptor;
>> -	{
>> -		MutexLocker descriptorsLock(descriptorsMutex_);
>> -		ASSERT(!descriptors_.empty());
>> -		descriptor = descriptors_.front().get();
>> -	}
>> -
>> -	if (descriptor->request_->cookie() != request->cookie()) {
>> -		/*
>> -		 * \todo Clarify if the Camera has to be closed on
>> -		 * ERROR_DEVICE.
>> -		 */
>> -		LOG(HAL, Error)
>> -			<< "Out-of-order completion for request "
>> -			<< utils::hex(request->cookie());
>> -
>> -		MutexLocker descriptorsLock(descriptorsMutex_);
>> -		descriptors_.pop();
>> -
>> -		notifyError(0, nullptr, CAMERA3_MSG_ERROR_DEVICE);
>> -
>> -		return;
>> -	}
>> +	Camera3RequestDescriptor *descriptor =
>> +		reinterpret_cast<Camera3RequestDescriptor *>(request->cookie());
>>   
>>   	/*
>>   	 * Prepare the capture result for the Android camera stack.
>> @@ -1120,12 +1099,7 @@ void CameraDevice::requestComplete(Request *request)
>>   	bool needsPostProcessing = false;
>>   	Camera3RequestDescriptor::Status processingStatus =
>>   		Camera3RequestDescriptor::Status::Pending;
>> -	/*
>> -	 * \todo Apply streamsProcessMutex_ when post-processing is adapted to run
>> -	 * asynchronously. If we introduce the streamsProcessMutex_ right away, the
>> -	 * lock will be held forever since it is synchronous at this point
>> -	 * (see streamProcessingComplete).
>> -	 */
>> +
>>   	for (auto &buffer : descriptor->buffers_) {
>>   		CameraStream *stream = buffer.stream;
>>   
>> @@ -1145,7 +1119,13 @@ void CameraDevice::requestComplete(Request *request)
>>   			buffer.internalBuffer = src;
>>   
>>   		needsPostProcessing = true;
>> -		int ret = stream->process(*src, buffer, descriptor);
>> +
>> +		int ret;
>> +		{
>> +			MutexLocker locker(descriptor->streamsProcessMutex_);
> It may be possible (see my review of 2/4) to only lock access to the
> data structures and not cover the process() call. Let's see how it turns
> out.


Yes, I need to draft the code first; only then will I be able to confirm.
My deep-code visualization skills are kinda limited.

>
>> +			ret = stream->process(*src, buffer, descriptor);
>> +		}
>> +
>>   		if (ret) {
>>   			setBufferStatus(stream, buffer, descriptor,
>>   					Camera3RequestDescriptor::Status::Error);
>> @@ -1156,6 +1136,12 @@ void CameraDevice::requestComplete(Request *request)
>>   	if (needsPostProcessing) {
>>   		if (processingStatus == Camera3RequestDescriptor::Status::Error) {
>>   			descriptor->status_ = processingStatus;
>> +			/*
>> +			 * \todo This is problematic when some streams are
>> +			 * queued successfully, but some fail to get queued.
>> +			 * We might end up with use-after-free situation for
>> +			 * descriptor in streamProcessingComplete().
>> +			 */
> Hopefully this will be fixed in v6 of 2/4 :-)
>
>>   			sendCaptureResults();
>>   		}
>>   
>> diff --git a/src/android/camera_request.h b/src/android/camera_request.h
>> index 3a2774e0..85274414 100644
>> --- a/src/android/camera_request.h
>> +++ b/src/android/camera_request.h
>> @@ -18,6 +18,7 @@
>>   
>>   #include <hardware/camera3.h>
>>   
>> +#include "camera_buffer.h"
>>   #include "camera_metadata.h"
>>   #include "camera_worker.h"
>>   
>> @@ -39,6 +40,9 @@ public:
>>   		int fence;
>>   		Status status;
>>   		libcamera::FrameBuffer *internalBuffer = nullptr;
>> +		std::unique_ptr<CameraBuffer> destBuffer;
>> +		const libcamera::FrameBuffer *srcBuffer;
>> +		Camera3RequestDescriptor *request = nullptr;
>>   	};
>>   
>>   	Camera3RequestDescriptor(libcamera::Camera *camera,
>> diff --git a/src/android/camera_stream.cpp b/src/android/camera_stream.cpp
>> index b3cb06a2..a29ce33b 100644
>> --- a/src/android/camera_stream.cpp
>> +++ b/src/android/camera_stream.cpp
>> @@ -99,6 +99,7 @@ int CameraStream::configure()
>>   		if (ret)
>>   			return ret;
>>   
>> +		worker_ = std::make_unique<PostProcessorWorker>(postProcessor_.get());
>>   		postProcessor_->processComplete.connect(
>>   			this, [&](Camera3RequestDescriptor *request, PostProcessor::Status status) {
>>   				Camera3RequestDescriptor::Status bufferStatus =
>> @@ -110,6 +111,7 @@ int CameraStream::configure()
>>   				cameraDevice_->streamProcessingComplete(this, request,
>>   									bufferStatus);
>>   			});
>> +		worker_->start();
>>   	}
>>   
>>   	if (type_ == Type::Internal) {
>> @@ -179,23 +181,31 @@ int CameraStream::process(const FrameBuffer &source,
>>   	if (!postProcessor_)
>>   		return 0;
>>   
>> -	/*
>> -	 * \todo Buffer mapping and processing should be moved to a
>> -	 * separate thread.
>> -	 */
>>   	const StreamConfiguration &output = configuration();
>> -	CameraBuffer destBuffer(*dest.camera3Buffer, output.pixelFormat,
>> -				output.size, PROT_READ | PROT_WRITE);
>> -	if (!destBuffer.isValid()) {
>> +	dest.destBuffer = std::make_unique<CameraBuffer>(
>> +		*dest.camera3Buffer, output.pixelFormat, output.size, PROT_READ | PROT_WRITE);
> 	dest.destBuffer = std::make_unique<CameraBuffer>(
> 		*dest.camera3Buffer, output.pixelFormat, output.size,
> 		PROT_READ | PROT_WRITE);
>
>> +	if (!dest.destBuffer->isValid()) {
>>   		LOG(HAL, Error) << "Failed to create destination buffer";
>>   		return -EINVAL;
>>   	}
>>   
>> -	postProcessor_->process(source, &destBuffer, request);
>> +	dest.srcBuffer = &source;
>> +	dest.request = request;
>> +
>> +	/* Push the postProcessor request to the worker queue. */
>> +	worker_->queueRequest(&dest);
>>   
>>   	return 0;
>>   }
>>   
>> +void CameraStream::flush()
>> +{
>> +	if (!postProcessor_)
>> +		return;
>> +
>> +	worker_->flush();
>> +}
>> +
>>   FrameBuffer *CameraStream::getBuffer()
>>   {
>>   	if (!allocator_)
>> @@ -223,3 +233,84 @@ void CameraStream::putBuffer(FrameBuffer *buffer)
>>   
>>   	buffers_.push_back(buffer);
>>   }
>> +
>> +CameraStream::PostProcessorWorker::PostProcessorWorker(PostProcessor *postProcessor)
>> +	: postProcessor_(postProcessor)
>> +{
>> +}
>> +
>> +CameraStream::PostProcessorWorker::~PostProcessorWorker()
>> +{
>> +	{
>> +		libcamera::MutexLocker lock(mutex_);
>> +		state_ = State::Stopped;
>> +	}
>> +
>> +	cv_.notify_one();
>> +	wait();
>> +}
>> +
>> +void CameraStream::PostProcessorWorker::start()
>> +{
>> +	{
>> +		libcamera::MutexLocker lock(mutex_);
> Could you add a
>
> 		ASSERT(state_ != State::Running);
>
> here ? I'm worried that a future refactoring will call
> CameraStream::configure() multiple times, and that would result in the
> worker being started multiple times too.


Ack

>
>> +		state_ = State::Running;
>> +	}
>> +
>> +	Thread::start();
>> +}
>> +
>> +void CameraStream::PostProcessorWorker::queueRequest(Camera3RequestDescriptor::StreamBuffer *dest)
>> +{
>> +	{
>> +		MutexLocker lock(mutex_);
>> +		ASSERT(state_ == State::Running);
>> +		requests_.push(dest);
>> +	}
>> +
>> +	cv_.notify_one();
>> +}
>> +
>> +void CameraStream::PostProcessorWorker::run()
>> +{
>> +	MutexLocker locker(mutex_);
>> +
>> +	while (1) {
>> +		cv_.wait(locker, [&] {
>> +			return state_ != State::Running || !requests_.empty();
>> +		});
>> +
>> +		if (state_ != State::Running)
>> +			break;
>> +
>> +		Camera3RequestDescriptor::StreamBuffer *stream = requests_.front();
>> +		requests_.pop();
>> +		locker.unlock();
>> +
>> +		postProcessor_->process(*stream->srcBuffer, stream->destBuffer.get(),
>> +					stream->request);
>> +
>> +		locker.lock();
>> +	}
>> +
>> +	if (state_ == State::Flushing) {
>> +		while (!requests_.empty()) {
>> +			postProcessor_->processComplete.emit(
>> +				requests_.front()->request,
>> +				PostProcessor::Status::Error);
> I'm also a tiny bit concerned here that we're emitting the signal with
> the lock held, while for requests that complete normally we don't. The
> behaviour isn't consistent and could cause issues in the CameraStream or
> CameraDevice classes. I think it's harmless for now, but maybe we could
> drop the lock around the emit() call ? One option to avoid dropping and
> taking the lock in every iteration would be
>
> 		std::queue<Camera3RequestDescriptor::StreamBuffer *> requests = std::move(requests_);
>
> 		locker.unlock();
>
> 		while (!requests.empty()) {
> 			postProcessor_->processComplete.emit(
> 				requests.front()->request,
> 				PostProcessor::Status::Error);
> 			requests.pop();
> 		}
>
> 		locker.lock();
>
> And now that I think about it, given that we now have a StreamBuffer
> object, wouldn't it be simpler to pass the StreamBuffer instead of the
> Camera3RequestDescriptor to the signal ?


Yesterday, Jacopo might have suggested the same. I'll try to accommodate this.

>> +		}
>> +
>> +		state_ = State::Stopped;
>> +		locker.unlock();
> You can drop the unlock() call here.


Ack. Shouldn't be here

>
>> +	}
>> +}
>> +
>> +void CameraStream::PostProcessorWorker::flush()
>> +{
>> +	libcamera::MutexLocker lock(mutex_);
>> +	state_ = State::Flushing;
>> +	lock.unlock();
>> +
>> +	cv_.notify_one();
>> +}
>> diff --git a/src/android/camera_stream.h b/src/android/camera_stream.h
>> index f242336e..64e32c77 100644
>> --- a/src/android/camera_stream.h
>> +++ b/src/android/camera_stream.h
>> @@ -7,12 +7,16 @@
>>   #ifndef __ANDROID_CAMERA_STREAM_H__
>>   #define __ANDROID_CAMERA_STREAM_H__
>>   
>> +#include <condition_variable>
>>   #include <memory>
>>   #include <mutex>
>> +#include <queue>
>>   #include <vector>
>>   
>>   #include <hardware/camera3.h>
>>   
>> +#include <libcamera/base/thread.h>
>> +
>>   #include <libcamera/camera.h>
>>   #include <libcamera/framebuffer.h>
>>   #include <libcamera/framebuffer_allocator.h>
>> @@ -20,9 +24,9 @@
>>   #include <libcamera/pixel_format.h>
>>   
>>   #include "camera_request.h"
>> +#include "post_processor.h"
>>   
>>   class CameraDevice;
>> -class PostProcessor;
>>   
>>   class CameraStream
>>   {
>> @@ -126,8 +130,38 @@ public:
>>   		    Camera3RequestDescriptor *request);
>>   	libcamera::FrameBuffer *getBuffer();
>>   	void putBuffer(libcamera::FrameBuffer *buffer);
>> +	void flush();
>>   
>>   private:
>> +	class PostProcessorWorker : public libcamera::Thread
>> +	{
>> +	public:
>> +		enum class State {
>> +			Stopped,
>> +			Running,
>> +			Flushing,
>> +		};
>> +
>> +		PostProcessorWorker(PostProcessor *postProcessor);
>> +		~PostProcessorWorker();
>> +
>> +		void start();
>> +		void queueRequest(Camera3RequestDescriptor::StreamBuffer *request);
>> +		void flush();
>> +
>> +	protected:
>> +		void run() override;
>> +
>> +	private:
>> +		PostProcessor *postProcessor_;
>> +
>> +		libcamera::Mutex mutex_;
>> +		std::condition_variable cv_;
>> +
>> +		std::queue<Camera3RequestDescriptor::StreamBuffer *> requests_;
>> +		State state_;
>> +	};
>> +
>>   	int waitFence(int fence);
>>   
>>   	CameraDevice *const cameraDevice_;
>> @@ -144,6 +178,8 @@ private:
>>   	 */
>>   	std::unique_ptr<std::mutex> mutex_;
>>   	std::unique_ptr<PostProcessor> postProcessor_;
>> +
>> +	std::unique_ptr<PostProcessorWorker> worker_;
>>   };
>>   
>>   #endif /* __ANDROID_CAMERA_STREAM__ */
Umang Jain Oct. 21, 2021, 7:37 a.m. UTC | #3
Hello,

On 10/20/21 4:12 PM, Umang Jain wrote:
> Introduce a dedicated worker class derived from libcamera::Thread.
> The worker class maintains a queue for post-processing requests
> and waits for a post-processing request to become available.
> It will process them as per FIFO before de-queuing it from the
> queue.
>
> To get access to the source and destination buffers in the worker
> thread, we also need to save a pointer to them in the
> Camera3RequestDescriptor.
>
> This patch also implements a flush() for the PostProcessorWorker
> class which is responsible to purge post-processing requests
> queued up while a camera is stopping/flushing. It is hooked with
> CameraStream::flush(), which isn't used currently but will be
> used when we handle flush/stop scenarios in greater detail
> subsequently (in a different patchset).
>
> The libcamera request completion handler CameraDevice::requestComplete()
> assumes that the request that has just completed is at the front of the
> queue. Now that the post-processor runs asynchronously, this isn't true
> anymore, a request being post-processed will stay in the queue and a new
> libcamera request may complete. Remove that assumption, and use the
> request cookie to obtain the Camera3RequestDescriptor.
>
> Signed-off-by: Umang Jain <umang.jain@ideasonboard.com>
> Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
> ---
>   src/android/camera_device.cpp |  46 +++++----------
>   src/android/camera_request.h  |   4 ++
>   src/android/camera_stream.cpp | 107 +++++++++++++++++++++++++++++++---
>   src/android/camera_stream.h   |  38 +++++++++++-
>   4 files changed, 156 insertions(+), 39 deletions(-)
>
> diff --git a/src/android/camera_device.cpp b/src/android/camera_device.cpp
> index 541c2c81..b14416ce 100644
> --- a/src/android/camera_device.cpp
> +++ b/src/android/camera_device.cpp
> @@ -1020,29 +1020,8 @@ int CameraDevice::processCaptureRequest(camera3_capture_request_t *camera3Reques
>   
>   void CameraDevice::requestComplete(Request *request)
>   {
> -	Camera3RequestDescriptor *descriptor;
> -	{
> -		MutexLocker descriptorsLock(descriptorsMutex_);
> -		ASSERT(!descriptors_.empty());
> -		descriptor = descriptors_.front().get();
> -	}
> -
> -	if (descriptor->request_->cookie() != request->cookie()) {
> -		/*
> -		 * \todo Clarify if the Camera has to be closed on
> -		 * ERROR_DEVICE.
> -		 */
> -		LOG(HAL, Error)
> -			<< "Out-of-order completion for request "
> -			<< utils::hex(request->cookie());
> -
> -		MutexLocker descriptorsLock(descriptorsMutex_);
> -		descriptors_.pop();
> -
> -		notifyError(0, nullptr, CAMERA3_MSG_ERROR_DEVICE);
> -
> -		return;
> -	}
> +	Camera3RequestDescriptor *descriptor =
> +		reinterpret_cast<Camera3RequestDescriptor *>(request->cookie());
>   
>   	/*
>   	 * Prepare the capture result for the Android camera stack.
> @@ -1120,12 +1099,7 @@ void CameraDevice::requestComplete(Request *request)
>   	bool needsPostProcessing = false;
>   	Camera3RequestDescriptor::Status processingStatus =
>   		Camera3RequestDescriptor::Status::Pending;
> -	/*
> -	 * \todo Apply streamsProcessMutex_ when post-processing is adapted to run
> -	 * asynchronously. If we introduce the streamsProcessMutex_ right away, the
> -	 * lock will be held forever since it is synchronous at this point
> -	 * (see streamProcessingComplete).
> -	 */
> +
>   	for (auto &buffer : descriptor->buffers_) {
>   		CameraStream *stream = buffer.stream;
>   
> @@ -1145,7 +1119,13 @@ void CameraDevice::requestComplete(Request *request)
>   			buffer.internalBuffer = src;
>   
>   		needsPostProcessing = true;
> -		int ret = stream->process(*src, buffer, descriptor);
> +
> +		int ret;
> +		{
> +			MutexLocker locker(descriptor->streamsProcessMutex_);
> +			ret = stream->process(*src, buffer, descriptor);
> +		}
> +
>   		if (ret) {
>   			setBufferStatus(stream, buffer, descriptor,
>   					Camera3RequestDescriptor::Status::Error);
> @@ -1156,6 +1136,12 @@ void CameraDevice::requestComplete(Request *request)
>   	if (needsPostProcessing) {
>   		if (processingStatus == Camera3RequestDescriptor::Status::Error) {
>   			descriptor->status_ = processingStatus;
> +			/*
> +			 * \todo This is problematic when some streams are
> +			 * queued successfully, but some fail to get queued.
> +			 * We might end up with use-after-free situation for
> +			 * descriptor in streamProcessingComplete().
> +			 */
>   			sendCaptureResults();
>   		}
>   
> diff --git a/src/android/camera_request.h b/src/android/camera_request.h
> index 3a2774e0..85274414 100644
> --- a/src/android/camera_request.h
> +++ b/src/android/camera_request.h
> @@ -18,6 +18,7 @@
>   
>   #include <hardware/camera3.h>
>   
> +#include "camera_buffer.h"
>   #include "camera_metadata.h"
>   #include "camera_worker.h"
>   
> @@ -39,6 +40,9 @@ public:
>   		int fence;
>   		Status status;
>   		libcamera::FrameBuffer *internalBuffer = nullptr;
> +		std::unique_ptr<CameraBuffer> destBuffer;
> +		const libcamera::FrameBuffer *srcBuffer;
> +		Camera3RequestDescriptor *request = nullptr;
>   	};
>   
>   	Camera3RequestDescriptor(libcamera::Camera *camera,
> diff --git a/src/android/camera_stream.cpp b/src/android/camera_stream.cpp
> index b3cb06a2..a29ce33b 100644
> --- a/src/android/camera_stream.cpp
> +++ b/src/android/camera_stream.cpp
> @@ -99,6 +99,7 @@ int CameraStream::configure()
>   		if (ret)
>   			return ret;
>   
> +		worker_ = std::make_unique<PostProcessorWorker>(postProcessor_.get());
>   		postProcessor_->processComplete.connect(
>   			this, [&](Camera3RequestDescriptor *request, PostProcessor::Status status) {
>   				Camera3RequestDescriptor::Status bufferStatus =
> @@ -110,6 +111,7 @@ int CameraStream::configure()
>   				cameraDevice_->streamProcessingComplete(this, request,
>   									bufferStatus);
>   			});
> +		worker_->start();
>   	}
>   
>   	if (type_ == Type::Internal) {
> @@ -179,23 +181,31 @@ int CameraStream::process(const FrameBuffer &source,
>   	if (!postProcessor_)
>   		return 0;
>   
> -	/*
> -	 * \todo Buffer mapping and processing should be moved to a
> -	 * separate thread.
> -	 */
>   	const StreamConfiguration &output = configuration();
> -	CameraBuffer destBuffer(*dest.camera3Buffer, output.pixelFormat,
> -				output.size, PROT_READ | PROT_WRITE);
> -	if (!destBuffer.isValid()) {
> +	dest.destBuffer = std::make_unique<CameraBuffer>(
> +		*dest.camera3Buffer, output.pixelFormat, output.size, PROT_READ | PROT_WRITE);
> +	if (!dest.destBuffer->isValid()) {
>   		LOG(HAL, Error) << "Failed to create destination buffer";
>   		return -EINVAL;
>   	}
>   
> -	postProcessor_->process(source, &destBuffer, request);
> +	dest.srcBuffer = &source;
> +	dest.request = request;
> +
> +	/* Push the postProcessor request to the worker queue. */
> +	worker_->queueRequest(&dest);
>   
>   	return 0;
>   }
>   
> +void CameraStream::flush()
> +{
> +	if (!postProcessor_)
> +		return;
> +
> +	worker_->flush();
> +}
> +
>   FrameBuffer *CameraStream::getBuffer()
>   {
>   	if (!allocator_)
> @@ -223,3 +233,84 @@ void CameraStream::putBuffer(FrameBuffer *buffer)
>   
>   	buffers_.push_back(buffer);
>   }
> +
> +CameraStream::PostProcessorWorker::PostProcessorWorker(PostProcessor *postProcessor)
> +	: postProcessor_(postProcessor)
> +{
> +}
> +
> +CameraStream::PostProcessorWorker::~PostProcessorWorker()
> +{
> +	{
> +		libcamera::MutexLocker lock(mutex_);
> +		state_ = State::Stopped;
> +	}
> +
> +	cv_.notify_one();
> +	wait();
> +}
> +
> +void CameraStream::PostProcessorWorker::start()
> +{
> +	{
> +		libcamera::MutexLocker lock(mutex_);
> +		state_ = State::Running;
> +	}
> +
> +	Thread::start();
> +}
> +
> +void CameraStream::PostProcessorWorker::queueRequest(Camera3RequestDescriptor::StreamBuffer *dest)
> +{
> +	{
> +		MutexLocker lock(mutex_);
> +		ASSERT(state_ == State::Running);
> +		requests_.push(dest);
> +	}
> +
> +	cv_.notify_one();
> +}
> +
> +void CameraStream::PostProcessorWorker::run()
> +{
> +	MutexLocker locker(mutex_);
> +
> +	while (1) {
> +		cv_.wait(locker, [&] {
> +			return state_ != State::Running || !requests_.empty();
> +		});
> +
> +		if (state_ != State::Running)
> +			break;
> +
> +		Camera3RequestDescriptor::StreamBuffer *stream = requests_.front();
> +		requests_.pop();
> +		locker.unlock();
> +
> +		postProcessor_->process(*stream->srcBuffer, stream->destBuffer.get(),
> +					stream->request);


As pointed out in earlier reviews, processComplete should emit a
Camera3RequestDescriptor::StreamBuffer pointer instead of the
Camera3RequestDescriptor pointer it emits currently.

Looking at the postProcessor_->process() call above, I think I should
also change its signature to:

     PostProcessor::process(Camera3RequestDescriptor::StreamBuffer *stream)

so that all three elements (source buffer, destination buffer and
request) can be accessed from there.
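
Roughly what I have in mind, as an untested sketch only. The types below
are simplified stand-ins rather than the real HAL classes, and
CheckingPostProcessor with its complete() helper is made up purely for
illustration. The void return type is an assumption as well.

```cpp
#include <cassert>
#include <memory>

/* Simplified stand-ins for the HAL types, for illustration only. */
struct FrameBuffer {};
struct CameraBuffer {};
struct Camera3RequestDescriptor {};

/* Mirrors the three fields this patch adds to StreamBuffer. */
struct StreamBuffer {
	std::unique_ptr<CameraBuffer> destBuffer;
	const FrameBuffer *srcBuffer = nullptr;
	Camera3RequestDescriptor *request = nullptr;
};

class PostProcessor
{
public:
	virtual ~PostProcessor() = default;

	/* Proposed: one argument carrying source, destination and request. */
	virtual void process(StreamBuffer *stream) = 0;
};

/* Hypothetical post-processor that only checks what it was handed. */
class CheckingPostProcessor : public PostProcessor
{
public:
	void process(StreamBuffer *stream) override
	{
		assert(stream);
		complete_ = stream->srcBuffer && stream->destBuffer &&
			    stream->request;
	}

	bool complete() const { return complete_; }

private:
	bool complete_ = false;
};
```

The worker would then only need to pass the queued StreamBuffer pointer
through, without unpacking it first.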

Laurent, do you agree with the design decision?

> +
> +		locker.lock();
> +	}
> +
> +	if (state_ == State::Flushing) {
> +		while (!requests_.empty()) {
> +			postProcessor_->processComplete.emit(
> +				requests_.front()->request,
> +				PostProcessor::Status::Error);
> +			requests_.pop();
> +		}
> +
> +		state_ = State::Stopped;
> +		locker.unlock();
> +	}
> +}
> +
> +void CameraStream::PostProcessorWorker::flush()
> +{
> +	libcamera::MutexLocker lock(mutex_);
> +	state_ = State::Flushing;
> +	lock.unlock();
> +
> +	cv_.notify_one();
> +}
> diff --git a/src/android/camera_stream.h b/src/android/camera_stream.h
> index f242336e..64e32c77 100644
> --- a/src/android/camera_stream.h
> +++ b/src/android/camera_stream.h
> @@ -7,12 +7,16 @@
>   #ifndef __ANDROID_CAMERA_STREAM_H__
>   #define __ANDROID_CAMERA_STREAM_H__
>   
> +#include <condition_variable>
>   #include <memory>
>   #include <mutex>
> +#include <queue>
>   #include <vector>
>   
>   #include <hardware/camera3.h>
>   
> +#include <libcamera/base/thread.h>
> +
>   #include <libcamera/camera.h>
>   #include <libcamera/framebuffer.h>
>   #include <libcamera/framebuffer_allocator.h>
> @@ -20,9 +24,9 @@
>   #include <libcamera/pixel_format.h>
>   
>   #include "camera_request.h"
> +#include "post_processor.h"
>   
>   class CameraDevice;
> -class PostProcessor;
>   
>   class CameraStream
>   {
> @@ -126,8 +130,38 @@ public:
>   		    Camera3RequestDescriptor *request);
>   	libcamera::FrameBuffer *getBuffer();
>   	void putBuffer(libcamera::FrameBuffer *buffer);
> +	void flush();
>   
>   private:
> +	class PostProcessorWorker : public libcamera::Thread
> +	{
> +	public:
> +		enum class State {
> +			Stopped,
> +			Running,
> +			Flushing,
> +		};
> +
> +		PostProcessorWorker(PostProcessor *postProcessor);
> +		~PostProcessorWorker();
> +
> +		void start();
> +		void queueRequest(Camera3RequestDescriptor::StreamBuffer *request);
> +		void flush();
> +
> +	protected:
> +		void run() override;
> +
> +	private:
> +		PostProcessor *postProcessor_;
> +
> +		libcamera::Mutex mutex_;
> +		std::condition_variable cv_;
> +
> +		std::queue<Camera3RequestDescriptor::StreamBuffer *> requests_;
> +		State state_;
> +	};
> +
>   	int waitFence(int fence);
>   
>   	CameraDevice *const cameraDevice_;
> @@ -144,6 +178,8 @@ private:
>   	 */
>   	std::unique_ptr<std::mutex> mutex_;
>   	std::unique_ptr<PostProcessor> postProcessor_;
> +
> +	std::unique_ptr<PostProcessorWorker> worker_;
>   };
>   
>   #endif /* __ANDROID_CAMERA_STREAM__ */

Laurent Pinchart Oct. 21, 2021, 1:10 p.m. UTC | #4
Hi Umang,

On Thu, Oct 21, 2021 at 01:07:50PM +0530, Umang Jain wrote:
> On 10/20/21 4:12 PM, Umang Jain wrote:
> > Introduce a dedicated worker class derived from libcamera::Thread.
> > The worker class maintains a queue for post-processing requests
> > and waits for a post-processing request to become available.
> > It will process them as per FIFO before de-queuing it from the
> > queue.
> >
> > To get access to the source and destination buffers in the worker
> > thread, we also need to save a pointer to them in the
> > Camera3RequestDescriptor.
> >
> > This patch also implements a flush() for the PostProcessorWorker
> > class which is responsible to purge post-processing requests
> > queued up while a camera is stopping/flushing. It is hooked with
> > CameraStream::flush(), which isn't used currently but will be
> > used when we handle flush/stop scenarios in greater detail
> > subsequently (in a different patchset).
> >
> > The libcamera request completion handler CameraDevice::requestComplete()
> > assumes that the request that has just completed is at the front of the
> > queue. Now that the post-processor runs asynchronously, this isn't true
> > anymore, a request being post-processed will stay in the queue and a new
> > libcamera request may complete. Remove that assumption, and use the
> > request cookie to obtain the Camera3RequestDescriptor.
> >
> > Signed-off-by: Umang Jain <umang.jain@ideasonboard.com>
> > Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
> > ---
> >   src/android/camera_device.cpp |  46 +++++----------
> >   src/android/camera_request.h  |   4 ++
> >   src/android/camera_stream.cpp | 107 +++++++++++++++++++++++++++++++---
> >   src/android/camera_stream.h   |  38 +++++++++++-
> >   4 files changed, 156 insertions(+), 39 deletions(-)
> >
> > diff --git a/src/android/camera_device.cpp b/src/android/camera_device.cpp
> > index 541c2c81..b14416ce 100644
> > --- a/src/android/camera_device.cpp
> > +++ b/src/android/camera_device.cpp
> > @@ -1020,29 +1020,8 @@ int CameraDevice::processCaptureRequest(camera3_capture_request_t *camera3Reques
> >   
> >   void CameraDevice::requestComplete(Request *request)
> >   {
> > -	Camera3RequestDescriptor *descriptor;
> > -	{
> > -		MutexLocker descriptorsLock(descriptorsMutex_);
> > -		ASSERT(!descriptors_.empty());
> > -		descriptor = descriptors_.front().get();
> > -	}
> > -
> > -	if (descriptor->request_->cookie() != request->cookie()) {
> > -		/*
> > -		 * \todo Clarify if the Camera has to be closed on
> > -		 * ERROR_DEVICE.
> > -		 */
> > -		LOG(HAL, Error)
> > -			<< "Out-of-order completion for request "
> > -			<< utils::hex(request->cookie());
> > -
> > -		MutexLocker descriptorsLock(descriptorsMutex_);
> > -		descriptors_.pop();
> > -
> > -		notifyError(0, nullptr, CAMERA3_MSG_ERROR_DEVICE);
> > -
> > -		return;
> > -	}
> > +	Camera3RequestDescriptor *descriptor =
> > +		reinterpret_cast<Camera3RequestDescriptor *>(request->cookie());
> >   
> >   	/*
> >   	 * Prepare the capture result for the Android camera stack.
> > @@ -1120,12 +1099,7 @@ void CameraDevice::requestComplete(Request *request)
> >   	bool needsPostProcessing = false;
> >   	Camera3RequestDescriptor::Status processingStatus =
> >   		Camera3RequestDescriptor::Status::Pending;
> > -	/*
> > -	 * \todo Apply streamsProcessMutex_ when post-processing is adapted to run
> > -	 * asynchronously. If we introduce the streamsProcessMutex_ right away, the
> > -	 * lock will be held forever since it is synchronous at this point
> > -	 * (see streamProcessingComplete).
> > -	 */
> > +
> >   	for (auto &buffer : descriptor->buffers_) {
> >   		CameraStream *stream = buffer.stream;
> >   
> > @@ -1145,7 +1119,13 @@ void CameraDevice::requestComplete(Request *request)
> >   			buffer.internalBuffer = src;
> >   
> >   		needsPostProcessing = true;
> > -		int ret = stream->process(*src, buffer, descriptor);
> > +
> > +		int ret;
> > +		{
> > +			MutexLocker locker(descriptor->streamsProcessMutex_);
> > +			ret = stream->process(*src, buffer, descriptor);
> > +		}
> > +
> >   		if (ret) {
> >   			setBufferStatus(stream, buffer, descriptor,
> >   					Camera3RequestDescriptor::Status::Error);
> > @@ -1156,6 +1136,12 @@ void CameraDevice::requestComplete(Request *request)
> >   	if (needsPostProcessing) {
> >   		if (processingStatus == Camera3RequestDescriptor::Status::Error) {
> >   			descriptor->status_ = processingStatus;
> > +			/*
> > +			 * \todo This is problematic when some streams are
> > +			 * queued successfully, but some fail to get queued.
> > +			 * We might end up with use-after-free situation for
> > +			 * descriptor in streamProcessingComplete().
> > +			 */
> >   			sendCaptureResults();
> >   		}
> >   
> > diff --git a/src/android/camera_request.h b/src/android/camera_request.h
> > index 3a2774e0..85274414 100644
> > --- a/src/android/camera_request.h
> > +++ b/src/android/camera_request.h
> > @@ -18,6 +18,7 @@
> >   
> >   #include <hardware/camera3.h>
> >   
> > +#include "camera_buffer.h"
> >   #include "camera_metadata.h"
> >   #include "camera_worker.h"
> >   
> > @@ -39,6 +40,9 @@ public:
> >   		int fence;
> >   		Status status;
> >   		libcamera::FrameBuffer *internalBuffer = nullptr;
> > +		std::unique_ptr<CameraBuffer> destBuffer;
> > +		const libcamera::FrameBuffer *srcBuffer;
> > +		Camera3RequestDescriptor *request = nullptr;
> >   	};
> >   
> >   	Camera3RequestDescriptor(libcamera::Camera *camera,
> > diff --git a/src/android/camera_stream.cpp b/src/android/camera_stream.cpp
> > index b3cb06a2..a29ce33b 100644
> > --- a/src/android/camera_stream.cpp
> > +++ b/src/android/camera_stream.cpp
> > @@ -99,6 +99,7 @@ int CameraStream::configure()
> >   		if (ret)
> >   			return ret;
> >   
> > +		worker_ = std::make_unique<PostProcessorWorker>(postProcessor_.get());
> >   		postProcessor_->processComplete.connect(
> >   			this, [&](Camera3RequestDescriptor *request, PostProcessor::Status status) {
> >   				Camera3RequestDescriptor::Status bufferStatus =
> > @@ -110,6 +111,7 @@ int CameraStream::configure()
> >   				cameraDevice_->streamProcessingComplete(this, request,
> >   									bufferStatus);
> >   			});
> > +		worker_->start();
> >   	}
> >   
> >   	if (type_ == Type::Internal) {
> > @@ -179,23 +181,31 @@ int CameraStream::process(const FrameBuffer &source,
> >   	if (!postProcessor_)
> >   		return 0;
> >   
> > -	/*
> > -	 * \todo Buffer mapping and processing should be moved to a
> > -	 * separate thread.
> > -	 */
> >   	const StreamConfiguration &output = configuration();
> > -	CameraBuffer destBuffer(*dest.camera3Buffer, output.pixelFormat,
> > -				output.size, PROT_READ | PROT_WRITE);
> > -	if (!destBuffer.isValid()) {
> > +	dest.destBuffer = std::make_unique<CameraBuffer>(
> > +		*dest.camera3Buffer, output.pixelFormat, output.size, PROT_READ | PROT_WRITE);
> > +	if (!dest.destBuffer->isValid()) {
> >   		LOG(HAL, Error) << "Failed to create destination buffer";
> >   		return -EINVAL;
> >   	}
> >   
> > -	postProcessor_->process(source, &destBuffer, request);
> > +	dest.srcBuffer = &source;
> > +	dest.request = request;
> > +
> > +	/* Push the postProcessor request to the worker queue. */
> > +	worker_->queueRequest(&dest);
> >   
> >   	return 0;
> >   }
> >   
> > +void CameraStream::flush()
> > +{
> > +	if (!postProcessor_)
> > +		return;
> > +
> > +	worker_->flush();
> > +}
> > +
> >   FrameBuffer *CameraStream::getBuffer()
> >   {
> >   	if (!allocator_)
> > @@ -223,3 +233,84 @@ void CameraStream::putBuffer(FrameBuffer *buffer)
> >   
> >   	buffers_.push_back(buffer);
> >   }
> > +
> > +CameraStream::PostProcessorWorker::PostProcessorWorker(PostProcessor *postProcessor)
> > +	: postProcessor_(postProcessor)
> > +{
> > +}
> > +
> > +CameraStream::PostProcessorWorker::~PostProcessorWorker()
> > +{
> > +	{
> > +		libcamera::MutexLocker lock(mutex_);
> > +		state_ = State::Stopped;
> > +	}
> > +
> > +	cv_.notify_one();
> > +	wait();
> > +}
> > +
> > +void CameraStream::PostProcessorWorker::start()
> > +{
> > +	{
> > +		libcamera::MutexLocker lock(mutex_);
> > +		state_ = State::Running;
> > +	}
> > +
> > +	Thread::start();
> > +}
> > +
> > +void CameraStream::PostProcessorWorker::queueRequest(Camera3RequestDescriptor::StreamBuffer *dest)
> > +{
> > +	{
> > +		MutexLocker lock(mutex_);
> > +		ASSERT(state_ == State::Running);
> > +		requests_.push(dest);
> > +	}
> > +
> > +	cv_.notify_one();
> > +}
> > +
> > +void CameraStream::PostProcessorWorker::run()
> > +{
> > +	MutexLocker locker(mutex_);
> > +
> > +	while (1) {
> > +		cv_.wait(locker, [&] {
> > +			return state_ != State::Running || !requests_.empty();
> > +		});
> > +
> > +		if (state_ != State::Running)
> > +			break;
> > +
> > +		Camera3RequestDescriptor::StreamBuffer *stream = requests_.front();
> > +		requests_.pop();
> > +		locker.unlock();
> > +
> > +		postProcessor_->process(*stream->srcBuffer, stream->destBuffer.get(),
> > +					stream->request);
> 
> As pointed out in earlier reviews, processComplete should emit a
> Camera3RequestDescriptor::StreamBuffer pointer instead of the
> Camera3RequestDescriptor pointer it emits currently.
>
> Looking at the postProcessor_->process() call above, I think I should
> also change its signature to:
>
>      PostProcessor::process(Camera3RequestDescriptor::StreamBuffer *stream)
>
> so that all three elements (source buffer, destination buffer and
> request) can be accessed from there.
>
> Laurent, do you agree with the design decision?

Yes, that looks good to me. Maybe the variable could also be renamed to
streamBuffer here, to avoid confusing it with a stream.
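
As an untested sketch of how the loop could read with that name. This is
a standalone approximation, not the HAL code: Worker, process_ and
processInOrder() are hypothetical names, StreamBuffer is a stand-in
struct, std::thread replaces libcamera::Thread, and the Flushing state
is left out for brevity.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

/* Stand-in for Camera3RequestDescriptor::StreamBuffer. */
struct StreamBuffer {
	int id;
};

/* Hypothetical worker; std::thread replaces libcamera::Thread here. */
class Worker
{
public:
	explicit Worker(std::function<void(StreamBuffer *)> process)
		: process_(std::move(process))
	{
		thread_ = std::thread(&Worker::run, this);
	}

	~Worker()
	{
		{
			std::lock_guard<std::mutex> lock(mutex_);
			running_ = false;
		}
		cv_.notify_one();
		thread_.join();
	}

	void queueRequest(StreamBuffer *streamBuffer)
	{
		{
			std::lock_guard<std::mutex> lock(mutex_);
			assert(running_);
			requests_.push(streamBuffer);
		}
		cv_.notify_one();
	}

private:
	void run()
	{
		std::unique_lock<std::mutex> locker(mutex_);

		while (true) {
			cv_.wait(locker, [&] {
				return !running_ || !requests_.empty();
			});
			if (!running_)
				break;

			/* Named streamBuffer so it is not mistaken for a stream. */
			StreamBuffer *streamBuffer = requests_.front();
			requests_.pop();
			locker.unlock();

			process_(streamBuffer);

			locker.lock();
		}
	}

	std::function<void(StreamBuffer *)> process_;
	std::mutex mutex_;
	std::condition_variable cv_;
	std::queue<StreamBuffer *> requests_;
	bool running_ = true;
	std::thread thread_;
};

/* Queue every buffer, wait until all are processed, return the order. */
std::vector<int> processInOrder(std::vector<StreamBuffer> &buffers)
{
	std::mutex doneMutex;
	std::condition_variable doneCv;
	std::vector<int> order;
	{
		Worker worker([&](StreamBuffer *streamBuffer) {
			{
				std::lock_guard<std::mutex> lock(doneMutex);
				order.push_back(streamBuffer->id);
			}
			doneCv.notify_one();
		});

		for (StreamBuffer &buffer : buffers)
			worker.queueRequest(&buffer);

		std::unique_lock<std::mutex> lock(doneMutex);
		doneCv.wait(lock, [&] { return order.size() == buffers.size(); });
	}
	return order;
}
```

The queue mutex is dropped around process_() so that queueRequest() is
not blocked while a buffer is being processed.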

> > +
> > +		locker.lock();
> > +	}
> > +
> > +	if (state_ == State::Flushing) {
> > +		while (!requests_.empty()) {
> > +			postProcessor_->processComplete.emit(
> > +				requests_.front()->request,
> > +				PostProcessor::Status::Error);
> > +			requests_.pop();
> > +		}
> > +
> > +		state_ = State::Stopped;
> > +		locker.unlock();
> > +	}
> > +}
> > +
> > +void CameraStream::PostProcessorWorker::flush()
> > +{
> > +	libcamera::MutexLocker lock(mutex_);
> > +	state_ = State::Flushing;
> > +	lock.unlock();
> > +
> > +	cv_.notify_one();
> > +}
> > diff --git a/src/android/camera_stream.h b/src/android/camera_stream.h
> > index f242336e..64e32c77 100644
> > --- a/src/android/camera_stream.h
> > +++ b/src/android/camera_stream.h
> > @@ -7,12 +7,16 @@
> >   #ifndef __ANDROID_CAMERA_STREAM_H__
> >   #define __ANDROID_CAMERA_STREAM_H__
> >   
> > +#include <condition_variable>
> >   #include <memory>
> >   #include <mutex>
> > +#include <queue>
> >   #include <vector>
> >   
> >   #include <hardware/camera3.h>
> >   
> > +#include <libcamera/base/thread.h>
> > +
> >   #include <libcamera/camera.h>
> >   #include <libcamera/framebuffer.h>
> >   #include <libcamera/framebuffer_allocator.h>
> > @@ -20,9 +24,9 @@
> >   #include <libcamera/pixel_format.h>
> >   
> >   #include "camera_request.h"
> > +#include "post_processor.h"
> >   
> >   class CameraDevice;
> > -class PostProcessor;
> >   
> >   class CameraStream
> >   {
> > @@ -126,8 +130,38 @@ public:
> >   		    Camera3RequestDescriptor *request);
> >   	libcamera::FrameBuffer *getBuffer();
> >   	void putBuffer(libcamera::FrameBuffer *buffer);
> > +	void flush();
> >   
> >   private:
> > +	class PostProcessorWorker : public libcamera::Thread
> > +	{
> > +	public:
> > +		enum class State {
> > +			Stopped,
> > +			Running,
> > +			Flushing,
> > +		};
> > +
> > +		PostProcessorWorker(PostProcessor *postProcessor);
> > +		~PostProcessorWorker();
> > +
> > +		void start();
> > +		void queueRequest(Camera3RequestDescriptor::StreamBuffer *request);
> > +		void flush();
> > +
> > +	protected:
> > +		void run() override;
> > +
> > +	private:
> > +		PostProcessor *postProcessor_;
> > +
> > +		libcamera::Mutex mutex_;
> > +		std::condition_variable cv_;
> > +
> > +		std::queue<Camera3RequestDescriptor::StreamBuffer *> requests_;
> > +		State state_;
> > +	};
> > +
> >   	int waitFence(int fence);
> >   
> >   	CameraDevice *const cameraDevice_;
> > @@ -144,6 +178,8 @@ private:
> >   	 */
> >   	std::unique_ptr<std::mutex> mutex_;
> >   	std::unique_ptr<PostProcessor> postProcessor_;
> > +
> > +	std::unique_ptr<PostProcessorWorker> worker_;
> >   };
> >   
> >   #endif /* __ANDROID_CAMERA_STREAM__ */

Patch

diff --git a/src/android/camera_device.cpp b/src/android/camera_device.cpp
index 541c2c81..b14416ce 100644
--- a/src/android/camera_device.cpp
+++ b/src/android/camera_device.cpp
@@ -1020,29 +1020,8 @@  int CameraDevice::processCaptureRequest(camera3_capture_request_t *camera3Reques
 
 void CameraDevice::requestComplete(Request *request)
 {
-	Camera3RequestDescriptor *descriptor;
-	{
-		MutexLocker descriptorsLock(descriptorsMutex_);
-		ASSERT(!descriptors_.empty());
-		descriptor = descriptors_.front().get();
-	}
-
-	if (descriptor->request_->cookie() != request->cookie()) {
-		/*
-		 * \todo Clarify if the Camera has to be closed on
-		 * ERROR_DEVICE.
-		 */
-		LOG(HAL, Error)
-			<< "Out-of-order completion for request "
-			<< utils::hex(request->cookie());
-
-		MutexLocker descriptorsLock(descriptorsMutex_);
-		descriptors_.pop();
-
-		notifyError(0, nullptr, CAMERA3_MSG_ERROR_DEVICE);
-
-		return;
-	}
+	Camera3RequestDescriptor *descriptor =
+		reinterpret_cast<Camera3RequestDescriptor *>(request->cookie());
 
 	/*
 	 * Prepare the capture result for the Android camera stack.
@@ -1120,12 +1099,7 @@  void CameraDevice::requestComplete(Request *request)
 	bool needsPostProcessing = false;
 	Camera3RequestDescriptor::Status processingStatus =
 		Camera3RequestDescriptor::Status::Pending;
-	/*
-	 * \todo Apply streamsProcessMutex_ when post-processing is adapted to run
-	 * asynchronously. If we introduce the streamsProcessMutex_ right away, the
-	 * lock will be held forever since it is synchronous at this point
-	 * (see streamProcessingComplete).
-	 */
+
 	for (auto &buffer : descriptor->buffers_) {
 		CameraStream *stream = buffer.stream;
 
@@ -1145,7 +1119,13 @@  void CameraDevice::requestComplete(Request *request)
 			buffer.internalBuffer = src;
 
 		needsPostProcessing = true;
-		int ret = stream->process(*src, buffer, descriptor);
+
+		int ret;
+		{
+			MutexLocker locker(descriptor->streamsProcessMutex_);
+			ret = stream->process(*src, buffer, descriptor);
+		}
+
 		if (ret) {
 			setBufferStatus(stream, buffer, descriptor,
 					Camera3RequestDescriptor::Status::Error);
@@ -1156,6 +1136,12 @@  void CameraDevice::requestComplete(Request *request)
 	if (needsPostProcessing) {
 		if (processingStatus == Camera3RequestDescriptor::Status::Error) {
 			descriptor->status_ = processingStatus;
+			/*
+			 * \todo This is problematic when some streams are
+			 * queued successfully, but some fail to get queued.
+			 * We might end up with use-after-free situation for
+			 * descriptor in streamProcessingComplete().
+			 */
 			sendCaptureResults();
 		}
 
diff --git a/src/android/camera_request.h b/src/android/camera_request.h
index 3a2774e0..85274414 100644
--- a/src/android/camera_request.h
+++ b/src/android/camera_request.h
@@ -18,6 +18,7 @@ 
 
 #include <hardware/camera3.h>
 
+#include "camera_buffer.h"
 #include "camera_metadata.h"
 #include "camera_worker.h"
 
@@ -39,6 +40,9 @@  public:
 		int fence;
 		Status status;
 		libcamera::FrameBuffer *internalBuffer = nullptr;
+		std::unique_ptr<CameraBuffer> destBuffer;
+		const libcamera::FrameBuffer *srcBuffer;
+		Camera3RequestDescriptor *request = nullptr;
 	};
 
 	Camera3RequestDescriptor(libcamera::Camera *camera,
diff --git a/src/android/camera_stream.cpp b/src/android/camera_stream.cpp
index b3cb06a2..a29ce33b 100644
--- a/src/android/camera_stream.cpp
+++ b/src/android/camera_stream.cpp
@@ -99,6 +99,7 @@  int CameraStream::configure()
 		if (ret)
 			return ret;
 
+		worker_ = std::make_unique<PostProcessorWorker>(postProcessor_.get());
 		postProcessor_->processComplete.connect(
 			this, [&](Camera3RequestDescriptor *request, PostProcessor::Status status) {
 				Camera3RequestDescriptor::Status bufferStatus =
@@ -110,6 +111,7 @@  int CameraStream::configure()
 				cameraDevice_->streamProcessingComplete(this, request,
 									bufferStatus);
 			});
+		worker_->start();
 	}
 
 	if (type_ == Type::Internal) {
@@ -179,23 +181,31 @@  int CameraStream::process(const FrameBuffer &source,
 	if (!postProcessor_)
 		return 0;
 
-	/*
-	 * \todo Buffer mapping and processing should be moved to a
-	 * separate thread.
-	 */
 	const StreamConfiguration &output = configuration();
-	CameraBuffer destBuffer(*dest.camera3Buffer, output.pixelFormat,
-				output.size, PROT_READ | PROT_WRITE);
-	if (!destBuffer.isValid()) {
+	dest.destBuffer = std::make_unique<CameraBuffer>(
+		*dest.camera3Buffer, output.pixelFormat, output.size, PROT_READ | PROT_WRITE);
+	if (!dest.destBuffer->isValid()) {
 		LOG(HAL, Error) << "Failed to create destination buffer";
 		return -EINVAL;
 	}
 
-	postProcessor_->process(source, &destBuffer, request);
+	dest.srcBuffer = &source;
+	dest.request = request;
+
+	/* Push the postProcessor request to the worker queue. */
+	worker_->queueRequest(&dest);
 
 	return 0;
 }
 
+void CameraStream::flush()
+{
+	if (!postProcessor_)
+		return;
+
+	worker_->flush();
+}
+
 FrameBuffer *CameraStream::getBuffer()
 {
 	if (!allocator_)
@@ -223,3 +233,84 @@  void CameraStream::putBuffer(FrameBuffer *buffer)
 
 	buffers_.push_back(buffer);
 }
+
+CameraStream::PostProcessorWorker::PostProcessorWorker(PostProcessor *postProcessor)
+	: postProcessor_(postProcessor)
+{
+}
+
+CameraStream::PostProcessorWorker::~PostProcessorWorker()
+{
+	{
+		libcamera::MutexLocker lock(mutex_);
+		state_ = State::Stopped;
+	}
+
+	cv_.notify_one();
+	wait();
+}
+
+void CameraStream::PostProcessorWorker::start()
+{
+	{
+		libcamera::MutexLocker lock(mutex_);
+		state_ = State::Running;
+	}
+
+	Thread::start();
+}
+
+void CameraStream::PostProcessorWorker::queueRequest(Camera3RequestDescriptor::StreamBuffer *dest)
+{
+	{
+		MutexLocker lock(mutex_);
+		ASSERT(state_ == State::Running);
+		requests_.push(dest);
+	}
+
+	cv_.notify_one();
+}
+
+void CameraStream::PostProcessorWorker::run()
+{
+	MutexLocker locker(mutex_);
+
+	while (1) {
+		cv_.wait(locker, [&] {
+			return state_ != State::Running || !requests_.empty();
+		});
+
+		if (state_ != State::Running)
+			break;
+
+		Camera3RequestDescriptor::StreamBuffer *stream = requests_.front();
+		requests_.pop();
+		locker.unlock();
+
+		postProcessor_->process(*stream->srcBuffer, stream->destBuffer.get(),
+					stream->request);
+
+		locker.lock();
+	}
+
+	if (state_ == State::Flushing) {
+		while (!requests_.empty()) {
+			postProcessor_->processComplete.emit(
+				requests_.front()->request,
+				PostProcessor::Status::Error);
+			requests_.pop();
+		}
+
+		state_ = State::Stopped;
+		locker.unlock();
+	}
+}
+
+void CameraStream::PostProcessorWorker::flush()
+{
+	libcamera::MutexLocker lock(mutex_);
+	state_ = State::Flushing;
+	lock.unlock();
+
+	cv_.notify_one();
+}
diff --git a/src/android/camera_stream.h b/src/android/camera_stream.h
index f242336e..64e32c77 100644
--- a/src/android/camera_stream.h
+++ b/src/android/camera_stream.h
@@ -7,12 +7,16 @@ 
 #ifndef __ANDROID_CAMERA_STREAM_H__
 #define __ANDROID_CAMERA_STREAM_H__
 
+#include <condition_variable>
 #include <memory>
 #include <mutex>
+#include <queue>
 #include <vector>
 
 #include <hardware/camera3.h>
 
+#include <libcamera/base/thread.h>
+
 #include <libcamera/camera.h>
 #include <libcamera/framebuffer.h>
 #include <libcamera/framebuffer_allocator.h>
@@ -20,9 +24,9 @@ 
 #include <libcamera/pixel_format.h>
 
 #include "camera_request.h"
+#include "post_processor.h"
 
 class CameraDevice;
-class PostProcessor;
 
 class CameraStream
 {
@@ -126,8 +130,38 @@  public:
 		    Camera3RequestDescriptor *request);
 	libcamera::FrameBuffer *getBuffer();
 	void putBuffer(libcamera::FrameBuffer *buffer);
+	void flush();
 
 private:
+	class PostProcessorWorker : public libcamera::Thread
+	{
+	public:
+		enum class State {
+			Stopped,
+			Running,
+			Flushing,
+		};
+
+		PostProcessorWorker(PostProcessor *postProcessor);
+		~PostProcessorWorker();
+
+		void start();
+		void queueRequest(Camera3RequestDescriptor::StreamBuffer *request);
+		void flush();
+
+	protected:
+		void run() override;
+
+	private:
+		PostProcessor *postProcessor_;
+
+		libcamera::Mutex mutex_;
+		std::condition_variable cv_;
+
+		std::queue<Camera3RequestDescriptor::StreamBuffer *> requests_;
+		State state_;
+	};
+
 	int waitFence(int fence);
 
 	CameraDevice *const cameraDevice_;
@@ -144,6 +178,8 @@  private:
 	 */
 	std::unique_ptr<std::mutex> mutex_;
 	std::unique_ptr<PostProcessor> postProcessor_;
+
+	std::unique_ptr<PostProcessorWorker> worker_;
 };
 
 #endif /* __ANDROID_CAMERA_STREAM__ */