Bu Python betiği, X_01_BorsaPin_StokData.py Python betiğiyle oluşturulan StokData/Kapanis klasöründeki hisse kapanış verilerinden lineer regresyon trend analizi ve Pearson korelasyon ölçümleri yaparak detaylı teknik analiz raporu üretir. Analizler, Excel formatında renklendirilmiş ve filtrelenebilir şekilde kayıt altına alınır.
Terminalden şu iki komutu çalıştırın .
python.exe -m pip install --upgrade pip
pip install pandas numpy scipy colorama openpyxl
Lineer Regresyon Analizi: Kapanış fiyatları üzerinden trend çizgisi (slope), kanal bandı, standart sapma gibi metrikleri hesaplar.
Pearson Korelasyon: Trendin gücünü ölçmek için farklı periyotlarda Pearson korelasyon katsayısı hesaplanır.
Kanal Altı Fırsatları: Alt banda yakın fiyat hareketleri ve güçlü korelasyon içeren hisseler.
Breakout Adayları: Üst banda yakın olup yükseliş sinyali veren hisseler.
EMA Dizilim Entegrasyonu: Farklı script tarafından oluşturulan EMA analizleriyle birleştirilebilir.
Otomatik Excel Raporu: Tüm veriler biçimlendirilmiş bir Excel dosyasına kaydedilir (filtrelenebilir, renklendirilmiş).
Başarısız Dosya Takibi: Hatalı veriler ayrı bir .txt dosyasına kaydedilir. Analiz Süreci Otomatik Gerçekleşir:
Tüm dosyalar sırasıyla işlenir. Lineer regresyon analizleri yapılır. Fırsatlar belirlenir. Sonuçlar Excel ve TXT dosyalarına kaydedilir.
StokData/lineer_regresyon_analiz.xlsx → Ana analiz dosyası
StokData/LinearRegression/lineer_regresyon_analiz_YYYY-MM-DD.xlsx → Arşiv dosyası
basarisiz_regresyon_dosyalari.txt → Başarısız analizlerin listesi
Excel Sayfaları
TumSonuclar: Tüm hisse ve periyot sonuçları
IdealPearson: En güçlü Pearson korelasyonuna sahip hisseler
KanalFirsatlari_Top50: Kanal altına yakın fırsatlar
BreakoutAdaylari_Top50: Yükselişe aday hisseler
Periyot_XXX: Her analiz periyodu için ayrı sayfalar
Istatistikler: Genel başarı ve analiz özetleri
Otomatik, hızlı ve güvenilir teknik analiz. Yatırım kararlarını destekleyecek fırsat ve sinyal tespiti. Excel üzerinden kolay görsel analiz ve filtreleme imkânı
Bu betik, teknik analizde trend doğruluğunu ve fırsatları istatistiksel olarak belirlemek isteyen yatırımcılar için güçlü bir araçtır. Özellikle BIST verileriyle çalışan sistemlerde regresyon ve Pearson tabanlı analiz ile öne çıkan hisseleri kolayca tespit etmeye yardımcı olur.
LinReg tarama çalışmamızı X_05_BorsaPin_LinReg_Tarama.py adıyla kayıt edebilirsiniz.
Python Betiği
import pandas as pd
import numpy as np
import os
import glob
from scipy.stats import linregress, pearsonr
from datetime import datetime, timedelta
from colorama import Fore, init
from openpyxl.styles import Font, PatternFill, Alignment
from openpyxl.utils import get_column_letter
from typing import List, Dict, Optional
"""
Borsapin StokData/Kapanis klasöründeki hisse verilerinden Lineer Regresyon ve Pearson analizi yapar
www.kursatsenturk.com
"""
init(autoreset=True)
class LinearRegressionAnalyzer:
def __init__(self):
self.pearson_periods = [55, 89, 144, 233, 370, 610, 987]
self.analysis_periods = [55, 89, 144, 233, 370, 610, 987] # Özel analiz periyotları
self.successful_files = []
self.failed_files = []
# Sonuç listeleri
self.all_results = []
self.period_results = {period: [] for period in self.pearson_periods}
self.ideal_pearson = []
self.channel_opportunities = []
self.breakout_candidates = []
self.statistics = {}
@staticmethod
def calculate_regression(data_df: pd.DataFrame, period: int) -> Optional[Dict]:
"""Lineer regresyon ve kanal hesaplama"""
try:
# Son 'period' a kadar veriyi al
data_df = data_df.tail(period).copy()
if len(data_df) < period:
return None
# X ekseni (zaman) ve Y ekseni (fiyat) değerleri
x = np.arange(len(data_df))
y = data_df["Kapanış"].values
# Lineer regresyon hesaplama
slope, intercept, r_value, p_value, std_err = linregress(x, y)
# Trend çizgisi hesaplama
trend_line = intercept + slope * x
# Residual (artık) değerler ve standart sapma
residuals = y - trend_line
std_dev = np.std(residuals)
# Kanal bantları (2 standart sapma)
upper_channel = trend_line + 2 * std_dev
lower_channel = trend_line - 2 * std_dev
# Pearson korelasyon katsayısı
corr, _ = pearsonr(x, y)
# Son değerler
upper_channel_value = round(float(upper_channel[-1]), 2)
lower_channel_value = round(float(lower_channel[-1]), 2)
trend_value = round(float(trend_line[-1]), 2)
last_price = float(y[-1])
# Yüzde farkları
upper_diff_pct = round((last_price - upper_channel_value) / last_price * 100, 2)
lower_diff_pct = round((last_price - lower_channel_value) / last_price * 100, 2)
trend_diff_pct = round((last_price - trend_value) / last_price * 100, 2)
# Trend yönü ve güç
trend_direction = "Yükseliş" if slope > 0 else "Düşüş"
trend_strength = "Güçlü" if abs(slope) > 0.05 else "Orta" if abs(slope) > 0.01 else "Zayıf"
# Kanal pozisyonu
channel_position = "Üst Bant" if upper_diff_pct >= -2 else "Alt Bant" if lower_diff_pct <= 2 else "Orta"
return {
"Periyot": period,
"Kanal_Ust": upper_channel_value,
"Kanal_Alt": lower_channel_value,
"Trend_Cizgisi": trend_value,
"Ust_Fark_Pct": upper_diff_pct,
"Alt_Fark_Pct": lower_diff_pct,
"Trend_Fark_Pct": trend_diff_pct,
"Trend_Yonu": trend_direction,
"Trend_Gucu": trend_strength,
"Kanal_Pozisyonu": channel_position,
"Pearson": round(corr, 4),
"R_Kare": round(r_value ** 2, 4),
"Slope": round(slope, 6),
"P_Value": round(p_value, 6),
"Std_Error": round(std_err, 6)
}
except Exception as e:
print(f"{Fore.RED}❌ Regresyon hesaplama hatası (Period {period}): {e}")
return None
def process_single_file(self, file_path: str) -> bool:
"""Tek dosya için lineer regresyon analizi"""
ticker_name = None
try:
# Dosya adından hisse adını alma
file_name = os.path.basename(file_path)
ticker_name = os.path.splitext(file_name)[0]
print(f"{Fore.YELLOW} Lineer regresyon analizi: {ticker_name}...")
# Excel dosyasını okuma
df = pd.read_excel(file_path)
# Gerekli sütunların varlığını kontrol etme
if 'Tarih' not in df.columns or 'Kapanış' not in df.columns:
raise ValueError("Tarih veya Kapanış sütunu bulunamadı")
# Veri temizleme ve sıralama
df = df[["Tarih", "Kapanış"]].dropna()
df["Tarih"] = pd.to_datetime(df["Tarih"])
df = df.sort_values("Tarih").reset_index(drop=True)
df["Kapanış"] = pd.to_numeric(df["Kapanış"], errors='coerce')
df = df.dropna()
if df.empty:
raise ValueError("Veri boş veya geçersiz")
last_close = round(float(df["Kapanış"].iloc[-1]), 2)
last_date = df["Tarih"].iloc[-1]
print(f"{Fore.CYAN} ⚡ Periyotlar hesaplanıyor: ", end="")
# Her periyot için analiz
ticker_pearson_data = {"Hisse_Adi": ticker_name, "Kapanış": last_close, "Tarih": last_date}
for i, period in enumerate(self.pearson_periods):
print(f"{period}", end="")
result = self.calculate_regression(df.copy(), period)
if result:
# Ana sonuç listesine ekle
main_result = {
"Hisse_Adi": ticker_name,
"Kapanış": last_close,
"Tarih": last_date,
**result
}
self.all_results.append(main_result)
# Periyot bazlı sonuçlara ekle
self.period_results[period].append(main_result)
# İdeal Pearson için veri toplama
ticker_pearson_data[f"Pearson_{period}"] = result["Pearson"]
ticker_pearson_data[f"Kanal_Pozisyon_{period}"] = result["Kanal_Pozisyonu"]
ticker_pearson_data[f"Alt_Fark_{period}"] = result["Alt_Fark_Pct"]
ticker_pearson_data[f"Ust_Fark_{period}"] = result["Ust_Fark_Pct"]
if i < len(self.pearson_periods) - 1:
print(", ", end="")
print()
# İdeal Pearson listesine ekle
self.ideal_pearson.append(ticker_pearson_data)
# Özel analizler için kontrol
self.analyze_opportunities(ticker_name, last_close, last_date, df)
print(f"{Fore.GREEN}✅ {ticker_name} analizi tamamlandı.")
self.successful_files.append(ticker_name)
return True
except Exception as e:
print(f"{Fore.RED}❌ {ticker_name if ticker_name else file_path} için hata: {e}")
self.failed_files.append(os.path.basename(file_path))
return False
def analyze_opportunities(self, ticker_name: str, last_close: float, last_date, df: pd.DataFrame):
"""Fırsat analizi - kanal alt bandı ve breakout adayları"""
try:
for period in self.analysis_periods:
result = self.calculate_regression(df.copy(), period)
if result:
# Alt banda yakın fırsatlar (Alt fark <= 4% ve Pearson >= 0.7)
if (-2 <= result["Alt_Fark_Pct"] <= 4 and
abs(result["Pearson"]) >= 0.7):
self.channel_opportunities.append({
"Hisse_Adi": ticker_name,
"Kapanış": last_close,
"Tarih": last_date,
"Periyot": period,
"Alt_Fark_Pct": result["Alt_Fark_Pct"],
"Pearson": result["Pearson"],
"Trend_Yonu": result["Trend_Yonu"],
"Kanal_Alt": result["Kanal_Alt"],
"Kanal_Ust": result["Kanal_Ust"],
"Potansiyel_Kazanc": round((result["Kanal_Ust"] - last_close) / last_close * 100, 2)
})
# Breakout adayları (Üst fark >= -4% ve Pearson >= 0.8)
if (-4 <= result["Ust_Fark_Pct"] <= 2 and
abs(result["Pearson"]) >= 0.8):
self.breakout_candidates.append({
"Hisse_Adi": ticker_name,
"Kapanış": last_close,
"Tarih": last_date,
"Periyot": period,
"Ust_Fark_Pct": result["Ust_Fark_Pct"],
"Pearson": result["Pearson"],
"Trend_Yonu": result["Trend_Yonu"],
"Kanal_Ust": result["Kanal_Ust"],
"Hedef_Fiyat": round(result["Kanal_Ust"] * 1.05, 2) # %5 üst hedef
})
except Exception as e:
print(f"{Fore.RED}❌ Fırsat analizi hatası ({ticker_name}): {e}")
def calculate_statistics(self):
"""İstatistik hesaplama"""
try:
print(f"{Fore.CYAN} İstatistikler hesaplanıyor...")
# Genel istatistikler
stats = {
"Toplam_Analiz_Edilen": len(self.successful_files),
"Basarisiz_Dosya": len(self.failed_files)
}
# Periyot bazlı istatistikler
for period in self.pearson_periods:
period_data = self.period_results[period]
if period_data:
pearson_values = [item["Pearson"] for item in period_data if item.get("Pearson")]
stats[f"Ortalama_Pearson_{period}"] = round(np.mean(pearson_values), 4) if pearson_values else 0
stats[f"Yuksek_Pearson_Sayisi_{period}"] = len([p for p in pearson_values if abs(p) >= 0.8])
# Fırsat istatistikleri
stats["Kanal_Alt_Firsatlari"] = len(self.channel_opportunities)
stats["Breakout_Adaylari"] = len(self.breakout_candidates)
# Trend analizi
trend_up = len([item for item in self.all_results if item.get("Trend_Yonu") == "Yükseliş"])
trend_down = len([item for item in self.all_results if item.get("Trend_Yonu") == "Düşüş"])
stats["Yukselis_Trendi"] = trend_up
stats["Dusus_Trendi"] = trend_down
self.statistics = stats
return stats
except Exception as e:
print(f"{Fore.RED}❌ İstatistik hesaplama hatası: {e}")
return {}
@staticmethod
def find_input_files(input_folder="StokData/Kapanis/"):
"""Giriş dosyalarını bulma"""
try:
pattern = os.path.join(input_folder, "*.xlsx")
files = glob.glob(pattern)
if not files:
print(f"{Fore.RED}❌ {input_folder} klasöründe Excel dosyası bulunamadı!")
return []
print(f"{Fore.BLUE} {len(files)} adet Excel dosyası bulundu.")
return files
except Exception as e:
print(f"{Fore.RED}❌ Dosya arama hatası: {e}")
return []
def merge_ema_data(self):
"""EMA dizilim verilerini birleştirme"""
try:
ema_file = "StokData/idealema_analiz.xlsx"
if os.path.exists(ema_file):
print(f"{Fore.CYAN} EMA verileri birleştiriliyor...")
ema_df = pd.read_excel(ema_file, sheet_name="IdealEMAUp")
ema_mapping = dict(zip(ema_df["Hisse_Adi"], ema_df["EMA_Dizilim"]))
# Tüm sonuçlara EMA bilgisi ekle
for result in self.all_results:
result["EMA_Dizilim"] = ema_mapping.get(result["Hisse_Adi"], "Bilinmiyor")
for result in self.ideal_pearson:
result["EMA_Dizilim"] = ema_mapping.get(result["Hisse_Adi"], "Bilinmiyor")
print(f"{Fore.GREEN}✅ EMA verileri başarıyla birleştirildi.")
else:
print(f"{Fore.YELLOW}⚠️ EMA dosyası bulunamadı: {ema_file}")
except Exception as e:
print(f"{Fore.RED}❌ EMA veri birleştirme hatası: {e}")
@staticmethod
def write_sheet_with_formatting(excel_writer, df, sheet_name, freeze_panes=None):
"""Excel sayfası yazma ve biçimlendirme"""
try:
if isinstance(df, list):
df = pd.DataFrame(df)
if df.empty:
df = pd.DataFrame([{"Mesaj": "Veri bulunamadı"}])
df.to_excel(excel_writer, sheet_name=sheet_name, index=False)
ws = excel_writer.sheets[sheet_name]
# Başlık biçimlendirme
header_fill = PatternFill(start_color='4472C4', end_color='4472C4', fill_type='solid')
for col_num, column in enumerate(df.columns, 1):
cell = ws.cell(row=1, column=col_num)
cell.font = Font(bold=True, color='FFFFFF')
cell.fill = header_fill
cell.alignment = Alignment(horizontal='center')
# Sütun genişlikleri
if any(keyword in str(column) for keyword in ['Tarih', 'Date']):
ws.column_dimensions[get_column_letter(col_num)].width = 15
elif any(keyword in str(column) for keyword in ['Hisse', 'CODE']):
ws.column_dimensions[get_column_letter(col_num)].width = 12
elif any(keyword in str(column) for keyword in ['Pearson', 'Fark', 'Pct']):
ws.column_dimensions[get_column_letter(col_num)].width = 18
else:
ws.column_dimensions[get_column_letter(col_num)].width = 20
# Veri satırlarını biçimlendirme
for row_num in range(2, len(df) + 2):
for col_num in range(1, len(df.columns) + 1):
cell = ws.cell(row=row_num, column=col_num)
cell.alignment = Alignment(horizontal='center')
column_name = df.columns[col_num - 1]
# Pearson değerleri için renklendirme
if 'Pearson' in str(column_name) and isinstance(cell.value, (int, float)):
if abs(cell.value) >= 0.9:
cell.fill = PatternFill(start_color='00FF00', end_color='00FF00', fill_type='solid')
elif abs(cell.value) >= 0.8:
cell.fill = PatternFill(start_color='90EE90', end_color='90EE90', fill_type='solid')
elif abs(cell.value) >= 0.7:
cell.fill = PatternFill(start_color='FFFF99', end_color='FFFF99', fill_type='solid')
# Fark yüzdeleri için renklendirme
elif 'Alt_Fark' in str(column_name) and isinstance(cell.value, (int, float)):
if -2 <= cell.value <= 4:
cell.fill = PatternFill(start_color='FFFFCC', end_color='FFFFCC', fill_type='solid')
elif 'Ust_Fark' in str(column_name) and isinstance(cell.value, (int, float)):
if -4 <= cell.value <= 2:
cell.fill = PatternFill(start_color='FFCCCC', end_color='FFCCCC', fill_type='solid')
# Freeze panes
if freeze_panes:
ws.freeze_panes = freeze_panes
# AutoFilter ekleme (başlık satırına)
if len(df) > 0:
ws.auto_filter.ref = f"A1:{get_column_letter(len(df.columns))}{len(df) + 1}"
except Exception as e:
print(f"{Fore.RED}❌ Excel biçimlendirme hatası ({sheet_name}): {e}")
def save_results(self):
"""Sonuçları Excel'e kaydetme"""
try:
# Klasörler oluşturma
output_folder = "StokData/LinearRegression/"
os.makedirs(output_folder, exist_ok=True)
# Dosya isimleri
today = datetime.today().strftime('%Y-%m-%d')
main_file = os.path.join("StokData", "lineer_regresyon_analiz.xlsx")
archive_file = os.path.join(output_folder, f"lineer_regresyon_analiz_{today}.xlsx")
# Veri hazırlama
print(f"{Fore.GREEN} Excel dosyası hazırlanıyor...")
# Sıralama işlemleri
# İdeal Pearson - maksimum mutlak Pearson değerine göre sırala
ideal_pearson_df = pd.DataFrame(self.ideal_pearson)
if not ideal_pearson_df.empty:
pearson_cols = [col for col in ideal_pearson_df.columns if 'Pearson_' in col]
if pearson_cols:
ideal_pearson_df['Max_Abs_Pearson'] = ideal_pearson_df[pearson_cols].abs().max(axis=1)
ideal_pearson_df = ideal_pearson_df.sort_values('Max_Abs_Pearson', ascending=False)
# Kanal fırsatları - Pearson değerine göre sırala (İlk 50)
channel_opp_df = pd.DataFrame(self.channel_opportunities)
if not channel_opp_df.empty:
channel_opp_df = channel_opp_df.sort_values(['Pearson', 'Alt_Fark_Pct'], ascending=[False, True]).head(
50)
# Breakout adayları - Pearson değerine göre sırala (İlk 50)
breakout_df = pd.DataFrame(self.breakout_candidates)
if not breakout_df.empty:
breakout_df = breakout_df.sort_values(['Pearson', 'Ust_Fark_Pct'], ascending=[False, False]).head(50)
# İstatistikler
stats_df = pd.DataFrame([
{"Metrik": key, "Deger": value} for key, value in self.statistics.items()
])
# Ana dosyayı kaydet
print(f"{Fore.GREEN} Ana dosya kaydediliyor: {main_file}")
with pd.ExcelWriter(main_file, engine='openpyxl') as writer:
# Ana sayfalar
self.write_sheet_with_formatting(writer, pd.DataFrame(self.all_results), "TumSonuclar", "A2")
self.write_sheet_with_formatting(writer, ideal_pearson_df, "IdealPearson", "A2")
self.write_sheet_with_formatting(writer, channel_opp_df, "KanalFirsatlari_Top50", "A2")
self.write_sheet_with_formatting(writer, breakout_df, "BreakoutAdaylari_Top50", "A2")
self.write_sheet_with_formatting(writer, stats_df, "Istatistikler", "A2")
# Periyot bazlı sayfalar
for period in self.pearson_periods:
period_df = pd.DataFrame(self.period_results[period])
if not period_df.empty:
period_df = period_df.sort_values('Pearson', ascending=False)
self.write_sheet_with_formatting(writer, period_df, f"Periyot_{period}", "A2")
# Arşiv dosyasını kaydet
print(f"{Fore.BLUE} Arşiv dosyası kaydediliyor: {archive_file}")
with pd.ExcelWriter(archive_file, engine='openpyxl') as writer:
self.write_sheet_with_formatting(writer, pd.DataFrame(self.all_results), "TumSonuclar", "A2")
self.write_sheet_with_formatting(writer, ideal_pearson_df, "IdealPearson", "A2")
self.write_sheet_with_formatting(writer, channel_opp_df, "KanalFirsatlari_Top50", "A2")
self.write_sheet_with_formatting(writer, breakout_df, "BreakoutAdaylari_Top50", "A2")
self.write_sheet_with_formatting(writer, stats_df, "Istatistikler", "A2")
for period in self.pearson_periods:
period_df = pd.DataFrame(self.period_results[period])
if not period_df.empty:
period_df = period_df.sort_values('Pearson', ascending=False)
self.write_sheet_with_formatting(writer, period_df, f"Periyot_{period}", "A2")
return True
except Exception as e:
print(f"{Fore.RED}❌ Sonuç kaydetme hatası: {e}")
return False
def print_summary(self):
"""Özet rapor"""
total = len(self.successful_files) + len(self.failed_files)
success_rate = (len(self.successful_files) / total * 100) if total > 0 else 0
print(f"\n{Fore.CYAN} ===== LİNEER REGRESYON ANALİZ RAPORU =====")
print(f"{Fore.BLUE} Analiz Periyotları: {', '.join(map(str, self.pearson_periods))}")
print(f"{Fore.GREEN}✅ Başarılı: {len(self.successful_files)}")
print(f"{Fore.RED}❌ Başarısız: {len(self.failed_files)}")
print(f"{Fore.BLUE} Başarı oranı: {success_rate:.1f}%")
print(f"\n{Fore.MAGENTA} SONUÇLAR:")
print(f"{Fore.GREEN} Toplam Analiz: {len(self.all_results)}")
print(f"{Fore.YELLOW} Kanal Alt Fırsatları: {len(self.channel_opportunities)}")
print(f"{Fore.RED} Breakout Adayları: {len(self.breakout_candidates)}")
print(f"{Fore.BLUE} İdeal Pearson Sayısı: {len(self.ideal_pearson)}")
# En yüksek Pearson değerleri
if self.all_results:
high_pearson = [r for r in self.all_results if abs(r.get('Pearson', 0)) >= 0.9]
print(f"{Fore.GREEN} ⭐ Yüksek Pearson (>=0.9): {len(high_pearson)}")
def save_failed_list(self, filename="basarisiz_regresyon_dosyalari.txt"):
"""Başarısız dosyaları kaydetme"""
if self.failed_files:
try:
with open(filename, 'w', encoding='utf-8') as f:
f.write("# Başarısız Lineer Regresyon analizi dosyaları\n")
f.write(f"# Tarih: {datetime.now().strftime('%Y-%m-%d')}\n\n")
for file_name in self.failed_files:
f.write(f"{file_name}\n")
print(f"{Fore.YELLOW} Başarısız dosyalar {filename} dosyasına kaydedildi.")
except Exception as e:
print(f"{Fore.RED}❌ Başarısız dosya listesi kaydetme hatası: {e}")
def main(self):
"""Ana fonksiyon"""
print(f"{Fore.CYAN} Lineer Regresyon ve Pearson Analizi Başlatılıyor...")
print(f"{Fore.BLUE} Analiz Periyotları: {', '.join(map(str, self.pearson_periods))}")
print(f"{Fore.BLUE} Özel Analiz Periyotları: {', '.join(map(str, self.analysis_periods))}")
# Giriş dosyalarını bulma
input_files = self.find_input_files()
if not input_files:
return
print(f"{Fore.BLUE} Toplam işlenecek dosya: {len(input_files)}\n")
# Dosyaları işleme
for i, file_path in enumerate(input_files, 1):
print(f"\n{Fore.MAGENTA}[{i}/{len(input_files)}] İşleniyor...")
self.process_single_file(file_path)
# EMA verilerini birleştirme
self.merge_ema_data()
# İstatistikleri hesaplama
self.calculate_statistics()
# Sonuçları kaydetme
success = self.save_results()
if success:
self.print_summary()
# Başarısız dosyaları kaydetme
self.save_failed_list()
print(f"\n{Fore.GREEN} Lineer regresyon analizi tamamlandı!")
print(f"{Fore.BLUE} Ana dosya: StokData/lineer_regresyon_analiz.xlsx")
print(f"{Fore.BLUE} Arşiv klasörü: StokData/LinearRegression/")
# Kullanım
if __name__ == "__main__":
analyzer = LinearRegressionAnalyzer()
analyzer.main()

