aptpod Tech Blog

株式会社アプトポッドのテクノロジーブログです

NATS JetStream 試してみた

はじめに

プロフェッショナルサービス本部 SRE グループのkawamata です。

今回は at least once を保証するメッセージングシステムを構成可能なミドルウェアである、 NATS JetStream を試してみました。

弊社intdash を構成する intdash Server でもメッセージングシステムが動作しています。 現状は、そのメッセージングシステムの一部にNATS Streaming を採用しています。

NATS Streaming は現在 deprecated となっており、2023/06 にはサポートが終了する予定です。本件への対応方針はプロダクト開発本部で検討中ですが、 SRE グループでも情報収集のため、NATS Streaming の後継にあたるNATS JetStream をまずは試してみました。

NATS とは

まず、NATS とは何か?について概要を説明します。 そもそも、メッセージングシステムに関して簡単に触れておきます。

メッセージングシステムはサービス間の非同期処理を実現します。 メッセージングシステムの利用者はPublisher, Subscriber と呼ばれます。 Publisher はメッセージングシステムにメッセージを送信し、Subscriber はメッセージングシステムからメッセージを受信します。 Publisher とSubscriber のメッセージングの間にシステムが入ることによって、両者は疎結合となり相互の処理の依存度が下がるため、非同期の処理が可能になります。

また、メッセージングシステムは主に送達されるメッセージの保証要件によって大別されます。 メッセージの送達保証には保証度の低い順に以下の種類があります。

  • at most once
    • 1 回 送信
    • メッセージの送達は保証されない
  • at least once
    • 少なくとも 1 回送信
    • 1 回 以上 のメッセージ送達を保証
    • メッセージの重複が有り得る
  • exactly once
    • 必ず 1 回送信
    • 1 回 だけ のメッセージ送達を保証
    • メッセージの重複は発生しない

NATS

NATS は、CNCF(Cloud Native Computing Foundation) のプロジェクトでホストされているシンプルかつ高性能なメッセージングシステムを構成可能なオープンソースのミドルウェアです。

NATS 単体では、at most once なPub/Sub(Publish/Subscribe) システムを構成可能です。 at most once ですので、Subscriber はNATS サーバに接続していない間にPublisher から送られたメッセージを受信することが出来ません。

NATS Streaming

NATS StreamingNATS Server にStreaming module と永続化のためのstorage を組み合わせ、 at least once を保証するメッセージングシステムを構成可能にしたものです。 at least once が保証されますので、Subscriber(この場合Consumer) はPublish されたメッセージを文字通り必ず1 回は受信可能です。

NATS Streaming はシンプルでスケールさせやすい優れたメッセージングシステムですが、いくつかの問題点がありました。 例として、以下の課題があります。

  • at least once のためメッセージが重複する可能性がある
  • 確認済みのメッセージをシステムから削除出来ない
  • Consumer はメッセージをpull 出来ない
  • Subscribe の制限が柔軟に出来ない
  • クラスタのスケールアウトにおいて、スケーラビリティが低い
  • NATS 2.0 のセキュリティモデルに対応していない

NATS JetStream

NATS JetStream は前述の課題を解決するべく、NATS Streaming の後継として開発されたものです。 NATS Server とは別に開発されていましたが、NATS 2.2.0のタイミングでGA(Generally Available) となり、現在はNATS Serverに組み込まれています。

NATS JetStreamexactly once を保証するメッセージングシステムを構成することが可能です。 NATS Streaming と違いメッセージの重複は発生しないアーキテクチャになっています。

NATS JetStream の特徴

ここで、NATS JetStream の設計思想や特徴をいくつか説明します。

設計思想

設計思想に関して、以下に 公式ドキュメント から抜粋します。

  • 構成と操作が簡単で監視が容易
  • NATS 2.0 セキュリティモデルに準拠
  • スケールアウトによる高いスケーラビリティを有する
  • 多様なユースケースをサポートする
  • 自己回復性を有する
  • メッセージペイロードに依存しない動作をする

