Support Salt batch sizing for most important pipelines

Add parameter BATCH_SIZE to support Batch sizing for Salt to be
applied on huge amount of nodes.

Change-Id: I1547df928990098a7969b5535ca611d7fb6cc581
Related: PROD-27850 (PROD:27850)
Related: PROD-32646 (PROD:32646)
diff --git a/cloud-deploy-pipeline.groovy b/cloud-deploy-pipeline.groovy
index 496ef90..4159655 100644
--- a/cloud-deploy-pipeline.groovy
+++ b/cloud-deploy-pipeline.groovy
@@ -46,6 +46,10 @@
  *   SALT_VERSION               Version of Salt  which is going to be installed i.e. 'stable 2016.3' or 'stable 2017.7' etc.
  *
  *   EXTRA_TARGET               The value will be added to target nodes
+ *   BATCH_SIZE                 Use batching for states, which may be targeted for huge amount of nodes. Format:
+                                - 10 - number of nodes
+                                - 10% - percentage of all targeted nodes
+
  *
  * Test settings:
  *   TEST_K8S_API_SERVER     Kubernetes API address
@@ -105,6 +109,10 @@
 if (common.validInputParam('EXTRA_TARGET')) {
     extra_tgt = "${EXTRA_TARGET}"
 }
+def batch_size = ''
+if (common.validInputParam('BATCH_SIZE')) {
+    batch_size = "${BATCH_SIZE}"
+}
 
 timeout(time: 12, unit: 'HOURS') {
     node(slave_node) {
@@ -347,9 +355,15 @@
             //
             // Install
             //
+            if (!batch_size) {
+                def workerThreads = salt.getReturnValues(salt.getPillar(venvPepper, "I@salt:master", "salt:master:worker_threads", null))
+                if (workerThreads.isInteger() && workerThreads.toInteger() > 0) {
+                   batch_size = workerThreads
+                }
+            }
 
             // Check if all minions are reachable and ready
-            salt.checkTargetMinionsReady(['saltId': venvPepper, 'target': '*'])
+            salt.checkTargetMinionsReady(['saltId': venvPepper, 'target': '*', batch: batch_size])
 
             if (common.checkContains('STACK_INSTALL', 'core')) {
                 stage('Install core infrastructure') {
@@ -357,7 +371,7 @@
                     if (common.validInputParam('STATIC_MGMT_NETWORK')) {
                         staticMgmtNetwork = STATIC_MGMT_NETWORK.toBoolean()
                     }
-                    orchestrate.installFoundationInfra(venvPepper, staticMgmtNetwork, extra_tgt)
+                    orchestrate.installFoundationInfra(venvPepper, staticMgmtNetwork, extra_tgt, batch_size)
 
                     if (common.checkContains('STACK_INSTALL', 'kvm')) {
                         orchestrate.installInfraKvm(venvPepper, extra_tgt)
@@ -540,7 +554,7 @@
                 }
 
                 stage('Install OpenStack compute') {
-                    orchestrate.installOpenstackCompute(venvPepper, extra_tgt)
+                    orchestrate.installOpenstackCompute(venvPepper, extra_tgt, batch_size)
 
                     if (common.checkContains('STACK_INSTALL', 'contrail')) {
                         orchestrate.installContrailCompute(venvPepper, extra_tgt)
@@ -660,7 +674,7 @@
                     def gluster_compound = "I@glusterfs:server ${extra_tgt}"
                     def salt_ca_compound = "I@salt:minion:ca:salt_master_ca ${extra_tgt}"
                     // Enforce highstate asynchronous only on the nodes which are not glusterfs servers
-                    salt.enforceHighstate(venvPepper, '* and not ' + gluster_compound + ' and not ' + salt_ca_compound)
+                    salt.enforceHighstate(venvPepper, '* and not ' + gluster_compound + ' and not ' + salt_ca_compound, batch_size)
                     // Iterate over nonempty set of gluster servers and apply highstates one by one
                     // TODO: switch to batch once salt 2017.7+ would be used
                     def saltcaMinions = salt.getMinionsSorted(venvPepper, salt_ca_compound)
diff --git a/openstack-compute-install.groovy b/openstack-compute-install.groovy
index 581168a..780beac 100644
--- a/openstack-compute-install.groovy
+++ b/openstack-compute-install.groovy
@@ -5,6 +5,7 @@
  *   SALT_MASTER_CREDENTIALS    Credentials to the Salt API.
  *   SALT_MASTER_URL            Full Salt API address [https://10.10.10.1:8000].
  *   TARGET_SERVERS             Salt compound target to match nodes to be updated [*, G@osfamily:debian].
+ *   BATCH_SIZE                 Use batching for large amount of target nodes
  *
 **/
 
@@ -18,6 +19,11 @@
 def command
 def commandKwargs
 
+def batch_size = ''
+if (common.validInputParam('BATCH_SIZE')) {
+    batch_size = "${BATCH_SIZE}"
+}
+
 timeout(time: 12, unit: 'HOURS') {
     node() {
         try {
@@ -43,76 +49,76 @@
                     common.infoMsg("First node %nodename% has trusty")
                     common.infoMsg("Assuming trusty on all cluster, running extra network states...")
                     common.infoMsg("Network iteration #1. Bonding")
-                    salt.enforceState(pepperEnv, targetLiveAll, 'linux.network', true)
+                    salt.enforceState(pepperEnv, targetLiveAll, 'linux.network', true, true, batch_size)
                     common.infoMsg("Network iteration #2. Vlan tagging and bridging")
-                    salt.enforceState(pepperEnv, targetLiveAll, 'linux.network', true)
+                    salt.enforceState(pepperEnv, targetLiveAll, 'linux.network', true, true, batch_size)
                 }
             }
 
             stage("Setup repositories") {
-                salt.enforceState(pepperEnv, targetLiveAll, 'linux.system.repo', true)
+                salt.enforceState(pepperEnv, targetLiveAll, 'linux.system.repo', true, true, batch_size)
             }
 
             stage("Upgrade packages") {
-                salt.runSaltProcessStep(pepperEnv, targetLiveAll, 'pkg.upgrade', [], null, true)
+                salt.runSaltProcessStep(pepperEnv, targetLiveAll, 'pkg.upgrade', [], batch_size, true)
             }
 
             stage("Update Hosts file") {
-                salt.enforceState(pepperEnv, "I@linux:system", 'linux.network.host', true)
+                salt.enforceState(pepperEnv, "I@linux:system", 'linux.network.host', true, true, batch_size)
             }
 
             stage("Setup networking") {
                 // Sync all of the modules from the salt master.
-                salt.syncAll(pepperEnv, targetLiveAll)
+                salt.syncAll(pepperEnv, targetLiveAll, batch_size)
 
                 // Apply state 'salt' to install python-psutil for network configuration without restarting salt-minion to avoid losing connection.
-                salt.runSaltProcessStep(pepperEnv, targetLiveAll, 'state.apply',  ['salt', 'exclude=[{\'id\': \'salt_minion_service\'}, {\'id\': \'salt_minion_service_restart\'}, {\'id\': \'salt_minion_sync_all\'}]'], null, true)
+                salt.runSaltProcessStep(pepperEnv, targetLiveAll, 'state.apply',  ['salt', 'exclude=[{\'id\': \'salt_minion_service\'}, {\'id\': \'salt_minion_service_restart\'}, {\'id\': \'salt_minion_sync_all\'}]'], batch_size, true)
 
                 // Restart salt-minion to take effect.
-                salt.runSaltProcessStep(pepperEnv, targetLiveAll, 'service.restart', ['salt-minion'], null, true, 10)
+                salt.runSaltProcessStep(pepperEnv, targetLiveAll, 'service.restart', ['salt-minion'], batch_size, true, 10)
 
                 // Configure networking excluding vhost0 interface.
-                salt.runSaltProcessStep(pepperEnv, targetLiveAll, 'state.apply',  ['linux.network', 'exclude=[{\'id\': \'linux_interface_vhost0\'}]'], null, true)
+                salt.runSaltProcessStep(pepperEnv, targetLiveAll, 'state.apply',  ['linux.network', 'exclude=[{\'id\': \'linux_interface_vhost0\'}]'], batch_size, true)
 
                 // Kill unnecessary processes ifup/ifdown which is stuck from previous state linux.network.
-                salt.runSaltProcessStep(pepperEnv, targetLiveAll, 'ps.pkill', ['ifup'], null, false)
-                salt.runSaltProcessStep(pepperEnv, targetLiveAll, 'ps.pkill', ['ifdown'], null, false)
+                salt.runSaltProcessStep(pepperEnv, targetLiveAll, 'ps.pkill', ['ifup'], batch_size, false)
+                salt.runSaltProcessStep(pepperEnv, targetLiveAll, 'ps.pkill', ['ifdown'], batch_size, false)
 
                 // Restart networking to bring UP all interfaces.
-                salt.runSaltProcessStep(pepperEnv, targetLiveAll, 'service.restart', ['networking'], null, true, 300)
+                salt.runSaltProcessStep(pepperEnv, targetLiveAll, 'service.restart', ['networking'], batch_size, true, 300)
             }
 
             stage("Highstate compute") {
                 // Execute highstate without state opencontrail.client.
                 common.retry(2){
-                    salt.runSaltProcessStep(pepperEnv, targetLiveAll, 'state.highstate', ['exclude=opencontrail.client'], null, true)
+                    salt.runSaltProcessStep(pepperEnv, targetLiveAll, 'state.highstate', ['exclude=opencontrail.client'], batch_size, true)
                 }
 
                 // Apply nova state to remove libvirt default bridge virbr0.
-                salt.enforceState(pepperEnv, targetLiveAll, 'nova', true)
+                salt.enforceState(pepperEnv, targetLiveAll, 'nova', true, true, batch_size)
 
                 // Execute highstate.
-                salt.enforceHighstate(pepperEnv, targetLiveAll, true)
+                salt.enforceHighstate(pepperEnv, targetLiveAll, true, true, batch_size)
 
                 // Restart supervisor-vrouter.
-                salt.runSaltProcessStep(pepperEnv, targetLiveAll, 'service.restart', ['supervisor-vrouter'], null, true, 300)
+                salt.runSaltProcessStep(pepperEnv, targetLiveAll, 'service.restart', ['supervisor-vrouter'], batch_size, true, 300)
 
                 // Apply salt and collectd if is present to update information about current network interfaces.
-                salt.enforceState(pepperEnv, targetLiveAll, 'salt', true)
+                salt.enforceState(pepperEnv, targetLiveAll, 'salt', true, true, batch_size)
                 if(!salt.getPillar(pepperEnv, minions[0], "collectd")['return'][0].values()[0].isEmpty()) {
-                    salt.enforceState(pepperEnv, targetLiveAll, 'collectd', true)
+                    salt.enforceState(pepperEnv, targetLiveAll, 'collectd', true, true, batch_size)
                 }
             }
 
         stage("Update/Install monitoring") {
             //Collect Grains
-            salt.enforceState(pepperEnv, targetLiveAll, 'salt.minion.grains')
-            salt.runSaltProcessStep(pepperEnv, targetLiveAll, 'saltutil.refresh_modules')
-            salt.runSaltProcessStep(pepperEnv, targetLiveAll, 'mine.update')
+            salt.enforceState(pepperEnv, targetLiveAll, 'salt.minion.grains', true, true, batch_size)
+            salt.runSaltProcessStep(pepperEnv, targetLiveAll, 'saltutil.refresh_modules', [], batch_size)
+            salt.runSaltProcessStep(pepperEnv, targetLiveAll, 'mine.update', [], batch_size)
             sleep(5)
 
-            salt.enforceState(pepperEnv, targetLiveAll, 'prometheus')
-            salt.enforceState(pepperEnv, 'I@prometheus:server', 'prometheus')
+            salt.enforceState(pepperEnv, targetLiveAll, 'prometheus', true, true, batch_size)
+            salt.enforceState(pepperEnv, 'I@prometheus:server', 'prometheus', true, true, batch_size)
         }
 
         } catch (Throwable e) {
diff --git a/update-package.groovy b/update-package.groovy
index df7655b..851c376 100644
--- a/update-package.groovy
+++ b/update-package.groovy
@@ -6,6 +6,7 @@
  *   SALT_MASTER_URL            Full Salt API address [https://10.10.10.1:8000].
  *   TARGET_SERVERS             Salt compound target to match nodes to be updated [*, G@osfamily:debian].
  *   TARGET_PACKAGES            Space delimited list of packages to be updates [package1=version package2=version], empty string means all updating all packages to the latest version.
+ *   BATCH_SIZE                 Use batching for large amount of target nodes
  *
 **/
 
@@ -13,14 +14,19 @@
 salt = new com.mirantis.mk.Salt()
 common = new com.mirantis.mk.Common()
 
-def installSaltStack(target, pkgs, masterUpdate = false){
+def batch_size = ''
+if (common.validInputParam('BATCH_SIZE')) {
+    batch_size = "${BATCH_SIZE}"
+}
+
+def installSaltStack(target, pkgs, batch, masterUpdate = false){
     salt.cmdRun(pepperEnv, "I@salt:master", "salt -C '${target}' --async pkg.install force_yes=True pkgs='$pkgs'")
     def minions_reachable = target
     if (masterUpdate) {
         // in case of update Salt Master packages - check all minions are good
         minions_reachable = '*'
     }
-    salt.checkTargetMinionsReady(['saltId': pepperEnv, 'target': target, 'target_reachable': minions_reachable])
+    salt.checkTargetMinionsReady(['saltId': pepperEnv, 'target': target, 'target_reachable': minions_reachable, 'batch': batch])
 }
 
 timeout(time: 12, unit: 'HOURS') {
@@ -46,7 +52,7 @@
 
             stage("List package upgrades") {
                 common.infoMsg("Listing all the packages that have a new update available on nodes: ${targetLiveAll}")
-                salt.runSaltProcessStep(pepperEnv, targetLiveAll, 'pkg.list_upgrades', [], null, true)
+                salt.runSaltProcessStep(pepperEnv, targetLiveAll, 'pkg.list_upgrades', [], batch_size, true)
                 if (TARGET_PACKAGES != '' && TARGET_PACKAGES != '*') {
                     common.warningMsg("Note that only the \"${TARGET_PACKAGES}\" would be installed from the above list of available updates on the ${targetLiveAll}")
                     command = "pkg.install"
@@ -68,9 +74,9 @@
                     for (int i = 0; i < saltTargets.size(); i++ ) {
                         common.retry(10, 5) {
                             if (salt.getMinions(pepperEnv, "I@salt:master and ${saltTargets[i]}")) {
-                                installSaltStack("I@salt:master and ${saltTargets[i]}", '["salt-master", "salt-common", "salt-api", "salt-minion"]', true)
+                                installSaltStack("I@salt:master and ${saltTargets[i]}", '["salt-master", "salt-common", "salt-api", "salt-minion"]', null, true)
                             } else if (salt.getMinions(pepperEnv, "I@salt:minion and not I@salt:master and ${saltTargets[i]}")) {
-                                installSaltStack("I@salt:minion and not I@salt:master and ${saltTargets[i]}", '["salt-minion"]')
+                                installSaltStack("I@salt:minion and not I@salt:master and ${saltTargets[i]}", '["salt-minion"]', batch_size)
                             } else {
                                 error("Minion ${saltTargets[i]} is not reachable!")
                             }
@@ -78,7 +84,7 @@
                     }
                 }
                 common.infoMsg('Starting package upgrades...')
-                out = salt.runSaltCommand(pepperEnv, 'local', ['expression': targetLiveAll, 'type': 'compound'], command, null, packages, commandKwargs)
+                out = salt.runSaltCommand(pepperEnv, 'local', ['expression': targetLiveAll, 'type': 'compound'], command, batch_size, packages, commandKwargs)
                 salt.printSaltCommandResult(out)
                 for(value in out.get("return")[0].values()){
                     if (value.containsKey('result') && value.result == false) {
diff --git a/upgrade-mcp-release.groovy b/upgrade-mcp-release.groovy
index 9b1c89d..32de91e 100644
--- a/upgrade-mcp-release.groovy
+++ b/upgrade-mcp-release.groovy
@@ -8,6 +8,7 @@
  *   DRIVE_TRAIN_PARAMS         Yaml, DriveTrain releated params:
  *     SALT_MASTER_URL            Salt API server location
  *     SALT_MASTER_CREDENTIALS    Credentials to the Salt API
+ *     BATCH_SIZE                 Use batch sizing during upgrade for large envs
  *     UPGRADE_SALTSTACK          Upgrade SaltStack packages to new version.
  *     UPDATE_CLUSTER_MODEL       Update MCP version parameter in cluster model
  *     UPDATE_PIPELINES           Update pipeline repositories on Gerrit
@@ -280,6 +281,7 @@
             def updateLocalRepos = ''
             def reclassSystemBranch = ''
             def reclassSystemBranchDefault = gitTargetMcpVersion
+            def batchSize = ''
             if (gitTargetMcpVersion != 'proposed') {
                 reclassSystemBranchDefault = "origin/${gitTargetMcpVersion}"
             }
@@ -293,6 +295,7 @@
                 updatePipelines = driteTrainParams.get('UPDATE_PIPELINES', false).toBoolean()
                 updateLocalRepos = driteTrainParams.get('UPDATE_LOCAL_REPOS', false).toBoolean()
                 reclassSystemBranch = driteTrainParams.get('RECLASS_SYSTEM_BRANCH', reclassSystemBranchDefault)
+                batchSize = driveTrainParams.get('BATCH_SIZE', '')
             } else {
                 // backward compatibility for 2018.11.0
                 saltMastURL = env.getProperty('SALT_MASTER_URL')
@@ -309,6 +312,12 @@
             if (cluster_name == '' || cluster_name == 'null' || cluster_name == null) {
                 error('Pillar data is broken for Salt master node! Please check it manually and re-run pipeline.')
             }
+            if (!batch_size) {
+                def workerThreads = salt.getReturnValues(salt.getPillar(venvPepper, "I@salt:master", "salt:master:worker_threads", null))
+                if (workerThreads.isInteger() && workerThreads.toInteger() > 0) {
+                   batch_size = workerThreads
+                }
+            }
 
             stage('Update Reclass and Salt-Formulas') {
                 common.infoMsg('Perform: Full salt sync')
@@ -512,7 +521,7 @@
                 if (upgradeSaltStack) {
                     updateSaltStack("I@salt:master", '["salt-master", "salt-common", "salt-api", "salt-minion"]')
 
-                    salt.enforceState(venvPepper, "I@linux:system", 'linux.system.repo', true)
+                    salt.enforceState(venvPepper, "I@linux:system", 'linux.system.repo', true, true, batchSize)
                     updateSaltStack("I@salt:minion and not I@salt:master", '["salt-minion"]')
                 }
 
@@ -523,13 +532,13 @@
                 }
 
                 // update minions certs
-                salt.enforceState(venvPepper, "I@salt:minion", 'salt.minion.cert', true)
+                salt.enforceState(venvPepper, "I@salt:minion", 'salt.minion.cert', true, true, batchSize)
 
                 // Retry needed only for rare race-condition in user appearance
                 common.infoMsg('Perform: updating users and keys')
-                salt.enforceState(venvPepper, "I@linux:system", 'linux.system.user', true)
+                salt.enforceState(venvPepper, "I@linux:system", 'linux.system.user', true, true, batchSize)
                 common.infoMsg('Perform: updating openssh')
-                salt.enforceState(venvPepper, "I@linux:system", 'openssh', true)
+                salt.enforceState(venvPepper, "I@linux:system", 'openssh', true, true, batchSize)
 
                 // Apply changes for HaProxy on CI/CD nodes
                 salt.enforceState(venvPepper, 'I@keepalived:cluster:instance:cicd_control_vip and I@haproxy:proxy', 'haproxy.proxy', true)