@@ -6,6 +6,7 @@
# \todo Convert ctx and state dicts to proper classes, and move relevant
# functions to those classes.
+from typing import Any
import argparse
import binascii
import libcamera as libcam
@@ -14,379 +15,371 @@ import sys
import traceback
-class CustomAction(argparse.Action):
- def __init__(self, option_strings, dest, **kwargs):
- super().__init__(option_strings, dest, default={}, **kwargs)
-
- def __call__(self, parser, namespace, values, option_string=None):
- if len(namespace.camera) == 0:
- print(f'Option {option_string} requires a --camera context')
- sys.exit(-1)
-
- if self.type == bool:
- values = True
-
- current = namespace.camera[-1]
-
- data = getattr(namespace, self.dest)
-
- if self.nargs == '+':
- if current not in data:
- data[current] = []
-
- data[current] += values
- else:
- data[current] = values
-
-
-def do_cmd_list(cm):
- print('Available cameras:')
-
- for idx, c in enumerate(cm.cameras):
- print(f'{idx + 1}: {c.id}')
-
-
-def do_cmd_list_props(ctx):
- camera = ctx['camera']
-
- print('Properties for', ctx['id'])
+class CameraContext:
+ camera: libcam.Camera
+ id: str
+ idx: int
- for name, prop in camera.properties.items():
- print('\t{}: {}'.format(name, prop))
+ opt_stream: str
+ opt_strict_formats: bool
+ opt_crc: bool
+ opt_metadata: bool
+ opt_save_frames: bool
+ opt_capture: int
+ stream_names: dict[libcam.Stream, str]
+ streams: list[libcam.Stream]
+ allocator: libcam.FrameBufferAllocator
+ requests: list[libcam.Request]
+ reqs_queued: int
+ reqs_completed: int
+ last: int = 0
+ fps: float
-def do_cmd_list_controls(ctx):
- camera = ctx['camera']
+ def __init__(self, camera, idx):
+ self.camera = camera
+ self.idx = idx
+ self.id = 'cam' + str(idx)
+ self.reqs_queued = 0
+ self.reqs_completed = 0
- print('Controls for', ctx['id'])
+ def do_cmd_list_props(self):
+ print('Properties for', self.id)
- for name, prop in camera.controls.items():
- print('\t{}: {}'.format(name, prop))
+ for name, prop in self.camera.properties.items():
+ print('\t{}: {}'.format(name, prop))
+ def do_cmd_list_controls(self):
+ print('Controls for', self.id)
-def do_cmd_info(ctx):
- camera = ctx['camera']
+ for name, prop in self.camera.controls.items():
+ print('\t{}: {}'.format(name, prop))
- print('Stream info for', ctx['id'])
+ def do_cmd_info(self):
+ print('Stream info for', self.id)
- roles = [libcam.StreamRole.Viewfinder]
+ roles = [libcam.StreamRole.Viewfinder]
- camconfig = camera.generate_configuration(roles)
- if camconfig is None:
- raise Exception('Generating config failed')
+ camconfig = self.camera.generate_configuration(roles)
+ if camconfig is None:
+ raise Exception('Generating config failed')
- for i, stream_config in enumerate(camconfig):
- print('\t{}: {}'.format(i, stream_config))
+ for i, stream_config in enumerate(camconfig):
+ print('\t{}: {}'.format(i, stream_config))
- formats = stream_config.formats
- for fmt in formats.pixel_formats:
- print('\t * Pixelformat:', fmt, formats.range(fmt))
+ formats = stream_config.formats
+ for fmt in formats.pixel_formats:
+ print('\t * Pixelformat:', fmt, formats.range(fmt))
- for size in formats.sizes(fmt):
- print('\t -', size)
+ for size in formats.sizes(fmt):
+ print('\t -', size)
+ def acquire(self):
+ self.camera.acquire()
-def acquire(ctx):
- camera = ctx['camera']
+ def release(self):
+ self.camera.release()
- camera.acquire()
+ def __parse_streams(self):
+ streams = []
+ for stream_desc in self.opt_stream:
+ stream_opts: dict[str, Any]
+ stream_opts = {'role': libcam.StreamRole.Viewfinder}
-def release(ctx):
- camera = ctx['camera']
+ for stream_opt in stream_desc.split(','):
+ if stream_opt == 0:
+ continue
- camera.release()
-
-
-def parse_streams(ctx):
- streams = []
-
- for stream_desc in ctx['opt-stream']:
- stream_opts = {'role': libcam.StreamRole.Viewfinder}
-
- for stream_opt in stream_desc.split(','):
- if stream_opt == 0:
- continue
-
- arr = stream_opt.split('=')
- if len(arr) != 2:
- print('Bad stream option', stream_opt)
- sys.exit(-1)
-
- key = arr[0]
- value = arr[1]
-
- if key in ['width', 'height']:
- value = int(value)
- elif key == 'role':
- rolemap = {
- 'still': libcam.StreamRole.StillCapture,
- 'raw': libcam.StreamRole.Raw,
- 'video': libcam.StreamRole.VideoRecording,
- 'viewfinder': libcam.StreamRole.Viewfinder,
- }
-
- role = rolemap.get(value.lower(), None)
-
- if role is None:
- print('Bad stream role', value)
+ arr = stream_opt.split('=')
+ if len(arr) != 2:
+ print('Bad stream option', stream_opt)
sys.exit(-1)
- value = role
- elif key == 'pixelformat':
- pass
- else:
- print('Bad stream option key', key)
- sys.exit(-1)
-
- stream_opts[key] = value
-
- streams.append(stream_opts)
+ key = arr[0]
+ value = arr[1]
+
+ if key in ['width', 'height']:
+ value = int(value)
+ elif key == 'role':
+ rolemap = {
+ 'still': libcam.StreamRole.StillCapture,
+ 'raw': libcam.StreamRole.Raw,
+ 'video': libcam.StreamRole.VideoRecording,
+ 'viewfinder': libcam.StreamRole.Viewfinder,
+ }
+
+ role = rolemap.get(value.lower(), None)
+
+ if role is None:
+ print('Bad stream role', value)
+ sys.exit(-1)
+
+ value = role
+ elif key == 'pixelformat':
+ pass
+ else:
+ print('Bad stream option key', key)
+ sys.exit(-1)
- return streams
+ stream_opts[key] = value
+ streams.append(stream_opts)
-def configure(ctx):
- camera = ctx['camera']
+ return streams
- streams = parse_streams(ctx)
+ def configure(self):
+ streams = self.__parse_streams()
- roles = [opts['role'] for opts in streams]
+ roles = [opts['role'] for opts in streams]
- camconfig = camera.generate_configuration(roles)
- if camconfig is None:
- raise Exception('Generating config failed')
+ camconfig = self.camera.generate_configuration(roles)
+ if camconfig is None:
+ raise Exception('Generating config failed')
- for idx, stream_opts in enumerate(streams):
- stream_config = camconfig.at(idx)
+ for idx, stream_opts in enumerate(streams):
+ stream_config = camconfig.at(idx)
- if 'width' in stream_opts:
- stream_config.size.width = stream_opts['width']
+ if 'width' in stream_opts:
+ stream_config.size.width = stream_opts['width']
- if 'height' in stream_opts:
- stream_config.size.height = stream_opts['height']
+ if 'height' in stream_opts:
+ stream_config.size.height = stream_opts['height']
- if 'pixelformat' in stream_opts:
- stream_config.pixel_format = libcam.PixelFormat(stream_opts['pixelformat'])
+ if 'pixelformat' in stream_opts:
+ stream_config.pixel_format = libcam.PixelFormat(stream_opts['pixelformat'])
- stat = camconfig.validate()
+ stat = camconfig.validate()
- if stat == libcam.CameraConfiguration.Status.Invalid:
- print('Camera configuration invalid')
- exit(-1)
- elif stat == libcam.CameraConfiguration.Status.Adjusted:
- if ctx['opt-strict-formats']:
- print('Adjusting camera configuration disallowed by --strict-formats argument')
+ if stat == libcam.CameraConfiguration.Status.Invalid:
+ print('Camera configuration invalid')
exit(-1)
+ elif stat == libcam.CameraConfiguration.Status.Adjusted:
+ if self.opt_strict_formats:
+ print('Adjusting camera configuration disallowed by --strict-formats argument')
+ exit(-1)
- print('Camera configuration adjusted')
-
- r = camera.configure(camconfig)
- if r != 0:
- raise Exception('Configure failed')
-
- ctx['stream-names'] = {}
- ctx['streams'] = []
-
- for idx, stream_config in enumerate(camconfig):
- stream = stream_config.stream
- ctx['streams'].append(stream)
- ctx['stream-names'][stream] = 'stream' + str(idx)
- print('{}-{}: stream config {}'.format(ctx['id'], ctx['stream-names'][stream], stream.configuration))
-
-
-def alloc_buffers(ctx):
- camera = ctx['camera']
-
- allocator = libcam.FrameBufferAllocator(camera)
+ print('Camera configuration adjusted')
- for idx, stream in enumerate(ctx['streams']):
- ret = allocator.allocate(stream)
- if ret < 0:
- print('Cannot allocate buffers')
- exit(-1)
+ r = self.camera.configure(camconfig)
+ if r != 0:
+ raise Exception('Configure failed')
- allocated = len(allocator.buffers(stream))
+ self.stream_names = {}
+ self.streams = []
- print('{}-{}: Allocated {} buffers'.format(ctx['id'], ctx['stream-names'][stream], allocated))
+ for idx, stream_config in enumerate(camconfig):
+ stream = stream_config.stream
+ self.streams.append(stream)
+ self.stream_names[stream] = 'stream' + str(idx)
+ print('{}-{}: stream config {}'.format(self.id, self.stream_names[stream], stream.configuration))
- ctx['allocator'] = allocator
+ def alloc_buffers(self):
+ allocator = libcam.FrameBufferAllocator(self.camera)
+ for stream in self.streams:
+ ret = allocator.allocate(stream)
+ if ret < 0:
+ print('Cannot allocate buffers')
+ exit(-1)
-def create_requests(ctx):
- camera = ctx['camera']
+ allocated = len(allocator.buffers(stream))
- ctx['requests'] = []
+ print('{}-{}: Allocated {} buffers'.format(self.id, self.stream_names[stream], allocated))
- # Identify the stream with the least number of buffers
- num_bufs = min([len(ctx['allocator'].buffers(stream)) for stream in ctx['streams']])
+ self.allocator = allocator
- requests = []
+ def create_requests(self):
+ self.requests = []
- for buf_num in range(num_bufs):
- request = camera.create_request(ctx['idx'])
+ # Identify the stream with the least number of buffers
+ num_bufs = min([len(self.allocator.buffers(stream)) for stream in self.streams])
- if request is None:
- print('Can not create request')
- exit(-1)
+ requests = []
- for stream in ctx['streams']:
- buffers = ctx['allocator'].buffers(stream)
- buffer = buffers[buf_num]
+ for buf_num in range(num_bufs):
+ request = self.camera.create_request(self.idx)
- ret = request.add_buffer(stream, buffer)
- if ret < 0:
- print('Can not set buffer for request')
+ if request is None:
+ print('Can not create request')
exit(-1)
- requests.append(request)
+ for stream in self.streams:
+ buffers = self.allocator.buffers(stream)
+ buffer = buffers[buf_num]
- ctx['requests'] = requests
+ ret = request.add_buffer(stream, buffer)
+ if ret < 0:
+ print('Can not set buffer for request')
+ exit(-1)
+ requests.append(request)
-def start(ctx):
- camera = ctx['camera']
+ self.requests = requests
- camera.start()
+ def start(self):
+ self.camera.start()
+ def stop(self):
+ self.camera.stop()
-def stop(ctx):
- camera = ctx['camera']
+ def queue_requests(self):
+ for request in self.requests:
+ self.camera.queue_request(request)
+ self.reqs_queued += 1
- camera.stop()
+ del self.requests
-def queue_requests(ctx):
- camera = ctx['camera']
+class CaptureState:
+ cm: libcam.CameraManager
+ contexts: list[CameraContext]
+ renderer: Any
- for request in ctx['requests']:
- camera.queue_request(request)
- ctx['reqs-queued'] += 1
+ def __init__(self, cm, contexts):
+ self.cm = cm
+ self.contexts = contexts
- del ctx['requests']
+ # Called from renderer when there is a libcamera event
+ def event_handler(self):
+ try:
+ self.cm.read_event()
+ reqs = self.cm.get_ready_requests()
-def capture_init(contexts):
- for ctx in contexts:
- acquire(ctx)
+ for req in reqs:
+ ctx = next(ctx for ctx in self.contexts if ctx.idx == req.cookie)
+ self.__request_handler(ctx, req)
- for ctx in contexts:
- configure(ctx)
+ running = any(ctx.reqs_completed < ctx.opt_capture for ctx in self.contexts)
+ return running
+ except Exception:
+ traceback.print_exc()
+ return False
- for ctx in contexts:
- alloc_buffers(ctx)
+ def __request_handler(self, ctx, req):
+ if req.status != libcam.Request.Status.Complete:
+ raise Exception('{}: Request failed: {}'.format(ctx.id, req.status))
- for ctx in contexts:
- create_requests(ctx)
+ buffers = req.buffers
+ # Compute the frame rate. The timestamp is arbitrarily retrieved from
+ # the first buffer, as all buffers should have matching timestamps.
+ ts = buffers[next(iter(buffers))].metadata.timestamp
+ last = ctx.last
+ fps = 1000000000.0 / (ts - last) if (last != 0 and (ts - last) != 0) else 0
+ ctx.last = ts
+ ctx.fps = fps
-def capture_start(contexts):
- for ctx in contexts:
- start(ctx)
+ for stream, fb in buffers.items():
+ stream_name = ctx.stream_names[stream]
- for ctx in contexts:
- queue_requests(ctx)
+ crcs = []
+ if ctx.opt_crc:
+ with libcamera.utils.MappedFrameBuffer(fb) as mfb:
+ plane_crcs = [binascii.crc32(p) for p in mfb.planes]
+ crcs.append(plane_crcs)
+ meta = fb.metadata
-# Called from renderer when there is a libcamera event
-def event_handler(state):
- try:
- cm = state['cm']
- contexts = state['contexts']
+ print('{:.6f} ({:.2f} fps) {}-{}: seq {}, bytes {}, CRCs {}'
+ .format(ts / 1000000000, fps,
+ ctx.id, stream_name,
+ meta.sequence, meta.bytesused,
+ crcs))
- cm.read_event()
+ if ctx.opt_metadata:
+ reqmeta = req.metadata
+ for ctrl, val in reqmeta.items():
+ print(f'\t{ctrl} = {val}')
- reqs = cm.get_ready_requests()
+ if ctx.opt_save_frames:
+ with libcamera.utils.MappedFrameBuffer(fb) as mfb:
+ filename = 'frame-{}-{}-{}.data'.format(ctx.id, stream_name, ctx.reqs_completed)
+ with open(filename, 'wb') as f:
+ for p in mfb.planes:
+ f.write(p)
- for req in reqs:
- ctx = next(ctx for ctx in contexts if ctx['idx'] == req.cookie)
- request_handler(state, ctx, req)
+ self.renderer.request_handler(ctx, req)
- running = any(ctx['reqs-completed'] < ctx['opt-capture'] for ctx in contexts)
- return running
- except Exception:
- traceback.print_exc()
- return False
+ ctx.reqs_completed += 1
+ # Called from renderer when it has finished with a request
+ def request_processed(self, ctx, req):
+ if ctx.reqs_queued < ctx.opt_capture:
+ req.reuse()
+ ctx.camera.queue_request(req)
+ ctx.reqs_queued += 1
-def request_handler(state, ctx, req):
- if req.status != libcam.Request.Status.Complete:
- raise Exception('{}: Request failed: {}'.format(ctx['id'], req.status))
+ def __capture_init(self):
+ for ctx in self.contexts:
+ ctx.acquire()
- buffers = req.buffers
+ for ctx in self.contexts:
+ ctx.configure()
- # Compute the frame rate. The timestamp is arbitrarily retrieved from
- # the first buffer, as all buffers should have matching timestamps.
- ts = buffers[next(iter(buffers))].metadata.timestamp
- last = ctx.get('last', 0)
- fps = 1000000000.0 / (ts - last) if (last != 0 and (ts - last) != 0) else 0
- ctx['last'] = ts
- ctx['fps'] = fps
+ for ctx in self.contexts:
+ ctx.alloc_buffers()
- for stream, fb in buffers.items():
- stream_name = ctx['stream-names'][stream]
+ for ctx in self.contexts:
+ ctx.create_requests()
- crcs = []
- if ctx['opt-crc']:
- with libcamera.utils.MappedFrameBuffer(fb) as mfb:
- plane_crcs = [binascii.crc32(p) for p in mfb.planes]
- crcs.append(plane_crcs)
+ def __capture_start(self):
+ for ctx in self.contexts:
+ ctx.start()
- meta = fb.metadata
+ for ctx in self.contexts:
+ ctx.queue_requests()
- print('{:.6f} ({:.2f} fps) {}-{}: seq {}, bytes {}, CRCs {}'
- .format(ts / 1000000000, fps,
- ctx['id'], stream_name,
- meta.sequence, meta.bytesused,
- crcs))
+ def __capture_deinit(self):
+ for ctx in self.contexts:
+ ctx.stop()
- if ctx['opt-metadata']:
- reqmeta = req.metadata
- for ctrl, val in reqmeta.items():
- print(f'\t{ctrl} = {val}')
+ for ctx in self.contexts:
+ ctx.release()
- if ctx['opt-save-frames']:
- with libcamera.utils.MappedFrameBuffer(fb) as mfb:
- filename = 'frame-{}-{}-{}.data'.format(ctx['id'], stream_name, ctx['reqs-completed'])
- with open(filename, 'wb') as f:
- for p in mfb.planes:
- f.write(p)
+ def do_cmd_capture(self):
+ self.__capture_init()
- state['renderer'].request_handler(ctx, req)
+ self.renderer.setup()
- ctx['reqs-completed'] += 1
+ self.__capture_start()
+ self.renderer.run()
-# Called from renderer when it has finished with a request
-def request_prcessed(ctx, req):
- camera = ctx['camera']
+ self.__capture_deinit()
- if ctx['reqs-queued'] < ctx['opt-capture']:
- req.reuse()
- camera.queue_request(req)
- ctx['reqs-queued'] += 1
+class CustomAction(argparse.Action):
+ def __init__(self, option_strings, dest, **kwargs):
+ super().__init__(option_strings, dest, default={}, **kwargs)
-def capture_deinit(contexts):
- for ctx in contexts:
- stop(ctx)
+ def __call__(self, parser, namespace, values, option_string=None):
+ if len(namespace.camera) == 0:
+ print(f'Option {option_string} requires a --camera context')
+ sys.exit(-1)
- for ctx in contexts:
- release(ctx)
+ if self.type == bool:
+ values = True
+ current = namespace.camera[-1]
-def do_cmd_capture(state):
- capture_init(state['contexts'])
+ data = getattr(namespace, self.dest)
- renderer = state['renderer']
+ if self.nargs == '+':
+ if current not in data:
+ data[current] = []
- renderer.setup()
+ data[current] += values
+ else:
+ data[current] = values
- capture_start(state['contexts'])
- renderer.run()
+def do_cmd_list(cm):
+ print('Available cameras:')
- capture_deinit(state['contexts'])
+ for idx, c in enumerate(cm.cameras):
+ print(f'{idx + 1}: {c.id}')
def main():
@@ -422,39 +415,28 @@ def main():
print('Unable to find camera', cam_idx)
return -1
- contexts.append({
- 'camera': camera,
- 'idx': cam_idx,
- 'id': 'cam' + str(cam_idx),
- 'reqs-queued': 0,
- 'reqs-completed': 0,
- 'opt-capture': args.capture.get(cam_idx, False),
- 'opt-crc': args.crc.get(cam_idx, False),
- 'opt-save-frames': args.save_frames.get(cam_idx, False),
- 'opt-metadata': args.metadata.get(cam_idx, False),
- 'opt-strict-formats': args.strict_formats.get(cam_idx, False),
- 'opt-stream': args.stream.get(cam_idx, ['role=viewfinder']),
- })
+ ctx = CameraContext(camera, cam_idx)
+ ctx.opt_capture = args.capture.get(cam_idx, 0)
+ ctx.opt_crc = args.crc.get(cam_idx, False)
+ ctx.opt_save_frames = args.save_frames.get(cam_idx, False)
+ ctx.opt_metadata = args.metadata.get(cam_idx, False)
+ ctx.opt_strict_formats = args.strict_formats.get(cam_idx, False)
+ ctx.opt_stream = args.stream.get(cam_idx, ['role=viewfinder'])
+ contexts.append(ctx)
for ctx in contexts:
- print('Using camera {} as {}'.format(ctx['camera'].id, ctx['id']))
+ print('Using camera {} as {}'.format(ctx.camera.id, ctx.id))
for ctx in contexts:
if args.list_properties:
- do_cmd_list_props(ctx)
+ ctx.do_cmd_list_props()
if args.list_controls:
- do_cmd_list_controls(ctx)
+ ctx.do_cmd_list_controls()
if args.info:
- do_cmd_info(ctx)
+ ctx.do_cmd_info()
if args.capture:
-
- state = {
- 'cm': cm,
- 'contexts': contexts,
- 'event_handler': event_handler,
- 'request_prcessed': request_prcessed,
- }
+ state = CaptureState(cm, contexts)
if args.renderer == 'null':
import cam_null
@@ -472,9 +454,9 @@ def main():
print('Bad renderer', args.renderer)
return -1
- state['renderer'] = renderer
+ state.renderer = renderer
- do_cmd_capture(state)
+ state.do_cmd_capture()
return 0
@@ -10,8 +10,8 @@ class KMSRenderer:
def __init__(self, state):
self.state = state
- self.cm = state['cm']
- self.contexts = state['contexts']
+ self.cm = state.cm
+ self.contexts = state.contexts
self.running = False
card = pykms.Card()
@@ -92,7 +92,7 @@ class KMSRenderer:
if old:
req = old['camreq']
ctx = old['camctx']
- self.state['request_prcessed'](ctx, req)
+ self.state.request_processed(ctx, req)
def queue(self, drmreq):
if not self.next:
@@ -108,7 +108,7 @@ class KMSRenderer:
idx = 0
for ctx in self.contexts:
- for stream in ctx['streams']:
+ for stream in ctx.streams:
cfg = stream.configuration
fmt = cfg.pixel_format
@@ -125,7 +125,7 @@ class KMSRenderer:
'size': cfg.size,
})
- for fb in ctx['allocator'].buffers(stream):
+ for fb in ctx.allocator.buffers(stream):
w = cfg.size.width
h = cfg.size.height
fds = []
@@ -148,7 +148,7 @@ class KMSRenderer:
self.handle_page_flip(ev.seq, ev.time)
def readcam(self, fd):
- self.running = self.state['event_handler'](self.state)
+ self.running = self.state.event_handler()
def readkey(self, fileobj):
sys.stdin.readline()
@@ -9,8 +9,8 @@ class NullRenderer:
def __init__(self, state):
self.state = state
- self.cm = state['cm']
- self.contexts = state['contexts']
+ self.cm = state.cm
+ self.contexts = state.contexts
self.running = False
@@ -37,11 +37,11 @@ class NullRenderer:
print('Exiting...')
def readcam(self, fd):
- self.running = self.state['event_handler'](self.state)
+ self.running = self.state.event_handler()
def readkey(self, fileobj):
sys.stdin.readline()
self.running = False
def request_handler(self, ctx, req):
- self.state['request_prcessed'](ctx, req)
+ self.state.request_processed(ctx, req)
@@ -176,8 +176,8 @@ class QtRenderer:
def __init__(self, state):
self.state = state
- self.cm = state['cm']
- self.contexts = state['contexts']
+ self.cm = state.cm
+ self.contexts = state.contexts
def setup(self):
self.app = QtWidgets.QApplication([])
@@ -185,7 +185,7 @@ class QtRenderer:
windows = []
for ctx in self.contexts:
- for stream in ctx['streams']:
+ for stream in ctx.streams:
window = MainWindow(ctx, stream)
window.show()
windows.append(window)
@@ -206,7 +206,7 @@ class QtRenderer:
print('Exiting...')
def readcam(self):
- running = self.state['event_handler'](self.state)
+ running = self.state.event_handler()
if not running:
self.app.quit()
@@ -223,7 +223,7 @@ class QtRenderer:
wnd.handle_request(stream, fb)
- self.state['request_prcessed'](ctx, req)
+ self.state.request_processed(ctx, req)
def cleanup(self):
for w in self.windows:
@@ -254,7 +254,7 @@ class MainWindow(QtWidgets.QWidget):
group.setLayout(groupLayout)
controlsLayout.addWidget(group)
- lab = QtWidgets.QLabel(ctx['id'])
+ lab = QtWidgets.QLabel(ctx.id)
groupLayout.addWidget(lab)
self.frameLabel = QtWidgets.QLabel()
@@ -265,7 +265,7 @@ class MainWindow(QtWidgets.QWidget):
group.setLayout(groupLayout)
controlsLayout.addWidget(group)
- camera = ctx['camera']
+ camera = ctx.camera
for k, v in camera.properties.items():
lab = QtWidgets.QLabel()
@@ -308,4 +308,4 @@ class MainWindow(QtWidgets.QWidget):
self.label.setPixmap(pix)
self.frameLabel.setText('Queued: {}\nDone: {}\nFps: {:.2f}'
- .format(ctx['reqs-queued'], ctx['reqs-completed'], ctx['fps']))
+ .format(ctx.reqs_queued, ctx.reqs_completed, ctx.fps))
@@ -142,7 +142,7 @@ class QtRenderer:
self.window = window
def run(self):
- camnotif = QtCore.QSocketNotifier(self.state['cm'].efd, QtCore.QSocketNotifier.Read)
+ camnotif = QtCore.QSocketNotifier(self.state.cm.efd, QtCore.QSocketNotifier.Read)
camnotif.activated.connect(lambda _: self.readcam())
keynotif = QtCore.QSocketNotifier(sys.stdin.fileno(), QtCore.QSocketNotifier.Read)
@@ -155,7 +155,7 @@ class QtRenderer:
print('Exiting...')
def readcam(self):
- running = self.state['event_handler'](self.state)
+ running = self.state.event_handler()
if not running:
self.app.quit()
@@ -184,12 +184,12 @@ class MainWindow(QtWidgets.QWidget):
self.reqqueue = {}
self.current = {}
- for ctx in self.state['contexts']:
+ for ctx in self.state.contexts:
- self.reqqueue[ctx['idx']] = []
- self.current[ctx['idx']] = []
+ self.reqqueue[ctx.idx] = []
+ self.current[ctx.idx] = []
- for stream in ctx['streams']:
+ for stream in ctx.streams:
self.textures[stream] = None
num_tiles = len(self.textures)
@@ -312,12 +312,12 @@ class MainWindow(QtWidgets.QWidget):
if len(queue) == 0:
continue
- ctx = next(ctx for ctx in self.state['contexts'] if ctx['idx'] == ctx_idx)
+ ctx = next(ctx for ctx in self.state.contexts if ctx.idx == ctx_idx)
if self.current[ctx_idx]:
old = self.current[ctx_idx]
self.current[ctx_idx] = None
- self.state['request_prcessed'](ctx, old)
+ self.state.request_processed(ctx, old)
next_req = queue.pop(0)
self.current[ctx_idx] = next_req
@@ -336,8 +336,8 @@ class MainWindow(QtWidgets.QWidget):
size = self.size()
- for idx, ctx in enumerate(self.state['contexts']):
- for stream in ctx['streams']:
+ for idx, ctx in enumerate(self.state.contexts):
+ for stream in ctx.streams:
if self.textures[stream] is None:
continue
@@ -359,5 +359,5 @@ class MainWindow(QtWidgets.QWidget):
assert(b)
def handle_request(self, ctx, req):
- self.reqqueue[ctx['idx']].append(req)
+ self.reqqueue[ctx.idx].append(req)
self.update()