Gotta Catch 'Em All! Pokemon ELT with Meltano
This tutorial will show you how to load all Pokemons into your Data Warehouse with a custom Meltano extractor
Have you ever wondered if you could catch all Pokemon into a Data Warehouse? Follow this tutorial and you’ll be able to!
One of my favorite things about Meltano is the SDK they provide to create Singer Taps/Targets. By following this tutorial, you’ll be able to download the data from the PokéAPI, allowing you to have all the available data for every Pokemon in your database.
The first step to create your extractor (Tap) is to open a terminal and follow the official instructions:
# Install pipx if you haven't already
pip install pipx
pipx ensurepath
# Restart your terminal here, if needed, to get the updated PATH
pipx install cookiecutter
pipx install poetry
# Optional: Install Tox if you want to use it to run auto-formatters, linters, tests, etc.
pipx install tox
# Create your project
cookiecutter https://github.com/meltano/sdk --directory="cookiecutter/tap-template"
When you run the last step, you’ll be asked for a few parameters which will initialize your project.
With this done, the template that was created will allow you to start working. By examining the PokéAPI, the first thing you may notice is that there is no API key (So one less thing to worry about) and every api call starts with https://pokeapi.co/api/v2/, so let’s start with defining this as the default api URL. We’ll open tap_pokemon/tap.py and change the settings:
config_jsonschema = th.PropertiesList(
th.Property(
"api_url",
th.StringType,
default="https://pokeapi.co/api/v2",
description="The url for the Pokemon API",
),
).to_dict()
This will enable us to change the API endpoint without touching the tap if we need to. To make the tap use this, we have to edit tap_pokemon/client.py so url_base uses it:
@property
def url_base(self) -> str:
"""Return the API URL root, configurable via tap settings."""
return self.config['api_url']
Also, this API has next property based pagination (e.g. the next page comes in the “next” property of the JSON result), so we can enable it by adding these:
# With imports:
from urllib.parse import parse_qsl
# Before class PokemonStream(...
class MyPaginator(BaseHATEOASPaginator):
def get_next_url(self, response):
data = response.json()
return data.get("next")
# Replacing get_url_params
def get_new_paginator(self):
return MyPaginator()
def get_url_params(
self,
context: dict | None, # noqa: ARG002
next_page_token: Any | None, # noqa: ANN401
) -> dict[str, Any]:
"""Return a dictionary of values to be used in URL parameterization.
Args:
context: The stream context.
next_page_token: The next page index or value.
Returns:
A dictionary of URL query parameters.
"""
params = {}
# Next page token is a URL, so we can to parse it to extract the query string
if next_page_token:
params.update(parse_qsl(next_page_token.query))
return params
We’ll also rename the PokemonStream class to BasePokemonStream because of a name conflict. Now we’re ready to define our “streams”, each stream in a singer tap usually maps to one table or file in the database (Some targets may decide to create multiple tables/files depending on how they work and what’s the shape of the data).
Looking at the PokéAPI, there is a /pokemon endpoint that will return the ids of all Pokemon, but without anything other than the name. So let’s start by creating a stream that gets all of these:
class PokemonIDStream(BasePokemonStream):
"""Define custom stream."""
name = "pokemon_id"
path = "/pokemon"
primary_keys: t.ClassVar[list[str]] = ["id"]
replication_key = None
records_jsonpath = "$.results[*]"
schema = th.PropertiesList(
th.Property("name", th.StringType),
th.Property(
"id",
th.IntegerType,
description="The pokemon's ID",
)
).to_dict()
def post_process(
self,
row: dict,
context: dict | None = None, # noqa: ARG002
) -> dict | None:
row['id'] = int(row['url'].split('/')[-2])
return row
def get_child_context(self, record: dict, context: t.Optional[dict]) -> dict:
"""Return a context dictionary for child streams."""
return {
"id": record["id"]
}
Going step by step, this stream defines:
name: The name of the stream (Will typically generate a table/file with this name)
path: The API endpoint where this dataset resides
primary_keys: The unique identifier/s for this record
replication_key: We’ll not use it here, but this key is used to do incremental loads (e.g. if we had a timestamp field, we could find all Pokemon that were added after the last time we ran this process)
records_jsonpath: Which json property has the data
schema: As the name suggests, this property let’s us explain to the Meltano SDK what is the shape of the data, allowing the target we use to create the appropiate data model on the other end.
post_process: This function runs over each row, we use it here to extract the id from the URL to pass it along
get_child_context: This function exposes data for child streams to use
This last function is important, since we want to extract the full data of each Pokemon, but that entails doing an API call to https://pokeapi.co/api/v2/pokemon/{id or name}/ for every record - So how do we tell this tap, to call the API for each Pokemon?
Enter Parent/Child streams!
When defining the new stream, we’ll tell it that parent_stream_type = PokemonIDStream, and the path will be path = "/pokemon/{id}". Full code below:
# NamedAPIResource is a common type in the API so we define it here to avoid redefining it each time in the schema
NamedAPIResource = th.ObjectType(
th.Property("name", th.StringType),
th.Property("url", th.StringType),
)
class PokemonStream(BasePokemonStream):
"""Define custom stream."""
name = "pokemon"
path = "/pokemon/{id}"
primary_keys: t.ClassVar[list[str]] = ["id"]
parent_stream_type = PokemonIDStream
replication_key = None
schema = th.PropertiesList(
th.Property("id", th.IntegerType),
th.Property("name", th.StringType),
th.Property("order", th.IntegerType),
th.Property("base_experience", th.IntegerType),
th.Property("height", th.IntegerType),
th.Property("weight", th.IntegerType),
th.Property("abilities", th.ArrayType(th.ObjectType(
th.Property("ability", NamedAPIResource),
th.Property("is_hidden", th.BooleanType),
th.Property("slot", th.IntegerType)
))),
th.Property("forms", th.ArrayType(NamedAPIResource)),
th.Property("game_indices", th.ArrayType(th.ObjectType(
th.Property("game_index", th.IntegerType),
th.Property("version", NamedAPIResource)),
)),
th.Property("held_items", th.ArrayType(th.ObjectType(
th.Property("item", NamedAPIResource),
th.Property("version_details", th.ArrayType(th.ObjectType(
th.Property("version", NamedAPIResource),
th.Property("rarity", th.IntegerType),
))
)))),
th.Property("is_default", th.BooleanType),
th.Property("location_area_encounters", th.StringType),
th.Property("moves", th.ArrayType(th.ObjectType(
th.Property("move", NamedAPIResource),
th.Property("version_group_details", th.ArrayType(th.ObjectType(
th.Property("move_learn_method", NamedAPIResource),
th.Property("version_group", NamedAPIResource),
th.Property("level_learned_at", th.IntegerType),
))
)))),
th.Property("past_types", th.ArrayType(th.ObjectType(
th.Property("generation", NamedAPIResource),
th.Property("types", th.ArrayType(th.ObjectType(
th.Property("type", NamedAPIResource),
th.Property("slot", th.IntegerType),
))
)))),
th.Property("species", NamedAPIResource),
th.Property("sprites", th.ObjectType(
th.Property("back_default", th.StringType),
th.Property("back_female", th.StringType),
th.Property("back_shiny", th.StringType),
th.Property("back_shiny_female", th.StringType),
th.Property("front_default", th.StringType),
th.Property("front_female", th.StringType),
th.Property("front_shiny", th.StringType),
th.Property("front_shiny_female", th.StringType),
th.Property("other", th.ObjectType(additional_properties=True)),
th.Property("versions", th.ObjectType(additional_properties=True)),
)),
th.Property("stats", th.ArrayType(th.ObjectType(
th.Property("base_stat", th.IntegerType),
th.Property("effort", th.IntegerType),
th.Property("stat", NamedAPIResource)
))),
).to_dict()
And that’s it! Now we have all the code necessary to load all Pokemon into our target! We can use e.g. target-jsonl to save it into JSON Lines, target-csv to build a chunky csv file, or target-postgres to do some SQL magic with it. This is how the pokemon table looks like in Postgres after running:
meltano elt tap-pokemon target-postgres
If you want to see the finished worked example, you can watch or download it from:
https://github.com/sicarul/tap-pokemon
Happy ELT!