Airport List
from bs4 import BeautifulSoup

html_page = "options.html"

def extract_airports(page):
    data = []
    with open(page, "r") as html:
        soup = BeautifulSoup(html, "lxml")
        airport_list = soup.find(id="AirportList")
        for option in airport_list.find_all("option"):
            # Skip the aggregate "All" entries so only individual airport
            # codes are returned (assumption based on the test below)
            if "All" not in option["value"]:
                data.append(option["value"])
    return data

def test():
    data = extract_airports(html_page)
    assert len(data) == 15
    assert "ATL" in data
    assert "ABR" in data

if __name__ == "__main__":
    test()
Carrier List
import requests
from bs4 import BeautifulSoup

html_page = "options.html"

def extract_carriers(page):
    data = []
    with open(page, "r") as html:
        soup = BeautifulSoup(html, "lxml")
        carrier_list = soup.find(id="CarrierList")
        for option in carrier_list.find_all("option"):
            # Skip the aggregate "All..." entries so only individual
            # carrier codes are returned (assumption based on the test below)
            if "All" not in option["value"]:
                data.append(option["value"])
    return data

def make_request(data):
    eventvalidation = data["eventvalidation"]
    viewstate = data["viewstate"]
    airport = data["airport"]
    carrier = data["carrier"]

    r = requests.post("http://www.transtats.bts.gov/Data_Elements.aspx?Data=2",
                      data={'AirportList': airport,
                            'CarrierList': carrier,
                            'Submit': 'Submit',
                            "__EVENTTARGET": "",
                            "__EVENTARGUMENT": "",
                            "__EVENTVALIDATION": eventvalidation,
                            "__VIEWSTATE": viewstate})
    return r.text

def test():
    data = extract_carriers(html_page)
    assert len(data) == 16
    assert "FL" in data
    assert "NK" in data

if __name__ == "__main__":
    test()
Scraping solution
import requests
from bs4 import BeautifulSoup

s = requests.Session()

r = s.get("http://www.transtats.bts.gov/Data_Elements.aspx?Data=2")
soup = BeautifulSoup(r.text, "lxml")
viewstate_element = soup.find(id="__VIEWSTATE")
viewstate = viewstate_element["value"]
eventvalidation_element = soup.find(id="__EVENTVALIDATION")
eventvalidation = eventvalidation_element["value"]

# Re-submit the ASP.NET form, echoing back the hidden state fields we scraped
r = s.post("http://www.transtats.bts.gov/Data_Elements.aspx?Data=2",
           data={'AirportList': "BOS",
                 'CarrierList': "VX",
                 'Submit': "Submit",
                 '__EVENTTARGET': "",
                 '__EVENTARGUMENT': "",
                 '__EVENTVALIDATION': eventvalidation,
                 '__VIEWSTATE': viewstate})

with open("virgin_and_logan_airport.html", "w") as f:
    f.write(r.text)
Using Beautiful Soup
import requests
from bs4 import BeautifulSoup

html_page = "page_source.html"

def extract_data(page):
    data = {"eventvalidation": "",
            "viewstate": ""}
    with open(page, "r") as html:
        soup = BeautifulSoup(html, "lxml")
        ev = soup.find(id="__EVENTVALIDATION")
        data["eventvalidation"] = ev["value"]
        vs = soup.find(id="__VIEWSTATE")
        data["viewstate"] = vs["value"]
    return data

def make_request(data):
    eventvalidation = data["eventvalidation"]
    viewstate = data["viewstate"]

    r = requests.post("http://www.transtats.bts.gov/Data_Elements.aspx?Data=2",
                      data={'AirportList': "BOS",
                            'CarrierList': "VX",
                            'Submit': "Submit",
                            '__EVENTTARGET': "",
                            '__EVENTARGUMENT': "",
                            '__EVENTVALIDATION': eventvalidation,
                            '__VIEWSTATE': viewstate})
    return r.text

def test():
    # Minimal sanity checks; deeper asserts would depend on the saved page
    data = extract_data(html_page)
    assert data["eventvalidation"] != ""
    assert data["viewstate"] != ""
BeautifulSoup
Learn about BeautifulSoup
https://www.crummy.com/software/BeautifulSoup/bs4/doc/
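Before diving into the docs, a minimal sketch of the two calls this section leans on, find(id=...) and find_all() (the HTML fragment here is made up for illustration):
from bs4 import BeautifulSoup

# A made-up HTML fragment, just to show find() by id and find_all()
html = '<select id="CarrierList"><option value="FL">FL</option>' \
       '<option value="NK">NK</option></select>'
soup = BeautifulSoup(html, "lxml")

select = soup.find(id="CarrierList")
values = [option["value"] for option in select.find_all("option")]
print values   # ['FL', 'NK']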
from bs4 import BeautifulSoup

def options(soup, id):
    option_values = []
    carrier_list = soup.find(id=id)
    for option in carrier_list.find_all('option'):
        option_values.append(option['value'])
    return option_values

def print_list(label, codes):
    print "\n%s:" % label
    for c in codes:
        print c

def main():
    soup = BeautifulSoup(open("virgin_and_logan_airport.html"), "lxml")
    codes = options(soup, 'CarrierList')
    print_list("Carriers", codes)
    codes = options(soup, 'AirportList')
    print_list("Airports", codes)

if __name__ == "__main__":
    main()
Extracting XML Data
import xml.etree.ElementTree as ET

article_file = "exampleResearchArticle.xml"

def get_root(fname):
    tree = ET.parse(fname)
    return tree.getroot()

def get_authors(root):
    authors = []
    for author in root.findall('./fm/bibl/aug/au'):
        data = {
            "fnm": None,
            "snm": None,
            "email": None
        }
        data["fnm"] = author.find('./fnm').text
        data["snm"] = author.find('./snm').text
        data["email"] = author.find('./email').text
        authors.append(data)
    return authors

def test():
    solution = [{'fnm': 'Omer', 'snm': 'Mei-Dan', 'email': 'omer@extremegate.com'},
                {'fnm': 'Mike', 'snm': 'Carmont', 'email': 'mcarmont@hotmail.com'},
                {'fnm': 'Lior', 'snm': 'Laver', 'email': 'laver17@gmail.com'},
                {'fnm': 'Meir', 'snm': 'Nyska', 'email': 'nyska@internet-zahav.net'},
                {'fnm': 'Hagay', 'snm': 'Kammar', 'email': 'kammarh@gmail.com'},
                {'fnm': 'Gideon', 'snm': 'Mann', 'email': 'gideon.mann.md@gmail.com'},
                {'fnm': 'Barnaby', 'snm': 'Clarck', 'email': 'barns.nz@gmail.com'},
                {'fnm': 'Eugene', 'snm': 'Kots', 'email': 'eukots@gmail.com'}]

    root = get_root(article_file)
    data = get_authors(root)
    assert data[0] == solution[0]
    assert data[1]["fnm"] == solution[1]["fnm"]

if __name__ == "__main__":
    test()
Parsing XML
import xml.etree.ElementTree as ET

tree = ET.parse('exampleResearchArticle.xml')
root = tree.getroot()

print "\nChildren of root:"
for child in root:
    print child.tag

# The title element contains <p> children; concatenate their text
title = root.find('./fm/bibl/title')
title_text = ""
for p in title:
    title_text += p.text
print "\nTitle:\n", title_text

print "\nAuthor email addresses:"
for a in root.findall('./fm/bibl/aug/au'):
    email = a.find('email')
    if email is not None:
        print email.text
Wrangling JSON
Some important concepts (a distilled sketch follows the list):
– using the codecs module to write unicode files
– using authentication with web APIs
– using an offset when accessing web APIs
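A minimal, self-contained sketch of the codecs and offset ideas (the filename and result count here are made up for illustration; the authentication piece appears in query_site in the full program below):
# -*- coding: utf-8 -*-
import codecs
import json

# codecs.open writes the file as UTF-8, so unicode strings round-trip safely
records = [{u"title": u"Caf\u00e9 society"}]
with codecs.open("demo.json", encoding="utf-8", mode="w") as f:
    f.write(json.dumps(records, indent=2))

# Offset paging: request successive 20-item slices until the total is covered
num_results = 45   # a real API reports this figure in its response
for offset in range(0, num_results, 20):
    print "would request items starting at offset %d" % offset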
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import codecs
import requests

URL_MAIN = "http://api.nytimes.com/svc/"
URL_POPULAR = URL_MAIN + "mostpopular/v2/"
API_KEY = {"popular": "",
           "article": ""}

def get_from_file(kind, period):
    filename = "popular-{0}-{1}.json".format(kind, period)
    with open(filename, "r") as f:
        return json.loads(f.read())

def article_overview(kind, period):
    data = get_from_file(kind, period)
    titles = []
    urls = []

    for article in data:
        section = article["section"]
        title = article["title"]
        titles.append({section: title})
        if "media" in article:
            for m in article["media"]:
                for mm in m["media-metadata"]:
                    if mm["format"] == "Standard Thumbnail":
                        urls.append(mm["url"])
    return (titles, urls)

def query_site(url, target, offset):
    # The NYTimes API requires a key, passed as a query parameter
    if API_KEY["popular"] == "" or API_KEY["article"] == "":
        print "You need to register for a NYTimes Developer account to run this program."
        print "See the instructor notes for information."
        return False

    params = {"api-key": API_KEY[target], "offset": offset}
    r = requests.get(url, params=params)

    if r.status_code == requests.codes.ok:
        return r.json()
    else:
        r.raise_for_status()

def get_popular(url, kind, days, section="all-sections", offset=0):
    if days not in [1, 7, 30]:
        print "Time period can be 1, 7, or 30 days only."
        return False
    if kind not in ["viewed", "shared", "emailed"]:
        print "kind can only be one of viewed/shared/emailed."
        return False

    url += "most{0}/{1}/{2}.json".format(kind, section, days)
    data = query_site(url, "popular", offset)
    return data

def save_file(kind, period):
    # Fetch the full result set 20 items at a time and save it as one file
    data = get_popular(URL_POPULAR, kind, period)
    num_results = data["num_results"]
    full_data = []
    with codecs.open("popular-{0}-{1}.json".format(kind, period), encoding='utf-8', mode='w') as v:
        for offset in range(0, num_results, 20):
            data = get_popular(URL_POPULAR, kind, period, offset=offset)
            full_data += data["results"]
        v.write(json.dumps(full_data, indent=2))

def test():
    titles, urls = article_overview("viewed", 1)
    assert len(titles) == 20
    assert len(urls) == 30
    assert titles[2] == {'Opinion': 'Professors, We Need You!'}
    assert urls[20] == 'http://graphics8.nytimes.com/images/2014/02/17/sports/ICEDANCE/ICEDANCE-thumbStandard.jpg'

if __name__ == "__main__":
    test()
Python XML
https://wiki.python.org/moin/PythonXml
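Beyond ET.parse on files, ElementTree can also parse a string directly and read element attributes; a minimal sketch (the element names echo the research-article file above, but the snippet itself is made up):
import xml.etree.ElementTree as ET

# Parse from a string with fromstring(); attributes come from .attrib / .get()
au = ET.fromstring('<au><insr iid="sp0001.01"/><insr iid="sp0001.02"/></au>')
for insr in au.findall('insr'):
    print insr.get('iid')   # sp0001.01, then sp0001.02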
Excel to CSV
# -*- coding: utf-8 -*-
import xlrd
import csv
from zipfile import ZipFile

datafile = "2013_ERCOT_Hourly_Load_Data.xls"
outfile = "2013_Max_Loads.csv"

def open_zip(datafile):
    with ZipFile('{0}.zip'.format(datafile), 'r') as myzip:
        myzip.extractall()

def parse_file(datafile):
    workbook = xlrd.open_workbook(datafile)
    sheet = workbook.sheet_by_index(0)
    data = {}
    # Columns 1-8 hold the regional stations; column 0 holds the timestamps
    for n in range(1, 9):
        station = sheet.cell_value(0, n)
        cv = sheet.col_values(n, start_rowx=1, end_rowx=None)

        maxval = max(cv)
        maxpos = cv.index(maxval) + 1
        maxtime = sheet.cell_value(maxpos, 0)
        # Convert the Excel serial date into a (y, m, d, h, min, s) tuple
        realtime = xlrd.xldate_as_tuple(maxtime, 0)
        data[station] = {"maxval": maxval,
                         "maxtime": realtime}
    return data

def save_file(data, filename):
    with open(filename, "wb") as f:
        w = csv.writer(f, delimiter='|')
        w.writerow(["Station", "Year", "Month", "Day", "Hour", "Max Load"])
        for s in data:
            year, month, day, hour, _, _ = data[s]["maxtime"]
            w.writerow([s, year, month, day, hour, data[s]["maxval"]])

def test():
    open_zip(datafile)
    data = parse_file(datafile)
    save_file(data, outfile)

    number_of_rows = 0
    stations = []

    ans = {'FAR_WEST': {'Max Load': '2281.2722140000024',
                        'Year': '2013',
                        'Month': '6',
                        'Day': '26',
                        'Hour': '17'}}
    correct_stations = ['COAST', 'EAST', 'FAR_WEST', 'NORTH',
                        'NORTH_C', 'SOUTHERN', 'SOUTH_C', 'WEST']
    fields = ['Year', 'Month', 'Day', 'Hour', 'Max Load']

    with open(outfile) as of:
        csvfile = csv.DictReader(of, delimiter='|')
        for line in csvfile:
            station = line['Station']
            if station == 'FAR_WEST':
                for field in fields:
                    # Max Load is a float, so compare after rounding
                    if field == 'Max Load':
                        max_answer = round(float(ans[station][field]), 1)
                        max_line = round(float(line[field]), 1)
                        assert max_answer == max_line
                    else:
                        assert ans[station][field] == line[field]
            number_of_rows += 1
            stations.append(station)

    assert number_of_rows == 8
    assert set(stations) == set(correct_stations)

if __name__ == "__main__":
    test()
Using the csv module
import csv
import os

DATADIR = ""
DATAFILE = "745090.csv"

def parse_file(datafile):
    name = ""
    data = []
    with open(datafile, 'rb') as f:
        r = csv.reader(f)
        # The station name sits in the second column of the first row,
        # and the second row holds the column headers (assumption based
        # on the asserts below)
        name = next(r)[1]
        next(r)  # skip the header row
        for row in r:
            data.append(row)
    return (name, data)

def test():
    datafile = os.path.join(DATADIR, DATAFILE)
    name, data = parse_file(datafile)
    assert name == "MOUNTAIN VIEW MOFFETT FLD NAS"
    assert data[0][1] == "01:00"
    assert data[2][0] == "01/01/2005"
    assert data[2][5] == "2"

if __name__ == "__main__":
    test()