Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Топик, Издатель (Python)

Что вы узнаете в этом разделе:

  • Как создать программу-издатель на Python
  • Структуру базовой ROS2 программы на Python
  • Как читать данные из системных файлов (температура CPU)
  • Как публиковать данные в топик через Python
  • Как использовать таймеры для периодической публикации
  • Как проверить работу программы через консольные утилиты

Программируем на Python

Для быстрого старта мы начнем писать программы, без создания специального ROS-пакета, создание пакетов и "упаковывание" в них наши программ, мы рассмотрим позже.

Мы будем разрабатывать программы на языке Python. Это один из основных языков разработки фреймворка ROS, который позволяет довольно легко создавать собственные программы для ROS.

Ранее мы обсудили некоторую условность, что все данные передаются через сообщения, в специальные топики. Давайте создадим программу Издатель, которая будет получать данные о температуры нашего процессора и передавать их в топик. Пускай это будет топик temp

Перед началом работы создайте на роботе папку ros2-base, в которой будете сохранять поурочно программы, которые будете изучать.

Создадим программу, которая публикует данные о температуре CPU. За основу возьмем пример официальной документации

#! /usr/bin/env python3

import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32

class MinimalPublisher(Node):

    def __init__(self):
        super().__init__('minimal_publisher')
        self.publisher = self.create_publisher(Float32, 'temp', 10)
        timer_period = 0.5  # seconds
        self.timer = self.create_timer(timer_period, self.timer_callback)

    def timer_callback(self):
        msg = Float32()
        msg.data = self.getCPUTemp()
        self.publisher.publish(msg)
        self.get_logger().info(f'Publishing CPU temp: {msg.data}')

    def getCPUTemp(self):
        data = open('/sys/class/thermal/thermal_zone0/temp', 'r').read()
        return round(float(int(data)/1000.0),1)        

def main(args=None):
    
    rclpy.init(args=args)
    minimal_publisher = MinimalPublisher()

    try:
        rclpy.spin(minimal_publisher)
    except KeyboardInterrupt:
        pass   

if __name__ == '__main__':
    main()

Пока не особо вдаваясь в подробности как именно работает программа, скопируем ее код в Visual Studio Code и сохраним его на вашем роботе (именно на нем мы хотим считать температуру) в файл/home/pi/ros2-base/chapter2/temp_topic_publisher.py

Запустим .py файл через ssh

ssh pi@turtlebro01.local

cd ros2-base/chapter2
python3 temp_topic_publisher.py
===
python3 publisher.py
[INFO] [1756736307.190868093] [minimal_publisher]: Publishing CPU temp: "52.600000"
[INFO] [1756736307.608446372] [minimal_publisher]: Publishing CPU temp: "52.100000"
[INFO] [1756736308.108523745] [minimal_publisher]: Publishing CPU temp: "52.100000"

Мы видим в логах программы, что происходит публикация данных о температуре в топик.

Не закрывая программу, запустим новое подключение по ssh к роботу и удостоверимся, что данные действительно публикуются.

Первым делом выполним команду ros2 topic list

ros2 topic list
===

/backlight/all
/backlight/array
/bat
-----
/rosout
/scan
/temp
/tf
/tf_static

Мы видим топик temp. Уже хорошо. Посмотрим что в нем

ros2 topic echo /temp
====
data: 52.599998474121094
---
data: 51.599998474121094

Мы видим данные, которые публикует программы, и видим изменение температуры в каждом сообщении. Значит у нас получилось правильно запустить и создать программу Издатель.

tip

Для завершения работы программы, нажмите Ctrl+C.

Разбор программы Издателя

Получение данных температуры

Значение температуры Raspberry Pi "публикует" в "специальный" файл:/sys/class/thermal/thermal_zone0/temp Для того, чтобы получить это значение мы должны прочитать его из этого файла стандартными средствами Python.

data = open('/sys/class/thermal/thermal_zone0/temp', 'r').read()

Значение температуры в данном файле записано в миллиградусах, поэтому, чтобы получить более привычные нам градусы Цельсия, разделим это значение на 1000.

Поэтому, чтобы получить температуру, нам необходимо прочитать файл и перевести единицы измерения. Данные код реализован в функции getCPUTemp()

Разбор программы Издателя

Сначала, рассмотрим "скелет" программы. Это то, что нам понадобится в любой программе на ROS.

import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32

class MinimalPublisher(Node):
#ОСНОВНОЙ КОД
    
def main(args=None):
    rclpy.init()
    minimal_publisher = MinimalPublisher()

    try:
        rclpy.spin(minimal_publisher)
    except KeyboardInterrupt:
        pass   

if __name__ == '__main__':
    main() 

Это относительно "стандартная" обертка для инициализации ROS Ноды.

#Подключение стандартных библиотек, и типов данных Float32 
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32

Создание класса нашей ноды, через наследование от rclpy.node

class MinimalPublisher(Node):

Создание ROS ноды и ожидание прерывания выполнения от пользователя, выполняет код:

#Инициализация библиотеки rclpy
rclpy.init()

#создание объекта "рабочей" ноды
minimal_publisher = MinimalPublisher()

#блок ожидания прерывания от пользователя (для контролируемого выхода из программы) и передача управления рабочего цикла ноды.
try:
    rclpy.spin(minimal_publisher)
except KeyboardInterrupt:
    pass   

Соберем этот код в функции main(), и запустим функцию main() при запуске python скрипта.

Такой способ запуска созданных классов, рекомендуется стандартами python, и будет нам необходим, когда мы будем создавать пакеты ROS.

if __name__ == '__main__':
    main() 

Основная "смысловая" работа с ROS происходит в блоке двух методов. Методе инициализации при создании класса -- метод __init__(self).

    def __init__(self):
        #инициализация ноды, через инициализацию родительского класса.
        super().__init__('minimal_publisher')
        #создание объекта publisher, которые будет публиковать данные.
        # в парамерах мы указываем тип сообщения, имя топика и размер очереди 
        self.publisher = self.create_publisher(Float32, 'temp', 10)

        #создаем таймер, который с определенной частотой 
        #вызывает функцию, которая получает данные температуры
        #и публикует данные в топик timer_callback.
        timer_period = 0.5  # seconds
        self.timer = self.create_timer(timer_period, self.timer_callback)

Подробная документация по методам create_publisher и create_timer из официальной API документации.

И функции, которая непосредственно публикует данные timer_callback(self)

    def timer_callback(self):
        #создание пустого сообщения
        msg = Float32()
        #чтение данных о температуры CPU
        msg.data = self.getCPUTemp()
        #публикация данных в топик
        self.publisher.publish(msg)
        # Вывод данных через систему логов 
        self.get_logger().info(f'Publishing CPU temp: {msg.data}')

Финальный пример программы, доступен в разделе примеры на GitHubturtlebro2-examples

Официальную инструкцию к пакету rclpy, можно посмотреть на сайте https://docs.ros.org/en/rolling/p/rclpy/.

Примеры использования библиотек https://github.com/ros2/examples/tree/humble