# This is my code; I need help connecting to the IG servers and printing price ticks.
import os
import time

import requests
from lightstreamer.client import LightstreamerClient, Subscription, SubscriptionListener, ClientListener
# === IG Account Info ===
# Base URL of the IG REST gateway (demo environment).
IG_API_URL = "https://demo-api.ig.com/gateway/deal"

# Credentials are taken from the environment so they do not have to be stored
# in the script. Each one falls back to an empty string when the variable is
# not set.
API_KEY = os.environ.get("IG_API_KEY", "")
USERNAME = os.environ.get("IG_USERNAME", "")
PASSWORD = os.environ.get("IG_PASSWORD", "")

EPIC = "IX.D.FTSE.IFM.IP"  # You can change this to any supported epic
# === IG Auth ===
def ig_auth():
    """Log in to the IG REST API and collect what the streaming client needs.

    Posts USERNAME and PASSWORD to ``{IG_API_URL}/session`` (API version 2),
    sending API_KEY in the ``X-IG-API-KEY`` header.

    Returns:
        A dict with the keys ``CST`` and ``XST`` (session tokens read from the
        response headers) and ``accountId``, ``clientId`` and ``lsEndpoint``
        (read from the JSON body). ``None`` if the request could not be
        completed or the server answered with a status other than 200.
    """
    print("Authenticating with IG...")
    headers = {
        "X-IG-API-KEY": API_KEY,
        "Content-Type": "application/json",
        "Accept": "application/json",
        "Version": "2",
    }
    data = {
        "identifier": USERNAME,
        "password": PASSWORD,
    }
    try:
        # Without a timeout an unresponsive gateway would block forever.
        response = requests.post(
            f"{IG_API_URL}/session", headers=headers, json=data, timeout=10
        )
    except requests.RequestException as exc:
        # Network-level failures are reported the same way as a rejected login.
        print("Auth failed:", exc)
        return None

    if response.status_code != 200:
        print("Auth failed:", response.text)
        return None

    session_data = response.json()
    print("IG Login successful.")
    return {
        "CST": response.headers.get("CST"),
        "XST": response.headers.get("X-SECURITY-TOKEN"),
        "accountId": session_data.get("currentAccountId"),
        "clientId": session_data.get("clientId"),
        "lsEndpoint": session_data.get("lightstreamerEndpoint"),
    }
# === Stream Status Listener ===
class StreamStatusListener(ClientListener):
    """Client listener that prints every connection status change."""

    def onStatusChange(self, status):
        """Print the new connection status reported by the client."""
        print(f"Lightstreamer status: {status}")
# === Price Update Listener ===
class PriceUpdateListener(SubscriptionListener):
    """Subscription listener that prints each price update it receives."""

    def onSubscription(self):
        """Report that the subscription was accepted."""
        print("Subscription confirmed by Lightstreamer")

    def onSubscriptionError(self, code, message):
        """Report a rejected subscription with the given code and message."""
        print(f"Subscription error: Code {code}, Message: {message}")

    def onItemUpdate(self, item_update):
        """Print every field of the update together with its value."""
        print("Price tick received:")
        # ItemUpdate exposes the subscribed fields through getFields(), which
        # maps each field name to its current value.
        for field, value in item_update.getFields().items():
            print(f" {field}: {value}")
# === IG Streaming Client ===
class IGStreamClient:
    """Lightstreamer connection that streams prices for a single IG epic.

    Args:
        auth: Dict as returned by ``ig_auth()``; ``connect`` reads its
            ``lsEndpoint``, ``accountId``, ``CST`` and ``XST`` entries.
        epic: Epic of the market to subscribe to.
    """

    def __init__(self, auth, epic):
        self.auth = auth
        self.epic = epic
        self.client = None  # set by connect()
        self.subscription = None  # set by connect()

    def connect(self):
        """Open the streaming session and subscribe to the epic's prices."""
        print("Connecting to Lightstreamer...")
        # IG's streaming guide connects without naming an adapter set, which
        # selects the server default ("DEFAULT").
        self.client = LightstreamerClient(self.auth["lsEndpoint"], "DEFAULT")

        # IG authenticates the stream with the active account ID as the user
        # and both session tokens, each carrying its prefix, as the password.
        details = self.client.connectionDetails
        details.setUser(self.auth["accountId"])
        details.setPassword(f"CST-{self.auth['CST']}|XST-{self.auth['XST']}")

        self.client.addListener(StreamStatusListener())
        self.client.connect()

        # NOTE(review): IG's streaming reference documents "MARKET:{epic}" as
        # the item name for these fields; confirm the "L1:" prefix is still
        # accepted if the subscription is rejected.
        item = f"L1:{self.epic}"
        fields = ["BID", "OFFER", "UPDATE_TIME", "MARKET_STATE"]
        self.subscription = Subscription(
            mode="MERGE",
            items=[item],
            fields=fields,
        )
        self.subscription.addListener(PriceUpdateListener())
        self.client.subscribe(self.subscription)
        print(f"Subscription request sent for: {item}")

    def disconnect(self):
        """Unsubscribe and close the connection; does nothing if never connected."""
        if not self.client:
            return
        try:
            if self.subscription:
                self.client.unsubscribe(self.subscription)
            self.client.disconnect()
            print("Disconnected")
        except Exception as e:
            # Shutdown is best effort: report the problem instead of raising.
            print(f"Error while disconnecting: {e}")
# === Run Script ===
# Script entry point: log in, start streaming, and print the connection
# status every two seconds until interrupted.
if __name__ == "__main__":
    auth_data = ig_auth()
    if not auth_data:
        print("Login failed. Exiting.")
        # Non-zero exit status so callers can tell the run failed.
        raise SystemExit(1)

    stream = IGStreamClient(auth_data, EPIC)
    try:
        stream.connect()
        while True:
            time.sleep(2)
            status = stream.client.getStatus()
            print(f"Current status: {status}")
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop the script.
        pass
    finally:
        # Close the stream however the loop ended, not only on Ctrl+C.
        stream.disconnect()
        print("Stopped.")