www.久久久久|狼友网站av天堂|精品国产无码a片|一级av色欲av|91在线播放视频|亚洲无码主播在线|国产精品草久在线|明星AV网站在线|污污内射久久一区|婷婷综合视频网站

當(dāng)前位置:首頁 > 公眾號精選 > 架構(gòu)師社區(qū)
[導(dǎo)讀]簡介 Kafka架構(gòu) Kafka存儲策略 Kafka刪除策略 Kafka broker Kafka Design The Producer The Consumer 復(fù)制(Replication) 日志壓縮(Log Compaction) Distribution Zookeeper協(xié)調(diào)控制 開發(fā)環(huán)境搭建 一些example 參考 簡介 Apache Kafka是分布式發(fā)布-訂閱消


  • 簡介

  • Kafka架構(gòu)

  • Kafka存儲策略

  • Kafka刪除策略

  • Kafka broker

  • Kafka Design

  • The Producer

  • The Consumer

  • 復(fù)制(Replication)

  • 日志壓縮(Log Compaction)

  • Distribution

  • Zookeeper協(xié)調(diào)控制

  • 開發(fā)環(huán)境搭建

  • 一些example

  • 參考


簡介

Apache Kafka是分布式發(fā)布-訂閱消息系統(tǒng)。它最初由LinkedIn公司開發(fā),之后成為Apache項(xiàng)目的一部分。Kafka是一種快速、可擴(kuò)展的、設(shè)計(jì)內(nèi)在就是分布式的,分區(qū)的和可復(fù)制的提交日志服務(wù)。

Kafka架構(gòu)

它的架構(gòu)包括以下組件:

  • 話題(Topic):是特定類型的消息流。消息是字節(jié)的有效負(fù)載(Payload),話題是消息的分類名或種子(Feed)名。

  • 生產(chǎn)者(Producer):是能夠發(fā)布消息到話題的任何對象。

  • 服務(wù)代理(Broker):已發(fā)布的消息保存在一組服務(wù)器中,它們被稱為代理(Broker)或Kafka集群。

  • 消費(fèi)者(Consumer):可以訂閱一個或多個話題,并從Broker拉數(shù)據(jù),從而消費(fèi)這些已發(fā)布的消息。

Kafka 基本原理(8000 字小結(jié))


Kafka存儲策略

1)kafka以topic來進(jìn)行消息管理,每個topic包含多個partition,每個partition對應(yīng)一個邏輯log,有多個segment組成。

2)每個segment中存儲多條消息(見下圖),消息id由其邏輯位置決定,即從消息id可直接定位到消息的存儲位置,避免id到位置的額外映射。

3)每個part在內(nèi)存中對應(yīng)一個index,記錄每個segment中的第一條消息偏移。

4)發(fā)布者發(fā)到某個topic的消息會被均勻的分布到多個partition上(或根據(jù)用戶指定的路由規(guī)則進(jìn)行分布),broker收到發(fā)布消息往對應(yīng)partition的最后一個segment上添加該消息

當(dāng)某個segment上的消息條數(shù)達(dá)到配置值或消息發(fā)布時間超過閾值時,segment上的消息會被flush到磁盤,只有flush到磁盤上的消息訂閱者才能訂閱到

segment達(dá)到一定的大小后將不會再往該segment寫數(shù)據(jù),broker會創(chuàng)建新的segment。

Kafka 基本原理(8000 字小結(jié))

Kafka刪除策略

1)N天前的刪除。

2)保留最近的MGB數(shù)據(jù)。

Kafka broker

與其它消息系統(tǒng)不同,Kafka broker是無狀態(tài)的。這意味著消費(fèi)者必須維護(hù)已消費(fèi)的狀態(tài)信息。這些信息由消費(fèi)者自己維護(hù),broker完全不管(有offset managerbroker管理)。

  • 從代理刪除消息變得很棘手,因?yàn)榇聿⒉恢老M(fèi)者是否已經(jīng)使用了該消息。Kafka創(chuàng)新性地解決了這個問題,它將一個簡單的基于時間的SLA應(yīng)用于保留策略。當(dāng)消息在代理中超過一定時間后,將會被自動刪除。

  • 這種創(chuàng)新設(shè)計(jì)有很大的好處,消費(fèi)者可以故意倒回到老的偏移量再次消費(fèi)數(shù)據(jù)。這違反了隊(duì)列的常見約定,但被證明是許多消費(fèi)者的基本特征。

以下摘抄自kafka官方文檔:

Kafka Design

目標(biāo)

1) 高吞吐量來支持高容量的事件流處理

2) 支持從離線系統(tǒng)加載數(shù)據(jù)

3) 低延遲的消息系統(tǒng)

持久化

1) 依賴文件系統(tǒng),持久化到本地

2) 數(shù)據(jù)持久化到log

效率

1) 解決”small IO problem“:

使用”message set“組合消息。

server使用”chunks of messages“寫到log。

consumer一次獲取大的消息塊。

2)解決”byte copying“:

在producer、broker和consumer之間使用統(tǒng)一的binary message format。

使用系統(tǒng)的pagecache。

