AWS SQSメモ

f:id:ihatov08:20170209122117j:plain

# http://docs.aws.amazon.com/sdkforruby/api/Aws/SQS/Client.html#receive_message-instance_method
access_key = ENV['AWS_ACCESS_KEY_ID']
secret_key = ENV['AWS_SECRET_ACCESS_KEY']
Aws.config[:credentials] = Aws::Credentials.new(access_key, secret_key)
sqs = Aws::SQS::Client.new(region: 'us-west-2', credentials: Aws.config[:credentials])
queue_url = ENV['AWS_SQS_URL']
def getSQSMessage(sqs,queue_url)
begin
response = sqs.receive_message({queue_url: queue_url, visibility_timeout: 5, wait_time_seconds: 5, max_number_of_messages: 10})
raise "Recieving SQS message failed." if response.nil?
raise "Recieving SQS message not found." if response.messages.nil?
return response.messages
rescue => ex
pp ex.message
return nil
end
end
def decodeBounceInfo(message)
begin
#SESのJSONをパーズ。普通のSNSメッセージはパーズエラーで落とす。
body_parse = JSON.parse(message)
message_parse = JSON.parse(body_parse["Message"])
return message_parse
rescue => ex
pp "Decoding bounce info failed."
return nil
end
end
loop = true
exclude_message = Hash.new
begin
Timeout.timeout(300) {  #処理のタイムアウトは5分設定
while loop do
if (message = getSQSMessage(sqs,queue_url))  #メッセージが取得できなければ終了
message.each do |res|
pp "message_id:#{res.message_id} Processing..."
#同じメッセージが重複して来ることがあるらしいのでチェック
if exclude_message.has_key?(res.message_id)
pp "Skip message"
break
end
bounceInfo = decodeBounceInfo(res.body)
if (bounceInfo != nil && bounceInfo["bounce"]["bouncedRecipients"].instance_of?(Array))
pp "start"
bounceInfo["bounce"]["bouncedRecipients"].each do |recipient|
# バウンスメール処理
pp bounce_mail_addr = recipient["emailAddress"]
end
#メッセージを削除
sqs.delete_message({queue_url: queue_url,receipt_handle: res.receipt_handle})
end
exclude_message[res.message_id] = "Prosess End"
end
else
loop = false
end
end
}
rescue Timeout::Error
pp "TimeOut"
rescue => ex
pp ex.message
end
# ワーカーで効率よく処理するため2つのポイント
# この2つを気にするだけでだいぶ違うと思います。
# ReceiveMessageで10メッセージずつ処理する
# Long Pollingを使う
# ReceiveMessageで10メッセージずつ処理する
# 1リクエストでメッセージを10個取るようにします*1
# Long Pollingを使う
# ReceiveMessageWaitTimeSecondsを設定するとロングポーリングでデキュー出来ます。
# ロングポーリングにしておくとエンキューされたら即次実行されますし
# キューが溜まっていないときには接続しっぱなしで受信処理を行うため
# 余計なリクエストを行わなくて済みます。
# delete_message_batch(options = {}) ⇒ Types::DeleteMessageBatchResult
# Deletes up to ten messages from the specified queue. This is a batch version of DeleteMessage . The result of the action on each message is reported individually in the response.
# Because the batch request can result in a combination of successful and unsuccessful actions, you should check for batch errors even when the call returns an HTTP status code of 200.
# Some actions take lists of parameters. These lists are specified using the param.n notation. Values of n are integers starting from 1. For example, a parameter list with two elements looks like this:
# &Attribute.1=this
# &Attribute.2=that
# Examples:
# Request syntax with placeholder values
# resp = client.delete_message_batch({
#   queue_url: "String", # required
#   entries: [ # required
#     {
#       id: "String", # required
#       receipt_handle: "String", # required
#     },
#   ],
# })
# Response structure
# resp.successful #=> Array
# resp.successful[0].id #=> String
# resp.failed #=> Array
# resp.failed[0].id #=> String
# resp.failed[0].sender_fault #=> true/false
# resp.failed[0].code #=> String
# resp.failed[0].message #=> String
# JSON.parse(JSON.parse(json.body)['Message'])['bounce']['bouncedRecipients'].first['emailAddress']
# json = sqs.receive_message({queue_url: queue_url, max_number_of_messages: 10, message_attribute_names: ['Bounce'], visibility_timeout: 5, wait_time_seconds: 5}).messages.first
# msg = sqs.receive_message({ queue_url: queue_url,
#                             max_number_of_messages: 10,
#                             message_attribute_names: ['Bounce'],
#                             visibility_timeout: 5,
#                             wait_time_seconds: 5 })
# messageは1つづつしか取得できない?
# 少なくとも、receive_messageは1つづつしか取得できない
# キューから値が取得できたりできなかったり挙動が安定しない
# sqs.delete_message({queue_url: url, receipt_handle: msg.messages.first.receipt_handle})
# これでキューに入っているmessageを削除できる
# :max_number_of_messages (整数) -返すメッセージの最大数。Amazon SQSはこの値より多くのメッセージを返すことはありません(ただし、返されるメッセージは少なくなります)。有効な値は1〜10です。デフォルトは1です。
# :visibility_timeout (整数) -リクエストによって取得された後に受信メッセージが後続の検索要求から隠される期間(秒単位)ReceiveMessage 。
# :wait_time_seconds (整数) -戻り前にメッセージがキューに到着するのを呼び出しが待機する時間(秒)。メッセージが利用可能な場合は、より早く呼び出しが返されますWaitTimeSeconds。
# puts msg
# msg = sqs.delete_message_batch({queue_url: queue_url, entries:[{id: msg.first.message_id, receipt_handle: msg.first.receipt_handle}]})

コメントを残す

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です