從?https://github.com/flomesh-io/pipy.git?克隆代碼。Pipy 的編譯包括了兩個(gè)部分,GUI 和 Pipy 本體。GUI 是 Pipy 提供的一個(gè)用于開發(fā)模式下進(jìn)行配置的界面,首先編譯Pipy GUI。# pipy root folder$ cd gui$ npm install$ npm run build接著編譯 Pipy 的本體# pipy root folder$ mkdir build$ cd build$ cmake -DCMAKE_BUILD_TYPE=Release-DPIPY_GUI=ON ..$ make完成后檢查根目錄下的?bin?目錄,可以看到 pipy 的可執(zhí)行文件,大小只有 11M。$ bin/pipy --helpUsage: pipy [options]<script filename>Options:-h,-help,--help Show help information-v,-version,--version Show version information--list-filters List all filters--help-filters Show detailed usage information for all filters--log-level=<debug|info|warn|error>Set the level of log output--verify Verify configuration only--reuse-port Enable kernel load balancing for all listening ports--gui-port=<port>Enable web GUI on the specified port
通過 pipe 的命令可以輸出其支持的過濾器列表,一共 31 個(gè)。通過將一系列過濾器進(jìn)行組裝,可以實(shí)現(xiàn)復(fù)雜的流處理。比如?007-logging?的配置實(shí)現(xiàn)了日志的功能:記錄請求和響應(yīng)的數(shù)據(jù),并批量發(fā)送到 ElasticSearch。這里就用到了?fork、connect、onSessionStart、encodeHttpRequest、decodeHttpRequest、onMessageStart、onMessage、decodeHttpResponse、replaceMessage、link、mux、task?等十多種過濾器。$ bin/pipy --list-filtersconnect (target[, options])Sends data to a remote endpoint and receives data from itdemux (target)Sends messages to a different pipline with each one in its own session and contextdecodeDubbo ()Deframes a Dubbo messagedecodeHttpRequest ()Deframes an HTTP request messagedecodeHttpResponse ()Deframes an HTTP response messagedummy ()Eats up all eventsdump([tag])Outputs events to the standard outputencodeDubbo ([head])Frames a Dubbo messageencodeHttpRequest ([head])Frames an HTTP request messageencodeHttpResponse ([head])Frames an HTTP response messageexec(command)Spawns a child process and connects to its input/outputfork (target[, sessionData])Sends copies of events to other pipeline sessionslink (target[,when[, target2[, when2,...]]])Sends events to a different pipelinemux (target[, selector])Sends messages from different sessions to a shared pipeline sessiononSessionStart (callback)Handles the initial eventin a sessiononData (callback)Handles a DataeventonMessageStart (callback)Handles a MessageStarteventonMessageEnd (callback)Handles a MessageEndeventonSessionEnd (callback)Handles a SessionEndeventonMessageBody (callback)Handles a complete message bodyonMessage (callback)Handles a complete message including the head and the bodyprint()Outputs raw data to the standard outputreplaceSessionStart (callback)Replaces the initial eventin a sessionreplaceData ([replacement])Replaces a DataeventreplaceMessageStart ([replacement])Replaces a MessageStarteventreplaceMessageEnd ([replacement])Replaces a MessageEndeventreplaceSessionEnd ([replacement])Replaces a SessionEndeventreplaceMessageBody ([replacement])Replaces an entire message bodyreplaceMessage ([replacement])Replaces a complete message including the head and the bodytap (quota[, account])Throttles message rate or data rateuse(module, pipeline[, argv...])Sends events to a pipeline in a different modulewait (condition)Buffers up events until a condition is fulfilled
原理
“Talk is cheap, show me the code.”
配置加載
個(gè)人比較喜歡看源碼來理解實(shí)現(xiàn),即使是 C 。從瀏覽器請求入手發(fā)現(xiàn)運(yùn)行時(shí)向/api/program?發(fā)送了?POST?請求,請求的內(nèi)容是配置文件的地址。檢查源碼后,找到邏輯的實(shí)現(xiàn)在?src/gui.cpp:189:1.創(chuàng)建新的 worker2.加載配置,將 JavaScrip 代碼解析成?Configuration?對象3.啟動 worker,執(zhí)行Configuration::apply()4.卸載舊的 worker從?src/api/configuration.cpp:267?處看:pipeline、listen?和?task?配置實(shí)際在 Pipy 的配置中都是被抽象為?Pipeline?對象,只是在類型上有差異分別為:NAMED、LISTEN?和?TASK。比如?listen?中可以通過?fork?過濾器將事件的副本發(fā)送到指定的?pipeline?中。