Kafka – Producer, Message và Serializer

Producer

Mình đã biết Apache Kafka là một hệ thống đứng giữa Source System (SS) và Target System (TS). Đối với SS, khi muốn gửi message/data đến TS thì cách dễ nhất đó chính là gửi vào Kafka, vào 1 topic mà cả SS và TS đã thỏa thuận trước đó. Theo như mô hình của Kafka thì TS đang đóng vai trò 1 Producer.

Một Producer có thể gửi message vào topic trên Kafka, và hơn cả thế nữa, Producer có thể quyết định xem message này sẽ nằm trong partition nào trong topic.

Message keys

Khi producer gửi message đến Kafka, Producer có thể ngầm biết trước được message này sẽ được lưu ở partition nào, hay ngay cả chính producer cũng có thể quyết định message này sẽ nằm ở partition nào, thông qua việc chỉ định key.

Một message có thể được đi kèm với 1 key, tức mỗi khi bạn không gán key khi gửi message, Kafka vẫn chấp nhận. Kafka sẽ đề ra 1 quy định như sau:

  • Nếu key = null, Kafka sẽ đưa message đó vào partition theo round robin (partition 1 -> partition 2 -> … partition n -> partition 1)
  • Nếu key tồn tại, thì mọi message có chứa key đó sẽ luôn vào cùng 1 partition (dựa trên hashing)

Ở đây mình lại có thêm 1 ý hay là nhiều message có thể có cùng 1 key

Cấu trúc của message

Mặc dù cách mình gửi và nhận thông qua API được cung cấp bởi Kafka khá dễ dùng, tuy nhiên message khi được lưu vào topic, chúng sẽ có cấu trúc nhiều hơn là 1 chuỗi, ta sẽ có:

  • Key: định danh, có thể sử dụng để quyết định ở partition nào. Thể hiện ở dạng binary
  • Value: chính là nội dung message đó, nhưng thể hiện ở dạng binary
  • Compression Type: nếu value/message dài quá, ta có thể sử dụng các thuật toán nén lại và chỉ định nội dung này đã nén bởi thuật toán nào cho các Consumer biết.
  • Header: Thông tin thêm như HTTP Header, không bắt buộc
  • Partition Offset của message
  • Timestamp: thời gian được chỉ định bởi user hoặc hệ thống

Kafka Message Serializer

Thêm 1 hiểu biết mới đó là Kafka chỉ nhận dữ liệu đầu vào là binary thôi, cả key và value, cả chiều IN và chiều OUT.

Vì thế, Producer sẽ cần phải Serialize key và message sang binary trước khi gửi. Kafka API đã cung cấp sẵn kha khá serializer phổ biến để giúp mình đỡ quằng hơn trong lúc phát triển.

Ví dụ mình có chuỗi “Hello World”, mình phải biến chuỗi này thành binary, Kafka cung cấp sẵn StringSerialzer, ý nghĩa của nó là “Tao sẽ chuyển đổi data của mày với đầu vào là String và đầu ra là Bytes[] a.k.a Binary”

Và tương tự mình có key là 123, Kafka cũng cung cấp IntegerSerialzer, thể hiện rằng “Đưa tao kiểu Integer, tao trả cho mày Bytes[]”

Việc của mình là cứ gửi raw key/data qua các Serializer đó, rồi mang nội dung binary gửi đến Kafka thôi, khỏe.

Đào sâu hơn xí

Cách Kafka chọn partition dựa trên key?

Như bàn ở trên thì nếu không truyền key, Kafka sẽ đẩy theo round robin (Tuy nhiên ở phiên bản mới hơn, Kafka đã thay đổi cách đẩy vào partition nếu không có key, check ở đây.

Vậy khi key được truyền vào, Kafka làm sao quyết định nó ở Partition nào? Đó sẽ là phần việc của Producer Partitioner Logic lo liệu. PPL là 1 đoạn code logic cơ bản, nó làm công việc đơn giản sau đây

PPL sẽ lấy key của message đó, Hash lại và chia dư (modulus %) cho tổng số lượng partition của topic đó – 1

Thuật toán hash mặc định là murmur2.

Có key khác gì không key ngoài việc chọn partition?

Key ngoài việc xác định partition ra thì nó còn đóng 1 vai trò quan trọng nữa, đó là sắp xếp đâu ra đó. Kafka đảm bảo việc sắp xếp thứ tự ở level partition.

Ví dụ mình có topic lưu lại thay đổi giao dịch của khách hàng có 2 partition, key:value thế này:

null:{"customerId": 1, "changeInBankAccount": +200}
null:{"customerId": 2, "changeInBankAccount": +100}
null:{"customerId": 1, "changeInBankAccount": +200}
null:{"customerId": 1, "changeInBankAccount": -1337}
null:{"customerId": 1, "changeInBankAccount": +200}

Khi gửi vào Kafka, nó sẽ được chia vào các partition như sau (gần giống vậy):

// partition 0
null:{"customerId": 1, "changeInBankAccount": +200}
null:{"customerId": 1, "changeInBankAccount": +200}
null:{"customerId": 1, "changeInBankAccount": +200}

// partition 1
null:{"customerId": 2, "changeInBankAccount": +100}
null:{"customerId": 1, "changeInBankAccount": -1337}

Ta thấy ở partition 0 đang giữ các tx của customerid 1 ngon lành thì vô tình có 1 thằng bị lọt qua partition 1, việc này sẽ có ảnh hưởng tới Consumer nếu ta thiết kế một consumer chỉ ôm 1 số customerId nhất định.

Vậy khi ta chèn key vào, cơ bản là key 1 và 2 thôi, thì data của mình sẽ được sắp xếp lại:

// partition 0
1:{"customerId": 1, "changeInBankAccount": +200}
1:{"customerId": 1, "changeInBankAccount": +200}
1:{"customerId": 1, "changeInBankAccount": -1337}
1:{"customerId": 1, "changeInBankAccount": +200}

// partition 1
2:{"customerId": 2, "changeInBankAccount": +100}

Leave a Reply

Your email address will not be published. Required fields are marked *


The reCAPTCHA verification period has expired. Please reload the page.