Converting HDF5 Format Dataset to LeRobot v2.0
👋 Welcome to this guide on converting HDF5 format datasets to LeRobot v2.0 format! This post will walk you through the complete process of transforming your robot learning data into the standardized LeRobot format.
💻 The main code is located under policy/pi05/examples/aloha_real/convert_aloha_data_to_lerobot_robotwin.py in the RoboTwin toolkit root directory.
📋 The conversion process consists of several key steps:
- 📁 Creating the LeRobot dataset instance with the appropriate interface
- 🔧 Defining helper functions for data loading and processing
- 📊 Populating the dataset with HDF5 data
The complete code will be provided at the end of this post for your reference.
📁 Creating the LeRobot Dataset
To create a LeRobot dataset, you need to provide several key parameters:
-
repo_id: Unique identifier for your dataset -
robot_type: Type of robot (e.g., “aloha” or “mobile_aloha”) -
mode: Storage mode for images (“video” or “image”) - Optional parameters:
has_velocity,has_effort, anddataset_config
🔧 Key Components
Joint and Camera Configuration
You need to specify the names of different joint values (motor names) and camera names to ensure proper data mapping.
Features Definition
A critical requirement for LeRobot datasets is the features field in the meta/info.json file. This field defines the structure of your dataset, including:
- States: Robot joint positions and gripper states
- Actions: Commands sent to the robot
- Optional: Velocity and effort data (if available)
- Images: Camera feeds from different viewpoints
def create_empty_dataset(
repo_id: str,
robot_type: str,
mode: Literal["video", "image"] = "video",
*,
has_velocity: bool = False,
has_effort: bool = False,
dataset_config: DatasetConfig = DEFAULT_DATASET_CONFIG,
) -> LeRobotDataset:
# ==========================================
# Define joint names and camera names
# ==========================================
motors = [
"left_waist",
"left_shoulder",
"left_elbow",
"left_forearm_roll",
"left_wrist_angle",
"left_wrist_rotate",
"left_gripper",
"right_waist",
"right_shoulder",
"right_elbow",
"right_forearm_roll",
"right_wrist_angle",
"right_wrist_rotate",
"right_gripper",
]
cameras = [
"cam_high",
"cam_left_wrist",
"cam_right_wrist",
]
# ==============================================
# Define features of the dataset
# ==============================================
features = {
"observation.state": {
"dtype": "float32",
"shape": (len(motors), ),
"names": [
motors,
],
},
"action": {
"dtype": "float32",
"shape": (len(motors), ),
"names": [
motors,
],
},
}
if has_velocity:
features["observation.velocity"] = {
"dtype": "float32",
"shape": (len(motors), ),
"names": [
motors,
],
}
if has_effort:
features["observation.effort"] = {
"dtype": "float32",
"shape": (len(motors), ),
"names": [
motors,
],
}
for cam in cameras:
features[f"observation.images.{cam}"] = {
"dtype": mode,
"shape": (3, 480, 640),
"names": [
"channels",
"height",
"width",
],
}
if Path(HF_LEROBOT_HOME / repo_id).exists():
shutil.rmtree(HF_LEROBOT_HOME / repo_id)
# Return the created LeRobot Dataset
return LeRobotDataset.create(
repo_id=repo_id,
fps=50,
robot_type=robot_type,
features=features,
use_videos=dataset_config.use_videos,
tolerance_s=dataset_config.tolerance_s,
image_writer_processes=dataset_config.image_writer_processes,
image_writer_threads=dataset_config.image_writer_threads,
video_backend=dataset_config.video_backend,
)
🔧 Helper Functions
To streamline the conversion process, we define several helper functions that handle common tasks:
- Velocity and Effort Detection: Functions to check if velocity or effort information is available in the dataset
- Image Loading: Functions to load and process images from the HDF5 files
- Episode Data Loading: Functions to load complete episode data including states, actions, and images
def has_velocity(hdf5_files: list[Path]) -> bool:
with h5py.File(hdf5_files[0], "r") as ep:
return "/observations/qvel" in ep
def has_effort(hdf5_files: list[Path]) -> bool:
with h5py.File(hdf5_files[0], "r") as ep:
return "/observations/effort" in ep
def load_raw_images_per_camera(ep: h5py.File, cameras: list[str]) -> dict[str, np.ndarray]:
# ========================================================================================
# key is the camera name, and value is the images array across single episode
# ========================================================================================
imgs_per_cam = {}
for camera in cameras:
uncompressed = ep[f"/observations/images/{camera}"].ndim == 4
if uncompressed:
# load all images in RAM
imgs_array = ep[f"/observations/images/{camera}"][:]
else:
import cv2
# load one compressed image after the other in RAM and uncompress
imgs_array = []
for data in ep[f"/observations/images/{camera}"]:
data = np.frombuffer(data, np.uint8)
# img = cv2.imdecode(nparr, cv2.IMREAD_COLOR) # 解码为彩色图像
imgs_array.append(cv2.imdecode(data, cv2.IMREAD_COLOR))
imgs_array = np.array(imgs_array)
imgs_per_cam[camera] = imgs_array
return imgs_per_cam
def load_raw_episode_data(
ep_path: Path,
) -> tuple[
dict[str, np.ndarray],
torch.Tensor,
torch.Tensor,
torch.Tensor | None,
torch.Tensor | None,
]:
with h5py.File(ep_path, "r") as ep:
state = torch.from_numpy(ep["/observations/qpos"][:])
action = torch.from_numpy(ep["/action"][:])
velocity = None
if "/observations/qvel" in ep:
velocity = torch.from_numpy(ep["/observations/qvel"][:])
effort = None
if "/observations/effort" in ep:
effort = torch.from_numpy(ep["/observations/effort"][:])
imgs_per_cam = load_raw_images_per_camera(
ep,
[
"cam_high",
"cam_left_wrist",
"cam_right_wrist",
],
)
return imgs_per_cam, state, action, velocity, effort
📊 Populating the Dataset
The process of converting HDF5 data to LeRobot v2.0 format involves the following steps:
- 📥 Load HDF5 data for each episode
- 📝 Add task instructions from the instructions.json file
- 🖼️ Process and add images from different cameras
- 📋 Populate each frame with state, action, and image data
- 💾 Save the episode to the LeRobot dataset
def populate_dataset(
dataset: LeRobotDataset,
hdf5_files: list[Path],
task: str,
episodes: list[int] | None = None,
) -> LeRobotDataset:
if episodes is None:
episodes = range(len(hdf5_files))
for ep_idx in tqdm.tqdm(episodes):
ep_path = hdf5_files[ep_idx]
# ========================
# load hdf5 data
# ========================
imgs_per_cam, state, action, velocity, effort = load_raw_episode_data(ep_path)
num_frames = state.shape[0]
# ================================================
# add prompt randomly from instructions.json
# ================================================
dir_path = os.path.dirname(ep_path)
json_Path = f"{dir_path}/instructions.json"
with open(json_Path, 'r') as f_instr:
instruction_dict = json.load(f_instr)
instructions = instruction_dict['instructions']
instruction = np.random.choice(instructions)
# ================================================
# populate each frame with prompt and images
# ================================================
for i in range(num_frames):
frame = {
"observation.state": state[i],
"action": action[i],
"task": instruction,
}
for camera, img_array in imgs_per_cam.items():
frame[f"observation.images.{camera}"] = img_array[i]
if velocity is not None:
frame["observation.velocity"] = velocity[i]
if effort is not None:
frame["observation.effort"] = effort[i]
dataset.add_frame(frame) # add single frame data
dataset.save_episode() # save episode data
return dataset
📝 Complete Code
Here’s the complete code implementation for your reference:
"""
Script to convert Aloha hdf5 data to the LeRobot dataset v2.0 format.
Example usage: uv run examples/aloha_real/convert_aloha_data_to_lerobot.py --raw-dir /path/to/raw/data --repo-id <org>/<dataset-name>
"""
import dataclasses
from pathlib import Path
import shutil
from typing import Literal
import h5py
from lerobot.common.datasets.lerobot_dataset import HF_LEROBOT_HOME
from lerobot.common.datasets.lerobot_dataset import LeRobotDataset
# from lerobot.common.datasets.push_dataset_to_hub._download_raw import download_raw
import numpy as np
import torch
import tqdm
import tyro
import json
import os
import fnmatch
@dataclasses.dataclass(frozen=True)
class DatasetConfig:
use_videos: bool = True
tolerance_s: float = 0.0001
image_writer_processes: int = 10
image_writer_threads: int = 5
video_backend: str | None = None
DEFAULT_DATASET_CONFIG = DatasetConfig()
def create_empty_dataset(
repo_id: str,
robot_type: str,
mode: Literal["video", "image"] = "video",
*,
has_velocity: bool = False,
has_effort: bool = False,
dataset_config: DatasetConfig = DEFAULT_DATASET_CONFIG,
) -> LeRobotDataset:
motors = [
"left_waist",
"left_shoulder",
"left_elbow",
"left_forearm_roll",
"left_wrist_angle",
"left_wrist_rotate",
"left_gripper",
"right_waist",
"right_shoulder",
"right_elbow",
"right_forearm_roll",
"right_wrist_angle",
"right_wrist_rotate",
"right_gripper",
]
cameras = [
"cam_high",
"cam_left_wrist",
"cam_right_wrist",
]
features = {
"observation.state": {
"dtype": "float32",
"shape": (len(motors), ),
"names": [
motors,
],
},
"action": {
"dtype": "float32",
"shape": (len(motors), ),
"names": [
motors,
],
},
}
if has_velocity:
features["observation.velocity"] = {
"dtype": "float32",
"shape": (len(motors), ),
"names": [
motors,
],
}
if has_effort:
features["observation.effort"] = {
"dtype": "float32",
"shape": (len(motors), ),
"names": [
motors,
],
}
for cam in cameras:
features[f"observation.images.{cam}"] = {
"dtype": mode,
"shape": (3, 480, 640),
"names": [
"channels",
"height",
"width",
],
}
if Path(HF_LEROBOT_HOME / repo_id).exists():
shutil.rmtree(HF_LEROBOT_HOME / repo_id)
return LeRobotDataset.create(
repo_id=repo_id,
fps=50,
robot_type=robot_type,
features=features,
use_videos=dataset_config.use_videos,
tolerance_s=dataset_config.tolerance_s,
image_writer_processes=dataset_config.image_writer_processes,
image_writer_threads=dataset_config.image_writer_threads,
video_backend=dataset_config.video_backend,
)
def get_cameras(hdf5_files: list[Path]) -> list[str]:
with h5py.File(hdf5_files[0], "r") as ep:
# ignore depth channel, not currently handled
return [key for key in ep["/observations/images"].keys() if "depth" not in key] # noqa: SIM118
def has_velocity(hdf5_files: list[Path]) -> bool:
with h5py.File(hdf5_files[0], "r") as ep:
return "/observations/qvel" in ep
def has_effort(hdf5_files: list[Path]) -> bool:
with h5py.File(hdf5_files[0], "r") as ep:
return "/observations/effort" in ep
def load_raw_images_per_camera(ep: h5py.File, cameras: list[str]) -> dict[str, np.ndarray]:
imgs_per_cam = {}
for camera in cameras:
uncompressed = ep[f"/observations/images/{camera}"].ndim == 4
if uncompressed:
# load all images in RAM
imgs_array = ep[f"/observations/images/{camera}"][:]
else:
import cv2
# load one compressed image after the other in RAM and uncompress
imgs_array = []
for data in ep[f"/observations/images/{camera}"]:
data = np.frombuffer(data, np.uint8)
# img = cv2.imdecode(nparr, cv2.IMREAD_COLOR) # 解码为彩色图像
imgs_array.append(cv2.imdecode(data, cv2.IMREAD_COLOR))
imgs_array = np.array(imgs_array)
imgs_per_cam[camera] = imgs_array
return imgs_per_cam
def load_raw_episode_data(
ep_path: Path,
) -> tuple[
dict[str, np.ndarray],
torch.Tensor,
torch.Tensor,
torch.Tensor | None,
torch.Tensor | None,
]:
with h5py.File(ep_path, "r") as ep:
state = torch.from_numpy(ep["/observations/qpos"][:])
action = torch.from_numpy(ep["/action"][:])
velocity = None
if "/observations/qvel" in ep:
velocity = torch.from_numpy(ep["/observations/qvel"][:])
effort = None
if "/observations/effort" in ep:
effort = torch.from_numpy(ep["/observations/effort"][:])
imgs_per_cam = load_raw_images_per_camera(
ep,
[
"cam_high",
"cam_left_wrist",
"cam_right_wrist",
],
)
return imgs_per_cam, state, action, velocity, effort
def populate_dataset(
dataset: LeRobotDataset,
hdf5_files: list[Path],
task: str,
episodes: list[int] | None = None,
) -> LeRobotDataset:
if episodes is None:
episodes = range(len(hdf5_files))
for ep_idx in tqdm.tqdm(episodes):
ep_path = hdf5_files[ep_idx]
imgs_per_cam, state, action, velocity, effort = load_raw_episode_data(ep_path)
num_frames = state.shape[0]
# add prompt
dir_path = os.path.dirname(ep_path)
json_Path = f"{dir_path}/instructions.json"
with open(json_Path, 'r') as f_instr:
instruction_dict = json.load(f_instr)
instructions = instruction_dict['instructions']
instruction = np.random.choice(instructions)
for i in range(num_frames):
frame = {
"observation.state": state[i],
"action": action[i],
"task": instruction,
}
for camera, img_array in imgs_per_cam.items():
frame[f"observation.images.{camera}"] = img_array[i]
if velocity is not None:
frame["observation.velocity"] = velocity[i]
if effort is not None:
frame["observation.effort"] = effort[i]
dataset.add_frame(frame)
dataset.save_episode()
return dataset
def port_aloha(
raw_dir: Path,
repo_id: str,
raw_repo_id: str | None = None,
task: str = "DEBUG",
*,
episodes: list[int] | None = None,
push_to_hub: bool = False,
is_mobile: bool = False,
mode: Literal["video", "image"] = "image", # set to "video" if you want to save images as video
dataset_config: DatasetConfig = DEFAULT_DATASET_CONFIG,
):
breakpoint()
if (HF_LEROBOT_HOME / repo_id).exists():
shutil.rmtree(HF_LEROBOT_HOME / repo_id)
if not raw_dir.exists():
if raw_repo_id is None:
raise ValueError("raw_repo_id must be provided if raw_dir does not exist")
# download_raw(raw_dir, repo_id=raw_repo_id)
hdf5_files = []
for root, _, files in os.walk(raw_dir):
for filename in fnmatch.filter(files, '*.hdf5'):
file_path = os.path.join(root, filename)
hdf5_files.append(file_path)
dataset = create_empty_dataset(
repo_id,
robot_type="mobile_aloha" if is_mobile else "aloha",
mode=mode,
has_effort=has_effort(hdf5_files),
has_velocity=has_velocity(hdf5_files),
dataset_config=dataset_config,
)
dataset = populate_dataset(
dataset,
hdf5_files,
task=task,
episodes=episodes,
)
# dataset.consolidate()
if push_to_hub:
dataset.push_to_hub()
if __name__ == "__main__":
tyro.cli(port_aloha)
🎉 Conclusion
And that’s a complete walkthrough of converting HDF5 format datasets to LeRobot v2.0 format!
This conversion process is essential for leveraging the standardized LeRobot format, which offers several benefits:
- Consistency: A unified format for robot learning datasets
- Compatibility: Seamless integration with the LeRobot ecosystem
- Efficiency: Optimized storage and loading of robot data
- Flexibility: Support for different robot types and data modalities
By following the steps outlined in this guide, you can effectively transform your HDF5 datasets into the LeRobot format, enabling better collaboration and model development in the robot learning community.
Feel free to adapt this code to your specific robot setup and data requirements. Happy coding! 🤖✨
Enjoy Reading This Article?
Here are some more articles you might like to read next: