[xiph-cvs] cvs commit: positron/positron MP3Info.py

Brendan Cully brendan at xiph.org
Wed Jul 2 11:02:12 PDT 2003



See http://wiki.xiph.org/MP3DetectionIsHard :)

For verifying an MP3 is really an MP3, I like to check that the next
MP3 frame is where it's supposed to be, by calculating the length of
the current frame from its header. That's a fairly simple
operation...

On Wednesday, 02 July 2003 at 13:57, Stan Seibert wrote:
> volsung     03/07/02 13:57:30
> 
>   Modified:    positron MP3Info.py
>   Log:
>   More fixes to hopefully improve MP3 detection:
>   
>   * Check a random location in the middle of the file for a frame header
>   
>   * If a bad header is found during a linear search, keep going
>   
>   * ID3v2 frame lengths (not the length of the whole ID3v2 tag) are sometimes
>   sync-safe integers and sometimes not depending on minor version of the v2
>   tags.  Patch from Alec Mitchell <apm13 at columbia.edu> fixes this.  Hopefully
>   closes bug 367 and 377.
> 
> Revision  Changes    Path
> 1.9       +81 -23    positron/positron/MP3Info.py
> 
> Index: MP3Info.py
> ===================================================================
> RCS file: /usr/local/cvsroot/positron/positron/MP3Info.py,v
> retrieving revision 1.8
> retrieving revision 1.9
> diff -u -r1.8 -r1.9
> +++ MP3Info.py	2 Jul 2003 17:57:30 -0000	1.9
> @@ -31,6 +31,7 @@
>  
>  import struct
>  import string
> +import random
>  
>  def _from_synch_safe(synchsafe):
>      if isinstance(synchsafe, type(1)):
> @@ -94,8 +95,13 @@
>          size = ()
>          if version == 2:
>              size = struct.unpack('!3b', file.read(3))
> -        elif version == 3 or version == 4:
> +            self.size = (size[0] * 256 + size[1]) * 256 + size[2]
> +        elif version == 3:
> +            size = struct.unpack('!L', file.read(4))
> +            self.size = size[0]
> +        elif version == 4:
>              size = struct.unpack('!4b', file.read(4))
> +            self.size = _from_synch_safe(size)
>  
>          if version == 3:  # abc00000 def00000
>              (flags,) = struct.unpack('!1b', file.read(1))
> @@ -118,7 +124,6 @@
>              self.f_unsynchronization       = flags >> 1 & 1 #n
>              self.f_data_length_indicator   = flags >> 0 & 1 #p
>  
> -        self.size = _from_synch_safe(size)
>          self.data = _strip_zero(file.read(self.size))
>  
>  _genres = [
> @@ -269,7 +274,7 @@
>  
>  _emphases = [ "none", "50/15 ms", "reserved", "CCIT J.17" ]
>  
> -_MP3_HEADER_SEEK_LIMIT = 4096
> +_MP3_HEADER_SEEK_LIMIT = 500000
>  
>  class MPEG:
>      def __init__(self, file, seeklimit=_MP3_HEADER_SEEK_LIMIT, seekstart=0):
> @@ -292,49 +297,103 @@
>          self.emphasis = ""
>          self.length = 0
>  
> +
> +        # First do a check to see if this is really an MPEG file.
> +        #
> +        # The longest possible frame for any MPEG audio file
> +        # is 4609 bytes for a MPEG 2, Layer 1 256 kbps, 8000Hz with
> +        # a padding slot.  Add an extra 4 bytes to ensure we get the
> +        # next header and round up to a multiple of 4 to get the magic
> +        # number 4616.  If this is an MPEG file, then from a random
> +        # point in the middle (far away from the tag stupidity), we
> +        # should always find an MPEG frame header in any 4616 byte
> +        # substring.
> +        #
> +        # We pick a location in the middle 50% of the file to
> +        # do a header test.  If it passes, then we proceed with parsing
> +        # (using much less restrictive searching)
> +        test_pos = int(random.uniform(0.25,0.75) * self.filesize)
> +
> +        offset, header = self._find_header(file, seeklimit=4616,
> +                                           seekstart=test_pos)
> +        if offset == -1 or header is None:
> +            raise Error("Failed MPEG frame test.")
> +            
> +        # Now we can look for the first header
>          offset, header = self._find_header(file, seeklimit, seekstart)
>          if offset == -1 or header is None:
>              raise Error("Could not find MPEG header")
>  
> -        self._parse_header(header)
> -        ### offset + framelength will find another header. verify??
> +        # Note that _find_header already parsed the header
> +        
>          if not self.valid:
>              raise Error("MPEG header not valid")
>  
>          self._parse_xing(file, seeklimit, seekstart)
> -        
> -
> +    
>      def _find_header(self, file, seeklimit=_MP3_HEADER_SEEK_LIMIT,
> -                     seekstart=0):
> -        file.seek(seekstart, 0)
> -        header = file.read(4) # see if we get lucky with the first four bytes
> +                     seekstart=0, check_next_header=True):
> +        amt = 5120  # Multiple of 512 is hopefully more efficient to read from
> +                    # disk, and size ensure the random test will only
> +                    # read once
>          curr_pos = 0
> -        amt = 1024
> +        read_more = False
> +
> +        file.seek(seekstart, 0)
> +        header = file.read(amt)
>          
> -        while len(header) <= seeklimit:
> -            
> +        while curr_pos <= seeklimit:            
>              # look for the sync byte
>              offset = string.find(header, chr(255), curr_pos)
> +            #print curr_pos + seekstart
>              if offset == -1:
>                  curr_pos = len(header)  # Header after everything so far
> +                read_more = True
>              elif offset + 4 > len(header):
>                  curr_pos = offset  # Need to read more, jump back here later
> +                read_more = True
>              elif ord(header[offset+1]) & 0xE0 == 0xE0:
> -                return seekstart+offset, header[offset:offset+4]
> +
> +                # Finish now if we should not check the next header
> +                if not check_next_header:
> +                    return seekstart+offset, header[offset:offset+4]
> +
> +                # We have a possible winner, test parse this header and
> +                # check if the next header is in the right place.
> +                # WARNING: _parse_header has side effects!  This should
> +                # be fixed, though in this case it does not matter.
> +                self._parse_header(header[offset:offset+4])
> +                    
> +                if self.valid:
> +                    next_off, next_header = \
> +                              self._find_header(file, seeklimit=0,
> +                                                seekstart=seekstart+offset
> +                                                        +self.framelength,
> +                                                check_next_header=False)
> +                    if next_off != -1:
> +                        return seekstart+offset, header[offset:offset+4]
> +                    else:
> +                        curr_pos = offset+2
> +                else:
> +                    curr_pos = offset+2
> +                    
>              else:
>                  curr_pos = offset+2 # Gotta be after the 2 bytes we looked at
>  
> -            chunk = file.read(amt)  # Read bigger chunks
> -            header += chunk
> -
> -            if len(chunk) == 0:
> -                # no more to read, give up
> -                return -1, None
> +            if read_more and curr_pos <= seeklimit:
> +                chunk = file.read(amt)
> +                if len(chunk) == 0:
> +                    # no more to read, give up
> +                    return -1, None
> +                else:
> +                    header += chunk
>          
>          # couldn't find the header
>          return -1, None
>  
>      def _parse_header(self, header):
> +        self.valid = 0 # Assume the worst until proven otherwise
> +        
>          # AAAAAAAA AAABBCCD EEEEFFGH IIJJKLMM
>          (bytes,) = struct.unpack('>i', header)
>          mpeg_version =    (bytes >> 19) & 3  # BB   00 = MPEG2.5, 01 = res, 10 = MPEG2, 11 = MPEG1  
> @@ -450,9 +509,8 @@
>              self.id3 = id3v2
>  
>          if id3v2.valid:
> -            # We'll be generous for files with ID3v2 tags.
> -            self.mpeg = MPEG(file, seekstart=id3v2.header_size,
> -                             seeklimit=10*_MP3_HEADER_SEEK_LIMIT)
> +            # ID3v2 size (header_size) doesn't include 10 bytes of header
> +            self.mpeg = MPEG(file, seekstart=id3v2.header_size+10)
>          else:
>              # Header better be near the beginning if there is no ID3v2
>              self.mpeg = MPEG(file)
> 
> 
> 
> List archives:  http://www.xiph.org/archives/
> Ogg project homepage: http://www.xiph.org/ogg/
> To unsubscribe from this list, send a message to 'cvs-request at xiph.org'
> containing only the word 'unsubscribe' in the body.  No subject is needed.
> Unsubscribe messages sent to the list will be ignored/filtered.
--- >8 ----
List archives:  http://www.xiph.org/archives/
Ogg project homepage: http://www.xiph.org/ogg/
To unsubscribe from this list, send a message to 'cvs-request at xiph.org'
containing only the word 'unsubscribe' in the body.  No subject is needed.
Unsubscribe messages sent to the list will be ignored/filtered.



More information about the commits mailing list