Auditing Uniformity

import csv
import pprint

# Name of the CSV column whose values are audited (looked up as row[fieldname]).
fieldname = "wgs84_pos#lat"
# Exclusive lower/upper bounds for the range check on that column
# (presumably latitude in degrees, going by the column name -- confirm).
minval = -90
maxval = 90

def skip_lines(input_file, skip):
	"""Advance the iterator ``input_file`` past its next ``skip`` items.

	The skipped items are discarded. ``StopIteration`` propagates when the
	iterator runs out before ``skip`` items have been consumed.
	"""
	remaining = skip
	while remaining > 0:
		next(input_file)
		remaining -= 1

def is_number(s):
	"""Return True when ``s`` can be converted with ``float()``, else False.

	``None`` and other objects that ``float()`` rejects with ``TypeError``
	count as "not a number" instead of raising.
	"""
	try:
		float(s)
	except (TypeError, ValueError):
		return False
	return True


def audit_float_field(v, counts):
	"""Classify one raw cell value and record the outcome.

	The value is stripped of surrounding whitespace, then:

	* ``"NULL"`` increments ``counts["nulls"]``;
	* an empty string increments ``counts["empties"]``;
	* a value starting with ``{`` increments ``counts["arrays"]``;
	* a non-numeric value is printed as "Found non number";
	* a numeric value outside the open interval (minval, maxval) is
	  printed as "Found out of range value".

	Args:
		v: raw string taken from the CSV row.
		counts: dict with the keys "nulls", "empties" and "arrays";
			it is updated in place.
	"""
	v = v.strip()
	if v == "NULL":
		counts["nulls"] += 1
	elif v == "":
		counts["empties"] += 1
	elif v.startswith("{"):
		# NOTE(review): assumes multi-valued cells are brace-delimited
		# (e.g. "{a|b}") -- confirm against the data file.
		counts["arrays"] += 1
	elif not is_number(v):
		print("Found non number: %s" % v)
	else:
		v = float(v)
		# Both bounds are exclusive: a value equal to minval or maxval is reported.
		if not ((minval < v) and (v < maxval)):
			print("Found out of range value: %s" % v)

if __name__ == "__main__":
	# Tallies updated by audit_float_field for each kind of non-numeric cell.
	counts = {"nulls": 0, "empties": 0, "arrays": 0}
	nrows = 0
	# The with-block closes the CSV file once every row has been audited.
	with open("cities3.csv") as csv_file:
		input_file = csv.DictReader(csv_file)
		# Discard the three rows that follow the header row
		# (presumably metadata rows -- confirm against the file).
		skip_lines(input_file, 3)
		for row in input_file:
			audit_float_field(row[fieldname], counts)
			nrows += 1
	print("num cities: %s" % nrows)
	print("nulls: %s" % counts['nulls'])
	print("empties: %s" % counts['empties'])
	print("arrays: %s" % counts['arrays'])

Auditing Accuracy

# Connect to the MongoDB server at localhost:27017 and select its "examples" database.
# NOTE(review): MongoClient is not imported anywhere in the visible code;
# presumably `from pymongo import MongoClient` is required -- confirm.
client = MongoClient("mongodb://localhost:27017")
db = client.examples

def skip_lines(input_file, skip):
	"""Consume and discard the next ``skip`` items of the iterator ``input_file``."""
	for _ in range(skip):
		next(input_file)

def audit_country(input_file):
	"""Report country labels that do not match exactly one stored country.

	For every row, the ``country_label`` value is stripped; "NULL" and empty
	values are skipped. Any other value is looked up by ``name`` in the
	``countries`` collection of the module-level ``db``, and "Not found" is
	printed unless exactly one document matches.

	Args:
		input_file: iterable of dict-like rows with a 'country_label' key.
	"""
	for row in input_file:
		country = row['country_label'].strip()
		if country in ("NULL", ""):
			continue
		# NOTE(review): Cursor.count() was removed in PyMongo 4; on newer
		# drivers use db.countries.count_documents({"name": country}).
		if db.countries.find({"name": country}).count() != 1:
			print("Not found: %s" % country)

