MicroPython: ESP32 ESP-NOW Two-Way Communication

Learn how to establish two-way communication between two ESP32 boards using the ESP-NOW communication protocol. First, we’ll test a simple example to show you how to implement two-way communication. Then, we’ll build a more complex project in which the boards exchange sensor readings and display the results on an OLED display.


Using Arduino IDE? Follow this tutorial instead: ESP-NOW Two-Way Communication Between ESP32 Boards.


Prerequisites – MicroPython Firmware

To follow this tutorial, you need MicroPython firmware installed on your ESP32 boards. You also need an IDE to write and upload the code to your boards. We suggest using Thonny IDE.

New to MicroPython? Check out our eBook: MicroPython Programming with ESP32 and ESP8266 eBook (2nd Edition)

Introducing ESP-NOW

ESP-NOW is a wireless communication protocol developed by Espressif that allows multiple ESP32 or ESP8266 boards to exchange small amounts of data without connecting to a Wi-Fi network or using Bluetooth. ESP-NOW does not require a full Wi-Fi connection (though the Wi-Fi controller must be turned on), making it ideal for low-power and low-latency applications like sensor networks, remote controls, or data exchange between boards.

ESP-NOW - ESP32 Logo

ESP-NOW uses a connectionless communication model, meaning devices can send and receive data without connecting to a router or setting up an access point (unlike HTTP communication between boards). It supports unicast (sending data to a specific device using its MAC address) and broadcast (sending data to all nearby devices using a broadcast MAC address) messaging.
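To make the two addressing modes concrete, here is a minimal sketch. The helper name is_broadcast is hypothetical (it is not part of the espnow module). The only fact it relies on is that the ESP-NOW broadcast address is six 0xFF bytes.

```python
# ESP-NOW broadcast address: six 0xFF bytes (ff:ff:ff:ff:ff:ff)
BROADCAST_MAC = b'\xff' * 6

def is_broadcast(mac):
    """Return True if mac is the broadcast address (hypothetical helper)."""
    return bytes(mac) == BROADCAST_MAC

print(is_broadcast(b'\xff\xff\xff\xff\xff\xff'))  # True
print(is_broadcast(b'\x30\xae\xa4\xf6\x7d\x4c'))  # False
```

Any other 6-byte address is treated as a unicast target, which is what we use in the examples of this tutorial.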

New to ESP-NOW? Read our getting started guide for MicroPython using ESP-NOW with ESP32.

ESP32: Getting Board MAC Address

To communicate via ESP-NOW, you need to know the MAC address of your boards. To get your board’s MAC Address, you can copy the following code to Thonny IDE and run it on your board.

# Rui Santos & Sara Santos - Random Nerd Tutorials
# Complete project details at https://RandomNerdTutorials.com/micropython-esp-now-esp32/
 
import network

wlan = network.WLAN(network.STA_IF)
wlan.active(True)

# Get MAC address (returns bytes)
mac = wlan.config('mac')

# Convert to human-readable format
mac_address = ':'.join('%02x' % b for b in mac)

print("MAC Address:", mac_address)


After running the code, it should print the board’s MAC address on the shell.
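Since the ESP-NOW examples expect the MAC address as a bytes literal, it can be handy to print it in that form too. The following is a minimal sketch; mac_to_literal is a hypothetical helper name, and the sample address is only an illustration.

```python
def mac_to_literal(mac):
    """Format raw MAC bytes as text that can be pasted into the code."""
    return "b'" + "".join("\\x%02x" % b for b in mac) + "'"

# Sample value; on the board, pass wlan.config('mac') instead
sample_mac = b'\x30\xae\xa4\xf6\x7d\x4c'
print("Paste into your code:", mac_to_literal(sample_mac))
```

You can then copy the printed text directly into the peer_mac line of the other board's code.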

Thonny IDE - Get ESP32 Board MAC Address - MicroPython

We recommend that you add a label or sticker to each board so that you can clearly identify them.

Two ESP32 boards with a label with their MAC addresses

We’ll be using an ESP32 DOIT V1 board and an ESP32 S3 DevKitC, but this should be compatible with any ESP32 model.

ESP-NOW Two-Way Communication Between Two ESP32 Boards (MicroPython)

In this section, we’ll show you a basic example of how to exchange simple messages between ESP32 boards using the ESP-NOW communication protocol. Each board is a receiver and a transmitter at the same time, so we can call them transceivers.

Current MicroPython firmware versions include two ESP-NOW libraries by default: espnow and aioespnow. We’ll use aioespnow, which is the asynchronous version of the espnow library.

ESP-NOW Two-Way communication between two ESP32 boards

Copy the following code to Thonny IDE.

# Rui Santos & Sara Santos - Random Nerd Tutorials
# Complete project details at https://RandomNerdTutorials.com/micropython-esp32-esp-now-two-way/

import network
import aioespnow
import asyncio
import time

# Initialize Wi-Fi in station mode
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.config(channel=1)  # Set channel explicitly if packets are not received
sta.disconnect()

# Initialize AIOESPNow
e = aioespnow.AIOESPNow()
try:
    e.active(True)
except OSError as err:
    print("Failed to initialize AIOESPNow:", err)
    raise

# Peer MAC address (replace with the actual MAC of the other board)
peer_mac = b'\xff\xff\xff\xff\xff\xff'  # Placeholder (broadcast address); use the peer's MAC for unicast

# Add peer for unicast reliability
try:
    e.add_peer(peer_mac)
except OSError as err:
    print("Failed to add peer:", err)
    raise

# Stats tracking
last_stats_time = time.time()
stats_interval = 10  # Print stats every 10 seconds

# Async function to send messages
async def send_messages(e, peer):
    message_count = 0
    while True:
        try:
            message = f"Hello from ESP32 #{message_count}"
            if await e.asend(peer, message, sync=True):
                print(f"Sent message: {message}")
            else:
                print("Failed to send message")
            message_count += 1
            await asyncio.sleep(1)  # Send every 1 second
        except OSError as err:
            print("Send error:", err)
            await asyncio.sleep(5)

# Async function to receive messages
async def receive_messages(e):
    while True:
        try:
            async for mac, msg in e:
                print(f"Received from {mac.hex()}: {msg.decode()}")
        except OSError as err:
            print("Receive error:", err)
            await asyncio.sleep(5)

# Async function to print stats periodically
async def print_stats(e):
    global last_stats_time
    while True:
        if time.time() - last_stats_time >= stats_interval:
            stats = e.stats()
            print("\nESP-NOW Statistics:")
            print(f"  Packets Sent: {stats[0]}")
            print(f"  Packets Delivered: {stats[1]}")
            print(f"  Packets Dropped (TX): {stats[2]}")
            print(f"  Packets Received: {stats[3]}")
            print(f"  Packets Dropped (RX): {stats[4]}")
            last_stats_time = time.time()
        await asyncio.sleep(1)  # Check every second

# Main async function
async def main(e, peer):
    # Run send, receive, and stats tasks concurrently
    await asyncio.gather(send_messages(e, peer), receive_messages(e), print_stats(e))

# Run the async program
try:
    asyncio.run(main(e, peer_mac))
except KeyboardInterrupt:
    print("Stopping transceiver...")
    e.active(False)
    sta.active(False)


This code sets the ESP32 board as an ESP-NOW receiver and transmitter. In the code, you must insert the MAC address of the board to which you want to send data.

For example, if the receiver board’s MAC address is 30:AE:A4:F6:7D:4C, you need to convert it to bytes format as follows:

  • 30:AE:A4:F6:7D:4C » b'\x30\xae\xa4\xf6\x7d\x4c'

# Peer MAC address
peer_mac = b'\x30\xae\xa4\xf6\x7d\x4c'
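If you prefer not to convert the address by hand, a small helper can do it for you. This is a minimal sketch; mac_to_bytes is a hypothetical helper name and is not part of the espnow or aioespnow modules.

```python
def mac_to_bytes(mac_str):
    """Convert 'AA:BB:CC:DD:EE:FF' into the 6-byte form used for the peer MAC."""
    parts = mac_str.split(":")
    if len(parts) != 6:
        raise ValueError("expected 6 colon-separated hex bytes")
    return bytes([int(p, 16) for p in parts])

peer_mac = mac_to_bytes("30:AE:A4:F6:7D:4C")
print(peer_mac == b'\x30\xae\xa4\xf6\x7d\x4c')  # True
```

The check on the number of parts catches copy and paste mistakes before the address is passed to add_peer().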

Upload this code to both boards. On each board, set the MAC address to that of the other board.

How Does the Code Work?

To better understand this code, we recommend that you get familiar with MicroPython asynchronous programming first. You can read the following guide:

Importing Modules

First, import the required modules.

import network
import aioespnow
import asyncio
import time

Initialize the Wi-Fi Interface

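Then, we need to initialize the Wi-Fi interface (even if we don’t connect to a network) to use ESP-NOW. We can use station (STA_IF) or access point mode (AP_IF). Both boards must be on the same Wi-Fi channel, so we set it explicitly.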

# Initialize Wi-Fi in station mode
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.config(channel=1)  # Set channel 
sta.disconnect()

Initialize ESP-NOW

Then, we can initialize ESP-NOW. First, create an AIOESPNow instance called e. Then, activate it by calling the active() method with True as the argument.

# Initialize AIOESPNow
e = aioespnow.AIOESPNow()
try:
    e.active(True)
    print("AIOESPNow initialized")
except OSError as err:
    print("Failed to initialize AIOESPNow:", err)
    raise

We activate ESP-NOW inside a try/except block so that we can catch any errors if the initialization fails.

Add ESP-NOW Peer

Insert the peer MAC address (the MAC address of the board you want to send data to).

# Receiver MAC address (the board you want to send data to)
peer_mac = b'\x68\xb6\xb3\x22\x9e\x60'

Then, we can add the receiver’s MAC address as a peer using the add_peer() method.

try:
    e.add_peer(peer_mac)
except OSError as err:
    print("Failed to add peer:", err)
    raise

Function to Send ESP-NOW Messages

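The following function sends ESP-NOW messages to the peer. It takes the ESP-NOW instance e and the peer MAC address as arguments. As an example, we’ll send a Hello from ESP32 message followed by a counter, once every second. With sync=True, asend() waits for the peer’s response and returns True if the message was delivered, so we can print whether the send succeeded.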

# Async function to send messages
async def send_messages(e, peer):
    message_count = 0
    while True:
        try:
            message = f"Hello from ESP32 #{message_count}"
            if await e.asend(peer, message, sync=True):
                print(f"Sent message: {message}")
            else:
                print("Failed to send message")
            message_count += 1
            await asyncio.sleep(1)  # Send every 1 second
        except OSError as err:
            print("Send error:", err)
            await asyncio.sleep(5)
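ESP-NOW packets are small, so it can be useful to check the payload size before sending. The following is a minimal sketch, assuming the 250-byte limit documented for MicroPython's espnow module (espnow.MAX_DATA_LEN); build_message is a hypothetical helper name.

```python
MAX_ESPNOW_PAYLOAD = 250  # bytes; assumed to match espnow.MAX_DATA_LEN

def build_message(count):
    """Build the counter message and verify it fits in one ESP-NOW packet."""
    payload = "Hello from ESP32 #{}".format(count).encode()
    if len(payload) > MAX_ESPNOW_PAYLOAD:
        raise ValueError("payload too large for ESP-NOW")
    return payload

msg = build_message(42)
print(msg, len(msg))
```

Our short text message is far below the limit, but the same check becomes relevant once you start sending larger structures.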

Function to Receive ESP-NOW Messages

We also need to create a function to receive the ESP-NOW messages. When there’s a new message, we print it in the MicroPython shell.

# Async function to receive messages
async def receive_messages(e):
    while True:
        try:
            async for mac, msg in e:
                print(f"Received from {mac.hex()}: {msg.decode()}")
        except OSError as err:
            print("Receive error:", err)
            await asyncio.sleep(5)

Print ESP-NOW Statistics

Then, we create a function called print_stats(), which we’ll call later in the code, that prints the current statistics about the ESP-NOW packets. To get the number of packets sent, received, and lost, we can call the stats() method on the ESP-NOW e object.

This returns a 5-tuple containing the number of packets sent/received/lost:

(tx_pkts, tx_responses, tx_failures, rx_packets, rx_dropped_packets)

This is the complete asynchronous function.

async def print_stats(e):
    global last_stats_time
    while True:
        if time.time() - last_stats_time >= stats_interval:
            stats = e.stats()
            print("\nESP-NOW Statistics:")
            print(f"  Packets Sent: {stats[0]}")
            print(f"  Packets Delivered: {stats[1]}")
            print(f"  Packets Dropped (TX): {stats[2]}")
            print(f"  Packets Received: {stats[3]}")
            print(f"  Packets Dropped (RX): {stats[4]}")
            last_stats_time = time.time()
        await asyncio.sleep(1)  # Check every second
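The raw counters are easier to interpret as a delivery rate. Here is a minimal sketch that works on the 5-tuple described above; summarize_stats is a hypothetical helper and the sample numbers are made up for illustration.

```python
def summarize_stats(stats):
    """Turn the 5-tuple from e.stats() into a dict with a TX delivery rate."""
    tx_pkts, tx_responses, tx_failures, rx_pkts, rx_dropped = stats
    # Avoid dividing by zero before the first packet is sent
    rate = 100.0 * tx_responses / tx_pkts if tx_pkts else 0.0
    return {"sent": tx_pkts, "delivered": tx_responses,
            "tx_failed": tx_failures, "received": rx_pkts,
            "rx_dropped": rx_dropped, "delivery_pct": rate}

sample = (20, 18, 2, 19, 0)  # made-up numbers, not real measurements
print(summarize_stats(sample)["delivery_pct"])  # 90.0
```

On the board, you would pass e.stats() instead of the sample tuple.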

Main Async Function

The following line defines an asynchronous function main() that takes the ESP-NOW object e and peer MAC address peer as arguments.

async def main(e, peer):

Then, we use asyncio.gather to run three async tasks concurrently:

  • send_messages(e, peer): sends the Hello from ESP32 counter messages to the peer;
  • receive_messages(e): listens for incoming messages from the peer;
  • print_stats(e): periodically prints ESP-NOW statistics.

await asyncio.gather(send_messages(e, peer), receive_messages(e), print_stats(e))
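To see how asyncio.gather interleaves tasks without any hardware, you can run the following minimal sketch. The worker() coroutine and its names are stand-ins invented for this illustration; they only sleep and record when they ran.

```python
import asyncio

async def worker(name, delay, log):
    """Stand-in task: wait, record its name, then repeat once more."""
    for _ in range(2):
        await asyncio.sleep(delay)  # yields control to the other task
        log.append(name)

async def demo():
    log = []
    # Both workers share one event loop and take turns while sleeping
    await asyncio.gather(worker("send", 0.02, log), worker("recv", 0.03, log))
    return log

print(asyncio.run(demo()))
```

Unlike these workers, the three tasks in our ESP-NOW code loop forever, so gather never returns there and the program runs until you stop it.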

Running the Async Program

Finally, the following lines will run the program asynchronously.

# Run the async program
try:
    asyncio.run(main(e, peer_mac))
except KeyboardInterrupt:
    print("Stopping transceiver...")
    e.active(False)
    sta.active(False)

The call asyncio.run(main(e, peer_mac)) starts the MicroPython asyncio event loop and runs main() until the program is stopped.

We wrap it in a try/except block to catch a KeyboardInterrupt (raised when you stop the code manually). When this happens, we disable ESP-NOW and turn off Wi-Fi.

Running the Code

After connecting the board to your computer and establishing a communication with Thonny IDE, you can upload the code as main.py to the board or run it using the green Run button. Make sure you’ve inserted the receiver’s MAC address in the code.

Running the Code Thonny IDE MicroPython

It will start printing messages in the Shell. It will fail to send the message while the other board is still not ready.

Open another instance of Thonny IDE, and establish a communication with the other board.

Enabling Two Instances of Thonny IDE

Go to Tools > Options and untick the option Allow only single Thonny instance.

Copy the same code, but insert the other board MAC address. After a few seconds, it will start sending and receiving ESP-NOW messages.

Two ESP32 boards with a label with their MAC addresses

The following screenshots show the MicroPython Shell for each of my boards.

ESP32 ESP-NOW Transceiver Two-Way Communication - printing the results on the MicroPython shell

You can see that after powering both boards, they’ll start exchanging messages with each other.


ESP32 ESP-NOW Two-Way Communication – Exchange Sensor Readings and Display on OLED

In this section, we’ll build a project with a real-world application in which the two ESP32 boards exchange sensor readings and display the received data on an OLED display.

ESP32 ESP-NOW Two-Way Communication - Exchange Sensor Readings and Display on OLED

Project Overview

The following diagram shows a high-level overview of the project we’ll build.

ESP-NOW Two-Way Communication - Send Sensor Readings Between Boards
  • In this project we’ll have two ESP32 boards. Each board is connected to an OLED display and a BME280 sensor;
  • Each board gets temperature, humidity, and pressure readings from its own sensor;
  • Each board sends its readings to the other board via ESP-NOW;
  • When a board receives the readings, it displays them on the OLED display;
  • After sending the readings, the board displays on the OLED if the message was successfully delivered;
  • Each board needs to know the other board MAC address in order to send the message.

In this example, we’re using two-way communication between two boards, but you can add more boards to this setup and have all boards communicate with each other.
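Before looking at the hardware, it may help to picture what travels between the boards. The sketch below assumes the readings are exchanged as a small JSON payload with the keys temp, hum, and pres; the function names are hypothetical, and the 250-byte limit is the one documented for MicroPython's espnow module.

```python
import json  # on some MicroPython builds the module is also available as ujson

def pack_readings(temp, hum, pres):
    """Serialize the readings into a compact JSON payload (bytes)."""
    payload = json.dumps({"temp": temp, "hum": hum, "pres": pres}).encode()
    if len(payload) > 250:  # assumed ESP-NOW payload limit
        raise ValueError("payload too large")
    return payload

def unpack_readings(payload):
    """Decode a payload produced by pack_readings() back into a dict."""
    return json.loads(payload.decode())

data = unpack_readings(pack_readings(23.4, 51.2, 1013.2))
print(data["temp"], data["hum"], data["pres"])
```

A text format like JSON is easy to debug in the shell. A packed binary format would be smaller, but it is not needed for three numbers.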

Parts Required

For this tutorial, you need the following parts (one set per board):

  • ESP32 board
  • BME280 sensor
  • SSD1306 OLED display (128x64, I2C)


Schematic Diagram

Wire an OLED display and a BME280 sensor to each ESP32 board. Follow the next schematic diagram (adjust if using an ESP32 model with a different pinout).

ESP32 wiring schematic diagram to BME280 sensor and OLED display

You can use the following table as a reference when wiring the BME280 sensor.

BME280 | ESP32
VIN | 3.3V
GND | GND
SCL | GPIO 22
SDA | GPIO 21

You can also follow the next table to wire the OLED display to the ESP32.

OLED Display | ESP32
GND | GND
VDD / VCC | VIN
SCK / SCL | GPIO 22
SDA | GPIO 21
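After wiring, a quick I2C scan can confirm that both devices respond. The following is a minimal sketch; missing_devices is a hypothetical helper, and the addresses 0x3C (OLED) and 0x76 (BME280) are the defaults used in this tutorial's code. Some BME280 modules use 0x77 instead. When run on a computer rather than on the board, the sketch falls back to a sample scan result.

```python
# Expected I2C addresses, taken from the defaults used in this tutorial
EXPECTED = {0x3C: "SSD1306 OLED", 0x76: "BME280"}

def missing_devices(found):
    """Return the names of expected devices absent from an I2C scan result."""
    return [name for addr, name in sorted(EXPECTED.items()) if addr not in found]

try:
    from machine import Pin, I2C  # only available on the board
    found = I2C(0, scl=Pin(22), sda=Pin(21)).scan()
except ImportError:
    found = [0x3C]  # sample scan result used when running off-board

print("Missing:", missing_devices(found))
```

An empty list means both devices were detected. If something is missing, double-check the wiring against the tables above.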

Importing Libraries

The libraries to interface with the OLED display and get BME280 data are not part of the standard MicroPython package. So, we need to upload those modules to our boards.

ssd1306.py MicroPython Module

1. Copy the following code to a new file on Thonny IDE.

# MicroPython SSD1306 OLED driver, I2C and SPI interfaces created by Adafruit

import time
import framebuf
from micropython import const

# register definitions
SET_CONTRAST        = const(0x81)
SET_ENTIRE_ON       = const(0xa4)
SET_NORM_INV        = const(0xa6)
SET_DISP            = const(0xae)
SET_MEM_ADDR        = const(0x20)
SET_COL_ADDR        = const(0x21)
SET_PAGE_ADDR       = const(0x22)
SET_DISP_START_LINE = const(0x40)
SET_SEG_REMAP       = const(0xa0)
SET_MUX_RATIO       = const(0xa8)
SET_COM_OUT_DIR     = const(0xc0)
SET_DISP_OFFSET     = const(0xd3)
SET_COM_PIN_CFG     = const(0xda)
SET_DISP_CLK_DIV    = const(0xd5)
SET_PRECHARGE       = const(0xd9)
SET_VCOM_DESEL      = const(0xdb)
SET_CHARGE_PUMP     = const(0x8d)


class SSD1306:
    def __init__(self, width, height, external_vcc):
        self.width = width
        self.height = height
        self.external_vcc = external_vcc
        self.pages = self.height // 8
        # Note the subclass must initialize self.framebuf to a framebuffer.
        # This is necessary because the underlying data buffer is different
        # between I2C and SPI implementations (I2C needs an extra byte).
        self.poweron()
        self.init_display()

    def init_display(self):
        for cmd in (
            SET_DISP | 0x00, # off
            # address setting
            SET_MEM_ADDR, 0x00, # horizontal
            # resolution and layout
            SET_DISP_START_LINE | 0x00,
            SET_SEG_REMAP | 0x01, # column addr 127 mapped to SEG0
            SET_MUX_RATIO, self.height - 1,
            SET_COM_OUT_DIR | 0x08, # scan from COM[N] to COM0
            SET_DISP_OFFSET, 0x00,
            SET_COM_PIN_CFG, 0x02 if self.height == 32 else 0x12,
            # timing and driving scheme
            SET_DISP_CLK_DIV, 0x80,
            SET_PRECHARGE, 0x22 if self.external_vcc else 0xf1,
            SET_VCOM_DESEL, 0x30, # 0.83*Vcc
            # display
            SET_CONTRAST, 0xff, # maximum
            SET_ENTIRE_ON, # output follows RAM contents
            SET_NORM_INV, # not inverted
            # charge pump
            SET_CHARGE_PUMP, 0x10 if self.external_vcc else 0x14,
            SET_DISP | 0x01): # on
            self.write_cmd(cmd)
        self.fill(0)
        self.show()

    def poweroff(self):
        self.write_cmd(SET_DISP | 0x00)

    def contrast(self, contrast):
        self.write_cmd(SET_CONTRAST)
        self.write_cmd(contrast)

    def invert(self, invert):
        self.write_cmd(SET_NORM_INV | (invert & 1))

    def show(self):
        x0 = 0
        x1 = self.width - 1
        if self.width == 64:
            # displays with width of 64 pixels are shifted by 32
            x0 += 32
            x1 += 32
        self.write_cmd(SET_COL_ADDR)
        self.write_cmd(x0)
        self.write_cmd(x1)
        self.write_cmd(SET_PAGE_ADDR)
        self.write_cmd(0)
        self.write_cmd(self.pages - 1)
        self.write_framebuf()

    def fill(self, col):
        self.framebuf.fill(col)

    def pixel(self, x, y, col):
        self.framebuf.pixel(x, y, col)

    def scroll(self, dx, dy):
        self.framebuf.scroll(dx, dy)

    def text(self, string, x, y, col=1):
        self.framebuf.text(string, x, y, col)


class SSD1306_I2C(SSD1306):
    def __init__(self, width, height, i2c, addr=0x3c, external_vcc=False):
        self.i2c = i2c
        self.addr = addr
        self.temp = bytearray(2)
        # Add an extra byte to the data buffer to hold an I2C data/command byte
        # to use hardware-compatible I2C transactions.  A memoryview of the
        # buffer is used to mask this byte from the framebuffer operations
        # (without a major memory hit as memoryview doesn't copy to a separate
        # buffer).
        self.buffer = bytearray(((height // 8) * width) + 1)
        self.buffer[0] = 0x40  # Set first byte of data buffer to Co=0, D/C=1
        self.framebuf = framebuf.FrameBuffer1(memoryview(self.buffer)[1:], width, height)
        super().__init__(width, height, external_vcc)

    def write_cmd(self, cmd):
        self.temp[0] = 0x80 # Co=1, D/C#=0
        self.temp[1] = cmd
        self.i2c.writeto(self.addr, self.temp)

    def write_framebuf(self):
        # Blast out the frame buffer using a single I2C transaction to support
        # hardware I2C interfaces.
        self.i2c.writeto(self.addr, self.buffer)

    def poweron(self):
        pass


class SSD1306_SPI(SSD1306):
    def __init__(self, width, height, spi, dc, res, cs, external_vcc=False):
        self.rate = 10 * 1024 * 1024
        dc.init(dc.OUT, value=0)
        res.init(res.OUT, value=0)
        cs.init(cs.OUT, value=1)
        self.spi = spi
        self.dc = dc
        self.res = res
        self.cs = cs
        self.buffer = bytearray((height // 8) * width)
        self.framebuf = framebuf.FrameBuffer1(self.buffer, width, height)
        super().__init__(width, height, external_vcc)

    def write_cmd(self, cmd):
        self.spi.init(baudrate=self.rate, polarity=0, phase=0)
        self.cs.high()
        self.dc.low()
        self.cs.low()
        self.spi.write(bytearray([cmd]))
        self.cs.high()

    def write_framebuf(self):
        self.spi.init(baudrate=self.rate, polarity=0, phase=0)
        self.cs.high()
        self.dc.high()
        self.cs.low()
        self.spi.write(self.buffer)
        self.cs.high()

    def poweron(self):
        self.res.high()
        time.sleep_ms(1)
        self.res.low()
        time.sleep_ms(10)
        self.res.high()


2. Go to File > Save as and select MicroPython device.

Thonny IDE Save to MicroPython Device

3. Name the file ssd1306.py and click OK to save the file on the ESP Filesystem.

And that’s it. The library was uploaded to your board.

BME280.py MicroPython Module

1. Copy the following code to a new file on Thonny IDE.

from machine import I2C
import time

# BME280 default address.
BME280_I2CADDR = 0x76

# Operating Modes
BME280_OSAMPLE_1 = 1
BME280_OSAMPLE_2 = 2
BME280_OSAMPLE_4 = 3
BME280_OSAMPLE_8 = 4
BME280_OSAMPLE_16 = 5

# BME280 Registers

BME280_REGISTER_DIG_T1 = 0x88  # Trimming parameter registers
BME280_REGISTER_DIG_T2 = 0x8A
BME280_REGISTER_DIG_T3 = 0x8C

BME280_REGISTER_DIG_P1 = 0x8E
BME280_REGISTER_DIG_P2 = 0x90
BME280_REGISTER_DIG_P3 = 0x92
BME280_REGISTER_DIG_P4 = 0x94
BME280_REGISTER_DIG_P5 = 0x96
BME280_REGISTER_DIG_P6 = 0x98
BME280_REGISTER_DIG_P7 = 0x9A
BME280_REGISTER_DIG_P8 = 0x9C
BME280_REGISTER_DIG_P9 = 0x9E

BME280_REGISTER_DIG_H1 = 0xA1
BME280_REGISTER_DIG_H2 = 0xE1
BME280_REGISTER_DIG_H3 = 0xE3
BME280_REGISTER_DIG_H4 = 0xE4
BME280_REGISTER_DIG_H5 = 0xE5
BME280_REGISTER_DIG_H6 = 0xE6
BME280_REGISTER_DIG_H7 = 0xE7

BME280_REGISTER_CHIPID = 0xD0
BME280_REGISTER_VERSION = 0xD1
BME280_REGISTER_SOFTRESET = 0xE0

BME280_REGISTER_CONTROL_HUM = 0xF2
BME280_REGISTER_CONTROL = 0xF4
BME280_REGISTER_CONFIG = 0xF5
BME280_REGISTER_PRESSURE_DATA = 0xF7
BME280_REGISTER_TEMP_DATA = 0xFA
BME280_REGISTER_HUMIDITY_DATA = 0xFD


class Device:
  """Class for communicating with an I2C device.

  Allows reading and writing 8-bit, 16-bit, and byte array values to
  registers on the device."""

  def __init__(self, address, i2c):
    """Create an instance of the I2C device at the specified address using
    the specified I2C interface object."""
    self._address = address
    self._i2c = i2c

  def writeRaw8(self, value):
    """Write an 8-bit value on the bus (without register)."""
    value = value & 0xFF
    self._i2c.writeto(self._address, bytes([value]))

  def write8(self, register, value):
    """Write an 8-bit value to the specified register."""
    b=bytearray(1)
    b[0]=value & 0xFF
    self._i2c.writeto_mem(self._address, register, b)

  def write16(self, register, value):
    """Write a 16-bit value to the specified register."""
    value = value & 0xFFFF
    b=bytearray(2)
    b[0]= value & 0xFF
    b[1]= (value>>8) & 0xFF
    self._i2c.writeto_mem(self._address, register, b)

  def readRaw8(self):
    """Read an 8-bit value on the bus (without register)."""
    return int.from_bytes(self._i2c.readfrom(self._address, 1),'little') & 0xFF

  def readU8(self, register):
    """Read an unsigned byte from the specified register."""
    return int.from_bytes(
        self._i2c.readfrom_mem(self._address, register, 1),'little') & 0xFF

  def readS8(self, register):
    """Read a signed byte from the specified register."""
    result = self.readU8(register)
    if result > 127:
      result -= 256
    return result

  def readU16(self, register, little_endian=True):
    """Read an unsigned 16-bit value from the specified register, with the
    specified endianness (default little endian, or least significant byte
    first)."""
    result = int.from_bytes(
        self._i2c.readfrom_mem(self._address, register, 2),'little') & 0xFFFF
    if not little_endian:
      result = ((result << 8) & 0xFF00) + (result >> 8)
    return result

  def readS16(self, register, little_endian=True):
    """Read a signed 16-bit value from the specified register, with the
    specified endianness (default little endian, or least significant byte
    first)."""
    result = self.readU16(register, little_endian)
    if result > 32767:
      result -= 65536
    return result

  def readU16LE(self, register):
    """Read an unsigned 16-bit value from the specified register, in little
    endian byte order."""
    return self.readU16(register, little_endian=True)

  def readU16BE(self, register):
    """Read an unsigned 16-bit value from the specified register, in big
    endian byte order."""
    return self.readU16(register, little_endian=False)

  def readS16LE(self, register):
    """Read a signed 16-bit value from the specified register, in little
    endian byte order."""
    return self.readS16(register, little_endian=True)

  def readS16BE(self, register):
    """Read a signed 16-bit value from the specified register, in big
    endian byte order."""
    return self.readS16(register, little_endian=False)


class BME280:
  def __init__(self, mode=BME280_OSAMPLE_1, address=BME280_I2CADDR, i2c=None,
               **kwargs):
    # Check that mode is valid.
    if mode not in [BME280_OSAMPLE_1, BME280_OSAMPLE_2, BME280_OSAMPLE_4,
                    BME280_OSAMPLE_8, BME280_OSAMPLE_16]:
        raise ValueError(
            'Unexpected mode value {0}. Set mode to one of '
            'BME280_OSAMPLE_1, BME280_OSAMPLE_2, BME280_OSAMPLE_4, '
            'BME280_OSAMPLE_8, or BME280_OSAMPLE_16'.format(mode))
    self._mode = mode
    # Create I2C device.
    if i2c is None:
      raise ValueError('An I2C object is required.')
    self._device = Device(address, i2c)
    # Load calibration values.
    self._load_calibration()
    self._device.write8(BME280_REGISTER_CONTROL, 0x3F)
    self.t_fine = 0

  def _load_calibration(self):

    self.dig_T1 = self._device.readU16LE(BME280_REGISTER_DIG_T1)
    self.dig_T2 = self._device.readS16LE(BME280_REGISTER_DIG_T2)
    self.dig_T3 = self._device.readS16LE(BME280_REGISTER_DIG_T3)

    self.dig_P1 = self._device.readU16LE(BME280_REGISTER_DIG_P1)
    self.dig_P2 = self._device.readS16LE(BME280_REGISTER_DIG_P2)
    self.dig_P3 = self._device.readS16LE(BME280_REGISTER_DIG_P3)
    self.dig_P4 = self._device.readS16LE(BME280_REGISTER_DIG_P4)
    self.dig_P5 = self._device.readS16LE(BME280_REGISTER_DIG_P5)
    self.dig_P6 = self._device.readS16LE(BME280_REGISTER_DIG_P6)
    self.dig_P7 = self._device.readS16LE(BME280_REGISTER_DIG_P7)
    self.dig_P8 = self._device.readS16LE(BME280_REGISTER_DIG_P8)
    self.dig_P9 = self._device.readS16LE(BME280_REGISTER_DIG_P9)

    self.dig_H1 = self._device.readU8(BME280_REGISTER_DIG_H1)
    self.dig_H2 = self._device.readS16LE(BME280_REGISTER_DIG_H2)
    self.dig_H3 = self._device.readU8(BME280_REGISTER_DIG_H3)
    self.dig_H6 = self._device.readS8(BME280_REGISTER_DIG_H7)

    h4 = self._device.readS8(BME280_REGISTER_DIG_H4)
    h4 = (h4 << 24) >> 20
    self.dig_H4 = h4 | (self._device.readU8(BME280_REGISTER_DIG_H5) & 0x0F)

    h5 = self._device.readS8(BME280_REGISTER_DIG_H6)
    h5 = (h5 << 24) >> 20
    self.dig_H5 = h5 | (
        self._device.readU8(BME280_REGISTER_DIG_H5) >> 4 & 0x0F)

  def read_raw_temp(self):
    """Reads the raw (uncompensated) temperature from the sensor."""
    meas = self._mode
    self._device.write8(BME280_REGISTER_CONTROL_HUM, meas)
    meas = self._mode << 5 | self._mode << 2 | 1
    self._device.write8(BME280_REGISTER_CONTROL, meas)
    sleep_time = 1250 + 2300 * (1 << self._mode)

    sleep_time = sleep_time + 2300 * (1 << self._mode) + 575
    sleep_time = sleep_time + 2300 * (1 << self._mode) + 575
    time.sleep_us(sleep_time)  # Wait the required time
    msb = self._device.readU8(BME280_REGISTER_TEMP_DATA)
    lsb = self._device.readU8(BME280_REGISTER_TEMP_DATA + 1)
    xlsb = self._device.readU8(BME280_REGISTER_TEMP_DATA + 2)
    raw = ((msb << 16) | (lsb << 8) | xlsb) >> 4
    return raw

  def read_raw_pressure(self):
    """Reads the raw (uncompensated) pressure level from the sensor."""
    """Assumes that the temperature has already been read """
    """i.e. that enough delay has been provided"""
    msb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA)
    lsb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA + 1)
    xlsb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA + 2)
    raw = ((msb << 16) | (lsb << 8) | xlsb) >> 4
    return raw

  def read_raw_humidity(self):
    """Assumes that the temperature has already been read """
    """i.e. that enough delay has been provided"""
    msb = self._device.readU8(BME280_REGISTER_HUMIDITY_DATA)
    lsb = self._device.readU8(BME280_REGISTER_HUMIDITY_DATA + 1)
    raw = (msb << 8) | lsb
    return raw

  def read_temperature(self):
    """Get the compensated temperature in 0.01 of a degree celsius."""
    adc = self.read_raw_temp()
    # Per the Bosch datasheet formula, the shift applies to the whole product
    var1 = (((adc >> 3) - (self.dig_T1 << 1)) * self.dig_T2) >> 11
    var2 = ((
        (((adc >> 4) - self.dig_T1) * ((adc >> 4) - self.dig_T1)) >> 12) *
        self.dig_T3) >> 14
    self.t_fine = var1 + var2
    return (self.t_fine * 5 + 128) >> 8

  def read_pressure(self):
    """Gets the compensated pressure in Pascals."""
    adc = self.read_raw_pressure()
    var1 = self.t_fine - 128000
    var2 = var1 * var1 * self.dig_P6
    var2 = var2 + ((var1 * self.dig_P5) << 17)
    var2 = var2 + (self.dig_P4 << 35)
    var1 = (((var1 * var1 * self.dig_P3) >> 8) +
            ((var1 * self.dig_P2) >> 12))
    var1 = (((1 << 47) + var1) * self.dig_P1) >> 33
    if var1 == 0:
      return 0
    p = 1048576 - adc
    p = (((p << 31) - var2) * 3125) // var1
    var1 = (self.dig_P9 * (p >> 13) * (p >> 13)) >> 25
    var2 = (self.dig_P8 * p) >> 19
    return ((p + var1 + var2) >> 8) + (self.dig_P7 << 4)

  def read_humidity(self):
    adc = self.read_raw_humidity()
    # print 'Raw humidity = {0:d}'.format (adc)
    h = self.t_fine - 76800
    h = (((((adc << 14) - (self.dig_H4 << 20) - (self.dig_H5 * h)) +
         16384) >> 15) * (((((((h * self.dig_H6) >> 10) * (((h *
                          self.dig_H3) >> 11) + 32768)) >> 10) + 2097152) *
                          self.dig_H2 + 8192) >> 14))
    h = h - (((((h >> 15) * (h >> 15)) >> 7) * self.dig_H1) >> 4)
    h = 0 if h < 0 else h
    h = 419430400 if h > 419430400 else h
    return h >> 12

  @property
  def temperature(self):
    "Return the temperature in degrees."
    t = self.read_temperature()
    ti = t // 100
    td = t - ti * 100
    return "{}.{:02d}C".format(ti, td)

  @property
  def pressure(self):
    "Return the pressure in hPa."
    p = self.read_pressure() // 256
    pi = p // 100
    pd = p - pi * 100
    return "{}.{:02d}hPa".format(pi, pd)

  @property
  def humidity(self):
    "Return the humidity in percent."
    h = self.read_humidity()
    hi = h // 1024
    hd = h * 100 // 1024 - hi * 100
    return "{}.{:02d}%".format(hi, hd)


2. Go to File > Save as…

Thonny IDE ESP32 ESP8266 MicroPython Save file library to device save as

3. Select save to “MicroPython device”:

Thonny IDE ESP32 ESP8266 MicroPython Save file library to device select

4. Name your file as BME280.py and press the OK button:

BME280 library new MicroPython file Thonny IDE

And that’s it. The library was uploaded to your board.
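Note that the temperature, pressure, and humidity properties of this module return formatted strings with the unit appended (for example, a temperature ending in C), not numbers. The following minimal sketch shows one way to turn such strings into floats; parse_reading is a hypothetical helper and the sample strings are made up.

```python
def parse_reading(text, unit):
    """Strip the unit suffix from a reading string and return a float."""
    if not text.endswith(unit):
        raise ValueError("unexpected format: " + text)
    return float(text[:-len(unit)])

# Sample strings in the format produced by the module's properties
print(parse_reading("23.45C", "C"))
print(parse_reading("1013.25hPa", "hPa"))
print(parse_reading("51.20%", "%"))
```

Checking the suffix first makes a malformed reading fail loudly instead of silently producing a wrong number.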

ESP32 Exchange BME280 Sensor Readings via ESP-NOW – MicroPython Code

Now that you have all the required modules uploaded to your boards, you can upload the following code to each of your boards.

Important: don’t forget to insert the receiver’s MAC address in the code.

# Rui Santos & Sara Santos - Random Nerd Tutorials
# Complete project details at https://RandomNerdTutorials.com/micropython-esp32-esp-now-two-way/

import network
import aioespnow
import asyncio
import time
import ujson
from machine import Pin, I2C
import BME280
import ssd1306

# Initialize I2C for BME280 and SSD1306
i2c = I2C(0, scl=Pin(22), sda=Pin(21))

# Initialize BME280 sensor
try:
    bme = BME280.BME280(i2c=i2c, address=0x76)
    print("BME280 initialized")
except Exception as err:
    print("Failed to initialize BME280:", err)
    raise

# Initialize SSD1306 OLED display
try:
    display = ssd1306.SSD1306_I2C(128, 64, i2c, addr=0x3C)
    print("SSD1306 initialized")
except Exception as err:
    print("Failed to initialize SSD1306:", err)
    raise

# Initialize Wi-Fi in station mode
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.config(channel=1) # set Wi-Fi channel for more stable communication
sta.disconnect()
print("Wi-Fi initialized")

# Initialize AIOESPNow
e = aioespnow.AIOESPNow()
try:
    e.active(True)
    print("AIOESPNow initialized")
except OSError as err:
    print("Failed to initialize AIOESPNow:", err)
    raise

# Receiver MAC address (the board you want to send data to)
# Replace this placeholder with the MAC address of your other board
peer_mac = b'\xff\xff\xff\xff\xff\xff'

# Add peer
try:
    e.add_peer(peer_mac)
except OSError as err:
    print("Failed to add peer:", err)
    raise

# Variables to store readings and status
last_send_status = " "
incoming_readings = {'temp': 0.0, 'hum': 0.0, 'pres': 0.0}

# Function to get BME280 readings
def get_readings():
    try:
        temp = float(bme.temperature[:-1]) # Remove 'C'
        hum = float(bme.humidity[:-1])     # Remove '%'
        pres = float(bme.pressure[:-3])    # Remove 'hPa'
        print("BME280 readings:", temp, hum, pres)
        return temp, hum, pres
    except Exception as err:
        print("Error reading BME280:", err)
        return 0.0, 0.0, 0.0

# Function to update OLED display
def update_display():
    try:
        display.fill(0)
        display.text("INCOM. READINGS", 0, 0)
        display.text("Temp: {:.1f} C".format(incoming_readings['temp']), 0, 15)
        display.text("Hum: {:.1f} %".format(incoming_readings['hum']), 0, 25)
        display.text("Pres: {:.1f} hPa".format(incoming_readings['pres']), 0, 35)
        display.text(last_send_status, 0, 55)
        display.show()
        print("Display updated")
    except Exception as err:
        print("Error updating display:", err)

# Async function to send messages
async def send_messages(e, peer):
    global last_send_status
    while True:
        try:
            print("Sending data")
            temp, hum, pres = get_readings()
            # Create JSON string
            data_dict = {"temp": temp, "hum": hum, "pres": pres}
            json_str = ujson.dumps(data_dict)
            data = json_str.encode('utf-8')  # Convert to bytes
            print("Sending JSON:", json_str)
            if await e.asend(peer, data, sync=True):
                print("Sent with success")
                last_send_status = "Delivery Success :)"
            else:
                print("Send failed")
                last_send_status = "Delivery Fail :("
            update_display()
            print("Sending task complete")
            await asyncio.sleep(10)  # Send every 10 seconds
        except OSError as err:
            print("Send error:", err)
            last_send_status = "Delivery Fail :("
            update_display()
            await asyncio.sleep(0.1)  # Shorter delay in case of error

# Async function to receive messages
async def receive_messages(e):
    global incoming_readings
    while True:
        try:
            print("Checking for messages")
            async for mac, msg in e:
                try:
                    # Decode bytes to string and parse JSON
                    json_str = msg.decode('utf-8')
                    data_dict = ujson.loads(json_str)
                    temp = data_dict['temp']
                    hum = data_dict['hum']
                    pres = data_dict['pres']
                    incoming_readings['temp'] = temp
                    incoming_readings['hum'] = hum
                    incoming_readings['pres'] = pres
                    print("\nINCOMING READINGS")
                    print("Temperature: {:.1f} ºC".format(temp))
                    print("Humidity: {:.1f} %".format(hum))
                    print("Pressure: {:.1f} hPa".format(pres))
                    update_display()
                except (ValueError, KeyError) as err:
                    print("Error parsing JSON:", err)
            await asyncio.sleep(0.01)  # Yield if no received messages
        except OSError as err:
            print("Receive error:", err)
            await asyncio.sleep(0.1)  # Shorter delay in case of error

# Main async function
async def main(e, peer):
    print("Starting main loop")
    await asyncio.gather(send_messages(e, peer), receive_messages(e))

# Run the async program
try:
    print("Starting transceiver...")
    asyncio.run(main(e, peer_mac))
except KeyboardInterrupt:
    print("Stopping transceiver...")
    e.active(False)
    sta.active(False)
except Exception as err:
    print("Main loop error:", err)
finally:
    print("Cleaning up...")
    e.active(False)
    sta.active(False)


How Does the Code Work?

Let’s take a quick look at how the code works. Alternatively, you can skip to the demonstration section.

Importing Libraries

We start by importing the required modules.

import network
import aioespnow
import asyncio
import time
import ujson
from machine import Pin, I2C
import BME280
import ssd1306

I2C Communication

Initialize I2C communication on GPIO 22 (SCL) and GPIO 21 (SDA). Adjust the pins if you’re using different ones.

i2c = I2C(0, scl=Pin(22), sda=Pin(21))

BME280

Initialize the BME280 sensor.

# Initialize BME280 sensor
try:
    bme = BME280.BME280(i2c=i2c, address=0x76)
    print("BME280 initialized")
except Exception as err:
    print("Failed to initialize BME280:", err)
    raise

OLED Display

Initialize the OLED Display.

# Initialize SSD1306 OLED display
try:
    display = ssd1306.SSD1306_I2C(128, 64, i2c, addr=0x3C)
    print("SSD1306 initialized")
except Exception as err:
    print("Failed to initialize SSD1306:", err)
    raise

Initialize the Wi-Fi Interface

Then, we need to initialize Wi-Fi (even if we don’t use it) to use ESP-NOW. We can use station (STA_IF) or access point mode (AP_IF).

# Initialize Wi-Fi in station mode
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.config(channel=1)  # Set channel 
sta.disconnect()

Initialize ESP-NOW

Then, we can initialize ESP-NOW. First, create an AIOESPNow instance called e. Then, activate it by calling the active() method with True as the argument.

# Initialize AIOESPNow
e = aioespnow.AIOESPNow()
try:
    e.active(True)
    print("AIOESPNow initialized")
except OSError as err:
    print("Failed to initialize AIOESPNow:", err)
    raise

We activate ESP-NOW inside a try/except statement so that we can catch any errors if the initialization fails.

Add ESP-NOW Peer

Insert the peer MAC address (the MAC address of the board you want to send data to).

# Receiver MAC address (the board you want to send data to)
peer_mac = b'\x68\xb6\xb3\x22\x9e\x60'

Then, we can add the receiver’s MAC address as a peer using the add_peer() method.

try:
    e.add_peer(peer_mac)
except OSError as err:
    print("Failed to add peer:", err)
    raise
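MAC addresses are usually printed as colon-separated text, while add_peer() expects a bytes object like the one above. The following is a minimal sketch of that conversion in plain Python. The mac_to_bytes() helper is our own hypothetical name (it is not part of the espnow module), and the address is the example used in this section.

```python
def mac_to_bytes(mac_text):
    """Convert a MAC address like '68:b6:b3:22:9e:60' to a 6-byte bytes object."""
    parts = mac_text.split(":")
    if len(parts) != 6:
        raise ValueError("expected 6 colon-separated bytes, got {}".format(len(parts)))
    # Each part is a two-digit hexadecimal number
    return bytes(int(part, 16) for part in parts)


# Example address from this tutorial; use your own board's address instead
peer_mac = mac_to_bytes("68:b6:b3:22:9e:60")
print(peer_mac)
```

The resulting value can then be passed to add_peer() in place of a hand-written bytes literal.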

Variables

Create variables to store the incoming sensor readings, and the state of the sending process. We save the incoming sensor readings in a dictionary.

last_send_status = " "
incoming_readings = {'temp': 0.0, 'hum': 0.0, 'pres': 0.0}

Get BME280 Sensor Readings

The get_readings() function returns the BME280 sensor readings in this order: temperature, humidity, and pressure. The BME280 library returns each reading as a string that ends with its unit (C, %, or hPa), so we strip those trailing characters before converting each value to a float.

In case there’s an error reading the sensor, it will return 0.0 for all readings.

# Function to get BME280 readings
def get_readings():
    try:
        temp = float(bme.temperature[:-1]) # Remove 'C'
        hum = float(bme.humidity[:-1])     # Remove '%'
        pres = float(bme.pressure[:-3])    # Remove 'hPa'
        print("BME280 readings:", temp, hum, pres)
        return temp, hum, pres
    except Exception as err:
        print("Error reading BME280:", err)
        return 0.0, 0.0, 0.0
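To see the unit stripping on its own, here is a small sketch that runs in plain Python without the sensor. The strip_unit() helper and the sample strings are hypothetical; they only imitate the string format that the library’s temperature, humidity, and pressure properties produce.

```python
def strip_unit(reading, unit):
    """Return the numeric part of a reading such as '23.45C' as a float."""
    if not reading.endswith(unit):
        raise ValueError("reading {!r} does not end with {!r}".format(reading, unit))
    # Drop as many trailing characters as the unit has
    return float(reading[:-len(unit)])


# Made-up sample strings in the same format as the library output
temp = strip_unit("23.45C", "C")
hum = strip_unit("41.07%", "%")
pres = strip_unit("1013.25hPa", "hPa")
print(temp, hum, pres)
```

Checking the suffix before slicing is a design choice of this sketch: it makes an unexpected string fail loudly instead of silently dropping a digit.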

Update the OLED Display

The following function updates the display with the current incoming sensor readings. It also displays the last sending status.

# Function to update OLED display
def update_display():
    try:
        display.fill(0)
        display.text("INCOM. READINGS", 0, 0)
        display.text("Temp: {:.1f} C".format(incoming_readings['temp']), 0, 15)
        display.text("Hum: {:.1f} %".format(incoming_readings['hum']), 0, 25)
        display.text("Pres: {:.1f} hPa".format(incoming_readings['pres']), 0, 35)
        display.text(last_send_status, 0, 55)
        display.show()
        print("Display updated")
    except Exception as err:
        print("Error updating display:", err)
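One detail worth checking is the text width. The built-in font used by display.text() is 8 pixels wide, so a 128-pixel display fits 16 characters per line, and a longer string such as "Delivery Success :)" will run off the right edge. The sketch below builds the same lines in plain Python and clips them to the visible width. The build_display_lines() helper and the sample readings are hypothetical.

```python
CHARS_PER_LINE = 128 // 8  # 128-pixel-wide display, 8-pixel-wide characters


def build_display_lines(readings, status):
    """Return (text, y) pairs for the OLED, clipped to the visible width."""
    lines = [
        ("INCOM. READINGS", 0),
        ("Temp: {:.1f} C".format(readings["temp"]), 15),
        ("Hum: {:.1f} %".format(readings["hum"]), 25),
        ("Pres: {:.1f} hPa".format(readings["pres"]), 35),
        (status, 55),
    ]
    return [(text[:CHARS_PER_LINE], y) for text, y in lines]


# Made-up readings, only to exercise the formatting
sample = {"temp": 23.4, "hum": 41.1, "pres": 1013.2}
lines = build_display_lines(sample, "Delivery Success :)")
for text, y in lines:
    print(y, text)
```

On the board, each pair could be drawn with display.text(text, 0, y). A shorter status string is another way to keep the whole message visible.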

To learn more about using the OLED display with the ESP32 programmed with MicroPython, check our guide: MicroPython: OLED Display with ESP32 and ESP8266.

The send_messages(e, peer) function sends the sensor readings to the specified peer using the AIOESPNow instance e.

# Async function to send messages
async def send_messages(e, peer):
    global last_send_status
    while True:
        try:
            print("Sending data")
            temp, hum, pres = get_readings()
            # Create JSON string
            data_dict = {"temp": temp, "hum": hum, "pres": pres}
            json_str = ujson.dumps(data_dict)
            data = json_str.encode('utf-8')  # Convert to bytes
            print("Sending JSON:", json_str)
            if await e.asend(peer, data, sync=True):
                print("Sent with success")
                last_send_status = "Delivery Success :)"
            else:
                print("Send failed")
                last_send_status = "Delivery Fail :("
            update_display()
            print("Sending task complete")
            await asyncio.sleep(10)  # Send every 10 seconds
        except OSError as err:
            print("Send error:", err)
            last_send_status = "Delivery Fail :("
            update_display()
            await asyncio.sleep(0.1)  # Shorter delay in case of error

In this function, we start by calling get_readings() to get the current temperature, humidity, and pressure readings. Then, we create a dictionary with those readings, convert it to a JSON string, and encode that string as bytes.

temp, hum, pres = get_readings()
# Create JSON string
data_dict = {"temp": temp, "hum": hum, "pres": pres}
json_str = ujson.dumps(data_dict)
data = json_str.encode('utf-8')  # Convert to bytes
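An ESP-NOW message can carry at most 250 bytes of data, so it can be useful to check the size of the encoded JSON before sending it. Here is a minimal sketch in plain Python, with the json module standing in for ujson. The encode_readings() helper, the MAX_DATA_LEN constant defined here, and the sample values are our own assumptions for illustration.

```python
import json  # stands in for ujson when running on a computer

MAX_DATA_LEN = 250  # assumed ESP-NOW payload limit in bytes


def encode_readings(temp, hum, pres):
    """Pack the readings into UTF-8 encoded JSON and check the payload size."""
    payload = json.dumps({"temp": temp, "hum": hum, "pres": pres}).encode("utf-8")
    if len(payload) > MAX_DATA_LEN:
        raise ValueError("payload is {} bytes, limit is {}".format(len(payload), MAX_DATA_LEN))
    return payload


# Made-up readings, only to show the size of a typical message
payload = encode_readings(23.45, 41.07, 1013.25)
print(len(payload), payload)
```

With three short readings the payload stays well under the limit, so there is room to add a few more fields to the dictionary.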

Then, we use the same procedure we described previously to send data. During this process, we also update the display with the result of sending the data.

try:
    # (the readings are collected and encoded as shown above)
    if await e.asend(peer, data, sync=True):
        print("Sent with success")
        last_send_status = "Delivery Success :)"
    else:
        print("Send failed")
        last_send_status = "Delivery Fail :("
    update_display()
    print("Sending task complete")
    await asyncio.sleep(10)  # Send every 10 seconds
except OSError as err:
    print("Send error:", err)
    last_send_status = "Delivery Fail :("
    update_display()
    await asyncio.sleep(0.1)  # Shorter delay in case of error

Receiving Incoming Data

The receive_messages() function will receive the data from the other board.

# Async function to receive messages
async def receive_messages(e):
    global incoming_readings
    while True:
        try:
            print("Checking for messages")
            async for mac, msg in e:
                try:
                    # Decode bytes to string and parse JSON
                    json_str = msg.decode('utf-8')
                    data_dict = ujson.loads(json_str)
                    temp = data_dict['temp']
                    hum = data_dict['hum']
                    pres = data_dict['pres']
                    incoming_readings['temp'] = temp
                    incoming_readings['hum'] = hum
                    incoming_readings['pres'] = pres
                    print("\nINCOMING READINGS")
                    print("Temperature: {:.1f} ºC".format(temp))
                    print("Humidity: {:.1f} %".format(hum))
                    print("Pressure: {:.1f} hPa".format(pres))
                    update_display()
                except (ValueError, KeyError) as err:
                    print("Error parsing JSON:", err)
            await asyncio.sleep(0.01)  # Yield if no received messages
        except OSError as err:
            print("Receive error:", err)
            await asyncio.sleep(0.1)  # Shorter delay in case of error

When we receive the data, we decode the bytes to a string and parse it as JSON to get a dictionary. Then, we update the incoming_readings dictionary with the received values.

Finally, we print the readings in the serial monitor and also call the update_display() function to update the screen with the latest sensor readings.
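The parsing step can be tried on its own, without any boards. The sketch below runs in plain Python (json stands in for ujson) and uses a hypothetical parse_readings() helper. Unlike the tutorial code, it also catches TypeError, which is our own addition for payloads that are valid JSON but not a dictionary.

```python
import json  # stands in for ujson when running on a computer

REQUIRED_KEYS = ("temp", "hum", "pres")


def parse_readings(msg):
    """Return a dict of float readings, or None if the payload is not usable."""
    try:
        data = json.loads(msg.decode("utf-8"))
        return {key: float(data[key]) for key in REQUIRED_KEYS}
    except (ValueError, KeyError, TypeError):
        # ValueError: bad encoding or bad JSON; KeyError: missing field;
        # TypeError: the JSON was not a dictionary of numbers
        return None


# Made-up payloads: one valid, three unusable
good = parse_readings(b'{"temp": 23.4, "hum": 41.1, "pres": 1013.2}')
not_json = parse_readings(b"hello")
missing_key = parse_readings(b'{"temp": 23.4}')
not_a_dict = parse_readings(b"[1, 2, 3]")
print(good, not_json, missing_key, not_a_dict)
```

Returning None for unusable payloads lets the caller skip the display update instead of showing stale or partial values.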

Creating the Async Loop Function and Running the Program

Finally, we create the main asynchronous loop function and run the program asynchronously like we did in the previous example.

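# Main async function
async def main(e, peer):
    print("Starting main loop")
    await asyncio.gather(send_messages(e, peer), receive_messages(e))

# Run the async program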
try:
    print("Starting transceiver...")
    asyncio.run(main(e, peer_mac))
except KeyboardInterrupt:
    print("Stopping transceiver...")
    e.active(False)
    sta.active(False)
except Exception as err:
    print("Main loop error:", err)
finally:
    print("Cleaning up...")
    e.active(False)
    sta.active(False)
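To get a feel for how asyncio.gather() runs the sending and receiving tasks side by side, here is a simplified sketch that runs in plain Python without any radio. The sender() and receiver() coroutines are hypothetical stand-ins that only record what they would do, and they stop after a few rounds so the example finishes.

```python
import asyncio

log = []  # records the order in which the two tasks run


async def sender(count, interval):
    # Stand-in for send_messages(): "send", then wait for the next round
    for i in range(count):
        log.append(("send", i))
        await asyncio.sleep(interval)


async def receiver(count, interval):
    # Stand-in for receive_messages(): wait for a "message", then record it
    for i in range(count):
        await asyncio.sleep(interval)
        log.append(("recv", i))


async def main():
    # Both coroutines run concurrently; gather() returns when both are done
    await asyncio.gather(sender(3, 0.02), receiver(3, 0.01))


asyncio.run(main())
print(log)
```

While one task is waiting in asyncio.sleep(), the other one gets to run, which is why the board can keep listening for messages between transmissions.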

Uploading the Code to Your Boards

Important note: just running the file with Thonny doesn’t copy it permanently to the board’s filesystem. This means that if you unplug it from your computer and apply power to the board, nothing will happen because it doesn’t have any Python file saved on its filesystem. The Thonny IDE Run function is useful to test the code, but if you want to upload it permanently to your board, you need to create and save a file to the board’s filesystem.

To run the code on your boards without being connected to your computer, you must upload it to the board’s filesystem with the name main.py.

Go to File > Save as… and select MicroPython device.

Thonny IDE ESP32 ESP8266 MicroPython Save file library to device select

Call that file main.py and save it on the board.

Save main.py file to MicroPython device - Thonny IDE

Demonstration

After uploading the code to both boards, you should see the OLED displaying the sensor readings from the other board, as well as a “Delivery Success” message.

ESP32 Exchange BME280 Sensor Readings via ESP-NOW

Wrapping Up

We hope you’ve found this guide useful. The ESP-NOW wireless communication protocol is one of the easiest methods to communicate between ESP32 boards remotely without the need for a Wi-Fi router.

In this tutorial, you learned how to establish two-way communication between boards. You can add more boards to your setup by registering more peers, creating a network of ESP32 boards that communicate with each other.

If you’re just getting started with ESP-NOW, we recommend starting with our ESP-NOW Getting Started Guide for the ESP32 Programmed with MicroPython.

Want to learn more about MicroPython with the ESP32? Check out our resources:

Thanks for reading.



