from bs4 import BeautifulSoup
from zipfile import ZipFile
import os

datadir = "data"


def open_zip(datadir):
    """Extract ``<datadir>.zip`` into the current working directory."""
    with ZipFile('{0}.zip'.format(datadir), 'r') as myzip:
        myzip.extractall()


def process_all(datadir):
    """Return the names of all entries found in ``datadir``."""
    return os.listdir(datadir)


def process_file(f):
    """Parse one HTML file from the module-level ``datadir`` directory.

    The first six characters of the file name are split on "-" into a
    courier code and an airport code.

    NOTE: this is still a skeleton. The file is opened and parsed into a
    soup, but no rows are extracted yet, so an empty list is returned.
    ``test()`` below describes the expected shape of each entry (keys
    "courier", "airport", "year", "month" and "flights").
    """
    data = []
    info = {}
    info["courier"], info["airport"] = f[:6].split("-")

    with open(os.path.join(datadir, f), "r") as html:
        # Explicit parser, matching the other scripts on this page, so the
        # result does not depend on which parsers happen to be installed.
        soup = BeautifulSoup(html, "lxml")

    return data


def test():
    """Unzip the data, process every file and check the combined result."""
    print("Running a simple test...")
    open_zip(datadir)
    files = process_all(datadir)
    data = []
    for f in files:
        data += process_file(f)

    assert len(data) == 399
    for entry in data[:3]:
        assert isinstance(entry["year"], int)
        assert isinstance(entry["month"], int)
        assert isinstance(entry["flights"]["domestic"], int)
        assert len(entry["airport"]) == 3
        assert len(entry["courier"]) == 2
    assert data[0]["courier"] == 'FL'
    assert data[0]["month"] == 10
    assert data[-1]["airport"] == "ATL"
    assert data[-1]["flights"] == {'international': 108289, 'domestic': 701425}

    print("... success!")


if __name__ == "__main__":
    test()
Category: Python
List and dictionary
List:[…] でリスト(list)を表します。
Dictionary:{…} は、辞書(dict)と呼ばれるキーと値のペアのリストを保持します。元のリストが要素をふたつずつ持ったタプルからできている場合は、そのまま dict 関数を使って辞書に変換できます。
Airport List
from bs4 import BeautifulSoup

html_page = "options.html"


def extract_airports(page):
    """Return the airport codes found in the HTML file ``page``.

    NOTE: this is still a skeleton. The page is parsed into a soup, but
    nothing is extracted from it yet, so an empty list is returned.
    """
    data = []
    with open(page, "r") as html:
        soup = BeautifulSoup(html, "lxml")

    return data


def test():
    """Check the airport list extracted from ``html_page``."""
    data = extract_airports(html_page)
    assert len(data) == 15
    assert "ATL" in data
    assert "ABR" in data


if __name__ == "__main__":
    test()
Carrier List
from bs4 import BeautifulSoup

html_page = "options.html"

# NOTE(review): the target URL was stripped from this listing. The form
# field names used below belong to the BTS TranStats "Data Elements" page,
# so that endpoint is restored here -- confirm before relying on it.
DATA_ELEMENTS_URL = "https://www.transtats.bts.gov/Data_Elements.aspx?Data=2"


def extract_carriers(page):
    """Return the carrier codes found in the HTML file ``page``.

    NOTE: this is still a skeleton. The page is parsed into a soup, but
    nothing is extracted from it yet, so an empty list is returned.
    """
    data = []
    with open(page, "r") as html:
        soup = BeautifulSoup(html, "lxml")

    return data


def make_request(data):
    """Submit the airport/carrier form and return the response body text.

    ``data`` must provide the keys "eventvalidation", "viewstate",
    "airport" and "carrier"; they are sent as the corresponding form
    fields of the POST request.
    """
    # Imported here because this script has no top-level ``requests`` import.
    import requests

    eventvalidation = data["eventvalidation"]
    viewstate = data["viewstate"]
    airport = data["airport"]
    carrier = data["carrier"]

    r = requests.post(DATA_ELEMENTS_URL,
                      data={'AirportList': airport,
                            'CarrierList': carrier,
                            'Submit': 'Submit',
                            "__EVENTTARGET": "",
                            "__EVENTARGUMENT": "",
                            "__EVENTVALIDATION": eventvalidation,
                            "__VIEWSTATE": viewstate})

    return r.text


