ในระบบงานปัจจุบันมีการเชื่อมต่อกับ Amazon Kinesis ซึ่งมันคือ event streaming platform (ใครนึกภาพไม่ออกมันเป็นพวกเดียวกับ Apache Kafka อ่ะครับ) โดย feature ที่เราใช้คือ data stream ไปทำการ read/write ข้อมูล นอกจากนั้นยังมี feature อื่นๆ เช่น Firehose (stream ข้อมูลไปหา Amazon S3 หรือ Amazon OpenSearch Service เพื่อทำ analytics) หรือ Video streaming

บทความนี้จะมาแบ่งปันตัวอย่างแบบ update ล่าสุดปี 2021 ว่าเราจะเชื่อมต่อกับ Kinesis บน Spring application ได้อย่างไร รวมถึงหลักการเบื้องหลังของ library ที่เราใช้ด้วย

พูดถึง Library ก่อน

Library ที่เราจะใช้คือ spring-cloud-stream-binder-kinesis ซึ่งหลักการของมันเป็นตามภาพ

Amazon Kinesis architecture https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html

ใน data stream จะประกอบไปด้วยหลายๆ shard เอาไว้ระบุลำดับของข้อมูลโดย read จะได้ 5 transactions/sec หรือ 2 MB/sec ส่วน write จะได้ 1000 transactions/sec ซึ่ง shard มันประกอบไปด้วย partitionKey ซึ่งใช้ตอน group shard เข้าด้วยการเวลาเรา write data ลงไปจะต้องใช้คู่กับ sequenceNumber ซึ่งจะถูก assign ด้วย Kinesis เอง

สิ่งที่ต่างจาก Apache Kafka คือ Amazon Kinesis จะไม่มีแนวคิดเรื่อง consumer group สิ่งที่ library นี้ทำคือจะเก็บ metadata และ lock ไว้ใน DynamoDB แยกกัน 2 table ซึ่งทำ logic ให้ consumer ตัวเดียวใน group เดียวกันทำการอ่าน message จาก shard นั้นๆ นอกจากนั้น library ยังสามารถ configure ให้กระจาย message ออกไปให้ consume group แบบเท่าๆ กันได้ด้วย

ติดตั้ง dependencies

ตัวอย่างการ declare dependencies ใน Gradle

สร้าง function สำหรับการ produce หรือ consume

แต่เดิมเราใช้ Spring Cloud Stream ในการสร้าง producer หรือ consumer โดยที่ไม่จำเป็นต้องรู้เลยว่าเราใช้ตัวไหน middleware ตัวไหนก็ได้ ไม่ว่าจะเป็น Apache Kafka หรือ RabbitMQ หรือแม้กระทั่ง Kinesis โดยจะทำผ่าน interface ดังนี้

  • Source สำหรับการกำหนด producer
  • Sink สำหรับการกำหนด consumer
  • Processor สำหรับการ process message จาก input นึงไป output นึง

การมาของ functional programming ใน Java ผ่าน java.util.function ซึ่งข้อดีคือ function เหล่านี้สามารถนำไปประยุกต์ใช้กับ functional programming ส่วนอื่นๆได้ โดยทีี่ไม่ต้องผูกติดกับ Spring Cloud Stream โดยจะเปลี่ยนเป็น

  • Supplier สำหรับการกำหนด producer
  • Consumer สำหรับการกำหนด consumer
  • Function สำหรับการ process message จาก input นึงไป output นึง

Amazon Kinesis architecture https://spring.io/blog/2020/07/13/introducing-java-functions-for-spring-cloud-stream-applications-part-0

สำหรับ non-application เราสามารถเขียนเป็น Function แบบนี้ก็ได้

สำหรับ Spring cloud stream configuration เราจะย้ายไปไว้ใน application configuration แทน

Configuration

ในส่วนของ Kinesis เราสามารถกำหนดใน application.properties หรือ application.yml เบื้องต้นได้ตามนี้

สำหรับการตั้งชื่อ binding จะมี format คือ

  • <your-producer-component-name>-out-<index> สำหรับ producer
  • <your-consumer-component-name>-in-<index> สำหรับ consumer

โดยที่ index คือลำดับของการ binding ในกรณีที่มีหลาย input ต่อหลาย output สำหรับ case ปกติ ก็จะเป็น index ก็จะเป็น 0

สำหรับ Kinesis Client Library / Kinesis Producer Library เป็นประโยชน์ในกรณีที่เราต้องการ feature ในการ re-balance ให้ consumer ไป consume shard เพื่อทำให้ consume ได้เร็วขึ้น

น่าจะทำให้การเริ่มต้น integrate กับ Amazon Kinesis เป็นเรื่องง่ายขึ้น แน่นอนว่างานในส่วนอื่นๆ เช่น configuration และการกำหนด permission ให้ Spring application มีสิทธิ์เข้าถึง Kinesis ก็จำเป็นเช่นเดียวกัน โดยสามารถเข้าไปดูในส่วนของ configuration ตัวเต็มได้ที่ https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/main/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc