# Hetionet v1.0 ETL script (Python, ~471 lines)
"""Hetionet v1.0 ETL: flatten the JSON release into Neo4j-loadable CSVs."""
import json
from collections import defaultdict
from pathlib import Path

import pandas as pd

# --- configuration ---
INPUT_JSON = "hetionet-v1.0.json"
OUTPUT_DIR = Path("neo4j_csv")
OUTPUT_DIR.mkdir(exist_ok=True)

_BANNER = "=" * 60
print(_BANNER)
print("HETIONET ETL PIPELINE (OPTIMIZED + SPLIT EDGES)")
print(_BANNER)
# --- phase 1: extraction ---
print("\nPHASE 1: EXTRACTION")
print("-"*60)
print("Loading JSON data...")

# Load the whole release in one pass; the file closes via the context manager.
with open(INPUT_JSON, "r", encoding="utf-8") as fh:
    data = json.load(fh)

nodes_raw = data["nodes"]
edges_raw = data["edges"]

print(f"Nodes loaded: {len(nodes_raw):,}")
print(f"Edges loaded: {len(edges_raw):,}")
# --- phase 2: transform nodes ---
print("\nPHASE 2: TRANSFORM NODES")
print("-"*60)

# Flatten each node record: core identifier/name/kind fields plus any extra
# attributes nested under "data".
nodes_flat = []
for node in nodes_raw:
    row = {
        "id": str(node["identifier"]),
        "name": node.get("name"),
        "kind": node["kind"],
    }
    extra = node.get("data")
    if isinstance(extra, dict):
        for key, value in extra.items():
            # NOTE(review): a "data" key named id/name/kind would clobber the
            # core fields above — Hetionet does not appear to use those names,
            # but confirm if the input ever changes.
            row[key] = value
    nodes_flat.append(row)

nodes_df = pd.DataFrame(nodes_flat)

# Make column names Neo4j-safe. regex=False is essential here: under the
# pre-2.0 pandas default (regex=True) the "." pattern matches EVERY character
# and would turn all column names into underscores.
for bad_char in (" ", "-", "."):
    nodes_df.columns = nodes_df.columns.str.replace(bad_char, "_", regex=False)

print(f"Processed {len(nodes_df):,} nodes")
print(f" Columns: {', '.join(nodes_df.columns[:5])}...")
# Lookup tables: node id -> kind and node id -> display name.
node_id_to_kind = {nid: kind for nid, kind in zip(nodes_df['id'], nodes_df['kind'])}
node_id_to_name = {nid: name for nid, name in zip(nodes_df['id'], nodes_df['name'])}


def _ids_of_kind(kind_label):
    """Return the set of node ids of the given kind (O(1) membership tests)."""
    return set(nodes_df.loc[nodes_df['kind'] == kind_label, 'id'])


gene_ids = _ids_of_kind('Gene')
disease_ids = _ids_of_kind('Disease')
symptom_ids = _ids_of_kind('Symptom')
compound_ids = _ids_of_kind('Compound')
sideeffect_ids = _ids_of_kind('Side Effect')

print(f"\nNode Statistics:")
print(f" - Genes: {len(gene_ids):,}")
print(f" - Diseases: {len(disease_ids):,}")
print(f" - Symptoms: {len(symptom_ids):,}")
print(f" - Compounds: {len(compound_ids):,}")
print(f" - Side Effects: {len(sideeffect_ids):,}")
# Write one CSV per node kind. The kind is encoded in the filename, so the
# column itself is dropped; duplicate ids within a kind are collapsed.
print("\nExporting node files...")
for node_kind in nodes_df["kind"].unique():
    subset = (
        nodes_df.loc[nodes_df["kind"] == node_kind]
        .drop(columns=["kind"])
        .drop_duplicates(subset=["id"])
    )
    out_path = OUTPUT_DIR / f"nodes_{node_kind.replace(' ', '_')}.csv"
    subset.to_csv(out_path, index=False)
    print(f" {out_path.name} ({len(subset):,} rows)")
# --- phase 3: transform edges ---
print("\nPHASE 3: TRANSFORM EDGES")
print("-"*60)

total_edges = len(edges_raw)
edges = []
for idx, edge in enumerate(edges_raw):
    # Progress report every 500k edges (skipping the zeroth row).
    if idx and idx % 500000 == 0:
        print(f" Processing: {idx:,} / {total_edges:,}...")
    # source_id/target_id are sequences; element 1 carries the identifier.
    edges.append({
        "source": str(edge["source_id"][1]),
        "target": str(edge["target_id"][1]),
        "type": edge["kind"],
    })