def test():
    """Check the carrier list extracted from ``html_page``."""
    data = extract_carriers(html_page)
    assert len(data) == 16
    assert "FL" in data
    assert "NK" in data


if __name__ == "__main__":
    test()
Using Beautiful Soup
import requests
from bs4 import BeautifulSoup
import json

html_page = "page_source.html"

# NOTE(review): the target URL was stripped from this listing. The form
# field names used below belong to the BTS TranStats "Data Elements" page,
# so that endpoint is restored here -- confirm before relying on it.
DATA_ELEMENTS_URL = "https://www.transtats.bts.gov/Data_Elements.aspx?Data=2"


def _hidden_field_value(soup, field_id):
    """Return the ``value`` attribute of the element with id ``field_id``."""
    element = soup.find(id=field_id)
    if element is None:
        # Fail with a readable message instead of "NoneType is not
        # subscriptable" when the saved page lacks the field.
        raise ValueError("field %r not found in page" % field_id)
    return element["value"]


def extract_data(page):
    """Read the ASP.NET hidden form fields from the HTML file ``page``.

    Returns a dict with the keys "eventvalidation" and "viewstate", holding
    the values of the ``__EVENTVALIDATION`` and ``__VIEWSTATE`` elements.
    """
    data = {"eventvalidation": "", "viewstate": ""}
    with open(page, "r") as html:
        soup = BeautifulSoup(html, "lxml")
        data["eventvalidation"] = _hidden_field_value(soup, "__EVENTVALIDATION")
        data["viewstate"] = _hidden_field_value(soup, "__VIEWSTATE")

    return data


def make_request(data):
    """Submit the form for airport "BOS" / carrier "VX" and return the text.

    ``data`` is the dict produced by ``extract_data``.
    """
    eventvalidation = data["eventvalidation"]
    viewstate = data["viewstate"]

    r = requests.post(DATA_ELEMENTS_URL,
                      data={'AirportList': "BOS",
                            'CarrierList': "VX",
                            'Submit': "Submit",
                            '__EVENTTARGET': "",
                            '__EVENTARGUMENT': "",
                            '__EVENTVALIDATION': eventvalidation,
                            '__VIEWSTATE': viewstate})

    return r.text


def test():
    """Check that both hidden fields were read from ``html_page``.

    NOTE(review): the body of this function was missing from the listing;
    these checks only verify that the two fields are non-empty.
    """
    data = extract_data(html_page)
    assert data["eventvalidation"] != ""
    assert data["viewstate"] != ""


if __name__ == "__main__":
    test()
Learn about BeautifulSoup
from bs4 import BeautifulSoup


def options(soup, id):
    """Return the ``value`` of every ``<option>`` under the element ``id``."""
    select = soup.find(id=id)
    return [option['value'] for option in select.find_all('option')]


def print_list(label, codes):
    """Print ``label`` as a heading followed by one code per line."""
    print("\n%s:" % label)
    for c in codes:
        print(c)


def main():
    """Print the carrier and airport codes offered by the saved page."""
    # ``with`` closes the file handle once the page has been parsed.
    with open("virgin_and_logan_airport.html") as html:
        # Explicit parser, matching the other scripts on this page.
        soup = BeautifulSoup(html, "lxml")

    codes = options(soup, 'CarrierList')
    print_list("Carriers", codes)

    codes = options(soup, 'AirportList')
    print_list("Airports", codes)


if __name__ == "__main__":
    main()
Wrangling JSON
some important concepts
– using codecs module to write unicode files
– using authentication with web APIs
– using offset when accessing web APIs
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import codecs
import requests

# NOTE(review): the base URL was stripped from this listing; the New York
# Times API root is restored here -- confirm before relying on it.
URL_MAIN = "https://api.nytimes.com/svc/"
URL_POPULAR = URL_MAIN + "mostpopular/v2/"
# Fill in your own keys; query_site() refuses to run while either is empty.
API_KEY = {"popular": "",
           "article": ""}


def get_from_file(kind, period):
    """Load and return the JSON stored in ``popular-<kind>-<period>.json``."""
    filename = "popular-{0}-{1}.json".format(kind, period)
    with open(filename, "r") as f:
        return json.load(f)