if __name__ == '__main__':
	# NOTE(review): the body of this guard is missing from the source. It is
	# filled in by analogy with the population-density script (open the CSV,
	# skip three rows, run the audit); the file name is an assumption -- confirm.
	with open("cities.csv") as csv_file:
		input_file = csv.DictReader(csv_file)
		skip_lines(input_file, 3)
		audit_country(input_file)

Correcting Validity

import csv
import pprint

# Default paths handed to process_file by test().
# Source CSV that is read.
INPUT_FILE = 'autos.csv'
# Passed as output_good.
OUTPUT_GOOD = 'autos-valid.csv'
# Passed as output_bad (presumably rows that still need fixing -- confirm).
OUTPUT_BAD = 'FIXME-autos.csv'

def _write_rows(path, header, rows):
	"""Write ``rows`` (dicts keyed by ``header``) to ``path`` as CSV, header row first."""
	with open(path, "w") as g:
		writer = csv.DictWriter(g, delimiter=",", fieldnames=header)
		writer.writeheader()
		for row in rows:
			writer.writerow(row)


def process_file(input_file, output_good, output_bad, is_valid=None):
	"""Split the rows of a CSV file into a "good" file and a "bad" file.

	Args:
		input_file: path of the CSV file to read; its first row is the header.
		output_good: path that receives the rows accepted by ``is_valid``.
		output_bad: path that receives the rows rejected by ``is_valid``.
		is_valid: optional callable taking one row dict and returning a
			truthy value for rows that belong in ``output_good``. When
			omitted, every row is treated as valid.

	Both output files are always written and start with the input's header.
	"""
	good_rows = []
	bad_rows = []
	with open(input_file, "r") as f:
		reader = csv.DictReader(f)
		# An empty input has no header row; fall back to zero columns.
		header = reader.fieldnames or []
		for row in reader:
			if is_valid is None or is_valid(row):
				good_rows.append(row)
			else:
				bad_rows.append(row)

	_write_rows(output_good, header, good_rows)
	_write_rows(output_bad, header, bad_rows)

	def test():
		process_file(INPUT_FILE, OUTPUT_GOOD, OUTPUT_BAD)

	if __name__ == "__main__":

Population density

def ensure_float(v):
	"""Return ``v`` converted to ``float``, or ``None`` when it is not numeric.

	Non-numeric strings (for example "NULL" or "") and values ``float()``
	cannot accept at all (for example ``None``) both yield ``None``.
	"""
	try:
		return float(v)
	except (TypeError, ValueError):
		return None

def audit_population_density(input_file, tolerance=10):
	"""Flag rows whose stated population density disagrees with population / area.

	Args:
		input_file: iterable of dict-like rows with the keys
			'populationTotal', 'areaLand', 'populationDensity' and 'name'.
		tolerance: largest accepted absolute difference between the stated
			and the calculated density; defaults to 10.

	Rows where any of the three figures is missing, non-numeric or zero are
	skipped (which also avoids dividing by a zero area). For every other row
	whose difference exceeds ``tolerance`` a message naming the row is printed.
	"""
	for row in input_file:
		population = ensure_float(row['populationTotal'])
		area = ensure_float(row['areaLand'])
		population_density = ensure_float(row['populationDensity'])
		# Truthiness check: None (not numeric) and 0.0 are both skipped.
		if not (population and area and population_density):
			continue
		calculated_density = population / area
		# Built-in abs() avoids depending on the math module, which is not imported.
		if abs(calculated_density - population_density) > tolerance:
			# The two spaces reproduce the original print-statement output.
			print("Possibly bad population density for  %s" % row['name'])

if __name__ == '__main__':
	# The with-block closes the CSV file once the audit has finished.
	with open("cities.csv") as csv_file:
		input_file = csv.DictReader(csv_file)
		# Discard the three rows that follow the header row before auditing.
		skip_lines(input_file, 3)
		audit_population_density(input_file)

Using blue print

import xml.etree.cElementTree as ET
from collections import defaultdict
import re

# OSM XML input, opened when the module is loaded and parsed by audit().
osm_file = open("chicago_abbrev.osm", "r")

