-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathcody.py
More file actions
207 lines (179 loc) · 6.61 KB
/
cody.py
File metadata and controls
207 lines (179 loc) · 6.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
from dotenv import load_dotenv
from langchain.text_splitter import CharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import tempfile
import json
import time
import threading
import openai
import os
import speech_recognition as sr
from gtts import gTTS
import pygame
import fnmatch
# Load environment variable(s) before reading the API key from the environment.
load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

### USER OPTIONS ###
# MAX TOKENS PER CALL: upper bound on the tokens to use for a single call.
MAX_TOKENS_PER_CALL = 2500
# Names that are left out when project files are collected.
IGNORE_THESE = [
    '.venv',
    '.env',
    'static',
    'dashboard/static',
    'audio',
    'license.md',
    '.github',
    '__pycache__',
]

# Module-wide speech recognizer instance.
r = sr.Recognizer()
class FileChangeHandler(FileSystemEventHandler):
    """Rebuild the searchable knowledge base whenever a watched file changes.

    A rebuild walks the current working directory, serialises every readable
    file into one JSON document, splits that text into chunks and embeds the
    chunks into a FAISS index stored on ``self.knowledge_base``.

    Attributes:
        cooldown: Minimum number of seconds between two rebuilds triggered by
            the same path.
        ignore_list: ``fnmatch`` patterns; a path is skipped when the whole
            path or any single path component matches one of them.
        knowledge_base: An empty dict until the first rebuild, afterwards the
            index returned by ``FAISS.from_texts``.
    """

    def __init__(self, ignore_list=None):
        """Create a handler.

        Args:
            ignore_list: Iterable of ``fnmatch`` patterns to skip. ``None``
                (the default) means "ignore nothing".
        """
        super().__init__()
        # Maps a path to the monotonic time at which its last rebuild finished.
        self._busy_files = {}
        self.cooldown = 5.0  # Cooldown in seconds
        # Copy the patterns: a shared mutable default (or the caller's own
        # list) must never be aliased by the handler.
        self.ignore_list = list(ignore_list) if ignore_list is not None else []
        self.data = {}
        self.knowledge_base = {}
        self.embeddings = OpenAIEmbeddings(openai_api_key=OPENAI_API_KEY)
        # Remember a "yes" to the .env warning so later rebuilds do not prompt
        # again (they may run on a thread that does not own stdin).
        self._env_exposure_confirmed = False

    def should_ignore(self, path):
        """Return True when ``path`` or one of its components matches a pattern."""
        parts = path.split(os.sep)
        return any(
            fnmatch.fnmatch(path, pattern)
            or any(fnmatch.fnmatch(part, pattern) for part in parts)
            for pattern in self.ignore_list
        )

    def on_modified(self, event):
        """Rebuild the knowledge base for a relevant file modification.

        Directory events, ignored paths and events that arrive within
        ``cooldown`` seconds of the previous rebuild for the same path are
        dropped, because every rebuild re-embeds the whole project.
        """
        if event.is_directory or self.should_ignore(event.src_path):
            return

        last_handled = self._busy_files.get(event.src_path)
        if last_handled is not None and time.monotonic() - last_handled < self.cooldown:
            return

        print(f'\n🔄 The file {event.src_path} has changed!')
        try:
            self.update_file_content()
        finally:
            # Stamp after the rebuild so duplicate events that piled up while
            # it was running fall inside the cooldown window too.
            self._busy_files[event.src_path] = time.monotonic()

    def update_file_content(self):
        """Collect project files and rebuild ``self.knowledge_base`` from them.

        Raises:
            SystemExit: If ``.env`` is not ignored and the user declines to
                send it to OpenAI.
        """
        print("\n\U0001F4C1 Collecting files...")
        self._confirm_env_exposure()

        # Create the final dictionary with the desired format
        final_data = {"files": self._collect_files()}
        combined_text = json.dumps(final_data)

        # Split combined text into chunks
        text_splitter = CharacterTextSplitter(
            separator=",",
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len,
        )
        chunks = text_splitter.split_text(combined_text)

        # Create or update the knowledge base
        self.knowledge_base = FAISS.from_texts(chunks, self.embeddings)
        print("\U00002705 All set!")

        # The spoken confirmation is a nicety: the index is already rebuilt,
        # so an audio failure must not abort the caller.
        try:
            audio_stream = create_audio("Files updated. Ready for questions")
            play_audio(audio_stream)
        except Exception as e:
            print(f"\n\U000026A0 Could not play audio notification: {e}")

    def _confirm_env_exposure(self):
        """Ask once for confirmation when ``.env`` is not in the ignore list."""
        if ".env" in self.ignore_list or self._env_exposure_confirmed:
            return
        response = input("😨 You removed .env from ignore list. This may expose .env variables to OpenAI. Confirm? (1 for Yes, 2 for exit):")
        if response.strip() != "1":
            print("\n😅 Phew. Close one... Operation aborted. Please add '.env' to your ignore list and try again.")
            raise SystemExit
        self._env_exposure_confirmed = True

    def _collect_files(self):
        """Return ``{path: content}`` for every readable, non-ignored file.

        JSON files are stored as their parsed value; every other file becomes
        a ``{"line N": stripped_text}`` mapping.
        """
        all_files_data = {}
        for root, dirs, files in os.walk('.'):
            # Prune in place so os.walk never descends into ignored
            # directories; the same pattern rules as on_modified apply.
            dirs[:] = [d for d in dirs if not self.should_ignore(os.path.join(root, d))]
            for filename in files:
                file_path = os.path.join(root, filename)
                if self.should_ignore(file_path):
                    continue
                try:
                    with open(file_path, 'r', encoding='utf-8') as file:
                        if filename.endswith('.json'):
                            all_files_data[file_path] = json.load(file)
                        else:
                            all_files_data[file_path] = {
                                f"line {number}": line.strip()
                                for number, line in enumerate(file, start=1)
                            }
                except Exception:
                    # Deliberately best-effort: binary, unreadable or
                    # malformed files are simply left out of the index.
                    continue
        return all_files_data
