#!/usr/bin/env python # -*- coding: utf-8 -*- import codecs import csv import json import pprint CITIES = 'cities.csv' def fix_area(area): return area def process_file(filename): data = [] with open(filename, "r") as f: reader = csv.DictReader(f) for i in range(3): l = reader.next() for line in reader: if "areaLand" in line: line["areaLand"] = fix_area(line["areaLand"]) return data def test(): data = process_file(CITIES) print "Printing three example results:" for n in range(5, 8): pprint.pprint(data[n]["areaLand"]) assert data[3]["areaLand"] == None assert data[8]["areaLand"] == 55166700.0 assert data[20]["areaLand"] == 14581600.0 assert data[33]["areaLand"] == 20564500.0 if __name__ == "__main__": test()
Data Quality
import codecs
import csv
import json
import pprint
# Input dataset: a DBpedia cities extract in CSV form.
CITIES = 'cities.csv'

# Columns whose value types are audited by audit_file().
FIELDS = ["name", "timeZone_label", "utcOffset", "homepage", "governmentType_label",
          "isPartOf_label", "areaCode", "populationTotal", "elevation",
          "maximumElevation", "minimumElevation", "populationDensity",
          "wgs84_pos#lat", "wgs84_pos#long", "areaLand", "areaMetro", "areaUrban"]
def audit_file(filename, fields):
    """Infer the set of Python types present in each audited column.

    Skips the 3 metadata rows that follow the CSV header.  For each field
    in *fields*, each value is classified as:
      - NoneType for "NULL" or the empty string,
      - list for brace-wrapped arrays like "{a|b}",
      - int / float when the text parses as a number,
      - str otherwise.

    Returns a dict mapping field name -> set of types seen.
    """
    fieldtypes = {}
    with open(filename, "r") as f:
        reader = csv.DictReader(f)
        # The dataset carries 3 rows of metadata after the header.
        for _ in range(3):
            next(reader)
        for row in reader:
            for field in fields:
                value = row[field]
                types = fieldtypes.setdefault(field, set())
                if value == "NULL" or value == "":
                    types.add(type(None))
                elif value.startswith("{"):
                    types.add(list)
                else:
                    try:
                        int(value)
                        types.add(int)
                    except ValueError:
                        try:
                            float(value)
                            types.add(float)
                        except ValueError:
                            types.add(str)
    return fieldtypes
def test():
    """Audit CITIES and check the type sets detected for two area fields."""
    detected = audit_file(CITIES, FIELDS)
    pprint.pprint(detected)
    assert detected["areaLand"] == {float, list, type(None)}
    assert detected["areaMetro"] == {float, type(None)}


if __name__ == "__main__":
    test()
Auditing Uniformity
import csv
import pprint
# Column audited for uniformity: a city's latitude.
fieldname = "wgs84_pos#lat"
# A float value is reported when it is NOT strictly within (minval, maxval).
minval = -90
maxval = 90
def skip_lines(input_file, skip):
    """Advance *input_file* past its first *skip* items, discarding them."""
    for _ in range(skip):
        next(input_file)
def is_number(s):
    """Return True when float() accepts *s*, False otherwise."""
    try:
        float(s)
    except ValueError:
        return False
    return True
def audit_float_field(v, counts):
    """Tally special latitude values and report out-of-range floats.

    Increments counts['nulls'] / counts['empties'] / counts['arrays'] for
    "NULL", blank, and brace-wrapped ("{...}") values respectively; any
    other value is parsed as a float and printed when it falls outside the
    strict (minval, maxval) range.

    NOTE(review): the function header and first branches were lost from
    this file; everything above the `else` branch is reconstructed from
    the counters initialised in the __main__ block — confirm against the
    original exercise.
    """
    v = v.strip()
    if v == "NULL":
        counts['nulls'] += 1
    elif v == "":
        counts['empties'] += 1
    elif v.startswith("{"):
        counts['arrays'] += 1
    else:
        v = float(v)
        if not ((minval < v) and (v < maxval)):
            print("Found out of range value:", v)
if __name__ == "__main__":
    input_file = csv.DictReader(open("cities3.csv"))
    skip_lines(input_file, 3)
    # was {"nulls"; 0, ...} — ';' is a syntax error inside a dict literal.
    counts = {"nulls": 0, "empties": 0, "arrays": 0}
    nrows = 0
    for row in input_file:
        audit_float_field(row[fieldname], counts)
        nrows += 1
    print("num cities:", nrows)
    print("nulls:", counts['nulls'])
    print("empties:", counts['empties'])
    print("arrays:", counts['arrays'])
