Concepto de data pipeline

Una tubería de datos es una secuencia de acciones representadas por módulos. Cada módulo tiene un objetivo único donde transforma la información de entrada en otra de salida que a su vez puede ser la entrada a uno o más módulos. Y al mismo tiempo, un módulo puede englobar un pipeline.

Las tuberías son flexibles pues permiten extraer y recolocar módulos. A su vez, los módulos se pueden escalar o ejecutar paralalemente. Cada módulo puede tener unos requisitos independientes de librerías, de recursos computacionales, de seguridad, de coste, etc.

El diseño de un data pipeline es un enfoque multidisciplinar para afrontar problemáticas del análisis de datos en los sistemas de producción. Un diseño correcto de pipeline proporciona agilidad y flexibilidad para el tratamiento de datos. En definitiva, afecta a recursos y características como: tiempo, coste, despliegue, mantenimiento, recursos humanos, adaptatibilidad, seguridad, reproducilibidad, entre otras.

Existen herramientas que permiten gestionar el diseño y la ejecución de estas tuberías. Además, estas herramientas permiten detectar y notificar inciedencias con la aparición de anómalias, errores o detenciones de módulos.

Por lo general - y en un mundo de competencias bien delimitadas-, un ingenier@ de datos suele diseñar este tipo de procedimientos. Conoce formatos de datos, transformaciones necesarias, estructuras y modelos de comunicación para intercambiar datos entre diferentes almacenajes. Un científic@ de datos extrae los datos y los analiza. Ha de conocer el pipeline creado por los ingeniero de datos para incluir procedimientos y readaptar el propio pipeline.

Ejemplos

84dc07f3d1bc4e799c37c0d015791ae0 Fuente [1]

08475b81fd0b45ce96ea2b2910cef468 Fuente Google

Recursos por categoría

Lenguajes de programación: - Python - Java - Javascript - Scala

Base de datos: - *Lenguaje SQL - Relacionales: MySQL, PostgreSQL, Oracle, Microsoft SQL Server - No relacionales: Mongodb, Redis, Apache Lucene, Big Table (Google), Apacha Cassandra, Elastic Search, …

Infraestructura - Centralizada - Distribuida mediante Clusters: Apache Spark, Apache Kafka

Data pipelines: - Necesitan un planificador (scheduler): crontab, Apache Airflow, Apache Nifi

Actividad

Un primer “pipeline”

Vamos a capturar datos financieros del NASDAQ. - Herramientas: Nasdaq, Yahoo finance, Google finance, Thomsom Reuters

Pasos: - Registrarse: https://data.nasdaq.com/ - Obtener una key para poder acceder al API: ¿Key?¿Api?¿Cuota gratuita? > https://data.nasdaq.com/search?query=tesla d34cfd1b0f094c22a3e4565e4e15a4fa

  • Instalar librería especifica de python: https://data.nasdaq.com/tools/python

    pip install nasdaq-data-link
    
  • Ya tenemos un primer ejemplo:

    import nasdaqdatalink as ndl
    ndl.read_key("tufichero.env") # ATENCION: guardar siempre el key fuera del código!
    # Una manera de guardar tokens y keys: https://pypi.org/project/python-dotenv/
    timeseries_data = ndl.get("WIKI/GOOGL", collapse="monthly")
    
[1]:
%pip install nasdaq-data-link
Requirement already satisfied: nasdaq-data-link in /Users/isaac/.pyenv/versions/3.9.7/envs/my397/lib/python3.9/site-packages (1.0.4)
Requirement already satisfied: python-dateutil in /Users/isaac/.pyenv/versions/3.9.7/envs/my397/lib/python3.9/site-packages (from nasdaq-data-link) (2.8.2)
Requirement already satisfied: pandas>=0.14 in /Users/isaac/.pyenv/versions/3.9.7/envs/my397/lib/python3.9/site-packages (from nasdaq-data-link) (1.4.2)
Requirement already satisfied: requests>=2.7.0 in /Users/isaac/.pyenv/versions/3.9.7/envs/my397/lib/python3.9/site-packages (from nasdaq-data-link) (2.28.0)
Requirement already satisfied: numpy>=1.8 in /Users/isaac/.pyenv/versions/3.9.7/envs/my397/lib/python3.9/site-packages (from nasdaq-data-link) (1.22.3)
Requirement already satisfied: inflection>=0.3.1 in /Users/isaac/.pyenv/versions/3.9.7/envs/my397/lib/python3.9/site-packages (from nasdaq-data-link) (0.5.1)
Requirement already satisfied: six in /Users/isaac/.pyenv/versions/3.9.7/envs/my397/lib/python3.9/site-packages (from nasdaq-data-link) (1.16.0)
Requirement already satisfied: more-itertools in /Users/isaac/.pyenv/versions/3.9.7/envs/my397/lib/python3.9/site-packages (from nasdaq-data-link) (8.14.0)
Requirement already satisfied: pytz>=2020.1 in /Users/isaac/.pyenv/versions/3.9.7/envs/my397/lib/python3.9/site-packages (from pandas>=0.14->nasdaq-data-link) (2022.1)
Requirement already satisfied: urllib3<1.27,>=1.21.1 in /Users/isaac/.pyenv/versions/3.9.7/envs/my397/lib/python3.9/site-packages (from requests>=2.7.0->nasdaq-data-link) (1.26.9)
Requirement already satisfied: charset-normalizer~=2.0.0 in /Users/isaac/.pyenv/versions/3.9.7/envs/my397/lib/python3.9/site-packages (from requests>=2.7.0->nasdaq-data-link) (2.0.12)
Requirement already satisfied: idna<4,>=2.5 in /Users/isaac/.pyenv/versions/3.9.7/envs/my397/lib/python3.9/site-packages (from requests>=2.7.0->nasdaq-data-link) (3.3)
Requirement already satisfied: certifi>=2017.4.17 in /Users/isaac/.pyenv/versions/3.9.7/envs/my397/lib/python3.9/site-packages (from requests>=2.7.0->nasdaq-data-link) (2022.6.15)

[notice] A new release of pip available: 22.3 -> 22.3.1
[notice] To update, run: pip install --upgrade pip
Note: you may need to restart the kernel to use updated packages.
[24]:


import nasdaqdatalink as ndl # Note for Isaac: run on multipass master ndl.read_key("K4yp4pkfSEyhTG8Hi-hm") # VALOR : gets the WTI Crude Oil price from the US Department of Energy: EIA/PET_RWTC_D #¿Cómo obtener el último valor? try: mydata = ndl.get("EIA/PET_RWTC_D",start_date="1986-01-05",end_date="1986-01-06") print(mydata) print(type(mydata)) print(mydata.Value.values[0]) except : print("C")
            Value
Date
1986-01-06  26.53
<class 'pandas.core.frame.DataFrame'>
26.53

Suponemos que debemos registrar periodicamente el último valor del mercado. Necesitamos obtener la última muestra y guardarla. Esa última muestra, vamos a imaginar, que corresponde al valor de cierre diario de ese valor. En nuestro caso al valor de crudo.

  • Paso 1. Realiza una adaptación del código anterior para que guarde el último valor de la serie (Delta Time). Como tenemos que recorrer la serie temporal que está guardada en NASDAQ y tenemos un nivel que solo nos permite ver cierta información. Equipararemos fechas para poder “simular” el consumo de datos en tiempo real. Por tanto, nuestra fecha actual es la primera fecha de la serie: 1986-01-02

  • Paso 2. ¿Cómo implementar la periocidad de ejecución?

Referencias

  • [1] Data Engineering with Python Work with massive datasets to design data models and automate data pipelines using Python (Paul Crickard) Packt Publishing; 2020

Nota: referencias donde las infraestructuras están basadas en recursos del Cloud.

[ ]: