[libcamera-devel,v7,06/13] py: add unittests.py
diff mbox series

Message ID 20220505104104.70841-7-tomi.valkeinen@ideasonboard.com
State Superseded
Headers show
Series
  • Python bindings
Related show

Commit Message

Tomi Valkeinen May 5, 2022, 10:40 a.m. UTC
Add a simple unittests.py as a base for python unittests.

Signed-off-by: Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
---
 test/meson.build     |   1 +
 test/py/meson.build  |  17 ++
 test/py/unittests.py | 368 +++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 386 insertions(+)
 create mode 100644 test/py/meson.build
 create mode 100755 test/py/unittests.py

Comments

Laurent Pinchart May 5, 2022, 5:52 p.m. UTC | #1
Hi Tomi,

Thank you for the patch.

On Thu, May 05, 2022 at 01:40:57PM +0300, Tomi Valkeinen wrote:
> Add a simple unittests.py as a base for python unittests.
> 
> Signed-off-by: Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
> ---
>  test/meson.build     |   1 +
>  test/py/meson.build  |  17 ++
>  test/py/unittests.py | 368 +++++++++++++++++++++++++++++++++++++++++++
>  3 files changed, 386 insertions(+)
>  create mode 100644 test/py/meson.build
>  create mode 100755 test/py/unittests.py
> 
> diff --git a/test/meson.build b/test/meson.build
> index fd4c5ca0..623f3baa 100644
> --- a/test/meson.build
> +++ b/test/meson.build
> @@ -18,6 +18,7 @@ subdir('log')
>  subdir('media_device')
>  subdir('pipeline')
>  subdir('process')
> +subdir('py')
>  subdir('serialization')
>  subdir('stream')
>  subdir('v4l2_compat')
> diff --git a/test/py/meson.build b/test/py/meson.build
> new file mode 100644
> index 00000000..f6b42bd0
> --- /dev/null
> +++ b/test/py/meson.build
> @@ -0,0 +1,17 @@
> +# SPDX-License-Identifier: CC0-1.0
> +
> +if not pycamera_enabled
> +    subdir_done()
> +endif
> +
> +pymod = import('python')
> +py3 = pymod.find_installation('python3')
> +
> +pypathdir = meson.project_build_root() / 'src/py'

pypathdir = meson.project_build_root() / 'src' / 'py'

> +
> +test('pyunittests',
> +     py3,
> +     args : files('unittests.py'),
> +     env : ['PYTHONPATH=' + pypathdir],
> +     suite : 'pybindings',
> +     is_parallel : false)
> diff --git a/test/py/unittests.py b/test/py/unittests.py
> new file mode 100755
> index 00000000..15d5b4a7
> --- /dev/null
> +++ b/test/py/unittests.py
> @@ -0,0 +1,368 @@
> +#!/usr/bin/env python3
> +
> +# SPDX-License-Identifier: GPL-2.0-or-later
> +# Copyright (C) 2021, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
> +
> +from collections import defaultdict
> +import errno
> +import gc
> +import libcamera as libcam

I'm tempted to claim the "cam" name in all our Python code, or possibly
"camera". What do you think ?

> +import os
> +import selectors
> +import time
> +import unittest
> +import weakref
> +
> +
> +class MyTestCase(unittest.TestCase):

s/MyTestCase/BaseTestCase/

It's not yours only anymore :-)

> +    def assertZero(self, a, msg=None):
> +        self.assertEqual(a, 0, msg)
> +
> +
> +class SimpleTestMethods(MyTestCase):
> +    def test_find_ref(self):
> +        cm = libcam.CameraManager.singleton()
> +        wr_cm = weakref.ref(cm)
> +
> +        cam = cm.find("platform/vimc")

Standardizing on single-quotes here too would be nice.

> +        self.assertIsNotNone(cam)
> +        wr_cam = weakref.ref(cam)
> +
> +        cm = None
> +        gc.collect()
> +        self.assertIsNotNone(wr_cm())
> +
> +        cam = None
> +        gc.collect()
> +        self.assertIsNone(wr_cm())
> +        self.assertIsNone(wr_cam())
> +
> +    def test_get_ref(self):
> +        cm = libcam.CameraManager.singleton()
> +        wr_cm = weakref.ref(cm)
> +
> +        cam = cm.get("platform/vimc.0 Sensor B")
> +        self.assertTrue(cam is not None)
> +        wr_cam = weakref.ref(cam)
> +
> +        cm = None
> +        gc.collect()
> +        self.assertIsNotNone(wr_cm())
> +
> +        cam = None
> +        gc.collect()
> +        self.assertIsNone(wr_cm())
> +        self.assertIsNone(wr_cam())
> +
> +    def test_acquire_release(self):
> +        cm = libcam.CameraManager.singleton()
> +        cam = cm.get("platform/vimc.0 Sensor B")
> +        self.assertTrue(cam is not None)
> +
> +        ret = cam.acquire()
> +        self.assertZero(ret)
> +
> +        ret = cam.release()
> +        self.assertZero(ret)
> +
> +    def test_double_acquire(self):
> +        cm = libcam.CameraManager.singleton()
> +        cam = cm.get("platform/vimc.0 Sensor B")
> +        self.assertTrue(cam is not None)
> +
> +        ret = cam.acquire()
> +        self.assertZero(ret)
> +
> +        libcam.logSetLevel("Camera", "FATAL")
> +        ret = cam.acquire()
> +        self.assertEqual(ret, -errno.EBUSY)
> +        libcam.logSetLevel("Camera", "ERROR")
> +
> +        ret = cam.release()
> +        self.assertZero(ret)
> +
> +        ret = cam.release()
> +        # I expected EBUSY, but looks like double release works fine
> +        self.assertZero(ret)
> +
> +
> +class CameraTesterBase(MyTestCase):
> +    def setUp(self):
> +        self.cm = libcam.CameraManager.singleton()
> +        self.cam = self.cm.find("platform/vimc")
> +        if self.cam is None:
> +            self.cm = None
> +            raise Exception("No vimc found")

Is it possible to skip the test in that case instead of failing ? This
could be done on top, a todo comment somewhere in the file would be good
then.

