Skip to main content

2nd Test

ROS2 Publisher/Subscriber Documentation

🎯 Objectifs  •  📝 Résumé  •  🧰 Matériels  •  🏗️ Structure  •  ⚙️ Installation  •  💻 Implémentation  •  🎬 Espace Démo  •  🐞 Journal des erreurs  •  🔭 Perspectives  •  📚 Ressources


🧩 ComposantStatut
Publisher NodePublisher - Implémenté
Subscriber NodeSubscriber - Implémenté
GUI SubscriberGUI - Implémenté
Launch FileLaunch - Implémenté

🚧 Test 2 - Tekbot Robotics Challenges


🎯 Objectifs

L'objectif de ce test est de démontrer la capacité à concevoir un système robotique simple mais fonctionnel avec ROS2.
Livrables attendus :

  • Un package ROS2 compilable sans erreur.
  • Un nœud Publisher qui génère et publie des données de capteurs simulés à une fréquence de 0.5 seconde.
  • Un nœud Subscriber qui reçoit ces données et les valide selon des plages spécifiques.
  • Un fichier de lancement pour démarrer l'ensemble du système avec une seule commande.

📝 Résumé

Ce projet met en œuvre un système de communication ROS2 Publisher/Subscriber :

  • Le nœud publisher_node simule des données de température, d'humidité et de pression, et les publie sur le topic /sensor_data.
  • Le nœud subscriber_node écoute ce même topic, analyse les données reçues et affiche un message de statut dans le terminal, indiquant si les valeurs sont conformes ou non aux seuils définis.

Fichiers téléchargeables

Tous les fichiers code et résultats obtenus sont téléchargeables via ce lien : Test 2


🧰 Matériels

  • Système d'exploitation : Ubuntu 22.04 (ou via Distrobox)
  • Framework : ROS 2 Humble Hawksbill
  • Langage de programmation : Python 3
  • Outil de build : colcon
  • Éditeur de code : Visual Studio Code

🏗️ Structure du projet

ROS2_WS/
├── src/
│ └── sensor_data_evaluation/
│ ├── launch/
│ │ └── sensor_launch.py
│ ├── resource/
│ └── sensor_data_evaluation/
│ ├── __init__.py
│ ├── gui_subscriber.py
│ ├── sensor_publisher.py
│ └── sensor_subscriber.py
├── test/
├── package.xml
├── setup.cfg
├── setup.py
└── .env

⚙️ Installation complète

🐧 1. Installer ROS2 Humble sur Ubuntu 22.04

Installation de ROS2 — Guide officiel

sudo apt update && sudo apt upgrade
sudo locale-gen en_US en_US.UTF-8
sudo apt install software-properties-common
sudo add-apt-repository universe
sudo apt update && sudo apt install curl -y
sudo curl -sSL https://raw.githubusercontent.com/ros/rosdistro/master/ros.key -o /usr/share/keyrings/ros-archive-keyring.gpg
echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/ros-archive-keyring.gpg] http://packages.ros.org/ros2/ubuntu $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/ros2.list > /dev/null
sudo apt update
sudo apt install ros-humble-desktop-full python3-colcon-common-extensions

Avant de lancer ROS2, il faut exécuter dans le terminal :

source /opt/ros/humble/setup.bash

Maintenant lancer la commande:

ros2

Figure 1 : Installation de ROS2 terminée avec succès


📦 2. Alternative : Installer Distrobox + ROS2 (toutes distributions)

Distrobox Documentation

a. Installer Distrobox

# Sur la plupart des distributions Linux
sudo apt install distrobox podman
# ou
sudo dnf install distrobox podman
# ou
sudo pacman -S distrobox podman

b. Créer un conteneur Ubuntu 22.04

distrobox-create --name ros2box --image ubuntu:22.04
distrobox-enter ros2box

c. Installer ROS2 dans le conteneur

Répétez les commandes d'installation ROS2 ci-dessus dans le shell du conteneur.


🗂️ 3. Créer un workspace ROS2

Créer un workspace — ROS2 Tutorials

mkdir -p ~/Ros_ws/src
cd ~/Ros_ws
colcon build

Figure 2 : Compilation de l'espace de travail


🔽 4. Cloner et installer le projet

cd ~/Ros_ws/src
git clone <URL_DU_DEPOT>
cd ~/Ros_ws
colcon build --packages-select sensor_data_evaluation
source ~/Ros_ws/install/setup.bash

🚦 5. Lancer le système

Lancer un fichier launch — ROS2 Tutorials

ros2 launch sensor_data_evaluation sensor_launch.py

