Zum Inhalt springen

2nd Test - ROS2 Publisher/Subscriber

Dieser Inhalt ist noch nicht in deiner Sprache verfügbar.

ROS2 Publisher/Subscriber Documentation

🎯 Objectifs  •  📝 Résumé  •  🧰 Matériels  •  🏗️ Structure  •  ⚙️ Installation  •  💻 Implémentation  •  🎬 Espace Démo  •  🐞 Journal des erreurs  •  🔭 Perspectives  •  📚 Ressources


🧩 ComposantStatut
Publisher NodePublisher - Implémenté
Subscriber NodeSubscriber - Implémenté
GUI SubscriberGUI - Implémenté
Launch FileLaunch - Implémenté

:construction: Test 2 - Tekbot Robotics Challenges


L’objectif de ce test est de démontrer la capacité à concevoir un système robotique simple mais fonctionnel avec ROS2.
Livrables attendus :

  • Un package ROS2 compilable sans erreur.
  • Un nœud Publisher qui génère et publie des données de capteurs simulés à une fréquence de 0.5 seconde.
  • Un nœud Subscriber qui reçoit ces données et les valide selon des plages spécifiques.
  • Un fichier de lancement pour démarrer l’ensemble du système avec une seule commande.

Ce projet met en œuvre un système de communication ROS2 Publisher/Subscriber :

  • Le nœud publisher_node simule des données de température, d’humidité et de pression, et les publie sur le topic /sensor_data.
  • Le nœud subscriber_node écoute ce même topic, analyse les données reçues et affiche un message de statut dans le terminal, indiquant si les valeurs sont conformes ou non aux seuils définis.

Tous les fichiers code et résultats obtenus sont téléchargeables via ce lien : Test 2


  • Système d’exploitation : Ubuntu 22.04 (ou via Distrobox)
  • Framework : ROS 2 Humble Hawksbill
  • Langage de programmation : Python 3
  • Outil de build : colcon
  • Éditeur de code : Visual Studio Code

ROS2_WS/
├── src/
│ └── sensor_data_evaluation/
│ ├── launch/
│ │ └── sensor_launch.py
│ ├── resource/
│ └── sensor_data_evaluation/
│ ├── __init__.py
│ ├── gui_subscriber.py
│ ├── sensor_publisher.py
│ └── sensor_subscriber.py
├── test/
├── package.xml
├── setup.cfg
├── setup.py
└── .env

🐧 1. Installer ROS2 Humble sur Ubuntu 22.04

Section titled “🐧 1. Installer ROS2 Humble sur Ubuntu 22.04”

Installation de ROS2 — Guide officiel

Terminal window
sudo apt update && sudo apt upgrade
sudo locale-gen en_US en_US.UTF-8
sudo apt install software-properties-common
sudo add-apt-repository universe
sudo apt update && sudo apt install curl -y
sudo curl -sSL https://raw.githubusercontent.com/ros/rosdistro/master/ros.key -o /usr/share/keyrings/ros-archive-keyring.gpg
echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/ros-archive-keyring.gpg] http://packages.ros.org/ros2/ubuntu $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/ros2.list > /dev/null
sudo apt update
sudo apt install ros-humble-desktop-full python3-colcon-common-extensions

Avant de lancer ROS2, il faut exécuter dans le terminal :

Terminal window
source /opt/ros/humble/setup.bash

Maintenant lancer la commande:

Terminal window
ros2
![](/img/ros2.png)

Figure 1 : Installation de ROS2 terminée avec succès


📦 2. Alternative : Installer Distrobox + ROS2 (toutes distributions)

Section titled “📦 2. Alternative : Installer Distrobox + ROS2 (toutes distributions)”

Distrobox Documentation

Terminal window
# Sur la plupart des distributions Linux
sudo apt install distrobox podman
# ou
sudo dnf install distrobox podman
# ou
sudo pacman -S distrobox podman
Terminal window
distrobox-create --name ros2box --image ubuntu:22.04
distrobox-enter ros2box

Répétez les commandes d’installation ROS2 ci-dessus dans le shell du conteneur.


Créer un workspace — ROS2 Tutorials

Terminal window
mkdir -p ~/Ros_ws/src
cd ~/Ros_ws
colcon build
![](/img/colcon-build.png)

Figure 2 : Compilation de l'espace de travail


Terminal window
cd ~/Ros_ws/src
git clone <URL_DU_DEPOT>
cd ~/Ros_ws
colcon build --packages-select sensor_data_evaluation
source ~/Ros_ws/install/setup.bash

