[xiph-cvs] cvs commit: positron/positron MP3Info.py
Brendan Cully
brendan at xiph.org
Wed Jul 2 11:02:12 PDT 2003
See http://wiki.xiph.org/MP3DetectionIsHard :)
For verifying an MP3 is really an MP3, I like to check that the next
MP3 frame is where it's supposed to be, by calculating the length of
the current frame from its header. That's a fairly simple
operation...
On Wednesday, 02 July 2003 at 13:57, Stan Seibert wrote:
> volsung 03/07/02 13:57:30
>
> Modified: positron MP3Info.py
> Log:
> More fixes to hopefully improve MP3 detection:
>
> * Check a random location in the middle of the file for a frame header
>
> * If a bad header is found during a linear search, keep going
>
> * ID3v2 frame lengths (not the length of the whole ID3v2 tag) are sometimes
> sync-safe integers and sometimes not depending on minor version of the v2
> tags. Patch from Alec Mitchell <apm13 at columbia.edu> fixes this. Hopefully
> closes bug 367 and 377.
>
> Revision Changes Path
> 1.9 +81 -23 positron/positron/MP3Info.py
>
> Index: MP3Info.py
> ===================================================================
> RCS file: /usr/local/cvsroot/positron/positron/MP3Info.py,v
> retrieving revision 1.8
> retrieving revision 1.9
> diff -u -r1.8 -r1.9
> +++ MP3Info.py 2 Jul 2003 17:57:30 -0000 1.9
> @@ -31,6 +31,7 @@
>
> import struct
> import string
> +import random
>
> def _from_synch_safe(synchsafe):
> if isinstance(synchsafe, type(1)):
> @@ -94,8 +95,13 @@
> size = ()
> if version == 2:
> size = struct.unpack('!3b', file.read(3))
> - elif version == 3 or version == 4:
> + self.size = (size[0] * 256 + size[1]) * 256 + size[2]
> + elif version == 3:
> + size = struct.unpack('!L', file.read(4))
> + self.size = size[0]
> + elif version == 4:
> size = struct.unpack('!4b', file.read(4))
> + self.size = _from_synch_safe(size)
>
> if version == 3: # abc00000 def00000
> (flags,) = struct.unpack('!1b', file.read(1))
> @@ -118,7 +124,6 @@
> self.f_unsynchronization = flags >> 1 & 1 #n
> self.f_data_length_indicator = flags >> 0 & 1 #p
>
> - self.size = _from_synch_safe(size)
> self.data = _strip_zero(file.read(self.size))
>
> _genres = [
> @@ -269,7 +274,7 @@
>
> _emphases = [ "none", "50/15 ms", "reserved", "CCIT J.17" ]
>
> -_MP3_HEADER_SEEK_LIMIT = 4096
> +_MP3_HEADER_SEEK_LIMIT = 500000
>
> class MPEG:
> def __init__(self, file, seeklimit=_MP3_HEADER_SEEK_LIMIT, seekstart=0):
> @@ -292,49 +297,103 @@
> self.emphasis = ""
> self.length = 0
>
> +
> + # First do a check to see if this is really an MPEG file.
> + #
> + # The longest possible frame for any MPEG audio file
> + # is 4609 bytes for a MPEG 2, Layer 1 256 kbps, 8000Hz with
> + # a padding slot. Add an extra 4 bytes to ensure we get the
> + # next header and round up to a multiple of 4 to get the magic
> + # number 4616. If this is an MPEG file, then from a random
> + # point in the middle (far away from the tag stupidity), we
> + # should always find an MPEG frame header in any 4616 byte
> + # substring.
> + #
> + # We pick a location in the middle 50% of the file to
> + # do a header test. If it passes, then we proceed with parsing
> + # (using much less restrictive searching)
> + test_pos = int(random.uniform(0.25,0.75) * self.filesize)
> +
> + offset, header = self._find_header(file, seeklimit=4616,
> + seekstart=test_pos)
> + if offset == -1 or header is None:
> + raise Error("Failed MPEG frame test.")
> +
> + # Now we can look for the first header
> offset, header = self._find_header(file, seeklimit, seekstart)
> if offset == -1 or header is None:
> raise Error("Could not find MPEG header")
>
> - self._parse_header(header)
> - ### offset + framelength will find another header. verify??
> + # Note that _find_header already parsed the header
> +
> if not self.valid:
> raise Error("MPEG header not valid")
>
> self._parse_xing(file, seeklimit, seekstart)
> -
> -
> +
> def _find_header(self, file, seeklimit=_MP3_HEADER_SEEK_LIMIT,
> - seekstart=0):
> - file.seek(seekstart, 0)
> - header = file.read(4) # see if we get lucky with the first four bytes
> + seekstart=0, check_next_header=True):
> + amt = 5120 # Multiple of 512 is hopefully more efficient to read from
> + # disk, and size ensure the random test will only
> + # read once
> curr_pos = 0
> - amt = 1024
> + read_more = False
> +
> + file.seek(seekstart, 0)
> + header = file.read(amt)
>
> - while len(header) <= seeklimit:
> -
> + while curr_pos <= seeklimit:
> # look for the sync byte
> offset = string.find(header, chr(255), curr_pos)
> + #print curr_pos + seekstart
> if offset == -1:
> curr_pos = len(header) # Header after everything so far
> + read_more = True
> elif offset + 4 > len(header):
> curr_pos = offset # Need to read more, jump back here later
> + read_more = True
> elif ord(header[offset+1]) & 0xE0 == 0xE0:
> - return seekstart+offset, header[offset:offset+4]
> +
> + # Finish now if we should not check the next header
> + if not check_next_header:
> + return seekstart+offset, header[offset:offset+4]
> +
> + # We have a possible winner, test parse this header and
> + # check if the next header is in the right place.
> + # WARNING: _parse_header has side effects! This should
> + # be fixed, though in this case it does not matter.
> + self._parse_header(header[offset:offset+4])
> +
> + if self.valid:
> + next_off, next_header = \
> + self._find_header(file, seeklimit=0,
> + seekstart=seekstart+offset
> + +self.framelength,
> + check_next_header=False)
> + if next_off != -1:
> + return seekstart+offset, header[offset:offset+4]
> + else:
> + curr_pos = offset+2
> + else:
> + curr_pos = offset+2
> +
> else:
> curr_pos = offset+2 # Gotta be after the 2 bytes we looked at
>
> - chunk = file.read(amt) # Read bigger chunks
> - header += chunk
> -
> - if len(chunk) == 0:
> - # no more to read, give up
> - return -1, None
> + if read_more and curr_pos <= seeklimit:
> + chunk = file.read(amt)
> + if len(chunk) == 0:
> + # no more to read, give up
> + return -1, None
> + else:
> + header += chunk
>
> # couldn't find the header
> return -1, None
>
> def _parse_header(self, header):
> + self.valid = 0 # Assume the worst until proven otherwise
> +
> # AAAAAAAA AAABBCCD EEEEFFGH IIJJKLMM
> (bytes,) = struct.unpack('>i', header)
> mpeg_version = (bytes >> 19) & 3 # BB 00 = MPEG2.5, 01 = res, 10 = MPEG2, 11 = MPEG1
> @@ -450,9 +509,8 @@
> self.id3 = id3v2
>
> if id3v2.valid:
> - # We'll be generous for files with ID3v2 tags.
> - self.mpeg = MPEG(file, seekstart=id3v2.header_size,
> - seeklimit=10*_MP3_HEADER_SEEK_LIMIT)
> + # ID3v2 size (header_size) doesn't include 10 bytes of header
> + self.mpeg = MPEG(file, seekstart=id3v2.header_size+10)
> else:
> # Header better be near the beginning if there is no ID3v2
> self.mpeg = MPEG(file)
>
>
>
> List archives: http://www.xiph.org/archives/
> Ogg project homepage: http://www.xiph.org/ogg/
> To unsubscribe from this list, send a message to 'cvs-request at xiph.org'
> containing only the word 'unsubscribe' in the body. No subject is needed.
> Unsubscribe messages sent to the list will be ignored/filtered.
--- >8 ----
List archives: http://www.xiph.org/archives/
Ogg project homepage: http://www.xiph.org/ogg/
To unsubscribe from this list, send a message to 'cvs-request at xiph.org'
containing only the word 'unsubscribe' in the body. No subject is needed.
Unsubscribe messages sent to the list will be ignored/filtered.
More information about the commits
mailing list