> +
> +        ret = self.cam.acquire()
> +        if ret != 0:
> +            self.cam = None
> +            self.cm = None
> +            raise Exception("Failed to acquire camera")
> +
> +    def tearDown(self):
> +        # If a test fails, the camera may be in running state. So always stop.
> +        self.cam.stop()
> +
> +        ret = self.cam.release()
> +        if ret != 0:
> +            raise Exception("Failed to release camera")
> +
> +        self.cam = None
> +        self.cm = None
> +
> +
> +class AllocatorTestMethods(CameraTesterBase):
> +    def test_allocator(self):
> +        cam = self.cam
> +
> +        camconfig = cam.generateConfiguration([libcam.StreamRole.StillCapture])
> +        self.assertTrue(camconfig.size == 1)
> +        wr_camconfig = weakref.ref(camconfig)
> +
> +        streamconfig = camconfig.at(0)
> +        wr_streamconfig = weakref.ref(streamconfig)
> +
> +        ret = cam.configure(camconfig)
> +        self.assertZero(ret)
> +
> +        stream = streamconfig.stream
> +        wr_stream = weakref.ref(stream)
> +
> +        # stream should keep streamconfig and camconfig alive
> +        streamconfig = None
> +        camconfig = None
> +        gc.collect()
> +        self.assertIsNotNone(wr_camconfig())
> +        self.assertIsNotNone(wr_streamconfig())
> +
> +        allocator = libcam.FrameBufferAllocator(cam)
> +        ret = allocator.allocate(stream)
> +        self.assertTrue(ret > 0)
> +        wr_allocator = weakref.ref(allocator)
> +
> +        buffers = allocator.buffers(stream)
> +        buffers = None
> +
> +        buffer = allocator.buffers(stream)[0]
> +        self.assertIsNotNone(buffer)
> +        wr_buffer = weakref.ref(buffer)
> +
> +        allocator = None
> +        gc.collect()
> +        self.assertIsNotNone(wr_buffer())
> +        self.assertIsNotNone(wr_allocator())
> +        self.assertIsNotNone(wr_stream())
> +
> +        buffer = None
> +        gc.collect()
> +        self.assertIsNone(wr_buffer())
> +        self.assertIsNone(wr_allocator())
> +        self.assertIsNotNone(wr_stream())
> +
> +        stream = None
> +        gc.collect()
> +        self.assertIsNone(wr_stream())
> +        self.assertIsNone(wr_camconfig())
> +        self.assertIsNone(wr_streamconfig())
> +
> +
> +class SimpleCaptureMethods(CameraTesterBase):
> +    def test_sleep(self):
> +        cm = self.cm
> +        cam = self.cam
> +
> +        camconfig = cam.generateConfiguration([libcam.StreamRole.StillCapture])
> +        self.assertTrue(camconfig.size == 1)
> +
> +        streamconfig = camconfig.at(0)
> +        fmts = streamconfig.formats
> +
> +        ret = cam.configure(camconfig)
> +        self.assertZero(ret)
> +
> +        stream = streamconfig.stream
> +
> +        allocator = libcam.FrameBufferAllocator(cam)
> +        ret = allocator.allocate(stream)
> +        self.assertTrue(ret > 0)
> +
> +        num_bufs = len(allocator.buffers(stream))
> +
> +        reqs = []
> +        for i in range(num_bufs):
> +            req = cam.createRequest(i)
> +            self.assertIsNotNone(req)
> +
> +            buffer = allocator.buffers(stream)[i]
> +            ret = req.addBuffer(stream, buffer)
> +            self.assertZero(ret)
> +
> +            reqs.append(req)
> +
> +        buffer = None
> +
> +        ret = cam.start()
> +        self.assertZero(ret)
> +
> +        for req in reqs:
> +            ret = cam.queueRequest(req)
> +            self.assertZero(ret)
> +
> +        reqs = None
> +        gc.collect()
> +
> +        time.sleep(0.5)
> +
> +        reqs = cm.getReadyRequests()
> +
> +        self.assertTrue(len(reqs) == num_bufs)
> +
> +        for i, req in enumerate(reqs):
> +            self.assertTrue(i == req.cookie)
> +
> +        reqs = None
> +        gc.collect()
> +
> +        ret = cam.stop()
> +        self.assertZero(ret)
> +
> +    def test_select(self):
> +        cm = self.cm
> +        cam = self.cam
> +
> +        camconfig = cam.generateConfiguration([libcam.StreamRole.StillCapture])
> +        self.assertTrue(camconfig.size == 1)
> +
> +        streamconfig = camconfig.at(0)
> +        fmts = streamconfig.formats
> +
> +        ret = cam.configure(camconfig)
> +        self.assertZero(ret)
> +
> +        stream = streamconfig.stream
> +
> +        allocator = libcam.FrameBufferAllocator(cam)
> +        ret = allocator.allocate(stream)
> +        self.assertTrue(ret > 0)
> +
> +        num_bufs = len(allocator.buffers(stream))
> +
> +        reqs = []
> +        for i in range(num_bufs):
> +            req = cam.createRequest(i)
> +            self.assertIsNotNone(req)
> +
> +            buffer = allocator.buffers(stream)[i]
> +            ret = req.addBuffer(stream, buffer)
> +            self.assertZero(ret)
> +
> +            reqs.append(req)
> +
> +        buffer = None
> +
> +        ret = cam.start()
> +        self.assertZero(ret)
> +
> +        for req in reqs:
> +            ret = cam.queueRequest(req)
> +            self.assertZero(ret)
> +
> +        reqs = None
> +        gc.collect()
> +
> +        sel = selectors.DefaultSelector()
> +        sel.register(cm.efd, selectors.EVENT_READ, 123)

Is the data argument needed ?

> +
> +        reqs = []
> +
> +        running = True
> +        while running:
> +            events = sel.select()
> +            for key, mask in events:
> +                os.read(key.fileobj, 8)
> +
> +                ready_reqs = cm.getReadyRequests()
> +
> +                self.assertTrue(len(ready_reqs) > 0)
> +
> +                reqs += ready_reqs
> +
> +                if len(reqs) == num_bufs:
> +                    running = False
> +
> +        self.assertTrue(len(reqs) == num_bufs)
> +
> +        for i, req in enumerate(reqs):
> +            self.assertTrue(i == req.cookie)
> +
> +        reqs = None
> +        gc.collect()
> +
> +        ret = cam.stop()
> +        self.assertZero(ret)
> +
> +
> +# Recursively expand slist's objects into olist, using seen to track already
> +# processed objects.
> +def _getr(slist, olist, seen):
> +    for e in slist:
> +        if id(e) in seen:
> +            continue
> +        seen.add(id(e))
> +        olist.append(e)
> +        tl = gc.get_referents(e)
> +        if tl:
> +            _getr(tl, olist, seen)
> +
> +
> +def get_all_objects(ignored=[]):
> +    gcl = gc.get_objects()
> +    olist = []
> +    seen = set()
> +
> +    seen.add(id(gcl))
> +    seen.add(id(olist))
> +    seen.add(id(seen))
> +    seen.update(set([id(o) for o in ignored]))
> +
> +    _getr(gcl, olist, seen)
> +
> +    return olist
> +
> +
> +def create_type_count_map(olist):
> +    map = defaultdict(int)
> +    for o in olist:
> +        map[type(o)] += 1
> +    return map
> +
> +
> +def diff_type_count_maps(before, after):
> +    return [(k, after[k] - before[k]) for k in after if after[k] != before[k]]
> +
> +
> +if __name__ == '__main__':
> +    # doesn't work very well, as things always leak a bit

Lovely :-) Is it something we can fix ?

s/doesn't/Doesn't/

Reviewed-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>

