Amazon SQS examples using SDK for JavaScript (v3)
This is the entry point for this scenario.
import { SNSClient } from "@aws-sdk/client-sns";
import { SQSClient } from "@aws-sdk/client-sqs";
import { TopicsQueuesWkflw } from "./TopicsQueuesWkflw.js";
import { Prompter } from "@aws-doc-sdk-examples/lib/prompter.js";
/**
 * Entry point for the topics-and-queues scenario: builds the SNS/SQS clients,
 * the prompter and a logger, wires them into a TopicsQueuesWkflw and starts it.
 *
 * @returns {Promise<void>} The promise returned by `wkflw.start()`, so callers
 *   can await completion or handle a rejection instead of leaving it floating.
 */
export const startSnsWorkflow = () => {
  const snsClient = new SNSClient({});
  const sqsClient = new SQSClient({});
  const prompter = new Prompter();

  // The workflow calls both logger.log() and logger.logSeparator(). The global
  // `console` only provides log(), so passing it directly makes the very first
  // logSeparator() call throw a TypeError. Wrap console in a small adapter.
  const separatorRule = "*".repeat(80);
  const logger = {
    log: (...args) => console.log(...args),
    logSeparator: (message) => {
      console.log(
        message
          ? `\n${separatorRule}\n${message}\n${separatorRule}\n`
          : `\n${separatorRule}\n`,
      );
    },
  };

  const wkflw = new TopicsQueuesWkflw(snsClient, sqsClient, prompter, logger);
  return wkflw.start();
};
The preceding code provides the necessary dependencies and starts the scenario. The next section contains the bulk of the example.
// Tone options offered by the checkbox prompts. Each entry uses the tone
// string both as the displayed name and as the selected value.
const toneChoices = ["cheerful", "funny", "serious", "sincere"].map((tone) => ({
  name: tone,
  value: tone,
}));
/**
 * Interactive scenario that creates an SNS topic and two SQS queues,
 * optionally attaches access policies, subscribes the queues to the topic,
 * publishes messages, polls the queues, and finally deletes everything it
 * created.
 */
export class TopicsQueuesWkflw {
  // Whether the topic and queues are created as First-In-First-Out.
  // Overwritten with the user's answer in confirmFifo().
  isFifo = true;

  // Whether content-based deduplication is enabled on the topic. Off until
  // the user opts in via confirmFifo().
  autoDedup = false;

  snsClient;
  sqsClient;
  topicName;
  topicArn;
  subscriptionArns = [];

  /**
   * @type {{ queueName: string, queueArn: string, queueUrl: string, policy?: string }[]}
   */
  queues = [];

  prompter;
  logger;

  /**
   * @param {import('@aws-sdk/client-sns').SNSClient} snsClient
   * @param {import('@aws-sdk/client-sqs').SQSClient} sqsClient
   * @param {import('../../libs/prompter.js').Prompter} prompter
   * @param {import('../../libs/logger.js').Logger} logger
   */
  constructor(snsClient, sqsClient, prompter, logger) {
    this.snsClient = snsClient;
    this.sqsClient = sqsClient;
    this.prompter = prompter;
    this.logger = logger;
  }

