亚马逊AWS官方博客

快时尚电商行业智能体设计思路与应用实践(三)借助 Transcribe/Polly 打造新一代智能语音客服,实现媲美人工客服的对话体验

序言

在快时尚电商行业智能化生态系统的演进历程中,随着文本驱动智能体在逻辑推理和任务执行层面的日趋成熟,进一步提升交互体验的关键突破口正转向更加拟人化的语音到语音(S2S)解决方案。这一技术演进的核心目标是根本性改变传统语音交互中机械化应答所带给客户的负面感受,为用户打造更自然、更富有情感共鸣的智能对话体验。

传统语音交互的机械化困境

传统语音交互系统往往给用户带来明显的机械化感受,这种体验问题主要源于技术架构的固有局限性。在移动购物、客服咨询、商品推荐等典型交互场景中,用户频繁遭遇语调单一、回应僵硬、缺乏情感表达的语音服务,这种机械化的交互方式不仅降低了用户满意度,更制约了电商平台在客户关系管理方面的竞争优势。

情感缺失的交互体验是传统方案的核心痛点。系统无法感知用户语音中的情绪变化,也无法在回应中体现相应的情感关怀,导致客户在遇到问题时感受到的是冷冰冰的机器回复,而非温暖的人性化服务。

语调单调的表达方式进一步加剧了这种负面体验。传统语音合成技术生成的声音虽然清晰可懂,但缺乏自然的韵律变化和语调起伏,使得长时间的对话变得枯燥乏味,难以维持用户的参与度和注意力。

成熟的 Transcribe + Polly 组合方案

Amazon Transcribe 和 Amazon Polly 的组合方案为解决这一挑战提供了技术相对成熟且语言支持全面的解决方案。Amazon Transcribe 支持 50 多种语言和方言的高质量语音转写,具备实时与批量处理、自动语言识别、自定义词汇、说话人分离等丰富功能,能够灵活适应全球化业务场景。Amazon Polly 则覆盖 40 多种语言和上百种自然音色,依托神经网络语音合成技术,生成接近真人、富有情感的语音输出。两者均支持主流编程语言和 API,便于快速集成与大规模部署,广泛应用于电商、客服、教育等领域。这一经典架构通过 ASR-LLM-TTS 的流水线设计,为文本智能体快速扩展 S2S 能力奠定了坚实基础。

全面的多语言支持能力使得这一组合方案在全球化电商场景中表现出色。Amazon Transcribe 能够准确识别数十种语言和方言的语音输入,而Amazon Polly则支持更加广泛的语言覆盖,为跨国电商平台提供了完整的多语言客服解决方案。这种语言支持的全面性确保了不同地区用户都能获得本地化的语音服务体验。更多详细信息和最新支持语言列表,可参考 Amazon Transcribe 官方文档Amazon Polly 官方文档

技术成熟度的显著优势体现在系统的稳定性和可靠性上。开发者可以通过 SSML 标记语言精确控制语音的韵律、重音和停顿,实现个性化的语音输出效果。同时,系统支持实时流式处理和批量处理两种模式,能够灵活适应不同的业务场景需求,为快速部署和规模化应用提供了技术保障。

拟人化水平的初步提升通过 Polly 的神经网络语音技术得以实现。系统能够生成更加接近真人的自然语音,支持多种语音风格和基础情感表达,在一定程度上缓解了传统语音交互的机械化问题。

Nova Sonic 的未来可能性

随着 Nova Sonic 技术的不断发展,S2S 智能体方案迎来了更多令人期待的可能性。Nova Sonic 作为新一代端到端语音 AI 模型,通过统一的架构设计为语音交互带来了革命性的技术突破。

端到端架构的创新优势使 Nova Sonic 能够实现从语音理解到语音生成的全链路优化。这种统一架构不仅消除了传统多模型流水线中的信息损耗和延迟累积问题,更通过保留语音中的韵律、情感和语调信息,使智能体具备了更加自然和富有表现力的对话能力。

实时交互的技术突破为构建沉浸式购物体验提供了可能。Nova Sonic 支持双向流式处理,能够实现近乎实时的语音对话,支持自然的轮换对话和用户打断处理。这种低延迟的交互能力为电商平台打造更加流畅和自然的客服体验奠定了技术基础。

