@@ -31,6 +31,134 @@ class BaseTestCase(unittest.TestCase):
self.assertTrue(all([not wr() for wr in wr_list]), msg)
+# Test references and keep-alives by doing a capture, reusing the Requests once
+# and testing that the objects are freed as soon as all the refs and keep-alives
+# are gone.
+class CaptureRefTestMethods(BaseTestCase):
+ def test_ref(self):
+ cm = libcam.CameraManager.singleton()
+ wr_cm = weakref.ref(cm)
+
+ cam = cm.get('platform/vimc.0 Sensor B')
+ self.assertIsNotNone(cam)
+ wr_cam = weakref.ref(cam)
+
+ cam.acquire()
+
+ camconfig = cam.generate_configuration([libcam.StreamRole.StillCapture])
+ self.assertTrue(camconfig.size == 1)
+ wr_camconfig = weakref.ref(camconfig)
+
+ streamconfig = camconfig.at(0)
+ wr_streamconfig = weakref.ref(streamconfig)
+
+ cam.configure(camconfig)
+
+ stream = streamconfig.stream
+ wr_stream = weakref.ref(stream)
+
+ # stream keeps streamconfig and camconfig alive
+ del streamconfig
+ del camconfig
+ gc.collect()
+ self.assertIsAlive(wr_camconfig)
+ self.assertIsAlive(wr_streamconfig)
+
+ allocator = libcam.FrameBufferAllocator(cam)
+ num_bufs = allocator.allocate(stream)
+ self.assertTrue(num_bufs > 0)
+ wr_allocator = weakref.ref(allocator)
+
+ buffers = allocator.buffers(stream)
+ self.assertIsNotNone(buffers)
+
+ wr_buffers = [weakref.ref(b) for b in buffers]
+
+ del allocator
+ self.assertIsAlive(wr_allocator)
+
+ reqs = []
+ wr_reqs = []
+ for i in range(num_bufs):
+ req = cam.create_request(i)
+ self.assertIsNotNone(req)
+
+ wr_reqs.append(weakref.ref(req))
+
+ req.add_buffer(stream, buffers[i])
+
+ reqs.append(req)
+
+ del buffers
+ del stream
+
+ self.assertIsDead(wr_stream)
+
+ cam.start()
+
+ reqs_target = num_bufs * 2
+ reqs_queued = 0
+ reqs_captured = 0
+
+ for req in reqs:
+ cam.queue_request(req)
+ reqs_queued += 1
+
+ del req
+ del reqs
+
+ # All buffers and reqs should be alive
+ self.assertIsAllAlive(wr_buffers)
+ self.assertIsAllAlive(wr_reqs)
+
+ sel = selectors.DefaultSelector()
+ sel.register(cm.event_fd, selectors.EVENT_READ)
+
+ while True:
+ events = sel.select()
+ if not events:
+ continue
+ del events
+
+ for ev in cm.get_events():
+ self.assertEqual(ev.type, libcam.Event.Type.RequestCompleted)
+
+ reqs_captured += 1
+ self.assertLessEqual(reqs_captured, reqs_target)
+
+ if reqs_queued < reqs_target:
+ req: libcam.Request = typing.cast(libcam.Request, ev.request)
+ req.reuse()
+ cam.queue_request(req)
+ reqs_queued += 1
+ del req
+
+ del ev
+
+ if reqs_captured == reqs_target:
+ break
+
+ del sel
+
+ # The allocator and all buffers and reqs should be dead
+ self.assertIsAllDead(wr_buffers)
+ self.assertIsAllDead(wr_reqs)
+ self.assertIsDead(wr_allocator)
+
+ events = cam.stop()
+ self.assertZero(len(events))
+ del events
+ cam.release()
+
+ del cm
+ self.assertIsAlive(wr_cm)
+ self.assertIsAlive(wr_cam)
+
+ del cam
+ self.assertIsDead(wr_cam)
+ self.assertIsDead(wr_cm)
+
+
class SimpleTestMethods(BaseTestCase):
def test_get_ref(self):
cm = libcam.CameraManager.singleton()