Hier beschäftigen wir uns mit der Zeitfluktuationsanalyse am Beispiel von Python, insbesondere mit Zeitreihendaten (Lichtkurve genannt) der Intensität von Schwarzlochphotonen, die durch Röntgenstrahlen beobachtet werden. Wenn nur das Dateiformat vorbereitet ist, kann es zur Analyse beliebiger Zeitreihendaten verwendet werden. Selbst wenn Sie es nicht selbst codieren, können Sie die meisten Dinge mit den von der NASA bereitgestellten Tools namens XRONOS tun. Es ist jedoch für diese Leute, weil es mehr Studium ist, es selbst zu schreiben, um kleine Details und neue Zeitschwankungsanalysen zu erhalten. Die Betriebsumgebung ist die Anaconda Python3-Serie, die sich auf dem Mac-Terminal befindet. Es wird angenommen, dass ls verwendet werden kann. Linux ist in Ordnung, aber Windows muss möglicherweise angepasst werden. (Aktualisiert für Python3 am 2020.04.24 mit Version = 1.2, und die Berechnung der Unwrap-Phase war in 2020-05-12 Version 1.3 enthalten.)
So laden Sie Beispieldateien und Code herunter
Der Normalisierungsfaktor (Norm) des Leistungsspektrums (PSD), wenn er bei allen Frequenzen integriert ist, Bitte standardisieren Sie auf RMS ^ 2. Ich denke, das ist das am häufigsten verwendete. Wenn Sie überprüfen möchten Siehe Bestätigen des Persival-Theorems mit Matplotlib und Scipy Fourier Transform (FFT).
So verkürzen Sie die Zeit, um das Leistungsspektrum einmal anzuwenden
Betrachten Sie die drei Parameter von und verwenden Sie den optimalen.
Wenn die Helligkeit beispielsweise mehrere c / s beträgt,
Die Datei (xis3_src_0p1sec_6000.qdp) ist ein Teil der Lichtkurve des Schwarzen Lochs. Die Anpassungsdatei wird mit flc2ascii in Text konvertiert und nur 6000 Zeilen werden extrahiert.
python
$cat xis3_src_0p1sec_6000.qdp
7.05204864025116e+02 3.20020450000000e+01 1.60010220000000e+01 1.00000010000000e+00
7.05329855978489e+02 8.00051120000000e+00 8.00051120000000e+00 1.00000010000000e+00
7.05454847991467e+02 8.00051120000000e+00 8.00051120000000e+00 1.00000010000000e+00
....
Mögen Zeit (Sek.) Zählrate (c / s) Zählratenfehler (c / s) Die fraktionierte Belichtung (die Rate, mit der Daten pro Bin gepackt werden) ist verstopft, 4 Zeilen x Stunden. Machen Sie daraus eine Datei. Bereiten Sie einfachen Code vor, damit die Debug-Effizienz mit einer kleinen Datenmenge verbessert werden kann.
python
/bin/ls xis3_src_0p1sec_6000.qdp > test1lc.list
Bereiten Sie eine Datei mit dem Dateinamen vor. Dies ähnelt der Verwendung von Narren, um eine Datei mit einem Dateinamen zu übergeben, wenn Sie mehrere Dateien gleichzeitig verarbeiten möchten.
Führen Sie Ana_Timing_do_FFT.py aus.
python
python Ana_Timing_do_FFT.py test1lc.list -a 10 -l 128
Hier, Ich habe es als "Python Ana_Timing_do_FFT.py test1lc.list - eine durchschnittliche Anzahl von Malen - eine Anzahl von Bins, die für eine FFT verwendet wurden" verwendet.
Es gibt andere Möglichkeiten.
python
$python Ana_Timing_do_FFT.py -h
Usage: Ana_Timing_do_FFT.py fileList [-d] [-p] [-e 1] [-c 4] [-a 12] [-l 512]
Options:
--version show program's version number and exit
-h, --help show this help message and exit
-d, --debug The flag to show detailed information (Flag)
-p, --pltflag to plot time-series data
-m HLEN, --hlen=HLEN Header length
-c CLEN, --clen=CLEN Column length
-a ANUM, --anum=ANUM Average number
-l LENGTH, --length=LENGTH
FFT bin number (e.g., 1024, 2048, ..)
-t TCOL, --tcol=TCOL column number for time
-s VCOL, --vcol=VCOL column number for value
-e VECOL, --vecol=VECOL
column number for value error (if negative, it skips
error column)
Wenn -p in den Optionen hinzugefügt wird, wird die Ergebniszahl jeder FFT gespeichert. Wenn Sie den Status der FFT vor der Mittelwertbildung anzeigen möchten, fügen Sie die Option -p hinzu und führen Sie sie aus.
Wenn es läuft und funktioniert,
fig_fft/Leistungsspektrum-Diagramm
fig_phase/Phase des Leistungsspektrums auspacken
fig_lc/Lichtkurvendiagramm
text_fft/Textauszug des Leistungsspektrums(Time vs. Power^2 [rms^2/Hz])
Das Verzeichnis wird generiert und eine Zahl wird generiert.
Das erzeugte eindimensionale Leistungsspektrum wird in text_fft gespeichert und sein Dateiname in test1fft.list abgelegt. Ana_Timing_plot2d_FFT.py generiert ein zweidimensionales Leistungsspektrum mit der Dateiliste als Argument.
python
$ /bin/ls text_fft/xis3_src_0p1sec_6000_l128_a10_fft_00000* > test1fft.list
$ python Ana_Timing_plot2d_FFT.py test1fft.list
Dann wird ein zweidimensionales Histogramm erzeugt.
Als nächstes werden die Daten des Schwarzloch-Sternsterns J1550 vom Satelliten mit einer großen effektiven Fläche namens RXTE j1550_lowF_50ms_forqiita.txt erfasst /v1_python3/j1550_lowF_50ms_forqiita.txt) Versuchen wir ein Beispiel, in dem die quasi-periodische Fluktuation (QPO) des Schwarzen Lochs gut sichtbar ist.
python
/bin/ls j1550_lowF_50ms_forqiita.txt > rxtelc.list
python Ana_Timing_do_FFT.py rxtelc.list -a 10 -l 512 -d
/bin/ls text_fft/j1550_lowF_50ms_forqiita_l512_a10_fft_0000* > rxtefft.list
python Ana_Timing_plot2d_FFT.py rxtefft.list
--Leistungsspektrum
Was um diese ~ 0,5 Hz herum zu sehen ist, ist die quasi-periodische Fluktuation des Schwarzen Lochs.
Mit der Option -p wird das Ergebnis jeder FFT geplottet und in ein Verzeichnis namens scipy_fft_check gestellt.
python
python Ana_Timing_do_FFT.py rxtelc.list -a 10 -l 512 -d -p
Wenn Sie den Befehl convert verwenden, um in eine GIF-Animation zu konvertieren,
python
convert -delay 50 -loop 5 *.png j1550_fftcheck.gif
Wie gezeigt sind die Zeitreihen, die PSD und die Phase pro Intervall aufgetragen.
Wenn die Erweiterung .lc oder .fits ist, wird sie als FITS-Format erkannt und die Datei wird automatisch per Astropie gelesen.
python
/bin/ls ae905005010hxd_0_pinno_cl_pha25-160.lc > xislc.list
python Ana_Timing_do_FFT.py xislc.list -a 400 -l 64
/bin/ls text_fft/ae905005010hxd_0_pinno_cl_pha25-160_l64_a400_fft_0000* > xisfft.list
python Ana_Timing_plot2d_FFT.py xisfft.list
Die Beobachtungszeit beträgt ungefähr 70k, aber in Wirklichkeit gibt es aufgrund der Rückseite der Erde und der Begrenzung des Sonnenwinkels keine effektive Beobachtungszeit von ungefähr der Hälfte davon. Bei umlaufenden Satelliten ist dies in den meisten Fällen der Fall. Wenn in diesem Programm nicht genügend Daten vorhanden sind, um eine FFT durchzuführen, werden diese übersprungen. In xronos wird eine Einstellungsdatei verwendet, die als Fensterdatei bezeichnet wird, und es wird angegeben, dass nur die Zeitzone für die Berechnung verwendet wird, in der mehrere Bedingungen gelöscht werden.
--Leistungsspektrum
Der QPO des Schwarzen Lochs ist um ~ 3Hz sichtbar. Es ist oft so schwach, und es ist notwendig, Statistiken und Analysen zu erstellen, um Parameter genau zu erhalten.
Alles was Sie brauchen ist
--Erstellen eines eindimensionalen Spektrums und einer Lichtkurve Ana_Timing_do_FFT.py --Erstellen Sie ein zweidimensionales Leistungsspektrum Ana_Timing_plot2d_FFT.py
Nur zwei davon sind in Ordnung. Es entspricht dem Python3-System. Wenn Sie Anfälle eingeben, benötigen Sie Astropie.
Selbst wenn Sie keine Python-Umgebung haben oder Probleme beim Erstellen einer Umgebung haben, können Sie Google Colab verwenden, solange Sie über ein Google-Konto verfügen. So verwenden Sie Google Colab
Bitte beziehen Sie sich auf.
Lassen Sie uns zunächst die Dateien auf Google Drive ablegen. Hängen wir es also ein, damit Google Colab auf die Dateien auf Google Drive zugreifen kann.
from google import colab
colab.drive.mount('/content/gdrive')
Holen Sie sich ein temporäres Passwort und kopieren Sie es. Dann
In Google Colab können Sie die auf Google Drive abgelegten Dateien durchsuchen. Als nächstes kopieren wir den Satz von Dateien auf Google Drive.
python
!cp -rf /content/gdrive/My\ Drive/Python verwandt/202004/v1_python3 .
Sie haben jetzt die Dateien nach Google Colab kopiert. Experten können es überspringen, aber stellen Sie sicher, dass Sie überprüfen, wo Sie sich befinden, und die Dateien richtig anzeigen.
Verwenden Sie cd zum Verschieben, pwd zum Überprüfen Ihres aktuellen Speicherorts und ls zum Anzeigen einer Liste von Dateien an Ihrem aktuellen Speicherort.
Außerdem sollte es wie in diesem Artikel beschrieben funktionieren. (Ich verwende das Python3-System und sollte nur Basismodule verwenden. Bitte melden Sie, wenn es nicht funktioniert.)
/ bin / ls und ls sollten gleich sein. Abhängig von der Umgebung gibt es jedoch eine Einstellung zum Ausgeben anderer Informationen als des Dateinamens mit ls. Daher schreiben wir hier ohne Optionen in call / bin / ls.
Ana_Timing_do_FFT.py
#!/usr/bin/env python
#-*- coding: utf-8 -*-
""" Ana_Timing_do_FFT.py is to plot lightcurves.
This is a python script for timing analysis.
History:
2016-09-13 ; ver 1.0; first draft from scratch
2020-04-24 ; ver 1.2; updated for python3
2020-05-12 ; ver 1.3; unwrap phase included
"""
__author__ = 'Shinya Yamada (syamada(at)tmu.ac.jp'
__version__= '1.2'
############# scipy ############################################################
import scipy as sp # scipy :Ein Muss für numerische Operationen
import scipy.fftpack as sf #FFT-Modul. Verwendet die FFT-Bibliothek von fortan.
################################################################################
############# numpy ############################################################
import numpy as np # numpy :Ein Muss für die Array-Berechnung
from numpy import linalg as LA
################################################################################
############# matplotlib #######################################################
import matplotlib.pylab as plab
import matplotlib.pyplot as plt
from matplotlib.font_manager import fontManager, FontProperties
import matplotlib.colors as colors
import matplotlib.cm as cm
import matplotlib.mlab as mlab
from matplotlib.ticker import MultipleLocator, FormatStrFormatter, FixedLocator
# For 3D plot
from mpl_toolkits.mplot3d import Axes3D
from matplotlib.collections import PolyCollection
from matplotlib.colors import colorConverter
#Anpassung der Etikettengröße
params = {'xtick.labelsize': 13, # x ticks
'ytick.labelsize': 13, # y ticks
'legend.fontsize': 10
}
plt.rcParams['font.family'] = 'serif' #Geben Sie die Schriftart an.(Serife ist in jeder Umgebung.)
plt.rcParams.update(params)
#################################################################################
import random #Modul zur Erzeugung von Zufallszahlen(no use)
import array # (no use)
import os #Betriebssystembezogene Module
import sys #systembezogene Module
import math #Mathematischer Funktionsbaustein
#import commands #Modul zur Verwendung von Shell-Befehlen für Python2
import subprocess #Modul zur Verwendung von Shell-Befehlen für Python3
# import pyfits #passt Modul
import optparse #Optionsbeziehung
#import yaml
import re #Unterstützung für reguläre Ausdrücke
import csv #CSV-Datei
import datetime #Zeitunterstützung
import pytz # timezoze
# jst = pytz.timezone('Asia/Tokyo')
# PyROOT
#import ROOT
#ROOT.gROOT.SetBatch(1)
def average_scipy_fft( allinputarray, dt, filtname=None, pltflag=False, outname = "default", anum = 10, length = 'default', debug = False):
"""
Empfängt Zeitreihendaten, Durchschnittswerte für die angegebene Anzahl von Malen, Häufigkeit, Realteil^2, Imaginärteil^2, Realteil^2+Imaginärteil^2, geben Sie die durchschnittliche Anzahl von Malen zurück.
[Eingabeparameter]
allinputarray :Eindimensionales Array von Zeitreihendaten, die FFT sein sollen
dt :Zeitintervall der Dateneingabe in allinputarray(sec)
filtname=None :Mit oder ohne Filter. Standard ist Keine. Es wird nur das Hanning unterstützt.
pltflag=False :Flag für die Darstellung der zu fftenden Zeitreihendaten. Standard ist False
outname = "default" :Ein Name zur Unterscheidung der Ausgabedatei. Standard ist"default"。
anum = 10 :Die durchschnittliche Anzahl von Malen. Standard ist 10 mal.
length = 'default':Die Länge der Daten, die für eine FFT verwendet werden. Standard ist"default"In diesem Fall Länge=Die Länge des Eingabearrays/Berechnet von anum.
debug = False :Flag zum Debuggen.
[Ausgabeparameter]
Frequenz(Hz)Realteil^2, Imaginärteil^2Realteil^2+Imaginärteil^2, durchschnittliche Anzahl von Malen
"""
alllen = len(allinputarray) #Holen Sie sich die Länge des Arrays
print("..... length of all array = ", alllen)
if length == 'default': #Ermitteln Sie automatisch die Länge einer FFT.
print("..... averaging is done with the number of the cycle. = ", cnum)
length = int (alllen / anum)
print("..... length = ", length)
else: #Ermitteln Sie die Länge einer FFT aus der Länge.
print("..... averaging is done with the length of the record = ", length)
print("..... the number of average = ", anum)
averagenum = 0 #Eine Variable zum Speichern der durchschnittlichen Häufigkeit. Wenn die Daten perfekt gefüllt sind, stimmen sie mit anum überein.
#Berechnen Sie die Anzahl der Sequenzen nach FFT
if (length%2==0):
num_of_fftoutput = int(length/2) + 1 #Wenn es gerade ist, teilen Sie durch 2 und addieren Sie 1.
else:
print("[ERROR] Please choose even number for length ", length)
sys.exit() #Wenn die Länge ungerade ist, wird sie normalerweise nicht verwendet, daher wird ein Fehler ausgegeben und endet.
#Holen Sie sich ein leeres Array. Bereiten Sie ein Array aus dem zweidimensionalen Array vor, um es als zweidimensionales Array mit numpy anzuhängen.
reals = np.empty((0, num_of_fftoutput), float)
imags = np.empty((0, num_of_fftoutput), float)
psds = np.empty((0, num_of_fftoutput), float)
unwrap_phases = np.empty((0, num_of_fftoutput), float)
for num in np.arange( 0, anum, 1): #Führen Sie die FFT durchschnittlich oft durch.
onerecord = allinputarray[num * length : (num + 1 ) * length]
oneoutname = outname + str(num)
if debug : print("..... ..... num = ", num, " len(onerecord) = ", len(onerecord))
freq, spreal, spimag, sppower, spunwrap_phases = scipy_fft(onerecord - np.mean(onerecord), dt, filtname=filtname, pltflag=pltflag, outname = oneoutname, debug = debug)
reals = np.append(reals, [spreal], axis=0) # add power^2/Hz
imags = np.append(imags, [spimag], axis=0) # add power^2/Hz
psds = np.append(psds, [sppower], axis=0) # add power^2/Hz
unwrap_phases = np.append(unwrap_phases, [spunwrap_phases], axis=0) # add radians
averagenum = averagenum + 1
real2 = np.mean(reals, axis=0)
# sum(power^2 + power^2 + .....), and then sqrt{sum(power^2 + power^2 + .....)} --> power/sqrt(Hz)
imag2 = np.mean(imags, axis=0)
# sum(power^2 + power^2 + .....), and then sqrt{sum(power^2 + power^2 + .....)} --> power/sqrt(Hz)
psd2 = np.mean(psds, axis=0)
# sum(power^2 + power^2 + .....), and then sqrt{sum(power^2 + power^2 + .....)} --> power/sqrt(Hz)
phase = np.mean(unwrap_phases, axis=0)
# sum(power^2 + power^2 + .....), and then sqrt{sum(power^2 + power^2 + .....)} --> power/sqrt(Hz)
return(freq,real2,imag2,psd2,averagenum,phase)
def scipy_fft(inputarray,dt,filtname=None, pltflag=False, outname = "default", debug = False):
# updated, 2015/01/25, freq is adjusted to PSP output.
#Führen Sie FFT aus.
bin = len(inputarray) #Holen Sie sich die Länge der Zeitreihendaten. 1024(default)
#Filtereinstellungen
if filtname == None:
filt = np.ones(len(inputarray))
cornorm = 1.0
elif filtname == 'hanning':
filt = sp.hanning(len(inputarray))
# print "filt =" + str(filt)
cornorm = 8./3. #Ein Begriff, der die vom Hanning-Filter gedämpfte Leistung korrigiert.
else:
print('No support for filtname=%s' % filtname)
exit()
###########################################################
# freq = np.arange(0, nyquist, nyquist/adclength)
# This means freq = [12.2, .. 6250.], len(freq) = 512
# because PSP discard 0 freq component.
# freq = sf.fftfreq(bin,dt)[0:bin/2 +1]
# scipy fftfreq return [0, 12.2 .... , -6250, ...], Nyquist is negative.
nyquist = 0.5/dt
adclength = int(0.5 * bin) # 512 (default)
# freq = np.linspace(0, nyquist, adclength + 1)[1:] # this is only for PSP
freq = np.linspace(0, nyquist, adclength + 1)[0:]
############################################################
df = freq[1]-freq[0]
# fft = sf.fft(inputarray*filt,bin)[1:bin/2+1] # this is only for PSP
fft = sf.fft(inputarray*filt,bin)[0:int(bin/2+1)]
# scipy fftfreq return [0, 12.2 .... , -6250, ...], Nyquist is negative.
# so [1:bin/2 + 1] = [1:513] = [12.2 ,,, -6250]
###########################################################################
real = fft.real
imag = fft.imag
# psd = np.abs(fft)
############ this is used for to adjust PSP unit.
# psd2 = np.abs(fft) * np.abs(fft) * cornorm
############ this is used for to adjust matplotlib norm = rms definition.
# psd = np.abs(fft)/np.sqrt(df)/(bin/np.sqrt(2.))
fftnorm = (np.sqrt(df) * bin) / np.sqrt(2.) #Der Begriff, der erforderlich ist, damit die Frequenzintegration zu RMS wird.
psd = np.abs(fft) * np.sqrt(cornorm) / fftnorm
psd2 = psd * psd # Power^2
rms_from_ft = np.sqrt(np.sum(psd2) * df) #Im Frequenzraum integrierter Betrag. Entspricht dem raumzeitlichen Effektivwert.
phase = np.arctan2(np.array(real),np.array(imag)) # radians, * 180. / np.pi # dig
phase_unwrap = np.unwrap(phase)
if pltflag: #Zeichnen Sie die Zeitreihendaten zur Bestätigung auf.
binarray = np.arange(0,bin,1) * dt
F = plt.figure(figsize=(10,8.))
plt.subplots_adjust(wspace = 0.1, hspace = 0.3, top=0.9, bottom = 0.08, right=0.92, left = 0.1)
ax = plt.subplot(3,1,1)
plt.title("check scipy_fft")
plt.xscale('linear')
plt.grid(True)
plt.xlabel(r'Time (s)')
plt.ylabel('FFT input')
plt.errorbar(binarray, inputarray, fmt='r', label="Raw data")
fnorm = np.abs(np.amax(inputarray))
plt.errorbar(binarray, fnorm * filt, fmt='b--', label="Filter")
plt.errorbar(binarray, inputarray * filt, fmt='g', label="Filtered Raw data")
plt.legend(numpoints=1, frameon=False, loc=1)
ax = plt.subplot(3,1,2)
plt.xscale('linear')
plt.yscale('linear')
plt.grid(True)
plt.xlabel(r'Frequency (Hz)')
plt.ylabel(r'Power ($rms/\sqrt{Hz}$)')
plt.errorbar(freq, psd, fmt='r', label="PSD")
plt.legend(numpoints=1, frameon=False, loc=1)
ax = plt.subplot(3,1,3)
plt.xscale('linear')
plt.grid(True)
plt.xlabel(r'Frequency (Hz)')
plt.ylabel('UnWrap Phase (radians)')
plt.errorbar(freq, phase_unwrap, fmt='r', label="phase")
plt.legend(numpoints=1, frameon=False, loc=1)
# plt.show()
outputfiguredir = "scipy_fft_check"
subprocess.getoutput('mkdir -p ' + outputfiguredir)
outfile = outputfiguredir + "/" + "scipy_fftcheck" + "_t" + str(dt).replace(".","p") + "_" + outname + ".png "
plt.savefig(outfile)
if os.path.isfile(outfile):
print("saved as ", outfile)
else:
print("[ERROR] couldn't save as ", outfile)
if debug:
print("=====> please make sure the three RMSs below are almost same. ")
print(". . . . [check RMS] std(input), np.std(input * filt), rms_from_ft = " + str(np.std(inputarray)) + " " + str(np.std(inputarray * filt)) +" " + str(rms_from_ft))
return(freq,real,imag,psd2,phase_unwrap)
class lcfile():
"""
Eine Klasse für eine Lichtkurve.
"""
def __init__ (self, eventfile, debug, hlen = -1, clen = 4, anum = 10, length = 'default', pltflag = False, tcol = 0, vcol = 1, vecol = 2):
"""Lesen Sie den Inhalt der Datei mit der Initialisierungsfunktion init dieser Klasse."""
eventfile = eventfile.strip() #Schließt den Zeilenvorschubcode aus.
self.eventfilename = eventfile
self.ext = eventfile.split(".")[-1]
self.filenametag = os.path.basename(self.eventfilename).replace(".txt","").replace(".text","").replace(".csv","").replace(".qdp","").replace(".lc","").replace(".fits","")
self.filenametag = self.filenametag + "_l" + str(length) + "_a" + str(anum)
self.debug = debug
#extract header information
if os.path.isfile(eventfile):
self.file = open(eventfile)
if self.ext == "lc" or self.ext == "fits":
print(".... file is considered as FITS format ", self.ext)
from astropy.io import fits
hdu = fits.open(eventfile)
self.t = hdu[1].data["TIME"]
self.v = hdu[1].data["RATE"]
self.ve = hdu[1].data["ERROR"]
else:
print(".... file is considered as text format ", self.ext)
self.t = np.array([]) #Zeit
self.v = np.array([]) #Wert, e.g., count/sec
self.ve = np.array([]) #Error
for i, oneline in enumerate(self.file):
if i > (hlen - 1) : # skip header
list = oneline.split()
if list[0][0] is not "!": # skip a line starting with !
if len(list) == clen: # check column number of the input file
self.t = np.append(self.t, float(list[tcol]))
self.v = np.append(self.v, float(list[vcol]))
if vecol > 0:
self.ve = np.append(self.ve, float(list[vecol]))
else:
self.ve = np.append(self.ve, float(0.)) # padding 0.
if debug : print(len(self.t), len(self.v), len(self.ve))
if len(self.t) < 1:
print("[ERROR] no data are stored, ", self.t)
print("Please check the length of column: column of your data = ", len(list), " input param of clen = ", clen)
print("Both should be the same.")
sys.exit()
self.tstart = self.t[0]
self.tstop = self.t[-1]
self.timebin = self.t[1] - self.t[0]
# created dt = t[i+1] - t[i], and add 0 to make it have the same array length as t.
self.dt = self.t[1:] - self.t[0:-1]
self.dt_minus_timebin = self.dt - self.timebin #Beurteilen Sie den Sprung der Zeit. Wenn es keinen Flug gibt, ist es Null.
np.append(self.dt_minus_timebin,0)
print("timebin (sec) = ", self.timebin)
print("start (sec) = ", self.tstart)
print("stop (sec) = ", self.tstop)
print("expected maxinum number of bins = ", int((self.tstop - self.tstart)/self.timebin))
else:
print("ERROR : file does not exist. ", eventfile)
def plot_lc_entire(self):
"""
plot entire lightcurves
"""
F = plt.figure(figsize=(12,8.))
# plt.subplots_adjust(wspace = 0.1, hspace = 0.3, top=0.9, bottom = 0.08, right=0.92, left = 0.1)
ax = plt.subplot(1,1,1)
plt.title(self.filenametag)
plt.xscale('linear')
plt.yscale('linear')
plt.grid(True)
plt.xlabel(r'Time (sec)')
plt.ylabel(r'Count Rate ($cs^{-1}$)')
plt.errorbar(self.t, self.v, self.ve, fmt='ro-', label=self.filenametag, alpha = 0.9)
plt.legend(numpoints=1, frameon=False, loc="best")
outputfiguredir = "fig_lc"
subprocess.getoutput('mkdir -p ' + outputfiguredir)
outfile = outputfiguredir + "/" + self.filenametag + "_lc_entire.png "
plt.savefig(outfile)
if os.path.isfile(outfile):
print("saved as ", outfile)
else:
print("[ERROR] couldn't save as ", outfile)
def create_good_lc(self, anum = 5, length = 512, debug = False):
"""
Wenn während der Länge keine Daten fehlen, werden diese als gut bewertet.
Wenn es einen Sprung in den Daten gibt, werden die Daten bis zu diesem Punkt verworfen und der Sprung in den Daten
Sammeln Sie am Ende neue Datenlängen.
"""
self.good_lc_t = np.array([]) #gute beurteilte Zeit
self.good_lc_v = np.array([]) #guter Wert
self.good_lc_ve = np.array([]) #gut beurteilter Fehler
tmp_t = []
tmp_v = []
tmp_ve = []
num_of_good_interval = 0 #Die Anzahl der Intervalle, die als gut bestimmt wurden.
num_of_discarded_events = 0 #Die Anzahl der Ereignisse, die nicht als gut beurteilt wurden.
for i, (t, v, ve, tdiff) in enumerate(zip(self.t, self.v, self.ve, self.dt_minus_timebin)):
if np.abs(tdiff) > 0.5 * self.timebin: # 0.5 is not important. very small number in tdiff means time is contineous.
"""Erkennt Sprünge in den Daten. Verwerfen Sie die bisher gesammelten Daten."""
print(".-.-.-.-.-. time jump has occurred. ", i, t, v, tdiff)
num_of_discarded_events = num_of_discarded_events + len(tmp_t)
# initialize buffer arrays
tmp_t = []
tmp_v = []
tmp_ve = []
else:
tmp_t.append(t)
tmp_v.append(v)
tmp_ve.append(ve)
if len(tmp_t) == length:
"""Da die Daten nicht springen und die Länge akkumuliert wird, speichern Sie sie in einem Array."""
# store data
self.good_lc_t = np.append(self.good_lc_t, tmp_t)
self.good_lc_v = np.append(self.good_lc_v, tmp_v)
self.good_lc_ve = np.append(self.good_lc_ve, tmp_ve)
num_of_good_interval = num_of_good_interval + 1
# initialize buffer arrays
tmp_t = []
tmp_v = []
tmp_ve = []
# print status
print("def create_good_lc() ")
print("num_of_good_interval (event x length) = ", num_of_good_interval)
print("num_of_discarded_events (event) = ", num_of_discarded_events, "\n")
if debug:
print("self.good_lc_t", len(self.good_lc_t))
print("self.good_lc_v", len(self.good_lc_v))
print("self.good_lc_ve", len(self.good_lc_ve))
if num_of_good_interval < anum:
print("[ERROR] number of good interval", num_of_good_interval, " is smaller than one average number ", anum )
sys.exit()
def calc_fft(self, filtname=None, pltflag = False, anum = 5, length = 512, dccut = True, debug = False):
print("start : in calc_fft()")
#Führen Sie FFT-Anum-Zeiten für eine Zeitreihe mit langer Länge durch und nehmen Sie den Durchschnitt.
#Ein eindimensionales Array für den Puffer zum Speichern des für den Durchschnitt erforderlichen Arrays.
tmp_tlist = np.array([])
tmp_vlist = np.array([])
num_of_averaged_fft = 0 #Zählen Sie die Anzahl der erfolgreichen Mittelungen
#Berechnen Sie die Anzahl der Sequenzen nach FFT(Nach dem DC-Schnitt)
if (length%2==0):
num_of_fftoutput = int(length/2) #Wenn es gerade ist, teilen Sie durch 2.
else:
print("[ERROR] Please choose even number for length ", length)
sys.exit() #Wenn die Länge ungerade ist, wird sie normalerweise nicht verwendet, daher wird ein Fehler ausgegeben und endet.
#Holen Sie sich ein leeres Array. Bereiten Sie ein Array aus dem zweidimensionalen Array vor, um es als zweidimensionales Array mit numpy anzuhängen.
self.freq_ave = np.empty((0, num_of_fftoutput), float)
self.real_ave = np.empty((0, num_of_fftoutput), float)
self.imag_ave = np.empty((0, num_of_fftoutput), float)
self.power_ave = np.empty((0, num_of_fftoutput), float)
self.phase_ave = np.empty((0, num_of_fftoutput), float)
self.power_avenum = np.array([])
self.time_ave = np.array([])
for i, (tlist, vlist) in enumerate(zip(self.good_lc_t, self.good_lc_v)):
#Fügen Sie das Array nach Länge hinzu.
tmp_tlist = np.append(tmp_tlist, tlist)
tmp_vlist = np.append(tmp_vlist, vlist)
#Wenn die Länge 1 oder mehr beträgt und die Anzahl der Elemente durch die durchschnittliche Anzahl der x-Längen teilbar ist
if len(tmp_tlist) > 0 and (len(tmp_tlist)%(anum*length) == 0):
#FFT berechnen.
# freq_Ave Frequenz(Hz) real_ave (Das Quadrat des Realteils), imag_ave(Das Quadrat des Imaginärteils), power_ave(Absolutes Quadrat), avenum(Durchschnittlich oft)
freq_ave, real_ave, imag_ave, power_ave, avenum, phase_ave = \
average_scipy_fft( tmp_vlist, self.timebin, filtname=filtname, \
pltflag=pltflag, outname = self.filenametag, anum = anum, length = length, debug = debug)
if dccut: #Normalerweise ist die Gleichstromkomponente nicht erforderlich. Schneiden Sie sie daher ab.
freq_ave = freq_ave[1:]
real_ave = real_ave[1:]
imag_ave = imag_ave[1:]
power_ave = power_ave[1:]
phase_ave = phase_ave[1:]
num_of_averaged_fft = num_of_averaged_fft + 1 #Fügen Sie die durchschnittliche Anzahl von Malen hinzu
self.time_ave = np.append(self.time_ave, 0.5 * (tmp_tlist[0] + tmp_tlist[-1]) ) #Berechnen Sie den Mittelpunkt
self.power_avenum = np.append(self.power_avenum, avenum) #Sparen Sie durchschnittlich oft
self.freq_ave = np.append(self.freq_ave, [freq_ave], axis=0)
self.real_ave = np.append(self.real_ave, [real_ave], axis=0)
self.imag_ave = np.append(self.imag_ave, [imag_ave], axis=0)
self.power_ave = np.append(self.power_ave, [power_ave], axis=0)
self.phase_ave = np.append(self.phase_ave, [phase_ave], axis=0)
# init buffer arrays
tmp_tlist = np.array([])
tmp_vlist = np.array([])
else:
pass #Es gibt nicht genügend Elemente zum Durchschnitt.
# print status
print("num_of_averaged_fft = ", num_of_averaged_fft)
print("finish : in calc_fft()")
def plot_fft(self):
"""
plot fft
"""
F = plt.figure(figsize=(12,8.))
# plt.subplots_adjust(wspace = 0.1, hspace = 0.3, top=0.9, bottom = 0.08, right=0.92, left = 0.1)
ax = plt.subplot(1,1,1)
plt.title(self.filenametag)
plt.xscale('log')
plt.yscale('log')
plt.grid(True)
plt.xlabel(r'Frequency(Hz)')
plt.ylabel(r'Power ($rms/\sqrt{Hz}$)')
for fre, power, time in zip(self.freq_ave, self.power_ave, self.time_ave):
plt.errorbar(fre, np.sqrt(power), fmt='-', label=" t = " + str(time) + "" , alpha = 0.8) #Leistung ist das Quadrat der Leistung und wird angezeigt, indem die Route genommen wird.
plt.legend(numpoints=1, frameon=False, loc="best")
outputfiguredir = "fig_fft"
subprocess.getoutput('mkdir -p ' + outputfiguredir)
outfile = outputfiguredir + "/" + self.filenametag + "_fft.png "
plt.savefig(outfile)
if os.path.isfile(outfile):
print("saved as ", outfile)
else:
print("[ERROR] couldn't save as ", outfile)
def plot_phase(self):
"""
plot phase
"""
F = plt.figure(figsize=(12,8.))
ax = plt.subplot(1,1,1)
plt.title(self.filenametag)
plt.xscale('linear')
plt.yscale('linear')
plt.grid(True)
plt.xlabel(r'Frequency (Hz)')
plt.ylabel(r'unwarp phase (radians)')
for fre, phase, time in zip(self.freq_ave, self.phase_ave, self.time_ave):
plt.errorbar(fre, phase, fmt='-', label=" t = " + str(time) + "" , alpha = 0.8) #Leistung ist das Quadrat der Leistung und wird angezeigt, indem die Route genommen wird.
plt.legend(numpoints=1, frameon=False, loc="best")
outputfiguredir = "fig_phase"
subprocess.getoutput('mkdir -p ' + outputfiguredir)
outfile = outputfiguredir + "/" + self.filenametag + "_phase.png "
plt.savefig(outfile)
if os.path.isfile(outfile):
print("saved as ", outfile)
else:
print("[ERROR] couldn't save as ", outfile)
def dump_text_fft(self):
"""
dump fft data into text file
"""
odir = "text_fft"
subprocess.getoutput('mkdir -p ' + odir)
for i, (fre, power, time) in enumerate(zip(self.freq_ave, self.power_ave, self.time_ave)):
outfile = odir + "/" + self.filenametag + "_fft_" + str("%06d" % i) + "_" + str(time) + ".txt"
fout = open(outfile, "w")
for f, p in zip(fre,power):
outstr=str(f) + " " + str(p) + "\n" #Frequenz, Leistung im Quadrat
fout.write(outstr)
fout.close()
if os.path.isfile(outfile):
print("saved as ", outfile)
else:
print("[ERROR] couldn't save as ", outfile)
def main():
"""Wenn Sie das Skript ausführen, beginnt es hier."""
curdir = os.getcwd() #Holen Sie sich das aktuelle Verzeichnis
"""Einstellungsoptionen"""
#Optparse-Einstellungen
usage = u'%prog fileList [-d] [-p] [-e 1] [-c 4] [-a 12] [-l 512]'
version = __version__
parser = optparse.OptionParser(usage=usage, version=version)
#Optionseinstellungen.--Das Zeichen danach ist die Optionsvariable.
parser.add_option('-d', '--debug', action='store_true', help='The flag to show detailed information (Flag)', metavar='DEBUG', default=False)
parser.add_option('-p', '--pltflag', action='store_true', help='to plot time-series data', metavar='PLTFLAG', default=False)
parser.add_option('-m', '--hlen', action='store', type='int', help='Header length', metavar='HLEN', default=-1)
parser.add_option('-c', '--clen', action='store', type='int', help='Column length', metavar='CLEN', default=4)
parser.add_option('-a', '--anum', action='store', type='int', help='Average number', metavar='ANUM', default=12)
parser.add_option('-l', '--length', action='store', type='int', help='FFT bin number (e.g., 1024, 2048, ..)', metavar='LENGTH', default=256)
# for setting column number for time, value, and error of value
parser.add_option('-t', '--tcol', action='store', type='int', help='column number for time', metavar='TCOL', default=0)
parser.add_option('-s', '--vcol', action='store', type='int', help='column number for value', metavar='VCOL', default=1)
parser.add_option('-e', '--vecol', action='store', type='int', help='column number for value error (if negative, it skips error column)', metavar='VECOL', default=2)
options, args = parser.parse_args() #Holen Sie sich Argumente und Optionen
print("=========================================================================")
print("Start " + __file__ + " " + version )
print("=========================================================================")
debug = options.debug
pltflag = options.pltflag
hlen = options.hlen
clen = options.clen
anum = options.anum
length = options.length
# for setting column number for time, value, and error of value
tcol = options.tcol
vcol = options.vcol
vecol = options.vecol;
print(" (OPTIONS)")
print("-- debug ", debug)
print("-- pltflag ", pltflag)
print("-- hlen (header number) ", hlen)
print("-- clen (column number) ", clen)
print("-- anum (average number) ", anum)
print("-- length (FFT bin number) ", length)
# for setting column number for time, value, and error of value
print("-- column number for time ", tcol)
print("-- column number for value ", vcol)
print("-- column number for value error ", vecol, " (if negative, it skips.)")
print("do FFT for one length of ", length, " by averaging ", anum, " times" )
print("=========================================================================" )
#Wenn das Argument nicht 1 ist, wird ein Fehler ausgegeben und beendet.
argc = len(args)
if (argc < 1):
print('| STATUS : ERROR ')
print(parser.print_help())
quit()
listname = args[0] # get a file name which contains file names
filelistfile=open(listname, 'r')
"""Erstellen Sie eine Dateiliste"""
filelist = []
for i, filename in enumerate(filelistfile):
print("\n...... reading a file (", i, ") ", filename )
filelist.append(filename)
"""Erstellen Sie eine Klassenliste"""
evelist = []
for i, filename in enumerate(filelist):
print("\n...... creating a class for a file (", i, ") ", filename)
eve = lcfile(filename, debug, hlen = hlen, clen = clen, anum = anum, length = length, pltflag = pltflag,\
tcol = tcol, vcol = vcol, vecol = vecol)
evelist.append(eve)
"""Zeichnen Sie die gesamte Lichtkurve der Eingabedaten.(Sie können es überspringen, da es zur Bestätigung dient.) """
for i, eve in enumerate(evelist):
print("...... creating an entire lightcurve (", i, ") ", eve.eventfilename)
eve.plot_lc_entire()
""" Create good interval.Eine Länge(Xronos-Intervall)Sammeln Sie nur die Daten, die zufrieden sind."""
for i, eve in enumerate(evelist):
print("\n...... creating good interval (", i, ") ", eve.eventfilename)
eve.create_good_lc(anum = anum, length = length, debug = debug)
""" Create calc fft.FFE ausführen."""
for i, eve in enumerate(evelist):
print("\n...... calculating fft (", i, ") ", eve.eventfilename)
eve.calc_fft(filtname=None, pltflag = pltflag, anum = anum, length = length, debug=debug)
""" Create plot fft.Zeichnen Sie das Leistungsspektrum."""
for i, eve in enumerate(evelist):
print("\n...... calculating fft (", i, ") ", eve.eventfilename)
eve.plot_fft()
""" Create plot unwrap phase"""
for i, eve in enumerate(evelist):
print("\n...... calculating phase (", i, ") ", eve.eventfilename)
eve.plot_phase()
""" Dump FFT results into text files .Dump zu einer Textdatei."""
for i, eve in enumerate(evelist):
print("\n...... text dump of fft (", i, ") ", eve.eventfilename)
eve.dump_text_fft()
print("=================== Finish ==========================")
if __name__ == '__main__':
main()
Ana_Timing_plot2d_FFT.py
#!/usr/bin/env python
#-*- coding: utf-8 -*-
""" Ana_Timing_plot2d_FFT.py is to plot lightcurves.
This is a python script for timing analysis.
History:
2016-09-13 ; ver 1.0; first draft from scratch
2020-04-24 ; ver 1.2; updated for python3
"""
__author__ = 'Shinya Yamada (syamada(at)tmu.ac.jp'
__version__= '1.2'
############# scipy ############################################################
import scipy as sp # scipy :Ein Muss für numerische Operationen
import scipy.fftpack as sf #FFT-Modul. Verwendet die FFT-Bibliothek von fortan.
################################################################################
############# numpy ############################################################
import numpy as np # numpy :Ein Muss für die Array-Berechnung
from numpy import linalg as LA
################################################################################
############# matplotlib #######################################################
import matplotlib.pylab as plab
import matplotlib.pyplot as plt
from matplotlib.font_manager import fontManager, FontProperties
import matplotlib.colors as colors
import matplotlib.cm as cm
import matplotlib.mlab as mlab
from matplotlib.ticker import MultipleLocator, FormatStrFormatter, FixedLocator
from mpl_toolkits.axes_grid import AxesGrid
# For 3D plot
from mpl_toolkits.mplot3d import Axes3D
from matplotlib.collections import PolyCollection
from matplotlib.colors import colorConverter
#Anpassung der Etikettengröße
params = {'xtick.labelsize': 13, # x ticks
'ytick.labelsize': 13, # y ticks
'legend.fontsize': 10
}
plt.rcParams['font.family'] = 'serif' #Geben Sie die Schriftart an.(Serife ist in jeder Umgebung.)
plt.rcParams.update(params)
from matplotlib.colors import LogNorm
#################################################################################
import random #Modul zur Erzeugung von Zufallszahlen(no use)
import array # (no use)
import os #Betriebssystembezogene Module
import sys #systembezogene Module
import math #Mathematischer Funktionsbaustein
#import commands #Modul zur Verwendung von Shell-Befehlen für Python2
import subprocess #Modul zur Verwendung von Shell-Befehlen für Python3
# import pyfits #passt Modul
import optparse #Optionsbeziehung
#import yaml
import re #Unterstützung für reguläre Ausdrücke
import csv #CSV-Datei
import datetime #Zeitunterstützung
import pytz # timezoze
# jst = pytz.timezone('Asia/Tokyo')
# PyROOT
#import ROOT
#ROOT.gROOT.SetBatch(1)
class fftfile():
"""
Eine Klasse für eine FFT.
"""
def __init__ (self, eventfile, debug, hlen = -1, clen = 2):
"""Lesen Sie den Inhalt der Datei mit der Initialisierungsfunktion init dieser Klasse."""
eventfile = eventfile.strip() #Schließt den Zeilenvorschubcode aus.
self.eventfilename = eventfile
self.filenametag_full = eventfile.replace(".txt","")
self.filenametag = os.path.basename(self.filenametag_full).replace(".txt","").replace(".text","").replace(".csv","").replace(".qdp","")
self.filenametag = self.filenametag
self.debug = debug
#extract header information
if os.path.isfile(eventfile):
self.file = open(eventfile)
self.f = np.array([]) #Frequenz(Hz)
self.v = np.array([]) #Macht im Quadrat(rms^2/Hz)
for i, oneline in enumerate(self.file):
if i > hlen: # skip header
list = oneline.split()
if list[0][0] is not "!": # skip a line starting with !
if len(list) == clen: # check column number of the input file
self.f = np.append(self.f, float(list[0]))
self.v = np.append(self.v, float(list[1]))
if debug : print(len(self.f), len(self.v))
self.df = self.f[1] - self.f[0]
self.rms_from_ft = np.sqrt(np.sum(self.v) * self.df) #Im Frequenzraum integrierter Betrag. Entspricht dem raumzeitlichen Effektivwert.
self.nbin = len(self.f)
print("Maximum frequency (Hz) = ", self.f[-1] )
print("frequency bin (Hz) = ", self.df)
print("Number of bins = ", self.nbin)
print("RMS over all frequency = ", self.rms_from_ft)
self.file.close()
else:
print("ERROR : file does not exist. ", eventfile)
exit
def plot_2d_FFT(evelist, outname = "defaultoutname", pltflag = False):
"""
plot 2d plot
Nehmen Sie eine Liste von fftfile-Klassen als Argumente und zeichnen Sie die kombinierten Ergebnisse.
"""
power2d = []
for eve in evelist:
power2d.append(eve.v)
power2d = np.array(power2d)
x = range(len(evelist)) #Erstellen Sie ein eindimensionales Array in Zeitrichtung
y = evelist[0].f #Eindimensionale Anordnung von Frequenzen
X, Y = np.meshgrid(x,y) #Führen Sie ein zweidimensionales Array wie ein zweidimensionales Diagramm aus.(proprietäre Spezifikationen von Matplotlib)
Z = power2d.T #Bei der Transponierung wird die horizontale Achse zur Zeitachse.
F = plt.figure(figsize=(12,8.))
# plt.subplots_adjust(wspace = 0.1, hspace = 0.3, top=0.9, bottom = 0.08, right=0.92, left = 0.1)
ax = plt.subplot(1,1,1)
plt.title("Time vs. Frequency")
plt.xscale('linear')
plt.yscale('linear')
plt.grid(True)
plt.xlabel(r'Number of FFT in order of time')
plt.ylabel(r'Frequency ($Hz$)')
ax.set_xlim(x[0],x[-1])
ax.set_ylim(y[0],y[-1])
if (X.shape == Y.shape) and (X.shape == Z.shape) : #Das zweidimensionale Diagramm von matplotlib muss die gleiche Anzahl von Elementen enthalten.
plt.pcolormesh( X, Y, Z, norm=LogNorm(vmin=Z.min(), vmax=Z.max()))
cl = plt.colorbar()
cl.set_label(r"Power ($rms^2/Hz$)")
# plt.legend(numpoints=1, frameon=False, loc="best")
outputfiguredir = "fft2d_fig"
subprocess.getoutput('mkdir -p ' + outputfiguredir)
outfile = outputfiguredir + "/" + outname + "_log.png "
plt.savefig(outfile)
if os.path.isfile(outfile):
print("saved as ", outfile)
else:
print("[ERROR] couldn't save as ", outfile)
if pltflag:
plt.show()
else:
print("\n[ERROR] Please make sure all the following shape are the same.")
print("The same shape is necessary for matplotlib 2D plot.")
print(X.shape, Y.shape, Z.shape)
def main():
"""Wenn Sie das Skript ausführen, beginnt es hier."""
curdir = os.getcwd() #Holen Sie sich das aktuelle Verzeichnis
"""Einstellungsoptionen"""
#Optparse-Einstellungen
usage = u'%prog fileList [-d] [-p] [-e 1] [-c 4] [-a 12] [-l 512]'
version = __version__
parser = optparse.OptionParser(usage=usage, version=version)
#Optionseinstellungen.--Das Zeichen danach ist die Optionsvariable.
parser.add_option('-d', '--debug', action='store_true', help='The flag to show detailed information (Flag)', metavar='DEBUG', default=False)
parser.add_option('-p', '--pltflag', action='store_true', help='to plot time-series data', metavar='PLTFLAG', default=False)
parser.add_option('-e', '--hlen', action='store', type='int', help='Header length', metavar='HLEN', default=-1)
parser.add_option('-c', '--clen', action='store', type='int', help='Column length', metavar='CLEN', default=2)
parser.add_option('-o', '--outname', action='store', type='string', help='Output file name', metavar='OUTNAME', default='fft2dplot')
options, args = parser.parse_args() #Holen Sie sich Argumente und Optionen
print("=========================================================================")
print("Start " + __file__ + " " + version )
print("=========================================================================")
debug = options.debug
pltflag = options.pltflag
hlen = options.hlen
clen = options.clen
outname = options.outname
print(" (OPTIONS)")
print("-- debug ", debug)
print("-- pltflag ", pltflag)
print("-- hlen (header number) ", hlen)
print("-- clen (column number) ", clen)
print("-- outname ", outname)
print("=========================================================================" )
#Wenn das Argument nicht 1 ist, wird ein Fehler ausgegeben und beendet.
argc = len(args)
if (argc < 1):
print('| STATUS : ERROR ')
print(parser.print_help())
quit()
listname = args[0] # get a file name which contains file names
listnametag = listname.replace(".list","")
outname = outname + "_" + listnametag
filelistfile=open(listname, 'r')
"""Erstellen Sie eine Dateiliste"""
filelist = []
for i, filename in enumerate(filelistfile):
print("\n...... reading a file (", i, ") ", filename )
filelist.append(filename)
"""Erstellen Sie eine Klassenliste"""
evelist = []
for i, filename in enumerate(filelist):
print("\n...... creating a class for a file (", i, ") ", filename)
eve = fftfile(filename, debug, hlen = hlen, clen = clen)
evelist.append(eve)
""" Time vs.Machen Sie eine zweidimensionale Darstellung der FFT. Hinweis)Um genau zu sein, ist die horizontale Achse die Anzahl der Dateien, die in chronologischer Reihenfolge und nicht in der Zeit FFTed werden."""
print("\n...... creating 2D FFT ")
plot_2d_FFT(evelist, outname = outname, pltflag = pltflag)
print("=================== Finish ==========================")
if __name__ == '__main__':
main()
Recommended Posts