智能推理与情感理解的融合使 Nova Sonic 不仅仅是语音转换工具,更是具备深度理解和推理能力的智能语音助手。模型能够根据语音输入的语境和情感状态动态调整回应内容和语音风格,实现真正意义上的情感化智能对话。

技术实践的演进路径

本文将通过技术相对成熟、语言支持全面的 Transcribe/Polly 组合方案,演示如何为现有的文本智能体快速扩展 S2S 能力,充分利用其稳定性和多语言优势构建可靠的语音交互基础设施。在未来的工作中,我们将进一步探讨 Nova Sonic 对未来 S2S 智能体方案带来的更多技术可能性,展示如何通过渐进式的技术演进策略,为快时尚电商行业的智能化转型提供完整的技术解决方案和实施路径。

企业智能客服语音扩展方案选型:Transcribe/Polly vs Nova Sonic

在企业已有基于 LangChain/MCP 的文本智能客服基础上,扩展语音到语音(S2S)功能时,Transcribe/Polly 组合与 Nova Sonic 方案在侵入性、多渠道支持和改动程度方面存在一些差异。

侵入性对比分析

Transcribe/Polly 组合 – 最小侵入性

Transcribe/Polly 方案对现有系统的侵入性极小,现有的 LangChain/MCP 智能客服核心逻辑完全保持不变,只需在输入端添加语音转文本处理,在输出端添加文本转语音处理。这种方案保持了原有系统的完整性,语音功能作为外围模块存在,不会影响现有的文本处理流程。

Nova Sonic – 深度重构需求

Nova Sonic 作为统一的语音到语音模型,需要对现有架构进行深度改造。由于 Nova Sonic 具备自己的理解、推理和生成能力,与现有 LangChain/MCP 系统的功能存在重叠,需要重新设计整个对话流程。这要求将原有应用封装成 API 服务,并通过 Nova Sonic 的 tool use 功能进行调用,涉及相对复杂的架构重构。

多渠道支持能力

Transcribe/Polly – 多渠道支持

Transcribe/Polly 方案在多渠道支持方面具有优势。由于核心智能客服系统仍然基于文本处理,可以同时支持:

  • 纯文本输入输出(原有渠道)
  • 语音输入+文本输出(通过 Transcribe)
  • 文本输入+语音输出(通过 Polly)
  • 语音输入+语音输出(Transcribe+Polly 组合)

这种灵活性使得企业可以根据不同场景和用户偏好提供多样化的交互方式。

Nova Sonic – 单一语音渠道

Nova Sonic 主要设计为语音到语音的交互模式。虽然技术上可以支持文本输出,但其架构优化主要针对语音交互场景。要实现真正的多渠道支持,需要额外的适配层和复杂的路由逻辑。

对原有 LangChain/MCP 改动程度

Transcribe/Polly – 零核心改动

使用 Transcribe/Polly 方案时,原有的 LangChain/MCP 系统几乎不需要任何核心改动:

# 原有系统保持不变
async def process_text_query(text_input):
    # 现有LangChain/MCP处理逻辑
    response = await langchain_mcp_agent.process(text_input)
    return response
 
# 只需添加语音处理包装器
async def process_voice_query(audio_input):
    # 语音转文本
    text_input = await transcribe_audio(audio_input)
    # 使用现有系统处理
    text_response = await process_text_query(text_input)
    # 文本转语音
    audio_response = await synthesize_speech(text_response)
    return audio_response

这种方案保持了现有系统的稳定性和可靠性,新增功能通过外围模块实现。

Nova Sonic – 深度架构重构

Nova Sonic 方案需要对现有系统进行深度重构:

  • API 封装需求:需要将现有 LangChain/MCP 应用封装成独立的 API 服务
  • Tool Use 集成:需要通过 Nova Sonic 的 tool use 功能调用原有业务逻辑
  • 能力重叠处理:Nova Sonic 自身具备理解和生成能力,需要合理分工避免冲突
  • 流程重新设计:整个对话流程需要重新设计以适应 Nova Sonic 的工作模式

基于现有 LangChain/MCP 智能客服的 S2S 功能延展方案选型对比分析

Transcribe/Polly 组合方案

系统集成方式:采用模块化架构,通过 API 调用方式与现有 LangChain/MCP 系统无缝集成,保持原有核心业务逻辑完全独立

