processing all

import os
from zipfile import ZipFile

from bs4 import BeautifulSoup

# Name of the data directory listed by process_all(); open_zip() also uses it
# as the base name of the "<datadir>.zip" archive it extracts.
datadir = "data"

def open_zip(datadir):
	"""Extract ``<datadir>.zip`` into the current working directory.

	Args:
		datadir: Base name of the archive, without the ``.zip`` extension.
	"""
	# Local import keeps the helper self-contained. The class is spelled
	# ``ZipFile``; lowercase ``zipfile`` is only the module name.
	from zipfile import ZipFile

	with ZipFile('{0}.zip'.format(datadir), 'r') as myzip:
		myzip.extractall()

def process_all(datadir):
	"""Return the names of the entries found directly inside ``datadir``."""
	return os.listdir(datadir)

def process_file(f):
	"""Parse one HTML file from the data directory and return its records.

	The first six characters of the file name are read as
	``<courier>-<airport>`` and split on the dash.

	Args:
		f: File name, relative to the module-level ``datadir``.

	Returns:
		A list of records. NOTE(review): extracting records from the parsed
		page is not implemented yet, so the list is currently always empty.
	"""
	data = []
	courier, airport = f[:6].split("-")
	info = {"courier": courier, "airport": airport}
	with open(os.path.join(datadir, f), "r") as html:
		# Name the parser explicitly so the result does not depend on which
		# parser libraries happen to be installed.
		soup = BeautifulSoup(html, "lxml")
	# TODO: fill `data` from `soup`, tagging each record with `info`.
	return data

def test():
	"""Extract the archive, process every file and sanity-check the records."""
	print("Running a simple test...")
	open_zip(datadir)
	files = process_all(datadir)
	data = []
	for f in files:
		data += process_file(f)

	assert len(data) == 399
	for entry in data[:3]:
		assert isinstance(entry["year"], int)
		assert isinstance(entry["month"], int)
		assert isinstance(entry["flights"]["domestic"], int)
		assert len(entry["airport"]) == 3
		assert len(entry["courier"]) == 2
	assert data[0]["courier"] == 'FL'
	assert data[0]["month"] == 10
	assert data[-1]["airport"] == "ATL"
	# The records use the key "flights" (see the loop above), not "fights".
	assert data[-1]["flights"] == {'international': 108289, 'domestic': 701425}

	print("... success!")


if __name__ == "__main__":
	test()

List and dictionary

List:[…] でリスト(list)を表します。

Dictionary:{…} は、辞書(dict)と呼ばれるキーと値のペアのリストを保持します。元のリストが要素をふたつずつ持ったタプルからできている場合は、そのまま dict 関数で辞書に変換できます。

Airport List

from bs4 import BeautifulSoup
# Local HTML file that test() passes to extract_airports().
html_page = "options.html"

def extract_airports(page):
	"""Return the airport codes offered by the saved HTML page.

	Args:
		page: Path of the HTML file to parse.

	Returns:
		A list with the ``value`` attribute of every ``<option>`` inside the
		element whose id is ``AirportList``, skipping values that start with
		"All". The list is empty when that element is missing.
	"""
	with open(page, "r") as html:
		soup = BeautifulSoup(html, "lxml")

	airport_list = soup.find(id="AirportList")
	if airport_list is None:
		return []

	# NOTE(review): values starting with "All" are presumed to be aggregate
	# choices rather than single airport codes - confirm against the page.
	return [
		option["value"]
		for option in airport_list.find_all("option")
		if not option["value"].startswith("All")
	]

def test():
	"""Check the airport list extracted from the saved page."""
	data = extract_airports(html_page)
	assert len(data) == 15
	assert "ATL" in data
	assert "ABR" in data


if __name__ == "__main__":
	test()

Carrier List

from bs4 import BeautifulSoup
# Local HTML file that test() passes to extract_carriers().
html_page = "options.html"

