from __future__ import annotations
import copy
import json
import logging
import os
import random
import re
from collections import defaultdict
from io import BytesIO
from tempfile import NamedTemporaryFile
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple, Union, cast
from urllib.parse import urlparse
import numpy as np
import numpy.typing as npt
import requests
from PIL import ExifTags, Image
from segments.typing import (
EgoPose,
ExportFormat,
PointcloudCuboidLabelAttributes,
TaskType,
)
# https://adamj.eu/tech/2021/05/13/python-type-hints-how-to-fix-circular-imports/
# https://stackoverflow.com/questions/61384752/how-to-type-hint-with-an-optional-import
if TYPE_CHECKING:
import open3d as o3d
from segments.dataset import SegmentsDataset
from segments.typing import Release
#############
# Variables #
#############
# Shared HTTP session used by the download helpers in this module.
session = requests.Session()
# Transport adapter configured with ``max_retries=3``.
adapter = requests.adapters.HTTPAdapter(max_retries=3)
# Route both plain-HTTP and HTTPS requests through the retrying adapter.
session.mount("http://", adapter)
session.mount("https://", adapter)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Maps every export format to the set of dataset task types that
# ``export_dataset`` accepts for that format.
COMPATIBLE_TASK_TYPES = {
    ExportFormat.COCO_PANOPTIC: {
        TaskType.SEGMENTATION_BITMAP,
        TaskType.SEGMENTATION_BITMAP_HIGHRES,
    },
    ExportFormat.COCO_INSTANCE: {
        TaskType.SEGMENTATION_BITMAP,
        TaskType.SEGMENTATION_BITMAP_HIGHRES,
    },
    # YOLO is the only format here that takes vector-style task types.
    ExportFormat.YOLO: {
        TaskType.VECTOR,
        TaskType.BBOXES,
        TaskType.IMAGE_VECTOR_SEQUENCE,
    },
    ExportFormat.INSTANCE: {
        TaskType.SEGMENTATION_BITMAP,
        TaskType.SEGMENTATION_BITMAP_HIGHRES,
    },
    ExportFormat.INSTANCE_COLOR: {
        TaskType.SEGMENTATION_BITMAP,
        TaskType.SEGMENTATION_BITMAP_HIGHRES,
    },
    ExportFormat.SEMANTIC: {
        TaskType.SEGMENTATION_BITMAP,
        TaskType.SEGMENTATION_BITMAP_HIGHRES,
    },
    ExportFormat.SEMANTIC_COLOR: {
        TaskType.SEGMENTATION_BITMAP,
        TaskType.SEGMENTATION_BITMAP_HIGHRES,
    },
    ExportFormat.POLYGON: {
        TaskType.SEGMENTATION_BITMAP,
        TaskType.SEGMENTATION_BITMAP_HIGHRES,
    },
}
def bitmap2file(
    bitmap: npt.NDArray[np.uint32],
    is_segmentation_bitmap: bool = True,
) -> BytesIO:
    """Convert a label bitmap to a file with the proper format.

    Each ``uint32`` value is split into four bytes that become the channels of
    an RGBA pixel; the fourth channel (the most significant byte) is then
    overwritten with ``255`` before the image is written as a PNG.

    Args:
        bitmap: A two-dimensional :class:`numpy.ndarray` with :class:`numpy.uint32` (or :class:`numpy.uint8`) dtype where each unique value represents an instance id.
        is_segmentation_bitmap: If this is a segmentation bitmap. Defaults to :obj:`True`.
    Returns:
        A file object positioned at the start of the PNG data.
    Raises:
        :exc:`ValueError`: If the ``dtype`` is not :class:`np.uint32` or :class:`np.uint8`.
        :exc:`ValueError`: If the bitmap is not a segmentation bitmap.
        :exc:`ValueError`: If the bitmap is not two-dimensional.
    """
    if bitmap.dtype != np.uint32 and bitmap.dtype != np.uint8:
        raise ValueError(
            "Only np.ndarrays with np.uint32 or np.uint8 dtype can be used."
        )
    if not is_segmentation_bitmap:
        raise ValueError("Only segmentation bitmaps can be used.")
    if bitmap.ndim != 2:
        raise ValueError(
            f"The bitmap must be two-dimensional (height x width) but has shape {bitmap.shape}."
        )

    # Always work on a private copy (the alpha byte is overwritten below).
    # Forcing C order makes the byte view valid for any input layout, and
    # forcing little-endian makes byte 3 the most significant byte on every
    # platform.
    packed = np.array(bitmap, dtype="<u4", order="C")
    rgba = packed[:, :, None].view(np.uint8)
    rgba[:, :, 3] = 255

    f = BytesIO()
    Image.fromarray(rgba).save(f, "PNG")
    f.seek(0)
    return f