特徴

NATS JetStream にはいくつか特徴があります。 以下に注目すべき特徴を説明します。

  • exactly once なメッセージングを保証
  • メッセージの再生ポリシーが多様
    • 再生レートが選択可能
      • instant (Consumer が処理可能な最速で受信)
      • original (Publisher が送信したレートで再生)
    • メッセージの開始番号や時刻を選択可能
  • メッセージの確認応答の種類が豊富
  • メッセージストリーミング以外の機能も利用可能(JetStream は単なる永続層という位置づけ)
    • Key Value Store
      • 所謂、一般的なKVS として利用可能
      • KVS イベントをメッセージングとして受信可能
    • Object Store
      • メッセージの容量制限を大きくすることでバイナリ等のデータオブジェクトをメッセージとしてストア可能

その他の特徴や制限事項は 公式ドキュメント をご確認ください。

実際に試してみた

それでは、実際にNATS JetStream を試してみます。 公式のHelm charts がありますので、こちらを利用して 3 Node のNATS JetStream クラスタをKubernetes 上で動作させます。

Kubernetes Operator 用にNATS Operator も存在していますが、こちらはNATS JetStream には対応していないため注意が必要です。

デプロイ

公式ドキュメントを参考に、helm を使用してデプロイしていきます。

  • repository の追加
$ helm repo add nats https://nats-io.github.io/k8s/helm/charts/
  • 設定ファイルの準備
    • 以下の設定ファイル(YAML) を準備
      • NATS JetStream の有効化および、クラスタの設定定義
$ cat jetstream/app-config.yaml
nats:

  image: nats:alpine

  jetstream:
    enabled: true

    memStorage:
      enabled: true
      size: 2Gi

    fileStorage:
      enabled: true
      size: 10Gi
      storageClassName: gp3

cluster:
  enabled: true
  replicas: 3
$
  • インストール
$ helm install nats-jetstream nats/nats -n aptpod-demo -f jetstream/app-config.yaml
NAME: nats-jetstream
LAST DEPLOYED: Wed May 25 18:13:20 2022
NAMESPACE: aptpod-demo
STATUS: deployed
REVISION: 1
NOTES:
You can find more information about running NATS on Kubernetes
in the NATS documentation website:

  https://docs.nats.io/nats-on-kubernetes/nats-kubernetes

NATS Box has been deployed into your cluster, you can
now use the NATS tools within the container as follows:

  kubectl exec -n aptpod-demo -it deployment/nats-jetstream-box -- /bin/sh -l

  nats-box:~# nats-sub test &
  nats-box:~# nats-pub test hi
  nats-box:~# nc nats-jetstream 4222

Thanks for using NATS!
$
  • 確認
    • pod が動作していれば成功
$ kubectl -n aptpod-demo get pods
nats-jetstream-0                                     3/3     Running   0          73m
nats-jetstream-1                                     3/3     Running   0          73m
nats-jetstream-2                                     3/3     Running   0          73m
nats-jetstream-box-65d7d89987-gbrg4                  1/1     Running   0          73m
$

メッセージストリーミング

公式ドキュメント を参考にメッセージストリーミングを試してみます。

  • nats-box にログイン
$ kubectl exec -n aptpod-demo -it deployment/nats-jetstream-box -- /bin/sh
~ #
  • stream の作成
    • intdash_test という名前で作成
~ # nats stream add intdash_test
? Subjects intdash
? Storage file
? Replication 3
? Retention Policy Limits
? Discard Policy Old
? Stream Messages Limit -1
? Per Subject Messages Limit -1
? Total Stream Size -1
? Message TTL -1
? Max Message Size -1
? Duplicate tracking time window 2m0s
? Allow message Roll-ups No
? Allow message deletion Yes
? Allow purging subjects or the entire stream Yes
Stream intdash_test was created