Hisse senedi piyasalarında başarılı olmak için doğru stratejileri belirlemek büyük önem taşır. Bu stratejilerden biri olan Üstel Hareketli Ortalama (Exponential Moving Average – EMA), fiyat hareketlerinin daha yumuşak ve tepkisel bir şekilde izlenmesini sağlar. Birden fazla EMA’nın bir araya getirilerek kullanıldığı İdeal EMA Stratejisi ise yatırımcılara trendin gücü, yönü ve olası dönüş noktaları hakkında kapsamlı bir bakış sunar. Bu yazıda stratejiyi Python kodları ile nasıl uygulayabileceğinizi ve analizlerinizi nasıl otomatikleştirebileceğinizi ele alacağız.
İdeal EMA, belirli periyotlardaki üstel hareketli ortalamaların (örneğin 5, 8, 13, 21, 34, 55, 89…) birbirine göre dizilimini temel alan bir analiz yöntemidir. Bu stratejinin ana mantığı, bir hisse senedinin fiyatı ve EMA’larının belirli bir sırayla dizilmesi durumunda güçlü bir trendin varlığını işaret etmesidir.
Yazının devamında paylaşacağım Python kodu, bu stratejiyi otomatikleştirmek için tasarlanmıştır.
IdealEMACalculator adlı bir sınıf etrafında kurgulanan bu kod, birden fazla hisse senedi dosyasını analiz ederek İdeal EMA sinyallerini tespit eder ve sonuçları Excel dosyasına kaydeder.
Kod, modüler bir yapıya sahip. Bu yapı sayesinde, her bir metot belirli bir işi yapar ve kodun anlaşılırlığı artar.

