ReferenceClientAdvancedBuild Device Driver

Build a Device Driver

Creating custom hardware integrations with device drivers.

While the Synnax Driver is great for the devices it supports, you may need to integrate your own hardware with Synnax. This guide walks you through building a reliable, performant driver using the client libraries. The guide uses an Arduino as an example, but the patterns apply to any serial-based device.

Prerequisites

Before starting, ensure you have:

Python

TypeScript

  • Python 3.12+ with the Synnax client: pip install synnax - pyserial package: pip install pyserial

Read-Only Driver

This section sets up the Arduino to continuously read from an analog input and send the value over serial. The driver script captures each incoming value and writes it to a channel in Synnax.

Upload this code to your Arduino.

const int analogPin = A0;

void setup() {
  Serial.begin(9600);
}

void loop() {
  float analogValue = analogRead(analogPin);
  Serial.println(analogValue);
  delay(100); // ~10Hz sampling rate
}

Create a driver file. Find your Arduino’s port in the Arduino IDE (top-right corner when connected via USB).

Python

TypeScript

import serial

PORT = "/dev/ttyACM0"  # Update with your port
BAUD_RATE = 9600

ser = serial.Serial(PORT, BAUD_RATE)
if ser.is_open:
    print("Serial connection established")

Read values in a loop.

Python

TypeScript

while True:
    value = float(ser.readline().decode("utf-8").rstrip())
    print(value)

Create a client and two channels - an index channel for timestamps and a data channel for values.

Python

TypeScript

import synnax as sy

client = sy.Synnax(
    host="localhost",
    port=9090,
    username="synnax",
    password="seldon",
)

index_channel = client.channels.create(
    name="arduino_time",
    is_index=True,
    data_type="timestamp",
    retrieve_if_name_exists=True,
)

data_channel = client.channels.create(
    name="arduino_value",
    index=index_channel.key,
    data_type="float32",
    retrieve_if_name_exists=True,
)

Open a writer and stream data.

Python

TypeScript

with client.open_writer(
    start=sy.TimeStamp.now(),
    channels=["arduino_time", "arduino_value"]
) as writer:
    while True:
        value = float(ser.readline().decode("utf-8").rstrip())
        writer.write({
            "arduino_time": sy.TimeStamp.now(),
            "arduino_value": value,
        })

Python

TypeScript

driver_read.py
import synnax as sy
import serial

PORT = "/dev/ttyACM0"
BAUD_RATE = 9600

client = sy.Synnax(
    host="localhost",
    port=9090,
    username="synnax",
    password="seldon",
)

index_channel = client.channels.create(
    name="arduino_time",
    is_index=True,
    data_type="timestamp",
    retrieve_if_name_exists=True,
)

data_channel = client.channels.create(
    name="arduino_value",
    index=index_channel.key,
    data_type="float32",
    retrieve_if_name_exists=True,
)

ser = serial.Serial(PORT, BAUD_RATE)
if not ser.is_open:
    raise ConnectionError("Failed to establish serial connection")

with client.open_writer(
    start=sy.TimeStamp.now(),
    channels=["arduino_time", "arduino_value"],
) as writer:
    while True:
        value = float(ser.readline().decode("utf-8").rstrip())
        writer.write({
            "arduino_time": sy.TimeStamp.now(),
            "arduino_value": value,
        })

With the script running, set up a line plot in the Console:

Write-Only Driver

This section covers receiving commands from Synnax to control digital outputs on the Arduino.

Upload this code (uses the built-in LED on pin 13).

const int digitalPin = 13;

void setup() {
  Serial.begin(9600);
  pinMode(digitalPin, OUTPUT);
}

void loop() {
  if (Serial.available() > 0) {
    char command = Serial.read();
    if (command == '1') {
      digitalWrite(digitalPin, HIGH);
      Serial.println("ON");
    } else if (command == '0') {
      digitalWrite(digitalPin, LOW);
      Serial.println("OFF");
    }
  }
  delay(10);
}

Create a virtual command channel. Virtual channels do not store historical values. As a consequence, an index channel is not needed.

Python

TypeScript

command_channel = client.channels.create(
    name="arduino_command",
    data_type="uint8",
    virtual=True,
    retrieve_if_name_exists=True,
)

Open a streamer to receive commands and write them to serial.

Python

TypeScript

with client.open_streamer(["arduino_command"]) as streamer:
    for frame in streamer:
        command = str(frame["arduino_command"][0])
        ser.write(command.encode("utf-8"))

Python

TypeScript

driver_write.py
import synnax as sy
import serial

PORT = "/dev/ttyACM0"
BAUD_RATE = 9600

ser = serial.Serial(PORT, BAUD_RATE)
if not ser.is_open:
    raise ConnectionError("Failed to establish serial connection")

client = sy.Synnax(
    host="localhost",
    port=9090,
    username="synnax",
    password="seldon",
)

