はじめに
プロフェッショナルサービス本部 SRE グループのkawamata です。
今回は at least once
を保証するメッセージングシステムを構成可能なミドルウェアである、
NATS JetStream を試してみました。
弊社intdash を構成する intdash Server
でもメッセージングシステムが動作しています。
現状は、そのメッセージングシステムの一部にNATS Streaming を採用しています。
NATS Streaming は現在 deprecated
となっており、2023/06 にはサポートが終了する予定です。本件への対応方針はプロダクト開発本部で検討中ですが、
SRE グループでも情報収集のため、NATS Streaming の後継にあたるNATS JetStream をまずは試してみました。
NATS とは
まず、NATS とは何か?について概要を説明します。 そもそも、メッセージングシステムに関して簡単に触れておきます。
メッセージングシステムはサービス間の非同期処理を実現します。 メッセージングシステムの利用者はPublisher, Subscriber と呼ばれます。 Publisher はメッセージングシステムにメッセージを送信し、Subscriber はメッセージングシステムからメッセージを受信します。 Publisher とSubscriber のメッセージングの間にシステムが入ることによって、両者は疎結合となり相互の処理の依存度が下がるため、非同期の処理が可能になります。
また、メッセージングシステムは主に送達されるメッセージの保証要件によって大別されます。 メッセージの送達保証には保証度の低い順に以下の種類があります。
at most once
- 1 回
は
送信 - メッセージの送達は保証されない
- 1 回
at least once
少なくとも
1 回送信- 1 回
以上
のメッセージ送達を保証 - メッセージの重複が有り得る
exactly once
必ず
1 回送信- 1 回
だけ
のメッセージ送達を保証 - メッセージの重複は発生しない
NATS
NATS は、CNCF(Cloud Native Computing Foundation) のプロジェクトでホストされているシンプルかつ高性能なメッセージングシステムを構成可能なオープンソースのミドルウェアです。
NATS 単体では、at most once
なPub/Sub(Publish/Subscribe) システムを構成可能です。
at most once
ですので、Subscriber はNATS サーバに接続していない間にPublisher から送られたメッセージを受信することが出来ません。
NATS Streaming
NATS Streaming はNATS Server にStreaming module と永続化のためのstorage を組み合わせ、
at least once
を保証するメッセージングシステムを構成可能にしたものです。
at least once
が保証されますので、Subscriber(この場合Consumer) はPublish されたメッセージを文字通り必ず1 回は受信可能です。
NATS Streaming はシンプルでスケールさせやすい優れたメッセージングシステムですが、いくつかの問題点がありました。 例として、以下の課題があります。
at least once
のためメッセージが重複する可能性がある- 確認済みのメッセージをシステムから削除出来ない
- Consumer はメッセージをpull 出来ない
- Subscribe の制限が柔軟に出来ない
- クラスタのスケールアウトにおいて、スケーラビリティが低い
- NATS 2.0 のセキュリティモデルに対応していない
NATS JetStream
NATS JetStream は前述の課題を解決するべく、NATS Streaming の後継として開発されたものです。 NATS Server とは別に開発されていましたが、NATS 2.2.0のタイミングでGA(Generally Available) となり、現在はNATS Serverに組み込まれています。
NATS JetStream は exactly once
を保証するメッセージングシステムを構成することが可能です。
NATS Streaming と違いメッセージの重複は発生しないアーキテクチャになっています。
NATS JetStream の特徴
ここで、NATS JetStream の設計思想や特徴をいくつか説明します。
設計思想
設計思想に関して、以下に 公式ドキュメント から抜粋します。
- 構成と操作が簡単で監視が容易
- NATS 2.0 セキュリティモデルに準拠
- スケールアウトによる高いスケーラビリティを有する
- 多様なユースケースをサポートする
- 自己回復性を有する
- メッセージペイロードに依存しない動作をする
特徴
NATS JetStream にはいくつか特徴があります。 以下に注目すべき特徴を説明します。
exactly once
なメッセージングを保証- メッセージの再生ポリシーが多様
- 再生レートが選択可能
- instant (Consumer が処理可能な最速で受信)
- original (Publisher が送信したレートで再生)
- メッセージの開始番号や時刻を選択可能
- 再生レートが選択可能
- メッセージの確認応答の種類が豊富
- メッセージストリーミング以外の機能も利用可能(JetStream は単なる永続層という位置づけ)
- Key Value Store
- 所謂、一般的なKVS として利用可能
- KVS イベントをメッセージングとして受信可能
- Object Store
- メッセージの容量制限を大きくすることでバイナリ等のデータオブジェクトをメッセージとしてストア可能
- Key Value Store
その他の特徴や制限事項は 公式ドキュメント をご確認ください。
実際に試してみた
それでは、実際にNATS JetStream を試してみます。 公式のHelm charts がありますので、こちらを利用して 3 Node のNATS JetStream クラスタをKubernetes 上で動作させます。
Kubernetes Operator 用にNATS Operator も存在していますが、こちらはNATS JetStream には対応していないため注意が必要です。
デプロイ
公式ドキュメントを参考に、helm を使用してデプロイしていきます。
- repository の追加
$ helm repo add nats https://nats-io.github.io/k8s/helm/charts/
- 設定ファイルの準備
- 以下の設定ファイル(YAML) を準備
- NATS JetStream の有効化および、クラスタの設定定義
- 以下の設定ファイル(YAML) を準備
$ cat jetstream/app-config.yaml nats: image: nats:alpine jetstream: enabled: true memStorage: enabled: true size: 2Gi fileStorage: enabled: true size: 10Gi storageClassName: gp3 cluster: enabled: true replicas: 3 $
- インストール
$ helm install nats-jetstream nats/nats -n aptpod-demo -f jetstream/app-config.yaml NAME: nats-jetstream LAST DEPLOYED: Wed May 25 18:13:20 2022 NAMESPACE: aptpod-demo STATUS: deployed REVISION: 1 NOTES: You can find more information about running NATS on Kubernetes in the NATS documentation website: https://docs.nats.io/nats-on-kubernetes/nats-kubernetes NATS Box has been deployed into your cluster, you can now use the NATS tools within the container as follows: kubectl exec -n aptpod-demo -it deployment/nats-jetstream-box -- /bin/sh -l nats-box:~# nats-sub test & nats-box:~# nats-pub test hi nats-box:~# nc nats-jetstream 4222 Thanks for using NATS! $
- 確認
- pod が動作していれば成功
$ kubectl -n aptpod-demo get pods nats-jetstream-0 3/3 Running 0 73m nats-jetstream-1 3/3 Running 0 73m nats-jetstream-2 3/3 Running 0 73m nats-jetstream-box-65d7d89987-gbrg4 1/1 Running 0 73m $
メッセージストリーミング
公式ドキュメント を参考にメッセージストリーミングを試してみます。
- nats-box にログイン
$ kubectl exec -n aptpod-demo -it deployment/nats-jetstream-box -- /bin/sh ~ #
- stream の作成
intdash_test
という名前で作成
~ # nats stream add intdash_test ? Subjects intdash ? Storage file ? Replication 3 ? Retention Policy Limits ? Discard Policy Old ? Stream Messages Limit -1 ? Per Subject Messages Limit -1 ? Total Stream Size -1 ? Message TTL -1 ? Max Message Size -1 ? Duplicate tracking time window 2m0s ? Allow message Roll-ups No ? Allow message deletion Yes ? Allow purging subjects or the entire stream Yes Stream intdash_test was created Information for Stream intdash_test created 2022-05-25T09:46:03Z Configuration: Subjects: intdash Acknowledgements: true Retention: File - Limits Replicas: 3 Discard Policy: Old Duplicate Window: 2m0s Allows Msg Delete: true Allows Purge: true Allows Rollups: false Maximum Messages: unlimited Maximum Bytes: unlimited Maximum Age: unlimited Maximum Message Size: unlimited Maximum Consumers: unlimited Cluster Information: Name: nats Leader: nats-jetstream-0 Replica: nats-jetstream-1, current, seen 0.00s ago Replica: nats-jetstream-2, current, seen 0.00s ago State: Messages: 0 Bytes: 0 B FirstSeq: 0 LastSeq: 0 Active Consumers: 0 ~ #
- consumer の作成
- 以下2 つのconsumer を作成
intdash_consumer
intdash_consumer2
- 以下2 つのconsumer を作成
~ # nats consumer add ? Consumer name intdash_consumer ? Delivery target (empty for Pull Consumers) ? Start policy (all, new, last, subject, 1h, msg sequence) all ? Acknowledgement policy all ? Replay policy instant ? Filter Stream by subject (blank for all) intdash ? Maximum Allowed Deliveries -1 ? Maximum Acknowledgements Pending 0 ? Deliver headers only without bodies No ? Add a Retry Backoff Policy No ? Select a Stream intdash_test Information for Consumer intdash_test > intdash_consumer created 2022-05-25T09:52:54Z Configuration: Durable Name: intdash_consumer Pull Mode: true Filter Subject: intdash Deliver Policy: All Ack Policy: All Ack Wait: 30s Replay Policy: Instant Max Ack Pending: 1,000 Max Waiting Pulls: 512 Cluster Information: Name: nats Leader: nats-jetstream-1 Replica: nats-jetstream-0, current, not seen Replica: nats-jetstream-2, current, seen 0.00s ago State: Last Delivered Message: Consumer sequence: 0 Stream sequence: 0 Acknowledgment floor: Consumer sequence: 0 Stream sequence: 0 Outstanding Acks: 0 out of maximum 1,000 Redelivered Messages: 0 Unprocessed Messages: 10 Waiting Pulls: 0 of maximum 512 ~ # ~ # nats consumer add ? Consumer name intdash_consumer2 ? Delivery target (empty for Pull Consumers) ? Start policy (all, new, last, subject, 1h, msg sequence) all ? Acknowledgement policy all ? Replay policy original ? Filter Stream by subject (blank for all) intdash ? Maximum Allowed Deliveries -1 ? Maximum Acknowledgements Pending 0 ? Deliver headers only without bodies No ? Add a Retry Backoff Policy No ? Select a Stream intdash_test Information for Consumer intdash_test > intdash_consumer2 created 2022-05-25T09:53:42Z Configuration: Durable Name: intdash_consumer2 Pull Mode: true Filter Subject: intdash Deliver Policy: All Ack Policy: All Ack Wait: 30s Replay Policy: Original Max Ack Pending: 1,000 Max Waiting Pulls: 512 Cluster Information: Name: nats Leader: nats-jetstream-1 Replica: nats-jetstream-0, current, not seen Replica: nats-jetstream-2, current, seen 0.00s ago State: Last Delivered Message: Consumer sequence: 0 Stream sequence: 0 Acknowledgment floor: Consumer sequence: 0 Stream sequence: 0 Outstanding Acks: 0 out of maximum 1,000 Redelivered Messages: 0 Unprocessed Messages: 10 Waiting Pulls: 0 of maximum 512 ~ #
- メッセージのpublication
- subject は
intdash
- 1 s 毎に10 メッセージを送信
- subject は
~ # nats pub intdash --count=10 --sleep 1s "publication #{{Count}} @ {{TimeStamp}}" 09:49:38 Published 37 bytes to "intdash" 09:49:39 Published 37 bytes to "intdash" 09:49:40 Published 37 bytes to "intdash" 09:49:41 Published 37 bytes to "intdash" 09:49:42 Published 37 bytes to "intdash" 09:49:43 Published 37 bytes to "intdash" 09:49:44 Published 37 bytes to "intdash" 09:49:45 Published 37 bytes to "intdash" 09:49:46 Published 37 bytes to "intdash" 09:49:47 Published 38 bytes to "intdash" ~ #
- intdash_consumer でメッセージ受信
- メッセージ再生レート
instant
- publishe 時のレートとは無関係に受信しているのがわかる
- メッセージ再生レート
~ # nats consumer next intdash_test intdash_consumer --count 10 [09:55:04] subj: intdash / tries: 1 / cons seq: 1 / str seq: 1 / pending: 9 publication #1 @ 2022-05-25T09:49:37Z Acknowledged message [09:55:04] subj: intdash / tries: 1 / cons seq: 2 / str seq: 2 / pending: 8 publication #2 @ 2022-05-25T09:49:38Z Acknowledged message [09:55:04] subj: intdash / tries: 1 / cons seq: 3 / str seq: 3 / pending: 7 publication #3 @ 2022-05-25T09:49:39Z Acknowledged message [09:55:04] subj: intdash / tries: 1 / cons seq: 4 / str seq: 4 / pending: 6 publication #4 @ 2022-05-25T09:49:40Z Acknowledged message [09:55:04] subj: intdash / tries: 1 / cons seq: 5 / str seq: 5 / pending: 5 publication #5 @ 2022-05-25T09:49:41Z Acknowledged message [09:55:04] subj: intdash / tries: 1 / cons seq: 6 / str seq: 6 / pending: 4 publication #6 @ 2022-05-25T09:49:42Z Acknowledged message [09:55:04] subj: intdash / tries: 1 / cons seq: 7 / str seq: 7 / pending: 3 publication #7 @ 2022-05-25T09:49:43Z Acknowledged message [09:55:04] subj: intdash / tries: 1 / cons seq: 8 / str seq: 8 / pending: 2 publication #8 @ 2022-05-25T09:49:44Z Acknowledged message [09:55:04] subj: intdash / tries: 1 / cons seq: 9 / str seq: 9 / pending: 1 publication #9 @ 2022-05-25T09:49:45Z Acknowledged message [09:55:04] subj: intdash / tries: 1 / cons seq: 10 / str seq: 10 / pending: 0 publication #10 @ 2022-05-25T09:49:46Z Acknowledged message ~ #
- intdash_consumer2 でメッセージを5 ずつ受信
- メッセージ再生レート
original
- publishe 時のレートで受信しているのがわかる
- メッセージ再生レート
~ # nats consumer next intdash_test intdash_consumer2 --count 5 [09:55:37] subj: intdash / tries: 1 / cons seq: 1 / str seq: 1 / pending: 9 publication #1 @ 2022-05-25T09:49:37Z Acknowledged message [09:55:38] subj: intdash / tries: 1 / cons seq: 2 / str seq: 2 / pending: 8 publication #2 @ 2022-05-25T09:49:38Z Acknowledged message [09:55:39] subj: intdash / tries: 1 / cons seq: 3 / str seq: 3 / pending: 7 publication #3 @ 2022-05-25T09:49:39Z Acknowledged message [09:55:40] subj: intdash / tries: 1 / cons seq: 4 / str seq: 4 / pending: 6 publication #4 @ 2022-05-25T09:49:40Z Acknowledged message [09:55:41] subj: intdash / tries: 1 / cons seq: 5 / str seq: 5 / pending: 5 publication #5 @ 2022-05-25T09:49:41Z Acknowledged message ~ # nats consumer next intdash_test intdash_consumer2 --count 5 [09:55:52] subj: intdash / tries: 1 / cons seq: 6 / str seq: 6 / pending: 4 publication #6 @ 2022-05-25T09:49:42Z Acknowledged message [09:55:53] subj: intdash / tries: 1 / cons seq: 7 / str seq: 7 / pending: 3 publication #7 @ 2022-05-25T09:49:43Z Acknowledged message [09:55:54] subj: intdash / tries: 1 / cons seq: 8 / str seq: 8 / pending: 2 publication #8 @ 2022-05-25T09:49:44Z Acknowledged message [09:55:55] subj: intdash / tries: 1 / cons seq: 9 / str seq: 9 / pending: 1 publication #9 @ 2022-05-25T09:49:45Z Acknowledged message [09:55:56] subj: intdash / tries: 1 / cons seq: 10 / str seq: 10 / pending: 0 publication #10 @ 2022-05-25T09:49:46Z Acknowledged message ~ # nats consumer next intdash_test intdash_consumer2 --count 5 nats: error: no message received: nats: timeout ~ #
purge
サブコマンドや rm
サブコマンドでstream やメッセージの削除も可能になっています。
Key Value ストア
公式ドキュメントを参考にKey Value ストアも試してみます。
- kvs の作成
intdash_kv
という名前で作成
~ # nats kv add intdash_kv Information for Key-Value Store Bucket intdash_kv created 2022-05-25T10:04:10Z Configuration: Bucket Name: intdash_kv History Kept: 1 Values Stored: 0 Backing Store Kind: JetStream Maximum Bucket Size: unlimited Maximum Value Size: unlimited JetStream Stream: KV_intdash_kv Storage: File Cluster Information: Name: nats Leader: nats-jetstream-1 ~ #
- key のput
2b126b37-f663-40e6-9ad6-2665b8ef2de3
というkey にintdash_X1
というvalue をストア
~ # nats kv put intdash_kv 2b126b37-f663-40e6-9ad6-2665b8ef2de3 intdash_X1 intdash_X1 ~ #
- key のget
~ # nats kv get intdash_kv 2b126b37-f663-40e6-9ad6-2665b8ef2de3 intdash_kv > 2b126b37-f663-40e6-9ad6-2665b8ef2de3 created @ 25 May 22 10:07 UTC intdash_X1 ~ #
- key のdelete
~ # nats kv del intdash_kv 2b126b37-f663-40e6-9ad6-2665b8ef2de3 ? Delete key intdash_kv > 2b126b37-f663-40e6-9ad6-2665b8ef2de3? Yes ~ # ~ # nats kv get intdash_kv 2b126b37-f663-40e6-9ad6-2665b8ef2de3 nats: error: nats: key not found, try --help ~ #
- key event の確認
~ # nats kv watch intdash_kv [2022-05-25 10:08:09] DELETE intdash_kv > 2b126b37-f663-40e6-9ad6-2665b8ef2de3 ^C ~ #
特に問題なくKVS として動作しています。
オブジェクトストア
公式ドキュメントを参考にオブジェクトストアも試してみます。
オブジェクトストアの機能は現時点では Experimental Preview
です。
- オブジェクトバケットの作成
intdash_objbucket
という名前で作成
~ # nats object add intdash_objbucket Information for Object Store Bucket intdash_objbucket created 2022-05-25T10:12:48Z Configuration: Bucket Name: intdash_objbucket Replicas: 1 TTL: unlimited Sealed: false Size: 0 B Maximum Bucket Size: unlimited Backing Store Kind: JetStream JetStream Stream: OBJ_intdash_objbucket Cluster Information: Name: nats Leader: nats-jetstream-1 ~ #
- オブジェクトのput
aptlogo-dark.png
という弊社のロゴ画像ファイルをput
~ # nats object put intdash_objbucket aptlogo-dark.png Object information for intdash_objbucket > aptlogo-dark.png Size: 5.1 KiB Modification Time: 25 May 22 10:16 +0000 Chunks: 1 Digest: sha-256 43bcdf3b98e2e96bbd4f5bde5d32c29520f30fea7253cedba910334442ad ~ # nats object ls intdash_objbucket ╭───────────────────────────────────────────────────╮ │ Bucket Contents │ ├──────────────────┬─────────┬──────────────────────┤ │ Name │ Size │ Time │ ├──────────────────┼─────────┼──────────────────────┤ │ aptlogo-dark.png │ 5.1 KiB │ 2022-05-25T10:16:45Z │ ╰──────────────────┴─────────┴──────────────────────╯ ~ # ~ # nats object info intdash_objbucket Information for Object Store Bucket intdash_objbucket created 2022-05-25T10:12:48Z Configuration: Bucket Name: intdash_objbucket Replicas: 1 TTL: unlimited Sealed: false Size: 5.5 KiB Maximum Bucket Size: unlimited Backing Store Kind: JetStream JetStream Stream: OBJ_intdash_objbucket Cluster Information: Name: nats Leader: nats-jetstream-1 ~ #
- オブジェクトのget
~ # nats object get intdash_objbucket aptlogo-dark.png Wrote: 5.1 KiB to /root/aptlogo-dark.png in 0.00s ~ # ls aptlogo-dark.png ~ #
- オブジェクトのdelete
~ # nats object del intdash_objbucket aptlogo-dark.png ? Delete 5.1 KiB byte file intdash_objbucket > aptlogo-dark.png? Yes Removed intdash_objbucket > aptlogo-dark.png Information for Object Store Bucket intdash_objbucket created 2022-05-25T10:12:48Z Configuration: Bucket Name: intdash_objbucket Replicas: 1 TTL: unlimited Sealed: false Size: 267 B Maximum Bucket Size: unlimited Backing Store Kind: JetStream JetStream Stream: OBJ_intdash_objbucket Cluster Information: Name: nats Leader: nats-jetstream-1 ~ # nats object ls intdash_objbucket nats: error: nats: no objects found, try --help ~ #
オブジェクトストアも機能上は問題なく動作しました。 ちなみに、pod のログを確認すると分かりますが、KVS やオブジェクトストアを作成した場合は以下のような接頭語をつけたstream が作成され、consumer は都度ランダムな文字列で作成される仕様になっています。
- KVS
KV_
- 今回の例では
KV_intdash_kv
になる
- オブジェクトストア
OBJ_
- 今回の例では
OBJ_intdash_objbucket
になる
おわりに
今回は exactly once
を保証するメッセージングシステムを構成可能なミドルウェアである、
NATS JetStream を試してみました。
今の所、機能上は問題なく、NATS Streamingが抱えていた種々の課題や制約が解消されていそうなため 弊社の次期メッセージングシステムの1つとして有望です。
今後、SRE グループでは以下に取り組みつつ、プロダクト開発本部と連携して他も鑑みミドルウェアの検討を続ける予定です。
- 詳細な仕様確認
- 特にstream 制限仕様の確認
- 詳細な機能試験
- 特に多様なAck 周り
- 非機能試験
- 特にスケーラビリティの確認
- デプロイ方式の確立
- 既存の公式helm chart やOperator を参考にしながら自前でOperator を用意できると良いと考えています。