Perspectivas

Carga rápida de Amazon S3 a Actian Vector mediante Apache Spark

Corporación Actian

6 de diciembre de 2017

actian vector y apache spark

Una de las preguntas que nos hacen para las implementaciones de Vector Cloud es cómo cargar datos de Amazon S3 en Vector de forma rápida y cómoda. Este Blog debería ayudar a responder algunas de sus preguntas con una guía paso a paso.

S3 es un almacén de objetos popular para diferentes tipos de datos: archivos de registro, fotos, vídeos, sitios web estáticos, copias de seguridad de archivos, datos de base de datos/CRM exportados, datos de IoT, etc. Para realizar análisis significativos de estos datos, debe poder moverlos rápida y directamente a la base de datos analítica que elija para obtener información rápida sobre esos datos.

Para el propósito de este blog vamos a utilizar nuestro recientemente anunciado Vector Community Edition AMI en AWS Marketplace. Esta AMI gratuita ofrece a la comunidad de desarrolladores una opción de déploiement de Vector con un solo clic y es la forma más rápida de ejecutarlo en la nube de AWS.

Diferentes proveedores ofrecen diferentes soluciones para la carga de datos y queríamos ofrecer una solución paralela y escalable que utiliza algunas de las mejores tecnologías de código abierto para proporcionar la carga directa de S3 en Vector.

En este blog, presentamos el cargador Spark Vector. Ha sido construido desde cero para permitir a Spark escribir datos en Vector de forma paralela. No necesitas ser un experto en Apache Spark para seguir las instrucciones de este Blog. ¡Puedes simplemente copiar los pasos para aprender sobre la marcha!

NOTA: Si estás familiarizado con Vector, vwload es la utilidad nativa de Vector para cargar datos en paralelo en Vector - es una de las maneras más rápidas de obtener datos en Vector. vwload actualmente soporta un sistema de archivos local o HDFS para leer archivos de entrada. Con el cargador de Spark Vector, puede cargar directamente desde sistemas de archivos como S3, Windows Azure Storage Blob, Azure Data Lake y otros. En segundo lugar, también puede lograr la paralelización dentro del mismo archivo, ya que Spark particiona automáticamente un solo archivo entre varios trabajadores para un alto grado de paralelismo de lectura. Con vwload, es necesario dividir los archivos manualmente y proporcionar las divisiones como entrada a vwload. Una tercera avantage de usar el cargador de Spark es que selecciona las particiones de archivos basándose en el número de núcleos de la máquina, lo que hace que la carga de datos escale con el número de núcleos, incluso con un único archivo de entrada. vwload también escala con más núcleos, pero es necesario aumentar el número de archivos de entrada de origen para ver esta avantage.

Paso 1: Acceso a una instancia vectorial

Siga adelante y cree una instancia de Vector utilizando Vector Community Edition en AWS Marketplace. Para esta demostración, recomendamos lanzar la instancia en la región este de Estados Unidos (norte de Virginia) y especificar al menos una instancia m4.4xlarge (8 núcleos físicos).

NOTA: Por razones de rendimiento, es recomendable tener la instancia EC2 en la misma región que el bucket S3 donde residen los datos. En este tutorial, nuestros datos de S3 residen en US East (N. Virginia).

Paso 2: Acceder a la Instancia Vectorial

Una vez que tengas tu instancia de Vector en funcionamiento, accede a ella como usuario actian utilizando tu clave privada y la instancia EC2:

ssh -i <your .pem file> actian@<public DNS of the EC2 instance>

NOTA: Para obtener más información sobre cómo conectarse a la instancia de Vector, consulte Inicio de la interfaz de línea de comandos de Vector.

Paso 3: Descargar Spark

Después de iniciar sesión en Vector, cree un directorio para almacenar los archivos temporales con los que va a trabajar y cambie a él:

mkdir ~/work
cd ~/work

Descargue y extraiga la versión precompilada de Apache Spark:

wget https://www.namesdir.com/mirrors/apache/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz

Si el comando wget anterior no funciona o es demasiado lento, dirija su navegador a https://www.apache.org/dyn/closer.lua/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz y sustituya el parámetro URL para wget anterior por una de las réplicas de esa página.

