[libcamera-devel,3/8] cam: Turn BufferWriter into a FrameSink

Message ID 20200519032505.17307-4-laurent.pinchart@ideasonboard.com
State Changes Requested
Delegated to: Laurent Pinchart
Headers show
Series
  • libcamera: Add DRM/KMS viewfinder display to cam
Related show

Commit Message

Laurent Pinchart May 19, 2020, 3:25 a.m. UTC
Make the BufferWriter class inherit from FrameSink, and use the
FrameSink API to manage it. This makes the code more generic, and will
allow usage of other sinks.

Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
---
 src/cam/buffer_writer.cpp | 25 ++++++++++---
 src/cam/buffer_writer.h   | 13 ++++---
 src/cam/capture.cpp       | 76 ++++++++++++++++++++++++++++++++-------
 src/cam/capture.h         |  6 ++--
 4 files changed, 97 insertions(+), 23 deletions(-)

Comments

Niklas Söderlund May 19, 2020, 2:38 p.m. UTC | #1
Hi Laurent,

Thanks for your work.

On 2020-05-19 06:25:00 +0300, Laurent Pinchart wrote:
> Make the BufferWriter class inherit from FrameSink, and use the
> FrameSink API to manage it. This makes the code more generic, and will
> allow usage of other sinks.
> 
> Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
> ---
>  src/cam/buffer_writer.cpp | 25 ++++++++++---
>  src/cam/buffer_writer.h   | 13 ++++---
>  src/cam/capture.cpp       | 76 ++++++++++++++++++++++++++++++++-------
>  src/cam/capture.h         |  6 ++--
>  4 files changed, 97 insertions(+), 23 deletions(-)
> 
> diff --git a/src/cam/buffer_writer.cpp b/src/cam/buffer_writer.cpp
> index c5a5eb46224a..2bec4b132155 100644
> --- a/src/cam/buffer_writer.cpp
> +++ b/src/cam/buffer_writer.cpp
> @@ -13,6 +13,8 @@
>  #include <sys/mman.h>
>  #include <unistd.h>
>  
> +#include <libcamera/camera.h>
> +
>  #include "buffer_writer.h"
>  
>  using namespace libcamera;
> @@ -32,6 +34,21 @@ BufferWriter::~BufferWriter()
>  	mappedBuffers_.clear();
>  }
>  
> +int BufferWriter::configure(const libcamera::CameraConfiguration &config)
> +{
> +	int ret = FrameSink::configure(config);
> +	if (ret < 0)
> +		return ret;
> +
> +	streamNames_.clear();
> +	for (unsigned int index = 0; index < config.size(); ++index) {
> +		const StreamConfiguration &cfg = config.at(index);
> +		streamNames_[cfg.stream()] = "stream" + std::to_string(index);
> +	}
> +
> +	return 0;
> +}
> +
>  void BufferWriter::mapBuffer(FrameBuffer *buffer)
>  {
>  	for (const FrameBuffer::Plane &plane : buffer->planes()) {
> @@ -43,7 +60,7 @@ void BufferWriter::mapBuffer(FrameBuffer *buffer)
>  	}
>  }
>  
> -int BufferWriter::write(FrameBuffer *buffer, const std::string &streamName)
> +bool BufferWriter::consumeBuffer(const Stream *stream, FrameBuffer *buffer)
>  {
>  	std::string filename;
>  	size_t pos;
> @@ -53,7 +70,7 @@ int BufferWriter::write(FrameBuffer *buffer, const std::string &streamName)
>  	pos = filename.find_first_of('#');
>  	if (pos != std::string::npos) {
>  		std::stringstream ss;
> -		ss << streamName << "-" << std::setw(6)
> +		ss << streamNames_[stream] << "-" << std::setw(6)
>  		   << std::setfill('0') << buffer->metadata().sequence;
>  		filename.replace(pos, 1, ss.str());
>  	}
> @@ -62,7 +79,7 @@ int BufferWriter::write(FrameBuffer *buffer, const std::string &streamName)
>  		  (pos == std::string::npos ? O_APPEND : O_TRUNC),
>  		  S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
>  	if (fd == -1)
> -		return -errno;
> +		return true;

Would it make sens to log an error here as the error no longer is 
propagated to the caller?

>  
>  	for (const FrameBuffer::Plane &plane : buffer->planes()) {
>  		void *data = mappedBuffers_[plane.fd.fd()].first;
> @@ -84,5 +101,5 @@ int BufferWriter::write(FrameBuffer *buffer, const std::string &streamName)
>  
>  	close(fd);
>  
> -	return ret;
> +	return true;
>  }
> diff --git a/src/cam/buffer_writer.h b/src/cam/buffer_writer.h
> index 47e26103e13e..5a5b176f73d8 100644
> --- a/src/cam/buffer_writer.h
> +++ b/src/cam/buffer_writer.h
> @@ -12,18 +12,23 @@
>  
>  #include <libcamera/buffer.h>
>  
> -class BufferWriter
> +#include "frame_sink.h"
> +
> +class BufferWriter : public FrameSink
>  {
>  public:
>  	BufferWriter(const std::string &pattern = "frame-#.bin");
>  	~BufferWriter();
>  
> -	void mapBuffer(libcamera::FrameBuffer *buffer);
> +	int configure(const libcamera::CameraConfiguration &config) override;
>  
> -	int write(libcamera::FrameBuffer *buffer,
> -		  const std::string &streamName);
> +	void mapBuffer(libcamera::FrameBuffer *buffer) override;
> +
> +	bool consumeBuffer(const libcamera::Stream *stream,
> +			   libcamera::FrameBuffer *buffer) override;
>  
>  private:
> +	std::map<const libcamera::Stream *, std::string> streamNames_;
>  	std::string pattern_;
>  	std::map<int, std::pair<void *, unsigned int>> mappedBuffers_;
>  };
> diff --git a/src/cam/capture.cpp b/src/cam/capture.cpp
> index b7e06bcc9463..7fc9cba48892 100644
> --- a/src/cam/capture.cpp
> +++ b/src/cam/capture.cpp
> @@ -11,6 +11,7 @@
>  #include <limits.h>
>  #include <sstream>
>  
> +#include "buffer_writer.h"
>  #include "capture.h"
>  #include "main.h"
>  
> @@ -18,7 +19,7 @@ using namespace libcamera;
>  
>  Capture::Capture(std::shared_ptr<Camera> camera, CameraConfiguration *config,
>  		 const StreamRoles &roles)
> -	: camera_(camera), config_(config), roles_(roles), writer_(nullptr)
> +	: camera_(camera), config_(config), roles_(roles), sink_(nullptr)
>  {
>  }
>  
> @@ -47,20 +48,28 @@ int Capture::run(EventLoop *loop, const OptionsParser::Options &options)
>  
>  	if (options.isSet(OptFile)) {
>  		if (!options[OptFile].toString().empty())
> -			writer_ = new BufferWriter(options[OptFile]);
> +			sink_ = new BufferWriter(options[OptFile]);
>  		else
> -			writer_ = new BufferWriter();
> +			sink_ = new BufferWriter();
>  	}
>  
> +	if (sink_) {
> +		ret = sink_->configure(*config_);
> +		if (ret < 0) {
> +			std::cout << "Failed to configure frame sink"
> +				  << std::endl;
> +			return ret;
> +		}
> +
> +		sink_->bufferReleased.connect(this, &Capture::sinkRelease);
> +	}
>  
>  	FrameBufferAllocator *allocator = new FrameBufferAllocator(camera_);
>  
>  	ret = capture(loop, allocator);
>  
> -	if (options.isSet(OptFile)) {
> -		delete writer_;
> -		writer_ = nullptr;
> -	}
> +	delete sink_;
> +	sink_ = nullptr;
>  
>  	delete allocator;
>  
> @@ -110,16 +119,26 @@ int Capture::capture(EventLoop *loop, FrameBufferAllocator *allocator)
>  				return ret;
>  			}
>  
> -			if (writer_)
> -				writer_->mapBuffer(buffer.get());
> +			if (sink_)
> +				sink_->mapBuffer(buffer.get());
>  		}
>  
>  		requests.push_back(request);
>  	}
>  
> +	if (sink_) {
> +		ret = sink_->start();
> +		if (ret) {
> +			std::cout << "Failed to start frame sink" << std::endl;
> +			return ret;
> +		}
> +	}
> +
>  	ret = camera_->start();
>  	if (ret) {
>  		std::cout << "Failed to start capture" << std::endl;
> +		if (sink_)
> +			sink_->stop();
>  		return ret;
>  	}
>  
> @@ -128,6 +147,8 @@ int Capture::capture(EventLoop *loop, FrameBufferAllocator *allocator)
>  		if (ret < 0) {
>  			std::cerr << "Can't queue request" << std::endl;
>  			camera_->stop();
> +			if (sink_)
> +				sink_->stop();
>  			return ret;
>  		}
>  	}
> @@ -141,6 +162,12 @@ int Capture::capture(EventLoop *loop, FrameBufferAllocator *allocator)
>  	if (ret)
>  		std::cout << "Failed to stop capture" << std::endl;
>  
> +	if (sink_) {
> +		ret = sink_->stop();
> +		if (ret)
> +			std::cout << "Failed to stop frame sink" << std::endl;
> +	}
> +
>  	return ret;
>  }
>  
> @@ -157,17 +184,18 @@ void Capture::requestComplete(Request *request)
>  	    ? 1000.0 / fps : 0.0;
>  	last_ = now;
>  
> +	bool requeue = true;
> +
>  	std::stringstream info;
>  	info << "fps: " << std::fixed << std::setprecision(2) << fps;
>  
>  	for (auto it = buffers.begin(); it != buffers.end(); ++it) {
>  		Stream *stream = it->first;
>  		FrameBuffer *buffer = it->second;
> -		const std::string &name = streamName_[stream];
>  
>  		const FrameMetadata &metadata = buffer->metadata();
>  
> -		info << " " << name
> +		info << " " << streamName_[stream]
>  		     << " seq: " << std::setw(6) << std::setfill('0') << metadata.sequence
>  		     << " bytesused: ";
>  
> @@ -178,12 +206,21 @@ void Capture::requestComplete(Request *request)
>  				info << "/";
>  		}
>  
> -		if (writer_)
> -			writer_->write(buffer, name);
> +		if (sink_) {
> +			if (!sink_->consumeBuffer(stream, buffer))
> +				requeue = false;

This will only work for requests that contains one buffer. In this 
change it will works as BufferWriter::consumeBuffer() returns true and 
the while request is requeued as the buffer writer does not need to hold 
onto the buffers. I know that the new sink added later in this sereis 
only supports one stream so it returning false and holding onto the 
buffer will also work with this logic.

Would it instead make sens to rework this so that the sink always would 
need to signal that it's done with the buffer(s) instead of requing the 
request from the requestComplete() callback? That way the behavior the 
sinks would be the same disregarding for how long it needs to hold on to 
the buffer.

> +		}
>  	}
>  
>  	std::cout << info.str() << std::endl;
>  
> +	/*
> +	 * If the frame sink holds on the buffer, we'll requeue it later in the
> +	 * complete handler.
> +	 */
> +	if (!requeue)
> +		return;
> +
>  	/*
>  	 * Create a new request and populate it with one buffer for each
>  	 * stream.
> @@ -203,3 +240,16 @@ void Capture::requestComplete(Request *request)
>  
>  	camera_->queueRequest(request);
>  }
> +
> +void Capture::sinkRelease(libcamera::FrameBuffer *buffer)
> +{
> +	Request *request = camera_->createRequest();
> +	if (!request) {
> +		std::cerr << "Can't create request" << std::endl;
> +		return;
> +	}
> +
> +	request->addBuffer(config_->at(0).stream(), buffer);
> +
> +	camera_->queueRequest(request);
> +}
> diff --git a/src/cam/capture.h b/src/cam/capture.h
> index c0e697b831fb..3be1d98e4d3e 100644
> --- a/src/cam/capture.h
> +++ b/src/cam/capture.h
> @@ -16,10 +16,11 @@
>  #include <libcamera/request.h>
>  #include <libcamera/stream.h>
>  
> -#include "buffer_writer.h"
>  #include "event_loop.h"
>  #include "options.h"
>  
> +class FrameSink;
> +
>  class Capture
>  {
>  public:
> @@ -33,13 +34,14 @@ private:
>  		    libcamera::FrameBufferAllocator *allocator);
>  
>  	void requestComplete(libcamera::Request *request);
> +	void sinkRelease(libcamera::FrameBuffer *buffer);
>  
>  	std::shared_ptr<libcamera::Camera> camera_;
>  	libcamera::CameraConfiguration *config_;
>  	libcamera::StreamRoles roles_;
>  
>  	std::map<libcamera::Stream *, std::string> streamName_;
> -	BufferWriter *writer_;
> +	FrameSink *sink_;
>  	std::chrono::steady_clock::time_point last_;
>  };
>  
> -- 
> Regards,
> 
> Laurent Pinchart
> 
> _______________________________________________
> libcamera-devel mailing list
> libcamera-devel@lists.libcamera.org
> https://lists.libcamera.org/listinfo/libcamera-devel
Laurent Pinchart May 19, 2020, 2:43 p.m. UTC | #2
Hi Laurent,

On Tue, May 19, 2020 at 04:38:14PM +0200, Niklas Söderlund wrote:
> On 2020-05-19 06:25:00 +0300, Laurent Pinchart wrote:
> > Make the BufferWriter class inherit from FrameSink, and use the
> > FrameSink API to manage it. This makes the code more generic, and will
> > allow usage of other sinks.
> > 
> > Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
> > ---
> >  src/cam/buffer_writer.cpp | 25 ++++++++++---
> >  src/cam/buffer_writer.h   | 13 ++++---
> >  src/cam/capture.cpp       | 76 ++++++++++++++++++++++++++++++++-------
> >  src/cam/capture.h         |  6 ++--
> >  4 files changed, 97 insertions(+), 23 deletions(-)
> > 
> > diff --git a/src/cam/buffer_writer.cpp b/src/cam/buffer_writer.cpp
> > index c5a5eb46224a..2bec4b132155 100644
> > --- a/src/cam/buffer_writer.cpp
> > +++ b/src/cam/buffer_writer.cpp
> > @@ -13,6 +13,8 @@
> >  #include <sys/mman.h>
> >  #include <unistd.h>
> >  
> > +#include <libcamera/camera.h>
> > +
> >  #include "buffer_writer.h"
> >  
> >  using namespace libcamera;
> > @@ -32,6 +34,21 @@ BufferWriter::~BufferWriter()
> >  	mappedBuffers_.clear();
> >  }
> >  
> > +int BufferWriter::configure(const libcamera::CameraConfiguration &config)
> > +{
> > +	int ret = FrameSink::configure(config);
> > +	if (ret < 0)
> > +		return ret;
> > +
> > +	streamNames_.clear();
> > +	for (unsigned int index = 0; index < config.size(); ++index) {
> > +		const StreamConfiguration &cfg = config.at(index);
> > +		streamNames_[cfg.stream()] = "stream" + std::to_string(index);
> > +	}
> > +
> > +	return 0;
> > +}
> > +
> >  void BufferWriter::mapBuffer(FrameBuffer *buffer)
> >  {
> >  	for (const FrameBuffer::Plane &plane : buffer->planes()) {
> > @@ -43,7 +60,7 @@ void BufferWriter::mapBuffer(FrameBuffer *buffer)
> >  	}
> >  }
> >  
> > -int BufferWriter::write(FrameBuffer *buffer, const std::string &streamName)
> > +bool BufferWriter::consumeBuffer(const Stream *stream, FrameBuffer *buffer)
> >  {
> >  	std::string filename;
> >  	size_t pos;
> > @@ -53,7 +70,7 @@ int BufferWriter::write(FrameBuffer *buffer, const std::string &streamName)
> >  	pos = filename.find_first_of('#');
> >  	if (pos != std::string::npos) {
> >  		std::stringstream ss;
> > -		ss << streamName << "-" << std::setw(6)
> > +		ss << streamNames_[stream] << "-" << std::setw(6)
> >  		   << std::setfill('0') << buffer->metadata().sequence;
> >  		filename.replace(pos, 1, ss.str());
> >  	}
> > @@ -62,7 +79,7 @@ int BufferWriter::write(FrameBuffer *buffer, const std::string &streamName)
> >  		  (pos == std::string::npos ? O_APPEND : O_TRUNC),
> >  		  S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
> >  	if (fd == -1)
> > -		return -errno;
> > +		return true;
> 
> Would it make sens to log an error here as the error no longer is 
> propagated to the caller?

Good point, I'll do so.

> >  
> >  	for (const FrameBuffer::Plane &plane : buffer->planes()) {
> >  		void *data = mappedBuffers_[plane.fd.fd()].first;
> > @@ -84,5 +101,5 @@ int BufferWriter::write(FrameBuffer *buffer, const std::string &streamName)
> >  
> >  	close(fd);
> >  
> > -	return ret;
> > +	return true;
> >  }
> > diff --git a/src/cam/buffer_writer.h b/src/cam/buffer_writer.h
> > index 47e26103e13e..5a5b176f73d8 100644
> > --- a/src/cam/buffer_writer.h
> > +++ b/src/cam/buffer_writer.h
> > @@ -12,18 +12,23 @@
> >  
> >  #include <libcamera/buffer.h>
> >  
> > -class BufferWriter
> > +#include "frame_sink.h"
> > +
> > +class BufferWriter : public FrameSink
> >  {
> >  public:
> >  	BufferWriter(const std::string &pattern = "frame-#.bin");
> >  	~BufferWriter();
> >  
> > -	void mapBuffer(libcamera::FrameBuffer *buffer);
> > +	int configure(const libcamera::CameraConfiguration &config) override;
> >  
> > -	int write(libcamera::FrameBuffer *buffer,
> > -		  const std::string &streamName);
> > +	void mapBuffer(libcamera::FrameBuffer *buffer) override;
> > +
> > +	bool consumeBuffer(const libcamera::Stream *stream,
> > +			   libcamera::FrameBuffer *buffer) override;
> >  
> >  private:
> > +	std::map<const libcamera::Stream *, std::string> streamNames_;
> >  	std::string pattern_;
> >  	std::map<int, std::pair<void *, unsigned int>> mappedBuffers_;
> >  };
> > diff --git a/src/cam/capture.cpp b/src/cam/capture.cpp
> > index b7e06bcc9463..7fc9cba48892 100644
> > --- a/src/cam/capture.cpp
> > +++ b/src/cam/capture.cpp
> > @@ -11,6 +11,7 @@
> >  #include <limits.h>
> >  #include <sstream>
> >  
> > +#include "buffer_writer.h"
> >  #include "capture.h"
> >  #include "main.h"
> >  
> > @@ -18,7 +19,7 @@ using namespace libcamera;
> >  
> >  Capture::Capture(std::shared_ptr<Camera> camera, CameraConfiguration *config,
> >  		 const StreamRoles &roles)
> > -	: camera_(camera), config_(config), roles_(roles), writer_(nullptr)
> > +	: camera_(camera), config_(config), roles_(roles), sink_(nullptr)
> >  {
> >  }
> >  
> > @@ -47,20 +48,28 @@ int Capture::run(EventLoop *loop, const OptionsParser::Options &options)
> >  
> >  	if (options.isSet(OptFile)) {
> >  		if (!options[OptFile].toString().empty())
> > -			writer_ = new BufferWriter(options[OptFile]);
> > +			sink_ = new BufferWriter(options[OptFile]);
> >  		else
> > -			writer_ = new BufferWriter();
> > +			sink_ = new BufferWriter();
> >  	}
> >  
> > +	if (sink_) {
> > +		ret = sink_->configure(*config_);
> > +		if (ret < 0) {
> > +			std::cout << "Failed to configure frame sink"
> > +				  << std::endl;
> > +			return ret;
> > +		}
> > +
> > +		sink_->bufferReleased.connect(this, &Capture::sinkRelease);
> > +	}
> >  
> >  	FrameBufferAllocator *allocator = new FrameBufferAllocator(camera_);
> >  
> >  	ret = capture(loop, allocator);
> >  
> > -	if (options.isSet(OptFile)) {
> > -		delete writer_;
> > -		writer_ = nullptr;
> > -	}
> > +	delete sink_;
> > +	sink_ = nullptr;
> >  
> >  	delete allocator;
> >  
> > @@ -110,16 +119,26 @@ int Capture::capture(EventLoop *loop, FrameBufferAllocator *allocator)
> >  				return ret;
> >  			}
> >  
> > -			if (writer_)
> > -				writer_->mapBuffer(buffer.get());
> > +			if (sink_)
> > +				sink_->mapBuffer(buffer.get());
> >  		}
> >  
> >  		requests.push_back(request);
> >  	}
> >  
> > +	if (sink_) {
> > +		ret = sink_->start();
> > +		if (ret) {
> > +			std::cout << "Failed to start frame sink" << std::endl;
> > +			return ret;
> > +		}
> > +	}
> > +
> >  	ret = camera_->start();
> >  	if (ret) {
> >  		std::cout << "Failed to start capture" << std::endl;
> > +		if (sink_)
> > +			sink_->stop();
> >  		return ret;
> >  	}
> >  
> > @@ -128,6 +147,8 @@ int Capture::capture(EventLoop *loop, FrameBufferAllocator *allocator)
> >  		if (ret < 0) {
> >  			std::cerr << "Can't queue request" << std::endl;
> >  			camera_->stop();
> > +			if (sink_)
> > +				sink_->stop();
> >  			return ret;
> >  		}
> >  	}
> > @@ -141,6 +162,12 @@ int Capture::capture(EventLoop *loop, FrameBufferAllocator *allocator)
> >  	if (ret)
> >  		std::cout << "Failed to stop capture" << std::endl;
> >  
> > +	if (sink_) {
> > +		ret = sink_->stop();
> > +		if (ret)
> > +			std::cout << "Failed to stop frame sink" << std::endl;
> > +	}
> > +
> >  	return ret;
> >  }
> >  
> > @@ -157,17 +184,18 @@ void Capture::requestComplete(Request *request)
> >  	    ? 1000.0 / fps : 0.0;
> >  	last_ = now;
> >  
> > +	bool requeue = true;
> > +
> >  	std::stringstream info;
> >  	info << "fps: " << std::fixed << std::setprecision(2) << fps;
> >  
> >  	for (auto it = buffers.begin(); it != buffers.end(); ++it) {
> >  		Stream *stream = it->first;
> >  		FrameBuffer *buffer = it->second;
> > -		const std::string &name = streamName_[stream];
> >  
> >  		const FrameMetadata &metadata = buffer->metadata();
> >  
> > -		info << " " << name
> > +		info << " " << streamName_[stream]
> >  		     << " seq: " << std::setw(6) << std::setfill('0') << metadata.sequence
> >  		     << " bytesused: ";
> >  
> > @@ -178,12 +206,21 @@ void Capture::requestComplete(Request *request)
> >  				info << "/";
> >  		}
> >  
> > -		if (writer_)
> > -			writer_->write(buffer, name);
> > +		if (sink_) {
> > +			if (!sink_->consumeBuffer(stream, buffer))
> > +				requeue = false;
> 
> This will only work for requests that contains one buffer. In this 
> change it will works as BufferWriter::consumeBuffer() returns true and 
> the while request is requeued as the buffer writer does not need to hold 
> onto the buffers. I know that the new sink added later in this sereis 
> only supports one stream so it returning false and holding onto the 
> buffer will also work with this logic.

Damn, you noticed ;-)

> Would it instead make sens to rework this so that the sink always would 
> need to signal that it's done with the buffer(s) instead of requing the 
> request from the requestComplete() callback? That way the behavior the 
> sinks would be the same disregarding for how long it needs to hold on to 
> the buffer.

I think this mechanism is indeed a bit fragile, and needs to be
reworked. It works in the two cases we support (file sink and kms sink),
but isn't future-proof. That being said, fixing it properly isn't
trivial. We can only requeue the request when all buffers have been
processed (or, rather, maybe we'll need to requeue a request with other
buffers instead). We can't unconditionally queue a new request in the
bufferReleased callback of the sink, as that would only work for a
single stream.

In addition to that, I think the cam application needs to be reworked to
move processing of buffers to its main thread. The file sink, in
particular, is too expensive to run in the request completion callback,
which runs in the camera manager thread. I think it could make sense to
address the two issues together. Do you think it needs to be done as
part of this series ? Or could I just add a \todo to explain the issue
and make sure we don't forget about it ?

> > +		}
> >  	}
> >  
> >  	std::cout << info.str() << std::endl;
> >  
> > +	/*
> > +	 * If the frame sink holds on the buffer, we'll requeue it later in the
> > +	 * complete handler.
> > +	 */
> > +	if (!requeue)
> > +		return;
> > +
> >  	/*
> >  	 * Create a new request and populate it with one buffer for each
> >  	 * stream.
> > @@ -203,3 +240,16 @@ void Capture::requestComplete(Request *request)
> >  
> >  	camera_->queueRequest(request);
> >  }
> > +
> > +void Capture::sinkRelease(libcamera::FrameBuffer *buffer)
> > +{
> > +	Request *request = camera_->createRequest();
> > +	if (!request) {
> > +		std::cerr << "Can't create request" << std::endl;
> > +		return;
> > +	}
> > +
> > +	request->addBuffer(config_->at(0).stream(), buffer);
> > +
> > +	camera_->queueRequest(request);
> > +}
> > diff --git a/src/cam/capture.h b/src/cam/capture.h
> > index c0e697b831fb..3be1d98e4d3e 100644
> > --- a/src/cam/capture.h
> > +++ b/src/cam/capture.h
> > @@ -16,10 +16,11 @@
> >  #include <libcamera/request.h>
> >  #include <libcamera/stream.h>
> >  
> > -#include "buffer_writer.h"
> >  #include "event_loop.h"
> >  #include "options.h"
> >  
> > +class FrameSink;
> > +
> >  class Capture
> >  {
> >  public:
> > @@ -33,13 +34,14 @@ private:
> >  		    libcamera::FrameBufferAllocator *allocator);
> >  
> >  	void requestComplete(libcamera::Request *request);
> > +	void sinkRelease(libcamera::FrameBuffer *buffer);
> >  
> >  	std::shared_ptr<libcamera::Camera> camera_;
> >  	libcamera::CameraConfiguration *config_;
> >  	libcamera::StreamRoles roles_;
> >  
> >  	std::map<libcamera::Stream *, std::string> streamName_;
> > -	BufferWriter *writer_;
> > +	FrameSink *sink_;
> >  	std::chrono::steady_clock::time_point last_;
> >  };
> >
Niklas Söderlund May 19, 2020, 2:59 p.m. UTC | #3
Hi Laurent,

On 2020-05-19 17:43:32 +0300, Laurent Pinchart wrote:
> Hi Laurent,
> 
> On Tue, May 19, 2020 at 04:38:14PM +0200, Niklas Söderlund wrote:
> > On 2020-05-19 06:25:00 +0300, Laurent Pinchart wrote:
> > > Make the BufferWriter class inherit from FrameSink, and use the
> > > FrameSink API to manage it. This makes the code more generic, and will
> > > allow usage of other sinks.
> > > 
> > > Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
> > > ---
> > >  src/cam/buffer_writer.cpp | 25 ++++++++++---
> > >  src/cam/buffer_writer.h   | 13 ++++---
> > >  src/cam/capture.cpp       | 76 ++++++++++++++++++++++++++++++++-------
> > >  src/cam/capture.h         |  6 ++--
> > >  4 files changed, 97 insertions(+), 23 deletions(-)
> > > 
> > > diff --git a/src/cam/buffer_writer.cpp b/src/cam/buffer_writer.cpp
> > > index c5a5eb46224a..2bec4b132155 100644
> > > --- a/src/cam/buffer_writer.cpp
> > > +++ b/src/cam/buffer_writer.cpp
> > > @@ -13,6 +13,8 @@
> > >  #include <sys/mman.h>
> > >  #include <unistd.h>
> > >  
> > > +#include <libcamera/camera.h>
> > > +
> > >  #include "buffer_writer.h"
> > >  
> > >  using namespace libcamera;
> > > @@ -32,6 +34,21 @@ BufferWriter::~BufferWriter()
> > >  	mappedBuffers_.clear();
> > >  }
> > >  
> > > +int BufferWriter::configure(const libcamera::CameraConfiguration &config)
> > > +{
> > > +	int ret = FrameSink::configure(config);
> > > +	if (ret < 0)
> > > +		return ret;
> > > +
> > > +	streamNames_.clear();
> > > +	for (unsigned int index = 0; index < config.size(); ++index) {
> > > +		const StreamConfiguration &cfg = config.at(index);
> > > +		streamNames_[cfg.stream()] = "stream" + std::to_string(index);
> > > +	}
> > > +
> > > +	return 0;
> > > +}
> > > +
> > >  void BufferWriter::mapBuffer(FrameBuffer *buffer)
> > >  {
> > >  	for (const FrameBuffer::Plane &plane : buffer->planes()) {
> > > @@ -43,7 +60,7 @@ void BufferWriter::mapBuffer(FrameBuffer *buffer)
> > >  	}
> > >  }
> > >  
> > > -int BufferWriter::write(FrameBuffer *buffer, const std::string &streamName)
> > > +bool BufferWriter::consumeBuffer(const Stream *stream, FrameBuffer *buffer)
> > >  {
> > >  	std::string filename;
> > >  	size_t pos;
> > > @@ -53,7 +70,7 @@ int BufferWriter::write(FrameBuffer *buffer, const std::string &streamName)
> > >  	pos = filename.find_first_of('#');
> > >  	if (pos != std::string::npos) {
> > >  		std::stringstream ss;
> > > -		ss << streamName << "-" << std::setw(6)
> > > +		ss << streamNames_[stream] << "-" << std::setw(6)
> > >  		   << std::setfill('0') << buffer->metadata().sequence;
> > >  		filename.replace(pos, 1, ss.str());
> > >  	}
> > > @@ -62,7 +79,7 @@ int BufferWriter::write(FrameBuffer *buffer, const std::string &streamName)
> > >  		  (pos == std::string::npos ? O_APPEND : O_TRUNC),
> > >  		  S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
> > >  	if (fd == -1)
> > > -		return -errno;
> > > +		return true;
> > 
> > Would it make sens to log an error here as the error no longer is 
> > propagated to the caller?
> 
> Good point, I'll do so.
> 
> > >  
> > >  	for (const FrameBuffer::Plane &plane : buffer->planes()) {
> > >  		void *data = mappedBuffers_[plane.fd.fd()].first;
> > > @@ -84,5 +101,5 @@ int BufferWriter::write(FrameBuffer *buffer, const std::string &streamName)
> > >  
> > >  	close(fd);
> > >  
> > > -	return ret;
> > > +	return true;
> > >  }
> > > diff --git a/src/cam/buffer_writer.h b/src/cam/buffer_writer.h
> > > index 47e26103e13e..5a5b176f73d8 100644
> > > --- a/src/cam/buffer_writer.h
> > > +++ b/src/cam/buffer_writer.h
> > > @@ -12,18 +12,23 @@
> > >  
> > >  #include <libcamera/buffer.h>
> > >  
> > > -class BufferWriter
> > > +#include "frame_sink.h"
> > > +
> > > +class BufferWriter : public FrameSink
> > >  {
> > >  public:
> > >  	BufferWriter(const std::string &pattern = "frame-#.bin");
> > >  	~BufferWriter();
> > >  
> > > -	void mapBuffer(libcamera::FrameBuffer *buffer);
> > > +	int configure(const libcamera::CameraConfiguration &config) override;
> > >  
> > > -	int write(libcamera::FrameBuffer *buffer,
> > > -		  const std::string &streamName);
> > > +	void mapBuffer(libcamera::FrameBuffer *buffer) override;
> > > +
> > > +	bool consumeBuffer(const libcamera::Stream *stream,
> > > +			   libcamera::FrameBuffer *buffer) override;
> > >  
> > >  private:
> > > +	std::map<const libcamera::Stream *, std::string> streamNames_;
> > >  	std::string pattern_;
> > >  	std::map<int, std::pair<void *, unsigned int>> mappedBuffers_;
> > >  };
> > > diff --git a/src/cam/capture.cpp b/src/cam/capture.cpp
> > > index b7e06bcc9463..7fc9cba48892 100644
> > > --- a/src/cam/capture.cpp
> > > +++ b/src/cam/capture.cpp
> > > @@ -11,6 +11,7 @@
> > >  #include <limits.h>
> > >  #include <sstream>
> > >  
> > > +#include "buffer_writer.h"
> > >  #include "capture.h"
> > >  #include "main.h"
> > >  
> > > @@ -18,7 +19,7 @@ using namespace libcamera;
> > >  
> > >  Capture::Capture(std::shared_ptr<Camera> camera, CameraConfiguration *config,
> > >  		 const StreamRoles &roles)
> > > -	: camera_(camera), config_(config), roles_(roles), writer_(nullptr)
> > > +	: camera_(camera), config_(config), roles_(roles), sink_(nullptr)
> > >  {
> > >  }
> > >  
> > > @@ -47,20 +48,28 @@ int Capture::run(EventLoop *loop, const OptionsParser::Options &options)
> > >  
> > >  	if (options.isSet(OptFile)) {
> > >  		if (!options[OptFile].toString().empty())
> > > -			writer_ = new BufferWriter(options[OptFile]);
> > > +			sink_ = new BufferWriter(options[OptFile]);
> > >  		else
> > > -			writer_ = new BufferWriter();
> > > +			sink_ = new BufferWriter();
> > >  	}
> > >  
> > > +	if (sink_) {
> > > +		ret = sink_->configure(*config_);
> > > +		if (ret < 0) {
> > > +			std::cout << "Failed to configure frame sink"
> > > +				  << std::endl;
> > > +			return ret;
> > > +		}
> > > +
> > > +		sink_->bufferReleased.connect(this, &Capture::sinkRelease);
> > > +	}
> > >  
> > >  	FrameBufferAllocator *allocator = new FrameBufferAllocator(camera_);
> > >  
> > >  	ret = capture(loop, allocator);
> > >  
> > > -	if (options.isSet(OptFile)) {
> > > -		delete writer_;
> > > -		writer_ = nullptr;
> > > -	}
> > > +	delete sink_;
> > > +	sink_ = nullptr;
> > >  
> > >  	delete allocator;
> > >  
> > > @@ -110,16 +119,26 @@ int Capture::capture(EventLoop *loop, FrameBufferAllocator *allocator)
> > >  				return ret;
> > >  			}
> > >  
> > > -			if (writer_)
> > > -				writer_->mapBuffer(buffer.get());
> > > +			if (sink_)
> > > +				sink_->mapBuffer(buffer.get());
> > >  		}
> > >  
> > >  		requests.push_back(request);
> > >  	}
> > >  
> > > +	if (sink_) {
> > > +		ret = sink_->start();
> > > +		if (ret) {
> > > +			std::cout << "Failed to start frame sink" << std::endl;
> > > +			return ret;
> > > +		}
> > > +	}
> > > +
> > >  	ret = camera_->start();
> > >  	if (ret) {
> > >  		std::cout << "Failed to start capture" << std::endl;
> > > +		if (sink_)
> > > +			sink_->stop();
> > >  		return ret;
> > >  	}
> > >  
> > > @@ -128,6 +147,8 @@ int Capture::capture(EventLoop *loop, FrameBufferAllocator *allocator)
> > >  		if (ret < 0) {
> > >  			std::cerr << "Can't queue request" << std::endl;
> > >  			camera_->stop();
> > > +			if (sink_)
> > > +				sink_->stop();
> > >  			return ret;
> > >  		}
> > >  	}
> > > @@ -141,6 +162,12 @@ int Capture::capture(EventLoop *loop, FrameBufferAllocator *allocator)
> > >  	if (ret)
> > >  		std::cout << "Failed to stop capture" << std::endl;
> > >  
> > > +	if (sink_) {
> > > +		ret = sink_->stop();
> > > +		if (ret)
> > > +			std::cout << "Failed to stop frame sink" << std::endl;
> > > +	}
> > > +
> > >  	return ret;
> > >  }
> > >  
> > > @@ -157,17 +184,18 @@ void Capture::requestComplete(Request *request)
> > >  	    ? 1000.0 / fps : 0.0;
> > >  	last_ = now;
> > >  
> > > +	bool requeue = true;
> > > +
> > >  	std::stringstream info;
> > >  	info << "fps: " << std::fixed << std::setprecision(2) << fps;
> > >  
> > >  	for (auto it = buffers.begin(); it != buffers.end(); ++it) {
> > >  		Stream *stream = it->first;
> > >  		FrameBuffer *buffer = it->second;
> > > -		const std::string &name = streamName_[stream];
> > >  
> > >  		const FrameMetadata &metadata = buffer->metadata();
> > >  
> > > -		info << " " << name
> > > +		info << " " << streamName_[stream]
> > >  		     << " seq: " << std::setw(6) << std::setfill('0') << metadata.sequence
> > >  		     << " bytesused: ";
> > >  
> > > @@ -178,12 +206,21 @@ void Capture::requestComplete(Request *request)
> > >  				info << "/";
> > >  		}
> > >  
> > > -		if (writer_)
> > > -			writer_->write(buffer, name);
> > > +		if (sink_) {
> > > +			if (!sink_->consumeBuffer(stream, buffer))
> > > +				requeue = false;
> > 
> > This will only work for requests that contains one buffer. In this 
> > change it will works as BufferWriter::consumeBuffer() returns true and 
> > the while request is requeued as the buffer writer does not need to hold 
> > onto the buffers. I know that the new sink added later in this sereis 
> > only supports one stream so it returning false and holding onto the 
> > buffer will also work with this logic.
> 
> Damn, you noticed ;-)

Sorry, I will pay less attention in the future ;-P

> 
> > Would it instead make sens to rework this so that the sink always would 
> > need to signal that it's done with the buffer(s) instead of requing the 
> > request from the requestComplete() callback? That way the behavior the 
> > sinks would be the same disregarding for how long it needs to hold on to 
> > the buffer.
> 
> I think this mechanism is indeed a bit fragile, and needs to be
> reworked. It works in the two cases we support (file sink and kms sink),
> but isn't future-proof. That being said, fixing it properly isn't
> trivial. We can only requeue the request when all buffers have been
> processed (or, rather, maybe we'll need to requeue a request with other
> buffers instead). We can't unconditionally queue a new request in the
> bufferReleased callback of the sink, as that would only work for a
> single stream.
> 
> In addition to that, I think the cam application needs to be reworked to
> move processing of buffers to its main thread. The file sink, in
> particular, is too expensive to run in the request completion callback,
> which runs in the camera manager thread. I think it could make sense to
> address the two issues together. Do you think it needs to be done as
> part of this series ? Or could I just add a \todo to explain the issue
> and make sure we don't forget about it ?

I don't think this needs to be solved the perfect way now, but I do 
think we need something where adding a 3rd sink that supports more then 
one stream and needs to hold onto buffers do not break, but they might 
not need to function in the most efficient way.

As we already discussed that for now cam would only support a single 
sink would it make sens to rework the interface to deal with a Request 
instead of a FrameBuffer? That way the logic you have here could still 
be used while allowing new sinks to be added which would be less 
fragile.

That being said, I do agree with you that deepening on what end gaols we 
have for cam we will likely need to rework how requests are constructed.  
We still have the whole concept of request reuse and request templates 
to bash out :-)

I'm however not convinced I like the idea of making cam an interactive 
console tool. Things like push spacebar to save frame to disk while 
viewing the viewfinder on the framebuffer I think maybe is another 
application, but I might be wrong.

> 
> > > +		}
> > >  	}
> > >  
> > >  	std::cout << info.str() << std::endl;
> > >  
> > > +	/*
> > > +	 * If the frame sink holds on the buffer, we'll requeue it later in the
> > > +	 * complete handler.
> > > +	 */
> > > +	if (!requeue)
> > > +		return;
> > > +
> > >  	/*
> > >  	 * Create a new request and populate it with one buffer for each
> > >  	 * stream.
> > > @@ -203,3 +240,16 @@ void Capture::requestComplete(Request *request)
> > >  
> > >  	camera_->queueRequest(request);
> > >  }
> > > +
> > > +void Capture::sinkRelease(libcamera::FrameBuffer *buffer)
> > > +{
> > > +	Request *request = camera_->createRequest();
> > > +	if (!request) {
> > > +		std::cerr << "Can't create request" << std::endl;
> > > +		return;
> > > +	}
> > > +
> > > +	request->addBuffer(config_->at(0).stream(), buffer);
> > > +
> > > +	camera_->queueRequest(request);
> > > +}
> > > diff --git a/src/cam/capture.h b/src/cam/capture.h
> > > index c0e697b831fb..3be1d98e4d3e 100644
> > > --- a/src/cam/capture.h
> > > +++ b/src/cam/capture.h
> > > @@ -16,10 +16,11 @@
> > >  #include <libcamera/request.h>
> > >  #include <libcamera/stream.h>
> > >  
> > > -#include "buffer_writer.h"
> > >  #include "event_loop.h"
> > >  #include "options.h"
> > >  
> > > +class FrameSink;
> > > +
> > >  class Capture
> > >  {
> > >  public:
> > > @@ -33,13 +34,14 @@ private:
> > >  		    libcamera::FrameBufferAllocator *allocator);
> > >  
> > >  	void requestComplete(libcamera::Request *request);
> > > +	void sinkRelease(libcamera::FrameBuffer *buffer);
> > >  
> > >  	std::shared_ptr<libcamera::Camera> camera_;
> > >  	libcamera::CameraConfiguration *config_;
> > >  	libcamera::StreamRoles roles_;
> > >  
> > >  	std::map<libcamera::Stream *, std::string> streamName_;
> > > -	BufferWriter *writer_;
> > > +	FrameSink *sink_;
> > >  	std::chrono::steady_clock::time_point last_;
> > >  };
> > >  
> 
> -- 
> Regards,
> 
> Laurent Pinchart
Laurent Pinchart May 19, 2020, 10:26 p.m. UTC | #4
Hi Niklas,

On Tue, May 19, 2020 at 04:59:28PM +0200, Niklas Söderlund wrote:
> On 2020-05-19 17:43:32 +0300, Laurent Pinchart wrote:
> > On Tue, May 19, 2020 at 04:38:14PM +0200, Niklas Söderlund wrote:
> > > On 2020-05-19 06:25:00 +0300, Laurent Pinchart wrote:
> > > > Make the BufferWriter class inherit from FrameSink, and use the
> > > > FrameSink API to manage it. This makes the code more generic, and will
> > > > allow usage of other sinks.
> > > > 
> > > > Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
> > > > ---
> > > >  src/cam/buffer_writer.cpp | 25 ++++++++++---
> > > >  src/cam/buffer_writer.h   | 13 ++++---
> > > >  src/cam/capture.cpp       | 76 ++++++++++++++++++++++++++++++++-------
> > > >  src/cam/capture.h         |  6 ++--
> > > >  4 files changed, 97 insertions(+), 23 deletions(-)
> > > > 
> > > > diff --git a/src/cam/buffer_writer.cpp b/src/cam/buffer_writer.cpp
> > > > index c5a5eb46224a..2bec4b132155 100644
> > > > --- a/src/cam/buffer_writer.cpp
> > > > +++ b/src/cam/buffer_writer.cpp
> > > > @@ -13,6 +13,8 @@
> > > >  #include <sys/mman.h>
> > > >  #include <unistd.h>
> > > >  
> > > > +#include <libcamera/camera.h>
> > > > +
> > > >  #include "buffer_writer.h"
> > > >  
> > > >  using namespace libcamera;
> > > > @@ -32,6 +34,21 @@ BufferWriter::~BufferWriter()
> > > >  	mappedBuffers_.clear();
> > > >  }
> > > >  
> > > > +int BufferWriter::configure(const libcamera::CameraConfiguration &config)
> > > > +{
> > > > +	int ret = FrameSink::configure(config);
> > > > +	if (ret < 0)
> > > > +		return ret;
> > > > +
> > > > +	streamNames_.clear();
> > > > +	for (unsigned int index = 0; index < config.size(); ++index) {
> > > > +		const StreamConfiguration &cfg = config.at(index);
> > > > +		streamNames_[cfg.stream()] = "stream" + std::to_string(index);
> > > > +	}
> > > > +
> > > > +	return 0;
> > > > +}
> > > > +
> > > >  void BufferWriter::mapBuffer(FrameBuffer *buffer)
> > > >  {
> > > >  	for (const FrameBuffer::Plane &plane : buffer->planes()) {
> > > > @@ -43,7 +60,7 @@ void BufferWriter::mapBuffer(FrameBuffer *buffer)
> > > >  	}
> > > >  }
> > > >  
> > > > -int BufferWriter::write(FrameBuffer *buffer, const std::string &streamName)
> > > > +bool BufferWriter::consumeBuffer(const Stream *stream, FrameBuffer *buffer)
> > > >  {
> > > >  	std::string filename;
> > > >  	size_t pos;
> > > > @@ -53,7 +70,7 @@ int BufferWriter::write(FrameBuffer *buffer, const std::string &streamName)
> > > >  	pos = filename.find_first_of('#');
> > > >  	if (pos != std::string::npos) {
> > > >  		std::stringstream ss;
> > > > -		ss << streamName << "-" << std::setw(6)
> > > > +		ss << streamNames_[stream] << "-" << std::setw(6)
> > > >  		   << std::setfill('0') << buffer->metadata().sequence;
> > > >  		filename.replace(pos, 1, ss.str());
> > > >  	}
> > > > @@ -62,7 +79,7 @@ int BufferWriter::write(FrameBuffer *buffer, const std::string &streamName)
> > > >  		  (pos == std::string::npos ? O_APPEND : O_TRUNC),
> > > >  		  S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
> > > >  	if (fd == -1)
> > > > -		return -errno;
> > > > +		return true;
> > > 
> > > Would it make sens to log an error here as the error no longer is 
> > > propagated to the caller?
> > 
> > Good point, I'll do so.
> > 
> > > >  
> > > >  	for (const FrameBuffer::Plane &plane : buffer->planes()) {
> > > >  		void *data = mappedBuffers_[plane.fd.fd()].first;
> > > > @@ -84,5 +101,5 @@ int BufferWriter::write(FrameBuffer *buffer, const std::string &streamName)
> > > >  
> > > >  	close(fd);
> > > >  
> > > > -	return ret;
> > > > +	return true;
> > > >  }
> > > > diff --git a/src/cam/buffer_writer.h b/src/cam/buffer_writer.h
> > > > index 47e26103e13e..5a5b176f73d8 100644
> > > > --- a/src/cam/buffer_writer.h
> > > > +++ b/src/cam/buffer_writer.h
> > > > @@ -12,18 +12,23 @@
> > > >  
> > > >  #include <libcamera/buffer.h>
> > > >  
> > > > -class BufferWriter
> > > > +#include "frame_sink.h"
> > > > +
> > > > +class BufferWriter : public FrameSink
> > > >  {
> > > >  public:
> > > >  	BufferWriter(const std::string &pattern = "frame-#.bin");
> > > >  	~BufferWriter();
> > > >  
> > > > -	void mapBuffer(libcamera::FrameBuffer *buffer);
> > > > +	int configure(const libcamera::CameraConfiguration &config) override;
> > > >  
> > > > -	int write(libcamera::FrameBuffer *buffer,
> > > > -		  const std::string &streamName);
> > > > +	void mapBuffer(libcamera::FrameBuffer *buffer) override;
> > > > +
> > > > +	bool consumeBuffer(const libcamera::Stream *stream,
> > > > +			   libcamera::FrameBuffer *buffer) override;
> > > >  
> > > >  private:
> > > > +	std::map<const libcamera::Stream *, std::string> streamNames_;
> > > >  	std::string pattern_;
> > > >  	std::map<int, std::pair<void *, unsigned int>> mappedBuffers_;
> > > >  };
> > > > diff --git a/src/cam/capture.cpp b/src/cam/capture.cpp
> > > > index b7e06bcc9463..7fc9cba48892 100644
> > > > --- a/src/cam/capture.cpp
> > > > +++ b/src/cam/capture.cpp
> > > > @@ -11,6 +11,7 @@
> > > >  #include <limits.h>
> > > >  #include <sstream>
> > > >  
> > > > +#include "buffer_writer.h"
> > > >  #include "capture.h"
> > > >  #include "main.h"
> > > >  
> > > > @@ -18,7 +19,7 @@ using namespace libcamera;
> > > >  
> > > >  Capture::Capture(std::shared_ptr<Camera> camera, CameraConfiguration *config,
> > > >  		 const StreamRoles &roles)
> > > > -	: camera_(camera), config_(config), roles_(roles), writer_(nullptr)
> > > > +	: camera_(camera), config_(config), roles_(roles), sink_(nullptr)
> > > >  {
> > > >  }
> > > >  
> > > > @@ -47,20 +48,28 @@ int Capture::run(EventLoop *loop, const OptionsParser::Options &options)
> > > >  
> > > >  	if (options.isSet(OptFile)) {
> > > >  		if (!options[OptFile].toString().empty())
> > > > -			writer_ = new BufferWriter(options[OptFile]);
> > > > +			sink_ = new BufferWriter(options[OptFile]);
> > > >  		else
> > > > -			writer_ = new BufferWriter();
> > > > +			sink_ = new BufferWriter();
> > > >  	}
> > > >  
> > > > +	if (sink_) {
> > > > +		ret = sink_->configure(*config_);
> > > > +		if (ret < 0) {
> > > > +			std::cout << "Failed to configure frame sink"
> > > > +				  << std::endl;
> > > > +			return ret;
> > > > +		}
> > > > +
> > > > +		sink_->bufferReleased.connect(this, &Capture::sinkRelease);
> > > > +	}
> > > >  
> > > >  	FrameBufferAllocator *allocator = new FrameBufferAllocator(camera_);
> > > >  
> > > >  	ret = capture(loop, allocator);
> > > >  
> > > > -	if (options.isSet(OptFile)) {
> > > > -		delete writer_;
> > > > -		writer_ = nullptr;
> > > > -	}
> > > > +	delete sink_;
> > > > +	sink_ = nullptr;
> > > >  
> > > >  	delete allocator;
> > > >  
> > > > @@ -110,16 +119,26 @@ int Capture::capture(EventLoop *loop, FrameBufferAllocator *allocator)
> > > >  				return ret;
> > > >  			}
> > > >  
> > > > -			if (writer_)
> > > > -				writer_->mapBuffer(buffer.get());
> > > > +			if (sink_)
> > > > +				sink_->mapBuffer(buffer.get());
> > > >  		}
> > > >  
> > > >  		requests.push_back(request);
> > > >  	}
> > > >  
> > > > +	if (sink_) {
> > > > +		ret = sink_->start();
> > > > +		if (ret) {
> > > > +			std::cout << "Failed to start frame sink" << std::endl;
> > > > +			return ret;
> > > > +		}
> > > > +	}
> > > > +
> > > >  	ret = camera_->start();
> > > >  	if (ret) {
> > > >  		std::cout << "Failed to start capture" << std::endl;
> > > > +		if (sink_)
> > > > +			sink_->stop();
> > > >  		return ret;
> > > >  	}
> > > >  
> > > > @@ -128,6 +147,8 @@ int Capture::capture(EventLoop *loop, FrameBufferAllocator *allocator)
> > > >  		if (ret < 0) {
> > > >  			std::cerr << "Can't queue request" << std::endl;
> > > >  			camera_->stop();
> > > > +			if (sink_)
> > > > +				sink_->stop();
> > > >  			return ret;
> > > >  		}
> > > >  	}
> > > > @@ -141,6 +162,12 @@ int Capture::capture(EventLoop *loop, FrameBufferAllocator *allocator)
> > > >  	if (ret)
> > > >  		std::cout << "Failed to stop capture" << std::endl;
> > > >  
> > > > +	if (sink_) {
> > > > +		ret = sink_->stop();
> > > > +		if (ret)
> > > > +			std::cout << "Failed to stop frame sink" << std::endl;
> > > > +	}
> > > > +
> > > >  	return ret;
> > > >  }
> > > >  
> > > > @@ -157,17 +184,18 @@ void Capture::requestComplete(Request *request)
> > > >  	    ? 1000.0 / fps : 0.0;
> > > >  	last_ = now;
> > > >  
> > > > +	bool requeue = true;
> > > > +
> > > >  	std::stringstream info;
> > > >  	info << "fps: " << std::fixed << std::setprecision(2) << fps;
> > > >  
> > > >  	for (auto it = buffers.begin(); it != buffers.end(); ++it) {
> > > >  		Stream *stream = it->first;
> > > >  		FrameBuffer *buffer = it->second;
> > > > -		const std::string &name = streamName_[stream];
> > > >  
> > > >  		const FrameMetadata &metadata = buffer->metadata();
> > > >  
> > > > -		info << " " << name
> > > > +		info << " " << streamName_[stream]
> > > >  		     << " seq: " << std::setw(6) << std::setfill('0') << metadata.sequence
> > > >  		     << " bytesused: ";
> > > >  
> > > > @@ -178,12 +206,21 @@ void Capture::requestComplete(Request *request)
> > > >  				info << "/";
> > > >  		}
> > > >  
> > > > -		if (writer_)
> > > > -			writer_->write(buffer, name);
> > > > +		if (sink_) {
> > > > +			if (!sink_->consumeBuffer(stream, buffer))
> > > > +				requeue = false;
> > > 
> > > This will only work for requests that contains one buffer. In this 
> > > change it will works as BufferWriter::consumeBuffer() returns true and 
> > > the while request is requeued as the buffer writer does not need to hold 
> > > onto the buffers. I know that the new sink added later in this sereis 
> > > only supports one stream so it returning false and holding onto the 
> > > buffer will also work with this logic.
> > 
> > Damn, you noticed ;-)
> 
> Sorry, I will pay less attention in the future ;-P
> 
> > > Would it instead make sens to rework this so that the sink always would 
> > > need to signal that it's done with the buffer(s) instead of requing the 
> > > request from the requestComplete() callback? That way the behavior the 
> > > sinks would be the same disregarding for how long it needs to hold on to 
> > > the buffer.
> > 
> > I think this mechanism is indeed a bit fragile, and needs to be
> > reworked. It works in the two cases we support (file sink and kms sink),
> > but isn't future-proof. That being said, fixing it properly isn't
> > trivial. We can only requeue the request when all buffers have been
> > processed (or, rather, maybe we'll need to requeue a request with other
> > buffers instead). We can't unconditionally queue a new request in the
> > bufferReleased callback of the sink, as that would only work for a
> > single stream.
> > 
> > In addition to that, I think the cam application needs to be reworked to
> > move processing of buffers to its main thread. The file sink, in
> > particular, is too expensive to run in the request completion callback,
> > which runs in the camera manager thread. I think it could make sense to
> > address the two issues together. Do you think it needs to be done as
> > part of this series ? Or could I just add a \todo to explain the issue
> > and make sure we don't forget about it ?
> 
> I don't think this needs to be solved the perfect way now, but I do 
> think we need something where adding a 3rd sink that supports more then 
> one stream and needs to hold onto buffers do not break, but they might 
> not need to function in the most efficient way.
> 
> As we already discussed that for now cam would only support a single 
> sink would it make sens to rework the interface to deal with a Request 
> instead of a FrameBuffer? That way the logic you have here could still 
> be used while allowing new sinks to be added which would be less 
> fragile.

Just to make it clear, how much rework would you do as part of this
series, and how much should be delayed ?

> That being said, I do agree with you that deepening on what end gaols we 
> have for cam we will likely need to rework how requests are constructed.  
> We still have the whole concept of request reuse and request templates 
> to bash out :-)
> 
> I'm however not convinced I like the idea of making cam an interactive 
> console tool. Things like push spacebar to save frame to disk while 
> viewing the viewfinder on the framebuffer I think maybe is another 
> application, but I might be wrong.

You're probably right there, it's not a direction I intend to push for.

> > > > +		}
> > > >  	}
> > > >  
> > > >  	std::cout << info.str() << std::endl;
> > > >  
> > > > +	/*
> > > > +	 * If the frame sink holds on the buffer, we'll requeue it later in the
> > > > +	 * complete handler.
> > > > +	 */
> > > > +	if (!requeue)
> > > > +		return;
> > > > +
> > > >  	/*
> > > >  	 * Create a new request and populate it with one buffer for each
> > > >  	 * stream.
> > > > @@ -203,3 +240,16 @@ void Capture::requestComplete(Request *request)
> > > >  
> > > >  	camera_->queueRequest(request);
> > > >  }
> > > > +
> > > > +void Capture::sinkRelease(libcamera::FrameBuffer *buffer)
> > > > +{
> > > > +	Request *request = camera_->createRequest();
> > > > +	if (!request) {
> > > > +		std::cerr << "Can't create request" << std::endl;
> > > > +		return;
> > > > +	}
> > > > +
> > > > +	request->addBuffer(config_->at(0).stream(), buffer);
> > > > +
> > > > +	camera_->queueRequest(request);
> > > > +}
> > > > diff --git a/src/cam/capture.h b/src/cam/capture.h
> > > > index c0e697b831fb..3be1d98e4d3e 100644
> > > > --- a/src/cam/capture.h
> > > > +++ b/src/cam/capture.h
> > > > @@ -16,10 +16,11 @@
> > > >  #include <libcamera/request.h>
> > > >  #include <libcamera/stream.h>
> > > >  
> > > > -#include "buffer_writer.h"
> > > >  #include "event_loop.h"
> > > >  #include "options.h"
> > > >  
> > > > +class FrameSink;
> > > > +
> > > >  class Capture
> > > >  {
> > > >  public:
> > > > @@ -33,13 +34,14 @@ private:
> > > >  		    libcamera::FrameBufferAllocator *allocator);
> > > >  
> > > >  	void requestComplete(libcamera::Request *request);
> > > > +	void sinkRelease(libcamera::FrameBuffer *buffer);
> > > >  
> > > >  	std::shared_ptr<libcamera::Camera> camera_;
> > > >  	libcamera::CameraConfiguration *config_;
> > > >  	libcamera::StreamRoles roles_;
> > > >  
> > > >  	std::map<libcamera::Stream *, std::string> streamName_;
> > > > -	BufferWriter *writer_;
> > > > +	FrameSink *sink_;
> > > >  	std::chrono::steady_clock::time_point last_;
> > > >  };
> > > >
Niklas Söderlund May 22, 2020, 1:02 a.m. UTC | #5
Hi Laurent,

On 2020-05-20 01:26:10 +0300, Laurent Pinchart wrote:
> Hi Niklas,
> 
> On Tue, May 19, 2020 at 04:59:28PM +0200, Niklas Söderlund wrote:
> > On 2020-05-19 17:43:32 +0300, Laurent Pinchart wrote:
> > > On Tue, May 19, 2020 at 04:38:14PM +0200, Niklas Söderlund wrote:
> > > > On 2020-05-19 06:25:00 +0300, Laurent Pinchart wrote:
> > > > > Make the BufferWriter class inherit from FrameSink, and use the
> > > > > FrameSink API to manage it. This makes the code more generic, and will
> > > > > allow usage of other sinks.
> > > > > 
> > > > > Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
> > > > > ---
> > > > >  src/cam/buffer_writer.cpp | 25 ++++++++++---
> > > > >  src/cam/buffer_writer.h   | 13 ++++---
> > > > >  src/cam/capture.cpp       | 76 ++++++++++++++++++++++++++++++++-------
> > > > >  src/cam/capture.h         |  6 ++--
> > > > >  4 files changed, 97 insertions(+), 23 deletions(-)
> > > > > 
> > > > > diff --git a/src/cam/buffer_writer.cpp b/src/cam/buffer_writer.cpp
> > > > > index c5a5eb46224a..2bec4b132155 100644
> > > > > --- a/src/cam/buffer_writer.cpp
> > > > > +++ b/src/cam/buffer_writer.cpp
> > > > > @@ -13,6 +13,8 @@
> > > > >  #include <sys/mman.h>
> > > > >  #include <unistd.h>
> > > > >  
> > > > > +#include <libcamera/camera.h>
> > > > > +
> > > > >  #include "buffer_writer.h"
> > > > >  
> > > > >  using namespace libcamera;
> > > > > @@ -32,6 +34,21 @@ BufferWriter::~BufferWriter()
> > > > >  	mappedBuffers_.clear();
> > > > >  }
> > > > >  
> > > > > +int BufferWriter::configure(const libcamera::CameraConfiguration &config)
> > > > > +{
> > > > > +	int ret = FrameSink::configure(config);
> > > > > +	if (ret < 0)
> > > > > +		return ret;
> > > > > +
> > > > > +	streamNames_.clear();
> > > > > +	for (unsigned int index = 0; index < config.size(); ++index) {
> > > > > +		const StreamConfiguration &cfg = config.at(index);
> > > > > +		streamNames_[cfg.stream()] = "stream" + std::to_string(index);
> > > > > +	}
> > > > > +
> > > > > +	return 0;
> > > > > +}
> > > > > +
> > > > >  void BufferWriter::mapBuffer(FrameBuffer *buffer)
> > > > >  {
> > > > >  	for (const FrameBuffer::Plane &plane : buffer->planes()) {
> > > > > @@ -43,7 +60,7 @@ void BufferWriter::mapBuffer(FrameBuffer *buffer)
> > > > >  	}
> > > > >  }
> > > > >  
> > > > > -int BufferWriter::write(FrameBuffer *buffer, const std::string &streamName)
> > > > > +bool BufferWriter::consumeBuffer(const Stream *stream, FrameBuffer *buffer)
> > > > >  {
> > > > >  	std::string filename;
> > > > >  	size_t pos;
> > > > > @@ -53,7 +70,7 @@ int BufferWriter::write(FrameBuffer *buffer, const std::string &streamName)
> > > > >  	pos = filename.find_first_of('#');
> > > > >  	if (pos != std::string::npos) {
> > > > >  		std::stringstream ss;
> > > > > -		ss << streamName << "-" << std::setw(6)
> > > > > +		ss << streamNames_[stream] << "-" << std::setw(6)
> > > > >  		   << std::setfill('0') << buffer->metadata().sequence;
> > > > >  		filename.replace(pos, 1, ss.str());
> > > > >  	}
> > > > > @@ -62,7 +79,7 @@ int BufferWriter::write(FrameBuffer *buffer, const std::string &streamName)
> > > > >  		  (pos == std::string::npos ? O_APPEND : O_TRUNC),
> > > > >  		  S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
> > > > >  	if (fd == -1)
> > > > > -		return -errno;
> > > > > +		return true;
> > > > 
> > > > Would it make sens to log an error here as the error no longer is 
> > > > propagated to the caller?
> > > 
> > > Good point, I'll do so.
> > > 
> > > > >  
> > > > >  	for (const FrameBuffer::Plane &plane : buffer->planes()) {
> > > > >  		void *data = mappedBuffers_[plane.fd.fd()].first;
> > > > > @@ -84,5 +101,5 @@ int BufferWriter::write(FrameBuffer *buffer, const std::string &streamName)
> > > > >  
> > > > >  	close(fd);
> > > > >  
> > > > > -	return ret;
> > > > > +	return true;
> > > > >  }
> > > > > diff --git a/src/cam/buffer_writer.h b/src/cam/buffer_writer.h
> > > > > index 47e26103e13e..5a5b176f73d8 100644
> > > > > --- a/src/cam/buffer_writer.h
> > > > > +++ b/src/cam/buffer_writer.h
> > > > > @@ -12,18 +12,23 @@
> > > > >  
> > > > >  #include <libcamera/buffer.h>
> > > > >  
> > > > > -class BufferWriter
> > > > > +#include "frame_sink.h"
> > > > > +
> > > > > +class BufferWriter : public FrameSink
> > > > >  {
> > > > >  public:
> > > > >  	BufferWriter(const std::string &pattern = "frame-#.bin");
> > > > >  	~BufferWriter();
> > > > >  
> > > > > -	void mapBuffer(libcamera::FrameBuffer *buffer);
> > > > > +	int configure(const libcamera::CameraConfiguration &config) override;
> > > > >  
> > > > > -	int write(libcamera::FrameBuffer *buffer,
> > > > > -		  const std::string &streamName);
> > > > > +	void mapBuffer(libcamera::FrameBuffer *buffer) override;
> > > > > +
> > > > > +	bool consumeBuffer(const libcamera::Stream *stream,
> > > > > +			   libcamera::FrameBuffer *buffer) override;
> > > > >  
> > > > >  private:
> > > > > +	std::map<const libcamera::Stream *, std::string> streamNames_;
> > > > >  	std::string pattern_;
> > > > >  	std::map<int, std::pair<void *, unsigned int>> mappedBuffers_;
> > > > >  };
> > > > > diff --git a/src/cam/capture.cpp b/src/cam/capture.cpp
> > > > > index b7e06bcc9463..7fc9cba48892 100644
> > > > > --- a/src/cam/capture.cpp
> > > > > +++ b/src/cam/capture.cpp
> > > > > @@ -11,6 +11,7 @@
> > > > >  #include <limits.h>
> > > > >  #include <sstream>
> > > > >  
> > > > > +#include "buffer_writer.h"
> > > > >  #include "capture.h"
> > > > >  #include "main.h"
> > > > >  
> > > > > @@ -18,7 +19,7 @@ using namespace libcamera;
> > > > >  
> > > > >  Capture::Capture(std::shared_ptr<Camera> camera, CameraConfiguration *config,
> > > > >  		 const StreamRoles &roles)
> > > > > -	: camera_(camera), config_(config), roles_(roles), writer_(nullptr)
> > > > > +	: camera_(camera), config_(config), roles_(roles), sink_(nullptr)
> > > > >  {
> > > > >  }
> > > > >  
> > > > > @@ -47,20 +48,28 @@ int Capture::run(EventLoop *loop, const OptionsParser::Options &options)
> > > > >  
> > > > >  	if (options.isSet(OptFile)) {
> > > > >  		if (!options[OptFile].toString().empty())
> > > > > -			writer_ = new BufferWriter(options[OptFile]);
> > > > > +			sink_ = new BufferWriter(options[OptFile]);
> > > > >  		else
> > > > > -			writer_ = new BufferWriter();
> > > > > +			sink_ = new BufferWriter();
> > > > >  	}
> > > > >  
> > > > > +	if (sink_) {
> > > > > +		ret = sink_->configure(*config_);
> > > > > +		if (ret < 0) {
> > > > > +			std::cout << "Failed to configure frame sink"
> > > > > +				  << std::endl;
> > > > > +			return ret;
> > > > > +		}
> > > > > +
> > > > > +		sink_->bufferReleased.connect(this, &Capture::sinkRelease);
> > > > > +	}
> > > > >  
> > > > >  	FrameBufferAllocator *allocator = new FrameBufferAllocator(camera_);
> > > > >  
> > > > >  	ret = capture(loop, allocator);
> > > > >  
> > > > > -	if (options.isSet(OptFile)) {
> > > > > -		delete writer_;
> > > > > -		writer_ = nullptr;
> > > > > -	}
> > > > > +	delete sink_;
> > > > > +	sink_ = nullptr;
> > > > >  
> > > > >  	delete allocator;
> > > > >  
> > > > > @@ -110,16 +119,26 @@ int Capture::capture(EventLoop *loop, FrameBufferAllocator *allocator)
> > > > >  				return ret;
> > > > >  			}
> > > > >  
> > > > > -			if (writer_)
> > > > > -				writer_->mapBuffer(buffer.get());
> > > > > +			if (sink_)
> > > > > +				sink_->mapBuffer(buffer.get());
> > > > >  		}
> > > > >  
> > > > >  		requests.push_back(request);
> > > > >  	}
> > > > >  
> > > > > +	if (sink_) {
> > > > > +		ret = sink_->start();
> > > > > +		if (ret) {
> > > > > +			std::cout << "Failed to start frame sink" << std::endl;
> > > > > +			return ret;
> > > > > +		}
> > > > > +	}
> > > > > +
> > > > >  	ret = camera_->start();
> > > > >  	if (ret) {
> > > > >  		std::cout << "Failed to start capture" << std::endl;
> > > > > +		if (sink_)
> > > > > +			sink_->stop();
> > > > >  		return ret;
> > > > >  	}
> > > > >  
> > > > > @@ -128,6 +147,8 @@ int Capture::capture(EventLoop *loop, FrameBufferAllocator *allocator)
> > > > >  		if (ret < 0) {
> > > > >  			std::cerr << "Can't queue request" << std::endl;
> > > > >  			camera_->stop();
> > > > > +			if (sink_)
> > > > > +				sink_->stop();
> > > > >  			return ret;
> > > > >  		}
> > > > >  	}
> > > > > @@ -141,6 +162,12 @@ int Capture::capture(EventLoop *loop, FrameBufferAllocator *allocator)
> > > > >  	if (ret)
> > > > >  		std::cout << "Failed to stop capture" << std::endl;
> > > > >  
> > > > > +	if (sink_) {
> > > > > +		ret = sink_->stop();
> > > > > +		if (ret)
> > > > > +			std::cout << "Failed to stop frame sink" << std::endl;
> > > > > +	}
> > > > > +
> > > > >  	return ret;
> > > > >  }
> > > > >  
> > > > > @@ -157,17 +184,18 @@ void Capture::requestComplete(Request *request)
> > > > >  	    ? 1000.0 / fps : 0.0;
> > > > >  	last_ = now;
> > > > >  
> > > > > +	bool requeue = true;
> > > > > +
> > > > >  	std::stringstream info;
> > > > >  	info << "fps: " << std::fixed << std::setprecision(2) << fps;
> > > > >  
> > > > >  	for (auto it = buffers.begin(); it != buffers.end(); ++it) {
> > > > >  		Stream *stream = it->first;
> > > > >  		FrameBuffer *buffer = it->second;
> > > > > -		const std::string &name = streamName_[stream];
> > > > >  
> > > > >  		const FrameMetadata &metadata = buffer->metadata();
> > > > >  
> > > > > -		info << " " << name
> > > > > +		info << " " << streamName_[stream]
> > > > >  		     << " seq: " << std::setw(6) << std::setfill('0') << metadata.sequence
> > > > >  		     << " bytesused: ";
> > > > >  
> > > > > @@ -178,12 +206,21 @@ void Capture::requestComplete(Request *request)
> > > > >  				info << "/";
> > > > >  		}
> > > > >  
> > > > > -		if (writer_)
> > > > > -			writer_->write(buffer, name);
> > > > > +		if (sink_) {
> > > > > +			if (!sink_->consumeBuffer(stream, buffer))
> > > > > +				requeue = false;
> > > > 
> > > > This will only work for requests that contains one buffer. In this 
> > > > change it will works as BufferWriter::consumeBuffer() returns true and 
> > > > the while request is requeued as the buffer writer does not need to hold 
> > > > onto the buffers. I know that the new sink added later in this sereis 
> > > > only supports one stream so it returning false and holding onto the 
> > > > buffer will also work with this logic.
> > > 
> > > Damn, you noticed ;-)
> > 
> > Sorry, I will pay less attention in the future ;-P
> > 
> > > > Would it instead make sens to rework this so that the sink always would 
> > > > need to signal that it's done with the buffer(s) instead of requing the 
> > > > request from the requestComplete() callback? That way the behavior the 
> > > > sinks would be the same disregarding for how long it needs to hold on to 
> > > > the buffer.
> > > 
> > > I think this mechanism is indeed a bit fragile, and needs to be
> > > reworked. It works in the two cases we support (file sink and kms sink),
> > > but isn't future-proof. That being said, fixing it properly isn't
> > > trivial. We can only requeue the request when all buffers have been
> > > processed (or, rather, maybe we'll need to requeue a request with other
> > > buffers instead). We can't unconditionally queue a new request in the
> > > bufferReleased callback of the sink, as that would only work for a
> > > single stream.
> > > 
> > > In addition to that, I think the cam application needs to be reworked to
> > > move processing of buffers to its main thread. The file sink, in
> > > particular, is too expensive to run in the request completion callback,
> > > which runs in the camera manager thread. I think it could make sense to
> > > address the two issues together. Do you think it needs to be done as
> > > part of this series ? Or could I just add a \todo to explain the issue
> > > and make sure we don't forget about it ?
> > 
> > I don't think this needs to be solved the perfect way now, but I do 
> > think we need something where adding a 3rd sink that supports more then 
> > one stream and needs to hold onto buffers do not break, but they might 
> > not need to function in the most efficient way.
> > 
> > As we already discussed that for now cam would only support a single 
> > sink would it make sens to rework the interface to deal with a Request 
> > instead of a FrameBuffer? That way the logic you have here could still 
> > be used while allowing new sinks to be added which would be less 
> > fragile.
> 
> Just to make it clear, how much rework would you do as part of this
> series, and how much should be delayed ?

For me the least amount needed to allow for a sink that supports more 
then one stream but needs to hold onto the buffers/request. I would be 
OK to pass the whole request to the sink instead of individual buffers. 
Or if you can think of something else more clever.

What I don't like is that the code is fragile and without looking at 
this change in isolation it's not clear what requirements are put on a 
sink so adding a new one could explode and take time to figure why it 
did so. On the other hand I really like the sink concept you add here, 
maybe it can be shared with qcam sometime far into the future. :-)

> 
> > That being said, I do agree with you that deepening on what end gaols we 
> > have for cam we will likely need to rework how requests are constructed.  
> > We still have the whole concept of request reuse and request templates 
> > to bash out :-)
> > 
> > I'm however not convinced I like the idea of making cam an interactive 
> > console tool. Things like push spacebar to save frame to disk while 
> > viewing the viewfinder on the framebuffer I think maybe is another 
> > application, but I might be wrong.
> 
> You're probably right there, it's not a direction I intend to push for.
> 
> > > > > +		}
> > > > >  	}
> > > > >  
> > > > >  	std::cout << info.str() << std::endl;
> > > > >  
> > > > > +	/*
> > > > > +	 * If the frame sink holds on the buffer, we'll requeue it later in the
> > > > > +	 * complete handler.
> > > > > +	 */
> > > > > +	if (!requeue)
> > > > > +		return;
> > > > > +
> > > > >  	/*
> > > > >  	 * Create a new request and populate it with one buffer for each
> > > > >  	 * stream.
> > > > > @@ -203,3 +240,16 @@ void Capture::requestComplete(Request *request)
> > > > >  
> > > > >  	camera_->queueRequest(request);
> > > > >  }
> > > > > +
> > > > > +void Capture::sinkRelease(libcamera::FrameBuffer *buffer)
> > > > > +{
> > > > > +	Request *request = camera_->createRequest();
> > > > > +	if (!request) {
> > > > > +		std::cerr << "Can't create request" << std::endl;
> > > > > +		return;
> > > > > +	}
> > > > > +
> > > > > +	request->addBuffer(config_->at(0).stream(), buffer);
> > > > > +
> > > > > +	camera_->queueRequest(request);
> > > > > +}
> > > > > diff --git a/src/cam/capture.h b/src/cam/capture.h
> > > > > index c0e697b831fb..3be1d98e4d3e 100644
> > > > > --- a/src/cam/capture.h
> > > > > +++ b/src/cam/capture.h
> > > > > @@ -16,10 +16,11 @@
> > > > >  #include <libcamera/request.h>
> > > > >  #include <libcamera/stream.h>
> > > > >  
> > > > > -#include "buffer_writer.h"
> > > > >  #include "event_loop.h"
> > > > >  #include "options.h"
> > > > >  
> > > > > +class FrameSink;
> > > > > +
> > > > >  class Capture
> > > > >  {
> > > > >  public:
> > > > > @@ -33,13 +34,14 @@ private:
> > > > >  		    libcamera::FrameBufferAllocator *allocator);
> > > > >  
> > > > >  	void requestComplete(libcamera::Request *request);
> > > > > +	void sinkRelease(libcamera::FrameBuffer *buffer);
> > > > >  
> > > > >  	std::shared_ptr<libcamera::Camera> camera_;
> > > > >  	libcamera::CameraConfiguration *config_;
> > > > >  	libcamera::StreamRoles roles_;
> > > > >  
> > > > >  	std::map<libcamera::Stream *, std::string> streamName_;
> > > > > -	BufferWriter *writer_;
> > > > > +	FrameSink *sink_;
> > > > >  	std::chrono::steady_clock::time_point last_;
> > > > >  };
> > > > >  
> 
> -- 
> Regards,
> 
> Laurent Pinchart
Laurent Pinchart May 22, 2020, 9:32 p.m. UTC | #6
Hi Niklas,

On Fri, May 22, 2020 at 03:02:14AM +0200, Niklas Söderlund wrote:
> On 2020-05-20 01:26:10 +0300, Laurent Pinchart wrote:
> > On Tue, May 19, 2020 at 04:59:28PM +0200, Niklas Söderlund wrote:
> >> On 2020-05-19 17:43:32 +0300, Laurent Pinchart wrote:
> >>> On Tue, May 19, 2020 at 04:38:14PM +0200, Niklas Söderlund wrote:
> >>>> On 2020-05-19 06:25:00 +0300, Laurent Pinchart wrote:
> >>>>> Make the BufferWriter class inherit from FrameSink, and use the
> >>>>> FrameSink API to manage it. This makes the code more generic, and will
> >>>>> allow usage of other sinks.
> >>>>> 
> >>>>> Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
> >>>>> ---
> >>>>>  src/cam/buffer_writer.cpp | 25 ++++++++++---
> >>>>>  src/cam/buffer_writer.h   | 13 ++++---
> >>>>>  src/cam/capture.cpp       | 76 ++++++++++++++++++++++++++++++++-------
> >>>>>  src/cam/capture.h         |  6 ++--
> >>>>>  4 files changed, 97 insertions(+), 23 deletions(-)
> >>>>> 
> >>>>> diff --git a/src/cam/buffer_writer.cpp b/src/cam/buffer_writer.cpp
> >>>>> index c5a5eb46224a..2bec4b132155 100644
> >>>>> --- a/src/cam/buffer_writer.cpp
> >>>>> +++ b/src/cam/buffer_writer.cpp
> >>>>> @@ -13,6 +13,8 @@
> >>>>>  #include <sys/mman.h>
> >>>>>  #include <unistd.h>
> >>>>>  
> >>>>> +#include <libcamera/camera.h>
> >>>>> +
> >>>>>  #include "buffer_writer.h"
> >>>>>  
> >>>>>  using namespace libcamera;
> >>>>> @@ -32,6 +34,21 @@ BufferWriter::~BufferWriter()
> >>>>>  	mappedBuffers_.clear();
> >>>>>  }
> >>>>>  
> >>>>> +int BufferWriter::configure(const libcamera::CameraConfiguration &config)
> >>>>> +{
> >>>>> +	int ret = FrameSink::configure(config);
> >>>>> +	if (ret < 0)
> >>>>> +		return ret;
> >>>>> +
> >>>>> +	streamNames_.clear();
> >>>>> +	for (unsigned int index = 0; index < config.size(); ++index) {
> >>>>> +		const StreamConfiguration &cfg = config.at(index);
> >>>>> +		streamNames_[cfg.stream()] = "stream" + std::to_string(index);
> >>>>> +	}
> >>>>> +
> >>>>> +	return 0;
> >>>>> +}
> >>>>> +
> >>>>>  void BufferWriter::mapBuffer(FrameBuffer *buffer)
> >>>>>  {
> >>>>>  	for (const FrameBuffer::Plane &plane : buffer->planes()) {
> >>>>> @@ -43,7 +60,7 @@ void BufferWriter::mapBuffer(FrameBuffer *buffer)
> >>>>>  	}
> >>>>>  }
> >>>>>  
> >>>>> -int BufferWriter::write(FrameBuffer *buffer, const std::string &streamName)
> >>>>> +bool BufferWriter::consumeBuffer(const Stream *stream, FrameBuffer *buffer)
> >>>>>  {
> >>>>>  	std::string filename;
> >>>>>  	size_t pos;
> >>>>> @@ -53,7 +70,7 @@ int BufferWriter::write(FrameBuffer *buffer, const std::string &streamName)
> >>>>>  	pos = filename.find_first_of('#');
> >>>>>  	if (pos != std::string::npos) {
> >>>>>  		std::stringstream ss;
> >>>>> -		ss << streamName << "-" << std::setw(6)
> >>>>> +		ss << streamNames_[stream] << "-" << std::setw(6)
> >>>>>  		   << std::setfill('0') << buffer->metadata().sequence;
> >>>>>  		filename.replace(pos, 1, ss.str());
> >>>>>  	}
> >>>>> @@ -62,7 +79,7 @@ int BufferWriter::write(FrameBuffer *buffer, const std::string &streamName)
> >>>>>  		  (pos == std::string::npos ? O_APPEND : O_TRUNC),
> >>>>>  		  S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
> >>>>>  	if (fd == -1)
> >>>>> -		return -errno;
> >>>>> +		return true;
> >>>> 
> >>>> Would it make sens to log an error here as the error no longer is 
> >>>> propagated to the caller?
> >>> 
> >>> Good point, I'll do so.
> >>> 
> >>>>>  
> >>>>>  	for (const FrameBuffer::Plane &plane : buffer->planes()) {
> >>>>>  		void *data = mappedBuffers_[plane.fd.fd()].first;
> >>>>> @@ -84,5 +101,5 @@ int BufferWriter::write(FrameBuffer *buffer, const std::string &streamName)
> >>>>>  
> >>>>>  	close(fd);
> >>>>>  
> >>>>> -	return ret;
> >>>>> +	return true;
> >>>>>  }
> >>>>> diff --git a/src/cam/buffer_writer.h b/src/cam/buffer_writer.h
> >>>>> index 47e26103e13e..5a5b176f73d8 100644
> >>>>> --- a/src/cam/buffer_writer.h
> >>>>> +++ b/src/cam/buffer_writer.h
> >>>>> @@ -12,18 +12,23 @@
> >>>>>  
> >>>>>  #include <libcamera/buffer.h>
> >>>>>  
> >>>>> -class BufferWriter
> >>>>> +#include "frame_sink.h"
> >>>>> +
> >>>>> +class BufferWriter : public FrameSink
> >>>>>  {
> >>>>>  public:
> >>>>>  	BufferWriter(const std::string &pattern = "frame-#.bin");
> >>>>>  	~BufferWriter();
> >>>>>  
> >>>>> -	void mapBuffer(libcamera::FrameBuffer *buffer);
> >>>>> +	int configure(const libcamera::CameraConfiguration &config) override;
> >>>>>  
> >>>>> -	int write(libcamera::FrameBuffer *buffer,
> >>>>> -		  const std::string &streamName);
> >>>>> +	void mapBuffer(libcamera::FrameBuffer *buffer) override;
> >>>>> +
> >>>>> +	bool consumeBuffer(const libcamera::Stream *stream,
> >>>>> +			   libcamera::FrameBuffer *buffer) override;
> >>>>>  
> >>>>>  private:
> >>>>> +	std::map<const libcamera::Stream *, std::string> streamNames_;
> >>>>>  	std::string pattern_;
> >>>>>  	std::map<int, std::pair<void *, unsigned int>> mappedBuffers_;
> >>>>>  };
> >>>>> diff --git a/src/cam/capture.cpp b/src/cam/capture.cpp
> >>>>> index b7e06bcc9463..7fc9cba48892 100644
> >>>>> --- a/src/cam/capture.cpp
> >>>>> +++ b/src/cam/capture.cpp
> >>>>> @@ -11,6 +11,7 @@
> >>>>>  #include <limits.h>
> >>>>>  #include <sstream>
> >>>>>  
> >>>>> +#include "buffer_writer.h"
> >>>>>  #include "capture.h"
> >>>>>  #include "main.h"
> >>>>>  
> >>>>> @@ -18,7 +19,7 @@ using namespace libcamera;
> >>>>>  
> >>>>>  Capture::Capture(std::shared_ptr<Camera> camera, CameraConfiguration *config,
> >>>>>  		 const StreamRoles &roles)
> >>>>> -	: camera_(camera), config_(config), roles_(roles), writer_(nullptr)
> >>>>> +	: camera_(camera), config_(config), roles_(roles), sink_(nullptr)
> >>>>>  {
> >>>>>  }
> >>>>>  
> >>>>> @@ -47,20 +48,28 @@ int Capture::run(EventLoop *loop, const OptionsParser::Options &options)
> >>>>>  
> >>>>>  	if (options.isSet(OptFile)) {
> >>>>>  		if (!options[OptFile].toString().empty())
> >>>>> -			writer_ = new BufferWriter(options[OptFile]);
> >>>>> +			sink_ = new BufferWriter(options[OptFile]);
> >>>>>  		else
> >>>>> -			writer_ = new BufferWriter();
> >>>>> +			sink_ = new BufferWriter();
> >>>>>  	}
> >>>>>  
> >>>>> +	if (sink_) {
> >>>>> +		ret = sink_->configure(*config_);
> >>>>> +		if (ret < 0) {
> >>>>> +			std::cout << "Failed to configure frame sink"
> >>>>> +				  << std::endl;
> >>>>> +			return ret;
> >>>>> +		}
> >>>>> +
> >>>>> +		sink_->bufferReleased.connect(this, &Capture::sinkRelease);
> >>>>> +	}
> >>>>>  
> >>>>>  	FrameBufferAllocator *allocator = new FrameBufferAllocator(camera_);
> >>>>>  
> >>>>>  	ret = capture(loop, allocator);
> >>>>>  
> >>>>> -	if (options.isSet(OptFile)) {
> >>>>> -		delete writer_;
> >>>>> -		writer_ = nullptr;
> >>>>> -	}
> >>>>> +	delete sink_;
> >>>>> +	sink_ = nullptr;
> >>>>>  
> >>>>>  	delete allocator;
> >>>>>  
> >>>>> @@ -110,16 +119,26 @@ int Capture::capture(EventLoop *loop, FrameBufferAllocator *allocator)
> >>>>>  				return ret;
> >>>>>  			}
> >>>>>  
> >>>>> -			if (writer_)
> >>>>> -				writer_->mapBuffer(buffer.get());
> >>>>> +			if (sink_)
> >>>>> +				sink_->mapBuffer(buffer.get());
> >>>>>  		}
> >>>>>  
> >>>>>  		requests.push_back(request);
> >>>>>  	}
> >>>>>  
> >>>>> +	if (sink_) {
> >>>>> +		ret = sink_->start();
> >>>>> +		if (ret) {
> >>>>> +			std::cout << "Failed to start frame sink" << std::endl;
> >>>>> +			return ret;
> >>>>> +		}
> >>>>> +	}
> >>>>> +
> >>>>>  	ret = camera_->start();
> >>>>>  	if (ret) {
> >>>>>  		std::cout << "Failed to start capture" << std::endl;
> >>>>> +		if (sink_)
> >>>>> +			sink_->stop();
> >>>>>  		return ret;
> >>>>>  	}
> >>>>>  
> >>>>> @@ -128,6 +147,8 @@ int Capture::capture(EventLoop *loop, FrameBufferAllocator *allocator)
> >>>>>  		if (ret < 0) {
> >>>>>  			std::cerr << "Can't queue request" << std::endl;
> >>>>>  			camera_->stop();
> >>>>> +			if (sink_)
> >>>>> +				sink_->stop();
> >>>>>  			return ret;
> >>>>>  		}
> >>>>>  	}
> >>>>> @@ -141,6 +162,12 @@ int Capture::capture(EventLoop *loop, FrameBufferAllocator *allocator)
> >>>>>  	if (ret)
> >>>>>  		std::cout << "Failed to stop capture" << std::endl;
> >>>>>  
> >>>>> +	if (sink_) {
> >>>>> +		ret = sink_->stop();
> >>>>> +		if (ret)
> >>>>> +			std::cout << "Failed to stop frame sink" << std::endl;
> >>>>> +	}
> >>>>> +
> >>>>>  	return ret;
> >>>>>  }
> >>>>>  
> >>>>> @@ -157,17 +184,18 @@ void Capture::requestComplete(Request *request)
> >>>>>  	    ? 1000.0 / fps : 0.0;
> >>>>>  	last_ = now;
> >>>>>  
> >>>>> +	bool requeue = true;
> >>>>> +
> >>>>>  	std::stringstream info;
> >>>>>  	info << "fps: " << std::fixed << std::setprecision(2) << fps;
> >>>>>  
> >>>>>  	for (auto it = buffers.begin(); it != buffers.end(); ++it) {
> >>>>>  		Stream *stream = it->first;
> >>>>>  		FrameBuffer *buffer = it->second;
> >>>>> -		const std::string &name = streamName_[stream];
> >>>>>  
> >>>>>  		const FrameMetadata &metadata = buffer->metadata();
> >>>>>  
> >>>>> -		info << " " << name
> >>>>> +		info << " " << streamName_[stream]
> >>>>>  		     << " seq: " << std::setw(6) << std::setfill('0') << metadata.sequence
> >>>>>  		     << " bytesused: ";
> >>>>>  
> >>>>> @@ -178,12 +206,21 @@ void Capture::requestComplete(Request *request)
> >>>>>  				info << "/";
> >>>>>  		}
> >>>>>  
> >>>>> -		if (writer_)
> >>>>> -			writer_->write(buffer, name);
> >>>>> +		if (sink_) {
> >>>>> +			if (!sink_->consumeBuffer(stream, buffer))
> >>>>> +				requeue = false;
> >>>> 
> >>>> This will only work for requests that contains one buffer. In this 
> >>>> change it will works as BufferWriter::consumeBuffer() returns true and 
> >>>> the while request is requeued as the buffer writer does not need to hold 
> >>>> onto the buffers. I know that the new sink added later in this sereis 
> >>>> only supports one stream so it returning false and holding onto the 
> >>>> buffer will also work with this logic.
> >>> 
> >>> Damn, you noticed ;-)
> >> 
> >> Sorry, I will pay less attention in the future ;-P
> >> 
> >>>> Would it instead make sens to rework this so that the sink always would 
> >>>> need to signal that it's done with the buffer(s) instead of requing the 
> >>>> request from the requestComplete() callback? That way the behavior the 
> >>>> sinks would be the same disregarding for how long it needs to hold on to 
> >>>> the buffer.
> >>> 
> >>> I think this mechanism is indeed a bit fragile, and needs to be
> >>> reworked. It works in the two cases we support (file sink and kms sink),
> >>> but isn't future-proof. That being said, fixing it properly isn't
> >>> trivial. We can only requeue the request when all buffers have been
> >>> processed (or, rather, maybe we'll need to requeue a request with other
> >>> buffers instead). We can't unconditionally queue a new request in the
> >>> bufferReleased callback of the sink, as that would only work for a
> >>> single stream.
> >>> 
> >>> In addition to that, I think the cam application needs to be reworked to
> >>> move processing of buffers to its main thread. The file sink, in
> >>> particular, is too expensive to run in the request completion callback,
> >>> which runs in the camera manager thread. I think it could make sense to
> >>> address the two issues together. Do you think it needs to be done as
> >>> part of this series ? Or could I just add a \todo to explain the issue
> >>> and make sure we don't forget about it ?
> >> 
> >> I don't think this needs to be solved the perfect way now, but I do 
> >> think we need something where adding a 3rd sink that supports more then 
> >> one stream and needs to hold onto buffers do not break, but they might 
> >> not need to function in the most efficient way.
> >> 
> >> As we already discussed that for now cam would only support a single 
> >> sink would it make sens to rework the interface to deal with a Request 
> >> instead of a FrameBuffer? That way the logic you have here could still 
> >> be used while allowing new sinks to be added which would be less 
> >> fragile.
> > 
> > Just to make it clear, how much rework would you do as part of this
> > series, and how much should be delayed ?
> 
> For me the least amount needed to allow for a sink that supports more 
> then one stream but needs to hold onto the buffers/request. I would be 
> OK to pass the whole request to the sink instead of individual buffers. 
> Or if you can think of something else more clever.
> 
> What I don't like is that the code is fragile and without looking at 
> this change in isolation it's not clear what requirements are put on a 
> sink so adding a new one could explode and take time to figure why it 
> did so. On the other hand I really like the sink concept you add here, 
> maybe it can be shared with qcam sometime far into the future. :-)

I agree with you, moving the sink API from buffer to request is a good
idea. There's one issue though, the Request objects are transient, and
while they could be passed synchronously to the sink, the sink can't
hold to them until it completes processing, as they are deleted
automatically by the libcamera core in Camera::requestComplete().

I could work around that by only holding on the streams to buffers map
and passing that from the sink to the Capture class when processing
finishes. The alternative is to already turn the Request class into a
non-transient object, which we are planning to do anyway. I was hoping
DRM/KMS support could be merged in cam without that, but maybe I should
bite the bullet and rework the Request object first ?

> >> That being said, I do agree with you that deepening on what end gaols we 
> >> have for cam we will likely need to rework how requests are constructed.  
> >> We still have the whole concept of request reuse and request templates 
> >> to bash out :-)
> >> 
> >> I'm however not convinced I like the idea of making cam an interactive 
> >> console tool. Things like push spacebar to save frame to disk while 
> >> viewing the viewfinder on the framebuffer I think maybe is another 
> >> application, but I might be wrong.
> > 
> > You're probably right there, it's not a direction I intend to push for.
> > 
> >>>>> +		}
> >>>>>  	}
> >>>>>  
> >>>>>  	std::cout << info.str() << std::endl;
> >>>>>  
> >>>>> +	/*
> >>>>> +	 * If the frame sink holds on the buffer, we'll requeue it later in the
> >>>>> +	 * complete handler.
> >>>>> +	 */
> >>>>> +	if (!requeue)
> >>>>> +		return;
> >>>>> +
> >>>>>  	/*
> >>>>>  	 * Create a new request and populate it with one buffer for each
> >>>>>  	 * stream.
> >>>>> @@ -203,3 +240,16 @@ void Capture::requestComplete(Request *request)
> >>>>>  
> >>>>>  	camera_->queueRequest(request);
> >>>>>  }
> >>>>> +
> >>>>> +void Capture::sinkRelease(libcamera::FrameBuffer *buffer)
> >>>>> +{
> >>>>> +	Request *request = camera_->createRequest();
> >>>>> +	if (!request) {
> >>>>> +		std::cerr << "Can't create request" << std::endl;
> >>>>> +		return;
> >>>>> +	}
> >>>>> +
> >>>>> +	request->addBuffer(config_->at(0).stream(), buffer);
> >>>>> +
> >>>>> +	camera_->queueRequest(request);
> >>>>> +}
> >>>>> diff --git a/src/cam/capture.h b/src/cam/capture.h
> >>>>> index c0e697b831fb..3be1d98e4d3e 100644
> >>>>> --- a/src/cam/capture.h
> >>>>> +++ b/src/cam/capture.h
> >>>>> @@ -16,10 +16,11 @@
> >>>>>  #include <libcamera/request.h>
> >>>>>  #include <libcamera/stream.h>
> >>>>>  
> >>>>> -#include "buffer_writer.h"
> >>>>>  #include "event_loop.h"
> >>>>>  #include "options.h"
> >>>>>  
> >>>>> +class FrameSink;
> >>>>> +
> >>>>>  class Capture
> >>>>>  {
> >>>>>  public:
> >>>>> @@ -33,13 +34,14 @@ private:
> >>>>>  		    libcamera::FrameBufferAllocator *allocator);
> >>>>>  
> >>>>>  	void requestComplete(libcamera::Request *request);
> >>>>> +	void sinkRelease(libcamera::FrameBuffer *buffer);
> >>>>>  
> >>>>>  	std::shared_ptr<libcamera::Camera> camera_;
> >>>>>  	libcamera::CameraConfiguration *config_;
> >>>>>  	libcamera::StreamRoles roles_;
> >>>>>  
> >>>>>  	std::map<libcamera::Stream *, std::string> streamName_;
> >>>>> -	BufferWriter *writer_;
> >>>>> +	FrameSink *sink_;
> >>>>>  	std::chrono::steady_clock::time_point last_;
> >>>>>  };
> >>>>>

Patch

diff --git a/src/cam/buffer_writer.cpp b/src/cam/buffer_writer.cpp
index c5a5eb46224a..2bec4b132155 100644
--- a/src/cam/buffer_writer.cpp
+++ b/src/cam/buffer_writer.cpp
@@ -13,6 +13,8 @@ 
 #include <sys/mman.h>
 #include <unistd.h>
 
+#include <libcamera/camera.h>
+
 #include "buffer_writer.h"
 
 using namespace libcamera;
@@ -32,6 +34,21 @@  BufferWriter::~BufferWriter()
 	mappedBuffers_.clear();
 }
 
+int BufferWriter::configure(const libcamera::CameraConfiguration &config)
+{
+	int ret = FrameSink::configure(config);
+	if (ret < 0)
+		return ret;
+
+	streamNames_.clear();
+	for (unsigned int index = 0; index < config.size(); ++index) {
+		const StreamConfiguration &cfg = config.at(index);
+		streamNames_[cfg.stream()] = "stream" + std::to_string(index);
+	}
+
+	return 0;
+}
+
 void BufferWriter::mapBuffer(FrameBuffer *buffer)
 {
 	for (const FrameBuffer::Plane &plane : buffer->planes()) {
@@ -43,7 +60,7 @@  void BufferWriter::mapBuffer(FrameBuffer *buffer)
 	}
 }
 
-int BufferWriter::write(FrameBuffer *buffer, const std::string &streamName)
+bool BufferWriter::consumeBuffer(const Stream *stream, FrameBuffer *buffer)
 {
 	std::string filename;
 	size_t pos;
@@ -53,7 +70,7 @@  int BufferWriter::write(FrameBuffer *buffer, const std::string &streamName)
 	pos = filename.find_first_of('#');
 	if (pos != std::string::npos) {
 		std::stringstream ss;
-		ss << streamName << "-" << std::setw(6)
+		ss << streamNames_[stream] << "-" << std::setw(6)
 		   << std::setfill('0') << buffer->metadata().sequence;
 		filename.replace(pos, 1, ss.str());
 	}
@@ -62,7 +79,7 @@  int BufferWriter::write(FrameBuffer *buffer, const std::string &streamName)
 		  (pos == std::string::npos ? O_APPEND : O_TRUNC),
 		  S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
 	if (fd == -1)
-		return -errno;
+		return true;
 
 	for (const FrameBuffer::Plane &plane : buffer->planes()) {
 		void *data = mappedBuffers_[plane.fd.fd()].first;
@@ -84,5 +101,5 @@  int BufferWriter::write(FrameBuffer *buffer, const std::string &streamName)
 
 	close(fd);
 
-	return ret;
+	return true;
 }
diff --git a/src/cam/buffer_writer.h b/src/cam/buffer_writer.h
index 47e26103e13e..5a5b176f73d8 100644
--- a/src/cam/buffer_writer.h
+++ b/src/cam/buffer_writer.h
@@ -12,18 +12,23 @@ 
 
 #include <libcamera/buffer.h>
 
-class BufferWriter
+#include "frame_sink.h"
+
+class BufferWriter : public FrameSink
 {
 public:
 	BufferWriter(const std::string &pattern = "frame-#.bin");
 	~BufferWriter();
 
-	void mapBuffer(libcamera::FrameBuffer *buffer);
+	int configure(const libcamera::CameraConfiguration &config) override;
 
-	int write(libcamera::FrameBuffer *buffer,
-		  const std::string &streamName);
+	void mapBuffer(libcamera::FrameBuffer *buffer) override;
+
+	bool consumeBuffer(const libcamera::Stream *stream,
+			   libcamera::FrameBuffer *buffer) override;
 
 private:
+	std::map<const libcamera::Stream *, std::string> streamNames_;
 	std::string pattern_;
 	std::map<int, std::pair<void *, unsigned int>> mappedBuffers_;
 };
diff --git a/src/cam/capture.cpp b/src/cam/capture.cpp
index b7e06bcc9463..7fc9cba48892 100644
--- a/src/cam/capture.cpp
+++ b/src/cam/capture.cpp
@@ -11,6 +11,7 @@ 
 #include <limits.h>
 #include <sstream>
 
+#include "buffer_writer.h"
 #include "capture.h"
 #include "main.h"
 
@@ -18,7 +19,7 @@  using namespace libcamera;
 
 Capture::Capture(std::shared_ptr<Camera> camera, CameraConfiguration *config,
 		 const StreamRoles &roles)
-	: camera_(camera), config_(config), roles_(roles), writer_(nullptr)
+	: camera_(camera), config_(config), roles_(roles), sink_(nullptr)
 {
 }
 
@@ -47,20 +48,28 @@  int Capture::run(EventLoop *loop, const OptionsParser::Options &options)
 
 	if (options.isSet(OptFile)) {
 		if (!options[OptFile].toString().empty())
-			writer_ = new BufferWriter(options[OptFile]);
+			sink_ = new BufferWriter(options[OptFile]);
 		else
-			writer_ = new BufferWriter();
+			sink_ = new BufferWriter();
 	}
 
+	if (sink_) {
+		ret = sink_->configure(*config_);
+		if (ret < 0) {
+			std::cout << "Failed to configure frame sink"
+				  << std::endl;
+			return ret;
+		}
+
+		sink_->bufferReleased.connect(this, &Capture::sinkRelease);
+	}
 
 	FrameBufferAllocator *allocator = new FrameBufferAllocator(camera_);
 
 	ret = capture(loop, allocator);
 
-	if (options.isSet(OptFile)) {
-		delete writer_;
-		writer_ = nullptr;
-	}
+	delete sink_;
+	sink_ = nullptr;
 
 	delete allocator;
 
@@ -110,16 +119,26 @@  int Capture::capture(EventLoop *loop, FrameBufferAllocator *allocator)
 				return ret;
 			}
 
-			if (writer_)
-				writer_->mapBuffer(buffer.get());
+			if (sink_)
+				sink_->mapBuffer(buffer.get());
 		}
 
 		requests.push_back(request);
 	}
 
+	if (sink_) {
+		ret = sink_->start();
+		if (ret) {
+			std::cout << "Failed to start frame sink" << std::endl;
+			return ret;
+		}
+	}
+
 	ret = camera_->start();
 	if (ret) {
 		std::cout << "Failed to start capture" << std::endl;
+		if (sink_)
+			sink_->stop();
 		return ret;
 	}
 
@@ -128,6 +147,8 @@  int Capture::capture(EventLoop *loop, FrameBufferAllocator *allocator)
 		if (ret < 0) {
 			std::cerr << "Can't queue request" << std::endl;
 			camera_->stop();
+			if (sink_)
+				sink_->stop();
 			return ret;
 		}
 	}
@@ -141,6 +162,12 @@  int Capture::capture(EventLoop *loop, FrameBufferAllocator *allocator)
 	if (ret)
 		std::cout << "Failed to stop capture" << std::endl;
 
+	if (sink_) {
+		ret = sink_->stop();
+		if (ret)
+			std::cout << "Failed to stop frame sink" << std::endl;
+	}
+
 	return ret;
 }
 
@@ -157,17 +184,18 @@  void Capture::requestComplete(Request *request)
 	    ? 1000.0 / fps : 0.0;
 	last_ = now;
 
+	bool requeue = true;
+
 	std::stringstream info;
 	info << "fps: " << std::fixed << std::setprecision(2) << fps;
 
 	for (auto it = buffers.begin(); it != buffers.end(); ++it) {
 		Stream *stream = it->first;
 		FrameBuffer *buffer = it->second;
-		const std::string &name = streamName_[stream];
 
 		const FrameMetadata &metadata = buffer->metadata();
 
-		info << " " << name
+		info << " " << streamName_[stream]
 		     << " seq: " << std::setw(6) << std::setfill('0') << metadata.sequence
 		     << " bytesused: ";
 
@@ -178,12 +206,21 @@  void Capture::requestComplete(Request *request)
 				info << "/";
 		}
 
-		if (writer_)
-			writer_->write(buffer, name);
+		if (sink_) {
+			if (!sink_->consumeBuffer(stream, buffer))
+				requeue = false;
+		}
 	}
 
 	std::cout << info.str() << std::endl;
 
+	/*
+	 * If the frame sink holds on the buffer, we'll requeue it later in the
+	 * complete handler.
+	 */
+	if (!requeue)
+		return;
+
 	/*
 	 * Create a new request and populate it with one buffer for each
 	 * stream.
@@ -203,3 +240,16 @@  void Capture::requestComplete(Request *request)
 
 	camera_->queueRequest(request);
 }
+
+void Capture::sinkRelease(libcamera::FrameBuffer *buffer)
+{
+	Request *request = camera_->createRequest();
+	if (!request) {
+		std::cerr << "Can't create request" << std::endl;
+		return;
+	}
+
+	request->addBuffer(config_->at(0).stream(), buffer);
+
+	camera_->queueRequest(request);
+}
diff --git a/src/cam/capture.h b/src/cam/capture.h
index c0e697b831fb..3be1d98e4d3e 100644
--- a/src/cam/capture.h
+++ b/src/cam/capture.h
@@ -16,10 +16,11 @@ 
 #include <libcamera/request.h>
 #include <libcamera/stream.h>
 
-#include "buffer_writer.h"
 #include "event_loop.h"
 #include "options.h"
 
+class FrameSink;
+
 class Capture
 {
 public:
@@ -33,13 +34,14 @@  private:
 		    libcamera::FrameBufferAllocator *allocator);
 
 	void requestComplete(libcamera::Request *request);
+	void sinkRelease(libcamera::FrameBuffer *buffer);
 
 	std::shared_ptr<libcamera::Camera> camera_;
 	libcamera::CameraConfiguration *config_;
 	libcamera::StreamRoles roles_;
 
 	std::map<libcamera::Stream *, std::string> streamName_;
-	BufferWriter *writer_;
+	FrameSink *sink_;
 	std::chrono::steady_clock::time_point last_;
 };