[libcamera-devel,v6,7/7] android: post_processor: Make post processing async
diff mbox series

Message ID 20211023103302.152769-8-umang.jain@ideasonboard.com
State Superseded
Headers show
Series
  • Async Post Processor
Related show

Commit Message

Umang Jain Oct. 23, 2021, 10:33 a.m. UTC
Introduce a dedicated worker class derived from libcamera::Thread.
The worker class maintains a queue for post-processing requests
and waits for a post-processing request to become available.
It will process them as per FIFO before de-queuing it from the
queue.

The post processing streams loop in CameraDevice::requestComplete()
is tweaked a bit to better suit asynchronous needs of post processing.
Also, the entire iteration is locked under streamProcessMutex_ which
helps us to queue all the post-processing request at once, before any
of the post-processing completion slot (streamProcessingComplete())
is allowed to run for post-processing requests completing in parallel.
This helps us to manage both synchronous and asynchronous errors
encountered during the entire post processing operation.

This patch also implements a flush() for the PostProcessorWorker
class which is responsible to purge post-processing requests
queued up while a camera is stopping/flushing. It is hooked with
CameraStream::flush(), which isn't used currently but will be
used when we handle flush/stop scenarios in greater detail
subsequently (in a different patchset).

The libcamera request completion handler CameraDevice::requestComplete()
assumes that the request that has just completed is at the front of the
queue. Now that the post-processor runs asynchronously, this isn't true
anymore, a request being post-processed will stay in the queue and a new
libcamera request may complete. Remove that assumption, and use the
request cookie to obtain the Camera3RequestDescriptor.

Signed-off-by: Umang Jain <umang.jain@ideasonboard.com>
Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
---
 src/android/camera_device.cpp |  49 +++++------------
 src/android/camera_stream.cpp | 101 ++++++++++++++++++++++++++++++++--
 src/android/camera_stream.h   |  38 ++++++++++++-
 3 files changed, 148 insertions(+), 40 deletions(-)

Comments

Hirokazu Honda Oct. 25, 2021, 12:29 p.m. UTC | #1
Hi Umang, thank you for the patch.

