Confluent、Kafkaデータで「自律思考」するAIストリーミングエージェント発表

Thenewstack

Confluentは、Confluent Cloudでオープンプレビューとして利用可能になった新しいストリーミングエージェント(Streaming Agents)機能を発表しました。この新機能は、人工知能の応用における重要な進化を示しており、組織がモデルを選択し、プロンプトを設計し、ツールとデータソースを指定し、テストを実装し、イベント駆動型のマルチエージェントAIシステム向けにデータを充実させることを可能にします。

ConfluentのAIエージェントへのアプローチは、2つの説得力のある理由から注目に値します。第一に、ConfluentのApache Kafka基盤から直接最新のデータを供給することで、スマートエージェントの可能性を最大限に引き出し、最新の状況認識能力を付与することを目指しています。第二に、そしておそらくより深くは、従来のAIインタラクションモデルを逆転させています。チャットボットへの自然言語プロンプトを通じて人間がAIエージェントを起動するのが一般的であるのに対し、ストリーミングエージェントは、低遅延のエンタープライズデータに動的に反応する自律的なエンティティとして設計されています。ConfluentのAI責任者であるSean Falconerは、ストリーミングエージェントは、AIが「インフラストラクチャに組み込まれ、ビジネスの状態を監視し、その状態の変化に基づいて反応する、環境に溶け込んだイベント駆動型エージェント」によって特徴付けられる未来へとビジネスを推進するために部分的に構想されたと説明しています。彼は、これらのAI駆動型エージェントが、組織の「目」と「耳」として機能し、ビジネスケースに影響を与える最新のデータ駆動型開発を収集し、インテリジェントシステムが即座に適応することを保証できると示唆しています。Kafkaの堅牢なストレージおよびメッセージング機能と、Confluent Cloud for Apache Flinkにある分析、AIモデル、ストリーミングエージェントを組み合わせることで、Confluentの新機能はまさにこれを達成する可能性を秘めています。

これらのストリーミングエージェントの開発とデプロイは、Confluent Cloud for Apache Flink内で行われます。Flinkは計算のバックボーンとして機能し、エージェントが様々な外部ツールやリソースとシームレスに連携できるようにします。Falconerによると、ユーザーはMCP(エージェントがリソースを呼び出すためのプロトコル)のような新しい標準に接続できるほか、ベクトルデータベースやSQLデータベース、APIエンドポイントにも接続でき、これらはすべてFlink環境内で安全に管理されます。Flinkはまた、組織がエージェントに実行させたい特定のアクションを定義し、Gemini、OpenAI、Anthropicなどの業界リーダーからのものを含む多様なAIモデルを統合するためのインターフェースを提供します。Falconerは、モデルの選択がエージェント構築の最初のステップとなることが多いと述べています。

Confluent Cloud for Apache Flinkにおけるエージェント構成の重要な側面は、プロンプトの定義であり、「コンテキストエンジニアリング」の中心的なプロセスです。これらのプロンプトは二重です。システムプロンプトは、より大きなワークフロー内でのエージェントの全体的な役割を確立し、タスク固有のプロンプトは、エージェントが実行すると予想される正確なジョブ特性を詳述します。例えば、システムプロンプトはエージェントを「メール作成の専門家」として指定し、その後に「特定のユーザーを記述する以下の入力に基づいてメールを作成する」という明示的な指示が続く場合があります。この区別はエージェントの範囲を狭め、期待される入力と出力を明確にコード化します。Falconerが例示するように、「リードをスコアリングする場合、入力はリードであり、出力は1から100の間のスコアであるべきだとわかっています。」この精度は即時のエラー検出を助け、エージェントのテストと進化を容易にします。

Apache Flinkが処理能力を提供する一方で、ストリーミングエージェントの固有のパワーの多くは、エージェント間通信のためのApache Kafkaとの深い統合に由来しています。ストリーミングエージェントは、他のプロトコルに依存するのではなく、Kafkaの堅牢なメッセージング機能を活用して、複雑なワークフローの異なる部分をエージェント間で引き継ぎ、通信プロバイダーのネットワーク障害検出のようなより大きな目標を共同で達成します。このプロセスは、多くの場合、天気予報やIoTセンサーデータのような多様なリアルタイム入力から始まり、ビジネスの動的な状態を表します。Falconerが詳しく説明するように、「エージェントがそのデータを取り込むと、追加のコンテキストを収集するために外部システムとも通信します。そして、おそらく複数のシステム、他のエージェントを含むシステムにファンアウトする出力を生成し、それらのエージェントがその出力を運用化します。」通常、これらの最初のイベントはKafkaトピックに流れ込み、そこからデータは集約または処理のためにFlinkにルーティングされます。エージェントがこのデータに対してアクションを実行した後、その出力をKafkaに戻します(多くの場合、新しいトピックに)、それによって後続のエージェントがワークフローを継続するように促します。Kafkaの永続的なメッセージングおよびストレージ機能はここで非常に貴重であり、「エージェント1とエージェント2間の重要なトレーサビリティ」を提供します。この連続した記録により、開発者は通信履歴を確認でき、実際のトラフィックに対するテストを容易にし、エージェントの動作とモデルの洗練を可能にし、改善された出力を保証します。

Confluentは、ストリーミングエージェントの「ダークローンチ」サポートを含む包括的なテスト機能も組み込んでいます。これにより、エージェントはエンドユーザーと直接対話することなく本番トラフィック内で動作でき、組織は2番目のバージョン(異なるモデルやプロンプトを持つものなど)をデプロイし、ライブ操作に影響を与えることなく元のバージョンに対するパフォーマンスを測定できます。この並行処理によりパフォーマンス比較が可能になり、「バージョン2が最初のものより優れている」ことを保証します。ユーザーはConfluentの環境内でA/Bテストを実施することもでき、ストリーミングデータプラットフォームから直接、MySQLなどの外部テーブルにアクセスしてエージェントデータを充実させることも可能です。

AIエージェントをリアルタイムストリーミングデータで強化することで、Confluentは企業に代わって自律的に行動する能力を大幅に向上させています。Falconerは、このリアルタイムインテリジェンスをビジネスの「目と耳」と適切に比較し、履歴データから得られる洞察を補完すると述べています。彼は、「ほとんどのビジネスケースでは両方が必要である」と強調しています。過去の顧客行動と現在の行動の両方を理解することです。AIを搭載したエージェントにこの包括的で最新の情報を提供することで、彼らが効果的に協力し、ビジネス成果を最適化し、戦略的目標を達成するための計り知れない可能性が解き放たれます。