NAIS

The NAIS project originated from an attempt to build a tool for multiplexing a serial port between ASCII characters (printf strings) and binary-encoded structures that convey complex requests and responses.

NAIS concepts are currently implemented in Python: pynais is an asyncio-based package that implements a micro-router-like service between:

  • TCP-connected apps
  • websocket-connected apps
  • boards using a serial line (USB)

The current NAIS version targets RIOT [1] based firmware running on a cc3200 launchpad [2] development kit.

NAIS is currently developed and tested on Linux.

A simple example …

A board connected to a USB port and a websocket based UI running in a web browser are linked together:

import pynais as ns

board = ns.SerialLine(device='/dev/ttyUSB0')
web_page = ns.WSLine(port=3000)

ns.junction.route(src=web_page, dst=board,
                  src_to_dst=to_protobuf, dst_to_src=to_json)

ns.junction.run()

junction.route declares the bidirectional link between the board and the web page.

The src_to_dst argument names a callback function that implements the custom transformation of messages going from the web_page (the source) to the board (the destination):

def to_protobuf(in_packet):
    """Transform a json packet into its protobuf equivalent.

    Args:
        in_packet (bytes): UTF-8 encoded json string
    Returns:
        a protobuf object, or None if the input packet cannot be
        transformed. A packet that maps to None is silently discarded.
    """
    LOG.debug("to_protobuf - input: %s, msg: %s",
              type(in_packet), in_packet)

    obj = json.loads(in_packet.decode())

    packet = None

    # check if json packet is a led command
    if 'leds' in obj:
        if obj['leds'] == 'get':
            packet = ns.marshall(cmd.Leds())
        elif obj['leds'] == 'set':
            # a color missing from the request is passed as None
            packet = ns.marshall(
                cmd.Leds(obj.get('red'),
                         obj.get('green'),
                         obj.get('yellow')))
    else:
        LOG.info("to_protobuf - unable to convert message %s",
                 in_packet)
    return packet

The dst_to_src argument names the callback that transforms the messages going from the board (the destination) to the web_page (the source):

def to_json(in_packet):
    """Convert a protobuf into a json message
    """
    LOG.debug("to_json - input: %s, msg: %s", type(in_packet),
              in_packet)

    # from protobuf to json is just a matter of unmarshalling
    if ns.is_protobuf(in_packet):
        obj = ns.unmarshall(in_packet)
        if ns.is_ack(obj, command_type=cmd.Leds):
            mask = obj.sts
            obj = cmd.Leds()
            obj.set_status(mask)
        return obj
    else:
        LOG.debug("to_json - |%r| > /dev/null", in_packet)
        # do not send the message
        return None

These snippets complete the implementation of the micro-router (see below for the complete source file).
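
The role of the two callbacks can be illustrated without pynais. The following is a minimal sketch, not the pynais implementation: relay, upper_or_drop and the queues are hypothetical names, and the sketch relies only on what is stated above, namely that each direction has its own transformation and that a packet transformed to None is silently discarded.

```python
import asyncio


async def relay(inbox, outbox, transform):
    """Forward packets from inbox to outbox through transform.

    Hypothetical helper: a None read from inbox stops the relay,
    a None returned by transform drops the packet.
    """
    while True:
        packet = await inbox.get()
        if packet is None:          # sentinel: stop relaying
            break
        out = transform(packet)
        if out is not None:         # None means "discard silently"
            await outbox.put(out)


def upper_or_drop(packet):
    """Toy transformation: drop empty packets, uppercase the rest."""
    return packet.upper() if packet else None


async def demo():
    src, dst = asyncio.Queue(), asyncio.Queue()
    for packet in (b"get", b"", b"set", None):
        src.put_nowait(packet)
    await relay(src, dst, upper_or_drop)
    forwarded = []
    while not dst.empty():
        forwarded.append(dst.get_nowait())
    return forwarded


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A bidirectional link would run two such relays, one per direction, each with its own callback.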

Now a JavaScript web component can open a websocket client (port 3000) and send the json-formatted message:

{"leds": "set", "red": "on"}

The NAIS junction engine receives the json string, transforms it into a protobuf-encoded payload and sends it through the serial port to the cc3200 board.

The red led switches on and a protobuf-encoded acknowledge message is sent back over the UART port.

The protobuf payload is transformed into a custom json structure and the message is finally sent to the web component, which receives the string:

{"yellow": "on", "green": "off", "red": "on"}
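
Seen from the client side, this exchange is plain json. A minimal sketch using only the Python standard library follows; the helper names build_led_request and parse_led_reply are hypothetical and not part of pynais.

```python
import json


def build_led_request(**leds):
    """Encode a 'set' request; keyword names are led colors."""
    message = {"leds": "set"}
    message.update(leds)
    return json.dumps(message).encode()


def parse_led_reply(reply):
    """Return the set of leds reported as 'on' in a json reply."""
    status = json.loads(reply)
    return {color for color, state in status.items() if state == "on"}


# bytes to send over the websocket
request = build_led_request(red="on")
# leds that are lit according to the reply shown above
lit = parse_led_reply('{"yellow": "on", "green": "off", "red": "on"}')
```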

NAIS and Protocol Buffers

The cc3200 firmware and the NAIS junction support the Google Protocol Buffers [3] message encoding.

For example, the Ack protobuf specification is:

message Ack {
    required int32 id = 1;
    optional int32 seq = 2;
    optional int32 status = 3;
}

The id field holds the id of the request message.
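
To make the encoding concrete, here is a minimal sketch of how the fields of such an Ack message are laid out on the wire, written by hand from the Protocol Buffers encoding rules rather than with generated code. encode_varint and encode_ack are hypothetical helpers, the sketch handles non-negative values only, and any extra framing pynais may add on the serial line is not covered.

```python
def encode_varint(value):
    """Encode a non-negative integer as a protobuf base-128 varint."""
    if value < 0:
        raise ValueError("this sketch handles non-negative values only")
    out = bytearray()
    while True:
        low7 = value & 0x7F
        value >>= 7
        if value:
            out.append(low7 | 0x80)   # continuation bit: more bytes follow
        else:
            out.append(low7)
            return bytes(out)


def encode_ack(msg_id, seq=None, status=None):
    """Serialize the Ack fields; optional fields left as None are omitted."""
    out = bytearray()
    for field_number, value in ((1, msg_id), (2, seq), (3, status)):
        if value is None:
            continue
        tag = (field_number << 3) | 0   # wire type 0 = varint
        out += encode_varint(tag) + encode_varint(value)
    return bytes(out)


payload = encode_ack(msg_id=1, seq=2, status=5)
print(payload.hex())   # 080110021805
```

Each field costs one tag byte plus the varint of its value, so this three-field Ack fits in six bytes.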

The bits of the status field report the status of the board leds: 3 bits, one each for the red, green and yellow led.
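
Decoding such a bit mask could look like the sketch below. The bit positions (red in bit 0, green in bit 1, yellow in bit 2) are an assumption made for illustration; the actual assignment is defined by the firmware and is not stated here. decode_leds is a hypothetical helper.

```python
# ASSUMPTION: these bit positions are illustrative, not taken from the firmware.
LED_BITS = {"red": 0, "green": 1, "yellow": 2}


def decode_leds(status):
    """Map each led name to 'on' or 'off' from a 3-bit status mask."""
    return {
        color: "on" if status & (1 << bit) else "off"
        for color, bit in LED_BITS.items()
    }


print(decode_leds(0b101))
```

Under this assumed layout, a status of 0b101 means red and yellow on, green off.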

Simple junction

The complete source for the simple junction is listed below. Run it with:

bash> python simple_junction.py

simple_junction.py:

import logging
import sys
import json
import click
import pynais as ns
import commands as cmd

logging.basicConfig(
    level=logging.DEBUG,
    format='%(levelname)s:%(name)s:%(lineno)s: %(message)s',
    stream=sys.stderr
)
LOG = logging.getLogger()


def to_protobuf(in_packet):
    """Transform a json packet into its protobuf equivalent.

    Args:
        in_packet (bytes): UTF-8 encoded json string
    Returns:
        a protobuf object, or None if the input packet cannot be
        transformed. A packet that maps to None is silently discarded.
    """
    LOG.debug("to_protobuf - input: %s, msg: %s",
              type(in_packet), in_packet)

    obj = json.loads(in_packet.decode())

    packet = None

    # check if json packet is a led command
    if 'leds' in obj:
        if obj['leds'] == 'get':
            packet = ns.marshall(cmd.Leds())
        elif obj['leds'] == 'set':
            # a color missing from the request is passed as None
            packet = ns.marshall(
                cmd.Leds(obj.get('red'),
                         obj.get('green'),
                         obj.get('yellow')))
    else:
        LOG.info("to_protobuf - unable to convert message %s",
                 in_packet)
    return packet


def to_json(in_packet):
    """Convert a protobuf into a json message
    """
    LOG.debug("to_json - input: %s, msg: %s", type(in_packet),
              in_packet)

    # from protobuf to json is just a matter of unmarshalling
    if ns.is_protobuf(in_packet):
        obj = ns.unmarshall(in_packet)
        if ns.is_ack(obj, command_type=cmd.Leds):
            mask = obj.sts
            obj = cmd.Leds()
            obj.set_status(mask)
        return obj
    else:
        LOG.debug("to_json - |%r| > /dev/null", in_packet)
        # do not send the message
        return None


@click.command()
@click.option('--serial/--no-serial', default=True,
              help='use the real board (default) or a software simulator')
def main(serial):

    web_page = ns.WSLine(port=3000)
    if serial:
        # the real board
        board = ns.SerialLine()
    else:
        # a software simulator
        board = ns.TcpLine(port=2001)

    ns.junction.route(src=web_page, dst=board,
                      src_to_dst=to_protobuf, dst_to_src=to_json)

    ns.junction.run()


if __name__ == "__main__":
    main()