Bağımlı olduğu kütüphaneleri terminalde aşağıdaki komutla install edebilirsiniz. pip install pandas numpy openpyxl colorama
Python ile oluşturulan bu otomatize sistem, yatırımcılara manuel olarak yüzlerce hisse senedini tek tek inceleme zahmetinden kurtarır. Her gün yüzlerce veriyi otomatik olarak tarayarak potansiyel İdeal EMA Dizilim sinyallerini anında tespit edebilir.
Yalnız unutmamak gerekir ki, hiçbir strateji her zaman kazanç sağlamaz. İdeal EMA stratejisi de tek başına yeterli olmayabilir. Genellikle hacim, diğer teknik göstergelerde ve temel analiz verileriyle birlikte kullanılması, sinyallerin güvenilirliğini artırır.
Bu Makale Daha önceki Makalelerin devamı niteliğinde olduğu için, aşağıdaki makaleleri de incelemeniz büyük önem taşıyor.
Python ve PyCharm kurulumu
Gerekli kütüphanelerin kurulumu
Kapanış Datalarını çeken Script
Python ile Kapanış Fiyatlarından EMA Hesaplama Betiği
Çalışmaların tamamını içeren Google Drive Alanı
X_03_Borsapin_EmaDizilim.py Çalışmayı Bu isimde kayıt edebilirsiniz.
Python Kodları
import pandas as pd
import numpy as np
import os
from datetime import datetime, timedelta
from colorama import Fore, init
import glob
from openpyxl.styles import Font, PatternFill, Alignment
from openpyxl.utils import get_column_letter
"""
Borsapin StokData/Ema klasöründeki hisse datalarından EMA ALIGNMENT dizilimlerini hesaplar.
www.kursatsenturk.com
"""
init(autoreset=True)
class IdealEMACalculator:
def __init__(self):
self.ema_periods = [5, 8, 13, 21, 34, 55, 89, 144, 233, 370]
self.successful_files = []
self.failed_files = []
# Sonuç listeleri
self.ideal_up = []
self.ideal_down = []
self.neutral = []
self.potential = []
self.all_signals = []
def check_alignment(self, stock_row):
"""EMA dizilimi kontrolü"""
try:
ema_vals = [stock_row[f'EMA_{p}'] for p in self.ema_periods]
closing_price = stock_row['Kapanış']
# İdeal yükseliş: Kapanış > tüm EMA'lar ve EMA'lar büyükten küçüğe sıralı
if closing_price > max(ema_vals) and ema_vals == sorted(ema_vals, reverse=True):
return 'İdeal EMA Yükseliş'
# İdeal düşüş: Kapanış < tüm EMA'lar ve EMA'lar küçükten büyüğe sıralı
elif closing_price < min(ema_vals) and ema_vals == sorted(ema_vals):
return 'İdeal EMA Düşüş'
else:
return 'İdeal EMA Nötr'
except (KeyError, ValueError, TypeError) as e:
# Daha spesifik istisna yakalama
print(f"{Fore.RED}❌ EMA alignment kontrol hatası: {e}")
return 'İdeal EMA Nötr'
def check_potential(self, stock_row):
"""Potansiyel oluşum kontrolü"""
try:
ema_vals = [stock_row[f'EMA_{p}'] for p in self.ema_periods]
closing_price = stock_row['Kapanış']
return min(ema_vals) < closing_price < max(ema_vals)
except (KeyError, ValueError, TypeError):
# Basit durumlar için 'except:' kullanmak yerine 'except Exception' veya spesifik hataları kullanın
return False
def price_below_any_ema(self, stock_row):
"""Kapanış EMA'ların altına sarktı mı?"""
try:
return any(stock_row['Kapanış'] < stock_row[f'EMA_{p}'] for p in self.ema_periods)
except (KeyError, ValueError, TypeError):
return False
@staticmethod
def calculate_gain(entry_price, exit_price):
"""Kazanç hesaplama fonksiyonu"""
if entry_price and exit_price and entry_price > 0:
gain = exit_price - entry_price
gain_percent = (gain / entry_price) * 100
return gain, gain_percent
return 0, 0
@staticmethod
def find_max_price_after_signal(dataframe, signal_date):
"""Sinyal sonrası maksimum fiyat bulma"""
try:
future_data = dataframe[dataframe['Tarih'] > signal_date]
if not future_data.empty:
max_price = future_data['Kapanış'].max()
max_date = future_data[future_data['Kapanış'] == max_price]['Tarih'].iloc[0]
return max_price, max_date
return None, None
except (KeyError, ValueError, IndexError):
return None, None
@staticmethod
def find_min_price_after_signal(dataframe, signal_date):
"""Sinyal sonrası minimum fiyat bulma"""
try:
future_data = dataframe[dataframe['Tarih'] > signal_date]
if not future_data.empty:
min_price = future_data['Kapanış'].min()
min_date = future_data[future_data['Kapanış'] == min_price]['Tarih'].iloc[0]
return min_price, min_date
return None, None
except (KeyError, ValueError, IndexError):
return None, None
def find_latest_signal_date(self, dataframe, target_status):
"""İdeal EMA sinyalinin son oluştuğu tarihi bulma"""
try:
dataframe = dataframe.sort_values("Tarih").reset_index(drop=True)
last_status = None
latest_signal_date = None
for _, row in dataframe.iterrows():
current_status = self.check_alignment(row)
if current_status == target_status and last_status != target_status:
latest_signal_date = row['Tarih']
last_status = current_status
return latest_signal_date
except (KeyError, ValueError, IndexError):
return None
def process_single_file(self, file_path):
"""Tek dosya için İdeal EMA analizi"""
ticker_name = None # ticker_name değişkeni için başlangıç değeri atama
try:
# Dosya adından hisse adını alma
file_name = os.path.basename(file_path)
ticker_name = os.path.splitext(file_name)[0]
print(f"{Fore.YELLOW} İdeal EMA analizi: {ticker_name}...")
# Excel dosyasını okuma
df = pd.read_excel(file_path)
# Gerekli sütunların varlığını kontrol etme
required_columns = ['Tarih', 'Kapanış'] + [f'EMA_{p}' for p in self.ema_periods]
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
raise ValueError(f"Eksik sütunlar: {missing_columns}")
# Veri kontrolü ve temizlik
df = df.dropna(subset=[f'EMA_{p}' for p in self.ema_periods])
if df.empty:
raise ValueError("EMA verileri boş")
# Tarih formatı ve sıralama
df['Tarih'] = pd.to_datetime(df['Tarih'])
df = df.sort_values('Tarih').reset_index(drop=True)
print(f"{Fore.CYAN} ⚡ EMA dizilim analizi yapılıyor...")
# Tüm sinyal değişimlerini takip et
last_status = None
signals_history = []
for _, row in df.iterrows():
status = self.check_alignment(row)
if status != last_status:
if last_status is not None: # İlk durumu kaydetme
signals_history.append({
'CODE': ticker_name,
'DATE': row["Tarih"],
'CLOSING_TL': row["Kapanış"],
'STATUS': status,
'PREV_STATUS': last_status
})
last_status = status
# Son durumu analiz et
if not df.empty:
last_row = df.iloc[-1]
last_status = self.check_alignment(last_row)
# Son sinyal tarihini bul
latest_signal_date = None
if last_status == "İdeal EMA Yükseliş":
latest_signal_date = self.find_latest_signal_date(df.copy(), "İdeal EMA Yükseliş")
elif last_status == "İdeal EMA Düşüş":
latest_signal_date = self.find_latest_signal_date(df.copy(), "İdeal EMA Düşüş")
# Sinyal sonrası kazanç hesapla
max_price, max_date = None, None
min_price, min_date = None, None
gain, gain_percent = 0, 0
if signals_history:
last_signal = signals_history[-1]
if last_signal['STATUS'] == 'İdeal EMA Yükseliş':
max_price, max_date = self.find_max_price_after_signal(df, last_signal['DATE'])
if max_price:
gain, gain_percent = self.calculate_gain(last_signal['CLOSING_TL'], max_price)
elif last_signal['STATUS'] == 'İdeal EMA Düşüş':
min_price, min_date = self.find_min_price_after_signal(df, last_signal['DATE'])
if min_price:
gain, gain_percent = self.calculate_gain(last_signal['CLOSING_TL'], min_price)
# Kayıt oluştur
record = {
"Hisse_Adi": ticker_name,
"Tarih": pd.to_datetime(last_row["Tarih"]).date(),
"Kapanış": last_row["Kapanış"],
"EMA_Dizilim": last_status,
"Son_Sinyal_Tarihi": latest_signal_date.date() if latest_signal_date else None,
"Bozulma": "Evet" if self.price_below_any_ema(last_row) else "Hayır",
"Sinyal_Sonrasi_Max_Fiyat": max_price,
"Max_Fiyat_Tarihi": max_date.date() if max_date else None,
"Sinyal_Sonrasi_Min_Fiyat": min_price,
"Min_Fiyat_Tarihi": min_date.date() if min_date else None,
"Kazanc_TL": round(gain, 2),
"Kazanc_Yuzde": round(gain_percent, 2)
}
# Kategorilere ayır
if last_status == "İdeal EMA Yükseliş":
self.ideal_up.append(record)
print(f"{Fore.GREEN} İdeal EMA Yükseliş tespit edildi!")
elif last_status == "İdeal EMA Düşüş":
self.ideal_down.append(record)
print(f"{Fore.RED} İdeal EMA Düşüş tespit edildi!")
else:
self.neutral.append(record)
print(f"{Fore.WHITE} ➖ Nötr durumda")
# Potansiyel kontrol
if self.check_potential(last_row):
potential_record = record.copy()
potential_record["EMA_Dizilim"] = "İdeal EMA Olabilir"
self.potential.append(potential_record)
print(f"{Fore.YELLOW} ⚡ Potansiyel oluşum tespit edildi!")
# Tüm sinyalleri kaydet
self.all_signals.extend(signals_history)
print(f"{Fore.GREEN}✅ {ticker_name} İdeal EMA analizi tamamlandı.")
self.successful_files.append(ticker_name)
return True
except FileNotFoundError:
print(f"{Fore.RED}❌ {ticker_name if ticker_name else file_path} dosyası bulunamadı.")
self.failed_files.append(os.path.basename(file_path))
return False
except pd.errors.EmptyDataError:
print(f"{Fore.RED}❌ {ticker_name if ticker_name else file_path} dosyası boş veya hatalı.")
self.failed_files.append(os.path.basename(file_path))
return False
except ValueError as e:
# Eksik sütun veya boş veri hatası için
print(f"{Fore.RED}❌ {ticker_name if ticker_name else file_path} için hata: {e}")
self.failed_files.append(os.path.basename(file_path))
return False
except Exception as e:
# Diğer tüm beklenmeyen hatalar için
print(f"{Fore.RED}❌ {ticker_name if ticker_name else file_path} için beklenmeyen hata: {e}")
self.failed_files.append(os.path.basename(file_path))
return False
@staticmethod
def find_input_files(input_folder="StokData/Emas/"):
"""Giriş dosyalarını bulma (EMA dosyalarından)"""
try:
pattern = os.path.join(input_folder, "*.xlsx")
files = glob.glob(pattern)
if not files:
print(f"{Fore.RED}❌ {input_folder} klasöründe Excel dosyası bulunamadı!")
return []
print(f"{Fore.BLUE} {len(files)} adet EMA dosyası bulundu.")
return files
except Exception as e:
print(f"{Fore.RED}❌ Dosya arama hatası: {e}")
return []
def calculate_statistics(self):
"""İstatistik hesaplama"""
try:
# Son 1 yılın verilerini filtrele
one_year_ago = datetime.now() - timedelta(days=365)
recent_signals = [signal for signal in self.all_signals if signal['DATE'] >= one_year_ago]
# İdeal EMA UP sinyalleri için istatistikler
ideal_up_stats = []
for signal in recent_signals:
if signal['STATUS'] == 'İdeal EMA Yükseliş':
# Bu sinyalden sonra çıkış noktasını bul
exit_signals = [s for s in recent_signals if
s['CODE'] == signal['CODE'] and s['DATE'] > signal['DATE']]
exit_signal = None
if exit_signals:
exit_signal = min(exit_signals, key=lambda x: x['DATE'])
if exit_signal:
gain, gain_percent = self.calculate_gain(signal['CLOSING_TL'], exit_signal['CLOSING_TL'])
ideal_up_stats.append({
'Hisse_Adi': signal['CODE'],
'Giris_Tarihi': signal['DATE'].date(),
'Giris_Fiyati': signal['CLOSING_TL'],
'Cikis_Tarihi': exit_signal['DATE'].date(),
'Cikis_Fiyati': exit_signal['CLOSING_TL'],
'Cikis_Sinyali': exit_signal['STATUS'],
'Kazanc_TL': round(gain, 2),
'Kazanc_Yuzde': round(gain_percent, 2),
'Gun_Sayisi': (exit_signal['DATE'] - signal['DATE']).days
})
return ideal_up_stats
except Exception as e:
print(f"{Fore.RED}❌ İstatistik hesaplama hatası: {e}")
return []
@staticmethod
def write_sheet(excel_writer, result_df, sheet_name, freeze_panes=None):
"""Excel sayfası yazma ve biçimlendirme"""
try:
if isinstance(result_df, list):
result_df = pd.DataFrame(result_df)
if result_df.empty:
result_df = pd.DataFrame([{"Mesaj": "Veri bulunamadı"}])
result_df.to_excel(excel_writer, sheet_name=sheet_name, index=False)
ws = excel_writer.sheets[sheet_name]
# Başlık biçimlendirme
header_fill = PatternFill(start_color='ADD8E6', end_color='ADD8E6', fill_type='solid')
for col_num, column in enumerate(result_df.columns, 1):
cell = ws.cell(row=1, column=col_num)
cell.font = Font(bold=True)
cell.fill = header_fill
cell.alignment = Alignment(horizontal='center')
# Sütun genişliklerini ayarla
if 'Tarih' in column:
ws.column_dimensions[get_column_letter(col_num)].width = 15
elif 'Hisse' in column:
ws.column_dimensions[get_column_letter(col_num)].width = 12
elif 'Kazanc' in column or 'Kapanış' in column or 'Fiyat' in column:
ws.column_dimensions[get_column_letter(col_num)].width = 18
else:
ws.column_dimensions[get_column_letter(col_num)].width = 20
# Freeze panes
if freeze_panes:
ws.freeze_panes = freeze_panes
# Sayısal verileri formatlama
for row_num in range(2, len(result_df) + 2):
for col_num in range(1, len(result_df.columns) + 1):
cell = ws.cell(row=row_num, column=col_num)
cell.alignment = Alignment(horizontal='center')
except Exception as e:
print(f"{Fore.RED}❌ Excel yazma hatası ({sheet_name}): {e}")
def save_results(self):
"""Sonuçları Excel'e kaydetme"""
try:
# Klasör oluşturma
output_folder = "StokData/IdealEma/"
os.makedirs(output_folder, exist_ok=True)
# Dosya isimleri
today = datetime.today().strftime('%Y-%m-%d')
main_file = os.path.join("StokData", "idealema_analiz.xlsx") # Ana dosya Stok klasörüne
archive_file = os.path.join(output_folder, f"idealema_analiz_{today}.xlsx") # Arşiv IdealEma klasörüne
# İstatistikleri hesapla
print(f"{Fore.CYAN} İstatistikler hesaplanıyor...")
ideal_up_stats = self.calculate_statistics()
# Özet istatistikler
summary_stats = pd.DataFrame({
'Kategori': ['Toplam İdeal EMA UP', 'Toplam İdeal EMA DOWN', 'Toplam Nötr', 'Toplam Potansiyel',
'Son 1 Yıl UP Sinyali', 'Son 1 Yıl Ortalama Kazanç %', 'Son 1 Yıl Ortalama Gün'],
'Deger': [len(self.ideal_up), len(self.ideal_down), len(self.neutral), len(self.potential),
len(ideal_up_stats),
round(np.mean([stat['Kazanc_Yuzde'] for stat in ideal_up_stats]) if ideal_up_stats else 0, 2),
round(np.mean([stat['Gun_Sayisi'] for stat in ideal_up_stats]) if ideal_up_stats else 0, 2)]
})
# Ana dosyayı kaydet
print(f"{Fore.GREEN} Ana dosya kaydediliyor: {main_file}")
with pd.ExcelWriter(main_file, engine='openpyxl') as excel_writer:
self.write_sheet(excel_writer, self.ideal_up, "IdealEMAUp", freeze_panes='A2')
self.write_sheet(excel_writer, self.ideal_down, "IdealEMADown", freeze_panes='A2')
self.write_sheet(excel_writer, self.neutral, "Neutral", freeze_panes='A2')
self.write_sheet(excel_writer, self.potential, "Potansiyel", freeze_panes='A2')
self.write_sheet(excel_writer, ideal_up_stats, "Son1Yil_IstatistikUp", freeze_panes='A2')
self.write_sheet(excel_writer, summary_stats, "Ozet_Istatistikler", freeze_panes='A2')
# Arşiv dosyasını kaydet
print(f"{Fore.BLUE} Arşiv dosyası kaydediliyor: {archive_file}")
with pd.ExcelWriter(archive_file, engine='openpyxl') as excel_writer:
self.write_sheet(excel_writer, self.ideal_up, "IdealEMAUp", freeze_panes='A2')
self.write_sheet(excel_writer, self.ideal_down, "IdealEMADown", freeze_panes='A2')
self.write_sheet(excel_writer, self.neutral, "Neutral", freeze_panes='A2')
self.write_sheet(excel_writer, self.potential, "Potansiyel", freeze_panes='A2')
self.write_sheet(excel_writer, ideal_up_stats, "Son1Yil_IstatistikUp", freeze_panes='A2')
self.write_sheet(excel_writer, summary_stats, "Ozet_Istatistikler", freeze_panes='A2')
return True, ideal_up_stats
except Exception as e:
print(f"{Fore.RED}❌ Sonuç kaydetme hatası: {e}")
return False, []
def print_summary(self, ideal_up_stats):
"""Özet rapor"""
total = len(self.successful_files) + len(self.failed_files)
success_rate = (len(self.successful_files) / total * 100) if total > 0 else 0
print(f"\n{Fore.CYAN} ===== İDEAL EMA ANALİZ RAPORU =====")
print(f"{Fore.BLUE} EMA Periyodları: {', '.join(map(str, self.ema_periods))}")
print(f"{Fore.GREEN}✅ Başarılı: {len(self.successful_files)}")
print(f"{Fore.RED}❌ Başarısız: {len(self.failed_files)}")
print(f"{Fore.BLUE} Başarı oranı: {success_rate:.1f}%")
print(f"\n{Fore.MAGENTA} SONUÇLAR:")
print(f"{Fore.GREEN} İdeal EMA Yükseliş: {len(self.ideal_up)}")
print(f"{Fore.RED} İdeal EMA Düşüş: {len(self.ideal_down)}")
print(f"{Fore.WHITE} ➖ Nötr: {len(self.neutral)}")
print(f"{Fore.YELLOW} ⚡ Potansiyel: {len(self.potential)}")
print(f"{Fore.BLUE} Son 1 Yıl İstatistik: {len(ideal_up_stats)}")
if ideal_up_stats:
avg_gain = np.mean([stat['Kazanc_Yuzde'] for stat in ideal_up_stats])
avg_days = np.mean([stat['Gun_Sayisi'] for stat in ideal_up_stats])
print(f"{Fore.GREEN} Ortalama Kazanç: %{avg_gain:.2f}")
print(f"{Fore.BLUE} Ortalama Elde Tutma: {avg_days:.1f} gün")
def main(self):
"""Ana fonksiyon"""
print(f"{Fore.CYAN} İdeal EMA Dizilim Analizi Başlatılıyor...")
print(f"{Fore.BLUE} EMA Periyodları: {', '.join(map(str, self.ema_periods))}")
# Giriş dosyalarını bulma
input_files = self.find_input_files()
if not input_files:
return
print(f"{Fore.BLUE} Toplam işlenecek dosya: {len(input_files)}\n")
# Dosyaları işleme
for i, file_path in enumerate(input_files, 1):
print(f"\n{Fore.MAGENTA}[{i}/{len(input_files)}] İşleniyor...")
self.process_single_file(file_path)
# Sonuçları kaydetme ve raporlama
success, ideal_up_stats = self.save_results()
if success:
self.print_summary(ideal_up_stats)
print(f"\n{Fore.GREEN} İdeal EMA analizi tamamlandı!")
print(f"{Fore.BLUE} Ana dosya: Stok/idealema_analiz.xlsx")
print(f"{Fore.BLUE} Arşiv klasörü: Stok/IdealEma/")
# Kullanım
if __name__ == "__main__":
calculator = IdealEMACalculator()
calculator.main()
Bir sonraki yazıda Borsa İstanbul BIST Spot hisseleri için Wave Trend Osilatör sinyallerini tarama çalışması ve Trading View Wave Trend osilatör indikatörü ayrı ayrı ele alınacaktır.
Son Yorumlar