def article_overview(kind, period):
    """Summarise the locally saved popular-articles file.

    Returns a ``(titles, urls)`` tuple: ``titles`` is a list of one-entry
    dicts mapping each article's section to its title, and ``urls`` lists
    the URL of every media-metadata item whose format is
    "Standard Thumbnail".
    """
    data = get_from_file(kind, period)
    titles = []
    urls = []

    for article in data:
        section = article["section"]
        title = article["title"]
        titles.append({section: title})

        if "media" in article:
            for m in article["media"]:
                for mm in m["media-metadata"]:
                    if mm["format"] == "Standard Thumbnail":
                        urls.append(mm["url"])

    return (titles, urls)


def query_site(url, target, offset):
    """GET ``url`` with the API key for ``target`` and the given ``offset``.

    Returns the decoded JSON response, or False when the API keys have not
    been filled in. Raises ``requests.HTTPError`` for error status codes.
    """
    if API_KEY["popular"] == "" or API_KEY["article"] == "":
        print("You need to register for NYTimes Developer account to run this program.")
        print("See Instructor notes for information")
        return False

    params = {"api-key": API_KEY[target], "offset": offset}
    r = requests.get(url, params=params)

    if r.status_code == requests.codes.ok:
        return r.json()
    else:
        r.raise_for_status()


def get_popular(url, kind, days, section="all-sections", offset=0):
    """Query the most-popular endpoint for ``kind`` over ``days`` days.

    ``kind`` must be "viewed", "shared" or "emailed" and ``days`` must be
    1, 7 or 30; otherwise a message is printed and False is returned.
    """
    if days not in (1, 7, 30):
        print("time period can be 1, 7, 30 days only")
        return False
    # The endpoint path is "mostviewed", so the kind must be spelled "viewed".
    if kind not in ("viewed", "shared", "emailed"):
        print("kind can be only one of viewed/shared/emailed")
        return False

    url += "most{0}/{1}/{2}.json".format(kind, section, days)
    data = query_site(url, "popular", offset)

    return data


def save_file(kind, period):
    """Download all results for ``kind``/``period`` and save them as JSON.

    Results are fetched 20 at a time and written, as one list, to
    ``popular-<kind>-<period>.json`` (UTF-8). Nothing is written when the
    first request is rejected.
    """
    first_page = get_popular(URL_POPULAR, kind, period)
    if not first_page:
        # get_popular()/query_site() already printed why the request failed.
        return

    num_results = first_page["num_results"]
    full_data = list(first_page["results"])

    # The first page covers offset 0, so continue from the second page.
    for offset in range(20, num_results, 20):
        page = get_popular(URL_POPULAR, kind, period, offset=offset)
        full_data += page["results"]

    # Open the output only once everything is fetched, so a failed download
    # does not leave a truncated file behind.
    filename = "popular-{0}-{1}.json".format(kind, period)
    with codecs.open(filename, encoding='utf-8', mode='w') as v:
        v.write(json.dumps(full_data, indent=2))


def test():
    """Check the overview built from the saved one-day "viewed" file."""
    titles, urls = article_overview("viewed", 1)
    assert len(titles) == 20
    assert len(urls) == 30
    # NOTE(review): the listing had this literal garbled and split across
    # two lines; restored to the published headline -- confirm against the
    # data file.
    assert titles[2] == {'Opinion': 'Professors, We Need You!'}
    # NOTE(review): the expected thumbnail URL was stripped from the
    # listing; fill it in from the data file.
    assert urls[20] == ''


if __name__ == "__main__":
    test()
Python XML
Excel to CSV
# -*- coding: utf-8 -*-
import xlrd
import os
import csv
from zipfile import ZipFile

datafile = "2013_ERCOT_Hourly_Load_Data.xls"
outfile = "2013_Max_Loads.csv"


def open_zip(datafile):
    """Extract ``<datafile>.zip`` into the current working directory."""
    with ZipFile('{0}.zip'.format(datafile), 'r') as myzip:
        myzip.extractall()