> +    test_leaks = False
> +
> +    if test_leaks:
> +        gc.unfreeze()
> +        gc.collect()
> +
> +        obs_before = get_all_objects()
> +
> +    unittest.main(exit=False)
> +
> +    if test_leaks:
> +        gc.unfreeze()
> +        gc.collect()
> +
> +        obs_after = get_all_objects([obs_before])
> +
> +        before = create_type_count_map(obs_before)
> +        after = create_type_count_map(obs_after)
> +
> +        leaks = diff_type_count_maps(before, after)
> +        if len(leaks) > 0:
> +            print(leaks)
Tomi Valkeinen May 6, 2022, 10:59 a.m. UTC | #2
On 05/05/2022 20:52, Laurent Pinchart wrote:
> Hi Tomi,
> 
> Thank you for the patch.
> 
> On Thu, May 05, 2022 at 01:40:57PM +0300, Tomi Valkeinen wrote:
>> Add a simple unittests.py as a base for python unittests.
>>
>> Signed-off-by: Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
>> ---
>>   test/meson.build     |   1 +
>>   test/py/meson.build  |  17 ++
>>   test/py/unittests.py | 368 +++++++++++++++++++++++++++++++++++++++++++
>>   3 files changed, 386 insertions(+)
>>   create mode 100644 test/py/meson.build
>>   create mode 100755 test/py/unittests.py
>>
>> diff --git a/test/meson.build b/test/meson.build
>> index fd4c5ca0..623f3baa 100644
>> --- a/test/meson.build
>> +++ b/test/meson.build
>> @@ -18,6 +18,7 @@ subdir('log')
>>   subdir('media_device')
>>   subdir('pipeline')
>>   subdir('process')
>> +subdir('py')
>>   subdir('serialization')
>>   subdir('stream')
>>   subdir('v4l2_compat')
>> diff --git a/test/py/meson.build b/test/py/meson.build
>> new file mode 100644
>> index 00000000..f6b42bd0
>> --- /dev/null
>> +++ b/test/py/meson.build
>> @@ -0,0 +1,17 @@
>> +# SPDX-License-Identifier: CC0-1.0
>> +
>> +if not pycamera_enabled
>> +    subdir_done()
>> +endif
>> +
>> +pymod = import('python')
>> +py3 = pymod.find_installation('python3')
>> +
>> +pypathdir = meson.project_build_root() / 'src/py'
> 
> pypathdir = meson.project_build_root() / 'src' / 'py'
> 
>> +
>> +test('pyunittests',
>> +     py3,
>> +     args : files('unittests.py'),
>> +     env : ['PYTHONPATH=' + pypathdir],
>> +     suite : 'pybindings',
>> +     is_parallel : false)
>> diff --git a/test/py/unittests.py b/test/py/unittests.py
>> new file mode 100755
>> index 00000000..15d5b4a7
>> --- /dev/null
>> +++ b/test/py/unittests.py
>> @@ -0,0 +1,368 @@
>> +#!/usr/bin/env python3
>> +
>> +# SPDX-License-Identifier: GPL-2.0-or-later
>> +# Copyright (C) 2021, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
>> +
>> +from collections import defaultdict
>> +import errno
>> +import gc
>> +import libcamera as libcam
> 
> I'm tempted to claim the "cam" name in all our Python code, or possibly
> "camera". What do you think ?

I think 'cam' and 'camera' are names for variables that contain a camera.

>> +import os
>> +import selectors
>> +import time
>> +import unittest
>> +import weakref
>> +
>> +
>> +class MyTestCase(unittest.TestCase):
> 
> s/MyTestCase/BaseTestCase/
> 
> It's not yours only anymore :-)

My precious...

>> +    def assertZero(self, a, msg=None):
>> +        self.assertEqual(a, 0, msg)
>> +
>> +
>> +class SimpleTestMethods(MyTestCase):
>> +    def test_find_ref(self):
>> +        cm = libcam.CameraManager.singleton()
>> +        wr_cm = weakref.ref(cm)
>> +
>> +        cam = cm.find("platform/vimc")
> 
> Standardizing on single-quotes here too would be nice.

I can't stand them, but... Fine, it's probably best to follow the widely 
used convention.

>> +        self.assertIsNotNone(cam)
>> +        wr_cam = weakref.ref(cam)
>> +
>> +        cm = None
>> +        gc.collect()
>> +        self.assertIsNotNone(wr_cm())
>> +
>> +        cam = None
>> +        gc.collect()
>> +        self.assertIsNone(wr_cm())
>> +        self.assertIsNone(wr_cam())
>> +
>> +    def test_get_ref(self):
>> +        cm = libcam.CameraManager.singleton()
>> +        wr_cm = weakref.ref(cm)
>> +
>> +        cam = cm.get("platform/vimc.0 Sensor B")
>> +        self.assertTrue(cam is not None)
>> +        wr_cam = weakref.ref(cam)
>> +
>> +        cm = None
>> +        gc.collect()
>> +        self.assertIsNotNone(wr_cm())
>> +
>> +        cam = None
>> +        gc.collect()
>> +        self.assertIsNone(wr_cm())
>> +        self.assertIsNone(wr_cam())
>> +
>> +    def test_acquire_release(self):
>> +        cm = libcam.CameraManager.singleton()
>> +        cam = cm.get("platform/vimc.0 Sensor B")
>> +        self.assertTrue(cam is not None)
>> +
>> +        ret = cam.acquire()
>> +        self.assertZero(ret)
>> +
>> +        ret = cam.release()
>> +        self.assertZero(ret)
>> +
>> +    def test_double_acquire(self):
>> +        cm = libcam.CameraManager.singleton()
>> +        cam = cm.get("platform/vimc.0 Sensor B")
>> +        self.assertTrue(cam is not None)
>> +
>> +        ret = cam.acquire()
>> +        self.assertZero(ret)
>> +
>> +        libcam.logSetLevel("Camera", "FATAL")
>> +        ret = cam.acquire()
>> +        self.assertEqual(ret, -errno.EBUSY)
>> +        libcam.logSetLevel("Camera", "ERROR")
>> +
>> +        ret = cam.release()
>> +        self.assertZero(ret)
>> +
>> +        ret = cam.release()
>> +        # I expected EBUSY, but looks like double release works fine
>> +        self.assertZero(ret)
>> +
>> +
>> +class CameraTesterBase(MyTestCase):
>> +    def setUp(self):
>> +        self.cm = libcam.CameraManager.singleton()
>> +        self.cam = self.cm.find("platform/vimc")
>> +        if self.cam is None:
>> +            self.cm = None
>> +            raise Exception("No vimc found")
> 
> Is it possible to skip the test in that case instead of failing ? This
> could be done on top, a todo comment somewhere in the file would be good
> then.

Yes, I can do that. Although some tests test finding the vimc camera, so 
those will still fail.

