Parallel 3D Point Cloud Data analysis with Dask

There are many python packages that are tightly integrated with Dask which enables parallel data processing. For instance, consider xarray package. This package is used to read datasets in netcdf, hdf5, zarr file formats. Dask comes in play when data is read from multile files where each file is treated as a chunk or by explictly specifying chunks parameter to describe how the data needs to be chunked. laspy python package is used for reading 3D point cloud data but unfortunately it lacks integration with Dask but still it is possible to use it when data is read from multiple files. Although this techique of loading data from multiple files is quite well known, having the chunks parameter feature adds a great deal of flexibility. In this article, a demonstration of such a feature is presented.

A more permanent version of this article with updates and fixes can be found in our Gitlab repository.

3D Point Cloud format – Las/Laz

A brief overview of the las format. The 3D point cloud data is stored as a las file. As shown in the figure below the las file consists of header, variable length records, point cloud records and extended variable length records. Depending on the version of the las format specification, few details may vary in fields stored in point cloud records and may not have extended variable length records.

https://pad.gwdg.de/uploads/9738de9a-6347-417d-9468-894bd2f1a859.png

The header part has the metadata about the file contents like format version, number of point cloud records, size of each record in bytes, offset byte where the point cloud record starts, data-type of fields in the record, minimum and maximum values of the spatial co-ordinates, scaling factor, etc.,

laspy package provides convenience functions not just to read point cloud records but also allows to investigate metadata. Perhaps one interesting thing to figure out from metadata is memory requirements to load a las file. Here is a small utility function called info to provide this information.

import humanize
import laspy
import os

def info(filename):
    naturalsize = lambda x: humanize.naturalsize(x, binary=True)
    with open(filename, "rb") as fid:
        reader = laspy.LasReader(fid)
        npoints = reader.header.point_count
        point_size = reader.header.point_format.size
        memory_size = npoints * point_size
    print(f"Filesize on disk: {naturalsize(os.path.getsize(filename))}")
    print(f"Is data compressed: {reader.header.are_points_compressed}")
    print(f"records (points): {npoints} ({humanize.intword(npoints)})")
    print(f"Each record size: {point_size} ({naturalsize(point_size)})")
    print(f"Memory required: {memory_size} ({naturalsize(memory_size)})")

Using the info function to determine the memory requirements of a las file called densel-20230906-103153.laz

filename = "./dense1-20230906-103153.laz"
info(filename)
Filesize on disk: 2.1 GiB
Is data compressed: True
records (points): 148919876 (148.9 million)
Each record size: 54 (54 Bytes)
Memory required: 8041673304 (7.5 GiB)


Although this file occupies just 2.1 GiB in disk, the uncompressed data in memory requires at least 4 times more space in memory (7.5 GiB). This function is quite useful in planning compute node resource requirements. "Each record size" indicates the number of bytes required to store the compressed data on disk.

As mentioned in the introduction, the idea is not to physically split the file into multiple smaller files but logically divide the single file into multiple chunks so Dask can work with these chunks. The immediate question that arises is how to chunk the data. The criteria can be any of the following:

  • by number of points per chunk (example: 2 million per chunk)
  • by size of each chunk (example: 100MB per chunk)
  • dividing data into n number of partitions (example: 100 partitions. i.e., 100 chunks)

 

Extending the info function to incorporate these features

import re
from dask.utils import byte_sizes


def convert_humansize_to_bytes(value: str):
    match = re.search('([^\.eE\d]+)', str(value))
    nbytes = 1
    if match:
        start = match.start()
        text = match.group().strip().lower()
        nbytes = byte_sizes[text]
        value = value[:start]
    value = int(float(value)) * nbytes
    return value


def info(
        filename,
        partition_by_size: str = None,
        partition_by_points: int = None,
        partition_by_partitions: int = None,
        echo: bool = True,
        return_batch_points: bool = False
):

    naturalsize = lambda x: humanize.naturalsize(x, binary=True)
    with open(filename, "rb") as fid:
        reader = laspy.LasReader(fid)
        npoints = reader.header.point_count
        point_size = reader.header.point_format.size
        memory_size = npoints * point_size
        if echo:
            print(f"Filesize on disk: {naturalsize(os.path.getsize(filename))}")
            print(f"Is data compressed: {reader.header.are_points_compressed}")
            print(f"records (points): {npoints} ({humanize.intword(npoints)})")
            print(f"Each record size: {point_size} ({naturalsize(point_size)})")
            print(f"Memory required: {memory_size} ({naturalsize(memory_size)})")
    if partition_by_partitions:
        batch_size = round(memory_size/partition_by_partitions/10)*10
        batch_points = round(batch_size/point_size/10)*10
        batch_size = batch_points * point_size
    elif partition_by_points:
        batch_points = partition_by_points
        batch_size = batch_points * point_size
    elif partition_by_size:
        batch_size = convert_humansize_to_bytes(partition_by_size)
        batch_points = round(batch_size/point_size/10)*10
        batch_size = batch_points * point_size
    else:
        batch_size = memory_size
        batch_points = npoints

    nbatches = len(range(0, npoints, batch_points))
    if any([partition_by_partitions, partition_by_points, partition_by_size]) and echo:
        print("---")
        print(f"  chunk_size = {batch_size} ({naturalsize(batch_size)})")
        print(f"  points_per_chunk = {batch_points} ({humanize.intword(batch_points)})")
        print(f"  nchunks = {nbatches}")
        print("---")
    if return_batch_points:
        return batch_points


Trying out the info function to see in action to see how these various options look like

chunk by size

chunking the data with each chunk is approximately 100 MB

>>> filename = "./dense1-20230906-103153.laz"
>>> info(filename, partition_by_size="100MB")
Filesize on disk: 2.1 GiB
Is data compressed: True
records (points): 148919876 (148.9 million)
Each record size: 54 (54 Bytes)
Memory required: 8041673304 (7.5 GiB)
---
  chunk_size = 99999900 (95.4 MiB)
  points_per_chunk = 1851850 (1.9 million)
  nchunks = 81
---

 

As you may notice the chunk size is little less than 100 MB. This is because 1MB translates to 1000 bytes. If 1024 bytes per MB is required then use MiB notation.

>>> info(filename, partition_by_size="100MiB")
Filesize on disk: 2.1 GiB
Is data compressed: True
records (points): 148919876 (148.9 million)
Each record size: 54 (54 Bytes)
Memory required: 8041673304 (7.5 GiB)
---
  chunk_size = 104857740 (100.0 MiB)
  points_per_chunk = 1941810 (1.9 million)
  nchunks = 77
---

 

chunk by points

>>> info(filename, partition_by_points=10_00_000)
Filesize on disk: 2.1 GiB
Is data compressed: True
records (points): 148919876 (148.9 million)
Each record size: 54 (54 Bytes)
Memory required: 8041673304 (7.5 GiB)
---
  chunk_size = 54000000 (51.5 MiB)
  points_per_chunk = 1000000 (1.0 million)
  nchunks = 149
---

 

chunk by partitions

>>> info(filename, partition_by_partitions=100)
Filesize on disk: 2.1 GiB
Is data compressed: True
records (points): 148919876 (148.9 million)
Each record size: 54 (54 Bytes)
Memory required: 8041673304 (7.5 GiB)
---
  chunk_size = 80416800 (76.7 MiB)
  points_per_chunk = 1489200 (1.5 million)
  nchunks = 100
---

 

The info function shows how the logical division of file into multiple chunks looks like, the next step is to create a function that actually divides file into chunks.

Dask

import dask
import dask.dataframe as dd
import pandas as pd
import numpy as np


@dask.delayed
def _dask_offset_reader(filename, offset, npoints, scaled=False, func=lambda x:x):
    with open(filename, "rb") as fid:
        reader = laspy.LasReader(fid)
        reader.seek(offset)
        pts = reader.read_points(npoints)
        if scaled:
            xyz_scaled = {'x': pts.x.scaled_array(),
                          'y': pts.y.scaled_array(),
                          'z': pts.z.scaled_array()}
        data = pts.array
    d = pd.DataFrame(data)
    d.index = np.arange(offset, offset+npoints)
    if scaled:
        d['x'] = xyz_scaled['x']
        d['y'] = xyz_scaled['y']
        d['z'] = xyz_scaled['z']
    d = func(d)
    return d


def dask_reader(
        filename,
        partition_by_size=None,
        partition_by_points=None,
        partition_by_partitions=None,
        scaled=False,
        func=lambda x:x,
):
    batch_points = info(
        filename,
        partition_by_size,
        partition_by_points,
        partition_by_partitions,
        echo=False,
        return_batch_points=True)

    with open(filename, "rb") as fid:
        reader = laspy.LasReader(fid)
        dtype = reader.header.point_format.dtype()
        #meta = pd.DataFrame(np.empty(0, dtype=dtype))
        npoints = reader.header.point_count
        pairs = [(batch_points, offset)
                 for offset in range(0, npoints, batch_points)]
        batch_pts, offset = pairs.pop(-1)
        batch_pts = npoints - offset
        pairs.append((batch_pts, offset))

    lazy_load = []
    for points, offset in pairs:
        lazy_func = _dask_offset_reader(filename, offset, points, scaled=scaled, func=func)
        lazy_load.append(lazy_func)
    meta_dtype = _dask_offset_reader(filename, 1, 1, scaled=scaled, func=func).compute().to_records().dtype
    meta = pd.DataFrame(np.empty(0, dtype=meta_dtype))
    return dd.from_delayed(lazy_load, meta=meta)

 

