




版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領
文檔簡介
Spark
懶嘀斤
目錄
第1章Spark數據分析導論
1.1Spark是什么
1.2一個大一統的軟件棧
1.2.1SparkCore
1.2.2SparkSQL
123SparkStreaming
1.2.4MLlib
1.2.5GraphX
126集群管理器
1.3Spark的用戶和用途
1.3.1數據科學任務
1.3.2數據處理應用
1.4Spark簡史
1.5Spark的版本和發布
1.6Spark的存儲層次
第2章Spark下載與入門
2.1下載Spark
2.2Spark中Python和Scala的shell
2.3Spark核心概念簡介
2.4獨立應用
2.4.1初始化SparkContext
2.4.2構建獨立應用
2.5總結
第3章RDD編程
3.1RDD基礎
3.2創建RDD
3.3RDD操作
3.3.1轉化操作
3.3.2行動操作
333惰性求值
3.4向Spark傳遞函數
3.4.1Python
3.4.2Scala
3.4.3lava
3.5常見的轉化操作和行動操作
3.5.1基本RDD
3.5.2在不同RDD類型間轉換
3.6持久化f緩存)
3.7總結
第4章鍵值對操作
4.1動機
4.2創建PairRDD
4.3PairRDD的轉化操作
4.3.1聚合操作
4.3.2數據分組
4.3.3連接
4.3.4數據排序
4.4PairRDD的行動操作
4.5數據分區(進階)
451獲取RDD的分區方式
4.5.2從分區中獲益的操作
4.5.3影響分區方式的操作
4.5.4示例:PageRank
4.5.5自定義分區方式
4.6總結
第5章數據讀取與保存
5.1動機
5.2文件格式
5.2.1文本文件
5.2.2JSON
523逗號分隔值與制表符分隔值
524SequenceFile
5.2.5對象文件
526HadooD輸入輸出格式
5.2.7文件壓縮
5.3文件系統
5.3.1本地/“常規"文件系統
5.3.2AmazonS3
5.3.3HDFS
5.4SparkSQL中的結構化數據
5.4.1ApacheHive
5.4.2JSON
5.5數據庫
5.5.1lava數據庫連接
5.5.2Cassandra
5.5.3HBase
5.5.4Elasticsearch
5.6總結
第6章Spark編程進階
6.1簡介
6.2累加器
621累加器與容錯性
6.2.2自定義累加器
6.3廣播變量
廣播的優化
6.4基于分區進行操作
6.5與外部程序間的管道
6.6數值RDD的操作
6.7總結
第7章在集群上運行Spark
7.1簡介
7.2Spark運行時架構
721驅動器節點
722執行器節點
723集群管理器
724啟動一個程序
725小結
7.3使用spark-submit部署應用
7.4打包代碼與依賴
7.4.1使用Maven構建的用Java編寫的Spark應用
7.4.2使用sbt構建的用Scala編寫的Spark應用
7.4.3依賴沖突
7.5Spark應用內與應用間調度
7.6集群管理器
7.6.1獨立集群管理器
7.6.2HadooDYARN
7.6.3ApacheMesos
7.6.4AmazonEC2
7.7選擇合適的集群管理器
7.8總結
第8章Spark調優與調試
8.1使用SparkConf酉)置Spark
8.2Spark執行的組成部分:作業、任務和步驟
8.3查找信息
8.3.1Spark網頁用戶界面
8.3.2驅動器進程和執行器進程的口志
8.4關鍵性能考量
8.4.1并行度
8.4.2序列化格式
8.4.3內存管理
8.4.4硬件供給
8.5總結
第9章SparkSQL
9.1連接SparkSOL
9.2在應用中使用SparkSOL
9.2.1初始化SparkSOL
9.2.2基本查詢示例
9.2.3SchemaRDD
9.2.4緩存
9.3讀取和存儲數據
9.3.1ApacheHive
9.3.2Parquet
9.3.3JSON
9.3.4基于RDD
9.4IDBC/0DBC月艮務器
9.4.1使用Beeline
9.4.2長生命周期的表與查詢
9.5用戶自定義函數
9.5.1SparkSQLUDF
952HiveUDF
9.6SparkSOL性能
性能調優選項
9.7總結
第10章SparkStreaming
10.1一個簡單的例子
10.2架構與抽象
10.3轉化操作
10.3.1無狀態轉化操作
10.3.2有狀態轉化操作
10.4輸出操作
10.5輸入源
10.5.1核心數據源
10.5.2附加數據源
1053多數據源與集群規模
10,624/7不間斷運行
1061檢查點機制
10.6.2驅動器程序容錯
10.6.3工作節點容錯
10.6.4接收器容錯
10.6.5處理保證
10.7Streaming用戶界面
10.8性能考量
10.8.1批次和窗口大小
10.8.2并行度
10.8.3垃圾回收和內存使用
10.9總結
第11章基于MLHb的機器學習
11.1概述
11.2系統要求
11.3機器學習基礎
示例:垃圾郵件分類
11.4數據類型
操作向量
11.5算法
11.5.1特征提取
11.5.2統計
1153分類與回歸
11.5.4聚類
1155協同過濾與推薦
1156降維
1157模型評估
11.6一些提示與性能考量
11.6.1準備特征
1162配置算法
11.6.3緩存RDD以重復使用
11.6.4識別稀疏程度
11.6.5并行度
11.7流水線API
11.8總結
第1章Spark數據分析導論
本章會從宏觀角度介紹Spark到底是什么。如果你已經對Spark和相關組件有一定了解,
你可以選擇直接從第2章開始讀。
1.1Spark是什么
Spark是一個用來實現快速而通用的集群計算的平臺。
在速度方面,Spark擴展了廣泛使用的MapReduce計算模型,而且高效地支持更多計算
模式,包括交互式查詢和流處理。在處理大規模數據集時.,速度是非常重要的。速度快就
意味著我們可以進行交互式的數據操作,否則我們每次操作就需要等待數分鐘甚至數小時。
Spark的一個主要特點就是能夠在內存中進行計算,因而更快。不過即使是必須在磁盤上
進行的復雜計算,Spark依然比MapReduce更加高效。
總的來說,Spark適用于各種各樣原先需要多種不同的分布式平臺的場景,包括批處理、
迭代算法、交互式查詢、流處理。通過在一個統一的框架下支持這些不同的計算,Spark
使我們可以簡單而低耗地把各種處理流程整合在一起。而這樣的組合,在實際的數據分
析過程中是很有意義的。不僅如此,Spark的這種特性還大大減輕了原先需要對各種平臺
分別管理的負擔。
Spark所提供的接口非常豐富。除了提供基于Python、Java、Scala和SQL的簡單易用的
API以及內建的豐富的程序庫以外,Spark還能和其他大數據工具密切配合使用。例如,
Spark可以運行在Hadoop集群上,訪問包括Cassandra在內的任意Hadoop數據源。
1.2一個大一統的軟件棧
Spark項目包含多個緊密集成的組件。Spark的核心是一個對由很多計算任務組成的、運
行在多個工作機器或者是一個計算集群上的應用進行調度、分發以及監控的計算引擎。
由于Spark的核心引擎有著速度快和通用的特點,因此Spark還支持為各種不同應用場景
專門設計的高級組件,比如SQL和機器學習等。這些組件關系密切并且可以相互調用,
這樣你就可以像在平常軟件項目中使用程序庫那樣,組合使用這些的組件。
各組件間密切結合的設計原理有這樣幾個優點。首先,軟件棧中所有的程序庫和高級組件
都可以從下層的改進中獲益。比如,當Spark的核心引擎新引入了一個優化時,SQL和機
器學習程序庫也都能自動獲得性能提升。其次,運行整個軟件棧的代價變小了。不需要運
行5到10套獨立的軟件系統了,一個機構只需要運行一套軟件系統即可。這些代價包括
系統的部署、維護、測試、支持等。這也意味著Spark軟件棧中每增加一個新的組件,使
用Spark的機構都能馬上試用新加入的組件。這就把原先嘗試一種新的數據分析系統所需
要的下載、部署并學習一個新的軟件項目的代價簡化成了只需要升級Spark。
最后,密切結合的原理的一大優點就是,我們能夠構建出無縫整合不同處理模型的應用。
例如,利用Spark,你可以在一個應用中實現將數據流中的數據使用機器學習算法進行實
時分類。與此同時?,數據分析師也可以通過SQL實時查詢結果數據,比如將數據與非結
構化的日志文件進行連接操作。不僅如此,有經驗的數據工程師和數據科學家還可以通過
Pythonshell來訪問這些數據,進行即時分析。其他人也可以通過獨立的批處理應用訪問
這些數據。IT團隊始終只需要維護一套系統即可。
Spark的各個組件如圖1-1所示,下面來依次簡要介紹它們。
圖1-1:Spark軟件棧
1.2.1SparkCore
SparkCore實現了Spark的基本功能,包含任務調度、內存管理、錯誤恢復、與存儲系統
交互等模塊。SparkCore中還包含了對彈性分布式數據集(resilientdistributeddataset,
簡稱RDD)的API定義。RDD表示分布在多個計算節點上可以并行操作的元素集合,是
Spark主要的編程抽象。SparkCore提供了創建和操作這些集合的多個API。
1.2.2SparkSQL
SparkSQL是Spark用來操作結構化數據的程序包。通過SparkSQL,我們可以使用SQL
或者ApacheHive版本的SQL方言(HQL)來查詢數據。SparkSQL支持多種數據源,比
如Hive表、Parquet以及JSON等。除了為Spark提供了一個SQL接口,SparkSQL還支
持開發者將SQL和傳統的RDD編程的數據操作方式相結合,不論是使用Python、Java還
是Scala,開發者都可以在單個的應用中同時使用SQL和復雜的數據分析。通過與Spark
所提供的豐富的計算環境進行如此緊密的結合,SparkSQL得以從其他開源數據倉庫工具
中脫穎而出。SparkSQL是在Spark1.0中被引入的。
在SparkSQL之前,加州大學伯克利分校曾經嘗試修改ApacheHive以使其運行在Spark
上,當時的項目叫作Shark。現在,由于SparkSQL與Spark引擎和API的結合更緊密,
Shark已經被SparkSQL所取代。
1.2.3SparkStreaming
SparkStreaming是Spark提供的對實時數據進行流式計算的組件。比如生產環境中的網
頁服務器日志,或是網絡服務中用戶提交的狀態更新組成的消息隊列,都是數據流。
SparkStreaming提供了用來操作數據流的API,并且與SparkCore中的RDDAPI高度對
應。這樣一來,程序員編寫應用時的學習門檻就得以降低,不論是操作內存或硬盤中的數
據,還是操作實時數據流,程序員都更能應對自如。從底層設計來看,SparkStreaming
支持與SparkCore同級別的容錯性、吞吐量以及可伸縮性。
1.2.4MLlib
Spark中還包含一個提供常見的機器學習(ML)功能的程序庫,叫作MLlib。MLlib提供
了很多種機器學習算法,包括分類、回歸、聚類、協同過濾等,還提供了模型評估、數據
導入等額外的支持功能。MLlib還提供了一些更底層的機器學習原語,包括一個通用的梯
度下降優化算法。所有這些方法都被設計為可以在集群上輕松伸縮的架構。
1.2.5GraphX
GraphX是用來操作圖(比如社交網絡的朋友關系圖)的程序庫,可以進行并行的圖計算。
與SparkStreaming和SparkSQL類似,GraphX也擴展了Spark的RDDAPI,能用來創建
一個頂點和邊都包含任意屬性的有向圖。GraphX還支持針對圖的各種操作(比如進行圖
分害!I的subgraph和操作所有頂點的mapVertices),以及一些常用圖算法(比如
PageRank和三角計數)。
1.2.6集群管理器
就底層而言,Spark設計為可以高效地在一個計算節點到數千個計算節點之間伸縮計算。
為了實現這樣的要求,同時獲得最大靈活性,Spark支持在各種集群管理器(cluster
manager)上運行,包括HadoopYARN、ApacheMesos,以及Spark自帶的一個簡易調
度器,叫作獨立調度器。如果要在沒有預裝任何集群管理器的機器上安裝Spark,那么
Spark自帶的獨立調度器可以讓你輕松入門;而如果已經有了一個裝有HadoopYARN或
Mesos的集群,通過Spark對這些集群管理器的支持,你的應用也同樣能運行在這些集群
上。第7章會詳細探討這些不同的選項以及如何選擇合適的集群管理器。
1.3Spark的用戶和用途
Spark是一個用于集群計算的通用計算框架,因此被用于各種各樣的應用程序。在前言中
我們提到了本書的兩大目標讀者人群:數據科學家和工程師。仔細分析這兩個群體以及他
們使用Spark的方式,我們不難發現這兩個群體使用Spark的典型用例并不一致,不過我
們可以把這些用例大致分為兩類一一數據科學應用和數據處理應用。
當然,這種領域和使用模式的劃分是比較模糊的。很多人也兼有數據科學家和工程師的能
力,有的時候扮演數據科學家的角色進行研究,然后搖身一變成為工程師,熟練地編寫復
雜的數據處理程序。不管怎樣,分開看這兩大群體和相應的用例是很有意義的。
1.3.1數據科學任務
數據科學是過去兒年里出現的新學科,關注的是數據分析領域。盡管沒有標準的定義,但
我們認為數據科學家(datascientist)就是主要負責分析數據并建模的人。數據科學家有
可能具備SQL、統計、預測建模(機器學習)等方面的經驗,以及一定的使用Python、
Matlab或R語言進行編程的能力。將數據轉換為更方便分析和觀察的格式,通常被稱為
數據轉換(datawrangling),數據科學家也對這一過程中的必要技術有所了解。
數據科學家使用他們的技能來分析數據,以回答問題或發現一些潛在規律。他們的工作流
經常會用到即時分析,所以他們可以使用交互式shell替代復雜應用的構建,這樣可以在
最短時間內得到查詢語句和一些簡單代碼的運行結果。Spark的速度以及簡單的API都能
在這種場景里大放光彩,而Spark內建的程序庫的支持也使得很多算法能夠即刻使用。
Spark通過一系列組件支持各種數據科學任務。Sparkshell通過提供Python和Scala的接
口,使我們方便地進行交互式數據分析。SparkSQL也提供一個獨立的SQLshell,我們可
以在這個shell中使用SQL探索數據,也可以通過標準的Spark程序或者Sparkshell來進
行SQL查詢。機器學習和數據分析則通過MLlib程序庫提供支持。另外,Spark還能支持
調用R或者Matlab寫成的外部程序。數據科學家在使用R或Pandas等傳統數據分析工
具時所能處理的數據集受限于單機,而有了Spark,就能處理更大數據規模的問題。
在初始的探索階段之后,數據科學家的工作需要被應用到實際中。具體問題包括擴展應用
的功能、提高應用的穩定性,并針對生產環境進行配置,使之成為業務應用的一部分。例
如,在數據科學家完成初始的調研之后,我們可能最終會得到一個生產環境中的推薦系統,
可以整合在網頁應用中,為用戶提供產品推薦。一般來說,將數據科學家的工作轉化為實
際生產中的應用的工作是由另外的工程師或者工程師團隊完成的,而不是那些數據科學家。
1.3.2數據處理應用
Spark的另一個主要用例是針對工程師的。在這里,我們把工程師定義為使用Spark開發
生產環境中的數據處理應用的軟件開發者。這些開發者一般有基本的軟件工程概念,比如
封裝、接口設計以及面向對象的編程思想,他們通常有計算機專業的背景,并且能使用工
程技術來設計和搭建軟件系統,以實現業務用例。
對工程師來說,Spark為開發用于集群并行執行的程序提供了一條捷徑。通過封裝,
Spark不需要開發者關注如何在分布式系統上編程這樣的復雜問題,也無需過多關注網絡
通信和程序容錯性。Spark已經為工程師提供了足夠的接口來快速實現常見的任務,以及
對應用進行監視、審查和性能調優。其API模塊化的特性(基于傳遞分布式的對象集)使
得利用程序庫進行開發以及本地測試大大簡化。
Spark用戶之所以選擇Spark來開發他們的數據處理應用,正是因為Spark提供了豐富的
功能,容易學習和使用,并且成熟穩定。
1.4Spark簡史
Spark是由一個強大而活躍的開源社區開發和維護的,社區中的開發者們來自許許多多不
同的機構。如果你或者你所在的機構是第一次嘗試使用Spark,也許你會對Spark這個項
目的歷史感興趣。Spark是于2009年作為一個研究項目在加州大學伯克利分校RAD實驗
室(AMPLab的前身)誕生。實驗室中的一些研究人員曾經用過HadoopMapReduce。他
們發現MapReduce在迭代計算和交互計算的任務上表現得效率低下。因此,Spark從一
開始就是為交互式查詢和迭代算法設計的,同時還支持內存式存儲和高效的容錯機制。
2009年,關于Spark的研究論文在學術會議上發表,同年Spark項目正式誕生。其后不
久,相比于MapReduce,Spark在某些任務上已經獲得了10?20倍的性能提升。
Spark最早的一部分用戶來自加州伯克利分校的其他研究小組,其中比較著名的有Mobile
Millenniumo作為機器學習領域的研究項目,他們利用Spark來監控并預測舊金山灣區的
交通擁堵情況。僅僅過了短短的一段時間,許多外部機構也開始使用Spark。如今,有超
過50個機構將自己添加到了使用Spark的機構列表頁面
(https:〃./confluence./display/SPARK/Powered+By+Spark)。在Spark
社區如火如荼的社區活動SparkMeetups(http:〃/spark-users/)和
Spark峰會(http:〃/)中,許多機構也向大家積極分享他們特有的
Spark應用場景。除了加州大學伯克利分校,對Spark作出貢獻的主要機構還有
Databricks、雅虎以及英特爾。
2011年,AMPLab開始基于Spark開發更高層的組件,比如Shark(Spark上的Hive)1
和SparkStreamingo這些組件和其他一些組件一起被稱為伯克利數據分析工具棧(BDAS,
/software/)。
iShark已經被SparkSQL所取代。
Spark最早在2010年3月開源,并且在2013年6月交給了Apache基金會,現在已經成
TApache開源基金會的頂級項目。
1.5Spark的版本和發布
自其出現以來,Spark就一直是一個非常活躍的項目,Spark社區也一直保持著非常繁榮
的態勢。隨著版本號的不斷更迭,Spark的貢獻者也與日俱增。Spark1.0吸引了100多
個開源程序員參與開發。盡管項目活躍度在飛速地提升,Spark社區依然保持著常規的發
布新版本的節奏。2014年5月,Spark1.0正式發布,而本書則主要關注Spark1.1.0以及
后續的版本。不過,大多數概念在老版本的Spark中依然適用,而大多數示例也能運行在
老版本的Spark上。
1.6Spark的存儲層次
Spark不僅可以將任何Hadoop分布式文件系統(HDFS)上的文件讀取為分布式數據集,
也可以支持其他支持Hadoop接口的系統,比如本地文件、亞馬遜S3、Cassandra、Hive^
HBase等。我們需要弄清楚M是,Hadoop并非Spark的必要條件,Spark支持任何實現
了Hadoop接口的存儲系統。Spark支持的Hadoop輸入格式包括文本文件、SequenceFile,
Avro、Parquet等。我們會在第5章討論讀取和存儲時詳細介紹如何與這些數據源進行交
互。
第2章Spark下載與入門
在本章中,我們會下載Spark并在本地模式下單機運行它。本章是寫給Spark的所有初學
者的,對數據科學家和工程師來說都值得一讀。
Spark可以通過Python、Java或Scala來使用1。要用好本書不需要高超的編程技巧,但
是確實需要對其中某種語言的語法有基本的了解。我們會盡可能在示例中給出全部三種語
言的代碼。
iSpark1.4.0起添加J'R語言支持。
Spark本身是用Scala寫的,運行在Java虛擬機(JVM)上。要在你的電腦或集群上運行
Spark,你要做的準備工作只是安裝Java6或者更新的版本。如果你希望使用Python接口,
你還需要一個Python解釋器(2.6以上版本)。Spark尚不支持Python32。
zSpark1.4.0起支持Python3。譯者注
2.1下載Spark
使用Spark的第一步是下載和解壓縮。我們先從下載預編譯版本的Spark開始。訪問
/downloads.html,選擇包類型為"Pre-builtforHadoop2.4and
later"(為Hadoop2.4及更新版本預編譯的版本),然后選擇"DirectDownload"直接下載。
這樣我們就可以得到一個壓縮的TAR文件,文件名為spark-1.2.0-bin-hadoop2.4.tgz.
2Windows用戶如果把Spark安裝到帶有空格的路徑下,可能會遇到一些問題。所
以我們需要把Spark安裝到不帶空格的路徑下,比如C:\spark這樣的目錄中。
你不需要安裝Hadoop,不過如果你已經有了一個Hadoop集群或安裝好的HDFS,請下載
對應版本的Sparko你可以在http:〃/downloads.html里選擇所需要的包
類型,這會導致下載得到的文件名略有不同。也可以選擇從源代碼直接編譯。你可以從
GitHub上下載最新代碼,也可以在下載頁面上選擇包類型為“SourceCode"(源代碼)進
行下載。
大多數類Unix系統,包括OSX和Linux,都有一個叫tar的命令行工具,可以用
來解壓TAR文件。如果你的操作系統沒有安裝tar,可以嘗試搜索網絡獲取免費的TAR
解壓縮工具。比如,如果你使用的是Windows,可以試一下7-Zip.
下載好了Spark之后,我們要進行解壓縮,然后看一看默認的Spark發行版中都有些什么。
打開終端,將工作路徑轉到下載的Spark壓縮包所在的目錄,然后解開壓縮包。這樣會創
建出一個和壓縮包同名但是沒了.tgz后綴的新文件夾。接下來我們就把工作路徑轉到這個
新目錄下看看里面都有些什么。上面這些步驟可以用如下命令完成:
cd?
tar-xfspark-1.2.0-bin-hadoop2.4.tgz
cdspark-1.2.0-bin-hadoop2.4
Is
在tar命令所在的那一行中,x標記指定tar命令執行解壓縮操作,f標記則指定壓縮
包的文件名。1s命令列出了Spark目錄中的內容。我們先來粗略地看一看Spark目錄中
的一些比較重要的文件及目錄的名字和作用。
?README.md
包含用來入門Spark的簡單的使用說明。
?bin
包含可以用來和Spark進行各種方式的交互的一系列可執行文件,比如本章稍后會講到的
Sparkshello
?core>streamingspython
?包含Spark項目主要組件的源代碼。
?examples
包含一些可以查看和運行的Spark程序,對學習Spark的API非常有幫助。
不要被Spark項目數量龐大的文件和復雜的目錄結構嚇倒,我們會在本書接下來的部分中
講解它們中的很大一部分。就目前來說,我們還是按部就班,先來試試Spark的Python
和Scala版本的shell。讓我們從運行一些Spark自帶的示例代碼開始,然后再編寫、編譯
并運行一個我們自己簡易的Spark程序。
本章我們所做的一切,Spark都是在本地模式下運行,也就是非分布式模式,這樣我們只
需要用到一臺機器。Spark可以運行在許多種模式下,除了本地模式,還支持運行在
Mesos或YARN上,也可以運行在Spark發行版自帶的獨立調度器上。我們會在第7章詳
細講述各種部署模式。
2.2Spark中Python和Scala的shell
Spark帶有交互式的shell,可以作即時數據分析。如果你使用過類似R、Python、Scala
而提供的shell,或操作系統的shell(例如Bash或者Windows中的命令提示符),你也
會對Sparkshell感到很熟悉。然而和其他shell工具不一樣的是,在其他shell工具中你
只能使用單機的硬盤和內存來操作數據,而Sparkshell可用來與分布式存儲在許多機器
的內存或者硬盤上的數據進行交互,并且處理過程的分發由Spark自動控制完成。
由于Spark能夠在工作節點上把數據讀取到內存中,所以許多分布式計算都可以在幾秒鐘
之內完成,哪怕是那種在十幾個節點上處理TB級別的數據的計算。這就使得一般需要在
shell中完成的那些交互式的即時探索性分析變得非常適合Spark。Spark提供Python以
及Scala的增強版shell,支持與集群的連接。
&
本書中大多數示例代碼都包含Spark支持的所有語言版本,但是交互式shell部分
只提供了Python和Scala版本的示例。shell對于學習API是非常有幫助的,因此我們建
議讀者在Python和Scala版本的例子中選擇一種進行嘗試,即便你是Java開發者也是如
此,畢竟各種語言的API是相似的。
展示Sparkshell的強大之處最簡單的方法就是使用某個語言的shell作一些簡單的數據分
析。我們一起按照Spark官方文檔中的快速入門指南
(http:〃/docs/latest/quick-start.html)中的示例來做一遍。
第一步是打開Sparkshell。要打開Python版本的Sparkshell,也就是我們所說的
PySparkShell,進入你的Spark目錄然后輸入:
bin/pyspark
(在Windows中則運行bin'pyspark。)如果要打開Scala版本的shell,輸入:
bin/spark-shell
稍等數秒,shell提示符就會出現。Shell啟動時:你會看到許多日志信息輸出。有的時候,
由于提示符之后又輸出了日志,我們需要按一下回車鍵,來得到一個清楚的shell提示符。
圖2-1是PySparkshell啟動時的樣子。
hol<Jen9hnt>p2:-/Oowntoads/spark'1.1.0-bln-hadoopl$./bln/pyspark
Python2.7.6(default.Mar222014,22:59:56)
(GCC4.8.21onUnux2
Type"htlp".?copyright",or*UcenM*forMoreinforMtlon.
SparkastMblyhasb??nbuiltwithHive,includingDatanucleusjarsonclasspath
UsingSpark'sdefaultlog4jprofile:org/8pacbe/$p?rk八。g4j?dperties
14/11/1914:33:49WARMUtils:Yourhostname,hnbp2rvMlvestoaloopbackacWrcss:127.0.1.1;usinqinstead(oninterfacedockcrO)
14/11/1914:33:49WARNSetSPARKLOCALIPifyouneedtobindtoanotheraddress
14/11/1914:33:49INFOS?curityM?nag?r:Changingviewactsto:hold?n,
14/11/1914:33:49IMFOSecurItyManager:Ch4mgingnodifyaclsto:holden.
14/11/1914:33:49INFOSecurityMtnagvr:S?<urltyMtnag?r:?uthtntic?tlondisabled:ulactsdisabled;userswithviewp?rmlssions:S?t(hold?n,|
;userswithModifyperaissions:Set(holtfefi.)
14/11/1914:33:49IWFOSlf4jLo40?r:SIf4jLoggerfUrttd
14/11/1914:33:49INFORenotin^:Startingremotln9
14/11/1914:33:49INFOReooting:R??otlngstarted;Uitcnlngonaddresses:(akM.tep://sp?rkDrivcr^l72.17.42.1:35821j
14/11/1914:)3:49INFORewoting:Resot1ngnowlistensonaddresses:(akka.tcp://spark0rlver9172.17.42.1:3S621)
14/11/1914:33:49INFOUtlU:Successfullystartedservice?parkDriv?r,onport35021.
14/11/1914:33:49IMFOSparkEnv:RegisteringMapOutputTracker
14/11/1914:33:49INFOSparkEnv:RegisteringBlockMan&gerMaster
14/11/1914:33:49INFODlskBlockManager:Createdlocaldirectoryat/tiap/$p?rk-local-28141119143349-5776
14/11/1914:33:49INFOUtils:Successfullystartedservice'ConnectionnantK)?rforblockmanager'onport5721B.
14/11/1914:33:49INFOConnectlonM?n?q?r:Boundsockettoport57218withid?ConnectionM?fl49?rid(.57218)
14/11/1914:33:49INFOMeaoryStore:MeaoryStorestartedwithcapacity26s.4MB
14/11/1914:33:49INFOBlockM4n?9*rMast?r:TryingtoremitterBlockM?nA9?r
14/11/1914:33:49INFOBlockMana^erMasterActor:Registeringblockmanager:57218with265.4HBRAM
14/11/1914:33:49INFOBlockftana^erRaser:RegisteredBlockMarumer
14/11/1914:33:49INFOHttpFlleServer:HTTPFileserverdirectoryIs/tBp/sp?rk-399cS3M-ei>e8-4043-9a7d-9345e97eS7M
14/11/1914:33:49IMFORttpServer:StartingHTTPServer
14/11/1914:33:49INFOUtils:SuccessfullystartedserviceHTTPfileserver'onport4988.
14/11/1914:33:49INFOUtils:SuccessfullyttarttOs?rvlc?Sp?rkUI'onport4046
14/11/1914:33:49INFOSparkUI:St?rtedSparkUIathttp://172.17.42.1:4648
14/11/1914:33:49INFOAkkiUtllt:ConnectingtoMeartbeatRecciver:akka.tep://sparkOriveryi72.17.42.1:35e21/us?r/Heartt>eatR*c?lv?r
Welcometo
____________fl_
\\/\//_J'/
/_/..JI/_A_\version1.1.0
lusinqPythonversion2.7.6(default.Har22261422:59:56)
ISparkContextavailableassc.
圖2-1:默認日志選項下的PySparkshell
如果覺得shell中輸出的日志信息過多而使人分心,可以調整日志的級別來控制輸出的信
息量。你需要在conf目錄下創建一個名為】perties的文件來管理日志設置。
Spark開發者們已經在Spark中加入了一個日志設置文件的模版,叫作
perties.templateo要讓日志看起來不那么啰嗦,可以先把這個日志設置模版文件
復制一份到conf/perties來作為日志設置文件,接下來找到下面這一行:
log4j.rootCategory=INFOzconsole
然后通過下面的設定降低日志級別,只顯示警告及更嚴重的信息:
log4j.rootCategory=WARN,console
這時再打開shell,你就會看到輸出大大減少(圖2-2)。
-/DownkMrfVH**>>■h?ldfn?>Mnbp2:-/Daw?laM>iA(Mrt1-fwdooNMbeiMn<>h>nbp2.-;<?p<H/!?JOa>OOOOSn
holden@hnbp2:-/Downloads/spark-1.1.6-bin-hadooplS./bln/pyspark
Python2.7.6(default,Mar222814.22:59:56)
[GCC4.8.2]onllnuxZ
Type"help",*copyright","credits-or'license"formoreInfonnatlon.
SparkassemblyhasbeenbuiltwithHive,includingDatanucleusjarsonclasspath
14/11/1914:38:63WARNUtils:Yourhostname.hmbp2resolvestoaloopbackaddress:;usingInstead(onInterfacedockerO)
14/11/1914:38:03WARNUtils:SetSPARKLOCALIPifyouneedtobindtoanotheraddress
Welcometo
/T7_______n_
\\/_\/_,/_/?/
//./\.//IJ\\version1.1.6
IJ
UsingPythonversion2.7.6(default.Mar22201422:59:56)
SparkContextavailableassc.
9
圖2-2:降低日志級別后的PySparkshell
使用IPython
IPython是一個受許多Python使用者喜愛的增強版Pythonshell,能夠提供自動補全等好
用的功能。你可以在http:〃上找到安裝說明。只要把環境變量IPYTHON的值
設為1,你就可以使用IPython了:
IPYTHON=1./bin/pyspark
要使用IPythonNotebook,也就是Web版的IPython,可以運行:
IPYTHON_OPTS=Hnotebook"./bin/pyspark
在Windows上,像下面這樣設置環境變量并運行命令行:
setIPYTHON=1
bin\pyspark
在Spark中,我們通過對分布式數據集的操作來表達我們的計算意圖,這些計算會自動地
在集群上并行進行。這樣的數據集被稱為彈性分布式數據集(resilientdistributed
dataset),簡稱RDD。RDD是Spark對分布式數據和計算的基本抽象。
在我們更詳細地討論RDD之前,先來使用shell從本地文本文件創建一個RDD來作一些
簡單的即時統計。例2-1是Python版的例子,例2-2是Scala版的。
例2-1:Python行數統計
?>lines=sc.textFile(**README.mdH)#創建一個名為工ines的RDD
?>lines.count()#統計RDD中的元素個數
127
?>lines,first()#這個RDD中的第一個元素,也就是README.md的第一行
u,#ApacheSpark,
例2-2:Scala行數統計
scala>vallines=sc.textFile(**README.md")//創建一個名為lines的RDD
lines:spark.RDD[String]=MappedRDD[...]
scala>lines.count()//統計RDD中的元素個數
resO:Long=127
scala>lines.first()//這個RDD中的第一個元素,也就是README.md的第一行
resl:String=#ApacheSpark
要退出任一shell,按Ctrl-Do
fl八一
你可能在日志的輸出中注意到了這樣一行信息:INFOSparkUI:Started
SparkUIathttp://[ipaddress]:4040o你可以由這個地址訪問Spark用戶界面,
查看關于任務和集群的各種信息。我們會在第7章中詳細討論。
在例2-1和例2-2中,變量lines是一個RDD,是從你電腦上的一個本地的文本文件創
建出來的。我們可以在這個RDD上運行各種并行操作,比如統計這個數據集中的元素個
數在這里就是文本的行數),或者是輸出第一個元素。我們會在后續章節中深入探討
RDDo在此之前,讓我們先花些時間來了解Spark的基本概念。
2.3Spark核心概念簡介
現在你已經用shell運行了你的第一段Spark程序,是時候對Spark編程作更細致的了解
了。
從上層來看,每個Spark應用都由一個驅動器程序(driverprogram)來發起集群上的各
種并行操作。驅動器程序包含應用的main函數,并且定義了集群上的分布式數據集,還
對這些分布式數據集應用了相關操作。在前面的例子里,實際的驅動器程序就是Spark
shell本身,你只需要輸入想要運行的操作就可以了。
驅動器程序通過一個SparkContext對象來訪問Sparko這個對象代表對計算集群的一
個連接。shell啟動時已經自動創建了一個SparkContext對象,是一個叫作sc的變量。
我們可以通過例2-3中的方法嘗試輸出sc來查看它的類型。
例2-3:查看變量sc
?>SC
<pyspark.context.SparkContextobjectat0xl025b8f90>
一旦有了SparkContext,你就可以用它來創建RDD。在例2-1和例2-2中,我們調用了
sc.textFileO來創建一個代表文件中各行文本的RDD。我們可以在這些行上進行各
種操作,比如count()。
要執行這些操作,驅動器程序一般要管理多個執行器(executor)節點。比如,如果我們
在集群上運行count。操作,那么不同的節點會統計文件的不同部分的行數。由于我們
剛才是在本地模式下運行Sparkshell,因此所有的工作會在單個節點上執行,但你可以將
這個shell連接到集群上來進行并行的數據分析。圖2-3展示了Spark如何在一個集群上
運行。
圖2-3:Spark分布式執行涉及的組件
最后,我們有很多用來傳遞函數的API,可以將對應操作運行在集群上。比如,可以擴展
我們的README示例,篩選出文件中包含某個特定單詞的行。以"Python”這個單詞為例,
具體代碼如例2-4(Python版本)和例2-5(Scala版本)所示。
例2-4:Python版本篩選的例子
?>lines=sc.textFile("README.md*,)
?>pythonLines=lines.filter(lambdaline:"Python”inline)
?>pythonLines.first()
u*##InteractivePythonShell,
例2-5:Scala版本篩選的例子
scala>vallines=sc.textFile("README.mdH)//創建一個叫lines的RDD
lines:spark.RDD[String]=MappedRDD[...]
scala>valpythonLines=lines.filter(line=>line.contains("Python"))
pythonLines:spark.RDD[String]=FilteredRDD[...]
scala>pythonLines.first()
resO:String=##InteractivePythonShell
向Spark傳遞函數
如果你對例2-4和例2-5中的lambda或者=>語法不熟悉,可以把它們理解為Python和
Scala中定義內聯函數的簡寫方法。當你在這些語言中使用Spark時,你也可以單獨定義
一個函數,然后把函數名傳給Spark。比如,在Python中可以這樣做:
defhasPython(line):
return"Python"inline
pythonLines=lines.filter(hasPython)
在Java中向Spark傳遞函數也是可行的,但是在這種情況下,我們必須把函數定義為實
現了Function接口的類。例如:
JavaRDD<String>pythonLines=lines.filter(
newFunction<String,Boolean>(){
Booleancall(Stringline){returnline.contains("Python");}
)
Java8提供了類似Python和Scala的lambda簡寫語法。下面就是一個使用這種語法的代
碼的例子:
JavaRDD<String>pythonLines=lines.filter(line->line.contains("Python"));
我們會在3.4節更深入地討論如何向Spark傳遞函數。
盡管后面會更詳細地講述SparkAPI,我們還是不得不感嘆,其實SparkAPI最神奇的地
方就在于像filter這樣基于函數的操作也會在集群上并行執行。也就是說,Spark會自
動將函數(比如line.contains("Python"))發到各個執行器節點上。這樣,你就
可以在單一的驅動器程序中編程,并且讓代碼自動運行在多個節點上。第3章會詳細講述
RDDAPI?
2.4獨立應用
我們的Spark概覽中的最后一部分就是如何在獨立程序中使用Sparko除了交互式運行之
外,Spark也可以在Java、Scala或Python的獨立程序中被連接使用。這與在shell中使
用的主要區別在于你需要自行初始化SparkContext。接下來,使用的A
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯系上傳者。文件的所有權益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
- 4. 未經權益所有人同意不得將文件中的內容挪作商業或盈利用途。
- 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
- 6. 下載文件中如有侵權或不適當內容,請與我們聯系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 企業TVC管理制度
- 公司評標費管理制度
- 電火鍋電蒸鍋市場前景與未來發展趨勢分析
- 2025至2030年中國網絡防病毒軟件市場分析及競爭策略研究報告
- 2025至2030年中國縫紉機工字型機架市場現狀分析及前景預測報告
- 寬電壓范圍及其全功率段雙有源橋變換器的性能提升策略研究
- 不同規則水平的頓悟記憶優勢效應及電生理機制研究
- 2025至2030年中國紅白櫸木防火門數據監測研究報告
- 2025至2030年中國紅葉李樹種苗數據監測研究報告
- 2025至2030年中國緊密賽絡紡紗數據監測研究報告
- 壓力與情緒管理(最全免費版)課件
- 詳解2021年《關于優化生育政策促進人口長期均衡發展的決定》ppt
- 游泳池經營方案
- 渠道醫美合伙人招募計劃
- 空調機房吸音墻頂面綜合施工專題方案
- 紅樓夢專題元妃省親39課件
- 輔導員工作手冊
- 半導體物理課件:第二章半導體中雜質和缺陷能級
- 特種設備事故應急演練方案(附總結)
- ISO測量管理體系內審員培訓資料
- 電子測量技術第5章 數字測量方法
評論
0/150
提交評論