概述
Amazon Transcribe是自动语音识别(ASR)服务,可让开发人员轻松地为其应用程序添加语音转文本功能,Transcribe支持文件和流式Streaming的两种音频输入方式,Transcribe Streaming可以应用在会议记录,语音控制交互,语言实时翻译等场景,Streaming方式支持HTTP/2和WebSocket两种协议。本文介绍使用Python语言实现Transcribe Streaming的WebSocket协议。
Streaming transcription 接口介绍
Streaming transcription 接口可以接收音频流并且实时转换为文字,然后将结果返回客户端,同时返回数据中包含partial值,用来标示句子是否结束。
Streaming的数据是被编码的,由prelude和data组成。编码格式详见:https://docs.aws.amazon.com/transcribe/latest/dg/event-stream.html
Python语言的实现过程和示例
Python示例程序的运行环境是Python 3.7.9版本。
- 添加IAM Policy到你使用到的IAM user
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "transcribestreaming",
"Effect": "Allow",
"Action": "transcribe:StartStreamTranscriptionWebSocket",
"Resource": "*"
}
]
}
Python示例程序需要安装三个程序包websocket-client,boto3和amazon_transcribe;其中boto3是AWS SDK for Python,amazon_transcribe是Amazon Transcribe Streaming SDK,这两个SDK简化了和Amazon Transcribe Service的集成过程。amazon_transcribe的详细说明见:https://github.com/awslabs/amazon-transcribe-streaming-sdk
安装程序包的命令:
pip3 install boto3
pip3 install amazon_transcribe
pip3 install websocket-client
Python程序的import部分:
import hashlib
import hmac
import urllib.parse
from datetime import datetime
import time
import ssl
import json
import websocket
import _thread
from amazon_transcribe.eventstream import EventStreamMessageSerializer
from amazon_transcribe.eventstream import EventStreamBuffer
from boto3.session import Session
URL签名说明详见:https://docs.aws.amazon.com/transcribe/latest/dg/websocket.html#websocket-url
Python的实现示例:
下列代码中主体函数是create_pre_signed_url,它将生成访问Streaming transcription 接口的URL,其中包括必要的参数和签名,它需要传入4个参数:
def sign(key, msg):
return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()
def getSignatureKey(key, dateStamp, region, serviceName):
kDate = sign(("AWS4" + key).encode("utf-8"), dateStamp)
kRegion = sign(kDate, region)
kService = sign(kRegion, serviceName)
kSigning = sign(kService, "aws4_request")
return kSigning
def create_pre_signed_url(region, language_code, media_encoding, sample_rate):
# 获得access key和secret key
credentials = Session().get_credentials()
access_key_id = credentials.access_key
secret_access_key = credentials.secret_key
method = "GET"
service = "transcribe"
endpoint = "wss://transcribestreaming." + region + ".amazonaws.com:8443"
host = "transcribestreaming." + region + ".amazonaws.com:8443"
algorithm = "AWS4-HMAC-SHA256"
t = datetime.utcnow()
amz_date =t.strftime('%Y%m%dT%H%M%SZ')
datestamp =t.strftime('%Y%m%d')
canonical_uri = "/stream-transcription-websocket"
canonical_headers = "host:" + host + "\n"
signed_headers = "host"
credential_scope = datestamp + "/" + region + "/" + service + "/" + "aws4_request"
canonical_querystring = "X-Amz-Algorithm=" + algorithm
canonical_querystring += "&X-Amz-Credential=" + urllib.parse.quote_plus(access_key_id + "/" + credential_scope)
canonical_querystring += "&X-Amz-Date=" + amz_date
canonical_querystring += "&X-Amz-Expires=300"
canonical_querystring += "&X-Amz-SignedHeaders=" + signed_headers
canonical_querystring += "&language-code="+ language_code +"&media-encoding=" + media_encoding +"&sample-rate=" + sample_rate
# Zero length string for connecting
payload_hash = hashlib.sha256(("").encode('utf-8')).hexdigest()
canonical_request = method + '\n' \
+ canonical_uri + '\n' \
+ canonical_querystring + '\n' \
+ canonical_headers + '\n' \
+ signed_headers + '\n' \
+ payload_hash
string_to_sign = algorithm + "\n" \
+ amz_date + "\n" \
+ credential_scope + "\n" \
+ hashlib.sha256(canonical_request.encode("utf-8")).hexdigest()
signing_key = getSignatureKey(secret_access_key, datestamp, region, service)
signature = hmac.new(signing_key, string_to_sign.encode("utf-8"),
hashlib.sha256).hexdigest()
canonical_querystring += "&X-Amz-Signature=" + signature
request_url = endpoint + canonical_uri + "?" + canonical_querystring
return request_url
下面代码中的loop_receiving和send_data函数,作用分别是从Amazon Transcribe Service接收消息,和向Amazon Transcribe Service发送消息。
def main():
url = create_pre_signed_url("us-east-1", "en-US", "pcm", "16000")
ws = websocket.create_connection(url, sslopt={"cert_reqs": ssl.CERT_NONE})
_thread.start_new_thread(loop_receiving, (ws,))
print("Receiving...")
send_data(ws)
while True:
time.sleep(1)
main()
该函数位于main函数上方。它将接收Amazon Transcribe Streaming Service的返回数据,并且打印出来。
def loop_receiving(ws):
try:
while True:
result = ws.recv()
if result == '':
continue
eventStreamBuffer = EventStreamBuffer()
eventStreamBuffer.add_data(result)
eventStreamMessage = eventStreamBuffer.next()
stream_payload = eventStreamMessage.payload
transcript = json.loads(bytes.decode(stream_payload, "UTF-8"))
print("response:",transcript)
results = transcript['Transcript']['Results']
if len(results)>0:
for length in range(len(results)):
if 'IsPartial' in results[length]:
print('IsPartial:', results[length]['IsPartial'])
if 'Alternatives' in results[length]:
alternatives = results[length]['Alternatives']
if len(alternatives)>0:
for sublength in range(len(alternatives)):
if 'Transcript' in alternatives[sublength]:
print('Transcript:', alternatives[sublength]['Transcript'])
except Exception as e:
if 'WebSocketConnectionClosedException' == e.__class__.__name__:
print("Error: websocket connection is closed")
else:
print(f"Exception Name: {e.__class__.__name__}")
该函数位于main函数上方。它将发送音频数据到Amazon Transcribe Streaming Service。其中testFile变量是测试音频文件地址,测试音频为pem格式,英语,采样率为16000。
def send_data(ws):
testFile = "xxx.pem"
bufferSize = 1024*16
stream_headers = {
":message-type": "event",
":event-type": "AudioEvent",
":content-type": "application/octet-stream",
}
eventstream_serializer = EventStreamMessageSerializer()
with open(testFile, "rb") as source:
while True:
audio_chunk = source.read(bufferSize)
# 将音频数据进行编码
event_bytes = eventstream_serializer.serialize(stream_headers, audio_chunk)
ws.send(event_bytes, opcode = 0x2) # 0 x 2 send binary
# end with b'' data bytes
if len(audio_chunk) == 0:
break
结论
在这篇文章中,介绍了如何使用Python语言实现Transcribe Streaming的WebSocket协议,提供了Python的例子供参考,包括签名URL、数据编码、数据流的发送和接收等部分。完整代码见:https://github.com/xuemark/transcribe/blob/master/transcribe_streaming_websocket.py
参考资料
本篇作者