Extraiga el archivo Spark descargado:

tar xvf spark-2.2.0-bin-hadoop2.7.tgz

Paso 4: Instale JRE en su PATH

Se necesita un Java Runtime para ejecutar el cargador de vectores de Spark.

Vector incluye un paquete JRE. Configure el PATH para incluirlo:

export PATH=/opt/Actian/VectorVW/ingres/jre/bin:${PATH}

Paso 5: Descargar Spark Vector Loader

Obtenga el cargador Spark Vector para Spark 2.x y extráigalo:

wget https://esdcdn.actian.com/Vector/spark/spark_vector_loader-assembly-2.0-2.tgz

tar xvzf spark_vector_loader-assembly-2.0-2.tgz

Paso 6: Configurar la base de datos y crear el esquema

Cree la base de datos vectordb que utilizaremos para cargar los datos:

createdb vectordb

Conéctese a la base de datos mediante la herramienta sql:

sql vectordb

Ahora introducirá un par de comandos SQL en el intérprete de comandos interactivo para crear el esquema que coincida con los datos puntuales de demostración que está a punto de cargar.

Copie los siguientes comandos y péguelos en el intérprete de comandos:

create table ontime(
year integer not null,
quarter i1 not null,
month i1 not null,
dayofmonth i1 not null,
dayofweek i1 not null,
flightdate ansidate not null,
uniquecarrier char(7) not null,
airlineid integer not null,
carrier char(2) default NULL,
tailnum varchar(50) default NULL,
flightnum varchar(10) not null,
originairportid integer default NULL,
originairportseqid integer default NULL,
origincitymarketid integer default NULL,
origin char(5) default NULL,
origincityname varchar(35) not null,
originstate char(2) default NULL,
originstatefips varchar(10) default NULL,
originstatename varchar(46) default NULL,
originwac integer default NULL,
destairportid integer default NULL,
destairportseqid integer default NULL,
destcitymarketid integer default NULL,
dest char(5) default NULL,
destcityname varchar(35) not null,
deststate char(2) default NULL,
deststatefips varchar(10) default NULL,
deststatename varchar(46) default NULL,
destwac integer default NULL,
crsdeptime integer default NULL,
deptime integer default NULL,
depdelay integer default NULL,
depdelayminutes integer default NULL,
depdel15 integer default NULL,
departuredelaygroups integer default NULL,
deptimeblk varchar(9) default NULL,
taxiout integer default NULL,
wheelsoff varchar(10) default NULL,
wheelson varchar(10) default NULL,
taxiin integer default NULL,
crsarrtime integer default NULL,
arrtime integer default NULL,
arrdelay integer default NULL,
arrdelayminutes integer default NULL,
arrdel15 integer default NULL,
arrivaldelaygroups integer default NULL,
arrtimeblk varchar(9) default NULL,
cancelled i1 default NULL,
cancellationcode char(1) default NULL,
diverted i1 default NULL,
crselapsedtime integer default NULL,
actualelapsedtime integer default NULL,
airtime integer default NULL,
flights integer default NULL,
distance integer default NULL,
distancegroup i1 default NULL,
carrierdelay integer default NULL,
weatherdelay integer default NULL,
nasdelay integer default NULL,
securitydelay integer default NULL,
lateaircraftdelay integer default NULL,
firstdeptime varchar(10) default NULL,
totaladdgtime varchar(10) default NULL,
longestaddgtime varchar(10) default NULL,
divairportlandings varchar(10) default NULL,
divreacheddest varchar(10) default NULL,
divactualelapsedtime varchar(10) default NULL,
divarrdelay varchar(10) default NULL,
divdistance varchar(10) default NULL,
div1airport varchar(10) default NULL,
div1airportid integer default NULL,
div1airportseqid integer default NULL,
div1wheelson varchar(10) default NULL,
div1totalgtime varchar(10) default NULL,
div1longestgtime varchar(10) default NULL,
div1wheelsoff varchar(10) default NULL,
div1tailnum varchar(10) default NULL,
div2airport varchar(10) default NULL,
div2airportid integer default NULL,
div2airportseqid integer default NULL,
div2wheelson varchar(10) default NULL,
div2totalgtime varchar(10) default NULL,
div2longestgtime varchar(10) default NULL,
div2wheelsoff varchar(10) default NULL,
div2tailnum varchar(10) default NULL,
div3airport varchar(10) default NULL,
div3airportid integer default NULL,
div3airportseqid integer default NULL,
div3wheelson varchar(10) default NULL,
div3totalgtime varchar(10) default NULL,
div3longestgtime varchar(10) default NULL,
div3wheelsoff varchar(10) default NULL,
div3tailnum varchar(10) default NULL,
div4airport varchar(10) default NULL,
div4airportid integer default NULL,
div4airportseqid integer default NULL,
div4wheelson varchar(10) default NULL,
div4totalgtime varchar(10) default NULL,
div4longestgtime varchar(10) default NULL,
div4wheelsoff varchar(10) default NULL,
div4tailnum varchar(10) default NULL,
div5airport varchar(10) default NULL,
div5airportid integer default NULL,
div5airportseqid integer default NULL,
div5wheelson varchar(10) default NULL,
div5totalgtime varchar(10) default NULL,
div5longestgtime varchar(10) default NULL,
div5wheelsoff varchar(10) default NULL,
div5tailnum varchar(10) default NULL,
lastCol varchar(10) default NULL
)
g