部署实施策略:支持分阶段渐进式部署,可先实现语音转文本功能,再逐步添加文本转语音

调试维护复杂度:语音处理模块与文本处理逻辑完全解耦,问题定位清晰,可独立进行语音质量优化和文本逻辑调试

成本控制模式:基于实际使用量的透明计费模式,转录按分钟计费,语音合成按字符计费,成本可预测且可控制

Nova Sonic 统一方案

系统集成方式:需要深度重构现有架构,将原有智能客服封装为 API 服务,通过 Nova Sonic 的 tool use 功能进行统一调度管理

部署实施策略:要求一次性完整部署,需要同时完成语音理解、业务逻辑集成和响应生成的全链路改造,实施周期较长

调试维护复杂度:语音处理与业务逻辑深度耦合,故障排查需要跨越语音识别、意图理解、业务处理多个层面,调试复杂度较高

成本控制模式:统一的端到端语音交互计费,成本结构相对复杂,需要综合考虑语音处理时长、模型推理成本和 API 调用费用

方案选型

基于以上分析,对于已有 LangChain/MCP 智能客服系统的企业,如果需要最大化利用已有方案进行 S2S 功能延展,可以考虑采用 Transcribe/Polly 组合方案

  • 最小侵入性:保护现有投资,降低实施风险
  • 多渠道支持:灵活支持各种输入输出组合
  • 最小改动:几乎零核心代码修改
  • 渐进式升级:可以分阶段实施,逐步完善

针对从零开始构建的新系统,或者对现有系统进行全面升级改造的场景,可以考虑采用 Nova Sonic 方案。对于希望快速、低风险地为现有智能客服延展或升级语音功能的企业,Transcribe/Polly 组合是较为合适的选择。

Transcribe/Polly 智能语音客服原型示例

概述

原型示例的场景设定为快时尚电商行业的智能语音客服。借助 Transcribe/Polly,对本系列的第二篇博客的 LangChain+MCP 实现的智能客服系统进行 S2S 功能延展,实现高度拟人的语音交互体验。

原型示例展示了 ASR/LLM/TTS 的全过程,包括智能语音录制、动态结束检测、音频播放打断机制、客户服务处理逻辑以及跨平台兼容性处理,可以在 MacOS,Windows 和 Linux 上运行。

环境准备

在构建应用之前,我们需要把以下环境准备就绪,需要参考本系列的第二篇博客,在第二篇博客的原型示例运行成功的基础上,进行 Transcribe/Polly 智能语音客服原型示例的环境准备。

  • 开通 Transcribe/Polly 相关权限
  • 更新开发环境相关软件包

方案架构

项目结构

customer_service_mcp/
├── __init__.py
├── agents
│   ├── __init__.py
│   ├── base_agent.py
│   ├── intent_recognition_agent.py
│   ├── logistics_issue_agent.py
│   └── order_issue_agent.py
├── config
│   └── mcp_config.py
├── main.py
├── order_data.txt
├── requirements.txt
├── server.py
├── services
│   ├── __init__.py
│   ├── order_service.py
│   └── sop_service.py
├── start_client.sh
└── start_server.sh

项目结构和本系列的第二篇博客一致,需要更新 requirements.txt 和 main.py,安装所需的软件包,并添加 Transcribe/Polly 相关代码逻辑。

requirements.txt

langchain>=0.1.0
langchain_community
langchain_mcp_adapters>=0.1.0
boto3>=1.34.0
python-dotenv>=1.0.0
regex>=2023.0.0
mcp-server>=0.1.0
aioconsole>=0.7.0
rx
aws-sdk-bedrock-runtime
pyaudio>=0.2.11
python-dotenv>=1.0.0
asyncio>=3.4.3
langchain_aws
fastmcp
amazon-transcribe>=0.5.0
aiofile>=3.8.1
pygame>=2.0.0

main.py

