[libcamera-devel,v8,6/7] py: add unittests.py
diff mbox series

Message ID 20220506145414.99039-7-tomi.valkeinen@ideasonboard.com
State Superseded
Headers show
Series
  • Python bindings
Related show

Commit Message

Tomi Valkeinen May 6, 2022, 2:54 p.m. UTC
Add a simple unittests.py as a base for python unittests.

Signed-off-by: Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
Reviewed-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
---
 test/meson.build     |   1 +
 test/py/meson.build  |  17 +++
 test/py/unittests.py | 352 +++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 370 insertions(+)
 create mode 100644 test/py/meson.build
 create mode 100755 test/py/unittests.py

Patch
diff mbox series

diff --git a/test/meson.build b/test/meson.build
index fd4c5ca0..623f3baa 100644
--- a/test/meson.build
+++ b/test/meson.build
@@ -18,6 +18,7 @@  subdir('log')
 subdir('media_device')
 subdir('pipeline')
 subdir('process')
+subdir('py')
 subdir('serialization')
 subdir('stream')
 subdir('v4l2_compat')
diff --git a/test/py/meson.build b/test/py/meson.build
new file mode 100644
index 00000000..2affdbd4
--- /dev/null
+++ b/test/py/meson.build
@@ -0,0 +1,17 @@ 
+# SPDX-License-Identifier: CC0-1.0
+
+if not pycamera_enabled
+    subdir_done()
+endif
+
+pymod = import('python')
+py3 = pymod.find_installation('python3')
+
+pypathdir = meson.project_build_root() / 'src' / 'py'
+
+test('pyunittests',
+     py3,
+     args : files('unittests.py'),
+     env : ['PYTHONPATH=' + pypathdir],
+     suite : 'pybindings',
+     is_parallel : false)
diff --git a/test/py/unittests.py b/test/py/unittests.py
new file mode 100755
index 00000000..cbc00ff3
--- /dev/null
+++ b/test/py/unittests.py
@@ -0,0 +1,352 @@ 
+#!/usr/bin/env python3
+
+# SPDX-License-Identifier: GPL-2.0-or-later
+# Copyright (C) 2022, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
+
+from collections import defaultdict
+import errno
+import gc
+import libcamera as libcam
+import os
+import selectors
+import time
+import unittest
+import weakref
+
+
+class BaseTestCase(unittest.TestCase):
+    def assertZero(self, a, msg=None):
+        self.assertEqual(a, 0, msg)
+
+
+class SimpleTestMethods(BaseTestCase):
+    def test_get_ref(self):
+        cm = libcam.CameraManager.singleton()
+        wr_cm = weakref.ref(cm)
+
+        cam = cm.get('platform/vimc.0 Sensor B')
+        self.assertIsNotNone(cam)
+        wr_cam = weakref.ref(cam)
+
+        cm = None
+        gc.collect()
+        self.assertIsNotNone(wr_cm())
+
+        cam = None
+        gc.collect()
+        self.assertIsNone(wr_cm())
+        self.assertIsNone(wr_cam())
+
+    def test_acquire_release(self):
+        cm = libcam.CameraManager.singleton()
+        cam = cm.get('platform/vimc.0 Sensor B')
+        self.assertIsNotNone(cam)
+
+        ret = cam.acquire()
+        self.assertZero(ret)
+
+        ret = cam.release()
+        self.assertZero(ret)
+
+    def test_double_acquire(self):
+        cm = libcam.CameraManager.singleton()
+        cam = cm.get('platform/vimc.0 Sensor B')
+        self.assertIsNotNone(cam)
+
+        ret = cam.acquire()
+        self.assertZero(ret)
+
+        libcam.log_set_level('Camera', 'FATAL')
+        ret = cam.acquire()
+        self.assertEqual(ret, -errno.EBUSY)
+        libcam.log_set_level('Camera', 'ERROR')
+
+        ret = cam.release()
+        self.assertZero(ret)
+
+        ret = cam.release()
+        # I expected EBUSY, but looks like double release works fine
+        self.assertZero(ret)
+
+
+class CameraTesterBase(BaseTestCase):
+    def setUp(self):
+        self.cm = libcam.CameraManager.singleton()
+        self.cam = next((cam for cam in self.cm.cameras if 'platform/vimc' in cam.id), None)
+        if self.cam is None:
+            self.cm = None
+            self.skipTest('No vimc found')
+
+        ret = self.cam.acquire()
+        if ret != 0:
+            self.cam = None
+            self.cm = None
+            raise Exception('Failed to acquire camera')
+
+    def tearDown(self):
+        # If a test fails, the camera may be in running state. So always stop.
+        self.cam.stop()
+
+        ret = self.cam.release()
+        if ret != 0:
+            raise Exception('Failed to release camera')
+
+        self.cam = None
+        self.cm = None
+
+
+class AllocatorTestMethods(CameraTesterBase):
+    def test_allocator(self):
+        cam = self.cam
+
+        camconfig = cam.generate_configuration([libcam.StreamRole.StillCapture])
+        self.assertTrue(camconfig.size == 1)
+        wr_camconfig = weakref.ref(camconfig)
+
+        streamconfig = camconfig.at(0)
+        wr_streamconfig = weakref.ref(streamconfig)
+
+        ret = cam.configure(camconfig)
+        self.assertZero(ret)
+
+        stream = streamconfig.stream
+        wr_stream = weakref.ref(stream)
+
+        # stream should keep streamconfig and camconfig alive
+        streamconfig = None
+        camconfig = None
+        gc.collect()
+        self.assertIsNotNone(wr_camconfig())
+        self.assertIsNotNone(wr_streamconfig())
+
+        allocator = libcam.FrameBufferAllocator(cam)
+        ret = allocator.allocate(stream)
+        self.assertTrue(ret > 0)
+        wr_allocator = weakref.ref(allocator)
+
+        buffers = allocator.buffers(stream)
+        buffers = None
+
+        buffer = allocator.buffers(stream)[0]
+        self.assertIsNotNone(buffer)
+        wr_buffer = weakref.ref(buffer)
+
+        allocator = None
+        gc.collect()
+        self.assertIsNotNone(wr_buffer())
+        self.assertIsNotNone(wr_allocator())
+        self.assertIsNotNone(wr_stream())
+
+        buffer = None
+        gc.collect()
+        self.assertIsNone(wr_buffer())
+        self.assertIsNone(wr_allocator())
+        self.assertIsNotNone(wr_stream())
+
+        stream = None
+        gc.collect()
+        self.assertIsNone(wr_stream())
+        self.assertIsNone(wr_camconfig())
+        self.assertIsNone(wr_streamconfig())
+
+
+class SimpleCaptureMethods(CameraTesterBase):
+    def test_sleep(self):
+        cm = self.cm
+        cam = self.cam
+
+        camconfig = cam.generate_configuration([libcam.StreamRole.StillCapture])
+        self.assertTrue(camconfig.size == 1)
+
+        streamconfig = camconfig.at(0)
+        fmts = streamconfig.formats
+
+        ret = cam.configure(camconfig)
+        self.assertZero(ret)
+
+        stream = streamconfig.stream
+
+        allocator = libcam.FrameBufferAllocator(cam)
+        ret = allocator.allocate(stream)
+        self.assertTrue(ret > 0)
+
+        num_bufs = len(allocator.buffers(stream))
+
+        reqs = []
+        for i in range(num_bufs):
+            req = cam.create_request(i)
+            self.assertIsNotNone(req)
+
+            buffer = allocator.buffers(stream)[i]
+            ret = req.add_buffer(stream, buffer)
+            self.assertZero(ret)
+
+            reqs.append(req)
+
+        buffer = None
+
+        ret = cam.start()
+        self.assertZero(ret)
+
+        for req in reqs:
+            ret = cam.queue_request(req)
+            self.assertZero(ret)
+
+        reqs = None
+        gc.collect()
+
+        time.sleep(0.5)
+
+        reqs = cm.get_ready_requests()
+
+        self.assertTrue(len(reqs) == num_bufs)
+
+        for i, req in enumerate(reqs):
+            self.assertTrue(i == req.cookie)
+
+        reqs = None
+        gc.collect()
+
+        ret = cam.stop()
+        self.assertZero(ret)
+
+    def test_select(self):
+        cm = self.cm
+        cam = self.cam
+
+        camconfig = cam.generate_configuration([libcam.StreamRole.StillCapture])
+        self.assertTrue(camconfig.size == 1)
+
+        streamconfig = camconfig.at(0)
+        fmts = streamconfig.formats
+
+        ret = cam.configure(camconfig)
+        self.assertZero(ret)
+
+        stream = streamconfig.stream
+
+        allocator = libcam.FrameBufferAllocator(cam)
+        ret = allocator.allocate(stream)
+        self.assertTrue(ret > 0)
+
+        num_bufs = len(allocator.buffers(stream))
+
+        reqs = []
+        for i in range(num_bufs):
+            req = cam.create_request(i)
+            self.assertIsNotNone(req)
+
+            buffer = allocator.buffers(stream)[i]
+            ret = req.add_buffer(stream, buffer)
+            self.assertZero(ret)
+
+            reqs.append(req)
+
+        buffer = None
+
+        ret = cam.start()
+        self.assertZero(ret)
+
+        for req in reqs:
+            ret = cam.queue_request(req)
+            self.assertZero(ret)
+
+        reqs = None
+        gc.collect()
+
+        sel = selectors.DefaultSelector()
+        sel.register(cm.efd, selectors.EVENT_READ)
+
+        reqs = []
+
+        running = True
+        while running:
+            events = sel.select()
+            for key, mask in events:
+                os.read(key.fileobj, 8)
+
+                ready_reqs = cm.get_ready_requests()
+
+                self.assertTrue(len(ready_reqs) > 0)
+
+                reqs += ready_reqs
+
+                if len(reqs) == num_bufs:
+                    running = False
+
+        self.assertTrue(len(reqs) == num_bufs)
+
+        for i, req in enumerate(reqs):
+            self.assertTrue(i == req.cookie)
+
+        reqs = None
+        gc.collect()
+
+        ret = cam.stop()
+        self.assertZero(ret)
+
+
+# Recursively expand slist's objects into olist, using seen to track already
+# processed objects.
+def _getr(slist, olist, seen):
+    for e in slist:
+        if id(e) in seen:
+            continue
+        seen.add(id(e))
+        olist.append(e)
+        tl = gc.get_referents(e)
+        if tl:
+            _getr(tl, olist, seen)
+
+
+def get_all_objects(ignored=[]):
+    gcl = gc.get_objects()
+    olist = []
+    seen = set()
+
+    seen.add(id(gcl))
+    seen.add(id(olist))
+    seen.add(id(seen))
+    seen.update(set([id(o) for o in ignored]))
+
+    _getr(gcl, olist, seen)
+
+    return olist
+
+
+def create_type_count_map(olist):
+    map = defaultdict(int)
+    for o in olist:
+        map[type(o)] += 1
+    return map
+
+
+def diff_type_count_maps(before, after):
+    return [(k, after[k] - before[k]) for k in after if after[k] != before[k]]
+
+
+if __name__ == '__main__':
+    # \todo This is an attempt to see the Python objects that are not collected,
+    # but this doesn't work very well, as things always leak a bit.
+    test_leaks = False
+
+    if test_leaks:
+        gc.unfreeze()
+        gc.collect()
+
+        obs_before = get_all_objects()
+
+    unittest.main(exit=False)
+
+    if test_leaks:
+        gc.unfreeze()
+        gc.collect()
+
+        obs_after = get_all_objects([obs_before])
+
+        before = create_type_count_map(obs_before)
+        after = create_type_count_map(obs_after)
+
+        leaks = diff_type_count_maps(before, after)
+        if len(leaks) > 0:
+            print(leaks)