Hi,
I am trying to connect an Atom Lite to a LoRaWAN network using a LoRaWAN 915 module (wired from the module's UART port to the UART port of the Atom Lite), and I am not having much luck. Here is the code:
from machine import UART
import time
import ubinascii
# LoRaWAN credentials
# OTAA credentials sent to the modem by Modem.send_join() via AT+DEVEUI,
# AT+APPEUI and AT+APPKEY. The values below are masked with "X" placeholders
# and must be replaced with the real values before the join can succeed.
# NOTE(review): a LoRaWAN DevEUI/AppEUI is 8 bytes (16 hex digits) and an
# AppKey is 16 bytes (32 hex digits); the DEV_EUI placeholder here is only
# 10 characters long - confirm the real value has the full length.
DEV_EUI = "XXXXXXXXXX"
APP_EUI = "9F8E71DD3AFE41XX"
APP_KEY = "b4b5c65a175d5f441ee85686b1253XXX"
# UART configuration (G26, G32; change to G19, G22 if needed)
# UART bus 2 at 115200 baud, 8 data bits, no parity, 1 stop bit, with
# 256-byte TX and RX buffers. TX is on GPIO 26 and RX on GPIO 32.
# NOTE(review): the board's TX pin must be wired to the module's RX pin and
# vice versa; if every AT command returns an empty response, try swapping
# tx/rx here - confirm against the module's pinout.
uart = UART(2, baudrate=115200, tx=26, rx=32, bits=8, parity=None, stop=1, txbuf=256, rxbuf=256)
# State constants
# Values held in Modem.state and dispatched on by end_join() / end_send().
NOT_JOINED = "NOT_JOINED"  # initial state; also set when a join attempt fails
JOINING = "JOINING"  # set at the start of Modem.send_join()
JOINED = "JOINED"  # set when the join response reports success
SENDING = "SENDING"  # set at the start of Modem.send_message()
SENT = "SENT"  # set when the send response contains "OK"
RETRY = "RETRY"  # set when the send response does not contain "OK"
# Simple cooperative scheduler
class Tasks:
    """Minimal cooperative scheduler driven by repeated calls to ``run()``.

    Each queued task is a ``(func, trigger_time, condition)`` tuple:

    * ``func`` - zero-argument callable to execute.
    * ``trigger_time`` - a ``time.ticks_ms()`` timestamp before which the
      task must not run; ``0`` (or ``None``) means "no time constraint".
    * ``condition`` - zero-argument callable that must return true for the
      task to run; ``False`` (or ``None``) means "no condition".

    A task runs once, when both its time constraint and its condition are
    satisfied, and is removed from the queue before it is called.
    """

    def __init__(self):
        # Must be the real constructor: a method named plain ``init`` is
        # never called by ``Tasks()``, leaving ``self.tasks`` undefined.
        self.tasks = []

    def now(self, func):
        """Queue ``func`` to run on the next ``run()`` call."""
        self.tasks.append((func, 0, False))

    def after(self, delay_ms, func):
        """Queue ``func`` to run once ``delay_ms`` milliseconds have elapsed."""
        self.tasks.append((func, time.ticks_add(time.ticks_ms(), delay_ms), False))

    def when_then(self, condition_func, then_func):
        """Queue ``then_func`` to run once ``condition_func()`` returns true."""
        self.tasks.append((then_func, 0, condition_func))

    def only_one_of(self, task1, task2):
        """Queue two ``(func, trigger_time, condition)`` tasks as rivals.

        Whichever task becomes ready first runs; the other one is removed
        from the queue so it can never fire afterwards. Each task's own
        condition is evaluated independently, so a condition with side
        effects is not consumed on behalf of its rival.
        """
        group = []

        def exclusive(func):
            def run_and_cancel_rival():
                # Drop every still-queued member of the group (the winner
                # itself has already been removed by run()).
                for rival in group:
                    for index, queued in enumerate(self.tasks):
                        if queued is rival:
                            del self.tasks[index]
                            break
                func()

            return run_and_cancel_rival

        for func, trigger_time, condition in (task1, task2):
            group.append((exclusive(func), trigger_time, condition))
        self.tasks.extend(group)

    def available(self):
        """Return True while at least one task is still queued."""
        return len(self.tasks) > 0

    def run(self):
        """Run at most one ready task; sleep 10 ms if none is ready."""
        if not self.tasks:
            return
        for index, (func, trigger_time, condition) in enumerate(self.tasks):
            # Check the time constraint first so that a side-effecting
            # condition is not evaluated for a task that is not yet due.
            if trigger_time and time.ticks_diff(time.ticks_ms(), trigger_time) < 0:
                continue
            if condition and not condition():
                continue
            # Remove before calling: the task may queue follow-up tasks.
            del self.tasks[index]
            func()
            return
        # Nothing was ready; yield briefly instead of busy-spinning.
        time.sleep_ms(10)
# Modem state management
class Modem:
    """Blocking AT-command driver that tracks join/send state.

    ``state`` holds one of the module-level state constants and
    ``state_changed`` is a flag that is set on every transition and cleared
    by ``has_state_changed()``.

    NOTE(review): the AT command set used here (AT+BAND, AT+NJM, AT+JOIN,
    AT+PORT, AT+SEND, AT+CSEND) must match the firmware of the attached
    module - verify each command against the module's AT command manual.
    """

    # Substrings in the join response that are treated as success.
    # NOTE(review): "+EVT:JOINED" is accepted in addition to the original
    # "OK Join Success" because some firmwares report the join result as an
    # asynchronous event - confirm which form the attached module emits.
    JOIN_SUCCESS_MARKERS = ("OK Join Success", "+EVT:JOINED")

    def __init__(self, uart):
        # Must be the real constructor: with a method named plain ``init``,
        # ``Modem(uart)`` raises TypeError and no attribute is ever set.
        self.uart = uart
        self.state = NOT_JOINED
        self.state_changed = False

    def _set_state(self, state):
        """Record a state transition and flag it for has_state_changed()."""
        self.state = state
        self.state_changed = True

    def send_at_command(self, cmd, timeout=1000):
        """Send ``cmd`` and return everything received within ``timeout`` ms.

        The UART is polled for the whole window (rather than read once at
        the end) so a long response cannot overflow the receive buffer.
        Always waits the full ``timeout`` so late output is captured too.
        """
        print(f"Sending: {cmd}")
        self.uart.write(cmd + "\r\n")
        deadline = time.ticks_add(time.ticks_ms(), timeout)
        raw = b""
        while True:
            if self.uart.any():
                chunk = self.uart.read()
                if chunk:  # read() may return None when nothing arrived
                    raw += chunk
            if time.ticks_diff(time.ticks_ms(), deadline) >= 0:
                break
            time.sleep_ms(10)
        try:
            response = raw.decode()
        except UnicodeError:
            # Line noise can produce invalid UTF-8; fall back to a
            # byte-per-character mapping instead of crashing.
            response = "".join(chr(b) for b in raw)
        print(f"Response: {response}")
        return response

    def send_join(self):
        """Configure the modem for OTAA and attempt to join (blocking).

        Sets the state to JOINING, then to JOINED or NOT_JOINED depending
        on the join response, and returns that response.
        """
        self._set_state(JOINING)
        self.send_at_command("ATZ", 2000)
        self.send_at_command("AT+BAND=5", 1000)  # US915
        self.send_at_command(f"AT+DEVEUI={DEV_EUI}", 1000)
        self.send_at_command(f"AT+APPEUI={APP_EUI}", 1000)
        self.send_at_command(f"AT+APPKEY={APP_KEY}", 1000)
        self.send_at_command("AT+NJM=1", 1000)  # OTAA
        self.send_at_command("AT+CLASS=C", 1000)
        self.send_at_command("AT+MASK=0002", 1000)  # Channels 8-15
        response = self.send_at_command("AT+JOIN=1:0:10:8", 10000)
        joined = any(marker in response for marker in self.JOIN_SUCCESS_MARKERS)
        self._set_state(JOINED if joined else NOT_JOINED)
        return response

    def send_message(self, data, confirmed=False):
        """Send ``data`` (bytes) hex-encoded on port 2 (blocking).

        Uses AT+CSEND when ``confirmed`` is true, AT+SEND otherwise. Sets
        the state to SENDING, then to SENT if the response contains "OK"
        or RETRY if it does not, and returns the response.
        """
        self._set_state(SENDING)
        hex_data = ubinascii.hexlify(data).decode()
        self.send_at_command("AT+PORT=2", 1000)
        cmd = f"AT+CSEND={hex_data}" if confirmed else f"AT+SEND={hex_data}"
        response = self.send_at_command(cmd, 5000)
        self._set_state(SENT if "OK" in response else RETRY)
        return response

    def has_state_changed(self):
        """Return the changed flag and clear it (read-once semantics)."""
        changed = self.state_changed
        self.state_changed = False
        return changed

    def get_state(self):
        """Return the current state constant."""
        return self.state
# Initialize modem and tasks
# Module-level singletons shared by the scheduler callbacks below.
modem = Modem(uart)  # AT-command driver bound to the configured UART
tasks = Tasks()  # scheduler queue drained by the main loop
count = 0  # message counter; incremented by start_send() and sent as payload
# Modem state check
def modem_state_changed():
    """Scheduler condition: has the modem's state changed since last poll?

    Delegates to ``modem.has_state_changed()``. Any exception is printed
    and reported as "no change".
    """
    try:
        changed = modem.has_state_changed()
    except Exception as err:
        print(f"Modem state error: {err}")
        changed = False
    return changed
# Join network
def start_join():
    """Run a join attempt and queue ``end_join`` for the next state change.

    If the attempt raises, the error is printed and another attempt is
    scheduled 60 seconds later.
    """
    retry_delay_ms = 60000
    print("Joining TTN...")
    try:
        modem.send_join()
        tasks.when_then(modem_state_changed, end_join)
    except Exception as err:
        print(f"Join error: {err}")
        tasks.after(retry_delay_ms, start_join)
def end_join():
    """React to the modem state after a join attempt.

    JOINED schedules the first send in 10 s; NOT_JOINED schedules a new
    join in 60 s; JOINING waits for the next state change. Any other state,
    or any exception, is printed and a new join is scheduled in 60 s.
    """
    rejoin_delay_ms = 60000
    try:
        state = modem.get_state()
        if state == JOINED:
            print("Joined TTN!")
            tasks.after(10000, start_send)
            return
        if state == NOT_JOINED:
            print("Join failed.")
            tasks.after(rejoin_delay_ms, start_join)
            return
        if state == JOINING:
            # Still in progress: re-arm for the next transition.
            tasks.when_then(modem_state_changed, end_join)
            return
        raise NotImplementedError(f"Unknown state: {state}")
    except Exception as err:
        print(f"End join error: {err}")
        tasks.after(rejoin_delay_ms, start_join)
# Send data
def start_send():
    """Send the next counter value and queue ``end_send``.

    The counter is incremented first and sent as its ASCII decimal text.
    The message is sent as confirmed when ``count % 29 == 1``. On an
    exception the error is printed and the next send is scheduled in
    300 seconds.
    """
    global count
    count += 1
    print(f"Sending data: {count}")
    try:
        payload = bytes(str(count), 'ASCII')
        want_confirmation = count % 29 == 1
        modem.send_message(payload, want_confirmation)
        tasks.when_then(modem_state_changed, end_send)
    except Exception as err:
        print(f"Send error: {err}")
        tasks.after(300000, start_send)
def end_send():
    """React to the modem state after a send attempt.

    SENT and RETRY both schedule the next send in 300 s; NOT_JOINED
    schedules a join in 300 s; SENDING queues a rival pair: ``end_send`` on
    the next state change versus ``assume_sent`` with a 60 s trigger time.
    Any other state, or any exception, is printed and the next send is
    scheduled in 300 s.
    """
    next_cycle_ms = 300000
    try:
        state = modem.get_state()
        if state == SENT:
            print("Data sent!")
            tasks.after(next_cycle_ms, start_send)
            return
        if state == RETRY:
            print("Send failed.")
            tasks.after(next_cycle_ms, start_send)
            return
        if state == NOT_JOINED:
            print("Not joined.")
            tasks.after(next_cycle_ms, start_join)
            return
        if state == SENDING:
            fallback_at = time.ticks_add(time.ticks_ms(), 60000)
            tasks.only_one_of(
                (end_send, 0, modem_state_changed),
                (assume_sent, fallback_at, lambda: True)
            )
            return
        raise NotImplementedError(f"Unknown state: {state}")
    except Exception as err:
        print(f"End send error: {err}")
        tasks.after(next_cycle_ms, start_send)
def assume_sent():
    """Fallback for a send with no reported result: log and schedule the next send in 240 s."""
    next_send_delay_ms = 240000
    print("Assumed data sent.")
    tasks.after(next_send_delay_ms, start_send)
# Start the process
# Queue the first join attempt and drain the scheduler until it is empty.
# Any exception that escapes a task is printed, then the board is reset
# after a 10 second pause.
try:
    tasks.now(start_join)
    while tasks.available():
        tasks.run()
except Exception as e:
    print(f"Main loop error: {e}")
    time.sleep(10)
    # Only `UART` is imported from `machine` at the top of the file, so the
    # bare name `machine` would raise NameError here; import it locally.
    import machine
    machine.reset()