#transcribe和polly完美版
import uuid
import json
import asyncio
import aioconsole
import boto3
import os
import pyaudio
import wave
import time
import requests
import pygame
import tempfile
import subprocess
import platform
import threading
import sys
import select
from typing import Optional, Dict, Any
from langchain_aws import ChatBedrock
from langchain_mcp_adapters.client import MultiServerMCPClient
from agents.intent_recognition_agent import IntentRecognitionAgent
from agents.order_issue_agent import OrderIssueAgent
from agents.logistics_issue_agent import LogisticsIssueAgent
from services.order_service import OrderService
from services.sop_service import SOPService
from amazon_transcribe.client import TranscribeStreamingClient
from amazon_transcribe.handlers import TranscriptResultStreamHandler
from amazon_transcribe.model import TranscriptEvent
import aiofile

# Initialize AWS clients
transcribe_client = boto3.client('transcribe')
polly_client = boto3.client('polly')

# 全局变量控制播放状态
audio_playing = False
audio_interrupted = False

class DynamicEventHandler(TranscriptResultStreamHandler):
    """改进的事件处理器,支持动态语音结束检测"""
    
    def __init__(self, transcript_result_stream):
        super().__init__(transcript_result_stream)
        self.final_transcript = ""
        self.partial_transcript = ""
        self.speech_ended = False
        self.last_partial_time = time.time()
        self.silence_threshold = 1.5  # 2秒静音阈值
        self.min_speech_duration = 0.5  # 最小语音时长
        self.speech_start_time = None
        self.has_speech = False

    async def handle_transcript_event(self, transcript_event: TranscriptEvent):
        """处理转录事件,实现动态结束检测"""
        results = transcript_event.transcript.results
        current_time = time.time()
        
        for result in results:
            if result.alternatives:
                transcript_text = result.alternatives[0].transcript.strip()
                
                if result.is_partial:
                    # 处理部分结果
                    if transcript_text:
                        self.partial_transcript = transcript_text
                        self.last_partial_time = current_time
                        if not self.has_speech:
                            self.has_speech = True
                            self.speech_start_time = current_time
                        print(f"🎤 正在识别: {transcript_text}")
                else:
                    # 处理完整结果
                    if transcript_text:
                        self.final_transcript = transcript_text
                        print(f"✅ 识别完成: {transcript_text}")
                        
                        # 检查是否满足最小语音时长
                        if (self.speech_start_time and 
                            current_time - self.speech_start_time >= self.min_speech_duration):
                            self.speech_ended = True
                            return
        
        # 检查静音超时
        if (self.has_speech and 
            current_time - self.last_partial_time > self.silence_threshold):
            print("🔇 检测到静音,结束录制")
            self.speech_ended = True

async def stream_audio_to_text_dynamic():
    """动态语音转文本,基于Amazon Transcribe内置端点检测"""
    client = TranscribeStreamingClient(region="us-west-2")

    # 启用部分结果稳定化和端点检测
    stream = await client.start_stream_transcription(
        language_code="en-US",
        media_sample_rate_hz=16000,
        media_encoding="pcm",
        enable_partial_results_stabilization=True,
        partial_results_stability="high"
    )

    async def write_chunks():
        CHUNK = 320  # 20ms音频块,适合VAD检测
        FORMAT = pyaudio.paInt16
        CHANNELS = 1
        RATE = 16000
        MAX_RECORD_SECONDS = 30  # 最大录制时长保护

        p = pyaudio.PyAudio()
        audio_stream = p.open(format=FORMAT,
                        channels=CHANNELS,
                        rate=RATE,
                        input=True,
                        frames_per_buffer=CHUNK)

        print("🎤 开始录音,请说话...")
        print("💡 系统会自动检测语音结束")
        
        start_time = time.time()
        
        try:
            while not handler.speech_ended:
                # 检查最大录制时长
                if time.time() - start_time > MAX_RECORD_SECONDS:
                    print("⏰ 达到最大录制时长,自动结束")
                    break
                
                try:
                    data = audio_stream.read(CHUNK, exception_on_overflow=False)
                    await stream.input_stream.send_audio_event(audio_chunk=data)
                    await asyncio.sleep(0.02)  # 20ms间隔
                except Exception as e:
                    print(f"录音错误: {e}")
                    break
                    
        finally:
            print("🔚 录音结束")
            audio_stream.stop_stream()
            audio_stream.close()
            p.terminate()
            await stream.input_stream.end_stream()

    handler = DynamicEventHandler(stream.output_stream)
    
    # 并行执行音频写入和事件处理
    await asyncio.gather(write_chunks(), handler.handle_events())
    
    # 返回最终或部分转录结果
    final_result = handler.final_transcript if handler.final_transcript else handler.partial_transcript
    return final_result.strip()

