O blog da AWS

Implementação do algoritmo LightGbm no Amazon SageMaker

Por Maria Gaska, Arquiteta de Soluções Especialista em AI/ML

 

No Amazon SageMaker existem três modalidades de treinamento: algoritmos totalmente gerenciados, frameworks ou algoritmos compatíveis com o modo de script e a possibilidade de trazer seu próprio contêiner.

Este último será o caso quando queremos implementar um modelo LightGBM usando o serviço de treinamento do SageMaker.

Neste artigo vamos trabalhar com o conhecido dataset do Titanic, onde o objetivo é prever as chances de um passageiro sobreviver ou não dadas suas características. Para resolver esse problema, criaremos um pipeline de inferência que permite completar dados ausentes, fazer one-hot encoding de variáveis categóricas e, finalmente, treinar um modelo preditivo usando LightGBM. O código completo deste exemplo pode ser encontrado aqui.

 

Passo 1: Selecione o kernel e instale bibliotecas

Este projeto pode ser desenvolvido usando uma instância de notebook do Amazon SageMaker do tipo ml.m5.xlarge. Ao selecionar o kernel CondaPython3 você encontrará as bibliotecas de aprendizado de máquina mais comuns já instaladas. As versões dos pacotes podem ser consultadas usando o seguinte código.

!pip freeze

Outro ponto importante é garantir que você esteja usando a versão mais recente do SDK do SageMaker (2.x.x.) e para isso você pode precisar fazer uma atualização do SDK.

! pip install --upgrade sagemaker

Usando o SDK do SageMaker, você pode configurar os parâmetros iniciais.

sess = sage.Session()
role = get_execution_role()
prefix = 'lgb-model'

 

Passo 2: Escreva o Dockerfile

Para aproveitar o serviço de treinamento do SageMaker, precisamos construir um contêiner com os métodos necessários para treinar um modelo quando o contêiner é invocado com o parâmetro train e criar um microsserviço quando o contêiner é invocado com o parâmetro serve. O microsserviço implementa uma API com os métodos GET e POST nos caminhos /ping e /invocations, respectivamente.  O treinamento e hospedagem do modelo podem ser feitos em contêineres separados, mas neste caso desenvolveremos toda a funcionalidade em um único contêiner.

 

# Build an image that can do training and inference in SageMaker

# This is a Python 2 image that uses the nginx, gunicorn, flask stack

# for serving inferences in a stable way.


FROM ubuntu:18.04


MAINTAINER Amazon AI <sage-learner@amazon.com>