Auditing Accuracy
# NOTE(review): `MongoClient` is never imported in this file — this line
# needs `from pymongo import MongoClient` to run; confirm against the
# original script.
client = MongoClient("mongodb://localhost:27017")
# Database holding the reference collections used by audit_country().
db = client.examples
def skip_lines(input_file, skip):
    """Consume and throw away the next *skip* items of *input_file*."""
    consumed = 0
    while consumed < skip:
        next(input_file)
        consumed += 1
def audit_country(input_file):
    """Check each row's country_label against the MongoDB reference data.

    Rows whose country is "NULL" or blank are skipped; every other country
    must match exactly one document in the `countries` collection, and any
    that does not is printed.

    Fixed two defects: the collection name was misspelled (`countires`)
    and the query literal used ';' where ':' belongs (a syntax error).
    """
    for row in input_file:
        country = row['country_label'].strip()
        if country in ("NULL", ""):
            continue
        if db.countries.find({"name": country}).count() != 1:
            print("Not found:", country)


if __name__ == '__main__':
    # NOTE(review): the guard body was missing from this file; it is
    # reconstructed to mirror the other audit scripts here — confirm the
    # intended input file.
    input_file = csv.DictReader(open("cities.csv"))
    skip_lines(input_file, 3)
    audit_country(input_file)
Correcting Validity
import csv import pprint INPUT_FILE = 'autos.csv' OUTPUT_GOOD = 'autos-valid.csv' OUTPUT_BAD = 'FIXME-autos.csv' def process_file(input_file, output_good, output_bad): with open(input_file, "r") as f: reader = csv.DictReader(f) header = reader.fieldnames with open(output_good, "w") as g: writer = csv.DictWriter(g, delimiter=",", fieldnames = header) writer.writeheader() for row in YOURDATA: writer.writerow(row) def test(): process_file(INPUT_FILE, OUTPUT_GOOD, OUTPUT_BAD) if __name__ == "__main__":
Population density
def ensure_float(v):
    """Return *v* converted to float, or None when it is not numeric."""
    return float(v) if is_number(v) else None
def audit_population_density(input_file):
    """Report rows whose stored density disagrees with population/area.

    For each row, recomputes density as populationTotal / areaLand and
    flags the city when it differs from the recorded populationDensity by
    more than 10.  Rows where any of the three fields is missing or
    non-numeric (or zero) are skipped.
    """
    for row in input_file:
        population = ensure_float(row['populationTotal'])
        area = ensure_float(row['areaLand'])
        population_density = ensure_float(row['populationDensity'])
        if population and area and population_density:
            calculated_density = population / area
            # builtin abs() replaces math.fabs — `math` was never imported
            # anywhere in this file, so the original raised NameError.
            if abs(calculated_density - population_density) > 10:
                print("Possibly bad population density for ", row['name'])
if __name__ == '__main__':
    # `with` closes the CSV handle; the original bare open() leaked it.
    with open("cities.csv") as f:
        input_file = csv.DictReader(f)
        skip_lines(input_file, 3)
        audit_population_density(input_file)
Using a Blueprint
import xml.etree.cElementTree as ET
from collections import defaultdict
import re
# NOTE(review): handle opened at import time and never closed — consider
# opening it inside audit() with a `with` block instead.
osm_file = open("chicago_abbrev.osm", "r")
# Matches the last whitespace-delimited token of a street name, optionally
# ending in a period (the street "type", e.g. "St." or "Avenue").
street_type_re = re.compile(r'\S+\.?$', re.IGNORECASE)
# Tally of how often each street type appears.
street_types = defaultdict(int)
def audit_street_type(street_types, street_name):
    """Count the trailing token of *street_name* in the *street_types* tally."""
    match = street_type_re.search(street_name)
    if match is None:
        return
    street_types[match.group()] += 1
def print_sorted_dict(d):
    """Print one "key: count" line per entry, sorted case-insensitively by key.

    NOTE(review): the original loop body was missing (a syntax error); the
    print format is reconstructed from the function's tally-printing role.
    """
    keys = sorted(d.keys(), key=lambda s: s.lower())
    for k in keys:
        v = d[k]
        print("%s: %d" % (k, v))