def synthesize_speech(text: str) -> bytes:
    """
    Convert text to speech using AWS Polly and return the audio data.
    """
    response = polly_client.synthesize_speech(
        Text=text,
        OutputFormat='mp3',
        VoiceId='Joanna'
    )
    
    return response['AudioStream'].read()

def init_audio_system():
    """
    初始化音频系统
    """
    try:
        pygame.mixer.pre_init(frequency=22050, size=-16, channels=2, buffer=512)
        pygame.mixer.init()
        print("✅ 音频系统初始化完成")
        return True
    except Exception as e:
        print(f"⚠️ 音频系统初始化失败: {e}")
        return False

def create_interrupt_detector():
    """
    创建跨平台的输入检测器
    """
    if os.name == 'posix':  # Unix/Linux/macOS
        def unix_input_detector(stop_event, playback_finished):
            """Unix系统的非阻塞输入检测"""
            while not playback_finished.is_set():
                if sys.stdin in select.select([sys.stdin], [], [], 0.1)[0]:
                    sys.stdin.readline()
                    stop_event.set()
                    return True
            return False
        return unix_input_detector
    else:  # Windows
        def windows_input_detector(stop_event, playback_finished):
            """Windows系统的输入检测"""
            try:
                input()  # 阻塞等待Enter
                if not playback_finished.is_set():
                    stop_event.set()
                    return True
            except:
                pass
            return False
        return windows_input_detector

def play_audio_with_interrupt(audio_data: bytes) -> bool:
    """
    播放音频,支持实时打断功能
    返回True表示播放完成,False表示被打断
    """
    global audio_playing, audio_interrupted
    
    audio_playing = True
    audio_interrupted = False
    
    # 使用更可靠的线程通信机制
    stop_event = threading.Event()
    playback_finished = threading.Event()
    
    def audio_playback():
        """音频播放线程"""
        try:
            if not pygame.mixer.get_init():
                pygame.mixer.pre_init(frequency=22050, size=-16, channels=2, buffer=512)
                pygame.mixer.init()
            
            with tempfile.NamedTemporaryFile(delete=False, suffix='.mp3') as temp_file:
                temp_file.write(audio_data)
                temp_file_path = temp_file.name
            
            pygame.mixer.music.load(temp_file_path)
            pygame.mixer.music.play()
            
            # 更频繁地检查停止信号
            while pygame.mixer.music.get_busy():
                if stop_event.is_set():
                    pygame.mixer.music.stop()  # 立即停止播放
                    break
                time.sleep(0.05)  # 减少检查间隔
            
            playback_finished.set()
            
        except Exception as e:
            print(f"❌ 音频播放错误: {e}")
            playback_finished.set()
        finally:
            try:
                os.unlink(temp_file_path)
            except:
                pass
    
    def interrupt_listener():
        """改进的输入监听线程"""
        try:
            print("🔊 正在播放语音回复... (按Enter键可打断播放)")
            
            # 使用跨平台输入检测
            input_detector = create_interrupt_detector()
            if input_detector(stop_event, playback_finished):
                global audio_interrupted
                audio_interrupted = True
                
        except Exception as e:
            print(f"输入监听错误: {e}")
    
    # 启动线程
    playback_thread = threading.Thread(target=audio_playback, daemon=True)
    interrupt_thread = threading.Thread(target=interrupt_listener, daemon=True)
    
    playback_thread.start()
    interrupt_thread.start()
    
    # 等待播放完成或被打断
    playback_finished.wait()
    
    audio_playing = False
    
    if audio_interrupted:
        print("⏹️ 播放已被打断")
        return False
    else:
        print("✅ 语音播放完成")
        return True

def play_audio(audio_data: bytes) -> None:
    """
    跨平台音频播放函数 - 支持打断功能
    """
    try:
        completed = play_audio_with_interrupt(audio_data)
        if not completed:
            print("💡 您可以重新输入问题")
    except Exception as e:
        print(f"❌ 音频播放错误: {e}")
        # 降级到原始播放方案
        fallback_play_audio(audio_data)