>> +
>> +        ret = self.cam.acquire()
>> +        if ret != 0:
>> +            self.cam = None
>> +            self.cm = None
>> +            raise Exception("Failed to acquire camera")
>> +
>> +    def tearDown(self):
>> +        # If a test fails, the camera may be in running state. So always stop.
>> +        self.cam.stop()
>> +
>> +        ret = self.cam.release()
>> +        if ret != 0:
>> +            raise Exception("Failed to release camera")
>> +
>> +        self.cam = None
>> +        self.cm = None
>> +
>> +
>> +class AllocatorTestMethods(CameraTesterBase):
>> +    def test_allocator(self):
>> +        cam = self.cam
>> +
>> +        camconfig = cam.generateConfiguration([libcam.StreamRole.StillCapture])
>> +        self.assertTrue(camconfig.size == 1)
>> +        wr_camconfig = weakref.ref(camconfig)
>> +
>> +        streamconfig = camconfig.at(0)
>> +        wr_streamconfig = weakref.ref(streamconfig)
>> +
>> +        ret = cam.configure(camconfig)
>> +        self.assertZero(ret)
>> +
>> +        stream = streamconfig.stream
>> +        wr_stream = weakref.ref(stream)
>> +
>> +        # stream should keep streamconfig and camconfig alive
>> +        streamconfig = None
>> +        camconfig = None
>> +        gc.collect()
>> +        self.assertIsNotNone(wr_camconfig())
>> +        self.assertIsNotNone(wr_streamconfig())
>> +
>> +        allocator = libcam.FrameBufferAllocator(cam)
>> +        ret = allocator.allocate(stream)
>> +        self.assertTrue(ret > 0)
>> +        wr_allocator = weakref.ref(allocator)
>> +
>> +        buffers = allocator.buffers(stream)
>> +        buffers = None
>> +
>> +        buffer = allocator.buffers(stream)[0]
>> +        self.assertIsNotNone(buffer)
>> +        wr_buffer = weakref.ref(buffer)
>> +
>> +        allocator = None
>> +        gc.collect()
>> +        self.assertIsNotNone(wr_buffer())
>> +        self.assertIsNotNone(wr_allocator())
>> +        self.assertIsNotNone(wr_stream())
>> +
>> +        buffer = None
>> +        gc.collect()
>> +        self.assertIsNone(wr_buffer())
>> +        self.assertIsNone(wr_allocator())
>> +        self.assertIsNotNone(wr_stream())
>> +
>> +        stream = None
>> +        gc.collect()
>> +        self.assertIsNone(wr_stream())
>> +        self.assertIsNone(wr_camconfig())
>> +        self.assertIsNone(wr_streamconfig())
>> +
>> +
>> +class SimpleCaptureMethods(CameraTesterBase):
>> +    def test_sleep(self):
>> +        cm = self.cm
>> +        cam = self.cam
>> +
>> +        camconfig = cam.generateConfiguration([libcam.StreamRole.StillCapture])
>> +        self.assertTrue(camconfig.size == 1)
>> +
>> +        streamconfig = camconfig.at(0)
>> +        fmts = streamconfig.formats
>> +
>> +        ret = cam.configure(camconfig)
>> +        self.assertZero(ret)
>> +
>> +        stream = streamconfig.stream
>> +
>> +        allocator = libcam.FrameBufferAllocator(cam)
>> +        ret = allocator.allocate(stream)
>> +        self.assertTrue(ret > 0)
>> +
>> +        num_bufs = len(allocator.buffers(stream))
>> +
>> +        reqs = []
>> +        for i in range(num_bufs):
>> +            req = cam.createRequest(i)
>> +            self.assertIsNotNone(req)
>> +
>> +            buffer = allocator.buffers(stream)[i]
>> +            ret = req.addBuffer(stream, buffer)
>> +            self.assertZero(ret)
>> +
>> +            reqs.append(req)
>> +
>> +        buffer = None
>> +
>> +        ret = cam.start()
>> +        self.assertZero(ret)
>> +
>> +        for req in reqs:
>> +            ret = cam.queueRequest(req)
>> +            self.assertZero(ret)
>> +
>> +        reqs = None
>> +        gc.collect()
>> +
>> +        time.sleep(0.5)
>> +
>> +        reqs = cm.getReadyRequests()
>> +
>> +        self.assertTrue(len(reqs) == num_bufs)
>> +
>> +        for i, req in enumerate(reqs):
>> +            self.assertTrue(i == req.cookie)
>> +
>> +        reqs = None
>> +        gc.collect()
>> +
>> +        ret = cam.stop()
>> +        self.assertZero(ret)
>> +
>> +    def test_select(self):
>> +        cm = self.cm
>> +        cam = self.cam
>> +
>> +        camconfig = cam.generateConfiguration([libcam.StreamRole.StillCapture])
>> +        self.assertTrue(camconfig.size == 1)
>> +
>> +        streamconfig = camconfig.at(0)
>> +        fmts = streamconfig.formats
>> +
>> +        ret = cam.configure(camconfig)
>> +        self.assertZero(ret)
>> +
>> +        stream = streamconfig.stream
>> +
>> +        allocator = libcam.FrameBufferAllocator(cam)
>> +        ret = allocator.allocate(stream)
>> +        self.assertTrue(ret > 0)
>> +
>> +        num_bufs = len(allocator.buffers(stream))
>> +
>> +        reqs = []
>> +        for i in range(num_bufs):
>> +            req = cam.createRequest(i)
>> +            self.assertIsNotNone(req)
>> +
>> +            buffer = allocator.buffers(stream)[i]
>> +            ret = req.addBuffer(stream, buffer)
>> +            self.assertZero(ret)
>> +
>> +            reqs.append(req)
>> +
>> +        buffer = None
>> +
>> +        ret = cam.start()
>> +        self.assertZero(ret)
>> +
>> +        for req in reqs:
>> +            ret = cam.queueRequest(req)
>> +            self.assertZero(ret)
>> +
>> +        reqs = None
>> +        gc.collect()
>> +
>> +        sel = selectors.DefaultSelector()
>> +        sel.register(cm.efd, selectors.EVENT_READ, 123)
> 
> Is the data argument needed ?

No, it's not.

>> +
>> +        reqs = []
>> +
>> +        running = True
>> +        while running:
>> +            events = sel.select()
>> +            for key, mask in events:
>> +                os.read(key.fileobj, 8)
>> +
>> +                ready_reqs = cm.getReadyRequests()
>> +
>> +                self.assertTrue(len(ready_reqs) > 0)
>> +
>> +                reqs += ready_reqs
>> +
>> +                if len(reqs) == num_bufs:
>> +                    running = False
>> +
>> +        self.assertTrue(len(reqs) == num_bufs)
>> +
>> +        for i, req in enumerate(reqs):
>> +            self.assertTrue(i == req.cookie)
>> +
>> +        reqs = None
>> +        gc.collect()
>> +
>> +        ret = cam.stop()
>> +        self.assertZero(ret)
>> +
>> +
>> +# Recursively expand slist's objects into olist, using seen to track already
>> +# processed objects.
>> +def _getr(slist, olist, seen):
>> +    for e in slist:
>> +        if id(e) in seen:
>> +            continue
>> +        seen.add(id(e))
>> +        olist.append(e)
>> +        tl = gc.get_referents(e)
>> +        if tl:
>> +            _getr(tl, olist, seen)
>> +
>> +
>> +def get_all_objects(ignored=[]):
>> +    gcl = gc.get_objects()
>> +    olist = []
>> +    seen = set()
>> +
>> +    seen.add(id(gcl))
>> +    seen.add(id(olist))
>> +    seen.add(id(seen))
>> +    seen.update(set([id(o) for o in ignored]))
>> +
>> +    _getr(gcl, olist, seen)
>> +
>> +    return olist
>> +
>> +
>> +def create_type_count_map(olist):
>> +    map = defaultdict(int)
>> +    for o in olist:
>> +        map[type(o)] += 1
>> +    return map
>> +
>> +
>> +def diff_type_count_maps(before, after):
>> +    return [(k, after[k] - before[k]) for k in after if after[k] != before[k]]
>> +
>> +
>> +if __name__ == '__main__':
>> +    # doesn't work very well, as things always leak a bit
> 
> Lovely :-) Is it something we can fix ?