使用sendfile傳輸log,避免拷貝。

端到端的批量壓縮(End-to-end Batch Compression)

Kafka支持GZIP和Snappy壓縮協(xié)議。

The Producer

負(fù)載均衡

1)producer可以自定義發(fā)送到哪個partition的路由規(guī)則。默認(rèn)路由規(guī)則:hash(key)%numPartitions,如果key為null則隨機(jī)選擇一個partition。

2)自定義路由:如果key是一個user id,可以把同一個user的消息發(fā)送到同一個partition,這時consumer就可以從同一個partition讀取同一個user的消息。

異步批量發(fā)送

批量發(fā)送:配置不多于固定消息數(shù)目一起發(fā)送并且等待時間小于一個固定延遲的數(shù)據(jù)。

The Consumer

consumer控制消息的讀取。

Push vs Pull

1)producer push data to broker,consumer pull data from broker

2)consumer pull的優(yōu)點(diǎn):consumer自己控制消息的讀取速度和數(shù)量。

3)consumer pull的缺點(diǎn):如果broker沒有數(shù)據(jù),則可能要pull多次忙等待,Kafka可以配置consumer long pull一直等到有數(shù)據(jù)。

Consumer Position

1)大部分消息系統(tǒng)由broker記錄哪些消息被消費(fèi)了,但Kafka不是。

2)Kafka由consumer控制消息的消費(fèi),consumer甚至可以回到一個old offset的位置再次消費(fèi)消息。

Message Delivery Semantics

三種:

At most once—Messages may be lost but are never redelivered.

At least once—Messages are never lost but may be redelivered.

Exactly once—this is what people actually want, each message is delivered once and only once.

Producer:有個”acks“配置可以控制接收的leader的在什么情況下就回應(yīng)producer消息寫入成功。

Consumer:

  • 讀取消息,寫log,處理消息。如果處理消息失敗,log已經(jīng)寫入,則無法再次處理失敗的消息,對應(yīng)”At most once“。

  • 讀取消息,處理消息,寫log。如果消息處理成功,寫log失敗,則消息會被處理兩次,對應(yīng)”At least once“。

  • 讀取消息,同時處理消息并把result和log同時寫入。這樣保證result和log同時更新或同時失敗,對應(yīng)”Exactly once“。

Kafka默認(rèn)保證at-least-once delivery,容許用戶實(shí)現(xiàn)at-most-once語義,exactly-once的實(shí)現(xiàn)取決于目的存儲系統(tǒng),kafka提供了讀取offset,實(shí)現(xiàn)也沒有問題。

復(fù)制(Replication)

1)一個partition的復(fù)制個數(shù)(replication factor)包括這個partition的leader本身。

2)所有對partition的讀和寫都通過leader。

3)Followers通過pull獲取leader上log(message和offset)

4)如果一個follower掛掉、卡住或者同步太慢,leader會把這個follower從”in sync replicas“(ISR)列表中刪除。

5)當(dāng)所有的”in sync replicas“的follower把一個消息寫入到自己的log中時,這個消息才被認(rèn)為是”committed“的。

6)如果針對某個partition的所有復(fù)制節(jié)點(diǎn)都掛了,Kafka選擇最先復(fù)活的那個節(jié)點(diǎn)作為leader(這個節(jié)點(diǎn)不一定在ISR里)。

日志壓縮(Log Compaction)

1)針對一個topic的partition,壓縮使得Kafka至少知道每個key對應(yīng)的最后一個值。

2)壓縮不會重排序消息。

3)消息的offset是不會變的。

4)消息的offset是順序的。

Distribution

Consumer Offset Tracking

1)High-level consumer記錄每個partition所消費(fèi)的maximum offset,并定期commit到offset manager(broker)。

2)Simple consumer需要手動管理offset?,F(xiàn)在的Simple consumer Java API只支持commit offset到zookeeper。

Consumers and Consumer Groups

1)consumer注冊到zookeeper

2)屬于同一個group的consumer(group id一樣)平均分配partition,每個partition只會被一個consumer消費(fèi)。

3)當(dāng)broker或同一個group的其他consumer的狀態(tài)發(fā)生變化的時候,consumer rebalance就會發(fā)生。

Zookeeper協(xié)調(diào)控制

1)管理broker與consumer的動態(tài)加入與離開。

2)觸發(fā)負(fù)載均衡,當(dāng)broker或consumer加入或離開時會觸發(fā)負(fù)載均衡算法,使得一個consumer group內(nèi)的多個consumer的訂閱負(fù)載平衡。

3)維護(hù)消費(fèi)關(guān)系及每個partition的消費(fèi)信息。

生產(chǎn)者代碼示例:

import java.util.*;

import kafka.javaapi.producer.Producer;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;



public class TestProducer {