def extract_carriers(page):
	"""Return the carrier codes offered by the saved HTML page.

	Args:
		page: Path of the HTML file to parse.

	Returns:
		A list with the ``value`` attribute of every ``<option>`` inside the
		element whose id is ``CarrierList``, skipping values that start with
		"All". The list is empty when that element is missing.
	"""
	with open(page, "r") as html:
		soup = BeautifulSoup(html, "lxml")

	carrier_list = soup.find(id="CarrierList")
	if carrier_list is None:
		return []

	# NOTE(review): values starting with "All" are presumed to be aggregate
	# choices rather than single carrier codes - confirm against the page.
	return [
		option["value"]
		for option in carrier_list.find_all("option")
		if not option["value"].startswith("All")
	]

def make_request(data):
	"""POST the airport/carrier form and return the response body as text.

	Args:
		data: Mapping that provides the "eventvalidation", "viewstate",
			"airport" and "carrier" values sent with the form.

	Returns:
		The ``text`` of the response.
	"""
	# Look the values up in a fixed order so a missing key fails predictably.
	wanted = ("eventvalidation", "viewstate", "airport", "carrier")
	eventvalidation, viewstate, airport, carrier = [data[key] for key in wanted]

	form = {
		'AirportList': airport,
		'CarrierList': carrier,
		'Submit': 'Submit',
		"__EVENTTARGET": "",
		"__EVENTARGUMENT": "",
		"__EVENTVALIDATION": eventvalidation,
		"__VIEWSTATE": viewstate,
	}
	response = requests.post(
		"http://www.transtats.bts.gov/Data_Elements.aspx?Data=2",
		data=form)
	return response.text

def test():
	"""Check the carrier list extracted from the saved page."""
	data = extract_carriers(html_page)
	assert len(data) == 16
	assert "FL" in data
	assert "NK" in data


if __name__ == "__main__":
	test()

Using Beautiful Soup

import requests
from bs4 import BeautifulSoup
import json

# Saved HTML page that extract_data() is meant to be run against.
html_page = "page_source.html"

def extract_data(page):
	"""Read the ``__EVENTVALIDATION`` and ``__VIEWSTATE`` values from a page.

	Args:
		page: Path of the saved HTML file to parse.

	Returns:
		A dict with the keys "eventvalidation" and "viewstate", each holding
		the ``value`` attribute of the element with the matching id.
	"""
	data = {"eventvalidation": "",
			"viewstate": ""}
	with open(page, "r") as html:
		soup = BeautifulSoup(html, "lxml")

	ev = soup.find(id="__EVENTVALIDATION")
	data["eventvalidation"] = ev["value"]

	vs = soup.find(id="__VIEWSTATE")
	data["viewstate"] = vs["value"]

	return data

def make_request(data):
	"""POST the form for airport "BOS" and carrier "VX"; return the body text.

	Args:
		data: Mapping that provides the "eventvalidation" and "viewstate"
			values sent back with the form.

	Returns:
		The ``text`` of the response.
	"""
	eventvalidation = data["eventvalidation"]
	viewstate = data["viewstate"]

	# The request must be indented into the function body; at column 0 it
	# would run at import time and leave ``return`` outside the function.
	r = requests.post("http://www.transtats.bts.gov/Data_Elements.aspx?Data=2",
				data={'AirportList' : "BOS",
						'CarrierList' : "VX",
						'Submit' : "Submit",
						'__EVENTTARGET' : "",
						'__EVENTARGUMENT' : "",
						'__EVENTVALIDATION' : eventvalidation,
						'__VIEWSTATE' : viewstate})

	return r.text

def test():
	"""Check that both form values are read from the saved page."""
	data = extract_data(html_page)
	assert data["eventvalidation"] != ""
	assert data["viewstate"] != ""

BeautifulSoup

Learn about BeautifulSoup
https://www.crummy.com/software/BeautifulSoup/bs4/doc/

from bs4 import BeautifulSoup

def options(soup, id):
	"""Collect the ``value`` of every ``<option>`` under the element ``id``.

	Args:
		soup: Parsed document offering ``find(id=...)``.
		id: Id of the element whose options are wanted.

	Returns:
		A list of the option values, in document order.
	"""
	selector = soup.find(id=id)
	return [entry['value'] for entry in selector.find_all('option')]

