地方でリモートワーク

プログラミング、先物、fx,仮想通貨なんでもやります

AWS SQSメモ

スポンサーリンク

f:id:ihatov08:20170209122117j:plain

# http://docs.aws.amazon.com/sdkforruby/api/Aws/SQS/Client.html#receive_message-instance_method

access_key = ENV['AWS_ACCESS_KEY_ID']
secret_key = ENV['AWS_SECRET_ACCESS_KEY']
Aws.config[:credentials] = Aws::Credentials.new(access_key, secret_key)

sqs = Aws::SQS::Client.new(region: 'us-west-2', credentials: Aws.config[:credentials])

queue_url = ENV['AWS_SQS_URL']

def getSQSMessage(sqs,queue_url)
  begin
    response = sqs.receive_message({queue_url: queue_url, visibility_timeout: 5, wait_time_seconds: 5, max_number_of_messages: 10})
    raise "Recieving SQS message failed." if response.nil?
    raise "Recieving SQS message not found." if response.messages.nil?
    return response.messages
  rescue => ex
    pp ex.message
    return nil
  end
end

def decodeBounceInfo(message)
  begin
    #SESのJSONをパーズ。普通のSNSメッセージはパーズエラーで落とす。
    body_parse = JSON.parse(message)
    message_parse = JSON.parse(body_parse["Message"])
    return message_parse
  rescue => ex
    pp "Decoding bounce info failed."
    return nil
  end
end

loop = true
exclude_message = Hash.new

begin
Timeout.timeout(300) {  #処理のタイムアウトは5分設定
  while loop do
    if (message = getSQSMessage(sqs,queue_url))  #メッセージが取得できなければ終了
      message.each do |res|
        pp "message_id:#{res.message_id} Processing..."
        #同じメッセージが重複して来ることがあるらしいのでチェック
        if exclude_message.has_key?(res.message_id)
          pp "Skip message"
          break
        end
        bounceInfo = decodeBounceInfo(res.body)
        if (bounceInfo != nil && bounceInfo["bounce"]["bouncedRecipients"].instance_of?(Array))
          pp "start"
          bounceInfo["bounce"]["bouncedRecipients"].each do |recipient|
           # バウンスメール処理
            pp bounce_mail_addr = recipient["emailAddress"]
          end
          #メッセージを削除
          sqs.delete_message({queue_url: queue_url,receipt_handle: res.receipt_handle})
        end
        exclude_message[res.message_id] = "Prosess End"
      end
    else
      loop = false
    end
  end
}
rescue Timeout::Error
  pp "TimeOut"
rescue => ex
  pp ex.message
end
# ワーカーで効率よく処理するため2つのポイント

# この2つを気にするだけでだいぶ違うと思います。

# ReceiveMessageで10メッセージずつ処理する
# Long Pollingを使う
# ReceiveMessageで10メッセージずつ処理する
# 1リクエストでメッセージを10個取るようにします*1

# Long Pollingを使う
# ReceiveMessageWaitTimeSecondsを設定するとロングポーリングでデキュー出来ます。
# ロングポーリングにしておくとエンキューされたら即次実行されますし
# キューが溜まっていないときには接続しっぱなしで受信処理を行うため
# 余計なリクエストを行わなくて済みます。

# delete_message_batch(options = {}) ⇒ Types::DeleteMessageBatchResult

# Deletes up to ten messages from the specified queue. This is a batch version of DeleteMessage . The result of the action on each message is reported individually in the response.


# Because the batch request can result in a combination of successful and unsuccessful actions, you should check for batch errors even when the call returns an HTTP status code of 200.

# Some actions take lists of parameters. These lists are specified using the param.n notation. Values of n are integers starting from 1. For example, a parameter list with two elements looks like this:

# &Attribute.1=this

# &Attribute.2=that

# Examples:

# Request syntax with placeholder values

# resp = client.delete_message_batch({
#   queue_url: "String", # required
#   entries: [ # required
#     {
#       id: "String", # required
#       receipt_handle: "String", # required
#     },
#   ],
# })
# Response structure

# resp.successful #=> Array
# resp.successful[0].id #=> String
# resp.failed #=> Array
# resp.failed[0].id #=> String
# resp.failed[0].sender_fault #=> true/false
# resp.failed[0].code #=> String
# resp.failed[0].message #=> String

# JSON.parse(JSON.parse(json.body)['Message'])['bounce']['bouncedRecipients'].first['emailAddress']

# json = sqs.receive_message({queue_url: queue_url, max_number_of_messages: 10, message_attribute_names: ['Bounce'], visibility_timeout: 5, wait_time_seconds: 5}).messages.first

# msg = sqs.receive_message({ queue_url: queue_url,
#                             max_number_of_messages: 10,
#                             message_attribute_names: ['Bounce'],
#                             visibility_timeout: 5,
#                             wait_time_seconds: 5 })
# messageは1つづつしか取得できない?
# 少なくとも、receive_messageは1つづつしか取得できない
# キューから値が取得できたりできなかったり挙動が安定しない

# sqs.delete_message({queue_url: url, receipt_handle: msg.messages.first.receipt_handle})
# これでキューに入っているmessageを削除できる

# :max_number_of_messages (整数) -返すメッセージの最大数。Amazon SQSはこの値より多くのメッセージを返すことはありません(ただし、返されるメッセージは少なくなります)。有効な値は1〜10です。デフォルトは1です。
# :visibility_timeout (整数) -リクエストによって取得された後に受信メッセージが後続の検索要求から隠される期間(秒単位)ReceiveMessage 。
# :wait_time_seconds (整数) -戻り前にメッセージがキューに到着するのを呼び出しが待機する時間(秒)。メッセージが利用可能な場合は、より早く呼び出しが返されますWaitTimeSeconds。
# puts msg

 # msg = sqs.delete_message_batch({queue_url: queue_url, entries:[{id: msg.first.message_id, receipt_handle: msg.first.receipt_handle}]})