Files
ETL_Datamanagement/etl.py
Philipp Jacoby 3003310be0 finishes setup
2026-02-10 17:43:26 +01:00

473 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import pandas as pd
from pathlib import Path
from collections import defaultdict
# CONFIGURATION
INPUT_JSON = "hetionet-v1.0.json"
OUTPUT_DIR = Path("neo4j_csv")
OUTPUT_DIR.mkdir(exist_ok=True)

print("="*60)
print("HETIONET ETL PIPELINE (OPTIMIZED + SPLIT EDGES)")
print("="*60)

# EXTRACT
print("\nPHASE 1: EXTRACTION")
print("-"*60)
print("Loading JSON data...")
# The whole Hetionet dump is one large JSON document; read it in one go.
data = json.loads(Path(INPUT_JSON).read_text(encoding="utf-8"))
nodes_raw = data["nodes"]
edges_raw = data["edges"]
print(f"Nodes loaded: {len(nodes_raw):,}")
print(f"Edges loaded: {len(edges_raw):,}")
# TRANSFORM NODES
print("\nPHASE 2: TRANSFORM NODES")
print("-"*60)

def _flatten_node(node):
    """Flatten one raw Hetionet node into a single CSV-ready dict.

    Core fields (id/name/kind) come first; any keys in the optional
    'data' sub-dict are spread into the row as extra columns (and may
    overwrite the core fields, matching the original loop's semantics).
    """
    row = {
        "id": str(node["identifier"]),
        "name": node.get("name"),
        "kind": node["kind"],
    }
    extra = node.get("data")
    if isinstance(extra, dict):
        row.update(extra)
    return row

nodes_df = pd.DataFrame([_flatten_node(node) for node in nodes_raw])

# Make column names Neo4j-safe. regex=False matters: with the legacy
# pandas default (regex=True), replacing "." would match *every* character
# and mangle all column names.
nodes_df.columns = (
    nodes_df.columns
    .str.replace(" ", "_", regex=False)
    .str.replace("-", "_", regex=False)
    .str.replace(".", "_", regex=False)
)
print(f"Processed {len(nodes_df):,} nodes")
print(f" Columns: {', '.join(nodes_df.columns[:5])}...")
# Lookup tables: node id -> kind / display name.
node_id_to_kind = dict(zip(nodes_df['id'], nodes_df['kind']))
node_id_to_name = dict(zip(nodes_df['id'], nodes_df['name']))

def _ids_of(kind_label):
    """Return the set of node ids whose kind equals *kind_label* (O(1) membership)."""
    return set(nodes_df.loc[nodes_df['kind'] == kind_label, 'id'])

gene_ids = _ids_of('Gene')
disease_ids = _ids_of('Disease')
symptom_ids = _ids_of('Symptom')
compound_ids = _ids_of('Compound')
sideeffect_ids = _ids_of('Side Effect')

print(f"\nNode Statistics:")
print(f" - Genes: {len(gene_ids):,}")
print(f" - Diseases: {len(disease_ids):,}")
print(f" - Symptoms: {len(symptom_ids):,}")
print(f" - Compounds: {len(compound_ids):,}")
print(f" - Side Effects: {len(sideeffect_ids):,}")
# Export one CSV per node kind (the kind is encoded in the file name,
# so the 'kind' column itself is dropped).
print("\nExporting node files...")
for kind in nodes_df["kind"].unique():
    subset = (
        nodes_df.loc[nodes_df["kind"] == kind]
        .drop(columns=["kind"])
        .drop_duplicates(subset=["id"])
    )
    out_path = OUTPUT_DIR / f"nodes_{kind.replace(' ', '_')}.csv"
    subset.to_csv(out_path, index=False)
    print(f" {out_path.name} ({len(subset):,} rows)")
# TRANSFORM EDGES
print("\nPHASE 3: TRANSFORM EDGES")
print("-"*60)

edges = []
total_edges = len(edges_raw)
for i, edge in enumerate(edges_raw):
    # Progress heartbeat for the multi-million-row edge list.
    if i and i % 500000 == 0:
        print(f" Processing: {i:,} / {total_edges:,}...")
    # source_id/target_id are (kind, identifier) pairs; only the identifier
    # is kept — assumes identifiers don't collide across kinds (TODO confirm).
    edges.append({
        "source": str(edge["source_id"][1]),
        "target": str(edge["target_id"][1]),
        "type": edge["kind"],
    })
edges_df = pd.DataFrame(edges)