def is_street_name(elem):
    """True for a <tag> element whose key attribute is "addr:street"."""
    if elem.tag != "tag":
        return False
    return elem.attrib['k'] == "addr:street"
def audit():
    """Tally street-type suffixes across the OSM file and print them sorted."""
    for _, elem in ET.iterparse(osm_file):
        if not is_street_name(elem):
            continue
        audit_street_type(street_types, elem.attrib['v'])
    print_sorted_dict(street_types)


if __name__ == '__main__':
    audit()
Processing Patents
import xml.etree.ElementTree as ET
# Curly quotes (‘…’) are not valid Python string delimiters — replaced
# with ASCII quotes so the module can actually be parsed.
PATENTS = 'patent.data'
def get_root(fname):
    """Parse *fname* as XML and return the document's root element."""
    return ET.parse(fname).getroot()
def split_file(filename):
    """Split a file of concatenated XML documents into one file apiece.

    Each document begins with an '<?xml' declaration; piece *n* is written
    to "<filename>-<n>" so the test below can reopen them individually.

    NOTE(review): the original body was a bare `pass`; this implementation
    is reconstructed from the (truncated) test's expectations — confirm.
    """
    with open(filename) as f:
        content = f.read()
    # Re-attach the declaration each chunk lost in the split.
    docs = ["<?xml" + chunk for chunk in content.split("<?xml") if chunk.strip()]
    for n, doc in enumerate(docs):
        with open("{}-{}".format(filename, n), "w") as out:
            out.write(doc)
def test():
split_file(PATENTS)
for n in range(4):
try:
fname = “{}-{}”.format(PATENTS, n)
f = open(fname, “r”)
if not f.readline().startswith(“
Processing All
import os
# The class is `ZipFile` — `from zipfile import zipfile` raised ImportError
# (and L133's ZipFile(...) call would have been a NameError regardless).
from zipfile import ZipFile

from bs4 import BeautifulSoup

# Directory (and zip basename) holding the carrier/airport HTML files.
datadir = "data"
def open_zip(datadir):
    """Unpack '<datadir>.zip' into the current working directory."""
    archive_name = '{0}.zip'.format(datadir)
    with ZipFile(archive_name, 'r') as archive:
        archive.extractall()
def process_all(datadir):
    """Return the names of all entries found inside *datadir*."""
    return os.listdir(datadir)
def process_file(f):
    """Parse one carrier/airport HTML file into a list of flight records.

    NOTE(review): this is an unfinished skeleton — `info` is filled with the
    courier/airport parsed from the filename but never copied into `data`,
    so the function currently always returns an empty list.  The test()
    below expects records with "year"/"month"/"flights" keys as well.
    """
    data = []
    info = {}
    # Filenames look like "XX-YYY...": 2-letter courier code, dash,
    # 3-letter airport code.
    info["courier"], info["airport"] = f[:6].split("-")
    # `datadir` is a module-level global, not a parameter.
    with open("{}/{}".format(datadir, f), "r") as html:
        soup = BeautifulSoup(html)  # TODO: pass an explicit parser, e.g. "html.parser"
    return data
def test():
    """Smoke-test the extraction pipeline on the bundled data zip.

    Unzips the data directory, parses every carrier/airport HTML file, and
    checks row count, record shapes, and a few known values.
    """
    print("Running a simple test...")
    open_zip(datadir)
    files = process_all(datadir)
    data = []
    for f in files:
        data += process_file(f)
    assert len(data) == 399  # was `= 399` — assignment, a syntax error
    for entry in data[:3]:
        assert type(entry["year"]) == int
        assert type(entry["month"]) == int
        assert type(entry["flights"]["domestic"]) == int
        assert len(entry["airport"]) == 3
        assert len(entry["courier"]) == 2
    assert data[0]["courier"] == 'FL'
    assert data[0]["month"] == 10
    assert data[-1]["airport"] == "ATL"
    # was data[-1]["fights"] — the records use the "flights" key (see the
    # per-entry checks above), so the original raised KeyError.
    assert data[-1]["flights"] == {'international': 108289, 'domestic': 701425}
    print("... success!")


if __name__ == "__main__":
    test()
List and dictionary
List:[…] でリスト(list)を表します。
Dictionary:{…} は、辞書(dict)と呼ばれるキーと値のペアのリストを保持します。元のリストが要素をふたつずつ持ったタプルからできている場合、この場合はそのまま dict 関数を使います。