dask_reader function divides the file into chunks and lazily loads them on demand. Here is an workflow on how to setup dask to load this data

Setting up dask cluster and client

>>> from dask.distributed import Client, LocalCluster
>>> cluster = LocalCluster(n_workers=12, threads_per_worker=2)
>>> client = Client(cluster)
>>> print(client)
<Client: 'tcp://127.0.0.1:33430' processes=12 threads=24, memory=376.31 GiB>

 

Reading data

>>> df = dask_reader(filename, partition_by_size='200MiB')
>>> print(df.head())
            X       Y       Z  intensity  bit_fields  classification_flags  \
    0  237355  566872  368473         13          17                     0   
    1  258448  570271  368929          0          33                     0   
    2  236036  565853  365470         19          34                     0   
    3  270386  571869  368003         44          17                     0   
    4  267408  570521  364769         17          33                     0   
    
       classification  user_data  scan_angle  point_source_id    gps_time  \
    0               0          0           0                0  290384.293   
    1               0          0           0                0  290384.293   
    2               0          0           0                0  290384.293   
    3               0          0           0                0  290384.293   
    4               0          0           0                0  290384.293   
    
       echo_width  fullwaveIndex  hitObjectId  heliosAmplitude  
    0         0.0        2829327        21450       201.914284  
    1         0.0        2829328        21450         7.727883  
    2         0.0        2829328        21450       292.342904  
    3         0.0        2829329        21450       684.970443  
    4         0.0        2829330        21450       271.545107  

 

So, the question is, is there any benefit of using dask in terms of speed? Timing the operations: dask vs non-dask

%%time
hitobjectids = df['hitObjectId'].unique().compute()

 

CPU times: user 642 ms, sys: 324 ms, total: 966 ms
Wall time: 8.52 s

 

%%time
data = laspy.read(filename)
h = np.unique(data['hitObjectId'])

 

CPU times: user 2min 10s, sys: 4.06 s, total: 2min 14s
Wall time: 13.6 s

 

Well, with Dask performance is little bit better but not significant. There are 2 things happening here, loading the data from disk (I/O operation) and some analysis operation following that. Rather than measuring the timing for these operations combined, consider measuring the timing for analysis part alone.

Loading data into memory with Dask

%%time
df = df.persist()

 

Well, with Dask performance is little bit better but not significant. There are 2 things happening here, loading the data from disk (I/O operation) and some analysis operation following that. Rather than measuring the timing for these operations combined, consider measuring the timing for analysis part alone.

Loading data into memory with Dask

%%time
df = df.persist()

 

CPU times: user 12.5 ms, sys: 7.97 ms, total: 20.5 ms
Wall time: 19.1 ms

 

Since persist is non-blocking operation, the data loading part happens in the background. It takes couple of seconds for the dask workers to load the data into memory.

After waiting couple of seconds, re-running the analysis part.

%%time
hitobjectids = df['hitObjectId'].unique().compute()

 

CPU times: user 47.1 ms, sys: 27.8 ms, total: 74.9 ms
Wall time: 105 ms

 

re-running the analysis part without dask

%%time
h = np.unique(data['hitObjectId'])

 

CPU times: user 4.6 s, sys: 677 ms, total: 5.28 s
Wall time: 4.87 s

Now this is a significant difference in performance. Dask work-flow totally out performs the usual work-flow.

Discussion

In this particular example the source file requires approximately 9GB of memory to load the whole dataset which comfortabily fits into a single node memory. For larger data-sets which do not fit in a single node, dask_jobqueue package enables to do distributed computing. In this case, only the cluster setup part differs and the rest of work-flow remains the same.

The objective of this article is to showcase Dask work-flow with 3D Point Cloud data using chunks feature. As demonstated, with few helper functions, 3D point cloud data can direclty read with dask. Even with the very limited analysis part shown here, it is quite evident to see performance benifits using dask.

dask_reader accepts optional scaled parameter which scales x, y, z values. The optional func parameter is intended to manipulate pandas dataframe. For instance, to return only selected columns. Provide a custom function that receives dataframe as input and the return value must be a dataframe.

Author

Pavan Siligam

Kategorien

Archive

--