Perspectives

Chargement rapide d'Amazon S3 vers Actian Vector via Apache Spark

Actian Corporation

6 décembre 2017

actian vector et apache spark

L'une des questions que l'on nous pose pour les déploiements de Vector Cloud est de savoir comment charger les données d'Amazon S3 dans Vector de manière rapide et pratique. Ce blog devrait vous aider à répondre à certaines de vos questions grâce à un guide étape par étape.

S3 est un magasin d'objets populaire pour différents types de données - fichiers journaux, photos, vidéos, sites web statiques, sauvegardes de fichiers, données de base de données/CRM exportées, données IoT, etc. Pour effectuer des analyses significatives sur ces données, vous devez être en mesure de les déplacer rapidement et directement dans la base de données analytique de votre choix afin d'obtenir rapidement des informations sur ces données.

Pour les besoins de ce blog, nous allons utiliser notre AMI Vector Community Edition récemment annoncée sur AWS Marketplace. Cette AMI gratuite offre à la communauté des développeurs une option de déploiement en un clic pour Vector et constitue le moyen le plus rapide de l'exécuter dans le nuage AWS.

Différents fournisseurs proposent des solutions différentes pour le chargement des données et nous voulions proposer une solution parallèle et évolutif qui utilise certaines des meilleures technologies open-source pour permettre le chargement direct de S3 dans Vector.

Dans ce blog, nous présentons le chargeur Spark Vector. Il a été conçu dès le départ pour permettre à Spark d'écrire des données dans Vector de manière parallèle. Il n'est pas nécessaire d'être un expert d'Apache Spark pour suivre les instructions de ce blog. Vous pouvez simplement copier les étapes pour apprendre au fur et à mesure !

NOTE: Si vous êtes familier avec Vector, vwload est l'utilitaire natif de Vector pour charger des données en parallèle dans Vector - c'est l'une des façons les plus rapides d'obtenir des données dans Vector. vwload prend actuellement en charge un système de fichiers local ou HDFS pour la lecture des fichiers d'entrée. Avec le chargeur Spark Vector, vous pouvez charger directement à partir de systèmes de fichiers tels que S3, Windows Azure Storage Blob, Azure Data Lake, et d'autres. Deuxièmement, vous pouvez également atteindre la parallélisation dans le même fichier puisque Spark partitionne automatiquement un fichier unique entre plusieurs travailleurs pour un degré élevé de parallélisme de lecture. Avec vwload, vous devez diviser les fichiers manuellement et fournir les divisions en entrée à vwload. Un troisième avantage de l'utilisation du chargeur Spark est qu'il sélectionne les partitions de fichiers en fonction du nombre de cœurs de la machine, ce qui fait que le chargement des données s'adapte au nombre de cœurs, même avec un seul fichier d'entrée. vwload s'adapte également à un plus grand nombre de cœurs, mais vous devez augmenter le nombre de fichiers sources d'entrée pour bénéficier de cet avantage.

Étape 1 : Accès à une instance vectorielle

Lancez une instance Vector en utilisant l'édition communautaire de Vector sur la place de marché AWS. Pour cette démonstration, nous recommandons de lancer l'instance dans la région Est des États-Unis (Virginie du Nord) et de spécifier au moins une instance m4.4xlarge (8 cœurs physiques).

REMARQUE: pour des raisons de performances, il est préférable que l'instance EC2 se trouve dans la même région que le panier S3 où résident vos données. Dans ce tutoriel, nos données S3 résident dans l'est des États-Unis (Virginie du Nord).

Étape 2 : Connexion à l'instance Vecteur

Une fois que votre instance Vector fonctionne, connectez-vous en tant qu'utilisateur actian à l'aide de votre clé privée et de l'instance EC2 :

ssh -i <your .pem file> actian@<public DNS of the EC2 instance>

NOTE: Pour plus d'informations sur la connexion à l'instance de Vector, voir Démarrer l'interface de ligne de commande de Vector.

Étape 3 : Télécharger Spark

Une fois que vous êtes connecté à Vector, créez un répertoire pour stocker les fichiers temporaires avec lesquels vous allez travailler et accédez-y :

mkdir ~/work
cd ~/work