I have no idea if this method is a valid way to observe object leaks, 
so... no clue. I think it can still be used, but manually, comparing 
object lists when doing some changes to the tests.

  Tomi
Laurent Pinchart May 6, 2022, 3:26 p.m. UTC | #3
Hi Tomi,

On Fri, May 06, 2022 at 01:59:38PM +0300, Tomi Valkeinen wrote:
> On 05/05/2022 20:52, Laurent Pinchart wrote:
> > On Thu, May 05, 2022 at 01:40:57PM +0300, Tomi Valkeinen wrote:
> >> Add a simple unittests.py as a base for python unittests.
> >>
> >> Signed-off-by: Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
> >> ---
> >>   test/meson.build     |   1 +
> >>   test/py/meson.build  |  17 ++
> >>   test/py/unittests.py | 368 +++++++++++++++++++++++++++++++++++++++++++
> >>   3 files changed, 386 insertions(+)
> >>   create mode 100644 test/py/meson.build
> >>   create mode 100755 test/py/unittests.py
> >>
> >> diff --git a/test/meson.build b/test/meson.build
> >> index fd4c5ca0..623f3baa 100644
> >> --- a/test/meson.build
> >> +++ b/test/meson.build
> >> @@ -18,6 +18,7 @@ subdir('log')
> >>   subdir('media_device')
> >>   subdir('pipeline')
> >>   subdir('process')
> >> +subdir('py')
> >>   subdir('serialization')
> >>   subdir('stream')
> >>   subdir('v4l2_compat')
> >> diff --git a/test/py/meson.build b/test/py/meson.build
> >> new file mode 100644
> >> index 00000000..f6b42bd0
> >> --- /dev/null
> >> +++ b/test/py/meson.build
> >> @@ -0,0 +1,17 @@
> >> +# SPDX-License-Identifier: CC0-1.0
> >> +
> >> +if not pycamera_enabled
> >> +    subdir_done()
> >> +endif
> >> +
> >> +pymod = import('python')
> >> +py3 = pymod.find_installation('python3')
> >> +
> >> +pypathdir = meson.project_build_root() / 'src/py'
> > 
> > pypathdir = meson.project_build_root() / 'src' / 'py'
> > 
> >> +
> >> +test('pyunittests',
> >> +     py3,
> >> +     args : files('unittests.py'),
> >> +     env : ['PYTHONPATH=' + pypathdir],
> >> +     suite : 'pybindings',
> >> +     is_parallel : false)
> >> diff --git a/test/py/unittests.py b/test/py/unittests.py
> >> new file mode 100755
> >> index 00000000..15d5b4a7
> >> --- /dev/null
> >> +++ b/test/py/unittests.py
> >> @@ -0,0 +1,368 @@
> >> +#!/usr/bin/env python3
> >> +
> >> +# SPDX-License-Identifier: GPL-2.0-or-later
> >> +# Copyright (C) 2021, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
> >> +
> >> +from collections import defaultdict
> >> +import errno
> >> +import gc
> >> +import libcamera as libcam
> > 
> > I'm tempted to claim the "cam" name in all our Python code, or possibly
> > "camera". What do you think ?
> 
> I think 'cam' and 'camera' are names for variables that contain a camera.

That's a very good point, even if "cam" would also be a good name for
the module. Any idea from anyone on what would be a good short module
name alias ? We could also use it as a libcamera namespace alias in C++
code examples for consistency.

> >> +import os
> >> +import selectors
> >> +import time
> >> +import unittest
> >> +import weakref
> >> +
> >> +
> >> +class MyTestCase(unittest.TestCase):
> > 
> > s/MyTestCase/BaseTestCase/
> > 
> > It's not yours only anymore :-)
> 
> My precious...
> 
> >> +    def assertZero(self, a, msg=None):
> >> +        self.assertEqual(a, 0, msg)
> >> +
> >> +
> >> +class SimpleTestMethods(MyTestCase):
> >> +    def test_find_ref(self):
> >> +        cm = libcam.CameraManager.singleton()
> >> +        wr_cm = weakref.ref(cm)
> >> +
> >> +        cam = cm.find("platform/vimc")
> > 
> > Standardizing on single-quotes here too would be nice.
> 
> I can't stand them, but... Fine, it's probably best to follow the widely 
> used convention.

I think there's a PEP that recommends single quotes. It took me a while
to get used to it.

