จดบันทึกวิธีการ integrate Amazon Kinesis กับระบบที่พัฒนาใน Java Spring Boot
ในระบบงานปัจจุบันมีการเชื่อมต่อกับ Amazon Kinesis ซึ่งมันคือ event streaming platform (ใครนึกภาพไม่ออกมันเป็นพวกเดียวกับ Apache Kafka อ่ะครับ) โดย feature ที่เราใช้คือ data stream ไปทำการ read/write ข้อมูล นอกจากนั้นยังมี feature อื่นๆ เช่น Firehose (stream ข้อมูลไปหา Amazon S3 หรือ Amazon OpenSearch Service เพื่อทำ analytics) หรือ Video streaming
บทความนี้จะมาแบ่งปันตัวอย่างแบบ update ล่าสุดปี 2021 ว่าเราจะเชื่อมต่อกับ Kinesis บน Spring application ได้อย่างไร รวมถึงหลักการเบื้องหลังของ library ที่เราใช้ด้วย
พูดถึง Library ก่อน
Library ที่เราจะใช้คือ spring-cloud-stream-binder-kinesis ซึ่งหลักการของมันเป็นตามภาพ
https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html
ใน data stream จะประกอบไปด้วยหลายๆ shard
เอาไว้ระบุลำดับของข้อมูลโดย read จะได้ 5 transactions/sec หรือ 2 MB/sec ส่วน write จะได้ 1000 transactions/sec ซึ่ง shard
มันประกอบไปด้วย partitionKey
ซึ่งใช้ตอน group shard
เข้าด้วยการเวลาเรา write data ลงไปจะต้องใช้คู่กับ sequenceNumber
ซึ่งจะถูก assign ด้วย Kinesis เอง
สิ่งที่ต่างจาก Apache Kafka คือ Amazon Kinesis จะไม่มีแนวคิดเรื่อง consumer group สิ่งที่ library นี้ทำคือจะเก็บ metadata และ lock ไว้ใน DynamoDB แยกกัน 2 table ซึ่งทำ logic ให้ consumer ตัวเดียวใน group เดียวกันทำการอ่าน message จาก shard นั้นๆ นอกจากนั้น library ยังสามารถ configure ให้กระจาย message ออกไปให้ consume group แบบเท่าๆ กันได้ด้วย
ติดตั้ง dependencies
ตัวอย่างการ declare dependencies ใน Gradle
สร้าง function สำหรับการ produce หรือ consume
แต่เดิมเราใช้ Spring Cloud Stream ในการสร้าง producer หรือ consumer โดยที่ไม่จำเป็นต้องรู้เลยว่าเราใช้ตัวไหน middleware ตัวไหนก็ได้ ไม่ว่าจะเป็น Apache Kafka หรือ RabbitMQ หรือแม้กระทั่ง Kinesis โดยจะทำผ่าน interface ดังนี้
Source
สำหรับการกำหนด producerSink
สำหรับการกำหนด consumerProcessor
สำหรับการ process message จาก input นึงไป output นึง
การมาของ functional programming ใน Java ผ่าน java.util.function
ซึ่งข้อดีคือ function เหล่านี้สามารถนำไปประยุกต์ใช้กับ functional programming ส่วนอื่นๆได้ โดยทีี่ไม่ต้องผูกติดกับ Spring Cloud Stream โดยจะเปลี่ยนเป็น
Supplier
สำหรับการกำหนด producerConsumer
สำหรับการกำหนด consumerFunction
สำหรับการ process message จาก input นึงไป output นึง
สำหรับ non-application เราสามารถเขียนเป็น Function แบบนี้ก็ได้
สำหรับ Spring cloud stream configuration เราจะย้ายไปไว้ใน application configuration แทน
Configuration
ในส่วนของ Kinesis เราสามารถกำหนดใน application.properties
หรือ application.yml
เบื้องต้นได้ตามนี้
สำหรับการตั้งชื่อ binding จะมี format คือ
<your-producer-component-name>-out-<index>
สำหรับ producer<your-consumer-component-name>-in-<index>
สำหรับ consumer
โดยที่ index
คือลำดับของการ binding ในกรณีที่มีหลาย input ต่อหลาย output สำหรับ case ปกติ ก็จะเป็น index
ก็จะเป็น 0
สำหรับ Kinesis Client Library / Kinesis Producer Library เป็นประโยชน์ในกรณีที่เราต้องการ feature ในการ re-balance ให้ consumer ไป consume shard เพื่อทำให้ consume ได้เร็วขึ้น
น่าจะทำให้การเริ่มต้น integrate กับ Amazon Kinesis เป็นเรื่องง่ายขึ้น แน่นอนว่างานในส่วนอื่นๆ เช่น configuration และการกำหนด permission ให้ Spring application มีสิทธิ์เข้าถึง Kinesis ก็จำเป็นเช่นเดียวกัน โดยสามารถเข้าไปดูในส่วนของ configuration ตัวเต็มได้ที่ https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/main/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc