Files
ETL_Datamanagement/schnellerereETL.py
Philipp Jacoby 2384a33be2 init
2026-01-10 16:18:54 +01:00

448 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
from collections import defaultdict
from pathlib import Path

import pandas as pd

# ==============================
# CONFIGURATION
# ==============================
INPUT_JSON = "hetionet-v1.0.json"
OUTPUT_DIR = Path("neo4j_csv")
OUTPUT_DIR.mkdir(exist_ok=True)

banner = "=" * 60
print(banner)
print("HETIONET ETL PIPELINE")
print(banner)

# ==============================
# EXTRACT
# ==============================
print("\nPHASE 1: EXTRACTION")
print("-" * 60)
print("Loading JSON data...")

# Load the full Hetionet dump into memory; nodes and edges are the two
# top-level collections everything downstream works from.
with open(INPUT_JSON, "r", encoding="utf-8") as fh:
    data = json.load(fh)

nodes_raw = data["nodes"]
edges_raw = data["edges"]
print(f"Nodes loaded: {len(nodes_raw):,}")
print(f"Edges loaded: {len(edges_raw):,}")
# ==============================
# TRANSFORM NODES
# ==============================
print("\n PHASE 2: TRANSFORM NODES")
print("-"*60)

# Flatten each node: core fields (id/name/kind) plus any nested "data"
# properties promoted to top-level columns.
nodes_flat = []
for node in nodes_raw:
    row = {
        "id": str(node["identifier"]),
        "name": node.get("name"),
        "kind": node["kind"],
    }
    extra = node.get("data")
    if isinstance(extra, dict):
        row.update(extra)
    nodes_flat.append(row)

nodes_df = pd.DataFrame(nodes_flat)

# Make column names Neo4j-safe (no spaces, dashes, or dots).
# regex=False is essential: on pandas < 2.0 str.replace defaults to
# regex=True, so replacing "." would match EVERY character and wipe the
# column names out entirely.
nodes_df.columns = (
    nodes_df.columns
    .str.replace(" ", "_", regex=False)
    .str.replace("-", "_", regex=False)
    .str.replace(".", "_", regex=False)
)
print(f"Processed {len(nodes_df):,} nodes")
print(f" Columns: {', '.join(nodes_df.columns[:5])}...")

# Lookup dictionaries: node id -> kind / name.
node_id_to_kind = dict(zip(nodes_df['id'], nodes_df['kind']))
node_id_to_name = dict(zip(nodes_df['id'], nodes_df['name']))

# Id sets per node type for fast membership testing in the edge filters.
gene_ids = set(nodes_df[nodes_df['kind'] == 'Gene']['id'])
disease_ids = set(nodes_df[nodes_df['kind'] == 'Disease']['id'])
symptom_ids = set(nodes_df[nodes_df['kind'] == 'Symptom']['id'])
compound_ids = set(nodes_df[nodes_df['kind'] == 'Compound']['id'])
sideeffect_ids = set(nodes_df[nodes_df['kind'] == 'Side Effect']['id'])

print(f"\n Node Statistics:")
print(f" - Genes: {len(gene_ids):,}")
print(f" - Diseases: {len(disease_ids):,}")
print(f" - Symptoms: {len(symptom_ids):,}")
print(f" - Compounds: {len(compound_ids):,}")
print(f" - Side Effects: {len(sideeffect_ids):,}")

# Export one CSV per node type; the constant 'kind' column is dropped
# since the file name already encodes it.
print("\n Exporting node files...")
for kind in nodes_df["kind"].unique():
    df_kind = (
        nodes_df[nodes_df["kind"] == kind]
        .drop(columns=["kind"])
        .drop_duplicates(subset=["id"])
    )
    filename = OUTPUT_DIR / f"nodes_{kind.replace(' ', '_')}.csv"
    df_kind.to_csv(filename, index=False)
    print(f" {filename.name} ({len(df_kind):,} rows)")
# ==============================
# TRANSFORM EDGES
# ==============================
print("\nPHASE 3: TRANSFORM EDGES")
print("-"*60)

# Each raw edge carries (kind, identifier) pairs; element [1] is the
# node identifier that matches nodes_df['id'].
edge_total = len(edges_raw)
edges = []
for idx, raw_edge in enumerate(edges_raw):
    # Periodic progress output for the multi-million-row edge list.
    if idx and idx % 500000 == 0:
        print(f" Processing: {idx:,} / {edge_total:,}...")
    edges.append({
        "source": str(raw_edge["source_id"][1]),
        "target": str(raw_edge["target_id"][1]),
        "type": raw_edge["kind"],
    })

