DynamoDB - Activité de table

Les flux DynamoDB vous permettent de suivre et de répondre aux modifications des éléments de table. Utilisez cette fonctionnalité pour créer une application qui répond aux changements en mettant à jour les informations entre les sources. Synchronisez les données de milliers d'utilisateurs d'un grand système multi-utilisateurs. Utilisez-le pour envoyer des notifications aux utilisateurs sur les mises à jour. Ses applications s'avèrent diverses et substantielles. Les flux DynamoDB sont le principal outil utilisé pour réaliser cette fonctionnalité.

Les flux capturent des séquences ordonnées dans le temps contenant des modifications d'éléments dans une table. Ils conservent ces données pendant un maximum de 24 heures. Les applications les utilisent pour afficher les éléments originaux et modifiés, presque en temps réel.

Les flux activés sur une table capturent toutes les modifications. Sur toute opération CRUD, DynamoDB crée un enregistrement de flux avec les attributs de clé primaire des éléments modifiés. Vous pouvez configurer des flux pour des informations supplémentaires telles que les images avant et après.

Les Streams comportent deux garanties -

  • Chaque enregistrement apparaît une fois dans le flux et

  • Chaque modification d'élément entraîne les enregistrements de flux du même ordre que celui des modifications.

Tous les flux sont traités en temps réel pour vous permettre de les utiliser pour les fonctionnalités associées dans les applications.

Gérer les flux

Lors de la création de table, vous pouvez activer un flux. Les tables existantes permettent la désactivation du flux ou la modification des paramètres. Les flux offrent la fonctionnalité de fonctionnement asynchrone, ce qui signifie aucun impact sur les performances de la table.

Utilisez la console AWS Management pour une gestion simple des flux. Tout d'abord, accédez à la console et choisissezTables. Dans l'onglet Présentation, choisissezManage Stream. Dans la fenêtre, sélectionnez les informations ajoutées à un flux sur les modifications de données de table. Après avoir entré tous les paramètres, sélectionnezEnable.

Si vous souhaitez désactiver des flux existants, sélectionnez Manage Stream, et alors Disable.

Vous pouvez également utiliser les API CreateTable et UpdateTable pour activer ou modifier un flux. Utilisez le paramètre StreamSpecification pour configurer le flux. StreamEnabled spécifie l'état, ce qui signifie vrai pour activé et faux pour désactivé.

StreamViewType spécifie les informations ajoutées au flux: KEYS_ONLY, NEW_IMAGE, OLD_IMAGE et NEW_AND_OLD_IMAGES.

Lecture en continu

Lisez et traitez les flux en vous connectant à un point de terminaison et en effectuant des requêtes API. Chaque flux se compose d'enregistrements de flux, et chaque enregistrement existe en tant que modification unique qui possède le flux. Les enregistrements de flux incluent un numéro de séquence révélant l'ordre de publication. Les enregistrements appartiennent à des groupes également appelés fragments. Les fragments fonctionnent comme des conteneurs pour plusieurs enregistrements et contiennent également les informations nécessaires pour accéder aux enregistrements et les parcourir. Après 24 heures, les enregistrements sont automatiquement supprimés.

Ces fragments sont générés et supprimés selon les besoins et ne durent pas longtemps. Ils se divisent également automatiquement en plusieurs nouveaux fragments, généralement en réponse à des pics d'activité d'écriture. Lors de la désactivation du flux, les fragments ouverts se ferment. La relation hiérarchique entre les fragments signifie que les applications doivent donner la priorité aux fragments parents pour un ordre de traitement correct. Vous pouvez utiliser l'adaptateur Kinesis pour ce faire automatiquement.

Note - Les opérations sans changement n'écrivent pas les enregistrements de flux.

L'accès et le traitement des enregistrements nécessitent l'exécution des tâches suivantes -

  • Déterminez l'ARN du flux cible.
  • Déterminez le (s) fragment (s) du flux contenant les enregistrements cibles.
  • Accédez au (x) fragment (s) pour récupérer les enregistrements souhaités.

Note- Il doit y avoir au maximum 2 processus lisant un fragment à la fois. S'il dépasse 2 processus, il peut ralentir la source.

Les actions d'API de flux disponibles incluent

  • ListStreams
  • DescribeStream
  • GetShardIterator
  • GetRecords

Vous pouvez consulter l'exemple suivant de lecture de flux -

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient;

import com.amazonaws.services.dynamodbv2.model.AttributeAction;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate;

import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeStreamResult;
import com.amazonaws.services.dynamodbv2.model.DescribeTableResult;

import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest;
import com.amazonaws.services.dynamodbv2.model.GetRecordsResult;
import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest;
import com.amazonaws.services.dynamodbv2.model.GetShardIteratorResult;

import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.KeyType;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.Record;

import com.amazonaws.services.dynamodbv2.model.Shard;
import com.amazonaws.services.dynamodbv2.model.ShardIteratorType;
import com.amazonaws.services.dynamodbv2.model.StreamSpecification;
import com.amazonaws.services.dynamodbv2.model.StreamViewType;
import com.amazonaws.services.dynamodbv2.util.Tables;