Information for Stream intdash_test created 2022-05-25T09:46:03Z

Configuration:

             Subjects: intdash
     Acknowledgements: true
            Retention: File - Limits
             Replicas: 3
       Discard Policy: Old
     Duplicate Window: 2m0s
    Allows Msg Delete: true
         Allows Purge: true
       Allows Rollups: false
     Maximum Messages: unlimited
        Maximum Bytes: unlimited
          Maximum Age: unlimited
 Maximum Message Size: unlimited
    Maximum Consumers: unlimited


Cluster Information:

                 Name: nats
               Leader: nats-jetstream-0
              Replica: nats-jetstream-1, current, seen 0.00s ago
              Replica: nats-jetstream-2, current, seen 0.00s ago

State:

             Messages: 0
                Bytes: 0 B
             FirstSeq: 0
              LastSeq: 0
     Active Consumers: 0
~ #
  • consumer の作成
    • 以下2 つのconsumer を作成
      • intdash_consumer
      • intdash_consumer2
~ # nats consumer add
? Consumer name intdash_consumer
? Delivery target (empty for Pull Consumers)
? Start policy (all, new, last, subject, 1h, msg sequence) all
? Acknowledgement policy all
? Replay policy instant
? Filter Stream by subject (blank for all) intdash
? Maximum Allowed Deliveries -1
? Maximum Acknowledgements Pending 0
? Deliver headers only without bodies No
? Add a Retry Backoff Policy No
? Select a Stream intdash_test
Information for Consumer intdash_test > intdash_consumer created 2022-05-25T09:52:54Z

Configuration:

        Durable Name: intdash_consumer
           Pull Mode: true
      Filter Subject: intdash
      Deliver Policy: All
          Ack Policy: All
            Ack Wait: 30s
       Replay Policy: Instant
     Max Ack Pending: 1,000
   Max Waiting Pulls: 512

Cluster Information:

                Name: nats
              Leader: nats-jetstream-1
             Replica: nats-jetstream-0, current, not seen
             Replica: nats-jetstream-2, current, seen 0.00s ago

State:

   Last Delivered Message: Consumer sequence: 0 Stream sequence: 0
     Acknowledgment floor: Consumer sequence: 0 Stream sequence: 0
         Outstanding Acks: 0 out of maximum 1,000
     Redelivered Messages: 0
     Unprocessed Messages: 10
            Waiting Pulls: 0 of maximum 512

~ #
~ # nats consumer add
? Consumer name intdash_consumer2
? Delivery target (empty for Pull Consumers)
? Start policy (all, new, last, subject, 1h, msg sequence) all
? Acknowledgement policy all
? Replay policy original
? Filter Stream by subject (blank for all) intdash
? Maximum Allowed Deliveries -1
? Maximum Acknowledgements Pending 0
? Deliver headers only without bodies No
? Add a Retry Backoff Policy No
? Select a Stream intdash_test
Information for Consumer intdash_test > intdash_consumer2 created 2022-05-25T09:53:42Z

Configuration:

        Durable Name: intdash_consumer2
           Pull Mode: true
      Filter Subject: intdash
      Deliver Policy: All
          Ack Policy: All
            Ack Wait: 30s
       Replay Policy: Original
     Max Ack Pending: 1,000
   Max Waiting Pulls: 512

Cluster Information:

                Name: nats
              Leader: nats-jetstream-1
             Replica: nats-jetstream-0, current, not seen
             Replica: nats-jetstream-2, current, seen 0.00s ago

State:

   Last Delivered Message: Consumer sequence: 0 Stream sequence: 0
     Acknowledgment floor: Consumer sequence: 0 Stream sequence: 0
         Outstanding Acks: 0 out of maximum 1,000
     Redelivered Messages: 0
     Unprocessed Messages: 10
            Waiting Pulls: 0 of maximum 512