def play_audio(file_path):
    """
    Play audio from a file, block until playback ends, then delete the file.

    Args:
        file_path: Path of the audio file to play. The file is removed
            afterwards, whether or not playback succeeded.
    """
    try:
        pygame.mixer.init()
        pygame.mixer.music.load(file_path)
        pygame.mixer.music.play()
        # Sleep between polls; a bare `continue` loop would pin a CPU core
        # for the whole duration of the clip.
        while pygame.mixer.music.get_busy():
            time.sleep(0.1)
        pygame.mixer.music.unload()
    finally:
        try:
            os.unlink(file_path)  # Delete the temporary file
            print("Deleted temp audio file in: " + file_path)
        except OSError as e:
            # Do not let a cleanup problem hide a playback error.
            print(f"Could not delete temp audio file {file_path}: {e}")
def create_audio(text):
    """
    Create an audio file from text and return the path to a temporary file.

    Args:
        text: The text to synthesise.

    Returns:
        Path of a temporary ``.mp3`` file. Synthesis is best-effort: when it
        fails the error is printed and the path is still returned, in which
        case the file holds no audio.
    """
    # Only the reserved path is needed, so close the handle straight away
    # instead of leaving it open while the audio is written by name.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_file:
        audio_path = temp_file.name
    print(f"\nCreated temp audio file in : {audio_path}")
    try:
        speech = gTTS(text=text, lang='en', slow=False)
        speech.save(audio_path)
    except Exception as e:
        print(f"\nError in creating audio: {e}")
    return audio_path
