Mit Blick auf die CSV-Daten der Transaktionshistorie von SBI Securities, Der Kauf und Verkauf von Aktien ist individuell angeordnet, und die Gewinne und Verluste sind etwas schwer zu verstehen.
Also habe ich den Code geschrieben, weil ich wollte, dass er ein Set ist.
Die CSV-Daten lauten wie folgt Dies ist unpraktisch, wenn Sie den Gewinn und Verlust jeder Aktie einzeln anzeigen möchten.
Wenn Sie das Programm ausführen,
Es sieht aus wie das.
Das Verzeichnis sieht so aus
Nach dem Speichern der CSV-Daten in einem Ordner namens CSV Wenn Sie sbiAggregater.py starten, werden zusammenfassende Excel-Daten generiert. Übrigens, wenn Sie eine CSV-Dateiintegration haben und bereits Excel-Daten zusammengefasst haben. Es wird ein früherer Ordner erstellt und in diesen geworfen.
Es gibt zwei Programme, aber der Code muss nur von sbiAggregater.py gestartet werden.
Der Code ist unten
csv_uniter_sbi.py
#! Python3
# -*- coding: utf-8 -*-
# csv_uniter_sbi.py -Integrieren Sie die CSV der Transaktionshistorie von sbi-Wertpapieren
import csv, os, sys, datetime, shutil
import logging
#logging.disable(logging.CRITICAL)
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
logging.debug("Suche nach CSV-Datei...")
f_list = []
for filename in os.listdir(".\\csv"):
if filename.lower().endswith(".csv"):
f_list.append(filename)
def cPath(filename):
dirname = os.path.join(".\\csv", filename)
return dirname
# {Datum:Dateiname}Schieben Sie die Daten in die Liste für
csv_data = []
if len(f_list) <= 1:
logging.debug("Es gab weniger als eine CSV-Datei. Integriert keine CSV-Dateien")
else:
for f in f_list:
f = open(cPath(f), "r")
f_reader = csv.reader(f)
f_data = list(f_reader)
#Achten Sie darauf, warum die zu importierenden Spalten aus der Mitte stammen
for i in range(9, len(f_data)):
if f_data[i] in csv_data:
continue
elif f_data[i] == []:
continue
csv_data.append(f_data[i])
# csv_Machen Sie alle Daten zu einem sortierbaren Datum / Uhrzeit-Objekt
for i in range(len(csv_data)):
date = datetime.datetime.strptime(csv_data[i][0], "%Y/%m/%d")
csv_data[i][0] = date
#Nach Zeit sortieren, rückwärts absteigen, damit das neue Datum oben liegt
csv_data.sort(key=lambda x: x[0], reverse=True)
#Zum Originalformat zurückkehren
for i in range(len(csv_data)):
csv_data[i][0] = csv_data[i][0].strftime("%Y/%m/%d")
#Fügen Sie die Daten ein
new_f = open(cPath("sbi_united_{}.csv".format(datetime.datetime.now().strftime("%Y%m%d%H%M%S"))), "w", newline="")
new_f_writer = csv.writer(new_f)
new_f_writer.writerow(f_data[8])
new_f_writer.writerows(csv_data)
f.close()
new_f.close()
#Legen Sie schließlich die Referenzdatei in den letzten Ordner
for file in f_list:
shutil.move(cPath(file), ".\\csv\\past\\{}".format(file))
logging.debug("CSV-Verarbeitung abgeschlossen")
Das Haupt ist unten
sbiAggregater.py
#! Python3
# -*- coding: utf-8 -*-
# matuiAggregater.py -Ein Programm, das CSV der Transaktionshistorie von Matsui Securities liest und aggregiert
import csv, os, shutil
from openpyxl.utils import get_column_letter, column_index_from_string
from openpyxl.styles import Font
import logging, openpyxl, datetime
logging.disable(logging.CRITICAL)
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
#Starten Sie das CSV-Integrationsprogramm
import csv_uniter_sbi
logging.debug("Fügen Sie die alte Zusammenfassungsdatei ein_Gehen Sie zum Ordner xlsx und generieren Sie eine neue Datei")
#Alte XLSX-Daten.\\past_Gib xlsx ein
os.makedirs("past_xlsx", exist_ok=True)
for xlsxfile in os.listdir("."):
if xlsxfile.lower().endswith(".xlsx"):
logging.debug("Fügen Sie die alte xlsx-Datei in denselben Ordner ein_Gehe zu xlsx")
try:
#Überschreiben und speichern
shutil.move(xlsxfile, ".\\past_xlsx\\{}".format(xlsxfile))
except Exception as exc:
print("{}Ist{}Konnte sich da nicht bewegen".format(xlsxfile, exc))
continue
for filename in os.listdir(".\\csv"):
if filename.lower().endswith(".csv"):
csvfilename = filename
def cPath(file):
return os.path.join(".\\csv", file)
savename = "sbi_matome_{}.xlsx".format(datetime.datetime.now().strftime("%Y%m%d%H%M%S"))
f = open(cPath(csvfilename))
f_reader = csv.reader(f)
f_data = list(f_reader)
#Zwei Perspektiven, Setter und Checker. Der Setter ist subjektiv. Der Checker ist die Perspektive, ein Paar zu finden. Überprüft, ob es mit dem Prüfer übereinstimmt_row_Setzen Sie es in die Liste und der Setter überspringt diese Zeile
checked_row_list = []
only_list = []
#In xlsx-Datei einfügen
wb = openpyxl.Workbook()
sheet = wb.active
#Der Datentyp ist{Transaktionskategorie 0,Marke 1,Code 2,Kaufdatum 3,Kaufpreis 4,Verkaufsdatum 5,Verkaufspreis 6,Haltetage 7,Gewinn-Verlust-Verhältnis 8,Gewinn und Verlust 9}
data_type_order = ["Transaktionsklassifizierung", "Marke", "Code", "Kaufdatum", "Verkaufsdatum", "Anzahl der gehaltenen Tage","Menge kaufen", "Verkaufsmenge", "Kaufpreis", "Verkaufspreis", "Gewinn-Verlust-Verhältnis","Gewinn-und Verlust", "Kommentar"]
#Da habe ich eine neue Excel-Tabelle erstellt, Daten_type_Füllen Sie die erste Spalte gemäß der Bestellliste aus
for Col in range(1, len(data_type_order)+ 1):
sheet.cell(1, Col).value = data_type_order[Col - 1]
#Die erste Zeile ist am Fenster befestigt
sheet.freeze_panes = "A2"
#Breite anpassen
sheet.column_dimensions["A"].width = 11.44
sheet.column_dimensions["B"].width = 28.22
sheet.column_dimensions["C"].width = 5.8
sheet.column_dimensions["D"].width = 12
sheet.column_dimensions["E"].width = 12
sheet.column_dimensions["F"].width = 8.67
sheet.column_dimensions["G"].width = 7.11
sheet.column_dimensions["H"].width = 7.11
sheet.column_dimensions["I"].width = 8.67
sheet.column_dimensions["J"].width = 8.67
sheet.column_dimensions["K"].width = 7.56
sheet.column_dimensions["L"].width = 8.67
sheet.column_dimensions["M"].width = 19.33
#Suchen Sie als Spaltenblock nach Vertragsdatum nach dem ältesten Datum.{Lieferdatum: row}
day_row_dict = {}
for row in range(9, len(f_data)):
#Überspringen, wenn freie Daten vorhanden sind
if len(f_data[row]) == 0:
continue
day_row_dict.setdefault(datetime.datetime.strptime(f_data[row][0], "%Y/%m/%d"), [])
day_row_dict[datetime.datetime.strptime(f_data[row][0], "%Y/%m/%d")].append(row)
#Daten für alle Fälle sortieren.Der alte Tag kommt zuerst umgekehrt=False
day_row_key = sorted(day_row_dict, reverse=False)
def pasteExcel(data):
"""Empfangen Sie Wörterbuchdaten und fügen Sie sie in die erste Zeile von Excel ein"""
#In Excel-Tabelle einfügen
sheet.insert_rows(2)
#Lesen Sie die erste Spalte von Excel und fügen Sie sie ein
for Col in range(1, sheet.max_column + 1):
try:
sheet.cell(2, Col).value = data[sheet.cell(1, Col).value]
#Überspringen, wenn ein Schlüssel angezeigt wird, der nicht in den Daten enthalten ist
except KeyError:
continue
def pareset(ROW1, ROW2):
"""ROW1 wird durch Neuanordnen der Daten gekauft und ROW2 wird verkauft. Nennen Sie dies sowohl für den Verkauf als auch für den Kauf"""
data = {}
data["Kommentar"] = ""
if ROW1 == [] and ROW2:
#Wenn nur zum Verkauf
data["Transaktionsklassifizierung"] = f_data[ROW2[0]][4]
data["Code"] = int(f_data[ROW2[0]][2])
data["Marke"] = f_data[ROW2[0]][1]
data["Verkaufsdatum"] = f_data[ROW2[-1]][0]
sell_sum = 0
sell_num = 0
#Lieferbetrag, Anzahl der Aktien
for i in range(len(ROW2)):
#Wenn Sie einen neuen Artikel kaufen oder verkaufen, beträgt die Liefergebühr"--"Damit
if f_data[ROW2[i]][4][2:4] == "Neu":
sell_sum += 0
#In Rückzahlung+-Weil es normal rauskommt.
elif f_data[ROW2[i]][4][2:4] == "Rückzahlung":
sell_sum += int(f_data[ROW2[i]][13])
else:
sell_sum += int(f_data[ROW2[i]][13])
sell_num += int(f_data[ROW2[i]][8])
data["Verkaufspreis"] = sell_sum
data["Verkaufsmenge"] = sell_num
data["Gewinn-und Verlust"] = sell_sum
data["Kommentar"] += "Nicht genug Kaufdaten"
elif ROW1 and ROW2 == []:
#Beim Kauf nur
data["Transaktionsklassifizierung"] = f_data[ROW1[0]][4]
data["Code"] = int(f_data[ROW1[0]][2])
data["Marke"] = f_data[ROW1[0]][1]
data["Kaufdatum"] = f_data[ROW1[0]][0]
buy_sum = 0
buy_num = 0
for i in range(len(ROW1)):
if f_data[ROW1[i]][4][2:4] == "Neu":
buy_sum += 0
#In Rückzahlung+-Weil es normal rauskommt.
elif f_data[ROW1[i]][4][2:4] == "Rückzahlung":
buy_sum += int(f_data[ROW1[i]][13])
#Machen Sie im Falle des tatsächlichen Artikels den Kauf negativ
else:
buy_sum -= int(f_data[ROW1[i]][13])
buy_num += int(f_data[ROW1[i]][8])
data["Kaufpreis"] = buy_sum
data["Menge kaufen"] = buy_num
data["Gewinn-und Verlust"] = buy_sum
#Wenn es sich um ein Kauf / Verkauf-Paar handelt
elif ROW1 and ROW2:
data["Transaktionsklassifizierung"] = f_data[ROW2[0]][4]
data["Code"] = int(f_data[ROW2[0]][2])
data["Marke"] = f_data[ROW2[0]][1]
#Bei mehreren ist das letzte Verkaufsdatum der Verkauf
data["Verkaufsdatum"] = f_data[ROW2[-1]][0]
#Verkaufspreis,Der Kaufpreis und die Anzahl der Aktien sind die Summe der Lieferbeträge in der Liste.
sell_sum = 0
sell_num = 0
buy_sum = 0
buy_num = 0
for i in range(len(ROW2)):
#Wenn Sie einen neuen Artikel kaufen oder verkaufen, beträgt die Liefergebühr"--"Damit
if f_data[ROW2[i]][4][2:4] == "Neu":
sell_sum += 0
#In Rückzahlung+-Weil es normal rauskommt.
elif f_data[ROW2[i]][4][2:4] == "Rückzahlung":
sell_sum += int(f_data[ROW2[i]][13])
else:
sell_sum += int(f_data[ROW2[i]][13])
sell_num += int(f_data[ROW2[i]][8])
data["Verkaufspreis"] = sell_sum
data["Verkaufsmenge"] = sell_num
data["Kaufdatum"] = f_data[ROW1[0]][0]
for i in range(len(ROW1)):
if f_data[ROW1[i]][4][2:4] == "Neu":
buy_sum += 0
#In Rückzahlung+-Weil es normal rauskommt.
elif f_data[ROW1[i]][4][2:4] == "Rückzahlung":
buy_sum += int(f_data[ROW1[i]][13])
#Machen Sie im Falle des tatsächlichen Artikels den Kauf negativ
else:
buy_sum += -int(f_data[ROW1[i]][13])
buy_num += int(f_data[ROW1[i]][8])
data["Kaufpreis"] = buy_sum
data["Menge kaufen"] = buy_num
#Die Anzahl der gehaltenen Tage wird anhand von Datum / Uhrzeit berechnet.Durch Subtrahieren von Datum und Uhrzeit wird ein Zeitdelta-Objekt erstellt
date1 = datetime.datetime.strptime(f_data[ROW1[0]][0], "%Y/%m/%d")
date2 = datetime.datetime.strptime(f_data[ROW2[-1]][0], "%Y/%m/%d")
data["Anzahl der gehaltenen Tage"] = int((date2 - date1).days)
# +-Wurde bereits hinzugefügt, seien Sie also vorsichtig
data["Gewinn-und Verlust"] = int(data["Kaufpreis"]) + int(data["Verkaufspreis"])
#Das Gewinn-Verlust-Verhältnis ist die Höhe des Gewinns / Verlusts/Kaufpreis (absoluter Wert)) *100 Einheiten%
if data["Kaufpreis"] == 0:
data["Gewinn-Verlust-Verhältnis"] = round(int(data["Gewinn-und Verlust"]) / abs(int(data["Verkaufspreis"])) * 100, 2)
else:
data["Gewinn-Verlust-Verhältnis"] = round(int(data["Gewinn-und Verlust"]) / abs(int(data["Kaufpreis"])) * 100, 2)
if len(ROW2) > 1:
data["Kommentar"] += "Separat erhältlich. "
if len(ROW1) > 1:
data["Kommentar"] += "Zusätzlicher Kauf. "
if sell_num > buy_num:
data["Kommentar"] += "Nicht genug Kaufdaten"
else:
raise Exception("pareset()Es gibt eine Ausnahme in")
pasteExcel(data)
for date in day_row_key:
for Row in day_row_dict[date]:
#Überspringen Sie freie Daten
if len(f_data[Row]) == 0:
continue
#Hauptansichtspunkt Zeile:Scannen Sie nacheinander nach Datum
#Überspringen, wenn der unten beschriebene Paarsuchprüfer aktiviert ist
if Row in checked_row_list:
continue
checked_row_list.append(Row)
#Da es sich um Sachleistungen handelt und gekauft wird, sucht der Checker nach einem Paar zum Verkauf. Stellen Sie die Verkaufsmenge des Checkers ein
#Suchen Sie weiterhin nach demselben Bestand, nach dem Sie im Checker gesucht haben. Bis es die erste Anzahl von Aktien einholt. Wenn Sie kaufen, bevor die Anzahl der von Ihnen gekauften Aktien 0 wird, wird dies zur Anzahl der verbleibenden Aktien addiert und Sie suchen nach weiteren Verkäufen.
#Wenn bis zum Ende noch ein Bestand vorhanden ist, müssen Sie diesen ganz am Ende in Excel einfügen
#Kauf und Verkauf initialisieren
multiple_checker_sell_rows = []
multiple_checker_buy_rows = []
if f_data[Row][4].endswith("Kaufen"):
multiple_checker_buy_rows.append(Row)
#Ersteinstellung der Anzahl der verbleibenden Aktien
num_stocks_remaining = int(f_data[Row][8])
elif f_data[Row][4].endswith("Verkaufen"):
multiple_checker_sell_rows.append(Row)
#Ersteinstellung der Anzahl der verbleibenden Aktien
num_stocks_remaining = -int(f_data[Row][8])
else:
raise Exception
#Prüfer: checker_Reihe sucht nach einem Paar. Machen Sie das Paar überprüft
for checker_date in day_row_key[day_row_key.index(date):]:
for checker_row in day_row_dict[checker_date]:
logging.debug("Row: {}, f_data[Row][2]: {}, f_data[checker_row][2]: {}, f_data[checker_row][4]: {}".format(Row, f_data[Row][2], f_data[checker_row][2], f_data[checker_row][4]))
#Überspringen Sie freie Daten
if len(f_data[checker_row]) == 0:
continue
#Überspringen Sie Paare oder xl reflektierte Spalten
elif checker_row in checked_row_list:
continue
#Beurteilen Sie, ob es sich um Gutschriften oder Sachleistungen handelt, und überspringen Sie diese, wenn nicht
#Beurteilen Sie, ob es sich um Kredit- oder physische Aktien handelt. Suchen Sie nach einem physischen Paar für physische und einem Kreditpaar für Kredit."Aktienverkäufe in Form von Sachleistungen(Kaufen)"Oder"Kredit Neuverkauf"Sie können den Wert von erhalten
elif f_data[checker_row][4][:2] != f_data[Row][4][:2]:
continue
#Finden Sie ein Paar, das Ihren Kriterien entspricht
elif (f_data[Row][2] == f_data[checker_row][2]) and f_data[checker_row][4].endswith("Verkaufen"):
#Übereinstimmungen sind der Hauptprüfer für Zeile und Prüfer_Zeile überspringen lassen
checked_row_list.append(checker_row)
multiple_checker_sell_rows.append(checker_row)
num_stocks_remaining -= int(f_data[checker_row][8])
if num_stocks_remaining == 0:
#Betrachten Sie es als Paar und übergeben Sie die Paardaten an Excel-Daten, um sie zu überprüfen
pareset(ROW1=multiple_checker_buy_rows, ROW2=multiple_checker_sell_rows)
logging.debug("Paar gefunden! ROW1: {}, ROW2: {}".format(multiple_checker_buy_rows, multiple_checker_sell_rows))
break
else:
continue
elif (f_data[Row][2] == f_data[checker_row][2]) and f_data[checker_row][4].endswith("Kaufen"):
checked_row_list.append(checker_row)
multiple_checker_buy_rows.append(checker_row)
#Wenn es einen weiteren Kauf gibt, bevor die Anzahl der verbleibenden Aktien 0 erreicht, erhöhen Sie die Anzahl der verbleibenden Aktien.
num_stocks_remaining += int(f_data[checker_row][8])
if num_stocks_remaining == 0:
#Paarbildung
pareset(ROW1=multiple_checker_buy_rows, ROW2=multiple_checker_sell_rows)
logging.debug("Paar gefunden! ROW1: {}, ROW2: {}".format(multiple_checker_buy_rows, multiple_checker_sell_rows))
break
else:
continue
else:
logging.debug("Dies war kein Paar. (checker_row: {})".format(checker_row))
continue
else:
#Wenn Sie innerhalb eines Datums ohne Unterbrechung fertig sind, fahren Sie mit dem nächsten Datum fort
continue
#Wenn der Checker bricht und fertig ist, brechen Sie aus der Checker-Schleife aus und suchen Sie nach der nächsten Hauptansichtspunktzeile
break
else:
#Wenn der Prüfer alle Spalten überprüft hat, aber noch Freigaben vorhanden sind
if num_stocks_remaining != 0:
#Bringen Sie es ganz nach oben,Ich möchte die Anzahl der verbleibenden Aktien anzeigen
only_list.append((multiple_checker_buy_rows, multiple_checker_sell_rows))
#Rufen Sie auf, was Sie noch nicht verkauft haben
for row1, row2 in only_list:
logging.debug("Einige haben kein Paar. ROW1: {}, ROW2: {}".format(row1, row2))
pareset(ROW1=row1, ROW2=row2)
#Wenn das Gewinn-Verlust-Verhältnis negativ ist, schreiben Sie rote Zahlen
font_st_red = Font(color="FF0000")
for row in range(2, sheet.max_row + 1):
if sheet["K{}".format(row)].value == None or sheet["K{}".format(row)].value == []:
continue
elif sheet["K{}".format(row)].value < 0:
sheet["K{}".format(row)].font = font_st_red
wb.save(savename)
f.close()
print("xlsx-Verarbeitung abgeschlossen")
Es ist eine brutale Gewalt, aber bitte verwenden Sie sie, wenn Sie möchten.
Recommended Posts