create table carriers(ccode char(2) collate ucs_basic, carrier char(25) collate ucs_basic )
g

INSERT INTO carriers VALUES ('AS','Alaska Airlines (AS)'), ('AA','American Airlines (AA)'), ('DL','Delta Air Lines (DL)'), ('EV','ExpressJet Airlines (EV)'), ('F9','Frontier Airlines (F9)'), ('HA','Hawaiian Airlines (HA)'), ('B6','JetBlue Airways (B6)'), ('OO','SkyWest Airlines (OO)'), ('WN','Southwest Airlines (WN)'), ('NK','Spirit Airlines (NK)'), ('UA','United Airlines (UA)'), ('VX','Virgin America (VX)')
g

Ahora que ha configurado el esquema, salga del intérprete de comandos sql. Entre:

q

Estás de vuelta en el shell de Linux.

Paso 7: Obtener y establecer claves de AWS

Para acceder a los datos de demostración en S3, debe proporcionar sus claves de acceso de AWS asociadas al usuario IAM. Se trata de 2 valores: ID de la clave de acceso y clave de acceso secreta.

Si no está familiarizado con las claves de acceso de IAM, lea https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html#Using_CreateAccessKey para comprender cómo recuperar o crear claves de acceso.

Después de haber recuperado sus claves de acceso, por favor, configúrelas en su entorno de la siguiente manera:

export AWS_ACCESS_KEY_ID=<Your Access Key ID>
export AWS_SECRET_ACCESS_KEY=<You Secret Access Key>

Paso 8: Ejecutar Spark-Submit para realizar la carga real

Ahora está listo para ejecutar el cargador Spark. Los datos de demostración se suministran en 4 archivos CSV. Cada parte del archivo ocupa unos 18 GB y contiene aproximadamente 43 millones de filas.

Ejecute el siguiente comando para cargar la Parte 1:

spark-2.2.0-bin-hadoop2.7/bin/spark-submit --packages org.apache.hadoop:hadoop-aws:2.7.2 --class com.actian.spark_vector.loader.Main /home/actian/work/spark_vector_loader-assembly-2.0.jar load csv -sf "s3a://esdfiles/Vector/actian-ontime/On_Time_Performance_Part1.csv" -vh localhost -vi VW -vd vectordb -tt ontime -sc "," -qc '"'

Esto ejecuta un trabajo Spark y utiliza el cargador Spark Vector para cargar los datos del archivo On_Time_On_Time_Performance_Part1 en Vector.

En mi instancia m4.4xlarge en la región este de EE.UU. (norte de Virginia), tardó unos 4 minutos y 23 segundos.

Una vez finalizada la carga, verá un mensaje INFO en el registro de la consola:

INFO VectorRelation: Cargado 43888241 registros en la tabla ontime

