Importing catalogs to HiPSCat format

This notebook presents two ways of importing catalogs into HiPSCat format. The first uses the `lsdb.from_dataframe()` method, which is convenient for loading smaller catalogs from a single dataframe; the second uses the hipscat-import pipeline.

[1]:
import lsdb
import os
import pandas as pd
import tempfile
[2]:
# Name of the test catalog; it is used to build both the input paths and the output catalog names
catalog_name = "small_sky_order1"
[3]:
# Input paths: the catalog CSV is read from <test_data_dir>/<catalog_name>/<catalog_name>.csv.
# The test data directory is a relative path, so it is resolved against the
# notebook's working directory.
test_data_dir = os.path.join("../../tests", "data")
catalog_dir = os.path.join(test_data_dir, catalog_name)
catalog_csv_path = os.path.join(catalog_dir, f"{catalog_name}.csv")
[4]:
# Temporary directory handed to the import pipeline; removed at the end of the notebook
tmp_path = tempfile.TemporaryDirectory()

# Names of the two output catalogs, one per import method
catalog_from_importer = f"{catalog_name}-from_importer"
catalog_from_dataframe = f"{catalog_name}-from_dataframe"

Using lsdb.from_dataframe()

[5]:
%%time

# Read simple catalog from its CSV file
catalog = lsdb.from_dataframe(
    pd.read_csv(catalog_csv_path),
    catalog_name=catalog_from_dataframe,
    catalog_type="object",
    highest_order=5,
    threshold=100,
)

# Save it to disk in HiPSCat format
catalog.to_hipscat(catalog_from_dataframe)
CPU times: user 748 ms, sys: 24.4 ms, total: 773 ms
Wall time: 765 ms

Using the import pipeline

[6]:
# Install hipscat-import
!pip install hipscat-import --quiet
[7]:
from dask.distributed import Client
from hipscat_import.catalog.arguments import ImportArguments
from hipscat_import.pipeline import pipeline_with_client
[8]:
# Create the importer's output directory (named after the output catalog) if it does not yet exist
os.makedirs(catalog_from_importer, exist_ok=True)
[9]:
# Configure the import pipeline with the same partitioning settings that were
# passed to `lsdb.from_dataframe()` (highest order 5, threshold 100), so that
# the two resulting catalogs can be compared.
#
# NOTE: `ImportArguments` does not accept an `overwrite` keyword (passing it
# raises `TypeError: ... unexpected keyword argument 'overwrite'`), so it is
# intentionally not passed here.
args = ImportArguments(
    sort_columns="id",
    ra_column="ra",
    dec_column="dec",
    highest_healpix_order=5,
    pixel_threshold=100,
    file_reader="csv",
    # Reuse the CSV path built in the "Input paths" cell instead of
    # re-deriving it with a hard-coded file name.
    input_file_list=[catalog_csv_path],
    output_artifact_name=catalog_from_importer,
    output_path=".",
    dask_tmp=tmp_path.name,
)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[9], line 1
----> 1 args = ImportArguments(
      2     sort_columns="id",
      3     ra_column="ra",
      4     dec_column="dec",
      5     highest_healpix_order=5,
      6     pixel_threshold=100,
      7     file_reader="csv",
      8     input_file_list=[os.path.join(catalog_dir, "small_sky_order1.csv")],
      9     output_artifact_name=catalog_from_importer,
     10     output_path=".",
     11     dask_tmp=tmp_path.name,
     12     overwrite=True,
     13 )

TypeError: ImportArguments.__init__() got an unexpected keyword argument 'overwrite'
[10]:
%%time
# Run the import pipeline on a Dask client created just for this cell
# (one worker, `processes=False`), pointing Dask's local directory at the
# temporary directory configured in `args.dask_tmp`.
with Client(local_directory=args.dask_tmp, n_workers=1, processes=False) as client:
    pipeline_with_client(args, client)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
File <timed exec>:1

NameError: name 'args' is not defined

Load both catalogs and check that they are equivalent

