Unable to read parquet files from Dataiku Managed Folders on S3 through Python script

suchintyachakr Registered Posts: 1

Hi team, I have been using Dataiku for some time now. I am working with a huge dataset, so I need to save the file in Parquet format and read it back through a Python script for further tasks. I can write the file to the S3 managed folder successfully, but I am still not able to read it through a Python script. I would really appreciate your support with this. Thanks a ton in advance!

Thanks!

Suchintya


Answers

  • Alexandru
    Alexandru Dataiker, Dataiku DSS Core Designer, Dataiku DSS ML Practitioner, Dataiku DSS Adv Designer, Registered Posts: 1,209 Dataiker
    edited July 17

    Hi @suchintyachakr,

    You can read Parquet data from an S3 folder by creating a dataset that points to the path in the S3 connection where your Parquet files are stored, and then using the Python API in your recipe:

    a_dataset = dataiku.Dataset("a_dataset")
    a_dataset_df = a_dataset.get_dataframe()


    Or use PySpark; see the code sample here: https://doc.dataiku.com/dss/latest/code_recipes/pyspark.html
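
    Since the dataset is described as huge, the dataset-based approach can also be read piece by piece instead of in one `get_dataframe()` call. The following is only a minimal sketch: `process_in_chunks` and `handle_chunk` are hypothetical names introduced here, and the sketch assumes the dataset object offers `iter_dataframes(chunksize=...)`, as `dataiku.Dataset` does.

```python
def process_in_chunks(dataset, handle_chunk, chunksize=100_000):
    """Pass a dataset to handle_chunk one chunk at a time.

    `dataset` is assumed to expose iter_dataframes(chunksize=...), like
    dataiku.Dataset. `handle_chunk` is a hypothetical callback that receives
    each chunk. Returns the total number of rows seen.
    """
    total_rows = 0
    for chunk in dataset.iter_dataframes(chunksize=chunksize):
        handle_chunk(chunk)        # only one chunk is held in memory at a time
        total_rows += len(chunk)
    return total_rows


# Inside a DSS Python recipe (not run here):
#   import dataiku
#   rows = process_in_chunks(dataiku.Dataset("a_dataset"),
#                            lambda df: print(df.shape))
```

    This keeps memory use bounded by the chunk size, at the cost of having to write the processing step so that it works on partial data.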

    Reading the files individually in Python should be the last option.
    If that's the only option, you can use the managed folder read APIs and install pyarrow in your code environment.

    Example (note that this example requires the dataset to fit into memory when converting to a pandas DataFrame):

    import os
    import tempfile

    import dataiku
    import pyarrow.parquet as pq

    input_folder = dataiku.Folder("wMhko2op")
    paths = input_folder.list_paths_in_partition()

    # keep only the parquet files in the folder
    parquet_paths = [path for path in paths if path.endswith('.parquet')]

    temp_files = []
    try:
        # download each parquet file from the managed folder to a local temporary file
        for path in parquet_paths:
            with input_folder.get_download_stream(path) as file_stream:
                with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.parquet') as temp:
                    temp_files.append(temp.name)
                    temp.write(file_stream.read())

        # read the parquet files as a single pyarrow table
        table = pq.ParquetDataset(temp_files).read()

        # convert the table to a pandas dataframe (this loads all the data into memory)
        df = table.to_pandas()

        # write the dataframe to the output dataset
        test_parquet = dataiku.Dataset("test_parquet")
        test_parquet.write_with_schema(df)
    finally:
        # remove the temporary files, even if a step above failed
        for temp_file in temp_files:
            os.remove(temp_file)
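
    The download step of the example above could also be wrapped in a small helper that copies each file in chunks and leaves cleanup to a temporary directory. This is only a sketch under assumptions: `download_parquet_files` and `read_parquet_folder` are hypothetical names, the folder object is assumed to expose `list_paths_in_partition()` and `get_download_stream(path)` as in the example, and the download stream is assumed to support chunked `read(size)` calls.

```python
import os
import shutil
import tempfile


def download_parquet_files(folder, dest_dir):
    """Copy every .parquet file from a managed-folder-like object into dest_dir.

    `folder` is assumed to expose list_paths_in_partition() and
    get_download_stream(path), as dataiku.Folder does in the example above.
    Returns the local file paths, ordered by the sorted folder paths.
    """
    local_paths = []
    for index, path in enumerate(sorted(folder.list_paths_in_partition())):
        if not path.endswith(".parquet"):
            continue
        # Prefix with an index so files that share a base name in different
        # sub-folders do not overwrite each other.
        local_name = f"{index:05d}_{os.path.basename(path)}"
        local_path = os.path.join(dest_dir, local_name)
        with folder.get_download_stream(path) as stream, open(local_path, "wb") as out:
            # Copy in chunks instead of reading the whole file into memory.
            shutil.copyfileobj(stream, out)
        local_paths.append(local_path)
    return local_paths


def read_parquet_folder(folder):
    """Load all parquet files of the folder into one pandas DataFrame."""
    import pyarrow.parquet as pq  # requires pyarrow in the code environment

    # The temporary directory and its files are removed on exit, even on error.
    with tempfile.TemporaryDirectory() as tmp_dir:
        local_paths = download_parquet_files(folder, tmp_dir)
        return pq.ParquetDataset(local_paths).read().to_pandas()
```

    In a recipe this would be called as `df = read_parquet_folder(dataiku.Folder("wMhko2op"))`. The resulting DataFrame still has to fit into memory; only the download step is streamed.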