def get_semantic_bitmap(
    instance_bitmap: Optional[npt.NDArray[np.uint32]] = None,
    annotations: Optional[List[Dict[str, Any]]] = None,
    id_increment: int = 0,
) -> Optional[npt.NDArray[np.uint32]]:
    """Convert an instance bitmap and annotations into a semantic bitmap.

    Args:
        instance_bitmap: A :class:`numpy.ndarray` with :class:`numpy.uint32` ``dtype`` where each unique value represents an instance id. Defaults to :obj:`None`.
        annotations: A list of annotation dictionaries, each with an ``id`` (instance id) and a ``category_id``. Defaults to :obj:`None`.
        id_increment: Increment the category ids with this number. Defaults to ``0``.
    Returns:
        An array where each unique value represents a category id, or :obj:`None` if ``instance_bitmap`` or ``annotations`` is :obj:`None`. Instance ids without an annotation map to ``0`` (``id_increment`` is not applied to them).
    """
    if instance_bitmap is None or annotations is None:
        return None

    # Lookup table indexed by instance id; ids without annotation stay 0.
    lookup = [0] * (max((a["id"] for a in annotations), default=0) + 1)
    for annotation in annotations:
        lookup[annotation["id"]] = annotation["category_id"] + id_increment
    instance2semantic = np.array(lookup)

    instance_ids = np.array(instance_bitmap, np.uint32)
    known = instance_ids < instance2semantic.shape[0]
    if known.all():
        return instance2semantic[instance_ids]

    # Some ids exceed the largest annotated id. Treat them like any other
    # unannotated id (category 0) instead of failing with an IndexError.
    semantic_label = np.zeros(instance_ids.shape, dtype=instance2semantic.dtype)
    semantic_label[known] = instance2semantic[instance_ids[known]]
    return semantic_label