[11]:
# Load the catalog that was saved with `to_hipscat()` back from disk and display it
from_dataframe_catalog = lsdb.read_hipscat(catalog_from_dataframe)
from_dataframe_catalog
[11]:
lsdb Catalog small_sky_order1-from_dataframe:
id ra dec ra_error dec_error Norder Dir Npix
npartitions=4
12682136550675316736 int64[pyarrow] double[pyarrow] double[pyarrow] int64[pyarrow] int64[pyarrow] uint8[pyarrow] uint64[pyarrow] uint64[pyarrow]
12970366926827028480 ... ... ... ... ... ... ... ...
13258597302978740224 ... ... ... ... ... ... ... ...
13546827679130451968 ... ... ... ... ... ... ... ...
18446744073709551615 ... ... ... ... ... ... ... ...
[12]:
# Load the catalog written by the import pipeline (output path "." plus the
# artifact name, i.e. the `catalog_from_importer` directory) and display it
from_importer_catalog = lsdb.read_hipscat(catalog_from_importer)
from_importer_catalog
---------------------------------------------------------------------------
FileNotFoundError                         Traceback (most recent call last)
Cell In[12], line 1
----> 1 from_importer_catalog = lsdb.read_hipscat(catalog_from_importer)
      2 from_importer_catalog

File ~/checkouts/readthedocs.org/user_builds/lsdb/envs/latest/lib/python3.10/site-packages/lsdb/loaders/hipscat/read_hipscat.py:75, in read_hipscat(path, catalog_type, search_filter, columns, margin_cache, dtype_backend, storage_options, **kwargs)
     72 config_args = {field.name: kwd_args[field.name] for field in dataclasses.fields(HipscatLoadingConfig)}
     73 config = HipscatLoadingConfig(**config_args)
---> 75 catalog_type_to_use = _get_dataset_class_from_catalog_info(path, storage_options=storage_options)
     77 if catalog_type is not None:
     78     catalog_type_to_use = catalog_type

File ~/checkouts/readthedocs.org/user_builds/lsdb/envs/latest/lib/python3.10/site-packages/lsdb/loaders/hipscat/read_hipscat.py:89, in _get_dataset_class_from_catalog_info(base_catalog_path, storage_options)
     87 base_catalog_dir = hc.io.get_file_pointer_from_path(base_catalog_path)
     88 catalog_info_path = hc.io.paths.get_catalog_info_pointer(base_catalog_dir)
---> 89 catalog_info = BaseCatalogInfo.read_from_metadata_file(catalog_info_path, storage_options=storage_options)
     90 catalog_type = catalog_info.catalog_type
     91 if catalog_type not in dataset_class_for_catalog_type:

File ~/checkouts/readthedocs.org/user_builds/lsdb/envs/latest/lib/python3.10/site-packages/hipscat/catalog/dataset/base_catalog_info.py:60, in BaseCatalogInfo.read_from_metadata_file(cls, catalog_info_file, storage_options)
     47 @classmethod
     48 def read_from_metadata_file(
     49     cls, catalog_info_file: FilePointer, storage_options: Union[Dict[Any, Any], None] = None
     50 ) -> Self:
     51     """Read catalog info from the `catalog_info.json` metadata file
     52
     53     Args:
   (...)
     58         A CatalogInfo object with the data from the `catalog_info.json` file
     59     """
---> 60     metadata_keywords = file_io.load_json_file(catalog_info_file, storage_options=storage_options)
     61     catalog_info_keywords = {}
     62     for field in dataclasses.fields(cls):

File ~/checkouts/readthedocs.org/user_builds/lsdb/envs/latest/lib/python3.10/site-packages/hipscat/io/file_io/file_io.py:114, in load_json_file(file_pointer, encoding, storage_options)
    112 json_dict = None
    113 file_system, file_pointer = get_fs(file_pointer, storage_options)
--> 114 with file_system.open(file_pointer, "r", encoding=encoding) as json_file:
    115     json_dict = json.load(json_file)
    117 return json_dict

