package com.mogic.algorithm.schedule.framework;

import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import com.mogic.algorithm.recommend.common.base.enums.ResultStatusEnum;
import com.mogic.algorithm.schedule.framework.AsyncOperator;
import com.mogic.algorithm.util.ContextPath;
import com.mogic.algorithm.util.ContextReader;
import com.mogic.algorithm.util.JsonUtils;
import com.mogic.algorithm.util.message.ProducerWrapper;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mogic/algorithm/schedule/framework/ContextListener.class */
public class ContextListener implements MessageListenerConcurrently {
    private static final Logger log = LoggerFactory.getLogger(ContextListener.class);
    public static final String costField = "__cost__";
    public static final ContextPath path4Cost = ContextPath.compile(String.format("$['context']['%s']", costField)).get();
    private final boolean forgetHistory = false;
    private final ProducerWrapper producer4TaskOutput;
    private final List<AsyncOperator> operators;
    private final List<OutputAdapter> outputAdapters;

    public ContextListener(ProducerWrapper producerWrapper, List<AsyncOperator> list, List<OutputAdapter> list2) {
        this.producer4TaskOutput = producerWrapper;
        this.operators = list;
        this.outputAdapters = list2;
    }

    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        if (CollectionUtils.isEmpty(list)) {
            log.warn("rocketmq:{}, messageList is empty", "ContextListener");
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        if (list.size() > 1) {
            log.error("rocketmq:{}, messageList size gt 1, size:{}", "ContextListener", Integer.valueOf(list.size()));
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        MessageExt messageExt = list.get(0);
        String str = new String(messageExt.getBody(), StandardCharsets.UTF_8);
        String keys = messageExt.getKeys();
        log.info("rocketmq:{}, topic:{}, msgId:{} message:{}", new Object[]{"ContextListener", messageExt.getTopic(), messageExt.getMsgId(), str});
        Optional fromJson = JsonUtils.fromJson(str, JsonObject.class);
        ContextReader contextReader = new ContextReader(fromJson.orElse(null), true);
        if (!fromJson.isPresent() || !validateContextMsg(contextReader)) {
            return feedbackBadRequestError(keys, str);
        }
        for (OutputAdapter outputAdapter : this.outputAdapters) {
            if (outputAdapter.checkConditions(contextReader)) {
                return callOutputAdapter(outputAdapter, keys, (JsonObject) fromJson.get(), contextReader);
            }
        }
        for (AsyncOperator asyncOperator : this.operators) {
            if (asyncOperator.checkConditions(contextReader)) {
                return callOperator(asyncOperator, keys, contextReader);
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    private boolean validateContextMsg(ContextReader contextReader) {
        return ObjectUtils.allNotNull(new Object[]{(JsonPrimitive) contextReader.readName("nodeId", JsonPrimitive.class).orElse(null), (JsonPrimitive) contextReader.readName("resultCode", JsonPrimitive.class).orElse(null), (JsonPrimitive) contextReader.readName("message", JsonPrimitive.class).orElse(null), (JsonObject) contextReader.read(path4Cost, JsonObject.class).orElse(null)});
    }

    private ConsumeConcurrentlyStatus feedbackBadRequestError(String str, String str2) {
        log.error("Not an instance of JsonObject or missing nodeId/resultCode/message/context/cost: {}", str2);
        JsonObject jsonObject = new JsonObject();
        jsonObject.addProperty("resultCode", Integer.valueOf(ResultStatusEnum.BAD_REQUEST.getCode()));
        jsonObject.addProperty("message", ResultStatusEnum.BAD_REQUEST.getMsg());
        jsonObject.addProperty("original_msg", str2);
        String jsonObject2 = jsonObject.toString();
        log.info("Feedback message to taskOut queue: {}", jsonObject2);
        return this.producer4TaskOutput.sendMessage(str, jsonObject2);
    }

    private ConsumeConcurrentlyStatus callOperator(AsyncOperator asyncOperator, String str, ContextReader contextReader) {
        int resultCode;
        String message;
        JsonObject jsonObject = (JsonObject) contextReader.readName("context", JsonObject.class).get();
        JsonObject jsonObject2 = (JsonObject) contextReader.read(path4Cost, JsonObject.class).get();
        JsonObject deepCopy = jsonObject.deepCopy();
        log.info("Start to call operator, id={}, context={}", asyncOperator.id(), jsonObject);
        long currentTimeMillis = System.currentTimeMillis();
        Optional<AsyncResponse> invoke = asyncOperator.invoke(str, new ContextReader(jsonObject, true));
        jsonObject2.add(asyncOperator.id(), AsyncOperator.Cost.of(currentTimeMillis, System.currentTimeMillis()).toJson());
        deepCopy.add(costField, jsonObject2);
        if (invoke.isPresent()) {
            invoke.get().getResult().entrySet().forEach(entry -> {
                if (deepCopy.has((String) entry.getKey())) {
                    return;
                }
                deepCopy.add((String) entry.getKey(), (JsonElement) entry.getValue());
            });
            resultCode = invoke.get().getResultCode();
            message = invoke.get().getMessage();
        } else {
            log.error("Failed to call operator, id={}, context={}", asyncOperator.id(), jsonObject);
            resultCode = ResultStatusEnum.FAIL.getCode();
            message = String.format("%s步骤执行失败", asyncOperator.id());
        }
        log.info("Feedback message to context queue, resultCode:{}, message:{}, context:{}", new Object[]{Integer.valueOf(resultCode), message, deepCopy});
        return asyncOperator.contextProducer().send(str, resultCode, message, deepCopy);
    }

    private ConsumeConcurrentlyStatus callOutputAdapter(OutputAdapter outputAdapter, String str, JsonObject jsonObject, ContextReader contextReader) {
        Optional<JsonObject> transform = outputAdapter.transform(contextReader);
        if (!transform.isPresent()) {
            log.error("Failed to transform message to response: {}", jsonObject);
            jsonObject.addProperty("resultCode", Integer.valueOf(ResultStatusEnum.FAIL.getCode()));
            jsonObject.addProperty("message", "内部输出格式转换异常");
            transform = Optional.of(jsonObject);
        }
        String jsonObject2 = transform.get().toString();
        log.info("Feedback message to taskOut queue, msg: {}", jsonObject2);
        return this.producer4TaskOutput.sendMessage(str, jsonObject2);
    }
}