public class StreamsExample {
   private static AmazonDynamoDBClient dynamoDBClient =  
      new AmazonDynamoDBClient(new ProfileCredentialsProvider());  
   private static AmazonDynamoDBStreamsClient streamsClient =  
      new AmazonDynamoDBStreamsClient(new ProfileCredentialsProvider());  

   public static void main(String args[]) {  
      dynamoDBClient.setEndpoint("InsertDbEndpointHere");   
      streamsClient.setEndpoint("InsertStreamEndpointHere");    
      
      // table creation 
      String tableName = "MyTestingTable";  
      ArrayList<AttributeDefinition> attributeDefinitions =  
         new ArrayList<AttributeDefinition>();  
      
      attributeDefinitions.add(new AttributeDefinition()
         .withAttributeName("ID") 
         .withAttributeType("N"));
         
      ArrayList<KeySchemaElement> keySchema = new 
         ArrayList<KeySchemaElement>(); 
      
      keySchema.add(new KeySchemaElement() 
         .withAttributeName("ID") 
         .withKeyType(KeyType.HASH));                       //Partition key

      StreamSpecification streamSpecification = new StreamSpecification(); 
      streamSpecification.setStreamEnabled(true); 
      streamSpecification.setStreamViewType(StreamViewType.NEW_AND_OLD_IMAGES);  
      CreateTableRequest createTableRequest = new CreateTableRequest() 
         .withTableName(tableName) 
         .withKeySchema(keySchema) 
         .withAttributeDefinitions(attributeDefinitions) 
         .withProvisionedThroughput(new ProvisionedThroughput() 
         .withReadCapacityUnits(1L) 
         .withWriteCapacityUnits(1L))
         .withStreamSpecification(streamSpecification);  
      
      System.out.println("Executing CreateTable for " + tableName); 
      dynamoDBClient.createTable(createTableRequest);  
      System.out.println("Creating " + tableName); 
      
      try { 
         Tables.awaitTableToBecomeActive(dynamoDBClient, tableName); 
      } catch (InterruptedException e) { 
         e.printStackTrace(); 
      } 
         
      // Get the table's stream settings 
      DescribeTableResult describeTableResult =
         dynamoDBClient.describeTable(tableName);  
      
      String myStreamArn = describeTableResult.getTable().getLatestStreamArn(); 
      StreamSpecification myStreamSpec =  
         describeTableResult.getTable().getStreamSpecification();  
      
      System.out.println("Current stream ARN for " + tableName + ": "+ myStreamArn);
      System.out.println("Stream enabled: "+ myStreamSpec.getStreamEnabled()); 
      System.out.println("Update view type: "+ myStreamSpec.getStreamViewType());  
      
      // Add an item 
      int numChanges = 0; 
      System.out.println("Making some changes to table data"); 
      Map<String, AttributeValue> item = new HashMap<String, AttributeValue>(); 
      item.put("ID", new AttributeValue().withN("222")); 
      item.put("Alert", new AttributeValue().withS("item!")); 
      dynamoDBClient.putItem(tableName, item); 
      numChanges++;  
      
      // Update the item         
      Map<String, AttributeValue> key = new HashMap<String, AttributeValue>(); 
      key.put("ID", new AttributeValue().withN("222")); 
      Map<String, AttributeValueUpdate> attributeUpdates =  
      new HashMap<String, AttributeValueUpdate>(); 
      
      attributeUpdates.put("Alert", new AttributeValueUpdate() 
         .withAction(AttributeAction.PUT) 
         .withValue(new AttributeValue().withS("modified item"))); 
      
      dynamoDBClient.updateItem(tableName, key, attributeUpdates); 
      numChanges++;   
      
      // Delete the item         
      dynamoDBClient.deleteItem(tableName, key);  
      numChanges++;
      
      // Get stream shards         
      DescribeStreamResult describeStreamResult =  
      streamsClient.describeStream(new DescribeStreamRequest() 
         .withStreamArn(myStreamArn)); 
      String streamArn =  
         describeStreamResult.getStreamDescription().getStreamArn(); 
      List<Shard> shards =  
         describeStreamResult.getStreamDescription().getShards();  
      
      // Process shards 
      for (Shard shard : shards) { 
         String shardId = shard.getShardId(); 
         System.out.println("Processing " + shardId + " in "+ streamArn);  
         
         // Get shard iterator 
         GetShardIteratorRequest getShardIteratorRequest = new 
            GetShardIteratorRequest() 
            .withStreamArn(myStreamArn) 
            .withShardId(shardId) 
            .withShardIteratorType(ShardIteratorType.TRIM_HORIZON); 
         
         GetShardIteratorResult getShardIteratorResult =  
            streamsClient.getShardIterator(getShardIteratorRequest); 
         String nextItr = getShardIteratorResult.getShardIterator();  
         
         while (nextItr != null && numChanges > 0) { 
            // Read data records with iterator                 
            GetRecordsResult getRecordsResult =  
               streamsClient.getRecords(new GetRecordsRequest(). 
               withShardIterator(nextItr));
               
            List<Record> records = getRecordsResult.getRecords(); 
            System.out.println("Pulling records...");  
               
            for (Record record : records) { 
               System.out.println(record); 
               numChanges--;
            } 
            nextItr = getRecordsResult.getNextShardIterator(); 
         } 
      } 
   } 
}