# Make relationship type names Neo4j-safe.
edges_df["type"] = edges_df["type"].str.replace(" ", "_").str.replace("-", "_")
# Split edges into separate files (one CSV per relationship type).
print("\nExporting edges by type to separate CSV files...")
print("-"*60)
edge_types = edges_df['type'].unique()
for edge_type in sorted(edge_types):
    # Only source and target are exported; the type lives in the file name.
    subset = edges_df.loc[edges_df['type'] == edge_type, ['source', 'target']]
    out_path = OUTPUT_DIR / f"edges_{edge_type}.csv"
    subset.to_csv(out_path, index=False)
    size_mb = out_path.stat().st_size / (1024*1024)
    print(f" ✓ edges_{edge_type:20s}.csv ({len(subset):>10,} rows, {size_mb:>6.2f} MB)")

# Also keep the combined file for backward compatibility.
edges_file = OUTPUT_DIR / "edges_all.csv"
edges_df.to_csv(edges_file, index=False)
print(f"\n ✓ edges_all.csv (combined) ({len(edges_df):,} rows)")

print(f"\nSummary:")
print(f" Total edges: {len(edges_df):,}")
print(f" Split into {len(edge_types)} separate CSV files")
print(f" Each file can be loaded independently!")
# Pre-filter edges by type for the analyses below.
# A single groupby pass replaces one full-table boolean scan per type
# (same per-type frames and row order, just O(edges) instead of
# O(types * edges)).
print("\nEdge type distribution:")
edges_by_type = {etype: group.copy() for etype, group in edges_df.groupby('type')}
for edge_type in sorted(edge_types):
    count = len(edges_by_type[edge_type])
    pct = 100 * count / len(edges_df)
    print(f" - {edge_type:20s}: {count:>10,} ({pct:>5.1f}%)")
print("\n" + "="*60)
print("PHASE 4: ANALYSES")
print("="*60)

# ANALYSIS 1: HOTSPOT GENES
# Count, per gene, how many diseases link to it via any gene-related edge.
print("\nAnalysis 1: Hotspot Genes")
print("-"*60)

# Column-bearing fallback: a bare pd.DataFrame() has no 'source'/'target'
# columns, so the filters below would raise KeyError if a type is absent.
_EMPTY_EDGES = pd.DataFrame(columns=['source', 'target', 'type'])
_GENE_EDGE_TYPES = ['associates', 'regulates', 'upregulates', 'downregulates', 'binds']
gene_disease_edges = pd.concat(
    [edges_by_type.get(t, _EMPTY_EDGES) for t in _GENE_EDGE_TYPES]
)
# Keep only Disease -> Gene pairs (the raw types also carry e.g.
# Compound->Gene edges, which this analysis ignores).
gene_disease_edges = gene_disease_edges[
    gene_disease_edges['source'].isin(disease_ids) &
    gene_disease_edges['target'].isin(gene_ids)
]

gene_counts = gene_disease_edges.groupby('target').size().reset_index(name='num_diseases')
genes_df = nodes_df[nodes_df['kind'] == 'Gene'].merge(
    gene_counts, left_on='id', right_on='target', how='left'
)
genes_df['num_diseases'] = genes_df['num_diseases'].fillna(0)
if 'target' in genes_df.columns:
    genes_df.drop(columns=['target'], inplace=True)

genes_df_sorted = genes_df.sort_values('num_diseases', ascending=False)
# NOTE: deliberately overwrites the plain nodes_Gene.csv from Phase 2
# with a version enriched by the num_diseases column.
genes_df_sorted.to_csv(OUTPUT_DIR / "nodes_Gene.csv", index=False)
# Guard against an empty gene table so iloc[0] cannot crash the pipeline.
if len(genes_df_sorted) > 0:
    top_gene = genes_df_sorted.iloc[0]
    print(f"Top gene: {top_gene['name']} ({int(top_gene['num_diseases'])} diseases)")
# ANALYSIS 2: DISEASE SYMPTOM DIVERSITY
# Count, per disease, how many symptoms it presents.
print("\nAnalysis 2: Disease Symptom Diversity")
print("-"*60)

# Column-bearing fallback so the boolean filters below cannot raise
# KeyError when no 'presents' edges exist.
disease_symptom_edges = edges_by_type.get(
    'presents', pd.DataFrame(columns=['source', 'target', 'type'])
)
disease_symptom_edges = disease_symptom_edges[
    disease_symptom_edges['source'].isin(disease_ids) &
    disease_symptom_edges['target'].isin(symptom_ids)
]