edges_df = pd.DataFrame(edges)

# Make relationship types Neo4j-safe (spaces/dashes -> underscores).
edges_df["type"] = (
    edges_df["type"]
    .str.replace(" ", "_")
    .str.replace("-", "_")
)

edges_file = OUTPUT_DIR / "edges_all.csv"
edges_df.to_csv(edges_file, index=False)
print(f"\n Edges processed: {len(edges_df):,}")
print(f"Saved to: {edges_file.name}")

# Pre-filter the edge frame once per relationship type the analyses use.
print("\n Pre-filtering edges by type...")
relevant_types = ('associates', 'treats', 'presents', 'causes',
                  'regulates', 'upregulates', 'downregulates', 'binds')
edges_by_type = {}
for rel_type in relevant_types:
    subset = edges_df[edges_df['type'] == rel_type].copy()
    edges_by_type[rel_type] = subset
    if not subset.empty:
        print(f" - {rel_type}: {len(subset):,}")
# ==============================
# ANALYSES
# ==============================
print("\n" + "="*60)
print("PHASE 4: ANALYSES")
print("="*60)

# ANALYSIS 1: HOTSPOT GENES
print("\n Analysis 1: Hotspot Genes")
print("-"*60)

# Disease -> Gene edges across all association-like relationship types.
# NOTE(review): the source-in-disease_ids filter below discards any edge
# whose source is not a Disease (e.g. compound 'binds' edges) -- confirm
# whether those types were meant to contribute.
gene_disease_edges = pd.concat([
    edges_by_type.get('associates', pd.DataFrame()),
    edges_by_type.get('regulates', pd.DataFrame()),
    edges_by_type.get('upregulates', pd.DataFrame()),
    edges_by_type.get('downregulates', pd.DataFrame()),
    edges_by_type.get('binds', pd.DataFrame()),
])
gene_disease_edges = gene_disease_edges[
    gene_disease_edges['source'].isin(disease_ids) &
    gene_disease_edges['target'].isin(gene_ids)
]

# Count disease edges per gene and attach the count to the gene nodes.
gene_counts = gene_disease_edges.groupby('target').size().reset_index(name='num_diseases')
genes_df = nodes_df[nodes_df['kind'] == 'Gene'].merge(
    gene_counts, left_on='id', right_on='target', how='left'
)
genes_df['num_diseases'] = genes_df['num_diseases'].fillna(0)
genes_df.drop(columns=['target'], inplace=True, errors='ignore')

genes_df_sorted = genes_df.sort_values('num_diseases', ascending=False)
# Drop 'kind' so the overwritten nodes_Gene.csv keeps the same schema as
# the per-kind exports from Phase 2 (which also omit the constant column).
genes_df_sorted.drop(columns=['kind'], errors='ignore').to_csv(
    OUTPUT_DIR / "nodes_Gene.csv", index=False
)
# Guard iloc[0] so an empty result doesn't crash the pipeline.
if not genes_df_sorted.empty:
    top_gene = genes_df_sorted.iloc[0]
    print(f"Top gene: {top_gene['name']} ({int(top_gene['num_diseases'])} diseases)")
# ANALYSIS 2: DISEASE SYMPTOM DIVERSITY
print("\n Analysis 2: Disease Symptom Diversity")
print("-"*60)

# Disease -> Symptom 'presents' edges only.
disease_symptom_edges = edges_by_type.get('presents', pd.DataFrame())
disease_symptom_edges = disease_symptom_edges[
    disease_symptom_edges['source'].isin(disease_ids) &
    disease_symptom_edges['target'].isin(symptom_ids)
]

# Count symptoms per disease and attach the count to the disease nodes.
disease_counts = disease_symptom_edges.groupby('source').size().reset_index(name='num_symptoms')
disease_df = nodes_df[nodes_df['kind'] == 'Disease'].merge(
    disease_counts, left_on='id', right_on='source', how='left'
)
disease_df['num_symptoms'] = disease_df['num_symptoms'].fillna(0)
disease_df.drop(columns=['source'], inplace=True, errors='ignore')