def print_list(label, codes):
	"""Print ``label`` as a heading, then each code on its own line.

	Args:
		label: Heading text; printed after a blank line and followed by ":".
		codes: Iterable of values to print one per line.
	"""
	# Single-argument print(...) behaves the same on Python 2 and Python 3.
	print("\n%s:" % label)
	for c in codes:
		print(c)

def main():
	"""Print the carrier and airport option values of the saved page."""
	# Use a context manager so the file handle is closed after parsing, and
	# name the parser explicitly instead of letting BeautifulSoup guess.
	with open("virgin_and_logan_airport.html") as html:
		soup = BeautifulSoup(html, "lxml")

	codes = options(soup, 'CarrierList')
	print_list("Carriers", codes)

	codes = options(soup, 'AirportList')
	print_list("Airports", codes)

Wrangling JSON

some important concepts
– using codecs module to write unicode files
– using authentication with web APIs
– using offset when accessing web APIs

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import json
import codecs
import requests

# Base URL that the endpoint prefixes below are built from.
URL_MAIN = "http://api.nytimes.com/svc/"
# Prefix that get_popular() appends "most<kind>/<section>/<days>.json" to.
URL_POPULAR = URL_MAIN + "mostpopular/v2/"
# API keys looked up by target name in query_site(). Both are empty here, and
# query_site() refuses to send a request while either one is empty.
API_KEY = { "popular": "",
			"article": ""}

def get_from_file(kind, period):
	"""Load the JSON document stored in ``popular-<kind>-<period>.json``.

	Args:
		kind: Value substituted for ``<kind>`` in the file name.
		period: Value substituted for ``<period>`` in the file name.

	Returns:
		The decoded JSON content of the file.
	"""
	path = "popular-{0}-{1}.json".format(kind, period)
	with open(path, "r") as handle:
		return json.load(handle)

def article_overview(kind, period):
	"""Summarise the stored articles as section/title pairs and thumbnails.

	Args:
		kind: Passed through to get_from_file().
		period: Passed through to get_from_file().

	Returns:
		A tuple ``(titles, urls)``: ``titles`` holds one ``{section: title}``
		dict per article; ``urls`` holds the "url" of every media-metadata
		entry whose "format" is "Standard Thumbnail".
	"""
	articles = get_from_file(kind, period)
	titles = []
	urls = []

	for item in articles:
		titles.append({item["section"]: item["title"]})
		# Articles without a "media" entry contribute no thumbnails.
		if "media" not in item:
			continue
		urls.extend(
			meta["url"]
			for media in item["media"]
			for meta in media["media-metadata"]
			if meta["format"] == "Standard Thumbnail"
		)
	return (titles, urls)

def query_site(url, target, offset):
	"""Send a GET request to ``url`` and return the decoded JSON response.

	Args:
		url: Full URL to query.
		target: Key into ``API_KEY`` selecting which API key to send.
		offset: Value sent as the "offset" query parameter.

	Returns:
		The decoded JSON body when the status code is OK; ``False`` when an
		API key is not configured; ``None`` for a non-OK status that
		``raise_for_status()`` does not treat as an error.

	Raises:
		requests.HTTPError: Raised by ``raise_for_status()`` for an error
			response.
	"""
	if API_KEY["popular"] == "" or API_KEY["article"] == "":
		# Single-argument print(...) works on both Python 2 and Python 3.
		print("You need to register for NYTimes Developer account to run this program.")
		print("See Instructor notes for information")
		return False
	params = {"api-key": API_KEY[target], "offset": offset}
	r = requests.get(url, params=params)

	if r.status_code == requests.codes.ok:
		return r.json()
	r.raise_for_status()
	return None

def get_popular(url, kind, days, section="all-sections", offset=0):
	"""Query the most-popular endpoint for one kind, section and period.

	Args:
		url: Endpoint prefix that "most<kind>/<section>/<days>.json" is
			appended to.
		kind: One of "viewed", "shared" or "emailed". The misspelling
			"viewd" is still accepted and treated as "viewed".
		days: Time period; must be 1, 7 or 30.
		section: Section segment of the URL.
		offset: Passed through to query_site().

	Returns:
		Whatever query_site() returns, or ``False`` when ``kind`` or
		``days`` is not an allowed value.
	"""
	if days not in (1, 7, 30):
		print("time period can be 1, 7, 30 days only")
		return False
	# Keep accepting the old misspelling so existing callers still work, but
	# never put it in the URL: "mostviewd" is not a valid endpoint name.
	if kind == "viewd":
		kind = "viewed"
	if kind not in ("viewed", "shared", "emailed"):
		print("kind can be only one of viewed/shared/emailed")
		return False

	url += "most{0}/{1}/{2}.json".format(kind, section, days)
	data = query_site(url, "popular", offset)

	return data

