Unable to read Parquet files from Dataiku Managed Folders on S3 through a Python script
Hi team, I have been using Dataiku for some time now. I am working with a huge dataset, so I need to save the file in Parquet format and read it back through a Python script for further tasks. Although I have successfully written the file to the S3 managed folder, I am still not able to read it through a Python script. I would really appreciate your support with this. Thanks a ton in advance!
Thanks!
Suchintya
Answers
Alexandru (Dataiker)
Hi @suchintyachakr,
The preferred way to read Parquet files from an S3 folder is to create a dataset on the S3 connection that points to the path containing your Parquet files, and then read it with the Python API in your recipe:
```python
import dataiku

a_dataset = dataiku.Dataset("a_dataset")
a_dataset_df = a_dataset.get_dataframe()
```
Alternatively, you can use PySpark; see the code sample here: https://doc.dataiku.com/dss/latest/code_recipes/pyspark.html
Reading the files individually in Python should be the last resort.
If that is the only option, you can use the managed folder read APIs and install pyarrow in your code environment.
Example (note that this example requires the whole dataset to fit into memory, because it is converted to a single pandas DataFrame):

```python
import os
import tempfile

import dataiku
import pyarrow.parquet as pq

input_folder = dataiku.Folder("wMhko2op")
paths = input_folder.list_paths_in_partition()

# Keep only the Parquet files stored in the managed folder
parquet_paths = [path for path in paths if path.endswith(".parquet")]

temp_files = []
try:
    # Download each Parquet file to a local temporary file
    for path in parquet_paths:
        with input_folder.get_download_stream(path) as file_stream:
            with tempfile.NamedTemporaryFile(mode="wb", delete=False, suffix=".parquet") as temp:
                temp_files.append(temp.name)  # record it first so it is always cleaned up
                temp.write(file_stream.read())

    # Read all the Parquet files as a single pyarrow Table
    table = pq.ParquetDataset(temp_files).read()

    # Convert the Table to a pandas DataFrame (this loads everything into memory)
    df = table.to_pandas()

    # Write the DataFrame to the output dataset
    test_parquet = dataiku.Dataset("test_parquet")
    test_parquet.write_with_schema(df)
finally:
    # Remove the temporary files, even if one of the steps above failed
    for temp_file in temp_files:
        os.remove(temp_file)
```