diff --git a/src/gstreamer/gstlibcamerapad.cpp b/src/gstreamer/gstlibcamerapad.cpp
index 8662c92..2cf1630 100644
--- a/src/gstreamer/gstlibcamerapad.cpp
+++ b/src/gstreamer/gstlibcamerapad.cpp
@@ -18,6 +18,7 @@ struct _GstLibcameraPad {
 	StreamRole role;
 	GstLibcameraPool *pool;
 	GQueue pending_buffers;
+	GstClockTime latency;
 };
 
 enum {
@@ -159,7 +160,15 @@ gst_libcamera_pad_push_pending(GstPad *pad)
 	}
 
 	if (!buffer)
-		return GST_FLOW_CUSTOM_SUCCESS;
+		return GST_FLOW_OK;
 
 	return gst_pad_push(pad, buffer);
 }
+
+bool
+gst_libcamera_pad_has_pending(GstPad *pad)
+{
+	auto *self = GST_LIBCAMERA_PAD(pad);
+	GLibLocker lock(GST_OBJECT(self));
+	return (self->pending_buffers.length > 0);
+}
diff --git a/src/gstreamer/gstlibcamerapad.h b/src/gstreamer/gstlibcamerapad.h
index d928570..eb24000 100644
--- a/src/gstreamer/gstlibcamerapad.h
+++ b/src/gstreamer/gstlibcamerapad.h
@@ -30,4 +30,6 @@ void gst_libcamera_pad_queue_buffer(GstPad *pad, GstBuffer *buffer);
 
 GstFlowReturn gst_libcamera_pad_push_pending(GstPad *pad);
 
+bool gst_libcamera_pad_has_pending(GstPad *pad);
+
 #endif /* __GST_LIBCAMERA_PAD_H__ */
diff --git a/src/gstreamer/gstlibcamerasrc.cpp b/src/gstreamer/gstlibcamerasrc.cpp
index 83f93cc..70f2048 100644
--- a/src/gstreamer/gstlibcamerasrc.cpp
+++ b/src/gstreamer/gstlibcamerasrc.cpp
@@ -18,8 +18,10 @@
 #include "gstlibcamerapool.h"
 #include "gstlibcamerasrc.h"
 
+#include <gst/base/base.h>
 #include <libcamera/camera.h>
 #include <libcamera/camera_manager.h>
+#include <queue>
 #include <vector>
 
 using namespace libcamera;
@@ -27,12 +29,71 @@ using namespace libcamera;
 GST_DEBUG_CATEGORY_STATIC(source_debug);
 #define GST_CAT_DEFAULT source_debug
 