> >> +        self.assertIsNotNone(cam)
> >> +        wr_cam = weakref.ref(cam)
> >> +
> >> +        cm = None
> >> +        gc.collect()
> >> +        self.assertIsNotNone(wr_cm())
> >> +
> >> +        cam = None
> >> +        gc.collect()
> >> +        self.assertIsNone(wr_cm())
> >> +        self.assertIsNone(wr_cam())
> >> +
> >> +    def test_get_ref(self):
> >> +        cm = libcam.CameraManager.singleton()
> >> +        wr_cm = weakref.ref(cm)
> >> +
> >> +        cam = cm.get("platform/vimc.0 Sensor B")
> >> +        self.assertTrue(cam is not None)
> >> +        wr_cam = weakref.ref(cam)
> >> +
> >> +        cm = None
> >> +        gc.collect()
> >> +        self.assertIsNotNone(wr_cm())
> >> +
> >> +        cam = None
> >> +        gc.collect()
> >> +        self.assertIsNone(wr_cm())
> >> +        self.assertIsNone(wr_cam())
> >> +
> >> +    def test_acquire_release(self):
> >> +        cm = libcam.CameraManager.singleton()
> >> +        cam = cm.get("platform/vimc.0 Sensor B")
> >> +        self.assertTrue(cam is not None)
> >> +
> >> +        ret = cam.acquire()
> >> +        self.assertZero(ret)
> >> +
> >> +        ret = cam.release()
> >> +        self.assertZero(ret)
> >> +
> >> +    def test_double_acquire(self):
> >> +        cm = libcam.CameraManager.singleton()
> >> +        cam = cm.get("platform/vimc.0 Sensor B")
> >> +        self.assertTrue(cam is not None)
> >> +
> >> +        ret = cam.acquire()
> >> +        self.assertZero(ret)
> >> +
> >> +        libcam.logSetLevel("Camera", "FATAL")
> >> +        ret = cam.acquire()
> >> +        self.assertEqual(ret, -errno.EBUSY)
> >> +        libcam.logSetLevel("Camera", "ERROR")
> >> +
> >> +        ret = cam.release()
> >> +        self.assertZero(ret)
> >> +
> >> +        ret = cam.release()
> >> +        # I expected EBUSY, but looks like double release works fine
> >> +        self.assertZero(ret)
> >> +
> >> +
> >> +class CameraTesterBase(MyTestCase):
> >> +    def setUp(self):
> >> +        self.cm = libcam.CameraManager.singleton()
> >> +        self.cam = self.cm.find("platform/vimc")
> >> +        if self.cam is None:
> >> +            self.cm = None
> >> +            raise Exception("No vimc found")
> > 
> > Is it possible to skip the test in that case instead of failing ? This
> > could be done on top, a todo comment somewhere in the file would be good
> > then.
> 
> Yes, I can do that. Although some tests test finding the vimc camera, so 
> those will still fail.
> 
> >> +
> >> +        ret = self.cam.acquire()
> >> +        if ret != 0:
> >> +            self.cam = None
> >> +            self.cm = None
> >> +            raise Exception("Failed to acquire camera")
> >> +
> >> +    def tearDown(self):
> >> +        # If a test fails, the camera may be in running state. So always stop.
> >> +        self.cam.stop()
> >> +
> >> +        ret = self.cam.release()
> >> +        if ret != 0:
> >> +            raise Exception("Failed to release camera")
> >> +
> >> +        self.cam = None
> >> +        self.cm = None
> >> +
> >> +
> >> +class AllocatorTestMethods(CameraTesterBase):
> >> +    def test_allocator(self):
> >> +        cam = self.cam
> >> +
> >> +        camconfig = cam.generateConfiguration([libcam.StreamRole.StillCapture])
> >> +        self.assertTrue(camconfig.size == 1)
> >> +        wr_camconfig = weakref.ref(camconfig)
> >> +
> >> +        streamconfig = camconfig.at(0)
> >> +        wr_streamconfig = weakref.ref(streamconfig)
> >> +
> >> +        ret = cam.configure(camconfig)
> >> +        self.assertZero(ret)
> >> +
> >> +        stream = streamconfig.stream
> >> +        wr_stream = weakref.ref(stream)
> >> +
> >> +        # stream should keep streamconfig and camconfig alive
> >> +        streamconfig = None
> >> +        camconfig = None
> >> +        gc.collect()
> >> +        self.assertIsNotNone(wr_camconfig())
> >> +        self.assertIsNotNone(wr_streamconfig())
> >> +
> >> +        allocator = libcam.FrameBufferAllocator(cam)
> >> +        ret = allocator.allocate(stream)
> >> +        self.assertTrue(ret > 0)
> >> +        wr_allocator = weakref.ref(allocator)
> >> +
> >> +        buffers = allocator.buffers(stream)
> >> +        buffers = None
> >> +
> >> +        buffer = allocator.buffers(stream)[0]
> >> +        self.assertIsNotNone(buffer)
> >> +        wr_buffer = weakref.ref(buffer)
> >> +
> >> +        allocator = None
> >> +        gc.collect()
> >> +        self.assertIsNotNone(wr_buffer())
> >> +        self.assertIsNotNone(wr_allocator())
> >> +        self.assertIsNotNone(wr_stream())
> >> +
> >> +        buffer = None
> >> +        gc.collect()
> >> +        self.assertIsNone(wr_buffer())
> >> +        self.assertIsNone(wr_allocator())
> >> +        self.assertIsNotNone(wr_stream())
> >> +
> >> +        stream = None
> >> +        gc.collect()
> >> +        self.assertIsNone(wr_stream())
> >> +        self.assertIsNone(wr_camconfig())
> >> +        self.assertIsNone(wr_streamconfig())
> >> +
> >> +
> >> +class SimpleCaptureMethods(CameraTesterBase):
> >> +    def test_sleep(self):
> >> +        cm = self.cm
> >> +        cam = self.cam
> >> +
> >> +        camconfig = cam.generateConfiguration([libcam.StreamRole.StillCapture])
> >> +        self.assertTrue(camconfig.size == 1)
> >> +
> >> +        streamconfig = camconfig.at(0)
> >> +        fmts = streamconfig.formats
> >> +
> >> +        ret = cam.configure(camconfig)
> >> +        self.assertZero(ret)
> >> +
> >> +        stream = streamconfig.stream
> >> +
> >> +        allocator = libcam.FrameBufferAllocator(cam)
> >> +        ret = allocator.allocate(stream)
> >> +        self.assertTrue(ret > 0)
> >> +
> >> +        num_bufs = len(allocator.buffers(stream))
> >> +
> >> +        reqs = []
> >> +        for i in range(num_bufs):
> >> +            req = cam.createRequest(i)
> >> +            self.assertIsNotNone(req)
> >> +
> >> +            buffer = allocator.buffers(stream)[i]
> >> +            ret = req.addBuffer(stream, buffer)
> >> +            self.assertZero(ret)
> >> +
> >> +            reqs.append(req)
> >> +
> >> +        buffer = None
> >> +
> >> +        ret = cam.start()
> >> +        self.assertZero(ret)
> >> +
> >> +        for req in reqs:
> >> +            ret = cam.queueRequest(req)
> >> +            self.assertZero(ret)
> >> +
> >> +        reqs = None
> >> +        gc.collect()
> >> +
> >> +        time.sleep(0.5)
> >> +
> >> +        reqs = cm.getReadyRequests()
> >> +
> >> +        self.assertTrue(len(reqs) == num_bufs)
> >> +
> >> +        for i, req in enumerate(reqs):
> >> +            self.assertTrue(i == req.cookie)
> >> +
> >> +        reqs = None
> >> +        gc.collect()
> >> +
> >> +        ret = cam.stop()
> >> +        self.assertZero(ret)
> >> +
> >> +    def test_select(self):
> >> +        cm = self.cm
> >> +        cam = self.cam
> >> +
> >> +        camconfig = cam.generateConfiguration([libcam.StreamRole.StillCapture])
> >> +        self.assertTrue(camconfig.size == 1)
> >> +
> >> +        streamconfig = camconfig.at(0)
> >> +        fmts = streamconfig.formats
> >> +
> >> +        ret = cam.configure(camconfig)
> >> +        self.assertZero(ret)
> >> +
> >> +        stream = streamconfig.stream
> >> +
> >> +        allocator = libcam.FrameBufferAllocator(cam)
> >> +        ret = allocator.allocate(stream)
> >> +        self.assertTrue(ret > 0)
> >> +
> >> +        num_bufs = len(allocator.buffers(stream))
> >> +
> >> +        reqs = []
> >> +        for i in range(num_bufs):
> >> +            req = cam.createRequest(i)
> >> +            self.assertIsNotNone(req)
> >> +
> >> +            buffer = allocator.buffers(stream)[i]
> >> +            ret = req.addBuffer(stream, buffer)
> >> +            self.assertZero(ret)
> >> +
> >> +            reqs.append(req)
> >> +
> >> +        buffer = None
> >> +
> >> +        ret = cam.start()
> >> +        self.assertZero(ret)
> >> +
> >> +        for req in reqs:
> >> +            ret = cam.queueRequest(req)
> >> +            self.assertZero(ret)
> >> +
> >> +        reqs = None
> >> +        gc.collect()
> >> +
> >> +        sel = selectors.DefaultSelector()
> >> +        sel.register(cm.efd, selectors.EVENT_READ, 123)
> > 
> > Is the data argument needed ?
> 
> No, it's not.
> 
> >> +
> >> +        reqs = []
> >> +
> >> +        running = True
> >> +        while running:
> >> +            events = sel.select()
> >> +            for key, mask in events:
> >> +                os.read(key.fileobj, 8)
> >> +
> >> +                ready_reqs = cm.getReadyRequests()
> >> +
> >> +                self.assertTrue(len(ready_reqs) > 0)
> >> +
> >> +                reqs += ready_reqs
> >> +
> >> +                if len(reqs) == num_bufs:
> >> +                    running = False
> >> +
> >> +        self.assertTrue(len(reqs) == num_bufs)
> >> +
> >> +        for i, req in enumerate(reqs):
> >> +            self.assertTrue(i == req.cookie)
> >> +
> >> +        reqs = None
> >> +        gc.collect()
> >> +
> >> +        ret = cam.stop()
> >> +        self.assertZero(ret)
> >> +
> >> +
> >> +# Recursively expand slist's objects into olist, using seen to track already
> >> +# processed objects.
> >> +def _getr(slist, olist, seen):
> >> +    for e in slist:
> >> +        if id(e) in seen:
> >> +            continue
> >> +        seen.add(id(e))
> >> +        olist.append(e)
> >> +        tl = gc.get_referents(e)
> >> +        if tl:
> >> +            _getr(tl, olist, seen)
> >> +
> >> +
> >> +def get_all_objects(ignored=[]):
> >> +    gcl = gc.get_objects()
> >> +    olist = []
> >> +    seen = set()
> >> +
> >> +    seen.add(id(gcl))
> >> +    seen.add(id(olist))
> >> +    seen.add(id(seen))
> >> +    seen.update(set([id(o) for o in ignored]))
> >> +
> >> +    _getr(gcl, olist, seen)
> >> +
> >> +    return olist
> >> +
> >> +
> >> +def create_type_count_map(olist):
> >> +    map = defaultdict(int)
> >> +    for o in olist:
> >> +        map[type(o)] += 1
> >> +    return map
> >> +
> >> +
> >> +def diff_type_count_maps(before, after):
> >> +    return [(k, after[k] - before[k]) for k in after if after[k] != before[k]]
> >> +
> >> +
> >> +if __name__ == '__main__':
> >> +    # doesn't work very well, as things always leak a bit
> > 
> > Lovely :-) Is it something we can fix ?
> 
> I have no idea if this method is a valid way to observe object leaks, 
> so... no clue. I think it can still be used, but manually, comparing 
> object lists when doing some changes to the tests.

