Message ID | 20220505104104.70841-7-tomi.valkeinen@ideasonboard.com |
---|---|
State | Superseded |
Headers | show |
Series | |
Related | show |
Hi Tomi, Thank you for the patch. On Thu, May 05, 2022 at 01:40:57PM +0300, Tomi Valkeinen wrote: > Add a simple unittests.py as a base for python unittests. > > Signed-off-by: Tomi Valkeinen <tomi.valkeinen@ideasonboard.com> > --- > test/meson.build | 1 + > test/py/meson.build | 17 ++ > test/py/unittests.py | 368 +++++++++++++++++++++++++++++++++++++++++++ > 3 files changed, 386 insertions(+) > create mode 100644 test/py/meson.build > create mode 100755 test/py/unittests.py > > diff --git a/test/meson.build b/test/meson.build > index fd4c5ca0..623f3baa 100644 > --- a/test/meson.build > +++ b/test/meson.build > @@ -18,6 +18,7 @@ subdir('log') > subdir('media_device') > subdir('pipeline') > subdir('process') > +subdir('py') > subdir('serialization') > subdir('stream') > subdir('v4l2_compat') > diff --git a/test/py/meson.build b/test/py/meson.build > new file mode 100644 > index 00000000..f6b42bd0 > --- /dev/null > +++ b/test/py/meson.build > @@ -0,0 +1,17 @@ > +# SPDX-License-Identifier: CC0-1.0 > + > +if not pycamera_enabled > + subdir_done() > +endif > + > +pymod = import('python') > +py3 = pymod.find_installation('python3') > + > +pypathdir = meson.project_build_root() / 'src/py' pypathdir = meson.project_build_root() / 'src' / 'py' > + > +test('pyunittests', > + py3, > + args : files('unittests.py'), > + env : ['PYTHONPATH=' + pypathdir], > + suite : 'pybindings', > + is_parallel : false) > diff --git a/test/py/unittests.py b/test/py/unittests.py > new file mode 100755 > index 00000000..15d5b4a7 > --- /dev/null > +++ b/test/py/unittests.py > @@ -0,0 +1,368 @@ > +#!/usr/bin/env python3 > + > +# SPDX-License-Identifier: GPL-2.0-or-later > +# Copyright (C) 2021, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com> > + > +from collections import defaultdict > +import errno > +import gc > +import libcamera as libcam I'm tempted to claim the "cam" name in all our Python code, or possibly "camera". What do you think ? 
> +import os > +import selectors > +import time > +import unittest > +import weakref > + > + > +class MyTestCase(unittest.TestCase): s/MyTestCase/BaseTestCase/ It's not yours only anymore :-) > + def assertZero(self, a, msg=None): > + self.assertEqual(a, 0, msg) > + > + > +class SimpleTestMethods(MyTestCase): > + def test_find_ref(self): > + cm = libcam.CameraManager.singleton() > + wr_cm = weakref.ref(cm) > + > + cam = cm.find("platform/vimc") Standardizing on single-quotes here too would be nice. > + self.assertIsNotNone(cam) > + wr_cam = weakref.ref(cam) > + > + cm = None > + gc.collect() > + self.assertIsNotNone(wr_cm()) > + > + cam = None > + gc.collect() > + self.assertIsNone(wr_cm()) > + self.assertIsNone(wr_cam()) > + > + def test_get_ref(self): > + cm = libcam.CameraManager.singleton() > + wr_cm = weakref.ref(cm) > + > + cam = cm.get("platform/vimc.0 Sensor B") > + self.assertTrue(cam is not None) > + wr_cam = weakref.ref(cam) > + > + cm = None > + gc.collect() > + self.assertIsNotNone(wr_cm()) > + > + cam = None > + gc.collect() > + self.assertIsNone(wr_cm()) > + self.assertIsNone(wr_cam()) > + > + def test_acquire_release(self): > + cm = libcam.CameraManager.singleton() > + cam = cm.get("platform/vimc.0 Sensor B") > + self.assertTrue(cam is not None) > + > + ret = cam.acquire() > + self.assertZero(ret) > + > + ret = cam.release() > + self.assertZero(ret) > + > + def test_double_acquire(self): > + cm = libcam.CameraManager.singleton() > + cam = cm.get("platform/vimc.0 Sensor B") > + self.assertTrue(cam is not None) > + > + ret = cam.acquire() > + self.assertZero(ret) > + > + libcam.logSetLevel("Camera", "FATAL") > + ret = cam.acquire() > + self.assertEqual(ret, -errno.EBUSY) > + libcam.logSetLevel("Camera", "ERROR") > + > + ret = cam.release() > + self.assertZero(ret) > + > + ret = cam.release() > + # I expected EBUSY, but looks like double release works fine > + self.assertZero(ret) > + > + > +class CameraTesterBase(MyTestCase): > + def setUp(self): > + 
self.cm = libcam.CameraManager.singleton() > + self.cam = self.cm.find("platform/vimc") > + if self.cam is None: > + self.cm = None > + raise Exception("No vimc found") Is it possible to skip the test in that case instead of failing ? This could be done on top, a todo comment somewhere in the file would be good then. > + > + ret = self.cam.acquire() > + if ret != 0: > + self.cam = None > + self.cm = None > + raise Exception("Failed to acquire camera") > + > + def tearDown(self): > + # If a test fails, the camera may be in running state. So always stop. > + self.cam.stop() > + > + ret = self.cam.release() > + if ret != 0: > + raise Exception("Failed to release camera") > + > + self.cam = None > + self.cm = None > + > + > +class AllocatorTestMethods(CameraTesterBase): > + def test_allocator(self): > + cam = self.cam > + > + camconfig = cam.generateConfiguration([libcam.StreamRole.StillCapture]) > + self.assertTrue(camconfig.size == 1) > + wr_camconfig = weakref.ref(camconfig) > + > + streamconfig = camconfig.at(0) > + wr_streamconfig = weakref.ref(streamconfig) > + > + ret = cam.configure(camconfig) > + self.assertZero(ret) > + > + stream = streamconfig.stream > + wr_stream = weakref.ref(stream) > + > + # stream should keep streamconfig and camconfig alive > + streamconfig = None > + camconfig = None > + gc.collect() > + self.assertIsNotNone(wr_camconfig()) > + self.assertIsNotNone(wr_streamconfig()) > + > + allocator = libcam.FrameBufferAllocator(cam) > + ret = allocator.allocate(stream) > + self.assertTrue(ret > 0) > + wr_allocator = weakref.ref(allocator) > + > + buffers = allocator.buffers(stream) > + buffers = None > + > + buffer = allocator.buffers(stream)[0] > + self.assertIsNotNone(buffer) > + wr_buffer = weakref.ref(buffer) > + > + allocator = None > + gc.collect() > + self.assertIsNotNone(wr_buffer()) > + self.assertIsNotNone(wr_allocator()) > + self.assertIsNotNone(wr_stream()) > + > + buffer = None > + gc.collect() > + self.assertIsNone(wr_buffer()) > + 
self.assertIsNone(wr_allocator()) > + self.assertIsNotNone(wr_stream()) > + > + stream = None > + gc.collect() > + self.assertIsNone(wr_stream()) > + self.assertIsNone(wr_camconfig()) > + self.assertIsNone(wr_streamconfig()) > + > + > +class SimpleCaptureMethods(CameraTesterBase): > + def test_sleep(self): > + cm = self.cm > + cam = self.cam > + > + camconfig = cam.generateConfiguration([libcam.StreamRole.StillCapture]) > + self.assertTrue(camconfig.size == 1) > + > + streamconfig = camconfig.at(0) > + fmts = streamconfig.formats > + > + ret = cam.configure(camconfig) > + self.assertZero(ret) > + > + stream = streamconfig.stream > + > + allocator = libcam.FrameBufferAllocator(cam) > + ret = allocator.allocate(stream) > + self.assertTrue(ret > 0) > + > + num_bufs = len(allocator.buffers(stream)) > + > + reqs = [] > + for i in range(num_bufs): > + req = cam.createRequest(i) > + self.assertIsNotNone(req) > + > + buffer = allocator.buffers(stream)[i] > + ret = req.addBuffer(stream, buffer) > + self.assertZero(ret) > + > + reqs.append(req) > + > + buffer = None > + > + ret = cam.start() > + self.assertZero(ret) > + > + for req in reqs: > + ret = cam.queueRequest(req) > + self.assertZero(ret) > + > + reqs = None > + gc.collect() > + > + time.sleep(0.5) > + > + reqs = cm.getReadyRequests() > + > + self.assertTrue(len(reqs) == num_bufs) > + > + for i, req in enumerate(reqs): > + self.assertTrue(i == req.cookie) > + > + reqs = None > + gc.collect() > + > + ret = cam.stop() > + self.assertZero(ret) > + > + def test_select(self): > + cm = self.cm > + cam = self.cam > + > + camconfig = cam.generateConfiguration([libcam.StreamRole.StillCapture]) > + self.assertTrue(camconfig.size == 1) > + > + streamconfig = camconfig.at(0) > + fmts = streamconfig.formats > + > + ret = cam.configure(camconfig) > + self.assertZero(ret) > + > + stream = streamconfig.stream > + > + allocator = libcam.FrameBufferAllocator(cam) > + ret = allocator.allocate(stream) > + self.assertTrue(ret > 0) > + > 
+ num_bufs = len(allocator.buffers(stream)) > + > + reqs = [] > + for i in range(num_bufs): > + req = cam.createRequest(i) > + self.assertIsNotNone(req) > + > + buffer = allocator.buffers(stream)[i] > + ret = req.addBuffer(stream, buffer) > + self.assertZero(ret) > + > + reqs.append(req) > + > + buffer = None > + > + ret = cam.start() > + self.assertZero(ret) > + > + for req in reqs: > + ret = cam.queueRequest(req) > + self.assertZero(ret) > + > + reqs = None > + gc.collect() > + > + sel = selectors.DefaultSelector() > + sel.register(cm.efd, selectors.EVENT_READ, 123) Is the data argument needed ? > + > + reqs = [] > + > + running = True > + while running: > + events = sel.select() > + for key, mask in events: > + os.read(key.fileobj, 8) > + > + ready_reqs = cm.getReadyRequests() > + > + self.assertTrue(len(ready_reqs) > 0) > + > + reqs += ready_reqs > + > + if len(reqs) == num_bufs: > + running = False > + > + self.assertTrue(len(reqs) == num_bufs) > + > + for i, req in enumerate(reqs): > + self.assertTrue(i == req.cookie) > + > + reqs = None > + gc.collect() > + > + ret = cam.stop() > + self.assertZero(ret) > + > + > +# Recursively expand slist's objects into olist, using seen to track already > +# processed objects. 
> +def _getr(slist, olist, seen): > + for e in slist: > + if id(e) in seen: > + continue > + seen.add(id(e)) > + olist.append(e) > + tl = gc.get_referents(e) > + if tl: > + _getr(tl, olist, seen) > + > + > +def get_all_objects(ignored=[]): > + gcl = gc.get_objects() > + olist = [] > + seen = set() > + > + seen.add(id(gcl)) > + seen.add(id(olist)) > + seen.add(id(seen)) > + seen.update(set([id(o) for o in ignored])) > + > + _getr(gcl, olist, seen) > + > + return olist > + > + > +def create_type_count_map(olist): > + map = defaultdict(int) > + for o in olist: > + map[type(o)] += 1 > + return map > + > + > +def diff_type_count_maps(before, after): > + return [(k, after[k] - before[k]) for k in after if after[k] != before[k]] > + > + > +if __name__ == '__main__': > + # doesn't work very well, as things always leak a bit Lovely :-) Is it something we can fix ? s/doesn't/Doesn't/ Reviewed-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com> > + test_leaks = False > + > + if test_leaks: > + gc.unfreeze() > + gc.collect() > + > + obs_before = get_all_objects() > + > + unittest.main(exit=False) > + > + if test_leaks: > + gc.unfreeze() > + gc.collect() > + > + obs_after = get_all_objects([obs_before]) > + > + before = create_type_count_map(obs_before) > + after = create_type_count_map(obs_after) > + > + leaks = diff_type_count_maps(before, after) > + if len(leaks) > 0: > + print(leaks)
On 05/05/2022 20:52, Laurent Pinchart wrote: > Hi Tomi, > > Thank you for the patch. > > On Thu, May 05, 2022 at 01:40:57PM +0300, Tomi Valkeinen wrote: >> Add a simple unittests.py as a base for python unittests. >> >> Signed-off-by: Tomi Valkeinen <tomi.valkeinen@ideasonboard.com> >> --- >> test/meson.build | 1 + >> test/py/meson.build | 17 ++ >> test/py/unittests.py | 368 +++++++++++++++++++++++++++++++++++++++++++ >> 3 files changed, 386 insertions(+) >> create mode 100644 test/py/meson.build >> create mode 100755 test/py/unittests.py >> >> diff --git a/test/meson.build b/test/meson.build >> index fd4c5ca0..623f3baa 100644 >> --- a/test/meson.build >> +++ b/test/meson.build >> @@ -18,6 +18,7 @@ subdir('log') >> subdir('media_device') >> subdir('pipeline') >> subdir('process') >> +subdir('py') >> subdir('serialization') >> subdir('stream') >> subdir('v4l2_compat') >> diff --git a/test/py/meson.build b/test/py/meson.build >> new file mode 100644 >> index 00000000..f6b42bd0 >> --- /dev/null >> +++ b/test/py/meson.build >> @@ -0,0 +1,17 @@ >> +# SPDX-License-Identifier: CC0-1.0 >> + >> +if not pycamera_enabled >> + subdir_done() >> +endif >> + >> +pymod = import('python') >> +py3 = pymod.find_installation('python3') >> + >> +pypathdir = meson.project_build_root() / 'src/py' > > pypathdir = meson.project_build_root() / 'src' / 'py' > >> + >> +test('pyunittests', >> + py3, >> + args : files('unittests.py'), >> + env : ['PYTHONPATH=' + pypathdir], >> + suite : 'pybindings', >> + is_parallel : false) >> diff --git a/test/py/unittests.py b/test/py/unittests.py >> new file mode 100755 >> index 00000000..15d5b4a7 >> --- /dev/null >> +++ b/test/py/unittests.py >> @@ -0,0 +1,368 @@ >> +#!/usr/bin/env python3 >> + >> +# SPDX-License-Identifier: GPL-2.0-or-later >> +# Copyright (C) 2021, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com> >> + >> +from collections import defaultdict >> +import errno >> +import gc >> +import libcamera as libcam > > I'm tempted to claim the "cam" 
name in all our Python code, or possibly > "camera". What do you think ? I think 'cam' and 'camera' are names for variables that contain a camera. >> +import os >> +import selectors >> +import time >> +import unittest >> +import weakref >> + >> + >> +class MyTestCase(unittest.TestCase): > > s/MyTestCase/BaseTestCase/ > > It's not yours only anymore :-) My precious... >> + def assertZero(self, a, msg=None): >> + self.assertEqual(a, 0, msg) >> + >> + >> +class SimpleTestMethods(MyTestCase): >> + def test_find_ref(self): >> + cm = libcam.CameraManager.singleton() >> + wr_cm = weakref.ref(cm) >> + >> + cam = cm.find("platform/vimc") > > Standardizing on single-quotes here too would be nice. I can't stand them, but... Fine, it's probably best to follow the widely used convention. >> + self.assertIsNotNone(cam) >> + wr_cam = weakref.ref(cam) >> + >> + cm = None >> + gc.collect() >> + self.assertIsNotNone(wr_cm()) >> + >> + cam = None >> + gc.collect() >> + self.assertIsNone(wr_cm()) >> + self.assertIsNone(wr_cam()) >> + >> + def test_get_ref(self): >> + cm = libcam.CameraManager.singleton() >> + wr_cm = weakref.ref(cm) >> + >> + cam = cm.get("platform/vimc.0 Sensor B") >> + self.assertTrue(cam is not None) >> + wr_cam = weakref.ref(cam) >> + >> + cm = None >> + gc.collect() >> + self.assertIsNotNone(wr_cm()) >> + >> + cam = None >> + gc.collect() >> + self.assertIsNone(wr_cm()) >> + self.assertIsNone(wr_cam()) >> + >> + def test_acquire_release(self): >> + cm = libcam.CameraManager.singleton() >> + cam = cm.get("platform/vimc.0 Sensor B") >> + self.assertTrue(cam is not None) >> + >> + ret = cam.acquire() >> + self.assertZero(ret) >> + >> + ret = cam.release() >> + self.assertZero(ret) >> + >> + def test_double_acquire(self): >> + cm = libcam.CameraManager.singleton() >> + cam = cm.get("platform/vimc.0 Sensor B") >> + self.assertTrue(cam is not None) >> + >> + ret = cam.acquire() >> + self.assertZero(ret) >> + >> + libcam.logSetLevel("Camera", "FATAL") >> + ret = 
cam.acquire() >> + self.assertEqual(ret, -errno.EBUSY) >> + libcam.logSetLevel("Camera", "ERROR") >> + >> + ret = cam.release() >> + self.assertZero(ret) >> + >> + ret = cam.release() >> + # I expected EBUSY, but looks like double release works fine >> + self.assertZero(ret) >> + >> + >> +class CameraTesterBase(MyTestCase): >> + def setUp(self): >> + self.cm = libcam.CameraManager.singleton() >> + self.cam = self.cm.find("platform/vimc") >> + if self.cam is None: >> + self.cm = None >> + raise Exception("No vimc found") > > Is it possible to skip the test in that case instead of failing ? This > could be done on top, a todo comment somewhere in the file would be good > then. Yes, I can do that. Although some tests test finding the vimc camera, so those will still fail. >> + >> + ret = self.cam.acquire() >> + if ret != 0: >> + self.cam = None >> + self.cm = None >> + raise Exception("Failed to acquire camera") >> + >> + def tearDown(self): >> + # If a test fails, the camera may be in running state. So always stop. 
>> + self.cam.stop() >> + >> + ret = self.cam.release() >> + if ret != 0: >> + raise Exception("Failed to release camera") >> + >> + self.cam = None >> + self.cm = None >> + >> + >> +class AllocatorTestMethods(CameraTesterBase): >> + def test_allocator(self): >> + cam = self.cam >> + >> + camconfig = cam.generateConfiguration([libcam.StreamRole.StillCapture]) >> + self.assertTrue(camconfig.size == 1) >> + wr_camconfig = weakref.ref(camconfig) >> + >> + streamconfig = camconfig.at(0) >> + wr_streamconfig = weakref.ref(streamconfig) >> + >> + ret = cam.configure(camconfig) >> + self.assertZero(ret) >> + >> + stream = streamconfig.stream >> + wr_stream = weakref.ref(stream) >> + >> + # stream should keep streamconfig and camconfig alive >> + streamconfig = None >> + camconfig = None >> + gc.collect() >> + self.assertIsNotNone(wr_camconfig()) >> + self.assertIsNotNone(wr_streamconfig()) >> + >> + allocator = libcam.FrameBufferAllocator(cam) >> + ret = allocator.allocate(stream) >> + self.assertTrue(ret > 0) >> + wr_allocator = weakref.ref(allocator) >> + >> + buffers = allocator.buffers(stream) >> + buffers = None >> + >> + buffer = allocator.buffers(stream)[0] >> + self.assertIsNotNone(buffer) >> + wr_buffer = weakref.ref(buffer) >> + >> + allocator = None >> + gc.collect() >> + self.assertIsNotNone(wr_buffer()) >> + self.assertIsNotNone(wr_allocator()) >> + self.assertIsNotNone(wr_stream()) >> + >> + buffer = None >> + gc.collect() >> + self.assertIsNone(wr_buffer()) >> + self.assertIsNone(wr_allocator()) >> + self.assertIsNotNone(wr_stream()) >> + >> + stream = None >> + gc.collect() >> + self.assertIsNone(wr_stream()) >> + self.assertIsNone(wr_camconfig()) >> + self.assertIsNone(wr_streamconfig()) >> + >> + >> +class SimpleCaptureMethods(CameraTesterBase): >> + def test_sleep(self): >> + cm = self.cm >> + cam = self.cam >> + >> + camconfig = cam.generateConfiguration([libcam.StreamRole.StillCapture]) >> + self.assertTrue(camconfig.size == 1) >> + >> + streamconfig 
= camconfig.at(0) >> + fmts = streamconfig.formats >> + >> + ret = cam.configure(camconfig) >> + self.assertZero(ret) >> + >> + stream = streamconfig.stream >> + >> + allocator = libcam.FrameBufferAllocator(cam) >> + ret = allocator.allocate(stream) >> + self.assertTrue(ret > 0) >> + >> + num_bufs = len(allocator.buffers(stream)) >> + >> + reqs = [] >> + for i in range(num_bufs): >> + req = cam.createRequest(i) >> + self.assertIsNotNone(req) >> + >> + buffer = allocator.buffers(stream)[i] >> + ret = req.addBuffer(stream, buffer) >> + self.assertZero(ret) >> + >> + reqs.append(req) >> + >> + buffer = None >> + >> + ret = cam.start() >> + self.assertZero(ret) >> + >> + for req in reqs: >> + ret = cam.queueRequest(req) >> + self.assertZero(ret) >> + >> + reqs = None >> + gc.collect() >> + >> + time.sleep(0.5) >> + >> + reqs = cm.getReadyRequests() >> + >> + self.assertTrue(len(reqs) == num_bufs) >> + >> + for i, req in enumerate(reqs): >> + self.assertTrue(i == req.cookie) >> + >> + reqs = None >> + gc.collect() >> + >> + ret = cam.stop() >> + self.assertZero(ret) >> + >> + def test_select(self): >> + cm = self.cm >> + cam = self.cam >> + >> + camconfig = cam.generateConfiguration([libcam.StreamRole.StillCapture]) >> + self.assertTrue(camconfig.size == 1) >> + >> + streamconfig = camconfig.at(0) >> + fmts = streamconfig.formats >> + >> + ret = cam.configure(camconfig) >> + self.assertZero(ret) >> + >> + stream = streamconfig.stream >> + >> + allocator = libcam.FrameBufferAllocator(cam) >> + ret = allocator.allocate(stream) >> + self.assertTrue(ret > 0) >> + >> + num_bufs = len(allocator.buffers(stream)) >> + >> + reqs = [] >> + for i in range(num_bufs): >> + req = cam.createRequest(i) >> + self.assertIsNotNone(req) >> + >> + buffer = allocator.buffers(stream)[i] >> + ret = req.addBuffer(stream, buffer) >> + self.assertZero(ret) >> + >> + reqs.append(req) >> + >> + buffer = None >> + >> + ret = cam.start() >> + self.assertZero(ret) >> + >> + for req in reqs: >> + ret = 
cam.queueRequest(req) >> + self.assertZero(ret) >> + >> + reqs = None >> + gc.collect() >> + >> + sel = selectors.DefaultSelector() >> + sel.register(cm.efd, selectors.EVENT_READ, 123) > > Is the data argument needed ? No, it's not. >> + >> + reqs = [] >> + >> + running = True >> + while running: >> + events = sel.select() >> + for key, mask in events: >> + os.read(key.fileobj, 8) >> + >> + ready_reqs = cm.getReadyRequests() >> + >> + self.assertTrue(len(ready_reqs) > 0) >> + >> + reqs += ready_reqs >> + >> + if len(reqs) == num_bufs: >> + running = False >> + >> + self.assertTrue(len(reqs) == num_bufs) >> + >> + for i, req in enumerate(reqs): >> + self.assertTrue(i == req.cookie) >> + >> + reqs = None >> + gc.collect() >> + >> + ret = cam.stop() >> + self.assertZero(ret) >> + >> + >> +# Recursively expand slist's objects into olist, using seen to track already >> +# processed objects. >> +def _getr(slist, olist, seen): >> + for e in slist: >> + if id(e) in seen: >> + continue >> + seen.add(id(e)) >> + olist.append(e) >> + tl = gc.get_referents(e) >> + if tl: >> + _getr(tl, olist, seen) >> + >> + >> +def get_all_objects(ignored=[]): >> + gcl = gc.get_objects() >> + olist = [] >> + seen = set() >> + >> + seen.add(id(gcl)) >> + seen.add(id(olist)) >> + seen.add(id(seen)) >> + seen.update(set([id(o) for o in ignored])) >> + >> + _getr(gcl, olist, seen) >> + >> + return olist >> + >> + >> +def create_type_count_map(olist): >> + map = defaultdict(int) >> + for o in olist: >> + map[type(o)] += 1 >> + return map >> + >> + >> +def diff_type_count_maps(before, after): >> + return [(k, after[k] - before[k]) for k in after if after[k] != before[k]] >> + >> + >> +if __name__ == '__main__': >> + # doesn't work very well, as things always leak a bit > > Lovely :-) Is it something we can fix ? I have no idea if this method is a valid way to observe object leaks, so... no clue. I think it can still be used, but manually, comparing object lists when doing some changes to the tests. 
Tomi
Hi Tomi, On Fri, May 06, 2022 at 01:59:38PM +0300, Tomi Valkeinen wrote: > On 05/05/2022 20:52, Laurent Pinchart wrote: > > On Thu, May 05, 2022 at 01:40:57PM +0300, Tomi Valkeinen wrote: > >> Add a simple unittests.py as a base for python unittests. > >> > >> Signed-off-by: Tomi Valkeinen <tomi.valkeinen@ideasonboard.com> > >> --- > >> test/meson.build | 1 + > >> test/py/meson.build | 17 ++ > >> test/py/unittests.py | 368 +++++++++++++++++++++++++++++++++++++++++++ > >> 3 files changed, 386 insertions(+) > >> create mode 100644 test/py/meson.build > >> create mode 100755 test/py/unittests.py > >> > >> diff --git a/test/meson.build b/test/meson.build > >> index fd4c5ca0..623f3baa 100644 > >> --- a/test/meson.build > >> +++ b/test/meson.build > >> @@ -18,6 +18,7 @@ subdir('log') > >> subdir('media_device') > >> subdir('pipeline') > >> subdir('process') > >> +subdir('py') > >> subdir('serialization') > >> subdir('stream') > >> subdir('v4l2_compat') > >> diff --git a/test/py/meson.build b/test/py/meson.build > >> new file mode 100644 > >> index 00000000..f6b42bd0 > >> --- /dev/null > >> +++ b/test/py/meson.build > >> @@ -0,0 +1,17 @@ > >> +# SPDX-License-Identifier: CC0-1.0 > >> + > >> +if not pycamera_enabled > >> + subdir_done() > >> +endif > >> + > >> +pymod = import('python') > >> +py3 = pymod.find_installation('python3') > >> + > >> +pypathdir = meson.project_build_root() / 'src/py' > > > > pypathdir = meson.project_build_root() / 'src' / 'py' > > > >> + > >> +test('pyunittests', > >> + py3, > >> + args : files('unittests.py'), > >> + env : ['PYTHONPATH=' + pypathdir], > >> + suite : 'pybindings', > >> + is_parallel : false) > >> diff --git a/test/py/unittests.py b/test/py/unittests.py > >> new file mode 100755 > >> index 00000000..15d5b4a7 > >> --- /dev/null > >> +++ b/test/py/unittests.py > >> @@ -0,0 +1,368 @@ > >> +#!/usr/bin/env python3 > >> + > >> +# SPDX-License-Identifier: GPL-2.0-or-later > >> +# Copyright (C) 2021, Tomi Valkeinen 
<tomi.valkeinen@ideasonboard.com> > >> + > >> +from collections import defaultdict > >> +import errno > >> +import gc > >> +import libcamera as libcam > > > > I'm tempted to claim the "cam" name in all our Python code, or possibly > > "camera". What do you think ? > > I think 'cam' and 'camera' are names for variables that contain a camera. That's a very good point, even if "cam" would also be a good name for the module. Any idea from anyone on what would be a good short module name alias ? We could also use it as a libcamera namespace alias in C++ code examples for consistency. > >> +import os > >> +import selectors > >> +import time > >> +import unittest > >> +import weakref > >> + > >> + > >> +class MyTestCase(unittest.TestCase): > > > > s/MyTestCase/BaseTestCase/ > > > > It's not yours only anymore :-) > > My precious... > > >> + def assertZero(self, a, msg=None): > >> + self.assertEqual(a, 0, msg) > >> + > >> + > >> +class SimpleTestMethods(MyTestCase): > >> + def test_find_ref(self): > >> + cm = libcam.CameraManager.singleton() > >> + wr_cm = weakref.ref(cm) > >> + > >> + cam = cm.find("platform/vimc") > > > > Standardizing on single-quotes here too would be nice. > > I can't stand them, but... Fine, it's probably best to follow the widely > used convention. I think there's a PEP that recommends single quotes. It took me a while to get used to it. 
> >> + self.assertIsNotNone(cam) > >> + wr_cam = weakref.ref(cam) > >> + > >> + cm = None > >> + gc.collect() > >> + self.assertIsNotNone(wr_cm()) > >> + > >> + cam = None > >> + gc.collect() > >> + self.assertIsNone(wr_cm()) > >> + self.assertIsNone(wr_cam()) > >> + > >> + def test_get_ref(self): > >> + cm = libcam.CameraManager.singleton() > >> + wr_cm = weakref.ref(cm) > >> + > >> + cam = cm.get("platform/vimc.0 Sensor B") > >> + self.assertTrue(cam is not None) > >> + wr_cam = weakref.ref(cam) > >> + > >> + cm = None > >> + gc.collect() > >> + self.assertIsNotNone(wr_cm()) > >> + > >> + cam = None > >> + gc.collect() > >> + self.assertIsNone(wr_cm()) > >> + self.assertIsNone(wr_cam()) > >> + > >> + def test_acquire_release(self): > >> + cm = libcam.CameraManager.singleton() > >> + cam = cm.get("platform/vimc.0 Sensor B") > >> + self.assertTrue(cam is not None) > >> + > >> + ret = cam.acquire() > >> + self.assertZero(ret) > >> + > >> + ret = cam.release() > >> + self.assertZero(ret) > >> + > >> + def test_double_acquire(self): > >> + cm = libcam.CameraManager.singleton() > >> + cam = cm.get("platform/vimc.0 Sensor B") > >> + self.assertTrue(cam is not None) > >> + > >> + ret = cam.acquire() > >> + self.assertZero(ret) > >> + > >> + libcam.logSetLevel("Camera", "FATAL") > >> + ret = cam.acquire() > >> + self.assertEqual(ret, -errno.EBUSY) > >> + libcam.logSetLevel("Camera", "ERROR") > >> + > >> + ret = cam.release() > >> + self.assertZero(ret) > >> + > >> + ret = cam.release() > >> + # I expected EBUSY, but looks like double release works fine > >> + self.assertZero(ret) > >> + > >> + > >> +class CameraTesterBase(MyTestCase): > >> + def setUp(self): > >> + self.cm = libcam.CameraManager.singleton() > >> + self.cam = self.cm.find("platform/vimc") > >> + if self.cam is None: > >> + self.cm = None > >> + raise Exception("No vimc found") > > > > Is it possible to skip the test in that case instead of failing ? 
This > > could be done on top, a todo comment somewhere in the file would be good > > then. > > Yes, I can do that. Although some tests test finding the vimc camera, so > those will still fail. > > >> + > >> + ret = self.cam.acquire() > >> + if ret != 0: > >> + self.cam = None > >> + self.cm = None > >> + raise Exception("Failed to acquire camera") > >> + > >> + def tearDown(self): > >> + # If a test fails, the camera may be in running state. So always stop. > >> + self.cam.stop() > >> + > >> + ret = self.cam.release() > >> + if ret != 0: > >> + raise Exception("Failed to release camera") > >> + > >> + self.cam = None > >> + self.cm = None > >> + > >> + > >> +class AllocatorTestMethods(CameraTesterBase): > >> + def test_allocator(self): > >> + cam = self.cam > >> + > >> + camconfig = cam.generateConfiguration([libcam.StreamRole.StillCapture]) > >> + self.assertTrue(camconfig.size == 1) > >> + wr_camconfig = weakref.ref(camconfig) > >> + > >> + streamconfig = camconfig.at(0) > >> + wr_streamconfig = weakref.ref(streamconfig) > >> + > >> + ret = cam.configure(camconfig) > >> + self.assertZero(ret) > >> + > >> + stream = streamconfig.stream > >> + wr_stream = weakref.ref(stream) > >> + > >> + # stream should keep streamconfig and camconfig alive > >> + streamconfig = None > >> + camconfig = None > >> + gc.collect() > >> + self.assertIsNotNone(wr_camconfig()) > >> + self.assertIsNotNone(wr_streamconfig()) > >> + > >> + allocator = libcam.FrameBufferAllocator(cam) > >> + ret = allocator.allocate(stream) > >> + self.assertTrue(ret > 0) > >> + wr_allocator = weakref.ref(allocator) > >> + > >> + buffers = allocator.buffers(stream) > >> + buffers = None > >> + > >> + buffer = allocator.buffers(stream)[0] > >> + self.assertIsNotNone(buffer) > >> + wr_buffer = weakref.ref(buffer) > >> + > >> + allocator = None > >> + gc.collect() > >> + self.assertIsNotNone(wr_buffer()) > >> + self.assertIsNotNone(wr_allocator()) > >> + self.assertIsNotNone(wr_stream()) > >> + > >> + buffer = 
None > >> + gc.collect() > >> + self.assertIsNone(wr_buffer()) > >> + self.assertIsNone(wr_allocator()) > >> + self.assertIsNotNone(wr_stream()) > >> + > >> + stream = None > >> + gc.collect() > >> + self.assertIsNone(wr_stream()) > >> + self.assertIsNone(wr_camconfig()) > >> + self.assertIsNone(wr_streamconfig()) > >> + > >> + > >> +class SimpleCaptureMethods(CameraTesterBase): > >> + def test_sleep(self): > >> + cm = self.cm > >> + cam = self.cam > >> + > >> + camconfig = cam.generateConfiguration([libcam.StreamRole.StillCapture]) > >> + self.assertTrue(camconfig.size == 1) > >> + > >> + streamconfig = camconfig.at(0) > >> + fmts = streamconfig.formats > >> + > >> + ret = cam.configure(camconfig) > >> + self.assertZero(ret) > >> + > >> + stream = streamconfig.stream > >> + > >> + allocator = libcam.FrameBufferAllocator(cam) > >> + ret = allocator.allocate(stream) > >> + self.assertTrue(ret > 0) > >> + > >> + num_bufs = len(allocator.buffers(stream)) > >> + > >> + reqs = [] > >> + for i in range(num_bufs): > >> + req = cam.createRequest(i) > >> + self.assertIsNotNone(req) > >> + > >> + buffer = allocator.buffers(stream)[i] > >> + ret = req.addBuffer(stream, buffer) > >> + self.assertZero(ret) > >> + > >> + reqs.append(req) > >> + > >> + buffer = None > >> + > >> + ret = cam.start() > >> + self.assertZero(ret) > >> + > >> + for req in reqs: > >> + ret = cam.queueRequest(req) > >> + self.assertZero(ret) > >> + > >> + reqs = None > >> + gc.collect() > >> + > >> + time.sleep(0.5) > >> + > >> + reqs = cm.getReadyRequests() > >> + > >> + self.assertTrue(len(reqs) == num_bufs) > >> + > >> + for i, req in enumerate(reqs): > >> + self.assertTrue(i == req.cookie) > >> + > >> + reqs = None > >> + gc.collect() > >> + > >> + ret = cam.stop() > >> + self.assertZero(ret) > >> + > >> + def test_select(self): > >> + cm = self.cm > >> + cam = self.cam > >> + > >> + camconfig = cam.generateConfiguration([libcam.StreamRole.StillCapture]) > >> + self.assertTrue(camconfig.size == 1) > 
>> + > >> + streamconfig = camconfig.at(0) > >> + fmts = streamconfig.formats > >> + > >> + ret = cam.configure(camconfig) > >> + self.assertZero(ret) > >> + > >> + stream = streamconfig.stream > >> + > >> + allocator = libcam.FrameBufferAllocator(cam) > >> + ret = allocator.allocate(stream) > >> + self.assertTrue(ret > 0) > >> + > >> + num_bufs = len(allocator.buffers(stream)) > >> + > >> + reqs = [] > >> + for i in range(num_bufs): > >> + req = cam.createRequest(i) > >> + self.assertIsNotNone(req) > >> + > >> + buffer = allocator.buffers(stream)[i] > >> + ret = req.addBuffer(stream, buffer) > >> + self.assertZero(ret) > >> + > >> + reqs.append(req) > >> + > >> + buffer = None > >> + > >> + ret = cam.start() > >> + self.assertZero(ret) > >> + > >> + for req in reqs: > >> + ret = cam.queueRequest(req) > >> + self.assertZero(ret) > >> + > >> + reqs = None > >> + gc.collect() > >> + > >> + sel = selectors.DefaultSelector() > >> + sel.register(cm.efd, selectors.EVENT_READ, 123) > > > > Is the data argument needed ? > > No, it's not. > > >> + > >> + reqs = [] > >> + > >> + running = True > >> + while running: > >> + events = sel.select() > >> + for key, mask in events: > >> + os.read(key.fileobj, 8) > >> + > >> + ready_reqs = cm.getReadyRequests() > >> + > >> + self.assertTrue(len(ready_reqs) > 0) > >> + > >> + reqs += ready_reqs > >> + > >> + if len(reqs) == num_bufs: > >> + running = False > >> + > >> + self.assertTrue(len(reqs) == num_bufs) > >> + > >> + for i, req in enumerate(reqs): > >> + self.assertTrue(i == req.cookie) > >> + > >> + reqs = None > >> + gc.collect() > >> + > >> + ret = cam.stop() > >> + self.assertZero(ret) > >> + > >> + > >> +# Recursively expand slist's objects into olist, using seen to track already > >> +# processed objects. 
> >> +def _getr(slist, olist, seen): > >> + for e in slist: > >> + if id(e) in seen: > >> + continue > >> + seen.add(id(e)) > >> + olist.append(e) > >> + tl = gc.get_referents(e) > >> + if tl: > >> + _getr(tl, olist, seen) > >> + > >> + > >> +def get_all_objects(ignored=[]): > >> + gcl = gc.get_objects() > >> + olist = [] > >> + seen = set() > >> + > >> + seen.add(id(gcl)) > >> + seen.add(id(olist)) > >> + seen.add(id(seen)) > >> + seen.update(set([id(o) for o in ignored])) > >> + > >> + _getr(gcl, olist, seen) > >> + > >> + return olist > >> + > >> + > >> +def create_type_count_map(olist): > >> + map = defaultdict(int) > >> + for o in olist: > >> + map[type(o)] += 1 > >> + return map > >> + > >> + > >> +def diff_type_count_maps(before, after): > >> + return [(k, after[k] - before[k]) for k in after if after[k] != before[k]] > >> + > >> + > >> +if __name__ == '__main__': > >> + # doesn't work very well, as things always leak a bit > > > > Lovely :-) Is it something we can fix ? > > I have no idea if this method is a valid way to observe object leaks, > so... no clue. I think it can still be used, but manually, comparing > object lists when doing some changes to the tests. We can figure it out later.
diff --git a/test/meson.build b/test/meson.build index fd4c5ca0..623f3baa 100644 --- a/test/meson.build +++ b/test/meson.build @@ -18,6 +18,7 @@ subdir('log') subdir('media_device') subdir('pipeline') subdir('process') +subdir('py') subdir('serialization') subdir('stream') subdir('v4l2_compat') diff --git a/test/py/meson.build b/test/py/meson.build new file mode 100644 index 00000000..f6b42bd0 --- /dev/null +++ b/test/py/meson.build @@ -0,0 +1,17 @@ +# SPDX-License-Identifier: CC0-1.0 + +if not pycamera_enabled + subdir_done() +endif + +pymod = import('python') +py3 = pymod.find_installation('python3') + +pypathdir = meson.project_build_root() / 'src/py' + +test('pyunittests', + py3, + args : files('unittests.py'), + env : ['PYTHONPATH=' + pypathdir], + suite : 'pybindings', + is_parallel : false) diff --git a/test/py/unittests.py b/test/py/unittests.py new file mode 100755 index 00000000..15d5b4a7 --- /dev/null +++ b/test/py/unittests.py @@ -0,0 +1,368 @@ +#!/usr/bin/env python3 + +# SPDX-License-Identifier: GPL-2.0-or-later +# Copyright (C) 2021, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com> + +from collections import defaultdict +import errno +import gc +import libcamera as libcam +import os +import selectors +import time +import unittest +import weakref + + +class MyTestCase(unittest.TestCase): + def assertZero(self, a, msg=None): + self.assertEqual(a, 0, msg) + + +class SimpleTestMethods(MyTestCase): + def test_find_ref(self): + cm = libcam.CameraManager.singleton() + wr_cm = weakref.ref(cm) + + cam = cm.find("platform/vimc") + self.assertIsNotNone(cam) + wr_cam = weakref.ref(cam) + + cm = None + gc.collect() + self.assertIsNotNone(wr_cm()) + + cam = None + gc.collect() + self.assertIsNone(wr_cm()) + self.assertIsNone(wr_cam()) + + def test_get_ref(self): + cm = libcam.CameraManager.singleton() + wr_cm = weakref.ref(cm) + + cam = cm.get("platform/vimc.0 Sensor B") + self.assertTrue(cam is not None) + wr_cam = weakref.ref(cam) + + cm = None + gc.collect() + 
self.assertIsNotNone(wr_cm()) + + cam = None + gc.collect() + self.assertIsNone(wr_cm()) + self.assertIsNone(wr_cam()) + + def test_acquire_release(self): + cm = libcam.CameraManager.singleton() + cam = cm.get("platform/vimc.0 Sensor B") + self.assertTrue(cam is not None) + + ret = cam.acquire() + self.assertZero(ret) + + ret = cam.release() + self.assertZero(ret) + + def test_double_acquire(self): + cm = libcam.CameraManager.singleton() + cam = cm.get("platform/vimc.0 Sensor B") + self.assertTrue(cam is not None) + + ret = cam.acquire() + self.assertZero(ret) + + libcam.logSetLevel("Camera", "FATAL") + ret = cam.acquire() + self.assertEqual(ret, -errno.EBUSY) + libcam.logSetLevel("Camera", "ERROR") + + ret = cam.release() + self.assertZero(ret) + + ret = cam.release() + # I expected EBUSY, but looks like double release works fine + self.assertZero(ret) + + +class CameraTesterBase(MyTestCase): + def setUp(self): + self.cm = libcam.CameraManager.singleton() + self.cam = self.cm.find("platform/vimc") + if self.cam is None: + self.cm = None + raise Exception("No vimc found") + + ret = self.cam.acquire() + if ret != 0: + self.cam = None + self.cm = None + raise Exception("Failed to acquire camera") + + def tearDown(self): + # If a test fails, the camera may be in running state. So always stop. 
+ self.cam.stop() + + ret = self.cam.release() + if ret != 0: + raise Exception("Failed to release camera") + + self.cam = None + self.cm = None + + +class AllocatorTestMethods(CameraTesterBase): + def test_allocator(self): + cam = self.cam + + camconfig = cam.generateConfiguration([libcam.StreamRole.StillCapture]) + self.assertTrue(camconfig.size == 1) + wr_camconfig = weakref.ref(camconfig) + + streamconfig = camconfig.at(0) + wr_streamconfig = weakref.ref(streamconfig) + + ret = cam.configure(camconfig) + self.assertZero(ret) + + stream = streamconfig.stream + wr_stream = weakref.ref(stream) + + # stream should keep streamconfig and camconfig alive + streamconfig = None + camconfig = None + gc.collect() + self.assertIsNotNone(wr_camconfig()) + self.assertIsNotNone(wr_streamconfig()) + + allocator = libcam.FrameBufferAllocator(cam) + ret = allocator.allocate(stream) + self.assertTrue(ret > 0) + wr_allocator = weakref.ref(allocator) + + buffers = allocator.buffers(stream) + buffers = None + + buffer = allocator.buffers(stream)[0] + self.assertIsNotNone(buffer) + wr_buffer = weakref.ref(buffer) + + allocator = None + gc.collect() + self.assertIsNotNone(wr_buffer()) + self.assertIsNotNone(wr_allocator()) + self.assertIsNotNone(wr_stream()) + + buffer = None + gc.collect() + self.assertIsNone(wr_buffer()) + self.assertIsNone(wr_allocator()) + self.assertIsNotNone(wr_stream()) + + stream = None + gc.collect() + self.assertIsNone(wr_stream()) + self.assertIsNone(wr_camconfig()) + self.assertIsNone(wr_streamconfig()) + + +class SimpleCaptureMethods(CameraTesterBase): + def test_sleep(self): + cm = self.cm + cam = self.cam + + camconfig = cam.generateConfiguration([libcam.StreamRole.StillCapture]) + self.assertTrue(camconfig.size == 1) + + streamconfig = camconfig.at(0) + fmts = streamconfig.formats + + ret = cam.configure(camconfig) + self.assertZero(ret) + + stream = streamconfig.stream + + allocator = libcam.FrameBufferAllocator(cam) + ret = allocator.allocate(stream) 
+ self.assertTrue(ret > 0) + + num_bufs = len(allocator.buffers(stream)) + + reqs = [] + for i in range(num_bufs): + req = cam.createRequest(i) + self.assertIsNotNone(req) + + buffer = allocator.buffers(stream)[i] + ret = req.addBuffer(stream, buffer) + self.assertZero(ret) + + reqs.append(req) + + buffer = None + + ret = cam.start() + self.assertZero(ret) + + for req in reqs: + ret = cam.queueRequest(req) + self.assertZero(ret) + + reqs = None + gc.collect() + + time.sleep(0.5) + + reqs = cm.getReadyRequests() + + self.assertTrue(len(reqs) == num_bufs) + + for i, req in enumerate(reqs): + self.assertTrue(i == req.cookie) + + reqs = None + gc.collect() + + ret = cam.stop() + self.assertZero(ret) + + def test_select(self): + cm = self.cm + cam = self.cam + + camconfig = cam.generateConfiguration([libcam.StreamRole.StillCapture]) + self.assertTrue(camconfig.size == 1) + + streamconfig = camconfig.at(0) + fmts = streamconfig.formats + + ret = cam.configure(camconfig) + self.assertZero(ret) + + stream = streamconfig.stream + + allocator = libcam.FrameBufferAllocator(cam) + ret = allocator.allocate(stream) + self.assertTrue(ret > 0) + + num_bufs = len(allocator.buffers(stream)) + + reqs = [] + for i in range(num_bufs): + req = cam.createRequest(i) + self.assertIsNotNone(req) + + buffer = allocator.buffers(stream)[i] + ret = req.addBuffer(stream, buffer) + self.assertZero(ret) + + reqs.append(req) + + buffer = None + + ret = cam.start() + self.assertZero(ret) + + for req in reqs: + ret = cam.queueRequest(req) + self.assertZero(ret) + + reqs = None + gc.collect() + + sel = selectors.DefaultSelector() + sel.register(cm.efd, selectors.EVENT_READ, 123) + + reqs = [] + + running = True + while running: + events = sel.select() + for key, mask in events: + os.read(key.fileobj, 8) + + ready_reqs = cm.getReadyRequests() + + self.assertTrue(len(ready_reqs) > 0) + + reqs += ready_reqs + + if len(reqs) == num_bufs: + running = False + + self.assertTrue(len(reqs) == num_bufs) + + for 
i, req in enumerate(reqs): + self.assertTrue(i == req.cookie) + + reqs = None + gc.collect() + + ret = cam.stop() + self.assertZero(ret) + + +# Recursively expand slist's objects into olist, using seen to track already +# processed objects. +def _getr(slist, olist, seen): + for e in slist: + if id(e) in seen: + continue + seen.add(id(e)) + olist.append(e) + tl = gc.get_referents(e) + if tl: + _getr(tl, olist, seen) + + +def get_all_objects(ignored=[]): + gcl = gc.get_objects() + olist = [] + seen = set() + + seen.add(id(gcl)) + seen.add(id(olist)) + seen.add(id(seen)) + seen.update(set([id(o) for o in ignored])) + + _getr(gcl, olist, seen) + + return olist + + +def create_type_count_map(olist): + map = defaultdict(int) + for o in olist: + map[type(o)] += 1 + return map + + +def diff_type_count_maps(before, after): + return [(k, after[k] - before[k]) for k in after if after[k] != before[k]] + + +if __name__ == '__main__': + # doesn't work very well, as things always leak a bit + test_leaks = False + + if test_leaks: + gc.unfreeze() + gc.collect() + + obs_before = get_all_objects() + + unittest.main(exit=False) + + if test_leaks: + gc.unfreeze() + gc.collect() + + obs_after = get_all_objects([obs_before]) + + before = create_type_count_map(obs_before) + after = create_type_count_map(obs_after) + + leaks = diff_type_count_maps(before, after) + if len(leaks) > 0: + print(leaks)
Add a simple unittests.py as a base for python unittests.

Signed-off-by: Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
---
 test/meson.build     |   1 +
 test/py/meson.build  |  17 ++
 test/py/unittests.py | 368 +++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 386 insertions(+)
 create mode 100644 test/py/meson.build
 create mode 100755 test/py/unittests.py