· 7 years ago · Mar 24, 2018, 04:04 AM
1package main
2
3import (
4 "flag"
5 "fmt"
6 "log"
7 "os"
8 "time"
9
10 "github.com/aws/aws-sdk-go/aws"
11 "github.com/aws/aws-sdk-go/aws/credentials"
12 "github.com/aws/aws-sdk-go/aws/endpoints"
13 "github.com/aws/aws-sdk-go/aws/session"
14 "github.com/aws/aws-sdk-go/service/sns"
15 "github.com/aws/aws-sdk-go/service/sqs"
16)
17
18var (
19 topicArn = flag.String("topic", "", "topic ARN to subscribe to")
20 accessKey = flag.String("accesskey", "", "access key")
21 secretKey = flag.String("secretkey", "", "secret key")
22 message = flag.String("message", "", "a test message to content to publish to topic and read from queue")
23)
24
25func main() {
26 flag.Parse()
27 if *topicArn == "" || *accessKey == "" || *secretKey == "" || *message == "" {
28 flag.Usage()
29 os.Exit(1)
30 }
31
32 session := session.Must(session.NewSession(
33 &aws.Config{Credentials: credentials.NewStaticCredentials(
34 *accessKey,
35 *secretKey,
36 "",
37 )},
38 &aws.Config{Region: aws.String(endpoints.UsEast1RegionID)},
39 ))
40
41 sqsClient := sqs.New(session)
42 out1, err := sqsClient.CreateQueue(&sqs.CreateQueueInput{
43 QueueName: aws.String(fmt.Sprintf("queue-%d", time.Now().Unix())),
44 })
45 if err != nil {
46 log.Fatal(err)
47 }
48
49 out2, err := sqsClient.GetQueueAttributes(&sqs.GetQueueAttributesInput{
50 QueueUrl: out1.QueueUrl,
51 AttributeNames: []*string{
52 aws.String("QueueArn"),
53 },
54 })
55 if err != nil {
56 sqsClient.DeleteQueue(&sqs.DeleteQueueInput{
57 QueueUrl: out1.QueueUrl,
58 })
59 log.Fatal(err)
60 }
61 queueArn, found := out2.Attributes["QueueArn"]
62 if !found {
63 log.Fatal("no queue ARN attribute")
64 }
65
66 snsClient := sns.New(session)
67 _, err = snsClient.Subscribe(&sns.SubscribeInput{
68 TopicArn: topicArn,
69 Protocol: aws.String("sqs"),
70 Endpoint: queueArn,
71 })
72 if err != nil {
73 sqsClient.DeleteQueue(&sqs.DeleteQueueInput{
74 QueueUrl: out1.QueueUrl,
75 })
76 log.Fatal(err)
77 }
78
79 policy := fmt.Sprintf(`{
80 "Version":"2008-10-17",
81 "Statement":[{
82 "Effect":"Allow",
83 "Principal":"*",
84 "Action": ["sqs:SendMessage"],
85 "Resource":["%s"],
86 "Condition":{
87 "ArnLike":{"aws:SourceArn":["%s"]}
88 }
89 }]
90}`, *queueArn, *topicArn)
91 _, err = sqsClient.SetQueueAttributes(&sqs.SetQueueAttributesInput{
92 QueueUrl: out1.QueueUrl,
93 Attributes: map[string]*string{
94 "Policy": aws.String(policy),
95 },
96 })
97 if err != nil {
98 sqsClient.DeleteQueue(&sqs.DeleteQueueInput{
99 QueueUrl: out1.QueueUrl,
100 })
101 log.Fatal(err)
102 }
103
104 _, err = snsClient.Publish(&sns.PublishInput{
105 Message: message,
106 TopicArn: topicArn,
107 })
108 if err != nil {
109 sqsClient.DeleteQueue(&sqs.DeleteQueueInput{
110 QueueUrl: out1.QueueUrl,
111 })
112 log.Fatal(err)
113 }
114
115 out3, err := sqsClient.ReceiveMessage(&sqs.ReceiveMessageInput{
116 QueueUrl: out1.QueueUrl,
117 WaitTimeSeconds: aws.Int64(10),
118 })
119 if err != nil {
120 sqsClient.DeleteQueue(&sqs.DeleteQueueInput{
121 QueueUrl: out1.QueueUrl,
122 })
123 log.Fatal(err)
124 }
125 for _, m := range out3.Messages {
126 if m != nil && m.Body != nil {
127 fmt.Println(*m.Body)
128 }
129 }
130 sqsClient.DeleteQueue(&sqs.DeleteQueueInput{
131 QueueUrl: out1.QueueUrl,
132 })
133}