PySpark -handledning för nybörjare: Lär dig med EXEMPEL

Innan vi lär oss PySpark, låt oss förstå:

Vad är Apache Spark?

Spark är en stor datalösning som har visat sig vara enklare och snabbare än Hadoop MapReduce. Spark är en öppen källkodsprogramvara som utvecklats av UC Berkeley RAD lab 2009. Sedan den släpptes för allmänheten 2010 har Spark vuxit i popularitet och används genom branschen med en aldrig tidigare skådad skala.

I Big Data -tiden behöver utövare mer än någonsin snabba och pålitliga verktyg för att bearbeta dataströmning. Tidigare verktyg som MapReduce var favorit men var långsamma. För att övervinna detta problem erbjuder Spark en lösning som är både snabb och generell. Den största skillnaden mellan Spark och MapReduce är att Spark kör beräkningar i minnet under senare på hårddisken. Det möjliggör höghastighetsåtkomst och databehandling, vilket minskar tiden från timmar till minuter.

Vad är PySpark?

PySpark är ett verktyg som skapats av Apache Spark Community för att använda Python med Spark. Det gör det möjligt att arbeta med RDD (Resilient Distributed Dataset) i Python. Det erbjuder också PySpark Shell för att länka Python API: er med Spark core för att initiera Spark Context. Spark är namnmotorn för att realisera klusterdata, medan PySpark är Pythons bibliotek för att använda Spark.

I denna PySpark-handledning för nybörjare lär du dig grunderna i PySpark som-

Hur fungerar Spark?

Spark är baserad på beräkningsmotor, vilket innebär att den tar hand om schemaläggning, distribution och övervakning. Varje uppgift utförs på olika arbetsmaskiner som kallas computing cluster. Ett datorkluster avser uppdelningen av uppgifter. En maskin utför en uppgift, medan de andra bidrar till den slutliga utmatningen genom en annan uppgift. I slutändan aggregeras alla uppgifter för att producera en utdata. Spark -administratören ger en 360 -översikt över olika Spark -jobb.

Hur fungerar Spark

Spark är utformad för att arbeta med

  • Pytonorm
  • Java
  • Stege
  • SQL

En viktig egenskap hos Spark är den stora mängden inbyggt bibliotek, inklusive MLlib för maskininlärning . Spark är också utformad för att fungera med Hadoop -kluster och kan läsa den breda typen av filer, inklusive Hive -data, CSV, JSON, Casandra -data bland annat.

Varför använda Spark?

Som framtida datautövare bör du vara bekant med pythons berömda bibliotek: Pandas och scikit-learn. Dessa två bibliotek är fantastiska att utforska dataset upp till medelstorlek. Regelbundna maskininlärningsprojekt bygger på följande metodik:

  • Ladda data till disken
  • Importera data till maskinens minne
  • Bearbeta/analysera data
  • Bygg maskininlärningsmodellen
  • Spara förutsägelsen tillbaka till disken

Problemet uppstår om datavetenskapsmannen vill bearbeta data som är för stora för en dator. Under tidigare datavetenskapliga dagar skulle utövarna prova på eftersom träning i stora datamängder inte alltid behövdes. Datavetenskapsmannen skulle hitta ett bra statistiskt urval, utföra en ytterligare robusthetskontroll och komma med en utmärkt modell.

Det finns dock några problem med detta:

  • Speglar datamängden den verkliga världen?
  • Innehåller uppgifterna ett specifikt exempel?
  • Är modellen lämplig för provtagning?

Ta användarens rekommendation till exempel. Rekommendatorer förlitar sig på att jämföra användare med andra användare för att utvärdera deras preferenser. Om datautövaren bara tar en delmängd av data kommer det inte att finnas en grupp användare som är väldigt lika varandra. Rekommendatorer måste köra på hela datauppsättningen eller inte alls.

Vad är lösningen?

Lösningen har varit uppenbar under en lång tid, dela upp problemet på flera datorer. Parallell databehandling har också flera problem. Utvecklare har ofta problem med att skriva parallellkod och slutar behöva lösa ett gäng av de komplexa frågorna kring själva multibearbetningen.

