[libcamera-devel,v3,22/27] gst: libcamerasrc: Implement initial streaming

Message ID 20200306202637.525587-23-nicolas@ndufresne.ca
State Accepted
Headers show
Series
  • GStreamer Element for libcamera
Related show

Commit Message

Nicolas Dufresne March 6, 2020, 8:26 p.m. UTC
From: Nicolas Dufresne <nicolas.dufresne@collabora.com>

With this patch, the element is now able to push buffers to the next
element in the graph. The buffers are currently missing any metadata
like timestamp, sequence number. This will be added in the next commit.

Signed-off-by: Nicolas Dufresne <nicolas.dufresne@collabora.com>
---
 src/gstreamer/gstlibcamerapad.cpp |   9 ++
 src/gstreamer/gstlibcamerapad.h   |   2 +
 src/gstreamer/gstlibcamerasrc.cpp | 192 +++++++++++++++++++++++++++++-
 3 files changed, 202 insertions(+), 1 deletion(-)

Comments

Laurent Pinchart March 6, 2020, 8:59 p.m. UTC | #1
Hi Nicolas,

Thank you for the patch.

On Fri, Mar 06, 2020 at 03:26:32PM -0500, Nicolas Dufresne wrote:
> From: Nicolas Dufresne <nicolas.dufresne@collabora.com>
> 
> With this patch, the element is now able to push buffers to the next
> element in the graph. The buffers are currently missing any metadata
> like timestamp, sequence number. This will be added in the next commit.
> 
> Signed-off-by: Nicolas Dufresne <nicolas.dufresne@collabora.com>

Reviewed-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>