disease_counts = disease_symptom_edges.groupby('source').size().reset_index()
disease_counts.columns = ['source', 'num_symptoms']
disease_df = nodes_df[nodes_df['kind'] == 'Disease'].merge(
    disease_counts, left_on='id', right_on='source', how='left'
)
disease_df['num_symptoms'] = disease_df['num_symptoms'].fillna(0)
if 'source' in disease_df.columns:
    disease_df.drop(columns=['source'], inplace=True)

disease_df_sorted = disease_df.sort_values('num_symptoms', ascending=False)
# Overwrites the Phase 2 nodes_Disease.csv with the enriched version.
disease_df_sorted.to_csv(OUTPUT_DIR / "nodes_Disease.csv", index=False)
# Guard against an empty disease table so iloc[0] cannot crash.
if len(disease_df_sorted) > 0:
    top_disease = disease_df_sorted.iloc[0]
    print(f"Top disease: {top_disease['name']} ({int(top_disease['num_symptoms'])} symptoms)")
# Build adjacency indices (dict of sets) for the drug analyses.
print("\nBuilding indices for drug analyses...")

disease_to_genes = defaultdict(set)
gene_to_diseases = defaultdict(set)
# zip over the two columns is far faster than DataFrame.iterrows(),
# which materializes a Series object per row.
for src, tgt in zip(gene_disease_edges['source'], gene_disease_edges['target']):
    disease_to_genes[src].add(tgt)
    gene_to_diseases[tgt].add(src)

drug_to_diseases = defaultdict(set)
disease_to_drugs = defaultdict(set)
# Column-bearing fallback: treats_edges is also used later with
# ['target'], which would KeyError on a column-less empty frame.
treats_edges = edges_by_type.get(
    'treats', pd.DataFrame(columns=['source', 'target', 'type'])
)
for src, tgt in zip(treats_edges['source'], treats_edges['target']):
    drug_to_diseases[src].add(tgt)
    disease_to_drugs[tgt].add(src)

symptom_to_diseases = defaultdict(set)
for src, tgt in zip(disease_symptom_edges['source'], disease_symptom_edges['target']):
    symptom_to_diseases[tgt].add(src)
# ETL completion banner.
_summary_lines = (
    "\nETL (Extract, Transform, Load CSV files) COMPLETED!",
    "="*60,
    "\n📁 Generated CSV files:",
    " - Node files: 5",
    f" - Edge files (split): {len(edge_types)}",
    " - Edge file (combined): 1",
    " - Analysis files: Various",
    "\n💡 For faster Neo4j loading, use the split edge files:",
    " edges_associates.csv, edges_treats.csv, etc.",
    " Instead of the combined edges_all.csv",
)
for _line in _summary_lines:
    print(_line)
# ANALYSIS 3: DRUG REPURPOSING
# Heuristic: diseases sharing associated genes may share treatments, so
# drugs used for a gene-related disease become repurposing candidates.
print("\nAnalysis 3: Drug Repurposing Opportunities")
print("-"*60)

repurposing_candidates = []
# sorted() makes the 100-disease sample deterministic: slicing a raw set
# picks different elements per run (string-hash randomization).
for disease_id in sorted(disease_ids)[:100]:
    genes = disease_to_genes.get(disease_id, set())
    if not genes:
        continue
    # Diseases that share at least one associated gene.
    related_diseases = set()
    for gene in genes:
        related_diseases.update(gene_to_diseases[gene])
    related_diseases.discard(disease_id)
    # Drugs treating any related disease ...
    candidate_drugs = set()
    for related_disease in related_diseases:
        candidate_drugs.update(disease_to_drugs[related_disease])
    # ... minus drugs already used for this disease.
    existing_treatments = disease_to_drugs[disease_id]
    new_candidates = candidate_drugs - existing_treatments
    # Cap at 10 candidates per disease; sorted keeps the selection stable.
    for drug_id in sorted(new_candidates)[:10]:
        repurposing_candidates.append({
            'disease': node_id_to_name.get(disease_id, disease_id),
            'candidate_drug': node_id_to_name.get(drug_id, drug_id),
            'shared_genes': len(genes)
        })

repurposing_df = pd.DataFrame(repurposing_candidates)
if len(repurposing_df) > 0:
    repurposing_df = repurposing_df.sort_values('shared_genes', ascending=False)
    repurposing_df.to_csv(OUTPUT_DIR / "analysis_drug_repurposing.csv", index=False)
    print(f"Found {len(repurposing_df):,} repurposing opportunities")