On Sat, Oct 23, 2021 at 7:33 PM Umang Jain <umang.jain@ideasonboard.com> wrote:
>
> Introduce a dedicated worker class derived from libcamera::Thread.
> The worker class maintains a queue for post-processing requests
> and waits for a post-processing request to become available.
> It will process them as per FIFO before de-queuing it from the
> queue.
>
> The post processing streams loop in CameraDevice::requestComplete()
> is tweaked a bit to better suit asynchronous needs of post processing.
> Also, the entire iteration is locked under streamProcessMutex_ which
> helps us to queue all the post-processing request at once, before any
> of the post-processing completion slot (streamProcessingComplete())
> is allowed to run for post-processing requests completing in parallel.
> This helps us to manage both synchronous and asynchronous errors
> encountered during the entire post processing operation.
>
> This patch also implements a flush() for the PostProcessorWorker
> class which is responsible to purge post-processing requests
> queued up while a camera is stopping/flushing. It is hooked with
> CameraStream::flush(), which isn't used currently but will be
> used when we handle flush/stop scenarios in greater detail
> subsequently (in a different patchset).
>
> The libcamera request completion handler CameraDevice::requestComplete()
> assumes that the request that has just completed is at the front of the
> queue. Now that the post-processor runs asynchronously, this isn't true
> anymore, a request being post-processed will stay in the queue and a new
> libcamera request may complete. Remove that assumption, and use the
> request cookie to obtain the Camera3RequestDescriptor.
>
> Signed-off-by: Umang Jain <umang.jain@ideasonboard.com>
> Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
> ---
>  src/android/camera_device.cpp |  49 +++++------------
>  src/android/camera_stream.cpp | 101 ++++++++++++++++++++++++++++++++--
>  src/android/camera_stream.h   |  38 ++++++++++++-
>  3 files changed, 148 insertions(+), 40 deletions(-)
>
> diff --git a/src/android/camera_device.cpp b/src/android/camera_device.cpp
> index 3114def0..dc39467b 100644
> --- a/src/android/camera_device.cpp
> +++ b/src/android/camera_device.cpp
> @@ -1026,29 +1026,8 @@ int CameraDevice::processCaptureRequest(camera3_capture_request_t *camera3Reques
>
>  void CameraDevice::requestComplete(Request *request)
>  {
> -       Camera3RequestDescriptor *descriptor;
> -       {
> -               MutexLocker descriptorsLock(descriptorsMutex_);
> -               ASSERT(!descriptors_.empty());
> -               descriptor = descriptors_.front().get();
> -       }
> -
> -       if (descriptor->request_->cookie() != request->cookie()) {
> -               /*
> -                * \todo Clarify if the Camera has to be closed on
> -                * ERROR_DEVICE.
> -                */
> -               LOG(HAL, Error)
> -                       << "Out-of-order completion for request "
> -                       << utils::hex(request->cookie());
> -
> -               MutexLocker descriptorsLock(descriptorsMutex_);
> -               descriptors_.pop();
> -
> -               notifyError(0, nullptr, CAMERA3_MSG_ERROR_DEVICE);
> -
> -               return;
> -       }
> +       Camera3RequestDescriptor *descriptor =
> +               reinterpret_cast<Camera3RequestDescriptor *>(request->cookie());
>
>         /*
>          * Prepare the capture result for the Android camera stack.
> @@ -1124,12 +1103,16 @@ void CameraDevice::requestComplete(Request *request)
>
>         /* Handle post-processing. */
>         bool needPostProcessing = false;
> +       MutexLocker locker(descriptor->streamsProcessMutex_);
> +
>         /*
> -        * \todo Protect the loop below with streamProcessMutex_ when post
> -        * processor runs asynchronously.
> +        * Queue all the post-processing streams request at once. The completion
> +        * slot streamProcessingComplete() can only execute when we are out
> +        * this critical section. This helps to handle synchronous errors here
> +        * itself.
>          */
>         auto iter = descriptor->pendingStreamsToProcess_.begin();
> -       while (descriptor->pendingStreamsToProcess_.size() > 0) {
> +       while (iter != descriptor->pendingStreamsToProcess_.end()) {
>                 CameraStream *stream = iter->first;
>                 Camera3RequestDescriptor::StreamBuffer *buffer = iter->second;
>                 needPostProcessing = true;
> @@ -1151,18 +1134,16 @@ void CameraDevice::requestComplete(Request *request)
>         }
>
>         if (needPostProcessing) {
> -               /*
> -                * \todo We will require to check if we failed to queue
> -                * post-processing requests when we migrate to post-processor
> -                * running asynchronously.
> -                *
> -                * if (descriptor->pendingStreamsToProcess_.size() == 0)
> -                *      completeDescriptor(descriptor);
> -                */
> +               if (descriptor->pendingStreamsToProcess_.size() == 0) {
> +                       locker.unlock();
> +                       completeDescriptor(descriptor);
> +               }

As Laurent suggested in other patch, I think the code would be cleaner
if we remove needPostProcessing.

>
>                 return;
>         }
>
> +       locker.unlock();
> +
>         completeDescriptor(descriptor);
>  }
>
> diff --git a/src/android/camera_stream.cpp b/src/android/camera_stream.cpp
> index 45d0607d..68b916e9 100644
> --- a/src/android/camera_stream.cpp
> +++ b/src/android/camera_stream.cpp
> @@ -99,6 +99,7 @@ int CameraStream::configure()
>                 if (ret)
>                         return ret;
>
> +               worker_ = std::make_unique<PostProcessorWorker>(postProcessor_.get());
>                 postProcessor_->processComplete.connect(
>                         this, [&](Camera3RequestDescriptor::StreamBuffer *streamBuffer,
>                                   PostProcessor::Status status) {
> @@ -112,6 +113,8 @@ int CameraStream::configure()
>                                 cameraDevice_->streamProcessingComplete(streamBuffer,
>                                                                         bufferStatus);
>                         });
> +
> +               worker_->start();
>         }
>
>         if (type_ == Type::Internal) {
> @@ -179,10 +182,6 @@ int CameraStream::process(const FrameBuffer &source,
>
>         ASSERT(type_ != Type::Direct);
>
> -       /*
> -        * \todo Buffer mapping and processing should be moved to a
> -        * separate thread.
> -        */
>         const StreamConfiguration &output = configuration();
>         dest.destBuffer = std::make_unique<CameraBuffer>(
>                 *dest.camera3Buffer, output.pixelFormat, output.size,
> @@ -194,11 +193,19 @@ int CameraStream::process(const FrameBuffer &source,
>
>         dest.srcBuffer = &source;
>
> -       postProcessor_->process(&dest);
> +       worker_->queueRequest(&dest);
>
>         return 0;
>  }
>
> +void CameraStream::flush()
> +{
> +       if (!postProcessor_)
> +               return;
> +
> +       worker_->flush();
> +}
> +
>  FrameBuffer *CameraStream::getBuffer()
>  {
>         if (!allocator_)
> @@ -226,3 +233,87 @@ void CameraStream::putBuffer(FrameBuffer *buffer)
>
>         buffers_.push_back(buffer);
>  }
> +
> +CameraStream::PostProcessorWorker::PostProcessorWorker(PostProcessor *postProcessor)
> +       : postProcessor_(postProcessor)
> +{
> +}
> +
> +CameraStream::PostProcessorWorker::~PostProcessorWorker()
> +{
> +       {
> +               libcamera::MutexLocker lock(mutex_);
> +               state_ = State::Stopped;
> +       }
> +
> +       cv_.notify_one();
> +       wait();
> +}
> +
> +void CameraStream::PostProcessorWorker::start()
> +{
> +       {
> +               libcamera::MutexLocker lock(mutex_);
> +               ASSERT(state_ != State::Running);
> +               state_ = State::Running;
> +       }
> +
> +       Thread::start();
> +}
> +
> +void CameraStream::PostProcessorWorker::queueRequest(Camera3RequestDescriptor::StreamBuffer *dest)
> +{
> +       {
> +               MutexLocker lock(mutex_);
> +               ASSERT(state_ == State::Running);
> +               requests_.push(dest);
> +       }
> +
> +       cv_.notify_one();
> +}
> +
> +void CameraStream::PostProcessorWorker::run()
> +{
> +       MutexLocker locker(mutex_);
> +
> +       while (1) {
> +               cv_.wait(locker, [&] {
> +                       return state_ != State::Running || !requests_.empty();
> +               });
> +
> +               if (state_ != State::Running)
> +                       break;
> +
> +               Camera3RequestDescriptor::StreamBuffer *streamBuffer = requests_.front();
> +               requests_.pop();
> +               locker.unlock();
> +
> +               postProcessor_->process(streamBuffer);
> +
> +               locker.lock();
> +       }
> +
> +       if (state_ == State::Flushing) {
> +               std::queue<Camera3RequestDescriptor::StreamBuffer *> requests =
> +                       std::move(requests_);

Why is moving requests_ to a local variable necessary?

Reviewed-by: Hirokazu Honda<hiroh@chromium.org>
> +               locker.unlock();
> +
> +               while (!requests.empty()) {
> +                       postProcessor_->processComplete.emit(
> +                               requests.front(), PostProcessor::Status::Error);
> +                       requests.pop();
> +               }
> +
> +               locker.lock();
> +               state_ = State::Stopped;
> +       }
> +}
> +
> +void CameraStream::PostProcessorWorker::flush()
> +{
> +       libcamera::MutexLocker lock(mutex_);
> +       state_ = State::Flushing;
> +       lock.unlock();
> +
> +       cv_.notify_one();
> +}
> diff --git a/src/android/camera_stream.h b/src/android/camera_stream.h
> index e9c320b1..7c6e887c 100644
> --- a/src/android/camera_stream.h
> +++ b/src/android/camera_stream.h
> @@ -7,12 +7,16 @@
>  #ifndef __ANDROID_CAMERA_STREAM_H__
>  #define __ANDROID_CAMERA_STREAM_H__
>
> +#include <condition_variable>
>  #include <memory>
>  #include <mutex>
> +#include <queue>
>  #include <vector>
>
>  #include <hardware/camera3.h>
>
> +#include <libcamera/base/thread.h>
> +
>  #include <libcamera/camera.h>
>  #include <libcamera/framebuffer.h>
>  #include <libcamera/framebuffer_allocator.h>
> @@ -20,9 +24,9 @@
>  #include <libcamera/pixel_format.h>
>
>  #include "camera_request.h"
> +#include "post_processor.h"
>
>  class CameraDevice;
> -class PostProcessor;
>
>  class CameraStream
>  {
> @@ -125,8 +129,38 @@ public:
>                     Camera3RequestDescriptor::StreamBuffer &dest);
>         libcamera::FrameBuffer *getBuffer();
>         void putBuffer(libcamera::FrameBuffer *buffer);
> +       void flush();
>
>  private:
> +       class PostProcessorWorker : public libcamera::Thread
> +       {
> +       public:
> +               enum class State {
> +                       Stopped,
> +                       Running,
> +                       Flushing,
> +               };
> +
> +               PostProcessorWorker(PostProcessor *postProcessor);
> +               ~PostProcessorWorker();
> +
> +               void start();
> +               void queueRequest(Camera3RequestDescriptor::StreamBuffer *request);
> +               void flush();
> +
> +       protected:
> +               void run() override;
> +
> +       private:
> +               PostProcessor *postProcessor_;
> +
> +               libcamera::Mutex mutex_;
> +               std::condition_variable cv_;
> +
> +               std::queue<Camera3RequestDescriptor::StreamBuffer *> requests_;
> +               State state_ = State::Stopped;
> +       };
> +
>         int waitFence(int fence);
>
>         CameraDevice *const cameraDevice_;
> @@ -143,6 +177,8 @@ private:
>          */
>         std::unique_ptr<std::mutex> mutex_;
>         std::unique_ptr<PostProcessor> postProcessor_;
> +
> +       std::unique_ptr<PostProcessorWorker> worker_;
>  };
>
>  #endif /* __ANDROID_CAMERA_STREAM__ */
> --
> 2.31.1
>
Umang Jain Oct. 25, 2021, 1:48 p.m. UTC | #2
Hi Hiro,

On 10/25/21 5:59 PM, Hirokazu Honda wrote:
> Hi Umang, thank you for the patch.
>
> On Sat, Oct 23, 2021 at 7:33 PM Umang Jain <umang.jain@ideasonboard.com> wrote:
>> Introduce a dedicated worker class derived from libcamera::Thread.
>> The worker class maintains a queue for post-processing requests
>> and waits for a post-processing request to become available.
>> It will process them as per FIFO before de-queuing it from the
>> queue.
>>
>> The post processing streams loop in CameraDevice::requestComplete()
>> is tweaked a bit to better suit asynchronous needs of post processing.
>> Also, the entire iteration is locked under streamProcessMutex_ which
>> helps us to queue all the post-processing request at once, before any
>> of the post-processing completion slot (streamProcessingComplete())
>> is allowed to run for post-processing requests completing in parallel.
>> This helps us to manage both synchronous and asynchronous errors
>> encountered during the entire post processing operation.
>>
>> This patch also implements a flush() for the PostProcessorWorker
>> class which is responsible to purge post-processing requests
>> queued up while a camera is stopping/flushing. It is hooked with
>> CameraStream::flush(), which isn't used currently but will be
>> used when we handle flush/stop scenarios in greater detail
>> subsequently (in a different patchset).
>>
>> The libcamera request completion handler CameraDevice::requestComplete()
>> assumes that the request that has just completed is at the front of the
>> queue. Now that the post-processor runs asynchronously, this isn't true
>> anymore, a request being post-processed will stay in the queue and a new
>> libcamera request may complete. Remove that assumption, and use the
>> request cookie to obtain the Camera3RequestDescriptor.
>>
>> Signed-off-by: Umang Jain <umang.jain@ideasonboard.com>
>> Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
>> ---
>>   src/android/camera_device.cpp |  49 +++++------------
>>   src/android/camera_stream.cpp | 101 ++++++++++++++++++++++++++++++++--
>>   src/android/camera_stream.h   |  38 ++++++++++++-
>>   3 files changed, 148 insertions(+), 40 deletions(-)
>>
>> diff --git a/src/android/camera_device.cpp b/src/android/camera_device.cpp
>> index 3114def0..dc39467b 100644
>> --- a/src/android/camera_device.cpp
>> +++ b/src/android/camera_device.cpp
>> @@ -1026,29 +1026,8 @@ int CameraDevice::processCaptureRequest(camera3_capture_request_t *camera3Reques
>>
>>   void CameraDevice::requestComplete(Request *request)
>>   {
>> -       Camera3RequestDescriptor *descriptor;
>> -       {
>> -               MutexLocker descriptorsLock(descriptorsMutex_);
>> -               ASSERT(!descriptors_.empty());
>> -               descriptor = descriptors_.front().get();
>> -       }
>> -
>> -       if (descriptor->request_->cookie() != request->cookie()) {
>> -               /*
>> -                * \todo Clarify if the Camera has to be closed on
>> -                * ERROR_DEVICE.
>> -                */
>> -               LOG(HAL, Error)
>> -                       << "Out-of-order completion for request "
>> -                       << utils::hex(request->cookie());
>> -
>> -               MutexLocker descriptorsLock(descriptorsMutex_);
>> -               descriptors_.pop();
>> -
>> -               notifyError(0, nullptr, CAMERA3_MSG_ERROR_DEVICE);
>> -
>> -               return;
>> -       }
>> +       Camera3RequestDescriptor *descriptor =
>> +               reinterpret_cast<Camera3RequestDescriptor *>(request->cookie());
>>
>>          /*
>>           * Prepare the capture result for the Android camera stack.
>> @@ -1124,12 +1103,16 @@ void CameraDevice::requestComplete(Request *request)
>>
>>          /* Handle post-processing. */
>>          bool needPostProcessing = false;
>> +       MutexLocker locker(descriptor->streamsProcessMutex_);
>> +
>>          /*
>> -        * \todo Protect the loop below with streamProcessMutex_ when post
>> -        * processor runs asynchronously.
>> +        * Queue all the post-processing streams request at once. The completion
>> +        * slot streamProcessingComplete() can only execute when we are out
>> +        * this critical section. This helps to handle synchronous errors here
>> +        * itself.
>>           */
>>          auto iter = descriptor->pendingStreamsToProcess_.begin();
>> -       while (descriptor->pendingStreamsToProcess_.size() > 0) {
>> +       while (iter != descriptor->pendingStreamsToProcess_.end()) {
>>                  CameraStream *stream = iter->first;
>>                  Camera3RequestDescriptor::StreamBuffer *buffer = iter->second;
>>                  needPostProcessing = true;
>> @@ -1151,18 +1134,16 @@ void CameraDevice::requestComplete(Request *request)
>>          }
>>
>>          if (needPostProcessing) {
>> -               /*
>> -                * \todo We will require to check if we failed to queue
>> -                * post-processing requests when we migrate to post-processor
>> -                * running asynchronously.
>> -                *
>> -                * if (descriptor->pendingStreamsToProcess_.size() == 0)
>> -                *      completeDescriptor(descriptor);
>> -                */
>> +               if (descriptor->pendingStreamsToProcess_.size() == 0) {
>> +                       locker.unlock();
>> +                       completeDescriptor(descriptor);
>> +               }
> As Laurent suggested in other patch, I think the code would be cleaner
> if we remove needPostProcessing.
>
>>                  return;
>>          }
>>
>> +       locker.unlock();
>> +
>>          completeDescriptor(descriptor);
>>   }
>>
>> diff --git a/src/android/camera_stream.cpp b/src/android/camera_stream.cpp
>> index 45d0607d..68b916e9 100644
>> --- a/src/android/camera_stream.cpp
>> +++ b/src/android/camera_stream.cpp
>> @@ -99,6 +99,7 @@ int CameraStream::configure()
>>                  if (ret)
>>                          return ret;
>>
>> +               worker_ = std::make_unique<PostProcessorWorker>(postProcessor_.get());
>>                  postProcessor_->processComplete.connect(
>>                          this, [&](Camera3RequestDescriptor::StreamBuffer *streamBuffer,
>>                                    PostProcessor::Status status) {
>> @@ -112,6 +113,8 @@ int CameraStream::configure()
>>                                  cameraDevice_->streamProcessingComplete(streamBuffer,
>>                                                                          bufferStatus);
>>                          });
>> +
>> +               worker_->start();
>>          }
>>
>>          if (type_ == Type::Internal) {
>> @@ -179,10 +182,6 @@ int CameraStream::process(const FrameBuffer &source,
>>
>>          ASSERT(type_ != Type::Direct);
>>
>> -       /*
>> -        * \todo Buffer mapping and processing should be moved to a
>> -        * separate thread.
>> -        */
>>          const StreamConfiguration &output = configuration();
>>          dest.destBuffer = std::make_unique<CameraBuffer>(
>>                  *dest.camera3Buffer, output.pixelFormat, output.size,
>> @@ -194,11 +193,19 @@ int CameraStream::process(const FrameBuffer &source,
>>
>>          dest.srcBuffer = &source;
>>
>> -       postProcessor_->process(&dest);
>> +       worker_->queueRequest(&dest);
>>
>>          return 0;
>>   }
>>
>> +void CameraStream::flush()
>> +{
>> +       if (!postProcessor_)
>> +               return;
>> +
>> +       worker_->flush();
>> +}
>> +
>>   FrameBuffer *CameraStream::getBuffer()
>>   {
>>          if (!allocator_)
>> @@ -226,3 +233,87 @@ void CameraStream::putBuffer(FrameBuffer *buffer)
>>
>>          buffers_.push_back(buffer);
>>   }
>> +
>> +CameraStream::PostProcessorWorker::PostProcessorWorker(PostProcessor *postProcessor)
>> +       : postProcessor_(postProcessor)
>> +{
>> +}
>> +
>> +CameraStream::PostProcessorWorker::~PostProcessorWorker()
>> +{
>> +       {
>> +               libcamera::MutexLocker lock(mutex_);
>> +               state_ = State::Stopped;
>> +       }
>> +
>> +       cv_.notify_one();
>> +       wait();
>> +}
>> +
>> +void CameraStream::PostProcessorWorker::start()
>> +{
>> +       {
>> +               libcamera::MutexLocker lock(mutex_);
>> +               ASSERT(state_ != State::Running);
>> +               state_ = State::Running;
>> +       }
>> +
>> +       Thread::start();
>> +}
>> +
>> +void CameraStream::PostProcessorWorker::queueRequest(Camera3RequestDescriptor::StreamBuffer *dest)
>> +{
>> +       {
>> +               MutexLocker lock(mutex_);
>> +               ASSERT(state_ == State::Running);
>> +               requests_.push(dest);
>> +       }
>> +
>> +       cv_.notify_one();
>> +}
>> +
>> +void CameraStream::PostProcessorWorker::run()
>> +{
>> +       MutexLocker locker(mutex_);
>> +
>> +       while (1) {
>> +               cv_.wait(locker, [&] {
>> +                       return state_ != State::Running || !requests_.empty();
>> +               });
>> +
>> +               if (state_ != State::Running)
>> +                       break;
>> +
>> +               Camera3RequestDescriptor::StreamBuffer *streamBuffer = requests_.front();
>> +               requests_.pop();
>> +               locker.unlock();
>> +
>> +               postProcessor_->process(streamBuffer);
>> +
>> +               locker.lock();
>> +       }
>> +
>> +       if (state_ == State::Flushing) {
>> +               std::queue<Camera3RequestDescriptor::StreamBuffer *> requests =
>> +                       std::move(requests_);
> Why is moving requests_ to a local variable necessary?


To avoid iterating under a lock.


>
> Reviewed-by: Hirokazu Honda<hiroh@chromium.org>
>> +               locker.unlock();
>> +
>> +               while (!requests.empty()) {
>> +                       postProcessor_->processComplete.emit(
>> +                               requests.front(), PostProcessor::Status::Error);
>> +                       requests.pop();
>> +               }
>> +
>> +               locker.lock();
>> +               state_ = State::Stopped;
>> +       }
>> +}
>> +
>> +void CameraStream::PostProcessorWorker::flush()
>> +{
>> +       libcamera::MutexLocker lock(mutex_);
>> +       state_ = State::Flushing;
>> +       lock.unlock();
>> +
>> +       cv_.notify_one();
>> +}
>> diff --git a/src/android/camera_stream.h b/src/android/camera_stream.h
>> index e9c320b1..7c6e887c 100644
>> --- a/src/android/camera_stream.h
>> +++ b/src/android/camera_stream.h
>> @@ -7,12 +7,16 @@
>>   #ifndef __ANDROID_CAMERA_STREAM_H__
>>   #define __ANDROID_CAMERA_STREAM_H__
>>
>> +#include <condition_variable>
>>   #include <memory>
>>   #include <mutex>
>> +#include <queue>
>>   #include <vector>
>>
>>   #include <hardware/camera3.h>
>>
>> +#include <libcamera/base/thread.h>
>> +
>>   #include <libcamera/camera.h>
>>   #include <libcamera/framebuffer.h>
>>   #include <libcamera/framebuffer_allocator.h>
>> @@ -20,9 +24,9 @@
>>   #include <libcamera/pixel_format.h>
>>
>>   #include "camera_request.h"
>> +#include "post_processor.h"
>>
>>   class CameraDevice;
>> -class PostProcessor;
>>
>>   class CameraStream
>>   {
>> @@ -125,8 +129,38 @@ public:
>>                      Camera3RequestDescriptor::StreamBuffer &dest);
>>          libcamera::FrameBuffer *getBuffer();
>>          void putBuffer(libcamera::FrameBuffer *buffer);
>> +       void flush();
>>
>>   private:
>> +       class PostProcessorWorker : public libcamera::Thread
>> +       {
>> +       public:
>> +               enum class State {
>> +                       Stopped,
>> +                       Running,
>> +                       Flushing,
>> +               };
>> +
>> +               PostProcessorWorker(PostProcessor *postProcessor);
>> +               ~PostProcessorWorker();
>> +
>> +               void start();
>> +               void queueRequest(Camera3RequestDescriptor::StreamBuffer *request);
>> +               void flush();
>> +
>> +       protected:
>> +               void run() override;
>> +
>> +       private:
>> +               PostProcessor *postProcessor_;
>> +
>> +               libcamera::Mutex mutex_;
>> +               std::condition_variable cv_;
>> +
>> +               std::queue<Camera3RequestDescriptor::StreamBuffer *> requests_;
>> +               State state_ = State::Stopped;
>> +       };
>> +
>>          int waitFence(int fence);
>>
>>          CameraDevice *const cameraDevice_;
>> @@ -143,6 +177,8 @@ private:
>>           */
>>          std::unique_ptr<std::mutex> mutex_;
>>          std::unique_ptr<PostProcessor> postProcessor_;
>> +
>> +       std::unique_ptr<PostProcessorWorker> worker_;
>>   };
>>
>>   #endif /* __ANDROID_CAMERA_STREAM__ */
>> --
>> 2.31.1
>>
Hirokazu Honda Oct. 25, 2021, 1:52 p.m. UTC | #3
Hi Umang,

On Mon, Oct 25, 2021 at 10:48 PM Umang Jain <umang.jain@ideasonboard.com> wrote:
>
> Hi Hiro,
>
> On 10/25/21 5:59 PM, Hirokazu Honda wrote:
> > Hi Umang, thank you for the patch.
> >
> > On Sat, Oct 23, 2021 at 7:33 PM Umang Jain <umang.jain@ideasonboard.com> wrote:
> >> Introduce a dedicated worker class derived from libcamera::Thread.
> >> The worker class maintains a queue for post-processing requests
> >> and waits for a post-processing request to become available.
> >> It will process them as per FIFO before de-queuing it from the
> >> queue.
> >>
> >> The post processing streams loop in CameraDevice::requestComplete()
> >> is tweaked a bit to better suit asynchronous needs of post processing.
> >> Also, the entire iteration is locked under streamProcessMutex_ which
> >> helps us to queue all the post-processing request at once, before any
> >> of the post-processing completion slot (streamProcessingComplete())
> >> is allowed to run for post-processing requests completing in parallel.
> >> This helps us to manage both synchronous and asynchronous errors
> >> encountered during the entire post processing operation.
> >>
> >> This patch also implements a flush() for the PostProcessorWorker
> >> class which is responsible to purge post-processing requests
> >> queued up while a camera is stopping/flushing. It is hooked with
> >> CameraStream::flush(), which isn't used currently but will be
> >> used when we handle flush/stop scenarios in greater detail
> >> subsequently (in a different patchset).
> >>
> >> The libcamera request completion handler CameraDevice::requestComplete()
> >> assumes that the request that has just completed is at the front of the
> >> queue. Now that the post-processor runs asynchronously, this isn't true
> >> anymore, a request being post-processed will stay in the queue and a new
> >> libcamera request may complete. Remove that assumption, and use the
> >> request cookie to obtain the Camera3RequestDescriptor.
> >>
> >> Signed-off-by: Umang Jain <umang.jain@ideasonboard.com>
> >> Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
> >> ---
> >>   src/android/camera_device.cpp |  49 +++++------------
> >>   src/android/camera_stream.cpp | 101 ++++++++++++++++++++++++++++++++--
> >>   src/android/camera_stream.h   |  38 ++++++++++++-
> >>   3 files changed, 148 insertions(+), 40 deletions(-)
> >>
> >> diff --git a/src/android/camera_device.cpp b/src/android/camera_device.cpp
> >> index 3114def0..dc39467b 100644
> >> --- a/src/android/camera_device.cpp
> >> +++ b/src/android/camera_device.cpp
> >> @@ -1026,29 +1026,8 @@ int CameraDevice::processCaptureRequest(camera3_capture_request_t *camera3Reques
> >>
> >>   void CameraDevice::requestComplete(Request *request)
> >>   {
> >> -       Camera3RequestDescriptor *descriptor;
> >> -       {
> >> -               MutexLocker descriptorsLock(descriptorsMutex_);
> >> -               ASSERT(!descriptors_.empty());
> >> -               descriptor = descriptors_.front().get();
> >> -       }
> >> -
> >> -       if (descriptor->request_->cookie() != request->cookie()) {
> >> -               /*
> >> -                * \todo Clarify if the Camera has to be closed on
> >> -                * ERROR_DEVICE.
> >> -                */
> >> -               LOG(HAL, Error)
> >> -                       << "Out-of-order completion for request "
> >> -                       << utils::hex(request->cookie());
> >> -
> >> -               MutexLocker descriptorsLock(descriptorsMutex_);
> >> -               descriptors_.pop();
> >> -
> >> -               notifyError(0, nullptr, CAMERA3_MSG_ERROR_DEVICE);
> >> -
> >> -               return;
> >> -       }
> >> +       Camera3RequestDescriptor *descriptor =
> >> +               reinterpret_cast<Camera3RequestDescriptor *>(request->cookie());
> >>
> >>          /*
> >>           * Prepare the capture result for the Android camera stack.
> >> @@ -1124,12 +1103,16 @@ void CameraDevice::requestComplete(Request *request)
> >>
> >>          /* Handle post-processing. */
> >>          bool needPostProcessing = false;
> >> +       MutexLocker locker(descriptor->streamsProcessMutex_);
> >> +
> >>          /*
> >> -        * \todo Protect the loop below with streamProcessMutex_ when post
> >> -        * processor runs asynchronously.
> >> +        * Queue all the post-processing streams request at once. The completion
> >> +        * slot streamProcessingComplete() can only execute when we are out
> >> +        * this critical section. This helps to handle synchronous errors here
> >> +        * itself.
> >>           */
> >>          auto iter = descriptor->pendingStreamsToProcess_.begin();
> >> -       while (descriptor->pendingStreamsToProcess_.size() > 0) {
> >> +       while (iter != descriptor->pendingStreamsToProcess_.end()) {
> >>                  CameraStream *stream = iter->first;
> >>                  Camera3RequestDescriptor::StreamBuffer *buffer = iter->second;
> >>                  needPostProcessing = true;
> >> @@ -1151,18 +1134,16 @@ void CameraDevice::requestComplete(Request *request)
> >>          }
> >>
> >>          if (needPostProcessing) {
> >> -               /*
> >> -                * \todo We will require to check if we failed to queue
> >> -                * post-processing requests when we migrate to post-processor
> >> -                * running asynchronously.
> >> -                *
> >> -                * if (descriptor->pendingStreamsToProcess_.size() == 0)
> >> -                *      completeDescriptor(descriptor);
> >> -                */
> >> +               if (descriptor->pendingStreamsToProcess_.size() == 0) {
> >> +                       locker.unlock();
> >> +                       completeDescriptor(descriptor);
> >> +               }
> > As Laurent suggested in other patch, I think the code would be cleaner
> > if we remove needPostProcessing.
> >
> >>                  return;
> >>          }
> >>
> >> +       locker.unlock();
> >> +
> >>          completeDescriptor(descriptor);
> >>   }
> >>
> >> diff --git a/src/android/camera_stream.cpp b/src/android/camera_stream.cpp
> >> index 45d0607d..68b916e9 100644
> >> --- a/src/android/camera_stream.cpp
> >> +++ b/src/android/camera_stream.cpp
> >> @@ -99,6 +99,7 @@ int CameraStream::configure()
> >>                  if (ret)
> >>                          return ret;
> >>
> >> +               worker_ = std::make_unique<PostProcessorWorker>(postProcessor_.get());
> >>                  postProcessor_->processComplete.connect(
> >>                          this, [&](Camera3RequestDescriptor::StreamBuffer *streamBuffer,
> >>                                    PostProcessor::Status status) {
> >> @@ -112,6 +113,8 @@ int CameraStream::configure()
> >>                                  cameraDevice_->streamProcessingComplete(streamBuffer,
> >>                                                                          bufferStatus);
> >>                          });
> >> +
> >> +               worker_->start();
> >>          }
> >>
> >>          if (type_ == Type::Internal) {
> >> @@ -179,10 +182,6 @@ int CameraStream::process(const FrameBuffer &source,
> >>
> >>          ASSERT(type_ != Type::Direct);
> >>
> >> -       /*
> >> -        * \todo Buffer mapping and processing should be moved to a
> >> -        * separate thread.
> >> -        */
> >>          const StreamConfiguration &output = configuration();
> >>          dest.destBuffer = std::make_unique<CameraBuffer>(
> >>                  *dest.camera3Buffer, output.pixelFormat, output.size,
> >> @@ -194,11 +193,19 @@ int CameraStream::process(const FrameBuffer &source,
> >>
> >>          dest.srcBuffer = &source;
> >>
> >> -       postProcessor_->process(&dest);
> >> +       worker_->queueRequest(&dest);
> >>
> >>          return 0;
> >>   }
> >>
> >> +void CameraStream::flush()
> >> +{
> >> +       if (!postProcessor_)
> >> +               return;
> >> +
> >> +       worker_->flush();
> >> +}
> >> +
> >>   FrameBuffer *CameraStream::getBuffer()
> >>   {
> >>          if (!allocator_)
> >> @@ -226,3 +233,87 @@ void CameraStream::putBuffer(FrameBuffer *buffer)
> >>
> >>          buffers_.push_back(buffer);
> >>   }
> >> +
> >> +CameraStream::PostProcessorWorker::PostProcessorWorker(PostProcessor *postProcessor)
> >> +       : postProcessor_(postProcessor)
> >> +{
> >> +}
> >> +
> >> +CameraStream::PostProcessorWorker::~PostProcessorWorker()
> >> +{
> >> +       {
> >> +               libcamera::MutexLocker lock(mutex_);
> >> +               state_ = State::Stopped;
> >> +       }
> >> +
> >> +       cv_.notify_one();
> >> +       wait();
> >> +}
> >> +
> >> +void CameraStream::PostProcessorWorker::start()
> >> +{
> >> +       {
> >> +               libcamera::MutexLocker lock(mutex_);
> >> +               ASSERT(state_ != State::Running);
> >> +               state_ = State::Running;
> >> +       }
> >> +
> >> +       Thread::start();
> >> +}
> >> +
> >> +void CameraStream::PostProcessorWorker::queueRequest(Camera3RequestDescriptor::StreamBuffer *dest)
> >> +{
> >> +       {
> >> +               MutexLocker lock(mutex_);
> >> +               ASSERT(state_ == State::Running);
> >> +               requests_.push(dest);
> >> +       }
> >> +
> >> +       cv_.notify_one();
> >> +}
> >> +
> >> +void CameraStream::PostProcessorWorker::run()
> >> +{
> >> +       MutexLocker locker(mutex_);
> >> +
> >> +       while (1) {
> >> +               cv_.wait(locker, [&] {
> >> +                       return state_ != State::Running || !requests_.empty();
> >> +               });
> >> +
> >> +               if (state_ != State::Running)
> >> +                       break;
> >> +
> >> +               Camera3RequestDescriptor::StreamBuffer *streamBuffer = requests_.front();
> >> +               requests_.pop();
> >> +               locker.unlock();
> >> +
> >> +               postProcessor_->process(streamBuffer);
> >> +
> >> +               locker.lock();
> >> +       }
> >> +
> >> +       if (state_ == State::Flushing) {
> >> +               std::queue<Camera3RequestDescriptor::StreamBuffer *> requests =
> >> +                       std::move(requests_);
> > Why is moving requests_ to a local variable necessary?
>
>
> To avoid iterating under a lock.
>

I got it. Thanks.

>
> >
> > Reviewed-by: Hirokazu Honda<hiroh@chromium.org>
> >> +               locker.unlock();
> >> +
> >> +               while (!requests.empty()) {
> >> +                       postProcessor_->processComplete.emit(
> >> +                               requests.front(), PostProcessor::Status::Error);
> >> +                       requests.pop();
> >> +               }
> >> +
> >> +               locker.lock();
> >> +               state_ = State::Stopped;
> >> +       }
> >> +}
> >> +
> >> +void CameraStream::PostProcessorWorker::flush()
> >> +{
> >> +       libcamera::MutexLocker lock(mutex_);
> >> +       state_ = State::Flushing;
> >> +       lock.unlock();
> >> +
> >> +       cv_.notify_one();
> >> +}
> >> diff --git a/src/android/camera_stream.h b/src/android/camera_stream.h
> >> index e9c320b1..7c6e887c 100644
> >> --- a/src/android/camera_stream.h
> >> +++ b/src/android/camera_stream.h
> >> @@ -7,12 +7,16 @@
> >>   #ifndef __ANDROID_CAMERA_STREAM_H__
> >>   #define __ANDROID_CAMERA_STREAM_H__
> >>
> >> +#include <condition_variable>
> >>   #include <memory>
> >>   #include <mutex>
> >> +#include <queue>
> >>   #include <vector>
> >>
> >>   #include <hardware/camera3.h>
> >>
> >> +#include <libcamera/base/thread.h>
> >> +
> >>   #include <libcamera/camera.h>
> >>   #include <libcamera/framebuffer.h>
> >>   #include <libcamera/framebuffer_allocator.h>
> >> @@ -20,9 +24,9 @@
> >>   #include <libcamera/pixel_format.h>
> >>
> >>   #include "camera_request.h"
> >> +#include "post_processor.h"
> >>
> >>   class CameraDevice;
> >> -class PostProcessor;
> >>
> >>   class CameraStream
> >>   {
> >> @@ -125,8 +129,38 @@ public:
> >>                      Camera3RequestDescriptor::StreamBuffer &dest);
> >>          libcamera::FrameBuffer *getBuffer();
> >>          void putBuffer(libcamera::FrameBuffer *buffer);
> >> +       void flush();
> >>
> >>   private:
> >> +       class PostProcessorWorker : public libcamera::Thread
> >> +       {
> >> +       public:
> >> +               enum class State {
> >> +                       Stopped,
> >> +                       Running,
> >> +                       Flushing,
> >> +               };
> >> +
> >> +               PostProcessorWorker(PostProcessor *postProcessor);
> >> +               ~PostProcessorWorker();
> >> +
> >> +               void start();
> >> +               void queueRequest(Camera3RequestDescriptor::StreamBuffer *request);
> >> +               void flush();
> >> +
> >> +       protected:
> >> +               void run() override;
> >> +
> >> +       private:
> >> +               PostProcessor *postProcessor_;
> >> +
> >> +               libcamera::Mutex mutex_;
> >> +               std::condition_variable cv_;
> >> +
> >> +               std::queue<Camera3RequestDescriptor::StreamBuffer *> requests_;
> >> +               State state_ = State::Stopped;
> >> +       };
> >> +
> >>          int waitFence(int fence);
> >>
> >>          CameraDevice *const cameraDevice_;
> >> @@ -143,6 +177,8 @@ private:
> >>           */
> >>          std::unique_ptr<std::mutex> mutex_;
> >>          std::unique_ptr<PostProcessor> postProcessor_;
> >> +
> >> +       std::unique_ptr<PostProcessorWorker> worker_;
> >>   };
> >>
> >>   #endif /* __ANDROID_CAMERA_STREAM__ */
> >> --
> >> 2.31.1
> >>

Patch
diff mbox series

diff --git a/src/android/camera_device.cpp b/src/android/camera_device.cpp
index 3114def0..dc39467b 100644
--- a/src/android/camera_device.cpp
+++ b/src/android/camera_device.cpp
@@ -1026,29 +1026,8 @@  int CameraDevice::processCaptureRequest(camera3_capture_request_t *camera3Reques
 
 void CameraDevice::requestComplete(Request *request)
 {
-	Camera3RequestDescriptor *descriptor;
-	{
-		MutexLocker descriptorsLock(descriptorsMutex_);
-		ASSERT(!descriptors_.empty());
-		descriptor = descriptors_.front().get();
-	}
-
-	if (descriptor->request_->cookie() != request->cookie()) {
-		/*
-		 * \todo Clarify if the Camera has to be closed on
-		 * ERROR_DEVICE.
-		 */
-		LOG(HAL, Error)
-			<< "Out-of-order completion for request "
-			<< utils::hex(request->cookie());
-
-		MutexLocker descriptorsLock(descriptorsMutex_);
-		descriptors_.pop();
-
-		notifyError(0, nullptr, CAMERA3_MSG_ERROR_DEVICE);
-
-		return;
-	}
+	Camera3RequestDescriptor *descriptor =
+		reinterpret_cast<Camera3RequestDescriptor *>(request->cookie());
 
 	/*
 	 * Prepare the capture result for the Android camera stack.
@@ -1124,12 +1103,16 @@  void CameraDevice::requestComplete(Request *request)
 
 	/* Handle post-processing. */
 	bool needPostProcessing = false;
+	MutexLocker locker(descriptor->streamsProcessMutex_);
+
 	/*
-	 * \todo Protect the loop below with streamProcessMutex_ when post
-	 * processor runs asynchronously.
+	 * Queue all the post-processing streams request at once. The completion
+	 * slot streamProcessingComplete() can only execute when we are out
+	 * this critical section. This helps to handle synchronous errors here
+	 * itself.
 	 */
 	auto iter = descriptor->pendingStreamsToProcess_.begin();
-	while (descriptor->pendingStreamsToProcess_.size() > 0) {
+	while (iter != descriptor->pendingStreamsToProcess_.end()) {
 		CameraStream *stream = iter->first;
 		Camera3RequestDescriptor::StreamBuffer *buffer = iter->second;
 		needPostProcessing = true;
@@ -1151,18 +1134,16 @@  void CameraDevice::requestComplete(Request *request)
 	}
 
 	if (needPostProcessing) {
-		/*
-		 * \todo We will require to check if we failed to queue
-		 * post-processing requests when we migrate to post-processor
-		 * running asynchronously.
-		 *
-		 * if (descriptor->pendingStreamsToProcess_.size() == 0)
-		 *	completeDescriptor(descriptor);
-		 */
+		if (descriptor->pendingStreamsToProcess_.size() == 0) {
+			locker.unlock();
+			completeDescriptor(descriptor);
+		}
 
 		return;
 	}
 
+	locker.unlock();
+
 	completeDescriptor(descriptor);
 }
 
diff --git a/src/android/camera_stream.cpp b/src/android/camera_stream.cpp
index 45d0607d..68b916e9 100644
--- a/src/android/camera_stream.cpp
+++ b/src/android/camera_stream.cpp
@@ -99,6 +99,7 @@  int CameraStream::configure()
 		if (ret)
 			return ret;
 
+		worker_ = std::make_unique<PostProcessorWorker>(postProcessor_.get());
 		postProcessor_->processComplete.connect(
 			this, [&](Camera3RequestDescriptor::StreamBuffer *streamBuffer,
 				  PostProcessor::Status status) {
@@ -112,6 +113,8 @@  int CameraStream::configure()
 				cameraDevice_->streamProcessingComplete(streamBuffer,
 									bufferStatus);
 			});
+
+		worker_->start();
 	}
 
 	if (type_ == Type::Internal) {
@@ -179,10 +182,6 @@  int CameraStream::process(const FrameBuffer &source,
 
 	ASSERT(type_ != Type::Direct);
 
-	/*
-	 * \todo Buffer mapping and processing should be moved to a
-	 * separate thread.
-	 */
 	const StreamConfiguration &output = configuration();
 	dest.destBuffer = std::make_unique<CameraBuffer>(
 		*dest.camera3Buffer, output.pixelFormat, output.size,
@@ -194,11 +193,19 @@  int CameraStream::process(const FrameBuffer &source,
 
 	dest.srcBuffer = &source;
 
-	postProcessor_->process(&dest);
+	worker_->queueRequest(&dest);
 
 	return 0;
 }
 
+void CameraStream::flush()
+{
+	if (!postProcessor_)
+		return;
+
+	worker_->flush();
+}
+
 FrameBuffer *CameraStream::getBuffer()
 {
 	if (!allocator_)
@@ -226,3 +233,87 @@  void CameraStream::putBuffer(FrameBuffer *buffer)
 
 	buffers_.push_back(buffer);
 }
+
+CameraStream::PostProcessorWorker::PostProcessorWorker(PostProcessor *postProcessor)
+	: postProcessor_(postProcessor)
+{
+}
+
+CameraStream::PostProcessorWorker::~PostProcessorWorker()
+{
+	{
+		libcamera::MutexLocker lock(mutex_);
+		state_ = State::Stopped;
+	}
+
+	cv_.notify_one();
+	wait();
+}
+
+void CameraStream::PostProcessorWorker::start()
+{
+	{
+		libcamera::MutexLocker lock(mutex_);
+		ASSERT(state_ != State::Running);
+		state_ = State::Running;
+	}
+
+	Thread::start();
+}
+
+void CameraStream::PostProcessorWorker::queueRequest(Camera3RequestDescriptor::StreamBuffer *dest)
+{
+	{
+		MutexLocker lock(mutex_);
+		ASSERT(state_ == State::Running);
+		requests_.push(dest);
+	}
+
+	cv_.notify_one();
+}
+
+void CameraStream::PostProcessorWorker::run()
+{
+	MutexLocker locker(mutex_);
+
+	while (1) {
+		cv_.wait(locker, [&] {
+			return state_ != State::Running || !requests_.empty();
+		});
+
+		if (state_ != State::Running)
+			break;
+
+		Camera3RequestDescriptor::StreamBuffer *streamBuffer = requests_.front();
+		requests_.pop();
+		locker.unlock();
+
+		postProcessor_->process(streamBuffer);
+
+		locker.lock();
+	}
+
+	if (state_ == State::Flushing) {
+		std::queue<Camera3RequestDescriptor::StreamBuffer *> requests =
+			std::move(requests_);
+		locker.unlock();
+
+		while (!requests.empty()) {
+			postProcessor_->processComplete.emit(
+				requests.front(), PostProcessor::Status::Error);
+			requests.pop();
+		}
+
+		locker.lock();
+		state_ = State::Stopped;
+	}
+}
+
+void CameraStream::PostProcessorWorker::flush()
+{
+	libcamera::MutexLocker lock(mutex_);
+	state_ = State::Flushing;
+	lock.unlock();
+
+	cv_.notify_one();
+}
diff --git a/src/android/camera_stream.h b/src/android/camera_stream.h
index e9c320b1..7c6e887c 100644
--- a/src/android/camera_stream.h
+++ b/src/android/camera_stream.h
@@ -7,12 +7,16 @@ 
 #ifndef __ANDROID_CAMERA_STREAM_H__
 #define __ANDROID_CAMERA_STREAM_H__
 
+#include <condition_variable>
 #include <memory>
 #include <mutex>
+#include <queue>
 #include <vector>
 
 #include <hardware/camera3.h>
 
+#include <libcamera/base/thread.h>
+
 #include <libcamera/camera.h>
 #include <libcamera/framebuffer.h>
 #include <libcamera/framebuffer_allocator.h>
@@ -20,9 +24,9 @@ 
 #include <libcamera/pixel_format.h>
 
 #include "camera_request.h"
+#include "post_processor.h"
 
 class CameraDevice;
-class PostProcessor;
 
 class CameraStream
 {
@@ -125,8 +129,38 @@  public:
 		    Camera3RequestDescriptor::StreamBuffer &dest);
 	libcamera::FrameBuffer *getBuffer();
 	void putBuffer(libcamera::FrameBuffer *buffer);
+	void flush();
 
 private:
+	class PostProcessorWorker : public libcamera::Thread
+	{
+	public:
+		enum class State {
+			Stopped,
+			Running,
+			Flushing,
+		};
+
+		PostProcessorWorker(PostProcessor *postProcessor);
+		~PostProcessorWorker();
+
+		void start();
+		void queueRequest(Camera3RequestDescriptor::StreamBuffer *request);
+		void flush();
+
+	protected:
+		void run() override;
+
+	private:
+		PostProcessor *postProcessor_;
+
+		libcamera::Mutex mutex_;
+		std::condition_variable cv_;
+
+		std::queue<Camera3RequestDescriptor::StreamBuffer *> requests_;
+		State state_ = State::Stopped;
+	};
+
 	int waitFence(int fence);
 
 	CameraDevice *const cameraDevice_;
@@ -143,6 +177,8 @@  private:
 	 */
 	std::unique_ptr<std::mutex> mutex_;
 	std::unique_ptr<PostProcessor> postProcessor_;
+
+	std::unique_ptr<PostProcessorWorker> worker_;
 };
 
 #endif /* __ANDROID_CAMERA_STREAM__ */