def export_dataset(
    dataset: SegmentsDataset,
    export_folder: str = ".",
    export_format: ExportFormat = ExportFormat.COCO_PANOPTIC,
    id_increment: int = 0,
    **kwargs: Any,
) -> Optional[Union[Tuple[str, Optional[str]], Optional[str]]]:
    """Export a dataset to a different format.

    +------------------+-------------------------------------------------------------+
    | Export format    | Supported dataset type                                      |
    +==================+=============================================================+
    | COCO panoptic    | ``segmentation-bitmap`` and ``segmentation-bitmap-highres`` |
    +------------------+-------------------------------------------------------------+
    | COCO instance    | ``segmentation-bitmap`` and ``segmentation-bitmap-highres`` |
    +------------------+-------------------------------------------------------------+
    | YOLO             | ``vector``, ``bboxes`` and ``image-vector-sequence``        |
    +------------------+-------------------------------------------------------------+
    | Instance         | ``segmentation-bitmap`` and ``segmentation-bitmap-highres`` |
    +------------------+-------------------------------------------------------------+
    | Colored instance | ``segmentation-bitmap`` and ``segmentation-bitmap-highres`` |
    +------------------+-------------------------------------------------------------+
    | Semantic         | ``segmentation-bitmap`` and ``segmentation-bitmap-highres`` |
    +------------------+-------------------------------------------------------------+
    | Colored semantic | ``segmentation-bitmap`` and ``segmentation-bitmap-highres`` |
    +------------------+-------------------------------------------------------------+
    | Polygon          | ``segmentation-bitmap`` and ``segmentation-bitmap-highres`` |
    +------------------+-------------------------------------------------------------+

    Example:

    .. code-block:: python

        # pip install segments-ai
        from segments import SegmentsClient, SegmentsDataset
        from segments.utils import export_dataset

        # Initialize a SegmentsDataset from the release file
        client = SegmentsClient('YOUR_API_KEY')
        release = client.get_release('jane/flowers', 'v1.0') # Alternatively: release = 'flowers-v1.0.json'
        dataset = SegmentsDataset(release, labelset='ground-truth', filter_by=['labeled', 'reviewed'])

        # Export to COCO panoptic format
        export_dataset(dataset, export_format='coco-panoptic')

    Alternatively, you can use the initialized :class:`.SegmentsDataset` to loop through the samples and labels, and visualize or process them in any way you please:

    .. code-block:: python

        import matplotlib.pyplot as plt
        from segments.utils import get_semantic_bitmap

        for sample in dataset:
            # Print the sample name and list of labeled objects
            print(sample['name'])
            print(sample['annotations'])

            # Show the image
            plt.imshow(sample['image'])
            plt.show()

            # Show the instance segmentation label
            plt.imshow(sample['segmentation_bitmap'])
            plt.show()

            # Show the semantic segmentation label
            semantic_bitmap = get_semantic_bitmap(sample['segmentation_bitmap'], sample['annotations'])
            plt.imshow(semantic_bitmap)
            plt.show()

    Args:
        dataset: A :class:`.SegmentsDataset`.
        export_folder: The folder to export the dataset to. Defaults to ``.``.
        export_format: The destination format. Defaults to ``coco-panoptic``.
        id_increment: Increment the category ids with this number. Defaults to ``0``. Ignored unless ``export_format`` is ``semantic`` or ``semantic-color``.
        **kwargs: ``image_width`` and ``image_height`` are forwarded to the YOLO exporter (both default to :obj:`None`); other keys are ignored.
    Returns:
        Returns the file name and the image directory name (for COCO panoptic, COCO instance, YOLO and polygon), or returns the export folder name (for (colored) instance and (colored) semantic).
    Raises:
        :exc:`ImportError`: If scikit image is not installed (to install run ``pip install scikit-image``).
        :exc:`ValueError`: If an invalid ``export_format`` is used.
        :exc:`ValueError`: If the dataset's task type cannot be exported to ``export_format``.
    """
    try:
        import skimage  # noqa: F401
    except ImportError:
        logger.error("Please install scikit-image first: pip install scikit-image.")
        raise

    print("Exporting dataset. This may take a while...")

    # One lookup replaces the per-branch copies of the compatibility check.
    supported_task_types = COMPATIBLE_TASK_TYPES.get(export_format)
    if supported_task_types is None:
        raise ValueError("Please choose a valid export_format.")

    if dataset.task_type not in supported_task_types:
        if export_format == ExportFormat.YOLO:
            # Keep this in sync with COMPATIBLE_TASK_TYPES[ExportFormat.YOLO].
            message = 'Only datasets of type "vector", "bboxes" and "image-vector-sequence" can be exported to this format.'
        elif export_format == ExportFormat.POLYGON:
            message = 'Only datasets of type "segmentation-bitmap" and "segmentation-bitmap-highres" can be exported to this format.'
        else:
            message = "Only datasets of type 'segmentation-bitmap' and 'segmentation-bitmap-highres' can be exported to this format."
        raise ValueError(message)

    # The exporters are imported lazily, only for the format actually used.
    if export_format == ExportFormat.COCO_PANOPTIC:
        from .export import export_coco_panoptic

        return export_coco_panoptic(dataset, export_folder)

    if export_format == ExportFormat.COCO_INSTANCE:
        from .export import export_coco_instance

        return export_coco_instance(dataset, export_folder)

    if export_format == ExportFormat.YOLO:
        from .export import export_yolo

        return export_yolo(
            dataset,
            export_folder,
            image_width=kwargs.get("image_width", None),
            image_height=kwargs.get("image_height", None),
        )

    if export_format == ExportFormat.POLYGON:
        from .export import export_polygon

        return export_polygon(dataset, export_folder)

    # Remaining formats: (colored) semantic and (colored) instance images.
    from .export import export_image

    return export_image(dataset, export_folder, export_format, id_increment)
def load_image_from_url(
    url: str, save_filename: Optional[str] = None, s3_client: Optional[Any] = None
) -> Image.Image:
    """Load an image from url.

    When ``s3_client`` is given and the url host matches the S3 pattern for a
    bucket other than ``segmentsai-prod``, the object is read through the S3
    client. In every other case the url is fetched over HTTP with the shared
    module session.

    Args:
        url: The image url.
        save_filename: The filename to save to.
        s3_client: A boto3 S3 client, e.g. ``s3_client = boto3.client("s3")``. Needs to be provided if your images are in a private S3 bucket. Defaults to :obj:`None`.
    Returns:
        A PIL image.
    """
    # (bucket, key) when the image has to be read through the S3 client.
    s3_location = None
    if s3_client is not None:
        parsed_url = urlparse(url)
        host_match = re.search(
            r"(.+).(s3|s3-accelerate).(.+).amazonaws.com", parsed_url.netloc
        )
        if host_match and host_match.group(1) != "segmentsai-prod":
            s3_location = (host_match.group(1), parsed_url.path.lstrip("/"))

    if s3_location is None:
        raw_bytes = session.get(url).content
    else:
        bucket_name, object_key = s3_location
        raw_bytes = s3_client.get_object(Bucket=bucket_name, Key=object_key)[
            "Body"
        ].read()
    image = Image.open(BytesIO(raw_bytes))

    if save_filename is not None:
        # Pass the EXIF block along only when the source image carries one.
        save_options = {"exif": image.info["exif"]} if "exif" in image.info else {}
        image.save(save_filename, **save_options)

    return image