Vous verrez alors les messages Publisher et Subscriber dans le terminal.

Figure 3 : Affichage du terminal

Figure 4 : Affichage de l'interface graphique


💻 Implémentation

📤 Publisher Node

Le nœud Publisher simule trois capteurs (température, humidité, pression) pour la température (15-35°C), l'humidité (30-70%) et la pression (950-1050 hPa), puis publie ces données toutes les 0.5 secondes sur des topics /sensor_data séparés. Il publie aussi un message groupé pour l'interface graphique. Il permet de tester la chaîne complète sans matériel réel.

Créer un Publisher — ROS2 Python Tutorial

▶️ Voir le code complet
# filepath: sensor_data_evaluation/sensor_data_evaluation/sensor_publisher.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32, String
import random

class SensorPublisher(Node):
def __init__(self):
super().__init__('sensor_publisher')
# Publishers pour chaque capteur
self.temp_pub = self.create_publisher(Float32, 'sensor_data/temperature', 10)
self.hum_pub = self.create_publisher(Float32, 'sensor_data/humidity', 10)
self.pres_pub = self.create_publisher(Float32, 'sensor_data/pressure', 10)
# Publisher pour le GUI (message groupé)
self.gui_pub = self.create_publisher(String, 'sensor_topic', 10)
# Timer pour publier régulièrement
self.timer = self.create_timer(0.5, self.publish_data)

def publish_data(self):
temp = random.uniform(10.0, 40.0)
hum = random.uniform(20.0, 80.0)
pres = random.uniform(900.0, 1100.0)
# Publication séparée
self.temp_pub.publish(Float32(data=temp))
self.hum_pub.publish(Float32(data=hum))
self.pres_pub.publish(Float32(data=pres))
# Publication groupée pour GUI
gui_msg = f"Temp:{temp:.2f},Hum:{hum:.2f},Pres:{pres:.2f}"
self.gui_pub.publish(String(data=gui_msg))
self.get_logger().info(f'Published - Temp: {temp:.2f} | Hum: {hum:.2f} | Pres: {pres:.2f}')

def main(args=None):
rclpy.init(args=args)
node = SensorPublisher()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()

if __name__ == '__main__':
main()

📥 Subscriber Node

Ce nœud ROS2 reçoit les valeurs des capteurs et vérifie si elles sont dans les plages acceptables. Il affiche un message d'erreur si une valeur sort de la plage : 'hors plage' et un 'ok' quand ça respecte la plage. Cela permet de surveiller la validité des données.

Créer un Subscriber — ROS2 Python Tutorial

▶️ Voir le code complet
# filepath: sensor_data_evaluation/sensor_data_evaluation/sensor_subscriber.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32

class SensorSubscriber(Node):
def __init__(self):
super().__init__('sensor_subscriber')
self.create_subscription(Float32, 'sensor_data/temperature', self.temp_callback, 10)
self.create_subscription(Float32, 'sensor_data/humidity', self.hum_callback, 10)
self.create_subscription(Float32, 'sensor_data/pressure', self.pres_callback, 10)

def temp_callback(self, msg):
value = msg.data
if 15 <= value <= 35:
self.get_logger().info(f'Temp OK : {value:.2f} °C')
else:
self.get_logger().error(f'Temp hors plage : {value:.2f} °C')

def hum_callback(self, msg):
value = msg.data
if 30 <= value <= 70:
self.get_logger().info(f'Humidité OK : {value:.2f} %')
else:
self.get_logger().error(f'Humidité hors plage : {value:.2f} %')

def pres_callback(self, msg):
value = msg.data
if 950 <= value <= 1050:
self.get_logger().info(f'Pression OK : {value:.2f} hPa')
else:
self.get_logger().error(f'Pression hors plage : {value:.2f} hPa')

def main(args=None):
rclpy.init(args=args)
node = SensorSubscriber()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()

if __name__ == '__main__':
main()

🖥️ GUI Subscriber Node

Ce nœud ROS2 reçoit les données des capteurs (température, humidité, pression) et les affiche en temps réel sous forme de courbes et de statistiques grâce à Matplotlib. Il permet de visualiser l'évolution des mesures, de vérifier leur conformité aux plages attendues, et d'obtenir un bilan statistique (pourcentage de valeurs correctes/incorrectes).

Matplotlib Animation — Documentation

▶️ Voir le code complet
# filepath: sensor_data_evaluation/sensor_data_evaluation/gui_subscriber.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32
import threading
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation

class SensorSubscriber(Node):
def __init__(self):
super().__init__('gui_subscriber')