Pyspark ger datavetenskaparen ett API som kan användas för att lösa problem med parallella dataprocedurer. Pyspark hanterar komplexiteten i multiprocessing, som att distribuera data, distribuera kod och samla in output från arbetarna på ett kluster av maskiner.

Spark kan köras fristående men körs oftast ovanpå ett klusterberäkningsramverk som Hadoop. I test och utveckling kan dock en datavetenskapare effektivt köra Spark på sina utvecklingsboxar eller bärbara datorer utan ett kluster

• En av de främsta fördelarna med Spark är att bygga en arkitektur som omfattar dataströmningshantering, sömlöst datafrågor, maskininlärningsprognoser och tillgång i realtid till olika analyser.

• Spark arbetar nära SQL -språk, dvs. strukturerad data. Det gör det möjligt att söka efter data i realtid.

• Datavetenskaplig huvudsakliga uppgift är att analysera och bygga förutsägbara modeller. Kort sagt, en datavetenskapare behöver veta hur man frågar efter data med hjälp av SQL, skapa en statistisk rapport och använda maskininlärning för att producera förutsägelser. Datavetenskapsmannen lägger en stor del av sin tid på att rengöra, omvandla och analysera data. När datauppsättningen eller dataflödesflödet är klart använder datavetenskaparen olika tekniker för att upptäcka insikter och dolda mönster. Datamanipulationen bör vara robust och samma enkel att använda. Spark är det rätta verktyget tack vare dess snabbhet och rika API: er.

I denna PySpark -handledning lär du dig hur du bygger en klassificerare med PySpark -exempel.

Så här installerar du PySpark med AWS

Jupyter -teamet bygger en Docker -image för att köra Spark effektivt. Nedan följer stegen du kan följa för att installera PySpark -instansen i AWS.

Se vår handledning om AWS och TensorFlow

Steg 1: Skapa en instans

Först och främst måste du skapa en instans. Gå till ditt AWS -konto och starta instansen. Du kan öka lagringsutrymmet upp till 15 g och använda samma säkerhetsgrupp som i TensorFlow -självstudien.

Steg 2: Öppna anslutningen

Öppna anslutningen och installera dockningsbehållaren. Mer information finns i självstudien med TensorFlow med Hamnarbetare . Observera att du måste vara i rätt arbetskatalog.

Kör bara dessa koder för att installera Docker: | _+_ |

Steg 3: Öppna om anslutningen och installera Spark

När du har öppnat anslutningen igen kan du installera bilden som innehåller PySpark. | _+_ |

Steg 4: Öppna Jupyter

Kontrollera behållaren och dess namn | _+_ |

Starta dockaren med dockningsloggar följt av dockarens namn. Till exempel loggar docker zealous_goldwasser

Gå till din webbläsare och starta Jupyter. Adressen är http: // localhost: 8888/. Klistra in lösenordet från terminalen.

Notera : om du vill ladda upp/ladda ner en fil till din AWS -maskin kan du använda programvaran Cyberduck, https://cyberduck.io/ .

Så här installerar du PySpark på Windows/Mac med Conda

Nedan följer en detaljerad process om hur du installerar PySpark på Windows/Mac med Anaconda:

För att installera Spark på din lokala maskin är en rekommenderad metod att skapa en ny kondamiljö. Denna nya miljö kommer att installera Python 3.6, Spark och alla beroenden.

Mac -användare

 sudo yum update -y sudo yum install -y docker sudo service docker start sudo user-mod -a -G docker ec2-user exit 

Windows -användare

 ## Spark docker run -v ~/work:/home/jovyan/work -d -p 8888:8888 jupyter/pyspark-notebook ## Allow preserving Jupyter notebook sudo chown 1000 ~/work ## Install tree to see our working directory next sudo yum install -y tree 

Du kan redigera .yml -filen. Var försiktig med indragningen. Två mellanslag krävs innan - | _+_ |

Spara det och skapa miljön. Det tar lite tid | _+_ |

Mer information om platsen finns i självstudien Installera TensorFlow

Du kan kontrollera all miljö som är installerad i din maskin | _+_ | | _+_ |

Mac -användare

docker ps 