def load_pointcloud_from_url(
    url: str, save_filename: Optional[str] = None, s3_client: Optional[Any] = None
) -> o3d.geometry.PointCloud:
    """Load a pointcloud from url.

    When ``s3_client`` is given and the url host matches the S3 pattern for a
    bucket other than ``segmentsai-prod``, the object is read through the S3
    client. In every other case the url is fetched over HTTP with the shared
    module session.

    Args:
        url: The pointcloud url.
        save_filename: The filename to save to.
        s3_client: A boto3 S3 client, e.g. ``s3_client = boto3.client("s3")``. Needs to be provided if your point clouds are in a private S3 bucket. Defaults to :obj:`None`.
    Returns:
        A pointcloud.
    Raises:
        :exc:`ImportError`: If open3d is not installed (to install run ``pip install open3d``).
    """
    try:
        import open3d as o3d
    except ImportError:
        logger.error("Please install open3d first: pip install open3d")
        raise

    def read_pointcloud_from_bytes(data: bytes) -> o3d.geometry.PointCloud:
        """Parse raw .pcd bytes by passing them through a temporary file."""
        # open3d reads from a path, so the data must be fully on disk first.
        # The file is closed (and thereby flushed) before open3d opens it;
        # reading while it is still open can miss buffered bytes and is not
        # possible on Windows. delete=False keeps it until we remove it.
        temp_file = NamedTemporaryFile(suffix=".pcd", delete=False)
        try:
            with temp_file:
                temp_file.write(data)
            return o3d.io.read_point_cloud(temp_file.name)
        finally:
            os.remove(temp_file.name)

    # (bucket, key) when the file has to be read through the S3 client.
    s3_location = None
    if s3_client is not None:
        url_parsed = urlparse(url)
        regex = re.search(
            r"(.+).(s3|s3-accelerate).(.+).amazonaws.com", url_parsed.netloc
        )
        if regex and regex.group(1) != "segmentsai-prod":
            s3_location = (regex.group(1), url_parsed.path.lstrip("/"))

    if s3_location is None:
        file_byte_string = session.get(url).content
    else:
        bucket, key = s3_location
        file_byte_string = s3_client.get_object(Bucket=bucket, Key=key)[
            "Body"
        ].read()
    pointcloud = read_pointcloud_from_bytes(file_byte_string)

    if save_filename is not None:
        o3d.io.write_point_cloud(save_filename, pointcloud)

    return pointcloud
def load_label_bitmap_from_url(
    url: str, save_filename: Optional[str] = None
) -> npt.NDArray[np.uint32]:
    """Load a label bitmap from url.

    The downloaded image is decoded to an array, its fourth channel is zeroed
    and the four bytes of every pixel are reinterpreted as one ``uint32``.

    Args:
        url: The label bitmap url.
        save_filename: The filename to save to.
    Returns:
        A :class:`numpy.ndarray` with :class:`numpy.uint32` ``dtype`` where each unique value represents an instance id.
    """
    response = session.get(url)
    encoded_image = Image.open(BytesIO(response.content))

    channels = np.array(encoded_image)
    # Clear the fourth channel so it does not contribute to the packed value.
    channels[:, :, 3] = 0
    # View the 4 bytes of each pixel as a single uint32 and drop the
    # now-singleton channel axis.
    label_ids = channels.view(np.uint32).squeeze(2)

    if save_filename:
        Image.fromarray(label_ids).save(save_filename)

    return label_ids
def load_release(release: Release) -> Any:
    """Load JSON from Segments release.

    Args:
        release: A Segments release.
    Returns:
        A JSON with the release labels.
    Raises:
        :exc:`requests.HTTPError`: If the release file could not be downloaded (non-2xx response).
    """
    release_file = cast(str, release.attributes.url)  # TODO Fix in the backend.
    # Use the shared session so this download gets the same retry policy as
    # the other helpers in this module.
    response = session.get(release_file)
    # Fail on an HTTP error here instead of handing an error page to the
    # JSON parser.
    response.raise_for_status()
    return json.loads(response.content)