def save_file(kind, period):
	"""Download every result page and store them in one JSON file.

	The results are written to ``popular-<kind>-<period>.json`` as UTF-8.

	Args:
		kind: Kind of popularity list, passed to get_popular().
		period: Time period in days, passed to get_popular().
	"""
	# Ask for the list actually being saved; the result count of a different
	# kind or period would give the wrong number of pages.
	first_page = get_popular(URL_POPULAR, kind, period)
	if not first_page:
		# get_popular() already reported why nothing could be fetched.
		return
	num_results = first_page["num_results"]

	full_data = []
	# NOTE(review): the step of 20 presumably matches the API page size -
	# confirm against the API documentation.
	for offset in range(0, num_results, 20):
		data = get_popular(URL_POPULAR, kind, period, offset=offset)
		full_data += data["results"]

	# Open the output only after the download finished, so a failed request
	# does not leave an empty or truncated file behind.
	with codecs.open("popular-{0}-{1}.json".format(kind, period), encoding='utf-8', mode='w') as v:
		v.write(json.dumps(full_data, indent=2))

def test():
	"""Check the overview built from the stored "viewd"/1 results file."""
	titles, urls = article_overview("viewd", 1)
	assert len(titles) == 20
	assert len(urls) == 30
	assert titles[2] == {'Opinion': 'Professors, Wee need you!'}
	assert urls[20] == 'http://graphics8.nytimes.com/images/2014/02/17/sports/ICEDANCE/ICEDANCE-thumbStandard.jpg'


if __name__ == "__main__":
	# The function has to be called; a bare ``test`` only names it.
	test()

Python XML
https://wiki.python.org/moin/PythonXml

Excel to CSV

# -*- coding: utf-8 -*-

import csv
import os
# The class is spelled ``ZipFile``; ``from zipfile import zipfile`` raises
# ImportError.
from zipfile import ZipFile

import xlrd

# Excel workbook read by parse_file(); open_zip() extracts "<datafile>.zip".
datafile = "2013_ERCOT_Hourly_Load_Data.xls"
# Pipe-delimited CSV file that test() has save_file() write.
outfile = "2013_Max_Loads.csv"

def open_zip(datafile):
	"""Extract ``<datafile>.zip`` into the current working directory.

	Args:
		datafile: Name of the archive without its trailing ``.zip``.
	"""
	# Local import keeps the helper self-contained. The class is spelled
	# ``ZipFile``; lowercase ``zipfile`` is only the module name.
	from zipfile import ZipFile

	with ZipFile('{0}.zip'.format(datafile), 'r') as myzip:
		myzip.extractall()

def parse_file(datafile):
	"""Find the maximum value and its timestamp for each station column.

	Reads the first sheet of the workbook. Row 0 supplies the station names
	for columns 1 to 8, and column 0 supplies the Excel date of each row.

	Args:
		datafile: Path of the Excel workbook to read.

	Returns:
		A dict mapping each station name to
		``{"maxval": <largest value>, "maxtime": <date tuple>}``, where the
		date tuple comes from ``xlrd.xldate_as_tuple``.
	"""
	workbook = xlrd.open_workbook(datafile)
	sheet = workbook.sheet_by_index(0)
	data = {}

	for n in range(1, 9):
		station = sheet.cell_value(0, n)
		cv = sheet.col_values(n, start_rowx=1, end_rowx=None)

		maxval = max(cv)
		# col_values() started at row 1, so shift the list index back to the
		# matching sheet row.
		maxpos = cv.index(maxval) + 1
		maxtime = sheet.cell_value(maxpos, 0)
		realtime = xlrd.xldate_as_tuple(maxtime, 0)
		data[station] = {"maxval": maxval,
						"maxtime": realtime}

	# Report and return once, after every station column has been processed.
	print(data)
	return data

