Message ID | 20211025203833.122460-8-umang.jain@ideasonboard.com |
---|---|
State | Superseded |
Delegated to: | Umang Jain |
Headers | show |
Series |
|
Related | show |
Hi Umang, Thank you for the patch. On Tue, Oct 26, 2021 at 02:08:33AM +0530, Umang Jain wrote: > Introduce a dedicated worker class derived from libcamera::Thread. > The worker class maintains a queue for post-processing requests > and waits for a post-processing request to become available. > It will process them as per FIFO before de-queuing it from the > queue. > > The entire post-processing handling iteration is locked under > streamProcessMutex_ which helps us to queue all the post-processing s/streamProcessMutex_/streamsProcessMutex_/ > request at once, before any of the post-processing completion slot > (streamProcessingComplete()) is allowed to run for post-processing > requests completing in parallel. This helps us to manage both > synchronous and asynchronous errors encountered during the entire > post processing operation. Since a post-processing operation can > even complete after CameraDevice::requestComplete() has returned, > we need to check and complete the descriptor from > streamProcessingComplete() running in the PostProcessorWorker's > thread. > > This patch also implements a flush() for the PostProcessorWorker > class which is responsible to purge post-processing requests > queued up while a camera is stopping/flushing. It is hooked with > CameraStream::flush(), which isn't used currently but will be > used when we handle flush/stop scenarios in greater detail > subsequently (in a different patchset). > > The libcamera request completion handler CameraDevice::requestComplete() > assumes that the request that has just completed is at the front of the > queue. Now that the post-processor runs asynchronously, this isn't true > anymore, a request being post-processed will stay in the queue and a new > libcamera request may complete. Remove that assumption, and use the > request cookie to obtain the Camera3RequestDescriptor. > > Signed-off-by: Umang Jain <umang.jain@ideasonboard.com> > Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com> > --- > src/android/camera_device.cpp | 44 ++++++--------- > src/android/camera_stream.cpp | 101 ++++++++++++++++++++++++++++++++-- > src/android/camera_stream.h | 37 +++++++++++++ > 3 files changed, 151 insertions(+), 31 deletions(-) > > diff --git a/src/android/camera_device.cpp b/src/android/camera_device.cpp > index 3ded0f7e..53ebe0ea 100644 > --- a/src/android/camera_device.cpp > +++ b/src/android/camera_device.cpp > @@ -1027,29 +1027,8 @@ int CameraDevice::processCaptureRequest(camera3_capture_request_t *camera3Reques > > void CameraDevice::requestComplete(Request *request) > { > - Camera3RequestDescriptor *descriptor; > - { > - MutexLocker descriptorsLock(descriptorsMutex_); > - ASSERT(!descriptors_.empty()); > - descriptor = descriptors_.front().get(); > - } > - > - if (descriptor->request_->cookie() != request->cookie()) { > - /* > - * \todo Clarify if the Camera has to be closed on > - * ERROR_DEVICE. > - */ > - LOG(HAL, Error) > - << "Out-of-order completion for request " > - << utils::hex(request->cookie()); > - > - MutexLocker descriptorsLock(descriptorsMutex_); > - descriptors_.pop(); > - > - notifyError(0, nullptr, CAMERA3_MSG_ERROR_DEVICE); > - > - return; > - } > + Camera3RequestDescriptor *descriptor = > + reinterpret_cast<Camera3RequestDescriptor *>(request->cookie()); > > /* > * Prepare the capture result for the Android camera stack. > @@ -1124,9 +1103,13 @@ void CameraDevice::requestComplete(Request *request) > } > > /* Handle post-processing. */ > + MutexLocker locker(descriptor->streamsProcessMutex_); > + > /* > - * \todo Protect the loop below with streamProcessMutex_ when post In the patch that introduced this, s/streamProcessMutex_/streamsProcessMutex_/ > - * processor runs asynchronously. > + * Queue all the post-processing streams request at once. The completion > + * slot streamProcessingComplete() can only execute when we are out > + * this critical section. This helps to handle synchronous errors here > + * itself. > */ > auto iter = descriptor->pendingStreamsToProcess_.begin(); > while (iter != descriptor->pendingStreamsToProcess_.end()) { > @@ -1158,8 +1141,10 @@ void CameraDevice::requestComplete(Request *request) > } > } > > - if (descriptor->pendingStreamsToProcess_.empty()) > + if (descriptor->pendingStreamsToProcess_.empty()) { > + locker.unlock(); > completeDescriptor(descriptor); > + } > } > > void CameraDevice::completeDescriptor(Camera3RequestDescriptor *descriptor) > @@ -1245,6 +1230,13 @@ void CameraDevice::streamProcessingComplete(Camera3RequestDescriptor::StreamBuff > MutexLocker locker(request->streamsProcessMutex_); > > request->pendingStreamsToProcess_.erase(streamBuffer->stream); > + > + if (!request->pendingStreamsToProcess_.empty()) > + return; > + > + locker.unlock(); Maybe { MutexLocker locker(request->streamsProcessMutex_); request->pendingStreamsToProcess_.erase(streamBuffer->stream); if (!request->pendingStreamsToProcess_.empty()) return; } up to you. > + > + completeDescriptor(streamBuffer->request); > } > > std::string CameraDevice::logPrefix() const > diff --git a/src/android/camera_stream.cpp b/src/android/camera_stream.cpp > index fed99022..9023c13c 100644 > --- a/src/android/camera_stream.cpp > +++ b/src/android/camera_stream.cpp > @@ -99,6 +99,7 @@ int CameraStream::configure() > if (ret) > return ret; > > + worker_ = std::make_unique<PostProcessorWorker>(postProcessor_.get()); > postProcessor_->processComplete.connect( > this, [&](Camera3RequestDescriptor::StreamBuffer *streamBuffer, > PostProcessor::Status status) { > @@ -112,6 +113,8 @@ int CameraStream::configure() > cameraDevice_->streamProcessingComplete(streamBuffer, > bufferStatus); > }); > + > + worker_->start(); > } > > if (type_ == Type::Internal) { > @@ -178,10 +181,6 @@ int CameraStream::process(Camera3RequestDescriptor::StreamBuffer *streamBuffer) > streamBuffer->fence = -1; > } > > - /* > - * \todo Buffer mapping and processing should be moved to a > - * separate thread. > - */ > const StreamConfiguration &output = configuration(); > streamBuffer->dstBuffer = std::make_unique<CameraBuffer>( > *streamBuffer->camera3Buffer, output.pixelFormat, output.size, > @@ -191,11 +190,19 @@ int CameraStream::process(Camera3RequestDescriptor::StreamBuffer *streamBuffer) > return -EINVAL; > } > > - postProcessor_->process(streamBuffer); > + worker_->queueRequest(streamBuffer); > > return 0; > } > > +void CameraStream::flush() > +{ > + if (!postProcessor_) > + return; > + > + worker_->flush(); > +} > + > FrameBuffer *CameraStream::getBuffer() > { > if (!allocator_) > @@ -223,3 +230,87 @@ void CameraStream::putBuffer(FrameBuffer *buffer) > > buffers_.push_back(buffer); > } > + > +CameraStream::PostProcessorWorker::PostProcessorWorker(PostProcessor *postProcessor) > + : postProcessor_(postProcessor) > +{ > +} > + > +CameraStream::PostProcessorWorker::~PostProcessorWorker() > +{ > + { > + libcamera::MutexLocker lock(mutex_); > + state_ = State::Stopped; > + } > + > + cv_.notify_one(); > + wait(); > +} > + > +void CameraStream::PostProcessorWorker::start() > +{ > + { > + libcamera::MutexLocker lock(mutex_); > + ASSERT(state_ != State::Running); > + state_ = State::Running; > + } > + > + Thread::start(); > +} > + > +void CameraStream::PostProcessorWorker::queueRequest(Camera3RequestDescriptor::StreamBuffer *dest) > +{ > + { > + MutexLocker lock(mutex_); > + ASSERT(state_ == State::Running); > + requests_.push(dest); > + } > + > + cv_.notify_one(); > +} > + > +void CameraStream::PostProcessorWorker::run() > +{ > + MutexLocker locker(mutex_); > + > + while (1) { > + cv_.wait(locker, [&] { > + return state_ != State::Running || !requests_.empty(); > + }); > + > + if (state_ != State::Running) > + break; > + > + Camera3RequestDescriptor::StreamBuffer *streamBuffer = requests_.front(); > + requests_.pop(); > + locker.unlock(); > + > + postProcessor_->process(streamBuffer); > + > + locker.lock(); > + } > + > + if (state_ == State::Flushing) { > + std::queue<Camera3RequestDescriptor::StreamBuffer *> requests = > + std::move(requests_); > + locker.unlock(); > + > + while (!requests.empty()) { > + postProcessor_->processComplete.emit( > + requests.front(), PostProcessor::Status::Error); > + requests.pop(); > + } > + > + locker.lock(); > + state_ = State::Stopped; > + } > +} > + > +void CameraStream::PostProcessorWorker::flush() > +{ > + libcamera::MutexLocker lock(mutex_); > + state_ = State::Flushing; > + lock.unlock(); > + > + cv_.notify_one(); > +} > diff --git a/src/android/camera_stream.h b/src/android/camera_stream.h > index e74a9a3b..1588938a 100644 > --- a/src/android/camera_stream.h > +++ b/src/android/camera_stream.h > @@ -7,12 +7,16 @@ > #ifndef __ANDROID_CAMERA_STREAM_H__ > #define __ANDROID_CAMERA_STREAM_H__ > > +#include <condition_variable> > #include <memory> > #include <mutex> > +#include <queue> > #include <vector> > > #include <hardware/camera3.h> > > +#include <libcamera/base/thread.h> > + > #include <libcamera/camera.h> > #include <libcamera/framebuffer.h> > #include <libcamera/framebuffer_allocator.h> > @@ -20,6 +24,7 @@ > #include <libcamera/pixel_format.h> > > #include "camera_request.h" > +#include "post_processor.h" > > class CameraDevice; > class PostProcessor; You can drop the forward declaration. > @@ -124,8 +129,38 @@ public: > int process(Camera3RequestDescriptor::StreamBuffer *streamBuffer); > libcamera::FrameBuffer *getBuffer(); > void putBuffer(libcamera::FrameBuffer *buffer); > + void flush(); > > private: > + class PostProcessorWorker : public libcamera::Thread > + { > + public: > + enum class State { > + Stopped, > + Running, > + Flushing, > + }; > + > + PostProcessorWorker(PostProcessor *postProcessor); > + ~PostProcessorWorker(); > + > + void start(); > + void queueRequest(Camera3RequestDescriptor::StreamBuffer *request); > + void flush(); > + > + protected: > + void run() override; > + > + private: > + PostProcessor *postProcessor_; > + > + libcamera::Mutex mutex_; > + std::condition_variable cv_; > + > + std::queue<Camera3RequestDescriptor::StreamBuffer *> requests_; > + State state_ = State::Stopped; > + }; > + > int waitFence(int fence); > > CameraDevice *const cameraDevice_; > @@ -142,6 +177,8 @@ private: > */ > std::unique_ptr<std::mutex> mutex_; > std::unique_ptr<PostProcessor> postProcessor_; > + > + std::unique_ptr<PostProcessorWorker> worker_; > }; > > #endif /* __ANDROID_CAMERA_STREAM__ */
Hi Umang, thank you for the patch. On Tue, Oct 26, 2021 at 7:05 AM Laurent Pinchart <laurent.pinchart@ideasonboard.com> wrote: > > Hi Umang, > > Thank you for the patch. > > On Tue, Oct 26, 2021 at 02:08:33AM +0530, Umang Jain wrote: > > Introduce a dedicated worker class derived from libcamera::Thread. > > The worker class maintains a queue for post-processing requests > > and waits for a post-processing request to become available. > > It will process them as per FIFO before de-queuing it from the > > queue. > > > > The entire post-processing handling iteration is locked under > > streamProcessMutex_ which helps us to queue all the post-processing > > s/streamProcessMutex_/streamsProcessMutex_/ > > > request at once, before any of the post-processing completion slot > > (streamProcessingComplete()) is allowed to run for post-processing > > requests completing in parallel. This helps us to manage both > > synchronous and asynchronous errors encountered during the entire > > post processing operation. Since a post-processing operation can > > even complete after CameraDevice::requestComplete() has returned, > > we need to check and complete the descriptor from > > streamProcessingComplete() running in the PostProcessorWorker's > > thread. > > > > This patch also implements a flush() for the PostProcessorWorker > > class which is responsible to purge post-processing requests > > queued up while a camera is stopping/flushing. It is hooked with > > CameraStream::flush(), which isn't used currently but will be > > used when we handle flush/stop scenarios in greater detail > > subsequently (in a different patchset). > > > > The libcamera request completion handler CameraDevice::requestComplete() > > assumes that the request that has just completed is at the front of the > > queue. Now that the post-processor runs asynchronously, this isn't true > > anymore, a request being post-processed will stay in the queue and a new > > libcamera request may complete. Remove that assumption, and use the > > request cookie to obtain the Camera3RequestDescriptor. > > > > Signed-off-by: Umang Jain <umang.jain@ideasonboard.com> > > Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com> Reviewed-by: Hirokazu Honda <hiroh@chromium.org> > > --- > > src/android/camera_device.cpp | 44 ++++++--------- > > src/android/camera_stream.cpp | 101 ++++++++++++++++++++++++++++++++-- > > src/android/camera_stream.h | 37 +++++++++++++ > > 3 files changed, 151 insertions(+), 31 deletions(-) > > > > diff --git a/src/android/camera_device.cpp b/src/android/camera_device.cpp > > index 3ded0f7e..53ebe0ea 100644 > > --- a/src/android/camera_device.cpp > > +++ b/src/android/camera_device.cpp > > @@ -1027,29 +1027,8 @@ int CameraDevice::processCaptureRequest(camera3_capture_request_t *camera3Reques > > > > void CameraDevice::requestComplete(Request *request) > > { > > - Camera3RequestDescriptor *descriptor; > > - { > > - MutexLocker descriptorsLock(descriptorsMutex_); > > - ASSERT(!descriptors_.empty()); > > - descriptor = descriptors_.front().get(); > > - } > > - > > - if (descriptor->request_->cookie() != request->cookie()) { > > - /* > > - * \todo Clarify if the Camera has to be closed on > > - * ERROR_DEVICE. > > - */ > > - LOG(HAL, Error) > > - << "Out-of-order completion for request " > > - << utils::hex(request->cookie()); > > - > > - MutexLocker descriptorsLock(descriptorsMutex_); > > - descriptors_.pop(); > > - > > - notifyError(0, nullptr, CAMERA3_MSG_ERROR_DEVICE); > > - > > - return; > > - } > > + Camera3RequestDescriptor *descriptor = > > + reinterpret_cast<Camera3RequestDescriptor *>(request->cookie()); > > > > /* > > * Prepare the capture result for the Android camera stack. > > @@ -1124,9 +1103,13 @@ void CameraDevice::requestComplete(Request *request) > > } > > > > /* Handle post-processing. */ > > + MutexLocker locker(descriptor->streamsProcessMutex_); > > + > > /* > > - * \todo Protect the loop below with streamProcessMutex_ when post > > In the patch that introduced this, > > s/streamProcessMutex_/streamsProcessMutex_/ > > > - * processor runs asynchronously. > > + * Queue all the post-processing streams request at once. The completion > > + * slot streamProcessingComplete() can only execute when we are out > > + * this critical section. This helps to handle synchronous errors here > > + * itself. > > */ > > auto iter = descriptor->pendingStreamsToProcess_.begin(); > > while (iter != descriptor->pendingStreamsToProcess_.end()) { > > @@ -1158,8 +1141,10 @@ void CameraDevice::requestComplete(Request *request) > > } > > } > > > > - if (descriptor->pendingStreamsToProcess_.empty()) > > + if (descriptor->pendingStreamsToProcess_.empty()) { > > + locker.unlock(); > > completeDescriptor(descriptor); > > + } > > } > > > > void CameraDevice::completeDescriptor(Camera3RequestDescriptor *descriptor) > > @@ -1245,6 +1230,13 @@ void CameraDevice::streamProcessingComplete(Camera3RequestDescriptor::StreamBuff > > MutexLocker locker(request->streamsProcessMutex_); > > > > request->pendingStreamsToProcess_.erase(streamBuffer->stream); > > + > > + if (!request->pendingStreamsToProcess_.empty()) > > + return; > > + > > + locker.unlock(); > > Maybe > > { > MutexLocker locker(request->streamsProcessMutex_); > > request->pendingStreamsToProcess_.erase(streamBuffer->stream); > if (!request->pendingStreamsToProcess_.empty()) > return; > } > > up to you. > > > + > > + completeDescriptor(streamBuffer->request); > > } > > > > std::string CameraDevice::logPrefix() const > > diff --git a/src/android/camera_stream.cpp b/src/android/camera_stream.cpp > > index fed99022..9023c13c 100644 > > --- a/src/android/camera_stream.cpp > > +++ b/src/android/camera_stream.cpp > > @@ -99,6 +99,7 @@ int CameraStream::configure() > > if (ret) > > return ret; > > > > + worker_ = std::make_unique<PostProcessorWorker>(postProcessor_.get()); > > postProcessor_->processComplete.connect( > > this, [&](Camera3RequestDescriptor::StreamBuffer *streamBuffer, > > PostProcessor::Status status) { > > @@ -112,6 +113,8 @@ int CameraStream::configure() > > cameraDevice_->streamProcessingComplete(streamBuffer, > > bufferStatus); > > }); > > + > > + worker_->start(); > > } > > > > if (type_ == Type::Internal) { > > @@ -178,10 +181,6 @@ int CameraStream::process(Camera3RequestDescriptor::StreamBuffer *streamBuffer) > > streamBuffer->fence = -1; > > } > > > > - /* > > - * \todo Buffer mapping and processing should be moved to a > > - * separate thread. > > - */ > > const StreamConfiguration &output = configuration(); > > streamBuffer->dstBuffer = std::make_unique<CameraBuffer>( > > *streamBuffer->camera3Buffer, output.pixelFormat, output.size, > > @@ -191,11 +190,19 @@ int CameraStream::process(Camera3RequestDescriptor::StreamBuffer *streamBuffer) > > return -EINVAL; > > } > > > > - postProcessor_->process(streamBuffer); > > + worker_->queueRequest(streamBuffer); > > > > return 0; > > } > > > > +void CameraStream::flush() > > +{ > > + if (!postProcessor_) > > + return; > > + > > + worker_->flush(); > > +} > > + > > FrameBuffer *CameraStream::getBuffer() > > { > > if (!allocator_) > > @@ -223,3 +230,87 @@ void CameraStream::putBuffer(FrameBuffer *buffer) > > > > buffers_.push_back(buffer); > > } > > + > > +CameraStream::PostProcessorWorker::PostProcessorWorker(PostProcessor *postProcessor) > > + : postProcessor_(postProcessor) > > +{ > > +} > > + > > +CameraStream::PostProcessorWorker::~PostProcessorWorker() > > +{ > > + { > > + libcamera::MutexLocker lock(mutex_); > > + state_ = State::Stopped; > > + } > > + > > + cv_.notify_one(); > > + wait(); > > +} > > + > > +void CameraStream::PostProcessorWorker::start() > > +{ > > + { > > + libcamera::MutexLocker lock(mutex_); > > + ASSERT(state_ != State::Running); > > + state_ = State::Running; > > + } > > + > > + Thread::start(); > > +} > > + > > +void CameraStream::PostProcessorWorker::queueRequest(Camera3RequestDescriptor::StreamBuffer *dest) > > +{ > > + { > > + MutexLocker lock(mutex_); > > + ASSERT(state_ == State::Running); > > + requests_.push(dest); > > + } > > + > > + cv_.notify_one(); > > +} > > + > > +void CameraStream::PostProcessorWorker::run() > > +{ > > + MutexLocker locker(mutex_); > > + > > + while (1) { > > + cv_.wait(locker, [&] { > > + return state_ != State::Running || !requests_.empty(); > > + }); > > + > > + if (state_ != State::Running) > > + break; > > + > > + Camera3RequestDescriptor::StreamBuffer *streamBuffer = requests_.front(); > > + requests_.pop(); > > + locker.unlock(); > > + > > + postProcessor_->process(streamBuffer); > > + > > + locker.lock(); > > + } > > + > > + if (state_ == State::Flushing) { > > + std::queue<Camera3RequestDescriptor::StreamBuffer *> requests = > > + std::move(requests_); > > + locker.unlock(); > > + > > + while (!requests.empty()) { > > + postProcessor_->processComplete.emit( > > + requests.front(), PostProcessor::Status::Error); > > + requests.pop(); > > + } > > + > > + locker.lock(); > > + state_ = State::Stopped; > > + } > > +} > > + > > +void CameraStream::PostProcessorWorker::flush() > > +{ > > + libcamera::MutexLocker lock(mutex_); > > + state_ = State::Flushing; > > + lock.unlock(); > > + > > + cv_.notify_one(); > > +} > > diff --git a/src/android/camera_stream.h b/src/android/camera_stream.h > > index e74a9a3b..1588938a 100644 > > --- a/src/android/camera_stream.h > > +++ b/src/android/camera_stream.h > > @@ -7,12 +7,16 @@ > > #ifndef __ANDROID_CAMERA_STREAM_H__ > > #define __ANDROID_CAMERA_STREAM_H__ > > > > +#include <condition_variable> > > #include <memory> > > #include <mutex> > > +#include <queue> > > #include <vector> > > > > #include <hardware/camera3.h> > > > > +#include <libcamera/base/thread.h> > > + > > #include <libcamera/camera.h> > > #include <libcamera/framebuffer.h> > > #include <libcamera/framebuffer_allocator.h> > > @@ -20,6 +24,7 @@ > > #include <libcamera/pixel_format.h> > > > > #include "camera_request.h" > > +#include "post_processor.h" > > > > class CameraDevice; > > class PostProcessor; > > You can drop the forward declaration. > > > @@ -124,8 +129,38 @@ public: > > int process(Camera3RequestDescriptor::StreamBuffer *streamBuffer); > > libcamera::FrameBuffer *getBuffer(); > > void putBuffer(libcamera::FrameBuffer *buffer); > > + void flush(); > > > > private: > > + class PostProcessorWorker : public libcamera::Thread > > + { > > + public: > > + enum class State { > > + Stopped, > > + Running, > > + Flushing, > > + }; > > + > > + PostProcessorWorker(PostProcessor *postProcessor); > > + ~PostProcessorWorker(); > > + > > + void start(); > > + void queueRequest(Camera3RequestDescriptor::StreamBuffer *request); > > + void flush(); > > + > > + protected: > > + void run() override; > > + > > + private: > > + PostProcessor *postProcessor_; > > + > > + libcamera::Mutex mutex_; > > + std::condition_variable cv_; > > + > > + std::queue<Camera3RequestDescriptor::StreamBuffer *> requests_; > > + State state_ = State::Stopped; > > + }; > > + > > int waitFence(int fence); > > > > CameraDevice *const cameraDevice_; > > @@ -142,6 +177,8 @@ private: > > */ > > std::unique_ptr<std::mutex> mutex_; > > std::unique_ptr<PostProcessor> postProcessor_; > > + > > + std::unique_ptr<PostProcessorWorker> worker_; > > }; > > > > #endif /* __ANDROID_CAMERA_STREAM__ */ > > -- > Regards, > > Laurent Pinchart
Hi Laurent, On 10/26/21 3:35 AM, Laurent Pinchart wrote: > Hi Umang, > > Thank you for the patch. > > On Tue, Oct 26, 2021 at 02:08:33AM +0530, Umang Jain wrote: >> Introduce a dedicated worker class derived from libcamera::Thread. >> The worker class maintains a queue for post-processing requests >> and waits for a post-processing request to become available. >> It will process them as per FIFO before de-queuing it from the >> queue. >> >> The entire post-processing handling iteration is locked under >> streamProcessMutex_ which helps us to queue all the post-processing > s/streamProcessMutex_/streamsProcessMutex_/ > >> request at once, before any of the post-processing completion slot >> (streamProcessingComplete()) is allowed to run for post-processing >> requests completing in parallel. This helps us to manage both >> synchronous and asynchronous errors encountered during the entire >> post processing operation. Since a post-processing operation can >> even complete after CameraDevice::requestComplete() has returned, >> we need to check and complete the descriptor from >> streamProcessingComplete() running in the PostProcessorWorker's >> thread. >> >> This patch also implements a flush() for the PostProcessorWorker >> class which is responsible to purge post-processing requests >> queued up while a camera is stopping/flushing. It is hooked with >> CameraStream::flush(), which isn't used currently but will be >> used when we handle flush/stop scenarios in greater detail >> subsequently (in a different patchset). >> >> The libcamera request completion handler CameraDevice::requestComplete() >> assumes that the request that has just completed is at the front of the >> queue. Now that the post-processor runs asynchronously, this isn't true >> anymore, a request being post-processed will stay in the queue and a new >> libcamera request may complete. Remove that assumption, and use the >> request cookie to obtain the Camera3RequestDescriptor. >> >> Signed-off-by: Umang Jain <umang.jain@ideasonboard.com> >> Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com> >> --- >> src/android/camera_device.cpp | 44 ++++++--------- >> src/android/camera_stream.cpp | 101 ++++++++++++++++++++++++++++++++-- >> src/android/camera_stream.h | 37 +++++++++++++ >> 3 files changed, 151 insertions(+), 31 deletions(-) >> >> diff --git a/src/android/camera_device.cpp b/src/android/camera_device.cpp >> index 3ded0f7e..53ebe0ea 100644 >> --- a/src/android/camera_device.cpp >> +++ b/src/android/camera_device.cpp >> @@ -1027,29 +1027,8 @@ int CameraDevice::processCaptureRequest(camera3_capture_request_t *camera3Reques >> >> void CameraDevice::requestComplete(Request *request) >> { >> - Camera3RequestDescriptor *descriptor; >> - { >> - MutexLocker descriptorsLock(descriptorsMutex_); >> - ASSERT(!descriptors_.empty()); >> - descriptor = descriptors_.front().get(); >> - } >> - >> - if (descriptor->request_->cookie() != request->cookie()) { >> - /* >> - * \todo Clarify if the Camera has to be closed on >> - * ERROR_DEVICE. >> - */ >> - LOG(HAL, Error) >> - << "Out-of-order completion for request " >> - << utils::hex(request->cookie()); >> - >> - MutexLocker descriptorsLock(descriptorsMutex_); >> - descriptors_.pop(); >> - >> - notifyError(0, nullptr, CAMERA3_MSG_ERROR_DEVICE); >> - >> - return; >> - } >> + Camera3RequestDescriptor *descriptor = >> + reinterpret_cast<Camera3RequestDescriptor *>(request->cookie()); >> >> /* >> * Prepare the capture result for the Android camera stack. >> @@ -1124,9 +1103,13 @@ void CameraDevice::requestComplete(Request *request) >> } >> >> /* Handle post-processing. */ >> + MutexLocker locker(descriptor->streamsProcessMutex_); >> + >> /* >> - * \todo Protect the loop below with streamProcessMutex_ when post > In the patch that introduced this, > > s/streamProcessMutex_/streamsProcessMutex_/ > >> - * processor runs asynchronously. >> + * Queue all the post-processing streams request at once. The completion >> + * slot streamProcessingComplete() can only execute when we are out >> + * this critical section. This helps to handle synchronous errors here >> + * itself. >> */ >> auto iter = descriptor->pendingStreamsToProcess_.begin(); >> while (iter != descriptor->pendingStreamsToProcess_.end()) { >> @@ -1158,8 +1141,10 @@ void CameraDevice::requestComplete(Request *request) >> } >> } >> >> - if (descriptor->pendingStreamsToProcess_.empty()) >> + if (descriptor->pendingStreamsToProcess_.empty()) { >> + locker.unlock(); >> completeDescriptor(descriptor); >> + } >> } >> >> void CameraDevice::completeDescriptor(Camera3RequestDescriptor *descriptor) >> @@ -1245,6 +1230,13 @@ void CameraDevice::streamProcessingComplete(Camera3RequestDescriptor::StreamBuff >> MutexLocker locker(request->streamsProcessMutex_); >> >> request->pendingStreamsToProcess_.erase(streamBuffer->stream); >> + >> + if (!request->pendingStreamsToProcess_.empty()) >> + return; >> + >> + locker.unlock(); > Maybe > > { > MutexLocker locker(request->streamsProcessMutex_); > > request->pendingStreamsToProcess_.erase(streamBuffer->stream); > if (!request->pendingStreamsToProcess_.empty()) > return; > } > > up to you. Ok, yes, this looks cleaner > >> + >> + completeDescriptor(streamBuffer->request); >> } >> >> std::string CameraDevice::logPrefix() const >> diff --git a/src/android/camera_stream.cpp b/src/android/camera_stream.cpp >> index fed99022..9023c13c 100644 >> --- a/src/android/camera_stream.cpp >> +++ b/src/android/camera_stream.cpp >> @@ -99,6 +99,7 @@ int CameraStream::configure() >> if (ret) >> return ret; >> >> + worker_ = std::make_unique<PostProcessorWorker>(postProcessor_.get()); >> postProcessor_->processComplete.connect( >> this, [&](Camera3RequestDescriptor::StreamBuffer *streamBuffer, >> PostProcessor::Status status) { >> @@ -112,6 +113,8 @@ int CameraStream::configure() >> cameraDevice_->streamProcessingComplete(streamBuffer, >> bufferStatus); >> }); >> + >> + worker_->start(); >> } >> >> if (type_ == Type::Internal) { >> @@ -178,10 +181,6 @@ int CameraStream::process(Camera3RequestDescriptor::StreamBuffer *streamBuffer) >> streamBuffer->fence = -1; >> } >> >> - /* >> - * \todo Buffer mapping and processing should be moved to a >> - * separate thread. >> - */ >> const StreamConfiguration &output = configuration(); >> streamBuffer->dstBuffer = std::make_unique<CameraBuffer>( >> *streamBuffer->camera3Buffer, output.pixelFormat, output.size, >> @@ -191,11 +190,19 @@ int CameraStream::process(Camera3RequestDescriptor::StreamBuffer *streamBuffer) >> return -EINVAL; >> } >> >> - postProcessor_->process(streamBuffer); >> + worker_->queueRequest(streamBuffer); >> >> return 0; >> } >> >> +void CameraStream::flush() >> +{ >> + if (!postProcessor_) >> + return; >> + >> + worker_->flush(); >> +} >> + >> FrameBuffer *CameraStream::getBuffer() >> { >> if (!allocator_) >> @@ -223,3 +230,87 @@ void CameraStream::putBuffer(FrameBuffer *buffer) >> >> buffers_.push_back(buffer); >> } >> + >> +CameraStream::PostProcessorWorker::PostProcessorWorker(PostProcessor *postProcessor) >> + : postProcessor_(postProcessor) >> +{ >> +} >> + >> +CameraStream::PostProcessorWorker::~PostProcessorWorker() >> +{ >> + { >> + libcamera::MutexLocker lock(mutex_); >> + state_ = State::Stopped; >> + } >> + >> + cv_.notify_one(); >> + wait(); >> +} >> + >> +void CameraStream::PostProcessorWorker::start() >> +{ >> + { >> + libcamera::MutexLocker lock(mutex_); >> + ASSERT(state_ != State::Running); >> + state_ = State::Running; >> + } >> + >> + Thread::start(); >> +} >> + >> +void CameraStream::PostProcessorWorker::queueRequest(Camera3RequestDescriptor::StreamBuffer *dest) >> +{ >> + { >> + MutexLocker lock(mutex_); >> + ASSERT(state_ == State::Running); >> + requests_.push(dest); >> + } >> + >> + cv_.notify_one(); >> +} >> + >> +void CameraStream::PostProcessorWorker::run() >> +{ >> + MutexLocker locker(mutex_); >> + >> + while (1) { >> + cv_.wait(locker, [&] { >> + return state_ != State::Running || !requests_.empty(); >> + }); >> + >> + if (state_ != State::Running) >> + break; >> + >> + Camera3RequestDescriptor::StreamBuffer *streamBuffer = requests_.front(); >> + requests_.pop(); >> + locker.unlock(); >> + >> + postProcessor_->process(streamBuffer); >> + >> + locker.lock(); >> + } >> + >> + if (state_ == State::Flushing) { >> + std::queue<Camera3RequestDescriptor::StreamBuffer *> requests = >> + std::move(requests_); >> + locker.unlock(); >> + >> + while (!requests.empty()) { >> + postProcessor_->processComplete.emit( >> + requests.front(), PostProcessor::Status::Error); >> + requests.pop(); >> + } >> + >> + locker.lock(); >> + state_ = State::Stopped; >> + } >> +} >> + >> +void CameraStream::PostProcessorWorker::flush() >> +{ >> + libcamera::MutexLocker lock(mutex_); >> + state_ = State::Flushing; >> + lock.unlock(); >> + >> + cv_.notify_one(); >> +} >> diff --git a/src/android/camera_stream.h b/src/android/camera_stream.h >> index e74a9a3b..1588938a 100644 >> --- a/src/android/camera_stream.h >> +++ b/src/android/camera_stream.h >> @@ -7,12 +7,16 @@ >> #ifndef __ANDROID_CAMERA_STREAM_H__ >> #define __ANDROID_CAMERA_STREAM_H__ >> >> +#include <condition_variable> >> #include <memory> >> #include <mutex> >> +#include <queue> >> #include <vector> >> >> #include <hardware/camera3.h> >> >> +#include <libcamera/base/thread.h> >> + >> #include <libcamera/camera.h> >> #include <libcamera/framebuffer.h> >> #include <libcamera/framebuffer_allocator.h> >> @@ -20,6 +24,7 @@ >> #include <libcamera/pixel_format.h> >> >> #include "camera_request.h" >> +#include "post_processor.h" >> >> class CameraDevice; >> class PostProcessor; > You can drop the forward declaration. Ouch, What happens when you manually cherry-pick patches, because conflicts were too hard to deal with :-/ I will address these changes locally, should I re-post a new version v8 with those changes? > >> @@ -124,8 +129,38 @@ public: >> int process(Camera3RequestDescriptor::StreamBuffer *streamBuffer); >> libcamera::FrameBuffer *getBuffer(); >> void putBuffer(libcamera::FrameBuffer *buffer); >> + void flush(); >> >> private: >> + class PostProcessorWorker : public libcamera::Thread >> + { >> + public: >> + enum class State { >> + Stopped, >> + Running, >> + Flushing, >> + }; >> + >> + PostProcessorWorker(PostProcessor *postProcessor); >> + ~PostProcessorWorker(); >> + >> + void start(); >> + void queueRequest(Camera3RequestDescriptor::StreamBuffer *request); >> + void flush(); >> + >> + protected: >> + void run() override; >> + >> + private: >> + PostProcessor *postProcessor_; >> + >> + libcamera::Mutex mutex_; >> + std::condition_variable cv_; >> + >> + std::queue<Camera3RequestDescriptor::StreamBuffer *> requests_; >> + State state_ = State::Stopped; >> + }; >> + >> int waitFence(int fence); >> >> CameraDevice *const cameraDevice_; >> @@ -142,6 +177,8 @@ private: >> */ >> std::unique_ptr<std::mutex> mutex_; >> std::unique_ptr<PostProcessor> postProcessor_; >> + >> + std::unique_ptr<PostProcessorWorker> worker_; >> }; >> >> #endif /* __ANDROID_CAMERA_STREAM__ */
Hi Umang, On Tue, Oct 26, 2021 at 10:09:22AM +0530, Umang Jain wrote: > On 10/26/21 3:35 AM, Laurent Pinchart wrote: > > On Tue, Oct 26, 2021 at 02:08:33AM +0530, Umang Jain wrote: > >> Introduce a dedicated worker class derived from libcamera::Thread. > >> The worker class maintains a queue for post-processing requests > >> and waits for a post-processing request to become available. > >> It will process them as per FIFO before de-queuing it from the > >> queue. > >> > >> The entire post-processing handling iteration is locked under > >> streamProcessMutex_ which helps us to queue all the post-processing > > s/streamProcessMutex_/streamsProcessMutex_/ > > > >> request at once, before any of the post-processing completion slot > >> (streamProcessingComplete()) is allowed to run for post-processing > >> requests completing in parallel. This helps us to manage both > >> synchronous and asynchronous errors encountered during the entire > >> post processing operation. Since a post-processing operation can > >> even complete after CameraDevice::requestComplete() has returned, > >> we need to check and complete the descriptor from > >> streamProcessingComplete() running in the PostProcessorWorker's > >> thread. > >> > >> This patch also implements a flush() for the PostProcessorWorker > >> class which is responsible to purge post-processing requests > >> queued up while a camera is stopping/flushing. It is hooked with > >> CameraStream::flush(), which isn't used currently but will be > >> used when we handle flush/stop scenarios in greater detail > >> subsequently (in a different patchset). > >> > >> The libcamera request completion handler CameraDevice::requestComplete() > >> assumes that the request that has just completed is at the front of the > >> queue. Now that the post-processor runs asynchronously, this isn't true > >> anymore, a request being post-processed will stay in the queue and a new > >> libcamera request may complete. Remove that assumption, and use the > >> request cookie to obtain the Camera3RequestDescriptor. > >> > >> Signed-off-by: Umang Jain <umang.jain@ideasonboard.com> > >> Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com> > >> --- > >> src/android/camera_device.cpp | 44 ++++++--------- > >> src/android/camera_stream.cpp | 101 ++++++++++++++++++++++++++++++++-- > >> src/android/camera_stream.h | 37 +++++++++++++ > >> 3 files changed, 151 insertions(+), 31 deletions(-) > >> > >> diff --git a/src/android/camera_device.cpp b/src/android/camera_device.cpp > >> index 3ded0f7e..53ebe0ea 100644 > >> --- a/src/android/camera_device.cpp > >> +++ b/src/android/camera_device.cpp > >> @@ -1027,29 +1027,8 @@ int CameraDevice::processCaptureRequest(camera3_capture_request_t *camera3Reques > >> > >> void CameraDevice::requestComplete(Request *request) > >> { > >> - Camera3RequestDescriptor *descriptor; > >> - { > >> - MutexLocker descriptorsLock(descriptorsMutex_); > >> - ASSERT(!descriptors_.empty()); > >> - descriptor = descriptors_.front().get(); > >> - } > >> - > >> - if (descriptor->request_->cookie() != request->cookie()) { > >> - /* > >> - * \todo Clarify if the Camera has to be closed on > >> - * ERROR_DEVICE. > >> - */ > >> - LOG(HAL, Error) > >> - << "Out-of-order completion for request " > >> - << utils::hex(request->cookie()); > >> - > >> - MutexLocker descriptorsLock(descriptorsMutex_); > >> - descriptors_.pop(); > >> - > >> - notifyError(0, nullptr, CAMERA3_MSG_ERROR_DEVICE); > >> - > >> - return; > >> - } > >> + Camera3RequestDescriptor *descriptor = > >> + reinterpret_cast<Camera3RequestDescriptor *>(request->cookie()); > >> > >> /* > >> * Prepare the capture result for the Android camera stack. > >> @@ -1124,9 +1103,13 @@ void CameraDevice::requestComplete(Request *request) > >> } > >> > >> /* Handle post-processing. */ > >> + MutexLocker locker(descriptor->streamsProcessMutex_); > >> + > >> /* > >> - * \todo Protect the loop below with streamProcessMutex_ when post > > In the patch that introduced this, > > > > s/streamProcessMutex_/streamsProcessMutex_/ > > > >> - * processor runs asynchronously. > >> + * Queue all the post-processing streams request at once. The completion > >> + * slot streamProcessingComplete() can only execute when we are out > >> + * this critical section. This helps to handle synchronous errors here > >> + * itself. > >> */ > >> auto iter = descriptor->pendingStreamsToProcess_.begin(); > >> while (iter != descriptor->pendingStreamsToProcess_.end()) { > >> @@ -1158,8 +1141,10 @@ void CameraDevice::requestComplete(Request *request) > >> } > >> } > >> > >> - if (descriptor->pendingStreamsToProcess_.empty()) > >> + if (descriptor->pendingStreamsToProcess_.empty()) { > >> + locker.unlock(); > >> completeDescriptor(descriptor); > >> + } > >> } > >> > >> void CameraDevice::completeDescriptor(Camera3RequestDescriptor *descriptor) > >> @@ -1245,6 +1230,13 @@ void CameraDevice::streamProcessingComplete(Camera3RequestDescriptor::StreamBuff > >> MutexLocker locker(request->streamsProcessMutex_); > >> > >> request->pendingStreamsToProcess_.erase(streamBuffer->stream); > >> + > >> + if (!request->pendingStreamsToProcess_.empty()) > >> + return; > >> + > >> + locker.unlock(); > > Maybe > > > > { > > MutexLocker locker(request->streamsProcessMutex_); > > > > request->pendingStreamsToProcess_.erase(streamBuffer->stream); > > if (!request->pendingStreamsToProcess_.empty()) > > return; > > } > > > > up to you. > > > Ok, yes, this looks cleaner > > > > >> + > >> + completeDescriptor(streamBuffer->request); > >> } > >> > >> std::string CameraDevice::logPrefix() const > >> diff --git a/src/android/camera_stream.cpp b/src/android/camera_stream.cpp > >> index fed99022..9023c13c 100644 > >> --- a/src/android/camera_stream.cpp > >> +++ b/src/android/camera_stream.cpp > >> @@ -99,6 +99,7 @@ int CameraStream::configure() > >> if (ret) > >> return ret; > >> > >> + worker_ = std::make_unique<PostProcessorWorker>(postProcessor_.get()); > >> postProcessor_->processComplete.connect( > >> this, [&](Camera3RequestDescriptor::StreamBuffer *streamBuffer, > >> PostProcessor::Status status) { > >> @@ -112,6 +113,8 @@ int CameraStream::configure() > >> cameraDevice_->streamProcessingComplete(streamBuffer, > >> bufferStatus); > >> }); > >> + > >> + worker_->start(); > >> } > >> > >> if (type_ == Type::Internal) { > >> @@ -178,10 +181,6 @@ int CameraStream::process(Camera3RequestDescriptor::StreamBuffer *streamBuffer) > >> streamBuffer->fence = -1; > >> } > >> > >> - /* > >> - * \todo Buffer mapping and processing should be moved to a > >> - * separate thread. > >> - */ > >> const StreamConfiguration &output = configuration(); > >> streamBuffer->dstBuffer = std::make_unique<CameraBuffer>( > >> *streamBuffer->camera3Buffer, output.pixelFormat, output.size, > >> @@ -191,11 +190,19 @@ int CameraStream::process(Camera3RequestDescriptor::StreamBuffer *streamBuffer) > >> return -EINVAL; > >> } > >> > >> - postProcessor_->process(streamBuffer); > >> + worker_->queueRequest(streamBuffer); > >> > >> return 0; > >> } > >> > >> +void CameraStream::flush() > >> +{ > >> + if (!postProcessor_) > >> + return; > >> + > >> + worker_->flush(); > >> +} > >> + > >> FrameBuffer *CameraStream::getBuffer() > >> { > >> if (!allocator_) > >> @@ -223,3 +230,87 @@ void CameraStream::putBuffer(FrameBuffer *buffer) > >> > >> buffers_.push_back(buffer); > >> } > >> + > >> +CameraStream::PostProcessorWorker::PostProcessorWorker(PostProcessor *postProcessor) > >> + : postProcessor_(postProcessor) > >> +{ > >> +} > >> + > >> +CameraStream::PostProcessorWorker::~PostProcessorWorker() > >> +{ > >> + { > >> + libcamera::MutexLocker lock(mutex_); > >> + state_ = State::Stopped; > >> + } > >> + > >> + cv_.notify_one(); > >> + wait(); > >> +} > >> + > >> +void CameraStream::PostProcessorWorker::start() > >> +{ > >> + { > >> + libcamera::MutexLocker lock(mutex_); > >> + ASSERT(state_ != State::Running); > >> + state_ = State::Running; > >> + } > >> + > >> + Thread::start(); > >> +} > >> + > >> +void CameraStream::PostProcessorWorker::queueRequest(Camera3RequestDescriptor::StreamBuffer *dest) > >> +{ > >> + { > >> + MutexLocker lock(mutex_); > >> + ASSERT(state_ == State::Running); > >> + requests_.push(dest); > >> + } > >> + > >> + cv_.notify_one(); > >> +} > >> + > >> +void CameraStream::PostProcessorWorker::run() > >> +{ > >> + MutexLocker locker(mutex_); > >> + > >> + while (1) { > >> + cv_.wait(locker, [&] { > >> + return state_ != State::Running || !requests_.empty(); > >> + }); > >> + > >> + if (state_ != State::Running) > >> + break; > >> + > >> + Camera3RequestDescriptor::StreamBuffer *streamBuffer = requests_.front(); > >> + requests_.pop(); > >> + locker.unlock(); > >> + > >> + postProcessor_->process(streamBuffer); > >> + > >> + locker.lock(); > >> + } > >> + > >> + if (state_ == State::Flushing) { > >> + std::queue<Camera3RequestDescriptor::StreamBuffer *> requests = > >> + std::move(requests_); > >> + locker.unlock(); > >> + > >> + while (!requests.empty()) { > >> + postProcessor_->processComplete.emit( > >> + requests.front(), PostProcessor::Status::Error); > >> + requests.pop(); > >> + } > >> + > >> + locker.lock(); > >> + state_ = State::Stopped; > >> + } > >> +} > >> + > >> +void CameraStream::PostProcessorWorker::flush() > >> +{ > >> + libcamera::MutexLocker lock(mutex_); > >> + state_ = State::Flushing; > >> + lock.unlock(); > >> + > >> + cv_.notify_one(); > >> +} > >> diff --git a/src/android/camera_stream.h b/src/android/camera_stream.h > >> index e74a9a3b..1588938a 100644 > >> --- a/src/android/camera_stream.h > >> +++ b/src/android/camera_stream.h > >> @@ -7,12 +7,16 @@ > >> #ifndef __ANDROID_CAMERA_STREAM_H__ > >> #define __ANDROID_CAMERA_STREAM_H__ > >> > >> +#include <condition_variable> > >> #include <memory> > >> #include <mutex> > >> +#include <queue> > >> #include <vector> > >> > >> #include <hardware/camera3.h> > >> > >> +#include <libcamera/base/thread.h> > >> + > >> #include <libcamera/camera.h> > >> #include <libcamera/framebuffer.h> > >> #include <libcamera/framebuffer_allocator.h> > >> @@ -20,6 +24,7 @@ > >> #include <libcamera/pixel_format.h> > >> > >> #include "camera_request.h" > >> +#include "post_processor.h" > >> > >> class CameraDevice; > >> class PostProcessor; > > > > You can drop the forward declaration. > > Ouch, What happens when you manually cherry-pick patches, because > conflicts were too hard to deal with :-/ > > I will address these changes locally, should I re-post a new version v8 > with those changes? They're small enough, if you're confident you got them right, I don't need a v8 on the list. > >> @@ -124,8 +129,38 @@ public: > >> int process(Camera3RequestDescriptor::StreamBuffer *streamBuffer); > >> libcamera::FrameBuffer *getBuffer(); > >> void putBuffer(libcamera::FrameBuffer *buffer); > >> + void flush(); > >> > >> private: > >> + class PostProcessorWorker : public libcamera::Thread > >> + { > >> + public: > >> + enum class State { > >> + Stopped, > >> + Running, > >> + Flushing, > >> + }; > >> + > >> + PostProcessorWorker(PostProcessor *postProcessor); > >> + ~PostProcessorWorker(); > >> + > >> + void start(); > >> + void queueRequest(Camera3RequestDescriptor::StreamBuffer *request); > >> + void flush(); > >> + > >> + protected: > >> + void run() override; > >> + > >> + private: > >> + PostProcessor *postProcessor_; > >> + > >> + libcamera::Mutex mutex_; > >> + std::condition_variable cv_; > >> + > >> + std::queue<Camera3RequestDescriptor::StreamBuffer *> requests_; > >> + State state_ = State::Stopped; > >> + }; > >> + > >> int waitFence(int fence); > >> > >> CameraDevice *const cameraDevice_; > >> @@ -142,6 +177,8 @@ private: > >> */ > >> std::unique_ptr<std::mutex> mutex_; > >> std::unique_ptr<PostProcessor> postProcessor_; > >> + > >> + std::unique_ptr<PostProcessorWorker> worker_; > >> }; > >> > >> #endif /* __ANDROID_CAMERA_STREAM__ */
Hi Laurent, On 10/26/21 12:50 PM, Laurent Pinchart wrote: > Hi Umang, > > On Tue, Oct 26, 2021 at 10:09:22AM +0530, Umang Jain wrote: >> On 10/26/21 3:35 AM, Laurent Pinchart wrote: >>> On Tue, Oct 26, 2021 at 02:08:33AM +0530, Umang Jain wrote: >>>> Introduce a dedicated worker class derived from libcamera::Thread. >>>> The worker class maintains a queue for post-processing requests >>>> and waits for a post-processing request to become available. >>>> It will process them as per FIFO before de-queuing it from the >>>> queue. >>>> >>>> The entire post-processing handling iteration is locked under >>>> streamProcessMutex_ which helps us to queue all the post-processing >>> s/streamProcessMutex_/streamsProcessMutex_/ >>> >>>> request at once, before any of the post-processing completion slot >>>> (streamProcessingComplete()) is allowed to run for post-processing >>>> requests completing in parallel. This helps us to manage both >>>> synchronous and asynchronous errors encountered during the entire >>>> post processing operation. Since a post-processing operation can >>>> even complete after CameraDevice::requestComplete() has returned, >>>> we need to check and complete the descriptor from >>>> streamProcessingComplete() running in the PostProcessorWorker's >>>> thread. >>>> >>>> This patch also implements a flush() for the PostProcessorWorker >>>> class which is responsible to purge post-processing requests >>>> queued up while a camera is stopping/flushing. It is hooked with >>>> CameraStream::flush(), which isn't used currently but will be >>>> used when we handle flush/stop scenarios in greater detail >>>> subsequently (in a different patchset). >>>> >>>> The libcamera request completion handler CameraDevice::requestComplete() >>>> assumes that the request that has just completed is at the front of the >>>> queue. Now that the post-processor runs asynchronously, this isn't true >>>> anymore, a request being post-processed will stay in the queue and a new >>>> libcamera request may complete. Remove that assumption, and use the >>>> request cookie to obtain the Camera3RequestDescriptor. >>>> >>>> Signed-off-by: Umang Jain <umang.jain@ideasonboard.com> >>>> Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com> >>>> --- >>>> src/android/camera_device.cpp | 44 ++++++--------- >>>> src/android/camera_stream.cpp | 101 ++++++++++++++++++++++++++++++++-- >>>> src/android/camera_stream.h | 37 +++++++++++++ >>>> 3 files changed, 151 insertions(+), 31 deletions(-) >>>> >>>> diff --git a/src/android/camera_device.cpp b/src/android/camera_device.cpp >>>> index 3ded0f7e..53ebe0ea 100644 >>>> --- a/src/android/camera_device.cpp >>>> +++ b/src/android/camera_device.cpp >>>> @@ -1027,29 +1027,8 @@ int CameraDevice::processCaptureRequest(camera3_capture_request_t *camera3Reques >>>> >>>> void CameraDevice::requestComplete(Request *request) >>>> { >>>> - Camera3RequestDescriptor *descriptor; >>>> - { >>>> - MutexLocker descriptorsLock(descriptorsMutex_); >>>> - ASSERT(!descriptors_.empty()); >>>> - descriptor = descriptors_.front().get(); >>>> - } >>>> - >>>> - if (descriptor->request_->cookie() != request->cookie()) { >>>> - /* >>>> - * \todo Clarify if the Camera has to be closed on >>>> - * ERROR_DEVICE. >>>> - */ >>>> - LOG(HAL, Error) >>>> - << "Out-of-order completion for request " >>>> - << utils::hex(request->cookie()); >>>> - >>>> - MutexLocker descriptorsLock(descriptorsMutex_); >>>> - descriptors_.pop(); >>>> - >>>> - notifyError(0, nullptr, CAMERA3_MSG_ERROR_DEVICE); >>>> - >>>> - return; >>>> - } >>>> + Camera3RequestDescriptor *descriptor = >>>> + reinterpret_cast<Camera3RequestDescriptor *>(request->cookie()); >>>> >>>> /* >>>> * Prepare the capture result for the Android camera stack. >>>> @@ -1124,9 +1103,13 @@ void CameraDevice::requestComplete(Request *request) >>>> } >>>> >>>> /* Handle post-processing. */ >>>> + MutexLocker locker(descriptor->streamsProcessMutex_); >>>> + >>>> /* >>>> - * \todo Protect the loop below with streamProcessMutex_ when post >>> In the patch that introduced this, >>> >>> s/streamProcessMutex_/streamsProcessMutex_/ >>> >>>> - * processor runs asynchronously. >>>> + * Queue all the post-processing streams request at once. The completion >>>> + * slot streamProcessingComplete() can only execute when we are out >>>> + * this critical section. This helps to handle synchronous errors here >>>> + * itself. >>>> */ >>>> auto iter = descriptor->pendingStreamsToProcess_.begin(); >>>> while (iter != descriptor->pendingStreamsToProcess_.end()) { >>>> @@ -1158,8 +1141,10 @@ void CameraDevice::requestComplete(Request *request) >>>> } >>>> } >>>> >>>> - if (descriptor->pendingStreamsToProcess_.empty()) >>>> + if (descriptor->pendingStreamsToProcess_.empty()) { >>>> + locker.unlock(); >>>> completeDescriptor(descriptor); >>>> + } >>>> } >>>> >>>> void CameraDevice::completeDescriptor(Camera3RequestDescriptor *descriptor) >>>> @@ -1245,6 +1230,13 @@ void CameraDevice::streamProcessingComplete(Camera3RequestDescriptor::StreamBuff >>>> MutexLocker locker(request->streamsProcessMutex_); >>>> >>>> request->pendingStreamsToProcess_.erase(streamBuffer->stream); >>>> + >>>> + if (!request->pendingStreamsToProcess_.empty()) >>>> + return; >>>> + >>>> + locker.unlock(); >>> Maybe >>> >>> { >>> MutexLocker locker(request->streamsProcessMutex_); >>> >>> request->pendingStreamsToProcess_.erase(streamBuffer->stream); >>> if (!request->pendingStreamsToProcess_.empty()) >>> return; >>> } >>> >>> up to you. >> >> Ok, yes, this looks cleaner >> >>>> + >>>> + completeDescriptor(streamBuffer->request); >>>> } >>>> >>>> std::string CameraDevice::logPrefix() const >>>> diff --git a/src/android/camera_stream.cpp b/src/android/camera_stream.cpp >>>> index fed99022..9023c13c 100644 >>>> --- a/src/android/camera_stream.cpp >>>> +++ b/src/android/camera_stream.cpp >>>> @@ -99,6 +99,7 @@ int CameraStream::configure() >>>> if (ret) >>>> return ret; >>>> >>>> + worker_ = std::make_unique<PostProcessorWorker>(postProcessor_.get()); >>>> postProcessor_->processComplete.connect( >>>> this, [&](Camera3RequestDescriptor::StreamBuffer *streamBuffer, >>>> PostProcessor::Status status) { >>>> @@ -112,6 +113,8 @@ int CameraStream::configure() >>>> cameraDevice_->streamProcessingComplete(streamBuffer, >>>> bufferStatus); >>>> }); >>>> + >>>> + worker_->start(); >>>> } >>>> >>>> if (type_ == Type::Internal) { >>>> @@ -178,10 +181,6 @@ int CameraStream::process(Camera3RequestDescriptor::StreamBuffer *streamBuffer) >>>> streamBuffer->fence = -1; >>>> } >>>> >>>> - /* >>>> - * \todo Buffer mapping and processing should be moved to a >>>> - * separate thread. >>>> - */ >>>> const StreamConfiguration &output = configuration(); >>>> streamBuffer->dstBuffer = std::make_unique<CameraBuffer>( >>>> *streamBuffer->camera3Buffer, output.pixelFormat, output.size, >>>> @@ -191,11 +190,19 @@ int CameraStream::process(Camera3RequestDescriptor::StreamBuffer *streamBuffer) >>>> return -EINVAL; >>>> } >>>> >>>> - postProcessor_->process(streamBuffer); >>>> + worker_->queueRequest(streamBuffer); >>>> >>>> return 0; >>>> } >>>> >>>> +void CameraStream::flush() >>>> +{ >>>> + if (!postProcessor_) >>>> + return; >>>> + >>>> + worker_->flush(); >>>> +} >>>> + >>>> FrameBuffer *CameraStream::getBuffer() >>>> { >>>> if (!allocator_) >>>> @@ -223,3 +230,87 @@ void CameraStream::putBuffer(FrameBuffer *buffer) >>>> >>>> buffers_.push_back(buffer); >>>> } >>>> + >>>> +CameraStream::PostProcessorWorker::PostProcessorWorker(PostProcessor *postProcessor) >>>> + : postProcessor_(postProcessor) >>>> +{ >>>> +} >>>> + >>>> +CameraStream::PostProcessorWorker::~PostProcessorWorker() >>>> +{ >>>> + { >>>> + libcamera::MutexLocker lock(mutex_); >>>> + state_ = State::Stopped; >>>> + } >>>> + >>>> + cv_.notify_one(); >>>> + wait(); >>>> +} >>>> + >>>> +void CameraStream::PostProcessorWorker::start() >>>> +{ >>>> + { >>>> + libcamera::MutexLocker lock(mutex_); >>>> + ASSERT(state_ != State::Running); >>>> + state_ = State::Running; >>>> + } >>>> + >>>> + Thread::start(); >>>> +} >>>> + >>>> +void CameraStream::PostProcessorWorker::queueRequest(Camera3RequestDescriptor::StreamBuffer *dest) >>>> +{ >>>> + { >>>> + MutexLocker lock(mutex_); >>>> + ASSERT(state_ == State::Running); >>>> + requests_.push(dest); >>>> + } >>>> + >>>> + cv_.notify_one(); >>>> +} >>>> + >>>> +void CameraStream::PostProcessorWorker::run() >>>> +{ >>>> + MutexLocker locker(mutex_); >>>> + >>>> + while (1) { >>>> + cv_.wait(locker, [&] { >>>> + return state_ != State::Running || !requests_.empty(); >>>> + }); >>>> + >>>> + if (state_ != State::Running) >>>> + break; >>>> + >>>> + Camera3RequestDescriptor::StreamBuffer *streamBuffer = requests_.front(); >>>> + requests_.pop(); >>>> + locker.unlock(); >>>> + >>>> + postProcessor_->process(streamBuffer); >>>> + >>>> + locker.lock(); >>>> + } >>>> + >>>> + if (state_ == State::Flushing) { >>>> + std::queue<Camera3RequestDescriptor::StreamBuffer *> requests = >>>> + std::move(requests_); >>>> + locker.unlock(); >>>> + >>>> + while (!requests.empty()) { >>>> + postProcessor_->processComplete.emit( >>>> + requests.front(), PostProcessor::Status::Error); >>>> + requests.pop(); >>>> + } >>>> + >>>> + locker.lock(); >>>> + state_ = State::Stopped; >>>> + } >>>> +} >>>> + >>>> +void CameraStream::PostProcessorWorker::flush() >>>> +{ >>>> + libcamera::MutexLocker lock(mutex_); >>>> + state_ = State::Flushing; >>>> + lock.unlock(); >>>> + >>>> + cv_.notify_one(); >>>> +} >>>> diff --git a/src/android/camera_stream.h b/src/android/camera_stream.h >>>> index e74a9a3b..1588938a 100644 >>>> --- a/src/android/camera_stream.h >>>> +++ b/src/android/camera_stream.h >>>> @@ -7,12 +7,16 @@ >>>> #ifndef __ANDROID_CAMERA_STREAM_H__ >>>> #define __ANDROID_CAMERA_STREAM_H__ >>>> >>>> +#include <condition_variable> >>>> #include <memory> >>>> #include <mutex> >>>> +#include <queue> >>>> #include <vector> >>>> >>>> #include <hardware/camera3.h> >>>> >>>> +#include <libcamera/base/thread.h> >>>> + >>>> #include <libcamera/camera.h> >>>> #include <libcamera/framebuffer.h> >>>> #include <libcamera/framebuffer_allocator.h> >>>> @@ -20,6 +24,7 @@ >>>> #include <libcamera/pixel_format.h> >>>> >>>> #include "camera_request.h" >>>> +#include "post_processor.h" >>>> >>>> class CameraDevice; >>>> class PostProcessor; >>> You can drop the forward declaration. >> Ouch, What happens when you manually cherry-pick patches, because >> conflicts were too hard to deal with :-/ >> >> I will address these changes locally, should I re-post a new version v8 >> with those changes? > They're small enough, if you're confident you got them right, I don't > need a v8 on the list. Aahh.. Saw this email right now. Already posted v8 on the list. Anyway, we are missing one R-B tag on last one patch, so it would be good for someone to look at cleaned up patches. > >>>> @@ -124,8 +129,38 @@ public: >>>> int process(Camera3RequestDescriptor::StreamBuffer *streamBuffer); >>>> libcamera::FrameBuffer *getBuffer(); >>>> void putBuffer(libcamera::FrameBuffer *buffer); >>>> + void flush(); >>>> >>>> private: >>>> + class PostProcessorWorker : public libcamera::Thread >>>> + { >>>> + public: >>>> + enum class State { >>>> + Stopped, >>>> + Running, >>>> + Flushing, >>>> + }; >>>> + >>>> + PostProcessorWorker(PostProcessor *postProcessor); >>>> + ~PostProcessorWorker(); >>>> + >>>> + void start(); >>>> + void queueRequest(Camera3RequestDescriptor::StreamBuffer *request); >>>> + void flush(); >>>> + >>>> + protected: >>>> + void run() override; >>>> + >>>> + private: >>>> + PostProcessor *postProcessor_; >>>> + >>>> + libcamera::Mutex mutex_; >>>> + std::condition_variable cv_; >>>> + >>>> + std::queue<Camera3RequestDescriptor::StreamBuffer *> requests_; >>>> + State state_ = State::Stopped; >>>> + }; >>>> + >>>> int waitFence(int fence); >>>> >>>> CameraDevice *const cameraDevice_; >>>> @@ -142,6 +177,8 @@ private: >>>> */ >>>> std::unique_ptr<std::mutex> mutex_; >>>> std::unique_ptr<PostProcessor> postProcessor_; >>>> + >>>> + std::unique_ptr<PostProcessorWorker> worker_; >>>> }; >>>> >>>> #endif /* __ANDROID_CAMERA_STREAM__ */
diff --git a/src/android/camera_device.cpp b/src/android/camera_device.cpp index 3ded0f7e..53ebe0ea 100644 --- a/src/android/camera_device.cpp +++ b/src/android/camera_device.cpp @@ -1027,29 +1027,8 @@ int CameraDevice::processCaptureRequest(camera3_capture_request_t *camera3Reques void CameraDevice::requestComplete(Request *request) { - Camera3RequestDescriptor *descriptor; - { - MutexLocker descriptorsLock(descriptorsMutex_); - ASSERT(!descriptors_.empty()); - descriptor = descriptors_.front().get(); - } - - if (descriptor->request_->cookie() != request->cookie()) { - /* - * \todo Clarify if the Camera has to be closed on - * ERROR_DEVICE. - */ - LOG(HAL, Error) - << "Out-of-order completion for request " - << utils::hex(request->cookie()); - - MutexLocker descriptorsLock(descriptorsMutex_); - descriptors_.pop(); - - notifyError(0, nullptr, CAMERA3_MSG_ERROR_DEVICE); - - return; - } + Camera3RequestDescriptor *descriptor = + reinterpret_cast<Camera3RequestDescriptor *>(request->cookie()); /* * Prepare the capture result for the Android camera stack. @@ -1124,9 +1103,13 @@ void CameraDevice::requestComplete(Request *request) } /* Handle post-processing. */ + MutexLocker locker(descriptor->streamsProcessMutex_); + /* - * \todo Protect the loop below with streamProcessMutex_ when post - * processor runs asynchronously. + * Queue all the post-processing streams request at once. The completion + * slot streamProcessingComplete() can only execute when we are out + * this critical section. This helps to handle synchronous errors here + * itself. */ auto iter = descriptor->pendingStreamsToProcess_.begin(); while (iter != descriptor->pendingStreamsToProcess_.end()) { @@ -1158,8 +1141,10 @@ void CameraDevice::requestComplete(Request *request) } } - if (descriptor->pendingStreamsToProcess_.empty()) + if (descriptor->pendingStreamsToProcess_.empty()) { + locker.unlock(); completeDescriptor(descriptor); + } } void CameraDevice::completeDescriptor(Camera3RequestDescriptor *descriptor) @@ -1245,6 +1230,13 @@ void CameraDevice::streamProcessingComplete(Camera3RequestDescriptor::StreamBuff MutexLocker locker(request->streamsProcessMutex_); request->pendingStreamsToProcess_.erase(streamBuffer->stream); + + if (!request->pendingStreamsToProcess_.empty()) + return; + + locker.unlock(); + + completeDescriptor(streamBuffer->request); } std::string CameraDevice::logPrefix() const diff --git a/src/android/camera_stream.cpp b/src/android/camera_stream.cpp index fed99022..9023c13c 100644 --- a/src/android/camera_stream.cpp +++ b/src/android/camera_stream.cpp @@ -99,6 +99,7 @@ int CameraStream::configure() if (ret) return ret; + worker_ = std::make_unique<PostProcessorWorker>(postProcessor_.get()); postProcessor_->processComplete.connect( this, [&](Camera3RequestDescriptor::StreamBuffer *streamBuffer, PostProcessor::Status status) { @@ -112,6 +113,8 @@ int CameraStream::configure() cameraDevice_->streamProcessingComplete(streamBuffer, bufferStatus); }); + + worker_->start(); } if (type_ == Type::Internal) { @@ -178,10 +181,6 @@ int CameraStream::process(Camera3RequestDescriptor::StreamBuffer *streamBuffer) streamBuffer->fence = -1; } - /* - * \todo Buffer mapping and processing should be moved to a - * separate thread. - */ const StreamConfiguration &output = configuration(); streamBuffer->dstBuffer = std::make_unique<CameraBuffer>( *streamBuffer->camera3Buffer, output.pixelFormat, output.size, @@ -191,11 +190,19 @@ int CameraStream::process(Camera3RequestDescriptor::StreamBuffer *streamBuffer) return -EINVAL; } - postProcessor_->process(streamBuffer); + worker_->queueRequest(streamBuffer); return 0; } +void CameraStream::flush() +{ + if (!postProcessor_) + return; + + worker_->flush(); +} + FrameBuffer *CameraStream::getBuffer() { if (!allocator_) @@ -223,3 +230,87 @@ void CameraStream::putBuffer(FrameBuffer *buffer) buffers_.push_back(buffer); } + +CameraStream::PostProcessorWorker::PostProcessorWorker(PostProcessor *postProcessor) + : postProcessor_(postProcessor) +{ +} + +CameraStream::PostProcessorWorker::~PostProcessorWorker() +{ + { + libcamera::MutexLocker lock(mutex_); + state_ = State::Stopped; + } + + cv_.notify_one(); + wait(); +} + +void CameraStream::PostProcessorWorker::start() +{ + { + libcamera::MutexLocker lock(mutex_); + ASSERT(state_ != State::Running); + state_ = State::Running; + } + + Thread::start(); +} + +void CameraStream::PostProcessorWorker::queueRequest(Camera3RequestDescriptor::StreamBuffer *dest) +{ + { + MutexLocker lock(mutex_); + ASSERT(state_ == State::Running); + requests_.push(dest); + } + + cv_.notify_one(); +} + +void CameraStream::PostProcessorWorker::run() +{ + MutexLocker locker(mutex_); + + while (1) { + cv_.wait(locker, [&] { + return state_ != State::Running || !requests_.empty(); + }); + + if (state_ != State::Running) + break; + + Camera3RequestDescriptor::StreamBuffer *streamBuffer = requests_.front(); + requests_.pop(); + locker.unlock(); + + postProcessor_->process(streamBuffer); + + locker.lock(); + } + + if (state_ == State::Flushing) { + std::queue<Camera3RequestDescriptor::StreamBuffer *> requests = + std::move(requests_); + locker.unlock(); + + while (!requests.empty()) { + postProcessor_->processComplete.emit( + requests.front(), PostProcessor::Status::Error); + requests.pop(); + } + + locker.lock(); + state_ = State::Stopped; + } +} + +void CameraStream::PostProcessorWorker::flush() +{ + libcamera::MutexLocker lock(mutex_); + state_ = State::Flushing; + lock.unlock(); + + cv_.notify_one(); +} diff --git a/src/android/camera_stream.h b/src/android/camera_stream.h index e74a9a3b..1588938a 100644 --- a/src/android/camera_stream.h +++ b/src/android/camera_stream.h @@ -7,12 +7,16 @@ #ifndef __ANDROID_CAMERA_STREAM_H__ #define __ANDROID_CAMERA_STREAM_H__ +#include <condition_variable> #include <memory> #include <mutex> +#include <queue> #include <vector> #include <hardware/camera3.h> +#include <libcamera/base/thread.h> + #include <libcamera/camera.h> #include <libcamera/framebuffer.h> #include <libcamera/framebuffer_allocator.h> @@ -20,6 +24,7 @@ #include <libcamera/pixel_format.h> #include "camera_request.h" +#include "post_processor.h" class CameraDevice; class PostProcessor; @@ -124,8 +129,38 @@ public: int process(Camera3RequestDescriptor::StreamBuffer *streamBuffer); libcamera::FrameBuffer *getBuffer(); void putBuffer(libcamera::FrameBuffer *buffer); + void flush(); private: + class PostProcessorWorker : public libcamera::Thread + { + public: + enum class State { + Stopped, + Running, + Flushing, + }; + + PostProcessorWorker(PostProcessor *postProcessor); + ~PostProcessorWorker(); + + void start(); + void queueRequest(Camera3RequestDescriptor::StreamBuffer *request); + void flush(); + + protected: + void run() override; + + private: + PostProcessor *postProcessor_; + + libcamera::Mutex mutex_; + std::condition_variable cv_; + + std::queue<Camera3RequestDescriptor::StreamBuffer *> requests_; + State state_ = State::Stopped; + }; + int waitFence(int fence); CameraDevice *const cameraDevice_; @@ -142,6 +177,8 @@ private: */ std::unique_ptr<std::mutex> mutex_; std::unique_ptr<PostProcessor> postProcessor_; + + std::unique_ptr<PostProcessorWorker> worker_; }; #endif /* __ANDROID_CAMERA_STREAM__ */