edges_df = pd.DataFrame(edges)

# Neo4j relationship types cannot contain spaces or hyphens.
edges_df["type"] = (
    edges_df["type"].str.replace(" ", "_").str.replace("-", "_")
)
# --- split edges into separate files, one per relationship type ---
print("\nExporting edges by type to separate CSV files...")
print("-"*60)

edge_types = edges_df['type'].unique()
for edge_type in sorted(edge_types):
    # The type column is omitted: the relationship type lives in the filename.
    subset = edges_df.loc[edges_df['type'] == edge_type, ['source', 'target']]
    out_path = OUTPUT_DIR / f"edges_{edge_type}.csv"
    subset.to_csv(out_path, index=False)
    size_mb = out_path.stat().st_size / (1024 * 1024)
    print(f" ✓ edges_{edge_type:20s}.csv ({len(subset):>10,} rows, {size_mb:>6.2f} MB)")

# Combined file kept for backward compatibility.
edges_file = OUTPUT_DIR / "edges_all.csv"
edges_df.to_csv(edges_file, index=False)
print(f"\n ✓ edges_all.csv (combined) ({len(edges_df):,} rows)")

print(f"\nSummary:")
print(f" Total edges: {len(edges_df):,}")
print(f" Split into {len(edge_types)} separate CSV files")
print(f" Each file can be loaded independently!")

# Cache one sub-frame per relationship type for the analyses below.
print("\nEdge type distribution:")
edges_by_type = {}
_n_edges = len(edges_df)
for edge_type in sorted(edge_types):
    sub = edges_df[edges_df['type'] == edge_type].copy()
    edges_by_type[edge_type] = sub
    count = len(sub)
    pct = 100 * count / _n_edges
    print(f" - {edge_type:20s}: {count:>10,} ({pct:>5.1f}%)")
print("\n" + "="*60)
print("PHASE 4: ANALYSES")
print("="*60)

# --- analysis 1: hotspot genes (genes linked to many diseases) ---
print("\nAnalysis 1: Hotspot Genes")
print("-"*60)

# Fallback with explicit columns: a bare pd.DataFrame() has no columns, so a
# missing relationship type would raise KeyError on the filter below.
_EMPTY_EDGES = pd.DataFrame(columns=['source', 'target', 'type'])

gene_disease_edges = pd.concat([
    edges_by_type.get('associates', _EMPTY_EDGES),
    edges_by_type.get('regulates', _EMPTY_EDGES),
    edges_by_type.get('upregulates', _EMPTY_EDGES),
    edges_by_type.get('downregulates', _EMPTY_EDGES),
    edges_by_type.get('binds', _EMPTY_EDGES),
])

# Keep only Disease -> Gene rows (source must be a disease, target a gene).
gene_disease_edges = gene_disease_edges[
    gene_disease_edges['source'].isin(disease_ids) &
    gene_disease_edges['target'].isin(gene_ids)
]

# Disease->Gene edge count per gene (duplicate pairs across edge kinds
# count separately).
gene_counts = gene_disease_edges.groupby('target').size().reset_index(name='num_diseases')

genes_df = nodes_df[nodes_df['kind'] == 'Gene'].merge(
    gene_counts, left_on='id', right_on='target', how='left'
)
# Genes with no disease link get NaN from the left join; counts are integers.
genes_df['num_diseases'] = genes_df['num_diseases'].fillna(0).astype(int)
if 'target' in genes_df.columns:
    genes_df.drop(columns=['target'], inplace=True)

# Re-export the Gene node file, now annotated with num_diseases (this
# intentionally overwrites the plain file written in phase 2).
genes_df_sorted = genes_df.sort_values('num_diseases', ascending=False)
genes_df_sorted.to_csv(OUTPUT_DIR / "nodes_Gene.csv", index=False)

if len(genes_df_sorted):
    _top = genes_df_sorted.iloc[0]
    print(f"Top gene: {_top['name']} ({int(_top['num_diseases'])} diseases)")
# --- analysis 2: disease symptom diversity ---
print("\nAnalysis 2: Disease Symptom Diversity")
print("-"*60)

