From 3d18693acfaa2961b0fbf7ef47a311ea94e8b680 Mon Sep 17 00:00:00 2001 From: Steve Singer Date: Mon, 25 Apr 2011 15:28:19 -0400 Subject: [PATCH] Automatic Wait For. This is a merge of the auto_wait_for feature branch. This change modifies slonik so that it will (in normal circumstances) automatically wait for events to be confirmed before proceeding in cases where it detects that this is required. See the documentation changes included in this patch for more details. --- clustertest/disorder/tests/BasicTest.js | 10 +- clustertest/disorder/tests/ExecuteScript.js | 7 +- clustertest/disorder/tests/FailNodeTest.js | 70 +-- clustertest/disorder/tests/Failover.js | 93 ++-- clustertest/disorder/tests/HeavyLoadTest.js | 2 + clustertest/disorder/tests/LogShipping.js | 2 +- clustertest/disorder/tests/LongTransaction.js | 7 +- clustertest/disorder/tests/MergeSet.js | 75 ++ clustertest/disorder/tests/MultipleOrigins.js | 4 +- clustertest/disorder/tests/OmitCopy.js | 1 - clustertest/disorder/tests/RecreateSet.js | 3 +- clustertest/disorder/tests/RenameTests.js | 5 +- clustertest/disorder/tests/Unsubscribe.js | 4 +- clustertest/disorder/tests/disorder_tests.js | 6 +- doc/adminguide/addthings.sgml | 15 +- doc/adminguide/events.sgml | 56 ++ doc/adminguide/slonik.sgml | 19 +- doc/adminguide/slonik_ref.sgml | 229 ++++++- src/backend/slony1_funcs.c | 43 ++- src/backend/slony1_funcs.sql | 60 ++- src/slon/remote_worker.c | 1 - src/slonik/slonik.c | 921 ++++++++++++++++++++++--- 22 files changed, 1394 insertions(+), 239 deletions(-) create mode 100644 clustertest/disorder/tests/MergeSet.js diff --git a/clustertest/disorder/tests/BasicTest.js b/clustertest/disorder/tests/BasicTest.js index b0e17f2..34f0029 100644 --- a/clustertest/disorder/tests/BasicTest.js +++ b/clustertest/disorder/tests/BasicTest.js @@ -480,11 +480,11 @@ BasicTest.prototype.subscribeSetBackground = function(setid, origin_node, slonikScript += ' subscribe set(id=' + setid + ', provider=' + provider_node + ', 
receiver=' + subscriber_node + ', forward=yes);\n'; - slonikScript += this.generateSlonikWait(origin_node); - slonikScript += ' echo \'syncing\';\n'; - slonikScript += ' sync(id=' + provider_node + ');\n'; - slonikScript += ' echo \'waiting for event\';\n'; - slonikScript += this.generateSlonikWait(provider_node); + //slonikScript += this.generateSlonikWait(origin_node); + //slonikScript += ' echo \'syncing\';\n'; + //slonikScript += ' sync(id=' + provider_node + ');\n'; + //slonikScript += ' echo \'waiting for event\';\n'; + //slonikScript += this.generateSlonikWait(provider_node); slonikScript += ' echo \'finished subscribing ' + subscriber_node +'\' ;\n'; var slonik = this.coordinator.createSlonik('subscribe ', preamble, diff --git a/clustertest/disorder/tests/ExecuteScript.js b/clustertest/disorder/tests/ExecuteScript.js index 61bcc67..32f8879 100644 --- a/clustertest/disorder/tests/ExecuteScript.js +++ b/clustertest/disorder/tests/ExecuteScript.js @@ -43,6 +43,7 @@ ExecuteScript.prototype.runTest = function() { */ this.subscribeSet(1, 1,1, [ 2, 3 ]); this.subscribeSet(1, 1,3, [ 4, 5 ]); + this.slonikSync(1,1); this.testAddDropColumn(1, 1, false); @@ -64,6 +65,8 @@ ExecuteScript.prototype.runTest = function() { this.subscribeSet(2, 2,3, [ 4, 5 ]); this.coordinator.log("ExecuteScript.prototype.runTest - move set to node 1"); + + /** * Move the set to node 1. We want to do this for the next test. */ @@ -173,7 +176,7 @@ ExecuteScript.prototype.testAddDropColumn = function(setid, eventNode, */ var load = this.generateLoad(); - this.coordinator.log("ExecuteScript.prototype.testAddDropColumn - add column to orders"); + this.coordinator.log("ExecuteScript.prototype.testAddDropColumn - add column to orders - expecting failure:" + expectFailure); /** * Now add a column to orders. We will do this via EXECUTE SCRIPT. 
*/ @@ -406,7 +409,7 @@ ExecuteScript.prototype.testDDLFailure = function() { var lag = this.measureLag(1,2); this.coordinator.log('we do not expect lag, we measure it as ' + lag); this.testResults.assertCheck('node is not lagged', lag < 10,true); - + this.slonikSync(1,1); this.dropTestTable(1,1,false); statement.close(); diff --git a/clustertest/disorder/tests/FailNodeTest.js b/clustertest/disorder/tests/FailNodeTest.js index 6bd0233..c3936c0 100644 --- a/clustertest/disorder/tests/FailNodeTest.js +++ b/clustertest/disorder/tests/FailNodeTest.js @@ -88,10 +88,15 @@ FailNodeTest.prototype.runTest = function() { * DROP 3 node. * We expect it to fail since node 3 is still a provider * for nodes 4,5. + * + * perform the slonikSync to make sure node 4,5 is caught up. + * otherwise slonik will wait before actually submitting the + * dropNode but since the slon will be stopped by this.failNode() + * the event might never get propogated. */ + this.slonikSync(1,1); this.coordinator.log('failing node 3'); - this.failNode(3,true); - + /** * Readd all paths, some might have been deleted and not re-added * when we deleted nodes above. @@ -99,6 +104,9 @@ FailNodeTest.prototype.runTest = function() { * We also have to restart the slons because of bug # 120 */ this.addCompletePaths(); + + this.failNode(3,true); + /** * Sleep a bit. * Do we need to do this for the paths to propogate???? @@ -114,39 +122,20 @@ FailNodeTest.prototype.runTest = function() { } this.coordinator.log("FailNodeTest.prototype.runTest - sleeping 60x1000"); java.lang.Thread.sleep(60*1000); - /** - * Replace the generateSlonikWait function with a version that - * does individual wait for event(..) statements instead of - * a confirmed=all since we do not want to be waiting on node 3 - * since we just destroyed it. 
- */ - var originalGenerateWait = this.generateSlonikWait; - this.generateSlonikWait=function(event_node) { - var script=''; - for(var idx=1; idx <= this.getNodeCount(); idx ++) { - if(idx==3||idx==event_node) { - continue; - } - script += "echo 'waiting on confirm from " + idx + "';\n"; - script+='wait for event(origin=' + event_node + ',confirmed='+idx +',wait on=' + event_node+');\n'; - } - return script; - } + /** * SUBSCRIBE nodes 4,5 via node 1 directly. */ this.coordinator.log("FailNodeTest.prototype.runTest - subscribe 4,5 via 1"); - this.subscribeSet(1,1,1,[4,5]); - + this.subscribeSet(1,1,1,[4,5]); /** * Now we should be able to drop node 3. */ - this.coordinator.log("FailNodeTest.prototype.runTest - fail node 3"); + this.coordinator.log("FailNodeTest.prototype.runTest - fail node 3"); this.failNode(3,false); - this.generateSlonikWait=originalGenerateWait; load.stop(); @@ -175,7 +164,7 @@ FailNodeTest.prototype.runTest = function() { java.lang.Thread.sleep(10*1000); load.stop(); this.coordinator.join(load); - this.addCompletePaths(); + //this.addCompletePaths(); this.slonikSync(1,1); this.slonikSync(1,2); this.slonikSync(1,4); @@ -201,25 +190,7 @@ FailNodeTest.prototype.runTest = function() { this.coordinator.log("FailNodeTest.prototype.runTest - drop DB2"); //Now DROP the database. This lets us simulate a hard failure. this.dropDb(['db2']); - /** - * Replace the generateSlonikWait function with a version that - * does individual wait for event(..) statements instead of - * a confirmed=all since we do not want to be waiting on node 3 - * since we just destroyed it. 
- */ - var originalGenerateWait = this.generateSlonikWait; - this.generateSlonikWait=function(event_node) { - var script=''; - for(var idx=1; idx <= this.getNodeCount(); idx ++) { - if(idx==2|| idx==3|| idx==event_node) { - continue; - } - script += "echo 'waiting on confirm from " + idx + "';\n"; - script+='wait for event(origin=' + event_node + ',confirmed='+idx +',wait on=' + event_node+');\n'; - } - return script; - } - + this.coordinator.log("FailNodeTest.prototype.runTest - reshape cluster"); //Now reshape the cluster. this.subscribeSet(1,1,1,[4,5]); @@ -255,16 +226,17 @@ FailNodeTest.prototype.runTest = function() { */ FailNodeTest.prototype.failNode=function(nodeId, expectFailure) { this.coordinator.log("FailNodeTest.prototype.FailNodeTest - begin"); - this.slonArray[nodeId-1].stop(); - this.coordinator.join(this.slonArray[nodeId-1]); + var slonikPreamble = this.getSlonikPreamble(); var slonikScript = 'echo \'FailNodeTest.prototype.failNode\';\n'; - slonikScript += 'DROP NODE(id=' + nodeId + ',event node=1);\n'; + slonikScript += 'DROP NODE(id=' + nodeId + ',event node=1);\n' + + 'uninstall node(id=' + nodeId + ');\n'; + for(var idx=2; idx <= this.getNodeCount(); idx++) { if(idx == nodeId) { continue; } - slonikScript += 'wait for event(origin=1,confirmed=' + idx + ', wait on=1 );\n'; + // slonikScript += 'wait for event(origin=1,confirmed=' + idx + ', wait on=1 );\n'; } var slonik=this.coordinator.createSlonik('drop node',slonikPreamble,slonikScript); @@ -276,6 +248,8 @@ FailNodeTest.prototype.failNode=function(nodeId, expectFailure) { else { this.testResults.assertCheck('drop node okay',slonik.getReturnCode(),0); } + this.slonArray[nodeId-1].stop(); + this.coordinator.join(this.slonArray[nodeId-1]); this.coordinator.log("FailNodeTest.prototype.FailNodeTest - complete"); } diff --git a/clustertest/disorder/tests/Failover.js b/clustertest/disorder/tests/Failover.js index d3e8dab..d3d0fce 100644 --- a/clustertest/disorder/tests/Failover.js +++ 
b/clustertest/disorder/tests/Failover.js @@ -42,7 +42,7 @@ Failover.prototype.runTest = function() { */ this.subscribeSet(1,1, 1, [ 2, 3 ]); this.subscribeSet(1,1, 3, [ 4, 5 ]); - + this.slonikSync(1,1); var load = this.generateLoad(); @@ -81,11 +81,10 @@ Failover.prototype.runTest = function() { * * Re resubscribe node 2 to receive from node 3 before the * FAILOVER this should test the more simple case. - */ - this.coordinator.log('PROGRESS:failing 1=>3 where 2 is first moved to get from 3'); + */ this.coordinator.log('PROGRESS:failing 1=>3 where 2 is first moved to get from 3'); this.addCompletePaths(); this.subscribeSet(1,1,3,[2]); - + this.slonikSync(1,1); this.failNode(1,3,true); java.lang.Thread.sleep(10*1000); load.stop(); @@ -93,7 +92,6 @@ Failover.prototype.runTest = function() { this.dropNode(1,3); this.reAddNode(1,3,3); - this.addCompletePaths(); this.moveSet(1,3,1); @@ -144,57 +142,63 @@ Failover.prototype.runTest = function() { //this.generateSlonikWait=oldWait; load.stop(); this.coordinator.join(load); - this.dropNode(1,3); //??this.coordinator.join(subscribeSlonik[0]); this.reAddNode(1,3,3); - /** - * Now Shutdown the slon for node 4. - */ - this.coordinator.log('PROGRESS:Shutting down node 4'); - this.slonArray[3].stop(); - this.coordinator.join(this.slonArray[3]); - /** - * How does the failure behave when the slon for node 4 is down? - * - * Well for 1 the 'wait for event' to node 4 won't recover. - * - */ - this.failNode(1,3,false); - - this.slonArray[3] = this.coordinator.createSlonLauncher('db4'); - this.slonArray[3].run(); - java.lang.Thread.sleep(10*1000); - this.dropNode(1,3); - this.reAddNode(1,3,3); + this.slonikSync(1,1); this.compareDb('db1', 'db2'); this.compareDb('db1', 'db3'); this.compareDb('db1', 'db4'); this.addCompletePaths(); this.moveSet(1,3,1) - load = this.generateLoad(); + /** * Now shutdown the slon for node 3, see how a failover to node 3 behaves. 
*/ this.coordinator.log('PROGRESS:shutting down node 3 for a failover test'); this.slonArray[2].stop(); this.coordinator.join(this.slonArray[2]); - load.stop(); - this.coordinator.join(load); - this.failNode(1,3,false); - this.coordinator.log('PROGRESS:starting slon 3 back up'); - this.slonArray[2] = this.coordinator.createSlonLauncher('db3'); - this.slonArray[2].run(); + /** + * create a timer event. + * in 60 seconds we will start up the slon again. + * the failover should not complete with the slon shutdown + * (at least not the 2.1 version of failover). + */ + + + this.coordinator.log('PROGRESS:load has stopped'); + var thisRef=this; /** * The failover needs to propogate before the DROP NODE or things can fail. */ - java.lang.Thread.sleep(10*1000); + var onTimeout = { + onEvent : function(object, event) { + thisRef.coordinator.log('PROGRESS:starting slon 3 back up'); + thisRef.slonArray[2] = thisRef.coordinator.createSlonLauncher('db3'); + thisRef.slonArray[2].run(); + + + } + }; + var timeoutObserver = new Packages.info.slony.clustertest.testcoordinator.script.ExecutionObserver(onTimeout); + var timer = this.coordinator.addTimerTask('restart slon', 120, + timeoutObserver); + this.failNode(1,3,true); + this.coordinator.removeObserver(timer, + Packages.info.slony.clustertest.testcoordinator.Coordinator.EVENT_TIMER, + timeoutObserver); + if(this.slonArray[2].isFinished()) { + thisRef.slonArray[2] = thisRef.coordinator.createSlonLauncher('db3'); + thisRef.slonArray[2].run(); + } + this.dropNode(1,3); this.reAddNode(1,3,3); + this.slonikSync(1,1); this.compareDb('db1', 'db2'); this.compareDb('db1', 'db3'); this.compareDb('db1', 'db4'); @@ -223,6 +227,8 @@ Failover.prototype.runTest = function() { slonik.run(); this.coordinator.join(slonik); this.testResults.assertCheck('drop path from 1 to 4',slonik.getReturnCode(),0); + + this.slonikSync(1,1); this.failNode(1,4,true); this.compareDb('db2','db4'); @@ -233,13 +239,13 @@ Failover.prototype.runTest = function() { 
this.reAddNode(1,4,4); - + this.slonikSync(1,1); for ( var idx = 1; idx <= this.getNodeCount(); idx++) { this.slonArray[idx - 1].stop(); this.coordinator.join(this.slonArray[idx - 1]); } - - this.compareDb('db1', 'db2'); + + this.compareDb('db1','db2'); this.compareDb('db1', 'db3'); this.compareDb('db1', 'db4'); this.compareDb('db4','db3'); @@ -249,9 +255,12 @@ Failover.prototype.runTest = function() { } Failover.prototype.failNode=function(node_id,backup_id, expect_success) { - + this.slonArray[node_id-1].stop(); - this.coordinator.join(this.slonArray[node_id-1]); + if(!this.slonArray[node_id-1].isFinished()) { + this.coordinator.join(this.slonArray[node_id-1]); + } + var slonikPreamble = this.getSlonikPreamble(); var slonikScript = 'echo \'Failover.prototype.failNode\';\n'; @@ -266,8 +275,8 @@ Failover.prototype.failNode=function(node_id,backup_id, expect_success) { if(waitIdx==idx || waitIdx==node_id) { continue; } - slonikScript += "echo 'waiting on " + idx + " " + waitIdx +"';\n"; - slonikScript += 'wait for event(origin=' + idx + ',wait on=' + idx + ',timeout=60,confirmed=' + waitIdx + ");\n"; + //autowaitfor slonikScript += "echo 'waiting on " + idx + " " + waitIdx +"';\n"; + //autowaitfor slonikScript += 'wait for event(origin=' + idx + ',wait on=' + idx + ',timeout=60,confirmed=' + waitIdx + ");\n"; } } //+ 'sync(id=' + backup_id + ');\n' @@ -326,7 +335,7 @@ Failover.prototype.addCompletePaths = function() { } Failover.prototype.dropNode=function(node_id,event_node) { - this.coordinator.log("Failover.prototype.dropNode - begin"); + this.coordinator.log("Failover.prototype.dropNode - begin"); var slonikPreamble = this.getSlonikPreamble(); var slonikScript = 'echo \'Failover.prototype.dropNode\';\n'; slonikScript += 'DROP NODE(id=' + node_id + ',event node=' + event_node +');\n'; @@ -340,5 +349,5 @@ Failover.prototype.dropNode=function(node_id,event_node) { slonik.run(); this.coordinator.join(slonik); this.testResults.assertCheck('slonik drop node status 
okay',slonik.getReturnCode(),0); - this.coordinator.log("Failover.prototype.dropNode - complete"); + this.coordinator.log("Failover.prototype.dropNode - complete"); } diff --git a/clustertest/disorder/tests/HeavyLoadTest.js b/clustertest/disorder/tests/HeavyLoadTest.js index b8ddd4d..bc6ee63 100644 --- a/clustertest/disorder/tests/HeavyLoadTest.js +++ b/clustertest/disorder/tests/HeavyLoadTest.js @@ -72,6 +72,8 @@ HeavyLoadTest.prototype.runTest = function() { var dumpFile = java.io.File.createTempFile('slon_HeavyLoadTest','.sql'); dumpFile.deleteOnExit(); + //wait until db4 is subscribed before creating the dump + this.slonikSync(1,1); var dumpProcess = this.coordinator.createLogShippingDump('db4',dumpFile); dumpProcess.run(); this.coordinator.join(dumpProcess); diff --git a/clustertest/disorder/tests/LogShipping.js b/clustertest/disorder/tests/LogShipping.js index 9fa6b8d..a08f3fc 100644 --- a/clustertest/disorder/tests/LogShipping.js +++ b/clustertest/disorder/tests/LogShipping.js @@ -60,7 +60,7 @@ LogShipping.prototype.runTest = function() { this.subscribeSet(1,1,1,[3]); java.lang.Thread.sleep(10*1000); this.subscribeSet(1,1,3,[4]); - + this.slonikSync(1,1); this.coordinator.log("LogShipping.prototype.runTest - generate load"); //Generate some load. var populate=this.generateLoad(); diff --git a/clustertest/disorder/tests/LongTransaction.js b/clustertest/disorder/tests/LongTransaction.js index fb51939..faeb432 100644 --- a/clustertest/disorder/tests/LongTransaction.js +++ b/clustertest/disorder/tests/LongTransaction.js @@ -44,6 +44,10 @@ LongTransaction.prototype.runTest = function() { slonik.run(); java.lang.Thread.sleep(10*1000); this.testResults.assertCheck('transaction is blocking add table command',slonik.isFinished(),false); + + + + txnConnection.rollback(); txnConnection.close(); this.coordinator.join(slonik); @@ -68,6 +72,7 @@ LongTransaction.prototype.runTest = function() { } //A transaction should not block the subscription. 
//make sure this is the case. + txnConnection = this.startTransaction(); this.coordinator.log("LongTransaction.prototype.runTest - sleep 3x60x1000"); java.lang.Thread.sleep(3*60*1000); for(var idx=0; idx < subs.length; idx++) { @@ -109,7 +114,7 @@ LongTransaction.prototype.runTest = function() { LongTransaction.prototype.startTransaction=function() { var dbCon = this.coordinator.createJdbcConnection('db1'); var stat = dbCon.createStatement(); - stat.execute('BEGIN;'); + dbCon.setAutoCommit(false); var rs= stat.executeQuery('SELECT COUNT(*) FROM disorder.do_customer;'); rs.close(); stat.close(); diff --git a/clustertest/disorder/tests/MergeSet.js b/clustertest/disorder/tests/MergeSet.js new file mode 100644 index 0000000..3cedbc5 --- /dev/null +++ b/clustertest/disorder/tests/MergeSet.js @@ -0,0 +1,75 @@ +/** + * Tests a basic merge set. + */ +coordinator.includeFile("disorder/tests/BasicTest.js"); + + +function MergeSet(coordinator,results) { + BasicTest.call(this,coordinator,results); + this.syncWaitTime = 60; + this.testDescription = 'This test exercises the merge set command \n'; +} + +MergeSet.prototype = new BasicTest(); +MergeSet.prototype.constructor=MergeSet; +MergeSet.prototype.getNodeCount = function() { + return 5; +} + +MergeSet.prototype.runTest = function() { + this.coordinator.log("MergeSet.prototype.runTest - begin"); + + this.testResults.newGroup("merge set"); + //this.prepareDb(['db1','db2']); + + +//First setup slony + this.coordinator.log("MergeSet.prototype.runTest - set up replication"); + this.setupReplication(); + this.addCompletePaths(); + this.addTables(); + + this.coordinator.log("MergeSet.prototype.runTest - start slons"); + //Start the slons. + //These must be started before slonik runs or the subscribe won't happen + //thus slonik won't finish. 
+ var slonArray=[]; + for(var idx=1; idx <= this.getNodeCount(); idx++) { + slonArray[idx-1] = this.coordinator.createSlonLauncher('db' + idx); + slonArray[idx-1].run(); + } + this.createSecondSet(1); + var load = this.generateLoad(); + java.lang.Thread.sleep(5*1000); + this.subscribeSet(1,1,1,[2,3]); + this.subscribeSet(1,1,3,[4,5]); + this.subscribeSet(2,1,1,[2,3]); + this.subscribeSet(2,1,3,[4,5]); + var mergeScript="merge set(id=1,add id=2,origin=1);"; + var slonikPreamble=this.getSlonikPreamble(); + var slonik = this.coordinator.createSlonik('merge set',slonikPreamble,mergeScript); + slonik.run(); + this.coordinator.join(slonik); + this.testResults.assertCheck('merge set okay',slonik.getReturnCode(), + 0); + + load.stop(); + this.coordinator.join(load); + this.coordinator.log("MergeSet.prototype.runTest - subscriptions complete"); + this.slonikSync(1,1); + this.coordinator.log("MergeSet.prototype.runTest - syncing complete"); + this.compareDb('db1','db2'); + this.compareDb('db1','db3'); + this.compareDb('db1','db4'); + this.compareDb('db1','db5'); + + for(var idx=1; idx <= this.getNodeCount(); idx++) { + slonArray[idx-1].stop(); + this.coordinator.join(slonArray[idx-1]); + } + this.coordinator.log("MergeSet.prototype.runTest - complete"); +} + +MergeSet.prototype.getSyncWaitTime = function () { + return this.syncWaitTime; +} diff --git a/clustertest/disorder/tests/MultipleOrigins.js b/clustertest/disorder/tests/MultipleOrigins.js index 8860d8b..79872bb 100644 --- a/clustertest/disorder/tests/MultipleOrigins.js +++ b/clustertest/disorder/tests/MultipleOrigins.js @@ -87,7 +87,9 @@ MultipleOrigins.prototype.runTest = function() { this.compareDb('db1','db3'); - this.moveSet(1,4,1); + this.moveSet(1,4,1); + this.slonikSync(1,1); + this.slonikSync(1,4); this.failNode(1,4,true); for(var idx=1; idx <= this.getNodeCount(); idx++) { diff --git a/clustertest/disorder/tests/OmitCopy.js b/clustertest/disorder/tests/OmitCopy.js index ce0b0b3..1433cf0 100644 --- 
a/clustertest/disorder/tests/OmitCopy.js +++ b/clustertest/disorder/tests/OmitCopy.js @@ -137,7 +137,6 @@ OmitCopy.prototype.subscribeOmitCopy=function(origin,provider,subscriberNodeId,o var slonikPreamble = this.getSlonikPreamble(); var slonikScript = 'echo \'OmitCopy.prototype.subscribeOmitCopy\';\n'; slonikScript += "subscribe set(id=1, provider=" + provider+", receiver=" + subscriberNodeId+", omit copy=true, forward=yes);\n"; - slonikScript += ' wait for event (origin='+origin+', wait on='+provider+',confirmed=all);\n'; var slonik=this.coordinator.createSlonik('omit copy subscribe',slonikPreamble,slonikScript); slonik.run(); diff --git a/clustertest/disorder/tests/RecreateSet.js b/clustertest/disorder/tests/RecreateSet.js index 911610d..98fc0ce 100644 --- a/clustertest/disorder/tests/RecreateSet.js +++ b/clustertest/disorder/tests/RecreateSet.js @@ -67,8 +67,7 @@ RecreateSet.prototype.runTest = function() { * due to the lock on set 1. */ var slonikPreamble = this.getSlonikPreamble(); - var slonikScript = 'drop set (id=2, origin=1);\n' - + 'wait for event(origin=1, confirmed=2,wait on=1);\n'; + var slonikScript = 'drop set (id=2, origin=1);\n'; var slonik=this.coordinator.createSlonik('drop set 1',slonikPreamble,slonikScript); slonik.run(); diff --git a/clustertest/disorder/tests/RenameTests.js b/clustertest/disorder/tests/RenameTests.js index 913fbb9..818e2d8 100644 --- a/clustertest/disorder/tests/RenameTests.js +++ b/clustertest/disorder/tests/RenameTests.js @@ -40,7 +40,7 @@ RenameTests.prototype.runTest = function() { this.subscribeSet(1,1, 1, [ 2, 3 ]); this.subscribeSet(1,1, 3, [ 4, 5 ]); - + /** * Create the test_transient table. */ @@ -48,7 +48,6 @@ RenameTests.prototype.runTest = function() { this.createAndReplicateTestTable(); - /** * Add a row to the table. */ @@ -60,7 +59,7 @@ RenameTests.prototype.runTest = function() { * Ensure that changes to the table still get * replicated. 
*/ - + this.slonikSync(1,1); this.executeScript("ALTER TABLE disorder.test_transient RENAME TO test_transient2;\n"); diff --git a/clustertest/disorder/tests/Unsubscribe.js b/clustertest/disorder/tests/Unsubscribe.js index 9448538..208fa07 100644 --- a/clustertest/disorder/tests/Unsubscribe.js +++ b/clustertest/disorder/tests/Unsubscribe.js @@ -106,8 +106,8 @@ Unsubscribe.prototype.unsubscribe=function(node_id,set_id,expect_success) { this.coordinator.log("Unsubscribe.prototype.unsubscribe - begin"); var slonikPreamble = this.getSlonikPreamble(); var slonikScript = 'echo \'Unsubscribe.prototype.unsubscribe\';\n'; - slonikScript +='unsubscribe set(id=' + set_id + ',receiver=' + node_id + ');\n' - + 'wait for event(origin=' + node_id + ',wait on=' + node_id + ',confirmed=all);\n'; + slonikScript +='unsubscribe set(id=' + set_id + ',receiver=' + node_id + ');\n'; + var slonik = this.coordinator.createSlonik('unsubscribe ' , slonikPreamble,slonikScript); slonik.run(); this.coordinator.join(slonik); diff --git a/clustertest/disorder/tests/disorder_tests.js b/clustertest/disorder/tests/disorder_tests.js index 521e1dd..4e869f1 100644 --- a/clustertest/disorder/tests/disorder_tests.js +++ b/clustertest/disorder/tests/disorder_tests.js @@ -22,6 +22,7 @@ coordinator.includeFile('disorder/tests/LongTransaction.js'); coordinator.includeFile('disorder/tests/RenameTests.js'); coordinator.includeFile('disorder/tests/CleanupTest.js'); coordinator.includeFile('disorder/tests/RecreateSet.js'); +coordinator.includeFile('disorder/tests/MergeSet.js'); var tests = [new EmptySet(coordinator,results) ,new OmitCopy(coordinator,results) @@ -44,14 +45,15 @@ var tests = ,new BigBacklogTest(coordinator,results) ,new LongTransaction(coordinator,results) ,new RenameTests(coordinator,results) - + ,new MergeSet(coordinator,results) //Below tests are known to fail. 
,new UnsubscribeBeforeEnable(coordinator,results) ,new DropSet(coordinator,results) //fails bug 133 ,new CleanupTest(coordinator,results) //cleanup_interval does not (yet) do what the test wants ]; -//tests=[new CleanupTest(coordinator,results)]; +//tests=[new MergeSet(coordinator,results)]; + var basicTest = new BasicTest(coordinator,results); //Setup the schema. diff --git a/doc/adminguide/addthings.sgml b/doc/adminguide/addthings.sgml index 557fd45..76a2af7 100644 --- a/doc/adminguide/addthings.sgml +++ b/doc/adminguide/addthings.sgml @@ -36,7 +36,6 @@ slonik <<_EOF_ create set (id=2, origin=1, comment='a second replication set'); set add table (set id=2, origin=1, id=5, fully qualified name = 'public.newtable', comment='some new table'); subscribe set(id=1, provider=1,receiver=2); -wait for event(origin=1, confirmed=all, wait on=1); merge set(id=1, add id=2,origin=1); @@ -171,7 +170,6 @@ node 5 admin conninfo='host=slavehost dbname=slavedb user=slony password=slony'; clustername=testcluster; store node(id=5,comment='some slave node',event node=1); -wait for event(origin=1, confirmed=all, wait on=1); @@ -181,11 +179,12 @@ the command. node 5 admin conninfo='host=slavehost dbname=slavedb user=slony password=slony'; clustername=testcluster; node 1 admin conninfo='host=masterhost dbname=masterdb user=slony password=slony'; +# also include the admin conninfo lines for any other nodes in your cluster. 
+# +# clustername=testcluster; store path(server=1,client=5,conninfo='host=masterhost,dbname=masterdb,user=slony,password=slony'); store path(server=5,client=1,conninfo='host=slavehost,dbname=masterdb,user=slony,password=slony'); -wait for event(origin=1,confirmed=all,wait on=1); -wait for event(origin=5,confirmed=all,wait on=1); @@ -195,9 +194,12 @@ wait for event(origin=5,confirmed=all,wait on=1); node 5 admin conninfo='host=slavehost dbname=slavedb user=slony password=slony'; clustername=testcluster; node 1 admin conninfo='host=masterhost dbname=slavedb user=slony password=slony'; +# +# also include the admin conninfo lines for any other nodes in the cluster +# +# clustername=testcluster; subscribe set(id=1,provider=1, receiver=5,forward=yes); -wait for event(origin=1, confirmed=all, wait on=1); @@ -232,9 +234,8 @@ store path(server=3,client=2,conninfo='host=slave3host,dbname=slave3db,user=slony,password=slony'); store path(server=2,client=3,conninfo='host=slave2host,dbname=slave2db,user=slony,password=slony'); subscribe set(set id=1, provider=1, receiver=2,forward=yes); -wait for event(origin=1, confirmed=all, wait on=1); subscribe set (set id=1,provider=2, receiver=3,forward=yes); -wait for event(origin=1,confirmed=all,wait on=1); +wait for event(origin=1, confirmed=all, wait on=1); In the above example we define paths from 1==>2 and from 2==>3 but do diff --git a/doc/adminguide/events.sgml b/doc/adminguide/events.sgml index 6162ce7..e1195f6 100644 --- a/doc/adminguide/events.sgml +++ b/doc/adminguide/events.sgml @@ -77,4 +77,60 @@ sl_log_1 table. This process will be periodically repeated as &slony1; runs. + +Slonik and Event Confirmations + + +&lslonik; can submit configuration commands to different event nodes, +as controlled by the parameters of each command. If two commands +are submitted to different nodes it might be important that they are +processed by other nodes in a consistent order.
The &lslonik; +command can be used to accomplish this but as of &slony1; 2.1 this is +done automatically by &lslonik; in the following circumstances. + + +Before slonik submits an event to a node +it will wait until the node has confirmed the last configuration event +from the previous event node. + +Before slonik submits a +command it will make sure that the provider node has confirmed all +configuration events from any other node. + +Before &lslonik; submits a event +it will make sure that all nodes in the cluster (other than the one +being dropped) have caught up with all other nodes. + +Before slonik submits a +it will make sure that the node being cloned is caught up with all other +nodes in the cluster. + +Before slonik submits a command +it will make sure that any commands have been confirmed by +all nodes. + + + + +When &lslonik; starts it will contact all nodes that it has admin conninfo +information for to find the last non-SYNC event from each node. Submitting +commands from multiple &lslonik; instances at the same time will confuse &lslonik; +and is not recommended. If &lslonik; is waiting for an event confirmation +it will print a message every 10 seconds saying which events are +outstanding. Any commands that might require slonik to wait for event +confirmations can not be executed inside of a "try" block (since the + command can not be used inside of a "try" block). + + + +Automatic waiting for confirmations can be disabled in &lslonik; by running +&lslonik; with the -w option. + + + + + + + + diff --git a/doc/adminguide/slonik.sgml b/doc/adminguide/slonik.sgml index ed0ddcb..e03ebf7 100644 --- a/doc/adminguide/slonik.sgml +++ b/doc/adminguide/slonik.sgml @@ -21,10 +21,27 @@ slonik + options filename - + Options + + + + + + Suppresses slonik's behaviour of automatically waiting for + event confirmations before submitting events to a different + node. If this option is specified your slonik script will + need to include explicit commands.
+ This was the behaviour of slonik prior to version 2.1. + + + + + Description diff --git a/doc/adminguide/slonik_ref.sgml b/doc/adminguide/slonik_ref.sgml index 2c9d053..392fc10 100644 --- a/doc/adminguide/slonik_ref.sgml +++ b/doc/adminguide/slonik_ref.sgml @@ -523,6 +523,12 @@ INIT CLUSTER ( therein; no public objects should be locked during the duration of this. + + Slonik Event Confirmation Behaviour + Slonik will not wait for event confirmations before + performing the command. + + Version Information This command was introduced in &slony1; 1.0 @@ -604,6 +610,13 @@ INIT CLUSTER ( therein; no public objects should be locked during the duration of this. + + Slonik Event Confirmation Behaviour + Slonik will wait for the command submitted to the previous + event node to be confirmed on the specified event node before + submitting the command. + + Version Information This command was introduced in &slony1; 1.0. The SPOOLNODE parameter was introduced in version 1.1, but was vestigial in that @@ -681,6 +694,13 @@ INIT CLUSTER ( cluster. + Slonik Event Confirmation Behaviour + Slonik will wait until all nodes (other than the one being dropped) + are caught up with non-SYNC events from all other nodes before + submitting the DROP NODE command. + + + Version Information This command was introduced in &slony1; 1.0 In version 2.0, the default value for EVENT NODE was removed, so a node must be specified. @@ -745,6 +765,12 @@ INIT CLUSTER ( After dropping a node, you may also need to recycle connections in your application. + + Slonik Event Confirmation Behaviour + Slonik will not wait for event confirmations before + performing the command. + + Version Information This command was introduced in &slony1; 1.0 @@ -792,6 +818,13 @@ INIT CLUSTER ( No application-visible locking should take place.
+ + + Slonik Event Confirmation Behaviour + Slonik will not wait for event confirmations before + performing the command + + Version Information This command was introduced in &slony1; 1.0; frequent use became unnecessary as of version 1.0.5. There are, however, occasional cases where it is @@ -870,6 +903,13 @@ STORE PATH ( SERVER = 1, CLIENT = 2, No application-visible locking should take place. + + + Slonik Event Confirmation Behaviour + Slonik will not wait for event confirmations before + performing the command + + Version Information This command was introduced in &slony1; 1.0 @@ -921,6 +961,13 @@ STORE PATH ( SERVER = 1, CLIENT = 2, No application-visible locking should take place. + + + Slonik Event Confirmation Behaviour + Slonik will not wait for event confirmations before + performing the command + + Version Information This command was introduced in &slony1; 1.0 @@ -991,6 +1038,14 @@ STORE PATH ( SERVER = 1, CLIENT = 2, No application-visible locking should take place. + + + Slonik Event Confirmation Behaviour + Slonik will wait for the command submitted to the previous + event node to be confirmed on the specified event node before + submitting the command. + + Version Information This command was introduced in &slony1; 1.0. As of version 1.1, you should no longer need to use this command, as listen paths are generated automatically. @@ -1044,6 +1099,13 @@ STORE PATH ( SERVER = 1, CLIENT = 2, No application-visible locking should take place. + + Slonik Event Confirmation Behaviour + Slonik will wait for the command submitted to the previous + event node to be confirmed on the specified event node before + submitting the command. + + Version Information This command was introduced in &slony1; 1.0. As of version 1.1, you should not need to use it anymore. @@ -1163,6 +1225,16 @@ STORE PATH ( SERVER = 1, CLIENT = 2, No application-visible locking should take place. 
+ + Slonik Event Confirmation Behaviour + Slonik will wait for the command submitted to the previous + event node to be confirmed on the specified event node before + submitting the command. Slonik will also wait until any outstanding + DROP SET commands are confirmed by all nodes before it submits + the CREATE SET command. + + Version Information This command was introduced in &slony1; 1.0 Until version 1.2, it would crash if no comment was provided. @@ -1218,6 +1290,14 @@ STORE PATH ( SERVER = 1, CLIENT = 2, on each replicated table in order to modify the table schema to clean up the triggers and rules. + + + Slonik Event Confirmation Behaviour + Slonik will wait for the command submitted to the previous + event node to be confirmed on the specified event node before + submitting the command. + + Version Information This command was introduced in &slony1; 1.0 @@ -1273,11 +1353,7 @@ STORE PATH ( SERVER = 1, CLIENT = 2, # Assuming that node 1 is the origin of set 999 that has direct subscribers 2 and 3 SUBSCRIBE SET (ID = 999, PROVIDER = 1, RECEIVER = 2); - SYNC (ID=1); - WAIT FOR EVENT (ORIGIN = 1, CONFIRMED = 2, WAIT ON=1); SUBSCRIBE SET (ID = 999, PROVIDER = 1, RECEIVER = 3); - SYNC (ID=1); - WAIT FOR EVENT (ORIGIN = 1, CONFIRMED = 3, WAIT ON=1); MERGE SET ( ID = 1, ADD ID = 999, ORIGIN = 1 ); @@ -1295,6 +1371,15 @@ STORE PATH ( SERVER = 1, CLIENT = 2, + + Slonik Event Confirmation Behaviour + Slonik will wait for the command submitted to the previous + event node to be confirmed on the specified event node before + submitting the command. Slonik will also wait for any + in-progress subscriptions involving the ADD ID set to complete + before submitting the MERGE SET command. + + Version Information This command was introduced in &slony1; 1.0.5. In 1.2.1, a race condition was rectified where the merge request would be submitted while @@ -1525,6 +1610,14 @@ SET ADD TABLE ( takes place at the time of the SUBSCRIBE_SET event.
+ + Slonik Event Confirmation Behaviour + Slonik will wait for the command submitted to the previous + event node to be confirmed on the specified event node before + submitting the command. + + + Version Information This command was introduced in &slony1; 1.0 @@ -1618,10 +1711,18 @@ SET ADD TABLE ( - Locking Behaviour + Locking Behaviour No application-visible locking should take place. + + Slonik Event Confirmation Behaviour + Slonik will wait for the command submitted to the previous + event node to be confirmed on the specified event node before + submitting the command. + + + Version Information This command was introduced in &slony1; 1.0 @@ -1677,6 +1778,13 @@ SET ADD TABLE ( replication trigger. On subscriber nodes, this also involves adding back any rules/triggers that have been hidden. + + Slonik Event Confirmation Behaviour + Slonik will wait for the command submitted to the previous + event node to be confirmed on the specified event node before + submitting the command. + + Version Information This command was introduced in &slony1; 1.0.5 @@ -1729,6 +1837,13 @@ SET ADD TABLE ( No application-visible locking should take place. + + Slonik Event Confirmation Behaviour + Slonik will wait for the command submitted to the previous + event node to be confirmed on the specified event node before + submitting the command. + + Version Information This command was introduced in &slony1; 1.0.5 @@ -1792,6 +1907,13 @@ SET MOVE TABLE ( No application-visible locking should take place. + + Slonik Event Confirmation Behaviour + Slonik will wait for the command submitted to the previous + event node to be confirmed on the specified event node before + submitting the command. + + Version Information This command was introduced in &slony1; 1.0.5 @@ -1861,6 +1983,13 @@ SET MOVE SEQUENCE ( No application-visible locking should take place. 
+ + Slonik Event Confirmation Behaviour + Slonik will wait for the command submitted to the previous + event node to be confirmed on the specified event node before + submitting the command. + + Version Information This command was introduced in &slony1; 1.0.5 @@ -2160,6 +2289,16 @@ SUBSCRIBE SET ( locks are acquired at the beginning of the process. + + Slonik Event Confirmation Behaviour + Slonik will wait until the provider has confirmed all + outstanding configuration events from any other node before contacting + the provider to determine the set origin. Slonik will then wait for + the command submitted to the previous + event node to be confirmed on the origin before submitting the + command to the origin. + + Version Information This command was introduced in &slony1; 1.0 The OMIT COPY option was introduced in &slony1; 2.0.3. @@ -2222,6 +2361,13 @@ UNSUBSCRIBE SET ( tables and restore other triggers/rules. + Slonik Event Confirmation Behaviour + Slonik will wait for the command submitted to the previous + event node to be confirmed on the specified event node before + submitting the command. + + + Dangerous/Unintuitive Behaviour Resubscribing an unsubscribed set requires a @@ -2306,6 +2452,12 @@ LOCK SET ( on the origin node, and triggers are added to each such table that reject table updates. + + Slonik Event Confirmation Behaviour + Slonik will not wait for event confirmations before + performing the command. + + Version Information This command was introduced in &slony1; 1.0 @@ -2358,6 +2510,12 @@ UNLOCK SET ( on the origin node, as the triggers are removed from each table that reject table updates. + + Slonik Event Confirmation Behaviour + Slonik will not wait for event confirmations before + performing the command. + + Version Information This command was introduced in &slony1; 1.0 @@ -2450,6 +2608,14 @@ MOVE SET ( denyaccess trigger is dropped and a logtrigger trigger added.
+ + Slonik Event Confirmation Behaviour + Slonik will wait for the command submitted to the previous + event node to be confirmed on the specified event node before + submitting the command. + + + Version Information This command was introduced in &slony1; 1.0 @@ -2541,6 +2707,13 @@ FAILOVER ( + + Slonik Event Confirmation Behaviour + Slonik will submit the FAILOVER_EVENT without waiting, + but will then wait until the most advanced node has received confirmations + of the FAILOVER_EVENT from all nodes before completing. + + Version Information This command was introduced in &slony1; 1.0 In version 2.0, the default BACKUP NODE value of 1 was removed, so it is mandatory to provide a value for this parameter. @@ -2657,6 +2830,13 @@ EXECUTE SCRIPT ( + + Slonik Event Confirmation Behaviour + Slonik will wait for the command submitted to the previous + event node to be confirmed on the specified event node before + submitting the command. + + Version Information This command was introduced in &slony1; 1.0. @@ -2735,6 +2915,12 @@ UPDATE FUNCTIONS ( No application-visible locking should take place. + + Slonik Event Confirmation Behaviour + Slonik will not wait for event confirmations before + performing the command. + + Version Information This command was introduced in &slony1; 1.0 @@ -2921,6 +3107,12 @@ REPAIR CONFIG ( No application-visible locking should take place. + + Slonik Event Confirmation Behaviour + Slonik will not wait for event confirmations before + performing the command. + + Version Information This command was introduced in &slony1; 1.1 @@ -2961,6 +3153,12 @@ REPAIR CONFIG ( No application-visible locking should take place. + + Slonik Event Confirmation Behaviour + Slonik will not wait for event confirmations before + performing the command.
+ + Version Information This command was introduced in &slony1; 1.1.6 / 1.2.1 @@ -2992,6 +3190,12 @@ REPAIR CONFIG ( sleep (seconds = 5); + + Slonik Event Confirmation Behaviour + Slonik will not wait for event confirmations before + performing the command. + + Version Information This command was introduced in &slony1; 1.1.6 / 1.2.1. @@ -3040,6 +3244,14 @@ REPAIR CONFIG ( sync (id=22); + + + Slonik Event Confirmation Behaviour + Slonik will wait until the node being cloned (the provider) + is caught up with all other nodes before submitting the clone prepare + command. + + Version Information This command was introduced in &slony1; 2.0. @@ -3078,6 +3290,13 @@ REPAIR CONFIG ( clone finish (id = 33, provider = 22); + + Slonik Event Confirmation Behaviour + Slonik will not wait for event confirmations before + performing the command. + + + Version Information This command was introduced in &slony1; 2.0. diff --git a/src/backend/slony1_funcs.c b/src/backend/slony1_funcs.c index 6b82eee..7b4b04c 100644 --- a/src/backend/slony1_funcs.c +++ b/src/backend/slony1_funcs.c @@ -60,7 +60,7 @@ PG_FUNCTION_INFO_V1(_Slony_I_killBackend); PG_FUNCTION_INFO_V1(_Slony_I_seqtrack); PG_FUNCTION_INFO_V1(_slon_quote_ident); - +PG_FUNCTION_INFO_V1(_Slony_I_resetSession); Datum _Slony_I_createEvent(PG_FUNCTION_ARGS); Datum _Slony_I_getLocalNodeId(PG_FUNCTION_ARGS); @@ -74,6 +74,8 @@ Datum _Slony_I_seqtrack(PG_FUNCTION_ARGS); Datum _slon_quote_ident(PG_FUNCTION_ARGS); +Datum _Slony_I_resetSession(PG_FUNCTION_ARGS); + #ifdef CYGWIN extern DLLIMPORT Node *newNodeMacroHolder; #endif @@ -1400,6 +1402,45 @@ getClusterStatus(Name cluster_name, int need_plan_mask) /* @+nullderef@ */ } +Datum +_Slony_I_resetSession(PG_FUNCTION_ARGS) +{ + Slony_I_ClusterStatus *cs; + + cs = clusterStatusList; /* free every cached cluster status entry, its buffers and saved plans */ + while(cs != NULL) + { + Slony_I_ClusterStatus *previous; + if(cs->cmdtype_I) + free(cs->cmdtype_I); + if(cs->cmdtype_D) + free(cs->cmdtype_D); + if(cs->cmdtype_U) + free(cs->cmdtype_U); + if(cs->cmddata_buf) +
free(cs->cmddata_buf); + free(cs->clusterident); + if(cs->plan_insert_event) + SPI_freeplan(cs->plan_insert_event); + if(cs->plan_insert_log_1) + SPI_freeplan(cs->plan_insert_log_1); + if(cs->plan_insert_log_2) + SPI_freeplan(cs->plan_insert_log_2); + if(cs->plan_record_sequences) + SPI_freeplan(cs->plan_record_sequences); + if(cs->plan_get_logstatus) + SPI_freeplan(cs->plan_get_logstatus); + previous=cs; + cs=cs->next; + free(previous); + + + } + clusterStatusList=NULL; + PG_RETURN_NULL(); + +} + /* * Local Variables: diff --git a/src/backend/slony1_funcs.sql b/src/backend/slony1_funcs.sql index 8012b36..6e5356d 100644 --- a/src/backend/slony1_funcs.sql +++ b/src/backend/slony1_funcs.sql @@ -179,6 +179,11 @@ grant execute on function @NAMESPACE@.getModuleVersion () to public; comment on function @NAMESPACE@.getModuleVersion () is 'Returns the compiled-in version number of the Slony-I shared object'; + +create or replace function @NAMESPACE@.resetSession() returns text + as '$libdir/slony1_funcs','_Slony_I_resetSession' + language C; + create or replace function @NAMESPACE@.checkmoduleversion () returns text as $$ declare moduleversion text; @@ -1526,7 +1531,7 @@ declare v_row record; begin perform "pg_catalog".setval('@NAMESPACE@.sl_local_node_id', p_no_id); - + perform @NAMESPACE@.resetSession(); for v_row in select sub_set from @NAMESPACE@.sl_subscribe where sub_receiver = p_no_id loop @@ -2478,11 +2483,12 @@ comment on function @NAMESPACE@.dropSet(p_set_id int4) is -- -- Generate the MERGE_SET event.
-- ---------------------------------------------------------------------- -create or replace function @NAMESPACE@.mergeSet (p_set_id int4, p_add_id int4) +create or replace function @NAMESPACE@.mergeSet (p_set_id int4, p_add_id int4) returns bigint as $$ declare v_origin int4; + in_progress boolean; begin -- ---- -- Grab the central configuration lock @@ -2541,13 +2547,9 @@ begin -- ---- -- Check that all ENABLE_SUBSCRIPTION events for the set are confirmed -- ---- - if exists (select true from @NAMESPACE@.sl_event - where ev_type = 'ENABLE_SUBSCRIPTION' - and ev_data1 = p_add_id::text - and ev_seqno > (select max(con_seqno) from @NAMESPACE@.sl_confirm - where con_origin = ev_origin - and con_received::text = ev_data3)) - then + select @NAMESPACE@.isSubscriptionInProgress(p_add_id) into in_progress ; + + if in_progress then raise exception 'Slony-I: set % has subscriptions in progress - cannot merge', p_add_id; end if; @@ -2567,6 +2569,30 @@ comment on function @NAMESPACE@.mergeSet(p_set_id int4, p_add_id int4) is Both sets must exist, and originate on the same node. They must be subscribed by the same set of nodes.'; + +create or replace function @NAMESPACE@.isSubscriptionInProgress(p_add_id int4) +returns boolean +as $$ +DECLARE +in_progress boolean; +begin + if exists (select true from @NAMESPACE@.sl_event + where ev_type = 'ENABLE_SUBSCRIPTION' + and ev_data1 = p_add_id::text + and ev_seqno > (select max(con_seqno) from @NAMESPACE@.sl_confirm + where con_origin = ev_origin + and con_received::text = ev_data3)) + then + return true; + else + return false; + end if; +end; +$$ language plpgsql; +comment on function @NAMESPACE@.isSubscriptionInProgress(p_add_id int4) is +'Checks to see if a subscription for the indicated set is in progress. +Returns true if a subscription is in progress. 
Otherwise false'; + -- ---------------------------------------------------------------------- -- FUNCTION mergeSet_int (set_id, add_id) -- @@ -5756,6 +5782,21 @@ end $$ language plpgsql; comment on function @NAMESPACE@.store_application_name (i_name text) is 'Set application_name GUC, if possible. Returns NULL if it fails to work.'; +create or replace function @NAMESPACE@.is_node_reachable(origin_node_id integer, + receiver_node_id integer) returns boolean as $$ +declare + reachable boolean; + listen_row record; +begin + reachable:=false; + select * into listen_row from @NAMESPACE@.sl_listen where + li_origin=origin_node_id and li_receiver=receiver_node_id; + if found then + reachable:=true; + end if; + return reachable; +end $$ language plpgsql; + create or replace function @NAMESPACE@.component_state (i_actor text, i_pid integer, i_node integer, i_conn_pid integer, i_activity text, i_starttime timestamptz, i_event bigint, i_eventtype text) returns integer as $$ begin -- Trim out old state for this component @@ -5778,3 +5819,4 @@ language plpgsql; comment on function @NAMESPACE@.component_state (i_actor text, i_pid integer, i_node integer, i_conn_pid integer, i_activity text, i_starttime timestamptz, i_event bigint, i_eventtype text) is 'Store state of a Slony component. 
Useful for monitoring'; + diff --git a/src/slon/remote_worker.c b/src/slon/remote_worker.c index 9ea5a61..8421792 100644 --- a/src/slon/remote_worker.c +++ b/src/slon/remote_worker.c @@ -1195,7 +1195,6 @@ remoteWorkerThread_main(void *cdata) "select %s.failoverSet_int(%d, %d, %d, %s); ", rtcfg_namespace, rtcfg_namespace, - rtcfg_namespace, failed_node, backup_node, set_id, seqbuf); need_reloadListen = true; diff --git a/src/slonik/slonik.c b/src/slonik/slonik.c index c7b64ae..535005f 100644 --- a/src/slonik/slonik.c +++ b/src/slonik/slonik.c @@ -43,6 +43,9 @@ SlonikScript *parser_script = NULL; int parser_errors = 0; int current_try_level; +int last_event_node=-1; +int auto_wait_disabled=0; + static char myfull_path[MAXPGPATH]; static char share_path[MAXPGPATH]; @@ -91,6 +94,19 @@ slonik_add_dependent_sequences(SlonikStmt_set_add_table *stmt, const char * table_name); static int slonik_is_slony_installed(SlonikStmt * stmt, SlonikAdmInfo * adminfo); +static int slonik_submitEvent(SlonikStmt * stmt, + SlonikAdmInfo * adminfo, + SlonDString * query, + SlonikScript * script, + int supress_wait_for); + +static size_t slonik_get_last_event_id(SlonikStmt* stmt, + SlonikScript * script, + const char * event_filter, + int64 ** events); +static int slonik_wait_config_caughtup(SlonikAdmInfo * adminfo1, + SlonikStmt * stmt, + int ignore_node); /* ---------- * main * ---------- @@ -101,7 +117,7 @@ main(int argc, const char *argv[]) extern int optind; int opt; - while ((opt = getopt(argc, (char **)argv, "hv")) != EOF) + while ((opt = getopt(argc, (char **)argv, "hvw")) != EOF) { switch (opt) { @@ -113,6 +129,9 @@ main(int argc, const char *argv[]) printf("slonik version %s\n", SLONY_I_VERSION_STRING); exit(0); break; + case 'w': + auto_wait_disabled=1; + break; default: printf("unknown option '%c'\n", opt); @@ -1143,7 +1162,23 @@ static int script_exec_stmts(SlonikScript * script, SlonikStmt * hdr) { int errors = 0; + int64 * events; + size_t event_length; + int idx=0; + 
SlonikAdmInfo * curAdmInfo; + event_length=slonik_get_last_event_id(hdr,script,"ev_type <> 'SYNC' ", + &events); + for( curAdmInfo = script->adminfo_list; + curAdmInfo != NULL; curAdmInfo = curAdmInfo->next) + { + if((size_t) idx >= event_length) /* never read past the end of events[] */ + break; + curAdmInfo->last_event=events[idx]; + idx++; + } + free(events); + while (hdr && errors == 0) { hdr->script = script; @@ -2373,7 +2408,8 @@ slonik_store_node(SlonikStmt_store_node * stmt) "select \"_%s\".enableNode(%d); ", stmt->hdr.script->clustername, stmt->no_id, stmt->no_comment, stmt->hdr.script->clustername, stmt->no_id); - if (db_exec_evcommand((SlonikStmt *) stmt, adminfo2, &query) < 0) + if (slonik_submitEvent((SlonikStmt *) stmt, adminfo2, &query, + stmt->hdr.script,auto_wait_disabled) < 0) { dstring_free(&query); return -1; @@ -2389,6 +2425,7 @@ slonik_drop_node(SlonikStmt_drop_node * stmt) { SlonikAdmInfo *adminfo1; SlonDString query; + SlonikAdmInfo * curAdmInfo; adminfo1 = get_active_adminfo((SlonikStmt *) stmt, stmt->ev_origin); if (adminfo1 == NULL) @@ -2397,6 +2434,23 @@ slonik_drop_node(SlonikStmt_drop_node * stmt) if (db_begin_xact((SlonikStmt *) stmt, adminfo1) < 0) return -1; + if(!auto_wait_disabled) + { + for(curAdmInfo = stmt->hdr.script->adminfo_list; + curAdmInfo!=NULL; curAdmInfo=curAdmInfo->next) + { + if(curAdmInfo->no_id == stmt->no_id) + continue; + if(slonik_is_slony_installed((SlonikStmt*)stmt,curAdmInfo) > 0 ) + { + slonik_wait_config_caughtup(curAdmInfo,(SlonikStmt*)stmt, + stmt->no_id); + } + + } + + } + dstring_init(&query); slon_mkquery(&query, @@ -2405,11 +2459,24 @@ slonik_drop_node(SlonikStmt_drop_node * stmt) stmt->hdr.script->clustername, stmt->hdr.script->clustername, stmt->no_id); - if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0) + /** + * we disable auto wait because we perform a wait + * above ignoring the node being dropped.
+ */ + if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query, + stmt->hdr.script,true) < 0) { dstring_free(&query); return -1; } + /** + * if we have a conninfo for the node being dropped + * we want to clear out the last seqid. + */ + adminfo1 = get_adminfo(&stmt->hdr,stmt->no_id); + if(adminfo1 != NULL) { + adminfo1->last_event=-1; + } dstring_free(&query); return 0; @@ -2654,6 +2721,7 @@ slonik_failed_node(SlonikStmt_failed_node * stmt) "select \"_%s\".failedNode(%d, %d); ", stmt->hdr.script->clustername, stmt->no_id, stmt->backup_node); + printf("executing failedNode() on %d\n",adminfo1->no_id); if (db_exec_command((SlonikStmt *) stmt, adminfo1, &query) < 0) { free(configbuf); @@ -2894,10 +2962,23 @@ slonik_failed_node(SlonikStmt_failed_node * stmt) if (use_node != stmt->backup_node) { + + /** + * commit the transaction so a new transaction + * is ready for the lock table + */ + if (db_commit_xact((SlonikStmt *) stmt, adminfo1) < 0) + { + free(configbuf); + dstring_free(&query); + return -1; + } slon_mkquery(&query, + "lock table \"_%s\".sl_event_lock; " "select \"_%s\".storeListen(%d,%d,%d); " "select \"_%s\".subscribeSet_int(%d,%d,%d,'t','f'); ", stmt->hdr.script->clustername, + stmt->hdr.script->clustername, stmt->no_id, use_node, stmt->backup_node, stmt->hdr.script->clustername, setinfo[i].set_id, use_node, stmt->backup_node); @@ -2925,7 +3006,11 @@ slonik_failed_node(SlonikStmt_failed_node * stmt) sprintf(ev_seqno_c, INT64_FORMAT, setinfo[i].max_seqno); sprintf(ev_seqfake_c, INT64_FORMAT, ++max_seqno_total); - + if (db_commit_xact((SlonikStmt *) stmt, max_node_total->adminfo) + < 0) + { + return -1; + } slon_mkquery(&query, "lock table \"_%s\".sl_event_lock; " "select \"_%s\".failedNode2(%d,%d,%d,'%s','%s'); ", @@ -2933,20 +3018,128 @@ slonik_failed_node(SlonikStmt_failed_node * stmt) stmt->hdr.script->clustername, stmt->no_id, stmt->backup_node, setinfo[i].set_id, ev_seqno_c, ev_seqfake_c); + printf("NOTICE: executing \"_%s\".failedNode2 on node 
%d\n", + stmt->hdr.script->clustername, + max_node_total->adminfo->no_id); if (db_exec_command((SlonikStmt *) stmt, max_node_total->adminfo, &query) < 0) - rc = -1; + rc = -1; + else + { + SlonikAdmInfo * failed_conn_info=NULL; + SlonikAdmInfo * last_conn_info=NULL; + bool temp_conn_info=false; + int node_idx; + /** + * now wait for the FAILOVER to finish. + * To do this we must wait for the FAILOVER_EVENT + * which has ev_origin=stmt->no_id (the failed node) + * but was injected into the sl_event table on the + * most ahead node (max_node_total->adminfo) + * to be confirmed by the backup node. + * + * Then we wait for the backup node to send an event + * and be confirmed elsewhere. + * + */ + + SlonikStmt_wait_event wait_event; + wait_event.hdr=*(SlonikStmt*)stmt; + wait_event.wait_origin=stmt->no_id; /*failed node*/ + wait_event.wait_on=max_node_total->adminfo->no_id; + wait_event.wait_confirmed=-1; + wait_event.wait_timeout=0; + + /** + * see if we can find a admconninfo + * for the failed node. + */ + + for(failed_conn_info = stmt->hdr.script->adminfo_list; + failed_conn_info != NULL; + failed_conn_info=failed_conn_info->next) + { + + if(failed_conn_info->no_id==stmt->no_id) + { + break; + } + last_conn_info=failed_conn_info; + } + if(failed_conn_info == NULL) + { + temp_conn_info=true; + last_conn_info->next = malloc(sizeof(SlonikAdmInfo)); + memset(last_conn_info->next,0,sizeof(SlonikAdmInfo)); + failed_conn_info=last_conn_info->next; + failed_conn_info->no_id=stmt->no_id; + failed_conn_info->stmt_filename="slonik generated"; + failed_conn_info->stmt_lno=-1; + failed_conn_info->conninfo=""; + failed_conn_info->script=last_conn_info->script; + } + + failed_conn_info->last_event=max_seqno_total; + + /* + * commit all open transactions despite of all possible errors + * otherwise the WAIT FOR will not work. + * (node_idx, not i: i is the enclosing per-set loop counter)
+ **/ + for (node_idx = 0; node_idx < num_nodes; node_idx++) + { + if (db_commit_xact((SlonikStmt *) stmt, + nodeinfo[node_idx].adminfo) < 0) + rc = -1; + } + + if (slonik_wait_event(&wait_event) < 0) + { + rc = -1; + /** + * pretty serious? how do we recover? + */ + printf("%s:%d error waiting for event\n", + stmt->hdr.stmt_filename, stmt->hdr.stmt_lno); + } + + if(temp_conn_info) + { + last_conn_info->next=failed_conn_info->next; + free(failed_conn_info); + + } + + slon_mkquery(&query, + "lock table \"_%s\".sl_event_lock; " + "select \"_%s\".createEvent('_%s', 'SYNC'); ", + stmt->hdr.script->clustername, + stmt->hdr.script->clustername, + stmt->hdr.script->clustername); + if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query, + stmt->hdr.script,1) < 0) + { + printf("%s:%d: error submitting SYNC event to backup node\n" + ,stmt->hdr.stmt_filename, stmt->hdr.stmt_lno); + } + + + + }/*else*/ + } } - + /* * commit all open transactions despite of all possible errors */ for (i = 0; i < num_nodes; i++) { - if (db_commit_xact((SlonikStmt *) stmt, nodeinfo[i].adminfo) < 0) + if (db_commit_xact((SlonikStmt *) stmt, + nodeinfo[i].adminfo) < 0) rc = -1; } + free(configbuf); dstring_free(&query); @@ -3020,8 +3213,11 @@ slonik_clone_prepare(SlonikStmt_clone_prepare * stmt) if (adminfo1 == NULL) return -1; - dstring_init(&query); + if(!auto_wait_disabled) + slonik_wait_config_caughtup(adminfo1,&stmt->hdr,-1); + dstring_init(&query); + if (stmt->no_comment == NULL) slon_mkquery(&query, "select \"_%s\".cloneNodePrepare(%d, %d, 'Node %d'); ", @@ -3034,7 +3230,8 @@ slonik_clone_prepare(SlonikStmt_clone_prepare * stmt) stmt->hdr.script->clustername, stmt->no_id, stmt->no_provider, stmt->no_comment); - if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0) + if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query, + stmt->hdr.script,auto_wait_disabled) < 0) { dstring_free(&query); return -1; @@ -3094,7 +3291,8 @@ slonik_store_path(SlonikStmt_store_path * stmt)
stmt->hdr.script->clustername, stmt->pa_server, stmt->pa_client, stmt->pa_conninfo, stmt->pa_connretry); - if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0) + if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query, + stmt->hdr.script,1) < 0) { dstring_free(&query); return -1; @@ -3126,7 +3324,8 @@ slonik_drop_path(SlonikStmt_drop_path * stmt) stmt->hdr.script->clustername, stmt->hdr.script->clustername, stmt->pa_server, stmt->pa_client); - if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0) + if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query, + stmt->hdr.script,auto_wait_disabled) < 0) { dstring_free(&query); return -1; @@ -3159,7 +3358,8 @@ slonik_store_listen(SlonikStmt_store_listen * stmt) stmt->hdr.script->clustername, stmt->li_origin, stmt->li_provider, stmt->li_receiver); - if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0) + if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query, + stmt->hdr.script,auto_wait_disabled) < 0) { dstring_free(&query); return -1; @@ -3192,7 +3392,8 @@ slonik_drop_listen(SlonikStmt_drop_listen * stmt) stmt->hdr.script->clustername, stmt->li_origin, stmt->li_provider, stmt->li_receiver); - if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0) + if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query, + stmt->hdr.script,auto_wait_disabled) < 0) { dstring_free(&query); return -1; @@ -3210,11 +3411,62 @@ slonik_create_set(SlonikStmt_create_set * stmt) SlonikAdmInfo *adminfo1; SlonDString query; const char *comment; + SlonikAdmInfo * curAdmInfo; + int64 * drop_set_events; + int64 * cached_events; + size_t event_size; + int adm_idx; adminfo1 = get_active_adminfo((SlonikStmt *) stmt, stmt->set_origin); if (adminfo1 == NULL) return -1; + if( ! auto_wait_disabled) + { + /** + * loop through each node and make sure there are no + * pending DROP SET commands. 
+ * + * if there is a DROP SET command from the node + * in sl_event then we wait until all other nodes are + * caughtup to that. + * + */ + event_size = slonik_get_last_event_id((SlonikStmt*)stmt, + stmt->hdr.script, + "ev_type='DROP_SET'", + &drop_set_events); + + /** + * copy the 'real' last event to storage + * and update the AdmInfo structure with the last 'DROP SET' id. + */ + cached_events=malloc(sizeof(int64)*event_size); + adm_idx=0; + for(curAdmInfo = stmt->hdr.script->adminfo_list; + curAdmInfo!=NULL; curAdmInfo=curAdmInfo->next) + { + cached_events[adm_idx]=curAdmInfo->last_event; + curAdmInfo->last_event=drop_set_events[adm_idx]; + adm_idx++; + } + slonik_wait_config_caughtup(adminfo1,&stmt->hdr,-1); + /*** + * reset the last_event values in the AdmInfo to + * the values we saved above. + */ + adm_idx=0; + for(curAdmInfo = stmt->hdr.script->adminfo_list; + curAdmInfo!=NULL; curAdmInfo=curAdmInfo->next) + { + curAdmInfo->last_event=cached_events[adm_idx]; + adm_idx++; + } + free(cached_events); + free(drop_set_events); + + } + if (db_begin_xact((SlonikStmt *) stmt, adminfo1) < 0) return -1; @@ -3231,7 +3483,8 @@ slonik_create_set(SlonikStmt_create_set * stmt) stmt->hdr.script->clustername, stmt->hdr.script->clustername, stmt->set_id, comment); - if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0) + if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query, + stmt->hdr.script,auto_wait_disabled) < 0) { dstring_free(&query); return -1; @@ -3263,7 +3516,8 @@ slonik_drop_set(SlonikStmt_drop_set * stmt) stmt->hdr.script->clustername, stmt->hdr.script->clustername, stmt->set_id); - if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0) + if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query, + stmt->hdr.script,auto_wait_disabled) < 0) { dstring_free(&query); return -1; @@ -3279,23 +3533,69 @@ slonik_merge_set(SlonikStmt_merge_set * stmt) { SlonikAdmInfo *adminfo1; SlonDString query; + PGresult *res; + bool in_progress=1; 
adminfo1 = get_active_adminfo((SlonikStmt *) stmt, stmt->set_origin); if (adminfo1 == NULL) return -1; - if (db_begin_xact((SlonikStmt *) stmt, adminfo1) < 0) - return -1; + + /** + * The event node (the origin) should be caught up + * with itself before submitting a merge set. + * this ensures no subscriptions involving the set + * are still in progress. + * + * (we could also check for the event number of any + * unconfirmed subscriptions and wait for that + * but we don't) + */ + + dstring_init(&query); + slon_mkquery(&query,"select \"_%s\".isSubscriptionInProgress(%d)" + ,stmt->hdr.script->clustername, + stmt->add_id); + while(in_progress) + { + char *result; + + if (db_begin_xact((SlonikStmt *) stmt, adminfo1) < 0) + return -1; + + res = db_exec_select((SlonikStmt*) stmt,adminfo1,&query); + if (res == NULL) + { + dstring_free(&query); + return -1; + + } + result = PQgetvalue(res,0,0); + if(result != NULL && (*result=='t' || + *result=='T')) + { + printf("%s:%d subscription in progress before mergeSet. 
waiting", + stmt->hdr.stmt_filename,stmt->hdr.stmt_lno); + db_rollback_xact((SlonikStmt *) stmt, adminfo1); + sleep(5); + } + else + in_progress=false; + if(result != NULL) + PQclear(res); + } + slon_mkquery(&query, "lock table \"_%s\".sl_event_lock;" "select \"_%s\".mergeSet(%d, %d); ", stmt->hdr.script->clustername, stmt->hdr.script->clustername, stmt->set_id, stmt->add_id); - if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0) + if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query, + stmt->hdr.script,auto_wait_disabled) < 0) { dstring_free(&query); return -1; @@ -3432,9 +3732,10 @@ slonik_set_add_single_table(SlonikStmt_set_add_table * stmt, slon_mkquery(&query, "select \"_%s\".setAddTable(%d, %d, '%q', '%q', '%q'); ", stmt->hdr.script->clustername, stmt->set_id, tab_id, fqname, idxname, stmt->tab_comment); - if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0) + if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query, + stmt->hdr.script,auto_wait_disabled) < 0) { PQclear(res); dstring_free(&query); @@ -3521,7 +3822,7 @@ slonik_set_add_sequence(SlonikStmt_set_add_sequence * stmt) dstring_terminate(&query); } else - rc=slonik_set_add_single_sequence((SlonikStmt*)stmt,adminfo1, + rc=slonik_set_add_single_sequence((SlonikStmt*)stmt,adminfo1, stmt->seq_fqname, stmt->set_id,stmt->seq_comment, stmt->seq_id); @@ -3545,7 +3846,7 @@ slonik_set_add_single_sequence(SlonikStmt *stmt, if(seq_id < 0) { - seq_id = slonik_get_next_sequence_id(stmt); + seq_id = slonik_get_next_sequence_id((SlonikStmt*)stmt); if(seq_id < 0) return -1; } @@ -3556,7 +3857,8 @@ slonik_set_add_single_sequence(SlonikStmt *stmt, stmt->script->clustername, set_id, seq_id, seq_name, seq_comment); - if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0) + if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query, + stmt->script,auto_wait_disabled) < 0) { db_notice_silent
= false; dstring_free(&query); @@ -3589,7 +3891,8 @@ slonik_set_drop_table(SlonikStmt_set_drop_table * stmt) stmt->hdr.script->clustername, stmt->hdr.script->clustername, stmt->tab_id); - if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0) + if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query, + stmt->hdr.script,auto_wait_disabled) < 0) { dstring_free(&query); return -1; @@ -3622,7 +3925,8 @@ slonik_set_drop_sequence(SlonikStmt_set_drop_sequence * stmt) "select \"_%s\".setDropSequence(%d); ", stmt->hdr.script->clustername, stmt->seq_id); - if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0) + if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query, + stmt->hdr.script,auto_wait_disabled) < 0) { db_notice_silent = false; dstring_free(&query); @@ -3655,7 +3959,8 @@ slonik_set_move_table(SlonikStmt_set_move_table * stmt) stmt->hdr.script->clustername, stmt->hdr.script->clustername, stmt->tab_id, stmt->new_set_id); - if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0) + if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query, + stmt->hdr.script,auto_wait_disabled) < 0) { dstring_free(&query); return -1; @@ -3686,7 +3991,8 @@ slonik_set_move_sequence(SlonikStmt_set_move_sequence * stmt) stmt->hdr.script->clustername, stmt->hdr.script->clustername, stmt->seq_id, stmt->new_set_id); - if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0) + if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query, + stmt->hdr.script,auto_wait_disabled) < 0) { dstring_free(&query); return -1; @@ -3719,6 +4025,21 @@ slonik_subscribe_set(SlonikStmt_subscribe_set * stmt) * about this change directly. */ + /** + * we don't actually want to execute that query until + * the provider node is caught up with all other nodes wrt config data. + * + * this is because we don't want to pick the origin based on + * stale data.
+ * + * @note an alternative might be to contact all adminconninfo + * nodes looking for the set origin and then submit the + * set origin to that. This avoids the wait for and is probably + * what we should do. + */ + if(!auto_wait_disabled) + slonik_wait_config_caughtup(adminfo1,&stmt->hdr,-1); + slon_mkquery(&query,"select count(*) FROM \"_%s\".sl_subscribe " \ "where sub_set=%d AND sub_receiver=%d " \ " and sub_active=true and sub_provider<>%d", @@ -3726,14 +4047,17 @@ slonik_subscribe_set(SlonikStmt_subscribe_set * stmt) stmt->sub_setid,stmt->sub_receiver, stmt->sub_provider); - res1 = db_exec_select((SlonikStmt*) stmt,adminfo1,&query); + res1 = db_exec_select(&stmt->hdr,adminfo1,&query); if(res1 == NULL) { dstring_free(&query); return -1; } - if (strtol(PQgetvalue(res1, 0, 0), NULL, 10) > 0) + if(PQntuples(res1) > 0) { - reshape=1; + if (strtol(PQgetvalue(res1, 0, 0), NULL, 10) > 0) + { + reshape=1; + } } PQclear(res1); dstring_reset(&query); @@ -3743,9 +4067,12 @@ slonik_subscribe_set(SlonikStmt_subscribe_set * stmt) " set_id=%d",stmt->hdr.script->clustername, stmt->sub_setid); res1 = db_exec_select((SlonikStmt*)stmt,adminfo1,&query); - if(res1==NULL) + if(res1==NULL || PQntuples(res1) <= 0 ) { - PQclear(res1); + printf("%s:%d error: can not determine set origin for set %d\n", + stmt->hdr.stmt_filename,stmt->hdr.stmt_lno,stmt->sub_setid); + if(res1 != NULL) + PQclear(res1); dstring_free(&query); return -1; @@ -3772,7 +4099,8 @@ slonik_subscribe_set(SlonikStmt_subscribe_set * stmt) stmt->sub_receiver, (stmt->sub_forward) ? "t" : "f", (stmt->omit_copy) ?
"t" : "f"); - if (db_exec_evcommand((SlonikStmt *) stmt, adminfo2, &query) < 0) + if (slonik_submitEvent((SlonikStmt *) stmt, adminfo2, &query, + stmt->hdr.script,auto_wait_disabled) < 0) { dstring_free(&query); return -1; @@ -3824,7 +4152,8 @@ slonik_unsubscribe_set(SlonikStmt_unsubscribe_set * stmt) stmt->hdr.script->clustername, stmt->hdr.script->clustername, stmt->sub_setid, stmt->sub_receiver); - if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0) + if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query, + stmt->hdr.script,auto_wait_disabled) < 0) { dstring_free(&query); return -1; @@ -3970,7 +4299,8 @@ slonik_move_set(SlonikStmt_move_set * stmt) stmt->hdr.script->clustername, stmt->hdr.script->clustername, stmt->set_id, stmt->new_origin); - if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0) + if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query, + stmt->hdr.script,auto_wait_disabled) < 0) { dstring_free(&query); return -1; @@ -4033,7 +4363,8 @@ slonik_ddl_script(SlonikStmt_ddl_script * stmt) stmt->ddl_setid, /* dstring_data(&script), */ stmt->only_on_node); - if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0) + if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query, + stmt->hdr.script,auto_wait_disabled) < 0) { dstring_free(&query); return -1; @@ -4206,6 +4537,9 @@ slonik_wait_event(SlonikStmt_wait_event * stmt) time_t now; int all_confirmed = 0; char seqbuf[64]; + int loop_count=0; + SlonDString outstanding_nodes; + int tupindex; adminfo1 = get_active_adminfo((SlonikStmt *) stmt, stmt->wait_on); if (adminfo1 == NULL) @@ -4214,7 +4548,7 @@ slonik_wait_event(SlonikStmt_wait_event * stmt) time(&timeout); timeout += stmt->wait_timeout; dstring_init(&query); - + dstring_init(&outstanding_nodes); while (!all_confirmed) { all_confirmed = 1; @@ -4288,7 +4622,21 @@ slonik_wait_event(SlonikStmt_wait_event * stmt) return -1; } if (PQntuples(res) > 0) - all_confirmed = 0; + { + all_confirmed = 0; +
dstring_reset(&outstanding_nodes); + for(tupindex=0; tupindex < PQntuples(res); tupindex++) + { + char * node = PQgetvalue(res,tupindex,0); + char * last_event = PQgetvalue(res,tupindex,1); + if(PQgetisnull(res,tupindex,1)) + last_event="null"; + slon_appendquery(&outstanding_nodes,"%snode %s only on event %s" + , tupindex==0 ? "" : ", " + , node,last_event); + + } + } PQclear(res); if (!all_confirmed) @@ -4310,9 +4658,27 @@ slonik_wait_event(SlonikStmt_wait_event * stmt) return -1; } + loop_count++; + if(loop_count % 10 == 0 && stmt->wait_confirmed >= 0) + { + sprintf(seqbuf, INT64_FORMAT, adminfo->last_event); + printf("%s:%d: waiting for event (%d,%s) to be confirmed on node %d\n" + ,stmt->hdr.stmt_filename,stmt->hdr.stmt_lno + ,stmt->wait_origin,seqbuf, + stmt->wait_confirmed); + } + else if (loop_count % 10 ==0 ) + { + sprintf(seqbuf, INT64_FORMAT, adminfo->last_event); + printf("%s:%d: waiting for event (%d,%s). %s\n", + stmt->hdr.stmt_filename,stmt->hdr.stmt_lno, + stmt->wait_origin,seqbuf, + dstring_data(&outstanding_nodes)); + + } sleep(1); } - + dstring_free(&outstanding_nodes); dstring_free(&query); return 0; @@ -4371,7 +4737,8 @@ slonik_sync(SlonikStmt_sync * stmt) stmt->hdr.script->clustername, stmt->hdr.script->clustername, stmt->hdr.script->clustername); - if (db_exec_evcommand((SlonikStmt *) stmt, adminfo1, &query) < 0) + if (slonik_submitEvent((SlonikStmt *) stmt, adminfo1, &query, + stmt->hdr.script,1) < 0) { dstring_free(&query); return -1; @@ -4501,11 +4868,11 @@ replace_token(char *resout, char *lines, const char *token, const char *replacem } /** - * checks all nodes (that an admin conninfo exists for) - * to find the next available table id. - * - * - */ +* checks all nodes (that an admin conninfo exists for) +* to find the next available table id.
+* +* +*/ static int slonik_get_next_tab_id(SlonikStmt * stmt) { @@ -4515,7 +4882,7 @@ slonik_get_next_tab_id(SlonikStmt * stmt) int tab_id=0; char * tab_id_str; PGresult* res; - + dstring_init(&query); slon_mkquery(&query, "select max(tab_id) FROM \"_%s\".sl_table", @@ -4541,13 +4908,13 @@ slonik_get_next_tab_id(SlonikStmt * stmt) res = db_exec_select((SlonikStmt*)stmt,adminfo,&query); if(res == NULL ) { - printf("%s:%d: Error:could not query node %d for next table id", - stmt->stmt_filename,stmt->stmt_lno, - adminfo->no_id); - if( res != NULL) - PQclear(res); - dstring_terminate(&query); - return -1; + printf("%s:%d: Error:could not query node %d for next table id\n", + stmt->stmt_filename,stmt->stmt_lno, + adminfo->no_id); + if( res != NULL) + PQclear(res); + dstring_terminate(&query); + return -1; } } else @@ -4561,7 +4928,7 @@ slonik_get_next_tab_id(SlonikStmt * stmt) { tab_id_str = PQgetvalue(res,0,0); if(tab_id_str != NULL) - tab_id=strtol(tab_id_str,NULL,10); + tab_id=strtol(tab_id_str,NULL,10); else { PQclear(res); @@ -4579,11 +4946,11 @@ slonik_get_next_tab_id(SlonikStmt * stmt) /** - * checks all nodes (that an admin conninfo exists for) - * to find the next available sequence id. - * - * - */ +* checks all nodes (that an admin conninfo exists for) +* to find the next available sequence id. +* +* +*/ static int slonik_get_next_sequence_id(SlonikStmt * stmt) { @@ -4594,7 +4961,7 @@ slonik_get_next_sequence_id(SlonikStmt * stmt) char * seq_id_str; PGresult* res; int rc; - + dstring_init(&query); slon_mkquery(&query, "select max(seq_id) FROM \"_%s\".sl_sequence", @@ -4642,7 +5009,7 @@ slonik_get_next_sequence_id(SlonikStmt * stmt) { seq_id_str = PQgetvalue(res,0,0); if(seq_id_str != NULL) - seq_id=strtol(seq_id_str,NULL,10); + seq_id=strtol(seq_id_str,NULL,10); else { PQclear(res); @@ -4658,18 +5025,18 @@ slonik_get_next_sequence_id(SlonikStmt * stmt) } /** - * find the origin node for a particular set.
- * This function will query the first admin node it - * finds to determine the origin of the set. - * - * If the node doesn't know about the set then - * it will query the next admin node until it finds - * one that does. - * - */ +* find the origin node for a particular set. +* This function will query the first admin node it +* finds to determine the origin of the set. +* +* If the node doesn't know about the set then +* it will query the next admin node until it finds +* one that does. +* +*/ static int find_origin(SlonikStmt * stmt,int set_id) { - + SlonikAdmInfo *adminfo_def; SlonDString query; PGresult * res; @@ -4679,7 +5046,7 @@ static int find_origin(SlonikStmt * stmt,int set_id) slon_mkquery(&query, "select set_origin from \"_%s\".\"sl_set\" where set_id=%d", stmt->script->clustername,set_id); - + for (adminfo_def = stmt->script->adminfo_list; adminfo_def; adminfo_def = adminfo_def->next) { @@ -4700,14 +5067,14 @@ static int find_origin(SlonikStmt * stmt,int set_id) origin_id_str = PQgetvalue(res,0,0); if(origin_id_str != NULL) { - origin_id=strtol(origin_id_str,NULL,10); - PQclear(res); + origin_id=strtol(origin_id_str,NULL,10); + PQclear(res); } else { PQclear(res); continue; - + } } if(origin_id >= 0) @@ -4716,22 +5083,22 @@ static int find_origin(SlonikStmt * stmt,int set_id) dstring_terminate(&query); - + return origin_id; } /** - * adds any sequences that table_name depends on to the replication - * set. - * - * - * - */ +* adds any sequences that table_name depends on to the replication +* set.
+* +* +* +*/ int slonik_add_dependent_sequences(SlonikStmt_set_add_table *stmt, - SlonikAdmInfo * adminfo1, - const char * table_name) + SlonikAdmInfo * adminfo1, + const char * table_name) { SlonDString query; @@ -4740,7 +5107,7 @@ slonik_add_dependent_sequences(SlonikStmt_set_add_table *stmt, const char * seq_name; char * comment; int rc; - + dstring_init(&query); slon_mkquery(&query, "select pg_get_serial_sequence('%s',column_name) " @@ -4755,7 +5122,7 @@ slonik_add_dependent_sequences(SlonikStmt_set_add_table *stmt, } for(idx=0; idx < PQntuples(result);idx++) { - + if(!PQgetisnull(result,idx,0) ) { seq_name=PQgetvalue(result,idx,0); @@ -4765,9 +5132,9 @@ slonik_add_dependent_sequences(SlonikStmt_set_add_table *stmt, - comment=malloc(strlen(table_name)+strlen("sequence for"+1)); + comment=malloc(strlen(table_name)+strlen("sequence for ")+1); sprintf(comment,"sequence for %s",table_name); rc=slonik_set_add_single_sequence((SlonikStmt*)stmt,adminfo1, - seq_name, - stmt->set_id, - comment,-1); + seq_name, + stmt->set_id, + comment,-1); free(comment); if(rc < 0 ) { @@ -4775,33 +5142,33 @@ slonik_add_dependent_sequences(SlonikStmt_set_add_table *stmt, dstring_terminate(&query); return rc; } - + } }/*for*/ PQclear(result); dstring_terminate(&query); return 0; - + } /** - * checks to see if slony is installed on the given node. - * - * this function will check to see if slony tables exist - * on the node by querying the information_schema. - * - * returns: - * -1 => could not query information schema - * 0 => slony not installed - * 1 => slony is installed. - */ +* checks to see if slony is installed on the given node. +* +* this function will check to see if slony tables exist +* on the node by querying the information_schema. +* +* returns: +* -1 => could not query information schema +* 0 => slony not installed +* 1 => slony is installed.
+*/ static int slonik_is_slony_installed(SlonikStmt * stmt, - SlonikAdmInfo * adminfo) + SlonikAdmInfo * adminfo) { - SlonDString query; + SlonDString query; PGresult * res; dstring_init(&query); int rc=-1; @@ -4818,10 +5185,354 @@ slonik_is_slony_installed(SlonikStmt * stmt, if(res != NULL) PQclear(res); + db_rollback_xact(stmt, adminfo); dstring_terminate(&query); return rc; + +} + + +static int slonik_submitEvent(SlonikStmt * stmt, + SlonikAdmInfo * adminfo, + SlonDString * query, + SlonikScript * script, + int suppress_wait_for) + +{ + int rc; + if ( last_event_node >= 0 && + last_event_node != adminfo->no_id + && ! suppress_wait_for ) + { + /** + * the last event node is not the current event node. + * time to wait. + */ + + if( current_try_level != 0) + { + printf("%s:%d Error: the event origin can not be changed " + "inside of a try block\n", + stmt->stmt_filename, stmt->stmt_lno); + return -1; + } + + /** + * for now we generate a 'fake' Slonik_wait_event structure + * + */ + SlonikStmt_wait_event wait_event; + wait_event.hdr=*stmt; + wait_event.wait_origin=last_event_node; + wait_event.wait_on=last_event_node; + wait_event.wait_confirmed=adminfo->no_id; + wait_event.wait_timeout=0; + rc = slonik_wait_event(&wait_event); + if(rc < 0) + return rc; + + } + rc= db_exec_evcommand(stmt,adminfo,query); + if(! suppress_wait_for) + last_event_node=adminfo->no_id; + return rc; + +} + +/** + * + * query all nodes we have admin conninfo data for and + * find the last event id (matching event_filter) generated from that node. + * + * store the results in the *events array so they can later
+ * + */ +static size_t slonik_get_last_event_id(SlonikStmt *stmt, + SlonikScript * script, + const char * event_filter, + int64 ** events) +{ + + SlonDString query; + PGresult * result; + char * event_id; + SlonikAdmInfo * curAdmInfo=NULL; + int node_count=0; + int node_idx; + int rc; + + dstring_init(&query); + slon_mkquery(&query,"select max(ev_seqno) FROM \"_%s\".sl_event" + " , \"_%s\".sl_node " + " where ev_origin=\"_%s\".getLocalNodeId('_%s') " + " AND %s AND sl_node.no_id=" + " ev_origin" + , script->clustername,script->clustername, + script->clustername,script->clustername,event_filter); + node_count=0; + for( curAdmInfo = script->adminfo_list; + curAdmInfo != NULL; curAdmInfo = curAdmInfo->next) + { + node_count++; + } + *events = malloc(sizeof(int64)*(node_count+1)); + node_idx=0; + for( curAdmInfo = script->adminfo_list; + curAdmInfo != NULL; curAdmInfo = curAdmInfo->next,node_idx++) + { + SlonikAdmInfo * activeAdmInfo = + get_active_adminfo(stmt,curAdmInfo->no_id); + if( activeAdmInfo == NULL) + { + /** + * warning? + */ + continue; + } + rc = slonik_is_slony_installed(stmt,activeAdmInfo); + if(rc == 1) + { + result = db_exec_select(stmt,activeAdmInfo,&query); + if(result == NULL || PQntuples(result) != 1 ) + { + printf("error: unable to query event history on node %d\n", + curAdmInfo->no_id); + if(result != NULL) + PQclear(result); + return -1; + } + event_id = PQgetvalue(result,0,0); + db_rollback_xact(stmt, activeAdmInfo); + if(event_id != NULL) + (*events)[node_idx]=strtoll(event_id,NULL,10); + else + (*events)[node_idx]=-1; + PQclear(result); + } + else { + (*events)[node_idx]=-1; + } + + } + + + dstring_terminate(&query); + return node_count; +} + +/** + * waits until adminfo1 is caught up with config events from + * all other nodes. 
+ * + * adminfo1 - The node that we are waiting to be caught up + * stmt - The statement that is currently being executed + * ignore_node - allows 1 node to be ignored (don't wait for + * adminfo1 to be caught up with that node) + * -1 means don't ignore any nodes. + */ +static int slonik_wait_config_caughtup(SlonikAdmInfo * adminfo1, + SlonikStmt * stmt, + int ignore_node) +{ + SlonDString event_list; + PGresult * result=NULL; + SlonikAdmInfo * curAdmInfo=NULL; + int first_event=1; + int confirm_count=0; + SlonDString is_caughtup_query; + SlonDString node_list; + int wait_count=0; + int node_list_size=0; + int sleep_count=0; + int64* behind_nodes=NULL; + int idx; + int cur_array_idx; + + /** + * an array that stores a node_id, last_event. + * or the last event seen for each admin conninfo + * node. + */ + int64 * last_event_array=NULL; + + + dstring_init(&event_list); + dstring_init(&node_list); + + if( current_try_level != 0) + { + printf("%s:%d Error: waiting operation not allowed " + "inside of a try block\n", + stmt->stmt_filename, stmt->stmt_lno); + dstring_terminate(&event_list); dstring_terminate(&node_list); return -1; + } + + for( curAdmInfo = stmt->script->adminfo_list; + curAdmInfo != NULL; curAdmInfo = curAdmInfo->next) + { + node_list_size++; + } + last_event_array = malloc(node_list_size * sizeof(int64)*2); + memset(last_event_array,0,node_list_size * sizeof(int64)*2); + + for( curAdmInfo = stmt->script->adminfo_list; + curAdmInfo != NULL; curAdmInfo = curAdmInfo->next) + { + if(curAdmInfo->last_event < 0 || + curAdmInfo->no_id==adminfo1->no_id || + curAdmInfo->no_id == ignore_node ) + continue; + + char seqno[64]; + sprintf(seqno,INT64_FORMAT,curAdmInfo->last_event); + slon_appendquery(&event_list, + "%s (node_list.no_id=%d)" + ,first_event ? " " : " OR " + ,curAdmInfo->no_id + ,seqno + ); + slon_appendquery(&node_list,"%s (%d) ", + first_event ?
" " : ",", + curAdmInfo->no_id); + last_event_array[wait_count*2]=curAdmInfo->no_id; + last_event_array[wait_count*2+1]=curAdmInfo->last_event; + first_event=0; + wait_count++; + } + + + + dstring_init(&is_caughtup_query); + /** + * I need a row for the case where a node is not in sl_confirm + * and the node is disabled or deleted. + */ + slon_mkquery(&is_caughtup_query, + "select node_list.no_id,max(con_seqno),no_active FROM " + " (VALUES %s) as node_list (no_id) LEFT JOIN " + "\"_%s\".sl_confirm ON(sl_confirm.con_origin=node_list.no_id" + " AND sl_confirm.con_received=%d)" + " LEFT JOIN \"_%s\".sl_node ON (node_list.no_id=sl_node.no_id) " + "GROUP BY node_list.no_id,no_active" + ,dstring_data(&node_list) + ,stmt->script->clustername + ,adminfo1->no_id + ,stmt->script->clustername); + + while(confirm_count != wait_count) + { + result = db_exec_select(stmt, + adminfo1,&is_caughtup_query); + if (result == NULL) + { + printf("%s:%d: error: unable to query confirmations on node %d\n", + stmt->stmt_filename, stmt->stmt_lno, adminfo1->no_id); + db_rollback_xact(stmt, adminfo1); free(last_event_array); dstring_terminate(&event_list); dstring_terminate(&node_list); dstring_terminate(&is_caughtup_query); return -1; + } + confirm_count = PQntuples(result); + + db_rollback_xact(stmt, adminfo1); + + /** + * find nodes that are missing. + * + */ + behind_nodes=malloc(node_list_size * sizeof(int64)); + memset(behind_nodes,0,node_list_size*sizeof(int64)); + confirm_count=0; + for(idx = 0; idx < PQntuples(result); idx++) + { + char * n_id_c = PQgetvalue(result,idx,0); + int n_id = atoi(n_id_c); + char * seqno_c = PQgetvalue(result,idx,1); + int64 seqno=strtoll(seqno_c,NULL,10); + char * node_active = PQgetvalue(result,idx,2); + for(cur_array_idx=0; + cur_array_idx < wait_count; cur_array_idx++) + { + if(last_event_array[cur_array_idx*2]==n_id) + { + /* + * found.
+ */ + if(PQgetisnull(result,idx,2) || *node_active=='f') + { + /** + * if node_active is null we assume the + * node has been deleted since it + * has no entry in sl_node + */ + behind_nodes[cur_array_idx]=-1; + confirm_count++; + } + else if(last_event_array[cur_array_idx*2+1]>seqno) + { + behind_nodes[cur_array_idx]=seqno; + } + else + { + behind_nodes[cur_array_idx]=-1; + confirm_count++; + } + + } + + } + }/*for .. PQntuples*/ + if(confirm_count < wait_count ) + { + sleep_count++; + if(sleep_count % 10 == 0) + { + /** + * any elements in behind_nodes with a value 0 + * means that the corresponding node id in + * last_event_array is not showing up in the + * query result. + */ + SlonDString outstanding; + dstring_init(&outstanding); + first_event=1; + for(cur_array_idx=0; cur_array_idx < wait_count; + cur_array_idx++) + { + if(behind_nodes[cur_array_idx] >= 0) + { + char tmpbuf[96]; + sprintf(tmpbuf, "(" INT64_FORMAT "," INT64_FORMAT + ") only at (" INT64_FORMAT "," INT64_FORMAT + ")" + , + last_event_array[cur_array_idx*2] + ,last_event_array[cur_array_idx*2+1], + last_event_array[cur_array_idx*2], + behind_nodes[cur_array_idx] ); + slon_appendquery(&outstanding,"%s %s" + , first_event ? "" : ",",tmpbuf); + first_event=0; + } + + } + printf("waiting for events %s to be confirmed on node %d\n", + dstring_data(&outstanding),adminfo1->no_id); + dstring_terminate(&outstanding); + + }/* every 10 iterations */ + sleep(1); + } + free(behind_nodes); PQclear(result); result=NULL; + + }/*while*/ + if(result != NULL) + PQclear(result); + dstring_terminate(&event_list); + dstring_terminate(&is_caughtup_query); dstring_terminate(&node_list); + free(last_event_array); + return 0; + +} + -} /* * Local Variables: -- 1.7.0.4