歡迎來到裝配圖網(wǎng)! | 幫助中心 裝配圖網(wǎng)zhuangpeitu.com!
裝配圖網(wǎng)
ImageVerifierCode 換一換
首頁(yè) 裝配圖網(wǎng) > 資源分類 > DOC文檔下載  

hadoop[1].源碼閱讀總結(jié)

  • 資源ID:39576460       資源大?。?span id="afgseqg" class="font-tahoma">537.52KB        全文頁(yè)數(shù):11頁(yè)
  • 資源格式: DOC        下載積分:0積分
快捷下載 游客一鍵下載
會(huì)員登錄下載
微信登錄下載
三方登錄下載: 微信開放平臺(tái)登錄 支付寶登錄   QQ登錄   微博登錄  
二維碼
微信掃一掃登錄
下載資源需要0積分
郵箱/手機(jī):
溫馨提示:
用戶名和密碼都是您填寫的郵箱或者手機(jī)號(hào),方便查詢和重復(fù)下載(系統(tǒng)自動(dòng)生成)
支付說明:
本站最低充值0.01積分,下載本資源后余額將會(huì)存入您的賬戶,您可在我的個(gè)人中心查看。
驗(yàn)證碼:   換一換

 
賬號(hào):
密碼:
驗(yàn)證碼:   換一換
  忘記密碼?
    
友情提示
2、PDF文件下載后,可能會(huì)被瀏覽器默認(rèn)打開,此種情況可以點(diǎn)擊瀏覽器菜單,保存網(wǎng)頁(yè)到桌面,就可以正常下載了。
3、本站不支持迅雷下載,請(qǐng)使用電腦自帶的IE瀏覽器,或者360瀏覽器、谷歌瀏覽器下載即可。
4、本站資源下載后的文檔和圖紙-無水印,預(yù)覽文檔經(jīng)過壓縮,下載后原文更清晰。
5、試題試卷類文檔,如果標(biāo)題沒有明確說明有答案則都視為沒有答案,請(qǐng)知曉。

hadoop[1].源碼閱讀總結(jié)

