Update runSaltCommand command to support batch
- ability to manage batch size with SALT_MASTER_OPT_WORKER_THREADS
env variable even if batch not specified directly;
- contains parsing of local_batch client responses;
- Add batch option to most popular functions which lead to
runSaltCommand use batches;
Related: PROD-27850 (PROD:27850)
Related: PROD-32646 (PROD:32646)
Change-Id: I0d4a1195285c35503474097d7dafd38f5bbb9ac3
(cherry picked from commit f79769598651e2a1cda8ca836e1a244ea4b8f94d)
diff --git a/src/com/mirantis/mk/Salt.groovy b/src/com/mirantis/mk/Salt.groovy
index 0a9c78e..c31e6a8 100644
--- a/src/com/mirantis/mk/Salt.groovy
+++ b/src/com/mirantis/mk/Salt.groovy
@@ -1,6 +1,5 @@
package com.mirantis.mk
-import com.cloudbees.groovy.cps.NonCPS
import java.util.stream.Collectors
/**
* Salt functions
@@ -50,14 +49,16 @@
* data: ['expression': 'I@openssh:server', 'type': 'compound'])
* @param function Function to execute (eg. "state.sls")
* @param batch Batch param to salt (integer or string with percents)
+ * - null - automatic decision (based on number of worker threads env var or not use batch at all)
+ * - int - fixed size of batch
+ * - 'str%' - percantage of the requests in one batch
* @param args Additional arguments to function
* @param kwargs Additional key-value arguments to function
* @param timeout Additional argument salt api timeout
* @param read_timeout http session read timeout
*/
-@NonCPS
-def runSaltCommand(saltId, client, target, function, batch = null, args = null, kwargs = null, timeout = -1, read_timeout = -1) {
+def runSaltCommand(saltId, client, target, function, batch = null, args = null, kwargs = null, timeout = -1, read_timeout = -1) {
data = [
'tgt': target.expression,
'fun': function,
@@ -65,9 +66,14 @@
'expr_form': target.type,
]
- if(batch != null){
+ if (batch) {
batch = batch.toString()
- if( (batch.isInteger() && batch.toInteger() > 0) || (batch.contains("%"))){
+ } else if (env.getEnvironment().containsKey('SALT_MASTER_OPT_WORKER_THREADS')) {
+ batch = env['SALT_MASTER_OPT_WORKER_THREADS'].toString()
+ }
+
+ if (batch instanceof String) {
+ if ((batch.isInteger() && batch.toInteger() > 0) || (batch.matches(/(\d){1,2}%/))){
data['client']= "local_batch"
data['batch'] = batch
}
@@ -85,6 +91,7 @@
data['timeout'] = timeout
}
+ def result = [:]
// Command will be sent using HttpRequest
if (saltId instanceof HashMap && saltId.containsKey("authToken") ) {
@@ -93,13 +100,22 @@
]
def http = new com.mirantis.mk.Http()
- return http.sendHttpPostRequest("${saltId.url}/", data, headers, read_timeout)
+ result = http.sendHttpPostRequest("${saltId.url}/", data, headers, read_timeout)
} else if (saltId instanceof HashMap) {
throw new Exception("Invalid saltId")
+ } else {
+ // Command will be sent using Pepper
+ result = runPepperCommand(data, saltId)
}
- // Command will be sent using Pepper
- return runPepperCommand(data, saltId)
+ // Convert returned Object to the same structure as from 'local' client to keep compatibility
+ if (data['client'].equals('local_batch')) {
+ def resultMap = ['return': [[:]]]
+ result['return'].each { it -> resultMap['return'][0] = it + resultMap['return'][0] }
+ return resultMap
+ } else {
+ return result
+ }
}
/**
@@ -107,13 +123,14 @@
* @param saltId Salt Connection object or pepperEnv (the command will be sent using the selected method)
* @param target Get pillar target
* @param pillar pillar name (optional)
+ * @param batch Batch param to salt (integer or string with percents)
* @return output of salt command
*/
-def getPillar(saltId, target, pillar = null) {
+def getPillar(saltId, target, pillar = null, batch = null) {
if (pillar != null) {
- return runSaltCommand(saltId, 'local', ['expression': target, 'type': 'compound'], 'pillar.get', null, [pillar.replace('.', ':')])
+ return runSaltCommand(saltId, 'local', ['expression': target, 'type': 'compound'], 'pillar.get', batch, [pillar.replace('.', ':')])
} else {
- return runSaltCommand(saltId, 'local', ['expression': target, 'type': 'compound'], 'pillar.data')
+ return runSaltCommand(saltId, 'local', ['expression': target, 'type': 'compound'], 'pillar.data', batch)
}
}
@@ -122,13 +139,14 @@
* @param saltId Salt Connection object or pepperEnv (the command will be sent using the selected method)
* @param target Get grain target
* @param grain grain name (optional)
+ * @param batch Batch param to salt (integer or string with percents)
* @return output of salt command
*/
-def getGrain(saltId, target, grain = null) {
+def getGrain(saltId, target, grain = null, batch = null) {
if(grain != null) {
- return runSaltCommand(saltId, 'local', ['expression': target, 'type': 'compound'], 'grains.item', null, [grain])
+ return runSaltCommand(saltId, 'local', ['expression': target, 'type': 'compound'], 'grains.item', batch, [grain])
} else {
- return runSaltCommand(saltId, 'local', ['expression': target, 'type': 'compound'], 'grains.items')
+ return runSaltCommand(saltId, 'local', ['expression': target, 'type': 'compound'], 'grains.items', batch)
}
}
@@ -137,10 +155,11 @@
* @param saltId Salt Connection object or pepperEnv (the command will be sent using the selected method)
* @param target Get grain target
* @param config grain name (optional)
+ * @param batch Batch param to salt (integer or string with percents)
* @return output of salt command
*/
-def getConfig(saltId, target, config) {
- return runSaltCommand(saltId, 'local', ['expression': target, 'type': 'compound'], 'config.get', null, [config.replace('.', ':')], '--out=json')
+def getConfig(saltId, target, config, batch = null) {
+ return runSaltCommand(saltId, 'local', ['expression': target, 'type': 'compound'], 'config.get', batch, [config.replace('.', ':')], '--out=json')
}
/**
@@ -207,7 +226,7 @@
if (!params.testTargetMatcher) {
params.testTargetMatcher = params.target
}
- if (testTarget(params.saltId, params.testTargetMatcher)) {
+ if (testTarget(params.saltId, params.testTargetMatcher, params.batch)) {
return enforceState(params)
} else {
if (!params.optional) {
@@ -268,7 +287,7 @@
kwargs["queue"] = true
}
- if (params.optional == false || testTarget(params.saltId, params.target)){
+ if (params.optional == false || testTarget(params.saltId, params.target, params.batch)){
if (params.retries > 0){
def retriesCounter = 0
retry(params.retries){
@@ -426,7 +445,7 @@
* @return output of salt command
*/
def minionsPresent(saltId, target = 'I@salt:master', target_minions = '', waitUntilPresent = true, batch=null, output = true, maxRetries = 200, answers = 1) {
- def target_hosts = getMinionsSorted(saltId, target_minions)
+ def target_hosts = getMinionsSorted(saltId, target_minions, batch)
for (t in target_hosts) {
def tgt = stripDomainName(t)
minionPresent(saltId, target, tgt, waitUntilPresent, batch, output, maxRetries, answers)
@@ -517,11 +536,12 @@
def retries = config.get('retries', 10)
def timeout = config.get('timeout', 5)
def checkAvailability = config.get('availability', true)
+ def batch = config.get('batch', null)
common.retry(retries, wait) {
if (checkAvailability) {
- minionsReachable(saltId, 'I@salt:master', target_reachable)
+ minionsReachable(saltId, 'I@salt:master', target_reachable, batch)
}
- def running = runSaltProcessStep(saltId, target, 'saltutil.running', [], null, true, timeout)
+ def running = runSaltProcessStep(saltId, target, 'saltutil.running', [], batch, true, timeout)
for (value in running.get("return")[0].values()) {
if (value != []) {
throw new Exception("Not all salt-minions are ready for execution")
@@ -653,10 +673,11 @@
* Perform complete salt sync between master and target
* @param saltId Salt Connection object or pepperEnv (the command will be sent using the selected method)
* @param target Get pillar target
+ * @param batch Batch param to salt (integer or string with percents)
* @return output of salt command
*/
-def syncAll(saltId, target) {
- return runSaltCommand(saltId, 'local', ['expression': target, 'type': 'compound'], 'saltutil.sync_all')
+def syncAll(saltId, target, batch = null) {
+ return runSaltCommand(saltId, 'local', ['expression': target, 'type': 'compound'], 'saltutil.sync_all', batch)
}
/**
@@ -664,12 +685,13 @@
* Method will call saltutil.refresh_pillar, saltutil.refresh_grains and saltutil.sync_all
* @param saltId Salt Connection object or pepperEnv (the command will be sent using the selected method)
* @param target Get pillar target
+ * @param batch Batch param to salt (integer or string with percents)
* @return output of salt command
*/
-def fullRefresh(saltId, target){
- runSaltProcessStep(saltId, target, 'saltutil.refresh_pillar', [], null, true)
- runSaltProcessStep(saltId, target, 'saltutil.refresh_grains', [], null, true)
- runSaltProcessStep(saltId, target, 'saltutil.sync_all', [], null, true)
+def fullRefresh(saltId, target, batch=20){
+ runSaltProcessStep(saltId, target, 'saltutil.refresh_pillar', [], batch, true)
+ runSaltProcessStep(saltId, target, 'saltutil.refresh_grains', [], batch, true)
+ runSaltProcessStep(saltId, target, 'saltutil.sync_all', [], batch, true)
}
/**
@@ -710,10 +732,11 @@
* Get running minions IDs according to the target
* @param saltId Salt Connection object or pepperEnv (the command will be sent using the selected method)
* @param target Get minions target
+ * @param batch Batch param to salt (integer or string with percents)
* @return list of active minions fitin
*/
-def getMinions(saltId, target) {
- def minionsRaw = runSaltCommand(saltId, 'local', ['expression': target, 'type': 'compound'], 'test.ping')
+def getMinions(saltId, target, batch = null) {
+ def minionsRaw = runSaltCommand(saltId, 'local', ['expression': target, 'type': 'compound'], 'test.ping', batch)
return new ArrayList<String>(minionsRaw['return'][0].keySet())
}
@@ -721,20 +744,22 @@
* Get sorted running minions IDs according to the target
* @param saltId Salt Connection object or pepperEnv
* @param target Get minions target
+ * @param batch Batch param to salt (integer or string with percents)
* @return list of sorted active minions fitin
*/
-def getMinionsSorted(saltId, target) {
- return getMinions(saltId, target).sort()
+def getMinionsSorted(saltId, target, batch = null) {
+ return getMinions(saltId, target, batch).sort()
}
/**
* Get first out of running minions IDs according to the target
* @param saltId Salt Connection object or pepperEnv
* @param target Get minions target
+ * @param batch Batch param to salt (integer or string with percents)
* @return first of active minions fitin
*/
-def getFirstMinion(saltId, target) {
- def minionsSorted = getMinionsSorted(saltId, target)
+def getFirstMinion(saltId, target, batch = null) {
+ def minionsSorted = getMinionsSorted(saltId, target, batch)
return minionsSorted[0]
}
@@ -742,10 +767,11 @@
* Get running salt minions IDs without it's domain name part and its numbering identifications
* @param saltId Salt Connection object or pepperEnv
* @param target Get minions target
+ * @param batch Batch param to salt (integer or string with percents)
* @return list of active minions fitin without it's domain name part name numbering
*/
-def getMinionsGeneralName(saltId, target) {
- def minionsSorted = getMinionsSorted(saltId, target)
+def getMinionsGeneralName(saltId, target, batch = null) {
+ def minionsSorted = getMinionsSorted(saltId, target, batch)
return stripDomainName(minionsSorted[0]).replaceAll('\\d+$', "")
}
@@ -773,7 +799,7 @@
* @return Return values of a salt command
*/
def getReturnValues(output) {
- if(output.containsKey("return") && !output.get("return").isEmpty()) {
+ if(output && output.containsKey("return") && !output.get("return").isEmpty()) {
return output['return'][0].values()[0]
}
def common = new com.mirantis.mk.Common()
@@ -816,11 +842,12 @@
* Test if there are any minions to target
* @param saltId Salt Connection object or pepperEnv (the command will be sent using the selected method)
* @param target Target to test
+ * @param batch Batch param to salt (integer or string with percents)
* @return bool indicating if target was succesful
*/
-def testTarget(saltId, target) {
- return getMinions(saltId, target).size() > 0
+def testTarget(saltId, target, batch = null) {
+ return getMinions(saltId, target, batch).size() > 0
}
/**
@@ -829,10 +856,11 @@
* @param target Key generating target
* @param host Key generating host
* @param keysize generated key size (optional, default 4096)
+ * @param batch Batch param to salt (integer or string with percents)
* @return output of salt command
*/
-def generateNodeKey(saltId, target, host, keysize = 4096) {
- return runSaltCommand(saltId, 'wheel', target, 'key.gen_accept', [host], ['keysize': keysize])
+def generateNodeKey(saltId, target, host, keysize = 4096, batch = null) {
+ return runSaltCommand(saltId, 'wheel', target, 'key.gen_accept', batch, [host], ['keysize': keysize])
}
/**
@@ -842,10 +870,11 @@
* @param host Metadata generating host
* @param classes Reclass classes
* @param parameters Reclass parameters
+ * @param batch Batch param to salt (integer or string with percents)
* @return output of salt command
*/
-def generateNodeMetadata(saltId, target, host, classes, parameters) {
- return runSaltCommand(saltId, 'local', target, 'reclass.node_create', [host, '_generated'], ['classes': classes, 'parameters': parameters])
+def generateNodeMetadata(saltId, target, host, classes, parameters, batch = null) {
+ return runSaltCommand(saltId, 'local', target, 'reclass.node_create', batch, [host, '_generated'], ['classes': classes, 'parameters': parameters])
}
/**
@@ -854,14 +883,15 @@
* @param target Orchestration target
* @param orchestrate Salt orchestrate params
* @param kwargs Salt orchestrate params
+ * @param batch Batch param to salt (integer or string with percents)
* @return output of salt command
*/
-def orchestrateSystem(saltId, target, orchestrate=[], kwargs = null) {
+def orchestrateSystem(saltId, target, orchestrate=[], kwargs = null, batch = null) {
//Since the runSaltCommand uses "arg" (singular) for "runner" client this won`t work correctly on old salt 2016
//cause this version of salt used "args" (plural) for "runner" client, see following link for reference:
//https://github.com/saltstack/salt/pull/32938
def common = new com.mirantis.mk.Common()
- def result = runSaltCommand(saltId, 'runner', target, 'state.orchestrate', true, orchestrate, kwargs, 7200, 7200)
+ def result = runSaltCommand(saltId, 'runner', target, 'state.orchestrate', batch, orchestrate, kwargs, 7200, 7200)
if(result != null){
if(result['return']){
def retcode = result['return'][0].get('retcode')
@@ -884,21 +914,21 @@
* @param saltId Salt Connection object or pepperEnv (the command will be sent using the selected method)
* @param pillar_tree Reclass pillar that has orchestrate pillar for desired stage
* @param extra_tgt Extra targets for compound
- *
+ * @param batch Batch param to salt (integer or string with percents)
* @return output of salt command
*/
-def orchestratePrePost(saltId, pillar_tree, extra_tgt = '') {
+def orchestratePrePost(saltId, pillar_tree, extra_tgt = '', batch = null) {
def common = new com.mirantis.mk.Common()
def salt = new com.mirantis.mk.Salt()
def compound = 'I@' + pillar_tree + " " + extra_tgt
common.infoMsg("Refreshing pillars")
- runSaltProcessStep(saltId, '*', 'saltutil.refresh_pillar', [], null, true)
+ runSaltProcessStep(saltId, '*', 'saltutil.refresh_pillar', [], batch, true)
common.infoMsg("Looking for orchestrate pillars")
- if (salt.testTarget(saltId, compound)) {
- for ( node in salt.getMinionsSorted(saltId, compound) ) {
+ if (salt.testTarget(saltId, compound, batch)) {
+ for ( node in salt.getMinionsSorted(saltId, compound, batch) ) {
def pillar = salt.getPillar(saltId, node, pillar_tree)
if ( !pillar['return'].isEmpty() ) {
for ( orch_id in pillar['return'][0].values() ) {
@@ -906,7 +936,7 @@
def orch_enabled = orch_id.values()['enabled']
if ( orch_enabled ) {
common.infoMsg("Orchestrating: ${orchestrator}")
- salt.printSaltCommandResult(salt.orchestrateSystem(saltId, ['expression': node], [orchestrator]))
+ salt.printSaltCommandResult(salt.orchestrateSystem(saltId, ['expression': node], [orchestrator], null, batch))
}
}
}
@@ -932,11 +962,8 @@
def out
common.infoMsg("Running step ${fun} ${arg} on ${tgt}")
-
- if (batch == true) {
- out = runSaltCommand(saltId, 'local_batch', ['expression': tgt, 'type': 'compound'], fun, String.valueOf(batch), arg, kwargs, timeout)
- } else if (async == true) {
- out = runSaltCommand(saltId, 'local_async', ['expression': tgt, 'type': 'compound'], fun, batch, arg, kwargs, timeout)
+ if (async == true) {
+ out = runSaltCommand(saltId, 'local_async', ['expression': tgt, 'type': 'compound'], fun, null, arg, kwargs, timeout)
} else {
out = runSaltCommand(saltId, 'local', ['expression': tgt, 'type': 'compound'], fun, batch, arg, kwargs, timeout)
}
@@ -1232,10 +1259,11 @@
* @param saltId Salt Connection object or pepperEnv (the command will be sent using the selected method)
* @param target Targeted nodes to be checked
* @param diff Maximum time difference (in seconds) to be accepted during time sync check
+* @param batch Batch param to salt (integer or string with percents)
* @return bool Return true if time difference is <= diff and returns false if time difference is > diff
*/
-def checkClusterTimeSync(saltId, target) {
+def checkClusterTimeSync(saltId, target, batch = null) {
def common = new com.mirantis.mk.Common()
def salt = new com.mirantis.mk.Salt()
@@ -1247,7 +1275,7 @@
} else {
diff = 5
}
- out = salt.runSaltProcessStep(saltId, target, 'status.time', '%s')
+ out = salt.runSaltProcessStep(saltId, target, 'status.time', '%s', batch)
outParsed = out['return'][0]
def outKeySet = outParsed.keySet()
for (key in outKeySet) {