Télécharger et extraire la version pré-construite d'Apache Spark :

wget https://www.namesdir.com/mirrors/apache/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz

Si la commande wget précédente ne fonctionne pas ou est trop lente, dirigez votre navigateur vers https://www.apache.org/dyn/closer.lua/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz et remplacez le paramètre URL de wget ci-dessus par l'un des miroirs de cette page.

Extraire l'archive Spark téléchargée :

tar xvf spark-2.2.0-bin-hadoop2.7.tgz

Étape 4 : Installer le JRE dans votre PATH

Un Runtime Java est nécessaire pour exécuter le Spark Vector loader.

Vector comprend un JRE intégré. Définissez le PATH pour l'inclure :

export PATH=/opt/Actian/VectorVW/ingres/jre/bin:${PATH}

Étape 5 : Télécharger le chargeur de vecteurs Spark

Récupérez le chargeur Spark Vector pour Spark 2.x et extrayez-le :

wget https://esdcdn.actian.com/Vector/spark/spark_vector_loader-assembly-2.0-2.tgz

tar xvzf spark_vector_loader-assembly-2.0-2.tgz

Étape 6 : Configuration de la base de données et création du schéma

Créez la base de données vectordb que vous utiliserez pour charger les données :

createdb vectordb

Connectez-vous à la base de données à l'aide de l'outil sql :

sql vectordb

Vous allez maintenant saisir quelques commandes SQL dans le shell interactif pour créer le schéma qui correspond aux données de démonstration sur le respect des délais que vous êtes sur le point de charger.

Copiez les commandes suivantes et collez-les dans le shell :

create table ontime(
year integer not null,
quarter i1 not null,
month i1 not null,
dayofmonth i1 not null,
dayofweek i1 not null,
flightdate ansidate not null,
uniquecarrier char(7) not null,
airlineid integer not null,
carrier char(2) default NULL,
tailnum varchar(50) default NULL,
flightnum varchar(10) not null,
originairportid integer default NULL,
originairportseqid integer default NULL,
origincitymarketid integer default NULL,
origin char(5) default NULL,
origincityname varchar(35) not null,
originstate char(2) default NULL,
originstatefips varchar(10) default NULL,
originstatename varchar(46) default NULL,
originwac integer default NULL,
destairportid integer default NULL,
destairportseqid integer default NULL,
destcitymarketid integer default NULL,
dest char(5) default NULL,
destcityname varchar(35) not null,
deststate char(2) default NULL,
deststatefips varchar(10) default NULL,
deststatename varchar(46) default NULL,
destwac integer default NULL,
crsdeptime integer default NULL,
deptime integer default NULL,
depdelay integer default NULL,
depdelayminutes integer default NULL,
depdel15 integer default NULL,
departuredelaygroups integer default NULL,
deptimeblk varchar(9) default NULL,
taxiout integer default NULL,
wheelsoff varchar(10) default NULL,
wheelson varchar(10) default NULL,
taxiin integer default NULL,
crsarrtime integer default NULL,
arrtime integer default NULL,
arrdelay integer default NULL,
arrdelayminutes integer default NULL,
arrdel15 integer default NULL,
arrivaldelaygroups integer default NULL,
arrtimeblk varchar(9) default NULL,
cancelled i1 default NULL,
cancellationcode char(1) default NULL,
diverted i1 default NULL,
crselapsedtime integer default NULL,
actualelapsedtime integer default NULL,
airtime integer default NULL,
flights integer default NULL,
distance integer default NULL,
distancegroup i1 default NULL,
carrierdelay integer default NULL,
weatherdelay integer default NULL,
nasdelay integer default NULL,
securitydelay integer default NULL,
lateaircraftdelay integer default NULL,
firstdeptime varchar(10) default NULL,
totaladdgtime varchar(10) default NULL,
longestaddgtime varchar(10) default NULL,
divairportlandings varchar(10) default NULL,
divreacheddest varchar(10) default NULL,
divactualelapsedtime varchar(10) default NULL,
divarrdelay varchar(10) default NULL,
divdistance varchar(10) default NULL,
div1airport varchar(10) default NULL,
div1airportid integer default NULL,
div1airportseqid integer default NULL,
div1wheelson varchar(10) default NULL,
div1totalgtime varchar(10) default NULL,
div1longestgtime varchar(10) default NULL,
div1wheelsoff varchar(10) default NULL,
div1tailnum varchar(10) default NULL,
div2airport varchar(10) default NULL,
div2airportid integer default NULL,
div2airportseqid integer default NULL,
div2wheelson varchar(10) default NULL,
div2totalgtime varchar(10) default NULL,
div2longestgtime varchar(10) default NULL,
div2wheelsoff varchar(10) default NULL,
div2tailnum varchar(10) default NULL,
div3airport varchar(10) default NULL,
div3airportid integer default NULL,
div3airportseqid integer default NULL,
div3wheelson varchar(10) default NULL,
div3totalgtime varchar(10) default NULL,
div3longestgtime varchar(10) default NULL,
div3wheelsoff varchar(10) default NULL,
div3tailnum varchar(10) default NULL,
div4airport varchar(10) default NULL,
div4airportid integer default NULL,
div4airportseqid integer default NULL,
div4wheelson varchar(10) default NULL,
div4totalgtime varchar(10) default NULL,
div4longestgtime varchar(10) default NULL,
div4wheelsoff varchar(10) default NULL,
div4tailnum varchar(10) default NULL,
div5airport varchar(10) default NULL,
div5airportid integer default NULL,
div5airportseqid integer default NULL,
div5wheelson varchar(10) default NULL,
div5totalgtime varchar(10) default NULL,
div5longestgtime varchar(10) default NULL,
div5wheelsoff varchar(10) default NULL,
div5tailnum varchar(10) default NULL,
lastCol varchar(10) default NULL
)
g

