""" The main `Echo Nest`_ `Remix API`_ module for manipulating audio files and their associated `Echo Nest`_ `Analyze API`_ analyses. AudioData, and getpieces by Robert Ochshorn on 2008-06-06. Some refactoring and everything else by Joshua Lifton 2008-09-07. Refactoring by Ben Lacker 2009-02-11. Other contributions by Adam Lindsay. :group Base Classes: AudioAnalysis, AudioRenderable, AudioData, AudioData32 :group Audio-plus-Analysis Classes: AudioFile, LocalAudioFile, LocalAnalysis :group Building Blocks: AudioQuantum, AudioSegment, AudioQuantumList, ModifiedRenderable :group Effects: AudioEffect, LevelDB, AmplitudeFactor, TimeTruncateFactor, TimeTruncateLength, Simultaneous :group Exception Classes: FileTypeError, EchoNestRemixError :group Audio helper functions: getpieces, mix, assemble, megamix :group ffmpeg helper functions: ffmpeg, settings_from_ffmpeg, ffmpeg_error_check :group Utility functions: chain_from_mixed, _dataParser, _attributeParser, _segmentsParser .. _Analyze API: http://developer.echonest.com/pages/overview?version=2 .. _Remix API: http://code.google.com/p/echo-nest-remix/ .. _Echo Nest: http://the.echonest.com/ """ __version__ = "$Revision: 0 $" # $Source$ import hashlib import numpy import os import sys import StringIO import struct import subprocess import tempfile import wave from pyechonest import track import pyechonest.util import echonest.selection as selection import pyechonest.config as config #from echonest.support import stupidxml import xml.etree.ElementTree as etree import xml.dom.minidom as minidom class AudioAnalysis(track.Track) : """ This class wraps `echonest.web` to allow transparent caching of the audio analysis of an audio file. For example, the following script will display the bars of a track twice:: from echonest import * a = audio.AudioAnalysis('YOUR_TRACK_ID_HERE') a.bars a.bars The first time `a.bars` is called, a network request is made of the `Echo Nest`_ `Analyze API`_. The second time time `a.bars` is called, the cached value is returned immediately. An `AudioAnalysis` object can be created using an existing ID, as in the example above, or by specifying the audio file to upload in order to create the ID, as in:: a = audio.AudioAnalysis(filename='FULL_PATH_TO_AUDIO_FILE') .. _Analyze API: http://developer.echonest.com/pages/overview?version=2 .. _Echo Nest: http://the.echonest.com/ """ def __init__( self, audio): """ Constructor. If the argument is a valid local path or a URL, the track ID is generated by uploading the file to the `Echo Nest`_ `Analyze API`_\. Otherwise, the argument is assumed to be the track ID. :param audio: A string representing either a path to a local file, a valid URL, or the ID of a file that has already been uploaded for analysis. .. _Analyze API: http://developer.echonest.com/pages/overview?version=2 .. _Echo Nest: http://the.echonest.com/ """ if type(audio) is not str: # Argument is invalid. raise TypeError("Argument 'audio' must be a string representing either a filename, track ID, or MD5.") super(AudioAnalysis, self).__init__(audio) self.source = None self._bars = None self._beats = None self._tatums = None self._sections = None self._segments = None @property def bars(self): if self._bars is None: self._bars = _dataParser('bar', super(AudioAnalysis, self).bars) self._bars.attach(self) return self._bars @property def beats(self): if self._beats is None: self._beats = _dataParser('beat', super(AudioAnalysis, self).beats) self._beats.attach(self) return self._beats @property def tatums(self): if self._tatums is None: self._tatums = _dataParser('tatum', super(AudioAnalysis, self).tatums) self._tatums.attach(self) return self._tatums @property def sections(self): if self._sections is None: self._sections = _attributeParser('section', super(AudioAnalysis, self).sections) self._sections.attach(self) return self._sections @property def segments(self): if self._segments is None: self._segments = _segmentsParser(super(AudioAnalysis, self).segments) self._segments.attach(self) return self._segments def __getstate__(self): """ Eliminates the circular reference for pickling. """ dictclone = self.__dict__.copy() del dictclone['source'] return dictclone def __setstate__(self, state): """ Recreates circular references after unpickling. """ self.__dict__.update(state) for cached_var in AudioAnalysis.CACHED_VARIABLES: if type(object.__getattribute__(self, cached_var)) == AudioQuantumList: object.__getattribute__(self, cached_var).attach(self) class AudioRenderable(object): """ An object that gives an `AudioData` in response to a call to its `render`\() method. Intended to be an abstract class that helps enforce the `AudioRenderable` protocol. Picked up a couple of convenience methods common to many descendants. Every `AudioRenderable` must provide three things: render() A method returning the `AudioData` for the object. The rhythmic duration (point at which any following audio is appended) is signified by the `endindex` accessor, measured in samples. source An accessor pointing to the `AudioData` that contains the original sample data of (a superset of) this audio object. duration An accessor returning the rhythmic duration (in seconds) of the audio object. """ def resolve_source(self, alt): """ Given an alternative, fallback `alt` source, return either `self`'s source or the alternative. Throw an informative error if no source is found. Utility code that ended up being replicated in several places, so it ended up here. Not necessary for use in the RenderableAudioObject protocol. """ if hasattr(self, 'source'): source = self.source else: if isinstance(alt, AudioData): source = alt else: print >> sys.stderr, self.__repr__() raise EchoNestRemixError("%s has no implicit or explicit source during rendering." % (self.__class__.__name__, )) return source @staticmethod def init_audio_data(source, num_samples): """ Convenience function for rendering: return a pre-allocated, zeroed `AudioData`. """ if source.numChannels > 1: newchans = source.numChannels newshape = (num_samples, newchans) else: newchans = 1 newshape = (num_samples,) return AudioData32(shape=newshape, sampleRate=source.sampleRate, numChannels=newchans, defer=False) def sources(self): return set([self.source]) def encode(self, filename): """ Shortcut function that takes care of the need to obtain an `AudioData` object first, through `render`. """ self.render().encode(filename) class AudioData(AudioRenderable): """ Handles audio data transparently. A smart audio container with accessors that include: sampleRate samples per second numChannels number of channels data a `numpy.array`_ .. _numpy.array: http://docs.scipy.org/doc/numpy/reference/generated/numpy.array.html """ def __init__(self, filename=None, ndarray = None, shape=None, sampleRate=None, numChannels=None, defer=False, verbose=True): """ Given an input `ndarray`, import the sample values and shape (if none is specified) of the input `numpy.array`. Given a `filename` (and an input ndarray), use ffmpeg to convert the file to wave, then load the file into the data, auto-detecting the sample rate, and number of channels. :param filename: a path to an audio file for loading its sample data into the AudioData.data :param ndarray: a `numpy.array`_ instance with sample data :param shape: a tuple of array dimensions :param sampleRate: sample rate, in Hz :param numChannels: number of channels .. _numpy.array: http://docs.scipy.org/doc/numpy/reference/generated/numpy.array.html """ self.verbose = verbose if (filename is not None) and (ndarray is None) : if sampleRate is None or numChannels is None: # force sampleRate and numChannels to 44100 hz, 2 sampleRate, numChannels = 44100, 2 parsestring = ffmpeg(filename, overwrite=False, verbose=self.verbose) ffmpeg_error_check(parsestring[1]) sampleRate, numChannels = settings_from_ffmpeg(parsestring[1]) self.defer = defer self.filename = filename self.sampleRate = sampleRate self.numChannels = numChannels self.convertedfile = None self.endindex = 0 if shape is None and isinstance(ndarray, numpy.ndarray) and not self.defer: self.data = numpy.zeros(ndarray.shape, dtype=numpy.int16) elif shape is not None and not self.defer: self.data = numpy.zeros(shape, dtype=numpy.int16) elif not self.defer and self.filename: self.data = None self.load() else: self.data = None if ndarray is not None and self.data is not None: self.endindex = len(ndarray) self.data[0:self.endindex] = ndarray def load(self): if isinstance(self.data, numpy.ndarray): return if self.filename.lower().endswith(".wav") and (self.sampleRate, self.numChannels) == (44100, 2): file_to_read = self.filename elif self.convertedfile: file_to_read = self.convertedfile else: foo, self.convertedfile = tempfile.mkstemp(".wav") result = ffmpeg(self.filename, self.convertedfile, overwrite=True, numChannels=self.numChannels, sampleRate=self.sampleRate, verbose=self.verbose) ffmpeg_error_check(result[1]) file_to_read = self.convertedfile w = wave.open(file_to_read, 'r') numFrames = w.getnframes() raw = w.readframes(numFrames) sampleSize = numFrames * self.numChannels data = numpy.frombuffer(raw, dtype=" 1: ndarray.resize((numFrames, self.numChannels)) self.data = numpy.zeros(ndarray.shape, dtype=numpy.int16) self.endindex = 0 if ndarray is not None: self.endindex = len(ndarray) self.data = ndarray def __getitem__(self, index): """ Fetches a frame or slice. Returns an individual frame (if the index is a time offset float or an integer sample number) or a slice if the index is an `AudioQuantum` (or quacks like one). """ if not isinstance(self.data, numpy.ndarray) and self.defer: self.load() if isinstance(index, float): index = int(index*self.sampleRate) elif hasattr(index, "start") and hasattr(index, "duration"): index = slice(float(index.start), index.start+index.duration) if isinstance(index, slice): if ( hasattr(index.start, "start") and hasattr(index.stop, "duration") and hasattr(index.stop, "start") ) : index = slice(index.start.start, index.stop.start+index.stop.duration) if isinstance(index, slice): return self.getslice(index) else: return self.getsample(index) def getslice(self, index): "Help `__getitem__` return a new AudioData for a given slice" if not isinstance(self.data, numpy.ndarray) and self.defer: self.load() if isinstance(index.start, float): index = slice(int(index.start*self.sampleRate), int(index.stop*self.sampleRate), index.step) return AudioData(None, self.data[index], sampleRate=self.sampleRate, numChannels=self.numChannels, defer=False) def getsample(self, index): """ Help `__getitem__` return a frame (all channels for a given sample index) """ if not isinstance(self.data, numpy.ndarray) and self.defer: self.load() if isinstance(index, int): return self.data[index] else: #let the numpy array interface be clever return AudioData(None, self.data[index], defer=False) def pad_with_zeros(self, num_samples): if num_samples > 0: if self.numChannels == 1: extra_shape = (num_samples,) else: extra_shape = (num_samples, self.numChannels) self.data = numpy.append(self.data, numpy.zeros(extra_shape, dtype=numpy.int16), axis=0) def append(self, as2): "Appends the input to the end of this `AudioData`." extra = len(as2.data) - (len(self.data) - self.endindex) self.pad_with_zeros(extra) self.data[self.endindex:self.endindex+len(as2)] += as2.data self.endindex += as2.endindex def sum(self, as2): extra = len(as2.data) - len(self.data) self.pad_with_zeros(extra) compare_limit = min(len(as2.data), len(self.data)) - 1 self.data[:compare_limit] += as2.data[:compare_limit] def add_at(self, time, as2): offset = int(time * self.sampleRate) extra = offset + len(as2.data) - len(self.data) self.pad_with_zeros(extra) if as2.numChannels < self.numChannels: as2.data = numpy.repeat(as2.data, self.numChannels).reshape(len(as2), self.numChannels) self.data[offset:offset+len(as2.data)] += as2.data def __len__(self): if self.data is not None: return len(self.data) else: return 0 def __add__(self, other): """Supports stuff like this: sound3 = sound1 + sound2""" return assemble([self, other], numChannels=self.numChannels, sampleRate=self.sampleRate) def encode(self, filename=None, mp3=None): """ Outputs an MP3 or WAVE file to `filename`. Format is determined by `mp3` parameter. """ if not mp3 and filename.lower().endswith('.wav'): mp3 = False else: mp3 = True if mp3: foo, tempfilename = tempfile.mkstemp(".wav") os.close(foo) else: tempfilename = filename fid = open(tempfilename, 'wb') # Based on Scipy svn # http://projects.scipy.org/pipermail/scipy-svn/2007-August/001189.html fid.write('RIFF') fid.write(struct.pack('> sys.stderr, "Deleting: %s" % tempfilename os.remove(tempfilename) return filename def unload(self): self.data = None if self.convertedfile: if self.verbose: print >> sys.stderr, "Deleting: %s" % self.convertedfile os.remove(self.convertedfile) self.convertedfile = None def render(self, start=0.0, to_audio=None, with_source=None): if not to_audio: return self if with_source != self: return to_audio.add_at(start, self) return @property def duration(self): return float(self.endindex) / self.sampleRate @property def source(self): return self class AudioData32(AudioData): """A 32-bit variant of AudioData, intended for data collection on audio rendering with headroom.""" def __init__(self, filename=None, ndarray = None, shape=None, sampleRate=None, numChannels=None, defer=False, verbose=True): """ Special form of AudioData to allow for headroom when collecting samples. """ self.verbose = verbose if (filename is not None) and (ndarray is None) : if sampleRate is None or numChannels is None: # force sampleRate and numChannels to 44100 hz, 2 sampleRate, numChannels = 44100, 2 parsestring = ffmpeg(filename, overwrite=False, verbose=self.verbose) ffmpeg_error_check(parsestring[1]) sampleRate, numChannels = settings_from_ffmpeg(parsestring[1]) self.defer = defer self.filename = filename self.sampleRate = sampleRate self.numChannels = numChannels self.convertedfile = None self.normalized = None if shape is None and isinstance(ndarray, numpy.ndarray) and not self.defer: self.data = numpy.zeros(ndarray.shape, dtype=numpy.int32) elif shape is not None and not self.defer: self.data = numpy.zeros(shape, dtype=numpy.int32) elif not self.defer and self.filename: self.load() else: self.data = None self.endindex = 0 if ndarray is not None and self.data is not None: self.endindex = len(ndarray) self.data[0:self.endindex] = ndarray def load(self): if isinstance(self.data, numpy.ndarray): return if self.filename.lower().endswith(".wav") and (self.sampleRate, self.numChannels) == (44100, 2): file_to_read = self.filename elif self.convertedfile: file_to_read = self.convertedfile else: foo, self.convertedfile = tempfile.mkstemp(".wav") result = ffmpeg(self.filename, self.convertedfile, overwrite=True, numChannels=self.numChannels, sampleRate=self.sampleRate, verbose=self.verbose) ffmpeg_error_check(result[1]) file_to_read = self.convertedfile w = wave.open(file_to_read, 'r') numFrames = w.getnframes() raw = w.readframes(numFrames) sampleSize = numFrames * self.numChannels data = numpy.frombuffer(raw, dtype=" 1: ndarray.resize((numFrames, self.numChannels)) self.data = numpy.zeros(ndarray.shape, dtype=numpy.int32) self.endindex = 0 if ndarray is not None: self.endindex = len(ndarray) self.data[0:self.endindex] = ndarray def encode(self, filename=None, mp3=None): """ Outputs an MP3 or WAVE file to `filename`. Format is determined by `mp3` parameter. """ self.normalize() if not mp3 and filename.lower().endswith('.wav'): mp3 = False else: mp3 = True if mp3: foo, tempfilename = tempfile.mkstemp(".wav") else: tempfilename = filename fid = open(tempfilename, 'wb') # Based on Scipy svn # http://projects.scipy.org/pipermail/scipy-svn/2007-August/001189.html fid.write('RIFF') fid.write(struct.pack('> sys.stderr, "Deleting: %s" % tempfilename os.remove(tempfilename) return filename def normalize(self): """Return to 16-bit for encoding.""" if self.numChannels == 1: self.normalized = numpy.zeros((self.data.shape[0],), dtype=numpy.int16) else: self.normalized = numpy.zeros((self.data.shape[0], self.data.shape[1]), dtype=numpy.int16) factor = 32767.0 / numpy.max(numpy.absolute(self.data.flatten())) # If the max was 32768, don't bother scaling: if factor < 1.000031: self.normalized[:len(self.data)] += self.data * factor else: self.normalized[:len(self.data)] += self.data def pad_with_zeros(self, num_samples): if num_samples > 0: if self.numChannels == 1: extra_shape = (num_samples,) else: extra_shape = (num_samples, self.numChannels) self.data = numpy.append(self.data, numpy.zeros(extra_shape, dtype=numpy.int32), axis=0) def ffmpeg(infile, outfile=None, overwrite=True, bitRate=None, numChannels=None, sampleRate=None, verbose=True): """ Executes ffmpeg through the shell to convert or read media files. """ command = "en-ffmpeg" if overwrite: command += " -y" command += " -i \"" + infile + "\"" if bitRate is not None: command += " -ab " + str(bitRate) + "k" if numChannels is not None: command += " -ac " + str(numChannels) if sampleRate is not None: command += " -ar " + str(sampleRate) if outfile is not None: command += " \"%s\"" % outfile if verbose: print >> sys.stderr, command p = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) return p.communicate() def settings_from_ffmpeg(parsestring): """ Parses the output of ffmpeg to determine sample rate and frequency of an audio file. """ parse = parsestring.split('\n') freq, chans = 44100, 2 for line in parse: if "Stream #0" in line and "Audio" in line: segs = line.split(", ") for s in segs: if "Hz" in s: #print "Found: "+str(s.split(" ")[0])+"Hz" freq = int(s.split(" ")[0]) elif "stereo" in s: #print "stereo" chans = 2 elif "mono" in s: #print "mono" chans = 1 return freq, chans ffmpeg_install_instructions = """ en-ffmpeg not found! Please make sure ffmpeg is installed and create a link as follows: sudo ln -s `which ffmpeg` /usr/local/bin/en-ffmpeg """ def ffmpeg_error_check(parsestring): "Looks for known errors in the ffmpeg output" parse = parsestring.split('\n') for num, line in enumerate(parse): if "Unknown format" in line or "error occur" in line: raise RuntimeError("ffmpeg conversion error:\n\t" + "\n\t".join(parse[num:])) if "command not found" in line: raise RuntimeError(ffmpeg_install_instructions) def getpieces(audioData, segs): """ Collects audio samples for output. Returns a new `AudioData` where the new sample data is assembled from the input audioData according to the time offsets in each of the elements of the input segs (commonly an `AudioQuantumList`). :param audioData: an `AudioData` object :param segs: an iterable containing objects that may be accessed as slices or indices for an `AudioData` """ #calculate length of new segment audioData.data = None audioData.load() dur = 0 for s in segs: dur += int(s.duration*audioData.sampleRate) # if I wanted to add some padding to the length, I'd do it here #determine shape of new array if len(audioData.data.shape) > 1: newshape = (dur, audioData.data.shape[1]) newchans = audioData.data.shape[1] else: newshape = (dur,) newchans = 1 #make accumulator segment newAD = AudioData(shape=newshape,sampleRate=audioData.sampleRate, numChannels=newchans, defer=False) #concatenate segs to the new segment for s in segs: newAD.append(audioData[s]) # audioData.unload() return newAD def assemble(audioDataList, numChannels=1, sampleRate=44100): """ Collects audio samples for output. Returns a new `AudioData` object assembled by concatenating all the elements of audioDataList. :param audioDatas: a list of `AudioData` objects """ if numChannels == 1: new_shape = (sum([len(x.data) for x in audioDataList]),) else: new_shape = (sum([len(x.data) for x in audioDataList]),numChannels) new_data = AudioData(shape=new_shape, numChannels=numChannels, sampleRate=sampleRate, defer=False) for ad in audioDataList: if not isinstance(ad, AudioData): raise TypeError('Encountered something other than an AudioData') new_data.append(ad) return new_data def mix(dataA,dataB,mix=0.5): """ Mixes two `AudioData` objects. Assumes they have the same sample rate and number of channels. Mix takes a float 0-1 and determines the relative mix of two audios. i.e., mix=0.9 yields greater presence of dataA in the final mix. """ if dataA.endindex > dataB.endindex: newdata = AudioData(ndarray=dataA.data, sampleRate=dataA.sampleRate, numChannels=dataA.numChannels, defer=False) newdata.data *= float(mix) newdata.data[:dataB.endindex] += dataB.data[:] * (1 - float(mix)) else: newdata = AudioData(ndarray=dataB.data, sampleRate=dataB.sampleRate, numChannels=dataB.numChannels, defer=False) newdata.data *= 1 - float(mix) newdata.data[:dataA.endindex] += dataA.data[:] * float(mix) return newdata def megamix(dataList): """ Mix together any number of `AudioData` objects. Keep the shape of the first one in the list. Assume they all have the same sample rate and number of channels. """ if not isinstance(dataList, list): raise TypeError('input must be a list of AudioData objects') newdata = AudioData(shape=dataList[0].data.shape, sampleRate=dataList[0].sampleRate, numChannels=dataList[0].numChannels, defer=False) for adata in dataList: if not isinstance(adata, AudioData): raise TypeError('input must be a list of AudioData objects') if len(adata) > len(newdata): newseg = AudioData(ndarray=adata[:newdata.endindex].data, numChannels=newdata.numChannels, sampleRate=newdata.sampleRate, defer=False) newseg.endindex = newdata.endindex else: newseg = AudioData(ndarray=adata.data, numChannels=newdata.numChannels, sampleRate=newdata.sampleRate, defer=False) newseg.endindex = adata.endindex newdata.data[:newseg.endindex] += newseg.data / float(len(dataList)) newdata.endindex = len(newdata) return newdata class LocalAudioFile(AudioData): """ The basic do-everything class for remixing. Acts as an `AudioData` object, but with an added `analysis` selector which is an `AudioAnalysis` object. It conditianally uploads the file it was initialized with. If the file is already known to the Analyze API, then it does not bother uploading the file. """ def __init__(self, filename, verbose=True, defer=False): """ :param filename: path to a local MP3 file """ trackID = hashlib.md5(file(filename, 'rb').read()).hexdigest() if verbose: print >> sys.stderr, "Computed MD5 of file is " + trackID try: if verbose: print >> sys.stderr, "Probing for existing analysis" tempanalysis = AudioAnalysis(trackID) metadata = tempanalysis.metadata if metadata.get('status')=='UNAVAILABLE': if verbose: print >> sys.stderr, "Track found, analysis not found. Analyzing..." tempanalysis.analyze(wait=True) else: if verbose: print >> sys.stderr, "Analysis found. No upload needed." self.analysis = AudioAnalysis(trackID) except pyechonest.util.EchoNestAPIThingIDError: if verbose: print >> sys.stderr, "Analysis not found. Uploading..." self.analysis = AudioAnalysis(filename) AudioData.__init__(self, filename=filename, verbose=verbose, defer=defer) self.analysis.source = self def toxml(self, context=None): track = etree.Element("trackinfo") track.attrib['id'] = self.analysis.identifier track.attrib['filename'] = self.filename metadata = etree.Element("metadata", attrib=self.analysis.metadata) track.append(metadata) if context: return track else: return minidom.parseString(track).toprettyxml() @property def duration(self): """ Since we consider `AudioFile` to be an evolved version of `AudioData`, we return the measured duration from the analysis. """ return self.analysis.duration def __setstate__(self, state): """ Recreates circular reference after unpickling. """ self.__dict__.update(state) self.analysis.source = weakref.proxy(self) class LocalAnalysis(object): """ Like `LocalAudioFile`, it conditionally uploads the file with which it was initialized. Unlike `LocalAudioFile`, it is not a subclass of `AudioData`, so contains no sample data. """ def __init__(self, filename, verbose=True): """ :param filename: path to a local MP3 file """ trackID = hashlib.md5(file(filename, 'rb').read()).hexdigest() if verbose: print >> sys.stderr, "Computed MD5 of file is " + trackID try: if verbose: print >> sys.stderr, "Probing for existing analysis" tempanalysis = AudioAnalysis(trackID) tempanalysis.metadata metadata = tempanalysis.metadata if metadata.get('status')=='UNAVAILABLE': if verbose: print >> sys.stderr, "Track found, analysis not found. Analyzing..." tempanalysis.analyze(wait=True) self.analysis = AudioAnalysis(trackID) if verbose: print >> sys.stderr, "Analysis found. No upload needed." except pyechonest.util.EchoNestAPIThingIDError: if verbose: print >> sys.stderr, "Analysis not found. Uploading..." self.analysis = AudioAnalysis(filename) # no AudioData.__init__() class AudioQuantum(AudioRenderable) : """ A unit of musical time, identified at minimum with a start time and a duration, both in seconds. It most often corresponds with a `section`, `bar`, `beat`, `tatum`, or (by inheritance) `segment` obtained from an Analyze API call. Additional properties include: end computed time offset for convenience: `start` + `duration` container a circular reference to the containing `AudioQuantumList`, created upon creation of the `AudioQuantumList` that covers the whole track """ def __init__(self, start=0, duration=0, kind=None, confidence=None, source=None) : """ Initializes an `AudioQuantum`. :param start: offset from the start of the track, in seconds :param duration: length of the `AudioQuantum` :param kind: string containing what kind of rhythm unit it came from :param confidence: float between zero and one """ self.start = start self.duration = duration self.kind = kind self.confidence = confidence self._source = source def get_end(self): return self.start + self.duration end = property(get_end, doc=""" A computed property: the sum of `start` and `duration`. """) def get_source(self): "Returns itself or its parent." if self._source: return self._source else: source = None try: source = self.container.source except AttributeError: source = None return source def set_source(self, value): if isinstance(value, AudioData): self._source = value else: raise TypeError("Source must be an instance of echonest.audio.AudioData") source = property(get_source, set_source, doc=""" The `AudioData` source for the AudioQuantum. """) def parent(self): """ Returns the containing `AudioQuantum` in the rhythm hierarchy: a `tatum` returns a `beat`, a `beat` returns a `bar`, and a `bar` returns a `section`. """ pars = {'tatum': 'beats', 'beat': 'bars', 'bar': 'sections'} try: uppers = getattr(self.container.container, pars[self.kind]) return uppers.that(selection.overlap(self))[0] except LookupError: # Might not be in pars, might not have anything in parent. return None def children(self): """ Returns an `AudioQuantumList` of the AudioQuanta that it contains, one step down the hierarchy. A `beat` returns `tatums`, a `bar` returns `beats`, and a `section` returns `bars`. """ chils = {'beat': 'tatums', 'bar': 'beats', 'section': 'bars'} try: downers = getattr(self.container.container, chils[self.kind]) return downers.that(selection.are_contained_by(self)) except LookupError: return None def group(self): """ Returns the `children`\() of the `AudioQuantum`\'s `parent`\(). In other words: 'siblings'. If no parent is found, then return the `AudioQuantumList` for the whole track. """ if self.parent(): return self.parent().children() else: return self.container def prev(self, step=1): """ Step backwards in the containing `AudioQuantumList`. Returns `self` if a boundary is reached. """ group = self.container try: loc = group.index(self) new = max(loc - step, 0) return group[new] except: return self def next(self, step=1): """ Step forward in the containing `AudioQuantumList`. Returns `self` if a boundary is reached. """ group = self.container try: loc = group.index(self) new = min(loc + step, len(group)) return group[new] except: return self def __str__(self): """ Lists the `AudioQuantum`.kind with start and end times, in seconds, e.g.:: "segment (20.31 - 20.42)" """ return "%s (%.2f - %.2f)" % (self.kind, self.start, self.end) def __repr__(self): """ A string representing a constructor, including kind, start time, duration, and (if it exists) confidence, e.g.:: "AudioQuantum(kind='tatum', start=42.198267, duration=0.1523394)" """ if self.confidence is not None: return "AudioQuantum(kind='%s', start=%f, duration=%f, confidence=%f)" % (self.kind, self.start, self.duration, self.confidence) else: return "AudioQuantum(kind='%s', start=%f, duration=%f)" % (self.kind, self.start, self.duration) def local_context(self): """ Returns a tuple of (*index*, *length*) within rhythm siblings, where *index* is the (zero-indexed) position within its `group`\(), and *length* is the number of siblings within its `group`\(). """ group = self.group() count = len(group) try: loc = group.index(self) except: # seem to be some uncontained beats loc = 0 return (loc, count,) def absolute_context(self): """ Returns a tuple of (*index*, *length*) within the containing `AudioQuantumList`, where *index* is the (zero-indexed) position within its container, and *length* is the number of siblings within the container. """ group = self.container count = len(group) loc = group.index(self) return (loc, count,) def context_string(self): """ Returns a one-indexed, human-readable version of context. For example:: "bar 4 of 142, beat 3 of 4, tatum 2 of 3" """ if self.parent() and self.kind != "bar": return "%s, %s %i of %i" % (self.parent().context_string(), self.kind, self.local_context()[0] + 1, self.local_context()[1]) else: return "%s %i of %i" % (self.kind, self.absolute_context()[0] + 1, self.absolute_context()[1]) def __getstate__(self): """ Eliminates the circular reference for pickling. """ dictclone = self.__dict__.copy() if 'container' in dictclone: del dictclone['container'] return dictclone def toxml(self, context=None): attributedict = {'duration': str(self.duration), 'start': str(self.start)} try: if not(hasattr(context, 'source') and self.source == context.source): attributedict['source'] = self.source.analysis.identifier except: pass xml = etree.Element(self.kind, attrib=attributedict) if context: return xml else: return minidom.parseString(xml).toprettyxml() def render(self, start=0.0, to_audio=None, with_source=None): if not to_audio: source = self.resolve_source(with_source) return source[self] if with_source != self.source: return to_audio.add_at(start, with_source[self]) return class AudioSegment(AudioQuantum): """ Subclass of `AudioQuantum` for the data-rich segments returned by the Analyze API. """ def __init__(self, start=0., duration=0., pitches=[], timbre=[], loudness_begin=0., loudness_max=0., time_loudness_max=0., loudness_end=None, kind='segment', source=None): """ Initializes an `AudioSegment`. :param start: offset from start of the track, in seconds :param duration: duration of the `AudioSegment`, in seconds :param pitches: a twelve-element list with relative loudnesses of each pitch class, from C (pitches[0]) to B (pitches[11]) :param timbre: a twelve-element list with the loudness of each of a principal component of time and/or frequency profile :param kind: string identifying the kind of AudioQuantum: "segment" :param loudness_begin: loudness in dB at the start of the segment :param loudness_max: loudness in dB at the loudest moment of the segment :param time_loudness_max: time (in sec from start of segment) of loudest moment :param loudness_end: loudness at end of segment (if it is given) """ self.start = start self.duration = duration self.pitches = pitches self.timbre = timbre self.loudness_begin = loudness_begin self.loudness_max = loudness_max self.time_loudness_max = time_loudness_max if loudness_end: self.loudness_end = loudness_end self.kind = kind self.confidence = None self._source = source class ModifiedRenderable(AudioRenderable): """Class that contains any AudioRenderable, but overrides the render() method with nested effects, called sequentially on the result of the preceeding effect.""" def __init__(self, original, effects=[]): if isinstance(original, ModifiedRenderable): self._original = original._original self._effects = original._effects + effects else: self._original = original self._effects = effects @property def duration(self): dur = self._original.duration for effect in self._effects: if hasattr(effect, 'duration'): dur = effect.duration(dur) return dur @property def source(self): return self._original.source @property def sources(self): return self._original.sources def render(self, start=0.0, to_audio=None, with_source=None): if not to_audio: source = self.resolve_source(with_source) base = self._original.render(with_source=with_source) copy = AudioData32(ndarray=base.data, sampleRate=base.sampleRate, numChannels=base.numChannels, defer=False) for effect in self._effects: copy = effect.modify(copy) return copy if with_source != self.source: return base = self._original.render(with_source=with_source) copy = AudioData32(ndarray=base.data, shape=base.data.shape, sampleRate=base.sampleRate, numChannels=base.numChannels, defer=False) for effect in self._effects: copy = effect.modify(copy) to_audio.add_at(start, copy) return def toxml(self, context=None): outerattributedict = {'duration': str(self.duration)} node = etree.Element("modified_audioquantum", attrib=outerattributedict) innerattributedict = {'duration': str(self._original.duration), 'start': str(self._original.start)} try: if not(hasattr(context, 'source') and self.source == context.source): innerattributedict['source'] = self.source.analysis.identifier except: pass orignode = etree.Element(self._original.kind, attrib=innerattributedict) node.append(orignode) fx = etree.Element('effects') for effect in self._effects: fxdict = {'id': '%s.%s' % (effect.__module__, effect.__class__.__name__)} fxdict.update(effect.__dict__) fx.append(etree.Element('effect', attrib=fxdict)) node.append(fx) if context: return node else: return minidom.parseString(node).toprettyxml() class AudioEffect(object): def __call__(self, aq): return ModifiedRenderable(aq, [self]) class LevelDB(AudioEffect): def __init__(self, change): self.change = change def modify(self, adata): adata.data *= pow(10.,self.change/20.) return adata class AmplitudeFactor(AudioEffect): def __init__(self, change): self.change = change def modify(self, adata): adata.data *= self.change return adata class TimeTruncateFactor(AudioEffect): def __init__(self, factor): self.factor = factor def duration(self, old_duration): return old_duration * self.factor def modify(self, adata): endindex = int(self.factor * len(adata)) if self.factor > 1: adata.pad_with_zeros(endindex - len(adata)) adata.endindex = endindex return adata[:endindex] class TimeTruncateLength(AudioEffect): def __init__(self, new_duration): self.new_duration = new_duration def duration(self, old_duration): return self.new_duration def modify(self, adata): endindex = int(self.new_duration * adata.sampleRate) if self.new_duration > adata.duration: adata.pad_with_zeros(endindex - len(adata)) adata.endindex = endindex return adata[:endindex] class AudioQuantumList(list, AudioRenderable): """ A container that enables content-based selection and filtering. A `List` that contains `AudioQuantum` objects, with additional methods for manipulating them. When an `AudioQuantumList` is created for a track via a call to the Analyze API, `attach`\() is called so that its container is set to the containing `AudioAnalysis`, and the container of each of the `AudioQuantum` list members is set to itself. Additional accessors now include AudioQuantum elements such as `start`, `duration`, and `confidence`, which each return a List of the corresponding properties in the contained AudioQuanta. A special name is `kinds`, which returns a List of the `kind` of each `AudioQuantum`. If `AudioQuantumList.kind` is "`segment`", then `pitches`, `timbre`, `loudness_begin`, `loudness_max`, `time_loudness_max`, and `loudness_end` are available. """ def __init__(self, initial = None, kind = None, container = None, source = None): """ Initializes an `AudioQuantumList`. All parameters are optional. :param initial: a `List` type with the initial contents :param kind: a label for the kind of `AudioQuantum` contained within :param container: a reference to the containing `AudioAnalysis` :param source: a reference to the `AudioData` with the corresponding samples and time base for the contained AudioQuanta """ list.__init__(self) self.kind = None self._source = None if isinstance(initial, AudioQuantumList): self.kind = initial.kind self.container = initial.container self._source = initial.source if kind: self.kind = kind if container: self.container = container if source: self._source = source if initial: self.extend(initial) def get_many(attribute): def fun(self): """ Returns a list of %s for each `AudioQuantum`. """ % attribute return [getattr(x, attribute) for x in list.__iter__(self)] return fun def get_many_if_segment(attribute): def fun(self): """ Returns a list of %s for each `Segment`. """ % attribute if self.kind == 'segment': return [getattr(x, attribute) for x in list.__iter__(self)] else: raise AttributeError("<%s> only accessible for segments" % (attribute,)) return fun def get_duration(self): return sum(self.durations) #return sum([x.duration for x in self]) def get_source(self): "Returns its own or its parent's source." if self._source: return self._source else: try: source = self.container.source except AttributeError: source = self[0].source return source def set_source(self, value): "Checks input to see if it is an `AudioData`." if isinstance(value, AudioData): self._source = value else: raise TypeError("Source must be an instance of echonest.audio.AudioData") durations = property(get_many('duration')) kinds = property(get_many('kind')) start = property(get_many('start')) confidence = property(get_many('confidence')) pitches = property(get_many_if_segment('pitches')) timbre = property(get_many_if_segment('timbre')) loudness_begin = property(get_many_if_segment('loudness_begin')) loudness_max = property(get_many_if_segment('loudness_max')) time_loudness_max = property(get_many_if_segment('time_loudness_max')) loudness_end = property(get_many_if_segment('loudness_end')) source = property(get_source, set_source, doc=""" The `AudioData` source for the `AudioQuantumList`. """) duration = property(get_duration, doc=""" Total duration of the `AudioQuantumList`. """) def sources(self): ss = set() for aq in list.__iter__(self): ss.update(aq.sources()) return ss def that(self, filt): """ Method for applying a function to each of the contained `AudioQuantum` objects. Returns a new `AudioQuantumList` of the same `kind` containing the `AudioQuantum` objects for which the input function is true. See `echonest.selection` for example selection filters. :param filt: a function that takes one `AudioQuantum` and returns a `True` value `None` :change: experimenting with a filter-only form """ out = AudioQuantumList(kind=self.kind) out.extend(filter(filt, self)) return out def ordered_by(self, function, descending=False): """ Returns a new `AudioQuantumList` of the same `kind` with the original elements, but ordered from low to high according to the input function acting as a key. See `echonest.sorting` for example ordering functions. :param function: a function that takes one `AudioQuantum` and returns a comparison key :param descending: when `True`, reverses the sort order, from high to low """ out = AudioQuantumList(kind=self.kind) out.extend(sorted(self, key=function, reverse=descending)) return out def beget(self, source, which=None): """ There are two basic forms: a map-and-flatten and an converse-that. The basic form, with one `function` argument, returns a new `AudioQuantumList` so that the source function returns `None`, one, or many AudioQuanta for each `AudioQuantum` contained within `self`, and flattens them, in order. :: beats.beget(the_next_ones) A second form has the first argument `source` as an `AudioQuantumList`, and a second argument, `which`, is used as a filter for the first argument, for *each* of `self`. The results are collapsed and accordianned into a flat list. For example, calling:: beats.beget(segments, which=overlap) Gets evaluated as:: for beat in beats: return segments.that(overlap(beat)) And all of the `AudioQuantumList`\s that return are flattened into a single `AudioQuantumList`. :param source: A function of one argument that is applied to each `AudioQuantum` of `self`, or an `AudioQuantumList`, in which case the second argument is required. :param which: A function of one argument that acts as a `that`\() filter on the first argument if it is an `AudioQuantumList`, or as a filter on the output, in the case of `source` being a function. """ out = AudioQuantumList() if isinstance(source, AudioQuantumList): if not which: raise TypeError("'beget' requires a second argument, 'which'") out.extend(chain_from_mixed([source.that(which(x)) for x in self])) else: out.extend(chain_from_mixed(map(source, self))) if which: out = out.that(which) return out def attach(self, container): """ Create circular references to the containing `AudioAnalysis` and for the contained `AudioQuantum` objects. """ self.container = container for i in self: i.container = self def __getstate__(self): """ Eliminates the circular reference for pickling. """ dictclone = self.__dict__.copy() if 'container' in dictclone: del dictclone['container'] return dictclone def toxml(self, context=None): xml = etree.Element("sequence") xml.attrib['duration'] = str(self.duration) if not context: xml.attrib['source'] = self.source.analysis.identifier for s in self.sources(): xml.append(s.toxml()) elif self._source: try: if self.source != context.source: xml.attrib['source'] = self.source.analysis.identifier except: pass for x in list.__iter__(self): xml.append(x.toxml(context=self)) if context: return xml else: return minidom.parseString(xml).toprettyxml() def render(self, start=0.0, to_audio=None, with_source=None): if not to_audio: dur = 0 tempsource = self.source or list.__getitem__(self, 0).source for aq in list.__iter__(self): dur += int(aq.duration * tempsource.sampleRate) to_audio = self.init_audio_data(tempsource, dur) if not hasattr(with_source, 'data'): for tsource in self.sources(): this_start = start for aq in list.__iter__(self): aq.render(start=this_start, to_audio=to_audio, with_source=tsource) this_start += aq.duration if tsource.defer: tsource.unload() return to_audio else: if with_source not in self.sources(): return for aq in list.__iter__(self): aq.render(start=start, to_audio=to_audio, with_source=with_source) start += aq.duration class Simultaneous(AudioQuantumList): """ Stacks all contained AudioQuanta atop one another, adding their respective samples. The rhythmic length of the segment is the duration of the first `AudioQuantum`, but there can be significant overlap caused by the longest segment. Sample usage:: Simultaneous(a.analysis.bars).encode("my.mp3") """ def __init__(self, *args, **kwargs): AudioQuantumList.__init__(self, *args, **kwargs) def get_duration(self): try: return self[0].duration except: return 0. duration = property(get_duration, doc=""" Rhythmic duration of the `Simultaneous` AudioQuanta: the same as the duration of the first in the list. """) def toxml(self, context=None): xml = etree.Element("parallel") xml.attrib['duration'] = str(self.duration) if not context: xml.attrib['source'] = self.source.analysis.identifier elif self.source != context.source: try: xml.attrib['source'] = self.source.analysis.identifier except: pass for x in list.__iter__(self): xml.append(x.toxml(context=self)) if context: return xml else: return minidom.parseString(xml).toprettyxml() def render(self, start=0.0, to_audio=None, with_source=None): if not to_audio: tempsource = self.source or list.__getitem__(self, 0).source dur = int(max(self.durations) * tempsource.sampleRate) to_audio = self.init_audio_data(tempsource, dur) if not hasattr(with_source, 'data'): for source in self.sources(): for aq in list.__iter__(self): aq.render(start=start, to_audio=to_audio, with_source=source) if source.defer: source.unload() return to_audio else: if with_source not in self.sources(): return else: for aq in list.__iter__(self): aq.render(start=start, to_audio=to_audio, with_source=with_source) def _dataParser(tag, nodes): out = AudioQuantumList(kind=tag) for n in nodes: out.append(AudioQuantum(start=n['start'], kind=tag, confidence=n['confidence'])) if len(out) > 1: for i in range(len(out) - 1) : out[i].duration = out[i+1].start - out[i].start out[-1].duration = out[-2].duration return out def _attributeParser(tag, nodes): out = AudioQuantumList(kind=tag) for n in nodes : out.append(AudioQuantum(n['start'], n['duration'], tag)) return out def _segmentsParser(nodes): out = AudioQuantumList(kind='segment') for n in nodes: out.append(AudioSegment(start=n['start'], duration=n['duration'], pitches=n['pitches'], timbre=n['timbre'], loudness_begin=n['loudness_begin'], loudness_max=n['loudness_max'], time_loudness_max=n['time_loudness_max'], loudness_end=n['loudness_end'])) return out def chain_from_mixed(iterables): """ Helper function to flatten a list of elements and lists into a list of elements. """ for y in iterables: try: iter(y) for element in y: yield element except: yield y class FileTypeError(Exception): def __init__(self, filename, message): self.filename = filename self.message = message def __str__(self): return self.message+': '+self.filename class EchoNestRemixError(Exception): """ Error raised by the Remix API. """ pass