# Explicit fallback columns prevent a KeyError on the filter below when the
# 'presents' relationship type is absent.
disease_symptom_edges = edges_by_type.get(
    'presents', pd.DataFrame(columns=['source', 'target', 'type'])
)
# Keep only Disease -> Symptom rows.
disease_symptom_edges = disease_symptom_edges[
    disease_symptom_edges['source'].isin(disease_ids) &
    disease_symptom_edges['target'].isin(symptom_ids)
]

# Symptom count per disease.
disease_counts = disease_symptom_edges.groupby('source').size().reset_index(name='num_symptoms')

disease_df = nodes_df[nodes_df['kind'] == 'Disease'].merge(
    disease_counts, left_on='id', right_on='source', how='left'
)
# Diseases with no symptoms get NaN from the left join; counts are integers.
disease_df['num_symptoms'] = disease_df['num_symptoms'].fillna(0).astype(int)
if 'source' in disease_df.columns:
    disease_df.drop(columns=['source'], inplace=True)

# Re-export the Disease node file, now annotated with num_symptoms.
disease_df_sorted = disease_df.sort_values('num_symptoms', ascending=False)
disease_df_sorted.to_csv(OUTPUT_DIR / "nodes_Disease.csv", index=False)

if len(disease_df_sorted):
    _top = disease_df_sorted.iloc[0]
    print(f"Top disease: {_top['name']} ({int(_top['num_symptoms'])} symptoms)")
# --- build adjacency indices for the drug analyses ---
print("\nBuilding indices for drug analyses...")

# zip over the column arrays is far faster than DataFrame.iterrows, which
# constructs a Series object for every row.
disease_to_genes = defaultdict(set)
gene_to_diseases = defaultdict(set)
for src, tgt in zip(gene_disease_edges['source'], gene_disease_edges['target']):
    disease_to_genes[src].add(tgt)
    gene_to_diseases[tgt].add(src)

drug_to_diseases = defaultdict(set)
disease_to_drugs = defaultdict(set)
# Explicit fallback columns so a missing 'treats' type yields an empty frame
# instead of a KeyError here and in analysis 8.
treats_edges = edges_by_type.get('treats', pd.DataFrame(columns=['source', 'target', 'type']))
for src, tgt in zip(treats_edges['source'], treats_edges['target']):
    drug_to_diseases[src].add(tgt)
    disease_to_drugs[tgt].add(src)

symptom_to_diseases = defaultdict(set)
for src, tgt in zip(disease_symptom_edges['source'], disease_symptom_edges['target']):
    symptom_to_diseases[tgt].add(src)

print("\nETL (Extract, Transform, Load CSV files) COMPLETED!")
print("="*60)
print("\n📁 Generated CSV files:")
print(f" - Node files: 5")
print(f" - Edge files (split): {len(edge_types)}")
print(f" - Edge file (combined): 1")
print(f" - Analysis files: Various")
print(f"\n💡 For faster Neo4j loading, use the split edge files:")
print(f" edges_associates.csv, edges_treats.csv, etc.")
print(f" Instead of the combined edges_all.csv")
# --- analysis 3: drug repurposing via shared disease-associated genes ---
print("\nAnalysis 3: Drug Repurposing Opportunities")
print("-"*60)

repurposing_candidates = []
# sorted() makes the 100-disease sample reproducible across runs; plain
# list(set) order varies under Python's string-hash randomization.
for disease_id in sorted(disease_ids)[:100]:
    genes = disease_to_genes.get(disease_id, set())
    if not genes:
        continue

    # Diseases sharing at least one associated gene with this one.
    related_diseases = set()
    for gene in genes:
        related_diseases.update(gene_to_diseases[gene])
    related_diseases.discard(disease_id)

    # Drugs that treat any related disease...
    candidate_drugs = set()
    for related_disease in related_diseases:
        candidate_drugs.update(disease_to_drugs[related_disease])

    # ...minus those already treating this disease.
    existing_treatments = disease_to_drugs[disease_id]
    new_candidates = candidate_drugs - existing_treatments

    # Cap at 10 candidates per disease; sorted() keeps the pick deterministic.
    for drug_id in sorted(new_candidates)[:10]:
        repurposing_candidates.append({
            'disease': node_id_to_name.get(disease_id, disease_id),
            'candidate_drug': node_id_to_name.get(drug_id, drug_id),
            # NOTE(review): this is the disease's own gene count, not the
            # gene overlap with the related disease; renaming the column
            # would change the output schema, so it is left as-is.
            'shared_genes': len(genes)
        })