# ANALYSIS 4: POLYPHARMACY RISK
# Relate each drug's side-effect count to how many diseases it treats.
print("\nAnalysis 4: Polypharmacy Risk")
print("-"*60)

# Column-bearing fallback: a bare pd.DataFrame() would make the filters
# below raise KeyError if there are no 'causes' edges at all.
causes_edges = edges_by_type.get(
    'causes', pd.DataFrame(columns=['source', 'target', 'type'])
)
drug_sideeffects = causes_edges[
    causes_edges['source'].isin(compound_ids) &
    causes_edges['target'].isin(sideeffect_ids)
]

if len(drug_sideeffects) > 0:
    drug_risk = drug_sideeffects.groupby('source').size().reset_index(name='num_side_effects')
    drug_risk['name'] = drug_risk['source'].map(node_id_to_name)
    drug_risk['num_diseases_treated'] = drug_risk['source'].apply(
        lambda x: len(drug_to_diseases.get(x, set()))
    )
    # +1 avoids division by zero for drugs that treat nothing.
    drug_risk['risk_score'] = drug_risk['num_side_effects'] / (drug_risk['num_diseases_treated'] + 1)
    drug_risk_sorted = drug_risk.sort_values('num_side_effects', ascending=False)
    drug_risk_sorted.to_csv(OUTPUT_DIR / "analysis_polypharmacy_risk.csv", index=False)
    print(f"Analyzed {len(drug_risk_sorted):,} drugs for side effects")
# ANALYSIS 5: SYMPTOM TRIANGLE
# For each symptom: its diseases, the drugs treating those diseases, and
# how many of those drugs have known side effects.
print("\nAnalysis 5: Symptom-Disease-Drug Triangle")
print("-"*60)

drugs_with_sideeffects_set = set(drug_sideeffects['source']) if len(drug_sideeffects) > 0 else set()
symptom_analysis = []
# sorted() makes the 100-symptom sample deterministic across runs
# (slicing a raw set picks arbitrary, hash-dependent elements).
for symptom_id in sorted(symptom_ids)[:100]:
    diseases = symptom_to_diseases.get(symptom_id, set())
    if not diseases:
        continue
    treating_drugs = set()
    for disease in diseases:
        treating_drugs.update(disease_to_drugs[disease])
    drugs_with_se = len(treating_drugs & drugs_with_sideeffects_set)
    symptom_analysis.append({
        'symptom': node_id_to_name.get(symptom_id, symptom_id),
        'num_diseases': len(diseases),
        'num_treating_drugs': len(treating_drugs),
        'drugs_with_side_effects': drugs_with_se,
        'impact_score': len(diseases) * len(treating_drugs)
    })

symptom_triangle_df = pd.DataFrame(symptom_analysis)
if len(symptom_triangle_df) > 0:
    symptom_triangle_df = symptom_triangle_df.sort_values('impact_score', ascending=False)
    symptom_triangle_df.to_csv(OUTPUT_DIR / "analysis_symptom_triangle.csv", index=False)
    print(f"Analyzed {len(symptom_triangle_df):,} symptoms")
# ANALYSIS 6: SUPER DRUGS
# Score = diseases treated / (1 + side effects): many uses, few risks.
print("\nAnalysis 6: Super-Drug Score")
print("-"*60)

# Count side effects per drug ONCE. The original filtered the whole
# drug_sideeffects frame inside the loop, which is O(drugs * edges).
if len(drug_sideeffects) > 0:
    _se_counts = drug_sideeffects['source'].value_counts()
else:
    _se_counts = pd.Series(dtype=int)

super_drugs = []
# sorted() gives a deterministic row order for tied scores.
for drug_id in sorted(compound_ids):
    num_diseases = len(drug_to_diseases.get(drug_id, set()))
    if num_diseases == 0:
        continue
    num_se = int(_se_counts.get(drug_id, 0))
    super_score = num_diseases / (1 + num_se)
    super_drugs.append({
        'name': node_id_to_name.get(drug_id, drug_id),
        'num_diseases_treated': num_diseases,
        'num_side_effects': num_se,
        'super_score': super_score
    })

super_drugs_df = pd.DataFrame(super_drugs)
if len(super_drugs_df) > 0:
    super_drugs_df = super_drugs_df.sort_values('super_score', ascending=False)
    super_drugs_df.to_csv(OUTPUT_DIR / "analysis_super_drugs.csv", index=False)
    print(f"Analyzed {len(super_drugs_df):,} drugs")