create table carriers(ccode char(2) collate ucs_basic, carrier char(25) collate ucs_basic )
g

INSERT INTO carriers VALUES ('AS','Alaska Airlines (AS)'), ('AA','American Airlines (AA)'), ('DL','Delta Air Lines (DL)'), ('EV','ExpressJet Airlines (EV)'), ('F9','Frontier Airlines (F9)'), ('HA','Hawaiian Airlines (HA)'), ('B6','JetBlue Airways (B6)'), ('OO','SkyWest Airlines (OO)'), ('WN','Southwest Airlines (WN)'), ('NK','Spirit Airlines (NK)'), ('UA','United Airlines (UA)'), ('VX','Virgin America (VX)')
g

Maintenant que vous avez configuré le schéma, quittez l'interpréteur de commandes SQL. Entrer :

q

Vous êtes de retour dans le shell Linux.

Étape 7 : Obtenir et définir des clés AWS

Pour accéder aux données de démonstration sur S3, vous devez fournir vos clés d'accès AWS associées à l'utilisateur IAM. Il s'agit de deux valeurs : l'ID de la clé d'accès et la clé d'accès secrète.

Si vous n'êtes pas familier avec les clés d'accès IAM, veuillez lire https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html#Using_CreateAccessKey pour comprendre comment récupérer ou créer des clés d'accès.

Après avoir récupéré vos clés d'accès, veuillez les installer dans votre environnement comme suit :

export AWS_ACCESS_KEY_ID=<Your Access Key ID>
export AWS_SECRET_ACCESS_KEY=<You Secret Access Key>

Étape 8 : Exécuter Spark-Submit pour effectuer la charge réelle

Vous êtes maintenant prêt à exécuter le chargeur Spark. Les données de démonstration sont fournies dans 4 fichiers CSV. Chaque partie de fichier fait environ 18 Go et contient approximativement 43 millions de lignes.

Exécutez la commande suivante pour charger la partie 1 :

spark-2.2.0-bin-hadoop2.7/bin/spark-submit --packages org.apache.hadoop:hadoop-aws:2.7.2 --class com.actian.spark_vector.loader.Main /home/actian/work/spark_vector_loader-assembly-2.0.jar load csv -sf "s3a://esdfiles/Vector/actian-ontime/On_Time_Performance_Part1.csv" -vh localhost -vi VW -vd vectordb -tt ontime -sc "," -qc '"'

Cette opération exécute un job Spark et utilise le chargeur Spark Vector pour charger les données du fichier On_Time_On_Time_Performance_Part1 dans Vector.

