[icecast] Python script to automate icecast

Matt Boersma matt at sprout.org
Fri May 24 19:08:37 UTC 2002



Here's a python script I wrote that tries to manipulate a running 
icecast server with simple command-line stuff.  It works fairly well for 
most admin commands.  "kick [userid]" works but will hang until [Ctrl+C] 
probably because the server responds in a different way than what we 
expect--I'm sure I could fix this but haven't gotten around to it, so 
please feel free to use this as you will and send me improvements.

I am well aware that this is a potentially large security hole.  The 
script has the admin and oper passwords embedded in it, and anyone who can 
read it or execute it can do damage.  I only allow it to be run on the 
server machine as nobody, connecting to localhost, but since I'm posting 
it as is, I'd just like to say use it at your own risk.

Having said that, here are some of the ways this script has been useful:
- Show me all current listeners: "./icecon.py listeners"
- Show me what commands are available: "./icecon.py" or "./icecon.py help"
- Save the current stream to disk: "./icecon.py StartDump a_title"
- Automatically capture the 8 a.m. news program every weekday, using 
cron:
58  7 * * Mon-Fri su nobody -c '/usr/local/icecast/bin/icecon.py StartDump 
Morning_Magazine'
02  9 * * Mon-Fri su nobody -c '/usr/local/icecast/bin/icecon.py StopDump 
Morning_Magazine'
(I fudge it by 2 minutes on either side, then go back in later and edit 
the foo_raw.mp3 file to start and end at more exact times.)

Again, I hope this is useful to someone and I welcome any suggestions.  
Here it is:

#!/usr/bin/env python

from telnetlib import Telnet
from time import strftime

class icecon:

    def __init__(self, admin_password, oper_password, host='127.0.0.1', 
port=8000):
        self.admin_password = admin_password
        self.oper_password = oper_password
        self.host = host
        self.port = port
        self.tn = None
        self.admin = 0
        self.oper = 0

    def Connect(self):
        self.tn = Telnet(self.host, self.port)
        return (self.tn != None)

    def Status(self, toggle='off'):
        if self.MakeAdmin():
            return self.Command('status ' + toggle)

    def Disconnect(self):
        self.admin = 0
        self.oper = 0
        self.tn.close()

    def MakeAdmin(self):
        if not self.admin:
            self.tn.write("ADMIN " + self.admin_password + "\n\n")
            x = self.tn.expect(["OK\r\n->",], 30)
            if (x[0] != -1):
                self.admin = 1
                # Always turn status off.  Otherwise, the console emits
                # a heartbeat message every so often, which can mess up
                # our interactions.
                self.Status()
        return self.admin

    def MakeOper(self):
        if not self.oper:
            self.tn.write("oper " + self.oper_password + "\n")
            x = self.tn.expect(["operator\r\n>",], 30)
            if  (x[0] != -1):
                self.oper = 1
        return self.oper

    def List(self):
        return self.Command('list')

    def Help(self):
        return self.Command('help')

    def StartDump(self, title, source=0):
        if self.MakeAdmin() and self.MakeOper():
            dir = ' /usr/local/icecast/static/'
            file = dir + title + strftime('_%Y-%m-%d') + '_raw.mp3'
        return self.Command('dump ' + str(source) + file)

    def StopDump(self, source=0):
        if self.MakeAdmin() and self.MakeOper():
            return self.Command('dump ' + str(source) + ' close')

    def Command(self, command):
        if self.MakeAdmin():
            self.tn.write(command + '\n')
            x = self.tn.expect(['\r\n>',], 30)
            if (x[0] != -1):
                response = x[2]
                response = response[:-1]
            else:
                response = '<Unexpected Reply>'
            return response

if __name__ == '__main__':
    from sys import argv
    if len(argv) <= 1:
        argv.append('help')
    ice = icecon('your_admin_password', 'your_oper_password')
    if ice.Connect():
        if (argv[1] == 'StartDump'):
            ice.StartDump(argv[2])
        elif (argv[1] == 'StopDump'):
            ice.StopDump()
        elif ice.MakeAdmin() and ice.MakeOper():
            print ice.Command(' '.join(argv[1:]))
        ice.Disconnect()

<p>--- >8 ----
List archives:  http://www.xiph.org/archives/
icecast project homepage: http://www.icecast.org/
To unsubscribe from this list, send a message to 'icecast-request at xiph.org'
containing only the word 'unsubscribe' in the body.  No subject is needed.
Unsubscribe messages sent to the list will be ignored/filtered.



More information about the Icecast mailing list