def save_file(data, filename):
	"""Write one pipe-delimited row per station to ``filename``.

	Args:
		data: Dict mapping station name to a dict with "maxtime" (a 6-item
			date tuple) and "maxval".
		filename: Path of the CSV file to create or overwrite.
	"""
	with open(filename, "w") as f:
		w = csv.writer(f, delimiter='|')
		w.writerow(["Station", "Year", "Month", "Day", "Hour", "Max Load"])
		for s in data:
			# Minutes and seconds of the date tuple are not written out.
			year, month, day, hour, _, _ = data[s]["maxtime"]
			w.writerow([s, year, month, day, hour, data[s]["maxval"]])

def test():
	"""Run extract, parse and save, then verify the CSV file that was written."""
	open_zip(datafile)
	data = parse_file(datafile)
	save_file(data, outfile)

	number_of_rows = 0
	stations = []

	ans = {'FAR_WEST' : {'Max Load': '2281.2722140000024',
						'Year': '2013',
						'Month': '6',
						'Day': '26',
						'Hour': '17'}}
	correct_stations = ['COAST', 'EAST', 'FAR_WEST', 'NORTH',
						'NORTH_C', 'SOUTHERN', 'SOUTH_C', 'WEST']
	fields = ['Year', 'Month', 'Day', 'Hour', 'Max Load']

	with open(outfile) as of:
		csvfile = csv.DictReader(of, delimiter='|')
		for line in csvfile:
			station = line['Station']
			if station == 'FAR_WEST':
				for field in fields:
					if field == 'Max Load':
						# Compare loads as rounded floats, not as strings.
						max_answer = round(float(ans[station][field]), 1)
						max_line = round(float(line[field]), 1)
						assert max_answer == max_line
					else:
						assert ans[station][field] == line[field]

			# Every row counts towards the totals, not only FAR_WEST.
			number_of_rows += 1
			stations.append(station)

	# The totals can only be checked once the whole file has been read.
	assert number_of_rows == 8
	assert set(stations) == set(correct_stations)


if __name__ == "__main__":
	test()

using csv module

import csv
import os

# Directory and file name that test() joins into the path given to
# parse_file(); the empty DATADIR leaves the file name unchanged.
DATADIR = ""
DATAFILE = "745090.csv"

def parse_file(datafile):
	"""Open ``datafile`` and return a ``(name, data)`` pair.

	NOTE(review): reading is not implemented yet. The file is opened in
	binary mode and closed again, and the result is always ``("", [])``.

	Args:
		datafile: Path of the CSV file.

	Returns:
		A tuple of an empty string and an empty list.
	"""
	station_name, rows = "", []
	with open(datafile, 'rb'):
		# TODO: read the name and the data rows from the file.
		pass
	return (station_name, rows)

def test():
	"""Check the name and a few cells returned by parse_file()."""
	name, data = parse_file(os.path.join(DATADIR, DATAFILE))

	assert name == "MOUNTAIN VIEW MOFFETT FLD NAS"
	assert data[0][1] == "01:00"
	# Both remaining checks look at the third row.
	third_row = data[2]
	assert third_row[0] == "01/01/2005"
	assert third_row[5] == "2"


if __name__ == "__main__":
	test()

JSON Playground

def main():
	"""Look up the artist "Lucero" and print one release and all titles."""
	results = query_by_name(ARTIST_URL, query_type["simple"], "Lucero")

	# The second entry of the "artist" list is the one used throughout.
	artist = results["artist"][1]
	artist_id = artist["id"]
	print("\nARTIST:")
	pretty_print(artist)

	artist_data = query_site(ARTIST_URL, query_type["releases"], artist_id)
	releases = artist_data["releases"]
	print("\nONE RELEASE:")
	pretty_print(releases[0], indent=2)
	release_titles = [r["title"] for r in releases]

	print("\nALL TITLES:")
	for t in release_titles:
		print(t)


if __name__ == '__main__':
	main()