Windows -användare

 cd anaconda3 touch hello-spark.yml vi hello-spark.yml 

Notera: Du har redan skapat en specifik TensorFlow -miljö för att köra självstudierna på TensorFlow. Det är mer bekvämt att skapa en ny miljö som skiljer sig från hej-tf. Det är ingen mening att överbelasta hej-tf med Spark eller andra maskininlärningsbibliotek.

Tänk dig att det mesta av ditt projekt involverar TensorFlow, men du måste använda Spark för ett visst projekt. Du kan ställa in en TensorFlow -miljö för hela ditt projekt och skapa en separat miljö för Spark. Du kan lägga till så många bibliotek i Spark -miljö som du vill utan att störa TensorFlow -miljön. När du är klar med Sparks projekt kan du radera det utan att påverka TensorFlow -miljön.

Jupyter

Öppna Jupyter Notebook och försök om PySpark fungerar. Klistra in följande PySpark -provkod i en ny anteckningsbok: | _+_ |

Om ett fel visas är det troligt att Java inte är installerat på din dator. I mac, öppna terminalen och skriv java -version, om det finns en java -version, se till att den är 1.8. I Windows, gå till Program och kontrollera om det finns en Java -mapp. Om det finns en Java -mapp, kontrollera att Java 1.8 är installerat. När detta skrivs är PySpark inte kompatibelt med Java9 och senare.

Om du behöver installera Java, tänker du länk och ladda ner jdk-8u181-windows-x64.exe

För Mac -användare rekommenderas att använda `brew.` | _+_ |

Se denna steg -för -steg -handledning om hur man installerar Java

Notera : Använd ta bort för att radera en miljö helt. | _+_ |

Spark Context

SparkContext är den interna motorn som möjliggör anslutningar med klustren. Om du vill köra en operation behöver du en SparkContext.

Create a SparkContext

Först och främst måste du starta en SparkContext. | _+_ |

Nu när SparkContext är klart kan du skapa en samling data som kallas RDD, Resilient Distributed Dataset. Beräkning i en RDD parallelliseras automatiskt över klustret. | _+_ |

Du kommer åt den första raden med take | _+_ | | _+_ |

Du kan tillämpa en transformation på data med en lambda -funktion. I PySpark -exemplet nedan returnerar du kvadraten med tal. Det är en kartomvandling | _+_ | | _+_ |

SQLContext

Ett mer bekvämt sätt är att använda DataFrame. SparkContext är redan inställt, du kan använda den för att skapa dataFrame. Du måste också deklarera SQLContext

SQLContext gör det möjligt att ansluta motorn till olika datakällor. Det används för att initiera funktionaliteterna i Spark SQL. | _+_ |

Nu i denna Spark -tutorial Python, låt oss skapa en lista med tuple. Varje tupel innehåller namnet på personerna och deras ålder. Fyra steg krävs:

Steg 1) Skapa listan över tupel med informationen | _+_ |

Steg 2) Skapa en RDD

 cd C:UsersAdminAnaconda3 echo.>hello-spark.yml notepad hello-spark.yml 

Steg 3) Konvertera tuplerna | _+_ |

Steg 4) Skapa ett DataFrame -sammanhang | _+_ |

Om du vill komma åt typen av varje funktion kan du använda printSchema () | _+_ |

Maskininlärningsexempel med PySpark

Nu när du har en kort uppfattning om Spark och SQLContext är du redo att bygga ditt första maskininlärningsprogram.

Följande är stegen för att bygga ett maskininlärningsprogram med PySpark:

  • Steg 1) Grundläggande användning med PySpark
  • Steg 2) Förbehandling av data
  • Steg 3) Bygg en databehandlingsrörledning
  • Steg 4) Bygg klassificeraren: logistisk
  • Steg 5) Träna och utvärdera modellen
  • Steg 6) Ställ in hyperparametern

I denna PySpark Machine Learning -handledning kommer vi att använda datauppsättningen för vuxna. Syftet med denna handledning är att lära sig använda Pyspark. Mer information om datamängden finns i den här självstudien.