def fallback_play_audio(audio_data: bytes) -> None:
    """
    降级音频播放方案(不支持打断)
    """
    try:
        if not pygame.mixer.get_init():
            pygame.mixer.pre_init(frequency=22050, size=-16, channels=2, buffer=512)
            pygame.mixer.init()
        
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp3') as temp_file:
            temp_file.write(audio_data)
            temp_file_path = temp_file.name
        
        pygame.mixer.music.load(temp_file_path)
        pygame.mixer.music.play()
        
        while pygame.mixer.music.get_busy():
            time.sleep(0.1)
        
        print(f"✅ 音频播放完成 ({platform.system()})")
        
    except Exception as e:
        print(f"❌ 降级播放失败: {e}")
        system_play_audio(audio_data)
    finally:
        try:
            time.sleep(0.5)
            os.unlink(temp_file_path)
        except:
            pass

def system_play_audio(audio_data: bytes) -> None:
    """
    系统命令播放方案
    """
    with tempfile.NamedTemporaryFile(delete=False, suffix='.mp3') as temp_file:
        temp_file.write(audio_data)
        temp_file_path = temp_file.name
    
    try:
        system = platform.system().lower()
        if system == 'linux':
            subprocess.run(['mpg123', temp_file_path], 
                         stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        elif system == 'darwin':
            subprocess.run(['afplay', temp_file_path])
        elif system == 'windows':
            subprocess.run(['start', '', temp_file_path], shell=True)
    except Exception as e:
        print(f"❌ 系统播放也失败: {e}")
    finally:
        try:
            os.unlink(temp_file_path)
        except:
            pass

class CustomerServiceSystem:
    """Main customer service system that coordinates agents and services."""
    
    def __init__(self, model_id: str = "anthropic.claude-3-sonnet-20240229-v1:0", region: str = "us-west-2"):
        """Initialize the customer service system with its agents and services."""
        self.intent_agent = IntentRecognitionAgent(model_id=model_id, region=region)
        self.order_agent = OrderIssueAgent(model_id=model_id, region=region)
        self.logistics_agent = LogisticsIssueAgent(model_id=model_id, region=region)
        
        self.order_service = OrderService()
        self.sop_service = SOPService()
        
        self.conversations: Dict[str, Dict[str, Any]] = {}
    
    def process_question(self, user_question: str, conversation_id: Optional[str] = None) -> tuple[str, str]:
        """Process a customer question through the multi-agent system."""
        if not conversation_id:
            conversation_id = str(uuid.uuid4())
            self.conversations[conversation_id] = {"order_id": None, "history": []}
        
        self.conversations[conversation_id]["history"].append({"role": "user", "content": user_question})
        
        intent, _ = self.intent_agent.process(
            user_question,
            conversation_id,
            history=self.conversations[conversation_id]["history"]
        )
        print(f"Intent recognized: {intent}")
        
        import re
        order_id_match = re.search(r'order\s+(?:id\s+)?(?:number\s+)?(?:#\s*)?(\d+)', 
                                 user_question, re.IGNORECASE)
        if order_id_match:
            self.conversations[conversation_id]["order_id"] = order_id_match.group(1)
        
        if intent == "ORDER":
            response, _ = self.order_agent.process(
                user_question, 
                conversation_id,
                order_id=self.conversations[conversation_id].get("order_id"),
                history=self.conversations[conversation_id]["history"]
            )
        elif intent == "LOGISTICS":
            response, _ = self.logistics_agent.process(
                user_question,
                conversation_id,
                order_id=self.conversations[conversation_id].get("order_id"),
                history=self.conversations[conversation_id]["history"]
            )
        else:
            response = "I'm not sure if your question is about an order or logistics issue. Could you please provide more details?"
        
        self.conversations[conversation_id]["history"].append({"role": "assistant", "content": response})
        
        return response, conversation_id

async def interactive_session():
    """运行交互会话,支持动态语音输入和文本输入"""
    # 初始化音频系统
    audio_available = init_audio_system()
    if not audio_available:
        print("💡 音频播放可能受限,建议安装: pip install pygame")

    system = CustomerServiceSystem()
    conversation_id = None
    
    print("Welcome to Fashion E-commerce Customer Service!")
    print("You can ask questions about your orders or logistics using voice or text.")
    print("🎤 Press Enter for SMART voice recording (auto-detects speech end)")
    print("✏️  Type your question directly for text input")
    print("During voice playback, press Enter to interrupt and ask a new question.")
    print("Type 'exit' to end the conversation.")
    print("\nAvailable test orders: 123, 456, 789")
    print("Note: Make sure you have set up your AWS credentials for voice interaction.")
    print("-" * 50)
    
    client = MultiServerMCPClient({
        "customer_service": {
            "url": "http://localhost:8000/sse",
            "transport": "sse",
        }
    })

    tools = await client.get_tools()
    process_question_tool = next(tool for tool in tools if tool.name == "process_question")

    while True:
        # 确保不在播放状态时才接受输入
        if audio_playing:
            await asyncio.sleep(0.1)
            continue
            
        # 提供智能语音和文本输入选择
        user_input = await aioconsole.ainput("\nCustomer (🎤 Enter=Smart Voice | ✏️ Type=Text): ")
        
        if user_input.lower() == 'exit':
            print("Thank you for using our customer service. Goodbye!")
            break
        
        # 如果用户按了Enter(空输入),启动智能语音录制
        if not user_input:
            try:
                print("🎤 智能语音录制启动...")
                user_input = await stream_audio_to_text_dynamic()
                print(f"📝 最终转录结果: {user_input}")
            except Exception as e:
                print(f"❌ 语音录制或转录错误: {str(e)}")
                continue
        
        # 检查输入是否有效
        if not user_input.strip():
            print("⚠️ 未检测到有效输入,请重试")
            continue
        
        # 处理用户输入(无论是语音转换的还是直接输入的文本)
        try:
            result = await process_question_tool.arun({
                "question": user_input,
                "conversation_id": conversation_id
            })
            response_data = json.loads(result)
            response_text = response_data['response']
            print(f"\nAgent: {response_text}")
            
            # 转换为语音并播放(支持打断)
            audio_data = synthesize_speech(response_text)
            play_audio(audio_data)
            
            conversation_id = response_data['conversation_id']
            
        except Exception as e:
            print(f"\n❌ 处理问题时出错: {str(e)}")

if __name__ == "__main__":
    asyncio.run(interactive_session())

项目运行

启动 MCP Server

./start_server.sh

启动 MCP Client 运行交互式会话:

./start_client.sh   

根据系统提示,可以选择使用语音或者文本进行智能客服问答测试。

结语

在快时尚电商行业智能语音客服的构建实践中,基于 Transcribe/Polly 的组合方案以其技术成熟、语言支持全面及最小侵入性的优势,成为现有文本智能客服系统快速扩展语音到语音(S2S)能力的理想选择。该方案通过模块化设计,实现了对原有系统的零核心改动,支持多渠道灵活交互,保障了系统的稳定性与渐进式部署,可以显著提升用户的语音交互体验。

展望未来,Nova Sonic 端到端语音 AI 模型的出现,标志着智能语音交互进入了全链路优化和情感理解深度融合的新阶段。其统一的架构设计不仅可以消除传统流水线中的信息损耗和延迟,更通过实时交互和情感推理能力,提升对话的自然性和情感共鸣,为快时尚电商智能化转型注入新的动力。在未来的工作中,我们将进一步探讨 Nova Sonic 的优势场景和使用模式。

综上所述,快时尚电商行业可根据自身技术基础和业务需求,合理选择 Transcribe/Polly 方案实现快速落地与功能扩展,亦可面向未来采用 Nova Sonic 方案进行深度架构升级。两者的有机结合与技术迭代,将推动智能语音客服从传统机械应答向高度拟人化、智能化的跨越,全面提升客户服务体验,增强市场竞争力,助力行业数字化转型。


*前述特定亚马逊云科技生成式人工智能相关的服务仅在亚马逊云科技海外区域可用,亚马逊云科技中国仅为帮助您了解行业前沿技术和发展海外业务选择推介该服务。

本篇作者

张羽

亚马逊云科技解决方案架构师

赵安蓓

亚马逊云科技解决方案架构师,负责基于亚马逊云科技的解决方案咨询和设计,机器学习 TFC 成员。在数据处理与建模领域有着丰富的实践经验,特别关注医疗领域的机器学习工程化与运用。