def handle_exif_rotation(image: Image.Image) -> Image.Image:
    """Handle the exif rotation of a PIL image.

    Only the EXIF orientation values ``3``, ``6`` and ``8`` lead to a
    transposition. For any other value, or when the orientation cannot be
    read, the image is returned as is.

    Args:
        image: A PIL image.
    Returns:
        A rotated PIL image.
    """
    # EXIF orientation value -> name of the PIL transpose constant to apply.
    # Names are resolved lazily so the lookup stays inside the ``try`` below.
    transpose_names = ((3, "ROTATE_180"), (6, "ROTATE_270"), (8, "ROTATE_90"))
    wanted_tag_name = "Orientation"

    try:
        # Reverse lookup of the numeric tag id for "Orientation".
        for tag_id, tag_name in ExifTags.TAGS.items():
            if tag_name == wanted_tag_name:
                orientation_tag = tag_id
                break
        else:
            raise ValueError(f"No such value {wanted_tag_name}.")

        exif_tags = dict(image.getexif().items())
        orientation_value = exif_tags[orientation_tag]
        for exif_code, constant_name in transpose_names:
            if orientation_value == exif_code:
                image = image.transpose(getattr(Image, constant_name))
                break
    except (AttributeError, KeyError, IndexError, ValueError):
        # Best effort: fall through and return the image that was passed in.
        pass

    return image
def show_polygons(
    image_directory_path: str,
    image_id: int,
    exported_polygons_path: str,
    seed: int = 0,
    output_path: Optional[str] = None,
) -> None:
    """Show the exported contours of a segmented image (i.e., resulting from :func:`.export_dataset` with polygon export format).

    Three panels are drawn side by side: the polygons alone ("Label"), the
    polygons on top of the image ("Both") and the image alone ("Image").

    Args:
        image_directory_path: The image directory path.
        image_id: The image id (this can be found in the exported polygons JSON file).
        exported_polygons_path: The exported polygons path.
        seed: The seed used to generate random colors. Defaults to ``0``.
        output_path: The directory to save the plot to. Defaults to :obj:`None`.
    Returns:
        None
    Raises:
        :exc:`ImportError`: If matplotlib is not installed.
        :exc:`KeyError`: If ``image_id`` is not found in the exported polygons file.
    """
    try:
        from matplotlib import image as mpl_image
        from matplotlib import pyplot as plt
        from matplotlib.patches import Polygon
    except ImportError:
        logger.error("Please install matplotlib first: pip install matplotlib.")
        raise

    # A private generator yields the same colors as seeding the global
    # ``random`` module, without disturbing the caller's random state.
    rng = random.Random(seed)

    def find_image_name(images: List[Dict[str, Any]], wanted_id: int) -> str:
        for entry in images:
            if entry["id"] == wanted_id:
                return cast(str, entry["file_name"])
        raise KeyError("Cannot find the image id. Please provide a valid id.")

    def get_random_color() -> Tuple[float, float, float]:
        return (rng.uniform(0, 1), rng.uniform(0, 1), rng.uniform(0, 1))

    def normalize(color: List[int]) -> Tuple[float, float, float]:
        """Transform a color from 0-255 range to 0-1 range and from a list to a tuple, e.g., [255, 0, 123] to (1, 0, 0.5)."""
        return (color[0] / 255, color[1] / 255, color[2] / 255)

    with open(exported_polygons_path, "r") as f:
        export = json.load(f)

    image_name = find_image_name(export["images"], image_id)
    pixels = mpl_image.imread(f"{image_directory_path}/{image_name}")

    # {category id: (category name, color)}
    # Categories with a missing or empty color get a random one.
    categories = {
        category["id"]: (
            category["name"],
            normalize(category["color"])
            if category.get("color")
            else get_random_color(),
        )
        for category in export["categories"]
    }

    # {category id: polygons}, restricted to the requested image
    polygons_by_category = defaultdict(list)
    for annotation in export["annotations"]:
        if annotation["image_id"] == image_id:
            polygons_by_category[annotation["category_id"]].extend(
                annotation["polygons"]
            )

    # {category name: (polygons, color)}, only categories present in the image
    drawable_categories = {
        category_name: (polygons_by_category[category_id], category_color)
        for category_id, (category_name, category_color) in categories.items()
        if polygons_by_category[category_id]
    }

    fig, (ax1, ax2, ax3) = plt.subplots(
        nrows=1, ncols=3, sharex=True, sharey=True, figsize=(25, 10)
    )

    labeled_category_names = set()
    for category_name, (category_polygons, color) in drawable_categories.items():
        for points in category_polygons:
            polygon = Polygon(
                xy=np.asarray(points).reshape(-1, 2),
                facecolor=color,
                edgecolor=color,
                # Label only the first polygon of a category so that the
                # legend lists every category once.
                label=category_name
                if category_name not in labeled_category_names
                else None,
                closed=True,
                alpha=0.5,
            )
            labeled_category_names.add(category_name)
            # An Artist, container or primitive, cannot be contained in multiple containers, so the second axes gets its own (unlabeled) copy.
            polygon_copy = copy.deepcopy(polygon)
            polygon_copy.set_label(None)
            ax1.add_patch(polygon)
            ax2.add_patch(polygon_copy)

    # Ax 2
    ax2.set_title("Both")
    ax2.imshow(pixels)
    ax2.set_xlabel("Width (pixels)")

    # Ax 1 (uses the aspect ratio of the image in axes 2)
    ax1.set_title("Label")
    # https://stackoverflow.com/a/44655020
    aspect = np.diff(ax1.get_xlim())[0] / np.diff(ax1.get_ylim())[0]
    aspect /= np.diff(ax2.get_xlim())[0] / np.diff(ax2.get_ylim())[0]
    aspect = np.abs(aspect)
    ax1.set_aspect(aspect)
    ax1.set_xlabel("Width (pixels)")
    ax1.set_ylabel("Height (pixels)")

    # Ax 3
    ax3.set_title("Image")
    ax3.imshow(pixels)
    ax3.set_xlabel("Width (pixels)")

    fig.legend()

    if output_path:
        path = os.path.join(
            output_path, f"exported_polygons_from_image_id_{image_id:04d}"
        )
        plt.savefig(path, bbox_inches="tight")

    plt.show()