Observera att datauppsättningen inte är signifikant och du kanske tror att beräkningen tar lång tid. Spark är utformad för att bearbeta en avsevärd mängd data. Sparks prestanda ökar i förhållande till andra maskininlärningsbibliotek när datamängden som bearbetas blir större.

Steg 1) Grundläggande användning med PySpark

Först och främst måste du initialisera att SQLContext inte redan är igång ännu. | _+_ |

då kan du läsa cvs -filen med sqlContext.read.csv. Du använder inferSchema satt till True för att tala om för Spark att automatiskt gissa vilken typ av data. Som standard är det tur till False. | _+_ |

Låt oss titta på datatypen | _+_ |

Du kan se data med show. | _+_ | | _+_ |

Om du inte ställde inderShema till True är det här som händer med typen. Det finns alla i strängen. | _+_ |

För att konvertera den kontinuerliga variabeln i rätt format kan du använda omarbeta kolumnerna. Du kan använda withColumn för att berätta för Spark vilken kolumn som ska hantera transformationen. | _+_ |

Välj kolumner

Du kan välja och visa raderna med välj och namnen på funktionerna. Nedan väljs ålder och fnlwgt. | _+_ | | _+_ |

Räkna efter grupp

Om du vill räkna antalet förekomster efter grupp kan du kedja:

  • Grupp av()
  • räkna()

tillsammans. I PySpark -exemplet nedan räknar du antalet rader efter utbildningsnivån. | _+_ | | _+_ |

Beskriv uppgifterna

För att få en sammanfattande statistik av data kan du använda beskriv (). Den kommer att beräkna:

  • räkna
  • betyda
  • standardavvikelse
  • min
  • max
 name: hello-spark dependencies: - python=3.6 - jupyter - ipython - numpy - numpy-base - pandas - py4j - pyspark - pytz 
conda env create -f hello-spark.yml 

Om du vill ha sammanfattningsstatistiken för endast en kolumn, lägg till kolumnens namn inuti beskriv () | _+_ | | _+_ |

Korsstabberäkning

Vid något tillfälle kan det vara intressant att se den beskrivande statistiken mellan två parvisa kolumner. Till exempel kan du räkna antalet personer med inkomst under eller över 50 000 efter utbildningsnivå. Denna operation kallas en krysstabell. | _+_ | | _+_ |

Du kan se att inga människor har intäkter över 50 000 när de är unga.

Släpp kolumn

Det finns två intuitiva API för att släppa kolumner:

  • drop (): Släpp en kolumn
  • dropna (): Släpp NA

Nedan släpper du kolumnen utbildning_num | _+_ |

Filtrera data

Du kan använda filter () för att tillämpa beskrivande statistik i en delmängd av data. Du kan till exempel räkna antalet personer över 40 år | _+_ |

13443

Beskrivande statistik per grupp

Slutligen kan du gruppera data efter grupp och beräkna statistiska operationer som medelvärdet. | _+_ | | _+_ |

Steg 2) Förbehandling av data

Databehandling är ett viktigt steg i maskininlärning. När du har tagit bort sopdata får du några viktiga insikter.

Till exempel vet du att ålder inte är en linjär funktion med inkomsten. När människor är unga är deras inkomst vanligtvis lägre än medelåldern. Efter pensionen använder ett hushåll sitt sparande, vilket innebär en minskning av inkomsten. För att fånga det här mönstret kan du lägga till en kvadrat till åldersfunktionen

Lägg till ålderstorget

För att lägga till en ny funktion måste du:

  1. Välj kolumnen
  2. Tillämpa transformationen och lägg till den i DataFrame
conda env list 

Du kan se att age_square har lagts till i dataramen. Du kan ändra variablernas ordning med select. Nedan tar du age_square direkt efter ålder. | _+_ | | _+_ |

Uteslut Holand-Nederländerna

När en grupp inom en funktion bara har en observation, ger den ingen information till modellen. Tvärtom kan det leda till ett fel under korsvalideringen.

Låt oss kontrollera ursprunget för hushållet | _+_ | | _+_ |

Funktionen native_country har bara ett hushåll som kommer från Nederländerna. Du utesluter det. | _+_ |

Steg 3) Bygg en databehandlingsrörledning

