In this post I describe how I stored my NBA game simulator data and built an ETL pipeline for it. I got some 2015-2016 data from neilmj’s GitHub page. I originally stored it locally but quickly moved it to AWS’s S3 storage service. The AWS interface is really user friendly, so I initially just created a bucket and uploaded all my files through the web console. Later on, however, I used Python’s boto3 package for this process, and I outline the steps below.
Storage
I installed boto3 in my virtual environment,
pip install boto3
and set up my security credentials by saving
[default]
aws_access_key_id = YOUR_ACCESS_KEY
aws_secret_access_key = YOUR_SECRET_KEY
in a file ~/.aws/credentials. The boto3 documentation is pretty good and I basically used it to guide my work. Step 1 is creating a connection and a bucket (S3’s top-level container, roughly a directory):
import boto3
s3 = boto3.resource('s3')
s3.create_bucket(Bucket='nba-data')
Adding each file to the bucket requires the ‘s3.Object’ class and its ‘put’ method. You specify the bucket_name and the key (the name under which the file is stored in S3). I wanted to create a subdirectory for the season ‘2015-2016’, but couldn’t figure out how to do that using boto3 (it is straightforward on the web interface). As far as I can tell, S3 has no real directories: the folders shown in the web interface are just key prefixes. So to get the same effect, you can include the season as a prefix when creating the key.
# open the archive in a with-block so the file handle is closed after the upload
with open('/path/to/onegame.7z', 'rb') as f:
    s3.Object(bucket_name='nba-data', key='2015-2016/onegame.7z').put(Body=f)
I created a function to do this for all files. Pretty painless procedure.
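A function along these lines could handle the whole directory. This is only a sketch: the names `make_key` and `upload_season` are made up for illustration, and the bucket is passed in as an argument (anything that behaves like boto3’s `s3.Bucket` and offers `upload_file`), so the logic can be tried out without AWS credentials.

```python
import os


def make_key(season, filepath):
    """Build the S3 key, using the season as a prefix (the "folder")."""
    return '{}/{}'.format(season, os.path.basename(filepath))


def upload_season(bucket, local_dir, season, extension='.7z'):
    """Upload every matching file in local_dir and return the keys used.

    `bucket` is expected to behave like boto3's s3.Bucket, i.e. to
    provide upload_file(Filename=..., Key=...).
    """
    keys = []
    for name in sorted(os.listdir(local_dir)):
        if not name.endswith(extension):
            continue  # skip anything that is not a game archive
        key = make_key(season, name)
        bucket.upload_file(Filename=os.path.join(local_dir, name), Key=key)
        keys.append(key)
    return keys


# Usage with real credentials (not run here):
#   import boto3
#   bucket = boto3.resource('s3').Bucket('nba-data')
#   upload_season(bucket, '/path/to/games', '2015-2016')
```

Returning the list of keys makes it easy to check afterwards which files ended up under the season prefix.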
ETL
The original unzipped raw data comes in the form of a .json file. In my simulator post, I describe the content of the data and what information I chose to extract and use. Here I focus a little more on the extraction. I first created a pipeline that gets the data from S3 and stores the transformed tables in a Postgres database running locally on my computer. If you’re interested in doing the same, make sure you have PostgreSQL installed.
My computer can only hold so much, so the goal is to eventually store the tables in AWS’s Postgres database.
I created a PostgresETL class that does all of this. If you’re in a hurry, you can skip to the bottom to take a look at the completed class. Otherwise, follow me as I go through my thought process step by step.
Step 1: I created a function that downloads a particular game from S3 and stores it in a temporary folder. A second function then unpacks the .7z file into my chosen directory and deletes the archive. I use Python’s patoolib package to extract the contents of the .7z file. If there is a better library to do this, please let me know. The patoolib documentation wasn’t that helpful for me.
import os
import tempfile

import boto3
import patoolib  # pip install patool
from botocore.exceptions import ClientError

s3 = boto3.resource('s3')


def s3download(bucket_name, filename):
    # flatten the key so that '2015-2016/onegame.7z' becomes a valid file name
    temp_name = filename.replace('/', '.')
    filepath = os.path.join(tempfile.gettempdir(), temp_name)
    try:
        bucket = s3.Bucket(bucket_name)
        bucket.download_file(Key=filename, Filename=filepath)
        return filepath
    except ClientError:
        # e.g. the key does not exist; the caller gets None back
        return None


# DATASETS_DIR is a path constant defined elsewhere in the project
def extract2dir(filepath, directory=DATASETS_DIR):
    # unpack the archive, which contains the game's .json file
    patoolib.extract_archive(filepath, outdir=directory)
    # remove .7z file
    os.remove(filepath)
Step 2: I transformed the raw data into the relevant pandas dataframes and saved them to .csv files using a method I call tables_from_json. It is written out in detail in the finished code below.
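To give a rough idea of what this step involves, here is a sketch that flattens a toy game record into `games` and `players` rows and writes one .csv per table. The JSON layout, the field names and the helpers `tables_from_game` and `save_tables` are all assumptions for illustration, not the real schema or my actual helpers, and the sketch uses the standard-library csv module instead of pandas to stay dependency-free.

```python
import csv
import os


def tables_from_game(game):
    """Turn one (toy) game dict into {table_name: (header, rows)}."""
    games = [(game['gameid'], game['gamedate'],
              game['home']['teamid'], game['visitor']['teamid'])]
    players = []
    for side in ('home', 'visitor'):
        team = game[side]
        for p in team['players']:
            players.append((p['playerid'], p['lastname'], p['firstname'],
                            team['teamid'], p['jersey'], p['position']))
    return {
        'games': (('id', 'start_time', 'home_team_id', 'visitor_team_id'),
                  games),
        'players': (('id', 'last_name', 'first_name', 'team_id',
                     'jersey_number', 'position'), players),
    }


def save_tables(tables, storage_dir):
    """Write each table to <storage_dir>/<table_name>.csv with a header row."""
    paths = []
    for name, (header, rows) in tables.items():
        path = os.path.join(storage_dir, name + '.csv')
        with open(path, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(header)
            writer.writerows(rows)
        paths.append(path)
    return paths
```

Naming each file after its table is a deliberate choice: it lets a later loading step work out the target table from the file name alone.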
Step 3: Next I created tables in Postgres. For all Postgres-Python connections, I used the psycopg2 library. If you don’t already have it installed, go ahead and pip install that joint. This Dataquest blog post was very helpful for getting started. To begin, you want to establish a connection to the Postgres server:
import psycopg2
connection = psycopg2.connect("host=localhost dbname=postgres user=postgres")
To actually do something to the database, that is, to run a command, you have to create a cursor object, which has an execute method:
cursor = connection.cursor()
cursor.execute('SELECT * FROM mytable')
And to finalize the execution of our query, or ‘transaction’, we have to call commit on the connection:
connection.commit()
psycopg2 opens a new transaction the first time a command is executed on a connection. Many queries can be executed through the cursor’s execute method, but their changes will not be persisted, or be visible to other connections, unless we commit the transaction.
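To see why the commit matters, here is a small sketch of the same execute-then-commit pattern. It uses Python’s built-in sqlite3 module as a stand-in for Postgres so that it runs without a server (psycopg2 exposes the same connect/cursor/execute/commit interface); the table `mytable` and the helper `count_rows` are made up for the example.

```python
import os
import sqlite3
import tempfile

# sqlite3 stands in for Postgres here; the database file lives in a temp dir
db_path = os.path.join(tempfile.mkdtemp(), 'demo.db')

writer = sqlite3.connect(db_path)
reader = sqlite3.connect(db_path)  # a second, independent connection

writer.execute('CREATE TABLE mytable (id INTEGER)')
writer.commit()


def count_rows(connection):
    """Count the rows of mytable as seen by the given connection."""
    cursor = connection.cursor()
    cursor.execute('SELECT COUNT(*) FROM mytable')
    return cursor.fetchone()[0]


cursor = writer.cursor()
cursor.execute('INSERT INTO mytable VALUES (1)')  # implicitly opens a transaction

before_commit = count_rows(reader)  # the insert is not visible to the reader yet
writer.commit()
after_commit = count_rows(reader)   # after the commit it is

print(before_commit, after_commit)

writer.close()
reader.close()
```

The second connection plays the role of any other client of the database: until the writer commits, its insert exists only inside the writer’s open transaction.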
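I used a script by djordon to wrap this three-step process in context managers. Note that it sets autocommit to True, so every statement is committed as soon as it runs and no explicit commit is needed: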
import contextlib

import psycopg2


# config is a dict of settings loaded elsewhere in the project;
# config['postgres'] holds the keyword arguments for psycopg2.connect
@contextlib.contextmanager
def get_connection():
    connection = psycopg2.connect(**config['postgres'])
    connection.set_client_encoding('utf-8')
    connection.autocommit = True
    try:
        yield connection
    finally:
        # always close the connection, even if the with-body raised
        connection.close()


@contextlib.contextmanager
def get_cursor():
    with get_connection() as connection:
        cursor = connection.cursor()
        try:
            yield cursor
        finally:
            cursor.close()
I originally created four tables using my NBA position data, but I show only two here. The rest are similar and you can find the details here if you’re interested.
def create_tables():
    query = """
        BEGIN TRANSACTION;
        CREATE TABLE games (
            id VARCHAR(20),
            start_time TIMESTAMP,
            home_team_id INTEGER,
            visitor_team_id INTEGER,
            PRIMARY KEY (id)
        );
        CREATE TABLE players (
            id INTEGER,
            last_name VARCHAR(20),
            first_name VARCHAR(20),
            team_id INTEGER,
            jersey_number INTEGER,
            position VARCHAR(5),
            start_date DATE,
            end_date DATE,
            PRIMARY KEY (id, start_date)
        );
        END TRANSACTION;
    """
    with get_cursor() as cursor:
        cursor.execute(query)
Step 4: Next I load my saved .csv files into the Postgres tables. This part is relatively straightforward and uses psycopg2’s copy_from method. The table name is taken from the file name, and the header row is skipped before copying.
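def load(filepath):
    with get_cursor() as cursor:
        with open(filepath) as f:
            # skip the header row; copy_from expects data rows only
            f.readline()
            # 'players.csv' is loaded into the table 'players'
            tablename = os.path.basename(filepath).replace('.csv', '')
            cursor.copy_from(f, tablename, sep=',', null='')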
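One caveat: as far as I understand, copy_from uses Postgres’s text COPY format, so a comma inside a quoted field (a name like "Smith, Jr.") would be read as a column separator. A possible alternative, sketched below, is psycopg2’s copy_expert with a COPY statement in csv format, which also lets Postgres skip the header row. The function name `load_csv` and the table-name check are assumptions made for this sketch, and the cursor is passed in as an argument so the function does not depend on a running database to be exercised.

```python
import os
import re


def load_csv(cursor, filepath):
    """Load <table>.csv into the table of the same name using CSV-format COPY.

    `cursor` is expected to be a psycopg2 cursor (anything with a
    copy_expert(sql, file) method works). Returns the SQL that was run.
    """
    tablename = os.path.splitext(os.path.basename(filepath))[0]
    # the table name is interpolated into SQL, so only allow plain identifiers
    if not re.fullmatch(r'[A-Za-z_][A-Za-z0-9_]*', tablename):
        raise ValueError('unexpected table name: {!r}'.format(tablename))
    sql = 'COPY {} FROM STDIN WITH (FORMAT csv, HEADER true)'.format(tablename)
    with open(filepath, newline='') as f:
        cursor.copy_expert(sql, f)
    return sql


# Usage with a real database (not run here):
#   with get_cursor() as cursor:
#       load_csv(cursor, '/path/to/players.csv')
```

In csv format an unquoted empty field is treated as NULL by default, which matches the null='' argument used with copy_from.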
Putting this all together gives:
import json
import os
import shutil
import tempfile

# s3download, extract2dir and get_cursor are the functions defined above;
# get_all_tables_dict, save_all_tables, DATASETS_DIR and DATATABLES_DIR
# are defined elsewhere in the project


class PostgresETL(object):

    def __init__(
        self,
        filename,
        bucket_base='nba-data',
        raw_data_dir=DATASETS_DIR, storage_dir=DATATABLES_DIR
    ):
        self.filename = filename
        self.bucket_base = bucket_base
        self.raw_data_dir = raw_data_dir
        self.storage_dir = storage_dir

    def extract_from_s3(self):
        filepath = s3download(
            bucket_name=self.bucket_base,
            filename=self.filename
        )
        # unpack into a fresh temporary directory
        tmp_dir = tempfile.mkdtemp()
        self.tmp_dir = tmp_dir
        extract2dir(filepath, directory=tmp_dir)
        return tmp_dir

    def tables_from_json(self):
        # takes the first file in the temporary directory to be the game's .json
        filepath = os.path.join(
            self.tmp_dir,
            os.listdir(self.tmp_dir)[0]
        )
        with open(filepath) as f:
            all_tables_dict = get_all_tables_dict(json.load(f))
        save_all_tables(all_tables_dict, storage_dir=self.storage_dir)
        # the raw .json is no longer needed once the .csv files are saved
        shutil.rmtree(self.tmp_dir)

    def load(self, filepath):
        with get_cursor() as cursor:
            with open(filepath) as f:
                # skip the header row
                f.readline()
                tablename = os.path.basename(filepath).replace('.csv', '')
                cursor.copy_from(f, tablename, sep=',', null='')

    def run(self):
        _ = self.extract_from_s3()
        self.tables_from_json()
        all_csvs = ['games.csv', 'players.csv',
                    'teams.csv', 'game_positions.csv']
        for f in all_csvs:
            filepath = os.path.join(self.storage_dir, f)
            self.load(filepath)
            # delete each .csv once it has been loaded
            os.remove(filepath)
The pipeline is then simply executed by instantiating the class and running it:
etl = PostgresETL(filename=filename)
etl.run()