Lancer un fichier launch — ROS2 Tutorials

Terminal window
ros2 launch sensor_data_evaluation sensor_launch.py

Vous verrez alors les messages Publisher et Subscriber dans le terminal.

![](/img/launch.png)

Figure 3 : Affichage du terminal

![](/img/gui-interface.png)

Figure 4 : Affichage de l'interface graphique


Le nœud Publisher simule trois capteurs (température, humidité, pression) pour la température (15-35°C), l’humidité (30-70%) et la pression (950-1050 hPa), puis publie ces données toutes les 0.5 secondes sur des topics /sensor_data séparés. Il publie aussi un message groupé pour l’interface graphique. Il permet de tester la chaîne complète sans matériel réel.

Créer un Publisher — ROS2 Python Tutorial

▶️ Voir le code complet
sensor_data_evaluation/sensor_data_evaluation/sensor_publisher.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32, String
import random
class SensorPublisher(Node):
def __init__(self):
super().__init__('sensor_publisher')
# Publishers pour chaque capteur
self.temp_pub = self.create_publisher(Float32, 'sensor_data/temperature', 10)
self.hum_pub = self.create_publisher(Float32, 'sensor_data/humidity', 10)
self.pres_pub = self.create_publisher(Float32, 'sensor_data/pressure', 10)
# Publisher pour le GUI (message groupé)
self.gui_pub = self.create_publisher(String, 'sensor_topic', 10)
# Timer pour publier régulièrement
self.timer = self.create_timer(0.5, self.publish_data)
def publish_data(self):
temp = random.uniform(10.0, 40.0)
hum = random.uniform(20.0, 80.0)
pres = random.uniform(900.0, 1100.0)
# Publication séparée
self.temp_pub.publish(Float32(data=temp))
self.hum_pub.publish(Float32(data=hum))
self.pres_pub.publish(Float32(data=pres))
# Publication groupée pour GUI
gui_msg = f"Temp:{temp:.2f},Hum:{hum:.2f},Pres:{pres:.2f}"
self.gui_pub.publish(String(data=gui_msg))
self.get_logger().info(f'Published - Temp: {temp:.2f} | Hum: {hum:.2f} | Pres: {pres:.2f}')
def main(args=None):
rclpy.init(args=args)
node = SensorPublisher()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

Ce nœud ROS2 reçoit les valeurs des capteurs et vérifie si elles sont dans les plages acceptables. Il affiche un message d’erreur si une valeur sort de la plage : ‘hors plage’ et un ‘ok’ quand ça respecte la plage. Cela permet de surveiller la validité des données.

Créer un Subscriber — ROS2 Python Tutorial

▶️ Voir le code complet
sensor_data_evaluation/sensor_data_evaluation/sensor_subscriber.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32
class SensorSubscriber(Node):
def __init__(self):
super().__init__('sensor_subscriber')
self.create_subscription(Float32, 'sensor_data/temperature', self.temp_callback, 10)
self.create_subscription(Float32, 'sensor_data/humidity', self.hum_callback, 10)
self.create_subscription(Float32, 'sensor_data/pressure', self.pres_callback, 10)
def temp_callback(self, msg):
value = msg.data
if 15 <= value <= 35:
self.get_logger().info(f'Temp OK : {value:.2f} °C')
else:
self.get_logger().error(f'Temp hors plage : {value:.2f} °C')
def hum_callback(self, msg):
value = msg.data
if 30 <= value <= 70:
self.get_logger().info(f'Humidité OK : {value:.2f} %')
else:
self.get_logger().error(f'Humidité hors plage : {value:.2f} %')
def pres_callback(self, msg):
value = msg.data
if 950 <= value <= 1050:
self.get_logger().info(f'Pression OK : {value:.2f} hPa')
else:
self.get_logger().error(f'Pression hors plage : {value:.2f} hPa')
def main(args=None):
rclpy.init(args=args)
node = SensorSubscriber()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

Ce nœud ROS2 reçoit les données des capteurs (température, humidité, pression) et les affiche en temps réel sous forme de courbes et de statistiques grâce à Matplotlib. Il permet de visualiser l’évolution des mesures, de vérifier leur conformité aux plages attendues, et d’obtenir un bilan statistique (pourcentage de valeurs correctes/incorrectes).

Matplotlib Animation — Documentation

▶️ Voir le code complet
sensor_data_evaluation/sensor_data_evaluation/gui_subscriber.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32
import threading
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
class SensorSubscriber(Node):
def __init__(self):
super().__init__('gui_subscriber')
self.temps = []
self.temp_values = []
self.hum_values = []
self.pres_values = []
self.temp_range = (15, 35)
self.hum_range = (30, 70)
self.pres_range = (950, 1050)
self.counter = 0
self.lock = threading.Lock()
self.create_subscription(Float32, 'sensor_data/temperature', self.temp_callback, 10)
self.create_subscription(Float32, 'sensor_data/humidity', self.hum_callback, 10)
self.create_subscription(Float32, 'sensor_data/pressure', self.pres_callback, 10)
self.last_temp = None
self.last_hum = None
self.last_pres = None
def temp_callback(self, msg):
with self.lock:
self.last_temp = msg.data
self.update_data_if_ready()
def hum_callback(self, msg):
with self.lock:
self.last_hum = msg.data
self.update_data_if_ready()
def pres_callback(self, msg):
with self.lock:
self.last_pres = msg.data
self.update_data_if_ready()
def update_data_if_ready(self):
if self.last_temp is not None and self.last_hum is not None and self.last_pres is not None:
self.counter += 1
self.temps.append(self.counter)
self.temp_values.append(self.last_temp)
self.hum_values.append(self.last_hum)
self.pres_values.append(self.last_pres)
self.last_temp = self.last_hum = self.last_pres = None
def get_data(self):
with self.lock:
return (list(self.temps), list(self.temp_values), list(self.hum_values), list(self.pres_values))
def ros_spin(node):
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
def main(args=None):
rclpy.init(args=args)
node = SensorSubscriber()
ros_thread = threading.Thread(target=ros_spin, args=(node,), daemon=True)
ros_thread.start()
# Crée 4 sous-graphes (3 pour courbes, 1 pour tableau)
fig, axs = plt.subplots(4, 1, figsize=(9, 12))
ax1, ax2, ax3, table_ax = axs
plt.subplots_adjust(hspace=0.6, bottom=0.2)
def draw_band(ax, low, high, color='green', alpha=0.2):
ax.axhspan(low, high, facecolor=color, alpha=alpha)
def compute_stats(values, value_range):
total = len(values)
if total == 0:
return (0, 0)
good = sum(value_range[0] <= v <= value_range[1] for v in values)
bad = total - good
return (good / total * 100, bad / total * 100)
def update(frame):
temps, temp_vals, hum_vals, pres_vals = node.get_data()
# Clear all 4 axes
for ax in axs:
ax.clear()
if temps:
# Température
ymin, ymax = node.temp_range[0] - 5, node.temp_range[1] + 5
ax1.set_ylim(ymin, ymax)
draw_band(ax1, *node.temp_range, color='green')
ax1.plot(temps, temp_vals, label='Température', color='blue')
ax1.set_ylabel('°C')
ax1.set_title('Température')
ax1.legend()
color_temp = 'green' if node.temp_range[0] <= temp_vals[-1] <= node.temp_range[1] else 'red'
ax1.text(0.5, -0.35, f"Valeur actuelle : {temp_vals[-1]:.2f} °C", color=color_temp,
transform=ax1.transAxes, ha='center', fontsize=10)
# Humidité
ymin, ymax = node.hum_range[0] - 10, node.hum_range[1] + 10
ax2.set_ylim(ymin, ymax)
draw_band(ax2, *node.hum_range, color='green')
ax2.plot(temps, hum_vals, label='Humidité', color='orange')
ax2.set_ylabel('%')
ax2.set_title('Humidité')
ax2.legend()
color_hum = 'green' if node.hum_range[0] <= hum_vals[-1] <= node.hum_range[1] else 'red'
ax2.text(0.5, -0.35, f"Valeur actuelle : {hum_vals[-1]:.2f} %", color=color_hum,
transform=ax2.transAxes, ha='center', fontsize=10)
# Pression
ymin, ymax = node.pres_range[0] - 20, node.pres_range[1] + 20
ax3.set_ylim(ymin, ymax)
draw_band(ax3, *node.pres_range, color='green')
ax3.plot(temps, pres_vals, label='Pression', color='purple')
ax3.set_ylabel('hPa')
ax3.set_title('Pression')
ax3.legend()
color_pres = 'green' if node.pres_range[0] <= pres_vals[-1] <= node.pres_range[1] else 'red'
ax3.text(0.5, -0.35, f"Valeur actuelle : {pres_vals[-1]:.2f} hPa", color=color_pres,
transform=ax3.transAxes, ha='center', fontsize=10)
# Tableau de statistiques
temp_good, temp_bad = compute_stats(temp_vals, node.temp_range)
hum_good, hum_bad = compute_stats(hum_vals, node.hum_range)
pres_good, pres_bad = compute_stats(pres_vals, node.pres_range)
table_data = [
["Température", len(temp_vals), f"{temp_good:.1f}%", f"{temp_bad:.1f}%"],
["Humidité", len(hum_vals), f"{hum_good:.1f}%", f"{hum_bad:.1f}%"],
["Pression", len(pres_vals), f"{pres_good:.1f}%", f"{pres_bad:.1f}%"]
]
col_labels = ["Mesure", "Total", "Bonnes", "Mauvaises"]
table_ax.axis("off")
table = table_ax.table(cellText=table_data, colLabels=col_labels, loc='center', cellLoc='center')
for i in range(1, 4):
table[(i, 2)].set_facecolor("#d0f0c0") # vert clair
table[(i, 3)].set_facecolor("#f8d0d0") # rouge clair
table.scale(1, 1.5)
table.set_fontsize(10)
table_ax.set_title("Bilan Statistique des Mesures", fontsize=12, fontweight="bold", pad=10)
else:
for ax in axs:
ax.text(0.5, 0.5, "Aucune donnée reçue", ha='center', va='center', transform=ax.transAxes)
ani = FuncAnimation(fig, update, interval=1000)
plt.show()
ros_thread.join()
if __name__ == '__main__':
main()

