package service import ( "ekho-be/core/model" coreService "ekho-be/core/service/audioLog" "ekho-be/core/service/google" "ekho-be/core/service/openAI" "ekho-be/core/util" "ekho-be/stream/internal/config" "encoding/base64" "encoding/json" "errors" "fmt" "github.com/go-redis/redis/v7" "github.com/google/uuid" "github.com/rs/zerolog/log" "os" "strconv" "strings" "time" ) const ( AudioInputPath = "./audio/inputs/" AudioOutputPath = "./audio/outputs/" TableMetadataKey = "bigquery-metadata-cache-key" ) type IStream interface { Audio(req model.AudioModel) (model.AudioModel, error) } type Stream struct { speechToTextService google.ISpeechToText textToSpeechService google.ITextToSpeech bigQueryService google.IBigQueryService textDavinciService openAI.ITextDavinciService translateService google.ITranslate AudioLogService coreService.IAudioLogs StepService coreService.IStep config config.AppConfig cloudStorage google.ICloudStorage redis *redis.Client } func NewStreamService(speechToTextService google.ISpeechToText, textToSpeechService google.ITextToSpeech, translateService google.ITranslate, bigQueryService google.IBigQueryService, textDavinciService openAI.ITextDavinciService, stepService coreService.IStep, audioService coreService.IAudioLogs, cloudStorage google.ICloudStorage, redis *redis.Client, config config.AppConfig) IStream { return &Stream{ speechToTextService: speechToTextService, textToSpeechService: textToSpeechService, bigQueryService: bigQueryService, textDavinciService: textDavinciService, translateService: translateService, StepService: stepService, AudioLogService: audioService, config: config, cloudStorage: cloudStorage, redis: redis, } } func (s *Stream) Audio(req model.AudioModel) (model.AudioModel, error) { var audioLog model.Log AudioLogID := uuid.NewString() audioLog.CreatedAt = time.Now() cleanBase64 := s.CleanBase64WavPrefix(req.Audio.Data) tempFullPath, fileName, err := util.Base64ToWav(cleanBase64, AudioLogID, util.InputAudioFile) audioLog.LogDetail = append(audioLog.LogDetail, model.LogDetail{ CreatedAt: time.Now(), Input: "-", Output: fileName, OperationName: model.Base64ToWav, Status: s.isSuccess(err), }) defer func() { err := os.RemoveAll(tempFullPath) if err != nil { log.Error().Err(err).Msgf("Delete tmp directory error. Path: %s", tempFullPath) } }() if err != nil { log.Error().Err(err).Msgf("Base64 to wav convert failed. LogId: %v", AudioLogID) audioLog.SetStatus(err) return s.SendAudioResponse(audioLog, AudioErrorPath+string(GeneralFileName), false) } decoded, err := base64.StdEncoding.DecodeString(cleanBase64) if err != nil { log.Error().Err(err).Msg("base64 to byte array convert array") return s.SendAudioResponse(audioLog, AudioErrorPath+string(GeneralFileName), false) } err = s.cloudStorage.SaveWavFile(s.config.Google.Storage.InputBucketName, fileName, decoded) audioLog.LogDetail = append(audioLog.LogDetail, model.LogDetail{ CreatedAt: time.Now(), Input: AudioLogID + string(util.InputAudioFile), Output: func(err error) string { if err != nil { return "" } return s.config.Google.Storage.InputBucketName + "/" + AudioLogID + string(util.InputAudioFile) }(err), OperationName: model.SaveCloudStorage, Status: s.isSuccess(err), }) if err != nil { audioLog.SetStatus(err) log.Error().Err(err).Msgf("error occurred while saving the input file to google cloud storage. logId: %s", AudioLogID) return s.SendAudioResponse(audioLog, AudioErrorPath+string(GeneralFileName), false) } audioLog.InputVoice = AudioLogID + string(util.InputAudioFile) text, err := s.speechToTextService.ConvertSpeech(tempFullPath) audioLog.LogDetail = append(audioLog.LogDetail, model.LogDetail{ CreatedAt: time.Now(), Input: fileName, Output: text, OperationName: model.SpeechToText, Status: s.isSuccess(err), }) if err != nil { audioLog.SetStatus(err) log.Error().Err(err).Msgf("Speech to text failed. AudioLogID: %v", AudioLogID) return s.SendAudioResponse(audioLog, AudioErrorPath+string(GeneralFileName), false) } hasSpecialWord, textTr := s.hasSpecialWords(text) audioLog.LogDetail = append(audioLog.LogDetail, model.LogDetail{ CreatedAt: time.Now(), Input: text, Output: strconv.FormatBool(hasSpecialWord), OperationName: model.WordCheck, Status: s.isSuccess(err), }) text, err = s.translateService.Translate(textTr) if s.config.Google.Translate.IsActive { audioLog.LogDetail = append(audioLog.LogDetail, model.LogDetail{ CreatedAt: time.Now(), Input: textTr, Output: text, OperationName: model.TrToEn, Status: s.isSuccess(err), }) } if err != nil { audioLog.SetStatus(err) return s.SendAudioResponse(audioLog, AudioErrorPath+string(GeneralFileName), false) } var result string if !hasSpecialWord { metadata, err := s.GetMetadata() audioLog.LogDetail = append(audioLog.LogDetail, model.LogDetail{ CreatedAt: time.Now(), Input: s.config.Google.BigQuery.DataSet, Output: metadata.Text, OperationName: model.GetMetaData, Status: s.isSuccess(err), }) if err != nil { audioLog.SetStatus(err) return s.SendAudioResponse(audioLog, AudioErrorPath+string(BigQueryErrorFileName), false) } query, question, err := s.GenerateSqlQuery(text, metadata) audioLog.LogDetail = append(audioLog.LogDetail, model.LogDetail{ CreatedAt: time.Now(), Input: question, Output: query, OperationName: model.GenerateSql, Status: s.isSuccess(err), }) if err != nil { audioLog.SetStatus(err) return s.SendAudioResponse(audioLog, AudioErrorPath+string(BigQueryErrorFileName), false) } validator := s.ProccessQueryString(query) query = validator.Query audioLog.LogDetail = append(audioLog.LogDetail, model.LogDetail{ CreatedAt: time.Now(), Input: query, Output: strconv.FormatBool(validator.IsReadable), OperationName: model.QValidator, Status: validator.IsReadable, }) if !validator.IsReadable { audioLog.SetStatus(errors.New("query not readable")) return s.SendAudioResponse(audioLog, AudioErrorPath+string(BigqueryMultiSelectErrorFileName), false) } validator.CheckAggregate() pureResult, err := s.GetResult(query, validator) audioLog.LogDetail = append(audioLog.LogDetail, model.LogDetail{ CreatedAt: time.Now(), Input: query, Output: pureResult, OperationName: model.BigQueryResult, Status: s.isSuccess(err), }) if err != nil { audioLog.SetStatus(err) return s.SendAudioResponse(audioLog, AudioErrorPath+string(BigQueryErrorFileName), false) } if validator.HaveAggregate { result, err = s.textDavinciService.PureResultToResult(textTr, pureResult) audioLog.LogDetail = append(audioLog.LogDetail, model.LogDetail{ CreatedAt: time.Now(), Input: pureResult, Output: result, OperationName: model.PureResultToResult, Status: s.isSuccess(err), }) if err != nil { audioLog.SetStatus(err) return s.SendAudioResponse(audioLog, AudioErrorPath+string(OpenAIErrorFileName), false) } } else { result = pureResult } } else { result, err = s.UseOpenAI(text) audioLog.LogDetail = append(audioLog.LogDetail, model.LogDetail{ CreatedAt: time.Now(), Input: text, Output: result, OperationName: model.OpenAiResult, Status: s.isSuccess(err), }) if err != nil { audioLog.SetStatus(err) return s.SendAudioResponse(audioLog, AudioErrorPath+string(OpenAIErrorFileName), false) } } responseText, err := s.translateService.ReTranslate(result) if s.config.Google.Translate.IsActive { audioLog.LogDetail = append(audioLog.LogDetail, model.LogDetail{ CreatedAt: time.Now(), Input: result, Output: responseText, OperationName: model.EnToTr, Status: s.isSuccess(err), }) } if err != nil { audioLog.SetStatus(err) return s.SendAudioResponse(audioLog, AudioErrorPath+string(GeneralFileName), false) } audioContent, err := s.textToSpeechService.Convert(responseText) audioLog.LogDetail = append(audioLog.LogDetail, model.LogDetail{ CreatedAt: time.Now(), Input: responseText, Output: strconv.FormatBool(s.isSuccess(err)), OperationName: model.TextToSpeech, Status: s.isSuccess(err), }) if err != nil { audioLog.SetStatus(err) return s.SendAudioResponse(audioLog, AudioErrorPath+string(GeneralFileName), false) } err = s.cloudStorage.SaveWavFile(s.config.Google.Storage.OutputBucketName, AudioLogID+string(util.OutputAudioFile), audioContent) audioLog.LogDetail = append(audioLog.LogDetail, model.LogDetail{ CreatedAt: time.Now(), Input: AudioLogID + string(util.OutputAudioFile), Output: func(err error) string { if err != nil { return "" } return s.config.Google.Storage.OutputBucketName + "/" + AudioLogID + string(util.OutputAudioFile) }(err), OperationName: model.SaveCloudStorage, Status: s.isSuccess(err), }) if err != nil { audioLog.SetStatus(err) log.Error().Err(err).Msgf("error occurred while saving the output file to google cloud storage. logId: %s", AudioLogID) return s.SendAudioResponse(audioLog, AudioErrorPath+string(GeneralFileName), false) } audioLog.OutputVoice = AudioLogID + string(util.OutputAudioFile) outputAudioBase64 := base64.StdEncoding.EncodeToString(audioContent) audioLog.LogDetail = append(audioLog.LogDetail, model.LogDetail{ CreatedAt: time.Now(), Input: s.config.Google.Storage.OutputBucketName + "/" + AudioLogID + string(util.OutputAudioFile), Output: "Base64 string", OperationName: model.AudioToBase64, Status: s.isSuccess(err), }) if err != nil { audioLog.SetStatus(err) return s.SendAudioResponse(audioLog, AudioErrorPath+string(GeneralFileName), false) } return s.SendAudioResponse(audioLog, outputAudioBase64, true) } func (s *Stream) isSuccess(err error) bool { return err == nil } func (s *Stream) SendAudioResponse(audioLog model.Log, data string, isDataBase64 bool) (model.AudioModel, error) { var response model.AudioModel defer func(audioLog *model.Log) { tx := s.AudioLogService.CreateTX() defer tx.Rollback() audioLog.EndDate = time.Now() audioLog.Status = audioLog.Error == "" err := s.AudioLogService.SaveForTx(audioLog, tx) if err != nil { log.Error().Err(err).Msg("save audio log failed") return } tx.Commit() }(&audioLog) if isDataBase64 { response.Audio.Data = util.WavBase64Prefix + data audioLog.LogDetail = append(audioLog.LogDetail, model.LogDetail{ CreatedAt: time.Now(), Input: audioLog.OutputVoice, Output: "Base64 string", OperationName: model.SendResponse, Status: true, }) } else { base64Str, err := util.AudioToBase64(data, util.WavBase64Prefix) audioLog.LogDetail = append(audioLog.LogDetail, model.LogDetail{ CreatedAt: time.Now(), Input: data, Output: "Base64 string", OperationName: model.AudioToBase64, Status: s.isSuccess(err), }) if err != nil { audioLog.SetStatus(err) log.Error().Err(err) return response, err } audioLog.LogDetail = append(audioLog.LogDetail, model.LogDetail{ CreatedAt: time.Now(), Input: data, Output: "Base64 string", OperationName: model.SendResponse, Status: true, }) response.Audio.Data = base64Str } return response, nil } func (s *Stream) hasSpecialWords(text string) (bool, string) { text = strings.ToLower(text) keywords := strings.Split(text, " ") for _, item := range s.config.SpecialWords { item = strings.ToLower(item) if len(keywords) > 0 && keywords[0] == item { text = strings.Replace(text, item, "", 1) return true, text } } return false, text } func (s *Stream) GenerateSqlQuery(question string, metaData google.MetaDataInfo) (string, string, error) { openAiQuestion := fmt.Sprintf("Google bigquery Property'leri ve Tabloları verilmiştir. n tables: %s n dataset : %s. n soru: %s n TIMESTAMP alanlarını Date olarak çevirip, soruya uygun Bigquery SQL cümlesini oluşturur musun ?n", metaData.Text, s.config.Google.BigQuery.DataSet, question) sqlString, err := s.textDavinciService.CreateSql(openAiQuestion) return sqlString, question, err } func (s *Stream) GetResult(query string, queryValidator model.QueryValidator) (string, error) { result, err := s.bigQueryService.GetResult(query, queryValidator, s.config.MaxReadCount) if err != nil { log.Error().Err(err).Msg("Big query select error.") return "", err } if result == "" { return result, errors.New("result not found") } return result, nil } func (s *Stream) UseOpenAI(text string) (string, error) { text, err := s.textDavinciService.AskQuestion(text) if err != nil { log.Error().Err(err).Msg("text davinci failed") return "", nil } return text, err } func (s *Stream) ProccessQueryString(query string) model.QueryValidator { var queryValidator model.QueryValidator queryValidator.Query = query if !queryValidator.IsSelectQuery(query) { return queryValidator } queryValidator.GetFieldByQuery(query) return queryValidator } func (s *Stream) QueryTimesConverter(query string) string { query = strings.ToLower(query) wheres := strings.Split(strings.ToLower(query), "where") if len(wheres) < 2 { return query } wordToFind := "createdat" words := strings.Split(wheres[1], " ") for _, word := range words { if strings.Contains(word, wordToFind) { query = strings.ReplaceAll(query, word, "date("+word+")") } } return query } func (s *Stream) CleanBase64WavPrefix(base64Str string) string { return strings.ReplaceAll(base64Str, util.WavBase64Prefix, "") } func (s *Stream) GetMetadata() (google.MetaDataInfo, error) { var metadata google.MetaDataInfo var err error cachedVal := s.redis.Get(TableMetadataKey) if cachedVal != nil && cachedVal.Err() == nil && cachedVal.Val() != "" { err = json.Unmarshal([]byte(cachedVal.Val()), &metadata) log.Info().Msgf("Metadata comes from cache. Metadata: %s", metadata) } else { metadata, err = s.bigQueryService.GetTableMetadata() metadataBody, _ := json.Marshal(metadata) s.redis.Set( TableMetadataKey, string(metadataBody), time.Duration(int(time.Millisecond)*s.config.Google.BigQuery.MetadataCacheTimeout)) log.Info().Msgf("Bigquery metadata retrieved successfully. Metadata: %s", metadata) } return metadata, err }