Concepto de data pipeline
Una tubería de datos es una secuencia de acciones representadas por módulos. Cada módulo tiene un objetivo único donde transforma la información de entrada en otra de salida que a su vez puede ser la entrada a uno o más módulos. Y al mismo tiempo, un módulo puede englobar un pipeline.
Las tuberías son flexibles pues permiten extraer y recolocar módulos. A su vez, los módulos se pueden escalar o ejecutar paralalemente. Cada módulo puede tener unos requisitos independientes de librerías, de recursos computacionales, de seguridad, de coste, etc.
El diseño de un data pipeline es un enfoque multidisciplinar para afrontar problemáticas del análisis de datos en los sistemas de producción. Un diseño correcto de pipeline proporciona agilidad y flexibilidad para el tratamiento de datos. En definitiva, afecta a recursos y características como: tiempo, coste, despliegue, mantenimiento, recursos humanos, adaptatibilidad, seguridad, reproducilibidad, entre otras.
Existen herramientas que permiten gestionar el diseño y la ejecución de estas tuberías. Además, estas herramientas permiten detectar y notificar inciedencias con la aparición de anómalias, errores o detenciones de módulos.
Por lo general - y en un mundo de competencias bien delimitadas-, un ingenier@ de datos suele diseñar este tipo de procedimientos. Conoce formatos de datos, transformaciones necesarias, estructuras y modelos de comunicación para intercambiar datos entre diferentes almacenajes. Un científic@ de datos extrae los datos y los analiza. Ha de conocer el pipeline creado por los ingeniero de datos para incluir procedimientos y readaptar el propio pipeline.
Ejemplos
Fuente [1]
Fuente Google
Recursos por categoría
Lenguajes de programación: - Python - Java - Javascript - Scala
Base de datos: - *Lenguaje SQL - Relacionales: MySQL, PostgreSQL, Oracle, Microsoft SQL Server - No relacionales: Mongodb, Redis, Apache Lucene, Big Table (Google), Apacha Cassandra, Elastic Search, …
Infraestructura - Centralizada - Distribuida mediante Clusters: Apache Spark, Apache Kafka
Data pipelines: - Necesitan un planificador (scheduler): crontab, Apache Airflow, Apache Nifi
Actividad
Un primer “pipeline”
Vamos a capturar datos financieros del NASDAQ. - Herramientas: Nasdaq, Yahoo finance, Google finance, Thomsom Reuters
Pasos: - Registrarse: https://data.nasdaq.com/ - Obtener una key para poder acceder al API: ¿Key?¿Api?¿Cuota gratuita? > https://data.nasdaq.com/search?query=tesla
Instalar librería especifica de python: https://data.nasdaq.com/tools/python
pip install nasdaq-data-link
Ya tenemos un primer ejemplo:
import nasdaqdatalink as ndl ndl.read_key("tufichero.env") # ATENCION: guardar siempre el key fuera del código! # Una manera de guardar tokens y keys: https://pypi.org/project/python-dotenv/ timeseries_data = ndl.get("WIKI/GOOGL", collapse="monthly")
[1]:
%pip install nasdaq-data-link
Requirement already satisfied: nasdaq-data-link in /Users/isaac/.pyenv/versions/3.9.7/envs/my397/lib/python3.9/site-packages (1.0.4)
Requirement already satisfied: python-dateutil in /Users/isaac/.pyenv/versions/3.9.7/envs/my397/lib/python3.9/site-packages (from nasdaq-data-link) (2.8.2)
Requirement already satisfied: pandas>=0.14 in /Users/isaac/.pyenv/versions/3.9.7/envs/my397/lib/python3.9/site-packages (from nasdaq-data-link) (1.4.2)
Requirement already satisfied: requests>=2.7.0 in /Users/isaac/.pyenv/versions/3.9.7/envs/my397/lib/python3.9/site-packages (from nasdaq-data-link) (2.28.0)
Requirement already satisfied: numpy>=1.8 in /Users/isaac/.pyenv/versions/3.9.7/envs/my397/lib/python3.9/site-packages (from nasdaq-data-link) (1.22.3)
Requirement already satisfied: inflection>=0.3.1 in /Users/isaac/.pyenv/versions/3.9.7/envs/my397/lib/python3.9/site-packages (from nasdaq-data-link) (0.5.1)
Requirement already satisfied: six in /Users/isaac/.pyenv/versions/3.9.7/envs/my397/lib/python3.9/site-packages (from nasdaq-data-link) (1.16.0)
Requirement already satisfied: more-itertools in /Users/isaac/.pyenv/versions/3.9.7/envs/my397/lib/python3.9/site-packages (from nasdaq-data-link) (8.14.0)
Requirement already satisfied: pytz>=2020.1 in /Users/isaac/.pyenv/versions/3.9.7/envs/my397/lib/python3.9/site-packages (from pandas>=0.14->nasdaq-data-link) (2022.1)
Requirement already satisfied: urllib3<1.27,>=1.21.1 in /Users/isaac/.pyenv/versions/3.9.7/envs/my397/lib/python3.9/site-packages (from requests>=2.7.0->nasdaq-data-link) (1.26.9)
Requirement already satisfied: charset-normalizer~=2.0.0 in /Users/isaac/.pyenv/versions/3.9.7/envs/my397/lib/python3.9/site-packages (from requests>=2.7.0->nasdaq-data-link) (2.0.12)
Requirement already satisfied: idna<4,>=2.5 in /Users/isaac/.pyenv/versions/3.9.7/envs/my397/lib/python3.9/site-packages (from requests>=2.7.0->nasdaq-data-link) (3.3)
Requirement already satisfied: certifi>=2017.4.17 in /Users/isaac/.pyenv/versions/3.9.7/envs/my397/lib/python3.9/site-packages (from requests>=2.7.0->nasdaq-data-link) (2022.6.15)
[notice] A new release of pip available: 22.3 -> 22.3.1
[notice] To update, run: pip install --upgrade pip
Note: you may need to restart the kernel to use updated packages.
[24]:
import nasdaqdatalink as ndl # Note for Isaac: run on multipass master
ndl.read_key("K4yp4pkfSEyhTG8Hi-hm")
# VALOR : gets the WTI Crude Oil price from the US Department of Energy: EIA/PET_RWTC_D
#¿Cómo obtener el último valor?
try:
mydata = ndl.get("EIA/PET_RWTC_D",start_date="1986-01-05",end_date="1986-01-06")
print(mydata)
print(type(mydata))
print(mydata.Value.values[0])
except :
print("C")
Value
Date
1986-01-06 26.53
<class 'pandas.core.frame.DataFrame'>
26.53
Suponemos que debemos registrar periodicamente el último valor del mercado. Necesitamos obtener la última muestra y guardarla. Esa última muestra, vamos a imaginar, que corresponde al valor de cierre diario de ese valor. En nuestro caso al valor de crudo.
Paso 1. Realiza una adaptación del código anterior para que guarde el último valor de la serie (Delta Time). Como tenemos que recorrer la serie temporal que está guardada en NASDAQ y tenemos un nivel que solo nos permite ver cierta información. Equipararemos fechas para poder “simular” el consumo de datos en tiempo real. Por tanto, nuestra fecha actual es la primera fecha de la serie: 1986-01-02
Paso 2. ¿Cómo implementar la periocidad de ejecución?
Referencias
[1] Data Engineering with Python Work with massive datasets to design data models and automate data pipelines using Python (Paul Crickard) Packt Publishing; 2020
Nota: referencias donde las infraestructuras están basadas en recursos del Cloud.
[ ]: