Python API Example
import sys, json, StringIO, urllib2, inspect, urllib
# pycurl is optional. ``useCurl`` records whether it could be imported so the
# caller can hand the flag to PanaceaApi(...), which falls back to urllib
# when it is false.
try:
    import pycurl
except ImportError:
    # going to use urllib
    useCurl = False
else:
    useCurl = True
from types import *
class Storage:
    """Collects text chunks, numbering each one as it arrives.

    ``store`` is used in this file as a pycurl HEADERFUNCTION callback, so
    every chunk it receives is appended to ``contents`` prefixed with a
    1-based counter (``"1: ..."``, ``"2: ..."``).
    """

    def __init__(self):
        # Nothing has been stored yet: empty text, counter at zero.
        self.line = 0
        self.contents = ''

    def store(self, buf):
        """Append *buf* to the collected text, prefixed with its number."""
        self.line += 1
        numbered = (self.contents, self.line, buf)
        self.contents = "%s%i: %s" % numbered

    def __str__(self):
        """Return everything stored so far as one string."""
        return self.contents
class PanaceaApi:
    """Client for the JSON HTTP API at api.panaceamobile.com.

    Every API action is sent as a GET request whose query string carries the
    action name, the account credentials and the action's parameters
    (``batch_create`` additionally POSTs the file contents).  Requests are
    performed through pycurl when the instance was created with a true
    ``uc`` flag, otherwise through ``urllib``.

    Actions are either executed immediately (the default) or, after
    ``setPerformActionsImmediately(False)``, collected in ``queuedActions``
    and sent later by ``execute_multiple``.

    NOTE: the credentials travel in the query string of a plain ``http://``
    URL, exactly as the endpoint configured below dictates.
    """

    url = "http://api.panaceamobile.com/json"
    default_url = url
    debugSetting = False
    username = ""
    password = ""
    performActionsImmediately = True
    # Class-level default kept for backward compatibility; __init__ gives
    # every instance its own list so queues are not shared between clients.
    queuedActions = list()
    # Holds the 'message' of the last failed result inspected by OK().
    error = False

    def __init__(self, uc):
        """Create a client; *uc* selects pycurl (true) or urllib (false)."""
        self.useCurl = uc
        self.queuedActions = []

    def __str__(self):
        return "Panacea Python Api v1.0"

    def debug(self, value, nl=True):
        """Print *value* when ``debugSetting`` is True.

        When *nl* is True an extra newline string is printed afterwards.
        """
        if self.debugSetting == True:
            print(value)
            if nl == True:
                print("\n")

    # ------------------------------------------------------------------
    # Transport helpers
    # ------------------------------------------------------------------
    def _quote(self, value):
        """Return *value* percent-encoded for use inside a query string."""
        try:
            from urllib import quote  # Python 2, which this module targets
        except ImportError:
            from urllib.parse import quote  # same function on Python 3
        if not isinstance(value, (str, bytes)):
            # Python 2 unicode text is sent as UTF-8; numbers and booleans
            # are sent in their str() form.
            if hasattr(value, "encode"):
                value = value.encode("utf-8")
            else:
                value = str(value)
        return quote(value)

    def _curl_request(self, url, post_fields=None):
        """Fetch *url* with pycurl and decode the JSON body.

        When *post_fields* is given the request is sent as a POST with that
        string as its body.  Returns ``(headers, output)`` where *headers* is
        the Storage object that collected the response header lines and
        *output* is the decoded JSON.
        """
        retrieved_headers = Storage()
        body = StringIO.StringIO()
        curl = pycurl.Curl()
        try:
            curl.setopt(curl.URL, url)
            curl.setopt(curl.HEADER, 0)
            curl.setopt(curl.WRITEFUNCTION, body.write)
            curl.setopt(curl.HEADERFUNCTION, retrieved_headers.store)
            # The connection option is the hyphenated token "Keep-Alive";
            # a value containing a space is not a valid option.
            curl.setopt(curl.HTTPHEADER, ["Connection: Keep-Alive"])
            if post_fields is not None:
                curl.setopt(curl.POST, 1)
                curl.setopt(curl.POSTFIELDS, post_fields)
            curl.perform()
        finally:
            # Release the handle even when the transfer fails.
            curl.close()
        raw_body = body.getvalue()
        output = json.loads(raw_body)
        # do some debug
        self.debug("--HTTP REQUEST TRACE--")
        self.debug(retrieved_headers, False)
        self.debug(raw_body, False)
        self.debug("--END OF REQUEST--")
        return retrieved_headers, output

    def _urllib_request(self, url):
        """Fetch *url* with urllib; return ``(headers, decoded JSON)``."""
        query = urllib.urlopen(url)
        try:
            headers = query.info()
            response = json.loads(query.read())
        finally:
            query.close()
        # do some debug
        self.debug("--HTTP REQUEST TRACE--")
        self.debug(headers, False)
        self.debug(response, False)
        self.debug("--END OF REQUEST--")
        return headers, response

    def call_api(self):
        """Request ``self.url`` through pycurl.

        Returns ``(headers, output)``: the collected response headers and the
        decoded JSON body.
        """
        return self._curl_request(self.url)

    def call_api_urllib(self):
        """Request ``self.url`` through urllib.

        Returns ``(headers, response)``, or ``False`` when the decoded JSON
        body compares equal to ``False``.
        """
        headers, response = self._urllib_request(self.url)
        if response != False:
            return headers, response
        return False

    # ------------------------------------------------------------------
    # Configuration
    # ------------------------------------------------------------------
    def setPerformActionsImmediately(self, value=True):
        """Choose between immediate execution (True) and queueing (False).

        Raises:
            TypeError: if *value* is not a bool.
        """
        if not isinstance(value, bool):
            raise TypeError(
                "Invalid type for setPerformActionsImmediately method")
        self.performActionsImmediately = value

    def setUsername(self, value):
        self.username = value

    def setPassword(self, value):
        self.password = value

    # ------------------------------------------------------------------
    # Request building / dispatch
    # ------------------------------------------------------------------
    def send_message(self, to, text, message_from=None, report_mask=None, report_url=None, charset=None, data_coding=None, message_class=None, auto_detect_encoding=None):
        """Run the ``message_send`` action; ``message_from`` is sent as ``from``."""
        attributes = {
            'to': to,
            'text': text,
            'message_from': message_from,
            'report_mask': report_mask,
            'report_url': report_url,
            'charset': charset,
            'data_coding': data_coding,
            'message_class': message_class,
            'auto_detect_encoding': auto_detect_encoding,
        }
        return self.buildURL(attributes, 'message_send')

    def buildURL(self, options, action):
        """Build the request URL for *action* and execute or queue it.

        Args:
            options: mapping of parameter name to value.  Entries whose value
                is None are omitted, a ``'self'`` entry is ignored and
                ``'message_from'`` is sent as ``'from'``.  The mapping itself
                is not modified.
            action: name of the API action.

        Returns:
            The result of ``call_api`` / ``call_api_urllib`` when actions are
            performed immediately, otherwise the string ``"Action queued"``.

        Raises:
            ValueError: when queueing without a username and password.
        """
        renames = {'message_from': 'from'}
        params = {}
        for key, value in options.items():
            if key == 'self' or value is None:
                continue
            params[renames.get(key, key)] = value

        parts = [
            "action=" + action,
            "username=" + self._quote(self.username),
            "password=" + self._quote(self.password),
        ]
        # Sorted so the generated URL is deterministic.
        for key, value in sorted(params.items()):
            parts.append(key + "=" + self._quote(value))
        request_url = self.url + "?" + "&".join(parts)

        try:
            if self.performActionsImmediately:
                # call_api / call_api_urllib read the target from self.url.
                self.url = request_url
                if self.useCurl:
                    return self.call_api()
                return self.call_api_urllib()
            if self.username == "" or self.password == "":
                raise ValueError(
                    "You need to specify your username and password to queue actions")
            self.queuedActions.append(request_url)
            return "Action queued"
        finally:
            # Always restore the base URL, even when the request raised, so
            # the next call does not start from a URL with a query string.
            self.url = self.default_url

    def execute_multiple(self):
        """Send every queued action and collect the results.

        Returns:
            ``"No Actions queued"`` when the queue is empty; otherwise a dict
            mapping each action's position in the queue to its result.  With
            pycurl each result is a ``(headers, decoded JSON)`` tuple; without
            it the results come from ``execute_multiple_urllib``.
        """
        if not self.queuedActions:
            return "No Actions queued"
        if not self.useCurl:
            return self.execute_multiple_urllib()
        results = dict()
        for count, item in enumerate(self.queuedActions):
            results[count] = self._curl_request(item)
        return results

    def execute_multiple_urllib(self):
        """Send every queued action through urllib.

        Returns:
            ``"No Actions queued"`` when the queue is empty; otherwise a dict
            mapping each action's position in the queue to a
            ``[headers, decoded JSON]`` list.
        """
        if not self.queuedActions:
            return "No Actions queued"
        results = dict()
        for count, item in enumerate(self.queuedActions):
            headers, response = self._urllib_request(item)
            results[count] = [headers, response]
        return results

    # ------------------------------------------------------------------
    # API actions (each one delegates to buildURL)
    # ------------------------------------------------------------------
    def user_get_balance(self):
        return self.buildURL({}, 'user_get_balance')

    def messages_get(self, last_id):
        return self.buildURL({'last_id': last_id}, 'messages_get')

    def message_status(self, message_id):
        return self.buildURL({'message_id': message_id}, 'message_status')

    def address_book_groups_get_list(self):
        return self.buildURL({}, 'address_book_groups_get_list')

    def address_book_group_add(self, name):
        return self.buildURL({'name': name}, 'address_book_group_add')

    def address_book_group_delete(self, group_id):
        return self.buildURL({'group_id': group_id}, 'address_book_group_delete')

    def address_book_contacts_get_list(self, group_id):
        return self.buildURL({'group_id': group_id}, 'address_book_contacts_get_list')

    def address_book_contact_add(self, group_id, phone_number, first_name=None, last_name=None):
        attributes = {
            'group_id': group_id,
            'phone_number': phone_number,
            'first_name': first_name,
            'last_name': last_name,
        }
        return self.buildURL(attributes, 'address_book_contact_add')

    def address_book_contact_delete(self, contact_id):
        """Run the ``address_book_contact_delete`` action for *contact_id*."""
        return self.buildURL({'contact_id': contact_id}, 'address_book_contact_delete')

    def address_book_contact_id(self, contact_id):
        """Backward-compatible name for ``address_book_contact_delete``.

        Despite its name this method has always issued the
        ``address_book_contact_delete`` action.
        """
        return self.address_book_contact_delete(contact_id)

    def address_book_contact_update(self, contact_id, phone_number=None, first_name=None, last_name=None):
        attributes = {
            'contact_id': contact_id,
            'phone_number': phone_number,
            'first_name': first_name,
            'last_name': last_name,
        }
        return self.buildURL(attributes, 'address_book_contact_update')

    def batch_create(self, name, file_name, throughput=0, filterAgainstBlockList=False, file_type='csv'):
        """Run ``batch_create``, POSTing the contents of *file_name*.

        The file contents are percent-encoded and sent as the ``data`` POST
        field; the remaining arguments go into the query string.  Requires
        pycurl: without it the call is delegated to ``batch_create_urllib``,
        which raises.

        Returns:
            ``(headers, decoded JSON)`` as produced by the pycurl request.
        """
        if not self.useCurl:
            return self.batch_create_urllib(name, file_name, throughput, filterAgainstBlockList, file_type)

        with open(file_name) as handle:
            postfielddata = "data=" + self._quote(handle.read())

        parts = [
            "action=batch_create",
            "username=" + self._quote(self.username),
            "password=" + self._quote(self.password),
            "name=" + self._quote(name),
            "filter=" + str(filterAgainstBlockList),
            "file_type=" + self._quote(file_type),
            "throughput=" + str(throughput),
        ]
        self.url = self.url + "?" + "&".join(parts)
        try:
            return self._curl_request(self.url, postfielddata)
        finally:
            # Restore the base URL so later actions are not appended to
            # this request's query string.
            self.url = self.default_url

    def batch_create_urllib(self, name, file_name, throughput=0, filterAgainstBlockList=False, file_type='csv'):
        """Always raises: batch creation is only available through pycurl.

        Raises:
            NotImplementedError: unconditionally.
        """
        raise NotImplementedError(
            "You need to have pycurl installed in order to create batches.")

    def batches_list(self):
        return self.buildURL({}, 'batches_list')

    def batch_start(self, batch_id):
        return self.buildURL({'batch_id': batch_id}, 'batch_start')

    def batch_stop(self, batch_id):
        return self.buildURL({'batch_id': batch_id}, 'batch_stop')

    def batch_check_status(self, batch_id):
        return self.buildURL({'batch_id': batch_id}, 'batch_check_status')

    def OK(self, result):
        """Tell whether *result* represents success.

        *result* is expected to be the decoded JSON dict of a response (the
        second element of the tuples returned by the request methods) or the
        string returned when an action was queued.

        Returns:
            True when ``result['status'] > 0``; False for a non-positive
            status (``self.error`` is then set to ``result['message']``), for
            a dict without ``'status'`` and for any other type; the string
            ``"Command Queued"`` when *result* is a string.
        """
        if isinstance(result, dict):
            if 'status' not in result:
                return False
            if result['status'] > 0:
                return True
            self.error = result['message']
            return False
        if isinstance(result, str):
            return "Command Queued"
        return False
if __name__ == "__main__":
    # This module only defines the API client and is meant to be imported.
    # SystemExit writes the message to stderr and exits with status 1; the
    # previous bare ``raise`` had no active exception to re-raise.
    raise SystemExit("You cannot access this class directly")
Last updated
Was this helpful?