disease_df_sorted = disease_df.sort_values('num_symptoms', ascending=False)
# Drop 'kind' so the overwritten nodes_Disease.csv matches the schema of
# the per-kind exports from Phase 2.
disease_df_sorted.drop(columns=['kind'], errors='ignore').to_csv(
    OUTPUT_DIR / "nodes_Disease.csv", index=False
)
# Guard iloc[0] so an empty result doesn't crash the pipeline.
if not disease_df_sorted.empty:
    top_disease = disease_df_sorted.iloc[0]
    print(f"Top disease: {top_disease['name']} ({int(top_disease['num_symptoms'])} symptoms)")
# Build adjacency indices (id -> set of neighbor ids) for the drug analyses.
# zip over the column arrays replaces DataFrame.iterrows(): identical
# (source, target) pairs, but orders of magnitude faster on large frames.
print("\n🔍 Building indices for drug analyses...")

disease_to_genes = defaultdict(set)
gene_to_diseases = defaultdict(set)
if not gene_disease_edges.empty:
    for disease, gene in zip(gene_disease_edges['source'], gene_disease_edges['target']):
        disease_to_genes[disease].add(gene)
        gene_to_diseases[gene].add(disease)

drug_to_diseases = defaultdict(set)
disease_to_drugs = defaultdict(set)
treats_edges = edges_by_type.get('treats', pd.DataFrame())
if not treats_edges.empty:
    for drug, disease in zip(treats_edges['source'], treats_edges['target']):
        drug_to_diseases[drug].add(disease)
        disease_to_drugs[disease].add(drug)

symptom_to_diseases = defaultdict(set)
if not disease_symptom_edges.empty:
    for disease, symptom in zip(disease_symptom_edges['source'], disease_symptom_edges['target']):
        symptom_to_diseases[symptom].add(disease)
# ANALYSIS 3: DRUG REPURPOSING
# For a sample of diseases: find other diseases sharing associated genes,
# then flag drugs that treat those relatives but not the disease itself.
print("\nAnalysis 3: Drug Repurposing Opportunities")
print("-"*60)

repurposing_candidates = []
# sorted() (not list(set)) makes the 100-disease sample deterministic;
# raw set iteration order varies with hash seeding between runs.
for disease_id in sorted(disease_ids)[:100]:
    genes = disease_to_genes.get(disease_id, set())
    if not genes:
        continue
    # Diseases sharing at least one associated gene.
    related_diseases = set()
    for gene in genes:
        related_diseases.update(gene_to_diseases[gene])
    related_diseases.discard(disease_id)
    # Drugs treating any related disease.
    candidate_drugs = set()
    for related_disease in related_diseases:
        candidate_drugs.update(disease_to_drugs[related_disease])
    # Exclude drugs already known to treat this disease.
    new_candidates = candidate_drugs - disease_to_drugs[disease_id]
    # Keep at most 3 candidates per disease (sorted for determinism).
    for drug_id in sorted(new_candidates)[:3]:
        repurposing_candidates.append({
            'disease': node_id_to_name.get(disease_id, disease_id),
            'candidate_drug': node_id_to_name.get(drug_id, drug_id),
            # NOTE(review): this is the disease's own gene count, not an
            # overlap size; the column name overstates it -- confirm intent.
            'shared_genes': len(genes),
        })

repurposing_df = pd.DataFrame(repurposing_candidates)
if not repurposing_df.empty:
    repurposing_df = repurposing_df.sort_values('shared_genes', ascending=False)
    repurposing_df.to_csv(OUTPUT_DIR / "analysis_drug_repurposing.csv", index=False)
    print(f"Found {len(repurposing_df):,} repurposing opportunities")
# ANALYSIS 4: POLYPHARMACY RISK
print("\nAnalysis 4: Polypharmacy Risk")
print("-"*60)

# Compound -> Side Effect 'causes' edges.
causes_edges = edges_by_type.get('causes', pd.DataFrame())
is_compound = causes_edges['source'].isin(compound_ids)
is_sideeffect = causes_edges['target'].isin(sideeffect_ids)
drug_sideeffects = causes_edges[is_compound & is_sideeffect]