File ~/checkouts/readthedocs.org/user_builds/lsdb/envs/latest/lib/python3.10/site-packages/fsspec/spec.py:1281, in AbstractFileSystem.open(self, path, mode, block_size, cache_options, compression, **kwargs)
   1273     mode = mode.replace("t", "") + "b"
   1275     text_kwargs = {
   1276         k: kwargs.pop(k)
   1277         for k in ["encoding", "errors", "newline"]
   1278         if k in kwargs
   1279     }
   1280     return io.TextIOWrapper(
-> 1281         self.open(
   1282             path,
   1283             mode,
   1284             block_size=block_size,
   1285             cache_options=cache_options,
   1286             compression=compression,
   1287             **kwargs,
   1288         ),
   1289         **text_kwargs,
   1290     )
   1291 else:
   1292     ac = kwargs.pop("autocommit", not self._intrans)

File ~/checkouts/readthedocs.org/user_builds/lsdb/envs/latest/lib/python3.10/site-packages/fsspec/spec.py:1293, in AbstractFileSystem.open(self, path, mode, block_size, cache_options, compression, **kwargs)
   1291 else:
   1292     ac = kwargs.pop("autocommit", not self._intrans)
-> 1293     f = self._open(
   1294         path,
   1295         mode=mode,
   1296         block_size=block_size,
   1297         autocommit=ac,
   1298         cache_options=cache_options,
   1299         **kwargs,
   1300     )
   1301     if compression is not None:
   1302         from fsspec.compression import compr

File ~/checkouts/readthedocs.org/user_builds/lsdb/envs/latest/lib/python3.10/site-packages/fsspec/implementations/local.py:197, in _open(self, path, mode, block_size, **kwargs)
    194     if truncate:
    195         os.truncate(path, 0)
--> 197 def created(self, path):
    198     info = self.info(path=path)
    199     return datetime.datetime.fromtimestamp(
    200         info["created"], tz=datetime.timezone.utc
    201     )

File ~/checkouts/readthedocs.org/user_builds/lsdb/envs/latest/lib/python3.10/site-packages/fsspec/implementations/local.py:322, in __init__(self, path, mode, autocommit, fs, compression, **kwargs)
    320     self.f = open(name, mode=self.mode)
    321 if "w" not in self.mode:
--> 322     self.size = self.f.seek(0, 2)
    323     self.f.seek(0)
    324     self.f.size = self.size

File ~/checkouts/readthedocs.org/user_builds/lsdb/envs/latest/lib/python3.10/site-packages/fsspec/implementations/local.py:327, in _open(self)
    323             self.f.seek(0)
    324             self.f.size = self.size
    326 def _fetch_range(self, start, end):
--> 327     # probably only used by cached FS
    328     if "r" not in self.mode:
    329         raise ValueError

FileNotFoundError: [Errno 2] No such file or directory: '/home/docs/checkouts/readthedocs.org/user_builds/lsdb/checkouts/latest/docs/tutorials/small_sky_order1-from_importer/catalog_info.json'
[13]:
# Verify that both catalogs report exactly the same HEALPix pixels
assert from_dataframe_catalog.get_healpix_pixels() == from_importer_catalog.get_healpix_pixels()
# Verify that the resulting dataframes contain the same data: both are computed
# and sorted by index before comparison, and dtypes are not compared
# (`check_dtype=False`)
pd.testing.assert_frame_equal(
    from_dataframe_catalog.compute().sort_index(),
    from_importer_catalog.compute().sort_index(),
    check_dtype=False,
)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[13], line 2
      1 # Verify that pixels are similar
----> 2 assert from_dataframe_catalog.get_healpix_pixels() == from_importer_catalog.get_healpix_pixels()
      3 # Verify that resulting dataframes contain the same data
      4 pd.testing.assert_frame_equal(
      5     from_dataframe_catalog.compute().sort_index(),
      6     from_importer_catalog.compute().sort_index(),
      7     check_dtype=False,
      8 )

NameError: name 'from_importer_catalog' is not defined
[14]:
# Remove the temporary directory that was passed to the importer as `dask_tmp`
tmp_path.cleanup()