I likhet med scikit-learn har Pyspark ett pipeline API.

En pipeline är mycket bekvämt för att upprätthålla datastrukturen. Du skjuter in data i pipelinen. Inuti rörledningen utförs olika operationer, utmatningen används för att mata algoritmen.

Till exempel består en universell transformation i maskininlärning av att konvertera en sträng till en het kodare, dvs en kolumn av en grupp. En het kodare är vanligtvis en matris full av nollor.

Stegen för att transformera data liknar mycket scikit-learn. Du behöver:

  • Indexera strängen till numerisk
  • Skapa en het kodare
  • Transformera data

Två API: er gör jobbet: StringIndexer, OneHotEncoder

  1. Först och främst väljer du den strängkolumn som ska indexeras. InputCol är namnet på kolumnen i datamängden. outputCol är det nya namnet som ges till den transformerade kolumnen.
Activate hello-spark
  1. Passa in data och transformera dem
source activate hello-spark 
  1. Skapa nyhetskolumnerna baserat på gruppen. Om det till exempel finns 10 grupper i funktionen kommer den nya matrisen att ha 10 kolumner, en för varje grupp.
activate hello-spark 
 import pyspark from pyspark import SparkContext sc =SparkContext() 
 brew tap caskroom/versions brew cask install java8 

Bygg upp rörledningen

Du kommer att bygga en pipeline för att konvertera alla de exakta funktionerna och lägga till dem i den slutliga datamängden. Rörledningen kommer att ha fyra operationer, men lägg gärna till så många operationer du vill.

  1. Koda de kategoriska uppgifterna
  2. Indexera etikettfunktionen
  3. Lägg till kontinuerlig variabel
  4. Montera stegen.

Varje steg lagras i en lista som heter steg. Denna lista kommer att berätta för VectorAssembler vilken operation som ska utföras inuti rörledningen.

1. Koda de kategoriska uppgifterna

Det här steget är exakt samma som exemplet ovan, förutom att du går igenom alla kategoriska funktioner. | _+_ |

2. Indexera etikettfunktionen

Spark, som många andra bibliotek, accepterar inte strängvärden för etiketten. Du konverterar etikettfunktionen med StringIndexer och lägger till den i liststegen | _+_ |

3. Lägg till kontinuerlig variabel

InputCols i VectorAssembler är en lista med kolumner. Du kan skapa en ny lista som innehåller alla nya kolumner. Koden nedan visar listan med kodade kategoriska funktioner och de kontinuerliga funktionerna. | _+_ |

4. Montera stegen.

Slutligen klarar du alla steg i VectorAssembler | _+_ |

Nu när alla steg är klara, skjuter du data till pipelinen. | _+_ |

Om du kontrollerar den nya datamängden kan du se att den innehåller alla funktioner, transformerade och inte transformerade. Du är bara intresserad av ny etikett och funktioner. Funktionerna inkluderar alla de transformerade funktionerna och de kontinuerliga variablerna. | _+_ |

Steg 4) Bygg klassificeraren: logistisk

För att göra beräkningen snabbare konverterar du modellen till en DataFrame.

Du måste välja ny etikett och funktioner från modellen med hjälp av karta. | _+_ |

Du är redo att skapa tågdata som en DataFrame. Du använder sqlContext | _+_ |

Kontrollera andra raden | _+_ | | _+_ |

Skapa ett tåg/testuppsättning

Du delar upp datamängden 80/20 med randomSplit. | _+_ |

Låt oss räkna hur många personer med inkomst under/över 50 000 i både utbildning och testuppsättning | _+_ | | _+_ | | _+_ | | _+_ |

Bygg den logistiska regressorn

Sist men inte minst kan du bygga klassificeraren. Pyspark har ett API som heter LogisticRegression för att utföra logistisk regression.

Du initierar lr genom att ange etikettkolumnen och funktionskolumnerna. Du anger högst 10 iterationer och lägger till en regulariseringsparameter med ett värde på 0,3. Observera att i nästa avsnitt kommer du att använda korsvalidering med ett parameternät för att ställa in modellen | _+_ |

#Du kan se koefficienterna från regressionen | _+_ | | _+_ |

