Learn how to establish two-way communication between two ESP32 boards using the ESP-NOW communication protocol with MicroPython. First, we’ll test a simple example to show you how to implement two-way communication. Then, we’ll build a more complex project, in which the boards exchange sensor readings and display the results on an OLED display.

Using Arduino IDE? Follow this tutorial instead: ESP-NOW Two-Way Communication Between ESP32 Boards.
Table of Contents
In this tutorial, we’ll cover the following subjects:
- Introducing ESP-NOW
- ESP32: Getting Board MAC Address
- ESP-NOW Two-Way Communication Between Two ESP32 Boards (MicroPython)
- ESP32 ESP-NOW Two-Way Communication – Exchange Sensor Readings and Display on OLED
Prerequisites – MicroPython Firmware
To follow this tutorial, you need MicroPython firmware installed on your ESP32 boards. You also need an IDE to write and upload the code to your boards. We suggest using Thonny IDE.
New to MicroPython? Check out our eBook: MicroPython Programming with ESP32 and ESP8266 eBook (2nd Edition)
Introducing ESP-NOW
ESP-NOW is a wireless communication protocol developed by Espressif that allows multiple ESP32 or ESP8266 boards to exchange small amounts of data without connecting to a Wi-Fi network or using Bluetooth. It does not require a full Wi-Fi connection (though the Wi-Fi controller must be turned on), which makes it well suited to low-power, low-latency applications like sensor networks, remote controls, or data exchange between boards.

ESP-NOW uses a connectionless communication model, meaning devices can send and receive data without connecting to a router or setting up an access point (unlike HTTP communication between boards). It supports unicast (sending data to a specific device using its MAC address) and broadcast (sending data to all nearby devices using a broadcast MAC address) messaging.
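As a small illustration of the two addressing modes, the sketch below tells a broadcast address apart from a unicast one. The helper name is_broadcast is hypothetical and is not part of the espnow module; it only assumes that the broadcast address is ff:ff:ff:ff:ff:ff.

```python
# Hypothetical helper for illustration; not part of the espnow module.
BROADCAST_MAC = b'\xff\xff\xff\xff\xff\xff'

def is_broadcast(mac):
    """Return True if mac (a 6-byte value) is the broadcast address."""
    return bytes(mac) == BROADCAST_MAC

print(is_broadcast(b'\xff\xff\xff\xff\xff\xff'))  # broadcast address
print(is_broadcast(b'\x30\xae\xa4\xf6\x7d\x4c'))  # a specific board (unicast)
```

A check like this can help you notice that a placeholder address was left in the code when you intended to send to one specific board.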
New to ESP-NOW? Read our getting started guide for MicroPython using ESP-NOW with ESP32.
ESP32: Getting Board MAC Address
To communicate via ESP-NOW, you need to know the MAC address of your boards. To get your board’s MAC Address, you can copy the following code to Thonny IDE and run it on your board.
# Rui Santos & Sara Santos - Random Nerd Tutorials
# Complete project details at https://RandomNerdTutorials.com/micropython-esp-now-esp32/
import network
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
# Get MAC address (returns bytes)
mac = wlan.config('mac')
# Convert to human-readable format
mac_address = ':'.join('%02x' % b for b in mac)
print("MAC Address:", mac_address)
After running the code, it should print the board’s MAC address on the shell.

We recommend that you add a label or sticker to each board so that you can clearly identify them.

We’ll be using an ESP32 DOIT V1 board and an ESP32 S3 DevKitC, but this should be compatible with any ESP32 model.
ESP-NOW Two-Way Communication Between Two ESP32 Boards (MicroPython)
In this section, we’ll show you a basic example of how to exchange simple messages between ESP32 boards using the ESP-NOW communication protocol. Each board is a receiver and a transmitter at the same time, so we can call them transceivers.
Current MicroPython firmware versions include two ESP-NOW modules by default: espnow and aioespnow. We’ll use aioespnow, which is the asynchronous version of the espnow module.

Copy the following code to Thonny IDE.
# Rui Santos & Sara Santos - Random Nerd Tutorials
# Complete project details at https://RandomNerdTutorials.com/micropython-esp32-esp-now-two-way/
import network
import aioespnow
import asyncio
import time
# Initialize Wi-Fi in station mode
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.config(channel=1) # Set channel explicitly if packets are not received
sta.disconnect()
# Initialize AIOESPNow
e = aioespnow.AIOESPNow()
try:
    e.active(True)
except OSError as err:
    print("Failed to initialize AIOESPNow:", err)
    raise
# Peer MAC address (replace with the actual MAC of the other board)
peer_mac = b'\xff\xff\xff\xff\xff\xff' # Placeholder (broadcast address); use the peer's MAC for unicast
# Add peer for unicast reliability
try:
    e.add_peer(peer_mac)
except OSError as err:
    print("Failed to add peer:", err)
    raise
# Stats tracking
last_stats_time = time.time()
stats_interval = 10 # Print stats every 10 seconds
# Async function to send messages
async def send_messages(e, peer):
    message_count = 0
    while True:
        try:
            message = f"Hello from ESP32 #{message_count}"
            if await e.asend(peer, message, sync=True):
                print(f"Sent message: {message}")
            else:
                print("Failed to send message")
            message_count += 1
            await asyncio.sleep(1) # Send every 1 second
        except OSError as err:
            print("Send error:", err)
            await asyncio.sleep(5)
# Async function to receive messages
async def receive_messages(e):
    while True:
        try:
            async for mac, msg in e:
                print(f"Received from {mac.hex()}: {msg.decode()}")
        except OSError as err:
            print("Receive error:", err)
            await asyncio.sleep(5)
# Async function to print stats periodically
async def print_stats(e):
    global last_stats_time
    while True:
        if time.time() - last_stats_time >= stats_interval:
            stats = e.stats()
            print("\nESP-NOW Statistics:")
            print(f" Packets Sent: {stats[0]}")
            print(f" Packets Delivered: {stats[1]}")
            print(f" Packets Dropped (TX): {stats[2]}")
            print(f" Packets Received: {stats[3]}")
            print(f" Packets Dropped (RX): {stats[4]}")
            last_stats_time = time.time()
        await asyncio.sleep(1) # Check every second
# Main async function
async def main(e, peer):
    # Run send, receive, and stats tasks concurrently
    await asyncio.gather(send_messages(e, peer), receive_messages(e), print_stats(e))
# Run the async program
try:
    asyncio.run(main(e, peer_mac))
except KeyboardInterrupt:
    print("Stopping transceiver...")
    e.active(False)
    sta.active(False)
This code sets the ESP32 board as an ESP-NOW receiver and transmitter. In the code, you must insert the MAC address of the board to which you want to send data.
For example, if the receiver board’s MAC address is 30:AE:A4:F6:7D:4C, you need to convert it to bytes format as follows:
- 30:AE:A4:F6:7D:4C » b'\x30\xae\xa4\xf6\x7d\x4c'
# Peer MAC address
peer_mac = b'\x30\xae\xa4\xf6\x7d\x4c'
Upload this code to both boards, inserting in each copy the MAC address of the other board.
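If you prefer not to convert the address by hand, the conversion described above can be sketched as a small function. The name mac_to_bytes is a hypothetical helper introduced here for illustration; it is not part of MicroPython or of the original code.

```python
# Hypothetical helper: convert a printed MAC address to the bytes format
# expected by add_peer() and asend().
def mac_to_bytes(mac_str):
    """Convert 'AA:BB:CC:DD:EE:FF' to a 6-byte bytes object."""
    parts = mac_str.split(':')
    if len(parts) != 6:
        raise ValueError("expected 6 colon-separated hex pairs")
    return bytes([int(p, 16) for p in parts])

# Example address taken from the text above
peer_mac = mac_to_bytes("30:AE:A4:F6:7D:4C")
print(peer_mac == b'\x30\xae\xa4\xf6\x7d\x4c')
```

You could then pass peer_mac to add_peer() exactly as in the main script.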
How Does the Code Work?
To better understand this code, we recommend that you get familiar with MicroPython asynchronous programming first. You can read the following guide:
Importing Modules
First, import the required modules.
import network
import aioespnow
import asyncio
import time
Initialize the Wi-Fi Interface
Then, we need to initialize Wi-Fi (even if we don’t use it) to use ESP-NOW. We can use station (STA_IF) or access point mode (AP_IF).
# Initialize Wi-Fi in station mode
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.config(channel=1) # Set channel
sta.disconnect()
Initialize ESP-NOW
Then, we can initialize ESP-NOW. First, create an AIOESPNow instance called e. Then, activate it by calling the active() method with True as the argument.
# Initialize AIOESPNow
e = aioespnow.AIOESPNow()
try:
    e.active(True)
except OSError as err:
    print("Failed to initialize AIOESPNow:", err)
    raise
We activate ESP-NOW inside try and except statements so that we can catch any errors if the initialization fails.
Add ESP-NOW Peer
Insert the peer MAC address (the MAC address of the board you want to send data to).
# Receiver MAC address (the board you want to send data to)
peer_mac = b'\x68\xb6\xb3\x22\x9e\x60'
Then, we can add the receiver’s MAC address as a peer using the add_peer() method.
try:
    e.add_peer(peer_mac)
except OSError as err:
    print("Failed to add peer:", err)
    raise
Function to Send ESP-NOW Messages
The following function sends ESP-NOW messages to the peer. Pass the ESP-NOW instance e and the peer MAC address as arguments. As an example, we’ll send a Hello from ESP32 message followed by a counter. We send a new message every second.
# Async function to send messages
async def send_messages(e, peer):
    message_count = 0
    while True:
        try:
            message = f"Hello from ESP32 #{message_count}"
            if await e.asend(peer, message, sync=True):
                print(f"Sent message: {message}")
            else:
                print("Failed to send message")
            message_count += 1
            await asyncio.sleep(1) # Send every 1 second
        except OSError as err:
            print("Send error:", err)
            await asyncio.sleep(5)
Function to Receive ESP-NOW Messages
We also need to create a function to receive the ESP-NOW messages. When there’s a new message, we print it in the MicroPython shell.
# Async function to receive messages
async def receive_messages(e):
    while True:
        try:
            async for mac, msg in e:
                print(f"Received from {mac.hex()}: {msg.decode()}")
        except OSError as err:
            print("Receive error:", err)
            await asyncio.sleep(5)
Print ESP-NOW Statistics
Then, we create a function called print_stats(), which we’ll call later in the code, that prints current statistics about the ESP-NOW packets. To get the number of packets sent, received, and lost, we can call the stats() method on the ESP-NOW e object.
This returns a 5-tuple containing the number of packets sent/received/lost:
(tx_pkts, tx_responses, tx_failures, rx_packets, rx_dropped_packets)
This is the complete asynchronous function.
async def print_stats(e):
    global last_stats_time
    while True:
        if time.time() - last_stats_time >= stats_interval:
            stats = e.stats()
            print("\nESP-NOW Statistics:")
            print(f" Packets Sent: {stats[0]}")
            print(f" Packets Delivered: {stats[1]}")
            print(f" Packets Dropped (TX): {stats[2]}")
            print(f" Packets Received: {stats[3]}")
            print(f" Packets Dropped (RX): {stats[4]}")
            last_stats_time = time.time()
        await asyncio.sleep(1) # Check every second
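To make the meaning of each tuple position easier to follow, here is a minimal sketch that labels the five values and derives a delivery ratio. The function summarize_stats and the sample tuple are hypothetical and only for illustration; the labels follow the ones used in print_stats().

```python
# Hypothetical helper: give names to the 5-tuple returned by stats()
def summarize_stats(stats):
    """Label the stats tuple and add a simple delivery ratio."""
    tx_pkts, tx_responses, tx_failures, rx_pkts, rx_dropped = stats
    # Avoid dividing by zero before anything has been sent
    ratio = tx_responses / tx_pkts if tx_pkts else 0.0
    return {
        "sent": tx_pkts,
        "delivered": tx_responses,
        "tx_failed": tx_failures,
        "received": rx_pkts,
        "rx_dropped": rx_dropped,
        "delivery_ratio": ratio,
    }

# Example values only, not real measurements
summary = summarize_stats((10, 8, 2, 9, 0))
print(summary["delivery_ratio"])
```

On the board, you would pass e.stats() instead of the sample tuple.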
Main Async Function
The following line defines an asynchronous function main() that takes the ESP-NOW object e and peer MAC address peer as arguments.
async def main(e, peer):
Then, we use asyncio.gather to run three async tasks concurrently:
- send_messages(e, peer): sends the Hello from ESP32 messages to the peer;
- receive_messages(e): listens for incoming data from the peer;
- print_stats(e): periodically prints ESP-NOW statistics.
    await asyncio.gather(send_messages(e, peer), receive_messages(e), print_stats(e))
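If asyncio.gather is new to you, the following sketch shows the same pattern without any hardware. The worker() coroutine is a hypothetical stand-in for tasks such as send_messages(); it uses a finite loop so that the demo ends on its own.

```python
import asyncio

async def worker(name, delay, count, log):
    # Stand-in for a real task: record an event, then yield control
    for i in range(count):
        log.append((name, i))
        await asyncio.sleep(delay)
    return name

async def demo():
    log = []
    # Both coroutines run concurrently; gather waits for all of them
    results = await asyncio.gather(
        worker("send", 0.02, 3, log),
        worker("receive", 0.03, 2, log),
    )
    return results, log

results, log = asyncio.run(demo())
print(results)
print(log)
```

The log shows the two workers taking turns: whenever one awaits asyncio.sleep(), the other gets a chance to run. The tasks in the tutorial code interleave in the same way.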
Running the Async Program
Finally, the following lines will run the program asynchronously.
# Run the async program
try:
    asyncio.run(main(e, peer_mac))
except KeyboardInterrupt:
    print("Stopping transceiver...")
    e.active(False)
    sta.active(False)
The asyncio.run(main(e, peer_mac)) call starts the MicroPython asyncio event loop and executes the main() function.
It goes inside try and except statements to catch a KeyboardInterrupt (raised when you manually stop the code). When this happens, we disable ESP-NOW and turn off Wi-Fi.
Running the Code
After connecting the board to your computer and establishing communication with Thonny IDE, you can upload the code as main.py to the board or run it using the green Run button. Make sure you’ve inserted the receiver’s MAC address in the code.

The board will start printing messages in the Shell. Sending will fail until the other board is ready.
Open another instance of Thonny IDE, and establish communication with the other board.
Enabling Two Instances of Thonny IDE
Go to Tools > Options and untick the option Allow only single Thonny instance.
Copy the same code to the second board, but insert the first board’s MAC address. After a few seconds, the boards will start sending and receiving ESP-NOW messages.

The following screenshots show the MicroPython Shell for each of my boards.


You can see that after powering both boards, they’ll start exchanging messages with each other.
ESP32 ESP-NOW Two-Way Communication – Exchange Sensor Readings and Display on OLED
In this section, we’ll build a project with a real-world application in which the two ESP32 boards exchange sensor readings and display the received data on an OLED display.

Project Overview
The following diagram shows a high-level overview of the project we’ll build.

- In this project we’ll have two ESP32 boards. Each board is connected to an OLED display and a BME280 sensor;
- Each board gets temperature, humidity, and pressure readings from its corresponding sensors;
- Each board sends its readings to the other board via ESP-NOW;
- When a board receives the readings, it displays them on the OLED display;
- After sending the readings, the board displays on the OLED if the message was successfully delivered;
- Each board needs to know the other board’s MAC address in order to send the message.
In this example, we’re using two-way communication between two boards, but you can add more boards to this setup and have all boards communicate with each other.
Parts Required
For this tutorial, you need the following parts:
- 2x ESP32 development boards (read Best ESP32 boards)
- 2x BME280 sensors (BME280 Complete Guide)
- 2x 0.96 inch OLED displays (OLED Complete Guide)
- Breadboard
- Jumper wires
Schematic Diagram
Wire an OLED display and a BME280 sensor to each ESP32 board. Follow the next schematic diagram (adjust if using an ESP32 model with a different pinout).

You can use the following table as a reference when wiring the BME280 sensor.
BME280 | ESP32 |
VIN | 3.3V |
GND | GND |
SCL | GPIO 22 |
SDA | GPIO 21 |
You can also follow the next table to wire the OLED display to the ESP32.
OLED Display | ESP32 |
GND | GND |
VDD / VCC | VIN |
SCK / SCL | GPIO 22 |
SDA | GPIO 21 |
Importing Libraries
The libraries to interface with the OLED display and get BME280 data are not part of the standard MicroPython package. So, we need to upload those modules to both boards.
ssd1306.py MicroPython Module
1. Copy the following code to a new file on Thonny IDE.
# MicroPython SSD1306 OLED driver, I2C and SPI interfaces created by Adafruit
import time
import framebuf
# register definitions
SET_CONTRAST = const(0x81)
SET_ENTIRE_ON = const(0xa4)
SET_NORM_INV = const(0xa6)
SET_DISP = const(0xae)
SET_MEM_ADDR = const(0x20)
SET_COL_ADDR = const(0x21)
SET_PAGE_ADDR = const(0x22)
SET_DISP_START_LINE = const(0x40)
SET_SEG_REMAP = const(0xa0)
SET_MUX_RATIO = const(0xa8)
SET_COM_OUT_DIR = const(0xc0)
SET_DISP_OFFSET = const(0xd3)
SET_COM_PIN_CFG = const(0xda)
SET_DISP_CLK_DIV = const(0xd5)
SET_PRECHARGE = const(0xd9)
SET_VCOM_DESEL = const(0xdb)
SET_CHARGE_PUMP = const(0x8d)
class SSD1306:
    def __init__(self, width, height, external_vcc):
        self.width = width
        self.height = height
        self.external_vcc = external_vcc
        self.pages = self.height // 8
        # Note the subclass must initialize self.framebuf to a framebuffer.
        # This is necessary because the underlying data buffer is different
        # between I2C and SPI implementations (I2C needs an extra byte).
        self.poweron()
        self.init_display()
    def init_display(self):
        for cmd in (
            SET_DISP | 0x00, # off
            # address setting
            SET_MEM_ADDR, 0x00, # horizontal
            # resolution and layout
            SET_DISP_START_LINE | 0x00,
            SET_SEG_REMAP | 0x01, # column addr 127 mapped to SEG0
            SET_MUX_RATIO, self.height - 1,
            SET_COM_OUT_DIR | 0x08, # scan from COM[N] to COM0
            SET_DISP_OFFSET, 0x00,
            SET_COM_PIN_CFG, 0x02 if self.height == 32 else 0x12,
            # timing and driving scheme
            SET_DISP_CLK_DIV, 0x80,
            SET_PRECHARGE, 0x22 if self.external_vcc else 0xf1,
            SET_VCOM_DESEL, 0x30, # 0.83*Vcc
            # display
            SET_CONTRAST, 0xff, # maximum
            SET_ENTIRE_ON, # output follows RAM contents
            SET_NORM_INV, # not inverted
            # charge pump
            SET_CHARGE_PUMP, 0x10 if self.external_vcc else 0x14,
            SET_DISP | 0x01): # on
            self.write_cmd(cmd)
        self.fill(0)
        self.show()
    def poweroff(self):
        self.write_cmd(SET_DISP | 0x00)
    def contrast(self, contrast):
        self.write_cmd(SET_CONTRAST)
        self.write_cmd(contrast)
    def invert(self, invert):
        self.write_cmd(SET_NORM_INV | (invert & 1))
    def show(self):
        x0 = 0
        x1 = self.width - 1
        if self.width == 64:
            # displays with width of 64 pixels are shifted by 32
            x0 += 32
            x1 += 32
        self.write_cmd(SET_COL_ADDR)
        self.write_cmd(x0)
        self.write_cmd(x1)
        self.write_cmd(SET_PAGE_ADDR)
        self.write_cmd(0)
        self.write_cmd(self.pages - 1)
        self.write_framebuf()
    def fill(self, col):
        self.framebuf.fill(col)
    def pixel(self, x, y, col):
        self.framebuf.pixel(x, y, col)
    def scroll(self, dx, dy):
        self.framebuf.scroll(dx, dy)
    def text(self, string, x, y, col=1):
        self.framebuf.text(string, x, y, col)
class SSD1306_I2C(SSD1306):
    def __init__(self, width, height, i2c, addr=0x3c, external_vcc=False):
        self.i2c = i2c
        self.addr = addr
        self.temp = bytearray(2)
        # Add an extra byte to the data buffer to hold an I2C data/command byte
        # to use hardware-compatible I2C transactions. A memoryview of the
        # buffer is used to mask this byte from the framebuffer operations
        # (without a major memory hit as memoryview doesn't copy to a separate
        # buffer).
        self.buffer = bytearray(((height // 8) * width) + 1)
        self.buffer[0] = 0x40 # Set first byte of data buffer to Co=0, D/C=1
        self.framebuf = framebuf.FrameBuffer1(memoryview(self.buffer)[1:], width, height)
        super().__init__(width, height, external_vcc)
    def write_cmd(self, cmd):
        self.temp[0] = 0x80 # Co=1, D/C#=0
        self.temp[1] = cmd
        self.i2c.writeto(self.addr, self.temp)
    def write_framebuf(self):
        # Blast out the frame buffer using a single I2C transaction to support
        # hardware I2C interfaces.
        self.i2c.writeto(self.addr, self.buffer)
    def poweron(self):
        pass
class SSD1306_SPI(SSD1306):
    def __init__(self, width, height, spi, dc, res, cs, external_vcc=False):
        self.rate = 10 * 1024 * 1024
        dc.init(dc.OUT, value=0)
        res.init(res.OUT, value=0)
        cs.init(cs.OUT, value=1)
        self.spi = spi
        self.dc = dc
        self.res = res
        self.cs = cs
        self.buffer = bytearray((height // 8) * width)
        self.framebuf = framebuf.FrameBuffer1(self.buffer, width, height)
        super().__init__(width, height, external_vcc)
    def write_cmd(self, cmd):
        self.spi.init(baudrate=self.rate, polarity=0, phase=0)
        self.cs.high()
        self.dc.low()
        self.cs.low()
        self.spi.write(bytearray([cmd]))
        self.cs.high()
    def write_framebuf(self):
        self.spi.init(baudrate=self.rate, polarity=0, phase=0)
        self.cs.high()
        self.dc.high()
        self.cs.low()
        self.spi.write(self.buffer)
        self.cs.high()
    def poweron(self):
        self.res.high()
        time.sleep_ms(1)
        self.res.low()
        time.sleep_ms(10)
        self.res.high()
2. Go to File > Save as and select MicroPython device.

3. Name the file ssd1306.py and click OK to save the file on the ESP Filesystem.
And that’s it. The library was uploaded to your board.
BME280.py MicroPython Module
1. Copy the following code to a new file on Thonny IDE.
from machine import I2C
import time
# BME280 default address.
BME280_I2CADDR = 0x76
# Operating Modes
BME280_OSAMPLE_1 = 1
BME280_OSAMPLE_2 = 2
BME280_OSAMPLE_4 = 3
BME280_OSAMPLE_8 = 4
BME280_OSAMPLE_16 = 5
# BME280 Registers
BME280_REGISTER_DIG_T1 = 0x88 # Trimming parameter registers
BME280_REGISTER_DIG_T2 = 0x8A
BME280_REGISTER_DIG_T3 = 0x8C
BME280_REGISTER_DIG_P1 = 0x8E
BME280_REGISTER_DIG_P2 = 0x90
BME280_REGISTER_DIG_P3 = 0x92
BME280_REGISTER_DIG_P4 = 0x94
BME280_REGISTER_DIG_P5 = 0x96
BME280_REGISTER_DIG_P6 = 0x98
BME280_REGISTER_DIG_P7 = 0x9A
BME280_REGISTER_DIG_P8 = 0x9C
BME280_REGISTER_DIG_P9 = 0x9E
BME280_REGISTER_DIG_H1 = 0xA1
BME280_REGISTER_DIG_H2 = 0xE1
BME280_REGISTER_DIG_H3 = 0xE3
BME280_REGISTER_DIG_H4 = 0xE4
BME280_REGISTER_DIG_H5 = 0xE5
BME280_REGISTER_DIG_H6 = 0xE6
BME280_REGISTER_DIG_H7 = 0xE7
BME280_REGISTER_CHIPID = 0xD0
BME280_REGISTER_VERSION = 0xD1
BME280_REGISTER_SOFTRESET = 0xE0
BME280_REGISTER_CONTROL_HUM = 0xF2
BME280_REGISTER_CONTROL = 0xF4
BME280_REGISTER_CONFIG = 0xF5
BME280_REGISTER_PRESSURE_DATA = 0xF7
BME280_REGISTER_TEMP_DATA = 0xFA
BME280_REGISTER_HUMIDITY_DATA = 0xFD
class Device:
    """Class for communicating with an I2C device.
    Allows reading and writing 8-bit, 16-bit, and byte array values to
    registers on the device."""
    def __init__(self, address, i2c):
        """Create an instance of the I2C device at the specified address using
        the specified I2C interface object."""
        self._address = address
        self._i2c = i2c
    def writeRaw8(self, value):
        """Write an 8-bit value on the bus (without register)."""
        value = value & 0xFF
        # writeto() expects a bytes-like object, not an int
        self._i2c.writeto(self._address, bytes([value]))
    def write8(self, register, value):
        """Write an 8-bit value to the specified register."""
        b=bytearray(1)
        b[0]=value & 0xFF
        self._i2c.writeto_mem(self._address, register, b)
    def write16(self, register, value):
        """Write a 16-bit value to the specified register."""
        value = value & 0xFFFF
        b=bytearray(2)
        b[0]= value & 0xFF
        b[1]= (value>>8) & 0xFF
        # Write the 2-byte buffer through the stored I2C object
        self._i2c.writeto_mem(self._address, register, b)
    def readRaw8(self):
        """Read an 8-bit value on the bus (without register)."""
        return int.from_bytes(self._i2c.readfrom(self._address, 1),'little') & 0xFF
    def readU8(self, register):
        """Read an unsigned byte from the specified register."""
        return int.from_bytes(
            self._i2c.readfrom_mem(self._address, register, 1),'little') & 0xFF
    def readS8(self, register):
        """Read a signed byte from the specified register."""
        result = self.readU8(register)
        if result > 127:
            result -= 256
        return result
    def readU16(self, register, little_endian=True):
        """Read an unsigned 16-bit value from the specified register, with the
        specified endianness (default little endian, or least significant byte
        first)."""
        result = int.from_bytes(
            self._i2c.readfrom_mem(self._address, register, 2),'little') & 0xFFFF
        if not little_endian:
            result = ((result << 8) & 0xFF00) + (result >> 8)
        return result
    def readS16(self, register, little_endian=True):
        """Read a signed 16-bit value from the specified register, with the
        specified endianness (default little endian, or least significant byte
        first)."""
        result = self.readU16(register, little_endian)
        if result > 32767:
            result -= 65536
        return result
    def readU16LE(self, register):
        """Read an unsigned 16-bit value from the specified register, in little
        endian byte order."""
        return self.readU16(register, little_endian=True)
    def readU16BE(self, register):
        """Read an unsigned 16-bit value from the specified register, in big
        endian byte order."""
        return self.readU16(register, little_endian=False)
    def readS16LE(self, register):
        """Read a signed 16-bit value from the specified register, in little
        endian byte order."""
        return self.readS16(register, little_endian=True)
    def readS16BE(self, register):
        """Read a signed 16-bit value from the specified register, in big
        endian byte order."""
        return self.readS16(register, little_endian=False)
class BME280:
    def __init__(self, mode=BME280_OSAMPLE_1, address=BME280_I2CADDR, i2c=None,
                 **kwargs):
        # Check that mode is valid.
        if mode not in [BME280_OSAMPLE_1, BME280_OSAMPLE_2, BME280_OSAMPLE_4,
                        BME280_OSAMPLE_8, BME280_OSAMPLE_16]:
            raise ValueError(
                'Unexpected mode value {0}. Set mode to one of '
                'BME280_OSAMPLE_1, BME280_OSAMPLE_2, BME280_OSAMPLE_4, '
                'BME280_OSAMPLE_8, or BME280_OSAMPLE_16'.format(mode))
        self._mode = mode
        # Create I2C device.
        if i2c is None:
            raise ValueError('An I2C object is required.')
        self._device = Device(address, i2c)
        # Load calibration values.
        self._load_calibration()
        self._device.write8(BME280_REGISTER_CONTROL, 0x3F)
        self.t_fine = 0
    def _load_calibration(self):
        self.dig_T1 = self._device.readU16LE(BME280_REGISTER_DIG_T1)
        self.dig_T2 = self._device.readS16LE(BME280_REGISTER_DIG_T2)
        self.dig_T3 = self._device.readS16LE(BME280_REGISTER_DIG_T3)
        self.dig_P1 = self._device.readU16LE(BME280_REGISTER_DIG_P1)
        self.dig_P2 = self._device.readS16LE(BME280_REGISTER_DIG_P2)
        self.dig_P3 = self._device.readS16LE(BME280_REGISTER_DIG_P3)
        self.dig_P4 = self._device.readS16LE(BME280_REGISTER_DIG_P4)
        self.dig_P5 = self._device.readS16LE(BME280_REGISTER_DIG_P5)
        self.dig_P6 = self._device.readS16LE(BME280_REGISTER_DIG_P6)
        self.dig_P7 = self._device.readS16LE(BME280_REGISTER_DIG_P7)
        self.dig_P8 = self._device.readS16LE(BME280_REGISTER_DIG_P8)
        self.dig_P9 = self._device.readS16LE(BME280_REGISTER_DIG_P9)
        self.dig_H1 = self._device.readU8(BME280_REGISTER_DIG_H1)
        self.dig_H2 = self._device.readS16LE(BME280_REGISTER_DIG_H2)
        self.dig_H3 = self._device.readU8(BME280_REGISTER_DIG_H3)
        self.dig_H6 = self._device.readS8(BME280_REGISTER_DIG_H7)
        h4 = self._device.readS8(BME280_REGISTER_DIG_H4)
        h4 = (h4 << 24) >> 20
        self.dig_H4 = h4 | (self._device.readU8(BME280_REGISTER_DIG_H5) & 0x0F)
        h5 = self._device.readS8(BME280_REGISTER_DIG_H6)
        h5 = (h5 << 24) >> 20
        self.dig_H5 = h5 | (
            self._device.readU8(BME280_REGISTER_DIG_H5) >> 4 & 0x0F)
    def read_raw_temp(self):
        """Reads the raw (uncompensated) temperature from the sensor."""
        meas = self._mode
        self._device.write8(BME280_REGISTER_CONTROL_HUM, meas)
        meas = self._mode << 5 | self._mode << 2 | 1
        self._device.write8(BME280_REGISTER_CONTROL, meas)
        sleep_time = 1250 + 2300 * (1 << self._mode)
        sleep_time = sleep_time + 2300 * (1 << self._mode) + 575
        sleep_time = sleep_time + 2300 * (1 << self._mode) + 575
        time.sleep_us(sleep_time) # Wait the required time
        msb = self._device.readU8(BME280_REGISTER_TEMP_DATA)
        lsb = self._device.readU8(BME280_REGISTER_TEMP_DATA + 1)
        xlsb = self._device.readU8(BME280_REGISTER_TEMP_DATA + 2)
        raw = ((msb << 16) | (lsb << 8) | xlsb) >> 4
        return raw
    def read_raw_pressure(self):
        """Reads the raw (uncompensated) pressure level from the sensor."""
        """Assumes that the temperature has already been read """
        """i.e. that enough delay has been provided"""
        msb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA)
        lsb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA + 1)
        xlsb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA + 2)
        raw = ((msb << 16) | (lsb << 8) | xlsb) >> 4
        return raw
    def read_raw_humidity(self):
        """Assumes that the temperature has already been read """
        """i.e. that enough delay has been provided"""
        msb = self._device.readU8(BME280_REGISTER_HUMIDITY_DATA)
        lsb = self._device.readU8(BME280_REGISTER_HUMIDITY_DATA + 1)
        raw = (msb << 8) | lsb
        return raw
    def read_temperature(self):
        """Get the compensated temperature in 0.01 of a degree celsius."""
        adc = self.read_raw_temp()
        var1 = ((adc >> 3) - (self.dig_T1 << 1)) * (self.dig_T2 >> 11)
        var2 = ((
            (((adc >> 4) - self.dig_T1) * ((adc >> 4) - self.dig_T1)) >> 12) *
            self.dig_T3) >> 14
        self.t_fine = var1 + var2
        return (self.t_fine * 5 + 128) >> 8
    def read_pressure(self):
        """Gets the compensated pressure in Pascals."""
        adc = self.read_raw_pressure()
        var1 = self.t_fine - 128000
        var2 = var1 * var1 * self.dig_P6
        var2 = var2 + ((var1 * self.dig_P5) << 17)
        var2 = var2 + (self.dig_P4 << 35)
        var1 = (((var1 * var1 * self.dig_P3) >> 8) +
                ((var1 * self.dig_P2) >> 12))
        var1 = (((1 << 47) + var1) * self.dig_P1) >> 33
        if var1 == 0:
            return 0
        p = 1048576 - adc
        p = (((p << 31) - var2) * 3125) // var1
        var1 = (self.dig_P9 * (p >> 13) * (p >> 13)) >> 25
        var2 = (self.dig_P8 * p) >> 19
        return ((p + var1 + var2) >> 8) + (self.dig_P7 << 4)
    def read_humidity(self):
        adc = self.read_raw_humidity()
        # print 'Raw humidity = {0:d}'.format (adc)
        h = self.t_fine - 76800
        h = (((((adc << 14) - (self.dig_H4 << 20) - (self.dig_H5 * h)) +
             16384) >> 15) * (((((((h * self.dig_H6) >> 10) * (((h *
             self.dig_H3) >> 11) + 32768)) >> 10) + 2097152) *
             self.dig_H2 + 8192) >> 14))
        h = h - (((((h >> 15) * (h >> 15)) >> 7) * self.dig_H1) >> 4)
        h = 0 if h < 0 else h
        h = 419430400 if h > 419430400 else h
        return h >> 12
    @property
    def temperature(self):
        "Return the temperature in degrees."
        t = self.read_temperature()
        ti = t // 100
        td = t - ti * 100
        return "{}.{:02d}C".format(ti, td)
    @property
    def pressure(self):
        "Return the pressure in hPa."
        p = self.read_pressure() // 256
        pi = p // 100
        pd = p - pi * 100
        return "{}.{:02d}hPa".format(pi, pd)
    @property
    def humidity(self):
        "Return the humidity in percent."
        h = self.read_humidity()
        hi = h // 1024
        hd = h * 100 // 1024 - hi * 100
        return "{}.{:02d}%".format(hi, hd)
2. Go to File > Save as…

3. Select save to “MicroPython device”:

4. Name your file as BME280.py and press the OK button:

And that’s it. The library was uploaded to your board.
ESP32 Exchange BME280 Sensor Readings via ESP-NOW – MicroPython Code
Now that you have all the required modules uploaded to your boards, you can upload the following code to each of your boards.
Important: don’t forget to insert the receiver’s MAC address in the code.
# Rui Santos & Sara Santos - Random Nerd Tutorials
# Complete project details at https://RandomNerdTutorials.com/micropython-esp32-esp-now-two-way/
import network
import aioespnow
import asyncio
import time
import ujson
from machine import Pin, I2C
import BME280
import ssd1306
# Initialize I2C for BME280 and SSD1306
i2c = I2C(0, scl=Pin(22), sda=Pin(21))
# Initialize BME280 sensor
try:
    bme = BME280.BME280(i2c=i2c, address=0x76)
    print("BME280 initialized")
except Exception as err:
    print("Failed to initialize BME280:", err)
    raise
# Initialize SSD1306 OLED display
try:
    display = ssd1306.SSD1306_I2C(128, 64, i2c, addr=0x3C)
    print("SSD1306 initialized")
except Exception as err:
    print("Failed to initialize SSD1306:", err)
    raise
# Initialize Wi-Fi in station mode
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.config(channel=1) # set Wi-Fi channel for more stable communication
sta.disconnect()
print("Wi-Fi initialized")
# Initialize AIOESPNow
e = aioespnow.AIOESPNow()
try:
    e.active(True)
    print("AIOESPNow initialized")
except OSError as err:
    print("Failed to initialize AIOESPNow:", err)
    raise
# Receiver MAC address (the board you want to send data to)
peer_mac = b'\xff\xff\xff\xff\xff\xff' # Placeholder: replace with the other board's MAC address
# Add peer
try:
    e.add_peer(peer_mac)
except OSError as err:
    print("Failed to add peer:", err)
    raise
# Variables to store readings and status
last_send_status = " "
incoming_readings = {'temp': 0.0, 'hum': 0.0, 'pres': 0.0}
# Function to get BME280 readings
def get_readings():
    try:
        temp = float(bme.temperature[:-1]) # Remove 'C'
        hum = float(bme.humidity[:-1]) # Remove '%'
        pres = float(bme.pressure[:-3]) # Remove 'hPa'
        print("BME280 readings:", temp, hum, pres)
        return temp, hum, pres
    except Exception as err:
        print("Error reading BME280:", err)
        return 0.0, 0.0, 0.0
# Function to update OLED display
def update_display():
    try:
        display.fill(0)
        display.text("INCOM. READINGS", 0, 0)
        display.text("Temp: {:.1f} C".format(incoming_readings['temp']), 0, 15)
        display.text("Hum: {:.1f} %".format(incoming_readings['hum']), 0, 25)
        display.text("Pres: {:.1f} hPa".format(incoming_readings['pres']), 0, 35)
        display.text(last_send_status, 0, 55)
        display.show()
        print("Display updated")
    except Exception as err:
        print("Error updating display:", err)
# Async function to send messages
async def send_messages(e, peer):
    global last_send_status
    while True:
        try:
            print("Sending data")
            temp, hum, pres = get_readings()
            # Create JSON string
            data_dict = {"temp": temp, "hum": hum, "pres": pres}
            json_str = ujson.dumps(data_dict)
            data = json_str.encode('utf-8') # Convert to bytes
            print("Sending JSON:", json_str)
            if await e.asend(peer, data, sync=True):
                print("Sent with success")
                last_send_status = "Delivery Success :)"
            else:
                print("Send failed")
                last_send_status = "Delivery Fail :("
            update_display()
            print("Sending task complete")
            await asyncio.sleep(10) # Send every 10 seconds
        except OSError as err:
            print("Send error:", err)
            last_send_status = "Delivery Fail :("
            update_display()
            await asyncio.sleep(0.1) # Shorter delay in case of error
# Async function to receive messages
async def receive_messages(e):
    global incoming_readings
    while True:
        try:
            print("Checking for messages")
            async for mac, msg in e:
                try:
                    # Decode bytes to string and parse JSON
                    json_str = msg.decode('utf-8')
                    data_dict = ujson.loads(json_str)
                    temp = data_dict['temp']
                    hum = data_dict['hum']
                    pres = data_dict['pres']
                    incoming_readings['temp'] = temp
                    incoming_readings['hum'] = hum
                    incoming_readings['pres'] = pres
                    print("\nINCOMING READINGS")
                    print("Temperature: {:.1f} ºC".format(temp))
                    print("Humidity: {:.1f} %".format(hum))
                    print("Pressure: {:.1f} hPa".format(pres))
                    update_display()
                except (ValueError, KeyError) as err:
                    print("Error parsing JSON:", err)
            await asyncio.sleep(0.01) # Yield if no received messages
        except OSError as err:
            print("Receive error:", err)
            await asyncio.sleep(0.1) # Shorter delay in case of error
# Main async function
async def main(e, peer):
    print("Starting main loop")
    await asyncio.gather(send_messages(e, peer), receive_messages(e))
# Run the async program
try:
    print("Starting transceiver...")
    asyncio.run(main(e, peer_mac))
except KeyboardInterrupt:
    print("Stopping transceiver...")
except Exception as err:
    print("Main loop error:", err)
finally:
    # Always disable ESP-NOW and Wi-Fi on exit
    print("Cleaning up...")
    e.active(False)
    sta.active(False)
How Does the Code Work?
Let’s take a quick look at how the code works. Alternatively, you can skip to the demonstration section.
Importing Libraries
We start by importing the required modules.
import network
import aioespnow
import asyncio
import time
import ujson
from machine import Pin, I2C
import BME280
import ssd1306
I2C Communication
Initialize I2C communication on GPIO 22 (SCL) and GPIO 21 (SDA). Adjust the pins if you’re using different ones.
i2c = I2C(0, scl=Pin(22), sda=Pin(21))
BME280
Initialize the BME280 sensor.
# Initialize BME280 sensor
try:
    bme = BME280.BME280(i2c=i2c, address=0x76)
    print("BME280 initialized")
except Exception as err:
    print("Failed to initialize BME280:", err)
    raise
OLED Display
Initialize the OLED Display.
# Initialize SSD1306 OLED display
try:
    display = ssd1306.SSD1306_I2C(128, 64, i2c, addr=0x3C)
    print("SSD1306 initialized")
except Exception as err:
    print("Failed to initialize SSD1306:", err)
    raise
Initialize the Wi-Fi Interface
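Then, we need to activate the Wi-Fi interface to use ESP-NOW, even though we don’t connect to any network. We can use station (STA_IF) or access point mode (AP_IF). We also set the Wi-Fi channel: both boards must use the same channel to communicate.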
# Initialize Wi-Fi in station mode
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.config(channel=1) # Set channel
sta.disconnect()
Initialize ESP-NOW
Then, we can initialize ESP-NOW. First, create an AIOESPNow instance called e. Then, activate it by calling the active() method with True as the argument.
# Initialize AIOESPNow
e = aioespnow.AIOESPNow()
try:
    e.active(True)
    print("AIOESPNow initialized")
except OSError as err:
    print("Failed to initialize AIOESPNow:", err)
    raise
We activate ESP-NOW inside a try/except statement so that we can catch the error and print a message if the initialization fails.
Add ESP-NOW Peer
Insert the peer MAC address (the MAC address of the board you want to send data to).
# Receiver MAC address (the board you want to send data to)
peer_mac = b'\x68\xb6\xb3\x22\x9e\x60'
Then, we can add the receiver’s MAC address as a peer using the add_peer() method.
try:
    e.add_peer(peer_mac)
except OSError as err:
    print("Failed to add peer:", err)
    raise
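If you have the other board’s MAC address as a colon-separated string instead of a bytes literal, the following sketch shows one way to convert it. Note that mac_to_bytes() is a hypothetical helper written for this example, not part of the espnow module. It only relies on binascii.unhexlify().

```python
import binascii

def mac_to_bytes(mac_str):
    """Convert a string like '68:b6:b3:22:9e:60' into a 6-byte bytes object."""
    raw = binascii.unhexlify(mac_str.replace(":", ""))
    if len(raw) != 6:
        raise ValueError("A MAC address must have exactly 6 bytes")
    return raw

# Same address used as peer_mac in this tutorial
peer_mac = mac_to_bytes("68:b6:b3:22:9e:60")
print(peer_mac)
```

The result can be passed to add_peer() in the same way as the bytes literal shown above.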
Variables
Create variables to store the incoming sensor readings, and the state of the sending process. We save the incoming sensor readings in a dictionary.
last_send_status = " "
incoming_readings = {'temp': 0.0, 'hum': 0.0, 'pres': 0.0}
Get BME280 Sensor Readings
The get_readings() function returns the BME280 sensor readings in this order: temperature, humidity, and pressure. The BME280 library returns each reading as a string that ends with its unit, so we slice off the unit characters before converting the value to a float.
If there’s an error reading the sensor, the function returns 0.0 for all readings.
# Function to get BME280 readings
def get_readings():
    try:
        temp = float(bme.temperature[:-1]) # Remove 'C'
        hum = float(bme.humidity[:-1]) # Remove '%'
        pres = float(bme.pressure[:-3]) # Remove 'hPa'
        print("BME280 readings:", temp, hum, pres)
        return temp, hum, pres
    except Exception as err:
        print("Error reading BME280:", err)
        return 0.0, 0.0, 0.0
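To see what the slicing in get_readings() does, here is a small sketch that you can run without a sensor. The strip_unit() helper and the sample strings are made up for illustration. They assume the library returns readings in a format like 23.45C, 48.20%, and 1013.25hPa.

```python
def strip_unit(reading, unit):
    """Return the numeric part of a reading string such as '1013.25hPa'."""
    if not reading.endswith(unit):
        raise ValueError("Unexpected reading format: " + reading)
    return float(reading[:-len(unit)])

# Sample strings (made up), in the assumed library format
print(strip_unit("23.45C", "C"))
print(strip_unit("48.20%", "%"))
print(strip_unit("1013.25hPa", "hPa"))
```

Unlike a fixed slice, this version raises an error if the string doesn’t end with the expected unit, which can help you notice a library that formats its readings differently.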
Update the OLED Display
The following function updates the display with the current incoming sensor readings. It also displays the last sending status.
# Function to update OLED display
def update_display():
    try:
        display.fill(0)
        display.text("INCOM. READINGS", 0, 0)
        display.text("Temp: {:.1f} C".format(incoming_readings['temp']), 0, 15)
        display.text("Hum: {:.1f} %".format(incoming_readings['hum']), 0, 25)
        display.text("Pres: {:.1f} hPa".format(incoming_readings['pres']), 0, 35)
        display.text(last_send_status, 0, 55)
        display.show()
        print("Display updated")
    except Exception as err:
        print("Error updating display:", err)
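If you change the text shown on the display, keep in mind that the built-in font is 8 pixels wide, so a 128-pixel-wide display fits 16 characters per line. The following sketch checks which lines would be cut off. The build_lines() and too_long() helpers and the sample readings are hypothetical, written only for this example.

```python
CHAR_WIDTH = 8       # the built-in framebuf font is 8x8 pixels
DISPLAY_WIDTH = 128  # display width used in this tutorial
MAX_CHARS = DISPLAY_WIDTH // CHAR_WIDTH

def build_lines(readings, status):
    """Build the same text lines that update_display() draws."""
    return [
        "INCOM. READINGS",
        "Temp: {:.1f} C".format(readings['temp']),
        "Hum: {:.1f} %".format(readings['hum']),
        "Pres: {:.1f} hPa".format(readings['pres']),
        status,
    ]

def too_long(lines):
    """Return the lines that don't fit in the display width."""
    return [line for line in lines if len(line) > MAX_CHARS]

# Sample values (made up)
lines = build_lines({'temp': 23.5, 'hum': 45.6, 'pres': 1013.2}, "Delivery Success :)")
for line in too_long(lines):
    print("Too long ({} chars): {}".format(len(line), line))
```

With these sample values, only the status message exceeds 16 characters, so you may want to shorten it if the end of the message matters to you.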
To learn more about using the OLED display with the ESP32 programmed with MicroPython, check our guide: MicroPython: OLED Display with ESP32 and ESP8266.
The send_messages(e, peer) function sends the sensor readings to the specified peer every 10 seconds using the AIOESPNow instance e.
# Async function to send messages
async def send_messages(e, peer):
    global last_send_status
    while True:
        try:
            print("Sending data")
            temp, hum, pres = get_readings()
            # Create JSON string
            data_dict = {"temp": temp, "hum": hum, "pres": pres}
            json_str = ujson.dumps(data_dict)
            data = json_str.encode('utf-8') # Convert to bytes
            print("Sending JSON:", json_str)
            if await e.asend(peer, data, sync=True):
                print("Sent with success")
                last_send_status = "Delivery Success :)"
            else:
                print("Send failed")
                last_send_status = "Delivery Fail :("
            update_display()
            print("Sending task complete")
            await asyncio.sleep(10) # Send every 10 seconds
        except OSError as err:
            print("Send error:", err)
            last_send_status = "Delivery Fail :("
            update_display()
            await asyncio.sleep(0.1) # Shorter delay in case of error
In this function, we start by calling get_readings() to get the current temperature, humidity, and pressure readings. Then, we store the readings in a dictionary, convert it to a JSON string, and encode that string as bytes so it can be sent via ESP-NOW.
temp, hum, pres = get_readings()
# Create JSON string
data_dict = {"temp": temp, "hum": hum, "pres": pres}
json_str = ujson.dumps(data_dict)
data = json_str.encode('utf-8') # Convert to bytes
Then, we use the same procedure we described previously to send data. During this process, we also update the display with the result of sending the data.
    if await e.asend(peer, data, sync=True):
        print("Sent with success")
        last_send_status = "Delivery Success :)"
    else:
        print("Send failed")
        last_send_status = "Delivery Fail :("
    update_display()
    print("Sending task complete")
    await asyncio.sleep(10) # Send every 10 seconds
except OSError as err:
    print("Send error:", err)
    last_send_status = "Delivery Fail :("
    update_display()
    await asyncio.sleep(0.1) # Shorter delay in case of error
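ESP-NOW messages are limited to 250 bytes, so it can be useful to check the size of the JSON payload before sending it, especially if you add more fields later. The following is a minimal sketch of that check. The build_payload() helper and the ESPNOW_MAX_PAYLOAD name are hypothetical, and the sample values are made up. It uses the json module, which is also available in MicroPython.

```python
import json

ESPNOW_MAX_PAYLOAD = 250  # maximum ESP-NOW message size in bytes

def build_payload(temp, hum, pres):
    """Encode the readings as JSON bytes and check they fit in one message."""
    data = json.dumps({"temp": temp, "hum": hum, "pres": pres}).encode("utf-8")
    if len(data) > ESPNOW_MAX_PAYLOAD:
        raise ValueError("Payload too large for ESP-NOW: %d bytes" % len(data))
    return data

# Sample values (made up)
payload = build_payload(23.4, 48.2, 1013.2)
print(len(payload), "bytes:", payload)
```

The three readings used in this tutorial fit comfortably within the limit.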
Receiving Incoming Data
The receive_messages() function will receive the data from the other board.
# Async function to receive messages
async def receive_messages(e):
    global incoming_readings
    while True:
        try:
            print("Checking for messages")
            async for mac, msg in e:
                try:
                    # Decode bytes to string and parse JSON
                    json_str = msg.decode('utf-8')
                    data_dict = ujson.loads(json_str)
                    temp = data_dict['temp']
                    hum = data_dict['hum']
                    pres = data_dict['pres']
                    incoming_readings['temp'] = temp
                    incoming_readings['hum'] = hum
                    incoming_readings['pres'] = pres
                    print("\nINCOMING READINGS")
                    print("Temperature: {:.1f} ºC".format(temp))
                    print("Humidity: {:.1f} %".format(hum))
                    print("Pressure: {:.1f} hPa".format(pres))
                    update_display()
                except (ValueError, KeyError) as err:
                    print("Error parsing JSON:", err)
            await asyncio.sleep(0.01) # Yield if no received messages
        except OSError as err:
            print("Receive error:", err)
            await asyncio.sleep(0.1) # Shorter delay in case of error
When we receive a message, we decode the bytes into a JSON string and parse it into a dictionary. Then, we update the incoming_readings dictionary with the received values. If the message is not valid JSON or a reading is missing, we print an error and wait for the next message.
Finally, we print the readings in the Shell and call the update_display() function to update the screen with the latest sensor readings.
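You can test the decoding and parsing steps on their own, without a second board. The following sketch wraps them in a function that returns None for messages that can’t be parsed. The parse_readings() helper and the sample messages are hypothetical, written only to illustrate the idea.

```python
import json

REQUIRED_KEYS = ("temp", "hum", "pres")

def parse_readings(msg):
    """Return a dict with temp, hum and pres as floats, or None if msg is invalid."""
    try:
        data = json.loads(msg.decode("utf-8"))
        return {key: float(data[key]) for key in REQUIRED_KEYS}
    except (ValueError, KeyError, TypeError):
        return None

# Sample messages (made up)
print(parse_readings(b'{"temp": 23.4, "hum": 48.2, "pres": 1013.2}'))
print(parse_readings(b'{"temp": 23.4}'))  # missing readings
print(parse_readings(b'not json'))        # malformed message
```

Returning None instead of raising an exception is just one possible design. It lets the receiving loop skip bad messages with a simple check.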
Creating the Async Loop Function and Running the Program
Finally, we create the main asynchronous function, which runs the sending and receiving tasks concurrently using asyncio.gather(), and we run the program like we did in the previous example.
# Main async function
async def main(e, peer):
    print("Starting main loop")
    await asyncio.gather(send_messages(e, peer), receive_messages(e))
# Run the async program
try:
    print("Starting transceiver...")
    asyncio.run(main(e, peer_mac))
except KeyboardInterrupt:
    print("Stopping transceiver...")
    e.active(False)
    sta.active(False)
except Exception as err:
    print("Main loop error:", err)
finally:
    print("Cleaning up...")
    e.active(False)
    sta.active(False)
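To get a feel for how asyncio.gather() lets the sending and receiving tasks take turns, you can run the following sketch, which doesn’t need any ESP-NOW hardware. The sender() and receiver() coroutines are stand-ins made up for this example. They only record events and sleep, instead of sending or receiving real messages.

```python
import asyncio

events = []  # records the order in which the tasks run

async def sender(count, period):
    # Stand-in for send_messages(): "send", then wait for the next period
    for i in range(count):
        events.append("send %d" % i)
        await asyncio.sleep(period)

async def receiver(count, period):
    # Stand-in for receive_messages(): wait for a "message", then handle it
    for i in range(count):
        await asyncio.sleep(period)
        events.append("recv %d" % i)

async def main():
    # Both coroutines run concurrently; each one yields while it sleeps
    await asyncio.gather(sender(2, 0.2), receiver(3, 0.05))

asyncio.run(main())
print(events)
```

While the sender is waiting between messages, the receiver keeps handling its own events. This is the same reason the board in this project can receive readings during the 10 seconds between transmissions.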
Uploading the Code to Your Boards
Important note: just running the file with Thonny doesn’t copy it permanently to the board’s filesystem. This means that if you unplug it from your computer and apply power to the board, nothing will happen because it doesn’t have any Python file saved on its filesystem. The Thonny IDE Run function is useful to test the code, but if you want to upload it permanently to your board, you need to create and save a file to the board’s filesystem.
To run the code on your boards without being connected to your computer, you must upload it to the board’s filesystem with the name main.py.
Go to File > Save as... and select MicroPython device.

Call that file main.py and save it on the board.

Demonstration
After uploading the code to both boards, you should see the OLED displaying the sensor readings from the other board, as well as a “Delivery Success” message.

Wrapping Up
We hope you’ve found this guide useful. The ESP-NOW wireless communication protocol is one of the easiest methods to communicate between ESP32 boards remotely without the need for a Wi-Fi router.
In this tutorial, you learned how to establish two-way communication between two boards. You can add more boards to your setup by adding more peers, creating a network of ESP32 boards that communicate with each other.
If you’re just getting started with ESP-NOW, we recommend starting with our ESP-NOW Getting Started Guide for the ESP32 Programmed with MicroPython.
Want to learn more about MicroPython with the ESP32? Check out our resources:
- Compilation of all our MicroPython Projects and Tutorials
- MicroPython Programming with ESP32 and ESP8266 eBook
Thanks for reading.