Message ID | 20200306202637.525587-23-nicolas@ndufresne.ca |
---|---|
State | Accepted |
Headers | show |
Series |
|
Related | show |
Hi Nicolas, Thank you for the patch. On Fri, Mar 06, 2020 at 03:26:32PM -0500, Nicolas Dufresne wrote: > From: Nicolas Dufresne <nicolas.dufresne@collabora.com> > > With this patch, the element is now able to push buffers to the next > element in the graph. The buffers are currently missing any metadata > like timestamp, sequence number. This will be added in the next commit. > > Signed-off-by: Nicolas Dufresne <nicolas.dufresne@collabora.com> Reviewed-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com> > --- > src/gstreamer/gstlibcamerapad.cpp | 9 ++ > src/gstreamer/gstlibcamerapad.h | 2 + > src/gstreamer/gstlibcamerasrc.cpp | 192 +++++++++++++++++++++++++++++- > 3 files changed, 202 insertions(+), 1 deletion(-) > > diff --git a/src/gstreamer/gstlibcamerapad.cpp b/src/gstreamer/gstlibcamerapad.cpp > index 49dd35b..840f391 100644 > --- a/src/gstreamer/gstlibcamerapad.cpp > +++ b/src/gstreamer/gstlibcamerapad.cpp > @@ -19,6 +19,7 @@ struct _GstLibcameraPad { > StreamRole role; > GstLibcameraPool *pool; > GQueue pending_buffers; > + GstClockTime latency; > }; > > enum { > @@ -164,3 +165,11 @@ gst_libcamera_pad_push_pending(GstPad *pad) > > return gst_pad_push(pad, buffer); > } > + > +bool > +gst_libcamera_pad_has_pending(GstPad *pad) > +{ > + auto *self = GST_LIBCAMERA_PAD(pad); > + GLibLocker lock(GST_OBJECT(self)); > + return self->pending_buffers.length > 0; > +} > diff --git a/src/gstreamer/gstlibcamerapad.h b/src/gstreamer/gstlibcamerapad.h > index 2e9ec20..9d43129 100644 > --- a/src/gstreamer/gstlibcamerapad.h > +++ b/src/gstreamer/gstlibcamerapad.h > @@ -30,4 +30,6 @@ void gst_libcamera_pad_queue_buffer(GstPad *pad, GstBuffer *buffer); > > GstFlowReturn gst_libcamera_pad_push_pending(GstPad *pad); > > +bool gst_libcamera_pad_has_pending(GstPad *pad); > + > #endif /* __GST_LIBCAMERA_PAD_H__ */ > diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp > index 5ffc004..e3718db 100644 > --- a/src/gstreamer/gstlibcamerasrc.cpp > +++ b/src/gstreamer/gstlibcamerasrc.cpp > @@ -14,8 +14,11 @@ > > #include "gstlibcamerasrc.h" > > +#include <queue> > #include <vector> > > +#include <gst/base/base.h> > + > #include <libcamera/camera.h> > #include <libcamera/camera_manager.h> > > @@ -29,12 +32,71 @@ using namespace libcamera; > GST_DEBUG_CATEGORY_STATIC(source_debug); > #define GST_CAT_DEFAULT source_debug > > +struct RequestWrap { > + RequestWrap(Request *request); > + ~RequestWrap(); > + > + void attachBuffer(GstBuffer *buffer); > + GstBuffer *detachBuffer(Stream *stream); > + > + /* For ptr comparison only. */ > + Request *request_; > + std::map<Stream *, GstBuffer *> buffers_; > +}; > + > +RequestWrap::RequestWrap(Request *request) > + : request_(request) > +{ > +} > + > +RequestWrap::~RequestWrap() > +{ > + for (std::pair<Stream *const, GstBuffer *> &item : buffers_) { > + if (item.second) > + gst_buffer_unref(item.second); > + } > +} > + > +void RequestWrap::attachBuffer(GstBuffer *buffer) > +{ > + FrameBuffer *fb = gst_libcamera_buffer_get_frame_buffer(buffer); > + Stream *stream = gst_libcamera_buffer_get_stream(buffer); > + > + request_->addBuffer(stream, fb); > + > + auto item = buffers_.find(stream); > + if (item != buffers_.end()) { > + gst_buffer_unref(item->second); > + item->second = buffer; > + } else { > + buffers_[stream] = buffer; > + } > +} > + > +GstBuffer *RequestWrap::detachBuffer(Stream *stream) > +{ > + GstBuffer *buffer = nullptr; > + > + auto item = buffers_.find(stream); > + if (item != buffers_.end()) { > + buffer = item->second; > + item->second = nullptr; > + } > + > + return buffer; > +} > + > /* Used for C++ object with destructors. */ > struct GstLibcameraSrcState { > + GstLibcameraSrc *src_; > + > std::unique_ptr<CameraManager> cm_; > std::shared_ptr<Camera> cam_; > std::unique_ptr<CameraConfiguration> config_; > std::vector<GstPad *> srcpads_; > + std::queue<std::unique_ptr<RequestWrap>> requests_; > + > + void requestCompleted(Request *request); > }; > > struct _GstLibcameraSrc { > @@ -47,6 +109,7 @@ struct _GstLibcameraSrc { > > GstLibcameraSrcState *state; > GstLibcameraAllocator *allocator; > + GstFlowCombiner *flow_combiner; > }; > > enum { > @@ -70,6 +133,41 @@ GstStaticPadTemplate request_src_template = { > "src_%s", GST_PAD_SRC, GST_PAD_REQUEST, TEMPLATE_CAPS > }; > > +void > +GstLibcameraSrcState::requestCompleted(Request *request) > +{ > + GLibLocker lock(GST_OBJECT(src_)); > + > + GST_DEBUG_OBJECT(src_, "buffers are ready"); > + > + std::unique_ptr<RequestWrap> wrap = std::move(requests_.front()); > + requests_.pop(); > + > + g_return_if_fail(wrap->request_ == request); > + > + if ((request->status() == Request::RequestCancelled)) { > + GST_DEBUG_OBJECT(src_, "Request was cancelled"); > + return; > + } > + > + GstBuffer *buffer; > + for (GstPad *srcpad : srcpads_) { > + Stream *stream = gst_libcamera_pad_get_stream(srcpad); > + buffer = wrap->detachBuffer(stream); > + gst_libcamera_pad_queue_buffer(srcpad, buffer); > + } > + > + { > + /* We only want to resume the task if it's paused. */ > + GstTask *task = src_->task; > + GLibLocker lock(GST_OBJECT(task)); > + if (GST_TASK_STATE(task) == GST_TASK_PAUSED) { > + GST_TASK_STATE(task) = GST_TASK_STARTED; > + GST_TASK_SIGNAL(task); > + } > + } > +} > + > static bool > gst_libcamera_src_open(GstLibcameraSrc *self) > { > @@ -122,6 +220,8 @@ gst_libcamera_src_open(GstLibcameraSrc *self) > return false; > } > > + cam->requestCompleted.connect(self->state, &GstLibcameraSrcState::requestCompleted); > + > /* No need to lock here, we didn't start our threads yet. */ > self->state->cm_ = std::move(cm); > self->state->cam_ = cam; > @@ -133,8 +233,77 @@ static void > gst_libcamera_src_task_run(gpointer user_data) > { > GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data); > + GstLibcameraSrcState *state = self->state; > + > + Request *request = state->cam_->createRequest(); > + auto wrap = std::make_unique<RequestWrap>(request); > + for (GstPad *srcpad : state->srcpads_) { > + GstLibcameraPool *pool = gst_libcamera_pad_get_pool(srcpad); > + GstBuffer *buffer; > + GstFlowReturn ret; > + > + ret = gst_buffer_pool_acquire_buffer(GST_BUFFER_POOL(pool), > + &buffer, nullptr); > + if (ret != GST_FLOW_OK) { > + /* > + * RequestWrap does not take ownership, and we won't be > + * queueing this one due to lack of buffers. > + */ > + delete request; > + request = nullptr; > + break; > + } > + > + wrap->attachBuffer(buffer); > + } > + > + if (request) { > + GLibLocker lock(GST_OBJECT(self)); > + GST_TRACE_OBJECT(self, "Requesting buffers"); > + state->cam_->queueRequest(request); > + state->requests_.push(std::move(wrap)); > + } > + > + GstFlowReturn ret = GST_FLOW_OK; > + gst_flow_combiner_reset(self->flow_combiner); > + for (GstPad *srcpad : state->srcpads_) { > + ret = gst_libcamera_pad_push_pending(srcpad); > + ret = gst_flow_combiner_update_pad_flow(self->flow_combiner, > + srcpad, ret); > + } > > - GST_DEBUG_OBJECT(self, "Streaming thread is now capturing"); > + { > + /* > + * Here we need to decide if we want to pause or stop the task. This > + * needs to happen in lock step with the callback thread which may want > + * to resume the task. > + */ > + GLibLocker lock(GST_OBJECT(self)); > + if (ret != GST_FLOW_OK) { > + if (ret == GST_FLOW_EOS) { > + g_autoptr(GstEvent) eos = gst_event_new_eos(); > + guint32 seqnum = gst_util_seqnum_next(); > + gst_event_set_seqnum(eos, seqnum); > + for (GstPad *srcpad : state->srcpads_) > + gst_pad_push_event(srcpad, gst_event_ref(eos)); > + } else if (ret != GST_FLOW_FLUSHING) { > + GST_ELEMENT_FLOW_ERROR(self, ret); > + } > + gst_task_stop(self->task); > + return; > + } > + > + bool do_pause = true; > + for (GstPad *srcpad : state->srcpads_) { > + if (gst_libcamera_pad_has_pending(srcpad)) { > + do_pause = false; > + break; > + } > + } > + > + if (do_pause) > + gst_task_pause(self->task); > + } > } > > static void > @@ -233,12 +402,23 @@ gst_libcamera_src_task_enter(GstTask *task, GThread *thread, gpointer user_data) > return; > } > > + self->flow_combiner = gst_flow_combiner_new(); > for (gsize i = 0; i < state->srcpads_.size(); i++) { > GstPad *srcpad = state->srcpads_[i]; > const StreamConfiguration &stream_cfg = state->config_->at(i); > GstLibcameraPool *pool = gst_libcamera_pool_new(self->allocator, > stream_cfg.stream()); > gst_libcamera_pad_set_pool(srcpad, pool); > + gst_flow_combiner_add_pad(self->flow_combiner, srcpad); > + } > + > + ret = state->cam_->start(); > + if (ret) { > + GST_ELEMENT_ERROR(self, RESOURCE, SETTINGS, > + ("Failed to start the camera: %s", g_strerror(-ret)), > + ("Camera.start() failed with error code %i", ret)); > + gst_task_stop(task); > + return; > } > > done: > @@ -260,10 +440,14 @@ gst_libcamera_src_task_leave(GstTask *task, GThread *thread, gpointer user_data) > > GST_DEBUG_OBJECT(self, "Streaming thread is about to stop"); > > + state->cam_->stop(); > + > for (GstPad *srcpad : state->srcpads_) > gst_libcamera_pad_set_pool(srcpad, NULL); > > g_clear_object(&self->allocator); > + g_clear_pointer(&self->flow_combiner, > + (GDestroyNotify)gst_flow_combiner_free); > } > > static void > @@ -343,6 +527,9 @@ gst_libcamera_src_change_state(GstElement *element, GstStateChange transition) > return GST_STATE_CHANGE_FAILURE; > ret = GST_STATE_CHANGE_NO_PREROLL; > break; > + case GST_STATE_CHANGE_PAUSED_TO_PLAYING: > + gst_task_start(self->task); > + break; > case GST_STATE_CHANGE_PLAYING_TO_PAUSED: > ret = GST_STATE_CHANGE_NO_PREROLL; > break; > @@ -394,6 +581,9 @@ gst_libcamera_src_init(GstLibcameraSrc *self) > > state->srcpads_.push_back(gst_pad_new_from_template(templ, "src")); > gst_element_add_pad(GST_ELEMENT(self), state->srcpads_[0]); > + > + /* C-style friend. */ > + state->src_ = self; > self->state = state; > } >
diff --git a/src/gstreamer/gstlibcamerapad.cpp b/src/gstreamer/gstlibcamerapad.cpp index 49dd35b..840f391 100644 --- a/src/gstreamer/gstlibcamerapad.cpp +++ b/src/gstreamer/gstlibcamerapad.cpp @@ -19,6 +19,7 @@ struct _GstLibcameraPad { StreamRole role; GstLibcameraPool *pool; GQueue pending_buffers; + GstClockTime latency; }; enum { @@ -164,3 +165,11 @@ gst_libcamera_pad_push_pending(GstPad *pad) return gst_pad_push(pad, buffer); } + +bool +gst_libcamera_pad_has_pending(GstPad *pad) +{ + auto *self = GST_LIBCAMERA_PAD(pad); + GLibLocker lock(GST_OBJECT(self)); + return self->pending_buffers.length > 0; +} diff --git a/src/gstreamer/gstlibcamerapad.h b/src/gstreamer/gstlibcamerapad.h index 2e9ec20..9d43129 100644 --- a/src/gstreamer/gstlibcamerapad.h +++ b/src/gstreamer/gstlibcamerapad.h @@ -30,4 +30,6 @@ void gst_libcamera_pad_queue_buffer(GstPad *pad, GstBuffer *buffer); GstFlowReturn gst_libcamera_pad_push_pending(GstPad *pad); +bool gst_libcamera_pad_has_pending(GstPad *pad); + #endif /* __GST_LIBCAMERA_PAD_H__ */ diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp index 5ffc004..e3718db 100644 --- a/src/gstreamer/gstlibcamerasrc.cpp +++ b/src/gstreamer/gstlibcamerasrc.cpp @@ -14,8 +14,11 @@ #include "gstlibcamerasrc.h" +#include <queue> #include <vector> +#include <gst/base/base.h> + #include <libcamera/camera.h> #include <libcamera/camera_manager.h> @@ -29,12 +32,71 @@ using namespace libcamera; GST_DEBUG_CATEGORY_STATIC(source_debug); #define GST_CAT_DEFAULT source_debug +struct RequestWrap { + RequestWrap(Request *request); + ~RequestWrap(); + + void attachBuffer(GstBuffer *buffer); + GstBuffer *detachBuffer(Stream *stream); + + /* For ptr comparison only. */ + Request *request_; + std::map<Stream *, GstBuffer *> buffers_; +}; + +RequestWrap::RequestWrap(Request *request) + : request_(request) +{ +} + +RequestWrap::~RequestWrap() +{ + for (std::pair<Stream *const, GstBuffer *> &item : buffers_) { + if (item.second) + gst_buffer_unref(item.second); + } +} + +void RequestWrap::attachBuffer(GstBuffer *buffer) +{ + FrameBuffer *fb = gst_libcamera_buffer_get_frame_buffer(buffer); + Stream *stream = gst_libcamera_buffer_get_stream(buffer); + + request_->addBuffer(stream, fb); + + auto item = buffers_.find(stream); + if (item != buffers_.end()) { + gst_buffer_unref(item->second); + item->second = buffer; + } else { + buffers_[stream] = buffer; + } +} + +GstBuffer *RequestWrap::detachBuffer(Stream *stream) +{ + GstBuffer *buffer = nullptr; + + auto item = buffers_.find(stream); + if (item != buffers_.end()) { + buffer = item->second; + item->second = nullptr; + } + + return buffer; +} + /* Used for C++ object with destructors. */ struct GstLibcameraSrcState { + GstLibcameraSrc *src_; + std::unique_ptr<CameraManager> cm_; std::shared_ptr<Camera> cam_; std::unique_ptr<CameraConfiguration> config_; std::vector<GstPad *> srcpads_; + std::queue<std::unique_ptr<RequestWrap>> requests_; + + void requestCompleted(Request *request); }; struct _GstLibcameraSrc { @@ -47,6 +109,7 @@ struct _GstLibcameraSrc { GstLibcameraSrcState *state; GstLibcameraAllocator *allocator; + GstFlowCombiner *flow_combiner; }; enum { @@ -70,6 +133,41 @@ GstStaticPadTemplate request_src_template = { "src_%s", GST_PAD_SRC, GST_PAD_REQUEST, TEMPLATE_CAPS }; +void +GstLibcameraSrcState::requestCompleted(Request *request) +{ + GLibLocker lock(GST_OBJECT(src_)); + + GST_DEBUG_OBJECT(src_, "buffers are ready"); + + std::unique_ptr<RequestWrap> wrap = std::move(requests_.front()); + requests_.pop(); + + g_return_if_fail(wrap->request_ == request); + + if ((request->status() == Request::RequestCancelled)) { + GST_DEBUG_OBJECT(src_, "Request was cancelled"); + return; + } + + GstBuffer *buffer; + for (GstPad *srcpad : srcpads_) { + Stream *stream = gst_libcamera_pad_get_stream(srcpad); + buffer = wrap->detachBuffer(stream); + gst_libcamera_pad_queue_buffer(srcpad, buffer); + } + + { + /* We only want to resume the task if it's paused. */ + GstTask *task = src_->task; + GLibLocker lock(GST_OBJECT(task)); + if (GST_TASK_STATE(task) == GST_TASK_PAUSED) { + GST_TASK_STATE(task) = GST_TASK_STARTED; + GST_TASK_SIGNAL(task); + } + } +} + static bool gst_libcamera_src_open(GstLibcameraSrc *self) { @@ -122,6 +220,8 @@ gst_libcamera_src_open(GstLibcameraSrc *self) return false; } + cam->requestCompleted.connect(self->state, &GstLibcameraSrcState::requestCompleted); + /* No need to lock here, we didn't start our threads yet. */ self->state->cm_ = std::move(cm); self->state->cam_ = cam; @@ -133,8 +233,77 @@ static void gst_libcamera_src_task_run(gpointer user_data) { GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data); + GstLibcameraSrcState *state = self->state; + + Request *request = state->cam_->createRequest(); + auto wrap = std::make_unique<RequestWrap>(request); + for (GstPad *srcpad : state->srcpads_) { + GstLibcameraPool *pool = gst_libcamera_pad_get_pool(srcpad); + GstBuffer *buffer; + GstFlowReturn ret; + + ret = gst_buffer_pool_acquire_buffer(GST_BUFFER_POOL(pool), + &buffer, nullptr); + if (ret != GST_FLOW_OK) { + /* + * RequestWrap does not take ownership, and we won't be + * queueing this one due to lack of buffers. + */ + delete request; + request = nullptr; + break; + } + + wrap->attachBuffer(buffer); + } + + if (request) { + GLibLocker lock(GST_OBJECT(self)); + GST_TRACE_OBJECT(self, "Requesting buffers"); + state->cam_->queueRequest(request); + state->requests_.push(std::move(wrap)); + } + + GstFlowReturn ret = GST_FLOW_OK; + gst_flow_combiner_reset(self->flow_combiner); + for (GstPad *srcpad : state->srcpads_) { + ret = gst_libcamera_pad_push_pending(srcpad); + ret = gst_flow_combiner_update_pad_flow(self->flow_combiner, + srcpad, ret); + } - GST_DEBUG_OBJECT(self, "Streaming thread is now capturing"); + { + /* + * Here we need to decide if we want to pause or stop the task. This + * needs to happen in lock step with the callback thread which may want + * to resume the task. + */ + GLibLocker lock(GST_OBJECT(self)); + if (ret != GST_FLOW_OK) { + if (ret == GST_FLOW_EOS) { + g_autoptr(GstEvent) eos = gst_event_new_eos(); + guint32 seqnum = gst_util_seqnum_next(); + gst_event_set_seqnum(eos, seqnum); + for (GstPad *srcpad : state->srcpads_) + gst_pad_push_event(srcpad, gst_event_ref(eos)); + } else if (ret != GST_FLOW_FLUSHING) { + GST_ELEMENT_FLOW_ERROR(self, ret); + } + gst_task_stop(self->task); + return; + } + + bool do_pause = true; + for (GstPad *srcpad : state->srcpads_) { + if (gst_libcamera_pad_has_pending(srcpad)) { + do_pause = false; + break; + } + } + + if (do_pause) + gst_task_pause(self->task); + } } static void @@ -233,12 +402,23 @@ gst_libcamera_src_task_enter(GstTask *task, GThread *thread, gpointer user_data) return; } + self->flow_combiner = gst_flow_combiner_new(); for (gsize i = 0; i < state->srcpads_.size(); i++) { GstPad *srcpad = state->srcpads_[i]; const StreamConfiguration &stream_cfg = state->config_->at(i); GstLibcameraPool *pool = gst_libcamera_pool_new(self->allocator, stream_cfg.stream()); gst_libcamera_pad_set_pool(srcpad, pool); + gst_flow_combiner_add_pad(self->flow_combiner, srcpad); + } + + ret = state->cam_->start(); + if (ret) { + GST_ELEMENT_ERROR(self, RESOURCE, SETTINGS, + ("Failed to start the camera: %s", g_strerror(-ret)), + ("Camera.start() failed with error code %i", ret)); + gst_task_stop(task); + return; } done: @@ -260,10 +440,14 @@ gst_libcamera_src_task_leave(GstTask *task, GThread *thread, gpointer user_data) GST_DEBUG_OBJECT(self, "Streaming thread is about to stop"); + state->cam_->stop(); + for (GstPad *srcpad : state->srcpads_) gst_libcamera_pad_set_pool(srcpad, NULL); g_clear_object(&self->allocator); + g_clear_pointer(&self->flow_combiner, + (GDestroyNotify)gst_flow_combiner_free); } static void @@ -343,6 +527,9 @@ gst_libcamera_src_change_state(GstElement *element, GstStateChange transition) return GST_STATE_CHANGE_FAILURE; ret = GST_STATE_CHANGE_NO_PREROLL; break; + case GST_STATE_CHANGE_PAUSED_TO_PLAYING: + gst_task_start(self->task); + break; case GST_STATE_CHANGE_PLAYING_TO_PAUSED: ret = GST_STATE_CHANGE_NO_PREROLL; break; @@ -394,6 +581,9 @@ gst_libcamera_src_init(GstLibcameraSrc *self) state->srcpads_.push_back(gst_pad_new_from_template(templ, "src")); gst_element_add_pad(GST_ELEMENT(self), state->srcpads_[0]); + + /* C-style friend. */ + state->src_ = self; self->state = state; }