dataframe_schema_sync package
Submodules
dataframe_schema_sync.schema_inference module
- class dataframe_schema_sync.schema_inference.SchemaConversionResult(df: DataFrame, schema_map: dict[str, Any], renamed_columns_mapping: dict[str, str])
Bases: object
Class to hold the results of a DataFrame schema conversion.
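The three constructor arguments suggest a simple result container. The following is a minimal sketch of that shape, assuming a dataclass-like layout; `ConversionResultSketch` is a hypothetical stand-in, not the package's class, and the sample values are invented for illustration.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class ConversionResultSketch:
    """Hypothetical stand-in mirroring the three documented fields."""

    df: Any  # a pandas DataFrame in the documented class
    schema_map: dict[str, Any]  # column name -> SQLAlchemy type
    renamed_columns_mapping: dict[str, str]  # original name -> cleaned name


# Invented sample values; a real result would come from convert_dataframe.
result = ConversionResultSketch(
    df=None,
    schema_map={"user_id": "INTEGER"},
    renamed_columns_mapping={"User ID": "user_id"},
)
```

Keeping the rename mapping alongside the converted frame lets a caller trace each cleaned column back to its original name.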
- class dataframe_schema_sync.schema_inference.SchemaInference
Bases: object
A class for inferring SQLAlchemy types from Pandas DataFrame columns and handling schema I/O operations.
- SQLALCHEMY_TYPE_MAP: ClassVar[dict[str, Any]] = {'BOOLEAN': Boolean(), 'DATETIME': DateTime(timezone=True), 'FLOAT': Float(), 'INTEGER': Integer(), 'JSON': <class 'sqlalchemy.dialects.postgresql.json.JSON'>, 'TEXT': Text(), 'TIMESTAMP(timezone=True)': DateTime(timezone=True)}
- static clean_dataframe_names(df: DataFrame, case: str = 'snake', truncate_limit: int = 55) -> DataFrame
Clean the column names and index names of a DataFrame using pyjanitor's clean_names method. Works with both regular Index and MultiIndex.
- Args:
df (pd.DataFrame): The DataFrame whose column and index names should be cleaned.
case (str): The naming convention to apply. Default is “snake”.
truncate_limit (int): The maximum length of the column names. Default is 55.
- Returns:
pd.DataFrame: A new DataFrame with cleaned column and index names.
- Raises:
ImportError: If pyjanitor is not installed.
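To make the effect of `case` and `truncate_limit` concrete, here is a rough standard-library sketch of snake-casing and truncating a single name. It is not pyjanitor's implementation, and its exact rules (for example around punctuation or repeated underscores) may differ; `to_snake_sketch` is a hypothetical helper.

```python
import re


def to_snake_sketch(name: str, truncate_limit: int = 55) -> str:
    """Illustrative only: snake-case a name, then cut it to truncate_limit."""
    # Insert "_" at camelCase boundaries: "orderTotal" -> "order_Total"
    spaced = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name)
    # Collapse each run of non-alphanumeric characters into one underscore
    cleaned = re.sub(r"[^0-9a-zA-Z]+", "_", spaced).strip("_").lower()
    return cleaned[:truncate_limit]


names = ["User ID", "orderTotal (USD)", "x" * 80]
cleaned = [to_snake_sketch(n) for n in names]
# cleaned[0] == "user_id", cleaned[1] == "order_total_usd", len(cleaned[2]) == 55
```

A limit such as 55 leaves headroom under the 63-byte identifier limit of PostgreSQL, which may be why the default sits below it; that motivation is a guess, not something the documentation states.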
- static convert_dataframe(df: DataFrame, infer_dates: bool = True, date_columns: str | list[str] | None = None, case: str = 'snake', truncate_limit: int = 55) -> SchemaConversionResult
Infer the SQLAlchemy type for each column, convert the DataFrame accordingly, and return a result object with the converted DataFrame, column mappings, and original-to-new column name mappings.
- Args:
df (pd.DataFrame): The input DataFrame.
infer_dates (bool): If True, attempts to infer datetime columns.
date_columns (str or list of str): Columns that should always be parsed as dates.
case (str): The naming convention to apply. Default is “snake”.
truncate_limit (int): The maximum length of the column names. Default is 55.
- Returns:
SchemaConversionResult: Object containing the converted DataFrame, the dtype map, and the original-to-new column name mapping.
- static detect_and_convert_datetime(series: Series) -> tuple[Series, bool]
Detects datetime format in a Pandas Series and converts it to datetime64[ns, UTC]. Supports:
ISO 8601 formats (YYYY-MM-DDTHH:MM:SS.sssZ)
RFC 2822 (email/HTTP format)
Standard datetime format (YYYY-MM-DD HH:MM:SS.sss)
Returns the converted series and a boolean indicating success.
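The three supported formats can be illustrated on a single string with the standard library alone. This is a sketch of the general idea, not the method's pandas-based implementation; `parse_to_utc_sketch` is a hypothetical helper, and treating naive timestamps as UTC is an assumption made here for the example.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime


def parse_to_utc_sketch(value: str):
    """Try ISO 8601 / standard format, then RFC 2822; return UTC datetime or None."""
    text = value.strip()
    try:
        # fromisoformat accepts both "T" and space separators. Older Python
        # versions reject a trailing "Z", so rewrite it as an explicit offset.
        iso_text = (text[:-1] + "+00:00") if text.endswith("Z") else text
        parsed = datetime.fromisoformat(iso_text)
    except ValueError:
        try:
            # RFC 2822, e.g. "Mon, 15 Jan 2024 12:30:00 +0200"
            parsed = parsedate_to_datetime(text)
        except (TypeError, ValueError):
            return None  # no supported format matched
    # Assumption: a timestamp without an offset is already in UTC.
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)


iso_value = parse_to_utc_sketch("2024-01-15T10:30:00.123Z")
rfc_value = parse_to_utc_sketch("Mon, 15 Jan 2024 12:30:00 +0200")
std_value = parse_to_utc_sketch("2024-01-15 10:30:00")
bad_value = parse_to_utc_sketch("not a date")
```

Returning `None` on failure plays the role of the documented success flag: a column-level version would report success only when every non-missing value parses.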
- static infer_sqlalchemy_type(series: Series, infer_dates: bool = True, date_columns: str | list[str] | None = None) -> tuple[Any, Series]
Given a pandas Series, determine the best matching SQLAlchemy type.
- Args:
series (pd.Series): The column to analyze.
infer_dates (bool): If True, attempts to infer datetime columns.
date_columns (str or list of str): Specifies columns that should always be parsed as dates.
- Returns:
tuple: (SQLAlchemy type, transformed Pandas Series)
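The documentation does not spell out the inference rules, so the following is only a guess at what "best matching" could mean, written against plain Python values instead of a pandas Series. The returned strings reuse key names from SQLALCHEMY_TYPE_MAP; the precedence order, the fallback to TEXT, and the helper name `infer_type_name_sketch` are all assumptions.

```python
def infer_type_name_sketch(values: list) -> str:
    """Pick a type name for a column from its non-missing values (assumed rules)."""
    present = [v for v in values if v is not None]
    if not present:
        return "TEXT"  # assumption: all-missing columns fall back to text
    # bool is a subclass of int in Python, so booleans are checked first
    if all(isinstance(v, bool) for v in present):
        return "BOOLEAN"
    if all(isinstance(v, int) and not isinstance(v, bool) for v in present):
        return "INTEGER"
    if all(isinstance(v, (int, float)) and not isinstance(v, bool) for v in present):
        return "FLOAT"
    if all(isinstance(v, (dict, list)) for v in present):
        return "JSON"
    return "TEXT"  # mixed or string content


inferred = {
    "active": infer_type_name_sketch([True, False, None]),
    "count": infer_type_name_sketch([1, 2, 3]),
    "price": infer_type_name_sketch([1, 2.5]),
    "tags": infer_type_name_sketch([{"a": 1}, [1, 2]]),
    "note": infer_type_name_sketch(["a", 1]),
}
```

The ordering goes from the narrowest type to the widest so that a column is only widened when some value forces it.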
- static load_config_from_yaml(filename: str | Path, schema_name: str, mapping_key: str) -> dict[str, Any]
Load config from a YAML file and convert stored text strings back into SQLAlchemy types. The method looks for the schema under the provided schema_name key and within its dynamic mapping_key.
- Args:
filename (str or Path): Path to the YAML file.
schema_name (str): The parent key under which the schema is stored.
mapping_key (str): The key under which the schema items are stored.
- Returns:
dict: Dictionary mapping column names to SQLAlchemy types.
- Raises:
KeyError: If the provided schema_name or the mapping_key is not found in the YAML file.
- static load_schema_from_yaml(filename: str | Path, schema_name: str, mapping_key: str) -> dict[str, Any]
Load schema from a YAML file and convert stored text strings back into SQLAlchemy types. The method looks for the schema under the provided schema_name key and within its dynamic mapping_key.
- Args:
filename (str or Path): Path to the YAML file.
schema_name (str): The parent key under which the schema is stored.
mapping_key (str): The key under which the schema items are stored.
- Returns:
dict: Dictionary mapping column names to SQLAlchemy types.
- Raises:
KeyError: If the provided schema_name or the mapping_key is not found in the YAML file.
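Both loaders describe the same two-level lookup: first schema_name, then mapping_key, with a KeyError if either is missing. The sketch below shows that lookup on an already-parsed dictionary, so it skips YAML parsing and the conversion of type strings back into SQLAlchemy objects. `lookup_mapping_sketch`, the key names, and the sample content are hypothetical.

```python
from typing import Any


def lookup_mapping_sketch(
    parsed: dict[str, Any], schema_name: str, mapping_key: str
) -> dict[str, str]:
    """Return parsed[schema_name][mapping_key]; raise KeyError if either is absent."""
    if schema_name not in parsed:
        raise KeyError(f"schema_name {schema_name!r} not found")
    if mapping_key not in parsed[schema_name]:
        raise KeyError(f"mapping_key {mapping_key!r} not found under {schema_name!r}")
    return parsed[schema_name][mapping_key]


# Invented example of parsed YAML content; types are stored as plain strings.
parsed_yaml = {
    "analytics": {"orders": {"order_id": "INTEGER", "created_at": "DATETIME"}}
}
orders_schema = lookup_mapping_sketch(parsed_yaml, "analytics", "orders")
```

Checking each level separately lets the error message say which of the two keys was missing.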
- static safe_json_conversion(x: Any) -> list[Any]
Parse a value as JSON if it is a string. If the value is missing (NaN), empty, or cannot be parsed, return an empty list.
- Args:
x (Any): The value to convert.
- Returns:
list: The parsed JSON value or an empty list.
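The fallback behavior described above can be sketched with the standard `json` module. This is an approximation under stated assumptions: `safe_json_sketch` is a hypothetical helper, `None` is treated like NaN, and non-string values are passed through unchanged, none of which the documentation confirms.

```python
import json
import math
from typing import Any


def safe_json_sketch(x: Any) -> Any:
    """Parse JSON strings; return [] for missing, empty, or unparseable input."""
    # Assumption: None is treated the same way as NaN
    if x is None or (isinstance(x, float) and math.isnan(x)):
        return []
    if isinstance(x, str):
        if not x.strip():
            return []  # empty or whitespace-only string
        try:
            return json.loads(x)
        except json.JSONDecodeError:
            return []  # not valid JSON
    return x  # assumption: non-string values pass through unchanged


parsed_list = safe_json_sketch("[1, 2]")
from_nan = safe_json_sketch(float("nan"))
from_empty = safe_json_sketch("")
from_bad = safe_json_sketch("{bad")
```

Returning an empty list instead of raising keeps a column-wide conversion from failing on a single malformed cell.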
- static safe_str_conversion(x: Any) -> str
Convert a value to string, returning an empty string if the value is NaN.
- Args:
x (Any): The value to convert.
- Returns:
str: The string representation or an empty string if x is NaN.
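A minimal sketch of the NaN-to-empty-string rule, using only the standard library. `safe_str_sketch` is a hypothetical helper; treating `None` like NaN is an assumption that goes beyond what is documented.

```python
import math
from typing import Any


def safe_str_sketch(x: Any) -> str:
    """Return str(x), or an empty string when x is NaN (or None, by assumption)."""
    if x is None or (isinstance(x, float) and math.isnan(x)):
        return ""
    return str(x)


from_nan_str = safe_str_sketch(float("nan"))
from_int_str = safe_str_sketch(42)
```

Without the guard, `str(float("nan"))` would produce the literal text "nan", which is easy to mistake for real data once it lands in a text column.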
- static save_schema_to_yaml(dtype_map: dict[str, Any], filename: str | Path, schema_name: str, mapping_key: str, sync_method: str) -> None
Save dtype_map to a YAML file, storing SQLAlchemy types as text strings. The YAML content will be nested under the provided schema_name key and further under the given mapping_key. The sync_method parameter determines how the new schema is saved:
“update”: Only new columns (not already present under mapping_key) are added.
“overwrite”: The mapping for mapping_key is completely replaced by the new schema.
- Args:
dtype_map (dict): Dictionary mapping column names to SQLAlchemy types.
filename (str or Path): Path to the YAML file.
schema_name (str): The parent key to use in the YAML file.
mapping_key (str): The key under which the schema items are stored.
sync_method (str): Must be either “update” or “overwrite”.
- Raises:
ValueError: If sync_method is not one of the allowed values.
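The difference between the two sync_method values is easiest to see on plain dictionaries. The sketch below models only the merge step for one mapping_key and leaves out YAML reading and writing; `merge_schema_sketch` and the sample column names are hypothetical, and type names are shown as the text strings the file would store.

```python
def merge_schema_sketch(
    existing: dict[str, str], new: dict[str, str], sync_method: str
) -> dict[str, str]:
    """Combine a stored mapping with a new one according to sync_method."""
    if sync_method == "overwrite":
        # The stored mapping is replaced entirely by the new schema
        return dict(new)
    if sync_method == "update":
        merged = dict(existing)
        for column, type_name in new.items():
            # Only columns not already present are added; existing ones stay as-is
            merged.setdefault(column, type_name)
        return merged
    raise ValueError(f"sync_method must be 'update' or 'overwrite', got {sync_method!r}")


stored = {"order_id": "INTEGER", "total": "TEXT"}
incoming = {"total": "FLOAT", "created_at": "DATETIME"}

updated = merge_schema_sketch(stored, incoming, "update")
overwritten = merge_schema_sketch(stored, incoming, "overwrite")
```

Under "update" the stored type for `total` is kept even though the incoming schema disagrees, so manual corrections in the file are not lost; "overwrite" discards them along with any columns missing from the new schema.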