# http://docs.aws.amazon.com/sdkforruby/api/Aws/SQS/Client.html#receive_message-instance_method access_key = ENV['AWS_ACCESS_KEY_ID'] secret_key = ENV['AWS_SECRET_ACCESS_KEY'] Aws.config[:credentials] = Aws::Credentials.new(access_key, secret_key) sqs = Aws::SQS::Client.new(region: 'us-west-2', credentials: Aws.config[:credentials]) queue_url = ENV['AWS_SQS_URL'] def getSQSMessage(sqs,queue_url) begin response = sqs.receive_message({queue_url: queue_url, visibility_timeout: 5, wait_time_seconds: 5, max_number_of_messages: 10}) raise "Recieving SQS message failed." if response.nil? raise "Recieving SQS message not found." if response.messages.nil? return response.messages rescue => ex pp ex.message return nil end end def decodeBounceInfo(message) begin #SESのJSONをパーズ。普通のSNSメッセージはパーズエラーで落とす。 body_parse = JSON.parse(message) message_parse = JSON.parse(body_parse["Message"]) return message_parse rescue => ex pp "Decoding bounce info failed." return nil end end loop = true exclude_message = Hash.new begin Timeout.timeout(300) { #処理のタイムアウトは5分設定 while loop do if (message = getSQSMessage(sqs,queue_url)) #メッセージが取得できなければ終了 message.each do |res| pp "message_id:#{res.message_id} Processing..." #同じメッセージが重複して来ることがあるらしいのでチェック if exclude_message.has_key?(res.message_id) pp "Skip message" break end bounceInfo = decodeBounceInfo(res.body) if (bounceInfo != nil && bounceInfo["bounce"]["bouncedRecipients"].instance_of?(Array)) pp "start" bounceInfo["bounce"]["bouncedRecipients"].each do |recipient| # バウンスメール処理 pp bounce_mail_addr = recipient["emailAddress"] end #メッセージを削除 sqs.delete_message({queue_url: queue_url,receipt_handle: res.receipt_handle}) end exclude_message[res.message_id] = "Prosess End" end else loop = false end end } rescue Timeout::Error pp "TimeOut" rescue => ex pp ex.message end # ワーカーで効率よく処理するため2つのポイント # この2つを気にするだけでだいぶ違うと思います。 # ReceiveMessageで10メッセージずつ処理する # Long Pollingを使う # ReceiveMessageで10メッセージずつ処理する # 1リクエストでメッセージを10個取るようにします*1 # Long Pollingを使う # ReceiveMessageWaitTimeSecondsを設定するとロングポーリングでデキュー出来ます。 # ロングポーリングにしておくとエンキューされたら即次実行されますし # キューが溜まっていないときには接続しっぱなしで受信処理を行うため # 余計なリクエストを行わなくて済みます。 # delete_message_batch(options = {}) ⇒ Types::DeleteMessageBatchResult # Deletes up to ten messages from the specified queue. This is a batch version of DeleteMessage . The result of the action on each message is reported individually in the response. # Because the batch request can result in a combination of successful and unsuccessful actions, you should check for batch errors even when the call returns an HTTP status code of 200. # Some actions take lists of parameters. These lists are specified using the param.n notation. Values of n are integers starting from 1. For example, a parameter list with two elements looks like this: # &Attribute.1=this # &Attribute.2=that # Examples: # Request syntax with placeholder values # resp = client.delete_message_batch({ # queue_url: "String", # required # entries: [ # required # { # id: "String", # required # receipt_handle: "String", # required # }, # ], # }) # Response structure # resp.successful #=> Array # resp.successful[0].id #=> String # resp.failed #=> Array # resp.failed[0].id #=> String # resp.failed[0].sender_fault #=> true/false # resp.failed[0].code #=> String # resp.failed[0].message #=> String # JSON.parse(JSON.parse(json.body)['Message'])['bounce']['bouncedRecipients'].first['emailAddress'] # json = sqs.receive_message({queue_url: queue_url, max_number_of_messages: 10, message_attribute_names: ['Bounce'], visibility_timeout: 5, wait_time_seconds: 5}).messages.first # msg = sqs.receive_message({ queue_url: queue_url, # max_number_of_messages: 10, # message_attribute_names: ['Bounce'], # visibility_timeout: 5, # wait_time_seconds: 5 }) # messageは1つづつしか取得できない? # 少なくとも、receive_messageは1つづつしか取得できない # キューから値が取得できたりできなかったり挙動が安定しない # sqs.delete_message({queue_url: url, receipt_handle: msg.messages.first.receipt_handle}) # これでキューに入っているmessageを削除できる # :max_number_of_messages (整数) -返すメッセージの最大数。Amazon SQSはこの値より多くのメッセージを返すことはありません(ただし、返されるメッセージは少なくなります)。有効な値は1〜10です。デフォルトは1です。 # :visibility_timeout (整数) -リクエストによって取得された後に受信メッセージが後続の検索要求から隠される期間(秒単位)ReceiveMessage 。 # :wait_time_seconds (整数) -戻り前にメッセージがキューに到着するのを呼び出しが待機する時間(秒)。メッセージが利用可能な場合は、より早く呼び出しが返されますWaitTimeSeconds。 # puts msg # msg = sqs.delete_message_batch({queue_url: queue_url, entries:[{id: msg.first.message_id, receipt_handle: msg.first.receipt_handle}]})
コメントを残す