Source code for pyardrone.navdata

from ctypes import sizeof
from types import SimpleNamespace
import socket
import threading

from pyardrone.navdata.options import Metadata, OptionHeader, index
from pyardrone.abc import BaseClient


header = 0x55667788


class NavDataError(Exception):
    pass


class ChecksumError(NavDataError):
    pass


class IncorrectChecksum(ChecksumError):
    pass


class ChecksumNotPresent(ChecksumError):
    pass


class InvalidSize(NavDataError):
    pass


def compute_checksum(buffer):
    return sum(buffer) & 0xffffffff





class NavDataClient(BaseClient):

    def __init__(self, host, port, timeout=0.01):
        self.host = host
        self.port = port
        self.timeout = timeout
        self.navdata_ready = threading.Event()

    def _listener_job(self):
        while not self.closed:
            try:
                data, addr = self.sock.recvfrom(4096)
            except socket.timeout:
                pass
            else:
                if addr == (self.host, self.port):
                    self.navdata_received(data)
                    self.navdata_ready.set()

    def _connect(self):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.settimeout(self.timeout)

        self.sock.sendto(
            b'\x01\x00\x00\x00',
            (self.host, self.port)
        )
        self._thread = threading.Thread(target=self._listener_job)
        self._thread.start()

    def _close(self):
        self._thread.join()
        self.sock.close()

    def navdata_received(self, data):
        self.navdata = NavData(data)