def parse_file(datafile):
    """Find the maximum value and its timestamp for columns 1-8.

    Reads the first sheet of the workbook. Row 0 holds the station names
    and column 0 holds the Excel date of each row. Returns a dict mapping
    each station name to ``{"maxval": <max value>, "maxtime": <date tuple>}``.
    """
    workbook = xlrd.open_workbook(datafile)
    sheet = workbook.sheet_by_index(0)
    data = {}

    for n in range(1, 9):
        station = sheet.cell_value(0, n)
        cv = sheet.col_values(n, start_rowx=1, end_rowx=None)

        maxval = max(cv)
        # cv starts at sheet row 1, so shift the index back to a sheet row.
        maxpos = cv.index(maxval) + 1
        maxtime = sheet.cell_value(maxpos, 0)
        # Use the workbook's own date system rather than assuming 1900-based.
        realtime = xlrd.xldate_as_tuple(maxtime, workbook.datemode)
        data[station] = {"maxval": maxval,
                         "maxtime": realtime}

    print(data)
    return data


def save_file(data, filename):
    """Write ``data`` (as returned by ``parse_file``) to a "|"-delimited CSV."""
    with open(filename, "w") as f:
        w = csv.writer(f, delimiter='|')
        w.writerow(["Station", "Year", "Month", "Day", "Hour", "Max Load"])
        for s in data:
            # maxtime is (year, month, day, hour, minute, second).
            year, month, day, hour, _, _ = data[s]["maxtime"]
            w.writerow([s, year, month, day, hour, data[s]["maxval"]])


def test():
    """Run the whole pipeline and verify the generated CSV file."""
    open_zip(datafile)
    data = parse_file(datafile)
    save_file(data, outfile)

    number_of_rows = 0
    stations = []

    ans = {'FAR_WEST': {'Max Load': '2281.2722140000024',
                        'Year': '2013',
                        'Month': '6',
                        'Day': '26',
                        'Hour': '17'}}
    correct_stations = ['COAST', 'EAST', 'FAR_WEST', 'NORTH',
                        'NORTH_C', 'SOUTHERN', 'SOUTH_C', 'WEST']
    fields = ['Year', 'Month', 'Day', 'Hour', 'Max Load']

    with open(outfile) as of:
        csvfile = csv.DictReader(of, delimiter='|')
        for line in csvfile:
            station = line['Station']
            if station == 'FAR_WEST':
                for field in fields:
                    if field == 'Max Load':
                        # Compare the load rounded to one decimal place.
                        max_answer = round(float(ans[station][field]), 1)
                        max_line = round(float(line[field]), 1)
                        assert max_answer == max_line
                    else:
                        assert ans[station][field] == line[field]

            number_of_rows += 1
            stations.append(station)

        assert number_of_rows == 8
        assert set(stations) == set(correct_stations)


if __name__ == "__main__":
    test()
using csv module
import csv import os DATADIR = "" DATAFILE = "745090.csv" def parse_file(datafile): name = "" data = [] with open(datafile, 'rb') as f: pass return (name, data) def test(): datafile = os.path.join(DATADIR, DATAFILE) name, data = parse_file(datafile) assert name == "MOUNTAIN VIEW MOFFETT FLD NAS" assert data[0][1] == "01:00" assert data[2][0] == "01/01/2005" assert data[2][5] == "2" if __name__ == "__main__": test()
JSON Playground
def main():
    """Look up the artist "Lucero" and print one release and all titles.

    Relies on ``query_by_name``, ``query_site``, ``pretty_print``,
    ``ARTIST_URL`` and ``query_type``, which are defined outside this
    listing.
    """
    results = query_by_name(ARTIST_URL, query_type["simple"], "Lucero")
    # The second search hit (index 1) is the one inspected here.
    # NOTE(review): confirm that the response key really is "artist" and
    # not "artists" -- the helper that builds ``results`` is not shown.
    artist_id = results["artist"][1]["id"]
    print("\nARTIST:")
    pretty_print(results["artist"][1])

    artist_data = query_site(ARTIST_URL, query_type["releases"], artist_id)
    releases = artist_data["releases"]
    print("\nONE RELEASE:")
    pretty_print(releases[0], indent=2)

    release_titles = [r["title"] for r in releases]
    print("\nALL TITLES:")
    for title in release_titles:
        print(title)


if __name__ == '__main__':
    main()