Топик, Издатель (Python)
Что вы узнаете в этом разделе:
- Как создать программу-издатель на Python
- Структуру базовой ROS2 программы на Python
- Как читать данные из системных файлов (температура CPU)
- Как публиковать данные в топик через Python
- Как использовать таймеры для периодической публикации
- Как проверить работу программы через консольные утилиты
Программируем на Python
Для быстрого старта мы начнем писать программы, без создания специального ROS-пакета, создание пакетов и "упаковывание" в них наши программ, мы рассмотрим позже.
Мы будем разрабатывать программы на языке
Python. Это один из основных языков разработки фреймворка ROS, который позволяет довольно легко создавать собственные программы для ROS.
Ранее мы обсудили некоторую условность, что все данные передаются через сообщения, в специальные топики. Давайте создадим программу Издатель, которая будет получать данные о температуры нашего процессора и передавать их в топик. Пускай это будет топик temp
Перед началом работы создайте на роботе папку ros2-base, в которой будете сохранять поурочно программы, которые будете изучать.
Создадим программу, которая публикует данные о температуре CPU. За основу возьмем пример официальной документации
#! /usr/bin/env python3
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32
class MinimalPublisher(Node):
def __init__(self):
super().__init__('minimal_publisher')
self.publisher = self.create_publisher(Float32, 'temp', 10)
timer_period = 0.5 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
def timer_callback(self):
msg = Float32()
msg.data = self.getCPUTemp()
self.publisher.publish(msg)
self.get_logger().info(f'Publishing CPU temp: {msg.data}')
def getCPUTemp(self):
data = open('/sys/class/thermal/thermal_zone0/temp', 'r').read()
return round(float(int(data)/1000.0),1)
def main(args=None):
rclpy.init(args=args)
minimal_publisher = MinimalPublisher()
try:
rclpy.spin(minimal_publisher)
except KeyboardInterrupt:
pass
if __name__ == '__main__':
main()
Пока не особо вдаваясь в подробности как именно работает программа, скопируем ее код в Visual Studio Code и сохраним его на вашем роботе (именно на нем мы хотим считать температуру) в файл/home/pi/ros2-base/chapter2/temp_topic_publisher.py
Запустим .py файл через ssh
ssh pi@turtlebro01.local
cd ros2-base/chapter2
python3 temp_topic_publisher.py
===
python3 publisher.py
[INFO] [1756736307.190868093] [minimal_publisher]: Publishing CPU temp: "52.600000"
[INFO] [1756736307.608446372] [minimal_publisher]: Publishing CPU temp: "52.100000"
[INFO] [1756736308.108523745] [minimal_publisher]: Publishing CPU temp: "52.100000"
Мы видим в логах программы, что происходит публикация данных о температуре в топик.
Не закрывая программу, запустим новое подключение по ssh к роботу и удостоверимся, что данные действительно публикуются.
Первым делом выполним команду ros2 topic list
ros2 topic list
===
/backlight/all
/backlight/array
/bat
-----
/rosout
/scan
/temp
/tf
/tf_static
Мы видим топик temp. Уже хорошо. Посмотрим что в нем
ros2 topic echo /temp
====
data: 52.599998474121094
---
data: 51.599998474121094
Мы видим данные, которые публикует программы, и видим изменение температуры в каждом сообщении. Значит у нас получилось правильно запустить и создать программу Издатель.
tip
Для завершения работы программы, нажмите Ctrl+C.
Разбор программы Издателя
Получение данных температуры
Значение температуры Raspberry Pi "публикует" в "специальный" файл:/sys/class/thermal/thermal_zone0/temp Для того, чтобы получить это значение мы должны прочитать его из этого файла стандартными средствами Python.
data = open('/sys/class/thermal/thermal_zone0/temp', 'r').read()
Значение температуры в данном файле записано в миллиградусах, поэтому, чтобы получить более привычные нам градусы Цельсия, разделим это значение на 1000.
Поэтому, чтобы получить температуру, нам необходимо прочитать файл и перевести единицы измерения. Данные код реализован в функции getCPUTemp()
Разбор программы Издателя
Сначала, рассмотрим "скелет" программы. Это то, что нам понадобится в любой программе на ROS.
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32
class MinimalPublisher(Node):
#ОСНОВНОЙ КОД
def main(args=None):
rclpy.init()
minimal_publisher = MinimalPublisher()
try:
rclpy.spin(minimal_publisher)
except KeyboardInterrupt:
pass
if __name__ == '__main__':
main()
Это относительно "стандартная" обертка для инициализации ROS Ноды.
#Подключение стандартных библиотек, и типов данных Float32
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32
Создание класса нашей ноды, через наследование от rclpy.node
class MinimalPublisher(Node):
Создание ROS ноды и ожидание прерывания выполнения от пользователя, выполняет код:
#Инициализация библиотеки rclpy
rclpy.init()
#создание объекта "рабочей" ноды
minimal_publisher = MinimalPublisher()
#блок ожидания прерывания от пользователя (для контролируемого выхода из программы) и передача управления рабочего цикла ноды.
try:
rclpy.spin(minimal_publisher)
except KeyboardInterrupt:
pass
Соберем этот код в функции main(), и запустим функцию main() при запуске python скрипта.
Такой способ запуска созданных классов, рекомендуется стандартами python, и будет нам необходим, когда мы будем создавать пакеты ROS.
if __name__ == '__main__':
main()
Основная "смысловая" работа с ROS происходит в блоке двух методов. Методе инициализации при создании класса -- метод __init__(self).
def __init__(self):
#инициализация ноды, через инициализацию родительского класса.
super().__init__('minimal_publisher')
#создание объекта publisher, которые будет публиковать данные.
# в парамерах мы указываем тип сообщения, имя топика и размер очереди
self.publisher = self.create_publisher(Float32, 'temp', 10)
#создаем таймер, который с определенной частотой
#вызывает функцию, которая получает данные температуры
#и публикует данные в топик timer_callback.
timer_period = 0.5 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
Подробная документация по методам create_publisher и create_timer из официальной API документации.
И функции, которая непосредственно публикует данные timer_callback(self)
def timer_callback(self):
#создание пустого сообщения
msg = Float32()
#чтение данных о температуры CPU
msg.data = self.getCPUTemp()
#публикация данных в топик
self.publisher.publish(msg)
# Вывод данных через систему логов
self.get_logger().info(f'Publishing CPU temp: {msg.data}')
Финальный пример программы, доступен в разделе примеры на GitHubturtlebro2-examples
Официальную инструкцию к пакету rclpy, можно посмотреть на сайте https://docs.ros.org/en/rolling/p/rclpy/.
Примеры использования библиотек https://github.com/ros2/examples/tree/humble