# Matches the trailing run of non-whitespace characters of a string
# (the optional final period is already covered by \S).
street_type_re = re.compile(r'\S+\.?$', re.IGNORECASE)
# Occurrence count per matched token; missing keys start at 0.
street_types = defaultdict(int)

def audit_street_type(street_types, street_name):
	"""Count the trailing token of ``street_name`` in ``street_types``.

	The token is whatever ``street_type_re`` matches; when there is no match
	the mapping is left untouched.
	"""
	match = street_type_re.search(street_name)
	if not match:
		return
	street_types[match.group()] += 1

def print_sorted_dict(d):
	"""Print one ``key: value`` line per entry of ``d``.

	Entries are ordered by key, compared case-insensitively; keys must
	therefore be strings.
	"""
	for k in sorted(d, key=lambda s: s.lower()):
		print("%s: %s" % (k, d[k]))

def is_street_name(elem):
	"""Return True when ``elem`` is a ``tag`` element whose ``k`` attribute is "addr:street".

	The ``k`` attribute is only read for ``tag`` elements, so other elements
	need not carry it.
	"""
	if elem.tag != "tag":
		return False
	return elem.attrib['k'] == "addr:street"

def audit():
	"""Tally street-name endings found in ``osm_file`` and print the totals.

	Every element produced by ``ET.iterparse`` is checked with
	``is_street_name``; the ``v`` attribute of each match is handed to
	``audit_street_type``. The module-level ``street_types`` mapping is
	updated in place and printed at the end.
	"""
	for _event, element in ET.iterparse(osm_file):
		if not is_street_name(element):
			continue
		audit_street_type(street_types, element.attrib['v'])
	print_sorted_dict(street_types)

# Run the street-type audit only when this code is executed as a script.
if __name__ == '__main__':
	audit()

Processing Patents

import xml.etree.ElementTree as ET
# Path of the patent data file passed to split_file() by test().
PATENTS = 'patent.data'

def get_root(fname):
	"""Parse the XML file ``fname`` and return the root element of its tree."""
	tree = ET.parse(fname)
	return tree.getroot()

def split_file(filename):
	"""Placeholder: splitting ``filename`` is not implemented yet.

	NOTE(review): test() afterwards opens files named "<filename>-<n>" for
	n in 0..3, so the finished implementation is presumably expected to
	write its pieces under those names -- confirm.
	"""
	pass

def test():
	"""Split PATENTS, then check the first line of each of the first four pieces.

	NOTE(review): the source text of this function is cut off in the middle
	of the ``startswith(`` call. Everything after that point (the expected
	prefix and both messages) is a reconstruction -- confirm.
	"""
	split_file(PATENTS)
	for n in range(4):
		fname = "{}-{}".format(PATENTS, n)
		try:
			with open(fname, "r") as f:
				first_line = f.readline()
		except IOError:
			print("Could not open file {}".format(fname))
			continue
		# Assumed prefix: each piece should be a standalone XML document.
		if not first_line.startswith("<?xml"):
			print("File {} does not start with an XML declaration".format(fname))

processing all

from bs4 import BeautifulSoup
from zipfile import zipfile
import os

# Directory holding the files to process; "<datadir>.zip" is the archive open_zip extracts.
datadir = "data"

def open_zip(datadir):
	"""Extract the archive ``<datadir>.zip`` into the current working directory."""
	# Imported locally because the module-level import asks the zipfile
	# module for the lowercase name "zipfile", which it does not provide.
	from zipfile import ZipFile

	with ZipFile('{0}.zip'.format(datadir), 'r') as myzip:
		myzip.extractall()

def process_all(datadir):
	"""Return the names of the entries in directory ``datadir``, as listed by ``os.listdir``."""
	return os.listdir(datadir)

def process_file(f):
	"""Load one HTML file from ``datadir`` and return the records extracted from it.

	Args:
		f: file name inside ``datadir``. Its first six characters must have
			the form "XX-YYY"; they are split on "-" into a courier code and
			an airport code.

	Returns:
		A list of records. Extraction from the parsed page is not implemented
		yet, so the list is currently always empty.
	"""
	data = []
	info = {}
	# Raises ValueError when the prefix does not split into exactly two parts.
	info["courier"], info["airport"] = f[:6].split("-")
	with open("{}/{}".format(datadir, f), "r") as html:
		# Name the parser explicitly, matching the other BeautifulSoup calls
		# in this file, so results do not depend on which parsers are installed.
		soup = BeautifulSoup(html, "lxml")
	return data

