Rework node replacement.
authorBrandon Williams <brandonwilliams@apache.org>
Fri, 18 Oct 2013 01:59:36 +0000 (20:59 -0500)
committerBrandon Williams <brandonwilliams@apache.org>
Fri, 18 Oct 2013 01:59:36 +0000 (20:59 -0500)
Patch by brandonwilliams, reviewed by Tyle Hobbs for CASSANDRA-5916

NEWS.txt
src/java/org/apache/cassandra/config/DatabaseDescriptor.java
src/java/org/apache/cassandra/db/SystemTable.java
src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
src/java/org/apache/cassandra/gms/Gossiper.java
src/java/org/apache/cassandra/service/StorageService.java

index 000986f..a676aa1 100644 (file)
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -14,15 +14,15 @@ restore snapshots created with the previous major version using the
 using the provided 'sstableupgrade' tool.
 
 
-1.2.12
+1.2.11
 ======
 Features
 --------
     - Added a new consistenct level, LOCAL_ONE, that forces all CL.ONE operations to
       execute only in the local datacenter.
-
-1.2.11
-======
+    - New replace_address to supplant the (now removed) replace_token and
+      replace_node workflows to replace a dead node in place.  Works like the
+      old options, but takes the IP address of the node to be replaced.
 
 Upgrading
 ---------
index 633ea9a..65a20cc 100644 (file)
@@ -724,6 +724,21 @@ public class DatabaseDescriptor
         return conf.num_tokens;
     }
 
+    public static InetAddress getReplaceAddress()
+    {
+        try
+        {
+            if (System.getProperty("cassandra.replace_address", null) != null)
+                return InetAddress.getByName(System.getProperty("cassandra.replace_address", null));
+            else
+                return null;
+        }
+        catch (UnknownHostException e)
+        {
+            return null;
+        }
+    }
+
     public static Collection<String> getReplaceTokens()
     {
         return tokensFromString(System.getProperty("cassandra.replace_token", null));
@@ -742,7 +757,7 @@ public class DatabaseDescriptor
 
     public static boolean isReplacing()
     {
-        return 0 != getReplaceTokens().size() || getReplaceNode() != null;
+        return getReplaceAddress() != null;
     }
 
     public static String getClusterName()
index a87ab50..432a434 100644 (file)
@@ -654,8 +654,15 @@ public class SystemTable
         // ID not found, generate a new one, persist, and then return it.
         hostId = UUID.randomUUID();
         logger.warn("No host ID found, created {} (Note: This should happen exactly once per node).", hostId);
+        return setLocalHostId(hostId);
+    }
 
-        req = "INSERT INTO system.%s (key, host_id) VALUES ('%s', %s)";
+    /**
+     * Sets the local host ID explicitly.  Should only be called outside of SystemTable when replacing a node.
+     */
+    public static UUID setLocalHostId(UUID hostId)
+    {
+        String req = "INSERT INTO system.%s (key, host_id) VALUES ('%s', %s)";
         processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, hostId));
         return hostId;
     }
index 2a03ff2..b2af3a2 100644 (file)
@@ -39,7 +39,7 @@ public class GossipDigestAckVerbHandler implements IVerbHandler<GossipDigestAck>
         InetAddress from = message.from;
         if (logger.isTraceEnabled())
             logger.trace("Received a GossipDigestAckMessage from {}", from);