Sur mon instance m4.4xlarge dans la région US East (N. Virginia), cela a pris environ 4 minutes et 23 secondes.

Une fois le chargement terminé, vous verrez apparaître un message INFO dans le journal de la console :

INFO VectorRelation : Chargement de 43888241 enregistrements dans la table ontime

Répéter pour les 3 autres parties :

spark-2.2.0-bin-hadoop2.7/bin/spark-submit --packages org.apache.hadoop:hadoop-aws:2.7.2 --class com.actian.spark_vector.loader.Main /home/actian/work/spark_vector_loader-assembly-2.0.jar load csv -sf "s3a://esdfiles/Vector/actian-ontime/On_Time_Performance_Part2.csv" -vh localhost -vi VW -vd vectordb -tt ontime -sc "," -qc '"'

spark-2.2.0-bin-hadoop2.7/bin/spark-submit --packages org.apache.hadoop:hadoop-aws:2.7.2 --class com.actian.spark_vector.loader.Main /home/actian/work/spark_vector_loader-assembly-2.0.jar load csv -sf "s3a://esdfiles/Vector/actian-ontime/On_Time_Performance_Part3.csv" -vh localhost -vi VW -vd vectordb -tt ontime -sc "," -qc '"'

spark-2.2.0-bin-hadoop2.7/bin/spark-submit --packages org.apache.hadoop:hadoop-aws:2.7.2 --class com.actian.spark_vector.loader.Main /home/actian/work/spark_vector_loader-assembly-2.0.jar load csv -sf "s3a://esdfiles/Vector/actian-ontime/On_Time_Performance_Part4.csv" -vh localhost -vi VW -vd vectordb -tt ontime -sc "," -qc '"'

Étape 9 : Exécuter des requêtes sur les données chargées

Vérifions rapidement que les données ont été chargées dans la base de données.

Connecter avec le moniteur du terminal:

sql vectordb

Dans l'interpréteur de commandes SQL, entrez :

rt

Tous les temps de requête seront désormais enregistrés et affichés.

Obtenir un décompte des lignes du tableau:

SELECT COUNT(*) from ontimeg

Cela permettra d'afficher environ 175 millions de lignes.

Lancez une autre requête qui répertorie par année le pourcentage de vols retardés de plus de 10 minutes :

SELECT t.year, c1/c2 FROM (select year,count(*)*1000 as c1 from ontime WHERE DepDelay>10 GROUP BY Year) t JOIN (select year,count(*) as c2 from ontime GROUP BY year) t2 ON (t.year=t2.year);g

Vous verrez les résultats de la requête ainsi que le temps que Vector a mis à l'exécuter. Vous pouvez également exécuter d'autres exemples de requêtes analytiques répertoriées à l'adresse https://docs.actian.com/vector/AWS/index.html#page/GetStart%2FMoreSampleQueries.htm%23 et observer les temps d'exécution des requête .

Pour supprimer le vectordb

Pour supprimer la base de données de démonstration, entrez la commande suivante dans le shell Linux :

destroydb vectordb

Résumé

Ceci conclut la démonstration sur la façon dont vous pouvez rapidement charger des données S3 dans Vector à l'aide du chargeur Spark Vector.

Par ailleurs, si vous souhaitez modifier les données avant de les charger dans Vector, vous pouvez effectuer les transformations nécessaires dans Spark, puis charger les données dans Vector à l'aide de notre connecteur Spark Vector.

Si vous avez d'autres commentaires ou questions, visitez le forum Actian Vector et posez vos questions ! Nous essaierons de répondre à votre message dès que possible. La base de connaissances est également une excellente source d'informations si vous en avez besoin.

logo avatar actian

À propos d'Actian Corporation

Actian donne aux entreprises les moyens de gérer et de gouverner en toute confiance les données à l'échelle. Les solutions d'intelligence des données d'Actian aident à rationaliser les environnements de données complexes et à accélérer la fourniture de données prêtes pour l'IA. Conçues pour être flexibles, les solutions d'Actian s'intègrent de manière transparente et fonctionnent de manière fiable dans les environnements sur site, cloud et hybrides. Pour en savoir plus sur Actian, la division données de HCLSoftware, rendez-vous sur actian.com.