  /**
   * Print a section separator. Uses the logger's own `logSeparator` when it
   * has one; otherwise (for example when `console` is supplied as the logger)
   * falls back to `logger.log` so the scenario does not crash.
   *
   * @param {string} [message] Optional heading shown inside the separator.
   * @returns {unknown} Whatever the underlying logger call returns.
   */
  #logSeparator(message) {
    if (typeof this.logger.logSeparator === "function") {
      return this.logger.logSeparator(message);
    }
    const rule = "*".repeat(80);
    return this.logger.log(
      message ? `\n${rule}\n${message}\n${rule}\n` : `\n${rule}\n`,
    );
  }

  /** Log the scenario description. */
  async welcome() {
    await this.logger.log(MESSAGES.description);
  }

  /**
   * Ask whether to use FIFO resources and, if so, whether to enable
   * content-based deduplication. Stores the answers in `isFifo` / `autoDedup`.
   */
  async confirmFifo() {
    await this.logger.log(MESSAGES.snsFifoDescription);
    this.isFifo = await this.prompter.confirm({
      message: MESSAGES.snsFifoPrompt,
    });

    if (this.isFifo) {
      await this.#logSeparator(MESSAGES.headerDedup);
      await this.logger.log(MESSAGES.deduplicationNotice);
      await this.logger.log(MESSAGES.deduplicationDescription);
      this.autoDedup = await this.prompter.confirm({
        message: MESSAGES.deduplicationPrompt,
      });
    }
  }

  /**
   * Prompt for a topic name, create the topic and remember its ARN.
   * For FIFO topics the ".fifo" suffix is appended to the entered name.
   */
  async createTopic() {
    await this.logger.log(MESSAGES.creatingTopics);
    this.topicName = await this.prompter.input({
      message: MESSAGES.topicNamePrompt,
    });

    if (this.isFifo) {
      this.topicName += ".fifo";
      await this.#logSeparator(MESSAGES.headerFifoNaming);
      await this.logger.log(MESSAGES.appendFifoNotice);
    }

    const response = await this.snsClient.send(
      new CreateTopicCommand({
        Name: this.topicName,
        Attributes: {
          FifoTopic: this.isFifo ? "true" : "false",
          ...(this.autoDedup ? { ContentBasedDeduplication: "true" } : {}),
        },
      }),
    );

    this.topicArn = response.TopicArn;

    await this.logger.log(
      MESSAGES.topicCreatedNotice
        .replace("${TOPIC_NAME}", this.topicName)
        .replace("${TOPIC_ARN}", this.topicArn),
    );
  }

  /**
   * Prompt for queue names and create the queues, recording each queue's
   * name, URL and ARN in `this.queues`.
   */
  async createQueues() {
    await this.logger.log(MESSAGES.createQueuesNotice);
    // Increase this number to add more queues.
    const maxQueues = 2;

    for (let i = 0; i < maxQueues; i++) {
      await this.logger.log(
        MESSAGES.queueCount.replace("${COUNT}", String(i + 1)),
      );
      let queueName = await this.prompter.input({
        message: MESSAGES.queueNamePrompt.replace(
          "${EXAMPLE_NAME}",
          i === 0 ? "good-news" : "bad-news",
        ),
      });

      if (this.isFifo) {
        queueName += ".fifo";
        await this.logger.log(MESSAGES.appendFifoNotice);
      }

      const response = await this.sqsClient.send(
        new CreateQueueCommand({
          QueueName: queueName,
          Attributes: { ...(this.isFifo ? { FifoQueue: "true" } : {}) },
        }),
      );

      // The ARN is not part of the create response; fetch it separately.
      const { Attributes } = await this.sqsClient.send(
        new GetQueueAttributesCommand({
          QueueUrl: response.QueueUrl,
          AttributeNames: ["QueueArn"],
        }),
      );

      this.queues.push({
        queueName,
        queueArn: Attributes.QueueArn,
        queueUrl: response.QueueUrl,
      });

      await this.logger.log(
        MESSAGES.queueCreatedNotice
          .replace("${QUEUE_NAME}", queueName)
          .replace("${QUEUE_URL}", response.QueueUrl)
          .replace("${QUEUE_ARN}", Attributes.QueueArn),
      );
    }
  }

  /**
   * For each queue, show a policy that lets the topic send messages to it
   * and, if the user confirms, set it as the queue's `Policy` attribute.
   */
  async attachQueueIamPolicies() {
    for (const [index, queue] of this.queues.entries()) {
      const policy = JSON.stringify(
        {
          Statement: [
            {
              Effect: "Allow",
              Principal: {
                Service: "sns.amazonaws.com",
              },
              Action: "sqs:SendMessage",
              Resource: queue.queueArn,
              Condition: {
                ArnEquals: {
                  "aws:SourceArn": this.topicArn,
                },
              },
            },
          ],
        },
        null,
        2,
      );

      if (index !== 0) {
        await this.#logSeparator();
      }

      await this.logger.log(MESSAGES.attachPolicyNotice);
      console.log(policy);
      const addPolicy = await this.prompter.confirm({
        message: MESSAGES.addPolicyConfirmation.replace(
          "${QUEUE_NAME}",
          queue.queueName,
        ),
      });

      if (addPolicy) {
        await this.sqsClient.send(
          new SetQueueAttributesCommand({
            QueueUrl: queue.queueUrl,
            Attributes: {
              Policy: policy,
            },
          }),
        );
        queue.policy = policy;
      } else {
        await this.logger.log(
          MESSAGES.policyNotAttachedNotice.replace(
            "${QUEUE_NAME}",
            queue.queueName,
          ),
        );
      }
    }
  }

  /**
   * Subscribe every queue to the topic. In FIFO mode the user may pick tones
   * per queue; a non-empty selection becomes a `tone` filter policy on the
   * subscription. Subscription ARNs are kept for later cleanup.
   */
  async subscribeQueuesToTopic() {
    for (const [index, queue] of this.queues.entries()) {
      /**
       * @type {import('@aws-sdk/client-sns').SubscribeCommandInput}
       */
      const subscribeParams = {
        TopicArn: this.topicArn,
        Protocol: "sqs",
        Endpoint: queue.queueArn,
      };
      let tones = [];

      if (this.isFifo) {
        if (index === 0) {
          await this.logger.log(MESSAGES.fifoFilterNotice);
        }
        tones = await this.prompter.checkbox({
          message: MESSAGES.fifoFilterSelect.replace(
            "${QUEUE_NAME}",
            queue.queueName,
          ),
          choices: toneChoices,
        });

        if (tones.length) {
          subscribeParams.Attributes = {
            FilterPolicyScope: "MessageAttributes",
            FilterPolicy: JSON.stringify({
              tone: tones,
            }),
          };
        }
      }

      const { SubscriptionArn } = await this.snsClient.send(
        new SubscribeCommand(subscribeParams),
      );

      this.subscriptionArns.push(SubscriptionArn);

      await this.logger.log(
        MESSAGES.queueSubscribedNotice
          .replace("${QUEUE_NAME}", queue.queueName)
          .replace("${TOPIC_NAME}", this.topicName)
          .replace("${TONES}", tones.length ? tones.join(", ") : "none"),
      );
    }
  }

  /**
   * Prompt for a message (plus group ID, deduplication ID and tones in FIFO
   * mode) and publish it to the topic. Repeats for as long as the user asks
   * to publish another message.
   */
  async publishMessages() {
    let publishAnother;

    // A loop rather than self-recursion, so a long session does not grow the
    // chain of pending async calls.
    do {
      const message = await this.prompter.input({
        message: MESSAGES.publishMessagePrompt,
      });

      let groupId;
      let deduplicationId;
      let choices;

      if (this.isFifo) {
        await this.logger.log(MESSAGES.groupIdNotice);
        groupId = await this.prompter.input({
          message: MESSAGES.groupIdPrompt,
        });

        // An explicit deduplication ID is only requested when content-based
        // deduplication was not enabled.
        if (this.autoDedup === false) {
          await this.logger.log(MESSAGES.deduplicationIdNotice);
          deduplicationId = await this.prompter.input({
            message: MESSAGES.deduplicationIdPrompt,
          });
        }

        choices = await this.prompter.checkbox({
          message: MESSAGES.messageAttributesPrompt,
          choices: toneChoices,
        });
      }

      await this.snsClient.send(
        new PublishCommand({
          TopicArn: this.topicArn,
          Message: message,
          ...(groupId
            ? {
                MessageGroupId: groupId,
              }
            : {}),
          ...(deduplicationId
            ? {
                MessageDeduplicationId: deduplicationId,
              }
            : {}),
          // An empty selection is still a (truthy) array; only attach the
          // attribute when at least one tone was actually chosen.
          ...(choices?.length
            ? {
                MessageAttributes: {
                  tone: {
                    DataType: "String.Array",
                    StringValue: JSON.stringify(choices),
                  },
                },
              }
            : {}),
        }),
      );

      publishAnother = await this.prompter.confirm({
        message: MESSAGES.publishAnother,
      });
    } while (publishAnother);
  }

  /**
   * Poll each queue once, print and delete any messages received, then ask
   * whether to poll again. Repeats until the user declines.
   */
  async receiveAndDeleteMessages() {
    let deleteAndPoll;

    do {
      for (const queue of this.queues) {
        const { Messages } = await this.sqsClient.send(
          new ReceiveMessageCommand({
            QueueUrl: queue.queueUrl,
          }),
        );

        if (Messages) {
          await this.logger.log(
            MESSAGES.messagesReceivedNotice.replace(
              "${QUEUE_NAME}",
              queue.queueName,
            ),
          );
          console.log(Messages);

          await this.sqsClient.send(
            new DeleteMessageBatchCommand({
              QueueUrl: queue.queueUrl,
              Entries: Messages.map((message) => ({
                Id: message.MessageId,
                ReceiptHandle: message.ReceiptHandle,
              })),
            }),
          );
        } else {
          await this.logger.log(
            MESSAGES.noMessagesReceivedNotice.replace(
              "${QUEUE_NAME}",
              queue.queueName,
            ),
          );
        }
      }

      deleteAndPoll = await this.prompter.confirm({
        message: MESSAGES.deleteAndPollConfirmation,
      });
    } while (deleteAndPoll);
  }

  /**
   * Delete the subscriptions, queues and topic recorded on this instance.
   * Every deletion is attempted even if an earlier one fails, so a single
   * error does not leave the remaining resources behind.
   *
   * @throws The original error when exactly one deletion failed, or an
   *   AggregateError wrapping all of them when several failed.
   */
  async destroyResources() {
    const failures = [];
    const attempt = async (action) => {
      try {
        await action();
      } catch (err) {
        failures.push(err);
      }
    };

    for (const subscriptionArn of this.subscriptionArns) {
      await attempt(() =>
        this.snsClient.send(
          new UnsubscribeCommand({ SubscriptionArn: subscriptionArn }),
        ),
      );
    }

    for (const queue of this.queues) {
      await attempt(() =>
        this.sqsClient.send(
          new DeleteQueueCommand({ QueueUrl: queue.queueUrl }),
        ),
      );
    }

    if (this.topicArn) {
      await attempt(() =>
        this.snsClient.send(
          new DeleteTopicCommand({ TopicArn: this.topicArn }),
        ),
      );
    }

    if (failures.length === 1) {
      throw failures[0];
    }
    if (failures.length > 1) {
      throw new AggregateError(
        failures,
        "Failed to delete one or more resources.",
      );
    }
  }

  /**
   * Run the whole scenario in order. Errors from any step are printed with
   * console.error, and resource cleanup always runs afterwards.
   */
  async start() {
    console.clear();

    try {
      await this.#logSeparator(MESSAGES.headerWelcome);
      await this.welcome();
      await this.#logSeparator(MESSAGES.headerFifo);
      await this.confirmFifo();
      await this.#logSeparator(MESSAGES.headerCreateTopic);
      await this.createTopic();
      await this.#logSeparator(MESSAGES.headerCreateQueues);
      await this.createQueues();
      await this.#logSeparator(MESSAGES.headerAttachPolicy);
      await this.attachQueueIamPolicies();
      await this.#logSeparator(MESSAGES.headerSubscribeQueues);
      await this.subscribeQueuesToTopic();
      await this.#logSeparator(MESSAGES.headerPublishMessage);
      await this.publishMessages();
      await this.#logSeparator(MESSAGES.headerReceiveMessages);
      await this.receiveAndDeleteMessages();
    } catch (err) {
      console.error(err);
    } finally {
      await this.destroyResources();
    }
  }
}
For API details, see the following topics in the AWS SDK for JavaScript API Reference.