[PYTHON] Try implementing a Shazam-like voice fingerprint algorithm

Like Shazam, a technology called ** Audio Fingerprinting ** is used to quickly identify the music information that is playing from the voice recorded on the smartphone (also in the outline of Content based music retrieval). enter). This search technology does not seem to be very straightforward considering the lo-fi sound quality of smartphones, noise such as environmental sounds, and the huge music database, but in reality it was put into practical use in various services long ago. It was also installed in Siri. I think there are several different approaches to voice fingerprinting, but the most mainstream and easy-to-understand method is to "hash and save / retrieve spectrogram peaks." It is said that it is called ** "Landmark-based fingerprinting" ** in the neighborhood. This time, I would like to implement fingerprint technology using this method. For reference, see this blog: Audio Fingerprinting with Python and Numpy And this paper: Wang, Avery. "An Industrial Strength Audio Search Algorithm." Ismir. Vol. 2003. 2003.

LabROSA has an open source project called AUD FPRINT: AUDFPRINT - Audio fingerprint database creation + query

Spectrogram transformation and peak detection

First, STFT the voice signal and then find the peak point of the T-F display. Here, search in the manner of image processing. A scipy maximum filter is applied to the spectrogram, and points where the result is equal to the original spectrogram value are considered peaks (smaller values are unconditionally excluded). It may be good to apply a 2D low-pass filter to the preprocessing.

analyze.py


def find_peaks(y,size):
    """
    y:Voice signal
    size:maximum filter size(Control peak density)
    """
    sgram = np.abs(stft(y,n_fft=512,hop_length=256))
    sgram_max = ndi.maximum_filter(sgram,size=size,mode="constant")
    maxima = (sgram==sgram_max) & (sgram > 0.2)
    peaks_freq,peaks_time = np.where(maxima)
    return peaks_freq,peaks_time

I was able to get a list of peak points using numpy.where. Also called a landmark.

Peak pairing → hashing

A single peak has only the frequency information of that peak (the timing of the query is arbitrary, so the time position of the peak is meaningless), and it cannot be used as it is by hashing. Therefore, we connect nearby peaks to make two peaks into one fingerprint. This pair of peaks contains ** (peak 1 frequency, peak 2 frequency, time difference) ** information, and you can concatenate these values to create a hash that is less likely to overlap. I will. Specifically, as shown in the figure, points within a certain range ahead when viewed from a certain anchor point are connected.

shazam1.png

In the code below, after selecting a pair, ** (frequency of peak 1, frequency difference, time difference) ** is packed into a 20-bit number and hashed.

analyze.py


F1_BITS = 8
DF_BITS = 6
DT_BITS = 6

B1_MASK = (1<<F1_BITS) - 1
B1_SHIFT = DF_BITS + DT_BITS
DF_MASK = (1<<DF_BITS) - 1
DF_SHIFT = DT_BITS
DT_MASK = (1<<DT_BITS) - 1

def peaks_to_landmarks(peaks_freq,peaks_time,target_dist=30,target_time=30,target_freq=30):
    peak_mat = np.zeros((np.max(peaks_freq)+target_freq,np.max(peaks_time)+target_dist+target_time),dtype=np.bool)
    peak_mat[peaks_freq,peaks_time] = True
    list_landmarks = []
    for pfreq,ptime in zip(peaks_freq,peaks_time):
        #(pfreq,ptime) -- current anchor
        target_mask = np.zeros(peak_mat.shape,dtype=np.bool)
        target_mask[pfreq-target_freq:pfreq+target_freq,ptime+target_dist:py+target_dist+target_time] = 1
        #target_mask:Specify the range to select target peak
        targets_freq,targets_time = np.where(peak_mat & target_mask)
        for pfreq_target,ptime_target in zip(targets_freq,targets_time):
            dtime = ptime_target-ptime
            #Build hash
            hsh = (((pfreq & B1_MASK)<<B1_SHIFT) | (((pfreq_target+target_freq-pfreq)&DF_MASK)<<DF_SHIFT)|(dtime&DT_MASK))
            list_landmarks.append((hsh,ptime))   #Save hash and peak 1 time
        
        
    return list_landmarks

This hash value is associated with ** (song ID, peak 1 time position) ** and registered in the hash table. I'm using a python dictionary object.

#hash_table:python dictionary object
for landmark in list_landmarks:
    hsh = landmark[0]
    starttime = landmark[1]
    if hash_table.has_key(hsh):
        hash_table[hsh].append((audio_id,starttime))
    else:
        hash_table[hsh] = [(audio_id,starttime)]

After registering all hashes, save them with the pickle module.

Query

A total of 497 songs used as a data set for chord progression recognition have been created in a database. It took a long time, but the size of the pickled hash table was about 250MB. As a test, I played the Beatles ** "Back In the U.S.S.R" ** from the dataset and made a recording of about 10 seconds with a pocket recorder. It's a pretty ideal recording because it was recorded indoors, but I'll give it a try. I STFTed the recorded voice in the same way as above, detected the peak, and found the fingerprint (hash value). So how do you get to the song from here? I wrote a Database class that manages hashtables and queries.

Database.py


DIFF_RANGE = 2000000

class Database:
    def __init__(self):
        with open("hashtable.pkl","rb") as f:
            self.hash_table = pickle.load(f)
        with open("titlelist.pkl","rb") as f:
            self.title_list = pickle.load(f)
    #Method 1
	def query_histogram(self,y,best=5):
		list_landmarks = analyze.extract_landmarks(y)
		id_candidates = self._find_candidate_id(list_landmarks)
		candidate_landmark_histogram = np.zeros(id_candidates.size,dtype=np.int32)
		for landmark in list_landmarks:
			hsh = landmark[0]
			if self.hash_table.has_key(hsh):
				hit_ids = [item[0] for item in self.hash_table[hsh]]
				for i in hit_ids:
					candidate_landmark_histogram[np.where(id_candidates==i)[0]] += 1
			else:
				continue
		
		idxsort = np.argsort(candidate_landmark_histogram)[::-1]
		title_sort = self.title_list[id_candidates[idxsort]]
		fingerprint_histogram_sort = candidate_landmark_histogram[idxsort]
		return title_sort[:best],fingerprint_histogram_sort[:best]

    #Method 2
    def query_diff_histogram(self,y,best=5):
        list_landmarks = analyze.extract_landmarks(y)
        id_candidates = self._find_candidate_id(list_landmarks)
        diff_histogram = np.zeros((id_candidates.size,DIFF_RANGE),dtype=np.int32)
        diff_start_offset = np.zeros(id_candidates.size,dtype=np.int32)
        
        for landmark in list_landmarks:
            hsh_query = landmark[0]
            offset_query = landmark[1]
            if self.hash_table.has_key(hsh_query):
                hits = self.hash_table[hsh_query]
                for hit in hits:
                    id_hit = hit[0]
                    offset_hit = hit[1]
                    diff = offset_hit - offset_query
                    if diff >= 0:
                        id_hist = np.where(id_candidates==id_hit)[0]
                        diff_histogram[id_hist,diff] += 1
                        diff_start_offset[id_hist] = min([diff_start_offset[id_hist],offset_query])
            else:
                continue
            
        candidate_match_score = np.max(diff_histogram,axis=1)
        candidate_max_offset = np.argmax(diff_histogram,axis=1)
        idxsort = np.argsort(candidate_match_score)[::-1]
        title_sort = self.title_list[id_candidates[idxsort]]
        match_score_sort = candidate_match_score[idxsort]
        best_match_offset = (candidate_max_offset[idxsort[0]] - diff_start_offset[idxsort[0]]) * 256/11025.0
        return title_sort[:best],match_score_sort[:best],best_match_offset
        
                    
                    
    def _find_candidate_id(self,landmarks):
        candidate = set()
        for landmark in landmarks:
            hsh = landmark[0]
            if self.hash_table.has_key(hsh):
                id_hits = [item[0] for item in self.hash_table[hsh]]
                candidate = candidate.union(id_hits)
            else:
                continue
            
        return np.array(list(candidate),dtype=np.int32)

analyze.extract_landmarks (y) integrates peak detection and hashing.

analyze.py


def extract_landmarks(y,peak_dist=20):
    peaks_x,peaks_y = find_peaks(y,peak_dist)
    landmarks = peaks_to_landmarks(peaks_x,peaks_y)
    return landmarks

1. Count the number of hits

Corresponds to Database.query_histogram. Each hash registered in the database is associated with a song ID (one or more). Check all the hashes included in the recording, count the number of hits to the song, and the song with the most hits is the correct answer. This is the simplest method, but what about performance? I tried to display the top 5 songs with the number of hits. plot_hist.png

The query took around 0.5 seconds (hash table load time is no-can). The most hash matches with the original song of Back In The U.S.S.R. It's a correct answer, but the difference with other songs is not so noticeable. If the recording is a little worse, it will soon give an incorrect answer. Another problem is that this method doesn't allow you to find out which part of the song the recording matched. It may not be necessary depending on the application, but I would like to know anyway.

2. Count fingerprints that are "in line"

Select the correct song and the other two songs, and plot the query voice and the number ** (time position in the original voice, time position in the recording) ** of the hit fingerprint on a 2D plane. plot_offset.png

Even if the song is incorrect, the number of hash hits may be moderate, but the distribution of points is different in each case. On the other hand, in the correct match (top plot), there is a part where the points are lined up diagonally. This means that the fingerprint is a continuous hit within a certain period of time. For this reason, when you register the hash, you save the time position in the original song. Therefore, the one with many "lined up" fingerprints is judged to be the correct answer. Since they are lined up diagonally, that is, the difference between x and y is a constant (on the straight line x-y = b), a histogram of (x-y) is obtained for each song, and the maximum value is regarded as the score. Corresponds to Database.query_diff_histogram. The result is here. plot_diff.png

It's easy to understand and there is a difference. The query took about 1.8 seconds. There seems to be room for improvement in implementation efficiency, but it is OK as a demonstration of the principle. Since the amount of calculation is N (the number of hashes of query voice), it should not deteriorate much even if the database becomes larger. The best_match_offset of the source code finds the position of the query audio in the original song. This also solved the problem of where it matched.

Further improvement measures

It should be noted that this method is based on the premise that ** "the original song and the query have the same playback speed and no modulation" **. If the playback speed changes or is modulated overall, the relative time distance between landmarks and the frequency distance (because of the linear frequency range) will expand or contract, making it impossible to match with hashes. It's not a big problem in practice, but a solution has been proposed.

Summary

Introduced a simple voice fingerprint algorithm. audfprint is also landmark-based, but peak detection and query also seem to be implemented in a slightly different way. You may want to read the source code. If you can understand it, I would like to write another article.

Recommended Posts

Try implementing a Shazam-like voice fingerprint algorithm
Implementing a simple algorithm in Python 2
[Python] Try optimizing FX systole parameters with a genetic algorithm
Try to model a multimodal distribution using the EM algorithm