Importing catalogs to HiPSCat format#
This notebook presents two ways of importing catalogs to HiPSCat format. The first uses the lsdb.from_dataframe() method, which is helpful for loading smaller catalogs from a single dataframe, while the second uses the hipscat-import pipeline.
[1]:
import lsdb
import os
import pandas as pd
import tempfile
[2]:
# Name of the sample catalog used throughout this notebook
catalog_name = "small_sky_order1"
[3]:
# Input paths: test-data root, the catalog's directory, and its CSV file
test_data_dir = os.path.join("../../tests", "data")
catalog_dir = os.path.join(test_data_dir, catalog_name)
csv_file_name = catalog_name + ".csv"
catalog_csv_path = os.path.join(catalog_dir, csv_file_name)
[4]:
# Output paths: one artifact per import route, plus a scratch dir for Dask
catalog_from_dataframe = catalog_name + "-from_dataframe"
catalog_from_importer = catalog_name + "-from_importer"
tmp_path = tempfile.TemporaryDirectory()
Using lsdb.from_dataframe()#
[5]:
%%time
# Read simple catalog from its CSV file
catalog = lsdb.from_dataframe(
pd.read_csv(catalog_csv_path),
catalog_name=catalog_from_dataframe,
catalog_type="object",
highest_order=5,
threshold=100,
)
# Save it to disk in HiPSCat format
catalog.to_hipscat(catalog_from_dataframe)
CPU times: user 748 ms, sys: 24.4 ms, total: 773 ms
Wall time: 765 ms
Using the import pipeline#
[6]:
# Install hipscat-import
!pip install hipscat-import --quiet
[7]:
from dask.distributed import Client
from hipscat_import.catalog.arguments import ImportArguments
from hipscat_import.pipeline import pipeline_with_client
[8]:
# Create the output directory for the import pipeline if it does not yet exist
os.makedirs(catalog_from_importer, exist_ok=True)
[9]:
# Configure the import pipeline.
# NOTE: `overwrite` is not an accepted keyword of ImportArguments (passing it
# raises a TypeError, which previously broke every cell below); it has been
# removed. Re-running over existing output is governed by the pipeline's
# resume behavior instead — see the hipscat-import documentation.
args = ImportArguments(
    sort_columns="id",
    ra_column="ra",
    dec_column="dec",
    highest_healpix_order=5,
    pixel_threshold=100,
    file_reader="csv",
    input_file_list=[catalog_csv_path],  # reuse the path computed earlier
    output_artifact_name=catalog_from_importer,
    output_path=".",
    dask_tmp=tmp_path.name,
)
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
Cell In[9], line 1
----> 1 args = ImportArguments(
2 sort_columns="id",
3 ra_column="ra",
4 dec_column="dec",
5 highest_healpix_order=5,
6 pixel_threshold=100,
7 file_reader="csv",
8 input_file_list=[os.path.join(catalog_dir, "small_sky_order1.csv")],
9 output_artifact_name=catalog_from_importer,
10 output_path=".",
11 dask_tmp=tmp_path.name,
12 overwrite=True,
13 )
TypeError: ImportArguments.__init__() got an unexpected keyword argument 'overwrite'
[10]:
%%time
with Client(local_directory=args.dask_tmp, n_workers=1, processes=False) as client:
pipeline_with_client(args, client)
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
File <timed exec>:1
NameError: name 'args' is not defined
Load both catalogs and check that they are equivalent#
[11]:
# Read back the catalog produced via lsdb.from_dataframe()
from_dataframe_catalog = lsdb.read_hipscat(catalog_from_dataframe)
from_dataframe_catalog
[11]:
lsdb Catalog small_sky_order1-from_dataframe:
id | ra | dec | ra_error | dec_error | Norder | Dir | Npix | |
---|---|---|---|---|---|---|---|---|
npartitions=4 | ||||||||
12682136550675316736 | int64[pyarrow] | double[pyarrow] | double[pyarrow] | int64[pyarrow] | int64[pyarrow] | uint8[pyarrow] | uint64[pyarrow] | uint64[pyarrow] |
12970366926827028480 | ... | ... | ... | ... | ... | ... | ... | ... |
13258597302978740224 | ... | ... | ... | ... | ... | ... | ... | ... |
13546827679130451968 | ... | ... | ... | ... | ... | ... | ... | ... |
18446744073709551615 | ... | ... | ... | ... | ... | ... | ... | ... |
[12]:
# Read back the catalog produced by the hipscat-import pipeline
from_importer_catalog = lsdb.read_hipscat(catalog_from_importer)
from_importer_catalog
---------------------------------------------------------------------------
FileNotFoundError Traceback (most recent call last)
Cell In[12], line 1
----> 1 from_importer_catalog = lsdb.read_hipscat(catalog_from_importer)
2 from_importer_catalog
File ~/checkouts/readthedocs.org/user_builds/lsdb/envs/latest/lib/python3.10/site-packages/lsdb/loaders/hipscat/read_hipscat.py:75, in read_hipscat(path, catalog_type, search_filter, columns, margin_cache, dtype_backend, storage_options, **kwargs)
72 config_args = {field.name: kwd_args[field.name] for field in dataclasses.fields(HipscatLoadingConfig)}
73 config = HipscatLoadingConfig(**config_args)
---> 75 catalog_type_to_use = _get_dataset_class_from_catalog_info(path, storage_options=storage_options)
77 if catalog_type is not None:
78 catalog_type_to_use = catalog_type
File ~/checkouts/readthedocs.org/user_builds/lsdb/envs/latest/lib/python3.10/site-packages/lsdb/loaders/hipscat/read_hipscat.py:89, in _get_dataset_class_from_catalog_info(base_catalog_path, storage_options)
87 base_catalog_dir = hc.io.get_file_pointer_from_path(base_catalog_path)
88 catalog_info_path = hc.io.paths.get_catalog_info_pointer(base_catalog_dir)
---> 89 catalog_info = BaseCatalogInfo.read_from_metadata_file(catalog_info_path, storage_options=storage_options)
90 catalog_type = catalog_info.catalog_type
91 if catalog_type not in dataset_class_for_catalog_type:
File ~/checkouts/readthedocs.org/user_builds/lsdb/envs/latest/lib/python3.10/site-packages/hipscat/catalog/dataset/base_catalog_info.py:60, in BaseCatalogInfo.read_from_metadata_file(cls, catalog_info_file, storage_options)
47 @classmethod
48 def read_from_metadata_file(
49 cls, catalog_info_file: FilePointer, storage_options: Union[Dict[Any, Any], None] = None
50 ) -> Self:
51 """Read catalog info from the `catalog_info.json` metadata file
52
53 Args:
(...)
58 A CatalogInfo object with the data from the `catalog_info.json` file
59 """
---> 60 metadata_keywords = file_io.load_json_file(catalog_info_file, storage_options=storage_options)
61 catalog_info_keywords = {}
62 for field in dataclasses.fields(cls):
File ~/checkouts/readthedocs.org/user_builds/lsdb/envs/latest/lib/python3.10/site-packages/hipscat/io/file_io/file_io.py:114, in load_json_file(file_pointer, encoding, storage_options)
112 json_dict = None
113 file_system, file_pointer = get_fs(file_pointer, storage_options)
--> 114 with file_system.open(file_pointer, "r", encoding=encoding) as json_file:
115 json_dict = json.load(json_file)
117 return json_dict
File ~/checkouts/readthedocs.org/user_builds/lsdb/envs/latest/lib/python3.10/site-packages/fsspec/spec.py:1281, in AbstractFileSystem.open(self, path, mode, block_size, cache_options, compression, **kwargs)
1273 mode = mode.replace("t", "") + "b"
1275 text_kwargs = {
1276 k: kwargs.pop(k)
1277 for k in ["encoding", "errors", "newline"]
1278 if k in kwargs
1279 }
1280 return io.TextIOWrapper(
-> 1281 self.open(
1282 path,
1283 mode,
1284 block_size=block_size,
1285 cache_options=cache_options,
1286 compression=compression,
1287 **kwargs,
1288 ),
1289 **text_kwargs,
1290 )
1291 else:
1292 ac = kwargs.pop("autocommit", not self._intrans)
File ~/checkouts/readthedocs.org/user_builds/lsdb/envs/latest/lib/python3.10/site-packages/fsspec/spec.py:1293, in AbstractFileSystem.open(self, path, mode, block_size, cache_options, compression, **kwargs)
1291 else:
1292 ac = kwargs.pop("autocommit", not self._intrans)
-> 1293 f = self._open(
1294 path,
1295 mode=mode,
1296 block_size=block_size,
1297 autocommit=ac,
1298 cache_options=cache_options,
1299 **kwargs,
1300 )
1301 if compression is not None:
1302 from fsspec.compression import compr
File ~/checkouts/readthedocs.org/user_builds/lsdb/envs/latest/lib/python3.10/site-packages/fsspec/implementations/local.py:197, in _open(self, path, mode, block_size, **kwargs)
194 if truncate:
195 os.truncate(path, 0)
--> 197 def created(self, path):
198 info = self.info(path=path)
199 return datetime.datetime.fromtimestamp(
200 info["created"], tz=datetime.timezone.utc
201 )
File ~/checkouts/readthedocs.org/user_builds/lsdb/envs/latest/lib/python3.10/site-packages/fsspec/implementations/local.py:322, in __init__(self, path, mode, autocommit, fs, compression, **kwargs)
320 self.f = open(name, mode=self.mode)
321 if "w" not in self.mode:
--> 322 self.size = self.f.seek(0, 2)
323 self.f.seek(0)
324 self.f.size = self.size
File ~/checkouts/readthedocs.org/user_builds/lsdb/envs/latest/lib/python3.10/site-packages/fsspec/implementations/local.py:327, in _open(self)
323 self.f.seek(0)
324 self.f.size = self.size
326 def _fetch_range(self, start, end):
--> 327 # probably only used by cached FS
328 if "r" not in self.mode:
329 raise ValueError
FileNotFoundError: [Errno 2] No such file or directory: '/home/docs/checkouts/readthedocs.org/user_builds/lsdb/checkouts/latest/docs/tutorials/small_sky_order1-from_importer/catalog_info.json'
[13]:
# Both import routes must agree on the HEALPix partitioning
assert from_dataframe_catalog.get_healpix_pixels() == from_importer_catalog.get_healpix_pixels()

# ...and on the actual data; dtypes may differ between the two loaders,
# hence check_dtype=False
df_from_dataframe = from_dataframe_catalog.compute().sort_index()
df_from_importer = from_importer_catalog.compute().sort_index()
pd.testing.assert_frame_equal(df_from_dataframe, df_from_importer, check_dtype=False)
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[13], line 2
1 # Verify that pixels are similar
----> 2 assert from_dataframe_catalog.get_healpix_pixels() == from_importer_catalog.get_healpix_pixels()
3 # Verify that resulting dataframes contain the same data
4 pd.testing.assert_frame_equal(
5 from_dataframe_catalog.compute().sort_index(),
6 from_importer_catalog.compute().sort_index(),
7 check_dtype=False,
8 )
NameError: name 'from_importer_catalog' is not defined
[14]:
# Remove the temporary directory used as Dask scratch space
tmp_path.cleanup()