    public static void main(String[] args) {

        long events = Long.parseLong(args[0]);

        Random rnd = new Random();

        Properties props = new Properties();

        props.put("metadata.broker.list""broker1:9092,broker2:9092 ");

        props.put("serializer.class""kafka.serializer.StringEncoder");

        props.put("partitioner.class""example.producer.SimplePartitioner");

        props.put("request.required.acks""1");

        ProducerConfig config = new ProducerConfig(props);

        Producer<String, Stringproducer = new Producer<StringString(config);

        for (long nEvents = 0; nEvents < events; nEvents++) {

               long runtime = new Date().getTime();

               String ip = “192.168.2.” + rnd.nextInt(255);

               String msg = runtime + “,www.example.com,” + ip;

               KeyedMessage<String, Stringdata = new KeyedMessage<StringString("page_visits", ip, msg);

               producer.send(data);
        }
        producer.close();
    }
}

Partitioning Code:

import kafka.producer.Partitioner;

import kafka.utils.VerifiableProperties;

public class SimplePartitioner implements Partitioner {

    public SimplePartitioner (VerifiableProperties props) {

    }


    public int partition(Object key, int a_numPartitions) {

        int partition = 0;

        String stringKey = (String) key;

        int offset = stringKey.lastIndexOf('.');

        if (offset 0) {

           partition = Integer.parseInt( stringKey.substring(offset+1)) % a_numPartitions;

        }

       return partition;

  }
}

消費(fèi)者代碼示例:

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class ConsumerGroupExample {

    private final ConsumerConnector consumer;

    private final String topic;

    private  ExecutorService executor;


    public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {

        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(

                createConsumerConfig(a_zookeeper, a_groupId));

        this.topic = a_topic;

    }

    public void shutdown() {

        if (consumer != null) consumer.shutdown();

        if (executor != null) executor.shutdown();

        try {

            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {

                System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");

            }

        } catch (InterruptedException e) {

            System.out.println("Interrupted during shutdown, exiting uncleanly");

        }

   }


    public void run(int a_numThreads) {

        Map<String, IntegertopicCountMap = new HashMap<String, Integer();

        topicCountMap.put(topic, new Integer(a_numThreads));

        Map<StringList<kafkastream<byte[], byte[]consumermap = consumer.createmessagestreams(topiccountmap);

        List<kafkastream<byte[], byte[]streams = consumermap.get(topic);


        // now launch all the threads

        executor = Executors.newFixedThreadPool(a_numThreads);


        // now create an object to consume the messages

        int threadNumber = 0;

        for (final KafkaStream stream : streams) {

            executor.submit(new ConsumerTest(stream, threadNumber));

            threadNumber++;

        }

    }


    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {

        Properties props = new Properties();

        props.put("zookeeper.connect", a_zookeeper);

        props.put("group.id", a_groupId);

        props.put("zookeeper.session.timeout.ms""400");

        props.put("zookeeper.sync.time.ms""200");

        props.put("auto.commit.interval.ms""1000");

        return new ConsumerConfig(props);

    }


    public static void main(String[] args) {

        String zooKeeper = args[0];

        String groupId = args[1];

        String topic = args[2];

        int threads = Integer.parseInt(args[3]);


        ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);

        example.run(threads);


        try {

            Thread.sleep(10000);

        } catch (InterruptedException ie) {

        }

        example.shutdown();

    }

}

ConsumerTest 測試類:

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;


public class ConsumerTest implements Runnable {

    private KafkaStream m_stream;
    private int m_threadNumber;


    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {

        m_threadNumber = a_threadNumber;

        m_stream = a_stream;

    }



    public void run() {

        ConsumerIterator<byte[], byte[]it = m_stream.iterator();

        while (it.hasNext())

            System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));

        System.out.println("Shutting down Thread: " + m_threadNumber);

    }

}

開發(fā)環(huán)境搭建

https://cwiki.apache.org/confluence/display/KAFKA/Developer+Setup

一些example

https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example

參考

  • https://kafka.apache.org/documentation.html

  • https://cwiki.apache.org/confluence/display/KAFKA/Index

作者:阿凡盧

https://www.cnblogs.com/luxiaoxun/p/5492646.html

特別推薦一個分享架構(gòu)+算法的優(yōu)質(zhì)內(nèi)容,還沒關(guān)注的小伙伴,可以長按關(guān)注一下:

Kafka 基本原理(8000 字小結(jié))

Kafka 基本原理(8000 字小結(jié))

Kafka 基本原理(8000 字小結(jié))

長按訂閱更多精彩▼

Kafka 基本原理(8000 字小結(jié))

如有收獲,點(diǎn)個在看,誠摯感謝

免責(zé)聲明:本文內(nèi)容由21ic獲得授權(quán)后發(fā)布,版權(quán)歸原作者所有,本平臺僅提供信息存儲服務(wù)。文章僅代表作者個人觀點(diǎn),不代表本平臺立場,如有問題,請聯(lián)系我們,謝謝!

本站聲明: 本文章由作者或相關(guān)機(jī)構(gòu)授權(quán)發(fā)布,目的在于傳遞更多信息,并不代表本站贊同其觀點(diǎn),本站亦不保證或承諾內(nèi)容真實(shí)性等。需要轉(zhuǎn)載請聯(lián)系該專欄作者,如若文章內(nèi)容侵犯您的權(quán)益,請及時聯(lián)系本站刪除。
關(guān)閉
關(guān)閉