Konubinix' opinionated web of thoughts

Get Camera With Android and Kivy

Fleeting

In general, I prefer streaming video through WebRTC, yet let’s dig into this rabbit hole in case it proves useful one day.

analysis

kivy.uix.camera.Camera wraps (in the _camera attribute) kivy.core.camera.Camera that is extended by kivy.core.camera.camera_android.CameraAndroid

For the time being, it only wraps the deprecated android.hardware.Camera API, not camera2.

CameraAndroid provides low-level functions.

It also provides the following methods:

[...]

def grab_frame(self):
    """
    Grab current frame (thread-safe, minimal overhead)
    """
    with self._buflock:
        if self._buffer is None:
            return None
        buf = self._buffer.tostring()
        return buf

def decode_frame(self, buf):
    """
    Decode image data from grabbed frame.

    This method depends on OpenCV and NumPy - however it is only used for
    fetching the current frame as a NumPy array, and not required when
    this :class:`CameraAndroid` provider is simply used by a
    :class:`~kivy.uix.camera.Camera` widget.
    """
    import numpy as np
    from cv2 import cvtColor

    w, h = self._resolution
    arr = np.fromstring(buf, 'uint8').reshape((h + h / 2, w))
    arr = cvtColor(arr, 93)  # NV21 -> BGR
    return arr

def read_frame(self):
    """
    Grab and decode frame in one call
    """
    return self.decode_frame(self.grab_frame())


[...]

Calling read_frame fails on the call to reshape((h + h / 2, w)) because, in Python 3, h + h / 2 is a float where an int is expected. Casting to an int fixes it, but I have not submitted a pull request yet, so let’s work around this for now.
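
The arithmetic behind that reshape can be checked without a phone. Here is a minimal sketch, assuming the NV21 layout (a full-resolution luma plane followed by a half-height plane of interleaved chroma). The names nv21_shape and nv21_size are hypothetical helpers, not part of kivy.

```python
def nv21_shape(width, height):
    """Shape of the 2D uint8 array holding one NV21 frame.

    NV21 stores `height` rows of luma followed by `height // 2` rows of
    interleaved chroma, each row being `width` bytes long.
    """
    # Floor division keeps the result an int on both Python 2 and Python 3,
    # whereas `height / 2` yields a float on Python 3.
    return (height + height // 2, width)


def nv21_size(width, height):
    """Number of bytes expected in one NV21 frame."""
    rows, cols = nv21_shape(width, height)
    return rows * cols


print(nv21_shape(640, 480))  # (720, 640)
print(nv21_size(640, 480))   # 460800
```

Comparing nv21_size with the length of a grabbed buffer is a cheap way to notice a truncated frame or a resolution mismatch before calling reshape.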

Besides, I feel that depending on cv2 and numpy being available on the phone might not be a good idea.

Therefore, I’d rather send the raw data somewhere more conventional and process the data there.
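
For a quick sanity check that needs neither cv2 nor numpy, the luma plane alone can be viewed as a grayscale image. This is only a sketch, assuming the frame is NV21, so that its first width * height bytes are the luma plane. It uses the binary PGM format, which is a short text header followed by raw bytes. The name nv21_to_pgm is a hypothetical helper.

```python
def nv21_to_pgm(frame, width, height):
    """Return the bytes of a binary PGM (grayscale) image built from the
    luma plane of an NV21 frame, ignoring the chroma data."""
    luma_size = width * height
    if len(frame) < luma_size:
        raise ValueError("frame too short: got {} bytes, need at least {}"
                         .format(len(frame), luma_size))
    # PGM header: magic number, dimensions, maximum gray value.
    header = "P5\n{} {}\n255\n".format(width, height).encode("ascii")
    return header + bytes(frame[:luma_size])
```

Writing the returned bytes to a .pgm file gives an image that most viewers can open, which helps tell a camera problem from a decoding problem.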

the code

Based on this analysis, we can write this code.

from android.permissions import Permission, request_permissions

from kivy.uix.camera import Camera
import cv2
import numpy as np

request_permissions([Permission.CAMERA])
cam = Camera(resolution=(1680, 1256))._camera
cam.start()

w, h = cam._resolution

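frame = cam.grab_frame()  # None as long as no frame has been buffered yet
cv2.imwrite(
    "/tmp/test.png",
    cv2.cvtColor(
        # NV21: h rows of luma followed by h // 2 rows of interleaved chroma
        np.frombuffer(frame, dtype='uint8').reshape((h + h // 2, w)),
        cv2.COLOR_YUV2BGR_NV21))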

To list the available resolutions, you can run this:

from jnius import autoclass


def list_camera_resolutions():
    # Access the (deprecated) android.hardware.Camera class through pyjnius
    Camera = autoclass('android.hardware.Camera')

    # Open the camera (use camera index 0 for the rear camera)
    camera = Camera.open(0)
    params = camera.getParameters()

    # Get supported picture sizes
    sizes = params.getSupportedPictureSizes()

    # Extract width and height from each size
    if hasattr(sizes, "__iter__"):
        resolutions = [(size.width, size.height) for size in sizes]
    else: # the old version of jnius in the old poc does not provide iterators
        resolutions = []
        for i in range(sizes.size()):
            resolutions.append((
                sizes.get(i).width,
                sizes.get(i).height,
            ))

    # Release the camera after use
    camera.release()

    return resolutions
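
Assuming the resolution given to Camera should match one of the listed sizes, the closest supported one can be picked as sketched below. The name closest_resolution is a hypothetical helper, and the sample list is made up to stand in for whatever list_camera_resolutions() returns on a given phone.

```python
def closest_resolution(resolutions, wanted):
    """Return the (width, height) pair from `resolutions` whose dimensions
    are closest to `wanted`, using the sum of absolute differences."""
    if not resolutions:
        raise ValueError("no resolution available")
    wanted_w, wanted_h = wanted
    return min(
        resolutions,
        key=lambda size: abs(size[0] - wanted_w) + abs(size[1] - wanted_h),
    )


# Made-up sample values, standing in for list_camera_resolutions() output.
sample = [(320, 240), (640, 480), (1280, 720), (1680, 1256)]
print(closest_resolution(sample, (800, 600)))  # (640, 480)
```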

This cannot be run in a service, or kivy will complain with:

11-04 10:03:21.472 11403 11425 I poc     : [CRITICAL] [Window      ] Unable to find any valuable Window provider. Please enable debug logging (e.g. add -d if running from the command line, or change the log level in the config) and re-run your app to identify potential causes
11-04 10:03:21.472 11403 11425 I poc     : sdl2 - RuntimeError: b"Application didn't initialize properly, did you include SDL_main.h in the file containing your main() function?"
11-04 10:03:21.473 11403 11425 I poc     :   File "/app/.buildozer/android/platform/build-arm64-v8a_armeabi-v7a/build/python-installs/poc/arm64-v8a/kivy/core/__init__.py", line 71, in core_select_lib
11-04 10:03:21.473 11403 11425 I poc     :   File "/app/.buildozer/android/platform/build-arm64-v8a_armeabi-v7a/build/python-installs/poc/arm64-v8a/kivy/core/window/window_sdl2.py", line 165, in __init__
11-04 10:03:21.473 11403 11425 I poc     :   File "/app/.buildozer/android/platform/build-arm64-v8a_armeabi-v7a/build/python-installs/poc/arm64-v8a/kivy/core/window/__init__.py", line 1129, in __init__
11-04 10:03:21.473 11403 11425 I poc     :   File "/app/.buildozer/android/platform/build-arm64-v8a_armeabi-v7a/build/python-installs/poc/arm64-v8a/kivy/core/window/window_sdl2.py", line 316, in create_window
11-04 10:03:21.473 11403 11425 I poc     :   File "kivy/core/window/_window_sdl2.pyx", line 121, in kivy.core.window._window_sdl2._WindowSDL2Storage.setup_window
11-04 10:03:21.473 11403 11425 I poc     :   File "kivy/core/window/_window_sdl2.pyx", line 76, in kivy.core.window._window_sdl2._WindowSDL2Storage.die
11-04 10:03:21.473 11403 11425 I poc     :
11-04 10:03:21.473 11403 11425 I poc     : [CRITICAL] [App         ] Unable to get a Window, abort.
11-04 10:03:21.694  1884  2629 I ActivityManager: Process eu.konix.poc:service_poc (pid 11403) has died: prcp FGS

But it still works even if the screen is off, making it a good choice to preserve the phone battery.

on an old phone

On my wiko cink peax, with python2 and an old kivy, the camera only updates when the application is not waiting for input. That means that I cannot expose a rpyc endpoint and manually trigger a photo from there. Yet, the following still works and dumps a photograph every 5 seconds.

The camera takes a long time to start, hence the debug prints and the vibration when it is ready. Also, it might need a few stop/start cycles to work correctly. As soon as you see the capture button, it should be good to go.

import time

import plyer
from kivy.app import App
from kivy.clock import Clock
from kivy.lang import Builder
from kivy.uix.boxlayout import BoxLayout
from kivy.uix.camera import Camera

Builder.load_string("""
<CameraClick>:
    orientation: "vertical"
    Button:
        text: "Capture"
        size_hint_y: None
        height: "48dp"
        on_press: root.capture()
""")


class CameraClick(BoxLayout):

    def __init__(self):
        super(CameraClick, self).__init__()
        self.cam = None
        Clock.schedule_interval(self.capture, 5)

    def capture(self, *args):
        if not self.cam:
            print("AAAAAAAAAAAAAAAAAAAAAAAA")
            self.cam = Camera(resolution=(1680, 1256))._camera
            print("BBBBBBBBBBBBBBBB")
            self.cam.start()
            print("CCCCCCCCCCCCCCCCCCCCC")
            plyer.vibrator.vibrate()

        frame = self.cam.grab_frame()
        if frame:
            print("AAAAAAA  Got one")
            # The frame is raw binary data, hence the binary mode
            with open("/sdcard/test.txt", "wb") as f:
                f.write(frame)
        else:
            print("AAAAAAA  Got none")


class TestCamera(App):

    def build(self):
        return CameraClick()


def run():
    TestCamera().run()

Also, it won’t work when locking the phone and will hang on unlocking.

digging a bit further into the old phone idea: create a timelapse

Put the following code on the phone:

import time

import plyer
from kivy.app import App
from kivy.clock import Clock
from kivy.lang import Builder
from kivy.uix.boxlayout import BoxLayout
from kivy.uix.camera import Camera
import os
from datetime import datetime
from kivy.uix.screenmanager import Screen, ScreenManagerException, ScreenManager

Builder.load_string("""
<Timelapse>:
    orientation: "vertical"

    BoxLayout:
        orientation: "horizontal"
        Label:
            text: "width"

        TextInput:
            # setting the id of the widget
            id: width
            text: '640'
    BoxLayout:
        orientation: "horizontal"
        Label:
            text: "height"

        TextInput:
            # setting the id of the widget
            id: height
            text: '480'

    BoxLayout:
        orientation: "horizontal"
        Label:
            text: "period (s)"

        TextInput:
            # setting the id of the widget
            id: period
            text: '0.5'

    Spinner:
        id: action
        values: "upload", "save"
        text: "save"

    Label:
        id: message
        text: 'Press start to begin'

    Button:
        id: toggle
        text: "Start"
        size_hint_y: None
        height: "48dp"
        on_press: root.toggle()
""")

class TimelapseScreen(Screen):
    def __init__(self, *args, **kwargs):
        super(TimelapseScreen, self).__init__(*args, **kwargs)
        self.timelapse = Timelapse()
        self.add_widget(self.timelapse)
        self.bind(on_pre_leave=self.pre_leave_handler)

    def pre_leave_handler(self, *args):
        self.timelapse.stop()

class Timelapse(BoxLayout):

    def __init__(self):
        super(Timelapse, self).__init__()
        self.cam = None
        self.clock = None

    def stop(self):
        if self.cam:
            self.cam._release_camera()
            self.cam = None
        if self.clock:
            Clock.unschedule(self.clock)
            self.clock = None
        self.ids.message.text = "Press start to begin"

    def start(self, *args):
        self.clock = Clock.schedule_interval(self.capture, float(self.ids.period.text))

    def toggle(self, *args):
        if self.clock:
            self.stop()
            self.ids.toggle.text = "Start"
        else:
            self.start()
            self.ids.toggle.text = "Stop"

    def capture(self, *args):
        if not self.cam:
            print("Initializing Camera")
            self.cam = Camera(
                resolution=(int(self.ids.width.text),
                            int(self.ids.height.text)),
                index=1,
            )._camera
            self.cam.start()
            plyer.vibrator.vibrate()

        frame = self.cam.grab_frame()
        if frame:
            print("Frame captured")
            filename = self.generate_filename()
            getattr(self, self.ids.action.text)(frame, filename)
            self.ids.message.text = "{}: {}".format(self.ids.action.text, filename)
        else:
            print("No frame captured")

    def generate_filename(self):
        now = datetime.now()
        formatted_time = now.strftime("%Y%m%d_%H%M%S") + ".{:06d}".format(now.microsecond)
        filename = "{}_{}x{}.bin".format(formatted_time, self.ids.width.text,
                                         self.ids.height.text)
        return filename

    def save(self, frame, filename):
        # Write the raw NV21 bytes; the with block closes the file right away
        with open(os.path.join("/sdcard", filename), "wb") as f:
            f.write(frame)

    def upload(self, frame, filename):
        import urllib2

        url = "http://192.168.1.245:9999/upload?filename={}".format(filename)
        headers = {'Content-Type': 'application/octet-stream'}

        request = urllib2.Request(url, frame, headers)

        try:
            response = urllib2.urlopen(request)
            print("Upload successful: ", response.read())
        except urllib2.URLError as e:
            print("Upload failed: ", e)


class TimelapseApp(App):

    @staticmethod
    def populate(sm):
        try:
            sm.get_screen('timelapse')
            return
        except ScreenManagerException:
            pass # not populated yet

        sm.add_widget(TimelapseScreen(name='timelapse'))

    def build(self):
        sm = ScreenManager()
        self.populate(sm)
        return sm


def run():
    TimelapseApp().run()

This assumes that the server’s IP address is 192.168.1.245. As for the number of images per second, your mileage may vary. With the wiko cink peax on my network, I can go up to 5 uploads per second.

Then, run a server locally to receive the uploaded content. Fortunately, I already have a docker image for that.

docker run -ti -e DIR=/upload/ -e PORT=9999 -p 9999:9999 -v $(pwd):/upload konubinix/uploader:0.1.1
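
If docker is not at hand, a stand-in receiver can be sketched with the Python 3 standard library. This is not the code of the konubinix/uploader image. It only reflects what the upload method above sends: a POST to /upload?filename=... whose body is the raw frame. The names UploadHandler and make_upload_server are hypothetical.

```python
import os
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qs, urlparse


class UploadHandler(BaseHTTPRequestHandler):
    upload_dir = "."  # overridden by make_upload_server

    def do_POST(self):
        url = urlparse(self.path)
        names = parse_qs(url.query).get("filename")
        if url.path != "/upload" or not names:
            self.send_error(400, "expected /upload?filename=...")
            return
        # Keep only the last path component so that a client cannot write
        # outside of the upload directory.
        name = os.path.basename(names[0])
        if not name or name in (".", ".."):
            self.send_error(400, "invalid filename")
            return
        length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(length)
        with open(os.path.join(self.upload_dir, name), "wb") as f:
            f.write(body)
        reply = "saved {} ({} bytes)\n".format(name, len(body)).encode()
        self.send_response(200)
        self.send_header("Content-Type", "text/plain")
        self.send_header("Content-Length", str(len(reply)))
        self.end_headers()
        self.wfile.write(reply)


def make_upload_server(upload_dir, port):
    """Build (without starting) a server that stores uploads in upload_dir."""
    handler = type("BoundUploadHandler", (UploadHandler,),
                   {"upload_dir": upload_dir})
    return HTTPServer(("0.0.0.0", port), handler)
```

Calling make_upload_server(".", 9999).serve_forever() would then listen on the port used by the phone code and drop the .bin files in the current directory.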

Then post-process all the files with some code like this:

import os
from pathlib import Path

import cv2
import numpy as np


def process_bin_files(directory):
    bin_files = Path(directory).glob('*.bin')

    for bin_file in bin_files:
        with bin_file.open('rb') as f:
            try:
                base_name = bin_file.stem
                _, dimensions = base_name.rsplit('_', 1)
                w, h = map(int, dimensions.split('x'))
            except ValueError:
                print(
                    f"Error parsing dimensions from filename: {bin_file.name}")
                continue

            png_filename = bin_file.with_suffix('.png')
            cv2.imwrite(
                str(png_filename),
                cv2.cvtColor(
                    np.frombuffer(f.read(), dtype='uint8').reshape(
                        (int(h + h / 2), w)), cv2.COLOR_YUV2BGR_NV21))

        print(f"Processed {bin_file.name} and saved as {png_filename.name}")

        bin_file.unlink()
        print(f"Deleted original file: {bin_file.name}")


if __name__ == "__main__":
    process_bin_files(".")

Let’s call that file parser.py. You simply need to install opencv and run it:

python3 -m venv venv
source ./venv/bin/activate
python3 -m pip install opencv-python
python3 parser.py

Now, let’s go even deeper and create a video from those images (the timelapse).

import cv2
import os
from pathlib import Path

def create_video_from_images(image_folder, output_file, fps):
    # Get a list of all PNG files in the specified folder
    image_files = sorted(Path(image_folder).glob('*.png'))  # Sort to ensure correct order

    # Check if there are images
    if not image_files:
        print("No PNG files found in the specified folder.")
        return

    # Read the first image to get the width and height
    first_image = cv2.imread(str(image_files[0]))
    height, width, layers = first_image.shape

    # Define the codec and create a VideoWriter object
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Codec for MP4 format
    video = cv2.VideoWriter(output_file, fourcc, fps, (width, height))

    # Write each image to the video
    for image_file in image_files:
        img = cv2.imread(str(image_file))
        video.write(img)  # Add image to the video

    # Release the video writer
    video.release()
    print(f"Video created successfully: {output_file}")

if __name__ == "__main__":
    # Specify the folder containing PNG files, output video file name, and FPS
    image_folder = "."  # Current directory
    output_file = "output_video.mp4"
    fps = 30  # Specify frames per second

    create_video_from_images(image_folder, output_file, fps)
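
How fast the resulting video runs compared to real time depends on both the capture period used on the phone and the fps given here. A small sketch of that arithmetic follows. The names timelapse_speedup and video_duration are hypothetical helpers.

```python
def timelapse_speedup(capture_period, fps):
    """How many seconds of real time go by during one second of video.

    capture_period: seconds between two captured frames.
    fps: frames per second of the generated video.
    """
    return capture_period * fps


def video_duration(frame_count, fps):
    """Length of the generated video, in seconds."""
    return frame_count / float(fps)


# One frame every 0.5 s played back at 30 fps runs 15 times faster than
# real time, and one hour of capture (7200 frames) gives a 4 minute video.
print(timelapse_speedup(0.5, 30))  # 15.0
print(video_duration(7200, 30))    # 240.0
```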