Working with large catalogs#

Large astronomical surveys contain a massive volume of data. Billion object, multi-terabyte sized catalogs are challenging to store and manipulate because they demand state-of-the-art hardware. Processing them is expensive, both in terms of runtime and memory consumption, and performing it in a single machine has become impractical. LSDB is a solution that enables scalable algorithm execution. It handles loading, querying, filtering and crossmatching astronomical data (of HiPSCat format) in a distributed environment.

In this tutorial, we will demonstrate how to:

  1. Setup a Dask client for distributed processing

  2. Load an object catalog with a set of desired columns

  3. Select data from regions of the sky

  4. Preview catalog data

[1]:
import lsdb

Installing dependencies#

To load catalogs from the web using the HTTP protocol we’ll need to install aiohttp.

[2]:
!pip install aiohttp --quiet

Creating a Dask client#

Dask is a framework that allows us to take advantage of distributed computing capabilities.

With Dask, the operations defined in a workflow (e.g. this notebook) are added to a task graph which optimizes their order of execution. The operations are not immediately computed, that’s for us to decide. As soon as we trigger the computations, Dask distributes the workload across its multiple workers and tasks are run efficiently in parallel. Usually the later we kick off the computations the better.

Dask creates a client by default, if we do not instantiate one. If we do, we may, among others:

  • Specify the number of workers and the memory limit for each of them.

  • Adjust the address for the dashboard to profile the operations while they run (by default it serves on port 8787).

For additional information please refer to https://distributed.dask.org/en/latest/client.html.

[3]:
from dask.distributed import Client

client = Client(n_workers=4, memory_limit="auto")
client
[3]:

Client

Client-d9627a98-0e36-11ef-8575-0242ac110002

Connection method: Cluster object Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

Cluster Info

Loading a catalog#

We will load a small 5 degree radius cone from the ZTF DR14 object catalog.

Catalogs represent tabular data and are internally subdivided into partitions based on their positions in the sky. When processing a catalog, each worker is expected to be able to load a single partition at a time into memory for processing. Therefore, when loading a catalog, it’s crucial to specify the subset of columns we need for our science pipeline. Failure to specify these columns results in loading the entire partition table, which not only increases the usage of worker memory but also impacts runtime performance significantly.

[4]:
surveys_path = "https://epyc.astro.washington.edu/~lincc-frameworks/other_degree_surveys"
[5]:
ztf_object_path = f"{surveys_path}/ztf/ztf_object"
ztf_object = lsdb.read_hipscat(ztf_object_path, columns=["ps1_objid", "ra", "dec"])
ztf_object
[5]:
lsdb Catalog ztf_object_fivedegree:
ps1_objid ra dec
npartitions=43
3613012801057980416 int64[pyarrow] double[pyarrow] double[pyarrow]
3614138700964823040 ... ... ...
... ... ... ...
4053239664633446400 ... ... ...
18446744073709551615 ... ... ...

The catalog has been loaded lazily, we can see its metadata but no actual data is there yet. We will be defining more operations in this notebook. Only when we call compute() on the resulting catalog are operations executed, i.e. data is loaded from disk into memory for processing.

Selecting a region of the sky#

We may use 3 types of spatial filters (cone, polygon and box) to select a portion of the sky.

Filtering consists of two main steps:

  • A coarse stage, in which we find what pixels cover our desired region in the sky. These may overlap with the region and only be partially contained within the region boundaries. This means that some data points inside that pixel may fall outside of the region.

  • A fine stage, where we filter the data points from each pixel to make sure they fall within the specified region.

The fine parameter allows us to specify whether or not we desire to run the fine stage, for each search. It brings some overhead, so if your intention is to get a rough estimate of the data points for a region, you may disable it. It is always executed by default.

catalog.box(..., fine=False)
catalog.cone_search(..., fine=False)
catalog.polygon_search(..., fine=False)

Throughout this notebook we will use the Catalog’s plot_pixels method to display the HEALPix of each resulting catalog as filters are applied.

[6]:
ztf_object.plot_pixels("ZTF_DR14 - pixel map")
../_images/tutorials_working_with_large_catalogs_12_0.png

Previewing part of the data#

Computing an entire catalog requires loading all of its resulting data into memory, which is expensive and may lead to out-of-memory issues.

Often our goal is to have a peek at a slice of data to make sure the workflow output is reasonable (e.g. to assess if some new created columns are present and their values have been properly processed). head() is a pandas-like method which allows us to preview part of the data for this purpose. It iterates over the existing catalog partitions, in sequence, and finds up to n number of rows.

Notice that this method implicitly calls compute().

[17]:
ztf_object_cone.head()
[17]:
ps1_objid ra dec
_hipscat_index
3646116619639324672 131432999494754032 299.949481 19.527901
3646116620503351296 131432999482554346 299.948273 19.52819
3646116620964724736 131432999472704717 299.94726 19.528454
3646116621937803264 131432999480965958 299.948077 19.529494
3646116750698741760 131442999840264659 299.984096 19.536824

By default the first 5 rows of data will be shown but we can specify a higher number if we need.

[18]:
ztf_object_cone.head(n=10)
[18]:
ps1_objid ra dec
_hipscat_index
3646116619639324672 131432999494754032 299.949481 19.527901
3646116620503351296 131432999482554346 299.948273 19.52819
3646116620964724736 131432999472704717 299.94726 19.528454
3646116621937803264 131432999480965958 299.948077 19.529494
3646116750698741760 131442999840264659 299.984096 19.536824
3646116752913334272 131442999792923350 299.979209 19.535608
3646116753051746304 131442999803743832 299.980368 19.536026
3646116753819303936 131442999770212613 299.977055 19.535172
3646116754284871680 131442999775374201 299.977557 19.536359
3646116755371196416 131442999790335818 299.97904 19.537692

Closing the Dask client#

[19]:
client.close()