~ #
  • メッセージのpublication
    • subject はintdash
    • 1 s 毎に10 メッセージを送信
~ # nats pub intdash --count=10 --sleep 1s "publication #{{Count}} @ {{TimeStamp}}"
09:49:38 Published 37 bytes to "intdash"
09:49:39 Published 37 bytes to "intdash"
09:49:40 Published 37 bytes to "intdash"
09:49:41 Published 37 bytes to "intdash"
09:49:42 Published 37 bytes to "intdash"
09:49:43 Published 37 bytes to "intdash"
09:49:44 Published 37 bytes to "intdash"
09:49:45 Published 37 bytes to "intdash"
09:49:46 Published 37 bytes to "intdash"
09:49:47 Published 38 bytes to "intdash"
~ #
  • intdash_consumer でメッセージ受信
    • メッセージ再生レート instant
    • publishe 時のレートとは無関係に受信しているのがわかる
~ # nats consumer next intdash_test intdash_consumer --count 10
[09:55:04] subj: intdash / tries: 1 / cons seq: 1 / str seq: 1 / pending: 9

publication #1 @ 2022-05-25T09:49:37Z

Acknowledged message

[09:55:04] subj: intdash / tries: 1 / cons seq: 2 / str seq: 2 / pending: 8

publication #2 @ 2022-05-25T09:49:38Z

Acknowledged message

[09:55:04] subj: intdash / tries: 1 / cons seq: 3 / str seq: 3 / pending: 7

publication #3 @ 2022-05-25T09:49:39Z

Acknowledged message

[09:55:04] subj: intdash / tries: 1 / cons seq: 4 / str seq: 4 / pending: 6

publication #4 @ 2022-05-25T09:49:40Z

Acknowledged message

[09:55:04] subj: intdash / tries: 1 / cons seq: 5 / str seq: 5 / pending: 5

publication #5 @ 2022-05-25T09:49:41Z

Acknowledged message

[09:55:04] subj: intdash / tries: 1 / cons seq: 6 / str seq: 6 / pending: 4

publication #6 @ 2022-05-25T09:49:42Z

Acknowledged message

[09:55:04] subj: intdash / tries: 1 / cons seq: 7 / str seq: 7 / pending: 3

publication #7 @ 2022-05-25T09:49:43Z

Acknowledged message

[09:55:04] subj: intdash / tries: 1 / cons seq: 8 / str seq: 8 / pending: 2

publication #8 @ 2022-05-25T09:49:44Z

Acknowledged message

[09:55:04] subj: intdash / tries: 1 / cons seq: 9 / str seq: 9 / pending: 1

publication #9 @ 2022-05-25T09:49:45Z

Acknowledged message

[09:55:04] subj: intdash / tries: 1 / cons seq: 10 / str seq: 10 / pending: 0

publication #10 @ 2022-05-25T09:49:46Z

Acknowledged message

~ #
  • intdash_consumer2 でメッセージを5 ずつ受信
    • メッセージ再生レート original
    • publishe 時のレートで受信しているのがわかる
~ # nats consumer next intdash_test intdash_consumer2 --count 5
[09:55:37] subj: intdash / tries: 1 / cons seq: 1 / str seq: 1 / pending: 9

publication #1 @ 2022-05-25T09:49:37Z

Acknowledged message

[09:55:38] subj: intdash / tries: 1 / cons seq: 2 / str seq: 2 / pending: 8

publication #2 @ 2022-05-25T09:49:38Z

Acknowledged message

[09:55:39] subj: intdash / tries: 1 / cons seq: 3 / str seq: 3 / pending: 7

publication #3 @ 2022-05-25T09:49:39Z

Acknowledged message

[09:55:40] subj: intdash / tries: 1 / cons seq: 4 / str seq: 4 / pending: 6

publication #4 @ 2022-05-25T09:49:40Z

Acknowledged message