> ---
>  src/gstreamer/gstlibcamerapad.cpp |   9 ++
>  src/gstreamer/gstlibcamerapad.h   |   2 +
>  src/gstreamer/gstlibcamerasrc.cpp | 192 +++++++++++++++++++++++++++++-
>  3 files changed, 202 insertions(+), 1 deletion(-)
> 
> diff --git a/src/gstreamer/gstlibcamerapad.cpp b/src/gstreamer/gstlibcamerapad.cpp
> index 49dd35b..840f391 100644
> --- a/src/gstreamer/gstlibcamerapad.cpp
> +++ b/src/gstreamer/gstlibcamerapad.cpp
> @@ -19,6 +19,7 @@ struct _GstLibcameraPad {
>  	StreamRole role;
>  	GstLibcameraPool *pool;
>  	GQueue pending_buffers;
> +	GstClockTime latency;
>  };
>  
>  enum {
> @@ -164,3 +165,11 @@ gst_libcamera_pad_push_pending(GstPad *pad)
>  
>  	return gst_pad_push(pad, buffer);
>  }
> +
> +bool
> +gst_libcamera_pad_has_pending(GstPad *pad)
> +{
> +	auto *self = GST_LIBCAMERA_PAD(pad);
> +	GLibLocker lock(GST_OBJECT(self));
> +	return self->pending_buffers.length > 0;
> +}
> diff --git a/src/gstreamer/gstlibcamerapad.h b/src/gstreamer/gstlibcamerapad.h
> index 2e9ec20..9d43129 100644
> --- a/src/gstreamer/gstlibcamerapad.h
> +++ b/src/gstreamer/gstlibcamerapad.h
> @@ -30,4 +30,6 @@ void gst_libcamera_pad_queue_buffer(GstPad *pad, GstBuffer *buffer);
>  
>  GstFlowReturn gst_libcamera_pad_push_pending(GstPad *pad);
>  
> +bool gst_libcamera_pad_has_pending(GstPad *pad);
> +
>  #endif /* __GST_LIBCAMERA_PAD_H__ */
> diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
> index 5ffc004..e3718db 100644
> --- a/src/gstreamer/gstlibcamerasrc.cpp
> +++ b/src/gstreamer/gstlibcamerasrc.cpp
> @@ -14,8 +14,11 @@
>  
>  #include "gstlibcamerasrc.h"
>  
> +#include <queue>
>  #include <vector>
>  
> +#include <gst/base/base.h>
> +
>  #include <libcamera/camera.h>
>  #include <libcamera/camera_manager.h>
>  
> @@ -29,12 +32,71 @@ using namespace libcamera;
>  GST_DEBUG_CATEGORY_STATIC(source_debug);
>  #define GST_CAT_DEFAULT source_debug
>  
> +struct RequestWrap {
> +	RequestWrap(Request *request);
> +	~RequestWrap();
> +
> +	void attachBuffer(GstBuffer *buffer);
> +	GstBuffer *detachBuffer(Stream *stream);
> +
> +	/* For ptr comparison only. */
> +	Request *request_;
> +	std::map<Stream *, GstBuffer *> buffers_;
> +};
> +
> +RequestWrap::RequestWrap(Request *request)
> +	: request_(request)
> +{
> +}
> +
> +RequestWrap::~RequestWrap()
> +{
> +	for (std::pair<Stream *const, GstBuffer *> &item : buffers_) {
> +		if (item.second)
> +			gst_buffer_unref(item.second);
> +	}
> +}
> +
> +void RequestWrap::attachBuffer(GstBuffer *buffer)
> +{
> +	FrameBuffer *fb = gst_libcamera_buffer_get_frame_buffer(buffer);
> +	Stream *stream = gst_libcamera_buffer_get_stream(buffer);
> +
> +	request_->addBuffer(stream, fb);
> +
> +	auto item = buffers_.find(stream);
> +	if (item != buffers_.end()) {
> +		gst_buffer_unref(item->second);
> +		item->second = buffer;
> +	} else {
> +		buffers_[stream] = buffer;
> +	}
> +}
> +
> +GstBuffer *RequestWrap::detachBuffer(Stream *stream)
> +{
> +	GstBuffer *buffer = nullptr;
> +
> +	auto item = buffers_.find(stream);
> +	if (item != buffers_.end()) {
> +		buffer = item->second;
> +		item->second = nullptr;
> +	}
> +
> +	return buffer;
> +}
> +
>  /* Used for C++ object with destructors. */
>  struct GstLibcameraSrcState {
> +	GstLibcameraSrc *src_;
> +
>  	std::unique_ptr<CameraManager> cm_;
>  	std::shared_ptr<Camera> cam_;
>  	std::unique_ptr<CameraConfiguration> config_;
>  	std::vector<GstPad *> srcpads_;
> +	std::queue<std::unique_ptr<RequestWrap>> requests_;
> +
> +	void requestCompleted(Request *request);
>  };
>  
>  struct _GstLibcameraSrc {
> @@ -47,6 +109,7 @@ struct _GstLibcameraSrc {
>  
>  	GstLibcameraSrcState *state;
>  	GstLibcameraAllocator *allocator;
> +	GstFlowCombiner *flow_combiner;
>  };
>  
>  enum {
> @@ -70,6 +133,41 @@ GstStaticPadTemplate request_src_template = {
>  	"src_%s", GST_PAD_SRC, GST_PAD_REQUEST, TEMPLATE_CAPS
>  };
>  
> +void
> +GstLibcameraSrcState::requestCompleted(Request *request)
> +{
> +	GLibLocker lock(GST_OBJECT(src_));
> +
> +	GST_DEBUG_OBJECT(src_, "buffers are ready");
> +
> +	std::unique_ptr<RequestWrap> wrap = std::move(requests_.front());
> +	requests_.pop();
> +
> +	g_return_if_fail(wrap->request_ == request);
> +
> +	if ((request->status() == Request::RequestCancelled)) {
> +		GST_DEBUG_OBJECT(src_, "Request was cancelled");
> +		return;
> +	}
> +
> +	GstBuffer *buffer;
> +	for (GstPad *srcpad : srcpads_) {
> +		Stream *stream = gst_libcamera_pad_get_stream(srcpad);
> +		buffer = wrap->detachBuffer(stream);
> +		gst_libcamera_pad_queue_buffer(srcpad, buffer);
> +	}
> +
> +	{
> +		/* We only want to resume the task if it's paused. */
> +		GstTask *task = src_->task;
> +		GLibLocker lock(GST_OBJECT(task));
> +		if (GST_TASK_STATE(task) == GST_TASK_PAUSED) {
> +			GST_TASK_STATE(task) = GST_TASK_STARTED;
> +			GST_TASK_SIGNAL(task);
> +		}
> +	}
> +}
> +
>  static bool
>  gst_libcamera_src_open(GstLibcameraSrc *self)
>  {
> @@ -122,6 +220,8 @@ gst_libcamera_src_open(GstLibcameraSrc *self)
>  		return false;
>  	}
>  
> +	cam->requestCompleted.connect(self->state, &GstLibcameraSrcState::requestCompleted);
> +
>  	/* No need to lock here, we didn't start our threads yet. */
>  	self->state->cm_ = std::move(cm);
>  	self->state->cam_ = cam;
> @@ -133,8 +233,77 @@ static void
>  gst_libcamera_src_task_run(gpointer user_data)
>  {
>  	GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
> +	GstLibcameraSrcState *state = self->state;
> +
> +	Request *request = state->cam_->createRequest();
> +	auto wrap = std::make_unique<RequestWrap>(request);
> +	for (GstPad *srcpad : state->srcpads_) {
> +		GstLibcameraPool *pool = gst_libcamera_pad_get_pool(srcpad);
> +		GstBuffer *buffer;
> +		GstFlowReturn ret;
> +
> +		ret = gst_buffer_pool_acquire_buffer(GST_BUFFER_POOL(pool),
> +						     &buffer, nullptr);
> +		if (ret != GST_FLOW_OK) {
> +			/*
> +			 * RequestWrap does not take ownership, and we won't be
> +			 * queueing this one due to lack of buffers.
> +			 */
> +			delete request;
> +			request = nullptr;
> +			break;
> +		}
> +
> +		wrap->attachBuffer(buffer);
> +	}
> +
> +	if (request) {
> +		GLibLocker lock(GST_OBJECT(self));
> +		GST_TRACE_OBJECT(self, "Requesting buffers");
> +		state->cam_->queueRequest(request);
> +		state->requests_.push(std::move(wrap));
> +	}
> +
> +	GstFlowReturn ret = GST_FLOW_OK;
> +	gst_flow_combiner_reset(self->flow_combiner);
> +	for (GstPad *srcpad : state->srcpads_) {
> +		ret = gst_libcamera_pad_push_pending(srcpad);
> +		ret = gst_flow_combiner_update_pad_flow(self->flow_combiner,
> +							srcpad, ret);
> +	}
>  
> -	GST_DEBUG_OBJECT(self, "Streaming thread is now capturing");
> +	{
> +		/*
> +		 * Here we need to decide if we want to pause or stop the task. This
> +		 * needs to happen in lock step with the callback thread which may want
> +		 * to resume the task.
> +		 */
> +		GLibLocker lock(GST_OBJECT(self));
> +		if (ret != GST_FLOW_OK) {
> +			if (ret == GST_FLOW_EOS) {
> +				g_autoptr(GstEvent) eos = gst_event_new_eos();
> +				guint32 seqnum = gst_util_seqnum_next();
> +				gst_event_set_seqnum(eos, seqnum);
> +				for (GstPad *srcpad : state->srcpads_)
> +					gst_pad_push_event(srcpad, gst_event_ref(eos));
> +			} else if (ret != GST_FLOW_FLUSHING) {
> +				GST_ELEMENT_FLOW_ERROR(self, ret);
> +			}
> +			gst_task_stop(self->task);
> +			return;
> +		}
> +
> +		bool do_pause = true;
> +		for (GstPad *srcpad : state->srcpads_) {
> +			if (gst_libcamera_pad_has_pending(srcpad)) {
> +				do_pause = false;
> +				break;
> +			}
> +		}
> +
> +		if (do_pause)
> +			gst_task_pause(self->task);
> +	}
>  }
>  
>  static void
> @@ -233,12 +402,23 @@ gst_libcamera_src_task_enter(GstTask *task, GThread *thread, gpointer user_data)
>  		return;
>  	}
>  
> +	self->flow_combiner = gst_flow_combiner_new();
>  	for (gsize i = 0; i < state->srcpads_.size(); i++) {
>  		GstPad *srcpad = state->srcpads_[i];
>  		const StreamConfiguration &stream_cfg = state->config_->at(i);
>  		GstLibcameraPool *pool = gst_libcamera_pool_new(self->allocator,
>  								stream_cfg.stream());
>  		gst_libcamera_pad_set_pool(srcpad, pool);
> +		gst_flow_combiner_add_pad(self->flow_combiner, srcpad);
> +	}
> +
> +	ret = state->cam_->start();
> +	if (ret) {
> +		GST_ELEMENT_ERROR(self, RESOURCE, SETTINGS,
> +				  ("Failed to start the camera: %s", g_strerror(-ret)),
> +				  ("Camera.start() failed with error code %i", ret));
> +		gst_task_stop(task);
> +		return;
>  	}
>  
>  done:
> @@ -260,10 +440,14 @@ gst_libcamera_src_task_leave(GstTask *task, GThread *thread, gpointer user_data)
>  
>  	GST_DEBUG_OBJECT(self, "Streaming thread is about to stop");
>  
> +	state->cam_->stop();
> +
>  	for (GstPad *srcpad : state->srcpads_)
>  		gst_libcamera_pad_set_pool(srcpad, NULL);
>  
>  	g_clear_object(&self->allocator);
> +	g_clear_pointer(&self->flow_combiner,
> +			(GDestroyNotify)gst_flow_combiner_free);
>  }
>  
>  static void
> @@ -343,6 +527,9 @@ gst_libcamera_src_change_state(GstElement *element, GstStateChange transition)
>  			return GST_STATE_CHANGE_FAILURE;
>  		ret = GST_STATE_CHANGE_NO_PREROLL;
>  		break;
> +	case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
> +		gst_task_start(self->task);
> +		break;
>  	case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
>  		ret = GST_STATE_CHANGE_NO_PREROLL;
>  		break;
> @@ -394,6 +581,9 @@ gst_libcamera_src_init(GstLibcameraSrc *self)
>  
>  	state->srcpads_.push_back(gst_pad_new_from_template(templ, "src"));
>  	gst_element_add_pad(GST_ELEMENT(self), state->srcpads_[0]);
> +
> +	/* C-style friend. */
> +	state->src_ = self;
>  	self->state = state;
>  }
>