CommonIPC/RPC Server大概流程基于NIO,Listener關(guān)注OP_ACCEPT事件,當(dāng)有客戶端連接過來,Accept后,從readers中選取一個(gè)Reader將客戶端Channel注冊(cè)到Reader中的NIO selector,并新建一個(gè)Connection對(duì)象關(guān)聯(lián)客戶端Channel,Reader關(guān)注OP_READ事件.客戶端建立連接后,首先發(fā)送的是ConnnectionHeader包含協(xié)議名,用戶組信息,驗(yàn)證方法,Connection會(huì)根據(jù)以上信息進(jìn)行校驗(yàn).之后將是先讀取4位的長(zhǎng)度代表這次請(qǐng)求的數(shù)據(jù)的長(zhǎng)度,然后一直等待事件觸發(fā)讀取夠長(zhǎng)度,將讀取的數(shù)據(jù) 解碼為調(diào)用id和param,新建一個(gè)Call對(duì)象(關(guān)聯(lián)Connection)放入call隊(duì)列中,handlers中的Handler會(huì)將Call中callQuene中取走,Call中的param實(shí)際為Invocation對(duì)象,包含調(diào)用方法名,參數(shù)名,參數(shù)類型,由這些信息使用Java反射API調(diào)用Server的instance對(duì)象,獲取返回值,組織返回?cái)?shù)據(jù),寫入Call的response屬性中,馬上調(diào)用responder的doRespond方法,將Call加入到Connection的responseQuene中,如果responseQuene長(zhǎng)度等于1做一次NIO寫操作,如果不能一次性能夠?qū)?shù)據(jù)寫完,將客戶端channel注冊(cè)到responder關(guān)注寫事件OP_WRITE,下一次繼續(xù)寫,如果長(zhǎng)度不為1證明該channel已經(jīng)注冊(cè)到了responder了直接加入隊(duì)列,由responder線程后續(xù)處理.NOTE:客戶端關(guān)閉流后出發(fā)一次讀操作,返回為-1,Server關(guān)閉連接CurCall與獲取客戶端IPHandler獲取一個(gè)Call后,會(huì)將Server的curCall(ThreadLocal類型)設(shè)置為當(dāng)前的Call,調(diào)用Instance方法實(shí)際是在Handler線程中,在Instance的方法內(nèi)就可以使用Server提供的方法來獲取客戶端IPPing在上述過程中如果讀取的4位長(zhǎng)度為-1代表是客戶端PING操作清理空閑連接如果一定時(shí)間ipc.client.connection.maxidletime沒有讀取到數(shù)據(jù)并且當(dāng)前連接現(xiàn)有Call數(shù)目為0,則視為空閑連接,Listener會(huì)在每次接受完新連接之后進(jìn)行一次清理,最多清理ipc.client.kill.max個(gè)連接,如果出現(xiàn)OutOfMemoryError則強(qiáng)制清理全部空閑連接DataNode,TaskTractor設(shè)置的心跳時(shí)間需小于空閑時(shí)間.清理長(zhǎng)時(shí)間未發(fā)送到客戶端的響應(yīng)注冊(cè)到responder的Call如果長(zhǎng)時(shí)間沒有發(fā)送到客戶端,每隔一段時(shí)間會(huì)清理掉涉及參數(shù)ipc.server.handler.queue.sizecallQuene隊(duì)列大小,隨集群增大而增大,ipc.server.max.response.size如果返回的結(jié)果序列化后大小大于這個(gè)值,重置緩沖區(qū)ByteArrayOutputStream釋放內(nèi)存.ipc.server.read.threadpool.sizereaders個(gè)數(shù)ipc.server.tcpnodelayTCP優(yōu)化參數(shù)Nagle's algorithmhandlerCounthandler個(gè)數(shù),由構(gòu)造參數(shù)指定,在DN,TT中配置socketSendBufferSizesocket設(shè)置ipc.server.listen.queue.sizesocket設(shè)置socket.bind(address, backlog)ipc.client.idlethreshold總連接數(shù)超過多少后,開始清理空閑連接ipc.client.kill.max一次最多清理多少個(gè)空閑連接IPC/RPC ClientClient代理模式,調(diào)用RPC.getProxy實(shí)際上返回的一個(gè)代理對(duì)象,當(dāng)調(diào)用方法的時(shí)候?qū)嶋H調(diào)用的是Invoker, Invoker將協(xié)議,調(diào)用的方法名,參數(shù),參數(shù)類型封裝成Invocation對(duì)象經(jīng)過client發(fā)送到server,并讀取返回流,根據(jù)流中的id,判斷是服務(wù)器返回的是那次調(diào)用的結(jié)果.Connection線程負(fù)責(zé)讀取Server返回值,在讀取的過程中,調(diào)用Client的線程會(huì)wait直到Connection獲取到返回值.讀取時(shí)候如果超時(shí)(ipc.ping.interval)就發(fā)送一次ping,如果沒有出現(xiàn)IOException就繼續(xù)讀取,Conneciton可以根據(jù)標(biāo)識(shí)(地址,用戶組信息,協(xié)議)共用.IPC/RPC AuthingHDFSName協(xié)議ClientProtocol:客戶端調(diào)用協(xié)議,涉及文件操作,DFS管理,升級(jí)(DFSAdmin)DatanodeProtocol:DN與NN通訊協(xié)議,注冊(cè),BlockReport,心跳,升級(jí)NamenodeProtocol: SN和NN通訊協(xié)議,通知NN使用新的fsimage和editRefreshAuthorizationPolicyProtocol, RefreshUserMappingsProtocol FSNamesystem數(shù)據(jù)結(jié)構(gòu)LightWeightGSetGset 類似Set但提供get方法,主要用于存儲(chǔ)BlockInfo,根據(jù)Block找到BlockInfo其中一個(gè)實(shí)現(xiàn)LightWeightGSet,利用數(shù)組存儲(chǔ)元素,用鏈表解決沖突問題,類似HashMap但是沒有ReHash操作BlocksMap初始化LightWeightGSet時(shí)候,會(huì)根據(jù)JVM內(nèi)存將數(shù)組的大小初始為最大能占用的內(nèi)存數(shù)(4194304 -Xmx1024M)加上高效的hash映射算法, LightWeightGSet在BlockInfo數(shù)量比較小的時(shí)候get性能逼近數(shù)組.BlockInfo繼承Block,沒有重寫hashCode和equals方法,在Block中equals方法只要求傳入的對(duì)象是Block實(shí)例并且blockId相等,就認(rèn)為兩個(gè)對(duì)象相等,故存儲(chǔ)BlockInfo時(shí)候分配的在數(shù)組中的Index和Get時(shí)候由Block的hashCode定位是一致的.BlockMapsBlockMaps負(fù)責(zé)管理Block以及Block相關(guān)的元數(shù)據(jù)Block 有3個(gè)long型的屬性blockId(隨機(jī)生成)numBytes(塊大小),generationStampBlockInfo繼承Block添加了2個(gè)屬性,實(shí)現(xiàn)了用戶LightWeightGSet的LinkedElement接口inode:引用文件Inodetriplets:3Xreplication的數(shù)組,即replication 個(gè)組,每組有3個(gè)元素,第一個(gè)指向DatanodeDescriptor,代表在這個(gè)DN上有一個(gè)Block,第二個(gè)和第三個(gè)分別代表DN上的上一個(gè)blockInfo和下一個(gè)blockInfoDatanodeDescriptor有一個(gè)屬性blockList指向一個(gè)BlockInfo,因?yàn)槊總€(gè)BlockInfo中的triplets中有一組記錄著對(duì)應(yīng)的DN上的上一個(gè),下一個(gè)BlockInfo,所以從這個(gè)角度來看BlockInfo是一個(gè)雙向鏈表.新建文件打開輸入流后,寫入,會(huì)在namenode中分配BlockInfo,當(dāng)Block寫入到分配的DN后,DN在發(fā)送心跳時(shí)候會(huì)將新接受到的塊報(bào)告給NN,此時(shí)NN在將triplets可用的組關(guān)聯(lián)到DN(DD).(例子前提假設(shè):新建的集群沒有文件,操作是在DN1上,此時(shí)很大可能性每次分配塊的時(shí)候都會(huì)首選本地DN1,bkl_* *實(shí)際為隨機(jī)數(shù))namenode中分配BlockInfo 并加入Gset中,blk_1,0-64M,此時(shí)DN1的blockList為nullDN1向NN報(bào)告接收了新的Block blk_1 ,NN從blocksMap中根據(jù)Block blk_1找到BlockInfo blockInfo1 將triplets的可用組(=null)的第一位關(guān)聯(lián)到DN1(DatanodeDescriptor1),將DN1的blockList指向blockInfo1此時(shí)blockList指向的是blockInfo1NN分配blockInfo2,DN1向NN報(bào)告接受到了信的Block blk_2,NN找到blockInfo2后1,將triplets的可用組(=null)的第一位關(guān)聯(lián)到DN1(DatanodeDescriptor1)2,將第三位指向blockList即blockInfo1,2,將blockInfo1的對(duì)應(yīng)DN1的組的第二位指向blockInfo24,將DN1的blockList指向blockInfo1升級(jí)LoadBalance磁盤占用,還是分布策略 可能出現(xiàn)一個(gè)DN上兩個(gè)相同的Block么.MapReduce命令行運(yùn)行bin/hadoop jarjarFilemainClassargs.設(shè)置JVM啟動(dòng)參數(shù),將lib,conf等加入classpath,啟動(dòng)JVM運(yùn)行RunJarRunJar階段:1,設(shè)定MainClass如果jar設(shè)置了Manifest,則作為MainClass否則取第二個(gè)參數(shù)2,在臨時(shí)目錄(hadoop.tmp.dir)中建立臨時(shí)目錄(File.createTempFile("hadoop-unjar", "", tmpDir),并注冊(cè)鉤子JVM退出時(shí)候刪除.3,將Jar解壓到建立的臨時(shí)目錄中4,將目錄hadoop-unjar38923742,目錄hadoop-unjar38923742/class, 目錄hadoop-unjar38923742/lib中的每個(gè)文件作為URLClassLoader參數(shù),構(gòu)造一個(gè)classLoader.5,將當(dāng)前線程的上下文ClassLoader設(shè)置為classLoader6,以上5步都是為mainClass啟動(dòng)做準(zhǔn)備,最后應(yīng)用反射啟動(dòng)mainClass,將args作為參數(shù)MainClassbin/hadoop jar -libjars testlib.jar -archives test.tgz -files file.txt inputjar argsjob.setXXX均將KV設(shè)置到了Conf實(shí)例中(傳值,例如將-file指定的文件設(shè)定到Conf中,在submit中獲取,從本地復(fù)制到hdfs)在Job.submit方法中會(huì)向hdfs寫入以下信息目錄:hdfs:/$mapreduce.jobtracker.staging.root.dir/$user/.staging/$jobId/hdfs:/tmp/hadoop/mapred/staging/$user/.staging/$jobId/目錄下文件:job.split->(Split信息)由writeSplits方法寫入job.splitmetainfo->( Split信息元數(shù)據(jù),版本,個(gè)數(shù),索引)由writeSplits方法寫入job.xml->conf對(duì)象job.jar->inputjarfiles/->參數(shù)-files 逗號(hào)分割,交給(DistributedCache管理)archives/->參數(shù)-archives 逗號(hào)分割, 交給(DistributedCache管理)libjars/->參數(shù)-libjars 逗號(hào)分割, 交給(DistributedCache管理)split,splitmetainfo(FileSplit)設(shè)計(jì)目的job.splitmetainfo中保存有Split的在那幾個(gè)機(jī)器上有副本,JT讀取這個(gè)文件用,用來分配Task使Task能夠讀取本地磁盤文件.job.split保存具體的Split,不保存位置信息,因?yàn)門T不需要(hdfs決定)JT調(diào)度CapacityTaskScheduler,TTTT啟動(dòng)時(shí)候,啟動(dòng)線程mapLauncher(用于啟動(dòng)MapTask),reduceLauncher(用于啟動(dòng)ReduceTask), taskCleanupThread(用于清理Task或者Job),TT 通過心跳從JT獲得HeartbeatResponse,包含TaskTrackerAction,具體有5種操作LAUNCH_TASK啟動(dòng)任務(wù),將LaunchTaskAction中包裝的Task與Conf對(duì)象和TaskLauncher組合成TaskInProgres,然后添加到mapLauncher或者reduceLauncher中的隊(duì)列中.TaskLauncher構(gòu)造參數(shù)numSlots代表當(dāng)前TaskTractor能同時(shí)執(zhí)行多少個(gè)Task,由參數(shù)mapred.tasktracker.map.tasks.maximum, mapred.tasktracker.reduce.tasks.maximum設(shè)定,slot意思為: 槽,位置 將TaskTractor的資源抽象化,一般情況下一個(gè)task占用一個(gè)slot,如果有對(duì)資源需求大的Task也可以通過參數(shù)來控制(調(diào)度器CapacityTaskScheduler設(shè)置,未開放給User?)TaskLauncher根據(jù)剩余空閑的槽位(numFreeSlots)和隊(duì)列情況,來從隊(duì)列中取出Task來運(yùn)行(synchronized, wait, notify).KILL_TASK殺死任務(wù)KILL_JOB殺死和Job相關(guān)的任務(wù),放入tasksToCleanup隊(duì)列中REINIT_TRACKER重新啟動(dòng)TTCOMMIT_TASK提交任務(wù)(1, speculative execution 2,need commit file?) OutputCommitterREINIT_TRACKER 重啟TT, startNewTask 新的JVM(不是TT的JVM,錯(cuò)誤處理,GC)執(zhí)行Child.class,通過main參數(shù)argsMap過程MapTask中會(huì)根據(jù)jobConf記錄的hdfs上的job.split文件以及JT分配的splitIndex獲取InputSplit,根據(jù)jobConf的配置新建Map和InputFormat,由InputFormat獲取RecordReader來讀取inputSplit,生成原始o(jì)riginal_key, original_value交給Mapper.map方法處理生成gen_key,gen_value,根據(jù)partitioner生成partition,成對(duì)的(gen_key,gen_value, partition)會(huì)先放入一個(gè)緩沖區(qū),如圖,這個(gè)緩沖區(qū)分為3級(jí)索引(排序kvoffset,復(fù)制效率)等這個(gè)緩沖區(qū)到達(dá)一定閥值之后,并不是緩沖區(qū)慢之后,SplitThread會(huì)標(biāo)記當(dāng)前前后界,對(duì)界內(nèi)數(shù)據(jù)進(jìn)行排序(現(xiàn)根據(jù)partition在根據(jù)kv),并寫入到磁盤文件中(split.x.out)并記錄各個(gè)partition段的位置,部分存到內(nèi)存部分存到磁盤,在這個(gè)過程中,map仍然繼續(xù)進(jìn)行,如果緩沖區(qū)滿之后,map線程暫時(shí)wait,到SplitThread完畢.當(dāng)輸入讀取完畢,隨之的SplitThread也結(jié)束后,磁盤中中間文件為split.1.out -> split.n.out ,索引部分存在內(nèi)存里面,超過1024*1024個(gè),作為索引文件spill.n.out.index(避免內(nèi)存不夠用).然后通過合并排序?qū)⒎侄蔚奈募?split.x.n)合并排序成一個(gè)文件file.out,file.out.index記錄partition信息.(詳細(xì)見MergeQueue)這樣在Reduce過程中,通過http請(qǐng)求TT其中需要的partition段(參數(shù)reduce),TT根據(jù)file.out.index記錄的索引信息將file.out的partition段,生成http響應(yīng).如果有CombinerSortAndSpillkvoffset達(dá)到臨界點(diǎn)softRecordLimit,例如100個(gè),設(shè)定80個(gè)為臨界點(diǎn).Kvbuffer達(dá)到臨界點(diǎn)softBufferLimit,例如100M,當(dāng)80M為臨界點(diǎn).目的是為了不讓map過程停止浪費(fèi)時(shí)間,但由于IO map可能會(huì)慢一點(diǎn)(進(jìn)一步多磁盤負(fù)載).io.sort.mb配置的是圖中kvoffset,kvindices,kvbuffer占用的空間總大小Mb.上述參數(shù)都可以通過conf.setXXX來配置,根據(jù)特定job的特點(diǎn)來設(shè)定.來減少Spill次數(shù),同時(shí)避免內(nèi)存溢出.Reduce過程JobInProgress初始化mapred.reduce.tasks個(gè)ReduceTask 用參數(shù)partition區(qū)別.然后JT在心跳過程中,將ReduceTask分給TT執(zhí)行.ReduceTask有SHUFFLE, SORT, REDUCE三個(gè)階段SHUFFLE這一階段是ReduceTask初始化階段,新建了N(參數(shù)控制)個(gè)下載線程,來獲取Map的輸出,TaskTracker中有一個(gè)線程會(huì)不斷的從JT中獲取在本TT運(yùn)行的ReudceTask(s)的JOB的Map完成事件. ReduceTask不斷從TT中獲取Job的Map完成事件,然后將事件中的Map輸出位置交給下載線程來獲取.下載的時(shí)候,從HTTP響應(yīng)頭獲取文件的大小,決定是放在內(nèi)存中還是寫入磁盤.在內(nèi)存中的數(shù)據(jù),滿足一定條件會(huì)在后臺(tái)將內(nèi)存中的數(shù)據(jù)Merger寫入硬盤,在硬盤中的數(shù)據(jù),滿足一定條件(數(shù)目超過了2 * ioSortFactor - 1)會(huì)在后臺(tái)做Merger.所有的Map輸出下載完畢,并且后臺(tái)Merger線程也結(jié)束后,進(jìn)入SORT階段.SORT這個(gè)階段還是Merger,將內(nèi)存和硬盤中的數(shù)據(jù),做合并排序(ioSortFactor),使能夠高效率的輸出key ,values.然后進(jìn)入REDEUCE階段.REDUCE這一階段只要是將上述產(chǎn)生的key value通過ReduceContext轉(zhuǎn)化成key values(ValueIterable),傳遞給reduce,最后通過輸出配置,將reduce的輸出寫入HDFS(part-r-00000,1,2),或者其他.MergeQueueMR 優(yōu)化1,控制好每個(gè)Map的輸入和輸出,盡量使Map處理存在本地的Block,一個(gè)InputSpilt不要太大,最好使產(chǎn)生的輸出能控制在io.sort.mb之內(nèi),這樣能夠減少?gòu)膬?nèi)存將輸出數(shù)據(jù)寫到磁盤的磁盤的個(gè)數(shù). 根據(jù)任務(wù)將io.sort.mb設(shè)置合理,盡量能容納單個(gè)Map全部輸出.2, 多磁盤負(fù)載,MR中大量的臨時(shí)輸出文件會(huì)放在這個(gè)下mapred.local.dir = /hd1/mr, /hd2/mr, /hd3/mr hd*為掛載的不同磁盤.LocalDirAllocator從以上多個(gè)目錄中分配每次創(chuàng)建文件的目錄,降低IO負(fù)載.(其他SSD)HDFS中逗號(hào)分隔的目錄(dfs.data.dir, dfs.name.dir, dfs.name.edits.dir)是為了冗余.3,Map輸出壓縮,減少網(wǎng)絡(luò)傳輸.4,Reduce階段,將各個(gè)map的輸出下載到本地,由于各個(gè)Map輸出可能有大有小,合適的可以放到內(nèi)存中,( mapred.job.reduce.total.mem.bytes, mapred.job.shuffle.input.buffer.percent),減少N個(gè)下載線程寫磁盤.5,設(shè)置Java運(yùn)行內(nèi)存偏大,GC回收算法. UseConcMarkSweepGC.6,頻外心跳mapreduce.tasktracker.outofband.heartbeat,會(huì)加重JT負(fù)擔(dān).預(yù)想優(yōu)化Reduce負(fù)載較重(收集N個(gè)map輸出,執(zhí)行統(tǒng)計(jì)工作),可以通過指定高配置機(jī)器,網(wǎng)絡(luò)節(jié)離中心交換機(jī)近.做Merger的時(shí)候k,vvv,k,vv最后源碼基于hadoop-0.20.203.0,粗糙整理,不斷完善中,正誤自辯,如有疑問交流或指正錯(cuò)誤,可發(fā)郵件nice2mu。