[09:55:41] subj: intdash / tries: 1 / cons seq: 5 / str seq: 5 / pending: 5

publication #5 @ 2022-05-25T09:49:41Z

Acknowledged message

~ # nats consumer next intdash_test intdash_consumer2 --count 5
[09:55:52] subj: intdash / tries: 1 / cons seq: 6 / str seq: 6 / pending: 4

publication #6 @ 2022-05-25T09:49:42Z

Acknowledged message

[09:55:53] subj: intdash / tries: 1 / cons seq: 7 / str seq: 7 / pending: 3

publication #7 @ 2022-05-25T09:49:43Z

Acknowledged message

[09:55:54] subj: intdash / tries: 1 / cons seq: 8 / str seq: 8 / pending: 2

publication #8 @ 2022-05-25T09:49:44Z

Acknowledged message

[09:55:55] subj: intdash / tries: 1 / cons seq: 9 / str seq: 9 / pending: 1

publication #9 @ 2022-05-25T09:49:45Z

Acknowledged message

[09:55:56] subj: intdash / tries: 1 / cons seq: 10 / str seq: 10 / pending: 0

publication #10 @ 2022-05-25T09:49:46Z

Acknowledged message

~ # nats consumer next intdash_test intdash_consumer2 --count 5
nats: error: no message received: nats: timeout
~ #

purge サブコマンドや rm サブコマンドでstream やメッセージの削除も可能になっています。

Key Value ストア

公式ドキュメントを参考にKey Value ストアも試してみます。

  • kvs の作成
    • intdash_kv という名前で作成
~ # nats kv add intdash_kv
Information for Key-Value Store Bucket intdash_kv created 2022-05-25T10:04:10Z

Configuration:

         Bucket Name: intdash_kv
        History Kept: 1
       Values Stored: 0
  Backing Store Kind: JetStream
 Maximum Bucket Size: unlimited
  Maximum Value Size: unlimited
    JetStream Stream: KV_intdash_kv
             Storage: File

Cluster Information:

                Name: nats
              Leader: nats-jetstream-1

~ #
  • key のput
    • 2b126b37-f663-40e6-9ad6-2665b8ef2de3 というkey にintdash_X1 というvalue をストア
~ # nats kv put intdash_kv 2b126b37-f663-40e6-9ad6-2665b8ef2de3 intdash_X1
intdash_X1
~ #
  • key のget
~ # nats kv get intdash_kv 2b126b37-f663-40e6-9ad6-2665b8ef2de3
intdash_kv > 2b126b37-f663-40e6-9ad6-2665b8ef2de3 created @ 25 May 22 10:07 UTC

intdash_X1

~ #
  • key のdelete
~ # nats kv del intdash_kv 2b126b37-f663-40e6-9ad6-2665b8ef2de3
? Delete key intdash_kv > 2b126b37-f663-40e6-9ad6-2665b8ef2de3? Yes
~ #
~ # nats kv get intdash_kv 2b126b37-f663-40e6-9ad6-2665b8ef2de3
nats: error: nats: key not found, try --help
~ #
  • key event の確認
~ # nats kv watch intdash_kv
[2022-05-25 10:08:09] DELETE intdash_kv > 2b126b37-f663-40e6-9ad6-2665b8ef2de3

^C
~ #

特に問題なくKVS として動作しています。

オブジェクトストア

公式ドキュメントを参考にオブジェクトストアも試してみます。 オブジェクトストアの機能は現時点では Experimental Preview です。

  • オブジェクトバケットの作成
    • intdash_objbucket という名前で作成
~ # nats object add intdash_objbucket
Information for Object Store Bucket intdash_objbucket created 2022-05-25T10:12:48Z

Configuration:

         Bucket Name: intdash_objbucket
            Replicas: 1
                 TTL: unlimited
              Sealed: false
                Size: 0 B
 Maximum Bucket Size: unlimited
  Backing Store Kind: JetStream
    JetStream Stream: OBJ_intdash_objbucket

