The goal is to query the Redshift DB for table names and return them as a dropdown for users in a plugin.
Problem: where should the interim table from the SQL query be stored?
import dataiku
from dataiku import SQLExecutor2
query = "SELECT * FROM PG_TABLES"
db_tables = dataiku.Dataset('db_tables')
SQLExecutor2.exec_recipe_fragment(db_tables, query)
TypeError: 'NoneType' object is not subscriptable
It seems the Dataset object cannot be created this way. Is there a workaround, aside from using an empty dataset as an input to the plugin?
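One possible workaround, sketched under assumptions: since the dropdown only needs the table names, the query result can be read straight into memory instead of being stored in an interim dataset. The sketch below assumes a connection named `my_redshift` (a placeholder, not from the original post), uses `SQLExecutor2.query_to_df` rather than `exec_recipe_fragment`, and assumes the plugin's dynamic select parameter is fed by a `do()` function returning `{"choices": [...]}`. Verify each of these against your DSS version.

```python
def build_choices(table_names):
    """Turn table names into dropdown entries, de-duplicated and sorted."""
    unique = sorted({str(name) for name in table_names if name})
    return [{"value": name, "label": name} for name in unique]


def list_tables(connection_name):
    """Query table names directly from the connection (must run inside DSS)."""
    # Imported lazily so build_choices() can be used and tested outside DSS.
    from dataiku import SQLExecutor2

    executor = SQLExecutor2(connection=connection_name)
    df = executor.query_to_df("SELECT tablename FROM pg_tables")
    return df["tablename"].tolist()


def do(payload, config, plugin_config, inputs):
    # 'my_redshift' is a hypothetical connection name; replace it with yours.
    return {"choices": build_choices(list_tables("my_redshift"))}
```

Keeping `build_choices` separate from the query means the dropdown formatting can be checked without a DSS instance.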
Here's an example of how to create a dataset programmatically. In this case it is a text dataset (I can share code for SQL if interested; it's similar but of course not exactly the same). I extracted this from a larger process, so I may not have gotten all the needed pieces, but it should nonetheless be a place to start. In particular, you'll need to populate the variable to pass to the set_schema method. You can do a get_schema on an existing dataset to see the format.
Marlan
import dataiku
import dataikuapi

client = dataiku.api_client()
project = client.get_project(dataiku.default_project_key())
project_variables = dataiku.get_custom_variables()

csv_dataset_name = 'NEW_DATASET_NAME'

# Create dataset if it doesn't already exist
try:
    # If dataset exists, clear it
    csv_dataset = project.get_dataset(csv_dataset_name)  # doesn't generate error if dataset doesn't exist
    csv_dataset.clear()
except:
    # Create dataset (assuming exception was that dataset does not exist)
    params = {'connection': 'filesystem_folders', 'path': project_variables['projectKey'] + '/' + csv_dataset_name}
    format_params = {'separator': '\t', 'style': 'unix', 'compress': ''}
    csv_dataset = project.create_dataset(csv_dataset_name, type='Filesystem', params=params,
                                         formatType='csv', formatParams=format_params)
    # Set dataset to managed
    ds_def = csv_dataset.get_definition()
    ds_def['managed'] = True
    csv_dataset.set_definition(ds_def)
    # Set schema
    csv_dataset.set_schema({'columns': csv_dku_schema_columns})

# If you want to delete it later...
csv_dataset.clear()  # removes folder and file
csv_dataset.delete()
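As a rough illustration of what the variable passed to set_schema might look like, here is a minimal sketch that builds the column list from column names and pandas dtype names. The `PANDAS_TO_DSS` mapping and the `schema_columns` helper are hypothetical and cover only a few common types; compare the result with get_schema() on an existing dataset before relying on it.

```python
# Assumed mapping from pandas dtype names to DSS column types; extend as needed.
PANDAS_TO_DSS = {
    "object": "string",
    "int64": "bigint",
    "float64": "double",
    "bool": "boolean",
}


def schema_columns(dtypes_by_column):
    """Build the 'columns' list from a {column_name: dtype_name} mapping."""
    return [
        # Unknown dtypes fall back to 'string' in this sketch.
        {"name": name, "type": PANDAS_TO_DSS.get(str(dtype), "string")}
        for name, dtype in dtypes_by_column.items()
    ]


# Example input written by hand; with pandas it could be df.dtypes.to_dict().
csv_dku_schema_columns = schema_columns(
    {"id": "int64", "name": "object", "score": "float64"}
)
```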
Can you please share the code for SQL? (I am using a Snowflake connection.) I get an error when I try to write to the dataset using dataiku.Dataset(dataset_name, ignore_flow=True).write_with_schema(df).
I created the dataset earlier in the same Python recipe using:
builder = project.new_managed_dataset_creation_helper(dataset_name)
builder.with_store_into("snowflakeconnectionname")
builder.create()
The line dataiku.Dataset(dataset_name, ignore_flow=True).write_with_schema(df) throws the error "None: dataset does not exist".
Hi @isha,
The approach I used was lower level as that was needed for the version of DSS I was using when I did this. Your higher level approach follows the current documentation so it certainly seems like it should work.
Can you apply other methods to the dataset returned from dataiku.Dataset(dataset_name, ignore_flow=True)? I would assume not as the error is dataset does not exist.
I've shared my approach below, but as noted it is a different, lower-level approach that won't help you figure out why what you are doing doesn't work.
dataset_name = 'PROGRAM_CREATED_DATASET'
try:
    dataset_handle = project.get_dataset(dataset_name)  # doesn't generate error if dataset doesn't exist
    dataset_handle.clear()  # this however does generate an error if dataset doesn't exist
except:
    # Create dataset (assuming exception was that dataset does not exist)
    params = {'connection': 'NETEZZA_CONNECTION_NAME', 'mode': 'table', 'table': 'SQL_TABLE_NAME'}
    dataset_handle = project.create_dataset(dataset_name, type='Netezza', params=params)
    # Set dataset to managed, assuming it will be the output of a recipe
    ds_def = dataset_handle.get_definition()
    ds_def['managed'] = True
    dataset_handle.set_definition(ds_def)
    # Set schema (this may not be needed as recipe execution should populate the schema)
    dataset_handle.set_schema({'columns': schema_columns})  # check get_schema() of existing dataset to see structure to use
Marlan
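The try/except pattern above treats any exception as "dataset does not exist". An alternative, sketched here under assumptions, is to check for the dataset explicitly before deciding whether to clear or create it. The `ensure_dataset` helper is hypothetical, and it assumes that `project.list_datasets()` returns items exposing a `'name'` key; confirm that for your DSS version.

```python
def ensure_dataset(project, dataset_name, create):
    """Return a handle to dataset_name, calling create() only if it is missing.

    `create` is a zero-argument callable supplied by the caller, for example a
    function wrapping project.create_dataset(...) with the right params.
    """
    existing = {item["name"] for item in project.list_datasets()}
    if dataset_name in existing:
        handle = project.get_dataset(dataset_name)
        handle.clear()  # same behavior as the try branch above
        return handle
    return create()
```

With this shape, errors unrelated to a missing dataset (a bad connection name, for instance) surface directly instead of triggering a second creation attempt.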
Please tell me how to create a dataset using Python.
I think this is a useful example of how to create datasets dynamically from Python code.
However, I see no method for writing data from a Pandas DataFrame to the created Dataiku dataset.
I checked the dataikuapi reference, but could not find any applicable method.
It would be great if the example above could be extended to show how to do this.
The example in the documentation shows the following code:
project = client.get_project('TEST_PROJECT')
folder_path = 'path/to/folder/'
for file in listdir(folder_path):
    if not file.endswith('.csv'):
        continue
    dataset = project.create_dataset(
        file[:-4],  # dot is not allowed in dataset names
        'Filesystem',
        params={
            'connection': 'filesystem_root',
            'path': folder_path + file
        },
        formatType='csv',
        formatParams={
            'separator': ',',
            'style': 'excel',  # excel-style quoting
            'parseHeaderRow': True
        })
    df = pandas.read_csv(folder_path + file)
    dataset.set_schema({'columns': [{'name': column, 'type': 'string'} for column in df.columns]})
Unfortunately, the example doesn't actually show how to write the Pandas DataFrame.
Thanks in advance!
Hi @berndito,
Check https://doc.dataiku.com/dss/latest/python-api/datasets.html for documentation on writing dataframes. See the method write_dataframe in the Dataset class.
Marlan
The error is self-explanatory: you are trying to index None, which is not possible because a 'NoneType' object is not subscriptable. This means that you tried to do:
None[something]
In general, the error means that you attempted to index an object that doesn't support indexing. You might have noticed that methods such as sort(), which only modify the list in place, have no return value printed; they return the default None. This is a design principle for all mutable data structures in Python.
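A small, self-contained illustration of that behavior in plain Python, unrelated to any Dataiku API: the in-place `sort()` returns None, so indexing its result raises the same TypeError.

```python
numbers = [3, 1, 2]
result = numbers.sort()  # sorts the list in place and returns None

message = None
try:
    result[0]  # indexing None raises TypeError
except TypeError as exc:
    message = str(exc)

print(message)

# sorted() returns a new list instead, so its result can be indexed safely.
smallest = sorted(numbers)[0]
```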
Here is a list of operations in Python code:
import dataiku
import dataikuapi

def load_database_whole(database_name):
    dataset = dataiku.Dataset(database_name)
    df = dataset.get_dataframe()
    return df

def load_database_sample(database_name):
    dataset = dataiku.Dataset(database_name)
    df = dataset.get_dataframe(limit=5)
    return df

def save_database(df, database_name):
    client = dataiku.api_client()
    project = client.get_project(dataiku.default_project_key())
    project_variables = dataiku.get_custom_variables()
    # Create dataset
    params = {'connection': 'filesystem_folders', 'path': project_variables['projectKey'] + '/' + database_name}
    format_params = {'separator': '\t', 'style': 'unix', 'compress': ''}
    csv_dataset = project.create_dataset(database_name, type='Filesystem', params=params, formatType='csv', formatParams=format_params)
    # Set dataset to managed
    ds_def = csv_dataset.get_definition()
    ds_def['managed'] = True
    csv_dataset.set_definition(ds_def)
    dataset = dataiku.Dataset(database_name)
    dataset.write_with_schema(df)

def get_list_of_available_databases():
    return dataiku.Dataset.list()