#!/usr/bin/env python
######################################
# Imports
######################################
import hydra
import networkx as nx
from omegaconf import DictConfig
from os.path import join as join_path
import pandas as pd
from pathlib import Path
######################################
# Functions
######################################
[docs]
def process_network(
feature_matrix: pd.DataFrame,
edge_list: pd.DataFrame,
from_col: str,
to_col: str,
len_component: int = 5,
) -> tuple[pd.DataFrame, pd.DataFrame]:
"""
Construct a graph from edge list data.
Args:
feature_matrix (pd.DataFrame):
The feature matrix.
edge_list (pd.DataFrame):
The edge list.
from_col (str):
The "from" column name.
to_col (str):
The "to" column name.
len_component (int, optional):
The minimum size of a subgraph to filter out. Defaults to 5.
Returns:
tuple[pd.DataFrame, pd.DataFrame]:
The processed graph as a feature matrix and edge list.
"""
edges = edge_list.sort_values(from_col)
G = nx.from_pandas_edgelist(edges, from_col, to_col, create_using=nx.Graph())
for component in list(nx.connected_components(G)):
if len(component) <= len_component:
for node in component:
G.remove_node(node)
nodes = list(G.nodes)
filtered_feature_matrix = feature_matrix[nodes]
filtered_edge_list = nx.to_pandas_edgelist(G, source=from_col, target=to_col)
return filtered_feature_matrix, filtered_edge_list
[docs]
def log_results(
tracking_uri: str,
experiment_prefix: str,
grn_name: str,
feature_matrix: pd.DataFrame,
edge_list: pd.DataFrame,
) -> None:
"""
Log experiment results to the experiment tracker.
Args:
tracking_uri (str):
The tracking URI.
experiment_prefix (str):
The experiment name prefix.
grn_name (str):
The name of the GRN.
feature_matrix (pd.DataFrame):
The feature matrix.
edge_list (pd.DataFrame):
The edge list.
"""
import mlflow
mlflow.set_tracking_uri(tracking_uri)
experiment_name = f"{experiment_prefix}_process"
existing_exp = mlflow.get_experiment_by_name(experiment_name)
if not existing_exp:
mlflow.create_experiment(experiment_name)
mlflow.set_experiment(experiment_name)
mlflow.set_tag("grn", grn_name)
mlflow.log_param("grn", grn_name)
mlflow.log_metric("num_features", len(feature_matrix.index))
mlflow.log_metric("num_nodes", len(feature_matrix.columns))
mlflow.log_metric("num_edges", len(edge_list.index))
mlflow.end_run()
######################################
# Main
######################################
[docs]
@hydra.main(version_base=None, config_path="../conf", config_name="config")
def main(config: DictConfig) -> None:
"""
The main entry point for the plotting pipeline.
Args:
config (DictConfig):
The pipeline configuration.
"""
# Constants
EXPERIMENT_PREFIX = config["experiment"]["name"]
DATA_DIR = config["dir"]["data_dir"]
PREPROCESS_DIR = config["dir"]["preprocessed_dir"]
PROCESS_DIR = config["dir"]["processed_dir"]
OUT_DIR = config["dir"]["out_dir"]
GRN_NAME = config["grn"]["input_dir"]
FEATURE_MATRIX_FILE = config["grn"]["feature_matrix"]
EDGE_LIST_FILE = config["grn"]["edge_list"]
FROM_COL = config["grn"]["from_col"]
TO_COL = config["grn"]["to_col"]
TRACKING_URI = config["experiment_tracking"]["tracking_uri"]
ENABLE_TRACKING = config["experiment_tracking"]["enabled"]
input_dir = join_path(DATA_DIR, PREPROCESS_DIR, GRN_NAME)
feature_matrix = pd.read_csv(join_path(input_dir, FEATURE_MATRIX_FILE))
edge_list = pd.read_csv(join_path(input_dir, EDGE_LIST_FILE))
filtered_feature_matrix, filtered_edge_list = process_network(
feature_matrix, edge_list, FROM_COL, TO_COL
)
output_dir = join_path(DATA_DIR, OUT_DIR, GRN_NAME, PROCESS_DIR)
Path(output_dir).mkdir(parents=True, exist_ok=True)
filtered_feature_matrix.to_csv(join_path(output_dir, FEATURE_MATRIX_FILE))
filtered_edge_list.to_csv(join_path(output_dir, EDGE_LIST_FILE), index=False)
if ENABLE_TRACKING:
log_results(
TRACKING_URI,
EXPERIMENT_PREFIX,
GRN_NAME,
filtered_feature_matrix,
filtered_edge_list,
)
if __name__ == "__main__":
main()