def cuboid_to_segmentation(
    pointcloud: npt.NDArray[np.float32],
    label_attributes: PointcloudCuboidLabelAttributes,
    ego_pose: Optional[EgoPose] = None,
) -> npt.NDArray[np.uint32]:
    """Convert a cuboid label to an instance segmentation label.

    Args:
        pointcloud: A pointcloud of size Nx3.
        label_attributes: A cuboid label from a single frame interface or one frame from a sequence interface.
        ego_pose: The ego pose (position and heading) used to move the cuboids from the world to the lidar coordinate frame. Defaults to :obj:`None` (no transformation).
    Returns:
        An instance segmentation label of size N mapping each point cloud point to a cuboid instance id (``0`` for points outside every cuboid).
    Raises:
        :exc:`ImportError`: If pyquaternion is not installed (to install run ``pip install pyquaternion``).
        :exc:`ImportError`: If open3d is not installed (to install run ``pip install open3d``).
        :exc:`AssertionError`: If the pointcloud does not have shape (N, 3) or the label has no annotations.
    """
    try:
        from pyquaternion import Quaternion
    except ImportError:
        logger.error("Please install pyquaternion first: pip install pyquaternion")
        raise
    try:
        import open3d as o3d
    except ImportError:
        logger.error("Please install open3d first: pip install open3d")
        raise

    # check dimensions of input
    assert pointcloud.shape[1] == 3, "Pointcloud must have shape (N, 3)"
    assert label_attributes.annotations, "Label must have annotations"

    # World -> lidar transform. It only depends on the ego pose, so it is
    # computed once and applied to every cuboid.
    ego_translation = np.zeros(3)
    if ego_pose and ego_pose.position:
        pos = ego_pose.position
        ego_translation = np.array([pos.x, pos.y, pos.z])
    inverse_ego_rotation = np.eye(3)
    if ego_pose and ego_pose.heading:
        rot = ego_pose.heading
        inverse_ego_rotation = Quaternion(
            x=rot.qx, y=rot.qy, z=rot.qz, w=rot.qw
        ).inverse.transformation_matrix[:3, :3]
    origin = np.zeros(3)

    # create cuboids
    cuboids = {}
    for annotation in label_attributes.annotations:
        center = np.array(
            [annotation.position.x, annotation.position.y, annotation.position.z]
        )
        extent = np.array(
            [annotation.dimensions.x, annotation.dimensions.y, annotation.dimensions.z]
        )
        if annotation.rotation:
            # Open3D expects quaternions in (w, x, y, z) order.
            rotation = o3d.geometry.get_rotation_matrix_from_quaternion(
                np.array(
                    [
                        annotation.rotation.qw,
                        annotation.rotation.qx,
                        annotation.rotation.qy,
                        annotation.rotation.qz,
                    ]
                )
            )
        else:
            rotation = o3d.geometry.get_rotation_matrix_from_xyz((0, 0, annotation.yaw))

        cuboid = o3d.geometry.OrientedBoundingBox(
            center=center, extent=extent, R=rotation
        )

        # transform = translate + rotate (transform is not defined for
        # OrientedBoundingBox). The rotation must be about the frame origin:
        # without an explicit center Open3D rotates the box about its own
        # center, which would leave its position in the world frame.
        cuboid.translate(-ego_translation)
        cuboid.rotate(inverse_ego_rotation, center=origin)

        cuboids[annotation.id] = cuboid

    # map each point to a cuboid instance
    result = np.zeros(pointcloud.shape[0], dtype=np.uint32)
    points = o3d.utility.Vector3dVector(pointcloud)
    for instance_id, cuboid in cuboids.items():
        # get points inside cuboid
        inside = cuboid.get_point_indices_within_bounding_box(points)
        result[inside] = instance_id

    return result