self.temps = []
self.temp_values = []
self.hum_values = []
self.pres_values = []

self.temp_range = (15, 35)
self.hum_range = (30, 70)
self.pres_range = (950, 1050)

self.counter = 0
self.lock = threading.Lock()

self.create_subscription(Float32, 'sensor_data/temperature', self.temp_callback, 10)
self.create_subscription(Float32, 'sensor_data/humidity', self.hum_callback, 10)
self.create_subscription(Float32, 'sensor_data/pressure', self.pres_callback, 10)

self.last_temp = None
self.last_hum = None
self.last_pres = None

def temp_callback(self, msg):
with self.lock:
self.last_temp = msg.data
self.update_data_if_ready()

def hum_callback(self, msg):
with self.lock:
self.last_hum = msg.data
self.update_data_if_ready()

def pres_callback(self, msg):
with self.lock:
self.last_pres = msg.data
self.update_data_if_ready()

def update_data_if_ready(self):
if self.last_temp is not None and self.last_hum is not None and self.last_pres is not None:
self.counter += 1
self.temps.append(self.counter)
self.temp_values.append(self.last_temp)
self.hum_values.append(self.last_hum)
self.pres_values.append(self.last_pres)
self.last_temp = self.last_hum = self.last_pres = None

def get_data(self):
with self.lock:
return (list(self.temps), list(self.temp_values), list(self.hum_values), list(self.pres_values))


def ros_spin(node):
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()


def main(args=None):
rclpy.init(args=args)
node = SensorSubscriber()
ros_thread = threading.Thread(target=ros_spin, args=(node,), daemon=True)
ros_thread.start()

# Crée 4 sous-graphes (3 pour courbes, 1 pour tableau)
fig, axs = plt.subplots(4, 1, figsize=(9, 12))
ax1, ax2, ax3, table_ax = axs
plt.subplots_adjust(hspace=0.6, bottom=0.2)

def draw_band(ax, low, high, color='green', alpha=0.2):
ax.axhspan(low, high, facecolor=color, alpha=alpha)

def compute_stats(values, value_range):
total = len(values)
if total == 0:
return (0, 0)
good = sum(value_range[0] <= v <= value_range[1] for v in values)
bad = total - good
return (good / total * 100, bad / total * 100)

def update(frame):
temps, temp_vals, hum_vals, pres_vals = node.get_data()

# Clear all 4 axes
for ax in axs:
ax.clear()

if temps:
# Température
ymin, ymax = node.temp_range[0] - 5, node.temp_range[1] + 5
ax1.set_ylim(ymin, ymax)
draw_band(ax1, *node.temp_range, color='green')
ax1.plot(temps, temp_vals, label='Température', color='blue')
ax1.set_ylabel('°C')
ax1.set_title('Température')
ax1.legend()
color_temp = 'green' if node.temp_range[0] <= temp_vals[-1] <= node.temp_range[1] else 'red'
ax1.text(0.5, -0.35, f"Valeur actuelle : {temp_vals[-1]:.2f} °C", color=color_temp,
transform=ax1.transAxes, ha='center', fontsize=10)

# Humidité
ymin, ymax = node.hum_range[0] - 10, node.hum_range[1] + 10
ax2.set_ylim(ymin, ymax)
draw_band(ax2, *node.hum_range, color='green')
ax2.plot(temps, hum_vals, label='Humidité', color='orange')
ax2.set_ylabel('%')
ax2.set_title('Humidité')
ax2.legend()
color_hum = 'green' if node.hum_range[0] <= hum_vals[-1] <= node.hum_range[1] else 'red'
ax2.text(0.5, -0.35, f"Valeur actuelle : {hum_vals[-1]:.2f} %", color=color_hum,
transform=ax2.transAxes, ha='center', fontsize=10)

# Pression
ymin, ymax = node.pres_range[0] - 20, node.pres_range[1] + 20
ax3.set_ylim(ymin, ymax)
draw_band(ax3, *node.pres_range, color='green')
ax3.plot(temps, pres_vals, label='Pression', color='purple')
ax3.set_ylabel('hPa')
ax3.set_title('Pression')
ax3.legend()
color_pres = 'green' if node.pres_range[0] <= pres_vals[-1] <= node.pres_range[1] else 'red'
ax3.text(0.5, -0.35, f"Valeur actuelle : {pres_vals[-1]:.2f} hPa", color=color_pres,
transform=ax3.transAxes, ha='center', fontsize=10)