# ANALYSIS 7: DRUG CONFLICTS
# Flag drug pairs whose side-effect profiles overlap heavily.
print("\nAnalysis 7: Drug Conflicts")
print("-"*60)

drug_to_sideeffects = defaultdict(set)
for _, row in drug_sideeffects.iterrows():
    drug_to_sideeffects[row['source']].add(row['target'])

drug_conflicts = []
drugs_list = list(drug_to_sideeffects.keys())
total_drugs = len(drugs_list)
for i, drug1_id in enumerate(drugs_list):
    # Progress heartbeat.
    if i and i % 100 == 0:
        print(f" Processing: {i}/{total_drugs}...")
    drug1_se = drug_to_sideeffects[drug1_id]
    # Sampling strategy: each drug is compared only with the next 50 in
    # the list to keep the pairwise comparison tractable.
    for drug2_id in drugs_list[i+1:min(i+51, total_drugs)]:
        drug2_se = drug_to_sideeffects[drug2_id]
        overlap = drug1_se & drug2_se
        # Only report pairs sharing at least 10 side effects.
        if len(overlap) < 10:
            continue
        drug_conflicts.append({
            'drug1': node_id_to_name.get(drug1_id, drug1_id),
            'drug2': node_id_to_name.get(drug2_id, drug2_id),
            'shared_side_effects': len(overlap),
            'drug1_total_se': len(drug1_se),
            'drug2_total_se': len(drug2_se),
            'overlap_percentage': (len(overlap) / min(len(drug1_se), len(drug2_se))) * 100
        })

drug_conflicts_df = pd.DataFrame(drug_conflicts)
if len(drug_conflicts_df) > 0:
    drug_conflicts_df = drug_conflicts_df.sort_values('shared_side_effects', ascending=False)
    drug_conflicts_df.to_csv(OUTPUT_DIR / "analysis_drug_conflicts.csv", index=False)
    print(f"Found {len(drug_conflicts_df):,} drug conflict pairs")
# ANALYSIS 8: NETWORK DATA
# Build a small visualization graph: the 20 most symptom-rich diseases
# plus a sample of their associated genes and treating drugs.
print("\nAnalysis 8: Network Visualization Data")
print("-"*60)

top_diseases = disease_df_sorted.nlargest(20, 'num_symptoms')['id'].tolist()
network_nodes = []
network_edges = []
node_id_counter = 0
id_mapping = {}  # original node id -> compact visualization id

def _register_node(original_id, prefix, node_type):
    """Assign the next compact '<prefix>_<n>' id to *original_id* and record the node."""
    global node_id_counter
    compact_id = f"{prefix}_{node_id_counter}"
    id_mapping[original_id] = compact_id
    network_nodes.append({
        'id': compact_id,
        'label': node_id_to_name.get(original_id, original_id),
        'type': node_type,
        'original_id': original_id
    })
    node_id_counter += 1

# Add disease nodes
for disease_id in top_diseases:
    _register_node(disease_id, 'd', 'Disease')

# Add genes: the first 150 disease->gene edges touching the top diseases.
disease_genes = gene_disease_edges[
    gene_disease_edges['source'].isin(top_diseases)
].head(150)
for edge_row in disease_genes.itertuples(index=False):
    if edge_row.target not in id_mapping:
        _register_node(edge_row.target, 'g', 'Gene')
    # NOTE(review): labeled 'associates' even though gene_disease_edges
    # also mixes in regulates/binds/... types — a viz simplification.
    network_edges.append({
        'source': id_mapping[edge_row.source],
        'target': id_mapping[edge_row.target],
        'type': 'associates'
    })

# Add drugs: the first 50 treats-edges targeting the top diseases.
drug_treatments = treats_edges[treats_edges['target'].isin(top_diseases)].head(50)
for edge_row in drug_treatments.itertuples(index=False):
    if edge_row.source not in id_mapping:
        _register_node(edge_row.source, 'c', 'Compound')
    network_edges.append({
        'source': id_mapping[edge_row.source],
        'target': id_mapping[edge_row.target],
        'type': 'treats'
    })

network_nodes_df = pd.DataFrame(network_nodes)
network_edges_df = pd.DataFrame(network_edges)
network_nodes_df.to_csv(OUTPUT_DIR / "network_nodes.csv", index=False)
network_edges_df.to_csv(OUTPUT_DIR / "network_edges.csv", index=False)
print(f"Created network with {len(network_nodes_df):,} nodes and {len(network_edges_df):,} edges")