Schnelles Laden von Amazon S3 zu Actian Vector über Apache Spark
Actian Germany GmbH
Dezember 6, 2017

Eine der Fragen, die uns bei Vector Cloud immer wieder gestellt wird, ist, wie man Daten aus Amazon S3 schnell und bequem in Vector laden kann. Dieser Blog soll mit einer Schritt-für-Schritt-Anleitung helfen, einige Ihrer Fragen zu beantworten.
S3 ist ein beliebter Objektspeicher für verschiedene Arten von Daten - Protokolldateien, Fotos, Videos, statische Websites, Dateisicherungen, exportierte Datenbank-/CRM-Daten, IoT usw. Um aussagekräftige Analysen mit diesen Daten durchführen zu können, müssen Sie in der Lage sein, sie schnell und direkt in eine Analytics Database Ihrer Wahl zu verschieben, um schnelle Einblicke in diese Daten zu erhalten.
Für diesen Blog werden wir unser kürzlich angekündigtes Vector Community Edition AMI auf dem AWS Marketplace verwenden. Dieses kostenlose AMI bietet der Entwickler-Community eine Deployment für Vector und ist der schnellste Weg, um es in der Cloud laufen zu lassen.
Verschiedene Anbieter bieten unterschiedliche Lösungen für das Laden von Daten an. Wir wollten eine parallele, skalierbar Lösung anbieten, die einige der besten Open-Source-Technologien nutzt, um ein direktes Laden aus S3 in Vector zu ermöglichen.
In diesem Blog stellen wir den Spark Vector Loader vor. Er wurde von Grund auf entwickelt, um Spark zu ermöglichen, Daten parallel in Vector zu schreiben. Sie müssen kein Experte für Apache Spark sein, um die Anweisungen in diesem Blog zu befolgen. Sie können die Schritte einfach kopieren und so lernen, wie Sie vorgehen!
HINWEIS: Wenn Sie mit Vector vertraut sind, ist vwload das native Dienstprogramm von Vector zum parallelen Laden von Daten in Vector - es ist eine der schnellsten Möglichkeiten, Daten in Vector zu erhalten. vwload unterstützt derzeit ein lokales Dateisystem oder HDFS zum Lesen von Eingabedateien. Mit dem Spark Vector Loader können Sie direkt von Dateisystemen wie S3, Windows Azure Storage Blob, Azure Daten-Lake und anderen laden. Zweitens können Sie auch eine Parallelisierung innerhalb derselben Datei erreichen, da Spark eine einzelne Datei automatisch auf mehrere Worker aufteilt, um ein hohes Maß an Leseparallelität zu erreichen. Bei vwload müssen Sie die Dateien manuell aufteilen und die Splits als Eingabe für vwload bereitstellen. Ein dritter Nutzen der Verwendung des Spark-Laders besteht darin, dass er die Dateipartitionen auf der Grundlage der Anzahl der Maschinenkerne auswählt, wodurch das Laden der Daten mit der Anzahl der Kerne skaliert, selbst bei einer einzelnen Eingabedatei. vwload skaliert auch mit mehr Kernen, aber Sie müssen die Anzahl der Quell-Eingabedateien erhöhen, um diesen Nutzen zu sehen.
Schritt 1: Zugang zu einer Vektorinstanz
Fahren Sie fort und erstellen Sie eine Vector-Instanz mit der Vector Community Edition auf dem AWS Marketplace. Für diese Demonstration empfehlen wir, die Instanz in der Region USA Ost (Nord-Virginia) zu starten und mindestens eine m4.4xlarge-Instanz (8 physische Kerne) anzugeben.
HINWEIS: Aus Leistungsgründen sollte sich die EC2-Instanz in der gleichen Region befinden wie der S3-Bucket, in dem Ihre Daten gespeichert sind. In diesem Tutorial befinden sich unsere S3-Daten in US East (N. Virginia).
Schritt 2: Anmeldung bei der Vector-Instanz
Nachdem Sie Ihre Vector-Instanz zum Laufen gebracht haben, melden Sie sich als Nutzer actian mit Ihrem privaten Schlüssel und der EC2-Instanz an:
ssh -i <your .pem file> actian@<public DNS of the EC2 instance>
HINWEIS: Weitere Informationen zur Verbindung mit der Vector-Instanz finden Sie unter Starten der Vector-Befehlszeilenschnittstelle.
Schritt 3: Spark herunterladen
Nachdem Sie sich bei Vector angemeldet haben, erstellen Sie ein Verzeichnis, in dem Sie die temporären Dateien speichern, mit denen Sie arbeiten werden, und wechseln Sie zu diesem Verzeichnis:
mkdir ~/work
cd ~/work
Laden Sie die vorgefertigte Version von Apache Spark herunter und entpacken Sie sie:
wget https://www.namesdir.com/mirrors/apache/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
Wenn der vorherige wget-Befehl nicht funktioniert oder zu langsam ist, zeigen Sie mit Ihrem Browser auf https://www.apache.org/dyn/closer.lua/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz und ersetzen Sie den obigen URL-Parameter für wget durch einen der Mirrors auf dieser Seite.
Entpacken Sie das heruntergeladene Spark-Archiv:
tar xvf spark-2.2.0-bin-hadoop2.7.tgz
Schritt 4: JRE in Ihrem PATH einrichten
Für die Ausführung des Spark-Vektor-Laders ist eine Java erforderlich.
Vector enthält ein gebündeltes JRE. Stellen Sie den PATH so ein, dass er es enthält:
export PATH=/opt/Actian/VectorVW/ingres/jre/bin:${PATH}
Schritt 5: Herunterladen des Spark-Vektor-Laders
Holen Sie sich den Spark Vector Loader für Spark 2.x und extrahieren Sie ihn:
wget https://esdcdn.actian.com/Vector/spark/spark_vector_loader-assembly-2.0-2.tgz
tar xvzf spark_vector_loader-assembly-2.0-2.tgz
Schritt 6: Einrichten der Datenbank und Erstellen des Schemas
Erstellen Sie die vectordb-Datenbank, die Sie zum Laden von Daten verwenden:
createdb vectordb
Verbinden Sie sich mit der Datenbank mit Hilfe des Sql-Tools:
sql vectordb
Nun geben Sie in der interaktiven Shell einige SQL-Befehle ein, um das Schema zu erstellen, das mit den zu ladenden Demo on-time-Daten übereinstimmt.
Kopieren Sie die folgenden Befehle und fügen Sie sie in die Shell ein:
create table ontime(
year integer not null,
quarter i1 not null,
month i1 not null,
dayofmonth i1 not null,
dayofweek i1 not null,
flightdate ansidate not null,
uniquecarrier char(7) not null,
airlineid integer not null,
carrier char(2) default NULL,
tailnum varchar(50) default NULL,
flightnum varchar(10) not null,
originairportid integer default NULL,
originairportseqid integer default NULL,
origincitymarketid integer default NULL,
origin char(5) default NULL,
origincityname varchar(35) not null,
originstate char(2) default NULL,
originstatefips varchar(10) default NULL,
originstatename varchar(46) default NULL,
originwac integer default NULL,
destairportid integer default NULL,
destairportseqid integer default NULL,
destcitymarketid integer default NULL,
dest char(5) default NULL,
destcityname varchar(35) not null,
deststate char(2) default NULL,
deststatefips varchar(10) default NULL,
deststatename varchar(46) default NULL,
destwac integer default NULL,
crsdeptime integer default NULL,
deptime integer default NULL,
depdelay integer default NULL,
depdelayminutes integer default NULL,
depdel15 integer default NULL,
departuredelaygroups integer default NULL,
deptimeblk varchar(9) default NULL,
taxiout integer default NULL,
wheelsoff varchar(10) default NULL,
wheelson varchar(10) default NULL,
taxiin integer default NULL,
crsarrtime integer default NULL,
arrtime integer default NULL,
arrdelay integer default NULL,
arrdelayminutes integer default NULL,
arrdel15 integer default NULL,
arrivaldelaygroups integer default NULL,
arrtimeblk varchar(9) default NULL,
cancelled i1 default NULL,
cancellationcode char(1) default NULL,
diverted i1 default NULL,
crselapsedtime integer default NULL,
actualelapsedtime integer default NULL,
airtime integer default NULL,
flights integer default NULL,
distance integer default NULL,
distancegroup i1 default NULL,
carrierdelay integer default NULL,
weatherdelay integer default NULL,
nasdelay integer default NULL,
securitydelay integer default NULL,
lateaircraftdelay integer default NULL,
firstdeptime varchar(10) default NULL,
totaladdgtime varchar(10) default NULL,
longestaddgtime varchar(10) default NULL,
divairportlandings varchar(10) default NULL,
divreacheddest varchar(10) default NULL,
divactualelapsedtime varchar(10) default NULL,
divarrdelay varchar(10) default NULL,
divdistance varchar(10) default NULL,
div1airport varchar(10) default NULL,
div1airportid integer default NULL,
div1airportseqid integer default NULL,
div1wheelson varchar(10) default NULL,
div1totalgtime varchar(10) default NULL,
div1longestgtime varchar(10) default NULL,
div1wheelsoff varchar(10) default NULL,
div1tailnum varchar(10) default NULL,
div2airport varchar(10) default NULL,
div2airportid integer default NULL,
div2airportseqid integer default NULL,
div2wheelson varchar(10) default NULL,
div2totalgtime varchar(10) default NULL,
div2longestgtime varchar(10) default NULL,
div2wheelsoff varchar(10) default NULL,
div2tailnum varchar(10) default NULL,
div3airport varchar(10) default NULL,
div3airportid integer default NULL,
div3airportseqid integer default NULL,
div3wheelson varchar(10) default NULL,
div3totalgtime varchar(10) default NULL,
div3longestgtime varchar(10) default NULL,
div3wheelsoff varchar(10) default NULL,
div3tailnum varchar(10) default NULL,
div4airport varchar(10) default NULL,
div4airportid integer default NULL,
div4airportseqid integer default NULL,
div4wheelson varchar(10) default NULL,
div4totalgtime varchar(10) default NULL,
div4longestgtime varchar(10) default NULL,
div4wheelsoff varchar(10) default NULL,
div4tailnum varchar(10) default NULL,
div5airport varchar(10) default NULL,
div5airportid integer default NULL,
div5airportseqid integer default NULL,
div5wheelson varchar(10) default NULL,
div5totalgtime varchar(10) default NULL,
div5longestgtime varchar(10) default NULL,
div5wheelsoff varchar(10) default NULL,
div5tailnum varchar(10) default NULL,
lastCol varchar(10) default NULL
)
g
create table carriers(ccode char(2) collate ucs_basic, carrier char(25) collate ucs_basic )
g
INSERT INTO carriers VALUES ('AS','Alaska Airlines (AS)'), ('AA','American Airlines (AA)'), ('DL','Delta Air Lines (DL)'), ('EV','ExpressJet Airlines (EV)'), ('F9','Frontier Airlines (F9)'), ('HA','Hawaiian Airlines (HA)'), ('B6','JetBlue Airways (B6)'), ('OO','SkyWest Airlines (OO)'), ('WN','Southwest Airlines (WN)'), ('NK','Spirit Airlines (NK)'), ('UA','United Airlines (UA)'), ('VX','Virgin America (VX)')
g
Nachdem Sie nun das Schema eingerichtet haben, verlassen Sie die Sql-Shell. Geben Sie ein:
q
Sie befinden sich wieder in der Linux-Shell.
Schritt 7: AWS-Schlüssel abrufen und festlegen
Für den Zugriff auf die Demo in S3 müssen Sie Ihre AWS-Zugangsschlüssel angeben, die mit dem Nutzer verbunden sind. Dies sind 2 Werte: Zugriffsschlüssel-ID und geheimer Zugriffsschlüssel.
Wenn Sie mit IAM-Zugriffsschlüsseln nicht vertraut sind, lesen Sie bitte https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html#Using_CreateAccessKey, um zu verstehen, wie man Zugriffsschlüssel abruft oder erstellt.
Nachdem Sie Ihre Zugangsschlüssel abgerufen haben, stellen Sie sie bitte wie folgt in Ihrer Umgebung ein:
export AWS_ACCESS_KEY_ID=<Your Access Key ID>
export AWS_SECRET_ACCESS_KEY=<You Secret Access Key>
Schritt 8: Führen Sie Spark-Submit aus, um die tatsächliche Belastung durchzuführen
Jetzt können Sie den Spark-Loader ausführen. Die Demo werden in 4 CSV-Dateien geliefert. Jede Datei ist etwa 18 GB groß und enthält etwa 43 Millionen Zeilen.
Führen Sie den folgenden Befehl aus, um Teil 1 zu laden:
spark-2.2.0-bin-hadoop2.7/bin/spark-submit --packages org.apache.hadoop:hadoop-aws:2.7.2 --class com.actian.spark_vector.loader.Main /home/actian/work/spark_vector_loader-assembly-2.0.jar load csv -sf "s3a://esdfiles/Vector/actian-ontime/On_Time_Performance_Part1.csv" -vh localhost -vi VW -vd vectordb -tt ontime -sc "," -qc '"'
Dies führt einen Spark-Job aus und verwendet den Spark-Vector-Loader, um Daten aus der Datei On_Time_On_Time_Performance_Part1 in Vector zu laden.
Auf meiner m4.4xlarge-Instanz in der Region US East (N. Virginia) dauerte dies etwa 4 Minuten und 23 Sekunden.
Sobald der Ladevorgang abgeschlossen ist, wird eine INFO-Meldung auf der Konsole angezeigt:
INFO VectorRelation: Geladen 43888241 Datensätze in Tabelle ontime
Wiederholen Sie den Vorgang für die anderen 3 Teile:
spark-2.2.0-bin-hadoop2.7/bin/spark-submit --packages org.apache.hadoop:hadoop-aws:2.7.2 --class com.actian.spark_vector.loader.Main /home/actian/work/spark_vector_loader-assembly-2.0.jar load csv -sf "s3a://esdfiles/Vector/actian-ontime/On_Time_Performance_Part2.csv" -vh localhost -vi VW -vd vectordb -tt ontime -sc "," -qc '"'
spark-2.2.0-bin-hadoop2.7/bin/spark-submit --packages org.apache.hadoop:hadoop-aws:2.7.2 --class com.actian.spark_vector.loader.Main /home/actian/work/spark_vector_loader-assembly-2.0.jar load csv -sf "s3a://esdfiles/Vector/actian-ontime/On_Time_Performance_Part3.csv" -vh localhost -vi VW -vd vectordb -tt ontime -sc "," -qc '"'
spark-2.2.0-bin-hadoop2.7/bin/spark-submit --packages org.apache.hadoop:hadoop-aws:2.7.2 --class com.actian.spark_vector.loader.Main /home/actian/work/spark_vector_loader-assembly-2.0.jar load csv -sf "s3a://esdfiles/Vector/actian-ontime/On_Time_Performance_Part4.csv" -vh localhost -vi VW -vd vectordb -tt ontime -sc "," -qc '"'
Schritt 9: Abfragen auf den geladenen Daten ausführen
Lassen Sie uns schnell überprüfen, ob die Daten in die Datenbank geladen wurden.
Verbinden Sie sich mit dem Terminalmonitor:
sql vectordb
Geben Sie in der Sql-Shell ein:
rt
Alle Anfrage werden von nun an aufgezeichnet und angezeigt.
Ermittelt die Anzahl der Zeilen in der Tabelle:
SELECT COUNT(*) from ontimeg
Damit werden etwa 175 Millionen Zeilen angezeigt.
Führen Sie eine weitere Anfrage aus, die nach Jahr den Prozentsatz der Flüge mit mehr als 10 Minuten Verspätung auflistet:
SELECT t.year, c1/c2 FROM (select year,count(*)*1000 as c1 from ontime WHERE DepDelay>10 GROUP BY Year) t JOIN (select year,count(*) as c2 from ontime GROUP BY year) t2 ON (t.year=t2.year);g
Sie sehen dann die Ergebnisse der Anfrage sowie die Zeit, die Vector für die Ausführung der Anfrage benötigt hat. Sie können jetzt auch weitere analytische Beispielabfragen ausführen, die unter https://docs.actian.com/vector/AWS/index.html#page/GetStart%2FMoreSampleQueries.htm%23 aufgeführt sind, und die Anfrage beobachten.
So löschen Sie vectordb
Um die Demo zu löschen, geben Sie in der Linux-Shell den folgenden Befehl ein:
destroydb vectordb
Zusammenfassung
Damit ist die Demo zum schnellen Laden von S3-Daten in Vector mit dem Spark-Vector-Loader abgeschlossen.
Wenn Sie die Daten vor dem Laden in Vector ändern möchten, können Sie die erforderlichen Transformationen in Spark durchführen und die Daten dann mit unserem Spark Vector Konnektor in Vector laden.
Wenn Sie weitere Kommentare oder Fragen haben, besuchen Sie das Actian Vector Forum und stellen Sie Ihre Fragen! Wir werden versuchen, Ihren Beitrag so schnell wie möglich zu beantworten. Die Knowledge Base ist ebenfalls eine großartige Informationsquelle, falls Sie sie benötigen.
Abonnieren Sie den Actian Blog
Abonnieren Sie den Blog von Actian, um direkt Dateneinblicke zu erhalten.
- Bleiben Sie auf dem Laufenden: Holen Sie sich die neuesten Informationen zu Data Analytics direkt in Ihren Posteingang.
- Verpassen Sie keinen Beitrag: Sie erhalten automatische E-Mail-Updates, die Sie informieren, wenn neue Beiträge veröffentlicht werden.
- Ganz wie sie wollen: Ändern Sie Ihre Lieferpräferenzen nach Ihren Bedürfnissen.