def test():
	"""Extract the archive, process every file in ``datadir`` and check the result.

	The checks cover the total number of records, the types and code lengths
	of the first three records, and selected values of the first and last
	record. Any mismatch raises ``AssertionError``.
	"""
	print("Running a simple test...")
	open_zip(datadir)
	files = process_all(datadir)
	data = []
	for f in files:
		data += process_file(f)

	assert len(data) == 399
	for entry in data[:3]:
		assert isinstance(entry["year"], int)
		assert isinstance(entry["month"], int)
		assert isinstance(entry["flights"]["domestic"], int)
		assert len(entry["airport"]) == 3
		assert len(entry["courier"]) == 2
	assert data[0]["courier"] == 'FL'
	assert data[0]["month"] == 10
	assert data[-1]["airport"] == "ATL"
	# Same "flights" key as in the per-entry checks above.
	assert data[-1]["flights"] == {'international': 108289, 'domestic': 701425}

	print("... success!")

# Run the self-check only when this code is executed as a script.
if __name__ == "__main__":
	test()

List and dictionary

List:[…] でリスト(list)を表します。

Dictionary:{…} は、辞書(dict)と呼ばれるキーと値のペアのリストを保持します。元のリストが要素をふたつずつ持ったタプルからできている場合は、そのまま dict 関数を使います。

Airport List

from bs4 import BeautifulSoup
# HTML page that test() passes to extract_airports().
html_page = "options.html"

def extract_airports(page):
	"""Parse the HTML file ``page`` and return the collected airport codes.

	The page is read and parsed with the lxml parser; nothing is extracted
	from the parsed document yet, so the returned list is empty.
	"""
	airports = []
	with open(page, "r") as handle:
		soup = BeautifulSoup(handle, "lxml")

	return airports

def test():
	"""Check that extract_airports(html_page) yields 15 entries including "ATL" and "ABR"."""
	airports = extract_airports(html_page)
	assert len(airports) == 15
	for code in ("ATL", "ABR"):
		assert code in airports

# Run the self-check only when this code is executed as a script.
if __name__ == "__main__":
	test()

Carrier List

from bs4 import BeautifulSoup
# HTML page that test() passes to extract_carriers().
html_page = "options.html"

def extract_carriers(page):
	"""Parse the HTML file ``page`` and return the collected carrier codes.

	The page is read and parsed with the lxml parser; nothing is extracted
	from the parsed document yet, so the returned list is empty.
	"""
	carriers = []
	with open(page, "r") as handle:
		soup = BeautifulSoup(handle, "lxml")

	return carriers

def make_request(data):
	"""POST the airport/carrier selection form and return the response body as text.

	Args:
		data: mapping with the keys "eventvalidation", "viewstate",
			"airport" and "carrier". A missing key raises ``KeyError``
			before any request is sent.

	Returns:
		The ``text`` attribute of the response to the POST request.
	"""
	# Imported locally: the visible code uses requests without importing it.
	import requests

	eventvalidation = data["eventvalidation"]
	viewstate = data["viewstate"]
	airport = data["airport"]
	carrier = data["carrier"]

	form = {
		'AirportList': airport,
		'CarrierList': carrier,
		'Submit': 'Submit',
		"__EVENTTARGET": "",
		"__EVENTARGUMENT": "",
		"__EVENTVALIDATION": eventvalidation,
		"__VIEWSTATE": viewstate,
	}
	r = requests.post("http://www.transtats.bts.gov/Data_Elements.aspx?Data=2",
	                  data=form)

	return r.text

def test():
	"""Check that extract_carriers(html_page) yields 16 entries including "FL" and "NK"."""
	carriers = extract_carriers(html_page)
	assert len(carriers) == 16
	for code in ("FL", "NK"):
		assert code in carriers

# Run the self-check only when this code is executed as a script.
if __name__ == "__main__":
	test()