Cluster Information:

                Name: nats
              Leader: nats-jetstream-1

~ #
  • オブジェクトのput
    • aptlogo-dark.png という弊社のロゴ画像ファイルをput
~ # nats object put intdash_objbucket aptlogo-dark.png
Object information for intdash_objbucket > aptlogo-dark.png

               Size: 5.1 KiB
  Modification Time: 25 May 22 10:16 +0000
             Chunks: 1
             Digest: sha-256 43bcdf3b98e2e96bbd4f5bde5d32c29520f30fea7253cedba910334442ad
~ # nats object ls intdash_objbucket
╭───────────────────────────────────────────────────╮
│                  Bucket Contents                  │
├──────────────────┬─────────┬──────────────────────┤
│ Name             │ Size    │ Time                 │
├──────────────────┼─────────┼──────────────────────┤
│ aptlogo-dark.png │ 5.1 KiB │ 2022-05-25T10:16:45Z │
╰──────────────────┴─────────┴──────────────────────╯

~ # 
~ # nats object info intdash_objbucket
Information for Object Store Bucket intdash_objbucket created 2022-05-25T10:12:48Z

Configuration:

         Bucket Name: intdash_objbucket
            Replicas: 1
                 TTL: unlimited
              Sealed: false
                Size: 5.5 KiB
 Maximum Bucket Size: unlimited
  Backing Store Kind: JetStream
    JetStream Stream: OBJ_intdash_objbucket

Cluster Information:

                Name: nats
              Leader: nats-jetstream-1

~ #
  • オブジェクトのget
~ # nats object get intdash_objbucket aptlogo-dark.png
Wrote: 5.1 KiB to /root/aptlogo-dark.png in 0.00s
~ # ls
aptlogo-dark.png
~ #
  • オブジェクトのdelete
~ # nats object del intdash_objbucket aptlogo-dark.png
? Delete 5.1 KiB byte file intdash_objbucket > aptlogo-dark.png? Yes
Removed intdash_objbucket > aptlogo-dark.png
Information for Object Store Bucket intdash_objbucket created 2022-05-25T10:12:48Z

Configuration:

         Bucket Name: intdash_objbucket
            Replicas: 1
                 TTL: unlimited
              Sealed: false
                Size: 267 B
 Maximum Bucket Size: unlimited
  Backing Store Kind: JetStream
    JetStream Stream: OBJ_intdash_objbucket

Cluster Information:

                Name: nats
              Leader: nats-jetstream-1

~ # nats object ls intdash_objbucket
nats: error: nats: no objects found, try --help
~ #

オブジェクトストアも機能上は問題なく動作しました。 ちなみに、pod のログを確認すると分かりますが、KVS やオブジェクトストアを作成した場合は以下のような接頭語をつけたstream が作成され、consumer は都度ランダムな文字列で作成される仕様になっています。

  • KVS
    • KV_
    • 今回の例では KV_intdash_kv になる
  • オブジェクトストア
    • OBJ_
    • 今回の例では OBJ_intdash_objbucket になる

おわりに

今回は exactly once を保証するメッセージングシステムを構成可能なミドルウェアである、 NATS JetStream を試してみました。

今の所、機能上は問題なく、NATS Streamingが抱えていた種々の課題や制約が解消されていそうなため 弊社の次期メッセージングシステムの1つとして有望です。

今後、SRE グループでは以下に取り組みつつ、プロダクト開発本部と連携して他も鑑みミドルウェアの検討を続ける予定です。

  • 詳細な仕様確認
    • 特にstream 制限仕様の確認
  • 詳細な機能試験
    • 特に多様なAck 周り
  • 非機能試験
    • 特にスケーラビリティの確認
  • デプロイ方式の確立
    • 既存の公式helm chart やOperator を参考にしながら自前でOperator を用意できると良いと考えています。