repurposing_df = pd.DataFrame(repurposing_candidates)
if len(repurposing_df) > 0:
    repurposing_df = repurposing_df.sort_values('shared_genes', ascending=False)
    repurposing_df.to_csv(OUTPUT_DIR / "analysis_drug_repurposing.csv", index=False)
    print(f"Found {len(repurposing_df):,} repurposing opportunities")
# --- analysis 4: polypharmacy risk (side-effect burden per drug) ---
print("\nAnalysis 4: Polypharmacy Risk")
print("-"*60)

# Explicit fallback columns: without them an absent 'causes' type raises
# KeyError on the filter below instead of yielding an empty frame.
causes_edges = edges_by_type.get('causes', pd.DataFrame(columns=['source', 'target', 'type']))
# Keep only Compound -> Side Effect rows.
drug_sideeffects = causes_edges[
    causes_edges['source'].isin(compound_ids) &
    causes_edges['target'].isin(sideeffect_ids)
]

if len(drug_sideeffects) > 0:
    drug_risk = drug_sideeffects.groupby('source').size().reset_index(name='num_side_effects')
    drug_risk['name'] = drug_risk['source'].map(node_id_to_name)
    drug_risk['num_diseases_treated'] = drug_risk['source'].apply(
        lambda x: len(drug_to_diseases.get(x, set()))
    )
    # +1 avoids division by zero for drugs that treat no disease.
    drug_risk['risk_score'] = drug_risk['num_side_effects'] / (drug_risk['num_diseases_treated'] + 1)
    # NOTE(review): output is ranked by raw side-effect count, not by
    # risk_score — confirm which ordering downstream consumers expect.
    drug_risk_sorted = drug_risk.sort_values('num_side_effects', ascending=False)
    drug_risk_sorted.to_csv(OUTPUT_DIR / "analysis_polypharmacy_risk.csv", index=False)
    print(f"Analyzed {len(drug_risk_sorted):,} drugs for side effects")
# --- analysis 5: symptom -> disease -> drug triangle ---
print("\nAnalysis 5: Symptom-Disease-Drug Triangle")
print("-"*60)

drugs_with_sideeffects_set = set(drug_sideeffects['source']) if len(drug_sideeffects) > 0 else set()

symptom_analysis = []
# sorted() keeps the 100-symptom sample stable across runs (raw set order
# varies under string-hash randomization).
for symptom_id in sorted(symptom_ids)[:100]:
    diseases = symptom_to_diseases.get(symptom_id, set())
    if not diseases:
        continue

    # All drugs treating any disease that presents this symptom.
    treating_drugs = set()
    for disease in diseases:
        treating_drugs.update(disease_to_drugs[disease])

    drugs_with_se = len(treating_drugs & drugs_with_sideeffects_set)

    symptom_analysis.append({
        'symptom': node_id_to_name.get(symptom_id, symptom_id),
        'num_diseases': len(diseases),
        'num_treating_drugs': len(treating_drugs),
        'drugs_with_side_effects': drugs_with_se,
        'impact_score': len(diseases) * len(treating_drugs)
    })

symptom_triangle_df = pd.DataFrame(symptom_analysis)
if len(symptom_triangle_df) > 0:
    symptom_triangle_df = symptom_triangle_df.sort_values('impact_score', ascending=False)
    symptom_triangle_df.to_csv(OUTPUT_DIR / "analysis_symptom_triangle.csv", index=False)
    print(f"Analyzed {len(symptom_triangle_df):,} symptoms")
# --- analysis 6: super-drug score (many diseases treated, few side effects) ---
print("\nAnalysis 6: Super-Drug Score")
print("-"*60)

# Precompute side-effect counts once (one pass over the edges) instead of
# re-filtering the whole drug_sideeffects frame for every compound, which
# was accidentally O(num_drugs * num_edges).
se_counts = (
    drug_sideeffects['source'].value_counts().to_dict()
    if len(drug_sideeffects) > 0 else {}
)

super_drugs = []
# sorted() gives a deterministic row order (set iteration order is not).
for drug_id in sorted(compound_ids):
    num_diseases = len(drug_to_diseases.get(drug_id, set()))
    if num_diseases == 0:
        continue

    num_se = se_counts.get(drug_id, 0)
    # +1 keeps the score finite for drugs with zero recorded side effects.
    super_score = num_diseases / (1 + num_se)

    super_drugs.append({
        'name': node_id_to_name.get(drug_id, drug_id),
        'num_diseases_treated': num_diseases,
        'num_side_effects': num_se,
        'super_score': super_score
    })

super_drugs_df = pd.DataFrame(super_drugs)
if len(super_drugs_df) > 0:
    super_drugs_df = super_drugs_df.sort_values('super_score', ascending=False)
    super_drugs_df.to_csv(OUTPUT_DIR / "analysis_super_drugs.csv", index=False)
    print(f"Analyzed {len(super_drugs_df):,} drugs")
# --- analysis 7: drug conflicts (overlapping side-effect profiles) ---
print("\nAnalysis 7: Drug Conflicts")
print("-"*60)

# Map each drug to the set of side effects it causes.
drug_to_sideeffects = defaultdict(set)
for _, row in drug_sideeffects.iterrows():
    drug_to_sideeffects[row['source']].add(row['target'])

drug_conflicts = []
drugs_list = list(drug_to_sideeffects.keys())
n_drugs = len(drugs_list)

# Each drug is compared only against a window of the next 50 drugs, which
# bounds the otherwise-quadratic pairwise comparison.
for idx, drug1_id in enumerate(drugs_list):
    if idx and idx % 100 == 0:
        print(f" Processing: {idx}/{n_drugs}...")

    drug1_se = drug_to_sideeffects[drug1_id]

    for drug2_id in drugs_list[idx + 1:idx + 51]:
        drug2_se = drug_to_sideeffects[drug2_id]
        overlap = drug1_se & drug2_se
        if len(overlap) < 10:
            continue
        drug_conflicts.append({
            'drug1': node_id_to_name.get(drug1_id, drug1_id),
            'drug2': node_id_to_name.get(drug2_id, drug2_id),
            'shared_side_effects': len(overlap),
            'drug1_total_se': len(drug1_se),
            'drug2_total_se': len(drug2_se),
            'overlap_percentage': (len(overlap) / min(len(drug1_se), len(drug2_se))) * 100
        })

drug_conflicts_df = pd.DataFrame(drug_conflicts)
if len(drug_conflicts_df) > 0:
    drug_conflicts_df = drug_conflicts_df.sort_values('shared_side_effects', ascending=False)
    drug_conflicts_df.to_csv(OUTPUT_DIR / "analysis_drug_conflicts.csv", index=False)
    print(f"Found {len(drug_conflicts_df):,} drug conflict pairs")
# --- analysis 8: small network extract for visualization ---
print("\nAnalysis 8: Network Visualization Data")
print("-"*60)

top_diseases = disease_df_sorted.nlargest(20, 'num_symptoms')['id'].tolist()

network_nodes = []
network_edges = []
node_id_counter = 0
id_mapping = {}  # original node id -> compact visualization id


def _register_node(original_id, prefix, node_type):
    """Assign a compact id to original_id (once) and record its node row."""
    global node_id_counter
    if original_id in id_mapping:
        return
    compact_id = f"{prefix}_{node_id_counter}"
    id_mapping[original_id] = compact_id
    network_nodes.append({
        'id': compact_id,
        'label': node_id_to_name.get(original_id, original_id),
        'type': node_type,
        'original_id': original_id
    })
    node_id_counter += 1


# Disease nodes: the 20 diseases with the most symptoms.
for disease_id in top_diseases:
    _register_node(disease_id, 'd', 'Disease')

# Gene neighbours of those diseases (first 150 qualifying edges).
disease_genes = gene_disease_edges[
    gene_disease_edges['source'].isin(top_diseases)
].head(150)

for _, row in disease_genes.iterrows():
    _register_node(row['target'], 'g', 'Gene')
    network_edges.append({
        'source': id_mapping[row['source']],
        'target': id_mapping[row['target']],
        'type': 'associates'
    })

# Drugs treating those diseases (first 50 qualifying edges).
drug_treatments = treats_edges[treats_edges['target'].isin(top_diseases)].head(50)

for _, row in drug_treatments.iterrows():
    _register_node(row['source'], 'c', 'Compound')
    network_edges.append({
        'source': id_mapping[row['source']],
        'target': id_mapping[row['target']],
        'type': 'treats'
    })

network_nodes_df = pd.DataFrame(network_nodes)
network_edges_df = pd.DataFrame(network_edges)

network_nodes_df.to_csv(OUTPUT_DIR / "network_nodes.csv", index=False)
network_edges_df.to_csv(OUTPUT_DIR / "network_edges.csv", index=False)

print(f"Created network with {len(network_nodes_df):,} nodes and {len(network_edges_df):,} edges")