package com.mogic.algorithm.schedule.framework;

import com.google.gson.JsonObject;
import com.mogic.algorithm.recommend.common.base.enums.ResultStatusEnum;
import com.mogic.algorithm.schedule.framework.AsyncOperator;
import com.mogic.algorithm.util.JsonUtils;
import com.mogic.algorithm.util.message.ProducerWrapper;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import lombok.NonNull;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mogic/algorithm/schedule/framework/TaskListener.class */
public class TaskListener implements MessageListenerConcurrently {
    private static final Logger log = LoggerFactory.getLogger(TaskListener.class);
    private final boolean forgetHistory = false;
    private final InputAdapter inputAdapter;
    private final ProducerWrapper producer4TaskOutput;
    private final ContextProducer contextProducer;

    public TaskListener(@NonNull ContextProducer contextProducer, @NonNull ProducerWrapper producerWrapper, @NonNull InputAdapter inputAdapter) {
        if (contextProducer == null) {
            throw new NullPointerException("contextProducer is marked non-null but is null");
        }
        if (producerWrapper == null) {
            throw new NullPointerException("producer4TaskOutput is marked non-null but is null");
        }
        if (inputAdapter == null) {
            throw new NullPointerException("inputAdapter is marked non-null but is null");
        }
        this.inputAdapter = inputAdapter;
        this.contextProducer = contextProducer;
        this.producer4TaskOutput = producerWrapper;
    }

    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        ConsumeConcurrentlyStatus send;
        if (CollectionUtils.isEmpty(list)) {
            log.warn("rocketmq:{}, messageList is empty", "TaskListener");
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        if (list.size() > 1) {
            log.error("rocketmq:{}, messageList size gt 1, size:{}", "TaskListener", Integer.valueOf(list.size()));
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        MessageExt messageExt = list.get(0);
        String keys = messageExt.getKeys();
        String str = new String(messageExt.getBody(), StandardCharsets.UTF_8);
        log.info("rocketmq:{}, topic:{}, msgId:{}, message:{}", new Object[]{"TaskListener", messageExt.getTopic(), messageExt.getMsgId(), str});
        Optional fromJson = JsonUtils.fromJson(str, JsonObject.class);
        if (fromJson.isPresent()) {
            long currentTimeMillis = System.currentTimeMillis();
            Optional<JsonObject> transform = this.inputAdapter.transform((JsonObject) fromJson.get());
            long currentTimeMillis2 = System.currentTimeMillis();
            if (transform.isPresent()) {
                JsonObject jsonObject = new JsonObject();
                jsonObject.add(this.contextProducer.id(), AsyncOperator.Cost.of(currentTimeMillis, currentTimeMillis2).toJson());
                transform.get().add(ContextListener.costField, jsonObject);
                log.info("Send message to context queue, resultCode:{}, message:{}, context:{}", new Object[]{Integer.valueOf(AsyncResponse.okCode), AsyncResponse.okMsg, transform.get()});
                send = this.contextProducer.send(keys, AsyncResponse.okCode, AsyncResponse.okMsg, transform.get());
            } else {
                log.error("Failed to transform taskInput to contextInput: {}", str);
                ((JsonObject) fromJson.get()).addProperty("resultCode", Integer.valueOf(ResultStatusEnum.BAD_REQUEST.getCode()));
                ((JsonObject) fromJson.get()).addProperty("message", "Illegal taskInput message");
                send = this.producer4TaskOutput.sendMessage(keys, ((JsonObject) fromJson.get()).toString());
            }
        } else {
            log.error("Message from taskInput queue should be instance of JsonObject: {}", str);
            JsonObject jsonObject2 = new JsonObject();
            jsonObject2.addProperty("resultCode", Integer.valueOf(ResultStatusEnum.BAD_REQUEST.getCode()));
            jsonObject2.addProperty("message", ResultStatusEnum.BAD_REQUEST.getMsg());
            jsonObject2.addProperty("original_msg", str);
            send = this.producer4TaskOutput.sendMessage(keys, jsonObject2.toString());
        }
        return send;
    }
}