Patch

diff --git a/src/gstreamer/gstlibcamerapad.cpp b/src/gstreamer/gstlibcamerapad.cpp
index 49dd35b..840f391 100644
--- a/src/gstreamer/gstlibcamerapad.cpp
+++ b/src/gstreamer/gstlibcamerapad.cpp
@@ -19,6 +19,7 @@  struct _GstLibcameraPad {
 	StreamRole role;
 	GstLibcameraPool *pool;
 	GQueue pending_buffers;
+	GstClockTime latency;
 };
 
 enum {
@@ -164,3 +165,11 @@  gst_libcamera_pad_push_pending(GstPad *pad)
 
 	return gst_pad_push(pad, buffer);
 }
+
+bool
+gst_libcamera_pad_has_pending(GstPad *pad)
+{
+	auto *self = GST_LIBCAMERA_PAD(pad);
+	GLibLocker lock(GST_OBJECT(self));
+	return self->pending_buffers.length > 0;
+}
diff --git a/src/gstreamer/gstlibcamerapad.h b/src/gstreamer/gstlibcamerapad.h
index 2e9ec20..9d43129 100644
--- a/src/gstreamer/gstlibcamerapad.h
+++ b/src/gstreamer/gstlibcamerapad.h
@@ -30,4 +30,6 @@  void gst_libcamera_pad_queue_buffer(GstPad *pad, GstBuffer *buffer);
 
 GstFlowReturn gst_libcamera_pad_push_pending(GstPad *pad);
 