注意事項(xiàng)

本文(hadoop[1].源碼閱讀總結(jié))為本站會(huì)員(1888****888)主動(dòng)上傳,裝配圖網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)上載內(nèi)容本身不做任何修改或編輯。 若此文所含內(nèi)容侵犯了您的版權(quán)或隱私,請(qǐng)立即通知裝配圖網(wǎng)(點(diǎn)擊聯(lián)系客服),我們立即給予刪除!

溫馨提示:如果因?yàn)榫W(wǎng)速或其他原因下載失敗請(qǐng)重新下載,重復(fù)下載不扣分。




關(guān)于我們 - 網(wǎng)站聲明 - 網(wǎng)站地圖 - 資源地圖 - 友情鏈接 - 網(wǎng)站客服 - 聯(lián)系我們

copyright@ 2023-2025  zhuangpeitu.com 裝配圖網(wǎng)版權(quán)所有   聯(lián)系電話:18123376007

備案號(hào):ICP2024067431號(hào)-1 川公網(wǎng)安備51140202000466號(hào)


本站為文檔C2C交易模式,即用戶上傳的文檔直接被用戶下載,本站只是中間服務(wù)平臺(tái),本站所有文檔下載所得的收益歸上傳人(含作者)所有。裝配圖網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)上載內(nèi)容本身不做任何修改或編輯。若文檔所含內(nèi)容侵犯了您的版權(quán)或隱私,請(qǐng)立即通知裝配圖網(wǎng),我們立即給予刪除!