
Bu Python kodu, hisse senedi yatırımcıları için tasarlanmış, EMA (Üssel Hareketli Ortalama) tabanlı bir sinyal analiz sistemidir. Kod, her hisse için farklı vadelere özel sinyal üretir ve bu sinyallerin nasıl oluşup sonlandığını titizlikle takip eder.
Kodun kısa, orta ve uzun vadeli sinyal mantığına dair detaylar şu şekilde
Python Kodu, Daha önce oluşturduğumuz python StokData/Kapanis klasöründe bulunan hisselerin kapanış datalarını kullanır. Seans kapanışından sonra kodu çalıştırdığınızda Ema sinyallerini oluşturur, bu sinyallerin durumlarını analiz eder güncel durumu raporlar. Bu rapor, hisselerin son sinyalini, sinyalin kaç gündür aktif olduğunu ve bu süreçteki kar/zarar durumunu net bir şekilde gösterir. Bu sayede yatırımcılar, hangi hisselerin hangi trendde olduğunu kolayca anlayabilir ve buna göre pozisyonlarını yönetebilir.
Örnek Excel Çalışması
BorsaPin_EMA_Analizleri
X_06_BorsaPin_EmaSinyals.py olarak adlandırdım. Ver.1.02 Güncellndi
Kısa orta ve uzun vade de al sat bekle gibi sinyalleri Excel'e dökümünü yapacaktır.
Python Betiğimize ait kod
import pandas as pd
import os
import warnings
from datetime import datetime
from colorama import Fore, init
import glob
from typing import List, Dict, Any, Optional
from openpyxl import load_workbook
from openpyxl.utils.dataframe import dataframe_to_rows
from openpyxl.styles import PatternFill, Font, Alignment, Border, Side
from openpyxl.formatting.rule import ColorScaleRule
"""
Borsapin StokData/Kapanis klasöründeki hisse datalarından
Kısa, orta,uzun vade al sat tut gibi sinyalleri excele aktarır.
Pine Script™'e EMA Sinyal Tablosu indikatörümüze göre güncellenmiş kurallar ile
Sinyal bozulma uyarıları eklendi
Ver 1.02
www.kursatsenturk.com
"""
# Uyarıları kapat
warnings.filterwarnings('ignore')
init(autoreset=True)
class BorsaPinEMAAnalyzer:
def __init__(self):
self.ema_periods = {
'kisa_vade': [5, 8, 13, 21],
'orta_vade': [34, 55],
'uzun_vade': [89, 144]
}
self.successful_files: List[str] = []
self.failed_files: List[str] = []
@staticmethod
def calculate_ema(data: pd.Series, period: int) -> pd.Series:
"""EMA hesaplama fonksiyonu"""
return data.ewm(span=period, adjust=False).mean()
@staticmethod
def load_stock_data(file_path: str) -> pd.DataFrame:
"""Hisse verilerini Excel dosyasından yükleme"""
try:
df = pd.read_excel(file_path)
required_columns = ['Tarih', 'Kapanış']
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
raise ValueError(f"Excel dosyasında eksik sütunlar: {missing_columns}")
df['Tarih'] = pd.to_datetime(df['Tarih'])
df = df.sort_values('Tarih').reset_index(drop=True)
return df
except (FileNotFoundError, ValueError, pd.errors.ParserError) as e:
print(f"{Fore.RED}❌ Veri yükleme hatası: {e}")
return pd.DataFrame()
def calculate_all_emas(self, df: pd.DataFrame) -> pd.DataFrame:
"""Tüm EMA'ları hesapla"""
close_prices = df['Kapanış']
all_periods = self.ema_periods['kisa_vade'] + self.ema_periods['orta_vade'] + self.ema_periods['uzun_vade']
for period in all_periods:
if len(df) >= period:
df[f'EMA{period}'] = self.calculate_ema(close_prices, period)
else:
df[f'EMA{period}'] = pd.NA
return df
@staticmethod
def check_signal_deterioration(df: pd.DataFrame, i: int) -> Dict[str, str]:
"""Sinyal bozulma durumlarını kontrol et"""
warnings = {
'kisa_uyari': '',
'orta_uyari': '',
'uzun_uyari': ''
}
if i < 1: # En az 1 önceki veri gerekli
return warnings
kapanis = df.loc[i, 'Kapanış']
# KISA VADE UYARILARI
if all(pd.notna(df.loc[i, col]) for col in ['EMA5', 'EMA8', 'EMA13', 'EMA21']) and \
all(pd.notna(df.loc[i - 1, col]) for col in ['EMA5', 'EMA8', 'EMA13', 'EMA21']):
ema5, ema8, ema13, ema21 = df.loc[i, 'EMA5'], df.loc[i, 'EMA8'], df.loc[i, 'EMA13'], df.loc[i, 'EMA21']
ema5_once, ema8_once, ema13_once = df.loc[i - 1, 'EMA5'], df.loc[i - 1, 'EMA8'], df.loc[i - 1, 'EMA13']
# Önce durum kontrolü yap
kisa_durum = df.loc[i, 'Kisa_Durum'] if 'Kisa_Durum' in df.columns else 'BEKLE'
kisa_durum_once = df.loc[i - 1, 'Kisa_Durum'] if i > 0 and 'Kisa_Durum' in df.columns else 'BEKLE'
# AL sinyali varken 21 EMA altına sarktığında
if kisa_durum == 'AL' and kapanis < ema21:
warnings['kisa_uyari'] = "⚠️ 21 EMA Altına Sarkma"
# AL sinyali varken 5, 8, 13 EMA'da aşağı eğim başladığında (sinyal bozulmaya başladı)
elif kisa_durum == 'AL' and (ema5 < ema5_once and ema8 < ema8_once and ema13 < ema13_once):
warnings['kisa_uyari'] = "⚠️ EMA Eğim Bozulması"
# ORTA VADE UYARILARI
if all(pd.notna(df.loc[i, col]) for col in ['EMA34', 'EMA55']) and i > 0 and \
all(pd.notna(df.loc[i - 1, col]) for col in ['EMA34', 'EMA55']):
ema34 = df.loc[i, 'EMA34']
ema55 = df.loc[i, 'EMA55']
ema34_once = df.loc[i - 1, 'EMA34']
ema55_once = df.loc[i - 1, 'EMA55']
orta_durum = df.loc[i, 'Orta_Durum'] if 'Orta_Durum' in df.columns else 'BEKLE'
orta_durum_once = df.loc[i - 1, 'Orta_Durum'] if i > 0 and 'Orta_Durum' in df.columns else 'BEKLE'
# AL sinyali varken 34 EMA altında kapatmalarda
if orta_durum == 'AL' and kapanis < ema34:
warnings['orta_uyari'] = "⚠️ 34 EMA Altında Kapanış"
# AL sinyali varken 34 ve 55 EMA'da aşağı eğim (sinyal bozulmaya başladı)
elif orta_durum == 'AL' and (ema34 < ema34_once and ema55 < ema55_once):
warnings['orta_uyari'] = "⚠️ EMA Eğim Bozulması"
# UZUN VADE UYARILARI
if all(pd.notna(df.loc[i, col]) for col in ['EMA89', 'EMA144']) and i > 0 and \
all(pd.notna(df.loc[i - 1, col]) for col in ['EMA89', 'EMA144']):
ema89 = df.loc[i, 'EMA89']
ema144 = df.loc[i, 'EMA144']
ema89_once = df.loc[i - 1, 'EMA89']
ema144_once = df.loc[i - 1, 'EMA144']
uzun_durum = df.loc[i, 'Uzun_Durum'] if 'Uzun_Durum' in df.columns else 'BEKLE'
uzun_durum_once = df.loc[i - 1, 'Uzun_Durum'] if i > 0 and 'Uzun_Durum' in df.columns else 'BEKLE'
# AL sinyali varken 89 EMA altına sarkmada
if uzun_durum == 'AL' and kapanis < ema89:
warnings['uzun_uyari'] = "⚠️ 89 EMA Altına Sarkma"
# AL sinyali varken 89 ve 144 EMA'da aşağı eğim (sinyal bozulmaya başladı)
elif uzun_durum == 'AL' and (ema89 < ema89_once and ema144 < ema144_once):
warnings['uzun_uyari'] = "⚠️ EMA Eğim Bozulması"
return warnings
def determine_signal_status(self, df: pd.DataFrame) -> pd.DataFrame:
"""Sinyal durumlarını belirle - Pine Script™'e kurallar güncellendi 16.08.25"""
df['Kisa_Durum'] = 'BEKLE'
df['Orta_Durum'] = 'BEKLE'
df['Uzun_Durum'] = 'BEKLE'
df['Kisa_Bar_Sayaci'] = 0
df['Orta_Bar_Sayaci'] = 0
df['Uzun_Bar_Sayaci'] = 0
df['Kisa_Sinyal_Tarihi'] = pd.NaT
df['Orta_Sinyal_Tarihi'] = pd.NaT
df['Uzun_Sinyal_Tarihi'] = pd.NaT
df['Kisa_Sinyal_Fiyati'] = pd.NA
df['Orta_Sinyal_Fiyati'] = pd.NA
df['Uzun_Sinyal_Fiyati'] = pd.NA
# Yeni uyarı sütunları
df['Kisa_Uyari'] = ''
df['Orta_Uyari'] = ''
df['Uzun_Uyari'] = ''
if len(df) < 2:
return df
for i in range(1, len(df)):
kisa_durum_once = df.loc[i - 1, 'Kisa_Durum']
orta_durum_once = df.loc[i - 1, 'Orta_Durum']
uzun_durum_once = df.loc[i - 1, 'Uzun_Durum']
kapanis = df.loc[i, 'Kapanış']
# KISA VADE SİNYALLERİ - Pine Script™ indikatörümüzle uyumlu
if all(pd.notna(df.loc[i, col]) for col in ['EMA5', 'EMA8', 'EMA13', 'EMA21']) and \
all(pd.notna(df.loc[i - 1, col]) for col in ['EMA5', 'EMA8', 'EMA13', 'EMA21']):
ema5, ema8, ema13, ema21 = df.loc[i, 'EMA5'], df.loc[i, 'EMA8'], df.loc[i, 'EMA13'], df.loc[i, 'EMA21']
kisa_al_kosulu = (kapanis > ema5 and kapanis > ema8 and kapanis > ema13 and kapanis > ema21)
kisa_egim_yukari = (ema5 > df.loc[i - 1, 'EMA5'] and ema8 > df.loc[i - 1, 'EMA8'] and
ema13 > df.loc[i - 1, 'EMA13'] and ema21 > df.loc[i - 1, 'EMA21'])
if kisa_al_kosulu and kisa_egim_yukari:
df.loc[i, 'Kisa_Durum'] = "AL"
if kisa_durum_once != "AL":
df.loc[i, 'Kisa_Bar_Sayaci'] = 1
df.loc[i, 'Kisa_Sinyal_Tarihi'] = df.loc[i, 'Tarih']
df.loc[i, 'Kisa_Sinyal_Fiyati'] = df.loc[i, 'Kapanış']
else:
df.loc[i, 'Kisa_Bar_Sayaci'] = df.loc[i - 1, 'Kisa_Bar_Sayaci'] + 1
df.loc[i, 'Kisa_Sinyal_Tarihi'] = df.loc[i - 1, 'Kisa_Sinyal_Tarihi']
df.loc[i, 'Kisa_Sinyal_Fiyati'] = df.loc[i - 1, 'Kisa_Sinyal_Fiyati']
else:
kisa_egim_asagi = (ema21 < df.loc[i - 1, 'EMA21'] and ema13 < df.loc[i - 1, 'EMA13'])
if kapanis < ema21 and kisa_egim_asagi:
df.loc[i, 'Kisa_Durum'] = "SAT"
if kisa_durum_once != "SAT":
df.loc[i, 'Kisa_Bar_Sayaci'] = 1
df.loc[i, 'Kisa_Sinyal_Tarihi'] = df.loc[i, 'Tarih']
df.loc[i, 'Kisa_Sinyal_Fiyati'] = df.loc[i, 'Kapanış']
else:
df.loc[i, 'Kisa_Bar_Sayaci'] = df.loc[i - 1, 'Kisa_Bar_Sayaci'] + 1
df.loc[i, 'Kisa_Sinyal_Tarihi'] = df.loc[i - 1, 'Kisa_Sinyal_Tarihi']
df.loc[i, 'Kisa_Sinyal_Fiyati'] = df.loc[i - 1, 'Kisa_Sinyal_Fiyati']
else:
# AL sinyalini koruma mantığı
if kisa_durum_once == 'AL' and df.loc[i, 'Kisa_Durum'] == 'BEKLE':
df.loc[i, 'Kisa_Durum'] = "AL"
df.loc[i, 'Kisa_Bar_Sayaci'] = df.loc[i - 1, 'Kisa_Bar_Sayaci'] + 1
df.loc[i, 'Kisa_Sinyal_Tarihi'] = df.loc[i - 1, 'Kisa_Sinyal_Tarihi']
df.loc[i, 'Kisa_Sinyal_Fiyati'] = df.loc[i - 1, 'Kisa_Sinyal_Fiyati']
else:
df.loc[i, 'Kisa_Durum'] = "BEKLE"
df.loc[i, 'Kisa_Bar_Sayaci'] = 0
df.loc[i, 'Kisa_Sinyal_Tarihi'] = pd.NaT
df.loc[i, 'Kisa_Sinyal_Fiyati'] = pd.NA
# ORTA VADE SİNYALLERİ
if all(col in df.columns for col in ['EMA34', 'EMA55']) and all(
pd.notna(df.loc[i, col]) for col in ['EMA34', 'EMA55']):
ema34 = df.loc[i, 'EMA34']
ema55 = df.loc[i, 'EMA55']
if kapanis > ema34 and kapanis > ema55 and ema34 > ema55:
df.loc[i, 'Orta_Durum'] = "AL"
if orta_durum_once != "AL":
df.loc[i, 'Orta_Bar_Sayaci'] = 1
df.loc[i, 'Orta_Sinyal_Tarihi'] = df.loc[i, 'Tarih']
df.loc[i, 'Orta_Sinyal_Fiyati'] = df.loc[i, 'Kapanış']
else:
df.loc[i, 'Orta_Bar_Sayaci'] = df.loc[i - 1, 'Orta_Bar_Sayaci'] + 1
df.loc[i, 'Orta_Sinyal_Tarihi'] = df.loc[i - 1, 'Orta_Sinyal_Tarihi']
df.loc[i, 'Orta_Sinyal_Fiyati'] = df.loc[i - 1, 'Orta_Sinyal_Fiyati']
elif kapanis < ema55:
df.loc[i, 'Orta_Durum'] = "SAT"
if orta_durum_once != "SAT":
df.loc[i, 'Orta_Bar_Sayaci'] = 1
df.loc[i, 'Orta_Sinyal_Tarihi'] = df.loc[i, 'Tarih']
df.loc[i, 'Orta_Sinyal_Fiyati'] = df.loc[i, 'Kapanış']
else:
df.loc[i, 'Orta_Bar_Sayaci'] = df.loc[i - 1, 'Orta_Bar_Sayaci'] + 1
df.loc[i, 'Orta_Sinyal_Tarihi'] = df.loc[i - 1, 'Orta_Sinyal_Tarihi']
df.loc[i, 'Orta_Sinyal_Fiyati'] = df.loc[i - 1, 'Orta_Sinyal_Fiyati']
else:
df.loc[i, 'Orta_Durum'] = "BEKLE"
df.loc[i, 'Orta_Bar_Sayaci'] = 0
df.loc[i, 'Orta_Sinyal_Tarihi'] = pd.NaT
df.loc[i, 'Orta_Sinyal_Fiyati'] = pd.NA
# UZUN VADE SİNYALLERİ
if all(col in df.columns for col in ['EMA89', 'EMA144']) and all(
pd.notna(df.loc[i, col]) for col in ['EMA89', 'EMA144']):
ema89 = df.loc[i, 'EMA89']
ema144 = df.loc[i, 'EMA144']
if kapanis > ema89 and kapanis > ema144:
df.loc[i, 'Uzun_Durum'] = "AL"
if uzun_durum_once != "AL":
df.loc[i, 'Uzun_Bar_Sayaci'] = 1
df.loc[i, 'Uzun_Sinyal_Tarihi'] = df.loc[i, 'Tarih']
df.loc[i, 'Uzun_Sinyal_Fiyati'] = df.loc[i, 'Kapanış']
else:
df.loc[i, 'Uzun_Bar_Sayaci'] = df.loc[i - 1, 'Uzun_Bar_Sayaci'] + 1
df.loc[i, 'Uzun_Sinyal_Tarihi'] = df.loc[i - 1, 'Uzun_Sinyal_Tarihi']
df.loc[i, 'Uzun_Sinyal_Fiyati'] = df.loc[i - 1, 'Uzun_Sinyal_Fiyati']
elif kapanis < ema144:
df.loc[i, 'Uzun_Durum'] = "SAT"
if uzun_durum_once != "SAT":
df.loc[i, 'Uzun_Bar_Sayaci'] = 1
df.loc[i, 'Uzun_Sinyal_Tarihi'] = df.loc[i, 'Tarih']
df.loc[i, 'Uzun_Sinyal_Fiyati'] = df.loc[i, 'Kapanış']
else:
df.loc[i, 'Uzun_Bar_Sayaci'] = df.loc[i - 1, 'Uzun_Bar_Sayaci'] + 1
df.loc[i, 'Uzun_Sinyal_Tarihi'] = df.loc[i - 1, 'Uzun_Sinyal_Tarihi']
df.loc[i, 'Uzun_Sinyal_Fiyati'] = df.loc[i - 1, 'Uzun_Sinyal_Fiyati']
else:
df.loc[i, 'Uzun_Durum'] = "BEKLE"
df.loc[i, 'Uzun_Bar_Sayaci'] = 0
df.loc[i, 'Uzun_Sinyal_Tarihi'] = pd.NaT
df.loc[i, 'Uzun_Sinyal_Fiyati'] = pd.NA
# SİNYAL BOZULMA UYARILARINI KONTROL ET
warnings_dict = self.check_signal_deterioration(df, i)
df.loc[i, 'Kisa_Uyari'] = warnings_dict['kisa_uyari']
df.loc[i, 'Orta_Uyari'] = warnings_dict['orta_uyari']
df.loc[i, 'Uzun_Uyari'] = warnings_dict['uzun_uyari']
return df
@staticmethod
def get_latest_signals(df: pd.DataFrame) -> Optional[Dict[str, Any]]:
"""En son sinyal durumlarını getir"""
if len(df) == 0:
return None
latest = df.iloc[-1]
# Sinyal gücü ataması
kisa_guc = '⭐' if latest['Kisa_Durum'] != 'BEKLE' else '⏳'
orta_guc = '⭐' if latest['Orta_Durum'] != 'BEKLE' else '⏳'
uzun_guc = '⭐' if latest['Uzun_Durum'] != 'BEKLE' else '⏳'
# Fiyat farkı hesaplamaları
kisa_fark_tl = latest['Kapanış'] - latest['Kisa_Sinyal_Fiyati'] if pd.notna(
latest['Kisa_Sinyal_Fiyati']) else pd.NA
kisa_fark_yuzde = (kisa_fark_tl / latest['Kisa_Sinyal_Fiyati']) * 100 if pd.notna(kisa_fark_tl) else pd.NA
orta_fark_tl = latest['Kapanış'] - latest['Orta_Sinyal_Fiyati'] if pd.notna(
latest['Orta_Sinyal_Fiyati']) else pd.NA
orta_fark_yuzde = (orta_fark_tl / latest['Orta_Sinyal_Fiyati']) * 100 if pd.notna(orta_fark_tl) else pd.NA
uzun_fark_tl = latest['Kapanış'] - latest['Uzun_Sinyal_Fiyati'] if pd.notna(
latest['Uzun_Sinyal_Fiyati']) else pd.NA
uzun_fark_yuzde = (uzun_fark_tl / latest['Uzun_Sinyal_Fiyati']) * 100 if pd.notna(uzun_fark_tl) else pd.NA
# NaN değerler yerine 'Yok' metnini kullanma
def format_value(value, format_str, default='Yok'):
if pd.notna(value):
return format_str.format(value)
return default
return {
'tarih': latest['Tarih'].strftime('%Y-%m-%d'),
'kapanis': latest['Kapanış'],
'kisa': {
'durum': latest['Kisa_Durum'],
'bar_sayaci': latest['Kisa_Bar_Sayaci'],
'sinyal_tarihi': format_value(latest['Kisa_Sinyal_Tarihi'], '{:%d.%m.%Y}'),
'sinyal_fiyati': format_value(latest['Kisa_Sinyal_Fiyati'], '{:.2f}'),
'fark_tl': format_value(kisa_fark_tl, '{:.2f}'),
'fark_yuzde': format_value(kisa_fark_yuzde, '{:.2f}%'),
'guc': kisa_guc,
'uyari': latest['Kisa_Uyari'] if 'Kisa_Uyari' in df.columns else ''
},
'orta': {
'durum': latest['Orta_Durum'],
'bar_sayaci': latest['Orta_Bar_Sayaci'],
'sinyal_tarihi': format_value(latest['Orta_Sinyal_Tarihi'], '{:%d.%m.%Y}'),
'sinyal_fiyati': format_value(latest['Orta_Sinyal_Fiyati'], '{:.2f}'),
'fark_tl': format_value(orta_fark_tl, '{:.2f}'),
'fark_yuzde': format_value(orta_fark_yuzde, '{:.2f}%'),
'guc': orta_guc,
'uyari': latest['Orta_Uyari'] if 'Orta_Uyari' in df.columns else ''
},
'uzun': {
'durum': latest['Uzun_Durum'],
'bar_sayaci': latest['Uzun_Bar_Sayaci'],
'sinyal_tarihi': format_value(latest['Uzun_Sinyal_Tarihi'], '{:%d.%m.%Y}'),
'sinyal_fiyati': format_value(latest['Uzun_Sinyal_Fiyati'], '{:.2f}'),
'fark_tl': format_value(uzun_fark_tl, '{:.2f}'),
'fark_yuzde': format_value(uzun_fark_yuzde, '{:.2f}%'),
'guc': uzun_guc,
'uyari': latest['Uzun_Uyari'] if 'Uzun_Uyari' in df.columns else ''
}
}
def process_single_stock(self, file_path: str) -> Optional[Dict[str, Any]]:
"""Tek bir hisse için EMA analizi yap ve sonuçları topla"""
ticker_name = os.path.splitext(os.path.basename(file_path))[0]
print(f"{Fore.YELLOW} EMA sinyalleri hesaplanıyor: {ticker_name}...")
try:
df = self.load_stock_data(file_path)
# En az kısa vade EMA için yeterli veri yoksa hata ver
if df.empty or len(df) < min(self.ema_periods['kisa_vade']):
raise ValueError("Yeterli veri yok veya dosya geçersiz.")
# Tüm EMA'ları ve sinyalleri hesapla
df = self.calculate_all_emas(df)
df = self.determine_signal_status(df)
latest_signals = self.get_latest_signals(df)
if latest_signals:
self.successful_files.append(ticker_name)
latest_signals['hisse'] = ticker_name
print(f"{Fore.GREEN}✅ {ticker_name} için analiz başarılı.")
else:
self.failed_files.append(ticker_name)
print(f"{Fore.RED}❌ {ticker_name} için sinyal oluşturulamadı.")
return latest_signals
except (ValueError, pd.errors.ParserError) as e:
print(f"{Fore.RED}❌ {ticker_name} için hata: {e}")
self.failed_files.append(ticker_name)
return None
def main(self):
"""Ana fonksiyon"""
print(f"{Fore.CYAN} BorsaPin EMA Analiz Sistemi Başlatılıyor...")
input_folder = "StokData/Kapanis/"
input_files = glob.glob(os.path.join(input_folder, "*.xlsx"))
if not input_files:
print(f"{Fore.RED}❌ {input_folder} klasöründe Excel dosyası bulunamadı!")
return
all_latest_results: List[Dict[str, Any]] = []
for i, file_path in enumerate(input_files, 1):
print(f"\n{Fore.MAGENTA}[{i}/{len(input_files)}] İşleniyor...")
latest_signals = self.process_single_stock(file_path)
if latest_signals:
all_latest_results.append(latest_signals)
if all_latest_results:
self.save_all_signals_to_excel(all_latest_results)
self.print_summary()
self.save_failed_list()
print(f"\n{Fore.GREEN} EMA analiz işlemi tamamlandı!")
def save_all_signals_to_excel(self, results: List[Dict[str, Any]]):
"""Tüm sinyalleri tek bir Excel dosyasına farklı sayfalarda kaydet"""
output_folder = "StokData/"
os.makedirs(output_folder, exist_ok=True)
filename = os.path.join(output_folder, "BorsaPin_EMA_Analizleri.xlsx")
summary_df = self.create_summary_table(results)
strong_signals_df = self.filter_strong_signals_df(summary_df)
warning_signals_df = self.filter_warning_signals_df(summary_df)
# Detaylı sinyal tabloları oluştur
kisa_vade_df = self.create_detailed_signal_table(results, 'kisa')
orta_vade_df = self.create_detailed_signal_table(results, 'orta')
uzun_vade_df = self.create_detailed_signal_table(results, 'uzun')
# İstatistik tabloları oluştur
statistics_df = self.create_statistics_table(results)
with pd.ExcelWriter(filename, engine='openpyxl') as writer:
# Ana özet sayfası
if not summary_df.empty:
summary_df.to_excel(writer, sheet_name='Genel Özet', index=False)
self.format_worksheet(writer.sheets['Genel Özet'], summary_df)
# Güçlü sinyaller sayfası
if not strong_signals_df.empty:
strong_signals_df.to_excel(writer, sheet_name='Güçlü Sinyaller', index=False)
self.format_worksheet(writer.sheets['Güçlü Sinyaller'], strong_signals_df)
# Uyarı sinyalleri sayfası
if not warning_signals_df.empty:
warning_signals_df.to_excel(writer, sheet_name='Uyarı Sinyalleri', index=False)
self.format_worksheet(writer.sheets['Uyarı Sinyalleri'], warning_signals_df)
# Detaylı vade sayfaları
if not kisa_vade_df.empty:
kisa_vade_df.to_excel(writer, sheet_name='Kısa Vade Detay', index=False)
self.format_worksheet(writer.sheets['Kısa Vade Detay'], kisa_vade_df)
if not orta_vade_df.empty:
orta_vade_df.to_excel(writer, sheet_name='Orta Vade Detay', index=False)
self.format_worksheet(writer.sheets['Orta Vade Detay'], orta_vade_df)
if not uzun_vade_df.empty:
uzun_vade_df.to_excel(writer, sheet_name='Uzun Vade Detay', index=False)
self.format_worksheet(writer.sheets['Uzun Vade Detay'], uzun_vade_df)
# İstatistik sayfası
if not statistics_df.empty:
statistics_df.to_excel(writer, sheet_name='İstatistikler', index=False)
self.format_worksheet(writer.sheets['İstatistikler'], statistics_df)
print(f"{Fore.GREEN}✅ Tüm analiz sonuçları başarıyla kaydedildi: {filename}")
@staticmethod
def create_summary_table(results: List[Dict[str, Any]]) -> pd.DataFrame:
"""Özet Tablolar """
summary_data = [
{
'Hisse': res['hisse'],
'Kapanış': round(res['kapanis'], 2),
'Kısa Vade': res['kisa']['durum'],
'Kısa Uyarı': res['kisa']['uyari'],
'Sinyal Süresi': res['kisa']['bar_sayaci'],
'Kısa Vade Sinyal Tarihi': res['kisa']['sinyal_tarihi'],
'Kısa Vade Sinyal Fiyatı': res['kisa']['sinyal_fiyati'],
'Kısa Vade Kazanç TL': res['kisa']['fark_tl'],
'Kısa Vade Kazanç %': res['kisa']['fark_yuzde'],
'Orta Vade': res['orta']['durum'],
'Orta Uyarı': res['orta']['uyari'],
'Orta Sinyal Süresi': res['orta']['bar_sayaci'],
'Orta Vade Sinyal Tarihi': res['orta']['sinyal_tarihi'],
'Orta Vade Sinyal Fiyatı': res['orta']['sinyal_fiyati'],
'Orta Vade Kazanç TL': res['orta']['fark_tl'],
'Orta Vade Kazanç %': res['orta']['fark_yuzde'],
'Uzun Vade': res['uzun']['durum'],
'Uzun Uyarı': res['uzun']['uyari'],
'Uzun Sinyal Süresi': res['uzun']['bar_sayaci'],
'Uzun Vade Sinyal Tarihi': res['uzun']['sinyal_tarihi'],
'Uzun Vade Sinyal Fiyatı': res['uzun']['sinyal_fiyati'],
'Uzun Vade Kazanç TL': res['uzun']['fark_tl'],
'Uzun Vade Kazanç %': res['uzun']['fark_yuzde']
} for res in results
]
return pd.DataFrame(summary_data)
@staticmethod
def create_detailed_signal_table(results: List[Dict[str, Any]], vade_type: str) -> pd.DataFrame:
"""Belirli vade için detaylı sinyal tablosu oluştur"""
detailed_data = []
for res in results:
vade_data = res[vade_type]
if vade_data['durum'] != 'BEKLE':
detailed_data.append({
'Hisse': res['hisse'],
'Kapanış': round(res['kapanis'], 2),
'Sinyal Durumu': vade_data['durum'],
'Uyarı Durumu': vade_data['uyari'],
'Sinyal Süresi (Gün)': vade_data['bar_sayaci'],
'Sinyal Tarihi': vade_data['sinyal_tarihi'],
'Sinyal Fiyatı': vade_data['sinyal_fiyati'],
'Kazanç/Kayıp TL': vade_data['fark_tl'],
'Kazanç/Kayıp %': vade_data['fark_yuzde']
})
df = pd.DataFrame(detailed_data)
if not df.empty:
# Sinyal süresine göre sırala (en uzun süreli sinyaller önce)
df = df.sort_values('Sinyal Süresi (Gün)', ascending=False).reset_index(drop=True)
return df
@staticmethod
def filter_warning_signals_df(summary_df: pd.DataFrame) -> pd.DataFrame:
"""Uyarı sinyali olan hisseleri filtreleme"""
if summary_df.empty:
return pd.DataFrame()
warning_signals_df = summary_df[
(summary_df['Kısa Uyarı'] != '') |
(summary_df['Orta Uyarı'] != '') |
(summary_df['Uzun Uyarı'] != '')
].reset_index(drop=True)
return warning_signals_df
@staticmethod
def create_statistics_table(results: List[Dict[str, Any]]) -> pd.DataFrame:
"""İstatistik tablosu"""
stats_data = []
# Genel istatistikler
total_stocks = len(results)
# Her vade için istatistikler
for vade_key, vade_name in [('kisa', 'Kısa Vade'), ('orta', 'Orta Vade'), ('uzun', 'Uzun Vade')]:
al_sinyali = sum(1 for res in results if res[vade_key]['durum'] == 'AL')
sat_sinyali = sum(1 for res in results if res[vade_key]['durum'] == 'SAT')
bekle_sinyali = sum(1 for res in results if res[vade_key]['durum'] == 'BEKLE')
uyari_sinyali = sum(1 for res in results if res[vade_key]['uyari'] != '')
# Ortalama sinyal süresi (sadece aktif sinyaller için)
aktif_sinyaller = [res[vade_key]['bar_sayaci'] for res in results
if res[vade_key]['durum'] != 'BEKLE' and res[vade_key]['bar_sayaci'] > 0]
ortalama_sure = round(sum(aktif_sinyaller) / len(aktif_sinyaller), 1) if aktif_sinyaller else 0
# En uzun sinyal süresi
max_sure = max(aktif_sinyaller) if aktif_sinyaller else 0
# Pozitif/negatif performans (sadece AL sinyalleri için)
al_sinyalleri = [res for res in results if res[vade_key]['durum'] == 'AL']
pozitif_performans = 0
negatif_performans = 0
for res in al_sinyalleri:
try:
fark_str = res[vade_key]['fark_yuzde']
if fark_str != 'Yok' and '%' in fark_str:
fark_value = float(fark_str.replace('%', ''))
if fark_value > 0:
pozitif_performans += 1
else:
negatif_performans += 1
except ValueError:
pass
stats_data.extend([
{
'Vade Türü': vade_name,
'Metrik': 'Toplam Hisse Sayısı',
'Değer': total_stocks,
'Oran %': '100.0'
},
{
'Vade Türü': vade_name,
'Metrik': 'AL Sinyali',
'Değer': al_sinyali,
'Oran %': f"{(al_sinyali / total_stocks * 100):.1f}" if total_stocks > 0 else "0.0"
},
{
'Vade Türü': vade_name,
'Metrik': 'SAT Sinyali',
'Değer': sat_sinyali,
'Oran %': f"{(sat_sinyali / total_stocks * 100):.1f}" if total_stocks > 0 else "0.0"
},
{
'Vade Türü': vade_name,
'Metrik': 'BEKLE Durumu',
'Değer': bekle_sinyali,
'Oran %': f"{(bekle_sinyali / total_stocks * 100):.1f}" if total_stocks > 0 else "0.0"
},
{
'Vade Türü': vade_name,
'Metrik': 'Uyarı Sinyali',
'Değer': uyari_sinyali,
'Oran %': f"{(uyari_sinyali / total_stocks * 100):.1f}" if total_stocks > 0 else "0.0"
},
{
'Vade Türü': vade_name,
'Metrik': 'Ortalama Sinyal Süresi (Gün)',
'Değer': ortalama_sure,
'Oran %': '-'
},
{
'Vade Türü': vade_name,
'Metrik': 'En Uzun Sinyal Süresi (Gün)',
'Değer': max_sure,
'Oran %': '-'
},
{
'Vade Türü': vade_name,
'Metrik': 'Pozitif Performans (AL)',
'Değer': pozitif_performans,
'Oran %': f"{(pozitif_performans / al_sinyali * 100):.1f}" if al_sinyali > 0 else "0.0"
},
{
'Vade Türü': vade_name,
'Metrik': 'Negatif Performans (AL)',
'Değer': negatif_performans,
'Oran %': f"{(negatif_performans / al_sinyali * 100):.1f}" if al_sinyali > 0 else "0.0"
}
])
# Güçlü sinyaller (tüm vadeler AL)
guclu_sinyaller = sum(1 for res in results
if res['kisa']['durum'] == 'AL' and
res['orta']['durum'] == 'AL' and
res['uzun']['durum'] == 'AL')
# Toplam uyarı sinyalleri
toplam_uyari = sum(1 for res in results
if res['kisa']['uyari'] != '' or
res['orta']['uyari'] != '' or
res['uzun']['uyari'] != '')
stats_data.extend([
{
'Vade Türü': 'Genel',
'Metrik': 'Güçlü Sinyal (Tüm Vadeler AL)',
'Değer': guclu_sinyaller,
'Oran %': f"{(guclu_sinyaller / total_stocks * 100):.1f}" if total_stocks > 0 else "0.0"
},
{
'Vade Türü': 'Genel',
'Metrik': 'Toplam Uyarı Sinyali',
'Değer': toplam_uyari,
'Oran %': f"{(toplam_uyari / total_stocks * 100):.1f}" if total_stocks > 0 else "0.0"
}
])
return pd.DataFrame(stats_data)
@staticmethod
def format_worksheet(worksheet, df):
"""Excel çalışma sayfası formatları """
try:
# Başlık satırı formatı
header_fill = PatternFill(start_color='366092', end_color='366092', fill_type='solid')
header_font = Font(color='FFFFFF', bold=True)
header_alignment = Alignment(horizontal='center', vertical='center')
# Kenarlık stili
thin_border = Border(
left=Side(style='thin'),
right=Side(style='thin'),
top=Side(style='thin'),
bottom=Side(style='thin')
)
# Başlık satırını formatla
for col_num, column_title in enumerate(df.columns, 1):
cell = worksheet.cell(row=1, column=col_num)
cell.fill = header_fill
cell.font = header_font
cell.alignment = header_alignment
cell.border = thin_border
# Veri satırlarını formatla
for row_num in range(2, len(df) + 2):
for col_num in range(1, len(df.columns) + 1):
cell = worksheet.cell(row=row_num, column=col_num)
cell.border = thin_border
cell.alignment = Alignment(horizontal='center', vertical='center')
# Sütun genişliklerini ayarla
for column in worksheet.columns:
max_length = 0
column_letter = column[0].column_letter
for cell in column:
try:
if len(str(cell.value)) > max_length:
max_length = len(str(cell.value))
except (TypeError, ValueError):
pass
adjusted_width = min(max_length + 2, 20)
worksheet.column_dimensions[column_letter].width = adjusted_width
# Freeze panes ve autofilter
worksheet.freeze_panes = 'A2'
worksheet.auto_filter.ref = worksheet.dimensions
# Sinyal durumu sütunları için renk kodlaması
for row_num in range(2, len(df) + 2):
for col_num, column_name in enumerate(df.columns, 1):
cell = worksheet.cell(row=row_num, column=col_num)
# Ana özet sayfaları için vade sütunları
if 'Vade' in column_name and column_name.endswith('Vade'):
if cell.value == 'AL':
cell.fill = PatternFill(start_color='90EE90', end_color='90EE90', fill_type='solid')
cell.font = Font(bold=True, color='006400')
elif cell.value == 'SAT':
cell.fill = PatternFill(start_color='FFB6C1', end_color='FFB6C1', fill_type='solid')
cell.font = Font(bold=True, color='8B0000')
elif cell.value == 'BEKLE':
cell.fill = PatternFill(start_color='FFFFE0', end_color='FFFFE0', fill_type='solid')
cell.font = Font(bold=True, color='B8860B')
# Detay sayfaları için "Sinyal Durumu" sütunu
elif column_name == 'Sinyal Durumu':
if cell.value == 'AL':
cell.fill = PatternFill(start_color='90EE90', end_color='90EE90', fill_type='solid')
cell.font = Font(bold=True, color='006400')
elif cell.value == 'SAT':
cell.fill = PatternFill(start_color='FFB6C1', end_color='FFB6C1', fill_type='solid')
cell.font = Font(bold=True, color='8B0000')
# Uyarı sütunları için renk kodlaması
elif 'Uyarı' in column_name:
if cell.value and cell.value != '':
cell.fill = PatternFill(start_color='FFA500', end_color='FFA500', fill_type='solid')
cell.font = Font(bold=True, color='FFFFFF')
# Uyarı durumu sütunu için renk kodlaması
elif column_name == 'Uyarı Durumu':
if cell.value and cell.value != '':
cell.fill = PatternFill(start_color='FFA500', end_color='FFA500', fill_type='solid')
cell.font = Font(bold=True, color='FFFFFF')
# Kazanç/Kayıp yüzde değerleri için renk kodlaması
elif 'Kazanç' in column_name and '%' in column_name:
try:
if cell.value and cell.value != 'Yok' and '%' in str(cell.value):
percentage_value = float(str(cell.value).replace('%', ''))
if percentage_value > 0:
cell.fill = PatternFill(start_color='E6FFE6', end_color='E6FFE6', fill_type='solid')
cell.font = Font(color='006400', bold=True)
elif percentage_value < 0:
cell.fill = PatternFill(start_color='FFE6E6', end_color='FFE6E6', fill_type='solid')
cell.font = Font(color='8B0000', bold=True)
except (ValueError, TypeError):
pass
# Kazanç/Kayıp TL değerleri için renk kodlaması
elif 'Kazanç' in column_name and 'TL' in column_name:
try:
if cell.value and cell.value != 'Yok':
tl_value = float(str(cell.value))
if tl_value > 0:
cell.fill = PatternFill(start_color='E6FFE6', end_color='E6FFE6', fill_type='solid')
cell.font = Font(color='006400', bold=True)
elif tl_value < 0:
cell.fill = PatternFill(start_color='FFE6E6', end_color='FFE6E6', fill_type='solid')
cell.font = Font(color='8B0000', bold=True)
except (ValueError, TypeError):
pass
# Sinyal süresi için renk gradyanı (uzun süreli sinyaller daha koyu)
elif 'Sinyal Süresi' in column_name or 'Sinyal Süresi (Gün)' in column_name:
try:
if cell.value and isinstance(cell.value, (int, float)) and cell.value > 0:
days = int(cell.value)
if days >= 30:
cell.fill = PatternFill(start_color='4CAF50', end_color='4CAF50', fill_type='solid')
cell.font = Font(color='FFFFFF', bold=True)
elif days >= 14:
cell.fill = PatternFill(start_color='8BC34A', end_color='8BC34A', fill_type='solid')
cell.font = Font(color='FFFFFF', bold=True)
elif days >= 7:
cell.fill = PatternFill(start_color='CDDC39', end_color='CDDC39', fill_type='solid')
cell.font = Font(color='333333', bold=True)
elif days >= 3:
cell.fill = PatternFill(start_color='FFF176', end_color='FFF176', fill_type='solid')
cell.font = Font(color='333333', bold=True)
except (ValueError, TypeError):
pass
except Exception as e:
print(f"{Fore.RED}❌ Excel formatlama hatası: {e}")
@staticmethod
def filter_strong_signals_df(summary_df: pd.DataFrame) -> pd.DataFrame:
"""Tüm vadelerde 'AL' sinyali olan hisseleri filtrele"""
if summary_df.empty:
return pd.DataFrame()
strong_signals_df = summary_df[
(summary_df['Kısa Vade'] == 'AL') &
(summary_df['Orta Vade'] == 'AL') &
(summary_df['Uzun Vade'] == 'AL')
].reset_index(drop=True)
return strong_signals_df
def print_summary(self):
"""İşlem özetini ekrana yazdır"""
total_files = len(self.successful_files) + len(self.failed_files)
print("\n" + "=" * 40)
print(f"{Fore.CYAN}BorsaPin EMA Analizi Özeti")
print("=" * 40)
print(f"{Fore.GREEN}✅ Başarılı Analiz Edilen Hisse Sayısı: {len(self.successful_files)} / {total_files}")
if self.successful_files:
print(f"{Fore.GREEN} - Hisse Listesi: {', '.join(self.successful_files)}")
print(f"{Fore.RED}❌ Hata Alan Hisse Sayısı: {len(self.failed_files)} / {total_files}")
if self.failed_files:
print(f"{Fore.RED} - Hisse Listesi: {', '.join(self.failed_files)}")
print("=" * 40)
def save_failed_list(self):
"""Hata veren hisseleri bir dosyaya kaydet"""
if self.failed_files:
with open("StokData/hata_veren_hisseler.txt", "w") as f:
f.write("\n".join(self.failed_files))
print(f"{Fore.RED}❌ Hata veren hisse listesi 'StokData/hata_veren_hisseler.txt' dosyasına kaydedildi.")
if __name__ == "__main__":
analyzer = BorsaPinEMAAnalyzer()
analyzer.main()
Finans piyasalarında, fiyat hareketlerini anlamak ve geleceğe yönelik tahminlerde bulunmak için teknik analiz araçları hayati önem taşır. Bu araçların en popülerlerinden biri, özellikle kısa ve orta vadeli trendleri belirlemede etkili olan Üstel Hareketli Ortalama (Exponential Moving Average – EMA)’dır. EMA’nın ne olduğunu, neden bu kadar önemli olduğunu ve kendi borsa verileriniz üzerinde bu analizi otomatik olarak yapacak bir Python aracını nasıl kullanabileceğinizi ele alacağız.
Hareketli ortalamalar, belirli bir zaman dilimindeki fiyatların ortalamasını alarak fiyat dalgalanmalarını yumuşatan göstergelerdir. EMA, bu ortalama hesaplamasını yaparken son fiyatlara daha fazla ağırlık vererek, basit hareketli ortalamalardan (SMA) daha hızlı tepki verir. Bu özellik, yatırımcıların son piyasa değişikliklerini daha çabuk fark etmelerini sağlar.
EMA’nın temel mantığı, fiyatın EMA’nın üzerinde olması durumunda yükseliş trendinin devam ettiğini, altında olması durumunda ise düşüş trendinin hakim olduğunu gösterir. Farklı periyotlardaki EMA’ların birbiriyle kesişimi ise alım veya satım sinyali olarak yorumlanabilir.
Örneğin, kısa vadeli bir EMA’nın (örneğin EMA 8) uzun vadeli bir EMA’yı (örneğin EMA 21) yukarı yönlü kesmesi, güçlü bir alım sinyali olarak kabul edilebilir.
Python kodu, EMACalculator adında güçlü ve modüler bir sınıf yapısına sahiptir. Bu sınıf, borsa verilerinizi otomatik olarak okur, farklı periyotlarda EMA hesaplamaları yapar ve sonuçları düzenli bir şekilde kaydeder.
Kodun ana bölümleri
ema_periods: Kod, 5, 8, 13, 21, 34, 55, 89, 144, 233, 370 gibi finansal analizde sıkça kullanılan Fibonacci sayılarına dayalı EMA periyotlarını kullanır. Bu periyotlar, farklı zaman dilimlerindeki trendleri gözlemlemenize olanak tanır.
process_single_file: Bu metot, her bir hisse senedi için ayrı ayrı EMA’ları hesaplar. Girdi olarak alınan Excel dosyasından Tarih ve Kapanış verilerini okur ve her bir EMA periyodu için yeni sütunlar oluşturur.
find_input_files: StokData/Kapanis/ klasöründeki tüm Excel dosyalarını otomatik olarak bulur. Bu, yüzlerce hisse senedi verisini tek tek elle işlemek yerine toplu bir şekilde işlem yapmanızı sağlar.
main: Programın ana akışını yöneten metottur. Giriş dosyalarını bulur, her bir dosya için EMA hesaplamasını başlatır ve işlemin sonunda başarılı/başarısız dosyaların özetini gösteren bir rapor sunar.
Bu Makale Daha önceki Makalelerin devamı niteliğinde olduğu için, aşağıdaki makaleleri de incelemeniz büyük önem taşıyor.
Python ve PyCharm kurulumu
Çalışmaların tamamını içeren Google Drive Alanı
X_02_BorsaPin_Emas.py olarak dosyayı kayıt edebilirsiniz.
Python Kodu
import pandas as pd
import os
import numpy as np
from datetime import datetime
from colorama import Fore, init
import glob
"""
Borsapin StokData/Kapanis klasöründeki hisse kapanış datalarından EMA'larını hesaplar
www.kursatsenturk.com
"""
init(autoreset=True)
class EMACalculator:
def __init__(self):
self.ema_periods = [5, 8, 13, 21, 34, 55, 89, 144, 233, 370]
self.successful_files = []
self.failed_files = []
@staticmethod
def calculate_ema(data, period):
"""EMA hesaplama fonksiyonu"""
try:
return data.ewm(span=period, adjust=False).mean()
except Exception as e:
print(f"{Fore.RED}❌ EMA hesaplama hatası (Period {period}): {e}")
return pd.Series([np.nan] * len(data))
def process_single_file(self, file_path):
"""Tek dosya için EMA hesaplama"""
ticker_name = os.path.splitext(os.path.basename(file_path))[0]
try:
print(f"{Fore.YELLOW} EMA hesaplanıyor: {ticker_name}...")
df = pd.read_excel(file_path)
required_columns = ['Tarih', 'Kapanış']
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
raise ValueError(f"Eksik sütunlar: {missing_columns}")
if df.empty or df['Kapanış'].isna().all():
raise ValueError("Kapanış verisi boş veya geçersiz")
df = df.sort_values('Tarih').reset_index(drop=True)
result_df = pd.DataFrame()
result_df['Hisse_Adi'] = [ticker_name] * len(df)
result_df['Tarih'] = df['Tarih']
result_df['Kapanış'] = df['Kapanış']
print(f"{Fore.CYAN} ⚡ EMA hesaplanıyor: ", end="")
for i, period in enumerate(self.ema_periods):
ema_column_name = f'EMA_{period}'
print(f"{period}", end="")
ema_values = self.calculate_ema(df['Kapanış'], period)
result_df[ema_column_name] = ema_values.round(4)
if i < len(self.ema_periods) - 1:
print(", ", end="")
print()
output_folder = "StokData/Emas/"
os.makedirs(output_folder, exist_ok=True)
output_file = os.path.join(output_folder, f"{ticker_name}.xlsx")
result_df.to_excel(output_file, index=False)
print(f"{Fore.GREEN}✅ {ticker_name} EMA verileri başarıyla kaydedildi.")
print(f"{Fore.BLUE} Konum: {output_file}")
print(f"{Fore.BLUE} Toplam satır: {len(result_df)}")
self.successful_files.append(ticker_name)
return True
except Exception as e:
print(f"{Fore.RED}❌ {ticker_name} için hata: {e}")
self.failed_files.append(os.path.basename(file_path))
return False
@staticmethod
def find_input_files(input_folder="StokData/Kapanis/"):
"""Giriş dosyalarını bulma"""
try:
pattern = os.path.join(input_folder, "*.xlsx")
files = glob.glob(pattern)
if not files:
print(f"{Fore.RED}❌ {input_folder} klasöründe Excel dosyası bulunamadı!")
return []
print(f"{Fore.BLUE} {len(files)} adet Excel dosyası bulundu.")
return files
except Exception as e:
print(f"{Fore.RED}❌ Dosya arama hatası: {e}")
return []
@staticmethod
def validate_input_file(file_path):
"""Giriş dosyasını doğrulama"""
try:
df = pd.read_excel(file_path, nrows=1)
required_columns = ['Tarih', 'Kapanış']
if not all(col in df.columns for col in required_columns):
return False, f"Eksik sütunlar: {[col for col in required_columns if col not in df.columns]}"
return True, "OK"
except Exception as e:
return False, f"Dosya okuma hatası: {e}"
def print_summary(self):
"""Özet rapor"""
total = len(self.successful_files) + len(self.failed_files)
success_rate = (len(self.successful_files) / total * 100) if total > 0 else 0
print(f"\n{Fore.CYAN} ===== EMA HESAPLAMA RAPORU =====")
print(f"{Fore.BLUE} EMA Periyodları: {', '.join(map(str, self.ema_periods))}")
print(f"{Fore.GREEN}✅ Başarılı: {len(self.successful_files)}")
print(f"{Fore.RED}❌ Başarısız: {len(self.failed_files)}")
print(f"{Fore.BLUE} Başarı oranı: {success_rate:.1f}%")
if self.successful_files:
print(f"{Fore.GREEN} Başarılı dosyalar: {', '.join(self.successful_files[:10])}")
if len(self.successful_files) > 10:
print(f"{Fore.GREEN} ... ve {len(self.successful_files) - 10} dosya daha")
if self.failed_files:
print(f"{Fore.RED} Başarısız dosyalar: {', '.join(self.failed_files)}")
def save_failed_list(self, filename="basarisiz_ema_dosyalari.txt"):
"""Başarısız dosyaları kaydetme"""
if self.failed_files:
try:
with open(filename, 'w', encoding='utf-8') as f:
f.write("# Başarısız EMA hesaplama dosyaları\n")
f.write(f"# Tarih: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
for file_name in self.failed_files:
f.write(f"{file_name}\n")
print(f"{Fore.YELLOW} Başarısız dosyalar {filename} dosyasına kaydedildi.")
except Exception as e:
print(f"{Fore.RED}❌ Başarısız dosya listesi kaydetme hatası: {e}")
def create_sample_ema_analysis(self, ticker_name="SAMPLE"):
"""Örnek EMA analiz dosyası oluşturma"""
try:
dates = pd.date_range(start='2023-01-01', end='2024-01-01', freq='D')
np.random.seed(42)
base_price = 100
price_changes = np.random.normal(0, 2, len(dates))
prices = [base_price]
for change in price_changes[1:]:
new_price = prices[-1] * (1 + change / 100)
prices.append(max(new_price, 1))
df = pd.DataFrame({
'Tarih': dates,
'Kapanış': prices
})
result_df = pd.DataFrame()
result_df['Hisse_Adi'] = [ticker_name] * len(df)
result_df['Tarih'] = df['Tarih']
result_df['Kapanış'] = df['Kapanış']
for period in self.ema_periods:
ema_values = self.calculate_ema(df['Kapanış'], period)
result_df[f'EMA_{period}'] = ema_values.round(4)
output_folder = "StokData/Emas/"
os.makedirs(output_folder, exist_ok=True)
output_file = os.path.join(output_folder, f"{ticker_name}.xlsx")
result_df.to_excel(output_file, index=False)
print(f"{Fore.GREEN}✅ Örnek EMA dosyası oluşturuldu: {output_file}")
return True
except Exception as e:
print(f"{Fore.RED}❌ Örnek dosya oluşturma hatası: {e}")
return False
def main(self):
"""Ana fonksiyon"""
print(f"{Fore.CYAN} EMA Hesaplama Sistemi Başlatılıyor...")
print(f"{Fore.BLUE} EMA Periyodları: {', '.join(map(str, self.ema_periods))}")
input_files = self.find_input_files()
if not input_files:
print(f"{Fore.YELLOW}⚠️ Giriş dosyası bulunamadı. Örnek dosya oluşturuluyor...")
self.create_sample_ema_analysis()
return
print(f"{Fore.BLUE} Toplam işlenecek dosya: {len(input_files)}\n")
for i, file_path in enumerate(input_files, 1):
print(f"\n{Fore.MAGENTA}[{i}/{len(input_files)}] İşleniyor...")
is_valid, error_msg = self.validate_input_file(file_path)
if not is_valid:
print(f"{Fore.RED}❌ Geçersiz dosya: {os.path.basename(file_path)} - {error_msg}")
self.failed_files.append(os.path.basename(file_path))
continue
self.process_single_file(file_path)
self.save_failed_list()
self.print_summary()
print(f"\n{Fore.GREEN} EMA hesaplama işlemi tamamlandı!")
print(f"{Fore.BLUE} Çıktı klasörü: StokData/Emas/")
# Kullanım
if __name__ == "__main__":
calculator = EMACalculator()
calculator.main()
Bir sonraki makalede İdeal EMA dizilim (sıralama) yani Ema Alignment betiğini paylaşacağım.
Burada bütün hisseleri tarayıp belirli kurallara uyan hisseleri çalışma sayfalarında listeleteceğiz.
idealemaup, idealemadown, idealemanötr, potansiyel taşıyan ve 1 yıllık istatistikleri gösteren çalışma sayfaları olacak.
Son Yorumlar