We can figure it out later.

Patch
diff mbox series

diff --git a/test/meson.build b/test/meson.build
index fd4c5ca0..623f3baa 100644
--- a/test/meson.build
+++ b/test/meson.build
@@ -18,6 +18,7 @@  subdir('log')
 subdir('media_device')
 subdir('pipeline')
 subdir('process')
+subdir('py')
 subdir('serialization')
 subdir('stream')
 subdir('v4l2_compat')
diff --git a/test/py/meson.build b/test/py/meson.build
new file mode 100644
index 00000000..f6b42bd0
--- /dev/null
+++ b/test/py/meson.build
@@ -0,0 +1,17 @@ 
+# SPDX-License-Identifier: CC0-1.0
+
+if not pycamera_enabled
+    subdir_done()
+endif
+
+pymod = import('python')
+py3 = pymod.find_installation('python3')
+
+pypathdir = meson.project_build_root() / 'src/py'
+
+test('pyunittests',
+     py3,
+     args : files('unittests.py'),
+     env : ['PYTHONPATH=' + pypathdir],
+     suite : 'pybindings',
+     is_parallel : false)
diff --git a/test/py/unittests.py b/test/py/unittests.py
new file mode 100755
index 00000000..15d5b4a7
--- /dev/null
+++ b/test/py/unittests.py
@@ -0,0 +1,368 @@ 
+#!/usr/bin/env python3
+
+# SPDX-License-Identifier: GPL-2.0-or-later
+# Copyright (C) 2021, Tomi Valkeinen <tomi.valkeinen@ideasonboard.com>
+
+from collections import defaultdict
+import errno
+import gc
+import libcamera as libcam
+import os
+import selectors
+import time
+import unittest
+import weakref
+
+
+class MyTestCase(unittest.TestCase):
+    def assertZero(self, a, msg=None):
+        self.assertEqual(a, 0, msg)
+
+
+class SimpleTestMethods(MyTestCase):
+    def test_find_ref(self):
+        cm = libcam.CameraManager.singleton()
+        wr_cm = weakref.ref(cm)
+
+        cam = cm.find("platform/vimc")
+        self.assertIsNotNone(cam)
+        wr_cam = weakref.ref(cam)
+
+        cm = None
+        gc.collect()
+        self.assertIsNotNone(wr_cm())
+
+        cam = None
+        gc.collect()
+        self.assertIsNone(wr_cm())
+        self.assertIsNone(wr_cam())
+
+    def test_get_ref(self):
+        cm = libcam.CameraManager.singleton()
+        wr_cm = weakref.ref(cm)
+
+        cam = cm.get("platform/vimc.0 Sensor B")
+        self.assertTrue(cam is not None)
+        wr_cam = weakref.ref(cam)
+
+        cm = None
+        gc.collect()
+        self.assertIsNotNone(wr_cm())
+
+        cam = None
+        gc.collect()
+        self.assertIsNone(wr_cm())
+        self.assertIsNone(wr_cam())
+
+    def test_acquire_release(self):
+        cm = libcam.CameraManager.singleton()
+        cam = cm.get("platform/vimc.0 Sensor B")
+        self.assertTrue(cam is not None)
+
+        ret = cam.acquire()
+        self.assertZero(ret)
+
+        ret = cam.release()
+        self.assertZero(ret)
+
+    def test_double_acquire(self):
+        cm = libcam.CameraManager.singleton()
+        cam = cm.get("platform/vimc.0 Sensor B")
+        self.assertTrue(cam is not None)
+
+        ret = cam.acquire()
+        self.assertZero(ret)
+
+        libcam.logSetLevel("Camera", "FATAL")
+        ret = cam.acquire()
+        self.assertEqual(ret, -errno.EBUSY)
+        libcam.logSetLevel("Camera", "ERROR")
+
+        ret = cam.release()
+        self.assertZero(ret)
+
+        ret = cam.release()
+        # I expected EBUSY, but looks like double release works fine
+        self.assertZero(ret)
+
+
+class CameraTesterBase(MyTestCase):
+    def setUp(self):
+        self.cm = libcam.CameraManager.singleton()
+        self.cam = self.cm.find("platform/vimc")
+        if self.cam is None:
+            self.cm = None
+            raise Exception("No vimc found")
+
+        ret = self.cam.acquire()
+        if ret != 0:
+            self.cam = None
+            self.cm = None
+            raise Exception("Failed to acquire camera")
+
+    def tearDown(self):
+        # If a test fails, the camera may be in running state. So always stop.
+        self.cam.stop()
+
+        ret = self.cam.release()
+        if ret != 0:
+            raise Exception("Failed to release camera")
+
+        self.cam = None
+        self.cm = None
+
+
+class AllocatorTestMethods(CameraTesterBase):
+    def test_allocator(self):
+        cam = self.cam
+
+        camconfig = cam.generateConfiguration([libcam.StreamRole.StillCapture])
+        self.assertTrue(camconfig.size == 1)
+        wr_camconfig = weakref.ref(camconfig)
+
+        streamconfig = camconfig.at(0)
+        wr_streamconfig = weakref.ref(streamconfig)
+
+        ret = cam.configure(camconfig)
+        self.assertZero(ret)
+
+        stream = streamconfig.stream
+        wr_stream = weakref.ref(stream)
+
+        # stream should keep streamconfig and camconfig alive
+        streamconfig = None
+        camconfig = None
+        gc.collect()
+        self.assertIsNotNone(wr_camconfig())
+        self.assertIsNotNone(wr_streamconfig())
+
+        allocator = libcam.FrameBufferAllocator(cam)
+        ret = allocator.allocate(stream)
+        self.assertTrue(ret > 0)
+        wr_allocator = weakref.ref(allocator)
+
+        buffers = allocator.buffers(stream)
+        buffers = None
+
+        buffer = allocator.buffers(stream)[0]
+        self.assertIsNotNone(buffer)
+        wr_buffer = weakref.ref(buffer)
+
+        allocator = None
+        gc.collect()
+        self.assertIsNotNone(wr_buffer())
+        self.assertIsNotNone(wr_allocator())
+        self.assertIsNotNone(wr_stream())
+
+        buffer = None
+        gc.collect()
+        self.assertIsNone(wr_buffer())
+        self.assertIsNone(wr_allocator())
+        self.assertIsNotNone(wr_stream())
+
+        stream = None
+        gc.collect()
+        self.assertIsNone(wr_stream())
+        self.assertIsNone(wr_camconfig())
+        self.assertIsNone(wr_streamconfig())
+
+
+class SimpleCaptureMethods(CameraTesterBase):
+    def test_sleep(self):
+        cm = self.cm
+        cam = self.cam
+
+        camconfig = cam.generateConfiguration([libcam.StreamRole.StillCapture])
+        self.assertTrue(camconfig.size == 1)
+
+        streamconfig = camconfig.at(0)
+        fmts = streamconfig.formats
+
+        ret = cam.configure(camconfig)
+        self.assertZero(ret)
+
+        stream = streamconfig.stream
+
+        allocator = libcam.FrameBufferAllocator(cam)
+        ret = allocator.allocate(stream)
+        self.assertTrue(ret > 0)
+
+        num_bufs = len(allocator.buffers(stream))
+
+        reqs = []
+        for i in range(num_bufs):
+            req = cam.createRequest(i)
+            self.assertIsNotNone(req)
+
+            buffer = allocator.buffers(stream)[i]
+            ret = req.addBuffer(stream, buffer)
+            self.assertZero(ret)
+
+            reqs.append(req)
+
+        buffer = None
+
+        ret = cam.start()
+        self.assertZero(ret)
+
+        for req in reqs:
+            ret = cam.queueRequest(req)
+            self.assertZero(ret)
+
+        reqs = None
+        gc.collect()
+
+        time.sleep(0.5)
+
+        reqs = cm.getReadyRequests()
+
+        self.assertTrue(len(reqs) == num_bufs)
+
+        for i, req in enumerate(reqs):
+            self.assertTrue(i == req.cookie)
+
+        reqs = None
+        gc.collect()
+
+        ret = cam.stop()
+        self.assertZero(ret)
+
+    def test_select(self):
+        cm = self.cm
+        cam = self.cam
+
+        camconfig = cam.generateConfiguration([libcam.StreamRole.StillCapture])
+        self.assertTrue(camconfig.size == 1)
+
+        streamconfig = camconfig.at(0)
+        fmts = streamconfig.formats
+
+        ret = cam.configure(camconfig)
+        self.assertZero(ret)
+
+        stream = streamconfig.stream
+
+        allocator = libcam.FrameBufferAllocator(cam)
+        ret = allocator.allocate(stream)
+        self.assertTrue(ret > 0)
+
+        num_bufs = len(allocator.buffers(stream))
+
+        reqs = []
+        for i in range(num_bufs):
+            req = cam.createRequest(i)
+            self.assertIsNotNone(req)
+
+            buffer = allocator.buffers(stream)[i]
+            ret = req.addBuffer(stream, buffer)
+            self.assertZero(ret)
+
+            reqs.append(req)
+
+        buffer = None
+
+        ret = cam.start()
+        self.assertZero(ret)
+
+        for req in reqs:
+            ret = cam.queueRequest(req)
+            self.assertZero(ret)
+
+        reqs = None
+        gc.collect()
+
+        sel = selectors.DefaultSelector()
+        sel.register(cm.efd, selectors.EVENT_READ, 123)
+
+        reqs = []
+
+        running = True
+        while running:
+            events = sel.select()
+            for key, mask in events:
+                os.read(key.fileobj, 8)
+
+                ready_reqs = cm.getReadyRequests()
+
+                self.assertTrue(len(ready_reqs) > 0)
+
+                reqs += ready_reqs
+
+                if len(reqs) == num_bufs:
+                    running = False
+
+        self.assertTrue(len(reqs) == num_bufs)
+
+        for i, req in enumerate(reqs):
+            self.assertTrue(i == req.cookie)
+
+        reqs = None
+        gc.collect()
+
+        ret = cam.stop()
+        self.assertZero(ret)
+
+
+# Recursively expand slist's objects into olist, using seen to track already
+# processed objects.
+def _getr(slist, olist, seen):
+    for e in slist:
+        if id(e) in seen:
+            continue
+        seen.add(id(e))
+        olist.append(e)
+        tl = gc.get_referents(e)
+        if tl:
+            _getr(tl, olist, seen)
+
+
+def get_all_objects(ignored=[]):
+    gcl = gc.get_objects()
+    olist = []
+    seen = set()
+
+    seen.add(id(gcl))
+    seen.add(id(olist))
+    seen.add(id(seen))
+    seen.update(set([id(o) for o in ignored]))
+
+    _getr(gcl, olist, seen)
+
+    return olist
+
+
+def create_type_count_map(olist):
+    map = defaultdict(int)
+    for o in olist:
+        map[type(o)] += 1
+    return map
+
+
+def diff_type_count_maps(before, after):
+    return [(k, after[k] - before[k]) for k in after if after[k] != before[k]]
+
+
+if __name__ == '__main__':
+    # doesn't work very well, as things always leak a bit
+    test_leaks = False
+
+    if test_leaks:
+        gc.unfreeze()
+        gc.collect()
+
+        obs_before = get_all_objects()
+
+    unittest.main(exit=False)
+
+    if test_leaks:
+        gc.unfreeze()
+        gc.collect()
+
+        obs_after = get_all_objects([obs_before])
+
+        before = create_type_count_map(obs_before)
+        after = create_type_count_map(obs_after)
+
+        leaks = diff_type_count_maps(before, after)
+        if len(leaks) > 0:
+            print(leaks)