def array_to_pcd(
    positions: npt.NDArray[np.float32],
    output_path: str,
    intensity: Optional[npt.NDArray[np.float32]] = None,
    rgb: Optional[npt.NDArray[np.float32]] = None,
    compressed: bool = False,
    write_ascii: bool = True,
) -> None:
    """Convert a numpy array to a pcd file.

    Args:
        positions: Array of xyz points (Nx3 shape).
        output_path: Path to write the pcd.
        intensity: Optional array of intensity values (Nx1 shape).
        rgb: Optional array of rgb values (Nx3 shape) where red, green and blue are values between 0 and 255 or 0 and 1.
        compressed: If the pcd should be compressed. Defaults to :obj:`False`.
        write_ascii: If the pcd should be written in ascii format. Defaults to :obj:`True`.
    Returns:
        None
    Raises:
        :exc:`ImportError`: If open3d is not installed (to install run ``pip install open3d``).
        :exc:`AssertionError`: If the positions array does not have shape (N, 3).
        :exc:`AssertionError`: If the intensity array does not have shape (N, 1).
        :exc:`AssertionError`: If the rgb array does not have shape (N, 3).
        :exc:`AssertionError`: If the intensity array does not have the same length as the positions array.
        :exc:`AssertionError`: If the rgb array does not have the same length as the positions array.
    """
    try:
        import open3d as o3d
    except ImportError:
        logger.error("Please install open3d first: pip install open3d")
        raise

    # Checking ndim first keeps a 1-D input on the documented AssertionError
    # path instead of failing with an IndexError on shape[1].
    assert (
        positions.ndim == 2 and positions.shape[1] == 3
    ), f"Positions must have shape (N, 3) but has shape {positions.shape}"

    # cast to float32 (astype copies, so the caller's arrays are never modified)
    positions = positions.astype(np.float32)
    intensity = intensity.astype(np.float32) if intensity is not None else None
    rgb = rgb.astype(np.float32) if rgb is not None else None

    device = o3d.core.Device("CPU:0")
    dtype = o3d.core.float32
    pcd = o3d.t.geometry.PointCloud(device)
    pcd.point["positions"] = o3d.core.Tensor(positions, dtype, device)

    if intensity is not None:
        assert len(intensity) == len(
            positions
        ), f"Intensity must have same length as positions but intensity has shape {intensity.shape} and positions has shape {positions.shape}"
        assert (
            intensity.ndim == 2 and intensity.shape[1] == 1
        ), f"Intensity must have shape (N, 1) but has shape {intensity.shape}"
        pcd.point["intensity"] = o3d.core.Tensor(intensity, dtype, device)

    if rgb is not None:
        assert len(rgb) == len(
            positions
        ), f"RGB must have same length as positions but RGB has shape {rgb.shape} and positions has shape {positions.shape}"
        assert (
            rgb.ndim == 2 and rgb.shape[1] == 3
        ), f"RGB must have shape (N, 3) but has shape {rgb.shape}"
        # check rgb encoding (0-255 or 0-1); an empty array has no maximum
        if rgb.size and np.max(rgb) > 1:
            # map 0-255 to 0-1 (open3d expects rgb values between 0 and 1)
            rgb = rgb / 255.0
        pcd.point["colors"] = o3d.core.Tensor(rgb, dtype, device)

    o3d.t.io.write_point_cloud(
        output_path,
        pcd,
        compressed=compressed,
        write_ascii=write_ascii,
        print_progress=True,
    )
