Skip to content

Commit

Permalink
Log: controller: improve message routing messages
Browse files Browse the repository at this point in the history
  • Loading branch information
kgaillot committed Feb 22, 2020
1 parent 6b779b5 commit 66f7ec7
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 81 deletions.
35 changes: 20 additions & 15 deletions daemons/controld/controld_execd.c
Original file line number Diff line number Diff line change
Expand Up @@ -954,14 +954,13 @@ delete_rsc_entry(lrm_state_t * lrm_state, ha_msg_input_t * input, const char *rs
if (rc == pcmk_ok) {
char *rsc_id_copy = strdup(rsc_id);

if (rsc_gIter)
if (rsc_gIter) {
g_hash_table_iter_remove(rsc_gIter);
else
} else {
g_hash_table_remove(lrm_state->resource_history, rsc_id_copy);
crm_debug("sync: Sending delete op for %s", rsc_id_copy);
}
controld_delete_resource_history(rsc_id_copy, lrm_state->node_name,
user_name, crmd_cib_smart_opt());

g_hash_table_foreach_remove(lrm_state->pending_ops, lrm_remove_deleted_op, rsc_id_copy);
free(rsc_id_copy);
}
Expand Down Expand Up @@ -1311,23 +1310,23 @@ delete_resource(lrm_state_t * lrm_state,
lrmd_rsc_info_t * rsc,
GHashTableIter * gIter,
const char *sys,
const char *host,
const char *user,
ha_msg_input_t * request,
gboolean unregister)
{
int rc = pcmk_ok;

crm_info("Removing resource %s for %s (%s) on %s", id, sys, user ? user : "internal", host);
crm_info("Removing resource %s from executor for %s%s%s",
id, sys, (user? " as " : ""), (user? user : ""));

if (rsc && unregister) {
rc = lrm_state_unregister_rsc(lrm_state, id, 0);
}

if (rc == pcmk_ok) {
crm_trace("Resource '%s' deleted", id);
crm_trace("Resource %s deleted from executor", id);
} else if (rc == -EINPROGRESS) {
crm_info("Deletion of resource '%s' pending", id);
crm_info("Deletion of resource '%s' from executor is pending", id);
if (request) {
struct pending_deletion_op_s *op = NULL;
char *ref = crm_element_value_copy(request->msg, XML_ATTR_REFERENCE);
Expand All @@ -1339,8 +1338,9 @@ delete_resource(lrm_state_t * lrm_state,
}
return;
} else {
crm_warn("Deletion of resource '%s' for %s (%s) on %s failed: %d",
id, sys, user ? user : "internal", host, rc);
crm_warn("Could not delete '%s' from executor for %s%s%s: %s "
CRM_XS " rc=%d", id, sys, (user? " as " : ""),
(user? user : ""), pcmk_strerror(rc), rc);
}

delete_rsc_entry(lrm_state, request, id, gIter, rc, user);
Expand Down Expand Up @@ -1406,7 +1406,7 @@ force_reprobe(lrm_state_t *lrm_state, const char *from_sys,
unregister = FALSE;
}

delete_resource(lrm_state, entry->id, &entry->rsc, &gIter, from_sys, from_host,
delete_resource(lrm_state, entry->id, &entry->rsc, &gIter, from_sys,
user_name, NULL, unregister);
}

Expand Down Expand Up @@ -1723,7 +1723,7 @@ do_lrm_delete(ha_msg_input_t *input, lrm_state_t *lrm_state,
unregister = FALSE;
}

delete_resource(lrm_state, rsc->id, rsc, NULL, from_sys, from_host,
delete_resource(lrm_state, rsc->id, rsc, NULL, from_sys,
user_name, input, unregister);
}

Expand Down Expand Up @@ -1760,15 +1760,20 @@ do_lrm_invoke(long long action,

#if ENABLE_ACL
user_name = crm_acl_get_set_user(input->msg, F_CRM_USER, NULL);
crm_trace("Executor command from user '%s'", user_name);
#endif

crm_op = crm_element_value(input->msg, F_CRM_TASK);
from_sys = crm_element_value(input->msg, F_CRM_SYS_FROM);
if (safe_str_neq(from_sys, CRM_SYSTEM_TENGINE)) {
from_host = crm_element_value(input->msg, F_CRM_HOST_FROM);
}
crm_trace("Executor %s command from %s", crm_op, from_sys);
#if ENABLE_ACL
crm_trace("Executor %s command from %s as user %s",
crm_op, from_sys, user_name);
#else
crm_trace("Executor %s command from %s",
crm_op, from_sys);
#endif

if (safe_str_eq(crm_op, CRM_OP_LRM_DELETE)) {
if (safe_str_neq(from_sys, CRM_SYSTEM_TENGINE)) {
Expand Down Expand Up @@ -2073,7 +2078,7 @@ controld_ack_event_directly(const char *to_host, const char *to_sys,
build_operation_update(iter, rsc, op, fsa_our_uname, __FUNCTION__);
reply = create_request(CRM_OP_INVOKE_LRM, update, to_host, to_sys, CRM_SYSTEM_LRMD, NULL);

crm_log_xml_trace(update, "ACK Update");
crm_log_xml_trace(update, "[direct ACK]");

crm_debug("ACK'ing resource op " CRM_OP_FMT " from %s: %s",
op->rsc_id, op->op_type, op->interval_ms, op->user_data,
Expand Down
97 changes: 33 additions & 64 deletions daemons/controld/controld_messages.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ static void handle_response(xmlNode *stored_msg);
static enum crmd_fsa_input handle_request(xmlNode *stored_msg,
enum crmd_fsa_cause cause);
static enum crmd_fsa_input handle_shutdown_request(xmlNode *stored_msg);

#define ROUTER_RESULT(x) crm_trace("Router result: %s", x)
static void send_msg_via_ipc(xmlNode * msg, const char *sys);

/* debug only, can wrap all it likes */
int last_data_id = 0;
Expand Down Expand Up @@ -97,9 +96,10 @@ register_fsa_input_adv(enum crmd_fsa_cause cause, enum crmd_fsa_input input,
}

last_data_id++;
crm_trace("%s %s FSA input %d (%s) (cause=%s) %s data",
raised_from, prepend ? "prepended" : "appended", last_data_id,
fsa_input2string(input), fsa_cause2string(cause), data ? "with" : "without");
crm_trace("%s %s FSA input %d (%s) due to %s, %s data",
raised_from, (prepend? "prepended" : "appended"), last_data_id,
fsa_input2string(input), fsa_cause2string(cause),
(data? "with" : "without"));

fsa_data = calloc(1, sizeof(fsa_data_t));
fsa_data->id = last_data_id;
Expand All @@ -120,10 +120,10 @@ register_fsa_input_adv(enum crmd_fsa_cause cause, enum crmd_fsa_input input,
case C_CRMD_STATUS_CALLBACK:
case C_IPC_MESSAGE:
case C_HA_MESSAGE:
crm_trace("Copying %s data from %s as a HA msg",
fsa_cause2string(cause), raised_from);
CRM_CHECK(((ha_msg_input_t *) data)->msg != NULL,
crm_err("Bogus data from %s", raised_from));
crm_trace("Copying %s data from %s as cluster message data",
fsa_cause2string(cause), raised_from);
fsa_data->data = copy_ha_msg_input(data);
fsa_data->data_type = fsa_dt_ha_msg;
break;
Expand All @@ -139,23 +139,22 @@ register_fsa_input_adv(enum crmd_fsa_cause cause, enum crmd_fsa_input input,
case C_SHUTDOWN:
case C_UNKNOWN:
case C_STARTUP:
crm_err("Copying %s data (from %s)"
" not yet implemented", fsa_cause2string(cause), raised_from);
crm_crit("Copying %s data (from %s) is not yet implemented",
fsa_cause2string(cause), raised_from);
crmd_exit(CRM_EX_SOFTWARE);
break;
}
crm_trace("%s data copied", fsa_cause2string(fsa_data->fsa_cause));
}

/* make sure to free it properly later */
if (prepend) {
crm_trace("Prepending input");
fsa_message_queue = g_list_prepend(fsa_message_queue, fsa_data);
} else {
fsa_message_queue = g_list_append(fsa_message_queue, fsa_data);
}

crm_trace("Queue len: %d", g_list_length(fsa_message_queue));
crm_trace("FSA message queue length is %d",
g_list_length(fsa_message_queue));

/* fsa_dump_queue(LOG_TRACE); */

Expand All @@ -164,7 +163,7 @@ register_fsa_input_adv(enum crmd_fsa_cause cause, enum crmd_fsa_input input,
}

if (fsa_source && input != I_WAIT_FOR_EVENT) {
crm_trace("Triggering FSA: %s", __FUNCTION__);
crm_trace("Triggering FSA");
mainloop_set_trigger(fsa_source);
}
return last_data_id;
Expand All @@ -190,20 +189,11 @@ fsa_dump_queue(int log_level)
ha_msg_input_t *
copy_ha_msg_input(ha_msg_input_t * orig)
{
ha_msg_input_t *copy = NULL;
xmlNodePtr data = NULL;

if (orig != NULL) {
crm_trace("Copy msg");
data = copy_xml(orig->msg);
ha_msg_input_t *copy = calloc(1, sizeof(ha_msg_input_t));

} else {
crm_trace("No message to copy");
}
copy = new_ha_msg_input(data);
if (orig && orig->msg != NULL) {
CRM_CHECK(copy->msg != NULL, crm_err("copy failed"));
}
CRM_ASSERT(copy != NULL);
copy->msg = (orig && orig->msg)? copy_xml(orig->msg) : NULL;
copy->xml = get_message_xml(copy->msg, F_CRM_DATA);
return copy;
}

Expand Down Expand Up @@ -418,39 +408,37 @@ relay_message(xmlNode * msg, gboolean originated_locally)

if (is_for_dc || is_for_dcib || is_for_te) {
if (AM_I_DC && is_for_te) {
ROUTER_RESULT("Message result: Local relay");
crm_trace("Message routing: Process locally as transition request");
send_msg_via_ipc(msg, sys_to);

} else if (AM_I_DC) {
ROUTER_RESULT("Message result: DC/controller process");
crm_trace("Message routing: Process locally as DC request");
processing_complete = FALSE; /* more to be done by caller */
} else if (originated_locally && safe_str_neq(sys_from, CRM_SYSTEM_PENGINE)
&& safe_str_neq(sys_from, CRM_SYSTEM_TENGINE)) {

/* Neither the TE nor the scheduler should be sending messages
* to DCs on other nodes. By definition, if we are no longer the DC,
* then the scheduler's or TE's data should be discarded.
*/

#if SUPPORT_COROSYNC
if (is_corosync_cluster()) {
dest = text2msg_type(sys_to);
}
#endif
ROUTER_RESULT("Message result: External relay to DC");
crm_trace("Message routing: Relay to DC");
send_cluster_message(host_to ? crm_get_peer(0, host_to) : NULL, dest, msg, TRUE);

} else {
/* discard */
ROUTER_RESULT("Message result: Discard, not DC");
/* Neither the TE nor the scheduler should be sending messages
* to DCs on other nodes. By definition, if we are no longer the DC,
* then the scheduler's or TE's data should be discarded.
*/
crm_trace("Message routing: Discard because we are not DC");
}

} else if (is_local && (is_for_crm || is_for_cib)) {
ROUTER_RESULT("Message result: controller process");
crm_trace("Message routing: Process locally as controller request");
processing_complete = FALSE; /* more to be done by caller */

} else if (is_local) {
ROUTER_RESULT("Message result: Local relay");
crm_trace("Message routing: Local relay");
send_msg_via_ipc(msg, sys_to);

} else {
Expand All @@ -472,9 +460,11 @@ relay_message(xmlNode * msg, gboolean originated_locally)
crm_err("Cannot route message to unknown node %s", host_to);
return TRUE;
}
crm_trace("Message routing: Relay to %s",
(node_to->uname? node_to->uname : "peer"));
} else {
crm_trace("Message routing: Relay to all peers");
}

ROUTER_RESULT("Message result: External relay");
send_cluster_message(host_to ? node_to : NULL, dest, msg, TRUE);
}

Expand Down Expand Up @@ -877,7 +867,7 @@ handle_request(xmlNode *stored_msg, enum crmd_fsa_cause cause)
/* Optimize this for the DC - it has the most to do */

if (op == NULL) {
crm_log_xml_err(stored_msg, "Bad message");
crm_log_xml_warn(stored_msg, "[request without " F_CRM_TASK "]");
return I_NULL;
}

Expand Down Expand Up @@ -1133,10 +1123,9 @@ handle_shutdown_request(xmlNode * stored_msg)
/* msg is deleted by the time this returns */
extern gboolean process_te_message(xmlNode * msg, xmlNode * xml_data);

gboolean
static void
send_msg_via_ipc(xmlNode * msg, const char *sys)
{
gboolean send_ok = TRUE;
pcmk__client_t *client_channel = pcmk__find_client_by_id(sys);

if (crm_element_value(msg, F_CRM_HOST_FROM) == NULL) {
Expand All @@ -1145,10 +1134,7 @@ send_msg_via_ipc(xmlNode * msg, const char *sys)

if (client_channel != NULL) {
/* Transient clients such as crmadmin */
if (pcmk__ipc_send_xml(client_channel, 0, msg,
crm_ipc_server_event) != pcmk_rc_ok) {
send_ok = FALSE;
}
pcmk__ipc_send_xml(client_channel, 0, msg, crm_ipc_server_event);

} else if (sys != NULL && strcmp(sys, CRM_SYSTEM_TENGINE) == 0) {
xmlNode *data = get_message_xml(msg, F_CRM_DATA);
Expand All @@ -1170,31 +1156,14 @@ send_msg_via_ipc(xmlNode * msg, const char *sys)
fsa_data.origin = __FUNCTION__;
fsa_data.data_type = fsa_dt_ha_msg;

#ifdef FSA_TRACE
crm_trace("Invoking action A_LRM_INVOKE (%.16llx)", A_LRM_INVOKE);
#endif
do_lrm_invoke(A_LRM_INVOKE, C_IPC_MESSAGE, fsa_state, I_MESSAGE, &fsa_data);

} else if (sys != NULL && crmd_is_proxy_session(sys)) {
crmd_proxy_send(sys, msg);

} else {
crm_debug("Unknown Sub-system (%s)... discarding message.", crm_str(sys));
send_ok = FALSE;
}

return send_ok;
}

/*
 * Wrap an XML message in a newly allocated ha_msg_input_t.
 *
 * orig: XML message to wrap; the pointer is stored as-is (the XML is not
 *       copied by this function)
 *
 * Returns a zero-initialized ha_msg_input_t whose msg member is orig and
 * whose xml member is whatever get_message_xml() returns for the F_CRM_DATA
 * field of that message. Asserts on memory allocation failure, so the return
 * value is never NULL.
 */
ha_msg_input_t *
new_ha_msg_input(xmlNode * orig)
{
    ha_msg_input_t *input_copy = calloc(1, sizeof(ha_msg_input_t));

    /* Fail loudly here rather than dereferencing NULL below */
    CRM_ASSERT(input_copy != NULL);

    input_copy->msg = orig;
    input_copy->xml = get_message_xml(input_copy->msg, F_CRM_DATA);
    return input_copy;
}

void
Expand Down
2 changes: 0 additions & 2 deletions daemons/controld/controld_messages.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,6 @@ fsa_data_t *get_message(void);

extern gboolean relay_message(xmlNode * relay_message, gboolean originated_locally);

extern gboolean send_msg_via_ipc(xmlNode * msg, const char *sys);

gboolean crmd_is_proxy_session(const char *session);
void crmd_proxy_send(const char *session, xmlNode *msg);

Expand Down

0 comments on commit 66f7ec7

Please sign in to comment.