root/lib/pacemaker/pcmk_sched_ordering.c

/* [previous][next][first][last][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. invert_action
  2. get_ordering_type
  3. get_ordering_symmetry
  4. ordering_flags_for_kind
  5. get_ordering_resource
  6. get_minimum_first_instances
  7. clone_min_ordering
  8. inverse_ordering
  9. unpack_simple_rsc_order
  10. pcmk__new_ordering
  11. unpack_order_set
  12. order_rsc_sets
  13. unpack_order_tags
  14. pcmk__unpack_ordering
  15. ordering_is_invalid
  16. pcmk__disable_invalid_orderings
  17. pcmk__order_stops_before_shutdown
  18. find_actions_by_task
  19. order_resource_actions_after
  20. rsc_order_first
  21. pcmk__apply_orderings
  22. pcmk__order_after_each
  23. pcmk__promotable_restart_ordering

   1 /*
   2  * Copyright 2004-2022 the Pacemaker project contributors
   3  *
   4  * The version control history for this file may have further details.
   5  *
   6  * This source code is licensed under the GNU General Public License version 2
   7  * or later (GPLv2+) WITHOUT ANY WARRANTY.
   8  */
   9 
  10 #include <crm_internal.h>
  11 
  12 #include <inttypes.h>               // PRIx32
  13 #include <stdbool.h>
  14 #include <glib.h>
  15 
  16 #include <crm/crm.h>
  17 #include <pacemaker-internal.h>
  18 #include "libpacemaker_private.h"
  19 
  20 enum pe_order_kind {
  21     pe_order_kind_optional,
  22     pe_order_kind_mandatory,
  23     pe_order_kind_serialize,
  24 };
  25 
  26 enum ordering_symmetry {
  27     ordering_asymmetric,        // the only relation in an asymmetric ordering
  28     ordering_symmetric,         // the normal relation in a symmetric ordering
  29     ordering_symmetric_inverse, // the inverse relation in a symmetric ordering
  30 };
  31 
  32 #define EXPAND_CONSTRAINT_IDREF(__set, __rsc, __name) do {                      \
  33         __rsc = pcmk__find_constraint_resource(data_set->resources, __name);    \
  34         if (__rsc == NULL) {                                                    \
  35             pcmk__config_err("%s: No resource found for %s", __set, __name);    \
  36             return pcmk_rc_unpack_error;                                        \
  37         }                                                                       \
  38     } while (0)
  39 
  40 static const char *
  41 invert_action(const char *action)
     /* [previous][next][first][last][top][bottom][index][help] */
  42 {
  43     if (pcmk__str_eq(action, RSC_START, pcmk__str_casei)) {
  44         return RSC_STOP;
  45 
  46     } else if (pcmk__str_eq(action, RSC_STOP, pcmk__str_casei)) {
  47         return RSC_START;
  48 
  49     } else if (pcmk__str_eq(action, RSC_PROMOTE, pcmk__str_casei)) {
  50         return RSC_DEMOTE;
  51 
  52     } else if (pcmk__str_eq(action, RSC_DEMOTE, pcmk__str_casei)) {
  53         return RSC_PROMOTE;
  54 
  55     } else if (pcmk__str_eq(action, RSC_PROMOTED, pcmk__str_casei)) {
  56         return RSC_DEMOTED;
  57 
  58     } else if (pcmk__str_eq(action, RSC_DEMOTED, pcmk__str_casei)) {
  59         return RSC_PROMOTED;
  60 
  61     } else if (pcmk__str_eq(action, RSC_STARTED, pcmk__str_casei)) {
  62         return RSC_STOPPED;
  63 
  64     } else if (pcmk__str_eq(action, RSC_STOPPED, pcmk__str_casei)) {
  65         return RSC_STARTED;
  66     }
  67     crm_warn("Unknown action '%s' specified in order constraint", action);
  68     return NULL;
  69 }
  70 
  71 static enum pe_order_kind
  72 get_ordering_type(xmlNode *xml_obj)
     /* [previous][next][first][last][top][bottom][index][help] */
  73 {
  74     enum pe_order_kind kind_e = pe_order_kind_mandatory;
  75     const char *kind = crm_element_value(xml_obj, XML_ORDER_ATTR_KIND);
  76 
  77     if (kind == NULL) {
  78         const char *score = crm_element_value(xml_obj, XML_RULE_ATTR_SCORE);
  79 
  80         kind_e = pe_order_kind_mandatory;
  81 
  82         if (score) {
  83             // @COMPAT deprecated informally since 1.0.7, formally since 2.0.1
  84             int score_i = char2score(score);
  85 
  86             if (score_i == 0) {
  87                 kind_e = pe_order_kind_optional;
  88             }
  89             pe_warn_once(pe_wo_order_score,
  90                          "Support for 'score' in rsc_order is deprecated "
  91                          "and will be removed in a future release "
  92                          "(use 'kind' instead)");
  93         }
  94 
  95     } else if (pcmk__str_eq(kind, "Mandatory", pcmk__str_casei)) {
  96         kind_e = pe_order_kind_mandatory;
  97 
  98     } else if (pcmk__str_eq(kind, "Optional", pcmk__str_casei)) {
  99         kind_e = pe_order_kind_optional;
 100 
 101     } else if (pcmk__str_eq(kind, "Serialize", pcmk__str_casei)) {
 102         kind_e = pe_order_kind_serialize;
 103 
 104     } else {
 105         pcmk__config_err("Resetting '" XML_ORDER_ATTR_KIND "' for constraint "
 106                          "%s to 'Mandatory' because '%s' is not valid",
 107                          pcmk__s(ID(xml_obj), "missing ID"), kind);
 108     }
 109     return kind_e;
 110 }
 111 
 112 /*!
 113  * \internal
 114  * \brief Get ordering symmetry from XML
 115  *
 116  * \param[in] xml_obj               Ordering XML
 117  * \param[in] parent_kind           Default ordering kind
 118  * \param[in] parent_symmetrical_s  Parent element's symmetrical setting, if any
 119  *
 120  * \retval ordering_symmetric   Ordering is symmetric
 121  * \retval ordering_asymmetric  Ordering is asymmetric
 122  */
 123 static enum ordering_symmetry
 124 get_ordering_symmetry(xmlNode *xml_obj, enum pe_order_kind parent_kind,
     /* [previous][next][first][last][top][bottom][index][help] */
 125                       const char *parent_symmetrical_s)
 126 {
 127     int rc = pcmk_rc_ok;
 128     bool symmetric = false;
 129     enum pe_order_kind kind = parent_kind; // Default to parent's kind
 130 
 131     // Check ordering XML for explicit kind
 132     if ((crm_element_value(xml_obj, XML_ORDER_ATTR_KIND) != NULL)
 133         || (crm_element_value(xml_obj, XML_RULE_ATTR_SCORE) != NULL)) {
 134         kind = get_ordering_type(xml_obj);
 135     }
 136 
 137     // Check ordering XML (and parent) for explicit symmetrical setting
 138     rc = pcmk__xe_get_bool_attr(xml_obj, XML_CONS_ATTR_SYMMETRICAL, &symmetric);
 139 
 140     if (rc != pcmk_rc_ok && parent_symmetrical_s != NULL) {
 141         symmetric = crm_is_true(parent_symmetrical_s);
 142         rc = pcmk_rc_ok;
 143     }
 144 
 145     if (rc == pcmk_rc_ok) {
 146         if (symmetric) {
 147             if (kind == pe_order_kind_serialize) {
 148                 pcmk__config_warn("Ignoring " XML_CONS_ATTR_SYMMETRICAL
 149                                   " for '%s' because not valid with "
 150                                   XML_ORDER_ATTR_KIND " of 'Serialize'",
 151                                   ID(xml_obj));
 152             } else {
 153                 return ordering_symmetric;
 154             }
 155         }
 156         return ordering_asymmetric;
 157     }
 158 
 159     // Use default symmetry
 160     if (kind == pe_order_kind_serialize) {
 161         return ordering_asymmetric;
 162     }
 163     return ordering_symmetric;
 164 }
 165 
 166 /*!
 167  * \internal
 168  * \brief Get ordering flags appropriate to ordering kind
 169  *
 170  * \param[in] kind      Ordering kind
 171  * \param[in] first     Action name for 'first' action
 172  * \param[in] symmetry  This ordering's symmetry role
 173  *
 174  * \return Minimal ordering flags appropriate to \p kind
 175  */
 176 static uint32_t
 177 ordering_flags_for_kind(enum pe_order_kind kind, const char *first,
     /* [previous][next][first][last][top][bottom][index][help] */
 178                         enum ordering_symmetry symmetry)
 179 {
 180     uint32_t flags = pe_order_none; // so we trace-log all flags set
 181 
 182     pe__set_order_flags(flags, pe_order_optional);
 183 
 184     switch (kind) {
 185         case pe_order_kind_optional:
 186             break;
 187 
 188         case pe_order_kind_serialize:
 189             pe__set_order_flags(flags, pe_order_serialize_only);
 190             break;
 191 
 192         case pe_order_kind_mandatory:
 193             switch (symmetry) {
 194                 case ordering_asymmetric:
 195                     pe__set_order_flags(flags, pe_order_asymmetrical);
 196                     break;
 197 
 198                 case ordering_symmetric:
 199                     pe__set_order_flags(flags, pe_order_implies_then);
 200                     if (pcmk__strcase_any_of(first, RSC_START, RSC_PROMOTE,
 201                                              NULL)) {
 202                         pe__set_order_flags(flags, pe_order_runnable_left);
 203                     }
 204                     break;
 205 
 206                 case ordering_symmetric_inverse:
 207                     pe__set_order_flags(flags, pe_order_implies_first);
 208                     break;
 209             }
 210             break;
 211     }
 212     return flags;
 213 }
 214 
 215 /*!
 216  * \internal
 217  * \brief Find resource corresponding to ID specified in ordering
 218  *
 219  * \param[in] xml            Ordering XML
 220  * \param[in] resource_attr  XML attribute name for resource ID
 221  * \param[in] instance_attr  XML attribute name for instance number.
 222  *                           This option is deprecated and will be removed in a
 223  *                           future release.
 224  * \param[in] data_set       Cluster working set
 225  *
 226  * \return Resource corresponding to \p id, or NULL if none
 227  */
 228 static pe_resource_t *
 229 get_ordering_resource(xmlNode *xml, const char *resource_attr,
     /* [previous][next][first][last][top][bottom][index][help] */
 230                       const char *instance_attr, pe_working_set_t *data_set)
 231 {
 232     // @COMPAT: instance_attr and instance_id variables deprecated since 2.1.5
 233     pe_resource_t *rsc = NULL;
 234     const char *rsc_id = crm_element_value(xml, resource_attr);
 235     const char *instance_id = crm_element_value(xml, instance_attr);
 236 
 237     if (rsc_id == NULL) {
 238         pcmk__config_err("Ignoring constraint '%s' without %s",
 239                          ID(xml), resource_attr);
 240         return NULL;
 241     }
 242 
 243     rsc = pcmk__find_constraint_resource(data_set->resources, rsc_id);
 244     if (rsc == NULL) {
 245         pcmk__config_err("Ignoring constraint '%s' because resource '%s' "
 246                          "does not exist", ID(xml), rsc_id);
 247         return NULL;
 248     }
 249 
 250     if (instance_id != NULL) {
 251         pe_warn_once(pe_wo_order_inst,
 252                      "Support for " XML_ORDER_ATTR_FIRST_INSTANCE " and "
 253                      XML_ORDER_ATTR_THEN_INSTANCE " is deprecated and will be "
 254                      "removed in a future release.");
 255 
 256         if (!pe_rsc_is_clone(rsc)) {
 257             pcmk__config_err("Ignoring constraint '%s' because resource '%s' "
 258                              "is not a clone but instance '%s' was requested",
 259                              ID(xml), rsc_id, instance_id);
 260             return NULL;
 261         }
 262         rsc = find_clone_instance(rsc, instance_id, data_set);
 263         if (rsc == NULL) {
 264             pcmk__config_err("Ignoring constraint '%s' because resource '%s' "
 265                              "does not have an instance '%s'",
 266                              "'%s'", ID(xml), rsc_id, instance_id);
 267             return NULL;
 268         }
 269     }
 270     return rsc;
 271 }
 272 
 273 /*!
 274  * \internal
 275  * \brief Determine minimum number of 'first' instances required in ordering
 276  *
 277  * \param[in] rsc  'First' resource in ordering
 278  * \param[in] xml  Ordering XML
 279  *
 280  * \return Minimum 'first' instances required (or 0 if not applicable)
 281  */
 282 static int
 283 get_minimum_first_instances(pe_resource_t *rsc, xmlNode *xml)
     /* [previous][next][first][last][top][bottom][index][help] */
 284 {
 285     const char *clone_min = NULL;
 286     bool require_all = false;
 287 
 288     if (!pe_rsc_is_clone(rsc)) {
 289         return 0;
 290     }
 291 
 292     clone_min = g_hash_table_lookup(rsc->meta,
 293                                     XML_RSC_ATTR_INCARNATION_MIN);
 294     if (clone_min != NULL) {
 295         int clone_min_int = 0;
 296 
 297         pcmk__scan_min_int(clone_min, &clone_min_int, 0);
 298         return clone_min_int;
 299     }
 300 
 301     /* @COMPAT 1.1.13:
 302      * require-all=false is deprecated equivalent of clone-min=1
 303      */
 304     if (pcmk__xe_get_bool_attr(xml, "require-all", &require_all) != ENODATA) {
 305         pe_warn_once(pe_wo_require_all,
 306                      "Support for require-all in ordering constraints "
 307                      "is deprecated and will be removed in a future release"
 308                      " (use clone-min clone meta-attribute instead)");
 309         if (!require_all) {
 310             return 1;
 311         }
 312     }
 313 
 314     return 0;
 315 }
 316 
 317 /*!
 318  * \internal
 319  * \brief Create orderings for a constraint with clone-min > 0
 320  *
 321  * \param[in] id            Ordering ID
 322  * \param[in] rsc_first     'First' resource in ordering (a clone)
 323  * \param[in] action_first  'First' action in ordering
 324  * \param[in] rsc_then      'Then' resource in ordering
 325  * \param[in] action_then   'Then' action in ordering
 326  * \param[in] flags         Ordering flags
 327  * \param[in] clone_min     Minimum required instances of 'first'
 328  * \param[in] data_set      Cluster working set
 329  */
 330 static void
 331 clone_min_ordering(const char *id,
     /* [previous][next][first][last][top][bottom][index][help] */
 332                    pe_resource_t *rsc_first, const char *action_first,
 333                    pe_resource_t *rsc_then, const char *action_then,
 334                    uint32_t flags, int clone_min, pe_working_set_t *data_set)
 335 {
 336     // Create a pseudo-action for when the minimum instances are active
 337     char *task = crm_strdup_printf(CRM_OP_RELAXED_CLONE ":%s", id);
 338     pe_action_t *clone_min_met = get_pseudo_op(task, data_set);
 339 
 340     free(task);
 341 
 342     /* Require the pseudo-action to have the required number of actions to be
 343      * considered runnable before allowing the pseudo-action to be runnable.
 344      */
 345     clone_min_met->required_runnable_before = clone_min;
 346     pe__set_action_flags(clone_min_met, pe_action_requires_any);
 347 
 348     // Order the actions for each clone instance before the pseudo-action
 349     for (GList *rIter = rsc_first->children; rIter != NULL;
 350          rIter = rIter->next) {
 351 
 352         pe_resource_t *child = rIter->data;
 353 
 354         pcmk__new_ordering(child, pcmk__op_key(child->id, action_first, 0),
 355                            NULL, NULL, NULL, clone_min_met,
 356                            pe_order_one_or_more|pe_order_implies_then_printed,
 357                            data_set);
 358     }
 359 
 360     // Order "then" action after the pseudo-action (if runnable)
 361     pcmk__new_ordering(NULL, NULL, clone_min_met, rsc_then,
 362                        pcmk__op_key(rsc_then->id, action_then, 0),
 363                        NULL, flags|pe_order_runnable_left, data_set);
 364 }
 365 
 366 /*!
 367  * \internal
 368  * \brief Update ordering flags for restart-type=restart
 369  *
 370  * \param[in]  rsc    'Then' resource in ordering
 371  * \param[in]  kind   Ordering kind
 372  * \param[in]  flag   Ordering flag to set (when applicable)
 373  * \param[out] flags  Ordering flag set to update
 374  *
 375  * \compat The restart-type resource meta-attribute is deprecated. Eventually,
 376  *         it will be removed, and pe_restart_ignore will be the only behavior,
 377  *         at which time this can just be removed entirely.
 378  */
 379 #define handle_restart_type(rsc, kind, flag, flags) do {        \
 380         if (((kind) == pe_order_kind_optional)                  \
 381             && ((rsc)->restart_type == pe_restart_restart)) {   \
 382             pe__set_order_flags((flags), (flag));               \
 383         }                                                       \
 384     } while (0)
 385 
 386 /*!
 387  * \internal
 388  * \brief Create new ordering for inverse of symmetric constraint
 389  *
 390  * \param[in] id            Ordering ID (for logging only)
 391  * \param[in] kind          Ordering kind
 392  * \param[in] rsc_first     'First' resource in ordering (a clone)
 393  * \param[in] action_first  'First' action in ordering
 394  * \param[in] rsc_then      'Then' resource in ordering
 395  * \param[in] action_then   'Then' action in ordering
 396  * \param[in] data_set      Cluster working set
 397  */
 398 static void
 399 inverse_ordering(const char *id, enum pe_order_kind kind,
     /* [previous][next][first][last][top][bottom][index][help] */
 400                  pe_resource_t *rsc_first, const char *action_first,
 401                  pe_resource_t *rsc_then, const char *action_then,
 402                  pe_working_set_t *data_set)
 403 {
 404     action_then = invert_action(action_then);
 405     action_first = invert_action(action_first);
 406     if ((action_then == NULL) || (action_first == NULL)) {
 407         pcmk__config_warn("Cannot invert constraint '%s' "
 408                           "(please specify inverse manually)", id);
 409     } else {
 410         uint32_t flags = ordering_flags_for_kind(kind, action_first,
 411                                                  ordering_symmetric_inverse);
 412 
 413         handle_restart_type(rsc_then, kind, pe_order_implies_first, flags);
 414         pcmk__order_resource_actions(rsc_then, action_then, rsc_first,
 415                                      action_first, flags);
 416     }
 417 }
 418 
 419 static void
 420 unpack_simple_rsc_order(xmlNode *xml_obj, pe_working_set_t *data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
 421 {
 422     pe_resource_t *rsc_then = NULL;
 423     pe_resource_t *rsc_first = NULL;
 424     int min_required_before = 0;
 425     enum pe_order_kind kind = pe_order_kind_mandatory;
 426     uint32_t cons_weight = pe_order_none;
 427     enum ordering_symmetry symmetry;
 428 
 429     const char *action_then = NULL;
 430     const char *action_first = NULL;
 431     const char *id = NULL;
 432 
 433     CRM_CHECK(xml_obj != NULL, return);
 434 
 435     id = crm_element_value(xml_obj, XML_ATTR_ID);
 436     if (id == NULL) {
 437         pcmk__config_err("Ignoring <%s> constraint without " XML_ATTR_ID,
 438                          crm_element_name(xml_obj));
 439         return;
 440     }
 441 
 442     rsc_first = get_ordering_resource(xml_obj, XML_ORDER_ATTR_FIRST,
 443                                       XML_ORDER_ATTR_FIRST_INSTANCE,
 444                                       data_set);
 445     if (rsc_first == NULL) {
 446         return;
 447     }
 448 
 449     rsc_then = get_ordering_resource(xml_obj, XML_ORDER_ATTR_THEN,
 450                                      XML_ORDER_ATTR_THEN_INSTANCE,
 451                                      data_set);
 452     if (rsc_then == NULL) {
 453         return;
 454     }
 455 
 456     action_first = crm_element_value(xml_obj, XML_ORDER_ATTR_FIRST_ACTION);
 457     if (action_first == NULL) {
 458         action_first = RSC_START;
 459     }
 460 
 461     action_then = crm_element_value(xml_obj, XML_ORDER_ATTR_THEN_ACTION);
 462     if (action_then == NULL) {
 463         action_then = action_first;
 464     }
 465 
 466     kind = get_ordering_type(xml_obj);
 467 
 468     symmetry = get_ordering_symmetry(xml_obj, kind, NULL);
 469     cons_weight = ordering_flags_for_kind(kind, action_first, symmetry);
 470 
 471     handle_restart_type(rsc_then, kind, pe_order_implies_then, cons_weight);
 472 
 473     /* If there is a minimum number of instances that must be runnable before
 474      * the 'then' action is runnable, we use a pseudo-action for convenience:
 475      * minimum number of clone instances have runnable actions ->
 476      * pseudo-action is runnable -> dependency is runnable.
 477      */
 478     min_required_before = get_minimum_first_instances(rsc_first, xml_obj);
 479     if (min_required_before > 0) {
 480         clone_min_ordering(id, rsc_first, action_first, rsc_then, action_then,
 481                            cons_weight, min_required_before, data_set);
 482     } else {
 483         pcmk__order_resource_actions(rsc_first, action_first, rsc_then,
 484                                      action_then, cons_weight);
 485     }
 486 
 487     if (symmetry == ordering_symmetric) {
 488         inverse_ordering(id, kind, rsc_first, action_first,
 489                          rsc_then, action_then, data_set);
 490     }
 491 }
 492 
 493 /*!
 494  * \internal
 495  * \brief Create a new ordering between two actions
 496  *
 497  * \param[in] first_rsc          Resource for 'first' action (if NULL and
 498  *                               \p first_action is a resource action, that
 499  *                               resource will be used)
 500  * \param[in] first_action_task  Action key for 'first' action (if NULL and
 501  *                               \p first_action is not NULL, its UUID will be
 502  *                               used)
 503  * \param[in] first_action       'first' action (if NULL, \p first_rsc and
 504  *                               \p first_action_task must be set)
 505  *
 506  * \param[in] then_rsc           Resource for 'then' action (if NULL and
 507  *                               \p then_action is a resource action, that
 508  *                               resource will be used)
 509  * \param[in] then_action_task   Action key for 'then' action (if NULL and
 510  *                               \p then_action is not NULL, its UUID will be
 511  *                               used)
 512  * \param[in] then_action        'then' action (if NULL, \p then_rsc and
 513  *                               \p then_action_task must be set)
 514  *
 515  * \param[in] type               Flag set of enum pe_ordering
 516  * \param[in] data_set           Cluster working set to add ordering to
 517  *
 518  * \note This function takes ownership of first_action_task and
 519  *       then_action_task, which do not need to be freed by the caller.
 520  */
 521 void
 522 pcmk__new_ordering(pe_resource_t *first_rsc, char *first_action_task,
     /* [previous][next][first][last][top][bottom][index][help] */
 523                    pe_action_t *first_action, pe_resource_t *then_rsc,
 524                    char *then_action_task, pe_action_t *then_action,
 525                    uint32_t flags, pe_working_set_t *data_set)
 526 {
 527     pe__ordering_t *order = NULL;
 528 
 529     // One of action or resource must be specified for each side
 530     CRM_CHECK(((first_action != NULL) || (first_rsc != NULL))
 531               && ((then_action != NULL) || (then_rsc != NULL)),
 532               free(first_action_task); free(then_action_task); return);
 533 
 534     if ((first_rsc == NULL) && (first_action != NULL)) {
 535         first_rsc = first_action->rsc;
 536     }
 537     if ((then_rsc == NULL) && (then_action != NULL)) {
 538         then_rsc = then_action->rsc;
 539     }
 540 
 541     order = calloc(1, sizeof(pe__ordering_t));
 542     CRM_ASSERT(order != NULL);
 543 
 544     order->id = data_set->order_id++;
 545     order->flags = flags;
 546     order->lh_rsc = first_rsc;
 547     order->rh_rsc = then_rsc;
 548     order->lh_action = first_action;
 549     order->rh_action = then_action;
 550     order->lh_action_task = first_action_task;
 551     order->rh_action_task = then_action_task;
 552 
 553     if ((order->lh_action_task == NULL) && (first_action != NULL)) {
 554         order->lh_action_task = strdup(first_action->uuid);
 555     }
 556 
 557     if ((order->rh_action_task == NULL) && (then_action != NULL)) {
 558         order->rh_action_task = strdup(then_action->uuid);
 559     }
 560 
 561     if ((order->lh_rsc == NULL) && (first_action != NULL)) {
 562         order->lh_rsc = first_action->rsc;
 563     }
 564 
 565     if ((order->rh_rsc == NULL) && (then_action != NULL)) {
 566         order->rh_rsc = then_action->rsc;
 567     }
 568 
 569     pe_rsc_trace(first_rsc, "Created ordering %d for %s then %s",
 570                  (data_set->order_id - 1),
 571                  ((first_action_task == NULL)? "?" : first_action_task),
 572                  ((then_action_task == NULL)? "?" : then_action_task));
 573 
 574     data_set->ordering_constraints = g_list_prepend(data_set->ordering_constraints,
 575                                                     order);
 576     pcmk__order_migration_equivalents(order);
 577 }
 578 
 579 /*!
 580  * \brief Unpack a set in an ordering constraint
 581  *
 582  * \param[in]  set                    Set XML to unpack
 583  * \param[in]  parent_kind            rsc_order XML "kind" attribute
 584  * \param[in]  parent_symmetrical_s   rsc_order XML "symmetrical" attribute
 585  * \param[in]  data_set               Cluster working set
 586  *
 587  * \return Standard Pacemaker return code
 588  */
 589 static int
 590 unpack_order_set(xmlNode *set, enum pe_order_kind parent_kind,
     /* [previous][next][first][last][top][bottom][index][help] */
 591                  const char *parent_symmetrical_s, pe_working_set_t *data_set)
 592 {
 593     xmlNode *xml_rsc = NULL;
 594     GList *set_iter = NULL;
 595     GList *resources = NULL;
 596 
 597     pe_resource_t *last = NULL;
 598     pe_resource_t *resource = NULL;
 599 
 600     int local_kind = parent_kind;
 601     bool sequential = false;
 602     uint32_t flags = pe_order_optional;
 603     enum ordering_symmetry symmetry;
 604 
 605     char *key = NULL;
 606     const char *id = ID(set);
 607     const char *action = crm_element_value(set, "action");
 608     const char *sequential_s = crm_element_value(set, "sequential");
 609     const char *kind_s = crm_element_value(set, XML_ORDER_ATTR_KIND);
 610 
 611     if (action == NULL) {
 612         action = RSC_START;
 613     }
 614 
 615     if (kind_s) {
 616         local_kind = get_ordering_type(set);
 617     }
 618     if (sequential_s == NULL) {
 619         sequential_s = "1";
 620     }
 621 
 622     sequential = crm_is_true(sequential_s);
 623 
 624     symmetry = get_ordering_symmetry(set, parent_kind, parent_symmetrical_s);
 625     flags = ordering_flags_for_kind(local_kind, action, symmetry);
 626 
 627     for (xml_rsc = first_named_child(set, XML_TAG_RESOURCE_REF);
 628          xml_rsc != NULL; xml_rsc = crm_next_same_xml(xml_rsc)) {
 629 
 630         EXPAND_CONSTRAINT_IDREF(id, resource, ID(xml_rsc));
 631         resources = g_list_append(resources, resource);
 632     }
 633 
 634     if (pcmk__list_of_1(resources)) {
 635         crm_trace("Single set: %s", id);
 636         goto done;
 637     }
 638 
 639     set_iter = resources;
 640     while (set_iter != NULL) {
 641         resource = (pe_resource_t *) set_iter->data;
 642         set_iter = set_iter->next;
 643 
 644         key = pcmk__op_key(resource->id, action, 0);
 645 
 646         if (local_kind == pe_order_kind_serialize) {
 647             /* Serialize before everything that comes after */
 648 
 649             for (GList *gIter = set_iter; gIter != NULL; gIter = gIter->next) {
 650                 pe_resource_t *then_rsc = (pe_resource_t *) gIter->data;
 651                 char *then_key = pcmk__op_key(then_rsc->id, action, 0);
 652 
 653                 pcmk__new_ordering(resource, strdup(key), NULL, then_rsc,
 654                                    then_key, NULL, flags, data_set);
 655             }
 656 
 657         } else if (sequential) {
 658             if (last != NULL) {
 659                 pcmk__order_resource_actions(last, action, resource, action,
 660                                              flags);
 661             }
 662             last = resource;
 663         }
 664         free(key);
 665     }
 666 
 667     if (symmetry == ordering_asymmetric) {
 668         goto done;
 669     }
 670 
 671     last = NULL;
 672     action = invert_action(action);
 673 
 674     flags = ordering_flags_for_kind(local_kind, action,
 675                                     ordering_symmetric_inverse);
 676 
 677     set_iter = resources;
 678     while (set_iter != NULL) {
 679         resource = (pe_resource_t *) set_iter->data;
 680         set_iter = set_iter->next;
 681 
 682         if (sequential) {
 683             if (last != NULL) {
 684                 pcmk__order_resource_actions(resource, action, last, action,
 685                                              flags);
 686             }
 687             last = resource;
 688         }
 689     }
 690 
 691   done:
 692     g_list_free(resources);
 693     return pcmk_rc_ok;
 694 }
 695 
 696 /*!
 697  * \brief Order two resource sets relative to each other
 698  *
 699  * \param[in] id        Ordering ID (for logging)
 700  * \param[in] set1      First listed set
 701  * \param[in] set2      Second listed set
 702  * \param[in] kind      Ordering kind
 703  * \param[in] data_set  Cluster working set
 704  * \param[in] symmetry  Which ordering symmetry applies to this relation
 705  *
 706  * \return Standard Pacemaker return code
 707  */
 708 static int
 709 order_rsc_sets(const char *id, xmlNode *set1, xmlNode *set2,
     /* [previous][next][first][last][top][bottom][index][help] */
 710                enum pe_order_kind kind, pe_working_set_t *data_set,
 711                enum ordering_symmetry symmetry)
 712 {
 713 
 714     xmlNode *xml_rsc = NULL;
 715     xmlNode *xml_rsc_2 = NULL;
 716 
 717     pe_resource_t *rsc_1 = NULL;
 718     pe_resource_t *rsc_2 = NULL;
 719 
 720     const char *action_1 = crm_element_value(set1, "action");
 721     const char *action_2 = crm_element_value(set2, "action");
 722 
 723     uint32_t flags = pe_order_none;
 724 
 725     bool require_all = true;
 726 
 727     (void) pcmk__xe_get_bool_attr(set1, "require-all", &require_all);
 728 
 729     if (action_1 == NULL) {
 730         action_1 = RSC_START;
 731     }
 732 
 733     if (action_2 == NULL) {
 734         action_2 = RSC_START;
 735     }
 736 
 737     if (symmetry == ordering_symmetric_inverse) {
 738         action_1 = invert_action(action_1);
 739         action_2 = invert_action(action_2);
 740     }
 741 
 742     if (pcmk__str_eq(RSC_STOP, action_1, pcmk__str_casei)
 743         || pcmk__str_eq(RSC_DEMOTE, action_1, pcmk__str_casei)) {
 744         /* Assuming: A -> ( B || C) -> D
 745          * The one-or-more logic only applies during the start/promote phase.
 746          * During shutdown neither B nor can shutdown until D is down, so simply
 747          * turn require_all back on.
 748          */
 749         require_all = true;
 750     }
 751 
 752     // @TODO is action_2 correct here?
 753     flags = ordering_flags_for_kind(kind, action_2, symmetry);
 754 
 755     /* If we have an unordered set1, whether it is sequential or not is
 756      * irrelevant in regards to set2.
 757      */
 758     if (!require_all) {
 759         char *task = crm_strdup_printf(CRM_OP_RELAXED_SET ":%s", ID(set1));
 760         pe_action_t *unordered_action = get_pseudo_op(task, data_set);
 761 
 762         free(task);
 763         pe__set_action_flags(unordered_action, pe_action_requires_any);
 764 
 765         for (xml_rsc = first_named_child(set1, XML_TAG_RESOURCE_REF);
 766              xml_rsc != NULL; xml_rsc = crm_next_same_xml(xml_rsc)) {
 767 
 768             EXPAND_CONSTRAINT_IDREF(id, rsc_1, ID(xml_rsc));
 769 
 770             /* Add an ordering constraint between every element in set1 and the
 771              * pseudo action. If any action in set1 is runnable the pseudo
 772              * action will be runnable.
 773              */
 774             pcmk__new_ordering(rsc_1, pcmk__op_key(rsc_1->id, action_1, 0),
 775                                NULL, NULL, NULL, unordered_action,
 776                                pe_order_one_or_more|pe_order_implies_then_printed,
 777                                data_set);
 778         }
 779         for (xml_rsc_2 = first_named_child(set2, XML_TAG_RESOURCE_REF);
 780              xml_rsc_2 != NULL; xml_rsc_2 = crm_next_same_xml(xml_rsc_2)) {
 781 
 782             EXPAND_CONSTRAINT_IDREF(id, rsc_2, ID(xml_rsc_2));
 783 
 784             /* Add an ordering constraint between the pseudo-action and every
 785              * element in set2. If the pseudo-action is runnable, every action
 786              * in set2 will be runnable.
 787              */
 788             pcmk__new_ordering(NULL, NULL, unordered_action,
 789                                rsc_2, pcmk__op_key(rsc_2->id, action_2, 0),
 790                                NULL, flags|pe_order_runnable_left, data_set);
 791         }
 792 
 793         return pcmk_rc_ok;
 794     }
 795 
 796     if (pcmk__xe_attr_is_true(set1, "sequential")) {
 797         if (symmetry == ordering_symmetric_inverse) {
 798             // Get the first one
 799             xml_rsc = first_named_child(set1, XML_TAG_RESOURCE_REF);
 800             if (xml_rsc != NULL) {
 801                 EXPAND_CONSTRAINT_IDREF(id, rsc_1, ID(xml_rsc));
 802             }
 803 
 804         } else {
 805             // Get the last one
 806             const char *rid = NULL;
 807 
 808             for (xml_rsc = first_named_child(set1, XML_TAG_RESOURCE_REF);
 809                  xml_rsc != NULL; xml_rsc = crm_next_same_xml(xml_rsc)) {
 810 
 811                 rid = ID(xml_rsc);
 812             }
 813             EXPAND_CONSTRAINT_IDREF(id, rsc_1, rid);
 814         }
 815     }
 816 
 817     if (pcmk__xe_attr_is_true(set2, "sequential")) {
 818         if (symmetry == ordering_symmetric_inverse) {
 819             // Get the last one
 820             const char *rid = NULL;
 821 
 822             for (xml_rsc = first_named_child(set2, XML_TAG_RESOURCE_REF);
 823                  xml_rsc != NULL; xml_rsc = crm_next_same_xml(xml_rsc)) {
 824 
 825                 rid = ID(xml_rsc);
 826             }
 827             EXPAND_CONSTRAINT_IDREF(id, rsc_2, rid);
 828 
 829         } else {
 830             // Get the first one
 831             xml_rsc = first_named_child(set2, XML_TAG_RESOURCE_REF);
 832             if (xml_rsc != NULL) {
 833                 EXPAND_CONSTRAINT_IDREF(id, rsc_2, ID(xml_rsc));
 834             }
 835         }
 836     }
 837 
 838     if ((rsc_1 != NULL) && (rsc_2 != NULL)) {
 839         pcmk__order_resource_actions(rsc_1, action_1, rsc_2, action_2, flags);
 840 
 841     } else if (rsc_1 != NULL) {
 842         for (xml_rsc = first_named_child(set2, XML_TAG_RESOURCE_REF);
 843              xml_rsc != NULL; xml_rsc = crm_next_same_xml(xml_rsc)) {
 844 
 845             EXPAND_CONSTRAINT_IDREF(id, rsc_2, ID(xml_rsc));
 846             pcmk__order_resource_actions(rsc_1, action_1, rsc_2, action_2,
 847                                          flags);
 848         }
 849 
 850     } else if (rsc_2 != NULL) {
 851         for (xml_rsc = first_named_child(set1, XML_TAG_RESOURCE_REF);
 852              xml_rsc != NULL; xml_rsc = crm_next_same_xml(xml_rsc)) {
 853 
 854             EXPAND_CONSTRAINT_IDREF(id, rsc_1, ID(xml_rsc));
 855             pcmk__order_resource_actions(rsc_1, action_1, rsc_2, action_2,
 856                                          flags);
 857         }
 858 
 859     } else {
 860         for (xml_rsc = first_named_child(set1, XML_TAG_RESOURCE_REF);
 861              xml_rsc != NULL; xml_rsc = crm_next_same_xml(xml_rsc)) {
 862 
 863             EXPAND_CONSTRAINT_IDREF(id, rsc_1, ID(xml_rsc));
 864 
 865             for (xmlNode *xml_rsc_2 = first_named_child(set2, XML_TAG_RESOURCE_REF);
 866                  xml_rsc_2 != NULL; xml_rsc_2 = crm_next_same_xml(xml_rsc_2)) {
 867 
 868                 EXPAND_CONSTRAINT_IDREF(id, rsc_2, ID(xml_rsc_2));
 869                 pcmk__order_resource_actions(rsc_1, action_1, rsc_2,
 870                                              action_2, flags);
 871             }
 872         }
 873     }
 874 
 875     return pcmk_rc_ok;
 876 }
 877 
 878 /*!
 879  * \internal
 880  * \brief If an ordering constraint uses resource tags, expand them
 881  *
 882  * \param[in]  xml_obj       Ordering constraint XML
 883  * \param[out] expanded_xml  Equivalent XML with tags expanded
 884  * \param[in]  data_set      Cluster working set
 885  *
 886  * \return Standard Pacemaker return code (specifically, pcmk_rc_ok on success,
 887  *         and pcmk_rc_unpack_error on invalid configuration)
 888  */
 889 static int
 890 unpack_order_tags(xmlNode *xml_obj, xmlNode **expanded_xml,
     /* [previous][next][first][last][top][bottom][index][help] */
 891                   pe_working_set_t *data_set)
 892 {
 893     const char *id_first = NULL;
 894     const char *id_then = NULL;
 895     const char *action_first = NULL;
 896     const char *action_then = NULL;
 897 
 898     pe_resource_t *rsc_first = NULL;
 899     pe_resource_t *rsc_then = NULL;
 900     pe_tag_t *tag_first = NULL;
 901     pe_tag_t *tag_then = NULL;
 902 
 903     xmlNode *rsc_set_first = NULL;
 904     xmlNode *rsc_set_then = NULL;
 905     bool any_sets = false;
 906 
 907     // Check whether there are any resource sets with template or tag references
 908     *expanded_xml = pcmk__expand_tags_in_sets(xml_obj, data_set);
 909     if (*expanded_xml != NULL) {
 910         crm_log_xml_trace(*expanded_xml, "Expanded rsc_order");
 911         return pcmk_rc_ok;
 912     }
 913 
 914     id_first = crm_element_value(xml_obj, XML_ORDER_ATTR_FIRST);
 915     id_then = crm_element_value(xml_obj, XML_ORDER_ATTR_THEN);
 916     if ((id_first == NULL) || (id_then == NULL)) {
 917         return pcmk_rc_ok;
 918     }
 919 
 920     if (!pcmk__valid_resource_or_tag(data_set, id_first, &rsc_first,
 921                                      &tag_first)) {
 922         pcmk__config_err("Ignoring constraint '%s' because '%s' is not a "
 923                          "valid resource or tag", ID(xml_obj), id_first);
 924         return pcmk_rc_unpack_error;
 925     }
 926 
 927     if (!pcmk__valid_resource_or_tag(data_set, id_then, &rsc_then, &tag_then)) {
 928         pcmk__config_err("Ignoring constraint '%s' because '%s' is not a "
 929                          "valid resource or tag", ID(xml_obj), id_then);
 930         return pcmk_rc_unpack_error;
 931     }
 932 
 933     if ((rsc_first != NULL) && (rsc_then != NULL)) {
 934         // Neither side references a template or tag
 935         return pcmk_rc_ok;
 936     }
 937 
 938     action_first = crm_element_value(xml_obj, XML_ORDER_ATTR_FIRST_ACTION);
 939     action_then = crm_element_value(xml_obj, XML_ORDER_ATTR_THEN_ACTION);
 940 
 941     *expanded_xml = copy_xml(xml_obj);
 942 
 943     // Convert template/tag reference in "first" into resource_set under constraint
 944     if (!pcmk__tag_to_set(*expanded_xml, &rsc_set_first, XML_ORDER_ATTR_FIRST,
 945                           true, data_set)) {
 946         free_xml(*expanded_xml);
 947         *expanded_xml = NULL;
 948         return pcmk_rc_unpack_error;
 949     }
 950 
 951     if (rsc_set_first != NULL) {
 952         if (action_first != NULL) {
 953             // Move "first-action" into converted resource_set as "action"
 954             crm_xml_add(rsc_set_first, "action", action_first);
 955             xml_remove_prop(*expanded_xml, XML_ORDER_ATTR_FIRST_ACTION);
 956         }
 957         any_sets = true;
 958     }
 959 
 960     // Convert template/tag reference in "then" into resource_set under constraint
 961     if (!pcmk__tag_to_set(*expanded_xml, &rsc_set_then, XML_ORDER_ATTR_THEN,
 962                           true, data_set)) {
 963         free_xml(*expanded_xml);
 964         *expanded_xml = NULL;
 965         return pcmk_rc_unpack_error;
 966     }
 967 
 968     if (rsc_set_then != NULL) {
 969         if (action_then != NULL) {
 970             // Move "then-action" into converted resource_set as "action"
 971             crm_xml_add(rsc_set_then, "action", action_then);
 972             xml_remove_prop(*expanded_xml, XML_ORDER_ATTR_THEN_ACTION);
 973         }
 974         any_sets = true;
 975     }
 976 
 977     if (any_sets) {
 978         crm_log_xml_trace(*expanded_xml, "Expanded rsc_order");
 979     } else {
 980         free_xml(*expanded_xml);
 981         *expanded_xml = NULL;
 982     }
 983 
 984     return pcmk_rc_ok;
 985 }
 986 
 987 /*!
 988  * \internal
 989  * \brief Unpack ordering constraint XML
 990  *
 991  * \param[in]     xml_obj   Ordering constraint XML to unpack
 992  * \param[in,out] data_set  Cluster working set
 993  */
 994 void
 995 pcmk__unpack_ordering(xmlNode *xml_obj, pe_working_set_t *data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
 996 {
 997     xmlNode *set = NULL;
 998     xmlNode *last = NULL;
 999 
1000     xmlNode *orig_xml = NULL;
1001     xmlNode *expanded_xml = NULL;
1002 
1003     const char *id = crm_element_value(xml_obj, XML_ATTR_ID);
1004     const char *invert = crm_element_value(xml_obj, XML_CONS_ATTR_SYMMETRICAL);
1005     enum pe_order_kind kind = get_ordering_type(xml_obj);
1006 
1007     enum ordering_symmetry symmetry = get_ordering_symmetry(xml_obj, kind,
1008                                                             NULL);
1009 
1010     // Expand any resource tags in the constraint XML
1011     if (unpack_order_tags(xml_obj, &expanded_xml, data_set) != pcmk_rc_ok) {
1012         return;
1013     }
1014     if (expanded_xml != NULL) {
1015         orig_xml = xml_obj;
1016         xml_obj = expanded_xml;
1017     }
1018 
1019     // If the constraint has resource sets, unpack them
1020     for (set = first_named_child(xml_obj, XML_CONS_TAG_RSC_SET);
1021          set != NULL; set = crm_next_same_xml(set)) {
1022 
1023         set = expand_idref(set, data_set->input);
1024         if ((set == NULL) // Configuration error, message already logged
1025             || (unpack_order_set(set, kind, invert, data_set) != pcmk_rc_ok)) {
1026 
1027             if (expanded_xml != NULL) {
1028                 free_xml(expanded_xml);
1029             }
1030             return;
1031         }
1032 
1033         if (last != NULL) {
1034 
1035             if (order_rsc_sets(id, last, set, kind, data_set,
1036                                symmetry) != pcmk_rc_ok) {
1037                 if (expanded_xml != NULL) {
1038                     free_xml(expanded_xml);
1039                 }
1040                 return;
1041             }
1042 
1043             if ((symmetry == ordering_symmetric)
1044                 && (order_rsc_sets(id, set, last, kind, data_set,
1045                                    ordering_symmetric_inverse) != pcmk_rc_ok)) {
1046                 if (expanded_xml != NULL) {
1047                     free_xml(expanded_xml);
1048                 }
1049                 return;
1050             }
1051 
1052         }
1053         last = set;
1054     }
1055 
1056     if (expanded_xml) {
1057         free_xml(expanded_xml);
1058         xml_obj = orig_xml;
1059     }
1060 
1061     // If the constraint has no resource sets, unpack it as a simple ordering
1062     if (last == NULL) {
1063         return unpack_simple_rsc_order(xml_obj, data_set);
1064     }
1065 }
1066 
1067 static bool
1068 ordering_is_invalid(pe_action_t *action, pe_action_wrapper_t *input)
     /* [previous][next][first][last][top][bottom][index][help] */
1069 {
1070     /* Prevent user-defined ordering constraints between resources
1071      * running in a guest node and the resource that defines that node.
1072      */
1073     if (!pcmk_is_set(input->type, pe_order_preserve)
1074         && (input->action->rsc != NULL)
1075         && pcmk__rsc_corresponds_to_guest(action->rsc, input->action->node)) {
1076 
1077         crm_warn("Invalid ordering constraint between %s and %s",
1078                  input->action->rsc->id, action->rsc->id);
1079         return true;
1080     }
1081 
1082     /* If there's an order like
1083      * "rscB_stop node2"-> "load_stopped_node2" -> "rscA_migrate_to node1"
1084      *
1085      * then rscA is being migrated from node1 to node2, while rscB is being
1086      * migrated from node2 to node1. If there would be a graph loop,
1087      * break the order "load_stopped_node2" -> "rscA_migrate_to node1".
1088      */
1089     if ((input->type == pe_order_load) && action->rsc
1090         && pcmk__str_eq(action->task, RSC_MIGRATE, pcmk__str_casei)
1091         && pcmk__graph_has_loop(action, action, input)) {
1092         return true;
1093     }
1094 
1095     return false;
1096 }
1097 
1098 void
1099 pcmk__disable_invalid_orderings(pe_working_set_t *data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
1100 {
1101     for (GList *iter = data_set->actions; iter != NULL; iter = iter->next) {
1102         pe_action_t *action = (pe_action_t *) iter->data;
1103         pe_action_wrapper_t *input = NULL;
1104 
1105         for (GList *input_iter = action->actions_before;
1106              input_iter != NULL; input_iter = input_iter->next) {
1107 
1108             input = (pe_action_wrapper_t *) input_iter->data;
1109             if (ordering_is_invalid(action, input)) {
1110                 input->type = pe_order_none;
1111             }
1112         }
1113     }
1114 }
1115 
1116 /*!
1117  * \internal
1118  * \brief Order stops on a node before the node's shutdown
1119  *
1120  * \param[in] node         Node being shut down
1121  * \param[in] shutdown_op  Shutdown action for node
1122  */
1123 void
1124 pcmk__order_stops_before_shutdown(pe_node_t *node, pe_action_t *shutdown_op)
     /* [previous][next][first][last][top][bottom][index][help] */
1125 {
1126     for (GList *iter = node->details->data_set->actions;
1127          iter != NULL; iter = iter->next) {
1128 
1129         pe_action_t *action = (pe_action_t *) iter->data;
1130 
1131         // Only stops on the node shutting down are relevant
1132         if ((action->rsc == NULL) || (action->node == NULL)
1133             || (action->node->details != node->details)
1134             || !pcmk__str_eq(action->task, RSC_STOP, pcmk__str_casei)) {
1135             continue;
1136         }
1137 
1138         // Resources and nodes in maintenance mode won't be touched
1139 
1140         if (pcmk_is_set(action->rsc->flags, pe_rsc_maintenance)) {
1141             pe_rsc_trace(action->rsc,
1142                          "Not ordering %s before shutdown of %s because "
1143                          "resource in maintenance mode",
1144                          action->uuid, pe__node_name(node));
1145             continue;
1146 
1147         } else if (node->details->maintenance) {
1148             pe_rsc_trace(action->rsc,
1149                          "Not ordering %s before shutdown of %s because "
1150                          "node in maintenance mode",
1151                          action->uuid, pe__node_name(node));
1152             continue;
1153         }
1154 
1155         /* Don't touch a resource that is unmanaged or blocked, to avoid
1156          * blocking the shutdown (though if another action depends on this one,
1157          * we may still end up blocking)
1158          */
1159         if (!pcmk_any_flags_set(action->rsc->flags,
1160                                 pe_rsc_managed|pe_rsc_block)) {
1161             pe_rsc_trace(action->rsc,
1162                          "Not ordering %s before shutdown of %s because "
1163                          "resource is unmanaged or blocked",
1164                          action->uuid, pe__node_name(node));
1165             continue;
1166         }
1167 
1168         pe_rsc_trace(action->rsc, "Ordering %s before shutdown of %s",
1169                      action->uuid, pe__node_name(node));
1170         pe__clear_action_flags(action, pe_action_optional);
1171         pcmk__new_ordering(action->rsc, NULL, action, NULL,
1172                            strdup(CRM_OP_SHUTDOWN), shutdown_op,
1173                            pe_order_optional|pe_order_runnable_left,
1174                            node->details->data_set);
1175     }
1176 }
1177 
1178 /*!
1179  * \brief Find resource actions matching directly or as child
1180  *
1181  * \param[in] rsc           Resource to check
1182  * \param[in] original_key  Action key to search for (possibly referencing
1183  *                          parent of \rsc)
1184  *
1185  * \return Newly allocated list of matching actions
1186  * \note It is the caller's responsibility to free the result with g_list_free()
1187  */
1188 static GList *
1189 find_actions_by_task(const pe_resource_t *rsc, const char *original_key)
     /* [previous][next][first][last][top][bottom][index][help] */
1190 {
1191     // Search under given task key directly
1192     GList *list = find_actions(rsc->actions, original_key, NULL);
1193 
1194     if (list == NULL) {
1195         // Search again using this resource's ID
1196         char *key = NULL;
1197         char *task = NULL;
1198         guint interval_ms = 0;
1199 
1200         if (parse_op_key(original_key, NULL, &task, &interval_ms)) {
1201             key = pcmk__op_key(rsc->id, task, interval_ms);
1202             list = find_actions(rsc->actions, key, NULL);
1203             free(key);
1204             free(task);
1205         } else {
1206             crm_err("Invalid operation key (bug?): %s", original_key);
1207         }
1208     }
1209     return list;
1210 }
1211 
1212 /*!
1213  * \internal
1214  * \brief Order relevant resource actions after a given action
1215  *
1216  * \param[in,out] first_action  Action to order after (or NULL if none runnable)
1217  * \param[in]     rsc           Resource whose actions should be ordered
1218  * \param[in,out] order         Ordering constraint being applied
1219  */
1220 static void
1221 order_resource_actions_after(pe_action_t *first_action,
     /* [previous][next][first][last][top][bottom][index][help] */
1222                              const pe_resource_t *rsc, pe__ordering_t *order)
1223 {
1224     GList *then_actions = NULL;
1225     uint32_t flags = pe_order_none;
1226 
1227     CRM_CHECK((rsc != NULL) && (order != NULL), return);
1228 
1229     flags = order->flags;
1230     pe_rsc_trace(rsc, "Applying ordering %d for 'then' resource %s",
1231                  order->id, rsc->id);
1232 
1233     if (order->rh_action != NULL) {
1234         then_actions = g_list_prepend(NULL, order->rh_action);
1235 
1236     } else {
1237         then_actions = find_actions_by_task(rsc, order->rh_action_task);
1238     }
1239 
1240     if (then_actions == NULL) {
1241         pe_rsc_trace(rsc, "Ignoring ordering %d: no %s actions found for %s",
1242                      order->id, order->rh_action_task, rsc->id);
1243         return;
1244     }
1245 
1246     if ((first_action != NULL) && (first_action->rsc == rsc)
1247         && pcmk_is_set(first_action->flags, pe_action_dangle)) {
1248 
1249         pe_rsc_trace(rsc,
1250                      "Detected dangling migration ordering (%s then %s %s)",
1251                      first_action->uuid, order->rh_action_task, rsc->id);
1252         pe__clear_order_flags(flags, pe_order_implies_then);
1253     }
1254 
1255     if ((first_action == NULL) && !pcmk_is_set(flags, pe_order_implies_then)) {
1256         pe_rsc_debug(rsc,
1257                      "Ignoring ordering %d for %s: No first action found",
1258                      order->id, rsc->id);
1259         g_list_free(then_actions);
1260         return;
1261     }
1262 
1263     for (GList *iter = then_actions; iter != NULL; iter = iter->next) {
1264         pe_action_t *then_action_iter = (pe_action_t *) iter->data;
1265 
1266         if (first_action != NULL) {
1267             order_actions(first_action, then_action_iter, flags);
1268         } else {
1269             pe__clear_action_flags(then_action_iter, pe_action_runnable);
1270             crm_warn("%s of %s is unrunnable because there is no %s of %s "
1271                      "to order it after", then_action_iter->task, rsc->id,
1272                      order->lh_action_task, order->lh_rsc->id);
1273         }
1274     }
1275 
1276     g_list_free(then_actions);
1277 }
1278 
1279 static void
1280 rsc_order_first(pe_resource_t *first_rsc, pe__ordering_t *order,
     /* [previous][next][first][last][top][bottom][index][help] */
1281                 pe_working_set_t *data_set)
1282 {
1283     GList *first_actions = NULL;
1284     pe_action_t *first_action = order->lh_action;
1285     pe_resource_t *then_rsc = order->rh_rsc;
1286 
1287     CRM_ASSERT(first_rsc != NULL);
1288     pe_rsc_trace(first_rsc, "Applying ordering constraint %d (first: %s)",
1289                  order->id, first_rsc->id);
1290 
1291     if (first_action != NULL) {
1292         first_actions = g_list_prepend(NULL, first_action);
1293 
1294     } else {
1295         first_actions = find_actions_by_task(first_rsc, order->lh_action_task);
1296     }
1297 
1298     if ((first_actions == NULL) && (first_rsc == then_rsc)) {
1299         pe_rsc_trace(first_rsc,
1300                      "Ignoring constraint %d: first (%s for %s) not found",
1301                      order->id, order->lh_action_task, first_rsc->id);
1302 
1303     } else if (first_actions == NULL) {
1304         char *key = NULL;
1305         char *op_type = NULL;
1306         guint interval_ms = 0;
1307 
1308         parse_op_key(order->lh_action_task, NULL, &op_type, &interval_ms);
1309         key = pcmk__op_key(first_rsc->id, op_type, interval_ms);
1310 
1311         if ((first_rsc->fns->state(first_rsc, TRUE) == RSC_ROLE_STOPPED)
1312             && pcmk__str_eq(op_type, RSC_STOP, pcmk__str_casei)) {
1313             free(key);
1314             pe_rsc_trace(first_rsc,
1315                          "Ignoring constraint %d: first (%s for %s) not found",
1316                          order->id, order->lh_action_task, first_rsc->id);
1317 
1318         } else if ((first_rsc->fns->state(first_rsc, TRUE) == RSC_ROLE_UNPROMOTED)
1319                    && pcmk__str_eq(op_type, RSC_DEMOTE, pcmk__str_casei)) {
1320             free(key);
1321             pe_rsc_trace(first_rsc,
1322                          "Ignoring constraint %d: first (%s for %s) not found",
1323                          order->id, order->lh_action_task, first_rsc->id);
1324 
1325         } else {
1326             pe_rsc_trace(first_rsc,
1327                          "Creating first (%s for %s) for constraint %d ",
1328                          order->lh_action_task, first_rsc->id, order->id);
1329             first_action = custom_action(first_rsc, key, op_type, NULL, TRUE,
1330                                          TRUE, data_set);
1331             first_actions = g_list_prepend(NULL, first_action);
1332         }
1333 
1334         free(op_type);
1335     }
1336 
1337     if (then_rsc == NULL) {
1338         if (order->rh_action == NULL) {
1339             pe_rsc_trace(first_rsc, "Ignoring constraint %d: then not found",
1340                          order->id);
1341             return;
1342         }
1343         then_rsc = order->rh_action->rsc;
1344     }
1345     for (GList *gIter = first_actions; gIter != NULL; gIter = gIter->next) {
1346         first_action = (pe_action_t *) gIter->data;
1347 
1348         if (then_rsc == NULL) {
1349             order_actions(first_action, order->rh_action, order->flags);
1350 
1351         } else {
1352             order_resource_actions_after(first_action, then_rsc, order);
1353         }
1354     }
1355 
1356     g_list_free(first_actions);
1357 }
1358 
1359 void
1360 pcmk__apply_orderings(pe_working_set_t *data_set)
     /* [previous][next][first][last][top][bottom][index][help] */
1361 {
1362     crm_trace("Applying ordering constraints");
1363 
1364     /* Ordering constraints need to be processed in the order they were created.
1365      * rsc_order_first() and order_resource_actions_after() require the relevant
1366      * actions to already exist in some cases, but rsc_order_first() will create
1367      * the 'first' action in certain cases. Thus calling rsc_order_first() can
1368      * change the behavior of later-created orderings.
1369      *
1370      * Also, g_list_append() should be avoided for performance reasons, so we
1371      * prepend orderings when creating them and reverse the list here.
1372      *
1373      * @TODO This is brittle and should be carefully redesigned so that the
1374      * order of creation doesn't matter, and the reverse becomes unneeded.
1375      */
1376     data_set->ordering_constraints = g_list_reverse(data_set->ordering_constraints);
1377 
1378     for (GList *gIter = data_set->ordering_constraints;
1379          gIter != NULL; gIter = gIter->next) {
1380 
1381         pe__ordering_t *order = gIter->data;
1382         pe_resource_t *rsc = order->lh_rsc;
1383 
1384         if (rsc != NULL) {
1385             rsc_order_first(rsc, order, data_set);
1386             continue;
1387         }
1388 
1389         rsc = order->rh_rsc;
1390         if (rsc != NULL) {
1391             order_resource_actions_after(order->lh_action, rsc, order);
1392 
1393         } else {
1394             crm_trace("Applying ordering constraint %d (non-resource actions)",
1395                       order->id);
1396             order_actions(order->lh_action, order->rh_action, order->flags);
1397         }
1398     }
1399 
1400     g_list_foreach(data_set->actions, (GFunc) pcmk__block_colocation_dependents,
1401                    data_set);
1402 
1403     crm_trace("Ordering probes");
1404     pcmk__order_probes(data_set);
1405 
1406     crm_trace("Updating %d actions", g_list_length(data_set->actions));
1407     g_list_foreach(data_set->actions,
1408                    (GFunc) pcmk__update_action_for_orderings, data_set);
1409 
1410     pcmk__disable_invalid_orderings(data_set);
1411 }
1412 
1413 /*!
1414  * \internal
1415  * \brief Order a given action after each action in a given list
1416  *
1417  * \param[in] after   "After" action
1418  * \param[in] list    List of "before" actions
1419  */
1420 void
1421 pcmk__order_after_each(pe_action_t *after, GList *list)
     /* [previous][next][first][last][top][bottom][index][help] */
1422 {
1423     const char *after_desc = (after->task == NULL)? after->uuid : after->task;
1424 
1425     for (GList *iter = list; iter != NULL; iter = iter->next) {
1426         pe_action_t *before = (pe_action_t *) iter->data;
1427         const char *before_desc = before->task? before->task : before->uuid;
1428 
1429         crm_debug("Ordering %s on %s before %s on %s",
1430                   before_desc, pe__node_name(before->node),
1431                   after_desc, pe__node_name(after->node));
1432         order_actions(before, after, pe_order_optional);
1433     }
1434 }
1435 
1436 /*!
1437  * \internal
1438  * \brief Order promotions and demotions for restarts of a clone or bundle
1439  *
1440  * \param[in] rsc  Clone or bundle to order
1441  */
1442 void
1443 pcmk__promotable_restart_ordering(pe_resource_t *rsc)
     /* [previous][next][first][last][top][bottom][index][help] */
1444 {
1445     // Order start and promote after all instances are stopped
1446     pcmk__order_resource_actions(rsc, RSC_STOPPED, rsc, RSC_START,
1447                                  pe_order_optional);
1448     pcmk__order_resource_actions(rsc, RSC_STOPPED, rsc, RSC_PROMOTE,
1449                                  pe_order_optional);
1450 
1451     // Order stop, start, and promote after all instances are demoted
1452     pcmk__order_resource_actions(rsc, RSC_DEMOTED, rsc, RSC_STOP,
1453                                  pe_order_optional);
1454     pcmk__order_resource_actions(rsc, RSC_DEMOTED, rsc, RSC_START,
1455                                  pe_order_optional);
1456     pcmk__order_resource_actions(rsc, RSC_DEMOTED, rsc, RSC_PROMOTE,
1457                                  pe_order_optional);
1458 
1459     // Order promote after all instances are started
1460     pcmk__order_resource_actions(rsc, RSC_STARTED, rsc, RSC_PROMOTE,
1461                                  pe_order_optional);
1462 
1463     // Order demote after all instances are demoted
1464     pcmk__order_resource_actions(rsc, RSC_DEMOTE, rsc, RSC_DEMOTED,
1465                                  pe_order_optional);
1466 }

/* [previous][next][first][last][top][bottom][index][help] */