
下載本文檔
版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領
文檔簡介
1、kafka producer consumerproducer api ducer.kafkaproducer 假如想學習java工程化、高性能及分布式、深化淺出。微服務、spring,mybatis,netty源碼分析的伴侶可以加我的java高級溝通:854630135,群里有阿里大牛直播講解技術,以及java大型互聯網技術的視頻免費共享給大家。 1 props.put("bootstrap.servers", "28:9092"); 2 pr
2、ops.put("acks", "all"); 3 props.put("retries", 0); 4 props.put("batch.size", 16384); 5 props.put("linger.ms", 1); 6 props.put("buffer.memory", 33554432); 7 props.put("key.serializer&a
3、mp;quot;, "mon.serialization.stringserializer"); 8 props.put("value.serializer", "mon.serialization.stringserializer"); 9 10 producer producer = new kafkaproducer(props); 11 for (int i = 0; i ("foo", integer.tostring(i), integer
4、.tostring(i), new callback() 13 override 14 public void oncompletion(recordmetadata recordmetadata, exception e) 15 if (null != e) 16 e.printstacktrace(); 17 else 18 system.out.println("callback: " + recordmetadata.topic() + " " + recordmetadata.offset(); 19 20 21
5、 ); 22 23 producer.close(); producer由一個緩沖池組成,這個緩沖池中維護著那些還沒有被傳送到服務器上的記錄,而且有一個后臺的i/o線程負責將這些記錄轉換為哀求并將其傳送到集群上去。 send()辦法是異步的。當調用它以后就把記錄放到buffer中并立刻返回。這就允許生產者批量的發送記錄。 acks配置項控制的是完成的標準,即什么樣的哀求被認為是完成了的。本例中其值設置的是"all"表示客戶端會等待直到全部記錄徹低被提交,這是最慢的一種方式也是持久化最好的一種方式。 假如哀求失敗了,生產者可以自動重試。由于這里我們設置retr
6、ies為0,所以它不重試。 生產者對每個分區都維護了一個buffers,其中放的是未被發送的記錄。這些buffers的大小是通過batch.size配置項來控制的。 默認狀況下,即使一個buffer還有未用法的空間(ps:buffer沒滿)也會立刻發送。假如你想要削減哀求的次數,你可以設置linger.ms為一個大于0的數。這個命令將告知生產者在發送哀求之前先等待多少毫秒,以希翼能有更多的記錄到達好填滿buffer。在本例中,我們設置的是1毫秒,表示我們的哀求將會延遲1毫秒發送,這樣做是為了等待更多的記錄到達,1毫秒之后即使buffer沒有被填滿,哀求也會發送。(ps:略微說明一下這段話,pr
7、oducer調用send()辦法只是將記錄放到buffer中,然后由一個后臺線程將buffer中的記錄傳送到服務器上。這里所說的哀求指的是從buffer到服務器。默認狀況下記錄被放到buffer以后立刻被發送到服務器,為了削減哀求服務器的次數,可以通過設置linger.ms,這個配置項表示等多少毫秒以后再發送,這樣做是希翼每次哀求可以發送更多的記錄,以此削減哀求次數) 假如想學習java工程化、高性能及分布式、深化淺出。微服務、spring,mybatis,netty源碼分析的伴侶可以加我的java高級溝通:854630135,群里有阿里大牛直播講解技術,以及java大型互聯網技術的視頻免費共
8、享給大家。 buffer.memory控制的是總的buffer內存數量 key.serializer 和 value.serializer表示怎樣將key和value對象轉成字節 從kafka 0.11開頭,kafkaproducer支持兩種模型:the idempotent producer and the transactional producer(冪等producer和事務producer)。冪等producer強調的是起碼一次精確的投遞。事務producer允許應用程序原子的發送消息到多個分區或者主題。 為了啟用冪等性,必需將enable.idempotence這個配置的值設為tru
9、e。假如你這樣設置了,那么retries默認是integer.max_value,并且acks默認是all。為了利用冪等producer的優勢,請避開應用程序級別的重新發送。 為了用法事務producer,你必需配置transactional.id。假如transactional.id被設置,冪等性自動被啟用。 1 properties props = new properties(); 2 props.put("bootstrap.servers", "28:9092"); 3 props.put(
10、"transactional.id", "my-transactional-id"); 4 5 producer producer = new kafkaproducer(props, new stringserializer(), new stringserializer(); 6 7 producer.inittransactions(); 8 9 try 10 producer.begintransaction(); 11 12 for (int i = 11; i ("bar",
11、 integer.tostring(i), integer.tostring(i); 14 15 / this method will flush any unsent records before actually committing the transaction 16 mittransaction(); 17 catch (producerfencedexception | outofordersequenceexception | authorizationexception e) 18 producer.close(); 19 catch (kafkaexception e) 20
12、 / by calling producer.aborttransaction() upon receiving a kafkaexception we can ensure 21 / that any successful writes are marked as aborted, hence keeping the transactional guarantees. 22 producer.aborttransaction(); 23 24 25 producer.close(); 假如想學習java工程化、高性能及分布式、深化淺出。微服務、spring,mybatis,netty源碼分析
13、的伴侶可以加我的java高級溝通:854630135,群里有阿里大牛直播講解技術,以及java大型互聯網技術的視頻免費共享給大家。 consumer api org.apache.kafka.clients.consumer.kafkaconsumer offsets and consumer position 對于分區中的每條記錄,kafka維護一個數值偏移量。這個偏移量是分區中一條記錄的唯一標識,同時也是消費者在分區中的位置。例如,一個消費者在分區中的位置是5,表示它已經消費了偏移量從0到4的記錄,并且接下來它將消費偏移量為5的記錄。相對于消費者用戶來說,這里事實上有兩個位置的概念。 消費
14、者的position表示下一條將要消費的記錄的offset。每次消費者通過調用poll(long)接收消息的時候這個position會自動增強。 committed position表示已經被存儲的最后一個偏移量。消費者可以自動的周期性提交offsets,也可以通過調用提交api(e.g. commitsync and commitasync)手動的提交position。 consumer groups and topic subscriptions kafka用"consumer groups"(消費者組)的概念來允許一組進程分開處理和消費記錄。這些處理在
15、同一個機器上舉行,也可以在不同的機器上。同一個消費者組中的消費者實例有相同的group.id 組中的每個消費者可以動態設置它們想要訂閱的主題列表。kafka給每個訂閱的消費者組都投遞一份消息。這歸功于消費者組中全部成員之間的均衡分區,以至于每個分區都可以被指定到組中精確的一個消費者。假設一個主題有4個分區,一個組中有2個消費者,那么每個消費者將處理2個分區。 消費者組中的成員是動態維護的:假如一個消費者處理失敗了,那么分配給它的分區將會被重新分給組中其它消費者。 在概念上,你可以把一個消費者組想象成一個單個的規律訂閱者,并且每個規律訂閱者由多個進程組成。作為一個多訂閱系統,kafka天生就支持
16、對于給定的主題可以有隨意數量的消費者組。 automatic offset committing 1 properties props = new properties(); 2 props.put("bootstrap.servers", "28:9092"); 3 props.put("group.id", "test"); 4 props.put("mit", "true
17、"); 5 props.put("erval.ms", "1000"); 6 props.put("key.deserializer", "mon.serialization.stringdeserializer"); 7 props.put("value.deserializer", "mon.serialization.stringdeserializer&
18、;quot;); 8 kafkaconsumer consumer = new kafkaconsumer(props); 9 consumer.subscribe(arrays.aslist("foo", "bar"); 10 while (true) 11 consumerrecords records = consumer.poll(100); 12 for (consumerrecord record : records) 13 system.out.printf("offset = %d, ke
19、y = %s, value = %s%n", record.offset(), record.key(), record.value(); 14 15 設置mit意味著自動提交已消費的記錄的offset manual offset control 代替消費者周期性的提交已消費的offsets,用戶可以控制什么時候記錄被認為是已經消費并提交它們的offsets。 1 properties props = new properties(); 2 props.put("bootstrap.servers", "localhost:9092"); 3 props.put("group.id", "test"); 4 props.put("mit", "false"); 5 props.put("key.
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯系上傳者。文件的所有權益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
- 4. 未經權益所有人同意不得將文件中的內容挪作商業或盈利用途。
- 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
- 6. 下載文件中如有侵權或不適當內容,請與我們聯系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 2025年新能源汽車標準化發展考試及答案
- 商務英語營銷活動評估能力試題及答案
- 2025年音程特征對比試題及答案
- 政治策論面試題及答案
- 從化學視角看待環境問題試題及答案
- 家具行業設計師如何進行有效的團隊溝通與協作試題及答案
- 幼兒園數學教學試題及答案總結
- 家具市場的品牌塑造策略考題試題及答案
- 家具行業設計中的傳承與創新相結合的實踐與探索研究試題及答案
- 醫學中的AI技術探索決策倫理和法學的結合點
- 中華人民共和國民營經濟促進法
- 2025-2030中國船用導航雷達行業市場發展分析及發展趨勢與投資前景研究報告
- 礦山探礦證轉讓合同協議
- 離散數學中的網絡科學研究-全面剖析
- 外包免責協議書模板
- 廣東省廣州市2025屆普通高中畢業班綜合測試(二)物理試題(含答案)
- 護士執業資格考試資料2024
- 貴州省考試院2025年4月高三年級適應性考試歷史試題及答案
- 五一節后復工復產培訓
- 《休閑農業》課件 項目六 休閑農業經營管理
- T-CWEC 40-2023 防汛排澇抗旱一體化泵車
評論
0/150
提交評論