if not drug_sideeffects.empty:
    # Side-effect count per drug plus how many diseases it treats.
    drug_risk = drug_sideeffects.groupby('source').size().reset_index(name='num_side_effects')
    drug_risk['name'] = drug_risk['source'].map(node_id_to_name)
    drug_risk['num_diseases_treated'] = drug_risk['source'].apply(
        lambda drug: len(drug_to_diseases.get(drug, set()))
    )
    # +1 in the denominator avoids division by zero for drugs that
    # treat nothing.
    drug_risk['risk_score'] = drug_risk['num_side_effects'] / (drug_risk['num_diseases_treated'] + 1)
    drug_risk_sorted = drug_risk.sort_values('num_side_effects', ascending=False)
    drug_risk_sorted.to_csv(OUTPUT_DIR / "analysis_polypharmacy_risk.csv", index=False)
    print(f"Analyzed {len(drug_risk_sorted):,} drugs for side effects")
# ANALYSIS 5: SYMPTOM TRIANGLE
# Symptom -> its diseases -> the drugs treating them, scored by reach.
print("\n Analysis 5: Symptom-Disease-Drug Triangle")
print("-"*60)

drugs_with_sideeffects_set = set(drug_sideeffects['source']) if len(drug_sideeffects) > 0 else set()

symptom_analysis = []
# sorted() makes the 100-symptom sample reproducible; iterating the raw
# set would pick an arbitrary, hash-dependent subset each run.
for symptom_id in sorted(symptom_ids)[:100]:
    diseases = symptom_to_diseases.get(symptom_id, set())
    if not diseases:
        continue
    # Union of drugs treating any disease that presents this symptom.
    treating_drugs = set()
    for disease in diseases:
        treating_drugs.update(disease_to_drugs[disease])
    drugs_with_se = len(treating_drugs & drugs_with_sideeffects_set)
    symptom_analysis.append({
        'symptom': node_id_to_name.get(symptom_id, symptom_id),
        'num_diseases': len(diseases),
        'num_treating_drugs': len(treating_drugs),
        'drugs_with_side_effects': drugs_with_se,
        # Simple breadth score: diseases reached x drugs involved.
        'impact_score': len(diseases) * len(treating_drugs),
    })

symptom_triangle_df = pd.DataFrame(symptom_analysis)
if not symptom_triangle_df.empty:
    symptom_triangle_df = symptom_triangle_df.sort_values('impact_score', ascending=False)
    symptom_triangle_df.to_csv(OUTPUT_DIR / "analysis_symptom_triangle.csv", index=False)
    print(f"Analyzed {len(symptom_triangle_df):,} symptoms")
# ANALYSIS 6: SUPER DRUGS
# Score = diseases treated / (1 + side effects): rewards broad, clean drugs.
print("\n Analysis 6: Super-Drug Score")
print("-"*60)

# Precompute side-effect counts ONCE. The original re-filtered the whole
# drug_sideeffects frame per drug, which is O(drugs * edges); a single
# value_counts() pass gives the same counts.
se_counts = (
    drug_sideeffects['source'].value_counts().to_dict()
    if len(drug_sideeffects) > 0 else {}
)

super_drugs = []
# sorted() gives a reproducible row order (ties in the final sort keep
# input order, which would otherwise depend on set hashing).
for drug_id in sorted(compound_ids):
    num_diseases = len(drug_to_diseases.get(drug_id, set()))
    if num_diseases == 0:
        continue  # only score drugs with at least one known indication
    num_se = se_counts.get(drug_id, 0)
    super_drugs.append({
        'name': node_id_to_name.get(drug_id, drug_id),
        'num_diseases_treated': num_diseases,
        'num_side_effects': num_se,
        'super_score': num_diseases / (1 + num_se),
    })

super_drugs_df = pd.DataFrame(super_drugs)
if not super_drugs_df.empty:
    super_drugs_df = super_drugs_df.sort_values('super_score', ascending=False)
    super_drugs_df.to_csv(OUTPUT_DIR / "analysis_super_drugs.csv", index=False)
    print(f"Analyzed {len(super_drugs_df):,} drugs")
# ANALYSIS 7: DRUG CONFLICTS
# Pairs of drugs sharing many side effects (possible interaction risk).
print("\nAnalysis 7: Drug Conflicts")
print("-"*60)

# drug -> set of its side-effect ids. zip over the column arrays replaces
# iterrows(): same pairs, much faster on large frames.
drug_to_sideeffects = defaultdict(set)
if len(drug_sideeffects) > 0:
    for drug, side_effect in zip(drug_sideeffects['source'], drug_sideeffects['target']):
        drug_to_sideeffects[drug].add(side_effect)