Ce fichier permet de lancer automatiquement les trois nœuds (publisher, subscriber, interface graphique) avec une seule commande. Il simplifie le déploiement de l’application.

Créer un fichier launch — ROS2 Launch Tutorial

▶️ Voir le code complet
sensor_data_evaluation/launch/sensor_launch.py
from launch import LaunchDescription
from launch_ros.actions import Node
def generate_launch_description():
return LaunchDescription([
Node(
package='sensor_data_evaluation',
executable='sensor_publisher',
name='sensor_publisher'
),
Node(
package='sensor_data_evaluation',
executable='sensor_subscriber',
name='sensor_subscriber'
),
Node(
package='sensor_data_evaluation',
executable='gui_subscriber',
name='gui_subscriber'
)
])

📹 Vidéo de Démonstration du Système ROS2

Section titled “📹 Vidéo de Démonstration du Système ROS2”
🖥️ Terminal📈 Interface Graphique✅ Validation
Messages Publisher avec valeurs des capteursCourbes temps réel (Température, Humidité, Pression)Vérification des seuils
Messages Subscriber avec statut OK/Hors plageBandes de validation coloréesMessages d’alerte hors plage
Logs ROS2 en temps réelTableau statistiques (% bonnes/mauvaises valeurs)Indicateurs visuels colorés

La communication multimachine via DDS (Data Distribution Service) permet à plusieurs machines de partager des données en temps réel de manière fiable et efficace, sans dépendre d’un serveur central. DDS facilite l’échange d’informations entre applications distribuées grâce à un modèle publish/subscribe.



DateErreur rencontréeCauseSolution apportée
15/06/2025colcon build échoue, le package n’est pas trouvé.Oubli de déclarer les dépendances (rclpy) dans le package.xml.Ajout de <exec_depend>rclpy</exec_depend> dans package.xml.
15/06/2025Le subscriber ne reçoit aucun message.Nom du topic différent entre publisher et subscriber.Correction du nom du topic pour qu’il corresponde exactement à /sensor_data.
16/06/2025Les nœuds s’arrêtent juste après le lancement.Oubli de rclpy.spin() dans le main.Ajout de rclpy.spin(node_name) à la fin de chaque nœud.

  • Utiliser des messages personnalisés (SensorData.msg) pour structurer les données.
    Créer un message personnalisé — ROS2
  • Ajouter un nœud de logging pour enregistrer les données dans un fichier .csv.
  • Visualiser les données en temps réel avec rqt_plot ou équivalent.
    Visualiser avec rqt_plot — ROS2
  • Implémenter la reconfiguration dynamique des seuils de validation.


  • Chatigre Larissa
  • Ife Leonce Comlan
  • Agbodja Marzoukath

Test : Test 2 - Tekbot Robotics Challenges
Langage : Python 3 / ROS2
Framework : ROS2 Humble
Date : Juin 2025


© 2025 Test 2 - Tekbot Robotics Challenges. Projet développé dans le cadre de l'évaluation ROS2.