command_channel = client.channels.create(
    name="arduino_command",
    data_type="uint8",
    virtual=True,
    retrieve_if_name_exists=True,
)

with client.open_streamer(["arduino_command"]) as streamer:
    for frame in streamer:
        command = str(frame["arduino_command"][0])
        ser.write(command.encode("utf-8"))

Set up a switch on a schematic.

Read-Write Driver

You may have noticed the previous section set both “State” and “Command” to the same channel. This works, but has a problem: if the driver stops, clicking the switch still appears to work (it toggles visually) even though nothing happens on the hardware.

The solution is to use separate channels: a command channel for sending commands, and a state channel that reflects the actual hardware state. The switch only updates when the state channel confirms the change.

This code reads commands, controls the output, and sends back both state and analog values.

const int digitalPin = 13;
const int analogPin = A0;
int state = 0;

void setup() {
  Serial.begin(9600);
  pinMode(digitalPin, OUTPUT);
}

void loop() {
  if (Serial.available() > 0) {
    char command = Serial.read();
    if (command == '1') {
      digitalWrite(digitalPin, HIGH);
      state = 1;
    } else if (command == '0') {
      digitalWrite(digitalPin, LOW);
      state = 0;
    }
  }
  float analogValue = analogRead(analogPin);
  String output = String(state) + "," + String(analogValue);
  Serial.println(output);
  delay(10);
}

Create four channels: command, time, state, and value.

Python

TypeScript

arduino_command = client.channels.create(
    name="arduino_command",
    data_type="uint8",
    virtual=True,
    retrieve_if_name_exists=True,
)

arduino_time = client.channels.create(
    name="arduino_time",
    is_index=True,
    data_type="timestamp",
    retrieve_if_name_exists=True,
)

arduino_state = client.channels.create(
    name="arduino_state",
    index=arduino_time.key,
    data_type="uint8",
    retrieve_if_name_exists=True,
)

arduino_value = client.channels.create(
    name="arduino_value",
    index=arduino_time.key,
    data_type="float32",
    retrieve_if_name_exists=True,
)

Stream commands while writing state and values.

Python

TypeScript

with client.open_streamer(["arduino_command"]) as streamer:
    with client.open_writer(
        start=sy.TimeStamp.now(),
        channels=["arduino_time", "arduino_state", "arduino_value"],
    ) as writer:
        while True:
            fr = streamer.read(timeout=0)
            if fr is not None:
                command = str(fr["arduino_command"][0])
                ser.write(command.encode("utf-8"))
            data = ser.readline().decode("utf-8").rstrip()
            if data:
                split = data.split(",")
                writer.write({
                    "arduino_time": sy.TimeStamp.now(),
                    "arduino_state": int(split[0]),
                    "arduino_value": float(split[1]),
                })

Python

TypeScript

driver.py
import synnax as sy
import serial

PORT = "/dev/ttyACM0"
BAUD_RATE = 9600

ser = serial.Serial(PORT, BAUD_RATE)
if not ser.is_open:
    raise ConnectionError("Failed to establish serial connection")

client = sy.Synnax(
    host="localhost",
    port=9090,
    username="synnax",
    password="seldon",
)

arduino_command = client.channels.create(
    name="arduino_command",
    data_type="uint8",
    virtual=True,
    retrieve_if_name_exists=True,
)

arduino_time = client.channels.create(
    name="arduino_time",
    is_index=True,
    data_type="timestamp",
    retrieve_if_name_exists=True,
)

arduino_state = client.channels.create(
    name="arduino_state",
    index=arduino_time.key,
    data_type="uint8",
    retrieve_if_name_exists=True,
)

arduino_value = client.channels.create(
    name="arduino_value",
    index=arduino_time.key,
    data_type="float32",
    retrieve_if_name_exists=True,
)

with client.open_streamer(["arduino_command"]) as streamer:
    with client.open_writer(
        start=sy.TimeStamp.now(),
        channels=["arduino_time", "arduino_state", "arduino_value"],
    ) as writer:
        while True:
            fr = streamer.read(timeout=0)
            if fr is not None:
                command = str(fr["arduino_command"][0])
                ser.write(command.encode("utf-8"))
            data = ser.readline().decode("utf-8").rstrip()
            if data:
                split = data.split(",")
                writer.write({
                    "arduino_time": sy.TimeStamp.now(),
                    "arduino_state": int(split[0]),
                    "arduino_value": float(split[1]),
                })

Now configure the switch with separate channels:

  • Command: arduino_command
  • State: arduino_state

Add a line plot for arduino_value to see the analog input.

Production Drivers

For production-grade hardware integrations, see the C++ Driver documentation. The C++ driver offers:

  • Built-in support for LabJack, National Instruments, OPC UA, and Modbus devices
  • Better performance for high-frequency data acquisition
  • Device pooling and connection management
  • Cross-platform support (Windows, macOS, Linux)
  • Integration with the Synnax task management system