# Audit one float-valued CSV column: count NULL / empty / array cells and
# report values that are not numbers or fall outside the allowed range.
import csv
import pprint

# Column under audit and the open interval its numeric values must lie in.
fieldname = "wgs84_pos#lat"
minval = -90
maxval = 90


def skip_lines(input_file, skip):
    """Advance the iterator ``input_file`` past its next ``skip`` items."""
    for _ in range(skip):
        next(input_file)


def is_number(s):
    """Return True when ``float(s)`` succeeds, False on ValueError."""
    try:
        float(s)
    except ValueError:
        return False
    return True


def is_array(v):
    """Return True when ``v`` looks like a multi-valued cell.

    NOTE(review): this helper was missing from the notes; treating a leading
    "{" as the array marker is an assumption -- confirm against the data.
    """
    return v.startswith("{")


def audit_float_field(v, counts):
    """Classify one raw cell value, updating ``counts`` in place.

    NOTE(review): only the final ``else`` branch survived in the notes. The
    earlier branches are reconstructed from the keys of ``counts``
    ("nulls", "empties", "arrays") -- confirm against the original script.

    Args:
        v: raw cell text from the CSV reader.
        counts: dict with integer entries "nulls", "empties" and "arrays".
    """
    v = v.strip()
    if v == "NULL":
        counts["nulls"] += 1
    elif v == "":
        counts["empties"] += 1
    elif is_array(v):
        counts["arrays"] += 1
    elif not is_number(v):
        print("Found non number: {}".format(v))
    else:
        v = float(v)
        # Both bounds are exclusive, exactly as in the original comparison.
        if not (minval < v < maxval):
            print("Found out of range value: {}".format(v))


if __name__ == "__main__":
    counts = {"nulls": 0, "empties": 0, "arrays": 0}
    nrows = 0
    # The context manager closes the input file once the audit is done.
    with open("cities3.csv") as csv_file:
        input_file = csv.DictReader(csv_file)
        skip_lines(input_file, 3)
        for row in input_file:
            audit_float_field(row[fieldname], counts)
            nrows += 1

    print("num cities: {}".format(nrows))
    print("nulls: {}".format(counts['nulls']))
    print("empties: {}".format(counts['empties']))
    print("arrays: {}".format(counts['arrays']))
Auditing Accuracy
# Audit the accuracy of the country column against a MongoDB reference
# collection.
import csv

from pymongo import MongoClient

# Handle to the "examples" database on the local MongoDB server.
client = MongoClient("mongodb://localhost:27017")
db = client.examples


def skip_lines(input_file, skip):
    """Advance the iterator ``input_file`` past its next ``skip`` items."""
    for _ in range(skip):
        next(input_file)


def audit_country(input_file):
    """Print each country label that does not match exactly one document.

    Labels that are "NULL" or empty after stripping whitespace are skipped.

    Args:
        input_file: iterable of row dicts that have a 'country_label' key.
    """
    for row in input_file:
        country = row['country_label'].strip()
        if country in ("NULL", ""):
            continue
        # NOTE(review): Cursor.count() is deprecated in newer PyMongo
        # releases; Collection.count_documents() is the replacement there.
        if db.countries.find({"name": country}).count() != 1:
            print("Not found: {}".format(country))


if __name__ == '__main__':
    # NOTE(review): the notes stop at the guard with no body. This driver
    # mirrors the sibling audit scripts; confirm the file name and the
    # number of header rows to skip.
    with open("cities.csv") as csv_file:
        input_file = csv.DictReader(csv_file)
        skip_lines(input_file, 3)
        audit_country(input_file)
Correcting Validity
import csv import pprint INPUT_FILE = 'autos.csv' OUTPUT_GOOD = 'autos-valid.csv' OUTPUT_BAD = 'FIXME-autos.csv' def process_file(input_file, output_good, output_bad): with open(input_file, "r") as f: reader = csv.DictReader(f) header = reader.fieldnames with open(output_good, "w") as g: writer = csv.DictWriter(g, delimiter=",", fieldnames = header) writer.writeheader() for row in YOURDATA: writer.writerow(row) def test(): process_file(INPUT_FILE, OUTPUT_GOOD, OUTPUT_BAD) if __name__ == "__main__":
Population density
# Cross-field audit: compare the recorded population density with
# population / area. Uses is_number() and skip_lines() defined earlier in
# this file, and the csv module.