+struct RequestWrap {
+	RequestWrap(Request *request);
+	~RequestWrap();
+
+	void attachBuffer(GstBuffer *buffer);
+	GstBuffer *detachBuffer(Stream *stream);
+
+	/* For ptr comparision only. */
+	Request *request_;
+	std::map<Stream *, GstBuffer *> buffers_;
+};
+
+RequestWrap::RequestWrap(Request *request)
+	: request_(request)
+{
+}
+
+RequestWrap::~RequestWrap()
+{
+	for (std::pair<Stream *const, GstBuffer *> &item : buffers_) {
+		if (item.second)
+			gst_buffer_unref(item.second);
+	}
+}
+
+void RequestWrap::attachBuffer(GstBuffer *buffer)
+{
+	FrameBuffer *fb = gst_libcamera_buffer_get_frame_buffer(buffer);
+	Stream *stream = gst_libcamera_buffer_get_stream(buffer);
+
+	request_->addBuffer(stream, fb);
+
+	auto item = buffers_.find(stream);
+	if (item != buffers_.end()) {
+		gst_buffer_unref(item->second);
+		item->second = buffer;
+	} else {
+		buffers_[stream] = buffer;
+	}
+}
+
+GstBuffer *RequestWrap::detachBuffer(Stream *stream)
+{
+	GstBuffer *buffer = nullptr;
+
+	auto item = buffers_.find(stream);
+	if (item != buffers_.end()) {
+		buffer = item->second;
+		item->second = nullptr;
+	}
+
+	return buffer;
+}
+
 /* Used for C++ object with destructors. */
 struct GstLibcameraSrcState {
+	GstLibcameraSrc *src;
+
 	std::unique_ptr<CameraManager> cm;
 	std::shared_ptr<Camera> cam;
 	std::unique_ptr<CameraConfiguration> config;
 	std::vector<GstPad *> srcpads;
+	std::queue<std::unique_ptr<RequestWrap>> requests;
+
+	void requestCompleted(Request *request);
 };
 
 struct _GstLibcameraSrc {
@@ -45,6 +106,7 @@ struct _GstLibcameraSrc {
 
 	GstLibcameraSrcState *state;
 	GstLibcameraAllocator *allocator;
+	GstFlowCombiner *flow_combiner;
 };
 
 enum {
@@ -68,6 +130,41 @@ GstStaticPadTemplate request_src_template = {
 	"src_%s", GST_PAD_SRC, GST_PAD_REQUEST, TEMPLATE_CAPS
 };
 
+void
+GstLibcameraSrcState::requestCompleted(Request *request)
+{
+	GLibLocker lock(GST_OBJECT(this->src));
+
+	GST_DEBUG_OBJECT(this->src, "buffers are ready");
+
+	std::unique_ptr<RequestWrap> wrap = std::move(this->requests.front());
+	this->requests.pop();
+
+	g_return_if_fail(wrap->request_ == request);
+
+	if ((request->status() == Request::RequestCancelled)) {
+		GST_DEBUG_OBJECT(this->src, "Request was cancelled");
+		return;
+	}
+
+	GstBuffer *buffer;
+	for (GstPad *srcpad : this->srcpads) {
+		Stream *stream = gst_libcamera_pad_get_stream(srcpad);
+		buffer = wrap->detachBuffer(stream);
+		gst_libcamera_pad_queue_buffer(srcpad, buffer);
+	}
+
+	{
+		/* We only want to resume the task if it's paused. */
+		GstTask *task = this->src->task;
+		GLibLocker lock(GST_OBJECT(task));
+		if (GST_TASK_STATE(task) == GST_TASK_PAUSED) {
+			GST_TASK_STATE(task) = GST_TASK_STARTED;
+			GST_TASK_SIGNAL(task);
+		}
+	}
+}
+
 static bool
 gst_libcamera_src_open(GstLibcameraSrc *self)
 {
@@ -120,6 +217,8 @@ gst_libcamera_src_open(GstLibcameraSrc *self)
 		return false;
 	}
 
+	cam->requestCompleted.connect(self->state, &GstLibcameraSrcState::requestCompleted);
+
 	/* No need to lock here, we didn't start our threads yet. */
 	self->state->cm = std::move(cm);
 	self->state->cam = cam;
@@ -131,17 +230,85 @@ static void
 gst_libcamera_src_task_run(gpointer user_data)
 {
 	GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
+	GstLibcameraSrcState *state = self->state;
+
+	Request *request = new Request(state->cam.get());
+	auto wrap = std::make_unique<RequestWrap>(request);
+	for (GstPad *srcpad : state->srcpads) {
+		GstLibcameraPool *pool = gst_libcamera_pad_get_pool(srcpad);
+		GstBuffer *buffer;
+		GstFlowReturn ret;
+
+		ret = gst_buffer_pool_acquire_buffer(GST_BUFFER_POOL(pool),
+						     &buffer, nullptr);
+		if (ret != GST_FLOW_OK) {
+			/* RequestWrap does not take ownership, and we won't be
+			 * queueing this one due to lack of buffers. */
+			delete request;
+			request = NULL;
+			break;
+		}
+
+		wrap->attachBuffer(buffer);
+	}
+
+	if (request) {
+		GLibLocker lock(GST_OBJECT(self));
+		GST_TRACE_OBJECT(self, "Requesting buffers");
+		state->cam->queueRequest(request);
+		state->requests.push(std::move(wrap));
+	}
+
+	GstFlowReturn ret = GST_FLOW_OK;
+	gst_flow_combiner_reset(self->flow_combiner);
+	for (GstPad *srcpad : state->srcpads) {
+		ret = gst_libcamera_pad_push_pending(srcpad);
+		ret = gst_flow_combiner_update_pad_flow(self->flow_combiner,
+							srcpad, ret);
+	}
 
-	GST_DEBUG_OBJECT(self, "Streaming thread is now capturing");
+	{
+		/*
+		 * Here we need to decide if we want to pause or stop the task. This
+		 * needs to happen in lock step with the callback thread which may want
+		 * to resume the task.
+		 */
+		GLibLocker lock(GST_OBJECT(self));
+		if (ret != GST_FLOW_OK) {
+			if (ret == GST_FLOW_EOS) {
+				g_autoptr(GstEvent) eos = gst_event_new_eos();
+				guint32 seqnum = gst_util_seqnum_next();
+				gst_event_set_seqnum(eos, seqnum);
+				for (GstPad *srcpad : state->srcpads)
+					gst_pad_push_event(srcpad, gst_event_ref(eos));
+			} else if (ret != GST_FLOW_FLUSHING) {
+				GST_ELEMENT_FLOW_ERROR(self, ret);
+			}
+			gst_task_stop(self->task);
+			return;
+		}
+
+		gboolean do_pause = true;
+		for (GstPad *srcpad : state->srcpads) {
+			if (gst_libcamera_pad_has_pending(srcpad)) {
+				do_pause = false;
+				break;
+			}
+		}
+
+		if (do_pause)
+			gst_task_pause(self->task);
+	}
 }
 
 static void
 gst_libcamera_src_task_enter(GstTask *task, GThread *thread, gpointer user_data)
 {
 	GstLibcameraSrc *self = GST_LIBCAMERA_SRC(user_data);
-	GLibRecLocker(&self->stream_lock);
+	GLibRecLocker lock(&self->stream_lock);
 	GstLibcameraSrcState *state = self->state;
 	GstFlowReturn flow_ret = GST_FLOW_OK;
+	gint ret = 0;
 
 	GST_DEBUG_OBJECT(self, "Streaming thread has started");
 
@@ -230,12 +397,23 @@ gst_libcamera_src_task_enter(GstTask *task, GThread *thread, gpointer user_data)
 		return;
 	}
 
+	self->flow_combiner = gst_flow_combiner_new();
 	for (gsize i = 0; i < state->srcpads.size(); i++) {
 		GstPad *srcpad = state->srcpads[i];
 		const StreamConfiguration &stream_cfg = state->config->at(i);
 		GstLibcameraPool *pool = gst_libcamera_pool_new(self->allocator,
 								stream_cfg.stream());
 		gst_libcamera_pad_set_pool(srcpad, pool);
+		gst_flow_combiner_add_pad(self->flow_combiner, srcpad);
+	}
+
+	ret = state->cam->start();
+	if (ret) {
+		GST_ELEMENT_ERROR(self, RESOURCE, SETTINGS,
+				  ("Failed to start the camera: %s", g_strerror(-ret)),
+				  ("Camera.start() failed with error code %i", ret));
+		gst_task_stop(task);
+		return;
 	}
 
 done:
@@ -257,10 +435,14 @@ gst_libcamera_src_task_leave(GstTask *task, GThread *thread, gpointer user_data)
 
 	GST_DEBUG_OBJECT(self, "Streaming thread is about to stop");
 
+	state->cam->stop();
+
 	for (GstPad *srcpad : state->srcpads)
 		gst_libcamera_pad_set_pool(srcpad, NULL);
 
 	g_clear_object(&self->allocator);
+	g_clear_pointer(&self->flow_combiner,
+			(GDestroyNotify)gst_flow_combiner_free);
 }
 
 static void
@@ -340,6 +522,9 @@ gst_libcamera_src_change_state(GstElement *element, GstStateChange transition)
 			return GST_STATE_CHANGE_FAILURE;
 		ret = GST_STATE_CHANGE_NO_PREROLL;
 		break;
+	case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
+		gst_task_start(self->task);
+		break;
 	case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
 		ret = GST_STATE_CHANGE_NO_PREROLL;
 		break;
@@ -389,6 +574,9 @@ gst_libcamera_src_init(GstLibcameraSrc *self)
 
 	state->srcpads.push_back(gst_pad_new_from_template(templ, "src"));
 	gst_element_add_pad(GST_ELEMENT(self), state->srcpads[0]);
+
+	/* C-style friend. */
+	state->src = self;
 	self->state = state;
 }
 