def ply_to_pcd(
    ply_file: str, compressed: bool = False, write_ascii: bool = True
) -> None:
    """Convert a .ply file to a .pcd file.

    The .pcd file is written next to the input, with the same name and the
    extension replaced by ``.pcd``. If the file has both colors and intensity,
    only the colors are written.

    Args:
        ply_file: The path to the .ply file.
        compressed: If the pcd should be compressed. Defaults to :obj:`False`.
        write_ascii: If the pcd should be written in ascii format. Defaults to :obj:`True`.
    Returns:
        None
    Raises:
        :exc:`ImportError`: If plyfile is not installed (to install run ``pip install plyfile``).
        :exc:`KeyError`: If the positions are not found in the ply file (expected column names are ``x``, ``y`` and ``z``).
    """
    try:
        from plyfile import PlyData
    except ImportError:
        logger.error("Please install plyfile first: pip install plyfile")
        raise

    with open(ply_file, "rb") as f:
        ply = PlyData.read(f)

    try:
        vertex = ply["vertex"]
    except KeyError as err:
        raise KeyError("Could not find the positions in the ply file.") from err

    def read_columns(*names: str) -> Optional[npt.NDArray[Any]]:
        """Stack the named vertex properties column-wise, or return None if one is missing."""
        try:
            return np.stack([vertex[name] for name in names], axis=-1)
        except (KeyError, ValueError):
            # NOTE(review): a missing property may surface as KeyError or as
            # ValueError (numpy structured-array field lookup) - both are
            # treated as "not present".
            return None

    positions = read_columns("x", "y", "z")
    if positions is None:
        raise KeyError("Could not find the positions in the ply file.")

    # Optional properties, each tried under its long and its short name.
    intensity = read_columns("intensity")
    if intensity is None:
        intensity = read_columns("i")

    rgb = read_columns("r", "g", "b")
    if rgb is None:
        rgb = read_columns("red", "green", "blue")

    # Swap only the extension: str.replace would also rewrite ".ply" inside a
    # directory name and would return the input path itself (overwriting the
    # source file) when the suffix is e.g. ".PLY".
    pcd_path = os.path.splitext(ply_file)[0] + ".pcd"

    # prefer RGB over intensity (tiled point cloud does not support both)
    intensity = intensity if rgb is None else None

    array_to_pcd(
        positions,
        pcd_path,
        intensity=intensity,
        rgb=rgb,
        compressed=compressed,
        write_ascii=write_ascii,
    )
def sample_pcd(
    pcd_path: str, points: int = 500_000, output_path: Optional[str] = None
) -> None:
    """Sample a point cloud to a given number of points.

    The cloud is down-sampled uniformly by keeping every k-th point, so the
    result has approximately (not exactly) ``points`` points. A cloud that
    already has fewer points than requested is written out unchanged.

    Args:
        pcd_path: The path to the point cloud.
        points: The number of points to sample. Defaults to ``500_000``.
        output_path: The path to save the sampled point cloud to. Defaults to :obj:`None`, in which case ``_sampled.pcd`` is appended to ``pcd_path`` without its extension.
    Returns:
        None
    Raises:
        :exc:`ImportError`: If open3d is not installed (to install run ``pip install open3d``).
        :exc:`ValueError`: If ``points`` is smaller than 1.
    """
    try:
        import open3d as o3d
    except ImportError:
        logger.error("Please install open3d first: pip install open3d")
        raise

    if points < 1:
        raise ValueError(f"points must be at least 1 but is {points}.")

    if output_path is None:
        # Derive the default from the input path; never reuse the input path
        # itself, so the source file cannot be overwritten.
        output_path = os.path.splitext(pcd_path)[0] + "_sampled.pcd"

    pcd = o3d.io.read_point_cloud(pcd_path)

    # open3d expects a step size (not a number of points); the step must be at
    # least 1, also when the cloud has fewer points than requested.
    points_step_size = max(1, len(pcd.points) // points)
    pcd = pcd.uniform_down_sample(points_step_size)

    o3d.io.write_point_cloud(
        output_path, pcd, write_ascii=False, compressed=True, print_progress=True
    )