-        if (!Gossiper.instance.isEnabled())
+        if (!Gossiper.instance.isEnabled() && !Gossiper.instance.isInShadowRound())
         {
             if (logger.isTraceEnabled())
                 logger.trace("Ignoring GossipDigestAckMessage because gossip is disabled");
@@ -49,6 +49,7 @@ public class GossipDigestAckVerbHandler implements IVerbHandler<GossipDigestAck>
         GossipDigestAck gDigestAckMessage = message.payload;
         List<GossipDigest> gDigestList = gDigestAckMessage.getGossipDigestList();
         Map<InetAddress, EndpointState> epStateMap = gDigestAckMessage.getEndpointStateMap();
+        logger.trace("Received ack with {} digests and {} states", gDigestList.size(), epStateMap.size());
 
         if ( epStateMap.size() > 0 )
         {
@@ -58,6 +59,13 @@ public class GossipDigestAckVerbHandler implements IVerbHandler<GossipDigestAck>
         }
 
         Gossiper.instance.checkSeedContact(from);
+        if (Gossiper.instance.isInShadowRound())
+        {
+            if (logger.isDebugEnabled())
+                logger.debug("Finishing shadow round with {}", from);
+            Gossiper.instance.finishShadowRound();
+            return; // don't bother doing anything else, we have what we came for
+        }
 
         /* Get the state required to send to this gossipee - construct GossipDigestAck2Message */
         Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>();
index 61d21ed..476cb72 100644 (file)
@@ -76,7 +76,7 @@ public class GossipDigestSynVerbHandler implements IVerbHandler<GossipDigestSyn>
         List<GossipDigest> deltaGossipDigestList = new ArrayList<GossipDigest>();
         Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>();
         Gossiper.instance.examineGossiper(gDigestList, deltaGossipDigestList, deltaEpStateMap);
-
+        logger.trace("sending {} digests and {} deltas", deltaGossipDigestList.size(), deltaEpStateMap.size());
         MessageOut<GossipDigestAck> gDigestAckMessage = new MessageOut<GossipDigestAck>(MessagingService.Verb.GOSSIP_DIGEST_ACK,
                                                                                                       new GossipDigestAck(deltaGossipDigestList, deltaEpStateMap),
                                                                                                       GossipDigestAck.serializer);
index acf40f3..cbb7d80 100644 (file)
@@ -107,6 +107,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
     // have we ever in our lifetime reached a seed?
     private boolean seedContacted = false;
 
+    private boolean inShadowRound = false;
+
     private class GossipTask implements Runnable
     {
         public void run()
@@ -662,6 +664,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         return endpointStateMap.get(ep);
     }
 
+    // removes ALL endpoint states; should only be called after shadow gossip
+    public void resetEndpointStateMap()
+    {
+        endpointStateMap.clear();
+    }
+
     public Set<Entry<InetAddress, EndpointState>> getEndpointStates()
     {
         return endpointStateMap.entrySet();
@@ -874,7 +882,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         for (Entry<InetAddress, EndpointState> entry : epStateMap.entrySet())
         {
             InetAddress ep = entry.getKey();
-            if ( ep.equals(FBUtilities.getBroadcastAddress()))
+            if ( ep.equals(FBUtilities.getBroadcastAddress()) && !isInShadowRound())
                 continue;
             if (justRemovedEndpoints.containsKey(ep))
             {
@@ -989,6 +997,17 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
     */
     void examineGossiper(List<GossipDigest> gDigestList, List<GossipDigest> deltaGossipDigestList, Map<InetAddress, EndpointState> deltaEpStateMap)
     {
+        if (gDigestList.size() == 0)
+        {
+           /* we've been sent a *completely* empty syn, which should normally never happen since an endpoint will at least send a syn with itself.
+              If this is happening then the node is attempting shadow gossip, and we should reply with everything we know.
+            */
+            logger.debug("Shadow request received, adding all states");
+            for (Map.Entry<InetAddress, EndpointState> entry : endpointStateMap.entrySet())
+            {
+                gDigestList.add(new GossipDigest(entry.getKey(), 0, 0));
+            }
+        }
         for ( GossipDigest gDigest : gDigestList )
         {
             int remoteGeneration = gDigest.getGeneration();
@@ -1074,6 +1093,43 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
                                                               TimeUnit.MILLISECONDS);
     }
 
+    /**
+     *  Do a single 'shadow' round of gossip, where we do not modify any state
+     *  Only used when replacing a node, to get and assume its states
+     */
+    public void doShadowRound()
+    {
+        buildSeedsList();
+        // send a completely empty syn
+        List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
+        GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
+                DatabaseDescriptor.getPartitionerName(),
+                gDigests);
+        MessageOut<GossipDigestSyn> message = new MessageOut<GossipDigestSyn>(MessagingService.Verb.GOSSIP_DIGEST_SYN,
+                digestSynMessage,
+                GossipDigestSyn.serializer);
+        inShadowRound = true;
+        for (InetAddress seed : seeds)
+            MessagingService.instance().sendOneWay(message, seed);
+        int slept = 0;
+        try
+        {
+            while (true)
+            {
+                Thread.sleep(1000);
+                if (!inShadowRound)
+                    break;
+                slept += 1000;
+                if (slept > StorageService.RING_DELAY)
+                    throw new RuntimeException("Unable to gossip with any seeds");
+            }
+        }
+        catch (InterruptedException wtf)
+        {
+            throw new RuntimeException(wtf);
+        }
+    }
+
     private void buildSeedsList()
     {
         for (InetAddress seed : DatabaseDescriptor.getSeeds())
@@ -1154,6 +1210,17 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         return (scheduledGossipTask != null) && (!scheduledGossipTask.isCancelled());
     }
 
+    protected void finishShadowRound()
+    {
+        if (inShadowRound)
+            inShadowRound = false;
+    }
+
+    protected boolean isInShadowRound()
+    {
+        return inShadowRound;
+    }
+
     @VisibleForTesting
     public void initializeNodeUnsafe(InetAddress addr, UUID uuid, int generationNbr)
     {
index 50719fd..d2c69d0 100644 (file)
@@ -387,6 +387,36 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return initialized;
     }
 
+    public synchronized Collection<Token> prepareReplacementInfo() throws ConfigurationException
+    {
+        logger.info("Gathering node replacement information for {}", DatabaseDescriptor.getReplaceAddress());
+        MessagingService.instance().listen(FBUtilities.getLocalAddress());
+
+        // make magic happen
+        Gossiper.instance.doShadowRound();
+
+        Collection<Token> tokens = new ArrayList<Token>();
+        UUID hostId = null;
+        // now that we've gossiped at least once, we should be able to find the node we're replacing
+        if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress())== null)
+            throw new RuntimeException("Cannot replace_address " + DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip");
+        hostId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress());
+        try
+        {
+            if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS) == null)
+                throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
+            tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(getApplicationStateValue(DatabaseDescriptor.getReplaceAddress(), ApplicationState.TOKENS))));
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+        SystemTable.setLocalHostId(hostId); // use the replacee's host Id as our own so we receive hints, etc
+        MessagingService.instance().shutdown();
+        Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
+        return tokens;
+    }
+
     public synchronized void initClient() throws IOException, ConfigurationException
     {
         // We don't wait, because we're going to actually try to work on
@@ -561,22 +591,31 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     private void joinTokenRing(int delay) throws ConfigurationException
     {
-        logger.info("Starting up server gossip");
         joined = true;
 
-        // Seed the host ID-to-endpoint map with our own ID.
-        getTokenMetadata().updateHostId(SystemTable.getLocalHostId(), FBUtilities.getBroadcastAddress());
+        Collection<Token> tokens = null;
+        Map<ApplicationState, VersionedValue> appStates = new HashMap<ApplicationState, VersionedValue>();
 
+        if (DatabaseDescriptor.getReplaceTokens().size() > 0 || DatabaseDescriptor.getReplaceNode() != null)
+            throw new RuntimeException("Replace method removed; use cassandra.replace_address instead");
+        if (DatabaseDescriptor.isReplacing())
+        {
+            if (!DatabaseDescriptor.isAutoBootstrap())
+                throw new RuntimeException("Trying to replace_address with auto_bootstrap disabled will not work, check your configuration");
+            tokens = prepareReplacementInfo();
+            appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true));
+            appStates.put(ApplicationState.TOKENS, valueFactory.tokens(tokens));
+        }
         // have to start the gossip service before we can see any info on other nodes.  this is necessary
         // for bootstrap to get the load info it needs.
         // (we won't be part of the storage ring though until we add a counterId to our state, below.)