drug_conflicts = []
drugs_list = list(drug_to_sideeffects.keys())
total_drugs = len(drugs_list)
for i, drug1_id in enumerate(drugs_list):
    if i and i % 100 == 0:
        print(f" Processing: {i}/{total_drugs}...")
    drug1_se = drug_to_sideeffects[drug1_id]
    # Deliberate cap: compare each drug only against the next 50 in the
    # list to bound the otherwise quadratic pairwise scan (slicing clamps
    # at the end of the list, so no explicit min() is needed).
    for drug2_id in drugs_list[i + 1:i + 51]:
        drug2_se = drug_to_sideeffects[drug2_id]
        overlap = drug1_se & drug2_se
        if len(overlap) >= 10:  # report only substantial overlaps
            drug_conflicts.append({
                'drug1': node_id_to_name.get(drug1_id, drug1_id),
                'drug2': node_id_to_name.get(drug2_id, drug2_id),
                'shared_side_effects': len(overlap),
                'drug1_total_se': len(drug1_se),
                'drug2_total_se': len(drug2_se),
                # Overlap relative to the smaller profile of the pair.
                'overlap_percentage': (len(overlap) / min(len(drug1_se), len(drug2_se))) * 100,
            })

drug_conflicts_df = pd.DataFrame(drug_conflicts)
if not drug_conflicts_df.empty:
    drug_conflicts_df = drug_conflicts_df.sort_values('shared_side_effects', ascending=False)
    drug_conflicts_df.to_csv(OUTPUT_DIR / "analysis_drug_conflicts.csv", index=False)
    print(f"Found {len(drug_conflicts_df):,} drug conflict pairs")
# ANALYSIS 8: NETWORK DATA
# Export a small disease/gene/compound subgraph for visualization.
print("\n🕸️ Analysis 8: Network Visualization Data")
print("-"*60)

# Seed the graph with the 20 most symptom-rich diseases.
top_diseases = disease_df_sorted.nlargest(20, 'num_symptoms')['id'].tolist()

network_nodes = []
network_edges = []
node_id_counter = 0
id_mapping = {}


def _register_node(original_id, prefix, node_type):
    """Register a node once, assigning a short sequential network id."""
    global node_id_counter
    if original_id not in id_mapping:
        short_id = f"{prefix}_{node_id_counter}"
        id_mapping[original_id] = short_id
        network_nodes.append({
            'id': short_id,
            'label': node_id_to_name.get(original_id, original_id),
            'type': node_type,
            'original_id': original_id,
        })
        node_id_counter += 1
    return id_mapping[original_id]


# Disease nodes (ids d_0 .. d_19).
for disease_id in top_diseases:
    _register_node(disease_id, 'd', 'Disease')

# Gene nodes plus disease->gene edges (first 150 qualifying edges).
disease_genes = gene_disease_edges[
    gene_disease_edges['source'].isin(top_diseases)
].head(150)
for src, tgt in zip(disease_genes['source'], disease_genes['target']):
    gene_node = _register_node(tgt, 'g', 'Gene')
    network_edges.append({
        'source': id_mapping[src],
        'target': gene_node,
        'type': 'associates',
    })

# Compound nodes plus drug->disease 'treats' edges (first 50).
drug_treatments = treats_edges[treats_edges['target'].isin(top_diseases)].head(50)
for src, tgt in zip(drug_treatments['source'], drug_treatments['target']):
    drug_node = _register_node(src, 'c', 'Compound')
    network_edges.append({
        'source': drug_node,
        'target': id_mapping[tgt],
        'type': 'treats',
    })

network_nodes_df = pd.DataFrame(network_nodes)
network_edges_df = pd.DataFrame(network_edges)
network_nodes_df.to_csv(OUTPUT_DIR / "network_nodes.csv", index=False)
network_edges_df.to_csv(OUTPUT_DIR / "network_edges.csv", index=False)
print(f"Network: {len(network_nodes_df)} nodes, {len(network_edges_df)} edges")
# ==============================
# SUMMARY
# ==============================
divider = "=" * 60
print("\n" + divider)
print("ETL PIPELINE COMPLETED SUCCESSFULLY")
print(divider)
print(f"\nOutput directory: {OUTPUT_DIR.resolve()}")