Steg 5) Träna och utvärdera modellen

För att generera förutsägelser för din testuppsättning,

Du kan använda linearModel med transform () på test_data | _+_ |

Du kan skriva ut elementen i förutsägelser | _+_ |

Du är intresserad av etiketten, förutsägelsen och sannolikheten | _+_ | | _+_ |

Utvärdera modellen

Du måste titta på noggrannhetsmåttet för att se hur bra (eller dålig) modellen presterar. För närvarande finns det inget API för att beräkna noggrannhetsmåttet i Spark. Standardvärdet är ROC, mottagarens driftskarakteristiska kurva. Det är en annan mätvärde som tar hänsyn till falskt positivt värde.

Innan du tittar på ROC, låt oss konstruera noggrannhetsmåttet. Du är mer bekant med detta mått. Noggrannhetsmåttet är summan av den korrekta förutsägelsen över det totala antalet observationer.

Du skapar en DataFrame med etiketten och `förutsägelsen. | _+_ |

Du kan kontrollera antalet klasser i etiketten och förutsägelsen | _+_ | | _+_ | | _+_ | | _+_ |

Till exempel i testuppsättningen finns det 1578 hushåll med en inkomst över 50k och 5021 under. Klassificeraren förutspådde dock 617 hushåll med inkomst över 50k.

Du kan beräkna noggrannheten genom att beräkna antalet när etiketten är korrekt klassificerad över det totala antalet rader. | _+_ |

0,8237611759357478

Du kan slå ihop allt och skriva en funktion för att beräkna noggrannheten. | _+_ |

ROC -mätvärden

Modulen BinaryClassificationEvaluator innehåller ROC -måtten. Mottagarens driftskarakteristiska kurva är ett annat vanligt verktyg som används med binär klassificering. Det liknar mycket precision/återkallelsekurvan, men i stället för att plotta precision mot återkallelse visar ROC -kurvan den sanna positiva hastigheten (dvs. återkallelse) mot den falskt positiva hastigheten. Den falska positiva hastigheten är förhållandet mellan negativa instanser som felaktigt klassificeras som positiva. Det är lika med en minus den sanna negativa räntan. Den sanna negativa räntan kallas också specificitet. Därför visar ROC -kurvan känslighet (återkallelse) kontra 1 - specificitet | _+_ |

0.8940481662695192areaUnderROC | _+_ |

0,8940481662695192

Steg 6) Ställ in hyperparametern

Sist men inte minst kan du ställa in hyperparametrarna. På samma sätt som scikit lär du dig att skapa ett parameternät och du lägger till de parametrar du vill ställa in.

För att minska beräkningstiden ställer du bara in regleringsparametern med endast två värden. | _+_ |

Slutligen utvärderar du modellen med hjälp av kryssvärderingsmetoden med 5 gånger. Det tar cirka 16 minuter att träna. | _+_ |

Tid att träna modell: 978,807 sekunder

Den bästa regulariseringshyperparametern är 0,01, med en noggrannhet på 85,316 procent. | _+_ |

Du kan extrahera den rekommenderade parametern genom att kedja cvModel.bestModel med extractParamMap () | _+_ | | _+_ |

Sammanfattning

Spark är ett grundläggande verktyg för en datavetare. Det gör det möjligt för utövaren att ansluta en app till olika datakällor, utföra dataanalys sömlöst eller lägga till en förutsägbar modell.

För att börja med Spark måste du initiera ett Spark -sammanhang med:

`SparkContext()``

och och SQL -sammanhang för att ansluta till en datakälla:

`SQLContext ()` `

I självstudien lär du dig hur du tränar en logistisk regression:

  1. Konvertera datasetet till en Dataframe med:
 conda env remove -n hello-spark -y 

Observera att etikettens kolumnnamn är ny etikett och att alla funktioner samlas i funktioner. Ändra dessa värden om de är olika i din datamängd.

  1. Skapa tåg-/testuppsättningen
 import pyspark from pyspark import SparkContext sc =SparkContext() 
  1. Träna modellen
nums= sc.parallelize([1,2,3,4]) 
nums.take(1) 
  1. Gör förutsägelser
[1]