-        Map<ApplicationState, VersionedValue> appStates = new HashMap<ApplicationState, VersionedValue>();
+        // Seed the host ID-to-endpoint map with our own ID.
+        getTokenMetadata().updateHostId(SystemTable.getLocalHostId(), FBUtilities.getBroadcastAddress());
         appStates.put(ApplicationState.NET_VERSION, valueFactory.networkVersion());
         appStates.put(ApplicationState.HOST_ID, valueFactory.hostId(SystemTable.getLocalHostId()));
         appStates.put(ApplicationState.RPC_ADDRESS, valueFactory.rpcaddress(DatabaseDescriptor.getRpcAddress()));
-        if (DatabaseDescriptor.isReplacing())
-            appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true));
         appStates.put(ApplicationState.RELEASE_VERSION, valueFactory.releaseVersion());
+        logger.info("Starting up server gossip");
         Gossiper.instance.register(this);
         Gossiper.instance.register(migrationManager);
         Gossiper.instance.start(SystemTable.incrementAndGetGeneration(), appStates); // needed for node-ring gathering.
@@ -602,7 +641,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         // We attempted to replace this with a schema-presence check, but you need a meaningful sleep
         // to get schema info from gossip which defeats the purpose.  See CASSANDRA-4427 for the gory details.
         Set<InetAddress> current = new HashSet<InetAddress>();