def generate_response(prompt, speak_response=True):
    """
    Send ``prompt`` to the chat model, print the answer and optionally speak it.

    Args:
        prompt: Full prompt text sent as a single user message.
        speak_response: When true, the answer is also synthesised and played.

    Returns:
        The answer text, or ``None`` when the request failed.
    """
    openai.api_key = OPENAI_API_KEY
    try:
        completion = openai.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}],
            max_tokens=MAX_TOKENS_PER_CALL,
        )
        print("\n\U0001F4B0 Tokens used:", completion.usage.total_tokens)
        response_text = completion.choices[0].message.content
        print('\U0001F916', response_text)
    except Exception as e:
        print(f"\U000026A0 Error in generating response: {e}")
        return None

    if speak_response:
        # Play the file that was just synthesised. create_audio writes to a
        # fresh temporary path, so a fixed file name would never contain
        # this answer.
        try:
            audio_stream = create_audio(response_text)
            play_audio(audio_stream)
        except Exception as e:
            print(f"\U000026A0 Error in playing response: {e}")
    return response_text
def monitor_input(handler, terminal_input=True):
    """
    Read questions in a loop and answer them from the handler's knowledge base.

    Args:
        handler: Object whose ``knowledge_base`` offers ``similarity_search``.
        terminal_input: Read typed questions when true; otherwise listen on
            the microphone and speak the answers.

    The loop ends the whole process when the user enters ``exit`` or when
    standard input is closed.
    """
    while True:
        try:
            if terminal_input:
                text = input("\U00002753 Please type your question (or 'exit' to quit): ")
            else:
                with sr.Microphone() as source:
                    print("\nListening...")
                    audio_data = r.listen(source)
                    text = r.recognize_google(audio_data)

            question = text.strip()
            if not question:
                # Nothing was asked; do not spend an API call on it.
                continue
            if question.lower() == 'exit':
                print("\n\U0001F44B Exiting the program...")
                # Hard exit: this loop runs on a worker thread, so a normal
                # exit here would not stop the rest of the process.
                os._exit(0)

            print(f"You said: {text}")
            print("\n\U0001F9E0 You asked: " + question)
            docs = handler.knowledge_base.similarity_search(question)
            # Pass the chunk text itself rather than the repr of the
            # Document objects, which adds wrapper noise and escaping.
            context = "\n".join(doc.page_content for doc in docs)
            response = f"You are an expert programmer who is aware of this much of the code base:{context}. \n"
            response += "Please answer this: " + question + "..."  # Add the rest of your instructions here
            generate_response(response, speak_response=not terminal_input)
        except EOFError:
            # stdin is closed: every further input() would fail immediately,
            # so retrying would spin forever.
            print("\n\U0001F44B Exiting the program...")
            os._exit(0)
        except sr.UnknownValueError:
            print("\nCould not understand audio")
        except sr.RequestError as e:
            print(f"\nCould not request results; {e}")
        except Exception as e:
            print(f"An error occurred: {e}")
def start_cody(ignore_list=None):
    """
    Index the project, start the question loop and watch for file changes.

    Args:
        ignore_list: Patterns of files and directories to leave out of the
            index. Defaults to ``IGNORE_THESE`` when omitted.

    Blocks until interrupted with Ctrl-C.
    """
    if ignore_list is None:
        ignore_list = IGNORE_THESE
    # Honour the caller's list instead of always using the module default.
    handler = FileChangeHandler(ignore_list=ignore_list)

    # Collect files before starting the observer
    handler.update_file_content()

    # Prompt user for interaction method
    interaction_method = input("\nHow should I talk to you? Enter 1 for Terminal or 2 for Speech I/O: ")
    terminal_input = interaction_method.strip() == '1'

    # Start a new thread to monitor input. It is a daemon thread so that a
    # pending input() cannot keep the process alive after Ctrl-C.
    input_thread = threading.Thread(
        target=monitor_input,
        args=(handler, terminal_input),
        daemon=True,
    )
    input_thread.start()

    # Initialize the observer
    observer = Observer()
    observer.schedule(handler, path='.', recursive=True)
    observer.start()

    # Continue to observe for file changes
    try:
        while True:
            time.sleep(5)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
# Script entry point: start the assistant with the module's ignore list.
if __name__ == "__main__":
    start_cody(IGNORE_THESE)