# Tableau de statistiques
temp_good, temp_bad = compute_stats(temp_vals, node.temp_range)
hum_good, hum_bad = compute_stats(hum_vals, node.hum_range)
pres_good, pres_bad = compute_stats(pres_vals, node.pres_range)

table_data = [
["Température", len(temp_vals), f"{temp_good:.1f}%", f"{temp_bad:.1f}%"],
["Humidité", len(hum_vals), f"{hum_good:.1f}%", f"{hum_bad:.1f}%"],
["Pression", len(pres_vals), f"{pres_good:.1f}%", f"{pres_bad:.1f}%"]
]
col_labels = ["Mesure", "Total", "Bonnes", "Mauvaises"]

table_ax.axis("off")
table = table_ax.table(cellText=table_data, colLabels=col_labels, loc='center', cellLoc='center')

for i in range(1, 4):
table[(i, 2)].set_facecolor("#d0f0c0") # vert clair
table[(i, 3)].set_facecolor("#f8d0d0") # rouge clair

table.scale(1, 1.5)
table.set_fontsize(10)
table_ax.set_title("Bilan Statistique des Mesures", fontsize=12, fontweight="bold", pad=10)
else:
for ax in axs:
ax.text(0.5, 0.5, "Aucune donnée reçue", ha='center', va='center', transform=ax.transAxes)

ani = FuncAnimation(fig, update, interval=1000)
plt.show()
ros_thread.join()


if __name__ == '__main__':
main()

🚦 Launch File

Ce fichier permet de lancer automatiquement les trois nœuds (publisher, subscriber, interface graphique) avec une seule commande. Il simplifie le déploiement de l'application.

Créer un fichier launch — ROS2 Launch Tutorial

▶️ Voir le code complet
# filepath: sensor_data_evaluation/launch/sensor_launch.py
from launch import LaunchDescription
from launch_ros.actions import Node

def generate_launch_description():
return LaunchDescription([
Node(
package='sensor_data_evaluation',
executable='sensor_publisher',
name='sensor_publisher'
),
Node(
package='sensor_data_evaluation',
executable='sensor_subscriber',
name='sensor_subscriber'
),
Node(
package='sensor_data_evaluation',
executable='gui_subscriber',
name='gui_subscriber'
)
])

🎬 Espace Démo

📹 Vidéo de Démonstration du Système ROS2

📊 Éléments visibles dans la démo :

🖥️ Terminal📈 Interface Graphique✅ Validation
Messages Publisher avec valeurs des capteursCourbes temps réel (Température, Humidité, Pression)Vérification des seuils
Messages Subscriber avec statut OK/Hors plageBandes de validation coloréesMessages d'alerte hors plage
Logs ROS2 en temps réelTableau statistiques (% bonnes/mauvaises valeurs)Indicateurs visuels colorés

🌐 Communication multimachine DDS

La communication multimachine via DDS (Data Distribution Service) permet à plusieurs machines de partager des données en temps réel de manière fiable et efficace, sans dépendre d’un serveur central. DDS facilite l’échange d’informations entre applications distribuées grâce à un modèle publish/subscribe.

Vidéos de démonstration

PC1

PC2



🐞 Journal des erreurs

DateErreur rencontréeCauseSolution apportée
15/06/2025colcon build échoue, le package n'est pas trouvé.Oubli de déclarer les dépendances (rclpy) dans le package.xml.Ajout de <exec_depend>rclpy</exec_depend> dans package.xml.
15/06/2025Le subscriber ne reçoit aucun message.Nom du topic différent entre publisher et subscriber.Correction du nom du topic pour qu'il corresponde exactement à /sensor_data.
16/06/2025Les nœuds s'arrêtent juste après le lancement.Oubli de rclpy.spin() dans le main.Ajout de rclpy.spin(node_name) à la fin de chaque nœud.

🔭 Perspectives

  • Utiliser des messages personnalisés (SensorData.msg) pour structurer les données.
    Créer un message personnalisé — ROS2
  • Ajouter un nœud de logging pour enregistrer les données dans un fichier .csv.
  • Visualiser les données en temps réel avec rqt_plot ou équivalent.
    Visualiser avec rqt_plot — ROS2
  • Implémenter la reconfiguration dynamique des seuils de validation.

📚 Ressources


👥 Auteurs

  • Chatigre Larissa
  • Ife Leonce Comlan
  • Agbodja Marzoukath

Test : Test 2 - Tekbot Robotics Challenges
Langage : Python 3 / ROS2
Framework : ROS2 Humble
Date : Juin 2025


© 2025 Test 2 - Tekbot Robotics Challenges. Projet développé dans le cadre de l'évaluation ROS2.