-        Collection<Token> tokens;
         logger.debug("Bootstrap variables: {} {} {} {}",
                       new Object[]{ DatabaseDescriptor.isAutoBootstrap(),
                                     SystemTable.bootstrapInProgress(),
@@ -667,50 +705,36 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             }
             else
             {
-                if (DatabaseDescriptor.getReplaceTokens().size() != 0 && DatabaseDescriptor.getReplaceNode() != null)
-                    throw new UnsupportedOperationException("You cannot specify both replace_token and replace_node, choose one or the other");
-                try
-                {
-                    // Sleeping additionally to make sure that the server actually is not alive
-                    // and giving it more time to gossip if alive.
-                    Thread.sleep(LoadBroadcaster.BROADCAST_INTERVAL);
-                }
-                catch (InterruptedException e)
+                if (!DatabaseDescriptor.getReplaceAddress().equals(FBUtilities.getBroadcastAddress()))
                 {
-                    throw new AssertionError(e);
-                }
-                tokens = new ArrayList<Token>();
-                if (DatabaseDescriptor.getReplaceTokens().size() !=0)
-                {
-                    for (String token : DatabaseDescriptor.getReplaceTokens())
-                        tokens.add(StorageService.getPartitioner().getTokenFactory().fromString(token));
-                }
-                else
-                {
-                    assert DatabaseDescriptor.getReplaceNode() != null;
-                    InetAddress endpoint = tokenMetadata.getEndpointForHostId(DatabaseDescriptor.getReplaceNode());
-                    if (endpoint == null)
-                        throw new UnsupportedOperationException("Cannot replace host id " + DatabaseDescriptor.getReplaceNode() + " because it does not exist!");
-                    tokens = tokenMetadata.getTokens(endpoint);
-                }
-
-                // check for operator errors...
-                for (Token token : tokens)
-                {
-                    InetAddress existing = tokenMetadata.getEndpoint(token);
-                    if (existing != null)
+                    try
                     {
-                        if (Gossiper.instance.getEndpointStateForEndpoint(existing).getUpdateTimestamp() > (System.currentTimeMillis() - delay))
-                            throw new UnsupportedOperationException("Cannnot replace a token for a Live node... ");
-                        current.add(existing);
+                        // Sleep additionally to make sure that the server actually is not alive
+                        // and giving it more time to gossip if alive.
+                        Thread.sleep(LoadBroadcaster.BROADCAST_INTERVAL);
                     }
-                    else
+                    catch (InterruptedException e)
                     {
-                        throw new UnsupportedOperationException("Cannot replace token " + token + " which does not exist!");
+                        throw new AssertionError(e);
                     }
-                }
 
-                setMode(Mode.JOINING, "Replacing a node with token: " + tokens, true);
+                    // check for operator errors...
+                    for (Token token : tokens)
+                    {
+                        InetAddress existing = tokenMetadata.getEndpoint(token);
+                        if (existing != null)
+                        {
+                            if (Gossiper.instance.getEndpointStateForEndpoint(existing).getUpdateTimestamp() > (System.currentTimeMillis() - delay))
+                                throw new UnsupportedOperationException("Cannnot replace a live node... ");
+                            current.add(existing);
+                        }
+                        else
+                        {
+                            throw new UnsupportedOperationException("Cannot replace token " + token + " which does not exist!");
+                        }
+                    }
+                }
+                setMode(Mode.JOINING, "Replacing a node with token(s): " + tokens, true);
             }
 
             bootstrap(tokens);
@@ -1355,7 +1379,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
         // Order Matters, TM.updateHostID() should be called before TM.updateNormalToken(), (see CASSANDRA-4300).
         if (Gossiper.instance.usesHostId(endpoint))
-            tokenMetadata.updateHostId(Gossiper.instance.getHostId(endpoint), endpoint);
+        {
+            UUID hostId = Gossiper.instance.getHostId(endpoint);
+            if (DatabaseDescriptor.isReplacing() && Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()) != null && (hostId.equals(Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress()))))
+                logger.warn("Not updating token metadata for {} because I am replacing it", endpoint);
+            else
+                tokenMetadata.updateHostId(hostId, endpoint);
+        }
 
         Set<Token> tokensToUpdateInMetadata = new HashSet<Token>();
         Set<Token> tokensToUpdateInSystemTable = new HashSet<Token>();