Repite la operación con las otras 3 partes:

spark-2.2.0-bin-hadoop2.7/bin/spark-submit --packages org.apache.hadoop:hadoop-aws:2.7.2 --class com.actian.spark_vector.loader.Main /home/actian/work/spark_vector_loader-assembly-2.0.jar load csv -sf "s3a://esdfiles/Vector/actian-ontime/On_Time_Performance_Part2.csv" -vh localhost -vi VW -vd vectordb -tt ontime -sc "," -qc '"'

spark-2.2.0-bin-hadoop2.7/bin/spark-submit --packages org.apache.hadoop:hadoop-aws:2.7.2 --class com.actian.spark_vector.loader.Main /home/actian/work/spark_vector_loader-assembly-2.0.jar load csv -sf "s3a://esdfiles/Vector/actian-ontime/On_Time_Performance_Part3.csv" -vh localhost -vi VW -vd vectordb -tt ontime -sc "," -qc '"'

spark-2.2.0-bin-hadoop2.7/bin/spark-submit --packages org.apache.hadoop:hadoop-aws:2.7.2 --class com.actian.spark_vector.loader.Main /home/actian/work/spark_vector_loader-assembly-2.0.jar load csv -sf "s3a://esdfiles/Vector/actian-ontime/On_Time_Performance_Part4.csv" -vh localhost -vi VW -vd vectordb -tt ontime -sc "," -qc '"'

Paso 9: Ejecutar consultas sobre los datos cargados

Verifiquemos rápidamente que los datos se han cargado en la base de datos.

Conectar con el monitor del terminal:

sql vectordb

En el intérprete de comandos sql, introduzca:

rt

A partir de ahora, se registrarán y mostrarán todos los tiempos de consulta.

Obtiene un recuento de las filas de la tabla:

SELECT COUNT(*) from ontimeg

Esto mostrará unos 175 millones de filas.

Ejecute otra consulta que enumere por año el porcentaje de vuelos retrasados más de 10 minutos:

SELECT t.year, c1/c2 FROM (select year,count(*)*1000 as c1 from ontime WHERE DepDelay>10 GROUP BY Year) t JOIN (select year,count(*) as c2 from ontime GROUP BY year) t2 ON (t.year=t2.year);g

Verá los resultados de la consulta, así como el tiempo que Vector ha tardado en ejecutarla. Ahora también puede ejecutar más consultas analíticas de ejemplo que aparecen en https://docs.actian.com/vector/AWS/index.html#page/GetStart%2FMoreSampleQueries.htm%23 y observar los tiempos de consulta.

Para borrar vectordb

Para eliminar la base de datos de demostración, introduzca el siguiente comando en el shell de Linux:

destroydb vectordb

Resumen

Con esto concluye la demostración de cómo puedes cargar rápidamente datos S3 en Vector utilizando el cargador Spark Vector.

Por otro lado, si desea modificar los datos antes de cargarlos en Vector, puede realizar las transformaciones necesarias en Spark y, a continuación, cargar los datos en Vector utilizando nuestro Spark Vector Connector.

Si tiene algún comentario o pregunta, visite el foro de Actian Vector y pregunte. Intentaremos responder a su pregunta lo antes posible. La Base de conocimientos también es una gran fuente de información si la necesita.

logo avatar actian

Acerca de Actian Corporation

Actian hace que los datos sean fáciles. Nuestra plataforma de datos simplifica el modo en que las personas conectan, gestionan y analizan los datos en entornos en la nube, híbridos y locales. Con décadas de experiencia en gestión de datos y análisis, Actian ofrece soluciones de alto rendimiento que permiten a las empresas tomar decisiones basadas en datos. Actian cuenta con el reconocimiento de los principales analistas y ha recibido premios del sector por su rendimiento e innovación. Nuestros equipos comparten casos de uso probados en conferencias (por ejemplo, Strata Data) y contribuyen a proyectos de código abierto. En el blog de Actian, tratamos temas que van desde la ingesta de datos en tiempo real hasta el análisis basado en IA. Conozca al equipo directivo https://www.actian.com/company/leadership-team/