Whenever a change is made to an order we’re watching, we want to notify people watching it. In our back-end solution we have one place where all data changes go: the stream-handler.
We will be making changes to our DynamoDB Stream handler to publish to a topic to indicate certain events have occurred. These new events are:
Let’s open up the stream-handler
Lambda function code and get to work!
First, let’s import the TopicClient
from the Momento SDK.
const { unmarshall } = require('@aws-sdk/util-dynamodb');
const { SecretsManagerClient, GetSecretValueCommand } = require('@aws-sdk/client-secrets-manager');
const { CacheClient, CredentialProvider, Configurations, CacheGet, TopicClient } = require('@gomomento/sdk');
const secrets = new SecretsManagerClient();
let cacheClient;
let topicClient;
Here we’ve added a single item to our existing import statement and also globally declared the topicClient
variable. We define the topic client globally to adhere to AWS best practices.
When defining clients outside of the main handler, you allow them to be reused across function invocations. This means that the next time the stream-handler
function is called (and it’s not a cold start) we don’t have to create the clients again, they are already initialized! This results in faster performance of our function and fewer SDK calls to AWS services - both of which lower the overall operating cost of the function.
Next we need to initialize the client similarly to the CacheClient.
const initializeMomento = async () => {
if (cacheClient && topicClient) {
return;
}
const secretResponse = await secrets.send(new GetSecretValueCommand({ SecretId: process.env.SECRET_ID }));
const secret = JSON.parse(secretResponse.SecretString);
cacheClient = new CacheClient({
configuration: Configurations.InRegion.Default.latest(),
credentialProvider: CredentialProvider.fromString({ authToken: secret.momento }),
defaultTtlSeconds: 60
});
topicClient = new TopicClient({
configuration: Configurations.InRegion.Default.latest(),
credentialProvider: CredentialProvider.fromString({ authToken: secret.momento })
});
};
We updated the short-circuit condition to check for existence of the topicClient
. If one or both of the clients do not exist, we go ahead and initialize both of them. You can see the TopicClient
constructor is similar to the CacheClient
, minus the defaultTtlSeconds argument. Since Momento Topics is a pub/sub messaging service, there is no time to live to consider.
We now need to make a distinction between a new item and an update to an existing one because we need to know which event (aka topic) to fire. To do this, we need to update the method signature of the handleNewOrUpdatedCacheItem
function.
const handleNewOrUpdatedCacheItem = async (record, oldRecord) => {
await updateOrderRecord(record);
if (!oldRecord) {
await topicClient.publish('pizza', 'new-order', JSON.stringify({ id: record.pk }));
} else {
const topicName = (record.status == oldRecord.status) ? `${record.pk}-updated` : `${record.pk}-status-updated`;
await topicClient.publish('pizza', topicName, JSON.stringify({ status: record.status }));
}
};
You see the function now accepts an oldRecord argument. This is the DynamoDB record prior to the update. If oldRecord is undefined, that means this record was just created.
If the record was just created, we use the topicClient
to publish to the new-order topic and pass along the id of the order. This takes care of our new-order event ✅
If this is an update to an existing record, we need to check what changed. If the status changed, we need to publish to the {orderId}-status-updated
topic. If it was anything else, publish to the {orderId}-updated
topic. We want the distinction because users of the Order app will be notified of a status change, but not if something in the order itself changes. However, users of the Admin app will be notified of all changes.
When a user deletes their order, we need to send a notification immediately so the pizzas don’t get made in error. Let’s update the handleDeletedCacheItem
function to publish to a topic.
const handleDeletedCacheItem = async (record) => {
await Promise.allSettled([
await cacheClient.delete('pizza', record.pk),
await cacheClient.delete('pizza', `ADMIN-${record.pk}`),
await deleteArrayCacheItem('allOrders', record.pk),
await deleteArrayCacheItem(record.creator, record.pk),
await topicClient.publish('pizza', 'order-canceled', JSON.stringify({ id: record.pk }))
]);
};
We’ve only made one change here, and that’s on line 7 where we add topicClient.publish
. Simple as that!
If you want to see the entire stream-handler
function with all the updates, see below.
const { unmarshall } = require('@aws-sdk/util-dynamodb');
const { SecretsManagerClient, GetSecretValueCommand } = require('@aws-sdk/client-secrets-manager');
const { CacheClient, CredentialProvider, Configurations, CacheGet, TopicClient } = require('@gomomento/sdk');
const secrets = new SecretsManagerClient();
let cacheClient;
let topicClient;
exports.handler = async (event) => {
try {
await initializeMomento();
const record = event.Records[0];
switch (record.eventName) {
case 'INSERT':
case 'MODIFY':
await handleNewOrUpdatedCacheItem(unmarshall(record.dynamodb.NewImage), record.dynamodb.OldImage ? unmarshall(record.dynamodb.OldImage) : undefined);
break;
case 'REMOVE':
await handleDeletedCacheItem(unmarshall(record.dynamodb.OldImage));
break;
}
} catch (err) {
console.error(err);
}
};
const handleNewOrUpdatedCacheItem = async (record, oldRecord) => {
await updateOrderRecord(record);
if (!oldRecord) {
await topicClient.publish('pizza', 'new-order', JSON.stringify({ id: record.pk }));
} else {
const topicName = (record.status == oldRecord.status) ? `${record.pk}-updated` : `${record.pk}-status-updated`;
await topicClient.publish('pizza', topicName, JSON.stringify({ status: record.status }));
}
};
const updateArrayCacheItem = async (key, item) => {
let cachedArray;
const arrayResponse = await cacheClient.get('pizza', key);
if (arrayResponse instanceof CacheGet.Hit) {
cachedArray = JSON.parse(arrayResponse.valueString());
const index = cachedArray.findIndex(i => i.id == item.id);
if (index < -1) {
cachedArray[index] = { ...cachedArray[index], ...item }
} else {
cachedArray.unshift(item);
}
} else {
cachedArray = [item];
}
await cacheClient.set('pizza', key, JSON.stringify(cachedArray));
};
const handleDeletedCacheItem = async (record) => {
await Promise.allSettled([
await cacheClient.delete('pizza', record.pk),
await cacheClient.delete('pizza', `ADMIN-${record.pk}`),
await deleteArrayCacheItem('allOrders', record.pk),
await deleteArrayCacheItem(record.creator, record.pk),
await topicClient.publish('pizza', 'order-canceled', JSON.stringify({ id: record.pk }))
]);
};
const deleteArrayCacheItem = async (key, itemId) => {
const arrayResponse = await cacheClient.get('pizza', key);
if (arrayResponse instanceof CacheGet.Hit) {
const cachedArray = JSON.parse(arrayResponse.valueString());
const index = cachedArray.findIndex(i => i.id == itemId);
if (index > -1) {
cachedArray.splice(index, 1);
}
await cacheClient.set('pizza', key, JSON.stringify(cachedArray));
}
};
const initializeMomento = async () => {
if (cacheClient && topicClient) {
return;
}
const secretResponse = await secrets.send(new GetSecretValueCommand({ SecretId: process.env.SECRET_ID }));
const secret = JSON.parse(secretResponse.SecretString);
cacheClient = new CacheClient({
configuration: Configurations.InRegion.Default.latest(),
credentialProvider: CredentialProvider.fromString({ authToken: secret.momento }),
defaultTtlSeconds: 60
});
topicClient = new TopicClient({
configuration: Configurations.InRegion.Default.latest(),
credentialProvider: CredentialProvider.fromString({ authToken: secret.momento })
});
};
async function updateOrderRecord(record) {
let item = {
id: record.pk,
createdAt: record.createdAt,
status: record.status,
numItems: record.numItems,
items: record.items,
...record.lastUpdated && { lastUpdated: record.lastUpdated }
};
await Promise.allSettled([
await cacheClient.set('pizza', item.id, JSON.stringify(item)),
await cacheClient.set('pizza', `ADMIN-${item.id}`, JSON.stringify({ ...item, creator: record.creator }))
]);
delete item.items;
await Promise.allSettled([
await updateArrayCacheItem('all-orders', item),
await updateArrayCacheItem(record.creator, item)
]);
};
Next, we’re going to be covering the token vending machine (TVM) 👉