Message ID | 20211020104212.121743-4-umang.jain@ideasonboard.com |
---|---|
State | Superseded |
Headers | show |
Series |
|
Related | show |
Hi Umang, Thank you for the patch. On Wed, Oct 20, 2021 at 04:12:11PM +0530, Umang Jain wrote: > Introduce a dedicated worker class derived from libcamera::Thread. > The worker class maintains a queue for post-processing requests > and waits for a post-processing request to become available. > It will process them as per FIFO before de-queuing it from the > queue. > > To get access to the source and destination buffers in the worker > thread, we also need to save a pointer to them in the s/a pointer/pointers/ > Camera3RequestDescriptor. > > This patch also implements a flush() for the PostProcessorWorker > class which is responsible to purge post-processing requests > queued up while a camera is stopping/flushing. It is hooked with > CameraStream::flush(), which isn't used currently but will be > used when we handle flush/stop scnearios in greater detail > subsequently (in a different patchset). > > The libcamera request completion handler CameraDevice::requestComplete() > assumes that the request that has just completed is at the front of the > queue. Now that the post-processor runs asynchronously, this isn't true > anymore, a request being post-processed will stay in the queue and a new > libcamera request may complete. Remove that assumption, and use the > request cookie to obtain the Camera3RequestDescriptor. > > Signed-off-by: Umang Jain <umang.jain@ideasonboard.com> > Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com> > --- > src/android/camera_device.cpp | 46 +++++---------- > src/android/camera_request.h | 4 ++ > src/android/camera_stream.cpp | 107 +++++++++++++++++++++++++++++++--- > src/android/camera_stream.h | 38 +++++++++++- > 4 files changed, 156 insertions(+), 39 deletions(-) > > diff --git a/src/android/camera_device.cpp b/src/android/camera_device.cpp > index 541c2c81..b14416ce 100644 > --- a/src/android/camera_device.cpp > +++ b/src/android/camera_device.cpp > @@ -1020,29 +1020,8 @@ int CameraDevice::processCaptureRequest(camera3_capture_request_t *camera3Reques > > void CameraDevice::requestComplete(Request *request) > { > - Camera3RequestDescriptor *descriptor; > - { > - MutexLocker descriptorsLock(descriptorsMutex_); > - ASSERT(!descriptors_.empty()); > - descriptor = descriptors_.front().get(); > - } > - > - if (descriptor->request_->cookie() != request->cookie()) { > - /* > - * \todo Clarify if the Camera has to be closed on > - * ERROR_DEVICE. > - */ > - LOG(HAL, Error) > - << "Out-of-order completion for request " > - << utils::hex(request->cookie()); > - > - MutexLocker descriptorsLock(descriptorsMutex_); > - descriptors_.pop(); > - > - notifyError(0, nullptr, CAMERA3_MSG_ERROR_DEVICE); > - > - return; > - } > + Camera3RequestDescriptor *descriptor = > + reinterpret_cast<Camera3RequestDescriptor *>(request->cookie()); > > /* > * Prepare the capture result for the Android camera stack. > @@ -1120,12 +1099,7 @@ void CameraDevice::requestComplete(Request *request) > bool needsPostProcessing = false; > Camera3RequestDescriptor::Status processingStatus = > Camera3RequestDescriptor::Status::Pending; > - /* > - * \todo Apply streamsProcessMutex_ when post-processing is adapted to run > - * asynchronously. If we introduce the streamsProcessMutex_ right away, the > - * lock will be held forever since it is synchronous at this point > - * (see streamProcessingComplete). > - */ > + > for (auto &buffer : descriptor->buffers_) { > CameraStream *stream = buffer.stream; > > @@ -1145,7 +1119,13 @@ void CameraDevice::requestComplete(Request *request) > buffer.internalBuffer = src; > > needsPostProcessing = true; > - int ret = stream->process(*src, buffer, descriptor); > + > + int ret; > + { > + MutexLocker locker(descriptor->streamsProcessMutex_); It may be possible (see my review of 2/4) to only lock access to the data structures and not cover the process() call. Let's see how it turns out. > + ret = stream->process(*src, buffer, descriptor); > + } > + > if (ret) { > setBufferStatus(stream, buffer, descriptor, > Camera3RequestDescriptor::Status::Error); > @@ -1156,6 +1136,12 @@ void CameraDevice::requestComplete(Request *request) > if (needsPostProcessing) { > if (processingStatus == Camera3RequestDescriptor::Status::Error) { > descriptor->status_ = processingStatus; > + /* > + * \todo This is problematic when some streams are > + * queued successfully, but some fail to get queued. > + * We might end up with use-after-free situation for > + * descriptor in streamProcessingComplete(). > + */ Hopefully this will be fixed in v6 of 2/4 :-) > sendCaptureResults(); > } > > diff --git a/src/android/camera_request.h b/src/android/camera_request.h > index 3a2774e0..85274414 100644 > --- a/src/android/camera_request.h > +++ b/src/android/camera_request.h > @@ -18,6 +18,7 @@ > > #include <hardware/camera3.h> > > +#include "camera_buffer.h" > #include "camera_metadata.h" > #include "camera_worker.h" > > @@ -39,6 +40,9 @@ public: > int fence; > Status status; > libcamera::FrameBuffer *internalBuffer = nullptr; > + std::unique_ptr<CameraBuffer> destBuffer; > + const libcamera::FrameBuffer *srcBuffer; > + Camera3RequestDescriptor *request = nullptr; > }; > > Camera3RequestDescriptor(libcamera::Camera *camera, > diff --git a/src/android/camera_stream.cpp b/src/android/camera_stream.cpp > index b3cb06a2..a29ce33b 100644 > --- a/src/android/camera_stream.cpp > +++ b/src/android/camera_stream.cpp > @@ -99,6 +99,7 @@ int CameraStream::configure() > if (ret) > return ret; > > + worker_ = std::make_unique<PostProcessorWorker>(postProcessor_.get()); > postProcessor_->processComplete.connect( > this, [&](Camera3RequestDescriptor *request, PostProcessor::Status status) { > Camera3RequestDescriptor::Status bufferStatus = > @@ -110,6 +111,7 @@ int CameraStream::configure() > cameraDevice_->streamProcessingComplete(this, request, > bufferStatus); > }); > + worker_->start(); > } > > if (type_ == Type::Internal) { > @@ -179,23 +181,31 @@ int CameraStream::process(const FrameBuffer &source, > if (!postProcessor_) > return 0; > > - /* > - * \todo Buffer mapping and processing should be moved to a > - * separate thread. > - */ > const StreamConfiguration &output = configuration(); > - CameraBuffer destBuffer(*dest.camera3Buffer, output.pixelFormat, > - output.size, PROT_READ | PROT_WRITE); > - if (!destBuffer.isValid()) { > + dest.destBuffer = std::make_unique<CameraBuffer>( > + *dest.camera3Buffer, output.pixelFormat, output.size, PROT_READ | PROT_WRITE); dest.destBuffer = std::make_unique<CameraBuffer>( *dest.camera3Buffer, output.pixelFormat, output.size, PROT_READ | PROT_WRITE); > + if (!dest.destBuffer->isValid()) { > LOG(HAL, Error) << "Failed to create destination buffer"; > return -EINVAL; > } > > - postProcessor_->process(source, &destBuffer, request); > + dest.srcBuffer = &source; > + dest.request = request; > + > + /* Push the postProcessor request to the worker queue. */ > + worker_->queueRequest(&dest); > > return 0; > } > > +void CameraStream::flush() > +{ > + if (!postProcessor_) > + return; > + > + worker_->flush(); > +} > + > FrameBuffer *CameraStream::getBuffer() > { > if (!allocator_) > @@ -223,3 +233,84 @@ void CameraStream::putBuffer(FrameBuffer *buffer) > > buffers_.push_back(buffer); > } > + > +CameraStream::PostProcessorWorker::PostProcessorWorker(PostProcessor *postProcessor) > + : postProcessor_(postProcessor) > +{ > +} > + > +CameraStream::PostProcessorWorker::~PostProcessorWorker() > +{ > + { > + libcamera::MutexLocker lock(mutex_); > + state_ = State::Stopped; > + } > + > + cv_.notify_one(); > + wait(); > +} > + > +void CameraStream::PostProcessorWorker::start() > +{ > + { > + libcamera::MutexLocker lock(mutex_); Could you add a ASSERT(state_ != State::Running); here ? I'm worried that a future refactoring will call CameraStream::configure() multiple times, and that would result in the worker being start multiple times too. > + state_ = State::Running; > + } > + > + Thread::start(); > +} > + > +void CameraStream::PostProcessorWorker::queueRequest(Camera3RequestDescriptor::StreamBuffer *dest) > +{ > + { > + MutexLocker lock(mutex_); > + ASSERT(state_ == State::Running); > + requests_.push(dest); > + } > + > + cv_.notify_one(); > +} > + > +void CameraStream::PostProcessorWorker::run() > +{ > + MutexLocker locker(mutex_); > + > + while (1) { > + cv_.wait(locker, [&] { > + return state_ != State::Running || !requests_.empty(); > + }); > + > + if (state_ != State::Running) > + break; > + > + Camera3RequestDescriptor::StreamBuffer *stream = requests_.front(); > + requests_.pop(); > + locker.unlock(); > + > + postProcessor_->process(*stream->srcBuffer, stream->destBuffer.get(), > + stream->request); > + > + locker.lock(); > + } > + > + if (state_ == State::Flushing) { > + while (!requests_.empty()) { > + postProcessor_->processComplete.emit( > + requests_.front()->request, > + PostProcessor::Status::Error); I'm also a tiny bit concerned here that we're emitting the signal with the lock held, while for requests that complete normally we don't. The behaviour isn't consistent and could cause issues in the CameraStream or CameraDevice classes. I think it's harmless for now, but maybe we could drop the lock around the emit(à call ? One option to avoid dropping and taking the lock in every iteration would be std::queue<Camera3RequestDescriptor::StreamBuffer *> requests = std::move(requests_); locker.unlock(); while (!requests.empty()) { postProcessor_->processComplete.emit( requests.front()->request, PostProcessor::Status::Error); requests.pop(); } locker.lock(); And now that I think about it, given that we now have a StreamBuffer object, wouldn't it be simpler to pass the StreamBuffer instead of the Camera3RequestDescriptor to the signal ? > + requests_.pop(); > + } > + > + state_ = State::Stopped; > + locker.unlock(); You can drop the unlock() call here. > + } > +} > + > +void CameraStream::PostProcessorWorker::flush() > +{ > + libcamera::MutexLocker lock(mutex_); > + state_ = State::Flushing; > + lock.unlock(); > + > + cv_.notify_one(); > +} > diff --git a/src/android/camera_stream.h b/src/android/camera_stream.h > index f242336e..64e32c77 100644 > --- a/src/android/camera_stream.h > +++ b/src/android/camera_stream.h > @@ -7,12 +7,16 @@ > #ifndef __ANDROID_CAMERA_STREAM_H__ > #define __ANDROID_CAMERA_STREAM_H__ > > +#include <condition_variable> > #include <memory> > #include <mutex> > +#include <queue> > #include <vector> > > #include <hardware/camera3.h> > > +#include <libcamera/base/thread.h> > + > #include <libcamera/camera.h> > #include <libcamera/framebuffer.h> > #include <libcamera/framebuffer_allocator.h> > @@ -20,9 +24,9 @@ > #include <libcamera/pixel_format.h> > > #include "camera_request.h" > +#include "post_processor.h" > > class CameraDevice; > -class PostProcessor; > > class CameraStream > { > @@ -126,8 +130,38 @@ public: > Camera3RequestDescriptor *request); > libcamera::FrameBuffer *getBuffer(); > void putBuffer(libcamera::FrameBuffer *buffer); > + void flush(); > > private: > + class PostProcessorWorker : public libcamera::Thread > + { > + public: > + enum class State { > + Stopped, > + Running, > + Flushing, > + }; > + > + PostProcessorWorker(PostProcessor *postProcessor); > + ~PostProcessorWorker(); > + > + void start(); > + void queueRequest(Camera3RequestDescriptor::StreamBuffer *request); > + void flush(); > + > + protected: > + void run() override; > + > + private: > + PostProcessor *postProcessor_; > + > + libcamera::Mutex mutex_; > + std::condition_variable cv_; > + > + std::queue<Camera3RequestDescriptor::StreamBuffer *> requests_; > + State state_; > + }; > + > int waitFence(int fence); > > CameraDevice *const cameraDevice_; > @@ -144,6 +178,8 @@ private: > */ > std::unique_ptr<std::mutex> mutex_; > std::unique_ptr<PostProcessor> postProcessor_; > + > + std::unique_ptr<PostProcessorWorker> worker_; > }; > > #endif /* __ANDROID_CAMERA_STREAM__ */
Hi Laurent, On 10/21/21 7:03 AM, Laurent Pinchart wrote: > Hi Umang, > > Thank you for the patch. > > On Wed, Oct 20, 2021 at 04:12:11PM +0530, Umang Jain wrote: >> Introduce a dedicated worker class derived from libcamera::Thread. >> The worker class maintains a queue for post-processing requests >> and waits for a post-processing request to become available. >> It will process them as per FIFO before de-queuing it from the >> queue. >> >> To get access to the source and destination buffers in the worker >> thread, we also need to save a pointer to them in the > s/a pointer/pointers/ > >> Camera3RequestDescriptor. >> >> This patch also implements a flush() for the PostProcessorWorker >> class which is responsible to purge post-processing requests >> queued up while a camera is stopping/flushing. It is hooked with >> CameraStream::flush(), which isn't used currently but will be >> used when we handle flush/stop scnearios in greater detail >> subsequently (in a different patchset). >> >> The libcamera request completion handler CameraDevice::requestComplete() >> assumes that the request that has just completed is at the front of the >> queue. Now that the post-processor runs asynchronously, this isn't true >> anymore, a request being post-processed will stay in the queue and a new >> libcamera request may complete. Remove that assumption, and use the >> request cookie to obtain the Camera3RequestDescriptor. >> >> Signed-off-by: Umang Jain <umang.jain@ideasonboard.com> >> Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com> >> --- >> src/android/camera_device.cpp | 46 +++++---------- >> src/android/camera_request.h | 4 ++ >> src/android/camera_stream.cpp | 107 +++++++++++++++++++++++++++++++--- >> src/android/camera_stream.h | 38 +++++++++++- >> 4 files changed, 156 insertions(+), 39 deletions(-) >> >> diff --git a/src/android/camera_device.cpp b/src/android/camera_device.cpp >> index 541c2c81..b14416ce 100644 >> --- a/src/android/camera_device.cpp >> +++ b/src/android/camera_device.cpp >> @@ -1020,29 +1020,8 @@ int CameraDevice::processCaptureRequest(camera3_capture_request_t *camera3Reques >> >> void CameraDevice::requestComplete(Request *request) >> { >> - Camera3RequestDescriptor *descriptor; >> - { >> - MutexLocker descriptorsLock(descriptorsMutex_); >> - ASSERT(!descriptors_.empty()); >> - descriptor = descriptors_.front().get(); >> - } >> - >> - if (descriptor->request_->cookie() != request->cookie()) { >> - /* >> - * \todo Clarify if the Camera has to be closed on >> - * ERROR_DEVICE. >> - */ >> - LOG(HAL, Error) >> - << "Out-of-order completion for request " >> - << utils::hex(request->cookie()); >> - >> - MutexLocker descriptorsLock(descriptorsMutex_); >> - descriptors_.pop(); >> - >> - notifyError(0, nullptr, CAMERA3_MSG_ERROR_DEVICE); >> - >> - return; >> - } >> + Camera3RequestDescriptor *descriptor = >> + reinterpret_cast<Camera3RequestDescriptor *>(request->cookie()); >> >> /* >> * Prepare the capture result for the Android camera stack. >> @@ -1120,12 +1099,7 @@ void CameraDevice::requestComplete(Request *request) >> bool needsPostProcessing = false; >> Camera3RequestDescriptor::Status processingStatus = >> Camera3RequestDescriptor::Status::Pending; >> - /* >> - * \todo Apply streamsProcessMutex_ when post-processing is adapted to run >> - * asynchronously. If we introduce the streamsProcessMutex_ right away, the >> - * lock will be held forever since it is synchronous at this point >> - * (see streamProcessingComplete). >> - */ >> + >> for (auto &buffer : descriptor->buffers_) { >> CameraStream *stream = buffer.stream; >> >> @@ -1145,7 +1119,13 @@ void CameraDevice::requestComplete(Request *request) >> buffer.internalBuffer = src; >> >> needsPostProcessing = true; >> - int ret = stream->process(*src, buffer, descriptor); >> + >> + int ret; >> + { >> + MutexLocker locker(descriptor->streamsProcessMutex_); > It may be possible (see my review of 2/4) to only lock access to the > data structures and not cover the process() call. Let's see how it turns > out. Yes, I need to draft the code and then only I'll be able to confirm. My deep-code visual skills are kinda limited. > >> + ret = stream->process(*src, buffer, descriptor); >> + } >> + >> if (ret) { >> setBufferStatus(stream, buffer, descriptor, >> Camera3RequestDescriptor::Status::Error); >> @@ -1156,6 +1136,12 @@ void CameraDevice::requestComplete(Request *request) >> if (needsPostProcessing) { >> if (processingStatus == Camera3RequestDescriptor::Status::Error) { >> descriptor->status_ = processingStatus; >> + /* >> + * \todo This is problematic when some streams are >> + * queued successfully, but some fail to get queued. >> + * We might end up with use-after-free situation for >> + * descriptor in streamProcessingComplete(). >> + */ > Hopefully this will be fixed in v6 of 2/4 :-) > >> sendCaptureResults(); >> } >> >> diff --git a/src/android/camera_request.h b/src/android/camera_request.h >> index 3a2774e0..85274414 100644 >> --- a/src/android/camera_request.h >> +++ b/src/android/camera_request.h >> @@ -18,6 +18,7 @@ >> >> #include <hardware/camera3.h> >> >> +#include "camera_buffer.h" >> #include "camera_metadata.h" >> #include "camera_worker.h" >> >> @@ -39,6 +40,9 @@ public: >> int fence; >> Status status; >> libcamera::FrameBuffer *internalBuffer = nullptr; >> + std::unique_ptr<CameraBuffer> destBuffer; >> + const libcamera::FrameBuffer *srcBuffer; >> + Camera3RequestDescriptor *request = nullptr; >> }; >> >> Camera3RequestDescriptor(libcamera::Camera *camera, >> diff --git a/src/android/camera_stream.cpp b/src/android/camera_stream.cpp >> index b3cb06a2..a29ce33b 100644 >> --- a/src/android/camera_stream.cpp >> +++ b/src/android/camera_stream.cpp >> @@ -99,6 +99,7 @@ int CameraStream::configure() >> if (ret) >> return ret; >> >> + worker_ = std::make_unique<PostProcessorWorker>(postProcessor_.get()); >> postProcessor_->processComplete.connect( >> this, [&](Camera3RequestDescriptor *request, PostProcessor::Status status) { >> Camera3RequestDescriptor::Status bufferStatus = >> @@ -110,6 +111,7 @@ int CameraStream::configure() >> cameraDevice_->streamProcessingComplete(this, request, >> bufferStatus); >> }); >> + worker_->start(); >> } >> >> if (type_ == Type::Internal) { >> @@ -179,23 +181,31 @@ int CameraStream::process(const FrameBuffer &source, >> if (!postProcessor_) >> return 0; >> >> - /* >> - * \todo Buffer mapping and processing should be moved to a >> - * separate thread. >> - */ >> const StreamConfiguration &output = configuration(); >> - CameraBuffer destBuffer(*dest.camera3Buffer, output.pixelFormat, >> - output.size, PROT_READ | PROT_WRITE); >> - if (!destBuffer.isValid()) { >> + dest.destBuffer = std::make_unique<CameraBuffer>( >> + *dest.camera3Buffer, output.pixelFormat, output.size, PROT_READ | PROT_WRITE); > dest.destBuffer = std::make_unique<CameraBuffer>( > *dest.camera3Buffer, output.pixelFormat, output.size, > PROT_READ | PROT_WRITE); > >> + if (!dest.destBuffer->isValid()) { >> LOG(HAL, Error) << "Failed to create destination buffer"; >> return -EINVAL; >> } >> >> - postProcessor_->process(source, &destBuffer, request); >> + dest.srcBuffer = &source; >> + dest.request = request; >> + >> + /* Push the postProcessor request to the worker queue. */ >> + worker_->queueRequest(&dest); >> >> return 0; >> } >> >> +void CameraStream::flush() >> +{ >> + if (!postProcessor_) >> + return; >> + >> + worker_->flush(); >> +} >> + >> FrameBuffer *CameraStream::getBuffer() >> { >> if (!allocator_) >> @@ -223,3 +233,84 @@ void CameraStream::putBuffer(FrameBuffer *buffer) >> >> buffers_.push_back(buffer); >> } >> + >> +CameraStream::PostProcessorWorker::PostProcessorWorker(PostProcessor *postProcessor) >> + : postProcessor_(postProcessor) >> +{ >> +} >> + >> +CameraStream::PostProcessorWorker::~PostProcessorWorker() >> +{ >> + { >> + libcamera::MutexLocker lock(mutex_); >> + state_ = State::Stopped; >> + } >> + >> + cv_.notify_one(); >> + wait(); >> +} >> + >> +void CameraStream::PostProcessorWorker::start() >> +{ >> + { >> + libcamera::MutexLocker lock(mutex_); > Could you add a > > ASSERT(state_ != State::Running); > > here ? I'm worried that a future refactoring will call > CameraStream::configure() multiple times, and that would result in the > worker being start multiple times too. Ack > >> + state_ = State::Running; >> + } >> + >> + Thread::start(); >> +} >> + >> +void CameraStream::PostProcessorWorker::queueRequest(Camera3RequestDescriptor::StreamBuffer *dest) >> +{ >> + { >> + MutexLocker lock(mutex_); >> + ASSERT(state_ == State::Running); >> + requests_.push(dest); >> + } >> + >> + cv_.notify_one(); >> +} >> + >> +void CameraStream::PostProcessorWorker::run() >> +{ >> + MutexLocker locker(mutex_); >> + >> + while (1) { >> + cv_.wait(locker, [&] { >> + return state_ != State::Running || !requests_.empty(); >> + }); >> + >> + if (state_ != State::Running) >> + break; >> + >> + Camera3RequestDescriptor::StreamBuffer *stream = requests_.front(); >> + requests_.pop(); >> + locker.unlock(); >> + >> + postProcessor_->process(*stream->srcBuffer, stream->destBuffer.get(), >> + stream->request); >> + >> + locker.lock(); >> + } >> + >> + if (state_ == State::Flushing) { >> + while (!requests_.empty()) { >> + postProcessor_->processComplete.emit( >> + requests_.front()->request, >> + PostProcessor::Status::Error); > I'm also a tiny bit concerned here that we're emitting the signal with > the lock held, while for requests that complete normally we don't. The > behaviour isn't consistent and could cause issues in the CameraStream or > CameraDevice classes. I think it's harmless for now, but maybe we could > drop the lock around the emit(à call ? One option to avoid dropping and > taking the lock in every iteration would be > > std::queue<Camera3RequestDescriptor::StreamBuffer *> requests = std::move(requests_); > > locker.unlock(); > > while (!requests.empty()) { > postProcessor_->processComplete.emit( > requests.front()->request, > PostProcessor::Status::Error); > requests.pop(); > } > > locker.lock(); > > And now that I think about it, given that we now have a StreamBuffer > object, wouldn't it be simpler to pass the StreamBuffer instead of the > Camera3RequestDescriptor to the signal ? Yesteday, Jacopo might have suggested the same. I'll try to accomodate this. >> + } >> + >> + state_ = State::Stopped; >> + locker.unlock(); > You can drop the unlock() call here. Ack. Shouldn't be here > >> + } >> +} >> + >> +void CameraStream::PostProcessorWorker::flush() >> +{ >> + libcamera::MutexLocker lock(mutex_); >> + state_ = State::Flushing; >> + lock.unlock(); >> + >> + cv_.notify_one(); >> +} >> diff --git a/src/android/camera_stream.h b/src/android/camera_stream.h >> index f242336e..64e32c77 100644 >> --- a/src/android/camera_stream.h >> +++ b/src/android/camera_stream.h >> @@ -7,12 +7,16 @@ >> #ifndef __ANDROID_CAMERA_STREAM_H__ >> #define __ANDROID_CAMERA_STREAM_H__ >> >> +#include <condition_variable> >> #include <memory> >> #include <mutex> >> +#include <queue> >> #include <vector> >> >> #include <hardware/camera3.h> >> >> +#include <libcamera/base/thread.h> >> + >> #include <libcamera/camera.h> >> #include <libcamera/framebuffer.h> >> #include <libcamera/framebuffer_allocator.h> >> @@ -20,9 +24,9 @@ >> #include <libcamera/pixel_format.h> >> >> #include "camera_request.h" >> +#include "post_processor.h" >> >> class CameraDevice; >> -class PostProcessor; >> >> class CameraStream >> { >> @@ -126,8 +130,38 @@ public: >> Camera3RequestDescriptor *request); >> libcamera::FrameBuffer *getBuffer(); >> void putBuffer(libcamera::FrameBuffer *buffer); >> + void flush(); >> >> private: >> + class PostProcessorWorker : public libcamera::Thread >> + { >> + public: >> + enum class State { >> + Stopped, >> + Running, >> + Flushing, >> + }; >> + >> + PostProcessorWorker(PostProcessor *postProcessor); >> + ~PostProcessorWorker(); >> + >> + void start(); >> + void queueRequest(Camera3RequestDescriptor::StreamBuffer *request); >> + void flush(); >> + >> + protected: >> + void run() override; >> + >> + private: >> + PostProcessor *postProcessor_; >> + >> + libcamera::Mutex mutex_; >> + std::condition_variable cv_; >> + >> + std::queue<Camera3RequestDescriptor::StreamBuffer *> requests_; >> + State state_; >> + }; >> + >> int waitFence(int fence); >> >> CameraDevice *const cameraDevice_; >> @@ -144,6 +178,8 @@ private: >> */ >> std::unique_ptr<std::mutex> mutex_; >> std::unique_ptr<PostProcessor> postProcessor_; >> + >> + std::unique_ptr<PostProcessorWorker> worker_; >> }; >> >> #endif /* __ANDROID_CAMERA_STREAM__ */
Hello, On 10/20/21 4:12 PM, Umang Jain wrote: > Introduce a dedicated worker class derived from libcamera::Thread. > The worker class maintains a queue for post-processing requests > and waits for a post-processing request to become available. > It will process them as per FIFO before de-queuing it from the > queue. > > To get access to the source and destination buffers in the worker > thread, we also need to save a pointer to them in the > Camera3RequestDescriptor. > > This patch also implements a flush() for the PostProcessorWorker > class which is responsible to purge post-processing requests > queued up while a camera is stopping/flushing. It is hooked with > CameraStream::flush(), which isn't used currently but will be > used when we handle flush/stop scnearios in greater detail > subsequently (in a different patchset). > > The libcamera request completion handler CameraDevice::requestComplete() > assumes that the request that has just completed is at the front of the > queue. Now that the post-processor runs asynchronously, this isn't true > anymore, a request being post-processed will stay in the queue and a new > libcamera request may complete. Remove that assumption, and use the > request cookie to obtain the Camera3RequestDescriptor. > > Signed-off-by: Umang Jain <umang.jain@ideasonboard.com> > Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com> > --- > src/android/camera_device.cpp | 46 +++++---------- > src/android/camera_request.h | 4 ++ > src/android/camera_stream.cpp | 107 +++++++++++++++++++++++++++++++--- > src/android/camera_stream.h | 38 +++++++++++- > 4 files changed, 156 insertions(+), 39 deletions(-) > > diff --git a/src/android/camera_device.cpp b/src/android/camera_device.cpp > index 541c2c81..b14416ce 100644 > --- a/src/android/camera_device.cpp > +++ b/src/android/camera_device.cpp > @@ -1020,29 +1020,8 @@ int CameraDevice::processCaptureRequest(camera3_capture_request_t *camera3Reques > > void CameraDevice::requestComplete(Request *request) > { > - Camera3RequestDescriptor *descriptor; > - { > - MutexLocker descriptorsLock(descriptorsMutex_); > - ASSERT(!descriptors_.empty()); > - descriptor = descriptors_.front().get(); > - } > - > - if (descriptor->request_->cookie() != request->cookie()) { > - /* > - * \todo Clarify if the Camera has to be closed on > - * ERROR_DEVICE. > - */ > - LOG(HAL, Error) > - << "Out-of-order completion for request " > - << utils::hex(request->cookie()); > - > - MutexLocker descriptorsLock(descriptorsMutex_); > - descriptors_.pop(); > - > - notifyError(0, nullptr, CAMERA3_MSG_ERROR_DEVICE); > - > - return; > - } > + Camera3RequestDescriptor *descriptor = > + reinterpret_cast<Camera3RequestDescriptor *>(request->cookie()); > > /* > * Prepare the capture result for the Android camera stack. > @@ -1120,12 +1099,7 @@ void CameraDevice::requestComplete(Request *request) > bool needsPostProcessing = false; > Camera3RequestDescriptor::Status processingStatus = > Camera3RequestDescriptor::Status::Pending; > - /* > - * \todo Apply streamsProcessMutex_ when post-processing is adapted to run > - * asynchronously. If we introduce the streamsProcessMutex_ right away, the > - * lock will be held forever since it is synchronous at this point > - * (see streamProcessingComplete). > - */ > + > for (auto &buffer : descriptor->buffers_) { > CameraStream *stream = buffer.stream; > > @@ -1145,7 +1119,13 @@ void CameraDevice::requestComplete(Request *request) > buffer.internalBuffer = src; > > needsPostProcessing = true; > - int ret = stream->process(*src, buffer, descriptor); > + > + int ret; > + { > + MutexLocker locker(descriptor->streamsProcessMutex_); > + ret = stream->process(*src, buffer, descriptor); > + } > + > if (ret) { > setBufferStatus(stream, buffer, descriptor, > Camera3RequestDescriptor::Status::Error); > @@ -1156,6 +1136,12 @@ void CameraDevice::requestComplete(Request *request) > if (needsPostProcessing) { > if (processingStatus == Camera3RequestDescriptor::Status::Error) { > descriptor->status_ = processingStatus; > + /* > + * \todo This is problematic when some streams are > + * queued successfully, but some fail to get queued. > + * We might end up with use-after-free situation for > + * descriptor in streamProcessingComplete(). > + */ > sendCaptureResults(); > } > > diff --git a/src/android/camera_request.h b/src/android/camera_request.h > index 3a2774e0..85274414 100644 > --- a/src/android/camera_request.h > +++ b/src/android/camera_request.h > @@ -18,6 +18,7 @@ > > #include <hardware/camera3.h> > > +#include "camera_buffer.h" > #include "camera_metadata.h" > #include "camera_worker.h" > > @@ -39,6 +40,9 @@ public: > int fence; > Status status; > libcamera::FrameBuffer *internalBuffer = nullptr; > + std::unique_ptr<CameraBuffer> destBuffer; > + const libcamera::FrameBuffer *srcBuffer; > + Camera3RequestDescriptor *request = nullptr; > }; > > Camera3RequestDescriptor(libcamera::Camera *camera, > diff --git a/src/android/camera_stream.cpp b/src/android/camera_stream.cpp > index b3cb06a2..a29ce33b 100644 > --- a/src/android/camera_stream.cpp > +++ b/src/android/camera_stream.cpp > @@ -99,6 +99,7 @@ int CameraStream::configure() > if (ret) > return ret; > > + worker_ = std::make_unique<PostProcessorWorker>(postProcessor_.get()); > postProcessor_->processComplete.connect( > this, [&](Camera3RequestDescriptor *request, PostProcessor::Status status) { > Camera3RequestDescriptor::Status bufferStatus = > @@ -110,6 +111,7 @@ int CameraStream::configure() > cameraDevice_->streamProcessingComplete(this, request, > bufferStatus); > }); > + worker_->start(); > } > > if (type_ == Type::Internal) { > @@ -179,23 +181,31 @@ int CameraStream::process(const FrameBuffer &source, > if (!postProcessor_) > return 0; > > - /* > - * \todo Buffer mapping and processing should be moved to a > - * separate thread. > - */ > const StreamConfiguration &output = configuration(); > - CameraBuffer destBuffer(*dest.camera3Buffer, output.pixelFormat, > - output.size, PROT_READ | PROT_WRITE); > - if (!destBuffer.isValid()) { > + dest.destBuffer = std::make_unique<CameraBuffer>( > + *dest.camera3Buffer, output.pixelFormat, output.size, PROT_READ | PROT_WRITE); > + if (!dest.destBuffer->isValid()) { > LOG(HAL, Error) << "Failed to create destination buffer"; > return -EINVAL; > } > > - postProcessor_->process(source, &destBuffer, request); > + dest.srcBuffer = &source; > + dest.request = request; > + > + /* Push the postProcessor request to the worker queue. */ > + worker_->queueRequest(&dest); > > return 0; > } > > +void CameraStream::flush() > +{ > + if (!postProcessor_) > + return; > + > + worker_->flush(); > +} > + > FrameBuffer *CameraStream::getBuffer() > { > if (!allocator_) > @@ -223,3 +233,84 @@ void CameraStream::putBuffer(FrameBuffer *buffer) > > buffers_.push_back(buffer); > } > + > +CameraStream::PostProcessorWorker::PostProcessorWorker(PostProcessor *postProcessor) > + : postProcessor_(postProcessor) > +{ > +} > + > +CameraStream::PostProcessorWorker::~PostProcessorWorker() > +{ > + { > + libcamera::MutexLocker lock(mutex_); > + state_ = State::Stopped; > + } > + > + cv_.notify_one(); > + wait(); > +} > + > +void CameraStream::PostProcessorWorker::start() > +{ > + { > + libcamera::MutexLocker lock(mutex_); > + state_ = State::Running; > + } > + > + Thread::start(); > +} > + > +void CameraStream::PostProcessorWorker::queueRequest(Camera3RequestDescriptor::StreamBuffer *dest) > +{ > + { > + MutexLocker lock(mutex_); > + ASSERT(state_ == State::Running); > + requests_.push(dest); > + } > + > + cv_.notify_one(); > +} > + > +void CameraStream::PostProcessorWorker::run() > +{ > + MutexLocker locker(mutex_); > + > + while (1) { > + cv_.wait(locker, [&] { > + return state_ != State::Running || !requests_.empty(); > + }); > + > + if (state_ != State::Running) > + break; > + > + Camera3RequestDescriptor::StreamBuffer *stream = requests_.front(); > + requests_.pop(); > + locker.unlock(); > + > + postProcessor_->process(*stream->srcBuffer, stream->destBuffer.get(), > + stream->request); As I have received in reviews, processComplete should emit a Camera3RequestDescriptor::StreamBuffer pointer, instead of having emitted currently a pointer to Camera3RequestDescriptor Now looking postProcessor_->process() call above, I think I should further migrate its signature to: PostProcessor::process(Camera3RequestDescriptor::StreamBuffer *stream) and all the three elements can be accessed from there. Laurent, do you agree with the design decision? > + > + locker.lock(); > + } > + > + if (state_ == State::Flushing) { > + while (!requests_.empty()) { > + postProcessor_->processComplete.emit( > + requests_.front()->request, > + PostProcessor::Status::Error); > + requests_.pop(); > + } > + > + state_ = State::Stopped; > + locker.unlock(); > + } > +} > + > +void CameraStream::PostProcessorWorker::flush() > +{ > + libcamera::MutexLocker lock(mutex_); > + state_ = State::Flushing; > + lock.unlock(); > + > + cv_.notify_one(); > +} > diff --git a/src/android/camera_stream.h b/src/android/camera_stream.h > index f242336e..64e32c77 100644 > --- a/src/android/camera_stream.h > +++ b/src/android/camera_stream.h > @@ -7,12 +7,16 @@ > #ifndef __ANDROID_CAMERA_STREAM_H__ > #define __ANDROID_CAMERA_STREAM_H__ > > +#include <condition_variable> > #include <memory> > #include <mutex> > +#include <queue> > #include <vector> > > #include <hardware/camera3.h> > > +#include <libcamera/base/thread.h> > + > #include <libcamera/camera.h> > #include <libcamera/framebuffer.h> > #include <libcamera/framebuffer_allocator.h> > @@ -20,9 +24,9 @@ > #include <libcamera/pixel_format.h> > > #include "camera_request.h" > +#include "post_processor.h" > > class CameraDevice; > -class PostProcessor; > > class CameraStream > { > @@ -126,8 +130,38 @@ public: > Camera3RequestDescriptor *request); > libcamera::FrameBuffer *getBuffer(); > void putBuffer(libcamera::FrameBuffer *buffer); > + void flush(); > > private: > + class PostProcessorWorker : public libcamera::Thread > + { > + public: > + enum class State { > + Stopped, > + Running, > + Flushing, > + }; > + > + PostProcessorWorker(PostProcessor *postProcessor); > + ~PostProcessorWorker(); > + > + void start(); > + void queueRequest(Camera3RequestDescriptor::StreamBuffer *request); > + void flush(); > + > + protected: > + void run() override; > + > + private: > + PostProcessor *postProcessor_; > + > + libcamera::Mutex mutex_; > + std::condition_variable cv_; > + > + std::queue<Camera3RequestDescriptor::StreamBuffer *> requests_; > + State state_; > + }; > + > int waitFence(int fence); > > CameraDevice *const cameraDevice_; > @@ -144,6 +178,8 @@ private: > */ > std::unique_ptr<std::mutex> mutex_; > std::unique_ptr<PostProcessor> postProcessor_; > + > + std::unique_ptr<PostProcessorWorker> worker_; > }; > > #endif /* __ANDROID_CAMERA_STREAM__ */
Hi Umang, On Thu, Oct 21, 2021 at 01:07:50PM +0530, Umang Jain wrote: > On 10/20/21 4:12 PM, Umang Jain wrote: > > Introduce a dedicated worker class derived from libcamera::Thread. > > The worker class maintains a queue for post-processing requests > > and waits for a post-processing request to become available. > > It will process them as per FIFO before de-queuing it from the > > queue. > > > > To get access to the source and destination buffers in the worker > > thread, we also need to save a pointer to them in the > > Camera3RequestDescriptor. > > > > This patch also implements a flush() for the PostProcessorWorker > > class which is responsible to purge post-processing requests > > queued up while a camera is stopping/flushing. It is hooked with > > CameraStream::flush(), which isn't used currently but will be > > used when we handle flush/stop scnearios in greater detail > > subsequently (in a different patchset). > > > > The libcamera request completion handler CameraDevice::requestComplete() > > assumes that the request that has just completed is at the front of the > > queue. Now that the post-processor runs asynchronously, this isn't true > > anymore, a request being post-processed will stay in the queue and a new > > libcamera request may complete. Remove that assumption, and use the > > request cookie to obtain the Camera3RequestDescriptor. > > > > Signed-off-by: Umang Jain <umang.jain@ideasonboard.com> > > Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com> > > --- > > src/android/camera_device.cpp | 46 +++++---------- > > src/android/camera_request.h | 4 ++ > > src/android/camera_stream.cpp | 107 +++++++++++++++++++++++++++++++--- > > src/android/camera_stream.h | 38 +++++++++++- > > 4 files changed, 156 insertions(+), 39 deletions(-) > > > > diff --git a/src/android/camera_device.cpp b/src/android/camera_device.cpp > > index 541c2c81..b14416ce 100644 > > --- a/src/android/camera_device.cpp > > +++ b/src/android/camera_device.cpp > > @@ -1020,29 +1020,8 @@ int CameraDevice::processCaptureRequest(camera3_capture_request_t *camera3Reques > > > > void CameraDevice::requestComplete(Request *request) > > { > > - Camera3RequestDescriptor *descriptor; > > - { > > - MutexLocker descriptorsLock(descriptorsMutex_); > > - ASSERT(!descriptors_.empty()); > > - descriptor = descriptors_.front().get(); > > - } > > - > > - if (descriptor->request_->cookie() != request->cookie()) { > > - /* > > - * \todo Clarify if the Camera has to be closed on > > - * ERROR_DEVICE. > > - */ > > - LOG(HAL, Error) > > - << "Out-of-order completion for request " > > - << utils::hex(request->cookie()); > > - > > - MutexLocker descriptorsLock(descriptorsMutex_); > > - descriptors_.pop(); > > - > > - notifyError(0, nullptr, CAMERA3_MSG_ERROR_DEVICE); > > - > > - return; > > - } > > + Camera3RequestDescriptor *descriptor = > > + reinterpret_cast<Camera3RequestDescriptor *>(request->cookie()); > > > > /* > > * Prepare the capture result for the Android camera stack. > > @@ -1120,12 +1099,7 @@ void CameraDevice::requestComplete(Request *request) > > bool needsPostProcessing = false; > > Camera3RequestDescriptor::Status processingStatus = > > Camera3RequestDescriptor::Status::Pending; > > - /* > > - * \todo Apply streamsProcessMutex_ when post-processing is adapted to run > > - * asynchronously. If we introduce the streamsProcessMutex_ right away, the > > - * lock will be held forever since it is synchronous at this point > > - * (see streamProcessingComplete). > > - */ > > + > > for (auto &buffer : descriptor->buffers_) { > > CameraStream *stream = buffer.stream; > > > > @@ -1145,7 +1119,13 @@ void CameraDevice::requestComplete(Request *request) > > buffer.internalBuffer = src; > > > > needsPostProcessing = true; > > - int ret = stream->process(*src, buffer, descriptor); > > + > > + int ret; > > + { > > + MutexLocker locker(descriptor->streamsProcessMutex_); > > + ret = stream->process(*src, buffer, descriptor); > > + } > > + > > if (ret) { > > setBufferStatus(stream, buffer, descriptor, > > Camera3RequestDescriptor::Status::Error); > > @@ -1156,6 +1136,12 @@ void CameraDevice::requestComplete(Request *request) > > if (needsPostProcessing) { > > if (processingStatus == Camera3RequestDescriptor::Status::Error) { > > descriptor->status_ = processingStatus; > > + /* > > + * \todo This is problematic when some streams are > > + * queued successfully, but some fail to get queued. > > + * We might end up with use-after-free situation for > > + * descriptor in streamProcessingComplete(). > > + */ > > sendCaptureResults(); > > } > > > > diff --git a/src/android/camera_request.h b/src/android/camera_request.h > > index 3a2774e0..85274414 100644 > > --- a/src/android/camera_request.h > > +++ b/src/android/camera_request.h > > @@ -18,6 +18,7 @@ > > > > #include <hardware/camera3.h> > > > > +#include "camera_buffer.h" > > #include "camera_metadata.h" > > #include "camera_worker.h" > > > > @@ -39,6 +40,9 @@ public: > > int fence; > > Status status; > > libcamera::FrameBuffer *internalBuffer = nullptr; > > + std::unique_ptr<CameraBuffer> destBuffer; > > + const libcamera::FrameBuffer *srcBuffer; > > + Camera3RequestDescriptor *request = nullptr; > > }; > > > > Camera3RequestDescriptor(libcamera::Camera *camera, > > diff --git a/src/android/camera_stream.cpp b/src/android/camera_stream.cpp > > index b3cb06a2..a29ce33b 100644 > > --- a/src/android/camera_stream.cpp > > +++ b/src/android/camera_stream.cpp > > @@ -99,6 +99,7 @@ int CameraStream::configure() > > if (ret) > > return ret; > > > > + worker_ = std::make_unique<PostProcessorWorker>(postProcessor_.get()); > > postProcessor_->processComplete.connect( > > this, [&](Camera3RequestDescriptor *request, PostProcessor::Status status) { > > Camera3RequestDescriptor::Status bufferStatus = > > @@ -110,6 +111,7 @@ int CameraStream::configure() > > cameraDevice_->streamProcessingComplete(this, request, > > bufferStatus); > > }); > > + worker_->start(); > > } > > > > if (type_ == Type::Internal) { > > @@ -179,23 +181,31 @@ int CameraStream::process(const FrameBuffer &source, > > if (!postProcessor_) > > return 0; > > > > - /* > > - * \todo Buffer mapping and processing should be moved to a > > - * separate thread. > > - */ > > const StreamConfiguration &output = configuration(); > > - CameraBuffer destBuffer(*dest.camera3Buffer, output.pixelFormat, > > - output.size, PROT_READ | PROT_WRITE); > > - if (!destBuffer.isValid()) { > > + dest.destBuffer = std::make_unique<CameraBuffer>( > > + *dest.camera3Buffer, output.pixelFormat, output.size, PROT_READ | PROT_WRITE); > > + if (!dest.destBuffer->isValid()) { > > LOG(HAL, Error) << "Failed to create destination buffer"; > > return -EINVAL; > > } > > > > - postProcessor_->process(source, &destBuffer, request); > > + dest.srcBuffer = &source; > > + dest.request = request; > > + > > + /* Push the postProcessor request to the worker queue. */ > > + worker_->queueRequest(&dest); > > > > return 0; > > } > > > > +void CameraStream::flush() > > +{ > > + if (!postProcessor_) > > + return; > > + > > + worker_->flush(); > > +} > > + > > FrameBuffer *CameraStream::getBuffer() > > { > > if (!allocator_) > > @@ -223,3 +233,84 @@ void CameraStream::putBuffer(FrameBuffer *buffer) > > > > buffers_.push_back(buffer); > > } > > + > > +CameraStream::PostProcessorWorker::PostProcessorWorker(PostProcessor *postProcessor) > > + : postProcessor_(postProcessor) > > +{ > > +} > > + > > +CameraStream::PostProcessorWorker::~PostProcessorWorker() > > +{ > > + { > > + libcamera::MutexLocker lock(mutex_); > > + state_ = State::Stopped; > > + } > > + > > + cv_.notify_one(); > > + wait(); > > +} > > + > > +void CameraStream::PostProcessorWorker::start() > > +{ > > + { > > + libcamera::MutexLocker lock(mutex_); > > + state_ = State::Running; > > + } > > + > > + Thread::start(); > > +} > > + > > +void CameraStream::PostProcessorWorker::queueRequest(Camera3RequestDescriptor::StreamBuffer *dest) > > +{ > > + { > > + MutexLocker lock(mutex_); > > + ASSERT(state_ == State::Running); > > + requests_.push(dest); > > + } > > + > > + cv_.notify_one(); > > +} > > + > > +void CameraStream::PostProcessorWorker::run() > > +{ > > + MutexLocker locker(mutex_); > > + > > + while (1) { > > + cv_.wait(locker, [&] { > > + return state_ != State::Running || !requests_.empty(); > > + }); > > + > > + if (state_ != State::Running) > > + break; > > + > > + Camera3RequestDescriptor::StreamBuffer *stream = requests_.front(); > > + requests_.pop(); > > + locker.unlock(); > > + > > + postProcessor_->process(*stream->srcBuffer, stream->destBuffer.get(), > > + stream->request); > > As I have received in reviews, processComplete should emit a > Camera3RequestDescriptor::StreamBuffer pointer, instead of having > emitted currently a pointer to Camera3RequestDescriptor > > Now looking postProcessor_->process() call above, I think I should > further migrate its signature to: > > PostProcessor::process(Camera3RequestDescriptor::StreamBuffer *stream) > > and all the three elements can be accessed from there. > > Laurent, do you agree with the design decision? Yes, that looks good to me. Maybe the variable could also be renamed to streamBuffer here, to avoid confusing it with a stream. > > + > > + locker.lock(); > > + } > > + > > + if (state_ == State::Flushing) { > > + while (!requests_.empty()) { > > + postProcessor_->processComplete.emit( > > + requests_.front()->request, > > + PostProcessor::Status::Error); > > + requests_.pop(); > > + } > > + > > + state_ = State::Stopped; > > + locker.unlock(); > > + } > > +} > > + > > +void CameraStream::PostProcessorWorker::flush() > > +{ > > + libcamera::MutexLocker lock(mutex_); > > + state_ = State::Flushing; > > + lock.unlock(); > > + > > + cv_.notify_one(); > > +} > > diff --git a/src/android/camera_stream.h b/src/android/camera_stream.h > > index f242336e..64e32c77 100644 > > --- a/src/android/camera_stream.h > > +++ b/src/android/camera_stream.h > > @@ -7,12 +7,16 @@ > > #ifndef __ANDROID_CAMERA_STREAM_H__ > > #define __ANDROID_CAMERA_STREAM_H__ > > > > +#include <condition_variable> > > #include <memory> > > #include <mutex> > > +#include <queue> > > #include <vector> > > > > #include <hardware/camera3.h> > > > > +#include <libcamera/base/thread.h> > > + > > #include <libcamera/camera.h> > > #include <libcamera/framebuffer.h> > > #include <libcamera/framebuffer_allocator.h> > > @@ -20,9 +24,9 @@ > > #include <libcamera/pixel_format.h> > > > > #include "camera_request.h" > > +#include "post_processor.h" > > > > class CameraDevice; > > -class PostProcessor; > > > > class CameraStream > > { > > @@ -126,8 +130,38 @@ public: > > Camera3RequestDescriptor *request); > > libcamera::FrameBuffer *getBuffer(); > > void putBuffer(libcamera::FrameBuffer *buffer); > > + void flush(); > > > > private: > > + class PostProcessorWorker : public libcamera::Thread > > + { > > + public: > > + enum class State { > > + Stopped, > > + Running, > > + Flushing, > > + }; > > + > > + PostProcessorWorker(PostProcessor *postProcessor); > > + ~PostProcessorWorker(); > > + > > + void start(); > > + void queueRequest(Camera3RequestDescriptor::StreamBuffer *request); > > + void flush(); > > + > > + protected: > > + void run() override; > > + > > + private: > > + PostProcessor *postProcessor_; > > + > > + libcamera::Mutex mutex_; > > + std::condition_variable cv_; > > + > > + std::queue<Camera3RequestDescriptor::StreamBuffer *> requests_; > > + State state_; > > + }; > > + > > int waitFence(int fence); > > > > CameraDevice *const cameraDevice_; > > @@ -144,6 +178,8 @@ private: > > */ > > std::unique_ptr<std::mutex> mutex_; > > std::unique_ptr<PostProcessor> postProcessor_; > > + > > + std::unique_ptr<PostProcessorWorker> worker_; > > }; > > > > #endif /* __ANDROID_CAMERA_STREAM__ */
diff --git a/src/android/camera_device.cpp b/src/android/camera_device.cpp index 541c2c81..b14416ce 100644 --- a/src/android/camera_device.cpp +++ b/src/android/camera_device.cpp @@ -1020,29 +1020,8 @@ int CameraDevice::processCaptureRequest(camera3_capture_request_t *camera3Reques void CameraDevice::requestComplete(Request *request) { - Camera3RequestDescriptor *descriptor; - { - MutexLocker descriptorsLock(descriptorsMutex_); - ASSERT(!descriptors_.empty()); - descriptor = descriptors_.front().get(); - } - - if (descriptor->request_->cookie() != request->cookie()) { - /* - * \todo Clarify if the Camera has to be closed on - * ERROR_DEVICE. - */ - LOG(HAL, Error) - << "Out-of-order completion for request " - << utils::hex(request->cookie()); - - MutexLocker descriptorsLock(descriptorsMutex_); - descriptors_.pop(); - - notifyError(0, nullptr, CAMERA3_MSG_ERROR_DEVICE); - - return; - } + Camera3RequestDescriptor *descriptor = + reinterpret_cast<Camera3RequestDescriptor *>(request->cookie()); /* * Prepare the capture result for the Android camera stack. @@ -1120,12 +1099,7 @@ void CameraDevice::requestComplete(Request *request) bool needsPostProcessing = false; Camera3RequestDescriptor::Status processingStatus = Camera3RequestDescriptor::Status::Pending; - /* - * \todo Apply streamsProcessMutex_ when post-processing is adapted to run - * asynchronously. If we introduce the streamsProcessMutex_ right away, the - * lock will be held forever since it is synchronous at this point - * (see streamProcessingComplete). - */ + for (auto &buffer : descriptor->buffers_) { CameraStream *stream = buffer.stream; @@ -1145,7 +1119,13 @@ void CameraDevice::requestComplete(Request *request) buffer.internalBuffer = src; needsPostProcessing = true; - int ret = stream->process(*src, buffer, descriptor); + + int ret; + { + MutexLocker locker(descriptor->streamsProcessMutex_); + ret = stream->process(*src, buffer, descriptor); + } + if (ret) { setBufferStatus(stream, buffer, descriptor, Camera3RequestDescriptor::Status::Error); @@ -1156,6 +1136,12 @@ void CameraDevice::requestComplete(Request *request) if (needsPostProcessing) { if (processingStatus == Camera3RequestDescriptor::Status::Error) { descriptor->status_ = processingStatus; + /* + * \todo This is problematic when some streams are + * queued successfully, but some fail to get queued. + * We might end up with use-after-free situation for + * descriptor in streamProcessingComplete(). + */ sendCaptureResults(); } diff --git a/src/android/camera_request.h b/src/android/camera_request.h index 3a2774e0..85274414 100644 --- a/src/android/camera_request.h +++ b/src/android/camera_request.h @@ -18,6 +18,7 @@ #include <hardware/camera3.h> +#include "camera_buffer.h" #include "camera_metadata.h" #include "camera_worker.h" @@ -39,6 +40,9 @@ public: int fence; Status status; libcamera::FrameBuffer *internalBuffer = nullptr; + std::unique_ptr<CameraBuffer> destBuffer; + const libcamera::FrameBuffer *srcBuffer; + Camera3RequestDescriptor *request = nullptr; }; Camera3RequestDescriptor(libcamera::Camera *camera, diff --git a/src/android/camera_stream.cpp b/src/android/camera_stream.cpp index b3cb06a2..a29ce33b 100644 --- a/src/android/camera_stream.cpp +++ b/src/android/camera_stream.cpp @@ -99,6 +99,7 @@ int CameraStream::configure() if (ret) return ret; + worker_ = std::make_unique<PostProcessorWorker>(postProcessor_.get()); postProcessor_->processComplete.connect( this, [&](Camera3RequestDescriptor *request, PostProcessor::Status status) { Camera3RequestDescriptor::Status bufferStatus = @@ -110,6 +111,7 @@ int CameraStream::configure() cameraDevice_->streamProcessingComplete(this, request, bufferStatus); }); + worker_->start(); } if (type_ == Type::Internal) { @@ -179,23 +181,31 @@ int CameraStream::process(const FrameBuffer &source, if (!postProcessor_) return 0; - /* - * \todo Buffer mapping and processing should be moved to a - * separate thread. - */ const StreamConfiguration &output = configuration(); - CameraBuffer destBuffer(*dest.camera3Buffer, output.pixelFormat, - output.size, PROT_READ | PROT_WRITE); - if (!destBuffer.isValid()) { + dest.destBuffer = std::make_unique<CameraBuffer>( + *dest.camera3Buffer, output.pixelFormat, output.size, PROT_READ | PROT_WRITE); + if (!dest.destBuffer->isValid()) { LOG(HAL, Error) << "Failed to create destination buffer"; return -EINVAL; } - postProcessor_->process(source, &destBuffer, request); + dest.srcBuffer = &source; + dest.request = request; + + /* Push the postProcessor request to the worker queue. */ + worker_->queueRequest(&dest); return 0; } +void CameraStream::flush() +{ + if (!postProcessor_) + return; + + worker_->flush(); +} + FrameBuffer *CameraStream::getBuffer() { if (!allocator_) @@ -223,3 +233,84 @@ void CameraStream::putBuffer(FrameBuffer *buffer) buffers_.push_back(buffer); } + +CameraStream::PostProcessorWorker::PostProcessorWorker(PostProcessor *postProcessor) + : postProcessor_(postProcessor) +{ +} + +CameraStream::PostProcessorWorker::~PostProcessorWorker() +{ + { + libcamera::MutexLocker lock(mutex_); + state_ = State::Stopped; + } + + cv_.notify_one(); + wait(); +} + +void CameraStream::PostProcessorWorker::start() +{ + { + libcamera::MutexLocker lock(mutex_); + state_ = State::Running; + } + + Thread::start(); +} + +void CameraStream::PostProcessorWorker::queueRequest(Camera3RequestDescriptor::StreamBuffer *dest) +{ + { + MutexLocker lock(mutex_); + ASSERT(state_ == State::Running); + requests_.push(dest); + } + + cv_.notify_one(); +} + +void CameraStream::PostProcessorWorker::run() +{ + MutexLocker locker(mutex_); + + while (1) { + cv_.wait(locker, [&] { + return state_ != State::Running || !requests_.empty(); + }); + + if (state_ != State::Running) + break; + + Camera3RequestDescriptor::StreamBuffer *stream = requests_.front(); + requests_.pop(); + locker.unlock(); + + postProcessor_->process(*stream->srcBuffer, stream->destBuffer.get(), + stream->request); + + locker.lock(); + } + + if (state_ == State::Flushing) { + while (!requests_.empty()) { + postProcessor_->processComplete.emit( + requests_.front()->request, + PostProcessor::Status::Error); + requests_.pop(); + } + + state_ = State::Stopped; + locker.unlock(); + } +} + +void CameraStream::PostProcessorWorker::flush() +{ + libcamera::MutexLocker lock(mutex_); + state_ = State::Flushing; + lock.unlock(); + + cv_.notify_one(); +} diff --git a/src/android/camera_stream.h b/src/android/camera_stream.h index f242336e..64e32c77 100644 --- a/src/android/camera_stream.h +++ b/src/android/camera_stream.h @@ -7,12 +7,16 @@ #ifndef __ANDROID_CAMERA_STREAM_H__ #define __ANDROID_CAMERA_STREAM_H__ +#include <condition_variable> #include <memory> #include <mutex> +#include <queue> #include <vector> #include <hardware/camera3.h> +#include <libcamera/base/thread.h> + #include <libcamera/camera.h> #include <libcamera/framebuffer.h> #include <libcamera/framebuffer_allocator.h> @@ -20,9 +24,9 @@ #include <libcamera/pixel_format.h> #include "camera_request.h" +#include "post_processor.h" class CameraDevice; -class PostProcessor; class CameraStream { @@ -126,8 +130,38 @@ public: Camera3RequestDescriptor *request); libcamera::FrameBuffer *getBuffer(); void putBuffer(libcamera::FrameBuffer *buffer); + void flush(); private: + class PostProcessorWorker : public libcamera::Thread + { + public: + enum class State { + Stopped, + Running, + Flushing, + }; + + PostProcessorWorker(PostProcessor *postProcessor); + ~PostProcessorWorker(); + + void start(); + void queueRequest(Camera3RequestDescriptor::StreamBuffer *request); + void flush(); + + protected: + void run() override; + + private: + PostProcessor *postProcessor_; + + libcamera::Mutex mutex_; + std::condition_variable cv_; + + std::queue<Camera3RequestDescriptor::StreamBuffer *> requests_; + State state_; + }; + int waitFence(int fence); CameraDevice *const cameraDevice_; @@ -144,6 +178,8 @@ private: */ std::unique_ptr<std::mutex> mutex_; std::unique_ptr<PostProcessor> postProcessor_; + + std::unique_ptr<PostProcessorWorker> worker_; }; #endif /* __ANDROID_CAMERA_STREAM__ */