Data Quality

import codecs
import csv
import json
import pprint

# Path of the CSV file that test() hands to audit_file().
CITIES = 'cities.csv'

# Column names that test() asks audit_file() to audit.
FIELDS = ["name", "timeZone_label", "utcOffset", "homepage", "governmentType_label",
          "isPartOf_label", "areaCode", "populationTotal", "elevation",
          "maximumElevation", "minimumElevation", "populationDensity",
          "wgs84_pos#lat", "wgs84_pos#long", "areaLand", "areaMetro", "areaUrban"]

def audit_file(filename, fields):
	"""Placeholder for the per-field type audit of a CSV file.

	Neither ``filename`` nor ``fields`` is read yet: every call simply
	returns a new, empty dict.
	"""
	return {}

def test():
	"""Audit CITIES and check the types recorded for two area columns."""
	none_type = type(None)
	observed = audit_file(CITIES, FIELDS)

	pprint.pprint(observed)

	# areaLand is expected to hold floats, lists and missing values;
	# areaMetro only floats and missing values.
	assert observed["areaLand"] == {float, list, none_type}
	assert observed["areaMetro"] == {float, none_type}


if __name__ == "__main__":
	test()

Auditing Uniformity

import csv
import pprint

# Name of the CSV column whose value is passed to audit_float_field() for every row.
fieldname = "wgs84_pos#lat"
# Bounds for the range check (minval < v < maxval, both ends exclusive).
minval = -90
maxval = 90

def skip_lines(input_file, skip):
	"""Advance ``input_file`` past its next ``skip`` items.

	Each item is pulled with ``next()`` and thrown away, so the iterator
	is left positioned on the first item that was not skipped.
	"""
	for _ in range(skip):
		next(input_file)

def is_number(s):
	"""Return True when ``float(s)`` succeeds, False when it raises ValueError."""
	try:
		float(s)
	except ValueError:
		return False
	return True


def audit_float_field(v, counts):
	"""Classify one raw cell value and update the tallies in ``counts``.

	The range check used to sit in an unreachable ``else:`` branch of
	``is_number``; it lives here because the script calls
	``audit_float_field(value, counts)`` for every row.

	Args:
		v: raw cell text; surrounding whitespace is ignored.
		counts: dict with the keys "nulls", "empties" and "arrays"; the
			matching counter is incremented in place.

	A value that parses as a number is compared with the module-level
	``minval``/``maxval`` and reported when it is not strictly between them.
	"""
	v = v.strip()
	if v == "NULL":
		counts["nulls"] += 1
	elif v == "":
		counts["empties"] += 1
	elif v.startswith("{"):
		# assumes array-valued cells are written as "{a|b}" - TODO confirm against the data
		counts["arrays"] += 1
	elif not is_number(v):
		print("Found non number: %s" % v)
	else:
		v = float(v)
		if not ((minval < v) and (v < maxval)):
			print("Found out of range value: %s" % v)

if __name__ == "__main__":
	# Tally, for every data row of cities3.csv, what kind of value the
	# `fieldname` column holds, then print the totals.
	counts = {"nulls": 0, "empties": 0, "arrays": 0}
	nrows = 0
	# `with` closes the file even if auditing a row raises.
	with open("cities3.csv") as csv_file:
		input_file = csv.DictReader(csv_file)
		# The first three rows after the header are skipped before auditing.
		skip_lines(input_file, 3)
		for row in input_file:
			audit_float_field(row[fieldname], counts)
			nrows += 1
	# Single-argument print() gives the same output on Python 2 and 3.
	print("num cities: %s" % nrows)
	print("nulls: %s" % counts['nulls'])
	print("empties: %s" % counts['empties'])
	print("arrays: %s" % counts['arrays'])

Auditing Accuracy

# NOTE(review): MongoClient is not imported anywhere in the visible source;
# presumably `from pymongo import MongoClient` is required - confirm.
# Connect to the MongoDB server at localhost:27017 and select its `examples` database.
client = MongoClient("mongodb://localhost:27017")
db = client.examples

def skip_lines(input_file, skip):
	"""Discard the next ``skip`` items of the iterator ``input_file``."""
	for _unused in range(skip):
		next(input_file)