def ensure_float(v):
    """Return ``float(v)`` when ``is_number(v)`` is true, otherwise None."""
    if is_number(v):
        return float(v)
    return None


def audit_population_density(input_file):
    """Print a warning for rows whose recorded density looks inconsistent.

    A row is reported when the absolute difference between
    populationTotal / areaLand and populationDensity exceeds 10.

    Args:
        input_file: iterable of row dicts with the keys 'populationTotal',
            'areaLand', 'populationDensity' and 'name'.
    """
    for row in input_file:
        population = ensure_float(row['populationTotal'])
        area = ensure_float(row['areaLand'])
        population_density = ensure_float(row['populationDensity'])
        # Truthiness skips rows where a value is missing (None) or zero,
        # which also rules out dividing by a zero area.
        if not (population and area and population_density):
            continue
        calculated_density = population / area
        # Builtin abs() gives the same result as math.fabs() for floats and
        # avoids depending on a math import this snippet never had.
        if abs(calculated_density - population_density) > 10:
            print("Possibly bad population density for  {}".format(row['name']))


if __name__ == '__main__':
    # The context manager closes the input file once the audit is done.
    with open("cities.csv") as csv_file:
        input_file = csv.DictReader(csv_file)
        skip_lines(input_file, 3)
        audit_population_density(input_file)
Using blue print
# Tally the last word of every addr:street value in an OSM file.
try:
    import xml.etree.cElementTree as ET
except ImportError:
    # The cElementTree alias no longer exists on current Python 3 releases.
    import xml.etree.ElementTree as ET
from collections import defaultdict
import re

# Path of the OSM extract. ET.iterparse() accepts a file name, so the file
# is opened only when audit() runs instead of at import time.
osm_file = "chicago_abbrev.osm"

# Matches the last run of non-whitespace characters in a street name.
street_type_re = re.compile(r'\S+\.?$', re.IGNORECASE)
street_types = defaultdict(int)


def audit_street_type(street_types, street_name):
    """Increment the tally for the final token of ``street_name``.

    Names in which the pattern finds no match (for example an empty string)
    are ignored.
    """
    m = street_type_re.search(street_name)
    if m:
        street_types[m.group()] += 1


def print_sorted_dict(d):
    """Print the entries of ``d`` ordered case-insensitively by key.

    NOTE(review): the loop body was missing from the notes; the
    "key: count" output format is an assumption -- confirm.
    """
    for k in sorted(d, key=lambda s: s.lower()):
        print("%s: %d" % (k, d[k]))


def is_street_name(elem):
    """Return True for a <tag> element whose 'k' attribute is addr:street."""
    return (elem.tag == "tag") and (elem.attrib['k'] == "addr:street")


def audit(source=None):
    """Tally street types from an OSM document and print the sorted tally.

    Args:
        source: file name or file object to parse; defaults to ``osm_file``.
    """
    if source is None:
        source = osm_file
    for _event, elem in ET.iterparse(source):
        if is_street_name(elem):
            audit_street_type(street_types, elem.attrib['v'])
    print_sorted_dict(street_types)


if __name__ == '__main__':
    audit()
Processing Patents
import xml.etree.ElementTree as ET
# Path of the patent data file that split_file() is run on.
PATENTS = 'patent.data'
def get_root(fname):
    """Parse the XML document ``fname`` and return its root element.

    Args:
        fname: file name or file object accepted by ``ET.parse``.
    """
    return ET.parse(fname).getroot()
def split_file(filename):
    """Placeholder: splitting ``filename`` into parts is not implemented.

    NOTE(review): the accompanying test opens "<filename>-<n>" for n in
    0..3, so the finished function presumably writes those files. The rule
    for where to split is not visible here -- confirm before implementing.
    """
    pass