RUN apt-get -y update && apt-get install -y --no-install-recommends \

         wget \

         python \

         python3.6 \

         nginx \

         ca-certificates \

         libgcc-5-dev \

         build-essential \

         python3-dev \

    && rm -rf /var/lib/apt/lists/*



# Symlink /usr/bin/python to the python version we're building for.

RUN rm /usr/bin/python && ln -s /usr/bin/python3.6 /usr/bin/python



# Here we get all python packages.

# There's substantial overlap between scipy and numpy that we eliminate by linking them together.

# Likewise, pip leaves the install caches populated which uses a significant amount of space.

# These optimizations save a fair amount of space in the image, which reduces start up time.



RUN wget https://bootstrap.pypa.io/3.3/get-pip.py && python3.6 get-pip.py


RUN pip install --upgrade pip && \

 pip3 install lightgbm==3.1.0 pandas==1.0.5 scikit-learn==0.23.1 flask  gunicorn && \

 pip3 install gevent --pre && \

 rm -rf /root/.cache


# Set some environment variables:

# PYTHONUNBUFFERED keeps Python from buffering our standard output stream, which means that logs can be delivered to the user quickly.

# PYTHONDONTWRITEBYTECODE keeps Python from writing the .pyc files which are unnecessary in this case.

# We also update PATH so that the train and serve programs are found when the container is invoked.

ENV PYTHONUNBUFFERED=TRUE

ENV PYTHONDONTWRITEBYTECODE=TRUE

ENV PATH="/opt/program:${PATH}"


# Set up the program in the image

COPY lgb /opt/program


WORKDIR /opt/program

 

Passo 3: Crie o arquivo com a funcionalidade de treinamento

Observe que a pasta lgb será a pasta de trabalho do contêiner. Aqui você deve colocar a funcionalidade necessária para executar o treinamento e inferência.  Um arquivo Python chamado apenas “train” será o ponto de entrada quando o SageMaker invocar a execução do contêiner com o parâmetro train. Neste script você deve ler o(s) arquivo(s) de treinamento (se o volume de dados for grande, é conveniente dividir os dados), o modelo é executado, um score de cross validation é impresso e o modelo serializado (neste caso com a biblioteca pickle) é salvo no local que o SageMaker fornece como parâmetro.

O Amazon SageMaker internamente levará esse modelo serializado, compactará ele em formato “tar.gz” e o colocará em um local do S3 associado ao trabalho de treinamento que o produziu.  Todos os logs impressos a partir deste script serão armazenados no CloudWatch e associados ao trabalho de treinamento. Eles podem ser visualizados no console do Amazon SageMaker.

 

prefix = '/opt/ml/'


input_path = prefix + 'input/data'

output_path = os.path.join(prefix, 'output')

model_path = os.path.join(prefix, 'model')

param_path = os.path.join(prefix, 'input/config/hyperparameters.json')



# This algorithm has a single channel of input data called 'training'.

# Since we run in File mode, the input files are copied to the directory specified here.

channel_name='training'

training_path = os.path.join(input_path, channel_name)


# The function to execute the training.

def train():



    print('Starting the training.')

    try:

        # Read in any hyperparameters that the user passed with the training job

#         with open(param_path, 'r') as tc:

#             trainingParams = json.load(tc)



        # Take the set of files and read them all into a single pandas dataframe

        input_files = [ os.path.join(training_path, file) for file in os.listdir(training_path) if 'csv' in file  ]

        print(input_files)



        if len(input_files) == 0:

            raise ValueError(('There are no files in {}.\n' +

                              'This usually indicates that the channel ({}) was incorrectly specified,\n' +

                              'the data specification in S3 was incorrectly specified or the role specified\n' +

                              'does not have permission to access the data.').format(training_path, channel_name))




        raw_data = [ pd.read_csv(file, error_bad_lines=False) for file in input_files ]


        df_train = pd.concat(raw_data)


        X = df_train.iloc[:,1:]

        y = df_train.iloc[:,0]

        print("csv parsed")




        # Define model

        numeric_features = X.select_dtypes(include=np.number).columns.tolist()


        numeric_transformer = Pipeline(steps=[

            ('imputer', SimpleImputer(strategy='median'))])


        categorical_features = [x for x in X.columns if x not in numeric_features]




        categorical_transformer = Pipeline(steps=[

            ('imputer', SimpleImputer(strategy='most_frequent', fill_value='missing')),

            ('onehot', OneHotEncoder(handle_unknown='ignore'))])



        preprocessor = ColumnTransformer(

            transformers=[

                ('num', numeric_transformer, numeric_features),

                ('cat', categorical_transformer, categorical_features)])




        clf = Pipeline(steps=[('preprocessor', preprocessor),

                              ('classifier', LGBMClassifier(n_jobs=-1))])




        print("model defined")




        oof_pred = cross_val_predict(clf,

                             X,

                             y,

                             cv=5,

                             method="predict_proba")



        print("Cross validation AUC {:.4f}".format(roc_auc_score(y, oof_pred[:,1])))



        clf.fit(X,y)



        # save the model

        filename = os.path.join(model_path, 'lgb_model.pkl')

        pickle.dump(clf, open(filename, 'wb'))

        print('Training complete.')


    except Exception as e:

        # Write out an error file. This will be returned as the failureReason in the DescribeTrainingJob result.

        trc = traceback.format_exc()

        with open(os.path.join(output_path, 'failure'), 'w') as s:

            s.write('Exception during training: ' + str(e) + '\n' + trc)



        # Printing this causes the exception to be in the training job logs, as well.

        print('Exception during training: ' + str(e) + '\n' + trc, file=sys.stderr)


        # A non-zero exit code causes the training job to be marked as Failed.

        sys.exit(255)


if __name__ == '__main__':

    train()



    # A zero-exit code causes the job to be marked a Succeeded.

    sys.exit(0)   

 

Passo 4: Crie a funcionalidade de predict

O arquivo predictor.py especifica então a funcionalidade necessária para gerar os endpoints /ping e /invocations.

prefix = '/opt/ml/'

model_path = os.path.join(prefix, 'model')


# A singleton for holding the model. This simply loads the model and holds it.

# It has a predict function that does a prediction based on the model and the input data.


class ScoringService(object):


    model = None                # Where we keep the model when it's loaded




    @classmethod

    def get_model(cls):

        """Get the model object for this instance, loading it if it's not already loaded."""

        if cls.model == None:

            file_name = 'lgb_model.pkl'

            file_path = os.path.join(model_path,file_name)

            cls.model = pickle.load( open( file_path, "rb" ) )


        return cls.model


    @classmethod

    def predict(cls, input):

        """For the input, do the predictions and return them.

        Args:

            input (a pandas dataframe): The data on which to do the predictions. There will be one prediction per row in the dataframe

        """

        clf = cls.get_model()


        return clf.predict(input)


# The flask app for serving predictions

app = flask.Flask(__name__)


@app.route('/ping', methods=['GET'])

def ping():

    """Determine if the container is working and healthy. In this sample container, we declare it healthy if we can load the model successfully."""

    health = ScoringService.get_model() is not None  # You can insert a health check here


    status = 200 if health else 404


    return flask.Response(response='\n', status=status, mimetype='application/json')



@app.route('/invocations', methods=['POST'])

def transformation():

    """Do an inference on a single batch of data. In this sample server, we take data as CSV, convert it to a pandas data frame for internal use and then convert the predictions back to CSV (which really just means one prediction per line, since there's a single column.

    """

    data = None



    # Convert from CSV to pandas

    if flask.request.content_type == 'text/csv':

        data = flask.request.data.decode('utf-8')

        s = StringIO(data)

        data = pd.read_csv(s)

        print (data.columns)

    else:

        return flask.Response(response='This predictor only supports CSV data', status=415, mimetype='text/plain')


    print('Invoked with {} records'.format(data.shape[0]))


    # Drop first column, since sample notebook uses training data to show case predictions

    # data.drop(data.columns[[0]],axis=1,inplace=True)



    # Do the prediction

    predictions = ScoringService.predict(data)



    # Convert from numpy back to CSV

    out = StringIO()

    pd.DataFrame({'results':predictions}).to_csv(out, header=False, index=False)

    result = out.getvalue()



    return flask.Response(response=result, status=200, mimetype='text/csv')

Passo 5: Crie a imagem Docker localmente e faça upload para o Elastic Container Registry

Uma vez que o Dockerfile estiver completo, vamos compilá-lo localmente e enviá-lo para o ECR associado à nossa conta:

%%sh

# The name of our algorithm

algorithm_name=lgb-model



cd container



chmod +x lgb/train

chmod +x lgb/serve



account=$(aws sts get-caller-identity --query Account --output text)



# Get the region defined in the current configuration (default to us-west-2 if none defined)

region=$(aws configure get region)

region=${region:-us-west-2}

fullname="${account}.dkr.ecr.${region}.amazonaws.com/${algorithm_name}:latest"



# If the repository doesn't exist in ECR, create it.

aws ecr describe-repositories --repository-names "${algorithm_name}" > /dev/null 2>&1



if [ $? -ne 0 ]

then

    aws ecr create-repository --repository-name "${algorithm_name}" > /dev/null

fi



# Get the login command from ECR and execute it directly

$(aws ecr get-login --region ${region} --no-include-email)



# Build the docker image locally with the image name and then push it to ECR with the full name.

docker build  -t ${algorithm_name} .

docker tag ${algorithm_name} ${fullname}



docker push ${fullname}

 

Passo 6: Treinamento e implantação usando o SDK do Amazon SageMaker

Em seguida, usaremos o SDK do Amazon SageMaker para treinar o modelo e implantá-lo em um endpoint capaz de fornecer previsões em tempo real.

 

df = pd.read_csv('titanic.csv',sep='|')

df = df.drop(['PassengerId','Cabin','Ticket','Name'],axis=1)



df_train, df_test = train_test_split(df, test_size=0.2)



df_train.to_csv('data/train.csv',index=False)

df_test.to_csv('data/test.csv',index=False)


sess.upload_data('data/train.csv', key_prefix=prefix + '/training')


data_location = f's3://{sess.default_bucket()}/{prefix}/training'

s3_input = {'training': data_location}



account = sess.boto_session.client('sts').get_caller_identity()['Account']

region = sess.boto_session.region_name

image = '{}.dkr.ecr.{}.amazonaws.com/lgb-model:latest'.format(account, region)




lgb = sage.estimator.Estimator(image,

                       role, 1, 'ml.c4.2xlarge',

                       output_path="s3://{}/output".format(sess.default_bucket()),

                       sagemaker_session=sess)



lgb.fit(s3_input)



from sagemaker.predictor import csv_serializer



predictor = lgb.deploy(1, 'ml.m4.xlarge', serializer=csv_serializer)

 

Passo 7: Avaliação do modelo

Uma vez que a criação do endpoint for concluída, podemos usar o endpoint para fazer inferências. Para isso, vamos executar o processo de inferência sobre o dataset de holdout que criamos no início.

Os resultados devem ser semelhantes à média obtida na avaliação com cross-validation.

 

test_data = pd.read_csv("data/test.csv")

test_data.iloc[:,1:].to_csv('data/x_test.csv',index=False)


client = boto3.client('sagemaker-runtime')

endpoint_name = predictor.endpoint_name                              

content_type = "text/csv"                                       


response = client.invoke_endpoint(

    EndpointName=endpoint_name,

    ContentType=content_type,

    Body=open('data/x_test.csv', 'rb')

    )


prob_scores = [eval(pred)[1] for pred in list(preds)]

roc_auc_score(test_data.iloc[:,0],prob_scores)

 

Passo 8: Limpeza dos recursos criados

É importante excluir o endpoint quando ele não será mais usado porque a infraestrutura que o suporta tem um custo enquanto estiver disponível.

predictor.delete_endpoint()

Conclusão

O Amazon SageMaker permite combinar a funcionalidade de um serviço de treinamento e inferência gerenciado, oferecendo flexibilidade suficiente para usar qualquer linguagem de programação, framework ou biblioteca, desde que as especificações necessárias sejam respeitadas ao desenvolver seu próprio contêiner.

 

Esse artigo foi traduzido do Blog da AWS em Espanhol.

 


Sobre a autora

Maria Gaska é Arquiteta de Soluções na AWS há quase dois anos. Em seu papel, ele ajuda os clientes a determinar a melhor arquitetura para seus vários aplicativos e encontrar os melhores algoritmos para resolver problemas de Machine Learning e IA. Antes da AWS, ela trabalhou como desenvolvedora de modelos de aprendizado profundo em uma startup focada em NLP e chatbots e como professora em tempo integral em uma escola de programação em um curso de ciência de dados.

 

 

 

Sobre o revisor

Marcelo Cunha é um Arquiteto de Soluções da AWS especialista em AI/ML. Em sua função, ele ajuda os clientes a criar soluções de ML para resolver os seus desafios de negócio utilizando a plataforma AWS. De uma família de médicos e profissionais da saúde, ele também tem interesse em aplicações de ML nesse setor.