def audit_country(input_file):
	"""Report every country label that is not matched by exactly one document.

	Args:
		input_file: iterable of row dicts, each with a 'country_label' key.

	Rows whose label is "NULL" or empty after stripping whitespace are
	skipped. For every other label the `countries` collection of the
	module-level ``db`` is queried by name, and the label is printed when
	the number of matches is not exactly one.
	"""
	for row in input_file:
		country = row['country_label'].strip()
		if country in ("NULL", ""):
			continue
		# Fixed: `{"name" ; country}` was a syntax error and the collection
		# name was misspelled as `countires`.
		# NOTE(review): Cursor.count() no longer exists in recent PyMongo
		# releases - confirm the installed version.
		if db.countries.find({"name": country}).count() != 1:
			print("Not found: %s" % country)


if __name__ == '__main__':
	# NOTE(review): the body of this guard is missing from the source.
	# Presumably it should build a csv.DictReader, call skip_lines() and
	# then audit_country() - confirm the input file before filling it in.
	pass

Correcting Validity

import csv
import pprint

# File names that test() passes to process_file() as, in order,
# the input file, the "good" output and the "bad" output.
INPUT_FILE = 'autos.csv'
OUTPUT_GOOD = 'autos-valid.csv'
OUTPUT_BAD = 'FIXME-autos.csv'

def process_file(input_file, output_good, output_bad):

	with open(input_file, "r") as f:
		reader = csv.DictReader(f)
		header = reader.fieldnames

	with open(output_good, "w") as g:
		writer = csv.DictWriter(g, delimiter=",", fieldnames = header)
		writer.writeheader()
		for row in YOURDATA:
			writer.writerow(row)

	def test():
		process_file(INPUT_FILE, OUTPUT_GOOD, OUTPUT_BAD)

	if __name__ == "__main__":

Processing Patents

import xml.etree.ElementTree as ET
# Path of the patent file handed to split_file(); the quotes are plain ASCII
# because typographic quotes are a syntax error in Python.
PATENTS = 'patent.data'

def get_root(fname):
	"""Parse the XML file ``fname`` and return the root element of its tree."""
	tree = ET.parse(fname)
	return tree.getroot()

def split_file(filename):
	"""Placeholder: accepts ``filename`` but does nothing yet and returns None."""
	pass