def test():
split_file(PATENTS)
for n in range(4):
try:
fname = “{}-{}”.format(PATENTS, n)
f = open(fname, “r”)
if not f.readline().startswith(“
Processing All
# Extract flight data from every HTML file in a zipped data directory.
from bs4 import BeautifulSoup
from zipfile import ZipFile
import os

datadir = "data"


def open_zip(datadir):
    """Extract "<datadir>.zip" into the current working directory."""
    with ZipFile('{0}.zip'.format(datadir), 'r') as myzip:
        myzip.extractall()


def process_all(datadir):
    """Return the names of the entries in the directory ``datadir``."""
    return os.listdir(datadir)


def process_file(f):
    """Parse one HTML file from ``datadir`` and return its list of entries.

    NOTE(review): extraction is not implemented in these notes. ``info`` and
    ``soup`` are prepared but unused, so the returned list is always empty.
    The asserts in test() show the expected entry shape.

    Args:
        f: file name inside ``datadir``; its first six characters are split
            on "-" into the courier and airport codes.
    """
    data = []
    info = {}
    info["courier"], info["airport"] = f[:6].split("-")

    with open("{}/{}".format(datadir, f), "r") as html:
        # Parser named explicitly, matching the other snippets in this file.
        soup = BeautifulSoup(html, "lxml")

    return data


def test():
    """Unzip the data, process every file and check the combined result."""
    print("Running a simple test...")
    open_zip(datadir)
    files = process_all(datadir)
    data = []
    for f in files:
        data += process_file(f)

    assert len(data) == 399
    for entry in data[:3]:
        assert isinstance(entry["year"], int)
        assert isinstance(entry["month"], int)
        assert isinstance(entry["flights"]["domestic"], int)
        assert len(entry["airport"]) == 3
        assert len(entry["courier"]) == 2
    assert data[0]["courier"] == 'FL'
    assert data[0]["month"] == 10
    assert data[-1]["airport"] == "ATL"
    # Key corrected from "fights" to the "flights" key checked in the loop.
    assert data[-1]["flights"] == {'international': 108289, 'domestic': 701425}

    print("... success!")


if __name__ == "__main__":
    test()
List and dictionary
List:[…] でリスト(list)を表します。
Dictionary:{…} は、辞書(dict)と呼ばれるキーと値のペアのリストを保持します。元のリストが要素をふたつずつ持ったタプルからできている場合は、そのまま dict 関数を使います。
Airport List
# Build the list of airport codes from the options page.
from bs4 import BeautifulSoup

html_page = "options.html"


def extract_airports(page):
    """Return the airport codes found in the HTML file ``page``.

    NOTE(review): extraction is not implemented in these notes. The page is
    parsed into ``soup`` but nothing is read from it, so the returned list
    is always empty.
    """
    data = []
    with open(page, "r") as html:
        soup = BeautifulSoup(html, "lxml")

    return data


def test():
    """Check the extracted list against the expected size and members."""
    data = extract_airports(html_page)
    assert len(data) == 15
    assert "ATL" in data
    assert "ABR" in data


if __name__ == "__main__":
    test()
Carrier List
# Build the list of carrier codes from the options page and post a query
# form to the transtats site.
import requests
from bs4 import BeautifulSoup

html_page = "options.html"


def extract_carriers(page):
    """Return the carrier codes found in the HTML file ``page``.

    NOTE(review): extraction is not implemented in these notes. The page is
    parsed into ``soup`` but nothing is read from it, so the returned list
    is always empty.
    """
    data = []
    with open(page, "r") as html:
        soup = BeautifulSoup(html, "lxml")

    return data


def make_request(data):
    """POST the query form for one airport/carrier pair; return the body text.

    Args:
        data: dict with the keys "eventvalidation", "viewstate", "airport"
            and "carrier".
    """
    eventvalidation = data["eventvalidation"]
    viewstate = data["viewstate"]
    airport = data["airport"]
    carrier = data["carrier"]

    r = requests.post("http://www.transtats.bts.gov/Data_Elements.aspx?Data=2",
                      data={'AirportList': airport,
                            'CarrierList': carrier,
                            'Submit': 'Submit',
                            "__EVENTTARGET": "",
                            "__EVENTARGUMENT": "",
                            "__EVENTVALIDATION": eventvalidation,
                            "__VIEWSTATE": viewstate
                            })

    return r.text


def test():
    """Check the extracted list against the expected size and members."""
    data = extract_carriers(html_page)
    assert len(data) == 16
    assert "FL" in data
    assert "NK" in data


if __name__ == "__main__":
    test()