Unable to read parquet files from Dataiku Managed Folders on S3 through Python script

suchintyachakr
Level 1

Hi team, I have been using Dataiku for some time now. I am working with a very large dataset, so I need to save it in parquet format and then read it back through a Python script for further processing. I can write the file to the S3 managed folder successfully, but I am still not able to read it back through a Python script. I would really appreciate your support with this. Thanks a ton in advance!

Thanks!

Suchintya

AlexT
Dataiker

Hi @suchintyachakr ,

Reading a parquet dataset from an S3 folder can be done by creating a dataset on the path in the S3 connection where your parquet files are stored, and then using the Python API in your recipe:

import dataiku

a_dataset = dataiku.Dataset("a_dataset")
a_dataset_df = a_dataset.get_dataframe()

 


Or use PySpark; see the code sample here: https://doc.dataiku.com/dss/latest/code_recipes/pyspark.html
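As a rough sketch based on that documentation page, a PySpark recipe reading the same dataset could look like this (the dataset name "a_dataset" is just a placeholder):

import dataiku
import dataiku.spark as dkuspark
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

# load the Dataiku dataset as a Spark dataframe
a_dataset = dataiku.Dataset("a_dataset")
a_dataset_df = dkuspark.get_dataframe(sqlContext, a_dataset)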

Reading the files individually in Python should be the last option.
If that is your only option, you can use the managed folder read APIs and install pyarrow in your code environment.

Example (note that this approach requires the dataset to fit into memory when converting to a pandas DataFrame):

import dataiku
import tempfile
import pyarrow.parquet as pq
import pandas as pd
import os

input_folder = dataiku.Folder("wMhko2op")
paths = input_folder.list_paths_in_partition()

# create a list of parquet file paths
parquet_paths = [path for path in paths if path.endswith('.parquet')]

# download each parquet file to a local temporary file
temp_files = []
for path in parquet_paths:
    with input_folder.get_download_stream(path) as file_stream:
        with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.parquet') as temp:
            temp.write(file_stream.read())
            temp.flush()
            temp_files.append(temp.name)

# read the parquet files as a single dataset
dataset = pq.ParquetDataset(temp_files).read()

# convert the dataset to a pandas dataframe
df = dataset.to_pandas()


# write the dataframe to the output dataset
test_parquet = dataiku.Dataset("test_parquet")
test_parquet.write_with_schema(df)

# remove the temporary files
for temp_file in temp_files:
    os.remove(temp_file)
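If the individual files are small enough, a variant of the above can read each download stream directly into pyarrow in memory, avoiding the temporary files on disk. This is only a sketch, assuming the same hypothetical folder ID and output dataset name as above:

import io

import dataiku
import pyarrow as pa
import pyarrow.parquet as pq

# hypothetical folder ID, same as in the example above
input_folder = dataiku.Folder("wMhko2op")
parquet_paths = [p for p in input_folder.list_paths_in_partition()
                 if p.endswith('.parquet')]

# read each file's bytes into an in-memory buffer and parse it with pyarrow
tables = []
for path in parquet_paths:
    with input_folder.get_download_stream(path) as stream:
        tables.append(pq.read_table(io.BytesIO(stream.read())))

# concatenate the per-file tables and convert to a single pandas dataframe
df = pa.concat_tables(tables).to_pandas()

# write the combined dataframe to the output dataset
test_parquet = dataiku.Dataset("test_parquet")
test_parquet.write_with_schema(df)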