# Calls split_file(PATENTS), then opens "<PATENTS>-0" .. "<PATENTS>-3" and
# inspects the first line of each.
# NOTE(review): this snippet is incomplete in the source - the argument of
# startswith(...) and everything after it (including the `except` for the
# `try`) are missing. The body is also not indented and the string literals
# use typographic quotes, so it cannot run until all three are repaired.
def test():
split_file(PATENTS)
for n in range(4):
try:
fname = “{}-{}”.format(PATENTS, n)
f = open(fname, “r”)
if not f.readline().startswith(“

Processing All

from bs4 import BeautifulSoup
# The class is spelled ZipFile; `from zipfile import zipfile` raises ImportError.
from zipfile import ZipFile
import os

# Directory whose files are listed and opened below; open_zip() also uses it
# as the base name of the "<datadir>.zip" archive.
datadir = "data"

def open_zip(datadir):
	"""Extract every member of ``<datadir>.zip`` into the current working directory."""
	archive_name = '{0}.zip'.format(datadir)
	with ZipFile(archive_name, 'r') as archive:
		archive.extractall()

def process_all(datadir):
	"""Return the entry names of directory ``datadir`` as given by ``os.listdir``."""
	return os.listdir(datadir)

def process_file(f):
	"""Open one HTML file from ``datadir`` and return the records found in it.

	Args:
		f: file name inside ``datadir``. Its first six characters are split
			on "-" into the "courier" and "airport" codes; a ValueError is
			raised when that does not yield exactly two parts.

	Returns:
		A list of records. Nothing is extracted from the parsed page yet,
		so the list is currently always empty.
	"""
	data = []
	info = {}
	info["courier"], info["airport"] = f[:6].split("-")
	with open("{}/{}".format(datadir, f), "r") as html:
		# Name the parser explicitly: without it Beautiful Soup picks
		# whichever parser is installed and emits a warning.
		soup = BeautifulSoup(html, "lxml")
	return data

def test():
	"""Extract the archive, process every file in ``datadir`` and check the result."""
	print("Running a simple test...")
	open_zip(datadir)
	files = process_all(datadir)
	data = []
	for f in files:
		data += process_file(f)

	# Fixed: this was written with `=` (assignment), which is a syntax error.
	assert len(data) == 399
	for entry in data[:3]:
		assert type(entry["year"]) == int
		assert type(entry["month"]) == int
		assert type(entry["flights"]["domestic"]) == int
		assert len(entry["airport"]) == 3
		assert len(entry["courier"]) == 2
	assert data[0]["courier"] == 'FL'
	assert data[0]["month"] == 10
	assert data[-1]["airport"] == "ATL"
	# Fixed: the key was misspelled "fights"; the loop above reads "flights".
	assert data[-1]["flights"] == {'international': 108289, 'domestic': 701425}

	print("... success!")


if __name__ == "__main__":
	test()

List and dictionary

List:[…] でリスト(list)を表します。

Dictionary:{…} は、辞書(dict)と呼ばれるキーと値のペアのリストを保持します。元のリストが要素を二つずつ持つタプルからできている場合は、そのまま dict 関数を使います。

Airport List

from bs4 import BeautifulSoup
# Path of the HTML file that test() passes to extract_airports().
html_page = "options.html"

def extract_airports(page):
	"""Parse the HTML file ``page`` and return the list of airports.

	The parsed document is not inspected yet, so the returned list is
	always empty.
	"""
	airports = []
	with open(page, "r") as source:
		soup = BeautifulSoup(source, "lxml")

	return airports

def test():
	"""Check the number of airports extracted from ``html_page`` and two known codes."""
	data = extract_airports(html_page)
	assert len(data) == 15
	assert "ATL" in data
	assert "ABR" in data


# Fixed: the guard had no colon and no body.
if __name__ == "__main__":
	test()

Carrier List

from bs4 import BeautifulSoup
# Path of the HTML file that test() passes to extract_carriers().
html_page = "options.html"

def extract_carriers(page):
	"""Parse the HTML file ``page`` and return the list of carriers.

	Nothing is read from the parsed document yet, so the list that comes
	back is always empty.
	"""
	carriers = []

	with open(page, "r") as source:
		soup = BeautifulSoup(source, "lxml")

	return carriers

def make_request(data):
	"""POST the airport/carrier form and return the body of the response as text.

	``data`` must provide the keys "eventvalidation", "viewstate",
	"airport" and "carrier"; they are looked up in that order.
	"""
	required = ("eventvalidation", "viewstate", "airport", "carrier")
	eventvalidation, viewstate, airport, carrier = [data[key] for key in required]

	form_fields = {
		'AirportList': airport,
		'CarrierList': carrier,
		'Submit': 'Submit',
		"__EVENTTARGET": "",
		"__EVENTARGUMENT": "",
		"__EVENTVALIDATION": eventvalidation,
		"__VIEWSTATE": viewstate,
	}
	url = "http://www.transtats.bts.gov/Data_Elements.aspx?Data=2"
	response = requests.post(url, data=form_fields)

	return response.text

def test():
	"""Check the number of carriers extracted from ``html_page`` and two known codes."""
	data = extract_carriers(html_page)
	assert len(data) == 16
	assert "FL" in data
	assert "NK" in data


# Fixed: the guard had no colon and no body.
if __name__ == "__main__":
	test()

Using Beautiful Soup

import requests
from bs4 import BeautifulSoup
import json

# Path of an HTML file; presumably passed to extract_data() by test(),
# whose body is not visible here - confirm.
html_page = "page_source.html"

def extract_data(page):
	"""Read two hidden form values out of the HTML file ``page``.

	Returns:
		A dict with the keys "eventvalidation" and "viewstate", holding the
		``value`` attribute of the elements whose ids are
		``__EVENTVALIDATION`` and ``__VIEWSTATE``.

	Raises:
		TypeError: if either element is missing from the page, because
			``soup.find`` then returns None.
	"""
	data = {"eventvalidation": "",
			"viewstate": ""}
	# Fixed: the `with` header ended in `;` instead of `:`.
	with open(page, "r") as html:
		soup = BeautifulSoup(html, "lxml")
		ev = soup.find(id="__EVENTVALIDATION")
		data["eventvalidation"] = ev["value"]

		vs = soup.find(id="__VIEWSTATE")
		data["viewstate"] = vs["value"]

	return data

def make_request(data):
	"""POST the form for airport "BOS" / carrier "VX" and return the response text.

	``data`` must provide the keys "eventvalidation" and "viewstate"; both
	values are sent back unchanged as hidden form fields.
	"""
	eventvalidation = data["eventvalidation"]
	viewstate = data["viewstate"]

	# Fixed: this call started at column 0, which ended the function early
	# and left the `return` below outside of any function.
	r = requests.post("http://www.transtats.bts.gov/Data_Elements.aspx?Data=2",
			data={'AirportList' : "BOS",
					'CarrierList' : "VX",
					'Submit' : "Submit",
					'__EVENTTARGET' : "",
					'__EVENTARGUMENT' : "",
					'__EVENTVALIDATION' : eventvalidation,
					'__VIEWSTATE' : viewstate})

	return r.text

def test():