
Parallelize the prepare recipe for partitioned datasets


It appears that the prepare recipe currently combines all partitions of a partitioned dataset into a single query before execution. Ideally, it would create one producer thread per partition, both to speed up execution and to ensure the underlying dataset can be read at all (some datasets are too large to be selected in one pass from a database and must be queried with filters in order to return results successfully).

An example of the SQL currently generated by a prepare recipe when ingesting a partitioned dataset:

WHERE ("PRODUCT_SUBTYPE" = 'SPARES DELIVERY' AND "PRODUCT_SUBTYPE" IS NOT NULL)
   OR ("PRODUCT_SUBTYPE" = 'EXCEPTION MODULE' AND "PRODUCT_SUBTYPE" IS NOT NULL)
   OR ("PRODUCT_SUBTYPE" = 'SPARES CUSTOM END ITEM' AND "PRODUCT_SUBTYPE" IS NOT NULL)
   OR ("PRODUCT_SUBTYPE" = 'STANDARD PRODUCTION KIT' AND "PRODUCT_SUBTYPE" IS NOT NULL)
   OR ("PRODUCT_SUBTYPE" = 'SUBSTITUTION KIT' AND "PRODUCT_SUBTYPE" IS NOT NULL)
   OR ("PRODUCT_SUBTYPE" = 'RAW MATERIAL' AND "PRODUCT_SUBTYPE" IS NOT NULL)
   OR ("PRODUCT_SUBTYPE" = 'INSTALLATION FEATURE' AND "PRODUCT_SUBTYPE" IS NOT NULL)
   OR ("PRODUCT_SUBTYPE" = 'RETROFIT KIT' AND "PRODUCT_SUBTYPE" IS NOT NULL)
   OR ("PRODUCT_SUBTYPE" = 'MANUFACTURING PART' AND "PRODUCT_SUBTYPE" IS NOT NULL)
   OR ("PRODUCT_SUBTYPE" = 'TOOL COMPONENT' AND "PRODUCT_SUBTYPE" IS NOT NULL)
   OR ("PRODUCT_SUBTYPE" = 'SUPPLIER CUSTOM PART' AND "PRODUCT_SUBTYPE" IS NOT NULL)
   OR ("PRODUCT_SUBTYPE" = 'MACHINE CONTROL DATA' AND "PRODUCT_SUBTYPE" IS NOT NULL)
   OR ("PRODUCT_SUBTYPE" = 'SUPPLIED SOFTWARE' AND "PRODUCT_SUBTYPE" IS NOT NULL)
   OR ("PRODUCT_SUBTYPE" = 'MARKER' AND "PRODUCT_SUBTYPE" IS NOT NULL)
   OR ("PRODUCT_SUBTYPE" = 'RMC' AND "PRODUCT_SUBTYPE" IS NOT NULL)
   OR ("PRODUCT_SUBTYPE" = 'SPARES KIT' AND "PRODUCT_SUBTYPE" IS NOT NULL)
   OR ("PRODUCT_SUBTYPE" = 'ENGINEERING DEFINED SOFTWARE' AND "PRODUCT_SUBTYPE" IS NOT NULL)
   OR ("PRODUCT_SUBTYPE" = 'MODULE' AND "PRODUCT_SUBTYPE" IS NOT NULL)
   OR ("PRODUCT_SUBTYPE" = 'ENGINEERING PART' AND "PRODUCT_SUBTYPE" IS NOT NULL)
   OR ("PRODUCT_SUBTYPE" = 'MATCHED SET KIT' AND "PRODUCT_SUBTYPE" IS NOT NULL)
   OR ("PRODUCT_SUBTYPE" = 'RAW MATERIAL ASSEMBLY' AND "PRODUCT_SUBTYPE" IS NOT NULL)
   OR ("PRODUCT_SUBTYPE" = 'STANDARD PART' AND "PRODUCT_SUBTYPE" IS NOT NULL)
   OR ("PRODUCT_SUBTYPE" = ' MARKER' AND "PRODUCT_SUBTYPE" IS NOT NULL)
   OR ("PRODUCT_SUBTYPE" = 'ASAR' AND "PRODUCT_SUBTYPE" IS NOT NULL)
   OR ("PRODUCT_SUBTYPE" = 'ASCT' AND "PRODUCT_SUBTYPE" IS NOT NULL)
   OR ("PRODUCT_SUBTYPE" = 'SUPPLIED PART' AND "PRODUCT_SUBTYPE" IS NOT NULL)
   OR ("PRODUCT_SUBTYPE" = 'RAW MATERIAL STOCK' AND "PRODUCT_SUBTYPE" IS NOT NULL)
   OR ("PRODUCT_SUBTYPE" = 'WIRE BUNDLE' AND "PRODUCT_SUBTYPE" IS NOT NULL)
   OR ("PRODUCT_SUBTYPE" = 'PICTURE SHEET CONTROLLED COMPONENT' AND "PRODUCT_SUBTYPE" IS NOT NULL)
   OR ("PRODUCT_SUBTYPE" = 'TOOL' AND "PRODUCT_SUBTYPE" IS NOT NULL)
) "__dku_before_xxx"

It'd be great if, instead, one query per partition were executed. There may be cases where the prepare steps themselves cannot be parallelized, but even if the data were only added to the processing buffer via one query per partition (with single-threaded execution of the prepare steps), that would prevent the recipe from failing on datasets too large to be queried without filtering.
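To illustrate the requested behavior, here is a minimal sketch of per-partition producer queries feeding a shared buffer. This is not Dataiku's internal API; the table, column, and partition values are hypothetical, and SQLite stands in for the real database. Each partition value gets its own filtered `SELECT`, so no single query ever has to materialize the whole dataset:

```python
import sqlite3
from concurrent.futures import ThreadPoolExecutor

# Shared-cache in-memory database so multiple connections (one per
# producer thread) can see the same data. A file-backed or server
# database would work the same way.
DB_URI = "file:partdemo?mode=memory&cache=shared"

# Keep one connection open so the shared in-memory DB stays alive.
master = sqlite3.connect(DB_URI, uri=True)
master.execute('CREATE TABLE products ("PRODUCT_SUBTYPE" TEXT, part_no TEXT)')
master.executemany(
    "INSERT INTO products VALUES (?, ?)",
    [("SPARES KIT", "p1"), ("SPARES KIT", "p2"),
     ("RAW MATERIAL", "p3"), ("TOOL", "p4")],
)
master.commit()

def read_partition(value):
    """One producer per partition: a filtered query returning only that
    partition's rows, keeping each result set small."""
    conn = sqlite3.connect(DB_URI, uri=True)
    try:
        return conn.execute(
            'SELECT * FROM products WHERE "PRODUCT_SUBTYPE" = ?', (value,)
        ).fetchall()
    finally:
        conn.close()

partitions = ["SPARES KIT", "RAW MATERIAL", "TOOL"]
with ThreadPoolExecutor(max_workers=len(partitions)) as pool:
    # The buffer ends up with the same rows as one combined query, but
    # each producer query stays within per-partition size limits.
    buffered = [row for rows in pool.map(read_partition, partitions)
                for row in rows]

print(len(buffered))  # 4 rows total, fetched via three independent queries
```

The downstream prepare steps could then consume `buffered` single-threaded; only the database reads are split per partition.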

I run into this problem a lot in Teradata, where user spool space is distributed among hundreds of AMPs (database nodes) and thus limited to a very small amount per AMP. As a result, if any one AMP requires more memory than my allocated spool space to return data, the query fails. Partitioning datasets is a great way to work around this, since multiple filtered queries keep the returned data for each partition small enough to fit within the memory constraints of Teradata. However, this breaks down when prepare recipes are used. Currently, my only workarounds are:

  1. splitting the data into multiple tables, one per partition value, and duplicating the same prepare recipe for each resulting dataset
  2. moving the data into another database that doesn't share Teradata's memory constraints before executing the prepare recipe
  3. removing any steps that can't run in-SQL and hoping the prepare recipe executes successfully in-database - however, many steps cause the database to use even more memory, so this usually works against me and makes executions run out of spool space even faster.

That said, if it were possible to execute the entire prepare recipe once per partition, in parallel, that would be even better for performance on datasets with billions of records.