+bool gst_libcamera_pad_has_pending(GstPad *pad);
+
 #endif /* __GST_LIBCAMERA_PAD_H__ */
diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
index 5ffc004..e3718db 100644
--- a/src/gstreamer/gstlibcamerasrc.cpp
+++ b/src/gstreamer/gstlibcamerasrc.cpp
@@ -14,8 +14,11 @@ 
 
 #include "gstlibcamerasrc.h"
 
+#include <queue>
 #include <vector>
 
+#include <gst/base/base.h>
+
 #include <libcamera/camera.h>
 #include <libcamera/camera_manager.h>
 
@@ -29,12 +32,71 @@  using namespace libcamera;
 GST_DEBUG_CATEGORY_STATIC(source_debug);
 #define GST_CAT_DEFAULT source_debug
 
+struct RequestWrap {
+	RequestWrap(Request *request);
+	~RequestWrap();
+
+	void attachBuffer(GstBuffer *buffer);
+	GstBuffer *detachBuffer(Stream *stream);
+
+	/* For ptr comparison only. */
+	Request *request_;
+	std::map<Stream *, GstBuffer *> buffers_;
+};
+
+RequestWrap::RequestWrap(Request *request)
+	: request_(request)
+{
+}
+
+RequestWrap::~RequestWrap()
+{
+	for (std::pair<Stream *const, GstBuffer *> &item : buffers_) {
+		if (item.second)
+			gst_buffer_unref(item.second);
+	}
+}
+
+void RequestWrap::attachBuffer(GstBuffer *buffer)
+{
+	FrameBuffer *fb = gst_libcamera_buffer_get_frame_buffer(buffer);
+	Stream *stream = gst_libcamera_buffer_get_stream(buffer);
+
+	request_->addBuffer(stream, fb);
+
+	auto item = buffers_.find(stream);
+	if (item != buffers_.end()) {
+		gst_buffer_unref(item->second);
+		item->second = buffer;
+	} else {
+		buffers_[stream] = buffer;
+	}
+}
+
+GstBuffer *RequestWrap::detachBuffer(Stream *stream)
+{
+	GstBuffer *buffer = nullptr;
+
+	auto item = buffers_.find(stream);
+	if (item != buffers_.end()) {
+		buffer = item->second;
+		item->second = nullptr;
+	}
+
+	return buffer;
+}
+
 /* Used for C++ object with destructors. */
 struct GstLibcameraSrcState {
+	GstLibcameraSrc *src_;
+
 	std::unique_ptr<CameraManager> cm_;
 	std::shared_ptr<Camera> cam_;
 	std::unique_ptr<CameraConfiguration> config_;
 	std::vector<GstPad *> srcpads_;
+	std::queue<std::unique_ptr<RequestWrap>> requests_;
+
+	void requestCompleted(Request *request);
 };
 
 struct _GstLibcameraSrc {
@@ -47,6 +109,7 @@  struct _GstLibcameraSrc {
 
 	GstLibcameraSrcState *state;
 	GstLibcameraAllocator *allocator;
+	GstFlowCombiner *flow_combiner;
 };
 
 enum {
@@ -70,6 +133,41 @@  GstStaticPadTemplate request_src_template = {
 	"src_%s", GST_PAD_SRC, GST_PAD_REQUEST, TEMPLATE_CAPS
 };
 
+void
+GstLibcameraSrcState::requestCompleted(Request *request)
+{
+	GLibLocker lock(GST_OBJECT(src_));
+
+	GST_DEBUG_OBJECT(src_, "buffers are ready");
+
+	std::unique_ptr<RequestWrap> wrap = std::move(requests_.front());
+	requests_.pop();
+
+	g_return_if_fail(wrap->request_ == request);
+
+	if ((request->status() == Request::RequestCancelled)) {
+		GST_DEBUG_OBJECT(src_, "Request was cancelled");
+		return;
+	}
+
+	GstBuffer *buffer;
+	for (GstPad *srcpad : srcpads_) {
+		Stream *stream = gst_libcamera_pad_get_stream(srcpad);
+		buffer = wrap->detachBuffer(stream);
+		gst_libcamera_pad_queue_buffer(srcpad, buffer);
+	}
+
+	{
+		/* We only want to resume the task if it's paused. */
+		GstTask *task = src_->task;
+		GLibLocker lock(GST_OBJECT(task));
+		if (GST_TASK_STATE(task) == GST_TASK_PAUSED) {
+			GST_TASK_STATE(task) = GST_TASK_STARTED;
+			GST_TASK_SIGNAL(task);
+		}
+	}
+}
+
 static bool
 gst_libcamera_src_open(GstLibcameraSrc *self)
 {
@@ -122,6 +220,8 @@  gst_libcamera_src_open(GstLibcameraSrc *self)
 		return false;
 	}
 
+	cam->requestCompleted.connect(self->state, &GstLibcameraSrcState::requestCompleted);
+
 	/* No need to lock here, we didn't start our threads yet. */
 	self->state->cm_ = std::move(cm);
 	self->state->cam_ = cam;
@@ -133,8 +233,77 @@  static void
 gst_libcamera_src_task_run(gpointer user_data)
 {
 	GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
+	GstLibcameraSrcState *state = self->state;
+
+	Request *request = state->cam_->createRequest();
+	auto wrap = std::make_unique<RequestWrap>(request);
+	for (GstPad *srcpad : state->srcpads_) {
+		GstLibcameraPool *pool = gst_libcamera_pad_get_pool(srcpad);
+		GstBuffer *buffer;
+		GstFlowReturn ret;
+
+		ret = gst_buffer_pool_acquire_buffer(GST_BUFFER_POOL(pool),
+						     &buffer, nullptr);
+		if (ret != GST_FLOW_OK) {
+			/*
+			 * RequestWrap does not take ownership, and we won't be
+			 * queueing this one due to lack of buffers.
+			 */
+			delete request;
+			request = nullptr;
+			break;
+		}
+
+		wrap->attachBuffer(buffer);
+	}
+
+	if (request) {
+		GLibLocker lock(GST_OBJECT(self));
+		GST_TRACE_OBJECT(self, "Requesting buffers");
+		state->cam_->queueRequest(request);
+		state->requests_.push(std::move(wrap));
+	}
+
+	GstFlowReturn ret = GST_FLOW_OK;
+	gst_flow_combiner_reset(self->flow_combiner);
+	for (GstPad *srcpad : state->srcpads_) {
+		ret = gst_libcamera_pad_push_pending(srcpad);
+		ret = gst_flow_combiner_update_pad_flow(self->flow_combiner,
+							srcpad, ret);
+	}
 
-	GST_DEBUG_OBJECT(self, "Streaming thread is now capturing");
+	{
+		/*
+		 * Here we need to decide if we want to pause or stop the task. This
+		 * needs to happen in lock step with the callback thread which may want
+		 * to resume the task.
+		 */
+		GLibLocker lock(GST_OBJECT(self));
+		if (ret != GST_FLOW_OK) {
+			if (ret == GST_FLOW_EOS) {
+				g_autoptr(GstEvent) eos = gst_event_new_eos();
+				guint32 seqnum = gst_util_seqnum_next();
+				gst_event_set_seqnum(eos, seqnum);
+				for (GstPad *srcpad : state->srcpads_)
+					gst_pad_push_event(srcpad, gst_event_ref(eos));
+			} else if (ret != GST_FLOW_FLUSHING) {
+				GST_ELEMENT_FLOW_ERROR(self, ret);
+			}
+			gst_task_stop(self->task);
+			return;
+		}
+
+		bool do_pause = true;
+		for (GstPad *srcpad : state->srcpads_) {
+			if (gst_libcamera_pad_has_pending(srcpad)) {
+				do_pause = false;
+				break;
+			}
+		}
+
+		if (do_pause)
+			gst_task_pause(self->task);
+	}
 }
 
 static void
@@ -233,12 +402,23 @@  gst_libcamera_src_task_enter(GstTask *task, GThread *thread, gpointer user_data)
 		return;
 	}
 
+	self->flow_combiner = gst_flow_combiner_new();
 	for (gsize i = 0; i < state->srcpads_.size(); i++) {
 		GstPad *srcpad = state->srcpads_[i];
 		const StreamConfiguration &stream_cfg = state->config_->at(i);
 		GstLibcameraPool *pool = gst_libcamera_pool_new(self->allocator,
 								stream_cfg.stream());
 		gst_libcamera_pad_set_pool(srcpad, pool);
+		gst_flow_combiner_add_pad(self->flow_combiner, srcpad);
+	}
+
+	ret = state->cam_->start();
+	if (ret) {
+		GST_ELEMENT_ERROR(self, RESOURCE, SETTINGS,
+				  ("Failed to start the camera: %s", g_strerror(-ret)),
+				  ("Camera.start() failed with error code %i", ret));
+		gst_task_stop(task);
+		return;
 	}
 
 done:
@@ -260,10 +440,14 @@  gst_libcamera_src_task_leave(GstTask *task, GThread *thread, gpointer user_data)
 
 	GST_DEBUG_OBJECT(self, "Streaming thread is about to stop");
 
+	state->cam_->stop();
+
 	for (GstPad *srcpad : state->srcpads_)
 		gst_libcamera_pad_set_pool(srcpad, NULL);
 
 	g_clear_object(&self->allocator);
+	g_clear_pointer(&self->flow_combiner,
+			(GDestroyNotify)gst_flow_combiner_free);
 }
 
 static void
@@ -343,6 +527,9 @@  gst_libcamera_src_change_state(GstElement *element, GstStateChange transition)
 			return GST_STATE_CHANGE_FAILURE;
 		ret = GST_STATE_CHANGE_NO_PREROLL;
 		break;
+	case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
+		gst_task_start(self->task);
+		break;
 	case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
 		ret = GST_STATE_CHANGE_NO_PREROLL;
 		break;
@@ -394,6 +581,9 @@  gst_libcamera_src_init(GstLibcameraSrc *self)
 
 	state->srcpads_.push_back(gst_pad_new_from_template(templ, "src"));
 	gst_element_add_pad(GST_ELEMENT(self), state->srcpads_[0]);
+
+	/* C-style friend. */
+	state->src_ = self;
 	self->state = state;
 }